--- id: "PLAN-BATCH-001" title: "Plan de Desarrollo - Batch de Actualizacion de Activos" type: "Development Plan" project: "trading-platform" version: "1.0.0" status: "Aprobado" created_date: "2026-01-04" updated_date: "2026-01-04" --- # Plan de Desarrollo: Batch de Actualizacion de Activos con Priorizacion ## Resumen Ejecutivo | Campo | Valor | |-------|-------| | **Objetivo** | Implementar proceso batch de actualizacion de activos con priorizacion | | **Modulo** | Data Service (apps/data-service) | | **Story Points Totales** | 13 SP | | **Fases** | 4 | | **Perfiles Requeridos** | Backend (Python), Database | | **Dependencias** | PolygonClient existente, PostgreSQL, Redis | --- ## 1. Alcance del Desarrollo ### 1.1 Funcionalidades a Implementar | ID | Funcionalidad | Prioridad | SP | |----|---------------|-----------|-----| | F1 | Cola de prioridad para activos | Alta | 2 | | F2 | Rate limiter mejorado | Alta | 2 | | F3 | Servicio de actualizacion de activos | Alta | 3 | | F4 | Orquestador del batch | Alta | 3 | | F5 | Endpoints de API | Media | 2 | | F6 | Tests unitarios e integracion | Media | 1 | ### 1.2 Fuera de Alcance - Migracion a cuenta de pago de Polygon.io - WebSocket streaming de precios - Dashboard de monitoreo en frontend - Integracion con alertas de Slack/Email --- ## 2. Fases de Desarrollo ### Fase 1: Infraestructura Base (2 SP) **Objetivo:** Crear modelos de datos y componentes base **Tareas:** | # | Tarea | Archivo | Esfuerzo | |---|-------|---------|----------| | 1.1 | Crear modelos de datos Pydantic | `src/models/batch.py` | 0.5 SP | | 1.2 | Implementar RateLimiter mejorado | `src/providers/rate_limiter.py` | 0.5 SP | | 1.3 | Implementar PriorityQueue | `src/services/priority_queue.py` | 0.5 SP | | 1.4 | Crear configuracion de activos | `src/config/priority_assets.py` | 0.5 SP | **Entregables:** - Modelos de datos para batch, cola, snapshots - Rate limiter con token bucket - Cola de prioridad thread-safe - Configuracion de XAU, EURUSD, BTC **Criterios de Aceptacion:** ```python # Test rate limiter limiter = RateLimiter(calls_per_minute=5) assert limiter.get_remaining() == 5 await limiter.acquire() assert limiter.get_remaining() == 4 # Test priority queue queue = PriorityQueue() await queue.enqueue("ETH", "X:ETH", "crypto", AssetPriority.LOW) await queue.enqueue("XAU", "C:XAU", "forex", AssetPriority.CRITICAL) item = await queue.dequeue() assert item.symbol == "XAU" # Critical first ``` --- ### Fase 2: Servicios Core (5 SP) **Objetivo:** Implementar logica de negocio principal **Tareas:** | # | Tarea | Archivo | Esfuerzo | |---|-------|---------|----------| | 2.1 | Implementar AssetUpdater | `src/services/asset_updater.py` | 2 SP | | 2.2 | Implementar BatchOrchestrator | `src/services/batch_orchestrator.py` | 2 SP | | 2.3 | Integracion con PolygonClient | - | 0.5 SP | | 2.4 | Integracion con PostgreSQL | - | 0.5 SP | **Entregables:** - Servicio para actualizar activos individuales - Orquestador con APScheduler - Publicacion de eventos Redis **Criterios de Aceptacion:** ```python # Test asset update updater = AssetUpdater(polygon, rate_limiter, db, redis) snapshot = await updater.update_asset("XAUUSD", "C:XAUUSD", "forex") assert snapshot is not None assert snapshot.bid > 0 assert snapshot.ask > snapshot.bid # Test batch orchestrator orchestrator = BatchOrchestrator(updater, queue) result = await orchestrator.run_manual_batch() assert "XAUUSD" in result.priority_updated assert "EURUSD" in result.priority_updated assert "BTCUSD" in result.priority_updated ``` --- ### Fase 3: API e Integracion (3 SP) **Objetivo:** Exponer endpoints y conectar al sistema **Tareas:** | # | Tarea | Archivo | Esfuerzo | |---|-------|---------|----------| | 3.1 | Crear endpoints REST | `src/api/batch_routes.py` | 1 SP | | 3.2 | Modificar main.py | `src/main.py` | 0.5 SP | | 3.3 | Actualizar configuracion | `src/config/settings.py` | 0.5 SP | | 3.4 | Documentar API | `docs/api/batch.md` | 0.5 SP | | 3.5 | Actualizar .env.example | `.env.example` | 0.5 SP | **Entregables:** - Endpoints: `/batch/status`, `/batch/run`, `/batch/queue/stats` - Inicializacion automatica al arrancar - Variables de entorno documentadas **Endpoints:** ``` GET /api/data/batch/status - Estado del batch POST /api/data/batch/run - Ejecutar batch manual (admin) GET /api/data/batch/queue/stats - Estadisticas de cola GET /api/data/batch/rate-limit - Estado del rate limiter ``` --- ### Fase 4: Testing y Validacion (3 SP) **Objetivo:** Asegurar calidad y funcionamiento correcto **Tareas:** | # | Tarea | Archivo | Esfuerzo | |---|-------|---------|----------| | 4.1 | Tests unitarios - PriorityQueue | `tests/test_priority_queue.py` | 0.5 SP | | 4.2 | Tests unitarios - RateLimiter | `tests/test_rate_limiter.py` | 0.5 SP | | 4.3 | Tests unitarios - AssetUpdater | `tests/test_asset_updater.py` | 0.5 SP | | 4.4 | Tests integracion - Batch | `tests/test_batch_integration.py` | 1 SP | | 4.5 | Pruebas manuales con API real | - | 0.5 SP | **Cobertura Minima:** 80% **Casos de Prueba Criticos:** 1. **Rate Limiting:** Verificar que no excede 5 calls/min 2. **Priorizacion:** XAU, EURUSD, BTC siempre primero 3. **Cola:** Items procesados en orden de prioridad 4. **Reintentos:** Activos fallidos se reencolan max 3 veces 5. **Base de Datos:** trading.symbols actualizada correctamente 6. **Redis Events:** Eventos publicados en canales correctos --- ## 3. Secuencia de Implementacion ``` Semana 1: ├── Dia 1-2: Fase 1 (Infraestructura) │ ├── Modelos de datos │ ├── RateLimiter │ ├── PriorityQueue │ └── Configuracion de activos │ ├── Dia 3-4: Fase 2 (Servicios Core) │ ├── AssetUpdater │ └── BatchOrchestrator │ └── Dia 5: Fase 3 (API) ├── Endpoints REST └── Integracion main.py Semana 2: ├── Dia 1-2: Fase 4 (Testing) │ ├── Tests unitarios │ └── Tests integracion │ └── Dia 3: Validacion Final ├── Pruebas con API real └── Documentacion ``` --- ## 4. Configuracion del Entorno ### 4.1 Variables de Entorno ```bash # .env - Batch Configuration # Polygon.io API POLYGON_API_KEY=f09bA2V7OG7bHn4HxIT6Xs45ujg_pRXk POLYGON_BASE_URL=https://api.polygon.io POLYGON_RATE_LIMIT=5 POLYGON_TIER=free # Batch Settings BATCH_INTERVAL_MINUTES=5 BATCH_AUTO_START=true BATCH_PRIORITY_ENABLED=true # Queue Settings QUEUE_MAX_SIZE=1000 QUEUE_RETRY_MAX=3 QUEUE_RETRY_DELAY_SECONDS=60 # Redis REDIS_URL=redis://localhost:6379/0 # Database DB_HOST=localhost DB_PORT=5432 DB_NAME=trading_data DB_USER=trading_user DB_PASSWORD=trading_dev_2025 ``` ### 4.2 Dependencias Python ```txt # requirements.txt - Nuevas dependencias apscheduler>=3.10.0 # Job scheduling aioredis>=2.0.0 # Redis async (si no existe) ``` --- ## 5. Riesgos y Mitigacion | Riesgo | Probabilidad | Impacto | Mitigacion | |--------|--------------|---------|------------| | API Polygon no disponible | Baja | Alto | Implementar circuit breaker, cache fallback | | Rate limit excedido | Media | Medio | Rate limiter robusto con espera automatica | | Datos inconsistentes en DB | Baja | Alto | Transacciones atomicas, validacion de datos | | Scheduler no arranca | Baja | Alto | Health check, reinicio automatico | | Redis no disponible | Baja | Medio | Graceful degradation, logs sin eventos | --- ## 6. Metricas de Exito | Metrica | Objetivo | Medicion | |---------|----------|----------| | **Uptime del Batch** | > 99.5% | Logs de ejecucion | | **Latencia por Activo** | < 2s | Metricas internas | | **Tasa de Exito Priority** | > 99% | Conteo updated/failed | | **Cobertura de Tests** | > 80% | pytest-cov | | **Tiempo de Ciclo Completo** | < 60s | Duracion del batch | --- ## 7. Comandos de Desarrollo ```bash # Instalar dependencias cd apps/data-service pip install -r requirements.txt # Ejecutar tests pytest tests/ -v --cov=src # Ejecutar servicio en desarrollo python src/main.py # Verificar batch manualmente curl -X POST http://localhost:3084/api/data/batch/run \ -H "Authorization: Bearer $ADMIN_TOKEN" # Ver estado del batch curl http://localhost:3084/api/data/batch/status # Ver estadisticas de cola curl http://localhost:3084/api/data/batch/queue/stats ``` --- ## 8. Checklist de Entrega ### Fase 1 - [ ] `src/models/batch.py` creado y documentado - [ ] `src/providers/rate_limiter.py` implementado - [ ] `src/services/priority_queue.py` implementado - [ ] `src/config/priority_assets.py` configurado ### Fase 2 - [ ] `src/services/asset_updater.py` implementado - [ ] `src/services/batch_orchestrator.py` implementado - [ ] Integracion con PolygonClient funcionando - [ ] Actualizacion de base de datos funcionando ### Fase 3 - [ ] `src/api/batch_routes.py` implementado - [ ] `src/main.py` modificado para iniciar orchestrator - [ ] `.env.example` actualizado - [ ] Endpoints probados manualmente ### Fase 4 - [ ] Tests unitarios pasando (> 80% cobertura) - [ ] Tests de integracion pasando - [ ] Prueba con API real de Polygon exitosa - [ ] Documentacion actualizada ### Final - [ ] Code review completado - [ ] Merge a develop - [ ] Despliegue en ambiente de desarrollo --- ## 9. Perfiles y Responsabilidades | Perfil | Responsabilidad | Tareas | |--------|-----------------|--------| | **Backend Python** | Implementacion core | Fases 1, 2, 3 | | **Database** | Verificacion de queries | Revision de updates | | **QA/Testing** | Tests y validacion | Fase 4 | | **DevOps** | Configuracion de entorno | Variables, despliegue | --- ## 10. Referencias - [RF-DATA-001: Requerimiento Funcional](../requerimientos/RF-DATA-001-sincronizacion-batch-activos.md) - [ET-DATA-001: Especificacion Tecnica](../especificaciones/ET-DATA-001-arquitectura-batch-priorizacion.md) - [INT-DATA-003: Documento de Integracion](../integraciones/INT-DATA-003-batch-actualizacion-activos.md) - [Polygon.io API Documentation](https://polygon.io/docs) --- **Aprobado por:** Orquestador Agent **Fecha de Aprobacion:** 2026-01-04 **Proxima Revision:** Al completar Fase 1