channels: persist sessions via SessionManager
Fix channel runtime history persistence (load/seed/update) and remove duplicate agent turn call in process_message.
This commit is contained in:
parent
fa8a98113e
commit
db47f569ce
@ -2163,6 +2163,7 @@ pub async fn process_message(
|
|||||||
sender_id: &str,
|
sender_id: &str,
|
||||||
channel_name: &str,
|
channel_name: &str,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
|
tracing::warn!(sender_id, channel_name, "process_message called");
|
||||||
let observer: Arc<dyn Observer> =
|
let observer: Arc<dyn Observer> =
|
||||||
Arc::from(observability::create_observer(&config.observability));
|
Arc::from(observability::create_observer(&config.observability));
|
||||||
let runtime: Arc<dyn runtime::RuntimeAdapter> =
|
let runtime: Arc<dyn runtime::RuntimeAdapter> =
|
||||||
@ -2332,13 +2333,24 @@ pub async fn process_message(
|
|||||||
|
|
||||||
let session_manager = channel_session_manager(&config).await?;
|
let session_manager = channel_session_manager(&config).await?;
|
||||||
let session_id = resolve_session_id(&config.agent.session, sender_id, Some(channel_name));
|
let session_id = resolve_session_id(&config.agent.session, sender_id, Some(channel_name));
|
||||||
|
tracing::warn!(session_id, "session_id resolved");
|
||||||
if let Some(mgr) = session_manager {
|
if let Some(mgr) = session_manager {
|
||||||
let session = mgr.get_or_create(&session_id).await?;
|
let session = mgr.get_or_create(&session_id).await?;
|
||||||
|
let stored_history = session.get_history().await?;
|
||||||
|
tracing::warn!(
|
||||||
|
history_len = stored_history.len(),
|
||||||
|
"session history loaded"
|
||||||
|
);
|
||||||
|
let filtered_history: Vec<ChatMessage> = stored_history
|
||||||
|
.into_iter()
|
||||||
|
.filter(|m| m.role != "system")
|
||||||
|
.collect();
|
||||||
|
|
||||||
let mut history = Vec::new();
|
let mut history = Vec::new();
|
||||||
history.push(ChatMessage::system(&system_prompt));
|
history.push(ChatMessage::system(&system_prompt));
|
||||||
history.extend(session.get_history().await?);
|
history.extend(filtered_history);
|
||||||
history.push(ChatMessage::user(&enriched));
|
history.push(ChatMessage::user(&enriched));
|
||||||
let output = agent_turn(
|
let reply = agent_turn(
|
||||||
provider.as_ref(),
|
provider.as_ref(),
|
||||||
&mut history,
|
&mut history,
|
||||||
&tools_registry,
|
&tools_registry,
|
||||||
@ -2353,13 +2365,20 @@ pub async fn process_message(
|
|||||||
.await?;
|
.await?;
|
||||||
let persisted: Vec<ChatMessage> = history
|
let persisted: Vec<ChatMessage> = history
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|m| m.role != "system")
|
.filter(|m| {
|
||||||
|
m.role != "system"
|
||||||
|
&& m.role != "tool"
|
||||||
|
&& m.role != "tool_use"
|
||||||
|
&& m.role != "tool_result"
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
let saved_len = persisted.len();
|
||||||
session
|
session
|
||||||
.update_history(persisted)
|
.update_history(persisted)
|
||||||
.await
|
.await
|
||||||
.context("Failed to update session history")?;
|
.context("Failed to update session history")?;
|
||||||
Ok(output)
|
tracing::warn!(saved_len, "session history saved");
|
||||||
|
Ok(reply)
|
||||||
} else {
|
} else {
|
||||||
let mut history = vec![
|
let mut history = vec![
|
||||||
ChatMessage::system(&system_prompt),
|
ChatMessage::system(&system_prompt),
|
||||||
|
|||||||
@ -71,6 +71,7 @@ use crate::agent::loop_::{
|
|||||||
build_shell_policy_instructions, build_tool_instructions_from_specs,
|
build_shell_policy_instructions, build_tool_instructions_from_specs,
|
||||||
run_tool_call_loop_with_non_cli_approval_context, scrub_credentials, NonCliApprovalContext,
|
run_tool_call_loop_with_non_cli_approval_context, scrub_credentials, NonCliApprovalContext,
|
||||||
};
|
};
|
||||||
|
use crate::agent::session::{create_session_manager, resolve_session_id, SessionManager};
|
||||||
use crate::approval::{ApprovalManager, ApprovalResponse, PendingApprovalError};
|
use crate::approval::{ApprovalManager, ApprovalResponse, PendingApprovalError};
|
||||||
use crate::config::{Config, NonCliNaturalLanguageApprovalMode};
|
use crate::config::{Config, NonCliNaturalLanguageApprovalMode};
|
||||||
use crate::identity;
|
use crate::identity;
|
||||||
@ -94,6 +95,7 @@ use tokio_util::sync::CancellationToken;
|
|||||||
|
|
||||||
/// Per-sender conversation history for channel messages.
|
/// Per-sender conversation history for channel messages.
|
||||||
type ConversationHistoryMap = Arc<Mutex<HashMap<String, Vec<ChatMessage>>>>;
|
type ConversationHistoryMap = Arc<Mutex<HashMap<String, Vec<ChatMessage>>>>;
|
||||||
|
static CHANNEL_SESSION_CONFIG: OnceLock<crate::config::AgentSessionConfig> = OnceLock::new();
|
||||||
/// Maximum history messages to keep per sender.
|
/// Maximum history messages to keep per sender.
|
||||||
const MAX_CHANNEL_HISTORY: usize = 50;
|
const MAX_CHANNEL_HISTORY: usize = 50;
|
||||||
/// Minimum user-message length (in chars) for auto-save to memory.
|
/// Minimum user-message length (in chars) for auto-save to memory.
|
||||||
@ -270,6 +272,7 @@ struct ChannelRuntimeContext {
|
|||||||
max_tool_iterations: usize,
|
max_tool_iterations: usize,
|
||||||
min_relevance_score: f64,
|
min_relevance_score: f64,
|
||||||
conversation_histories: ConversationHistoryMap,
|
conversation_histories: ConversationHistoryMap,
|
||||||
|
session_manager: Option<Arc<dyn SessionManager + Send + Sync>>,
|
||||||
provider_cache: ProviderCacheMap,
|
provider_cache: ProviderCacheMap,
|
||||||
route_overrides: RouteSelectionMap,
|
route_overrides: RouteSelectionMap,
|
||||||
api_key: Option<String>,
|
api_key: Option<String>,
|
||||||
@ -3006,6 +3009,9 @@ async fn process_channel_message(
|
|||||||
msg: traits::ChannelMessage,
|
msg: traits::ChannelMessage,
|
||||||
cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
) {
|
) {
|
||||||
|
let sender_id = msg.sender.as_str();
|
||||||
|
let channel_name = msg.channel.as_str();
|
||||||
|
tracing::warn!(sender_id, channel_name, "process_message called");
|
||||||
if cancellation_token.is_cancelled() {
|
if cancellation_token.is_cancelled() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -3099,6 +3105,50 @@ or tune thresholds in config.",
|
|||||||
}
|
}
|
||||||
|
|
||||||
let history_key = conversation_history_key(&msg);
|
let history_key = conversation_history_key(&msg);
|
||||||
|
if let Some(manager) = ctx.session_manager.as_ref() {
|
||||||
|
let should_seed = {
|
||||||
|
let histories = ctx
|
||||||
|
.conversation_histories
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
|
!histories.contains_key(&history_key)
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_seed {
|
||||||
|
let session_config = CHANNEL_SESSION_CONFIG.get().cloned().unwrap_or_default();
|
||||||
|
let session_id = resolve_session_id(
|
||||||
|
&session_config,
|
||||||
|
msg.sender.as_str(),
|
||||||
|
Some(msg.channel.as_str()),
|
||||||
|
);
|
||||||
|
tracing::warn!(session_id, "session_id resolved");
|
||||||
|
match manager.get_or_create(&session_id).await {
|
||||||
|
Ok(session) => match session.get_history().await {
|
||||||
|
Ok(history) => {
|
||||||
|
tracing::warn!(
|
||||||
|
history_len = history.len(),
|
||||||
|
"session history loaded"
|
||||||
|
);
|
||||||
|
let filtered: Vec<ChatMessage> = history
|
||||||
|
.into_iter()
|
||||||
|
.filter(|m| m.role != "system")
|
||||||
|
.collect();
|
||||||
|
let mut histories = ctx
|
||||||
|
.conversation_histories
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
|
histories.entry(history_key.clone()).or_insert(filtered);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!("Failed to load session history: {err}");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!("Failed to open session: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// Try classification first, fall back to sender/default route
|
// Try classification first, fall back to sender/default route
|
||||||
let route = classify_message_route(ctx.as_ref(), &msg.content)
|
let route = classify_message_route(ctx.as_ref(), &msg.content)
|
||||||
.unwrap_or_else(|| get_route_selection(ctx.as_ref(), &history_key));
|
.unwrap_or_else(|| get_route_selection(ctx.as_ref(), &history_key));
|
||||||
@ -3517,6 +3567,44 @@ or tune thresholds in config.",
|
|||||||
&history_key,
|
&history_key,
|
||||||
ChatMessage::assistant(&history_response),
|
ChatMessage::assistant(&history_response),
|
||||||
);
|
);
|
||||||
|
if let Some(manager) = ctx.session_manager.as_ref() {
|
||||||
|
let session_config = CHANNEL_SESSION_CONFIG.get().cloned().unwrap_or_default();
|
||||||
|
let session_id = resolve_session_id(
|
||||||
|
&session_config,
|
||||||
|
msg.sender.as_str(),
|
||||||
|
Some(msg.channel.as_str()),
|
||||||
|
);
|
||||||
|
tracing::warn!(session_id, "session_id resolved");
|
||||||
|
match manager.get_or_create(&session_id).await {
|
||||||
|
Ok(session) => {
|
||||||
|
let latest = {
|
||||||
|
let histories = ctx
|
||||||
|
.conversation_histories
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
|
histories.get(&history_key).cloned().unwrap_or_default()
|
||||||
|
};
|
||||||
|
let filtered: Vec<ChatMessage> = latest
|
||||||
|
.into_iter()
|
||||||
|
.filter(|m| {
|
||||||
|
m.role != "system"
|
||||||
|
&& m.role != "tool"
|
||||||
|
&& m.role != "tool_use"
|
||||||
|
&& m.role != "tool_result"
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let saved_len = filtered.len();
|
||||||
|
if let Err(err) = session.update_history(filtered).await {
|
||||||
|
tracing::warn!("Failed to update session history: {err}");
|
||||||
|
} else {
|
||||||
|
tracing::warn!(saved_len, "session history saved");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
tracing::warn!("Failed to open session: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
println!(
|
println!(
|
||||||
" 🤖 Reply ({}ms): {}",
|
" 🤖 Reply ({}ms): {}",
|
||||||
started_at.elapsed().as_millis(),
|
started_at.elapsed().as_millis(),
|
||||||
@ -5141,6 +5229,10 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.is_some_and(|tg| tg.interrupt_on_new_message);
|
.is_some_and(|tg| tg.interrupt_on_new_message);
|
||||||
|
|
||||||
|
let _ = CHANNEL_SESSION_CONFIG.set(config.agent.session.clone());
|
||||||
|
let session_manager = create_session_manager(&config.agent.session, &config.workspace_dir)?
|
||||||
|
.map(|mgr| mgr as Arc<dyn SessionManager + Send + Sync>);
|
||||||
|
|
||||||
let runtime_ctx = Arc::new(ChannelRuntimeContext {
|
let runtime_ctx = Arc::new(ChannelRuntimeContext {
|
||||||
channels_by_name,
|
channels_by_name,
|
||||||
provider: Arc::clone(&provider),
|
provider: Arc::clone(&provider),
|
||||||
@ -5155,6 +5247,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
|||||||
max_tool_iterations: config.agent.max_tool_iterations,
|
max_tool_iterations: config.agent.max_tool_iterations,
|
||||||
min_relevance_score: config.memory.min_relevance_score,
|
min_relevance_score: config.memory.min_relevance_score,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager,
|
||||||
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: config.api_key.clone(),
|
api_key: config.api_key.clone(),
|
||||||
@ -5503,6 +5596,7 @@ mod tests {
|
|||||||
max_tool_iterations: 5,
|
max_tool_iterations: 5,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(histories)),
|
conversation_histories: Arc::new(Mutex::new(histories)),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -5557,6 +5651,7 @@ mod tests {
|
|||||||
max_tool_iterations: 5,
|
max_tool_iterations: 5,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -5614,6 +5709,7 @@ mod tests {
|
|||||||
max_tool_iterations: 5,
|
max_tool_iterations: 5,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(histories)),
|
conversation_histories: Arc::new(Mutex::new(histories)),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6212,6 +6308,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 5,
|
max_tool_iterations: 5,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6289,6 +6386,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 10,
|
max_tool_iterations: 10,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6353,6 +6451,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 10,
|
max_tool_iterations: 10,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6431,6 +6530,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 10,
|
max_tool_iterations: 10,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6508,6 +6608,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 10,
|
max_tool_iterations: 10,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6577,6 +6678,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 10,
|
max_tool_iterations: 10,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6641,6 +6743,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 10,
|
max_tool_iterations: 10,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6714,6 +6817,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 5,
|
max_tool_iterations: 5,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
@ -6815,6 +6919,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
|||||||
max_tool_iterations: 5,
|
max_tool_iterations: 5,
|
||||||
min_relevance_score: 0.0,
|
min_relevance_score: 0.0,
|
||||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
session_manager: None,
|
||||||
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
|
||||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||||
api_key: None,
|
api_key: None,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user