From e7745d18b96d1a224cb15faa07460900a57aba06 Mon Sep 17 00:00:00 2001 From: Adrian Flores Cortes Date: Tue, 3 Feb 2026 08:11:41 -0600 Subject: [PATCH] [SYNC] fix: Update market data service Co-Authored-By: Claude Opus 4.5 --- .../services/marketData.service.ts | 212 +++++++++++++++++- 1 file changed, 202 insertions(+), 10 deletions(-) diff --git a/src/modules/market-data/services/marketData.service.ts b/src/modules/market-data/services/marketData.service.ts index 521fc3a..07795e4 100644 --- a/src/modules/market-data/services/marketData.service.ts +++ b/src/modules/market-data/services/marketData.service.ts @@ -1,6 +1,7 @@ /** * Market Data Service * Provides OHLCV data from PostgreSQL with Redis caching + * Supports 5m, 15m (direct DB), 1h, 4h, 1d (aggregated from 5m) */ import { db } from '../../../shared/database'; @@ -15,12 +16,40 @@ import { TickerRow, } from '../types/market-data.types'; +// ============================================================================= +// Cache TTL Configuration by Timeframe +// ============================================================================= + +const CACHE_TTL_BY_TIMEFRAME: Record = { + '5m': 60, // 1 minute cache + '15m': 60, // 1 minute cache + '1h': 300, // 5 minute cache + '4h': 900, // 15 minute cache + '1d': 3600, // 1 hour cache +}; + +// ============================================================================= +// Timeframe Minutes Mapping +// ============================================================================= + +const TIMEFRAME_MINUTES: Record = { + '5m': 5, + '15m': 15, + '1h': 60, + '4h': 240, + '1d': 1440, +}; + class MarketDataService { private readonly CACHE_TTL = 60; private readonly DEFAULT_LIMIT = 100; + private readonly BASE_TIMEFRAME: Timeframe = '5m'; + private readonly BASE_MINUTES = 5; async getOHLCV(symbol: string, timeframe: Timeframe, limit?: number): Promise { - const cacheKey = `market-data:ohlcv:${symbol}:${timeframe}:${limit || this.DEFAULT_LIMIT}`; + const effectiveLimit = limit || this.DEFAULT_LIMIT; + const cacheTTL = CACHE_TTL_BY_TIMEFRAME[timeframe]; + const cacheKey = `market-data:ohlcv:${symbol}:${timeframe}:${effectiveLimit}`; try { const redisClient = await redis.getClient(); @@ -36,11 +65,17 @@ class MarketDataService { }); } - const data = await this.fetchOHLCVFromDB(symbol, timeframe, limit); + let data: OHLCV[]; + + if (this.requiresAggregation(timeframe)) { + data = await this.fetchAggregatedOHLCV(symbol, timeframe, effectiveLimit); + } else { + data = await this.fetchOHLCVFromDB(symbol, timeframe, effectiveLimit); + } try { const redisClient = await redis.getClient(); - await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data)); + await redisClient.setex(cacheKey, cacheTTL, JSON.stringify(data)); } catch (error) { logger.warn('Redis set failed', { error: (error as Error).message }); } @@ -49,6 +84,7 @@ class MarketDataService { } async getHistoricalData(symbol: string, timeframe: Timeframe, from: Date, to: Date): Promise { + const cacheTTL = CACHE_TTL_BY_TIMEFRAME[timeframe]; const cacheKey = `market-data:historical:${symbol}:${timeframe}:${from.getTime()}:${to.getTime()}`; try { @@ -65,11 +101,17 @@ class MarketDataService { }); } - const data = await this.fetchHistoricalFromDB(symbol, timeframe, from, to); + let data: OHLCV[]; + + if (this.requiresAggregation(timeframe)) { + data = await this.fetchAggregatedHistoricalData(symbol, timeframe, from, to); + } else { + data = await this.fetchHistoricalFromDB(symbol, timeframe, from, to); + } try { const redisClient = await redis.getClient(); - await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data)); + await redisClient.setex(cacheKey, cacheTTL, JSON.stringify(data)); } catch (error) { logger.warn('Redis set failed', { error: (error as Error).message }); } @@ -77,6 +119,38 @@ class MarketDataService { return data; } + /** + * Fetch aggregated historical data for 1h, 4h, 1d timeframes + */ + private async fetchAggregatedHistoricalData( + symbol: string, + timeframe: Timeframe, + from: Date, + to: Date + ): Promise { + const targetMinutes = TIMEFRAME_MINUTES[timeframe]; + + // Fetch base 5m candles for the date range + const baseCandles = await this.fetchHistoricalFromDB(symbol, this.BASE_TIMEFRAME, from, to); + + if (baseCandles.length === 0) { + return []; + } + + const aggregated = this.aggregateCandles(baseCandles, targetMinutes); + + // Filter to only include complete periods within the requested range + const fromMs = from.getTime(); + const toMs = to.getTime(); + + return aggregated + .filter((candle) => { + const candleTime = new Date(candle.timestamp).getTime(); + return candleTime >= fromMs && candleTime <= toMs; + }) + .sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()); + } + private async fetchOHLCVFromDB(symbol: string, timeframe: Timeframe, limit?: number): Promise { const tableName = this.getTableName(timeframe); const effectiveLimit = limit || this.DEFAULT_LIMIT; @@ -160,15 +234,133 @@ class MarketDataService { return 'market_data.ohlcv_5m'; case '15m': return 'market_data.ohlcv_15m'; - case '1h': - case '4h': - case '1d': - throw new Error(`Timeframe ${timeframe} not yet implemented. Use 5m or 15m.`); default: - throw new Error(`Invalid timeframe: ${timeframe}`); + throw new Error(`Invalid timeframe for direct DB query: ${timeframe}`); } } + // =========================================================================== + // Aggregation Methods for 1h, 4h, 1d Timeframes + // =========================================================================== + + /** + * Check if timeframe requires aggregation from base data + */ + private requiresAggregation(timeframe: Timeframe): boolean { + return timeframe === '1h' || timeframe === '4h' || timeframe === '1d'; + } + + /** + * Fetch aggregated OHLCV data by computing from 5m candles + */ + private async fetchAggregatedOHLCV( + symbol: string, + timeframe: Timeframe, + limit: number + ): Promise { + const targetMinutes = TIMEFRAME_MINUTES[timeframe]; + const candlesPerPeriod = targetMinutes / this.BASE_MINUTES; + + // Fetch more base candles to ensure we have enough for aggregation + // Add extra for partial periods at boundaries + const baseLimit = limit * candlesPerPeriod + candlesPerPeriod; + + const baseCandles = await this.fetchOHLCVFromDB(symbol, this.BASE_TIMEFRAME, baseLimit); + + if (baseCandles.length === 0) { + return []; + } + + const aggregated = this.aggregateCandles(baseCandles, targetMinutes); + + // Return the requested limit, sorted by timestamp DESC (most recent first) + return aggregated.slice(0, limit); + } + + /** + * Aggregate 5m candles into larger timeframe candles + * Groups by period boundaries and calculates: + * - Open: first candle's open + * - High: max high across period + * - Low: min low across period + * - Close: last candle's close + * - Volume: sum of all volumes + * - VWAP: volume-weighted average price + */ + private aggregateCandles(candles: OHLCV[], targetMinutes: number): OHLCV[] { + if (candles.length === 0) { + return []; + } + + // Sort candles by timestamp ascending for proper aggregation + const sortedCandles = [...candles].sort( + (a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime() + ); + + const periodMs = targetMinutes * 60 * 1000; + const groupedByPeriod = new Map(); + + // Group candles by period boundary + for (const candle of sortedCandles) { + const candleTime = new Date(candle.timestamp).getTime(); + const periodStart = Math.floor(candleTime / periodMs) * periodMs; + + if (!groupedByPeriod.has(periodStart)) { + groupedByPeriod.set(periodStart, []); + } + groupedByPeriod.get(periodStart)!.push(candle); + } + + // Aggregate each period + const aggregatedCandles: OHLCV[] = []; + const periodEntries = Array.from(groupedByPeriod.entries()); + + for (const [periodStart, periodCandles] of periodEntries) { + if (periodCandles.length === 0) continue; + + // Sort period candles by time to ensure correct open/close + periodCandles.sort( + (a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime() + ); + + const firstCandle = periodCandles[0]; + const lastCandle = periodCandles[periodCandles.length - 1]; + + let high = -Infinity; + let low = Infinity; + let totalVolume = 0; + let vwapNumerator = 0; + + for (const candle of periodCandles) { + high = Math.max(high, candle.high); + low = Math.min(low, candle.low); + totalVolume += candle.volume; + + // Calculate typical price for VWAP: (high + low + close) / 3 + const typicalPrice = (candle.high + candle.low + candle.close) / 3; + vwapNumerator += typicalPrice * candle.volume; + } + + const vwap = totalVolume > 0 ? vwapNumerator / totalVolume : undefined; + + aggregatedCandles.push({ + timestamp: new Date(periodStart), + open: firstCandle.open, + high, + low, + close: lastCandle.close, + volume: totalVolume, + symbol: firstCandle.symbol, + vwap, + }); + } + + // Sort by timestamp descending (most recent first) to match expected output + return aggregatedCandles.sort( + (a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime() + ); + } + private transformOHLCV(row: OhlcvDataRow, symbol: string): OHLCV { return { timestamp: row.timestamp,