#!/usr/bin/env python3
"""
Polygon Data Fetcher - Trading Platform
Fetches historical OHLCV data from Polygon.io and stores in PostgreSQL.

All timestamps are handled as naive UTC datetimes: Polygon returns UTC
epoch milliseconds, and we strip tzinfo before comparing/storing so the
"last timestamp" resume logic is deterministic across machines and DST.

Usage:
    ~/venvs/data-service/bin/python scripts/fetch_polygon_data.py --days 30

Environment variables:
    POLYGON_API_KEY - Polygon.io API key
    DB_HOST - PostgreSQL host (default: localhost)
    DB_PORT - PostgreSQL port (default: 5432)
    DB_NAME - Database name (default: trading_platform)
    DB_USER - Database user (default: trading_user)
    DB_PASSWORD - Database password
"""

import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

# Make project-local modules importable when run directly as a script.
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

import asyncpg
import aiohttp
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Load environment
load_dotenv(Path(__file__).parent.parent / ".env")

# Configuration
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")
POLYGON_BASE_URL = os.getenv("POLYGON_BASE_URL", "https://api.polygon.io")
RATE_LIMIT = int(os.getenv("POLYGON_RATE_LIMIT", "5"))  # requests per 60s window

DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "trading_platform")
DB_USER = os.getenv("DB_USER", "trading_user")
# NOTE(review): hard-coded fallback password is a dev convenience only --
# never rely on it outside local development.
DB_PASSWORD = os.getenv("DB_PASSWORD", "trading_dev_2026")

# Table names get interpolated into SQL (identifiers cannot be bound as
# parameters), so restrict them to a fixed allowlist.
_ALLOWED_TABLES = frozenset({"ohlcv_5m"})


def _checked_table(table: str) -> str:
    """Return *table* if it is on the allowlist, else raise ValueError."""
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"Unknown table: {table!r}")
    return table


def _utcnow() -> datetime:
    """Current UTC time as a naive datetime (the script-wide convention)."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


class PolygonFetcher:
    """Polygon REST client with a simple fixed-window rate limiter."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = POLYGON_BASE_URL
        # Fixed 60-second window: request_count requests issued since
        # last_request_time (the start of the current window).
        self.request_count = 0
        self.last_request_time = datetime.min

    async def fetch_aggregates(
        self,
        ticker: str,
        multiplier: int,
        timespan: str,
        start_date: datetime,
        end_date: datetime,
        session: aiohttp.ClientSession
    ) -> list:
        """Fetch aggregate bars from the Polygon API, following pagination.

        Returns the raw list of bar dicts (Polygon "results" entries).
        On a non-200/non-429 response the partial results gathered so far
        are returned (best-effort, matching original behavior).
        """
        all_results = []

        start_str = start_date.strftime("%Y-%m-%d")
        end_str = end_date.strftime("%Y-%m-%d")

        url = (
            f"{self.base_url}/v2/aggs/ticker/{ticker}"
            f"/range/{multiplier}/{timespan}/{start_str}/{end_str}"
        )
        # Query params for the first request only; subsequent next_url pages
        # already carry the query string (apiKey is appended manually below).
        params: Optional[dict] = {
            "adjusted": "true",
            "sort": "asc",
            "limit": 50000,
            "apiKey": self.api_key,
        }

        while url:
            # Client-side rate limiting before every request.
            await self._rate_limit()

            async with session.get(url, params=params) as response:
                if response.status == 429:
                    # Server-side throttle: back off a full window and retry.
                    # NOTE(review): retries indefinitely by design (best-effort
                    # nightly sync); add a cap if this ever needs a deadline.
                    logger.warning("Rate limited, waiting 60s...")
                    await asyncio.sleep(60)
                    continue

                if response.status != 200:
                    text = await response.text()
                    logger.error("API error %s: %s", response.status, text)
                    break

                data = await response.json()

            results = data.get("results", [])
            all_results.extend(results)

            if results:
                logger.info(
                    "  Fetched %d bars (total: %d)", len(results), len(all_results)
                )

            # Follow pagination if present.
            next_url = data.get("next_url")
            if not next_url:
                break
            # next_url lacks the API key; append it and drop first-page params.
            url = f"{next_url}&apiKey={self.api_key}"
            params = None

        return all_results

    async def _rate_limit(self):
        """Block until another request is allowed under the 60s window."""
        now = datetime.now()

        # A full window has elapsed: start a fresh one.
        if (now - self.last_request_time).total_seconds() >= 60:
            self.request_count = 0
            self.last_request_time = now

        # Window budget exhausted: sleep out the remainder of the window.
        if self.request_count >= RATE_LIMIT:
            wait_time = 60 - (now - self.last_request_time).total_seconds()
            if wait_time > 0:
                logger.info("Rate limit reached, waiting %.1fs...", wait_time)
                await asyncio.sleep(wait_time)
            self.request_count = 0
            self.last_request_time = datetime.now()

        self.request_count += 1


async def get_tickers(pool: asyncpg.Pool) -> list:
    """Return all active tickers from the database as a list of dicts."""
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT id, symbol, polygon_ticker, asset_type
            FROM market_data.tickers
            WHERE is_active = true
            ORDER BY id
            """
        )
        return [dict(row) for row in rows]


async def get_last_timestamp(
    pool: asyncpg.Pool, ticker_id: int, table: str
) -> Optional[datetime]:
    """Return the newest stored bar timestamp for a ticker, or None if empty.

    The result is normalized to a naive datetime (tzinfo stripped) so callers
    can compare it against the script's naive-UTC datetimes.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"""
            SELECT MAX(timestamp) as last_ts
            FROM market_data.{_checked_table(table)}
            WHERE ticker_id = $1
            """,
            ticker_id
        )
        last_ts = row["last_ts"] if row and row["last_ts"] else None
        # asyncpg may return timezone-aware values depending on column type;
        # strip tzinfo for consistent naive comparison.
        if last_ts and last_ts.tzinfo:
            last_ts = last_ts.replace(tzinfo=None)
        return last_ts


async def insert_bars(pool: asyncpg.Pool, ticker_id: int, bars: list, table: str) -> int:
    """Upsert OHLCV bars into *table*; returns the number of rows written."""
    if not bars:
        return 0

    # Convert Polygon bar dicts to parameter tuples.
    data = []
    for bar in bars:
        # bar["t"] is UTC epoch milliseconds; store as naive UTC so the value
        # is machine/DST independent (previously this used local time).
        ts = datetime.fromtimestamp(bar["t"] / 1000, tz=timezone.utc).replace(tzinfo=None)
        data.append((
            ticker_id,
            ts,
            float(bar["o"]),
            float(bar["h"]),
            float(bar["l"]),
            float(bar["c"]),
            float(bar.get("v", 0)),
            # "is not None" so a legitimate VWAP of 0.0 is not dropped to NULL.
            float(bar["vw"]) if bar.get("vw") is not None else None,
            # Epoch seconds straight from the source value; calling
            # ts.timestamp() on a naive datetime would reapply local time.
            int(bar["t"] // 1000)
        ))

    async with pool.acquire() as conn:
        await conn.executemany(
            f"""
            INSERT INTO market_data.{_checked_table(table)}
            (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (ticker_id, timestamp) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume,
                vwap = EXCLUDED.vwap
            """,
            data
        )

    return len(data)


async def sync_ticker(
    fetcher: PolygonFetcher,
    pool: asyncpg.Pool,
    ticker: dict,
    backfill_days: int,
    session: aiohttp.ClientSession
) -> dict:
    """Sync one ticker's 5-minute bars; returns a summary dict for reporting."""
    symbol = ticker["symbol"]
    polygon_ticker = ticker["polygon_ticker"]
    ticker_id = ticker["id"]

    logger.info("\n%s", "=" * 50)
    logger.info("Syncing %s (%s)", symbol, polygon_ticker)
    logger.info("%s", "=" * 50)

    # Resume from the last stored bar, or backfill from scratch.
    last_ts = await get_last_timestamp(pool, ticker_id, "ohlcv_5m")

    if last_ts:
        start_date = last_ts + timedelta(minutes=5)
        logger.info("Continuing from: %s", start_date)
    else:
        start_date = _utcnow() - timedelta(days=backfill_days)
        logger.info("Backfilling %d days from: %s", backfill_days, start_date)

    end_date = _utcnow()

    if start_date >= end_date:
        logger.info("Already up to date")
        return {"symbol": symbol, "status": "up_to_date", "rows": 0}

    # Fetch data; errors are reported per-ticker so one failure does not
    # abort the whole run.
    try:
        bars = await fetcher.fetch_aggregates(
            ticker=polygon_ticker,
            multiplier=5,
            timespan="minute",
            start_date=start_date,
            end_date=end_date,
            session=session
        )

        if not bars:
            logger.info("No new data available")
            return {"symbol": symbol, "status": "no_data", "rows": 0}

        # Insert into database
        inserted = await insert_bars(pool, ticker_id, bars, "ohlcv_5m")
        logger.info("Inserted %d rows for %s", inserted, symbol)

        return {"symbol": symbol, "status": "success", "rows": inserted}

    except Exception as e:
        logger.exception("Error syncing %s: %s", symbol, e)
        return {"symbol": symbol, "status": "error", "error": str(e), "rows": 0}


async def main(backfill_days: int = 30, symbols: Optional[list] = None):
    """Sync all active tickers (optionally filtered to *symbols*).

    Raises SystemExit(1) when POLYGON_API_KEY is missing so the process
    exits non-zero (the original returned silently with exit code 0).
    """
    logger.info("=" * 60)
    logger.info("Polygon Data Fetcher - Trading Platform")
    logger.info("=" * 60)

    if not POLYGON_API_KEY:
        logger.error("POLYGON_API_KEY is required!")
        raise SystemExit(1)

    logger.info("Polygon API Key: %s...", POLYGON_API_KEY[:10])
    logger.info("Database: %s@%s:%d/%s", DB_USER, DB_HOST, DB_PORT, DB_NAME)
    logger.info("Backfill days: %d", backfill_days)

    # Connect to database
    logger.info("\nConnecting to database...")
    pool = await asyncpg.create_pool(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        min_size=2,
        max_size=5
    )

    # Ensure the pool is closed even if a sync step raises.
    try:
        tickers = await get_tickers(pool)
        logger.info("Found %d active tickers", len(tickers))

        if symbols:
            tickers = [t for t in tickers if t["symbol"] in symbols]
            logger.info(
                "Filtered to %d tickers: %s",
                len(tickers), [t["symbol"] for t in tickers]
            )

        fetcher = PolygonFetcher(POLYGON_API_KEY)

        # Sync tickers sequentially; one shared HTTP session for all requests.
        results = []
        async with aiohttp.ClientSession() as session:
            for ticker in tickers:
                result = await sync_ticker(
                    fetcher=fetcher,
                    pool=pool,
                    ticker=ticker,
                    backfill_days=backfill_days,
                    session=session
                )
                results.append(result)

                # Small delay between tickers to be gentle on the API.
                await asyncio.sleep(1)

        # Print summary
        logger.info("\n%s", "=" * 60)
        logger.info("SYNC SUMMARY")
        logger.info("=" * 60)

        total_rows = 0
        for r in results:
            status_icon = (
                "✓" if r["status"] == "success"
                else "○" if r["status"] == "up_to_date"
                else "✗"
            )
            logger.info(" %s %s: %s (%d rows)", status_icon, r["symbol"], r["status"], r["rows"])
            total_rows += r["rows"]

        logger.info("\nTotal rows inserted: %d", total_rows)
    finally:
        await pool.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fetch Polygon data")
    parser.add_argument("--days", type=int, default=30, help="Days to backfill")
    parser.add_argument("--symbols", nargs="+", help="Specific symbols to sync")
    args = parser.parse_args()

    asyncio.run(main(backfill_days=args.days, symbols=args.symbols))