# migrate_mysql_to_postgres.py — MySQL → PostgreSQL market-data migration script.
#!/usr/bin/env python3
"""
MySQL to PostgreSQL Migration Script
OrbiQuant IA Trading Platform

Migrates market data from MySQL (remote) to PostgreSQL (local).

Usage:
    python migrate_mysql_to_postgres.py --full          # Full migration
    python migrate_mysql_to_postgres.py --incremental   # Only new data
    python migrate_mysql_to_postgres.py --ticker BTCUSD # Specific ticker
"""
import argparse
import logging
import os
import sys
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

import numpy as np
import pandas as pd
from tqdm import tqdm

# Database connections
import mysql.connector
import psycopg2
from psycopg2.extras import execute_values
# Configuration
# NOTE(review): credentials were previously hard-coded here. They can now be
# overridden via environment variables; the hard-coded fallbacks are kept only
# for backward compatibility and should be removed once deployments set the
# corresponding env vars.
MYSQL_CONFIG = {
    'host': os.getenv('MYSQL_HOST', '72.60.226.4'),
    'port': int(os.getenv('MYSQL_PORT', '3306')),
    'user': os.getenv('MYSQL_USER', 'root'),
    'password': os.getenv('MYSQL_PASSWORD', 'AfcItz2391,.'),
    'database': os.getenv('MYSQL_DATABASE', 'db_trading_meta')
}

POSTGRES_CONFIG = {
    'host': os.getenv('POSTGRES_HOST', 'localhost'),
    'port': int(os.getenv('POSTGRES_PORT', '5432')),
    'user': os.getenv('POSTGRES_USER', 'orbiquant_user'),
    'password': os.getenv('POSTGRES_PASSWORD', 'orbiquant_dev_2025'),
    'database': os.getenv('POSTGRES_DATABASE', 'orbiquant_trading')
}
|
|
|
|
# Ticker mapping (MySQL symbol -> asset_type)
# Key:   ticker symbol exactly as stored in MySQL's tickers_agg_data table
#        ('X:' / 'C:' prefixes presumably follow the upstream data feed's
#        convention — confirm against the feed documentation).
# Value: (postgres_symbol, asset_type, base_currency, quote_currency)
TICKER_MAPPING = {
    'X:BTCUSD': ('BTCUSD', 'crypto', 'BTC', 'USD'),
    'C:EURUSD': ('EURUSD', 'forex', 'EUR', 'USD'),
    'C:GBPUSD': ('GBPUSD', 'forex', 'GBP', 'USD'),
    'C:USDJPY': ('USDJPY', 'forex', 'USD', 'JPY'),
    'C:USDCAD': ('USDCAD', 'forex', 'USD', 'CAD'),
    'C:AUDUSD': ('AUDUSD', 'forex', 'AUD', 'USD'),
    'C:NZDUSD': ('NZDUSD', 'forex', 'NZD', 'USD'),
    'C:EURGBP': ('EURGBP', 'forex', 'EUR', 'GBP'),
    'C:EURAUD': ('EURAUD', 'forex', 'EUR', 'AUD'),
    'C:EURCHF': ('EURCHF', 'forex', 'EUR', 'CHF'),
    'C:GBPJPY': ('GBPJPY', 'forex', 'GBP', 'JPY'),
    'C:GBPAUD': ('GBPAUD', 'forex', 'GBP', 'AUD'),
    'C:GBPCAD': ('GBPCAD', 'forex', 'GBP', 'CAD'),
    'C:GBPNZD': ('GBPNZD', 'forex', 'GBP', 'NZD'),
    'C:AUDCAD': ('AUDCAD', 'forex', 'AUD', 'CAD'),
    'C:AUDCHF': ('AUDCHF', 'forex', 'AUD', 'CHF'),
    'C:AUDNZD': ('AUDNZD', 'forex', 'AUD', 'NZD'),
    'C:XAUUSD': ('XAUUSD', 'commodity', 'XAU', 'USD'),
}
|
|
|
|
# Logging setup — module-level logger; all functions in this script log
# through it rather than printing.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MySQLConnection:
    """Context manager that opens a MySQL connection on entry and closes it on exit.

    Usage:
        with MySQLConnection(MYSQL_CONFIG) as conn:
            ...
    """

    def __init__(self, config: Dict[str, Any]):
        # Keyword arguments forwarded verbatim to mysql.connector.connect().
        self.config = config
        self.conn = None

    def __enter__(self):
        connection = mysql.connector.connect(**self.config)
        self.conn = connection
        return connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the connection if one was opened. Nothing is returned,
        # so any in-flight exception propagates to the caller.
        if self.conn:
            self.conn.close()
|
|
|
|
|
|
class PostgresConnection:
    """Context manager that opens a PostgreSQL connection on entry and closes it on exit.

    Usage:
        with PostgresConnection(POSTGRES_CONFIG) as conn:
            ...
    """

    def __init__(self, config: Dict[str, Any]):
        # Keyword arguments forwarded verbatim to psycopg2.connect().
        self.config = config
        self.conn = None

    def __enter__(self):
        connection = psycopg2.connect(**self.config)
        self.conn = connection
        return connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the connection if one was opened. Nothing is returned,
        # so any in-flight exception propagates to the caller.
        if self.conn:
            self.conn.close()
|
|
|
|
|
|
def create_database_if_not_exists():
    """Create the PostgreSQL database from POSTGRES_CONFIG if it doesn't exist.

    Connects to the maintenance database 'postgres' (CREATE DATABASE cannot
    run inside a transaction targeting the database being created).

    Raises:
        Exception: re-raised after logging if connection or creation fails.
    """
    config = POSTGRES_CONFIG.copy()
    db_name = config.pop('database')

    try:
        conn = psycopg2.connect(**config, database='postgres')
        try:
            # CREATE DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            cursor = conn.cursor()
            try:
                # Check if database exists
                cursor.execute(
                    "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s",
                    (db_name,)
                )
                if not cursor.fetchone():
                    # CREATE DATABASE does not accept bound parameters, so the
                    # identifier is double-quoted to keep it a single identifier.
                    cursor.execute(f'CREATE DATABASE "{db_name}"')
                    logger.info(f"Created database: {db_name}")
                else:
                    logger.info(f"Database {db_name} already exists")
            finally:
                # Always release resources, even when a query fails.
                cursor.close()
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"Error creating database: {e}")
        raise
|
|
|
|
|
|
def run_migrations():
    """Run the market-data schema SQL migration against PostgreSQL.

    Reads ../migrations/001_create_market_data_schema.sql relative to this
    file and executes it in a single transaction. Errors are logged as
    warnings (the schema may already exist) rather than raised.
    """
    migration_file = os.path.join(
        os.path.dirname(__file__),
        '..', 'migrations', '001_create_market_data_schema.sql'
    )

    # Read the migration before opening a connection — no point holding a
    # connection during file I/O, and a missing file fails fast.
    with open(migration_file, 'r') as f:
        sql = f.read()

    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            conn.commit()
            logger.info("Migrations completed successfully")
        except Exception as e:
            conn.rollback()
            logger.warning(f"Migration error (may already exist): {e}")
        finally:
            # Close the cursor on both success and failure paths.
            cursor.close()
|
|
|
|
|
|
def insert_tickers():
    """Insert ticker master data into market_data.tickers.

    Each insert runs inside a savepoint: in PostgreSQL, any error marks the
    whole transaction as aborted until it is rolled back, so without the
    savepoint one failing row would make every subsequent insert fail too.
    """
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()

        for mysql_symbol, (symbol, asset_type, base, quote) in TICKER_MAPPING.items():
            try:
                cursor.execute("SAVEPOINT ticker_insert")
                cursor.execute("""
                    INSERT INTO market_data.tickers
                    (symbol, name, asset_type, base_currency, quote_currency)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (symbol) DO NOTHING
                """, (symbol, f"{base}/{quote}", asset_type, base, quote))
                cursor.execute("RELEASE SAVEPOINT ticker_insert")
            except Exception as e:
                # Roll back only this row so the rest of the batch proceeds.
                cursor.execute("ROLLBACK TO SAVEPOINT ticker_insert")
                logger.warning(f"Error inserting ticker {symbol}: {e}")

        conn.commit()
        cursor.close()
        logger.info(f"Inserted {len(TICKER_MAPPING)} tickers")
|
|
|
|
|
|
def get_ticker_id_map() -> Dict[str, int]:
    """Return a mapping of ticker symbol -> ticker_id from PostgreSQL."""
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT symbol, id FROM market_data.tickers")
        # Each row is a (symbol, id) pair, so dict() builds the map directly.
        mapping = dict(cursor.fetchall())
        cursor.close()
        return mapping
|
|
|
|
|
|
def get_last_timestamp(ticker_id: int) -> Optional[datetime]:
    """Return the newest OHLCV timestamp stored for this ticker, or None.

    None means the ticker has no rows yet (MAX over an empty set is NULL).
    """
    with PostgresConnection(POSTGRES_CONFIG) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT MAX(timestamp) FROM market_data.ohlcv_5m
            WHERE ticker_id = %s
        """, (ticker_id,))
        # Single-row, single-column result.
        (latest,) = cursor.fetchone()
        cursor.close()
        return latest
|
|
|
|
|
|
def migrate_ohlcv_data(
    mysql_ticker: str,
    pg_ticker_id: int,
    start_date: Optional[datetime] = None,
    batch_size: int = 50000
):
    """Migrate OHLCV data for a specific ticker.

    Streams rows from MySQL and inserts them into PostgreSQL in batches,
    committing after each batch.

    Args:
        mysql_ticker: Symbol as stored in MySQL (e.g. 'C:EURUSD').
        pg_ticker_id: Target ticker id in market_data.tickers.
        start_date: If given, only rows with date_agg strictly after this
            timestamp are copied (incremental mode).
        batch_size: Rows per batch insert/commit.

    Returns:
        Total number of rows migrated.
    """
    # Build a parameterized query — values are never interpolated into the
    # SQL string (the old f-string form was injection-prone).
    query = (
        "SELECT date_agg, open, high, low, close, volume, vwap, ts "
        "FROM tickers_agg_data "
        "WHERE ticker = %s"
    )
    params = [mysql_ticker]

    if start_date:
        query += " AND date_agg > %s"
        params.append(start_date)

    query += " ORDER BY date_agg"

    with MySQLConnection(MYSQL_CONFIG) as mysql_conn:
        mysql_cursor = mysql_conn.cursor()
        mysql_cursor.execute(query, tuple(params))

        total_rows = 0
        batch = []

        with PostgresConnection(POSTGRES_CONFIG) as pg_conn:
            pg_cursor = pg_conn.cursor()

            for row in tqdm(mysql_cursor, desc=f"Migrating {mysql_ticker}"):
                date_agg, open_p, high, low, close, volume, vwap, ts = row

                batch.append((
                    pg_ticker_id,
                    date_agg,
                    float(open_p),
                    float(high),
                    float(low),
                    float(close),
                    float(volume),
                    # 'is not None' (not truthiness): a legitimate 0 value
                    # must not be silently converted to NULL — the previous
                    # 'if vwap' / 'if ts' checks dropped vwap == 0 and ts == 0.
                    float(vwap) if vwap is not None else None,
                    int(ts) if ts is not None else None
                ))

                if len(batch) >= batch_size:
                    insert_batch(pg_cursor, batch)
                    pg_conn.commit()
                    total_rows += len(batch)
                    batch = []

            # Insert remaining partial batch
            if batch:
                insert_batch(pg_cursor, batch)
                pg_conn.commit()
                total_rows += len(batch)

            pg_cursor.close()

        mysql_cursor.close()

    logger.info(f"Migrated {total_rows} rows for {mysql_ticker}")
    return total_rows
|
|
|
|
|
|
def insert_batch(cursor, batch: List[tuple]):
    """Insert a batch of OHLCV rows into market_data.ohlcv_5m.

    Uses psycopg2's execute_values for one multi-row INSERT; rows whose
    (ticker_id, timestamp) pair already exists are skipped via ON CONFLICT.

    Args:
        cursor: An open psycopg2 cursor (commit is the caller's job).
        batch: Tuples of (ticker_id, timestamp, open, high, low, close,
            volume, vwap, ts_epoch) matching the column list below.
    """
    query = """
        INSERT INTO market_data.ohlcv_5m
        (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
        VALUES %s
        ON CONFLICT (ticker_id, timestamp) DO NOTHING
    """
    execute_values(cursor, query, batch)
|
|
|
|
|
|
def migrate_all(incremental: bool = False, tickers: Optional[List[str]] = None):
    """Migrate all data from MySQL to PostgreSQL.

    Args:
        incremental: When True, resume each ticker from its last stored
            timestamp instead of copying everything.
        tickers: Optional whitelist of PostgreSQL symbols to migrate;
            None migrates every ticker in TICKER_MAPPING.
    """
    logger.info("Starting migration...")

    # Schema setup: database, migrations, ticker master data.
    create_database_if_not_exists()
    run_migrations()
    insert_tickers()

    # Resolve symbol -> ticker_id once up front.
    ticker_id_map = get_ticker_id_map()
    logger.info(f"Ticker ID map: {ticker_id_map}")

    total_migrated = 0

    for mysql_symbol, (pg_symbol, _, _, _) in TICKER_MAPPING.items():
        # Skip tickers outside the requested subset.
        if tickers and pg_symbol not in tickers:
            continue

        ticker_id = ticker_id_map.get(pg_symbol)
        if ticker_id is None:
            logger.warning(f"Ticker {pg_symbol} not found in PostgreSQL")
            continue

        # In incremental mode, resume after the newest stored timestamp.
        start_date = get_last_timestamp(ticker_id) if incremental else None
        if start_date:
            logger.info(f"Incremental from {start_date} for {pg_symbol}")

        total_migrated += migrate_ohlcv_data(mysql_symbol, ticker_id, start_date)

    logger.info(f"Migration complete. Total rows: {total_migrated}")
|
|
|
|
|
|
def verify_migration():
    """Verify the migration by comparing per-ticker row counts.

    Logs one line per ticker with MySQL vs PostgreSQL counts; ✅ when they
    match, ❌ otherwise. Purely informational — nothing is raised.
    """
    logger.info("Verifying migration...")

    with MySQLConnection(MYSQL_CONFIG) as mysql_conn:
        mysql_cursor = mysql_conn.cursor()

        with PostgresConnection(POSTGRES_CONFIG) as pg_conn:
            pg_cursor = pg_conn.cursor()

            for mysql_symbol, (pg_symbol, _, _, _) in TICKER_MAPPING.items():
                # MySQL count — parameterized, not f-string interpolated.
                mysql_cursor.execute(
                    "SELECT COUNT(*) FROM tickers_agg_data WHERE ticker = %s",
                    (mysql_symbol,)
                )
                mysql_count = mysql_cursor.fetchone()[0]

                # PostgreSQL count
                pg_cursor.execute("""
                    SELECT COUNT(*) FROM market_data.ohlcv_5m o
                    JOIN market_data.tickers t ON t.id = o.ticker_id
                    WHERE t.symbol = %s
                """, (pg_symbol,))
                pg_count = pg_cursor.fetchone()[0]

                status = "✅" if mysql_count == pg_count else "❌"
                logger.info(f"{status} {pg_symbol}: MySQL={mysql_count}, PostgreSQL={pg_count}")

            pg_cursor.close()

        mysql_cursor.close()
|
|
|
|
|
|
def main():
    """Parse CLI arguments and dispatch the requested migration mode."""
    parser = argparse.ArgumentParser(description='MySQL to PostgreSQL Migration')
    parser.add_argument('--full', action='store_true', help='Full migration')
    parser.add_argument('--incremental', action='store_true', help='Incremental migration')
    parser.add_argument('--ticker', type=str, help='Specific ticker to migrate')
    parser.add_argument('--verify', action='store_true', help='Verify migration')
    parser.add_argument('--schema-only', action='store_true', help='Only create schema')

    args = parser.parse_args()

    # Schema-only mode: set up database/schema/tickers, then stop.
    if args.schema_only:
        create_database_if_not_exists()
        run_migrations()
        insert_tickers()
        logger.info("Schema created successfully")
        return

    # Verification mode: compare counts, then stop.
    if args.verify:
        verify_migration()
        return

    # Data migration. Note: --full is the implicit default — a run without
    # --incremental performs a full migration.
    selected = [args.ticker] if args.ticker else None
    migrate_all(incremental=args.incremental, tickers=selected)
|
|
|
|
|
|
# Script entry point — only run when executed directly, not when imported.
if __name__ == '__main__':
    main()
|