"""
Data Synchronization Service
OrbiQuant IA Trading Platform

Handles automatic synchronization of market data from Massive.com/Polygon.io
"""

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from enum import Enum

import asyncpg

from providers.polygon_client import PolygonClient, AssetType, Timeframe, OHLCVBar
from config import TICKER_MAPPINGS

logger = logging.getLogger(__name__)


class SyncStatus(str, Enum):
    """Sync status values."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"
    PARTIAL = "partial"


class DataSyncService:
    """
    Service to sync market data from Polygon/Massive to PostgreSQL.

    Features:
    - Automatic backfill of historical data
    - Incremental sync from last timestamp
    - Multi-timeframe support
    - Rate limiting and error handling
    - Sync status tracking
    """

    # Supported timeframes with their table mappings
    TIMEFRAME_TABLES = {
        Timeframe.MINUTE_1: "ohlcv_1min",
        Timeframe.MINUTE_5: "ohlcv_5min",
        Timeframe.MINUTE_15: "ohlcv_15min",
        Timeframe.HOUR_1: "ohlcv_1hour",
        Timeframe.HOUR_4: "ohlcv_4hour",
        Timeframe.DAY_1: "ohlcv_daily",
    }

    def __init__(
        self,
        polygon_client: PolygonClient,
        db_pool: asyncpg.Pool,
        batch_size: int = 10000
    ):
        """
        Store the collaborators used by the sync methods.

        Args:
            polygon_client: Client used to fetch ticker details and aggregates
            db_pool: asyncpg pool from which connections are acquired
            batch_size: Number of bars buffered before each bulk insert
        """
        self.client = polygon_client
        self.db = db_pool
        self.batch_size = batch_size
        self._sync_tasks: Dict[str, asyncio.Task] = {}

    @staticmethod
    def _now_like(reference: Optional[datetime]) -> datetime:
        """
        Return the current time, timezone-compatible with ``reference``.

        If ``reference`` carries a tzinfo the result is aware in that zone;
        otherwise (or when ``reference`` is None) it is a naive local time,
        exactly like ``datetime.now()``. This keeps the two ends of a sync
        window comparable, since Python refuses to order naive against
        aware datetimes.
        """
        tz = reference.tzinfo if reference is not None else None
        return datetime.now(tz)

    async def get_or_create_ticker(
        self,
        symbol: str,
        asset_type: AssetType
    ) -> Optional[int]:
        """
        Get ticker ID from database or create new ticker entry.

        The lookup is case-insensitive; a newly created ticker is stored
        with an upper-cased symbol.

        Args:
            symbol: Ticker symbol (e.g., 'EURUSD', 'BTCUSD')
            asset_type: Type of asset

        Returns:
            Ticker ID or None if error
        """
        async with self.db.acquire() as conn:
            # Try to get existing ticker
            row = await conn.fetchrow(
                """
                SELECT id FROM market_data.tickers
                WHERE UPPER(symbol) = UPPER($1)
                """,
                symbol
            )

            if row:
                return row["id"]

            # Create new ticker
            try:
                # Get ticker details from Polygon
                details = await self.client.get_ticker_details(symbol, asset_type)

                # Basic parsing: symbols of 6+ characters are split after the
                # third character into base/quote; anything shorter gets "USD"
                # for both.
                has_pair = len(symbol) >= 6
                base_currency = symbol[:3] if has_pair else "USD"
                quote_currency = symbol[3:] if has_pair else "USD"

                ticker_id = await conn.fetchval(
                    """
                    INSERT INTO market_data.tickers
                    (symbol, name, asset_type, base_currency, quote_currency,
                     exchange, is_active, created_at, updated_at)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW())
                    RETURNING id
                    """,
                    symbol.upper(),
                    details.get("name") if details else symbol,
                    asset_type.value,
                    base_currency,
                    quote_currency,
                    details.get("primary_exchange") if details else "POLYGON",
                    True
                )

                logger.info(f"Created new ticker: {symbol} (ID: {ticker_id})")
                return ticker_id

            except Exception as e:
                logger.error(f"Error creating ticker {symbol}: {e}")
                return None

    async def sync_ticker_data(
        self,
        symbol: str,
        asset_type: AssetType,
        timeframe: Timeframe = Timeframe.MINUTE_5,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        backfill_days: int = 30
    ) -> Dict[str, Any]:
        """
        Sync historical data for a ticker.

        Bars are streamed from the client and upserted in batches of
        ``self.batch_size``. The outcome is recorded through
        ``_update_sync_status``; API/DB errors raised while streaming or
        inserting are caught and reported in the returned dict.

        Args:
            symbol: Ticker symbol
            asset_type: Type of asset
            timeframe: Data timeframe
            start_date: Start date (if None, uses last sync or backfill_days)
            end_date: End date (if None, uses current time)
            backfill_days: Days to backfill if no previous data

        Returns:
            Dict with sync results (rows_inserted, status, etc.)
        """
        logger.info(f"Starting sync for {symbol} ({asset_type.value}) - {timeframe.value}")

        # Get or create ticker. Compare against None explicitly so that a
        # falsy-but-valid id is not mistaken for a failure.
        ticker_id = await self.get_or_create_ticker(symbol, asset_type)
        if ticker_id is None:
            return {
                "status": SyncStatus.FAILED,
                "error": "Failed to get/create ticker",
                "rows_inserted": 0
            }

        # Get table name. Unmapped timeframes keep the historical fallback to
        # the 5-minute table, but that is now reported because it mixes
        # granularities in one table.
        table_name = self.TIMEFRAME_TABLES.get(timeframe)
        if table_name is None:
            table_name = "ohlcv_5min"
            logger.warning(
                f"No table mapping for timeframe {timeframe.value}; "
                f"falling back to {table_name}"
            )

        # Determine time range
        if not start_date:
            async with self.db.acquire() as conn:
                row = await conn.fetchrow(
                    f"""
                    SELECT MAX(timestamp) as last_ts
                    FROM market_data.{table_name}
                    WHERE ticker_id = $1
                    """,
                    ticker_id
                )

            last_ts = row["last_ts"]
            if last_ts:
                # Continue from last sync
                start_date = last_ts + timedelta(minutes=1)
                logger.info(f"Continuing from last sync: {start_date}")
            else:
                # Backfill from N days ago, in the same timezone flavour as a
                # caller-supplied end_date so the two stay comparable.
                start_date = self._now_like(end_date) - timedelta(days=backfill_days)
                logger.info(f"Starting backfill from {backfill_days} days ago")

        if not end_date:
            # NOTE(review): if the timestamp column is timestamptz the driver
            # hands back an aware datetime; a naive datetime.now() would then
            # make the comparison below raise TypeError. Match start_date.
            end_date = self._now_like(start_date)

        # Prevent syncing future data
        if start_date >= end_date:
            logger.warning("Start date >= end date, nothing to sync")
            return {
                "status": SyncStatus.SUCCESS,
                "rows_inserted": 0,
                "message": "Already up to date"
            }

        # Collect bars from API
        bars = []
        total_bars = 0

        try:
            async for bar in self.client.get_aggregates(
                symbol=symbol,
                asset_type=asset_type,
                timeframe=timeframe,
                start_date=start_date,
                end_date=end_date,
                adjusted=True,
                limit=50000
            ):
                bars.append((
                    ticker_id,
                    bar.timestamp,
                    float(bar.open),
                    float(bar.high),
                    float(bar.low),
                    float(bar.close),
                    float(bar.volume) if bar.volume else 0.0,
                    float(bar.vwap) if bar.vwap else None,
                    bar.transactions,
                    int(bar.timestamp.timestamp())
                ))

                # Insert in batches
                if len(bars) >= self.batch_size:
                    inserted = await self._insert_bars(table_name, bars)
                    total_bars += inserted
                    bars = []

            # Insert remaining bars
            if bars:
                inserted = await self._insert_bars(table_name, bars)
                total_bars += inserted

            # Update sync status
            await self._update_sync_status(
                ticker_id=ticker_id,
                status=SyncStatus.SUCCESS,
                rows=total_bars,
                timeframe=timeframe.value
            )

            logger.info(f"Sync completed for {symbol}: {total_bars} bars inserted")

            return {
                "status": SyncStatus.SUCCESS,
                "symbol": symbol,
                "timeframe": timeframe.value,
                "rows_inserted": total_bars,
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat()
            }

        except Exception as e:
            logger.error(f"Error syncing {symbol}: {e}", exc_info=True)

            # Update sync status with error. This is best-effort: if the
            # database itself is the cause of the failure, recording the
            # status will fail too, and that must not mask the original
            # error or escape this handler.
            try:
                await self._update_sync_status(
                    ticker_id=ticker_id,
                    status=SyncStatus.FAILED,
                    rows=total_bars,
                    error=str(e),
                    timeframe=timeframe.value
                )
            except Exception as status_error:
                logger.error(
                    f"Could not record failed sync status for {symbol}: {status_error}"
                )

            return {
                "status": SyncStatus.FAILED,
                "symbol": symbol,
                "error": str(e),
                "rows_inserted": total_bars
            }

    async def _insert_bars(
        self,
        table_name: str,
        bars: List[tuple]
    ) -> int:
        """
        Insert bars into database with conflict handling.

        Rows that collide on (ticker_id, timestamp) have their price/volume
        columns overwritten with the incoming values.

        Args:
            table_name: Target table name
            bars: List of bar tuples

        Returns:
            Number of rows inserted/updated (the length of ``bars``)
        """
        if not bars:
            return 0

        async with self.db.acquire() as conn:
            # Use ON CONFLICT to handle duplicates
            await conn.executemany(
                f"""
                INSERT INTO market_data.{table_name}
                (ticker_id, timestamp, open, high, low, close, volume, vwap, trades, ts_epoch)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                ON CONFLICT (ticker_id, timestamp) DO UPDATE SET
                    open = EXCLUDED.open,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    close = EXCLUDED.close,
                    volume = EXCLUDED.volume,
                    vwap = EXCLUDED.vwap,
                    trades = EXCLUDED.trades
                """,
                bars
            )

        return len(bars)

    async def _update_sync_status(
        self,
        ticker_id: int,
        status: SyncStatus,
        rows: int = 0,
        error: Optional[str] = None,
        timeframe: str = "5min"
    ):
        """
        Update sync status in database.

        Upserts one row per (ticker_id, timeframe) in market_data.sync_status.

        Args:
            ticker_id: Ticker the status belongs to
            status: Outcome to record (stored as its string value)
            rows: Row count to record for this sync
            error: Error message, or None
            timeframe: Timeframe label the status applies to
        """
        async with self.db.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO market_data.sync_status
                (ticker_id, timeframe, last_sync_timestamp, last_sync_rows,
                 sync_status, error_message, updated_at)
                VALUES ($1, $2, NOW(), $3, $4, $5, NOW())
                ON CONFLICT (ticker_id, timeframe) DO UPDATE SET
                    last_sync_timestamp = NOW(),
                    last_sync_rows = $3,
                    sync_status = $4,
                    error_message = $5,
                    updated_at = NOW()
                """,
                ticker_id, timeframe, rows, status.value, error
            )

    async def sync_all_active_tickers(
        self,
        timeframe: Timeframe = Timeframe.MINUTE_5,
        backfill_days: int = 1
    ) -> Dict[str, Any]:
        """
        Sync all active tickers from database.

        Tickers are processed sequentially in symbol order; a failure on one
        ticker is recorded in the results and does not stop the others.

        Args:
            timeframe: Timeframe to sync
            backfill_days: Days to backfill for new data

        Returns:
            Summary of sync results
        """
        logger.info("Starting sync for all active tickers")

        # Get active tickers
        async with self.db.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, symbol, asset_type
                FROM market_data.tickers
                WHERE is_active = true
                ORDER BY symbol
                """
            )

        results = []
        for row in rows:
            try:
                asset_type = AssetType(row["asset_type"])
                result = await self.sync_ticker_data(
                    symbol=row["symbol"],
                    asset_type=asset_type,
                    timeframe=timeframe,
                    backfill_days=backfill_days
                )
                results.append(result)

                # Small delay to respect rate limits
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error syncing {row['symbol']}: {e}")
                results.append({
                    "status": SyncStatus.FAILED,
                    "symbol": row["symbol"],
                    "error": str(e)
                })

        # Calculate summary
        total = len(results)
        success = sum(1 for r in results if r["status"] == SyncStatus.SUCCESS)
        failed = total - success
        total_rows = sum(r.get("rows_inserted", 0) for r in results)

        summary = {
            "total_tickers": total,
            "successful": success,
            "failed": failed,
            "total_rows_inserted": total_rows,
            "results": results
        }

        logger.info(f"Sync completed: {success}/{total} successful, {total_rows} rows")
        return summary

    async def get_sync_status(
        self,
        symbol: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Get sync status for tickers.

        With a symbol, returns every timeframe row for that ticker
        (case-insensitive match). Without one, returns rows for active
        tickers, capped at 100.

        Args:
            symbol: Optional symbol to filter by

        Returns:
            List of sync status records
        """
        # Both queries share the same projection and join; only the filter,
        # ordering and limit differ.
        select_clause = """
            SELECT
                t.symbol, t.asset_type, s.timeframe,
                s.last_sync_timestamp, s.last_sync_rows,
                s.sync_status, s.error_message, s.updated_at
            FROM market_data.tickers t
            LEFT JOIN market_data.sync_status s ON s.ticker_id = t.id
        """

        async with self.db.acquire() as conn:
            if symbol:
                rows = await conn.fetch(
                    select_clause + """
                    WHERE UPPER(t.symbol) = UPPER($1)
                    ORDER BY s.timeframe
                    """,
                    symbol
                )
            else:
                rows = await conn.fetch(
                    select_clause + """
                    WHERE t.is_active = true
                    ORDER BY t.symbol, s.timeframe
                    LIMIT 100
                    """
                )

        # The LEFT JOIN yields NULL status columns for tickers never synced,
        # hence the None guards before isoformat().
        return [
            {
                "symbol": row["symbol"],
                "asset_type": row["asset_type"],
                "timeframe": row["timeframe"],
                "last_sync": row["last_sync_timestamp"].isoformat() if row["last_sync_timestamp"] else None,
                "rows_synced": row["last_sync_rows"],
                "status": row["sync_status"],
                "error": row["error_message"],
                "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None
            }
            for row in rows
        ]

    async def get_supported_symbols(
        self,
        asset_type: Optional[AssetType] = None
    ) -> List[Dict[str, Any]]:
        """
        Get list of supported symbols for Polygon/Massive.

        This returns symbols from our config that we support. The asset type
        of each entry is derived from the prefix of its Polygon symbol
        ("C:" forex, "X:" crypto, "I:" index, anything else stock).

        Args:
            asset_type: Optional filter by asset type

        Returns:
            List of supported symbols with metadata
        """
        symbols = []

        for symbol, mapping in TICKER_MAPPINGS.items():
            # Determine asset type from prefix
            polygon_symbol = mapping["polygon"]

            if polygon_symbol.startswith("C:"):
                detected_type = AssetType.FOREX
            elif polygon_symbol.startswith("X:"):
                detected_type = AssetType.CRYPTO
            elif polygon_symbol.startswith("I:"):
                detected_type = AssetType.INDEX
            else:
                detected_type = AssetType.STOCK

            # Filter by asset type if specified
            if asset_type and detected_type != asset_type:
                continue

            symbols.append({
                "symbol": symbol,
                "polygon_symbol": polygon_symbol,
                "mt4_symbol": mapping.get("mt4"),
                "asset_type": detected_type.value,
                "pip_value": mapping.get("pip_value"),
                "supported": True
            })

        return symbols