diff --git a/src/index.ts b/src/index.ts index be5bcd8..6384440 100644 --- a/src/index.ts +++ b/src/index.ts @@ -37,6 +37,7 @@ import { llmRouter } from './modules/llm/llm.routes.js'; import { portfolioRouter } from './modules/portfolio/portfolio.routes.js'; import { agentsRouter } from './modules/agents/agents.routes.js'; import { notificationRouter } from './modules/notifications/notification.routes.js'; +import { marketDataRouter } from './modules/market-data/index.js'; // Service clients for health checks import { tradingAgentsClient, mlEngineClient, llmAgentClient } from './shared/clients/index.js'; @@ -154,6 +155,7 @@ apiRouter.use('/llm', llmRouter); apiRouter.use('/portfolio', portfolioRouter); apiRouter.use('/agents', agentsRouter); apiRouter.use('/notifications', notificationRouter); +apiRouter.use('/market-data', marketDataRouter); // Mount API router app.use('/api/v1', apiRouter); diff --git a/src/modules/market-data/controllers/market-data.controller.ts b/src/modules/market-data/controllers/market-data.controller.ts new file mode 100644 index 0000000..1ef455f --- /dev/null +++ b/src/modules/market-data/controllers/market-data.controller.ts @@ -0,0 +1,158 @@ +/** + * Market Data Controller + * Handles OHLCV data endpoints + */ + +import { Request, Response, NextFunction } from 'express'; +import { marketDataService } from '../services/marketData.service'; +import { Timeframe } from '../types/market-data.types'; +import { logger } from '../../../shared/utils/logger'; + +export async function getOHLCV(req: Request, res: Response, next: NextFunction): Promise { + try { + const { symbol, timeframe } = req.params; + const { limit } = req.query; + + if (!symbol || !timeframe) { + res.status(400).json({ + success: false, + error: { + message: 'Missing required parameters: symbol and timeframe', + code: 'VALIDATION_ERROR', + }, + }); + return; + } + + const validTimeframes: Timeframe[] = ['5m', '15m', '1h', '4h', '1d']; + if (!validTimeframes.includes(timeframe as Timeframe)) { + res.status(400).json({ + success: false, + error: { + message: `Invalid timeframe. Must be one of: ${validTimeframes.join(', ')}`, + code: 'INVALID_TIMEFRAME', + }, + }); + return; + } + + const effectiveLimit = limit ? parseInt(limit as string, 10) : undefined; + + const data = await marketDataService.getOHLCV( + symbol, + timeframe as Timeframe, + effectiveLimit + ); + + res.json({ + success: true, + data, + count: data.length, + cached: false, + }); + } catch (error) { + logger.error('Error in getOHLCV', { error: (error as Error).message }); + next(error); + } +} + +export async function getHistoricalData(req: Request, res: Response, next: NextFunction): Promise { + try { + const { symbol } = req.params; + const { timeframe, from, to } = req.query; + + if (!symbol || !timeframe || !from || !to) { + res.status(400).json({ + success: false, + error: { + message: 'Missing required parameters: symbol, timeframe, from, to', + code: 'VALIDATION_ERROR', + }, + }); + return; + } + + const validTimeframes: Timeframe[] = ['5m', '15m', '1h', '4h', '1d']; + if (!validTimeframes.includes(timeframe as Timeframe)) { + res.status(400).json({ + success: false, + error: { + message: `Invalid timeframe. Must be one of: ${validTimeframes.join(', ')}`, + code: 'INVALID_TIMEFRAME', + }, + }); + return; + } + + const fromDate = new Date(from as string); + const toDate = new Date(to as string); + + if (isNaN(fromDate.getTime()) || isNaN(toDate.getTime())) { + res.status(400).json({ + success: false, + error: { + message: 'Invalid date format for from or to parameters', + code: 'INVALID_DATE', + }, + }); + return; + } + + if (fromDate >= toDate) { + res.status(400).json({ + success: false, + error: { + message: 'from date must be before to date', + code: 'INVALID_DATE_RANGE', + }, + }); + return; + } + + const data = await marketDataService.getHistoricalData( + symbol, + timeframe as Timeframe, + fromDate, + toDate + ); + + res.json({ + success: true, + data, + count: data.length, + cached: false, + }); + } catch (error) { + logger.error('Error in getHistoricalData', { error: (error as Error).message }); + next(error); + } +} + +export async function getAvailableSymbols(req: Request, res: Response, next: NextFunction): Promise { + try { + const symbols = await marketDataService.getAvailableSymbols(); + + res.json({ + success: true, + data: symbols, + count: symbols.length, + }); + } catch (error) { + logger.error('Error in getAvailableSymbols', { error: (error as Error).message }); + next(error); + } +} + +export async function healthCheck(req: Request, res: Response, next: NextFunction): Promise { + try { + const health = await marketDataService.healthCheck(); + + res.json({ + success: true, + ...health, + }); + } catch (error) { + logger.error('Error in healthCheck', { error: (error as Error).message }); + next(error); + } +} diff --git a/src/modules/market-data/index.ts b/src/modules/market-data/index.ts new file mode 100644 index 0000000..ed37727 --- /dev/null +++ b/src/modules/market-data/index.ts @@ -0,0 +1,8 @@ +/** + * Market Data Module + * Exports market data service, routes, and types + */ + +export { marketDataService } from './services/marketData.service'; +export { marketDataRouter } from './market-data.routes'; +export * from './types/market-data.types'; diff --git a/src/modules/market-data/market-data.routes.ts b/src/modules/market-data/market-data.routes.ts new file mode 100644 index 0000000..69912fd --- /dev/null +++ b/src/modules/market-data/market-data.routes.ts @@ -0,0 +1,24 @@ +/** + * Market Data Routes + * Defines REST endpoints for OHLCV market data + */ + +import { Router } from 'express'; +import { + getOHLCV, + getHistoricalData, + getAvailableSymbols, + healthCheck, +} from './controllers/market-data.controller'; + +const router = Router(); + +router.get('/health', healthCheck); + +router.get('/symbols', getAvailableSymbols); + +router.get('/ohlcv/:symbol/:timeframe', getOHLCV); + +router.get('/historical/:symbol', getHistoricalData); + +export { router as marketDataRouter }; diff --git a/src/modules/market-data/services/marketData.service.ts b/src/modules/market-data/services/marketData.service.ts new file mode 100644 index 0000000..521fc3a --- /dev/null +++ b/src/modules/market-data/services/marketData.service.ts @@ -0,0 +1,233 @@ +/** + * Market Data Service + * Provides OHLCV data from PostgreSQL with Redis caching + */ + +import { db } from '../../../shared/database'; +import { redis } from '../../../shared/redis'; +import { logger } from '../../../shared/utils/logger'; +import { + OHLCV, + Timeframe, + CandleQueryOptions, + HistoricalDataOptions, + OhlcvDataRow, + TickerRow, +} from '../types/market-data.types'; + +class MarketDataService { + private readonly CACHE_TTL = 60; + private readonly DEFAULT_LIMIT = 100; + + async getOHLCV(symbol: string, timeframe: Timeframe, limit?: number): Promise { + const cacheKey = `market-data:ohlcv:${symbol}:${timeframe}:${limit || this.DEFAULT_LIMIT}`; + + try { + const redisClient = await redis.getClient(); + const cached = await redisClient.get(cacheKey); + + if (cached) { + logger.debug('Market data cache hit', { symbol, timeframe }); + return JSON.parse(cached); + } + } catch (error) { + logger.warn('Redis get failed, proceeding without cache', { + error: (error as Error).message, + }); + } + + const data = await this.fetchOHLCVFromDB(symbol, timeframe, limit); + + try { + const redisClient = await redis.getClient(); + await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data)); + } catch (error) { + logger.warn('Redis set failed', { error: (error as Error).message }); + } + + return data; + } + + async getHistoricalData(symbol: string, timeframe: Timeframe, from: Date, to: Date): Promise { + const cacheKey = `market-data:historical:${symbol}:${timeframe}:${from.getTime()}:${to.getTime()}`; + + try { + const redisClient = await redis.getClient(); + const cached = await redisClient.get(cacheKey); + + if (cached) { + logger.debug('Historical data cache hit', { symbol, timeframe, from, to }); + return JSON.parse(cached); + } + } catch (error) { + logger.warn('Redis get failed, proceeding without cache', { + error: (error as Error).message, + }); + } + + const data = await this.fetchHistoricalFromDB(symbol, timeframe, from, to); + + try { + const redisClient = await redis.getClient(); + await redisClient.setex(cacheKey, this.CACHE_TTL, JSON.stringify(data)); + } catch (error) { + logger.warn('Redis set failed', { error: (error as Error).message }); + } + + return data; + } + + private async fetchOHLCVFromDB(symbol: string, timeframe: Timeframe, limit?: number): Promise { + const tableName = this.getTableName(timeframe); + const effectiveLimit = limit || this.DEFAULT_LIMIT; + + const tickerResult = await db.query( + 'SELECT id, symbol FROM market_data.tickers WHERE symbol = $1 AND is_active = true', + [symbol.toUpperCase()] + ); + + if (tickerResult.rows.length === 0) { + logger.warn('Ticker not found', { symbol }); + return []; + } + + const tickerId = tickerResult.rows[0].id; + + const query = ` + SELECT + timestamp, + open, + high, + low, + close, + volume, + vwap + FROM ${tableName} + WHERE ticker_id = $1 + ORDER BY timestamp DESC + LIMIT $2 + `; + + const result = await db.query(query, [tickerId, effectiveLimit]); + + return result.rows.map((row) => this.transformOHLCV(row, symbol)); + } + + private async fetchHistoricalFromDB( + symbol: string, + timeframe: Timeframe, + from: Date, + to: Date + ): Promise { + const tableName = this.getTableName(timeframe); + + const tickerResult = await db.query( + 'SELECT id, symbol FROM market_data.tickers WHERE symbol = $1 AND is_active = true', + [symbol.toUpperCase()] + ); + + if (tickerResult.rows.length === 0) { + logger.warn('Ticker not found', { symbol }); + return []; + } + + const tickerId = tickerResult.rows[0].id; + + const query = ` + SELECT + timestamp, + open, + high, + low, + close, + volume, + vwap + FROM ${tableName} + WHERE ticker_id = $1 + AND timestamp >= $2 + AND timestamp <= $3 + ORDER BY timestamp ASC + `; + + const result = await db.query(query, [tickerId, from, to]); + + return result.rows.map((row) => this.transformOHLCV(row, symbol)); + } + + private getTableName(timeframe: Timeframe): string { + switch (timeframe) { + case '5m': + return 'market_data.ohlcv_5m'; + case '15m': + return 'market_data.ohlcv_15m'; + case '1h': + case '4h': + case '1d': + throw new Error(`Timeframe ${timeframe} not yet implemented. Use 5m or 15m.`); + default: + throw new Error(`Invalid timeframe: ${timeframe}`); + } + } + + private transformOHLCV(row: OhlcvDataRow, symbol: string): OHLCV { + return { + timestamp: row.timestamp, + open: parseFloat(row.open), + high: parseFloat(row.high), + low: parseFloat(row.low), + close: parseFloat(row.close), + volume: parseFloat(row.volume), + symbol, + vwap: row.vwap ? parseFloat(row.vwap) : undefined, + }; + } + + async getAvailableSymbols(): Promise { + const cacheKey = 'market-data:symbols'; + + try { + const redisClient = await redis.getClient(); + const cached = await redisClient.get(cacheKey); + + if (cached) { + return JSON.parse(cached); + } + } catch (error) { + logger.warn('Redis get failed', { error: (error as Error).message }); + } + + const result = await db.query( + 'SELECT symbol FROM market_data.tickers WHERE is_active = true ORDER BY symbol' + ); + + const symbols = result.rows.map((row) => row.symbol); + + try { + const redisClient = await redis.getClient(); + await redisClient.setex(cacheKey, 300, JSON.stringify(symbols)); + } catch (error) { + logger.warn('Redis set failed', { error: (error as Error).message }); + } + + return symbols; + } + + async healthCheck(): Promise<{ status: string; message: string }> { + try { + const result = await db.query('SELECT COUNT(*) as count FROM market_data.tickers'); + const count = parseInt(result.rows[0].count, 10); + + return { + status: 'healthy', + message: `Market data service operational. ${count} tickers available.`, + }; + } catch (error) { + return { + status: 'unhealthy', + message: (error as Error).message, + }; + } + } +} + +export const marketDataService = new MarketDataService(); diff --git a/src/modules/market-data/types/market-data.types.ts b/src/modules/market-data/types/market-data.types.ts new file mode 100644 index 0000000..c8ee363 --- /dev/null +++ b/src/modules/market-data/types/market-data.types.ts @@ -0,0 +1,56 @@ +/** + * Market Data Module Types + * OHLCV data types for the market data module + */ + +export type Timeframe = '5m' | '15m' | '1h' | '4h' | '1d'; + +export interface OHLCV { + timestamp: Date; + open: number; + high: number; + low: number; + close: number; + volume: number; + symbol: string; + vwap?: number; +} + +export interface CandleQueryOptions { + symbol: string; + timeframe: Timeframe; + limit?: number; + from?: Date; + to?: Date; +} + +export interface HistoricalDataOptions { + symbol: string; + timeframe: Timeframe; + from: Date; + to: Date; +} + +export interface OhlcvDataRow { + id: string; + ticker_id: number; + timestamp: Date; + open: string; + high: string; + low: string; + close: string; + volume: string; + vwap: string | null; + created_at: Date; +} + +export interface TickerRow { + id: number; + symbol: string; + name: string; + asset_type: string; + exchange: string | null; + base_currency: string; + quote_currency: string; + is_active: boolean; +}