diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 2f184caa..d24f61a2 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -37,6 +37,7 @@ pub struct Config { pub max_parallel_worktrees: usize, pub session_timeout_secs: u64, pub heartbeat_interval_secs: u64, + pub auto_terminate_stale_sessions: bool, pub default_agent: String, pub auto_dispatch_unread_handoffs: bool, pub auto_dispatch_limit_per_session: usize, @@ -91,6 +92,7 @@ impl Default for Config { max_parallel_worktrees: 6, session_timeout_secs: 3600, heartbeat_interval_secs: 30, + auto_terminate_stale_sessions: false, default_agent: "claude".to_string(), auto_dispatch_unread_handoffs: false, auto_dispatch_limit_per_session: 5, @@ -340,6 +342,7 @@ max_parallel_sessions = 8 max_parallel_worktrees = 6 session_timeout_secs = 3600 heartbeat_interval_secs = 30 +auto_terminate_stale_sessions = false default_agent = "claude" theme = "Dark" "#; @@ -377,6 +380,10 @@ theme = "Dark" config.auto_merge_ready_worktrees, defaults.auto_merge_ready_worktrees ); + assert_eq!( + config.auto_terminate_stale_sessions, + defaults.auto_terminate_stale_sessions + ); } #[test] diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 21ca4b6c..7966369a 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -900,6 +900,7 @@ fn sync_runtime_session_metrics( db.refresh_session_durations()?; db.sync_cost_tracker_metrics(&cfg.cost_metrics_path())?; db.sync_tool_activity_metrics(&cfg.tool_activity_metrics_path())?; + let _ = session::manager::enforce_session_heartbeats(db, cfg)?; let _ = session::manager::enforce_budget_hard_limits(db, cfg)?; Ok(()) } diff --git a/ecc2/src/observability/mod.rs b/ecc2/src/observability/mod.rs index 13e43657..586c4431 100644 --- a/ecc2/src/observability/mod.rs +++ b/ecc2/src/observability/mod.rs @@ -313,6 +313,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), } } diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index c2783322..b8e4d7a3 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -22,10 +22,8 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { resume_crashed_sessions(&db)?; let heartbeat_interval = Duration::from_secs(cfg.heartbeat_interval_secs); - let timeout = Duration::from_secs(cfg.session_timeout_secs); - loop { - if let Err(e) = check_sessions(&db, timeout) { + if let Err(e) = check_sessions(&db, &cfg) { tracing::error!("Session check failed: {e}"); } @@ -82,25 +80,8 @@ where Ok(failed_sessions) } -fn check_sessions(db: &StateStore, timeout: Duration) -> Result<()> { - let sessions = db.list_sessions()?; - - for session in sessions { - if session.state != SessionState::Running { - continue; - } - - let elapsed = chrono::Utc::now() - .signed_duration_since(session.updated_at) - .to_std() - .unwrap_or(Duration::ZERO); - - if elapsed > timeout { - tracing::warn!("Session {} timed out after {:?}", session.id, elapsed); - db.update_state_and_pid(&session.id, &SessionState::Failed, None)?; - } - } - +fn check_sessions(db: &StateStore, cfg: &Config) -> Result<()> { + let _ = manager::enforce_session_heartbeats(db, cfg)?; Ok(()) } @@ -498,6 +479,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), } } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index d64f5398..58ed4dfe 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -68,6 +68,58 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result, + pub auto_terminated_sessions: Vec, +} + +pub fn enforce_session_heartbeats( + db: &StateStore, + cfg: &Config, +) -> Result { + enforce_session_heartbeats_with(db, cfg, kill_process) +} + +fn enforce_session_heartbeats_with( + db: &StateStore, + cfg: &Config, + terminate_pid: F, +) -> Result +where + F: Fn(u32) -> Result<()>, +{ + let timeout = chrono::Duration::seconds(cfg.session_timeout_secs as i64); + let now = chrono::Utc::now(); + let mut outcome = HeartbeatEnforcementOutcome::default(); + + for session in db.list_sessions()? { + if !matches!(session.state, SessionState::Running | SessionState::Stale) { + continue; + } + + if now.signed_duration_since(session.last_heartbeat_at) <= timeout { + continue; + } + + if cfg.auto_terminate_stale_sessions { + if let Some(pid) = session.pid { + let _ = terminate_pid(pid); + } + db.update_state_and_pid(&session.id, &SessionState::Failed, None)?; + outcome.auto_terminated_sessions.push(session.id); + continue; + } + + if session.state != SessionState::Stale { + db.update_state(&session.id, &SessionState::Stale)?; + outcome.stale_sessions.push(session.id); + } + } + + Ok(outcome) +} + pub async fn assign_session( db: &StateStore, cfg: &Config, @@ -685,7 +737,7 @@ pub async fn merge_session_worktree( if matches!( session.state, - SessionState::Pending | SessionState::Running | SessionState::Idle + SessionState::Pending | SessionState::Running | SessionState::Idle | SessionState::Stale ) { anyhow::bail!( "Cannot merge active session {} while it is {}", @@ -747,7 +799,10 @@ pub async fn merge_ready_worktrees( if matches!( session.state, - SessionState::Pending | SessionState::Running | SessionState::Idle + SessionState::Pending + | SessionState::Running + | SessionState::Idle + | SessionState::Stale ) { active_with_worktree_ids.push(session.id); continue; @@ -902,6 +957,7 @@ pub async fn run_session( session_id.to_string(), command, SessionOutputStore::default(), + std::time::Duration::from_secs(cfg.heartbeat_interval_secs), ) .await?; Ok(()) @@ -997,6 +1053,7 @@ fn build_session_record( worktree, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), }) } @@ -1488,6 +1545,15 @@ impl fmt::Display for SessionStatus { writeln!(f, "Tools: {}", s.metrics.tool_calls)?; writeln!(f, "Files: {}", s.metrics.files_changed)?; writeln!(f, "Cost: ${:.4}", s.metrics.cost_usd)?; + writeln!( + f, + "Heartbeat: {} ({}s ago)", + s.last_heartbeat_at, + chrono::Utc::now() + .signed_duration_since(s.last_heartbeat_at) + .num_seconds() + .max(0) + )?; if !self.delegated_children.is_empty() { writeln!(f, "Children: {}", self.delegated_children.join(", "))?; } @@ -1528,6 +1594,7 @@ impl fmt::Display for TeamStatus { for lane in [ "Running", "Idle", + "Stale", "Pending", "Failed", "Stopped", @@ -1676,6 +1743,7 @@ fn session_state_label(state: &SessionState) -> &'static str { SessionState::Pending => "Pending", SessionState::Running => "Running", SessionState::Idle => "Idle", + SessionState::Stale => "Stale", SessionState::Completed => "Completed", SessionState::Failed => "Failed", SessionState::Stopped => "Stopped", @@ -1727,6 +1795,7 @@ mod tests { max_parallel_worktrees: 4, session_timeout_secs: 60, heartbeat_interval_secs: 5, + auto_terminate_stale_sessions: false, default_agent: "claude".to_string(), auto_dispatch_unread_handoffs: false, auto_dispatch_limit_per_session: 5, @@ -1755,10 +1824,85 @@ mod tests { worktree: None, created_at: updated_at - Duration::minutes(1), updated_at, + last_heartbeat_at: updated_at, metrics: SessionMetrics::default(), } } + #[test] + fn enforce_session_heartbeats_marks_overdue_running_sessions_stale() -> Result<()> { + let tempdir = TestDir::new("manager-heartbeat-stale")?; + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "stale-1".to_string(), + task: "heartbeat overdue".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(4242), + worktree: None, + created_at: now - Duration::minutes(5), + updated_at: now - Duration::minutes(5), + last_heartbeat_at: now - Duration::minutes(5), + metrics: SessionMetrics::default(), + })?; + + let outcome = enforce_session_heartbeats(&db, &cfg)?; + let session = db.get_session("stale-1")?.expect("session should exist"); + + assert_eq!(outcome.stale_sessions, vec!["stale-1".to_string()]); + assert!(outcome.auto_terminated_sessions.is_empty()); + assert_eq!(session.state, SessionState::Stale); + assert_eq!(session.pid, Some(4242)); + + Ok(()) + } + + #[test] + fn enforce_session_heartbeats_auto_terminates_when_enabled() -> Result<()> { + let tempdir = TestDir::new("manager-heartbeat-terminate")?; + let mut cfg = build_config(tempdir.path()); + cfg.auto_terminate_stale_sessions = true; + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + let killed = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let killed_clone = killed.clone(); + + db.insert_session(&Session { + id: "stale-2".to_string(), + task: "terminate overdue".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(7777), + worktree: None, + created_at: now - Duration::minutes(5), + updated_at: now - Duration::minutes(5), + last_heartbeat_at: now - Duration::minutes(5), + metrics: SessionMetrics::default(), + })?; + + let outcome = enforce_session_heartbeats_with(&db, &cfg, move |pid| { + killed_clone.lock().unwrap().push(pid); + Ok(()) + })?; + let session = db.get_session("stale-2")?.expect("session should exist"); + + assert!(outcome.stale_sessions.is_empty()); + assert_eq!( + outcome.auto_terminated_sessions, + vec!["stale-2".to_string()] + ); + assert_eq!(*killed.lock().unwrap(), vec![7777]); + assert_eq!(session.state, SessionState::Failed); + assert_eq!(session.pid, None); + + Ok(()) + } + fn build_daemon_activity() -> super::super::store::DaemonActivity { let now = Utc::now(); super::super::store::DaemonActivity { @@ -1976,6 +2120,7 @@ mod tests { }), created_at: now - Duration::minutes(1), updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; db.update_metrics( @@ -2032,6 +2177,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(1), + last_heartbeat_at: now - Duration::minutes(1), metrics: SessionMetrics::default(), })?; db.update_metrics( @@ -2076,6 +2222,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(1), updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -2328,6 +2475,7 @@ mod tests { worktree: Some(merged_worktree.clone()), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -2343,6 +2491,7 @@ mod tests { worktree: Some(active_worktree.clone()), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -2359,6 +2508,7 @@ mod tests { worktree: Some(dirty_worktree.clone()), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -2584,6 +2734,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -2596,6 +2747,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(1), updated_at: now - Duration::minutes(1), + last_heartbeat_at: now - Duration::minutes(1), metrics: SessionMetrics::default(), })?; db.send_message( @@ -2651,6 +2803,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -2663,6 +2816,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; db.send_message( @@ -2727,6 +2881,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -2739,6 +2894,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; db.send_message( @@ -2794,6 +2950,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -2806,6 +2963,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; db.send_message( @@ -2865,6 +3023,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -2877,6 +3036,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; db.send_message( @@ -2930,6 +3090,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; @@ -2977,6 +3138,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -2989,6 +3151,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; db.send_message( @@ -3044,6 +3207,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; } @@ -3103,6 +3267,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; } @@ -3154,6 +3319,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; @@ -3167,6 +3333,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; @@ -3222,6 +3389,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(4), updated_at: now - Duration::minutes(4), + last_heartbeat_at: now - Duration::minutes(4), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -3234,6 +3402,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -3246,6 +3415,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(2), updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), metrics: SessionMetrics::default(), })?; @@ -3307,6 +3477,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(4), updated_at: now - Duration::minutes(4), + last_heartbeat_at: now - Duration::minutes(4), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -3319,6 +3490,7 @@ mod tests { worktree: None, created_at: now - Duration::minutes(3), updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), metrics: SessionMetrics::default(), })?; diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 6d243858..653ca1bc 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -20,6 +20,7 @@ pub struct Session { pub worktree: Option, pub created_at: DateTime, pub updated_at: DateTime, + pub last_heartbeat_at: DateTime, pub metrics: SessionMetrics, } @@ -28,6 +29,7 @@ pub enum SessionState { Pending, Running, Idle, + Stale, Completed, Failed, Stopped, @@ -39,6 +41,7 @@ impl fmt::Display for SessionState { SessionState::Pending => write!(f, "pending"), SessionState::Running => write!(f, "running"), SessionState::Idle => write!(f, "idle"), + SessionState::Stale => write!(f, "stale"), SessionState::Completed => write!(f, "completed"), SessionState::Failed => write!(f, "failed"), SessionState::Stopped => write!(f, "stopped"), @@ -60,12 +63,21 @@ impl SessionState { ) | ( SessionState::Running, SessionState::Idle + | SessionState::Stale | SessionState::Completed | SessionState::Failed | SessionState::Stopped ) | ( SessionState::Idle, SessionState::Running + | SessionState::Stale + | SessionState::Completed + | SessionState::Failed + | SessionState::Stopped + ) | ( + SessionState::Stale, + SessionState::Running + | SessionState::Idle | SessionState::Completed | SessionState::Failed | SessionState::Stopped @@ -78,6 +90,7 @@ impl SessionState { match value { "running" => SessionState::Running, "idle" => SessionState::Idle, + "stale" => SessionState::Stale, "completed" => SessionState::Completed, "failed" => SessionState::Failed, "stopped" => SessionState::Stopped, diff --git a/ecc2/src/session/runtime.rs b/ecc2/src/session/runtime.rs index 3c75fe6d..8310a7e1 100644 --- a/ecc2/src/session/runtime.rs +++ b/ecc2/src/session/runtime.rs @@ -5,6 +5,7 @@ use anyhow::{Context, Result}; use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::process::Command; use tokio::sync::{mpsc, oneshot}; +use tokio::time::{self, MissedTickBehavior}; use super::output::{OutputStream, SessionOutputStore}; use super::store::StateStore; @@ -26,6 +27,9 @@ enum DbMessage { line: String, ack: oneshot::Sender, }, + TouchHeartbeat { + ack: oneshot::Sender, + }, } #[derive(Clone)] @@ -53,6 +57,10 @@ impl DbWriter { .await } + async fn touch_heartbeat(&self) -> Result<()> { + self.send(|ack| DbMessage::TouchHeartbeat { ack }).await + } + async fn send(&self, build: F) -> Result<()> where F: FnOnce(oneshot::Sender) -> DbMessage, @@ -111,6 +119,17 @@ fn run_db_writer(db_path: PathBuf, session_id: String, mut rx: mpsc::UnboundedRe }; let _ = ack.send(result); } + DbMessage::TouchHeartbeat { ack } => { + let result = match opened.as_ref() { + Some(db) => db + .touch_heartbeat(&session_id) + .map_err(|error| error.to_string()), + None => Err(open_error + .clone() + .unwrap_or_else(|| "Failed to open state store".to_string())), + }; + let _ = ack.send(result); + } } } } @@ -120,6 +139,7 @@ pub async fn capture_command_output( session_id: String, mut command: Command, output_store: SessionOutputStore, + heartbeat_interval: std::time::Duration, ) -> Result { let db_writer = DbWriter::start(db_path, session_id.clone()); @@ -152,6 +172,19 @@ pub async fn capture_command_output( .ok_or_else(|| anyhow::anyhow!("Spawned process did not expose a process id"))?; db_writer.update_pid(Some(pid)).await?; db_writer.update_state(SessionState::Running).await?; + db_writer.touch_heartbeat().await?; + + let heartbeat_writer = db_writer.clone(); + let heartbeat_task = tokio::spawn(async move { + let mut ticker = time::interval(heartbeat_interval); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + ticker.tick().await; + if heartbeat_writer.touch_heartbeat().await.is_err() { + break; + } + } + }); let stdout_task = tokio::spawn(capture_stream( session_id.clone(), @@ -169,6 +202,8 @@ pub async fn capture_command_output( )); let status = child.wait().await?; + heartbeat_task.abort(); + let _ = heartbeat_task.await; stdout_task.await??; stderr_task.await??; @@ -244,6 +279,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -254,9 +290,14 @@ mod tests { .arg("-c") .arg("printf 'alpha\\n'; printf 'beta\\n' >&2"); - let status = - capture_command_output(db_path.clone(), session_id.clone(), command, output_store) - .await?; + let status = capture_command_output( + db_path.clone(), + session_id.clone(), + command, + output_store, + std::time::Duration::from_millis(10), + ) + .await?; assert!(status.success()); @@ -286,4 +327,49 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn capture_command_output_updates_heartbeat_for_quiet_processes() -> Result<()> { + let db_path = env::temp_dir().join(format!("ecc2-runtime-heartbeat-{}.db", Uuid::new_v4())); + let db = StateStore::open(&db_path)?; + let session_id = "session-heartbeat".to_string(); + let now = Utc::now(); + + db.insert_session(&Session { + id: session_id.clone(), + task: "quiet process".to_string(), + agent_type: "test".to_string(), + working_dir: env::temp_dir(), + state: SessionState::Pending, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + let mut command = Command::new("/bin/sh"); + command.arg("-c").arg("sleep 0.05"); + + let _ = capture_command_output( + db_path.clone(), + session_id.clone(), + command, + SessionOutputStore::default(), + std::time::Duration::from_millis(10), + ) + .await?; + + let db = StateStore::open(&db_path)?; + let session = db + .get_session(&session_id)? + .expect("session should still exist"); + + assert!(session.last_heartbeat_at > now); + assert_eq!(session.state, SessionState::Completed); + + let _ = std::fs::remove_file(db_path); + Ok(()) + } } diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 33ab6407..aaf2e38e 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -132,7 +132,8 @@ impl StateStore { duration_secs INTEGER DEFAULT 0, cost_usd REAL DEFAULT 0.0, created_at TEXT NOT NULL, - updated_at TEXT NOT NULL + updated_at TEXT NOT NULL, + last_heartbeat_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS tool_log ( @@ -240,6 +241,20 @@ impl StateStore { .context("Failed to add output_tokens column to sessions table")?; } + if !self.has_column("sessions", "last_heartbeat_at")? { + self.conn + .execute("ALTER TABLE sessions ADD COLUMN last_heartbeat_at TEXT", []) + .context("Failed to add last_heartbeat_at column to sessions table")?; + self.conn + .execute( + "UPDATE sessions + SET last_heartbeat_at = updated_at + WHERE last_heartbeat_at IS NULL", + [], + ) + .context("Failed to backfill last_heartbeat_at column")?; + } + if !self.has_column("tool_log", "hook_event_id")? { self.conn .execute("ALTER TABLE tool_log ADD COLUMN hook_event_id TEXT", []) @@ -404,8 +419,8 @@ impl StateStore { pub fn insert_session(&self, session: &Session) -> Result<()> { self.conn.execute( - "INSERT INTO sessions (id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + "INSERT INTO sessions (id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base, created_at, updated_at, last_heartbeat_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)", rusqlite::params![ session.id, session.task, @@ -421,6 +436,7 @@ impl StateStore { session.worktree.as_ref().map(|w| w.base_branch.clone()), session.created_at.to_rfc3339(), session.updated_at.to_rfc3339(), + session.last_heartbeat_at.to_rfc3339(), ], )?; Ok(()) @@ -433,7 +449,12 @@ impl StateStore { pid: Option, ) -> Result<()> { let updated = self.conn.execute( - "UPDATE sessions SET state = ?1, pid = ?2, updated_at = ?3 WHERE id = ?4", + "UPDATE sessions + SET state = ?1, + pid = ?2, + updated_at = ?3, + last_heartbeat_at = ?3 + WHERE id = ?4", rusqlite::params![ state.to_string(), pid.map(i64::from), @@ -470,7 +491,11 @@ impl StateStore { } let updated = self.conn.execute( - "UPDATE sessions SET state = ?1, updated_at = ?2 WHERE id = ?3", + "UPDATE sessions + SET state = ?1, + updated_at = ?2, + last_heartbeat_at = ?2 + WHERE id = ?3", rusqlite::params![ state.to_string(), chrono::Utc::now().to_rfc3339(), @@ -487,7 +512,11 @@ impl StateStore { pub fn update_pid(&self, session_id: &str, pid: Option) -> Result<()> { let updated = self.conn.execute( - "UPDATE sessions SET pid = ?1, updated_at = ?2 WHERE id = ?3", + "UPDATE sessions + SET pid = ?1, + updated_at = ?2, + last_heartbeat_at = ?2 + WHERE id = ?3", rusqlite::params![ pid.map(i64::from), chrono::Utc::now().to_rfc3339(), @@ -505,7 +534,11 @@ impl StateStore { pub fn clear_worktree(&self, session_id: &str) -> Result<()> { let updated = self.conn.execute( "UPDATE sessions - SET worktree_path = NULL, worktree_branch = NULL, worktree_base = NULL, updated_at = ?1 + SET worktree_path = NULL, + worktree_branch = NULL, + worktree_base = NULL, + updated_at = ?1, + last_heartbeat_at = ?1 WHERE id = ?2", rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id], )?; @@ -571,7 +604,10 @@ impl StateStore { .unwrap_or_default() .with_timezone(&chrono::Utc); let effective_end = match state { - SessionState::Pending | SessionState::Running | SessionState::Idle => now, + SessionState::Pending + | SessionState::Running + | SessionState::Idle + | SessionState::Stale => now, SessionState::Completed | SessionState::Failed | SessionState::Stopped => { updated_at } @@ -592,6 +628,20 @@ impl StateStore { Ok(()) } + pub fn touch_heartbeat(&self, session_id: &str) -> Result<()> { + let now = chrono::Utc::now().to_rfc3339(); + let updated = self.conn.execute( + "UPDATE sessions SET last_heartbeat_at = ?1 WHERE id = ?2", + rusqlite::params![now, session_id], + )?; + + if updated == 0 { + anyhow::bail!("Session not found: {session_id}"); + } + + Ok(()) + } + pub fn sync_cost_tracker_metrics(&self, metrics_path: &Path) -> Result<()> { if !metrics_path.exists() { return Ok(()); @@ -786,7 +836,11 @@ impl StateStore { pub fn increment_tool_calls(&self, session_id: &str) -> Result<()> { self.conn.execute( - "UPDATE sessions SET tool_calls = tool_calls + 1, updated_at = ?1 WHERE id = ?2", + "UPDATE sessions + SET tool_calls = tool_calls + 1, + updated_at = ?1, + last_heartbeat_at = ?1 + WHERE id = ?2", rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id], )?; Ok(()) @@ -796,7 +850,7 @@ impl StateStore { let mut stmt = self.conn.prepare( "SELECT id, task, agent_type, working_dir, state, pid, worktree_path, worktree_branch, worktree_base, input_tokens, output_tokens, tokens_used, tool_calls, files_changed, duration_secs, cost_usd, - created_at, updated_at + created_at, updated_at, last_heartbeat_at FROM sessions ORDER BY updated_at DESC", )?; @@ -814,6 +868,7 @@ impl StateStore { let created_str: String = row.get(16)?; let updated_str: String = row.get(17)?; + let heartbeat_str: String = row.get(18)?; Ok(Session { id: row.get(0)?, @@ -829,6 +884,11 @@ impl StateStore { updated_at: chrono::DateTime::parse_from_rfc3339(&updated_str) .unwrap_or_default() .with_timezone(&chrono::Utc), + last_heartbeat_at: chrono::DateTime::parse_from_rfc3339(&heartbeat_str) + .unwrap_or_else(|_| { + chrono::DateTime::parse_from_rfc3339(&updated_str).unwrap_or_default() + }) + .with_timezone(&chrono::Utc), metrics: SessionMetrics { input_tokens: row.get(9)?, output_tokens: row.get(10)?, @@ -1299,7 +1359,10 @@ impl StateStore { )?; self.conn.execute( - "UPDATE sessions SET updated_at = ?1 WHERE id = ?2", + "UPDATE sessions + SET updated_at = ?1, + last_heartbeat_at = ?1 + WHERE id = ?2", rusqlite::params![chrono::Utc::now().to_rfc3339(), session_id], )?; @@ -1460,6 +1523,7 @@ mod tests { worktree: None, created_at: now - ChronoDuration::minutes(1), updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), } } @@ -1520,6 +1584,9 @@ mod tests { assert!(column_names.iter().any(|column| column == "pid")); assert!(column_names.iter().any(|column| column == "input_tokens")); assert!(column_names.iter().any(|column| column == "output_tokens")); + assert!(column_names + .iter() + .any(|column| column == "last_heartbeat_at")); Ok(()) } @@ -1539,6 +1606,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -1583,6 +1651,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -1595,6 +1664,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -1648,6 +1718,7 @@ mod tests { worktree: None, created_at: now - ChronoDuration::seconds(95), updated_at: now - ChronoDuration::seconds(1), + last_heartbeat_at: now - ChronoDuration::seconds(1), metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -1660,6 +1731,7 @@ mod tests { worktree: None, created_at: now - ChronoDuration::seconds(80), updated_at: now - ChronoDuration::seconds(5), + last_heartbeat_at: now - ChronoDuration::seconds(5), metrics: SessionMetrics::default(), })?; @@ -1678,6 +1750,36 @@ mod tests { Ok(()) } + #[test] + fn touch_heartbeat_updates_last_heartbeat_timestamp() -> Result<()> { + let tempdir = TestDir::new("store-touch-heartbeat")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now() - ChronoDuration::seconds(30); + + db.insert_session(&Session { + id: "session-1".to_string(), + task: "heartbeat".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(1234), + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: SessionMetrics::default(), + })?; + + db.touch_heartbeat("session-1")?; + + let session = db + .get_session("session-1")? + .expect("session should still exist"); + assert!(session.last_heartbeat_at > now); + + Ok(()) + } + #[test] fn append_output_line_keeps_latest_buffer_window() -> Result<()> { let tempdir = TestDir::new("store-output")?; @@ -1694,6 +1796,7 @@ mod tests { worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 68f97b1a..63ba8984 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -112,6 +112,7 @@ struct SessionSummary { pending: usize, running: usize, idle: usize, + stale: usize, completed: usize, failed: usize, stopped: usize, @@ -266,6 +267,7 @@ struct TeamSummary { idle: usize, running: usize, pending: usize, + stale: usize, failed: usize, stopped: usize, } @@ -2753,7 +2755,12 @@ impl Dashboard { self.sync_from_store(); } - fn sync_runtime_metrics(&mut self) -> Option { + fn sync_runtime_metrics( + &mut self, + ) -> ( + Option, + Option, + ) { if let Err(error) = self.db.refresh_session_durations() { tracing::warn!("Failed to refresh session durations: {error}"); } @@ -2780,17 +2787,27 @@ impl Dashboard { } } - match manager::enforce_budget_hard_limits(&self.db, &self.cfg) { + let heartbeat_enforcement = match manager::enforce_session_heartbeats(&self.db, &self.cfg) { + Ok(outcome) => Some(outcome), + Err(error) => { + tracing::warn!("Failed to enforce session heartbeats: {error}"); + None + } + }; + + let budget_enforcement = match manager::enforce_budget_hard_limits(&self.db, &self.cfg) { Ok(outcome) => Some(outcome), Err(error) => { tracing::warn!("Failed to enforce budget hard limits: {error}"); None } - } + }; + + (heartbeat_enforcement, budget_enforcement) } fn sync_from_store(&mut self) { - let budget_enforcement = self.sync_runtime_metrics(); + let (heartbeat_enforcement, budget_enforcement) = self.sync_runtime_metrics(); let selected_id = self.selected_session_id().map(ToOwned::to_owned); self.sessions = match self.db.list_sessions() { Ok(sessions) => sessions, @@ -2825,6 +2842,11 @@ impl Dashboard { { self.set_operator_note(budget_auto_pause_note(&outcome)); } + if let Some(outcome) = heartbeat_enforcement.filter(|outcome| { + !outcome.stale_sessions.is_empty() || !outcome.auto_terminated_sessions.is_empty() + }) { + self.set_operator_note(heartbeat_enforcement_note(&outcome)); + } } fn sync_budget_alerts(&mut self) { @@ -3183,6 +3205,7 @@ impl Dashboard { SessionState::Pending => team.pending += 1, SessionState::Failed => team.failed += 1, SessionState::Stopped => team.stopped += 1, + SessionState::Stale => team.stale += 1, SessionState::Completed => {} } @@ -4239,7 +4262,10 @@ impl Dashboard { .filter(|session| { matches!( session.state, - SessionState::Pending | SessionState::Running | SessionState::Idle + SessionState::Pending + | SessionState::Running + | SessionState::Idle + | SessionState::Stale ) }) .count() @@ -4944,6 +4970,7 @@ impl SessionSummary { SessionState::Pending => summary.pending += 1, SessionState::Running => summary.running += 1, SessionState::Idle => summary.idle += 1, + SessionState::Stale => summary.stale += 1, SessionState::Completed => summary.completed += 1, SessionState::Failed => summary.failed += 1, SessionState::Stopped => summary.stopped += 1, @@ -4968,12 +4995,14 @@ fn session_row( approval_requests: usize, unread_messages: usize, ) -> Row<'static> { + let state_label = session_state_label(&session.state); + let state_color = session_state_color(&session.state); Row::new(vec![ Cell::from(format_session_id(&session.id)), Cell::from(session.agent_type.clone()), - Cell::from(session_state_label(&session.state)).style( + Cell::from(state_label).style( Style::default() - .fg(session_state_color(&session.state)) + .fg(state_color) .add_modifier(Modifier::BOLD), ), Cell::from(session_branch(session)), @@ -5016,6 +5045,7 @@ fn summary_line(summary: &SessionSummary) -> Line<'static> { ), summary_span("Running", summary.running, Color::Green), summary_span("Idle", summary.idle, Color::Yellow), + summary_span("Stale", summary.stale, Color::LightRed), summary_span("Completed", summary.completed, Color::Blue), summary_span("Failed", summary.failed, Color::Red), summary_span("Stopped", summary.stopped, Color::DarkGray), @@ -5052,6 +5082,7 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta if summary.failed == 0 && summary.stopped == 0 && summary.pending == 0 + && summary.stale == 0 && summary.unread_messages == 0 && summary.conflicted_worktrees == 0 { @@ -5086,6 +5117,7 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta } spans.extend([ + summary_span("Stale", summary.stale, Color::LightRed), summary_span("Backlog", summary.unread_messages, Color::Magenta), summary_span("Failed", summary.failed, Color::Red), summary_span("Stopped", summary.stopped, Color::DarkGray), @@ -5321,6 +5353,7 @@ fn session_state_label(state: &SessionState) -> &'static str { SessionState::Pending => "Pending", SessionState::Running => "Running", SessionState::Idle => "Idle", + SessionState::Stale => "Stale", SessionState::Completed => "Completed", SessionState::Failed => "Failed", SessionState::Stopped => "Stopped", @@ -5331,6 +5364,7 @@ fn session_state_color(state: &SessionState) -> Color { match state { SessionState::Running => Color::Green, SessionState::Idle => Color::Yellow, + SessionState::Stale => Color::LightRed, SessionState::Failed => Color::Red, SessionState::Stopped => Color::DarkGray, SessionState::Completed => Color::Blue, @@ -5338,6 +5372,20 @@ fn session_state_color(state: &SessionState) -> Color { } } +fn heartbeat_enforcement_note(outcome: &manager::HeartbeatEnforcementOutcome) -> String { + if !outcome.auto_terminated_sessions.is_empty() { + return format!( + "stale heartbeat detected | auto-terminated {} session(s)", + outcome.auto_terminated_sessions.len() + ); + } + + format!( + "stale heartbeat detected | flagged {} session(s) for attention", + outcome.stale_sessions.len() + ) +} + fn budget_auto_pause_note(outcome: &manager::BudgetEnforcementOutcome) -> String { let cause = match (outcome.token_budget_exceeded, outcome.cost_budget_exceeded) { (true, true) => "token and cost budgets exceeded", @@ -5436,6 +5484,7 @@ fn delegate_next_action(delegate: &DelegatedChildSummary) -> &'static str { SessionState::Pending => "wait for startup", SessionState::Running => "let it run", SessionState::Idle => "assign next task", + SessionState::Stale => "inspect stale heartbeat", SessionState::Failed => "inspect failure", SessionState::Stopped => "resume or reassign", SessionState::Completed => "merge or cleanup", @@ -5449,7 +5498,10 @@ fn delegate_attention_priority(delegate: &DelegatedChildSummary) -> u8 { if delegate.approval_backlog > 0 { return 1; } - if matches!(delegate.state, SessionState::Failed | SessionState::Stopped) { + if matches!( + delegate.state, + SessionState::Stale | SessionState::Failed | SessionState::Stopped + ) { return 2; } if delegate.handoff_backlog > 0 { @@ -5463,7 +5515,7 @@ fn delegate_attention_priority(delegate: &DelegatedChildSummary) -> u8 { SessionState::Running => 6, SessionState::Idle => 7, SessionState::Completed => 8, - SessionState::Failed | SessionState::Stopped => unreachable!(), + SessionState::Stale | SessionState::Failed | SessionState::Stopped => unreachable!(), } } @@ -6160,6 +6212,7 @@ diff --git a/src/next.rs b/src/next.rs idle: 1, running: 1, pending: 1, + stale: 0, failed: 0, stopped: 0, }); @@ -7285,6 +7338,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), }) .unwrap(); @@ -7308,6 +7362,39 @@ diff --git a/src/next.rs b/src/next.rs let _ = fs::remove_dir_all(tempdir); } + #[test] + fn refresh_flags_stale_sessions_and_sets_operator_note() { + let db = StateStore::open(Path::new(":memory:")).unwrap(); + let mut cfg = Config::default(); + cfg.session_timeout_secs = 60; + let now = Utc::now(); + + db.insert_session(&Session { + id: "stale-1".to_string(), + task: "stale session".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: SessionState::Running, + pid: Some(4242), + worktree: None, + created_at: now - Duration::minutes(5), + updated_at: now - Duration::minutes(5), + last_heartbeat_at: now - Duration::minutes(5), + metrics: SessionMetrics::default(), + }) + .unwrap(); + + let mut dashboard = Dashboard::new(db, cfg); + dashboard.refresh(); + + assert_eq!(dashboard.sessions.len(), 1); + assert_eq!(dashboard.sessions[0].state, SessionState::Stale); + assert_eq!( + dashboard.operator_note.as_deref(), + Some("stale heartbeat detected | flagged 1 session(s) for attention") + ); + } + #[test] fn new_session_task_uses_selected_session_context() { let dashboard = test_dashboard( @@ -7445,6 +7532,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -7458,6 +7546,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now + chrono::Duration::seconds(1), + last_heartbeat_at: now + chrono::Duration::seconds(1), metrics: SessionMetrics::default(), })?; @@ -7487,6 +7576,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -7529,6 +7619,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8163,6 +8254,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8200,6 +8292,7 @@ diff --git a/src/next.rs b/src/next.rs }), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8239,6 +8332,7 @@ diff --git a/src/next.rs b/src/next.rs }), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8275,6 +8369,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8315,6 +8410,7 @@ diff --git a/src/next.rs b/src/next.rs }), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; db.insert_session(&Session { @@ -8331,6 +8427,7 @@ diff --git a/src/next.rs b/src/next.rs }), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8380,6 +8477,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: Some(worktree.clone()), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8461,6 +8559,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: Some(merged_worktree.clone()), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8476,6 +8575,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: Some(active_worktree.clone()), created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8519,6 +8619,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8551,6 +8652,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8583,6 +8685,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8615,6 +8718,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -8647,6 +8751,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics::default(), })?; @@ -9243,6 +9348,7 @@ diff --git a/src/next.rs b/src/next.rs max_parallel_worktrees: 4, session_timeout_secs: 60, heartbeat_interval_secs: 5, + auto_terminate_stale_sessions: false, default_agent: "claude".to_string(), auto_dispatch_unread_handoffs: false, auto_dispatch_limit_per_session: 5, @@ -9307,6 +9413,7 @@ diff --git a/src/next.rs b/src/next.rs }), created_at: Utc::now(), updated_at: Utc::now(), + last_heartbeat_at: Utc::now(), metrics: SessionMetrics { input_tokens: tokens_used.saturating_mul(3) / 4, output_tokens: tokens_used / 4, @@ -9331,6 +9438,7 @@ diff --git a/src/next.rs b/src/next.rs worktree: None, created_at: now, updated_at: now, + last_heartbeat_at: now, metrics: SessionMetrics { input_tokens: tokens_used.saturating_mul(3) / 4, output_tokens: tokens_used / 4,