From 6f08e78456227c7f223e5ed9b5defa495bf48c9a Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Thu, 9 Apr 2026 06:47:28 -0700 Subject: [PATCH] feat: auto-pause ecc2 sessions when budgets are exceeded --- ecc2/src/config/mod.rs | 3 +- ecc2/src/main.rs | 1 + ecc2/src/session/manager.rs | 186 ++++++++++++++++++++++++++++++++++-- ecc2/src/session/store.rs | 16 +++- ecc2/src/tui/app.rs | 4 +- ecc2/src/tui/dashboard.rs | 163 +++++++++++++++++++++---------- 6 files changed, 310 insertions(+), 63 deletions(-) diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 694e6a6d..58449d0a 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -227,7 +227,8 @@ impl PaneNavigationConfig { } fn shortcut_matches(spec: &str, key: KeyEvent) -> bool { - parse_shortcut(spec).is_some_and(|(modifiers, code)| key.modifiers == modifiers && key.code == code) + parse_shortcut(spec) + .is_some_and(|(modifiers, code)| key.modifiers == modifiers && key.code == code) } fn parse_shortcut(spec: &str) -> Option<(KeyModifiers, KeyCode)> { diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index c52b0096..c2398a56 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -899,6 +899,7 @@ fn sync_runtime_session_metrics( ) -> Result<()> { db.refresh_session_durations()?; db.sync_cost_tracker_metrics(&cfg.cost_metrics_path())?; + let _ = session::manager::enforce_budget_hard_limits(db, cfg)?; Ok(()) } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 1562d796..d64f5398 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -353,6 +353,56 @@ pub async fn stop_session(db: &StateStore, id: &str) -> Result<()> { stop_session_with_options(db, id, true).await } +#[derive(Debug, Clone, Default, Serialize, PartialEq)] +pub struct BudgetEnforcementOutcome { + pub token_budget_exceeded: bool, + pub cost_budget_exceeded: bool, + pub paused_sessions: Vec, +} + +impl BudgetEnforcementOutcome { + pub fn hard_limit_exceeded(&self) -> bool { + self.token_budget_exceeded || self.cost_budget_exceeded + } +} + +pub fn enforce_budget_hard_limits( + db: &StateStore, + cfg: &Config, +) -> Result { + let sessions = db.list_sessions()?; + let total_tokens = sessions + .iter() + .map(|session| session.metrics.tokens_used) + .sum::(); + let total_cost = sessions + .iter() + .map(|session| session.metrics.cost_usd) + .sum::(); + + let mut outcome = BudgetEnforcementOutcome { + token_budget_exceeded: cfg.token_budget > 0 && total_tokens >= cfg.token_budget, + cost_budget_exceeded: cfg.cost_budget_usd > 0.0 && total_cost >= cfg.cost_budget_usd, + paused_sessions: Vec::new(), + }; + + if !outcome.hard_limit_exceeded() { + return Ok(outcome); + } + + for session in sessions.into_iter().filter(|session| { + matches!( + session.state, + SessionState::Pending | SessionState::Running | SessionState::Idle + ) + }) { + stop_session_recorded(db, &session, false)?; + outcome.paused_sessions.push(session.id); + } + + Ok(outcome) +} + pub fn record_tool_call( db: &StateStore, session_id: &str, @@ -1175,9 +1225,12 @@ async fn stop_session_with_options( cleanup_worktree: bool, ) -> Result<()> { let session = resolve_session(db, id)?; + stop_session_recorded(db, &session, cleanup_worktree) +} +fn stop_session_recorded(db: &StateStore, session: &Session, cleanup_worktree: bool) -> Result<()> { if let Some(pid) = session.pid { - kill_process(pid).await?; + kill_process(pid)?; } db.update_pid(&session.id, None)?; @@ -1193,13 +1246,27 @@ async fn stop_session_with_options( } #[cfg(unix)] -async fn kill_process(pid: u32) -> Result<()> { +fn kill_process(pid: u32) -> Result<()> { send_signal(pid, libc::SIGTERM)?; - tokio::time::sleep(std::time::Duration::from_millis(1200)).await; + std::thread::sleep(std::time::Duration::from_millis(1200)); send_signal(pid, libc::SIGKILL)?; Ok(()) } +#[cfg(windows)] +fn kill_process(pid: u32) -> Result<()> { + let status = std::process::Command::new("taskkill") + .args(["/PID", &pid.to_string(), "/T", "/F"]) + .status() + .with_context(|| format!("Failed to invoke taskkill for process {pid}"))?; + + if status.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("taskkill exited with status {status}")) + } +} + #[cfg(unix)] fn send_signal(pid: u32, signal: i32) -> Result<()> { let outcome = unsafe { libc::kill(pid as i32, signal) }; @@ -1416,9 +1483,7 @@ impl fmt::Display for SessionStatus { writeln!( f, "Tokens: {} total (in {} / out {})", - s.metrics.tokens_used, - s.metrics.input_tokens, - s.metrics.output_tokens + s.metrics.tokens_used, s.metrics.input_tokens, s.metrics.output_tokens )?; writeln!(f, "Tools: {}", s.metrics.tool_calls)?; writeln!(f, "Files: {}", s.metrics.files_changed)?; @@ -1885,6 +1950,115 @@ mod tests { Ok(()) } + #[test] + fn enforce_budget_hard_limits_stops_active_sessions_without_cleaning_worktrees() -> Result<()> { + let tempdir = TestDir::new("manager-budget-pause")?; + let mut cfg = build_config(tempdir.path()); + cfg.token_budget = 100; + cfg.cost_budget_usd = 0.0; + + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + let worktree_path = tempdir.path().join("keep-worktree"); + fs::create_dir_all(&worktree_path)?; + + db.insert_session(&Session { + id: "active-over-budget".to_string(), + task: "pause on hard limit".to_string(), + agent_type: "claude".to_string(), + working_dir: tempdir.path().to_path_buf(), + state: SessionState::Running, + pid: Some(999_999), + worktree: Some(crate::session::WorktreeInfo { + path: worktree_path.clone(), + branch: "ecc/active-over-budget".to_string(), + base_branch: "main".to_string(), + }), + created_at: now - Duration::minutes(1), + updated_at: now, + metrics: SessionMetrics::default(), + })?; + db.update_metrics( + "active-over-budget", + &SessionMetrics { + input_tokens: 90, + output_tokens: 30, + tokens_used: 120, + tool_calls: 0, + files_changed: 0, + duration_secs: 60, + cost_usd: 0.0, + }, + )?; + + let outcome = enforce_budget_hard_limits(&db, &cfg)?; + assert!(outcome.token_budget_exceeded); + assert!(!outcome.cost_budget_exceeded); + assert_eq!( + outcome.paused_sessions, + vec!["active-over-budget".to_string()] + ); + + let session = db + .get_session("active-over-budget")? + .context("session should still exist")?; + assert_eq!(session.state, SessionState::Stopped); + assert_eq!(session.pid, None); + assert!( + worktree_path.exists(), + "hard-limit pauses should preserve worktrees for resume" + ); + + Ok(()) + } + + #[test] + fn enforce_budget_hard_limits_ignores_inactive_sessions() -> Result<()> { + let tempdir = TestDir::new("manager-budget-ignore-inactive")?; + let mut cfg = build_config(tempdir.path()); + cfg.token_budget = 100; + cfg.cost_budget_usd = 0.0; + + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "completed-over-budget".to_string(), + task: "already done".to_string(), + agent_type: "claude".to_string(), + working_dir: tempdir.path().to_path_buf(), + state: SessionState::Completed, + pid: None, + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(1), + metrics: SessionMetrics::default(), + })?; + db.update_metrics( + "completed-over-budget", + &SessionMetrics { + input_tokens: 90, + output_tokens: 30, + tokens_used: 120, + tool_calls: 0, + files_changed: 0, + duration_secs: 60, + cost_usd: 0.0, + }, + )?; + + let outcome = enforce_budget_hard_limits(&db, &cfg)?; + assert!(outcome.token_budget_exceeded); + assert!(outcome.paused_sessions.is_empty()); + + let session = db + .get_session("completed-over-budget")? + .context("completed session should still exist")?; + assert_eq!(session.state, SessionState::Completed); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn resume_session_requeues_failed_session() -> Result<()> { let tempdir = TestDir::new("manager-resume-session")?; diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index d3f9da9c..f25c8f4b 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -545,7 +545,9 @@ impl StateStore { .with_timezone(&chrono::Utc); let effective_end = match state { SessionState::Pending | SessionState::Running | SessionState::Idle => now, - SessionState::Completed | SessionState::Failed | SessionState::Stopped => updated_at, + SessionState::Completed | SessionState::Failed | SessionState::Stopped => { + updated_at + } }; let duration_secs = effective_end .signed_duration_since(created_at) @@ -622,7 +624,9 @@ impl StateStore { rusqlite::params![ aggregate.input_tokens, aggregate.output_tokens, - aggregate.input_tokens.saturating_add(aggregate.output_tokens), + aggregate + .input_tokens + .saturating_add(aggregate.output_tokens), aggregate.cost_usd, session_id, ], @@ -1448,8 +1452,12 @@ mod tests { db.refresh_session_durations()?; - let running = db.get_session("running-1")?.expect("running session should exist"); - let completed = db.get_session("done-1")?.expect("completed session should exist"); + let running = db + .get_session("running-1")? + .expect("running session should exist"); + let completed = db + .get_session("done-1")? + .expect("completed session should exist"); assert!(running.metrics.duration_secs >= 95); assert!(completed.metrics.duration_secs >= 75); diff --git a/ecc2/src/tui/app.rs b/ecc2/src/tui/app.rs index 91248342..b1d936ee 100644 --- a/ecc2/src/tui/app.rs +++ b/ecc2/src/tui/app.rs @@ -53,7 +53,9 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { match (key.modifiers, key.code) { (KeyModifiers::CONTROL, KeyCode::Char('c')) => break, - (KeyModifiers::CONTROL, KeyCode::Char('w')) => dashboard.begin_pane_command_mode(), + (KeyModifiers::CONTROL, KeyCode::Char('w')) => { + dashboard.begin_pane_command_mode() + } (_, KeyCode::Char('q')) => break, _ if dashboard.handle_pane_navigation_key(key) => {} (_, KeyCode::Tab) => dashboard.next_pane(), diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 5f92bf5b..c3186fa6 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -2746,27 +2746,33 @@ impl Dashboard { self.sync_from_store(); } - fn sync_runtime_metrics(&mut self) { + fn sync_runtime_metrics(&mut self) -> Option { if let Err(error) = self.db.refresh_session_durations() { tracing::warn!("Failed to refresh session durations: {error}"); } let metrics_path = self.cfg.cost_metrics_path(); let signature = cost_metrics_signature(&metrics_path); - if signature == self.last_cost_metrics_signature { - return; + if signature != self.last_cost_metrics_signature { + self.last_cost_metrics_signature = signature; + if signature.is_some() { + if let Err(error) = self.db.sync_cost_tracker_metrics(&metrics_path) { + tracing::warn!("Failed to sync cost tracker metrics: {error}"); + } + } } - self.last_cost_metrics_signature = signature; - if signature.is_some() { - if let Err(error) = self.db.sync_cost_tracker_metrics(&metrics_path) { - tracing::warn!("Failed to sync cost tracker metrics: {error}"); + match manager::enforce_budget_hard_limits(&self.db, &self.cfg) { + Ok(outcome) => Some(outcome), + Err(error) => { + tracing::warn!("Failed to enforce budget hard limits: {error}"); + None } } } fn sync_from_store(&mut self) { - self.sync_runtime_metrics(); + let budget_enforcement = self.sync_runtime_metrics(); let selected_id = self.selected_session_id().map(ToOwned::to_owned); self.sessions = match self.db.list_sessions() { Ok(sessions) => sessions, @@ -2794,6 +2800,56 @@ impl Dashboard { self.sync_selected_messages(); self.sync_selected_lineage(); self.refresh_logs(); + self.sync_budget_alerts(); + + if let Some(outcome) = + budget_enforcement.filter(|outcome| !outcome.paused_sessions.is_empty()) + { + self.set_operator_note(budget_auto_pause_note(&outcome)); + } + } + + fn sync_budget_alerts(&mut self) { + let aggregate = self.aggregate_usage(); + let thresholds = self.cfg.effective_budget_alert_thresholds(); + let current_state = aggregate.overall_state; + if current_state == self.last_budget_alert_state { + return; + } + + let previous_state = self.last_budget_alert_state; + self.last_budget_alert_state = current_state; + + if current_state <= previous_state { + return; + } + + let Some(summary_suffix) = current_state.summary_suffix(thresholds) else { + return; + }; + + let token_budget = if self.cfg.token_budget > 0 { + format!( + "{} / {}", + format_token_count(aggregate.total_tokens), + format_token_count(self.cfg.token_budget) + ) + } else { + format!("{} / no budget", format_token_count(aggregate.total_tokens)) + }; + let cost_budget = if self.cfg.cost_budget_usd > 0.0 { + format!( + "{} / {}", + format_currency(aggregate.total_cost_usd), + format_currency(self.cfg.cost_budget_usd) + ) + } else { + format!("{} / no budget", format_currency(aggregate.total_cost_usd)) + }; + + self.set_operator_note(format!( + "{summary_suffix} | tokens {token_budget} | cost {cost_budget}" + )); } fn sync_selection(&mut self) { @@ -4102,49 +4158,6 @@ impl Dashboard { (text, aggregate.overall_state.style()) } - fn sync_budget_alerts(&mut self) { - let aggregate = self.aggregate_usage(); - let thresholds = self.cfg.effective_budget_alert_thresholds(); - let current_state = aggregate.overall_state; - if current_state == self.last_budget_alert_state { - return; - } - - let previous_state = self.last_budget_alert_state; - self.last_budget_alert_state = current_state; - - if current_state <= previous_state { - return; - } - - let Some(summary_suffix) = current_state.summary_suffix(thresholds) else { - return; - }; - - let token_budget = if self.cfg.token_budget > 0 { - format!( - "{} / {}", - format_token_count(aggregate.total_tokens), - format_token_count(self.cfg.token_budget) - ) - } else { - format!("{} / no budget", format_token_count(aggregate.total_tokens)) - }; - let cost_budget = if self.cfg.cost_budget_usd > 0.0 { - format!( - "{} / {}", - format_currency(aggregate.total_cost_usd), - format_currency(self.cfg.cost_budget_usd) - ) - } else { - format!("{} / no budget", format_currency(aggregate.total_cost_usd)) - }; - - self.set_operator_note(format!( - "{summary_suffix} | tokens {token_budget} | cost {cost_budget}" - )); - } - fn attention_queue_items(&self, limit: usize) -> Vec { let mut items = Vec::new(); let suppress_inbox_attention = self @@ -5307,6 +5320,20 @@ fn session_state_color(state: &SessionState) -> Color { } } +fn budget_auto_pause_note(outcome: &manager::BudgetEnforcementOutcome) -> String { + let cause = match (outcome.token_budget_exceeded, outcome.cost_budget_exceeded) { + (true, true) => "token and cost budgets exceeded", + (true, false) => "token budget exceeded", + (false, true) => "cost budget exceeded", + (false, false) => "budget exceeded", + }; + + format!( + "{cause} | auto-paused {} active session(s)", + outcome.paused_sessions.len() + ) +} + fn format_session_id(id: &str) -> String { id.chars().take(8).collect() } @@ -7188,6 +7215,40 @@ diff --git a/src/next.rs b/src/next.rs assert_eq!(dashboard.last_budget_alert_state, BudgetState::Alert75); } + #[test] + fn refresh_auto_pauses_over_budget_sessions_and_sets_operator_note() { + let db = StateStore::open(Path::new(":memory:")).unwrap(); + let mut cfg = Config::default(); + cfg.token_budget = 100; + cfg.cost_budget_usd = 0.0; + + db.insert_session(&budget_session("sess-1", 120, 0.0)) + .expect("insert session"); + db.update_metrics( + "sess-1", + &SessionMetrics { + input_tokens: 90, + output_tokens: 30, + tokens_used: 120, + tool_calls: 0, + files_changed: 0, + duration_secs: 0, + cost_usd: 0.0, + }, + ) + .expect("persist metrics"); + + let mut dashboard = Dashboard::new(db, cfg); + dashboard.refresh(); + + assert_eq!(dashboard.sessions.len(), 1); + assert_eq!(dashboard.sessions[0].state, SessionState::Stopped); + assert_eq!( + dashboard.operator_note.as_deref(), + Some("token budget exceeded | auto-paused 1 active session(s)") + ); + } + #[test] fn new_session_task_uses_selected_session_context() { let dashboard = test_dashboard(