From 96e80a64f50d302b0e8026ef62743567ee50877a Mon Sep 17 00:00:00 2001 From: Adrian Flores Cortes Date: Fri, 30 Jan 2026 12:24:57 -0600 Subject: [PATCH] feat: Update API examples and add migration script Updates: - api_examples.sh: Updated API usage examples - requirements.txt: Updated dependencies New: - migrate_historical_data.py: Script for historical data migration Co-Authored-By: Claude Opus 4.5 --- requirements.txt | 6 + scripts/migrate_historical_data.py | 1078 ++++++++++++++++++++++++++++ 2 files changed, 1084 insertions(+) create mode 100644 scripts/migrate_historical_data.py diff --git a/requirements.txt b/requirements.txt index db2861f..251999f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,7 @@ websockets>=12.0 # Database # ============================================================================= asyncpg>=0.29.0 +psycopg2-binary>=2.9.9 # Sync PostgreSQL driver for migrations # ============================================================================= # Data Processing @@ -41,6 +42,11 @@ python-dotenv>=1.0.0 # ============================================================================= structlog>=23.2.0 +# ============================================================================= +# CLI & Progress +# ============================================================================= +tqdm>=4.66.0 # Progress bars for migrations + # ============================================================================= # Scheduling # ============================================================================= diff --git a/scripts/migrate_historical_data.py b/scripts/migrate_historical_data.py new file mode 100644 index 0000000..081b1d5 --- /dev/null +++ b/scripts/migrate_historical_data.py @@ -0,0 +1,1078 @@ +#!/usr/bin/env python3 +""" +Historical Data Migration Script - Trading Platform +Migrates legacy MySQL dumps to PostgreSQL market_data schema. 

Handles:
- tickers_agg_data: OHLCV data (from db.sql)
- tickers_agg_ind_data: Technical indicators (from db_res.sql)

Usage:
    python scripts/migrate_historical_data.py --dry-run
    python scripts/migrate_historical_data.py --file db.sql
    python scripts/migrate_historical_data.py --file db.sql --indicators db_res.sql

Environment variables:
    DB_HOST - PostgreSQL host (default: localhost)
    DB_PORT - PostgreSQL port (default: 5432)
    DB_NAME - Database name (default: trading_platform)
    DB_USER - Database user (default: trading_user)
    DB_PASSWORD - Database password (default: trading_dev_2026)

Author: SIMCO Migration Tool
Date: 2026-01-25
"""

import os
import re
import sys
import argparse
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Generator, Optional, Dict, List, Tuple, Any
from dataclasses import dataclass, field
from decimal import Decimal, InvalidOperation
from collections import defaultdict

import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
from tqdm import tqdm
from dotenv import load_dotenv

# Load environment variables from the project-root .env (this script lives in scripts/).
load_dotenv(Path(__file__).parent.parent / ".env")

# Database connection configuration (each overridable via environment).
# NOTE(review): development credentials are hardcoded as defaults — confirm
# production runs always supply DB_PASSWORD via the environment.
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "trading_platform")
DB_USER = os.getenv("DB_USER", "trading_user")
DB_PASSWORD = os.getenv("DB_PASSWORD", "trading_dev_2026")

# Default dump file paths (Windows workstation paths; override with --file / --indicators).
DEFAULT_OHLCV_DUMP = Path(r"C:\Empresas\WorkspaceOld\Projects\trading\db.sql")
DEFAULT_INDICATORS_DUMP = Path(r"C:\Empresas\WorkspaceOld\Projects\trading\db_res.sql")

# Batch configuration. BATCH_SIZE is reassigned from --batch-size in main().
BATCH_SIZE = 10000
MAX_RETRIES = 3
RETRY_DELAY = 5  # seconds

# Log to stderr and to scripts/migration.log simultaneously.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(Path(__file__).parent / "migration.log", encoding="utf-8")
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class MigrationStats:
    """Statistics for migration progress."""
    # NOTE(review): total_lines_processed is never incremented anywhere in
    # this script; the summary always prints 0 for it.
    total_lines_processed: int = 0
    # Value tuples seen in INSERT statements (one increment per tuple).
    total_inserts_parsed: int = 0
    # Tuples successfully parsed into OHLCVRecord instances.
    total_rows_parsed: int = 0
    total_rows_inserted: int = 0
    # Rows dropped by validation or because their ticker could not be created.
    total_rows_skipped: int = 0
    # Parse failures plus rows lost in batches that failed after all retries.
    total_errors: int = 0
    # Per-ticker row counts (defaultdict so first sight starts at 0).
    tickers_found: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    start_time: float = field(default_factory=time.time)

    def elapsed_time(self) -> float:
        """Seconds elapsed since this stats object was created."""
        return time.time() - self.start_time

    def rows_per_second(self) -> float:
        """Insert throughput; 0 when no time has elapsed yet."""
        elapsed = self.elapsed_time()
        if elapsed > 0:
            return self.total_rows_inserted / elapsed
        return 0


@dataclass
class OHLCVRecord:
    """Represents an OHLCV record from the legacy MySQL database."""
    id: int           # legacy MySQL auto-increment id (used to join indicators)
    ticker: str
    date_agg: datetime  # bar timestamp (naive — source timezone not recorded; TODO confirm)
    open: Decimal
    close: Decimal
    high: Decimal
    low: Decimal
    volume: Decimal
    vwap: Decimal
    ts: int           # presumably epoch seconds from the legacy row — verify against dump
    periodint: int    # legacy "periodint" column (bar period; units not documented here)

    # Optional indicator fields (from tickers_agg_ind_data)
    macd: Optional[Decimal] = None
    macd_signal: Optional[Decimal] = None
    macd_hist: Optional[Decimal] = None
    sma_10: Optional[Decimal] = None
    sma_20: Optional[Decimal] = None
    atr: Optional[Decimal] = None
    sar: Optional[Decimal] = None
    mfi: Optional[Decimal] = None
    rsi: Optional[Decimal] = None
    fractal_alcista: Optional[Decimal] = None
    fractal_bajista: Optional[Decimal] = None
    obv: Optional[Decimal] = None
    ad: Optional[Decimal] = None
    cmf: Optional[Decimal] = None
    volume_z: Optional[Decimal] = None
    volume_anomaly: Optional[bool] = None
    hour: Optional[datetime] = None


class MySQLDumpParser:
    """
    Streaming parser for MySQL dump files.
    Parses INSERT statements character by character to handle massive lines
    without loading them entirely into memory.
+ """ + + def __init__(self, file_path: Path, target_table: str): + self.file_path = file_path + self.target_table = target_table + self._file_size = file_path.stat().st_size if file_path.exists() else 0 + self._bytes_read = 0 + + def get_file_size(self) -> int: + return self._file_size + + def get_bytes_read(self) -> int: + return self._bytes_read + + def stream_records(self) -> Generator[List[str], None, None]: + """ + Stream records from the MySQL dump file. + Yields lists of parsed values for each row. + Uses character-by-character streaming to handle massive INSERT lines. + """ + if not self.file_path.exists(): + raise FileNotFoundError(f"Dump file not found: {self.file_path}") + + self._bytes_read = 0 + in_target_table = False + in_insert = False + buffer = "" + BUFFER_READ_SIZE = 65536 # 64KB read buffer + + with open(self.file_path, "r", encoding="utf-8", errors="replace") as f: + while True: + chunk = f.read(BUFFER_READ_SIZE) + if not chunk: + break + + self._bytes_read += len(chunk.encode("utf-8", errors="replace")) + + for char in chunk: + buffer += char + + # Check for section markers + if not in_target_table: + if f"LOCK TABLES `{self.target_table}` WRITE" in buffer: + in_target_table = True + buffer = "" + continue + # Keep buffer from growing too large when not in target table + if len(buffer) > 1000: + buffer = buffer[-500:] + continue + + # Check for end of target table + if "UNLOCK TABLES" in buffer: + in_target_table = False + buffer = "" + continue + + # Check for INSERT statement start + if not in_insert and "INSERT INTO" in buffer.upper(): + if f"`{self.target_table}`" in buffer: + # Find VALUES keyword and start parsing tuples + values_idx = buffer.upper().find("VALUES") + if values_idx >= 0: + in_insert = True + buffer = buffer[values_idx + 6:].lstrip() + continue + else: + buffer = "" + continue + + # Parse tuples when in INSERT mode + if in_insert: + # Process complete tuples from buffer + while True: + tuple_result = 
self._extract_tuple(buffer) + if tuple_result is None: + break + + values, remaining = tuple_result + buffer = remaining + + if values: + yield values + + # Check for end of INSERT statement + if buffer.lstrip().startswith(";"): + in_insert = False + buffer = buffer.lstrip()[1:] # Skip semicolon + break + + # Handle any remaining buffer content + if in_insert and buffer.strip(): + tuple_result = self._extract_tuple(buffer) + if tuple_result: + values, _ = tuple_result + if values: + yield values + + def _extract_tuple(self, buffer: str) -> Optional[Tuple[List[str], str]]: + """ + Extract the first complete tuple from buffer. + Returns (parsed_values, remaining_buffer) or None if no complete tuple. + """ + buffer = buffer.lstrip() + + # Skip comma between tuples + if buffer.startswith(","): + buffer = buffer[1:].lstrip() + + if not buffer.startswith("("): + return None + + # Find matching closing parenthesis + depth = 0 + in_string = False + string_char = None + i = 0 + + while i < len(buffer): + char = buffer[i] + + if not in_string: + if char in ("'", '"'): + in_string = True + string_char = char + elif char == "(": + depth += 1 + elif char == ")": + depth -= 1 + if depth == 0: + # Found complete tuple + tuple_content = buffer[1:i] # Exclude parentheses + remaining = buffer[i + 1:] + values = self._parse_values(tuple_content) + return (values, remaining) + else: + # Inside string + if char == string_char: + # Check for escaped quote + if i + 1 < len(buffer) and buffer[i + 1] == string_char: + i += 1 # Skip escaped quote + elif i > 0 and buffer[i - 1] == "\\": + pass # Backslash escaped, continue + else: + in_string = False + string_char = None + + i += 1 + + # No complete tuple found yet + return None + + def _parse_values(self, values_str: str) -> Optional[List[str]]: + """ + Parse a comma-separated values string, handling quoted strings and NULL. + Format: value1,'string value',value3,... 
+ """ + values = [] + current_value = "" + in_quotes = False + quote_char = None + i = 0 + just_ended_quote = False + + while i < len(values_str): + char = values_str[i] + + if not in_quotes: + if just_ended_quote: + # After a quoted string, skip to comma or end + if char == ",": + just_ended_quote = False + i += 1 + continue + else: + i += 1 + continue + + if char in ("'", '"'): + # Start of quoted string + in_quotes = True + quote_char = char + current_value = "" + elif char == ",": + # End of unquoted value + val = current_value.strip() + if val.upper() == "NULL": + values.append(None) + else: + values.append(val if val else None) + current_value = "" + else: + current_value += char + else: + # Inside quotes + if char == quote_char: + # Check for escaped quote (double quote) + if i + 1 < len(values_str) and values_str[i + 1] == quote_char: + current_value += char + i += 1 + else: + # End of quoted string + in_quotes = False + values.append(current_value) + current_value = "" + just_ended_quote = True + elif char == "\\" and i + 1 < len(values_str) and values_str[i + 1] == quote_char: + # Backslash escaped quote + current_value += values_str[i + 1] + i += 1 + else: + current_value += char + + i += 1 + + # Handle last value (unquoted) + if current_value.strip() and not just_ended_quote: + val = current_value.strip() + if val.upper() == "NULL": + values.append(None) + else: + values.append(val) + + return values if values else None + + +class HistoricalDataMigrator: + """ + Migrates historical OHLCV data from MySQL dumps to PostgreSQL. 
+ """ + + def __init__( + self, + dry_run: bool = False, + include_indicators: bool = False, + verbose: bool = False + ): + self.dry_run = dry_run + self.include_indicators = include_indicators + self.verbose = verbose + self.stats = MigrationStats() + self.conn: Optional[psycopg2.extensions.connection] = None + self.ticker_id_cache: Dict[str, int] = {} + self.indicator_cache: Dict[int, Dict] = {} # Cache indicators by legacy id + + def connect(self) -> None: + """Establish database connection.""" + logger.info(f"Connecting to PostgreSQL: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}") + + if self.dry_run: + logger.info("DRY RUN mode - no database changes will be made") + return + + self.conn = psycopg2.connect( + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + user=DB_USER, + password=DB_PASSWORD + ) + self.conn.autocommit = False + + # Load existing ticker mappings + self._load_ticker_cache() + + def disconnect(self) -> None: + """Close database connection.""" + if self.conn: + self.conn.close() + self.conn = None + + def _load_ticker_cache(self) -> None: + """Load ticker symbol to ID mapping from database.""" + if not self.conn: + return + + with self.conn.cursor() as cur: + cur.execute(""" + SELECT symbol, id FROM market_data.tickers + """) + for row in cur.fetchall(): + self.ticker_id_cache[row[0].upper()] = row[1] + + logger.info(f"Loaded {len(self.ticker_id_cache)} tickers from database") + + def _ensure_ticker_exists(self, ticker: str) -> Optional[int]: + """ + Get ticker ID, creating the ticker if it doesn't exist. + Returns None if ticker cannot be created. 
+ """ + ticker_upper = ticker.upper() + + if ticker_upper in self.ticker_id_cache: + return self.ticker_id_cache[ticker_upper] + + if self.dry_run: + # In dry run, simulate auto-increment + fake_id = len(self.ticker_id_cache) + 1000 + self.ticker_id_cache[ticker_upper] = fake_id + logger.info(f"DRY RUN: Would create ticker {ticker_upper} with ID {fake_id}") + return fake_id + + # Determine asset type based on ticker name + asset_type = self._infer_asset_type(ticker_upper) + base_currency, quote_currency = self._parse_currencies(ticker_upper) + + try: + with self.conn.cursor() as cur: + cur.execute(""" + INSERT INTO market_data.tickers + (symbol, name, asset_type, base_currency, quote_currency, is_active, is_ml_enabled) + VALUES (%s, %s, %s, %s, %s, true, true) + ON CONFLICT (symbol) DO UPDATE SET updated_at = NOW() + RETURNING id + """, ( + ticker_upper, + f"{base_currency}/{quote_currency} (Legacy Import)", + asset_type, + base_currency, + quote_currency + )) + ticker_id = cur.fetchone()[0] + self.conn.commit() + self.ticker_id_cache[ticker_upper] = ticker_id + logger.info(f"Created ticker {ticker_upper} with ID {ticker_id}") + return ticker_id + except Exception as e: + logger.error(f"Failed to create ticker {ticker_upper}: {e}") + self.conn.rollback() + return None + + def _infer_asset_type(self, ticker: str) -> str: + """Infer asset type from ticker symbol.""" + if ticker.startswith("XAU") or ticker.startswith("XAG"): + return "commodity" + elif ticker.startswith("BTC") or ticker.startswith("ETH"): + return "crypto" + elif ticker in ("SPX500", "NAS100", "DJI30", "DAX40"): + return "index" + else: + return "forex" + + def _parse_currencies(self, ticker: str) -> Tuple[str, str]: + """Parse base and quote currency from ticker.""" + if len(ticker) >= 6: + return ticker[:3], ticker[3:6] + return ticker, "USD" + + def _ensure_historical_table_exists(self) -> None: + """Create the ohlcv_historical table if it doesn't exist.""" + if self.dry_run or not self.conn: + 
            return

        with self.conn.cursor() as cur:
            # Idempotent DDL: IF NOT EXISTS on table and indexes allows this
            # to run at the start of every migration.
            cur.execute("""
                CREATE TABLE IF NOT EXISTS market_data.ohlcv_historical (
                    id BIGSERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL REFERENCES market_data.tickers(id),
                    timestamp TIMESTAMPTZ NOT NULL,
                    open DECIMAL(20,8) NOT NULL,
                    high DECIMAL(20,8) NOT NULL,
                    low DECIMAL(20,8) NOT NULL,
                    close DECIMAL(20,8) NOT NULL,
                    volume DECIMAL(20,4) DEFAULT 0,
                    vwap DECIMAL(20,8),
                    ts_epoch BIGINT,
                    period_interval INTEGER DEFAULT 5,

                    -- Technical Indicators
                    macd DECIMAL(20,8),
                    macd_signal DECIMAL(20,8),
                    macd_hist DECIMAL(20,8),
                    sma_10 DECIMAL(20,8),
                    sma_20 DECIMAL(20,8),
                    atr DECIMAL(20,8),
                    sar DECIMAL(20,8),
                    mfi DECIMAL(20,8),
                    rsi DECIMAL(20,8),
                    fractal_alcista DECIMAL(20,8),
                    fractal_bajista DECIMAL(20,8),
                    obv DECIMAL(20,8),
                    ad DECIMAL(20,8),
                    cmf DECIMAL(20,8),
                    volume_z DECIMAL(20,8),
                    volume_anomaly BOOLEAN,

                    -- Metadata
                    source VARCHAR(50) DEFAULT 'legacy_mysql',
                    legacy_id INTEGER,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

                    CONSTRAINT ohlcv_historical_unique UNIQUE (ticker_id, timestamp, period_interval)
                );

                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_ticker_ts
                    ON market_data.ohlcv_historical(ticker_id, timestamp DESC);
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_timestamp
                    ON market_data.ohlcv_historical(timestamp DESC);
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_source
                    ON market_data.ohlcv_historical(source);
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_legacy_id
                    ON market_data.ohlcv_historical(legacy_id) WHERE legacy_id IS NOT NULL;

                COMMENT ON TABLE market_data.ohlcv_historical IS
                    'Historical OHLCV data migrated from legacy MySQL database';
            """)
            self.conn.commit()

        logger.info("Ensured ohlcv_historical table exists")

    def _parse_ohlcv_values(self, values: List[Optional[str]]) -> Optional[OHLCVRecord]:
        """
        Parse values from tickers_agg_data INSERT statement.
        Expected format: (id, ticker, date_agg, open, close, high, low, volume, vwap, ts, periodint)

        Returns None (and optionally logs, when verbose) on any parse failure.
        """
        if len(values) < 11:
            return None

        try:
            record = OHLCVRecord(
                id=int(values[0]),
                ticker=str(values[1]).strip(),
                date_agg=self._parse_datetime(values[2]),
                open=self._parse_decimal(values[3]),
                close=self._parse_decimal(values[4]),
                high=self._parse_decimal(values[5]),
                low=self._parse_decimal(values[6]),
                volume=self._parse_decimal(values[7]),
                vwap=self._parse_decimal(values[8]),
                ts=int(values[9]),
                periodint=int(values[10])
            )
            return record
        except (ValueError, TypeError, InvalidOperation) as e:
            if self.verbose:
                logger.warning(f"Failed to parse OHLCV values: {values[:5]}... Error: {e}")
            return None

    def _parse_indicator_values(self, values: List[Optional[str]]) -> Optional[Dict]:
        """
        Parse values from tickers_agg_ind_data INSERT statement.
        Expected format: (id, MACD, MACD_signal, MACD_hist, SMA_10, SMA_20, ATR, SAR, MFI, RSI,
                          Fractal_Alcista, Fractal_Bajista, OBV, AD, CMF, volume_z, volume_anomaly, hour)

        The "id" matches tickers_agg_data.id (same legacy primary key), which
        is how indicators are joined to OHLCV rows during migration.
        """
        if len(values) < 18:
            return None

        try:
            return {
                "id": int(values[0]),
                "macd": self._parse_decimal_nullable(values[1]),
                "macd_signal": self._parse_decimal_nullable(values[2]),
                "macd_hist": self._parse_decimal_nullable(values[3]),
                "sma_10": self._parse_decimal_nullable(values[4]),
                "sma_20": self._parse_decimal_nullable(values[5]),
                "atr": self._parse_decimal_nullable(values[6]),
                "sar": self._parse_decimal_nullable(values[7]),
                "mfi": self._parse_decimal_nullable(values[8]),
                "rsi": self._parse_decimal_nullable(values[9]),
                "fractal_alcista": self._parse_decimal_nullable(values[10]),
                "fractal_bajista": self._parse_decimal_nullable(values[11]),
                "obv": self._parse_decimal_nullable(values[12]),
                "ad": self._parse_decimal_nullable(values[13]),
                "cmf": self._parse_decimal_nullable(values[14]),
                "volume_z": self._parse_decimal_nullable(values[15]),
                # MySQL tinyint flag: "1" -> True, "0" -> False, NULL -> None.
                "volume_anomaly": values[16] == "1" if values[16] else None,
                "hour": self._parse_datetime_nullable(values[17])
            }
        except (ValueError, TypeError, InvalidOperation) as e:
            if self.verbose:
                logger.warning(f"Failed to parse indicator values: {values[:5]}... Error: {e}")
            return None

    def _parse_datetime(self, value: str) -> datetime:
        """Parse datetime string from MySQL format.

        Raises ValueError on empty input or unknown format.
        """
        if not value:
            raise ValueError("Empty datetime value")
        value = value.strip()

        # Try common MySQL datetime formats
        formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M:%S.%f",
            "%Y-%m-%d",
        ]

        for fmt in formats:
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue

        raise ValueError(f"Cannot parse datetime: {value}")

    def _parse_datetime_nullable(self, value: Optional[str]) -> Optional[datetime]:
        """Parse nullable datetime; unparseable values become None."""
        if not value or value.upper() == "NULL":
            return None
        try:
            return self._parse_datetime(value)
        except ValueError:
            return None

    def _parse_decimal(self, value: Optional[str]) -> Decimal:
        """Parse decimal value; NULL/empty is coerced to Decimal('0').

        NOTE(review): this makes a NULL vwap indistinguishable from a true
        zero downstream — confirm the legacy data never uses NULL to mean
        "unknown" for the required OHLCV columns.
        """
        if not value or value.upper() == "NULL":
            return Decimal("0")
        return Decimal(str(value).strip())

    def _parse_decimal_nullable(self, value: Optional[str]) -> Optional[Decimal]:
        """Parse nullable decimal value; NULL/unparseable become None."""
        if not value or value.upper() == "NULL":
            return None
        try:
            return Decimal(str(value).strip())
        except InvalidOperation:
            return None

    def _validate_record(self, record: OHLCVRecord) -> bool:
        """Validate an OHLCV record before insertion.

        Rejects rows with missing ticker, non-positive OHLC prices, or an
        inconsistent high/low envelope.
        """
        # Basic validation
        if not record.ticker:
            return False

        # Check OHLC values are positive
        if record.open <= 0 or record.high <= 0 or record.low <= 0 or record.close <= 0:
            return False

        # Check high >= low
        if record.high < record.low:
            return False

        # Check high >= open and close
        if record.high < record.open or record.high < record.close:
            return False

        # Check low <= open and close
        if record.low > record.open or record.low > record.close:
            return False

        return True

    def load_indicators_cache(self, indicators_file: Path) -> int:
        """
        Pre-load indicators from dump file into memory cache.
        This enables joining indicators with OHLCV data during migration.

        Returns the number of indicator rows cached (0 if the file is missing).
        NOTE(review): the whole indicator table is held in RAM — for very
        large db_res.sql dumps this may be the memory bottleneck.
        """
        if not indicators_file.exists():
            logger.warning(f"Indicators file not found: {indicators_file}")
            return 0

        logger.info(f"Loading indicators from: {indicators_file}")
        parser = MySQLDumpParser(indicators_file, "tickers_agg_ind_data")

        count = 0
        file_size = parser.get_file_size()

        with tqdm(total=file_size, unit="B", unit_scale=True, desc="Loading indicators") as pbar:
            bytes_processed = 0
            for values in parser.stream_records():
                indicator = self._parse_indicator_values(values)
                if indicator:
                    self.indicator_cache[indicator["id"]] = indicator
                    count += 1

                # Update progress using actual bytes read
                if count % 10000 == 0:
                    current_bytes = parser.get_bytes_read()
                    if current_bytes > bytes_processed:
                        pbar.update(current_bytes - bytes_processed)
                        bytes_processed = current_bytes

        logger.info(f"Loaded {count:,} indicators into cache")
        return count

    def migrate_ohlcv_data(
        self,
        dump_file: Path,
        start_from_id: int = 0,
        limit: Optional[int] = None
    ) -> MigrationStats:
        """
        Migrate OHLCV data from MySQL dump to PostgreSQL.

        Args:
            dump_file: Path to the MySQL dump file
            start_from_id: Skip records with ID less than this value (for resumption)
            limit: Maximum number of records to migrate (for testing)

        Returns:
            The shared MigrationStats object (also updated in place).

        Raises:
            FileNotFoundError: if dump_file does not exist.
        """
        if not dump_file.exists():
            raise FileNotFoundError(f"Dump file not found: {dump_file}")

        self._ensure_historical_table_exists()

        parser = MySQLDumpParser(dump_file, "tickers_agg_data")
        file_size = parser.get_file_size()

        logger.info(f"Starting migration from: {dump_file}")
        logger.info(f"File size: {file_size / (1024*1024*1024):.2f} GB")
        logger.info(f"Batch size: {BATCH_SIZE:,}")
        logger.info(f"Include indicators: {self.include_indicators}")
        if start_from_id > 0:
            logger.info(f"Resuming from ID: {start_from_id:,}")
        if limit:
            logger.info(f"Limit: {limit:,} records")

        batch: List[Tuple] = []
        records_migrated = 0

        with tqdm(total=file_size, unit="B", unit_scale=True, desc="Migrating OHLCV") as pbar:
            bytes_processed = 0

            for values in parser.stream_records():
                self.stats.total_inserts_parsed += 1

                record = self._parse_ohlcv_values(values)
                if not record:
                    self.stats.total_errors += 1
                    continue

                self.stats.total_rows_parsed += 1

                # Skip if resuming from a specific ID
                if record.id < start_from_id:
                    continue

                # Validate record
                if not self._validate_record(record):
                    self.stats.total_rows_skipped += 1
                    continue

                # Get ticker ID (auto-creates unknown tickers)
                ticker_id = self._ensure_ticker_exists(record.ticker)
                if not ticker_id:
                    self.stats.total_rows_skipped += 1
                    continue

                # Track tickers
                self.stats.tickers_found[record.ticker] += 1

                # Get indicators if available (joined by shared legacy id)
                indicators = None
                if self.include_indicators and record.id in self.indicator_cache:
                    indicators = self.indicator_cache[record.id]

                # Build tuple for insertion
                row = self._build_insert_tuple(record, ticker_id, indicators)
                batch.append(row)

                # Insert batch when full
                if len(batch) >= BATCH_SIZE:
                    inserted = self._insert_batch(batch)
self.stats.total_rows_inserted += inserted + records_migrated += inserted + batch = [] + + # Update progress using actual bytes read + current_bytes = parser.get_bytes_read() + if current_bytes > bytes_processed: + pbar.update(current_bytes - bytes_processed) + bytes_processed = current_bytes + + # Log progress + if self.stats.total_rows_inserted % 100000 == 0: + logger.info( + f"Progress: {self.stats.total_rows_inserted:,} rows inserted, " + f"{self.stats.rows_per_second():.0f} rows/sec" + ) + + # Check limit + if limit and records_migrated >= limit: + logger.info(f"Reached limit of {limit:,} records") + break + + # Insert remaining batch + if batch: + inserted = self._insert_batch(batch) + self.stats.total_rows_inserted += inserted + + return self.stats + + def _build_insert_tuple( + self, + record: OHLCVRecord, + ticker_id: int, + indicators: Optional[Dict] + ) -> Tuple: + """Build a tuple for batch insertion.""" + return ( + ticker_id, + record.date_agg, + float(record.open), + float(record.high), + float(record.low), + float(record.close), + float(record.volume), + float(record.vwap) if record.vwap else None, + record.ts, + record.periodint, + # Indicators + float(indicators["macd"]) if indicators and indicators.get("macd") else None, + float(indicators["macd_signal"]) if indicators and indicators.get("macd_signal") else None, + float(indicators["macd_hist"]) if indicators and indicators.get("macd_hist") else None, + float(indicators["sma_10"]) if indicators and indicators.get("sma_10") else None, + float(indicators["sma_20"]) if indicators and indicators.get("sma_20") else None, + float(indicators["atr"]) if indicators and indicators.get("atr") else None, + float(indicators["sar"]) if indicators and indicators.get("sar") else None, + float(indicators["mfi"]) if indicators and indicators.get("mfi") else None, + float(indicators["rsi"]) if indicators and indicators.get("rsi") else None, + float(indicators["fractal_alcista"]) if indicators and 
indicators.get("fractal_alcista") else None, + float(indicators["fractal_bajista"]) if indicators and indicators.get("fractal_bajista") else None, + float(indicators["obv"]) if indicators and indicators.get("obv") else None, + float(indicators["ad"]) if indicators and indicators.get("ad") else None, + float(indicators["cmf"]) if indicators and indicators.get("cmf") else None, + float(indicators["volume_z"]) if indicators and indicators.get("volume_z") else None, + indicators.get("volume_anomaly") if indicators else None, + "legacy_mysql", + record.id + ) + + def _insert_batch(self, batch: List[Tuple], retry: int = 0) -> int: + """Insert a batch of records into PostgreSQL.""" + if self.dry_run: + return len(batch) + + if not self.conn or not batch: + return 0 + + try: + with self.conn.cursor() as cur: + execute_values( + cur, + """ + INSERT INTO market_data.ohlcv_historical ( + ticker_id, timestamp, open, high, low, close, volume, vwap, + ts_epoch, period_interval, + macd, macd_signal, macd_hist, sma_10, sma_20, atr, sar, mfi, rsi, + fractal_alcista, fractal_bajista, obv, ad, cmf, volume_z, volume_anomaly, + source, legacy_id + ) VALUES %s + ON CONFLICT (ticker_id, timestamp, period_interval) DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + vwap = EXCLUDED.vwap, + macd = COALESCE(EXCLUDED.macd, market_data.ohlcv_historical.macd), + macd_signal = COALESCE(EXCLUDED.macd_signal, market_data.ohlcv_historical.macd_signal), + macd_hist = COALESCE(EXCLUDED.macd_hist, market_data.ohlcv_historical.macd_hist), + sma_10 = COALESCE(EXCLUDED.sma_10, market_data.ohlcv_historical.sma_10), + sma_20 = COALESCE(EXCLUDED.sma_20, market_data.ohlcv_historical.sma_20), + atr = COALESCE(EXCLUDED.atr, market_data.ohlcv_historical.atr), + sar = COALESCE(EXCLUDED.sar, market_data.ohlcv_historical.sar), + mfi = COALESCE(EXCLUDED.mfi, market_data.ohlcv_historical.mfi), + rsi = COALESCE(EXCLUDED.rsi, 
market_data.ohlcv_historical.rsi), + fractal_alcista = COALESCE(EXCLUDED.fractal_alcista, market_data.ohlcv_historical.fractal_alcista), + fractal_bajista = COALESCE(EXCLUDED.fractal_bajista, market_data.ohlcv_historical.fractal_bajista), + obv = COALESCE(EXCLUDED.obv, market_data.ohlcv_historical.obv), + ad = COALESCE(EXCLUDED.ad, market_data.ohlcv_historical.ad), + cmf = COALESCE(EXCLUDED.cmf, market_data.ohlcv_historical.cmf), + volume_z = COALESCE(EXCLUDED.volume_z, market_data.ohlcv_historical.volume_z), + volume_anomaly = COALESCE(EXCLUDED.volume_anomaly, market_data.ohlcv_historical.volume_anomaly) + """, + batch, + page_size=1000 + ) + self.conn.commit() + return len(batch) + + except Exception as e: + self.conn.rollback() + + if retry < MAX_RETRIES: + logger.warning(f"Batch insert failed (retry {retry + 1}/{MAX_RETRIES}): {e}") + time.sleep(RETRY_DELAY) + return self._insert_batch(batch, retry + 1) + else: + logger.error(f"Batch insert failed after {MAX_RETRIES} retries: {e}") + self.stats.total_errors += len(batch) + return 0 + + def print_summary(self) -> None: + """Print migration summary.""" + elapsed = self.stats.elapsed_time() + + print("\n" + "=" * 70) + print("MIGRATION SUMMARY") + print("=" * 70) + print(f" Mode: {'DRY RUN' if self.dry_run else 'LIVE'}") + print(f" Elapsed time: {elapsed / 60:.1f} minutes") + print(f" Lines processed: {self.stats.total_lines_processed:,}") + print(f" INSERT statements: {self.stats.total_inserts_parsed:,}") + print(f" Rows parsed: {self.stats.total_rows_parsed:,}") + print(f" Rows inserted: {self.stats.total_rows_inserted:,}") + print(f" Rows skipped: {self.stats.total_rows_skipped:,}") + print(f" Errors: {self.stats.total_errors:,}") + print(f" Throughput: {self.stats.rows_per_second():.0f} rows/sec") + print() + print(" Tickers found:") + for ticker, count in sorted(self.stats.tickers_found.items(), key=lambda x: -x[1]): + print(f" - {ticker}: {count:,} rows") + print("=" * 70) + + +def main(): + """Main entry 
point.""" + global BATCH_SIZE + + parser = argparse.ArgumentParser( + description="Migrate historical OHLCV data from MySQL dumps to PostgreSQL", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Dry run to test parsing + python migrate_historical_data.py --dry-run --limit 1000 + + # Migrate OHLCV data only + python migrate_historical_data.py --file db.sql + + # Migrate with indicators + python migrate_historical_data.py --file db.sql --indicators db_res.sql + + # Resume from specific ID + python migrate_historical_data.py --file db.sql --start-from 5000000 + """ + ) + + parser.add_argument( + "--file", "-f", + type=Path, + default=DEFAULT_OHLCV_DUMP, + help=f"MySQL dump file with OHLCV data (default: {DEFAULT_OHLCV_DUMP})" + ) + parser.add_argument( + "--indicators", "-i", + type=Path, + default=None, + help="MySQL dump file with indicator data (e.g., db_res.sql)" + ) + parser.add_argument( + "--dry-run", "-d", + action="store_true", + help="Parse and validate without inserting to database" + ) + parser.add_argument( + "--limit", "-l", + type=int, + default=None, + help="Maximum number of records to migrate (for testing)" + ) + parser.add_argument( + "--start-from", "-s", + type=int, + default=0, + help="Skip records with ID less than this value (for resumption)" + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Enable verbose logging" + ) + parser.add_argument( + "--batch-size", "-b", + type=int, + default=BATCH_SIZE, + help=f"Batch size for inserts (default: {BATCH_SIZE})" + ) + + args = parser.parse_args() + + # Update batch size if specified + BATCH_SIZE = args.batch_size + + # Print banner + print("=" * 70) + print("Historical Data Migration Tool - Trading Platform") + print("=" * 70) + print(f" Source file: {args.file}") + print(f" Indicators file: {args.indicators or 'None'}") + print(f" Dry run: {args.dry_run}") + print(f" Batch size: {BATCH_SIZE:,}") + print(f" Limit: {args.limit or 'None'}") 
+ print(f" Start from ID: {args.start_from}") + print("=" * 70) + + # Verify dump file exists + if not args.file.exists(): + logger.error(f"Dump file not found: {args.file}") + sys.exit(1) + + # Create migrator + migrator = HistoricalDataMigrator( + dry_run=args.dry_run, + include_indicators=args.indicators is not None, + verbose=args.verbose + ) + + try: + # Connect to database + migrator.connect() + + # Load indicators if specified + if args.indicators: + if args.indicators.exists(): + migrator.load_indicators_cache(args.indicators) + else: + logger.warning(f"Indicators file not found: {args.indicators}") + + # Run migration + stats = migrator.migrate_ohlcv_data( + dump_file=args.file, + start_from_id=args.start_from, + limit=args.limit + ) + + # Print summary + migrator.print_summary() + + # Exit with appropriate code + if stats.total_errors > 0: + logger.warning(f"Migration completed with {stats.total_errors} errors") + sys.exit(1) + else: + logger.info("Migration completed successfully") + sys.exit(0) + + except KeyboardInterrupt: + logger.info("\nMigration interrupted by user") + migrator.print_summary() + sys.exit(130) + + except Exception as e: + logger.exception(f"Migration failed: {e}") + sys.exit(1) + + finally: + migrator.disconnect() + + +if __name__ == "__main__": + main()