fix(ralph-loop): add inFlight guard and improve completion detection to prevent infinite loops
Closes #2084
This commit is contained in:
parent
c505989ad4
commit
a6617d93c0
@ -79,8 +79,8 @@ export async function detectCompletionInSessionMessages(
|
||||
if (assistantMessages.length === 0) return false
|
||||
|
||||
const pattern = buildPromisePattern(options.promise)
|
||||
const recentAssistants = assistantMessages.slice(-3)
|
||||
for (const assistant of recentAssistants) {
|
||||
for (let index = assistantMessages.length - 1; index >= 0; index -= 1) {
|
||||
const assistant = assistantMessages[index]
|
||||
if (!assistant.parts) continue
|
||||
|
||||
let responseText = ""
|
||||
|
||||
@ -494,6 +494,7 @@ describe("ralph-loop", () => {
|
||||
config: {
|
||||
enabled: true,
|
||||
default_max_iterations: 200,
|
||||
default_strategy: "continue",
|
||||
},
|
||||
})
|
||||
|
||||
@ -708,6 +709,57 @@ describe("ralph-loop", () => {
|
||||
expect(promptCalls[0].text).toContain("<promise>CALCULATOR_DONE</promise>")
|
||||
})
|
||||
|
||||
test("should skip concurrent idle events for same session when handler is in flight", async () => {
|
||||
// given - active loop with delayed prompt injection
|
||||
let releasePromptAsync: (() => void) | undefined
|
||||
const promptAsyncBlocked = new Promise<void>((resolve) => {
|
||||
releasePromptAsync = resolve
|
||||
})
|
||||
let firstPromptStartedResolve: (() => void) | undefined
|
||||
const firstPromptStarted = new Promise<void>((resolve) => {
|
||||
firstPromptStartedResolve = resolve
|
||||
})
|
||||
|
||||
const mockInput = createMockPluginInput() as {
|
||||
client: {
|
||||
session: {
|
||||
promptAsync: (opts: { path: { id: string }; body: { parts: Array<{ type: string; text: string }> } }) => Promise<unknown>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const originalPromptAsync = mockInput.client.session.promptAsync
|
||||
let promptAsyncCalls = 0
|
||||
mockInput.client.session.promptAsync = async (opts) => {
|
||||
promptAsyncCalls += 1
|
||||
if (promptAsyncCalls === 1) {
|
||||
firstPromptStartedResolve?.()
|
||||
}
|
||||
await promptAsyncBlocked
|
||||
return originalPromptAsync(opts)
|
||||
}
|
||||
|
||||
const hook = createRalphLoopHook(mockInput as Parameters<typeof createRalphLoopHook>[0])
|
||||
hook.startLoop("session-123", "Build feature", { maxIterations: 10 })
|
||||
|
||||
// when - second idle arrives while first idle processing is still in flight
|
||||
const firstIdle = hook.event({
|
||||
event: { type: "session.idle", properties: { sessionID: "session-123" } },
|
||||
})
|
||||
await firstPromptStarted
|
||||
const secondIdle = hook.event({
|
||||
event: { type: "session.idle", properties: { sessionID: "session-123" } },
|
||||
})
|
||||
|
||||
releasePromptAsync?.()
|
||||
await Promise.all([firstIdle, secondIdle])
|
||||
|
||||
// then - only one continuation should be injected
|
||||
expect(promptAsyncCalls).toBe(1)
|
||||
expect(promptCalls.length).toBe(1)
|
||||
expect(hook.getState()?.iteration).toBe(2)
|
||||
})
|
||||
|
||||
test("should clear loop state on user abort (MessageAbortedError)", async () => {
|
||||
// given - active loop
|
||||
const hook = createRalphLoopHook(createMockPluginInput())
|
||||
@ -782,8 +834,8 @@ describe("ralph-loop", () => {
|
||||
expect(hook.getState()).toBeNull()
|
||||
})
|
||||
|
||||
test("should NOT detect completion if promise is older than last 3 assistant messages", async () => {
|
||||
// given - promise appears in an assistant message older than last 3
|
||||
test("should detect completion even when promise is older than previous narrow window", async () => {
|
||||
// given - promise appears in an older assistant message with additional assistant output after it
|
||||
mockSessionMessages = [
|
||||
{ info: { role: "user" }, parts: [{ type: "text", text: "Start task" }] },
|
||||
{ info: { role: "assistant" }, parts: [{ type: "text", text: "Promise early <promise>DONE</promise>" }] },
|
||||
@ -801,9 +853,40 @@ describe("ralph-loop", () => {
|
||||
event: { type: "session.idle", properties: { sessionID: "session-123" } },
|
||||
})
|
||||
|
||||
// then - loop should continue (promise is older than last 3 assistant messages)
|
||||
expect(promptCalls.length).toBe(1)
|
||||
expect(hook.getState()?.iteration).toBe(2)
|
||||
// then - loop should complete because all assistant messages are scanned
|
||||
expect(promptCalls.length).toBe(0)
|
||||
expect(toastCalls.some((t) => t.title === "Ralph Loop Complete!")).toBe(true)
|
||||
expect(hook.getState()).toBeNull()
|
||||
})
|
||||
|
||||
test("should detect completion when many assistant messages are emitted after promise", async () => {
|
||||
// given - completion promise followed by long assistant output sequence
|
||||
mockSessionMessages = [
|
||||
{ info: { role: "user" }, parts: [{ type: "text", text: "Start task" }] },
|
||||
{ info: { role: "assistant" }, parts: [{ type: "text", text: "Done now <promise>DONE</promise>" }] },
|
||||
]
|
||||
|
||||
for (let index = 1; index <= 25; index += 1) {
|
||||
mockSessionMessages.push({
|
||||
info: { role: "assistant" },
|
||||
parts: [{ type: "text", text: `Post-completion assistant output ${index}` }],
|
||||
})
|
||||
}
|
||||
|
||||
const hook = createRalphLoopHook(createMockPluginInput(), {
|
||||
getTranscriptPath: () => join(TEST_DIR, "nonexistent.jsonl"),
|
||||
})
|
||||
hook.startLoop("session-123", "Build something", { completionPromise: "DONE" })
|
||||
|
||||
// when - session goes idle
|
||||
await hook.event({
|
||||
event: { type: "session.idle", properties: { sessionID: "session-123" } },
|
||||
})
|
||||
|
||||
// then - loop should complete despite large trailing output
|
||||
expect(promptCalls.length).toBe(0)
|
||||
expect(toastCalls.some((t) => t.title === "Ralph Loop Complete!")).toBe(true)
|
||||
expect(hook.getState()).toBeNull()
|
||||
})
|
||||
|
||||
test("should allow starting new loop while previous loop is active (different session)", async () => {
|
||||
|
||||
@ -25,6 +25,8 @@ export function createRalphLoopEventHandler(
|
||||
ctx: PluginInput,
|
||||
options: RalphLoopEventHandlerOptions,
|
||||
) {
|
||||
const inFlightSessions = new Set<string>()
|
||||
|
||||
return async ({ event }: { event: { type: string; properties?: unknown } }): Promise<void> => {
|
||||
const props = event.properties as Record<string, unknown> | undefined
|
||||
|
||||
@ -32,115 +34,127 @@ export function createRalphLoopEventHandler(
|
||||
const sessionID = props?.sessionID as string | undefined
|
||||
if (!sessionID) return
|
||||
|
||||
if (options.sessionRecovery.isRecovering(sessionID)) {
|
||||
log(`[${HOOK_NAME}] Skipped: in recovery`, { sessionID })
|
||||
if (inFlightSessions.has(sessionID)) {
|
||||
log(`[${HOOK_NAME}] Skipped: handler in flight`, { sessionID })
|
||||
return
|
||||
}
|
||||
|
||||
const state = options.loopState.getState()
|
||||
if (!state || !state.active) {
|
||||
return
|
||||
}
|
||||
|
||||
if (state.session_id && state.session_id !== sessionID) {
|
||||
if (options.checkSessionExists) {
|
||||
try {
|
||||
const exists = await options.checkSessionExists(state.session_id)
|
||||
if (!exists) {
|
||||
options.loopState.clear()
|
||||
log(`[${HOOK_NAME}] Cleared orphaned state from deleted session`, {
|
||||
orphanedSessionId: state.session_id,
|
||||
currentSessionId: sessionID,
|
||||
})
|
||||
return
|
||||
}
|
||||
} catch (err) {
|
||||
log(`[${HOOK_NAME}] Failed to check session existence`, {
|
||||
sessionId: state.session_id,
|
||||
error: String(err),
|
||||
})
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const transcriptPath = options.getTranscriptPath(sessionID)
|
||||
const completionViaTranscript = detectCompletionInTranscript(transcriptPath, state.completion_promise)
|
||||
const completionViaApi = completionViaTranscript
|
||||
? false
|
||||
: await detectCompletionInSessionMessages(ctx, {
|
||||
sessionID,
|
||||
promise: state.completion_promise,
|
||||
apiTimeoutMs: options.apiTimeoutMs,
|
||||
directory: options.directory,
|
||||
})
|
||||
|
||||
if (completionViaTranscript || completionViaApi) {
|
||||
log(`[${HOOK_NAME}] Completion detected!`, {
|
||||
sessionID,
|
||||
iteration: state.iteration,
|
||||
promise: state.completion_promise,
|
||||
detectedVia: completionViaTranscript
|
||||
? "transcript_file"
|
||||
: "session_messages_api",
|
||||
})
|
||||
options.loopState.clear()
|
||||
|
||||
const title = state.ultrawork ? "ULTRAWORK LOOP COMPLETE!" : "Ralph Loop Complete!"
|
||||
const message = state.ultrawork ? `JUST ULW ULW! Task completed after ${state.iteration} iteration(s)` : `Task completed after ${state.iteration} iteration(s)`
|
||||
await ctx.client.tui?.showToast?.({ body: { title, message, variant: "success", duration: 5000 } }).catch(() => {})
|
||||
return
|
||||
}
|
||||
|
||||
if (state.iteration >= state.max_iterations) {
|
||||
log(`[${HOOK_NAME}] Max iterations reached`, {
|
||||
sessionID,
|
||||
iteration: state.iteration,
|
||||
max: state.max_iterations,
|
||||
})
|
||||
options.loopState.clear()
|
||||
|
||||
await ctx.client.tui?.showToast?.({
|
||||
body: { title: "Ralph Loop Stopped", message: `Max iterations (${state.max_iterations}) reached without completion`, variant: "warning", duration: 5000 },
|
||||
}).catch(() => {})
|
||||
return
|
||||
}
|
||||
|
||||
const newState = options.loopState.incrementIteration()
|
||||
if (!newState) {
|
||||
log(`[${HOOK_NAME}] Failed to increment iteration`, { sessionID })
|
||||
return
|
||||
}
|
||||
|
||||
log(`[${HOOK_NAME}] Continuing loop`, {
|
||||
sessionID,
|
||||
iteration: newState.iteration,
|
||||
max: newState.max_iterations,
|
||||
})
|
||||
|
||||
await ctx.client.tui?.showToast?.({
|
||||
body: {
|
||||
title: "Ralph Loop",
|
||||
message: `Iteration ${newState.iteration}/${newState.max_iterations}`,
|
||||
variant: "info",
|
||||
duration: 2000,
|
||||
},
|
||||
}).catch(() => {})
|
||||
inFlightSessions.add(sessionID)
|
||||
|
||||
try {
|
||||
await continueIteration(ctx, newState, {
|
||||
previousSessionID: sessionID,
|
||||
directory: options.directory,
|
||||
apiTimeoutMs: options.apiTimeoutMs,
|
||||
loopState: options.loopState,
|
||||
})
|
||||
} catch (err) {
|
||||
log(`[${HOOK_NAME}] Failed to inject continuation`, {
|
||||
|
||||
if (options.sessionRecovery.isRecovering(sessionID)) {
|
||||
log(`[${HOOK_NAME}] Skipped: in recovery`, { sessionID })
|
||||
return
|
||||
}
|
||||
|
||||
const state = options.loopState.getState()
|
||||
if (!state || !state.active) {
|
||||
return
|
||||
}
|
||||
|
||||
if (state.session_id && state.session_id !== sessionID) {
|
||||
if (options.checkSessionExists) {
|
||||
try {
|
||||
const exists = await options.checkSessionExists(state.session_id)
|
||||
if (!exists) {
|
||||
options.loopState.clear()
|
||||
log(`[${HOOK_NAME}] Cleared orphaned state from deleted session`, {
|
||||
orphanedSessionId: state.session_id,
|
||||
currentSessionId: sessionID,
|
||||
})
|
||||
return
|
||||
}
|
||||
} catch (err) {
|
||||
log(`[${HOOK_NAME}] Failed to check session existence`, {
|
||||
sessionId: state.session_id,
|
||||
error: String(err),
|
||||
})
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
const transcriptPath = options.getTranscriptPath(sessionID)
|
||||
const completionViaTranscript = detectCompletionInTranscript(transcriptPath, state.completion_promise)
|
||||
const completionViaApi = completionViaTranscript
|
||||
? false
|
||||
: await detectCompletionInSessionMessages(ctx, {
|
||||
sessionID,
|
||||
promise: state.completion_promise,
|
||||
apiTimeoutMs: options.apiTimeoutMs,
|
||||
directory: options.directory,
|
||||
})
|
||||
|
||||
if (completionViaTranscript || completionViaApi) {
|
||||
log(`[${HOOK_NAME}] Completion detected!`, {
|
||||
sessionID,
|
||||
iteration: state.iteration,
|
||||
promise: state.completion_promise,
|
||||
detectedVia: completionViaTranscript
|
||||
? "transcript_file"
|
||||
: "session_messages_api",
|
||||
})
|
||||
options.loopState.clear()
|
||||
|
||||
const title = state.ultrawork ? "ULTRAWORK LOOP COMPLETE!" : "Ralph Loop Complete!"
|
||||
const message = state.ultrawork ? `JUST ULW ULW! Task completed after ${state.iteration} iteration(s)` : `Task completed after ${state.iteration} iteration(s)`
|
||||
await ctx.client.tui?.showToast?.({ body: { title, message, variant: "success", duration: 5000 } }).catch(() => {})
|
||||
return
|
||||
}
|
||||
|
||||
if (state.iteration >= state.max_iterations) {
|
||||
log(`[${HOOK_NAME}] Max iterations reached`, {
|
||||
sessionID,
|
||||
iteration: state.iteration,
|
||||
max: state.max_iterations,
|
||||
})
|
||||
options.loopState.clear()
|
||||
|
||||
await ctx.client.tui?.showToast?.({
|
||||
body: { title: "Ralph Loop Stopped", message: `Max iterations (${state.max_iterations}) reached without completion`, variant: "warning", duration: 5000 },
|
||||
}).catch(() => {})
|
||||
return
|
||||
}
|
||||
|
||||
const newState = options.loopState.incrementIteration()
|
||||
if (!newState) {
|
||||
log(`[${HOOK_NAME}] Failed to increment iteration`, { sessionID })
|
||||
return
|
||||
}
|
||||
|
||||
log(`[${HOOK_NAME}] Continuing loop`, {
|
||||
sessionID,
|
||||
error: String(err),
|
||||
iteration: newState.iteration,
|
||||
max: newState.max_iterations,
|
||||
})
|
||||
|
||||
await ctx.client.tui?.showToast?.({
|
||||
body: {
|
||||
title: "Ralph Loop",
|
||||
message: `Iteration ${newState.iteration}/${newState.max_iterations}`,
|
||||
variant: "info",
|
||||
duration: 2000,
|
||||
},
|
||||
}).catch(() => {})
|
||||
|
||||
try {
|
||||
await continueIteration(ctx, newState, {
|
||||
previousSessionID: sessionID,
|
||||
directory: options.directory,
|
||||
apiTimeoutMs: options.apiTimeoutMs,
|
||||
loopState: options.loopState,
|
||||
})
|
||||
} catch (err) {
|
||||
log(`[${HOOK_NAME}] Failed to inject continuation`, {
|
||||
sessionID,
|
||||
error: String(err),
|
||||
})
|
||||
}
|
||||
return
|
||||
} finally {
|
||||
inFlightSessions.delete(sessionID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if (event.type === "session.deleted") {
|
||||
|
||||
@ -36,7 +36,7 @@ async function waitUntil(condition: () => boolean): Promise<void> {
|
||||
}
|
||||
|
||||
describe("ralph-loop reset strategy race condition", () => {
|
||||
test("should continue iteration when old session idle arrives before TUI switch completes", async () => {
|
||||
test("should skip duplicate idle while reset iteration handling is in flight", async () => {
|
||||
// given - reset strategy loop with blocked TUI session switch
|
||||
const promptCalls: Array<{ sessionID: string; text: string }> = []
|
||||
const createSessionCalls: Array<{ parentID?: string }> = []
|
||||
@ -85,7 +85,7 @@ describe("ralph-loop reset strategy race condition", () => {
|
||||
},
|
||||
},
|
||||
},
|
||||
} as Parameters<typeof createRalphLoopHook>[0])
|
||||
} as unknown as Parameters<typeof createRalphLoopHook>[0])
|
||||
|
||||
hook.startLoop("session-old", "Build feature", { strategy: "reset" })
|
||||
|
||||
@ -100,14 +100,12 @@ describe("ralph-loop reset strategy race condition", () => {
|
||||
event: { type: "session.idle", properties: { sessionID: "session-old" } },
|
||||
})
|
||||
|
||||
await waitUntil(() => selectSessionCalls > 1)
|
||||
|
||||
selectSessionDeferred.resolve()
|
||||
await Promise.all([firstIdleEvent, secondIdleEvent])
|
||||
|
||||
// then - second idle should not be skipped during reset transition
|
||||
expect(createSessionCalls.length).toBe(2)
|
||||
expect(promptCalls.length).toBe(2)
|
||||
expect(hook.getState()?.iteration).toBe(3)
|
||||
// then - duplicate idle should be skipped to prevent concurrent continuation injection
|
||||
expect(createSessionCalls.length).toBe(1)
|
||||
expect(promptCalls.length).toBe(1)
|
||||
expect(hook.getState()?.iteration).toBe(2)
|
||||
})
|
||||
})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user