feat: Initial Data Service implementation

- FastAPI service for market data synchronization from Polygon.io
- Async PostgreSQL integration with SQLAlchemy
- API endpoints for tickers, OHLCV data, and sync operations
- Rate-limited Polygon.io API client
- Background task support for historical data sync
- Support for 6 assets: XAUUSD, EURUSD, BTCUSD, GBPUSD, USDJPY, AUDUSD

Endpoints:
- GET /api/v1/tickers - List all tickers
- GET /api/v1/tickers/{symbol}/ohlcv - Get OHLCV data
- GET /api/v1/tickers/{symbol}/latest - Get latest price
- POST /api/v1/sync/historical - Start historical sync
- POST /api/v1/sync/ticker/{symbol} - Sync single ticker

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rckrdmrd 2026-01-13 13:28:18 -06:00
commit 01683a850f
18 changed files with 1231 additions and 0 deletions

26
.env.example Normal file
View File

@ -0,0 +1,26 @@
# =============================================================================
# Data Service - Environment Variables
# =============================================================================
# Server
PORT=8002
HOST=0.0.0.0
ENVIRONMENT=development
LOG_LEVEL=INFO
# Database
DATABASE_URL=postgresql+asyncpg://trading_user:trading_dev_2025@localhost:5432/trading_platform
# Polygon.io API
POLYGON_API_KEY=your_polygon_api_key_here
POLYGON_BASE_URL=https://api.polygon.io
# Rate Limiting (requests per minute)
POLYGON_RATE_LIMIT=5
# Sync Settings
SYNC_BATCH_SIZE=1000
SYNC_HISTORY_DAYS=365
# Redis (optional)
REDIS_URL=redis://localhost:6379/0

28
.gitignore vendored Normal file
View File

@ -0,0 +1,28 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
ENV/
env/
.venv/
# Environment
.env
.env.local
# IDE
.vscode/
.idea/
*.swp
*.swo
# Logs
*.log
# Test
.pytest_cache/
.coverage
htmlcov/

33
Dockerfile Normal file
View File

@ -0,0 +1,33 @@
# Data Service Dockerfile
# Market data synchronization service using Polygon.io API
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (gcc + libpq headers for DB driver builds)
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8002

# Health check.
# Fix: raise_for_status() makes a 4xx/5xx /health response fail the probe;
# previously any completed request (even HTTP 500) counted as healthy.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8002/health').raise_for_status()" || exit 1

# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8002"]

90
main.py Normal file
View File

@ -0,0 +1,90 @@
"""
Data Service - Market Data Synchronization API
FastAPI service for fetching and synchronizing market data from Polygon.io
to the trading platform database.
"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.config import get_settings
from src.routers import tickers_router, sync_router
# Configure logging for the whole service.
# NOTE(review): level is hardcoded to INFO here even though Settings exposes
# log_level — consider wiring settings.log_level through; confirm intent.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Cached settings instance (see src.config.get_settings).
settings = get_settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup/shutdown hook for the FastAPI application."""
    logger.info("Data Service starting up...")
    logger.info(f"Environment: {settings.environment}")
    # Log only the part after '@' so DSN credentials never reach the logs.
    db_display = (
        settings.database_url.split('@')[1]
        if '@' in settings.database_url
        else 'configured'
    )
    logger.info(f"Database: {db_display}")
    yield
    logger.info("Data Service shutting down...")
# FastAPI application; lifespan handles startup/shutdown logging.
app = FastAPI(
    title="Trading Platform - Data Service",
    description="Market data synchronization service using Polygon.io API",
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc"
)
# CORS middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers under the CORS spec — restrict origins before
# shipping to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Include routers (ticker reads + sync operations).
app.include_router(tickers_router)
app.include_router(sync_router)
@app.get("/")
async def root():
"""Root endpoint with service info."""
return {
"service": "data-service",
"version": "1.0.0",
"description": "Market data synchronization from Polygon.io",
"endpoints": {
"tickers": "/api/v1/tickers",
"sync": "/api/v1/sync",
"docs": "/docs"
}
}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"service": "data-service"
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=settings.port,
reload=settings.environment == "development"
)

41
requirements.txt Normal file
View File

@ -0,0 +1,41 @@
# =============================================================================
# Data Service - Requirements
# =============================================================================
# Python 3.11+
# Port: 8002
# =============================================================================
# Web Framework
fastapi==0.109.0
uvicorn[standard]==0.27.0
python-multipart==0.0.6
# Database
asyncpg==0.29.0
sqlalchemy[asyncio]==2.0.25
psycopg2-binary==2.9.9
# HTTP Client
httpx==0.26.0
aiohttp==3.9.1
# Data Processing (optional - not required for core)
# pandas>=2.2.0 # Requires Python 3.9-3.12
# numpy>=2.0.0
# Cache (opcional)
redis==5.0.1
# Environment
python-dotenv==1.0.0
# Utilities
pydantic==2.5.3
pydantic-settings==2.1.0
# Logging
structlog==24.1.0
# Date/Time
python-dateutil==2.8.2
pytz==2024.1

4
src/__init__.py Normal file
View File

@ -0,0 +1,4 @@
"""
Data Service - Market data synchronization from Polygon.io
"""
__version__ = "1.0.0"

3
src/config/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .settings import Settings, get_settings
__all__ = ["Settings", "get_settings"]

40
src/config/settings.py Normal file
View File

@ -0,0 +1,40 @@
"""
Data Service - Configuration Settings
"""
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings loaded from environment variables (and .env)."""
    # Server
    port: int = 8002
    host: str = "0.0.0.0"
    environment: str = "development"
    log_level: str = "INFO"
    # Database
    # NOTE(review): the default DSN embeds dev credentials; override via
    # DATABASE_URL in any non-local environment.
    database_url: str = "postgresql+asyncpg://trading_user:trading_dev_2025@localhost:5432/trading_platform"
    # Polygon.io API
    polygon_api_key: str = ""
    polygon_base_url: str = "https://api.polygon.io"
    polygon_rate_limit: int = 5  # requests per minute
    # Sync Settings
    sync_batch_size: int = 1000
    sync_history_days: int = 365
    # Redis (optional)
    redis_url: str = "redis://localhost:6379/0"
    class Config:
        # pydantic-settings reads overrides from this file when present.
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache()
def get_settings() -> Settings:
    """Return a process-wide cached Settings instance."""
    return Settings()

4
src/models/__init__.py Normal file
View File

@ -0,0 +1,4 @@
"""Database models for Data Service."""
from .database import get_db, get_db_context, engine
__all__ = ["get_db", "get_db_context", "engine"]

53
src/models/database.py Normal file
View File

@ -0,0 +1,53 @@
"""
Database connection and session management.
"""
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from src.config import get_settings
settings = get_settings()

# Async engine; SQL echo is only enabled in development.
engine = create_async_engine(
    settings.database_url,
    echo=settings.environment == "development",
    pool_size=5,
    max_overflow=10
)

# Session factory; expire_on_commit=False keeps objects usable after commit.
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Declarative base for ORM models.
Base = declarative_base()
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """FastAPI dependency yielding a database session.

    Commits after the request handler finishes successfully; rolls back and
    re-raises on any exception. The ``async with`` block closes the session
    on exit, so no explicit close is needed (the previous
    ``finally: await session.close()`` was redundant and inconsistent with
    ``get_db_context``).
    """
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
    """Async context manager for a session: commit on success, rollback on error."""
    session = async_session_maker()
    try:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
    finally:
        # Mirrors what `async with async_session_maker()` does on exit.
        await session.close()

4
src/routers/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .tickers import router as tickers_router
from .sync import router as sync_router
__all__ = ["tickers_router", "sync_router"]

133
src/routers/sync.py Normal file
View File

@ -0,0 +1,133 @@
"""
Sync API Router - Endpoints for data synchronization.
"""
from fastapi import APIRouter, BackgroundTasks, HTTPException
from datetime import datetime
from typing import List, Optional
from src.schemas import SyncRequest, SyncResponse, SyncStatus
from src.services.sync_service import SyncService
router = APIRouter(prefix="/api/v1/sync", tags=["sync"])

# Module-level singleton so every endpoint shares one sync state machine.
sync_service = SyncService()
@router.get("/status", response_model=SyncResponse)
async def get_sync_status():
"""Get current synchronization status."""
status = sync_service.get_status()
return SyncResponse(
status=SyncStatus(status.get("status", "idle")),
message=status.get("message", ""),
symbols_processed=status.get("symbols_processed", []),
records_inserted=status.get("records_inserted", 0),
errors=status.get("errors", []),
started_at=status.get("started_at"),
completed_at=status.get("completed_at")
)
@router.post("/historical", response_model=SyncResponse)
async def sync_historical(
request: SyncRequest,
background_tasks: BackgroundTasks
):
"""
Start historical data synchronization.
This endpoint starts a background task to sync historical data
from Polygon.io to the database.
"""
# Check if sync is already running
current_status = sync_service.get_status()
if current_status.get("status") == "running":
raise HTTPException(
status_code=409,
detail="Sync already in progress"
)
# Start sync in background
background_tasks.add_task(
sync_service.sync_all,
symbols=request.symbols,
start_date=request.start_date,
end_date=request.end_date,
timeframe=request.timeframe
)
return SyncResponse(
status=SyncStatus.pending,
message="Historical sync started in background",
symbols_processed=[],
records_inserted=0,
errors=[],
started_at=datetime.now(),
completed_at=None
)
@router.post("/latest", response_model=SyncResponse)
async def sync_latest(
symbols: Optional[List[str]] = None,
background_tasks: BackgroundTasks = None
):
"""
Sync latest data for all or specified tickers.
This fetches the most recent data since the last sync.
"""
# Check if sync is already running
current_status = sync_service.get_status()
if current_status.get("status") == "running":
raise HTTPException(
status_code=409,
detail="Sync already in progress"
)
# Start sync in background
background_tasks.add_task(
sync_service.sync_all,
symbols=symbols,
start_date=None, # Will auto-detect from last timestamp
end_date=None,
timeframe="5m"
)
return SyncResponse(
status=SyncStatus.pending,
message="Latest data sync started in background",
symbols_processed=[],
records_inserted=0,
errors=[],
started_at=datetime.now(),
completed_at=None
)
@router.post("/ticker/{symbol}", response_model=SyncResponse)
async def sync_single_ticker(
symbol: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
timeframe: str = "5m"
):
"""Sync data for a single ticker (synchronous)."""
# This runs synchronously for single ticker
result = await sync_service.sync_all(
symbols=[symbol.upper()],
start_date=start_date,
end_date=end_date,
timeframe=timeframe
)
return SyncResponse(
status=SyncStatus(result.get("status", "completed")),
message=result.get("message", ""),
symbols_processed=result.get("symbols_processed", []),
records_inserted=result.get("records_inserted", 0),
errors=result.get("errors", []),
started_at=result.get("started_at"),
completed_at=result.get("completed_at")
)

204
src/routers/tickers.py Normal file
View File

@ -0,0 +1,204 @@
"""
Tickers API Router - Endpoints for market data access.
"""
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta
from typing import List, Optional
from src.models.database import get_db
from src.schemas import TickerResponse, OHLCVListResponse, OHLCVBase
# All market-data read endpoints live under /api/v1/tickers.
router = APIRouter(prefix="/api/v1/tickers", tags=["tickers"])
@router.get("", response_model=List[TickerResponse])
async def get_tickers(
active_only: bool = Query(True, description="Only return active tickers"),
db: AsyncSession = Depends(get_db)
):
"""Get all tickers."""
query = text("""
SELECT id, symbol, name, asset_type, base_currency, quote_currency,
is_ml_enabled, supported_timeframes, polygon_ticker,
is_active, created_at, updated_at
FROM market_data.tickers
WHERE (:active_only = false OR is_active = true)
ORDER BY id
""")
result = await db.execute(query, {"active_only": active_only})
rows = result.fetchall()
return [
TickerResponse(
id=row[0],
symbol=row[1],
name=row[2],
asset_type=row[3],
base_currency=row[4],
quote_currency=row[5],
is_ml_enabled=row[6],
supported_timeframes=row[7] or ["5m", "15m"],
polygon_ticker=row[8],
is_active=row[9],
created_at=row[10],
updated_at=row[11]
)
for row in rows
]
@router.get("/{symbol}", response_model=TickerResponse)
async def get_ticker(symbol: str, db: AsyncSession = Depends(get_db)):
"""Get a specific ticker by symbol."""
query = text("""
SELECT id, symbol, name, asset_type, base_currency, quote_currency,
is_ml_enabled, supported_timeframes, polygon_ticker,
is_active, created_at, updated_at
FROM market_data.tickers
WHERE symbol = :symbol
""")
result = await db.execute(query, {"symbol": symbol.upper()})
row = result.fetchone()
if not row:
raise HTTPException(status_code=404, detail=f"Ticker {symbol} not found")
return TickerResponse(
id=row[0],
symbol=row[1],
name=row[2],
asset_type=row[3],
base_currency=row[4],
quote_currency=row[5],
is_ml_enabled=row[6],
supported_timeframes=row[7] or ["5m", "15m"],
polygon_ticker=row[8],
is_active=row[9],
created_at=row[10],
updated_at=row[11]
)
@router.get("/{symbol}/ohlcv", response_model=OHLCVListResponse)
async def get_ticker_ohlcv(
symbol: str,
timeframe: str = Query("5m", description="Timeframe (5m or 15m)"),
start: Optional[datetime] = Query(None, description="Start datetime"),
end: Optional[datetime] = Query(None, description="End datetime"),
limit: int = Query(1000, ge=1, le=10000, description="Maximum records"),
db: AsyncSession = Depends(get_db)
):
"""Get OHLCV data for a ticker."""
# Validate timeframe
if timeframe not in ["5m", "15m"]:
raise HTTPException(status_code=400, detail="Timeframe must be 5m or 15m")
table = "ohlcv_5m" if timeframe == "5m" else "ohlcv_15m"
# Get ticker ID
ticker_query = text("""
SELECT id FROM market_data.tickers WHERE symbol = :symbol
""")
ticker_result = await db.execute(ticker_query, {"symbol": symbol.upper()})
ticker_row = ticker_result.fetchone()
if not ticker_row:
raise HTTPException(status_code=404, detail=f"Ticker {symbol} not found")
ticker_id = ticker_row[0]
# Build OHLCV query
if start is None:
start = datetime.now() - timedelta(days=7)
if end is None:
end = datetime.now()
query = text(f"""
SELECT timestamp, open, high, low, close, volume, vwap
FROM market_data.{table}
WHERE ticker_id = :ticker_id
AND timestamp >= :start
AND timestamp <= :end
ORDER BY timestamp DESC
LIMIT :limit
""")
result = await db.execute(query, {
"ticker_id": ticker_id,
"start": start,
"end": end,
"limit": limit
})
rows = result.fetchall()
data = [
OHLCVBase(
timestamp=row[0],
open=float(row[1]),
high=float(row[2]),
low=float(row[3]),
close=float(row[4]),
volume=float(row[5]) if row[5] else 0,
vwap=float(row[6]) if row[6] else None
)
for row in rows
]
return OHLCVListResponse(
symbol=symbol.upper(),
timeframe=timeframe,
count=len(data),
data=data
)
@router.get("/{symbol}/latest")
async def get_latest_price(
symbol: str,
db: AsyncSession = Depends(get_db)
):
"""Get the latest price for a ticker."""
# Get ticker ID
ticker_query = text("""
SELECT id FROM market_data.tickers WHERE symbol = :symbol
""")
ticker_result = await db.execute(ticker_query, {"symbol": symbol.upper()})
ticker_row = ticker_result.fetchone()
if not ticker_row:
raise HTTPException(status_code=404, detail=f"Ticker {symbol} not found")
ticker_id = ticker_row[0]
# Get latest OHLCV
query = text("""
SELECT timestamp, open, high, low, close, volume
FROM market_data.ohlcv_5m
WHERE ticker_id = :ticker_id
ORDER BY timestamp DESC
LIMIT 1
""")
result = await db.execute(query, {"ticker_id": ticker_id})
row = result.fetchone()
if not row:
return {
"symbol": symbol.upper(),
"price": None,
"timestamp": None,
"message": "No data available"
}
return {
"symbol": symbol.upper(),
"price": float(row[4]), # close price
"open": float(row[1]),
"high": float(row[2]),
"low": float(row[3]),
"close": float(row[4]),
"volume": float(row[5]) if row[5] else 0,
"timestamp": row[0].isoformat()
}

21
src/schemas/__init__.py Normal file
View File

@ -0,0 +1,21 @@
from .market_data import (
TickerBase,
TickerResponse,
OHLCVBase,
OHLCVResponse,
OHLCVListResponse,
SyncRequest,
SyncStatus,
SyncResponse
)
__all__ = [
"TickerBase",
"TickerResponse",
"OHLCVBase",
"OHLCVResponse",
"OHLCVListResponse",
"SyncRequest",
"SyncStatus",
"SyncResponse"
]

View File

@ -0,0 +1,94 @@
"""
Pydantic schemas for market data.
"""
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional, List
from enum import Enum
class AssetType(str, Enum):
    """Closed set of supported asset classes."""
    forex = "forex"
    crypto = "crypto"
    commodity = "commodity"
    stock = "stock"
class TickerBase(BaseModel):
    """Fields shared by all ticker schemas."""
    symbol: str  # platform symbol, e.g. "EURUSD"
    name: str
    asset_type: AssetType
    base_currency: str
    quote_currency: str
class TickerResponse(TickerBase):
    """Ticker as returned by the API (one row of market_data.tickers)."""
    id: int
    polygon_ticker: Optional[str] = None  # Polygon.io symbol mapping
    is_active: bool = True
    is_ml_enabled: bool = True
    # Mutable default is safe here: pydantic copies defaults per instance.
    supported_timeframes: List[str] = ["5m", "15m"]
    created_at: datetime
    updated_at: datetime
    class Config:
        # Allow construction from objects with attribute access (ORM rows).
        from_attributes = True
class OHLCVBase(BaseModel):
    """A single OHLCV bar."""
    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float = 0  # 0 when the source reports no volume
    vwap: Optional[float] = None  # volume-weighted average price, if available
class OHLCVResponse(OHLCVBase):
    """OHLCV bar with its database identity."""
    id: int
    ticker_id: int
    class Config:
        # Allow construction from objects with attribute access (ORM rows).
        from_attributes = True
class OHLCVListResponse(BaseModel):
    """Envelope for a list of OHLCV bars for one symbol/timeframe."""
    symbol: str
    timeframe: str
    count: int  # len(data), precomputed for clients
    data: List[OHLCVBase]
class SyncRequest(BaseModel):
    """Parameters for starting a synchronization run."""
    symbols: Optional[List[str]] = None  # None = all active tickers
    start_date: Optional[datetime] = None  # None = resume from last stored bar
    end_date: Optional[datetime] = None  # None = now
    timeframe: str = "5m"
class SyncStatus(str, Enum):
    """Lifecycle states of a synchronization run."""
    idle = "idle"
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
class SyncResponse(BaseModel):
    """Status/result payload for sync endpoints."""
    status: SyncStatus
    message: str
    symbols_processed: List[str] = []  # pydantic copies defaults per instance
    records_inserted: int = 0
    errors: List[str] = []
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

4
src/services/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .polygon_service import PolygonService
from .sync_service import SyncService
__all__ = ["PolygonService", "SyncService"]

View File

@ -0,0 +1,196 @@
"""
Polygon.io API Service for fetching market data.
"""
import httpx
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import logging
from src.config import get_settings
# Module-level logger and cached settings shared by PolygonService instances.
logger = logging.getLogger(__name__)
settings = get_settings()
class PolygonService:
    """Async client for the Polygon.io aggregates (OHLCV) API.

    Applies a simple in-process rate limit of ``polygon_rate_limit``
    requests per rolling minute before every HTTP call.
    """

    def __init__(self):
        self.api_key = settings.polygon_api_key
        self.base_url = settings.polygon_base_url
        self.rate_limit = settings.polygon_rate_limit
        self._last_request_time = None  # start of the current 60s window
        self._request_count = 0  # requests issued inside that window

    async def _rate_limit_wait(self):
        """Sleep as needed so at most ``rate_limit`` requests occur per minute."""
        if self._last_request_time is None:
            self._last_request_time = datetime.now()
            self._request_count = 1
            return
        now = datetime.now()
        elapsed = (now - self._last_request_time).total_seconds()
        if elapsed < 60:
            self._request_count += 1
            # Fix: compare with ">" (was ">="), which throttled one request
            # early and effectively allowed only rate_limit - 1 calls/minute.
            if self._request_count > self.rate_limit:
                wait_time = 60 - elapsed
                logger.info(f"Rate limit reached, waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
                self._last_request_time = datetime.now()
                self._request_count = 1
        else:
            # The previous window expired; start a fresh one.
            self._last_request_time = now
            self._request_count = 1

    async def get_aggregates(
        self,
        ticker: str,
        multiplier: int = 5,
        timespan: str = "minute",
        start_date: datetime = None,
        end_date: datetime = None,
        limit: int = 50000
    ) -> List[Dict[str, Any]]:
        """
        Fetch aggregate bars (OHLCV) from Polygon.io.

        Args:
            ticker: Polygon ticker symbol (e.g., "C:XAUUSD", "X:BTCUSD")
            multiplier: Size of timespan multiplier (e.g., 5 for 5-minute)
            timespan: Type of window size (minute, hour, day, etc.)
            start_date: Start date for data (default: 30 days ago)
            end_date: End date for data (default: now)
            limit: Maximum number of results

        Returns:
            List of raw OHLCV bars; [] on error or missing API key.
        """
        if not self.api_key:
            logger.error("Polygon API key not configured")
            return []
        if start_date is None:
            start_date = datetime.now() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.now()
        # The range endpoint takes YYYY-MM-DD date boundaries.
        start_str = start_date.strftime("%Y-%m-%d")
        end_str = end_date.strftime("%Y-%m-%d")
        url = (
            f"{self.base_url}/v2/aggs/ticker/{ticker}/range/"
            f"{multiplier}/{timespan}/{start_str}/{end_str}"
        )
        params = {
            "apiKey": self.api_key,
            "adjusted": "true",
            "sort": "asc",
            "limit": limit
        }
        await self._rate_limit_wait()
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()
                # NOTE(review): only the first page is consumed; Polygon
                # returns a next_url when results exceed `limit` — confirm
                # whether pagination is needed for year-long backfills.
                if data.get("status") == "OK" and data.get("results"):
                    results = data["results"]
                    logger.info(
                        f"Fetched {len(results)} bars for {ticker} "
                        f"from {start_str} to {end_str}"
                    )
                    return results
                elif data.get("status") == "OK" and not data.get("results"):
                    logger.warning(f"No data available for {ticker}")
                    return []
                else:
                    logger.error(f"Polygon API error: {data}")
                    return []
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error fetching {ticker}: {e}")
            return []
        except Exception as e:
            logger.error(f"Error fetching {ticker}: {e}")
            return []

    async def get_forex_aggregates(
        self,
        ticker: str,
        timeframe: str = "5m",
        start_date: datetime = None,
        end_date: datetime = None
    ) -> List[Dict[str, Any]]:
        """
        Fetch aggregates using a timeframe string instead of multiplier/timespan.

        Args:
            ticker: Polygon forex ticker (e.g., "C:EURUSD")
            timeframe: Timeframe string ("5m", "15m", "1h", "1d", ...)
            start_date: Start date
            end_date: End date

        Returns:
            List of OHLCV bars; [] for an unknown timeframe.
        """
        # Map the compact timeframe string to Polygon's (multiplier, timespan).
        timeframe_map = {
            "1m": (1, "minute"),
            "5m": (5, "minute"),
            "15m": (15, "minute"),
            "30m": (30, "minute"),
            "1h": (1, "hour"),
            "4h": (4, "hour"),
            "1d": (1, "day"),
        }
        if timeframe not in timeframe_map:
            logger.error(f"Invalid timeframe: {timeframe}")
            return []
        multiplier, timespan = timeframe_map[timeframe]
        return await self.get_aggregates(
            ticker=ticker,
            multiplier=multiplier,
            timespan=timespan,
            start_date=start_date,
            end_date=end_date
        )

    def transform_to_ohlcv(self, bars: List[Dict[str, Any]], ticker_id: int) -> List[Dict[str, Any]]:
        """
        Transform Polygon API bars into database-row dicts.

        Args:
            bars: Raw bars from the Polygon API (keys t/o/h/l/c/v/vw)
            ticker_id: Database ticker ID

        Returns:
            List of OHLCV records ready for database insertion.
        """
        ohlcv_records = []
        for bar in bars:
            ohlcv_records.append({
                "ticker_id": ticker_id,
                # Polygon's "t" is epoch milliseconds.
                # NOTE(review): fromtimestamp() yields naive *local* time —
                # confirm this matches the DB timestamp convention.
                "timestamp": datetime.fromtimestamp(bar["t"] / 1000),
                "open": bar["o"],
                "high": bar["h"],
                "low": bar["l"],
                "close": bar["c"],
                "volume": bar.get("v", 0),
                "vwap": bar.get("vw"),
                "ts_epoch": bar["t"]
            })
        return ohlcv_records

View File

@ -0,0 +1,253 @@
"""
Synchronization Service for loading market data into database.
"""
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from src.config import get_settings
from src.models.database import get_db_context
from src.services.polygon_service import PolygonService
# Module-level logger and cached settings shared by SyncService instances.
logger = logging.getLogger(__name__)
settings = get_settings()
class SyncService:
    """Synchronizes OHLCV market data from Polygon.io into PostgreSQL.

    Keeps a single mutable ``sync_status`` dict that ``sync_all`` updates in
    place and the /sync/status endpoint reads via ``get_status``.
    """

    def __init__(self):
        self.polygon = PolygonService()
        self.batch_size = settings.sync_batch_size
        # Status snapshot exposed via get_status(); mutated by sync_all().
        self.sync_status = {
            "status": "idle",
            "message": "",
            "symbols_processed": [],
            "records_inserted": 0,
            "errors": [],
            "started_at": None,
            "completed_at": None
        }

    async def get_active_tickers(self, session: AsyncSession) -> List[Dict[str, Any]]:
        """Return active tickers that have a Polygon symbol mapping."""
        query = text("""
            SELECT id, symbol, name, asset_type, polygon_ticker, is_active
            FROM market_data.tickers
            WHERE is_active = true AND polygon_ticker IS NOT NULL
            ORDER BY id
        """)
        result = await session.execute(query)
        return [
            {
                "id": row[0],
                "symbol": row[1],
                "name": row[2],
                "asset_type": row[3],
                "polygon_ticker": row[4],
                "is_active": row[5]
            }
            for row in result.fetchall()
        ]

    async def get_latest_timestamp(
        self,
        session: AsyncSession,
        ticker_id: int,
        timeframe: str = "5m"
    ) -> Optional[datetime]:
        """Return the newest stored bar timestamp for a ticker, or None."""
        # Table name comes from a fixed two-value whitelist — f-string is safe.
        table = "ohlcv_5m" if timeframe == "5m" else "ohlcv_15m"
        query = text(f"""
            SELECT MAX(timestamp) FROM market_data.{table}
            WHERE ticker_id = :ticker_id
        """)
        result = await session.execute(query, {"ticker_id": ticker_id})
        row = result.fetchone()
        return row[0] if row and row[0] else None

    async def insert_ohlcv_batch(
        self,
        session: AsyncSession,
        records: List[Dict[str, Any]],
        timeframe: str = "5m"
    ) -> int:
        """Upsert OHLCV records, returning the number successfully written.

        ON CONFLICT (ticker_id, timestamp) makes re-syncing a window
        idempotent; a commit is issued once per ``batch_size`` chunk.
        """
        if not records:
            return 0
        table = "ohlcv_5m" if timeframe == "5m" else "ohlcv_15m"
        query = text(f"""
            INSERT INTO market_data.{table}
            (ticker_id, timestamp, open, high, low, close, volume, vwap, ts_epoch)
            VALUES (:ticker_id, :timestamp, :open, :high, :low, :close, :volume, :vwap, :ts_epoch)
            ON CONFLICT (ticker_id, timestamp) DO UPDATE SET
            open = EXCLUDED.open,
            high = EXCLUDED.high,
            low = EXCLUDED.low,
            close = EXCLUDED.close,
            volume = EXCLUDED.volume,
            vwap = EXCLUDED.vwap,
            ts_epoch = EXCLUDED.ts_epoch
        """)
        inserted = 0
        for i in range(0, len(records), self.batch_size):
            batch = records[i:i + self.batch_size]
            for record in batch:
                try:
                    await session.execute(query, record)
                    inserted += 1
                except Exception as e:
                    # Best-effort: skip the bad record, keep the batch going.
                    logger.error(f"Error inserting record: {e}")
            await session.commit()
            logger.info(f"Inserted batch of {len(batch)} records")
        return inserted

    async def sync_ticker(
        self,
        ticker: Dict[str, Any],
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        timeframe: str = "5m"
    ) -> Dict[str, Any]:
        """
        Sync data for a single ticker.

        Args:
            ticker: Ticker dict from get_active_tickers()
            start_date: Window start; None resumes after the last stored bar
                (or settings.sync_history_days ago when the table is empty)
            end_date: Window end; None = now
            timeframe: "5m" or "15m"

        Returns:
            Dict with symbol, records_inserted, and error (None on success).
        """
        result = {
            "symbol": ticker["symbol"],
            "records_inserted": 0,
            "error": None
        }
        async with get_db_context() as session:
            try:
                if start_date is None:
                    latest = await self.get_latest_timestamp(
                        session, ticker["id"], timeframe
                    )
                    if latest:
                        # Resume one bar after the newest stored timestamp.
                        # Fix: the step now matches the timeframe (it was
                        # hard-coded to 5 minutes, which re-requested
                        # overlapping data for 15m syncs).
                        step_minutes = 15 if timeframe == "15m" else 5
                        start_date = latest + timedelta(minutes=step_minutes)
                    else:
                        start_date = datetime.now() - timedelta(
                            days=settings.sync_history_days
                        )
                if end_date is None:
                    end_date = datetime.now()
                logger.info(
                    f"Syncing {ticker['symbol']} from {start_date} to {end_date}"
                )
                # Fetch from Polygon, then transform and upsert.
                bars = await self.polygon.get_forex_aggregates(
                    ticker=ticker["polygon_ticker"],
                    timeframe=timeframe,
                    start_date=start_date,
                    end_date=end_date
                )
                if not bars:
                    logger.warning(f"No data returned for {ticker['symbol']}")
                    return result
                records = self.polygon.transform_to_ohlcv(bars, ticker["id"])
                inserted = await self.insert_ohlcv_batch(session, records, timeframe)
                result["records_inserted"] = inserted
                logger.info(f"Synced {inserted} records for {ticker['symbol']}")
            except Exception as e:
                error_msg = f"Error syncing {ticker['symbol']}: {str(e)}"
                logger.error(error_msg)
                result["error"] = error_msg
        return result

    async def sync_all(
        self,
        symbols: Optional[List[str]] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        timeframe: str = "5m"
    ) -> Dict[str, Any]:
        """
        Sync data for all active tickers, sequentially.

        Args:
            symbols: List of symbols to sync (None = all active)
            start_date: Start date for sync
            end_date: End date for sync
            timeframe: Timeframe to sync

        Returns:
            The final sync status dictionary.
        """
        # Reset shared status for this run (read live by /sync/status).
        self.sync_status = {
            "status": "running",
            "message": "Sync started",
            "symbols_processed": [],
            "records_inserted": 0,
            "errors": [],
            "started_at": datetime.now(),
            "completed_at": None
        }
        try:
            async with get_db_context() as session:
                tickers = await self.get_active_tickers(session)
            if symbols:
                tickers = [t for t in tickers if t["symbol"] in symbols]
            logger.info(f"Starting sync for {len(tickers)} tickers")
            for ticker in tickers:
                result = await self.sync_ticker(
                    ticker=ticker,
                    start_date=start_date,
                    end_date=end_date,
                    timeframe=timeframe
                )
                self.sync_status["symbols_processed"].append(ticker["symbol"])
                self.sync_status["records_inserted"] += result["records_inserted"]
                if result["error"]:
                    self.sync_status["errors"].append(result["error"])
                # Small delay between tickers for rate limiting
                await asyncio.sleep(1)
            self.sync_status["status"] = "completed"
            self.sync_status["message"] = (
                f"Sync completed. Processed {len(tickers)} symbols, "
                f"inserted {self.sync_status['records_inserted']} records."
            )
        except Exception as e:
            self.sync_status["status"] = "failed"
            self.sync_status["message"] = str(e)
            self.sync_status["errors"].append(str(e))
        self.sync_status["completed_at"] = datetime.now()
        return self.sync_status

    def get_status(self) -> Dict[str, Any]:
        """Return the current sync status.

        NOTE(review): this returns the live mutable dict, not a copy — a
        concurrently running sync_all can mutate it while callers read it.
        """
        return self.sync_status