Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/opencode/src/project/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { Command } from "../command"
import { Instance } from "./instance"
import { Log } from "@/util/log"
import { ShareNext } from "@/share/share-next"
import { Session } from "../session"

export async function InstanceBootstrap() {
Log.Default.info("bootstrapping", { directory: Instance.directory })
Expand All @@ -22,6 +23,12 @@ export async function InstanceBootstrap() {
FileWatcher.init()
Vcs.init()
Snapshot.init()
await Session.recoverInterrupted().catch((error) => {
Log.Default.error("session recovery failed", {
error,
directory: Instance.directory,
})
})

Bus.subscribe(Command.Event.Executed, async (payload) => {
if (payload.properties.name === Command.Default.INIT) {
Expand Down
82 changes: 82 additions & 0 deletions packages/opencode/src/session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,88 @@ export namespace Session {
},
)

export async function interruptAssistant(input: {
msg: MessageV2.Assistant
error: string
time?: number
}) {
const now = input.time ?? Date.now()
const done = !input.msg.time.completed
const list = (await MessageV2.parts(input.msg.id)).filter(
(part): part is MessageV2.ToolPart =>
part.type === "tool" && (part.state.status === "pending" || part.state.status === "running"),
)

await Promise.all(
list.map((part) =>
updatePart({
...part,
state: {
status: "error",
input: part.state.input,
error: input.error,
...(part.state.status === "running" && part.state.metadata ? { metadata: part.state.metadata } : {}),
time: {
start: part.state.status === "running" ? part.state.time.start : now,
end: now,
},
},
}),
),
)

if (done) {
input.msg.time.completed = now
}

if (list.length > 0 || done) {
await updateMessage(input.msg)
}

return list.length > 0 || done
}

export async function recoverInterrupted() {
const rows = Database.use((db) =>
db
.select({
id: MessageTable.id,
sessionID: MessageTable.session_id,
data: MessageTable.data,
})
.from(MessageTable)
.innerJoin(SessionTable, eq(SessionTable.id, MessageTable.session_id))
.where(eq(SessionTable.project_id, Instance.project.id))
.all(),
)

const list = rows.flatMap((row) => {
if (row.data.role !== "assistant") return []
const msg = row.data as MessageV2.Assistant
const completed = "completed" in msg.time ? msg.time.completed : undefined
if (completed != null) return []
return [{ ...msg, id: row.id, sessionID: row.sessionID } as MessageV2.Assistant]
})

const count = (await Promise.all(
list.map((msg) =>
interruptAssistant({
msg,
error: "Tool execution was interrupted (server restart)",
}),
),
)).filter(Boolean).length

if (count) {
log.info("recovered interrupted assistant messages", {
count,
projectID: Instance.project.id,
})
}

return count
}

export const getUsage = fn(
z.object({
model: z.custom<Provider.Model>(),
Expand Down
23 changes: 4 additions & 19 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -399,25 +399,10 @@ export namespace SessionProcessor {
}
snapshot = undefined
}
const p = await MessageV2.parts(input.assistantMessage.id)
for (const part of p) {
if (part.type === "tool" && part.state.status !== "completed" && part.state.status !== "error") {
await Session.updatePart({
...part,
state: {
...part.state,
status: "error",
error: "Tool execution aborted",
time: {
start: Date.now(),
end: Date.now(),
},
},
})
}
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
await Session.interruptAssistant({
msg: input.assistantMessage,
error: "Tool execution aborted",
})
if (needsCompaction) return "compact"
if (blocked) return "stop"
if (input.assistantMessage.error) return "stop"
Expand Down
Loading