fix(cli-run): rely on continuation markers for completion

Use hook-written continuation marker state to gate run completion checks and remove the noisy event-stream shutdown timeout log in run mode.
This commit is contained in:
YeonGyu-Kim 2026-02-17 17:50:47 +09:00
parent 706ee61333
commit d0bd24bede
19 changed files with 401 additions and 74 deletions

View File

@ -1,11 +1,22 @@
import pc from "picocolors"
import type { RunContext, Todo, ChildSession, SessionStatus } from "./types"
import { normalizeSDKResponse } from "../../shared"
import { getContinuationState } from "./continuation-state"
import {
getContinuationState,
type ContinuationState,
} from "./continuation-state"
export async function checkCompletionConditions(ctx: RunContext): Promise<boolean> {
try {
if (!await areAllTodosComplete(ctx)) {
const continuationState = getContinuationState(ctx.directory, ctx.sessionID)
if (continuationState.hasActiveHookMarker) {
const reason = continuationState.activeHookMarkerReason ?? "continuation hook is active"
console.log(pc.dim(` Waiting: ${reason}`))
return false
}
if (!continuationState.hasTodoHookMarker && !await areAllTodosComplete(ctx)) {
return false
}
@ -13,7 +24,7 @@ export async function checkCompletionConditions(ctx: RunContext): Promise<boolea
return false
}
if (!areContinuationHooksIdle(ctx)) {
if (!areContinuationHooksIdle(continuationState)) {
return false
}
@ -24,9 +35,7 @@ export async function checkCompletionConditions(ctx: RunContext): Promise<boolea
}
}
function areContinuationHooksIdle(ctx: RunContext): boolean {
const continuationState = getContinuationState(ctx.directory, ctx.sessionID)
function areContinuationHooksIdle(continuationState: ContinuationState): boolean {
if (continuationState.hasActiveBoulder) {
console.log(pc.dim(" Waiting: boulder continuation is active"))
return false

View File

@ -0,0 +1,54 @@
import { afterEach, describe, expect, it } from "bun:test"
import { mkdtempSync, rmSync } from "node:fs"
import { join } from "node:path"
import { tmpdir } from "node:os"
import { setContinuationMarkerSource } from "../../features/run-continuation-state"
import { getContinuationState } from "./continuation-state"
const tempDirs: string[] = []
function createTempDir(): string {
const directory = mkdtempSync(join(tmpdir(), "omo-run-cont-state-"))
tempDirs.push(directory)
return directory
}
afterEach(() => {
while (tempDirs.length > 0) {
const directory = tempDirs.pop()
if (directory) {
rmSync(directory, { recursive: true, force: true })
}
}
})
describe("getContinuationState marker integration", () => {
it("reports active marker state from continuation hooks", () => {
// given
const directory = createTempDir()
const sessionID = "ses_marker_active"
setContinuationMarkerSource(directory, sessionID, "todo", "active", "todos remaining")
// when
const state = getContinuationState(directory, sessionID)
// then
expect(state.hasActiveHookMarker).toBe(true)
expect(state.activeHookMarkerReason).toContain("todos")
})
it("does not report active marker when all sources are idle/stopped", () => {
// given
const directory = createTempDir()
const sessionID = "ses_marker_idle"
setContinuationMarkerSource(directory, sessionID, "todo", "idle")
setContinuationMarkerSource(directory, sessionID, "stop", "stopped")
// when
const state = getContinuationState(directory, sessionID)
// then
expect(state.hasActiveHookMarker).toBe(false)
expect(state.activeHookMarkerReason).toBeNull()
})
})

View File

@ -1,15 +1,30 @@
import { getPlanProgress, readBoulderState } from "../../features/boulder-state"
import {
getActiveContinuationMarkerReason,
isContinuationMarkerActive,
readContinuationMarker,
} from "../../features/run-continuation-state"
import { readState as readRalphLoopState } from "../../hooks/ralph-loop/storage"
export interface ContinuationState {
hasActiveBoulder: boolean
hasActiveRalphLoop: boolean
hasHookMarker: boolean
hasTodoHookMarker: boolean
hasActiveHookMarker: boolean
activeHookMarkerReason: string | null
}
export function getContinuationState(directory: string, sessionID: string): ContinuationState {
const marker = readContinuationMarker(directory, sessionID)
return {
hasActiveBoulder: hasActiveBoulderContinuation(directory, sessionID),
hasActiveRalphLoop: hasActiveRalphLoopContinuation(directory, sessionID),
hasHookMarker: marker !== null,
hasTodoHookMarker: marker?.sources.todo !== undefined,
hasActiveHookMarker: isContinuationMarkerActive(marker),
activeHookMarkerReason: getActiveContinuationMarkerReason(marker),
}
}

View File

@ -0,0 +1,7 @@
const isCI = Boolean(process.env.CI || process.env.GITHUB_ACTIONS)
export const displayChars = {
treeEnd: isCI ? "`-" : "└─",
treeIndent: " ",
treeJoin: isCI ? " " : " ",
} as const

View File

@ -13,6 +13,8 @@ import type {
} from "./types"
import type { EventState } from "./event-state"
import { serializeError } from "./event-formatting"
import { formatToolInputPreview } from "./tool-input-preview"
import { displayChars } from "./display-chars"
function getSessionId(props?: { sessionID?: string; sessionId?: string }): string | undefined {
return props?.sessionID ?? props?.sessionId
@ -74,6 +76,9 @@ export function handleMessagePartUpdated(ctx: RunContext, payload: EventPayload,
const infoSid = getInfoSessionId(props)
if ((partSid ?? infoSid) !== ctx.sessionID) return
const role = props?.info?.role ?? state.currentMessageRole
if (role === "user") return
const part = props?.part
if (!part) return
@ -101,19 +106,9 @@ function handleToolPart(
if (status === "running") {
state.currentTool = toolName
let inputPreview = ""
const input = part.state?.input
if (input) {
if (input.command) {
inputPreview = ` ${pc.dim(String(input.command).slice(0, 60))}`
} else if (input.pattern) {
inputPreview = ` ${pc.dim(String(input.pattern).slice(0, 40))}`
} else if (input.filePath) {
inputPreview = ` ${pc.dim(String(input.filePath))}`
} else if (input.query) {
inputPreview = ` ${pc.dim(String(input.query).slice(0, 40))}`
}
}
const inputPreview = part.state?.input
? formatToolInputPreview(part.state.input)
: ""
state.hasReceivedMeaningfulWork = true
process.stdout.write(`\n${pc.cyan(">")} ${pc.bold(toolName)}${inputPreview}\n`)
}
@ -124,7 +119,7 @@ function handleToolPart(
const preview = output.length > maxLen ? output.slice(0, maxLen) + "..." : output
if (preview.trim()) {
const lines = preview.split("\n").slice(0, 3)
process.stdout.write(pc.dim(` └─ ${lines.join("\n ")}\n`))
process.stdout.write(pc.dim(`${displayChars.treeIndent}${displayChars.treeEnd} ${lines.join(`\n${displayChars.treeJoin}`)}\n`))
}
state.currentTool = null
state.lastPartText = ""
@ -136,6 +131,8 @@ export function handleMessageUpdated(ctx: RunContext, payload: EventPayload, sta
const props = payload.properties as MessageUpdatedProps | undefined
if (getInfoSessionId(props) !== ctx.sessionID) return
state.currentMessageRole = props?.info?.role ?? null
if (props?.info?.role !== "assistant") return
state.hasReceivedMeaningfulWork = true
@ -167,20 +164,9 @@ export function handleToolExecute(ctx: RunContext, payload: EventPayload, state:
const toolName = props?.name || "unknown"
state.currentTool = toolName
let inputPreview = ""
if (props?.input) {
const input = props.input
if (input.command) {
inputPreview = ` ${pc.dim(String(input.command).slice(0, 60))}`
} else if (input.pattern) {
inputPreview = ` ${pc.dim(String(input.pattern).slice(0, 40))}`
} else if (input.filePath) {
inputPreview = ` ${pc.dim(String(input.filePath))}`
} else if (input.query) {
inputPreview = ` ${pc.dim(String(input.query).slice(0, 40))}`
}
}
const inputPreview = props?.input
? formatToolInputPreview(props.input)
: ""
state.hasReceivedMeaningfulWork = true
process.stdout.write(`\n${pc.cyan(">")} ${pc.bold(toolName)}${inputPreview}\n`)
@ -198,7 +184,7 @@ export function handleToolResult(ctx: RunContext, payload: EventPayload, state:
if (preview.trim()) {
const lines = preview.split("\n").slice(0, 3)
process.stdout.write(pc.dim(` └─ ${lines.join("\n ")}\n`))
process.stdout.write(pc.dim(`${displayChars.treeIndent}${displayChars.treeEnd} ${lines.join(`\n${displayChars.treeJoin}`)}\n`))
}
state.currentTool = null

View File

@ -13,6 +13,8 @@ export interface EventState {
currentAgent: string | null
/** Current model ID from the latest assistant message */
currentModel: string | null
/** Current message role (user/assistant) — used to filter user messages from display */
currentMessageRole: string | null
}
export function createEventState(): EventState {
@ -27,5 +29,6 @@ export function createEventState(): EventState {
messageCount: 0,
currentAgent: null,
currentModel: null,
currentMessageRole: null,
}
}

View File

@ -94,6 +94,7 @@ describe("pollForCompletion", () => {
const result = await pollForCompletion(ctx, eventState, abortController, {
pollIntervalMs: 10,
requiredConsecutive: 3,
minStabilizationMs: 500,
})
//#then - should be aborted, not completed (tool blocked exit)
@ -159,6 +160,7 @@ describe("pollForCompletion", () => {
const result = await pollForCompletion(ctx, eventState, abortController, {
pollIntervalMs: 10,
requiredConsecutive: 3,
minStabilizationMs: 500,
})
//#then

View File

@ -5,9 +5,9 @@ import { checkCompletionConditions } from "./completion"
import { normalizeSDKResponse } from "../../shared"
const DEFAULT_POLL_INTERVAL_MS = 500
const DEFAULT_REQUIRED_CONSECUTIVE = 3
const DEFAULT_REQUIRED_CONSECUTIVE = 1
const ERROR_GRACE_CYCLES = 3
const MIN_STABILIZATION_MS = 10_000
const MIN_STABILIZATION_MS = 0
export interface PollOptions {
pollIntervalMs?: number
@ -75,6 +75,11 @@ export async function pollForCompletion(
}
if (!eventState.hasReceivedMeaningfulWork) {
if (minStabilizationMs <= 0) {
consecutiveCompleteChecks = 0
continue
}
if (Date.now() - pollStartTimestamp < minStabilizationMs) {
consecutiveCompleteChecks = 0
continue

View File

@ -1,6 +1,6 @@
/// <reference types="bun-types" />
import { describe, it, expect, spyOn, afterEach } from "bun:test"
import { describe, it, expect } from "bun:test"
import type { OhMyOpenCodeConfig } from "../../config"
import { resolveRunAgent, waitForEventProcessorShutdown } from "./runner"
@ -83,14 +83,6 @@ describe("resolveRunAgent", () => {
})
describe("waitForEventProcessorShutdown", () => {
let consoleLogSpy: ReturnType<typeof spyOn<typeof console, "log">> | null = null
afterEach(() => {
if (consoleLogSpy) {
consoleLogSpy.mockRestore()
consoleLogSpy = null
}
})
it("returns quickly when event processor completes", async () => {
//#given
@ -99,7 +91,6 @@ describe("waitForEventProcessorShutdown", () => {
resolve()
}, 25)
})
consoleLogSpy = spyOn(console, "log").mockImplementation(() => {})
const start = performance.now()
//#when
@ -108,29 +99,19 @@ describe("waitForEventProcessorShutdown", () => {
//#then
const elapsed = performance.now() - start
expect(elapsed).toBeLessThan(200)
expect(console.log).not.toHaveBeenCalledWith(
"[run] Event stream did not close within 200ms after abort; continuing shutdown.",
)
})
it("times out and continues when event processor does not complete", async () => {
//#given
const eventProcessor = new Promise<void>(() => {})
const spy = spyOn(console, "log").mockImplementation(() => {})
consoleLogSpy = spy
const timeoutMs = 200
const start = performance.now()
try {
//#when
await waitForEventProcessorShutdown(eventProcessor, timeoutMs)
//#when
await waitForEventProcessorShutdown(eventProcessor, timeoutMs)
//#then
const elapsed = performance.now() - start
expect(elapsed).toBeGreaterThanOrEqual(timeoutMs - 10)
expect(spy.mock.calls.length).toBeGreaterThanOrEqual(1)
} finally {
spy.mockRestore()
}
//#then
const elapsed = performance.now() - start
expect(elapsed).toBeGreaterThanOrEqual(timeoutMs - 10)
})
})

View File

@ -22,13 +22,7 @@ export async function waitForEventProcessorShutdown(
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), timeoutMs)),
])
if (!completed) {
console.log(
pc.dim(
`[run] Event stream did not close within ${timeoutMs}ms after abort; continuing shutdown.`,
),
)
}
void completed
}
export async function run(options: RunOptions): Promise<number> {

View File

@ -0,0 +1,38 @@
import pc from "picocolors"
const SINGLE_VALUE_FIELDS = ["command", "filePath"] as const
const MULTI_VALUE_FIELDS = [
"description",
"pattern",
"query",
"url",
"category",
"subagent_type",
"lang",
"run_in_background",
] as const
export function formatToolInputPreview(input: Record<string, unknown>): string {
for (const key of SINGLE_VALUE_FIELDS) {
if (!input[key]) continue
const maxLen = key === "command" ? 80 : 120
return ` ${pc.dim(String(input[key]).slice(0, maxLen))}`
}
const parts: string[] = []
let totalLen = 0
for (const key of MULTI_VALUE_FIELDS) {
const val = input[key]
if (val === undefined || val === null) continue
const str = String(val)
const truncated = str.length > 50 ? str.slice(0, 47) + "..." : str
const entry = `${key}=${truncated}`
if (totalLen + entry.length > 120) break
parts.push(entry)
totalLen += entry.length + 1
}
return parts.length > 0 ? ` ${pc.dim(parts.join(" "))}` : ""
}

View File

@ -0,0 +1 @@
export const CONTINUATION_MARKER_DIR = ".sisyphus/run-continuation"

View File

@ -0,0 +1,3 @@
export * from "./types"
export * from "./constants"
export * from "./storage"

View File

@ -0,0 +1,91 @@
import { afterEach, describe, expect, it } from "bun:test"
import { mkdtempSync, rmSync } from "node:fs"
import { join } from "node:path"
import { tmpdir } from "node:os"
import {
clearContinuationMarker,
isContinuationMarkerActive,
readContinuationMarker,
setContinuationMarkerSource,
} from "./storage"
const tempDirs: string[] = []
function createTempDir(): string {
const directory = mkdtempSync(join(tmpdir(), "omo-run-marker-"))
tempDirs.push(directory)
return directory
}
afterEach(() => {
while (tempDirs.length > 0) {
const directory = tempDirs.pop()
if (directory) {
rmSync(directory, { recursive: true, force: true })
}
}
})
describe("run-continuation-state storage", () => {
it("stores and reads per-source marker state", () => {
// given
const directory = createTempDir()
const sessionID = "ses_test"
// when
setContinuationMarkerSource(directory, sessionID, "todo", "active", "2 todos remaining")
setContinuationMarkerSource(directory, sessionID, "stop", "stopped", "user requested stop")
const marker = readContinuationMarker(directory, sessionID)
// then
expect(marker).not.toBeNull()
expect(marker?.sessionID).toBe(sessionID)
expect(marker?.sources.todo?.state).toBe("active")
expect(marker?.sources.todo?.reason).toBe("2 todos remaining")
expect(marker?.sources.stop?.state).toBe("stopped")
})
it("treats marker as active when any source is active", () => {
// given
const directory = createTempDir()
const sessionID = "ses_active"
setContinuationMarkerSource(directory, sessionID, "todo", "active", "pending")
setContinuationMarkerSource(directory, sessionID, "stop", "idle")
const marker = readContinuationMarker(directory, sessionID)
// when
const isActive = isContinuationMarkerActive(marker)
// then
expect(isActive).toBe(true)
})
it("returns inactive when no source is active", () => {
// given
const directory = createTempDir()
const sessionID = "ses_idle"
setContinuationMarkerSource(directory, sessionID, "todo", "idle")
setContinuationMarkerSource(directory, sessionID, "stop", "stopped")
const marker = readContinuationMarker(directory, sessionID)
// when
const isActive = isContinuationMarkerActive(marker)
// then
expect(isActive).toBe(false)
})
it("clears marker for a session", () => {
// given
const directory = createTempDir()
const sessionID = "ses_clear"
setContinuationMarkerSource(directory, sessionID, "todo", "active")
// when
clearContinuationMarker(directory, sessionID)
const marker = readContinuationMarker(directory, sessionID)
// then
expect(marker).toBeNull()
})
})

View File

@ -0,0 +1,80 @@
import { existsSync, mkdirSync, readFileSync, rmSync, writeFileSync } from "node:fs"
import { join } from "node:path"
import { CONTINUATION_MARKER_DIR } from "./constants"
import type {
ContinuationMarker,
ContinuationMarkerSource,
ContinuationMarkerState,
} from "./types"
function getMarkerPath(directory: string, sessionID: string): string {
return join(directory, CONTINUATION_MARKER_DIR, `${sessionID}.json`)
}
export function readContinuationMarker(
directory: string,
sessionID: string,
): ContinuationMarker | null {
const markerPath = getMarkerPath(directory, sessionID)
if (!existsSync(markerPath)) return null
try {
const raw = readFileSync(markerPath, "utf-8")
const parsed = JSON.parse(raw)
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null
return parsed as ContinuationMarker
} catch {
return null
}
}
export function setContinuationMarkerSource(
directory: string,
sessionID: string,
source: ContinuationMarkerSource,
state: ContinuationMarkerState,
reason?: string,
): ContinuationMarker {
const now = new Date().toISOString()
const existing = readContinuationMarker(directory, sessionID)
const next: ContinuationMarker = {
sessionID,
updatedAt: now,
sources: {
...(existing?.sources ?? {}),
[source]: {
state,
...(reason ? { reason } : {}),
updatedAt: now,
},
},
}
const markerPath = getMarkerPath(directory, sessionID)
mkdirSync(join(directory, CONTINUATION_MARKER_DIR), { recursive: true })
writeFileSync(markerPath, JSON.stringify(next, null, 2), "utf-8")
return next
}
export function clearContinuationMarker(directory: string, sessionID: string): void {
const markerPath = getMarkerPath(directory, sessionID)
if (!existsSync(markerPath)) return
try {
rmSync(markerPath)
} catch {
}
}
export function isContinuationMarkerActive(marker: ContinuationMarker | null): boolean {
if (!marker) return false
return Object.values(marker.sources).some((entry) => entry?.state === "active")
}
export function getActiveContinuationMarkerReason(marker: ContinuationMarker | null): string | null {
if (!marker) return null
const active = Object.entries(marker.sources).find(([, entry]) => entry?.state === "active")
if (!active || !active[1]) return null
const [source, entry] = active
return entry.reason ?? `${source} continuation is active`
}

View File

@ -0,0 +1,15 @@
export type ContinuationMarkerSource = "todo" | "stop"
export type ContinuationMarkerState = "idle" | "active" | "stopped"
export interface ContinuationMarkerSourceEntry {
state: ContinuationMarkerState
reason?: string
updatedAt: string
}
export interface ContinuationMarker {
sessionID: string
updatedAt: string
sources: Partial<Record<ContinuationMarkerSource, ContinuationMarkerSourceEntry>>
}

View File

@ -1,5 +1,9 @@
import type { PluginInput } from "@opencode-ai/plugin"
import {
clearContinuationMarker,
setContinuationMarkerSource,
} from "../../features/run-continuation-state"
import { log } from "../../shared/logger"
const HOOK_NAME = "stop-continuation-guard"
@ -13,12 +17,13 @@ export interface StopContinuationGuard {
}
export function createStopContinuationGuardHook(
_ctx: PluginInput
ctx: PluginInput
): StopContinuationGuard {
const stoppedSessions = new Set<string>()
const stop = (sessionID: string): void => {
stoppedSessions.add(sessionID)
setContinuationMarkerSource(ctx.directory, sessionID, "stop", "stopped", "continuation stopped")
log(`[${HOOK_NAME}] Continuation stopped for session`, { sessionID })
}
@ -28,6 +33,7 @@ export function createStopContinuationGuardHook(
const clear = (sessionID: string): void => {
stoppedSessions.delete(sessionID)
setContinuationMarkerSource(ctx.directory, sessionID, "stop", "idle")
log(`[${HOOK_NAME}] Continuation guard cleared for session`, { sessionID })
}
@ -42,6 +48,7 @@ export function createStopContinuationGuardHook(
const sessionInfo = props?.info as { id?: string } | undefined
if (sessionInfo?.id) {
clear(sessionInfo.id)
clearContinuationMarker(ctx.directory, sessionInfo.id)
log(`[${HOOK_NAME}] Session deleted: cleaned up`, { sessionID: sessionInfo.id })
}
}

View File

@ -1,7 +1,28 @@
import { describe, expect, test } from "bun:test"
import { afterEach, describe, expect, test } from "bun:test"
import { mkdtempSync, rmSync } from "node:fs"
import { join } from "node:path"
import { tmpdir } from "node:os"
import { readContinuationMarker } from "../../features/run-continuation-state"
import { createStopContinuationGuardHook } from "./index"
describe("stop-continuation-guard", () => {
const tempDirs: string[] = []
function createTempDir(): string {
const directory = mkdtempSync(join(tmpdir(), "omo-stop-guard-"))
tempDirs.push(directory)
return directory
}
afterEach(() => {
while (tempDirs.length > 0) {
const directory = tempDirs.pop()
if (directory) {
rmSync(directory, { recursive: true, force: true })
}
}
})
function createMockPluginInput() {
return {
client: {
@ -9,13 +30,14 @@ describe("stop-continuation-guard", () => {
showToast: async () => ({}),
},
},
directory: "/tmp/test",
} as never
directory: createTempDir(),
} as any
}
test("should mark session as stopped", () => {
// given - a guard hook with no stopped sessions
const guard = createStopContinuationGuardHook(createMockPluginInput())
const input = createMockPluginInput()
const guard = createStopContinuationGuardHook(input)
const sessionID = "test-session-1"
// when - we stop continuation for the session
@ -23,6 +45,9 @@ describe("stop-continuation-guard", () => {
// then - session should be marked as stopped
expect(guard.isStopped(sessionID)).toBe(true)
const marker = readContinuationMarker(input.directory, sessionID)
expect(marker?.sources.stop?.state).toBe("stopped")
})
test("should return false for non-stopped sessions", () => {

View File

@ -1,6 +1,9 @@
import type { PluginInput } from "@opencode-ai/plugin"
import type { BackgroundManager } from "../../features/background-agent"
import {
clearContinuationMarker,
} from "../../features/run-continuation-state"
import { log } from "../../shared/logger"
import { DEFAULT_SKIP_AGENTS, HOOK_NAME } from "./constants"
@ -45,6 +48,7 @@ export function createTodoContinuationHandler(args: {
if (event.type === "session.idle") {
const sessionID = props?.sessionID as string | undefined
if (!sessionID) return
await handleSessionIdle({
ctx,
sessionID,
@ -56,6 +60,13 @@ export function createTodoContinuationHandler(args: {
return
}
if (event.type === "session.deleted") {
const sessionInfo = props?.info as { id?: string } | undefined
if (sessionInfo?.id) {
clearContinuationMarker(ctx.directory, sessionInfo.id)
}
}
handleNonIdleEvent({
eventType: event.type,
properties: props,