fix: prevent overlapping poll cycles in managers
Guarding polling re-entry avoids stacked async polls under slow responses, and unref on pending-call cleanup timer reduces idle wakeups.
This commit is contained in:
parent
8c88da51e1
commit
2b5887aca3
53
src/features/background-agent/manager.polling.test.ts
Normal file
53
src/features/background-agent/manager.polling.test.ts
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import { describe, test, expect } from "bun:test"
|
||||||
|
import { tmpdir } from "node:os"
|
||||||
|
import type { PluginInput } from "@opencode-ai/plugin"
|
||||||
|
import { BackgroundManager } from "./manager"
|
||||||
|
|
||||||
|
function createManagerWithStatus(statusImpl: () => Promise<{ data: Record<string, { type: string }> }>): BackgroundManager {
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
status: statusImpl,
|
||||||
|
prompt: async () => ({}),
|
||||||
|
promptAsync: async () => ({}),
|
||||||
|
abort: async () => ({}),
|
||||||
|
todo: async () => ({ data: [] }),
|
||||||
|
messages: async () => ({ data: [] }),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("BackgroundManager polling overlap", () => {
|
||||||
|
test("skips overlapping pollRunningTasks executions", async () => {
|
||||||
|
//#given
|
||||||
|
let activeCalls = 0
|
||||||
|
let maxActiveCalls = 0
|
||||||
|
let statusCallCount = 0
|
||||||
|
let releaseStatus: (() => void) | undefined
|
||||||
|
const statusGate = new Promise<void>((resolve) => {
|
||||||
|
releaseStatus = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
const manager = createManagerWithStatus(async () => {
|
||||||
|
statusCallCount += 1
|
||||||
|
activeCalls += 1
|
||||||
|
maxActiveCalls = Math.max(maxActiveCalls, activeCalls)
|
||||||
|
await statusGate
|
||||||
|
activeCalls -= 1
|
||||||
|
return { data: {} }
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
const firstPoll = (manager as unknown as { pollRunningTasks: () => Promise<void> }).pollRunningTasks()
|
||||||
|
await Promise.resolve()
|
||||||
|
const secondPoll = (manager as unknown as { pollRunningTasks: () => Promise<void> }).pollRunningTasks()
|
||||||
|
releaseStatus?.()
|
||||||
|
await Promise.all([firstPoll, secondPoll])
|
||||||
|
manager.shutdown()
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(maxActiveCalls).toBe(1)
|
||||||
|
expect(statusCallCount).toBe(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
@ -80,6 +80,7 @@ export class BackgroundManager {
|
|||||||
private client: OpencodeClient
|
private client: OpencodeClient
|
||||||
private directory: string
|
private directory: string
|
||||||
private pollingInterval?: ReturnType<typeof setInterval>
|
private pollingInterval?: ReturnType<typeof setInterval>
|
||||||
|
private pollingInFlight = false
|
||||||
private concurrencyManager: ConcurrencyManager
|
private concurrencyManager: ConcurrencyManager
|
||||||
private shutdownTriggered = false
|
private shutdownTriggered = false
|
||||||
private config?: BackgroundTaskConfig
|
private config?: BackgroundTaskConfig
|
||||||
@ -1546,6 +1547,9 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async pollRunningTasks(): Promise<void> {
|
private async pollRunningTasks(): Promise<void> {
|
||||||
|
if (this.pollingInFlight) return
|
||||||
|
this.pollingInFlight = true
|
||||||
|
try {
|
||||||
this.pruneStaleTasksAndNotifications()
|
this.pruneStaleTasksAndNotifications()
|
||||||
|
|
||||||
const statusResult = await this.client.session.status()
|
const statusResult = await this.client.session.status()
|
||||||
@ -1601,6 +1605,9 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
|||||||
if (!this.hasRunningTasks()) {
|
if (!this.hasRunningTasks()) {
|
||||||
this.stopPolling()
|
this.stopPolling()
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
this.pollingInFlight = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
56
src/features/tmux-subagent/polling-manager.test.ts
Normal file
56
src/features/tmux-subagent/polling-manager.test.ts
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
import { describe, test, expect } from "bun:test"
|
||||||
|
import { TmuxPollingManager } from "./polling-manager"
|
||||||
|
import type { TrackedSession } from "./types"
|
||||||
|
|
||||||
|
describe("TmuxPollingManager overlap", () => {
|
||||||
|
test("skips overlapping pollSessions executions", async () => {
|
||||||
|
//#given
|
||||||
|
const sessions = new Map<string, TrackedSession>()
|
||||||
|
sessions.set("ses-1", {
|
||||||
|
sessionId: "ses-1",
|
||||||
|
paneId: "%1",
|
||||||
|
description: "test",
|
||||||
|
createdAt: new Date(),
|
||||||
|
lastSeenAt: new Date(),
|
||||||
|
})
|
||||||
|
|
||||||
|
let activeCalls = 0
|
||||||
|
let maxActiveCalls = 0
|
||||||
|
let statusCallCount = 0
|
||||||
|
let releaseStatus: (() => void) | undefined
|
||||||
|
const statusGate = new Promise<void>((resolve) => {
|
||||||
|
releaseStatus = resolve
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = {
|
||||||
|
session: {
|
||||||
|
status: async () => {
|
||||||
|
statusCallCount += 1
|
||||||
|
activeCalls += 1
|
||||||
|
maxActiveCalls = Math.max(maxActiveCalls, activeCalls)
|
||||||
|
await statusGate
|
||||||
|
activeCalls -= 1
|
||||||
|
return { data: { "ses-1": { type: "running" } } }
|
||||||
|
},
|
||||||
|
messages: async () => ({ data: [] }),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
const manager = new TmuxPollingManager(
|
||||||
|
client as unknown as import("../../tools/delegate-task/types").OpencodeClient,
|
||||||
|
sessions,
|
||||||
|
async () => {},
|
||||||
|
)
|
||||||
|
|
||||||
|
//#when
|
||||||
|
const firstPoll = (manager as unknown as { pollSessions: () => Promise<void> }).pollSessions()
|
||||||
|
await Promise.resolve()
|
||||||
|
const secondPoll = (manager as unknown as { pollSessions: () => Promise<void> }).pollSessions()
|
||||||
|
releaseStatus?.()
|
||||||
|
await Promise.all([firstPoll, secondPoll])
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(maxActiveCalls).toBe(1)
|
||||||
|
expect(statusCallCount).toBe(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
@ -11,6 +11,7 @@ const STABLE_POLLS_REQUIRED = 3
|
|||||||
|
|
||||||
export class TmuxPollingManager {
|
export class TmuxPollingManager {
|
||||||
private pollInterval?: ReturnType<typeof setInterval>
|
private pollInterval?: ReturnType<typeof setInterval>
|
||||||
|
private pollingInFlight = false
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private client: OpencodeClient,
|
private client: OpencodeClient,
|
||||||
@ -37,12 +38,14 @@ export class TmuxPollingManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async pollSessions(): Promise<void> {
|
private async pollSessions(): Promise<void> {
|
||||||
|
if (this.pollingInFlight) return
|
||||||
|
this.pollingInFlight = true
|
||||||
|
try {
|
||||||
if (this.sessions.size === 0) {
|
if (this.sessions.size === 0) {
|
||||||
this.stopPolling()
|
this.stopPolling()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
const statusResult = await this.client.session.status({ path: undefined })
|
const statusResult = await this.client.session.status({ path: undefined })
|
||||||
const allStatuses = normalizeSDKResponse(statusResult, {} as Record<string, { type: string }>)
|
const allStatuses = normalizeSDKResponse(statusResult, {} as Record<string, { type: string }>)
|
||||||
|
|
||||||
@ -135,6 +138,8 @@ export class TmuxPollingManager {
|
|||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
log("[tmux-session-manager] poll error", { error: String(err) })
|
log("[tmux-session-manager] poll error", { error: String(err) })
|
||||||
|
} finally {
|
||||||
|
this.pollingInFlight = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
38
src/hooks/comment-checker/pending-calls.test.ts
Normal file
38
src/hooks/comment-checker/pending-calls.test.ts
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
import { describe, test, expect } from "bun:test"
|
||||||
|
|
||||||
|
describe("pending-calls cleanup interval", () => {
|
||||||
|
test("starts cleanup once and unrefs timer", async () => {
|
||||||
|
//#given
|
||||||
|
const originalSetInterval = globalThis.setInterval
|
||||||
|
const setIntervalCalls: number[] = []
|
||||||
|
let unrefCalled = 0
|
||||||
|
|
||||||
|
globalThis.setInterval = ((
|
||||||
|
_handler: TimerHandler,
|
||||||
|
timeout?: number,
|
||||||
|
..._args: any[]
|
||||||
|
) => {
|
||||||
|
setIntervalCalls.push(timeout as number)
|
||||||
|
return {
|
||||||
|
unref: () => {
|
||||||
|
unrefCalled += 1
|
||||||
|
},
|
||||||
|
} as unknown as ReturnType<typeof setInterval>
|
||||||
|
}) as unknown as typeof setInterval
|
||||||
|
|
||||||
|
try {
|
||||||
|
const modulePath = new URL("./pending-calls.ts", import.meta.url).pathname
|
||||||
|
const pendingCallsModule = await import(`${modulePath}?pending-calls-test-once`)
|
||||||
|
|
||||||
|
//#when
|
||||||
|
pendingCallsModule.startPendingCallCleanup()
|
||||||
|
pendingCallsModule.startPendingCallCleanup()
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(setIntervalCalls).toEqual([10_000])
|
||||||
|
expect(unrefCalled).toBe(1)
|
||||||
|
} finally {
|
||||||
|
globalThis.setInterval = originalSetInterval
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
@ -4,6 +4,7 @@ const pendingCalls = new Map<string, PendingCall>()
|
|||||||
const PENDING_CALL_TTL = 60_000
|
const PENDING_CALL_TTL = 60_000
|
||||||
|
|
||||||
let cleanupIntervalStarted = false
|
let cleanupIntervalStarted = false
|
||||||
|
let cleanupInterval: ReturnType<typeof setInterval> | undefined
|
||||||
|
|
||||||
function cleanupOldPendingCalls(): void {
|
function cleanupOldPendingCalls(): void {
|
||||||
const now = Date.now()
|
const now = Date.now()
|
||||||
@ -17,7 +18,10 @@ function cleanupOldPendingCalls(): void {
|
|||||||
export function startPendingCallCleanup(): void {
|
export function startPendingCallCleanup(): void {
|
||||||
if (cleanupIntervalStarted) return
|
if (cleanupIntervalStarted) return
|
||||||
cleanupIntervalStarted = true
|
cleanupIntervalStarted = true
|
||||||
setInterval(cleanupOldPendingCalls, 10_000)
|
cleanupInterval = setInterval(cleanupOldPendingCalls, 10_000)
|
||||||
|
if (typeof cleanupInterval === "object" && "unref" in cleanupInterval) {
|
||||||
|
cleanupInterval.unref()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function registerPendingCall(callID: string, pendingCall: PendingCall): void {
|
export function registerPendingCall(callID: string, pendingCall: PendingCall): void {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user