/**
 * Distribution Job
 *
 * Daily calculation and distribution of investment returns.
 * Scheduled to run at 00:00 UTC; can also be triggered manually.
 */

import { db } from '../../../shared/database';
import { logger } from '../../../shared/utils/logger';
import { notificationService } from '../../notifications';

// ============================================================================
// Types
// ============================================================================

/** An investment account row joined with its product's code and name. */
interface InvestmentAccount {
  id: string;
  userId: string;
  productId: string;
  productCode: string;
  productName: string;
  accountNumber: string;
  currentBalance: number;
  status: string;
}

/**
 * A product and its return parameters. The job divides all three numeric
 * fields by 100 before use, i.e. it treats them as percentages.
 */
interface Product {
  id: string;
  code: string;
  name: string;
  targetReturnMin: number;
  targetReturnMax: number;
  performanceFee: number;
}

/** Outcome of a single successful distribution (amounts rounded to 2 decimals). */
interface DistributionResult {
  accountId: string;
  userId: string;
  productName: string;
  accountNumber: string;
  grossReturn: number;
  performanceFee: number;
  netReturn: number;
  previousBalance: number;
  newBalance: number;
}

/** Aggregate figures for one execution of the job. `duration` is in milliseconds. */
interface DistributionSummary {
  totalAccounts: number;
  successfulDistributions: number;
  failedDistributions: number;
  totalGrossReturns: number;
  totalFees: number;
  totalNetReturns: number;
  startTime: Date;
  endTime: Date;
  duration: number;
}

// ============================================================================
// Helpers
// ============================================================================

/** Interval between scheduled runs. */
const ONE_DAY_MS = 24 * 60 * 60 * 1000;

/** Round a monetary amount to 2 decimal places. */
function roundToCents(value: number): number {
  return Math.round(value * 100) / 100;
}

/** Extract a loggable message from a caught value without assuming it is an Error. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// ============================================================================
// Distribution Job Class
// ============================================================================

class DistributionJob {
  private isRunning = false;
  private lastRunAt: Date | null = null;
  /** Handle of the recurring 24h timer; set once the first scheduled run has fired. */
  private cronInterval: NodeJS.Timeout | null = null;
  /** Handle of the one-shot timer that waits for the first midnight UTC. */
  private initialTimeout: NodeJS.Timeout | null = null;

  /**
   * Start the distribution job scheduler.
   *
   * The first run fires at the next 00:00 UTC and then repeats every 24 hours.
   * Calling this while the job is already scheduled is a no-op (a warning is logged).
   */
  start(): void {
    // Both handles are checked: before the first midnight only the one-shot
    // timer exists, so checking the interval alone would allow double scheduling.
    if (this.cronInterval || this.initialTimeout) {
      logger.warn('[DistributionJob] Job already running');
      return;
    }

    // Calculate time until next midnight UTC
    const now = new Date();
    const nextMidnight = new Date(
      Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1, 0, 0, 0, 0)
    );
    const msUntilMidnight = nextMidnight.getTime() - now.getTime();

    // Schedule first run at next midnight, then every 24 hours
    this.initialTimeout = setTimeout(() => {
      this.initialTimeout = null;
      this.runScheduled();
      this.cronInterval = setInterval(() => this.runScheduled(), ONE_DAY_MS);
    }, msUntilMidnight);

    logger.info('[DistributionJob] Scheduled to run at 00:00 UTC', {
      nextRun: nextMidnight.toISOString(),
      msUntilRun: msUntilMidnight,
    });
  }

  /**
   * Stop the distribution job scheduler.
   *
   * Cancels both the pending first run and the recurring timer. This only
   * clears timers; it does not touch a run that has already started.
   */
  stop(): void {
    if (this.initialTimeout) {
      clearTimeout(this.initialTimeout);
      this.initialTimeout = null;
    }
    if (this.cronInterval) {
      clearInterval(this.cronInterval);
      this.cronInterval = null;
    }
    logger.info('[DistributionJob] Stopped');
  }

  /**
   * Timer entry point: runs the job and logs any rejection.
   *
   * Timer callbacks have no caller to propagate to, so a rejected `run()`
   * (e.g. "already in progress") would otherwise be an unhandled rejection.
   */
  private runScheduled(): void {
    void this.run().catch((error: unknown) => {
      logger.error('[DistributionJob] Scheduled run failed', {
        error: errorMessage(error),
      });
    });
  }

  /**
   * Execute the distribution job.
   *
   * Loads active accounts and products, distributes returns account by
   * account, notifies each credited user and records a run summary.
   * A failure on one account is logged and counted; it does not abort the run.
   *
   * @returns Summary of the run. When there are no active accounts the summary
   *   is returned immediately, without recording the run.
   * @throws Error if a distribution is already in progress.
   */
  async run(): Promise<DistributionSummary> {
    if (this.isRunning) {
      logger.warn('[DistributionJob] Distribution already in progress, skipping');
      throw new Error('Distribution already in progress');
    }

    this.isRunning = true;
    const startTime = new Date();
    const summary: DistributionSummary = {
      totalAccounts: 0,
      successfulDistributions: 0,
      failedDistributions: 0,
      totalGrossReturns: 0,
      totalFees: 0,
      totalNetReturns: 0,
      startTime,
      endTime: startTime,
      duration: 0,
    };

    try {
      logger.info('[DistributionJob] Starting daily distribution', {
        date: startTime.toISOString().split('T')[0],
      });

      // Get all active investment accounts
      const accounts = await this.getActiveAccounts();
      summary.totalAccounts = accounts.length;

      if (accounts.length === 0) {
        logger.info('[DistributionJob] No active accounts to process');
        return summary;
      }

      // Get products with their return rates
      const products = await this.getProducts();
      const productMap = new Map<string, Product>(
        products.map((p): [string, Product] => [p.id, p])
      );

      // Process each account
      for (const account of accounts) {
        try {
          const product = productMap.get(account.productId);
          if (!product) {
            // getProducts() only returns active products, so an account whose
            // product is missing from that result lands here.
            logger.warn('[DistributionJob] Product not found for account', {
              accountId: account.id,
              productId: account.productId,
            });
            summary.failedDistributions++;
            continue;
          }

          // Calculate and distribute returns; null means "nothing to credit"
          // and is counted as neither a success nor a failure.
          const result = await this.distributeReturns(account, product);
          if (result) {
            summary.successfulDistributions++;
            summary.totalGrossReturns += result.grossReturn;
            summary.totalFees += result.performanceFee;
            summary.totalNetReturns += result.netReturn;

            // Send notification to user
            await this.notifyUser(result);
          }
        } catch (error: unknown) {
          logger.error('[DistributionJob] Failed to process account', {
            accountId: account.id,
            error: errorMessage(error),
          });
          summary.failedDistributions++;
        }
      }

      const endTime = new Date();
      summary.endTime = endTime;
      summary.duration = endTime.getTime() - startTime.getTime();

      // Log summary
      await this.logDistributionRun(summary);

      logger.info('[DistributionJob] Distribution completed', {
        processed: summary.successfulDistributions,
        failed: summary.failedDistributions,
        totalNetReturns: summary.totalNetReturns.toFixed(2),
        duration: `${summary.duration}ms`,
      });

      this.lastRunAt = endTime;
      return summary;
    } finally {
      // Always release the guard, including on the early return and on errors.
      this.isRunning = false;
    }
  }

  /**
   * Get all active investment accounts with a positive balance,
   * ordered by creation time.
   */
  private async getActiveAccounts(): Promise<InvestmentAccount[]> {
    const result = await db.query<{
      id: string;
      user_id: string;
      product_id: string;
      product_code: string;
      product_name: string;
      account_number: string;
      current_balance: string;
      status: string;
    }>(
      `SELECT a.id, a.user_id, a.product_id,
              p.code as product_code, p.name as product_name,
              a.account_number, a.current_balance, a.status
       FROM investment.accounts a
       JOIN investment.products p ON p.id = a.product_id
       WHERE a.status = 'active' AND a.current_balance > 0
       ORDER BY a.created_at`
    );

    return result.rows.map((row) => ({
      id: row.id,
      userId: row.user_id,
      productId: row.product_id,
      productCode: row.product_code,
      productName: row.product_name,
      accountNumber: row.account_number,
      // The row type declares the balance as a string; convert it to a number.
      currentBalance: parseFloat(row.current_balance),
      status: row.status,
    }));
  }

  /**
   * Get all active products.
   */
  private async getProducts(): Promise<Product[]> {
    const result = await db.query<{
      id: string;
      code: string;
      name: string;
      target_return_min: string;
      target_return_max: string;
      performance_fee: string;
    }>(
      `SELECT id, code, name, target_return_min, target_return_max, performance_fee
       FROM investment.products
       WHERE is_active = TRUE`
    );

    return result.rows.map((row) => ({
      id: row.id,
      code: row.code,
      name: row.name,
      targetReturnMin: parseFloat(row.target_return_min),
      targetReturnMax: parseFloat(row.target_return_max),
      performanceFee: parseFloat(row.performance_fee),
    }));
  }

  /**
   * Calculate and distribute returns for an account.
   *
   * The return is computed from the balance loaded by `getActiveAccounts()`;
   * the credit itself is applied to the row-locked balance inside a
   * transaction together with the transaction and history records.
   *
   * @returns The distribution result, or `null` when the computed return
   *   rounds to zero or less and nothing is credited.
   * @throws Error if the account row no longer exists when it is locked.
   */
  private async distributeReturns(
    account: InvestmentAccount,
    product: Product
  ): Promise<DistributionResult | null> {
    // Calculate daily return rate
    // Monthly return range is targetReturnMin to targetReturnMax
    // Daily rate = monthly rate / 30 (approximation)
    // We use a random value within the range to simulate market variation
    const monthlyReturnMin = product.targetReturnMin / 100;
    const monthlyReturnMax = product.targetReturnMax / 100;

    // Daily variance is in [-0.15, +0.35), so the multiplier (1 + variance)
    // stays in [0.85, 1.35): it scales the rate but never flips its sign.
    const variance = (Math.random() - 0.3) * 0.5;
    const dailyReturnRate = ((monthlyReturnMin + monthlyReturnMax) / 2 / 30) * (1 + variance);

    // Calculate gross return
    const grossReturn = account.currentBalance * dailyReturnRate;

    // Only distribute if positive. Given the multiplier range above, this is
    // only hit when the product's target returns average to zero or below.
    if (grossReturn <= 0) {
      logger.debug('[DistributionJob] Skipping negative return day', {
        accountId: account.id,
        grossReturn: grossReturn.toFixed(4),
      });
      return null;
    }

    // Calculate performance fee (only on positive returns)
    const performanceFeeRate = product.performanceFee / 100;

    // Round gross and fee first and derive net from the rounded values so the
    // recorded amounts always satisfy gross - fee = net to the cent.
    const roundedGrossReturn = roundToCents(grossReturn);
    const roundedPerformanceFee = roundToCents(grossReturn * performanceFeeRate);
    const roundedNetReturn = roundToCents(roundedGrossReturn - roundedPerformanceFee);

    if (roundedNetReturn <= 0) {
      return null;
    }

    // Execute distribution in a transaction
    return await db.transaction(async (client) => {
      // Lock account row
      const lockResult = await client.query<{ current_balance: string }>(
        'SELECT current_balance FROM investment.accounts WHERE id = $1 FOR UPDATE',
        [account.id]
      );

      if (lockResult.rows.length === 0) {
        throw new Error('Account not found');
      }

      const previousBalance = parseFloat(lockResult.rows[0].current_balance);
      // Re-round after the addition: binary floating point can otherwise yield
      // values such as 100.30000000000001.
      const newBalance = roundToCents(previousBalance + roundedNetReturn);

      // Update account balance
      await client.query(
        `UPDATE investment.accounts
         SET current_balance = $1,
             total_earnings = total_earnings + $2,
             updated_at = NOW()
         WHERE id = $3`,
        [newBalance, roundedNetReturn, account.id]
      );

      // Record distribution transaction
      await client.query(
        `INSERT INTO investment.transactions (
           account_id, type, amount, fee_amount, description, status, processed_at
         ) VALUES ($1, 'distribution', $2, $3, $4, 'completed', NOW())`,
        [
          account.id,
          roundedNetReturn,
          roundedPerformanceFee,
          `Daily distribution from ${product.name}`,
        ]
      );

      // Record in distribution history
      await client.query(
        `INSERT INTO investment.distribution_history (
           account_id, product_id, distribution_date,
           gross_amount, fee_amount, net_amount,
           balance_before, balance_after
         ) VALUES ($1, $2, CURRENT_DATE, $3, $4, $5, $6, $7)`,
        [
          account.id,
          account.productId,
          roundedGrossReturn,
          roundedPerformanceFee,
          roundedNetReturn,
          previousBalance,
          newBalance,
        ]
      );

      return {
        accountId: account.id,
        userId: account.userId,
        productName: account.productName,
        accountNumber: account.accountNumber,
        grossReturn: roundedGrossReturn,
        performanceFee: roundedPerformanceFee,
        netReturn: roundedNetReturn,
        previousBalance,
        newBalance,
      };
    });
  }

  /**
   * Send distribution notification to user.
   *
   * Best-effort: a failure is logged and swallowed so that it is not counted
   * as a failed distribution by `run()`.
   */
  private async notifyUser(result: DistributionResult): Promise<void> {
    try {
      await notificationService.sendDistributionNotification(result.userId, {
        productName: result.productName,
        amount: result.netReturn,
        accountNumber: result.accountNumber,
        newBalance: result.newBalance,
      });
    } catch (error: unknown) {
      logger.error('[DistributionJob] Failed to send notification', {
        userId: result.userId,
        error: errorMessage(error),
      });
    }
  }

  /**
   * Log distribution run to database.
   *
   * Best-effort: a failure to record the run is logged and swallowed.
   */
  private async logDistributionRun(summary: DistributionSummary): Promise<void> {
    try {
      await db.query(
        `INSERT INTO investment.distribution_runs (
           run_date, total_accounts, successful_count, failed_count,
           total_gross_amount, total_fee_amount, total_net_amount,
           started_at, completed_at, duration_ms
         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
        [
          summary.startTime.toISOString().split('T')[0],
          summary.totalAccounts,
          summary.successfulDistributions,
          summary.failedDistributions,
          // Totals are sums of floats; round once more before persisting.
          roundToCents(summary.totalGrossReturns),
          roundToCents(summary.totalFees),
          roundToCents(summary.totalNetReturns),
          summary.startTime,
          summary.endTime,
          summary.duration,
        ]
      );
    } catch (error: unknown) {
      logger.error('[DistributionJob] Failed to log distribution run', {
        error: errorMessage(error),
      });
    }
  }

  /**
   * Get job status.
   *
   * `isScheduled` is true both while waiting for the first midnight run and
   * while the recurring 24h timer is active.
   */
  getStatus(): {
    isRunning: boolean;
    lastRunAt: Date | null;
    isScheduled: boolean;
  } {
    return {
      isRunning: this.isRunning,
      lastRunAt: this.lastRunAt,
      isScheduled: this.cronInterval !== null || this.initialTimeout !== null,
    };
  }

  /**
   * Manually trigger distribution (for testing/admin).
   *
   * @throws Error if a distribution is already in progress.
   */
  async triggerManually(): Promise<DistributionSummary> {
    logger.info('[DistributionJob] Manual trigger requested');
    return this.run();
  }
}

// Export singleton instance
export const distributionJob = new DistributionJob();