| id | title | type | status | priority | epic | project | version | created_date | updated_date |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| ET-DATA-001 | Batch Process Architecture with Prioritization | Technical Specification | To Do | High | Transversal | trading-platform | 1.0.0 | 2026-01-04 | 2026-01-04 |
# ET-DATA-001: Batch Process Architecture with Prioritization
## Metadata

| Field | Value |
| --- | --- |
| ID | ET-DATA-001 |
| Module | Data Service |
| Type | Technical Specification |
| Version | 1.0.0 |
| Status | Planned |
| Implements | RF-DATA-001 |
| Last updated | 2026-01-04 |
## 1. Purpose

Specify the technical architecture and implementation design of the asset-update batch process with a prioritization and queueing system, including sequence diagrams, code structure, and the design patterns used.
## 2. System Architecture

### 2.1 Component View
```
DATA SERVICE (apps/data-service)
└── src/
    ├── api/
    │   ├── routes.py               # Existing routes
    │   └── batch_routes.py         # [NEW] Batch endpoints
    ├── config/
    │   ├── settings.py             # Existing configuration
    │   └── priority_assets.py      # [NEW] Priority configuration
    ├── providers/
    │   ├── polygon_client.py       # Polygon client (existing)
    │   └── rate_limiter.py         # [NEW] Improved rate limiter
    ├── services/
    │   ├── sync_service.py         # Existing sync
    │   ├── scheduler.py            # Existing scheduler
    │   ├── priority_queue.py       # [NEW] Priority queue
    │   ├── asset_updater.py        # [NEW] Update service
    │   └── batch_orchestrator.py   # [NEW] Batch orchestrator
    ├── models/
    │   ├── market.py               # Existing models
    │   └── batch.py                # [NEW] Batch models
    └── main.py                     # Entry point (to be modified)
```
### 2.2 Class Diagram
```
┌─────────────────────────┐       ┌─────────────────────────┐
│    BatchOrchestrator    │       │      PriorityQueue      │
├─────────────────────────┤       ├─────────────────────────┤
│ - asset_updater         │──────▶│ - _queue: List          │
│ - priority_queue        │       │ - _in_queue: Set        │
│ - scheduler             │       │ - max_size: int         │
├─────────────────────────┤       ├─────────────────────────┤
│ + start()               │       │ + enqueue()             │
│ + stop()                │       │ + dequeue()             │
│ + run_priority_batch()  │       │ + peek()                │
│ + process_queue()       │       │ + get_stats()           │
└───────────┬─────────────┘       └─────────────────────────┘
            │
            │ uses
            ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│      AssetUpdater       │       │       RateLimiter       │
├─────────────────────────┤       ├─────────────────────────┤
│ - polygon_client        │──────▶│ - limit: int            │
│ - rate_limiter          │       │ - window_seconds: int   │
│ - db_pool               │       │ - _calls: int           │
│ - redis_client          │       │ - _window_start: dt     │
├─────────────────────────┤       ├─────────────────────────┤
│ + update_asset()        │       │ + acquire()             │
│ + update_priority()     │       │ + wait_if_needed()      │
│ + publish_event()       │       │ + get_remaining()       │
└───────────┬─────────────┘       └─────────────────────────┘
            │
            │ uses
            ▼
┌─────────────────────────┐
│      PolygonClient      │
├─────────────────────────┤
│ (existing)              │
│ - api_key               │
│ - base_url              │
├─────────────────────────┤
│ + get_snapshot_forex()  │
│ + get_snapshot_crypto() │
│ + get_universal()       │
└─────────────────────────┘
```
## 3. Detailed Design

### 3.1 Data Models
```python
# src/models/batch.py
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any

from pydantic import BaseModel


class AssetPriority(Enum):
    """Priority levels for asset updates"""
    CRITICAL = 1  # XAU, EURUSD, BTC - Always first
    HIGH = 2      # Major pairs
    MEDIUM = 3    # Secondary crypto
    LOW = 4       # Indices, minor pairs


class SyncStatus(Enum):
    """Status of asset sync"""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"
    STALE = "stale"  # No update in 15+ min


@dataclass(order=True)
class QueuedAsset:
    """Asset waiting in the update queue"""
    priority: int
    enqueued_at: datetime = field(compare=False)
    symbol: str = field(compare=False)
    polygon_ticker: str = field(compare=False)
    asset_type: str = field(compare=False)
    retry_count: int = field(default=0, compare=False)
    last_error: Optional[str] = field(default=None, compare=False)


@dataclass
class AssetSnapshot:
    """Snapshot data from API"""
    symbol: str
    bid: float
    ask: float
    spread: float
    last_price: float
    daily_open: Optional[float]
    daily_high: Optional[float]
    daily_low: Optional[float]
    daily_close: Optional[float]
    daily_volume: Optional[float]
    timestamp: datetime
    source: str = "polygon"


@dataclass
class BatchResult:
    """Result of a batch execution"""
    batch_id: str
    started_at: datetime
    completed_at: datetime
    duration_ms: int
    priority_updated: List[str]
    priority_failed: List[Dict[str, str]]
    queue_processed: int
    queue_remaining: int
    api_calls_used: int
    rate_limit_waits: int
    errors: List[Dict[str, Any]]


# Pydantic models for API responses
class BatchStatusResponse(BaseModel):
    """API response for batch status"""
    batch_id: str
    status: str
    last_run: Optional[datetime]
    next_run: Optional[datetime]
    priority_assets: List[str]
    queue_size: int
    api_calls_remaining: int


class BatchRunResponse(BaseModel):
    """API response for manual batch run"""
    success: bool
    batch_id: str
    result: Dict[str, Any]
    message: str
```
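Because `QueuedAsset` is declared with `@dataclass(order=True)` and every field except `priority` marked `compare=False`, heap ordering depends on the priority value alone. A minimal, self-contained sketch (re-declaring a trimmed `QueuedAsset` purely for illustration):

```python
import heapq
from dataclasses import dataclass, field
from datetime import datetime

@dataclass(order=True)
class QueuedAsset:
    # Only `priority` participates in comparisons; ties compare equal for heapq.
    priority: int
    enqueued_at: datetime = field(compare=False)
    symbol: str = field(compare=False)

heap = []
heapq.heappush(heap, QueuedAsset(3, datetime.utcnow(), "AUDUSD"))  # MEDIUM
heapq.heappush(heap, QueuedAsset(1, datetime.utcnow(), "XAUUSD"))  # CRITICAL
heapq.heappush(heap, QueuedAsset(2, datetime.utcnow(), "GBPUSD"))  # HIGH

order = [heapq.heappop(heap).symbol for _ in range(3)]
print(order)  # ['XAUUSD', 'GBPUSD', 'AUDUSD']
```

The lowest numeric priority (CRITICAL = 1) always pops first, regardless of insertion order.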
### 3.2 Priority Assets Configuration
```python
# src/config/priority_assets.py
from typing import List, Dict

from models.batch import AssetPriority


class PriorityAssetsConfig:
    """Configuration for priority assets"""

    # Critical assets - updated every batch cycle
    PRIORITY_ASSETS: List[Dict] = [
        {
            "symbol": "XAUUSD",
            "polygon_ticker": "C:XAUUSD",
            "asset_type": "forex",
            "name": "Gold Spot / US Dollar",
            "priority": AssetPriority.CRITICAL,
            "base_asset": "XAU",
            "quote_asset": "USD",
            "price_precision": 2,
        },
        {
            "symbol": "EURUSD",
            "polygon_ticker": "C:EURUSD",
            "asset_type": "forex",
            "name": "Euro / US Dollar",
            "priority": AssetPriority.CRITICAL,
            "base_asset": "EUR",
            "quote_asset": "USD",
            "price_precision": 5,
        },
        {
            "symbol": "BTCUSD",
            "polygon_ticker": "X:BTCUSD",
            "asset_type": "crypto",
            "name": "Bitcoin / US Dollar",
            "priority": AssetPriority.CRITICAL,
            "base_asset": "BTC",
            "quote_asset": "USD",
            "price_precision": 2,
        },
    ]

    # Secondary assets - queued for gradual update
    SECONDARY_ASSETS: List[Dict] = [
        {
            "symbol": "ETHUSDT",
            "polygon_ticker": "X:ETHUSDT",
            "asset_type": "crypto",
            "priority": AssetPriority.HIGH,
        },
        {
            "symbol": "GBPUSD",
            "polygon_ticker": "C:GBPUSD",
            "asset_type": "forex",
            "priority": AssetPriority.HIGH,
        },
        {
            "symbol": "USDJPY",
            "polygon_ticker": "C:USDJPY",
            "asset_type": "forex",
            "priority": AssetPriority.HIGH,
        },
        {
            "symbol": "XAGUSD",
            "polygon_ticker": "C:XAGUSD",
            "asset_type": "forex",  # Silver as commodity
            "priority": AssetPriority.MEDIUM,
        },
        {
            "symbol": "AUDUSD",
            "polygon_ticker": "C:AUDUSD",
            "asset_type": "forex",
            "priority": AssetPriority.MEDIUM,
        },
        {
            "symbol": "USDCAD",
            "polygon_ticker": "C:USDCAD",
            "asset_type": "forex",
            "priority": AssetPriority.MEDIUM,
        },
    ]

    @classmethod
    def get_priority_symbols(cls) -> List[str]:
        """Get list of priority symbol names"""
        return [a["symbol"] for a in cls.PRIORITY_ASSETS]

    @classmethod
    def get_all_assets(cls) -> List[Dict]:
        """Get all configured assets"""
        return cls.PRIORITY_ASSETS + cls.SECONDARY_ASSETS
```
### 3.3 Improved Rate Limiter
```python
# src/providers/rate_limiter.py
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional

logger = logging.getLogger(__name__)


class RateLimiter:
    """
    Fixed-window rate limiter for API calls.

    Features:
    - Configurable calls per window
    - Automatic waiting when limit reached
    - Metrics tracking
    """

    def __init__(
        self,
        calls_per_minute: int = 5,
        window_seconds: int = 60
    ):
        self.limit = calls_per_minute
        self.window_seconds = window_seconds
        self._calls = 0
        self._window_start = datetime.utcnow()
        self._lock = asyncio.Lock()
        self._total_waits = 0
        self._total_calls = 0

    async def acquire(self) -> float:
        """
        Acquire a rate limit token.

        Returns:
            Wait time in seconds (0 if no wait was needed)
        """
        async with self._lock:
            now = datetime.utcnow()
            elapsed = (now - self._window_start).total_seconds()

            # Reset window if expired
            if elapsed >= self.window_seconds:
                self._calls = 0
                self._window_start = now
                elapsed = 0

            waited = 0.0
            # Check if we need to wait
            if self._calls >= self.limit:
                wait_time = self.window_seconds - elapsed
                if wait_time > 0:
                    logger.info(
                        f"Rate limit reached ({self._calls}/{self.limit}), "
                        f"waiting {wait_time:.1f}s"
                    )
                    self._total_waits += 1
                    await asyncio.sleep(wait_time)
                    waited = wait_time
                # Reset after wait
                self._calls = 0
                self._window_start = datetime.utcnow()

            self._calls += 1
            self._total_calls += 1
            return waited

    def get_remaining(self) -> int:
        """Get remaining calls in current window"""
        now = datetime.utcnow()
        elapsed = (now - self._window_start).total_seconds()
        if elapsed >= self.window_seconds:
            return self.limit
        return max(0, self.limit - self._calls)

    def get_reset_time(self) -> datetime:
        """Get when the current window resets"""
        return self._window_start + timedelta(seconds=self.window_seconds)

    def get_stats(self) -> dict:
        """Get rate limiter statistics"""
        return {
            "limit": self.limit,
            "window_seconds": self.window_seconds,
            "current_calls": self._calls,
            "remaining": self.get_remaining(),
            "reset_at": self.get_reset_time().isoformat(),
            "total_calls": self._total_calls,
            "total_waits": self._total_waits,
        }
```
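The window arithmetic above can be exercised without real sleeping by driving the clock by hand. A minimal synchronous mimic of the same fixed-window accounting (illustrative sketch only, not the class above):

```python
class WindowCounter:
    """Synchronous mimic of the fixed-window accounting in RateLimiter."""
    def __init__(self, limit=5, window_seconds=60):
        self.limit = limit
        self.window_seconds = window_seconds
        self.calls = 0
        self.window_start = 0.0

    def acquire(self, now: float) -> float:
        """Record a call at time `now`; return seconds a caller would wait."""
        if now - self.window_start >= self.window_seconds:
            # Window expired: start a fresh one.
            self.calls = 0
            self.window_start = now
        wait = 0.0
        if self.calls >= self.limit:
            # Over the limit: wait until the current window ends.
            wait = self.window_seconds - (now - self.window_start)
            self.calls = 0
            self.window_start = now + wait
        self.calls += 1
        return wait

rl = WindowCounter(limit=5, window_seconds=60)
waits = [rl.acquire(t) for t in (0, 1, 2, 3, 4)]  # 5 calls inside the window
sixth = rl.acquire(5)                             # 6th call must wait
print(waits, sixth)  # [0.0, 0.0, 0.0, 0.0, 0.0] 55.0
```

The sixth call at t=5 s waits the remaining 55 s of the 60 s window, which is exactly what `acquire()` does with `asyncio.sleep`.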
### 3.4 Priority Queue
```python
# src/services/priority_queue.py
import asyncio
import heapq
import logging
from datetime import datetime
from typing import Optional, List, Dict

from models.batch import QueuedAsset, AssetPriority

logger = logging.getLogger(__name__)


class PriorityQueue:
    """
    Async-safe priority queue for asset updates (guarded by an asyncio.Lock).

    Uses a min-heap keyed on priority:
    CRITICAL (1) < HIGH (2) < MEDIUM (3) < LOW (4)
    """

    def __init__(self, max_size: int = 1000):
        self._heap: List[QueuedAsset] = []
        self._in_queue: set = set()
        self.max_size = max_size
        self._lock = asyncio.Lock()
        self._total_enqueued = 0
        self._total_processed = 0

    async def enqueue(
        self,
        symbol: str,
        polygon_ticker: str,
        asset_type: str,
        priority: AssetPriority = AssetPriority.MEDIUM
    ) -> bool:
        """
        Add asset to queue if not already present.

        Returns:
            True if enqueued, False if already in queue or queue full
        """
        async with self._lock:
            if symbol in self._in_queue:
                logger.debug(f"Asset {symbol} already in queue")
                return False
            if len(self._heap) >= self.max_size:
                logger.warning(f"Queue full ({self.max_size}), dropping {symbol}")
                return False
            item = QueuedAsset(
                priority=priority.value,
                enqueued_at=datetime.utcnow(),
                symbol=symbol,
                polygon_ticker=polygon_ticker,
                asset_type=asset_type
            )
            heapq.heappush(self._heap, item)
            self._in_queue.add(symbol)
            self._total_enqueued += 1
            logger.debug(f"Enqueued {symbol} with priority {priority.name}")
            return True

    async def dequeue(self) -> Optional[QueuedAsset]:
        """
        Get and remove highest priority asset.

        Returns:
            QueuedAsset or None if queue empty
        """
        async with self._lock:
            if not self._heap:
                return None
            item = heapq.heappop(self._heap)
            self._in_queue.discard(item.symbol)
            self._total_processed += 1
            return item

    async def requeue(
        self,
        item: QueuedAsset,
        error: Optional[str] = None
    ) -> bool:
        """
        Re-add a failed item with demoted priority.

        Returns:
            True if requeued, False if max retries reached
        """
        if item.retry_count >= 3:
            logger.warning(f"Max retries reached for {item.symbol}")
            return False
        item.retry_count += 1
        item.last_error = error
        item.priority = min(item.priority + 1, AssetPriority.LOW.value)
        item.enqueued_at = datetime.utcnow()
        async with self._lock:
            if item.symbol not in self._in_queue:
                heapq.heappush(self._heap, item)
                self._in_queue.add(item.symbol)
                return True
            return False

    async def peek(self) -> Optional[QueuedAsset]:
        """View next item without removing"""
        async with self._lock:
            return self._heap[0] if self._heap else None

    @property
    def size(self) -> int:
        """Current queue size"""
        return len(self._heap)

    @property
    def is_empty(self) -> bool:
        """Check if queue is empty"""
        return len(self._heap) == 0

    async def get_stats(self) -> Dict:
        """Get queue statistics"""
        async with self._lock:
            priority_counts = {p.name: 0 for p in AssetPriority}
            oldest_age = 0
            for item in self._heap:
                priority_name = AssetPriority(item.priority).name
                priority_counts[priority_name] += 1
            if self._heap:
                oldest = min(self._heap, key=lambda x: x.enqueued_at)
                oldest_age = (datetime.utcnow() - oldest.enqueued_at).total_seconds()
            return {
                "size": len(self._heap),
                "max_size": self.max_size,
                "by_priority": priority_counts,
                "oldest_age_seconds": oldest_age,
                "total_enqueued": self._total_enqueued,
                "total_processed": self._total_processed,
            }

    async def clear(self):
        """Clear the queue"""
        async with self._lock:
            self._heap.clear()
            self._in_queue.clear()
```
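The `_in_queue` set is what keeps a symbol from being scheduled twice while it is still pending. A condensed, synchronous sketch of that dedup path (the real class additionally holds an `asyncio.Lock` around every operation):

```python
import heapq

heap, in_queue = [], set()

def enqueue(priority: int, symbol: str) -> bool:
    # Reject duplicates while the symbol is still waiting in the heap.
    if symbol in in_queue:
        return False
    heapq.heappush(heap, (priority, symbol))
    in_queue.add(symbol)
    return True

def dequeue() -> str:
    priority, symbol = heapq.heappop(heap)
    in_queue.discard(symbol)  # the symbol may be enqueued again after this
    return symbol

print(enqueue(3, "ETHUSDT"))  # True
print(enqueue(3, "ETHUSDT"))  # False - already queued
first = dequeue()
print(enqueue(3, "ETHUSDT"))  # True - allowed again once dequeued
```

This is why the periodic `_enqueue_secondary` job can fire blindly every 30 minutes: re-enqueueing a still-pending symbol is a harmless no-op.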
### 3.5 Update Service
```python
# src/services/asset_updater.py
import asyncio
import json
import logging
from datetime import datetime
from typing import Optional, Dict, List, Any

from providers.polygon_client import PolygonClient, AssetType, TickerSnapshot
from providers.rate_limiter import RateLimiter
from models.batch import AssetSnapshot, SyncStatus

logger = logging.getLogger(__name__)


class AssetUpdater:
    """
    Service for updating asset data from Polygon API to PostgreSQL.

    Responsibilities:
    - Fetch snapshots from API
    - Update database tables
    - Publish Redis events
    - Track sync status
    """

    def __init__(
        self,
        polygon_client: PolygonClient,
        rate_limiter: RateLimiter,
        db_pool,
        redis_client
    ):
        self.polygon = polygon_client
        self.rate_limiter = rate_limiter
        self.db = db_pool
        self.redis = redis_client

    async def update_asset(
        self,
        symbol: str,
        polygon_ticker: str,
        asset_type: str
    ) -> Optional[AssetSnapshot]:
        """
        Update a single asset from API.

        Args:
            symbol: Asset symbol (e.g., XAUUSD)
            polygon_ticker: Polygon ticker (e.g., C:XAUUSD)
            asset_type: Type (forex, crypto, index)

        Returns:
            AssetSnapshot if successful, None if the API returned no data;
            re-raises on API or database errors.
        """
        try:
            # Acquire rate limit token
            await self.rate_limiter.acquire()

            # Fetch from API
            snapshot = await self._fetch_snapshot(polygon_ticker, asset_type)
            if not snapshot:
                await self._update_sync_status(symbol, SyncStatus.FAILED, "No data")
                return None

            # Convert to our model
            asset_snapshot = self._convert_snapshot(symbol, snapshot)

            # Update database
            await self._update_database(symbol, asset_snapshot)

            # Update sync status
            await self._update_sync_status(symbol, SyncStatus.SUCCESS)

            # Publish event
            await self._publish_update(symbol, asset_snapshot)

            logger.info(f"Updated {symbol}: {asset_snapshot.last_price}")
            return asset_snapshot

        except Exception as e:
            logger.error(f"Error updating {symbol}: {e}")
            await self._update_sync_status(symbol, SyncStatus.FAILED, str(e))
            raise

    async def _fetch_snapshot(
        self,
        polygon_ticker: str,
        asset_type: str
    ) -> Optional[TickerSnapshot]:
        """Fetch snapshot from Polygon API"""
        try:
            if asset_type == "forex":
                return await self.polygon.get_snapshot_forex(polygon_ticker)
            elif asset_type == "crypto":
                return await self.polygon.get_snapshot_crypto(polygon_ticker)
            else:
                logger.warning(f"Unknown asset type: {asset_type}")
                return None
        except Exception as e:
            logger.error(f"API error for {polygon_ticker}: {e}")
            raise

    def _convert_snapshot(
        self,
        symbol: str,
        snapshot: TickerSnapshot
    ) -> AssetSnapshot:
        """Convert Polygon snapshot to our model"""
        spread = snapshot.ask - snapshot.bid if snapshot.ask and snapshot.bid else 0
        return AssetSnapshot(
            symbol=symbol,
            bid=snapshot.bid or 0,
            ask=snapshot.ask or 0,
            spread=spread,
            last_price=snapshot.last_price or 0,
            daily_open=snapshot.daily_open,
            daily_high=snapshot.daily_high,
            daily_low=snapshot.daily_low,
            daily_close=snapshot.daily_close,
            daily_volume=snapshot.daily_volume,
            timestamp=snapshot.timestamp or datetime.utcnow(),
            source="polygon"
        )

    async def _update_database(
        self,
        symbol: str,
        snapshot: AssetSnapshot
    ):
        """Update asset data in PostgreSQL"""
        async with self.db.acquire() as conn:
            # Update trading.symbols metadata
            metadata = {
                "last_update": {
                    "bid": snapshot.bid,
                    "ask": snapshot.ask,
                    "spread": snapshot.spread,
                    "last_price": snapshot.last_price,
                    "daily_open": snapshot.daily_open,
                    "daily_high": snapshot.daily_high,
                    "daily_low": snapshot.daily_low,
                    "daily_close": snapshot.daily_close,
                    "timestamp": snapshot.timestamp.isoformat(),
                    "source": snapshot.source
                }
            }
            await conn.execute("""
                UPDATE trading.symbols
                SET
                    updated_at = NOW(),
                    metadata = COALESCE(metadata, '{}'::jsonb) || $2::jsonb
                WHERE symbol = $1
            """, symbol, json.dumps(metadata))

    async def _update_sync_status(
        self,
        symbol: str,
        status: SyncStatus,
        error: Optional[str] = None
    ):
        """Update sync status in database"""
        async with self.db.acquire() as conn:
            await conn.execute("""
                INSERT INTO data_sources.data_sync_status
                    (ticker_id, provider_id, last_sync_timestamp,
                     sync_status, error_message, updated_at)
                SELECT
                    s.id,
                    (SELECT id FROM data_sources.api_providers
                     WHERE code = 'polygon' LIMIT 1),
                    NOW(),
                    $2,
                    $3,
                    NOW()
                FROM trading.symbols s
                WHERE s.symbol = $1
                ON CONFLICT (ticker_id, provider_id)
                DO UPDATE SET
                    last_sync_timestamp = NOW(),
                    sync_status = $2,
                    error_message = $3,
                    updated_at = NOW()
            """, symbol, status.value, error)

    async def _publish_update(
        self,
        symbol: str,
        snapshot: AssetSnapshot
    ):
        """Publish update event via Redis Pub/Sub"""
        channel = f"asset:update:{symbol}"
        message = json.dumps({
            "type": "price_update",
            "symbol": symbol,
            "data": {
                "bid": snapshot.bid,
                "ask": snapshot.ask,
                "spread": snapshot.spread,
                "last_price": snapshot.last_price,
                "timestamp": snapshot.timestamp.isoformat()
            }
        })
        await self.redis.publish(channel, message)
```
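The Pub/Sub payload built by `_publish_update` is plain JSON on a per-symbol channel, so its shape can be shown (and round-tripped) independently of Redis. A standalone sketch with illustrative prices (the helper function and values are not part of the service):

```python
import json
from datetime import datetime

def build_update_message(symbol: str, bid: float, ask: float,
                         last_price: float, ts: datetime):
    """Return (channel, message) shaped like _publish_update's output."""
    channel = f"asset:update:{symbol}"
    message = json.dumps({
        "type": "price_update",
        "symbol": symbol,
        "data": {
            "bid": bid,
            "ask": ask,
            "spread": ask - bid,
            "last_price": last_price,
            "timestamp": ts.isoformat(),
        },
    })
    return channel, message

channel, message = build_update_message(
    "XAUUSD", 2045.10, 2045.40, 2045.25, datetime(2026, 1, 4, 12, 0, 0)
)
print(channel)  # asset:update:XAUUSD
payload = json.loads(message)
print(payload["data"]["spread"])  # ~0.30 (subject to float rounding)
```

A WebSocket gateway can subscribe with a pattern such as `asset:update:*` and fan the messages out to clients without touching the database.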
### 3.6 Batch Orchestrator
```python
# src/services/batch_orchestrator.py
import asyncio
import logging
import uuid
from datetime import datetime
from typing import Dict, List, Any, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from config.priority_assets import PriorityAssetsConfig
from services.priority_queue import PriorityQueue
from services.asset_updater import AssetUpdater
from models.batch import BatchResult, AssetPriority

logger = logging.getLogger(__name__)


class BatchOrchestrator:
    """
    Orchestrates the batch update process.

    Responsibilities:
    - Schedule batch jobs
    - Coordinate priority updates
    - Manage asset queue
    - Track batch metrics
    """

    def __init__(
        self,
        asset_updater: AssetUpdater,
        priority_queue: PriorityQueue,
        batch_interval_minutes: int = 5
    ):
        self.updater = asset_updater
        self.queue = priority_queue
        self.interval = batch_interval_minutes
        self.scheduler = AsyncIOScheduler()
        self._is_running = False
        self._last_batch_result: Optional[BatchResult] = None

    async def start(self):
        """Start the batch orchestrator"""
        if self._is_running:
            logger.warning("Orchestrator already running")
            return

        logger.info("Starting Batch Orchestrator")

        # Priority batch - every 5 minutes
        self.scheduler.add_job(
            self._run_priority_batch,
            trigger=IntervalTrigger(minutes=self.interval),
            id="priority_batch",
            name="Priority Assets Batch (XAU, EURUSD, BTC)",
            replace_existing=True,
            max_instances=1,
            next_run_time=datetime.now()
        )

        # Queue processor - every 15 seconds
        self.scheduler.add_job(
            self._process_queue,
            trigger=IntervalTrigger(seconds=15),
            id="queue_processor",
            name="Process Secondary Assets Queue",
            replace_existing=True,
            max_instances=1
        )

        # Enqueue secondary assets - every 30 minutes
        self.scheduler.add_job(
            self._enqueue_secondary,
            trigger=IntervalTrigger(minutes=30),
            id="enqueue_secondary",
            name="Enqueue Secondary Assets",
            replace_existing=True,
            max_instances=1,
            next_run_time=datetime.now()
        )

        self.scheduler.start()
        self._is_running = True
        logger.info(
            f"Orchestrator started with {len(self.scheduler.get_jobs())} jobs"
        )

    async def stop(self):
        """Stop the orchestrator"""
        if not self._is_running:
            return
        self.scheduler.shutdown(wait=True)
        self._is_running = False
        logger.info("Orchestrator stopped")

    async def run_manual_batch(self) -> BatchResult:
        """Run batch manually (via API call)"""
        return await self._run_priority_batch()

    async def _run_priority_batch(self) -> BatchResult:
        """Execute priority batch job"""
        batch_id = str(uuid.uuid4())[:8]
        started_at = datetime.utcnow()
        logger.info(f"=== Priority Batch {batch_id} Started ===")

        updated = []
        failed = []
        api_calls = 0
        rate_waits = 0

        for asset in PriorityAssetsConfig.PRIORITY_ASSETS:
            try:
                snapshot = await self.updater.update_asset(
                    symbol=asset["symbol"],
                    polygon_ticker=asset["polygon_ticker"],
                    asset_type=asset["asset_type"]
                )
                api_calls += 1
                if snapshot:
                    updated.append(asset["symbol"])
                else:
                    failed.append({
                        "symbol": asset["symbol"],
                        "error": "No data returned"
                    })
            except Exception as e:
                api_calls += 1
                failed.append({
                    "symbol": asset["symbol"],
                    "error": str(e)
                })

        completed_at = datetime.utcnow()
        duration = int((completed_at - started_at).total_seconds() * 1000)

        result = BatchResult(
            batch_id=batch_id,
            started_at=started_at,
            completed_at=completed_at,
            duration_ms=duration,
            priority_updated=updated,
            priority_failed=failed,
            queue_processed=0,
            queue_remaining=self.queue.size,
            api_calls_used=api_calls,
            rate_limit_waits=rate_waits,
            errors=list(failed)
        )
        self._last_batch_result = result

        logger.info(
            f"Batch {batch_id} completed in {duration}ms: "
            f"{len(updated)} updated, {len(failed)} failed"
        )
        return result

    async def _process_queue(self):
        """Process queued secondary assets"""
        # Take up to 2 items per run; the shared RateLimiter enforces
        # the global API cap, so bursts simply wait for the next window.
        processed = 0
        max_items = 2

        for _ in range(max_items):
            item = await self.queue.dequeue()
            if not item:
                break
            try:
                snapshot = await self.updater.update_asset(
                    symbol=item.symbol,
                    polygon_ticker=item.polygon_ticker,
                    asset_type=item.asset_type
                )
                processed += 1
                if not snapshot:
                    await self.queue.requeue(item, "No data")
            except Exception as e:
                logger.warning(f"Failed to update {item.symbol}: {e}")
                await self.queue.requeue(item, str(e))

        if processed > 0:
            logger.debug(
                f"Queue: processed {processed}, remaining {self.queue.size}"
            )

    async def _enqueue_secondary(self):
        """Enqueue secondary assets for gradual update"""
        enqueued = 0
        for asset in PriorityAssetsConfig.SECONDARY_ASSETS:
            success = await self.queue.enqueue(
                symbol=asset["symbol"],
                polygon_ticker=asset["polygon_ticker"],
                asset_type=asset["asset_type"],
                priority=asset.get("priority", AssetPriority.MEDIUM)
            )
            if success:
                enqueued += 1
        if enqueued > 0:
            logger.info(f"Enqueued {enqueued} secondary assets")

    def get_status(self) -> Dict[str, Any]:
        """Get current orchestrator status"""
        jobs = [
            {
                "id": job.id,
                "name": job.name,
                "next_run": job.next_run_time.isoformat()
                if job.next_run_time else None,
            }
            for job in self.scheduler.get_jobs()
        ]
        return {
            "is_running": self._is_running,
            "jobs": jobs,
            "queue_size": self.queue.size,
            "last_batch": {
                "batch_id": self._last_batch_result.batch_id,
                "completed_at": self._last_batch_result.completed_at.isoformat(),
                "priority_updated": self._last_batch_result.priority_updated,
            } if self._last_batch_result else None,
            "priority_assets": PriorityAssetsConfig.get_priority_symbols(),
        }
```
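The three scheduling intervals imply an API budget that can be sanity-checked with simple arithmetic, assuming the 5 calls/minute limit used as the `RateLimiter` default:

```python
# Back-of-envelope API budget for the schedule above (5 calls/min limit).
LIMIT_PER_MIN = 5

priority_assets = 3        # XAU, EURUSD, BTC
priority_interval_min = 5  # priority batch fires every 5 minutes
avg_priority_per_min = priority_assets / priority_interval_min

queue_runs_per_min = 60 // 15                        # queue processor every 15 s
max_queue_attempts_per_min = queue_runs_per_min * 2  # up to 2 items per run

print(avg_priority_per_min)        # 0.6 calls/min on average for priority assets
print(max_queue_attempts_per_min)  # 8 attempts/min - above the raw limit
# The scheduler can attempt more calls than the limit allows; that is fine
# because every call goes through RateLimiter.acquire(), which is what
# actually enforces the hard 5 calls/min cap.
print(LIMIT_PER_MIN - avg_priority_per_min)  # ~4.4 calls/min left for the queue
```

This shows why `_process_queue` can be scheduled aggressively: the limiter, not the scheduler, is the real throttle.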
## 4. API Endpoints

### 4.1 Batch Routes
```python
# src/api/batch_routes.py
from typing import Dict, Any

from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks

from services.batch_orchestrator import BatchOrchestrator
from models.batch import BatchStatusResponse, BatchRunResponse
from api.dependencies import get_orchestrator, require_admin

router = APIRouter(prefix="/batch", tags=["Batch Operations"])


@router.get("/status", response_model=BatchStatusResponse)
async def get_batch_status(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator)
):
    """Get current batch job status"""
    status = orchestrator.get_status()
    rate_limiter = orchestrator.updater.rate_limiter
    return BatchStatusResponse(
        batch_id=status["last_batch"]["batch_id"] if status["last_batch"] else "",
        status="running" if status["is_running"] else "stopped",
        last_run=status["last_batch"]["completed_at"] if status["last_batch"] else None,
        next_run=status["jobs"][0]["next_run"] if status["jobs"] else None,
        priority_assets=status["priority_assets"],
        queue_size=status["queue_size"],
        api_calls_remaining=rate_limiter.get_remaining(),
    )


@router.post("/run", response_model=BatchRunResponse)
async def run_batch_manually(
    background_tasks: BackgroundTasks,
    orchestrator: BatchOrchestrator = Depends(get_orchestrator),
    _: None = Depends(require_admin)  # Admin only
):
    """
    Trigger batch job manually.

    Requires admin authentication.
    """
    try:
        result = await orchestrator.run_manual_batch()
        return BatchRunResponse(
            success=len(result.priority_failed) == 0,
            batch_id=result.batch_id,
            result={
                "duration_ms": result.duration_ms,
                "updated": result.priority_updated,
                "failed": result.priority_failed,
                "api_calls_used": result.api_calls_used,
            },
            message=f"Batch completed: {len(result.priority_updated)} updated"
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Batch execution failed: {str(e)}"
        )


@router.get("/queue/stats")
async def get_queue_stats(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator)
) -> Dict[str, Any]:
    """Get queue statistics"""
    return await orchestrator.queue.get_stats()


@router.get("/rate-limit")
async def get_rate_limit_status(
    orchestrator: BatchOrchestrator = Depends(get_orchestrator)
) -> Dict[str, Any]:
    """Get rate limiter status"""
    return orchestrator.updater.rate_limiter.get_stats()
```
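For reference, a `GET /batch/status` response would look roughly like this (all values illustrative, matching the `BatchStatusResponse` fields defined in `models/batch.py`):

```json
{
  "batch_id": "a1b2c3d4",
  "status": "running",
  "last_run": "2026-01-04T12:05:02Z",
  "next_run": "2026-01-04T12:10:00Z",
  "priority_assets": ["XAUUSD", "EURUSD", "BTCUSD"],
  "queue_size": 4,
  "api_calls_remaining": 2
}
```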
## 5. Sequence Diagram

### 5.1 Priority Batch Flow
```
┌──────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────┐      ┌────────┐
│Scheduler │      │Orchestrator │      │AssetUpdater │      │ Polygon │      │   DB   │
└────┬─────┘      └──────┬──────┘      └──────┬──────┘      └────┬────┘      └───┬────┘
     │                   │                    │                  │               │
     │   trigger batch   │                    │                  │               │
     │──────────────────▶│                    │                  │               │
     │                   │                    │                  │               │
     │ loop [XAU, EURUSD, BTC]                │                  │               │
     │                   │    update_asset    │                  │               │
     │                   │───────────────────▶│                  │               │
     │                   │                    │                  │               │
     │                   │                    │ rate_limit.acquire()             │
     │                   │                    │─────┐            │               │
     │                   │                    │◀────┘            │               │
     │                   │                    │                  │               │
     │                   │                    │   get_snapshot   │               │
     │                   │                    │─────────────────▶│               │
     │                   │                    │     snapshot     │               │
     │                   │                    │◀─────────────────│               │
     │                   │                    │                  │               │
     │                   │                    │ update trading.symbols           │
     │                   │                    │─────────────────────────────────▶│
     │                   │                    │                  │               │
     │                   │                    │ publish to Redis │               │
     │                   │                    │─────┐            │               │
     │                   │                    │◀────┘            │               │
     │                   │      snapshot      │                  │               │
     │                   │◀───────────────────│                  │               │
     │     end loop      │                    │                  │               │
     │                   │                    │                  │               │
     │    BatchResult    │                    │                  │               │
     │◀──────────────────│                    │                  │               │
     │                   │                    │                  │               │
```
## 6. Files to Create/Modify

### 6.1 New Files

| File | Purpose |
| --- | --- |
| src/config/priority_assets.py | Priority assets configuration |
| src/providers/rate_limiter.py | Improved rate limiter |
| src/services/priority_queue.py | Priority queue |
| src/services/asset_updater.py | Update service |
| src/services/batch_orchestrator.py | Batch orchestrator |
| src/api/batch_routes.py | API endpoints |
| src/models/batch.py | Data models |
| tests/test_priority_queue.py | Queue tests |
| tests/test_asset_updater.py | Updater tests |
| tests/test_batch_orchestrator.py | Orchestrator tests |
### 6.2 Files to Modify

| File | Change |
| --- | --- |
| src/main.py | Add BatchOrchestrator initialization |
| src/app.py | Register batch_routes |
| src/config/settings.py | Add batch configuration |
| .env.example | Add batch environment variables |
## 7. Implementation Plan

### Phase 1: Infrastructure (2 SP)

- Create data models (`models/batch.py`)
- Implement the improved RateLimiter
- Implement the PriorityQueue

### Phase 2: Core Services (5 SP)

- Implement AssetUpdater
- Implement BatchOrchestrator
- Configure priority assets

### Phase 3: API and Integration (3 SP)

- Create batch endpoints
- Integrate with main.py
- Update configuration

### Phase 4: Testing (3 SP)

- Unit tests for each component
- Integration tests
- Manual testing against the real API

**Total Story Points:** 13
## 8. External Dependencies

| Dependency | Version | Use |
| --- | --- | --- |
| apscheduler | ^3.10.0 | Job scheduling |
| asyncpg | ^0.29.0 | Async PostgreSQL driver |
| aioredis | ^2.0.0 | Async Redis client |
| aiohttp | ^3.9.0 | HTTP client (existing) |
## 9. References

Created by: Orquestador Agent
Date: 2026-01-04