fix: session leftovers after db47f56

- Demote session normal flow logs to debug
- Skip session operations when CHANNEL_SESSION_CONFIG is uninitialized
- Add spawn_blocking panic context for SQLite session manager
- Fix fmt/clippy regressions (Box::pin large futures, cfg features, misc lints)
This commit is contained in:
VirtualHotBar 2026-02-28 15:23:42 +08:00
parent db47f569ce
commit 0a42329ca5
29 changed files with 413 additions and 269 deletions

View File

@ -231,6 +231,9 @@ rag-pdf = ["dep:pdf-extract"]
wasm-tools = ["dep:wasmtime", "dep:wasmtime-wasi"]
# whatsapp-web = Native WhatsApp Web client with custom rusqlite storage backend
whatsapp-web = ["dep:wa-rs", "dep:wa-rs-core", "dep:wa-rs-binary", "dep:wa-rs-proto", "dep:wa-rs-ureq-http", "dep:wa-rs-tokio-transport", "dep:serde-big-array", "dep:prost", "dep:qrcode"]
firecrawl = []
web-fetch-html2md = []
web-fetch-plaintext = []
[profile.release]
opt-level = "z" # Optimize for size

View File

@ -32,6 +32,7 @@ mod execution;
mod history;
mod parsing;
use crate::agent::session::{create_session_manager, resolve_session_id, SessionManager};
use context::{build_context, build_hardware_context};
use execution::{
execute_tools_parallel, execute_tools_sequential, should_execute_tools_in_parallel,
@ -47,7 +48,6 @@ use parsing::{
parse_perl_style_tool_calls, parse_structured_tool_calls, parse_tool_call_value,
parse_tool_calls, parse_tool_calls_from_json_value, tool_call_signature, ParsedToolCall,
};
use crate::agent::session::{create_session_manager, resolve_session_id, SessionManager};
/// Minimum characters per chunk when relaying LLM text to a streaming draft.
const STREAM_CHUNK_MIN_CHARS: usize = 80;
@ -150,8 +150,12 @@ impl Helper for SlashCommandCompleter {}
static CHANNEL_SESSION_MANAGER: LazyLock<Mutex<HashMap<String, Arc<dyn SessionManager>>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
async fn channel_session_manager(config: &Config) -> Result<Option<Arc<dyn SessionManager>>> {
let key = format!("{:?}:{:?}", config.workspace_dir, config.agent.session);
fn channel_session_manager(config: &Config) -> Result<Option<Arc<dyn SessionManager>>> {
let key = format!(
"{}:{:?}",
config.workspace_dir.display(),
config.agent.session
);
{
let map = CHANNEL_SESSION_MANAGER.lock().unwrap();
@ -951,7 +955,7 @@ pub(crate) async fn run_tool_call_loop(
Some(model),
Some(&turn_id),
Some(false),
Some(&parse_issue),
Some(parse_issue),
serde_json::json!({
"iteration": iteration + 1,
"response_excerpt": truncate_with_ellipsis(
@ -2163,7 +2167,7 @@ pub async fn process_message(
sender_id: &str,
channel_name: &str,
) -> Result<String> {
tracing::warn!(sender_id, channel_name, "process_message called");
tracing::debug!(sender_id, channel_name, "process_message called");
let observer: Arc<dyn Observer> =
Arc::from(observability::create_observer(&config.observability));
let runtime: Arc<dyn runtime::RuntimeAdapter> =
@ -2331,16 +2335,13 @@ pub async fn process_message(
format!("{context}[{now}] {message}")
};
let session_manager = channel_session_manager(&config).await?;
let session_manager = channel_session_manager(&config)?;
let session_id = resolve_session_id(&config.agent.session, sender_id, Some(channel_name));
tracing::warn!(session_id, "session_id resolved");
tracing::debug!(session_id, "session_id resolved");
if let Some(mgr) = session_manager {
let session = mgr.get_or_create(&session_id).await?;
let stored_history = session.get_history().await?;
tracing::warn!(
history_len = stored_history.len(),
"session history loaded"
);
tracing::debug!(history_len = stored_history.len(), "session history loaded");
let filtered_history: Vec<ChatMessage> = stored_history
.into_iter()
.filter(|m| m.role != "system")
@ -2377,7 +2378,7 @@ pub async fn process_message(
.update_history(persisted)
.await
.context("Failed to update session history")?;
tracing::warn!(saved_len, "session history saved");
tracing::debug!(saved_len, "session history saved");
Ok(reply)
} else {
let mut history = vec![

View File

@ -115,7 +115,9 @@ impl PromptSection for IdentitySection {
inject_workspace_file(&mut prompt, ctx.workspace_dir, "MEMORY.md");
}
let extra_files = ctx.identity_config.map_or(&[][..], |cfg| cfg.extra_files.as_slice());
let extra_files = ctx
.identity_config
.map_or(&[][..], |cfg| cfg.extra_files.as_slice());
for file in extra_files {
if let Some(safe_relative) = normalize_openclaw_identity_extra_file(file) {
inject_workspace_file(&mut prompt, ctx.workspace_dir, safe_relative);

View File

@ -1,5 +1,7 @@
use crate::providers::ChatMessage;
use crate::{config::AgentSessionBackend, config::AgentSessionConfig, config::AgentSessionStrategy};
use crate::{
config::AgentSessionBackend, config::AgentSessionConfig, config::AgentSessionStrategy,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use parking_lot::Mutex;
@ -147,10 +149,12 @@ impl SessionManager for MemorySessionManager {
async fn get_history(&self, session_id: &str) -> Result<Vec<ChatMessage>> {
let mut sessions = self.inner.sessions.write().await;
let now = unix_seconds_now();
let entry = sessions.entry(session_id.to_string()).or_insert_with(|| MemorySessionState {
history: Vec::new(),
updated_at_unix: now,
});
let entry = sessions
.entry(session_id.to_string())
.or_insert_with(|| MemorySessionState {
history: Vec::new(),
updated_at_unix: now,
});
entry.updated_at_unix = now;
Ok(entry.history.clone())
}
@ -243,7 +247,7 @@ impl SqliteSessionManager {
let conn = self.conn.clone();
let session_id = session_id.to_string();
let age_secs = age.as_secs() as i64;
tokio::task::spawn_blocking(move || {
let conn = conn.lock();
let new_time = unix_seconds_now() - age_secs;
@ -252,7 +256,9 @@ impl SqliteSessionManager {
params![session_id, new_time],
)?;
Ok(())
}).await?
})
.await
.context("SQLite blocking task panicked")?
}
}
@ -291,7 +297,9 @@ impl SessionManager for SqliteSessionManager {
params![session_id, now],
)?;
Ok(Vec::new())
}).await?
})
.await
.context("SQLite blocking task panicked")?
}
async fn set_history(&self, session_id: &str, mut history: Vec<ChatMessage>) -> Result<()> {
@ -310,13 +318,15 @@ impl SessionManager for SqliteSessionManager {
params![session_id, json, now],
)?;
Ok(())
}).await?
})
.await
.context("SQLite blocking task panicked")?
}
async fn delete(&self, session_id: &str) -> Result<()> {
let conn = self.conn.clone();
let session_id = session_id.to_string();
tokio::task::spawn_blocking(move || {
let conn = conn.lock();
conn.execute(
@ -324,7 +334,9 @@ impl SessionManager for SqliteSessionManager {
params![session_id],
)?;
Ok(())
}).await?
})
.await
.context("SQLite blocking task panicked")?
}
async fn cleanup_expired(&self) -> Result<usize> {
@ -333,7 +345,7 @@ impl SessionManager for SqliteSessionManager {
}
let conn = self.conn.clone();
let ttl_secs = self.ttl.as_secs() as i64;
tokio::task::spawn_blocking(move || {
let cutoff = unix_seconds_now() - ttl_secs;
let conn = conn.lock();
@ -342,7 +354,9 @@ impl SessionManager for SqliteSessionManager {
params![cutoff],
)?;
Ok(removed)
}).await?
})
.await
.context("SQLite blocking task panicked")?
}
}
@ -472,10 +486,11 @@ mod tests {
session
.update_history(vec![ChatMessage::user("hi"), ChatMessage::assistant("ok")])
.await?;
// Force expire by setting age to 2 seconds
mgr.force_expire_session("s1", Duration::from_secs(2)).await?;
mgr.force_expire_session("s1", Duration::from_secs(2))
.await?;
let removed = mgr.cleanup_expired().await?;
assert!(removed >= 1);
Ok(())

View File

@ -3011,7 +3011,7 @@ async fn process_channel_message(
) {
let sender_id = msg.sender.as_str();
let channel_name = msg.channel.as_str();
tracing::warn!(sender_id, channel_name, "process_message called");
tracing::debug!(sender_id, channel_name, "process_message called");
if cancellation_token.is_cancelled() {
return;
}
@ -3115,37 +3115,35 @@ or tune thresholds in config.",
};
if should_seed {
let session_config = CHANNEL_SESSION_CONFIG.get().cloned().unwrap_or_default();
let session_id = resolve_session_id(
&session_config,
msg.sender.as_str(),
Some(msg.channel.as_str()),
);
tracing::warn!(session_id, "session_id resolved");
match manager.get_or_create(&session_id).await {
Ok(session) => match session.get_history().await {
Ok(history) => {
tracing::warn!(
history_len = history.len(),
"session history loaded"
);
let filtered: Vec<ChatMessage> = history
.into_iter()
.filter(|m| m.role != "system")
.collect();
let mut histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
histories.entry(history_key.clone()).or_insert(filtered);
}
if let Some(session_config) = CHANNEL_SESSION_CONFIG.get().cloned() {
let session_id = resolve_session_id(
&session_config,
msg.sender.as_str(),
Some(msg.channel.as_str()),
);
tracing::debug!(session_id, "session_id resolved");
match manager.get_or_create(&session_id).await {
Ok(session) => match session.get_history().await {
Ok(history) => {
tracing::debug!(history_len = history.len(), "session history loaded");
let filtered: Vec<ChatMessage> =
history.into_iter().filter(|m| m.role != "system").collect();
let mut histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
histories.entry(history_key.clone()).or_insert(filtered);
}
Err(err) => {
tracing::warn!("Failed to load session history: {err}");
}
},
Err(err) => {
tracing::warn!("Failed to load session history: {err}");
tracing::warn!("Failed to open session: {err}");
}
},
Err(err) => {
tracing::warn!("Failed to open session: {err}");
}
} else {
tracing::warn!("CHANNEL_SESSION_CONFIG not initialized, skipping session");
}
}
}
@ -3568,41 +3566,44 @@ or tune thresholds in config.",
ChatMessage::assistant(&history_response),
);
if let Some(manager) = ctx.session_manager.as_ref() {
let session_config = CHANNEL_SESSION_CONFIG.get().cloned().unwrap_or_default();
let session_id = resolve_session_id(
&session_config,
msg.sender.as_str(),
Some(msg.channel.as_str()),
);
tracing::warn!(session_id, "session_id resolved");
match manager.get_or_create(&session_id).await {
Ok(session) => {
let latest = {
let histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
histories.get(&history_key).cloned().unwrap_or_default()
};
let filtered: Vec<ChatMessage> = latest
.into_iter()
.filter(|m| {
m.role != "system"
&& m.role != "tool"
&& m.role != "tool_use"
&& m.role != "tool_result"
})
.collect();
let saved_len = filtered.len();
if let Err(err) = session.update_history(filtered).await {
tracing::warn!("Failed to update session history: {err}");
} else {
tracing::warn!(saved_len, "session history saved");
if let Some(session_config) = CHANNEL_SESSION_CONFIG.get().cloned() {
let session_id = resolve_session_id(
&session_config,
msg.sender.as_str(),
Some(msg.channel.as_str()),
);
tracing::debug!(session_id, "session_id resolved");
match manager.get_or_create(&session_id).await {
Ok(session) => {
let latest = {
let histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
histories.get(&history_key).cloned().unwrap_or_default()
};
let filtered: Vec<ChatMessage> = latest
.into_iter()
.filter(|m| {
m.role != "system"
&& m.role != "tool"
&& m.role != "tool_use"
&& m.role != "tool_result"
})
.collect();
let saved_len = filtered.len();
if let Err(err) = session.update_history(filtered).await {
tracing::warn!("Failed to update session history: {err}");
} else {
tracing::debug!(saved_len, "session history saved");
}
}
Err(err) => {
tracing::warn!("Failed to open session: {err}");
}
}
Err(err) => {
tracing::warn!("Failed to open session: {err}");
}
} else {
tracing::warn!("CHANNEL_SESSION_CONFIG not initialized, skipping session");
}
}
println!(
@ -7070,6 +7071,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7180,6 +7182,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7285,6 +7288,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7381,6 +7385,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7527,6 +7532,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7621,6 +7627,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7766,6 +7773,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7881,6 +7889,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -7976,6 +7985,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8093,6 +8103,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8208,6 +8219,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(route_overrides)),
api_key: None,
@ -8284,6 +8296,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8373,6 +8386,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8518,6 +8532,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: Some("http://127.0.0.1:11434".to_string()),
@ -8624,6 +8639,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 12,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8689,6 +8705,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 3,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8866,6 +8883,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -8951,6 +8969,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -9048,6 +9067,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -9127,6 +9147,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -9191,6 +9212,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -9712,6 +9734,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -9802,6 +9825,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -9892,6 +9916,7 @@ BTC is currently around $65,000 based on latest tool output."#
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(histories)),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -10596,6 +10621,7 @@ BTC is currently around $65,000 based on latest tool output."#;
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
@ -10667,6 +10693,7 @@ BTC is currently around $65,000 based on latest tool output."#;
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
session_manager: None,
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,

View File

@ -735,7 +735,7 @@ impl TelegramChannel {
}
fn log_poll_transport_error(sanitized: &str, consecutive_failures: u32) {
if consecutive_failures >= 6 && consecutive_failures % 6 == 0 {
if consecutive_failures >= 6 && consecutive_failures.is_multiple_of(6) {
tracing::warn!(
"Telegram poll transport error persists (consecutive={}): {}",
consecutive_failures,

View File

@ -6,23 +6,22 @@ pub use schema::{
apply_runtime_proxy_to_builder, build_runtime_proxy_client,
build_runtime_proxy_client_with_timeouts, runtime_proxy_config, set_runtime_proxy_config,
AgentConfig, AgentSessionBackend, AgentSessionConfig, AgentSessionStrategy, AgentsIpcConfig,
AuditConfig, AutonomyConfig, BrowserComputerUseConfig,
BrowserConfig, BuiltinHooksConfig, ChannelsConfig, ClassificationRule, ComposioConfig, Config,
CoordinationConfig, CostConfig, CronConfig, DelegateAgentConfig, DiscordConfig,
DockerRuntimeConfig, EmbeddingRouteConfig, EstopConfig, FeishuConfig, GatewayConfig,
GroupReplyConfig, GroupReplyMode, HardwareConfig, HardwareTransport, HeartbeatConfig,
HooksConfig, HttpRequestConfig, IMessageConfig, IdentityConfig, LarkConfig, MatrixConfig,
MemoryConfig, ModelRouteConfig, MultimodalConfig, NextcloudTalkConfig, ObservabilityConfig,
AuditConfig, AutonomyConfig, BrowserComputerUseConfig, BrowserConfig, BuiltinHooksConfig,
ChannelsConfig, ClassificationRule, ComposioConfig, Config, CoordinationConfig, CostConfig,
CronConfig, DelegateAgentConfig, DiscordConfig, DockerRuntimeConfig, EmbeddingRouteConfig,
EstopConfig, FeishuConfig, GatewayConfig, GroupReplyConfig, GroupReplyMode, HardwareConfig,
HardwareTransport, HeartbeatConfig, HooksConfig, HttpRequestConfig, IMessageConfig,
IdentityConfig, LarkConfig, MatrixConfig, MemoryConfig, ModelRouteConfig, MultimodalConfig,
NextcloudTalkConfig, NonCliNaturalLanguageApprovalMode, ObservabilityConfig,
OtpChallengeDelivery, OtpConfig, OtpMethod, PeripheralBoardConfig, PeripheralsConfig,
NonCliNaturalLanguageApprovalMode, PerplexityFilterConfig, PluginEntryConfig, PluginsConfig,
ProviderConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig,
ReliabilityConfig, ResearchPhaseConfig, ResearchTrigger, ResourceLimitsConfig, RuntimeConfig,
SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig,
SecurityRoleConfig, SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig,
StorageProviderConfig, StorageProviderSection, StreamMode, SyscallAnomalyConfig,
TelegramConfig, TranscriptionConfig, TunnelConfig, UrlAccessConfig,
WasmCapabilityEscalationMode, WasmConfig, WasmModuleHashPolicy, WasmRuntimeConfig,
WasmSecurityConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
PerplexityFilterConfig, PluginEntryConfig, PluginsConfig, ProviderConfig, ProxyConfig,
ProxyScope, QdrantConfig, QueryClassificationConfig, ReliabilityConfig, ResearchPhaseConfig,
ResearchTrigger, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig,
SchedulerConfig, SecretsConfig, SecurityConfig, SecurityRoleConfig, SkillsConfig,
SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig,
StorageProviderSection, StreamMode, SyscallAnomalyConfig, TelegramConfig, TranscriptionConfig,
TunnelConfig, UrlAccessConfig, WasmCapabilityEscalationMode, WasmConfig, WasmModuleHashPolicy,
WasmRuntimeConfig, WasmSecurityConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
};
pub fn name_and_presence<T: traits::ChannelConfig>(channel: Option<&T>) -> (&'static str, bool) {

View File

@ -5293,7 +5293,10 @@ pub(crate) async fn persist_active_workspace_config_dir(config_dir: &Path) -> Re
);
}
#[cfg(unix)]
sync_directory(&default_config_dir).await?;
#[cfg(not(unix))]
sync_directory(&default_config_dir)?;
Ok(())
}
@ -7165,7 +7168,10 @@ impl Config {
})?;
}
#[cfg(unix)]
sync_directory(parent_dir).await?;
#[cfg(not(unix))]
sync_directory(parent_dir)?;
if had_existing_config {
let _ = fs::remove_file(&backup_path).await;
@ -7175,23 +7181,21 @@ impl Config {
}
}
#[cfg(unix)]
async fn sync_directory(path: &Path) -> Result<()> {
#[cfg(unix)]
{
let dir = File::open(path)
.await
.with_context(|| format!("Failed to open directory for fsync: {}", path.display()))?;
dir.sync_all()
.await
.with_context(|| format!("Failed to fsync directory metadata: {}", path.display()))?;
Ok(())
}
let dir = File::open(path)
.await
.with_context(|| format!("Failed to open directory for fsync: {}", path.display()))?;
dir.sync_all()
.await
.with_context(|| format!("Failed to fsync directory metadata: {}", path.display()))?;
Ok(())
}
#[cfg(not(unix))]
{
let _ = path;
Ok(())
}
#[cfg(not(unix))]
fn sync_directory(path: &Path) -> Result<()> {
let _ = path;
Ok(())
}
#[cfg(test)]
@ -7200,7 +7204,6 @@ mod tests {
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::sync::{Mutex, MutexGuard};
use tokio::test;
use tokio_stream::wrappers::ReadDirStream;
@ -7390,7 +7393,7 @@ mod tests {
#[cfg(unix)]
#[test]
async fn save_sets_config_permissions_on_new_file() {
let temp = TempDir::new().expect("temp dir");
let temp = tempfile::TempDir::new().expect("temp dir");
let config_path = temp.path().join("config.toml");
let workspace_dir = temp.path().join("workspace");
@ -8013,7 +8016,10 @@ tool_dispatcher = "xml"
));
fs::create_dir_all(&dir).await.unwrap();
#[cfg(unix)]
sync_directory(&dir).await.unwrap();
#[cfg(not(unix))]
sync_directory(&dir).unwrap();
let _ = fs::remove_dir_all(&dir).await;
}

View File

@ -53,7 +53,7 @@ pub async fn run(config: Config) -> Result<()> {
pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
execute_job_with_retry(config, &security, job).await
Box::pin(execute_job_with_retry(config, &security, job)).await
}
async fn execute_job_with_retry(
@ -68,7 +68,7 @@ async fn execute_job_with_retry(
for attempt in 0..=retries {
let (success, output) = match job.job_type {
JobType::Shell => run_job_command(config, security, job).await,
JobType::Agent => run_agent_job(config, security, job).await,
JobType::Agent => Box::pin(run_agent_job(config, security, job)).await,
};
last_output = output;
@ -101,18 +101,21 @@ async fn process_due_jobs(
crate::health::mark_component_ok(component);
let max_concurrent = config.scheduler.max_concurrent.max(1);
let mut in_flight =
stream::iter(
jobs.into_iter().map(|job| {
let config = config.clone();
let security = Arc::clone(security);
let component = component.to_owned();
async move {
execute_and_persist_job(&config, security.as_ref(), &job, &component).await
}
}),
)
.buffer_unordered(max_concurrent);
let mut in_flight = stream::iter(jobs.into_iter().map(|job| {
let config = config.clone();
let security = Arc::clone(security);
let component = component.to_owned();
async move {
Box::pin(execute_and_persist_job(
&config,
security.as_ref(),
&job,
&component,
))
.await
}
}))
.buffer_unordered(max_concurrent);
while let Some((job_id, success, output)) = in_flight.next().await {
if !success {
@ -131,7 +134,7 @@ async fn execute_and_persist_job(
warn_if_high_frequency_agent_job(job);
let started_at = Utc::now();
let (success, output) = execute_job_with_retry(config, security, job).await;
let (success, output) = Box::pin(execute_job_with_retry(config, security, job)).await;
let finished_at = Utc::now();
let success = persist_job_result(config, job, success, &output, started_at, finished_at).await;
@ -170,7 +173,7 @@ async fn run_agent_job(
let run_result = match job.session_target {
SessionTarget::Main | SessionTarget::Isolated => {
crate::agent::run(
Box::pin(crate::agent::run(
config.clone(),
Some(prefixed_prompt),
None,
@ -178,7 +181,7 @@ async fn run_agent_job(
config.default_temperature,
vec![],
false,
)
))
.await
}
};

View File

@ -65,7 +65,7 @@ pub async fn run(config: Config, host: String, port: u16) -> Result<()> {
max_backoff,
move || {
let cfg = channels_cfg.clone();
async move { crate::channels::start_channels(cfg).await }
async move { Box::pin(crate::channels::start_channels(cfg)).await }
},
));
} else {
@ -214,7 +214,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
for task in tasks {
let prompt = format!("[Heartbeat Task] {task}");
let temp = config.default_temperature;
match crate::agent::run(
match Box::pin(crate::agent::run(
config.clone(),
Some(prompt),
None,
@ -222,7 +222,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
temp,
vec![],
false,
)
))
.await
{
Ok(output) => {

View File

@ -706,7 +706,10 @@ fn restore_masked_sensitive_fields(
restore_optional_secret(&mut incoming.proxy.http_proxy, &current.proxy.http_proxy);
restore_optional_secret(&mut incoming.proxy.https_proxy, &current.proxy.https_proxy);
restore_optional_secret(&mut incoming.proxy.all_proxy, &current.proxy.all_proxy);
restore_optional_secret(&mut incoming.transcription.api_key, &current.transcription.api_key);
restore_optional_secret(
&mut incoming.transcription.api_key,
&current.transcription.api_key,
);
restore_optional_secret(
&mut incoming.browser.computer_use.api_key,
&current.browser.computer_use.api_key,
@ -932,7 +935,10 @@ mod tests {
assert_eq!(hydrated.config_path, current.config_path);
assert_eq!(hydrated.workspace_dir, current.workspace_dir);
assert_eq!(hydrated.api_key, current.api_key);
assert_eq!(hydrated.transcription.api_key, current.transcription.api_key);
assert_eq!(
hydrated.transcription.api_key,
current.transcription.api_key
);
assert_eq!(hydrated.default_model.as_deref(), Some("gpt-4.1-mini"));
assert_eq!(
hydrated.reliability.api_keys,

View File

@ -981,7 +981,13 @@ pub(super) async fn run_gateway_chat_with_tools(
channel_name: &str,
) -> anyhow::Result<String> {
let config = state.config.lock().clone();
crate::agent::process_message(config, message, sender_id, channel_name).await
Box::pin(crate::agent::process_message(
config,
message,
sender_id,
channel_name,
))
.await
}
fn sanitize_gateway_response(response: &str, tools: &[Box<dyn Tool>]) -> String {
@ -1810,7 +1816,14 @@ async fn handle_whatsapp_message(
.await;
}
match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "whatsapp").await {
match Box::pin(run_gateway_chat_with_tools(
&state,
&msg.content,
&msg.sender,
"whatsapp",
))
.await
{
Ok(response) => {
let safe_response =
sanitize_gateway_response(&response, state.tools_registry_exec.as_ref());
@ -1929,7 +1942,14 @@ async fn handle_linq_webhook(
}
// Call the LLM
match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "linq").await {
match Box::pin(run_gateway_chat_with_tools(
&state,
&msg.content,
&msg.sender,
"linq",
))
.await
{
Ok(response) => {
let safe_response =
sanitize_gateway_response(&response, state.tools_registry_exec.as_ref());
@ -2023,7 +2043,14 @@ async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl
}
// Call the LLM
match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "wati").await {
match Box::pin(run_gateway_chat_with_tools(
&state,
&msg.content,
&msg.sender,
"wati",
))
.await
{
Ok(response) => {
let safe_response =
sanitize_gateway_response(&response, state.tools_registry_exec.as_ref());
@ -2129,8 +2156,13 @@ async fn handle_nextcloud_talk_webhook(
.await;
}
match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "nextcloud_talk")
.await
match Box::pin(run_gateway_chat_with_tools(
&state,
&msg.content,
&msg.sender,
"nextcloud_talk",
))
.await
{
Ok(response) => {
let safe_response =
@ -2222,7 +2254,14 @@ async fn handle_qq_webhook(
.await;
}
match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "qq").await {
match Box::pin(run_gateway_chat_with_tools(
&state,
&msg.content,
&msg.sender,
"qq",
))
.await
{
Ok(response) => {
let safe_response =
sanitize_gateway_response(&response, state.tools_registry_exec.as_ref());

View File

@ -95,9 +95,7 @@ pub async fn handle_api_chat(
&& state.webhook_secret_hash.is_none()
&& !peer_addr.ip().is_loopback()
{
tracing::warn!(
"/api/chat: rejected unauthenticated non-loopback request"
);
tracing::warn!("/api/chat: rejected unauthenticated non-loopback request");
let err = serde_json::json!({
"error": "Unauthorized — configure pairing or X-Webhook-Secret for non-local access"
});
@ -152,7 +150,11 @@ pub async fn handle_api_chat(
message.to_string()
} else {
let recent: Vec<&String> = chat_body.context.iter().rev().take(10).rev().collect();
let context_block = recent.iter().map(|s| s.as_str()).collect::<Vec<&str>>().join("\n");
let context_block = recent
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>()
.join("\n");
format!(
"Recent conversation context:\n{}\n\nCurrent message:\n{}",
context_block, message
@ -184,11 +186,15 @@ pub async fn handle_api_chat(
});
// ── Run the full agent loop ──
let sender_id = chat_body
.session_id
.as_deref()
.unwrap_or(rate_key.as_str());
match run_gateway_chat_with_tools(&state, &enriched_message, sender_id, "api_chat").await {
let sender_id = chat_body.session_id.as_deref().unwrap_or(rate_key.as_str());
match Box::pin(run_gateway_chat_with_tools(
&state,
&enriched_message,
sender_id,
"api_chat",
))
.await
{
Ok(response) => {
let safe_response =
sanitize_gateway_response(&response, state.tools_registry_exec.as_ref());
@ -399,7 +405,9 @@ pub async fn handle_v1_chat_completions_with_tools(
.unwrap_or("");
let token = auth.strip_prefix("Bearer ").unwrap_or("");
if !state.pairing.is_authenticated(token) {
tracing::warn!("/v1/chat/completions (compat): rejected — not paired / invalid bearer token");
tracing::warn!(
"/v1/chat/completions (compat): rejected — not paired / invalid bearer token"
);
let err = serde_json::json!({
"error": {
"message": "Invalid API key. Pair first via POST /pair, then use Authorization: Bearer <token>",
@ -485,7 +493,11 @@ pub async fn handle_v1_chat_completions_with_tools(
.rev()
.filter(|m| m.role == "user" || m.role == "assistant")
.map(|m| {
let role_label = if m.role == "user" { "User" } else { "Assistant" };
let role_label = if m.role == "user" {
"User"
} else {
"Assistant"
};
format!("{}: {}", role_label, m.content)
})
.collect();
@ -499,7 +511,11 @@ pub async fn handle_v1_chat_completions_with_tools(
.take(MAX_CONTEXT_MESSAGES)
.rev()
.collect();
let context_block = recent.iter().map(|s| s.as_str()).collect::<Vec<&str>>().join("\n");
let context_block = recent
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>()
.join("\n");
format!(
"Recent conversation context:\n{}\n\nCurrent message:\n{}",
context_block, message
@ -550,12 +566,12 @@ pub async fn handle_v1_chat_completions_with_tools(
);
// ── Run the full agent loop ──
let reply = match run_gateway_chat_with_tools(
let reply = match Box::pin(run_gateway_chat_with_tools(
&state,
&enriched_message,
rate_key.as_str(),
"openai_compat",
)
))
.await
{
Ok(response) => {
@ -628,9 +644,7 @@ pub async fn handle_v1_chat_completions_with_tools(
}
};
let model_name = request
.model
.unwrap_or_else(|| state.model.clone());
let model_name = request.model.unwrap_or_else(|| state.model.clone());
#[allow(clippy::cast_possible_truncation)]
let prompt_tokens = (enriched_message.len() / 4) as u32;
@ -855,14 +869,20 @@ mod tests {
fn api_chat_body_rejects_missing_message() {
let json = r#"{"session_id": "s1"}"#;
let result: Result<ApiChatBody, _> = serde_json::from_str(json);
assert!(result.is_err(), "missing `message` field should fail deserialization");
assert!(
result.is_err(),
"missing `message` field should fail deserialization"
);
}
#[test]
fn oai_request_rejects_empty_messages() {
let json = r#"{"messages": []}"#;
let req: OaiChatRequest = serde_json::from_str(json).unwrap();
assert!(req.messages.is_empty(), "empty messages should parse but be caught by handler");
assert!(
req.messages.is_empty(),
"empty messages should parse but be caught by handler"
);
}
#[test]
@ -903,7 +923,17 @@ mod tests {
.skip(1)
.rev()
.filter(|m| m.role == "user" || m.role == "assistant")
.map(|m| format!("{}: {}", if m.role == "user" { "User" } else { "Assistant" }, m.content))
.map(|m| {
format!(
"{}: {}",
if m.role == "user" {
"User"
} else {
"Assistant"
},
m.content
)
})
.collect();
assert_eq!(context_messages.len(), 2);

View File

@ -10,9 +10,7 @@
//! ```
use super::AppState;
use crate::agent::loop_::{
build_shell_policy_instructions, build_tool_instructions_from_specs,
};
use crate::agent::loop_::{build_shell_policy_instructions, build_tool_instructions_from_specs};
use crate::approval::ApprovalManager;
use crate::providers::ChatMessage;
use axum::{
@ -263,8 +261,13 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
}));
// Full agentic loop with tools (includes WASM skills, shell, memory, etc.)
match super::run_gateway_chat_with_tools(&state, &content, ws_sender_id.as_str(), "ws")
.await
match Box::pin(super::run_gateway_chat_with_tools(
&state,
&content,
ws_sender_id.as_str(),
"ws",
))
.await
{
Ok(response) => {
let safe_response =

View File

@ -57,8 +57,6 @@ pub(crate) mod heartbeat;
pub mod hooks;
pub(crate) mod identity;
// Intentionally unused re-export — public API surface for plugin authors.
#[allow(unused_imports)]
pub(crate) mod plugins;
pub(crate) mod integrations;
pub mod memory;
pub(crate) mod migration;
@ -66,6 +64,8 @@ pub(crate) mod multimodal;
pub mod observability;
pub(crate) mod onboard;
pub mod peripherals;
#[allow(unused_imports)]
pub(crate) mod plugins;
pub mod providers;
pub mod rag;
pub mod runtime;

View File

@ -798,9 +798,9 @@ async fn main() -> Result<()> {
bail!("--channels-only does not accept --force");
}
let config = if channels_only {
onboard::run_channels_repair_wizard().await
Box::pin(onboard::run_channels_repair_wizard()).await
} else if interactive {
onboard::run_wizard(force).await
Box::pin(onboard::run_wizard(force)).await
} else {
onboard::run_quick_setup(
api_key.as_deref(),
@ -814,7 +814,7 @@ async fn main() -> Result<()> {
}?;
// Auto-start channels if user said yes during wizard
if std::env::var("ZEROCLAW_AUTOSTART_CHANNELS").as_deref() == Ok("1") {
channels::start_channels(config).await?;
Box::pin(channels::start_channels(config)).await?;
}
return Ok(());
}
@ -875,7 +875,7 @@ async fn main() -> Result<()> {
// Single-shot mode (-m) runs non-interactively: no TTY approval prompt,
// so tools are not denied by a stdin read returning EOF.
let interactive = message.is_none();
agent::run(
Box::pin(agent::run(
config,
message,
provider,
@ -883,7 +883,7 @@ async fn main() -> Result<()> {
temperature,
peripheral,
interactive,
)
))
.await
.map(|_| ())
}
@ -1114,8 +1114,8 @@ async fn main() -> Result<()> {
},
Commands::Channel { channel_command } => match channel_command {
ChannelCommands::Start => channels::start_channels(config).await,
ChannelCommands::Doctor => channels::doctor_channels(config).await,
ChannelCommands::Start => Box::pin(channels::start_channels(config)).await,
ChannelCommands::Doctor => Box::pin(channels::doctor_channels(config)).await,
other => channels::handle_command(other, &config).await,
},

View File

@ -778,8 +778,7 @@ fn default_model_for_provider(provider: &str) -> String {
"qwen-code" => "qwen3-coder-plus".into(),
"ollama" => "llama3.2".into(),
"llamacpp" => "ggml-org/gpt-oss-20b-GGUF".into(),
"sglang" | "vllm" | "osaurus" => "default".into(),
"copilot" => "default".into(),
"sglang" | "vllm" | "osaurus" | "copilot" => "default".into(),
"gemini" => "gemini-2.5-pro".into(),
"kimi-code" => "kimi-for-coding".into(),
"bedrock" => "anthropic.claude-sonnet-4-5-20250929-v1:0".into(),

View File

@ -5,7 +5,9 @@
use std::path::{Path, PathBuf};
use super::manifest::{load_manifest, ManifestLoadResult, PluginManifest, PLUGIN_MANIFEST_FILENAME};
use super::manifest::{
load_manifest, ManifestLoadResult, PluginManifest, PLUGIN_MANIFEST_FILENAME,
};
use super::registry::{DiagnosticLevel, PluginDiagnostic, PluginOrigin};
/// A discovered plugin before loading.
@ -79,10 +81,7 @@ fn scan_dir(dir: &Path, origin: PluginOrigin) -> (Vec<DiscoveredPlugin>, Vec<Plu
/// 2. Global: `~/.zeroclaw/extensions/`
/// 3. Workspace: `<workspace>/.zeroclaw/extensions/`
/// 4. Extra paths from config `[plugins] load_paths`
pub fn discover_plugins(
workspace_dir: Option<&Path>,
extra_paths: &[PathBuf],
) -> DiscoveryResult {
pub fn discover_plugins(workspace_dir: Option<&Path>, extra_paths: &[PathBuf]) -> DiscoveryResult {
let mut all_plugins = Vec::new();
let mut all_diagnostics = Vec::new();
@ -127,7 +126,7 @@ pub fn discover_plugins(
let mut deduped: Vec<DiscoveredPlugin> = Vec::with_capacity(seen.len());
// Collect in insertion order of the winning index
let mut indices: Vec<usize> = seen.values().copied().collect();
indices.sort();
indices.sort_unstable();
for i in indices {
deduped.push(all_plugins.swap_remove(i));
}
@ -183,10 +182,7 @@ version = "0.1.0"
make_plugin_dir(&ext_dir, "custom-one");
let result = discover_plugins(None, &[ext_dir]);
assert!(result
.plugins
.iter()
.any(|p| p.manifest.id == "custom-one"));
assert!(result.plugins.iter().any(|p| p.manifest.id == "custom-one"));
}
#[test]

View File

@ -13,7 +13,10 @@ use tracing::{info, warn};
use crate::config::PluginsConfig;
use super::discovery::discover_plugins;
use super::registry::*;
use super::registry::{
DiagnosticLevel, PluginDiagnostic, PluginHookRegistration, PluginOrigin, PluginRecord,
PluginRegistry, PluginStatus, PluginToolRegistration,
};
use super::traits::{Plugin, PluginApi, PluginLogger};
/// Resolve whether a discovered plugin should be enabled.
@ -306,7 +309,10 @@ mod tests {
};
let reg = load_plugins(&cfg, None, vec![]);
assert_eq!(reg.active_count(), 0);
assert!(reg.diagnostics.iter().any(|d| d.message.contains("disabled")));
assert!(reg
.diagnostics
.iter()
.any(|d| d.message.contains("disabled")));
}
#[test]

View File

@ -363,7 +363,7 @@ fn shannon_entropy(bytes: &[u8]) -> f64 {
.iter()
.filter(|&&count| count > 0)
.map(|&count| {
let p = count as f64 / len;
let p = f64::from(count) / len;
-p * p.log2()
})
.sum()
@ -390,7 +390,7 @@ mod tests {
assert!(patterns.iter().any(|p| p.contains("Stripe")));
assert!(redacted.contains("[REDACTED"));
}
_ => panic!("Should detect Stripe key"),
LeakResult::Clean => panic!("Should detect Stripe key"),
}
}
@ -403,7 +403,7 @@ mod tests {
LeakResult::Detected { patterns, .. } => {
assert!(patterns.iter().any(|p| p.contains("AWS")));
}
_ => panic!("Should detect AWS key"),
LeakResult::Clean => panic!("Should detect AWS key"),
}
}
@ -421,7 +421,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq...
assert!(patterns.iter().any(|p| p.contains("private key")));
assert!(redacted.contains("[REDACTED_PRIVATE_KEY]"));
}
_ => panic!("Should detect private key"),
LeakResult::Clean => panic!("Should detect private key"),
}
}
@ -435,7 +435,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq...
assert!(patterns.iter().any(|p| p.contains("JWT")));
assert!(redacted.contains("[REDACTED_JWT]"));
}
_ => panic!("Should detect JWT"),
LeakResult::Clean => panic!("Should detect JWT"),
}
}
@ -448,7 +448,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq...
LeakResult::Detected { patterns, .. } => {
assert!(patterns.iter().any(|p| p.contains("PostgreSQL")));
}
_ => panic!("Should detect database URL"),
LeakResult::Clean => panic!("Should detect database URL"),
}
}
@ -506,7 +506,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq...
assert!(patterns.iter().any(|p| p.contains("High-entropy token")));
assert!(redacted.contains("[REDACTED_HIGH_ENTROPY_TOKEN]"));
}
_ => panic!("expected high-entropy detection"),
LeakResult::Clean => panic!("expected high-entropy detection"),
}
}

View File

@ -61,7 +61,8 @@ fn char_class_perplexity(prefix: &str, suffix: &str) -> f64 {
let class = classify_char(ch);
if let Some(p) = suffix_prev {
let numerator = f64::from(transition[p][class] + 1);
let denominator = f64::from(row_totals[p] + CLASS_COUNT as u32);
let class_count_u32 = u32::try_from(CLASS_COUNT).unwrap_or(u32::MAX);
let denominator = f64::from(row_totals[p] + class_count_u32);
nll += -(numerator / denominator).ln();
pairs += 1;
}

View File

@ -3,7 +3,6 @@ use regex::Regex;
use std::fs;
use std::path::{Component, Path, PathBuf};
use std::sync::OnceLock;
use zip::ZipArchive;
const MAX_TEXT_FILE_BYTES: u64 = 512 * 1024;

View File

@ -452,7 +452,8 @@ fn load_skill_md(path: &Path, dir: &Path) -> Result<Skill> {
if let Ok(raw) = std::fs::read(&meta_path) {
if let Ok(meta) = serde_json::from_slice::<serde_json::Value>(&raw) {
if let Some(slug) = meta.get("slug").and_then(|v| v.as_str()) {
let normalized = normalize_skill_name(slug.split('/').last().unwrap_or(slug));
let normalized =
normalize_skill_name(slug.split('/').next_back().unwrap_or(slug));
if !normalized.is_empty() {
name = normalized;
}
@ -1616,7 +1617,7 @@ fn extract_zip_skill_meta(
f.read_to_end(&mut buf).ok();
if let Ok(meta) = serde_json::from_slice::<serde_json::Value>(&buf) {
let slug_raw = meta.get("slug").and_then(|v| v.as_str()).unwrap_or("");
let base = slug_raw.split('/').last().unwrap_or(slug_raw);
let base = slug_raw.split('/').next_back().unwrap_or(slug_raw);
let name = normalize_skill_name(base);
if !name.is_empty() {
let version = meta

View File

@ -116,7 +116,8 @@ impl Tool for CronRunTool {
}
let started_at = Utc::now();
let (success, output) = cron::scheduler::execute_job_now(&self.config, &job).await;
let (success, output) =
Box::pin(cron::scheduler::execute_job_now(&self.config, &job)).await;
let finished_at = Utc::now();
let duration_ms = (finished_at - started_at).num_milliseconds();
let status = if success { "ok" } else { "error" };

View File

@ -202,24 +202,23 @@ impl Tool for DocxReadTool {
}
};
let text =
match tokio::task::spawn_blocking(move || extract_docx_text(&bytes)).await {
Ok(Ok(t)) => t,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("DOCX extraction failed: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("DOCX extraction task panicked: {e}")),
});
}
};
let text = match tokio::task::spawn_blocking(move || extract_docx_text(&bytes)).await {
Ok(Ok(t)) => t,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("DOCX extraction failed: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("DOCX extraction task panicked: {e}")),
});
}
};
if text.trim().is_empty() {
return Ok(ToolResult {

View File

@ -301,11 +301,11 @@ mod tests {
name: "nonexistent".to_string(),
command: "/usr/bin/this_binary_does_not_exist_zeroclaw_test".to_string(),
args: vec![],
env: Default::default(),
env: std::collections::HashMap::default(),
tool_timeout_secs: None,
transport: McpTransport::Stdio,
url: None,
headers: Default::default(),
headers: std::collections::HashMap::default(),
};
let result = McpServer::connect(config).await;
assert!(result.is_err());
@ -320,11 +320,11 @@ mod tests {
name: "bad".to_string(),
command: "/usr/bin/does_not_exist_zc_test".to_string(),
args: vec![],
env: Default::default(),
env: std::collections::HashMap::default(),
tool_timeout_secs: None,
transport: McpTransport::Stdio,
url: None,
headers: Default::default(),
headers: std::collections::HashMap::default(),
}];
let registry = McpRegistry::connect_all(&configs)
.await

View File

@ -5,7 +5,7 @@ use std::borrow::Cow;
use anyhow::{anyhow, bail, Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, Notify, oneshot};
use tokio::sync::{oneshot, Mutex, Notify};
use tokio::time::{timeout, Duration};
use tokio_stream::StreamExt;
@ -221,7 +221,8 @@ impl SseTransport {
.ok_or_else(|| anyhow!("URL required for SSE transport"))?
.clone();
let client = reqwest::Client::builder().build()
let client = reqwest::Client::builder()
.build()
.context("failed to build HTTP client")?;
Ok(Self {
@ -288,7 +289,7 @@ impl SseTransport {
self.reader_task = Some(tokio::spawn(async move {
let stream = resp
.bytes_stream()
.map(|item| item.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
.map(|item| item.map_err(std::io::Error::other));
let reader = tokio_util::io::StreamReader::new(stream);
let mut lines = BufReader::new(reader).lines();
@ -325,16 +326,13 @@ impl SseTransport {
if let Some(rest) = line.strip_prefix("event:") {
cur_event = Some(rest.trim().to_string());
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
let rest = rest.strip_prefix(' ').unwrap_or(rest);
cur_data.push(rest.to_string());
continue;
}
if let Some(rest) = line.strip_prefix("id:") {
cur_id = Some(rest.trim().to_string());
continue;
}
}
}
@ -380,7 +378,7 @@ impl SseTransport {
Ok((derived, false))
}
async fn maybe_try_alternate_message_url(
fn maybe_try_alternate_message_url(
&self,
current_url: &str,
from_endpoint: bool,
@ -467,7 +465,9 @@ async fn handle_sse_event(
return;
};
let Some(id_val) = resp.id.clone() else { return; };
let Some(id_val) = resp.id.clone() else {
return;
};
let id = match id_val.as_u64() {
Some(v) => v,
None => return,
@ -480,7 +480,11 @@ async fn handle_sse_event(
if let Some(tx) = tx {
let _ = tx.send(resp);
} else {
tracing::debug!("MCP SSE `{}` received response for unknown id {}", server_name, id);
tracing::debug!(
"MCP SSE `{}` received response for unknown id {}",
server_name,
id
);
}
}
@ -542,7 +546,7 @@ async fn read_first_jsonrpc_from_sse_response(
) -> Result<Option<JsonRpcResponse>> {
let stream = resp
.bytes_stream()
.map(|item| item.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
.map(|item| item.map_err(std::io::Error::other));
let reader = tokio_util::io::StreamReader::new(stream);
let mut lines = BufReader::new(reader).lines();
@ -563,7 +567,8 @@ async fn read_first_jsonrpc_from_sse_response(
cur_data.clear();
let event = event.unwrap_or_else(|| "message".to_string());
if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint") {
if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint")
{
continue;
}
if !event.eq_ignore_ascii_case("message") {
@ -586,12 +591,10 @@ async fn read_first_jsonrpc_from_sse_response(
}
if let Some(rest) = line.strip_prefix("event:") {
cur_event = Some(rest.trim().to_string());
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
let rest = rest.strip_prefix(' ').unwrap_or(rest);
cur_data.push(rest.to_string());
continue;
}
}
@ -603,10 +606,7 @@ impl McpTransportConn for SseTransport {
async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
self.ensure_connected().await?;
let id = request
.id
.as_ref()
.and_then(|v| v.as_u64());
let id = request.id.as_ref().and_then(|v| v.as_u64());
let body = serde_json::to_string(request)?;
let (mut message_url, mut from_endpoint) = self.get_message_url().await?;
@ -654,7 +654,10 @@ impl McpTransportConn for SseTransport {
let mut got_direct = None;
let mut last_status = None;
for (i, url) in std::iter::once(primary_url).chain(secondary_url.into_iter()).enumerate() {
for (i, url) in std::iter::once(primary_url)
.chain(secondary_url.into_iter())
.enumerate()
{
let mut req = self
.client
.post(&url)
@ -664,7 +667,11 @@ impl McpTransportConn for SseTransport {
for (key, value) in &self.headers {
req = req.header(key, value);
}
if !self.headers.keys().any(|k| k.eq_ignore_ascii_case("Accept")) {
if !self
.headers
.keys()
.any(|k| k.eq_ignore_ascii_case("Accept"))
{
req = req.header("Accept", "application/json, text/event-stream");
}
@ -715,12 +722,11 @@ impl McpTransportConn for SseTransport {
}
Err(_) => continue,
}
} else {
if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? {
got_direct = Some(resp);
}
break;
}
if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? {
got_direct = Some(resp);
}
break;
}
let text = if i == 0 && has_secondary {
@ -861,7 +867,8 @@ mod tests {
#[test]
fn test_extract_json_from_sse_uses_last_event_with_data() {
let input = ": keep-alive\n\nid: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
let input =
": keep-alive\n\nid: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
let extracted = extract_json_from_sse_text(input);
let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
}

View File

@ -900,8 +900,9 @@ impl Tool for ModelRoutingConfigTool {
mod tests {
use super::*;
use crate::security::{AutonomyLevel, SecurityPolicy};
use std::sync::{Mutex, OnceLock};
use std::sync::OnceLock;
use tempfile::TempDir;
use tokio::sync::Mutex;
fn test_security() -> Arc<SecurityPolicy> {
Arc::new(SecurityPolicy {
@ -945,11 +946,9 @@ mod tests {
}
}
fn env_lock() -> std::sync::MutexGuard<'static, ()> {
async fn env_lock() -> tokio::sync::MutexGuard<'static, ()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
.lock()
.expect("env lock poisoned")
LOCK.get_or_init(|| Mutex::new(())).lock().await
}
async fn test_config(tmp: &TempDir) -> Arc<Config> {
@ -1118,7 +1117,7 @@ mod tests {
#[tokio::test]
async fn get_reports_env_backed_credentials_for_routes_and_agents() {
let _env_lock = env_lock();
let _env_lock = env_lock().await;
let _provider_guard = EnvGuard::set("TELNYX_API_KEY", Some("test-telnyx-key"));
let _generic_guard = EnvGuard::set("ZEROCLAW_API_KEY", None);
let _api_key_guard = EnvGuard::set("API_KEY", None);
@ -1160,6 +1159,9 @@ mod tests {
.unwrap();
assert_eq!(route["api_key_configured"], json!(true));
assert_eq!(output["agents"]["voice_helper"]["api_key_configured"], json!(true));
assert_eq!(
output["agents"]["voice_helper"]["api_key_configured"],
json!(true)
);
}
}

View File

@ -726,7 +726,6 @@ async fn e2e_live_research_phase() {
use zeroclaw::config::{ResearchPhaseConfig, ResearchTrigger};
use zeroclaw::observability::NoopObserver;
use zeroclaw::providers::openai_codex::OpenAiCodexProvider;
use zeroclaw::providers::traits::Provider;
use zeroclaw::tools::{Tool, ToolResult};
// ── Test should_trigger ──