diff --git a/rust/crates/runtime/src/lane_events.rs b/rust/crates/runtime/src/lane_events.rs index 2dcb042..33ab49d 100644 --- a/rust/crates/runtime/src/lane_events.rs +++ b/rust/crates/runtime/src/lane_events.rs @@ -244,6 +244,7 @@ pub struct LaneEventBuilder { event: LaneEventName, status: LaneEventStatus, emitted_at: String, + session_id: Option, metadata: LaneEventMetadata, detail: Option, failure_class: Option, @@ -264,6 +265,7 @@ impl LaneEventBuilder { event, status, emitted_at: emitted_at.into(), + session_id: None, metadata: LaneEventMetadata::new(seq, provenance), detail: None, failure_class: None, @@ -278,6 +280,13 @@ impl LaneEventBuilder { self } + /// Add boot-scoped session correlation id + #[must_use] + pub fn with_session_id(mut self, session_id: impl Into) -> Self { + self.session_id = Some(session_id.into()); + self + } + /// Add ownership info #[must_use] pub fn with_ownership(mut self, ownership: LaneOwnership) -> Self { @@ -328,6 +337,7 @@ impl LaneEventBuilder { event: self.event, status: self.status, emitted_at: self.emitted_at, + session_id: self.session_id, failure_class: self.failure_class, detail: self.detail, data: self.data, @@ -405,7 +415,10 @@ pub enum BlockedSubphase { #[serde(rename = "blocked.branch_freshness")] BranchFreshness { behind_main: u32 }, #[serde(rename = "blocked.test_hang")] - TestHang { elapsed_secs: u32, test_name: Option }, + TestHang { + elapsed_secs: u32, + test_name: Option, + }, #[serde(rename = "blocked.report_pending")] ReportPending { since_secs: u32 }, } @@ -462,6 +475,8 @@ pub struct LaneEvent { pub status: LaneEventStatus, #[serde(rename = "emittedAt")] pub emitted_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub session_id: Option, #[serde(rename = "failureClass", skip_serializing_if = "Option::is_none")] pub failure_class: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -485,6 +500,7 @@ impl LaneEvent { event, status, emitted_at: emitted_at.into(), + session_id: None, failure_class: None, detail: None, data: None, @@ -543,7 +559,8 @@ impl LaneEvent { .with_failure_class(blocker.failure_class) .with_detail(blocker.detail.clone()); if let Some(ref subphase) = blocker.subphase { - event = event.with_data(serde_json::to_value(subphase).expect("subphase should serialize")); + event = + event.with_data(serde_json::to_value(subphase).expect("subphase should serialize")); } event } @@ -554,7 +571,8 @@ impl LaneEvent { .with_failure_class(blocker.failure_class) .with_detail(blocker.detail.clone()); if let Some(ref subphase) = blocker.subphase { - event = event.with_data(serde_json::to_value(subphase).expect("subphase should serialize")); + event = + event.with_data(serde_json::to_value(subphase).expect("subphase should serialize")); } event } @@ -562,8 +580,12 @@ impl LaneEvent { /// Ship prepared — §4.44.5 #[must_use] pub fn ship_prepared(emitted_at: impl Into, provenance: &ShipProvenance) -> Self { - Self::new(LaneEventName::ShipPrepared, LaneEventStatus::Ready, emitted_at) - .with_data(serde_json::to_value(provenance).expect("ship provenance should serialize")) + Self::new( + LaneEventName::ShipPrepared, + LaneEventStatus::Ready, + emitted_at, + ) + .with_data(serde_json::to_value(provenance).expect("ship provenance should serialize")) } /// Ship commits selected — §4.44.5 @@ -573,22 +595,34 @@ impl LaneEvent { commit_count: u32, commit_range: impl Into, ) -> Self { - Self::new(LaneEventName::ShipCommitsSelected, LaneEventStatus::Ready, emitted_at) - .with_detail(format!("{} commits: {}", commit_count, commit_range.into())) + Self::new( + LaneEventName::ShipCommitsSelected, + LaneEventStatus::Ready, + emitted_at, + ) + .with_detail(format!("{} commits: {}", commit_count, commit_range.into())) } /// Ship merged — §4.44.5 #[must_use] pub fn ship_merged(emitted_at: impl Into, provenance: &ShipProvenance) -> Self { - Self::new(LaneEventName::ShipMerged, LaneEventStatus::Completed, emitted_at) - .with_data(serde_json::to_value(provenance).expect("ship provenance should serialize")) + Self::new( + LaneEventName::ShipMerged, + LaneEventStatus::Completed, + emitted_at, + ) + .with_data(serde_json::to_value(provenance).expect("ship provenance should serialize")) } /// Ship pushed to main — §4.44.5 #[must_use] pub fn ship_pushed_main(emitted_at: impl Into, provenance: &ShipProvenance) -> Self { - Self::new(LaneEventName::ShipPushedMain, LaneEventStatus::Completed, emitted_at) - .with_data(serde_json::to_value(provenance).expect("ship provenance should serialize")) + Self::new( + LaneEventName::ShipPushedMain, + LaneEventStatus::Completed, + emitted_at, + ) + .with_data(serde_json::to_value(provenance).expect("ship provenance should serialize")) } #[must_use] @@ -614,6 +648,12 @@ impl LaneEvent { self.data = Some(data); self } + + #[must_use] + pub fn with_session_id(mut self, session_id: impl Into) -> Self { + self.session_id = Some(session_id.into()); + self + } } #[must_use] @@ -1044,6 +1084,7 @@ mod tests { 42, EventProvenance::Test, ) + .with_session_id("boot-abc123def4567890") .with_session_identity(SessionIdentity::new("test-lane", "/tmp", "test")) .with_ownership(LaneOwnership { owner: "bot-1".to_string(), @@ -1055,6 +1096,7 @@ mod tests { .build(); assert_eq!(event.event, LaneEventName::Started); + assert_eq!(event.session_id.as_deref(), Some("boot-abc123def4567890")); assert_eq!(event.metadata.seq, 42); assert_eq!(event.metadata.provenance, EventProvenance::Test); assert_eq!( @@ -1084,4 +1126,34 @@ mod tests { assert_eq!(round_trip.provenance, EventProvenance::Healthcheck); assert_eq!(round_trip.nudge_id, Some("nudge-abc".to_string())); } + + #[test] + fn lane_event_session_id_round_trips_through_serialization() { + let event = LaneEventBuilder::new( + LaneEventName::Started, + LaneEventStatus::Running, + "2026-04-04T00:00:00Z", + 1, + EventProvenance::LiveLane, + ) + .with_session_id("boot-0123456789abcdef") + .build(); + + let json = serde_json::to_value(&event).expect("should serialize"); + assert_eq!(json["session_id"], "boot-0123456789abcdef"); + + let round_trip: LaneEvent = serde_json::from_value(json).expect("should deserialize"); + assert_eq!( + round_trip.session_id.as_deref(), + Some("boot-0123456789abcdef") + ); + } + + #[test] + fn lane_event_session_id_omits_field_when_absent() { + let event = LaneEvent::started("2026-04-04T00:00:00Z"); + let json = serde_json::to_value(&event).expect("should serialize"); + + assert!(json.get("session_id").is_none()); + } } diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index c7d8709..be018fe 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -36,6 +36,7 @@ mod remote; pub mod sandbox; mod session; pub mod session_control; +mod session_identity; pub use session_control::SessionStore; mod sse; pub mod stale_base; @@ -153,6 +154,9 @@ pub use session::{ ContentBlock, ConversationMessage, MessageRole, Session, SessionCompaction, SessionError, SessionFork, SessionPromptEntry, }; +pub use session_identity::{ + begin_session, current_boot_session_id, end_session, is_active_session, +}; pub use sse::{IncrementalSseParser, SseEvent}; pub use stale_base::{ check_base_commit, format_stale_base_warning, read_claw_base_file, resolve_expected_base, diff --git a/rust/crates/runtime/src/session_identity.rs b/rust/crates/runtime/src/session_identity.rs new file mode 100644 index 0000000..71d2044 --- /dev/null +++ b/rust/crates/runtime/src/session_identity.rs @@ -0,0 +1,84 @@ +use std::collections::hash_map::DefaultHasher; +use std::env; +use std::hash::{Hash, Hasher}; +use std::process; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::OnceLock; +use std::time::{SystemTime, UNIX_EPOCH}; + +static BOOT_SESSION_ID: OnceLock = OnceLock::new(); +static BOOT_SESSION_COUNTER: AtomicU64 = AtomicU64::new(0); +static ACTIVE_SESSION: AtomicBool = AtomicBool::new(false); + +#[must_use] +pub fn current_boot_session_id() -> &'static str { + BOOT_SESSION_ID.get_or_init(resolve_boot_session_id) +} + +pub fn begin_session() { + ACTIVE_SESSION.store(true, Ordering::SeqCst); +} + +pub fn end_session() { + ACTIVE_SESSION.store(false, Ordering::SeqCst); +} + +#[must_use] +pub fn is_active_session() -> bool { + ACTIVE_SESSION.load(Ordering::SeqCst) +} + +fn resolve_boot_session_id() -> String { + match env::var("CLAW_SESSION_ID") { + Ok(value) if !value.trim().is_empty() => value, + _ => generate_boot_session_id(), + } +} + +fn generate_boot_session_id() -> String { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + let counter = BOOT_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); + let mut hasher = DefaultHasher::new(); + process::id().hash(&mut hasher); + nanos.hash(&mut hasher); + counter.hash(&mut hasher); + format!("boot-{:016x}", hasher.finish()) +} + +#[cfg(test)] +mod tests { + use super::{begin_session, current_boot_session_id, end_session, is_active_session}; + + #[test] + fn given_current_boot_session_id_when_called_twice_then_it_is_stable() { + let first = current_boot_session_id(); + let second = current_boot_session_id(); + + assert_eq!(first, second); + assert!(first.starts_with("boot-")); + } + + #[test] + fn given_current_boot_session_id_when_inspected_then_it_is_opaque_and_non_empty() { + let session_id = current_boot_session_id(); + + assert!(!session_id.trim().is_empty()); + assert_eq!(session_id.len(), 21); + assert!(!session_id.contains(' ')); + } + + #[test] + fn given_begin_and_end_session_when_checked_then_active_state_toggles() { + end_session(); + assert!(!is_active_session()); + + begin_session(); + assert!(is_active_session()); + + end_session(); + assert!(!is_active_session()); + } +} diff --git a/rust/crates/runtime/src/worker_boot.rs b/rust/crates/runtime/src/worker_boot.rs index 9096990..d45780d 100644 --- a/rust/crates/runtime/src/worker_boot.rs +++ b/rust/crates/runtime/src/worker_boot.rs @@ -18,6 +18,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; +use crate::current_boot_session_id; + fn now_secs() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -768,6 +770,7 @@ fn push_event( #[derive(serde::Serialize)] struct StateSnapshot<'a> { worker_id: &'a str, + session_id: &'a str, status: WorkerStatus, is_ready: bool, trust_gate_cleared: bool, @@ -790,6 +793,7 @@ fn emit_state_file(worker: &Worker) { let now = now_secs(); let snapshot = StateSnapshot { worker_id: &worker.worker_id, + session_id: current_boot_session_id(), status: worker.status, is_ready: worker.status == WorkerStatus::ReadyForPrompt, trust_gate_cleared: worker.trust_gate_cleared, @@ -1449,6 +1453,10 @@ mod tests { Some("spawning"), "initial status should be spawning" ); + assert_eq!( + value["session_id"].as_str(), + Some(current_boot_session_id()) + ); assert_eq!(value["is_ready"].as_bool(), Some(false)); // Transition to ReadyForPrompt by observing trust-cleared text diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index 1890190..7583954 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -11,8 +11,8 @@ use api::{ use plugins::PluginTool; use reqwest::blocking::Client; use runtime::{ - check_freshness, dedupe_superseded_commit_events, edit_file, execute_bash, glob_search, - grep_search, load_system_prompt, + check_freshness, current_boot_session_id, dedupe_superseded_commit_events, edit_file, + execute_bash, glob_search, grep_search, load_system_prompt, lsp_client::LspRegistry, mcp_tool_bridge::McpToolRegistry, permission_enforcer::{EnforcementResult, PermissionEnforcer}, @@ -3535,7 +3535,9 @@ where created_at: created_at.clone(), started_at: Some(created_at), completed_at: None, - lane_events: vec![LaneEvent::started(iso8601_now())], + lane_events: vec![ + LaneEvent::started(iso8601_now()).with_session_id(current_boot_session_id()) + ], current_blocker: None, derived_state: String::from("working"), error: None, @@ -3744,6 +3746,11 @@ fn persist_agent_terminal_state( error: Option, ) -> Result<(), String> { let blocker = error.as_deref().map(classify_lane_blocker); + let session_id = manifest + .lane_events + .last() + .and_then(|event| event.session_id.clone()) + .unwrap_or_else(|| current_boot_session_id().to_string()); append_agent_output( &manifest.output_file, &format_agent_terminal_output(status, result, blocker.as_ref(), error.as_deref()), @@ -3758,26 +3765,31 @@ fn persist_agent_terminal_state( if let Some(blocker) = blocker { next_manifest .lane_events - .push(LaneEvent::blocked(iso8601_now(), &blocker)); + .push(LaneEvent::blocked(iso8601_now(), &blocker).with_session_id(session_id.clone())); next_manifest .lane_events - .push(LaneEvent::failed(iso8601_now(), &blocker)); + .push(LaneEvent::failed(iso8601_now(), &blocker).with_session_id(session_id.clone())); } else { next_manifest.current_blocker = None; let mut finished_summary = build_lane_finished_summary(&next_manifest, result); finished_summary.data.disabled_cron_ids = disable_matching_crons(&next_manifest, result); next_manifest.lane_events.push( - LaneEvent::finished(iso8601_now(), finished_summary.detail).with_data( - serde_json::to_value(&finished_summary.data) - .expect("lane summary metadata should serialize"), - ), + LaneEvent::finished(iso8601_now(), finished_summary.detail) + .with_data( + serde_json::to_value(&finished_summary.data) + .expect("lane summary metadata should serialize"), + ) + .with_session_id(session_id.clone()), ); if let Some(provenance) = maybe_commit_provenance(result) { - next_manifest.lane_events.push(LaneEvent::commit_created( - iso8601_now(), - Some(format!("commit {}", provenance.commit)), - provenance, - )); + next_manifest.lane_events.push( + LaneEvent::commit_created( + iso8601_now(), + Some(format!("commit {}", provenance.commit)), + provenance, + ) + .with_session_id(session_id), + ); } } write_agent_manifest(&next_manifest) @@ -7761,6 +7773,9 @@ mod tests { assert!(manifest_contents.contains("\"status\": \"running\"")); assert_eq!(manifest_json["laneEvents"][0]["event"], "lane.started"); assert_eq!(manifest_json["laneEvents"][0]["status"], "running"); + assert!(manifest_json["laneEvents"][0]["session_id"] + .as_str() + .is_some()); assert!(manifest_json["currentBlocker"].is_null()); let captured_job = captured .lock() @@ -7838,10 +7853,17 @@ mod tests { completed_manifest_json["laneEvents"][0]["event"], "lane.started" ); + let session_id = completed_manifest_json["laneEvents"][0]["session_id"] + .as_str() + .expect("startup session_id should exist"); assert_eq!( completed_manifest_json["laneEvents"][1]["event"], "lane.finished" ); + assert_eq!( + completed_manifest_json["laneEvents"][1]["session_id"], + session_id + ); assert_eq!( completed_manifest_json["laneEvents"][1]["data"]["qualityFloorApplied"], false @@ -7854,6 +7876,10 @@ mod tests { completed_manifest_json["laneEvents"][2]["event"], "lane.commit.created" ); + assert_eq!( + completed_manifest_json["laneEvents"][2]["session_id"], + session_id + ); assert_eq!( completed_manifest_json["laneEvents"][2]["data"]["commit"], "abc1234"