fix(memory): filter autosave noise and scope recall/store by session (#3695)
* fix(memory): filter autosave noise and scope memory by session
* style: format rebase-resolved gateway and memory loader
* fix(tests): update memory loader mock for session-aware context
* fix(openai-codex): decode utf-8 safely across stream chunks
This commit is contained in:
parent
7db8853085
commit
b5af73cac6
@ -33,6 +33,7 @@ pub struct Agent {
|
||||
skills: Vec<crate::skills::Skill>,
|
||||
skills_prompt_mode: crate::config::SkillsPromptInjectionMode,
|
||||
auto_save: bool,
|
||||
memory_session_id: Option<String>,
|
||||
history: Vec<ConversationMessage>,
|
||||
classification_config: crate::config::QueryClassificationConfig,
|
||||
available_hints: Vec<String>,
|
||||
@ -57,6 +58,7 @@ pub struct AgentBuilder {
|
||||
skills: Option<Vec<crate::skills::Skill>>,
|
||||
skills_prompt_mode: Option<crate::config::SkillsPromptInjectionMode>,
|
||||
auto_save: Option<bool>,
|
||||
memory_session_id: Option<String>,
|
||||
classification_config: Option<crate::config::QueryClassificationConfig>,
|
||||
available_hints: Option<Vec<String>>,
|
||||
route_model_by_hint: Option<HashMap<String, String>>,
|
||||
@ -82,6 +84,7 @@ impl AgentBuilder {
|
||||
skills: None,
|
||||
skills_prompt_mode: None,
|
||||
auto_save: None,
|
||||
memory_session_id: None,
|
||||
classification_config: None,
|
||||
available_hints: None,
|
||||
route_model_by_hint: None,
|
||||
@ -168,6 +171,11 @@ impl AgentBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the memory session id used to scope memory store/recall calls
/// for the built [`Agent`].
///
/// `None` leaves memory unscoped, preserving the previous global behavior.
pub fn memory_session_id(mut self, memory_session_id: Option<String>) -> Self {
    self.memory_session_id = memory_session_id;
    self
}
|
||||
|
||||
pub fn classification_config(
|
||||
mut self,
|
||||
classification_config: crate::config::QueryClassificationConfig,
|
||||
@ -242,6 +250,7 @@ impl AgentBuilder {
|
||||
skills: self.skills.unwrap_or_default(),
|
||||
skills_prompt_mode: self.skills_prompt_mode.unwrap_or_default(),
|
||||
auto_save: self.auto_save.unwrap_or(false),
|
||||
memory_session_id: self.memory_session_id,
|
||||
history: Vec::new(),
|
||||
classification_config: self.classification_config.unwrap_or_default(),
|
||||
available_hints: self.available_hints.unwrap_or_default(),
|
||||
@ -265,6 +274,10 @@ impl Agent {
|
||||
self.history.clear();
|
||||
}
|
||||
|
||||
/// Overrides the memory session id on an already-built agent.
///
/// Used by connection-scoped callers (e.g. the WebSocket handler) that only
/// learn the session id after the agent has been constructed. `None` resets
/// memory to unscoped (global) recall/store.
pub fn set_memory_session_id(&mut self, session_id: Option<String>) {
    self.memory_session_id = session_id;
}
|
||||
|
||||
pub fn from_config(config: &Config) -> Result<Self> {
|
||||
let observer: Arc<dyn Observer> =
|
||||
Arc::from(observability::create_observer(&config.observability));
|
||||
@ -515,13 +528,22 @@ impl Agent {
|
||||
if self.auto_save {
|
||||
let _ = self
|
||||
.memory
|
||||
.store("user_msg", user_message, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
"user_msg",
|
||||
user_message,
|
||||
MemoryCategory::Conversation,
|
||||
self.memory_session_id.as_deref(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let context = self
|
||||
.memory_loader
|
||||
.load_context(self.memory.as_ref(), user_message)
|
||||
.load_context(
|
||||
self.memory.as_ref(),
|
||||
user_message,
|
||||
self.memory_session_id.as_deref(),
|
||||
)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
|
||||
@ -269,6 +269,15 @@ fn autosave_memory_key(prefix: &str) -> String {
|
||||
format!("{prefix}_{}", Uuid::new_v4())
|
||||
}
|
||||
|
||||
/// Derives a memory session id from the CLI session-state file path.
///
/// Returns `Some("cli:<path>")` for a non-blank path, `None` when the path
/// renders as empty/whitespace (no state file → unscoped memory).
fn memory_session_id_from_state_file(path: &Path) -> Option<String> {
    // Borrow the lossily-decoded path and trim in place; only allocate when
    // we actually produce a session id (the original built an owned String
    // even for the empty case).
    let raw = path.to_string_lossy();
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return None;
    }

    Some(format!("cli:{trimmed}"))
}
|
||||
|
||||
/// Trim conversation history to prevent unbounded growth.
|
||||
/// Preserves the system prompt (first message if role=system) and the most recent messages.
|
||||
fn trim_history(history: &mut Vec<ChatMessage>, max_history: usize) {
|
||||
@ -419,11 +428,16 @@ fn save_interactive_session_history(path: &Path, history: &[ChatMessage]) -> Res
|
||||
/// Build context preamble by searching memory for relevant entries.
|
||||
/// Entries with a hybrid score below `min_relevance_score` are dropped to
|
||||
/// prevent unrelated memories from bleeding into the conversation.
|
||||
async fn build_context(mem: &dyn Memory, user_msg: &str, min_relevance_score: f64) -> String {
|
||||
async fn build_context(
|
||||
mem: &dyn Memory,
|
||||
user_msg: &str,
|
||||
min_relevance_score: f64,
|
||||
session_id: Option<&str>,
|
||||
) -> String {
|
||||
let mut context = String::new();
|
||||
|
||||
// Pull relevant memories for this message
|
||||
if let Ok(entries) = mem.recall(user_msg, 5, None).await {
|
||||
if let Ok(entries) = mem.recall(user_msg, 5, session_id).await {
|
||||
let relevant: Vec<_> = entries
|
||||
.iter()
|
||||
.filter(|e| match e.score {
|
||||
@ -438,6 +452,9 @@ async fn build_context(mem: &dyn Memory, user_msg: &str, min_relevance_score: f6
|
||||
if memory::is_assistant_autosave_key(&entry.key) {
|
||||
continue;
|
||||
}
|
||||
if memory::should_skip_autosave_content(&entry.content) {
|
||||
continue;
|
||||
}
|
||||
// Skip entries containing tool_result blocks — they can leak
|
||||
// stale tool output from previous heartbeat ticks into new
|
||||
// sessions, presenting the LLM with orphan tool_result data.
|
||||
@ -3351,6 +3368,9 @@ pub async fn run(
|
||||
None
|
||||
};
|
||||
let channel_name = if interactive { "cli" } else { "daemon" };
|
||||
let memory_session_id = session_state_file
|
||||
.as_deref()
|
||||
.and_then(memory_session_id_from_state_file);
|
||||
|
||||
// ── Execute ──────────────────────────────────────────────────
|
||||
let start = Instant::now();
|
||||
@ -3359,16 +3379,29 @@ pub async fn run(
|
||||
|
||||
if let Some(msg) = message {
|
||||
// Auto-save user message to memory (skip short/trivial messages)
|
||||
if config.memory.auto_save && msg.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
|
||||
if config.memory.auto_save
|
||||
&& msg.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS
|
||||
&& !memory::should_skip_autosave_content(&msg)
|
||||
{
|
||||
let user_key = autosave_memory_key("user_msg");
|
||||
let _ = mem
|
||||
.store(&user_key, &msg, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&user_key,
|
||||
&msg,
|
||||
MemoryCategory::Conversation,
|
||||
memory_session_id.as_deref(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Inject memory + hardware RAG context into user message
|
||||
let mem_context =
|
||||
build_context(mem.as_ref(), &msg, config.memory.min_relevance_score).await;
|
||||
let mem_context = build_context(
|
||||
mem.as_ref(),
|
||||
&msg,
|
||||
config.memory.min_relevance_score,
|
||||
memory_session_id.as_deref(),
|
||||
)
|
||||
.await;
|
||||
let rag_limit = if config.agent.compact_context { 2 } else { 5 };
|
||||
let hw_context = hardware_rag
|
||||
.as_ref()
|
||||
@ -3507,16 +3540,29 @@ pub async fn run(
|
||||
}
|
||||
|
||||
// Auto-save conversation turns (skip short/trivial messages)
|
||||
if config.memory.auto_save && user_input.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
|
||||
if config.memory.auto_save
|
||||
&& user_input.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS
|
||||
&& !memory::should_skip_autosave_content(&user_input)
|
||||
{
|
||||
let user_key = autosave_memory_key("user_msg");
|
||||
let _ = mem
|
||||
.store(&user_key, &user_input, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&user_key,
|
||||
&user_input,
|
||||
MemoryCategory::Conversation,
|
||||
memory_session_id.as_deref(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Inject memory + hardware RAG context into user message
|
||||
let mem_context =
|
||||
build_context(mem.as_ref(), &user_input, config.memory.min_relevance_score).await;
|
||||
let mem_context = build_context(
|
||||
mem.as_ref(),
|
||||
&user_input,
|
||||
config.memory.min_relevance_score,
|
||||
memory_session_id.as_deref(),
|
||||
)
|
||||
.await;
|
||||
let rag_limit = if config.agent.compact_context { 2 } else { 5 };
|
||||
let hw_context = hardware_rag
|
||||
.as_ref()
|
||||
@ -3615,7 +3661,11 @@ pub async fn run(
|
||||
|
||||
/// Process a single message through the full agent (with tools, peripherals, memory).
|
||||
/// Used by channels (Telegram, Discord, etc.) to enable hardware and tool use.
|
||||
pub async fn process_message(config: Config, message: &str) -> Result<String> {
|
||||
pub async fn process_message(
|
||||
config: Config,
|
||||
message: &str,
|
||||
session_id: Option<&str>,
|
||||
) -> Result<String> {
|
||||
let observer: Arc<dyn Observer> =
|
||||
Arc::from(observability::create_observer(&config.observability));
|
||||
let runtime: Arc<dyn runtime::RuntimeAdapter> =
|
||||
@ -3808,7 +3858,13 @@ pub async fn process_message(config: Config, message: &str) -> Result<String> {
|
||||
system_prompt.push_str(&build_tool_instructions(&tools_registry));
|
||||
}
|
||||
|
||||
let mem_context = build_context(mem.as_ref(), message, config.memory.min_relevance_score).await;
|
||||
let mem_context = build_context(
|
||||
mem.as_ref(),
|
||||
message,
|
||||
config.memory.min_relevance_score,
|
||||
session_id,
|
||||
)
|
||||
.await;
|
||||
let rag_limit = if config.agent.compact_context { 2 } else { 5 };
|
||||
let hw_context = hardware_rag
|
||||
.as_ref()
|
||||
@ -5481,7 +5537,7 @@ Tail"#;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let context = build_context(&mem, "status updates", 0.0).await;
|
||||
let context = build_context(&mem, "status updates", 0.0, None).await;
|
||||
assert!(context.contains("user_msg_real"));
|
||||
assert!(!context.contains("assistant_resp_poisoned"));
|
||||
assert!(!context.contains("fabricated event"));
|
||||
|
||||
@ -4,8 +4,12 @@ use std::fmt::Write;
|
||||
|
||||
#[async_trait]
|
||||
pub trait MemoryLoader: Send + Sync {
|
||||
async fn load_context(&self, memory: &dyn Memory, user_message: &str)
|
||||
-> anyhow::Result<String>;
|
||||
async fn load_context(
|
||||
&self,
|
||||
memory: &dyn Memory,
|
||||
user_message: &str,
|
||||
session_id: Option<&str>,
|
||||
) -> anyhow::Result<String>;
|
||||
}
|
||||
|
||||
pub struct DefaultMemoryLoader {
|
||||
@ -37,8 +41,9 @@ impl MemoryLoader for DefaultMemoryLoader {
|
||||
&self,
|
||||
memory: &dyn Memory,
|
||||
user_message: &str,
|
||||
session_id: Option<&str>,
|
||||
) -> anyhow::Result<String> {
|
||||
let entries = memory.recall(user_message, self.limit, None).await?;
|
||||
let entries = memory.recall(user_message, self.limit, session_id).await?;
|
||||
if entries.is_empty() {
|
||||
return Ok(String::new());
|
||||
}
|
||||
@ -48,6 +53,9 @@ impl MemoryLoader for DefaultMemoryLoader {
|
||||
if memory::is_assistant_autosave_key(&entry.key) {
|
||||
continue;
|
||||
}
|
||||
if memory::should_skip_autosave_content(&entry.content) {
|
||||
continue;
|
||||
}
|
||||
if let Some(score) = entry.score {
|
||||
if score < self.min_relevance_score {
|
||||
continue;
|
||||
@ -191,7 +199,10 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn default_loader_formats_context() {
|
||||
let loader = DefaultMemoryLoader::default();
|
||||
let context = loader.load_context(&MockMemory, "hello").await.unwrap();
|
||||
let context = loader
|
||||
.load_context(&MockMemory, "hello", None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(context.contains("[Memory context]"));
|
||||
assert!(context.contains("- k: v"));
|
||||
}
|
||||
@ -222,7 +233,10 @@ mod tests {
|
||||
]),
|
||||
};
|
||||
|
||||
let context = loader.load_context(&memory, "answer style").await.unwrap();
|
||||
let context = loader
|
||||
.load_context(&memory, "answer style", None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(context.contains("user_fact"));
|
||||
assert!(!context.contains("assistant_resp_legacy"));
|
||||
assert!(!context.contains("fabricated detail"));
|
||||
|
||||
@ -1041,6 +1041,10 @@ fn should_skip_memory_context_entry(key: &str, content: &str) -> bool {
|
||||
return true;
|
||||
}
|
||||
|
||||
if memory::should_skip_autosave_content(content) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if key.trim().to_ascii_lowercase().ends_with("_history") {
|
||||
return true;
|
||||
}
|
||||
@ -1332,10 +1336,11 @@ async fn build_memory_context(
|
||||
mem: &dyn Memory,
|
||||
user_msg: &str,
|
||||
min_relevance_score: f64,
|
||||
session_id: Option<&str>,
|
||||
) -> String {
|
||||
let mut context = String::new();
|
||||
|
||||
if let Ok(entries) = mem.recall(user_msg, 5, None).await {
|
||||
if let Ok(entries) = mem.recall(user_msg, 5, session_id).await {
|
||||
let mut included = 0usize;
|
||||
let mut used_chars = 0usize;
|
||||
|
||||
@ -1855,7 +1860,10 @@ async fn process_channel_message(
|
||||
return;
|
||||
}
|
||||
};
|
||||
if ctx.auto_save_memory && msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
|
||||
if ctx.auto_save_memory
|
||||
&& msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS
|
||||
&& !memory::should_skip_autosave_content(&msg.content)
|
||||
{
|
||||
let autosave_key = conversation_memory_key(&msg);
|
||||
let _ = ctx
|
||||
.memory
|
||||
@ -1863,7 +1871,7 @@ async fn process_channel_message(
|
||||
&autosave_key,
|
||||
&msg.content,
|
||||
crate::memory::MemoryCategory::Conversation,
|
||||
None,
|
||||
Some(&history_key),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@ -1939,8 +1947,13 @@ async fn process_channel_message(
|
||||
// Only enrich with memory context when there is no prior conversation
|
||||
// history. Follow-up turns already include context from previous messages.
|
||||
if !had_prior_history {
|
||||
let memory_context =
|
||||
build_memory_context(ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score).await;
|
||||
let memory_context = build_memory_context(
|
||||
ctx.memory.as_ref(),
|
||||
&msg.content,
|
||||
ctx.min_relevance_score,
|
||||
Some(&history_key),
|
||||
)
|
||||
.await;
|
||||
if let Some(last_turn) = prior_turns.last_mut() {
|
||||
if last_turn.role == "user" && !memory_context.is_empty() {
|
||||
last_turn.content = format!("{memory_context}{}", msg.content);
|
||||
@ -6989,7 +7002,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let context = build_memory_context(&mem, "age", 0.0).await;
|
||||
let context = build_memory_context(&mem, "age", 0.0, None).await;
|
||||
assert!(context.contains("[Memory context]"));
|
||||
assert!(context.contains("Age is 45"));
|
||||
}
|
||||
@ -7021,7 +7034,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let context = build_memory_context(&mem, "screenshot", 0.0).await;
|
||||
let context = build_memory_context(&mem, "screenshot", 0.0, None).await;
|
||||
|
||||
// The image-marker entry must be excluded to prevent duplication.
|
||||
assert!(
|
||||
|
||||
@ -75,6 +75,22 @@ fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> S
|
||||
format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
|
||||
}
|
||||
|
||||
fn sender_session_id(channel: &str, msg: &crate::channels::traits::ChannelMessage) -> String {
|
||||
match &msg.thread_ts {
|
||||
Some(thread_id) => format!("{channel}_{thread_id}_{}", msg.sender),
|
||||
None => format!("{channel}_{}", msg.sender),
|
||||
}
|
||||
}
|
||||
|
||||
fn webhook_session_id(headers: &HeaderMap) -> Option<String> {
|
||||
headers
|
||||
.get("X-Session-Id")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(str::to_owned)
|
||||
}
|
||||
|
||||
fn hash_webhook_secret(value: &str) -> String {
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
@ -908,9 +924,13 @@ async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Res
|
||||
}
|
||||
|
||||
/// Full-featured chat with tools for channel handlers (WhatsApp, Linq, Nextcloud Talk).
|
||||
async fn run_gateway_chat_with_tools(state: &AppState, message: &str) -> anyhow::Result<String> {
|
||||
async fn run_gateway_chat_with_tools(
|
||||
state: &AppState,
|
||||
message: &str,
|
||||
session_id: Option<&str>,
|
||||
) -> anyhow::Result<String> {
|
||||
let config = state.config.lock().clone();
|
||||
Box::pin(crate::agent::process_message(config, message)).await
|
||||
Box::pin(crate::agent::process_message(config, message, session_id)).await
|
||||
}
|
||||
|
||||
/// Webhook request body
|
||||
@ -1002,12 +1022,18 @@ async fn handle_webhook(
|
||||
}
|
||||
|
||||
let message = &webhook_body.message;
|
||||
let session_id = webhook_session_id(&headers);
|
||||
|
||||
if state.auto_save {
|
||||
if state.auto_save && !memory::should_skip_autosave_content(message) {
|
||||
let key = webhook_memory_key();
|
||||
let _ = state
|
||||
.mem
|
||||
.store(&key, message, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&key,
|
||||
message,
|
||||
MemoryCategory::Conversation,
|
||||
session_id.as_deref(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
@ -1228,17 +1254,29 @@ async fn handle_whatsapp_message(
|
||||
msg.sender,
|
||||
truncate_with_ellipsis(&msg.content, 50)
|
||||
);
|
||||
let session_id = sender_session_id("whatsapp", msg);
|
||||
|
||||
// Auto-save to memory
|
||||
if state.auto_save {
|
||||
if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
|
||||
let key = whatsapp_memory_key(msg);
|
||||
let _ = state
|
||||
.mem
|
||||
.store(&key, &msg.content, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&key,
|
||||
&msg.content,
|
||||
MemoryCategory::Conversation,
|
||||
Some(&session_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(
|
||||
&state,
|
||||
&msg.content,
|
||||
Some(&session_id),
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
// Send reply via WhatsApp
|
||||
if let Err(e) = wa
|
||||
@ -1335,18 +1373,30 @@ async fn handle_linq_webhook(
|
||||
msg.sender,
|
||||
truncate_with_ellipsis(&msg.content, 50)
|
||||
);
|
||||
let session_id = sender_session_id("linq", msg);
|
||||
|
||||
// Auto-save to memory
|
||||
if state.auto_save {
|
||||
if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
|
||||
let key = linq_memory_key(msg);
|
||||
let _ = state
|
||||
.mem
|
||||
.store(&key, &msg.content, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&key,
|
||||
&msg.content,
|
||||
MemoryCategory::Conversation,
|
||||
Some(&session_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Call the LLM
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(
|
||||
&state,
|
||||
&msg.content,
|
||||
Some(&session_id),
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
// Send reply via Linq
|
||||
if let Err(e) = linq
|
||||
@ -1427,18 +1477,30 @@ async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl
|
||||
msg.sender,
|
||||
truncate_with_ellipsis(&msg.content, 50)
|
||||
);
|
||||
let session_id = sender_session_id("wati", msg);
|
||||
|
||||
// Auto-save to memory
|
||||
if state.auto_save {
|
||||
if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
|
||||
let key = wati_memory_key(msg);
|
||||
let _ = state
|
||||
.mem
|
||||
.store(&key, &msg.content, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&key,
|
||||
&msg.content,
|
||||
MemoryCategory::Conversation,
|
||||
Some(&session_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Call the LLM
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(
|
||||
&state,
|
||||
&msg.content,
|
||||
Some(&session_id),
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
// Send reply via WATI
|
||||
if let Err(e) = wati
|
||||
@ -1533,16 +1595,28 @@ async fn handle_nextcloud_talk_webhook(
|
||||
msg.sender,
|
||||
truncate_with_ellipsis(&msg.content, 50)
|
||||
);
|
||||
let session_id = sender_session_id("nextcloud_talk", msg);
|
||||
|
||||
if state.auto_save {
|
||||
if state.auto_save && !memory::should_skip_autosave_content(&msg.content) {
|
||||
let key = nextcloud_talk_memory_key(msg);
|
||||
let _ = state
|
||||
.mem
|
||||
.store(&key, &msg.content, MemoryCategory::Conversation, None)
|
||||
.store(
|
||||
&key,
|
||||
&msg.content,
|
||||
MemoryCategory::Conversation,
|
||||
Some(&session_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(
|
||||
&state,
|
||||
&msg.content,
|
||||
Some(&session_id),
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
if let Err(e) = nextcloud_talk
|
||||
.send(&SendMessage::new(response, &msg.reply_target))
|
||||
|
||||
@ -116,7 +116,7 @@ pub async fn handle_ws_chat(
|
||||
.into_response()
|
||||
}
|
||||
|
||||
async fn handle_socket(socket: WebSocket, state: AppState, _session_id: Option<String>) {
|
||||
async fn handle_socket(socket: WebSocket, state: AppState, session_id: Option<String>) {
|
||||
let (mut sender, mut receiver) = socket.split();
|
||||
|
||||
// Build a persistent Agent for this connection so history is maintained across turns.
|
||||
@ -129,6 +129,7 @@ async fn handle_socket(socket: WebSocket, state: AppState, _session_id: Option<S
|
||||
return;
|
||||
}
|
||||
};
|
||||
agent.set_memory_session_id(session_id.clone());
|
||||
|
||||
while let Some(msg) = receiver.next().await {
|
||||
let msg = match msg {
|
||||
|
||||
@ -90,6 +90,20 @@ pub fn is_assistant_autosave_key(key: &str) -> bool {
|
||||
normalized == "assistant_resp" || normalized.starts_with("assistant_resp_")
|
||||
}
|
||||
|
||||
/// Filter known synthetic autosave noise patterns that should not be
/// persisted as user conversation memories.
///
/// Blank content, cron-injected prompts (`[cron:`), and distilled-memory
/// machinery (`[distilled_` prefix or a `distilled_index_sig:` marker
/// anywhere) are all treated as noise. Matching is case-insensitive.
pub fn should_skip_autosave_content(content: &str) -> bool {
    let trimmed = content.trim();
    if trimmed.is_empty() {
        return true;
    }

    let lower = trimmed.to_ascii_lowercase();
    ["[cron:", "[distilled_"]
        .iter()
        .any(|prefix| lower.starts_with(prefix))
        || lower.contains("distilled_index_sig:")
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
struct ResolvedEmbeddingConfig {
|
||||
provider: String,
|
||||
@ -450,6 +464,17 @@ mod tests {
|
||||
assert!(!is_assistant_autosave_key("user_msg_1234"));
|
||||
}
|
||||
|
||||
#[test]
fn autosave_content_filter_drops_cron_and_distilled_noise() {
    // Synthetic cron / distilled-memory markers must be filtered out.
    let noisy = [
        "[cron:auto] patrol check",
        "[DISTILLED_MEMORY_CHUNK 1/2] DISTILLED_INDEX_SIG:abc123",
    ];
    for content in noisy {
        assert!(should_skip_autosave_content(content));
    }

    // Genuine user content must pass through.
    assert!(!should_skip_autosave_content(
        "User prefers concise answers."
    ));
}
|
||||
|
||||
#[test]
|
||||
fn factory_markdown() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
|
||||
@ -473,6 +473,75 @@ fn extract_stream_error_message(event: &Value) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Appends a raw byte `chunk` to `body`, buffering any trailing incomplete
/// UTF-8 sequence in `pending` until the next chunk arrives.
///
/// Fast path: when nothing is buffered and the whole chunk is valid UTF-8,
/// it is appended directly without copying into `pending`.
///
/// # Errors
/// Returns an error only when the accumulated bytes contain a sequence that
/// can never become valid UTF-8 (`Utf8Error::error_len() == Some(_)`). An
/// incomplete trailing multi-byte sequence (`error_len() == None`) is not an
/// error here — it stays in `pending` for the next call.
fn append_utf8_stream_chunk(
    body: &mut String,
    pending: &mut Vec<u8>,
    chunk: &[u8],
) -> anyhow::Result<()> {
    if pending.is_empty() {
        // Fast path: no carried-over bytes and the chunk decodes cleanly.
        if let Ok(text) = std::str::from_utf8(chunk) {
            body.push_str(text);
            return Ok(());
        }
    }

    if !chunk.is_empty() {
        pending.extend_from_slice(chunk);
    }
    if pending.is_empty() {
        return Ok(());
    }

    match std::str::from_utf8(pending) {
        Ok(text) => {
            // Buffered bytes now form complete UTF-8; flush and reset.
            body.push_str(text);
            pending.clear();
            Ok(())
        }
        Err(err) => {
            let valid_up_to = err.valid_up_to();
            if valid_up_to > 0 {
                // Flush the decodable prefix and keep only the tail bytes.
                // SAFETY: `valid_up_to` always points to the end of a valid UTF-8 prefix.
                let prefix = std::str::from_utf8(&pending[..valid_up_to])
                    .expect("valid UTF-8 prefix from Utf8Error::valid_up_to");
                body.push_str(prefix);
                pending.drain(..valid_up_to);
            }

            if err.error_len().is_some() {
                // `Some` means the bytes are definitively invalid — no future
                // chunk can repair them, so fail now.
                return Err(anyhow::anyhow!(
                    "OpenAI Codex response contained invalid UTF-8: {err}"
                ));
            }

            // `error_len == None` means we have a valid prefix and an incomplete
            // multi-byte sequence at the end; keep it buffered until next chunk.
            Ok(())
        }
    }
}
|
||||
|
||||
fn decode_utf8_stream_chunks<'a, I>(chunks: I) -> anyhow::Result<String>
|
||||
where
|
||||
I: IntoIterator<Item = &'a [u8]>,
|
||||
{
|
||||
let mut body = String::new();
|
||||
let mut pending = Vec::new();
|
||||
|
||||
for chunk in chunks {
|
||||
append_utf8_stream_chunk(&mut body, &mut pending, chunk)?;
|
||||
}
|
||||
|
||||
if !pending.is_empty() {
|
||||
let err = std::str::from_utf8(&pending).expect_err("pending bytes should be invalid UTF-8");
|
||||
return Err(anyhow::anyhow!(
|
||||
"OpenAI Codex response ended with incomplete UTF-8: {err}"
|
||||
));
|
||||
}
|
||||
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
/// Read the response body incrementally via `bytes_stream()` to avoid
|
||||
/// buffering the entire SSE payload in memory. The previous implementation
|
||||
/// used `response.text().await?` which holds the HTTP connection open until
|
||||
@ -481,15 +550,21 @@ fn extract_stream_error_message(event: &Value) -> Option<String> {
|
||||
/// reported in #3544.
|
||||
async fn decode_responses_body(response: reqwest::Response) -> anyhow::Result<String> {
|
||||
let mut body = String::new();
|
||||
let mut pending_utf8 = Vec::new();
|
||||
let mut stream = response.bytes_stream();
|
||||
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let bytes = chunk
|
||||
.map_err(|err| anyhow::anyhow!("error reading OpenAI Codex response stream: {err}"))?;
|
||||
let text = std::str::from_utf8(&bytes).map_err(|err| {
|
||||
anyhow::anyhow!("OpenAI Codex response contained invalid UTF-8: {err}")
|
||||
})?;
|
||||
body.push_str(text);
|
||||
append_utf8_stream_chunk(&mut body, &mut pending_utf8, &bytes)?;
|
||||
}
|
||||
|
||||
if !pending_utf8.is_empty() {
|
||||
let err = std::str::from_utf8(&pending_utf8)
|
||||
.expect_err("pending bytes should be invalid UTF-8 at end of stream");
|
||||
return Err(anyhow::anyhow!(
|
||||
"OpenAI Codex response ended with incomplete UTF-8: {err}"
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(text) = parse_sse_text(&body)? {
|
||||
@ -901,6 +976,21 @@ data: [DONE]
|
||||
assert_eq!(parse_sse_text(payload).unwrap().as_deref(), Some("Done"));
|
||||
}
|
||||
|
||||
#[test]
fn decode_utf8_stream_chunks_handles_multibyte_split_across_chunks() {
    let payload =
        "data: {\"type\":\"response.output_text.delta\",\"delta\":\"Hello 世\"}\n\ndata: [DONE]\n";
    let bytes = payload.as_bytes();

    // Cut one byte into the 3-byte '世' so the decoder must buffer the tail.
    let split_at = payload.find('世').unwrap() + 1;
    let (head, tail) = bytes.split_at(split_at);

    let decoded = decode_utf8_stream_chunks([head, tail]).unwrap();
    assert_eq!(decoded, payload);
    assert_eq!(
        parse_sse_text(&decoded).unwrap().as_deref(),
        Some("Hello 世")
    );
}
|
||||
|
||||
#[test]
|
||||
fn build_responses_input_maps_content_types_by_role() {
|
||||
let messages = vec![
|
||||
|
||||
@ -131,7 +131,12 @@ impl StaticMemoryLoader {
|
||||
|
||||
#[async_trait]
|
||||
impl MemoryLoader for StaticMemoryLoader {
|
||||
async fn load_context(&self, _memory: &dyn Memory, _user_message: &str) -> Result<String> {
|
||||
async fn load_context(
|
||||
&self,
|
||||
_memory: &dyn Memory,
|
||||
_user_message: &str,
|
||||
_session_id: Option<&str>,
|
||||
) -> Result<String> {
|
||||
Ok(self.context.clone())
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user