diff --git a/ecc2/src/config/mod.rs b/ecc2/src/config/mod.rs index 4e00bc9b..702e66af 100644 --- a/ecc2/src/config/mod.rs +++ b/ecc2/src/config/mod.rs @@ -103,6 +103,21 @@ pub struct OrchestrationTemplateStepConfig { pub task_group: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum MemoryConnectorConfig { + JsonlFile(MemoryConnectorJsonlFileConfig), +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct MemoryConnectorJsonlFileConfig { + pub path: PathBuf, + pub session_id: Option, + pub default_entity_type: Option, + pub default_observation_type: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ResolvedOrchestrationTemplate { pub template_name: String, @@ -139,6 +154,7 @@ pub struct Config { pub default_agent_profile: Option, pub agent_profiles: BTreeMap, pub orchestration_templates: BTreeMap, + pub memory_connectors: BTreeMap, pub auto_dispatch_unread_handoffs: bool, pub auto_dispatch_limit_per_session: usize, pub auto_create_worktrees: bool, @@ -203,6 +219,7 @@ impl Default for Config { default_agent_profile: None, agent_profiles: BTreeMap::new(), orchestration_templates: BTreeMap::new(), + memory_connectors: BTreeMap::new(), auto_dispatch_unread_handoffs: false, auto_dispatch_limit_per_session: 5, auto_create_worktrees: true, @@ -1231,6 +1248,37 @@ task = "Plan {{task}} for {{component}}" assert!(error_text.contains("missing orchestration template variable(s): component")); } + #[test] + fn memory_connectors_deserialize_from_toml() { + let config: Config = toml::from_str( + r#" +[memory_connectors.hermes_notes] +kind = "jsonl_file" +path = "/tmp/hermes-memory.jsonl" +session_id = "latest" +default_entity_type = "incident" +default_observation_type = "external_note" +"#, + ) + .unwrap(); + + let connector = config + .memory_connectors + .get("hermes_notes") + .expect("connector should deserialize"); + match connector { + crate::config::MemoryConnectorConfig::JsonlFile(settings) => { + assert_eq!(settings.path, PathBuf::from("/tmp/hermes-memory.jsonl")); + assert_eq!(settings.session_id.as_deref(), Some("latest")); + assert_eq!(settings.default_entity_type.as_deref(), Some("incident")); + assert_eq!( + settings.default_observation_type.as_deref(), + Some("external_note") + ); + } + } + } + #[test] fn completion_summary_notifications_deserialize_from_toml() { let config: Config = toml::from_str( diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 9329af08..993f92dd 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -6,10 +6,12 @@ mod session; mod tui; mod worktree; -use anyhow::Result; +use anyhow::{Context, Result}; use clap::Parser; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::fs::File; +use std::io::{BufRead, BufReader}; use std::path::PathBuf; use tracing_subscriber::EnvFilter; @@ -502,6 +504,17 @@ enum GraphCommands { #[arg(long)] json: bool, }, + /// Import external memory from a configured connector + ConnectorSync { + /// Connector name from ecc2.toml + name: String, + /// Maximum non-empty records to process + #[arg(long, default_value_t = 256)] + limit: usize, + /// Emit machine-readable JSON instead of the human summary + #[arg(long)] + json: bool, + }, /// Recall relevant context graph entities for a query Recall { /// Filter by source session ID or alias @@ -552,6 +565,29 @@ enum MessageKindArg { Conflict, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +struct GraphConnectorSyncStats { + connector_name: String, + records_read: usize, + entities_upserted: usize, + observations_added: usize, + skipped_records: usize, +} + +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(default)] +struct JsonlMemoryConnectorRecord { + session_id: Option, + entity_type: Option, + entity_name: String, + path: Option, + entity_summary: Option, + metadata: BTreeMap, + observation_type: Option, + summary: String, + details: BTreeMap, +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() @@ -1352,6 +1388,14 @@ async fn main() -> Result<()> { ); } } + GraphCommands::ConnectorSync { name, limit, json } => { + let stats = sync_memory_connector(&db, &cfg, &name, limit)?; + if json { + println!("{}", serde_json::to_string_pretty(&stats)?); + } else { + println!("{}", format_graph_connector_sync_stats_human(&stats)); + } + } GraphCommands::Recall { session_id, query, @@ -1532,6 +1576,133 @@ fn sync_runtime_session_metrics( Ok(()) } +fn sync_memory_connector( + db: &session::store::StateStore, + cfg: &config::Config, + name: &str, + limit: usize, +) -> Result { + let connector = cfg + .memory_connectors + .get(name) + .ok_or_else(|| anyhow::anyhow!("Unknown memory connector: {name}"))?; + + match connector { + config::MemoryConnectorConfig::JsonlFile(settings) => { + sync_jsonl_memory_connector(db, name, settings, limit) + } + } +} + +fn sync_jsonl_memory_connector( + db: &session::store::StateStore, + name: &str, + settings: &config::MemoryConnectorJsonlFileConfig, + limit: usize, +) -> Result { + if settings.path.as_os_str().is_empty() { + anyhow::bail!("memory connector {name} has no path configured"); + } + + let default_session_id = settings + .session_id + .as_deref() + .map(|value| resolve_session_id(db, value)) + .transpose()?; + let file = File::open(&settings.path) + .with_context(|| format!("open memory connector file {}", settings.path.display()))?; + let reader = BufReader::new(file); + + let mut stats = GraphConnectorSyncStats { + connector_name: name.to_string(), + ..Default::default() + }; + + for line in reader.lines() { + let line = line?; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + if stats.records_read >= limit { + break; + } + stats.records_read += 1; + + let record: JsonlMemoryConnectorRecord = match serde_json::from_str(trimmed) { + Ok(record) => record, + Err(_) => { + stats.skipped_records += 1; + continue; + } + }; + + let session_id = match record.session_id.as_deref() { + Some(value) => match resolve_session_id(db, value) { + Ok(resolved) => Some(resolved), + Err(_) => { + stats.skipped_records += 1; + continue; + } + }, + None => default_session_id.clone(), + }; + let entity_type = record + .entity_type + .as_deref() + .or(settings.default_entity_type.as_deref()) + .map(str::trim) + .filter(|value| !value.is_empty()); + let observation_type = record + .observation_type + .as_deref() + .or(settings.default_observation_type.as_deref()) + .map(str::trim) + .filter(|value| !value.is_empty()); + let entity_name = record.entity_name.trim(); + let summary = record.summary.trim(); + + let Some(entity_type) = entity_type else { + stats.skipped_records += 1; + continue; + }; + let Some(observation_type) = observation_type else { + stats.skipped_records += 1; + continue; + }; + if entity_name.is_empty() || summary.is_empty() { + stats.skipped_records += 1; + continue; + } + + let entity_summary = record + .entity_summary + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + .unwrap_or(summary); + let entity = db.upsert_context_entity( + session_id.as_deref(), + entity_type, + entity_name, + record.path.as_deref(), + entity_summary, + &record.metadata, + )?; + db.add_context_observation( + session_id.as_deref(), + entity.id, + observation_type, + summary, + &record.details, + )?; + stats.entities_upserted += 1; + stats.observations_added += 1; + } + + Ok(stats) +} + fn build_message( kind: MessageKindArg, text: String, @@ -2480,6 +2651,17 @@ fn format_graph_compaction_stats_human( .join("\n") } +fn format_graph_connector_sync_stats_human(stats: &GraphConnectorSyncStats) -> String { + [ + format!("Memory connector sync complete: {}", stats.connector_name), + format!("- records read {}", stats.records_read), + format!("- entities upserted {}", stats.entities_upserted), + format!("- observations added {}", stats.observations_added), + format!("- skipped records {}", stats.skipped_records), + ] + .join("\n") +} + fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String { let mut lines = vec![format_graph_entity_human(&detail.entity)]; lines.push(String::new()); @@ -4488,6 +4670,31 @@ mod tests { } } + #[test] + fn cli_parses_graph_connector_sync_command() { + let cli = Cli::try_parse_from([ + "ecc", + "graph", + "connector-sync", + "hermes_notes", + "--limit", + "32", + "--json", + ]) + .expect("graph connector-sync should parse"); + + match cli.command { + Some(Commands::Graph { + command: GraphCommands::ConnectorSync { name, limit, json }, + }) => { + assert_eq!(name, "hermes_notes"); + assert_eq!(limit, 32); + assert!(json); + } + _ => panic!("expected graph connector-sync subcommand"), + } + } + #[test] fn format_decisions_human_renders_details() { let text = format_decisions_human( @@ -4650,6 +4857,96 @@ mod tests { assert!(text.contains("- observations retained 9")); } + #[test] + fn format_graph_connector_sync_stats_human_renders_counts() { + let text = format_graph_connector_sync_stats_human(&GraphConnectorSyncStats { + connector_name: "hermes_notes".to_string(), + records_read: 4, + entities_upserted: 3, + observations_added: 3, + skipped_records: 1, + }); + + assert!(text.contains("Memory connector sync complete: hermes_notes")); + assert!(text.contains("- records read 4")); + assert!(text.contains("- entities upserted 3")); + assert!(text.contains("- observations added 3")); + assert!(text.contains("- skipped records 1")); + } + + #[test] + fn sync_memory_connector_imports_jsonl_observations() -> Result<()> { + let tempdir = TestDir::new("graph-connector-sync")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + let now = chrono::Utc::now(); + db.insert_session(&session::Session { + id: "session-1".to_string(), + task: "recovery incident".to_string(), + project: "ecc-tools".to_string(), + task_group: "incident".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: session::SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: session::SessionMetrics::default(), + })?; + + let connector_path = tempdir.path().join("hermes-memory.jsonl"); + std::fs::write( + &connector_path, + [ + serde_json::json!({ + "entity_name": "Auth callback recovery", + "summary": "Customer wiped setup and got charged twice", + "details": {"customer": "viktor"} + }) + .to_string(), + serde_json::json!({ + "session_id": "latest", + "entity_type": "file", + "entity_name": "callback.ts", + "path": "src/routes/auth/callback.ts", + "observation_type": "incident_note", + "summary": "Recovery flow needs portal-first routing" + }) + .to_string(), + ] + .join("\n"), + )?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "hermes_notes".to_string(), + config::MemoryConnectorConfig::JsonlFile(config::MemoryConnectorJsonlFileConfig { + path: connector_path, + session_id: Some("latest".to_string()), + default_entity_type: Some("incident".to_string()), + default_observation_type: Some("external_note".to_string()), + }), + ); + + let stats = sync_memory_connector(&db, &cfg, "hermes_notes", 10)?; + assert_eq!(stats.records_read, 2); + assert_eq!(stats.entities_upserted, 2); + assert_eq!(stats.observations_added, 2); + assert_eq!(stats.skipped_records, 0); + + let recalled = db.recall_context_entities(None, "charged twice routing", 5)?; + assert_eq!(recalled.len(), 2); + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "Auth callback recovery")); + assert!(recalled + .iter() + .any(|entry| entry.entity.name == "callback.ts")); + + Ok(()) + } + #[test] fn format_graph_sync_stats_human_renders_counts() { let text = format_graph_sync_stats_human( diff --git a/ecc2/src/session/manager.rs b/ecc2/src/session/manager.rs index 2d7254f7..093d7bf1 100644 --- a/ecc2/src/session/manager.rs +++ b/ecc2/src/session/manager.rs @@ -3253,6 +3253,7 @@ mod tests { default_agent_profile: None, agent_profiles: Default::default(), orchestration_templates: Default::default(), + memory_connectors: Default::default(), auto_dispatch_unread_handoffs: false, auto_dispatch_limit_per_session: 5, auto_create_worktrees: true, diff --git a/ecc2/src/tui/dashboard.rs b/ecc2/src/tui/dashboard.rs index b4cf65be..c0a013fa 100644 --- a/ecc2/src/tui/dashboard.rs +++ b/ecc2/src/tui/dashboard.rs @@ -14509,6 +14509,7 @@ diff --git a/src/lib.rs b/src/lib.rs default_agent_profile: None, agent_profiles: Default::default(), orchestration_templates: Default::default(), + memory_connectors: Default::default(), auto_dispatch_unread_handoffs: false, auto_dispatch_limit_per_session: 5, auto_create_worktrees: true,