diff --git a/ecc2/Cargo.lock b/ecc2/Cargo.lock index 40cd4724..ff240c32 100644 --- a/ecc2/Cargo.lock +++ b/ecc2/Cargo.lock @@ -315,6 +315,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cron" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossterm" version = "0.28.1" @@ -507,6 +518,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "cron", "crossterm 0.28.1", "dirs", "git2", diff --git a/ecc2/Cargo.toml b/ecc2/Cargo.toml index 85399a3b..ea8d9733 100644 --- a/ecc2/Cargo.toml +++ b/ecc2/Cargo.toml @@ -43,6 +43,7 @@ libc = "0.2" # Time chrono = { version = "0.4", features = ["serde"] } +cron = "0.12" # UUID for session IDs uuid = { version = "1", features = ["v4"] } diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index f1eed4aa..d5668649 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -322,6 +322,11 @@ enum Commands { #[command(subcommand)] command: GraphCommands, }, + /// Manage persistent scheduled task dispatch + Schedule { + #[command(subcommand)] + command: ScheduleCommands, + }, /// Export sessions, tool spans, and metrics in OTLP-compatible JSON ExportOtel { /// Session ID or alias. Omit to export all sessions. @@ -387,6 +392,56 @@ enum MessageCommands { }, } +#[derive(clap::Subcommand, Debug)] +enum ScheduleCommands { + /// Add a persistent scheduled task + Add { + /// Cron expression in 5, 6, or 7-field form + #[arg(long)] + cron: String, + /// Task description to run on each schedule + #[arg(short, long)] + task: String, + /// Agent type (claude, codex, gemini, opencode) + #[arg(short, long)] + agent: Option, + /// Agent profile defined in ecc2.toml + #[arg(long)] + profile: Option, + #[command(flatten)] + worktree: WorktreePolicyArgs, + /// Optional project grouping override + #[arg(long)] + project: Option, + /// Optional task-group grouping override + #[arg(long)] + task_group: Option, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, + /// List scheduled tasks + List { + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, + /// Remove a scheduled task + Remove { + /// Schedule ID + schedule_id: i64, + }, + /// Dispatch currently due scheduled tasks + RunDue { + /// Maximum due schedules to dispatch in one pass + #[arg(long, default_value_t = 10)] + limit: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, +} + #[derive(clap::Subcommand, Debug)] enum GraphCommands { /// Create or update a graph entity @@ -1727,6 +1782,90 @@ async fn main() -> Result<()> { } } }, + Some(Commands::Schedule { command }) => match command { + ScheduleCommands::Add { + cron, + task, + agent, + profile, + worktree, + project, + task_group, + json, + } => { + let schedule = session::manager::create_scheduled_task( + &db, + &cfg, + &cron, + &task, + agent.as_deref().unwrap_or(&cfg.default_agent), + profile.as_deref(), + worktree.resolve(&cfg), + session::SessionGrouping { + project, + task_group, + }, + )?; + if json { + println!("{}", serde_json::to_string_pretty(&schedule)?); + } else { + println!( + "Scheduled task {} next runs at {}", + schedule.id, + schedule.next_run_at.to_rfc3339() + ); + println!( + "- {} [{}] | {}", + schedule.task, schedule.agent_type, schedule.cron_expr + ); + } + } + ScheduleCommands::List { json } => { + let schedules = session::manager::list_scheduled_tasks(&db)?; + if json { + println!("{}", serde_json::to_string_pretty(&schedules)?); + } else if schedules.is_empty() { + println!("No scheduled tasks"); + } else { + println!("Scheduled tasks"); + for schedule in schedules { + println!( + "#{} {} [{}] | {} | next {}", + schedule.id, + schedule.task, + schedule.agent_type, + schedule.cron_expr, + schedule.next_run_at.to_rfc3339() + ); + } + } + } + ScheduleCommands::Remove { schedule_id } => { + if !session::manager::delete_scheduled_task(&db, schedule_id)? { + anyhow::bail!("Scheduled task not found: {schedule_id}"); + } + println!("Removed scheduled task {schedule_id}"); + } + ScheduleCommands::RunDue { limit, json } => { + let outcomes = session::manager::run_due_schedules(&db, &cfg, limit).await?; + if json { + println!("{}", serde_json::to_string_pretty(&outcomes)?); + } else if outcomes.is_empty() { + println!("No due scheduled tasks"); + } else { + println!("Dispatched {} scheduled task(s)", outcomes.len()); + for outcome in outcomes { + println!( + "#{} -> {} | {} | next {}", + outcome.schedule_id, + short_session(&outcome.session_id), + outcome.task, + outcome.next_run_at.to_rfc3339() + ); + } + } + } + }, Some(Commands::Daemon) => { println!("Starting ECC daemon..."); session::daemon::run(db, cfg).await?; @@ -4384,6 +4523,51 @@ mod tests { } } + #[test] + fn cli_parses_schedule_add_command() { + let cli = Cli::try_parse_from([ + "ecc", + "schedule", + "add", + "--cron", + "*/15 * * * *", + "--task", + "Check backlog health", + "--agent", + "codex", + "--profile", + "planner", + "--project", + "ecc-core", + "--task-group", + "scheduled maintenance", + ]) + .expect("schedule add should parse"); + + match cli.command { + Some(Commands::Schedule { + command: + ScheduleCommands::Add { + cron, + task, + agent, + profile, + project, + task_group, + .. + }, + }) => { + assert_eq!(cron, "*/15 * * * *"); + assert_eq!(task, "Check backlog health"); + assert_eq!(agent.as_deref(), Some("codex")); + assert_eq!(profile.as_deref(), Some("planner")); + assert_eq!(project.as_deref(), Some("ecc-core")); + assert_eq!(task_group.as_deref(), Some("scheduled maintenance")); + } + _ => panic!("expected schedule add subcommand"), + } + } + #[test] fn cli_parses_start_with_handoff_source() { let cli = Cli::try_parse_from([ diff --git a/ecc2/src/session/daemon.rs b/ecc2/src/session/daemon.rs index 2f5096fb..9f55df04 100644 --- a/ecc2/src/session/daemon.rs +++ b/ecc2/src/session/daemon.rs @@ -27,6 +27,10 @@ pub async fn run(db: StateStore, cfg: Config) -> Result<()> { tracing::error!("Session check failed: {e}"); } + if let Err(e) = maybe_run_due_schedules(&db, &cfg).await { + tracing::error!("Scheduled task dispatch pass failed: {e}"); + } + if let Err(e) = coordinate_backlog_cycle(&db, &cfg).await { tracing::error!("Backlog coordination pass failed: {e}"); } @@ -89,6 +93,14 @@ fn check_sessions(db: &StateStore, cfg: &Config) -> Result<()> { Ok(()) } +async fn maybe_run_due_schedules(db: &StateStore, cfg: &Config) -> Result { + let outcomes = manager::run_due_schedules(db, cfg, cfg.max_parallel_sessions).await?; + if !outcomes.is_empty() { + tracing::info!("Dispatched {} scheduled task(s)", outcomes.len()); + } + Ok(outcomes.len()) +} + async fn maybe_auto_dispatch(db: &StateStore, cfg: &Config) -> Result { let summary = maybe_auto_dispatch_with_recorder( cfg, diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index f651a08a..150b4ef0 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -1,17 +1,21 @@ use anyhow::{Context, Result}; +use chrono::Utc; +use cron::Schedule as CronSchedule; use serde::Serialize; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt; use std::path::{Path, PathBuf}; use std::process::Stdio; +use std::str::FromStr; use tokio::process::Command; use super::output::SessionOutputStore; use super::runtime::capture_command_output; use super::store::StateStore; use super::{ - default_project_label, default_task_group_label, normalize_group_label, HarnessKind, Session, - SessionAgentProfile, SessionGrouping, SessionHarnessInfo, SessionMetrics, SessionState, + default_project_label, default_task_group_label, normalize_group_label, HarnessKind, + ScheduledTask, Session, SessionAgentProfile, SessionGrouping, SessionHarnessInfo, + SessionMetrics, SessionState, }; use crate::comms::{self, MessageType}; use crate::config::Config; @@ -108,6 +112,48 @@ pub async fn create_session_from_source_with_profile_and_grouping( .await } +async fn run_due_schedules_with_runner_program( + db: &StateStore, + cfg: &Config, + limit: usize, + runner_program: &Path, +) -> Result> { + let now = Utc::now(); + let schedules = db.list_due_scheduled_tasks(now, limit)?; + let mut outcomes = Vec::new(); + + for schedule in schedules { + let grouping = SessionGrouping { + project: normalize_group_label(&schedule.project), + task_group: normalize_group_label(&schedule.task_group), + }; + let session_id = queue_session_in_dir_with_runner_program( + db, + cfg, + &schedule.task, + &schedule.agent_type, + schedule.use_worktree, + &schedule.working_dir, + runner_program, + schedule.profile_name.as_deref(), + None, + grouping, + ) + .await?; + let next_run_at = next_schedule_run_at(&schedule.cron_expr, now)?; + db.record_scheduled_task_run(schedule.id, now, next_run_at)?; + outcomes.push(ScheduledRunOutcome { + schedule_id: schedule.id, + session_id, + task: schedule.task, + cron_expr: schedule.cron_expr, + next_run_at, + }); + } + + Ok(outcomes) +} + pub fn list_sessions(db: &StateStore) -> Result> { db.list_sessions() } @@ -155,6 +201,66 @@ pub fn get_team_status(db: &StateStore, id: &str, depth: usize) -> Result, + use_worktree: bool, + grouping: SessionGrouping, +) -> Result { + let working_dir = + std::env::current_dir().context("Failed to resolve current working directory")?; + let project = grouping + .project + .as_deref() + .and_then(normalize_group_label) + .unwrap_or_else(|| default_project_label(&working_dir)); + let task_group = grouping + .task_group + .as_deref() + .and_then(normalize_group_label) + .unwrap_or_else(|| default_task_group_label(task)); + let agent_type = HarnessKind::canonical_agent_type(agent_type); + + if let Some(profile_name) = profile_name { + cfg.resolve_agent_profile(profile_name)?; + } + + let next_run_at = next_schedule_run_at(cron_expr, Utc::now())?; + db.insert_scheduled_task( + cron_expr, + task, + &agent_type, + profile_name, + &working_dir, + &project, + &task_group, + use_worktree, + next_run_at, + ) +} + +pub fn list_scheduled_tasks(db: &StateStore) -> Result> { + db.list_scheduled_tasks() +} + +pub fn delete_scheduled_task(db: &StateStore, schedule_id: i64) -> Result { + Ok(db.delete_scheduled_task(schedule_id)? > 0) +} + +pub async fn run_due_schedules( + db: &StateStore, + cfg: &Config, + limit: usize, +) -> Result> { + let runner_program = + std::env::current_exe().context("Failed to resolve ECC executable path")?; + run_due_schedules_with_runner_program(db, cfg, limit, &runner_program).await +} + #[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct TemplateLaunchStepOutcome { pub step_name: String, @@ -1916,6 +2022,32 @@ fn resolve_session(db: &StateStore, id: &str) -> Result { session.ok_or_else(|| anyhow::anyhow!("Session not found: {id}")) } +fn parse_cron_schedule(expr: &str) -> Result { + let trimmed = expr.trim(); + let normalized = match trimmed.split_whitespace().count() { + 5 => format!("0 {trimmed}"), + 6 | 7 => trimmed.to_string(), + fields => { + anyhow::bail!( + "invalid cron expression `{trimmed}`: expected 5, 6, or 7 fields but found {fields}" + ) + } + }; + CronSchedule::from_str(&normalized) + .with_context(|| format!("invalid cron expression `{trimmed}`")) +} + +fn next_schedule_run_at( + expr: &str, + after: chrono::DateTime, +) -> Result> { + parse_cron_schedule(expr)? + .after(&after) + .next() + .map(|value| value.with_timezone(&chrono::Utc)) + .ok_or_else(|| anyhow::anyhow!("cron expression `{expr}` did not yield a future run time")) +} + pub async fn run_session( cfg: &Config, session_id: &str, @@ -2805,6 +2937,15 @@ pub struct LeadDispatchOutcome { pub routed: Vec, } +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct ScheduledRunOutcome { + pub schedule_id: i64, + pub session_id: String, + pub task: String, + pub cron_expr: String, + pub next_run_at: chrono::DateTime, +} + pub struct RebalanceOutcome { pub from_session_id: String, pub message_id: i64, @@ -3891,6 +4032,53 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "current_thread")] + async fn run_due_schedules_dispatches_due_tasks_and_advances_next_run() -> Result<()> { + let tempdir = TestDir::new("manager-run-due-schedules")?; + let repo_root = tempdir.path().join("repo"); + init_git_repo(&repo_root)?; + + let cfg = build_config(tempdir.path()); + let db = StateStore::open(&cfg.db_path)?; + let (fake_runner, log_path) = write_fake_claude(tempdir.path())?; + let due_at = Utc::now() - Duration::minutes(1); + + let schedule = db.insert_scheduled_task( + "*/15 * * * *", + "Check backlog health", + "claude", + None, + &repo_root, + "ecc-core", + "scheduled maintenance", + true, + due_at, + )?; + + let outcomes = run_due_schedules_with_runner_program(&db, &cfg, 10, &fake_runner).await?; + assert_eq!(outcomes.len(), 1); + assert_eq!(outcomes[0].schedule_id, schedule.id); + assert_eq!(outcomes[0].task, "Check backlog health"); + + let session = db + .get_session(&outcomes[0].session_id)? + .context("scheduled session should exist")?; + assert_eq!(session.project, "ecc-core"); + assert_eq!(session.task_group, "scheduled maintenance"); + + let refreshed = db + .get_scheduled_task(schedule.id)? + .context("scheduled task should still exist")?; + assert!(refreshed.last_run_at.is_some()); + assert!(refreshed.next_run_at > due_at); + + let log = wait_for_file(&log_path)?; + assert!(log.contains("Check backlog health")); + + stop_session_with_options(&db, &outcomes[0].session_id, true).await?; + Ok(()) + } + #[tokio::test(flavor = "current_thread")] async fn stop_session_kills_process_and_optionally_cleans_worktree() -> Result<()> { let tempdir = TestDir::new("manager-stop-session")?; diff --git a/ecc2/src/session/mod.rs b/ecc2/src/session/mod.rs index 8ad6c54a..ffff01e7 100644 --- a/ecc2/src/session/mod.rs +++ b/ecc2/src/session/mod.rs @@ -274,6 +274,23 @@ pub struct SessionMessage { pub timestamp: DateTime, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ScheduledTask { + pub id: i64, + pub cron_expr: String, + pub task: String, + pub agent_type: String, + pub profile_name: Option, + pub working_dir: PathBuf, + pub project: String, + pub task_group: String, + pub use_worktree: bool, + pub last_run_at: Option>, + pub next_run_at: DateTime, + pub created_at: DateTime, + pub updated_at: DateTime, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct FileActivityEntry { pub session_id: String, diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index a1ee54a4..31196d3c 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -18,8 +18,8 @@ use super::{ ContextGraphCompactionStats, ContextGraphEntity, ContextGraphEntityDetail, ContextGraphObservation, ContextGraphRecallEntry, ContextGraphRelation, ContextGraphSyncStats, ContextObservationPriority, DecisionLogEntry, FileActivityAction, FileActivityEntry, - HarnessKind, Session, SessionAgentProfile, SessionHarnessInfo, SessionMessage, SessionMetrics, - SessionState, WorktreeInfo, + HarnessKind, ScheduledTask, Session, SessionAgentProfile, SessionHarnessInfo, SessionMessage, + SessionMetrics, SessionState, WorktreeInfo, }; pub struct StateStore { @@ -299,6 +299,22 @@ impl StateStore { requested_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS scheduled_tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + cron_expr TEXT NOT NULL, + task TEXT NOT NULL, + agent_type TEXT NOT NULL, + profile_name TEXT, + working_dir TEXT NOT NULL, + project TEXT NOT NULL DEFAULT '', + task_group TEXT NOT NULL DEFAULT '', + use_worktree INTEGER NOT NULL DEFAULT 1, + last_run_at TEXT, + next_run_at TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS conflict_incidents ( id INTEGER PRIMARY KEY AUTOINCREMENT, conflict_key TEXT NOT NULL UNIQUE, @@ -1029,6 +1045,125 @@ impl StateStore { Ok(rows) } + pub fn insert_scheduled_task( + &self, + cron_expr: &str, + task: &str, + agent_type: &str, + profile_name: Option<&str>, + working_dir: &Path, + project: &str, + task_group: &str, + use_worktree: bool, + next_run_at: chrono::DateTime, + ) -> Result { + let now = chrono::Utc::now(); + self.conn.execute( + "INSERT INTO scheduled_tasks ( + cron_expr, + task, + agent_type, + profile_name, + working_dir, + project, + task_group, + use_worktree, + next_run_at, + created_at, + updated_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + rusqlite::params![ + cron_expr, + task, + agent_type, + profile_name, + working_dir.display().to_string(), + project, + task_group, + if use_worktree { 1_i64 } else { 0_i64 }, + next_run_at.to_rfc3339(), + now.to_rfc3339(), + now.to_rfc3339(), + ], + )?; + let id = self.conn.last_insert_rowid(); + self.get_scheduled_task(id)? + .ok_or_else(|| anyhow::anyhow!("Scheduled task {id} was not found after insert")) + } + + pub fn list_scheduled_tasks(&self) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, cron_expr, task, agent_type, profile_name, working_dir, project, task_group, + use_worktree, last_run_at, next_run_at, created_at, updated_at + FROM scheduled_tasks + ORDER BY next_run_at ASC, id ASC", + )?; + + let rows = stmt.query_map([], map_scheduled_task)?; + rows.collect::, _>>().map_err(Into::into) + } + + pub fn list_due_scheduled_tasks( + &self, + now: chrono::DateTime, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, cron_expr, task, agent_type, profile_name, working_dir, project, task_group, + use_worktree, last_run_at, next_run_at, created_at, updated_at + FROM scheduled_tasks + WHERE next_run_at <= ?1 + ORDER BY next_run_at ASC, id ASC + LIMIT ?2", + )?; + + let rows = stmt.query_map( + rusqlite::params![now.to_rfc3339(), limit as i64], + map_scheduled_task, + )?; + rows.collect::, _>>().map_err(Into::into) + } + + pub fn get_scheduled_task(&self, schedule_id: i64) -> Result> { + self.conn + .query_row( + "SELECT id, cron_expr, task, agent_type, profile_name, working_dir, project, task_group, + use_worktree, last_run_at, next_run_at, created_at, updated_at + FROM scheduled_tasks + WHERE id = ?1", + [schedule_id], + map_scheduled_task, + ) + .optional() + .map_err(Into::into) + } + + pub fn delete_scheduled_task(&self, schedule_id: i64) -> Result { + self.conn + .execute("DELETE FROM scheduled_tasks WHERE id = ?1", [schedule_id]) + .map_err(Into::into) + } + + pub fn record_scheduled_task_run( + &self, + schedule_id: i64, + last_run_at: chrono::DateTime, + next_run_at: chrono::DateTime, + ) -> Result<()> { + self.conn.execute( + "UPDATE scheduled_tasks + SET last_run_at = ?2, next_run_at = ?3, updated_at = ?4 + WHERE id = ?1", + rusqlite::params![ + schedule_id, + last_run_at.to_rfc3339(), + next_run_at.to_rfc3339(), + chrono::Utc::now().to_rfc3339(), + ], + )?; + Ok(()) + } + pub fn update_metrics(&self, session_id: &str, metrics: &SessionMetrics) -> Result<()> { self.conn.execute( "UPDATE sessions @@ -3565,6 +3700,31 @@ fn map_conflict_incident(row: &rusqlite::Row<'_>) -> rusqlite::Result) -> rusqlite::Result { + let last_run_at = row + .get::<_, Option>(9)? + .map(|value| parse_store_timestamp(value, 9)) + .transpose()?; + let next_run_at = parse_store_timestamp(row.get::<_, String>(10)?, 10)?; + let created_at = parse_store_timestamp(row.get::<_, String>(11)?, 11)?; + let updated_at = parse_store_timestamp(row.get::<_, String>(12)?, 12)?; + Ok(ScheduledTask { + id: row.get(0)?, + cron_expr: row.get(1)?, + task: row.get(2)?, + agent_type: row.get(3)?, + profile_name: normalize_optional_string(row.get(4)?), + working_dir: PathBuf::from(row.get::<_, String>(5)?), + project: row.get(6)?, + task_group: row.get(7)?, + use_worktree: row.get::<_, i64>(8)? != 0, + last_run_at, + next_run_at, + created_at, + updated_at, + }) +} + fn parse_timestamp_column( value: String, index: usize, @@ -5096,6 +5256,49 @@ mod tests { Ok(()) } + #[test] + fn scheduled_tasks_round_trip_and_advance_runs() -> Result<()> { + let tempdir = TestDir::new("store-scheduled-tasks")?; + let db = StateStore::open(&tempdir.path().join("state.db"))?; + let now = Utc::now(); + let due_next_run = now - ChronoDuration::minutes(1); + + let inserted = db.insert_scheduled_task( + "*/15 * * * *", + "Check backlog health", + "claude", + Some("planner"), + tempdir.path(), + "ecc-core", + "scheduled maintenance", + true, + due_next_run, + )?; + + let listed = db.list_scheduled_tasks()?; + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].id, inserted.id); + assert_eq!(listed[0].profile_name.as_deref(), Some("planner")); + + let due = db.list_due_scheduled_tasks(now, 10)?; + assert_eq!(due.len(), 1); + assert_eq!(due[0].id, inserted.id); + + let advanced_next_run = now + ChronoDuration::minutes(15); + db.record_scheduled_task_run(inserted.id, now, advanced_next_run)?; + + let refreshed = db + .get_scheduled_task(inserted.id)? + .context("scheduled task should still exist")?; + assert_eq!(refreshed.last_run_at, Some(now)); + assert_eq!(refreshed.next_run_at, advanced_next_run); + + assert_eq!(db.delete_scheduled_task(inserted.id)?, 1); + assert!(db.get_scheduled_task(inserted.id)?.is_none()); + + Ok(()) + } + #[test] fn context_graph_detail_includes_incoming_and_outgoing_relations() -> Result<()> { let tempdir = TestDir::new("store-context-relations")?;