fix(background-agent): preserve external concurrency keys
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
parent
c1246f61d1
commit
4ac0fa7bb0
@ -1,5 +1,10 @@
|
|||||||
import { describe, test, expect, beforeEach } from "bun:test"
|
import { describe, test, expect, beforeEach } from "bun:test"
|
||||||
|
import { afterEach } from "bun:test"
|
||||||
|
import type { PluginInput } from "@opencode-ai/plugin"
|
||||||
import type { BackgroundTask, ResumeInput } from "./types"
|
import type { BackgroundTask, ResumeInput } from "./types"
|
||||||
|
import { BackgroundManager } from "./manager"
|
||||||
|
import { ConcurrencyManager } from "./concurrency"
|
||||||
|
|
||||||
|
|
||||||
const TASK_TTL_MS = 30 * 60 * 1000
|
const TASK_TTL_MS = 30 * 60 * 1000
|
||||||
|
|
||||||
@ -156,6 +161,32 @@ function createMockTask(overrides: Partial<BackgroundTask> & { id: string; sessi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function createBackgroundManager(): BackgroundManager {
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
prompt: async () => ({}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return new BackgroundManager({ client, directory: "C:\\tmp" } as unknown as PluginInput)
|
||||||
|
}
|
||||||
|
|
||||||
|
function getConcurrencyManager(manager: BackgroundManager): ConcurrencyManager {
|
||||||
|
return (manager as unknown as { concurrencyManager: ConcurrencyManager }).concurrencyManager
|
||||||
|
}
|
||||||
|
|
||||||
|
function getTaskMap(manager: BackgroundManager): Map<string, BackgroundTask> {
|
||||||
|
return (manager as unknown as { tasks: Map<string, BackgroundTask> }).tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
function stubNotifyParentSession(manager: BackgroundManager): void {
|
||||||
|
(manager as unknown as { notifyParentSession: (task: BackgroundTask) => Promise<void> }).notifyParentSession = async () => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function tryCompleteTaskForTest(manager: BackgroundManager, task: BackgroundTask): Promise<boolean> {
|
||||||
|
return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean> }).tryCompleteTask(task, "test")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
describe("BackgroundManager.getAllDescendantTasks", () => {
|
describe("BackgroundManager.getAllDescendantTasks", () => {
|
||||||
let manager: MockBackgroundManager
|
let manager: MockBackgroundManager
|
||||||
|
|
||||||
@ -844,36 +875,25 @@ function buildNotificationPromptBody(
|
|||||||
return body
|
return body
|
||||||
}
|
}
|
||||||
|
|
||||||
describe("tryCompleteTask pattern - race condition prevention", () => {
|
describe("BackgroundManager.tryCompleteTask", () => {
|
||||||
/**
|
let manager: BackgroundManager
|
||||||
* These tests verify the tryCompleteTask pattern behavior
|
|
||||||
* by simulating the guard logic in a mock implementation.
|
|
||||||
*/
|
|
||||||
|
|
||||||
test("should prevent double completion when task already completed", () => {
|
beforeEach(() => {
|
||||||
// #given - task already completed
|
// #given
|
||||||
const task: BackgroundTask = {
|
manager = createBackgroundManager()
|
||||||
id: "task-1",
|
stubNotifyParentSession(manager)
|
||||||
sessionID: "session-1",
|
|
||||||
parentSessionID: "session-parent",
|
|
||||||
parentMessageID: "msg-1",
|
|
||||||
description: "test task",
|
|
||||||
prompt: "test",
|
|
||||||
agent: "explore",
|
|
||||||
status: "completed",
|
|
||||||
startedAt: new Date(),
|
|
||||||
completedAt: new Date(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// #when - try to complete again (simulating tryCompleteTask guard)
|
|
||||||
const canComplete = task.status === "running"
|
|
||||||
|
|
||||||
// #then - should not allow completion
|
|
||||||
expect(canComplete).toBe(false)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
test("should allow completion when task is running", () => {
|
afterEach(() => {
|
||||||
// #given - task is running
|
manager.shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should release concurrency and clear key on completion", async () => {
|
||||||
|
// #given
|
||||||
|
const concurrencyKey = "anthropic/claude-opus-4-5"
|
||||||
|
const concurrencyManager = getConcurrencyManager(manager)
|
||||||
|
await concurrencyManager.acquire(concurrencyKey)
|
||||||
|
|
||||||
const task: BackgroundTask = {
|
const task: BackgroundTask = {
|
||||||
id: "task-1",
|
id: "task-1",
|
||||||
sessionID: "session-1",
|
sessionID: "session-1",
|
||||||
@ -884,87 +904,25 @@ describe("tryCompleteTask pattern - race condition prevention", () => {
|
|||||||
agent: "explore",
|
agent: "explore",
|
||||||
status: "running",
|
status: "running",
|
||||||
startedAt: new Date(),
|
startedAt: new Date(),
|
||||||
}
|
concurrencyKey,
|
||||||
|
|
||||||
// #when - check if can complete
|
|
||||||
const canComplete = task.status === "running"
|
|
||||||
|
|
||||||
// #then
|
|
||||||
expect(canComplete).toBe(true)
|
|
||||||
})
|
|
||||||
|
|
||||||
test("should prevent completion when task is cancelled", () => {
|
|
||||||
// #given - task cancelled by session.deleted
|
|
||||||
const task: BackgroundTask = {
|
|
||||||
id: "task-1",
|
|
||||||
sessionID: "session-1",
|
|
||||||
parentSessionID: "session-parent",
|
|
||||||
parentMessageID: "msg-1",
|
|
||||||
description: "test task",
|
|
||||||
prompt: "test",
|
|
||||||
agent: "explore",
|
|
||||||
status: "cancelled",
|
|
||||||
startedAt: new Date(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// #when
|
// #when
|
||||||
const canComplete = task.status === "running"
|
const completed = await tryCompleteTaskForTest(manager, task)
|
||||||
|
|
||||||
// #then
|
|
||||||
expect(canComplete).toBe(false)
|
|
||||||
})
|
|
||||||
|
|
||||||
test("should prevent completion when task errored", () => {
|
|
||||||
// #given - task errored
|
|
||||||
const task: BackgroundTask = {
|
|
||||||
id: "task-1",
|
|
||||||
sessionID: "session-1",
|
|
||||||
parentSessionID: "session-parent",
|
|
||||||
parentMessageID: "msg-1",
|
|
||||||
description: "test task",
|
|
||||||
prompt: "test",
|
|
||||||
agent: "explore",
|
|
||||||
status: "error",
|
|
||||||
error: "some error",
|
|
||||||
startedAt: new Date(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// #when
|
|
||||||
const canComplete = task.status === "running"
|
|
||||||
|
|
||||||
// #then
|
|
||||||
expect(canComplete).toBe(false)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
describe("concurrencyKey management", () => {
|
|
||||||
test("concurrencyKey should be undefined after release", () => {
|
|
||||||
// #given - task with concurrency key
|
|
||||||
const task: BackgroundTask = {
|
|
||||||
id: "task-1",
|
|
||||||
sessionID: "session-1",
|
|
||||||
parentSessionID: "session-parent",
|
|
||||||
parentMessageID: "msg-1",
|
|
||||||
description: "test task",
|
|
||||||
prompt: "test",
|
|
||||||
agent: "explore",
|
|
||||||
status: "running",
|
|
||||||
startedAt: new Date(),
|
|
||||||
concurrencyKey: "anthropic/claude-sonnet-4-5",
|
|
||||||
}
|
|
||||||
|
|
||||||
// #when - simulate release pattern (what tryCompleteTask does)
|
|
||||||
if (task.concurrencyKey) {
|
|
||||||
// concurrencyManager.release(task.concurrencyKey) would be called
|
|
||||||
task.concurrencyKey = undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
// #then
|
// #then
|
||||||
|
expect(completed).toBe(true)
|
||||||
|
expect(task.status).toBe("completed")
|
||||||
expect(task.concurrencyKey).toBeUndefined()
|
expect(task.concurrencyKey).toBeUndefined()
|
||||||
|
expect(concurrencyManager.getCount(concurrencyKey)).toBe(0)
|
||||||
})
|
})
|
||||||
|
|
||||||
test("release should be idempotent with concurrencyKey guard", () => {
|
test("should prevent double completion and double release", async () => {
|
||||||
// #given - task with key already released
|
// #given
|
||||||
|
const concurrencyKey = "anthropic/claude-opus-4-5"
|
||||||
|
const concurrencyManager = getConcurrencyManager(manager)
|
||||||
|
await concurrencyManager.acquire(concurrencyKey)
|
||||||
|
|
||||||
const task: BackgroundTask = {
|
const task: BackgroundTask = {
|
||||||
id: "task-1",
|
id: "task-1",
|
||||||
sessionID: "session-1",
|
sessionID: "session-1",
|
||||||
@ -973,19 +931,95 @@ describe("concurrencyKey management", () => {
|
|||||||
description: "test task",
|
description: "test task",
|
||||||
prompt: "test",
|
prompt: "test",
|
||||||
agent: "explore",
|
agent: "explore",
|
||||||
status: "completed",
|
status: "running",
|
||||||
startedAt: new Date(),
|
startedAt: new Date(),
|
||||||
concurrencyKey: undefined, // already released
|
concurrencyKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
// #when - try to release again (guard pattern)
|
// #when
|
||||||
let releaseCount = 0
|
await tryCompleteTaskForTest(manager, task)
|
||||||
if (task.concurrencyKey) {
|
const secondAttempt = await tryCompleteTaskForTest(manager, task)
|
||||||
releaseCount++
|
|
||||||
task.concurrencyKey = undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
// #then - no double release
|
// #then
|
||||||
expect(releaseCount).toBe(0)
|
expect(secondAttempt).toBe(false)
|
||||||
|
expect(task.status).toBe("completed")
|
||||||
|
expect(concurrencyManager.getCount(concurrencyKey)).toBe(0)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe("BackgroundManager.registerExternalTask", () => {
|
||||||
|
let manager: BackgroundManager
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// #given
|
||||||
|
manager = createBackgroundManager()
|
||||||
|
stubNotifyParentSession(manager)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
manager.shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should not double acquire on duplicate registration", async () => {
|
||||||
|
// #given
|
||||||
|
const input = {
|
||||||
|
taskId: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "parent-session",
|
||||||
|
description: "external task",
|
||||||
|
agent: "sisyphus_task",
|
||||||
|
concurrencyKey: "external-key",
|
||||||
|
}
|
||||||
|
|
||||||
|
// #when
|
||||||
|
await manager.registerExternalTask(input)
|
||||||
|
await manager.registerExternalTask(input)
|
||||||
|
|
||||||
|
// #then
|
||||||
|
const concurrencyManager = getConcurrencyManager(manager)
|
||||||
|
expect(concurrencyManager.getCount("external-key")).toBe(1)
|
||||||
|
expect(getTaskMap(manager).size).toBe(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe("BackgroundManager.resume concurrency key", () => {
|
||||||
|
let manager: BackgroundManager
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// #given
|
||||||
|
manager = createBackgroundManager()
|
||||||
|
stubNotifyParentSession(manager)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
manager.shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
test("should re-acquire using external task concurrency key", async () => {
|
||||||
|
// #given
|
||||||
|
const task = await manager.registerExternalTask({
|
||||||
|
taskId: "task-1",
|
||||||
|
sessionID: "session-1",
|
||||||
|
parentSessionID: "parent-session",
|
||||||
|
description: "external task",
|
||||||
|
agent: "sisyphus_task",
|
||||||
|
concurrencyKey: "external-key",
|
||||||
|
})
|
||||||
|
|
||||||
|
await tryCompleteTaskForTest(manager, task)
|
||||||
|
|
||||||
|
// #when
|
||||||
|
await manager.resume({
|
||||||
|
sessionId: "session-1",
|
||||||
|
prompt: "resume",
|
||||||
|
parentSessionID: "parent-session-2",
|
||||||
|
parentMessageID: "msg-2",
|
||||||
|
})
|
||||||
|
|
||||||
|
// #then
|
||||||
|
const concurrencyManager = getConcurrencyManager(manager)
|
||||||
|
expect(concurrencyManager.getCount("external-key")).toBe(1)
|
||||||
|
expect(task.concurrencyKey).toBe("external-key")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
|||||||
@ -129,8 +129,10 @@ export class BackgroundManager {
|
|||||||
parentAgent: input.parentAgent,
|
parentAgent: input.parentAgent,
|
||||||
model: input.model,
|
model: input.model,
|
||||||
concurrencyKey,
|
concurrencyKey,
|
||||||
|
concurrencyGroup: concurrencyKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
this.tasks.set(task.id, task)
|
this.tasks.set(task.id, task)
|
||||||
this.startPolling()
|
this.startPolling()
|
||||||
|
|
||||||
@ -191,6 +193,7 @@ export class BackgroundManager {
|
|||||||
this.concurrencyManager.release(existingTask.concurrencyKey)
|
this.concurrencyManager.release(existingTask.concurrencyKey)
|
||||||
existingTask.concurrencyKey = undefined
|
existingTask.concurrencyKey = undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
this.markForNotification(existingTask)
|
this.markForNotification(existingTask)
|
||||||
this.notifyParentSession(existingTask).catch(err => {
|
this.notifyParentSession(existingTask).catch(err => {
|
||||||
log("[background-agent] Failed to notify on error:", err)
|
log("[background-agent] Failed to notify on error:", err)
|
||||||
@ -250,6 +253,33 @@ export class BackgroundManager {
|
|||||||
parentAgent?: string
|
parentAgent?: string
|
||||||
concurrencyKey?: string
|
concurrencyKey?: string
|
||||||
}): Promise<BackgroundTask> {
|
}): Promise<BackgroundTask> {
|
||||||
|
const existingTask = this.tasks.get(input.taskId)
|
||||||
|
if (existingTask) {
|
||||||
|
if (input.parentSessionID !== existingTask.parentSessionID) {
|
||||||
|
existingTask.parentSessionID = input.parentSessionID
|
||||||
|
}
|
||||||
|
if (input.parentAgent !== undefined) {
|
||||||
|
existingTask.parentAgent = input.parentAgent
|
||||||
|
}
|
||||||
|
if (!existingTask.concurrencyGroup) {
|
||||||
|
existingTask.concurrencyGroup = input.concurrencyKey ?? existingTask.agent
|
||||||
|
}
|
||||||
|
|
||||||
|
subagentSessions.add(existingTask.sessionID)
|
||||||
|
this.startPolling()
|
||||||
|
|
||||||
|
// Track for batched notifications (external tasks need tracking too)
|
||||||
|
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||||
|
pending.add(existingTask.id)
|
||||||
|
this.pendingByParent.set(input.parentSessionID, pending)
|
||||||
|
|
||||||
|
log("[background-agent] External task already registered:", { taskId: existingTask.id, sessionID: existingTask.sessionID })
|
||||||
|
|
||||||
|
return existingTask
|
||||||
|
}
|
||||||
|
|
||||||
|
const concurrencyGroup = input.concurrencyKey ?? input.agent ?? "sisyphus_task"
|
||||||
|
|
||||||
// Acquire concurrency slot if a key is provided
|
// Acquire concurrency slot if a key is provided
|
||||||
if (input.concurrencyKey) {
|
if (input.concurrencyKey) {
|
||||||
await this.concurrencyManager.acquire(input.concurrencyKey)
|
await this.concurrencyManager.acquire(input.concurrencyKey)
|
||||||
@ -271,12 +301,14 @@ export class BackgroundManager {
|
|||||||
},
|
},
|
||||||
parentAgent: input.parentAgent,
|
parentAgent: input.parentAgent,
|
||||||
concurrencyKey: input.concurrencyKey,
|
concurrencyKey: input.concurrencyKey,
|
||||||
|
concurrencyGroup,
|
||||||
}
|
}
|
||||||
|
|
||||||
this.tasks.set(task.id, task)
|
this.tasks.set(task.id, task)
|
||||||
subagentSessions.add(input.sessionID)
|
subagentSessions.add(input.sessionID)
|
||||||
this.startPolling()
|
this.startPolling()
|
||||||
|
|
||||||
|
|
||||||
// Track for batched notifications (external tasks need tracking too)
|
// Track for batched notifications (external tasks need tracking too)
|
||||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||||
pending.add(task.id)
|
pending.add(task.id)
|
||||||
@ -301,12 +333,12 @@ export class BackgroundManager {
|
|||||||
return existingTask
|
return existingTask
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-acquire concurrency using the agent name as the key (same as launch()).
|
// Re-acquire concurrency using the persisted concurrency group
|
||||||
// Note: existingTask.concurrencyKey is cleared when tasks complete, so we
|
const concurrencyKey = existingTask.concurrencyGroup ?? existingTask.agent
|
||||||
// derive the key from task.agent which persists through completion.
|
|
||||||
const concurrencyKey = existingTask.agent
|
|
||||||
await this.concurrencyManager.acquire(concurrencyKey)
|
await this.concurrencyManager.acquire(concurrencyKey)
|
||||||
existingTask.concurrencyKey = concurrencyKey
|
existingTask.concurrencyKey = concurrencyKey
|
||||||
|
existingTask.concurrencyGroup = concurrencyKey
|
||||||
|
|
||||||
|
|
||||||
existingTask.status = "running"
|
existingTask.status = "running"
|
||||||
existingTask.completedAt = undefined
|
existingTask.completedAt = undefined
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user