Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent-streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ The Genkit code for the streaming flow can be found in `src/server/index.ts`.

## How it works

- **Backend (Express):** Exposes an endpoint `/api/chat` using Server-Sent Events (SSE). It invokes the Genkit flow `streamingThoughtsFlow`, which streams both intermediate thoughts and the final text chunks.
- **Backend (Express):** Exposes an endpoint `/api/chat` using Server-Sent Events (SSE) compatible with the Genkit flow streaming protocol. It invokes the Genkit flow `streamingThoughtsFlow`, which streams both intermediate thoughts and the final text chunks.
- **Genkit Flow:** Uses `googleAI.model('gemini-3.5-flash')` with `thinkingConfig` (`includeThoughts: true`) to stream reasoning details. The flow yields custom chunk objects with `type: 'thought'` or `type: 'text'`.
- **Frontend (React + TypeScript):** Reads the Server-Sent Events stream, updates a collapsible "Thinking" card with step labels, and renders the model's Markdown text in real-time.
- **Frontend (React + TypeScript):** Uses the [Genkit Beta Client library](https://js.api.genkit.dev/modules/genkit.beta_client.html) to consume the flow's SSE chunks asynchronously, updates a collapsible "Thinking" card with step labels, and renders the model's Markdown text in real-time.

## Running the app

Expand Down
47 changes: 9 additions & 38 deletions agent-streaming/src/client/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { ChatInput } from './components/ChatInput.js';
import { ThoughtBox } from './components/ThoughtBox.js';
import { MessageBubble } from './components/MessageBubble.js';
import { Message, StreamChunk } from './types.js';
import { streamFlow } from 'genkit/beta/client';

export const App: React.FC = () => {
const [messages, setMessages] = useState<Message[]>([]);
Expand Down Expand Up @@ -40,46 +41,16 @@ export const App: React.FC = () => {
setMessages((prev) => [...prev, userMessage]);

try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ prompt }),
signal: controller.signal,
// Use the Genkit Beta Client helper to consume the stream.
// See: https://js.api.genkit.dev/modules/genkit.beta_client.html
const response = streamFlow<string, StreamChunk>({
url: '/api/chat',
input: prompt,
abortSignal: controller.signal,
});

if (!response.ok) {
throw new Error(`Network response was not ok (Status: ${response.status})`);
}
if (!response.body) {
throw new Error(`Response body is empty (Status: ${response.status})`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
const { value, done } = await reader.read();
if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');

// Store the last partial line in the buffer to prevent parsing
// errors if a chunk is split mid-line
buffer = lines.pop() || '';

const dataPrefix = 'data: ';
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.startsWith(dataPrefix)) {
const jsonStr = trimmed.slice(dataPrefix.length);
const chunk = JSON.parse(jsonStr) as StreamChunk;
handleStreamChunk(chunk);
}
}
for await (const chunk of response.stream) {
handleStreamChunk(chunk);
}
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
Expand Down
16 changes: 10 additions & 6 deletions agent-streaming/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ app.use(express.static(path.join(__dirname, '../../dist')));

// API Endpoint to stream the chat response (Server-Sent Events)
app.post('/api/chat', async (req: Request, res: Response) => {
const { prompt } = req.body;
const { data } = req.body;

if (typeof prompt !== 'string' || prompt.trim() === '') {
if (typeof data !== 'string' || data.trim() === '') {
return res.status(400).json({ error: 'Prompt must be a non-empty string' });
}

Expand All @@ -124,20 +124,24 @@ app.post('/api/chat', async (req: Request, res: Response) => {
res.setHeader('Connection', 'keep-alive');

try {
const responseStream = streamingThoughtsFlow.stream(prompt);
const responseStream = streamingThoughtsFlow.stream(data);

// Stream the chunks as they arrive
for await (const chunk of responseStream.stream) {
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
res.write(`data: ${JSON.stringify({ message: chunk })}\n\n`);
}

// Write the final result to the stream to satisfy the streamFlow client expectations
const finalResult = await responseStream.output;
res.write(`data: ${JSON.stringify({ result: finalResult })}\n\n`);

res.end();
} catch (error) {
console.error('Error in stream processing:', error);
if (!res.headersSent) {
res.status(500).json({ error: 'An error occurred while generating response.' });
} else {
res.write(`data: ${JSON.stringify({ messageId: crypto.randomUUID(), type: 'error', content: 'An error occurred while generating response.' })}\n\n`);
} else if (res.writable) {
res.write(`data: ${JSON.stringify({ error: { status: 'INTERNAL', message: 'An error occurred while generating response.', details: error instanceof Error ? error.message : String(error) } })}\n\n`);
res.end();
}
}
Expand Down