feat: add ecc2 bulk memory connector sync

This commit is contained in:
Affaan Mustafa 2026-04-10 06:34:40 -07:00
parent 966af37f89
commit 5258a75382

View File

@ -507,7 +507,11 @@ enum GraphCommands {
/// Import external memory from a configured connector
ConnectorSync {
/// Connector name from ecc2.toml
name: String,
#[arg(required_unless_present = "all", conflicts_with = "all")]
name: Option<String>,
/// Sync every configured memory connector
#[arg(long, required_unless_present = "name")]
all: bool,
/// Maximum non-empty records to process
#[arg(long, default_value_t = 256)]
limit: usize,
@ -574,6 +578,16 @@ struct GraphConnectorSyncStats {
skipped_records: usize,
}
/// Aggregate outcome of a bulk memory connector sync.
///
/// Built by `sync_all_memory_connectors`: every counter below is the sum of the
/// matching field over the entries in `connectors`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
struct GraphConnectorSyncReport {
/// Number of connectors that were synced.
connectors_synced: usize,
/// Sum of `records_read` over all synced connectors.
records_read: usize,
/// Sum of `entities_upserted` over all synced connectors.
entities_upserted: usize,
/// Sum of `observations_added` over all synced connectors.
observations_added: usize,
/// Sum of `skipped_records` over all synced connectors.
skipped_records: usize,
/// Per-connector stats, in the order the connectors were synced.
connectors: Vec<GraphConnectorSyncStats>,
}
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
struct JsonlMemoryConnectorRecord {
@ -1409,12 +1423,29 @@ async fn main() -> Result<()> {
);
}
}
GraphCommands::ConnectorSync { name, limit, json } => {
let stats = sync_memory_connector(&db, &cfg, &name, limit)?;
if json {
println!("{}", serde_json::to_string_pretty(&stats)?);
GraphCommands::ConnectorSync {
name,
all,
limit,
json,
} => {
if all {
let report = sync_all_memory_connectors(&db, &cfg, limit)?;
if json {
println!("{}", serde_json::to_string_pretty(&report)?);
} else {
println!("{}", format_graph_connector_sync_report_human(&report));
}
} else {
println!("{}", format_graph_connector_sync_stats_human(&stats));
let name = name.as_deref().ok_or_else(|| {
anyhow::anyhow!("connector name required unless --all is set")
})?;
let stats = sync_memory_connector(&db, &cfg, name, limit)?;
if json {
println!("{}", serde_json::to_string_pretty(&stats)?);
} else {
println!("{}", format_graph_connector_sync_stats_human(&stats));
}
}
}
GraphCommands::Recall {
@ -1624,6 +1655,26 @@ fn sync_memory_connector(
}
}
fn sync_all_memory_connectors(
db: &session::store::StateStore,
cfg: &config::Config,
limit: usize,
) -> Result<GraphConnectorSyncReport> {
let mut report = GraphConnectorSyncReport::default();
for name in cfg.memory_connectors.keys() {
let stats = sync_memory_connector(db, cfg, name, limit)?;
report.connectors_synced += 1;
report.records_read += stats.records_read;
report.entities_upserted += stats.entities_upserted;
report.observations_added += stats.observations_added;
report.skipped_records += stats.skipped_records;
report.connectors.push(stats);
}
Ok(report)
}
fn sync_jsonl_memory_connector(
db: &session::store::StateStore,
name: &str,
@ -3210,6 +3261,33 @@ fn format_graph_connector_sync_stats_human(stats: &GraphConnectorSyncStats) -> S
.join("\n")
}
/// Renders a bulk connector sync report as newline-separated, human-readable text.
///
/// The output starts with a headline and the four aggregate counters. When the
/// report contains per-connector stats, a blank line and a `Connectors:` section
/// follow, listing each connector's name and its own four counters.
fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) -> String {
    let mut rendered = vec![
        format!(
            "Memory connector sync complete: {} connector(s)",
            report.connectors_synced
        ),
        format!("- records read {}", report.records_read),
        format!("- entities upserted {}", report.entities_upserted),
        format!("- observations added {}", report.observations_added),
        format!("- skipped records {}", report.skipped_records),
    ];

    // Nothing per-connector to show: the totals are the whole report.
    if report.connectors.is_empty() {
        return rendered.join("\n");
    }

    rendered.push(String::new());
    rendered.push("Connectors:".to_string());
    rendered.extend(report.connectors.iter().flat_map(|entry| {
        vec![
            format!("- {}", entry.connector_name),
            format!(" records read {}", entry.records_read),
            format!(" entities upserted {}", entry.entities_upserted),
            format!(" observations added {}", entry.observations_added),
            format!(" skipped records {}", entry.skipped_records),
        ]
    }));

    rendered.join("\n")
}
fn format_graph_entity_detail_human(detail: &session::ContextGraphEntityDetail) -> String {
let mut lines = vec![format_graph_entity_human(&detail.entity)];
lines.push(String::new());
@ -5233,9 +5311,16 @@ mod tests {
match cli.command {
Some(Commands::Graph {
command: GraphCommands::ConnectorSync { name, limit, json },
command:
GraphCommands::ConnectorSync {
name,
all,
limit,
json,
},
}) => {
assert_eq!(name, "hermes_notes");
assert_eq!(name.as_deref(), Some("hermes_notes"));
assert!(!all);
assert_eq!(limit, 32);
assert!(json);
}
@ -5243,6 +5328,38 @@ mod tests {
}
}
#[test]
fn cli_parses_graph_connector_sync_all_command() {
    // Bulk form of the command: `--all` is given and no connector name is supplied.
    let argv = [
        "ecc",
        "graph",
        "connector-sync",
        "--all",
        "--limit",
        "16",
        "--json",
    ];
    let parsed = Cli::try_parse_from(argv).expect("graph connector-sync --all should parse");

    // Pull the parsed fields out first so the assertions read as a flat list.
    let (name, all, limit, json) = match parsed.command {
        Some(Commands::Graph {
            command:
                GraphCommands::ConnectorSync {
                    name,
                    all,
                    limit,
                    json,
                },
        }) => (name, all, limit, json),
        _ => panic!("expected graph connector-sync --all subcommand"),
    };

    assert_eq!(name, None);
    assert!(all);
    assert_eq!(limit, 16);
    assert!(json);
}
#[test]
fn format_decisions_human_renders_details() {
let text = format_decisions_human(
@ -5422,6 +5539,39 @@ mod tests {
assert!(text.contains("- skipped records 1"));
}
#[test]
fn format_graph_connector_sync_report_human_renders_totals_and_connectors() {
    // Two per-connector entries whose counters add up to the report totals below.
    let hermes = GraphConnectorSyncStats {
        connector_name: "hermes_notes".to_string(),
        records_read: 4,
        entities_upserted: 3,
        observations_added: 3,
        skipped_records: 1,
    };
    let workspace = GraphConnectorSyncStats {
        connector_name: "workspace_note".to_string(),
        records_read: 3,
        entities_upserted: 2,
        observations_added: 2,
        skipped_records: 1,
    };
    let report = GraphConnectorSyncReport {
        connectors_synced: 2,
        records_read: 7,
        entities_upserted: 5,
        observations_added: 5,
        skipped_records: 2,
        connectors: vec![hermes, workspace],
    };

    let text = format_graph_connector_sync_report_human(&report);

    // Headline and aggregate counters.
    assert!(text.contains("Memory connector sync complete: 2 connector(s)"));
    assert!(text.contains("- records read 7"));
    // Per-connector section with one entry per connector.
    assert!(text.contains("Connectors:"));
    assert!(text.contains("- hermes_notes"));
    assert!(text.contains("- workspace_note"));
}
#[test]
fn sync_memory_connector_imports_jsonl_observations() -> Result<()> {
let tempdir = TestDir::new("graph-connector-sync")?;
@ -5774,6 +5924,91 @@ INVALID LINE
Ok(())
}
/// End-to-end check of `sync_all_memory_connectors`: one JSONL connector and one
/// markdown connector are configured, synced in a single call, and the
/// aggregated report plus the recallable entities are verified.
#[test]
fn sync_all_memory_connectors_aggregates_results() -> Result<()> {
// Fresh temp directory holding the state database and both connector files.
let tempdir = TestDir::new("graph-connector-sync-all")?;
let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;
let now = chrono::Utc::now();
// A single running session is registered before syncing.
// NOTE(review): both connectors use `session_id: "latest"`, which presumably
// resolves to this session — confirm in `sync_memory_connector`.
db.insert_session(&session::Session {
id: "session-1".to_string(),
task: "memory import".to_string(),
project: "everything-claude-code".to_string(),
task_group: "memory".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: session::SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: session::SessionMetrics::default(),
})?;
// JSONL fixture: a single JSON object, i.e. one record.
let jsonl_path = tempdir.path().join("hermes-memory.jsonl");
fs::write(
&jsonl_path,
serde_json::json!({
"entity_name": "Portal routing",
"summary": "Route reinstalls to portal before checkout",
})
.to_string(),
)?;
// Markdown fixture with two headings.
// NOTE(review): the totals asserted below assume each heading becomes one
// record — confirm against the markdown connector implementation.
let markdown_path = tempdir.path().join("workspace-memory.md");
fs::write(
&markdown_path,
r#"# Billing incident
Customer wiped setup and got charged twice after reinstalling.
## Docs fix
Guide users to repair before reinstall.
"#,
)?;
// Register both connectors on an otherwise default config.
let mut cfg = config::Config::default();
cfg.memory_connectors.insert(
"hermes_notes".to_string(),
config::MemoryConnectorConfig::JsonlFile(config::MemoryConnectorJsonlFileConfig {
path: jsonl_path,
session_id: Some("latest".to_string()),
default_entity_type: Some("incident".to_string()),
default_observation_type: Some("external_note".to_string()),
}),
);
cfg.memory_connectors.insert(
"workspace_note".to_string(),
config::MemoryConnectorConfig::MarkdownFile(
config::MemoryConnectorMarkdownFileConfig {
path: markdown_path,
session_id: Some("latest".to_string()),
default_entity_type: Some("note_section".to_string()),
default_observation_type: Some("external_note".to_string()),
},
),
);
// Sync everything with a per-connector limit of 10.
let report = sync_all_memory_connectors(&db, &cfg, 10)?;
// Totals across both connectors: 3 records read, nothing skipped.
assert_eq!(report.connectors_synced, 2);
assert_eq!(report.records_read, 3);
assert_eq!(report.entities_upserted, 3);
assert_eq!(report.observations_added, 3);
assert_eq!(report.skipped_records, 0);
// Per-connector stats are expected in this exact order.
// NOTE(review): this relies on the key iteration order of
// `cfg.memory_connectors`; it is only deterministic if that map is ordered
// (e.g. a BTreeMap) — confirm the map type in `config::Config`.
assert_eq!(
report
.connectors
.iter()
.map(|stats| stats.connector_name.as_str())
.collect::<Vec<_>>(),
vec!["hermes_notes", "workspace_note"]
);
// All three imported entities should be returned for a query that mixes
// terms from both fixtures.
let recalled = db.recall_context_entities(None, "charged twice portal reinstall", 10)?;
assert_eq!(recalled.len(), 3);
Ok(())
}
#[test]
fn format_graph_sync_stats_human_renders_counts() {
let text = format_graph_sync_stats_human(