fix: harden run completion checks and graceful timeout
This commit is contained in:
parent
5f5b476f12
commit
47a8c3e4a9
@ -143,6 +143,25 @@ describe("checkCompletionConditions", () => {
|
|||||||
expect(result).toBe(false)
|
expect(result).toBe(false)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("returns false when child status is missing", async () => {
|
||||||
|
// given
|
||||||
|
spyOn(console, "log").mockImplementation(() => {})
|
||||||
|
const ctx = createMockContext({
|
||||||
|
childrenBySession: {
|
||||||
|
"test-session": [{ id: "child-1" }],
|
||||||
|
"child-1": [],
|
||||||
|
},
|
||||||
|
statuses: {},
|
||||||
|
})
|
||||||
|
const { checkCompletionConditions } = await import("./completion")
|
||||||
|
|
||||||
|
// when
|
||||||
|
const result = await checkCompletionConditions(ctx)
|
||||||
|
|
||||||
|
// then
|
||||||
|
expect(result).toBe(false)
|
||||||
|
})
|
||||||
|
|
||||||
it("returns true when all descendants idle (recursive)", async () => {
|
it("returns true when all descendants idle (recursive)", async () => {
|
||||||
// given
|
// given
|
||||||
spyOn(console, "log").mockImplementation(() => {})
|
spyOn(console, "log").mockImplementation(() => {})
|
||||||
|
|||||||
@ -65,7 +65,14 @@ async function areAllDescendantsIdle(
|
|||||||
|
|
||||||
for (const child of children) {
|
for (const child of children) {
|
||||||
const status = allStatuses[child.id]
|
const status = allStatuses[child.id]
|
||||||
if (status && status.type !== "idle") {
|
if (!status) {
|
||||||
|
console.log(
|
||||||
|
pc.dim(` Waiting: session ${child.id.slice(0, 8)}... status unknown`)
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status.type !== "idle") {
|
||||||
console.log(
|
console.log(
|
||||||
pc.dim(` Waiting: session ${child.id.slice(0, 8)}... is ${status.type}`)
|
pc.dim(` Waiting: session ${child.id.slice(0, 8)}... is ${status.type}`)
|
||||||
)
|
)
|
||||||
|
|||||||
@ -310,7 +310,7 @@ describe("pollForCompletion", () => {
|
|||||||
//#then - returns 1 (not 130/timeout), error message printed
|
//#then - returns 1 (not 130/timeout), error message printed
|
||||||
expect(result).toBe(1)
|
expect(result).toBe(1)
|
||||||
const errorCalls = (console.error as ReturnType<typeof mock>).mock.calls
|
const errorCalls = (console.error as ReturnType<typeof mock>).mock.calls
|
||||||
expect(errorCalls.some((call) => call[0]?.includes("Session ended with error"))).toBe(true)
|
expect(errorCalls.some((call: unknown[]) => String(call[0] ?? "").includes("Session ended with error"))).toBe(true)
|
||||||
})
|
})
|
||||||
|
|
||||||
it("returns 1 when session errors while tool is active (error not masked by tool gate)", async () => {
|
it("returns 1 when session errors while tool is active (error not masked by tool gate)", async () => {
|
||||||
@ -335,4 +335,69 @@ describe("pollForCompletion", () => {
|
|||||||
//#then - returns 1
|
//#then - returns 1
|
||||||
expect(result).toBe(1)
|
expect(result).toBe(1)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("returns 130 after graceful timeout window expires", async () => {
|
||||||
|
//#given
|
||||||
|
spyOn(console, "log").mockImplementation(() => {})
|
||||||
|
spyOn(console, "error").mockImplementation(() => {})
|
||||||
|
const ctx = createMockContext({
|
||||||
|
statuses: {
|
||||||
|
"test-session": { type: "busy" },
|
||||||
|
},
|
||||||
|
})
|
||||||
|
const eventState = createEventState()
|
||||||
|
eventState.mainSessionIdle = false
|
||||||
|
eventState.hasReceivedMeaningfulWork = true
|
||||||
|
const abortController = new AbortController()
|
||||||
|
|
||||||
|
//#when
|
||||||
|
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||||
|
pollIntervalMs: 10,
|
||||||
|
requiredConsecutive: 1,
|
||||||
|
minStabilizationMs: 0,
|
||||||
|
timeoutMs: 30,
|
||||||
|
timeoutGraceMs: 40,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(result).toBe(130)
|
||||||
|
expect(abortController.signal.aborted).toBe(true)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("allows completion during graceful timeout window", async () => {
|
||||||
|
//#given
|
||||||
|
spyOn(console, "log").mockImplementation(() => {})
|
||||||
|
spyOn(console, "error").mockImplementation(() => {})
|
||||||
|
const ctx = createMockContext()
|
||||||
|
const eventState = createEventState()
|
||||||
|
eventState.mainSessionIdle = true
|
||||||
|
eventState.hasReceivedMeaningfulWork = true
|
||||||
|
const abortController = new AbortController()
|
||||||
|
let todoCalls = 0
|
||||||
|
|
||||||
|
;(ctx.client.session as unknown as {
|
||||||
|
todo: ReturnType<typeof mock>
|
||||||
|
children: ReturnType<typeof mock>
|
||||||
|
status: ReturnType<typeof mock>
|
||||||
|
}).todo = mock(async () => {
|
||||||
|
todoCalls++
|
||||||
|
if (todoCalls === 1) {
|
||||||
|
return { data: [{ id: "1", content: "wip", status: "in_progress", priority: "high" }] }
|
||||||
|
}
|
||||||
|
return { data: [] }
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||||
|
pollIntervalMs: 10,
|
||||||
|
requiredConsecutive: 1,
|
||||||
|
minStabilizationMs: 0,
|
||||||
|
timeoutMs: 20,
|
||||||
|
timeoutGraceMs: 80,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(result).toBe(0)
|
||||||
|
expect(abortController.signal.aborted).toBe(false)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@ -8,11 +8,14 @@ const DEFAULT_POLL_INTERVAL_MS = 500
|
|||||||
const DEFAULT_REQUIRED_CONSECUTIVE = 3
|
const DEFAULT_REQUIRED_CONSECUTIVE = 3
|
||||||
const ERROR_GRACE_CYCLES = 3
|
const ERROR_GRACE_CYCLES = 3
|
||||||
const MIN_STABILIZATION_MS = 10_000
|
const MIN_STABILIZATION_MS = 10_000
|
||||||
|
const DEFAULT_TIMEOUT_GRACE_MS = 15_000
|
||||||
|
|
||||||
export interface PollOptions {
|
export interface PollOptions {
|
||||||
pollIntervalMs?: number
|
pollIntervalMs?: number
|
||||||
requiredConsecutive?: number
|
requiredConsecutive?: number
|
||||||
minStabilizationMs?: number
|
minStabilizationMs?: number
|
||||||
|
timeoutMs?: number
|
||||||
|
timeoutGraceMs?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function pollForCompletion(
|
export async function pollForCompletion(
|
||||||
@ -26,14 +29,35 @@ export async function pollForCompletion(
|
|||||||
options.requiredConsecutive ?? DEFAULT_REQUIRED_CONSECUTIVE
|
options.requiredConsecutive ?? DEFAULT_REQUIRED_CONSECUTIVE
|
||||||
const minStabilizationMs =
|
const minStabilizationMs =
|
||||||
options.minStabilizationMs ?? MIN_STABILIZATION_MS
|
options.minStabilizationMs ?? MIN_STABILIZATION_MS
|
||||||
|
const timeoutMs = options.timeoutMs ?? 0
|
||||||
|
const timeoutGraceMs = options.timeoutGraceMs ?? DEFAULT_TIMEOUT_GRACE_MS
|
||||||
let consecutiveCompleteChecks = 0
|
let consecutiveCompleteChecks = 0
|
||||||
let errorCycleCount = 0
|
let errorCycleCount = 0
|
||||||
let firstWorkTimestamp: number | null = null
|
let firstWorkTimestamp: number | null = null
|
||||||
const pollStartTimestamp = Date.now()
|
const pollStartTimestamp = Date.now()
|
||||||
|
let timeoutNoticePrinted = false
|
||||||
|
|
||||||
while (!abortController.signal.aborted) {
|
while (!abortController.signal.aborted) {
|
||||||
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
|
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
|
||||||
|
|
||||||
|
if (abortController.signal.aborted) {
|
||||||
|
return 130
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeoutMs > 0) {
|
||||||
|
const elapsedMs = Date.now() - pollStartTimestamp
|
||||||
|
if (elapsedMs >= timeoutMs && !timeoutNoticePrinted) {
|
||||||
|
console.log(pc.yellow("\nTimeout reached. Entering graceful shutdown window..."))
|
||||||
|
timeoutNoticePrinted = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if (elapsedMs >= timeoutMs + timeoutGraceMs) {
|
||||||
|
console.log(pc.yellow("Grace period expired. Aborting..."))
|
||||||
|
abortController.abort()
|
||||||
|
return 130
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ERROR CHECK FIRST — errors must not be masked by other gates
|
// ERROR CHECK FIRST — errors must not be masked by other gates
|
||||||
if (eventState.mainSessionError) {
|
if (eventState.mainSessionError) {
|
||||||
errorCycleCount++
|
errorCycleCount++
|
||||||
@ -91,6 +115,10 @@ export async function pollForCompletion(
|
|||||||
|
|
||||||
const shouldExit = await checkCompletionConditions(ctx)
|
const shouldExit = await checkCompletionConditions(ctx)
|
||||||
if (shouldExit) {
|
if (shouldExit) {
|
||||||
|
if (abortController.signal.aborted) {
|
||||||
|
return 130
|
||||||
|
}
|
||||||
|
|
||||||
consecutiveCompleteChecks++
|
consecutiveCompleteChecks++
|
||||||
if (consecutiveCompleteChecks >= requiredConsecutive) {
|
if (consecutiveCompleteChecks >= requiredConsecutive) {
|
||||||
console.log(pc.green("\n\nAll tasks completed."))
|
console.log(pc.green("\n\nAll tasks completed."))
|
||||||
|
|||||||
@ -48,14 +48,6 @@ export async function run(options: RunOptions): Promise<number> {
|
|||||||
const pluginConfig = loadPluginConfig(directory, { command: "run" })
|
const pluginConfig = loadPluginConfig(directory, { command: "run" })
|
||||||
const resolvedAgent = resolveRunAgent(options, pluginConfig)
|
const resolvedAgent = resolveRunAgent(options, pluginConfig)
|
||||||
const abortController = new AbortController()
|
const abortController = new AbortController()
|
||||||
let timeoutId: ReturnType<typeof setTimeout> | null = null
|
|
||||||
|
|
||||||
if (timeout > 0) {
|
|
||||||
timeoutId = setTimeout(() => {
|
|
||||||
console.log(pc.yellow("\nTimeout reached. Aborting..."))
|
|
||||||
abortController.abort()
|
|
||||||
}, timeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { client, cleanup: serverCleanup } = await createServerConnection({
|
const { client, cleanup: serverCleanup } = await createServerConnection({
|
||||||
@ -65,7 +57,6 @@ export async function run(options: RunOptions): Promise<number> {
|
|||||||
})
|
})
|
||||||
|
|
||||||
const cleanup = () => {
|
const cleanup = () => {
|
||||||
if (timeoutId) clearTimeout(timeoutId)
|
|
||||||
serverCleanup()
|
serverCleanup()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,7 +99,9 @@ export async function run(options: RunOptions): Promise<number> {
|
|||||||
})
|
})
|
||||||
|
|
||||||
console.log(pc.dim("Waiting for completion...\n"))
|
console.log(pc.dim("Waiting for completion...\n"))
|
||||||
const exitCode = await pollForCompletion(ctx, eventState, abortController)
|
const exitCode = await pollForCompletion(ctx, eventState, abortController, {
|
||||||
|
timeoutMs: timeout,
|
||||||
|
})
|
||||||
|
|
||||||
// Abort the event stream to stop the processor
|
// Abort the event stream to stop the processor
|
||||||
abortController.abort()
|
abortController.abort()
|
||||||
@ -144,7 +137,6 @@ export async function run(options: RunOptions): Promise<number> {
|
|||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (timeoutId) clearTimeout(timeoutId)
|
|
||||||
if (jsonManager) jsonManager.restore()
|
if (jsonManager) jsonManager.restore()
|
||||||
if (err instanceof Error && err.name === "AbortError") {
|
if (err instanceof Error && err.name === "AbortError") {
|
||||||
return 130
|
return 130
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user