---
id: "INT-DATA-003"
title: "Asset Update Batch with Prioritization"
type: "Integration Specification"
project: "trading-platform"
version: "1.0.0"
status: "Planned"
priority: "High"
created_date: "2026-01-04"
updated_date: "2026-01-04"
author: "Orquestador Agent"
---
# INT-DATA-003: Batch Asset Update Process with Prioritization
## Metadata
| Field | Value |
|-------|-------|
| **ID** | INT-DATA-003 |
| **Module** | Data Service |
| **Type** | Integration Specification |
| **Version** | 1.0.0 |
| **Status** | Planned |
| **Created** | 2026-01-04 |
| **Last updated** | 2026-01-04 |
| **Author** | Orquestador Agent |
---
## 1. Executive Summary
This document specifies a **batch process that refreshes financial asset data** from the Polygon.io/Massive.com API into the trading-platform PostgreSQL database.
### Key Characteristics:
- **Runs every 5 minutes** - scheduled batch process
- **Asset prioritization** - Gold (XAU), EURUSD and Bitcoin are updated first
- **Rate limiting** - 5 API calls per minute (free tier)
- **Queueing system** - non-priority assets are queued for deferred updates
---
## 2. Requirement Context
### 2.1 Current State
The project already provides:
- `PolygonClient` in `apps/data-service/src/providers/polygon_client.py`
- `DataSyncScheduler` in `apps/data-service/src/services/scheduler.py`
- The `trading.symbols` table as the asset catalog
- Incremental sync infrastructure
### 2.2 Need
The system must:
1. Refresh asset data while respecting the 5 calls/min rate limit
2. Prioritize critical assets (XAU, EURUSD, BTCUSD) in every cycle
3. Queue secondary assets for gradual updates
4. Keep data fresh for the ML Engine
### 2.3 Constraints
| Constraint | Value | Note |
|-------------|-------|------|
| API rate limit | 5 calls/min | Free tier |
| Batch interval | 5 minutes | Configurable |
| Priority assets | 3 | XAU, EURUSD, BTCUSD |
| Max cycle time | 60 seconds | 5 calls * 12s spacing |
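The constraint figures above can be sanity-checked with simple arithmetic. The following sketch is illustrative only (not project code); the numbers come straight from the table:

```python
# Back-of-the-envelope API budget for the free tier.
RATE_LIMIT_PER_MIN = 5    # free-tier cap
BATCH_INTERVAL_MIN = 5    # one batch cycle every 5 minutes
PRIORITY_CALLS = 3        # XAU, EURUSD, BTCUSD

calls_per_cycle = RATE_LIMIT_PER_MIN * BATCH_INTERVAL_MIN  # total budget per cycle
queue_budget = calls_per_cycle - PRIORITY_CALLS            # left over for queued assets
spacing_s = 60 / RATE_LIMIT_PER_MIN                        # min spacing between calls

print(calls_per_cycle, queue_budget, spacing_s)  # 25 22 12.0
```

So each 5-minute cycle has 25 calls of budget: 3 go to the priority assets and up to 22 can drain the secondary queue, at a minimum spacing of 12 seconds per call.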
---
## 3. Solution Architecture
### 3.1 Component Diagram
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ BATCH ASSET UPDATE SYSTEM │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ SCHEDULER LAYER (APScheduler) │ │
│ │ │ │
│ │ ┌─────────────────────┐ ┌─────────────────────────────────────┐ │ │
│ │ │ PriorityBatchJob │ │ QueueProcessorJob │ │ │
│ │ │ Every 5 minutes │ │ Continuous │ │ │
│ │ │ │ │ │ │ │
│ │ │ 1. XAU/USD │ │ Process queued assets │ │ │
│ │ │ 2. EUR/USD │ │ respecting rate limit │ │ │
│ │ │ 3. BTC/USD │ │ │ │ │
│ │ └──────────┬──────────┘ └──────────────┬──────────────────────┘ │ │
│ └──────────────┼───────────────────────────────┼────────────────────────┘ │
│ │ │ │
│ ┌──────────────▼───────────────────────────────▼────────────────────────┐ │
│ │ SERVICE LAYER │ │
│ │ │ │
│ │ ┌─────────────────────┐ ┌─────────────────────────────────────┐ │ │
│ │ │ PriorityQueueService│ │ AssetUpdateService │ │ │
│ │ │ │ │ │ │ │
│ │ │ • Priority Queue │ │ • Fetch from API │ │ │
│ │ │ • FIFO for others │ │ • Update database │ │ │
│ │ │ • Deduplication │ │ • Emit events │ │ │
│ │ └──────────┬──────────┘ └──────────────┬──────────────────────┘ │ │
│ └──────────────┼───────────────────────────────┼────────────────────────┘ │
│ │ │ │
│ ┌──────────────▼───────────────────────────────▼────────────────────────┐ │
│ │ PROVIDER LAYER │ │
│ │ │ │
│ │ ┌─────────────────────┐ ┌─────────────────────────────────────┐ │ │
│ │ │ RateLimiter │────▶│ PolygonClient │ │ │
│ │ │ 5 req/min │ │ (existing) │ │ │
│ │ └─────────────────────┘ └─────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │ │
└────────────────────────────────────┼────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ EXTERNAL SYSTEMS │
│ │
│ ┌────────────────────────┐ ┌───────────────────────────────────────┐ │
│ │ Polygon.io API │ │ PostgreSQL │ │
│ │ api.polygon.io │ │ trading.symbols │ │
│ │ │ │ market_data.ohlcv_5m │ │
│ │ Rate: 5 calls/min │ │ data_sources.sync_status │ │
│ └────────────────────────┘ └───────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```
### 3.2 Execution Flow
```
Cada 5 minutos
┌──────────────────────┐
│ Batch Job Trigger │
└──────────┬───────────┘
┌─────────────┴─────────────┐
│ │
▼ ▼
┌───────────────────────┐ ┌───────────────────────┐
│ Priority Assets │ │ Enqueue Non-Priority │
│ XAU, EURUSD, BTCUSD │ │ to Update Queue │
└───────────┬───────────┘ └───────────┬───────────┘
│ │
▼ ▼
┌───────────────────────┐ ┌───────────────────────┐
│ Rate Limited Fetch │ │ Background Processor │
│ (3 calls, 12s each) │ │ (2 calls remaining) │
└───────────┬───────────┘ └───────────┬───────────┘
│ │
▼ ▼
┌───────────────────────┐ ┌───────────────────────┐
│ Update Database │ │ Process Queue Items │
│ trading.symbols │ │ (gradual sync) │
└───────────┬───────────┘ └───────────┬───────────┘
│ │
└─────────────┬─────────────┘
┌───────────────────────┐
│ Emit Update Events │
│ via Redis Pub/Sub │
└───────────────────────┘
```
---
## 4. Technical Specification
### 4.1 Priority Asset Configuration
```python
# config/priority_assets.py
PRIORITY_ASSETS = [
    {
        "symbol": "XAUUSD",
        "polygon_ticker": "C:XAUUSD",
        "asset_type": "forex",  # Gold traded as a commodity via the forex feed
        "name": "Gold Spot",
        "priority": 1,
        "update_frequency_seconds": 300  # 5 min
    },
    {
        "symbol": "EURUSD",
        "polygon_ticker": "C:EURUSD",
        "asset_type": "forex",
        "name": "Euro/US Dollar",
        "priority": 2,
        "update_frequency_seconds": 300
    },
    {
        "symbol": "BTCUSD",
        "polygon_ticker": "X:BTCUSD",
        "asset_type": "crypto",
        "name": "Bitcoin/US Dollar",
        "priority": 3,
        "update_frequency_seconds": 300
    }
]

# Secondary assets (enqueued for deferred updates)
SECONDARY_ASSETS = [
    {"symbol": "ETHUSDT", "polygon_ticker": "X:ETHUSDT", "asset_type": "crypto"},
    {"symbol": "GBPUSD", "polygon_ticker": "C:GBPUSD", "asset_type": "forex"},
    {"symbol": "USDJPY", "polygon_ticker": "C:USDJPY", "asset_type": "forex"},
    {"symbol": "XAGUSD", "polygon_ticker": "C:XAGUSD", "asset_type": "forex"},  # Silver
    # ... more assets
]
```
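A small consistency check over this configuration can catch typos before they cost API calls. The helper below is a hypothetical sketch (not part of the repo); it validates the ticker-prefix convention visible above (`C:` for forex, `X:` for crypto) and that priorities are unique:

```python
# Hypothetical sanity check for priority_assets.py (illustrative, not project code).
PREFIX_BY_TYPE = {"forex": "C:", "crypto": "X:"}

def validate_assets(assets: list[dict]) -> list[str]:
    """Return human-readable problems found in an asset config list."""
    problems = []
    for a in assets:
        prefix = PREFIX_BY_TYPE.get(a["asset_type"])
        if prefix is None:
            problems.append(f"{a['symbol']}: unknown asset_type {a['asset_type']!r}")
        elif not a["polygon_ticker"].startswith(prefix):
            problems.append(f"{a['symbol']}: ticker should start with {prefix!r}")
    priorities = [a["priority"] for a in assets if "priority" in a]
    if len(priorities) != len(set(priorities)):
        problems.append("duplicate priority values")
    return problems

sample = [
    {"symbol": "XAUUSD", "polygon_ticker": "C:XAUUSD", "asset_type": "forex", "priority": 1},
    {"symbol": "BTCUSD", "polygon_ticker": "X:BTCUSD", "asset_type": "crypto", "priority": 3},
]
print(validate_assets(sample))  # [] - no problems found
```

Such a check could run at service startup so a bad entry fails fast instead of silently burning the rate budget.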
### 4.2 Priority Queue Service
```python
# services/priority_queue_service.py
import asyncio
import heapq
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class AssetPriority(Enum):
    CRITICAL = 1  # XAU, EURUSD, BTC
    HIGH = 2      # Major forex pairs
    MEDIUM = 3    # Other crypto
    LOW = 4       # Indices, commodities


@dataclass(order=True)
class QueuedAsset:
    """Asset in the update queue; only `priority` takes part in ordering."""
    priority: int
    enqueued_at: datetime = field(compare=False)
    symbol: str = field(compare=False)
    polygon_ticker: str = field(compare=False)
    asset_type: str = field(compare=False)
    retry_count: int = field(default=0, compare=False)


class PriorityQueueService:
    """
    Manages the priority queue for asset updates.

    Priority levels:
        1 - Critical (XAU, EURUSD, BTC) - always updated first
        2 - High - updated when slots are available
        3 - Medium - gradual updates
        4 - Low - best effort
    """

    def __init__(self, max_queue_size: int = 1000):
        self._queue: List[QueuedAsset] = []
        self._in_queue: set = set()
        self.max_queue_size = max_queue_size
        self._lock = asyncio.Lock()

    async def enqueue(
        self,
        symbol: str,
        polygon_ticker: str,
        asset_type: str,
        priority: AssetPriority = AssetPriority.MEDIUM
    ) -> bool:
        """Add an asset to the update queue if not already present"""
        async with self._lock:
            if symbol in self._in_queue:
                logger.debug(f"Asset {symbol} already in queue, skipping")
                return False
            if len(self._queue) >= self.max_queue_size:
                logger.warning(f"Queue full, dropping {symbol}")
                return False
            item = QueuedAsset(
                priority=priority.value,
                enqueued_at=datetime.utcnow(),
                symbol=symbol,
                polygon_ticker=polygon_ticker,
                asset_type=asset_type
            )
            heapq.heappush(self._queue, item)
            self._in_queue.add(symbol)
            logger.debug(f"Enqueued {symbol} with priority {priority.name}")
            return True

    async def dequeue(self) -> Optional[QueuedAsset]:
        """Get the next asset to update (highest priority first)"""
        async with self._lock:
            if not self._queue:
                return None
            item = heapq.heappop(self._queue)
            self._in_queue.discard(item.symbol)
            return item

    async def peek(self) -> Optional[QueuedAsset]:
        """View the next item without removing it"""
        async with self._lock:
            return self._queue[0] if self._queue else None

    @property
    def size(self) -> int:
        return len(self._queue)

    @property
    def is_empty(self) -> bool:
        return len(self._queue) == 0

    async def get_queue_stats(self) -> Dict:
        """Get queue statistics"""
        async with self._lock:
            priority_counts = {p.name: 0 for p in AssetPriority}
            for item in self._queue:
                priority_counts[AssetPriority(item.priority).name] += 1
            return {
                "total_items": len(self._queue),
                "by_priority": priority_counts,
                # The heap root is the highest-priority item, not necessarily
                # the oldest one, so scan for the true minimum enqueue time.
                "oldest_item_age_seconds": (
                    (datetime.utcnow()
                     - min(i.enqueued_at for i in self._queue)).total_seconds()
                    if self._queue else 0
                )
            }
```
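The `@dataclass(order=True)` pattern above makes `heapq` order items by `priority` alone, because every other field is declared with `compare=False`. A minimal standalone illustration of that mechanism:

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class Item:
    priority: int
    symbol: str = field(compare=False)  # excluded from ordering

heap = []
for prio, sym in [(4, "ETH"), (1, "XAU"), (3, "GBP")]:
    heapq.heappush(heap, Item(prio, sym))

# Pops come back lowest priority-number first, regardless of push order.
order = [heapq.heappop(heap).symbol for _ in range(len(heap))]
print(order)  # ['XAU', 'GBP', 'ETH']
```

Note that with ties on `priority`, `heapq` keeps heap order but makes no FIFO guarantee; if strict FIFO within a priority level matters, a monotonically increasing sequence number would need to be added as a second comparable field.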
### 4.3 Asset Update Service
```python
# services/asset_update_service.py
import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from providers.polygon_client import PolygonClient, AssetType, TickerSnapshot
from config.priority_assets import PRIORITY_ASSETS, SECONDARY_ASSETS
from services.priority_queue_service import PriorityQueueService, AssetPriority

logger = logging.getLogger(__name__)


class AssetUpdateService:
    """
    Service for updating asset data from the Polygon API.

    Handles:
    - Rate limiting (5 calls/min)
    - Priority-based updates
    - Database synchronization
    - Error handling and retries
    """

    def __init__(
        self,
        polygon_client: PolygonClient,
        db_pool,
        redis_client,
        priority_queue: PriorityQueueService
    ):
        self.polygon = polygon_client
        self.db = db_pool
        self.redis = redis_client
        self.queue = priority_queue
        # Rate limiting state
        self._calls_this_minute = 0
        self._minute_start = datetime.utcnow()
        self._rate_limit = 5

    async def _wait_for_rate_limit(self):
        """Wait if the rate limit has been reached"""
        now = datetime.utcnow()
        # Reset the counter each minute
        if (now - self._minute_start).total_seconds() >= 60:
            self._calls_this_minute = 0
            self._minute_start = now
        # Wait if the limit has been reached
        if self._calls_this_minute >= self._rate_limit:
            wait_time = 60 - (now - self._minute_start).total_seconds()
            if wait_time > 0:
                logger.info(f"Rate limit reached, waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
            self._calls_this_minute = 0
            self._minute_start = datetime.utcnow()
        self._calls_this_minute += 1

    async def update_priority_assets(self) -> Dict[str, Any]:
        """
        Update all priority assets (XAU, EURUSD, BTC).
        Uses 3 of the 5 available API calls.
        """
        results = {
            "updated": [],
            "failed": [],
            "api_calls_used": 0
        }
        for asset in PRIORITY_ASSETS:
            try:
                await self._wait_for_rate_limit()
                # Fetch snapshot from Polygon
                snapshot = await self._fetch_asset_snapshot(
                    asset["polygon_ticker"],
                    asset["asset_type"]
                )
                if snapshot:
                    # Update database
                    await self._update_asset_in_db(asset, snapshot)
                    # Publish update event
                    await self._publish_update_event(asset["symbol"], snapshot)
                    results["updated"].append(asset["symbol"])
                else:
                    results["failed"].append({
                        "symbol": asset["symbol"],
                        "error": "No data returned"
                    })
                results["api_calls_used"] += 1
            except Exception as e:
                logger.error(f"Error updating {asset['symbol']}: {e}")
                results["failed"].append({
                    "symbol": asset["symbol"],
                    "error": str(e)
                })
        return results

    async def _fetch_asset_snapshot(
        self,
        polygon_ticker: str,
        asset_type: str
    ) -> Optional[TickerSnapshot]:
        """Fetch the current snapshot from the Polygon API"""
        try:
            if asset_type == "forex":
                return await self.polygon.get_snapshot_forex(polygon_ticker)
            elif asset_type == "crypto":
                return await self.polygon.get_snapshot_crypto(polygon_ticker)
            else:
                logger.warning(f"Unknown asset type: {asset_type}")
                return None
        except Exception as e:
            logger.error(f"Error fetching {polygon_ticker}: {e}")
            return None

    async def _update_asset_in_db(
        self,
        asset: Dict,
        snapshot: TickerSnapshot
    ):
        """Update asset data in PostgreSQL"""
        async with self.db.acquire() as conn:
            # Update trading.symbols with the latest price info
            await conn.execute("""
                UPDATE trading.symbols
                SET
                    updated_at = NOW(),
                    -- Store latest prices in the JSONB metadata column
                    metadata = jsonb_set(
                        COALESCE(metadata, '{}'),
                        '{last_update}',
                        $2::jsonb
                    )
                WHERE symbol = $1
            """, asset["symbol"], json.dumps({  # asyncpg expects a JSON string here, not a dict
                "bid": snapshot.bid,
                "ask": snapshot.ask,
                "last_price": snapshot.last_price,
                "daily_open": snapshot.daily_open,
                "daily_high": snapshot.daily_high,
                "daily_low": snapshot.daily_low,
                "daily_close": snapshot.daily_close,
                "timestamp": snapshot.timestamp.isoformat()
            }))
            # Update sync status
            await conn.execute("""
                INSERT INTO data_sources.data_sync_status
                    (ticker_id, provider_id, last_sync_timestamp, sync_status, updated_at)
                SELECT
                    s.id,
                    (SELECT id FROM data_sources.api_providers WHERE code = 'polygon'),
                    NOW(),
                    'success',
                    NOW()
                FROM trading.symbols s
                WHERE s.symbol = $1
                ON CONFLICT (ticker_id, provider_id)
                DO UPDATE SET
                    last_sync_timestamp = NOW(),
                    sync_status = 'success',
                    updated_at = NOW()
            """, asset["symbol"])

    async def _publish_update_event(
        self,
        symbol: str,
        snapshot: TickerSnapshot
    ):
        """Publish an update event via Redis for real-time consumers"""
        channel = f"asset:update:{symbol}"
        message = {
            "symbol": symbol,
            "bid": snapshot.bid,
            "ask": snapshot.ask,
            "last_price": snapshot.last_price,
            "timestamp": snapshot.timestamp.isoformat()
        }
        # Serialize as JSON rather than str() so consumers can parse the payload
        await self.redis.publish(channel, json.dumps(message))

    async def process_queued_asset(self) -> Optional[Dict]:
        """
        Process the next asset from the queue.
        Uses the remaining API calls (2 of 5 per cycle).
        """
        item = await self.queue.dequeue()
        if not item:
            return None
        try:
            await self._wait_for_rate_limit()
            snapshot = await self._fetch_asset_snapshot(
                item.polygon_ticker,
                item.asset_type
            )
            if snapshot:
                asset = {
                    "symbol": item.symbol,
                    "polygon_ticker": item.polygon_ticker,
                    "asset_type": item.asset_type
                }
                await self._update_asset_in_db(asset, snapshot)
                await self._publish_update_event(item.symbol, snapshot)
                return {"symbol": item.symbol, "status": "success"}
            else:
                # Re-enqueue with lower priority on failure, up to 3 retries
                requeued = False
                if item.retry_count < 3:
                    item.retry_count += 1
                    await self.queue.enqueue(
                        item.symbol,
                        item.polygon_ticker,
                        item.asset_type,
                        AssetPriority.LOW
                    )
                    requeued = True
                return {"symbol": item.symbol, "status": "failed", "requeued": requeued}
        except Exception as e:
            logger.error(f"Error processing queued asset {item.symbol}: {e}")
            return {"symbol": item.symbol, "status": "error", "error": str(e)}

    async def enqueue_secondary_assets(self):
        """Enqueue all secondary assets for gradual updates"""
        for asset in SECONDARY_ASSETS:
            await self.queue.enqueue(
                symbol=asset["symbol"],
                polygon_ticker=asset["polygon_ticker"],
                asset_type=asset["asset_type"],
                priority=AssetPriority.MEDIUM
            )
        logger.info(f"Enqueued {len(SECONDARY_ASSETS)} secondary assets")
```
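The `_wait_for_rate_limit` logic above is a fixed-window counter. Its behavior is easiest to see with an injected clock; the following synchronous model is a simplified sketch of the same algorithm (not the service code itself), which makes the window reset and forced wait deterministic:

```python
class FixedWindowLimiter:
    """Synchronous model of the fixed-window counter used by the service."""

    def __init__(self, limit: int = 5):
        self.limit = limit
        self.count = 0
        self.window_start = 0.0

    def acquire(self, now: float) -> float:
        """Register a call at time `now` (seconds); return how long the caller must wait."""
        # A new minute resets the window
        if now - self.window_start >= 60:
            self.count, self.window_start = 0, now
        wait = 0.0
        if self.count >= self.limit:
            # Window exhausted: wait for the remainder of the minute
            wait = max(0.0, 60 - (now - self.window_start))
            self.count, self.window_start = 0, now + wait
        self.count += 1
        return wait

lim = FixedWindowLimiter(limit=5)
waits = [lim.acquire(t) for t in [0, 1, 2, 3, 4, 5]]  # 6th call exceeds the window
print(waits)  # [0.0, 0.0, 0.0, 0.0, 0.0, 55.0]
```

A fixed window is simple but allows bursts of up to 2x the limit across a window boundary; a sliding-window or token-bucket limiter would smooth that out if the API provider enforces the limit more strictly.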
### 4.4 Scheduled Batch Job
```python
# services/batch_scheduler.py
import logging
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

logger = logging.getLogger(__name__)


class AssetBatchScheduler:
    """
    Scheduler for batch asset updates.

    Schedule:
    - Every 5 minutes: update priority assets (XAU, EURUSD, BTC)
    - Continuous: process queued secondary assets
    """

    def __init__(
        self,
        asset_update_service,
        batch_interval_minutes: int = 5
    ):
        self.update_service = asset_update_service
        self.batch_interval = batch_interval_minutes
        self.scheduler = AsyncIOScheduler()
        self._is_running = False

    async def start(self):
        """Start the batch scheduler"""
        if self._is_running:
            logger.warning("Scheduler already running")
            return
        logger.info("Starting Asset Batch Scheduler")
        # Priority assets job - every 5 minutes
        self.scheduler.add_job(
            self._priority_batch_job,
            trigger=IntervalTrigger(minutes=self.batch_interval),
            id="priority_assets_batch",
            name="Update Priority Assets (XAU, EURUSD, BTC)",
            replace_existing=True,
            max_instances=1,
            next_run_time=datetime.now()  # Run immediately on start
        )
        # Queue processor job - every 15 seconds
        self.scheduler.add_job(
            self._queue_processor_job,
            trigger=IntervalTrigger(seconds=15),
            id="queue_processor",
            name="Process Queued Secondary Assets",
            replace_existing=True,
            max_instances=1
        )
        # Enqueue secondary assets job - every 30 minutes
        self.scheduler.add_job(
            self._enqueue_secondary_job,
            trigger=IntervalTrigger(minutes=30),
            id="enqueue_secondary",
            name="Enqueue Secondary Assets",
            replace_existing=True,
            max_instances=1
        )
        self.scheduler.start()
        self._is_running = True
        logger.info(f"Scheduler started with {len(self.scheduler.get_jobs())} jobs")

    async def _priority_batch_job(self):
        """Job: update priority assets"""
        logger.info("=== Priority Batch Job Started ===")
        start_time = datetime.utcnow()
        try:
            result = await self.update_service.update_priority_assets()
            elapsed = (datetime.utcnow() - start_time).total_seconds()
            logger.info(
                f"Priority batch completed in {elapsed:.1f}s: "
                f"{len(result['updated'])} updated, "
                f"{len(result['failed'])} failed, "
                f"{result['api_calls_used']} API calls"
            )
            # Log details
            for symbol in result['updated']:
                logger.debug(f"  [OK] {symbol}")
            for fail in result['failed']:
                logger.warning(f"  [FAIL] {fail['symbol']}: {fail['error']}")
        except Exception as e:
            logger.error(f"Priority batch job failed: {e}", exc_info=True)

    async def _queue_processor_job(self):
        """Job: process queued secondary assets"""
        # Only process while API calls remain: the priority batch uses 3 of
        # the 5 calls, leaving 2 per minute for the queue (the shared rate
        # limiter in AssetUpdateService enforces the cap either way).
        try:
            for _ in range(2):  # Process up to 2 items per cycle
                result = await self.update_service.process_queued_asset()
                if not result:
                    break  # Queue empty
                if result['status'] == 'success':
                    logger.debug(f"Queue: Updated {result['symbol']}")
                else:
                    logger.debug(f"Queue: Failed {result['symbol']}")
        except Exception as e:
            logger.error(f"Queue processor job failed: {e}")

    async def _enqueue_secondary_job(self):
        """Job: enqueue secondary assets for update"""
        try:
            await self.update_service.enqueue_secondary_assets()
        except Exception as e:
            logger.error(f"Enqueue secondary job failed: {e}")

    async def stop(self):
        """Stop the scheduler"""
        if self._is_running:
            self.scheduler.shutdown(wait=True)
            self._is_running = False
            logger.info("Scheduler stopped")

    def get_job_status(self) -> list:
        """Get the status of all scheduled jobs"""
        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger)
            }
            for job in self.scheduler.get_jobs()
        ]
```
---
## 5. Configuration
### 5.1 Environment Variables
```bash
# .env - Data Service configuration

# Polygon.io API (Massive.com compatible)
POLYGON_API_KEY=<your-polygon-api-key>   # never commit a real key
POLYGON_BASE_URL=https://api.polygon.io
POLYGON_RATE_LIMIT=5
POLYGON_TIER=free

# Batch configuration
BATCH_INTERVAL_MINUTES=5
PRIORITY_ASSETS_ENABLED=true
QUEUE_MAX_SIZE=1000

# Database
DB_HOST=localhost
DB_PORT=5432
DB_NAME=trading_data
DB_USER=trading_user
DB_PASSWORD=<db-password>

# Redis (queue and events)
REDIS_URL=redis://localhost:6379/0
```
### 5.2 Production Configuration
```yaml
# config/production.yml
batch:
  interval_minutes: 5
  priority_assets:
    - symbol: XAUUSD
      enabled: true
      priority: 1
    - symbol: EURUSD
      enabled: true
      priority: 2
    - symbol: BTCUSD
      enabled: true
      priority: 3

rate_limiting:
  calls_per_minute: 5
  tier: free
  upgrade_url: https://polygon.io/pricing

queue:
  max_size: 1000
  retry_max_attempts: 3
  retry_delay_seconds: 60
```
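The service code in section 4.3 re-enqueues failed items immediately at LOW priority; `retry_delay_seconds` and `retry_max_attempts` could alternatively drive a timed backoff schedule. The sketch below shows one possible (assumed, linear) interpretation of those two settings; it is not how the code above currently behaves:

```python
def retry_delays(max_attempts: int, base_delay_s: int) -> list[int]:
    """Linear backoff: delay (seconds) to wait before each retry attempt."""
    return [base_delay_s * attempt for attempt in range(1, max_attempts + 1)]

# With the production config values (3 attempts, 60s base delay):
print(retry_delays(3, 60))  # [60, 120, 180]
```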
---
## 6. Paid Plan Migration Path
### 6.1 When to Migrate
Upgrading to a paid plan is recommended when:
- Real-time data is required (< 15 min delay)
- More than 5 assets must be refreshed per minute
- WebSocket streaming support is needed
### 6.2 Available Plans
| Plan | Rate Limit | Price | Recommended For |
|------|------------|--------|------------------|
| Free | 5/min | $0 | Development, testing |
| Starter | Unlimited | $47/mo | Basic production |
| Developer | Unlimited | $99/mo | Advanced production |
| Advanced | Unlimited | $199/mo | Real time |
### 6.3 Code Changes for Paid Plans
```python
# config/polygon_config.py
POLYGON_CONFIGS = {
    "free": {
        "rate_limit": 5,
        "delay_minutes": 15,
        "websocket_enabled": False
    },
    "starter": {
        "rate_limit": None,  # Unlimited
        "delay_minutes": 0,
        "websocket_enabled": False
    },
    "developer": {
        "rate_limit": None,
        "delay_minutes": 0,
        "websocket_enabled": True
    }
}
```
---
## 7. Testing
### 7.1 Unit Tests
```python
# tests/test_priority_queue.py
import pytest

from services.priority_queue_service import PriorityQueueService, AssetPriority


@pytest.mark.asyncio
async def test_priority_ordering():
    queue = PriorityQueueService()
    await queue.enqueue("ETH", "X:ETH", "crypto", AssetPriority.LOW)
    await queue.enqueue("XAU", "C:XAU", "forex", AssetPriority.CRITICAL)
    await queue.enqueue("GBP", "C:GBP", "forex", AssetPriority.MEDIUM)
    # Items should dequeue in priority order
    item = await queue.dequeue()
    assert item.symbol == "XAU"
    item = await queue.dequeue()
    assert item.symbol == "GBP"
    item = await queue.dequeue()
    assert item.symbol == "ETH"


@pytest.mark.asyncio
async def test_deduplication():
    queue = PriorityQueueService()
    result1 = await queue.enqueue("XAU", "C:XAU", "forex")
    result2 = await queue.enqueue("XAU", "C:XAU", "forex")
    assert result1 is True
    assert result2 is False
    assert queue.size == 1
```
### 7.2 Integration Tests
```python
# tests/test_batch_integration.py
import pytest
from unittest.mock import AsyncMock

from services.asset_update_service import AssetUpdateService


@pytest.mark.asyncio
async def test_priority_batch_respects_rate_limit():
    """Verify the batch job uses at most 3 API calls for priority assets"""
    mock_client = AsyncMock()
    # Returning None keeps the test away from the (absent) DB/Redis dependencies
    mock_client.get_snapshot_forex = AsyncMock(return_value=None)
    mock_client.get_snapshot_crypto = AsyncMock(return_value=None)
    service = AssetUpdateService(mock_client, None, None, None)
    result = await service.update_priority_assets()
    # Only 3 calls should be made (XAU, EURUSD, BTC)
    assert result['api_calls_used'] <= 3
```
---
## 8. Monitoring
### 8.1 Metrics to Track
| Metric | Description | Alert |
|---------|-------------|--------|
| `batch.priority.success_rate` | % of successful updates | < 90% |
| `batch.priority.latency_ms` | Average batch latency | > 30000 |
| `queue.size` | Secondary asset queue size | > 500 |
| `api.rate_limit_waits` | Times the limiter forced a wait | > 10/hour |
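As a concrete example of the first metric, `batch.priority.success_rate` can be derived directly from the `updated`/`failed` lists returned by `update_priority_assets()`. A minimal sketch (the empty-window-counts-as-healthy choice is an assumption, not specified above):

```python
def priority_success_rate(result: dict) -> float:
    """Compute batch.priority.success_rate from an update_priority_assets() result."""
    total = len(result["updated"]) + len(result["failed"])
    return 1.0 if total == 0 else len(result["updated"]) / total

sample = {"updated": ["XAUUSD", "EURUSD"], "failed": [{"symbol": "BTCUSD"}]}
rate = priority_success_rate(sample)
print(round(rate, 3), rate < 0.90)  # 0.667 True -> would trigger the < 90% alert
```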
### 8.2 Health Endpoint
```python
# api/health.py
@router.get("/health/batch")
async def batch_health(scheduler: AssetBatchScheduler):
    jobs = scheduler.get_job_status()
    queue_stats = await scheduler.update_service.queue.get_queue_stats()
    return {
        "status": "healthy",
        "jobs": jobs,
        "queue": queue_stats,
        "last_priority_update": await get_last_priority_update(),
        "api_calls_remaining": await get_api_calls_remaining()
    }
```
---
## 9. Traceability
### 9.1 Related Documents
| Document | Type | Relation |
|-----------|------|----------|
| [INT-DATA-001-data-service.md](./INT-DATA-001-data-service.md) | Integration | Base |
| [RF-DATA-001-sincronizacion-batch.md](../RF-DATA-001-sincronizacion-batch.md) | Requirement | Implements |
| [ET-DATA-001-arquitectura-batch.md](../ET-DATA-001-arquitectura-batch.md) | Specification | Details |
### 9.2 Code Files
| File | Purpose |
|---------|-----------|
| `apps/data-service/src/config/priority_assets.py` | Asset configuration |
| `apps/data-service/src/services/priority_queue_service.py` | Priority queue |
| `apps/data-service/src/services/asset_update_service.py` | Update service |
| `apps/data-service/src/services/batch_scheduler.py` | Batch scheduler |
---
## 10. Change History
| Version | Date | Author | Changes |
|---------|-------|-------|---------|
| 1.0.0 | 2026-01-04 | Orquestador Agent | Initial version |
---
**Implementation Status:**
| Aspect | Status | Notes |
|---------|--------|-------|
| Documentation | ✅ | This document |
| Technical design | ✅ | Architecture defined |
| Python code | ⏳ | Pending |
| Tests | ⏳ | Pending |
| Integration | ⏳ | Pending |