#!/usr/bin/env python3
"""
Polygon Data Fetcher - Trading Platform

Fetches historical OHLCV data from Polygon.io and stores in PostgreSQL.

Usage:
    ~/venvs/data-service/bin/python scripts/fetch_polygon_data.py --days 30

Environment variables:
    POLYGON_API_KEY - Polygon.io API key
    DB_HOST - PostgreSQL host (default: localhost)
    DB_PORT - PostgreSQL port (default: 5432)
    DB_NAME - Database name (default: trading_platform)
    DB_USER - Database user (default: trading_user)
    DB_PASSWORD - Database password
"""

import os
import asyncio
import argparse
import logging
from datetime import datetime, timedelta
from pathlib import Path

# Add src to path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

import asyncpg
import aiohttp
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Load environment
load_dotenv(Path(__file__).parent.parent / ".env")

# Configuration
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")
POLYGON_BASE_URL = os.getenv("POLYGON_BASE_URL", "https://api.polygon.io")
RATE_LIMIT = int(os.getenv("POLYGON_RATE_LIMIT", "5"))  # requests per 60s window

DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "trading_platform")
DB_USER = os.getenv("DB_USER", "trading_user")
# NOTE(review): hard-coded fallback credential; consider requiring this to be
# set in the environment instead of shipping a default.
DB_PASSWORD = os.getenv("DB_PASSWORD", "trading_dev_2026")


class PolygonFetcher:
    """Simple Polygon API fetcher with rate limiting."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = POLYGON_BASE_URL
        # Fixed-window rate-limit state: request count since window start.
        self.request_count = 0
        # Window start; datetime.min guarantees the first call resets the window.
        self.last_request_time = datetime.min

    async def fetch_aggregates(
        self,
        ticker: str,
        multiplier: int,
        timespan: str,
        start_date: datetime,
        end_date: datetime,
        session: aiohttp.ClientSession
    ) -> list:
        """Fetch aggregate bars from Polygon's /v2/aggs endpoint.

        Follows `next_url` pagination until exhausted. Retries indefinitely on
        HTTP 429 (after a 60s sleep); any other non-200 aborts and returns
        whatever was collected so far.

        Returns a list of raw Polygon bar dicts (keys: t, o, h, l, c, v, vw).
        """
        all_results = []
        start_str = start_date.strftime("%Y-%m-%d")
        end_str = end_date.strftime("%Y-%m-%d")
        url = f"{self.base_url}/v2/aggs/ticker/{ticker}/range/{multiplier}/{timespan}/{start_str}/{end_str}"
        params = {
            "adjusted": "true",
            "sort": "asc",
            "limit": 50000,
            "apiKey": self.api_key
        }

        while url:
            # Rate limiting
            await self._rate_limit()

            # Pagination URLs already carry apiKey, so params must be omitted
            # or aiohttp would duplicate the query string.
            async with session.get(url, params=params if "apiKey" not in url else None) as response:
                if response.status == 429:
                    logger.warning("Rate limited, waiting 60s...")
                    await asyncio.sleep(60)
                    continue  # retry the same URL
                if response.status != 200:
                    text = await response.text()
                    logger.error(f"API error {response.status}: {text}")
                    break

                data = await response.json()
                results = data.get("results", [])
                all_results.extend(results)
                if len(results) > 0:
                    logger.info(f" Fetched {len(results)} bars (total: {len(all_results)})")

                # Check for pagination
                next_url = data.get("next_url")
                if next_url:
                    # next_url does not include the key; append it ourselves.
                    url = next_url + f"&apiKey={self.api_key}"
                    params = None
                else:
                    break

        return all_results

    async def _rate_limit(self):
        """Enforce a fixed-window limit of RATE_LIMIT requests per 60 seconds.

        Sleeps until the current window expires when the budget is exhausted.
        """
        now = datetime.now()

        # Reset counter after 60 seconds
        if (now - self.last_request_time).total_seconds() >= 60:
            self.request_count = 0
            self.last_request_time = now

        # Wait if limit reached
        if self.request_count >= RATE_LIMIT:
            wait_time = 60 - (now - self.last_request_time).total_seconds()
            if wait_time > 0:
                logger.info(f"Rate limit reached, waiting {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)
            self.request_count = 0
            self.last_request_time = datetime.now()

        self.request_count += 1


async def get_tickers(pool: asyncpg.Pool) -> list:
    """Return all active tickers from market_data.tickers as dicts."""
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT id, symbol, polygon_ticker, asset_type
            FROM market_data.tickers
            WHERE is_active = true
            ORDER BY id
            """
        )
        return [dict(row) for row in rows]


async def get_last_timestamp(pool: asyncpg.Pool, ticker_id: int, table: str) -> datetime:
    """Return the newest stored bar timestamp for a ticker, or None if empty.

    The result is made naive (tzinfo stripped) so it compares cleanly with the
    naive datetimes used throughout this script.
    """
    async with pool.acquire() as conn:
        # NOTE(review): `table` is interpolated into the SQL; callers only pass
        # internal constants ("ohlcv_5m"), but never pass user input here.
        row = await conn.fetchrow(
            f"""
            SELECT MAX(timestamp) as last_ts
            FROM market_data.{table}
            WHERE ticker_id = $1
            """,
            ticker_id
        )
        last_ts = row["last_ts"] if row and row["last_ts"] else None
        # Remove timezone info for consistent comparison
        if last_ts and last_ts.tzinfo:
            last_ts = last_ts.replace(tzinfo=None)
        return last_ts


async def insert_bars(pool: asyncpg.Pool, ticker_id: int, bars: list, table: str) -> int:
    """Upsert raw Polygon bars into market_data.<table>.

    Returns the number of rows submitted (inserted or updated).
    """
    if not bars:
        return 0

    # Convert bars to tuples
    data = []
    for bar in bars:
        # Polygon "t" is a millisecond epoch.
        # NOTE(review): fromtimestamp converts to *local* time; the rest of the
        # script also uses naive local datetimes, so this is consistent, but
        # confirm local-time storage is intended before changing.
        ts = datetime.fromtimestamp(bar["t"] / 1000)
        data.append((
            ticker_id,
            ts,
            float(bar["o"]),
            float(bar["h"]),
            float(bar["l"]),
            float(bar["c"]),
            float(bar.get("v", 0)),
            # `is not None` (not truthiness) so a legitimate 0.0 VWAP is kept
            # instead of being silently stored as NULL.
            float(bar.get("vw")) if bar.get("vw") is not None else None,
            int(ts.timestamp())
        ))

    async with pool.acquire() as conn:
        await conn.executemany(
            f"""
            INSERT INTO market_data.{table}
                (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (ticker_id, timestamp) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume,
                vwap = EXCLUDED.vwap
            """,
            data
        )
    return len(data)


async def sync_ticker(
    fetcher: PolygonFetcher,
    pool: asyncpg.Pool,
    ticker: dict,
    backfill_days: int,
    session: aiohttp.ClientSession
) -> dict:
    """Sync one ticker's 5-minute bars; returns a status/row-count summary dict.

    Resumes from the last stored bar when present, otherwise backfills
    `backfill_days` days. Errors are caught and reported in the result dict
    rather than raised, so one bad ticker doesn't abort the batch.
    """
    symbol = ticker["symbol"]
    polygon_ticker = ticker["polygon_ticker"]
    ticker_id = ticker["id"]

    logger.info(f"\n{'='*50}")
    logger.info(f"Syncing {symbol} ({polygon_ticker})")
    logger.info(f"{'='*50}")

    # Determine time range
    last_ts = await get_last_timestamp(pool, ticker_id, "ohlcv_5m")
    if last_ts:
        # Start one bar past the newest stored row to avoid refetching it.
        start_date = last_ts + timedelta(minutes=5)
        logger.info(f"Continuing from: {start_date}")
    else:
        start_date = datetime.now() - timedelta(days=backfill_days)
        logger.info(f"Backfilling {backfill_days} days from: {start_date}")

    end_date = datetime.now()

    if start_date >= end_date:
        logger.info(f"Already up to date")
        return {"symbol": symbol, "status": "up_to_date", "rows": 0}

    # Fetch data
    try:
        bars = await fetcher.fetch_aggregates(
            ticker=polygon_ticker,
            multiplier=5,
            timespan="minute",
            start_date=start_date,
            end_date=end_date,
            session=session
        )

        if not bars:
            logger.info(f"No new data available")
            return {"symbol": symbol, "status": "no_data", "rows": 0}

        # Insert into database
        inserted = await insert_bars(pool, ticker_id, bars, "ohlcv_5m")
        logger.info(f"Inserted {inserted} rows for {symbol}")
        return {"symbol": symbol, "status": "success", "rows": inserted}

    except Exception as e:
        # Boundary catch: log and report, keep syncing remaining tickers.
        logger.error(f"Error syncing {symbol}: {e}")
        return {"symbol": symbol, "status": "error", "error": str(e), "rows": 0}


async def main(backfill_days: int = 30, symbols: list = None):
    """Main entry point: connect, sync every (optionally filtered) ticker, summarize."""
    logger.info("="*60)
    logger.info("Polygon Data Fetcher - Trading Platform")
    logger.info("="*60)

    if not POLYGON_API_KEY:
        logger.error("POLYGON_API_KEY is required!")
        return

    # Only a key prefix is logged, never the full secret.
    logger.info(f"Polygon API Key: {POLYGON_API_KEY[:10]}...")
    logger.info(f"Database: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    logger.info(f"Backfill days: {backfill_days}")

    # Connect to database
    logger.info("\nConnecting to database...")
    pool = await asyncpg.create_pool(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        min_size=2,
        max_size=5
    )

    # try/finally so the pool is closed even if a sync step raises.
    try:
        # Get tickers
        tickers = await get_tickers(pool)
        logger.info(f"Found {len(tickers)} active tickers")

        if symbols:
            tickers = [t for t in tickers if t["symbol"] in symbols]
            logger.info(f"Filtered to {len(tickers)} tickers: {[t['symbol'] for t in tickers]}")

        # Create fetcher
        fetcher = PolygonFetcher(POLYGON_API_KEY)

        # Sync all tickers
        results = []
        async with aiohttp.ClientSession() as session:
            for ticker in tickers:
                result = await sync_ticker(
                    fetcher=fetcher,
                    pool=pool,
                    ticker=ticker,
                    backfill_days=backfill_days,
                    session=session
                )
                results.append(result)
                # Small delay between tickers
                await asyncio.sleep(1)

        # Print summary
        logger.info("\n" + "="*60)
        logger.info("SYNC SUMMARY")
        logger.info("="*60)
        total_rows = 0
        for r in results:
            status_icon = "✓" if r["status"] == "success" else "○" if r["status"] == "up_to_date" else "✗"
            logger.info(f" {status_icon} {r['symbol']}: {r['status']} ({r['rows']} rows)")
            total_rows += r["rows"]
        logger.info(f"\nTotal rows inserted: {total_rows}")
    finally:
        # Close pool
        await pool.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fetch Polygon data")
    parser.add_argument("--days", type=int, default=30, help="Days to backfill")
    parser.add_argument("--symbols", nargs="+", help="Specific symbols to sync")
    args = parser.parse_args()

    asyncio.run(main(backfill_days=args.days, symbols=args.symbols))