diff --git a/ecc2/src/comms/mod.rs b/ecc2/src/comms/mod.rs index 24dffa11..376dfd57 100644 --- a/ecc2/src/comms/mod.rs +++ b/ecc2/src/comms/mod.rs @@ -3,11 +3,26 @@ use serde::{Deserialize, Serialize}; use crate::session::store::StateStore; +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +#[serde(rename_all = "snake_case")] +pub enum TaskPriority { + Low, + #[default] + Normal, + High, + Critical, +} + /// Message types for inter-agent communication. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum MessageType { /// Task handoff from one agent to another - TaskHandoff { task: String, context: String }, + TaskHandoff { + task: String, + context: String, + #[serde(default)] + priority: TaskPriority, + }, /// Agent requesting information from another Query { question: String }, /// Response to a query @@ -46,7 +61,16 @@ pub fn parse(content: &str) -> Option { pub fn preview(msg_type: &str, content: &str) -> String { match parse(content) { Some(MessageType::TaskHandoff { task, .. }) => { - format!("handoff {}", truncate(&task, 56)) + let priority = handoff_priority(content); + if priority == TaskPriority::Normal { + format!("handoff {}", truncate(&task, 56)) + } else { + format!( + "handoff [{}] {}", + priority_label(priority), + truncate(&task, 48) + ) + } } Some(MessageType::Query { question }) => { format!("query {}", truncate(&question, 56)) @@ -75,6 +99,39 @@ pub fn preview(msg_type: &str, content: &str) -> String { } } +pub fn handoff_priority(content: &str) -> TaskPriority { + match parse(content) { + Some(MessageType::TaskHandoff { priority, .. }) => priority, + _ => extract_legacy_handoff_priority(content), + } +} + +fn extract_legacy_handoff_priority(content: &str) -> TaskPriority { + let value: serde_json::Value = match serde_json::from_str(content) { + Ok(value) => value, + Err(_) => return TaskPriority::Normal, + }; + match value + .get("priority") + .and_then(|priority| priority.as_str()) + .unwrap_or("normal") + { + "low" => TaskPriority::Low, + "high" => TaskPriority::High, + "critical" => TaskPriority::Critical, + _ => TaskPriority::Normal, + } +} + +fn priority_label(priority: TaskPriority) -> &'static str { + match priority { + TaskPriority::Low => "low", + TaskPriority::Normal => "normal", + TaskPriority::High => "high", + TaskPriority::Critical => "critical", + } +} + fn truncate(value: &str, max_chars: usize) -> String { let trimmed = value.trim(); if trimmed.chars().count() <= max_chars { diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index b5193bd3..f1eed4aa 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -374,6 +374,8 @@ enum MessageCommands { text: String, #[arg(long)] context: Option, + #[arg(long, value_enum, default_value_t = TaskPriorityArg::Normal)] + priority: TaskPriorityArg, #[arg(long)] file: Vec, }, @@ -599,6 +601,25 @@ enum MessageKindArg { Conflict, } +#[derive(clap::ValueEnum, Clone, Copy, Debug, PartialEq, Eq)] +enum TaskPriorityArg { + Low, + Normal, + High, + Critical, +} + +impl From for comms::TaskPriority { + fn from(value: TaskPriorityArg) -> Self { + match value { + TaskPriorityArg::Low => Self::Low, + TaskPriorityArg::Normal => Self::Normal, + TaskPriorityArg::High => Self::High, + TaskPriorityArg::Critical => Self::Critical, + } + } +} + #[derive(clap::ValueEnum, Clone, Debug)] enum ObservationPriorityArg { Low, @@ -1665,11 +1686,12 @@ async fn main() -> Result<()> { kind, text, context, + priority, file, } => { let from = resolve_session_id(&db, &from)?; let to = resolve_session_id(&db, &to)?; - let message = build_message(kind, text, context, file)?; + let message = build_message(kind, text, context, priority, file)?; comms::send(&db, &from, &to, &message)?; println!( "Message sent: {} -> {}", @@ -2701,12 +2723,14 @@ fn build_message( kind: MessageKindArg, text: String, context: Option, + priority: TaskPriorityArg, files: Vec, ) -> Result { Ok(match kind { MessageKindArg::Handoff => comms::MessageType::TaskHandoff { task: text, context: context.unwrap_or_default(), + priority: priority.into(), }, MessageKindArg::Query => comms::MessageType::Query { question: text }, MessageKindArg::Response => comms::MessageType::Response { answer: text }, @@ -4168,6 +4192,7 @@ fn send_handoff_message(db: &session::store::StateStore, from_id: &str, to_id: & &comms::MessageType::TaskHandoff { task: from_session.task, context, + priority: comms::TaskPriority::Normal, }, ) } @@ -4345,6 +4370,7 @@ mod tests { to, kind, text, + priority, .. }, }) => { @@ -4352,6 +4378,7 @@ mod tests { assert_eq!(to, "worker"); assert!(matches!(kind, MessageKindArg::Query)); assert_eq!(text, "Need context"); + assert_eq!(priority, TaskPriorityArg::Normal); } _ => panic!("expected messages send subcommand"), } diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index e36e2318..f651a08a 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -2484,6 +2484,7 @@ fn send_task_handoff( &crate::comms::MessageType::TaskHandoff { task: task.to_string(), context, + priority: crate::comms::TaskPriority::Normal, }, ) } @@ -5843,6 +5844,62 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn drain_inbox_routes_high_priority_handoff_first() -> Result<()> { + let tempdir = TestDir::new("manager-drain-inbox-priority")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + + db.send_message( + "planner", + "lead", + "{\"task\":\"Document cleanup\",\"context\":\"Inbound request\",\"priority\":\"low\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "lead", + "{\"task\":\"Critical auth outage\",\"context\":\"Inbound request\",\"priority\":\"critical\"}", + "task_handoff", + )?; + + let outcomes = drain_inbox(&db, &cfg, "lead", "claude", true, 1).await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].task, "Critical auth outage"); + assert_eq!(outcomes[0].action, AssignmentAction::Spawned); + + let unread = db.unread_task_handoffs_for_session("lead", 10)?; + assert_eq!(unread.len(), 1); + assert!(unread[0].content.contains("Document cleanup")); + + let messages = db.list_messages_for_session(&outcomes[0].session_id, 10)?; + assert!(messages.iter().any(|message| { + message.msg_type == "task_handoff" && message.content.contains("Critical auth outage") + })); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn auto_dispatch_backlog_routes_multiple_lead_inboxes() -> Result<()> { let tempdir = TestDir::new("manager-auto-dispatch")?; @@ -6307,6 +6364,7 @@ mod tests { &crate::comms::MessageType::TaskHandoff { task: "Review src/lib.rs".to_string(), context: "Lead delegated follow-up".to_string(), + priority: crate::comms::TaskPriority::Normal, }, )?; diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index fdebe58d..a1ee54a4 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -8,6 +8,7 @@ use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::time::Duration; +use crate::comms; use crate::config::Config; use crate::observability::{ToolCallEvent, ToolLogEntry, ToolLogPage}; @@ -1885,11 +1886,10 @@ impl StateStore { "SELECT id, from_session, to_session, content, msg_type, read, timestamp FROM messages WHERE to_session = ?1 AND msg_type = 'task_handoff' AND read = 0 - ORDER BY id ASC - LIMIT ?2", + ORDER BY id ASC", )?; - let messages = stmt.query_map(rusqlite::params![session_id, limit as i64], |row| { + let messages = stmt.query_map(rusqlite::params![session_id], |row| { let timestamp: String = row.get(6)?; Ok(SessionMessage { @@ -1905,7 +1905,16 @@ impl StateStore { }) })?; - messages.collect::, _>>().map_err(Into::into) + let mut messages = messages.collect::, _>>()?; + messages.sort_by(|left, right| { + let left_priority = comms::handoff_priority(&left.content); + let right_priority = comms::handoff_priority(&right.content); + Reverse(left_priority) + .cmp(&Reverse(right_priority)) + .then_with(|| left.id.cmp(&right.id)) + }); + messages.truncate(limit); + Ok(messages) } pub fn unread_task_handoff_count(&self, session_id: &str) -> Result { @@ -1923,19 +1932,49 @@ impl StateStore { pub fn unread_task_handoff_targets(&self, limit: usize) -> Result> { let mut stmt = self.conn.prepare( - "SELECT to_session, COUNT(*) as unread_count + "SELECT to_session, content, id FROM messages WHERE msg_type = 'task_handoff' AND read = 0 - GROUP BY to_session - ORDER BY unread_count DESC, MAX(id) ASC - LIMIT ?1", + ORDER BY id ASC", )?; - let targets = stmt.query_map(rusqlite::params![limit as i64], |row| { - Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)? as usize)) + let targets = stmt.query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, i64>(2)?, + )) })?; + let mut aggregated: HashMap = HashMap::new(); + for (to_session, content, id) in targets.collect::, _>>()? { + let priority = comms::handoff_priority(&content); + aggregated + .entry(to_session) + .and_modify(|entry| { + entry.0 += 1; + if priority > entry.1 { + entry.1 = priority; + } + if id < entry.2 { + entry.2 = id; + } + }) + .or_insert((1, priority, id)); + } - targets.collect::, _>>().map_err(Into::into) + let mut targets = aggregated.into_iter().collect::>(); + targets.sort_by(|(left_session, left), (right_session, right)| { + Reverse(left.1) + .cmp(&Reverse(right.1)) + .then_with(|| Reverse(left.0).cmp(&Reverse(right.0))) + .then_with(|| left.2.cmp(&right.2)) + .then_with(|| left_session.cmp(right_session)) + }); + targets.truncate(limit); + Ok(targets + .into_iter() + .map(|(session_id, (count, _, _))| (session_id, count)) + .collect()) } pub fn mark_messages_read(&self, session_id: &str) -> Result { @@ -5521,7 +5560,19 @@ mod tests { db.send_message( "planner", "worker-3", - "{\"task\":\"Check billing\",\"context\":\"Delegated from planner\"}", + "{\"task\":\"Check billing\",\"context\":\"Delegated from planner\",\"priority\":\"high\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "worker-4", + "{\"task\":\"Low priority follow-up\",\"context\":\"Delegated from planner\",\"priority\":\"low\"}", + "task_handoff", + )?; + db.send_message( + "planner", + "worker-4", + "{\"task\":\"Critical production incident\",\"context\":\"Delegated from planner\",\"priority\":\"critical\"}", "task_handoff", )?; @@ -5531,12 +5582,28 @@ mod tests { ); assert_eq!( db.delegated_children("planner", 10)?, - vec!["worker-3".to_string(), "worker-2".to_string(),] + vec![ + "worker-4".to_string(), + "worker-3".to_string(), + "worker-2".to_string(), + ] ); assert_eq!( db.unread_task_handoff_targets(10)?, - vec![("worker-2".to_string(), 1), ("worker-3".to_string(), 1),] + vec![ + ("worker-4".to_string(), 2), + ("worker-3".to_string(), 1), + ("worker-2".to_string(), 1), + ] ); + let worker_4_handoffs = db.unread_task_handoffs_for_session("worker-4", 10)?; + assert_eq!(worker_4_handoffs.len(), 2); + assert!(worker_4_handoffs[0] + .content + .contains("Critical production incident")); + assert!(worker_4_handoffs[1] + .content + .contains("Low priority follow-up")); let planner_entities = db.list_context_entities(Some("planner"), Some("session"), 10)?; assert_eq!(planner_entities.len(), 1); diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index eeb5dfec..dab25e72 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -2174,6 +2174,7 @@ impl Dashboard { &comms::MessageType::TaskHandoff { task: source_session.task.clone(), context, + priority: comms::TaskPriority::Normal, }, ) { tracing::warn!( @@ -3655,6 +3656,7 @@ impl Dashboard { &comms::MessageType::TaskHandoff { task: task.clone(), context: context.clone(), + priority: comms::TaskPriority::Normal, }, ) { tracing::warn!(