diff --git a/src/config/schema/background-task.ts b/src/config/schema/background-task.ts index 6e6ad331..233fe286 100644 --- a/src/config/schema/background-task.ts +++ b/src/config/schema/background-task.ts @@ -6,6 +6,8 @@ export const BackgroundTaskConfigSchema = z.object({ modelConcurrency: z.record(z.string(), z.number().min(0)).optional(), /** Stale timeout in milliseconds - interrupt tasks with no activity for this duration (default: 180000 = 3 minutes, minimum: 60000 = 1 minute) */ staleTimeoutMs: z.number().min(60000).optional(), + /** Timeout in milliseconds for running tasks that have never received a progress update - measured from startedAt because no lastUpdate exists (default: 600000 = 10 minutes, minimum: 60000 = 1 minute) */ + messageStalenessTimeoutMs: z.number().min(60000).optional(), }) export type BackgroundTaskConfig = z.infer diff --git a/src/features/background-agent/constants.ts b/src/features/background-agent/constants.ts index 99b4f298..6e985d6d 100644 --- a/src/features/background-agent/constants.ts +++ b/src/features/background-agent/constants.ts @@ -4,6 +4,7 @@ import type { BackgroundTask, LaunchInput } from "./types" export const TASK_TTL_MS = 30 * 60 * 1000 export const MIN_STABILITY_TIME_MS = 10 * 1000 export const DEFAULT_STALE_TIMEOUT_MS = 180_000 +export const DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS = 600_000 export const MIN_RUNTIME_BEFORE_STALE_MS = 30_000 export const MIN_IDLE_TIME_MS = 5000 export const POLLING_INTERVAL_MS = 3000 diff --git a/src/features/background-agent/task-poller.test.ts b/src/features/background-agent/task-poller.test.ts new file mode 100644 index 00000000..29ff4744 --- /dev/null +++ b/src/features/background-agent/task-poller.test.ts @@ -0,0 +1,192 @@ +import { describe, it, expect, mock } from "bun:test" + +import { checkAndInterruptStaleTasks, pruneStaleTasksAndNotifications } from "./task-poller" +import type { BackgroundTask } from "./types" + +describe("checkAndInterruptStaleTasks", () => { + const mockClient = { + session: { + abort: mock(() => 
Promise.resolve()), + }, + } + const mockConcurrencyManager = { + release: mock(() => {}), + } + const mockNotify = mock(() => Promise.resolve()) + + function createRunningTask(overrides: Partial = {}): BackgroundTask { + return { + id: "task-1", + sessionID: "ses-1", + parentSessionID: "parent-ses-1", + parentMessageID: "msg-1", + description: "test", + prompt: "test", + agent: "explore", + status: "running", + startedAt: new Date(Date.now() - 120_000), + ...overrides, + } + } + + it("should interrupt tasks with lastUpdate exceeding stale timeout", async () => { + //#given + const task = createRunningTask({ + progress: { + toolCalls: 1, + lastUpdate: new Date(Date.now() - 200_000), + }, + }) + + //#when + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { staleTimeoutMs: 180_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + }) + + //#then + expect(task.status).toBe("cancelled") + expect(task.error).toContain("Stale timeout") + }) + + it("should NOT interrupt tasks with recent lastUpdate", async () => { + //#given + const task = createRunningTask({ + progress: { + toolCalls: 1, + lastUpdate: new Date(Date.now() - 10_000), + }, + }) + + //#when + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { staleTimeoutMs: 180_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + }) + + //#then + expect(task.status).toBe("running") + }) + + it("should interrupt tasks with NO progress.lastUpdate that exceeded messageStalenessTimeoutMs since startedAt", async () => { + //#given — task started 15 minutes ago, never received any progress update + const task = createRunningTask({ + startedAt: new Date(Date.now() - 15 * 60 * 1000), + progress: undefined, + }) + + //#when + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { messageStalenessTimeoutMs: 600_000 
}, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + }) + + //#then + expect(task.status).toBe("cancelled") + expect(task.error).toContain("no activity") + }) + + it("should NOT interrupt tasks with NO progress.lastUpdate that are within messageStalenessTimeoutMs", async () => { + //#given — task started 5 minutes ago, configured timeout is 10 minutes + const task = createRunningTask({ + startedAt: new Date(Date.now() - 5 * 60 * 1000), + progress: undefined, + }) + + //#when + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { messageStalenessTimeoutMs: 600_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + }) + + //#then + expect(task.status).toBe("running") + }) + + it("should use DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS when messageStalenessTimeoutMs is not configured", async () => { + //#given — task started 15 minutes ago, no config for messageStalenessTimeoutMs + const task = createRunningTask({ + startedAt: new Date(Date.now() - 15 * 60 * 1000), + progress: undefined, + }) + + //#when — default is 10 minutes (600_000ms) + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: undefined, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + }) + + //#then + expect(task.status).toBe("cancelled") + expect(task.error).toContain("no activity") + }) + + it("should release concurrency key when interrupting a never-updated task", async () => { + //#given + const releaseMock = mock(() => {}) + const task = createRunningTask({ + startedAt: new Date(Date.now() - 15 * 60 * 1000), + progress: undefined, + concurrencyKey: "anthropic/claude-opus-4-6", + }) + + //#when + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { messageStalenessTimeoutMs: 600_000 }, + concurrencyManager: { release: releaseMock } as never, + 
notifyParentSession: mockNotify, + }) + + //#then + expect(releaseMock).toHaveBeenCalledWith("anthropic/claude-opus-4-6") + expect(task.concurrencyKey).toBeUndefined() + }) +}) + +describe("pruneStaleTasksAndNotifications", () => { + it("should prune tasks that exceeded TTL", () => { + //#given + const tasks = new Map() + const oldTask: BackgroundTask = { + id: "old-task", + parentSessionID: "parent", + parentMessageID: "msg", + description: "old", + prompt: "old", + agent: "explore", + status: "running", + startedAt: new Date(Date.now() - 31 * 60 * 1000), + } + tasks.set("old-task", oldTask) + + const pruned: string[] = [] + const notifications = new Map() + + //#when + pruneStaleTasksAndNotifications({ + tasks, + notifications, + onTaskPruned: (taskId) => pruned.push(taskId), + }) + + //#then + expect(pruned).toContain("old-task") + }) +}) diff --git a/src/features/background-agent/task-poller.ts b/src/features/background-agent/task-poller.ts index a3b48b8c..a9f63a9a 100644 --- a/src/features/background-agent/task-poller.ts +++ b/src/features/background-agent/task-poller.ts @@ -6,6 +6,7 @@ import type { ConcurrencyManager } from "./concurrency" import type { OpencodeClient } from "./opencode-client" import { + DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS, DEFAULT_STALE_TIMEOUT_MS, MIN_RUNTIME_BEFORE_STALE_MS, TASK_TTL_MS, @@ -67,15 +68,41 @@ export async function checkAndInterruptStaleTasks(args: { const staleTimeoutMs = config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS const now = Date.now() + const messageStalenessMs = config?.messageStalenessTimeoutMs ?? 
DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS + for (const task of tasks) { if (task.status !== "running") continue - if (!task.progress?.lastUpdate) continue const startedAt = task.startedAt const sessionID = task.sessionID if (!startedAt || !sessionID) continue const runtime = now - startedAt.getTime() + + if (!task.progress?.lastUpdate) { /* no progress update recorded: judge staleness by runtime since startedAt */ + if (runtime <= messageStalenessMs) continue /* still within the no-progress allowance */ + + const staleMinutes = Math.round(runtime / 60000) + task.status = "cancelled" + task.error = `Stale timeout (no activity for ${staleMinutes}min since start)` + task.completedAt = new Date() + + if (task.concurrencyKey) { + concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + + client.session.abort({ path: { id: sessionID } }).catch(() => {}) /* abort is not awaited; a rejected abort is deliberately ignored */ + log(`[background-agent] Task ${task.id} interrupted: no progress since start`) + + try { + await notifyParentSession(task) + } catch (err) { + log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err }) + } + continue /* task was cancelled above; skip the lastUpdate-based check below */ + } + if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime() @@ -92,10 +119,7 @@ export async function checkAndInterruptStaleTasks(args: { task.concurrencyKey = undefined } - client.session.abort({ - path: { id: sessionID }, - }).catch(() => {}) - + client.session.abort({ path: { id: sessionID } }).catch(() => {}) log(`[background-agent] Task ${task.id} interrupted: stale timeout`) try {