From 9995b680f717e19e05e98ee6c47967105ab44cb5 Mon Sep 17 00:00:00 2001 From: Aleksey Bragin Date: Wed, 14 Jan 2026 19:42:27 -0500 Subject: [PATCH] fix: add session cursor for tool output --- src/shared/index.ts | 1 + src/shared/session-cursor.test.ts | 53 +++++++++++++++++++ src/shared/session-cursor.ts | 84 ++++++++++++++++++++++++++++++ src/tools/background-task/tools.ts | 18 ++++++- src/tools/call-omo-agent/tools.ts | 9 +++- 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 src/shared/session-cursor.test.ts create mode 100644 src/shared/session-cursor.ts diff --git a/src/shared/index.ts b/src/shared/index.ts index df790379..79cc7024 100644 --- a/src/shared/index.ts +++ b/src/shared/index.ts @@ -22,3 +22,4 @@ export * from "./permission-compat" export * from "./external-plugin-detector" export * from "./zip-extractor" export * from "./agent-variant" +export * from "./session-cursor" diff --git a/src/shared/session-cursor.test.ts b/src/shared/session-cursor.test.ts new file mode 100644 index 00000000..bb395a48 --- /dev/null +++ b/src/shared/session-cursor.test.ts @@ -0,0 +1,53 @@ +import { beforeEach, describe, expect, it } from "bun:test" +import { getNewMessages, resetMessageCursor } from "./session-cursor" + +describe("getNewMessages", () => { + const sessionID = "session-123" + + const buildMessage = (id: string, created: number) => ({ + info: { id, time: { created } }, + }) + + beforeEach(() => { + resetMessageCursor(sessionID) + }) + + it("returns all messages on first read and none on repeat", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + + // #when + const first = getNewMessages(sessionID, messages) + const second = getNewMessages(sessionID, messages) + + // #then + expect(first).toEqual(messages) + expect(second).toEqual([]) + }) + + it("returns only new messages after cursor advances", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + getNewMessages(sessionID, messages) + const extended = [...messages, buildMessage("m3", 3)] + + // #when + const next = getNewMessages(sessionID, extended) + + // #then + expect(next).toEqual([extended[2]]) + }) + + it("resets when message history shrinks", () => { + // #given + const messages = [buildMessage("m1", 1), buildMessage("m2", 2)] + getNewMessages(sessionID, messages) + const shorter = [buildMessage("n1", 1)] + + // #when + const next = getNewMessages(sessionID, shorter) + + // #then + expect(next).toEqual(shorter) + }) +}) diff --git a/src/shared/session-cursor.ts b/src/shared/session-cursor.ts new file mode 100644 index 00000000..0fc942b8 --- /dev/null +++ b/src/shared/session-cursor.ts @@ -0,0 +1,84 @@ +type MessageTime = + | { created?: number | string } + | number + | string + | undefined + +type MessageInfo = { + id?: string + time?: MessageTime +} + +export type CursorMessage = { + info?: MessageInfo +} + +interface CursorState { + lastKey?: string + lastCount: number +} + +const sessionCursors = new Map() + +function buildMessageKey(message: CursorMessage, index: number): string { + const id = message.info?.id + if (id) return `id:${id}` + + const time = message.info?.time + if (typeof time === "number" || typeof time === "string") { + return `t:${time}:${index}` + } + + const created = time?.created + if (typeof created === "number") { + return `t:${created}:${index}` + } + if (typeof created === "string") { + return `t:${created}:${index}` + } + + return `i:${index}` +} + +export function getNewMessages( + sessionID: string | undefined, + messages: T[] +): T[] { + if (!sessionID) return messages + + const keys = messages.map((message, index) => buildMessageKey(message, index)) + const cursor = sessionCursors.get(sessionID) + let startIndex = 0 + + if (cursor) { + if (cursor.lastCount > messages.length) { + startIndex = 0 + } else if (cursor.lastKey) { + const lastIndex = keys.lastIndexOf(cursor.lastKey) + if (lastIndex >= 0) { + startIndex = lastIndex + 1 + } else if (cursor.lastCount <= messages.length) { + startIndex = cursor.lastCount + } + } + } + + if (messages.length === 0) { + sessionCursors.delete(sessionID) + } else { + sessionCursors.set(sessionID, { + lastKey: keys[keys.length - 1], + lastCount: messages.length, + }) + } + + return messages.slice(startIndex) +} + +export function resetMessageCursor(sessionID?: string): void { + if (sessionID) { + sessionCursors.delete(sessionID) + return + } + sessionCursors.clear() +} diff --git a/src/tools/background-task/tools.ts b/src/tools/background-task/tools.ts index 3a2eeae5..19962e1e 100644 --- a/src/tools/background-task/tools.ts +++ b/src/tools/background-task/tools.ts @@ -7,6 +7,7 @@ import { BACKGROUND_TASK_DESCRIPTION, BACKGROUND_OUTPUT_DESCRIPTION, BACKGROUND_ import { findNearestMessageWithFields, findFirstMessageWithAgent, MESSAGE_STORAGE } from "../../features/hook-message-injector" import { getSessionAgent } from "../../features/claude-code-session-state" import { log } from "../../shared/logger" +import { getNewMessages } from "../../shared/session-cursor" type OpencodeClient = PluginInput["client"] @@ -239,11 +240,26 @@ Session ID: ${task.sessionID} return timeA.localeCompare(timeB) }) + const newMessages = getNewMessages(task.sessionID, sortedMessages) + if (newMessages.length === 0) { + const duration = formatDuration(task.startedAt, task.completedAt) + return `Task Result + +Task ID: ${task.id} +Description: ${task.description} +Duration: ${duration} +Session ID: ${task.sessionID} + +--- + +(No new output since last check)` + } + // Extract content from ALL messages, not just the last one // Tool results may be in earlier messages while the final message is empty const extractedContent: string[] = [] - for (const message of sortedMessages) { + for (const message of newMessages) { for (const part of message.parts ?? []) { // Handle both "text" and "reasoning" parts (thinking models use "reasoning") if ((part.type === "text" || part.type === "reasoning") && part.text) { diff --git a/src/tools/call-omo-agent/tools.ts b/src/tools/call-omo-agent/tools.ts index 0ed498bf..18e3168f 100644 --- a/src/tools/call-omo-agent/tools.ts +++ b/src/tools/call-omo-agent/tools.ts @@ -5,6 +5,7 @@ import { ALLOWED_AGENTS, CALL_OMO_AGENT_DESCRIPTION } from "./constants" import type { CallOmoAgentArgs } from "./types" import type { BackgroundManager } from "../../features/background-agent" import { log } from "../../shared/logger" +import { getNewMessages } from "../../shared/session-cursor" import { findFirstMessageWithAgent, findNearestMessageWithFields, MESSAGE_STORAGE } from "../../features/hook-message-injector" import { getSessionAgent } from "../../features/claude-code-session-state" @@ -290,11 +291,17 @@ async function executeSync( return timeA - timeB }) + const newMessages = getNewMessages(sessionID, sortedMessages) + + if (newMessages.length === 0) { + return `No new output since last check.\n\n\nsession_id: ${sessionID}\n` + } + // Extract content from ALL messages, not just the last one // Tool results may be in earlier messages while the final message is empty const extractedContent: string[] = [] - for (const message of sortedMessages) { + for (const message of newMessages) { // eslint-disable-next-line @typescript-eslint/no-explicit-any for (const part of (message as any).parts ?? []) { // Handle both "text" and "reasoning" parts (thinking models use "reasoning")