diff --git a/agent_instructions/modifying_chat_ui.md b/agent_instructions/modifying_chat_ui.md index 46fe421..086b039 100644 --- a/agent_instructions/modifying_chat_ui.md +++ b/agent_instructions/modifying_chat_ui.md @@ -78,6 +78,42 @@ Key selectors: 3. **New callbacks?** → Wrap in `useCallback` with tight dependencies 4. **New selectors?** → Create surgical selector hooks in the store file +## Message Queue (sending while streaming) + +The input stays editable while a response streams. An **idle** send goes out immediately; a send +issued **while a turn is in flight** is queued and dispatched when the current turn completes. + +- The queue is a plain, framework-agnostic class: `pages/chat/messageQueue.ts` (`MessageQueue`), + used as an **app-wide singleton** (`chatMessageQueue`) and wrapped by the `useMessageQueue` hook. + `ChatPage` calls `sendOrQueue(content, files)`; the hook exposes `queuedMessages` + + `removeQueuedMessage` + `clearQueue`, threaded through `ChatView` to `ChatInput`, which renders + removable chips above the input. The class is unit-tested in + `pages/chat/__tests__/messageQueue.test.ts`. +- **Why a singleton, not component state:** the first message of a conversation navigates `/chat` → + `/chat/:id` (two separate `` elements), which **remounts `ChatPage`**. The first turn's + stream is *not* aborted on unmount (useChat has no unmount cleanup), so it keeps running on the + global stores. Component-scoped queue state would reset the in-flight lock on the remounted + instance, and a queued message would start a **second turn concurrently** — two `initStreaming` + calls for one model render two side-by-side responses ("the whole thing restarts"). The singleton + keeps the lock alive across the remount. The hook pushes the latest `sendMessage` in via + `setSend` every render and subscribes for queue updates; `ChatPage` calls `clearQueue()` on a real + conversation switch (skipping the create transition) so queued messages don't leak across + conversations. +- `useChat`'s `sendMessage` is **async and resolves only when the whole turn completes** (including + multi-round tool execution). Serialization keys off that promise. Do **not** drive the queue off + `isStreaming` transitions — `isStreaming` briefly flips false *between* tool rounds (see the + comment near `useChat.ts` "more rounds coming"), which would dispatch the next message mid-turn + and clobber the in-flight stream (`initStreaming` resets the streaming store). +- Only queue when busy. Routing the **first/idle** send through a long-lived drain loop is fragile: + that send also creates the conversation and navigates (`/chat` → `/chat/:id`), and coupling the + queue lock to that lifecycle caused ordering/stuck-queue bugs. Idle sends now bypass the queue + entirely (no chip), matching pre-queue behavior. +- `MessageQueue` reads `send` through a getter so each dispatch uses the latest `sendMessage` + closure (picks up model/tool/config changes made while draining). A failed turn is caught so it + never strands the rest of the queue or leaves the queue stuck busy. +- **Stop** is a separate button (shown only while streaming) that aborts the in-flight response; it + does not clear the queue. Remove queued messages via their chip's ✕. + ## Model Instances The chat supports **model instances** — multiple copies of the same model with different settings (e.g., compare GPT-4 with temperature 0.3 vs 0.9): diff --git a/docs/content/docs/features/chat-ui.mdx b/docs/content/docs/features/chat-ui.mdx index 154c81a..303291e 100644 --- a/docs/content/docs/features/chat-ui.mdx +++ b/docs/content/docs/features/chat-ui.mdx @@ -106,13 +106,22 @@ The chat UI is optimized for high-performance multi-model streaming: ### Streaming Features -| Feature | Description | -| ------------------- | ----------------------------------------------- | -| Live markdown | Content renders as markdown while streaming | -| Syntax highlighting | Code blocks highlighted in real-time | -| Auto-scroll | Follows streaming content, pauses on scroll-up | -| Cancel | Stop any or all streams mid-generation | -| Usage stats | Time-to-first-token and tokens/second displayed | +| Feature | Description | +| ------------------- | --------------------------------------------------------- | +| Live markdown | Content renders as markdown while streaming | +| Syntax highlighting | Code blocks highlighted in real-time | +| Auto-scroll | Follows streaming content, pauses on scroll-up | +| Cancel | Stop any or all streams mid-generation | +| Message queue | Keep typing and queue follow-ups while a response streams | +| Usage stats | Time-to-first-token and tokens/second displayed | + +### Message Queue + +You don't have to wait for a response to finish before composing the next message. While a +response is streaming, the input stays editable and the send button changes to **Queue** — +queued messages appear as removable chips above the input and are sent one at a time as each +turn (including any tool-execution rounds) completes. A separate **Stop** button remains +available to cancel the in-flight response without affecting queued messages. ### Usage Statistics diff --git a/ui/src/components/ChatInput/ChatInput.stories.tsx b/ui/src/components/ChatInput/ChatInput.stories.tsx index d9ee470..3580454 100644 --- a/ui/src/components/ChatInput/ChatInput.stories.tsx +++ b/ui/src/components/ChatInput/ChatInput.stories.tsx @@ -234,6 +234,106 @@ export const Streaming: Story = { }, }; +/** + * Test: While streaming, the textarea stays enabled and "Send" becomes "Queue" + * (queuing the message) while a separate Stop button aborts the response. + */ +export const StreamingAllowsQueueing: Story = { + args: { + // A queue-backed turn is in flight: both flags are set, so the primary + // button queues the next message rather than starting a concurrent turn. + isStreaming: true, + isQueuing: true, + placeholder: "Type a message...", + onSend: fn(), + onStop: fn(), + }, + play: async ({ canvasElement, args }) => { + const canvas = within(canvasElement); + + // Textarea remains usable while streaming + const textarea = canvas.getByPlaceholderText("Type a message..."); + await expect(textarea).toBeEnabled(); + await userEvent.type(textarea, "Next question"); + + // Primary button now queues rather than stops + const queueButton = canvas.getByRole("button", { name: /queue message/i }); + await expect(queueButton).toBeEnabled(); + await userEvent.click(queueButton); + await expect(args.onSend).toHaveBeenCalledWith("Next question", []); + await expect(args.onStop).not.toHaveBeenCalled(); + + // A distinct Stop button is still available to abort the in-flight response + const stopButton = canvas.getByRole("button", { name: /stop response/i }); + await userEvent.click(stopButton); + await expect(args.onStop).toHaveBeenCalled(); + }, +}; + +/** + * Test: A stream started outside the queue (editAndRerun/regenerateResponse sets + * `isStreaming` but not `isQueuing`) disables the primary button, so a click or + * Enter can't start a second concurrent turn that would clobber the active one. + */ +export const NonQueueStreamBlocksSend: Story = { + args: { + isStreaming: true, + isQueuing: false, + placeholder: "Type a message...", + onSend: fn(), + onStop: fn(), + }, + play: async ({ canvasElement, args }) => { + const canvas = within(canvasElement); + + const textarea = canvas.getByPlaceholderText("Type a message..."); + await userEvent.type(textarea, "Next question"); + + // The primary button is present but disabled in this state. + const queueButton = canvas.getByRole("button", { name: /queue message/i }); + await expect(queueButton).toBeDisabled(); + + // Enter must not slip past the disabled button and dispatch a send. + await userEvent.type(textarea, "{Enter}"); + await expect(args.onSend).not.toHaveBeenCalled(); + + // Stop remains available to abort the externally-started stream. + const stopButton = canvas.getByRole("button", { name: /stop response/i }); + await userEvent.click(stopButton); + await expect(args.onStop).toHaveBeenCalled(); + }, +}; + +/** + * Test: Queued messages render as removable chips above the input + */ +export const WithQueuedMessages: Story = { + args: { + // Messages are queued, so the queue is busy and queueing stays enabled. + isStreaming: true, + isQueuing: true, + placeholder: "Type a message...", + onRemoveQueuedMessage: fn(), + queuedMessages: [ + { id: "q1", content: "First queued message", files: [] }, + { id: "q2", content: "Second queued message", files: [] }, + ], + }, + play: async ({ canvasElement, args }) => { + const canvas = within(canvasElement); + + // Both queued messages are listed + await expect(canvas.getByText("First queued message")).toBeInTheDocument(); + await expect(canvas.getByText("Second queued message")).toBeInTheDocument(); + + // Removing the first chip calls back with its id + const removeButtons = canvas.getAllByRole("button", { name: /remove queued message/i }); + await expect(removeButtons).toHaveLength(2); + await userEvent.click(removeButtons[0]); + await expect(args.onRemoveQueuedMessage).toHaveBeenCalledWith("q1"); + }, +}; + /** * Test: Typing enables the send button */ diff --git a/ui/src/components/ChatInput/ChatInput.tsx b/ui/src/components/ChatInput/ChatInput.tsx index da9f1ae..524f91f 100644 --- a/ui/src/components/ChatInput/ChatInput.tsx +++ b/ui/src/components/ChatInput/ChatInput.tsx @@ -1,6 +1,7 @@ import { AlertCircle, ArrowsUpFromLine, + Clock, MousePointerClick, Paperclip, Send, @@ -47,7 +48,7 @@ import { cn } from "@/utils/cn"; import { isFileTypeAllowed, buildAcceptAttribute } from "@/utils/fileTypes"; import { useQuotedText, usePendingPrompt, useChatUIStore } from "@/stores/chatUIStore"; -import type { ChatFile, HistoryMode } from "@/components/chat-types"; +import type { ChatFile, HistoryMode, QueuedMessage } from "@/components/chat-types"; import { File, FileText, @@ -120,6 +121,10 @@ interface ChatInputProps { onStop?: () => void; onSettingsClick?: () => void; isStreaming?: boolean; + /** True while a turn is in flight (so a send queues instead of dispatching). + * Stays true across tool rounds, unlike `isStreaming`, which flickers false + * between them — used so the action button reliably reads "Queue". */ + isQueuing?: boolean; disabled?: boolean; /** Whether no models are selected (shows a prominent hint overlay) */ noModelsSelected?: boolean; @@ -155,6 +160,10 @@ interface ChatInputProps { onOpenMCPConfig?: () => void; /** Callback when a prompt template is applied */ onApplyPrompt?: (content: string) => void; + /** Messages queued while a response is streaming (sent as each turn completes) */ + queuedMessages?: QueuedMessage[]; + /** Remove a queued message before it is sent */ + onRemoveQueuedMessage?: (id: string) => void; } export function ChatInput({ @@ -162,6 +171,7 @@ export function ChatInput({ onStop, onSettingsClick, isStreaming = false, + isQueuing = false, disabled = false, noModelsSelected = false, noModelsAvailable = false, @@ -181,6 +191,8 @@ export function ChatInput({ onSubAgentModelChange, onOpenMCPConfig, onApplyPrompt, + queuedMessages = [], + onRemoveQueuedMessage, }: ChatInputProps) { const [content, setContent] = useState(""); const [files, setFiles] = useState([]); @@ -258,11 +270,13 @@ export function ChatInput({ const acceptAttribute = useMemo(() => buildAcceptAttribute(allowedTypes), [allowedTypes]); + // Send always submits (queuing the message if a response is still streaming). + // Stopping the current response is a separate button, shown while streaming. const handleSubmit = useCallback(() => { - if (isStreaming) { - onStop?.(); - return; - } + // Mirror the Send button's `disabled` guard for the Enter-key path: block + // sending while a non-queue stream (editAndRerun/regenerateResponse) is in + // flight, since that would start a second concurrent turn and clobber it. + if (isStreaming && !isQueuing) return; const trimmedContent = content.trim(); if (!trimmedContent && files.length === 0) return; @@ -277,7 +291,7 @@ export function ChatInput({ setContent(""); setFiles([]); setPendingSkill(null); - }, [content, files, isStreaming, onSend, onStop, pendingSkill]); + }, [content, files, onSend, pendingSkill, isStreaming, isQueuing]); const enableSkill = useChatUIStore((s) => s.enableSkill); @@ -463,6 +477,39 @@ export function ChatInput({ return (
+ {/* Queued messages: composed while a response was still streaming. They + send one at a time as each in-flight turn completes. */} + {queuedMessages.length > 0 && ( +
    + {queuedMessages.map((msg) => ( +
  • + + + {msg.content || (msg.files.length > 0 ? "(attachment only)" : "")} + + {msg.files.length > 0 && ( + + {msg.files.length} file{msg.files.length > 1 ? "s" : ""} + + )} + {onRemoveQueuedMessage && ( + + )} +
  • + ))} +
+ )} + {/* File previews */} {files.length > 0 && (
@@ -566,7 +613,7 @@ export function ChatInput({ className="min-h-[56px] w-full resize-none border-0 bg-transparent px-4 pt-3 pb-1 text-base focus-visible:ring-0 focus-visible:ring-offset-0" autoResize maxHeight={200} - disabled={disabled || isStreaming} + disabled={disabled} /> {slashQuery && ( @@ -611,11 +658,11 @@ export function ChatInput({ {/* Templates */} {onApplyPrompt && ( - + )} {/* Skills */} - + {/* History mode toggle - only show when multiple models */} {hasMultipleModels && onHistoryModeChange && ( @@ -634,7 +681,7 @@ export function ChatInput({ onClick={() => onHistoryModeChange(historyMode === "all" ? "same-model" : "all") } - disabled={disabled || isStreaming} + disabled={disabled} aria-label={ historyMode === "all" ? "Switch to isolated history" @@ -686,7 +733,7 @@ export function ChatInput({ variant="ghost" className="h-8 w-8 shrink-0 rounded-lg text-muted-foreground hover:text-foreground" onClick={() => fileInputRef.current?.click()} - disabled={disabled || isStreaming} + disabled={disabled} aria-label="Attach files" > @@ -708,7 +755,7 @@ export function ChatInput({ onVectorStoreIdsChange={onVectorStoreIdsChange} vectorStoreOwnerType={vectorStoreOwnerType} vectorStoreOwnerId={vectorStoreOwnerId} - disabled={disabled || isStreaming} + disabled={disabled} availableModels={availableModels} subAgentModel={subAgentModel} onSubAgentModelChange={onSubAgentModelChange} @@ -717,29 +764,40 @@ export function ChatInput({ )}
- {/* Send button */} - )} - + +
diff --git a/ui/src/components/ChatView/ChatView.tsx b/ui/src/components/ChatView/ChatView.tsx index 035cd1c..70aaded 100644 --- a/ui/src/components/ChatView/ChatView.tsx +++ b/ui/src/components/ChatView/ChatView.tsx @@ -1,5 +1,5 @@ import type { VectorStoreOwnerType } from "@/api/generated/types.gen"; -import type { Conversation, ModelParameters } from "@/components/chat-types"; +import type { Conversation, ModelParameters, QueuedMessage } from "@/components/chat-types"; import { ChatHeader } from "@/components/ChatHeader/ChatHeader"; import { ChatInput } from "@/components/ChatInput/ChatInput"; import { ChatMessageList } from "@/components/ChatMessageList/ChatMessageList"; @@ -83,6 +83,13 @@ export interface ChatViewProps { vectorStoreOwnerType?: VectorStoreOwnerType; /** Owner ID for vector store filtering (e.g., user id, org id) */ vectorStoreOwnerId?: string; + /** Messages queued while a response is streaming (sent as each turn completes) */ + queuedMessages?: QueuedMessage[]; + /** Remove a queued message before it is sent */ + onRemoveQueuedMessage?: (id: string) => void; + /** True while a turn is in flight, so further sends queue. Stays true across + * tool rounds (unlike `isStreaming`); drives the Send/Queue button label. */ + isQueuing?: boolean; } export function ChatView({ @@ -105,6 +112,9 @@ export function ChatView({ onRespondMcpApproval, vectorStoreOwnerType, vectorStoreOwnerId, + queuedMessages, + onRemoveQueuedMessage, + isQueuing = false, }: ChatViewProps) { // Subscribe to stores const selectedInstances = useSelectedInstances(); @@ -270,6 +280,7 @@ export function ChatView({ onSend={onSendMessage} onStop={onStopStreaming} isStreaming={isStreaming} + isQueuing={isQueuing} disabled={inputDisabled} noModelsSelected={selectedInstances.length === 0} noModelsAvailable={!isLoadingModels && availableModels.length === 0} @@ -290,6 +301,8 @@ export function ChatView({ onSubAgentModelChange={setSubAgentModel} onOpenMCPConfig={() => setMCPConfigModalOpen(true)} onApplyPrompt={setPendingPrompt} + queuedMessages={queuedMessages} + onRemoveQueuedMessage={onRemoveQueuedMessage} /> diff --git a/ui/src/components/chat-types.ts b/ui/src/components/chat-types.ts index b98ad21..642ee70 100644 --- a/ui/src/components/chat-types.ts +++ b/ui/src/components/chat-types.ts @@ -872,6 +872,17 @@ export interface ChatFile { preview?: string; } +/** + * A message the user composed while a response was still streaming. Queued + * messages are sent one at a time as each in-flight turn completes (see the + * queue drain in `ChatPage`). + */ +export interface QueuedMessage { + id: string; + content: string; + files: ChatFile[]; +} + export interface Conversation { id: string; /** Server-assigned ID after sync (may differ from local id) */ diff --git a/ui/src/pages/chat/ChatPage.tsx b/ui/src/pages/chat/ChatPage.tsx index 982ca01..d65081d 100644 --- a/ui/src/pages/chat/ChatPage.tsx +++ b/ui/src/pages/chat/ChatPage.tsx @@ -4,6 +4,7 @@ import { useQuery } from "@tanstack/react-query"; import { apiV1ModelsOptions } from "@/api/generated/@tanstack/react-query.gen"; import { ChatView, type ChatFile } from "@/components/ChatView/ChatView"; +import { useMessageQueue } from "./useMessageQueue"; import { ErrorBoundary } from "@/components/ErrorBoundary/ErrorBoundary"; import { useConversationsContext } from "@/components/ConversationsProvider/ConversationsProvider"; import { @@ -256,6 +257,30 @@ export default function ChatPage() { const [forkModalOpen, setForkModalOpen] = useState(false); const [forkMessageId, setForkMessageId] = useState(undefined); + // Message queue: lets the user keep composing (and hit "send") while a + // response is still streaming. An idle send goes out immediately; a send + // issued mid-turn is queued and dispatched when the turn completes. See + // `MessageQueue` for why serialization keys off the `sendMessage` promise + // rather than `isStreaming` (which flickers between tool rounds). + const { queuedMessages, isBusy, sendOrQueue, removeQueuedMessage, clearQueue } = + useMessageQueue(sendMessage); + + // Drop pending queued messages when leaving the conversation they were queued + // for, so the singleton doesn't drain them through a different conversation's + // send context. The cleanup fires on a same-route switch (/chat/:idA → + // /chat/:idB, where the id dep changes) and on a remount that unmounts this + // instance (/chat/:id → /chat via "New Chat"), using the id captured at setup. + // The create transition (/chat → /chat/:id) unmounts the conversation-less + // /chat instance, whose captured id is undefined, so a queued follow-up is + // preserved. The local→remote URL flip keeps currentConversation.id stable, so + // the dep doesn't change and the queue survives it. + useEffect(() => { + const id = currentConversation?.id; + return () => { + if (id !== undefined) clearQueue(); + }; + }, [currentConversation?.id, clearQueue]); + const handleSendMessage = useCallback( (content: string, files?: ChatFile[]) => { if (!currentConversation) { @@ -267,9 +292,9 @@ export default function ChatPage() { navigate(`/chat/${newConv.id}`, { replace: true }); setPendingProject({ id: null }); } - sendMessage(content, files ?? []); + sendOrQueue(content, files ?? []); }, - [currentConversation, createConversation, navigate, selectedModels, sendMessage, pendingProject] + [currentConversation, createConversation, navigate, selectedModels, pendingProject, sendOrQueue] ); // Handle regeneration of a single model response @@ -353,6 +378,9 @@ export default function ChatPage() { pendingProjectId={pendingProject.id} onEditAndRerun={editAndRerun} onRespondMcpApproval={respondToMcpApproval} + queuedMessages={queuedMessages} + onRemoveQueuedMessage={removeQueuedMessage} + isQueuing={isBusy} /> {currentConversation && ( diff --git a/ui/src/pages/chat/__tests__/messageQueue.test.ts b/ui/src/pages/chat/__tests__/messageQueue.test.ts new file mode 100644 index 0000000..636c088 --- /dev/null +++ b/ui/src/pages/chat/__tests__/messageQueue.test.ts @@ -0,0 +1,298 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +import type { QueuedMessage } from "@/components/chat-types"; +import { MessageQueue, type SendFn } from "../messageQueue"; + +/** A send whose promise we resolve manually, so we can interleave events. */ +function deferred() { + let resolve!: () => void; + let reject!: (e: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +const flush = () => new Promise((r) => setTimeout(r, 0)); + +describe("MessageQueue", () => { + let lastQueue: QueuedMessage[] = []; + const onChange = (q: QueuedMessage[]) => { + lastQueue = q; + }; + + /** Build a queue wired to a send function and the shared `onChange` spy. */ + function makeQueue(send: SendFn) { + const q = new MessageQueue(); + q.subscribe(onChange); + q.setSend(send); + return q; + } + + beforeEach(() => { + lastQueue = []; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("sends immediately when idle and shows no queue", () => { + const send = vi.fn().mockResolvedValue(undefined); + const q = makeQueue(send); + + q.sendOrQueue("hello", []); + + expect(send).toHaveBeenCalledTimes(1); + expect(send).toHaveBeenCalledWith("hello", []); + // Idle send is never added to the visible queue. + expect(lastQueue).toEqual([]); + }); + + it("queues a second send while the first is in flight and dispatches it only after the first resolves", async () => { + const order: string[] = []; + const first = deferred(); + const second = deferred(); + const send = vi + .fn() + .mockImplementationOnce(async (content: string) => { + order.push(`start:${content}`); + await first.promise; + order.push(`end:${content}`); + }) + .mockImplementationOnce(async (content: string) => { + order.push(`start:${content}`); + await second.promise; + order.push(`end:${content}`); + }); + + const q = makeQueue(send); + + q.sendOrQueue("one", []); // idle -> sends immediately + q.sendOrQueue("two", []); // busy -> queued + + // Second send must NOT have started yet — the first turn is still streaming. + expect(send).toHaveBeenCalledTimes(1); + expect(q.size).toBe(1); + expect(lastQueue.map((m) => m.content)).toEqual(["two"]); + + // Finish the first turn; the queued message is dispatched next. + first.resolve(); + await flush(); + + expect(send).toHaveBeenCalledTimes(2); + expect(q.size).toBe(0); + expect(lastQueue).toEqual([]); + + second.resolve(); + await flush(); + + // Strictly serialized: never overlapping. + expect(order).toEqual(["start:one", "end:one", "start:two", "end:two"]); + }); + + it("preserves order across several queued messages", async () => { + const gates = [deferred(), deferred(), deferred()]; + let i = 0; + const started: string[] = []; + const send = vi.fn().mockImplementation(async (content: string) => { + started.push(content); + await gates[i++].promise; + }); + + const q = makeQueue(send); + + q.sendOrQueue("a", []); + q.sendOrQueue("b", []); + q.sendOrQueue("c", []); + + expect(started).toEqual(["a"]); + + gates[0].resolve(); + await flush(); + expect(started).toEqual(["a", "b"]); + + gates[1].resolve(); + await flush(); + expect(started).toEqual(["a", "b", "c"]); + + gates[2].resolve(); + await flush(); + expect(q.size).toBe(0); + }); + + it("does not strand the queue when a send rejects", async () => { + const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + const first = deferred(); + const send = vi + .fn() + .mockImplementationOnce(async () => { + await first.promise; + throw new Error("boom"); + }) + .mockResolvedValue(undefined); + + const q = makeQueue(send); + + q.sendOrQueue("one", []); + q.sendOrQueue("two", []); + expect(q.size).toBe(1); + + first.resolve(); // gate opens, then the first turn throws "boom" + await flush(); + + // Despite the first turn failing, the queued message is still dispatched + // and the queue is no longer busy. + expect(send).toHaveBeenCalledTimes(2); + expect(send).toHaveBeenLastCalledWith("two", []); + expect(q.size).toBe(0); + expect(q.isBusy).toBe(false); + errorSpy.mockRestore(); + }); + + it("removes a queued message before it is dispatched", async () => { + const first = deferred(); + const send = vi + .fn() + .mockImplementationOnce(async () => { + await first.promise; + }) + .mockResolvedValue(undefined); + + const q = makeQueue(send); + + q.sendOrQueue("one", []); + q.sendOrQueue("two", []); + const queuedId = lastQueue[0].id; + expect(q.size).toBe(1); + + q.remove(queuedId); + expect(q.size).toBe(0); + expect(lastQueue).toEqual([]); + + first.resolve(); + await flush(); + + // The removed message is never sent (only the idle "one"). + expect(send).toHaveBeenCalledTimes(1); + }); + + it("clear() drops queued messages without affecting the in-flight turn", async () => { + const first = deferred(); + const send = vi + .fn() + .mockImplementationOnce(async () => { + await first.promise; + }) + .mockResolvedValue(undefined); + + const q = makeQueue(send); + + q.sendOrQueue("one", []); + q.sendOrQueue("two", []); + q.sendOrQueue("three", []); + expect(q.size).toBe(2); + + q.clear(); + expect(q.size).toBe(0); + expect(lastQueue).toEqual([]); + + first.resolve(); + await flush(); + + // Only the in-flight "one" ran; the cleared messages did not. + expect(send).toHaveBeenCalledTimes(1); + }); + + it("models the streaming commit order without clobbering an in-flight turn", async () => { + // Simulate the real send: append user msg synchronously, start "streaming", + // and append the assistant msg when the turn resolves. + const transcript: string[] = []; + let streaming = false; + const gate1 = deferred(); + const gate2 = deferred(); + const gates = [gate1, gate2]; + let turn = 0; + + const send = vi.fn().mockImplementation(async (content: string) => { + // No two turns may overlap (would reset the streaming store). + expect(streaming).toBe(false); + transcript.push(`user:${content}`); + streaming = true; + const g = gates[turn++]; + await g.promise; + transcript.push(`assistant:${content}`); + streaming = false; + }); + + const q = makeQueue(send); + + q.sendOrQueue("q1", []); + // User queues q2 while q1 is still streaming. + q.sendOrQueue("q2", []); + expect(transcript).toEqual(["user:q1"]); + + gate1.resolve(); + await flush(); + gate2.resolve(); + await flush(); + + expect(transcript).toEqual(["user:q1", "assistant:q1", "user:q2", "assistant:q2"]); + }); + + it("notifies busy subscribers across the whole drain, not per message", async () => { + const busyEvents: boolean[] = []; + const gates = [deferred(), deferred()]; + let i = 0; + const send = vi.fn().mockImplementation(async () => { + await gates[i++].promise; + }); + + const q = makeQueue(send); + q.subscribeBusy((busy) => busyEvents.push(busy)); + + // subscribeBusy fires immediately with the current (idle) value. + expect(busyEvents).toEqual([false]); + + q.sendOrQueue("one", []); // idle -> busy flips true + q.sendOrQueue("two", []); // queued while busy -> no extra busy event + expect(busyEvents).toEqual([false, true]); + expect(q.isBusy).toBe(true); + + gates[0].resolve(); + await flush(); + // Still draining "two": busy must stay true between turns. + expect(q.isBusy).toBe(true); + expect(busyEvents).toEqual([false, true]); + + gates[1].resolve(); + await flush(); + // Drain complete -> busy flips back to false exactly once. + expect(busyEvents).toEqual([false, true, false]); + expect(q.isBusy).toBe(false); + }); + + it("uses the latest send function for each dispatch", async () => { + const first = deferred(); + const sendA = vi.fn().mockImplementation(async () => { + await first.promise; + }); + const sendB = vi.fn().mockResolvedValue(undefined); + + const q = makeQueue(sendA); + + q.sendOrQueue("one", []); + q.sendOrQueue("two", []); + + // Config changes while the queue drains -> point at the new send function. + q.setSend(sendB); + + first.resolve(); + await flush(); + + expect(sendA).toHaveBeenCalledTimes(1); + expect(sendB).toHaveBeenCalledTimes(1); + expect(sendB).toHaveBeenCalledWith("two", []); + }); +}); diff --git a/ui/src/pages/chat/messageQueue.ts b/ui/src/pages/chat/messageQueue.ts new file mode 100644 index 0000000..b29d18a --- /dev/null +++ b/ui/src/pages/chat/messageQueue.ts @@ -0,0 +1,150 @@ +import type { ChatFile, QueuedMessage } from "@/components/chat-types"; + +export type SendFn = (content: string, files: ChatFile[]) => Promise; + +/** + * Serializes chat sends so the user can keep typing (and hit "send") while a + * response is still streaming. + * + * - An **idle** send goes out immediately — identical to sending without a + * queue at all. + * - A send issued **while a turn is in flight** is appended to the queue and + * dispatched one at a time as each turn completes. + * + * The serialization signal is the `send` promise itself, which (see `useChat`) + * resolves only when the whole turn is done — including multi-round tool + * execution. `isStreaming` can briefly flip false *between* tool rounds, so it + * is deliberately not used to drive the queue here. + * + * This is used as an app-wide singleton (see `chatMessageQueue`) rather than + * component state. The first message of a conversation navigates `/chat` → + * `/chat/:id`, which remounts `ChatPage`; component-scoped state would reset the + * in-flight lock and let a queued message start a second turn *concurrently* + * with the one still streaming (two side-by-side responses for one model). A + * singleton keeps the lock alive across that remount. + * + * `send` is kept current via {@link setSend} (called from an effect in + * `useMessageQueue`) so each dispatch uses the latest `sendMessage` closure, + * picking up model/tool/config changes the user made while the queue was + * draining. + */ +export class MessageQueue { + private queue: QueuedMessage[] = []; + private busy = false; + private send: SendFn | null = null; + private readonly listeners = new Set<(queue: QueuedMessage[]) => void>(); + private readonly busyListeners = new Set<(busy: boolean) => void>(); + + /** Current number of queued (not-yet-dispatched) messages. */ + get size(): number { + return this.queue.length; + } + + /** True while a turn is being sent (and therefore further sends will queue). */ + get isBusy(): boolean { + return this.busy; + } + + // Flip the busy flag and notify subscribers. The UI uses this to keep the + // Send/Queue label in step with the actual dispatch decision: `busy` stays + // true across a whole turn (including between tool rounds, where `isStreaming` + // briefly flips false), so a click always matches the rendered label. + private setBusy(value: boolean) { + if (this.busy === value) return; + this.busy = value; + for (const listener of this.busyListeners) listener(value); + } + + /** Point the queue at the current send function. Call on every render. */ + setSend(send: SendFn) { + this.send = send; + } + + /** Subscribe to queue changes. The listener is invoked immediately with the + * current queue, and on every subsequent change. Returns an unsubscribe. */ + subscribe(listener: (queue: QueuedMessage[]) => void): () => void { + this.listeners.add(listener); + listener([...this.queue]); + return () => { + this.listeners.delete(listener); + }; + } + + /** Subscribe to busy-state changes. The listener is invoked immediately with + * the current value, and on every subsequent change. Returns an unsubscribe. */ + subscribeBusy(listener: (busy: boolean) => void): () => void { + this.busyListeners.add(listener); + listener(this.busy); + return () => { + this.busyListeners.delete(listener); + }; + } + + private emit() { + const snapshot = [...this.queue]; + for (const listener of this.listeners) listener(snapshot); + } + + /** Send now if idle, otherwise queue for after the in-flight turn completes. */ + sendOrQueue(content: string, files: ChatFile[]) { + if (this.busy) { + this.queue = [...this.queue, { id: crypto.randomUUID(), content, files }]; + this.emit(); + return; + } + void this.pump(content, files); + } + + /** Remove a queued message before it is dispatched. */ + remove(id: string) { + const next = this.queue.filter((m) => m.id !== id); + if (next.length === this.queue.length) return; + this.queue = next; + this.emit(); + } + + /** Drop all queued messages (e.g. when switching conversations). Does not + * affect a turn already in flight. */ + clear() { + if (this.queue.length === 0) return; + this.queue = []; + this.emit(); + } + + // Dispatch `first`, then keep draining the queue until it is empty. `busy` + // stays true across the whole drain so no other send can start concurrently + // (which would clobber the in-flight stream). + private async pump(first: string, firstFiles: ChatFile[]) { + this.setBusy(true); + try { + await this.runSafe(first, firstFiles); + while (this.queue.length > 0) { + const [next, ...rest] = this.queue; + this.queue = rest; + this.emit(); + await this.runSafe(next.content, next.files); + } + } finally { + this.setBusy(false); + } + } + + // A single send that never rejects — a failed turn must not strand the rest + // of the queue (or leave `busy` stuck true). + private async runSafe(content: string, files: ChatFile[]) { + const send = this.send; + if (!send) return; + try { + await send(content, files); + } catch (err) { + console.error("Chat message failed to send:", err); + } + } +} + +/** + * App-wide chat send queue. A singleton (not component state) so the in-flight + * lock survives the `ChatPage` remount that happens when the first message of a + * conversation navigates `/chat` → `/chat/:id`. + */ +export const chatMessageQueue = new MessageQueue(); diff --git a/ui/src/pages/chat/useChat.ts b/ui/src/pages/chat/useChat.ts index 3034305..e432597 100644 --- a/ui/src/pages/chat/useChat.ts +++ b/ui/src/pages/chat/useChat.ts @@ -225,7 +225,8 @@ interface UseChatReturn { messages: ChatMessage[]; modelResponses: ModelResponse[]; isStreaming: boolean; - sendMessage: (content: string, files: ChatFile[]) => void; + /** Resolves when the turn fully completes (including multi-round tool execution). */ + sendMessage: (content: string, files: ChatFile[]) => Promise; stopStreaming: () => void; clearMessages: () => void; /** Set messages directly. For functional updates, use the conversation store's actions. */ diff --git a/ui/src/pages/chat/useMessageQueue.ts b/ui/src/pages/chat/useMessageQueue.ts new file mode 100644 index 0000000..21817b2 --- /dev/null +++ b/ui/src/pages/chat/useMessageQueue.ts @@ -0,0 +1,47 @@ +import { useCallback, useEffect, useState } from "react"; + +import type { ChatFile, QueuedMessage } from "@/components/chat-types"; +import { chatMessageQueue, type SendFn } from "./messageQueue"; + +/** + * React wrapper around the {@link chatMessageQueue} singleton. Lets the user + * keep composing and hitting "send" while a response streams; messages queued + * mid-turn are dispatched one at a time as each turn completes. + * + * The queue is a module-level singleton so its in-flight lock survives the + * `ChatPage` remount triggered by the first message's `/chat` → `/chat/:id` + * navigation. The latest `send` closure is pushed in on every render. + */ +export function useMessageQueue(send: SendFn) { + const [queuedMessages, setQueuedMessages] = useState([]); + const [isBusy, setIsBusy] = useState(chatMessageQueue.isBusy); + + // Keep the singleton pointed at the latest send closure. In an effect (not in + // the render phase) so a render that React starts but discards in concurrent + // mode can't leave the singleton holding an uncommitted closure. `send` + // changes whenever model/tool/config changes; the effect commits well before + // any user-triggered send or queue drain, so dispatch always uses the current + // context. + useEffect(() => { + chatMessageQueue.setSend(send); + }, [send]); + + useEffect(() => chatMessageQueue.subscribe(setQueuedMessages), []); + + // Track the busy flag reactively so the UI can label the action button to + // match the dispatch decision. `busy` stays true across a whole turn — unlike + // `isStreaming`, which flickers false between tool rounds — so a click during + // that window correctly reads as "Queue" rather than "Send". + useEffect(() => chatMessageQueue.subscribeBusy(setIsBusy), []); + + const sendOrQueue = useCallback( + (content: string, files: ChatFile[]) => chatMessageQueue.sendOrQueue(content, files), + [] + ); + + const removeQueuedMessage = useCallback((id: string) => chatMessageQueue.remove(id), []); + + const clearQueue = useCallback(() => chatMessageQueue.clear(), []); + + return { queuedMessages, isBusy, sendOrQueue, removeQueuedMessage, clearQueue }; +}