---
id: "ET-DATA-001"
title: "Batch Process Architecture with Prioritization"
type: "Technical Specification"
status: "To Do"
priority: "High"
epic: "Transversal"
project: "trading-platform"
version: "1.0.0"
created_date: "2026-01-04"
updated_date: "2026-01-04"
---

# ET-DATA-001: Batch Process Architecture with Prioritization

## Metadata

| Field | Value |
|-------|-------|
| **ID** | ET-DATA-001 |
| **Module** | Data Service |
| **Type** | Technical Specification |
| **Version** | 1.0.0 |
| **Status** | Planned |
| **Implements** | RF-DATA-001 |
| **Last updated** | 2026-01-04 |

---

## 1. Purpose

Specify the technical architecture and implementation design of the batch asset-update process with a prioritization and queueing system, including sequence diagrams, code structure, and the design patterns used.

---

## 2. System Architecture

### 2.1 Component View

```
DATA SERVICE (apps/data-service)

src/
├── api/
│   ├── routes.py               # Existing routes
│   └── batch_routes.py         # [NEW] Batch endpoints
│
├── config/
│   ├── settings.py             # Existing configuration
│   └── priority_assets.py      # [NEW] Priority configuration
│
├── providers/
│   ├── polygon_client.py       # Polygon client (existing)
│   └── rate_limiter.py         # [NEW] Improved rate limiter
│
├── services/
│   ├── sync_service.py         # Existing sync
│   ├── scheduler.py            # Existing scheduler
│   ├── priority_queue.py       # [NEW] Priority queue
│   ├── asset_updater.py        # [NEW] Update service
│   └── batch_orchestrator.py   # [NEW] Batch orchestrator
│
├── models/
│   ├── market.py               # Existing models
│   └── batch.py                # [NEW] Batch models
│
└── main.py                     # Entry point (to modify)
```

### 2.2 Class Diagram

```
┌─────────────────────────┐      ┌─────────────────────────┐
│    BatchOrchestrator    │      │      PriorityQueue      │
├─────────────────────────┤      ├─────────────────────────┤
│ - asset_updater         │─────▶│ - _heap: List           │
│ - priority_queue        │      │ - _in_queue: Set        │
│ - scheduler             │      │ - max_size: int         │
├─────────────────────────┤      ├─────────────────────────┤
│ + start()               │      │ + enqueue()             │
│ + stop()                │      │ + dequeue()             │
│ + run_manual_batch()    │      │ + peek()                │
│ - _process_queue()      │      │ + get_stats()           │
└───────────┬─────────────┘      └─────────────────────────┘
            │
            │ uses
            ▼
┌─────────────────────────┐      ┌─────────────────────────┐
│      AssetUpdater       │      │       RateLimiter       │
├─────────────────────────┤      ├─────────────────────────┤
│ - polygon_client        │─────▶│ - limit: int            │
│ - rate_limiter          │      │ - window_seconds: int   │
│ - db_pool               │      │ - _calls: int           │
│ - redis_client          │      │ - _window_start: dt     │
├─────────────────────────┤      ├─────────────────────────┤
│ + update_asset()        │      │ + acquire()             │
│ - _update_database()    │      │ + get_remaining()       │
│ - _publish_update()     │      │ + get_stats()           │
└───────────┬─────────────┘      └─────────────────────────┘
            │
            │ uses
            ▼
┌─────────────────────────┐
│      PolygonClient      │
├─────────────────────────┤
│ (existing)              │
│ - api_key               │
│ - base_url              │
├─────────────────────────┤
│ + get_snapshot_forex()  │
│ + get_snapshot_crypto() │
│ + get_universal()       │
└─────────────────────────┘
```

---

## 3. Detailed Design

### 3.1 Data Models

```python
# src/models/batch.py

from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any

from pydantic import BaseModel


class AssetPriority(Enum):
    """Priority levels for asset updates"""
    CRITICAL = 1   # XAU, EURUSD, BTC - always first
    HIGH = 2       # Major pairs
    MEDIUM = 3     # Secondary crypto
    LOW = 4        # Indices, minor pairs


class SyncStatus(Enum):
    """Status of an asset sync"""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"
    STALE = "stale"  # No update in 15+ min


@dataclass(order=True)
class QueuedAsset:
    """Asset waiting in the update queue (ordered by priority only)"""
    priority: int
    enqueued_at: datetime = field(compare=False)
    symbol: str = field(compare=False)
    polygon_ticker: str = field(compare=False)
    asset_type: str = field(compare=False)
    retry_count: int = field(default=0, compare=False)
    last_error: Optional[str] = field(default=None, compare=False)


@dataclass
class AssetSnapshot:
    """Snapshot data from the API"""
    symbol: str
    bid: float
    ask: float
    spread: float
    last_price: float
    daily_open: Optional[float]
    daily_high: Optional[float]
    daily_low: Optional[float]
    daily_close: Optional[float]
    daily_volume: Optional[float]
    timestamp: datetime
    source: str = "polygon"


@dataclass
class BatchResult:
    """Result of a batch execution"""
    batch_id: str
    started_at: datetime
    completed_at: datetime
    duration_ms: int
    priority_updated: List[str]
    priority_failed: List[Dict[str, str]]
    queue_processed: int
    queue_remaining: int
    api_calls_used: int
    rate_limit_waits: int
    errors: List[Dict[str, Any]]


# Pydantic models for API responses
class BatchStatusResponse(BaseModel):
    """API response for batch status"""
    batch_id: str
    status: str
    last_run: Optional[datetime]
    next_run: Optional[datetime]
    priority_assets: List[str]
    queue_size: int
    api_calls_remaining: int


class BatchRunResponse(BaseModel):
    """API response for a manual batch run"""
    success: bool
    batch_id: str
    result: Dict[str, Any]
    message: str
```
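
The `QueuedAsset` ordering deserves a note: with `@dataclass(order=True)` and every field except `priority` marked `compare=False`, heap order is driven purely by the numeric priority value. A minimal, self-contained sketch of that behavior (using a simplified stand-in class, not the real `QueuedAsset`):

```python
import heapq
from dataclasses import dataclass, field

# Simplified stand-in for QueuedAsset: only `priority` takes part in
# comparisons, so heapq orders items strictly by priority value.
@dataclass(order=True)
class Item:
    priority: int
    symbol: str = field(compare=False)

heap = []
for prio, sym in [(3, "AUDUSD"), (1, "XAUUSD"), (2, "GBPUSD")]:
    heapq.heappush(heap, Item(priority=prio, symbol=sym))

order = [heapq.heappop(heap).symbol for _ in range(3)]
print(order)  # ['XAUUSD', 'GBPUSD', 'AUDUSD']
```

Ties on priority compare equal, which is all `heapq` needs; no secondary key is required.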

### 3.2 Priority Assets Configuration

```python
# src/config/priority_assets.py

from typing import List, Dict

from models.batch import AssetPriority


class PriorityAssetsConfig:
    """Configuration for priority assets"""

    # Critical assets - updated on every batch cycle
    PRIORITY_ASSETS: List[Dict] = [
        {
            "symbol": "XAUUSD",
            "polygon_ticker": "C:XAUUSD",
            "asset_type": "forex",
            "name": "Gold Spot / US Dollar",
            "priority": AssetPriority.CRITICAL,
            "base_asset": "XAU",
            "quote_asset": "USD",
            "price_precision": 2,
        },
        {
            "symbol": "EURUSD",
            "polygon_ticker": "C:EURUSD",
            "asset_type": "forex",
            "name": "Euro / US Dollar",
            "priority": AssetPriority.CRITICAL,
            "base_asset": "EUR",
            "quote_asset": "USD",
            "price_precision": 5,
        },
        {
            "symbol": "BTCUSD",
            "polygon_ticker": "X:BTCUSD",
            "asset_type": "crypto",
            "name": "Bitcoin / US Dollar",
            "priority": AssetPriority.CRITICAL,
            "base_asset": "BTC",
            "quote_asset": "USD",
            "price_precision": 2,
        },
    ]

    # Secondary assets - queued for gradual update
    SECONDARY_ASSETS: List[Dict] = [
        {
            "symbol": "ETHUSDT",
            "polygon_ticker": "X:ETHUSDT",
            "asset_type": "crypto",
            "priority": AssetPriority.HIGH,
        },
        {
            "symbol": "GBPUSD",
            "polygon_ticker": "C:GBPUSD",
            "asset_type": "forex",
            "priority": AssetPriority.HIGH,
        },
        {
            "symbol": "USDJPY",
            "polygon_ticker": "C:USDJPY",
            "asset_type": "forex",
            "priority": AssetPriority.HIGH,
        },
        {
            "symbol": "XAGUSD",
            "polygon_ticker": "C:XAGUSD",
            "asset_type": "forex",  # Silver as commodity
            "priority": AssetPriority.MEDIUM,
        },
        {
            "symbol": "AUDUSD",
            "polygon_ticker": "C:AUDUSD",
            "asset_type": "forex",
            "priority": AssetPriority.MEDIUM,
        },
        {
            "symbol": "USDCAD",
            "polygon_ticker": "C:USDCAD",
            "asset_type": "forex",
            "priority": AssetPriority.MEDIUM,
        },
    ]

    @classmethod
    def get_priority_symbols(cls) -> List[str]:
        """Get the list of priority symbol names"""
        return [a["symbol"] for a in cls.PRIORITY_ASSETS]

    @classmethod
    def get_all_assets(cls) -> List[Dict]:
        """Get all configured assets"""
        return cls.PRIORITY_ASSETS + cls.SECONDARY_ASSETS
```

### 3.3 Improved Rate Limiter

```python
# src/providers/rate_limiter.py

import asyncio
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class RateLimiter:
    """
    Fixed-window rate limiter for API calls.

    Features:
    - Configurable calls per window
    - Automatic waiting when the limit is reached
    - Metrics tracking
    """

    def __init__(
        self,
        calls_per_minute: int = 5,
        window_seconds: int = 60
    ):
        self.limit = calls_per_minute
        self.window_seconds = window_seconds
        self._calls = 0
        self._window_start = datetime.utcnow()
        self._lock = asyncio.Lock()
        self._total_waits = 0
        self._total_calls = 0

    async def acquire(self) -> float:
        """
        Acquire a rate limit token.

        Returns:
            Wait time in seconds (0 if no wait was needed)
        """
        async with self._lock:
            now = datetime.utcnow()
            elapsed = (now - self._window_start).total_seconds()

            # Reset the window if it has expired
            if elapsed >= self.window_seconds:
                self._calls = 0
                self._window_start = now
                elapsed = 0

            # Wait out the rest of the window if the limit is reached
            waited = 0.0
            if self._calls >= self.limit:
                wait_time = self.window_seconds - elapsed
                if wait_time > 0:
                    logger.info(
                        f"Rate limit reached ({self._calls}/{self.limit}), "
                        f"waiting {wait_time:.1f}s"
                    )
                    self._total_waits += 1
                    await asyncio.sleep(wait_time)
                    waited = wait_time

                # Reset after the wait
                self._calls = 0
                self._window_start = datetime.utcnow()

            self._calls += 1
            self._total_calls += 1
            return waited

    def get_remaining(self) -> int:
        """Get remaining calls in the current window"""
        elapsed = (datetime.utcnow() - self._window_start).total_seconds()

        if elapsed >= self.window_seconds:
            return self.limit

        return max(0, self.limit - self._calls)

    def get_reset_time(self) -> datetime:
        """Get when the current window resets"""
        return self._window_start + timedelta(seconds=self.window_seconds)

    def get_stats(self) -> dict:
        """Get rate limiter statistics"""
        return {
            "limit": self.limit,
            "window_seconds": self.window_seconds,
            "current_calls": self._calls,
            "remaining": self.get_remaining(),
            "reset_at": self.get_reset_time().isoformat(),
            "total_calls": self._total_calls,
            "total_waits": self._total_waits,
        }
```
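
Intended usage can be sketched as follows. The limiter is inlined here in condensed form so the snippet runs standalone, with a 1-second window instead of 60 to keep the demo fast; behavior is otherwise the same as the class above:

```python
import asyncio
from datetime import datetime

class RateLimiter:
    """Condensed copy of the fixed-window limiter from section 3.3."""

    def __init__(self, calls_per_minute: int = 5, window_seconds: int = 60):
        self.limit = calls_per_minute
        self.window_seconds = window_seconds
        self._calls = 0
        self._window_start = datetime.utcnow()
        self._lock = asyncio.Lock()

    async def acquire(self) -> float:
        async with self._lock:
            elapsed = (datetime.utcnow() - self._window_start).total_seconds()
            if elapsed >= self.window_seconds:   # window expired: reset
                self._calls, self._window_start = 0, datetime.utcnow()
                elapsed = 0
            waited = 0.0
            if self._calls >= self.limit:        # limit hit: sleep it out
                waited = self.window_seconds - elapsed
                await asyncio.sleep(waited)
                self._calls, self._window_start = 0, datetime.utcnow()
            self._calls += 1
            return waited

async def main():
    # 2 calls per 1-second window so the third acquire() must wait
    limiter = RateLimiter(calls_per_minute=2, window_seconds=1)
    return [await limiter.acquire() for _ in range(3)]

waits = asyncio.run(main())
print(waits[0] == 0.0, waits[1] == 0.0, waits[2] > 0)  # True True True
```

The first two calls pass immediately; the third blocks for the remainder of the window and returns the time it slept.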

### 3.4 Priority Queue

```python
# src/services/priority_queue.py

import asyncio
import heapq
import logging
from datetime import datetime
from typing import Optional, List, Dict

from models.batch import QueuedAsset, AssetPriority

logger = logging.getLogger(__name__)


class PriorityQueue:
    """
    Async-safe priority queue for asset updates.

    Uses a min-heap keyed on priority:
    CRITICAL (1) < HIGH (2) < MEDIUM (3) < LOW (4)
    """

    def __init__(self, max_size: int = 1000):
        self._heap: List[QueuedAsset] = []
        self._in_queue: set = set()
        self.max_size = max_size
        self._lock = asyncio.Lock()
        self._total_enqueued = 0
        self._total_processed = 0

    async def enqueue(
        self,
        symbol: str,
        polygon_ticker: str,
        asset_type: str,
        priority: AssetPriority = AssetPriority.MEDIUM
    ) -> bool:
        """
        Add an asset to the queue if not already present.

        Returns:
            True if enqueued, False if already queued or the queue is full
        """
        async with self._lock:
            if symbol in self._in_queue:
                logger.debug(f"Asset {symbol} already in queue")
                return False

            if len(self._heap) >= self.max_size:
                logger.warning(f"Queue full ({self.max_size}), dropping {symbol}")
                return False

            item = QueuedAsset(
                priority=priority.value,
                enqueued_at=datetime.utcnow(),
                symbol=symbol,
                polygon_ticker=polygon_ticker,
                asset_type=asset_type
            )

            heapq.heappush(self._heap, item)
            self._in_queue.add(symbol)
            self._total_enqueued += 1

            logger.debug(f"Enqueued {symbol} with priority {priority.name}")
            return True

    async def dequeue(self) -> Optional[QueuedAsset]:
        """
        Get and remove the highest-priority asset.

        Returns:
            QueuedAsset, or None if the queue is empty
        """
        async with self._lock:
            if not self._heap:
                return None

            item = heapq.heappop(self._heap)
            self._in_queue.discard(item.symbol)
            self._total_processed += 1

            return item

    async def requeue(
        self,
        item: QueuedAsset,
        error: Optional[str] = None
    ) -> bool:
        """
        Re-add a failed item with demoted priority.

        Returns:
            True if requeued, False if max retries reached
        """
        if item.retry_count >= 3:
            logger.warning(f"Max retries reached for {item.symbol}")
            return False

        item.retry_count += 1
        item.last_error = error
        item.priority = min(item.priority + 1, AssetPriority.LOW.value)
        item.enqueued_at = datetime.utcnow()

        async with self._lock:
            if item.symbol not in self._in_queue:
                heapq.heappush(self._heap, item)
                self._in_queue.add(item.symbol)
                return True
            return False

    async def peek(self) -> Optional[QueuedAsset]:
        """View the next item without removing it"""
        async with self._lock:
            return self._heap[0] if self._heap else None

    @property
    def size(self) -> int:
        """Current queue size"""
        return len(self._heap)

    @property
    def is_empty(self) -> bool:
        """Check whether the queue is empty"""
        return len(self._heap) == 0

    async def get_stats(self) -> Dict:
        """Get queue statistics"""
        async with self._lock:
            priority_counts = {p.name: 0 for p in AssetPriority}
            oldest_age = 0

            for item in self._heap:
                priority_counts[AssetPriority(item.priority).name] += 1

            if self._heap:
                oldest = min(self._heap, key=lambda x: x.enqueued_at)
                oldest_age = (datetime.utcnow() - oldest.enqueued_at).total_seconds()

            return {
                "size": len(self._heap),
                "max_size": self.max_size,
                "by_priority": priority_counts,
                "oldest_age_seconds": oldest_age,
                "total_enqueued": self._total_enqueued,
                "total_processed": self._total_processed,
            }

    async def clear(self):
        """Clear the queue"""
        async with self._lock:
            self._heap.clear()
            self._in_queue.clear()
```
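
The core idea, stripped of locking and models, is a heap paired with a membership set so the same symbol can never be queued twice. A minimal sketch (plain tuples instead of `QueuedAsset`):

```python
import heapq

# Minimal sketch of the dedup idea behind PriorityQueue: a membership
# set alongside the heap keeps a symbol from being queued twice.
heap, in_queue = [], set()

def enqueue(priority: int, symbol: str) -> bool:
    if symbol in in_queue:
        return False
    heapq.heappush(heap, (priority, symbol))
    in_queue.add(symbol)
    return True

def dequeue():
    if not heap:
        return None
    _, symbol = heapq.heappop(heap)
    in_queue.discard(symbol)
    return symbol

print(enqueue(2, "GBPUSD"))   # True
print(enqueue(2, "GBPUSD"))   # False - duplicate is rejected
print(enqueue(1, "XAUUSD"))   # True
print(dequeue())              # XAUUSD - lowest priority value wins
```

The real class adds the asyncio lock, max-size guard, retry/demotion logic, and stats on top of exactly this structure.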

### 3.5 Update Service

```python
# src/services/asset_updater.py

import json
import logging
from datetime import datetime
from typing import Optional

from providers.polygon_client import PolygonClient, TickerSnapshot
from providers.rate_limiter import RateLimiter
from models.batch import AssetSnapshot, SyncStatus

logger = logging.getLogger(__name__)


class AssetUpdater:
    """
    Service for updating asset data from the Polygon API into PostgreSQL.

    Responsibilities:
    - Fetch snapshots from the API
    - Update database tables
    - Publish Redis events
    - Track sync status
    """

    def __init__(
        self,
        polygon_client: PolygonClient,
        rate_limiter: RateLimiter,
        db_pool,
        redis_client
    ):
        self.polygon = polygon_client
        self.rate_limiter = rate_limiter
        self.db = db_pool
        self.redis = redis_client

    async def update_asset(
        self,
        symbol: str,
        polygon_ticker: str,
        asset_type: str
    ) -> Optional[AssetSnapshot]:
        """
        Update a single asset from the API.

        Args:
            symbol: Asset symbol (e.g., XAUUSD)
            polygon_ticker: Polygon ticker (e.g., C:XAUUSD)
            asset_type: Type (forex, crypto, index)

        Returns:
            AssetSnapshot if successful, None otherwise
        """
        try:
            # Acquire a rate limit token
            await self.rate_limiter.acquire()

            # Fetch from the API
            snapshot = await self._fetch_snapshot(polygon_ticker, asset_type)

            if not snapshot:
                await self._update_sync_status(symbol, SyncStatus.FAILED, "No data")
                return None

            # Convert to our model
            asset_snapshot = self._convert_snapshot(symbol, snapshot)

            # Update the database and sync status
            await self._update_database(symbol, asset_snapshot)
            await self._update_sync_status(symbol, SyncStatus.SUCCESS)

            # Publish the update event
            await self._publish_update(symbol, asset_snapshot)

            logger.info(f"Updated {symbol}: {asset_snapshot.last_price}")
            return asset_snapshot

        except Exception as e:
            logger.error(f"Error updating {symbol}: {e}")
            await self._update_sync_status(symbol, SyncStatus.FAILED, str(e))
            raise

    async def _fetch_snapshot(
        self,
        polygon_ticker: str,
        asset_type: str
    ) -> Optional[TickerSnapshot]:
        """Fetch a snapshot from the Polygon API"""
        try:
            if asset_type == "forex":
                return await self.polygon.get_snapshot_forex(polygon_ticker)
            elif asset_type == "crypto":
                return await self.polygon.get_snapshot_crypto(polygon_ticker)
            else:
                logger.warning(f"Unknown asset type: {asset_type}")
                return None
        except Exception as e:
            logger.error(f"API error for {polygon_ticker}: {e}")
            raise

    def _convert_snapshot(
        self,
        symbol: str,
        snapshot: TickerSnapshot
    ) -> AssetSnapshot:
        """Convert a Polygon snapshot to our model"""
        spread = snapshot.ask - snapshot.bid if snapshot.ask and snapshot.bid else 0

        return AssetSnapshot(
            symbol=symbol,
            bid=snapshot.bid or 0,
            ask=snapshot.ask or 0,
            spread=spread,
            last_price=snapshot.last_price or 0,
            daily_open=snapshot.daily_open,
            daily_high=snapshot.daily_high,
            daily_low=snapshot.daily_low,
            daily_close=snapshot.daily_close,
            daily_volume=snapshot.daily_volume,
            timestamp=snapshot.timestamp or datetime.utcnow(),
            source="polygon"
        )

    async def _update_database(self, symbol: str, snapshot: AssetSnapshot):
        """Update asset data in PostgreSQL"""
        async with self.db.acquire() as conn:
            # Merge the latest snapshot into trading.symbols metadata
            metadata = {
                "last_update": {
                    "bid": snapshot.bid,
                    "ask": snapshot.ask,
                    "spread": snapshot.spread,
                    "last_price": snapshot.last_price,
                    "daily_open": snapshot.daily_open,
                    "daily_high": snapshot.daily_high,
                    "daily_low": snapshot.daily_low,
                    "daily_close": snapshot.daily_close,
                    "timestamp": snapshot.timestamp.isoformat(),
                    "source": snapshot.source
                }
            }

            await conn.execute("""
                UPDATE trading.symbols
                SET
                    updated_at = NOW(),
                    metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb
                WHERE symbol = $1
            """, symbol, json.dumps(metadata))

    async def _update_sync_status(
        self,
        symbol: str,
        status: SyncStatus,
        error: Optional[str] = None
    ):
        """Update the sync status in the database"""
        async with self.db.acquire() as conn:
            await conn.execute("""
                INSERT INTO data_sources.data_sync_status
                    (ticker_id, provider_id, last_sync_timestamp,
                     sync_status, error_message, updated_at)
                SELECT
                    s.id,
                    (SELECT id FROM data_sources.api_providers
                     WHERE code = 'polygon' LIMIT 1),
                    NOW(),
                    $2,
                    $3,
                    NOW()
                FROM trading.symbols s
                WHERE s.symbol = $1
                ON CONFLICT (ticker_id, provider_id)
                DO UPDATE SET
                    last_sync_timestamp = NOW(),
                    sync_status = $2,
                    error_message = $3,
                    updated_at = NOW()
            """, symbol, status.value, error)

    async def _publish_update(self, symbol: str, snapshot: AssetSnapshot):
        """Publish an update event via Redis Pub/Sub"""
        channel = f"asset:update:{symbol}"
        message = json.dumps({
            "type": "price_update",
            "symbol": symbol,
            "data": {
                "bid": snapshot.bid,
                "ask": snapshot.ask,
                "spread": snapshot.spread,
                "last_price": snapshot.last_price,
                "timestamp": snapshot.timestamp.isoformat()
            }
        })

        await self.redis.publish(channel, message)
```
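
The Pub/Sub payload built by `_publish_update` is plain JSON on a per-symbol channel, so any subscriber can decode it with `json.loads`. The shape can be reproduced standalone (the prices below are illustrative, not real data):

```python
import json
from datetime import datetime

# Illustrative values only - mirrors the payload built in _publish_update.
symbol = "XAUUSD"
channel = f"asset:update:{symbol}"
message = json.dumps({
    "type": "price_update",
    "symbol": symbol,
    "data": {
        "bid": 2050.10,
        "ask": 2050.45,
        "spread": 0.35,
        "last_price": 2050.30,
        "timestamp": datetime(2026, 1, 4, 12, 0).isoformat(),
    },
})

# A subscriber on asset:update:XAUUSD decodes the same structure back
decoded = json.loads(message)
print(channel)                 # asset:update:XAUUSD
print(decoded["data"]["ask"])  # 2050.45
```

Per-symbol channels let consumers subscribe only to the assets they care about (or pattern-subscribe to `asset:update:*`).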

### 3.6 Batch Orchestrator

```python
# src/services/batch_orchestrator.py

import logging
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from config.priority_assets import PriorityAssetsConfig
from services.priority_queue import PriorityQueue
from services.asset_updater import AssetUpdater
from models.batch import BatchResult, AssetPriority

logger = logging.getLogger(__name__)


class BatchOrchestrator:
    """
    Orchestrates the batch update process.

    Responsibilities:
    - Schedule batch jobs
    - Coordinate priority updates
    - Manage the asset queue
    - Track batch metrics
    """

    def __init__(
        self,
        asset_updater: AssetUpdater,
        priority_queue: PriorityQueue,
        batch_interval_minutes: int = 5
    ):
        self.updater = asset_updater
        self.queue = priority_queue
        self.interval = batch_interval_minutes
        self.scheduler = AsyncIOScheduler()
        self._is_running = False
        self._last_batch_result: Optional[BatchResult] = None

    async def start(self):
        """Start the batch orchestrator"""
        if self._is_running:
            logger.warning("Orchestrator already running")
            return

        logger.info("Starting Batch Orchestrator")

        # Priority batch - every 5 minutes
        self.scheduler.add_job(
            self._run_priority_batch,
            trigger=IntervalTrigger(minutes=self.interval),
            id="priority_batch",
            name="Priority Assets Batch (XAU, EURUSD, BTC)",
            replace_existing=True,
            max_instances=1,
            next_run_time=datetime.now()
        )

        # Queue processor - every 15 seconds
        self.scheduler.add_job(
            self._process_queue,
            trigger=IntervalTrigger(seconds=15),
            id="queue_processor",
            name="Process Secondary Assets Queue",
            replace_existing=True,
            max_instances=1
        )

        # Enqueue secondary assets - every 30 minutes
        self.scheduler.add_job(
            self._enqueue_secondary,
            trigger=IntervalTrigger(minutes=30),
            id="enqueue_secondary",
            name="Enqueue Secondary Assets",
            replace_existing=True,
            max_instances=1,
            next_run_time=datetime.now()
        )

        self.scheduler.start()
        self._is_running = True

        logger.info(
            f"Orchestrator started with {len(self.scheduler.get_jobs())} jobs"
        )

    async def stop(self):
        """Stop the orchestrator"""
        if not self._is_running:
            return

        self.scheduler.shutdown(wait=True)
        self._is_running = False
        logger.info("Orchestrator stopped")

    async def run_manual_batch(self) -> BatchResult:
        """Run the batch manually (via API call)"""
        return await self._run_priority_batch()

    async def _run_priority_batch(self) -> BatchResult:
        """Execute the priority batch job"""
        batch_id = str(uuid.uuid4())[:8]
        started_at = datetime.utcnow()

        logger.info(f"=== Priority Batch {batch_id} Started ===")

        updated = []
        failed = []
        api_calls = 0
        waits_before = self.updater.rate_limiter.get_stats()["total_waits"]

        for asset in PriorityAssetsConfig.PRIORITY_ASSETS:
            try:
                snapshot = await self.updater.update_asset(
                    symbol=asset["symbol"],
                    polygon_ticker=asset["polygon_ticker"],
                    asset_type=asset["asset_type"]
                )

                api_calls += 1

                if snapshot:
                    updated.append(asset["symbol"])
                else:
                    failed.append({
                        "symbol": asset["symbol"],
                        "error": "No data returned"
                    })

            except Exception as e:
                api_calls += 1
                failed.append({
                    "symbol": asset["symbol"],
                    "error": str(e)
                })

        completed_at = datetime.utcnow()
        duration = int((completed_at - started_at).total_seconds() * 1000)
        rate_waits = (
            self.updater.rate_limiter.get_stats()["total_waits"] - waits_before
        )

        result = BatchResult(
            batch_id=batch_id,
            started_at=started_at,
            completed_at=completed_at,
            duration_ms=duration,
            priority_updated=updated,
            priority_failed=failed,
            queue_processed=0,
            queue_remaining=self.queue.size,
            api_calls_used=api_calls,
            rate_limit_waits=rate_waits,
            errors=list(failed)
        )

        self._last_batch_result = result

        logger.info(
            f"Batch {batch_id} completed in {duration}ms: "
            f"{len(updated)} updated, {len(failed)} failed"
        )

        return result

    async def _process_queue(self):
        """Process queued secondary assets"""
        # Use the ~2 API calls per minute left over by the priority batch
        processed = 0
        max_items = 2

        for _ in range(max_items):
            item = await self.queue.dequeue()
            if not item:
                break

            try:
                snapshot = await self.updater.update_asset(
                    symbol=item.symbol,
                    polygon_ticker=item.polygon_ticker,
                    asset_type=item.asset_type
                )

                processed += 1

                if not snapshot:
                    await self.queue.requeue(item, "No data")

            except Exception as e:
                logger.warning(f"Failed to update {item.symbol}: {e}")
                await self.queue.requeue(item, str(e))

        if processed > 0:
            logger.debug(
                f"Queue: processed {processed}, remaining {self.queue.size}"
            )

    async def _enqueue_secondary(self):
        """Enqueue secondary assets for gradual update"""
        enqueued = 0

        for asset in PriorityAssetsConfig.SECONDARY_ASSETS:
            success = await self.queue.enqueue(
                symbol=asset["symbol"],
                polygon_ticker=asset["polygon_ticker"],
                asset_type=asset["asset_type"],
                priority=asset.get("priority", AssetPriority.MEDIUM)
            )
            if success:
                enqueued += 1

        if enqueued > 0:
            logger.info(f"Enqueued {enqueued} secondary assets")

    def get_status(self) -> Dict[str, Any]:
        """Get current orchestrator status"""
        jobs = [
            {
                "id": job.id,
                "name": job.name,
                "next_run": job.next_run_time.isoformat()
                if job.next_run_time else None,
            }
            for job in self.scheduler.get_jobs()
        ]

        return {
            "is_running": self._is_running,
            "jobs": jobs,
            "queue_size": self.queue.size,
            "last_batch": {
                "batch_id": self._last_batch_result.batch_id,
                "completed_at": self._last_batch_result.completed_at.isoformat(),
                "priority_updated": self._last_batch_result.priority_updated,
            } if self._last_batch_result else None,
            "priority_assets": PriorityAssetsConfig.get_priority_symbols(),
        }
```
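
The per-tick budget in `_process_queue` (at most two dequeues per run, failures requeued) can be illustrated with stubs standing in for the real queue and updater; the stubs and the `update` coroutine below are hypothetical, existing only for this sketch:

```python
import asyncio

# Stub queue/updater pair to illustrate _process_queue's per-tick budget.
class StubQueue:
    def __init__(self, items):
        self.items = list(items)
        self.requeued = []

    async def dequeue(self):
        return self.items.pop(0) if self.items else None

    async def requeue(self, item, error):
        self.requeued.append((item, error))

async def process_queue(queue, update, max_items=2):
    # Same shape as BatchOrchestrator._process_queue: at most max_items
    # dequeues per tick; items with no data (or errors) are requeued.
    processed = 0
    for _ in range(max_items):
        item = await queue.dequeue()
        if not item:
            break
        try:
            snapshot = await update(item)
            processed += 1
            if not snapshot:
                await queue.requeue(item, "No data")
        except Exception as e:
            await queue.requeue(item, str(e))
    return processed

async def main():
    queue = StubQueue(["ETHUSDT", "GBPUSD", "USDJPY"])

    async def update(symbol):  # pretend GBPUSD returns no data
        return symbol != "GBPUSD"

    done = await process_queue(queue, update)
    return done, queue.items, [s for s, _ in queue.requeued]

done, remaining, requeued = asyncio.run(main())
print(done, remaining, requeued)  # 2 ['USDJPY'] ['GBPUSD']
```

Only two of the three queued symbols are touched per tick; the failed one goes back for a later attempt, and `USDJPY` simply waits for the next tick.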

---

## 4. API Endpoints

### 4.1 Batch Routes

```python
# src/api/batch_routes.py

from typing import Dict, Any

from fastapi import APIRouter, Depends, HTTPException

from services.batch_orchestrator import BatchOrchestrator
from models.batch import BatchStatusResponse, BatchRunResponse
from api.dependencies import get_orchestrator, require_admin

router = APIRouter(prefix="/batch", tags=["Batch Operations"])


@router.get("/status", response_model=BatchStatusResponse)
async def get_batch_status(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator)
):
    """Get current batch job status"""
    status = orchestrator.get_status()
    rate_limiter = orchestrator.updater.rate_limiter

    return BatchStatusResponse(
        batch_id=status["last_batch"]["batch_id"] if status["last_batch"] else "",
        status="running" if status["is_running"] else "stopped",
        last_run=status["last_batch"]["completed_at"] if status["last_batch"] else None,
        next_run=status["jobs"][0]["next_run"] if status["jobs"] else None,
        priority_assets=status["priority_assets"],
        queue_size=status["queue_size"],
        api_calls_remaining=rate_limiter.get_remaining(),
    )


@router.post("/run", response_model=BatchRunResponse)
async def run_batch_manually(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator),
    _: None = Depends(require_admin)  # Admin only
):
    """
    Trigger the batch job manually.

    Requires admin authentication.
    """
    try:
        result = await orchestrator.run_manual_batch()

        return BatchRunResponse(
            success=len(result.priority_failed) == 0,
            batch_id=result.batch_id,
            result={
                "duration_ms": result.duration_ms,
                "updated": result.priority_updated,
                "failed": result.priority_failed,
                "api_calls_used": result.api_calls_used,
            },
            message=f"Batch completed: {len(result.priority_updated)} updated"
        )

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Batch execution failed: {e}"
        )


@router.get("/queue/stats")
async def get_queue_stats(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator)
) -> Dict[str, Any]:
    """Get queue statistics"""
    return await orchestrator.queue.get_stats()


@router.get("/rate-limit")
async def get_rate_limit_status(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator)
) -> Dict[str, Any]:
    """Get rate limiter status"""
    return orchestrator.updater.rate_limiter.get_stats()
```

---

## 5. Sequence Diagram

### 5.1 Priority Batch Flow

```
┌──────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────┐  ┌────────┐
│Scheduler │  │Orchestrator │  │AssetUpdater │  │ Polygon │  │   DB   │
└────┬─────┘  └──────┬──────┘  └──────┬──────┘  └────┬────┘  └───┬────┘
     │               │                │              │           │
     │ trigger batch │                │              │           │
     │──────────────▶│                │              │           │
     │               │                │              │           │
     │               │ loop [XAU, EURUSD, BTC]       │           │
     │               │───┐            │              │           │
     │               │   │ update_asset              │           │
     │               │   │───────────▶│              │           │
     │               │   │            │ rate_limiter.acquire()   │
     │               │   │            │───┐          │           │
     │               │   │            │◀──┘          │           │
     │               │   │            │ get_snapshot │           │
     │               │   │            │─────────────▶│           │
     │               │   │            │   snapshot   │           │
     │               │   │            │◀─────────────│           │
     │               │   │            │ update trading.symbols   │
     │               │   │            │─────────────────────────▶│
     │               │   │            │ publish to Redis         │
     │               │   │            │───┐          │           │
     │               │   │            │◀──┘          │           │
     │               │   │  snapshot  │              │           │
     │               │   │◀───────────│              │           │
     │               │◀──┘            │              │           │
     │  BatchResult  │                │              │           │
     │◀──────────────│                │              │           │
     │               │                │              │           │
```

---

## 6. Files to Create/Modify

### 6.1 New Files

| File | Purpose |
|------|---------|
| `src/config/priority_assets.py` | Priority assets configuration |
| `src/providers/rate_limiter.py` | Improved rate limiter |
| `src/services/priority_queue.py` | Priority queue |
| `src/services/asset_updater.py` | Update service |
| `src/services/batch_orchestrator.py` | Batch orchestrator |
| `src/api/batch_routes.py` | API endpoints |
| `src/models/batch.py` | Data models |
| `tests/test_priority_queue.py` | Queue tests |
| `tests/test_asset_updater.py` | Updater tests |
| `tests/test_batch_orchestrator.py` | Orchestrator tests |

### 6.2 Files to Modify

| File | Change |
|------|--------|
| `src/main.py` | Add BatchOrchestrator initialization |
| `src/app.py` | Register batch_routes |
| `src/config/settings.py` | Add batch configuration |
| `.env.example` | Add batch variables |

---

## 7. Implementation Plan

### Phase 1: Infrastructure (2 SP)
1. Create the data models (`models/batch.py`)
2. Implement the improved RateLimiter
3. Implement the PriorityQueue

### Phase 2: Core Services (5 SP)
1. Implement the AssetUpdater
2. Implement the BatchOrchestrator
3. Configure the priority assets

### Phase 3: API and Integration (3 SP)
1. Create the batch endpoints
2. Integrate with main.py
3. Update the configuration

### Phase 4: Testing (3 SP)
1. Unit tests for each component
2. Integration tests
3. Manual testing against the real API

**Total Story Points: 13**

---

## 8. External Dependencies

| Dependency | Version | Use |
|------------|---------|-----|
| apscheduler | ^3.10.0 | Job scheduling |
| asyncpg | ^0.29.0 | Async PostgreSQL |
| aioredis | ^2.0.0 | Async Redis |
| aiohttp | ^3.9.0 | HTTP client (existing) |
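
For reference, the additions to a `requirements.txt` would look roughly like this (version ranges derived from the table above, not from a tested lockfile; note also that the standalone `aioredis` package has been absorbed into `redis>=4.2` as `redis.asyncio`, which may be the safer pick for new code):

```text
apscheduler>=3.10,<4
asyncpg>=0.29,<0.30
aioredis>=2.0,<3
aiohttp>=3.9,<4
```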

---

## 9. References

- [RF-DATA-001: Functional Requirement](../requerimientos/RF-DATA-001-sincronizacion-batch-activos.md)
- [INT-DATA-003: Batch Integration](../integraciones/INT-DATA-003-batch-actualizacion-activos.md)
- [INT-DATA-001: Data Service Base](../integraciones/INT-DATA-001-data-service.md)
- [Polygon.io API Docs](https://polygon.io/docs)

---

**Created by:** Orquestador Agent
**Date:** 2026-01-04