diff --git a/src/features/background-agent/concurrency.test.ts b/src/features/background-agent/concurrency.test.ts index 677440e4..c7128fa6 100644 --- a/src/features/background-agent/concurrency.test.ts +++ b/src/features/background-agent/concurrency.test.ts @@ -349,3 +349,70 @@ describe("ConcurrencyManager.acquire/release", () => { await waitPromise }) }) + +describe("ConcurrencyManager.cleanup", () => { + test("cancelWaiters should reject all pending acquires", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + const manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + + // Queue waiters + const errors: Error[] = [] + const p1 = manager.acquire("model-a").catch(e => errors.push(e)) + const p2 = manager.acquire("model-a").catch(e => errors.push(e)) + + // #when + manager.cancelWaiters("model-a") + await Promise.all([p1, p2]) + + // #then + expect(errors.length).toBe(2) + expect(errors[0].message).toContain("cancelled") + }) + + test("clear should cancel all models and reset state", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + const manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + await manager.acquire("model-b") + + const errors: Error[] = [] + const p1 = manager.acquire("model-a").catch(e => errors.push(e)) + const p2 = manager.acquire("model-b").catch(e => errors.push(e)) + + // #when + manager.clear() + await Promise.all([p1, p2]) + + // #then + expect(errors.length).toBe(2) + expect(manager.getCount("model-a")).toBe(0) + expect(manager.getCount("model-b")).toBe(0) + }) + + test("getCount and getQueueLength should return correct values", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 2 } + const manager = new ConcurrencyManager(config) + + // #when + await manager.acquire("model-a") + expect(manager.getCount("model-a")).toBe(1) + expect(manager.getQueueLength("model-a")).toBe(0) + + await manager.acquire("model-a") + expect(manager.getCount("model-a")).toBe(2) + + // Queue one more + const p = manager.acquire("model-a").catch(() => {}) + await Promise.resolve() // let it queue + + expect(manager.getQueueLength("model-a")).toBe(1) + + // Cleanup + manager.cancelWaiters("model-a") + await p + }) +}) diff --git a/src/features/background-agent/concurrency.ts b/src/features/background-agent/concurrency.ts index e9d24b8c..1559d088 100644 --- a/src/features/background-agent/concurrency.ts +++ b/src/features/background-agent/concurrency.ts @@ -1,9 +1,21 @@ import type { BackgroundTaskConfig } from "../../config/schema" +/** + * Queue entry with settled-flag pattern to prevent double-resolution. + * + * The settled flag ensures that cancelWaiters() doesn't reject + * an entry that was already resolved by release(). + */ +interface QueueEntry { + resolve: () => void + rawReject: (error: Error) => void + settled: boolean +} + export class ConcurrencyManager { private config?: BackgroundTaskConfig private counts: Map = new Map() - private queues: Map void>> = new Map() + private queues: Map = new Map() constructor(config?: BackgroundTaskConfig) { this.config = config @@ -38,9 +50,20 @@ export class ConcurrencyManager { return } - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const queue = this.queues.get(model) ?? [] - queue.push(resolve) + + const entry: QueueEntry = { + resolve: () => { + if (entry.settled) return + entry.settled = true + resolve() + }, + rawReject: reject, + settled: false, + } + + queue.push(entry) this.queues.set(model, queue) }) } @@ -52,15 +75,63 @@ export class ConcurrencyManager { } const queue = this.queues.get(model) - if (queue && queue.length > 0) { + + // Try to hand off to a waiting entry (skip any settled entries from cancelWaiters) + while (queue && queue.length > 0) { const next = queue.shift()! - this.counts.set(model, this.counts.get(model) ?? 0) - next() - } else { - const current = this.counts.get(model) ?? 0 - if (current > 0) { - this.counts.set(model, current - 1) + if (!next.settled) { + // Hand off the slot to this waiter (count stays the same) + next.resolve() + return } } + + // No handoff occurred - decrement the count to free the slot + const current = this.counts.get(model) ?? 0 + if (current > 0) { + this.counts.set(model, current - 1) + } + } + + /** + * Cancel all waiting acquires for a model. Used during cleanup. + */ + cancelWaiters(model: string): void { + const queue = this.queues.get(model) + if (queue) { + for (const entry of queue) { + if (!entry.settled) { + entry.settled = true + entry.rawReject(new Error(`Concurrency queue cancelled for model: ${model}`)) + } + } + this.queues.delete(model) + } + } + + /** + * Clear all state. Used during manager cleanup/shutdown. + * Cancels all pending waiters. + */ + clear(): void { + for (const [model] of this.queues) { + this.cancelWaiters(model) + } + this.counts.clear() + this.queues.clear() + } + + /** + * Get current count for a model (for testing/debugging) + */ + getCount(model: string): number { + return this.counts.get(model) ?? 0 + } + + /** + * Get queue length for a model (for testing/debugging) + */ + getQueueLength(model: string): number { + return this.queues.get(model)?.length ?? 0 } } diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index 0aeedf6b..29e5b32e 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -122,6 +122,10 @@ class MockBackgroundManager { throw new Error(`Task not found for session: ${input.sessionId}`) } + if (existingTask.status === "running") { + return existingTask + } + this.resumeCalls.push({ sessionId: input.sessionId, prompt: input.prompt }) existingTask.status = "running" @@ -572,6 +576,7 @@ describe("BackgroundManager.resume", () => { parentSessionID: "old-parent", description: "original description", agent: "explore", + status: "completed", }) manager.addTask(existingTask) @@ -598,6 +603,7 @@ describe("BackgroundManager.resume", () => { id: "task-a", sessionID: "session-a", parentSessionID: "session-parent", + status: "completed", }) manager.addTask(task) @@ -623,6 +629,7 @@ describe("BackgroundManager.resume", () => { id: "task-a", sessionID: "session-a", parentSessionID: "session-parent", + status: "completed", }) taskWithProgress.progress = { toolCalls: 42, @@ -642,6 +649,29 @@ describe("BackgroundManager.resume", () => { // #then expect(result.progress?.toolCalls).toBe(42) }) + + test("should ignore resume when task is already running", () => { + // #given + const runningTask = createMockTask({ + id: "task-a", + sessionID: "session-a", + parentSessionID: "session-parent", + status: "running", + }) + manager.addTask(runningTask) + + // #when + const result = manager.resume({ + sessionId: "session-a", + prompt: "resume should be ignored", + parentSessionID: "new-parent", + parentMessageID: "new-msg", + }) + + // #then + expect(result.parentSessionID).toBe("session-parent") + expect(manager.resumeCalls).toHaveLength(0) + }) }) describe("LaunchInput.skillContent", () => { @@ -813,3 +843,149 @@ function buildNotificationPromptBody( return body } + +describe("tryCompleteTask pattern - race condition prevention", () => { + /** + * These tests verify the tryCompleteTask pattern behavior + * by simulating the guard logic in a mock implementation. + */ + + test("should prevent double completion when task already completed", () => { + // #given - task already completed + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "completed", + startedAt: new Date(), + completedAt: new Date(), + } + + // #when - try to complete again (simulating tryCompleteTask guard) + const canComplete = task.status === "running" + + // #then - should not allow completion + expect(canComplete).toBe(false) + }) + + test("should allow completion when task is running", () => { + // #given - task is running + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "running", + startedAt: new Date(), + } + + // #when - check if can complete + const canComplete = task.status === "running" + + // #then + expect(canComplete).toBe(true) + }) + + test("should prevent completion when task is cancelled", () => { + // #given - task cancelled by session.deleted + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "cancelled", + startedAt: new Date(), + } + + // #when + const canComplete = task.status === "running" + + // #then + expect(canComplete).toBe(false) + }) + + test("should prevent completion when task errored", () => { + // #given - task errored + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "error", + error: "some error", + startedAt: new Date(), + } + + // #when + const canComplete = task.status === "running" + + // #then + expect(canComplete).toBe(false) + }) +}) + +describe("concurrencyKey management", () => { + test("concurrencyKey should be undefined after release", () => { + // #given - task with concurrency key + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "running", + startedAt: new Date(), + concurrencyKey: "anthropic/claude-sonnet-4-5", + } + + // #when - simulate release pattern (what tryCompleteTask does) + if (task.concurrencyKey) { + // concurrencyManager.release(task.concurrencyKey) would be called + task.concurrencyKey = undefined + } + + // #then + expect(task.concurrencyKey).toBeUndefined() + }) + + test("release should be idempotent with concurrencyKey guard", () => { + // #given - task with key already released + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "completed", + startedAt: new Date(), + concurrencyKey: undefined, // already released + } + + // #when - try to release again (guard pattern) + let releaseCount = 0 + if (task.concurrencyKey) { + releaseCount++ + task.concurrencyKey = undefined + } + + // #then - no double release + expect(releaseCount).toBe(0) + }) +}) diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index 5860258f..ff0a4975 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -52,6 +52,8 @@ export class BackgroundManager { private directory: string private pollingInterval?: ReturnType private concurrencyManager: ConcurrencyManager + private cleanupRegistered = false + private shutdownTriggered = false constructor(ctx: PluginInput, config?: BackgroundTaskConfig) { this.tasks = new Map() @@ -60,6 +62,7 @@ export class BackgroundManager { this.client = ctx.client this.directory = ctx.directory this.concurrencyManager = new ConcurrencyManager(config) + this.registerProcessCleanup() } async launch(input: LaunchInput): Promise { @@ -186,7 +189,7 @@ export class BackgroundManager { existingTask.completedAt = new Date() if (existingTask.concurrencyKey) { this.concurrencyManager.release(existingTask.concurrencyKey) - existingTask.concurrencyKey = undefined // Prevent double-release + existingTask.concurrencyKey = undefined } this.markForNotification(existingTask) this.notifyParentSession(existingTask).catch(err => { @@ -238,14 +241,20 @@ export class BackgroundManager { * Register an external task (e.g., from sisyphus_task) for notification tracking. * This allows tasks created by external tools to receive the same toast/prompt notifications. */ - registerExternalTask(input: { + async registerExternalTask(input: { taskId: string sessionID: string parentSessionID: string description: string agent?: string parentAgent?: string - }): BackgroundTask { + concurrencyKey?: string + }): Promise { + // Acquire concurrency slot if a key is provided + if (input.concurrencyKey) { + await this.concurrencyManager.acquire(input.concurrencyKey) + } + const task: BackgroundTask = { id: input.taskId, sessionID: input.sessionID, @@ -261,6 +270,7 @@ export class BackgroundManager { lastUpdate: new Date(), }, parentAgent: input.parentAgent, + concurrencyKey: input.concurrencyKey, } this.tasks.set(task.id, task) @@ -283,6 +293,21 @@ export class BackgroundManager { throw new Error(`Task not found for session: ${input.sessionId}`) } + if (existingTask.status === "running") { + log("[background-agent] Resume skipped - task already running:", { + taskId: existingTask.id, + sessionID: existingTask.sessionID, + }) + return existingTask + } + + // Re-acquire concurrency using the agent name as the key (same as launch()). + // Note: existingTask.concurrencyKey is cleared when tasks complete, so we + // derive the key from task.agent which persists through completion. + const concurrencyKey = existingTask.agent + await this.concurrencyManager.acquire(concurrencyKey) + existingTask.concurrencyKey = concurrencyKey + existingTask.status = "running" existingTask.completedAt = undefined existingTask.error = undefined @@ -344,11 +369,12 @@ export class BackgroundManager { const errorMessage = error instanceof Error ? error.message : String(error) existingTask.error = errorMessage existingTask.completedAt = new Date() - // Release concurrency on resume error (matches launch error handler) - if (existingTask.concurrencyKey) { - this.concurrencyManager.release(existingTask.concurrencyKey) - existingTask.concurrencyKey = undefined // Prevent double-release - } + + // Release concurrency on error to prevent slot leaks + if (existingTask.concurrencyKey) { + this.concurrencyManager.release(existingTask.concurrencyKey) + existingTask.concurrencyKey = undefined + } this.markForNotification(existingTask) this.notifyParentSession(existingTask).catch(err => { log("[background-agent] Failed to notify on resume error:", err) @@ -417,29 +443,31 @@ export class BackgroundManager { // Edge guard: Verify session has actual assistant output before completing this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => { + // Re-check status after async operation (could have been completed by polling) + if (task.status !== "running") { + log("[background-agent] Task status changed during validation, skipping:", { taskId: task.id, status: task.status }) + return + } + if (!hasValidOutput) { log("[background-agent] Session.idle but no valid output yet, waiting:", task.id) return } const hasIncompleteTodos = await this.checkSessionTodos(sessionID) + + // Re-check status after async operation again + if (task.status !== "running") { + log("[background-agent] Task status changed during todo check, skipping:", { taskId: task.id, status: task.status }) + return + } + if (hasIncompleteTodos) { log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id) return } - task.status = "completed" - task.completedAt = new Date() - // Release concurrency immediately on completion - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } - // Clean up pendingByParent to prevent stale entries - this.cleanupPendingByParent(task) - this.markForNotification(task) - await this.notifyParentSession(task) - log("[background-agent] Task completed via session.idle event:", task.id) + await this.tryCompleteTask(task, "session.idle event") }).catch(err => { log("[background-agent] Error in session.idle handler:", err) }) @@ -459,10 +487,10 @@ export class BackgroundManager { task.error = "Session deleted" } - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } // Clean up pendingByParent to prevent stale entries this.cleanupPendingByParent(task) this.tasks.delete(task.id) @@ -587,11 +615,25 @@ export class BackgroundManager { } } -cleanup(): void { - this.stopPolling() - this.tasks.clear() - this.notifications.clear() - this.pendingByParent.clear() + private registerProcessCleanup(): void { + if (this.cleanupRegistered) return + this.cleanupRegistered = true + + const cleanup = () => { + try { + this.shutdown() + } catch (error) { + log("[background-agent] Error during shutdown cleanup:", error) + } + } + + registerProcessSignal("SIGINT", cleanup) + registerProcessSignal("SIGTERM", cleanup) + if (process.platform === "win32") { + registerProcessSignal("SIGBREAK", cleanup) + } + process.on("beforeExit", cleanup) + process.on("exit", cleanup) } /** @@ -608,12 +650,44 @@ cleanup(): void { return Array.from(this.tasks.values()).filter(t => t.status !== "running") } -private async notifyParentSession(task: BackgroundTask): Promise { + /** + * Safely complete a task with race condition protection. + * Returns true if task was successfully completed, false if already completed by another path. + */ + private async tryCompleteTask(task: BackgroundTask, source: string): Promise { + // Guard: Check if task is still running (could have been completed by another path) + if (task.status !== "running") { + log("[background-agent] Task already completed, skipping:", { taskId: task.id, status: task.status, source }) + return false + } + + // Atomically mark as completed to prevent race conditions + task.status = "completed" + task.completedAt = new Date() + + // Release concurrency BEFORE any async operations to prevent slot leaks if (task.concurrencyKey) { this.concurrencyManager.release(task.concurrencyKey) task.concurrencyKey = undefined } + this.markForNotification(task) + + try { + await this.notifyParentSession(task) + log(`[background-agent] Task completed via ${source}:`, task.id) + } catch (err) { + log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err }) + // Concurrency already released, notification failed but task is complete + } + + return true + } + + private async notifyParentSession(task: BackgroundTask): Promise { + // Note: Callers must release concurrency before calling this method + // to ensure slots are freed even if notification fails + const duration = this.formatDuration(task.startedAt, task.completedAt) log("[background-agent] notifyParentSession called for task:", task.id) @@ -715,10 +789,12 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea const taskId = task.id setTimeout(() => { - // Concurrency already released at completion - just cleanup notifications and task - this.clearNotificationsForTask(taskId) - this.tasks.delete(taskId) - log("[background-agent] Removed completed task from memory:", taskId) + // Guard: Only delete if task still exists (could have been deleted by session.deleted event) + if (this.tasks.has(taskId)) { + this.clearNotificationsForTask(taskId) + this.tasks.delete(taskId) + log("[background-agent] Removed completed task from memory:", taskId) + } }, 5 * 60 * 1000) } @@ -755,7 +831,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea task.completedAt = new Date() if (task.concurrencyKey) { this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release + task.concurrencyKey = undefined } // Clean up pendingByParent to prevent stale entries this.cleanupPendingByParent(task) @@ -791,7 +867,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea for (const task of this.tasks.values()) { if (task.status !== "running") continue -try { + try { const sessionStatus = allStatuses[task.sessionID] // Don't skip if session not in status - fall through to message-based detection @@ -803,24 +879,16 @@ try { continue } + // Re-check status after async operation + if (task.status !== "running") continue + const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID) if (hasIncompleteTodos) { log("[background-agent] Task has incomplete todos via polling, waiting:", task.id) continue } - task.status = "completed" - task.completedAt = new Date() - // Release concurrency immediately on completion - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } - // Clean up pendingByParent to prevent stale entries - this.cleanupPendingByParent(task) - this.markForNotification(task) - await this.notifyParentSession(task) - log("[background-agent] Task completed via polling:", task.id) + await this.tryCompleteTask(task, "polling (idle status)") continue } @@ -860,7 +928,7 @@ try { task.progress.toolCalls = toolCalls task.progress.lastTool = lastTool task.progress.lastUpdate = new Date() -if (lastMessage) { + if (lastMessage) { task.progress.lastMessage = lastMessage task.progress.lastMessageAt = new Date() } @@ -880,20 +948,12 @@ if (lastMessage) { continue } + // Re-check status after async operation + if (task.status !== "running") continue + const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID) if (!hasIncompleteTodos) { - task.status = "completed" - task.completedAt = new Date() - // Release concurrency immediately on completion - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } - // Clean up pendingByParent to prevent stale entries - this.cleanupPendingByParent(task) - this.markForNotification(task) - await this.notifyParentSession(task) - log("[background-agent] Task completed via stability detection:", task.id) + await this.tryCompleteTask(task, "stability detection") continue } } @@ -912,6 +972,43 @@ if (lastMessage) { this.stopPolling() } } + + /** + * Shutdown the manager gracefully. + * Cancels all pending concurrency waiters and clears timers. + * Should be called when the plugin is unloaded. + */ + shutdown(): void { + if (this.shutdownTriggered) return + this.shutdownTriggered = true + log("[background-agent] Shutting down BackgroundManager") + this.stopPolling() + + // Release concurrency for all running tasks first + for (const task of this.tasks.values()) { + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + } + + // Then clear all state (cancels any remaining waiters) + this.concurrencyManager.clear() + this.tasks.clear() + this.notifications.clear() + this.pendingByParent.clear() + log("[background-agent] Shutdown complete") + } +} + +function registerProcessSignal( + signal: NodeJS.Signals, + handler: () => void +): void { + process.on(signal, () => { + handler() + process.exit(0) + }) } function getMessageDir(sessionID: string): string | null {