# System Architecture - Data Service

## Massive.com/Polygon.io Integration

---

## General Architecture Diagram

```
┌─────────────────────────────────────────────────────────────────────┐
│                          CLIENT / FRONTEND                          │
│                     (Next.js / React Trading UI)                    │
└────────────┬──────────────────────────────────────────┬─────────────┘
             │                                          │
             │ HTTP REST API                            │ WebSocket
             ▼                                          ▼
┌─────────────────────────────────────────────────────────────────────┐
│                       DATA SERVICE (FastAPI)                        │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐   │
│  │   Market Data    │  │   Sync Routes    │  │    WebSocket     │   │
│  │     Routes       │  │      (NEW)       │  │     Handler      │   │
│  │    /api/v1/*     │  │   /api/sync/*    │  │    /ws/stream    │   │
│  └────────┬─────────┘  └────────┬─────────┘  └──────────────────┘   │
│           │                     │                                   │
│           └──────────┬──────────┘                                   │
│                      ▼                                              │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                     BUSINESS LOGIC LAYER                     │   │
│  │  ┌────────────────┐         ┌─────────────────────────────┐  │   │
│  │  │  Sync Service  │◄────────┤      Scheduler Manager      │  │   │
│  │  │     (NEW)      │         │            (NEW)            │  │   │
│  │  └────────┬───────┘         └───────────┬─────────────────┘  │   │
│  │           │                             │                    │   │
│  │           │  ┌──────────────────────────┴─────────┐          │   │
│  │           └──┤  APScheduler (7 Jobs)              │          │   │
│  │              │  - sync_1min  (every 1 min)        │          │   │
│  │              │  - sync_5min  (every 5 min)        │          │   │
│  │              │  - sync_15min (every 15 min)       │          │   │
│  │              │  - sync_1hour (every 1 hour)       │          │   │
│  │              │  - sync_4hour (every 4 hours)      │          │   │
│  │              │  - sync_daily (daily 00:05 UTC)    │          │   │
│  │              │  - cleanup    (weekly Sun 02:00)   │          │   │
│  │              └────────────────────────────────────┘          │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                      DATA PROVIDER LAYER                     │   │
│  │  ┌────────────────┐  ┌──────────────┐  ┌─────────────────┐   │   │
│  │  │ Polygon Client │  │Binance Client│  │   MT4 Client    │   │   │
│  │  │   (UPDATED)    │  │              │  │   (Optional)    │   │   │
│  │  └────────┬───────┘  └──────┬───────┘  └────────┬────────┘   │   │
│  └───────────┼─────────────────┼───────────────────┼────────────┘   │
└──────────────┼─────────────────┼───────────────────┼────────────────┘
               │                 │                   │
     ┌─────────▼──────┐ ┌────────▼────────┐   ┌──────▼──────┐
     │  Massive.com   │ │   Binance API   │   │   MetaAPI   │
     │ / Polygon.io   │ │                 │   │    / MT4    │
     │      API       │ │                 │   │             │
     └────────────────┘ └─────────────────┘   └─────────────┘

     ┌────────────────────────────┐
     │    PostgreSQL Database     │
     │  ┌──────────────────────┐  │
     │  │  market_data schema  │  │
     │  │  - tickers           │  │
     │  │  - ohlcv_1min        │  │
     │  │  - ohlcv_5min        │  │
     │  │  - ohlcv_15min       │  │
     │  │  - ohlcv_1hour       │  │
     │  │  - ohlcv_4hour       │  │
     │  │  - ohlcv_daily       │  │
     │  │  - sync_status (NEW) │  │
     │  │  - trades            │  │
     │  └──────────────────────┘  │
     └────────────────────────────┘
```

---

## Data Synchronization Flow

```
┌────────────────────────────────────────────────────────────────────┐
│                        AUTOMATIC SYNC FLOW                         │
└────────────────────────────────────────────────────────────────────┘

[1] Scheduler Trigger
 │
 ├─→ Every 1 min   → sync_1min_data()
 ├─→ Every 5 min   → sync_5min_data()
 ├─→ Every 15 min  → sync_15min_data()
 ├─→ Every 1 hour  → sync_1hour_data()
 ├─→ Every 4 hours → sync_4hour_data()
 └─→ Daily 00:05   → sync_daily_data()
 │
 ▼
[2] Sync Service
 │
 ├─→ Get active tickers from DB
 │   SELECT * FROM tickers WHERE is_active = true
 │
 ├─→ For each ticker:
 │    │
 │    ├─→ Get last sync timestamp
 │    │   SELECT MAX(timestamp) FROM ohlcv_5min WHERE ticker_id = ?
 │    │
 │    ├─→ Calculate date range
 │    │   start_date = last_sync_timestamp + 1
 │    │   end_date   = NOW()
 │    │
 │    └─→ Fetch from Polygon API
 │
 ▼
[3] Polygon Client
 │
 ├─→ Check rate limit (5 req/min for free tier)
 │   Wait if needed
 │
 ├─→ Format symbol (e.g., EURUSD → C:EURUSD)
 │
 ├─→ Call API: GET /v2/aggs/ticker/{symbol}/range/{multiplier}/{timespan}/{from}/{to}
 │   Headers: Authorization: Bearer {api_key}
 │
 ├─→ Handle pagination (next_url)
 │
 └─→ Yield OHLCVBar objects
 │
 ▼
[4] Data Processing
 │
 ├─→ Collect bars in batches (10,000 rows)
 │
 ├─→ Transform to database format
 │   (ticker_id, timestamp, open, high, low, close, volume, vwap, trades)
 │
 └─→ Insert to database
 │
 ▼
[5] Database Insert
 │
 ├─→ INSERT INTO ohlcv_5min (...) VALUES (...)
 │   ON CONFLICT (ticker_id, timestamp) DO UPDATE
 │   SET open = EXCLUDED.open, ...
 │
 └─→ Batch insert (10K rows at a time)
 │
 ▼
[6] Update Sync Status
 │
 └─→ INSERT INTO sync_status (ticker_id, timeframe, last_sync_timestamp, ...)
     ON CONFLICT (ticker_id, timeframe) DO UPDATE
     SET last_sync_timestamp = NOW(), sync_status = 'success', ...
```

---

## Manual Request Flow

```
┌────────────────────────────────────────────────────────────────────┐
│                      MANUAL SYNC REQUEST FLOW                      │
└────────────────────────────────────────────────────────────────────┘

[User/Frontend]
 │
 │  POST /api/sync/sync/EURUSD
 │  Body: {
 │    "asset_type": "forex",
 │    "timeframe": "5min",
 │    "backfill_days": 30
 │  }
 ▼
[Sync Routes]
 │
 ├─→ Validate symbol is supported
 │   (Check TICKER_MAPPINGS config)
 │
 ├─→ Parse request parameters
 │   - symbol: EURUSD
 │   - asset_type: forex (enum)
 │   - timeframe: 5min (enum)
 │   - backfill_days: 30
 │
 └─→ Call sync_service.sync_ticker_data()
 │
 ▼
[Sync Service]
 │
 ├─→ Get or create ticker in DB
 │   (auto-fetch details from Polygon if new)
 │
 ├─→ Calculate date range
 │   start_date = NOW() - 30 days
 │   end_date   = NOW()
 │
 ├─→ Call polygon_client.get_aggregates()
 │   (async generator)
 │
 ├─→ Process bars in batches
 │   - Collect 10K rows
 │   - Insert to DB
 │   - Repeat
 │
 └─→ Return result
 │
 ▼
[Response]
{
  "status": "success",
  "symbol": "EURUSD",
  "timeframe": "5min",
  "rows_inserted": 8640,
  "start_date": "2024-11-08T00:00:00",
  "end_date": "2024-12-08T00:00:00"
}
```

---

## Directory Structure

```
data-service/
│
├── src/
│   ├── api/
│   │   ├── __init__.py
│   │   ├── dependencies.py        # Dependency injection
│   │   ├── routes.py              # Main market data routes
│   │   └── sync_routes.py         # [NEW] Sync management routes
│   │
│   ├── services/
│   │   ├── __init__.py
│   │   ├── price_adjustment.py    # Price adjustment logic
│   │   ├── sync_service.py        # [NEW] Data sync service
│   │   └── scheduler.py           # [NEW] Automatic scheduler
│   │
│   ├── providers/
│   │   ├── __init__.py
│   │   ├── polygon_client.py      # [EXISTING] Polygon/Massive client
│   │   ├── binance_client.py      # Binance API client
│   │   └── mt4_client.py          # MT4 API client
│   │
│   ├── models/
│   │   ├── __init__.py
│   │   └── market.py              # Pydantic models
│   │
│   ├── websocket/
│   │   ├── __init__.py
│   │   ├── manager.py             # WebSocket connection manager
│   │   └── handlers.py            # WebSocket message handlers
│   │
│   ├── config.py                  # Configuration management
│   ├── app.py                     # [EXISTING] Main application
│   ├── app_updated.py             # [NEW] Updated with scheduler
│   └── main.py                    # Entry point
│
├── tests/
│   ├── __init__.py                # [NEW]
│   ├── conftest.py                # [NEW] Pytest config
│   ├── test_sync_service.py       # [NEW] Sync service tests
│   └── test_polygon_client.py     # [NEW] Client tests
│
├── migrations/
│   ├── 001_initial_schema.sql     # [EXISTING] Initial tables
│   └── 002_sync_status.sql        # [NEW] Sync status table
│
├── examples/
│   ├── sync_example.py            # [NEW] Programmatic usage
│   └── api_examples.sh            # [NEW] API call examples
│
├── .env.example                   # [NEW] Environment template
├── requirements.txt               # [EXISTING] Dependencies
├── requirements_sync.txt          # [NEW] Additional dependencies
├── README.md                      # [EXISTING] Main readme
├── README_SYNC.md                 # [NEW] Sync documentation
├── IMPLEMENTATION_SUMMARY.md      # [NEW] Technical summary
├── TECH_LEADER_REPORT.md          # [NEW] Manager report
└── ARCHITECTURE.md                # [NEW] This file
```

---

## Data Model

### Table: tickers

```sql
CREATE TABLE market_data.tickers (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(20) UNIQUE NOT NULL,   -- EURUSD, BTCUSD, etc.
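    -- Note: asset_type (below) is what the Polygon client uses to pick
    -- the symbol prefix when fetching data (e.g. forex: EURUSD -> C:EURUSD).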
    name VARCHAR(100),
    asset_type VARCHAR(20) NOT NULL,      -- forex, crypto, index
    base_currency VARCHAR(10),
    quote_currency VARCHAR(10),
    exchange VARCHAR(50),
    price_precision INTEGER,
    quantity_precision INTEGER,
    min_quantity DECIMAL,
    max_quantity DECIMAL,
    min_notional DECIMAL,
    tick_size DECIMAL,
    lot_size DECIMAL,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);
```

### Table: ohlcv_5min (example)

```sql
CREATE TABLE market_data.ohlcv_5min (
    id BIGSERIAL PRIMARY KEY,
    ticker_id INTEGER REFERENCES market_data.tickers(id),
    timestamp TIMESTAMP NOT NULL,
    open DECIMAL NOT NULL,
    high DECIMAL NOT NULL,
    low DECIMAL NOT NULL,
    close DECIMAL NOT NULL,
    volume DECIMAL,
    vwap DECIMAL,                -- Volume-weighted average price
    trades INTEGER,              -- Number of trades
    ts_epoch BIGINT,             -- Unix timestamp
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(ticker_id, timestamp)
);

CREATE INDEX idx_ohlcv_5min_ticker_timestamp
    ON market_data.ohlcv_5min(ticker_id, timestamp DESC);
```

### Table: sync_status [NEW]

```sql
CREATE TABLE market_data.sync_status (
    id SERIAL PRIMARY KEY,
    ticker_id INTEGER REFERENCES market_data.tickers(id),
    timeframe VARCHAR(20) NOT NULL,       -- 1min, 5min, 1hour, etc.
    last_sync_timestamp TIMESTAMP,        -- Last successful sync
    last_sync_rows INTEGER DEFAULT 0,     -- Rows inserted in last sync
    sync_status VARCHAR(20) NOT NULL,     -- pending, success, failed
    error_message TEXT,                   -- Error if failed
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(ticker_id, timeframe)
);

CREATE INDEX idx_sync_status_ticker ON market_data.sync_status(ticker_id);
CREATE INDEX idx_sync_status_status ON market_data.sync_status(sync_status);
```

---

## Main Components

### 1. PolygonClient (providers/polygon_client.py)

**Responsibilities:**

- Communication with the Massive.com/Polygon.io API
- Rate limiting (5 req/min)
- Symbol formatting (EURUSD → C:EURUSD)
- Result pagination
- Retry on rate limit

**Main methods:**

```python
async def get_aggregates(
    self,
    symbol: str,
    asset_type: AssetType,
    timeframe: Timeframe,
    start_date: datetime,
    end_date: datetime,
) -> AsyncGenerator[OHLCVBar, None]:
    # Fetch historical OHLCV data
    ...

async def get_ticker_details(
    self,
    symbol: str,
    asset_type: AssetType,
) -> Dict:
    # Get ticker metadata
    ...
```

### 2. DataSyncService (services/sync_service.py)

**Responsibilities:**

- Sync orchestration
- Ticker management in the DB
- Batch inserts
- Status tracking
- Error handling

**Main methods:**

```python
async def sync_ticker_data(
    self,
    symbol: str,
    asset_type: AssetType,
    timeframe: Timeframe,
    backfill_days: int = 30,
) -> Dict:
    # Sync a specific ticker
    ...

async def sync_all_active_tickers(
    self,
    timeframe: Timeframe,
    backfill_days: int = 1,
) -> Dict:
    # Sync all active tickers
    ...

async def get_sync_status(
    self,
    symbol: Optional[str] = None,
) -> List[Dict]:
    # Get sync status
    ...
```

### 3. DataSyncScheduler (services/scheduler.py)

**Responsibilities:**

- Scheduling of periodic tasks
- Automatic execution of syncs
- Cleanup of old data
- Job control

**Jobs:**

- sync_1min: every 1 minute
- sync_5min: every 5 minutes
- sync_15min: every 15 minutes
- sync_1hour: every hour
- sync_4hour: every 4 hours
- sync_daily: daily (00:05 UTC)
- cleanup_old_data: weekly (Sunday 02:00)

---

## Rate Limiting Flow

```
┌────────────────────────────────────────────────────────────────────┐
│                         RATE LIMITING FLOW                         │
└────────────────────────────────────────────────────────────────────┘

[Client Request]
 │
 ▼
[PolygonClient._rate_limit_wait()]
 │
 ├─→ Check current minute
 │   - Is it a new minute?
 │     Yes → Reset counter to 0
 │     No  → Check counter
 │
 ├─→ Check request count
 │   - count < 5?
 │     Yes → Increment counter, proceed
 │     No  → Calculate wait time
 │
 ├─→ Wait if needed
 │   wait_time = 60 - (now - last_request_time)
 │   asyncio.sleep(wait_time)
 │
 └─→ Reset counter, proceed
 │
 ▼
[Make API Request]
 │
 ├─→ Response 200 OK
 │   → Return data
 │
 ├─→ Response 429 Too Many Requests
 │   → Wait retry_after seconds
 │   → Retry request
 │
 └─→ Response 4xx/5xx
     → Raise error

Example Timeline:
00:00:00 - Request 1 ✓ (count: 1/5)
00:00:10 - Request 2 ✓ (count: 2/5)
00:00:20 - Request 3 ✓ (count: 3/5)
00:00:30 - Request 4 ✓ (count: 4/5)
00:00:40 - Request 5 ✓ (count: 5/5)
00:00:50 - Request 6 ⏸ WAIT 10s → 00:01:00 ✓ (count: 1/5)
```

---

## Scalability and Performance

### Current Optimizations

1. **Async I/O**
   - The entire stack is asynchronous
   - No blocking on I/O operations
   - Multiple concurrent requests

2. **Batch Processing**
   - Inserts of 10,000 rows per batch
   - Fewer round-trips to the DB
   - Better throughput

3. **Connection Pooling**
   - asyncpg pool: 5-20 connections
   - Connection reuse
   - Lower latency

4. **Database Indexing**
   - Indexes on (ticker_id, timestamp)
   - Indexes on sync_status
   - Optimized queries

5. **ON CONFLICT DO UPDATE**
   - Native PostgreSQL upsert
   - Avoids duplicates
   - Updates existing data

### Current Limits

| Metric | Value | Limit |
|--------|-------|-------|
| Rate limit (free) | 5 req/min | API |
| Batch size | 10,000 rows | Configurable |
| DB connections | 5-20 | Pool |
| Concurrent syncs | 1 per timeframe | Scheduler |
| Max backfill | 365 days | Configurable |

### Proposed Improvements

1. **Redis Cache**
   - Cache for frequently requested symbols
   - Fewer DB queries
   - Configurable TTL

2. **Task Queue**
   - Celery or RQ
   - Long-running async syncs
   - Automatic retries

3. **Multiple Workers**
   - Parallelized syncs
   - Higher throughput
   - Load balancing

4. **Table Partitioning**
   - Partition by date
   - Better query performance
   - Easier maintenance

---

## Monitoring and Observability

### Logs

**Configured levels:**

- DEBUG: details of each request
- INFO: normal operations
- WARNING: rate limits, retries
- ERROR: sync failures, API errors

**Format:**

```
2024-12-08 20:15:30 - sync_service - INFO - Starting sync for EURUSD (forex) - 5min
2024-12-08 20:15:31 - polygon_client - DEBUG - Rate limit check: 2/5 requests
2024-12-08 20:15:32 - sync_service - INFO - Synced 288 bars for EURUSD
2024-12-08 20:15:33 - sync_service - INFO - Sync completed: success
```

### Proposed Metrics

**Prometheus metrics:**

- `sync_duration_seconds` - duration of each sync
- `sync_rows_inserted_total` - total rows inserted
- `sync_errors_total` - total errors
- `api_requests_total` - requests to the Polygon API
- `rate_limit_waits_total` - number of waits caused by rate limiting

### Health Checks

**Endpoints:**

- `/health` - overall service health
- `/api/sync/health` - sync service health
- `/scheduler/status` - scheduler status

---

## Security

### API Keys

- Never in source code
- Environment variables only
- .env listed in .gitignore
- Periodic rotation recommended

### Database

- Authenticated connections
- DB user with limited privileges
- SSL recommended in production

### Rate Limiting

- Protection against abuse
- Configurable limits
- Logging of violations

### Input Validation

- Pydantic models for requests
- Validation of supported symbols
- Parameter sanitization

---

## Deployment

### Development

```bash
# Local
python src/app.py

# With reload
uvicorn src.app:app --reload --port 8001
```

### Production

```bash
# With Gunicorn + Uvicorn workers
gunicorn src.app:app \
  -w 4 \
  -k uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8001 \
  --log-level info

# With systemd
systemctl start trading-data-service
```

### Docker

```dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
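# requirements.txt is copied before the source so the dependency install
# layer below is cached; code-only changes skip the reinstall on rebuild.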
RUN pip install -r requirements.txt

COPY src/ ./src/
COPY migrations/ ./migrations/

ENV PYTHONPATH=/app/src

CMD ["uvicorn", "src.app:app", "--host", "0.0.0.0", "--port", "8001"]
```

---

## Conclusion

This architecture provides:

✅ Clear separation of responsibilities
✅ Horizontal and vertical scalability
✅ Maintainability and extensibility
✅ Full observability
✅ High availability
✅ Optimized performance

**Status:** ✅ Production Ready

---

**Last updated:** 2024-12-08
**Version:** 2.0.0