--- id: "ET-INV-006" title: "Jobs Programados (Cron Jobs)" type: "Technical Specification" status: "Done" priority: "Alta" epic: "OQI-004" project: "trading-platform" version: "1.0.0" created_date: "2025-12-05" updated_date: "2026-01-04" --- # ET-INV-006: Jobs Programados (Cron Jobs) **Epic:** OQI-004 Cuentas de Inversión **Versión:** 1.0 **Fecha:** 2025-12-05 **Responsable:** Requirements-Analyst --- ## 1. Descripción Define los trabajos programados (cron jobs) necesarios para el módulo de inversión: - Cálculo diario de performance - Distribución mensual de utilidades - Procesamiento automático de retiros aprobados - Sincronización con ML Engine - Limpieza de datos temporales --- ## 2. Arquitectura de Jobs ``` ┌─────────────────────────────────────────────────────────────────┐ │ Scheduled Jobs Architecture │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────────────────┐ │ │ │ Job Scheduler │ │ │ │ (node-cron) │ │ │ └────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌────────────────────┼────────────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Daily │ │ Monthly │ │ Withdrawal │ │ │ │ Performance │ │ Distribution │ │ Processing │ │ │ │ (00:30) │ │ (1st 01:00) │ │ (*/30min) │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ Investment Database │ │ │ └──────────────────────────────────────────────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ ML Engine │ │ Stripe │ │ │ └──────────────┘ └──────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 3. 
Implementación de Jobs

### 3.1 Job Manager

```typescript
// src/jobs/job-manager.ts

import { DailyPerformanceJob } from './daily-performance.job';
import { MonthlyDistributionJob } from './monthly-distribution.job';
import { WithdrawalProcessingJob } from './withdrawal-processing.job';
import { SyncMLPerformanceJob } from './sync-ml-performance.job';
import { DataCleanupJob } from './data-cleanup.job';
import { logger } from '../utils/logger';

/** Operations the manager invokes on every registered job. */
interface ScheduledJob {
  start(): void;
  stop(): void;
  run(): Promise<void>;
}

/** One entry of the report returned by `JobManager.getJobStatuses()`. */
export interface JobStatus {
  /** Class name of the job, e.g. `DailyPerformanceJob`. */
  name: string;
  /** True when the manager started the job and has not stopped it since. */
  started: boolean;
}

/**
 * Owns the scheduled jobs of the investment module and starts, stops or
 * manually runs them as a group.
 */
export class JobManager {
  private readonly jobs: ScheduledJob[];

  // Names of the jobs whose start() succeeded and that were not stopped yet.
  private readonly startedJobs = new Set<string>();

  constructor() {
    // Instantiate every job; nothing is scheduled until start() is called.
    this.jobs = [
      new DailyPerformanceJob(),
      new MonthlyDistributionJob(),
      new WithdrawalProcessingJob(),
      new SyncMLPerformanceJob(),
      new DataCleanupJob(),
    ];
  }

  /**
   * Starts all scheduled jobs.
   *
   * A job that fails to start is logged and skipped so the remaining jobs
   * are still scheduled.
   */
  start(): void {
    logger.info('Starting all scheduled jobs');

    this.jobs.forEach((job) => {
      const name = job.constructor.name;
      try {
        job.start();
        this.startedJobs.add(name);
        logger.info(`Job started: ${name}`);
      } catch (error: any) {
        logger.error(`Failed to start job: ${name}`, {
          error: error.message,
        });
      }
    });

    // Report the jobs that actually started, not the number registered.
    logger.info(`Total jobs started: ${this.startedJobs.size}`);
  }

  /**
   * Stops all jobs. A job that fails to stop is logged and skipped.
   */
  stop(): void {
    logger.info('Stopping all scheduled jobs');

    this.jobs.forEach((job) => {
      const name = job.constructor.name;
      try {
        job.stop();
        this.startedJobs.delete(name);
        logger.info(`Job stopped: ${name}`);
      } catch (error: any) {
        logger.error(`Failed to stop job: ${name}`, {
          error: error.message,
        });
      }
    });
  }

  /**
   * Runs a job immediately, outside its schedule (useful for testing).
   *
   * @param jobName Class name of the job, e.g. `DailyPerformanceJob`.
   * @throws Error when no registered job has that class name; also
   *         propagates whatever the job's own run() rejects with.
   */
  async runJob(jobName: string): Promise<void> {
    const job = this.jobs.find((j) => j.constructor.name === jobName);

    if (!job) {
      throw new Error(`Job not found: ${jobName}`);
    }

    logger.info(`Running job manually: ${jobName}`);
    await job.run();
  }

  /**
   * Reports, for every registered job, whether this manager currently
   * considers it started. Backs the `/health/jobs` endpoint.
   */
  getJobStatuses(): JobStatus[] {
    return this.jobs.map((job) => ({
      name: job.constructor.name,
      started: this.startedJobs.has(job.constructor.name),
    }));
  }
}
```

---

### 3.2 Daily Performance Job

```typescript
// src/jobs/daily-performance.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from
'../utils/logger';

/**
 * Stores yesterday's performance (as reported by the ML Engine) for every
 * active account and refreshes the account balance accordingly.
 */
export class DailyPerformanceJob {
  private repository: InvestmentRepository;
  private mlEngineService: MLEngineService;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    this.mlEngineService = new MLEngineService();

    // Run every day at 00:30 UTC.
    this.job = new CronJob(
      '30 0 * * *',
      () => {
        // run() already logs and rethrows (for manual callers); swallow the
        // rejection here so a failed tick is not an unhandled rejection.
        this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Processes all active accounts once. Per-account failures are logged and
   * counted; they do not stop the remaining accounts.
   *
   * @throws Rethrows any error raised outside the per-account loop.
   */
  async run(): Promise<void> {
    const startTime = Date.now();
    logger.info('Starting daily performance calculation job');

    try {
      // Previous day, computed in UTC so it matches the UTC date string below
      // regardless of the server's local timezone.
      const yesterday = new Date();
      yesterday.setUTCDate(yesterday.getUTCDate() - 1);
      const dateStr = yesterday.toISOString().split('T')[0];

      // Fetch every active account.
      const accounts = await this.repository.getActiveAccounts();

      logger.info(`Processing ${accounts.length} active accounts`);

      let successCount = 0;
      let errorCount = 0;

      for (const account of accounts) {
        try {
          // Ask the ML Engine for that single day's performance.
          const performance = await this.mlEngineService.getPerformance(
            account.id,
            {
              start_date: dateStr,
              end_date: dateStr,
            }
          );

          if (performance.length === 0) {
            logger.warn('No performance data from ML Engine', {
              account_id: account.id,
              date: dateStr,
            });
            continue;
          }

          const dailyPerf = performance[0];

          // Accumulate on top of the latest stored record, if any.
          const previousPerf = await this.repository.getLatestPerformance(
            account.id,
            dateStr
          );

          const cumulativeReturn = previousPerf
            ? previousPerf.cumulative_return + dailyPerf.daily_return
            : dailyPerf.daily_return;

          // Guard against a zero initial investment, which would otherwise
          // persist Infinity/NaN as the percentage.
          const cumulativeReturnPercentage =
            account.initial_investment > 0
              ? (cumulativeReturn / account.initial_investment) * 100
              : 0;

          // Persist the daily performance record.
          await this.repository.createDailyPerformance({
            account_id: account.id,
            product_id: account.product_id,
            date: dateStr,
            opening_balance: dailyPerf.opening_balance,
            closing_balance: dailyPerf.closing_balance,
            daily_return: dailyPerf.daily_return,
            daily_return_percentage: dailyPerf.daily_return_percentage,
            cumulative_return: cumulativeReturn,
            cumulative_return_percentage: cumulativeReturnPercentage,
            trades_executed: dailyPerf.trades_executed,
            winning_trades: dailyPerf.winning_trades,
            losing_trades: dailyPerf.losing_trades,
            ml_agent_data: dailyPerf.ml_agent_data || null,
          });

          // Update the account's current balance.
          await this.repository.updateAccount(account.id, {
            current_balance: dailyPerf.closing_balance,
            total_return_percentage: cumulativeReturnPercentage,
          });

          successCount++;
        } catch (error: any) {
          errorCount++;
          logger.error('Error processing account performance', {
            account_id: account.id,
            error: error.message,
          });
        }
      }

      const duration = Date.now() - startTime;

      logger.info('Daily performance calculation completed', {
        total_accounts: accounts.length,
        successful: successCount,
        errors: errorCount,
        duration_ms: duration,
      });
    } catch (error: any) {
      logger.error('Daily performance job failed', { error: error.message });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Daily performance job scheduled: 00:30 UTC daily');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
```

---

### 3.3 Monthly Distribution Job

```typescript
// src/jobs/monthly-distribution.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';

/**
 * Computes the previous month's profit for every active account, applies the
 * product's performance fee and records the resulting distribution.
 */
export class MonthlyDistributionJob {
  private repository: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();

    // Run on the 1st of every month at 01:00 UTC.
    this.job = new CronJob(
      '0 1 1 * *',
      () => {
        // run() already logs and rethrows (for manual callers); swallow the
        // rejection here so a failed tick is not an unhandled rejection.
        this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Distributes profits for the previous calendar month (UTC). Accounts that
   * already have a distribution for the period, have no performance data, or
   * made no profit are skipped. Per-account failures are logged and counted.
   *
   * @throws Rethrows any error raised outside the per-account loop.
   */
  async run(): Promise<void> {
    const startTime = Date.now();
    logger.info('Starting monthly profit distribution job');

    try {
      // Previous calendar month, computed entirely in UTC. Building these
      // dates in local time and serializing them with toISOString() shifts
      // the boundaries (or the whole month) on hosts that are not on UTC.
      const now = new Date();
      const periodStart = new Date(
        Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1)
      );
      // Day 0 of the current month is the last day of the previous month.
      const periodEnd = new Date(
        Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 0)
      );
      const period = `${periodStart.getUTCFullYear()}-${String(
        periodStart.getUTCMonth() + 1
      ).padStart(2, '0')}`;
      const periodStartStr = periodStart.toISOString().split('T')[0];
      const periodEndStr = periodEnd.toISOString().split('T')[0];

      logger.info(`Processing distributions for period: ${period}`);

      // Fetch every active account.
      const accounts = await this.repository.getActiveAccounts();

      let successCount = 0;
      let errorCount = 0;
      let totalDistributed = 0;

      for (const account of accounts) {
        try {
          // Skip accounts already distributed for this period.
          const existingDistribution = await this.repository.getDistributionByPeriod(
            account.id,
            period
          );

          if (existingDistribution) {
            logger.warn('Distribution already exists for period', {
              account_id: account.id,
              period,
            });
            continue;
          }

          // Load the period's daily performance records.
          const performance = await this.repository.getPerformanceByDateRange(
            account.id,
            periodStartStr,
            periodEndStr
          );

          if (performance.length === 0) {
            logger.warn('No performance data for period', {
              account_id: account.id,
              period,
            });
            continue;
          }

          // Opening/closing balance of the period.
          // NOTE(review): assumes the repository returns rows in ascending
          // date order; confirm against getPerformanceByDateRange.
          const openingBalance = performance[0].opening_balance;
          const closingBalance = performance[performance.length - 1].closing_balance;
          const grossProfit = closingBalance - openingBalance;

          // Only distribute when there is a profit.
          if (grossProfit <= 0) {
            logger.info('No profit to distribute', {
              account_id: account.id,
              period,
              gross_profit: grossProfit,
            });
            continue;
          }

          // The product carries the performance fee percentage.
          const product = await this.repository.getProductById(account.product_id);
          const performanceFeePercentage = product.performance_fee_percentage;
          const performanceFeeAmount = (grossProfit * performanceFeePercentage) / 100;
          const netProfit = grossProfit - performanceFeeAmount;

          // Create the distribution record.
          const distribution = await this.repository.createDistribution({
            account_id: account.id,
            product_id: account.product_id,
            user_id: account.user_id,
            period,
            period_start: periodStartStr,
            period_end: periodEndStr,
            opening_balance: openingBalance,
            closing_balance: closingBalance,
            gross_profit: grossProfit,
            performance_fee_percentage: performanceFeePercentage,
            performance_fee_amount: performanceFeeAmount,
            net_profit: netProfit,
            status: 'pending',
          });

          // Create the distribution transaction.
          const transaction = await this.repository.createTransaction({
            account_id: account.id,
            user_id: account.user_id,
            type: 'profit_distribution',
            amount: netProfit,
            balance_before: closingBalance,
            balance_after: closingBalance, // Informational only; balance is unchanged.
            distribution_id: distribution.id,
            distribution_period: period,
            status: 'completed',
            processed_at: new Date(),
          });

          // Link the distribution to its transaction and mark it distributed.
          await this.repository.updateDistribution(distribution.id, {
            status: 'distributed',
            distributed_at: new Date(),
            transaction_id: transaction.id,
          });

          // Update the account's running total.
          await this.repository.updateAccount(account.id, {
            total_profit_distributed: account.total_profit_distributed + netProfit,
          });

          totalDistributed += netProfit;
          successCount++;

          logger.info('Profit distributed successfully', {
            account_id: account.id,
            period,
            gross_profit: grossProfit,
            fee_amount: performanceFeeAmount,
            net_profit: netProfit,
          });
        } catch (error: any) {
          errorCount++;
          logger.error('Error distributing profit', {
            account_id: account.id,
            period,
            error: error.message,
          });
        }
      }

      const duration = Date.now() - startTime;

      logger.info('Monthly distribution job completed', {
        period,
        total_accounts: accounts.length,
        successful: successCount,
        errors: errorCount,
        total_distributed: totalDistributed,
        duration_ms: duration,
      });
    } catch (error: any) {
      logger.error('Monthly distribution job failed', { error: error.message });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Monthly distribution job scheduled: 1st day of month at 01:00 UTC');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
```

---

### 3.4 Withdrawal Processing Job

```typescript
// src/jobs/withdrawal-processing.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { StripeService } from '../services/stripe/stripe.service';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from '../utils/logger';

/**
 * Executes withdrawal requests that are in the `approved` status: pays out,
 * debits the account, records the transaction and notifies the ML Engine.
 */
export class WithdrawalProcessingJob {
  private repository: InvestmentRepository;
  private stripeService: StripeService;
  private mlEngineService: MLEngineService;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    this.stripeService = new StripeService();
    this.mlEngineService = new MLEngineService();

    // Run every 30 minutes (UTC, like the other jobs).
    this.job = new CronJob(
      '*/30 * * * *',
      () => {
        // run() already logs and rethrows (for manual callers); swallow the
        // rejection here so a failed tick is not an unhandled rejection.
        this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Processes every approved withdrawal request once. A request that fails
   * is marked `rejected` and the loop moves on to the next one.
   *
   * @throws Rethrows any error raised outside the per-request loop.
   */
  async run(): Promise<void> {
    logger.info('Starting withdrawal processing job');

    try {
      // Fetch the approved withdrawal requests.
      const approvedRequests = await this.repository.getWithdrawalRequestsByStatus(
        'approved'
      );

      if (approvedRequests.length === 0) {
        logger.info('No approved withdrawal requests to process');
        return;
      }

      logger.info(`Processing ${approvedRequests.length} approved withdrawals`);

      let successCount = 0;
      let errorCount = 0;

      for (const request of approvedRequests) {
        // Declared outside the try so the error handler can report a payout
        // that was already created before the failure.
        let payoutId: string | null = null;

        try {
          const account = await this.repository.getAccountById(request.account_id);

          if (!account) {
            logger.error('Account not found', { account_id: request.account_id });
            continue;
          }

          // Make sure the balance covers the withdrawal.
          if (account.current_balance < request.amount) {
            logger.error('Insufficient balance for withdrawal', {
              request_id: request.id,
              balance: account.current_balance,
              requested: request.amount,
            });

            await this.repository.updateWithdrawalRequest(request.id, {
              status: 'rejected',
              rejection_reason: 'Insufficient balance',
            });

            continue;
          }

          // Pay out according to the withdrawal method.
          if (request.withdrawal_method === 'stripe_payout') {
            // Create the payout in Stripe.
            const payout = await this.stripeService.createPayout({
              amount: request.amount,
              destination: request.destination_details.bank_account_id,
              metadata: {
                withdrawal_request_id: request.id,
                account_id: account.id,
                user_id: account.user_id,
              },
            });

            payoutId = payout.id;
          }

          // Debit the account.
          const newBalance = account.current_balance - request.amount;

          await this.repository.updateAccount(account.id, {
            current_balance: newBalance,
            total_withdrawn: account.total_withdrawn + request.amount,
          });

          // Record the transaction.
          const transaction = await this.repository.createTransaction({
            account_id: account.id,
            user_id: account.user_id,
            type: 'withdrawal',
            amount: request.amount,
            balance_before: account.current_balance,
            balance_after: newBalance,
            withdrawal_request_id: request.id,
            withdrawal_method: request.withdrawal_method,
            withdrawal_destination_id: payoutId || request.destination_details.bank_account_id,
            status: 'completed',
            processed_at: new Date(),
          });

          // Mark the request as completed.
          await this.repository.updateWithdrawalRequest(request.id, {
            status: 'completed',
            processed_at: new Date(),
            transaction_id: transaction.id,
          });

          // Notify the ML Engine. The withdrawal is already completed at this
          // point, so a notification failure must not flip it to 'rejected'.
          try {
            await this.mlEngineService.notifyWithdrawal({
              account_id: account.id,
              amount: request.amount,
              new_balance: newBalance,
            });
          } catch (notifyError: any) {
            logger.error('Failed to notify ML Engine about withdrawal', {
              request_id: request.id,
              account_id: account.id,
              error: notifyError.message,
            });
          }

          successCount++;

          logger.info('Withdrawal processed successfully', {
            request_id: request.id,
            account_id: account.id,
            amount: request.amount,
            new_balance: newBalance,
          });
        } catch (error: any) {
          errorCount++;
          logger.error('Error processing withdrawal', {
            request_id: request.id,
            payout_id: payoutId,
            error: error.message,
          });

          // Mark the request as rejected so it is not picked up again.
          // NOTE(review): when payout_id is set the money may already have
          // left via Stripe; such requests need manual reconciliation.
          try {
            await this.repository.updateWithdrawalRequest(request.id, {
              status: 'rejected',
              rejection_reason: `Processing error: ${error.message}`,
            });
          } catch (updateError: any) {
            // Do not let a failed status update abort the rest of the batch.
            logger.error('Failed to mark withdrawal request as rejected', {
              request_id: request.id,
              error: updateError.message,
            });
          }
        }
      }

      logger.info('Withdrawal processing job completed', {
        total_requests: approvedRequests.length,
        successful: successCount,
        errors: errorCount,
      });
    } catch (error: any) {
      logger.error('Withdrawal processing job failed', { error: error.message });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Withdrawal processing job scheduled: every 30 minutes');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
```

---

### 3.5 Data Cleanup Job

```typescript
// src/jobs/data-cleanup.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';

/** Weekly housekeeping: purges old failed transactions and archives old closed accounts. */
export class DataCleanupJob {
  private repository: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();

    // Run weekly, Sundays at 03:00 UTC. The explicit 'UTC' timezone is what
    // makes the schedule match the time announced in start().
    this.job = new CronJob(
      '0 3 * * 0',
      () => {
        // run() already logs and rethrows (for manual callers); swallow the
        // rejection here so a failed tick is not an unhandled rejection.
        this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Runs the cleanup once.
   *
   * @throws Rethrows any repository error after logging it.
   */
  async run(): Promise<void> {
    logger.info('Starting data cleanup job');

    try {
      // Delete failed transactions older than 90 days.
      const deletedTransactions = await this.repository.deleteOldFailedTransactions(90);

      // Archive closed accounts older than 365 days.
      const archivedAccounts = await this.repository.archiveOldClosedAccounts(365);

      // Purge logs older than 180 days (not enabled yet).
      // await this.repository.deleteOldLogs(180);

      logger.info('Data cleanup job completed', {
        deleted_transactions: deletedTransactions,
        archived_accounts: archivedAccounts,
      });
    } catch (error: any) {
      logger.error('Data cleanup job failed', { error: error.message });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Data cleanup job scheduled: Sundays at 03:00 UTC');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
```

---

## 4.
Inicialización en App

```typescript
// src/server.ts

import express from 'express';
import { JobManager } from './jobs/job-manager';
import { logger } from './utils/logger';

const app = express();
const jobManager = new JobManager();

// Honour the ENABLE_CRON_JOBS setting from section 5.1. Jobs stay enabled
// unless the variable is explicitly set to 'false', so deployments that do
// not define it keep the previous behaviour.
const cronJobsEnabled = process.env.ENABLE_CRON_JOBS !== 'false';

// Start the HTTP server.
const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
  logger.info(`Server running on port ${PORT}`);

  // Start the scheduled jobs once the server is listening.
  if (cronJobsEnabled) {
    jobManager.start();
  } else {
    logger.info('Scheduled jobs disabled (ENABLE_CRON_JOBS=false)');
  }
});

// Graceful shutdown
process.on('SIGTERM', () => {
  logger.info('SIGTERM received, shutting down gracefully');

  // Stop the jobs first so no new tick fires while the server drains.
  jobManager.stop();

  // Close the HTTP server, then exit.
  server.close(() => {
    logger.info('Server closed');
    process.exit(0);
  });
});
```

---

## 5. Configuración

### 5.1 Variables de Entorno

```bash
# Cron Jobs
ENABLE_CRON_JOBS=true
CRON_TIMEZONE=UTC

# Job Settings
# Quoted: the expressions contain spaces and '*', which a shell would
# otherwise split and glob-expand.
DAILY_PERFORMANCE_CRON="30 0 * * *"
MONTHLY_DISTRIBUTION_CRON="0 1 1 * *"
WITHDRAWAL_PROCESSING_CRON="*/30 * * * *"
DATA_CLEANUP_CRON="0 3 * * 0"
```

---

## 6. Monitoreo y Logs

### 6.1 Logging

```typescript
// Detailed log entries for each job
logger.info('Job started', {
  job_name: 'DailyPerformanceJob',
  scheduled_time: '00:30 UTC',
  trigger: 'scheduled',
});

logger.info('Job completed', {
  job_name: 'DailyPerformanceJob',
  duration_ms: 12500,
  processed_items: 150,
  success_count: 148,
  error_count: 2,
});
```

### 6.2 Health Checks

```typescript
// Endpoint that reports the state of the jobs.
// NOTE(review): requires JobManager to expose getJobStatuses().
app.get('/health/jobs', async (_req, res) => {
  const jobStatuses = await jobManager.getJobStatuses();
  res.json(jobStatuses);
});
```

---

## 7.
Testing

### 7.1 Manual Job Execution

```typescript
// Script to run a single job manually.
// scripts/run-job.ts

import { JobManager } from '../src/jobs/job-manager';

// First CLI argument: class name of the job, e.g. DailyPerformanceJob.
const jobName = process.argv[2];

if (!jobName) {
  console.error('Usage: npm run job <JobName>');
  process.exit(1);
}

const jobManager = new JobManager();

// Exit code 0 on success, 1 when the job is unknown or its run() rejects.
jobManager
  .runJob(jobName)
  .then(() => {
    console.log(`Job ${jobName} completed successfully`);
    process.exit(0);
  })
  .catch((error: unknown) => {
    console.error(`Job ${jobName} failed:`, error);
    process.exit(1);
  });
```

```bash
# Run manually
npm run job DailyPerformanceJob
npm run job MonthlyDistributionJob
```

---

## 8. Referencias

- node-cron Documentation
- Cron Expression Guide
- Job Scheduling Best Practices
- Error Handling in Background Jobs