diff --git a/src/features/background-agent/concurrency.test.ts b/src/features/background-agent/concurrency.test.ts index 677440e4..c7128fa6 100644 --- a/src/features/background-agent/concurrency.test.ts +++ b/src/features/background-agent/concurrency.test.ts @@ -349,3 +349,70 @@ describe("ConcurrencyManager.acquire/release", () => { await waitPromise }) }) + +describe("ConcurrencyManager.cleanup", () => { + test("cancelWaiters should reject all pending acquires", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + const manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + + // Queue waiters + const errors: Error[] = [] + const p1 = manager.acquire("model-a").catch(e => errors.push(e)) + const p2 = manager.acquire("model-a").catch(e => errors.push(e)) + + // #when + manager.cancelWaiters("model-a") + await Promise.all([p1, p2]) + + // #then + expect(errors.length).toBe(2) + expect(errors[0].message).toContain("cancelled") + }) + + test("clear should cancel all models and reset state", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 1 } + const manager = new ConcurrencyManager(config) + await manager.acquire("model-a") + await manager.acquire("model-b") + + const errors: Error[] = [] + const p1 = manager.acquire("model-a").catch(e => errors.push(e)) + const p2 = manager.acquire("model-b").catch(e => errors.push(e)) + + // #when + manager.clear() + await Promise.all([p1, p2]) + + // #then + expect(errors.length).toBe(2) + expect(manager.getCount("model-a")).toBe(0) + expect(manager.getCount("model-b")).toBe(0) + }) + + test("getCount and getQueueLength should return correct values", async () => { + // #given + const config: BackgroundTaskConfig = { defaultConcurrency: 2 } + const manager = new ConcurrencyManager(config) + + // #when + await manager.acquire("model-a") + expect(manager.getCount("model-a")).toBe(1) + expect(manager.getQueueLength("model-a")).toBe(0) + + await manager.acquire("model-a") + expect(manager.getCount("model-a")).toBe(2) + + // Queue one more + const p = manager.acquire("model-a").catch(() => {}) + await Promise.resolve() // let it queue + + expect(manager.getQueueLength("model-a")).toBe(1) + + // Cleanup + manager.cancelWaiters("model-a") + await p + }) +}) diff --git a/src/features/background-agent/concurrency.ts b/src/features/background-agent/concurrency.ts index e9d24b8c..1559d088 100644 --- a/src/features/background-agent/concurrency.ts +++ b/src/features/background-agent/concurrency.ts @@ -1,9 +1,21 @@ import type { BackgroundTaskConfig } from "../../config/schema" +/** + * Queue entry with settled-flag pattern to prevent double-resolution. + * + * The settled flag ensures that cancelWaiters() doesn't reject + * an entry that was already resolved by release(). + */ +interface QueueEntry { + resolve: () => void + rawReject: (error: Error) => void + settled: boolean +} + export class ConcurrencyManager { private config?: BackgroundTaskConfig private counts: Map = new Map() - private queues: Map void>> = new Map() + private queues: Map = new Map() constructor(config?: BackgroundTaskConfig) { this.config = config @@ -38,9 +50,20 @@ export class ConcurrencyManager { return } - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const queue = this.queues.get(model) ?? [] - queue.push(resolve) + + const entry: QueueEntry = { + resolve: () => { + if (entry.settled) return + entry.settled = true + resolve() + }, + rawReject: reject, + settled: false, + } + + queue.push(entry) this.queues.set(model, queue) }) } @@ -52,15 +75,63 @@ export class ConcurrencyManager { } const queue = this.queues.get(model) - if (queue && queue.length > 0) { + + // Try to hand off to a waiting entry (skip any settled entries from cancelWaiters) + while (queue && queue.length > 0) { const next = queue.shift()! - this.counts.set(model, this.counts.get(model) ?? 0) - next() - } else { - const current = this.counts.get(model) ?? 0 - if (current > 0) { - this.counts.set(model, current - 1) + if (!next.settled) { + // Hand off the slot to this waiter (count stays the same) + next.resolve() + return } } + + // No handoff occurred - decrement the count to free the slot + const current = this.counts.get(model) ?? 0 + if (current > 0) { + this.counts.set(model, current - 1) + } + } + + /** + * Cancel all waiting acquires for a model. Used during cleanup. + */ + cancelWaiters(model: string): void { + const queue = this.queues.get(model) + if (queue) { + for (const entry of queue) { + if (!entry.settled) { + entry.settled = true + entry.rawReject(new Error(`Concurrency queue cancelled for model: ${model}`)) + } + } + this.queues.delete(model) + } + } + + /** + * Clear all state. Used during manager cleanup/shutdown. + * Cancels all pending waiters. + */ + clear(): void { + for (const [model] of this.queues) { + this.cancelWaiters(model) + } + this.counts.clear() + this.queues.clear() + } + + /** + * Get current count for a model (for testing/debugging) + */ + getCount(model: string): number { + return this.counts.get(model) ?? 0 + } + + /** + * Get queue length for a model (for testing/debugging) + */ + getQueueLength(model: string): number { + return this.queues.get(model)?.length ?? 0 } } diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index 0aeedf6b..304cf99d 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -1,5 +1,11 @@ import { describe, test, expect, beforeEach } from "bun:test" +import { afterEach } from "bun:test" +import { tmpdir } from "node:os" +import type { PluginInput } from "@opencode-ai/plugin" import type { BackgroundTask, ResumeInput } from "./types" +import { BackgroundManager } from "./manager" +import { ConcurrencyManager } from "./concurrency" + const TASK_TTL_MS = 30 * 60 * 1000 @@ -122,6 +128,10 @@ class MockBackgroundManager { throw new Error(`Task not found for session: ${input.sessionId}`) } + if (existingTask.status === "running") { + return existingTask + } + this.resumeCalls.push({ sessionId: input.sessionId, prompt: input.prompt }) existingTask.status = "running" @@ -152,6 +162,44 @@ function createMockTask(overrides: Partial & { id: string; sessi } } +function createBackgroundManager(): BackgroundManager { + const client = { + session: { + prompt: async () => ({}), + }, + } + return new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput) +} + +function getConcurrencyManager(manager: BackgroundManager): ConcurrencyManager { + return (manager as unknown as { concurrencyManager: ConcurrencyManager }).concurrencyManager +} + +function getTaskMap(manager: BackgroundManager): Map { + return (manager as unknown as { tasks: Map }).tasks +} + +function stubNotifyParentSession(manager: BackgroundManager): void { + (manager as unknown as { notifyParentSession: (task: BackgroundTask) => Promise }).notifyParentSession = async () => {} +} + +async function tryCompleteTaskForTest(manager: BackgroundManager, task: BackgroundTask): Promise { + return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise }).tryCompleteTask(task, "test") +} + +function getCleanupSignals(): Array { + const signals: Array = ["SIGINT", "SIGTERM", "beforeExit", "exit"] + if (process.platform === "win32") { + signals.push("SIGBREAK") + } + return signals +} + +function getListenerCounts(signals: Array): Record { + return Object.fromEntries(signals.map((signal) => [signal, process.listenerCount(signal)])) +} + + describe("BackgroundManager.getAllDescendantTasks", () => { let manager: MockBackgroundManager @@ -572,6 +620,7 @@ describe("BackgroundManager.resume", () => { parentSessionID: "old-parent", description: "original description", agent: "explore", + status: "completed", }) manager.addTask(existingTask) @@ -598,6 +647,7 @@ describe("BackgroundManager.resume", () => { id: "task-a", sessionID: "session-a", parentSessionID: "session-parent", + status: "completed", }) manager.addTask(task) @@ -623,6 +673,7 @@ describe("BackgroundManager.resume", () => { id: "task-a", sessionID: "session-a", parentSessionID: "session-parent", + status: "completed", }) taskWithProgress.progress = { toolCalls: 42, @@ -642,6 +693,29 @@ describe("BackgroundManager.resume", () => { // #then expect(result.progress?.toolCalls).toBe(42) }) + + test("should ignore resume when task is already running", () => { + // #given + const runningTask = createMockTask({ + id: "task-a", + sessionID: "session-a", + parentSessionID: "session-parent", + status: "running", + }) + manager.addTask(runningTask) + + // #when + const result = manager.resume({ + sessionId: "session-a", + prompt: "resume should be ignored", + parentSessionID: "new-parent", + parentMessageID: "new-msg", + }) + + // #then + expect(result.parentSessionID).toBe("session-parent") + expect(manager.resumeCalls).toHaveLength(0) + }) }) describe("LaunchInput.skillContent", () => { @@ -813,3 +887,176 @@ function buildNotificationPromptBody( return body } + +describe("BackgroundManager.tryCompleteTask", () => { + let manager: BackgroundManager + + beforeEach(() => { + // #given + manager = createBackgroundManager() + stubNotifyParentSession(manager) + }) + + afterEach(() => { + manager.shutdown() + }) + + test("should release concurrency and clear key on completion", async () => { + // #given + const concurrencyKey = "anthropic/claude-opus-4-5" + const concurrencyManager = getConcurrencyManager(manager) + await concurrencyManager.acquire(concurrencyKey) + + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "running", + startedAt: new Date(), + concurrencyKey, + } + + // #when + const completed = await tryCompleteTaskForTest(manager, task) + + // #then + expect(completed).toBe(true) + expect(task.status).toBe("completed") + expect(task.concurrencyKey).toBeUndefined() + expect(concurrencyManager.getCount(concurrencyKey)).toBe(0) + }) + + test("should prevent double completion and double release", async () => { + // #given + const concurrencyKey = "anthropic/claude-opus-4-5" + const concurrencyManager = getConcurrencyManager(manager) + await concurrencyManager.acquire(concurrencyKey) + + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "running", + startedAt: new Date(), + concurrencyKey, + } + + // #when + await tryCompleteTaskForTest(manager, task) + const secondAttempt = await tryCompleteTaskForTest(manager, task) + + // #then + expect(secondAttempt).toBe(false) + expect(task.status).toBe("completed") + expect(concurrencyManager.getCount(concurrencyKey)).toBe(0) + }) +}) + +describe("BackgroundManager.trackTask", () => { + let manager: BackgroundManager + + beforeEach(() => { + // #given + manager = createBackgroundManager() + stubNotifyParentSession(manager) + }) + + afterEach(() => { + manager.shutdown() + }) + + test("should not double acquire on duplicate registration", async () => { + // #given + const input = { + taskId: "task-1", + sessionID: "session-1", + parentSessionID: "parent-session", + description: "external task", + agent: "sisyphus_task", + concurrencyKey: "external-key", + } + + // #when + await manager.trackTask(input) + await manager.trackTask(input) + + // #then + const concurrencyManager = getConcurrencyManager(manager) + expect(concurrencyManager.getCount("external-key")).toBe(1) + expect(getTaskMap(manager).size).toBe(1) + }) +}) + +describe("BackgroundManager.resume concurrency key", () => { + let manager: BackgroundManager + + beforeEach(() => { + // #given + manager = createBackgroundManager() + stubNotifyParentSession(manager) + }) + + afterEach(() => { + manager.shutdown() + }) + + test("should re-acquire using external task concurrency key", async () => { + // #given + const task = await manager.trackTask({ + taskId: "task-1", + sessionID: "session-1", + parentSessionID: "parent-session", + description: "external task", + agent: "sisyphus_task", + concurrencyKey: "external-key", + }) + + await tryCompleteTaskForTest(manager, task) + + // #when + await manager.resume({ + sessionId: "session-1", + prompt: "resume", + parentSessionID: "parent-session-2", + parentMessageID: "msg-2", + }) + + // #then + const concurrencyManager = getConcurrencyManager(manager) + expect(concurrencyManager.getCount("external-key")).toBe(1) + expect(task.concurrencyKey).toBe("external-key") + }) +}) + +describe("BackgroundManager process cleanup", () => { + test("should remove listeners after last shutdown", () => { + // #given + const signals = getCleanupSignals() + const baseline = getListenerCounts(signals) + const managerA = createBackgroundManager() + const managerB = createBackgroundManager() + + // #when + const afterCreate = getListenerCounts(signals) + managerA.shutdown() + const afterFirstShutdown = getListenerCounts(signals) + managerB.shutdown() + const afterSecondShutdown = getListenerCounts(signals) + + // #then + for (const signal of signals) { + expect(afterCreate[signal]).toBe(baseline[signal] + 1) + expect(afterFirstShutdown[signal]).toBe(baseline[signal] + 1) + expect(afterSecondShutdown[signal]).toBe(baseline[signal]) + } + }) +}) + diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index 2abc4cac..16c88d75 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -18,8 +18,11 @@ import { join } from "node:path" const TASK_TTL_MS = 30 * 60 * 1000 const MIN_STABILITY_TIME_MS = 10 * 1000 // Must run at least 10s before stability detection kicks in +type ProcessCleanupEvent = NodeJS.Signals | "beforeExit" | "exit" + type OpencodeClient = PluginInput["client"] + interface MessagePartInfo { sessionID?: string type?: string @@ -45,6 +48,10 @@ interface Todo { } export class BackgroundManager { + private static cleanupManagers = new Set() + private static cleanupRegistered = false + private static cleanupHandlers = new Map void>() + private tasks: Map private notifications: Map private pendingByParent: Map> // Track pending tasks per parent for batching @@ -52,6 +59,8 @@ export class BackgroundManager { private directory: string private pollingInterval?: ReturnType private concurrencyManager: ConcurrencyManager + private shutdownTriggered = false + constructor(ctx: PluginInput, config?: BackgroundTaskConfig) { this.tasks = new Map() @@ -60,6 +69,7 @@ export class BackgroundManager { this.client = ctx.client this.directory = ctx.directory this.concurrencyManager = new ConcurrencyManager(config) + this.registerProcessCleanup() } async launch(input: LaunchInput): Promise { @@ -126,8 +136,10 @@ export class BackgroundManager { parentAgent: input.parentAgent, model: input.model, concurrencyKey, + concurrencyGroup: concurrencyKey, } + this.tasks.set(task.id, task) this.startPolling() @@ -186,8 +198,9 @@ export class BackgroundManager { existingTask.completedAt = new Date() if (existingTask.concurrencyKey) { this.concurrencyManager.release(existingTask.concurrencyKey) - existingTask.concurrencyKey = undefined // Prevent double-release + existingTask.concurrencyKey = undefined } + this.markForNotification(existingTask) this.notifyParentSession(existingTask).catch(err => { log("[background-agent] Failed to notify on error:", err) @@ -235,17 +248,60 @@ export class BackgroundManager { } /** - * Register an external task (e.g., from sisyphus_task) for notification tracking. - * This allows tasks created by external tools to receive the same toast/prompt notifications. + * Track a task created elsewhere (e.g., from sisyphus_task) for notification tracking. + * This allows tasks created by other tools to receive the same toast/prompt notifications. */ - registerExternalTask(input: { + async trackTask(input: { taskId: string sessionID: string parentSessionID: string description: string agent?: string parentAgent?: string - }): BackgroundTask { + concurrencyKey?: string + }): Promise { + const existingTask = this.tasks.get(input.taskId) + if (existingTask) { + // P2 fix: Clean up old parent's pending set BEFORE changing parent + // Otherwise cleanupPendingByParent would use the new parent ID + const parentChanged = input.parentSessionID !== existingTask.parentSessionID + if (parentChanged) { + this.cleanupPendingByParent(existingTask) // Clean from OLD parent + existingTask.parentSessionID = input.parentSessionID + } + if (input.parentAgent !== undefined) { + existingTask.parentAgent = input.parentAgent + } + if (!existingTask.concurrencyGroup) { + existingTask.concurrencyGroup = input.concurrencyKey ?? existingTask.agent + } + + subagentSessions.add(existingTask.sessionID) + this.startPolling() + + // Track for batched notifications only if task is still running + // Don't add stale entries for completed tasks + if (existingTask.status === "running") { + const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() + pending.add(existingTask.id) + this.pendingByParent.set(input.parentSessionID, pending) + } else if (!parentChanged) { + // Only clean up if parent didn't change (already cleaned above if it did) + this.cleanupPendingByParent(existingTask) + } + + log("[background-agent] External task already registered:", { taskId: existingTask.id, sessionID: existingTask.sessionID, status: existingTask.status }) + + return existingTask + } + + const concurrencyGroup = input.concurrencyKey ?? input.agent ?? "sisyphus_task" + + // Acquire concurrency slot if a key is provided + if (input.concurrencyKey) { + await this.concurrencyManager.acquire(input.concurrencyKey) + } + const task: BackgroundTask = { id: input.taskId, sessionID: input.sessionID, @@ -261,12 +317,15 @@ export class BackgroundManager { lastUpdate: new Date(), }, parentAgent: input.parentAgent, + concurrencyKey: input.concurrencyKey, + concurrencyGroup, } this.tasks.set(task.id, task) subagentSessions.add(input.sessionID) this.startPolling() + // Track for batched notifications (external tasks need tracking too) const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() pending.add(task.id) @@ -283,6 +342,21 @@ export class BackgroundManager { throw new Error(`Task not found for session: ${input.sessionId}`) } + if (existingTask.status === "running") { + log("[background-agent] Resume skipped - task already running:", { + taskId: existingTask.id, + sessionID: existingTask.sessionID, + }) + return existingTask + } + + // Re-acquire concurrency using the persisted concurrency group + const concurrencyKey = existingTask.concurrencyGroup ?? existingTask.agent + await this.concurrencyManager.acquire(concurrencyKey) + existingTask.concurrencyKey = concurrencyKey + existingTask.concurrencyGroup = concurrencyKey + + existingTask.status = "running" existingTask.completedAt = undefined existingTask.error = undefined @@ -344,10 +418,11 @@ export class BackgroundManager { const errorMessage = error instanceof Error ? error.message : String(error) existingTask.error = errorMessage existingTask.completedAt = new Date() - // Release concurrency on resume error (matches launch error handler) + + // Release concurrency on error to prevent slot leaks if (existingTask.concurrencyKey) { this.concurrencyManager.release(existingTask.concurrencyKey) - existingTask.concurrencyKey = undefined // Prevent double-release + existingTask.concurrencyKey = undefined } this.markForNotification(existingTask) this.notifyParentSession(existingTask).catch(err => { @@ -417,29 +492,31 @@ export class BackgroundManager { // Edge guard: Verify session has actual assistant output before completing this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => { + // Re-check status after async operation (could have been completed by polling) + if (task.status !== "running") { + log("[background-agent] Task status changed during validation, skipping:", { taskId: task.id, status: task.status }) + return + } + if (!hasValidOutput) { log("[background-agent] Session.idle but no valid output yet, waiting:", task.id) return } const hasIncompleteTodos = await this.checkSessionTodos(sessionID) + + // Re-check status after async operation again + if (task.status !== "running") { + log("[background-agent] Task status changed during todo check, skipping:", { taskId: task.id, status: task.status }) + return + } + if (hasIncompleteTodos) { log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id) return } - task.status = "completed" - task.completedAt = new Date() - // Release concurrency immediately on completion - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } - // Clean up pendingByParent to prevent stale entries - this.cleanupPendingByParent(task) - this.markForNotification(task) - await this.notifyParentSession(task) - log("[background-agent] Task completed via session.idle event:", task.id) + await this.tryCompleteTask(task, "session.idle event") }).catch(err => { log("[background-agent] Error in session.idle handler:", err) }) @@ -459,10 +536,10 @@ export class BackgroundManager { task.error = "Session deleted" } - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } // Clean up pendingByParent to prevent stale entries this.cleanupPendingByParent(task) this.tasks.delete(task.id) @@ -587,13 +664,49 @@ export class BackgroundManager { } } -cleanup(): void { - this.stopPolling() - this.tasks.clear() - this.notifications.clear() - this.pendingByParent.clear() + private registerProcessCleanup(): void { + BackgroundManager.cleanupManagers.add(this) + + if (BackgroundManager.cleanupRegistered) return + BackgroundManager.cleanupRegistered = true + + const cleanupAll = () => { + for (const manager of BackgroundManager.cleanupManagers) { + try { + manager.shutdown() + } catch (error) { + log("[background-agent] Error during shutdown cleanup:", error) + } + } + } + + const registerSignal = (signal: ProcessCleanupEvent, exitAfter: boolean): void => { + const listener = registerProcessSignal(signal, cleanupAll, exitAfter) + BackgroundManager.cleanupHandlers.set(signal, listener) + } + + registerSignal("SIGINT", true) + registerSignal("SIGTERM", true) + if (process.platform === "win32") { + registerSignal("SIGBREAK", true) + } + registerSignal("beforeExit", false) + registerSignal("exit", false) } + private unregisterProcessCleanup(): void { + BackgroundManager.cleanupManagers.delete(this) + + if (BackgroundManager.cleanupManagers.size > 0) return + + for (const [signal, listener] of BackgroundManager.cleanupHandlers.entries()) { + process.off(signal, listener) + } + BackgroundManager.cleanupHandlers.clear() + BackgroundManager.cleanupRegistered = false + } + + /** * Get all running tasks (for compaction hook) */ @@ -608,12 +721,44 @@ cleanup(): void { return Array.from(this.tasks.values()).filter(t => t.status !== "running") } -private async notifyParentSession(task: BackgroundTask): Promise { + /** + * Safely complete a task with race condition protection. + * Returns true if task was successfully completed, false if already completed by another path. + */ + private async tryCompleteTask(task: BackgroundTask, source: string): Promise { + // Guard: Check if task is still running (could have been completed by another path) + if (task.status !== "running") { + log("[background-agent] Task already completed, skipping:", { taskId: task.id, status: task.status, source }) + return false + } + + // Atomically mark as completed to prevent race conditions + task.status = "completed" + task.completedAt = new Date() + + // Release concurrency BEFORE any async operations to prevent slot leaks if (task.concurrencyKey) { this.concurrencyManager.release(task.concurrencyKey) task.concurrencyKey = undefined } + this.markForNotification(task) + + try { + await this.notifyParentSession(task) + log(`[background-agent] Task completed via ${source}:`, task.id) + } catch (err) { + log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err }) + // Concurrency already released, notification failed but task is complete + } + + return true + } + + private async notifyParentSession(task: BackgroundTask): Promise { + // Note: Callers must release concurrency before calling this method + // to ensure slots are freed even if notification fails + const duration = this.formatDuration(task.startedAt, task.completedAt) log("[background-agent] notifyParentSession called for task:", task.id) @@ -727,10 +872,12 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea const taskId = task.id setTimeout(() => { - // Concurrency already released at completion - just cleanup notifications and task - this.clearNotificationsForTask(taskId) - this.tasks.delete(taskId) - log("[background-agent] Removed completed task from memory:", taskId) + // Guard: Only delete if task still exists (could have been deleted by session.deleted event) + if (this.tasks.has(taskId)) { + this.clearNotificationsForTask(taskId) + this.tasks.delete(taskId) + log("[background-agent] Removed completed task from memory:", taskId) + } }, 5 * 60 * 1000) } @@ -767,7 +914,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea task.completedAt = new Date() if (task.concurrencyKey) { this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release + task.concurrencyKey = undefined } // Clean up pendingByParent to prevent stale entries this.cleanupPendingByParent(task) @@ -803,7 +950,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea for (const task of this.tasks.values()) { if (task.status !== "running") continue -try { + try { const sessionStatus = allStatuses[task.sessionID] // Don't skip if session not in status - fall through to message-based detection @@ -815,24 +962,16 @@ try { continue } + // Re-check status after async operation + if (task.status !== "running") continue + const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID) if (hasIncompleteTodos) { log("[background-agent] Task has incomplete todos via polling, waiting:", task.id) continue } - task.status = "completed" - task.completedAt = new Date() - // Release concurrency immediately on completion - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } - // Clean up pendingByParent to prevent stale entries - this.cleanupPendingByParent(task) - this.markForNotification(task) - await this.notifyParentSession(task) - log("[background-agent] Task completed via polling:", task.id) + await this.tryCompleteTask(task, "polling (idle status)") continue } @@ -872,7 +1011,7 @@ try { task.progress.toolCalls = toolCalls task.progress.lastTool = lastTool task.progress.lastUpdate = new Date() -if (lastMessage) { + if (lastMessage) { task.progress.lastMessage = lastMessage task.progress.lastMessageAt = new Date() } @@ -892,20 +1031,12 @@ if (lastMessage) { continue } + // Re-check status after async operation + if (task.status !== "running") continue + const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID) if (!hasIncompleteTodos) { - task.status = "completed" - task.completedAt = new Date() - // Release concurrency immediately on completion - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined // Prevent double-release - } - // Clean up pendingByParent to prevent stale entries - this.cleanupPendingByParent(task) - this.markForNotification(task) - await this.notifyParentSession(task) - log("[background-agent] Task completed via stability detection:", task.id) + await this.tryCompleteTask(task, "stability detection") continue } } @@ -924,8 +1055,53 @@ if (lastMessage) { this.stopPolling() } } + + /** + * Shutdown the manager gracefully. + * Cancels all pending concurrency waiters and clears timers. + * Should be called when the plugin is unloaded. + */ + shutdown(): void { + if (this.shutdownTriggered) return + this.shutdownTriggered = true + log("[background-agent] Shutting down BackgroundManager") + this.stopPolling() + + // Release concurrency for all running tasks first + for (const task of this.tasks.values()) { + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + } + + // Then clear all state (cancels any remaining waiters) + this.concurrencyManager.clear() + this.tasks.clear() + this.notifications.clear() + this.pendingByParent.clear() + this.unregisterProcessCleanup() + log("[background-agent] Shutdown complete") + + } } +function registerProcessSignal( + signal: ProcessCleanupEvent, + handler: () => void, + exitAfter: boolean +): () => void { + const listener = () => { + handler() + if (exitAfter) { + process.exit(0) + } + } + process.on(signal, listener) + return listener +} + + function getMessageDir(sessionID: string): string | null { if (!existsSync(MESSAGE_STORAGE)) return null diff --git a/src/features/background-agent/types.ts b/src/features/background-agent/types.ts index 8c384211..795ca89b 100644 --- a/src/features/background-agent/types.ts +++ b/src/features/background-agent/types.ts @@ -28,10 +28,13 @@ export interface BackgroundTask { progress?: TaskProgress parentModel?: { providerID: string; modelID: string } model?: { providerID: string; modelID: string; variant?: string } - /** Agent name used for concurrency tracking */ + /** Active concurrency slot key */ concurrencyKey?: string + /** Persistent key for re-acquiring concurrency on resume */ + concurrencyGroup?: string /** Parent session's agent name for notification */ parentAgent?: string + /** Last message count for stability detection */ lastMsgCount?: number /** Number of consecutive polls with stable message count */