From 792fbe1de726f76d3f3e039c29ac846d76254062 Mon Sep 17 00:00:00 2001 From: Alex Duan Date: Tue, 16 Jun 2026 11:59:27 -0700 Subject: [PATCH] refactor(agent-streaming): Use Genkit beta client library --- agent-streaming/README.md | 4 +-- agent-streaming/src/client/App.tsx | 47 ++++++----------------------- agent-streaming/src/server/index.ts | 16 ++++++---- 3 files changed, 21 insertions(+), 46 deletions(-) diff --git a/agent-streaming/README.md b/agent-streaming/README.md index cbfe22f..a2d57b4 100644 --- a/agent-streaming/README.md +++ b/agent-streaming/README.md @@ -8,9 +8,9 @@ The Genkit code for the streaming flow can be found in `src/server/index.ts`. ## How it works -- **Backend (Express):** Exposes an endpoint `/api/chat` using Server-Sent Events (SSE). It invokes the Genkit flow `streamingThoughtsFlow`, which streams both intermediate thoughts and the final text chunks. +- **Backend (Express):** Exposes an endpoint `/api/chat` using Server-Sent Events (SSE) compatible with the Genkit flow streaming protocol. It invokes the Genkit flow `streamingThoughtsFlow`, which streams both intermediate thoughts and the final text chunks. - **Genkit Flow:** Uses `googleAI.model('gemini-3.5-flash')` with `thinkingConfig` (`includeThoughts: true`) to stream reasoning details. The flow yields custom chunk objects with `type: 'thought'` or `type: 'text'`. -- **Frontend (React + TypeScript):** Reads the Server-Sent Events stream, updates a collapsible "Thinking" card with step labels, and renders the model's Markdown text in real-time. +- **Frontend (React + TypeScript):** Uses the [Genkit Beta Client library](https://js.api.genkit.dev/modules/genkit.beta_client.html) to consume the flow's SSE chunks asynchronously, updates a collapsible "Thinking" card with step labels, and renders the model's Markdown text in real-time. ## Running the app diff --git a/agent-streaming/src/client/App.tsx b/agent-streaming/src/client/App.tsx index e88a0a8..1608013 100644 --- a/agent-streaming/src/client/App.tsx +++ b/agent-streaming/src/client/App.tsx @@ -3,6 +3,7 @@ import { ChatInput } from './components/ChatInput.js'; import { ThoughtBox } from './components/ThoughtBox.js'; import { MessageBubble } from './components/MessageBubble.js'; import { Message, StreamChunk } from './types.js'; +import { streamFlow } from 'genkit/beta/client'; export const App: React.FC = () => { const [messages, setMessages] = useState([]); @@ -40,46 +41,16 @@ export const App: React.FC = () => { setMessages((prev) => [...prev, userMessage]); try { - const response = await fetch('/api/chat', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ prompt }), - signal: controller.signal, + // Use the Genkit Beta Client helper to consume the stream. + // See: https://js.api.genkit.dev/modules/genkit.beta_client.html + const response = streamFlow({ + url: '/api/chat', + input: prompt, + abortSignal: controller.signal, }); - if (!response.ok) { - throw new Error(`Network response was not ok (Status: ${response.status})`); - } - if (!response.body) { - throw new Error(`Response body is empty (Status: ${response.status})`); - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - while (true) { - const { value, done } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - - // Store the last partial line in the buffer to prevent parsing - // errors if a chunk is split mid-line - buffer = lines.pop() || ''; - - const dataPrefix = 'data: '; - for (const line of lines) { - const trimmed = line.trim(); - if (trimmed.startsWith(dataPrefix)) { - const jsonStr = trimmed.slice(dataPrefix.length); - const chunk = JSON.parse(jsonStr) as StreamChunk; - handleStreamChunk(chunk); - } - } + for await (const chunk of response.stream) { + handleStreamChunk(chunk); } } catch (error) { if (error instanceof Error && error.name === 'AbortError') { diff --git a/agent-streaming/src/server/index.ts b/agent-streaming/src/server/index.ts index d6ce316..eefd824 100644 --- a/agent-streaming/src/server/index.ts +++ b/agent-streaming/src/server/index.ts @@ -112,9 +112,9 @@ app.use(express.static(path.join(__dirname, '../../dist'))); // API Endpoint to stream the chat response (Server-Sent Events) app.post('/api/chat', async (req: Request, res: Response) => { - const { prompt } = req.body; + const { data } = req.body; - if (typeof prompt !== 'string' || prompt.trim() === '') { + if (typeof data !== 'string' || data.trim() === '') { return res.status(400).json({ error: 'Prompt must be a non-empty string' }); } @@ -124,20 +124,24 @@ app.post('/api/chat', async (req: Request, res: Response) => { res.setHeader('Connection', 'keep-alive'); try { - const responseStream = streamingThoughtsFlow.stream(prompt); + const responseStream = streamingThoughtsFlow.stream(data); // Stream the chunks as they arrive for await (const chunk of responseStream.stream) { - res.write(`data: ${JSON.stringify(chunk)}\n\n`); + res.write(`data: ${JSON.stringify({ message: chunk })}\n\n`); } + // Write the final result to the stream to satisfy the streamFlow client expectations + const finalResult = await responseStream.output; + res.write(`data: ${JSON.stringify({ result: finalResult })}\n\n`); + res.end(); } catch (error) { console.error('Error in stream processing:', error); if (!res.headersSent) { res.status(500).json({ error: 'An error occurred while generating response.' }); - } else { - res.write(`data: ${JSON.stringify({ messageId: crypto.randomUUID(), type: 'error', content: 'An error occurred while generating response.' })}\n\n`); + } else if (res.writable) { + res.write(`data: ${JSON.stringify({ error: { status: 'INTERNAL', message: 'An error occurred while generating response.', details: error instanceof Error ? error.message : String(error) } })}\n\n`); res.end(); } }