fix: prevent stale timeout from killing actively running background tasks
The stale detection was checking lastUpdate timestamps BEFORE consulting session.status(), causing tasks to be unfairly killed after 3 minutes even when the session was actively running (e.g., during long tool executions or extended thinking). Changes: - Reorder pollRunningTasks to fetch session.status() before stale check - Skip stale-kill entirely when session status is 'running' - Port no-lastUpdate handling from task-poller.ts into manager.ts (previously manager silently skipped tasks without lastUpdate) - Add sessionStatuses parameter to checkAndInterruptStaleTasks - Add 7 new test cases covering session-status-aware stale detection
This commit is contained in:
parent
65a06aa2b7
commit
a0c9381672
@ -2293,6 +2293,217 @@ describe("BackgroundManager.checkAndInterruptStaleTasks", () => {
|
|||||||
|
|
||||||
expect(task.status).toBe("cancelled")
|
expect(task.status).toBe("cancelled")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("should NOT interrupt task when session is running, even with stale lastUpdate", async () => {
|
||||||
|
//#given
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { staleTimeoutMs: 180_000 })
|
||||||
|
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-running-session",
|
||||||
|
sessionID: "session-running",
|
||||||
|
parentSessionID: "parent-rs",
|
||||||
|
parentMessageID: "msg-rs",
|
||||||
|
description: "Task with running session",
|
||||||
|
prompt: "Test",
|
||||||
|
agent: "test-agent",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 300_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 2,
|
||||||
|
lastUpdate: new Date(Date.now() - 300_000),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
getTaskMap(manager).set(task.id, task)
|
||||||
|
|
||||||
|
//#when — session is actively running
|
||||||
|
await manager["checkAndInterruptStaleTasks"]({ "session-running": { type: "running" } })
|
||||||
|
|
||||||
|
//#then — task survives because session is running
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should interrupt task when session is idle and lastUpdate exceeds stale timeout", async () => {
|
||||||
|
//#given
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { staleTimeoutMs: 180_000 })
|
||||||
|
stubNotifyParentSession(manager)
|
||||||
|
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-idle-session",
|
||||||
|
sessionID: "session-idle",
|
||||||
|
parentSessionID: "parent-is",
|
||||||
|
parentMessageID: "msg-is",
|
||||||
|
description: "Task with idle session",
|
||||||
|
prompt: "Test",
|
||||||
|
agent: "test-agent",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 300_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 2,
|
||||||
|
lastUpdate: new Date(Date.now() - 300_000),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
getTaskMap(manager).set(task.id, task)
|
||||||
|
|
||||||
|
//#when — session is idle
|
||||||
|
await manager["checkAndInterruptStaleTasks"]({ "session-idle": { type: "idle" } })
|
||||||
|
|
||||||
|
//#then — killed because session is idle with stale lastUpdate
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("Stale timeout")
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should NOT interrupt running session even with very old lastUpdate (no safety net)", async () => {
|
||||||
|
//#given
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { staleTimeoutMs: 180_000 })
|
||||||
|
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-long-running",
|
||||||
|
sessionID: "session-long",
|
||||||
|
parentSessionID: "parent-lr",
|
||||||
|
parentMessageID: "msg-lr",
|
||||||
|
description: "Long running task",
|
||||||
|
prompt: "Test",
|
||||||
|
agent: "test-agent",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 900_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 5,
|
||||||
|
lastUpdate: new Date(Date.now() - 900_000),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
getTaskMap(manager).set(task.id, task)
|
||||||
|
|
||||||
|
//#when — session is running, lastUpdate 15min old
|
||||||
|
await manager["checkAndInterruptStaleTasks"]({ "session-long": { type: "running" } })
|
||||||
|
|
||||||
|
//#then — running sessions are NEVER stale-killed
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should NOT interrupt running session with no progress (undefined lastUpdate)", async () => {
|
||||||
|
//#given — no progress at all, but session is running
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { messageStalenessTimeoutMs: 600_000 })
|
||||||
|
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-running-no-progress",
|
||||||
|
sessionID: "session-rnp",
|
||||||
|
parentSessionID: "parent-rnp",
|
||||||
|
parentMessageID: "msg-rnp",
|
||||||
|
description: "Running no progress",
|
||||||
|
prompt: "Test",
|
||||||
|
agent: "test-agent",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 15 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
}
|
||||||
|
|
||||||
|
getTaskMap(manager).set(task.id, task)
|
||||||
|
|
||||||
|
//#when — session is running despite no progress
|
||||||
|
await manager["checkAndInterruptStaleTasks"]({ "session-rnp": { type: "running" } })
|
||||||
|
|
||||||
|
//#then — running sessions are NEVER killed
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should interrupt task with no lastUpdate after messageStalenessTimeout", async () => {
|
||||||
|
//#given
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { messageStalenessTimeoutMs: 600_000 })
|
||||||
|
stubNotifyParentSession(manager)
|
||||||
|
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-no-update",
|
||||||
|
sessionID: "session-no-update",
|
||||||
|
parentSessionID: "parent-nu",
|
||||||
|
parentMessageID: "msg-nu",
|
||||||
|
description: "No update task",
|
||||||
|
prompt: "Test",
|
||||||
|
agent: "test-agent",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 15 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
}
|
||||||
|
|
||||||
|
getTaskMap(manager).set(task.id, task)
|
||||||
|
|
||||||
|
//#when — no progress update for 15 minutes
|
||||||
|
await manager["checkAndInterruptStaleTasks"]({})
|
||||||
|
|
||||||
|
//#then — killed after messageStalenessTimeout
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("no activity")
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should NOT interrupt task with no lastUpdate within messageStalenessTimeout", async () => {
|
||||||
|
//#given
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { messageStalenessTimeoutMs: 600_000 })
|
||||||
|
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-fresh-no-update",
|
||||||
|
sessionID: "session-fresh",
|
||||||
|
parentSessionID: "parent-fn",
|
||||||
|
parentMessageID: "msg-fn",
|
||||||
|
description: "Fresh no-update task",
|
||||||
|
prompt: "Test",
|
||||||
|
agent: "test-agent",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(Date.now() - 5 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
}
|
||||||
|
|
||||||
|
getTaskMap(manager).set(task.id, task)
|
||||||
|
|
||||||
|
//#when — only 5 min since start, within 10min timeout
|
||||||
|
await manager["checkAndInterruptStaleTasks"]({})
|
||||||
|
|
||||||
|
//#then — task survives
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe("BackgroundManager.shutdown session abort", () => {
|
describe("BackgroundManager.shutdown session abort", () => {
|
||||||
|
|||||||
@ -12,6 +12,7 @@ import { ConcurrencyManager } from "./concurrency"
|
|||||||
import type { BackgroundTaskConfig, TmuxConfig } from "../../config/schema"
|
import type { BackgroundTaskConfig, TmuxConfig } from "../../config/schema"
|
||||||
import { isInsideTmux } from "../../shared/tmux"
|
import { isInsideTmux } from "../../shared/tmux"
|
||||||
import {
|
import {
|
||||||
|
DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS,
|
||||||
DEFAULT_STALE_TIMEOUT_MS,
|
DEFAULT_STALE_TIMEOUT_MS,
|
||||||
MIN_IDLE_TIME_MS,
|
MIN_IDLE_TIME_MS,
|
||||||
MIN_RUNTIME_BEFORE_STALE_MS,
|
MIN_RUNTIME_BEFORE_STALE_MS,
|
||||||
@ -1437,24 +1438,54 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async checkAndInterruptStaleTasks(): Promise<void> {
|
private async checkAndInterruptStaleTasks(
|
||||||
|
allStatuses: Record<string, { type: string }> = {},
|
||||||
|
): Promise<void> {
|
||||||
const staleTimeoutMs = this.config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS
|
const staleTimeoutMs = this.config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS
|
||||||
|
const messageStalenessMs = this.config?.messageStalenessTimeoutMs ?? DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS
|
||||||
const now = Date.now()
|
const now = Date.now()
|
||||||
|
|
||||||
for (const task of this.tasks.values()) {
|
for (const task of this.tasks.values()) {
|
||||||
if (task.status !== "running") continue
|
if (task.status !== "running") continue
|
||||||
if (!task.progress?.lastUpdate) continue
|
|
||||||
|
|
||||||
const startedAt = task.startedAt
|
const startedAt = task.startedAt
|
||||||
const sessionID = task.sessionID
|
const sessionID = task.sessionID
|
||||||
if (!startedAt || !sessionID) continue
|
if (!startedAt || !sessionID) continue
|
||||||
|
|
||||||
|
const sessionIsRunning = allStatuses[sessionID]?.type === "running"
|
||||||
const runtime = now - startedAt.getTime()
|
const runtime = now - startedAt.getTime()
|
||||||
|
|
||||||
|
if (!task.progress?.lastUpdate) {
|
||||||
|
if (sessionIsRunning) continue
|
||||||
|
if (runtime <= messageStalenessMs) continue
|
||||||
|
|
||||||
|
const staleMinutes = Math.round(runtime / 60000)
|
||||||
|
task.status = "cancelled"
|
||||||
|
task.error = `Stale timeout (no activity for ${staleMinutes}min since start)`
|
||||||
|
task.completedAt = new Date()
|
||||||
|
|
||||||
|
if (task.concurrencyKey) {
|
||||||
|
this.concurrencyManager.release(task.concurrencyKey)
|
||||||
|
task.concurrencyKey = undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
this.client.session.abort({ path: { id: sessionID } }).catch(() => {})
|
||||||
|
log(`[background-agent] Task ${task.id} interrupted: no progress since start`)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||||
|
} catch (err) {
|
||||||
|
log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err })
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sessionIsRunning) continue
|
||||||
|
|
||||||
if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue
|
if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue
|
||||||
|
|
||||||
const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()
|
const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()
|
||||||
if (timeSinceLastUpdate <= staleTimeoutMs) continue
|
if (timeSinceLastUpdate <= staleTimeoutMs) continue
|
||||||
|
|
||||||
if (task.status !== "running") continue
|
if (task.status !== "running") continue
|
||||||
|
|
||||||
const staleMinutes = Math.round(timeSinceLastUpdate / 60000)
|
const staleMinutes = Math.round(timeSinceLastUpdate / 60000)
|
||||||
@ -1467,10 +1498,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
task.concurrencyKey = undefined
|
task.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
this.client.session.abort({
|
this.client.session.abort({ path: { id: sessionID } }).catch(() => {})
|
||||||
path: { id: sessionID },
|
|
||||||
}).catch(() => {})
|
|
||||||
|
|
||||||
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -1483,11 +1511,12 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
|
|
||||||
private async pollRunningTasks(): Promise<void> {
|
private async pollRunningTasks(): Promise<void> {
|
||||||
this.pruneStaleTasksAndNotifications()
|
this.pruneStaleTasksAndNotifications()
|
||||||
await this.checkAndInterruptStaleTasks()
|
|
||||||
|
|
||||||
const statusResult = await this.client.session.status()
|
const statusResult = await this.client.session.status()
|
||||||
const allStatuses = (statusResult.data ?? {}) as Record<string, { type: string }>
|
const allStatuses = (statusResult.data ?? {}) as Record<string, { type: string }>
|
||||||
|
|
||||||
|
await this.checkAndInterruptStaleTasks(allStatuses)
|
||||||
|
|
||||||
for (const task of this.tasks.values()) {
|
for (const task of this.tasks.values()) {
|
||||||
if (task.status !== "running") continue
|
if (task.status !== "running") continue
|
||||||
|
|
||||||
@ -1497,7 +1526,6 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
try {
|
try {
|
||||||
const sessionStatus = allStatuses[sessionID]
|
const sessionStatus = allStatuses[sessionID]
|
||||||
|
|
||||||
// Don't skip if session not in status - fall through to message-based detection
|
|
||||||
if (sessionStatus?.type === "idle") {
|
if (sessionStatus?.type === "idle") {
|
||||||
// Edge guard: Validate session has actual output before completing
|
// Edge guard: Validate session has actual output before completing
|
||||||
const hasValidOutput = await this.validateSessionHasOutput(sessionID)
|
const hasValidOutput = await this.validateSessionHasOutput(sessionID)
|
||||||
|
|||||||
@ -34,7 +34,7 @@ export async function pollRunningTasks(args: {
|
|||||||
tasks: Iterable<BackgroundTask>
|
tasks: Iterable<BackgroundTask>
|
||||||
client: OpencodeClient
|
client: OpencodeClient
|
||||||
pruneStaleTasksAndNotifications: () => void
|
pruneStaleTasksAndNotifications: () => void
|
||||||
checkAndInterruptStaleTasks: () => Promise<void>
|
checkAndInterruptStaleTasks: (statuses: Record<string, { type: string }>) => Promise<void>
|
||||||
validateSessionHasOutput: (sessionID: string) => Promise<boolean>
|
validateSessionHasOutput: (sessionID: string) => Promise<boolean>
|
||||||
checkSessionTodos: (sessionID: string) => Promise<boolean>
|
checkSessionTodos: (sessionID: string) => Promise<boolean>
|
||||||
tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean>
|
tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean>
|
||||||
@ -54,11 +54,12 @@ export async function pollRunningTasks(args: {
|
|||||||
} = args
|
} = args
|
||||||
|
|
||||||
pruneStaleTasksAndNotifications()
|
pruneStaleTasksAndNotifications()
|
||||||
await checkAndInterruptStaleTasks()
|
|
||||||
|
|
||||||
const statusResult = await client.session.status()
|
const statusResult = await client.session.status()
|
||||||
const allStatuses = ((statusResult as { data?: unknown }).data ?? {}) as SessionStatusMap
|
const allStatuses = ((statusResult as { data?: unknown }).data ?? {}) as SessionStatusMap
|
||||||
|
|
||||||
|
await checkAndInterruptStaleTasks(allStatuses)
|
||||||
|
|
||||||
for (const task of tasks) {
|
for (const task of tasks) {
|
||||||
if (task.status !== "running") continue
|
if (task.status !== "running") continue
|
||||||
|
|
||||||
|
|||||||
@ -136,6 +136,125 @@ describe("checkAndInterruptStaleTasks", () => {
|
|||||||
expect(task.error).toContain("no activity")
|
expect(task.error).toContain("no activity")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("should NOT interrupt task when session is running, even if lastUpdate exceeds stale timeout", async () => {
|
||||||
|
//#given — lastUpdate is 5min old but session is actively running
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 300_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 2,
|
||||||
|
lastUpdate: new Date(Date.now() - 300_000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when — session status is "running"
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { staleTimeoutMs: 180_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
sessionStatuses: { "ses-1": { type: "running" } },
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then — task should survive because session is actively running
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should interrupt task when session is idle and lastUpdate exceeds stale timeout", async () => {
|
||||||
|
//#given — lastUpdate is 5min old and session is idle
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 300_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 2,
|
||||||
|
lastUpdate: new Date(Date.now() - 300_000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when — session status is "idle"
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { staleTimeoutMs: 180_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
sessionStatuses: { "ses-1": { type: "idle" } },
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then — task should be killed because session is idle with stale lastUpdate
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("Stale timeout")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should NOT interrupt running session task even with very old lastUpdate", async () => {
|
||||||
|
//#given — lastUpdate is 15min old, but session is still running
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 900_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 2,
|
||||||
|
lastUpdate: new Date(Date.now() - 900_000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when — session running, lastUpdate far exceeds any timeout
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { staleTimeoutMs: 180_000, messageStalenessTimeoutMs: 600_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
sessionStatuses: { "ses-1": { type: "running" } },
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then — running sessions are NEVER stale-killed (babysitter + TTL prune handle these)
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should NOT interrupt running session even with no progress (undefined lastUpdate)", async () => {
|
||||||
|
//#given — task has no progress at all, but session is running
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 15 * 60 * 1000),
|
||||||
|
progress: undefined,
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when — session is running
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { messageStalenessTimeoutMs: 600_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
sessionStatuses: { "ses-1": { type: "running" } },
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then — running sessions are NEVER killed, even without progress
|
||||||
|
expect(task.status).toBe("running")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should use default stale timeout when session status is unknown/missing", async () => {
|
||||||
|
//#given — lastUpdate exceeds stale timeout, session not in status map
|
||||||
|
const task = createRunningTask({
|
||||||
|
startedAt: new Date(Date.now() - 300_000),
|
||||||
|
progress: {
|
||||||
|
toolCalls: 1,
|
||||||
|
lastUpdate: new Date(Date.now() - 200_000),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when — empty sessionStatuses (session not found)
|
||||||
|
await checkAndInterruptStaleTasks({
|
||||||
|
tasks: [task],
|
||||||
|
client: mockClient as never,
|
||||||
|
config: { staleTimeoutMs: 180_000 },
|
||||||
|
concurrencyManager: mockConcurrencyManager as never,
|
||||||
|
notifyParentSession: mockNotify,
|
||||||
|
sessionStatuses: {},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then — unknown session treated as potentially stale, apply default timeout
|
||||||
|
expect(task.status).toBe("cancelled")
|
||||||
|
expect(task.error).toContain("Stale timeout")
|
||||||
|
})
|
||||||
|
|
||||||
it("should release concurrency key when interrupting a never-updated task", async () => {
|
it("should release concurrency key when interrupting a never-updated task", async () => {
|
||||||
//#given
|
//#given
|
||||||
const releaseMock = mock(() => {})
|
const releaseMock = mock(() => {})
|
||||||
|
|||||||
@ -57,14 +57,17 @@ export function pruneStaleTasksAndNotifications(args: {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type SessionStatusMap = Record<string, { type: string }>
|
||||||
|
|
||||||
export async function checkAndInterruptStaleTasks(args: {
|
export async function checkAndInterruptStaleTasks(args: {
|
||||||
tasks: Iterable<BackgroundTask>
|
tasks: Iterable<BackgroundTask>
|
||||||
client: OpencodeClient
|
client: OpencodeClient
|
||||||
config: BackgroundTaskConfig | undefined
|
config: BackgroundTaskConfig | undefined
|
||||||
concurrencyManager: ConcurrencyManager
|
concurrencyManager: ConcurrencyManager
|
||||||
notifyParentSession: (task: BackgroundTask) => Promise<void>
|
notifyParentSession: (task: BackgroundTask) => Promise<void>
|
||||||
|
sessionStatuses?: SessionStatusMap
|
||||||
}): Promise<void> {
|
}): Promise<void> {
|
||||||
const { tasks, client, config, concurrencyManager, notifyParentSession } = args
|
const { tasks, client, config, concurrencyManager, notifyParentSession, sessionStatuses } = args
|
||||||
const staleTimeoutMs = config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS
|
const staleTimeoutMs = config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS
|
||||||
const now = Date.now()
|
const now = Date.now()
|
||||||
|
|
||||||
@ -77,9 +80,11 @@ export async function checkAndInterruptStaleTasks(args: {
|
|||||||
const sessionID = task.sessionID
|
const sessionID = task.sessionID
|
||||||
if (!startedAt || !sessionID) continue
|
if (!startedAt || !sessionID) continue
|
||||||
|
|
||||||
|
const sessionIsRunning = sessionStatuses?.[sessionID]?.type === "running"
|
||||||
const runtime = now - startedAt.getTime()
|
const runtime = now - startedAt.getTime()
|
||||||
|
|
||||||
if (!task.progress?.lastUpdate) {
|
if (!task.progress?.lastUpdate) {
|
||||||
|
if (sessionIsRunning) continue
|
||||||
if (runtime <= messageStalenessMs) continue
|
if (runtime <= messageStalenessMs) continue
|
||||||
|
|
||||||
const staleMinutes = Math.round(runtime / 60000)
|
const staleMinutes = Math.round(runtime / 60000)
|
||||||
@ -103,6 +108,8 @@ export async function checkAndInterruptStaleTasks(args: {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sessionIsRunning) continue
|
||||||
|
|
||||||
if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue
|
if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue
|
||||||
|
|
||||||
const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()
|
const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user