feat(memory): add post-turn durable fact extraction across all agent entry points

This commit is contained in:
argenis de la rosa 2026-03-03 16:20:45 -05:00
parent f2ba33fce8
commit 09d7684cfa
3 changed files with 1274 additions and 32 deletions

View File

@ -2,6 +2,7 @@ use crate::agent::dispatcher::{
NativeToolDispatcher, ParsedToolCall, ToolDispatcher, ToolExecutionResult, XmlToolDispatcher,
};
use crate::agent::loop_::detection::{DetectionVerdict, LoopDetectionConfig, LoopDetector};
use crate::agent::loop_::history::{extract_facts_from_turns, TurnBuffer};
use crate::agent::memory_loader::{DefaultMemoryLoader, MemoryLoader};
use crate::agent::prompt::{PromptContext, SystemPromptBuilder};
use crate::agent::research;
@ -37,6 +38,8 @@ pub struct Agent {
skills: Vec<crate::skills::Skill>,
skills_prompt_mode: crate::config::SkillsPromptInjectionMode,
auto_save: bool,
session_id: Option<String>,
turn_buffer: TurnBuffer,
history: Vec<ConversationMessage>,
classification_config: crate::config::QueryClassificationConfig,
available_hints: Vec<String>,
@ -60,6 +63,7 @@ pub struct AgentBuilder {
skills: Option<Vec<crate::skills::Skill>>,
skills_prompt_mode: Option<crate::config::SkillsPromptInjectionMode>,
auto_save: Option<bool>,
session_id: Option<String>,
classification_config: Option<crate::config::QueryClassificationConfig>,
available_hints: Option<Vec<String>>,
route_model_by_hint: Option<HashMap<String, String>>,
@ -84,6 +88,7 @@ impl AgentBuilder {
skills: None,
skills_prompt_mode: None,
auto_save: None,
session_id: None,
classification_config: None,
available_hints: None,
route_model_by_hint: None,
@ -169,6 +174,12 @@ impl AgentBuilder {
self
}
/// Set the session identifier for memory isolation across users/channels.
pub fn session_id(mut self, session_id: String) -> Self {
self.session_id = Some(session_id);
self
}
pub fn classification_config(
mut self,
classification_config: crate::config::QueryClassificationConfig,
@ -229,6 +240,8 @@ impl AgentBuilder {
skills: self.skills.unwrap_or_default(),
skills_prompt_mode: self.skills_prompt_mode.unwrap_or_default(),
auto_save: self.auto_save.unwrap_or(false),
session_id: self.session_id,
turn_buffer: TurnBuffer::new(),
history: Vec::new(),
classification_config: self.classification_config.unwrap_or_default(),
available_hints: self.available_hints.unwrap_or_default(),
@ -500,7 +513,12 @@ impl Agent {
if self.auto_save {
let _ = self
.memory
.store("user_msg", user_message, MemoryCategory::Conversation, None)
.store(
"user_msg",
user_message,
MemoryCategory::Conversation,
self.session_id.as_deref(),
)
.await;
}
@ -617,12 +635,31 @@ impl Agent {
"assistant_resp",
&final_text,
MemoryCategory::Conversation,
None,
self.session_id.as_deref(),
)
.await;
}
self.trim_history();
// ── Post-turn fact extraction ──────────────────────
if self.auto_save {
self.turn_buffer.push(user_message, &final_text);
if self.turn_buffer.should_extract() {
let turns = self.turn_buffer.drain_for_extraction();
let result = extract_facts_from_turns(
self.provider.as_ref(),
&self.model_name,
&turns,
self.memory.as_ref(),
self.session_id.as_deref(),
)
.await;
if result.stored > 0 || result.no_facts {
self.turn_buffer.mark_extract_success();
}
}
}
return Ok(final_text);
}
@ -678,8 +715,44 @@ impl Agent {
)
}
/// Flush any remaining buffered turns for fact extraction.
/// Call this when the session/conversation ends to avoid losing
/// facts from short (< 5 turn) sessions.
///
/// On failure the turns are restored so callers that keep the agent
/// alive can still fall back to compaction-based extraction.
pub async fn flush_turn_buffer(&mut self) {
if !self.auto_save || self.turn_buffer.is_empty() {
return;
}
let turns = self.turn_buffer.drain_for_extraction();
let result = extract_facts_from_turns(
self.provider.as_ref(),
&self.model_name,
&turns,
self.memory.as_ref(),
self.session_id.as_deref(),
)
.await;
if result.stored > 0 || result.no_facts {
self.turn_buffer.mark_extract_success();
} else {
// Restore turns so compaction fallback can still pick them up
// if the agent isn't dropped immediately.
tracing::warn!(
"Exit flush failed; restoring {} turn(s) to buffer",
turns.len()
);
for (u, a) in turns {
self.turn_buffer.push(&u, &a);
}
}
}
pub async fn run_single(&mut self, message: &str) -> Result<String> {
self.turn(message).await
let result = self.turn(message).await?;
self.flush_turn_buffer().await;
Ok(result)
}
pub async fn run_interactive(&mut self) -> Result<()> {
@ -705,6 +778,7 @@ impl Agent {
}
listen_handle.abort();
self.flush_turn_buffer().await;
Ok(())
}
}

View File

@ -35,7 +35,7 @@ use uuid::Uuid;
mod context;
pub(crate) mod detection;
mod execution;
mod history;
pub(crate) mod history;
mod parsing;
use context::{build_context, build_hardware_context};
@ -46,7 +46,7 @@ use execution::{
};
#[cfg(test)]
use history::{apply_compaction_summary, build_compaction_transcript};
use history::{auto_compact_history, trim_history};
use history::{auto_compact_history, extract_facts_from_turns, trim_history, TurnBuffer};
#[allow(unused_imports)]
use parsing::{
default_param_for_tool, detect_tool_call_parse_issue, extract_json_values, map_tool_name_alias,
@ -2823,6 +2823,19 @@ pub async fn run(
}
println!("{response}");
observer.record_event(&ObserverEvent::TurnComplete);
// ── Post-turn fact extraction (single-message mode) ────────
if config.memory.auto_save {
let turns = vec![(msg.clone(), response.clone())];
let _ = extract_facts_from_turns(
provider.as_ref(),
&model_name,
&turns,
mem.as_ref(),
None,
)
.await;
}
} else {
println!("🦀 ZeroClaw Interactive Mode");
println!("Type /help for commands.\n");
@ -2831,6 +2844,7 @@ pub async fn run(
// Persistent conversation history across turns
let mut history = vec![ChatMessage::system(&system_prompt)];
let mut interactive_turn: usize = 0;
let mut turn_buffer = TurnBuffer::new();
// Reusable readline editor for UTF-8 input support
let mut rl = Editor::with_config(
RlConfig::builder()
@ -2843,6 +2857,18 @@ pub async fn run(
let input = match rl.readline("> ") {
Ok(line) => line,
Err(ReadlineError::Interrupted | ReadlineError::Eof) => {
// Flush any remaining buffered turns before exit.
if config.memory.auto_save && !turn_buffer.is_empty() {
let turns = turn_buffer.drain_for_extraction();
let _ = extract_facts_from_turns(
provider.as_ref(),
&model_name,
&turns,
mem.as_ref(),
None,
)
.await;
}
break;
}
Err(e) => {
@ -2857,7 +2883,21 @@ pub async fn run(
}
rl.add_history_entry(&input)?;
match user_input.as_str() {
"/quit" | "/exit" => break,
"/quit" | "/exit" => {
// Flush any remaining buffered turns before exit.
if config.memory.auto_save && !turn_buffer.is_empty() {
let turns = turn_buffer.drain_for_extraction();
let _ = extract_facts_from_turns(
provider.as_ref(),
&model_name,
&turns,
mem.as_ref(),
None,
)
.await;
}
break;
}
"/help" => {
println!("Available commands:");
println!(" /help Show this help message");
@ -2882,6 +2922,7 @@ pub async fn run(
history.clear();
history.push(ChatMessage::system(&system_prompt));
interactive_turn = 0;
turn_buffer = TurnBuffer::new();
// Clear conversation and daily memory
let mut cleared = 0;
for category in [MemoryCategory::Conversation, MemoryCategory::Daily] {
@ -3042,18 +3083,58 @@ pub async fn run(
}
observer.record_event(&ObserverEvent::TurnComplete);
// ── Post-turn fact extraction ────────────────────────────
if config.memory.auto_save {
turn_buffer.push(&user_input, &response);
if turn_buffer.should_extract() {
let turns = turn_buffer.drain_for_extraction();
let result = extract_facts_from_turns(
provider.as_ref(),
&model_name,
&turns,
mem.as_ref(),
None,
)
.await;
if result.stored > 0 || result.no_facts {
turn_buffer.mark_extract_success();
}
}
}
// Auto-compaction before hard trimming to preserve long-context signal.
if let Ok(compacted) = auto_compact_history(
// post_turn_active is only true when auto_save is on AND the
// turn buffer confirms recent extraction succeeded; otherwise
// compaction must fall back to its own flush_durable_facts.
let post_turn_active =
config.memory.auto_save && !turn_buffer.needs_compaction_fallback();
if let Ok((compacted, flush_ok)) = auto_compact_history(
&mut history,
provider.as_ref(),
&model_name,
config.agent.max_history_messages,
effective_hooks,
Some(mem.as_ref()),
None,
post_turn_active,
)
.await
{
if compacted {
if !post_turn_active {
// Compaction ran its own flush_durable_facts as
// fallback. Drain any buffered turns to prevent
// duplicate extraction.
if !turn_buffer.is_empty() {
let _ = turn_buffer.drain_for_extraction();
}
// Only reset the failure flag when the fallback
// flush actually succeeded; otherwise keep the
// flag so subsequent compactions retry.
if flush_ok {
turn_buffer.mark_extract_success();
}
}
println!("🧹 Auto-compaction complete");
}
}
@ -3290,7 +3371,7 @@ pub async fn process_message_with_session(
} else {
None
};
scope_cost_enforcement_context(
let response = scope_cost_enforcement_context(
cost_enforcement_context,
SAFETY_HEARTBEAT_CONFIG.scope(
hb_cfg,
@ -3308,7 +3389,22 @@ pub async fn process_message_with_session(
),
),
)
.await
.await?;
// ── Post-turn fact extraction (channel / single-message-with-session) ──
if config.memory.auto_save {
let turns = vec![(message.to_owned(), response.clone())];
let _ = extract_facts_from_turns(
provider.as_ref(),
&model_name,
&turns,
mem.as_ref(),
session_id,
)
.await;
}
Ok(response)
}
#[cfg(test)]

File diff suppressed because it is too large Load Diff