From 9995b680f717e19e05e98ee6c47967105ab44cb5 Mon Sep 17 00:00:00 2001 From: Aleksey Bragin Date: Wed, 14 Jan 2026 19:42:27 -0500 Subject: [PATCH 1/4] fix: add session cursor for tool output --- src/shared/index.ts | 1 + src/shared/session-cursor.test.ts | 53 +++++++++++++++++++ src/shared/session-cursor.ts | 84 ++++++++++++++++++++++++++++++ src/tools/background-task/tools.ts | 18 ++++++- src/tools/call-omo-agent/tools.ts | 9 +++- 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 src/shared/session-cursor.test.ts create mode 100644 src/shared/session-cursor.ts diff --git a/src/shared/index.ts b/src/shared/index.ts index df790379..79cc7024 100644 --- a/src/shared/index.ts +++ b/src/shared/index.ts @@ -22,3 +22,4 @@ export * from "./permission-compat" export * from "./external-plugin-detector" export * from "./zip-extractor" export * from "./agent-variant" +export * from "./session-cursor" diff --git a/src/shared/session-cursor.test.ts b/src/shared/session-cursor.test.ts new file mode 100644 index 00000000..bb395a48 --- /dev/null +++ b/src/shared/session-cursor.test.ts @@ -0,0 +1,53 @@ +import { beforeEach, describe, expect, it } from "bun:test" +import { getNewMessages, resetMessageCursor } from "./session-cursor" + +describe("getNewMessages", () => { + const sessionID = "session-123" + + const buildMessage = (id: string, created: number) => ({ + info: { id, time: { created } }, + }) + + beforeEach(() => { + resetMessageCursor(sessionID) + }) + + it("returns all messages on first read and none on repeat", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + + // #when + const first = getNewMessages(sessionID, messages) + const second = getNewMessages(sessionID, messages) + + // #then + expect(first).toEqual(messages) + expect(second).toEqual([]) + }) + + it("returns only new messages after cursor advances", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + getNewMessages(sessionID, messages) + const extended = [...messages, buildMessage("m3", 3)] + + // #when + const next = getNewMessages(sessionID, extended) + + // #then + expect(next).toEqual([extended[2]]) + }) + + it("resets when message history shrinks", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + getNewMessages(sessionID, messages) + const shorter = [buildMessage("n1", 1)] + + // #when + const next = getNewMessages(sessionID, shorter) + + // #then + expect(next).toEqual(shorter) + }) +}) diff --git a/src/shared/session-cursor.ts b/src/shared/session-cursor.ts new file mode 100644 index 00000000..0fc942b8 --- /dev/null +++ b/src/shared/session-cursor.ts @@ -0,0 +1,84 @@ +type MessageTime = + | { created?: number | string } + | number + | string + | undefined + +type MessageInfo = { + id?: string + time?: MessageTime +} + +export type CursorMessage = { + info?: MessageInfo +} + +interface CursorState { + lastKey?: string + lastCount: number +} + +const sessionCursors = new Map() + +function buildMessageKey(message: CursorMessage, index: number): string { + const id = message.info?.id + if (id) return `id:${id}` + + const time = message.info?.time + if (typeof time === "number" || typeof time === "string") { + return `t:${time}:${index}` + } + + const created = time?.created + if (typeof created === "number") { + return `t:${created}:${index}` + } + if (typeof created === "string") { + return `t:${created}:${index}` + } + + return `i:${index}` +} + +export function getNewMessages( + sessionID: string | undefined, + messages: T[] +): T[] { + if (!sessionID) return messages + + const keys = messages.map((message, index) => buildMessageKey(message, index)) + const cursor = sessionCursors.get(sessionID) + let startIndex = 0 + + if (cursor) { + if (cursor.lastCount > messages.length) { + startIndex = 0 + } else if (cursor.lastKey) { + const lastIndex = keys.lastIndexOf(cursor.lastKey) + if (lastIndex >= 0) { + startIndex = lastIndex + 1 + } else if (cursor.lastCount <= messages.length) { + startIndex = cursor.lastCount + } + } + } + + if (messages.length === 0) { + sessionCursors.delete(sessionID) + } else { + sessionCursors.set(sessionID, { + lastKey: keys[keys.length - 1], + lastCount: messages.length, + }) + } + + return messages.slice(startIndex) +} + +export function resetMessageCursor(sessionID?: string): void { + if (sessionID) { + sessionCursors.delete(sessionID) + return + } + sessionCursors.clear() +} diff --git a/src/tools/background-task/tools.ts b/src/tools/background-task/tools.ts index 3a2eeae5..19962e1e 100644 --- a/src/tools/background-task/tools.ts +++ b/src/tools/background-task/tools.ts @@ -7,6 +7,7 @@ import { BACKGROUND_TASK_DESCRIPTION, BACKGROUND_OUTPUT_DESCRIPTION, BACKGROUND_ import { findNearestMessageWithFields, findFirstMessageWithAgent, MESSAGE_STORAGE } from "../../features/hook-message-injector" import { getSessionAgent } from "../../features/claude-code-session-state" import { log } from "../../shared/logger" +import { getNewMessages } from "../../shared/session-cursor" type OpencodeClient = PluginInput["client"] @@ -239,11 +240,26 @@ Session ID: ${task.sessionID} return timeA.localeCompare(timeB) }) + const newMessages = getNewMessages(task.sessionID, sortedMessages) + if (newMessages.length === 0) { + const duration = formatDuration(task.startedAt, task.completedAt) + return `Task Result + +Task ID: ${task.id} +Description: ${task.description} +Duration: ${duration} +Session ID: ${task.sessionID} + +--- + +(No new output since last check)` + } + // Extract content from ALL messages, not just the last one // Tool results may be in earlier messages while the final message is empty const extractedContent: string[] = [] - for (const message of sortedMessages) { + for (const message of newMessages) { for (const part of message.parts ?? []) { // Handle both "text" and "reasoning" parts (thinking models use "reasoning") if ((part.type === "text" || part.type === "reasoning") && part.text) { diff --git a/src/tools/call-omo-agent/tools.ts b/src/tools/call-omo-agent/tools.ts index 0ed498bf..18e3168f 100644 --- a/src/tools/call-omo-agent/tools.ts +++ b/src/tools/call-omo-agent/tools.ts @@ -5,6 +5,7 @@ import { ALLOWED_AGENTS, CALL_OMO_AGENT_DESCRIPTION } from "./constants" import type { CallOmoAgentArgs } from "./types" import type { BackgroundManager } from "../../features/background-agent" import { log } from "../../shared/logger" +import { getNewMessages } from "../../shared/session-cursor" import { findFirstMessageWithAgent, findNearestMessageWithFields, MESSAGE_STORAGE } from "../../features/hook-message-injector" import { getSessionAgent } from "../../features/claude-code-session-state" @@ -290,11 +291,17 @@ async function executeSync( return timeA - timeB }) + const newMessages = getNewMessages(sessionID, sortedMessages) + + if (newMessages.length === 0) { + return `No new output since last check.\n\n\nsession_id: ${sessionID}\n` + } + // Extract content from ALL messages, not just the last one // Tool results may be in earlier messages while the final message is empty const extractedContent: string[] = [] - for (const message of sortedMessages) { + for (const message of newMessages) { // eslint-disable-next-line @typescript-eslint/no-explicit-any for (const part of (message as any).parts ?? []) { // Handle both "text" and "reasoning" parts (thinking models use "reasoning") From acb16bcb27574313a20fa24bbf8029600bb56bcd Mon Sep 17 00:00:00 2001 From: Aleksey Bragin Date: Wed, 14 Jan 2026 19:58:56 -0500 Subject: [PATCH 2/4] fix: reset cursor when history changes --- src/shared/session-cursor.test.ts | 13 +++++++++++++ src/shared/session-cursor.ts | 5 +++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/shared/session-cursor.test.ts b/src/shared/session-cursor.test.ts index bb395a48..2edf1e16 100644 --- a/src/shared/session-cursor.test.ts +++ b/src/shared/session-cursor.test.ts @@ -50,4 +50,17 @@ describe("getNewMessages", () => { // #then expect(next).toEqual(shorter) }) + + it("returns all messages when last key is missing", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + getNewMessages(sessionID, messages) + const replaced = [buildMessage("n1", 1), buildMessage("n2", 2)] + + // #when + const next = getNewMessages(sessionID, replaced) + + // #then + expect(next).toEqual(replaced) + }) }) diff --git a/src/shared/session-cursor.ts b/src/shared/session-cursor.ts index 0fc942b8..d9c38a28 100644 --- a/src/shared/session-cursor.ts +++ b/src/shared/session-cursor.ts @@ -57,8 +57,9 @@ export function getNewMessages( const lastIndex = keys.lastIndexOf(cursor.lastKey) if (lastIndex >= 0) { startIndex = lastIndex + 1 - } else if (cursor.lastCount <= messages.length) { - startIndex = cursor.lastCount + } else { + // History changed without a shrink; reset to avoid skipping messages. + startIndex = 0 } } } From 3de559ff87cab398c3ed7eaf34cde3cd7cedaa09 Mon Sep 17 00:00:00 2001 From: Kenny Date: Wed, 14 Jan 2026 21:06:26 -0500 Subject: [PATCH 3/4] refactor: rename getNewMessages to consumeNewMessages Rename to signal mutation behavior - the function advances the cursor as a side effect, so 'consume' better reflects that calling it twice with the same input yields different results. --- src/shared/session-cursor.test.ts | 20 ++++++++++---------- src/shared/session-cursor.ts | 2 +- src/tools/background-task/tools.ts | 4 ++-- src/tools/call-omo-agent/tools.ts | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/shared/session-cursor.test.ts b/src/shared/session-cursor.test.ts index 2edf1e16..4ef0ff8d 100644 --- a/src/shared/session-cursor.test.ts +++ b/src/shared/session-cursor.test.ts @@ -1,7 +1,7 @@ import { beforeEach, describe, expect, it } from "bun:test" -import { getNewMessages, resetMessageCursor } from "./session-cursor" +import { consumeNewMessages, resetMessageCursor } from "./session-cursor" -describe("getNewMessages", () => { +describe("consumeNewMessages", () => { const sessionID = "session-123" const buildMessage = (id: string, created: number) => ({ @@ -17,8 +17,8 @@ describe("getNewMessages", () => { const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] // #when - const first = getNewMessages(sessionID, messages) - const second = getNewMessages(sessionID, messages) + const first = consumeNewMessages(sessionID, messages) + const second = consumeNewMessages(sessionID, messages) // #then expect(first).toEqual(messages) @@ -28,11 +28,11 @@ describe("getNewMessages", () => { it("returns only new messages after cursor advances", () => { // #given const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] - getNewMessages(sessionID, messages) + consumeNewMessages(sessionID, messages) const extended = [...messages, buildMessage("m3", 3)] // #when - const next = getNewMessages(sessionID, extended) + const next = consumeNewMessages(sessionID, extended) // #then expect(next).toEqual([extended[2]]) @@ -41,11 +41,11 @@ describe("getNewMessages", () => { it("resets when message history shrinks", () => { // #given const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] - getNewMessages(sessionID, messages) + consumeNewMessages(sessionID, messages) const shorter = [buildMessage("n1", 1)] // #when - const next = getNewMessages(sessionID, shorter) + const next = consumeNewMessages(sessionID, shorter) // #then expect(next).toEqual(shorter) @@ -54,11 +54,11 @@ describe("getNewMessages", () => { it("returns all messages when last key is missing", () => { // #given const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] - getNewMessages(sessionID, messages) + consumeNewMessages(sessionID, messages) const replaced = [buildMessage("n1", 1), buildMessage("n2", 2)] // #when - const next = getNewMessages(sessionID, replaced) + const next = consumeNewMessages(sessionID, replaced) // #then expect(next).toEqual(replaced) diff --git a/src/shared/session-cursor.ts b/src/shared/session-cursor.ts index d9c38a28..37ec0bab 100644 --- a/src/shared/session-cursor.ts +++ b/src/shared/session-cursor.ts @@ -40,7 +40,7 @@ function buildMessageKey(message: CursorMessage, index: number): string { return `i:${index}` } -export function getNewMessages( +export function consumeNewMessages( sessionID: string | undefined, messages: T[] ): T[] { diff --git a/src/tools/background-task/tools.ts b/src/tools/background-task/tools.ts index 19962e1e..02000931 100644 --- a/src/tools/background-task/tools.ts +++ b/src/tools/background-task/tools.ts @@ -7,7 +7,7 @@ import { BACKGROUND_TASK_DESCRIPTION, BACKGROUND_OUTPUT_DESCRIPTION, BACKGROUND_ import { findNearestMessageWithFields, findFirstMessageWithAgent, MESSAGE_STORAGE } from "../../features/hook-message-injector" import { getSessionAgent } from "../../features/claude-code-session-state" import { log } from "../../shared/logger" -import { getNewMessages } from "../../shared/session-cursor" +import { consumeNewMessages } from "../../shared/session-cursor" type OpencodeClient = PluginInput["client"] @@ -240,7 +240,7 @@ Session ID: ${task.sessionID} return timeA.localeCompare(timeB) }) - const newMessages = getNewMessages(task.sessionID, sortedMessages) + const newMessages = consumeNewMessages(task.sessionID, sortedMessages) if (newMessages.length === 0) { const duration = formatDuration(task.startedAt, task.completedAt) return `Task Result diff --git a/src/tools/call-omo-agent/tools.ts b/src/tools/call-omo-agent/tools.ts index 18e3168f..ef92341c 100644 --- a/src/tools/call-omo-agent/tools.ts +++ b/src/tools/call-omo-agent/tools.ts @@ -5,7 +5,7 @@ import { ALLOWED_AGENTS, CALL_OMO_AGENT_DESCRIPTION } from "./constants" import type { CallOmoAgentArgs } from "./types" import type { BackgroundManager } from "../../features/background-agent" import { log } from "../../shared/logger" -import { getNewMessages } from "../../shared/session-cursor" +import { consumeNewMessages } from "../../shared/session-cursor" import { findFirstMessageWithAgent, findNearestMessageWithFields, MESSAGE_STORAGE } from "../../features/hook-message-injector" import { getSessionAgent } from "../../features/claude-code-session-state" @@ -291,7 +291,7 @@ async function executeSync( return timeA - timeB }) - const newMessages = getNewMessages(sessionID, sortedMessages) + const newMessages = consumeNewMessages(sessionID, sortedMessages) if (newMessages.length === 0) { return `No new output since last check.\n\n\nsession_id: ${sessionID}\n` From ef65f405e8c7715d293f79847df58772d10c65c6 Mon Sep 17 00:00:00 2001 From: Kenny Date: Wed, 14 Jan 2026 21:17:41 -0500 Subject: [PATCH 4/4] fix: clean up session cursor state on session deletion Add resetMessageCursor call in session.deleted handler to prevent unbounded memory growth from orphaned cursor entries. --- src/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index c4f3bb25..9f4871ef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -73,7 +73,7 @@ import { BackgroundManager } from "./features/background-agent"; import { SkillMcpManager } from "./features/skill-mcp-manager"; import { initTaskToastManager } from "./features/task-toast-manager"; import { type HookName } from "./config"; -import { log, detectExternalNotificationPlugin, getNotificationConflictWarning } from "./shared"; +import { log, detectExternalNotificationPlugin, getNotificationConflictWarning, resetMessageCursor } from "./shared"; import { loadPluginConfig } from "./plugin-config"; import { createModelCacheState, getModelLimit } from "./plugin-state"; import { createConfigHandler } from "./plugin-handlers"; @@ -445,6 +445,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => { } if (sessionInfo?.id) { clearSessionAgent(sessionInfo.id); + resetMessageCursor(sessionInfo.id); firstMessageVariantGate.clear(sessionInfo.id); await skillMcpManager.disconnectSession(sessionInfo.id); await lspManager.cleanupTempDirectoryClients();