---
id: "INT-DATA-003"
title: "Asset Update Batch with Prioritization"
type: "Integration Specification"
project: "trading-platform"
version: "1.0.0"
status: "Planned"
priority: "High"
created_date: "2026-01-04"
updated_date: "2026-01-04"
author: "Orquestador Agent"
---

# INT-DATA-003: Batch Process for Asset Updates with Prioritization

## Metadata

| Field | Value |
|-------|-------|
| **ID** | INT-DATA-003 |
| **Module** | Data Service |
| **Type** | Integration Specification |
| **Version** | 1.0.0 |
| **Status** | Planned |
| **Created** | 2026-01-04 |
| **Last updated** | 2026-01-04 |
| **Author** | Orquestador Agent |

---
## 1. Executive Summary

This document specifies a **batch process for updating financial asset data** from the Polygon.io/Massive.com API into the trading-platform project's PostgreSQL database.

### Key Characteristics

- **Runs every 5 minutes** - Scheduled batch process
- **Asset prioritization** - Gold (XAU), EURUSD, and Bitcoin are updated first
- **Rate limiting** - 5 API calls per minute (free tier)
- **Queueing system** - Non-priority assets are queued for deferred updates

---
## 2. Requirement Context

### 2.1 Current State

The project already has:

- `PolygonClient` in `apps/data-service/src/providers/polygon_client.py`
- `DataSyncScheduler` in `apps/data-service/src/services/scheduler.py`
- The `trading.symbols` table as the asset catalog
- Incremental sync infrastructure

### 2.2 Need

The system must:

1. Update asset data while respecting the 5 calls/min rate limit
2. Prioritize critical assets (XAU, EURUSD, BTCUSD) in every cycle
3. Queue secondary assets for gradual updates
4. Keep data fresh for the ML Engine

### 2.3 Constraints

| Constraint | Value | Note |
|------------|-------|------|
| API rate limit | 5 calls/min | Free tier |
| Batch interval | 5 minutes | Configurable |
| Priority assets | 3 | XAU, EURUSD, BTCUSD |
| Max cycle time | 60 seconds | 5 calls * 12 s spacing |
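The 60-second ceiling follows directly from the rate limit: at 5 calls per minute, calls must be spaced at least 60/5 = 12 seconds apart, so a full 5-call cycle (3 priority + 2 queued) fits within one minute. A quick sanity check of the table's arithmetic:

```python
# Sanity check for the constraints table: minimum spacing between
# calls and worst-case duration of one full batch cycle.
RATE_LIMIT_PER_MIN = 5
PRIORITY_CALLS = 3   # XAU, EURUSD, BTCUSD
QUEUED_CALLS = 2     # remaining budget for queued secondary assets

spacing_s = 60 / RATE_LIMIT_PER_MIN          # 12.0 s between calls
cycle_calls = PRIORITY_CALLS + QUEUED_CALLS  # 5 calls per cycle
max_cycle_s = cycle_calls * spacing_s        # 60.0 s worst case

print(spacing_s, cycle_calls, max_cycle_s)   # 12.0 5 60.0
```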
---

## 3. Solution Architecture

### 3.1 Component Diagram

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                         BATCH ASSET UPDATE SYSTEM                           │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    SCHEDULER LAYER (APScheduler)                      │  │
│  │                                                                       │  │
│  │  ┌─────────────────────┐      ┌─────────────────────────────────────┐ │  │
│  │  │  PriorityBatchJob   │      │        QueueProcessorJob            │ │  │
│  │  │  Every 5 minutes    │      │        Continuous                   │ │  │
│  │  │                     │      │                                     │ │  │
│  │  │  1. XAU/USD         │      │  Process queued assets              │ │  │
│  │  │  2. EUR/USD         │      │  respecting rate limit              │ │  │
│  │  │  3. BTC/USD         │      │                                     │ │  │
│  │  └──────────┬──────────┘      └──────────────┬──────────────────────┘ │  │
│  └─────────────┼────────────────────────────────┼────────────────────────┘  │
│                │                                │                           │
│  ┌─────────────▼────────────────────────────────▼────────────────────────┐  │
│  │                           SERVICE LAYER                               │  │
│  │                                                                       │  │
│  │  ┌─────────────────────┐      ┌─────────────────────────────────────┐ │  │
│  │  │ PriorityQueueService│      │        AssetUpdateService           │ │  │
│  │  │                     │      │                                     │ │  │
│  │  │ • Priority Queue    │      │  • Fetch from API                   │ │  │
│  │  │ • FIFO for others   │      │  • Update database                  │ │  │
│  │  │ • Deduplication     │      │  • Emit events                      │ │  │
│  │  └──────────┬──────────┘      └──────────────┬──────────────────────┘ │  │
│  └─────────────┼────────────────────────────────┼────────────────────────┘  │
│                │                                │                           │
│  ┌─────────────▼────────────────────────────────▼────────────────────────┐  │
│  │                           PROVIDER LAYER                              │  │
│  │                                                                       │  │
│  │  ┌─────────────────────┐      ┌─────────────────────────────────────┐ │  │
│  │  │     RateLimiter     │─────▶│           PolygonClient             │ │  │
│  │  │     5 req/min       │      │           (existing)                │ │  │
│  │  └─────────────────────┘      └─────────────────────────────────────┘ │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                    │                                        │
└────────────────────────────────────┼────────────────────────────────────────┘
                                     │
                                     ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                             EXTERNAL SYSTEMS                                │
│                                                                             │
│  ┌────────────────────────┐        ┌──────────────────────────────────────┐ │
│  │     Polygon.io API     │        │           PostgreSQL                 │ │
│  │     api.polygon.io     │        │    trading.symbols                   │ │
│  │                        │        │    market_data.ohlcv_5m              │ │
│  │  Rate: 5 calls/min     │        │    data_sources.sync_status          │ │
│  └────────────────────────┘        └──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```

### 3.2 Execution Flow

```
Every 5 minutes
        │
        ▼
┌──────────────────────┐
│  Batch Job Trigger   │
└──────────┬───────────┘
           │
     ┌─────┴─────────────────────┐
     │                           │
     ▼                           ▼
┌───────────────────────┐   ┌───────────────────────┐
│   Priority Assets     │   │  Enqueue Non-Priority │
│   XAU, EURUSD, BTCUSD │   │  to Update Queue      │
└───────────┬───────────┘   └───────────┬───────────┘
            │                           │
            ▼                           ▼
┌───────────────────────┐   ┌───────────────────────┐
│  Rate Limited Fetch   │   │  Background Processor │
│  (3 calls, 12s each)  │   │  (2 calls remaining)  │
└───────────┬───────────┘   └───────────┬───────────┘
            │                           │
            ▼                           ▼
┌───────────────────────┐   ┌───────────────────────┐
│   Update Database     │   │  Process Queue Items  │
│   trading.symbols     │   │  (gradual sync)       │
└───────────┬───────────┘   └───────────┬───────────┘
            │                           │
            └─────────────┬─────────────┘
                          │
                          ▼
            ┌───────────────────────┐
            │  Emit Update Events   │
            │  via Redis Pub/Sub    │
            └───────────────────────┘
```
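The flow above, condensed into a runnable sketch of a single batch cycle. The fetch/store step is stubbed out (the real rate-limited implementations are specified in section 4); only the call-budget split between priority and queued assets is the point:

```python
import asyncio

PRIORITY = ["XAUUSD", "EURUSD", "BTCUSD"]
SECONDARY = ["ETHUSDT", "GBPUSD", "USDJPY", "XAGUSD"]

async def fetch_and_store(symbol):
    # Stub for: rate-limited API fetch + database update + Redis event
    await asyncio.sleep(0)
    return symbol

async def run_cycle(queue: asyncio.Queue, calls_per_cycle=5):
    updated = []
    for symbol in PRIORITY:                   # 3 calls: priority assets first
        updated.append(await fetch_and_store(symbol))
    for symbol in SECONDARY:                  # non-priority assets are queued
        queue.put_nowait(symbol)
    budget = calls_per_cycle - len(PRIORITY)  # 2 calls left for the queue
    for _ in range(budget):
        if queue.empty():
            break
        updated.append(await fetch_and_store(queue.get_nowait()))
    return updated

queue: asyncio.Queue = asyncio.Queue()
updated = asyncio.run(run_cycle(queue))
print(updated)        # ['XAUUSD', 'EURUSD', 'BTCUSD', 'ETHUSDT', 'GBPUSD']
print(queue.qsize())  # 2 symbols remain for the next cycle
```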
---

## 4. Technical Specification

### 4.1 Priority Asset Configuration

```python
# config/priority_assets.py

PRIORITY_ASSETS = [
    {
        "symbol": "XAUUSD",
        "polygon_ticker": "C:XAUUSD",
        "asset_type": "forex",  # Gold is treated as a commodity via forex
        "name": "Gold Spot",
        "priority": 1,
        "update_frequency_seconds": 300  # 5 min
    },
    {
        "symbol": "EURUSD",
        "polygon_ticker": "C:EURUSD",
        "asset_type": "forex",
        "name": "Euro/US Dollar",
        "priority": 2,
        "update_frequency_seconds": 300
    },
    {
        "symbol": "BTCUSD",
        "polygon_ticker": "X:BTCUSD",
        "asset_type": "crypto",
        "name": "Bitcoin/US Dollar",
        "priority": 3,
        "update_frequency_seconds": 300
    }
]

# Secondary assets (queued for deferred updates)
SECONDARY_ASSETS = [
    {"symbol": "ETHUSDT", "polygon_ticker": "X:ETHUSDT", "asset_type": "crypto"},
    {"symbol": "GBPUSD", "polygon_ticker": "C:GBPUSD", "asset_type": "forex"},
    {"symbol": "USDJPY", "polygon_ticker": "C:USDJPY", "asset_type": "forex"},
    {"symbol": "XAGUSD", "polygon_ticker": "C:XAGUSD", "asset_type": "forex"},  # Silver
    # ... more assets
]
```
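A consumer of this configuration would typically sort entries by `priority` and sanity-check them before use. A minimal sketch (the validation rules and `ordered_assets` helper are illustrative, not part of the spec; the data is inlined so the sketch runs standalone):

```python
# Illustrative consumer of PRIORITY_ASSETS: sort by ascending priority
# and validate that each entry has the fields the update service expects.
PRIORITY_ASSETS = [  # inlined subset of config/priority_assets.py
    {"symbol": "XAUUSD", "polygon_ticker": "C:XAUUSD", "asset_type": "forex", "priority": 1},
    {"symbol": "BTCUSD", "polygon_ticker": "X:BTCUSD", "asset_type": "crypto", "priority": 3},
    {"symbol": "EURUSD", "polygon_ticker": "C:EURUSD", "asset_type": "forex", "priority": 2},
]

REQUIRED_FIELDS = {"symbol", "polygon_ticker", "asset_type", "priority"}

def ordered_assets(assets):
    """Return assets sorted by ascending priority, rejecting malformed entries."""
    for a in assets:
        missing = REQUIRED_FIELDS - a.keys()
        if missing:
            raise ValueError(f"{a.get('symbol', '?')} missing fields: {missing}")
    return sorted(assets, key=lambda a: a["priority"])

symbols = [a["symbol"] for a in ordered_assets(PRIORITY_ASSETS)]
print(symbols)  # ['XAUUSD', 'EURUSD', 'BTCUSD']
```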

### 4.2 Priority Queue Service

```python
# services/priority_queue_service.py

import asyncio
import heapq
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class AssetPriority(Enum):
    CRITICAL = 1  # XAU, EURUSD, BTC
    HIGH = 2      # Major forex pairs
    MEDIUM = 3    # Other crypto
    LOW = 4       # Indices, commodities


@dataclass(order=True)
class QueuedAsset:
    """Asset in the update queue"""
    priority: int
    enqueued_at: datetime = field(compare=False)
    symbol: str = field(compare=False)
    polygon_ticker: str = field(compare=False)
    asset_type: str = field(compare=False)
    retry_count: int = field(default=0, compare=False)


class PriorityQueueService:
    """
    Manages the priority queue for asset updates.

    Priority levels:
        1 - Critical (XAU, EURUSD, BTC) - Always updated first
        2 - High - Updated when slots are available
        3 - Medium - Gradual updates
        4 - Low - Best effort
    """

    def __init__(self, max_queue_size: int = 1000):
        self._queue: List[QueuedAsset] = []
        self._in_queue: set = set()
        self.max_queue_size = max_queue_size
        self._lock = asyncio.Lock()

    async def enqueue(
        self,
        symbol: str,
        polygon_ticker: str,
        asset_type: str,
        priority: AssetPriority = AssetPriority.MEDIUM
    ) -> bool:
        """Add an asset to the update queue if not already present"""
        async with self._lock:
            if symbol in self._in_queue:
                logger.debug(f"Asset {symbol} already in queue, skipping")
                return False

            if len(self._queue) >= self.max_queue_size:
                logger.warning(f"Queue full, dropping {symbol}")
                return False

            item = QueuedAsset(
                priority=priority.value,
                enqueued_at=datetime.utcnow(),
                symbol=symbol,
                polygon_ticker=polygon_ticker,
                asset_type=asset_type
            )

            heapq.heappush(self._queue, item)
            self._in_queue.add(symbol)

            logger.debug(f"Enqueued {symbol} with priority {priority.name}")
            return True

    async def dequeue(self) -> Optional[QueuedAsset]:
        """Get the next asset to update (highest priority first)"""
        async with self._lock:
            if not self._queue:
                return None

            item = heapq.heappop(self._queue)
            self._in_queue.discard(item.symbol)
            return item

    async def peek(self) -> Optional[QueuedAsset]:
        """View the next item without removing it"""
        async with self._lock:
            return self._queue[0] if self._queue else None

    @property
    def size(self) -> int:
        return len(self._queue)

    @property
    def is_empty(self) -> bool:
        return len(self._queue) == 0

    async def get_queue_stats(self) -> Dict:
        """Get queue statistics"""
        async with self._lock:
            priority_counts = {p.name: 0 for p in AssetPriority}
            for item in self._queue:
                priority_name = AssetPriority(item.priority).name
                priority_counts[priority_name] += 1

            return {
                "total_items": len(self._queue),
                "by_priority": priority_counts,
                # Scan for the true oldest entry: the heap root is the
                # highest-priority item, not necessarily the oldest one.
                "oldest_item_age_seconds": (
                    (datetime.utcnow()
                     - min(i.enqueued_at for i in self._queue)).total_seconds()
                    if self._queue else 0
                )
            }
```
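Intended usage of the service above, sketched with a trimmed-down stand-in that keeps the same enqueue/dequeue contract (heap ordering plus deduplication) so the example runs on its own:

```python
import asyncio
import heapq

class MiniPriorityQueue:
    """Stand-in with the same enqueue/dequeue contract as PriorityQueueService."""
    def __init__(self):
        self._heap, self._in_queue = [], set()
        self._lock = asyncio.Lock()

    async def enqueue(self, symbol, priority):
        async with self._lock:
            if symbol in self._in_queue:       # deduplication
                return False
            heapq.heappush(self._heap, (priority, symbol))
            self._in_queue.add(symbol)
            return True

    async def dequeue(self):
        async with self._lock:
            if not self._heap:
                return None
            _, symbol = heapq.heappop(self._heap)
            self._in_queue.discard(symbol)
            return symbol

async def demo():
    q = MiniPriorityQueue()
    await q.enqueue("ETHUSDT", 3)   # MEDIUM
    await q.enqueue("XAUUSD", 1)    # CRITICAL
    await q.enqueue("XAUUSD", 1)    # duplicate, ignored
    return [await q.dequeue(), await q.dequeue()]

order = asyncio.run(demo())
print(order)  # ['XAUUSD', 'ETHUSDT'] - critical asset comes out first
```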

### 4.3 Asset Update Service

```python
# services/asset_update_service.py

import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from providers.polygon_client import PolygonClient, TickerSnapshot
from config.priority_assets import PRIORITY_ASSETS, SECONDARY_ASSETS
from services.priority_queue_service import PriorityQueueService, AssetPriority

logger = logging.getLogger(__name__)


class AssetUpdateService:
    """
    Service for updating asset data from the Polygon API.

    Handles:
    - Rate limiting (5 calls/min)
    - Priority-based updates
    - Database synchronization
    - Error handling and retries
    """

    def __init__(
        self,
        polygon_client: PolygonClient,
        db_pool,
        redis_client,
        priority_queue: PriorityQueueService
    ):
        self.polygon = polygon_client
        self.db = db_pool
        self.redis = redis_client
        self.queue = priority_queue

        # Rate limiting state
        self._calls_this_minute = 0
        self._minute_start = datetime.utcnow()
        self._rate_limit = 5

    async def _wait_for_rate_limit(self):
        """Wait if the rate limit has been reached"""
        now = datetime.utcnow()

        # Reset the counter each minute
        if (now - self._minute_start).total_seconds() >= 60:
            self._calls_this_minute = 0
            self._minute_start = now

        # Wait if the limit has been reached
        if self._calls_this_minute >= self._rate_limit:
            wait_time = 60 - (now - self._minute_start).total_seconds()
            if wait_time > 0:
                logger.info(f"Rate limit reached, waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
            self._calls_this_minute = 0
            self._minute_start = datetime.utcnow()

        self._calls_this_minute += 1

    async def update_priority_assets(self) -> Dict[str, Any]:
        """
        Update all priority assets (XAU, EURUSD, BTC).
        Uses 3 of the 5 available API calls.
        """
        results = {
            "updated": [],
            "failed": [],
            "api_calls_used": 0
        }

        for asset in PRIORITY_ASSETS:
            try:
                await self._wait_for_rate_limit()

                # Fetch snapshot from Polygon
                snapshot = await self._fetch_asset_snapshot(
                    asset["polygon_ticker"],
                    asset["asset_type"]
                )

                if snapshot:
                    # Update database
                    await self._update_asset_in_db(asset, snapshot)

                    # Publish update event
                    await self._publish_update_event(asset["symbol"], snapshot)

                    results["updated"].append(asset["symbol"])
                else:
                    results["failed"].append({
                        "symbol": asset["symbol"],
                        "error": "No data returned"
                    })

                results["api_calls_used"] += 1

            except Exception as e:
                logger.error(f"Error updating {asset['symbol']}: {e}")
                results["failed"].append({
                    "symbol": asset["symbol"],
                    "error": str(e)
                })

        return results

    async def _fetch_asset_snapshot(
        self,
        polygon_ticker: str,
        asset_type: str
    ) -> Optional[TickerSnapshot]:
        """Fetch the current snapshot from the Polygon API"""
        try:
            if asset_type == "forex":
                return await self.polygon.get_snapshot_forex(polygon_ticker)
            elif asset_type == "crypto":
                return await self.polygon.get_snapshot_crypto(polygon_ticker)
            else:
                logger.warning(f"Unknown asset type: {asset_type}")
                return None
        except Exception as e:
            logger.error(f"Error fetching {polygon_ticker}: {e}")
            return None

    async def _update_asset_in_db(
        self,
        asset: Dict,
        snapshot: TickerSnapshot
    ):
        """Update asset data in PostgreSQL"""
        async with self.db.acquire() as conn:
            # Update trading.symbols with the latest price info.
            # The payload is serialized with json.dumps because asyncpg
            # does not convert plain dicts to jsonb by default.
            await conn.execute("""
                UPDATE trading.symbols
                SET
                    updated_at = NOW(),
                    -- Store latest prices in a JSONB column if it exists
                    metadata = jsonb_set(
                        COALESCE(metadata, '{}'),
                        '{last_update}',
                        $2::jsonb
                    )
                WHERE symbol = $1
            """, asset["symbol"], json.dumps({
                "bid": snapshot.bid,
                "ask": snapshot.ask,
                "last_price": snapshot.last_price,
                "daily_open": snapshot.daily_open,
                "daily_high": snapshot.daily_high,
                "daily_low": snapshot.daily_low,
                "daily_close": snapshot.daily_close,
                "timestamp": snapshot.timestamp.isoformat()
            }))

            # Update sync status
            await conn.execute("""
                INSERT INTO data_sources.data_sync_status
                    (ticker_id, provider_id, last_sync_timestamp, sync_status, updated_at)
                SELECT
                    s.id,
                    (SELECT id FROM data_sources.api_providers WHERE code = 'polygon'),
                    NOW(),
                    'success',
                    NOW()
                FROM trading.symbols s
                WHERE s.symbol = $1
                ON CONFLICT (ticker_id, provider_id)
                DO UPDATE SET
                    last_sync_timestamp = NOW(),
                    sync_status = 'success',
                    updated_at = NOW()
            """, asset["symbol"])

    async def _publish_update_event(
        self,
        symbol: str,
        snapshot: TickerSnapshot
    ):
        """Publish an update event via Redis for real-time consumers"""
        channel = f"asset:update:{symbol}"
        message = {
            "symbol": symbol,
            "bid": snapshot.bid,
            "ask": snapshot.ask,
            "last_price": snapshot.last_price,
            "timestamp": snapshot.timestamp.isoformat()
        }

        # Serialize as JSON rather than str(dict) so consumers can parse it
        await self.redis.publish(channel, json.dumps(message))

    async def process_queued_asset(self) -> Optional[Dict]:
        """
        Process the next asset from the queue.
        Uses the remaining API calls (2 of 5 per cycle).
        """
        item = await self.queue.dequeue()
        if not item:
            return None

        try:
            await self._wait_for_rate_limit()

            snapshot = await self._fetch_asset_snapshot(
                item.polygon_ticker,
                item.asset_type
            )

            if snapshot:
                asset = {
                    "symbol": item.symbol,
                    "polygon_ticker": item.polygon_ticker,
                    "asset_type": item.asset_type
                }
                await self._update_asset_in_db(asset, snapshot)
                await self._publish_update_event(item.symbol, snapshot)

                return {"symbol": item.symbol, "status": "success"}
            else:
                # Re-enqueue with lower priority on failure
                if item.retry_count < 3:
                    item.retry_count += 1
                    await self.queue.enqueue(
                        item.symbol,
                        item.polygon_ticker,
                        item.asset_type,
                        AssetPriority.LOW
                    )
                return {"symbol": item.symbol, "status": "failed", "requeued": True}

        except Exception as e:
            logger.error(f"Error processing queued asset {item.symbol}: {e}")
            return {"symbol": item.symbol, "status": "error", "error": str(e)}

    async def enqueue_secondary_assets(self):
        """Enqueue all secondary assets for gradual updates"""
        for asset in SECONDARY_ASSETS:
            await self.queue.enqueue(
                symbol=asset["symbol"],
                polygon_ticker=asset["polygon_ticker"],
                asset_type=asset["asset_type"],
                priority=AssetPriority.MEDIUM
            )

        logger.info(f"Enqueued {len(SECONDARY_ASSETS)} secondary assets")
```

### 4.4 Scheduled Batch Job

```python
# services/batch_scheduler.py

import logging
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

logger = logging.getLogger(__name__)


class AssetBatchScheduler:
    """
    Scheduler for batch asset updates.

    Schedule:
    - Every 5 minutes: Update priority assets (XAU, EURUSD, BTC)
    - Continuous: Process queued secondary assets
    """

    def __init__(
        self,
        asset_update_service,
        batch_interval_minutes: int = 5
    ):
        self.update_service = asset_update_service
        self.batch_interval = batch_interval_minutes
        self.scheduler = AsyncIOScheduler()
        self._is_running = False

    async def start(self):
        """Start the batch scheduler"""
        if self._is_running:
            logger.warning("Scheduler already running")
            return

        logger.info("Starting Asset Batch Scheduler")

        # Priority assets job - every 5 minutes
        self.scheduler.add_job(
            self._priority_batch_job,
            trigger=IntervalTrigger(minutes=self.batch_interval),
            id="priority_assets_batch",
            name="Update Priority Assets (XAU, EURUSD, BTC)",
            replace_existing=True,
            max_instances=1,
            next_run_time=datetime.now()  # Run immediately on start
        )

        # Queue processor job - every 15 seconds
        self.scheduler.add_job(
            self._queue_processor_job,
            trigger=IntervalTrigger(seconds=15),
            id="queue_processor",
            name="Process Queued Secondary Assets",
            replace_existing=True,
            max_instances=1
        )

        # Enqueue secondary assets job - every 30 minutes
        self.scheduler.add_job(
            self._enqueue_secondary_job,
            trigger=IntervalTrigger(minutes=30),
            id="enqueue_secondary",
            name="Enqueue Secondary Assets",
            replace_existing=True,
            max_instances=1
        )

        self.scheduler.start()
        self._is_running = True

        logger.info(f"Scheduler started with {len(self.scheduler.get_jobs())} jobs")

    async def _priority_batch_job(self):
        """Job: Update priority assets"""
        logger.info("=== Priority Batch Job Started ===")
        start_time = datetime.utcnow()

        try:
            result = await self.update_service.update_priority_assets()

            elapsed = (datetime.utcnow() - start_time).total_seconds()

            logger.info(
                f"Priority batch completed in {elapsed:.1f}s: "
                f"{len(result['updated'])} updated, "
                f"{len(result['failed'])} failed, "
                f"{result['api_calls_used']} API calls"
            )

            # Log details
            for symbol in result['updated']:
                logger.debug(f"  [OK] {symbol}")
            for fail in result['failed']:
                logger.warning(f"  [FAIL] {fail['symbol']}: {fail['error']}")

        except Exception as e:
            logger.error(f"Priority batch job failed: {e}", exc_info=True)

    async def _queue_processor_job(self):
        """Job: Process queued secondary assets"""
        # Only process while API calls are available: the priority job
        # uses 3 of the 5 per-minute calls, leaving 2 for the queue.
        try:
            for _ in range(2):  # Process up to 2 items per cycle
                result = await self.update_service.process_queued_asset()
                if not result:
                    break  # Queue empty

                if result['status'] == 'success':
                    logger.debug(f"Queue: Updated {result['symbol']}")
                else:
                    logger.debug(f"Queue: Failed {result['symbol']}")

        except Exception as e:
            logger.error(f"Queue processor job failed: {e}")

    async def _enqueue_secondary_job(self):
        """Job: Enqueue secondary assets for update"""
        try:
            await self.update_service.enqueue_secondary_assets()
        except Exception as e:
            logger.error(f"Enqueue secondary job failed: {e}")

    async def stop(self):
        """Stop the scheduler"""
        if self._is_running:
            self.scheduler.shutdown(wait=True)
            self._is_running = False
            logger.info("Scheduler stopped")

    def get_job_status(self) -> list:
        """Get the status of all scheduled jobs"""
        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger)
            }
            for job in self.scheduler.get_jobs()
        ]
```
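Note that the queue processor fires every 15 seconds and may attempt up to 2 calls each time (8/min worst case), so it is the fixed-window limiter inside `AssetUpdateService`, not the schedule itself, that caps total usage at 5 calls/min. A stdlib-only simulation of that limiter logic with an injected clock (a standalone sketch mirroring `_wait_for_rate_limit`, not the production class):

```python
class FixedWindowLimiter:
    """Standalone mirror of AssetUpdateService._wait_for_rate_limit,
    driven by an injected clock so it can be simulated instantly."""
    def __init__(self, rate_limit=5):
        self.rate_limit = rate_limit
        self.calls_this_minute = 0
        self.minute_start = 0.0
        self.waits = 0

    def acquire(self, clock):
        now = clock["t"]
        if now - self.minute_start >= 60:          # new window
            self.calls_this_minute = 0
            self.minute_start = now
        if self.calls_this_minute >= self.rate_limit:
            wait = 60 - (now - self.minute_start)  # block until the window ends
            if wait > 0:
                clock["t"] += wait                 # simulated sleep
                self.waits += 1
            self.calls_this_minute = 0
            self.minute_start = clock["t"]
        self.calls_this_minute += 1

clock = {"t": 0.0}
limiter = FixedWindowLimiter(rate_limit=5)
granted_in_first_minute = 0
# Worst case: 3 priority calls at t=0, then 2 queued calls every 15 s.
for _ in range(3):
    limiter.acquire(clock)
for tick in range(4):                  # queue processor at t = 0, 15, 30, 45
    clock["t"] = max(clock["t"], tick * 15.0)
    for _ in range(2):
        limiter.acquire(clock)
        if clock["t"] < 60:
            granted_in_first_minute += 1

# Only 2 queued calls fit in the first window; the rest are pushed past t=60.
print(granted_in_first_minute)  # 2
```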

---

## 5. Configuration

### 5.1 Environment Variables

```bash
# .env - Data Service Configuration

# Polygon.io API (Massive.com compatible)
POLYGON_API_KEY=<your-polygon-api-key>  # never commit real keys
POLYGON_BASE_URL=https://api.polygon.io
POLYGON_RATE_LIMIT=5
POLYGON_TIER=free

# Batch Configuration
BATCH_INTERVAL_MINUTES=5
PRIORITY_ASSETS_ENABLED=true
QUEUE_MAX_SIZE=1000

# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=trading_data
DB_USER=trading_user
DB_PASSWORD=trading_dev_2025

# Redis (for queue and events)
REDIS_URL=redis://localhost:6379/0
```
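These variables would typically be read with defaults matching the free tier. A minimal loader sketch (the variable names follow the block above; the `load_batch_config` helper itself is illustrative):

```python
import os

def load_batch_config(env=os.environ):
    """Read batch/rate-limit settings with free-tier defaults."""
    return {
        "rate_limit": int(env.get("POLYGON_RATE_LIMIT", "5")),
        "tier": env.get("POLYGON_TIER", "free"),
        "batch_interval_minutes": int(env.get("BATCH_INTERVAL_MINUTES", "5")),
        "priority_assets_enabled": env.get("PRIORITY_ASSETS_ENABLED", "true").lower() == "true",
        "queue_max_size": int(env.get("QUEUE_MAX_SIZE", "1000")),
    }

# Unset variables fall back to the free-tier defaults
cfg = load_batch_config({"BATCH_INTERVAL_MINUTES": "10"})
print(cfg["batch_interval_minutes"], cfg["rate_limit"])  # 10 5
```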

### 5.2 Production Configuration

```yaml
# config/production.yml

batch:
  interval_minutes: 5
  priority_assets:
    - symbol: XAUUSD
      enabled: true
      priority: 1
    - symbol: EURUSD
      enabled: true
      priority: 2
    - symbol: BTCUSD
      enabled: true
      priority: 3

rate_limiting:
  calls_per_minute: 5
  tier: free
  upgrade_url: https://polygon.io/pricing

queue:
  max_size: 1000
  retry_max_attempts: 3
  retry_delay_seconds: 60
```

---

## 6. Migration Plan to a Paid Account

### 6.1 When to Migrate

Migrating to a paid account is recommended when:

- Real-time data is required (< 15 min delay)
- More than 5 assets must be updated per minute
- WebSocket support for streaming is required

### 6.2 Available Plans

| Plan | Rate Limit | Price | Recommended For |
|------|------------|-------|-----------------|
| Free | 5/min | $0 | Development, testing |
| Starter | Unlimited | $47/mo | Basic production |
| Developer | Unlimited | $99/mo | Advanced production |
| Advanced | Unlimited | $199/mo | Real time |

### 6.3 Code Changes for Paid Plans

```python
# config/polygon_config.py

POLYGON_CONFIGS = {
    "free": {
        "rate_limit": 5,
        "delay_minutes": 15,
        "websocket_enabled": False
    },
    "starter": {
        "rate_limit": None,  # Unlimited
        "delay_minutes": 0,
        "websocket_enabled": False
    },
    "developer": {
        "rate_limit": None,
        "delay_minutes": 0,
        "websocket_enabled": True
    }
}
```
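Selecting the active profile then becomes a lookup keyed by `POLYGON_TIER`, with the client-side limiter disabled whenever `rate_limit` is `None`. A sketch using the mapping above (the `limiter_needed` helper is illustrative):

```python
# Mirrors config/polygon_config.py from the block above
POLYGON_CONFIGS = {
    "free": {"rate_limit": 5, "delay_minutes": 15, "websocket_enabled": False},
    "starter": {"rate_limit": None, "delay_minutes": 0, "websocket_enabled": False},
    "developer": {"rate_limit": None, "delay_minutes": 0, "websocket_enabled": True},
}

def limiter_needed(tier: str) -> bool:
    """True when the tier still requires client-side rate limiting."""
    return POLYGON_CONFIGS[tier]["rate_limit"] is not None

print(limiter_needed("free"), limiter_needed("starter"))  # True False
```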

---

## 7. Testing

### 7.1 Unit Tests

```python
# tests/test_priority_queue.py

import pytest
from services.priority_queue_service import PriorityQueueService, AssetPriority

@pytest.mark.asyncio
async def test_priority_ordering():
    queue = PriorityQueueService()

    await queue.enqueue("ETH", "X:ETH", "crypto", AssetPriority.LOW)
    await queue.enqueue("XAU", "C:XAU", "forex", AssetPriority.CRITICAL)
    await queue.enqueue("GBP", "C:GBP", "forex", AssetPriority.MEDIUM)

    # Should dequeue by priority
    item = await queue.dequeue()
    assert item.symbol == "XAU"

    item = await queue.dequeue()
    assert item.symbol == "GBP"

    item = await queue.dequeue()
    assert item.symbol == "ETH"

@pytest.mark.asyncio
async def test_deduplication():
    queue = PriorityQueueService()

    result1 = await queue.enqueue("XAU", "C:XAU", "forex")
    result2 = await queue.enqueue("XAU", "C:XAU", "forex")

    assert result1 is True
    assert result2 is False
    assert queue.size == 1
```

### 7.2 Integration Tests

```python
# tests/test_batch_integration.py

import pytest
from unittest.mock import AsyncMock

from services.asset_update_service import AssetUpdateService

@pytest.mark.asyncio
async def test_priority_batch_respects_rate_limit():
    """Verify the batch job uses at most 3 API calls for priority assets"""
    # Mock the client instance directly; returning None makes each
    # snapshot falsy, so the database and Redis are never touched.
    mock_client = AsyncMock()
    mock_client.get_snapshot_forex = AsyncMock(return_value=None)
    mock_client.get_snapshot_crypto = AsyncMock(return_value=None)

    service = AssetUpdateService(mock_client, None, None, None)
    result = await service.update_priority_assets()

    # Should only make 3 calls (XAU, EURUSD, BTC)
    assert result['api_calls_used'] <= 3
```

---

## 8. Monitoring

### 8.1 Metrics to Track

| Metric | Description | Alert |
|--------|-------------|-------|
| `batch.priority.success_rate` | % of successful updates | < 90% |
| `batch.priority.latency_ms` | Average batch latency | > 30000 |
| `queue.size` | Secondary-asset queue size | > 500 |
| `api.rate_limit_waits` | Times the process waited on the rate limit | > 10/hour |

### 8.2 Health Endpoint

```python
# api/health.py

@router.get("/health/batch")
async def batch_health(scheduler: AssetBatchScheduler):
    jobs = scheduler.get_job_status()
    queue_stats = await scheduler.update_service.queue.get_queue_stats()

    return {
        "status": "healthy",
        "jobs": jobs,
        "queue": queue_stats,
        "last_priority_update": await get_last_priority_update(),
        "api_calls_remaining": await get_api_calls_remaining()
    }
```

---

## 9. Traceability

### 9.1 Related Documents

| Document | Type | Relationship |
|----------|------|--------------|
| [INT-DATA-001-data-service.md](./INT-DATA-001-data-service.md) | Integration | Base |
| [RF-DATA-001-sincronizacion-batch.md](../RF-DATA-001-sincronizacion-batch.md) | Requirement | Implements |
| [ET-DATA-001-arquitectura-batch.md](../ET-DATA-001-arquitectura-batch.md) | Specification | Details |

### 9.2 Code Files

| File | Purpose |
|------|---------|
| `apps/data-service/src/config/priority_assets.py` | Asset configuration |
| `apps/data-service/src/services/priority_queue_service.py` | Priority queue |
| `apps/data-service/src/services/asset_update_service.py` | Update service |
| `apps/data-service/src/services/batch_scheduler.py` | Batch scheduler |

---

## 10. Change History

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 1.0.0 | 2026-01-04 | Orquestador Agent | Initial creation |

---

**Implementation Status:**

| Aspect | Status | Notes |
|--------|--------|-------|
| Documentation | ✅ | This document |
| Technical design | ✅ | Architecture defined |
| Python code | ⏳ | Pending |
| Tests | ⏳ | Pending |
| Integration | ⏳ | Pending |