#!/usr/bin/env python3 """ Historical Data Migration Script - Trading Platform Migrates legacy MySQL dumps to PostgreSQL market_data schema. Handles: - tickers_agg_data: OHLCV data (from db.sql) - tickers_agg_ind_data: Technical indicators (from db_res.sql) Usage: python scripts/migrate_historical_data.py --dry-run python scripts/migrate_historical_data.py --file db.sql python scripts/migrate_historical_data.py --file db_res.sql --include-indicators Environment variables: DB_HOST - PostgreSQL host (default: localhost) DB_PORT - PostgreSQL port (default: 5432) DB_NAME - Database name (default: trading_platform) DB_USER - Database user (default: trading_user) DB_PASSWORD - Database password (default: trading_dev_2026) Author: SIMCO Migration Tool Date: 2026-01-25 """ import os import re import sys import argparse import logging import time from datetime import datetime from pathlib import Path from typing import Generator, Optional, Dict, List, Tuple, Any from dataclasses import dataclass, field from decimal import Decimal, InvalidOperation from collections import defaultdict import psycopg2 from psycopg2 import sql from psycopg2.extras import execute_values from tqdm import tqdm from dotenv import load_dotenv # Load environment load_dotenv(Path(__file__).parent.parent / ".env") # Configuration DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = int(os.getenv("DB_PORT", "5432")) DB_NAME = os.getenv("DB_NAME", "trading_platform") DB_USER = os.getenv("DB_USER", "trading_user") DB_PASSWORD = os.getenv("DB_PASSWORD", "trading_dev_2026") # Default dump file paths DEFAULT_OHLCV_DUMP = Path(r"C:\Empresas\WorkspaceOld\Projects\trading\db.sql") DEFAULT_INDICATORS_DUMP = Path(r"C:\Empresas\WorkspaceOld\Projects\trading\db_res.sql") # Batch configuration BATCH_SIZE = 10000 MAX_RETRIES = 3 RETRY_DELAY = 5 # seconds # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler(Path(__file__).parent / "migration.log", encoding="utf-8") ] ) logger = logging.getLogger(__name__) @dataclass class MigrationStats: """Statistics for migration progress.""" total_lines_processed: int = 0 total_inserts_parsed: int = 0 total_rows_parsed: int = 0 total_rows_inserted: int = 0 total_rows_skipped: int = 0 total_errors: int = 0 tickers_found: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) start_time: float = field(default_factory=time.time) def elapsed_time(self) -> float: return time.time() - self.start_time def rows_per_second(self) -> float: elapsed = self.elapsed_time() if elapsed > 0: return self.total_rows_inserted / elapsed return 0 @dataclass class OHLCVRecord: """Represents an OHLCV record from the legacy MySQL database.""" id: int ticker: str date_agg: datetime open: Decimal close: Decimal high: Decimal low: Decimal volume: Decimal vwap: Decimal ts: int periodint: int # Optional indicator fields (from tickers_agg_ind_data) macd: Optional[Decimal] = None macd_signal: Optional[Decimal] = None macd_hist: Optional[Decimal] = None sma_10: Optional[Decimal] = None sma_20: Optional[Decimal] = None atr: Optional[Decimal] = None sar: Optional[Decimal] = None mfi: Optional[Decimal] = None rsi: Optional[Decimal] = None fractal_alcista: Optional[Decimal] = None fractal_bajista: Optional[Decimal] = None obv: Optional[Decimal] = None ad: Optional[Decimal] = None cmf: Optional[Decimal] = None volume_z: Optional[Decimal] = None volume_anomaly: Optional[bool] = None hour: Optional[datetime] = None class MySQLDumpParser: """ Streaming parser for MySQL dump files. Parses INSERT statements character by character to handle massive lines without loading them entirely into memory. """ def __init__(self, file_path: Path, target_table: str): self.file_path = file_path self.target_table = target_table self._file_size = file_path.stat().st_size if file_path.exists() else 0 self._bytes_read = 0 def get_file_size(self) -> int: return self._file_size def get_bytes_read(self) -> int: return self._bytes_read def stream_records(self) -> Generator[List[str], None, None]: """ Stream records from the MySQL dump file. Yields lists of parsed values for each row. Uses character-by-character streaming to handle massive INSERT lines. """ if not self.file_path.exists(): raise FileNotFoundError(f"Dump file not found: {self.file_path}") self._bytes_read = 0 in_target_table = False in_insert = False buffer = "" BUFFER_READ_SIZE = 65536 # 64KB read buffer with open(self.file_path, "r", encoding="utf-8", errors="replace") as f: while True: chunk = f.read(BUFFER_READ_SIZE) if not chunk: break self._bytes_read += len(chunk.encode("utf-8", errors="replace")) for char in chunk: buffer += char # Check for section markers if not in_target_table: if f"LOCK TABLES `{self.target_table}` WRITE" in buffer: in_target_table = True buffer = "" continue # Keep buffer from growing too large when not in target table if len(buffer) > 1000: buffer = buffer[-500:] continue # Check for end of target table if "UNLOCK TABLES" in buffer: in_target_table = False buffer = "" continue # Check for INSERT statement start if not in_insert and "INSERT INTO" in buffer.upper(): if f"`{self.target_table}`" in buffer: # Find VALUES keyword and start parsing tuples values_idx = buffer.upper().find("VALUES") if values_idx >= 0: in_insert = True buffer = buffer[values_idx + 6:].lstrip() continue else: buffer = "" continue # Parse tuples when in INSERT mode if in_insert: # Process complete tuples from buffer while True: tuple_result = self._extract_tuple(buffer) if tuple_result is None: break values, remaining = tuple_result buffer = remaining if values: yield values # Check for end of INSERT statement if buffer.lstrip().startswith(";"): in_insert = False buffer = buffer.lstrip()[1:] # Skip semicolon break # Handle any remaining buffer content if in_insert and buffer.strip(): tuple_result = self._extract_tuple(buffer) if tuple_result: values, _ = tuple_result if values: yield values def _extract_tuple(self, buffer: str) -> Optional[Tuple[List[str], str]]: """ Extract the first complete tuple from buffer. Returns (parsed_values, remaining_buffer) or None if no complete tuple. """ buffer = buffer.lstrip() # Skip comma between tuples if buffer.startswith(","): buffer = buffer[1:].lstrip() if not buffer.startswith("("): return None # Find matching closing parenthesis depth = 0 in_string = False string_char = None i = 0 while i < len(buffer): char = buffer[i] if not in_string: if char in ("'", '"'): in_string = True string_char = char elif char == "(": depth += 1 elif char == ")": depth -= 1 if depth == 0: # Found complete tuple tuple_content = buffer[1:i] # Exclude parentheses remaining = buffer[i + 1:] values = self._parse_values(tuple_content) return (values, remaining) else: # Inside string if char == string_char: # Check for escaped quote if i + 1 < len(buffer) and buffer[i + 1] == string_char: i += 1 # Skip escaped quote elif i > 0 and buffer[i - 1] == "\\": pass # Backslash escaped, continue else: in_string = False string_char = None i += 1 # No complete tuple found yet return None def _parse_values(self, values_str: str) -> Optional[List[str]]: """ Parse a comma-separated values string, handling quoted strings and NULL. Format: value1,'string value',value3,... """ values = [] current_value = "" in_quotes = False quote_char = None i = 0 just_ended_quote = False while i < len(values_str): char = values_str[i] if not in_quotes: if just_ended_quote: # After a quoted string, skip to comma or end if char == ",": just_ended_quote = False i += 1 continue else: i += 1 continue if char in ("'", '"'): # Start of quoted string in_quotes = True quote_char = char current_value = "" elif char == ",": # End of unquoted value val = current_value.strip() if val.upper() == "NULL": values.append(None) else: values.append(val if val else None) current_value = "" else: current_value += char else: # Inside quotes if char == quote_char: # Check for escaped quote (double quote) if i + 1 < len(values_str) and values_str[i + 1] == quote_char: current_value += char i += 1 else: # End of quoted string in_quotes = False values.append(current_value) current_value = "" just_ended_quote = True elif char == "\\" and i + 1 < len(values_str) and values_str[i + 1] == quote_char: # Backslash escaped quote current_value += values_str[i + 1] i += 1 else: current_value += char i += 1 # Handle last value (unquoted) if current_value.strip() and not just_ended_quote: val = current_value.strip() if val.upper() == "NULL": values.append(None) else: values.append(val) return values if values else None class HistoricalDataMigrator: """ Migrates historical OHLCV data from MySQL dumps to PostgreSQL. """ def __init__( self, dry_run: bool = False, include_indicators: bool = False, verbose: bool = False ): self.dry_run = dry_run self.include_indicators = include_indicators self.verbose = verbose self.stats = MigrationStats() self.conn: Optional[psycopg2.extensions.connection] = None self.ticker_id_cache: Dict[str, int] = {} self.indicator_cache: Dict[int, Dict] = {} # Cache indicators by legacy id def connect(self) -> None: """Establish database connection.""" logger.info(f"Connecting to PostgreSQL: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}") if self.dry_run: logger.info("DRY RUN mode - no database changes will be made") return self.conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) self.conn.autocommit = False # Load existing ticker mappings self._load_ticker_cache() def disconnect(self) -> None: """Close database connection.""" if self.conn: self.conn.close() self.conn = None def _load_ticker_cache(self) -> None: """Load ticker symbol to ID mapping from database.""" if not self.conn: return with self.conn.cursor() as cur: cur.execute(""" SELECT symbol, id FROM market_data.tickers """) for row in cur.fetchall(): self.ticker_id_cache[row[0].upper()] = row[1] logger.info(f"Loaded {len(self.ticker_id_cache)} tickers from database") def _ensure_ticker_exists(self, ticker: str) -> Optional[int]: """ Get ticker ID, creating the ticker if it doesn't exist. Returns None if ticker cannot be created. """ ticker_upper = ticker.upper() if ticker_upper in self.ticker_id_cache: return self.ticker_id_cache[ticker_upper] if self.dry_run: # In dry run, simulate auto-increment fake_id = len(self.ticker_id_cache) + 1000 self.ticker_id_cache[ticker_upper] = fake_id logger.info(f"DRY RUN: Would create ticker {ticker_upper} with ID {fake_id}") return fake_id # Determine asset type based on ticker name asset_type = self._infer_asset_type(ticker_upper) base_currency, quote_currency = self._parse_currencies(ticker_upper) try: with self.conn.cursor() as cur: cur.execute(""" INSERT INTO market_data.tickers (symbol, name, asset_type, base_currency, quote_currency, is_active, is_ml_enabled) VALUES (%s, %s, %s, %s, %s, true, true) ON CONFLICT (symbol) DO UPDATE SET updated_at = NOW() RETURNING id """, ( ticker_upper, f"{base_currency}/{quote_currency} (Legacy Import)", asset_type, base_currency, quote_currency )) ticker_id = cur.fetchone()[0] self.conn.commit() self.ticker_id_cache[ticker_upper] = ticker_id logger.info(f"Created ticker {ticker_upper} with ID {ticker_id}") return ticker_id except Exception as e: logger.error(f"Failed to create ticker {ticker_upper}: {e}") self.conn.rollback() return None def _infer_asset_type(self, ticker: str) -> str: """Infer asset type from ticker symbol.""" if ticker.startswith("XAU") or ticker.startswith("XAG"): return "commodity" elif ticker.startswith("BTC") or ticker.startswith("ETH"): return "crypto" elif ticker in ("SPX500", "NAS100", "DJI30", "DAX40"): return "index" else: return "forex" def _parse_currencies(self, ticker: str) -> Tuple[str, str]: """Parse base and quote currency from ticker.""" if len(ticker) >= 6: return ticker[:3], ticker[3:6] return ticker, "USD" def _ensure_historical_table_exists(self) -> None: """Create the ohlcv_historical table if it doesn't exist.""" if self.dry_run or not self.conn: return with self.conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS market_data.ohlcv_historical ( id BIGSERIAL PRIMARY KEY, ticker_id INTEGER NOT NULL REFERENCES market_data.tickers(id), timestamp TIMESTAMPTZ NOT NULL, open DECIMAL(20,8) NOT NULL, high DECIMAL(20,8) NOT NULL, low DECIMAL(20,8) NOT NULL, close DECIMAL(20,8) NOT NULL, volume DECIMAL(20,4) DEFAULT 0, vwap DECIMAL(20,8), ts_epoch BIGINT, period_interval INTEGER DEFAULT 5, -- Technical Indicators macd DECIMAL(20,8), macd_signal DECIMAL(20,8), macd_hist DECIMAL(20,8), sma_10 DECIMAL(20,8), sma_20 DECIMAL(20,8), atr DECIMAL(20,8), sar DECIMAL(20,8), mfi DECIMAL(20,8), rsi DECIMAL(20,8), fractal_alcista DECIMAL(20,8), fractal_bajista DECIMAL(20,8), obv DECIMAL(20,8), ad DECIMAL(20,8), cmf DECIMAL(20,8), volume_z DECIMAL(20,8), volume_anomaly BOOLEAN, -- Metadata source VARCHAR(50) DEFAULT 'legacy_mysql', legacy_id INTEGER, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT ohlcv_historical_unique UNIQUE (ticker_id, timestamp, period_interval) ); CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_ticker_ts ON market_data.ohlcv_historical(ticker_id, timestamp DESC); CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_timestamp ON market_data.ohlcv_historical(timestamp DESC); CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_source ON market_data.ohlcv_historical(source); CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_legacy_id ON market_data.ohlcv_historical(legacy_id) WHERE legacy_id IS NOT NULL; COMMENT ON TABLE market_data.ohlcv_historical IS 'Historical OHLCV data migrated from legacy MySQL database'; """) self.conn.commit() logger.info("Ensured ohlcv_historical table exists") def _parse_ohlcv_values(self, values: List[str]) -> Optional[OHLCVRecord]: """ Parse values from tickers_agg_data INSERT statement. Expected format: (id, ticker, date_agg, open, close, high, low, volume, vwap, ts, periodint) """ if len(values) < 11: return None try: record = OHLCVRecord( id=int(values[0]), ticker=str(values[1]).strip(), date_agg=self._parse_datetime(values[2]), open=self._parse_decimal(values[3]), close=self._parse_decimal(values[4]), high=self._parse_decimal(values[5]), low=self._parse_decimal(values[6]), volume=self._parse_decimal(values[7]), vwap=self._parse_decimal(values[8]), ts=int(values[9]), periodint=int(values[10]) ) return record except (ValueError, TypeError, InvalidOperation) as e: if self.verbose: logger.warning(f"Failed to parse OHLCV values: {values[:5]}... Error: {e}") return None def _parse_indicator_values(self, values: List[str]) -> Optional[Dict]: """ Parse values from tickers_agg_ind_data INSERT statement. Expected format: (id, MACD, MACD_signal, MACD_hist, SMA_10, SMA_20, ATR, SAR, MFI, RSI, Fractal_Alcista, Fractal_Bajista, OBV, AD, CMF, volume_z, volume_anomaly, hour) """ if len(values) < 18: return None try: return { "id": int(values[0]), "macd": self._parse_decimal_nullable(values[1]), "macd_signal": self._parse_decimal_nullable(values[2]), "macd_hist": self._parse_decimal_nullable(values[3]), "sma_10": self._parse_decimal_nullable(values[4]), "sma_20": self._parse_decimal_nullable(values[5]), "atr": self._parse_decimal_nullable(values[6]), "sar": self._parse_decimal_nullable(values[7]), "mfi": self._parse_decimal_nullable(values[8]), "rsi": self._parse_decimal_nullable(values[9]), "fractal_alcista": self._parse_decimal_nullable(values[10]), "fractal_bajista": self._parse_decimal_nullable(values[11]), "obv": self._parse_decimal_nullable(values[12]), "ad": self._parse_decimal_nullable(values[13]), "cmf": self._parse_decimal_nullable(values[14]), "volume_z": self._parse_decimal_nullable(values[15]), "volume_anomaly": values[16] == "1" if values[16] else None, "hour": self._parse_datetime_nullable(values[17]) } except (ValueError, TypeError, InvalidOperation) as e: if self.verbose: logger.warning(f"Failed to parse indicator values: {values[:5]}... Error: {e}") return None def _parse_datetime(self, value: str) -> datetime: """Parse datetime string from MySQL format.""" if not value: raise ValueError("Empty datetime value") value = value.strip() # Try common MySQL datetime formats formats = [ "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d", ] for fmt in formats: try: return datetime.strptime(value, fmt) except ValueError: continue raise ValueError(f"Cannot parse datetime: {value}") def _parse_datetime_nullable(self, value: Optional[str]) -> Optional[datetime]: """Parse nullable datetime.""" if not value or value.upper() == "NULL": return None try: return self._parse_datetime(value) except ValueError: return None def _parse_decimal(self, value: str) -> Decimal: """Parse decimal value.""" if not value or value.upper() == "NULL": return Decimal("0") return Decimal(str(value).strip()) def _parse_decimal_nullable(self, value: Optional[str]) -> Optional[Decimal]: """Parse nullable decimal value.""" if not value or value.upper() == "NULL": return None try: return Decimal(str(value).strip()) except InvalidOperation: return None def _validate_record(self, record: OHLCVRecord) -> bool: """Validate an OHLCV record before insertion.""" # Basic validation if not record.ticker: return False # Check OHLC values are positive if record.open <= 0 or record.high <= 0 or record.low <= 0 or record.close <= 0: return False # Check high >= low if record.high < record.low: return False # Check high >= open and close if record.high < record.open or record.high < record.close: return False # Check low <= open and close if record.low > record.open or record.low > record.close: return False return True def load_indicators_cache(self, indicators_file: Path) -> int: """ Pre-load indicators from dump file into memory cache. This enables joining indicators with OHLCV data during migration. """ if not indicators_file.exists(): logger.warning(f"Indicators file not found: {indicators_file}") return 0 logger.info(f"Loading indicators from: {indicators_file}") parser = MySQLDumpParser(indicators_file, "tickers_agg_ind_data") count = 0 file_size = parser.get_file_size() with tqdm(total=file_size, unit="B", unit_scale=True, desc="Loading indicators") as pbar: bytes_processed = 0 for values in parser.stream_records(): indicator = self._parse_indicator_values(values) if indicator: self.indicator_cache[indicator["id"]] = indicator count += 1 # Update progress using actual bytes read if count % 10000 == 0: current_bytes = parser.get_bytes_read() if current_bytes > bytes_processed: pbar.update(current_bytes - bytes_processed) bytes_processed = current_bytes logger.info(f"Loaded {count:,} indicators into cache") return count def migrate_ohlcv_data( self, dump_file: Path, start_from_id: int = 0, limit: Optional[int] = None ) -> MigrationStats: """ Migrate OHLCV data from MySQL dump to PostgreSQL. Args: dump_file: Path to the MySQL dump file start_from_id: Skip records with ID less than this value (for resumption) limit: Maximum number of records to migrate (for testing) """ if not dump_file.exists(): raise FileNotFoundError(f"Dump file not found: {dump_file}") self._ensure_historical_table_exists() parser = MySQLDumpParser(dump_file, "tickers_agg_data") file_size = parser.get_file_size() logger.info(f"Starting migration from: {dump_file}") logger.info(f"File size: {file_size / (1024*1024*1024):.2f} GB") logger.info(f"Batch size: {BATCH_SIZE:,}") logger.info(f"Include indicators: {self.include_indicators}") if start_from_id > 0: logger.info(f"Resuming from ID: {start_from_id:,}") if limit: logger.info(f"Limit: {limit:,} records") batch: List[Tuple] = [] records_migrated = 0 with tqdm(total=file_size, unit="B", unit_scale=True, desc="Migrating OHLCV") as pbar: bytes_processed = 0 for values in parser.stream_records(): self.stats.total_inserts_parsed += 1 record = self._parse_ohlcv_values(values) if not record: self.stats.total_errors += 1 continue self.stats.total_rows_parsed += 1 # Skip if resuming from a specific ID if record.id < start_from_id: continue # Validate record if not self._validate_record(record): self.stats.total_rows_skipped += 1 continue # Get ticker ID ticker_id = self._ensure_ticker_exists(record.ticker) if not ticker_id: self.stats.total_rows_skipped += 1 continue # Track tickers self.stats.tickers_found[record.ticker] += 1 # Get indicators if available indicators = None if self.include_indicators and record.id in self.indicator_cache: indicators = self.indicator_cache[record.id] # Build tuple for insertion row = self._build_insert_tuple(record, ticker_id, indicators) batch.append(row) # Insert batch when full if len(batch) >= BATCH_SIZE: inserted = self._insert_batch(batch) self.stats.total_rows_inserted += inserted records_migrated += inserted batch = [] # Update progress using actual bytes read current_bytes = parser.get_bytes_read() if current_bytes > bytes_processed: pbar.update(current_bytes - bytes_processed) bytes_processed = current_bytes # Log progress if self.stats.total_rows_inserted % 100000 == 0: logger.info( f"Progress: {self.stats.total_rows_inserted:,} rows inserted, " f"{self.stats.rows_per_second():.0f} rows/sec" ) # Check limit if limit and records_migrated >= limit: logger.info(f"Reached limit of {limit:,} records") break # Insert remaining batch if batch: inserted = self._insert_batch(batch) self.stats.total_rows_inserted += inserted return self.stats def _build_insert_tuple( self, record: OHLCVRecord, ticker_id: int, indicators: Optional[Dict] ) -> Tuple: """Build a tuple for batch insertion.""" return ( ticker_id, record.date_agg, float(record.open), float(record.high), float(record.low), float(record.close), float(record.volume), float(record.vwap) if record.vwap else None, record.ts, record.periodint, # Indicators float(indicators["macd"]) if indicators and indicators.get("macd") else None, float(indicators["macd_signal"]) if indicators and indicators.get("macd_signal") else None, float(indicators["macd_hist"]) if indicators and indicators.get("macd_hist") else None, float(indicators["sma_10"]) if indicators and indicators.get("sma_10") else None, float(indicators["sma_20"]) if indicators and indicators.get("sma_20") else None, float(indicators["atr"]) if indicators and indicators.get("atr") else None, float(indicators["sar"]) if indicators and indicators.get("sar") else None, float(indicators["mfi"]) if indicators and indicators.get("mfi") else None, float(indicators["rsi"]) if indicators and indicators.get("rsi") else None, float(indicators["fractal_alcista"]) if indicators and indicators.get("fractal_alcista") else None, float(indicators["fractal_bajista"]) if indicators and indicators.get("fractal_bajista") else None, float(indicators["obv"]) if indicators and indicators.get("obv") else None, float(indicators["ad"]) if indicators and indicators.get("ad") else None, float(indicators["cmf"]) if indicators and indicators.get("cmf") else None, float(indicators["volume_z"]) if indicators and indicators.get("volume_z") else None, indicators.get("volume_anomaly") if indicators else None, "legacy_mysql", record.id ) def _insert_batch(self, batch: List[Tuple], retry: int = 0) -> int: """Insert a batch of records into PostgreSQL.""" if self.dry_run: return len(batch) if not self.conn or not batch: return 0 try: with self.conn.cursor() as cur: execute_values( cur, """ INSERT INTO market_data.ohlcv_historical ( ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch, period_interval, macd, macd_signal, macd_hist, sma_10, sma_20, atr, sar, mfi, rsi, fractal_alcista, fractal_bajista, obv, ad, cmf, volume_z, volume_anomaly, source, legacy_id ) VALUES %s ON CONFLICT (ticker_id, timestamp, period_interval) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume, vwap = EXCLUDED.vwap, macd = COALESCE(EXCLUDED.macd, market_data.ohlcv_historical.macd), macd_signal = COALESCE(EXCLUDED.macd_signal, market_data.ohlcv_historical.macd_signal), macd_hist = COALESCE(EXCLUDED.macd_hist, market_data.ohlcv_historical.macd_hist), sma_10 = COALESCE(EXCLUDED.sma_10, market_data.ohlcv_historical.sma_10), sma_20 = COALESCE(EXCLUDED.sma_20, market_data.ohlcv_historical.sma_20), atr = COALESCE(EXCLUDED.atr, market_data.ohlcv_historical.atr), sar = COALESCE(EXCLUDED.sar, market_data.ohlcv_historical.sar), mfi = COALESCE(EXCLUDED.mfi, market_data.ohlcv_historical.mfi), rsi = COALESCE(EXCLUDED.rsi, market_data.ohlcv_historical.rsi), fractal_alcista = COALESCE(EXCLUDED.fractal_alcista, market_data.ohlcv_historical.fractal_alcista), fractal_bajista = COALESCE(EXCLUDED.fractal_bajista, market_data.ohlcv_historical.fractal_bajista), obv = COALESCE(EXCLUDED.obv, market_data.ohlcv_historical.obv), ad = COALESCE(EXCLUDED.ad, market_data.ohlcv_historical.ad), cmf = COALESCE(EXCLUDED.cmf, market_data.ohlcv_historical.cmf), volume_z = COALESCE(EXCLUDED.volume_z, market_data.ohlcv_historical.volume_z), volume_anomaly = COALESCE(EXCLUDED.volume_anomaly, market_data.ohlcv_historical.volume_anomaly) """, batch, page_size=1000 ) self.conn.commit() return len(batch) except Exception as e: self.conn.rollback() if retry < MAX_RETRIES: logger.warning(f"Batch insert failed (retry {retry + 1}/{MAX_RETRIES}): {e}") time.sleep(RETRY_DELAY) return self._insert_batch(batch, retry + 1) else: logger.error(f"Batch insert failed after {MAX_RETRIES} retries: {e}") self.stats.total_errors += len(batch) return 0 def print_summary(self) -> None: """Print migration summary.""" elapsed = self.stats.elapsed_time() print("\n" + "=" * 70) print("MIGRATION SUMMARY") print("=" * 70) print(f" Mode: {'DRY RUN' if self.dry_run else 'LIVE'}") print(f" Elapsed time: {elapsed / 60:.1f} minutes") print(f" Lines processed: {self.stats.total_lines_processed:,}") print(f" INSERT statements: {self.stats.total_inserts_parsed:,}") print(f" Rows parsed: {self.stats.total_rows_parsed:,}") print(f" Rows inserted: {self.stats.total_rows_inserted:,}") print(f" Rows skipped: {self.stats.total_rows_skipped:,}") print(f" Errors: {self.stats.total_errors:,}") print(f" Throughput: {self.stats.rows_per_second():.0f} rows/sec") print() print(" Tickers found:") for ticker, count in sorted(self.stats.tickers_found.items(), key=lambda x: -x[1]): print(f" - {ticker}: {count:,} rows") print("=" * 70) def main(): """Main entry point.""" global BATCH_SIZE parser = argparse.ArgumentParser( description="Migrate historical OHLCV data from MySQL dumps to PostgreSQL", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Dry run to test parsing python migrate_historical_data.py --dry-run --limit 1000 # Migrate OHLCV data only python migrate_historical_data.py --file db.sql # Migrate with indicators python migrate_historical_data.py --file db.sql --indicators db_res.sql # Resume from specific ID python migrate_historical_data.py --file db.sql --start-from 5000000 """ ) parser.add_argument( "--file", "-f", type=Path, default=DEFAULT_OHLCV_DUMP, help=f"MySQL dump file with OHLCV data (default: {DEFAULT_OHLCV_DUMP})" ) parser.add_argument( "--indicators", "-i", type=Path, default=None, help="MySQL dump file with indicator data (e.g., db_res.sql)" ) parser.add_argument( "--dry-run", "-d", action="store_true", help="Parse and validate without inserting to database" ) parser.add_argument( "--limit", "-l", type=int, default=None, help="Maximum number of records to migrate (for testing)" ) parser.add_argument( "--start-from", "-s", type=int, default=0, help="Skip records with ID less than this value (for resumption)" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Enable verbose logging" ) parser.add_argument( "--batch-size", "-b", type=int, default=BATCH_SIZE, help=f"Batch size for inserts (default: {BATCH_SIZE})" ) args = parser.parse_args() # Update batch size if specified BATCH_SIZE = args.batch_size # Print banner print("=" * 70) print("Historical Data Migration Tool - Trading Platform") print("=" * 70) print(f" Source file: {args.file}") print(f" Indicators file: {args.indicators or 'None'}") print(f" Dry run: {args.dry_run}") print(f" Batch size: {BATCH_SIZE:,}") print(f" Limit: {args.limit or 'None'}") print(f" Start from ID: {args.start_from}") print("=" * 70) # Verify dump file exists if not args.file.exists(): logger.error(f"Dump file not found: {args.file}") sys.exit(1) # Create migrator migrator = HistoricalDataMigrator( dry_run=args.dry_run, include_indicators=args.indicators is not None, verbose=args.verbose ) try: # Connect to database migrator.connect() # Load indicators if specified if args.indicators: if args.indicators.exists(): migrator.load_indicators_cache(args.indicators) else: logger.warning(f"Indicators file not found: {args.indicators}") # Run migration stats = migrator.migrate_ohlcv_data( dump_file=args.file, start_from_id=args.start_from, limit=args.limit ) # Print summary migrator.print_summary() # Exit with appropriate code if stats.total_errors > 0: logger.warning(f"Migration completed with {stats.total_errors} errors") sys.exit(1) else: logger.info("Migration completed successfully") sys.exit(0) except KeyboardInterrupt: logger.info("\nMigration interrupted by user") migrator.print_summary() sys.exit(130) except Exception as e: logger.exception(f"Migration failed: {e}") sys.exit(1) finally: migrator.disconnect() if __name__ == "__main__": main()