trading-platform-database-v2/scripts/migrate_mysql_to_postgres.py
rckrdmrd 45e77e9a9c feat: Initial commit - Database schemas and scripts
DDL schemas for Trading Platform:
- User management
- Authentication
- Payments
- Education
- ML predictions
- Trading data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-18 04:30:23 -06:00

394 lines
12 KiB
Python

#!/usr/bin/env python3
"""
MySQL to PostgreSQL Migration Script
OrbiQuant IA Trading Platform
Migrates market data from MySQL (remote) to PostgreSQL (local).
Usage:
python migrate_mysql_to_postgres.py --full # Full migration
python migrate_mysql_to_postgres.py --incremental # Only new data
python migrate_mysql_to_postgres.py --ticker BTCUSD # Specific ticker
"""
import os
import sys
import argparse
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
import pandas as pd
import numpy as np
from tqdm import tqdm
# Database connections
import mysql.connector
import psycopg2
from psycopg2.extras import execute_values
# Configuration
# Connection configuration.  Credentials are taken from the environment when
# set, falling back to the historical development defaults so existing
# workflows keep working.
# NOTE(review): the fallback values below were previously hard-coded
# credentials committed to source control -- a serious security issue.
# Rotate these passwords and supply them via environment variables only.
MYSQL_CONFIG = {
    'host': os.environ.get('MYSQL_HOST', '72.60.226.4'),
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ.get('MYSQL_PASSWORD', 'AfcItz2391,.'),
    'database': os.environ.get('MYSQL_DATABASE', 'db_trading_meta'),
}
POSTGRES_CONFIG = {
    'host': os.environ.get('POSTGRES_HOST', 'localhost'),
    'port': int(os.environ.get('POSTGRES_PORT', '5432')),
    'user': os.environ.get('POSTGRES_USER', 'orbiquant_user'),
    'password': os.environ.get('POSTGRES_PASSWORD', 'orbiquant_dev_2025'),
    'database': os.environ.get('POSTGRES_DATABASE', 'orbiquant_trading'),
}
# Ticker mapping: MySQL symbol -> (pg_symbol, asset_type, base_ccy, quote_ccy).
# BTC (crypto) and spot gold (commodity) are special-cased; every other entry
# is a plain forex pair whose base/quote are the first/last three letters.
_FOREX_PAIRS = (
    'EURUSD', 'GBPUSD', 'USDJPY', 'USDCAD', 'AUDUSD', 'NZDUSD',
    'EURGBP', 'EURAUD', 'EURCHF', 'GBPJPY', 'GBPAUD', 'GBPCAD',
    'GBPNZD', 'AUDCAD', 'AUDCHF', 'AUDNZD',
)
TICKER_MAPPING = {'X:BTCUSD': ('BTCUSD', 'crypto', 'BTC', 'USD')}
TICKER_MAPPING.update(
    (f'C:{pair}', (pair, 'forex', pair[:3], pair[3:])) for pair in _FOREX_PAIRS
)
TICKER_MAPPING['C:XAUUSD'] = ('XAUUSD', 'commodity', 'XAU', 'USD')
# Logging setup
# Configured once at import time so every function in this script shares the
# same timestamped format; the module logs through its own named logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MySQLConnection:
    """Context manager that yields a live MySQL connection.

    Usage::

        with MySQLConnection(MYSQL_CONFIG) as conn:
            ...

    The connection is closed automatically when the block exits.
    """

    def __init__(self, config: Dict[str, Any]):
        # Only the parameters are stored here; nothing is opened until
        # __enter__ runs, so constructing the manager is side-effect free.
        self.config = config
        self.conn = None

    def __enter__(self):
        self.conn = mysql.connector.connect(**self.config)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close unconditionally; any in-flight exception keeps propagating.
        if self.conn is not None:
            self.conn.close()
class PostgresConnection:
    """Context manager that yields a live PostgreSQL connection.

    Usage::

        with PostgresConnection(POSTGRES_CONFIG) as conn:
            ...

    The connection is closed automatically when the block exits.
    """

    def __init__(self, config: Dict[str, Any]):
        # Parameters only; the actual connection is deferred to __enter__.
        self.config = config
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(**self.config)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close unconditionally; any in-flight exception keeps propagating.
        if self.conn is not None:
            self.conn.close()
def create_database_if_not_exists():
    """Create the target PostgreSQL database if it does not already exist.

    Connects to the maintenance database ``postgres`` with the configured
    credentials, checks ``pg_catalog.pg_database`` for the target name, and
    issues ``CREATE DATABASE`` only when it is missing.

    Raises:
        Exception: re-raised after logging if connecting or creating fails.
    """
    config = POSTGRES_CONFIG.copy()
    db_name = config.pop('database')
    conn = None
    try:
        conn = psycopg2.connect(**config, database='postgres')
        # CREATE DATABASE cannot run inside a transaction block.
        conn.autocommit = True
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s",
                (db_name,)
            )
            if cursor.fetchone():
                logger.info(f"Database {db_name} already exists")
            else:
                # Identifiers cannot be parameterized; double-quote the name
                # (and escape embedded quotes) instead of raw interpolation.
                quoted = '"' + db_name.replace('"', '""') + '"'
                cursor.execute(f'CREATE DATABASE {quoted}')
                logger.info(f"Created database: {db_name}")
        finally:
            cursor.close()
    except Exception as e:
        logger.error(f"Error creating database: {e}")
        raise
    finally:
        # Close the maintenance connection even on failure; the original
        # leaked it when an exception was raised before conn.close().
        if conn is not None:
            conn.close()
def run_migrations():
    """Run the market-data schema migration against PostgreSQL.

    Reads ``../migrations/001_create_market_data_schema.sql`` (relative to
    this script) and executes it in a single transaction.  Failures are
    logged as warnings rather than raised, because the schema objects may
    already exist from a previous run.
    """
    migration_file = os.path.join(
        os.path.dirname(__file__),
        '..', 'migrations', '001_create_market_data_schema.sql'
    )
    # Read the SQL before opening a DB connection so a missing or unreadable
    # file fails fast without touching the database.
    with open(migration_file, 'r', encoding='utf-8') as f:
        sql = f.read()
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            conn.commit()
            logger.info("Migrations completed successfully")
        except Exception as e:
            conn.rollback()
            logger.warning(f"Migration error (may already exist): {e}")
        finally:
            cursor.close()
def insert_tickers():
    """Insert the ticker master data (idempotent).

    Performs one batched INSERT with ``ON CONFLICT (symbol) DO NOTHING`` so
    re-runs are safe.  The original per-row try/except was broken on
    PostgreSQL: after the first failed statement the whole transaction is
    aborted and every later INSERT raises, so catching per row accomplished
    nothing.  A single batch with rollback-on-error replaces it.
    """
    rows = [
        (symbol, f"{base}/{quote}", asset_type, base, quote)
        for symbol, asset_type, base, quote in TICKER_MAPPING.values()
    ]
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        try:
            execute_values(cursor, """
                INSERT INTO market_data.tickers
                (symbol, name, asset_type, base_currency, quote_currency)
                VALUES %s
                ON CONFLICT (symbol) DO NOTHING
            """, rows)
            conn.commit()
        except Exception as e:
            conn.rollback()
            logger.warning(f"Error inserting tickers: {e}")
        finally:
            cursor.close()
    logger.info(f"Inserted {len(TICKER_MAPPING)} tickers")
def get_ticker_id_map() -> Dict[str, int]:
    """Return a mapping of ticker symbol -> primary-key id from PostgreSQL."""
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT symbol, id FROM market_data.tickers")
        # Each row is a (symbol, id) pair, so dict() builds the map directly.
        symbol_to_id = dict(cursor.fetchall())
        cursor.close()
        return symbol_to_id
def get_last_timestamp(ticker_id: int) -> Optional[datetime]:
    """Return the newest OHLCV timestamp stored for *ticker_id*, or None."""
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("""
        SELECT MAX(timestamp) FROM market_data.ohlcv_5m
        WHERE ticker_id = %s
        """, (ticker_id,))
        # MAX(...) always yields exactly one row; its single column is NULL
        # (-> None) when the ticker has no rows yet.
        (latest,) = cursor.fetchone()
        cursor.close()
        return latest
def migrate_ohlcv_data(
    mysql_ticker: str,
    pg_ticker_id: int,
    start_date: Optional[datetime] = None,
    batch_size: int = 50000
):
    """Migrate 5-minute OHLCV rows for one ticker from MySQL to PostgreSQL.

    Streams rows from MySQL and bulk-inserts them into
    ``market_data.ohlcv_5m`` in batches, committing after each batch.

    Args:
        mysql_ticker: Source symbol as stored in MySQL (e.g. 'C:EURUSD').
        pg_ticker_id: Target ticker_id in market_data.tickers.
        start_date: If given, only rows strictly newer than this timestamp
            are copied (incremental mode).
        batch_size: Rows per bulk INSERT / commit.

    Returns:
        Number of rows migrated.
    """
    # Parameterized query -- the original interpolated the ticker and date
    # directly into the SQL string (injection-prone and quoting-fragile).
    query = """
    SELECT
    date_agg,
    open,
    high,
    low,
    close,
    volume,
    vwap,
    ts
    FROM tickers_agg_data
    WHERE ticker = %s
    """
    params = [mysql_ticker]
    if start_date:
        query += " AND date_agg > %s"
        params.append(start_date.strftime('%Y-%m-%d %H:%M:%S'))
    query += " ORDER BY date_agg"

    total_rows = 0
    batch = []
    with MySQLConnection(MYSQL_CONFIG) as mysql_conn:
        mysql_cursor = mysql_conn.cursor()
        mysql_cursor.execute(query, tuple(params))
        with PostgresConnection(POSTGRES_CONFIG) as pg_conn:
            pg_cursor = pg_conn.cursor()
            for row in tqdm(mysql_cursor, desc=f"Migrating {mysql_ticker}"):
                date_agg, open_p, high, low, close, volume, vwap, ts = row
                batch.append((
                    pg_ticker_id,
                    date_agg,
                    float(open_p),
                    float(high),
                    float(low),
                    float(close),
                    float(volume),
                    # 'is not None' so a legitimate 0 value is kept instead of
                    # being silently turned into NULL (the original used
                    # truthiness, which dropped zeros).
                    float(vwap) if vwap is not None else None,
                    int(ts) if ts is not None else None,
                ))
                if len(batch) >= batch_size:
                    insert_batch(pg_cursor, batch)
                    pg_conn.commit()
                    total_rows += len(batch)
                    batch = []
            # Flush the final partial batch.
            if batch:
                insert_batch(pg_cursor, batch)
                pg_conn.commit()
                total_rows += len(batch)
            pg_cursor.close()
        mysql_cursor.close()
    logger.info(f"Migrated {total_rows} rows for {mysql_ticker}")
    return total_rows
def insert_batch(cursor, batch: List[tuple]):
    """Insert batch of OHLCV data.

    Uses psycopg2's execute_values to send the whole batch as one multi-row
    INSERT; rows whose (ticker_id, timestamp) pair already exists are skipped
    via ON CONFLICT, so re-running a migration is safe.

    Args:
        cursor: An open psycopg2 cursor (transaction is committed by the
            caller, not here).
        batch: Tuples of (ticker_id, timestamp, open, high, low, close,
            volume, vwap, ts_epoch) matching the column list below.
    """
    query = """
    INSERT INTO market_data.ohlcv_5m
    (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
    VALUES %s
    ON CONFLICT (ticker_id, timestamp) DO NOTHING
    """
    execute_values(cursor, query, batch)
def migrate_all(incremental: bool = False, tickers: Optional[List[str]] = None):
    """Migrate all data from MySQL to PostgreSQL.

    Args:
        incremental: When True, only copy rows newer than the latest
            timestamp already stored per ticker.
        tickers: Optional whitelist of PostgreSQL symbols; all tickers are
            migrated when omitted.
    """
    logger.info("Starting migration...")

    # Bootstrap: database, schema, and ticker master rows.
    create_database_if_not_exists()
    run_migrations()
    insert_tickers()

    # Resolve symbol -> id once, then migrate each mapped ticker.
    ticker_id_map = get_ticker_id_map()
    logger.info(f"Ticker ID map: {ticker_id_map}")

    total_migrated = 0
    for mysql_symbol, (pg_symbol, _, _, _) in TICKER_MAPPING.items():
        if tickers and pg_symbol not in tickers:
            continue
        ticker_id = ticker_id_map.get(pg_symbol)
        if ticker_id is None:
            logger.warning(f"Ticker {pg_symbol} not found in PostgreSQL")
            continue
        start_date = None
        if incremental:
            start_date = get_last_timestamp(ticker_id)
            if start_date:
                logger.info(f"Incremental from {start_date} for {pg_symbol}")
        total_migrated += migrate_ohlcv_data(mysql_symbol, ticker_id, start_date)

    logger.info(f"Migration complete. Total rows: {total_migrated}")
def verify_migration():
    """Verify the migration by comparing per-ticker row counts in both DBs.

    Logs one line per ticker prefixed with OK when the MySQL and PostgreSQL
    counts match, or MISMATCH when they differ.
    """
    logger.info("Verifying migration...")
    with MySQLConnection(MYSQL_CONFIG) as mysql_conn:
        mysql_cursor = mysql_conn.cursor()
        with PostgresConnection(POSTGRES_CONFIG) as pg_conn:
            pg_cursor = pg_conn.cursor()
            for mysql_symbol, (pg_symbol, _, _, _) in TICKER_MAPPING.items():
                # MySQL count -- parameterized; the original interpolated the
                # symbol straight into the SQL string.
                mysql_cursor.execute(
                    "SELECT COUNT(*) FROM tickers_agg_data WHERE ticker = %s",
                    (mysql_symbol,)
                )
                mysql_count = mysql_cursor.fetchone()[0]
                # PostgreSQL count
                pg_cursor.execute("""
                SELECT COUNT(*) FROM market_data.ohlcv_5m o
                JOIN market_data.tickers t ON t.id = o.ticker_id
                WHERE t.symbol = %s
                """, (pg_symbol,))
                pg_count = pg_cursor.fetchone()[0]
                # The original produced an empty string for both branches
                # (status markers evidently lost in transit); use explicit
                # text so the log line is meaningful.
                status = "OK" if mysql_count == pg_count else "MISMATCH"
                logger.info(f"{status} {pg_symbol}: MySQL={mysql_count}, PostgreSQL={pg_count}")
            pg_cursor.close()
        mysql_cursor.close()
def main():
    """Command-line entry point for the migration tool."""
    parser = argparse.ArgumentParser(description='MySQL to PostgreSQL Migration')
    parser.add_argument('--full', action='store_true', help='Full migration')
    parser.add_argument('--incremental', action='store_true', help='Incremental migration')
    parser.add_argument('--ticker', type=str, help='Specific ticker to migrate')
    parser.add_argument('--verify', action='store_true', help='Verify migration')
    parser.add_argument('--schema-only', action='store_true', help='Only create schema')
    args = parser.parse_args()

    # Mutually exclusive modes: schema bootstrap, verification, or migration.
    if args.schema_only:
        create_database_if_not_exists()
        run_migrations()
        insert_tickers()
        logger.info("Schema created successfully")
    elif args.verify:
        verify_migration()
    else:
        migrate_all(
            incremental=args.incremental,
            tickers=[args.ticker] if args.ticker else None,
        )


if __name__ == '__main__':
    main()