From 9523575721cba6285c95d688f08736017e0d2615 Mon Sep 17 00:00:00 2001 From: Affaan Mustafa Date: Fri, 10 Apr 2026 06:44:05 -0700 Subject: [PATCH] feat: add ecc2 connector sync checkpoints --- ecc2/src/main.rs | 158 +++++++++++++++++++++++++++++++++++++- ecc2/src/session/store.rs | 50 ++++++++++++ 2 files changed, 204 insertions(+), 4 deletions(-) diff --git a/ecc2/src/main.rs b/ecc2/src/main.rs index e0b4ea36..982b5ef2 100644 --- a/ecc2/src/main.rs +++ b/ecc2/src/main.rs @@ -576,6 +576,7 @@ struct GraphConnectorSyncStats { entities_upserted: usize, observations_added: usize, skipped_records: usize, + skipped_unchanged_sources: usize, } #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] @@ -585,6 +586,7 @@ struct GraphConnectorSyncReport { entities_upserted: usize, observations_added: usize, skipped_records: usize, + skipped_unchanged_sources: usize, connectors: Vec, } @@ -1672,6 +1674,7 @@ fn sync_all_memory_connectors( report.entities_upserted += stats.entities_upserted; report.observations_added += stats.observations_added; report.skipped_records += stats.skipped_records; + report.skipped_unchanged_sources += stats.skipped_unchanged_sources; report.connectors.push(stats); } @@ -1696,8 +1699,17 @@ fn sync_jsonl_memory_connector( .as_deref() .map(|value| resolve_session_id(db, value)) .transpose()?; + let source_path = settings.path.display().to_string(); + let signature = connector_source_signature(&settings.path)?; + if db.connector_source_is_unchanged(name, &source_path, &signature)? { + return Ok(GraphConnectorSyncStats { + connector_name: name.to_string(), + skipped_unchanged_sources: 1, + ..Default::default() + }); + } - sync_jsonl_memory_reader( + let stats = sync_jsonl_memory_reader( db, name, reader, @@ -1705,7 +1717,11 @@ fn sync_jsonl_memory_connector( settings.default_entity_type.as_deref(), settings.default_observation_type.as_deref(), limit, - ) + )?; + if stats.records_read < limit { + db.upsert_connector_source_checkpoint(name, &source_path, &signature)?; + } + Ok(stats) } fn sync_jsonl_directory_memory_connector( @@ -1741,9 +1757,16 @@ fn sync_jsonl_directory_memory_connector( if remaining == 0 { break; } + let source_path = path.display().to_string(); + let signature = connector_source_signature(&path)?; + if db.connector_source_is_unchanged(name, &source_path, &signature)? { + stats.skipped_unchanged_sources += 1; + continue; + } let file = File::open(&path) .with_context(|| format!("open memory connector file {}", path.display()))?; let reader = BufReader::new(file); + let remaining_before = remaining; let file_stats = sync_jsonl_memory_reader( db, name, @@ -1758,6 +1781,10 @@ fn sync_jsonl_directory_memory_connector( stats.entities_upserted += file_stats.entities_upserted; stats.observations_added += file_stats.observations_added; stats.skipped_records += file_stats.skipped_records; + stats.skipped_unchanged_sources += file_stats.skipped_unchanged_sources; + if file_stats.records_read < remaining_before { + db.upsert_connector_source_checkpoint(name, &source_path, &signature)?; + } } Ok(stats) @@ -1825,7 +1852,16 @@ fn sync_markdown_memory_connector( .as_deref() .map(|value| resolve_session_id(db, value)) .transpose()?; - sync_markdown_memory_path( + let source_path = settings.path.display().to_string(); + let signature = connector_source_signature(&settings.path)?; + if db.connector_source_is_unchanged(name, &source_path, &signature)? { + return Ok(GraphConnectorSyncStats { + connector_name: name.to_string(), + skipped_unchanged_sources: 1, + ..Default::default() + }); + } + let stats = sync_markdown_memory_path( db, name, "markdown_file", @@ -1834,7 +1870,11 @@ fn sync_markdown_memory_connector( settings.default_entity_type.as_deref(), settings.default_observation_type.as_deref(), limit, - ) + )?; + if stats.records_read < limit { + db.upsert_connector_source_checkpoint(name, &source_path, &signature)?; + } + Ok(stats) } fn sync_markdown_directory_memory_connector( @@ -1870,6 +1910,13 @@ fn sync_markdown_directory_memory_connector( if remaining == 0 { break; } + let source_path = path.display().to_string(); + let signature = connector_source_signature(&path)?; + if db.connector_source_is_unchanged(name, &source_path, &signature)? { + stats.skipped_unchanged_sources += 1; + continue; + } + let remaining_before = remaining; let file_stats = sync_markdown_memory_path( db, name, @@ -1885,6 +1932,10 @@ fn sync_markdown_directory_memory_connector( stats.entities_upserted += file_stats.entities_upserted; stats.observations_added += file_stats.observations_added; stats.skipped_records += file_stats.skipped_records; + stats.skipped_unchanged_sources += file_stats.skipped_unchanged_sources; + if file_stats.records_read < remaining_before { + db.upsert_connector_source_checkpoint(name, &source_path, &signature)?; + } } Ok(stats) @@ -1960,6 +2011,15 @@ fn sync_dotenv_memory_connector( .as_deref() .map(|value| resolve_session_id(db, value)) .transpose()?; + let source_path = settings.path.display().to_string(); + let signature = connector_source_signature(&settings.path)?; + if db.connector_source_is_unchanged(name, &source_path, &signature)? { + return Ok(GraphConnectorSyncStats { + connector_name: name.to_string(), + skipped_unchanged_sources: 1, + ..Default::default() + }); + } let entries = parse_dotenv_memory_entries(&settings.path, &body, settings, limit); let mut stats = GraphConnectorSyncStats { connector_name: name.to_string(), @@ -1988,6 +2048,10 @@ fn sync_dotenv_memory_connector( )?; } + if stats.records_read < limit { + db.upsert_connector_source_checkpoint(name, &source_path, &signature)?; + } + Ok(stats) } @@ -2077,6 +2141,18 @@ fn collect_markdown_paths(root: &Path, recurse: bool) -> Result> { Ok(paths) } +fn connector_source_signature(path: &Path) -> Result { + let metadata = std::fs::metadata(path) + .with_context(|| format!("read memory connector metadata {}", path.display()))?; + let modified = metadata + .modified() + .ok() + .and_then(|timestamp| timestamp.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|duration| duration.as_nanos()) + .unwrap_or(0); + Ok(format!("{}:{modified}", metadata.len())) +} + fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec) -> Result<()> { for entry in std::fs::read_dir(root) .with_context(|| format!("read memory connector directory {}", root.display()))? @@ -3368,6 +3444,10 @@ fn format_graph_connector_sync_stats_human(stats: &GraphConnectorSyncStats) -> S format!("- entities upserted {}", stats.entities_upserted), format!("- observations added {}", stats.observations_added), format!("- skipped records {}", stats.skipped_records), + format!( + "- skipped unchanged sources {}", + stats.skipped_unchanged_sources + ), ] .join("\n") } @@ -3382,6 +3462,10 @@ fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) - format!("- entities upserted {}", report.entities_upserted), format!("- observations added {}", report.observations_added), format!("- skipped records {}", report.skipped_records), + format!( + "- skipped unchanged sources {}", + report.skipped_unchanged_sources + ), ]; if !report.connectors.is_empty() { @@ -3393,6 +3477,10 @@ fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) - lines.push(format!(" entities upserted {}", stats.entities_upserted)); lines.push(format!(" observations added {}", stats.observations_added)); lines.push(format!(" skipped records {}", stats.skipped_records)); + lines.push(format!( + " skipped unchanged sources {}", + stats.skipped_unchanged_sources + )); } } @@ -5641,6 +5729,7 @@ mod tests { entities_upserted: 3, observations_added: 3, skipped_records: 1, + skipped_unchanged_sources: 2, }); assert!(text.contains("Memory connector sync complete: hermes_notes")); @@ -5648,6 +5737,7 @@ mod tests { assert!(text.contains("- entities upserted 3")); assert!(text.contains("- observations added 3")); assert!(text.contains("- skipped records 1")); + assert!(text.contains("- skipped unchanged sources 2")); } #[test] @@ -5658,6 +5748,7 @@ mod tests { entities_upserted: 5, observations_added: 5, skipped_records: 2, + skipped_unchanged_sources: 3, connectors: vec![ GraphConnectorSyncStats { connector_name: "hermes_notes".to_string(), @@ -5665,6 +5756,7 @@ mod tests { entities_upserted: 3, observations_added: 3, skipped_records: 1, + skipped_unchanged_sources: 2, }, GraphConnectorSyncStats { connector_name: "workspace_note".to_string(), @@ -5672,15 +5764,18 @@ mod tests { entities_upserted: 2, observations_added: 2, skipped_records: 1, + skipped_unchanged_sources: 1, }, ], }); assert!(text.contains("Memory connector sync complete: 2 connector(s)")); assert!(text.contains("- records read 7")); + assert!(text.contains("- skipped unchanged sources 3")); assert!(text.contains("Connectors:")); assert!(text.contains("- hermes_notes")); assert!(text.contains("- workspace_note")); + assert!(text.contains(" skipped unchanged sources 2")); } #[test] @@ -5756,6 +5851,61 @@ mod tests { Ok(()) } + #[test] + fn sync_memory_connector_skips_unchanged_jsonl_sources() -> Result<()> { + let tempdir = TestDir::new("graph-connector-sync-unchanged")?; + let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?; + let now = chrono::Utc::now(); + db.insert_session(&session::Session { + id: "session-1".to_string(), + task: "recovery incident".to_string(), + project: "ecc-tools".to_string(), + task_group: "incident".to_string(), + agent_type: "claude".to_string(), + working_dir: PathBuf::from("/tmp"), + state: session::SessionState::Running, + pid: None, + worktree: None, + created_at: now, + updated_at: now, + last_heartbeat_at: now, + metrics: session::SessionMetrics::default(), + })?; + + let connector_path = tempdir.path().join("hermes-memory.jsonl"); + fs::write( + &connector_path, + serde_json::json!({ + "entity_name": "Portal routing", + "summary": "Route reinstalls to portal before checkout", + }) + .to_string(), + )?; + + let mut cfg = config::Config::default(); + cfg.memory_connectors.insert( + "hermes_notes".to_string(), + config::MemoryConnectorConfig::JsonlFile(config::MemoryConnectorJsonlFileConfig { + path: connector_path, + session_id: Some("latest".to_string()), + default_entity_type: Some("incident".to_string()), + default_observation_type: Some("external_note".to_string()), + }), + ); + + let first = sync_memory_connector(&db, &cfg, "hermes_notes", 10)?; + assert_eq!(first.records_read, 1); + assert_eq!(first.skipped_unchanged_sources, 0); + + let second = sync_memory_connector(&db, &cfg, "hermes_notes", 10)?; + assert_eq!(second.records_read, 0); + assert_eq!(second.entities_upserted, 0); + assert_eq!(second.observations_added, 0); + assert_eq!(second.skipped_unchanged_sources, 1); + + Ok(()) + } + #[test] fn sync_memory_connector_imports_jsonl_directory_observations() -> Result<()> { let tempdir = TestDir::new("graph-connector-sync-dir")?; diff --git a/ecc2/src/session/store.rs b/ecc2/src/session/store.rs index 31d93ce6..356131d9 100644 --- a/ecc2/src/session/store.rs +++ b/ecc2/src/session/store.rs @@ -272,6 +272,14 @@ impl StateStore { created_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS context_graph_connector_checkpoints ( + connector_name TEXT NOT NULL, + source_path TEXT NOT NULL, + source_signature TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (connector_name, source_path) + ); + CREATE TABLE IF NOT EXISTS pending_worktree_queue ( session_id TEXT PRIMARY KEY REFERENCES sessions(id) ON DELETE CASCADE, repo_root TEXT NOT NULL, @@ -334,6 +342,8 @@ impl StateStore { ON context_graph_relations(to_entity_id, created_at, id); CREATE INDEX IF NOT EXISTS idx_context_graph_observations_entity ON context_graph_observations(entity_id, created_at, id); + CREATE INDEX IF NOT EXISTS idx_context_graph_connector_checkpoints_updated_at + ON context_graph_connector_checkpoints(updated_at, connector_name, source_path); CREATE INDEX IF NOT EXISTS idx_conflict_incidents_sessions ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at); CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at @@ -2304,6 +2314,46 @@ impl StateStore { Ok(entries) } + pub fn connector_source_is_unchanged( + &self, + connector_name: &str, + source_path: &str, + source_signature: &str, + ) -> Result { + let stored_signature = self + .conn + .query_row( + "SELECT source_signature + FROM context_graph_connector_checkpoints + WHERE connector_name = ?1 AND source_path = ?2", + rusqlite::params![connector_name, source_path], + |row| row.get::<_, String>(0), + ) + .optional()?; + Ok(stored_signature + .as_deref() + .is_some_and(|stored| stored == source_signature)) + } + + pub fn upsert_connector_source_checkpoint( + &self, + connector_name: &str, + source_path: &str, + source_signature: &str, + ) -> Result<()> { + let now = chrono::Utc::now().to_rfc3339(); + self.conn.execute( + "INSERT INTO context_graph_connector_checkpoints ( + connector_name, source_path, source_signature, updated_at + ) VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(connector_name, source_path) + DO UPDATE SET source_signature = excluded.source_signature, + updated_at = excluded.updated_at", + rusqlite::params![connector_name, source_path, source_signature, now], + )?; + Ok(()) + } + fn compact_context_graph_observations( &self, session_id: Option<&str>,