trading-platform-data-servi.../scripts/migrate_historical_data.py
Adrian Flores Cortes 96e80a64f5 feat: Update API examples and add migration script
Updates:
- api_examples.sh: Updated API usage examples
- requirements.txt: Updated dependencies

New:
- migrate_historical_data.py: Script for historical data migration

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 12:24:57 -06:00

1079 lines
40 KiB
Python

#!/usr/bin/env python3
"""
Historical Data Migration Script - Trading Platform
Migrates legacy MySQL dumps to PostgreSQL market_data schema.
Handles:
- tickers_agg_data: OHLCV data (from db.sql)
- tickers_agg_ind_data: Technical indicators (from db_res.sql)
Usage:
python scripts/migrate_historical_data.py --dry-run
python scripts/migrate_historical_data.py --file db.sql
python scripts/migrate_historical_data.py --file db_res.sql --include-indicators
Environment variables:
DB_HOST - PostgreSQL host (default: localhost)
DB_PORT - PostgreSQL port (default: 5432)
DB_NAME - Database name (default: trading_platform)
DB_USER - Database user (default: trading_user)
DB_PASSWORD - Database password (default: trading_dev_2026)
Author: SIMCO Migration Tool
Date: 2026-01-25
"""
import os
import re  # NOTE(review): unused in this file as shown -- confirm before removing
import sys
import argparse
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Generator, Optional, Dict, List, Tuple, Any
from dataclasses import dataclass, field
from decimal import Decimal, InvalidOperation
from collections import defaultdict
# Third-party dependencies (see requirements.txt).
import psycopg2
from psycopg2 import sql  # NOTE(review): unused in this file as shown
from psycopg2.extras import execute_values
from tqdm import tqdm
from dotenv import load_dotenv
# Load environment variables from the repository-root .env file.
load_dotenv(Path(__file__).parent.parent / ".env")
# PostgreSQL connection settings, overridable via environment variables.
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "trading_platform")
DB_USER = os.getenv("DB_USER", "trading_user")
DB_PASSWORD = os.getenv("DB_PASSWORD", "trading_dev_2026")
# Default dump file paths (legacy workstation layout; override with --file).
DEFAULT_OHLCV_DUMP = Path(r"C:\Empresas\WorkspaceOld\Projects\trading\db.sql")
DEFAULT_INDICATORS_DUMP = Path(r"C:\Empresas\WorkspaceOld\Projects\trading\db_res.sql")
# Batch/retry tuning for PostgreSQL inserts.
BATCH_SIZE = 10000  # rows per execute_values batch; overridable via --batch-size
MAX_RETRIES = 3  # attempts beyond the first for a failed batch insert
RETRY_DELAY = 5 # seconds
# Log to both the console and a migration.log file next to this script.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(Path(__file__).parent / "migration.log", encoding="utf-8")
    ]
)
logger = logging.getLogger(__name__)
@dataclass
class MigrationStats:
    """Counters and timing for a single migration run."""
    # Parser-side counters
    total_lines_processed: int = 0
    total_inserts_parsed: int = 0
    total_rows_parsed: int = 0
    # Database-side counters
    total_rows_inserted: int = 0
    total_rows_skipped: int = 0
    total_errors: int = 0
    # Per-ticker row counts (symbol -> rows seen)
    tickers_found: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    # Wall-clock time this stats object was created
    start_time: float = field(default_factory=time.time)

    def elapsed_time(self) -> float:
        """Seconds of wall-clock time since this stats object was created."""
        now = time.time()
        return now - self.start_time

    def rows_per_second(self) -> float:
        """Average insert throughput so far; 0 when no time has elapsed."""
        elapsed = self.elapsed_time()
        return self.total_rows_inserted / elapsed if elapsed > 0 else 0
@dataclass
class OHLCVRecord:
    """Represents an OHLCV record from the legacy MySQL database.

    Field order mirrors the legacy `tickers_agg_data` column order
    (id, ticker, date_agg, open, close, high, low, volume, vwap, ts,
    periodint). The optional fields mirror `tickers_agg_ind_data` and
    are populated only when an indicators dump is joined by legacy id.
    """
    # Core columns from tickers_agg_data
    id: int  # legacy auto-increment primary key (shared with indicator rows)
    ticker: str  # symbol, e.g. "EURUSD"
    date_agg: datetime  # bar timestamp as stored in MySQL (naive)
    open: Decimal
    close: Decimal
    high: Decimal
    low: Decimal
    volume: Decimal
    vwap: Decimal  # volume-weighted average price
    ts: int  # epoch timestamp column from the dump
    periodint: int  # bar period; presumably minutes (table default is 5) -- TODO confirm
    # Optional indicator fields (from tickers_agg_ind_data)
    macd: Optional[Decimal] = None
    macd_signal: Optional[Decimal] = None
    macd_hist: Optional[Decimal] = None
    sma_10: Optional[Decimal] = None
    sma_20: Optional[Decimal] = None
    atr: Optional[Decimal] = None
    sar: Optional[Decimal] = None
    mfi: Optional[Decimal] = None
    rsi: Optional[Decimal] = None
    fractal_alcista: Optional[Decimal] = None  # bullish fractal (Spanish legacy name)
    fractal_bajista: Optional[Decimal] = None  # bearish fractal (Spanish legacy name)
    obv: Optional[Decimal] = None
    ad: Optional[Decimal] = None
    cmf: Optional[Decimal] = None
    volume_z: Optional[Decimal] = None
    volume_anomaly: Optional[bool] = None
    hour: Optional[datetime] = None
class MySQLDumpParser:
    """Streaming parser for MySQL dump files.

    Locates the ``LOCK TABLES `<table>` WRITE`` section for the target
    table, then extracts every row tuple from the extended ``INSERT INTO``
    statements that follow, until ``UNLOCK TABLES``. Input is consumed in
    fixed-size chunks and unconsumed text is kept bounded, so arbitrarily
    long INSERT lines never have to fit in memory at once.

    Fixes over the previous per-character implementation:
      * the buffer is consumed greedily per chunk instead of re-scanning
        the whole buffer on every character (which was quadratic), and
      * finishing one INSERT statement no longer discards the remainder
        of the current read chunk, so later INSERTs in the same chunk are
        not silently skipped.
    """

    # Read granularity; also bounds how much unconsumed text is retained.
    _READ_SIZE = 65536  # 64 KiB

    def __init__(self, file_path: Path, target_table: str):
        """
        Args:
            file_path: Path to the MySQL dump (.sql) file.
            target_table: Name of the table whose rows should be extracted.
        """
        self.file_path = file_path
        self.target_table = target_table
        self._file_size = file_path.stat().st_size if file_path.exists() else 0
        self._bytes_read = 0

    def get_file_size(self) -> int:
        """Total size of the dump file in bytes (0 if the file is missing)."""
        return self._file_size

    def get_bytes_read(self) -> int:
        """Bytes consumed so far; used by callers to drive progress bars."""
        return self._bytes_read

    def stream_records(self) -> Generator[List[str], None, None]:
        """Yield one list of raw column values per row of the target table.

        Values are strings exactly as they appear in the dump; SQL NULL and
        empty unquoted values are yielded as ``None``.

        Raises:
            FileNotFoundError: If the dump file does not exist.
        """
        if not self.file_path.exists():
            raise FileNotFoundError(f"Dump file not found: {self.file_path}")
        self._bytes_read = 0
        # mysqldump emits "INSERT INTO `table` VALUES (...),(...);" between
        # LOCK/UNLOCK markers -- assumes standard mysqldump formatting.
        lock_marker = f"LOCK TABLES `{self.target_table}` WRITE"
        insert_marker = f"INSERT INTO `{self.target_table}`"
        unlock_marker = "UNLOCK TABLES"
        in_target = False  # inside our table's LOCK...UNLOCK section
        in_insert = False  # currently consuming tuples of an INSERT
        buffer = ""
        with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
            while True:
                chunk = f.read(self._READ_SIZE)
                if not chunk:
                    break
                # Progress is tracked in encoded bytes to line up with the
                # on-disk file size reported by get_file_size().
                self._bytes_read += len(chunk.encode("utf-8", errors="replace"))
                buffer += chunk
                # Consume as much of the buffer as possible before reading more.
                while True:
                    if in_insert:
                        result = self._extract_tuple(buffer)
                        if result is not None:
                            values, buffer = result
                            if values:
                                yield values
                            continue
                        stripped = buffer.lstrip()
                        if stripped.startswith(";"):
                            # End of this INSERT; keep scanning the remainder
                            # so later INSERTs in the same chunk are processed.
                            in_insert = False
                            buffer = stripped[1:]
                            continue
                        break  # incomplete tuple -- need more data
                    if not in_target:
                        idx = buffer.find(lock_marker)
                        if idx < 0:
                            # Keep only a tail long enough to complete a
                            # marker that straddles the chunk boundary.
                            buffer = buffer[-len(lock_marker):]
                            break
                        in_target = True
                        buffer = buffer[idx + len(lock_marker):]
                        continue
                    # Inside the target section: the next event is either an
                    # INSERT for our table or the closing UNLOCK TABLES.
                    ins_idx = buffer.find(insert_marker)
                    unlock_idx = buffer.find(unlock_marker)
                    if ins_idx >= 0 and (unlock_idx < 0 or ins_idx < unlock_idx):
                        values_idx = buffer.upper().find("VALUES", ins_idx)
                        if values_idx < 0:
                            break  # INSERT header incomplete -- need more data
                        in_insert = True
                        buffer = buffer[values_idx + len("VALUES"):].lstrip()
                        continue
                    if unlock_idx >= 0:
                        in_target = False
                        buffer = buffer[unlock_idx + len(unlock_marker):]
                        continue
                    # No marker found yet; retain a bounded tail and read more.
                    keep = max(len(insert_marker), len(unlock_marker))
                    if len(buffer) > keep:
                        buffer = buffer[-keep:]
                    break
        # Flush a trailing tuple not followed by ';' (e.g. truncated dump).
        if in_insert and buffer.strip():
            result = self._extract_tuple(buffer)
            if result:
                values, _ = result
                if values:
                    yield values

    def _extract_tuple(self, buffer: str) -> Optional[Tuple[List[str], str]]:
        """Extract the first complete tuple from buffer.

        Returns (parsed_values, remaining_buffer), or None when the buffer
        does not yet contain a complete ``(...)`` tuple.
        """
        buffer = buffer.lstrip()
        # Skip the comma separating consecutive tuples.
        if buffer.startswith(","):
            buffer = buffer[1:].lstrip()
        if not buffer.startswith("("):
            return None
        # Scan for the matching closing parenthesis, tracking quoted strings
        # so parentheses inside SQL string literals are ignored.
        depth = 0
        in_string = False
        string_char = None
        i = 0
        while i < len(buffer):
            char = buffer[i]
            if not in_string:
                if char in ("'", '"'):
                    in_string = True
                    string_char = char
                elif char == "(":
                    depth += 1
                elif char == ")":
                    depth -= 1
                    if depth == 0:
                        # Found a complete tuple.
                        tuple_content = buffer[1:i]  # exclude the outer parens
                        remaining = buffer[i + 1:]
                        values = self._parse_values(tuple_content)
                        return (values, remaining)
            else:
                # Inside a string literal.
                if char == string_char:
                    if i + 1 < len(buffer) and buffer[i + 1] == string_char:
                        i += 1  # doubled quote ('' / "") -- stay in string
                    elif i > 0 and buffer[i - 1] == "\\":
                        pass  # backslash-escaped quote -- stay in string
                    else:
                        in_string = False
                        string_char = None
            i += 1
        # No complete tuple yet -- caller should supply more data.
        return None

    def _parse_values(self, values_str: str) -> Optional[List[str]]:
        """Parse a comma-separated values string, handling quotes and NULL.

        Unquoted ``NULL`` (and empty unquoted values) become ``None``;
        quoted values are returned with quote escapes resolved.
        """
        values = []
        current_value = ""
        in_quotes = False
        quote_char = None
        i = 0
        just_ended_quote = False
        while i < len(values_str):
            char = values_str[i]
            if not in_quotes:
                if just_ended_quote:
                    # After a quoted string, skip everything up to the comma.
                    if char == ",":
                        just_ended_quote = False
                        i += 1
                        continue
                    else:
                        i += 1
                        continue
                if char in ("'", '"'):
                    # Start of a quoted string.
                    in_quotes = True
                    quote_char = char
                    current_value = ""
                elif char == ",":
                    # End of an unquoted value.
                    val = current_value.strip()
                    if val.upper() == "NULL":
                        values.append(None)
                    else:
                        values.append(val if val else None)
                    current_value = ""
                else:
                    current_value += char
            else:
                # Inside a quoted string.
                if char == quote_char:
                    # Doubled-quote escape ('').
                    if i + 1 < len(values_str) and values_str[i + 1] == quote_char:
                        current_value += char
                        i += 1
                    else:
                        # End of the quoted string.
                        in_quotes = False
                        values.append(current_value)
                        current_value = ""
                        just_ended_quote = True
                elif char == "\\" and i + 1 < len(values_str) and values_str[i + 1] == quote_char:
                    # Backslash-escaped quote.
                    current_value += values_str[i + 1]
                    i += 1
                else:
                    current_value += char
            i += 1
        # Trailing unquoted value (no terminating comma).
        if current_value.strip() and not just_ended_quote:
            val = current_value.strip()
            if val.upper() == "NULL":
                values.append(None)
            else:
                values.append(val)
        return values if values else None
class HistoricalDataMigrator:
    """Migrates historical OHLCV data from MySQL dumps to PostgreSQL.

    Workflow:
      1. ``connect()`` -- open the PostgreSQL connection and warm the
         ticker-symbol -> id cache.
      2. Optionally ``load_indicators_cache()`` -- pre-load the indicator
         dump so rows can be joined to OHLCV rows by legacy id.
      3. ``migrate_ohlcv_data()`` -- stream the OHLCV dump and batch-insert
         into ``market_data.ohlcv_historical``.

    In ``dry_run`` mode no connection is opened and nothing is written,
    but all parsing and validation still runs.
    """

    def __init__(
        self,
        dry_run: bool = False,
        include_indicators: bool = False,
        verbose: bool = False
    ):
        """
        Args:
            dry_run: Parse and validate only; never touch the database.
            include_indicators: Join indicator rows (by legacy id) into inserts.
            verbose: Log individual row parse failures.
        """
        self.dry_run = dry_run
        self.include_indicators = include_indicators
        self.verbose = verbose
        self.stats = MigrationStats()
        self.conn: Optional[psycopg2.extensions.connection] = None
        self.ticker_id_cache: Dict[str, int] = {}
        # Indicator rows keyed by legacy primary-key id (shared with OHLCV rows).
        self.indicator_cache: Dict[int, Dict] = {}

    def connect(self) -> None:
        """Establish the database connection (no-op in dry-run mode)."""
        logger.info(f"Connecting to PostgreSQL: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
        if self.dry_run:
            logger.info("DRY RUN mode - no database changes will be made")
            return
        self.conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            database=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD
        )
        # Explicit transactions: each batch is committed by _insert_batch.
        self.conn.autocommit = False
        # Load existing ticker mappings
        self._load_ticker_cache()

    def disconnect(self) -> None:
        """Close the database connection if open."""
        if self.conn:
            self.conn.close()
            self.conn = None

    def _load_ticker_cache(self) -> None:
        """Load the ticker symbol -> id mapping from the database."""
        if not self.conn:
            return
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT symbol, id FROM market_data.tickers
            """)
            for row in cur.fetchall():
                # Cache keys are upper-cased for case-insensitive lookup.
                self.ticker_id_cache[row[0].upper()] = row[1]
        logger.info(f"Loaded {len(self.ticker_id_cache)} tickers from database")

    def _ensure_ticker_exists(self, ticker: str) -> Optional[int]:
        """Get the ticker id, creating the ticker row if it doesn't exist.

        Returns None if the ticker cannot be created. In dry-run mode a
        fake id is fabricated so downstream logic can be exercised.
        """
        ticker_upper = ticker.upper()
        if ticker_upper in self.ticker_id_cache:
            return self.ticker_id_cache[ticker_upper]
        if self.dry_run:
            # In dry run, simulate auto-increment ids starting above 1000.
            fake_id = len(self.ticker_id_cache) + 1000
            self.ticker_id_cache[ticker_upper] = fake_id
            logger.info(f"DRY RUN: Would create ticker {ticker_upper} with ID {fake_id}")
            return fake_id
        # Heuristics based on the symbol name; good enough for legacy data.
        asset_type = self._infer_asset_type(ticker_upper)
        base_currency, quote_currency = self._parse_currencies(ticker_upper)
        try:
            with self.conn.cursor() as cur:
                # Upsert so a concurrent/earlier insert still returns its id.
                cur.execute("""
                    INSERT INTO market_data.tickers
                    (symbol, name, asset_type, base_currency, quote_currency, is_active, is_ml_enabled)
                    VALUES (%s, %s, %s, %s, %s, true, true)
                    ON CONFLICT (symbol) DO UPDATE SET updated_at = NOW()
                    RETURNING id
                """, (
                    ticker_upper,
                    f"{base_currency}/{quote_currency} (Legacy Import)",
                    asset_type,
                    base_currency,
                    quote_currency
                ))
                ticker_id = cur.fetchone()[0]
                self.conn.commit()
                self.ticker_id_cache[ticker_upper] = ticker_id
                logger.info(f"Created ticker {ticker_upper} with ID {ticker_id}")
                return ticker_id
        except Exception as e:
            logger.error(f"Failed to create ticker {ticker_upper}: {e}")
            self.conn.rollback()
            return None

    def _infer_asset_type(self, ticker: str) -> str:
        """Infer the asset type from the ticker symbol (forex by default)."""
        if ticker.startswith("XAU") or ticker.startswith("XAG"):
            return "commodity"
        elif ticker.startswith("BTC") or ticker.startswith("ETH"):
            return "crypto"
        elif ticker in ("SPX500", "NAS100", "DJI30", "DAX40"):
            return "index"
        else:
            return "forex"

    def _parse_currencies(self, ticker: str) -> Tuple[str, str]:
        """Split a 6+ char ticker into (base, quote); otherwise quote is USD."""
        if len(ticker) >= 6:
            return ticker[:3], ticker[3:6]
        return ticker, "USD"

    def _ensure_historical_table_exists(self) -> None:
        """Create the ohlcv_historical table and its indexes if missing."""
        if self.dry_run or not self.conn:
            return
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS market_data.ohlcv_historical (
                    id BIGSERIAL PRIMARY KEY,
                    ticker_id INTEGER NOT NULL REFERENCES market_data.tickers(id),
                    timestamp TIMESTAMPTZ NOT NULL,
                    open DECIMAL(20,8) NOT NULL,
                    high DECIMAL(20,8) NOT NULL,
                    low DECIMAL(20,8) NOT NULL,
                    close DECIMAL(20,8) NOT NULL,
                    volume DECIMAL(20,4) DEFAULT 0,
                    vwap DECIMAL(20,8),
                    ts_epoch BIGINT,
                    period_interval INTEGER DEFAULT 5,
                    -- Technical Indicators
                    macd DECIMAL(20,8),
                    macd_signal DECIMAL(20,8),
                    macd_hist DECIMAL(20,8),
                    sma_10 DECIMAL(20,8),
                    sma_20 DECIMAL(20,8),
                    atr DECIMAL(20,8),
                    sar DECIMAL(20,8),
                    mfi DECIMAL(20,8),
                    rsi DECIMAL(20,8),
                    fractal_alcista DECIMAL(20,8),
                    fractal_bajista DECIMAL(20,8),
                    obv DECIMAL(20,8),
                    ad DECIMAL(20,8),
                    cmf DECIMAL(20,8),
                    volume_z DECIMAL(20,8),
                    volume_anomaly BOOLEAN,
                    -- Metadata
                    source VARCHAR(50) DEFAULT 'legacy_mysql',
                    legacy_id INTEGER,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                    CONSTRAINT ohlcv_historical_unique UNIQUE (ticker_id, timestamp, period_interval)
                );
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_ticker_ts
                    ON market_data.ohlcv_historical(ticker_id, timestamp DESC);
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_timestamp
                    ON market_data.ohlcv_historical(timestamp DESC);
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_source
                    ON market_data.ohlcv_historical(source);
                CREATE INDEX IF NOT EXISTS idx_ohlcv_historical_legacy_id
                    ON market_data.ohlcv_historical(legacy_id) WHERE legacy_id IS NOT NULL;
                COMMENT ON TABLE market_data.ohlcv_historical IS
                    'Historical OHLCV data migrated from legacy MySQL database';
            """)
            self.conn.commit()
        logger.info("Ensured ohlcv_historical table exists")

    def _parse_ohlcv_values(self, values: List[str]) -> Optional[OHLCVRecord]:
        """Parse values from a tickers_agg_data INSERT row.

        Expected column order:
        (id, ticker, date_agg, open, close, high, low, volume, vwap, ts, periodint)

        Returns None (and optionally logs) on any parse failure.
        """
        if len(values) < 11:
            return None
        try:
            record = OHLCVRecord(
                id=int(values[0]),
                ticker=str(values[1]).strip(),
                date_agg=self._parse_datetime(values[2]),
                open=self._parse_decimal(values[3]),
                close=self._parse_decimal(values[4]),
                high=self._parse_decimal(values[5]),
                low=self._parse_decimal(values[6]),
                volume=self._parse_decimal(values[7]),
                vwap=self._parse_decimal(values[8]),
                ts=int(values[9]),
                periodint=int(values[10])
            )
            return record
        except (ValueError, TypeError, InvalidOperation) as e:
            if self.verbose:
                logger.warning(f"Failed to parse OHLCV values: {values[:5]}... Error: {e}")
            return None

    def _parse_indicator_values(self, values: List[str]) -> Optional[Dict]:
        """Parse values from a tickers_agg_ind_data INSERT row.

        Expected column order:
        (id, MACD, MACD_signal, MACD_hist, SMA_10, SMA_20, ATR, SAR, MFI, RSI,
         Fractal_Alcista, Fractal_Bajista, OBV, AD, CMF, volume_z,
         volume_anomaly, hour)

        Returns None (and optionally logs) on any parse failure.
        """
        if len(values) < 18:
            return None
        try:
            return {
                "id": int(values[0]),
                "macd": self._parse_decimal_nullable(values[1]),
                "macd_signal": self._parse_decimal_nullable(values[2]),
                "macd_hist": self._parse_decimal_nullable(values[3]),
                "sma_10": self._parse_decimal_nullable(values[4]),
                "sma_20": self._parse_decimal_nullable(values[5]),
                "atr": self._parse_decimal_nullable(values[6]),
                "sar": self._parse_decimal_nullable(values[7]),
                "mfi": self._parse_decimal_nullable(values[8]),
                "rsi": self._parse_decimal_nullable(values[9]),
                "fractal_alcista": self._parse_decimal_nullable(values[10]),
                "fractal_bajista": self._parse_decimal_nullable(values[11]),
                "obv": self._parse_decimal_nullable(values[12]),
                "ad": self._parse_decimal_nullable(values[13]),
                "cmf": self._parse_decimal_nullable(values[14]),
                "volume_z": self._parse_decimal_nullable(values[15]),
                # "1"/"0" flag in the dump; NULL stays None.
                "volume_anomaly": values[16] == "1" if values[16] else None,
                "hour": self._parse_datetime_nullable(values[17])
            }
        except (ValueError, TypeError, InvalidOperation) as e:
            if self.verbose:
                logger.warning(f"Failed to parse indicator values: {values[:5]}... Error: {e}")
            return None

    def _parse_datetime(self, value: str) -> datetime:
        """Parse a MySQL-format datetime string; raises ValueError on failure."""
        if not value:
            raise ValueError("Empty datetime value")
        value = value.strip()
        # Try common MySQL datetime formats.
        formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M:%S.%f",
            "%Y-%m-%d",
        ]
        for fmt in formats:
            try:
                return datetime.strptime(value, fmt)
            except ValueError:
                continue
        raise ValueError(f"Cannot parse datetime: {value}")

    def _parse_datetime_nullable(self, value: Optional[str]) -> Optional[datetime]:
        """Parse a nullable datetime; unparseable values become None."""
        if not value or value.upper() == "NULL":
            return None
        try:
            return self._parse_datetime(value)
        except ValueError:
            return None

    def _parse_decimal(self, value: str) -> Decimal:
        """Parse a decimal value; NULL/empty maps to Decimal("0")."""
        if not value or value.upper() == "NULL":
            return Decimal("0")
        return Decimal(str(value).strip())

    def _parse_decimal_nullable(self, value: Optional[str]) -> Optional[Decimal]:
        """Parse a nullable decimal; NULL/empty/invalid maps to None."""
        if not value or value.upper() == "NULL":
            return None
        try:
            return Decimal(str(value).strip())
        except InvalidOperation:
            return None

    def _validate_record(self, record: OHLCVRecord) -> bool:
        """Validate an OHLCV record before insertion (basic sanity checks)."""
        if not record.ticker:
            return False
        # All OHLC values must be positive.
        if record.open <= 0 or record.high <= 0 or record.low <= 0 or record.close <= 0:
            return False
        # high must bound low, open and close from above ...
        if record.high < record.low:
            return False
        if record.high < record.open or record.high < record.close:
            return False
        # ... and low must bound open and close from below.
        if record.low > record.open or record.low > record.close:
            return False
        return True

    def load_indicators_cache(self, indicators_file: Path) -> int:
        """Pre-load indicators from a dump file into the in-memory cache.

        Enables joining indicators with OHLCV rows by legacy id during
        migration. NOTE(review): the whole indicator set is held in memory;
        for very large dumps this may be a significant footprint.

        Returns the number of indicator rows cached.
        """
        if not indicators_file.exists():
            logger.warning(f"Indicators file not found: {indicators_file}")
            return 0
        logger.info(f"Loading indicators from: {indicators_file}")
        parser = MySQLDumpParser(indicators_file, "tickers_agg_ind_data")
        count = 0
        file_size = parser.get_file_size()
        with tqdm(total=file_size, unit="B", unit_scale=True, desc="Loading indicators") as pbar:
            bytes_processed = 0
            for values in parser.stream_records():
                indicator = self._parse_indicator_values(values)
                if indicator:
                    self.indicator_cache[indicator["id"]] = indicator
                    count += 1
                # Update the progress bar from actual bytes read, throttled.
                if count % 10000 == 0:
                    current_bytes = parser.get_bytes_read()
                    if current_bytes > bytes_processed:
                        pbar.update(current_bytes - bytes_processed)
                        bytes_processed = current_bytes
        logger.info(f"Loaded {count:,} indicators into cache")
        return count

    def migrate_ohlcv_data(
        self,
        dump_file: Path,
        start_from_id: int = 0,
        limit: Optional[int] = None
    ) -> MigrationStats:
        """Migrate OHLCV data from a MySQL dump into PostgreSQL.

        Args:
            dump_file: Path to the MySQL dump file.
            start_from_id: Skip records with legacy ID below this (resumption).
            limit: Maximum number of records to migrate (for testing).

        Returns:
            The accumulated MigrationStats for this run.

        Raises:
            FileNotFoundError: If dump_file does not exist.
        """
        if not dump_file.exists():
            raise FileNotFoundError(f"Dump file not found: {dump_file}")
        self._ensure_historical_table_exists()
        parser = MySQLDumpParser(dump_file, "tickers_agg_data")
        file_size = parser.get_file_size()
        logger.info(f"Starting migration from: {dump_file}")
        logger.info(f"File size: {file_size / (1024*1024*1024):.2f} GB")
        logger.info(f"Batch size: {BATCH_SIZE:,}")
        logger.info(f"Include indicators: {self.include_indicators}")
        if start_from_id > 0:
            logger.info(f"Resuming from ID: {start_from_id:,}")
        if limit:
            logger.info(f"Limit: {limit:,} records")
        batch: List[Tuple] = []
        records_migrated = 0
        with tqdm(total=file_size, unit="B", unit_scale=True, desc="Migrating OHLCV") as pbar:
            bytes_processed = 0
            for values in parser.stream_records():
                # NOTE: despite the name, this counts parsed row tuples, not
                # INSERT statements (kept for summary compatibility).
                self.stats.total_inserts_parsed += 1
                record = self._parse_ohlcv_values(values)
                if not record:
                    self.stats.total_errors += 1
                    continue
                self.stats.total_rows_parsed += 1
                # Skip already-migrated rows when resuming.
                if record.id < start_from_id:
                    continue
                if not self._validate_record(record):
                    self.stats.total_rows_skipped += 1
                    continue
                ticker_id = self._ensure_ticker_exists(record.ticker)
                if not ticker_id:
                    self.stats.total_rows_skipped += 1
                    continue
                self.stats.tickers_found[record.ticker] += 1
                # Join indicators via the shared legacy primary-key id.
                indicators = None
                if self.include_indicators and record.id in self.indicator_cache:
                    indicators = self.indicator_cache[record.id]
                batch.append(self._build_insert_tuple(record, ticker_id, indicators))
                # Flush the batch when full.
                if len(batch) >= BATCH_SIZE:
                    inserted = self._insert_batch(batch)
                    self.stats.total_rows_inserted += inserted
                    records_migrated += inserted
                    batch = []
                    # Milestone log once per flush; the previous per-record
                    # check repeated the same milestone thousands of times
                    # because the counter only changes at flush time.
                    if self.stats.total_rows_inserted % 100000 == 0:
                        logger.info(
                            f"Progress: {self.stats.total_rows_inserted:,} rows inserted, "
                            f"{self.stats.rows_per_second():.0f} rows/sec"
                        )
                # Update the progress bar from actual bytes read.
                current_bytes = parser.get_bytes_read()
                if current_bytes > bytes_processed:
                    pbar.update(current_bytes - bytes_processed)
                    bytes_processed = current_bytes
                if limit and records_migrated >= limit:
                    logger.info(f"Reached limit of {limit:,} records")
                    break
        # Flush the final partial batch.
        if batch:
            inserted = self._insert_batch(batch)
            self.stats.total_rows_inserted += inserted
        return self.stats

    @staticmethod
    def _ind_float(indicators: Optional[Dict], key: str) -> Optional[float]:
        """Return an indicator value as float, or None when absent.

        Uses an explicit ``is not None`` check so legitimate zero values
        (e.g. OBV/CMF/volume_z of exactly 0) are preserved instead of being
        silently dropped by truthiness testing, as the previous
        implementation did.
        """
        if not indicators:
            return None
        value = indicators.get(key)
        return float(value) if value is not None else None

    def _build_insert_tuple(
        self,
        record: OHLCVRecord,
        ticker_id: int,
        indicators: Optional[Dict]
    ) -> Tuple:
        """Build a value tuple matching the column list in _insert_batch."""
        ind = self._ind_float
        return (
            ticker_id,
            record.date_agg,
            float(record.open),
            float(record.high),
            float(record.low),
            float(record.close),
            float(record.volume),
            # vwap of 0 marks a NULL in the legacy dump (_parse_decimal maps
            # NULL -> Decimal("0")), so 0 is intentionally stored as NULL.
            float(record.vwap) if record.vwap else None,
            record.ts,
            record.periodint,
            # Indicators (zero values preserved -- see _ind_float).
            ind(indicators, "macd"),
            ind(indicators, "macd_signal"),
            ind(indicators, "macd_hist"),
            ind(indicators, "sma_10"),
            ind(indicators, "sma_20"),
            ind(indicators, "atr"),
            ind(indicators, "sar"),
            ind(indicators, "mfi"),
            ind(indicators, "rsi"),
            ind(indicators, "fractal_alcista"),
            ind(indicators, "fractal_bajista"),
            ind(indicators, "obv"),
            ind(indicators, "ad"),
            ind(indicators, "cmf"),
            ind(indicators, "volume_z"),
            indicators.get("volume_anomaly") if indicators else None,
            "legacy_mysql",
            record.id
        )

    def _insert_batch(self, batch: List[Tuple], retry: int = 0) -> int:
        """Insert a batch of rows, retrying transient failures.

        Args:
            batch: Value tuples as produced by _build_insert_tuple.
            retry: Attempt counter to start from (kept for API compatibility).

        Returns:
            The number of rows inserted (0 after exhausting retries).
            In dry-run mode the batch is counted but nothing is written.
        """
        if self.dry_run:
            return len(batch)
        if not self.conn or not batch:
            return 0
        attempt = retry
        while True:
            try:
                with self.conn.cursor() as cur:
                    execute_values(
                        cur,
                        """
                        INSERT INTO market_data.ohlcv_historical (
                            ticker_id, timestamp, open, high, low, close, volume, vwap,
                            ts_epoch, period_interval,
                            macd, macd_signal, macd_hist, sma_10, sma_20, atr, sar, mfi, rsi,
                            fractal_alcista, fractal_bajista, obv, ad, cmf, volume_z, volume_anomaly,
                            source, legacy_id
                        ) VALUES %s
                        ON CONFLICT (ticker_id, timestamp, period_interval) DO UPDATE SET
                            open = EXCLUDED.open,
                            high = EXCLUDED.high,
                            low = EXCLUDED.low,
                            close = EXCLUDED.close,
                            volume = EXCLUDED.volume,
                            vwap = EXCLUDED.vwap,
                            macd = COALESCE(EXCLUDED.macd, market_data.ohlcv_historical.macd),
                            macd_signal = COALESCE(EXCLUDED.macd_signal, market_data.ohlcv_historical.macd_signal),
                            macd_hist = COALESCE(EXCLUDED.macd_hist, market_data.ohlcv_historical.macd_hist),
                            sma_10 = COALESCE(EXCLUDED.sma_10, market_data.ohlcv_historical.sma_10),
                            sma_20 = COALESCE(EXCLUDED.sma_20, market_data.ohlcv_historical.sma_20),
                            atr = COALESCE(EXCLUDED.atr, market_data.ohlcv_historical.atr),
                            sar = COALESCE(EXCLUDED.sar, market_data.ohlcv_historical.sar),
                            mfi = COALESCE(EXCLUDED.mfi, market_data.ohlcv_historical.mfi),
                            rsi = COALESCE(EXCLUDED.rsi, market_data.ohlcv_historical.rsi),
                            fractal_alcista = COALESCE(EXCLUDED.fractal_alcista, market_data.ohlcv_historical.fractal_alcista),
                            fractal_bajista = COALESCE(EXCLUDED.fractal_bajista, market_data.ohlcv_historical.fractal_bajista),
                            obv = COALESCE(EXCLUDED.obv, market_data.ohlcv_historical.obv),
                            ad = COALESCE(EXCLUDED.ad, market_data.ohlcv_historical.ad),
                            cmf = COALESCE(EXCLUDED.cmf, market_data.ohlcv_historical.cmf),
                            volume_z = COALESCE(EXCLUDED.volume_z, market_data.ohlcv_historical.volume_z),
                            volume_anomaly = COALESCE(EXCLUDED.volume_anomaly, market_data.ohlcv_historical.volume_anomaly)
                        """,
                        batch,
                        page_size=1000
                    )
                self.conn.commit()
                return len(batch)
            except Exception as e:
                self.conn.rollback()
                if attempt >= MAX_RETRIES:
                    logger.error(f"Batch insert failed after {MAX_RETRIES} retries: {e}")
                    self.stats.total_errors += len(batch)
                    return 0
                logger.warning(f"Batch insert failed (retry {attempt + 1}/{MAX_RETRIES}): {e}")
                attempt += 1
                time.sleep(RETRY_DELAY)

    def print_summary(self) -> None:
        """Print a human-readable migration summary to stdout."""
        elapsed = self.stats.elapsed_time()
        print("\n" + "=" * 70)
        print("MIGRATION SUMMARY")
        print("=" * 70)
        print(f" Mode: {'DRY RUN' if self.dry_run else 'LIVE'}")
        print(f" Elapsed time: {elapsed / 60:.1f} minutes")
        # NOTE(review): total_lines_processed is never incremented anywhere
        # in this script and always prints 0.
        print(f" Lines processed: {self.stats.total_lines_processed:,}")
        print(f" INSERT statements: {self.stats.total_inserts_parsed:,}")
        print(f" Rows parsed: {self.stats.total_rows_parsed:,}")
        print(f" Rows inserted: {self.stats.total_rows_inserted:,}")
        print(f" Rows skipped: {self.stats.total_rows_skipped:,}")
        print(f" Errors: {self.stats.total_errors:,}")
        print(f" Throughput: {self.stats.rows_per_second():.0f} rows/sec")
        print()
        print(" Tickers found:")
        for ticker, count in sorted(self.stats.tickers_found.items(), key=lambda x: -x[1]):
            print(f" - {ticker}: {count:,} rows")
        print("=" * 70)
def main():
    """CLI entry point: parse arguments, run the migration, report results."""
    global BATCH_SIZE
    cli = argparse.ArgumentParser(
        description="Migrate historical OHLCV data from MySQL dumps to PostgreSQL",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Dry run to test parsing
python migrate_historical_data.py --dry-run --limit 1000
# Migrate OHLCV data only
python migrate_historical_data.py --file db.sql
# Migrate with indicators
python migrate_historical_data.py --file db.sql --indicators db_res.sql
# Resume from specific ID
python migrate_historical_data.py --file db.sql --start-from 5000000
"""
    )
    cli.add_argument(
        "--file", "-f",
        type=Path,
        default=DEFAULT_OHLCV_DUMP,
        help=f"MySQL dump file with OHLCV data (default: {DEFAULT_OHLCV_DUMP})"
    )
    cli.add_argument(
        "--indicators", "-i",
        type=Path,
        default=None,
        help="MySQL dump file with indicator data (e.g., db_res.sql)"
    )
    cli.add_argument(
        "--dry-run", "-d",
        action="store_true",
        help="Parse and validate without inserting to database"
    )
    cli.add_argument(
        "--limit", "-l",
        type=int,
        default=None,
        help="Maximum number of records to migrate (for testing)"
    )
    cli.add_argument(
        "--start-from", "-s",
        type=int,
        default=0,
        help="Skip records with ID less than this value (for resumption)"
    )
    cli.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    cli.add_argument(
        "--batch-size", "-b",
        type=int,
        default=BATCH_SIZE,
        help=f"Batch size for inserts (default: {BATCH_SIZE})"
    )
    opts = cli.parse_args()
    BATCH_SIZE = opts.batch_size  # allow CLI override of the module default
    # Startup banner summarizing the effective configuration.
    print("=" * 70)
    print("Historical Data Migration Tool - Trading Platform")
    print("=" * 70)
    print(f" Source file: {opts.file}")
    print(f" Indicators file: {opts.indicators or 'None'}")
    print(f" Dry run: {opts.dry_run}")
    print(f" Batch size: {BATCH_SIZE:,}")
    print(f" Limit: {opts.limit or 'None'}")
    print(f" Start from ID: {opts.start_from}")
    print("=" * 70)
    # Fail fast when the dump file is missing.
    if not opts.file.exists():
        logger.error(f"Dump file not found: {opts.file}")
        sys.exit(1)
    migration = HistoricalDataMigrator(
        dry_run=opts.dry_run,
        include_indicators=opts.indicators is not None,
        verbose=opts.verbose
    )
    try:
        migration.connect()
        # Optionally pre-load the indicator cache for joined inserts.
        if opts.indicators:
            if opts.indicators.exists():
                migration.load_indicators_cache(opts.indicators)
            else:
                logger.warning(f"Indicators file not found: {opts.indicators}")
        stats = migration.migrate_ohlcv_data(
            dump_file=opts.file,
            start_from_id=opts.start_from,
            limit=opts.limit
        )
        migration.print_summary()
        # Non-zero exit when any rows failed.
        if stats.total_errors > 0:
            logger.warning(f"Migration completed with {stats.total_errors} errors")
            sys.exit(1)
        logger.info("Migration completed successfully")
        sys.exit(0)
    except KeyboardInterrupt:
        logger.info("\nMigration interrupted by user")
        migration.print_summary()
        sys.exit(130)
    except Exception as e:
        logger.exception(f"Migration failed: {e}")
        sys.exit(1)
    finally:
        migration.disconnect()


if __name__ == "__main__":
    main()