From 2a4902c3a595d3511fb955c8138c1a856c174fb7 Mon Sep 17 00:00:00 2001 From: VirtualHotBar <96966978+VirtualHotBar@users.noreply.github.com> Date: Sat, 28 Feb 2026 17:34:00 +0800 Subject: [PATCH] fix(qq): stabilize conversation history key --- src/channels/mod.rs | 387 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 314 insertions(+), 73 deletions(-) diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 84075b5b0..ef279b9a8 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -71,7 +71,7 @@ use crate::agent::loop_::{ build_shell_policy_instructions, build_tool_instructions_from_specs, run_tool_call_loop_with_non_cli_approval_context, scrub_credentials, NonCliApprovalContext, }; -use crate::agent::session::{create_session_manager, resolve_session_id, SessionManager}; +use crate::agent::session::{resolve_session_id, shared_session_manager, Session, SessionManager}; use crate::approval::{ApprovalManager, ApprovalResponse, PendingApprovalError}; use crate::config::{Config, NonCliNaturalLanguageApprovalMode}; use crate::identity; @@ -95,7 +95,8 @@ use tokio_util::sync::CancellationToken; /// Per-sender conversation history for channel messages. type ConversationHistoryMap = Arc>>>; -static CHANNEL_SESSION_CONFIG: OnceLock = OnceLock::new(); +type ConversationLockMap = + Arc>>>>; /// Maximum history messages to keep per sender. const MAX_CHANNEL_HISTORY: usize = 50; /// Minimum user-message length (in chars) for auto-save to memory. @@ -126,6 +127,9 @@ const MEMORY_CONTEXT_ENTRY_MAX_CHARS: usize = 800; const MEMORY_CONTEXT_MAX_CHARS: usize = 4_000; const CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES: usize = 12; const CHANNEL_HISTORY_COMPACT_CONTENT_CHARS: usize = 600; +const CHANNEL_CONTEXT_TOKEN_ESTIMATE_LIMIT: usize = 90_000; +const CHANNEL_CONTEXT_TOKEN_ESTIMATE_TARGET: usize = 80_000; +const CHANNEL_CONTEXT_MIN_KEEP_NON_SYSTEM_MESSAGES: usize = 10; /// Guardrail for hook-modified outbound channel content. const CHANNEL_HOOK_MAX_OUTBOUND_CHARS: usize = 20_000; @@ -272,6 +276,8 @@ struct ChannelRuntimeContext { max_tool_iterations: usize, min_relevance_score: f64, conversation_histories: ConversationHistoryMap, + conversation_locks: ConversationLockMap, + session_config: crate::config::AgentSessionConfig, session_manager: Option>, provider_cache: ProviderCacheMap, route_overrides: RouteSelectionMap, @@ -333,8 +339,10 @@ fn conversation_memory_key(msg: &traits::ChannelMessage) -> String { fn conversation_history_key(msg: &traits::ChannelMessage) -> String { // Include thread_ts for per-topic session isolation in forum groups - match &msg.thread_ts { + let channel = msg.channel.as_str(); + match msg.thread_ts.as_deref().filter(|_| channel != "qq") { Some(tid) => format!("{}_{}_{}", msg.channel, tid, msg.sender), + None if channel == "qq" => format!("{}_{}_{}", msg.channel, msg.reply_target, msg.sender), None => format!("{}_{}", msg.channel, msg.sender), } } @@ -1676,6 +1684,40 @@ fn append_sender_turn(ctx: &ChannelRuntimeContext, sender_key: &str, turn: ChatM } } +fn estimated_message_tokens(message: &ChatMessage) -> usize { + (message.content.chars().count().saturating_add(2) / 3).saturating_add(4) +} + +fn estimated_history_tokens(history: &[ChatMessage]) -> usize { + history.iter().map(estimated_message_tokens).sum() +} + +fn trim_channel_prompt_history(history: &mut Vec) -> bool { + let mut total = estimated_history_tokens(history); + if total <= CHANNEL_CONTEXT_TOKEN_ESTIMATE_LIMIT { + return false; + } + + let mut trimmed = false; + loop { + if total <= CHANNEL_CONTEXT_TOKEN_ESTIMATE_TARGET { + break; + } + let non_system = history.iter().filter(|m| m.role != "system").count(); + if non_system <= CHANNEL_CONTEXT_MIN_KEEP_NON_SYSTEM_MESSAGES { + break; + } + let Some(idx) = history.iter().position(|m| m.role != "system") else { + break; + }; + let removed = history.remove(idx); + total = total.saturating_sub(estimated_message_tokens(&removed)); + trimmed = true; + } + + trimmed +} + fn rollback_orphan_user_turn( ctx: &ChannelRuntimeContext, sender_key: &str, @@ -3044,7 +3086,33 @@ or tune thresholds in config.", } let history_key = conversation_history_key(&msg); + let conversation_lock = { + let mut locks = ctx.conversation_locks.lock().await; + locks + .entry(history_key.clone()) + .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) + .clone() + }; + let _conversation_guard = conversation_lock.lock().await; + let mut session: Option = None; if let Some(manager) = ctx.session_manager.as_ref() { + let session_id = resolve_session_id( + &ctx.session_config, + msg.sender.as_str(), + Some(msg.channel.as_str()), + ); + tracing::debug!(session_id, "session_id resolved"); + match manager.get_or_create(&session_id).await { + Ok(opened) => { + session = Some(opened); + } + Err(err) => { + tracing::warn!("Failed to open session: {err}"); + } + } + } + + if let Some(session) = session.as_ref() { let should_seed = { let histories = ctx .conversation_histories @@ -3054,35 +3122,23 @@ or tune thresholds in config.", }; if should_seed { - if let Some(session_config) = CHANNEL_SESSION_CONFIG.get().cloned() { - let session_id = resolve_session_id( - &session_config, - msg.sender.as_str(), - Some(msg.channel.as_str()), - ); - tracing::debug!(session_id, "session_id resolved"); - match manager.get_or_create(&session_id).await { - Ok(session) => match session.get_history().await { - Ok(history) => { - tracing::debug!(history_len = history.len(), "session history loaded"); - let filtered: Vec = - history.into_iter().filter(|m| m.role != "system").collect(); - let mut histories = ctx - .conversation_histories - .lock() - .unwrap_or_else(|e| e.into_inner()); - histories.entry(history_key.clone()).or_insert(filtered); - } - Err(err) => { - tracing::warn!("Failed to load session history: {err}"); - } - }, - Err(err) => { - tracing::warn!("Failed to open session: {err}"); - } + match session.get_history().await { + Ok(history) => { + tracing::debug!(history_len = history.len(), "session history loaded"); + let filtered: Vec = + history + .into_iter() + .filter(|m| crate::providers::is_user_or_assistant_role(m.role.as_str())) + .collect(); + let mut histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + histories.entry(history_key.clone()).or_insert(filtered); + } + Err(err) => { + tracing::warn!("Failed to load session history: {err}"); } - } else { - tracing::warn!("CHANNEL_SESSION_CONFIG not initialized, skipping session"); } } } @@ -3186,6 +3242,7 @@ or tune thresholds in config.", )); let mut history = vec![ChatMessage::system(system_prompt)]; history.extend(prior_turns); + let _ = trim_channel_prompt_history(&mut history); let use_streaming = target_channel .as_ref() .is_some_and(|ch| ch.supports_draft_updates()); @@ -3504,45 +3561,23 @@ or tune thresholds in config.", &history_key, ChatMessage::assistant(&history_response), ); - if let Some(manager) = ctx.session_manager.as_ref() { - if let Some(session_config) = CHANNEL_SESSION_CONFIG.get().cloned() { - let session_id = resolve_session_id( - &session_config, - msg.sender.as_str(), - Some(msg.channel.as_str()), - ); - tracing::debug!(session_id, "session_id resolved"); - match manager.get_or_create(&session_id).await { - Ok(session) => { - let latest = { - let histories = ctx - .conversation_histories - .lock() - .unwrap_or_else(|e| e.into_inner()); - histories.get(&history_key).cloned().unwrap_or_default() - }; - let filtered: Vec = latest - .into_iter() - .filter(|m| { - m.role != "system" - && m.role != "tool" - && m.role != "tool_use" - && m.role != "tool_result" - }) - .collect(); - let saved_len = filtered.len(); - if let Err(err) = session.update_history(filtered).await { - tracing::warn!("Failed to update session history: {err}"); - } else { - tracing::debug!(saved_len, "session history saved"); - } - } - Err(err) => { - tracing::warn!("Failed to open session: {err}"); - } - } + if let Some(session) = session.as_ref() { + let latest = { + let histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + histories.get(&history_key).cloned().unwrap_or_default() + }; + let filtered: Vec = latest + .into_iter() + .filter(|m| crate::providers::is_user_or_assistant_role(m.role.as_str())) + .collect(); + let saved_len = filtered.len(); + if let Err(err) = session.update_history(filtered).await { + tracing::warn!("Failed to update session history: {err}"); } else { - tracing::warn!("CHANNEL_SESSION_CONFIG not initialized, skipping session"); + tracing::debug!(saved_len, "session history saved"); } } println!( @@ -5170,8 +5205,7 @@ pub async fn start_channels(config: Config) -> Result<()> { .as_ref() .is_some_and(|tg| tg.interrupt_on_new_message); - let _ = CHANNEL_SESSION_CONFIG.set(config.agent.session.clone()); - let session_manager = create_session_manager(&config.agent.session, &config.workspace_dir)? + let session_manager = shared_session_manager(&config.agent.session, &config.workspace_dir)? .map(|mgr| mgr as Arc); let runtime_ctx = Arc::new(ChannelRuntimeContext { @@ -5188,6 +5222,8 @@ pub async fn start_channels(config: Config) -> Result<()> { max_tool_iterations: config.agent.max_tool_iterations, min_relevance_score: config.memory.min_relevance_score, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + session_config: config.agent.session.clone(), session_manager, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -5538,6 +5574,8 @@ mod tests { max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(histories)), + conversation_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -5593,6 +5631,8 @@ mod tests { max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -5651,6 +5691,8 @@ mod tests { max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(histories)), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -5709,6 +5751,11 @@ mod tests { reactions_removed: tokio::sync::Mutex>, } + #[derive(Default)] + struct QqRecordingChannel { + sent_messages: tokio::sync::Mutex>, + } + #[derive(Default)] struct TelegramRecordingChannel { sent_messages: tokio::sync::Mutex>, @@ -5865,6 +5912,36 @@ mod tests { } } + #[async_trait::async_trait] + impl Channel for QqRecordingChannel { + fn name(&self) -> &str { + "qq" + } + + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + self.sent_messages + .lock() + .await + .push(format!("{}:{}", message.recipient, message.content)); + Ok(()) + } + + async fn listen( + &self, + _tx: tokio::sync::mpsc::Sender, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> { + Ok(()) + } + + async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { + Ok(()) + } + } + struct SlowProvider { delay: Duration, } @@ -6250,6 +6327,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6328,6 +6407,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6393,6 +6474,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6472,6 +6555,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6550,6 +6635,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6620,6 +6707,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6685,6 +6774,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6759,6 +6850,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -6861,6 +6954,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7012,6 +7107,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7123,6 +7220,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7229,6 +7328,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7326,6 +7427,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7336,7 +7439,7 @@ BTC is currently around $65,000 based on latest tool output."# zeroclaw_dir: Some(temp.path().to_path_buf()), ..providers::ProviderRuntimeOptions::default() }, - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_dir: Arc::new(temp.path().join("workspace")), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, interrupt_on_new_message: false, multimodal: crate::config::MultimodalConfig::default(), @@ -7473,6 +7576,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7483,7 +7588,7 @@ BTC is currently around $65,000 based on latest tool output."# zeroclaw_dir: Some(temp.path().to_path_buf()), ..providers::ProviderRuntimeOptions::default() }, - workspace_dir: Arc::new(std::env::temp_dir()), + workspace_dir: Arc::new(temp.path().join("workspace")), message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, interrupt_on_new_message: false, multimodal: crate::config::MultimodalConfig::default(), @@ -7568,6 +7673,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7714,6 +7821,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7830,6 +7939,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -7926,6 +8037,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8044,6 +8157,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8160,6 +8275,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(route_overrides)), @@ -8237,6 +8354,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8327,6 +8446,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8473,6 +8594,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8580,6 +8703,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 12, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8646,6 +8771,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 3, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8824,6 +8951,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -8910,6 +9039,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -9008,6 +9139,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -9088,6 +9221,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -9153,6 +9288,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -9675,6 +9812,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -9743,6 +9882,100 @@ BTC is currently around $65,000 based on latest tool output."# assert!(calls[1][3].1.contains("follow up")); } + #[tokio::test] + async fn process_channel_message_qq_keeps_history_across_distinct_message_ids() { + let channel_impl = Arc::new(QqRecordingChannel::default()); + let channel: Arc = channel_impl.clone(); + + let mut channels_by_name = HashMap::new(); + channels_by_name.insert(channel.name().to_string(), channel); + + let provider_impl = Arc::new(HistoryCaptureProvider::default()); + + let runtime_ctx = Arc::new(ChannelRuntimeContext { + channels_by_name: Arc::new(channels_by_name), + provider: provider_impl.clone(), + default_provider: Arc::new("test-provider".to_string()), + memory: Arc::new(NoopMemory), + tools_registry: Arc::new(vec![]), + observer: Arc::new(NoopObserver), + system_prompt: Arc::new("test-system-prompt".to_string()), + model: Arc::new("test-model".to_string()), + temperature: 0.0, + auto_save_memory: false, + max_tool_iterations: 5, + min_relevance_score: 0.0, + conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), + session_manager: None, + provider_cache: Arc::new(Mutex::new(HashMap::new())), + route_overrides: Arc::new(Mutex::new(HashMap::new())), + api_key: None, + api_url: None, + reliability: Arc::new(crate::config::ReliabilityConfig::default()), + provider_runtime_options: providers::ProviderRuntimeOptions::default(), + workspace_dir: Arc::new(std::env::temp_dir()), + message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS, + interrupt_on_new_message: false, + multimodal: crate::config::MultimodalConfig::default(), + hooks: None, + non_cli_excluded_tools: Arc::new(Mutex::new(Vec::new())), + query_classification: crate::config::QueryClassificationConfig::default(), + model_routes: Vec::new(), + approval_manager: Arc::new(ApprovalManager::from_config( + &crate::config::AutonomyConfig::default(), + )), + }); + + process_channel_message( + runtime_ctx.clone(), + traits::ChannelMessage { + id: "msg-a".to_string(), + sender: "alice".to_string(), + reply_target: "group:1".to_string(), + content: "hello".to_string(), + channel: "qq".to_string(), + timestamp: 1, + thread_ts: Some("msg-1".to_string()), + }, + CancellationToken::new(), + ) + .await; + + process_channel_message( + runtime_ctx, + traits::ChannelMessage { + id: "msg-b".to_string(), + sender: "alice".to_string(), + reply_target: "group:1".to_string(), + content: "follow up".to_string(), + channel: "qq".to_string(), + timestamp: 2, + thread_ts: Some("msg-2".to_string()), + }, + CancellationToken::new(), + ) + .await; + + let calls = provider_impl + .calls + .lock() + .unwrap_or_else(|e| e.into_inner()); + assert_eq!(calls.len(), 2); + assert_eq!(calls[0].len(), 2); + assert_eq!(calls[0][0].0, "system"); + assert_eq!(calls[0][1].0, "user"); + assert_eq!(calls[1].len(), 4); + assert_eq!(calls[1][0].0, "system"); + assert_eq!(calls[1][1].0, "user"); + assert_eq!(calls[1][2].0, "assistant"); + assert_eq!(calls[1][3].0, "user"); + assert!(calls[1][1].1.contains("hello")); + assert!(calls[1][2].1.contains("response-1")); + assert!(calls[1][3].1.contains("follow up")); + } + #[tokio::test] async fn process_channel_message_enriches_current_turn_without_persisting_context() { let channel_impl = Arc::new(RecordingChannel::default()); @@ -9766,6 +9999,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -9857,6 +10092,8 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(histories)), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -10581,6 +10818,8 @@ BTC is currently around $65,000 based on latest tool output."#; max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), @@ -10653,6 +10892,8 @@ BTC is currently around $65,000 based on latest tool output."#; max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + conversation_locks: Default::default(), + session_config: crate::config::AgentSessionConfig::default(), session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())),