feat(go/ai/exp): add DefineAgent, DefinePromptAgent, and DefineCustomAgent#4462
Conversation
Summary of ChangesHello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Genkit Go SDK by introducing a new Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.
I am having trouble creating individual review comments. Click here to see my feedback.
go/core/action.go (534-550)
This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.
A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.
Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.
func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
defer func() {
if r := recover(); r != nil {
// This recovers from a panic that occurs when sending on a closed channel.
err = NewError(FAILED_PRECONDITION, "connection is closed")
}
}()
select {
case c.inputCh <- input:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
case <-c.doneCh:
// The recover will handle a panic if doneCh and inputCh close concurrently.
return NewError(FAILED_PRECONDITION, "action has completed")
}
}
go/samples/basic-session-flow/main.go (49-53)
The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.
ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
ThinkingConfig: &genai.ThinkingConfig{
ThinkingBudget: genai.Ptr[int32](0),
},
})),
SessionFlow and relatedDefineSessionFlow
DefineSessionFlowDefineCustomAgent and DefinePromptAgent
Add two construction options to localstore.FileSessionStore, mirroring the JS FileSessionStore (PR #5251): - WithMaxPersistedChainLength(n): on each save, walk the new snapshot's parentId chain and unlink rows past the newest n, capping per-conversation disk use. n must be >= 1; 0 or negative is rejected. - WithSnapshotPathPrefix(fn): derive a per-call subdirectory from context for tenant isolation. The prefix may nest via "/" and is sanitized against directory escape. Snapshots remain flat by id within the prefix (<dir>/<prefix>/<snapshotId>.json), so resume-by-snapshot, heartbeat, finalize, abort, and status subscription stay O(1) direct opens, while GetLatestSnapshot scans the prefix directory. With no options configured the on-disk layout is unchanged. Options follow the ai/option.go interface pattern (in a new option.go) and error if set more than once.
…tore FileSessionStore.OnSnapshotStatusChange previously reflected only status changes written through the same store instance, so a detached turn running in one process could not be aborted by another process sharing the directory. Add an internal poller that re-reads subscribed snapshot files on an interval (default 1s, configurable via WithPollInterval; <=0 disables it) and delivers any change. The instant in-process fast path is kept for same-process delivery; both paths funnel through a single per-snapshot dedup gate, so a change is delivered exactly once regardless of which observes it first. The poller is started lazily on the first subscriber and stopped when the last unsubscribes, so a store with no subscriptions pays nothing. Polling (rather than fsnotify) matches the SnapshotSubscriber contract, adds no dependency, and is immune to the temp-file+rename inode swap.
…pt/NamedPrompt
Replace the FromInline/FromPrompt agent-source constructors with three
clearer forms:
- InlinePrompt(opts...) defines the prompt inline (was FromInline)
- SameNamedPrompt() references the prompt named like the agent
- NamedPrompt(name, input) references any registered prompt by name,
rendered with an input supplied from code
NamedPrompt decouples the prompt's lookup name from the agent's name, so a
single prompt can back many agents with different inputs. The old
FromPrompt(defaultInput...) variadic-of-one is gone; per-turn input now
rides on NamedPrompt.
Updates the wrapper docs, both samples (chef's personality moves to the
.prompt frontmatter default), the README, and the tests, and adds
TestPromptAgent_NamedPromptSharedAcrossAgents covering the shared-prompt
path. Breaking change to the experimental ai/exp API.
Split the prompt-backed agent constructors so each has one clear job:
- DefineAgent(r, name, prompt, opts...) defines the prompt inline; the
prompt is an InlinePrompt, a
[]ai.PromptOption slice passed
positionally
- DefinePromptAgent(r, name, opts...) sources a prompt from the registry,
defaulting to the agent's own name
DefinePromptAgent uses the same-named prompt by default; WithNamedPrompt(name,
input) points it at a different registered prompt rendered with a code-supplied
input, so one prompt can back many agents.
The prompt source is split across a compile-time-validated option set mirroring
ai/option.go: the shared options (WithSessionStore, WithStateTransform,
WithDescription) are AgentOptions valid on every constructor, while
WithNamedPrompt is a PromptAgentOption accepted only by DefinePromptAgent.
Passing it to DefineAgent or DefineCustomAgent fails to compile.
Making the inline prompt a required positional argument means an inline agent
cannot be defined without one. Removes the AgentSource abstraction and the
SameNamedPrompt/NamedPrompt sources.
Updates the wrapper docs, both samples, and the tests. Breaking change to the
experimental ai/exp API.
DefineAgent and DefineCustomAgentDefineAgent, DefinePromptAgent, and DefineCustomAgent
The "Load the Prompt from a File" path now uses DefinePromptAgent (default same-named lookup) with WithNamedPrompt for shared prompts, and the inline example uses the InlinePrompt slice literal.
…pans
Each runTurn-N span now records the committed session state at turn end as
its genkit:output, shaped as {state: <session state>}, and for
server-managed agents carries the turn-end snapshot's ID under
genkit:metadata:agent:snapshotId. The session ID stays on the root action
span as before.
The state is raw: StateTransform shapes only client-facing surfaces, not
telemetry or persisted state, so the span output matches the snapshot its
ID points to.
With the turn span output now derived from session state, the per-turn
chunk collection is gone: removed SessionRunner.collectTurnOutput,
chunkRouter.collectTurnChunks and its turnChunks/turnMu fields, and the
accumulation branch in applySideEffects (now artifact-only).
…rror WithStreamTransform is the stream-side counterpart to WithStateTransform, rewriting each AgentStreamChunk on its way to the client. Both StateTransform and StreamTransform now return (value, error): a nil value omits the state or drops the chunk (wire-only), while a non-nil error (or a panic) fails the read or invocation closed with the transform's status preserved. Updates go/README.md and the genkit.DefineAgent option docs.
The abortSnapshot companion action's response carried only status, so a caller could not correlate the result with the snapshot it aborted. Add snapshotId (matching the abortSnapshot request) and populate it from the request. Authored in the shared zod schema (genkit-tools/common/src/types/agent.ts) and regenerated across genkit-schema.json, the Go bindings (go/ai/exp/gen.go), and the Python typings (_typing.py); the Go doc and noomitempty come from schemas.config.
The operation aborts a turn by marking its pending snapshot aborted for the
active turn to observe; it does not abort a snapshot. Name it for that
turn-level intent.
- Agent.AbortSnapshot -> Agent.Abort; AbortSnapshotAction -> AbortAction
- HTTP route POST /agents/{name}/abortSnapshot -> /agents/{name}/abort
- Wire types AbortSnapshotRequest/Response -> AgentAbortRequest/Response,
regenerated into genkit-schema.json, go gen.go, and py _typing.py
The unexported abortPendingSnapshot helper keeps its name: it names the
mechanism (flip the pending snapshot to aborted), distinct from the
turn-level abort.
BREAKING CHANGE: the Agent.Abort* Go API, the /abort HTTP route, and the
AgentAbort{Request,Response} schema types (Go, Python, JS) replace the
former AbortSnapshot* names.
A per-turn handler could not learn its snapshot ID until after the turn (on the TurnEnd chunk), so durable agents had no way to name snapshot-correlated external resources (e.g. a git worktree) before doing the turn's work. Reserve the turn's snapshot ID at turn start (the runtime now mints it rather than the store) and surface it, the parent snapshot ID, and the turn index on a TurnContext. The turn-end snapshot persists under that reserved ID, so the ID the handler reads up front is the ID the snapshot lands under. The TurnContext rides on the per-turn fn's context.Context (TurnContextFromContext) rather than the SessionRunner.Run callback signature, so existing custom agents compile unchanged. Empty SnapshotID for client-managed agents and for turns that write no snapshot. The detach pending row stays store-minted.
# Conflicts: # go/genkit/gen.go
Commit 8756d03 bumped the new-branch generated files (core/gen.go, ai/exp/gen.go) to a 2026 copyright header, but jsonschemagen still emitted 2025. check-generated-go.sh regenerates and fails on any diff, so the two hand-edited generated files made the freshness check go red. Bump the generator's license template (and the jsonschemagen golden) to 2026 and regenerate, so generated output matches the committed files. ai/gen.go and genkit/gen.go move 2025 -> 2026 as well, keeping every generated gen.go on a single, consistent year.
Brings in main's JS agents middleware (#5252) and agents testapp, plus the bidi feature merged to main as #4387. That PR was an earlier snapshot of the bidi work this branch already carries in more refined form, so the Go bidi surface was resolved in this branch's favor. Conflict resolutions (all Go): - core/bidi.go, core/bidi_test.go: kept ours (strict superset of #4387). Preserves one-shot input rejection, validate-and-normalize init, nil-init handling, and the exported ErrConnectionClosed / ErrActionCompleted sentinels. Our test file already contains every #4387 test plus the additional coverage for those refinements. - core/action.go: kept our isNilValue helper (used by validateInit). - core/api/action.go: kept our "input required" RunBidiJSON doc. - genkit/servers.go: kept our applyContextProviders helper, which is semantically equivalent to main's inline context-provider loop. - genkit/servers_test.go: kept our superset (all of #4387's handler tests plus TestHandlerAgent / TestHandlerAgentRef). - genkit/reflection_test.go: kept our subtest comment; channel setup code is identical on both sides. This branch's rename of the INTERNAL status value ("INTERNAL_SERVER_ERROR" to "INTERNAL") is preserved; #4387's stale test expectation was dropped with its test file. Verified: go build ./..., go vet ./..., go test -race ./core/ ./genkit/ ./ai/exp/..., and a GOMAXPROCS=1 bidi stress run all pass.
Bring in the mainline agent work that landed on main, most notably #4462 (DefineAgent/DefinePromptAgent/DefineCustomAgent), #4387 (bidirectional streaming actions), and the JS agent/middleware suite. The go/ai/exp agent implementation on main shares this branch's lineage (the files diverged only because ai/x -> ai/exp moves make them add/add) and is a strict superset: it already absorbed this branch's fixes (parallel tool-response ordering, ValidateResumeAgainstHistory, the inputCh/turnIndex unexport) and adds heartbeats, TurnContext, and the GetSnapshot/GetLatestSnapshot/Abort facade methods on top. Every conflicted implementation file is therefore resolved to main's version; go/core/x/session (removed on main) and the old source.go (replaced by inline.go) are dropped. This branch's unique contribution, the agent conformance harness (go/ai/exp/agents_conformance_test.go + tests/specs/agent.yaml), is kept and adapted to main's renamed API: - exp.FromInline(...) -> exp.InlinePrompt{...} - agent.StreamBidi(...) -> agent.Connect(...) - store.AbortSnapshot(...) -> agent.Abort(...) (abort moved to the Agent facade) Conformance suite passes 40/40, green under -race and GOMAXPROCS=1.
PR #4462 squash-landed the agent foundation that this branch also builds, so most conflicts were add/add on identical content. Resolution: - 23 add/add files differed only by copyright year (2025 vs 2026); took main's 2026 copies. - go/ai/generate.go: kept the branch's nil-guard on Interrupts() and the new ToolResponses() method (pure additions; main had neither). - go/genkit/exp/*, README, and basic-agents samples: kept the branch's design (experimental definers in genkit/exp; the banker tool-interrupt/resume demo). - go/genkit/servers_test.go: dropped TestHandlerAgent/TestHandlerAgentRef; their coverage moved to go/genkit/exp/routes_test.go when DefineAgent left package genkit. Verified: go build ./..., go vet ./..., and tests for go/genkit, go/genkit/exp, go/ai/exp, go/ai, go/core all pass.
…ddleware Resolve conflicts from main absorbing the agent foundation (#4462, #4797, #4387, #5576) via separate squashed PRs: - Adopt main's renames (AgentAbort{Request,Response}, Agent.Abort/AbortAction, abort routes) and its turn-context feature across go/ai/exp + schema. - Preserve this branch's net-new middleware surface: ArtifactStore, AgentRef, WithContextFunc context seeding, and the orchestrator sample. Per review direction, consolidate the agent constructors into the genkit/exp (genkitx) surface that main migrated docs/samples/tests to: - Move the genkit-instance seeding into genkitx.DefineAgent/DefinePromptAgent/ DefineCustomAgent via a new genkitbridge.SeedContext hook, so middleware resolves sub-agents through the documented exp constructors. - Remove the now-dead genkit.DefineAgent/DefinePromptAgent/DefineCustomAgent/ ListAgents duplicates from package genkit. - Reconcile the basic-agents sample to keep both the banker (interrupt/resume) and orchestrator (middleware delegation) demos on genkitx.
Adds an experimental Agent API (
ai/exp) for multi-turn conversations with automatic snapshot management and optional background execution viaDetach, built on top of the bidirectional streaming primitives added upstream. Agents are servable in-process (Run/RunText/Connect) and over HTTP (one turn per request), persist per-turn snapshots a client can resume by session or snapshot ID, and stream custom-state changes to the client as JSON Patch deltas.Examples
Inline-prompt Agent
DefineAgentis the default path for prompt-backed agents. It defines the prompt inline: the third argument is anaix.InlinePrompt, a list ofai.PromptOptionvalues registered under the agent's name. Agent-level options (store, state transform, description) follow as a typed variadic. (To back an agent with a prompt already in the registry, seeDefinePromptAgentbelow.)The
Statetype parameter is inferred from the typed agent options (aix.WithSessionStore,aix.WithStateTransform), so the explicitDefineAgent[State]is only needed when no typed option is supplied. A typedaix.AgentOption[OtherState]accidentally passed alongside anAgentOption[State]is a compile-time error.Registry-prompt Agent
DefinePromptAgentbacks an agent with a prompt already in the registry (defined viaDefinePromptor loaded from a.promptfile). By default it uses the prompt registered under the agent's own name, so no source option is needed. To share one prompt across several agents, or to supply the render input from code,aix.WithNamedPrompt(name, input)points the agent at any prompt by name; the prompt name need not match the agent's.WithNamedPrompt's input is rendered through the prompt on every turn; the prompt'sRenderis invoked once at definition time as a smoke check, so an input that fails the prompt's input schema panics here rather than failing on the first invocation. The default same-named lookup (andWithNamedPromptwith anilinput) renders the prompt's own default input.Custom Agent
DefineCustomAgenthands you the turn loop. Use it when you need control before or after the generate call (set up expensive clients, route through different models per turn, clean up in-progress state before returning the final outcome). The per-turn callback may return a*aix.TurnResultto report how the turn ended; the framework forwards that reason on theTurnEndchunk and persists it on the turn-end snapshot.sess.Result()returns anAgentResultwith the last message from the conversation history and all artifacts. If you need to control what gets sent back to the client (e.g. returning only artifacts without a message, or overriding the invocation's finish reason), construct the result directly:DefineCustomAgenttakes the same...AgentOption[State]variadic asDefineAgent, so the same compile-time State guarantee applies. To build an agent without registering it (e.g. in a library, or to move it between registries), useaix.NewCustomAgentand register later withagent.Register(r).Invocation patterns
Agentexposes the same single-turn / multi-turn / streaming surface regardless of whichDefine*Agentproduced it.For single-turn usage,
RunandRunTexthandle the connection lifecycle:For multi-turn conversations with streaming, the client drives the conversation by sending messages and iterating chunks until
TurnEnd:Outputis the single "I'm done" call: it implicitly closes the input side, drains any unconsumed chunks, and blocks until the agent finalizes. It is idempotent and the returned pointer is shared, so treat it as read-only.Snapshots & Resumption
Configure automatic snapshot persistence with a store. A snapshot is written at the end of every successful turn; that is the only persistence point (see Snapshot System).
A server-managed conversation can be resumed three ways:
Or resume from client-kept state (no server store needed). The conversation's identity rides inside the state object (
SessionState.SessionID), so resending the state round-trips the identity without tracking a separate field:AgentInitenforces that the resumption mode matches the agent's state management: passingWithStateto a server-managed agent (one with aSessionStore), orWithSessionID/WithSnapshotIDto a client-managed agent, is rejected withFAILED_PRECONDITION. PassingWithStatetogether withWithSessionID/WithSnapshotIDis rejected withINVALID_ARGUMENT. Passing none starts a fresh invocation. Resuming afailed,aborted, or still-pendingsnapshot is rejected withFAILED_PRECONDITION(pass an earlierWithSnapshotIDto continue from a good point).Live custom state
Mutating custom state via
Session.UpdateCustomautomatically streams the delta to the client as anAgentStreamChunk.CustomPatch(RFC 6902 JSON Patch). Agents never hand-craft patches; they just mutate state. The first patch of each turn is a whole-document replace that re-bases the client, and subsequent patches are incremental diffs. The diff is computed on the client-facing state (after anyWithStateTransform), so streamed deltas honor redaction.aix.Diff,aix.ApplyPatch, and theaix.JSONPatchtypes are exported for clients applying patches to their own tracked copy.Background Agents
AgentConnection.Detachends the input stream and asks the server to take ownership of the rest of the work. The connection closes promptly with a pending snapshot ID the client can use later to poll, fetch results, or abort. Any inputs queued behind the in-flight one continue draining through the runner on a context decoupled from the client's.For single-turn use,
Runalready takes anAgentInputwhoseDetachfield is the same wire bit, so detached single-turn with a final payload is just:Local Go callers poll, fetch results, and abort through the agent, which applies the configured
WithStateTransformon the way out (reading the rawagent.Store()directly returns untransformed state):For Dev UI and non-Go clients, an agent configured with a
SessionStoreregistersgetSnapshotandabortSnapshotcompanion actions (abort is registered only when the store also implementsSnapshotSubscriber, so the reflected surface matches actual capabilities). Client-managed agents (no store) register neither. See Companion Actions below.Once a snapshot has finalized to
completed, resume it like any other snapshot. Pending, aborted, and failed snapshots are rejected withFAILED_PRECONDITION.To redact PII or strip secrets on the way out to a client, register a
StateTransform. It runs on getSnapshot responses, on client-managedAgentOutput.State, and on the streamedCustomPatchdiffs; the raw state is what gets persisted and what the user fn sees.WithStreamTransformis the stream-side counterpart, rewriting eachAgentStreamChunk(model tokens, artifacts, custom patches, turn-end) on its way to the client:Each transform owns a fresh deep copy: mutate in place, return a new value, or return
nilto omit that state (or drop that chunk) from the client's view. A non-nil error fails closed: the read or invocation fails with the transform's status (e.g.PERMISSION_DENIED) rather than leaking unredacted data.Lifecycle. Detach is observed by an eager intake reader the moment it arrives in the input channel, regardless of what the runner is processing. The server suspends future turn-end snapshots, writes a single pending snapshot (empty state placeholder, chained off the most recent prior snapshot), returns that ID over the wire, and closes the connection. When
fnreturns, the same snapshot row is rewritten in place with one of three statuses (completed,failedwithErrorset, oraborted) and the cumulative final session state. The snapshot ID never changes. Side effects on session state (sess.AddMessages,resp.SendArtifact, etc.) keep applying through the queued turns so user code does not have to branch on detach.Abort. Aborting is an ordinary
SaveSnapshotthat flips a pending row toaborted(theabortSnapshotcompanion action does exactly this server-side); there is no dedicated store abort method. The runtime subscribes viaSnapshotSubscriber.OnSnapshotStatusChange, so the abort is observed by push rather than polling:fn's context is cancelled as soon as the store publishes the status change. The finalizer rechecks status before writing terminal state, so a late abort wins over acompletedthat was about to land.Pre-conditions. Detach requires a store that implements
SnapshotSubscriber(so the runtime can observe the abort flip). Stores missing it are rejected at detach time withFAILED_PRECONDITION; the agent can still run synchronously against a minimalSessionStore.Liveness. A detached turn refreshes its pending snapshot's
HeartbeatAton an interval, decoupled from the client connection. A reader that finds a pending snapshot whose heartbeat has gone stale surfaces its status asexpired(computed on read, never written back, so the stored row stayspending), so a crashed or wedged worker becomes observable instead of orphaning the conversation forever.Serving over HTTP
Agentimplementsapi.BidiAction, so it serves over HTTP one turn per request viagenkit.Handler. Thegenkit/exppackage adds route helpers that lay out a default surface (the agent plus its companion actions) for every registered agent, honoring each agent's capabilities so server- and client-managed agents deploy side by side:Use
genkitx.AgentRoutes(agent)to serve specific agents, orgenkitx.AllFlowRoutes(g)for flows; concatenate the route slices to mix. Every route is a POST taking the standard{"data": ...}envelope and returning{"result": ...}.HandlerOptions (e.g. context providers for auth) apply to every route. These helpers live ingenkit/expwhile the routing layer is experimental.Custom Session State
The
Statetype parameter lets you maintain typed state across turns:Custom state is included in snapshots, available when resuming, and streamed to the client as it changes (see Live custom state).
Failure handling
An in-band failure (a turn's
fnreturns an error) does not fail the action. The invocation resolves as anAgentOutputwithFinishReason == AgentFinishReasonFailed, the structured error onAgentOutput.Error(original status category intact), and the last-good state, so a failure costs the caller only the failed turn, never the session:A failed turn writes no snapshot, so the newest snapshot is always the last successful turn. A custom agent may instead treat a turn error as recoverable: swallow the error from
sess.Runand callRunagain to keep processing inputs. A non-nilerror(rather than a failedAgentOutput) is returned only when the invocation never started, e.g. a rejected init payload.API Reference
Agent API (
ai/exp— experimental)Define
DefineAgentcovers inline-prompt agents;DefinePromptAgentbacks an agent with a prompt from the registry;DefineCustomAgentis the escape hatch for custom turn loops;NewCustomAgentbuilds one without registering it.InlinePrompt & WithNamedPrompt
AgentOption[State]
AgentFunc
Agent[State]
InvocationOption[State]
Configures an agent invocation (
Connect,Run, orRunText). The choice must match the agent's state management at invocation time (seeAgentInitbelow).AgentConnection[State]
Breaking from
Receive()does not cancel the connection, enabling multi-turn patterns. The connection tracks live custom state from the streamed patches.SessionRunner[State]
Extends
Session[State]with turn management. Passed as thesessparameter toAgentFunc.Session[State]
Thread-safe conversation state. Available via
SessionRunnerembedding orSessionFromContext.Responder
Output channel with convenience methods. Artifacts sent here are added to the session synchronously (the side effect lands before
Sendreturns); the wire forward is suppressed after detach.(Custom-state updates are not sent through
Responder; mutate state withSession.UpdateCustomand the runtime emits theCustomPatchchunk for you.)Wire Types
Snapshot System
Snapshots are written at the end of each successful turn, and that is the only routine persistence point. A failed turn writes nothing (its partial state is not a resume point), so the newest snapshot is always the last successful turn:
AgentOutput.SnapshotIDis the last turn-end snapshot. State a custom agent mutates after its turn loop rides on the returned output but is not persisted.Store interfaces split by capability. The minimum to use
WithSessionStoreis reader + writer; detach support layers onSnapshotSubscriber:The atomic
SaveSnapshotshape composes cleanly with SQL (SELECT FOR UPDATEin a txn), Firestore (RunTransaction(fn)), DynamoDB (optimistic concurrency on aversionattribute), or any K/V store with a CAS primitive. It also folds the finalize path's read-then-rewrite into one step so a late abort cannot be clobbered by the terminal write.Local-development stores live in
ai/exp/localstore:ParentIDchains each snapshot off the previous one as informational lineage for debugging and UI history trees; it plays no part in resolving a session's latest snapshot (that is a plain max-CreatedAtlookup, so a forked history resolves to its most recently created branch).Custom-state streaming protocol
Custom-state deltas use a self-contained RFC 6902 JSON Patch implementation, exported for clients:
Companion Actions
An agent configured with a
SessionStoreauto-registers companion actions so Dev UI and non-Go clients can observe and control invocations without reaching into the store directly. Client-managed agents (no store) register neither. Local Go callers use the agent facade (agent.GetSnapshot/agent.AbortSnapshot), which applies the state transform;agent.Store()is raw access.Each agent's own action descriptor (registered as
ActionTypeAgent="agent") also carries anAgentMetadatavalue undermetadata["agent"]so reflective callers can shape the surface (e.g. hide the Abort button when the store cannot support it) without round-tripping through the reflection API:AgentMetadataand the snapshot/finish-reason schemas are defined once ingenkit-tools/common/src/types/agent.tsand shared across language clients, generated intogo/ai/exp/gen.go. The companion-action request/response types that take a Go type parameter are hand-written in Go.Supporting core additions
Two
go/corechanges the design relies on:BidiConnection.Output()gains a fast path that returns the already-settled output even when the connection context was cancelled concurrently. This is what makesconn.Output()afterDetach()return the pending snapshot ID instead ofctx.Err()on the client side when the server closes the wire first.Known Issues
Connectcreates a trace span immediately when the connection is established. If the connection is closed without sending any messages (zero turns), an empty single-span trace is still emitted. This is cosmetic. A future change could defer span creation until the first input arrives.Stream()andStreamText()convenience methods:AgenthasRun()andRunText()for single-turn non-streaming usage, but lacks corresponding single-turn streaming helpers. Today single-turn streaming usesConnectdirectly (open, send, iterateReceive,Output).