Changes include: - Updated architecture documentation - Enhanced module definitions (OQI-001 to OQI-008) - ML integration documentation updates - Trading strategies documentation - Orchestration and inventory updates - Docker configuration updates 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
25 KiB
25 KiB
| id | title | type | status | priority | epic | project | version | created_date | updated_date |
|---|---|---|---|---|---|---|---|---|---|
| ET-INV-006 | Jobs Programados (Cron Jobs) | Technical Specification | Done | Alta | OQI-004 | trading-platform | 1.0.0 | 2025-12-05 | 2026-01-04 |
ET-INV-006: Jobs Programados (Cron Jobs)
Epic: OQI-004 Cuentas de Inversión | Versión: 1.0 | Fecha: 2025-12-05 | Responsable: Requirements-Analyst
1. Descripción
Define los trabajos programados (cron jobs) necesarios para el módulo de inversión:
- Cálculo diario de performance
- Distribución mensual de utilidades
- Procesamiento automático de retiros aprobados
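- Sincronización con ML Engine (`SyncMLPerformanceJob`; el Job Manager lo registra, pero su implementación no se detalla en la sección 3 de este documento)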
- Limpieza de datos temporales
2. Arquitectura de Jobs
┌─────────────────────────────────────────────────────────────────┐
│ Scheduled Jobs Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Job Scheduler │ │
│ │ (node-cron) │ │
│ └────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Daily │ │ Monthly │ │ Withdrawal │ │
│ │ Performance │ │ Distribution │ │ Processing │ │
│ │ (00:30) │ │ (1st 01:00) │ │ (*/30min) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Investment Database │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ ML Engine │ │ Stripe │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
3. Implementación de Jobs
3.1 Job Manager
// src/jobs/job-manager.ts
import { DailyPerformanceJob } from './daily-performance.job';
import { MonthlyDistributionJob } from './monthly-distribution.job';
import { WithdrawalProcessingJob } from './withdrawal-processing.job';
import { SyncMLPerformanceJob } from './sync-ml-performance.job';
import { DataCleanupJob } from './data-cleanup.job';
import { logger } from '../utils/logger';
/**
 * Minimal contract shared by every job the manager drives: it can be
 * scheduled, unscheduled and executed once on demand.
 */
interface ScheduledJob {
  start(): void;
  stop(): void;
  run(): Promise<void>;
}

/** Status entry returned by {@link JobManager.getJobStatuses}. */
export interface JobStatus {
  /** Class name of the job (the same name `runJob` accepts). */
  name: string;
  /** True when this manager started the job and has not stopped it since. */
  running: boolean;
}

/**
 * Creates every scheduled job of the investment module and starts, stops,
 * runs and reports on them as a group.
 */
export class JobManager {
  private readonly jobs: ScheduledJob[];

  /** Jobs whose `start()` succeeded and that have not been stopped yet. */
  private readonly startedJobs = new Set<ScheduledJob>();

  constructor() {
    // Instantiate every job; nothing is scheduled until start() is called.
    this.jobs = [
      new DailyPerformanceJob(),
      new MonthlyDistributionJob(),
      new WithdrawalProcessingJob(),
      new SyncMLPerformanceJob(),
      new DataCleanupJob(),
    ];
  }

  /**
   * Starts all scheduled jobs. A job that fails to start is logged and
   * skipped so the remaining jobs are still scheduled.
   */
  start(): void {
    logger.info('Starting all scheduled jobs');
    for (const job of this.jobs) {
      try {
        job.start();
        this.startedJobs.add(job);
        logger.info(`Job started: ${job.constructor.name}`);
      } catch (error: unknown) {
        logger.error(`Failed to start job: ${job.constructor.name}`, {
          error: JobManager.describeError(error),
        });
      }
    }
    // Report only the jobs that actually started, not the configured total.
    logger.info(`Total jobs started: ${this.startedJobs.size}`);
  }

  /**
   * Stops all jobs. A job that fails to stop is logged and the remaining
   * jobs are still stopped.
   */
  stop(): void {
    logger.info('Stopping all scheduled jobs');
    for (const job of this.jobs) {
      try {
        job.stop();
        this.startedJobs.delete(job);
        logger.info(`Job stopped: ${job.constructor.name}`);
      } catch (error: unknown) {
        logger.error(`Failed to stop job: ${job.constructor.name}`, {
          error: JobManager.describeError(error),
        });
      }
    }
  }

  /**
   * Runs a single job immediately, outside its schedule (useful for testing).
   *
   * @param jobName Class name of the job, e.g. `DailyPerformanceJob`.
   * @throws Error when no registered job has that class name; any error
   *   thrown by the job's own `run()` also propagates.
   */
  async runJob(jobName: string): Promise<void> {
    // NOTE(review): lookup relies on constructor.name, which changes if the
    // build minifies class names — confirm the build keeps them.
    const job = this.jobs.find((j) => j.constructor.name === jobName);
    if (!job) {
      throw new Error(`Job not found: ${jobName}`);
    }
    logger.info(`Running job manually: ${jobName}`);
    await job.run();
  }

  /**
   * Reports, for every registered job, whether this manager has it started.
   * The flag reflects the manager's own bookkeeping (start()/stop() calls),
   * not the internal state of the underlying scheduler.
   *
   * @returns One entry per registered job, in registration order.
   */
  async getJobStatuses(): Promise<JobStatus[]> {
    return this.jobs.map((job) => ({
      name: job.constructor.name,
      running: this.startedJobs.has(job),
    }));
  }

  /** Extracts a loggable message from a value caught in a `catch` clause. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
3.2 Daily Performance Job
// src/jobs/daily-performance.job.ts
import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from '../utils/logger';
/**
 * Scheduled job that, once a day, fetches the previous day's performance of
 * every active account from the ML Engine, stores it as a daily performance
 * record and refreshes the account's balance and total return.
 */
export class DailyPerformanceJob {
  private repository: InvestmentRepository;
  private mlEngineService: MLEngineService;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    this.mlEngineService = new MLEngineService();
    // Run every day at 00:30 UTC.
    this.job = new CronJob(
      '30 0 * * *',
      () => {
        // run() logs the failure before rethrowing; swallow it here so a
        // failed tick does not surface as an unhandled promise rejection.
        void this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Processes every active account for the previous UTC day. A failure on
   * one account is logged and counted without stopping the others.
   *
   * @throws Rethrows any error raised outside the per-account loop (for
   *   example when the list of active accounts cannot be loaded).
   */
  async run(): Promise<void> {
    const startTime = Date.now();
    logger.info('Starting daily performance calculation job');
    try {
      // Previous day, computed in UTC so it matches both the UTC schedule and
      // the UTC date produced by toISOString() regardless of the host zone.
      const yesterday = new Date();
      yesterday.setUTCDate(yesterday.getUTCDate() - 1);
      const dateStr = yesterday.toISOString().slice(0, 10);

      const accounts = await this.repository.getActiveAccounts();
      logger.info(`Processing ${accounts.length} active accounts`);

      let successCount = 0;
      let errorCount = 0;

      for (const account of accounts) {
        try {
          // Ask the ML Engine for that single day.
          const performance = await this.mlEngineService.getPerformance(
            account.id,
            {
              start_date: dateStr,
              end_date: dateStr,
            }
          );
          if (performance.length === 0) {
            logger.warn('No performance data from ML Engine', {
              account_id: account.id,
              date: dateStr,
            });
            continue;
          }
          const dailyPerf = performance[0];

          // Accumulate on top of the latest stored record, if any.
          // NOTE(review): presumably getLatestPerformance returns the most
          // recent record before dateStr — confirm against the repository.
          const previousPerf = await this.repository.getLatestPerformance(
            account.id,
            dateStr
          );
          const cumulativeReturn = previousPerf
            ? previousPerf.cumulative_return + dailyPerf.daily_return
            : dailyPerf.daily_return;

          // Guard the division so a zero initial investment cannot persist
          // NaN or Infinity as a percentage.
          const cumulativeReturnPercentage =
            account.initial_investment > 0
              ? (cumulativeReturn / account.initial_investment) * 100
              : 0;

          // Store the daily record.
          await this.repository.createDailyPerformance({
            account_id: account.id,
            product_id: account.product_id,
            date: dateStr,
            opening_balance: dailyPerf.opening_balance,
            closing_balance: dailyPerf.closing_balance,
            daily_return: dailyPerf.daily_return,
            daily_return_percentage: dailyPerf.daily_return_percentage,
            cumulative_return: cumulativeReturn,
            cumulative_return_percentage: cumulativeReturnPercentage,
            trades_executed: dailyPerf.trades_executed,
            winning_trades: dailyPerf.winning_trades,
            losing_trades: dailyPerf.losing_trades,
            ml_agent_data: dailyPerf.ml_agent_data || null,
          });

          // Refresh the account's current balance and total return.
          await this.repository.updateAccount(account.id, {
            current_balance: dailyPerf.closing_balance,
            total_return_percentage: cumulativeReturnPercentage,
          });
          successCount++;
        } catch (error: unknown) {
          errorCount++;
          logger.error('Error processing account performance', {
            account_id: account.id,
            error: error instanceof Error ? error.message : String(error),
          });
        }
      }

      const duration = Date.now() - startTime;
      logger.info('Daily performance calculation completed', {
        total_accounts: accounts.length,
        successful: successCount,
        errors: errorCount,
        duration_ms: duration,
      });
    } catch (error: unknown) {
      logger.error('Daily performance job failed', {
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Daily performance job scheduled: 00:30 UTC daily');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
3.3 Monthly Distribution Job
// src/jobs/monthly-distribution.job.ts
import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';
/**
 * Scheduled job that, on the first day of each month, computes the previous
 * month's profit for every active account, deducts the product's performance
 * fee and records the resulting distribution and its transaction.
 */
export class MonthlyDistributionJob {
  private repository: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    // Run on day 1 of every month at 01:00 UTC.
    this.job = new CronJob(
      '0 1 1 * *',
      () => {
        // run() logs the failure before rethrowing; swallow it here so a
        // failed tick does not surface as an unhandled promise rejection.
        void this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Distributes the previous calendar month (in UTC) for every active
   * account. Accounts that already have a distribution for the period, have
   * no performance data, or made no profit are skipped. A failure on one
   * account is logged and counted without stopping the others.
   *
   * @throws Rethrows any error raised outside the per-account loop.
   */
  async run(): Promise<void> {
    const startTime = Date.now();
    logger.info('Starting monthly profit distribution job');
    try {
      // Build the previous month's bounds in UTC. Building them in local time
      // and formatting with toISOString() shifts the dates by one day on hosts
      // whose zone is ahead of UTC. Date.UTC normalises month -1 (January) to
      // December of the previous year, and day 0 to the last day of the
      // preceding month.
      const now = new Date();
      const periodStart = new Date(
        Date.UTC(now.getUTCFullYear(), now.getUTCMonth() - 1, 1)
      );
      const periodEnd = new Date(
        Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), 0)
      );
      const period = periodStart.toISOString().slice(0, 7); // YYYY-MM
      const periodStartStr = periodStart.toISOString().slice(0, 10);
      const periodEndStr = periodEnd.toISOString().slice(0, 10);
      logger.info(`Processing distributions for period: ${period}`);

      const accounts = await this.repository.getActiveAccounts();
      let successCount = 0;
      let errorCount = 0;
      let totalDistributed = 0;

      for (const account of accounts) {
        try {
          // Skip accounts already distributed for this period, which keeps a
          // re-run from distributing twice.
          const existingDistribution = await this.repository.getDistributionByPeriod(
            account.id,
            period
          );
          if (existingDistribution) {
            logger.warn('Distribution already exists for period', {
              account_id: account.id,
              period,
            });
            continue;
          }

          // Daily performance records of the period.
          const performance = await this.repository.getPerformanceByDateRange(
            account.id,
            periodStartStr,
            periodEndStr
          );
          if (performance.length === 0) {
            logger.warn('No performance data for period', {
              account_id: account.id,
              period,
            });
            continue;
          }

          // Opening balance of the first record vs. closing balance of the last.
          // NOTE(review): this assumes the records come back ordered by date
          // and does not adjust for deposits or withdrawals made during the
          // period — confirm both with the repository and product rules.
          const openingBalance = performance[0].opening_balance;
          const closingBalance = performance[performance.length - 1].closing_balance;
          const grossProfit = closingBalance - openingBalance;

          // Only distribute when there is a profit.
          if (grossProfit <= 0) {
            logger.info('No profit to distribute', {
              account_id: account.id,
              period,
              gross_profit: grossProfit,
            });
            continue;
          }

          // The performance fee percentage comes from the product.
          const product = await this.repository.getProductById(account.product_id);
          const performanceFeePercentage = product.performance_fee_percentage;
          const performanceFeeAmount = (grossProfit * performanceFeePercentage) / 100;
          const netProfit = grossProfit - performanceFeeAmount;

          // Create the distribution record.
          const distribution = await this.repository.createDistribution({
            account_id: account.id,
            product_id: account.product_id,
            user_id: account.user_id,
            period,
            period_start: periodStartStr,
            period_end: periodEndStr,
            opening_balance: openingBalance,
            closing_balance: closingBalance,
            gross_profit: grossProfit,
            performance_fee_percentage: performanceFeePercentage,
            performance_fee_amount: performanceFeeAmount,
            net_profit: netProfit,
            status: 'pending',
          });

          // Create the matching transaction.
          const transaction = await this.repository.createTransaction({
            account_id: account.id,
            user_id: account.user_id,
            type: 'profit_distribution',
            amount: netProfit,
            balance_before: closingBalance,
            balance_after: closingBalance, // Informational only; the balance is not changed
            distribution_id: distribution.id,
            distribution_period: period,
            status: 'completed',
            processed_at: new Date(),
          });

          // Link the distribution to its transaction and mark it distributed.
          await this.repository.updateDistribution(distribution.id, {
            status: 'distributed',
            distributed_at: new Date(),
            transaction_id: transaction.id,
          });

          // Update the account's running total of distributed profit.
          await this.repository.updateAccount(account.id, {
            total_profit_distributed:
              account.total_profit_distributed + netProfit,
          });

          totalDistributed += netProfit;
          successCount++;
          logger.info('Profit distributed successfully', {
            account_id: account.id,
            period,
            gross_profit: grossProfit,
            fee_amount: performanceFeeAmount,
            net_profit: netProfit,
          });
        } catch (error: unknown) {
          errorCount++;
          logger.error('Error distributing profit', {
            account_id: account.id,
            period,
            error: error instanceof Error ? error.message : String(error),
          });
        }
      }

      const duration = Date.now() - startTime;
      logger.info('Monthly distribution job completed', {
        period,
        total_accounts: accounts.length,
        successful: successCount,
        errors: errorCount,
        total_distributed: totalDistributed,
        duration_ms: duration,
      });
    } catch (error: unknown) {
      logger.error('Monthly distribution job failed', {
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Monthly distribution job scheduled: 1st day of month at 01:00 UTC');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
3.4 Withdrawal Processing Job
// src/jobs/withdrawal-processing.job.ts
import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { StripeService } from '../services/stripe/stripe.service';
import { MLEngineService } from '../services/ml-engine/ml-engine.service';
import { logger } from '../utils/logger';
/**
 * Scheduled job that processes approved withdrawal requests every 30 minutes:
 * it pays out through Stripe when the request asks for it, debits the account,
 * records the transaction, completes the request and notifies the ML Engine.
 */
export class WithdrawalProcessingJob {
  private repository: InvestmentRepository;
  private stripeService: StripeService;
  private mlEngineService: MLEngineService;
  private job: CronJob;

  /**
   * True while a run is in progress. Requests keep the status `approved`
   * until they finish, so two overlapping runs in this process would pick up
   * the same request and pay it out twice.
   */
  private isRunning = false;

  constructor() {
    this.repository = new InvestmentRepository();
    this.stripeService = new StripeService();
    this.mlEngineService = new MLEngineService();
    // Run every 30 minutes.
    this.job = new CronJob('*/30 * * * *', () => {
      // run() logs the failure before rethrowing; swallow it here so a
      // failed tick does not surface as an unhandled promise rejection.
      void this.run().catch(() => undefined);
    });
  }

  /**
   * Processes every withdrawal request currently in status `approved`.
   * Returns immediately when a previous run is still in progress.
   *
   * NOTE(review): the steps for one request (payout, account update,
   * transaction, request update) are separate repository calls with no visible
   * database transaction, so a failure part-way leaves partial state. The
   * failure path records what it can for manual reconciliation.
   *
   * @throws Rethrows any error raised outside the per-request loop.
   */
  async run(): Promise<void> {
    if (this.isRunning) {
      logger.warn('Withdrawal processing job already running, skipping this run');
      return;
    }
    this.isRunning = true;
    logger.info('Starting withdrawal processing job');
    try {
      // Load the approved withdrawal requests.
      const approvedRequests = await this.repository.getWithdrawalRequestsByStatus(
        'approved'
      );
      if (approvedRequests.length === 0) {
        logger.info('No approved withdrawal requests to process');
        return;
      }
      logger.info(`Processing ${approvedRequests.length} approved withdrawals`);

      let successCount = 0;
      let errorCount = 0;

      for (const request of approvedRequests) {
        // Declared outside the try so the failure path can tell whether a
        // Stripe payout had already been created for this request.
        let payoutId: string | null = null;
        try {
          const account = await this.repository.getAccountById(request.account_id);
          if (!account) {
            logger.error('Account not found', { account_id: request.account_id });
            continue;
          }

          // Reject the request when the balance cannot cover it.
          if (account.current_balance < request.amount) {
            logger.error('Insufficient balance for withdrawal', {
              request_id: request.id,
              balance: account.current_balance,
              requested: request.amount,
            });
            await this.repository.updateWithdrawalRequest(request.id, {
              status: 'rejected',
              rejection_reason: 'Insufficient balance',
            });
            continue;
          }

          // Pay out according to the withdrawal method.
          if (request.withdrawal_method === 'stripe_payout') {
            const payout = await this.stripeService.createPayout({
              amount: request.amount,
              destination: request.destination_details.bank_account_id,
              metadata: {
                withdrawal_request_id: request.id,
                account_id: account.id,
                user_id: account.user_id,
              },
            });
            payoutId = payout.id;
          }

          // Debit the account.
          const newBalance = account.current_balance - request.amount;
          await this.repository.updateAccount(account.id, {
            current_balance: newBalance,
            total_withdrawn: account.total_withdrawn + request.amount,
          });

          // Record the transaction.
          const transaction = await this.repository.createTransaction({
            account_id: account.id,
            user_id: account.user_id,
            type: 'withdrawal',
            amount: request.amount,
            balance_before: account.current_balance,
            balance_after: newBalance,
            withdrawal_request_id: request.id,
            withdrawal_method: request.withdrawal_method,
            withdrawal_destination_id: payoutId || request.destination_details.bank_account_id,
            status: 'completed',
            processed_at: new Date(),
          });

          // Mark the request as completed.
          await this.repository.updateWithdrawalRequest(request.id, {
            status: 'completed',
            processed_at: new Date(),
            transaction_id: transaction.id,
          });

          // Notify the ML Engine on a best-effort basis. The withdrawal is
          // already completed at this point, so a failed notification must
          // not fall through to the failure path and flip it to "rejected".
          try {
            await this.mlEngineService.notifyWithdrawal({
              account_id: account.id,
              amount: request.amount,
              new_balance: newBalance,
            });
          } catch (notifyError: unknown) {
            logger.error('Failed to notify ML Engine about withdrawal', {
              request_id: request.id,
              account_id: account.id,
              error:
                notifyError instanceof Error
                  ? notifyError.message
                  : String(notifyError),
            });
          }

          successCount++;
          logger.info('Withdrawal processed successfully', {
            request_id: request.id,
            account_id: account.id,
            amount: request.amount,
            new_balance: newBalance,
          });
        } catch (error: unknown) {
          errorCount++;
          const message = error instanceof Error ? error.message : String(error);
          logger.error('Error processing withdrawal', {
            request_id: request.id,
            payout_id: payoutId,
            error: message,
          });

          // Take the request out of "approved" so the next run does not retry
          // it. If a payout was already created, record its id so the money
          // movement can be reconciled manually.
          try {
            await this.repository.updateWithdrawalRequest(request.id, {
              status: 'rejected',
              rejection_reason: payoutId
                ? `Processing error after payout ${payoutId} was created: ${message}`
                : `Processing error: ${message}`,
            });
          } catch (updateError: unknown) {
            // Keep going: one request that cannot be updated must not abort
            // the rest of the batch.
            logger.error('Failed to mark withdrawal request as rejected', {
              request_id: request.id,
              payout_id: payoutId,
              error:
                updateError instanceof Error
                  ? updateError.message
                  : String(updateError),
            });
          }
        }
      }

      logger.info('Withdrawal processing job completed', {
        total_requests: approvedRequests.length,
        successful: successCount,
        errors: errorCount,
      });
    } catch (error: unknown) {
      logger.error('Withdrawal processing job failed', {
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    } finally {
      this.isRunning = false;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Withdrawal processing job scheduled: every 30 minutes');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
3.5 Data Cleanup Job
// src/jobs/data-cleanup.job.ts
import { CronJob } from 'cron';
import { InvestmentRepository } from '../modules/investment/investment.repository';
import { logger } from '../utils/logger';
/**
 * Weekly housekeeping job: deletes old failed transactions and archives
 * accounts that have been closed for a long time.
 */
export class DataCleanupJob {
  private repository: InvestmentRepository;
  private job: CronJob;

  constructor() {
    this.repository = new InvestmentRepository();
    // Run weekly on Sundays at 03:00 UTC. The time zone is passed explicitly:
    // without it the expression is evaluated in the host's local zone, which
    // contradicts the schedule announced in start().
    this.job = new CronJob(
      '0 3 * * 0',
      () => {
        // run() logs the failure before rethrowing; swallow it here so a
        // failed tick does not surface as an unhandled promise rejection.
        void this.run().catch(() => undefined);
      },
      null,
      false,
      'UTC'
    );
  }

  /**
   * Runs the cleanup once and logs what each repository call returned.
   *
   * @throws Rethrows any error raised by the repository calls.
   */
  async run(): Promise<void> {
    logger.info('Starting data cleanup job');
    try {
      // Delete failed transactions older than 90 days.
      const deletedTransactions = await this.repository.deleteOldFailedTransactions(90);
      // Archive accounts closed more than 365 days ago.
      const archivedAccounts = await this.repository.archiveOldClosedAccounts(365);
      // Clean up logs older than 180 days (currently disabled).
      // await this.repository.deleteOldLogs(180);
      logger.info('Data cleanup job completed', {
        deleted_transactions: deletedTransactions,
        archived_accounts: archivedAccounts,
      });
    } catch (error: unknown) {
      logger.error('Data cleanup job failed', {
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }

  /** Schedules the job. */
  start(): void {
    this.job.start();
    logger.info('Data cleanup job scheduled: Sundays at 03:00 UTC');
  }

  /** Cancels the schedule. */
  stop(): void {
    this.job.stop();
  }
}
4. Inicialización en App
// src/server.ts
import express from 'express';
import { JobManager } from './jobs/job-manager';
import { logger } from './utils/logger';
const app = express();
const jobManager = new JobManager();

// Scheduled jobs run unless ENABLE_CRON_JOBS is explicitly "false", so an
// unset variable keeps the previous behaviour (jobs enabled).
const cronJobsEnabled = process.env.ENABLE_CRON_JOBS !== 'false';

// Start the HTTP server; jobs are scheduled once it is listening.
const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
  logger.info(`Server running on port ${PORT}`);
  if (cronJobsEnabled) {
    jobManager.start();
  } else {
    logger.info('Scheduled jobs disabled via ENABLE_CRON_JOBS=false');
  }
});

// Graceful shutdown: stop scheduling new job runs, then close the HTTP
// server and exit once it has closed.
process.on('SIGTERM', () => {
  logger.info('SIGTERM received, shutting down gracefully');
  if (cronJobsEnabled) {
    jobManager.stop();
  }
  server.close(() => {
    logger.info('Server closed');
    process.exit(0);
  });
});
5. Configuración
5.1 Variables de Entorno
# Cron Jobs
ENABLE_CRON_JOBS=true
CRON_TIMEZONE=UTC
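# Job Settings (valores de referencia: las clases de la sección 3 fijan actualmente estas mismas expresiones como literales en el código)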
DAILY_PERFORMANCE_CRON=30 0 * * *
MONTHLY_DISTRIBUTION_CRON=0 1 1 * *
WITHDRAWAL_PROCESSING_CRON=*/30 * * * *
DATA_CLEANUP_CRON=0 3 * * 0
6. Monitoreo y Logs
6.1 Logging
// Detailed log entries emitted for every job (DailyPerformanceJob as example).

// Entry written when the job begins.
const startedDetails = {
  job_name: 'DailyPerformanceJob',
  scheduled_time: '00:30 UTC',
  trigger: 'scheduled',
};
logger.info('Job started', startedDetails);

// Entry written when the job ends, with its duration and counters.
const completedDetails = {
  job_name: 'DailyPerformanceJob',
  duration_ms: 12500,
  processed_items: 150,
  success_count: 148,
  error_count: 2,
};
logger.info('Job completed', completedDetails);
6.2 Health Checks
// Endpoint that reports the state of the scheduled jobs.
app.get('/health/jobs', async (req, res) => {
  try {
    const jobStatuses = await jobManager.getJobStatuses();
    res.json(jobStatuses);
  } catch (error: unknown) {
    // Express 4 does not catch rejections from async handlers, so without
    // this the request would hang and the rejection would go unhandled.
    res.status(500).json({
      status: 'error',
      message: error instanceof Error ? error.message : String(error),
    });
  }
});
7. Testing
7.1 Manual Job Execution
// Script para ejecutar job manualmente
// scripts/run-job.ts
import { JobManager } from '../src/jobs/job-manager';
// The job's class name is the first CLI argument after the script path.
const jobName = process.argv[2];
if (!jobName) {
  console.error('Usage: npm run job <JobName>');
  process.exit(1);
}

const jobManager = new JobManager();

/**
 * Runs the requested job once and maps the outcome to the process exit code:
 * 0 when the job completes, 1 when it fails.
 */
const executeRequestedJob = async (): Promise<void> => {
  try {
    await jobManager.runJob(jobName);
    console.log(`Job ${jobName} completed successfully`);
    process.exit(0);
  } catch (error) {
    console.error(`Job ${jobName} failed:`, error);
    process.exit(1);
  }
};

void executeRequestedJob();
# Ejecutar manualmente
npm run job DailyPerformanceJob
npm run job MonthlyDistributionJob
8. Referencias
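- Documentación del paquete npm `cron` (repositorio kelektiv/node-cron), que exporta la clase `CronJob` usada en la sección 3; no confundir con el paquete npm `node-cron`, que es una librería distinta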
- Cron Expression Guide
- Job Scheduling Best Practices
- Error Handling in Background Jobs