diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index 7dcceb0d..b85ba625 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -507,7 +507,11 @@ enum GraphCommands { /// Import external memory from a configured connector ConnectorSync { /// Connector name from ecc2.toml - name: String, + #[arg(required_unless_present = "all", conflicts_with = "all")] + name: Option, + /// Sync every configured memory connector + #[arg(long, required_unless_present = "name")] + all: bool, /// Maximum non-empty records to process #[arg(long, default_value_t = 256)] limit: usize, @@ -574,6 +578,16 @@ struct GraphConnectorSyncStats { skipped_records: usize, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +struct GraphConnectorSyncReport { + connectors_synced: usize, + records_read: usize, + entities_upserted: usize, + observations_added: usize, + skipped_records: usize, + connectors: Vec, +} + #[derive(Debug, Clone, Default, Deserialize)] #[serde(default)] struct JsonlMemoryConnectorRecord { @@ -1409,12 +1423,29 @@ async fn main() -> Result<()> { ); } } - GraphCommands::ConnectorSync { name, limit, json } => { - let stats = sync_memory_connector(&db, &cfg, &name, limit)?; - if json { - println!("{}", serde_json::to_string_pretty(&stats)?); + GraphCommands::ConnectorSync { + name, + all, + limit, + json, + } => { + if all { + let report = sync_all_memory_connectors(&db, &cfg, limit)?; + if json { + println!("{}", serde_json::to_string_pretty(&report)?); + } else { + println!("{}", format_graph_connector_sync_report_human(&report)); + } } else { - println!("{}", format_graph_connector_sync_stats_human(&stats)); + let name = name.as_deref().ok_or_else(|| { + anyhow::anyhow!("connector name required unless --all is set") + })?; + let stats = sync_memory_connector(&db, &cfg, name, limit)?; + if json { + println!("{}", serde_json::to_string_pretty(&stats)?); + } else { + println!("{}", format_graph_connector_sync_stats_human(&stats)); + } } } GraphCommands::Recall { @@ -1624,6 +1655,26 @@ fn sync_memory_connector( } } +fn sync_all_memory_connectors( + db: &session::store::StateStore, + cfg: &config::Config, + limit: usize, +) -> Result { + let mut report = GraphConnectorSyncReport::default(); + + for name in cfg.memory_connectors.keys() { + let stats = sync_memory_connector(db, cfg, name, limit)?; + report.connectors_synced += 1; + report.records_read += stats.records_read; + report.entities_upserted += stats.entities_upserted; + report.observations_added += stats.observations_added; + report.skipped_records += stats.skipped_records; + report.connectors.push(stats); + } + + Ok(report) +} + fn sync_jsonl_memory_connector( db: &session::store::StateStore, name: &str, @@ -3210,6 +3261,33 @@ fn format_graph_connector_sync_stats_human(stats: &GraphConnectorSyncStats) -> S .join("\n") } +fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) -> String { + let mut lines = vec![ + format!( + "Memory connector sync complete: {} connector(s)", + report.connectors_synced + ), + format!("- records read {}", report.records_read), + format!("- entities upserted {}", report.entities_upserted), + format!("- observations added {}", report.observations_added), + format!("- skipped records {}", report.skipped_records), + ]; + + if !report.connectors.is_empty() { + lines.push(String::new()); + lines.push("Connectors:".to_string()); + for stats in &report.connectors { + lines.push(format!("- {}", stats.connector_name)); + lines.push(format!(" records read {}", stats.records_read)); + lines.push(format!(" entities upserted {}", stats.entities_upserted)); + lines.push(format!(" observations added {}", stats.observations_added)); + lines.push(format!(" skipped records {}", stats.skipped_records)); + } + } + + lines.join("\n") +} + fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String { let mut lines = vec![format_graph_entity_human(&detail.entity)]; lines.push(String::new()); @@ -5233,9 +5311,16 @@ mod tests { match cli.command { Some(Commands::Graph { - command: GraphCommands::ConnectorSync { name, limit, json }, + command: + GraphCommands::ConnectorSync { + name, + all, + limit, + json, + }, }) => { - assert_eq!(name, "hermes_notes"); + assert_eq!(name.as_deref(), Some("hermes_notes")); + assert!(!all); assert_eq!(limit, 32); assert!(json); } @@ -5243,6 +5328,38 @@ mod tests { } } + #[test] + fn cli_parses_graph_connector_sync_all_command() { + let cli = Cli::try_parse_from([ + "ecc", + "graph", + "connector-sync", + "--all", + "--limit", + "16", + "--json", + ]) + .expect("graph connector-sync --all should parse"); + + match cli.command { + Some(Commands::Graph { + command: + GraphCommands::ConnectorSync { + name, + all, + limit, + json, + }, + }) => { + assert_eq!(name, None); + assert!(all); + assert_eq!(limit, 16); + assert!(json); + } + _ => panic!("expected graph connector-sync --all subcommand"), + } + } + #[test] fn format_decisions_human_renders_details() { let text = format_decisions_human( @@ -5422,6 +5539,39 @@ mod tests { assert!(text.contains("- skipped records 1")); } + #[test] + fn format_graph_connector_sync_report_human_renders_totals_and_connectors() { + let text = format_graph_connector_sync_report_human(&GraphConnectorSyncReport { + connectors_synced: 2, + records_read: 7, + entities_upserted: 5, + observations_added: 5, + skipped_records: 2, + connectors: vec![ + GraphConnectorSyncStats { + connector_name: "hermes_notes".to_string(), + records_read: 4, + entities_upserted: 3, + observations_added: 3, + skipped_records: 1, + }, + GraphConnectorSyncStats { + connector_name: "workspace_note".to_string(), + records_read: 3, + entities_upserted: 2, + observations_added: 2, + skipped_records: 1, + }, + ], + }); + + assert!(text.contains("Memory connector sync complete: 2 connector(s)")); + assert!(text.contains("- records read 7")); + assert!(text.contains("Connectors:")); + assert!(text.contains("- hermes_notes")); + assert!(text.contains("- workspace_note")); + } + #[test] fn sync_memory_connector_imports_jsonl_observations() -> Result<()> { let tempdir = TestDir::new("graph-connector-sync")?; @@ -5774,6 +5924,91 @@ INVALID LINE Ok(()) } + #[test] + fn sync_all_memory_connectors_aggregates_results() -> Result<()> { + let tempdir = TestDir::new("graph-connector-sync-all")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + let now = chrono::Utc::now(); + db.insert_session(&session::Session { + id: "session-1".to_string(), + task: "memory import".to_string(), + project: "everything-claude-code".to_string(), + task_group: "memory".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: session::SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: session::SessionMetrics::default(), + })?; + + let jsonl_path = tempdir.path().join("hermes-memory.jsonl"); + fs::write( + &jsonl_path, + serde_json::json!({ + "entity_name": "Portal routing", + "summary": "Route reinstalls to portal before checkout", + }) + .to_string(), + )?; + + let markdown_path = tempdir.path().join("workspace-memory.md"); + fs::write( + &markdown_path, + r#"# Billing incident +Customer wiped setup and got charged twice after reinstalling. + +## Docs fix +Guide users to repair before reinstall. +"#, + )?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "hermes_notes".to_string(), + config::MemoryConnectorConfig::JsonlFile(config::MemoryConnectorJsonlFileConfig { + path: jsonl_path, + session_id: Some("latest".to_string()), + default_entity_type: Some("incident".to_string()), + default_observation_type: Some("external_note".to_string()), + }), + ); + cfg.memory_connectors.insert( + "workspace_note".to_string(), + config::MemoryConnectorConfig::MarkdownFile( + config::MemoryConnectorMarkdownFileConfig { + path: markdown_path, + session_id: Some("latest".to_string()), + default_entity_type: Some("note_section".to_string()), + default_observation_type: Some("external_note".to_string()), + }, + ), + ); + + let report = sync_all_memory_connectors(&db, &cfg, 10)?; + assert_eq!(report.connectors_synced, 2); + assert_eq!(report.records_read, 3); + assert_eq!(report.entities_upserted, 3); + assert_eq!(report.observations_added, 3); + assert_eq!(report.skipped_records, 0); + assert_eq!( + report + .connectors + .iter() + .map(|stats| stats.connector_name.as_str()) + .collect::>(), + vec!["hermes_notes", "workspace_note"] + ); + + let recalled = db.recall_context_entities(None, "charged twice portal reinstall", 10)?; + assert_eq!(recalled.len(), 3); + + Ok(()) + } + #[test] fn format_graph_sync_stats_human_renders_counts() { let text = format_graph_sync_stats_human(