Merge pull request #803 from GollyJer/concurrency-hardening

feat(concurrency): prevent background task races and leaks

Summary
Fixes race conditions and memory leaks in the background task system that could cause "all tasks complete" notifications to never fire, leaving parent sessions waiting indefinitely.

Why This Change?
When background tasks are tracked for completion notifications, the system maintains a pendingByParent map to know when all tasks for a parent session are done. Several edge cases caused "stale entries" to accumulate in this map:
1. Re-registering completed tasks added them back to pending tracking, but they'd never complete again
2. Changing a task's parent session left orphan entries in the old parent's tracking set
3. Concurrent task operations could cause double-acquisition of concurrency slots
These bugs meant the system would sometimes wait forever for tasks that were already done.

What Changed
- Concurrency management: Added proper acquire/release lifecycle with cleanup on process exit (SIGINT, SIGTERM)
- Parent session tracking: Fixed the cleanup order. The task is now removed from the old parent's tracking set before its parent ID is updated
- Stale entry prevention: Only tracks tasks that are actually running; actively cleans up completed tasks
- Renamed registerExternalTask → trackTask: Clearer name (the old name implied external API consumers, but it's internal)
This commit is contained in:
Jeremy Gollehon 2026-01-15 11:09:52 -08:00 committed by GitHub
commit 837176d947
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 635 additions and 71 deletions

View File

@ -349,3 +349,70 @@ describe("ConcurrencyManager.acquire/release", () => {
await waitPromise await waitPromise
}) })
}) })
describe("ConcurrencyManager.cleanup", () => {
  // Each test first exhausts the available slots so that any further
  // acquire() call has to wait in the queue.

  test("cancelWaiters should reject all pending acquires", async () => {
    // #given - a single-slot manager whose only slot is already taken
    const limiter = new ConcurrencyManager({ defaultConcurrency: 1 })
    await limiter.acquire("model-a")

    // Two extra acquires that must queue; collect whatever they reject with
    const rejections: Error[] = []
    const waiters = [
      limiter.acquire("model-a").catch((reason) => rejections.push(reason)),
      limiter.acquire("model-a").catch((reason) => rejections.push(reason)),
    ]

    // #when
    limiter.cancelWaiters("model-a")
    await Promise.all(waiters)

    // #then
    expect(rejections.length).toBe(2)
    expect(rejections[0].message).toContain("cancelled")
  })

  test("clear should cancel all models and reset state", async () => {
    // #given - one held slot and one queued waiter per model
    const limiter = new ConcurrencyManager({ defaultConcurrency: 1 })
    await limiter.acquire("model-a")
    await limiter.acquire("model-b")

    const rejections: Error[] = []
    const waiters = [
      limiter.acquire("model-a").catch((reason) => rejections.push(reason)),
      limiter.acquire("model-b").catch((reason) => rejections.push(reason)),
    ]

    // #when
    limiter.clear()
    await Promise.all(waiters)

    // #then - both waiters were rejected and no slot is counted as held
    expect(rejections.length).toBe(2)
    expect(limiter.getCount("model-a")).toBe(0)
    expect(limiter.getCount("model-b")).toBe(0)
  })

  test("getCount and getQueueLength should return correct values", async () => {
    // #given - room for two concurrent holders
    const limiter = new ConcurrencyManager({ defaultConcurrency: 2 })

    // #when - the first slot is taken, nothing is queued
    await limiter.acquire("model-a")
    expect(limiter.getCount("model-a")).toBe(1)
    expect(limiter.getQueueLength("model-a")).toBe(0)

    // second slot is taken
    await limiter.acquire("model-a")
    expect(limiter.getCount("model-a")).toBe(2)

    // A third acquire has to wait in the queue
    const overflow = limiter.acquire("model-a").catch(() => {})
    await Promise.resolve() // yield once so the waiter is queued
    expect(limiter.getQueueLength("model-a")).toBe(1)

    // Cleanup - reject the queued waiter so the test does not leave it pending
    limiter.cancelWaiters("model-a")
    await overflow
  })
})

View File

@ -1,9 +1,21 @@
import type { BackgroundTaskConfig } from "../../config/schema" import type { BackgroundTaskConfig } from "../../config/schema"
/**
 * One waiter parked in a model's acquire queue.
 *
 * The `settled` flag is what prevents double-resolution: cancelWaiters()
 * must not reject an entry that release() has already resolved.
 */
interface QueueEntry {
  /** True once this waiter has been either resolved or rejected. */
  settled: boolean
  /** Resolves the waiting acquire. */
  resolve: () => void
  /** Rejects the waiting acquire with the given error. */
  rawReject: (error: Error) => void
}
export class ConcurrencyManager { export class ConcurrencyManager {
private config?: BackgroundTaskConfig private config?: BackgroundTaskConfig
private counts: Map<string, number> = new Map() private counts: Map<string, number> = new Map()
private queues: Map<string, Array<() => void>> = new Map() private queues: Map<string, QueueEntry[]> = new Map()
constructor(config?: BackgroundTaskConfig) { constructor(config?: BackgroundTaskConfig) {
this.config = config this.config = config
@ -38,9 +50,20 @@ export class ConcurrencyManager {
return return
} }
return new Promise<void>((resolve) => { return new Promise<void>((resolve, reject) => {
const queue = this.queues.get(model) ?? [] const queue = this.queues.get(model) ?? []
queue.push(resolve)
const entry: QueueEntry = {
resolve: () => {
if (entry.settled) return
entry.settled = true
resolve()
},
rawReject: reject,
settled: false,
}
queue.push(entry)
this.queues.set(model, queue) this.queues.set(model, queue)
}) })
} }
@ -52,15 +75,63 @@ export class ConcurrencyManager {
} }
const queue = this.queues.get(model) const queue = this.queues.get(model)
if (queue && queue.length > 0) {
// Try to hand off to a waiting entry (skip any settled entries from cancelWaiters)
while (queue && queue.length > 0) {
const next = queue.shift()! const next = queue.shift()!
this.counts.set(model, this.counts.get(model) ?? 0) if (!next.settled) {
next() // Hand off the slot to this waiter (count stays the same)
} else { next.resolve()
return
}
}
// No handoff occurred - decrement the count to free the slot
const current = this.counts.get(model) ?? 0 const current = this.counts.get(model) ?? 0
if (current > 0) { if (current > 0) {
this.counts.set(model, current - 1) this.counts.set(model, current - 1)
} }
} }
/**
 * Reject every still-pending acquire queued for `model`, then drop that
 * model's queue. Entries already marked settled are left alone.
 * Used during cleanup.
 */
cancelWaiters(model: string): void {
  const waiting = this.queues.get(model)
  if (!waiting) return

  waiting.forEach((waiter) => {
    if (waiter.settled) return
    // Mark the entry settled before rejecting it.
    waiter.settled = true
    waiter.rawReject(new Error(`Concurrency queue cancelled for model: ${model}`))
  })
  this.queues.delete(model)
}
/**
 * Reset the manager to an empty state. Used during manager cleanup/shutdown.
 * Every queued waiter is cancelled (rejected) before the slot counts and
 * queues are wiped.
 */
clear(): void {
  for (const model of this.queues.keys()) {
    this.cancelWaiters(model)
  }
  this.queues.clear()
  this.counts.clear()
}
/**
 * Number of slots currently recorded for `model`; 0 when the model has no
 * entry (for testing/debugging).
 */
getCount(model: string): number {
  const held = this.counts.get(model)
  return held ?? 0
}
/**
* Get queue length for a model (for testing/debugging)
*/
getQueueLength(model: string): number {
return this.queues.get(model)?.length ?? 0
} }
} }

View File

@ -1,5 +1,11 @@
import { describe, test, expect, beforeEach } from "bun:test" import { describe, test, expect, beforeEach } from "bun:test"
import { afterEach } from "bun:test"
import { tmpdir } from "node:os"
import type { PluginInput } from "@opencode-ai/plugin"
import type { BackgroundTask, ResumeInput } from "./types" import type { BackgroundTask, ResumeInput } from "./types"
import { BackgroundManager } from "./manager"
import { ConcurrencyManager } from "./concurrency"
const TASK_TTL_MS = 30 * 60 * 1000 const TASK_TTL_MS = 30 * 60 * 1000
@ -122,6 +128,10 @@ class MockBackgroundManager {
throw new Error(`Task not found for session: ${input.sessionId}`) throw new Error(`Task not found for session: ${input.sessionId}`)
} }
if (existingTask.status === "running") {
return existingTask
}
this.resumeCalls.push({ sessionId: input.sessionId, prompt: input.prompt }) this.resumeCalls.push({ sessionId: input.sessionId, prompt: input.prompt })
existingTask.status = "running" existingTask.status = "running"
@ -152,6 +162,44 @@ function createMockTask(overrides: Partial<BackgroundTask> & { id: string; sessi
} }
} }
/**
 * Build a real BackgroundManager around a minimal stub context: a client
 * whose `session.prompt` resolves to an empty object, and the OS temp
 * directory as the working directory.
 */
function createBackgroundManager(): BackgroundManager {
  const stubContext = {
    client: {
      session: {
        prompt: async () => ({}),
      },
    },
    directory: tmpdir(),
  }
  return new BackgroundManager(stubContext as unknown as PluginInput)
}
/** Test-only accessor for the manager's private `concurrencyManager` field. */
function getConcurrencyManager(manager: BackgroundManager): ConcurrencyManager {
  const internals = manager as unknown as { concurrencyManager: ConcurrencyManager }
  return internals.concurrencyManager
}
/** Test-only accessor for the manager's private `tasks` map. */
function getTaskMap(manager: BackgroundManager): Map<string, BackgroundTask> {
  const internals = manager as unknown as { tasks: Map<string, BackgroundTask> }
  return internals.tasks
}
/** Replace the manager's private `notifyParentSession` with an async no-op. */
function stubNotifyParentSession(manager: BackgroundManager): void {
  const internals = manager as unknown as {
    notifyParentSession: (task: BackgroundTask) => Promise<void>
  }
  internals.notifyParentSession = async () => {}
}
/**
 * Invoke the manager's private `tryCompleteTask` with the source label "test"
 * and return its result.
 */
async function tryCompleteTaskForTest(manager: BackgroundManager, task: BackgroundTask): Promise<boolean> {
  const internals = manager as unknown as {
    tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean>
  }
  // Called as a method on the cast object so `this` stays bound to the manager.
  return internals.tryCompleteTask(task, "test")
}
/**
 * Process events the cleanup test inspects: SIGINT, SIGTERM, beforeExit and
 * exit, plus SIGBREAK when running on Windows. Returns a fresh array per call.
 */
function getCleanupSignals(): Array<NodeJS.Signals | "beforeExit" | "exit"> {
  const common: Array<NodeJS.Signals | "beforeExit" | "exit"> = ["SIGINT", "SIGTERM", "beforeExit", "exit"]
  const onWindows = process.platform === "win32"
  return onWindows ? common.concat("SIGBREAK") : common
}
/**
 * Snapshot how many listeners `process` currently has for each given event,
 * keyed by event name.
 */
function getListenerCounts(signals: Array<NodeJS.Signals | "beforeExit" | "exit">): Record<string, number> {
  const pairs: Array<[string, number]> = []
  for (const signal of signals) {
    pairs.push([signal, process.listenerCount(signal)])
  }
  return Object.fromEntries(pairs)
}
describe("BackgroundManager.getAllDescendantTasks", () => { describe("BackgroundManager.getAllDescendantTasks", () => {
let manager: MockBackgroundManager let manager: MockBackgroundManager
@ -572,6 +620,7 @@ describe("BackgroundManager.resume", () => {
parentSessionID: "old-parent", parentSessionID: "old-parent",
description: "original description", description: "original description",
agent: "explore", agent: "explore",
status: "completed",
}) })
manager.addTask(existingTask) manager.addTask(existingTask)
@ -598,6 +647,7 @@ describe("BackgroundManager.resume", () => {
id: "task-a", id: "task-a",
sessionID: "session-a", sessionID: "session-a",
parentSessionID: "session-parent", parentSessionID: "session-parent",
status: "completed",
}) })
manager.addTask(task) manager.addTask(task)
@ -623,6 +673,7 @@ describe("BackgroundManager.resume", () => {
id: "task-a", id: "task-a",
sessionID: "session-a", sessionID: "session-a",
parentSessionID: "session-parent", parentSessionID: "session-parent",
status: "completed",
}) })
taskWithProgress.progress = { taskWithProgress.progress = {
toolCalls: 42, toolCalls: 42,
@ -642,6 +693,29 @@ describe("BackgroundManager.resume", () => {
// #then // #then
expect(result.progress?.toolCalls).toBe(42) expect(result.progress?.toolCalls).toBe(42)
}) })
test("should ignore resume when task is already running", () => {
  // #given - a task that is still running under its original parent
  manager.addTask(
    createMockTask({
      id: "task-a",
      sessionID: "session-a",
      parentSessionID: "session-parent",
      status: "running",
    }),
  )

  // #when - a resume arrives that tries to re-parent the task
  const resumed = manager.resume({
    sessionId: "session-a",
    prompt: "resume should be ignored",
    parentSessionID: "new-parent",
    parentMessageID: "new-msg",
  })

  // #then - the parent is unchanged and no resume call was recorded
  expect(resumed.parentSessionID).toBe("session-parent")
  expect(manager.resumeCalls).toHaveLength(0)
})
}) })
describe("LaunchInput.skillContent", () => { describe("LaunchInput.skillContent", () => {
@ -813,3 +887,176 @@ function buildNotificationPromptBody(
return body return body
} }
describe("BackgroundManager.tryCompleteTask", () => {
  const concurrencyKey = "anthropic/claude-opus-4-5"
  let manager: BackgroundManager

  // Running-task fixture shared by both tests; it carries `concurrencyKey`
  // so that completing it is expected to release that slot.
  const buildRunningTask = (): BackgroundTask => ({
    id: "task-1",
    sessionID: "session-1",
    parentSessionID: "session-parent",
    parentMessageID: "msg-1",
    description: "test task",
    prompt: "test",
    agent: "explore",
    status: "running",
    startedAt: new Date(),
    concurrencyKey,
  })

  beforeEach(() => {
    // #given
    manager = createBackgroundManager()
    stubNotifyParentSession(manager)
  })

  afterEach(() => {
    manager.shutdown()
  })

  test("should release concurrency and clear key on completion", async () => {
    // #given - one slot held for the task's key
    const slots = getConcurrencyManager(manager)
    await slots.acquire(concurrencyKey)
    const task = buildRunningTask()

    // #when
    const completed = await tryCompleteTaskForTest(manager, task)

    // #then - task is completed, its key is cleared and the slot is free
    expect(completed).toBe(true)
    expect(task.status).toBe("completed")
    expect(task.concurrencyKey).toBeUndefined()
    expect(slots.getCount(concurrencyKey)).toBe(0)
  })

  test("should prevent double completion and double release", async () => {
    // #given - one slot held for the task's key
    const slots = getConcurrencyManager(manager)
    await slots.acquire(concurrencyKey)
    const task = buildRunningTask()

    // #when - completion is attempted twice in a row
    await tryCompleteTaskForTest(manager, task)
    const secondAttempt = await tryCompleteTaskForTest(manager, task)

    // #then - the second attempt is refused and the count stays at zero
    expect(secondAttempt).toBe(false)
    expect(task.status).toBe("completed")
    expect(slots.getCount(concurrencyKey)).toBe(0)
  })
})
describe("BackgroundManager.trackTask", () => {
  let manager: BackgroundManager

  beforeEach(() => {
    // #given
    manager = createBackgroundManager()
    stubNotifyParentSession(manager)
  })

  afterEach(() => {
    manager.shutdown()
  })

  test("should not double acquire on duplicate registration", async () => {
    // #given - one registration payload that will be submitted twice
    const registration = {
      taskId: "task-1",
      sessionID: "session-1",
      parentSessionID: "parent-session",
      description: "external task",
      agent: "sisyphus_task",
      concurrencyKey: "external-key",
    }

    // #when - the same task is tracked two times, one after the other
    for (let attempt = 0; attempt < 2; attempt++) {
      await manager.trackTask(registration)
    }

    // #then - a single slot is held and a single task is stored
    const slots = getConcurrencyManager(manager)
    expect(slots.getCount("external-key")).toBe(1)
    expect(getTaskMap(manager).size).toBe(1)
  })
})
describe("BackgroundManager.resume concurrency key", () => {
  let manager: BackgroundManager

  beforeEach(() => {
    // #given
    manager = createBackgroundManager()
    stubNotifyParentSession(manager)
  })

  afterEach(() => {
    manager.shutdown()
  })

  test("should re-acquire using external task concurrency key", async () => {
    // #given - a tracked task with its own concurrency key, then completed
    const tracked = await manager.trackTask({
      taskId: "task-1",
      sessionID: "session-1",
      parentSessionID: "parent-session",
      description: "external task",
      agent: "sisyphus_task",
      concurrencyKey: "external-key",
    })
    await tryCompleteTaskForTest(manager, tracked)

    // #when - the completed task is resumed under a different parent
    const resumeRequest = {
      sessionId: "session-1",
      prompt: "resume",
      parentSessionID: "parent-session-2",
      parentMessageID: "msg-2",
    }
    await manager.resume(resumeRequest)

    // #then - the slot is held again under the original key
    const slots = getConcurrencyManager(manager)
    expect(slots.getCount("external-key")).toBe(1)
    expect(tracked.concurrencyKey).toBe("external-key")
  })
})
describe("BackgroundManager process cleanup", () => {
  test("should remove listeners after last shutdown", () => {
    // #given - listener counts before any manager exists, then two managers
    const events = getCleanupSignals()
    const before = getListenerCounts(events)
    const first = createBackgroundManager()
    const second = createBackgroundManager()

    // #when - snapshot the counts after creation and after each shutdown
    const withBothAlive = getListenerCounts(events)
    first.shutdown()
    const withOneAlive = getListenerCounts(events)
    second.shutdown()
    const withNoneAlive = getListenerCounts(events)

    // #then - exactly one listener per event while any manager is alive,
    // and back to the starting count once the last one has shut down
    events.forEach((event) => {
      expect(withBothAlive[event]).toBe(before[event] + 1)
      expect(withOneAlive[event]).toBe(before[event] + 1)
      expect(withNoneAlive[event]).toBe(before[event])
    })
  })
})

View File

@ -18,8 +18,11 @@ import { join } from "node:path"
const TASK_TTL_MS = 30 * 60 * 1000 const TASK_TTL_MS = 30 * 60 * 1000
const MIN_STABILITY_TIME_MS = 10 * 1000 // Must run at least 10s before stability detection kicks in const MIN_STABILITY_TIME_MS = 10 * 1000 // Must run at least 10s before stability detection kicks in
type ProcessCleanupEvent = NodeJS.Signals | "beforeExit" | "exit"
type OpencodeClient = PluginInput["client"] type OpencodeClient = PluginInput["client"]
interface MessagePartInfo { interface MessagePartInfo {
sessionID?: string sessionID?: string
type?: string type?: string
@ -45,6 +48,10 @@ interface Todo {
} }
export class BackgroundManager { export class BackgroundManager {
private static cleanupManagers = new Set<BackgroundManager>()
private static cleanupRegistered = false
private static cleanupHandlers = new Map<ProcessCleanupEvent, () => void>()
private tasks: Map<string, BackgroundTask> private tasks: Map<string, BackgroundTask>
private notifications: Map<string, BackgroundTask[]> private notifications: Map<string, BackgroundTask[]>
private pendingByParent: Map<string, Set<string>> // Track pending tasks per parent for batching private pendingByParent: Map<string, Set<string>> // Track pending tasks per parent for batching
@ -52,6 +59,8 @@ export class BackgroundManager {
private directory: string private directory: string
private pollingInterval?: ReturnType<typeof setInterval> private pollingInterval?: ReturnType<typeof setInterval>
private concurrencyManager: ConcurrencyManager private concurrencyManager: ConcurrencyManager
private shutdownTriggered = false
constructor(ctx: PluginInput, config?: BackgroundTaskConfig) { constructor(ctx: PluginInput, config?: BackgroundTaskConfig) {
this.tasks = new Map() this.tasks = new Map()
@ -60,6 +69,7 @@ export class BackgroundManager {
this.client = ctx.client this.client = ctx.client
this.directory = ctx.directory this.directory = ctx.directory
this.concurrencyManager = new ConcurrencyManager(config) this.concurrencyManager = new ConcurrencyManager(config)
this.registerProcessCleanup()
} }
async launch(input: LaunchInput): Promise<BackgroundTask> { async launch(input: LaunchInput): Promise<BackgroundTask> {
@ -126,8 +136,10 @@ export class BackgroundManager {
parentAgent: input.parentAgent, parentAgent: input.parentAgent,
model: input.model, model: input.model,
concurrencyKey, concurrencyKey,
concurrencyGroup: concurrencyKey,
} }
this.tasks.set(task.id, task) this.tasks.set(task.id, task)
this.startPolling() this.startPolling()
@ -186,8 +198,9 @@ export class BackgroundManager {
existingTask.completedAt = new Date() existingTask.completedAt = new Date()
if (existingTask.concurrencyKey) { if (existingTask.concurrencyKey) {
this.concurrencyManager.release(existingTask.concurrencyKey) this.concurrencyManager.release(existingTask.concurrencyKey)
existingTask.concurrencyKey = undefined // Prevent double-release existingTask.concurrencyKey = undefined
} }
this.markForNotification(existingTask) this.markForNotification(existingTask)
this.notifyParentSession(existingTask).catch(err => { this.notifyParentSession(existingTask).catch(err => {
log("[background-agent] Failed to notify on error:", err) log("[background-agent] Failed to notify on error:", err)
@ -235,17 +248,60 @@ export class BackgroundManager {
} }
/** /**
* Register an external task (e.g., from sisyphus_task) for notification tracking. * Track a task created elsewhere (e.g., from sisyphus_task) for notification tracking.
* This allows tasks created by external tools to receive the same toast/prompt notifications. * This allows tasks created by other tools to receive the same toast/prompt notifications.
*/ */
registerExternalTask(input: { async trackTask(input: {
taskId: string taskId: string
sessionID: string sessionID: string
parentSessionID: string parentSessionID: string
description: string description: string
agent?: string agent?: string
parentAgent?: string parentAgent?: string
}): BackgroundTask { concurrencyKey?: string
}): Promise<BackgroundTask> {
const existingTask = this.tasks.get(input.taskId)
if (existingTask) {
// P2 fix: Clean up old parent's pending set BEFORE changing parent
// Otherwise cleanupPendingByParent would use the new parent ID
const parentChanged = input.parentSessionID !== existingTask.parentSessionID
if (parentChanged) {
this.cleanupPendingByParent(existingTask) // Clean from OLD parent
existingTask.parentSessionID = input.parentSessionID
}
if (input.parentAgent !== undefined) {
existingTask.parentAgent = input.parentAgent
}
if (!existingTask.concurrencyGroup) {
existingTask.concurrencyGroup = input.concurrencyKey ?? existingTask.agent
}
subagentSessions.add(existingTask.sessionID)
this.startPolling()
// Track for batched notifications only if task is still running
// Don't add stale entries for completed tasks
if (existingTask.status === "running") {
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
pending.add(existingTask.id)
this.pendingByParent.set(input.parentSessionID, pending)
} else if (!parentChanged) {
// Only clean up if parent didn't change (already cleaned above if it did)
this.cleanupPendingByParent(existingTask)
}
log("[background-agent] External task already registered:", { taskId: existingTask.id, sessionID: existingTask.sessionID, status: existingTask.status })
return existingTask
}
const concurrencyGroup = input.concurrencyKey ?? input.agent ?? "sisyphus_task"
// Acquire concurrency slot if a key is provided
if (input.concurrencyKey) {
await this.concurrencyManager.acquire(input.concurrencyKey)
}
const task: BackgroundTask = { const task: BackgroundTask = {
id: input.taskId, id: input.taskId,
sessionID: input.sessionID, sessionID: input.sessionID,
@ -261,12 +317,15 @@ export class BackgroundManager {
lastUpdate: new Date(), lastUpdate: new Date(),
}, },
parentAgent: input.parentAgent, parentAgent: input.parentAgent,
concurrencyKey: input.concurrencyKey,
concurrencyGroup,
} }
this.tasks.set(task.id, task) this.tasks.set(task.id, task)
subagentSessions.add(input.sessionID) subagentSessions.add(input.sessionID)
this.startPolling() this.startPolling()
// Track for batched notifications (external tasks need tracking too) // Track for batched notifications (external tasks need tracking too)
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
pending.add(task.id) pending.add(task.id)
@ -283,6 +342,21 @@ export class BackgroundManager {
throw new Error(`Task not found for session: ${input.sessionId}`) throw new Error(`Task not found for session: ${input.sessionId}`)
} }
if (existingTask.status === "running") {
log("[background-agent] Resume skipped - task already running:", {
taskId: existingTask.id,
sessionID: existingTask.sessionID,
})
return existingTask
}
// Re-acquire concurrency using the persisted concurrency group
const concurrencyKey = existingTask.concurrencyGroup ?? existingTask.agent
await this.concurrencyManager.acquire(concurrencyKey)
existingTask.concurrencyKey = concurrencyKey
existingTask.concurrencyGroup = concurrencyKey
existingTask.status = "running" existingTask.status = "running"
existingTask.completedAt = undefined existingTask.completedAt = undefined
existingTask.error = undefined existingTask.error = undefined
@ -344,10 +418,11 @@ export class BackgroundManager {
const errorMessage = error instanceof Error ? error.message : String(error) const errorMessage = error instanceof Error ? error.message : String(error)
existingTask.error = errorMessage existingTask.error = errorMessage
existingTask.completedAt = new Date() existingTask.completedAt = new Date()
// Release concurrency on resume error (matches launch error handler)
// Release concurrency on error to prevent slot leaks
if (existingTask.concurrencyKey) { if (existingTask.concurrencyKey) {
this.concurrencyManager.release(existingTask.concurrencyKey) this.concurrencyManager.release(existingTask.concurrencyKey)
existingTask.concurrencyKey = undefined // Prevent double-release existingTask.concurrencyKey = undefined
} }
this.markForNotification(existingTask) this.markForNotification(existingTask)
this.notifyParentSession(existingTask).catch(err => { this.notifyParentSession(existingTask).catch(err => {
@ -417,29 +492,31 @@ export class BackgroundManager {
// Edge guard: Verify session has actual assistant output before completing // Edge guard: Verify session has actual assistant output before completing
this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => { this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => {
// Re-check status after async operation (could have been completed by polling)
if (task.status !== "running") {
log("[background-agent] Task status changed during validation, skipping:", { taskId: task.id, status: task.status })
return
}
if (!hasValidOutput) { if (!hasValidOutput) {
log("[background-agent] Session.idle but no valid output yet, waiting:", task.id) log("[background-agent] Session.idle but no valid output yet, waiting:", task.id)
return return
} }
const hasIncompleteTodos = await this.checkSessionTodos(sessionID) const hasIncompleteTodos = await this.checkSessionTodos(sessionID)
// Re-check status after async operation again
if (task.status !== "running") {
log("[background-agent] Task status changed during todo check, skipping:", { taskId: task.id, status: task.status })
return
}
if (hasIncompleteTodos) { if (hasIncompleteTodos) {
log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id) log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id)
return return
} }
task.status = "completed" await this.tryCompleteTask(task, "session.idle event")
task.completedAt = new Date()
// Release concurrency immediately on completion
if (task.concurrencyKey) {
this.concurrencyManager.release(task.concurrencyKey)
task.concurrencyKey = undefined // Prevent double-release
}
// Clean up pendingByParent to prevent stale entries
this.cleanupPendingByParent(task)
this.markForNotification(task)
await this.notifyParentSession(task)
log("[background-agent] Task completed via session.idle event:", task.id)
}).catch(err => { }).catch(err => {
log("[background-agent] Error in session.idle handler:", err) log("[background-agent] Error in session.idle handler:", err)
}) })
@ -461,7 +538,7 @@ export class BackgroundManager {
if (task.concurrencyKey) { if (task.concurrencyKey) {
this.concurrencyManager.release(task.concurrencyKey) this.concurrencyManager.release(task.concurrencyKey)
task.concurrencyKey = undefined // Prevent double-release task.concurrencyKey = undefined
} }
// Clean up pendingByParent to prevent stale entries // Clean up pendingByParent to prevent stale entries
this.cleanupPendingByParent(task) this.cleanupPendingByParent(task)
@ -587,12 +664,48 @@ export class BackgroundManager {
} }
} }
cleanup(): void { private registerProcessCleanup(): void {
this.stopPolling() BackgroundManager.cleanupManagers.add(this)
this.tasks.clear()
this.notifications.clear() if (BackgroundManager.cleanupRegistered) return
this.pendingByParent.clear() BackgroundManager.cleanupRegistered = true
const cleanupAll = () => {
for (const manager of BackgroundManager.cleanupManagers) {
try {
manager.shutdown()
} catch (error) {
log("[background-agent] Error during shutdown cleanup:", error)
} }
}
}
const registerSignal = (signal: ProcessCleanupEvent, exitAfter: boolean): void => {
const listener = registerProcessSignal(signal, cleanupAll, exitAfter)
BackgroundManager.cleanupHandlers.set(signal, listener)
}
registerSignal("SIGINT", true)
registerSignal("SIGTERM", true)
if (process.platform === "win32") {
registerSignal("SIGBREAK", true)
}
registerSignal("beforeExit", false)
registerSignal("exit", false)
}
/**
 * Remove this manager from the shared cleanup set. Once no managers remain,
 * detach every stored process listener and reset the static registration
 * state.
 */
private unregisterProcessCleanup(): void {
  const managers = BackgroundManager.cleanupManagers
  managers.delete(this)

  // Other managers still rely on the shared listeners - leave them in place.
  if (managers.size > 0) return

  const handlers = BackgroundManager.cleanupHandlers
  handlers.forEach((listener, signal) => {
    process.off(signal, listener)
  })
  handlers.clear()
  BackgroundManager.cleanupRegistered = false
}
/** /**
* Get all running tasks (for compaction hook) * Get all running tasks (for compaction hook)
@ -608,12 +721,44 @@ cleanup(): void {
return Array.from(this.tasks.values()).filter(t => t.status !== "running") return Array.from(this.tasks.values()).filter(t => t.status !== "running")
} }
private async notifyParentSession(task: BackgroundTask): Promise<void> { /**
* Safely complete a task with race condition protection.
* Returns true if task was successfully completed, false if already completed by another path.
*/
private async tryCompleteTask(task: BackgroundTask, source: string): Promise<boolean> {
// Guard: Check if task is still running (could have been completed by another path)
if (task.status !== "running") {
log("[background-agent] Task already completed, skipping:", { taskId: task.id, status: task.status, source })
return false
}
// Atomically mark as completed to prevent race conditions
task.status = "completed"
task.completedAt = new Date()
// Release concurrency BEFORE any async operations to prevent slot leaks
if (task.concurrencyKey) { if (task.concurrencyKey) {
this.concurrencyManager.release(task.concurrencyKey) this.concurrencyManager.release(task.concurrencyKey)
task.concurrencyKey = undefined task.concurrencyKey = undefined
} }
this.markForNotification(task)
try {
await this.notifyParentSession(task)
log(`[background-agent] Task completed via ${source}:`, task.id)
} catch (err) {
log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err })
// Concurrency already released, notification failed but task is complete
}
return true
}
private async notifyParentSession(task: BackgroundTask): Promise<void> {
// Note: Callers must release concurrency before calling this method
// to ensure slots are freed even if notification fails
const duration = this.formatDuration(task.startedAt, task.completedAt) const duration = this.formatDuration(task.startedAt, task.completedAt)
log("[background-agent] notifyParentSession called for task:", task.id) log("[background-agent] notifyParentSession called for task:", task.id)
@ -727,10 +872,12 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
const taskId = task.id const taskId = task.id
setTimeout(() => { setTimeout(() => {
// Concurrency already released at completion - just cleanup notifications and task // Guard: Only delete if task still exists (could have been deleted by session.deleted event)
if (this.tasks.has(taskId)) {
this.clearNotificationsForTask(taskId) this.clearNotificationsForTask(taskId)
this.tasks.delete(taskId) this.tasks.delete(taskId)
log("[background-agent] Removed completed task from memory:", taskId) log("[background-agent] Removed completed task from memory:", taskId)
}
}, 5 * 60 * 1000) }, 5 * 60 * 1000)
} }
@ -767,7 +914,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
task.completedAt = new Date() task.completedAt = new Date()
if (task.concurrencyKey) { if (task.concurrencyKey) {
this.concurrencyManager.release(task.concurrencyKey) this.concurrencyManager.release(task.concurrencyKey)
task.concurrencyKey = undefined // Prevent double-release task.concurrencyKey = undefined
} }
// Clean up pendingByParent to prevent stale entries // Clean up pendingByParent to prevent stale entries
this.cleanupPendingByParent(task) this.cleanupPendingByParent(task)
@ -803,7 +950,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
for (const task of this.tasks.values()) { for (const task of this.tasks.values()) {
if (task.status !== "running") continue if (task.status !== "running") continue
try { try {
const sessionStatus = allStatuses[task.sessionID] const sessionStatus = allStatuses[task.sessionID]
// Don't skip if session not in status - fall through to message-based detection // Don't skip if session not in status - fall through to message-based detection
@ -815,24 +962,16 @@ try {
continue continue
} }
// Re-check status after async operation
if (task.status !== "running") continue
const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID) const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID)
if (hasIncompleteTodos) { if (hasIncompleteTodos) {
log("[background-agent] Task has incomplete todos via polling, waiting:", task.id) log("[background-agent] Task has incomplete todos via polling, waiting:", task.id)
continue continue
} }
task.status = "completed" await this.tryCompleteTask(task, "polling (idle status)")
task.completedAt = new Date()
// Release concurrency immediately on completion
if (task.concurrencyKey) {
this.concurrencyManager.release(task.concurrencyKey)
task.concurrencyKey = undefined // Prevent double-release
}
// Clean up pendingByParent to prevent stale entries
this.cleanupPendingByParent(task)
this.markForNotification(task)
await this.notifyParentSession(task)
log("[background-agent] Task completed via polling:", task.id)
continue continue
} }
@ -872,7 +1011,7 @@ try {
task.progress.toolCalls = toolCalls task.progress.toolCalls = toolCalls
task.progress.lastTool = lastTool task.progress.lastTool = lastTool
task.progress.lastUpdate = new Date() task.progress.lastUpdate = new Date()
if (lastMessage) { if (lastMessage) {
task.progress.lastMessage = lastMessage task.progress.lastMessage = lastMessage
task.progress.lastMessageAt = new Date() task.progress.lastMessageAt = new Date()
} }
@ -892,20 +1031,12 @@ if (lastMessage) {
continue continue
} }
// Re-check status after async operation
if (task.status !== "running") continue
const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID) const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID)
if (!hasIncompleteTodos) { if (!hasIncompleteTodos) {
task.status = "completed" await this.tryCompleteTask(task, "stability detection")
task.completedAt = new Date()
// Release concurrency immediately on completion
if (task.concurrencyKey) {
this.concurrencyManager.release(task.concurrencyKey)
task.concurrencyKey = undefined // Prevent double-release
}
// Clean up pendingByParent to prevent stale entries
this.cleanupPendingByParent(task)
this.markForNotification(task)
await this.notifyParentSession(task)
log("[background-agent] Task completed via stability detection:", task.id)
continue continue
} }
} }
@ -924,8 +1055,53 @@ if (lastMessage) {
this.stopPolling() this.stopPolling()
} }
} }
/**
 * Gracefully tear down the manager.
 *
 * Idempotent: only the first call does any work; later calls return
 * immediately because `shutdownTriggered` is already set.
 *
 * Order of operations:
 *  1. stop the polling loop,
 *  2. release the concurrency slot held by every tracked task,
 *  3. clear the concurrency manager (cancels any remaining waiters),
 *  4. drop all in-memory bookkeeping (tasks, notifications, pending-by-parent),
 *  5. unregister the process cleanup hooks.
 *
 * Should be called when the plugin is unloaded.
 */
shutdown(): void {
  if (this.shutdownTriggered) {
    return
  }
  this.shutdownTriggered = true

  log("[background-agent] Shutting down BackgroundManager")
  this.stopPolling()

  // Slots must be handed back before the concurrency manager is cleared
  for (const tracked of this.tasks.values()) {
    const slotKey = tracked.concurrencyKey
    if (!slotKey) continue
    this.concurrencyManager.release(slotKey)
    // Forget the key so the slot cannot be released a second time
    tracked.concurrencyKey = undefined
  }

  // Wipe remaining state; clearing the concurrency manager cancels leftover waiters
  this.concurrencyManager.clear()
  this.tasks.clear()
  this.notifications.clear()
  this.pendingByParent.clear()

  this.unregisterProcessCleanup()
  log("[background-agent] Shutdown complete")
}
} }
/**
 * Attach a cleanup handler to a process-level event.
 *
 * @param signal - Event name passed to `process.on`.
 * @param handler - Cleanup callback invoked each time the event fires.
 * @param exitAfter - When true, the process exits with code 0 right after
 *   `handler` returns.
 * @returns The exact listener function that was registered on `process`
 *   (presumably kept by the caller so it can be removed later).
 */
function registerProcessSignal(
  signal: ProcessCleanupEvent,
  handler: () => void,
  exitAfter: boolean
): () => void {
  function onSignal(): void {
    handler()
    if (!exitAfter) return
    process.exit(0)
  }

  process.on(signal, onSignal)
  return onSignal
}
function getMessageDir(sessionID: string): string | null { function getMessageDir(sessionID: string): string | null {
if (!existsSync(MESSAGE_STORAGE)) return null if (!existsSync(MESSAGE_STORAGE)) return null

View File

@ -28,10 +28,13 @@ export interface BackgroundTask {
progress?: TaskProgress progress?: TaskProgress
parentModel?: { providerID: string; modelID: string } parentModel?: { providerID: string; modelID: string }
model?: { providerID: string; modelID: string; variant?: string } model?: { providerID: string; modelID: string; variant?: string }
/** Agent name used for concurrency tracking */ /** Active concurrency slot key */
concurrencyKey?: string concurrencyKey?: string
/** Persistent key for re-acquiring concurrency on resume */
concurrencyGroup?: string
/** Parent session's agent name for notification */ /** Parent session's agent name for notification */
parentAgent?: string parentAgent?: string
/** Last message count for stability detection */ /** Last message count for stability detection */
lastMsgCount?: number lastMsgCount?: number
/** Number of consecutive polls with stable message count */ /** Number of consecutive polls with stable message count */