Skip to content

feat(go/ai/exp): add DefineAgent, DefinePromptAgent, and DefineCustomAgent#4462

Merged
apascal07 merged 158 commits into
mainfrom
ap/go-session-flow
Jun 24, 2026
Merged

feat(go/ai/exp): add DefineAgent, DefinePromptAgent, and DefineCustomAgent#4462
apascal07 merged 158 commits into
mainfrom
ap/go-session-flow

Conversation

@apascal07

@apascal07 apascal07 commented Feb 6, 2026

Copy link
Copy Markdown
Collaborator

Adds an experimental Agent API (ai/exp) for multi-turn conversations with automatic snapshot management and optional background execution via Detach, built on top of the bidirectional streaming primitives added upstream. Agents are servable in-process (Run/RunText/Connect) and over HTTP (one turn per request), persist per-turn snapshots a client can resume by session or snapshot ID, and stream custom-state changes to the client as JSON Patch deltas.

Examples

Inline-prompt Agent

DefineAgent is the default path for prompt-backed agents. It defines the prompt inline: the third argument is an aix.InlinePrompt, a list of ai.PromptOption values registered under the agent's name. Agent-level options (store, state transform, description) follow as a typed variadic. (To back an agent with a prompt already in the registry, see DefinePromptAgent below.)

chatAgent := genkit.DefineAgent(g, "chat",
    aix.InlinePrompt{
        ai.WithModelName("googleai/gemini-flash-latest"),
        ai.WithSystem("You are a sarcastic pirate. Keep responses concise."),
    },
    aix.WithSessionStore(localstore.NewInMemorySessionStore[any]()),
)

conn, _ := chatAgent.Connect(ctx)
conn.SendText("What is Go?")
for chunk, _ := range conn.Receive() {
    if chunk.ModelChunk != nil { fmt.Print(chunk.ModelChunk.Text()) }
    if chunk.TurnEnd != nil { break }
}
conn.Close()

The State type parameter is inferred from the typed agent options (aix.WithSessionStore, aix.WithStateTransform), so the explicit DefineAgent[State] is only needed when no typed option is supplied. A typed aix.AgentOption[OtherState] accidentally passed alongside an AgentOption[State] is a compile-time error.


Registry-prompt Agent

DefinePromptAgent backs an agent with a prompt already in the registry (defined via DefinePrompt or loaded from a .prompt file). By default it uses the prompt registered under the agent's own name, so no source option is needed. To share one prompt across several agents, or to supply the render input from code, aix.WithNamedPrompt(name, input) points the agent at any prompt by name; the prompt name need not match the agent's.

# prompts/chat.prompt
---
model: googleai/gemini-flash-latest
input:
  schema:
    personality: string
  default:
    personality: a helpful assistant
---
{{ role "system" }}
You are {{ personality }}. Keep responses concise.
type ChatInput struct {
    Personality string `json:"personality"`
}

// Default: uses the prompt named "chat" (chat.prompt) with its frontmatter default.
chatAgent := genkit.DefinePromptAgent(g, "chat",
    aix.WithSessionStore(localstore.NewInMemorySessionStore[any]()),
)

// Or back several agents with the one prompt, each with its own input:
pirate := genkit.DefinePromptAgent(g, "pirate",
    aix.WithNamedPrompt[any]("chat", ChatInput{Personality: "a sarcastic pirate"}),
    aix.WithSessionStore(localstore.NewInMemorySessionStore[any]()),
)

WithNamedPrompt's input is rendered through the prompt on every turn; the prompt's Render is invoked once at definition time as a smoke check, so an input that fails the prompt's input schema panics here rather than failing on the first invocation. The default same-named lookup (and WithNamedPrompt with a nil input) renders the prompt's own default input.


Custom Agent

DefineCustomAgent hands you the turn loop. Use it when you need control before or after the generate call (set up expensive clients, route through different models per turn, clean up in-progress state before returning the final outcome). The per-turn callback may return a *aix.TurnResult to report how the turn ended; the framework forwards that reason on the TurnEnd chunk and persists it on the turn-end snapshot.

chatAgent := genkit.DefineCustomAgent(g, "chat",
    func(ctx context.Context, resp aix.Responder, sess *aix.SessionRunner[ChatState]) (*aix.AgentResult, error) {
        if err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentInput) (*aix.TurnResult, error) {
            for chunk, err := range genkit.GenerateStream(ctx, g,
                ai.WithModelName("googleai/gemini-flash-latest"),
                ai.WithMessages(sess.Messages()...),
            ) {
                if err != nil {
                    return nil, err
                }
                if chunk.Done {
                    sess.AddMessages(chunk.Response.Message)
                    return &aix.TurnResult{
                        FinishReason: aix.AgentFinishReason(chunk.Response.FinishReason),
                    }, nil
                }
                resp.SendModelChunk(chunk.Chunk) // stream tokens to client
            }
            return nil, nil
        }); err != nil {
            return nil, err
        }
        return sess.Result(), nil
    },
)

sess.Result() returns an AgentResult with the last message from the conversation history and all artifacts. If you need to control what gets sent back to the client (e.g. returning only artifacts without a message, or overriding the invocation's finish reason), construct the result directly:

return &aix.AgentResult{Artifacts: sess.Artifacts()}, nil

DefineCustomAgent takes the same ...AgentOption[State] variadic as DefineAgent, so the same compile-time State guarantee applies. To build an agent without registering it (e.g. in a library, or to move it between registries), use aix.NewCustomAgent and register later with agent.Register(r).


Invocation patterns

Agent exposes the same single-turn / multi-turn / streaming surface regardless of which Define*Agent produced it.

For single-turn usage, Run and RunText handle the connection lifecycle:

output, _ := chatAgent.RunText(ctx, "What is Go?")
fmt.Println(output.Message.Text())

For multi-turn conversations with streaming, the client drives the conversation by sending messages and iterating chunks until TurnEnd:

conn, _ := chatAgent.Connect(ctx)

conn.SendText("What is Go?")

for chunk, err := range conn.Receive() {
    if chunk.ModelChunk != nil {
        fmt.Print(chunk.ModelChunk.Text())
    }
    if chunk.TurnEnd != nil {
        break // turn complete, ready for next input
    }
}

conn.SendText("Tell me more about its concurrency model")
// ... iterate conn.Receive() again ...

output, _ := conn.Output() // closes the input side, drains, returns final AgentOutput

Output is the single "I'm done" call: it implicitly closes the input side, drains any unconsumed chunks, and blocks until the agent finalizes. It is idempotent and the returned pointer is shared, so treat it as read-only.


Snapshots & Resumption

Configure automatic snapshot persistence with a store. A snapshot is written at the end of every successful turn; that is the only persistence point (see Snapshot System).

store := localstore.NewInMemorySessionStore[MyState]()

chatAgent := genkit.DefineAgent(g, "chat",
    aix.InlinePrompt{ai.WithModelName("googleai/gemini-flash-latest")},
    aix.WithSessionStore(store),
)

A server-managed conversation can be resumed three ways:

// By session ID: continue the conversation from its latest snapshot. This is
// the common case; the client only needs to remember out.SessionID.
out, _ := chatAgent.RunText(ctx, "continue", aix.WithSessionID[MyState](sessionID))

// By snapshot ID: continue from one specific snapshot (e.g. an earlier branch).
out, _ = chatAgent.RunText(ctx, "continue", aix.WithSnapshotID[MyState](snapshotID))

// Both: the snapshot picks the resume point and the session ID is asserted
// against it, so an invocation never silently continues the wrong conversation.
out, _ = chatAgent.RunText(ctx, "continue",
    aix.WithSnapshotID[MyState](snapshotID), aix.WithSessionID[MyState](sessionID))

Or resume from client-kept state (no server store needed). The conversation's identity rides inside the state object (SessionState.SessionID), so resending the state round-trips the identity without tracking a separate field:

out, _ := chatAgent.RunText(ctx, "continue", aix.WithState(prevOutput.State))

AgentInit enforces that the resumption mode matches the agent's state management: passing WithState to a server-managed agent (one with a SessionStore), or WithSessionID / WithSnapshotID to a client-managed agent, is rejected with FAILED_PRECONDITION. Passing WithState together with WithSessionID/WithSnapshotID is rejected with INVALID_ARGUMENT. Passing none starts a fresh invocation. Resuming a failed, aborted, or still-pending snapshot is rejected with FAILED_PRECONDITION (pass an earlier WithSnapshotID to continue from a good point).


Live custom state

Mutating custom state via Session.UpdateCustom automatically streams the delta to the client as an AgentStreamChunk.CustomPatch (RFC 6902 JSON Patch). Agents never hand-craft patches; they just mutate state. The first patch of each turn is a whole-document replace that re-bases the client, and subsequent patches are incremental diffs. The diff is computed on the client-facing state (after any WithStateTransform), so streamed deltas honor redaction.

// In the agent: a normal state mutation; the patch is emitted for you.
sess.UpdateCustom(func(s ChatState) ChatState {
    s.Step = "searching flights"
    return s
})

// On the client: AgentConnection tracks the patches and exposes the live value.
for chunk, _ := range conn.Receive() {
    if len(chunk.CustomPatch) > 0 {
        state, _ := conn.Custom() // reflects every delta observed so far
        render(state.Step)
    }
    if chunk.TurnEnd != nil { break }
}

aix.Diff, aix.ApplyPatch, and the aix.JSONPatch types are exported for clients applying patches to their own tracked copy.


Background Agents

AgentConnection.Detach ends the input stream and asks the server to take ownership of the rest of the work. The connection closes promptly with a pending snapshot ID the client can use later to poll, fetch results, or abort. Any inputs queued behind the in-flight one continue draining through the runner on a context decoupled from the client's.

chatAgent := genkit.DefineAgent(g, "chat",
    aix.InlinePrompt{ai.WithModelName("googleai/gemini-flash-latest")},
    aix.WithSessionStore(store),
)

conn, _ := chatAgent.Connect(ctx)
conn.SendText("draft a long report on Go's runtime")
conn.SendText("...and email it to me when done")

// Client walks away. The server keeps working.
conn.Detach()

out, _ := conn.Output() // returns immediately; FinishReason == AgentFinishReasonDetached
fmt.Println(out.SnapshotID)

For single-turn use, Run already takes an AgentInput whose Detach field is the same wire bit, so detached single-turn with a final payload is just:

out, _ := chatAgent.Run(ctx, &aix.AgentInput{
    Detach:  true,
    Message: ai.NewUserTextMessage("..."),
})

Local Go callers poll, fetch results, and abort through the agent, which applies the configured WithStateTransform on the way out (reading the raw agent.Store() directly returns untransformed state):

snap, _ := chatAgent.GetSnapshot(ctx, id) // transform applied; GetLatestSnapshot(ctx, sessionID) resolves by session

switch snap.Status {
case aix.SnapshotStatusPending:   // still working; snap.State is empty until finalize
case aix.SnapshotStatusCompleted: // snap.State has the cumulative final state
case aix.SnapshotStatusFailed:    // snap.Error has the structured failure (*core.GenkitError)
case aix.SnapshotStatusAborted:   // someone (or some thing) aborted it
case aix.SnapshotStatusExpired:   // pending row whose detached worker went dark (heartbeat stale)
}

// Abort flips a still-pending row to aborted; the runtime observes the flip via
// SnapshotSubscriber and cancels the background work. No-op on a missing or
// already-terminal snapshot. (Remote clients call the abortSnapshot companion
// action, which does the same flip server-side.)
status, _ := chatAgent.AbortSnapshot(ctx, id)

For Dev UI and non-Go clients, an agent configured with a SessionStore registers getSnapshot and abortSnapshot companion actions (abort is registered only when the store also implements SnapshotSubscriber, so the reflected surface matches actual capabilities). Client-managed agents (no store) register neither. See Companion Actions below.

Once a snapshot has finalized to completed, resume it like any other snapshot. Pending, aborted, and failed snapshots are rejected with FAILED_PRECONDITION.

To redact PII or strip secrets on the way out to a client, register a StateTransform. It runs on getSnapshot responses, on client-managed AgentOutput.State, and on the streamed CustomPatch diffs; the raw state is what gets persisted and what the user fn sees. WithStreamTransform is the stream-side counterpart, rewriting each AgentStreamChunk (model tokens, artifacts, custom patches, turn-end) on its way to the client:

chatAgent := genkit.DefineAgent(g, "chat",
    aix.InlinePrompt{ai.WithModelName("googleai/gemini-flash-latest")},
    aix.WithSessionStore(store),
    aix.WithStateTransform[MyState](func(ctx context.Context, s *aix.SessionState[MyState]) (*aix.SessionState[MyState], error) {
        return redactPII(ctx, s) // ctx carries caller identity for RBAC-aware redaction
    }),
    // A chunk carries no state type, so its State arg is passed explicitly.
    aix.WithStreamTransform[MyState](func(ctx context.Context, c *aix.AgentStreamChunk) (*aix.AgentStreamChunk, error) {
        return redactChunk(ctx, c), nil
    }),
)

Each transform owns a fresh deep copy: mutate in place, return a new value, or return nil to omit that state (or drop that chunk) from the client's view. A non-nil error fails closed: the read or invocation fails with the transform's status (e.g. PERMISSION_DENIED) rather than leaking unredacted data.

Lifecycle. Detach is observed by an eager intake reader the moment it arrives in the input channel, regardless of what the runner is processing. The server suspends future turn-end snapshots, writes a single pending snapshot (empty state placeholder, chained off the most recent prior snapshot), returns that ID over the wire, and closes the connection. When fn returns, the same snapshot row is rewritten in place with one of three statuses (completed, failed with Error set, or aborted) and the cumulative final session state. The snapshot ID never changes. Side effects on session state (sess.AddMessages, resp.SendArtifact, etc.) keep applying through the queued turns so user code does not have to branch on detach.

Abort. Aborting is an ordinary SaveSnapshot that flips a pending row to aborted (the abortSnapshot companion action does exactly this server-side); there is no dedicated store abort method. The runtime subscribes via SnapshotSubscriber.OnSnapshotStatusChange, so the abort is observed by push rather than polling: fn's context is cancelled as soon as the store publishes the status change. The finalizer rechecks status before writing terminal state, so a late abort wins over a completed that was about to land.

Pre-conditions. Detach requires a store that implements SnapshotSubscriber (so the runtime can observe the abort flip). Stores missing it are rejected at detach time with FAILED_PRECONDITION; the agent can still run synchronously against a minimal SessionStore.

Liveness. A detached turn refreshes its pending snapshot's HeartbeatAt on an interval, decoupled from the client connection. A reader that finds a pending snapshot whose heartbeat has gone stale surfaces its status as expired (computed on read, never written back, so the stored row stays pending), so a crashed or wedged worker becomes observable instead of orphaning the conversation forever.


Serving over HTTP

Agent implements api.BidiAction, so it serves over HTTP one turn per request via genkit.Handler. The genkit/exp package adds route helpers that lay out a default surface (the agent plus its companion actions) for every registered agent, honoring each agent's capabilities so server- and client-managed agents deploy side by side:

import genkitx "github.com/firebase/genkit/go/genkit/exp"

mux := http.NewServeMux()
for _, r := range genkitx.AllAgentRoutes(g) {
    mux.HandleFunc(r.Pattern(), r.Handler())
}
// POST /agents/chat                one turn per request (?stream=true for SSE)
// POST /agents/chat/getSnapshot    read a snapshot by ID
// POST /agents/chat/abortSnapshot  abort background work
log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))

Use genkitx.AgentRoutes(agent) to serve specific agents, or genkitx.AllFlowRoutes(g) for flows; concatenate the route slices to mix. Every route is a POST taking the standard {"data": ...} envelope and returning {"result": ...}. HandlerOptions (e.g. context providers for auth) apply to every route. These helpers live in genkit/exp while the routing layer is experimental.


Custom Session State

The State type parameter lets you maintain typed state across turns:

type ChatState struct {
    TopicsDiscussed []string `json:"topicsDiscussed"`
}

chatAgent := genkit.DefineCustomAgent(g, "chat",
    func(ctx context.Context, resp aix.Responder, sess *aix.SessionRunner[ChatState]) (*aix.AgentResult, error) {
        if err := sess.Run(ctx, func(ctx context.Context, input *aix.AgentInput) (*aix.TurnResult, error) {
            // ... generate response ...

            sess.UpdateCustom(func(s ChatState) ChatState {
                s.TopicsDiscussed = append(s.TopicsDiscussed, extractTopic(input))
                return s
            })
            return nil, nil
        }); err != nil {
            return nil, err
        }
        return sess.Result(), nil
    },
    aix.WithSessionStore(localstore.NewInMemorySessionStore[ChatState]()),
)

Custom state is included in snapshots, available when resuming, and streamed to the client as it changes (see Live custom state).


Failure handling

An in-band failure (a turn's fn returns an error) does not fail the action. The invocation resolves as an AgentOutput with FinishReason == AgentFinishReasonFailed, the structured error on AgentOutput.Error (original status category intact), and the last-good state, so a failure costs the caller only the failed turn, never the session:

out, _ := chatAgent.RunText(ctx, "...")
if out.FinishReason == aix.AgentFinishReasonFailed {
    log.Printf("turn failed: %v (status %s)", out.Error.Message, out.Error.Status)
    // Server-managed: out.SnapshotID points at the last good turn.
    // Client-managed: out.State holds the last good state. Resume and retry.
}

A failed turn writes no snapshot, so the newest snapshot is always the last successful turn. A custom agent may instead treat a turn error as recoverable: swallow the error from sess.Run and call Run again to keep processing inputs. A non-nil error (rather than a failed AgentOutput) is returned only when the invocation never started, e.g. a rejected init payload.


API Reference

Agent API (ai/exp — experimental)

Define

DefineAgent covers inline-prompt agents; DefinePromptAgent backs an agent with a prompt from the registry; DefineCustomAgent is the escape hatch for custom turn loops; NewCustomAgent builds one without registering it.

// DefineAgent registers an agent backed by an inline prompt. The prompt is
// an InlinePrompt ([]ai.PromptOption) registered under the agent's name.
// State is inferred from the typed variadic of AgentOption[State]; a
// mismatch fails at compile time.
func DefineAgent[State any](
    r api.Registry, name string,
    prompt InlinePrompt,
    opts ...AgentOption[State],
) *Agent[State]

// DefinePromptAgent registers an agent backed by a prompt from the registry.
// With no source option it uses the prompt registered under the agent's own
// name; WithNamedPrompt points it at a different one. It takes a wider
// PromptAgentOption[State] variadic (every AgentOption is one) that also
// admits WithNamedPrompt.
func DefinePromptAgent[State any](
    r api.Registry, name string,
    opts ...PromptAgentOption[State],
) *Agent[State]

// DefineCustomAgent registers an agent whose per-turn body you implement.
func DefineCustomAgent[State any](
    r api.Registry, name string,
    fn AgentFunc[State],
    opts ...AgentOption[State],
) *Agent[State]

// NewCustomAgent builds a custom agent without registering it. Register
// later with Agent.Register (which also registers companion actions).
func NewCustomAgent[State any](
    name string,
    fn AgentFunc[State],
    opts ...AgentOption[State],
) *Agent[State]
InlinePrompt & WithNamedPrompt
// InlinePrompt is the inline prompt definition passed positionally to
// DefineAgent: a list of prompt options, registered under the agent's name.
type InlinePrompt []ai.PromptOption

// WithNamedPrompt points a DefinePromptAgent at the prompt registered under
// name, rendered with input each turn (pass nil for the prompt's own
// default). name need not match the agent's, so one prompt can back many
// agents with different inputs. Without it, DefinePromptAgent uses the prompt
// registered under the agent's own name. It is a PromptAgentOption, so it is
// accepted only by DefinePromptAgent; passing it to DefineAgent or
// DefineCustomAgent is a compile-time error.
func WithNamedPrompt[State any](name string, input any) PromptAgentOption[State]
AgentOption[State]
// AgentOption configures an agent at definition time. All three constructors
// accept the shared options below as a typed `...AgentOption[State]` variadic,
// so a State mismatch is a compile-time error. DefinePromptAgent widens its
// parameter to PromptAgentOption[State], which also admits WithNamedPrompt;
// every AgentOption is a PromptAgentOption, but not the reverse.

// WithSessionStore sets the store for persisting snapshots. The store must
// implement SnapshotReader and SnapshotWriter at minimum. Detach support
// also requires SnapshotSubscriber; detach attempts on a store that lacks it
// are rejected at runtime.
WithSessionStore[State](store SessionStore[State])

// WithStateTransform rewrites session state on its way out to a client
// (getSnapshot response, client-managed AgentOutput.State, and streamed
// CustomPatch diffs). Not applied to state persisted in the store or
// passed to the user fn. Typical use: PII redaction or secret stripping.
WithStateTransform[State](transform StateTransform[State])

// WithStreamTransform is the stream-side counterpart: it rewrites each
// AgentStreamChunk on its way to the client. A chunk carries no state type,
// so State cannot be inferred and is passed explicitly.
WithStreamTransform[State](transform StreamTransform)

// WithDescription sets a human-readable description, stored on the agent
// action's descriptor and surfaced in the Dev UI's action listing.
WithDescription[State](description string)

AgentFunc

// AgentFunc is the function signature for a custom agent. It streams
// output through resp and reads/mutates state through sess; mutating
// custom state via Session.UpdateCustom auto-streams a CustomPatch delta.
type AgentFunc[State any] = func(
    ctx context.Context,
    resp Responder,
    sess *SessionRunner[State],
) (*AgentResult, error)

Agent[State]

// Agent is a bidirectional streaming agent with automatic snapshot
// management. It implements api.BidiAction, so generic transports
// (e.g. genkit.Handler) accept it directly to serve one turn per request.

// Connect starts a multi-turn streaming invocation.
func (*Agent) Connect(ctx context.Context, opts ...InvocationOption[State]) (*AgentConnection[State], error)

// Run starts a single-turn invocation. In-band failures resolve as a
// failed AgentOutput; a rejected init payload errors.
func (*Agent) Run(ctx context.Context, input *AgentInput, opts ...InvocationOption[State]) (*AgentOutput[State], error)

// RunText is Run with a single user text message.
func (*Agent) RunText(ctx context.Context, text string, opts ...InvocationOption[State]) (*AgentOutput[State], error)

// Snapshot facade: GetSnapshot / GetLatestSnapshot apply WithStateTransform and
// read-time shaping (prefer them to a raw Store() read); AbortSnapshot flips a
// pending detached row to aborted. All three require a store.
func (*Agent) GetSnapshot(ctx context.Context, snapshotID string) (*SessionSnapshot[State], error)
func (*Agent) GetLatestSnapshot(ctx context.Context, sessionID string) (*SessionSnapshot[State], error)
func (*Agent) AbortSnapshot(ctx context.Context, snapshotID string) (SnapshotStatus, error)

// Register, Store, and companion-action accessors for serving/travel.
func (*Agent) Register(r api.Registry)
func (*Agent) Store() SessionStore[State] // raw backend; prefer the facade above
func (*Agent) GetSnapshotAction() api.Action   // nil when client-managed
func (*Agent) AbortSnapshotAction() api.Action // nil unless store is a SnapshotSubscriber
func (*Agent) Name() string
func (*Agent) Desc() api.ActionDesc
InvocationOption[State]

Configures an agent invocation (Connect, Run, or RunText). The choice must match the agent's state management at invocation time (see AgentInit below).

// WithSessionID resumes a server-managed conversation from its latest
// snapshot. Mutually exclusive with WithState; combinable with
// WithSnapshotID (the session ID is asserted against the snapshot).
WithSessionID[State](id string)

// WithSnapshotID loads state from a persisted snapshot by ID
// (server-managed). Mutually exclusive with WithState.
WithSnapshotID[State](id string)

// WithState sets the initial state for the invocation (client-managed).
// The conversation's identity rides inside it (SessionState.SessionID).
WithState[State](state *SessionState[State])

AgentConnection[State]

Breaking from Receive() does not cancel the connection, enabling multi-turn patterns. The connection tracks live custom state from the streamed patches.

// Send sends an AgentInput. Once the invocation has resolved (e.g. a
// failed turn ended it), Send and the helpers fail with
// core.ErrActionCompleted; the outcome is on Output.
func (*AgentConnection) Send(input *AgentInput) error
func (*AgentConnection) SendMessage(message *ai.Message) error
func (*AgentConnection) SendText(text string) error

// SendResume continues an interrupted generation. Construct parts with
// ai.ToolDef.RestartWith / ai.ToolDef.RespondWith.
func (*AgentConnection) SendResume(resume *ToolResume) error

// Detach hands the rest of the work to the server (see Background Agents).
// Requires a store that implements SnapshotSubscriber.
func (*AgentConnection) Detach() error

// Close signals no more inputs. Optional before Output.
func (*AgentConnection) Close() error

// Receive yields stream chunks; breaking does not cancel the connection.
// Each chunk's CustomPatch is applied to the tracked custom state first.
func (*AgentConnection) Receive() iter.Seq2[*AgentStreamChunk, error]

// Custom returns the live custom state tracked from streamed patches.
func (*AgentConnection) Custom() (State, error)

// Output closes the input side, drains, and blocks for finalization.
// Idempotent; in-band failures resolve rather than error.
func (*AgentConnection) Output() (*AgentOutput[State], error)

// Done is closed when the connection completes.
func (*AgentConnection) Done() <-chan struct{}

SessionRunner[State]

Extends Session[State] with turn management. Passed as the sess parameter to AgentFunc.

type SessionRunner[State any] struct {
    *Session[State]
    InputCh   <-chan *AgentInput  // per-turn inputs from the client (advanced use)
    TurnIndex int                 // zero-based index of the current turn (advanced use)
}

// Run loops over the input channel, calling fn for each turn (wrapped in a
// trace span; the input message is added to the session first). fn may
// return a *TurnResult to report how the turn ended. After a successful
// turn a snapshot is written and a TurnEnd chunk is sent. On a fn error,
// Run emits a failed TurnEnd (no snapshot), stops, and returns the error;
// the custom agent may recover or propagate it.
func (*SessionRunner) Run(ctx context.Context, fn func(ctx context.Context, input *AgentInput) (*TurnResult, error)) error

// Result builds an AgentResult from current session state (last message +
// all artifacts).
func (*SessionRunner) Result() *AgentResult

Session[State]

Thread-safe conversation state. Available via SessionRunner embedding or SessionFromContext.

// State returns a deep copy of the current state.
func (*Session) State() *SessionState[State]

// SessionID returns the stable conversation ID (settled before fn runs).
func (*Session) SessionID() string

// Conversation history
func (*Session) Messages() []*ai.Message
func (*Session) AddMessages(messages ...*ai.Message)
func (*Session) SetMessages(messages []*ai.Message)
func (*Session) UpdateMessages(fn func([]*ai.Message) []*ai.Message)

// Custom state (UpdateCustom auto-streams a CustomPatch delta)
func (*Session) Custom() State
func (*Session) UpdateCustom(fn func(State) State)

// Artifacts
func (*Session) Artifacts() []*Artifact
func (*Session) AddArtifacts(artifacts ...*Artifact)
func (*Session) UpdateArtifacts(fn func([]*Artifact) []*Artifact)

// Context helpers
func NewSessionContext[State](ctx, *Session[State]) context.Context
func SessionFromContext[State](ctx) *Session[State]

Responder

Output channel with convenience methods. Artifacts sent here are added to the session synchronously (the side effect lands before Send returns); the wire forward is suppressed after detach.

// SendModelChunk sends a generation chunk (token-level streaming).
func (Responder) SendModelChunk(chunk *ai.ModelResponseChunk)

// SendArtifact sends an artifact and adds it to the session (replacing any
// same-named artifact).
func (Responder) SendArtifact(artifact *Artifact)

(Custom-state updates are not sent through Responder; mutate state with Session.UpdateCustom and the runtime emits the CustomPatch chunk for you.)


Wire Types

// AgentInit is the input for starting an agent invocation. SessionID,
// SnapshotID, and State are competing conversation sources; which are
// valid depends on the agent's state management:
//   - Server-managed (SessionStore configured): SessionID (resume latest),
//     SnapshotID (resume a specific snapshot), or both (asserted); State
//     is rejected with FAILED_PRECONDITION.
//   - Client-managed (no store): State (identity rides inside it);
//     SessionID/SnapshotID are rejected with FAILED_PRECONDITION.
// State together with SessionID/SnapshotID is rejected with
// INVALID_ARGUMENT. No fields starts a fresh invocation.
type AgentInit[State any] struct {
    SessionID  string               `json:"sessionId,omitempty"`
    SnapshotID string               `json:"snapshotId,omitempty"`
    State      *SessionState[State] `json:"state,omitempty"`
}

// AgentInput is the input sent to an agent during a conversation turn.
type AgentInput struct {
    Detach  bool        `json:"detach,omitempty"`  // disconnect; server continues in background
    Message *ai.Message `json:"message,omitempty"` // user's input for this turn
    Resume  *ToolResume `json:"resume,omitempty"`  // resume parts for an interrupted generation
}

// ToolResume holds the parts that resume an interrupted agent turn.
// Mirrors ai.GenerateActionResume.
type ToolResume struct {
    Respond []*ai.Part `json:"respond,omitempty"` // tool response parts
    Restart []*ai.Part `json:"restart,omitempty"` // tool request parts to restart
}

// AgentStreamChunk represents a single item in the agent's output stream.
// Multiple fields can be populated in a single chunk.
type AgentStreamChunk struct {
    ModelChunk  *ai.ModelResponseChunk `json:"modelChunk,omitempty"`  // token-level streaming
    CustomPatch JSONPatch              `json:"customPatch,omitempty"` // RFC 6902 delta on custom state
    Artifact    *Artifact              `json:"artifact,omitempty"`    // newly produced artifact
    TurnEnd     *TurnEnd               `json:"turnEnd,omitempty"`     // non-nil signals turn complete
}

// TurnEnd groups the signals emitted when an agent turn finishes. Emitted
// exactly once per turn.
type TurnEnd struct {
    SnapshotID   string            `json:"snapshotId,omitempty"`   // turn-end snapshot ID (empty if none)
    FinishReason AgentFinishReason `json:"finishReason,omitempty"` // how the turn ended
}

// AgentOutput is the output when an agent invocation completes. It wraps
// AgentResult with framework-managed fields.
type AgentOutput[State any] struct {
    Artifacts    []*Artifact          `json:"artifacts,omitempty"`
    Message      *ai.Message          `json:"message,omitempty"`
    SessionID    string               `json:"sessionId,omitempty"`    // stable conversation ID
    SnapshotID   string               `json:"snapshotId,omitempty"`   // most recent turn-end snapshot (or pending on detach)
    State        *SessionState[State] `json:"state,omitempty"`        // final state (client-managed only)
    FinishReason AgentFinishReason    `json:"finishReason,omitempty"` // detached / failed / last turn's reason
    Error        *core.GenkitError    `json:"error,omitempty"`        // populated when FinishReason == failed
}

// AgentResult is the return value from an AgentFunc.
type AgentResult struct {
    Artifacts    []*Artifact       `json:"artifacts,omitempty"`
    Message      *ai.Message       `json:"message,omitempty"`
    FinishReason AgentFinishReason `json:"finishReason,omitempty"` // override the default (last turn's reason)
}

// AgentFinishReason is why a turn or invocation finished. The first group
// mirrors ai.FinishReason (forwarded verbatim from a single generate call);
// the rest are agent-specific.
type AgentFinishReason string
const (
    AgentFinishReasonStop        AgentFinishReason = "stop"
    AgentFinishReasonLength      AgentFinishReason = "length"
    AgentFinishReasonBlocked     AgentFinishReason = "blocked"
    AgentFinishReasonInterrupted AgentFinishReason = "interrupted"
    AgentFinishReasonOther       AgentFinishReason = "other"
    AgentFinishReasonUnknown     AgentFinishReason = "unknown"
    AgentFinishReasonAborted     AgentFinishReason = "aborted"   // aborted (e.g. via abortSnapshot)
    AgentFinishReasonDetached    AgentFinishReason = "detached"  // moved to the background
    AgentFinishReasonFailed      AgentFinishReason = "failed"    // terminated with an error
)

// SessionState is the portable conversation state that flows between client
// and server.
type SessionState[State any] struct {
    SessionID string        `json:"sessionId,omitempty"` // framework-owned conversation ID
    Messages  []*ai.Message `json:"messages,omitempty"`  // history (excludes prompt-rendered messages)
    Custom    State         `json:"custom,omitempty"`    // user-defined state
    Artifacts []*Artifact   `json:"artifacts,omitempty"` // named artifact collections
}

// Artifact represents a named collection of parts produced during a session.
type Artifact struct {
    Name     string         `json:"name,omitempty"`
    Parts    []*ai.Part     `json:"parts"`
    Metadata map[string]any `json:"metadata,omitempty"`
}

Snapshot System

Snapshots are written at the end of each successful turn, and that is the only routine persistence point. A failed turn writes nothing (its partial state is not a resume point), so the newest snapshot is always the last successful turn:

  • A synchronous invocation's AgentOutput.SnapshotID is the last turn-end snapshot. State a custom agent mutates after its turn loop rides on the returned output but is not persisted.
  • A failed invocation reports the last turn-end snapshot (server-managed) or the last-good state inline (client-managed) as its resume point.
  • A detached invocation writes one pending row up front and rewrites it in place when the background work finishes.

Store interfaces split by capability. The minimum to use WithSessionStore is reader + writer; detach support layers on SnapshotSubscriber:

// SnapshotReader retrieves snapshots. Required.
type SnapshotReader[State any] interface {
    // GetSnapshot retrieves a snapshot by ID. Returns nil if not found.
    GetSnapshot(ctx context.Context, snapshotID string) (*SessionSnapshot[State], error)

    // GetLatestSnapshot returns the session's most recently created snapshot
    // (greatest CreatedAt; break ties deterministically), whatever its status.
    // Returns nil for an unknown session. Backs WithSessionID resume and the
    // getSnapshot-by-session companion path. Implementable as one indexed query
    // (WHERE sessionId = ? ORDER BY createdAt DESC LIMIT 1). A later rewrite of
    // an older row (e.g. a detach finalize) never moves it ahead of a
    // newer-created sibling, since CreatedAt is preserved across rewrites.
    GetLatestSnapshot(ctx context.Context, sessionID string) (*SessionSnapshot[State], error)
}

// SnapshotWriter persists snapshots. Required.
type SnapshotWriter[State any] interface {
    // SaveSnapshot atomically reads the row at id (if any), applies fn, and
    // persists the result. The store owns only identity: SnapshotID (generated
    // if id is empty) and SessionID preservation across rewrites. The caller
    // (fn) owns the lifecycle timestamps and status - fn stamps CreatedAt and
    // UpdatedAt on a new row, advances UpdatedAt on a state-changing rewrite,
    // and preserves both on a non-state write (e.g. a heartbeat refresh, which
    // carries the row through unchanged but for HeartbeatAt). Status defaults to
    // SnapshotStatusCompleted if fn leaves it empty; callers writing a pending
    // row set it explicitly. fn must be a pure function of its input (stores may
    // retry it); returning (nil, nil) skips the write.
    SaveSnapshot(
        ctx context.Context,
        snapshotID string,
        fn func(existing *SessionSnapshot[State]) (*SessionSnapshot[State], error),
    ) (*SessionSnapshot[State], error)
}

// SessionStore is the minimum interface required by WithSessionStore.
type SessionStore[State any] interface {
    SnapshotReader[State]
    SnapshotWriter[State]
}

// SnapshotSubscriber is the optional capability that makes detached
// invocations abortable. Aborting itself is just a SaveSnapshot that flips a
// pending row to aborted (there is no dedicated abort method); this interface
// is what lets the runtime observe that flip and cancel the background work.
type SnapshotSubscriber interface {
    // OnSnapshotStatusChange yields the snapshot's status on subscription
    // and on every change until ctx is cancelled, so the runtime observes an
    // abort by push rather than polling.
    OnSnapshotStatusChange(ctx context.Context, snapshotID string) <-chan SnapshotStatus
}

The atomic SaveSnapshot shape composes cleanly with SQL (SELECT FOR UPDATE in a txn), Firestore (RunTransaction(fn)), DynamoDB (optimistic concurrency on a version attribute), or any K/V store with a CAS primitive. It also folds the finalize path's read-then-rewrite into one step so a late abort cannot be clobbered by the terminal write.

Local-development stores live in ai/exp/localstore:

// In-memory, thread-safe; state lost on exit. Implements SessionStore +
// SnapshotSubscriber.
func localstore.NewInMemorySessionStore[State any]() *InMemorySessionStore[State]

// Snapshots as JSON files (one per snapshot) under dir, created 0700.
// Single-process; implements SessionStore + SnapshotSubscriber.
func localstore.NewFileSessionStore[State any](dir string) (*FileSessionStore[State], error)
// SnapshotStatus is the lifecycle state of a snapshot.
type SnapshotStatus string
const (
    SnapshotStatusPending   SnapshotStatus = "pending"   // detached invocation still running
    SnapshotStatusCompleted SnapshotStatus = "completed" // settled state (empty value also treated as completed)
    SnapshotStatusAborted   SnapshotStatus = "aborted"   // pending row flipped to aborted via SaveSnapshot
    SnapshotStatusFailed    SnapshotStatus = "failed"    // invocation failed; Error is populated
    SnapshotStatusExpired   SnapshotStatus = "expired"   // computed on read: pending row whose heartbeat went stale (never persisted)
)

// SessionSnapshot is a persisted point-in-time capture of session state.
// Shared schema: defined in genkit-tools/common/src/types/agent.ts and
// generated into Go (with Error mapped to *core.GenkitError) and Python.
type SessionSnapshot[State any] struct {
    SnapshotID   string               `json:"snapshotId"`
    SessionID    string               `json:"sessionId,omitempty"`    // conversation chain ID
    ParentID     string               `json:"parentId,omitempty"`     // previous snapshot (informational lineage)
    CreatedAt    time.Time            `json:"createdAt"`
    UpdatedAt    time.Time            `json:"updatedAt,omitempty"`    // advanced on state-changing rewrites; equals CreatedAt until then
    HeartbeatAt  *time.Time           `json:"heartbeatAt,omitempty"`  // refreshed while a detached turn is in flight; nil if never detached
    Status       SnapshotStatus       `json:"status,omitempty"`       // empty treated as Completed for back-compat
    FinishReason AgentFinishReason    `json:"finishReason,omitempty"` // how the captured turn ended
    Error        *core.GenkitError    `json:"error,omitempty"`        // populated when Status=Failed
    State        *SessionState[State] `json:"state,omitempty"`        // nil on pending rows; populated on terminal rewrite
}

// StateTransform rewrites session state on its way out to a client (getSnapshot
// responses, client-managed AgentOutput.State, and streamed CustomPatch diffs).
// Not applied to state persisted in the store or passed to the user fn. ctx
// carries the caller's identity (RBAC-aware redaction), trace, and cancellation.
// state is a fresh deep copy the transform owns: mutate in place and return it,
// return a new pointer, return nil to omit state, or return a non-nil error to
// fail closed (the read or invocation fails with that error instead of leaking).
type StateTransform[State any] = func(ctx context.Context, state *SessionState[State]) (*SessionState[State], error)

// StreamTransform is the stream-side counterpart: it rewrites each
// AgentStreamChunk on its way to the client (a nil chunk drops it from the
// stream; a non-nil error fails the invocation closed). Not parameterized by
// State, since a chunk carries no state type.
type StreamTransform = func(ctx context.Context, chunk *AgentStreamChunk) (*AgentStreamChunk, error)

ParentID chains each snapshot off the previous one as informational lineage for debugging and UI history trees; it plays no part in resolving a session's latest snapshot (that is a plain max-CreatedAt lookup, so a forked history resolves to its most recently created branch).

Custom-state streaming protocol

Custom-state deltas use a self-contained RFC 6902 JSON Patch implementation, exported for clients:

// Diff computes a patch transforming `from` into `to`. Pointers are rooted
// at the document (e.g. "/agentStatus"); only add/remove/replace are
// emitted; object keys are visited in sorted order (deterministic). A root
// change that cannot be expressed member-by-member collapses to a single
// whole-document replace at path "".
func Diff(from, to any) JSONPatch

// ApplyPatch applies a patch and returns the new value (input not mutated).
// Lenient by design to keep streaming robust; agrees with the JS reference
// applier on every patch Diff produces.
func ApplyPatch(document any, patch JSONPatch) (any, error)

Companion Actions

An agent configured with a SessionStore auto-registers companion actions so Dev UI and non-Go clients can observe and control invocations without reaching into the store directly. Client-managed agents (no store) register neither. Local Go callers use the agent facade (agent.GetSnapshot / agent.AbortSnapshot), which applies the state transform; agent.Store() is raw access.

// Registered under action type "agent-snapshot", action name = agent name.
// Registered when a SessionStore is configured. Fetches by snapshot ID or
// (alone) the session's latest snapshot; returns the stored SessionSnapshot
// directly, with WithStateTransform applied to its state.
type GetSnapshotRequest struct {
    SnapshotID string `json:"snapshotId,omitempty"` // optional when SessionID is given
    SessionID  string `json:"sessionId,omitempty"`  // optional when SnapshotID is given
}
// → *SessionSnapshot[State]

// Registered under action type "agent-abort", action name = agent name.
// Registered only when the store also implements SnapshotSubscriber, so the
// reflected surface matches actual capabilities.
type AbortSnapshotRequest struct {
    SnapshotID string `json:"snapshotId"`
}
type AbortSnapshotResponse struct {
    Status SnapshotStatus `json:"status,omitempty"` // aborted on success; existing terminal status otherwise
}

Each agent's own action descriptor (registered as ActionTypeAgent = "agent") also carries an AgentMetadata value under metadata["agent"] so reflective callers can shape the surface (e.g. hide the Abort button when the store cannot support it) without round-tripping through the reflection API:

type AgentMetadata struct {
    StateManagement AgentStateManagement `json:"stateManagement,omitempty"` // "server" or "client"
    Abortable       bool                 `json:"abortable,omitempty"`       // true iff store implements SnapshotSubscriber
    StateSchema     map[string]any       `json:"stateSchema,omitempty"`     // JSON schema of the custom State type
}

type AgentStateManagement string
const (
    AgentStateManagementServer AgentStateManagement = "server" // a SessionStore is configured
    AgentStateManagementClient AgentStateManagement = "client" // no store; state round-trips through init/output
)

AgentMetadata and the snapshot/finish-reason schemas are defined once in genkit-tools/common/src/types/agent.ts and shared across language clients, generated into go/ai/exp/gen.go. The companion-action request/response types that take a Go type parameter are hand-written in Go.


Supporting core additions

Two go/core changes the design relies on:

// WithFlowContext attaches flow-context metadata so core.Run and
// core.FlowNameFromContext resolve from inside a custom flow-like action
// wired via core.NewBidiAction / core.DefineBidiAction. Agents take that
// path so the agent metadata can be set on the action descriptor at
// construction time (actions are immutable once registered).
func WithFlowContext(ctx context.Context, flowName string) context.Context

BidiConnection.Output() gains a fast path that returns the already-settled output even when the connection context was cancelled concurrently. This is what makes conn.Output() after Detach() return the pending snapshot ID instead of ctx.Err() on the client side when the server closes the wire first.


Known Issues

  • Empty trace on zero-turn connections: Connect creates a trace span immediately when the connection is established. If the connection is closed without sending any messages (zero turns), an empty single-span trace is still emitted. This is cosmetic. A future change could defer span creation until the first input arrives.
  • Missing Stream() and StreamText() convenience methods: Agent has Run() and RunText() for single-turn non-streaming usage, but lacks corresponding single-turn streaming helpers. Today single-turn streaming uses Connect directly (open, send, iterate Receive, Output).
  • Incomplete traces on client cancellation: When the client cancels the context mid-invocation (e.g. Ctrl+C during an active bidi connection), the trace span wrapping the action may not finalize cleanly before the dev server connection is torn down. This is OTel exporter behavior surfaced by ctx cancellation, not specific to the agent API.

@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Genkit Go SDK by introducing a new SessionFlow API, specifically designed for building and managing complex, stateful, multi-turn conversational AI applications. This new API is built upon a fundamental refactoring of the core Action type to support bidirectional streaming, enabling more dynamic and interactive AI experiences. The changes provide a structured approach to handling conversational state, including message history, custom data, and generated artifacts, with built-in mechanisms for persistence and lifecycle management.

Highlights

  • Introduction of SessionFlow API: A new SessionFlow API has been introduced in go/ai/x to manage stateful, multi-turn conversational AI interactions, including automatic snapshot management and artifact handling.
  • Core Action Refactoring for Bidirectional Streaming: The core ActionDef type has been refactored to Action to natively support bidirectional streaming, which is a foundational change enabling the SessionFlow API. This includes new BidiFunc and BidiConnection types.
  • Update of Existing AI Components: All existing AI components (embedder, evaluator, model, prompt, resource, retriever) have been updated to utilize the new core.Action type and its extended capabilities.
  • Snapshot Management Features: The SessionFlow now includes robust snapshot management, allowing state to be persisted, loaded, and controlled via callbacks, with an in-memory store implementation provided.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • go/ai/embedder.go
    • Updated embedder struct to use core.Action instead of core.ActionDef.
    • Modified NewEmbedder and LookupEmbedder to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/evaluator.go
    • Updated evaluator struct to use core.Action instead of core.ActionDef.
    • Modified NewEvaluator, NewBatchEvaluator, and LookupEvaluator to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/generate.go
    • Updated model and generateAction structs to use core.Action instead of core.ActionDef.
    • Modified LookupModel, model.Generate, and model.supportsConstrained to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/prompt.go
    • Updated prompt struct to use core.Action instead of core.ActionDef.
    • Modified DefinePrompt, LookupPrompt, and prompt.Desc to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/resource.go
    • Updated resource struct to use core.Action instead of core.ActionDef.
    • Modified DefineResource, NewResource, FindMatchingResource, and LookupResource to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/retriever.go
    • Updated retriever struct to use core.Action instead of core.ActionDef.
    • Modified NewRetriever and LookupRetriever to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/x/option.go
    • Added SessionFlowOption and StreamBidiOption interfaces for configuring session flows and bidirectional streams.
    • Introduced WithSnapshotStore, WithSnapshotCallback, WithState, and WithSnapshotID functions for flexible session flow initialization.
  • go/ai/x/session_flow.go
    • Introduced SessionFlowArtifact, SessionFlowInput, SessionFlowInit, SessionFlowOutput, and SessionFlowStreamChunk data structures.
    • Defined the Session type for managing conversational state (messages, custom state, artifacts) with methods for manipulation and snapshot handling.
    • Implemented Responder for sending various types of stream chunks (generation, status, artifacts).
    • Introduced the SessionFlow type, DefineSessionFlow for registration, and StreamBidi for initiating sessions.
    • Added logic for snapshot creation, loading, and integration with tracing, including SessionFlowConnection for buffered chunk reception.
  • go/ai/x/session_flow_test.go
    • Added comprehensive unit tests for SessionFlow covering multi-turn interactions, snapshot persistence, resuming from snapshots, client-managed state, artifact handling, snapshot callbacks, and error handling.
  • go/ai/x/snapshot.go
    • Defined SessionState for portable conversation state, SnapshotEvent for trigger types, and SessionSnapshot for persisted state.
    • Introduced SnapshotContext and SnapshotCallback for custom snapshot logic.
    • Defined SnapshotStore interface and provided an InMemorySnapshotStore implementation.
    • Added SnapshotOn utility function for selective snapshotting.
  • go/core/action.go
    • Refactored ActionDef to Action and added a new Init type parameter for bidirectional actions.
    • Introduced BidiFunc for bidirectional streaming function signatures and ActionOptions for configuration.
    • Implemented NewBidiAction and DefineBidiAction for creating and registering bidirectional actions.
    • Added StreamBidi method to Action for initiating bidirectional connections.
    • Introduced BidiConnection type for managing bidirectional streaming, including Send, Close, Receive, Output, and Done methods.
    • Updated ResolveActionFor and LookupActionFor to use the new Action type and Init parameter.
    • Added wrapBidiAsStreaming to adapt BidiFunc to StreamingFunc.
  • go/core/action_test.go
    • Updated existing tests to use DefineStreamingAction and the new Init type parameter in ResolveActionFor and LookupActionFor.
    • Added new tests for BidiAction functionality, covering echo, initialization, send after close, context cancellation, and Done channel.
  • go/core/api/action.go
    • Added new ActionType constants: ActionTypeSessionFlow and ActionTypeSnapshotStore.
    • Extended ActionDesc with StreamSchema and InitSchema fields for describing bidirectional actions.
  • go/core/background_action.go
    • Updated BackgroundActionDef to use core.Action instead of core.ActionDef.
    • Modified Register, NewBackgroundAction, and LookupBackgroundAction to align with the new core.Action type and updated ResolveActionFor signature.
  • go/core/flow.go
    • Refactored Flow to be a struct embedding *Action with the new Init type parameter.
    • Updated DefineFlow and DefineStreamingFlow to use the new Flow struct and Action type.
    • Introduced NewBidiFlow and DefineBidiFlow for creating and registering bidirectional flows.
    • Updated Run and Stream methods to use the embedded Action's Run method.
  • go/core/flow_test.go
    • Updated existing tests to reflect changes in Flow type and method calls.
    • Added new tests for BidiFlow functionality, including registration, echo, and integration with core.Run.
  • go/genkit/genkit.go
    • Updated DefineFlow and DefineStreamingFlow signatures to include the new Init type parameter.
    • Added DefineBidiFlow function to expose the new bidirectional flow definition.
  • go/genkit/session_flow.go
    • Introduced DefineSessionFlow as a top-level Genkit function to define and register session flows, wrapping aix.DefineSessionFlow.
  • go/samples/basic-session-flow/main.go
    • Added a sample CLI REPL application demonstrating the usage of SessionFlow for multi-turn conversations with token-level streaming and snapshot management.
Activity
  • The pull request author, apascal07, has indicated that the PR title adheres to conventional commits.
  • The author has confirmed that the changes have been manually and unit tested.
  • Documentation updates are noted as pending in the PR description.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@apascal07 apascal07 changed the base branch from main to ap/go-bidi February 6, 2026 05:48

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.

I am having trouble creating individual review comments. Click here to see my feedback.

go/core/action.go (534-550)

high

This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.

A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.

Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.

func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This recovers from a panic that occurs when sending on a closed channel.
			err = NewError(FAILED_PRECONDITION, "connection is closed")
		}
	}()

	select {
	case c.inputCh <- input:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.doneCh:
		// The recover will handle a panic if doneCh and inputCh close concurrently.
		return NewError(FAILED_PRECONDITION, "action has completed")
	}
}

go/samples/basic-session-flow/main.go (49-53)

medium

The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.

					ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
						ThinkingConfig: &genai.ThinkingConfig{
							ThinkingBudget: genai.Ptr[int32](0),
						},
					})),

@apascal07 apascal07 changed the title feat(go): added SessionFlow and related feat(go): added DefineSessionFlow Feb 6, 2026
@apascal07 apascal07 mentioned this pull request Feb 6, 2026
@apascal07 apascal07 linked an issue Feb 6, 2026 that may be closed by this pull request
@apascal07 apascal07 changed the title feat(go): added DefineSessionFlow feat(go): added DefineCustomAgent and DefinePromptAgent Feb 18, 2026
Comment thread go/genkit/exp/routes.go
Comment thread go/README.md
Add two construction options to localstore.FileSessionStore, mirroring
the JS FileSessionStore (PR #5251):

- WithMaxPersistedChainLength(n): on each save, walk the new snapshot's
  parentId chain and unlink rows past the newest n, capping
  per-conversation disk use. n must be >= 1; 0 or negative is rejected.
- WithSnapshotPathPrefix(fn): derive a per-call subdirectory from
  context for tenant isolation. The prefix may nest via "/" and is
  sanitized against directory escape.

Snapshots remain flat by id within the prefix
(<dir>/<prefix>/<snapshotId>.json), so resume-by-snapshot, heartbeat,
finalize, abort, and status subscription stay O(1) direct opens, while
GetLatestSnapshot scans the prefix directory. With no options configured
the on-disk layout is unchanged.

Options follow the ai/option.go interface pattern (in a new option.go)
and error if set more than once.
…tore

FileSessionStore.OnSnapshotStatusChange previously reflected only status
changes written through the same store instance, so a detached turn running
in one process could not be aborted by another process sharing the
directory.

Add an internal poller that re-reads subscribed snapshot files on an
interval (default 1s, configurable via WithPollInterval; <=0 disables it)
and delivers any change. The instant in-process fast path is kept for
same-process delivery; both paths funnel through a single per-snapshot
dedup gate, so a change is delivered exactly once regardless of which
observes it first. The poller is started lazily on the first subscriber and
stopped when the last unsubscribes, so a store with no subscriptions pays
nothing.

Polling (rather than fsnotify) matches the SnapshotSubscriber contract,
adds no dependency, and is immune to the temp-file+rename inode swap.
…pt/NamedPrompt

Replace the FromInline/FromPrompt agent-source constructors with three
clearer forms:

  - InlinePrompt(opts...)     defines the prompt inline (was FromInline)
  - SameNamedPrompt()         references the prompt named like the agent
  - NamedPrompt(name, input)  references any registered prompt by name,
                              rendered with an input supplied from code

NamedPrompt decouples the prompt's lookup name from the agent's name, so a
single prompt can back many agents with different inputs. The old
FromPrompt(defaultInput...) variadic-of-one is gone; per-turn input now
rides on NamedPrompt.

Updates the wrapper docs, both samples (chef's personality moves to the
.prompt frontmatter default), the README, and the tests, and adds
TestPromptAgent_NamedPromptSharedAcrossAgents covering the shared-prompt
path. Breaking change to the experimental ai/exp API.
Split the prompt-backed agent constructors so each has one clear job:

  - DefineAgent(r, name, prompt, opts...)  defines the prompt inline; the
                                           prompt is an InlinePrompt, a
                                           []ai.PromptOption slice passed
                                           positionally
  - DefinePromptAgent(r, name, opts...)    sources a prompt from the registry,
                                           defaulting to the agent's own name

DefinePromptAgent uses the same-named prompt by default; WithNamedPrompt(name,
input) points it at a different registered prompt rendered with a code-supplied
input, so one prompt can back many agents.

The prompt source is split across a compile-time-validated option set mirroring
ai/option.go: the shared options (WithSessionStore, WithStateTransform,
WithDescription) are AgentOptions valid on every constructor, while
WithNamedPrompt is a PromptAgentOption accepted only by DefinePromptAgent.
Passing it to DefineAgent or DefineCustomAgent fails to compile.

Making the inline prompt a required positional argument means an inline agent
cannot be defined without one. Removes the AgentSource abstraction and the
SameNamedPrompt/NamedPrompt sources.

Updates the wrapper docs, both samples, and the tests. Breaking change to the
experimental ai/exp API.
@apascal07 apascal07 changed the title feat(go/ai/exp): add DefineAgent and DefineCustomAgent feat(go/ai/exp): add DefineAgent, DefinePromptAgent, and DefineCustomAgent Jun 23, 2026
The "Load the Prompt from a File" path now uses DefinePromptAgent (default
same-named lookup) with WithNamedPrompt for shared prompts, and the inline
example uses the InlinePrompt slice literal.
…pans

Each runTurn-N span now records the committed session state at turn end as
its genkit:output, shaped as {state: <session state>}, and for
server-managed agents carries the turn-end snapshot's ID under
genkit:metadata:agent:snapshotId. The session ID stays on the root action
span as before.

The state is raw: StateTransform shapes only client-facing surfaces, not
telemetry or persisted state, so the span output matches the snapshot its
ID points to.

With the turn span output now derived from session state, the per-turn
chunk collection is gone: removed SessionRunner.collectTurnOutput,
chunkRouter.collectTurnChunks and its turnChunks/turnMu fields, and the
accumulation branch in applySideEffects (now artifact-only).
…rror

WithStreamTransform is the stream-side counterpart to WithStateTransform, rewriting each AgentStreamChunk on its way to the client. Both StateTransform and StreamTransform now return (value, error): a nil value omits the state or drops the chunk (wire-only), while a non-nil error (or a panic) fails the read or invocation closed with the transform's status preserved. Updates go/README.md and the genkit.DefineAgent option docs.
The abortSnapshot companion action's response carried only status, so a caller could not correlate the result with the snapshot it aborted. Add snapshotId (matching the abortSnapshot request) and populate it from the request.

Authored in the shared zod schema (genkit-tools/common/src/types/agent.ts) and regenerated across genkit-schema.json, the Go bindings (go/ai/exp/gen.go), and the Python typings (_typing.py); the Go doc and noomitempty come from schemas.config.
The operation aborts a turn by marking its pending snapshot aborted for the
active turn to observe; it does not abort a snapshot. Name it for that
turn-level intent.

- Agent.AbortSnapshot -> Agent.Abort; AbortSnapshotAction -> AbortAction
- HTTP route POST /agents/{name}/abortSnapshot -> /agents/{name}/abort
- Wire types AbortSnapshotRequest/Response -> AgentAbortRequest/Response,
  regenerated into genkit-schema.json, go gen.go, and py _typing.py

The unexported abortPendingSnapshot helper keeps its name: it names the
mechanism (flip the pending snapshot to aborted), distinct from the
turn-level abort.

BREAKING CHANGE: the Agent.Abort* Go API, the /abort HTTP route, and the
AgentAbort{Request,Response} schema types (Go, Python, JS) replace the
former AbortSnapshot* names.
A per-turn handler could not learn its snapshot ID until after the turn (on the TurnEnd chunk), so durable agents had no way to name snapshot-correlated external resources (e.g. a git worktree) before doing the turn's work. Reserve the turn's snapshot ID at turn start (the runtime now mints it rather than the store) and surface it, the parent snapshot ID, and the turn index on a TurnContext. The turn-end snapshot persists under that reserved ID, so the ID the handler reads up front is the ID the snapshot lands under.

The TurnContext rides on the per-turn fn's context.Context (TurnContextFromContext) rather than the SessionRunner.Run callback signature, so existing custom agents compile unchanged. Empty SnapshotID for client-managed agents and for turns that write no snapshot. The detach pending row stays store-minted.
Commit 8756d03 bumped the new-branch generated files (core/gen.go,
ai/exp/gen.go) to a 2026 copyright header, but jsonschemagen still emitted
2025. check-generated-go.sh regenerates and fails on any diff, so the two
hand-edited generated files made the freshness check go red.

Bump the generator's license template (and the jsonschemagen golden) to
2026 and regenerate, so generated output matches the committed files.
ai/gen.go and genkit/gen.go move 2025 -> 2026 as well, keeping every
generated gen.go on a single, consistent year.
Base automatically changed from ap/go-bidi to main June 24, 2026 16:27
@apascal07 apascal07 requested a review from a team as a code owner June 24, 2026 16:27
Brings in main's JS agents middleware (#5252) and agents testapp, plus
the bidi feature merged to main as #4387. That PR was an earlier snapshot
of the bidi work this branch already carries in more refined form, so the
Go bidi surface was resolved in this branch's favor.

Conflict resolutions (all Go):
- core/bidi.go, core/bidi_test.go: kept ours (strict superset of #4387).
  Preserves one-shot input rejection, validate-and-normalize init,
  nil-init handling, and the exported ErrConnectionClosed /
  ErrActionCompleted sentinels. Our test file already contains every
  #4387 test plus the additional coverage for those refinements.
- core/action.go: kept our isNilValue helper (used by validateInit).
- core/api/action.go: kept our "input required" RunBidiJSON doc.
- genkit/servers.go: kept our applyContextProviders helper, which is
  semantically equivalent to main's inline context-provider loop.
- genkit/servers_test.go: kept our superset (all of #4387's handler
  tests plus TestHandlerAgent / TestHandlerAgentRef).
- genkit/reflection_test.go: kept our subtest comment; channel setup
  code is identical on both sides.

This branch's rename of the INTERNAL status value ("INTERNAL_SERVER_ERROR"
to "INTERNAL") is preserved; #4387's stale test expectation was dropped
with its test file.

Verified: go build ./..., go vet ./..., go test -race ./core/ ./genkit/
./ai/exp/..., and a GOMAXPROCS=1 bidi stress run all pass.
@apascal07 apascal07 merged commit b91ad90 into main Jun 24, 2026
26 checks passed
@apascal07 apascal07 deleted the ap/go-session-flow branch June 24, 2026 17:00
apascal07 added a commit that referenced this pull request Jun 24, 2026
Bring in the mainline agent work that landed on main, most notably
#4462 (DefineAgent/DefinePromptAgent/DefineCustomAgent), #4387
(bidirectional streaming actions), and the JS agent/middleware suite.

The go/ai/exp agent implementation on main shares this branch's lineage
(the files diverged only because ai/x -> ai/exp moves make them add/add)
and is a strict superset: it already absorbed this branch's fixes
(parallel tool-response ordering, ValidateResumeAgainstHistory, the
inputCh/turnIndex unexport) and adds heartbeats, TurnContext, and the
GetSnapshot/GetLatestSnapshot/Abort facade methods on top. Every
conflicted implementation file is therefore resolved to main's version;
go/core/x/session (removed on main) and the old source.go (replaced by
inline.go) are dropped.

This branch's unique contribution, the agent conformance harness
(go/ai/exp/agents_conformance_test.go + tests/specs/agent.yaml), is
kept and adapted to main's renamed API:
  - exp.FromInline(...)      -> exp.InlinePrompt{...}
  - agent.StreamBidi(...)    -> agent.Connect(...)
  - store.AbortSnapshot(...) -> agent.Abort(...) (abort moved to the
                                Agent facade)

Conformance suite passes 40/40, green under -race and GOMAXPROCS=1.
apascal07 added a commit that referenced this pull request Jun 24, 2026
PR #4462 squash-landed the agent foundation that this branch also builds, so most conflicts were add/add on identical content. Resolution:

- 23 add/add files differed only by copyright year (2025 vs 2026); took main's 2026 copies.
- go/ai/generate.go: kept the branch's nil-guard on Interrupts() and the new ToolResponses() method (pure additions; main had neither).
- go/genkit/exp/*, README, and basic-agents samples: kept the branch's design (experimental definers in genkit/exp; the banker tool-interrupt/resume demo).
- go/genkit/servers_test.go: dropped TestHandlerAgent/TestHandlerAgentRef; their coverage moved to go/genkit/exp/routes_test.go when DefineAgent left package genkit.

Verified: go build ./..., go vet ./..., and tests for go/genkit, go/genkit/exp, go/ai/exp, go/ai, go/core all pass.
apascal07 added a commit that referenced this pull request Jun 25, 2026
…ddleware

Resolve conflicts from main absorbing the agent foundation (#4462, #4797,
#4387, #5576) via separate squashed PRs:

- Adopt main's renames (AgentAbort{Request,Response}, Agent.Abort/AbortAction,
  abort routes) and its turn-context feature across go/ai/exp + schema.
- Preserve this branch's net-new middleware surface: ArtifactStore, AgentRef,
  WithContextFunc context seeding, and the orchestrator sample.

Per review direction, consolidate the agent constructors into the genkit/exp
(genkitx) surface that main migrated docs/samples/tests to:

- Move the genkit-instance seeding into genkitx.DefineAgent/DefinePromptAgent/
  DefineCustomAgent via a new genkitbridge.SeedContext hook, so middleware
  resolves sub-agents through the documented exp constructors.
- Remove the now-dead genkit.DefineAgent/DefinePromptAgent/DefineCustomAgent/
  ListAgents duplicates from package genkit.
- Reconcile the basic-agents sample to keep both the banker (interrupt/resume)
  and orchestrator (middleware delegation) demos on genkitx.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

docs Improvements or additions to documentation go js python Python tooling

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RFC: Session flows

4 participants