feat: add ecc2 markdown memory connectors

This commit is contained in:
Affaan Mustafa 2026-04-10 06:26:42 -07:00
parent d3b680b6db
commit 22a5a8de6d
2 changed files with 438 additions and 60 deletions

View File

@ -108,6 +108,7 @@ pub struct OrchestrationTemplateStepConfig {
/// A configured external memory source. The serde tag attribute (declared
/// above this enum, outside this view) selects the variant; per the TOML
/// tests, `kind = "markdown_file"` maps to `MarkdownFile` — presumably the
/// other kinds follow the same snake_case convention.
pub enum MemoryConnectorConfig {
    /// A single JSONL file of memory records.
    JsonlFile(MemoryConnectorJsonlFileConfig),
    /// A directory scanned for JSONL record files.
    JsonlDirectory(MemoryConnectorJsonlDirectoryConfig),
    /// A markdown file imported as heading-delimited sections.
    MarkdownFile(MemoryConnectorMarkdownFileConfig),
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
@ -129,6 +130,15 @@ pub struct MemoryConnectorJsonlDirectoryConfig {
pub default_observation_type: Option<String>,
}
/// Settings for a `markdown_file` memory connector. All fields fall back
/// to their defaults when omitted from the TOML (`#[serde(default)]`).
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct MemoryConnectorMarkdownFileConfig {
    /// Markdown file to import; sync fails when this is left empty.
    pub path: PathBuf,
    /// Optional session id or alias (e.g. "latest"), resolved at sync time.
    pub session_id: Option<String>,
    /// Entity type applied to imported sections (markdown records carry none).
    pub default_entity_type: Option<String>,
    /// Observation type applied to imported sections.
    pub default_observation_type: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ResolvedOrchestrationTemplate {
pub template_name: String,
@ -1323,6 +1333,41 @@ default_observation_type = "external_note"
}
}
/// The `markdown_file` connector kind must deserialize from TOML with all
/// optional fields populated.
#[test]
fn memory_markdown_file_connectors_deserialize_from_toml() {
    let source = r#"
[memory_connectors.workspace_note]
kind = "markdown_file"
path = "/tmp/hermes-memory.md"
session_id = "latest"
default_entity_type = "note_section"
default_observation_type = "external_note"
"#;
    let config: Config = toml::from_str(source).unwrap();
    let connector = config
        .memory_connectors
        .get("workspace_note")
        .expect("connector should deserialize");
    let crate::config::MemoryConnectorConfig::MarkdownFile(settings) = connector else {
        panic!("expected markdown_file connector");
    };
    assert_eq!(settings.path, PathBuf::from("/tmp/hermes-memory.md"));
    assert_eq!(settings.session_id.as_deref(), Some("latest"));
    assert_eq!(settings.default_entity_type.as_deref(), Some("note_section"));
    assert_eq!(
        settings.default_observation_type.as_deref(),
        Some("external_note")
    );
}
#[test]
fn completion_summary_notifications_deserialize_from_toml() {
let config: Config = toml::from_str(

View File

@ -588,6 +588,18 @@ struct JsonlMemoryConnectorRecord {
details: BTreeMap<String, String>,
}
/// Character cap for section headings and one-line summaries.
const MARKDOWN_CONNECTOR_SUMMARY_LIMIT: usize = 160;
/// Character cap for the section body stored in observation details.
const MARKDOWN_CONNECTOR_BODY_LIMIT: usize = 4000;
/// One heading-delimited section extracted from a markdown memory file.
#[derive(Debug, Clone)]
struct MarkdownMemorySection {
    // Trimmed heading text, capped at MARKDOWN_CONNECTOR_SUMMARY_LIMIT.
    heading: String,
    // Source file path, plus a "#slug" anchor when the heading yields one.
    path: String,
    // First non-empty body line (falling back to the heading), length-capped.
    summary: String,
    // Trimmed section body, capped at MARKDOWN_CONNECTOR_BODY_LIMIT.
    body: String,
    // 1-based line number where the section's heading appears.
    line_number: usize,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
@ -1594,6 +1606,9 @@ fn sync_memory_connector(
config::MemoryConnectorConfig::JsonlDirectory(settings) => {
sync_jsonl_directory_memory_connector(db, name, settings, limit)
}
config::MemoryConnectorConfig::MarkdownFile(settings) => {
sync_markdown_memory_connector(db, name, settings, limit)
}
}
}
@ -1716,72 +1731,152 @@ fn sync_jsonl_memory_reader<R: BufRead>(
}
};
let session_id = match record.session_id.as_deref() {
Some(value) => match resolve_session_id(db, value) {
Ok(resolved) => Some(resolved),
Err(_) => {
stats.skipped_records += 1;
continue;
}
},
None => default_session_id.clone(),
};
let entity_type = record
.entity_type
.as_deref()
.or(default_entity_type)
.map(str::trim)
.filter(|value| !value.is_empty());
let observation_type = record
.observation_type
.as_deref()
.or(default_observation_type)
.map(str::trim)
.filter(|value| !value.is_empty());
let entity_name = record.entity_name.trim();
let summary = record.summary.trim();
let Some(entity_type) = entity_type else {
stats.skipped_records += 1;
continue;
};
let Some(observation_type) = observation_type else {
stats.skipped_records += 1;
continue;
};
if entity_name.is_empty() || summary.is_empty() {
stats.skipped_records += 1;
continue;
}
let entity_summary = record
.entity_summary
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or(summary);
let entity = db.upsert_context_entity(
session_id.as_deref(),
entity_type,
entity_name,
record.path.as_deref(),
entity_summary,
&record.metadata,
import_memory_connector_record(
db,
&mut stats,
default_session_id.as_deref(),
default_entity_type,
default_observation_type,
record,
)?;
db.add_context_observation(
session_id.as_deref(),
entity.id,
observation_type,
summary,
&record.details,
)?;
stats.entities_upserted += 1;
stats.observations_added += 1;
}
Ok(stats)
}
/// Sync a `markdown_file` memory connector: read the configured file, split
/// it into heading-delimited sections, and import each section as a context
/// entity plus one observation.
///
/// Errors when the connector has no path configured, when the file cannot
/// be read, or when the connector-level session alias fails to resolve.
/// `limit` caps how many sections are parsed and imported.
fn sync_markdown_memory_connector(
    db: &session::store::StateStore,
    name: &str,
    settings: &config::MemoryConnectorMarkdownFileConfig,
    limit: usize,
) -> Result<GraphConnectorSyncStats> {
    if settings.path.as_os_str().is_empty() {
        anyhow::bail!("memory connector {name} has no path configured");
    }
    let body = std::fs::read_to_string(&settings.path)
        .with_context(|| format!("read memory connector file {}", settings.path.display()))?;
    // Resolve the session alias once up front; unlike per-record session ids
    // in the JSONL path, a bad connector-level alias aborts the whole sync.
    let default_session_id = settings
        .session_id
        .as_deref()
        .map(|value| resolve_session_id(db, value))
        .transpose()?;
    let sections = parse_markdown_memory_sections(&settings.path, &body, limit);
    let mut stats = GraphConnectorSyncStats {
        connector_name: name.to_string(),
        ..Default::default()
    };
    for section in sections {
        stats.records_read += 1;
        // Observation details carry the (possibly truncated) section body
        // plus provenance: source file path and the heading's line number.
        let mut details = BTreeMap::new();
        if !section.body.is_empty() {
            details.insert("body".to_string(), section.body.clone());
        }
        details.insert(
            "source_path".to_string(),
            settings.path.display().to_string(),
        );
        details.insert("line".to_string(), section.line_number.to_string());
        let mut metadata = BTreeMap::new();
        metadata.insert("connector".to_string(), "markdown_file".to_string());
        // Reuse the shared import path by shaping the section as a record;
        // entity/observation types come solely from the connector defaults.
        import_memory_connector_record(
            db,
            &mut stats,
            default_session_id.as_deref(),
            settings.default_entity_type.as_deref(),
            settings.default_observation_type.as_deref(),
            JsonlMemoryConnectorRecord {
                session_id: None,
                entity_type: None,
                entity_name: section.heading,
                path: Some(section.path),
                entity_summary: Some(section.summary.clone()),
                metadata,
                observation_type: None,
                summary: section.summary,
                details,
            },
        )?;
    }
    Ok(stats)
}
/// Import a single connector record as one context entity plus one
/// observation, updating `stats` accordingly.
///
/// A record is counted in `skipped_records` (and the function still
/// returns `Ok`) when:
/// - its own `session_id` fails to resolve,
/// - no entity or observation type remains after applying the defaults,
/// - the trimmed entity name or summary is empty.
fn import_memory_connector_record(
    db: &session::store::StateStore,
    stats: &mut GraphConnectorSyncStats,
    default_session_id: Option<&str>,
    default_entity_type: Option<&str>,
    default_observation_type: Option<&str>,
    record: JsonlMemoryConnectorRecord,
) -> Result<()> {
    // A per-record session overrides the connector default; an unresolvable
    // alias skips just this record instead of failing the whole sync.
    let session_id = match record.session_id.as_deref() {
        Some(value) => match resolve_session_id(db, value) {
            Ok(resolved) => Some(resolved),
            Err(_) => {
                stats.skipped_records += 1;
                return Ok(());
            }
        },
        None => default_session_id.map(str::to_string),
    };
    // Record-level types win over connector defaults; blank values count
    // as absent after trimming.
    let entity_type = record
        .entity_type
        .as_deref()
        .or(default_entity_type)
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let observation_type = record
        .observation_type
        .as_deref()
        .or(default_observation_type)
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let entity_name = record.entity_name.trim();
    let summary = record.summary.trim();
    let Some(entity_type) = entity_type else {
        stats.skipped_records += 1;
        return Ok(());
    };
    let Some(observation_type) = observation_type else {
        stats.skipped_records += 1;
        return Ok(());
    };
    if entity_name.is_empty() || summary.is_empty() {
        stats.skipped_records += 1;
        return Ok(());
    }
    // The observation summary doubles as the entity summary when the record
    // does not carry a dedicated (non-blank) entity summary.
    let entity_summary = record
        .entity_summary
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or(summary);
    let entity = db.upsert_context_entity(
        session_id.as_deref(),
        entity_type,
        entity_name,
        record.path.as_deref(),
        entity_summary,
        &record.metadata,
    )?;
    db.add_context_observation(
        session_id.as_deref(),
        entity.id,
        observation_type,
        summary,
        &record.details,
    )?;
    stats.entities_upserted += 1;
    stats.observations_added += 1;
    Ok(())
}
fn collect_jsonl_paths(root: &Path, recurse: bool) -> Result<Vec<PathBuf>> {
let mut paths = Vec::new();
collect_jsonl_paths_inner(root, recurse, &mut paths)?;
@ -1812,6 +1907,157 @@ fn collect_jsonl_paths_inner(root: &Path, recurse: bool, paths: &mut Vec<PathBuf
Ok(())
}
/// Split markdown `body` into sections delimited by ATX headings, returning
/// at most `limit` of them.
///
/// Non-blank text before the first heading (the "preamble") becomes its own
/// section anchored at line 1, titled after the file stem (or "note"); a
/// document with no headings yields only that section. Sections that yield
/// no summary are dropped by `markdown_memory_section`.
fn parse_markdown_memory_sections(
    path: &Path,
    body: &str,
    limit: usize,
) -> Vec<MarkdownMemorySection> {
    if limit == 0 {
        return Vec::new();
    }
    let source_path = path.display().to_string();
    // Title used for preamble-only content: the file stem, falling back to
    // "note" when the stem is missing, non-UTF-8, or blank.
    let fallback_heading = path
        .file_stem()
        .and_then(|value| value.to_str())
        .filter(|value| !value.trim().is_empty())
        .unwrap_or("note")
        .trim()
        .to_string();
    let mut sections = Vec::new();
    let mut preamble = Vec::new();
    // (title, 1-based heading line) of the section currently being read.
    let mut current_heading: Option<(String, usize)> = None;
    let mut current_body = Vec::new();
    for (index, line) in body.lines().enumerate() {
        let line_number = index + 1;
        if let Some(heading) = markdown_heading_title(line) {
            // A new heading closes the previous section; the very first
            // heading instead flushes any non-blank preamble.
            if let Some((title, start_line)) = current_heading.take() {
                if let Some(section) = markdown_memory_section(
                    &source_path,
                    &title,
                    start_line,
                    &current_body.join("\n"),
                ) {
                    sections.push(section);
                }
            } else if !preamble.join("\n").trim().is_empty() {
                if let Some(section) = markdown_memory_section(
                    &source_path,
                    &fallback_heading,
                    1,
                    &preamble.join("\n"),
                ) {
                    sections.push(section);
                }
            }
            current_heading = Some((heading.to_string(), line_number));
            current_body.clear();
            continue;
        }
        if current_heading.is_some() {
            current_body.push(line.to_string());
        } else {
            preamble.push(line.to_string());
        }
    }
    // Flush whatever is still open: the final section, or the preamble when
    // the document contained no headings at all.
    if let Some((title, start_line)) = current_heading {
        if let Some(section) =
            markdown_memory_section(&source_path, &title, start_line, &current_body.join("\n"))
        {
            sections.push(section);
        }
    } else if let Some(section) =
        markdown_memory_section(&source_path, &fallback_heading, 1, &preamble.join("\n"))
    {
        sections.push(section);
    }
    sections.truncate(limit);
    sections
}
/// Return the title of an ATX markdown heading (`# ...` through `###### ...`),
/// or `None` when the line is not a heading or the title is empty.
///
/// Per CommonMark, the opening hashes must be followed by whitespace (or end
/// of line), so hashtag-like lines such as `#tag` or `#123` are NOT headings.
/// The original accepted them, wrongly splitting notes at issue references.
fn markdown_heading_title(line: &str) -> Option<&str> {
    let trimmed = line.trim_start();
    // '#' is a single byte, so the char count is also a valid byte offset.
    let hashes = trimmed.chars().take_while(|ch| *ch == '#').count();
    if hashes == 0 || hashes > 6 {
        return None;
    }
    let rest = &trimmed[hashes..];
    if !rest.is_empty() && !rest.starts_with(char::is_whitespace) {
        // No separating whitespace after the hashes: not an ATX heading.
        return None;
    }
    let title = rest.trim_start();
    if title.is_empty() {
        // A bare "#" is a valid (empty) heading in CommonMark, but an empty
        // title is useless as a section name, so keep rejecting it.
        return None;
    }
    Some(title.trim())
}
/// Assemble a `MarkdownMemorySection` from a heading and its body text.
///
/// Returns `None` when the trimmed heading is empty or no summary can be
/// derived. The section path carries a `#slug` anchor whenever the heading
/// slugifies to something non-empty.
fn markdown_memory_section(
    source_path: &str,
    heading: &str,
    line_number: usize,
    body: &str,
) -> Option<MarkdownMemorySection> {
    let title = heading.trim();
    if title.is_empty() {
        return None;
    }
    let content = body.trim();
    let summary = markdown_section_summary(title, content);
    if summary.is_empty() {
        return None;
    }
    let path = match markdown_heading_slug(title).as_str() {
        "" => source_path.to_string(),
        slug => format!("{source_path}#{slug}"),
    };
    Some(MarkdownMemorySection {
        heading: truncate_connector_text(title, MARKDOWN_CONNECTOR_SUMMARY_LIMIT),
        path,
        summary,
        body: truncate_connector_text(content, MARKDOWN_CONNECTOR_BODY_LIMIT),
        line_number,
    })
}
/// Pick a one-line summary for a section: the first non-blank body line, or
/// the heading itself when the body has none, capped at the summary limit.
fn markdown_section_summary(heading: &str, body: &str) -> String {
    let mut summary_source = heading;
    for raw_line in body.lines() {
        let line = raw_line.trim();
        if !line.is_empty() {
            summary_source = line;
            break;
        }
    }
    truncate_connector_text(summary_source, MARKDOWN_CONNECTOR_SUMMARY_LIMIT)
}
/// Turn a heading into a lowercase ASCII anchor slug: alphanumerics are
/// kept (lowercased), every run of other characters collapses to a single
/// dash, and no dash appears at either end.
fn markdown_heading_slug(value: &str) -> String {
    let mut slug = String::with_capacity(value.len());
    let mut pending_separator = false;
    for ch in value.chars() {
        if ch.is_ascii_alphanumeric() {
            // Emit the separator lazily so leading/trailing runs of
            // non-alphanumerics never produce a dash.
            if pending_separator && !slug.is_empty() {
                slug.push('-');
            }
            pending_separator = false;
            slug.push(ch.to_ascii_lowercase());
        } else {
            pending_separator = true;
        }
    }
    slug
}
/// Trim `value` and cap it at `max_chars` characters (counted as chars,
/// not bytes). Over-long text is cut to `max_chars - 1` characters with an
/// ellipsis appended, so truncation is visible and the result still fits.
///
/// The original reserved the ellipsis slot via `max_chars.saturating_sub(1)`
/// but `format!("{truncated}")` never appended the marker, silently dropping
/// text AND wasting the reserved character.
fn truncate_connector_text(value: &str, max_chars: usize) -> String {
    let trimmed = value.trim();
    if trimmed.chars().count() <= max_chars {
        return trimmed.to_string();
    }
    if max_chars == 0 {
        // Nothing fits, not even the ellipsis.
        return String::new();
    }
    let truncated: String = trimmed.chars().take(max_chars - 1).collect();
    format!("{truncated}…")
}
fn build_message(
kind: MessageKindArg,
text: String,
@ -5144,6 +5390,93 @@ mod tests {
Ok(())
}
/// End-to-end check: a markdown connector file whose first heading starts at
/// line 1 plus two subsections imports three entities, each with an anchored
/// path, source-path/line provenance in the observation details, and content
/// recallable through the entity search.
#[test]
fn sync_memory_connector_imports_markdown_file_sections() -> Result<()> {
    let tempdir = TestDir::new("graph-connector-sync-markdown")?;
    let db = session::store::StateStore::open(&tempdir.path().join("state.db"))?;
    // A running session must exist so the connector's "latest" alias resolves.
    let now = chrono::Utc::now();
    db.insert_session(&session::Session {
        id: "session-1".to_string(),
        task: "knowledge import".to_string(),
        project: "everything-claude-code".to_string(),
        task_group: "memory".to_string(),
        agent_type: "claude".to_string(),
        working_dir: PathBuf::from("/tmp"),
        state: session::SessionState::Running,
        pid: None,
        worktree: None,
        created_at: now,
        updated_at: now,
        last_heartbeat_at: now,
        metrics: session::SessionMetrics::default(),
    })?;
    // Fixture: one top-level heading with body text, then two subsections.
    let connector_path = tempdir.path().join("workspace-memory.md");
    fs::write(
        &connector_path,
        r#"# Billing incident
Customer wiped setup and got charged twice after reinstalling.
## Portal routing
Route existing installs to portal first before presenting checkout again.
## Docs fix
Guide users to repair before reinstall so wiped setups do not buy twice.
"#,
    )?;
    let mut cfg = config::Config::default();
    cfg.memory_connectors.insert(
        "workspace_note".to_string(),
        config::MemoryConnectorConfig::MarkdownFile(
            config::MemoryConnectorMarkdownFileConfig {
                path: connector_path.clone(),
                session_id: Some("latest".to_string()),
                default_entity_type: Some("note_section".to_string()),
                default_observation_type: Some("external_note".to_string()),
            },
        ),
    );
    // Three headings -> three records, none skipped.
    let stats = sync_memory_connector(&db, &cfg, "workspace_note", 10)?;
    assert_eq!(stats.records_read, 3);
    assert_eq!(stats.entities_upserted, 3);
    assert_eq!(stats.observations_added, 3);
    assert_eq!(stats.skipped_records, 0);
    let recalled = db.recall_context_entities(None, "charged twice reinstall", 10)?;
    assert!(recalled
        .iter()
        .any(|entry| entry.entity.name == "Billing incident"));
    assert!(recalled.iter().any(|entry| entry.entity.name == "Docs fix"));
    let billing = recalled
        .iter()
        .find(|entry| entry.entity.name == "Billing incident")
        .expect("billing section should exist");
    // The entity path carries the slugified heading as a "#..." anchor.
    let expected_anchor_path = format!("{}#billing-incident", connector_path.display());
    assert_eq!(
        billing.entity.path.as_deref(),
        Some(expected_anchor_path.as_str())
    );
    let observations = db.list_context_observations(Some(billing.entity.id), 5)?;
    assert_eq!(observations.len(), 1);
    // Observation details record provenance and the section body.
    let expected_source_path = connector_path.display().to_string();
    assert_eq!(
        observations[0]
            .details
            .get("source_path")
            .map(String::as_str),
        Some(expected_source_path.as_str())
    );
    assert!(observations[0]
        .details
        .get("body")
        .is_some_and(|value: &String| value.contains("charged twice")));
    Ok(())
}
#[test]
fn format_graph_sync_stats_human_renders_counts() {
let text = format_graph_sync_stats_human(