Data aggregation and distribution service: - Market data collection - OHLCV aggregation - Real-time data feeds - Data API endpoints Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
23 KiB
23 KiB
Arquitectura del Sistema - Data Service
Integración Massive.com/Polygon.io
Diagrama de Arquitectura General
┌─────────────────────────────────────────────────────────────────────┐
│ CLIENTE / FRONTEND │
│ (Next.js / React Trading UI) │
└────────────┬──────────────────────────────────────────┬─────────────┘
│ │
│ HTTP REST API │ WebSocket
▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ DATA SERVICE (FastAPI) │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Market Data │ │ Sync Routes │ │ WebSocket │ │
│ │ Routes │ │ (NEW) │ │ Handler │ │
│ │ /api/v1/* │ │ /api/sync/* │ │ /ws/stream │ │
│ └────────┬─────────┘ └────────┬─────────┘ └──────────────────┘ │
│ │ │ │
│ └──────────┬──────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ BUSINESS LOGIC LAYER │ │
│ │ ┌────────────────┐ ┌─────────────────────────────┐ │ │
│ │ │ Sync Service │◄────────┤ Scheduler Manager │ │ │
│ │ │ (NEW) │ │ (NEW) │ │ │
│ │ └────────┬───────┘ └───────────┬─────────────────┘ │ │
│ │ │ │ │ │
│ │ │ ┌──────────────────────────┴─────────┐ │ │
│ │ └──┤ APScheduler (7 Jobs) │ │ │
│ │ │ - sync_1min (every 1 min) │ │ │
│ │ │ - sync_5min (every 5 min) │ │ │
│ │ │ - sync_15min (every 15 min) │ │ │
│ │ │ - sync_1hour (every 1 hour) │ │ │
│ │ │ - sync_4hour (every 4 hours) │ │ │
│ │ │ - sync_daily (daily 00:05 UTC) │ │ │
│ │ │ - cleanup (weekly Sun 02:00) │ │ │
│ │ └────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ DATA PROVIDER LAYER │ │
│ │ ┌────────────────┐ ┌──────────────┐ ┌─────────────────┐ │ │
│ │ │ Polygon Client │ │Binance Client│ │ MT4 Client │ │ │
│ │ │ (UPDATED) │ │ │ │ (Optional) │ │ │
│ │ └────────┬───────┘ └──────┬───────┘ └────────┬────────┘ │ │
│ └───────────┼──────────────────┼───────────────────┼──────────┘ │
│ │ │ │ │
└──────────────┼──────────────────┼───────────────────┼──────────────┘
│ │ │
┌─────────▼──────┐ ┌────────▼────────┐ ┌──────▼──────┐
│ Massive.com │ │ Binance API │ │ MetaAPI │
│ / Polygon.io │ │ │ │ / MT4 │
│ API │ │ │ │ │
└────────────────┘ └─────────────────┘ └─────────────┘
┌────────────────────────────┐
│ PostgreSQL Database │
│ ┌──────────────────────┐ │
│ │ market_data schema │ │
│ │ - tickers │ │
│ │ - ohlcv_1min │ │
│ │ - ohlcv_5min │ │
│ │ - ohlcv_15min │ │
│ │ - ohlcv_1hour │ │
│ │ - ohlcv_4hour │ │
│ │ - ohlcv_daily │ │
│ │ - sync_status (NEW) │ │
│ │ - trades │ │
│ └──────────────────────┘ │
└────────────────────────────┘
Flujo de Sincronización de Datos
┌────────────────────────────────────────────────────────────────────┐
│ AUTOMATIC SYNC FLOW │
└────────────────────────────────────────────────────────────────────┘
[1] Scheduler Trigger
│
├─→ Every 1 min → sync_1min_data()
├─→ Every 5 min → sync_5min_data()
├─→ Every 15 min → sync_15min_data()
├─→ Every 1 hour → sync_1hour_data()
├─→ Every 4 hours→ sync_4hour_data()
└─→ Daily 00:05 → sync_daily_data()
│
▼
[2] Sync Service
│
├─→ Get active tickers from DB
│ SELECT * FROM tickers WHERE is_active = true
│
├─→ For each ticker:
│ │
│ ├─→ Get last sync timestamp
│ │ SELECT MAX(timestamp) FROM ohlcv_5min WHERE ticker_id = ?
│ │
│ ├─→ Calculate date range
│ │ start_date = last_sync_timestamp + 1
│ │ end_date = NOW()
│ │
│ └─→ Fetch from Polygon API
│ │
│ ▼
[3] Polygon Client
│
├─→ Check rate limit (5 req/min for free tier)
│ Wait if needed
│
├─→ Format symbol (e.g., EURUSD → C:EURUSD)
│
├─→ Call API: GET /v2/aggs/ticker/{symbol}/range/{multiplier}/{timespan}/{from}/{to}
│ Headers: Authorization: Bearer {api_key}
│
├─→ Handle pagination (next_url)
│
└─→ Yield OHLCVBar objects
│
▼
[4] Data Processing
│
├─→ Collect bars in batches (10,000 rows)
│
├─→ Transform to database format
│ (ticker_id, timestamp, open, high, low, close, volume, vwap, trades)
│
└─→ Insert to database
│
▼
[5] Database Insert
│
├─→ INSERT INTO ohlcv_5min (...) VALUES (...)
│ ON CONFLICT (ticker_id, timestamp) DO UPDATE
│ SET open = EXCLUDED.open, ...
│
└─→ Batch insert (10K rows at a time)
│
▼
[6] Update Sync Status
│
└─→ INSERT INTO sync_status (ticker_id, timeframe, last_sync_timestamp, ...)
ON CONFLICT (ticker_id, timeframe) DO UPDATE
SET last_sync_timestamp = NOW(), status = 'success', ...
Flujo de Request Manual
┌────────────────────────────────────────────────────────────────────┐
│ MANUAL SYNC REQUEST FLOW │
└────────────────────────────────────────────────────────────────────┘
[User/Frontend]
│
│ POST /api/sync/sync/EURUSD
│ Body: {
│ "asset_type": "forex",
│ "timeframe": "5min",
│ "backfill_days": 30
│ }
▼
[Sync Routes]
│
├─→ Validate symbol is supported
│ (Check TICKER_MAPPINGS config)
│
├─→ Parse request parameters
│ - symbol: EURUSD
│ - asset_type: forex (enum)
│ - timeframe: 5min (enum)
│ - backfill_days: 30
│
└─→ Call sync_service.sync_ticker_data()
│
▼
[Sync Service]
│
├─→ Get or create ticker in DB
│ (auto-fetch details from Polygon if new)
│
├─→ Calculate date range
│ start_date = NOW() - 30 days
│ end_date = NOW()
│
├─→ Call polygon_client.get_aggregates()
│ (async generator)
│
├─→ Process bars in batches
│ - Collect 10K rows
│ - Insert to DB
│ - Repeat
│
└─→ Return result
│
▼
[Response]
{
"status": "success",
"symbol": "EURUSD",
"timeframe": "5min",
"rows_inserted": 8640,
"start_date": "2024-11-08T00:00:00",
"end_date": "2024-12-08T00:00:00"
}
Estructura de Directorios
data-service/
│
├── src/
│ ├── api/
│ │ ├── __init__.py
│ │ ├── dependencies.py # Dependency injection
│ │ ├── routes.py # Main market data routes
│ │ └── sync_routes.py # [NEW] Sync management routes
│ │
│ ├── services/
│ │ ├── __init__.py
│ │ ├── price_adjustment.py # Price adjustment logic
│ │ ├── sync_service.py # [NEW] Data sync service
│ │ └── scheduler.py # [NEW] Automatic scheduler
│ │
│ ├── providers/
│ │ ├── __init__.py
│ │ ├── polygon_client.py # [EXISTING] Polygon/Massive client
│ │ ├── binance_client.py # Binance API client
│ │ └── mt4_client.py # MT4 API client
│ │
│ ├── models/
│ │ ├── __init__.py
│ │ └── market.py # Pydantic models
│ │
│ ├── websocket/
│ │ ├── __init__.py
│ │ ├── manager.py # WebSocket connection manager
│ │ └── handlers.py # WebSocket message handlers
│ │
│ ├── config.py # Configuration management
│ ├── app.py # [EXISTING] Main application
│ ├── app_updated.py # [NEW] Updated with scheduler
│ └── main.py # Entry point
│
├── tests/
│ ├── __init__.py # [NEW]
│ ├── conftest.py # [NEW] Pytest config
│ ├── test_sync_service.py # [NEW] Sync service tests
│ └── test_polygon_client.py # [NEW] Client tests
│
├── migrations/
│ ├── 001_initial_schema.sql # [EXISTING] Initial tables
│ └── 002_sync_status.sql # [NEW] Sync status table
│
├── examples/
│ ├── sync_example.py # [NEW] Programmatic usage
│ └── api_examples.sh # [NEW] API call examples
│
├── .env.example # [NEW] Environment template
├── requirements.txt # [EXISTING] Dependencies
├── requirements_sync.txt # [NEW] Additional dependencies
├── README.md # [EXISTING] Main readme
├── README_SYNC.md # [NEW] Sync documentation
├── IMPLEMENTATION_SUMMARY.md # [NEW] Technical summary
├── TECH_LEADER_REPORT.md # [NEW] Manager report
└── ARCHITECTURE.md # [NEW] This file
Modelo de Datos
Tabla: tickers
CREATE TABLE market_data.tickers (
id SERIAL PRIMARY KEY,
symbol VARCHAR(20) UNIQUE NOT NULL, -- EURUSD, BTCUSD, etc.
name VARCHAR(100),
asset_type VARCHAR(20) NOT NULL, -- forex, crypto, index
base_currency VARCHAR(10),
quote_currency VARCHAR(10),
exchange VARCHAR(50),
price_precision INTEGER,
quantity_precision INTEGER,
min_quantity DECIMAL,
max_quantity DECIMAL,
min_notional DECIMAL,
tick_size DECIMAL,
lot_size DECIMAL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
Tabla: ohlcv_5min (ejemplo)
CREATE TABLE market_data.ohlcv_5min (
id BIGSERIAL PRIMARY KEY,
ticker_id INTEGER REFERENCES tickers(id),
timestamp TIMESTAMP NOT NULL,
open DECIMAL NOT NULL,
high DECIMAL NOT NULL,
low DECIMAL NOT NULL,
close DECIMAL NOT NULL,
volume DECIMAL,
vwap DECIMAL, -- Volume-weighted average price
trades INTEGER, -- Number of trades
ts_epoch BIGINT, -- Unix timestamp
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(ticker_id, timestamp)
);
CREATE INDEX idx_ohlcv_5min_ticker_timestamp
ON market_data.ohlcv_5min(ticker_id, timestamp DESC);
Tabla: sync_status [NEW]
CREATE TABLE market_data.sync_status (
id SERIAL PRIMARY KEY,
ticker_id INTEGER REFERENCES tickers(id),
timeframe VARCHAR(20) NOT NULL, -- 1min, 5min, 1hour, etc.
last_sync_timestamp TIMESTAMP, -- Last successful sync
last_sync_rows INTEGER DEFAULT 0, -- Rows inserted in last sync
sync_status VARCHAR(20) NOT NULL, -- pending, success, failed
error_message TEXT, -- Error if failed
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(ticker_id, timeframe)
);
CREATE INDEX idx_sync_status_ticker ON sync_status(ticker_id);
CREATE INDEX idx_sync_status_status ON sync_status(sync_status);
Componentes Principales
1. PolygonClient (providers/polygon_client.py)
Responsabilidades:
- Comunicación con Massive.com/Polygon.io API
- Rate limiting (5 req/min)
- Formateo de símbolos (EURUSD → C:EURUSD)
- Paginación de resultados
- Retry en caso de rate limit
Métodos principales:
async def get_aggregates(
symbol: str,
asset_type: AssetType,
timeframe: Timeframe,
start_date: datetime,
end_date: datetime
) -> AsyncGenerator[OHLCVBar]:
# Fetch historical OHLCV data
...
async def get_ticker_details(
symbol: str,
asset_type: AssetType
) -> Dict:
# Get ticker metadata
...
2. DataSyncService (services/sync_service.py)
Responsabilidades:
- Orquestación de sincronización
- Gestión de tickers en DB
- Inserción por lotes
- Tracking de estado
- Manejo de errores
Métodos principales:
async def sync_ticker_data(
symbol: str,
asset_type: AssetType,
timeframe: Timeframe,
backfill_days: int = 30
) -> Dict:
# Sync specific ticker
...
async def sync_all_active_tickers(
timeframe: Timeframe,
backfill_days: int = 1
) -> Dict:
# Sync all active tickers
...
async def get_sync_status(
symbol: Optional[str] = None
) -> List[Dict]:
# Get sync status
...
3. DataSyncScheduler (services/scheduler.py)
Responsabilidades:
- Programación de tareas periódicas
- Ejecución automática de syncs
- Limpieza de datos antiguos
- Control de jobs
Jobs:
- sync_1min: Cada 1 minuto
- sync_5min: Cada 5 minutos
- sync_15min: Cada 15 minutos
- sync_1hour: Cada hora
- sync_4hour: Cada 4 horas
- sync_daily: Diario (00:05 UTC)
- cleanup_old_data: Semanal (Domingo 02:00)
Flujo de Rate Limiting
┌────────────────────────────────────────────────────────────────────┐
│ RATE LIMITING FLOW │
└────────────────────────────────────────────────────────────────────┘
[Client Request]
│
▼
[PolygonClient._rate_limit_wait()]
│
├─→ Check current minute
│ - Is it a new minute?
│ Yes → Reset counter to 0
│ No → Check counter
│
├─→ Check request count
│ - count < 5?
│ Yes → Increment counter, proceed
│ No → Calculate wait time
│
├─→ Wait if needed
│ wait_time = 60 - (now - last_request_time)
│ asyncio.sleep(wait_time)
│
└─→ Reset counter, proceed
│
▼
[Make API Request]
│
├─→ Response 200 OK
│ → Return data
│
├─→ Response 429 Too Many Requests
│ → Wait retry_after seconds
│ → Retry request
│
└─→ Response 4xx/5xx
→ Raise error
Example Timeline:
00:00:00 - Request 1 ✓ (count: 1/5)
00:00:10 - Request 2 ✓ (count: 2/5)
00:00:20 - Request 3 ✓ (count: 3/5)
00:00:30 - Request 4 ✓ (count: 4/5)
00:00:40 - Request 5 ✓ (count: 5/5)
00:00:50 - Request 6 ⏸ WAIT 10s → 00:01:00 ✓ (count: 1/5)
Escalabilidad y Performance
Optimizaciones Actuales
-
Async I/O
- Todo el stack es asíncrono
- No bloqueo en I/O operations
- Múltiples requests concurrentes
-
Batch Processing
- Inserción de 10,000 rows por batch
- Reduce round-trips a DB
- Mejor throughput
-
Connection Pooling
- asyncpg pool: 5-20 connections
- Reutilización de conexiones
- Menor latencia
-
Database Indexing
- Índices en (ticker_id, timestamp)
- Índices en sync_status
- Queries optimizadas
-
ON CONFLICT DO UPDATE
- Upsert nativo de PostgreSQL
- Evita duplicados
- Actualiza datos existentes
Límites Actuales
| Métrica | Valor | Límite |
|---|---|---|
| Rate limit (free) | 5 req/min | API |
| Batch size | 10,000 rows | Configurable |
| DB connections | 5-20 | Pool |
| Concurrent syncs | 1 per timeframe | Scheduler |
| Max backfill | 365 días | Configurable |
Propuestas de Mejora
-
Redis Cache
- Cache de símbolos frecuentes
- Reduce queries a DB
- TTL configurable
-
Task Queue
- Celery o RQ
- Syncs asíncronos largos
- Retry automático
-
Multiple Workers
- Paralelización de syncs
- Mayor throughput
- Load balancing
-
Table Partitioning
- Partition por fecha
- Mejora performance de queries
- Mantenimiento más fácil
Monitoreo y Observabilidad
Logs
Niveles configurados:
- DEBUG: Detalles de cada request
- INFO: Operaciones normales
- WARNING: Rate limits, retries
- ERROR: Fallos de sync, API errors
Formato:
2024-12-08 20:15:30 - sync_service - INFO - Starting sync for EURUSD (forex) - 5min
2024-12-08 20:15:31 - polygon_client - DEBUG - Rate limit check: 2/5 requests
2024-12-08 20:15:32 - sync_service - INFO - Synced 288 bars for EURUSD
2024-12-08 20:15:33 - sync_service - INFO - Sync completed: success
Métricas Propuestas
Prometheus metrics:
sync_duration_seconds- Duración de cada syncsync_rows_inserted_total- Total de rows insertadossync_errors_total- Total de erroresapi_requests_total- Requests a Polygon APIrate_limit_waits_total- Veces que se esperó por rate limit
Health Checks
Endpoints:
/health- Health general del servicio/api/sync/health- Health del sync service/scheduler/status- Estado del scheduler
Seguridad
API Keys
- Nunca en código fuente
- Solo en variables de entorno
- .env en .gitignore
- Rotación periódica recomendada
Database
- Conexiones autenticadas
- Usuario con permisos limitados
- SSL recomendado en producción
Rate Limiting
- Protección contra abuse
- Límites configurables
- Logging de excesos
Input Validation
- Pydantic models para requests
- Validación de símbolos soportados
- Sanitización de parámetros
Deployment
Desarrollo
# Local
python src/app.py
# Con reload
uvicorn src.app:app --reload --port 8001
Producción
# Con Gunicorn + Uvicorn workers
gunicorn src.app:app \
-w 4 \
-k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8001 \
--log-level info
# Con systemd
systemctl start trading-data-service
Docker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ ./src/
COPY migrations/ ./migrations/
ENV PYTHONPATH=/app/src
CMD ["uvicorn", "src.app:app", "--host", "0.0.0.0", "--port", "8001"]
Conclusión
Esta arquitectura proporciona:
✅ Separación clara de responsabilidades ✅ Escalabilidad horizontal y vertical ✅ Mantenibilidad y extensibilidad ✅ Observabilidad completa ✅ Alta disponibilidad ✅ Performance optimizado
Status: ✅ Producción Ready
Última actualización: 2024-12-08 Versión: 2.0.0