feat: add ecc2 connector sync checkpoints

This commit is contained in:
Affaan Mustafa 2026-04-10 06:44:05 -07:00
parent 406722b5ef
commit 9523575721
2 changed files with 204 additions and 4 deletions

View File

@ -576,6 +576,7 @@ struct GraphConnectorSyncStats {
entities_upserted: usize,
observations_added: usize,
skipped_records: usize,
skipped_unchanged_sources: usize,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
@ -585,6 +586,7 @@ struct GraphConnectorSyncReport {
entities_upserted: usize,
observations_added: usize,
skipped_records: usize,
skipped_unchanged_sources: usize,
connectors: Vec<GraphConnectorSyncStats>,
}
@ -1672,6 +1674,7 @@ fn sync_all_memory_connectors(
report.entities_upserted += stats.entities_upserted;
report.observations_added += stats.observations_added;
report.skipped_records += stats.skipped_records;
report.skipped_unchanged_sources += stats.skipped_unchanged_sources;
report.connectors.push(stats);
}
@ -1696,8 +1699,17 @@ fn sync_jsonl_memory_connector(
.as_deref()
.map(|value| resolve_session_id(db, value))
.transpose()?;
let source_path = settings.path.display().to_string();
let signature = connector_source_signature(&settings.path)?;
if db.connector_source_is_unchanged(name, &source_path, &signature)? {
return Ok(GraphConnectorSyncStats {
connector_name: name.to_string(),
skipped_unchanged_sources: 1,
..Default::default()
});
}
sync_jsonl_memory_reader(
let stats = sync_jsonl_memory_reader(
db,
name,
reader,
@ -1705,7 +1717,11 @@ fn sync_jsonl_memory_connector(
settings.default_entity_type.as_deref(),
settings.default_observation_type.as_deref(),
limit,
)
)?;
if stats.records_read < limit {
db.upsert_connector_source_checkpoint(name, &source_path, &signature)?;
}
Ok(stats)
}
fn sync_jsonl_directory_memory_connector(
@ -1741,9 +1757,16 @@ fn sync_jsonl_directory_memory_connector(
if remaining == 0 {
break;
}
let source_path = path.display().to_string();
let signature = connector_source_signature(&path)?;
if db.connector_source_is_unchanged(name, &source_path, &signature)? {
stats.skipped_unchanged_sources += 1;
continue;
}
let file = File::open(&path)
.with_context(|| format!("open memory connector file {}", path.display()))?;
let reader = BufReader::new(file);
let remaining_before = remaining;
let file_stats = sync_jsonl_memory_reader(
db,
name,
@ -1758,6 +1781,10 @@ fn sync_jsonl_directory_memory_connector(
stats.entities_upserted += file_stats.entities_upserted;
stats.observations_added += file_stats.observations_added;
stats.skipped_records += file_stats.skipped_records;
stats.skipped_unchanged_sources += file_stats.skipped_unchanged_sources;
if file_stats.records_read < remaining_before {
db.upsert_connector_source_checkpoint(name, &source_path, &signature)?;
}
}
Ok(stats)
@ -1825,7 +1852,16 @@ fn sync_markdown_memory_connector(
.as_deref()
.map(|value| resolve_session_id(db, value))
.transpose()?;
sync_markdown_memory_path(
let source_path = settings.path.display().to_string();
let signature = connector_source_signature(&settings.path)?;
if db.connector_source_is_unchanged(name, &source_path, &signature)? {
return Ok(GraphConnectorSyncStats {
connector_name: name.to_string(),
skipped_unchanged_sources: 1,
..Default::default()
});
}
let stats = sync_markdown_memory_path(
db,
name,
"markdown_file",
@ -1834,7 +1870,11 @@ fn sync_markdown_memory_connector(
settings.default_entity_type.as_deref(),
settings.default_observation_type.as_deref(),
limit,
)
)?;
if stats.records_read < limit {
db.upsert_connector_source_checkpoint(name, &source_path, &signature)?;
}
Ok(stats)
}
fn sync_markdown_directory_memory_connector(
@ -1870,6 +1910,13 @@ fn sync_markdown_directory_memory_connector(
if remaining == 0 {
break;
}
let source_path = path.display().to_string();
let signature = connector_source_signature(&path)?;
if db.connector_source_is_unchanged(name, &source_path, &signature)? {
stats.skipped_unchanged_sources += 1;
continue;
}
let remaining_before = remaining;
let file_stats = sync_markdown_memory_path(
db,
name,
@ -1885,6 +1932,10 @@ fn sync_markdown_directory_memory_connector(
stats.entities_upserted += file_stats.entities_upserted;
stats.observations_added += file_stats.observations_added;
stats.skipped_records += file_stats.skipped_records;
stats.skipped_unchanged_sources += file_stats.skipped_unchanged_sources;
if file_stats.records_read < remaining_before {
db.upsert_connector_source_checkpoint(name, &source_path, &signature)?;
}
}
Ok(stats)
@ -1960,6 +2011,15 @@ fn sync_dotenv_memory_connector(
.as_deref()
.map(|value| resolve_session_id(db, value))
.transpose()?;
let source_path = settings.path.display().to_string();
let signature = connector_source_signature(&settings.path)?;
if db.connector_source_is_unchanged(name, &source_path, &signature)? {
return Ok(GraphConnectorSyncStats {
connector_name: name.to_string(),
skipped_unchanged_sources: 1,
..Default::default()
});
}
let entries = parse_dotenv_memory_entries(&settings.path, &body, settings, limit);
let mut stats = GraphConnectorSyncStats {
connector_name: name.to_string(),
@ -1988,6 +2048,10 @@ fn sync_dotenv_memory_connector(
)?;
}
if stats.records_read < limit {
db.upsert_connector_source_checkpoint(name, &source_path, &signature)?;
}
Ok(stats)
}
@ -2077,6 +2141,18 @@ fn collect_markdown_paths(root: &Path, recurse: bool) -> Result<Vec<PathBuf>> {
Ok(paths)
}
fn connector_source_signature(path: &Path) -> Result<String> {
let metadata = std::fs::metadata(path)
.with_context(|| format!("read memory connector metadata {}", path.display()))?;
let modified = metadata
.modified()
.ok()
.and_then(|timestamp| timestamp.duration_since(std::time::UNIX_EPOCH).ok())
.map(|duration| duration.as_nanos())
.unwrap_or(0);
Ok(format!("{}:{modified}", metadata.len()))
}
fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec<PathBuf>) -> Result<()> {
for entry in std::fs::read_dir(root)
.with_context(|| format!("read memory connector directory {}", root.display()))?
@ -3368,6 +3444,10 @@ fn format_graph_connector_sync_stats_human(stats: &GraphConnectorSyncStats) -> S
format!("- entities upserted {}", stats.entities_upserted),
format!("- observations added {}", stats.observations_added),
format!("- skipped records {}", stats.skipped_records),
format!(
"- skipped unchanged sources {}",
stats.skipped_unchanged_sources
),
]
.join("\n")
}
@ -3382,6 +3462,10 @@ fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) -
format!("- entities upserted {}", report.entities_upserted),
format!("- observations added {}", report.observations_added),
format!("- skipped records {}", report.skipped_records),
format!(
"- skipped unchanged sources {}",
report.skipped_unchanged_sources
),
];
if !report.connectors.is_empty() {
@ -3393,6 +3477,10 @@ fn format_graph_connector_sync_report_human(report: &GraphConnectorSyncReport) -
lines.push(format!(" entities upserted {}", stats.entities_upserted));
lines.push(format!(" observations added {}", stats.observations_added));
lines.push(format!(" skipped records {}", stats.skipped_records));
lines.push(format!(
" skipped unchanged sources {}",
stats.skipped_unchanged_sources
));
}
}
@ -5641,6 +5729,7 @@ mod tests {
entities_upserted: 3,
observations_added: 3,
skipped_records: 1,
skipped_unchanged_sources: 2,
});
assert!(text.contains("Memory connector sync complete: hermes_notes"));
@ -5648,6 +5737,7 @@ mod tests {
assert!(text.contains("- entities upserted 3"));
assert!(text.contains("- observations added 3"));
assert!(text.contains("- skipped records 1"));
assert!(text.contains("- skipped unchanged sources 2"));
}
#[test]
@ -5658,6 +5748,7 @@ mod tests {
entities_upserted: 5,
observations_added: 5,
skipped_records: 2,
skipped_unchanged_sources: 3,
connectors: vec![
GraphConnectorSyncStats {
connector_name: "hermes_notes".to_string(),
@ -5665,6 +5756,7 @@ mod tests {
entities_upserted: 3,
observations_added: 3,
skipped_records: 1,
skipped_unchanged_sources: 2,
},
GraphConnectorSyncStats {
connector_name: "workspace_note".to_string(),
@ -5672,15 +5764,18 @@ mod tests {
entities_upserted: 2,
observations_added: 2,
skipped_records: 1,
skipped_unchanged_sources: 1,
},
],
});
assert!(text.contains("Memory connector sync complete: 2 connector(s)"));
assert!(text.contains("- records read 7"));
assert!(text.contains("- skipped unchanged sources 3"));
assert!(text.contains("Connectors:"));
assert!(text.contains("- hermes_notes"));
assert!(text.contains("- workspace_note"));
assert!(text.contains(" skipped unchanged sources 2"));
}
#[test]
@ -5756,6 +5851,61 @@ mod tests {
Ok(())
}
#[test]
fn sync_memory_connector_skips_unchanged_jsonl_sources() -> Result<()> {
let tempdir = TestDir::new("graph-connector-sync-unchanged")?;
let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;
let now = chrono::Utc::now();
db.insert_session(&session::Session {
id: "session-1".to_string(),
task: "recovery incident".to_string(),
project: "ecc-tools".to_string(),
task_group: "incident".to_string(),
agent_type: "claude".to_string(),
working_dir: PathBuf::from("/tmp"),
state: session::SessionState::Running,
pid: None,
worktree: None,
created_at: now,
updated_at: now,
last_heartbeat_at: now,
metrics: session::SessionMetrics::default(),
})?;
let connector_path = tempdir.path().join("hermes-memory.jsonl");
fs::write(
&connector_path,
serde_json::json!({
"entity_name": "Portal routing",
"summary": "Route reinstalls to portal before checkout",
})
.to_string(),
)?;
let mut cfg = config::Config::default();
cfg.memory_connectors.insert(
"hermes_notes".to_string(),
config::MemoryConnectorConfig::JsonlFile(config::MemoryConnectorJsonlFileConfig {
path: connector_path,
session_id: Some("latest".to_string()),
default_entity_type: Some("incident".to_string()),
default_observation_type: Some("external_note".to_string()),
}),
);
let first = sync_memory_connector(&db, &cfg, "hermes_notes", 10)?;
assert_eq!(first.records_read, 1);
assert_eq!(first.skipped_unchanged_sources, 0);
let second = sync_memory_connector(&db, &cfg, "hermes_notes", 10)?;
assert_eq!(second.records_read, 0);
assert_eq!(second.entities_upserted, 0);
assert_eq!(second.observations_added, 0);
assert_eq!(second.skipped_unchanged_sources, 1);
Ok(())
}
#[test]
fn sync_memory_connector_imports_jsonl_directory_observations() -> Result<()> {
let tempdir = TestDir::new("graph-connector-sync-dir")?;

View File

@ -272,6 +272,14 @@ impl StateStore {
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS context_graph_connector_checkpoints (
connector_name TEXT NOT NULL,
source_path TEXT NOT NULL,
source_signature TEXT NOT NULL,
updated_at TEXT NOT NULL,
PRIMARY KEY (connector_name, source_path)
);
CREATE TABLE IF NOT EXISTS pending_worktree_queue (
session_id TEXT PRIMARY KEY REFERENCES sessions(id) ON DELETE CASCADE,
repo_root TEXT NOT NULL,
@ -334,6 +342,8 @@ impl StateStore {
ON context_graph_relations(to_entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_observations_entity
ON context_graph_observations(entity_id, created_at, id);
CREATE INDEX IF NOT EXISTS idx_context_graph_connector_checkpoints_updated_at
ON context_graph_connector_checkpoints(updated_at, connector_name, source_path);
CREATE INDEX IF NOT EXISTS idx_conflict_incidents_sessions
ON conflict_incidents(first_session_id, second_session_id, resolved_at, updated_at);
CREATE INDEX IF NOT EXISTS idx_pending_worktree_queue_requested_at
@ -2304,6 +2314,46 @@ impl StateStore {
Ok(entries)
}
pub fn connector_source_is_unchanged(
&self,
connector_name: &str,
source_path: &str,
source_signature: &str,
) -> Result<bool> {
let stored_signature = self
.conn
.query_row(
"SELECT source_signature
FROM context_graph_connector_checkpoints
WHERE connector_name = ?1 AND source_path = ?2",
rusqlite::params![connector_name, source_path],
|row| row.get::<_, String>(0),
)
.optional()?;
Ok(stored_signature
.as_deref()
.is_some_and(|stored| stored == source_signature))
}
pub fn upsert_connector_source_checkpoint(
&self,
connector_name: &str,
source_path: &str,
source_signature: &str,
) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
self.conn.execute(
"INSERT INTO context_graph_connector_checkpoints (
connector_name, source_path, source_signature, updated_at
) VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(connector_name, source_path)
DO UPDATE SET source_signature = excluded.source_signature,
updated_at = excluded.updated_at",
rusqlite::params![connector_name, source_path, source_signature, now],
)?;
Ok(())
}
fn compact_context_graph_observations(
&self,
session_id: Option<&str>,