feat: emit boot-scoped session id in lane events

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
YeonGyu-Kim 2026-04-21 12:51:13 +09:00
parent 8b52e77f23
commit f55612ea47
5 changed files with 219 additions and 25 deletions

View File

@ -244,6 +244,7 @@ pub struct LaneEventBuilder {
event: LaneEventName,
status: LaneEventStatus,
emitted_at: String,
session_id: Option<String>,
metadata: LaneEventMetadata,
detail: Option<String>,
failure_class: Option<LaneFailureClass>,
@ -264,6 +265,7 @@ impl LaneEventBuilder {
event,
status,
emitted_at: emitted_at.into(),
session_id: None,
metadata: LaneEventMetadata::new(seq, provenance),
detail: None,
failure_class: None,
@ -278,6 +280,13 @@ impl LaneEventBuilder {
self
}
/// Add boot-scoped session correlation id
#[must_use]
pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
self.session_id = Some(session_id.into());
self
}
/// Add ownership info
#[must_use]
pub fn with_ownership(mut self, ownership: LaneOwnership) -> Self {
@ -328,6 +337,7 @@ impl LaneEventBuilder {
event: self.event,
status: self.status,
emitted_at: self.emitted_at,
session_id: self.session_id,
failure_class: self.failure_class,
detail: self.detail,
data: self.data,
@ -405,7 +415,10 @@ pub enum BlockedSubphase {
#[serde(rename = "blocked.branch_freshness")]
BranchFreshness { behind_main: u32 },
#[serde(rename = "blocked.test_hang")]
TestHang { elapsed_secs: u32, test_name: Option<String> },
TestHang {
elapsed_secs: u32,
test_name: Option<String>,
},
#[serde(rename = "blocked.report_pending")]
ReportPending { since_secs: u32 },
}
@ -462,6 +475,8 @@ pub struct LaneEvent {
pub status: LaneEventStatus,
#[serde(rename = "emittedAt")]
pub emitted_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(rename = "failureClass", skip_serializing_if = "Option::is_none")]
pub failure_class: Option<LaneFailureClass>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -485,6 +500,7 @@ impl LaneEvent {
event,
status,
emitted_at: emitted_at.into(),
session_id: None,
failure_class: None,
detail: None,
data: None,
@ -543,7 +559,8 @@ impl LaneEvent {
.with_failure_class(blocker.failure_class)
.with_detail(blocker.detail.clone());
if let Some(ref subphase) = blocker.subphase {
event = event.with_data(serde_json::to_value(subphase).expect("subphase should serialize"));
event =
event.with_data(serde_json::to_value(subphase).expect("subphase should serialize"));
}
event
}
@ -554,7 +571,8 @@ impl LaneEvent {
.with_failure_class(blocker.failure_class)
.with_detail(blocker.detail.clone());
if let Some(ref subphase) = blocker.subphase {
event = event.with_data(serde_json::to_value(subphase).expect("subphase should serialize"));
event =
event.with_data(serde_json::to_value(subphase).expect("subphase should serialize"));
}
event
}
@ -562,8 +580,12 @@ impl LaneEvent {
/// Ship prepared — §4.44.5
#[must_use]
pub fn ship_prepared(emitted_at: impl Into<String>, provenance: &ShipProvenance) -> Self {
Self::new(LaneEventName::ShipPrepared, LaneEventStatus::Ready, emitted_at)
.with_data(serde_json::to_value(provenance).expect("ship provenance should serialize"))
Self::new(
LaneEventName::ShipPrepared,
LaneEventStatus::Ready,
emitted_at,
)
.with_data(serde_json::to_value(provenance).expect("ship provenance should serialize"))
}
/// Ship commits selected — §4.44.5
@ -573,22 +595,34 @@ impl LaneEvent {
commit_count: u32,
commit_range: impl Into<String>,
) -> Self {
Self::new(LaneEventName::ShipCommitsSelected, LaneEventStatus::Ready, emitted_at)
.with_detail(format!("{} commits: {}", commit_count, commit_range.into()))
Self::new(
LaneEventName::ShipCommitsSelected,
LaneEventStatus::Ready,
emitted_at,
)
.with_detail(format!("{} commits: {}", commit_count, commit_range.into()))
}
/// Ship merged — §4.44.5
#[must_use]
pub fn ship_merged(emitted_at: impl Into<String>, provenance: &ShipProvenance) -> Self {
Self::new(LaneEventName::ShipMerged, LaneEventStatus::Completed, emitted_at)
.with_data(serde_json::to_value(provenance).expect("ship provenance should serialize"))
Self::new(
LaneEventName::ShipMerged,
LaneEventStatus::Completed,
emitted_at,
)
.with_data(serde_json::to_value(provenance).expect("ship provenance should serialize"))
}
/// Ship pushed to main — §4.44.5
#[must_use]
pub fn ship_pushed_main(emitted_at: impl Into<String>, provenance: &ShipProvenance) -> Self {
Self::new(LaneEventName::ShipPushedMain, LaneEventStatus::Completed, emitted_at)
.with_data(serde_json::to_value(provenance).expect("ship provenance should serialize"))
Self::new(
LaneEventName::ShipPushedMain,
LaneEventStatus::Completed,
emitted_at,
)
.with_data(serde_json::to_value(provenance).expect("ship provenance should serialize"))
}
#[must_use]
@ -614,6 +648,12 @@ impl LaneEvent {
self.data = Some(data);
self
}
#[must_use]
pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
self.session_id = Some(session_id.into());
self
}
}
#[must_use]
@ -1044,6 +1084,7 @@ mod tests {
42,
EventProvenance::Test,
)
.with_session_id("boot-abc123def4567890")
.with_session_identity(SessionIdentity::new("test-lane", "/tmp", "test"))
.with_ownership(LaneOwnership {
owner: "bot-1".to_string(),
@ -1055,6 +1096,7 @@ mod tests {
.build();
assert_eq!(event.event, LaneEventName::Started);
assert_eq!(event.session_id.as_deref(), Some("boot-abc123def4567890"));
assert_eq!(event.metadata.seq, 42);
assert_eq!(event.metadata.provenance, EventProvenance::Test);
assert_eq!(
@ -1084,4 +1126,34 @@ mod tests {
assert_eq!(round_trip.provenance, EventProvenance::Healthcheck);
assert_eq!(round_trip.nudge_id, Some("nudge-abc".to_string()));
}
#[test]
fn lane_event_session_id_round_trips_through_serialization() {
let event = LaneEventBuilder::new(
LaneEventName::Started,
LaneEventStatus::Running,
"2026-04-04T00:00:00Z",
1,
EventProvenance::LiveLane,
)
.with_session_id("boot-0123456789abcdef")
.build();
let json = serde_json::to_value(&event).expect("should serialize");
assert_eq!(json["session_id"], "boot-0123456789abcdef");
let round_trip: LaneEvent = serde_json::from_value(json).expect("should deserialize");
assert_eq!(
round_trip.session_id.as_deref(),
Some("boot-0123456789abcdef")
);
}
#[test]
fn lane_event_session_id_omits_field_when_absent() {
let event = LaneEvent::started("2026-04-04T00:00:00Z");
let json = serde_json::to_value(&event).expect("should serialize");
assert!(json.get("session_id").is_none());
}
}

View File

@ -36,6 +36,7 @@ mod remote;
pub mod sandbox;
mod session;
pub mod session_control;
mod session_identity;
pub use session_control::SessionStore;
mod sse;
pub mod stale_base;
@ -153,6 +154,9 @@ pub use session::{
ContentBlock, ConversationMessage, MessageRole, Session, SessionCompaction, SessionError,
SessionFork, SessionPromptEntry,
};
pub use session_identity::{
begin_session, current_boot_session_id, end_session, is_active_session,
};
pub use sse::{IncrementalSseParser, SseEvent};
pub use stale_base::{
check_base_commit, format_stale_base_warning, read_claw_base_file, resolve_expected_base,

View File

@ -0,0 +1,84 @@
use std::collections::hash_map::DefaultHasher;
use std::env;
use std::hash::{Hash, Hasher};
use std::process;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::{SystemTime, UNIX_EPOCH};
static BOOT_SESSION_ID: OnceLock<String> = OnceLock::new();
static BOOT_SESSION_COUNTER: AtomicU64 = AtomicU64::new(0);
static ACTIVE_SESSION: AtomicBool = AtomicBool::new(false);
#[must_use]
pub fn current_boot_session_id() -> &'static str {
BOOT_SESSION_ID.get_or_init(resolve_boot_session_id)
}
pub fn begin_session() {
ACTIVE_SESSION.store(true, Ordering::SeqCst);
}
pub fn end_session() {
ACTIVE_SESSION.store(false, Ordering::SeqCst);
}
#[must_use]
pub fn is_active_session() -> bool {
ACTIVE_SESSION.load(Ordering::SeqCst)
}
fn resolve_boot_session_id() -> String {
match env::var("CLAW_SESSION_ID") {
Ok(value) if !value.trim().is_empty() => value,
_ => generate_boot_session_id(),
}
}
fn generate_boot_session_id() -> String {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let counter = BOOT_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
let mut hasher = DefaultHasher::new();
process::id().hash(&mut hasher);
nanos.hash(&mut hasher);
counter.hash(&mut hasher);
format!("boot-{:016x}", hasher.finish())
}
#[cfg(test)]
mod tests {
use super::{begin_session, current_boot_session_id, end_session, is_active_session};
#[test]
fn given_current_boot_session_id_when_called_twice_then_it_is_stable() {
let first = current_boot_session_id();
let second = current_boot_session_id();
assert_eq!(first, second);
assert!(first.starts_with("boot-"));
}
#[test]
fn given_current_boot_session_id_when_inspected_then_it_is_opaque_and_non_empty() {
let session_id = current_boot_session_id();
assert!(!session_id.trim().is_empty());
assert_eq!(session_id.len(), 21);
assert!(!session_id.contains(' '));
}
#[test]
fn given_begin_and_end_session_when_checked_then_active_state_toggles() {
end_session();
assert!(!is_active_session());
begin_session();
assert!(is_active_session());
end_session();
assert!(!is_active_session());
}
}

View File

@ -18,6 +18,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use crate::current_boot_session_id;
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@ -768,6 +770,7 @@ fn push_event(
#[derive(serde::Serialize)]
struct StateSnapshot<'a> {
worker_id: &'a str,
session_id: &'a str,
status: WorkerStatus,
is_ready: bool,
trust_gate_cleared: bool,
@ -790,6 +793,7 @@ fn emit_state_file(worker: &Worker) {
let now = now_secs();
let snapshot = StateSnapshot {
worker_id: &worker.worker_id,
session_id: current_boot_session_id(),
status: worker.status,
is_ready: worker.status == WorkerStatus::ReadyForPrompt,
trust_gate_cleared: worker.trust_gate_cleared,
@ -1449,6 +1453,10 @@ mod tests {
Some("spawning"),
"initial status should be spawning"
);
assert_eq!(
value["session_id"].as_str(),
Some(current_boot_session_id())
);
assert_eq!(value["is_ready"].as_bool(), Some(false));
// Transition to ReadyForPrompt by observing trust-cleared text

View File

@ -11,8 +11,8 @@ use api::{
use plugins::PluginTool;
use reqwest::blocking::Client;
use runtime::{
check_freshness, dedupe_superseded_commit_events, edit_file, execute_bash, glob_search,
grep_search, load_system_prompt,
check_freshness, current_boot_session_id, dedupe_superseded_commit_events, edit_file,
execute_bash, glob_search, grep_search, load_system_prompt,
lsp_client::LspRegistry,
mcp_tool_bridge::McpToolRegistry,
permission_enforcer::{EnforcementResult, PermissionEnforcer},
@ -3535,7 +3535,9 @@ where
created_at: created_at.clone(),
started_at: Some(created_at),
completed_at: None,
lane_events: vec![LaneEvent::started(iso8601_now())],
lane_events: vec![
LaneEvent::started(iso8601_now()).with_session_id(current_boot_session_id())
],
current_blocker: None,
derived_state: String::from("working"),
error: None,
@ -3744,6 +3746,11 @@ fn persist_agent_terminal_state(
error: Option<String>,
) -> Result<(), String> {
let blocker = error.as_deref().map(classify_lane_blocker);
let session_id = manifest
.lane_events
.last()
.and_then(|event| event.session_id.clone())
.unwrap_or_else(|| current_boot_session_id().to_string());
append_agent_output(
&manifest.output_file,
&format_agent_terminal_output(status, result, blocker.as_ref(), error.as_deref()),
@ -3758,26 +3765,31 @@ fn persist_agent_terminal_state(
if let Some(blocker) = blocker {
next_manifest
.lane_events
.push(LaneEvent::blocked(iso8601_now(), &blocker));
.push(LaneEvent::blocked(iso8601_now(), &blocker).with_session_id(session_id.clone()));
next_manifest
.lane_events
.push(LaneEvent::failed(iso8601_now(), &blocker));
.push(LaneEvent::failed(iso8601_now(), &blocker).with_session_id(session_id.clone()));
} else {
next_manifest.current_blocker = None;
let mut finished_summary = build_lane_finished_summary(&next_manifest, result);
finished_summary.data.disabled_cron_ids = disable_matching_crons(&next_manifest, result);
next_manifest.lane_events.push(
LaneEvent::finished(iso8601_now(), finished_summary.detail).with_data(
serde_json::to_value(&finished_summary.data)
.expect("lane summary metadata should serialize"),
),
LaneEvent::finished(iso8601_now(), finished_summary.detail)
.with_data(
serde_json::to_value(&finished_summary.data)
.expect("lane summary metadata should serialize"),
)
.with_session_id(session_id.clone()),
);
if let Some(provenance) = maybe_commit_provenance(result) {
next_manifest.lane_events.push(LaneEvent::commit_created(
iso8601_now(),
Some(format!("commit {}", provenance.commit)),
provenance,
));
next_manifest.lane_events.push(
LaneEvent::commit_created(
iso8601_now(),
Some(format!("commit {}", provenance.commit)),
provenance,
)
.with_session_id(session_id),
);
}
}
write_agent_manifest(&next_manifest)
@ -7761,6 +7773,9 @@ mod tests {
assert!(manifest_contents.contains("\"status\": \"running\""));
assert_eq!(manifest_json["laneEvents"][0]["event"], "lane.started");
assert_eq!(manifest_json["laneEvents"][0]["status"], "running");
assert!(manifest_json["laneEvents"][0]["session_id"]
.as_str()
.is_some());
assert!(manifest_json["currentBlocker"].is_null());
let captured_job = captured
.lock()
@ -7838,10 +7853,17 @@ mod tests {
completed_manifest_json["laneEvents"][0]["event"],
"lane.started"
);
let session_id = completed_manifest_json["laneEvents"][0]["session_id"]
.as_str()
.expect("startup session_id should exist");
assert_eq!(
completed_manifest_json["laneEvents"][1]["event"],
"lane.finished"
);
assert_eq!(
completed_manifest_json["laneEvents"][1]["session_id"],
session_id
);
assert_eq!(
completed_manifest_json["laneEvents"][1]["data"]["qualityFloorApplied"],
false
@ -7854,6 +7876,10 @@ mod tests {
completed_manifest_json["laneEvents"][2]["event"],
"lane.commit.created"
);
assert_eq!(
completed_manifest_json["laneEvents"][2]["session_id"],
session_id
);
assert_eq!(
completed_manifest_json["laneEvents"][2]["data"]["commit"],
"abc1234"