--- id: "ADR-004" title: "Sistema de Cola de Tareas" type: "ADR" status: "Accepted" date: "2025-12-08" deciders: ["Architecture Team"] tags: ["queue", "bullmq", "redis", "async", "workers"] project: "platform_marketing_content" created_date: "2025-12-08" updated_date: "2026-01-04" --- # ADR-004: Sistema de Cola de Tareas **Fecha:** 2025-12-08 **Estado:** Aceptado --- ## Contexto La generación de imágenes y otras tareas pesadas requieren: 1. Procesamiento asíncrono (no bloquear requests) 2. Manejo de prioridades 3. Reintentos automáticos 4. Monitoreo de estado 5. Escalabilidad (múltiples workers) --- ## Decisión ### Sistema: Bull/BullMQ con Redis Usaremos **BullMQ** (versión moderna de Bull) sobre **Redis** para gestionar la cola de tareas. ### Arquitectura ``` ┌──────────────┐ │ API │ │ Server │ └──────┬───────┘ │ enqueue ▼ ┌──────────────┐ ┌──────────────┐ │ Redis │◀───▶│ Worker 1 │──▶ ComfyUI │ Queue │ └──────────────┘ │ │ ┌──────────────┐ │ │◀───▶│ Worker 2 │──▶ ComfyUI │ │ └──────────────┘ └──────────────┘ ┌──────────────┐ │ Worker N │──▶ Text Gen └──────────────┘ ``` ### Colas Definidas ```typescript // Colas por tipo de tarea const queues = { 'generation:image': { priority: true, concurrency: 2, // Limitado por GPU VRAM }, 'generation:text': { concurrency: 10, // API calls pueden ser paralelos }, 'generation:training': { concurrency: 1, // Un training a la vez limiter: { max: 5, duration: 86400000, // 5 por día } }, 'assets:processing': { concurrency: 5, // Thumbnails, conversiones }, 'notifications:email': { concurrency: 5, } }; ``` ### Prioridades ```typescript enum JobPriority { URGENT = 1, // Admin/sistema HIGH = 3, // Usuarios premium NORMAL = 5, // Usuarios regulares LOW = 7, // Batch/background BACKGROUND = 10 // Mantenimiento } ``` ### Configuración de Reintentos ```typescript const defaultJobOptions = { attempts: 3, backoff: { type: 'exponential', delay: 5000, // 5s, 10s, 20s }, removeOnComplete: { age: 3600, // 1 hora count: 1000, // Últimos 1000 }, removeOnFail: { age: 86400, // 24 horas } }; ``` --- ## Implementación ### Productor (API) ```typescript @Injectable() export class GenerationService { constructor( @InjectQueue('generation:image') private imageQueue: Queue, ) {} async createImageJob(data: CreateImageJobDto, userId: string) { const job = await this.imageQueue.add('generate', { ...data, userId, tenantId: getCurrentTenant(), }, { priority: this.calculatePriority(userId), jobId: uuid(), }); return { jobId: job.id }; } } ``` ### Consumidor (Worker) ```typescript @Processor('generation:image') export class ImageGenerationProcessor { @Process('generate') async handleGeneration(job: Job) { await job.updateProgress(10); // Llamar a ComfyUI const result = await this.comfyui.execute(job.data); await job.updateProgress(90); // Guardar resultado await this.assetsService.createFromGeneration(result); await job.updateProgress(100); return { assetId: result.id }; } @OnWorkerEvent('failed') onFailed(job: Job, error: Error) { this.logger.error(`Job ${job.id} failed: ${error.message}`); // Notificar al usuario si excede reintentos } } ``` ### Monitoreo con Bull Board ```typescript // main.ts import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ExpressAdapter } from '@bull-board/express'; const serverAdapter = new ExpressAdapter(); createBullBoard({ queues: [ new BullMQAdapter(imageQueue), new BullMQAdapter(textQueue), ], serverAdapter, }); app.use('/admin/queues', serverAdapter.getRouter()); ``` --- ## Consecuencias ### Positivas - **Desacoplado:** API no espera por generación - **Resiliente:** Jobs sobreviven reinicios - **Escalable:** Agregar workers es trivial - **Observable:** Bull Board para monitoreo - **Probado:** Bull es muy estable y popular - **Simple:** Redis ya lo usamos para cache ### Negativas - **Redis es SPOF:** Requiere replicación para HA - **Memoria:** Jobs grandes consumen RAM de Redis - **Complejidad:** Debugging de jobs distribuidos - **Orden:** No garantizado estricto (prioridades son best-effort) ### Mitigaciones 1. **Redis Sentinel** para alta disponibilidad 2. **Payloads pequeños** (referencias, no datos) 3. **Logging estructurado** con correlation IDs 4. **Timeouts** para evitar jobs colgados --- ## Alternativas Consideradas ### 1. RabbitMQ **Pros:** - Más features de messaging - Mejor para pub/sub complejo **Contras:** - Infraestructura adicional - Más complejo de operar - No tan integrado con Node **Rechazo:** Overkill, ya tenemos Redis ### 2. AWS SQS **Pros:** - Managed, sin operación - Escalabilidad infinita **Contras:** - Vendor lock-in - Sin prioridades nativas - Costos variables **Rechazo:** Preferimos portabilidad ### 3. PostgreSQL SKIP LOCKED **Pros:** - Sin infraestructura adicional - Transaccional **Contras:** - Performance limitada - Sin prioridades elegantes - Menos features **Rechazo:** No escala para alta carga --- ## Referencias - [BullMQ Documentation](https://docs.bullmq.io/) - [Bull Board](https://github.com/felixmosh/bull-board) - [Redis Persistence](https://redis.io/docs/manual/persistence/) --- **Documento generado por:** Requirements-Analyst **Fecha:** 2025-12-08