diff --git a/src/modules/portfolio/controllers/portfolio.controller.ts b/src/modules/portfolio/controllers/portfolio.controller.ts index e8f8cf0..963d25b 100644 --- a/src/modules/portfolio/controllers/portfolio.controller.ts +++ b/src/modules/portfolio/controllers/portfolio.controller.ts @@ -5,6 +5,7 @@ import { Request, Response, NextFunction } from 'express'; import { portfolioService, RiskProfile } from '../services/portfolio.service'; +import { snapshotRepository } from '../repositories/snapshot.repository'; // ============================================================================ // Types @@ -458,3 +459,108 @@ export async function deleteGoal(req: AuthRequest, res: Response, next: NextFunc next(error); } } + +// ============================================================================ +// Performance & Analytics +// ============================================================================ + +/** + * Get portfolio performance history for charts + */ +export async function getPortfolioPerformance(req: AuthRequest, res: Response, next: NextFunction): Promise { + try { + const userId = req.user?.id; + if (!userId) { + res.status(401).json({ + success: false, + error: { message: 'Unauthorized', code: 'UNAUTHORIZED' }, + }); + return; + } + + const { portfolioId } = req.params; + const period = (req.query.period as string) || 'month'; + + const validPeriods = ['week', 'month', '3months', 'year', 'all']; + if (!validPeriods.includes(period)) { + res.status(400).json({ + success: false, + error: { message: 'Invalid period', code: 'VALIDATION_ERROR' }, + }); + return; + } + + const portfolio = await portfolioService.getPortfolio(portfolioId); + if (!portfolio) { + res.status(404).json({ + success: false, + error: { message: 'Portfolio not found', code: 'NOT_FOUND' }, + }); + return; + } + + if (portfolio.userId !== userId) { + res.status(403).json({ + success: false, + error: { message: 'Forbidden', code: 'FORBIDDEN' }, + }); + return; + } + + const performanceData = await snapshotRepository.getPerformanceData( + portfolioId, + period as 'week' | 'month' | '3months' | 'year' | 'all' + ); + + res.json({ + success: true, + data: performanceData, + }); + } catch (error) { + next(error); + } +} + +/** + * Get detailed performance statistics + */ +export async function getPerformanceStats(req: AuthRequest, res: Response, next: NextFunction): Promise { + try { + const userId = req.user?.id; + if (!userId) { + res.status(401).json({ + success: false, + error: { message: 'Unauthorized', code: 'UNAUTHORIZED' }, + }); + return; + } + + const { portfolioId } = req.params; + + const portfolio = await portfolioService.getPortfolio(portfolioId); + if (!portfolio) { + res.status(404).json({ + success: false, + error: { message: 'Portfolio not found', code: 'NOT_FOUND' }, + }); + return; + } + + if (portfolio.userId !== userId) { + res.status(403).json({ + success: false, + error: { message: 'Forbidden', code: 'FORBIDDEN' }, + }); + return; + } + + const stats = await snapshotRepository.getPerformanceStats(portfolioId); + + res.json({ + success: true, + data: stats, + }); + } catch (error) { + next(error); + } +} diff --git a/src/modules/portfolio/portfolio.routes.ts b/src/modules/portfolio/portfolio.routes.ts index 7de80f8..a7ded46 100644 --- a/src/modules/portfolio/portfolio.routes.ts +++ b/src/modules/portfolio/portfolio.routes.ts @@ -48,6 +48,19 @@ router.put('/:portfolioId/allocations', authHandler(portfolioController.updateAl */ router.get('/:portfolioId/stats', authHandler(portfolioController.getPortfolioStats)); +/** + * GET /api/v1/portfolio/:portfolioId/performance + * Get portfolio performance history for charts + * Query: period = 'week' | 'month' | '3months' | 'year' | 'all' + */ +router.get('/:portfolioId/performance', authHandler(portfolioController.getPortfolioPerformance)); + +/** + * GET /api/v1/portfolio/:portfolioId/performance/stats + * Get detailed performance statistics + */ +router.get('/:portfolioId/performance/stats', authHandler(portfolioController.getPerformanceStats)); + // ============================================================================ // Rebalancing (Authenticated) // ============================================================================ diff --git a/src/modules/portfolio/repositories/index.ts b/src/modules/portfolio/repositories/index.ts index 0c7f14b..d54d217 100644 --- a/src/modules/portfolio/repositories/index.ts +++ b/src/modules/portfolio/repositories/index.ts @@ -4,3 +4,4 @@ export * from './portfolio.repository'; export * from './goal.repository'; +export * from './snapshot.repository'; diff --git a/src/modules/portfolio/repositories/snapshot.repository.ts b/src/modules/portfolio/repositories/snapshot.repository.ts new file mode 100644 index 0000000..73d7ca9 --- /dev/null +++ b/src/modules/portfolio/repositories/snapshot.repository.ts @@ -0,0 +1,338 @@ +/** + * Portfolio Snapshot Repository + * Handles database operations for portfolio snapshots (historical data) + */ + +import { db } from '../../../shared/database'; + +// ============================================================================ +// Types +// ============================================================================ + +export interface SnapshotRow { + id: string; + portfolio_id: string; + snapshot_date: Date; + total_value: string; + total_cost: string; + unrealized_pnl: string; + unrealized_pnl_percent: string; + day_change: string; + day_change_percent: string; + allocations: Record | null; + created_at: Date; +} + +export interface PortfolioSnapshot { + id: string; + portfolioId: string; + snapshotDate: Date; + totalValue: number; + totalCost: number; + unrealizedPnl: number; + unrealizedPnlPercent: number; + dayChange: number; + dayChangePercent: number; + allocations: Record | null; + createdAt: Date; +} + +export interface CreateSnapshotInput { + portfolioId: string; + snapshotDate: Date; + totalValue: number; + totalCost: number; + unrealizedPnl: number; + unrealizedPnlPercent: number; + dayChange?: number; + dayChangePercent?: number; + allocations?: Record; +} + +export interface PerformanceDataPoint { + date: string; + value: number; + pnl: number; + pnlPercent: number; + change: number; + changePercent: number; +} + +// ============================================================================ +// Helper Functions +// ============================================================================ + +function mapRowToSnapshot(row: SnapshotRow): PortfolioSnapshot { + return { + id: row.id, + portfolioId: row.portfolio_id, + snapshotDate: row.snapshot_date, + totalValue: parseFloat(row.total_value || '0'), + totalCost: parseFloat(row.total_cost || '0'), + unrealizedPnl: parseFloat(row.unrealized_pnl || '0'), + unrealizedPnlPercent: parseFloat(row.unrealized_pnl_percent || '0'), + dayChange: parseFloat(row.day_change || '0'), + dayChangePercent: parseFloat(row.day_change_percent || '0'), + allocations: row.allocations, + createdAt: row.created_at, + }; +} + +// ============================================================================ +// Repository Class +// ============================================================================ + +class SnapshotRepository { + /** + * Create a new snapshot + */ + async create(input: CreateSnapshotInput): Promise { + const result = await db.query( + `INSERT INTO portfolio.portfolio_snapshots ( + portfolio_id, snapshot_date, total_value, total_cost, + unrealized_pnl, unrealized_pnl_percent, day_change, day_change_percent, allocations + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (portfolio_id, snapshot_date) + DO UPDATE SET + total_value = EXCLUDED.total_value, + total_cost = EXCLUDED.total_cost, + unrealized_pnl = EXCLUDED.unrealized_pnl, + unrealized_pnl_percent = EXCLUDED.unrealized_pnl_percent, + day_change = EXCLUDED.day_change, + day_change_percent = EXCLUDED.day_change_percent, + allocations = EXCLUDED.allocations + RETURNING *`, + [ + input.portfolioId, + input.snapshotDate, + input.totalValue, + input.totalCost, + input.unrealizedPnl, + input.unrealizedPnlPercent, + input.dayChange ?? 0, + input.dayChangePercent ?? 0, + input.allocations ? JSON.stringify(input.allocations) : null, + ] + ); + + return mapRowToSnapshot(result.rows[0]); + } + + /** + * Get snapshots for a portfolio within date range + */ + async findByPortfolioId( + portfolioId: string, + startDate?: Date, + endDate?: Date + ): Promise { + let query = `SELECT * FROM portfolio.portfolio_snapshots WHERE portfolio_id = $1`; + const params: (string | Date)[] = [portfolioId]; + + if (startDate) { + query += ` AND snapshot_date >= $${params.length + 1}`; + params.push(startDate); + } + + if (endDate) { + query += ` AND snapshot_date <= $${params.length + 1}`; + params.push(endDate); + } + + query += ` ORDER BY snapshot_date ASC`; + + const result = await db.query(query, params); + return result.rows.map(mapRowToSnapshot); + } + + /** + * Get latest snapshot for a portfolio + */ + async findLatest(portfolioId: string): Promise { + const result = await db.query( + `SELECT * FROM portfolio.portfolio_snapshots + WHERE portfolio_id = $1 + ORDER BY snapshot_date DESC + LIMIT 1`, + [portfolioId] + ); + + if (result.rows.length === 0) return null; + return mapRowToSnapshot(result.rows[0]); + } + + /** + * Get performance data for charts + */ + async getPerformanceData( + portfolioId: string, + period: 'week' | 'month' | '3months' | 'year' | 'all' + ): Promise { + let startDate: Date; + const now = new Date(); + + switch (period) { + case 'week': + startDate = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); + break; + case 'month': + startDate = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); + break; + case '3months': + startDate = new Date(now.getTime() - 90 * 24 * 60 * 60 * 1000); + break; + case 'year': + startDate = new Date(now.getTime() - 365 * 24 * 60 * 60 * 1000); + break; + case 'all': + default: + startDate = new Date(0); + } + + const result = await db.query<{ + snapshot_date: Date; + total_value: string; + unrealized_pnl: string; + unrealized_pnl_percent: string; + day_change: string; + day_change_percent: string; + }>( + `SELECT snapshot_date, total_value, unrealized_pnl, unrealized_pnl_percent, + day_change, day_change_percent + FROM portfolio.portfolio_snapshots + WHERE portfolio_id = $1 AND snapshot_date >= $2 + ORDER BY snapshot_date ASC`, + [portfolioId, startDate] + ); + + return result.rows.map((row) => ({ + date: row.snapshot_date.toISOString().split('T')[0], + value: parseFloat(row.total_value), + pnl: parseFloat(row.unrealized_pnl), + pnlPercent: parseFloat(row.unrealized_pnl_percent), + change: parseFloat(row.day_change), + changePercent: parseFloat(row.day_change_percent), + })); + } + + /** + * Get cumulative performance stats + */ + async getPerformanceStats(portfolioId: string): Promise<{ + dayChange: number; + dayChangePercent: number; + weekChange: number; + weekChangePercent: number; + monthChange: number; + monthChangePercent: number; + yearChange: number; + yearChangePercent: number; + allTimeChange: number; + allTimeChangePercent: number; + }> { + const now = new Date(); + const dayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000); + const weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); + const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); + const yearAgo = new Date(now.getTime() - 365 * 24 * 60 * 60 * 1000); + + const result = await db.query<{ + period: string; + start_value: string; + end_value: string; + }>( + `WITH periods AS ( + SELECT 'day' as period, $2::date as start_date + UNION ALL SELECT 'week', $3::date + UNION ALL SELECT 'month', $4::date + UNION ALL SELECT 'year', $5::date + UNION ALL SELECT 'all', '1970-01-01'::date + ), + latest AS ( + SELECT total_value + FROM portfolio.portfolio_snapshots + WHERE portfolio_id = $1 + ORDER BY snapshot_date DESC + LIMIT 1 + ), + period_starts AS ( + SELECT p.period, + COALESCE( + (SELECT total_value FROM portfolio.portfolio_snapshots + WHERE portfolio_id = $1 AND snapshot_date <= p.start_date + ORDER BY snapshot_date DESC LIMIT 1), + (SELECT total_value FROM latest) + ) as start_value + FROM periods p + ) + SELECT ps.period, ps.start_value, l.total_value as end_value + FROM period_starts ps + CROSS JOIN latest l`, + [portfolioId, dayAgo, weekAgo, monthAgo, yearAgo] + ); + + const stats = { + dayChange: 0, + dayChangePercent: 0, + weekChange: 0, + weekChangePercent: 0, + monthChange: 0, + monthChangePercent: 0, + yearChange: 0, + yearChangePercent: 0, + allTimeChange: 0, + allTimeChangePercent: 0, + }; + + result.rows.forEach((row) => { + const startVal = parseFloat(row.start_value || '0'); + const endVal = parseFloat(row.end_value || '0'); + const change = endVal - startVal; + const changePercent = startVal > 0 ? (change / startVal) * 100 : 0; + + switch (row.period) { + case 'day': + stats.dayChange = change; + stats.dayChangePercent = changePercent; + break; + case 'week': + stats.weekChange = change; + stats.weekChangePercent = changePercent; + break; + case 'month': + stats.monthChange = change; + stats.monthChangePercent = changePercent; + break; + case 'year': + stats.yearChange = change; + stats.yearChangePercent = changePercent; + break; + case 'all': + stats.allTimeChange = change; + stats.allTimeChangePercent = changePercent; + break; + } + }); + + return stats; + } + + /** + * Delete old snapshots (cleanup) + */ + async deleteOldSnapshots(portfolioId: string, keepDays: number = 365): Promise { + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - keepDays); + + const result = await db.query( + `DELETE FROM portfolio.portfolio_snapshots + WHERE portfolio_id = $1 AND snapshot_date < $2`, + [portfolioId, cutoffDate] + ); + + return result.rowCount || 0; + } +} + +// Export singleton instance +export const snapshotRepository = new SnapshotRepository(); diff --git a/src/modules/portfolio/websocket/portfolio.websocket.ts b/src/modules/portfolio/websocket/portfolio.websocket.ts new file mode 100644 index 0000000..ae16eac --- /dev/null +++ b/src/modules/portfolio/websocket/portfolio.websocket.ts @@ -0,0 +1,288 @@ +/** + * Portfolio WebSocket Handler + * Real-time updates for portfolio data + */ + +import { wsManager, WSClient, WSMessage } from '../../../core/websocket/websocket.server'; +import { portfolioService } from '../services/portfolio.service'; +import { logger } from '../../../shared/utils/logger'; + +// ============================================================================ +// Types +// ============================================================================ + +interface PortfolioUpdateMessage { + portfolioId: string; + totalValue: number; + unrealizedPnl: number; + unrealizedPnlPercent: number; + allocations: { + asset: string; + value: number; + currentPercent: number; + pnl: number; + pnlPercent: number; + }[]; +} + +interface PriceUpdateMessage { + asset: string; + price: number; + change24h: number; +} + +// ============================================================================ +// Portfolio WebSocket Service +// ============================================================================ + +class PortfolioWebSocketService { + private updateIntervals: Map = new Map(); + private readonly UPDATE_INTERVAL = 10000; // 10 seconds + + /** + * Initialize portfolio WebSocket handlers + */ + initialize(): void { + // Register message handlers + wsManager.registerHandler('portfolio:subscribe', this.handleSubscribe.bind(this)); + wsManager.registerHandler('portfolio:unsubscribe', this.handleUnsubscribe.bind(this)); + wsManager.registerHandler('portfolio:refresh', this.handleRefresh.bind(this)); + + // Listen for subscription events + wsManager.on('subscribe', (client: WSClient, channel: string) => { + if (channel.startsWith('portfolio:')) { + this.onPortfolioSubscribe(client, channel); + } + }); + + wsManager.on('unsubscribe', (client: WSClient, channel: string) => { + if (channel.startsWith('portfolio:')) { + this.onPortfolioUnsubscribe(channel); + } + }); + + logger.info('[Portfolio WS] Handlers initialized'); + } + + /** + * Handle portfolio subscribe message + */ + private async handleSubscribe(client: WSClient, message: WSMessage): Promise { + const portfolioId = (message.data as { portfolioId?: string })?.portfolioId; + + if (!portfolioId) { + wsManager.send(client, { + type: 'error', + data: { message: 'portfolioId is required' }, + }); + return; + } + + if (!client.userId) { + wsManager.send(client, { + type: 'error', + data: { message: 'Authentication required' }, + }); + return; + } + + // Verify user owns this portfolio + const portfolio = await portfolioService.getPortfolio(portfolioId); + if (!portfolio || portfolio.userId !== client.userId) { + wsManager.send(client, { + type: 'error', + data: { message: 'Portfolio not found or access denied' }, + }); + return; + } + + // Add subscription + const channel = `portfolio:${portfolioId}`; + client.subscriptions.add(channel); + + // Send initial data + await this.sendPortfolioUpdate(portfolioId); + + wsManager.send(client, { + type: 'portfolio:subscribed', + data: { portfolioId, channel }, + }); + } + + /** + * Handle portfolio unsubscribe message + */ + private handleUnsubscribe(client: WSClient, message: WSMessage): void { + const portfolioId = (message.data as { portfolioId?: string })?.portfolioId; + + if (portfolioId) { + const channel = `portfolio:${portfolioId}`; + client.subscriptions.delete(channel); + + wsManager.send(client, { + type: 'portfolio:unsubscribed', + data: { portfolioId }, + }); + } + } + + /** + * Handle manual refresh request + */ + private async handleRefresh(client: WSClient, message: WSMessage): Promise { + const portfolioId = (message.data as { portfolioId?: string })?.portfolioId; + + if (!portfolioId || !client.userId) return; + + const portfolio = await portfolioService.getPortfolio(portfolioId); + if (!portfolio || portfolio.userId !== client.userId) return; + + // Send fresh data to requesting client only + const updateData = this.formatPortfolioUpdate(portfolio); + wsManager.send(client, { + type: 'portfolio:update', + data: updateData, + }); + } + + /** + * Called when a client subscribes to a portfolio channel + */ + private onPortfolioSubscribe(client: WSClient, channel: string): void { + const portfolioId = channel.replace('portfolio:', ''); + + // Start update interval if not already running + if (!this.updateIntervals.has(portfolioId)) { + const interval = setInterval(async () => { + await this.sendPortfolioUpdate(portfolioId); + }, this.UPDATE_INTERVAL); + + this.updateIntervals.set(portfolioId, interval); + logger.debug('[Portfolio WS] Started updates for:', { portfolioId }); + } + } + + /** + * Called when a client unsubscribes from a portfolio channel + */ + private onPortfolioUnsubscribe(channel: string): void { + const portfolioId = channel.replace('portfolio:', ''); + + // Check if anyone is still subscribed + const subscriberCount = wsManager.getChannelSubscriberCount(channel); + + if (subscriberCount === 0) { + // Stop update interval + const interval = this.updateIntervals.get(portfolioId); + if (interval) { + clearInterval(interval); + this.updateIntervals.delete(portfolioId); + logger.debug('[Portfolio WS] Stopped updates for:', { portfolioId }); + } + } + } + + /** + * Send portfolio update to all subscribers + */ + async sendPortfolioUpdate(portfolioId: string): Promise { + try { + const portfolio = await portfolioService.getPortfolio(portfolioId); + if (!portfolio) return; + + const updateData = this.formatPortfolioUpdate(portfolio); + + wsManager.broadcast(`portfolio:${portfolioId}`, { + type: 'portfolio:update', + data: updateData, + }); + } catch (error) { + logger.error('[Portfolio WS] Error sending update:', { + portfolioId, + error: (error as Error).message, + }); + } + } + + /** + * Format portfolio data for WebSocket message + */ + private formatPortfolioUpdate(portfolio: { + id: string; + totalValue: number; + unrealizedPnl: number; + unrealizedPnlPercent: number; + allocations: { + asset: string; + value: number; + currentPercent: number; + pnl: number; + pnlPercent: number; + }[]; + }): PortfolioUpdateMessage { + return { + portfolioId: portfolio.id, + totalValue: portfolio.totalValue, + unrealizedPnl: portfolio.unrealizedPnl, + unrealizedPnlPercent: portfolio.unrealizedPnlPercent, + allocations: portfolio.allocations.map((a) => ({ + asset: a.asset, + value: a.value, + currentPercent: a.currentPercent, + pnl: a.pnl, + pnlPercent: a.pnlPercent, + })), + }; + } + + /** + * Broadcast price update to all portfolio channels that have this asset + */ + async broadcastPriceUpdate(asset: string, price: number, change24h: number): Promise { + const message: PriceUpdateMessage = { asset, price, change24h }; + + // Get all active portfolio channels + const channels = wsManager.getActiveChannels().filter((c) => c.startsWith('portfolio:')); + + // Broadcast to each channel + channels.forEach((channel) => { + wsManager.broadcast(channel, { + type: 'price:update', + data: message, + }); + }); + } + + /** + * Send notification to user about portfolio events + */ + sendUserNotification( + userId: string, + notification: { + type: 'rebalance_needed' | 'goal_achieved' | 'significant_change'; + title: string; + message: string; + portfolioId?: string; + goalId?: string; + } + ): void { + wsManager.sendToUser(userId, { + type: 'portfolio:notification', + data: notification, + }); + } + + /** + * Cleanup on shutdown + */ + shutdown(): void { + this.updateIntervals.forEach((interval) => { + clearInterval(interval); + }); + this.updateIntervals.clear(); + logger.info('[Portfolio WS] Service shut down'); + } +} + +// Export singleton instance +export const portfolioWebSocket = new PortfolioWebSocketService();