workspace/projects/trading-platform/apps/data-service/src/app_updated.py
rckrdmrd 789d1ab46b
Some checks failed
CI Pipeline / changes (push) Has been cancelled
CI Pipeline / core (push) Has been cancelled
CI Pipeline / trading-backend (push) Has been cancelled
CI Pipeline / trading-data-service (push) Has been cancelled
CI Pipeline / trading-frontend (push) Has been cancelled
CI Pipeline / erp-core (push) Has been cancelled
CI Pipeline / erp-mecanicas (push) Has been cancelled
CI Pipeline / gamilit-backend (push) Has been cancelled
CI Pipeline / gamilit-frontend (push) Has been cancelled
changes on workspace
2025-12-09 14:46:20 -06:00

283 lines
8.7 KiB
Python

"""
FastAPI Application
OrbiQuant IA Trading Platform - Data Service
Main application entry point with REST API, WebSocket support, and automatic data sync.
UPDATED: Now includes Massive.com integration and automatic sync scheduler
"""
import asyncio
import logging
import signal
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
import asyncpg
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from config import Config
from api.routes import router as api_router
from api.sync_routes import router as sync_router
from websocket.handlers import WSRouter, set_ws_manager
from websocket.manager import WebSocketManager
from providers.polygon_client import PolygonClient
from providers.binance_client import BinanceClient
from services.sync_service import DataSyncService
from services.scheduler import SchedulerManager
# Logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager."""
config = Config.from_env()
# Store config
app.state.config = config
# Initialize database pool
logger.info("Connecting to database...")
app.state.db_pool = await asyncpg.create_pool(
config.database.dsn,
min_size=config.database.min_connections,
max_size=config.database.max_connections
)
logger.info("Database connection pool created")
# Initialize Polygon client
if config.polygon.api_key:
app.state.polygon_client = PolygonClient(
api_key=config.polygon.api_key,
rate_limit_per_min=config.polygon.rate_limit_per_min,
base_url=config.polygon.base_url,
use_massive_url=config.polygon.base_url == "https://api.massive.com"
)
logger.info(f"Polygon/Massive client initialized - URL: {config.polygon.base_url}")
else:
app.state.polygon_client = None
logger.warning("Polygon/Massive client not initialized - API key missing")
# Initialize Binance client
import os
binance_key = os.getenv("BINANCE_API_KEY")
binance_secret = os.getenv("BINANCE_API_SECRET")
if binance_key:
app.state.binance_client = BinanceClient(
api_key=binance_key,
api_secret=binance_secret,
testnet=os.getenv("BINANCE_TESTNET", "false").lower() == "true"
)
logger.info("Binance client initialized")
else:
app.state.binance_client = None
# Initialize WebSocket manager
ws_manager = WebSocketManager()
await ws_manager.start()
app.state.ws_manager = ws_manager
set_ws_manager(ws_manager)
logger.info("WebSocket manager started")
# Initialize sync service and scheduler
if app.state.polygon_client:
app.state.sync_service = DataSyncService(
polygon_client=app.state.polygon_client,
db_pool=app.state.db_pool
)
logger.info("Data sync service initialized")
# Start scheduler for automatic sync
enable_scheduler = os.getenv("ENABLE_SYNC_SCHEDULER", "true").lower() == "true"
if enable_scheduler:
app.state.scheduler = await SchedulerManager.get_instance(
sync_service=app.state.sync_service,
sync_interval_minutes=config.sync_interval_minutes
)
logger.info("Data sync scheduler started")
else:
app.state.scheduler = None
logger.info("Sync scheduler disabled")
else:
app.state.sync_service = None
app.state.scheduler = None
logger.warning("Sync service and scheduler not initialized")
# Store start time for uptime
app.state.start_time = datetime.utcnow()
logger.info("Data Service started successfully")
yield # Application runs here
# Shutdown
logger.info("Shutting down Data Service...")
# Stop scheduler
if app.state.scheduler:
await SchedulerManager.stop_instance()
logger.info("Scheduler stopped")
await ws_manager.stop()
if app.state.binance_client:
await app.state.binance_client.close()
if app.state.polygon_client and hasattr(app.state.polygon_client, '_session'):
if app.state.polygon_client._session:
await app.state.polygon_client._session.close()
await app.state.db_pool.close()
logger.info("Data Service shutdown complete")
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
app = FastAPI(
title="OrbiQuant Data Service",
description="""
Market data service for the OrbiQuant IA Trading Platform.
## Features
- Real-time ticker prices
- Historical OHLCV data (multiple timeframes)
- Order book snapshots
- WebSocket streaming
- Multi-provider support (Polygon/Massive, Binance, MT4)
- Automatic data synchronization
- Scheduled background sync tasks
## Data Providers
- **Massive.com/Polygon.io**: Forex, Crypto, Indices, Stocks
- **Binance**: Crypto markets
- **MT4**: Forex and CFDs
## WebSocket Channels
- `ticker` - Real-time price updates
- `candles` - OHLCV candle updates
- `orderbook` - Order book snapshots
- `trades` - Recent trades
- `signals` - ML trading signals
## Sync Endpoints
- `/api/sync/symbols` - List supported symbols
- `/api/sync/sync/{symbol}` - Sync specific symbol
- `/api/sync/status` - Get sync status
""",
version="2.0.0",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000",
"http://localhost:3001",
"http://localhost:5173",
"https://orbiquant.com",
"https://*.orbiquant.com",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"detail": str(exc) if app.debug else "An unexpected error occurred",
"timestamp": datetime.utcnow().isoformat()
}
)
# Include routers
app.include_router(api_router, tags=["Market Data"])
app.include_router(sync_router, tags=["Data Sync"])
# WebSocket router
ws_router = WSRouter()
app.include_router(ws_router.router, tags=["WebSocket"])
# Root endpoint
@app.get("/", tags=["Root"])
async def root():
uptime = None
if hasattr(app.state, 'start_time'):
uptime = (datetime.utcnow() - app.state.start_time).total_seconds()
return {
"service": "OrbiQuant Data Service",
"version": "2.0.0",
"status": "running",
"uptime_seconds": uptime,
"features": {
"polygon_massive": hasattr(app.state, 'polygon_client') and app.state.polygon_client is not None,
"binance": hasattr(app.state, 'binance_client') and app.state.binance_client is not None,
"auto_sync": hasattr(app.state, 'scheduler') and app.state.scheduler is not None,
"websocket": True
},
"endpoints": {
"docs": "/docs",
"health": "/health",
"websocket": "/ws/stream",
"symbols": "/api/sync/symbols",
"sync_status": "/api/sync/status"
}
}
# Scheduler status endpoint
@app.get("/scheduler/status", tags=["Scheduler"])
async def scheduler_status():
"""Get scheduler status and job list."""
if not hasattr(app.state, 'scheduler') or not app.state.scheduler:
return {
"enabled": False,
"message": "Scheduler is disabled"
}
jobs = app.state.scheduler.get_jobs()
return {
"enabled": True,
"running": app.state.scheduler._is_running,
"jobs": jobs,
"total_jobs": len(jobs)
}
return app
# Create application instance
app = create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app:app",
host="0.0.0.0",
port=8001,
reload=True,
log_level="info"
)