Merge pull request #801 from stranger2904/fix/session-cursor-output

fix: add session cursor for tool output
This commit is contained in:
Kenny 2026-01-14 21:25:40 -05:00 committed by GitHub
commit 84e97ba900
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 179 additions and 3 deletions

View File

@ -73,7 +73,7 @@ import { BackgroundManager } from "./features/background-agent";
import { SkillMcpManager } from "./features/skill-mcp-manager";
import { initTaskToastManager } from "./features/task-toast-manager";
import { type HookName } from "./config";
import { log, detectExternalNotificationPlugin, getNotificationConflictWarning } from "./shared";
import { log, detectExternalNotificationPlugin, getNotificationConflictWarning, resetMessageCursor } from "./shared";
import { loadPluginConfig } from "./plugin-config";
import { createModelCacheState, getModelLimit } from "./plugin-state";
import { createConfigHandler } from "./plugin-handlers";
@ -445,6 +445,7 @@ const OhMyOpenCodePlugin: Plugin = async (ctx) => {
}
if (sessionInfo?.id) {
clearSessionAgent(sessionInfo.id);
resetMessageCursor(sessionInfo.id);
firstMessageVariantGate.clear(sessionInfo.id);
await skillMcpManager.disconnectSession(sessionInfo.id);
await lspManager.cleanupTempDirectoryClients();

View File

@ -22,3 +22,4 @@ export * from "./permission-compat"
export * from "./external-plugin-detector"
export * from "./zip-extractor"
export * from "./agent-variant"
export * from "./session-cursor"

View File

@ -0,0 +1,66 @@
import { beforeEach, describe, expect, it } from "bun:test"
import { consumeNewMessages, resetMessageCursor } from "./session-cursor"
describe("consumeNewMessages", () => {
const sessionID = "session-123"
const buildMessage = (id: string, created: number) => ({
info: { id, time: { created } },
})
beforeEach(() => {
resetMessageCursor(sessionID)
})
it("returns all messages on first read and none on repeat", () => {
// #given
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
// #when
const first = consumeNewMessages(sessionID, messages)
const second = consumeNewMessages(sessionID, messages)
// #then
expect(first).toEqual(messages)
expect(second).toEqual([])
})
it("returns only new messages after cursor advances", () => {
// #given
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
consumeNewMessages(sessionID, messages)
const extended = [...messages, buildMessage("m3", 3)]
// #when
const next = consumeNewMessages(sessionID, extended)
// #then
expect(next).toEqual([extended[2]])
})
it("resets when message history shrinks", () => {
// #given
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
consumeNewMessages(sessionID, messages)
const shorter = [buildMessage("n1", 1)]
// #when
const next = consumeNewMessages(sessionID, shorter)
// #then
expect(next).toEqual(shorter)
})
it("returns all messages when last key is missing", () => {
// #given
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
consumeNewMessages(sessionID, messages)
const replaced = [buildMessage("n1", 1), buildMessage("n2", 2)]
// #when
const next = consumeNewMessages(sessionID, replaced)
// #then
expect(next).toEqual(replaced)
})
})

View File

@ -0,0 +1,85 @@
type MessageTime =
| { created?: number | string }
| number
| string
| undefined
type MessageInfo = {
id?: string
time?: MessageTime
}
export type CursorMessage = {
info?: MessageInfo
}
interface CursorState {
lastKey?: string
lastCount: number
}
const sessionCursors = new Map<string, CursorState>()
function buildMessageKey(message: CursorMessage, index: number): string {
const id = message.info?.id
if (id) return `id:${id}`
const time = message.info?.time
if (typeof time === "number" || typeof time === "string") {
return `t:${time}:${index}`
}
const created = time?.created
if (typeof created === "number") {
return `t:${created}:${index}`
}
if (typeof created === "string") {
return `t:${created}:${index}`
}
return `i:${index}`
}
export function consumeNewMessages<T extends CursorMessage>(
sessionID: string | undefined,
messages: T[]
): T[] {
if (!sessionID) return messages
const keys = messages.map((message, index) => buildMessageKey(message, index))
const cursor = sessionCursors.get(sessionID)
let startIndex = 0
if (cursor) {
if (cursor.lastCount > messages.length) {
startIndex = 0
} else if (cursor.lastKey) {
const lastIndex = keys.lastIndexOf(cursor.lastKey)
if (lastIndex >= 0) {
startIndex = lastIndex + 1
} else {
// History changed without a shrink; reset to avoid skipping messages.
startIndex = 0
}
}
}
if (messages.length === 0) {
sessionCursors.delete(sessionID)
} else {
sessionCursors.set(sessionID, {
lastKey: keys[keys.length - 1],
lastCount: messages.length,
})
}
return messages.slice(startIndex)
}
export function resetMessageCursor(sessionID?: string): void {
if (sessionID) {
sessionCursors.delete(sessionID)
return
}
sessionCursors.clear()
}

View File

@ -7,6 +7,7 @@ import { BACKGROUND_TASK_DESCRIPTION, BACKGROUND_OUTPUT_DESCRIPTION, BACKGROUND_
import { findNearestMessageWithFields, findFirstMessageWithAgent, MESSAGE_STORAGE } from "../../features/hook-message-injector"
import { getSessionAgent } from "../../features/claude-code-session-state"
import { log } from "../../shared/logger"
import { consumeNewMessages } from "../../shared/session-cursor"
type OpencodeClient = PluginInput["client"]
@ -239,11 +240,26 @@ Session ID: ${task.sessionID}
return timeA.localeCompare(timeB)
})
const newMessages = consumeNewMessages(task.sessionID, sortedMessages)
if (newMessages.length === 0) {
const duration = formatDuration(task.startedAt, task.completedAt)
return `Task Result
Task ID: ${task.id}
Description: ${task.description}
Duration: ${duration}
Session ID: ${task.sessionID}
---
(No new output since last check)`
}
// Extract content from ALL messages, not just the last one
// Tool results may be in earlier messages while the final message is empty
const extractedContent: string[] = []
for (const message of sortedMessages) {
for (const message of newMessages) {
for (const part of message.parts ?? []) {
// Handle both "text" and "reasoning" parts (thinking models use "reasoning")
if ((part.type === "text" || part.type === "reasoning") && part.text) {

View File

@ -5,6 +5,7 @@ import { ALLOWED_AGENTS, CALL_OMO_AGENT_DESCRIPTION } from "./constants"
import type { CallOmoAgentArgs } from "./types"
import type { BackgroundManager } from "../../features/background-agent"
import { log } from "../../shared/logger"
import { consumeNewMessages } from "../../shared/session-cursor"
import { findFirstMessageWithAgent, findNearestMessageWithFields, MESSAGE_STORAGE } from "../../features/hook-message-injector"
import { getSessionAgent } from "../../features/claude-code-session-state"
@ -290,11 +291,17 @@ async function executeSync(
return timeA - timeB
})
const newMessages = consumeNewMessages(sessionID, sortedMessages)
if (newMessages.length === 0) {
return `No new output since last check.\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
}
// Extract content from ALL messages, not just the last one
// Tool results may be in earlier messages while the final message is empty
const extractedContent: string[] = []
for (const message of sortedMessages) {
for (const message of newMessages) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
for (const part of (message as any).parts ?? []) {
// Handle both "text" and "reasoning" parts (thinking models use "reasoning")