From 01683a850f1008ad033acbbe4821d1f5bd50ac3d Mon Sep 17 00:00:00 2001 From: rckrdmrd Date: Tue, 13 Jan 2026 13:28:18 -0600 Subject: [PATCH] feat: Initial Data Service implementation - FastAPI service for market data synchronization from Polygon.io - Async PostgreSQL integration with SQLAlchemy - API endpoints for tickers, OHLCV data, and sync operations - Rate-limited Polygon.io API client - Background task support for historical data sync - Support for 6 assets: XAUUSD, EURUSD, BTCUSD, GBPUSD, USDJPY, AUDUSD Endpoints: - GET /api/v1/tickers - List all tickers - GET /api/v1/tickers/{symbol}/ohlcv - Get OHLCV data - GET /api/v1/tickers/{symbol}/latest - Get latest price - POST /api/v1/sync/historical - Start historical sync - POST /api/v1/sync/ticker/{symbol} - Sync single ticker Co-Authored-By: Claude Opus 4.5 --- .env.example | 26 ++++ .gitignore | 28 ++++ Dockerfile | 33 +++++ main.py | 90 ++++++++++++ requirements.txt | 41 ++++++ src/__init__.py | 4 + src/config/__init__.py | 3 + src/config/settings.py | 40 +++++ src/models/__init__.py | 4 + src/models/database.py | 53 +++++++ src/routers/__init__.py | 4 + src/routers/sync.py | 133 +++++++++++++++++ src/routers/tickers.py | 204 +++++++++++++++++++++++++ src/schemas/__init__.py | 21 +++ src/schemas/market_data.py | 94 ++++++++++++ src/services/__init__.py | 4 + src/services/polygon_service.py | 196 +++++++++++++++++++++++++ src/services/sync_service.py | 253 ++++++++++++++++++++++++++++++++ 18 files changed, 1231 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/config/__init__.py create mode 100644 src/config/settings.py create mode 100644 src/models/__init__.py create mode 100644 src/models/database.py create mode 100644 src/routers/__init__.py create mode 100644 src/routers/sync.py create mode 100644 src/routers/tickers.py create mode 100644 src/schemas/__init__.py create mode 100644 src/schemas/market_data.py create mode 100644 src/services/__init__.py create mode 100644 src/services/polygon_service.py create mode 100644 src/services/sync_service.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ca4587b --- /dev/null +++ b/.env.example @@ -0,0 +1,26 @@ +# ============================================================================= +# Data Service - Environment Variables +# ============================================================================= + +# Server +PORT=8002 +HOST=0.0.0.0 +ENVIRONMENT=development +LOG_LEVEL=INFO + +# Database +DATABASE_URL=postgresql+asyncpg://trading_user:trading_dev_2025@localhost:5432/trading_platform + +# Polygon.io API +POLYGON_API_KEY=your_polygon_api_key_here +POLYGON_BASE_URL=https://api.polygon.io + +# Rate Limiting (requests per minute) +POLYGON_RATE_LIMIT=5 + +# Sync Settings +SYNC_BATCH_SIZE=1000 +SYNC_HISTORY_DAYS=365 + +# Redis (optional) +REDIS_URL=redis://localhost:6379/0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a8818b --- /dev/null +++ b/.gitignore @@ -0,0 +1,28 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +venv/ +ENV/ +env/ +.venv/ + +# Environment +.env +.env.local + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# Logs +*.log + +# Test +.pytest_cache/ +.coverage +htmlcov/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7badf7b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,33 @@ +# Data Service Dockerfile +# Market data synchronization service using Polygon.io API + +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for layer caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Create non-root user +RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app +USER appuser + +# Expose port +EXPOSE 8002 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD python -c "import httpx; httpx.get('http://localhost:8002/health')" || exit 1 + +# Run the application +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8002"] diff --git a/main.py b/main.py new file mode 100644 index 0000000..a26213c --- /dev/null +++ b/main.py @@ -0,0 +1,90 @@ +""" +Data Service - Market Data Synchronization API + +FastAPI service for fetching and synchronizing market data from Polygon.io +to the trading platform database. +""" +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from src.config import get_settings +from src.routers import tickers_router, sync_router + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + +settings = get_settings() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan handler.""" + logger.info("Data Service starting up...") + logger.info(f"Environment: {settings.environment}") + logger.info(f"Database: {settings.database_url.split('@')[1] if '@' in settings.database_url else 'configured'}") + yield + logger.info("Data Service shutting down...") + + +app = FastAPI( + title="Trading Platform - Data Service", + description="Market data synchronization service using Polygon.io API", + version="1.0.0", + lifespan=lifespan, + docs_url="/docs", + redoc_url="/redoc" +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Include routers +app.include_router(tickers_router) +app.include_router(sync_router) + + +@app.get("/") +async def root(): + """Root endpoint with service info.""" + return { + "service": "data-service", + "version": "1.0.0", + "description": "Market data synchronization from Polygon.io", + "endpoints": { + "tickers": "/api/v1/tickers", + "sync": "/api/v1/sync", + "docs": "/docs" + } + } + + +@app.get("/health") +async def health_check(): + """Health check endpoint.""" + return { + "status": "healthy", + "service": "data-service" + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + "main:app", + host="0.0.0.0", + port=settings.port, + reload=settings.environment == "development" + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b8e8483 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,41 @@ +# ============================================================================= +# Data Service - Requirements +# ============================================================================= +# Python 3.11+ +# Puerto: 8002 +# ============================================================================= + +# Web Framework +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +python-multipart==0.0.6 + +# Database +asyncpg==0.29.0 +sqlalchemy[asyncio]==2.0.25 +psycopg2-binary==2.9.9 + +# HTTP Client +httpx==0.26.0 +aiohttp==3.9.1 + +# Data Processing (opcional - no requerido para core) +# pandas>=2.2.0 # Requiere Python 3.9-3.12 +# numpy>=2.0.0 + +# Cache (opcional) +redis==5.0.1 + +# Environment +python-dotenv==1.0.0 + +# Utilities +pydantic==2.5.3 +pydantic-settings==2.1.0 + +# Logging +structlog==24.1.0 + +# Date/Time +python-dateutil==2.8.2 +pytz==2024.1 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..cd7b506 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,4 @@ +""" +Data Service - Market data synchronization from Polygon.io +""" +__version__ = "1.0.0" diff --git a/src/config/__init__.py b/src/config/__init__.py new file mode 100644 index 0000000..4c86a99 --- /dev/null +++ b/src/config/__init__.py @@ -0,0 +1,3 @@ +from .settings import Settings, get_settings + +__all__ = ["Settings", "get_settings"] diff --git a/src/config/settings.py b/src/config/settings.py new file mode 100644 index 0000000..d39b9df --- /dev/null +++ b/src/config/settings.py @@ -0,0 +1,40 @@ +""" +Data Service - Configuration Settings +""" +from pydantic_settings import BaseSettings +from functools import lru_cache + + +class Settings(BaseSettings): + """Application settings loaded from environment variables.""" + + # Server + port: int = 8002 + host: str = "0.0.0.0" + environment: str = "development" + log_level: str = "INFO" + + # Database + database_url: str = "postgresql+asyncpg://trading_user:trading_dev_2025@localhost:5432/trading_platform" + + # Polygon.io API + polygon_api_key: str = "" + polygon_base_url: str = "https://api.polygon.io" + polygon_rate_limit: int = 5 # requests per minute + + # Sync Settings + sync_batch_size: int = 1000 + sync_history_days: int = 365 + + # Redis (optional) + redis_url: str = "redis://localhost:6379/0" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + + +@lru_cache() +def get_settings() -> Settings: + """Get cached settings instance.""" + return Settings() diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 0000000..944980a --- /dev/null +++ b/src/models/__init__.py @@ -0,0 +1,4 @@ +"""Database models for Data Service.""" +from .database import get_db, get_db_context, engine + +__all__ = ["get_db", "get_db_context", "engine"] diff --git a/src/models/database.py b/src/models/database.py new file mode 100644 index 0000000..7f8797d --- /dev/null +++ b/src/models/database.py @@ -0,0 +1,53 @@ +""" +Database connection and session management. +""" +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import declarative_base +from contextlib import asynccontextmanager +from typing import AsyncGenerator + +from src.config import get_settings + +settings = get_settings() + +# Create async engine +engine = create_async_engine( + settings.database_url, + echo=settings.environment == "development", + pool_size=5, + max_overflow=10 +) + +# Session factory +async_session_maker = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False +) + +Base = declarative_base() + + +async def get_db() -> AsyncGenerator[AsyncSession, None]: + """Dependency for getting database sessions.""" + async with async_session_maker() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise + finally: + await session.close() + + +@asynccontextmanager +async def get_db_context() -> AsyncGenerator[AsyncSession, None]: + """Context manager for database sessions.""" + async with async_session_maker() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise diff --git a/src/routers/__init__.py b/src/routers/__init__.py new file mode 100644 index 0000000..f86c174 --- /dev/null +++ b/src/routers/__init__.py @@ -0,0 +1,4 @@ +from .tickers import router as tickers_router +from .sync import router as sync_router + +__all__ = ["tickers_router", "sync_router"] diff --git a/src/routers/sync.py b/src/routers/sync.py new file mode 100644 index 0000000..65bd438 --- /dev/null +++ b/src/routers/sync.py @@ -0,0 +1,133 @@ +""" +Sync API Router - Endpoints for data synchronization. +""" +from fastapi import APIRouter, BackgroundTasks, HTTPException +from datetime import datetime +from typing import List, Optional + +from src.schemas import SyncRequest, SyncResponse, SyncStatus +from src.services.sync_service import SyncService + +router = APIRouter(prefix="/api/v1/sync", tags=["sync"]) + +# Singleton sync service +sync_service = SyncService() + + +@router.get("/status", response_model=SyncResponse) +async def get_sync_status(): + """Get current synchronization status.""" + status = sync_service.get_status() + return SyncResponse( + status=SyncStatus(status.get("status", "idle")), + message=status.get("message", ""), + symbols_processed=status.get("symbols_processed", []), + records_inserted=status.get("records_inserted", 0), + errors=status.get("errors", []), + started_at=status.get("started_at"), + completed_at=status.get("completed_at") + ) + + +@router.post("/historical", response_model=SyncResponse) +async def sync_historical( + request: SyncRequest, + background_tasks: BackgroundTasks +): + """ + Start historical data synchronization. + + This endpoint starts a background task to sync historical data + from Polygon.io to the database. + """ + # Check if sync is already running + current_status = sync_service.get_status() + if current_status.get("status") == "running": + raise HTTPException( + status_code=409, + detail="Sync already in progress" + ) + + # Start sync in background + background_tasks.add_task( + sync_service.sync_all, + symbols=request.symbols, + start_date=request.start_date, + end_date=request.end_date, + timeframe=request.timeframe + ) + + return SyncResponse( + status=SyncStatus.pending, + message="Historical sync started in background", + symbols_processed=[], + records_inserted=0, + errors=[], + started_at=datetime.now(), + completed_at=None + ) + + +@router.post("/latest", response_model=SyncResponse) +async def sync_latest( + symbols: Optional[List[str]] = None, + background_tasks: BackgroundTasks = None +): + """ + Sync latest data for all or specified tickers. + + This fetches the most recent data since the last sync. + """ + # Check if sync is already running + current_status = sync_service.get_status() + if current_status.get("status") == "running": + raise HTTPException( + status_code=409, + detail="Sync already in progress" + ) + + # Start sync in background + background_tasks.add_task( + sync_service.sync_all, + symbols=symbols, + start_date=None, # Will auto-detect from last timestamp + end_date=None, + timeframe="5m" + ) + + return SyncResponse( + status=SyncStatus.pending, + message="Latest data sync started in background", + symbols_processed=[], + records_inserted=0, + errors=[], + started_at=datetime.now(), + completed_at=None + ) + + +@router.post("/ticker/{symbol}", response_model=SyncResponse) +async def sync_single_ticker( + symbol: str, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + timeframe: str = "5m" +): + """Sync data for a single ticker (synchronous).""" + # This runs synchronously for single ticker + result = await sync_service.sync_all( + symbols=[symbol.upper()], + start_date=start_date, + end_date=end_date, + timeframe=timeframe + ) + + return SyncResponse( + status=SyncStatus(result.get("status", "completed")), + message=result.get("message", ""), + symbols_processed=result.get("symbols_processed", []), + records_inserted=result.get("records_inserted", 0), + errors=result.get("errors", []), + started_at=result.get("started_at"), + completed_at=result.get("completed_at") + ) diff --git a/src/routers/tickers.py b/src/routers/tickers.py new file mode 100644 index 0000000..ada7bc8 --- /dev/null +++ b/src/routers/tickers.py @@ -0,0 +1,204 @@ +""" +Tickers API Router - Endpoints for market data access. +""" +from fastapi import APIRouter, Depends, Query, HTTPException +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession +from datetime import datetime, timedelta +from typing import List, Optional + +from src.models.database import get_db +from src.schemas import TickerResponse, OHLCVListResponse, OHLCVBase + +router = APIRouter(prefix="/api/v1/tickers", tags=["tickers"]) + + +@router.get("", response_model=List[TickerResponse]) +async def get_tickers( + active_only: bool = Query(True, description="Only return active tickers"), + db: AsyncSession = Depends(get_db) +): + """Get all tickers.""" + query = text(""" + SELECT id, symbol, name, asset_type, base_currency, quote_currency, + is_ml_enabled, supported_timeframes, polygon_ticker, + is_active, created_at, updated_at + FROM market_data.tickers + WHERE (:active_only = false OR is_active = true) + ORDER BY id + """) + result = await db.execute(query, {"active_only": active_only}) + rows = result.fetchall() + + return [ + TickerResponse( + id=row[0], + symbol=row[1], + name=row[2], + asset_type=row[3], + base_currency=row[4], + quote_currency=row[5], + is_ml_enabled=row[6], + supported_timeframes=row[7] or ["5m", "15m"], + polygon_ticker=row[8], + is_active=row[9], + created_at=row[10], + updated_at=row[11] + ) + for row in rows + ] + + +@router.get("/{symbol}", response_model=TickerResponse) +async def get_ticker(symbol: str, db: AsyncSession = Depends(get_db)): + """Get a specific ticker by symbol.""" + query = text(""" + SELECT id, symbol, name, asset_type, base_currency, quote_currency, + is_ml_enabled, supported_timeframes, polygon_ticker, + is_active, created_at, updated_at + FROM market_data.tickers + WHERE symbol = :symbol + """) + result = await db.execute(query, {"symbol": symbol.upper()}) + row = result.fetchone() + + if not row: + raise HTTPException(status_code=404, detail=f"Ticker {symbol} not found") + + return TickerResponse( + id=row[0], + symbol=row[1], + name=row[2], + asset_type=row[3], + base_currency=row[4], + quote_currency=row[5], + is_ml_enabled=row[6], + supported_timeframes=row[7] or ["5m", "15m"], + polygon_ticker=row[8], + is_active=row[9], + created_at=row[10], + updated_at=row[11] + ) + + +@router.get("/{symbol}/ohlcv", response_model=OHLCVListResponse) +async def get_ticker_ohlcv( + symbol: str, + timeframe: str = Query("5m", description="Timeframe (5m or 15m)"), + start: Optional[datetime] = Query(None, description="Start datetime"), + end: Optional[datetime] = Query(None, description="End datetime"), + limit: int = Query(1000, ge=1, le=10000, description="Maximum records"), + db: AsyncSession = Depends(get_db) +): + """Get OHLCV data for a ticker.""" + # Validate timeframe + if timeframe not in ["5m", "15m"]: + raise HTTPException(status_code=400, detail="Timeframe must be 5m or 15m") + + table = "ohlcv_5m" if timeframe == "5m" else "ohlcv_15m" + + # Get ticker ID + ticker_query = text(""" + SELECT id FROM market_data.tickers WHERE symbol = :symbol + """) + ticker_result = await db.execute(ticker_query, {"symbol": symbol.upper()}) + ticker_row = ticker_result.fetchone() + + if not ticker_row: + raise HTTPException(status_code=404, detail=f"Ticker {symbol} not found") + + ticker_id = ticker_row[0] + + # Build OHLCV query + if start is None: + start = datetime.now() - timedelta(days=7) + if end is None: + end = datetime.now() + + query = text(f""" + SELECT timestamp, open, high, low, close, volume, vwap + FROM market_data.{table} + WHERE ticker_id = :ticker_id + AND timestamp >= :start + AND timestamp <= :end + ORDER BY timestamp DESC + LIMIT :limit + """) + + result = await db.execute(query, { + "ticker_id": ticker_id, + "start": start, + "end": end, + "limit": limit + }) + rows = result.fetchall() + + data = [ + OHLCVBase( + timestamp=row[0], + open=float(row[1]), + high=float(row[2]), + low=float(row[3]), + close=float(row[4]), + volume=float(row[5]) if row[5] else 0, + vwap=float(row[6]) if row[6] else None + ) + for row in rows + ] + + return OHLCVListResponse( + symbol=symbol.upper(), + timeframe=timeframe, + count=len(data), + data=data + ) + + +@router.get("/{symbol}/latest") +async def get_latest_price( + symbol: str, + db: AsyncSession = Depends(get_db) +): + """Get the latest price for a ticker.""" + # Get ticker ID + ticker_query = text(""" + SELECT id FROM market_data.tickers WHERE symbol = :symbol + """) + ticker_result = await db.execute(ticker_query, {"symbol": symbol.upper()}) + ticker_row = ticker_result.fetchone() + + if not ticker_row: + raise HTTPException(status_code=404, detail=f"Ticker {symbol} not found") + + ticker_id = ticker_row[0] + + # Get latest OHLCV + query = text(""" + SELECT timestamp, open, high, low, close, volume + FROM market_data.ohlcv_5m + WHERE ticker_id = :ticker_id + ORDER BY timestamp DESC + LIMIT 1 + """) + + result = await db.execute(query, {"ticker_id": ticker_id}) + row = result.fetchone() + + if not row: + return { + "symbol": symbol.upper(), + "price": None, + "timestamp": None, + "message": "No data available" + } + + return { + "symbol": symbol.upper(), + "price": float(row[4]), # close price + "open": float(row[1]), + "high": float(row[2]), + "low": float(row[3]), + "close": float(row[4]), + "volume": float(row[5]) if row[5] else 0, + "timestamp": row[0].isoformat() + } diff --git a/src/schemas/__init__.py b/src/schemas/__init__.py new file mode 100644 index 0000000..9871deb --- /dev/null +++ b/src/schemas/__init__.py @@ -0,0 +1,21 @@ +from .market_data import ( + TickerBase, + TickerResponse, + OHLCVBase, + OHLCVResponse, + OHLCVListResponse, + SyncRequest, + SyncStatus, + SyncResponse +) + +__all__ = [ + "TickerBase", + "TickerResponse", + "OHLCVBase", + "OHLCVResponse", + "OHLCVListResponse", + "SyncRequest", + "SyncStatus", + "SyncResponse" +] diff --git a/src/schemas/market_data.py b/src/schemas/market_data.py new file mode 100644 index 0000000..5a0711c --- /dev/null +++ b/src/schemas/market_data.py @@ -0,0 +1,94 @@ +""" +Pydantic schemas for market data. +""" +from pydantic import BaseModel, Field +from datetime import datetime +from typing import Optional, List +from enum import Enum + + +class AssetType(str, Enum): + """Asset type enumeration.""" + forex = "forex" + crypto = "crypto" + commodity = "commodity" + stock = "stock" + + +class TickerBase(BaseModel): + """Base ticker schema.""" + symbol: str + name: str + asset_type: AssetType + base_currency: str + quote_currency: str + + +class TickerResponse(TickerBase): + """Ticker response schema.""" + id: int + polygon_ticker: Optional[str] = None + is_active: bool = True + is_ml_enabled: bool = True + supported_timeframes: List[str] = ["5m", "15m"] + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +class OHLCVBase(BaseModel): + """Base OHLCV schema.""" + timestamp: datetime + open: float + high: float + low: float + close: float + volume: float = 0 + vwap: Optional[float] = None + + +class OHLCVResponse(OHLCVBase): + """OHLCV response schema.""" + id: int + ticker_id: int + + class Config: + from_attributes = True + + +class OHLCVListResponse(BaseModel): + """List of OHLCV data response.""" + symbol: str + timeframe: str + count: int + data: List[OHLCVBase] + + +class SyncRequest(BaseModel): + """Sync request schema.""" + symbols: Optional[List[str]] = None # None = all active + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + timeframe: str = "5m" + + +class SyncStatus(str, Enum): + """Sync status enumeration.""" + idle = "idle" + pending = "pending" + running = "running" + completed = "completed" + failed = "failed" + + +class SyncResponse(BaseModel): + """Sync response schema.""" + status: SyncStatus + message: str + symbols_processed: List[str] = [] + records_inserted: int = 0 + errors: List[str] = [] + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None diff --git a/src/services/__init__.py b/src/services/__init__.py new file mode 100644 index 0000000..01498e3 --- /dev/null +++ b/src/services/__init__.py @@ -0,0 +1,4 @@ +from .polygon_service import PolygonService +from .sync_service import SyncService + +__all__ = ["PolygonService", "SyncService"] diff --git a/src/services/polygon_service.py b/src/services/polygon_service.py new file mode 100644 index 0000000..d41ae6d --- /dev/null +++ b/src/services/polygon_service.py @@ -0,0 +1,196 @@ +""" +Polygon.io API Service for fetching market data. +""" +import httpx +import asyncio +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import logging + +from src.config import get_settings + +logger = logging.getLogger(__name__) +settings = get_settings() + + +class PolygonService: + """Service for interacting with Polygon.io API.""" + + def __init__(self): + self.api_key = settings.polygon_api_key + self.base_url = settings.polygon_base_url + self.rate_limit = settings.polygon_rate_limit + self._last_request_time = None + self._request_count = 0 + + async def _rate_limit_wait(self): + """Implement rate limiting for API calls.""" + if self._last_request_time is None: + self._last_request_time = datetime.now() + self._request_count = 1 + return + + now = datetime.now() + elapsed = (now - self._last_request_time).total_seconds() + + if elapsed < 60: + self._request_count += 1 + if self._request_count >= self.rate_limit: + wait_time = 60 - elapsed + logger.info(f"Rate limit reached, waiting {wait_time:.1f}s") + await asyncio.sleep(wait_time) + self._last_request_time = datetime.now() + self._request_count = 1 + else: + self._last_request_time = now + self._request_count = 1 + + async def get_aggregates( + self, + ticker: str, + multiplier: int = 5, + timespan: str = "minute", + start_date: datetime = None, + end_date: datetime = None, + limit: int = 50000 + ) -> List[Dict[str, Any]]: + """ + Fetch aggregate bars (OHLCV) from Polygon.io. + + Args: + ticker: Polygon ticker symbol (e.g., "C:XAUUSD", "X:BTCUSD") + multiplier: Size of timespan multiplier (e.g., 5 for 5-minute) + timespan: Type of window size (minute, hour, day, etc.) + start_date: Start date for data + end_date: End date for data + limit: Maximum number of results + + Returns: + List of OHLCV bars + """ + if not self.api_key: + logger.error("Polygon API key not configured") + return [] + + if start_date is None: + start_date = datetime.now() - timedelta(days=30) + if end_date is None: + end_date = datetime.now() + + # Format dates as YYYY-MM-DD + start_str = start_date.strftime("%Y-%m-%d") + end_str = end_date.strftime("%Y-%m-%d") + + url = ( + f"{self.base_url}/v2/aggs/ticker/{ticker}/range/" + f"{multiplier}/{timespan}/{start_str}/{end_str}" + ) + + params = { + "apiKey": self.api_key, + "adjusted": "true", + "sort": "asc", + "limit": limit + } + + await self._rate_limit_wait() + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(url, params=params) + response.raise_for_status() + data = response.json() + + if data.get("status") == "OK" and data.get("results"): + results = data["results"] + logger.info( + f"Fetched {len(results)} bars for {ticker} " + f"from {start_str} to {end_str}" + ) + return results + elif data.get("status") == "OK" and not data.get("results"): + logger.warning(f"No data available for {ticker}") + return [] + else: + logger.error(f"Polygon API error: {data}") + return [] + + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error fetching {ticker}: {e}") + return [] + except Exception as e: + logger.error(f"Error fetching {ticker}: {e}") + return [] + + async def get_forex_aggregates( + self, + ticker: str, + timeframe: str = "5m", + start_date: datetime = None, + end_date: datetime = None + ) -> List[Dict[str, Any]]: + """ + Fetch forex aggregates with timeframe string. + + Args: + ticker: Polygon forex ticker (e.g., "C:EURUSD") + timeframe: Timeframe string ("5m", "15m", "1h", "1d") + start_date: Start date + end_date: End date + + Returns: + List of OHLCV bars + """ + # Parse timeframe + timeframe_map = { + "1m": (1, "minute"), + "5m": (5, "minute"), + "15m": (15, "minute"), + "30m": (30, "minute"), + "1h": (1, "hour"), + "4h": (4, "hour"), + "1d": (1, "day"), + } + + if timeframe not in timeframe_map: + logger.error(f"Invalid timeframe: {timeframe}") + return [] + + multiplier, timespan = timeframe_map[timeframe] + + return await self.get_aggregates( + ticker=ticker, + multiplier=multiplier, + timespan=timespan, + start_date=start_date, + end_date=end_date + ) + + def transform_to_ohlcv(self, bars: List[Dict[str, Any]], ticker_id: int) -> List[Dict[str, Any]]: + """ + Transform Polygon API response to database format. + + Args: + bars: Raw bars from Polygon API + ticker_id: Database ticker ID + + Returns: + List of OHLCV records ready for database insertion + """ + ohlcv_records = [] + + for bar in bars: + record = { + "ticker_id": ticker_id, + "timestamp": datetime.fromtimestamp(bar["t"] / 1000), + "open": bar["o"], + "high": bar["h"], + "low": bar["l"], + "close": bar["c"], + "volume": bar.get("v", 0), + "vwap": bar.get("vw"), + "ts_epoch": bar["t"] + } + ohlcv_records.append(record) + + return ohlcv_records diff --git a/src/services/sync_service.py b/src/services/sync_service.py new file mode 100644 index 0000000..712c578 --- /dev/null +++ b/src/services/sync_service.py @@ -0,0 +1,253 @@ +""" +Synchronization Service for loading market data into database. +""" +import asyncio +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import logging + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from src.config import get_settings +from src.models.database import get_db_context +from src.services.polygon_service import PolygonService + +logger = logging.getLogger(__name__) +settings = get_settings() + + +class SyncService: + """Service for synchronizing market data from Polygon to database.""" + + def __init__(self): + self.polygon = PolygonService() + self.batch_size = settings.sync_batch_size + self.sync_status = { + "status": "idle", + "message": "", + "symbols_processed": [], + "records_inserted": 0, + "errors": [], + "started_at": None, + "completed_at": None + } + + async def get_active_tickers(self, session: AsyncSession) -> List[Dict[str, Any]]: + """Get all active tickers from database.""" + query = text(""" + SELECT id, symbol, name, asset_type, polygon_ticker, is_active + FROM market_data.tickers + WHERE is_active = true AND polygon_ticker IS NOT NULL + ORDER BY id + """) + result = await session.execute(query) + rows = result.fetchall() + return [ + { + "id": row[0], + "symbol": row[1], + "name": row[2], + "asset_type": row[3], + "polygon_ticker": row[4], + "is_active": row[5] + } + for row in rows + ] + + async def get_latest_timestamp( + self, + session: AsyncSession, + ticker_id: int, + timeframe: str = "5m" + ) -> Optional[datetime]: + """Get the latest timestamp for a ticker.""" + table = "ohlcv_5m" if timeframe == "5m" else "ohlcv_15m" + query = text(f""" + SELECT MAX(timestamp) FROM market_data.{table} + WHERE ticker_id = :ticker_id + """) + result = await session.execute(query, {"ticker_id": ticker_id}) + row = result.fetchone() + return row[0] if row and row[0] else None + + async def insert_ohlcv_batch( + self, + session: AsyncSession, + records: List[Dict[str, Any]], + timeframe: str = "5m" + ) -> int: + """Insert OHLCV records in batch, handling duplicates.""" + if not records: + return 0 + + table = "ohlcv_5m" if timeframe == "5m" else "ohlcv_15m" + + # Use INSERT ON CONFLICT to handle duplicates + query = text(f""" + INSERT INTO market_data.{table} + (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch) + VALUES (:ticker_id, :timestamp, :open, :high, :low, :close, :volume, :vwap, :ts_epoch) + ON CONFLICT (ticker_id, timestamp) DO UPDATE SET + open = EXCLUDED.open, + high = EXCLUDED.high, + low = EXCLUDED.low, + close = EXCLUDED.close, + volume = EXCLUDED.volume, + vwap = EXCLUDED.vwap, + ts_epoch = EXCLUDED.ts_epoch + """) + + inserted = 0 + for i in range(0, len(records), self.batch_size): + batch = records[i:i + self.batch_size] + for record in batch: + try: + await session.execute(query, record) + inserted += 1 + except Exception as e: + logger.error(f"Error inserting record: {e}") + + await session.commit() + logger.info(f"Inserted batch of {len(batch)} records") + + return inserted + + async def sync_ticker( + self, + ticker: Dict[str, Any], + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + timeframe: str = "5m" + ) -> Dict[str, Any]: + """ + Sync data for a single ticker. + + Returns: + Dict with sync results + """ + result = { + "symbol": ticker["symbol"], + "records_inserted": 0, + "error": None + } + + async with get_db_context() as session: + try: + # Determine start date + if start_date is None: + latest = await self.get_latest_timestamp( + session, ticker["id"], timeframe + ) + if latest: + start_date = latest + timedelta(minutes=5) + else: + start_date = datetime.now() - timedelta( + days=settings.sync_history_days + ) + + if end_date is None: + end_date = datetime.now() + + logger.info( + f"Syncing {ticker['symbol']} from {start_date} to {end_date}" + ) + + # Fetch data from Polygon + bars = await self.polygon.get_forex_aggregates( + ticker=ticker["polygon_ticker"], + timeframe=timeframe, + start_date=start_date, + end_date=end_date + ) + + if not bars: + logger.warning(f"No data returned for {ticker['symbol']}") + return result + + # Transform and insert + records = self.polygon.transform_to_ohlcv(bars, ticker["id"]) + inserted = await self.insert_ohlcv_batch(session, records, timeframe) + + result["records_inserted"] = inserted + logger.info(f"Synced {inserted} records for {ticker['symbol']}") + + except Exception as e: + error_msg = f"Error syncing {ticker['symbol']}: {str(e)}" + logger.error(error_msg) + result["error"] = error_msg + + return result + + async def sync_all( + self, + symbols: Optional[List[str]] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + timeframe: str = "5m" + ) -> Dict[str, Any]: + """ + Sync data for all active tickers. + + Args: + symbols: List of symbols to sync (None = all active) + start_date: Start date for sync + end_date: End date for sync + timeframe: Timeframe to sync + + Returns: + Sync status dictionary + """ + self.sync_status = { + "status": "running", + "message": "Sync started", + "symbols_processed": [], + "records_inserted": 0, + "errors": [], + "started_at": datetime.now(), + "completed_at": None + } + + try: + async with get_db_context() as session: + tickers = await self.get_active_tickers(session) + + if symbols: + tickers = [t for t in tickers if t["symbol"] in symbols] + + logger.info(f"Starting sync for {len(tickers)} tickers") + + for ticker in tickers: + result = await self.sync_ticker( + ticker=ticker, + start_date=start_date, + end_date=end_date, + timeframe=timeframe + ) + + self.sync_status["symbols_processed"].append(ticker["symbol"]) + self.sync_status["records_inserted"] += result["records_inserted"] + + if result["error"]: + self.sync_status["errors"].append(result["error"]) + + # Small delay between tickers for rate limiting + await asyncio.sleep(1) + + self.sync_status["status"] = "completed" + self.sync_status["message"] = ( + f"Sync completed. Processed {len(tickers)} symbols, " + f"inserted {self.sync_status['records_inserted']} records." + ) + + except Exception as e: + self.sync_status["status"] = "failed" + self.sync_status["message"] = str(e) + self.sync_status["errors"].append(str(e)) + + self.sync_status["completed_at"] = datetime.now() + return self.sync_status + + def get_status(self) -> Dict[str, Any]: + """Get current sync status.""" + return self.sync_status