feat: Add complete Market Data OHLCV module

- Create market-data module with types, service, controller, and routes
- Implement getOHLCV endpoint: GET /api/v1/market-data/ohlcv/:symbol/:timeframe
- Implement getHistoricalData endpoint: GET /api/v1/market-data/historical/:symbol
- Add Redis caching with 60s TTL
- Support 5m and 15m timeframes from market_data schema
- Query PostgreSQL tables: market_data.ohlcv_5m, market_data.ohlcv_15m
- Validate parameters and return { data, count, cached } response
- Follow existing module patterns (ml, trading, notifications)

Resolves: GAP-P1-001

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Adrian Flores Cortes 2026-01-28 12:26:16 -06:00
parent ad51d5d5a8
commit 3295f255ee
6 changed files with 481 additions and 0 deletions

View File

@ -37,6 +37,7 @@ import { llmRouter } from './modules/llm/llm.routes.js';
import { portfolioRouter } from './modules/portfolio/portfolio.routes.js';
import { agentsRouter } from './modules/agents/agents.routes.js';
import { notificationRouter } from './modules/notifications/notification.routes.js';
import { marketDataRouter } from './modules/market-data/index.js';
// Service clients for health checks
import { tradingAgentsClient, mlEngineClient, llmAgentClient } from './shared/clients/index.js';
@ -154,6 +155,7 @@ apiRouter.use('/llm', llmRouter);
apiRouter.use('/portfolio', portfolioRouter);
apiRouter.use('/agents', agentsRouter);
apiRouter.use('/notifications', notificationRouter);
apiRouter.use('/market-data', marketDataRouter);
// Mount API router
app.use('/api/v1', apiRouter);

View File

@ -0,0 +1,158 @@
/**
* Market Data Controller
* Handles OHLCV data endpoints
*/
import { Request, Response, NextFunction } from 'express';
import { marketDataService } from '../services/marketData.service';
import { Timeframe } from '../types/market-data.types';
import { logger } from '../../../shared/utils/logger';
export async function getOHLCV(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const { symbol, timeframe } = req.params;
const { limit } = req.query;
if (!symbol || !timeframe) {
res.status(400).json({
success: false,
error: {
message: 'Missing required parameters: symbol and timeframe',
code: 'VALIDATION_ERROR',
},
});
return;
}
const validTimeframes: Timeframe[] = ['5m', '15m', '1h', '4h', '1d'];
if (!validTimeframes.includes(timeframe as Timeframe)) {
res.status(400).json({
success: false,
error: {
message: `Invalid timeframe. Must be one of: ${validTimeframes.join(', ')}`,
code: 'INVALID_TIMEFRAME',
},
});
return;
}
const effectiveLimit = limit ? parseInt(limit as string, 10) : undefined;
const data = await marketDataService.getOHLCV(
symbol,
timeframe as Timeframe,
effectiveLimit
);
res.json({
success: true,
data,
count: data.length,
cached: false,
});
} catch (error) {
logger.error('Error in getOHLCV', { error: (error as Error).message });
next(error);
}
}
export async function getHistoricalData(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const { symbol } = req.params;
const { timeframe, from, to } = req.query;
if (!symbol || !timeframe || !from || !to) {
res.status(400).json({
success: false,
error: {
message: 'Missing required parameters: symbol, timeframe, from, to',
code: 'VALIDATION_ERROR',
},
});
return;
}
const validTimeframes: Timeframe[] = ['5m', '15m', '1h', '4h', '1d'];
if (!validTimeframes.includes(timeframe as Timeframe)) {
res.status(400).json({
success: false,
error: {
message: `Invalid timeframe. Must be one of: ${validTimeframes.join(', ')}`,
code: 'INVALID_TIMEFRAME',
},
});
return;
}
const fromDate = new Date(from as string);
const toDate = new Date(to as string);
if (isNaN(fromDate.getTime()) || isNaN(toDate.getTime())) {
res.status(400).json({
success: false,
error: {
message: 'Invalid date format for from or to parameters',
code: 'INVALID_DATE',
},
});
return;
}
if (fromDate >= toDate) {
res.status(400).json({
success: false,
error: {
message: 'from date must be before to date',
code: 'INVALID_DATE_RANGE',
},
});
return;
}
const data = await marketDataService.getHistoricalData(
symbol,
timeframe as Timeframe,
fromDate,
toDate
);
res.json({
success: true,
data,
count: data.length,
cached: false,
});
} catch (error) {
logger.error('Error in getHistoricalData', { error: (error as Error).message });
next(error);
}
}
export async function getAvailableSymbols(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const symbols = await marketDataService.getAvailableSymbols();
res.json({
success: true,
data: symbols,
count: symbols.length,
});
} catch (error) {
logger.error('Error in getAvailableSymbols', { error: (error as Error).message });
next(error);
}
}
export async function healthCheck(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
const health = await marketDataService.healthCheck();
res.json({
success: true,
...health,
});
} catch (error) {
logger.error('Error in healthCheck', { error: (error as Error).message });
next(error);
}
}

View File

@ -0,0 +1,8 @@
/**
* Market Data Module
* Exports market data service, routes, and types
*/
export { marketDataService } from './services/marketData.service';
export { marketDataRouter } from './market-data.routes';
export * from './types/market-data.types';

View File

@ -0,0 +1,24 @@
/**
* Market Data Routes
* Defines REST endpoints for OHLCV market data
*/
import { Router } from 'express';
import {
getOHLCV,
getHistoricalData,
getAvailableSymbols,
healthCheck,
} from './controllers/market-data.controller';
const router = Router();
router.get('/health', healthCheck);
router.get('/symbols', getAvailableSymbols);
router.get('/ohlcv/:symbol/:timeframe', getOHLCV);
router.get('/historical/:symbol', getHistoricalData);
export { router as marketDataRouter };

View File

@ -0,0 +1,233 @@
/**
* Market Data Service
* Provides OHLCV data from PostgreSQL with Redis caching
*/
import { db } from '../../../shared/database';
import { redis } from '../../../shared/redis';
import { logger } from '../../../shared/utils/logger';
import {
OHLCV,
Timeframe,
CandleQueryOptions,
HistoricalDataOptions,
OhlcvDataRow,
TickerRow,
} from '../types/market-data.types';
class MarketDataService {
private readonly CACHE_TTL = 60;
private readonly DEFAULT_LIMIT = 100;
async getOHLCV(symbol: string, timeframe: Timeframe, limit?: number): Promise<OHLCV[]> {
const cacheKey = `market-data:ohlcv:${symbol}:${timeframe}:${limit || this.DEFAULT_LIMIT}`;
try {
const redisClient = await redis.getClient();
const cached = await redisClient.get(cacheKey);
if (cached) {
logger.debug('Market data cache hit', { symbol, timeframe });
return JSON.parse(cached);
}
} catch (error) {
logger.warn('Redis get failed, proceeding without cache', {
error: (error as Error).message,
});
}
const data = await this.fetchOHLCVFromDB(symbol, timeframe, limit);
try {
const redisClient = await redis.getClient();
await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data));
} catch (error) {
logger.warn('Redis set failed', { error: (error as Error).message });
}
return data;
}
async getHistoricalData(symbol: string, timeframe: Timeframe, from: Date, to: Date): Promise<OHLCV[]> {
const cacheKey = `market-data:historical:${symbol}:${timeframe}:${from.getTime()}:${to.getTime()}`;
try {
const redisClient = await redis.getClient();
const cached = await redisClient.get(cacheKey);
if (cached) {
logger.debug('Historical data cache hit', { symbol, timeframe, from, to });
return JSON.parse(cached);
}
} catch (error) {
logger.warn('Redis get failed, proceeding without cache', {
error: (error as Error).message,
});
}
const data = await this.fetchHistoricalFromDB(symbol, timeframe, from, to);
try {
const redisClient = await redis.getClient();
await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data));
} catch (error) {
logger.warn('Redis set failed', { error: (error as Error).message });
}
return data;
}
private async fetchOHLCVFromDB(symbol: string, timeframe: Timeframe, limit?: number): Promise<OHLCV[]> {
const tableName = this.getTableName(timeframe);
const effectiveLimit = limit || this.DEFAULT_LIMIT;
const tickerResult = await db.query<TickerRow>(
'SELECT id, symbol FROM market_data.tickers WHERE symbol = $1 AND is_active = true',
[symbol.toUpperCase()]
);
if (tickerResult.rows.length === 0) {
logger.warn('Ticker not found', { symbol });
return [];
}
const tickerId = tickerResult.rows[0].id;
const query = `
SELECT
timestamp,
open,
high,
low,
close,
volume,
vwap
FROM ${tableName}
WHERE ticker_id = $1
ORDER BY timestamp DESC
LIMIT $2
`;
const result = await db.query<OhlcvDataRow>(query, [tickerId, effectiveLimit]);
return result.rows.map((row) => this.transformOHLCV(row, symbol));
}
private async fetchHistoricalFromDB(
symbol: string,
timeframe: Timeframe,
from: Date,
to: Date
): Promise<OHLCV[]> {
const tableName = this.getTableName(timeframe);
const tickerResult = await db.query<TickerRow>(
'SELECT id, symbol FROM market_data.tickers WHERE symbol = $1 AND is_active = true',
[symbol.toUpperCase()]
);
if (tickerResult.rows.length === 0) {
logger.warn('Ticker not found', { symbol });
return [];
}
const tickerId = tickerResult.rows[0].id;
const query = `
SELECT
timestamp,
open,
high,
low,
close,
volume,
vwap
FROM ${tableName}
WHERE ticker_id = $1
AND timestamp >= $2
AND timestamp <= $3
ORDER BY timestamp ASC
`;
const result = await db.query<OhlcvDataRow>(query, [tickerId, from, to]);
return result.rows.map((row) => this.transformOHLCV(row, symbol));
}
private getTableName(timeframe: Timeframe): string {
switch (timeframe) {
case '5m':
return 'market_data.ohlcv_5m';
case '15m':
return 'market_data.ohlcv_15m';
case '1h':
case '4h':
case '1d':
throw new Error(`Timeframe ${timeframe} not yet implemented. Use 5m or 15m.`);
default:
throw new Error(`Invalid timeframe: ${timeframe}`);
}
}
private transformOHLCV(row: OhlcvDataRow, symbol: string): OHLCV {
return {
timestamp: row.timestamp,
open: parseFloat(row.open),
high: parseFloat(row.high),
low: parseFloat(row.low),
close: parseFloat(row.close),
volume: parseFloat(row.volume),
symbol,
vwap: row.vwap ? parseFloat(row.vwap) : undefined,
};
}
async getAvailableSymbols(): Promise<string[]> {
const cacheKey = 'market-data:symbols';
try {
const redisClient = await redis.getClient();
const cached = await redisClient.get(cacheKey);
if (cached) {
return JSON.parse(cached);
}
} catch (error) {
logger.warn('Redis get failed', { error: (error as Error).message });
}
const result = await db.query<TickerRow>(
'SELECT symbol FROM market_data.tickers WHERE is_active = true ORDER BY symbol'
);
const symbols = result.rows.map((row) => row.symbol);
try {
const redisClient = await redis.getClient();
await redisClient.setex(cacheKey, 300, JSON.stringify(symbols));
} catch (error) {
logger.warn('Redis set failed', { error: (error as Error).message });
}
return symbols;
}
async healthCheck(): Promise<{ status: string; message: string }> {
try {
const result = await db.query('SELECT COUNT(*) as count FROM market_data.tickers');
const count = parseInt(result.rows[0].count, 10);
return {
status: 'healthy',
message: `Market data service operational. ${count} tickers available.`,
};
} catch (error) {
return {
status: 'unhealthy',
message: (error as Error).message,
};
}
}
}
export const marketDataService = new MarketDataService();

View File

@ -0,0 +1,56 @@
/**
* Market Data Module Types
* OHLCV data types for the market data module
*/
export type Timeframe = '5m' | '15m' | '1h' | '4h' | '1d';
export interface OHLCV {
timestamp: Date;
open: number;
high: number;
low: number;
close: number;
volume: number;
symbol: string;
vwap?: number;
}
export interface CandleQueryOptions {
symbol: string;
timeframe: Timeframe;
limit?: number;
from?: Date;
to?: Date;
}
export interface HistoricalDataOptions {
symbol: string;
timeframe: Timeframe;
from: Date;
to: Date;
}
export interface OhlcvDataRow {
id: string;
ticker_id: number;
timestamp: Date;
open: string;
high: string;
low: string;
close: string;
volume: string;
vwap: string | null;
created_at: Date;
}
export interface TickerRow {
id: number;
symbol: string;
name: string;
asset_type: string;
exchange: string | null;
base_currency: string;
quote_currency: string;
is_active: boolean;
}