feat(background-agent): implement process cleanup for BackgroundManager
Add functionality to manage process cleanup by registering and unregistering signal listeners. This ensures that BackgroundManager instances properly shut down and remove their listeners on process exit. Introduce tests to verify listener removal after shutdown.
This commit is contained in:
parent
4ac0fa7bb0
commit
7050d447cd
@ -1,5 +1,6 @@
|
|||||||
import { describe, test, expect, beforeEach } from "bun:test"
|
import { describe, test, expect, beforeEach } from "bun:test"
|
||||||
import { afterEach } from "bun:test"
|
import { afterEach } from "bun:test"
|
||||||
|
import { tmpdir } from "node:os"
|
||||||
import type { PluginInput } from "@opencode-ai/plugin"
|
import type { PluginInput } from "@opencode-ai/plugin"
|
||||||
import type { BackgroundTask, ResumeInput } from "./types"
|
import type { BackgroundTask, ResumeInput } from "./types"
|
||||||
import { BackgroundManager } from "./manager"
|
import { BackgroundManager } from "./manager"
|
||||||
@ -167,7 +168,7 @@ function createBackgroundManager(): BackgroundManager {
|
|||||||
prompt: async () => ({}),
|
prompt: async () => ({}),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return new BackgroundManager({ client, directory: "C:\\tmp" } as unknown as PluginInput)
|
return new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||||
}
|
}
|
||||||
|
|
||||||
function getConcurrencyManager(manager: BackgroundManager): ConcurrencyManager {
|
function getConcurrencyManager(manager: BackgroundManager): ConcurrencyManager {
|
||||||
@ -186,6 +187,18 @@ async function tryCompleteTaskForTest(manager: BackgroundManager, task: Backgrou
|
|||||||
return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean> }).tryCompleteTask(task, "test")
|
return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean> }).tryCompleteTask(task, "test")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function getCleanupSignals(): Array<NodeJS.Signals | "beforeExit" | "exit"> {
|
||||||
|
const signals: Array<NodeJS.Signals | "beforeExit" | "exit"> = ["SIGINT", "SIGTERM", "beforeExit", "exit"]
|
||||||
|
if (process.platform === "win32") {
|
||||||
|
signals.push("SIGBREAK")
|
||||||
|
}
|
||||||
|
return signals
|
||||||
|
}
|
||||||
|
|
||||||
|
function getListenerCounts(signals: Array<NodeJS.Signals | "beforeExit" | "exit">): Record<string, number> {
|
||||||
|
return Object.fromEntries(signals.map((signal) => [signal, process.listenerCount(signal)]))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
describe("BackgroundManager.getAllDescendantTasks", () => {
|
describe("BackgroundManager.getAllDescendantTasks", () => {
|
||||||
let manager: MockBackgroundManager
|
let manager: MockBackgroundManager
|
||||||
@ -1023,3 +1036,27 @@ describe("BackgroundManager.resume concurrency key", () => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe("BackgroundManager process cleanup", () => {
|
||||||
|
test("should remove listeners after last shutdown", () => {
|
||||||
|
// #given
|
||||||
|
const signals = getCleanupSignals()
|
||||||
|
const baseline = getListenerCounts(signals)
|
||||||
|
const managerA = createBackgroundManager()
|
||||||
|
const managerB = createBackgroundManager()
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const afterCreate = getListenerCounts(signals)
|
||||||
|
managerA.shutdown()
|
||||||
|
const afterFirstShutdown = getListenerCounts(signals)
|
||||||
|
managerB.shutdown()
|
||||||
|
const afterSecondShutdown = getListenerCounts(signals)
|
||||||
|
|
||||||
|
// #then
|
||||||
|
for (const signal of signals) {
|
||||||
|
expect(afterCreate[signal]).toBe(baseline[signal] + 1)
|
||||||
|
expect(afterFirstShutdown[signal]).toBe(baseline[signal] + 1)
|
||||||
|
expect(afterSecondShutdown[signal]).toBe(baseline[signal])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
|||||||
@ -18,8 +18,11 @@ import { join } from "node:path"
|
|||||||
const TASK_TTL_MS = 30 * 60 * 1000
|
const TASK_TTL_MS = 30 * 60 * 1000
|
||||||
const MIN_STABILITY_TIME_MS = 10 * 1000 // Must run at least 10s before stability detection kicks in
|
const MIN_STABILITY_TIME_MS = 10 * 1000 // Must run at least 10s before stability detection kicks in
|
||||||
|
|
||||||
|
type ProcessCleanupEvent = NodeJS.Signals | "beforeExit" | "exit"
|
||||||
|
|
||||||
type OpencodeClient = PluginInput["client"]
|
type OpencodeClient = PluginInput["client"]
|
||||||
|
|
||||||
|
|
||||||
interface MessagePartInfo {
|
interface MessagePartInfo {
|
||||||
sessionID?: string
|
sessionID?: string
|
||||||
type?: string
|
type?: string
|
||||||
@ -45,6 +48,10 @@ interface Todo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export class BackgroundManager {
|
export class BackgroundManager {
|
||||||
|
private static cleanupManagers = new Set<BackgroundManager>()
|
||||||
|
private static cleanupRegistered = false
|
||||||
|
private static cleanupHandlers = new Map<ProcessCleanupEvent, () => void>()
|
||||||
|
|
||||||
private tasks: Map<string, BackgroundTask>
|
private tasks: Map<string, BackgroundTask>
|
||||||
private notifications: Map<string, BackgroundTask[]>
|
private notifications: Map<string, BackgroundTask[]>
|
||||||
private pendingByParent: Map<string, Set<string>> // Track pending tasks per parent for batching
|
private pendingByParent: Map<string, Set<string>> // Track pending tasks per parent for batching
|
||||||
@ -52,9 +59,9 @@ export class BackgroundManager {
|
|||||||
private directory: string
|
private directory: string
|
||||||
private pollingInterval?: ReturnType<typeof setInterval>
|
private pollingInterval?: ReturnType<typeof setInterval>
|
||||||
private concurrencyManager: ConcurrencyManager
|
private concurrencyManager: ConcurrencyManager
|
||||||
private cleanupRegistered = false
|
|
||||||
private shutdownTriggered = false
|
private shutdownTriggered = false
|
||||||
|
|
||||||
|
|
||||||
constructor(ctx: PluginInput, config?: BackgroundTaskConfig) {
|
constructor(ctx: PluginInput, config?: BackgroundTaskConfig) {
|
||||||
this.tasks = new Map()
|
this.tasks = new Map()
|
||||||
this.notifications = new Map()
|
this.notifications = new Map()
|
||||||
@ -648,26 +655,48 @@ export class BackgroundManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private registerProcessCleanup(): void {
|
private registerProcessCleanup(): void {
|
||||||
if (this.cleanupRegistered) return
|
BackgroundManager.cleanupManagers.add(this)
|
||||||
this.cleanupRegistered = true
|
|
||||||
|
|
||||||
const cleanup = () => {
|
if (BackgroundManager.cleanupRegistered) return
|
||||||
try {
|
BackgroundManager.cleanupRegistered = true
|
||||||
this.shutdown()
|
|
||||||
} catch (error) {
|
const cleanupAll = () => {
|
||||||
log("[background-agent] Error during shutdown cleanup:", error)
|
for (const manager of BackgroundManager.cleanupManagers) {
|
||||||
|
try {
|
||||||
|
manager.shutdown()
|
||||||
|
} catch (error) {
|
||||||
|
log("[background-agent] Error during shutdown cleanup:", error)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
registerProcessSignal("SIGINT", cleanup)
|
const registerSignal = (signal: ProcessCleanupEvent, exitAfter: boolean): void => {
|
||||||
registerProcessSignal("SIGTERM", cleanup)
|
const listener = registerProcessSignal(signal, cleanupAll, exitAfter)
|
||||||
if (process.platform === "win32") {
|
BackgroundManager.cleanupHandlers.set(signal, listener)
|
||||||
registerProcessSignal("SIGBREAK", cleanup)
|
|
||||||
}
|
}
|
||||||
process.on("beforeExit", cleanup)
|
|
||||||
process.on("exit", cleanup)
|
registerSignal("SIGINT", true)
|
||||||
|
registerSignal("SIGTERM", true)
|
||||||
|
if (process.platform === "win32") {
|
||||||
|
registerSignal("SIGBREAK", true)
|
||||||
|
}
|
||||||
|
registerSignal("beforeExit", false)
|
||||||
|
registerSignal("exit", false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private unregisterProcessCleanup(): void {
|
||||||
|
BackgroundManager.cleanupManagers.delete(this)
|
||||||
|
|
||||||
|
if (BackgroundManager.cleanupManagers.size > 0) return
|
||||||
|
|
||||||
|
for (const [signal, listener] of BackgroundManager.cleanupHandlers.entries()) {
|
||||||
|
process.off(signal, listener)
|
||||||
|
}
|
||||||
|
BackgroundManager.cleanupHandlers.clear()
|
||||||
|
BackgroundManager.cleanupRegistered = false
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all running tasks (for compaction hook)
|
* Get all running tasks (for compaction hook)
|
||||||
*/
|
*/
|
||||||
@ -1029,20 +1058,28 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
this.tasks.clear()
|
this.tasks.clear()
|
||||||
this.notifications.clear()
|
this.notifications.clear()
|
||||||
this.pendingByParent.clear()
|
this.pendingByParent.clear()
|
||||||
|
this.unregisterProcessCleanup()
|
||||||
log("[background-agent] Shutdown complete")
|
log("[background-agent] Shutdown complete")
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function registerProcessSignal(
|
function registerProcessSignal(
|
||||||
signal: NodeJS.Signals,
|
signal: ProcessCleanupEvent,
|
||||||
handler: () => void
|
handler: () => void,
|
||||||
): void {
|
exitAfter: boolean
|
||||||
process.on(signal, () => {
|
): () => void {
|
||||||
|
const listener = () => {
|
||||||
handler()
|
handler()
|
||||||
process.exit(0)
|
if (exitAfter) {
|
||||||
})
|
process.exit(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
process.on(signal, listener)
|
||||||
|
return listener
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
function getMessageDir(sessionID: string): string | null {
|
function getMessageDir(sessionID: string): string | null {
|
||||||
if (!existsSync(MESSAGE_STORAGE)) return null
|
if (!existsSync(MESSAGE_STORAGE)) return null
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user