feat(concurrency): prevent background task races and leaks
Ensure queue waiters settle once, centralize completion with status guards, and release slots before async work so shutdown and cancellations don’t leak concurrency. Internal hardening only.
This commit is contained in:
parent
4c22d6de76
commit
03871262b2
@ -349,3 +349,70 @@ describe("ConcurrencyManager.acquire/release", () => {
|
|||||||
await waitPromise
|
await waitPromise
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe("ConcurrencyManager.cleanup", () => {
|
||||||
|
test("cancelWaiters should reject all pending acquires", async () => {
|
||||||
|
// #given
|
||||||
|
const config: BackgroundTaskConfig = { defaultConcurrency: 1 }
|
||||||
|
const manager = new ConcurrencyManager(config)
|
||||||
|
await manager.acquire("model-a")
|
||||||
|
|
||||||
|
// Queue waiters
|
||||||
|
const errors: Error[] = []
|
||||||
|
const p1 = manager.acquire("model-a").catch(e => errors.push(e))
|
||||||
|
const p2 = manager.acquire("model-a").catch(e => errors.push(e))
|
||||||
|
|
||||||
|
// #when
|
||||||
|
manager.cancelWaiters("model-a")
|
||||||
|
await Promise.all([p1, p2])
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(errors.length).toBe(2)
|
||||||
|
expect(errors[0].message).toContain("cancelled")
|
||||||
|
})
|
||||||
|
|
||||||
|
test("clear should cancel all models and reset state", async () => {
|
||||||
|
// #given
|
||||||
|
const config: BackgroundTaskConfig = { defaultConcurrency: 1 }
|
||||||
|
const manager = new ConcurrencyManager(config)
|
||||||
|
await manager.acquire("model-a")
|
||||||
|
await manager.acquire("model-b")
|
||||||
|
|
||||||
|
const errors: Error[] = []
|
||||||
|
const p1 = manager.acquire("model-a").catch(e => errors.push(e))
|
||||||
|
const p2 = manager.acquire("model-b").catch(e => errors.push(e))
|
||||||
|
|
||||||
|
// #when
|
||||||
|
manager.clear()
|
||||||
|
await Promise.all([p1, p2])
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(errors.length).toBe(2)
|
||||||
|
expect(manager.getCount("model-a")).toBe(0)
|
||||||
|
expect(manager.getCount("model-b")).toBe(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("getCount and getQueueLength should return correct values", async () => {
|
||||||
|
// #given
|
||||||
|
const config: BackgroundTaskConfig = { defaultConcurrency: 2 }
|
||||||
|
const manager = new ConcurrencyManager(config)
|
||||||
|
|
||||||
|
// #when
|
||||||
|
await manager.acquire("model-a")
|
||||||
|
expect(manager.getCount("model-a")).toBe(1)
|
||||||
|
expect(manager.getQueueLength("model-a")).toBe(0)
|
||||||
|
|
||||||
|
await manager.acquire("model-a")
|
||||||
|
expect(manager.getCount("model-a")).toBe(2)
|
||||||
|
|
||||||
|
// Queue one more
|
||||||
|
const p = manager.acquire("model-a").catch(() => {})
|
||||||
|
await Promise.resolve() // let it queue
|
||||||
|
|
||||||
|
expect(manager.getQueueLength("model-a")).toBe(1)
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
manager.cancelWaiters("model-a")
|
||||||
|
await p
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@ -1,9 +1,21 @@
|
|||||||
import type { BackgroundTaskConfig } from "../../config/schema"
|
import type { BackgroundTaskConfig } from "../../config/schema"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue entry with settled-flag pattern to prevent double-resolution.
|
||||||
|
*
|
||||||
|
* The settled flag ensures that cancelWaiters() doesn't reject
|
||||||
|
* an entry that was already resolved by release().
|
||||||
|
*/
|
||||||
|
interface QueueEntry {
|
||||||
|
resolve: () => void
|
||||||
|
rawReject: (error: Error) => void
|
||||||
|
settled: boolean
|
||||||
|
}
|
||||||
|
|
||||||
export class ConcurrencyManager {
|
export class ConcurrencyManager {
|
||||||
private config?: BackgroundTaskConfig
|
private config?: BackgroundTaskConfig
|
||||||
private counts: Map<string, number> = new Map()
|
private counts: Map<string, number> = new Map()
|
||||||
private queues: Map<string, Array<() => void>> = new Map()
|
private queues: Map<string, QueueEntry[]> = new Map()
|
||||||
|
|
||||||
constructor(config?: BackgroundTaskConfig) {
|
constructor(config?: BackgroundTaskConfig) {
|
||||||
this.config = config
|
this.config = config
|
||||||
@ -38,9 +50,20 @@ export class ConcurrencyManager {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Promise<void>((resolve) => {
|
return new Promise<void>((resolve, reject) => {
|
||||||
const queue = this.queues.get(model) ?? []
|
const queue = this.queues.get(model) ?? []
|
||||||
queue.push(resolve)
|
|
||||||
|
const entry: QueueEntry = {
|
||||||
|
resolve: () => {
|
||||||
|
if (entry.settled) return
|
||||||
|
entry.settled = true
|
||||||
|
resolve()
|
||||||
|
},
|
||||||
|
rawReject: reject,
|
||||||
|
settled: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.push(entry)
|
||||||
this.queues.set(model, queue)
|
this.queues.set(model, queue)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -52,15 +75,63 @@ export class ConcurrencyManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const queue = this.queues.get(model)
|
const queue = this.queues.get(model)
|
||||||
if (queue && queue.length > 0) {
|
|
||||||
|
// Try to hand off to a waiting entry (skip any settled entries from cancelWaiters)
|
||||||
|
while (queue && queue.length > 0) {
|
||||||
const next = queue.shift()!
|
const next = queue.shift()!
|
||||||
this.counts.set(model, this.counts.get(model) ?? 0)
|
if (!next.settled) {
|
||||||
next()
|
// Hand off the slot to this waiter (count stays the same)
|
||||||
} else {
|
next.resolve()
|
||||||
const current = this.counts.get(model) ?? 0
|
return
|
||||||
if (current > 0) {
|
|
||||||
this.counts.set(model, current - 1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No handoff occurred - decrement the count to free the slot
|
||||||
|
const current = this.counts.get(model) ?? 0
|
||||||
|
if (current > 0) {
|
||||||
|
this.counts.set(model, current - 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel all waiting acquires for a model. Used during cleanup.
|
||||||
|
*/
|
||||||
|
cancelWaiters(model: string): void {
|
||||||
|
const queue = this.queues.get(model)
|
||||||
|
if (queue) {
|
||||||
|
for (const entry of queue) {
|
||||||
|
if (!entry.settled) {
|
||||||
|
entry.settled = true
|
||||||
|
entry.rawReject(new Error(`Concurrency queue cancelled for model: ${model}`))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.queues.delete(model)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear all state. Used during manager cleanup/shutdown.
|
||||||
|
* Cancels all pending waiters.
|
||||||
|
*/
|
||||||
|
clear(): void {
|
||||||
|
for (const [model] of this.queues) {
|
||||||
|
this.cancelWaiters(model)
|
||||||
|
}
|
||||||
|
this.counts.clear()
|
||||||
|
this.queues.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current count for a model (for testing/debugging)
|
||||||
|
*/
|
||||||
|
getCount(model: string): number {
|
||||||
|
return this.counts.get(model) ?? 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get queue length for a model (for testing/debugging)
|
||||||
|
*/
|
||||||
|
getQueueLength(model: string): number {
|
||||||
|
return this.queues.get(model)?.length ?? 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -122,6 +122,10 @@ class MockBackgroundManager {
|
|||||||
throw new Error(`Task not found for session: ${input.sessionId}`)
|
throw new Error(`Task not found for session: ${input.sessionId}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (existingTask.status === "running") {
|
||||||
|
return existingTask
|
||||||
|
}
|
||||||
|
|
||||||
this.resumeCalls.push({ sessionId: input.sessionId, prompt: input.prompt })
|
this.resumeCalls.push({ sessionId: input.sessionId, prompt: input.prompt })
|
||||||
|
|
||||||
existingTask.status = "running"
|
existingTask.status = "running"
|
||||||
@ -572,6 +576,7 @@ describe("BackgroundManager.resume", () => {
|
|||||||
parentSessionID: "old-parent",
|
parentSessionID: "old-parent",
|
||||||
description: "original description",
|
description: "original description",
|
||||||
agent: "explore",
|
agent: "explore",
|
||||||
|
status: "completed",
|
||||||
})
|
})
|
||||||
manager.addTask(existingTask)
|
manager.addTask(existingTask)
|
||||||
|
|
||||||
@ -598,6 +603,7 @@ describe("BackgroundManager.resume", () => {
|
|||||||
id: "task-a",
|
id: "task-a",
|
||||||
sessionID: "session-a",
|
sessionID: "session-a",
|
||||||
parentSessionID: "session-parent",
|
parentSessionID: "session-parent",
|
||||||
|
status: "completed",
|
||||||
})
|
})
|
||||||
manager.addTask(task)
|
manager.addTask(task)
|
||||||
|
|
||||||
@ -623,6 +629,7 @@ describe("BackgroundManager.resume", () => {
|
|||||||
id: "task-a",
|
id: "task-a",
|
||||||
sessionID: "session-a",
|
sessionID: "session-a",
|
||||||
parentSessionID: "session-parent",
|
parentSessionID: "session-parent",
|
||||||
|
status: "completed",
|
||||||
})
|
})
|
||||||
taskWithProgress.progress = {
|
taskWithProgress.progress = {
|
||||||
toolCalls: 42,
|
toolCalls: 42,
|
||||||
@ -642,6 +649,29 @@ describe("BackgroundManager.resume", () => {
|
|||||||
// #then
|
// #then
|
||||||
expect(result.progress?.toolCalls).toBe(42)
|
expect(result.progress?.toolCalls).toBe(42)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("should ignore resume when task is already running", () => {
|
||||||
|
// #given
|
||||||
|
const runningTask = createMockTask({
|
||||||
|
id: "task-a",
|
||||||
|
sessionID: "session-a",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
status: "running",
|
||||||
|
})
|
||||||
|
manager.addTask(runningTask)
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const result = manager.resume({
|
||||||
|
sessionId: "session-a",
|
||||||
|
prompt: "resume should be ignored",
|
||||||
|
parentSessionID: "new-parent",
|
||||||
|
parentMessageID: "new-msg",
|
||||||
|
})
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(result.parentSessionID).toBe("session-parent")
|
||||||
|
expect(manager.resumeCalls).toHaveLength(0)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe("LaunchInput.skillContent", () => {
|
describe("LaunchInput.skillContent", () => {
|
||||||
@ -813,3 +843,149 @@ function buildNotificationPromptBody(
|
|||||||
|
|
||||||
return body
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
describe("tryCompleteTask pattern - race condition prevention", () => {
|
||||||
|
/**
|
||||||
|
* These tests verify the tryCompleteTask pattern behavior
|
||||||
|
* by simulating the guard logic in a mock implementation.
|
||||||
|
*/
|
||||||
|
|
||||||
|
test("should prevent double completion when task already completed", () => {
|
||||||
|
// #given - task already completed
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test task",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "completed",
|
||||||
|
startedAt: new Date(),
|
||||||
|
completedAt: new Date(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when - try to complete again (simulating tryCompleteTask guard)
|
||||||
|
const canComplete = task.status === "running"
|
||||||
|
|
||||||
|
// #then - should not allow completion
|
||||||
|
expect(canComplete).toBe(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should allow completion when task is running", () => {
|
||||||
|
// #given - task is running
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test task",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when - check if can complete
|
||||||
|
const canComplete = task.status === "running"
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(canComplete).toBe(true)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should prevent completion when task is cancelled", () => {
|
||||||
|
// #given - task cancelled by session.deleted
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test task",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "cancelled",
|
||||||
|
startedAt: new Date(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const canComplete = task.status === "running"
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(canComplete).toBe(false)
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should prevent completion when task errored", () => {
|
||||||
|
// #given - task errored
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test task",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "error",
|
||||||
|
error: "some error",
|
||||||
|
startedAt: new Date(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const canComplete = task.status === "running"
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(canComplete).toBe(false)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe("concurrencyKey management", () => {
|
||||||
|
test("concurrencyKey should be undefined after release", () => {
|
||||||
|
// #given - task with concurrency key
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test task",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "running",
|
||||||
|
startedAt: new Date(),
|
||||||
|
concurrencyKey: "anthropic/claude-sonnet-4-5",
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when - simulate release pattern (what tryCompleteTask does)
|
||||||
|
if (task.concurrencyKey) {
|
||||||
|
// concurrencyManager.release(task.concurrencyKey) would be called
|
||||||
|
task.concurrencyKey = undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(task.concurrencyKey).toBeUndefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
test("release should be idempotent with concurrencyKey guard", () => {
|
||||||
|
// #given - task with key already released
|
||||||
|
const task: BackgroundTask = {
|
||||||
|
id: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "session-parent",
|
||||||
|
parentMessageID: "msg-1",
|
||||||
|
description: "test task",
|
||||||
|
prompt: "test",
|
||||||
|
agent: "explore",
|
||||||
|
status: "completed",
|
||||||
|
startedAt: new Date(),
|
||||||
|
concurrencyKey: undefined, // already released
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when - try to release again (guard pattern)
|
||||||
|
let releaseCount = 0
|
||||||
|
if (task.concurrencyKey) {
|
||||||
|
releaseCount++
|
||||||
|
task.concurrencyKey = undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
// #then - no double release
|
||||||
|
expect(releaseCount).toBe(0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@ -52,6 +52,8 @@ export class BackgroundManager {
|
|||||||
private directory: string
|
private directory: string
|
||||||
private pollingInterval?: ReturnType<typeof setInterval>
|
private pollingInterval?: ReturnType<typeof setInterval>
|
||||||
private concurrencyManager: ConcurrencyManager
|
private concurrencyManager: ConcurrencyManager
|
||||||
|
private cleanupRegistered = false
|
||||||
|
private shutdownTriggered = false
|
||||||
|
|
||||||
constructor(ctx: PluginInput, config?: BackgroundTaskConfig) {
|
constructor(ctx: PluginInput, config?: BackgroundTaskConfig) {
|
||||||
this.tasks = new Map()
|
this.tasks = new Map()
|
||||||
@ -60,6 +62,7 @@ export class BackgroundManager {
|
|||||||
this.client = ctx.client
|
this.client = ctx.client
|
||||||
this.directory = ctx.directory
|
this.directory = ctx.directory
|
||||||
this.concurrencyManager = new ConcurrencyManager(config)
|
this.concurrencyManager = new ConcurrencyManager(config)
|
||||||
|
this.registerProcessCleanup()
|
||||||
}
|
}
|
||||||
|
|
||||||
async launch(input: LaunchInput): Promise<BackgroundTask> {
|
async launch(input: LaunchInput): Promise<BackgroundTask> {
|
||||||
@ -186,7 +189,7 @@ export class BackgroundManager {
|
|||||||
existingTask.completedAt = new Date()
|
existingTask.completedAt = new Date()
|
||||||
if (existingTask.concurrencyKey) {
|
if (existingTask.concurrencyKey) {
|
||||||
this.concurrencyManager.release(existingTask.concurrencyKey)
|
this.concurrencyManager.release(existingTask.concurrencyKey)
|
||||||
existingTask.concurrencyKey = undefined // Prevent double-release
|
existingTask.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
this.markForNotification(existingTask)
|
this.markForNotification(existingTask)
|
||||||
this.notifyParentSession(existingTask).catch(err => {
|
this.notifyParentSession(existingTask).catch(err => {
|
||||||
@ -238,14 +241,20 @@ export class BackgroundManager {
|
|||||||
* Register an external task (e.g., from sisyphus_task) for notification tracking.
|
* Register an external task (e.g., from sisyphus_task) for notification tracking.
|
||||||
* This allows tasks created by external tools to receive the same toast/prompt notifications.
|
* This allows tasks created by external tools to receive the same toast/prompt notifications.
|
||||||
*/
|
*/
|
||||||
registerExternalTask(input: {
|
async registerExternalTask(input: {
|
||||||
taskId: string
|
taskId: string
|
||||||
sessionID: string
|
sessionID: string
|
||||||
parentSessionID: string
|
parentSessionID: string
|
||||||
description: string
|
description: string
|
||||||
agent?: string
|
agent?: string
|
||||||
parentAgent?: string
|
parentAgent?: string
|
||||||
}): BackgroundTask {
|
concurrencyKey?: string
|
||||||
|
}): Promise<BackgroundTask> {
|
||||||
|
// Acquire concurrency slot if a key is provided
|
||||||
|
if (input.concurrencyKey) {
|
||||||
|
await this.concurrencyManager.acquire(input.concurrencyKey)
|
||||||
|
}
|
||||||
|
|
||||||
const task: BackgroundTask = {
|
const task: BackgroundTask = {
|
||||||
id: input.taskId,
|
id: input.taskId,
|
||||||
sessionID: input.sessionID,
|
sessionID: input.sessionID,
|
||||||
@ -261,6 +270,7 @@ export class BackgroundManager {
|
|||||||
lastUpdate: new Date(),
|
lastUpdate: new Date(),
|
||||||
},
|
},
|
||||||
parentAgent: input.parentAgent,
|
parentAgent: input.parentAgent,
|
||||||
|
concurrencyKey: input.concurrencyKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
this.tasks.set(task.id, task)
|
this.tasks.set(task.id, task)
|
||||||
@ -283,6 +293,21 @@ export class BackgroundManager {
|
|||||||
throw new Error(`Task not found for session: ${input.sessionId}`)
|
throw new Error(`Task not found for session: ${input.sessionId}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (existingTask.status === "running") {
|
||||||
|
log("[background-agent] Resume skipped - task already running:", {
|
||||||
|
taskId: existingTask.id,
|
||||||
|
sessionID: existingTask.sessionID,
|
||||||
|
})
|
||||||
|
return existingTask
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-acquire concurrency using the agent name as the key (same as launch()).
|
||||||
|
// Note: existingTask.concurrencyKey is cleared when tasks complete, so we
|
||||||
|
// derive the key from task.agent which persists through completion.
|
||||||
|
const concurrencyKey = existingTask.agent
|
||||||
|
await this.concurrencyManager.acquire(concurrencyKey)
|
||||||
|
existingTask.concurrencyKey = concurrencyKey
|
||||||
|
|
||||||
existingTask.status = "running"
|
existingTask.status = "running"
|
||||||
existingTask.completedAt = undefined
|
existingTask.completedAt = undefined
|
||||||
existingTask.error = undefined
|
existingTask.error = undefined
|
||||||
@ -344,11 +369,12 @@ export class BackgroundManager {
|
|||||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||||
existingTask.error = errorMessage
|
existingTask.error = errorMessage
|
||||||
existingTask.completedAt = new Date()
|
existingTask.completedAt = new Date()
|
||||||
// Release concurrency on resume error (matches launch error handler)
|
|
||||||
if (existingTask.concurrencyKey) {
|
// Release concurrency on error to prevent slot leaks
|
||||||
this.concurrencyManager.release(existingTask.concurrencyKey)
|
if (existingTask.concurrencyKey) {
|
||||||
existingTask.concurrencyKey = undefined // Prevent double-release
|
this.concurrencyManager.release(existingTask.concurrencyKey)
|
||||||
}
|
existingTask.concurrencyKey = undefined
|
||||||
|
}
|
||||||
this.markForNotification(existingTask)
|
this.markForNotification(existingTask)
|
||||||
this.notifyParentSession(existingTask).catch(err => {
|
this.notifyParentSession(existingTask).catch(err => {
|
||||||
log("[background-agent] Failed to notify on resume error:", err)
|
log("[background-agent] Failed to notify on resume error:", err)
|
||||||
@ -417,29 +443,31 @@ export class BackgroundManager {
|
|||||||
|
|
||||||
// Edge guard: Verify session has actual assistant output before completing
|
// Edge guard: Verify session has actual assistant output before completing
|
||||||
this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => {
|
this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => {
|
||||||
|
// Re-check status after async operation (could have been completed by polling)
|
||||||
|
if (task.status !== "running") {
|
||||||
|
log("[background-agent] Task status changed during validation, skipping:", { taskId: task.id, status: task.status })
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if (!hasValidOutput) {
|
if (!hasValidOutput) {
|
||||||
log("[background-agent] Session.idle but no valid output yet, waiting:", task.id)
|
log("[background-agent] Session.idle but no valid output yet, waiting:", task.id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const hasIncompleteTodos = await this.checkSessionTodos(sessionID)
|
const hasIncompleteTodos = await this.checkSessionTodos(sessionID)
|
||||||
|
|
||||||
|
// Re-check status after async operation again
|
||||||
|
if (task.status !== "running") {
|
||||||
|
log("[background-agent] Task status changed during todo check, skipping:", { taskId: task.id, status: task.status })
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if (hasIncompleteTodos) {
|
if (hasIncompleteTodos) {
|
||||||
log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id)
|
log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
task.status = "completed"
|
await this.tryCompleteTask(task, "session.idle event")
|
||||||
task.completedAt = new Date()
|
|
||||||
// Release concurrency immediately on completion
|
|
||||||
if (task.concurrencyKey) {
|
|
||||||
this.concurrencyManager.release(task.concurrencyKey)
|
|
||||||
task.concurrencyKey = undefined // Prevent double-release
|
|
||||||
}
|
|
||||||
// Clean up pendingByParent to prevent stale entries
|
|
||||||
this.cleanupPendingByParent(task)
|
|
||||||
this.markForNotification(task)
|
|
||||||
await this.notifyParentSession(task)
|
|
||||||
log("[background-agent] Task completed via session.idle event:", task.id)
|
|
||||||
}).catch(err => {
|
}).catch(err => {
|
||||||
log("[background-agent] Error in session.idle handler:", err)
|
log("[background-agent] Error in session.idle handler:", err)
|
||||||
})
|
})
|
||||||
@ -459,10 +487,10 @@ export class BackgroundManager {
|
|||||||
task.error = "Session deleted"
|
task.error = "Session deleted"
|
||||||
}
|
}
|
||||||
|
|
||||||
if (task.concurrencyKey) {
|
if (task.concurrencyKey) {
|
||||||
this.concurrencyManager.release(task.concurrencyKey)
|
this.concurrencyManager.release(task.concurrencyKey)
|
||||||
task.concurrencyKey = undefined // Prevent double-release
|
task.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
// Clean up pendingByParent to prevent stale entries
|
// Clean up pendingByParent to prevent stale entries
|
||||||
this.cleanupPendingByParent(task)
|
this.cleanupPendingByParent(task)
|
||||||
this.tasks.delete(task.id)
|
this.tasks.delete(task.id)
|
||||||
@ -587,11 +615,25 @@ export class BackgroundManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup(): void {
|
private registerProcessCleanup(): void {
|
||||||
this.stopPolling()
|
if (this.cleanupRegistered) return
|
||||||
this.tasks.clear()
|
this.cleanupRegistered = true
|
||||||
this.notifications.clear()
|
|
||||||
this.pendingByParent.clear()
|
const cleanup = () => {
|
||||||
|
try {
|
||||||
|
this.shutdown()
|
||||||
|
} catch (error) {
|
||||||
|
log("[background-agent] Error during shutdown cleanup:", error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
registerProcessSignal("SIGINT", cleanup)
|
||||||
|
registerProcessSignal("SIGTERM", cleanup)
|
||||||
|
if (process.platform === "win32") {
|
||||||
|
registerProcessSignal("SIGBREAK", cleanup)
|
||||||
|
}
|
||||||
|
process.on("beforeExit", cleanup)
|
||||||
|
process.on("exit", cleanup)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -608,12 +650,44 @@ cleanup(): void {
|
|||||||
return Array.from(this.tasks.values()).filter(t => t.status !== "running")
|
return Array.from(this.tasks.values()).filter(t => t.status !== "running")
|
||||||
}
|
}
|
||||||
|
|
||||||
private async notifyParentSession(task: BackgroundTask): Promise<void> {
|
/**
|
||||||
|
* Safely complete a task with race condition protection.
|
||||||
|
* Returns true if task was successfully completed, false if already completed by another path.
|
||||||
|
*/
|
||||||
|
private async tryCompleteTask(task: BackgroundTask, source: string): Promise<boolean> {
|
||||||
|
// Guard: Check if task is still running (could have been completed by another path)
|
||||||
|
if (task.status !== "running") {
|
||||||
|
log("[background-agent] Task already completed, skipping:", { taskId: task.id, status: task.status, source })
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomically mark as completed to prevent race conditions
|
||||||
|
task.status = "completed"
|
||||||
|
task.completedAt = new Date()
|
||||||
|
|
||||||
|
// Release concurrency BEFORE any async operations to prevent slot leaks
|
||||||
if (task.concurrencyKey) {
|
if (task.concurrencyKey) {
|
||||||
this.concurrencyManager.release(task.concurrencyKey)
|
this.concurrencyManager.release(task.concurrencyKey)
|
||||||
task.concurrencyKey = undefined
|
task.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.markForNotification(task)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.notifyParentSession(task)
|
||||||
|
log(`[background-agent] Task completed via ${source}:`, task.id)
|
||||||
|
} catch (err) {
|
||||||
|
log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err })
|
||||||
|
// Concurrency already released, notification failed but task is complete
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
private async notifyParentSession(task: BackgroundTask): Promise<void> {
|
||||||
|
// Note: Callers must release concurrency before calling this method
|
||||||
|
// to ensure slots are freed even if notification fails
|
||||||
|
|
||||||
const duration = this.formatDuration(task.startedAt, task.completedAt)
|
const duration = this.formatDuration(task.startedAt, task.completedAt)
|
||||||
|
|
||||||
log("[background-agent] notifyParentSession called for task:", task.id)
|
log("[background-agent] notifyParentSession called for task:", task.id)
|
||||||
@ -715,10 +789,12 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
|
|
||||||
const taskId = task.id
|
const taskId = task.id
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
// Concurrency already released at completion - just cleanup notifications and task
|
// Guard: Only delete if task still exists (could have been deleted by session.deleted event)
|
||||||
this.clearNotificationsForTask(taskId)
|
if (this.tasks.has(taskId)) {
|
||||||
this.tasks.delete(taskId)
|
this.clearNotificationsForTask(taskId)
|
||||||
log("[background-agent] Removed completed task from memory:", taskId)
|
this.tasks.delete(taskId)
|
||||||
|
log("[background-agent] Removed completed task from memory:", taskId)
|
||||||
|
}
|
||||||
}, 5 * 60 * 1000)
|
}, 5 * 60 * 1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -755,7 +831,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
task.completedAt = new Date()
|
task.completedAt = new Date()
|
||||||
if (task.concurrencyKey) {
|
if (task.concurrencyKey) {
|
||||||
this.concurrencyManager.release(task.concurrencyKey)
|
this.concurrencyManager.release(task.concurrencyKey)
|
||||||
task.concurrencyKey = undefined // Prevent double-release
|
task.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
// Clean up pendingByParent to prevent stale entries
|
// Clean up pendingByParent to prevent stale entries
|
||||||
this.cleanupPendingByParent(task)
|
this.cleanupPendingByParent(task)
|
||||||
@ -791,7 +867,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
for (const task of this.tasks.values()) {
|
for (const task of this.tasks.values()) {
|
||||||
if (task.status !== "running") continue
|
if (task.status !== "running") continue
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const sessionStatus = allStatuses[task.sessionID]
|
const sessionStatus = allStatuses[task.sessionID]
|
||||||
|
|
||||||
// Don't skip if session not in status - fall through to message-based detection
|
// Don't skip if session not in status - fall through to message-based detection
|
||||||
@ -803,24 +879,16 @@ try {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-check status after async operation
|
||||||
|
if (task.status !== "running") continue
|
||||||
|
|
||||||
const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID)
|
const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID)
|
||||||
if (hasIncompleteTodos) {
|
if (hasIncompleteTodos) {
|
||||||
log("[background-agent] Task has incomplete todos via polling, waiting:", task.id)
|
log("[background-agent] Task has incomplete todos via polling, waiting:", task.id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
task.status = "completed"
|
await this.tryCompleteTask(task, "polling (idle status)")
|
||||||
task.completedAt = new Date()
|
|
||||||
// Release concurrency immediately on completion
|
|
||||||
if (task.concurrencyKey) {
|
|
||||||
this.concurrencyManager.release(task.concurrencyKey)
|
|
||||||
task.concurrencyKey = undefined // Prevent double-release
|
|
||||||
}
|
|
||||||
// Clean up pendingByParent to prevent stale entries
|
|
||||||
this.cleanupPendingByParent(task)
|
|
||||||
this.markForNotification(task)
|
|
||||||
await this.notifyParentSession(task)
|
|
||||||
log("[background-agent] Task completed via polling:", task.id)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -860,7 +928,7 @@ try {
|
|||||||
task.progress.toolCalls = toolCalls
|
task.progress.toolCalls = toolCalls
|
||||||
task.progress.lastTool = lastTool
|
task.progress.lastTool = lastTool
|
||||||
task.progress.lastUpdate = new Date()
|
task.progress.lastUpdate = new Date()
|
||||||
if (lastMessage) {
|
if (lastMessage) {
|
||||||
task.progress.lastMessage = lastMessage
|
task.progress.lastMessage = lastMessage
|
||||||
task.progress.lastMessageAt = new Date()
|
task.progress.lastMessageAt = new Date()
|
||||||
}
|
}
|
||||||
@ -880,20 +948,12 @@ if (lastMessage) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-check status after async operation
|
||||||
|
if (task.status !== "running") continue
|
||||||
|
|
||||||
const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID)
|
const hasIncompleteTodos = await this.checkSessionTodos(task.sessionID)
|
||||||
if (!hasIncompleteTodos) {
|
if (!hasIncompleteTodos) {
|
||||||
task.status = "completed"
|
await this.tryCompleteTask(task, "stability detection")
|
||||||
task.completedAt = new Date()
|
|
||||||
// Release concurrency immediately on completion
|
|
||||||
if (task.concurrencyKey) {
|
|
||||||
this.concurrencyManager.release(task.concurrencyKey)
|
|
||||||
task.concurrencyKey = undefined // Prevent double-release
|
|
||||||
}
|
|
||||||
// Clean up pendingByParent to prevent stale entries
|
|
||||||
this.cleanupPendingByParent(task)
|
|
||||||
this.markForNotification(task)
|
|
||||||
await this.notifyParentSession(task)
|
|
||||||
log("[background-agent] Task completed via stability detection:", task.id)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -912,6 +972,43 @@ if (lastMessage) {
|
|||||||
this.stopPolling()
|
this.stopPolling()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown the manager gracefully.
|
||||||
|
* Cancels all pending concurrency waiters and clears timers.
|
||||||
|
* Should be called when the plugin is unloaded.
|
||||||
|
*/
|
||||||
|
shutdown(): void {
|
||||||
|
if (this.shutdownTriggered) return
|
||||||
|
this.shutdownTriggered = true
|
||||||
|
log("[background-agent] Shutting down BackgroundManager")
|
||||||
|
this.stopPolling()
|
||||||
|
|
||||||
|
// Release concurrency for all running tasks first
|
||||||
|
for (const task of this.tasks.values()) {
|
||||||
|
if (task.concurrencyKey) {
|
||||||
|
this.concurrencyManager.release(task.concurrencyKey)
|
||||||
|
task.concurrencyKey = undefined
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then clear all state (cancels any remaining waiters)
|
||||||
|
this.concurrencyManager.clear()
|
||||||
|
this.tasks.clear()
|
||||||
|
this.notifications.clear()
|
||||||
|
this.pendingByParent.clear()
|
||||||
|
log("[background-agent] Shutdown complete")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function registerProcessSignal(
|
||||||
|
signal: NodeJS.Signals,
|
||||||
|
handler: () => void
|
||||||
|
): void {
|
||||||
|
process.on(signal, () => {
|
||||||
|
handler()
|
||||||
|
process.exit(0)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function getMessageDir(sessionID: string): string | null {
|
function getMessageDir(sessionID: string): string | null {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user