diff --git a/Cargo.toml b/Cargo.toml index d4de257f1..ea376897d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -231,6 +231,9 @@ rag-pdf = ["dep:pdf-extract"] wasm-tools = ["dep:wasmtime", "dep:wasmtime-wasi"] # whatsapp-web = Native WhatsApp Web client with custom rusqlite storage backend whatsapp-web = ["dep:wa-rs", "dep:wa-rs-core", "dep:wa-rs-binary", "dep:wa-rs-proto", "dep:wa-rs-ureq-http", "dep:wa-rs-tokio-transport", "dep:serde-big-array", "dep:prost", "dep:qrcode"] +firecrawl = [] +web-fetch-html2md = [] +web-fetch-plaintext = [] [profile.release] opt-level = "z" # Optimize for size diff --git a/src/agent/loop_.rs b/src/agent/loop_.rs index 918263fb7..74bd232c6 100644 --- a/src/agent/loop_.rs +++ b/src/agent/loop_.rs @@ -32,6 +32,7 @@ mod execution; mod history; mod parsing; +use crate::agent::session::{create_session_manager, resolve_session_id, SessionManager}; use context::{build_context, build_hardware_context}; use execution::{ execute_tools_parallel, execute_tools_sequential, should_execute_tools_in_parallel, @@ -47,7 +48,6 @@ use parsing::{ parse_perl_style_tool_calls, parse_structured_tool_calls, parse_tool_call_value, parse_tool_calls, parse_tool_calls_from_json_value, tool_call_signature, ParsedToolCall, }; -use crate::agent::session::{create_session_manager, resolve_session_id, SessionManager}; /// Minimum characters per chunk when relaying LLM text to a streaming draft. const STREAM_CHUNK_MIN_CHARS: usize = 80; @@ -150,8 +150,12 @@ impl Helper for SlashCommandCompleter {} static CHANNEL_SESSION_MANAGER: LazyLock>>> = LazyLock::new(|| Mutex::new(HashMap::new())); -async fn channel_session_manager(config: &Config) -> Result>> { - let key = format!("{:?}:{:?}", config.workspace_dir, config.agent.session); +fn channel_session_manager(config: &Config) -> Result>> { + let key = format!( + "{}:{:?}", + config.workspace_dir.display(), + config.agent.session + ); { let map = CHANNEL_SESSION_MANAGER.lock().unwrap(); @@ -951,7 +955,7 @@ pub(crate) async fn run_tool_call_loop( Some(model), Some(&turn_id), Some(false), - Some(&parse_issue), + Some(parse_issue), serde_json::json!({ "iteration": iteration + 1, "response_excerpt": truncate_with_ellipsis( @@ -2163,7 +2167,7 @@ pub async fn process_message( sender_id: &str, channel_name: &str, ) -> Result { - tracing::warn!(sender_id, channel_name, "process_message called"); + tracing::debug!(sender_id, channel_name, "process_message called"); let observer: Arc = Arc::from(observability::create_observer(&config.observability)); let runtime: Arc = @@ -2331,16 +2335,13 @@ pub async fn process_message( format!("{context}[{now}] {message}") }; - let session_manager = channel_session_manager(&config).await?; + let session_manager = channel_session_manager(&config)?; let session_id = resolve_session_id(&config.agent.session, sender_id, Some(channel_name)); - tracing::warn!(session_id, "session_id resolved"); + tracing::debug!(session_id, "session_id resolved"); if let Some(mgr) = session_manager { let session = mgr.get_or_create(&session_id).await?; let stored_history = session.get_history().await?; - tracing::warn!( - history_len = stored_history.len(), - "session history loaded" - ); + tracing::debug!(history_len = stored_history.len(), "session history loaded"); let filtered_history: Vec = stored_history .into_iter() .filter(|m| m.role != "system") @@ -2377,7 +2378,7 @@ pub async fn process_message( .update_history(persisted) .await .context("Failed to update session history")?; - tracing::warn!(saved_len, "session history saved"); + tracing::debug!(saved_len, "session history saved"); Ok(reply) } else { let mut history = vec![ diff --git a/src/agent/prompt.rs b/src/agent/prompt.rs index 6d63489a2..612a5c958 100644 --- a/src/agent/prompt.rs +++ b/src/agent/prompt.rs @@ -115,7 +115,9 @@ impl PromptSection for IdentitySection { inject_workspace_file(&mut prompt, ctx.workspace_dir, "MEMORY.md"); } - let extra_files = ctx.identity_config.map_or(&[][..], |cfg| cfg.extra_files.as_slice()); + let extra_files = ctx + .identity_config + .map_or(&[][..], |cfg| cfg.extra_files.as_slice()); for file in extra_files { if let Some(safe_relative) = normalize_openclaw_identity_extra_file(file) { inject_workspace_file(&mut prompt, ctx.workspace_dir, safe_relative); diff --git a/src/agent/session.rs b/src/agent/session.rs index 3a5857e6a..4b92ca0a5 100644 --- a/src/agent/session.rs +++ b/src/agent/session.rs @@ -1,5 +1,7 @@ use crate::providers::ChatMessage; -use crate::{config::AgentSessionBackend, config::AgentSessionConfig, config::AgentSessionStrategy}; +use crate::{ + config::AgentSessionBackend, config::AgentSessionConfig, config::AgentSessionStrategy, +}; use anyhow::{Context, Result}; use async_trait::async_trait; use parking_lot::Mutex; @@ -147,10 +149,12 @@ impl SessionManager for MemorySessionManager { async fn get_history(&self, session_id: &str) -> Result> { let mut sessions = self.inner.sessions.write().await; let now = unix_seconds_now(); - let entry = sessions.entry(session_id.to_string()).or_insert_with(|| MemorySessionState { - history: Vec::new(), - updated_at_unix: now, - }); + let entry = sessions + .entry(session_id.to_string()) + .or_insert_with(|| MemorySessionState { + history: Vec::new(), + updated_at_unix: now, + }); entry.updated_at_unix = now; Ok(entry.history.clone()) } @@ -243,7 +247,7 @@ impl SqliteSessionManager { let conn = self.conn.clone(); let session_id = session_id.to_string(); let age_secs = age.as_secs() as i64; - + tokio::task::spawn_blocking(move || { let conn = conn.lock(); let new_time = unix_seconds_now() - age_secs; @@ -252,7 +256,9 @@ impl SqliteSessionManager { params![session_id, new_time], )?; Ok(()) - }).await? + }) + .await + .context("SQLite blocking task panicked")? } } @@ -291,7 +297,9 @@ impl SessionManager for SqliteSessionManager { params![session_id, now], )?; Ok(Vec::new()) - }).await? + }) + .await + .context("SQLite blocking task panicked")? } async fn set_history(&self, session_id: &str, mut history: Vec) -> Result<()> { @@ -310,13 +318,15 @@ impl SessionManager for SqliteSessionManager { params![session_id, json, now], )?; Ok(()) - }).await? + }) + .await + .context("SQLite blocking task panicked")? } async fn delete(&self, session_id: &str) -> Result<()> { let conn = self.conn.clone(); let session_id = session_id.to_string(); - + tokio::task::spawn_blocking(move || { let conn = conn.lock(); conn.execute( @@ -324,7 +334,9 @@ impl SessionManager for SqliteSessionManager { params![session_id], )?; Ok(()) - }).await? + }) + .await + .context("SQLite blocking task panicked")? } async fn cleanup_expired(&self) -> Result { @@ -333,7 +345,7 @@ impl SessionManager for SqliteSessionManager { } let conn = self.conn.clone(); let ttl_secs = self.ttl.as_secs() as i64; - + tokio::task::spawn_blocking(move || { let cutoff = unix_seconds_now() - ttl_secs; let conn = conn.lock(); @@ -342,7 +354,9 @@ impl SessionManager for SqliteSessionManager { params![cutoff], )?; Ok(removed) - }).await? + }) + .await + .context("SQLite blocking task panicked")? } } @@ -472,10 +486,11 @@ mod tests { session .update_history(vec![ChatMessage::user("hi"), ChatMessage::assistant("ok")]) .await?; - + // Force expire by setting age to 2 seconds - mgr.force_expire_session("s1", Duration::from_secs(2)).await?; - + mgr.force_expire_session("s1", Duration::from_secs(2)) + .await?; + let removed = mgr.cleanup_expired().await?; assert!(removed >= 1); Ok(()) diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 9c6d23f9b..25f6fabed 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -3011,7 +3011,7 @@ async fn process_channel_message( ) { let sender_id = msg.sender.as_str(); let channel_name = msg.channel.as_str(); - tracing::warn!(sender_id, channel_name, "process_message called"); + tracing::debug!(sender_id, channel_name, "process_message called"); if cancellation_token.is_cancelled() { return; } @@ -3115,37 +3115,35 @@ or tune thresholds in config.", }; if should_seed { - let session_config = CHANNEL_SESSION_CONFIG.get().cloned().unwrap_or_default(); - let session_id = resolve_session_id( - &session_config, - msg.sender.as_str(), - Some(msg.channel.as_str()), - ); - tracing::warn!(session_id, "session_id resolved"); - match manager.get_or_create(&session_id).await { - Ok(session) => match session.get_history().await { - Ok(history) => { - tracing::warn!( - history_len = history.len(), - "session history loaded" - ); - let filtered: Vec = history - .into_iter() - .filter(|m| m.role != "system") - .collect(); - let mut histories = ctx - .conversation_histories - .lock() - .unwrap_or_else(|e| e.into_inner()); - histories.entry(history_key.clone()).or_insert(filtered); - } + if let Some(session_config) = CHANNEL_SESSION_CONFIG.get().cloned() { + let session_id = resolve_session_id( + &session_config, + msg.sender.as_str(), + Some(msg.channel.as_str()), + ); + tracing::debug!(session_id, "session_id resolved"); + match manager.get_or_create(&session_id).await { + Ok(session) => match session.get_history().await { + Ok(history) => { + tracing::debug!(history_len = history.len(), "session history loaded"); + let filtered: Vec = + history.into_iter().filter(|m| m.role != "system").collect(); + let mut histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + histories.entry(history_key.clone()).or_insert(filtered); + } + Err(err) => { + tracing::warn!("Failed to load session history: {err}"); + } + }, Err(err) => { - tracing::warn!("Failed to load session history: {err}"); + tracing::warn!("Failed to open session: {err}"); } - }, - Err(err) => { - tracing::warn!("Failed to open session: {err}"); } + } else { + tracing::warn!("CHANNEL_SESSION_CONFIG not initialized, skipping session"); } } } @@ -3568,41 +3566,44 @@ or tune thresholds in config.", ChatMessage::assistant(&history_response), ); if let Some(manager) = ctx.session_manager.as_ref() { - let session_config = CHANNEL_SESSION_CONFIG.get().cloned().unwrap_or_default(); - let session_id = resolve_session_id( - &session_config, - msg.sender.as_str(), - Some(msg.channel.as_str()), - ); - tracing::warn!(session_id, "session_id resolved"); - match manager.get_or_create(&session_id).await { - Ok(session) => { - let latest = { - let histories = ctx - .conversation_histories - .lock() - .unwrap_or_else(|e| e.into_inner()); - histories.get(&history_key).cloned().unwrap_or_default() - }; - let filtered: Vec = latest - .into_iter() - .filter(|m| { - m.role != "system" - && m.role != "tool" - && m.role != "tool_use" - && m.role != "tool_result" - }) - .collect(); - let saved_len = filtered.len(); - if let Err(err) = session.update_history(filtered).await { - tracing::warn!("Failed to update session history: {err}"); - } else { - tracing::warn!(saved_len, "session history saved"); + if let Some(session_config) = CHANNEL_SESSION_CONFIG.get().cloned() { + let session_id = resolve_session_id( + &session_config, + msg.sender.as_str(), + Some(msg.channel.as_str()), + ); + tracing::debug!(session_id, "session_id resolved"); + match manager.get_or_create(&session_id).await { + Ok(session) => { + let latest = { + let histories = ctx + .conversation_histories + .lock() + .unwrap_or_else(|e| e.into_inner()); + histories.get(&history_key).cloned().unwrap_or_default() + }; + let filtered: Vec = latest + .into_iter() + .filter(|m| { + m.role != "system" + && m.role != "tool" + && m.role != "tool_use" + && m.role != "tool_result" + }) + .collect(); + let saved_len = filtered.len(); + if let Err(err) = session.update_history(filtered).await { + tracing::warn!("Failed to update session history: {err}"); + } else { + tracing::debug!(saved_len, "session history saved"); + } + } + Err(err) => { + tracing::warn!("Failed to open session: {err}"); } } - Err(err) => { - tracing::warn!("Failed to open session: {err}"); - } + } else { + tracing::warn!("CHANNEL_SESSION_CONFIG not initialized, skipping session"); } } println!( @@ -7070,6 +7071,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7180,6 +7182,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7285,6 +7288,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7381,6 +7385,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7527,6 +7532,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7621,6 +7627,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7766,6 +7773,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7881,6 +7889,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -7976,6 +7985,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8093,6 +8103,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8208,6 +8219,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(route_overrides)), api_key: None, @@ -8284,6 +8296,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8373,6 +8386,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(provider_cache_seed)), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8518,6 +8532,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: Some("http://127.0.0.1:11434".to_string()), @@ -8624,6 +8639,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 12, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8689,6 +8705,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 3, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8866,6 +8883,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -8951,6 +8969,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -9048,6 +9067,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -9127,6 +9147,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -9191,6 +9212,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 10, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -9712,6 +9734,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -9802,6 +9825,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -9892,6 +9916,7 @@ BTC is currently around $65,000 based on latest tool output."# max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(histories)), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -10596,6 +10621,7 @@ BTC is currently around $65,000 based on latest tool output."#; max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, @@ -10667,6 +10693,7 @@ BTC is currently around $65,000 based on latest tool output."#; max_tool_iterations: 5, min_relevance_score: 0.0, conversation_histories: Arc::new(Mutex::new(HashMap::new())), + session_manager: None, provider_cache: Arc::new(Mutex::new(HashMap::new())), route_overrides: Arc::new(Mutex::new(HashMap::new())), api_key: None, diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index bf8b646fd..9030d39bc 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -735,7 +735,7 @@ impl TelegramChannel { } fn log_poll_transport_error(sanitized: &str, consecutive_failures: u32) { - if consecutive_failures >= 6 && consecutive_failures % 6 == 0 { + if consecutive_failures >= 6 && consecutive_failures.is_multiple_of(6) { tracing::warn!( "Telegram poll transport error persists (consecutive={}): {}", consecutive_failures, diff --git a/src/config/mod.rs b/src/config/mod.rs index fbbeb0a88..eb196c7dc 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -6,23 +6,22 @@ pub use schema::{ apply_runtime_proxy_to_builder, build_runtime_proxy_client, build_runtime_proxy_client_with_timeouts, runtime_proxy_config, set_runtime_proxy_config, AgentConfig, AgentSessionBackend, AgentSessionConfig, AgentSessionStrategy, AgentsIpcConfig, - AuditConfig, AutonomyConfig, BrowserComputerUseConfig, - BrowserConfig, BuiltinHooksConfig, ChannelsConfig, ClassificationRule, ComposioConfig, Config, - CoordinationConfig, CostConfig, CronConfig, DelegateAgentConfig, DiscordConfig, - DockerRuntimeConfig, EmbeddingRouteConfig, EstopConfig, FeishuConfig, GatewayConfig, - GroupReplyConfig, GroupReplyMode, HardwareConfig, HardwareTransport, HeartbeatConfig, - HooksConfig, HttpRequestConfig, IMessageConfig, IdentityConfig, LarkConfig, MatrixConfig, - MemoryConfig, ModelRouteConfig, MultimodalConfig, NextcloudTalkConfig, ObservabilityConfig, + AuditConfig, AutonomyConfig, BrowserComputerUseConfig, BrowserConfig, BuiltinHooksConfig, + ChannelsConfig, ClassificationRule, ComposioConfig, Config, CoordinationConfig, CostConfig, + CronConfig, DelegateAgentConfig, DiscordConfig, DockerRuntimeConfig, EmbeddingRouteConfig, + EstopConfig, FeishuConfig, GatewayConfig, GroupReplyConfig, GroupReplyMode, HardwareConfig, + HardwareTransport, HeartbeatConfig, HooksConfig, HttpRequestConfig, IMessageConfig, + IdentityConfig, LarkConfig, MatrixConfig, MemoryConfig, ModelRouteConfig, MultimodalConfig, + NextcloudTalkConfig, NonCliNaturalLanguageApprovalMode, ObservabilityConfig, OtpChallengeDelivery, OtpConfig, OtpMethod, PeripheralBoardConfig, PeripheralsConfig, - NonCliNaturalLanguageApprovalMode, PerplexityFilterConfig, PluginEntryConfig, PluginsConfig, - ProviderConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig, - ReliabilityConfig, ResearchPhaseConfig, ResearchTrigger, ResourceLimitsConfig, RuntimeConfig, - SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, - SecurityRoleConfig, SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig, - StorageProviderConfig, StorageProviderSection, StreamMode, SyscallAnomalyConfig, - TelegramConfig, TranscriptionConfig, TunnelConfig, UrlAccessConfig, - WasmCapabilityEscalationMode, WasmConfig, WasmModuleHashPolicy, WasmRuntimeConfig, - WasmSecurityConfig, WebFetchConfig, WebSearchConfig, WebhookConfig, + PerplexityFilterConfig, PluginEntryConfig, PluginsConfig, ProviderConfig, ProxyConfig, + ProxyScope, QdrantConfig, QueryClassificationConfig, ReliabilityConfig, ResearchPhaseConfig, + ResearchTrigger, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, + SchedulerConfig, SecretsConfig, SecurityConfig, SecurityRoleConfig, SkillsConfig, + SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig, + StorageProviderSection, StreamMode, SyscallAnomalyConfig, TelegramConfig, TranscriptionConfig, + TunnelConfig, UrlAccessConfig, WasmCapabilityEscalationMode, WasmConfig, WasmModuleHashPolicy, + WasmRuntimeConfig, WasmSecurityConfig, WebFetchConfig, WebSearchConfig, WebhookConfig, }; pub fn name_and_presence(channel: Option<&T>) -> (&'static str, bool) { diff --git a/src/config/schema.rs b/src/config/schema.rs index 06529073d..48371ad11 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -5293,7 +5293,10 @@ pub(crate) async fn persist_active_workspace_config_dir(config_dir: &Path) -> Re ); } + #[cfg(unix)] sync_directory(&default_config_dir).await?; + #[cfg(not(unix))] + sync_directory(&default_config_dir)?; Ok(()) } @@ -7165,7 +7168,10 @@ impl Config { })?; } + #[cfg(unix)] sync_directory(parent_dir).await?; + #[cfg(not(unix))] + sync_directory(parent_dir)?; if had_existing_config { let _ = fs::remove_file(&backup_path).await; @@ -7175,23 +7181,21 @@ impl Config { } } +#[cfg(unix)] async fn sync_directory(path: &Path) -> Result<()> { - #[cfg(unix)] - { - let dir = File::open(path) - .await - .with_context(|| format!("Failed to open directory for fsync: {}", path.display()))?; - dir.sync_all() - .await - .with_context(|| format!("Failed to fsync directory metadata: {}", path.display()))?; - Ok(()) - } + let dir = File::open(path) + .await + .with_context(|| format!("Failed to open directory for fsync: {}", path.display()))?; + dir.sync_all() + .await + .with_context(|| format!("Failed to fsync directory metadata: {}", path.display()))?; + Ok(()) +} - #[cfg(not(unix))] - { - let _ = path; - Ok(()) - } +#[cfg(not(unix))] +fn sync_directory(path: &Path) -> Result<()> { + let _ = path; + Ok(()) } #[cfg(test)] @@ -7200,7 +7204,6 @@ mod tests { #[cfg(unix)] use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; - use tempfile::TempDir; use tokio::sync::{Mutex, MutexGuard}; use tokio::test; use tokio_stream::wrappers::ReadDirStream; @@ -7390,7 +7393,7 @@ mod tests { #[cfg(unix)] #[test] async fn save_sets_config_permissions_on_new_file() { - let temp = TempDir::new().expect("temp dir"); + let temp = tempfile::TempDir::new().expect("temp dir"); let config_path = temp.path().join("config.toml"); let workspace_dir = temp.path().join("workspace"); @@ -8013,7 +8016,10 @@ tool_dispatcher = "xml" )); fs::create_dir_all(&dir).await.unwrap(); + #[cfg(unix)] sync_directory(&dir).await.unwrap(); + #[cfg(not(unix))] + sync_directory(&dir).unwrap(); let _ = fs::remove_dir_all(&dir).await; } diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index 0dcca4ecd..3844e6bcf 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -53,7 +53,7 @@ pub async fn run(config: Config) -> Result<()> { pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) { let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir); - execute_job_with_retry(config, &security, job).await + Box::pin(execute_job_with_retry(config, &security, job)).await } async fn execute_job_with_retry( @@ -68,7 +68,7 @@ async fn execute_job_with_retry( for attempt in 0..=retries { let (success, output) = match job.job_type { JobType::Shell => run_job_command(config, security, job).await, - JobType::Agent => run_agent_job(config, security, job).await, + JobType::Agent => Box::pin(run_agent_job(config, security, job)).await, }; last_output = output; @@ -101,18 +101,21 @@ async fn process_due_jobs( crate::health::mark_component_ok(component); let max_concurrent = config.scheduler.max_concurrent.max(1); - let mut in_flight = - stream::iter( - jobs.into_iter().map(|job| { - let config = config.clone(); - let security = Arc::clone(security); - let component = component.to_owned(); - async move { - execute_and_persist_job(&config, security.as_ref(), &job, &component).await - } - }), - ) - .buffer_unordered(max_concurrent); + let mut in_flight = stream::iter(jobs.into_iter().map(|job| { + let config = config.clone(); + let security = Arc::clone(security); + let component = component.to_owned(); + async move { + Box::pin(execute_and_persist_job( + &config, + security.as_ref(), + &job, + &component, + )) + .await + } + })) + .buffer_unordered(max_concurrent); while let Some((job_id, success, output)) = in_flight.next().await { if !success { @@ -131,7 +134,7 @@ async fn execute_and_persist_job( warn_if_high_frequency_agent_job(job); let started_at = Utc::now(); - let (success, output) = execute_job_with_retry(config, security, job).await; + let (success, output) = Box::pin(execute_job_with_retry(config, security, job)).await; let finished_at = Utc::now(); let success = persist_job_result(config, job, success, &output, started_at, finished_at).await; @@ -170,7 +173,7 @@ async fn run_agent_job( let run_result = match job.session_target { SessionTarget::Main | SessionTarget::Isolated => { - crate::agent::run( + Box::pin(crate::agent::run( config.clone(), Some(prefixed_prompt), None, @@ -178,7 +181,7 @@ async fn run_agent_job( config.default_temperature, vec![], false, - ) + )) .await } }; diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 3b25243f8..6e93764c2 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -65,7 +65,7 @@ pub async fn run(config: Config, host: String, port: u16) -> Result<()> { max_backoff, move || { let cfg = channels_cfg.clone(); - async move { crate::channels::start_channels(cfg).await } + async move { Box::pin(crate::channels::start_channels(cfg)).await } }, )); } else { @@ -214,7 +214,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> { for task in tasks { let prompt = format!("[Heartbeat Task] {task}"); let temp = config.default_temperature; - match crate::agent::run( + match Box::pin(crate::agent::run( config.clone(), Some(prompt), None, @@ -222,7 +222,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> { temp, vec![], false, - ) + )) .await { Ok(output) => { diff --git a/src/gateway/api.rs b/src/gateway/api.rs index a936264c0..20adaf035 100644 --- a/src/gateway/api.rs +++ b/src/gateway/api.rs @@ -706,7 +706,10 @@ fn restore_masked_sensitive_fields( restore_optional_secret(&mut incoming.proxy.http_proxy, ¤t.proxy.http_proxy); restore_optional_secret(&mut incoming.proxy.https_proxy, ¤t.proxy.https_proxy); restore_optional_secret(&mut incoming.proxy.all_proxy, ¤t.proxy.all_proxy); - restore_optional_secret(&mut incoming.transcription.api_key, ¤t.transcription.api_key); + restore_optional_secret( + &mut incoming.transcription.api_key, + ¤t.transcription.api_key, + ); restore_optional_secret( &mut incoming.browser.computer_use.api_key, ¤t.browser.computer_use.api_key, @@ -932,7 +935,10 @@ mod tests { assert_eq!(hydrated.config_path, current.config_path); assert_eq!(hydrated.workspace_dir, current.workspace_dir); assert_eq!(hydrated.api_key, current.api_key); - assert_eq!(hydrated.transcription.api_key, current.transcription.api_key); + assert_eq!( + hydrated.transcription.api_key, + current.transcription.api_key + ); assert_eq!(hydrated.default_model.as_deref(), Some("gpt-4.1-mini")); assert_eq!( hydrated.reliability.api_keys, diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 27ccc7e8f..74ed8bc7b 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -981,7 +981,13 @@ pub(super) async fn run_gateway_chat_with_tools( channel_name: &str, ) -> anyhow::Result { let config = state.config.lock().clone(); - crate::agent::process_message(config, message, sender_id, channel_name).await + Box::pin(crate::agent::process_message( + config, + message, + sender_id, + channel_name, + )) + .await } fn sanitize_gateway_response(response: &str, tools: &[Box]) -> String { @@ -1810,7 +1816,14 @@ async fn handle_whatsapp_message( .await; } - match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "whatsapp").await { + match Box::pin(run_gateway_chat_with_tools( + &state, + &msg.content, + &msg.sender, + "whatsapp", + )) + .await + { Ok(response) => { let safe_response = sanitize_gateway_response(&response, state.tools_registry_exec.as_ref()); @@ -1929,7 +1942,14 @@ async fn handle_linq_webhook( } // Call the LLM - match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "linq").await { + match Box::pin(run_gateway_chat_with_tools( + &state, + &msg.content, + &msg.sender, + "linq", + )) + .await + { Ok(response) => { let safe_response = sanitize_gateway_response(&response, state.tools_registry_exec.as_ref()); @@ -2023,7 +2043,14 @@ async fn handle_wati_webhook(State(state): State, body: Bytes) -> impl } // Call the LLM - match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "wati").await { + match Box::pin(run_gateway_chat_with_tools( + &state, + &msg.content, + &msg.sender, + "wati", + )) + .await + { Ok(response) => { let safe_response = sanitize_gateway_response(&response, state.tools_registry_exec.as_ref()); @@ -2129,8 +2156,13 @@ async fn handle_nextcloud_talk_webhook( .await; } - match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "nextcloud_talk") - .await + match Box::pin(run_gateway_chat_with_tools( + &state, + &msg.content, + &msg.sender, + "nextcloud_talk", + )) + .await { Ok(response) => { let safe_response = @@ -2222,7 +2254,14 @@ async fn handle_qq_webhook( .await; } - match run_gateway_chat_with_tools(&state, &msg.content, &msg.sender, "qq").await { + match Box::pin(run_gateway_chat_with_tools( + &state, + &msg.content, + &msg.sender, + "qq", + )) + .await + { Ok(response) => { let safe_response = sanitize_gateway_response(&response, state.tools_registry_exec.as_ref()); diff --git a/src/gateway/openclaw_compat.rs b/src/gateway/openclaw_compat.rs index 95aa686c9..c68631a28 100644 --- a/src/gateway/openclaw_compat.rs +++ b/src/gateway/openclaw_compat.rs @@ -95,9 +95,7 @@ pub async fn handle_api_chat( && state.webhook_secret_hash.is_none() && !peer_addr.ip().is_loopback() { - tracing::warn!( - "/api/chat: rejected unauthenticated non-loopback request" - ); + tracing::warn!("/api/chat: rejected unauthenticated non-loopback request"); let err = serde_json::json!({ "error": "Unauthorized — configure pairing or X-Webhook-Secret for non-local access" }); @@ -152,7 +150,11 @@ pub async fn handle_api_chat( message.to_string() } else { let recent: Vec<&String> = chat_body.context.iter().rev().take(10).rev().collect(); - let context_block = recent.iter().map(|s| s.as_str()).collect::>().join("\n"); + let context_block = recent + .iter() + .map(|s| s.as_str()) + .collect::>() + .join("\n"); format!( "Recent conversation context:\n{}\n\nCurrent message:\n{}", context_block, message @@ -184,11 +186,15 @@ pub async fn handle_api_chat( }); // ── Run the full agent loop ── - let sender_id = chat_body - .session_id - .as_deref() - .unwrap_or(rate_key.as_str()); - match run_gateway_chat_with_tools(&state, &enriched_message, sender_id, "api_chat").await { + let sender_id = chat_body.session_id.as_deref().unwrap_or(rate_key.as_str()); + match Box::pin(run_gateway_chat_with_tools( + &state, + &enriched_message, + sender_id, + "api_chat", + )) + .await + { Ok(response) => { let safe_response = sanitize_gateway_response(&response, state.tools_registry_exec.as_ref()); @@ -399,7 +405,9 @@ pub async fn handle_v1_chat_completions_with_tools( .unwrap_or(""); let token = auth.strip_prefix("Bearer ").unwrap_or(""); if !state.pairing.is_authenticated(token) { - tracing::warn!("/v1/chat/completions (compat): rejected — not paired / invalid bearer token"); + tracing::warn!( + "/v1/chat/completions (compat): rejected — not paired / invalid bearer token" + ); let err = serde_json::json!({ "error": { "message": "Invalid API key. Pair first via POST /pair, then use Authorization: Bearer ", @@ -485,7 +493,11 @@ pub async fn handle_v1_chat_completions_with_tools( .rev() .filter(|m| m.role == "user" || m.role == "assistant") .map(|m| { - let role_label = if m.role == "user" { "User" } else { "Assistant" }; + let role_label = if m.role == "user" { + "User" + } else { + "Assistant" + }; format!("{}: {}", role_label, m.content) }) .collect(); @@ -499,7 +511,11 @@ pub async fn handle_v1_chat_completions_with_tools( .take(MAX_CONTEXT_MESSAGES) .rev() .collect(); - let context_block = recent.iter().map(|s| s.as_str()).collect::>().join("\n"); + let context_block = recent + .iter() + .map(|s| s.as_str()) + .collect::>() + .join("\n"); format!( "Recent conversation context:\n{}\n\nCurrent message:\n{}", context_block, message @@ -550,12 +566,12 @@ pub async fn handle_v1_chat_completions_with_tools( ); // ── Run the full agent loop ── - let reply = match run_gateway_chat_with_tools( + let reply = match Box::pin(run_gateway_chat_with_tools( &state, &enriched_message, rate_key.as_str(), "openai_compat", - ) + )) .await { Ok(response) => { @@ -628,9 +644,7 @@ pub async fn handle_v1_chat_completions_with_tools( } }; - let model_name = request - .model - .unwrap_or_else(|| state.model.clone()); + let model_name = request.model.unwrap_or_else(|| state.model.clone()); #[allow(clippy::cast_possible_truncation)] let prompt_tokens = (enriched_message.len() / 4) as u32; @@ -855,14 +869,20 @@ mod tests { fn api_chat_body_rejects_missing_message() { let json = r#"{"session_id": "s1"}"#; let result: Result = serde_json::from_str(json); - assert!(result.is_err(), "missing `message` field should fail deserialization"); + assert!( + result.is_err(), + "missing `message` field should fail deserialization" + ); } #[test] fn oai_request_rejects_empty_messages() { let json = r#"{"messages": []}"#; let req: OaiChatRequest = serde_json::from_str(json).unwrap(); - assert!(req.messages.is_empty(), "empty messages should parse but be caught by handler"); + assert!( + req.messages.is_empty(), + "empty messages should parse but be caught by handler" + ); } #[test] @@ -903,7 +923,17 @@ mod tests { .skip(1) .rev() .filter(|m| m.role == "user" || m.role == "assistant") - .map(|m| format!("{}: {}", if m.role == "user" { "User" } else { "Assistant" }, m.content)) + .map(|m| { + format!( + "{}: {}", + if m.role == "user" { + "User" + } else { + "Assistant" + }, + m.content + ) + }) .collect(); assert_eq!(context_messages.len(), 2); diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index bf930ae54..513599579 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -10,9 +10,7 @@ //! ``` use super::AppState; -use crate::agent::loop_::{ - build_shell_policy_instructions, build_tool_instructions_from_specs, -}; +use crate::agent::loop_::{build_shell_policy_instructions, build_tool_instructions_from_specs}; use crate::approval::ApprovalManager; use crate::providers::ChatMessage; use axum::{ @@ -263,8 +261,13 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { })); // Full agentic loop with tools (includes WASM skills, shell, memory, etc.) - match super::run_gateway_chat_with_tools(&state, &content, ws_sender_id.as_str(), "ws") - .await + match Box::pin(super::run_gateway_chat_with_tools( + &state, + &content, + ws_sender_id.as_str(), + "ws", + )) + .await { Ok(response) => { let safe_response = diff --git a/src/lib.rs b/src/lib.rs index 056ab6ad9..5a8be0779 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,8 +57,6 @@ pub(crate) mod heartbeat; pub mod hooks; pub(crate) mod identity; // Intentionally unused re-export — public API surface for plugin authors. -#[allow(unused_imports)] -pub(crate) mod plugins; pub(crate) mod integrations; pub mod memory; pub(crate) mod migration; @@ -66,6 +64,8 @@ pub(crate) mod multimodal; pub mod observability; pub(crate) mod onboard; pub mod peripherals; +#[allow(unused_imports)] +pub(crate) mod plugins; pub mod providers; pub mod rag; pub mod runtime; diff --git a/src/main.rs b/src/main.rs index 8717e5fb8..3fd91a661 100644 --- a/src/main.rs +++ b/src/main.rs @@ -798,9 +798,9 @@ async fn main() -> Result<()> { bail!("--channels-only does not accept --force"); } let config = if channels_only { - onboard::run_channels_repair_wizard().await + Box::pin(onboard::run_channels_repair_wizard()).await } else if interactive { - onboard::run_wizard(force).await + Box::pin(onboard::run_wizard(force)).await } else { onboard::run_quick_setup( api_key.as_deref(), @@ -814,7 +814,7 @@ async fn main() -> Result<()> { }?; // Auto-start channels if user said yes during wizard if std::env::var("ZEROCLAW_AUTOSTART_CHANNELS").as_deref() == Ok("1") { - channels::start_channels(config).await?; + Box::pin(channels::start_channels(config)).await?; } return Ok(()); } @@ -875,7 +875,7 @@ async fn main() -> Result<()> { // Single-shot mode (-m) runs non-interactively: no TTY approval prompt, // so tools are not denied by a stdin read returning EOF. let interactive = message.is_none(); - agent::run( + Box::pin(agent::run( config, message, provider, @@ -883,7 +883,7 @@ async fn main() -> Result<()> { temperature, peripheral, interactive, - ) + )) .await .map(|_| ()) } @@ -1114,8 +1114,8 @@ async fn main() -> Result<()> { }, Commands::Channel { channel_command } => match channel_command { - ChannelCommands::Start => channels::start_channels(config).await, - ChannelCommands::Doctor => channels::doctor_channels(config).await, + ChannelCommands::Start => Box::pin(channels::start_channels(config)).await, + ChannelCommands::Doctor => Box::pin(channels::doctor_channels(config)).await, other => channels::handle_command(other, &config).await, }, diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index fe4292280..300029b8e 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -778,8 +778,7 @@ fn default_model_for_provider(provider: &str) -> String { "qwen-code" => "qwen3-coder-plus".into(), "ollama" => "llama3.2".into(), "llamacpp" => "ggml-org/gpt-oss-20b-GGUF".into(), - "sglang" | "vllm" | "osaurus" => "default".into(), - "copilot" => "default".into(), + "sglang" | "vllm" | "osaurus" | "copilot" => "default".into(), "gemini" => "gemini-2.5-pro".into(), "kimi-code" => "kimi-for-coding".into(), "bedrock" => "anthropic.claude-sonnet-4-5-20250929-v1:0".into(), diff --git a/src/plugins/discovery.rs b/src/plugins/discovery.rs index 330080e18..a7354f81c 100644 --- a/src/plugins/discovery.rs +++ b/src/plugins/discovery.rs @@ -5,7 +5,9 @@ use std::path::{Path, PathBuf}; -use super::manifest::{load_manifest, ManifestLoadResult, PluginManifest, PLUGIN_MANIFEST_FILENAME}; +use super::manifest::{ + load_manifest, ManifestLoadResult, PluginManifest, PLUGIN_MANIFEST_FILENAME, +}; use super::registry::{DiagnosticLevel, PluginDiagnostic, PluginOrigin}; /// A discovered plugin before loading. @@ -79,10 +81,7 @@ fn scan_dir(dir: &Path, origin: PluginOrigin) -> (Vec, Vec/.zeroclaw/extensions/` /// 4. Extra paths from config `[plugins] load_paths` -pub fn discover_plugins( - workspace_dir: Option<&Path>, - extra_paths: &[PathBuf], -) -> DiscoveryResult { +pub fn discover_plugins(workspace_dir: Option<&Path>, extra_paths: &[PathBuf]) -> DiscoveryResult { let mut all_plugins = Vec::new(); let mut all_diagnostics = Vec::new(); @@ -127,7 +126,7 @@ pub fn discover_plugins( let mut deduped: Vec = Vec::with_capacity(seen.len()); // Collect in insertion order of the winning index let mut indices: Vec = seen.values().copied().collect(); - indices.sort(); + indices.sort_unstable(); for i in indices { deduped.push(all_plugins.swap_remove(i)); } @@ -183,10 +182,7 @@ version = "0.1.0" make_plugin_dir(&ext_dir, "custom-one"); let result = discover_plugins(None, &[ext_dir]); - assert!(result - .plugins - .iter() - .any(|p| p.manifest.id == "custom-one")); + assert!(result.plugins.iter().any(|p| p.manifest.id == "custom-one")); } #[test] diff --git a/src/plugins/loader.rs b/src/plugins/loader.rs index 722e11f1a..90893a17e 100644 --- a/src/plugins/loader.rs +++ b/src/plugins/loader.rs @@ -13,7 +13,10 @@ use tracing::{info, warn}; use crate::config::PluginsConfig; use super::discovery::discover_plugins; -use super::registry::*; +use super::registry::{ + DiagnosticLevel, PluginDiagnostic, PluginHookRegistration, PluginOrigin, PluginRecord, + PluginRegistry, PluginStatus, PluginToolRegistration, +}; use super::traits::{Plugin, PluginApi, PluginLogger}; /// Resolve whether a discovered plugin should be enabled. @@ -306,7 +309,10 @@ mod tests { }; let reg = load_plugins(&cfg, None, vec![]); assert_eq!(reg.active_count(), 0); - assert!(reg.diagnostics.iter().any(|d| d.message.contains("disabled"))); + assert!(reg + .diagnostics + .iter() + .any(|d| d.message.contains("disabled"))); } #[test] diff --git a/src/security/leak_detector.rs b/src/security/leak_detector.rs index 3c9c9122a..df49fab37 100644 --- a/src/security/leak_detector.rs +++ b/src/security/leak_detector.rs @@ -363,7 +363,7 @@ fn shannon_entropy(bytes: &[u8]) -> f64 { .iter() .filter(|&&count| count > 0) .map(|&count| { - let p = count as f64 / len; + let p = f64::from(count) / len; -p * p.log2() }) .sum() @@ -390,7 +390,7 @@ mod tests { assert!(patterns.iter().any(|p| p.contains("Stripe"))); assert!(redacted.contains("[REDACTED")); } - _ => panic!("Should detect Stripe key"), + LeakResult::Clean => panic!("Should detect Stripe key"), } } @@ -403,7 +403,7 @@ mod tests { LeakResult::Detected { patterns, .. } => { assert!(patterns.iter().any(|p| p.contains("AWS"))); } - _ => panic!("Should detect AWS key"), + LeakResult::Clean => panic!("Should detect AWS key"), } } @@ -421,7 +421,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq... assert!(patterns.iter().any(|p| p.contains("private key"))); assert!(redacted.contains("[REDACTED_PRIVATE_KEY]")); } - _ => panic!("Should detect private key"), + LeakResult::Clean => panic!("Should detect private key"), } } @@ -435,7 +435,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq... assert!(patterns.iter().any(|p| p.contains("JWT"))); assert!(redacted.contains("[REDACTED_JWT]")); } - _ => panic!("Should detect JWT"), + LeakResult::Clean => panic!("Should detect JWT"), } } @@ -448,7 +448,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq... LeakResult::Detected { patterns, .. } => { assert!(patterns.iter().any(|p| p.contains("PostgreSQL"))); } - _ => panic!("Should detect database URL"), + LeakResult::Clean => panic!("Should detect database URL"), } } @@ -506,7 +506,7 @@ MIIEowIBAAKCAQEA0ZPr5JeyVDonXsKhfq... assert!(patterns.iter().any(|p| p.contains("High-entropy token"))); assert!(redacted.contains("[REDACTED_HIGH_ENTROPY_TOKEN]")); } - _ => panic!("expected high-entropy detection"), + LeakResult::Clean => panic!("expected high-entropy detection"), } } diff --git a/src/security/perplexity.rs b/src/security/perplexity.rs index c2e68e7cd..109231864 100644 --- a/src/security/perplexity.rs +++ b/src/security/perplexity.rs @@ -61,7 +61,8 @@ fn char_class_perplexity(prefix: &str, suffix: &str) -> f64 { let class = classify_char(ch); if let Some(p) = suffix_prev { let numerator = f64::from(transition[p][class] + 1); - let denominator = f64::from(row_totals[p] + CLASS_COUNT as u32); + let class_count_u32 = u32::try_from(CLASS_COUNT).unwrap_or(u32::MAX); + let denominator = f64::from(row_totals[p] + class_count_u32); nll += -(numerator / denominator).ln(); pairs += 1; } diff --git a/src/skills/audit.rs b/src/skills/audit.rs index 0e7f2f896..825c54d61 100644 --- a/src/skills/audit.rs +++ b/src/skills/audit.rs @@ -3,7 +3,6 @@ use regex::Regex; use std::fs; use std::path::{Component, Path, PathBuf}; use std::sync::OnceLock; -use zip::ZipArchive; const MAX_TEXT_FILE_BYTES: u64 = 512 * 1024; diff --git a/src/skills/mod.rs b/src/skills/mod.rs index 9feeb1d5f..70ad5ee11 100644 --- a/src/skills/mod.rs +++ b/src/skills/mod.rs @@ -452,7 +452,8 @@ fn load_skill_md(path: &Path, dir: &Path) -> Result { if let Ok(raw) = std::fs::read(&meta_path) { if let Ok(meta) = serde_json::from_slice::(&raw) { if let Some(slug) = meta.get("slug").and_then(|v| v.as_str()) { - let normalized = normalize_skill_name(slug.split('/').last().unwrap_or(slug)); + let normalized = + normalize_skill_name(slug.split('/').next_back().unwrap_or(slug)); if !normalized.is_empty() { name = normalized; } @@ -1616,7 +1617,7 @@ fn extract_zip_skill_meta( f.read_to_end(&mut buf).ok(); if let Ok(meta) = serde_json::from_slice::(&buf) { let slug_raw = meta.get("slug").and_then(|v| v.as_str()).unwrap_or(""); - let base = slug_raw.split('/').last().unwrap_or(slug_raw); + let base = slug_raw.split('/').next_back().unwrap_or(slug_raw); let name = normalize_skill_name(base); if !name.is_empty() { let version = meta diff --git a/src/tools/cron_run.rs b/src/tools/cron_run.rs index bb3c9e419..2d73f414d 100644 --- a/src/tools/cron_run.rs +++ b/src/tools/cron_run.rs @@ -116,7 +116,8 @@ impl Tool for CronRunTool { } let started_at = Utc::now(); - let (success, output) = cron::scheduler::execute_job_now(&self.config, &job).await; + let (success, output) = + Box::pin(cron::scheduler::execute_job_now(&self.config, &job)).await; let finished_at = Utc::now(); let duration_ms = (finished_at - started_at).num_milliseconds(); let status = if success { "ok" } else { "error" }; diff --git a/src/tools/docx_read.rs b/src/tools/docx_read.rs index 316a308e5..e63527631 100644 --- a/src/tools/docx_read.rs +++ b/src/tools/docx_read.rs @@ -202,24 +202,23 @@ impl Tool for DocxReadTool { } }; - let text = - match tokio::task::spawn_blocking(move || extract_docx_text(&bytes)).await { - Ok(Ok(t)) => t, - Ok(Err(e)) => { - return Ok(ToolResult { - success: false, - output: String::new(), - error: Some(format!("DOCX extraction failed: {e}")), - }); - } - Err(e) => { - return Ok(ToolResult { - success: false, - output: String::new(), - error: Some(format!("DOCX extraction task panicked: {e}")), - }); - } - }; + let text = match tokio::task::spawn_blocking(move || extract_docx_text(&bytes)).await { + Ok(Ok(t)) => t, + Ok(Err(e)) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("DOCX extraction failed: {e}")), + }); + } + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("DOCX extraction task panicked: {e}")), + }); + } + }; if text.trim().is_empty() { return Ok(ToolResult { diff --git a/src/tools/mcp_client.rs b/src/tools/mcp_client.rs index bdc77419e..70e0f7f91 100644 --- a/src/tools/mcp_client.rs +++ b/src/tools/mcp_client.rs @@ -301,11 +301,11 @@ mod tests { name: "nonexistent".to_string(), command: "/usr/bin/this_binary_does_not_exist_zeroclaw_test".to_string(), args: vec![], - env: Default::default(), + env: std::collections::HashMap::default(), tool_timeout_secs: None, transport: McpTransport::Stdio, url: None, - headers: Default::default(), + headers: std::collections::HashMap::default(), }; let result = McpServer::connect(config).await; assert!(result.is_err()); @@ -320,11 +320,11 @@ mod tests { name: "bad".to_string(), command: "/usr/bin/does_not_exist_zc_test".to_string(), args: vec![], - env: Default::default(), + env: std::collections::HashMap::default(), tool_timeout_secs: None, transport: McpTransport::Stdio, url: None, - headers: Default::default(), + headers: std::collections::HashMap::default(), }]; let registry = McpRegistry::connect_all(&configs) .await diff --git a/src/tools/mcp_transport.rs b/src/tools/mcp_transport.rs index 89d54ae0a..61052a343 100644 --- a/src/tools/mcp_transport.rs +++ b/src/tools/mcp_transport.rs @@ -5,7 +5,7 @@ use std::borrow::Cow; use anyhow::{anyhow, bail, Context, Result}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, Command}; -use tokio::sync::{Mutex, Notify, oneshot}; +use tokio::sync::{oneshot, Mutex, Notify}; use tokio::time::{timeout, Duration}; use tokio_stream::StreamExt; @@ -221,7 +221,8 @@ impl SseTransport { .ok_or_else(|| anyhow!("URL required for SSE transport"))? .clone(); - let client = reqwest::Client::builder().build() + let client = reqwest::Client::builder() + .build() .context("failed to build HTTP client")?; Ok(Self { @@ -288,7 +289,7 @@ impl SseTransport { self.reader_task = Some(tokio::spawn(async move { let stream = resp .bytes_stream() - .map(|item| item.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); + .map(|item| item.map_err(std::io::Error::other)); let reader = tokio_util::io::StreamReader::new(stream); let mut lines = BufReader::new(reader).lines(); @@ -325,16 +326,13 @@ impl SseTransport { if let Some(rest) = line.strip_prefix("event:") { cur_event = Some(rest.trim().to_string()); - continue; } if let Some(rest) = line.strip_prefix("data:") { let rest = rest.strip_prefix(' ').unwrap_or(rest); cur_data.push(rest.to_string()); - continue; } if let Some(rest) = line.strip_prefix("id:") { cur_id = Some(rest.trim().to_string()); - continue; } } } @@ -380,7 +378,7 @@ impl SseTransport { Ok((derived, false)) } - async fn maybe_try_alternate_message_url( + fn maybe_try_alternate_message_url( &self, current_url: &str, from_endpoint: bool, @@ -467,7 +465,9 @@ async fn handle_sse_event( return; }; - let Some(id_val) = resp.id.clone() else { return; }; + let Some(id_val) = resp.id.clone() else { + return; + }; let id = match id_val.as_u64() { Some(v) => v, None => return, @@ -480,7 +480,11 @@ async fn handle_sse_event( if let Some(tx) = tx { let _ = tx.send(resp); } else { - tracing::debug!("MCP SSE `{}` received response for unknown id {}", server_name, id); + tracing::debug!( + "MCP SSE `{}` received response for unknown id {}", + server_name, + id + ); } } @@ -542,7 +546,7 @@ async fn read_first_jsonrpc_from_sse_response( ) -> Result> { let stream = resp .bytes_stream() - .map(|item| item.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); + .map(|item| item.map_err(std::io::Error::other)); let reader = tokio_util::io::StreamReader::new(stream); let mut lines = BufReader::new(reader).lines(); @@ -563,7 +567,8 @@ async fn read_first_jsonrpc_from_sse_response( cur_data.clear(); let event = event.unwrap_or_else(|| "message".to_string()); - if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint") { + if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint") + { continue; } if !event.eq_ignore_ascii_case("message") { @@ -586,12 +591,10 @@ async fn read_first_jsonrpc_from_sse_response( } if let Some(rest) = line.strip_prefix("event:") { cur_event = Some(rest.trim().to_string()); - continue; } if let Some(rest) = line.strip_prefix("data:") { let rest = rest.strip_prefix(' ').unwrap_or(rest); cur_data.push(rest.to_string()); - continue; } } @@ -603,10 +606,7 @@ impl McpTransportConn for SseTransport { async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result { self.ensure_connected().await?; - let id = request - .id - .as_ref() - .and_then(|v| v.as_u64()); + let id = request.id.as_ref().and_then(|v| v.as_u64()); let body = serde_json::to_string(request)?; let (mut message_url, mut from_endpoint) = self.get_message_url().await?; @@ -654,7 +654,10 @@ impl McpTransportConn for SseTransport { let mut got_direct = None; let mut last_status = None; - for (i, url) in std::iter::once(primary_url).chain(secondary_url.into_iter()).enumerate() { + for (i, url) in std::iter::once(primary_url) + .chain(secondary_url.into_iter()) + .enumerate() + { let mut req = self .client .post(&url) @@ -664,7 +667,11 @@ impl McpTransportConn for SseTransport { for (key, value) in &self.headers { req = req.header(key, value); } - if !self.headers.keys().any(|k| k.eq_ignore_ascii_case("Accept")) { + if !self + .headers + .keys() + .any(|k| k.eq_ignore_ascii_case("Accept")) + { req = req.header("Accept", "application/json, text/event-stream"); } @@ -715,12 +722,11 @@ impl McpTransportConn for SseTransport { } Err(_) => continue, } - } else { - if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? { - got_direct = Some(resp); - } - break; } + if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? { + got_direct = Some(resp); + } + break; } let text = if i == 0 && has_secondary { @@ -861,7 +867,8 @@ mod tests { #[test] fn test_extract_json_from_sse_uses_last_event_with_data() { - let input = ": keep-alive\n\nid: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n"; + let input = + ": keep-alive\n\nid: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n"; let extracted = extract_json_from_sse_text(input); let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap(); } diff --git a/src/tools/model_routing_config.rs b/src/tools/model_routing_config.rs index 7e7b01096..44d4993d2 100644 --- a/src/tools/model_routing_config.rs +++ b/src/tools/model_routing_config.rs @@ -900,8 +900,9 @@ impl Tool for ModelRoutingConfigTool { mod tests { use super::*; use crate::security::{AutonomyLevel, SecurityPolicy}; - use std::sync::{Mutex, OnceLock}; + use std::sync::OnceLock; use tempfile::TempDir; + use tokio::sync::Mutex; fn test_security() -> Arc { Arc::new(SecurityPolicy { @@ -945,11 +946,9 @@ mod tests { } } - fn env_lock() -> std::sync::MutexGuard<'static, ()> { + async fn env_lock() -> tokio::sync::MutexGuard<'static, ()> { static LOCK: OnceLock> = OnceLock::new(); - LOCK.get_or_init(|| Mutex::new(())) - .lock() - .expect("env lock poisoned") + LOCK.get_or_init(|| Mutex::new(())).lock().await } async fn test_config(tmp: &TempDir) -> Arc { @@ -1118,7 +1117,7 @@ mod tests { #[tokio::test] async fn get_reports_env_backed_credentials_for_routes_and_agents() { - let _env_lock = env_lock(); + let _env_lock = env_lock().await; let _provider_guard = EnvGuard::set("TELNYX_API_KEY", Some("test-telnyx-key")); let _generic_guard = EnvGuard::set("ZEROCLAW_API_KEY", None); let _api_key_guard = EnvGuard::set("API_KEY", None); @@ -1160,6 +1159,9 @@ mod tests { .unwrap(); assert_eq!(route["api_key_configured"], json!(true)); - assert_eq!(output["agents"]["voice_helper"]["api_key_configured"], json!(true)); + assert_eq!( + output["agents"]["voice_helper"]["api_key_configured"], + json!(true) + ); } } diff --git a/tests/agent_e2e.rs b/tests/agent_e2e.rs index dfa18a378..0d14bc7b8 100644 --- a/tests/agent_e2e.rs +++ b/tests/agent_e2e.rs @@ -726,7 +726,6 @@ async fn e2e_live_research_phase() { use zeroclaw::config::{ResearchPhaseConfig, ResearchTrigger}; use zeroclaw::observability::NoopObserver; use zeroclaw::providers::openai_codex::OpenAiCodexProvider; - use zeroclaw::providers::traits::Provider; use zeroclaw::tools::{Tool, ToolResult}; // ── Test should_trigger ──