trading-platform/docs/02-definicion-modulos/OQI-004-investment-accounts/especificaciones/ET-INV-006-cron.md

id: ET-INV-006
title: Scheduled Jobs (Cron Jobs)
type: Technical Specification
status: Done
priority: High
epic: OQI-004
project: trading-platform
version: 1.0.0
created_date: 2025-12-05
updated_date: 2026-01-04

ET-INV-006: Scheduled Jobs (Cron Jobs)

Epic: OQI-004 Investment Accounts
Version: 1.0
Date: 2025-12-05
Owner: Requirements-Analyst


1. Description

Defines the scheduled jobs (cron jobs) required by the investment module:

  • Daily performance calculation
  • Monthly profit distribution
  • Automatic processing of approved withdrawals
  • Synchronization with the ML Engine
  • Cleanup of temporary data

2. Job Architecture

┌─────────────────────────────────────────────────────────────────┐
│                      Scheduled Jobs Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌────────────────────────────────────────────────────────┐     │
│  │                    Job Scheduler                       │     │
│  │                    (node-cron)                         │     │
│  └────────────────────────────────────────────────────────┘     │
│                              │                                   │
│         ┌────────────────────┼────────────────────┐             │
│         │                    │                    │             │
│         ▼                    ▼                    ▼             │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐        │
│  │   Daily      │   │   Monthly    │   │  Withdrawal  │        │
│  │ Performance  │   │ Distribution │   │  Processing  │        │
│  │   (00:30)    │   │  (1st 01:00) │   │   (*/30min)  │        │
│  └──────────────┘   └──────────────┘   └──────────────┘        │
│         │                    │                    │             │
│         ▼                    ▼                    ▼             │
│  ┌──────────────────────────────────────────────────────┐      │
│  │              Investment Database                     │      │
│  └──────────────────────────────────────────────────────┘      │
│         │                                        │              │
│         ▼                                        ▼              │
│  ┌──────────────┐                        ┌──────────────┐      │
│  │  ML Engine   │                        │    Stripe    │      │
│  └──────────────┘                        └──────────────┘      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

3. Job Implementation

3.1 Job Manager

// src/jobs/job-manager.ts

import { DailyPerformanceJob } from './daily-performance.job';
import { MonthlyDistributionJob } from './monthly-distribution.job';
import { WithdrawalProcessingJob } from './withdrawal-processing.job';
import { SyncMLPerformanceJob } from './sync-ml-performance.job';
import { DataCleanupJob } from './data-cleanup.job';
import { logger } from '../utils/logger';

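/** Minimal contract that every scheduled job in this module exposes. */
interface ScheduledJob {
  start(): void;
  stop(): void;
  run(): Promise<void>;
}

export class JobManager {
  private jobs: ScheduledJob[] = [];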

  constructor() {
    // Instantiate all jobs
    this.jobs = [
      new DailyPerformanceJob(),
      new MonthlyDistributionJob(),
      new WithdrawalProcessingJob(),
      new SyncMLPerformanceJob(),
      new DataCleanupJob(),
    ];
  }

  /**
   * Starts all scheduled jobs
   */
  start(): void {
    logger.info('Starting all scheduled jobs');

    this.jobs.forEach((job) => {
      try {
        job.start();
        logger.info(`Job started: ${job.constructor.name}`);
      } catch (error: any) {
        logger.error(`Failed to start job: ${job.constructor.name}`, {
          error: error.message,
        });
      }
    });

    logger.info(`Total jobs started: ${this.jobs.length}`);
  }

  /**
   * Stops all jobs
   */
  stop(): void {
    logger.info('Stopping all scheduled jobs');

    this.jobs.forEach((job) => {
      try {
        job.stop();
        logger.info(`Job stopped: ${job.constructor.name}`);
      } catch (error: any) {
        logger.error(`Failed to stop job: ${job.constructor.name}`, {
          error: error.message,
        });
      }
    });
  }

  /**
   * Runs a job manually by class name (useful for testing)
   */
  async runJob(jobName: string): Promise<void> {
    const job = this.jobs.find((j) => j.constructor.name === jobName);

    if (!job) {
      throw new Error(`Job not found: ${jobName}`);
    }

    logger.info(`Running job manually: ${jobName}`);
    await job.run();
  }
}

3.2 Daily Performance Job

// src/jobs/daily-performance.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from '../utils/logger';

export class DailyPerformanceJob {
  private repository: InvestmentRepository;
  private mlEngineService: MLEngineService;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    this.mlEngineService = new MLEngineService();

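    // Run daily at 00:30 UTC
    this.job = new CronJob(
      '30 0 * * *',
      // run() logs and rethrows; catch here so a failed tick cannot become an unhandled rejection
      () => void this.run().catch(() => undefined),
      null,
      false,
      'UTC'
    );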
  }

  async run(): Promise<void> {
    const startTime = Date.now();
    logger.info('Starting daily performance calculation job');

    try {
      // Get the previous day's date in UTC, matching the job's UTC schedule
      const yesterday = new Date();
      yesterday.setUTCDate(yesterday.getUTCDate() - 1);
      const dateStr = yesterday.toISOString().split('T')[0];

      // Fetch all active accounts
      const accounts = await this.repository.getActiveAccounts();
      logger.info(`Processing ${accounts.length} active accounts`);

      let successCount = 0;
      let errorCount = 0;

      for (const account of accounts) {
        try {
          // Fetch the day's performance from the ML Engine
          const performance = await this.mlEngineService.getPerformance(
            account.id,
            {
              start_date: dateStr,
              end_date: dateStr,
            }
          );

          if (performance.length === 0) {
            logger.warn('No performance data from ML Engine', {
              account_id: account.id,
              date: dateStr,
            });
            continue;
          }

          const dailyPerf = performance[0];

          // Compute cumulative returns on top of the latest stored record
          const previousPerf = await this.repository.getLatestPerformance(
            account.id,
            dateStr
          );

          const cumulativeReturn = previousPerf
            ? previousPerf.cumulative_return + dailyPerf.daily_return
            : dailyPerf.daily_return;

          const cumulativeReturnPercentage =
            (cumulativeReturn / account.initial_investment) * 100;

          // Persist the daily performance record
          await this.repository.createDailyPerformance({
            account_id: account.id,
            product_id: account.product_id,
            date: dateStr,
            opening_balance: dailyPerf.opening_balance,
            closing_balance: dailyPerf.closing_balance,
            daily_return: dailyPerf.daily_return,
            daily_return_percentage: dailyPerf.daily_return_percentage,
            cumulative_return: cumulativeReturn,
            cumulative_return_percentage: cumulativeReturnPercentage,
            trades_executed: dailyPerf.trades_executed,
            winning_trades: dailyPerf.winning_trades,
            losing_trades: dailyPerf.losing_trades,
            ml_agent_data: dailyPerf.ml_agent_data || null,
          });

          // Update the account's current balance
          await this.repository.updateAccount(account.id, {
            current_balance: dailyPerf.closing_balance,
            total_return_percentage: cumulativeReturnPercentage,
          });

          successCount++;
        } catch (error: any) {
          errorCount++;
          logger.error('Error processing account performance', {
            account_id: account.id,
            error: error.message,
          });
        }
      }

      const duration = Date.now() - startTime;
      logger.info('Daily performance calculation completed', {
        total_accounts: accounts.length,
        successful: successCount,
        errors: errorCount,
        duration_ms: duration,
      });
    } catch (error: any) {
      logger.error('Daily performance job failed', { error: error.message });
      throw error;
    }
  }

  start(): void {
    this.job.start();
    logger.info('Daily performance job scheduled: 00:30 UTC daily');
  }

  stop(): void {
    this.job.stop();
  }
}
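
The accumulation step inside the loop can be isolated into pure functions, which makes the arithmetic and the date handling easy to unit-test. The following is a minimal sketch, not the module's actual code: `accumulateReturn`, `previousUtcDate`, and the `ReturnSnapshot` shape are hypothetical names, and returning 0% when there is no initial investment is an assumption about the desired behavior.

```typescript
// Hypothetical helpers; they are not part of the InvestmentRepository API shown above.
interface ReturnSnapshot {
  cumulativeReturn: number;           // absolute return, in the account's currency
  cumulativeReturnPercentage: number; // relative to the initial investment
}

function accumulateReturn(
  previousCumulativeReturn: number | null,
  dailyReturn: number,
  initialInvestment: number,
): ReturnSnapshot {
  // With no previous record, the cumulative return starts at the daily return.
  const cumulativeReturn = (previousCumulativeReturn ?? 0) + dailyReturn;
  // Assumption: report 0% instead of Infinity/NaN when the initial investment is zero.
  const cumulativeReturnPercentage =
    initialInvestment > 0 ? (cumulativeReturn / initialInvestment) * 100 : 0;
  return { cumulativeReturn, cumulativeReturnPercentage };
}

// Previous UTC calendar day as YYYY-MM-DD, independent of the server's local time zone.
function previousUtcDate(now: Date): string {
  const previous = new Date(
    Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() - 1),
  );
  return previous.toISOString().slice(0, 10);
}
```

Keeping these as pure functions lets a test pin the job to a fixed `now` instead of depending on the wall clock.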

3.3 Monthly Distribution Job

// src/jobs/monthly-distribution.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';

export class MonthlyDistributionJob {
  private repository: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();

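    // Run on the 1st of every month at 01:00 UTC
    this.job = new CronJob(
      '0 1 1 * *',
      // run() logs and rethrows; catch here so a failed tick cannot become an unhandled rejection
      () => void this.run().catch(() => undefined),
      null,
      false,
      'UTC'
    );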
  }

  async run(): Promise<void> {
    const startTime = Date.now();
    logger.info('Starting monthly profit distribution job');

    try {
      // Compute the previous calendar month in UTC, matching the job's UTC schedule.
      // Local-time getters would pick the wrong month on servers behind UTC.
      const now = new Date();
      const periodStart = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1));
      const periodEnd = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 0));
      const period = `${periodStart.getUTCFullYear()}-${String(periodStart.getUTCMonth() + 1).padStart(2, '0')}`;

      logger.info(`Processing distributions for period: ${period}`);

      // Fetch all active accounts
      const accounts = await this.repository.getActiveAccounts();

      let successCount = 0;
      let errorCount = 0;
      let totalDistributed = 0;

      for (const account of accounts) {
        try {
          // Skip accounts that already have a distribution for this period
          const existingDistribution = await this.repository.getDistributionByPeriod(
            account.id,
            period
          );

          if (existingDistribution) {
            logger.warn('Distribution already exists for period', {
              account_id: account.id,
              period,
            });
            continue;
          }

          // Fetch the performance records for the period
          const performance = await this.repository.getPerformanceByDateRange(
            account.id,
            periodStart.toISOString().split('T')[0],
            periodEnd.toISOString().split('T')[0]
          );

          if (performance.length === 0) {
            logger.warn('No performance data for period', {
              account_id: account.id,
              period,
            });
            continue;
          }

          // Compute the period's opening and closing balances
          const openingBalance = performance[0].opening_balance;
          const closingBalance = performance[performance.length - 1].closing_balance;
          const grossProfit = closingBalance - openingBalance;

          // Distribute only when there is a profit
          if (grossProfit <= 0) {
            logger.info('No profit to distribute', {
              account_id: account.id,
              period,
              gross_profit: grossProfit,
            });
            continue;
          }

          // Fetch the product to read its performance fee percentage
          const product = await this.repository.getProductById(account.product_id);
          const performanceFeePercentage = product.performance_fee_percentage;
          const performanceFeeAmount = (grossProfit * performanceFeePercentage) / 100;
          const netProfit = grossProfit - performanceFeeAmount;

          // Create the distribution record
          const distribution = await this.repository.createDistribution({
            account_id: account.id,
            product_id: account.product_id,
            user_id: account.user_id,
            period,
            period_start: periodStart.toISOString().split('T')[0],
            period_end: periodEnd.toISOString().split('T')[0],
            opening_balance: openingBalance,
            closing_balance: closingBalance,
            gross_profit: grossProfit,
            performance_fee_percentage: performanceFeePercentage,
            performance_fee_amount: performanceFeeAmount,
            net_profit: netProfit,
            status: 'pending',
          });

          // Create the distribution transaction
          const transaction = await this.repository.createTransaction({
            account_id: account.id,
            user_id: account.user_id,
            type: 'profit_distribution',
            amount: netProfit,
            balance_before: closingBalance,
            balance_after: closingBalance, // Informational only; the balance is not changed
            distribution_id: distribution.id,
            distribution_period: period,
            status: 'completed',
            processed_at: new Date(),
          });

          // Link the distribution to its transaction and mark it as distributed
          await this.repository.updateDistribution(distribution.id, {
            status: 'distributed',
            distributed_at: new Date(),
            transaction_id: transaction.id,
          });

          // Update the account's distributed-profit total
          await this.repository.updateAccount(account.id, {
            total_profit_distributed:
              account.total_profit_distributed + netProfit,
          });

          totalDistributed += netProfit;
          successCount++;

          logger.info('Profit distributed successfully', {
            account_id: account.id,
            period,
            gross_profit: grossProfit,
            fee_amount: performanceFeeAmount,
            net_profit: netProfit,
          });
        } catch (error: any) {
          errorCount++;
          logger.error('Error distributing profit', {
            account_id: account.id,
            period,
            error: error.message,
          });
        }
      }

      const duration = Date.now() - startTime;
      logger.info('Monthly distribution job completed', {
        period,
        total_accounts: accounts.length,
        successful: successCount,
        errors: errorCount,
        total_distributed: totalDistributed,
        duration_ms: duration,
      });
    } catch (error: any) {
      logger.error('Monthly distribution job failed', { error: error.message });
      throw error;
    }
  }

  start(): void {
    this.job.start();
    logger.info('Monthly distribution job scheduled: 1st day of month at 01:00 UTC');
  }

  stop(): void {
    this.job.stop();
  }
}
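
The period boundaries and the fee arithmetic in this job are the parts most worth testing in isolation. The sketch below pulls both into pure functions; `previousMonthPeriod` and `computeDistribution` are hypothetical names introduced for illustration, and returning `null` for a non-positive profit mirrors the "distribute only when there is a profit" rule above.

```typescript
// Hypothetical helpers that mirror the calculations in MonthlyDistributionJob.run().
interface Period {
  period: string;      // YYYY-MM
  periodStart: string; // YYYY-MM-DD, first day of the previous month
  periodEnd: string;   // YYYY-MM-DD, last day of the previous month
}

function previousMonthPeriod(now: Date): Period {
  // Date.UTC normalizes month -1 to December of the previous year,
  // and day 0 to the last day of the preceding month.
  const start = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1));
  const end = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 0));
  const toIsoDate = (d: Date): string => d.toISOString().slice(0, 10);
  return {
    period: toIsoDate(start).slice(0, 7),
    periodStart: toIsoDate(start),
    periodEnd: toIsoDate(end),
  };
}

interface DistributionAmounts {
  grossProfit: number;
  performanceFeeAmount: number;
  netProfit: number;
}

function computeDistribution(
  openingBalance: number,
  closingBalance: number,
  performanceFeePercentage: number,
): DistributionAmounts | null {
  const grossProfit = closingBalance - openingBalance;
  if (grossProfit <= 0) {
    return null; // nothing to distribute for a flat or losing period
  }
  const performanceFeeAmount = (grossProfit * performanceFeePercentage) / 100;
  return {
    grossProfit,
    performanceFeeAmount,
    netProfit: grossProfit - performanceFeeAmount,
  };
}
```

For example, a period that opens at 10000 and closes at 11000 with a 20% performance fee yields a gross profit of 1000, a fee of 200, and a net profit of 800.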

3.4 Withdrawal Processing Job

// src/jobs/withdrawal-processing.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { StripeService } from '../services/stripe/stripe.service';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from '../utils/logger';

export class WithdrawalProcessingJob {
  private repository: InvestmentRepository;
  private stripeService: StripeService;
  private mlEngineService: MLEngineService;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    this.stripeService = new StripeService();
    this.mlEngineService = new MLEngineService();

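    // Run every 30 minutes
    this.job = new CronJob(
      '*/30 * * * *',
      // run() logs and rethrows; catch here so a failed tick cannot become an unhandled rejection
      () => void this.run().catch(() => undefined)
    );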
  }

  async run(): Promise<void> {
    logger.info('Starting withdrawal processing job');

    try {
      // Fetch approved withdrawal requests
      const approvedRequests = await this.repository.getWithdrawalRequestsByStatus(
        'approved'
      );

      if (approvedRequests.length === 0) {
        logger.info('No approved withdrawal requests to process');
        return;
      }

      logger.info(`Processing ${approvedRequests.length} approved withdrawals`);

      let successCount = 0;
      let errorCount = 0;

      for (const request of approvedRequests) {
        try {
          const account = await this.repository.getAccountById(request.account_id);

          if (!account) {
            logger.error('Account not found', { account_id: request.account_id });
            continue;
          }

          // Verify that the balance covers the requested amount
          if (account.current_balance < request.amount) {
            logger.error('Insufficient balance for withdrawal', {
              request_id: request.id,
              balance: account.current_balance,
              requested: request.amount,
            });

            await this.repository.updateWithdrawalRequest(request.id, {
              status: 'rejected',
              rejection_reason: 'Insufficient balance',
            });
            continue;
          }

          // Process according to the withdrawal method
          let payoutId: string | null = null;

          if (request.withdrawal_method === 'stripe_payout') {
            // Create the payout in Stripe
            const payout = await this.stripeService.createPayout({
              amount: request.amount,
              destination: request.destination_details.bank_account_id,
              metadata: {
                withdrawal_request_id: request.id,
                account_id: account.id,
                user_id: account.user_id,
              },
            });

            payoutId = payout.id;
          }

          // Update the account balance
          const newBalance = account.current_balance - request.amount;

          await this.repository.updateAccount(account.id, {
            current_balance: newBalance,
            total_withdrawn: account.total_withdrawn + request.amount,
          });

          // Create the withdrawal transaction
          const transaction = await this.repository.createTransaction({
            account_id: account.id,
            user_id: account.user_id,
            type: 'withdrawal',
            amount: request.amount,
            balance_before: account.current_balance,
            balance_after: newBalance,
            withdrawal_request_id: request.id,
            withdrawal_method: request.withdrawal_method,
            withdrawal_destination_id: payoutId || request.destination_details.bank_account_id,
            status: 'completed',
            processed_at: new Date(),
          });

          // Mark the withdrawal request as completed
          await this.repository.updateWithdrawalRequest(request.id, {
            status: 'completed',
            processed_at: new Date(),
            transaction_id: transaction.id,
          });

          // Notify the ML Engine of the new balance
          await this.mlEngineService.notifyWithdrawal({
            account_id: account.id,
            amount: request.amount,
            new_balance: newBalance,
          });

          successCount++;

          logger.info('Withdrawal processed successfully', {
            request_id: request.id,
            account_id: account.id,
            amount: request.amount,
            new_balance: newBalance,
          });
        } catch (error: any) {
          errorCount++;
          logger.error('Error processing withdrawal', {
            request_id: request.id,
            error: error.message,
          });

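          // Mark the request as rejected so it is not picked up again on the next run.
          // Note: if the Stripe payout was already created before the error, this state
          // needs manual reconciliation.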
          await this.repository.updateWithdrawalRequest(request.id, {
            status: 'rejected',
            rejection_reason: `Processing error: ${error.message}`,
          });
        }
      }

      logger.info('Withdrawal processing job completed', {
        total_requests: approvedRequests.length,
        successful: successCount,
        errors: errorCount,
      });
    } catch (error: any) {
      logger.error('Withdrawal processing job failed', { error: error.message });
      throw error;
    }
  }

  start(): void {
    this.job.start();
    logger.info('Withdrawal processing job scheduled: every 30 minutes');
  }

  stop(): void {
    this.job.stop();
  }
}
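
Because this job fires every 30 minutes and can also be triggered through `JobManager.runJob`, two runs could overlap and read the same approved requests. One possible mitigation is an in-process guard that skips a tick while the previous run is still in flight. The sketch below is an assumption-laden illustration: `nonOverlapping` is a hypothetical helper, and it only protects a single Node.js process; a deployment with several instances would need a database-level claim or lock instead.

```typescript
// Hypothetical helper: wraps an async job body so that overlapping invocations are skipped.
// Returns true when the task actually ran, false when the call was skipped.
function nonOverlapping(task: () => Promise<void>): () => Promise<boolean> {
  let running = false;

  return async (): Promise<boolean> => {
    if (running) {
      return false; // a previous run is still in flight; skip this tick
    }
    running = true;
    try {
      await task();
      return true;
    } finally {
      // Always release the guard, even when the task throws.
      running = false;
    }
  };
}
```

A job could then schedule `nonOverlapping(() => this.run())` instead of calling `run()` directly, so a slow run never competes with the next tick.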

3.5 Data Cleanup Job

// src/jobs/data-cleanup.job.ts

import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';

export class DataCleanupJob {
  private repository: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();

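    // Run weekly on Sundays at 03:00 UTC
    this.job = new CronJob(
      '0 3 * * 0',
      // run() logs and rethrows; catch here so a failed tick cannot become an unhandled rejection
      () => void this.run().catch(() => undefined),
      null,
      false,
      'UTC' // without an explicit time zone the schedule follows the server's local time
    );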
  }

  async run(): Promise<void> {
    logger.info('Starting data cleanup job');

    try {
      // Delete old failed transactions (older than 90 days)
      const deletedTransactions = await this.repository.deleteOldFailedTransactions(90);

      // Archive old closed accounts (older than 365 days)
      const archivedAccounts = await this.repository.archiveOldClosedAccounts(365);

      // Clean up old logs (older than 180 days); not enabled yet
      // await this.repository.deleteOldLogs(180);

      logger.info('Data cleanup job completed', {
        deleted_transactions: deletedTransactions,
        archived_accounts: archivedAccounts,
      });
    } catch (error: any) {
      logger.error('Data cleanup job failed', { error: error.message });
      throw error;
    }
  }

  start(): void {
    this.job.start();
    logger.info('Data cleanup job scheduled: Sundays at 03:00 UTC');
  }

  stop(): void {
    this.job.stop();
  }
}
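
The repository methods above receive a retention window in days, but this specification does not show how they turn it into a cutoff timestamp. One plausible way, sketched here under that assumption, is a small helper that subtracts whole days from a reference instant; `retentionCutoff` is a hypothetical name.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical helper: returns the instant before which records count as "old".
function retentionCutoff(retentionDays: number, now: Date = new Date()): Date {
  if (!Number.isInteger(retentionDays) || retentionDays < 0) {
    throw new RangeError(`retentionDays must be a non-negative integer, got ${retentionDays}`);
  }
  // Working on epoch milliseconds keeps the result independent of the local time zone.
  return new Date(now.getTime() - retentionDays * MS_PER_DAY);
}
```

Passing `now` explicitly keeps the helper deterministic in tests.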

4. Initialization in the App

// src/server.ts

import express from 'express';
import { JobManager } from './jobs/job-manager';
import { logger } from './utils/logger';

const app = express();
const jobManager = new JobManager();

// Start the server
const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
  logger.info(`Server running on port ${PORT}`);

  // Start the scheduled jobs
  jobManager.start();
});

// Graceful shutdown
process.on('SIGTERM', () => {
  logger.info('SIGTERM received, shutting down gracefully');

  // Stop the jobs
  jobManager.stop();

  // Close the server
  server.close(() => {
    logger.info('Server closed');
    process.exit(0);
  });
});

5. Configuration

5.1 Environment Variables

# Cron Jobs
ENABLE_CRON_JOBS=true
CRON_TIMEZONE=UTC

# Job Settings
DAILY_PERFORMANCE_CRON=30 0 * * *
MONTHLY_DISTRIBUTION_CRON=0 1 1 * *
WITHDRAWAL_PROCESSING_CRON=*/30 * * * *
DATA_CLEANUP_CRON=0 3 * * 0
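
The job classes in section 3 hard-code their schedules, so these variables need a loader before they take effect. The following is a minimal sketch of one, assuming the variable names above and standard five-field expressions; `loadCronConfig`, `readCron`, and the `CronConfig` shape are hypothetical, and the five-field check would reject the six-field form (with seconds) that some cron libraries also accept.

```typescript
interface CronConfig {
  enabled: boolean;
  timezone: string;
  dailyPerformance: string;
  monthlyDistribution: string;
  withdrawalProcessing: string;
  dataCleanup: string;
}

type Env = Record<string, string | undefined>;

// Reads one cron expression, falling back to the documented default.
function readCron(env: Env, key: string, fallback: string): string {
  const value = (env[key] ?? fallback).trim();
  // Assumption: minute, hour, day-of-month, month, day-of-week.
  if (value.split(/\s+/).length !== 5) {
    throw new Error(`${key} must be a 5-field cron expression, got "${value}"`);
  }
  return value;
}

function loadCronConfig(env: Env): CronConfig {
  return {
    // Jobs stay enabled unless the flag is explicitly set to something other than "true".
    enabled: (env['ENABLE_CRON_JOBS'] ?? 'true').toLowerCase() === 'true',
    timezone: env['CRON_TIMEZONE'] ?? 'UTC',
    dailyPerformance: readCron(env, 'DAILY_PERFORMANCE_CRON', '30 0 * * *'),
    monthlyDistribution: readCron(env, 'MONTHLY_DISTRIBUTION_CRON', '0 1 1 * *'),
    withdrawalProcessing: readCron(env, 'WITHDRAWAL_PROCESSING_CRON', '*/30 * * * *'),
    dataCleanup: readCron(env, 'DATA_CLEANUP_CRON', '0 3 * * 0'),
  };
}
```

In the server, `loadCronConfig(process.env)` could be called once at startup, with `jobManager.start()` skipped when `enabled` is false.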

6. Monitoring and Logs

6.1 Logging

// Detailed logs for each job
logger.info('Job started', {
  job_name: 'DailyPerformanceJob',
  scheduled_time: '00:30 UTC',
  trigger: 'scheduled',
});

logger.info('Job completed', {
  job_name: 'DailyPerformanceJob',
  duration_ms: 12500,
  processed_items: 150,
  success_count: 148,
  error_count: 2,
});
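
The `processed_items`, `success_count`, and `error_count` fields follow the same per-item counting that each job in section 3 repeats by hand. A shared helper could produce that summary once. This is only a sketch: `processAll` and `RunSummary` are hypothetical names, and processing items sequentially is an assumption carried over from the `for` loops in the jobs.

```typescript
interface RunSummary {
  processed_items: number;
  success_count: number;
  error_count: number;
  duration_ms: number;
}

// Hypothetical helper: runs the handler for each item, isolating per-item failures
// so that one bad item does not abort the rest of the batch.
async function processAll<T>(
  items: readonly T[],
  handler: (item: T) => Promise<void>,
  onError: (item: T, error: unknown) => void = () => {},
): Promise<RunSummary> {
  const startedAt = Date.now();
  let successCount = 0;
  let errorCount = 0;

  for (const item of items) {
    try {
      await handler(item);
      successCount++;
    } catch (error) {
      errorCount++;
      onError(item, error);
    }
  }

  return {
    processed_items: items.length,
    success_count: successCount,
    error_count: errorCount,
    duration_ms: Date.now() - startedAt,
  };
}
```

The returned object can be spread directly into the `Job completed` log entry alongside `job_name`.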

6.2 Health Checks

// Endpoint to check job status.
// Note: getJobStatuses() is not part of the JobManager in section 3.1 and must be added.
app.get('/health/jobs', async (req, res) => {
  const jobStatuses = await jobManager.getJobStatuses();
  res.json(jobStatuses);
});
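
What `getJobStatuses` returns is not specified here. One way to back it, sketched under that caveat, is a small tracker that records the outcome of each run; `JobStatusTracker`, `track`, and the `JobStatus` fields are hypothetical names chosen to match the snake_case log fields in section 6.1.

```typescript
interface JobStatus {
  job_name: string;
  running: boolean;
  last_run_at: string | null;      // ISO timestamp of the most recent start
  last_duration_ms: number | null; // set when the run finishes
  last_error: string | null;       // message of the most recent failure, if any
}

// Hypothetical tracker that a JobManager could delegate to.
class JobStatusTracker {
  private statuses = new Map<string, JobStatus>();

  // Runs the task and records its timing and outcome; errors are rethrown to the caller.
  async track(jobName: string, task: () => Promise<void>): Promise<void> {
    const startedAt = Date.now();
    const status: JobStatus = {
      job_name: jobName,
      running: true,
      last_run_at: new Date(startedAt).toISOString(),
      last_duration_ms: null,
      last_error: null,
    };
    this.statuses.set(jobName, status);

    try {
      await task();
    } catch (error) {
      status.last_error = error instanceof Error ? error.message : String(error);
      throw error;
    } finally {
      status.running = false;
      status.last_duration_ms = Date.now() - startedAt;
    }
  }

  // Returns copies so callers cannot mutate the tracker's internal state.
  getJobStatuses(): JobStatus[] {
    return Array.from(this.statuses.values()).map((status) => ({ ...status }));
  }
}
```

With this in place, each job's scheduled callback would wrap `run()` in `tracker.track(...)`, and the health endpoint would serialize `tracker.getJobStatuses()`.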

7. Testing

7.1 Manual Job Execution

// Script to run a job manually
// scripts/run-job.ts

import { JobManager } from '../src/jobs/job-manager';

const jobName = process.argv[2];

if (!jobName) {
  console.error('Usage: npm run job <JobName>');
  process.exit(1);
}

const jobManager = new JobManager();

jobManager
  .runJob(jobName)
  .then(() => {
    console.log(`Job ${jobName} completed successfully`);
    process.exit(0);
  })
  .catch((error) => {
    console.error(`Job ${jobName} failed:`, error);
    process.exit(1);
  });

# Run manually
npm run job DailyPerformanceJob
npm run job MonthlyDistributionJob
