#!/usr/bin/env python3
"""
MySQL to PostgreSQL Migration Script
OrbiQuant IA Trading Platform

Migrates market data from MySQL (remote) to PostgreSQL (local).

Usage:
    python migrate_mysql_to_postgres.py --full          # Full migration
    python migrate_mysql_to_postgres.py --incremental   # Only new data
    python migrate_mysql_to_postgres.py --ticker BTCUSD # Specific ticker
"""

import os
import sys
import argparse
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import pandas as pd
import numpy as np
from tqdm import tqdm

# Database connections
import mysql.connector
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values

# Configuration.
# NOTE(review): credentials were previously hard-coded in source. They are now
# read from the environment when set; the original literals remain as defaults
# for backward compatibility, but should be rotated and removed from VCS.
MYSQL_CONFIG = {
    'host': os.environ.get('MYSQL_HOST', '72.60.226.4'),
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ.get('MYSQL_PASSWORD', 'AfcItz2391,.'),
    'database': os.environ.get('MYSQL_DATABASE', 'db_trading_meta'),
}

POSTGRES_CONFIG = {
    'host': os.environ.get('PG_HOST', 'localhost'),
    'port': int(os.environ.get('PG_PORT', '5432')),
    'user': os.environ.get('PG_USER', 'orbiquant_user'),
    'password': os.environ.get('PG_PASSWORD', 'orbiquant_dev_2025'),
    'database': os.environ.get('PG_DATABASE', 'orbiquant_trading'),
}

# Ticker mapping (MySQL symbol -> (symbol, asset_type, base, quote))
TICKER_MAPPING = {
    'X:BTCUSD': ('BTCUSD', 'crypto', 'BTC', 'USD'),
    'C:EURUSD': ('EURUSD', 'forex', 'EUR', 'USD'),
    'C:GBPUSD': ('GBPUSD', 'forex', 'GBP', 'USD'),
    'C:USDJPY': ('USDJPY', 'forex', 'USD', 'JPY'),
    'C:USDCAD': ('USDCAD', 'forex', 'USD', 'CAD'),
    'C:AUDUSD': ('AUDUSD', 'forex', 'AUD', 'USD'),
    'C:NZDUSD': ('NZDUSD', 'forex', 'NZD', 'USD'),
    'C:EURGBP': ('EURGBP', 'forex', 'EUR', 'GBP'),
    'C:EURAUD': ('EURAUD', 'forex', 'EUR', 'AUD'),
    'C:EURCHF': ('EURCHF', 'forex', 'EUR', 'CHF'),
    'C:GBPJPY': ('GBPJPY', 'forex', 'GBP', 'JPY'),
    'C:GBPAUD': ('GBPAUD', 'forex', 'GBP', 'AUD'),
    'C:GBPCAD': ('GBPCAD', 'forex', 'GBP', 'CAD'),
    'C:GBPNZD': ('GBPNZD', 'forex', 'GBP', 'NZD'),
    'C:AUDCAD': ('AUDCAD', 'forex', 'AUD', 'CAD'),
    'C:AUDCHF': ('AUDCHF', 'forex', 'AUD', 'CHF'),
    'C:AUDNZD': ('AUDNZD', 'forex', 'AUD', 'NZD'),
    'C:XAUUSD': ('XAUUSD', 'commodity', 'XAU', 'USD'),
}

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MySQLConnection:
    """Context manager that opens a MySQL connection and closes it on exit."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.conn = None

    def __enter__(self):
        self.conn = mysql.connector.connect(**self.config)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close unconditionally; exceptions (if any) propagate to the caller.
        if self.conn:
            self.conn.close()


class PostgresConnection:
    """Context manager that opens a PostgreSQL connection and closes it on exit."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(**self.config)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            self.conn.close()


def create_database_if_not_exists():
    """Create the target PostgreSQL database if it doesn't exist.

    Connects to the maintenance database ``postgres`` (CREATE DATABASE
    cannot run inside a transaction against the target DB).
    """
    config = POSTGRES_CONFIG.copy()
    db_name = config.pop('database')

    try:
        conn = psycopg2.connect(**config, database='postgres')
        conn.autocommit = True  # CREATE DATABASE must run outside a transaction
        cursor = conn.cursor()

        # Check if database exists
        cursor.execute(
            "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s",
            (db_name,)
        )

        if not cursor.fetchone():
            # Identifiers cannot be bound as parameters; quote safely via
            # psycopg2.sql instead of f-string interpolation.
            cursor.execute(
                sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))
            )
            logger.info("Created database: %s", db_name)
        else:
            logger.info("Database %s already exists", db_name)

        cursor.close()
        conn.close()
    except Exception as e:
        logger.error("Error creating database: %s", e)
        raise


def run_migrations():
    """Run the schema-creation SQL migration against PostgreSQL.

    Errors are logged as warnings (the schema may already exist) and the
    transaction is rolled back, so re-running is safe.
    """
    migration_file = os.path.join(
        os.path.dirname(__file__),
        '..',
        'migrations',
        '001_create_market_data_schema.sql'
    )

    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()

        # Read and execute migration
        with open(migration_file, 'r') as f:
            sql_text = f.read()

        try:
            cursor.execute(sql_text)
            conn.commit()
            logger.info("Migrations completed successfully")
        except Exception as e:
            conn.rollback()
            logger.warning("Migration error (may already exist): %s", e)

        cursor.close()


def insert_tickers():
    """Insert ticker master data (idempotent via ON CONFLICT DO NOTHING)."""
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()

        for mysql_symbol, (symbol, asset_type, base, quote) in TICKER_MAPPING.items():
            try:
                cursor.execute("""
                    INSERT INTO market_data.tickers
                        (symbol, name, asset_type, base_currency, quote_currency)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (symbol) DO NOTHING
                """, (symbol, f"{base}/{quote}", asset_type, base, quote))
            except Exception as e:
                logger.warning("Error inserting ticker %s: %s", symbol, e)

        conn.commit()
        cursor.close()

    logger.info("Inserted %d tickers", len(TICKER_MAPPING))


def get_ticker_id_map() -> Dict[str, int]:
    """Return a mapping of ticker symbol -> ticker_id from PostgreSQL."""
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT symbol, id FROM market_data.tickers")
        result = {row[0]: row[1] for row in cursor.fetchall()}
        cursor.close()
        return result


def get_last_timestamp(ticker_id: int) -> Optional[datetime]:
    """Return the latest stored OHLCV timestamp for a ticker, or None if empty."""
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT MAX(timestamp) FROM market_data.ohlcv_5m
            WHERE ticker_id = %s
        """, (ticker_id,))
        result = cursor.fetchone()[0]
        cursor.close()
        return result


def migrate_ohlcv_data(
    mysql_ticker: str,
    pg_ticker_id: int,
    start_date: Optional[datetime] = None,
    batch_size: int = 50000
):
    """Migrate OHLCV rows for one ticker from MySQL into PostgreSQL.

    Args:
        mysql_ticker: Ticker symbol as stored in MySQL (e.g. 'X:BTCUSD').
        pg_ticker_id: Corresponding ticker_id in PostgreSQL.
        start_date: If given, migrate only rows strictly after this timestamp
            (incremental mode).
        batch_size: Number of rows per PostgreSQL batch insert/commit.

    Returns:
        The number of rows migrated.
    """
    # Parameterized query — never interpolate values into SQL text.
    query = """
        SELECT date_agg, open, high, low, close, volume, vwap, ts
        FROM tickers_agg_data
        WHERE ticker = %s
    """
    params: List[Any] = [mysql_ticker]

    if start_date:
        query += " AND date_agg > %s"
        params.append(start_date)

    query += " ORDER BY date_agg"

    with MySQLConnection(MYSQL_CONFIG) as mysql_conn:
        mysql_cursor = mysql_conn.cursor()
        mysql_cursor.execute(query, tuple(params))

        total_rows = 0
        batch = []

        with PostgresConnection(POSTGRES_CONFIG) as pg_conn:
            pg_cursor = pg_conn.cursor()

            for row in tqdm(mysql_cursor, desc=f"Migrating {mysql_ticker}"):
                date_agg, open_p, high, low, close, volume, vwap, ts = row
                batch.append((
                    pg_ticker_id,
                    date_agg,
                    float(open_p),
                    float(high),
                    float(low),
                    float(close),
                    float(volume),
                    # Explicit None checks: 0 / 0.0 are valid values and must
                    # not be coerced to NULL (truthiness bug in the original).
                    float(vwap) if vwap is not None else None,
                    int(ts) if ts is not None else None,
                ))

                if len(batch) >= batch_size:
                    insert_batch(pg_cursor, batch)
                    pg_conn.commit()
                    total_rows += len(batch)
                    batch = []

            # Insert remaining partial batch
            if batch:
                insert_batch(pg_cursor, batch)
                pg_conn.commit()
                total_rows += len(batch)

            pg_cursor.close()

        mysql_cursor.close()

    logger.info("Migrated %d rows for %s", total_rows, mysql_ticker)
    return total_rows


def insert_batch(cursor, batch: List[tuple]):
    """Bulk-insert a batch of OHLCV tuples; duplicates are skipped."""
    query = """
        INSERT INTO market_data.ohlcv_5m
            (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
        VALUES %s
        ON CONFLICT (ticker_id, timestamp) DO NOTHING
    """
    execute_values(cursor, query, batch)


def migrate_all(incremental: bool = False, tickers: Optional[List[str]] = None):
    """Migrate all (or selected) tickers from MySQL to PostgreSQL.

    Args:
        incremental: If True, only rows newer than each ticker's last stored
            timestamp are migrated.
        tickers: Optional list of PostgreSQL symbols to restrict migration to.
    """
    logger.info("Starting migration...")

    # Step 1: Create database and run migrations
    create_database_if_not_exists()
    run_migrations()

    # Step 2: Insert tickers
    insert_tickers()

    # Step 3: Get ticker ID mapping
    ticker_id_map = get_ticker_id_map()
    logger.info("Ticker ID map: %s", ticker_id_map)

    # Step 4: Migrate OHLCV data
    total_migrated = 0

    for mysql_symbol, (pg_symbol, _, _, _) in TICKER_MAPPING.items():
        if tickers and pg_symbol not in tickers:
            continue

        if pg_symbol not in ticker_id_map:
            logger.warning("Ticker %s not found in PostgreSQL", pg_symbol)
            continue

        ticker_id = ticker_id_map[pg_symbol]

        # Check for incremental
        start_date = None
        if incremental:
            start_date = get_last_timestamp(ticker_id)
            if start_date:
                logger.info("Incremental from %s for %s", start_date, pg_symbol)

        rows = migrate_ohlcv_data(mysql_symbol, ticker_id, start_date)
        total_migrated += rows

    logger.info("Migration complete. Total rows: %d", total_migrated)


def verify_migration():
    """Verify migration by comparing per-ticker row counts in both databases."""
    logger.info("Verifying migration...")

    with MySQLConnection(MYSQL_CONFIG) as mysql_conn:
        mysql_cursor = mysql_conn.cursor()

        with PostgresConnection(POSTGRES_CONFIG) as pg_conn:
            pg_cursor = pg_conn.cursor()

            for mysql_symbol, (pg_symbol, _, _, _) in TICKER_MAPPING.items():
                # MySQL count (parameterized, not f-string interpolation)
                mysql_cursor.execute(
                    "SELECT COUNT(*) FROM tickers_agg_data WHERE ticker = %s",
                    (mysql_symbol,)
                )
                mysql_count = mysql_cursor.fetchone()[0]

                # PostgreSQL count
                pg_cursor.execute("""
                    SELECT COUNT(*)
                    FROM market_data.ohlcv_5m o
                    JOIN market_data.tickers t ON t.id = o.ticker_id
                    WHERE t.symbol = %s
                """, (pg_symbol,))
                pg_count = pg_cursor.fetchone()[0]

                status = "✅" if mysql_count == pg_count else "❌"
                logger.info(
                    "%s %s: MySQL=%s, PostgreSQL=%s",
                    status, pg_symbol, mysql_count, pg_count
                )

            pg_cursor.close()

        mysql_cursor.close()


def main():
    """Parse CLI arguments and dispatch to the requested migration mode."""
    parser = argparse.ArgumentParser(description='MySQL to PostgreSQL Migration')
    parser.add_argument('--full', action='store_true', help='Full migration')
    parser.add_argument('--incremental', action='store_true',
                        help='Incremental migration')
    parser.add_argument('--ticker', type=str, help='Specific ticker to migrate')
    parser.add_argument('--verify', action='store_true', help='Verify migration')
    parser.add_argument('--schema-only', action='store_true',
                        help='Only create schema')

    args = parser.parse_args()

    if args.schema_only:
        create_database_if_not_exists()
        run_migrations()
        insert_tickers()
        logger.info("Schema created successfully")
        return

    if args.verify:
        verify_migration()
        return

    tickers = [args.ticker] if args.ticker else None
    incremental = args.incremental

    migrate_all(incremental=incremental, tickers=tickers)


if __name__ == '__main__':
    main()