From 1c27f7b29ab5bad711499e92dff4089a648a470f Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Thu, 9 Apr 2026 04:42:13 -0700 Subject: [PATCH] feat: add ecc2 approval queue sidebar --- ecc2/src/session/store.rs | 92 ++++++++++++++++++ ecc2/src/tui/dashboard.rs | 199 +++++++++++++++++++++++++++++++++++--- 2 files changed, 279 insertions(+), 12 deletions(-) diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 128c7c4e..22362723 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -641,6 +641,51 @@ impl StateStore { Ok(counts) } + pub fn unread_approval_counts(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT to_session, COUNT(*) + FROM messages + WHERE read = 0 AND msg_type IN ('query', 'conflict') + GROUP BY to_session", + )?; + + let counts = stmt + .query_map([], |row| { + Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize)) + })? + .collect::, _>>()?; + + Ok(counts) + } + + pub fn unread_approval_queue(&self, limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, from_session, to_session, content, msg_type, read, timestamp + FROM messages + WHERE read = 0 AND msg_type IN ('query', 'conflict') + ORDER BY id ASC + LIMIT ?1", + )?; + + let messages = stmt.query_map(rusqlite::params![limit as i64], |row| { + let timestamp: String = row.get(6)?; + + Ok(SessionMessage { + id: row.get(0)?, + from_session: row.get(1)?, + to_session: row.get(2)?, + content: row.get(3)?, + msg_type: row.get(4)?, + read: row.get::<_, i64>(5)? != 0, + timestamp: chrono::DateTime::parse_from_rfc3339(×tamp) + .unwrap_or_default() + .with_timezone(&chrono::Utc), + }) + })?; + + messages.collect::, _>>().map_err(Into::into) + } + pub fn unread_task_handoffs_for_session( &self, session_id: &str, @@ -1274,6 +1319,53 @@ mod tests { Ok(()) } + #[test] + fn approval_queue_counts_only_queries_and_conflicts() -> Result<()> { + let tempdir = TestDir::new("store-approval-queue")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + + db.insert_session(&build_session("planner", SessionState::Running))?; + db.insert_session(&build_session("worker", SessionState::Pending))?; + db.insert_session(&build_session("worker-2", SessionState::Pending))?; + + db.send_message( + "planner", + "worker", + "{\"question\":\"Need operator approval\"}", + "query", + )?; + db.send_message( + "planner", + "worker", + "{\"file\":\"src/main.rs\",\"description\":\"Merge conflict\"}", + "conflict", + )?; + db.send_message( + "worker", + "planner", + "{\"summary\":\"Finished pass\",\"files_changed\":[]}", + "completed", + )?; + db.send_message( + "planner", + "worker-2", + "{\"task\":\"Review auth flow\",\"context\":\"Delegated from planner\"}", + "task_handoff", + )?; + + let counts = db.unread_approval_counts()?; + assert_eq!(counts.get("worker"), Some(&2)); + assert_eq!(counts.get("planner"), None); + assert_eq!(counts.get("worker-2"), None); + + let queue = db.unread_approval_queue(10)?; + assert_eq!(queue.len(), 2); + assert_eq!(queue[0].msg_type, "query"); + assert_eq!(queue[1].msg_type, "conflict"); + + Ok(()) + } + #[test] fn daemon_activity_round_trips_latest_passes() -> Result<()> { let tempdir = TestDir::new("store-daemon-activity")?; diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index 171e87d1..85d51bb0 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -55,6 +55,8 @@ pub struct Dashboard { sessions: Vec, session_output_cache: HashMap>, unread_message_counts: HashMap, + approval_queue_counts: HashMap, + approval_queue_preview: Vec, handoff_backlog_counts: HashMap, worktree_health_by_session: HashMap, global_handoff_backlog_leads: usize, @@ -229,6 +231,8 @@ impl Dashboard { sessions, session_output_cache: HashMap::new(), unread_message_counts: HashMap::new(), + approval_queue_counts: HashMap::new(), + approval_queue_preview: Vec::new(), handoff_backlog_counts: HashMap::new(), worktree_health_by_session: HashMap::new(), global_handoff_backlog_leads: 0, @@ -358,22 +362,31 @@ impl Dashboard { &self.worktree_health_by_session, stabilized, ); + let mut overview_lines = vec![ + summary_line(&summary), + attention_queue_line(&summary, stabilized), + approval_queue_line(&self.approval_queue_counts), + ]; + if let Some(preview) = approval_queue_preview_line(&self.approval_queue_preview) { + overview_lines.push(preview); + } let chunks = Layout::default() .direction(Direction::Vertical) - .constraints([Constraint::Length(2), Constraint::Min(3)]) + .constraints([ + Constraint::Length(overview_lines.len() as u16), + Constraint::Min(3), + ]) .split(inner_area); - frame.render_widget( - Paragraph::new(vec![ - summary_line(&summary), - attention_queue_line(&summary, stabilized), - ]), - chunks[0], - ); + frame.render_widget(Paragraph::new(overview_lines), chunks[0]); let rows = self.sessions.iter().map(|session| { session_row( session, + self.approval_queue_counts + .get(&session.id) + .copied() + .unwrap_or(0), self.handoff_backlog_counts .get(&session.id) .copied() @@ -381,7 +394,14 @@ impl Dashboard { ) }); let header = Row::new([ - "ID", "Agent", "State", "Branch", "Backlog", "Tokens", "Duration", + "ID", + "Agent", + "State", + "Branch", + "Approvals", + "Backlog", + "Tokens", + "Duration", ]) .style(Style::default().add_modifier(Modifier::BOLD)); let widths = [ @@ -389,6 +409,7 @@ impl Dashboard { Constraint::Length(10), Constraint::Length(10), Constraint::Min(12), + Constraint::Length(10), Constraint::Length(7), Constraint::Length(8), Constraint::Length(8), @@ -2216,6 +2237,23 @@ impl Dashboard { } } + fn sync_approval_queue(&mut self) { + self.approval_queue_counts = match self.db.unread_approval_counts() { + Ok(counts) => counts, + Err(error) => { + tracing::warn!("Failed to refresh approval queue counts: {error}"); + HashMap::new() + } + }; + self.approval_queue_preview = match self.db.unread_approval_queue(3) { + Ok(messages) => messages, + Err(error) => { + tracing::warn!("Failed to refresh approval queue preview: {error}"); + Vec::new() + } + }; + } + fn sync_handoff_backlog_counts(&mut self) { let limit = self.sessions.len().max(1); self.handoff_backlog_counts.clear(); @@ -2308,6 +2346,7 @@ impl Dashboard { fn sync_selected_messages(&mut self) { let Some(session_id) = self.selected_session_id().map(ToOwned::to_owned) else { self.selected_messages.clear(); + self.sync_approval_queue(); return; }; @@ -2337,6 +2376,8 @@ impl Dashboard { Vec::new() } }; + + self.sync_approval_queue(); } fn sync_selected_lineage(&mut self) { @@ -3620,7 +3661,11 @@ impl SessionSummary { } } -fn session_row(session: &Session, unread_messages: usize) -> Row<'static> { +fn session_row( + session: &Session, + approval_requests: usize, + unread_messages: usize, +) -> Row<'static> { Row::new(vec![ Cell::from(format_session_id(&session.id)), Cell::from(session.agent_type.clone()), @@ -3630,6 +3675,18 @@ fn session_row(session: &Session, unread_messages: usize) -> Row<'static> { .add_modifier(Modifier::BOLD), ), Cell::from(session_branch(session)), + Cell::from(if approval_requests == 0 { + "-".to_string() + } else { + approval_requests.to_string() + }) + .style(if approval_requests == 0 { + Style::default() + } else { + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD) + }), Cell::from(if unread_messages == 0 { "-".to_string() } else { @@ -3734,6 +3791,49 @@ fn attention_queue_line(summary: &SessionSummary, stabilized: bool) -> Line<'sta Line::from(spans) } +fn approval_queue_line(approval_queue_counts: &HashMap) -> Line<'static> { + let pending_sessions = approval_queue_counts.len(); + let pending_items: usize = approval_queue_counts.values().sum(); + + if pending_items == 0 { + return Line::from(vec![ + Span::styled( + "Approval queue clear", + Style::default() + .fg(Color::Green) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" no unanswered queries or conflicts"), + ]); + } + + Line::from(vec![ + Span::styled( + "Approval queue ", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + ), + summary_span("Pending", pending_items, Color::Yellow), + summary_span("Sessions", pending_sessions, Color::Yellow), + ]) +} + +fn approval_queue_preview_line(messages: &[SessionMessage]) -> Option> { + let message = messages.first()?; + let preview = truncate_for_dashboard(&comms::preview(&message.msg_type, &message.content), 72); + + Some(Line::from(vec![ + Span::raw("- "), + Span::styled( + format_session_id(&message.to_session), + Style::default().add_modifier(Modifier::BOLD), + ), + Span::raw(" | "), + Span::raw(preview), + ])) +} + fn truncate_for_dashboard(value: &str, max_chars: usize) -> String { let trimmed = value.trim(); if trimmed.chars().count() <= max_chars { @@ -3968,7 +4068,7 @@ mod tests { #[test] fn render_sessions_shows_summary_headers_and_selected_row() { - let dashboard = test_dashboard( + let mut dashboard = test_dashboard( vec![ sample_session( "run-12345678", @@ -3989,6 +4089,16 @@ mod tests { ], 1, ); + dashboard.approval_queue_counts = HashMap::from([(String::from("run-12345678"), 2usize)]); + dashboard.approval_queue_preview = vec![SessionMessage { + id: 1, + from_session: "lead-12345678".to_string(), + to_session: "run-12345678".to_string(), + content: "{\"question\":\"Need approval to continue\"}".to_string(), + msg_type: "query".to_string(), + read: false, + timestamp: Utc::now(), + }]; let rendered = render_dashboard_text(dashboard, 180, 24); assert!(rendered.contains("ID")); @@ -3996,10 +4106,73 @@ mod tests { assert!(rendered.contains("Total 2")); assert!(rendered.contains("Running 1")); assert!(rendered.contains("Completed 1")); - assert!(rendered.contains("Attention queue clear")); + assert!(rendered.contains("Approval queue")); assert!(rendered.contains("done-876")); } + #[test] + fn approval_queue_preview_line_uses_target_session_and_preview() { + let line = approval_queue_preview_line(&[SessionMessage { + id: 1, + from_session: "lead-12345678".to_string(), + to_session: "run-12345678".to_string(), + content: "{\"question\":\"Need approval to continue\"}".to_string(), + msg_type: "query".to_string(), + read: false, + timestamp: Utc::now(), + }]) + .expect("approval preview line"); + + let rendered = line + .spans + .iter() + .map(|span| span.content.as_ref()) + .collect::(); + assert!(rendered.contains("run-123")); + assert!(rendered.contains("query")); + } + + #[test] + fn sync_selected_messages_refreshes_approval_queue_after_marking_read() { + let sessions = vec![ + sample_session( + "lead-12345678", + "planner", + SessionState::Running, + Some("ecc/lead"), + 512, + 42, + ), + sample_session( + "worker-123456", + "reviewer", + SessionState::Idle, + Some("ecc/worker"), + 64, + 5, + ), + ]; + let mut dashboard = test_dashboard(sessions, 1); + for session in &dashboard.sessions { + dashboard.db.insert_session(session).unwrap(); + } + dashboard + .db + .send_message( + "lead-12345678", + "worker-123456", + "{\"question\":\"Need operator input\"}", + "query", + ) + .unwrap(); + dashboard.unread_message_counts = dashboard.db.unread_message_counts().unwrap(); + + dashboard.sync_selected_messages(); + + assert_eq!(dashboard.approval_queue_counts.get("worker-123456"), None); + assert!(dashboard.approval_queue_preview.is_empty()); + } + #[test] fn selected_session_metrics_text_includes_worktree_output_and_attention_queue() { let mut dashboard = test_dashboard( @@ -6254,6 +6427,8 @@ diff --git a/src/next.rs b/src/next.rs sessions, session_output_cache: HashMap::new(), unread_message_counts: HashMap::new(), + approval_queue_counts: HashMap::new(), + approval_queue_preview: Vec::new(), handoff_backlog_counts: HashMap::new(), worktree_health_by_session: HashMap::new(), global_handoff_backlog_leads: 0,