Merge pull request #1877 from code-yeongyu/fix/1752-compaction-race
fix: cancel pending compaction timer on session.idle and add error logging (#1752)
This commit is contained in:
commit
24ea3627ad
@ -0,0 +1,105 @@
|
|||||||
|
import { beforeEach, describe, expect, mock, test } from "bun:test"
|
||||||
|
import type { PluginInput } from "@opencode-ai/plugin"
|
||||||
|
|
||||||
|
const executeCompactMock = mock(async () => {})
|
||||||
|
const getLastAssistantMock = mock(async () => ({
|
||||||
|
providerID: "anthropic",
|
||||||
|
modelID: "claude-sonnet-4-5",
|
||||||
|
}))
|
||||||
|
const parseAnthropicTokenLimitErrorMock = mock(() => ({
|
||||||
|
providerID: "anthropic",
|
||||||
|
modelID: "claude-sonnet-4-5",
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module("./executor", () => ({
|
||||||
|
executeCompact: executeCompactMock,
|
||||||
|
getLastAssistant: getLastAssistantMock,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module("./parser", () => ({
|
||||||
|
parseAnthropicTokenLimitError: parseAnthropicTokenLimitErrorMock,
|
||||||
|
}))
|
||||||
|
|
||||||
|
mock.module("../../shared/logger", () => ({
|
||||||
|
log: () => {},
|
||||||
|
}))
|
||||||
|
|
||||||
|
function createMockContext(): PluginInput {
|
||||||
|
return {
|
||||||
|
client: {
|
||||||
|
session: {
|
||||||
|
messages: mock(() => Promise.resolve({ data: [] })),
|
||||||
|
},
|
||||||
|
tui: {
|
||||||
|
showToast: mock(() => Promise.resolve()),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
directory: "/tmp",
|
||||||
|
} as PluginInput
|
||||||
|
}
|
||||||
|
|
||||||
|
function setupDelayedTimeoutMocks(): {
|
||||||
|
restore: () => void
|
||||||
|
getClearTimeoutCalls: () => Array<ReturnType<typeof setTimeout>>
|
||||||
|
} {
|
||||||
|
const originalSetTimeout = globalThis.setTimeout
|
||||||
|
const originalClearTimeout = globalThis.clearTimeout
|
||||||
|
const clearTimeoutCalls: Array<ReturnType<typeof setTimeout>> = []
|
||||||
|
let timeoutCounter = 0
|
||||||
|
|
||||||
|
globalThis.setTimeout = ((_: () => void, _delay?: number) => {
|
||||||
|
timeoutCounter += 1
|
||||||
|
return timeoutCounter as ReturnType<typeof setTimeout>
|
||||||
|
}) as typeof setTimeout
|
||||||
|
|
||||||
|
globalThis.clearTimeout = ((timeoutID: ReturnType<typeof setTimeout>) => {
|
||||||
|
clearTimeoutCalls.push(timeoutID)
|
||||||
|
}) as typeof clearTimeout
|
||||||
|
|
||||||
|
return {
|
||||||
|
restore: () => {
|
||||||
|
globalThis.setTimeout = originalSetTimeout
|
||||||
|
globalThis.clearTimeout = originalClearTimeout
|
||||||
|
},
|
||||||
|
getClearTimeoutCalls: () => clearTimeoutCalls,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("createAnthropicContextWindowLimitRecoveryHook", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
executeCompactMock.mockClear()
|
||||||
|
getLastAssistantMock.mockClear()
|
||||||
|
parseAnthropicTokenLimitErrorMock.mockClear()
|
||||||
|
})
|
||||||
|
|
||||||
|
test("cancels pending timer when session.idle handles compaction first", async () => {
|
||||||
|
//#given
|
||||||
|
const { restore, getClearTimeoutCalls } = setupDelayedTimeoutMocks()
|
||||||
|
const { createAnthropicContextWindowLimitRecoveryHook } = await import("./recovery-hook")
|
||||||
|
const hook = createAnthropicContextWindowLimitRecoveryHook(createMockContext())
|
||||||
|
|
||||||
|
try {
|
||||||
|
//#when
|
||||||
|
await hook.event({
|
||||||
|
event: {
|
||||||
|
type: "session.error",
|
||||||
|
properties: { sessionID: "session-race", error: "prompt is too long" },
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
await hook.event({
|
||||||
|
event: {
|
||||||
|
type: "session.idle",
|
||||||
|
properties: { sessionID: "session-race" },
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(getClearTimeoutCalls()).toEqual([1 as ReturnType<typeof setTimeout>])
|
||||||
|
expect(executeCompactMock).toHaveBeenCalledTimes(1)
|
||||||
|
expect(executeCompactMock.mock.calls[0]?.[0]).toBe("session-race")
|
||||||
|
} finally {
|
||||||
|
restore()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
@ -28,6 +28,7 @@ export function createAnthropicContextWindowLimitRecoveryHook(
|
|||||||
) {
|
) {
|
||||||
const autoCompactState = createRecoveryState()
|
const autoCompactState = createRecoveryState()
|
||||||
const experimental = options?.experimental
|
const experimental = options?.experimental
|
||||||
|
const pendingCompactionTimeoutBySession = new Map<string, ReturnType<typeof setTimeout>>()
|
||||||
|
|
||||||
const eventHandler = async ({ event }: { event: { type: string; properties?: unknown } }) => {
|
const eventHandler = async ({ event }: { event: { type: string; properties?: unknown } }) => {
|
||||||
const props = event.properties as Record<string, unknown> | undefined
|
const props = event.properties as Record<string, unknown> | undefined
|
||||||
@ -35,6 +36,12 @@ export function createAnthropicContextWindowLimitRecoveryHook(
|
|||||||
if (event.type === "session.deleted") {
|
if (event.type === "session.deleted") {
|
||||||
const sessionInfo = props?.info as { id?: string } | undefined
|
const sessionInfo = props?.info as { id?: string } | undefined
|
||||||
if (sessionInfo?.id) {
|
if (sessionInfo?.id) {
|
||||||
|
const timeoutID = pendingCompactionTimeoutBySession.get(sessionInfo.id)
|
||||||
|
if (timeoutID !== undefined) {
|
||||||
|
clearTimeout(timeoutID)
|
||||||
|
pendingCompactionTimeoutBySession.delete(sessionInfo.id)
|
||||||
|
}
|
||||||
|
|
||||||
autoCompactState.pendingCompact.delete(sessionInfo.id)
|
autoCompactState.pendingCompact.delete(sessionInfo.id)
|
||||||
autoCompactState.errorDataBySession.delete(sessionInfo.id)
|
autoCompactState.errorDataBySession.delete(sessionInfo.id)
|
||||||
autoCompactState.retryStateBySession.delete(sessionInfo.id)
|
autoCompactState.retryStateBySession.delete(sessionInfo.id)
|
||||||
@ -76,7 +83,8 @@ export function createAnthropicContextWindowLimitRecoveryHook(
|
|||||||
})
|
})
|
||||||
.catch(() => {})
|
.catch(() => {})
|
||||||
|
|
||||||
setTimeout(() => {
|
const timeoutID = setTimeout(() => {
|
||||||
|
pendingCompactionTimeoutBySession.delete(sessionID)
|
||||||
executeCompact(
|
executeCompact(
|
||||||
sessionID,
|
sessionID,
|
||||||
{ providerID, modelID },
|
{ providerID, modelID },
|
||||||
@ -86,6 +94,8 @@ export function createAnthropicContextWindowLimitRecoveryHook(
|
|||||||
experimental,
|
experimental,
|
||||||
)
|
)
|
||||||
}, 300)
|
}, 300)
|
||||||
|
|
||||||
|
pendingCompactionTimeoutBySession.set(sessionID, timeoutID)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -114,6 +124,12 @@ export function createAnthropicContextWindowLimitRecoveryHook(
|
|||||||
|
|
||||||
if (!autoCompactState.pendingCompact.has(sessionID)) return
|
if (!autoCompactState.pendingCompact.has(sessionID)) return
|
||||||
|
|
||||||
|
const timeoutID = pendingCompactionTimeoutBySession.get(sessionID)
|
||||||
|
if (timeoutID !== undefined) {
|
||||||
|
clearTimeout(timeoutID)
|
||||||
|
pendingCompactionTimeoutBySession.delete(sessionID)
|
||||||
|
}
|
||||||
|
|
||||||
const errorData = autoCompactState.errorDataBySession.get(sessionID)
|
const errorData = autoCompactState.errorDataBySession.get(sessionID)
|
||||||
const lastAssistant = await getLastAssistant(sessionID, ctx.client, ctx.directory)
|
const lastAssistant = await getLastAssistant(sessionID, ctx.client, ctx.directory)
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,12 @@
|
|||||||
import { describe, it, expect, mock, beforeEach } from "bun:test"
|
import { describe, it, expect, mock, beforeEach } from "bun:test"
|
||||||
import { createPreemptiveCompactionHook } from "./preemptive-compaction"
|
|
||||||
|
const logMock = mock(() => {})
|
||||||
|
|
||||||
|
mock.module("../shared/logger", () => ({
|
||||||
|
log: logMock,
|
||||||
|
}))
|
||||||
|
|
||||||
|
const { createPreemptiveCompactionHook } = await import("./preemptive-compaction")
|
||||||
|
|
||||||
function createMockCtx() {
|
function createMockCtx() {
|
||||||
return {
|
return {
|
||||||
@ -21,6 +28,7 @@ describe("preemptive-compaction", () => {
|
|||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
ctx = createMockCtx()
|
ctx = createMockCtx()
|
||||||
|
logMock.mockClear()
|
||||||
})
|
})
|
||||||
|
|
||||||
// #given event caches token info from message.updated
|
// #given event caches token info from message.updated
|
||||||
@ -152,4 +160,45 @@ describe("preemptive-compaction", () => {
|
|||||||
|
|
||||||
expect(ctx.client.session.summarize).not.toHaveBeenCalled()
|
expect(ctx.client.session.summarize).not.toHaveBeenCalled()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("should log summarize errors instead of swallowing them", async () => {
|
||||||
|
//#given
|
||||||
|
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||||
|
const sessionID = "ses_log_error"
|
||||||
|
const summarizeError = new Error("summarize failed")
|
||||||
|
ctx.client.session.summarize.mockRejectedValueOnce(summarizeError)
|
||||||
|
|
||||||
|
await hook.event({
|
||||||
|
event: {
|
||||||
|
type: "message.updated",
|
||||||
|
properties: {
|
||||||
|
info: {
|
||||||
|
role: "assistant",
|
||||||
|
sessionID,
|
||||||
|
providerID: "anthropic",
|
||||||
|
modelID: "claude-sonnet-4-5",
|
||||||
|
finish: true,
|
||||||
|
tokens: {
|
||||||
|
input: 170000,
|
||||||
|
output: 0,
|
||||||
|
reasoning: 0,
|
||||||
|
cache: { read: 10000, write: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await hook["tool.execute.after"](
|
||||||
|
{ tool: "bash", sessionID, callID: "call_log" },
|
||||||
|
{ title: "", output: "test", metadata: null }
|
||||||
|
)
|
||||||
|
|
||||||
|
//#then
|
||||||
|
expect(logMock).toHaveBeenCalledWith("[preemptive-compaction] Compaction failed", {
|
||||||
|
sessionID,
|
||||||
|
error: String(summarizeError),
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
import { log } from "../shared/logger"
|
||||||
|
|
||||||
const DEFAULT_ACTUAL_LIMIT = 200_000
|
const DEFAULT_ACTUAL_LIMIT = 200_000
|
||||||
|
|
||||||
const ANTHROPIC_ACTUAL_LIMIT =
|
const ANTHROPIC_ACTUAL_LIMIT =
|
||||||
@ -76,8 +78,8 @@ export function createPreemptiveCompactionHook(ctx: PluginInput) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
compactedSessions.add(sessionID)
|
compactedSessions.add(sessionID)
|
||||||
} catch {
|
} catch (error) {
|
||||||
// best-effort; do not disrupt tool execution
|
log("[preemptive-compaction] Compaction failed", { sessionID, error: String(error) })
|
||||||
} finally {
|
} finally {
|
||||||
compactionInProgress.delete(sessionID)
|
compactionInProgress.delete(sessionID)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user