From 0513898b9dab4d088a7c55e26f2814f363695420 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Thu, 9 Apr 2026 09:02:39 -0700 Subject: [PATCH] feat: add otel export for ecc sessions --- ecc2/src/main.rs | 491 ++++++++++++++++++++++++++++++++++++++ ecc2/src/session/store.rs | 30 +++ 2 files changed, 521 insertions(+) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 699f22dd..7c0a2d39 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -250,6 +250,14 @@ enum Commands { #[arg(long)] json: bool, }, + /// Export sessions, tool spans, and metrics in OTLP-compatible JSON + ExportOtel { + /// Session ID or alias. Omit to export all sessions. + session_id: Option, + /// Write the export to a file instead of stdout + #[arg(long)] + output: Option, + }, /// Stop a running session Stop { /// Session ID or alias @@ -808,6 +816,21 @@ async fn main() -> Result<()> { println!("{}", format_prune_worktrees_human(&outcome)); } } + Some(Commands::ExportOtel { session_id, output }) => { + sync_runtime_session_metrics(&db, &cfg)?; + let resolved_session_id = session_id + .as_deref() + .map(|value| resolve_session_id(&db, value)) + .transpose()?; + let export = build_otel_export(&db, resolved_session_id.as_deref())?; + let rendered = serde_json::to_string_pretty(&export)?; + if let Some(path) = output { + std::fs::write(&path, rendered)?; + println!("OTLP export written to {}", path.display()); + } else { + println!("{rendered}"); + } + } Some(Commands::Stop { session_id }) => { session::manager::stop_session(&db, &session_id).await?; println!("Session stopped: {session_id}"); @@ -1081,6 +1104,93 @@ struct WorktreeResolutionReport { resolution_steps: Vec, } +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpExport { + resource_spans: Vec, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpResourceSpans { + resource: OtlpResource, + scope_spans: Vec, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpResource { + attributes: Vec, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpScopeSpans { + scope: OtlpInstrumentationScope, + spans: Vec, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpInstrumentationScope { + name: String, + version: String, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpSpan { + trace_id: String, + span_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + parent_span_id: Option, + name: String, + kind: String, + start_time_unix_nano: String, + end_time_unix_nano: String, + attributes: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + links: Vec, + status: OtlpSpanStatus, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpSpanLink { + trace_id: String, + span_id: String, + #[serde(skip_serializing_if = "Vec::is_empty")] + attributes: Vec, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpSpanStatus { + code: String, + #[serde(skip_serializing_if = "Option::is_none")] + message: Option, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpKeyValue { + key: String, + value: OtlpAnyValue, +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +struct OtlpAnyValue { + #[serde(skip_serializing_if = "Option::is_none")] + string_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + int_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + double_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + bool_value: Option, +} + fn build_worktree_status_report( session: &session::Session, include_patch: bool, @@ -1449,6 +1559,214 @@ fn format_prune_worktrees_human(outcome: &session::manager::WorktreePruneOutcome lines.join("\n") } +fn build_otel_export( + db: &session::store::StateStore, + session_id: Option<&str>, +) -> Result { + let sessions = if let Some(session_id) = session_id { + vec![db + .get_session(session_id)? + .ok_or_else(|| anyhow::anyhow!("Session not found: {session_id}"))?] + } else { + db.list_sessions()? + }; + + let mut spans = Vec::new(); + for session in &sessions { + spans.extend(build_session_otel_spans(db, session)?); + } + + Ok(OtlpExport { + resource_spans: vec![OtlpResourceSpans { + resource: OtlpResource { + attributes: vec![ + otlp_string_attr("service.name", "ecc2"), + otlp_string_attr("service.version", env!("CARGO_PKG_VERSION")), + otlp_string_attr("telemetry.sdk.language", "rust"), + ], + }, + scope_spans: vec![OtlpScopeSpans { + scope: OtlpInstrumentationScope { + name: "ecc2".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + spans, + }], + }], + }) +} + +fn build_session_otel_spans( + db: &session::store::StateStore, + session: &session::Session, +) -> Result> { + let trace_id = otlp_trace_id(&session.id); + let session_span_id = otlp_span_id(&format!("session:{}", session.id)); + let parent_link = db.latest_task_handoff_source(&session.id)?; + let session_end = session.updated_at.max(session.created_at); + let mut spans = vec![OtlpSpan { + trace_id: trace_id.clone(), + span_id: session_span_id.clone(), + parent_span_id: None, + name: format!("session {}", session.task), + kind: "SPAN_KIND_INTERNAL".to_string(), + start_time_unix_nano: otlp_timestamp_nanos(session.created_at), + end_time_unix_nano: otlp_timestamp_nanos(session_end), + attributes: vec![ + otlp_string_attr("ecc.session.id", &session.id), + otlp_string_attr("ecc.session.state", &session.state.to_string()), + otlp_string_attr("ecc.agent.type", &session.agent_type), + otlp_string_attr("ecc.session.task", &session.task), + otlp_string_attr( + "ecc.working_dir", + session.working_dir.to_string_lossy().as_ref(), + ), + otlp_int_attr("ecc.metrics.input_tokens", session.metrics.input_tokens), + otlp_int_attr("ecc.metrics.output_tokens", session.metrics.output_tokens), + otlp_int_attr("ecc.metrics.tokens_used", session.metrics.tokens_used), + otlp_int_attr("ecc.metrics.tool_calls", session.metrics.tool_calls), + otlp_int_attr( + "ecc.metrics.files_changed", + u64::from(session.metrics.files_changed), + ), + otlp_int_attr("ecc.metrics.duration_secs", session.metrics.duration_secs), + otlp_double_attr("ecc.metrics.cost_usd", session.metrics.cost_usd), + ], + links: parent_link + .into_iter() + .map(|parent_session_id| OtlpSpanLink { + trace_id: otlp_trace_id(&parent_session_id), + span_id: otlp_span_id(&format!("session:{parent_session_id}")), + attributes: vec![otlp_string_attr( + "ecc.parent_session.id", + &parent_session_id, + )], + }) + .collect(), + status: otlp_session_status(&session.state), + }]; + + for entry in db.list_tool_logs_for_session(&session.id)? { + let span_end = chrono::DateTime::parse_from_rfc3339(&entry.timestamp) + .unwrap_or_else(|_| session.updated_at.into()) + .with_timezone(&chrono::Utc); + let span_start = span_end - chrono::Duration::milliseconds(entry.duration_ms as i64); + + spans.push(OtlpSpan { + trace_id: trace_id.clone(), + span_id: otlp_span_id(&format!("tool:{}:{}", session.id, entry.id)), + parent_span_id: Some(session_span_id.clone()), + name: format!("tool {}", entry.tool_name), + kind: "SPAN_KIND_INTERNAL".to_string(), + start_time_unix_nano: otlp_timestamp_nanos(span_start), + end_time_unix_nano: otlp_timestamp_nanos(span_end), + attributes: vec![ + otlp_string_attr("ecc.session.id", &entry.session_id), + otlp_string_attr("tool.name", &entry.tool_name), + otlp_string_attr("tool.input_summary", &entry.input_summary), + otlp_string_attr("tool.output_summary", &entry.output_summary), + otlp_string_attr("tool.trigger_summary", &entry.trigger_summary), + otlp_string_attr("tool.input_params_json", &entry.input_params_json), + otlp_int_attr("tool.duration_ms", entry.duration_ms), + otlp_double_attr("tool.risk_score", entry.risk_score), + ], + links: Vec::new(), + status: OtlpSpanStatus { + code: "STATUS_CODE_UNSET".to_string(), + message: None, + }, + }); + } + + Ok(spans) +} + +fn otlp_timestamp_nanos(value: chrono::DateTime) -> String { + value + .timestamp_nanos_opt() + .unwrap_or_default() + .max(0) + .to_string() +} + +fn otlp_trace_id(seed: &str) -> String { + format!( + "{:016x}{:016x}", + fnv1a64(seed.as_bytes()), + fnv1a64_with_seed(seed.as_bytes(), 1099511628211) + ) +} + +fn otlp_span_id(seed: &str) -> String { + format!("{:016x}", fnv1a64(seed.as_bytes())) +} + +fn fnv1a64(bytes: &[u8]) -> u64 { + fnv1a64_with_seed(bytes, 14695981039346656037) +} + +fn fnv1a64_with_seed(bytes: &[u8], offset_basis: u64) -> u64 { + let mut hash = offset_basis; + for byte in bytes { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(1099511628211); + } + hash +} + +fn otlp_string_attr(key: &str, value: &str) -> OtlpKeyValue { + OtlpKeyValue { + key: key.to_string(), + value: OtlpAnyValue { + string_value: Some(value.to_string()), + int_value: None, + double_value: None, + bool_value: None, + }, + } +} + +fn otlp_int_attr(key: &str, value: u64) -> OtlpKeyValue { + OtlpKeyValue { + key: key.to_string(), + value: OtlpAnyValue { + string_value: None, + int_value: Some(value.to_string()), + double_value: None, + bool_value: None, + }, + } +} + +fn otlp_double_attr(key: &str, value: f64) -> OtlpKeyValue { + OtlpKeyValue { + key: key.to_string(), + value: OtlpAnyValue { + string_value: None, + int_value: None, + double_value: Some(value), + bool_value: None, + }, + } +} + +fn otlp_session_status(state: &session::SessionState) -> OtlpSpanStatus { + match state { + session::SessionState::Completed => OtlpSpanStatus { + code: "STATUS_CODE_OK".to_string(), + message: None, + }, + session::SessionState::Failed => OtlpSpanStatus { + code: "STATUS_CODE_ERROR".to_string(), + message: Some("session failed".to_string()), + }, + _ => OtlpSpanStatus { + code: "STATUS_CODE_UNSET".to_string(), + message: None, + }, + } +} + fn summarize_coordinate_backlog( outcome: &session::manager::CoordinateBacklogOutcome, ) -> CoordinateBacklogPassSummary { @@ -1556,6 +1874,66 @@ fn send_handoff_message(db: &session::store::StateStore, from_id: &str, to_id: & mod tests { use super::*; use crate::config::Config; + use crate::session::store::StateStore; + use crate::session::{Session, SessionMetrics, SessionState}; + use chrono::{Duration, Utc}; + use std::fs; + use std::path::{Path, PathBuf}; + + struct TestDir { + path: PathBuf, + } + + impl TestDir { + fn new(label: &str) -> Result { + let path = + std::env::temp_dir().join(format!("ecc2-main-{label}-{}", uuid::Uuid::new_v4())); + fs::create_dir_all(&path)?; + Ok(Self { path }) + } + + fn path(&self) -> &Path { + &self.path + } + } + + impl Drop for TestDir { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.path); + } + } + + fn build_session(id: &str, task: &str, state: SessionState) -> Session { + let now = Utc::now(); + Session { + id: id.to_string(), + task: task.to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp/ecc"), + state, + pid: None, + worktree: None, + created_at: now - Duration::seconds(5), + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics { + input_tokens: 120, + output_tokens: 30, + tokens_used: 150, + tool_calls: 2, + files_changed: 1, + duration_secs: 5, + cost_usd: 0.42, + }, + } + } + + fn attr_value<'a>(attrs: &'a [OtlpKeyValue], key: &str) -> Option<&'a OtlpAnyValue> { + attrs + .iter() + .find(|attr| attr.key == key) + .map(|attr| &attr.value) + } #[test] fn worktree_policy_defaults_to_config_setting() { @@ -1598,6 +1976,26 @@ mod tests { } } + #[test] + fn cli_parses_export_otel_command() { + let cli = Cli::try_parse_from([ + "ecc", + "export-otel", + "worker-1234", + "--output", + "/tmp/ecc-otel.json", + ]) + .expect("export-otel should parse"); + + match cli.command { + Some(Commands::ExportOtel { session_id, output }) => { + assert_eq!(session_id.as_deref(), Some("worker-1234")); + assert_eq!(output.as_deref(), Some(Path::new("/tmp/ecc-otel.json"))); + } + _ => panic!("expected export-otel subcommand"), + } + } + #[test] fn cli_parses_messages_send_command() { let cli = Cli::try_parse_from([ @@ -1886,6 +2284,99 @@ mod tests { } } + #[test] + fn build_otel_export_includes_session_and_tool_spans() -> Result<()> { + let tempdir = TestDir::new("otel-export-session")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let session = build_session("session-1", "Investigate export", SessionState::Completed); + db.insert_session(&session)?; + db.insert_tool_log( + &session.id, + "Write", + "Write src/lib.rs", + "{\"file\":\"src/lib.rs\"}", + "Updated file", + "manual test", + 120, + 0.75, + &Utc::now().to_rfc3339(), + )?; + + let export = build_otel_export(&db, Some("session-1"))?; + let spans = &export.resource_spans[0].scope_spans[0].spans; + assert_eq!(spans.len(), 2); + + let session_span = spans + .iter() + .find(|span| span.parent_span_id.is_none()) + .expect("session root span"); + let tool_span = spans + .iter() + .find(|span| span.parent_span_id.is_some()) + .expect("tool child span"); + + assert_eq!(session_span.trace_id, tool_span.trace_id); + assert_eq!( + tool_span.parent_span_id.as_deref(), + Some(session_span.span_id.as_str()) + ); + assert_eq!(session_span.status.code, "STATUS_CODE_OK"); + assert_eq!( + attr_value(&session_span.attributes, "ecc.session.id") + .and_then(|value| value.string_value.as_deref()), + Some("session-1") + ); + assert_eq!( + attr_value(&tool_span.attributes, "tool.name") + .and_then(|value| value.string_value.as_deref()), + Some("Write") + ); + assert_eq!( + attr_value(&tool_span.attributes, "tool.duration_ms") + .and_then(|value| value.int_value.as_deref()), + Some("120") + ); + + Ok(()) + } + + #[test] + fn build_otel_export_links_delegated_session_to_parent_trace() -> Result<()> { + let tempdir = TestDir::new("otel-export-parent-link")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let parent = build_session("lead-1", "Lead task", SessionState::Running); + let child = build_session("worker-1", "Delegated task", SessionState::Running); + db.insert_session(&parent)?; + db.insert_session(&child)?; + db.send_message( + &parent.id, + &child.id, + "{\"task\":\"Delegated task\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + + let export = build_otel_export(&db, Some("worker-1"))?; + let session_span = export.resource_spans[0].scope_spans[0] + .spans + .iter() + .find(|span| span.parent_span_id.is_none()) + .expect("session root span"); + + assert_eq!(session_span.links.len(), 1); + assert_eq!(session_span.links[0].trace_id, otlp_trace_id("lead-1")); + assert_eq!( + session_span.links[0].span_id, + otlp_span_id("session:lead-1") + ); + assert_eq!( + attr_value(&session_span.links[0].attributes, "ecc.parent_session.id") + .and_then(|value| value.string_value.as_deref()), + Some("lead-1") + ); + + Ok(()) + } + #[test] fn cli_parses_worktree_status_check_flag() { let cli = Cli::try_parse_from(["ecc", "worktree-status", "--check"]) diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 3b3244ca..3963c6f0 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -1705,6 +1705,36 @@ impl StateStore { }) } + pub fn list_tool_logs_for_session(&self, session_id: &str) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, session_id, tool_name, input_summary, input_params_json, output_summary, trigger_summary, duration_ms, risk_score, timestamp + FROM tool_log + WHERE session_id = ?1 + ORDER BY timestamp ASC, id ASC", + )?; + + let entries = stmt + .query_map(rusqlite::params![session_id], |row| { + Ok(ToolLogEntry { + id: row.get(0)?, + session_id: row.get(1)?, + tool_name: row.get(2)?, + input_summary: row.get::<_, Option>(3)?.unwrap_or_default(), + input_params_json: row + .get::<_, Option>(4)? + .unwrap_or_else(|| "{}".to_string()), + output_summary: row.get::<_, Option>(5)?.unwrap_or_default(), + trigger_summary: row.get::<_, Option>(6)?.unwrap_or_default(), + duration_ms: row.get::<_, Option>(7)?.unwrap_or_default(), + risk_score: row.get::<_, Option>(8)?.unwrap_or_default(), + timestamp: row.get(9)?, + }) + })? + .collect::, _>>()?; + + Ok(entries) + } + pub fn list_file_activity( &self, session_id: &str,