diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 3f5f2c2c..d9fbc2d5 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1156,7 +1156,7 @@ async fn assign_session_in_dir_with_runner_program( .unwrap_or(0) == 0 }) - .min_by_key(|session| session.updated_at) + .max_by_key(|session| delegate_selection_key(db, session, task)) { send_task_handoff(db, &lead, &idle_delegate.id, task, "reused idle delegate")?; return Ok(AssignmentOutcome { @@ -1208,13 +1208,14 @@ async fn assign_session_in_dir_with_runner_program( if let Some(active_delegate) = delegates .iter() .filter(|session| matches!(session.state, SessionState::Running | SessionState::Pending)) - .min_by_key(|session| { + .max_by_key(|session| { ( - delegate_handoff_backlog + graph_context_match_score(db, &session.id, task), + -(delegate_handoff_backlog .get(&session.id) .copied() - .unwrap_or(0), - session.updated_at, + .unwrap_or(0) as i64), + -session.updated_at.timestamp_millis(), ) }) { @@ -2358,6 +2359,61 @@ fn direct_delegate_sessions( Ok(sessions) } +fn delegate_selection_key(db: &StateStore, session: &Session, task: &str) -> (usize, i64) { + ( + graph_context_match_score(db, &session.id, task), + -session.updated_at.timestamp_millis(), + ) +} + +fn graph_context_match_score(db: &StateStore, session_id: &str, task: &str) -> usize { + let terms = graph_match_terms(task); + if terms.is_empty() { + return 0; + } + + let entities = match db.list_context_entities(Some(session_id), None, 48) { + Ok(entities) => entities, + Err(_) => return 0, + }; + + let mut matched = HashSet::new(); + for entity in entities { + let mut haystacks = vec![entity.name.to_lowercase(), entity.summary.to_lowercase()]; + if let Some(path) = entity.path.as_ref() { + haystacks.push(path.to_lowercase()); + } + for (key, value) in entity.metadata { + haystacks.push(key.to_lowercase()); + haystacks.push(value.to_lowercase()); + } + + for term in &terms { + if haystacks.iter().any(|haystack| haystack.contains(term)) { + matched.insert(term.clone()); + } + } + } + + matched.len() +} + +fn graph_match_terms(task: &str) -> Vec { + let mut terms = Vec::new(); + let mut seen = HashSet::new(); + for token in task + .split(|ch: char| !(ch.is_ascii_alphanumeric() || matches!(ch, '_' | '.' | '-'))) + .map(str::trim) + .filter(|token| token.len() >= 3) + { + let lowered = token.to_ascii_lowercase(); + if seen.insert(lowered.clone()) { + terms.push(lowered); + } + } + terms +} + fn summarize_backlog_pressure( db: &StateStore, cfg: &Config, @@ -4740,6 +4796,112 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn assign_session_prefers_idle_delegate_with_graph_context_match() -> Result<()> { + let tempdir = TestDir::new("manager-assign-graph-context-idle")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let now = Utc::now(); + + db.insert_session(&Session { + id: "lead".to_string(), + task: "lead task".to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Running, + pid: Some(42), + worktree: None, + created_at: now - Duration::minutes(4), + updated_at: now - Duration::minutes(4), + last_heartbeat_at: now - Duration::minutes(4), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "older-worker".to_string(), + task: "legacy delegated task".to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: Some(100), + worktree: None, + created_at: now - Duration::minutes(3), + updated_at: now - Duration::minutes(3), + last_heartbeat_at: now - Duration::minutes(3), + metrics: SessionMetrics::default(), + })?; + db.insert_session(&Session { + id: "auth-worker".to_string(), + task: "auth delegated task".to_string(), + project: "workspace".to_string(), + task_group: "general".to_string(), + agent_type: "claude".to_string(), + working_dir: repo_root.clone(), + state: SessionState::Idle, + pid: Some(101), + worktree: None, + created_at: now - Duration::minutes(2), + updated_at: now - Duration::minutes(2), + last_heartbeat_at: now - Duration::minutes(2), + metrics: SessionMetrics::default(), + })?; + db.send_message( + "lead", + "older-worker", + "{\"task\":\"legacy delegated task\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + db.send_message( + "lead", + "auth-worker", + "{\"task\":\"auth delegated task\",\"context\":\"Delegated from lead\"}", + "task_handoff", + )?; + db.mark_messages_read("older-worker")?; + db.mark_messages_read("auth-worker")?; + + db.upsert_context_entity( + Some("auth-worker"), + "file", + "auth-callback.ts", + Some("src/auth/callback.ts"), + "Auth callback recovery edge cases", + &BTreeMap::new(), + )?; + + let (fake_runner, _) = write_fake_claude(tempdir.path())?; + let outcome = assign_session_in_dir_with_runner_program( + &db, + &cfg, + "lead", + "Investigate auth callback recovery", + "claude", + true, + &repo_root, + &fake_runner, + None, + SessionGrouping::default(), + ) + .await?; + + assert_eq!(outcome.action, AssignmentAction::ReusedIdle); + assert_eq!(outcome.session_id, "auth-worker"); + + let auth_messages = db.list_messages_for_session("auth-worker", 10)?; + assert!(auth_messages.iter().any(|message| { + message.msg_type == "task_handoff" + && message.content.contains("Investigate auth callback recovery") + })); + + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn assign_session_spawns_instead_of_reusing_backed_up_idle_delegate() -> Result<()> { let tempdir = TestDir::new("manager-assign-spawn-backed-up-idle")?;