[개발이야기#077] 메시지 기반 처리 서비스 - 데이터베이스 이관 프로그램 작성

in kr •  6 hours ago 


안녕하세요 가야태자 @talkit 입니다.

@talkit.bank 서비스와 talkitsteem 프로젝트를 진행하고 있는데 이 프로그램들의 제어를 위해서 메시지 기반 처리 서비스를 생각하고 있습니다.

메시지 기반 처리 서비스 중에 백업을 먼저 고려하고 앞에서 말씀 드렸지만, Kafka 실행이 실패 했습니다. T.T

그래서 CDC(Change Data Capture) 툴을 python으로 작성해 보았습니다.

소스 MySQL 접속

타겟 MySQL 접속

소스에 있는 필요한 테이블 정의

타겟에 테이블이 존재한다고 생각

실제 필드에서 CDC를 진행한다면 처음 일정 부분의 데이터를 부어 놓고 일을 하겠지만, ^^ 저는 개발 서브를 만들기 위해서 시작 한거라 해당 작업은 넘어 갔습니다.

여러테이블에서 PK기준 데이터 검색

데이터가 없으면 Insert

데이터가 있으면 Update

데이터가 있으면 입력할 데이터와 전체 데이터 비교

이때 데이터 변경이 없으면 다음으로 넘어가고 있으면 Update를 수행 했습니다.

삭제 관련 고려 없음.

원래는 Delete도 고려 해야하는데 Delete는 고려하지 않았습니다.

import pymysql

import logging

  

# 로그 설정

logging.basicConfig(level=logging.INFO)

  

# 소스 MySQL에 연결하는 함수

def connect_mysql(host, user, password, database):

    try:

        connection = pymysql.connect(

            host=host,

            user=user,

            password=password,

            database=database,

            charset='utf8'

        )

        return connection

    except pymysql.MySQLError as e:

        logging.error(f"MySQL connection error: {e}")

        return None

  

# 소스 MySQL에서 데이터를 조회하는 함수

def fetch_data_from_source(connection, table_name):

    cursor = connection.cursor(pymysql.cursors.DictCursor)

    try:

        # 원하는 테이블에서 1000건 조회

        query = f"SELECT * FROM {table_name} LIMIT 1000"

        cursor.execute(query)

        data = cursor.fetchall()

        return data

    except pymysql.MySQLError as e:

        logging.error(f"Error fetching data from source: {e}")

        return None

    finally:

        cursor.close()

  

# 타겟 MySQL에서 PK가 존재하는지 조회하는 함수

def check_pk_in_target(connection, table_name, pk_value):

    cursor = connection.cursor()

    try:

        if table_name == 'binance_simple_earn_flexible':

            query = f"SELECT COUNT(*) FROM {table_name} WHERE collect_time = %s AND asset = %s AND product_id = %s"

        elif table_name == 'user_exercise_daily':

            query = f"SELECT COUNT(*) FROM {table_name} WHERE user_id = %s AND year = %s AND month = %s AND day = %s"

        elif table_name == 'user_exercise_monthly':

            query = f"SELECT COUNT(*) FROM {table_name} WHERE user_id = %s AND year = %s AND month = %s"

        elif table_name == 'users':

            query = f"SELECT COUNT(*) FROM {table_name} WHERE user_id = %s"

        else:

            return False

        cursor.execute(query, pk_value)

        count = cursor.fetchone()[0]

        return count > 0  # 존재하면 True, 없으면 False

    except pymysql.MySQLError as e:

        logging.error(f"Error checking PK in target: {e}")

        return False

    finally:

        cursor.close()

  

# 타겟 MySQL에 데이터를 Insert 하는 함수

def insert_into_target(connection, table_name, data):

    cursor = connection.cursor()

  

    try:

        columns = ', '.join(data.keys())

        values = ', '.join(['%s'] * len(data))

        query = f"INSERT INTO {table_name} ({columns}) VALUES ({values})"

        cursor.execute(query, tuple(data.values()))

        connection.commit()

        logging.info(f"Inserted data into {table_name}")

    except pymysql.MySQLError as e:

        logging.error(f"Error inserting data into target: {e}")

        connection.rollback()

    finally:

        cursor.close()

  

# 타겟 MySQL에 데이터를 Update 하는 함수

def update_target(connection, table_name, data, pk_value):

    cursor = connection.cursor()

  

    try:

        set_clause = ', '.join([f"{key} = %s" for key in data.keys()])

        if table_name == 'binance_simple_earn_flexible':

            query = f"UPDATE {table_name} SET {set_clause} WHERE collect_time = %s AND asset = %s AND product_id = %s"

        elif table_name == 'user_exercise_daily':

            query = f"UPDATE {table_name} SET {set_clause} WHERE user_id = %s AND year = %s AND month = %s AND day = %s"

        elif table_name == 'user_exercise_monthly':

            query = f"UPDATE {table_name} SET {set_clause} WHERE user_id = %s AND year = %s AND month = %s"

        elif table_name == 'users':

            query = f"UPDATE {table_name} SET {set_clause} WHERE user_id = %s"

        cursor.execute(query, tuple(data.values()) + pk_value)

        connection.commit()

        logging.info(f"Updated data in {table_name}")

    except pymysql.MySQLError as e:

        logging.error(f"Error updating data in target: {e}")

        connection.rollback()

    finally:

        cursor.close()

  

# 메인 로직

def process_data():

    source_connection = connect_mysql('10.0.0.아이피', 'admin', '비밀번호!', 'steemit_postings')

    target_connection = connect_mysql('10.0.0.아이피', 'admin', '비밀번호!', 'steemit_postings')

  

    if not source_connection or not target_connection:

        logging.error("Failed to connect to MySQL.")

        return

  

    # CDC 대상 테이블 리스트

    target_tables = ['binance_simple_earn_flexible', 'user_exercise_daily', 'user_exercise_monthly', 'users']

  

    for table in target_tables:

        # 소스 MySQL에서 데이터 조회

        data = fetch_data_from_source(source_connection, table)

        if not data:

            logging.error(f"No data found in source table {table}")

            continue

        for row in data:

            # 테이블별 PK 처리

            if table == 'binance_simple_earn_flexible':

                pk_value = (row['collect_time'], row['asset'], row['product_id'])

            elif table == 'user_exercise_daily':

                pk_value = (row['user_id'], row['year'], row['month'], row['day'])

            elif table == 'user_exercise_monthly':

                pk_value = (row['user_id'], row['year'], row['month'])

            elif table == 'users':

                pk_value = (row['user_id'],)

  

            # 타겟 MySQL에서 해당 PK가 있는지 확인

            if check_pk_in_target(target_connection, table, pk_value):

                # 타겟에서 해당 데이터가 있으면, 기존 데이터와 비교하여 다르면 Update

                logging.info(f"PK found in target for {table}, checking data.")

                if row != fetch_data_from_source(target_connection, table):  # 데이터가 다르면 업데이트

                    update_target(target_connection, table, row, pk_value)

                else:

                    logging.info(f"No changes for PK {pk_value} in table {table}. Skipping update.")

            else:

                # PK가 없다면 Insert

                insert_into_target(target_connection, table, row)

  

    source_connection.close()

    target_connection.close()

  

if __name__ == '__main__':

    process_data()

소스는 위와 같습니다.

해당 소스를 이용해서 저는 UI 개발에 박차를 가하겠습니다.

감사합니다.



Posted through the ECblog app (https://blog.etain.club)

Authors get paid when people like you upvote their post.
If you enjoyed what you read here, create your account today and start earning FREE STEEM!
Sort Order:  

[광고] STEEM 개발자 커뮤니티에 참여 하시면, 다양한 혜택을 받을 수 있습니다.

Upvoted! Thank you for supporting witness @jswit.