""" FastAPI Application OrbiQuant IA Trading Platform - Data Service Main application entry point with REST API, WebSocket support, and automatic data sync. UPDATED: Now includes Massive.com integration and automatic sync scheduler """ import asyncio import logging import signal from contextlib import asynccontextmanager from datetime import datetime from typing import Optional import asyncpg from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from config import Config from api.routes import router as api_router from api.sync_routes import router as sync_router from websocket.handlers import WSRouter, set_ws_manager from websocket.manager import WebSocketManager from providers.polygon_client import PolygonClient from providers.binance_client import BinanceClient from services.sync_service import DataSyncService from services.scheduler import SchedulerManager # Logging setup logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" config = Config.from_env() # Store config app.state.config = config # Initialize database pool logger.info("Connecting to database...") app.state.db_pool = await asyncpg.create_pool( config.database.dsn, min_size=config.database.min_connections, max_size=config.database.max_connections ) logger.info("Database connection pool created") # Initialize Polygon client if config.polygon.api_key: app.state.polygon_client = PolygonClient( api_key=config.polygon.api_key, rate_limit_per_min=config.polygon.rate_limit_per_min, base_url=config.polygon.base_url, use_massive_url=config.polygon.base_url == "https://api.massive.com" ) logger.info(f"Polygon/Massive client initialized - URL: {config.polygon.base_url}") else: app.state.polygon_client = None logger.warning("Polygon/Massive client not initialized - API key missing") # Initialize Binance client import os binance_key = os.getenv("BINANCE_API_KEY") binance_secret = os.getenv("BINANCE_API_SECRET") if binance_key: app.state.binance_client = BinanceClient( api_key=binance_key, api_secret=binance_secret, testnet=os.getenv("BINANCE_TESTNET", "false").lower() == "true" ) logger.info("Binance client initialized") else: app.state.binance_client = None # Initialize WebSocket manager ws_manager = WebSocketManager() await ws_manager.start() app.state.ws_manager = ws_manager set_ws_manager(ws_manager) logger.info("WebSocket manager started") # Initialize sync service and scheduler if app.state.polygon_client: app.state.sync_service = DataSyncService( polygon_client=app.state.polygon_client, db_pool=app.state.db_pool ) logger.info("Data sync service initialized") # Start scheduler for automatic sync enable_scheduler = os.getenv("ENABLE_SYNC_SCHEDULER", "true").lower() == "true" if enable_scheduler: app.state.scheduler = await SchedulerManager.get_instance( sync_service=app.state.sync_service, sync_interval_minutes=config.sync_interval_minutes ) logger.info("Data sync scheduler started") else: app.state.scheduler = None logger.info("Sync scheduler disabled") else: app.state.sync_service = None app.state.scheduler = None logger.warning("Sync service and scheduler not initialized") # Store start time for uptime app.state.start_time = datetime.utcnow() logger.info("Data Service started successfully") yield # Application runs here # Shutdown logger.info("Shutting down Data Service...") # Stop scheduler if app.state.scheduler: await SchedulerManager.stop_instance() logger.info("Scheduler stopped") await ws_manager.stop() if app.state.binance_client: await app.state.binance_client.close() if app.state.polygon_client and hasattr(app.state.polygon_client, '_session'): if app.state.polygon_client._session: await app.state.polygon_client._session.close() await app.state.db_pool.close() logger.info("Data Service shutdown complete") def create_app() -> FastAPI: """Create and configure FastAPI application.""" app = FastAPI( title="OrbiQuant Data Service", description=""" Market data service for the OrbiQuant IA Trading Platform. ## Features - Real-time ticker prices - Historical OHLCV data (multiple timeframes) - Order book snapshots - WebSocket streaming - Multi-provider support (Polygon/Massive, Binance, MT4) - Automatic data synchronization - Scheduled background sync tasks ## Data Providers - **Massive.com/Polygon.io**: Forex, Crypto, Indices, Stocks - **Binance**: Crypto markets - **MT4**: Forex and CFDs ## WebSocket Channels - `ticker` - Real-time price updates - `candles` - OHLCV candle updates - `orderbook` - Order book snapshots - `trades` - Recent trades - `signals` - ML trading signals ## Sync Endpoints - `/api/sync/symbols` - List supported symbols - `/api/sync/sync/{symbol}` - Sync specific symbol - `/api/sync/status` - Get sync status """, version="2.0.0", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:3000", "http://localhost:3001", "http://localhost:5173", "https://orbiquant.com", "https://*.orbiquant.com", ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global exception handler @app.exception_handler(Exception) async def global_exception_handler(request: Request, exc: Exception): logger.error(f"Unhandled exception: {exc}", exc_info=True) return JSONResponse( status_code=500, content={ "error": "Internal server error", "detail": str(exc) if app.debug else "An unexpected error occurred", "timestamp": datetime.utcnow().isoformat() } ) # Include routers app.include_router(api_router, tags=["Market Data"]) app.include_router(sync_router, tags=["Data Sync"]) # WebSocket router ws_router = WSRouter() app.include_router(ws_router.router, tags=["WebSocket"]) # Root endpoint @app.get("/", tags=["Root"]) async def root(): uptime = None if hasattr(app.state, 'start_time'): uptime = (datetime.utcnow() - app.state.start_time).total_seconds() return { "service": "OrbiQuant Data Service", "version": "2.0.0", "status": "running", "uptime_seconds": uptime, "features": { "polygon_massive": hasattr(app.state, 'polygon_client') and app.state.polygon_client is not None, "binance": hasattr(app.state, 'binance_client') and app.state.binance_client is not None, "auto_sync": hasattr(app.state, 'scheduler') and app.state.scheduler is not None, "websocket": True }, "endpoints": { "docs": "/docs", "health": "/health", "websocket": "/ws/stream", "symbols": "/api/sync/symbols", "sync_status": "/api/sync/status" } } # Scheduler status endpoint @app.get("/scheduler/status", tags=["Scheduler"]) async def scheduler_status(): """Get scheduler status and job list.""" if not hasattr(app.state, 'scheduler') or not app.state.scheduler: return { "enabled": False, "message": "Scheduler is disabled" } jobs = app.state.scheduler.get_jobs() return { "enabled": True, "running": app.state.scheduler._is_running, "jobs": jobs, "total_jobs": len(jobs) } return app # Create application instance app = create_app() if __name__ == "__main__": import uvicorn uvicorn.run( "app:app", host="0.0.0.0", port=8001, reload=True, log_level="info" )