[OQI-008] feat: Add portfolio Phase 3 - WebSocket, snapshots, performance APIs

- Add portfolio.websocket.ts for real-time portfolio updates
- Add snapshot.repository.ts for historical performance data
- Add getPortfolioPerformance and getPerformanceStats endpoints
- Update routes with /performance and /performance/stats

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Adrian Flores Cortes 2026-01-25 08:40:22 -06:00
parent f40dfa8061
commit 32a088698e
5 changed files with 746 additions and 0 deletions

View File

@ -5,6 +5,7 @@
import { Request, Response, NextFunction } from 'express';
import { portfolioService, RiskProfile } from '../services/portfolio.service';
import { snapshotRepository } from '../repositories/snapshot.repository';
// ============================================================================
// Types
@ -458,3 +459,108 @@ export async function deleteGoal(req: AuthRequest, res: Response, next: NextFunc
next(error);
}
}
// ============================================================================
// Performance & Analytics
// ============================================================================
/**
* Get portfolio performance history for charts
*/
export async function getPortfolioPerformance(req: AuthRequest, res: Response, next: NextFunction): Promise<void> {
try {
const userId = req.user?.id;
if (!userId) {
res.status(401).json({
success: false,
error: { message: 'Unauthorized', code: 'UNAUTHORIZED' },
});
return;
}
const { portfolioId } = req.params;
const period = (req.query.period as string) || 'month';
const validPeriods = ['week', 'month', '3months', 'year', 'all'];
if (!validPeriods.includes(period)) {
res.status(400).json({
success: false,
error: { message: 'Invalid period', code: 'VALIDATION_ERROR' },
});
return;
}
const portfolio = await portfolioService.getPortfolio(portfolioId);
if (!portfolio) {
res.status(404).json({
success: false,
error: { message: 'Portfolio not found', code: 'NOT_FOUND' },
});
return;
}
if (portfolio.userId !== userId) {
res.status(403).json({
success: false,
error: { message: 'Forbidden', code: 'FORBIDDEN' },
});
return;
}
const performanceData = await snapshotRepository.getPerformanceData(
portfolioId,
period as 'week' | 'month' | '3months' | 'year' | 'all'
);
res.json({
success: true,
data: performanceData,
});
} catch (error) {
next(error);
}
}
/**
* Get detailed performance statistics
*/
export async function getPerformanceStats(req: AuthRequest, res: Response, next: NextFunction): Promise<void> {
try {
const userId = req.user?.id;
if (!userId) {
res.status(401).json({
success: false,
error: { message: 'Unauthorized', code: 'UNAUTHORIZED' },
});
return;
}
const { portfolioId } = req.params;
const portfolio = await portfolioService.getPortfolio(portfolioId);
if (!portfolio) {
res.status(404).json({
success: false,
error: { message: 'Portfolio not found', code: 'NOT_FOUND' },
});
return;
}
if (portfolio.userId !== userId) {
res.status(403).json({
success: false,
error: { message: 'Forbidden', code: 'FORBIDDEN' },
});
return;
}
const stats = await snapshotRepository.getPerformanceStats(portfolioId);
res.json({
success: true,
data: stats,
});
} catch (error) {
next(error);
}
}

View File

@ -48,6 +48,19 @@ router.put('/:portfolioId/allocations', authHandler(portfolioController.updateAl
*/
router.get('/:portfolioId/stats', authHandler(portfolioController.getPortfolioStats));
/**
* GET /api/v1/portfolio/:portfolioId/performance
* Get portfolio performance history for charts
* Query: period = 'week' | 'month' | '3months' | 'year' | 'all'
*/
router.get('/:portfolioId/performance', authHandler(portfolioController.getPortfolioPerformance));
/**
* GET /api/v1/portfolio/:portfolioId/performance/stats
* Get detailed performance statistics
*/
router.get('/:portfolioId/performance/stats', authHandler(portfolioController.getPerformanceStats));
// ============================================================================
// Rebalancing (Authenticated)
// ============================================================================

View File

@ -4,3 +4,4 @@
export * from './portfolio.repository';
export * from './goal.repository';
export * from './snapshot.repository';

View File

@ -0,0 +1,338 @@
/**
* Portfolio Snapshot Repository
* Handles database operations for portfolio snapshots (historical data)
*/
import { db } from '../../../shared/database';
// ============================================================================
// Types
// ============================================================================
export interface SnapshotRow {
id: string;
portfolio_id: string;
snapshot_date: Date;
total_value: string;
total_cost: string;
unrealized_pnl: string;
unrealized_pnl_percent: string;
day_change: string;
day_change_percent: string;
allocations: Record<string, unknown> | null;
created_at: Date;
}
export interface PortfolioSnapshot {
id: string;
portfolioId: string;
snapshotDate: Date;
totalValue: number;
totalCost: number;
unrealizedPnl: number;
unrealizedPnlPercent: number;
dayChange: number;
dayChangePercent: number;
allocations: Record<string, unknown> | null;
createdAt: Date;
}
export interface CreateSnapshotInput {
portfolioId: string;
snapshotDate: Date;
totalValue: number;
totalCost: number;
unrealizedPnl: number;
unrealizedPnlPercent: number;
dayChange?: number;
dayChangePercent?: number;
allocations?: Record<string, unknown>;
}
export interface PerformanceDataPoint {
date: string;
value: number;
pnl: number;
pnlPercent: number;
change: number;
changePercent: number;
}
// ============================================================================
// Helper Functions
// ============================================================================
function mapRowToSnapshot(row: SnapshotRow): PortfolioSnapshot {
return {
id: row.id,
portfolioId: row.portfolio_id,
snapshotDate: row.snapshot_date,
totalValue: parseFloat(row.total_value || '0'),
totalCost: parseFloat(row.total_cost || '0'),
unrealizedPnl: parseFloat(row.unrealized_pnl || '0'),
unrealizedPnlPercent: parseFloat(row.unrealized_pnl_percent || '0'),
dayChange: parseFloat(row.day_change || '0'),
dayChangePercent: parseFloat(row.day_change_percent || '0'),
allocations: row.allocations,
createdAt: row.created_at,
};
}
// ============================================================================
// Repository Class
// ============================================================================
class SnapshotRepository {
/**
* Create a new snapshot
*/
async create(input: CreateSnapshotInput): Promise<PortfolioSnapshot> {
const result = await db.query<SnapshotRow>(
`INSERT INTO portfolio.portfolio_snapshots (
portfolio_id, snapshot_date, total_value, total_cost,
unrealized_pnl, unrealized_pnl_percent, day_change, day_change_percent, allocations
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (portfolio_id, snapshot_date)
DO UPDATE SET
total_value = EXCLUDED.total_value,
total_cost = EXCLUDED.total_cost,
unrealized_pnl = EXCLUDED.unrealized_pnl,
unrealized_pnl_percent = EXCLUDED.unrealized_pnl_percent,
day_change = EXCLUDED.day_change,
day_change_percent = EXCLUDED.day_change_percent,
allocations = EXCLUDED.allocations
RETURNING *`,
[
input.portfolioId,
input.snapshotDate,
input.totalValue,
input.totalCost,
input.unrealizedPnl,
input.unrealizedPnlPercent,
input.dayChange ?? 0,
input.dayChangePercent ?? 0,
input.allocations ? JSON.stringify(input.allocations) : null,
]
);
return mapRowToSnapshot(result.rows[0]);
}
/**
* Get snapshots for a portfolio within date range
*/
async findByPortfolioId(
portfolioId: string,
startDate?: Date,
endDate?: Date
): Promise<PortfolioSnapshot[]> {
let query = `SELECT * FROM portfolio.portfolio_snapshots WHERE portfolio_id = $1`;
const params: (string | Date)[] = [portfolioId];
if (startDate) {
query += ` AND snapshot_date >= $${params.length + 1}`;
params.push(startDate);
}
if (endDate) {
query += ` AND snapshot_date <= $${params.length + 1}`;
params.push(endDate);
}
query += ` ORDER BY snapshot_date ASC`;
const result = await db.query<SnapshotRow>(query, params);
return result.rows.map(mapRowToSnapshot);
}
/**
* Get latest snapshot for a portfolio
*/
async findLatest(portfolioId: string): Promise<PortfolioSnapshot | null> {
const result = await db.query<SnapshotRow>(
`SELECT * FROM portfolio.portfolio_snapshots
WHERE portfolio_id = $1
ORDER BY snapshot_date DESC
LIMIT 1`,
[portfolioId]
);
if (result.rows.length === 0) return null;
return mapRowToSnapshot(result.rows[0]);
}
/**
* Get performance data for charts
*/
async getPerformanceData(
portfolioId: string,
period: 'week' | 'month' | '3months' | 'year' | 'all'
): Promise<PerformanceDataPoint[]> {
let startDate: Date;
const now = new Date();
switch (period) {
case 'week':
startDate = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
break;
case 'month':
startDate = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
break;
case '3months':
startDate = new Date(now.getTime() - 90 * 24 * 60 * 60 * 1000);
break;
case 'year':
startDate = new Date(now.getTime() - 365 * 24 * 60 * 60 * 1000);
break;
case 'all':
default:
startDate = new Date(0);
}
const result = await db.query<{
snapshot_date: Date;
total_value: string;
unrealized_pnl: string;
unrealized_pnl_percent: string;
day_change: string;
day_change_percent: string;
}>(
`SELECT snapshot_date, total_value, unrealized_pnl, unrealized_pnl_percent,
day_change, day_change_percent
FROM portfolio.portfolio_snapshots
WHERE portfolio_id = $1 AND snapshot_date >= $2
ORDER BY snapshot_date ASC`,
[portfolioId, startDate]
);
return result.rows.map((row) => ({
date: row.snapshot_date.toISOString().split('T')[0],
value: parseFloat(row.total_value),
pnl: parseFloat(row.unrealized_pnl),
pnlPercent: parseFloat(row.unrealized_pnl_percent),
change: parseFloat(row.day_change),
changePercent: parseFloat(row.day_change_percent),
}));
}
/**
* Get cumulative performance stats
*/
async getPerformanceStats(portfolioId: string): Promise<{
dayChange: number;
dayChangePercent: number;
weekChange: number;
weekChangePercent: number;
monthChange: number;
monthChangePercent: number;
yearChange: number;
yearChangePercent: number;
allTimeChange: number;
allTimeChangePercent: number;
}> {
const now = new Date();
const dayAgo = new Date(now.getTime() - 24 * 60 * 60 * 1000);
const weekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
const monthAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);
const yearAgo = new Date(now.getTime() - 365 * 24 * 60 * 60 * 1000);
const result = await db.query<{
period: string;
start_value: string;
end_value: string;
}>(
`WITH periods AS (
SELECT 'day' as period, $2::date as start_date
UNION ALL SELECT 'week', $3::date
UNION ALL SELECT 'month', $4::date
UNION ALL SELECT 'year', $5::date
UNION ALL SELECT 'all', '1970-01-01'::date
),
latest AS (
SELECT total_value
FROM portfolio.portfolio_snapshots
WHERE portfolio_id = $1
ORDER BY snapshot_date DESC
LIMIT 1
),
period_starts AS (
SELECT p.period,
COALESCE(
(SELECT total_value FROM portfolio.portfolio_snapshots
WHERE portfolio_id = $1 AND snapshot_date <= p.start_date
ORDER BY snapshot_date DESC LIMIT 1),
(SELECT total_value FROM latest)
) as start_value
FROM periods p
)
SELECT ps.period, ps.start_value, l.total_value as end_value
FROM period_starts ps
CROSS JOIN latest l`,
[portfolioId, dayAgo, weekAgo, monthAgo, yearAgo]
);
const stats = {
dayChange: 0,
dayChangePercent: 0,
weekChange: 0,
weekChangePercent: 0,
monthChange: 0,
monthChangePercent: 0,
yearChange: 0,
yearChangePercent: 0,
allTimeChange: 0,
allTimeChangePercent: 0,
};
result.rows.forEach((row) => {
const startVal = parseFloat(row.start_value || '0');
const endVal = parseFloat(row.end_value || '0');
const change = endVal - startVal;
const changePercent = startVal > 0 ? (change / startVal) * 100 : 0;
switch (row.period) {
case 'day':
stats.dayChange = change;
stats.dayChangePercent = changePercent;
break;
case 'week':
stats.weekChange = change;
stats.weekChangePercent = changePercent;
break;
case 'month':
stats.monthChange = change;
stats.monthChangePercent = changePercent;
break;
case 'year':
stats.yearChange = change;
stats.yearChangePercent = changePercent;
break;
case 'all':
stats.allTimeChange = change;
stats.allTimeChangePercent = changePercent;
break;
}
});
return stats;
}
/**
* Delete old snapshots (cleanup)
*/
async deleteOldSnapshots(portfolioId: string, keepDays: number = 365): Promise<number> {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - keepDays);
const result = await db.query(
`DELETE FROM portfolio.portfolio_snapshots
WHERE portfolio_id = $1 AND snapshot_date < $2`,
[portfolioId, cutoffDate]
);
return result.rowCount || 0;
}
}
// Export singleton instance
export const snapshotRepository = new SnapshotRepository();

View File

@ -0,0 +1,288 @@
/**
* Portfolio WebSocket Handler
* Real-time updates for portfolio data
*/
import { wsManager, WSClient, WSMessage } from '../../../core/websocket/websocket.server';
import { portfolioService } from '../services/portfolio.service';
import { logger } from '../../../shared/utils/logger';
// ============================================================================
// Types
// ============================================================================
interface PortfolioUpdateMessage {
portfolioId: string;
totalValue: number;
unrealizedPnl: number;
unrealizedPnlPercent: number;
allocations: {
asset: string;
value: number;
currentPercent: number;
pnl: number;
pnlPercent: number;
}[];
}
interface PriceUpdateMessage {
asset: string;
price: number;
change24h: number;
}
// ============================================================================
// Portfolio WebSocket Service
// ============================================================================
class PortfolioWebSocketService {
private updateIntervals: Map<string, NodeJS.Timeout> = new Map();
private readonly UPDATE_INTERVAL = 10000; // 10 seconds
/**
* Initialize portfolio WebSocket handlers
*/
initialize(): void {
// Register message handlers
wsManager.registerHandler('portfolio:subscribe', this.handleSubscribe.bind(this));
wsManager.registerHandler('portfolio:unsubscribe', this.handleUnsubscribe.bind(this));
wsManager.registerHandler('portfolio:refresh', this.handleRefresh.bind(this));
// Listen for subscription events
wsManager.on('subscribe', (client: WSClient, channel: string) => {
if (channel.startsWith('portfolio:')) {
this.onPortfolioSubscribe(client, channel);
}
});
wsManager.on('unsubscribe', (client: WSClient, channel: string) => {
if (channel.startsWith('portfolio:')) {
this.onPortfolioUnsubscribe(channel);
}
});
logger.info('[Portfolio WS] Handlers initialized');
}
/**
* Handle portfolio subscribe message
*/
private async handleSubscribe(client: WSClient, message: WSMessage): Promise<void> {
const portfolioId = (message.data as { portfolioId?: string })?.portfolioId;
if (!portfolioId) {
wsManager.send(client, {
type: 'error',
data: { message: 'portfolioId is required' },
});
return;
}
if (!client.userId) {
wsManager.send(client, {
type: 'error',
data: { message: 'Authentication required' },
});
return;
}
// Verify user owns this portfolio
const portfolio = await portfolioService.getPortfolio(portfolioId);
if (!portfolio || portfolio.userId !== client.userId) {
wsManager.send(client, {
type: 'error',
data: { message: 'Portfolio not found or access denied' },
});
return;
}
// Add subscription
const channel = `portfolio:${portfolioId}`;
client.subscriptions.add(channel);
// Send initial data
await this.sendPortfolioUpdate(portfolioId);
wsManager.send(client, {
type: 'portfolio:subscribed',
data: { portfolioId, channel },
});
}
/**
* Handle portfolio unsubscribe message
*/
private handleUnsubscribe(client: WSClient, message: WSMessage): void {
const portfolioId = (message.data as { portfolioId?: string })?.portfolioId;
if (portfolioId) {
const channel = `portfolio:${portfolioId}`;
client.subscriptions.delete(channel);
wsManager.send(client, {
type: 'portfolio:unsubscribed',
data: { portfolioId },
});
}
}
/**
* Handle manual refresh request
*/
private async handleRefresh(client: WSClient, message: WSMessage): Promise<void> {
const portfolioId = (message.data as { portfolioId?: string })?.portfolioId;
if (!portfolioId || !client.userId) return;
const portfolio = await portfolioService.getPortfolio(portfolioId);
if (!portfolio || portfolio.userId !== client.userId) return;
// Send fresh data to requesting client only
const updateData = this.formatPortfolioUpdate(portfolio);
wsManager.send(client, {
type: 'portfolio:update',
data: updateData,
});
}
/**
* Called when a client subscribes to a portfolio channel
*/
private onPortfolioSubscribe(client: WSClient, channel: string): void {
const portfolioId = channel.replace('portfolio:', '');
// Start update interval if not already running
if (!this.updateIntervals.has(portfolioId)) {
const interval = setInterval(async () => {
await this.sendPortfolioUpdate(portfolioId);
}, this.UPDATE_INTERVAL);
this.updateIntervals.set(portfolioId, interval);
logger.debug('[Portfolio WS] Started updates for:', { portfolioId });
}
}
/**
* Called when a client unsubscribes from a portfolio channel
*/
private onPortfolioUnsubscribe(channel: string): void {
const portfolioId = channel.replace('portfolio:', '');
// Check if anyone is still subscribed
const subscriberCount = wsManager.getChannelSubscriberCount(channel);
if (subscriberCount === 0) {
// Stop update interval
const interval = this.updateIntervals.get(portfolioId);
if (interval) {
clearInterval(interval);
this.updateIntervals.delete(portfolioId);
logger.debug('[Portfolio WS] Stopped updates for:', { portfolioId });
}
}
}
/**
* Send portfolio update to all subscribers
*/
async sendPortfolioUpdate(portfolioId: string): Promise<void> {
try {
const portfolio = await portfolioService.getPortfolio(portfolioId);
if (!portfolio) return;
const updateData = this.formatPortfolioUpdate(portfolio);
wsManager.broadcast(`portfolio:${portfolioId}`, {
type: 'portfolio:update',
data: updateData,
});
} catch (error) {
logger.error('[Portfolio WS] Error sending update:', {
portfolioId,
error: (error as Error).message,
});
}
}
/**
* Format portfolio data for WebSocket message
*/
private formatPortfolioUpdate(portfolio: {
id: string;
totalValue: number;
unrealizedPnl: number;
unrealizedPnlPercent: number;
allocations: {
asset: string;
value: number;
currentPercent: number;
pnl: number;
pnlPercent: number;
}[];
}): PortfolioUpdateMessage {
return {
portfolioId: portfolio.id,
totalValue: portfolio.totalValue,
unrealizedPnl: portfolio.unrealizedPnl,
unrealizedPnlPercent: portfolio.unrealizedPnlPercent,
allocations: portfolio.allocations.map((a) => ({
asset: a.asset,
value: a.value,
currentPercent: a.currentPercent,
pnl: a.pnl,
pnlPercent: a.pnlPercent,
})),
};
}
/**
* Broadcast price update to all portfolio channels that have this asset
*/
async broadcastPriceUpdate(asset: string, price: number, change24h: number): Promise<void> {
const message: PriceUpdateMessage = { asset, price, change24h };
// Get all active portfolio channels
const channels = wsManager.getActiveChannels().filter((c) => c.startsWith('portfolio:'));
// Broadcast to each channel
channels.forEach((channel) => {
wsManager.broadcast(channel, {
type: 'price:update',
data: message,
});
});
}
/**
* Send notification to user about portfolio events
*/
sendUserNotification(
userId: string,
notification: {
type: 'rebalance_needed' | 'goal_achieved' | 'significant_change';
title: string;
message: string;
portfolioId?: string;
goalId?: string;
}
): void {
wsManager.sendToUser(userId, {
type: 'portfolio:notification',
data: notification,
});
}
/**
* Cleanup on shutdown
*/
shutdown(): void {
this.updateIntervals.forEach((interval) => {
clearInterval(interval);
});
this.updateIntervals.clear();
logger.info('[Portfolio WS] Service shut down');
}
}
// Export singleton instance
export const portfolioWebSocket = new PortfolioWebSocketService();