/**
 * useStreamingChat Hook
 *
 * Handles real-time streaming responses (Server-Sent Events) with chunk
 * buffering and optional token-by-token display animation.
 *
 * Two transports are supported:
 *  - POST (when a `body` is given): `fetch` + `ReadableStream`, parsed as SSE.
 *  - GET  (no `body`): the browser `EventSource` API.
 */
import { useState, useCallback, useRef, useEffect } from 'react';

/** Lifecycle of a single stream. */
export type StreamStatus =
  | 'idle'
  | 'connecting'
  | 'streaming'
  | 'complete'
  | 'error'
  | 'cancelled';

/** One decoded event received from the server. */
export interface StreamChunk {
  type: 'content' | 'tool_start' | 'tool_end' | 'thinking' | 'error' | 'done';
  content?: string;
  toolName?: string;
  toolResult?: unknown;
  error?: string;
}

/** State exposed by the hook. */
export interface StreamState {
  status: StreamStatus;
  /** Text displayed so far (lags behind the received text while animating). */
  content: string;
  /** Every chunk received, in arrival order. */
  chunks: StreamChunk[];
  currentTool: string | null;
  progress: number;
  error: string | null;
  startTime: number | null;
  endTime: number | null;
}

/** Callbacks and display options accepted by the hook. */
export interface StreamOptions {
  onChunk?: (chunk: StreamChunk) => void;
  /** Called once per stream with the full received text when it completes. */
  onComplete?: (content: string) => void;
  onError?: (error: string) => void;
  onToolStart?: (toolName: string) => void;
  onToolEnd?: (toolName: string, result: unknown) => void;
  /** Reveal text gradually instead of appending it at once (default `true`). */
  animateTokens?: boolean;
  /** Delay in milliseconds between animated tokens (default `20`). */
  tokenDelay?: number;
}

const initialState: StreamState = {
  status: 'idle',
  content: '',
  chunks: [],
  currentTool: null,
  progress: 0,
  error: null,
  startTime: null,
  endTime: null,
};

/** Texts longer than this are animated in groups instead of per character. */
const LONG_TEXT_THRESHOLD = 50;
/** Number of characters per animated token for long texts. */
const LONG_TEXT_TOKEN_SIZE = 5;

/**
 * Split text into display tokens: single characters for short text, groups of
 * `LONG_TEXT_TOKEN_SIZE` characters for long text.
 *
 * Works on code points so surrogate pairs are never split, and keeps line
 * breaks (a `/.{1,5}/g` match would silently drop them because `.` does not
 * match line terminators).
 */
function tokenize(text: string): string[] {
  const codePoints = Array.from(text);
  if (text.length <= LONG_TEXT_THRESHOLD) {
    return codePoints;
  }
  const tokens: string[] = [];
  for (let i = 0; i < codePoints.length; i += LONG_TEXT_TOKEN_SIZE) {
    tokens.push(codePoints.slice(i, i + LONG_TEXT_TOKEN_SIZE).join(''));
  }
  return tokens;
}

/**
 * Structural check for a decoded chunk: a plain object carrying a string
 * `type`. Unrecognised `type` values are deliberately let through so they are
 * still recorded and reported via `onChunk`; the chunk handler ignores them.
 */
function isStreamChunkLike(value: unknown): value is StreamChunk {
  return (
    typeof value === 'object' &&
    value !== null &&
    !Array.isArray(value) &&
    typeof (value as { type?: unknown }).type === 'string'
  );
}

/**
 * Turn an SSE `data` payload into a chunk. Anything that is not a JSON object
 * with a `type` — including plain text that happens to be valid JSON such as
 * `42`, `true` or `null` — is treated as plain content.
 */
function parseSsePayload(payload: string): StreamChunk {
  try {
    const parsed: unknown = JSON.parse(payload);
    if (isStreamChunkLike(parsed)) {
      return parsed;
    }
  } catch {
    // Not JSON: fall through and treat the payload as plain content.
  }
  return { type: 'content', content: payload };
}

/**
 * Extract the `data` field of one SSE event block (lines separated by `\n`).
 * Multiple `data:` lines are joined with `\n`, and one optional space after
 * the colon is stripped. Returns `null` when the block has no `data:` line
 * (for example comments or keep-alives).
 */
function extractSseData(eventBlock: string): string | null {
  const dataLines: string[] = [];
  for (const line of eventBlock.split('\n')) {
    if (!line.startsWith('data:')) {
      continue;
    }
    const value = line.slice('data:'.length);
    dataLines.push(value.startsWith(' ') ? value.slice(1) : value);
  }
  return dataLines.length > 0 ? dataLines.join('\n') : null;
}

/** True for the rejection produced by aborting a `fetch`. */
function isAbortError(err: unknown): boolean {
  return (
    typeof err === 'object' &&
    err !== null &&
    (err as { name?: unknown }).name === 'AbortError'
  );
}

/**
 * React hook that consumes a streaming chat endpoint.
 *
 * @param options Callbacks and animation settings. The latest values are
 *   always used, even by a stream that started on an earlier render.
 * @returns The current {@link StreamState} plus `duration`, `isActive`, and
 *   the actions `startStream`, `stopStream`, `reset` and `appendContent`.
 */
export function useStreamingChat(options: StreamOptions = {}) {
  const { animateTokens = true, tokenDelay = 20 } = options;

  const [state, setState] = useState<StreamState>(initialState);

  // Latest options, readable from long-lived async handlers without making
  // the action callbacks change identity on every render.
  const latestRef = useRef({ ...options, animateTokens, tokenDelay });
  useEffect(() => {
    latestRef.current = { ...options, animateTokens, tokenDelay };
  });

  // Transport handles and animation timers, kept for cleanup.
  const eventSourceRef = useRef<EventSource | null>(null);
  const abortControllerRef = useRef<AbortController | null>(null);
  const animationFrameRef = useRef<number | null>(null);
  const timeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const tokenQueueRef = useRef<string[]>([]);
  const isProcessingRef = useRef(false);

  // Generation counter: bumped by every cleanup so handlers belonging to a
  // superseded or stopped stream can tell they are stale and do nothing.
  const streamIdRef = useRef(0);
  // Full text received so far (not delayed by the animation).
  const contentRef = useRef('');
  // True once the current stream reached a terminal state (done/error/stop).
  const finishedRef = useRef(false);

  /** Tear down transports and timers and invalidate in-flight handlers. */
  const cleanup = useCallback(() => {
    streamIdRef.current += 1;

    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
    if (animationFrameRef.current !== null) {
      cancelAnimationFrame(animationFrameRef.current);
      animationFrameRef.current = null;
    }
    if (timeoutRef.current !== null) {
      clearTimeout(timeoutRef.current);
      timeoutRef.current = null;
    }
    tokenQueueRef.current = [];
    isProcessingRef.current = false;
  }, []);

  // Cleanup on unmount.
  useEffect(() => cleanup, [cleanup]);

  /** Append one queued token to the displayed content, then reschedule. */
  const processTokenQueue = useCallback(() => {
    timeoutRef.current = null;
    if (!isProcessingRef.current) {
      return;
    }

    const token = tokenQueueRef.current.shift();
    if (token === undefined) {
      isProcessingRef.current = false;
      return;
    }

    setState((prev) => ({ ...prev, content: prev.content + token }));

    if (tokenQueueRef.current.length === 0) {
      isProcessingRef.current = false;
      return;
    }

    // Both handles are tracked so cleanup can cancel whichever is pending.
    animationFrameRef.current = requestAnimationFrame(() => {
      animationFrameRef.current = null;
      timeoutRef.current = setTimeout(
        processTokenQueue,
        latestRef.current.tokenDelay,
      );
    });
  }, []);

  /** Add text to the content, animated or immediately depending on options. */
  const addContent = useCallback(
    (text: string) => {
      if (!text) {
        return;
      }
      contentRef.current += text;

      if (!latestRef.current.animateTokens) {
        // Flush anything still queued first so text never appears out of
        // order if animation was switched off mid-stream.
        const pending = tokenQueueRef.current.join('');
        tokenQueueRef.current = [];
        setState((prev) => ({
          ...prev,
          content: prev.content + pending + text,
        }));
        return;
      }

      // Loop instead of push(...tokens): spreading a very large array can
      // exceed the engine's argument limit.
      for (const token of tokenize(text)) {
        tokenQueueRef.current.push(token);
      }
      if (!isProcessingRef.current) {
        isProcessingRef.current = true;
        processTokenQueue();
      }
    },
    [processTokenQueue],
  );

  /** Record a chunk, notify listeners and update state according to its type. */
  const processChunk = useCallback(
    (chunk: StreamChunk) => {
      const { onChunk, onToolStart, onToolEnd, onError, onComplete } =
        latestRef.current;

      setState((prev) => ({ ...prev, chunks: [...prev.chunks, chunk] }));
      onChunk?.(chunk);

      switch (chunk.type) {
        case 'content':
          if (chunk.content) {
            addContent(chunk.content);
          }
          break;

        case 'tool_start':
          setState((prev) => ({
            ...prev,
            currentTool: chunk.toolName || null,
          }));
          if (chunk.toolName) {
            onToolStart?.(chunk.toolName);
          }
          break;

        case 'tool_end':
          setState((prev) => ({ ...prev, currentTool: null }));
          if (chunk.toolName) {
            onToolEnd?.(chunk.toolName, chunk.toolResult);
          }
          break;

        case 'thinking':
          // Never resurrect a stream that already ended.
          if (!finishedRef.current) {
            setState((prev) => ({ ...prev, status: 'streaming' }));
          }
          break;

        case 'error': {
          finishedRef.current = true;
          // `||` on purpose: an empty message is as unhelpful as a missing one.
          const message = chunk.error || 'Unknown error';
          setState((prev) => ({
            ...prev,
            status: 'error',
            error: message,
            endTime: Date.now(),
          }));
          onError?.(message);
          break;
        }

        case 'done':
          // Ignore a duplicate `done`, or a `done` after an error, so a
          // terminal state is never overwritten.
          if (finishedRef.current) {
            break;
          }
          finishedRef.current = true;
          setState((prev) => ({
            ...prev,
            status: 'complete',
            endTime: Date.now(),
            progress: 100,
          }));
          onComplete?.(contentRef.current);
          break;

        default:
          // Unknown chunk type: recorded and reported above, nothing else.
          break;
      }
    },
    [addContent],
  );

  /**
   * Start a stream, replacing any stream already in progress.
   *
   * @param url Endpoint to connect to.
   * @param body When given, it is POSTed as JSON and the response is read as
   *   an SSE stream; otherwise an `EventSource` GET connection is opened.
   */
  const startStream = useCallback(
    async (url: string, body?: Record<string, unknown>): Promise<void> => {
      cleanup();
      const streamId = streamIdRef.current;
      const isStale = (): boolean => streamIdRef.current !== streamId;

      contentRef.current = '';
      finishedRef.current = false;
      setState({
        ...initialState,
        status: 'connecting',
        startTime: Date.now(),
      });

      try {
        if (body) {
          // POST with SSE is not possible via EventSource, so use
          // fetch + ReadableStream and parse the event stream by hand.
          const controller = new AbortController();
          abortControllerRef.current = controller;

          const response = await fetch(url, {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
              Accept: 'text/event-stream',
            },
            body: JSON.stringify(body),
            signal: controller.signal,
          });
          if (isStale()) {
            return;
          }
          if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
          }
          if (!response.body) {
            throw new Error('No response body');
          }

          setState((prev) => ({ ...prev, status: 'streaming' }));

          const reader = response.body.getReader();
          const decoder = new TextDecoder();
          let buffer = '';

          // Dispatch every complete event in the buffer. Events are
          // separated by a blank line; CRLF is normalised on the whole buffer
          // so a "\r\n" split across two network reads is still recognised.
          const dispatchEvents = (flush: boolean): void => {
            const blocks = buffer.replace(/\r\n/g, '\n').split('\n\n');
            buffer = flush ? '' : blocks.pop() ?? '';
            for (const block of blocks) {
              const payload = extractSseData(block);
              if (payload !== null) {
                processChunk(parseSsePayload(payload));
              }
            }
          };

          for (;;) {
            const { done, value } = await reader.read();
            if (isStale()) {
              return;
            }

            if (done) {
              // Flush the decoder and any final event that was not
              // terminated by a blank line.
              buffer += decoder.decode();
              dispatchEvents(true);
              if (!finishedRef.current) {
                processChunk({ type: 'done' });
              }
              break;
            }

            buffer += decoder.decode(value, { stream: true });
            dispatchEvents(false);

            if (finishedRef.current) {
              // The server signalled done/error; stop reading even if it
              // keeps the connection open.
              reader.cancel().catch(() => undefined);
              break;
            }
          }
        } else {
          // GET requests use the browser's EventSource.
          const source = new EventSource(url);
          eventSourceRef.current = source;

          const closeSource = (): void => {
            source.close();
            if (eventSourceRef.current === source) {
              eventSourceRef.current = null;
            }
          };

          source.onopen = () => {
            if (isStale()) {
              return;
            }
            setState((prev) => ({ ...prev, status: 'streaming' }));
          };

          source.onmessage = (event: MessageEvent) => {
            if (isStale()) {
              return;
            }
            processChunk(parseSsePayload(String(event.data)));
            if (finishedRef.current) {
              // Close explicitly: otherwise the server closing the
              // connection fires onerror and EventSource tries to reconnect.
              closeSource();
            }
          };

          source.onerror = () => {
            if (isStale() || finishedRef.current) {
              closeSource();
              return;
            }
            finishedRef.current = true;
            closeSource();
            setState((prev) => ({
              ...prev,
              status: 'error',
              error: 'Connection lost',
              endTime: Date.now(),
            }));
            latestRef.current.onError?.('Connection lost');
          };
        }
      } catch (err: unknown) {
        // A stale stream was stopped, reset, unmounted or superseded; whoever
        // did that already owns the state, so do not touch it.
        if (isStale()) {
          return;
        }

        finishedRef.current = true;
        if (isAbortError(err)) {
          setState((prev) => ({
            ...prev,
            status: 'cancelled',
            endTime: Date.now(),
          }));
          return;
        }

        const errorMsg = err instanceof Error ? err.message : 'Stream failed';
        setState((prev) => ({
          ...prev,
          status: 'error',
          error: errorMsg,
          endTime: Date.now(),
        }));
        latestRef.current.onError?.(errorMsg);
      }
    },
    [cleanup, processChunk],
  );

  /** Stop the active stream, if any, and mark it as cancelled. */
  const stopStream = useCallback(() => {
    cleanup();
    finishedRef.current = true;
    setState((prev) =>
      prev.status === 'connecting' || prev.status === 'streaming'
        ? { ...prev, status: 'cancelled', endTime: Date.now() }
        : prev, // already idle or finished: keep status and endTime intact
    );
  }, [cleanup]);

  /** Stop everything and return to the initial idle state. */
  const reset = useCallback(() => {
    cleanup();
    contentRef.current = '';
    finishedRef.current = false;
    setState(initialState);
  }, [cleanup]);

  // Elapsed time in milliseconds; computed at render time, so it only
  // advances when the component re-renders.
  const duration = state.startTime
    ? (state.endTime || Date.now()) - state.startTime
    : 0;

  return {
    // State
    ...state,
    duration,
    isActive: state.status === 'connecting' || state.status === 'streaming',

    // Actions
    startStream,
    stopStream,
    reset,

    // Utilities
    appendContent: addContent,
  };
}

// Export types
export type UseStreamingChatReturn = ReturnType<typeof useStreamingChat>;

export default useStreamingChat;