feat: Add Polygon data fetch script

- scripts/fetch_polygon_data.py: Async script to fetch OHLCV data from Polygon API
- Supports backfilling historical data (default 365 days)
- Rate limiting for free tier (5 req/min)
- Batch inserts with ON CONFLICT handling
- Configurable via .env file

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Adrian Flores Cortes 2026-01-25 05:53:16 -06:00
parent 62a9f3e1d9
commit 0e20c7c75c

View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Polygon Data Fetcher - Trading Platform
Fetches historical OHLCV data from Polygon.io and stores in PostgreSQL.
Usage:
~/venvs/data-service/bin/python scripts/fetch_polygon_data.py --days 30
Environment variables:
POLYGON_API_KEY - Polygon.io API key
DB_HOST - PostgreSQL host (default: localhost)
DB_PORT - PostgreSQL port (default: 5432)
DB_NAME - Database name (default: trading_platform)
DB_USER - Database user (default: trading_user)
DB_PASSWORD - Database password
"""
import os
import asyncio
import argparse
import logging
from datetime import datetime, timedelta
from pathlib import Path
# Add src to path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
import asyncpg
import aiohttp
from dotenv import load_dotenv
# Configure logging: plain stderr handler, INFO level, timestamped lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Load environment from the repo-root .env (this file lives in scripts/).
load_dotenv(Path(__file__).parent.parent / ".env")
# Configuration — all values overridable via environment / .env.
POLYGON_API_KEY = os.getenv("POLYGON_API_KEY")  # required; checked in main()
POLYGON_BASE_URL = os.getenv("POLYGON_BASE_URL", "https://api.polygon.io")
RATE_LIMIT = int(os.getenv("POLYGON_RATE_LIMIT", "5"))  # requests per minute (free tier)
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", "5432"))
DB_NAME = os.getenv("DB_NAME", "trading_platform")
DB_USER = os.getenv("DB_USER", "trading_user")
# NOTE(review): hard-coded development password as fallback — fine for local
# dev, but confirm this default is never used in a deployed environment.
DB_PASSWORD = os.getenv("DB_PASSWORD", "trading_dev_2026")
class PolygonFetcher:
    """Thin Polygon.io aggregates client with a fixed-window rate limiter.

    Throttles itself to RATE_LIMIT requests per 60-second window and follows
    Polygon's `next_url` pagination until the requested range is exhausted.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = POLYGON_BASE_URL
        self.request_count = 0                 # requests issued in the current window
        self.last_request_time = datetime.min  # start of the current window

    async def fetch_aggregates(
        self,
        ticker: str,
        multiplier: int,
        timespan: str,
        start_date: datetime,
        end_date: datetime,
        session: aiohttp.ClientSession
    ) -> list:
        """Fetch all aggregate bars for [start_date, end_date], following pagination.

        Returns the raw Polygon result dicts; an empty list on API error.
        """
        bars = []
        first_day = start_date.strftime("%Y-%m-%d")
        last_day = end_date.strftime("%Y-%m-%d")
        url = (
            f"{self.base_url}/v2/aggs/ticker/{ticker}"
            f"/range/{multiplier}/{timespan}/{first_day}/{last_day}"
        )
        query = {
            "adjusted": "true",
            "sort": "asc",
            "limit": 50000,
            "apiKey": self.api_key,
        }
        while True:
            await self._rate_limit()
            # Paginated next_url already carries the query string (incl. apiKey),
            # so extra params are only sent on the first request.
            send_params = None if "apiKey" in url else query
            async with session.get(url, params=send_params) as resp:
                if resp.status == 429:
                    logger.warning("Rate limited, waiting 60s...")
                    await asyncio.sleep(60)
                    continue  # retry the same URL
                if resp.status != 200:
                    body = await resp.text()
                    logger.error(f"API error {resp.status}: {body}")
                    break
                payload = await resp.json()
                page = payload.get("results", [])
                bars.extend(page)
                if page:
                    logger.info(f" Fetched {len(page)} bars (total: {len(bars)})")
                cursor = payload.get("next_url")
                if not cursor:
                    break
                url = cursor + f"&apiKey={self.api_key}"
        return bars

    async def _rate_limit(self):
        """Block until another request is allowed (fixed 60s window, RATE_LIMIT max)."""
        now = datetime.now()
        elapsed = (now - self.last_request_time).total_seconds()
        if elapsed >= 60:
            # Window expired — start a fresh one anchored at now.
            self.request_count = 0
            self.last_request_time = now
        if self.request_count >= RATE_LIMIT:
            remaining = 60 - (now - self.last_request_time).total_seconds()
            if remaining > 0:
                logger.info(f"Rate limit reached, waiting {remaining:.1f}s...")
                await asyncio.sleep(remaining)
            self.request_count = 0
            self.last_request_time = datetime.now()
        self.request_count += 1
async def get_tickers(pool: asyncpg.Pool) -> list:
    """Return every active ticker row (id, symbol, polygon_ticker, asset_type) as a dict."""
    sql = """
        SELECT id, symbol, polygon_ticker, asset_type
        FROM market_data.tickers
        WHERE is_active = true
        ORDER BY id
        """
    async with pool.acquire() as conn:
        records = await conn.fetch(sql)
    return list(map(dict, records))
async def get_last_timestamp(pool: asyncpg.Pool, ticker_id: int, table: str) -> datetime:
    """Return the newest bar timestamp for ticker_id in market_data.<table>, or None.

    The result is made naive (tzinfo stripped) so callers can compare it
    directly against naive datetime.now() values.
    """
    # NOTE: table is interpolated into the SQL; callers pass internal constants
    # (e.g. "ohlcv_5m"), never user input.
    sql = f"""
        SELECT MAX(timestamp) as last_ts
        FROM market_data.{table}
        WHERE ticker_id = $1
        """
    async with pool.acquire() as conn:
        record = await conn.fetchrow(sql, ticker_id)
    latest = record["last_ts"] if record and record["last_ts"] else None
    if latest is not None and latest.tzinfo:
        latest = latest.replace(tzinfo=None)
    return latest
async def insert_bars(pool: asyncpg.Pool, ticker_id: int, bars: list, table: str) -> int:
    """Upsert Polygon OHLCV bars into market_data.<table>.

    Args:
        pool: asyncpg connection pool.
        ticker_id: internal ticker id the bars belong to.
        bars: raw Polygon aggregate dicts (keys t/o/h/l/c/v/vw).
        table: target table name (internal constant, e.g. "ohlcv_5m").

    Returns:
        Number of rows written (inserted or updated).
    """
    if not bars:
        return 0
    rows = []
    for bar in bars:
        # Polygon's "t" field is epoch milliseconds.
        # NOTE(review): fromtimestamp converts to *local* time; if the schema
        # expects UTC, this should pass tz=timezone.utc — confirm against the DB.
        ts = datetime.fromtimestamp(bar["t"] / 1000)
        # BUGFIX: the original used `if bar.get("vw")`, which turned a
        # legitimate vwap of 0.0 into NULL (and read the key twice).
        # Test for presence, not truthiness.
        vw = bar.get("vw")
        rows.append((
            ticker_id,
            ts,
            float(bar["o"]),
            float(bar["h"]),
            float(bar["l"]),
            float(bar["c"]),
            float(bar.get("v", 0)),
            float(vw) if vw is not None else None,
            int(ts.timestamp()),
        ))
    async with pool.acquire() as conn:
        await conn.executemany(
            f"""
            INSERT INTO market_data.{table}
            (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (ticker_id, timestamp) DO UPDATE SET
                open = EXCLUDED.open,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                close = EXCLUDED.close,
                volume = EXCLUDED.volume,
                vwap = EXCLUDED.vwap
            """,
            rows
        )
    return len(rows)
async def sync_ticker(
    fetcher: PolygonFetcher,
    pool: asyncpg.Pool,
    ticker: dict,
    backfill_days: int,
    session: aiohttp.ClientSession
) -> dict:
    """Sync one ticker's 5-minute bars; resume after the last stored bar or backfill.

    Returns a summary dict: {"symbol", "status", "rows"} (plus "error" on failure).
    """
    symbol = ticker["symbol"]
    poly_symbol = ticker["polygon_ticker"]
    tid = ticker["id"]

    banner = "=" * 50
    logger.info(f"\n{banner}")
    logger.info(f"Syncing {symbol} ({poly_symbol})")
    logger.info(banner)

    # Resume 5 minutes after the newest stored bar, or backfill from scratch.
    last_ts = await get_last_timestamp(pool, tid, "ohlcv_5m")
    if last_ts is None:
        start = datetime.now() - timedelta(days=backfill_days)
        logger.info(f"Backfilling {backfill_days} days from: {start}")
    else:
        start = last_ts + timedelta(minutes=5)
        logger.info(f"Continuing from: {start}")

    end = datetime.now()
    if start >= end:
        logger.info("Already up to date")
        return {"symbol": symbol, "status": "up_to_date", "rows": 0}

    try:
        bars = await fetcher.fetch_aggregates(
            ticker=poly_symbol,
            multiplier=5,
            timespan="minute",
            start_date=start,
            end_date=end,
            session=session
        )
        if not bars:
            logger.info("No new data available")
            return {"symbol": symbol, "status": "no_data", "rows": 0}
        written = await insert_bars(pool, tid, bars, "ohlcv_5m")
        logger.info(f"Inserted {written} rows for {symbol}")
        return {"symbol": symbol, "status": "success", "rows": written}
    except Exception as exc:
        # Boundary handler: log and report per-ticker failure, keep the run going.
        logger.error(f"Error syncing {symbol}: {exc}")
        return {"symbol": symbol, "status": "error", "error": str(exc), "rows": 0}
async def main(backfill_days: int = 30, symbols: list = None):
    """Entry point: connect to PostgreSQL and sync every active ticker from Polygon.

    Args:
        backfill_days: history window used when a ticker has no stored bars yet.
        symbols: optional whitelist of symbols to sync; None syncs all active tickers.
    """
    logger.info("="*60)
    logger.info("Polygon Data Fetcher - Trading Platform")
    logger.info("="*60)
    if not POLYGON_API_KEY:
        logger.error("POLYGON_API_KEY is required!")
        return
    logger.info(f"Polygon API Key: {POLYGON_API_KEY[:10]}...")
    logger.info(f"Database: {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    logger.info(f"Backfill days: {backfill_days}")
    # Connect to database
    logger.info("\nConnecting to database...")
    pool = await asyncpg.create_pool(
        host=DB_HOST,
        port=DB_PORT,
        database=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        min_size=2,
        max_size=5
    )
    # BUGFIX: the pool was leaked if anything below raised; close it in finally.
    try:
        tickers = await get_tickers(pool)
        logger.info(f"Found {len(tickers)} active tickers")
        if symbols:
            tickers = [t for t in tickers if t["symbol"] in symbols]
            logger.info(f"Filtered to {len(tickers)} tickers: {[t['symbol'] for t in tickers]}")
        fetcher = PolygonFetcher(POLYGON_API_KEY)
        # Sync tickers sequentially — the free-tier rate limit makes
        # concurrency pointless here.
        results = []
        async with aiohttp.ClientSession() as session:
            for ticker in tickers:
                result = await sync_ticker(
                    fetcher=fetcher,
                    pool=pool,
                    ticker=ticker,
                    backfill_days=backfill_days,
                    session=session
                )
                results.append(result)
                # Small delay between tickers
                await asyncio.sleep(1)
        # Print summary
        logger.info("\n" + "="*60)
        logger.info("SYNC SUMMARY")
        logger.info("="*60)
        total_rows = 0
        # BUGFIX: the original ternary chose between three *identical empty
        # strings* (status icons evidently lost); use readable ASCII markers.
        icons = {"success": "[OK]", "up_to_date": "[--]"}
        for r in results:
            status_icon = icons.get(r["status"], "[!!]")
            logger.info(f" {status_icon} {r['symbol']}: {r['status']} ({r['rows']} rows)")
            total_rows += r["rows"]
        logger.info(f"\nTotal rows inserted: {total_rows}")
    finally:
        await pool.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Fetch Polygon data")
parser.add_argument("--days", type=int, default=30, help="Days to backfill")
parser.add_argument("--symbols", nargs="+", help="Specific symbols to sync")
args = parser.parse_args()
asyncio.run(main(backfill_days=args.days, symbols=args.symbols))