Merge pull request #1836 from code-yeongyu/fix/issue-1769-background-staleness
fix(background-agent): detect stale tasks that never received progress updates
This commit is contained in:
commit
32470f5ca0
@ -6,6 +6,8 @@ export const BackgroundTaskConfigSchema = z.object({
|
|||||||
modelConcurrency: z.record(z.string(), z.number().min(0)).optional(),
|
modelConcurrency: z.record(z.string(), z.number().min(0)).optional(),
|
||||||
/** Stale timeout in milliseconds - interrupt tasks with no activity for this duration (default: 180000 = 3 minutes, minimum: 60000 = 1 minute) */
|
/** Stale timeout in milliseconds - interrupt tasks with no activity for this duration (default: 180000 = 3 minutes, minimum: 60000 = 1 minute) */
|
||||||
staleTimeoutMs: z.number().min(60000).optional(),
|
staleTimeoutMs: z.number().min(60000).optional(),
|
||||||
|
/** Timeout for tasks that never received any progress update, falling back to startedAt (default: 600000 = 10 minutes, minimum: 60000 = 1 minute) */
|
||||||
|
messageStalenessTimeoutMs: z.number().min(60000).optional(),
|
||||||
})
|
})
|
||||||
|
|
||||||
export type BackgroundTaskConfig = z.infer<typeof BackgroundTaskConfigSchema>
|
export type BackgroundTaskConfig = z.infer<typeof BackgroundTaskConfigSchema>
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import type { BackgroundTask, LaunchInput } from "./types"
|
|||||||
export const TASK_TTL_MS = 30 * 60 * 1000
|
export const TASK_TTL_MS = 30 * 60 * 1000
|
||||||
export const MIN_STABILITY_TIME_MS = 10 * 1000
|
export const MIN_STABILITY_TIME_MS = 10 * 1000
|
||||||
export const DEFAULT_STALE_TIMEOUT_MS = 180_000
|
export const DEFAULT_STALE_TIMEOUT_MS = 180_000
|
||||||
|
export const DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS = 600_000
|
||||||
export const MIN_RUNTIME_BEFORE_STALE_MS = 30_000
|
export const MIN_RUNTIME_BEFORE_STALE_MS = 30_000
|
||||||
export const MIN_IDLE_TIME_MS = 5000
|
export const MIN_IDLE_TIME_MS = 5000
|
||||||
export const POLLING_INTERVAL_MS = 3000
|
export const POLLING_INTERVAL_MS = 3000
|
||||||
|
|||||||
192
src/features/background-agent/task-poller.test.ts
Normal file
192
src/features/background-agent/task-poller.test.ts
Normal file
@ -0,0 +1,192 @@
|
|||||||
|
import { describe, it, expect, mock } from "bun:test"
|
||||||
|
|
||||||
|
import { checkAndInterruptStaleTasks, pruneStaleTasksAndNotifications } from "./task-poller"
|
||||||
|
import type { BackgroundTask } from "./types"
|
||||||
|
|
||||||
|
describe("checkAndInterruptStaleTasks", () => {
|
||||||
|
const mockClient = {
|
||||||
|
session: {
|
||||||
|
abort: mock(() => Promise.resolve()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const mockConcurrencyManager = {
|
||||||
|
release: mock(() => {}),
|
||||||
|
}
|
||||||
|
const mockNotify = mock(() => Promise.resolve())
|
||||||
|
|
||||||
|
function createRunningTask(overrides: Partial<BackgroundTask> = {}): BackgroundTask {
|
||||||
|
return {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "ses-1",
|
||||||
|
parentSessionID: "parent-ses-1",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 120_000),
|
||||||
|
...overrides,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should interrupt tasks with lastUpdate exceeding stale timeout", async () => {
|
||||||
|
//#given
|
||||||
|
const task = createRunningTask({
|
||||||
|
progress: {
|
||||||
|
toolCalls: 1,
|
||||||
|
lastUpdate: new Date(Date.now() - 200_000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { staleTimeoutMs: 180_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("Stale timeout")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should NOT interrupt tasks with recent lastUpdate", async () => {
|
||||||
|
//#given
|
||||||
|
const task = createRunningTask({
|
||||||
|
progress: {
|
||||||
|
toolCalls: 1,
|
||||||
|
lastUpdate: new Date(Date.now() - 10_000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { staleTimeoutMs: 180_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should interrupt tasks with NO progress.lastUpdate that exceeded messageStalenessTimeoutMs since startedAt", async () => {
|
||||||
|
//#given — task started 15 minutes ago, never received any progress update
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 15 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { messageStalenessTimeoutMs: 600_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("no activity")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should NOT interrupt tasks with NO progress.lastUpdate that are within messageStalenessTimeoutMs", async () => {
|
||||||
|
//#given — task started 5 minutes ago, default timeout is 10 minutes
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 5 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { messageStalenessTimeoutMs: 600_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should use DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS when messageStalenessTimeoutMs is not configured", async () => {
|
||||||
|
//#given — task started 15 minutes ago, no config for messageStalenessTimeoutMs
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 15 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when — default is 10 minutes (600_000ms)
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: undefined,
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("no activity")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should release concurrency key when interrupting a never-updated task", async () => {
|
||||||
|
//#given
|
||||||
|
const releaseMock = mock(() => {})
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 15 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
concurrencyKey: "anthropic/claude-opus-4-6",
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { messageStalenessTimeoutMs: 600_000 },
|
||||||
|
concurrencyManager: { release: releaseMock } as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(releaseMock).toHaveBeenCalledWith("anthropic/claude-opus-4-6")
|
||||||
|
expect(task.concurrencyKey).toBeUndefined()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe("pruneStaleTasksAndNotifications", () => {
|
||||||
|
it("should prune tasks that exceeded TTL", () => {
|
||||||
|
//#given
|
||||||
|
const tasks = new Map<string, BackgroundTask>()
|
||||||
|
const oldTask: BackgroundTask = {
|
||||||
|
id: "old-task",
|
||||||
|
parentSessionID: "parent",
|
||||||
|
parentMessageID: "msg",
|
||||||
|
description: "old",
|
||||||
|
prompt: "old",
|
||||||
|
agent: "explore",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 31 * 60 * 1000),
|
||||||
|
}
|
||||||
|
tasks.set("old-task", oldTask)
|
||||||
|
|
||||||
|
const pruned: string[] = []
|
||||||
|
const notifications = new Map<string, BackgroundTask[]>()
|
||||||
|
|
||||||
|
//#when
|
||||||
|
pruneStaleTasksAndNotifications({
|
||||||
|
tasks,
|
||||||
|
notifications,
|
||||||
|
onTaskPruned: (taskId) => pruned.push(taskId),
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(pruned).toContain("old-task")
|
||||||
|
})
|
||||||
|
})
|
||||||
@ -6,6 +6,7 @@ import type { ConcurrencyManager } from "./concurrency"
|
|||||||
import type { OpencodeClient } from "./opencode-client"
|
import type { OpencodeClient } from "./opencode-client"
|
||||||
|
|
||||||
import {
|
import {
|
||||||
|
DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS,
|
||||||
DEFAULT_STALE_TIMEOUT_MS,
|
DEFAULT_STALE_TIMEOUT_MS,
|
||||||
MIN_RUNTIME_BEFORE_STALE_MS,
|
MIN_RUNTIME_BEFORE_STALE_MS,
|
||||||
TASK_TTL_MS,
|
TASK_TTL_MS,
|
||||||
@ -67,15 +68,41 @@ export async function checkAndInterruptStaleTasks(args: {
|
|||||||
const staleTimeoutMs = config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS
|
const staleTimeoutMs = config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS
|
||||||
const now = Date.now()
|
const now = Date.now()
|
||||||
|
|
||||||
|
const messageStalenessMs = config?.messageStalenessTimeoutMs ?? DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS
|
||||||
|
|
||||||
for (const task of tasks) {
|
for (const task of tasks) {
|
||||||
if (task.status !== "running") continue
|
if (task.status !== "running") continue
|
||||||
if (!task.progress?.lastUpdate) continue
|
|
||||||
|
|
||||||
const startedAt = task.startedAt
|
const startedAt = task.startedAt
|
||||||
const sessionID = task.sessionID
|
const sessionID = task.sessionID
|
||||||
if (!startedAt || !sessionID) continue
|
if (!startedAt || !sessionID) continue
|
||||||
|
|
||||||
const runtime = now - startedAt.getTime()
|
const runtime = now - startedAt.getTime()
|
||||||
|
|
||||||
|
if (!task.progress?.lastUpdate) {
|
||||||
|
if (runtime <= messageStalenessMs) continue
|
||||||
|
|
||||||
|
const staleMinutes = Math.round(runtime / 60000)
|
||||||
|
task.status = "cancelled"
|
||||||
|
task.error = `Stale timeout (no activity for ${staleMinutes}min since start)`
|
||||||
|
task.completedAt = new Date()
|
||||||
|
|
||||||
|
if (task.concurrencyKey) {
|
||||||
|
concurrencyManager.release(task.concurrencyKey)
|
||||||
|
task.concurrencyKey = undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
client.session.abort({ path: { id: sessionID } }).catch(() => {})
|
||||||
|
log(`[background-agent] Task ${task.id} interrupted: no progress since start`)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await notifyParentSession(task)
|
||||||
|
} catch (err) {
|
||||||
|
log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err })
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue
|
if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue
|
||||||
|
|
||||||
const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()
|
const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()
|
||||||
@ -92,10 +119,7 @@ export async function checkAndInterruptStaleTasks(args: {
|
|||||||
task.concurrencyKey = undefined
|
task.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
client.session.abort({
|
client.session.abort({ path: { id: sessionID } }).catch(() => {})
|
||||||
path: { id: sessionID },
|
|
||||||
}).catch(() => {})
|
|
||||||
|
|
||||||
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user