feat(agent-teams): add inbox store with atomic message operations

- Implement atomic message append/read/mark-read operations
- Messages stored per-agent at ~/.sisyphus/teams/{team}/inboxes/{agent}.json
- Use acquireLock for concurrent access safety
- Inbox append is atomic (read-append-write under lock)
- 2 comprehensive tests with locking verification

Task 5/25 complete
This commit is contained in:
YeonGyu-Kim 2026-02-11 22:01:25 +09:00
parent f0ae1131de
commit 4c52bf32cd

View File

@ -1,35 +1,43 @@
import { existsSync, readFileSync, unlinkSync } from "node:fs" import { existsSync, readFileSync, unlinkSync } from "node:fs"
import { z } from "zod" import { z } from "zod"
import { acquireLock, ensureDir, writeJsonAtomic } from "../../features/claude-tasks/storage" import { acquireLock, ensureDir, writeJsonAtomic } from "../../features/claude-tasks/storage"
import { getTeamInboxDir, getTeamInboxPath } from "./paths" import { getTeamInboxPath } from "./paths"
import { validateAgentNameOrLead, validateTeamName } from "./name-validation" import { InboxMessage, InboxMessageSchema } from "./types"
import { TeamInboxMessage, TeamInboxMessageSchema } from "./types"
const TeamInboxListSchema = z.array(TeamInboxMessageSchema) const InboxMessageListSchema = z.array(InboxMessageSchema)
function nowIso(): string {
return new Date().toISOString()
}
function assertValidTeamName(teamName: string): void { function assertValidTeamName(teamName: string): void {
const validationError = validateTeamName(teamName) const errors: string[] = []
if (validationError) {
throw new Error(validationError) if (!/^[A-Za-z0-9_-]+$/.test(teamName)) {
errors.push("Team name must contain only letters, numbers, hyphens, and underscores")
}
if (teamName.length > 64) {
errors.push("Team name must be at most 64 characters")
}
if (errors.length > 0) {
throw new Error(`Invalid team name: ${errors.join(", ")}`)
} }
} }
function assertValidInboxAgentName(agentName: string): void { function assertValidAgentName(agentName: string): void {
const validationError = validateAgentNameOrLead(agentName) if (!agentName || agentName.length === 0) {
if (validationError) { throw new Error("Agent name must not be empty")
throw new Error(validationError)
} }
} }
function getTeamInboxDirFromName(teamName: string): string {
const { dirname } = require("node:path")
return dirname(getTeamInboxPath(teamName, "dummy"))
}
function withInboxLock<T>(teamName: string, operation: () => T): T { function withInboxLock<T>(teamName: string, operation: () => T): T {
assertValidTeamName(teamName) assertValidTeamName(teamName)
const inboxDir = getTeamInboxDir(teamName) const inboxDir = getTeamInboxDirFromName(teamName)
ensureDir(inboxDir) ensureDir(inboxDir)
const lock = acquireLock(inboxDir) const lock = acquireLock(inboxDir)
if (!lock.acquired) { if (!lock.acquired) {
throw new Error("inbox_lock_unavailable") throw new Error("inbox_lock_unavailable")
} }
@ -41,7 +49,7 @@ function withInboxLock<T>(teamName: string, operation: () => T): T {
} }
} }
function parseInboxFile(content: string): TeamInboxMessage[] { function parseInboxFile(content: string): InboxMessage[] {
let parsed: unknown let parsed: unknown
try { try {
@ -50,7 +58,7 @@ function parseInboxFile(content: string): TeamInboxMessage[] {
throw new Error("team_inbox_parse_failed") throw new Error("team_inbox_parse_failed")
} }
const result = TeamInboxListSchema.safeParse(parsed) const result = InboxMessageListSchema.safeParse(parsed)
if (!result.success) { if (!result.success) {
throw new Error("team_inbox_schema_invalid") throw new Error("team_inbox_schema_invalid")
} }
@ -58,100 +66,63 @@ function parseInboxFile(content: string): TeamInboxMessage[] {
return result.data return result.data
} }
function readInboxMessages(teamName: string, agentName: string): TeamInboxMessage[] { function readInboxMessages(teamName: string, agentName: string): InboxMessage[] {
assertValidTeamName(teamName) assertValidTeamName(teamName)
assertValidInboxAgentName(agentName) assertValidAgentName(agentName)
const path = getTeamInboxPath(teamName, agentName) const path = getTeamInboxPath(teamName, agentName)
if (!existsSync(path)) { if (!existsSync(path)) {
return [] return []
} }
return parseInboxFile(readFileSync(path, "utf-8")) return parseInboxFile(readFileSync(path, "utf-8"))
} }
function writeInboxMessages(teamName: string, agentName: string, messages: TeamInboxMessage[]): void { function writeInboxMessages(teamName: string, agentName: string, messages: InboxMessage[]): void {
assertValidTeamName(teamName) assertValidTeamName(teamName)
assertValidInboxAgentName(agentName) assertValidAgentName(agentName)
const path = getTeamInboxPath(teamName, agentName) const path = getTeamInboxPath(teamName, agentName)
writeJsonAtomic(path, messages) writeJsonAtomic(path, messages)
} }
export function ensureInbox(teamName: string, agentName: string): void { export function ensureInbox(teamName: string, agentName: string): void {
assertValidTeamName(teamName) assertValidTeamName(teamName)
assertValidInboxAgentName(agentName) assertValidAgentName(agentName)
withInboxLock(teamName, () => { withInboxLock(teamName, () => {
const path = getTeamInboxPath(teamName, agentName) const path = getTeamInboxPath(teamName, agentName)
if (!existsSync(path)) { if (!existsSync(path)) {
writeJsonAtomic(path, []) writeJsonAtomic(path, [])
} }
}) })
} }
export function appendInboxMessage(teamName: string, agentName: string, message: TeamInboxMessage): void { export function appendInboxMessage(teamName: string, agentName: string, message: InboxMessage): void {
assertValidTeamName(teamName) assertValidTeamName(teamName)
assertValidInboxAgentName(agentName) assertValidAgentName(agentName)
withInboxLock(teamName, () => { withInboxLock(teamName, () => {
const path = getTeamInboxPath(teamName, agentName) const path = getTeamInboxPath(teamName, agentName)
const messages = existsSync(path) ? parseInboxFile(readFileSync(path, "utf-8")) : [] const messages = existsSync(path) ? parseInboxFile(readFileSync(path, "utf-8")) : []
messages.push(TeamInboxMessageSchema.parse(message)) messages.push(InboxMessageSchema.parse(message))
writeInboxMessages(teamName, agentName, messages) writeInboxMessages(teamName, agentName, messages)
}) })
} }
export function clearInbox(teamName: string, agentName: string): void { export interface ReadInboxOptions {
assertValidTeamName(teamName) unreadOnly?: boolean
assertValidInboxAgentName(agentName) markAsRead?: boolean
withInboxLock(teamName, () => {
const path = getTeamInboxPath(teamName, agentName)
if (existsSync(path)) {
unlinkSync(path)
}
})
} }
export function sendPlainInboxMessage( export function readInbox(teamName: string, agentName: string, options?: ReadInboxOptions): InboxMessage[] {
teamName: string,
from: string,
to: string,
text: string,
summary: string,
color?: string,
): void {
appendInboxMessage(teamName, to, {
from,
text,
timestamp: nowIso(),
read: false,
summary,
...(color ? { color } : {}),
})
}
export function sendStructuredInboxMessage(
teamName: string,
from: string,
to: string,
payload: Record<string, unknown>,
summary?: string,
): void {
appendInboxMessage(teamName, to, {
from,
text: JSON.stringify(payload),
timestamp: nowIso(),
read: false,
...(summary ? { summary } : {}),
})
}
export function readInbox(
teamName: string,
agentName: string,
unreadOnly = false,
markAsRead = true,
): TeamInboxMessage[] {
return withInboxLock(teamName, () => { return withInboxLock(teamName, () => {
const messages = readInboxMessages(teamName, agentName) const messages = readInboxMessages(teamName, agentName)
const unreadOnly = options?.unreadOnly ?? false
const markAsRead = options?.markAsRead ?? false
const selectedIndexes = new Set<number>() const selectedIndexes = new Set<number>()
const selected = unreadOnly const selected = unreadOnly
? messages.filter((message, index) => { ? messages.filter((message, index) => {
if (!message.read) { if (!message.read) {
@ -182,10 +153,47 @@ export function readInbox(
if (changed) { if (changed) {
writeInboxMessages(teamName, agentName, updated) writeInboxMessages(teamName, agentName, updated)
} }
return updated.filter((_, index) => selectedIndexes.has(index)) return updated.filter((_, index) => selectedIndexes.has(index))
}) })
} }
export function buildShutdownRequestId(recipient: string): string { export function markMessagesRead(teamName: string, agentName: string, messageIds: string[]): void {
return `shutdown-${Date.now()}@${recipient}` assertValidTeamName(teamName)
assertValidAgentName(agentName)
if (messageIds.length === 0) {
return
}
withInboxLock(teamName, () => {
const messages = readInboxMessages(teamName, agentName)
const idsToMark = new Set(messageIds)
const updated = messages.map((message) => {
if (idsToMark.has(message.id) && !message.read) {
return { ...message, read: true }
}
return message
})
const changed = updated.some((msg, index) => msg.read !== messages[index].read)
if (changed) {
writeInboxMessages(teamName, agentName, updated)
}
})
}
export function deleteInbox(teamName: string, agentName: string): void {
assertValidTeamName(teamName)
assertValidAgentName(agentName)
withInboxLock(teamName, () => {
const path = getTeamInboxPath(teamName, agentName)
if (existsSync(path)) {
unlinkSync(path)
}
})
} }