Merge pull request #1646 from code-yeongyu/fix/background-task-race-condition-1582
fix(background-agent): serialize parent notifications (#1582)
This commit is contained in:
commit
cfd63482d7
@ -1123,6 +1123,99 @@ describe("BackgroundManager.tryCompleteTask", () => {
|
|||||||
expect(task.status).toBe("completed")
|
expect(task.status).toBe("completed")
|
||||||
expect(getPendingByParent(manager).get(task.parentSessionID)).toBeUndefined()
|
expect(getPendingByParent(manager).get(task.parentSessionID)).toBeUndefined()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("should avoid overlapping promptAsync calls when tasks complete concurrently", async () => {
|
||||||
|
// given
|
||||||
|
type PromptAsyncBody = Record<string, unknown> & { noReply?: boolean }
|
||||||
|
|
||||||
|
let resolveMessages: ((value: { data: unknown[] }) => void) | undefined
|
||||||
|
const messagesBarrier = new Promise<{ data: unknown[] }>((resolve) => {
|
||||||
|
resolveMessages = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
const promptBodies: PromptAsyncBody[] = []
|
||||||
|
let promptInFlight = false
|
||||||
|
let rejectedCount = 0
|
||||||
|
let promptCallCount = 0
|
||||||
|
|
||||||
|
let releaseFirstPrompt: (() => void) | undefined
|
||||||
|
let resolveFirstStarted: (() => void) | undefined
|
||||||
|
const firstStarted = new Promise<void>((resolve) => {
|
||||||
|
resolveFirstStarted = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
messages: async () => messagesBarrier,
|
||||||
|
promptAsync: async (args: { path: { id: string }; body: PromptAsyncBody }) => {
|
||||||
|
promptBodies.push(args.body)
|
||||||
|
|
||||||
|
if (!promptInFlight) {
|
||||||
|
promptCallCount += 1
|
||||||
|
if (promptCallCount === 1) {
|
||||||
|
promptInFlight = true
|
||||||
|
resolveFirstStarted?.()
|
||||||
|
return await new Promise((resolve) => {
|
||||||
|
releaseFirstPrompt = () => {
|
||||||
|
promptInFlight = false
|
||||||
|
resolve({})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return {}
|
||||||
|
}
|
||||||
|
|
||||||
|
rejectedCount += 1
|
||||||
|
throw new Error("BUSY")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
manager.shutdown()
|
||||||
|
manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||||
|
|
||||||
|
const parentSessionID = "parent-session"
|
||||||
|
const taskA = createMockTask({
|
||||||
|
id: "task-a",
|
||||||
|
sessionID: "session-a",
|
||||||
|
parentSessionID,
|
||||||
|
})
|
||||||
|
const taskB = createMockTask({
|
||||||
|
id: "task-b",
|
||||||
|
sessionID: "session-b",
|
||||||
|
parentSessionID,
|
||||||
|
})
|
||||||
|
|
||||||
|
getTaskMap(manager).set(taskA.id, taskA)
|
||||||
|
getTaskMap(manager).set(taskB.id, taskB)
|
||||||
|
getPendingByParent(manager).set(parentSessionID, new Set([taskA.id, taskB.id]))
|
||||||
|
|
||||||
|
// when
|
||||||
|
const completionA = tryCompleteTaskForTest(manager, taskA)
|
||||||
|
const completionB = tryCompleteTaskForTest(manager, taskB)
|
||||||
|
resolveMessages?.({ data: [] })
|
||||||
|
|
||||||
|
await firstStarted
|
||||||
|
|
||||||
|
// Give the second completion a chance to attempt promptAsync while the first is in-flight.
|
||||||
|
// In the buggy implementation, this triggers an overlap and increments rejectedCount.
|
||||||
|
for (let i = 0; i < 20; i++) {
|
||||||
|
await Promise.resolve()
|
||||||
|
if (rejectedCount > 0) break
|
||||||
|
if (promptBodies.length >= 2) break
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseFirstPrompt?.()
|
||||||
|
await Promise.all([completionA, completionB])
|
||||||
|
|
||||||
|
// then
|
||||||
|
expect(rejectedCount).toBe(0)
|
||||||
|
expect(promptBodies.length).toBe(2)
|
||||||
|
expect(promptBodies.some((b) => b.noReply === false)).toBe(true)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe("BackgroundManager.trackTask", () => {
|
describe("BackgroundManager.trackTask", () => {
|
||||||
|
|||||||
@ -89,6 +89,7 @@ export class BackgroundManager {
|
|||||||
private processingKeys: Set<string> = new Set()
|
private processingKeys: Set<string> = new Set()
|
||||||
private completionTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
private completionTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
||||||
private idleDeferralTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
private idleDeferralTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
||||||
|
private notificationQueueByParent: Map<string, Promise<void>> = new Map()
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
ctx: PluginInput,
|
ctx: PluginInput,
|
||||||
@ -358,7 +359,7 @@ export class BackgroundManager {
|
|||||||
|
|
||||||
this.markForNotification(existingTask)
|
this.markForNotification(existingTask)
|
||||||
this.cleanupPendingByParent(existingTask)
|
this.cleanupPendingByParent(existingTask)
|
||||||
this.notifyParentSession(existingTask).catch(err => {
|
this.enqueueNotificationForParent(existingTask.parentSessionID, () => this.notifyParentSession(existingTask)).catch(err => {
|
||||||
log("[background-agent] Failed to notify on error:", err)
|
log("[background-agent] Failed to notify on error:", err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -615,7 +616,7 @@ export class BackgroundManager {
|
|||||||
|
|
||||||
this.markForNotification(existingTask)
|
this.markForNotification(existingTask)
|
||||||
this.cleanupPendingByParent(existingTask)
|
this.cleanupPendingByParent(existingTask)
|
||||||
this.notifyParentSession(existingTask).catch(err => {
|
this.enqueueNotificationForParent(existingTask.parentSessionID, () => this.notifyParentSession(existingTask)).catch(err => {
|
||||||
log("[background-agent] Failed to notify on resume error:", err)
|
log("[background-agent] Failed to notify on resume error:", err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -949,7 +950,7 @@ export class BackgroundManager {
|
|||||||
this.markForNotification(task)
|
this.markForNotification(task)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.notifyParentSession(task)
|
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||||
log(`[background-agent] Task cancelled via ${source}:`, task.id)
|
log(`[background-agent] Task cancelled via ${source}:`, task.id)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log("[background-agent] Error in notifyParentSession for cancelled task:", { taskId: task.id, error: err })
|
log("[background-agent] Error in notifyParentSession for cancelled task:", { taskId: task.id, error: err })
|
||||||
@ -1084,7 +1085,7 @@ export class BackgroundManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.notifyParentSession(task)
|
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||||
log(`[background-agent] Task completed via ${source}:`, task.id)
|
log(`[background-agent] Task completed via ${source}:`, task.id)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err })
|
log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err })
|
||||||
@ -1114,16 +1115,19 @@ export class BackgroundManager {
|
|||||||
|
|
||||||
// Update pending tracking and check if all tasks complete
|
// Update pending tracking and check if all tasks complete
|
||||||
const pendingSet = this.pendingByParent.get(task.parentSessionID)
|
const pendingSet = this.pendingByParent.get(task.parentSessionID)
|
||||||
|
let allComplete = false
|
||||||
|
let remainingCount = 0
|
||||||
if (pendingSet) {
|
if (pendingSet) {
|
||||||
pendingSet.delete(task.id)
|
pendingSet.delete(task.id)
|
||||||
if (pendingSet.size === 0) {
|
remainingCount = pendingSet.size
|
||||||
|
allComplete = remainingCount === 0
|
||||||
|
if (allComplete) {
|
||||||
this.pendingByParent.delete(task.parentSessionID)
|
this.pendingByParent.delete(task.parentSessionID)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
allComplete = true
|
||||||
}
|
}
|
||||||
|
|
||||||
const allComplete = !pendingSet || pendingSet.size === 0
|
|
||||||
const remainingCount = pendingSet?.size ?? 0
|
|
||||||
|
|
||||||
const statusText = task.status === "completed" ? "COMPLETED" : "CANCELLED"
|
const statusText = task.status === "completed" ? "COMPLETED" : "CANCELLED"
|
||||||
const errorInfo = task.error ? `\n**Error:** ${task.error}` : ""
|
const errorInfo = task.error ? `\n**Error:** ${task.error}` : ""
|
||||||
|
|
||||||
@ -1378,7 +1382,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.notifyParentSession(task)
|
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err })
|
log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err })
|
||||||
}
|
}
|
||||||
@ -1572,12 +1576,37 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
this.tasks.clear()
|
this.tasks.clear()
|
||||||
this.notifications.clear()
|
this.notifications.clear()
|
||||||
this.pendingByParent.clear()
|
this.pendingByParent.clear()
|
||||||
|
this.notificationQueueByParent.clear()
|
||||||
this.queuesByKey.clear()
|
this.queuesByKey.clear()
|
||||||
this.processingKeys.clear()
|
this.processingKeys.clear()
|
||||||
this.unregisterProcessCleanup()
|
this.unregisterProcessCleanup()
|
||||||
log("[background-agent] Shutdown complete")
|
log("[background-agent] Shutdown complete")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private enqueueNotificationForParent(
|
||||||
|
parentSessionID: string | undefined,
|
||||||
|
operation: () => Promise<void>
|
||||||
|
): Promise<void> {
|
||||||
|
if (!parentSessionID) {
|
||||||
|
return operation()
|
||||||
|
}
|
||||||
|
|
||||||
|
const previous = this.notificationQueueByParent.get(parentSessionID) ?? Promise.resolve()
|
||||||
|
const current = previous
|
||||||
|
.catch(() => {})
|
||||||
|
.then(operation)
|
||||||
|
|
||||||
|
this.notificationQueueByParent.set(parentSessionID, current)
|
||||||
|
|
||||||
|
void current.finally(() => {
|
||||||
|
if (this.notificationQueueByParent.get(parentSessionID) === current) {
|
||||||
|
this.notificationQueueByParent.delete(parentSessionID)
|
||||||
|
}
|
||||||
|
}).catch(() => {})
|
||||||
|
|
||||||
|
return current
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function registerProcessSignal(
|
function registerProcessSignal(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user