[SYNC] fix: Update market data service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Adrian Flores Cortes 2026-02-03 08:11:41 -06:00
parent abc7e85dbe
commit e7745d18b9

View File

@ -1,6 +1,7 @@
/**
* Market Data Service
* Provides OHLCV data from PostgreSQL with Redis caching
* Supports 5m, 15m (direct DB), 1h, 4h, 1d (aggregated from 5m)
*/
import { db } from '../../../shared/database';
@ -15,12 +16,40 @@ import {
TickerRow,
} from '../types/market-data.types';
// =============================================================================
// Cache TTL Configuration by Timeframe
// =============================================================================
const CACHE_TTL_BY_TIMEFRAME: Record<Timeframe, number> = {
'5m': 60, // 1 minute cache
'15m': 60, // 1 minute cache
'1h': 300, // 5 minute cache
'4h': 900, // 15 minute cache
'1d': 3600, // 1 hour cache
};
// =============================================================================
// Timeframe Minutes Mapping
// =============================================================================
const TIMEFRAME_MINUTES: Record<Timeframe, number> = {
'5m': 5,
'15m': 15,
'1h': 60,
'4h': 240,
'1d': 1440,
};
class MarketDataService {
private readonly CACHE_TTL = 60;
private readonly DEFAULT_LIMIT = 100;
private readonly BASE_TIMEFRAME: Timeframe = '5m';
private readonly BASE_MINUTES = 5;
async getOHLCV(symbol: string, timeframe: Timeframe, limit?: number): Promise<OHLCV[]> {
const cacheKey = `market-data:ohlcv:${symbol}:${timeframe}:${limit || this.DEFAULT_LIMIT}`;
const effectiveLimit = limit || this.DEFAULT_LIMIT;
const cacheTTL = CACHE_TTL_BY_TIMEFRAME[timeframe];
const cacheKey = `market-data:ohlcv:${symbol}:${timeframe}:${effectiveLimit}`;
try {
const redisClient = await redis.getClient();
@ -36,11 +65,17 @@ class MarketDataService {
});
}
const data = await this.fetchOHLCVFromDB(symbol, timeframe, limit);
let data: OHLCV[];
if (this.requiresAggregation(timeframe)) {
data = await this.fetchAggregatedOHLCV(symbol, timeframe, effectiveLimit);
} else {
data = await this.fetchOHLCVFromDB(symbol, timeframe, effectiveLimit);
}
try {
const redisClient = await redis.getClient();
await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data));
await redisClient.setex(cacheKey, cacheTTL, JSON.stringify(data));
} catch (error) {
logger.warn('Redis set failed', { error: (error as Error).message });
}
@ -49,6 +84,7 @@ class MarketDataService {
}
async getHistoricalData(symbol: string, timeframe: Timeframe, from: Date, to: Date): Promise<OHLCV[]> {
const cacheTTL = CACHE_TTL_BY_TIMEFRAME[timeframe];
const cacheKey = `market-data:historical:${symbol}:${timeframe}:${from.getTime()}:${to.getTime()}`;
try {
@ -65,11 +101,17 @@ class MarketDataService {
});
}
const data = await this.fetchHistoricalFromDB(symbol, timeframe, from, to);
let data: OHLCV[];
if (this.requiresAggregation(timeframe)) {
data = await this.fetchAggregatedHistoricalData(symbol, timeframe, from, to);
} else {
data = await this.fetchHistoricalFromDB(symbol, timeframe, from, to);
}
try {
const redisClient = await redis.getClient();
await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data));
await redisClient.setex(cacheKey, cacheTTL, JSON.stringify(data));
} catch (error) {
logger.warn('Redis set failed', { error: (error as Error).message });
}
@ -77,6 +119,38 @@ class MarketDataService {
return data;
}
/**
* Fetch aggregated historical data for 1h, 4h, 1d timeframes
*/
private async fetchAggregatedHistoricalData(
symbol: string,
timeframe: Timeframe,
from: Date,
to: Date
): Promise<OHLCV[]> {
const targetMinutes = TIMEFRAME_MINUTES[timeframe];
// Fetch base 5m candles for the date range
const baseCandles = await this.fetchHistoricalFromDB(symbol, this.BASE_TIMEFRAME, from, to);
if (baseCandles.length === 0) {
return [];
}
const aggregated = this.aggregateCandles(baseCandles, targetMinutes);
// Filter to only include complete periods within the requested range
const fromMs = from.getTime();
const toMs = to.getTime();
return aggregated
.filter((candle) => {
const candleTime = new Date(candle.timestamp).getTime();
return candleTime >= fromMs && candleTime <= toMs;
})
.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime());
}
private async fetchOHLCVFromDB(symbol: string, timeframe: Timeframe, limit?: number): Promise<OHLCV[]> {
const tableName = this.getTableName(timeframe);
const effectiveLimit = limit || this.DEFAULT_LIMIT;
@ -160,15 +234,133 @@ class MarketDataService {
return 'market_data.ohlcv_5m';
case '15m':
return 'market_data.ohlcv_15m';
case '1h':
case '4h':
case '1d':
throw new Error(`Timeframe ${timeframe} not yet implemented. Use 5m or 15m.`);
default:
throw new Error(`Invalid timeframe: ${timeframe}`);
throw new Error(`Invalid timeframe for direct DB query: ${timeframe}`);
}
}
// ===========================================================================
// Aggregation Methods for 1h, 4h, 1d Timeframes
// ===========================================================================
/**
* Check if timeframe requires aggregation from base data
*/
private requiresAggregation(timeframe: Timeframe): boolean {
return timeframe === '1h' || timeframe === '4h' || timeframe === '1d';
}
/**
* Fetch aggregated OHLCV data by computing from 5m candles
*/
private async fetchAggregatedOHLCV(
symbol: string,
timeframe: Timeframe,
limit: number
): Promise<OHLCV[]> {
const targetMinutes = TIMEFRAME_MINUTES[timeframe];
const candlesPerPeriod = targetMinutes / this.BASE_MINUTES;
// Fetch more base candles to ensure we have enough for aggregation
// Add extra for partial periods at boundaries
const baseLimit = limit * candlesPerPeriod + candlesPerPeriod;
const baseCandles = await this.fetchOHLCVFromDB(symbol, this.BASE_TIMEFRAME, baseLimit);
if (baseCandles.length === 0) {
return [];
}
const aggregated = this.aggregateCandles(baseCandles, targetMinutes);
// Return the requested limit, sorted by timestamp DESC (most recent first)
return aggregated.slice(0, limit);
}
/**
* Aggregate 5m candles into larger timeframe candles
* Groups by period boundaries and calculates:
* - Open: first candle's open
* - High: max high across period
* - Low: min low across period
* - Close: last candle's close
* - Volume: sum of all volumes
* - VWAP: volume-weighted average price
*/
private aggregateCandles(candles: OHLCV[], targetMinutes: number): OHLCV[] {
if (candles.length === 0) {
return [];
}
// Sort candles by timestamp ascending for proper aggregation
const sortedCandles = [...candles].sort(
(a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
);
const periodMs = targetMinutes * 60 * 1000;
const groupedByPeriod = new Map<number, OHLCV[]>();
// Group candles by period boundary
for (const candle of sortedCandles) {
const candleTime = new Date(candle.timestamp).getTime();
const periodStart = Math.floor(candleTime / periodMs) * periodMs;
if (!groupedByPeriod.has(periodStart)) {
groupedByPeriod.set(periodStart, []);
}
groupedByPeriod.get(periodStart)!.push(candle);
}
// Aggregate each period
const aggregatedCandles: OHLCV[] = [];
const periodEntries = Array.from(groupedByPeriod.entries());
for (const [periodStart, periodCandles] of periodEntries) {
if (periodCandles.length === 0) continue;
// Sort period candles by time to ensure correct open/close
periodCandles.sort(
(a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
);
const firstCandle = periodCandles[0];
const lastCandle = periodCandles[periodCandles.length - 1];
let high = -Infinity;
let low = Infinity;
let totalVolume = 0;
let vwapNumerator = 0;
for (const candle of periodCandles) {
high = Math.max(high, candle.high);
low = Math.min(low, candle.low);
totalVolume += candle.volume;
// Calculate typical price for VWAP: (high + low + close) / 3
const typicalPrice = (candle.high + candle.low + candle.close) / 3;
vwapNumerator += typicalPrice * candle.volume;
}
const vwap = totalVolume > 0 ? vwapNumerator / totalVolume : undefined;
aggregatedCandles.push({
timestamp: new Date(periodStart),
open: firstCandle.open,
high,
low,
close: lastCandle.close,
volume: totalVolume,
symbol: firstCandle.symbol,
vwap,
});
}
// Sort by timestamp descending (most recent first) to match expected output
return aggregatedCandles.sort(
(a, b) => new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime()
);
}
private transformOHLCV(row: OhlcvDataRow, symbol: string): OHLCV {
return {
timestamp: row.timestamp,