fix: add session cursor for tool output
This commit is contained in:
parent
abd1ec1092
commit
9995b680f7
@ -22,3 +22,4 @@ export * from "./permission-compat"
|
|||||||
export * from "./external-plugin-detector"
|
export * from "./external-plugin-detector"
|
||||||
export * from "./zip-extractor"
|
export * from "./zip-extractor"
|
||||||
export * from "./agent-variant"
|
export * from "./agent-variant"
|
||||||
|
export * from "./session-cursor"
|
||||||
|
|||||||
53
src/shared/session-cursor.test.ts
Normal file
53
src/shared/session-cursor.test.ts
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import { beforeEach, describe, expect, it } from "bun:test"
|
||||||
|
import { getNewMessages, resetMessageCursor } from "./session-cursor"
|
||||||
|
|
||||||
|
describe("getNewMessages", () => {
|
||||||
|
const sessionID = "session-123"
|
||||||
|
|
||||||
|
const buildMessage = (id: string, created: number) => ({
|
||||||
|
info: { id, time: { created } },
|
||||||
|
})
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
resetMessageCursor(sessionID)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("returns all messages on first read and none on repeat", () => {
|
||||||
|
// #given
|
||||||
|
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const first = getNewMessages(sessionID, messages)
|
||||||
|
const second = getNewMessages(sessionID, messages)
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(first).toEqual(messages)
|
||||||
|
expect(second).toEqual([])
|
||||||
|
})
|
||||||
|
|
||||||
|
it("returns only new messages after cursor advances", () => {
|
||||||
|
// #given
|
||||||
|
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
|
||||||
|
getNewMessages(sessionID, messages)
|
||||||
|
const extended = [...messages, buildMessage("m3", 3)]
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const next = getNewMessages(sessionID, extended)
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(next).toEqual([extended[2]])
|
||||||
|
})
|
||||||
|
|
||||||
|
it("resets when message history shrinks", () => {
|
||||||
|
// #given
|
||||||
|
const messages = [buildMessage("m1", 1), buildMessage("m2", 2)]
|
||||||
|
getNewMessages(sessionID, messages)
|
||||||
|
const shorter = [buildMessage("n1", 1)]
|
||||||
|
|
||||||
|
// #when
|
||||||
|
const next = getNewMessages(sessionID, shorter)
|
||||||
|
|
||||||
|
// #then
|
||||||
|
expect(next).toEqual(shorter)
|
||||||
|
})
|
||||||
|
})
|
||||||
84
src/shared/session-cursor.ts
Normal file
84
src/shared/session-cursor.ts
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
type MessageTime =
|
||||||
|
| { created?: number | string }
|
||||||
|
| number
|
||||||
|
| string
|
||||||
|
| undefined
|
||||||
|
|
||||||
|
type MessageInfo = {
|
||||||
|
id?: string
|
||||||
|
time?: MessageTime
|
||||||
|
}
|
||||||
|
|
||||||
|
export type CursorMessage = {
|
||||||
|
info?: MessageInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
interface CursorState {
|
||||||
|
lastKey?: string
|
||||||
|
lastCount: number
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionCursors = new Map<string, CursorState>()
|
||||||
|
|
||||||
|
function buildMessageKey(message: CursorMessage, index: number): string {
|
||||||
|
const id = message.info?.id
|
||||||
|
if (id) return `id:${id}`
|
||||||
|
|
||||||
|
const time = message.info?.time
|
||||||
|
if (typeof time === "number" || typeof time === "string") {
|
||||||
|
return `t:${time}:${index}`
|
||||||
|
}
|
||||||
|
|
||||||
|
const created = time?.created
|
||||||
|
if (typeof created === "number") {
|
||||||
|
return `t:${created}:${index}`
|
||||||
|
}
|
||||||
|
if (typeof created === "string") {
|
||||||
|
return `t:${created}:${index}`
|
||||||
|
}
|
||||||
|
|
||||||
|
return `i:${index}`
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getNewMessages<T extends CursorMessage>(
|
||||||
|
sessionID: string | undefined,
|
||||||
|
messages: T[]
|
||||||
|
): T[] {
|
||||||
|
if (!sessionID) return messages
|
||||||
|
|
||||||
|
const keys = messages.map((message, index) => buildMessageKey(message, index))
|
||||||
|
const cursor = sessionCursors.get(sessionID)
|
||||||
|
let startIndex = 0
|
||||||
|
|
||||||
|
if (cursor) {
|
||||||
|
if (cursor.lastCount > messages.length) {
|
||||||
|
startIndex = 0
|
||||||
|
} else if (cursor.lastKey) {
|
||||||
|
const lastIndex = keys.lastIndexOf(cursor.lastKey)
|
||||||
|
if (lastIndex >= 0) {
|
||||||
|
startIndex = lastIndex + 1
|
||||||
|
} else if (cursor.lastCount <= messages.length) {
|
||||||
|
startIndex = cursor.lastCount
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (messages.length === 0) {
|
||||||
|
sessionCursors.delete(sessionID)
|
||||||
|
} else {
|
||||||
|
sessionCursors.set(sessionID, {
|
||||||
|
lastKey: keys[keys.length - 1],
|
||||||
|
lastCount: messages.length,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages.slice(startIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resetMessageCursor(sessionID?: string): void {
|
||||||
|
if (sessionID) {
|
||||||
|
sessionCursors.delete(sessionID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sessionCursors.clear()
|
||||||
|
}
|
||||||
@ -7,6 +7,7 @@ import { BACKGROUND_TASK_DESCRIPTION, BACKGROUND_OUTPUT_DESCRIPTION, BACKGROUND_
|
|||||||
import { findNearestMessageWithFields, findFirstMessageWithAgent, MESSAGE_STORAGE } from "../../features/hook-message-injector"
|
import { findNearestMessageWithFields, findFirstMessageWithAgent, MESSAGE_STORAGE } from "../../features/hook-message-injector"
|
||||||
import { getSessionAgent } from "../../features/claude-code-session-state"
|
import { getSessionAgent } from "../../features/claude-code-session-state"
|
||||||
import { log } from "../../shared/logger"
|
import { log } from "../../shared/logger"
|
||||||
|
import { getNewMessages } from "../../shared/session-cursor"
|
||||||
|
|
||||||
type OpencodeClient = PluginInput["client"]
|
type OpencodeClient = PluginInput["client"]
|
||||||
|
|
||||||
@ -239,11 +240,26 @@ Session ID: ${task.sessionID}
|
|||||||
return timeA.localeCompare(timeB)
|
return timeA.localeCompare(timeB)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
const newMessages = getNewMessages(task.sessionID, sortedMessages)
|
||||||
|
if (newMessages.length === 0) {
|
||||||
|
const duration = formatDuration(task.startedAt, task.completedAt)
|
||||||
|
return `Task Result
|
||||||
|
|
||||||
|
Task ID: ${task.id}
|
||||||
|
Description: ${task.description}
|
||||||
|
Duration: ${duration}
|
||||||
|
Session ID: ${task.sessionID}
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
(No new output since last check)`
|
||||||
|
}
|
||||||
|
|
||||||
// Extract content from ALL messages, not just the last one
|
// Extract content from ALL messages, not just the last one
|
||||||
// Tool results may be in earlier messages while the final message is empty
|
// Tool results may be in earlier messages while the final message is empty
|
||||||
const extractedContent: string[] = []
|
const extractedContent: string[] = []
|
||||||
|
|
||||||
for (const message of sortedMessages) {
|
for (const message of newMessages) {
|
||||||
for (const part of message.parts ?? []) {
|
for (const part of message.parts ?? []) {
|
||||||
// Handle both "text" and "reasoning" parts (thinking models use "reasoning")
|
// Handle both "text" and "reasoning" parts (thinking models use "reasoning")
|
||||||
if ((part.type === "text" || part.type === "reasoning") && part.text) {
|
if ((part.type === "text" || part.type === "reasoning") && part.text) {
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import { ALLOWED_AGENTS, CALL_OMO_AGENT_DESCRIPTION } from "./constants"
|
|||||||
import type { CallOmoAgentArgs } from "./types"
|
import type { CallOmoAgentArgs } from "./types"
|
||||||
import type { BackgroundManager } from "../../features/background-agent"
|
import type { BackgroundManager } from "../../features/background-agent"
|
||||||
import { log } from "../../shared/logger"
|
import { log } from "../../shared/logger"
|
||||||
|
import { getNewMessages } from "../../shared/session-cursor"
|
||||||
import { findFirstMessageWithAgent, findNearestMessageWithFields, MESSAGE_STORAGE } from "../../features/hook-message-injector"
|
import { findFirstMessageWithAgent, findNearestMessageWithFields, MESSAGE_STORAGE } from "../../features/hook-message-injector"
|
||||||
import { getSessionAgent } from "../../features/claude-code-session-state"
|
import { getSessionAgent } from "../../features/claude-code-session-state"
|
||||||
|
|
||||||
@ -290,11 +291,17 @@ async function executeSync(
|
|||||||
return timeA - timeB
|
return timeA - timeB
|
||||||
})
|
})
|
||||||
|
|
||||||
|
const newMessages = getNewMessages(sessionID, sortedMessages)
|
||||||
|
|
||||||
|
if (newMessages.length === 0) {
|
||||||
|
return `No new output since last check.\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||||
|
}
|
||||||
|
|
||||||
// Extract content from ALL messages, not just the last one
|
// Extract content from ALL messages, not just the last one
|
||||||
// Tool results may be in earlier messages while the final message is empty
|
// Tool results may be in earlier messages while the final message is empty
|
||||||
const extractedContent: string[] = []
|
const extractedContent: string[] = []
|
||||||
|
|
||||||
for (const message of sortedMessages) {
|
for (const message of newMessages) {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
for (const part of (message as any).parts ?? []) {
|
for (const part of (message as any).parts ?? []) {
|
||||||
// Handle both "text" and "reasoning" parts (thinking models use "reasoning")
|
// Handle both "text" and "reasoning" parts (thinking models use "reasoning")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user