//! Axum-based HTTP gateway with proper HTTP/1.1 compliance, body limits, and timeouts. //! //! This module replaces the raw TCP implementation with axum for: //! - Proper HTTP/1.1 parsing and compliance //! - Content-Length validation (handled by hyper) //! - Request body size limits (64KB max) //! - Request timeouts (30s) to prevent slow-loris attacks //! - Header sanitization (handled by axum/hyper) pub mod api; mod openai_compat; mod openclaw_compat; pub mod sse; pub mod static_files; pub mod ws; use crate::channels::{ BlueBubblesChannel, Channel, GitHubChannel, LinqChannel, NextcloudTalkChannel, QQChannel, SendMessage, WatiChannel, WhatsAppChannel, }; use crate::config::Config; use crate::cost::CostTracker; use crate::memory::{self, Memory, MemoryCategory}; use crate::providers::{self, ChatMessage, Provider}; use crate::runtime; use crate::security::pairing::{constant_time_eq, is_public_bind, PairingGuard}; use crate::security::SecurityPolicy; use crate::tools::traits::ToolSpec; use crate::tools::{self, Tool}; use crate::util::truncate_with_ellipsis; use anyhow::{Context, Result}; use axum::{ body::{Body, Bytes}, extract::{ConnectInfo, Query, State}, http::{header, HeaderMap, StatusCode}, response::{IntoResponse, Json, Response}, routing::{delete, get, post, put}, Router, }; use futures_util::StreamExt; use parking_lot::Mutex; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::{Duration, Instant}; use tower_http::limit::RequestBodyLimitLayer; use tower_http::timeout::TimeoutLayer; use uuid::Uuid; /// Maximum request body size (64KB) — prevents memory exhaustion pub const MAX_BODY_SIZE: usize = 65_536; /// Request timeout (30s) — prevents slow-loris attacks pub const REQUEST_TIMEOUT_SECS: u64 = 30; /// Sliding window used by gateway rate limiting. pub const RATE_LIMIT_WINDOW_SECS: u64 = 60; /// Fallback max distinct client keys tracked in gateway rate limiter. pub const RATE_LIMIT_MAX_KEYS_DEFAULT: usize = 10_000; /// Fallback max distinct idempotency keys retained in gateway memory. pub const IDEMPOTENCY_MAX_KEYS_DEFAULT: usize = 10_000; fn webhook_memory_key() -> String { format!("webhook_msg_{}", Uuid::new_v4()) } fn whatsapp_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("whatsapp_{}_{}", msg.sender, msg.id) } fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("linq_{}_{}", msg.sender, msg.id) } fn github_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("github_{}_{}", msg.sender, msg.id) } fn bluebubbles_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("bluebubbles_{}_{}", msg.sender, msg.id) } fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("wati_{}_{}", msg.sender, msg.id) } fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("nextcloud_talk_{}_{}", msg.sender, msg.id) } fn qq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("qq_{}_{}", msg.sender, msg.id) } fn gateway_message_session_id(msg: &crate::channels::traits::ChannelMessage) -> String { if msg.channel == "qq" || msg.channel == "napcat" { return format!("{}_{}", msg.channel, msg.sender); } match &msg.thread_ts { Some(thread_id) => format!("{}_{}_{}", msg.channel, thread_id, msg.sender), None => format!("{}_{}", msg.channel, msg.sender), } } fn hash_webhook_secret(value: &str) -> String { use sha2::{Digest, Sha256}; let digest = Sha256::digest(value.as_bytes()); hex::encode(digest) } /// How often the rate limiter sweeps stale IP entries from its map. const RATE_LIMITER_SWEEP_INTERVAL_SECS: u64 = 300; // 5 minutes #[derive(Debug)] struct SlidingWindowRateLimiter { limit_per_window: u32, window: Duration, max_keys: usize, requests: Mutex<(HashMap>, Instant)>, } impl SlidingWindowRateLimiter { fn new(limit_per_window: u32, window: Duration, max_keys: usize) -> Self { Self { limit_per_window, window, max_keys: max_keys.max(1), requests: Mutex::new((HashMap::new(), Instant::now())), } } fn prune_stale(requests: &mut HashMap>, cutoff: Instant) { requests.retain(|_, timestamps| { timestamps.retain(|t| *t > cutoff); !timestamps.is_empty() }); } fn allow(&self, key: &str) -> bool { if self.limit_per_window == 0 { return true; } let now = Instant::now(); let cutoff = now.checked_sub(self.window).unwrap_or_else(Instant::now); let mut guard = self.requests.lock(); let (requests, last_sweep) = &mut *guard; // Periodic sweep: remove keys with no recent requests if last_sweep.elapsed() >= Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS) { Self::prune_stale(requests, cutoff); *last_sweep = now; } if !requests.contains_key(key) && requests.len() >= self.max_keys { // Opportunistic stale cleanup before eviction under cardinality pressure. Self::prune_stale(requests, cutoff); *last_sweep = now; if requests.len() >= self.max_keys { let evict_key = requests .iter() .min_by_key(|(_, timestamps)| timestamps.last().copied().unwrap_or(cutoff)) .map(|(k, _)| k.clone()); if let Some(evict_key) = evict_key { requests.remove(&evict_key); } } } let entry = requests.entry(key.to_owned()).or_default(); entry.retain(|instant| *instant > cutoff); if entry.len() >= self.limit_per_window as usize { return false; } entry.push(now); true } } #[derive(Debug)] pub struct GatewayRateLimiter { pair: SlidingWindowRateLimiter, webhook: SlidingWindowRateLimiter, } impl GatewayRateLimiter { fn new(pair_per_minute: u32, webhook_per_minute: u32, max_keys: usize) -> Self { let window = Duration::from_secs(RATE_LIMIT_WINDOW_SECS); Self { pair: SlidingWindowRateLimiter::new(pair_per_minute, window, max_keys), webhook: SlidingWindowRateLimiter::new(webhook_per_minute, window, max_keys), } } fn allow_pair(&self, key: &str) -> bool { self.pair.allow(key) } fn allow_webhook(&self, key: &str) -> bool { self.webhook.allow(key) } } #[derive(Debug)] pub struct IdempotencyStore { ttl: Duration, max_keys: usize, keys: Mutex>, } impl IdempotencyStore { fn new(ttl: Duration, max_keys: usize) -> Self { Self { ttl, max_keys: max_keys.max(1), keys: Mutex::new(HashMap::new()), } } /// Returns true if this key is new and is now recorded. fn record_if_new(&self, key: &str) -> bool { let now = Instant::now(); let mut keys = self.keys.lock(); keys.retain(|_, seen_at| now.duration_since(*seen_at) < self.ttl); if keys.contains_key(key) { return false; } if keys.len() >= self.max_keys { let evict_key = keys .iter() .min_by_key(|(_, seen_at)| *seen_at) .map(|(k, _)| k.clone()); if let Some(evict_key) = evict_key { keys.remove(&evict_key); } } keys.insert(key.to_owned(), now); true } } fn parse_client_ip(value: &str) -> Option { let value = value.trim().trim_matches('"').trim(); if value.is_empty() { return None; } if let Ok(ip) = value.parse::() { return Some(ip); } if let Ok(addr) = value.parse::() { return Some(addr.ip()); } let value = value.trim_matches(['[', ']']); value.parse::().ok() } fn forwarded_client_ip(headers: &HeaderMap) -> Option { if let Some(xff) = headers.get("X-Forwarded-For").and_then(|v| v.to_str().ok()) { for candidate in xff.split(',') { if let Some(ip) = parse_client_ip(candidate) { return Some(ip); } } } headers .get("X-Real-IP") .and_then(|v| v.to_str().ok()) .and_then(parse_client_ip) } pub(crate) fn client_key_from_request( peer_addr: Option, headers: &HeaderMap, trust_forwarded_headers: bool, ) -> String { if trust_forwarded_headers { if let Some(ip) = forwarded_client_ip(headers) { return ip.to_string(); } } peer_addr .map(|addr| addr.ip().to_string()) .unwrap_or_else(|| "unknown".to_string()) } fn normalize_max_keys(configured: usize, fallback: usize) -> usize { if configured == 0 { fallback.max(1) } else { configured } } /// Shared state for all axum handlers #[derive(Clone)] pub struct AppState { pub config: Arc>, pub provider: Arc, pub model: String, pub temperature: f64, pub mem: Arc, pub auto_save: bool, /// SHA-256 hash of `X-Webhook-Secret` (hex-encoded), never plaintext. pub webhook_secret_hash: Option>, pub pairing: Arc, pub trust_forwarded_headers: bool, pub rate_limiter: Arc, pub idempotency_store: Arc, pub whatsapp: Option>, /// `WhatsApp` app secret for webhook signature verification (`X-Hub-Signature-256`) pub whatsapp_app_secret: Option>, pub linq: Option>, /// Linq webhook signing secret for signature verification pub linq_signing_secret: Option>, pub bluebubbles: Option>, /// BlueBubbles inbound webhook secret for Bearer auth verification pub bluebubbles_webhook_secret: Option>, pub nextcloud_talk: Option>, /// Nextcloud Talk webhook secret for signature verification pub nextcloud_talk_webhook_secret: Option>, pub wati: Option>, pub qq: Option>, pub qq_webhook_enabled: bool, /// Observability backend for metrics scraping pub observer: Arc, /// Registered tool specs (for web dashboard tools page) pub tools_registry: Arc>, /// Executable tools for agent loop (web chat) pub tools_registry_exec: Arc>>, /// Multimodal config for image handling in web chat pub multimodal: crate::config::MultimodalConfig, /// Max tool iterations for agent loop pub max_tool_iterations: usize, /// Cost tracker (optional, for web dashboard cost page) pub cost_tracker: Option>, /// SSE broadcast channel for real-time events pub event_tx: tokio::sync::broadcast::Sender, } /// Run the HTTP gateway using axum with proper HTTP/1.1 compliance. #[allow(clippy::too_many_lines)] pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { // ── Security: refuse public bind without tunnel or explicit opt-in ── if is_public_bind(host) && config.tunnel.provider == "none" && !config.gateway.allow_public_bind { anyhow::bail!( "🛑 Refusing to bind to {host} — gateway would be exposed to the internet.\n\ Fix: use --host 127.0.0.1 (default), configure a tunnel, or set\n\ [gateway] allow_public_bind = true in config.toml (NOT recommended)." ); } let config_state = Arc::new(Mutex::new(config.clone())); // ── Hooks ────────────────────────────────────────────────────── let hooks: Option> = if config.hooks.enabled { let mut runner = crate::hooks::HookRunner::new(); if config.hooks.builtin.boot_script { runner.register(Box::new(crate::hooks::builtin::BootScriptHook)); } if config.hooks.builtin.command_logger { runner.register(Box::new(crate::hooks::builtin::CommandLoggerHook::new())); } if config.hooks.builtin.session_memory { runner.register(Box::new(crate::hooks::builtin::SessionMemoryHook)); } Some(std::sync::Arc::new(runner)) } else { None }; let addr: SocketAddr = format!("{host}:{port}").parse()?; let listener = tokio::net::TcpListener::bind(addr).await?; let actual_port = listener.local_addr()?.port(); let display_addr = format!("{host}:{actual_port}"); let provider: Arc = Arc::from(providers::create_resilient_provider_with_options( config.default_provider.as_deref().unwrap_or("openrouter"), config.api_key.as_deref(), config.api_url.as_deref(), &config.reliability, &providers::ProviderRuntimeOptions { auth_profile_override: None, provider_api_url: config.api_url.clone(), provider_transport: config.effective_provider_transport(), zeroclaw_dir: config.config_path.parent().map(std::path::PathBuf::from), secrets_encrypt: config.secrets.encrypt, reasoning_enabled: config.runtime.reasoning_enabled, reasoning_level: config.effective_provider_reasoning_level(), custom_provider_api_mode: config.provider_api.map(|mode| mode.as_compatible_mode()), max_tokens_override: None, model_support_vision: config.model_support_vision, }, )?); let model = config .default_model .clone() .unwrap_or_else(|| "anthropic/claude-sonnet-4".into()); let temperature = config.default_temperature; let mem: Arc = Arc::from(memory::create_memory_with_storage( &config.memory, Some(&config.storage.provider.config), &config.workspace_dir, config.api_key.as_deref(), )?); let runtime: Arc = Arc::from(runtime::create_runtime(&config.runtime)?); let security = Arc::new(SecurityPolicy::from_config( &config.autonomy, &config.workspace_dir, )); let (composio_key, composio_entity_id) = if config.composio.enabled { ( config.composio.api_key.as_deref(), Some(config.composio.entity_id.as_str()), ) } else { (None, None) }; let tools_registry_exec: Arc>> = Arc::new(tools::all_tools_with_runtime( Arc::new(config.clone()), &security, runtime, Arc::clone(&mem), composio_key, composio_entity_id, &config.browser, &config.http_request, &config.web_fetch, &config.workspace_dir, &config.agents, config.api_key.as_deref(), &config, )); let tools_registry: Arc> = Arc::new(tools_registry_exec.iter().map(|t| t.spec()).collect()); let max_tool_iterations = config.agent.max_tool_iterations; let multimodal_config = config.multimodal.clone(); // Cost tracker (optional) let cost_tracker = if config.cost.enabled { match CostTracker::new(config.cost.clone(), &config.workspace_dir) { Ok(ct) => Some(Arc::new(ct)), Err(e) => { tracing::warn!("Failed to initialize cost tracker: {e}"); None } } } else { None }; // SSE broadcast channel for real-time events let (event_tx, _event_rx) = tokio::sync::broadcast::channel::(256); // Extract webhook secret for authentication let webhook_secret_hash: Option> = config.channels_config.webhook.as_ref().and_then(|webhook| { webhook.secret.as_ref().and_then(|raw_secret| { let trimmed_secret = raw_secret.trim(); (!trimmed_secret.is_empty()) .then(|| Arc::::from(hash_webhook_secret(trimmed_secret))) }) }); // WhatsApp channel (if configured) let whatsapp_channel: Option> = config .channels_config .whatsapp .as_ref() .filter(|wa| wa.is_cloud_config()) .map(|wa| { Arc::new(WhatsAppChannel::new( wa.access_token.clone().unwrap_or_default(), wa.phone_number_id.clone().unwrap_or_default(), wa.verify_token.clone().unwrap_or_default(), wa.allowed_numbers.clone(), )) }); // WhatsApp app secret for webhook signature verification // Priority: environment variable > config file let whatsapp_app_secret: Option> = std::env::var("ZEROCLAW_WHATSAPP_APP_SECRET") .ok() .and_then(|secret| { let secret = secret.trim(); (!secret.is_empty()).then(|| secret.to_owned()) }) .or_else(|| { config.channels_config.whatsapp.as_ref().and_then(|wa| { wa.app_secret .as_deref() .map(str::trim) .filter(|secret| !secret.is_empty()) .map(ToOwned::to_owned) }) }) .map(Arc::from); // Linq channel (if configured) let linq_channel: Option> = config.channels_config.linq.as_ref().map(|lq| { Arc::new(LinqChannel::new( lq.api_token.clone(), lq.from_phone.clone(), lq.allowed_senders.clone(), )) }); // Linq signing secret for webhook signature verification // Priority: environment variable > config file let linq_signing_secret: Option> = std::env::var("ZEROCLAW_LINQ_SIGNING_SECRET") .ok() .and_then(|secret| { let secret = secret.trim(); (!secret.is_empty()).then(|| secret.to_owned()) }) .or_else(|| { config.channels_config.linq.as_ref().and_then(|lq| { lq.signing_secret .as_deref() .map(str::trim) .filter(|secret| !secret.is_empty()) .map(ToOwned::to_owned) }) }) .map(Arc::from); // BlueBubbles channel (if configured) let bluebubbles_channel: Option> = config.channels_config.bluebubbles.as_ref().map(|bb| { Arc::new(BlueBubblesChannel::new( bb.server_url.clone(), bb.password.clone(), bb.allowed_senders.clone(), bb.ignore_senders.clone(), )) }); let bluebubbles_webhook_secret: Option> = config .channels_config .bluebubbles .as_ref() .and_then(|bb| bb.webhook_secret.as_deref()) .map(Arc::from); // WATI channel (if configured) let wati_channel: Option> = config.channels_config.wati.as_ref().map(|wati_cfg| { Arc::new(WatiChannel::new( wati_cfg.api_token.clone(), wati_cfg.api_url.clone(), wati_cfg.tenant_id.clone(), wati_cfg.allowed_numbers.clone(), )) }); // QQ channel (if configured) let qq_channel: Option> = config.channels_config.qq.as_ref().map(|qq_cfg| { Arc::new(QQChannel::new_with_environment( qq_cfg.app_id.clone(), qq_cfg.app_secret.clone(), qq_cfg.allowed_users.clone(), qq_cfg.environment.clone(), )) }); let qq_webhook_enabled = config .channels_config .qq .as_ref() .is_some_and(|qq| qq.receive_mode == crate::config::schema::QQReceiveMode::Webhook); // Nextcloud Talk channel (if configured) let nextcloud_talk_channel: Option> = config.channels_config.nextcloud_talk.as_ref().map(|nc| { Arc::new(NextcloudTalkChannel::new( nc.base_url.clone(), nc.app_token.clone(), nc.allowed_users.clone(), )) }); // Nextcloud Talk webhook secret for signature verification // Priority: environment variable > config file let nextcloud_talk_webhook_secret: Option> = std::env::var("ZEROCLAW_NEXTCLOUD_TALK_WEBHOOK_SECRET") .ok() .and_then(|secret| { let secret = secret.trim(); (!secret.is_empty()).then(|| secret.to_owned()) }) .or_else(|| { config .channels_config .nextcloud_talk .as_ref() .and_then(|nc| { nc.webhook_secret .as_deref() .map(str::trim) .filter(|secret| !secret.is_empty()) .map(ToOwned::to_owned) }) }) .map(Arc::from); // ── Pairing guard ────────────────────────────────────── let pairing = Arc::new(PairingGuard::new( config.gateway.require_pairing, &config.gateway.paired_tokens, )); let rate_limit_max_keys = normalize_max_keys( config.gateway.rate_limit_max_keys, RATE_LIMIT_MAX_KEYS_DEFAULT, ); let rate_limiter = Arc::new(GatewayRateLimiter::new( config.gateway.pair_rate_limit_per_minute, config.gateway.webhook_rate_limit_per_minute, rate_limit_max_keys, )); let idempotency_max_keys = normalize_max_keys( config.gateway.idempotency_max_keys, IDEMPOTENCY_MAX_KEYS_DEFAULT, ); let idempotency_store = Arc::new(IdempotencyStore::new( Duration::from_secs(config.gateway.idempotency_ttl_secs.max(1)), idempotency_max_keys, )); // ── Tunnel ──────────────────────────────────────────────── let tunnel = crate::tunnel::create_tunnel(&config.tunnel)?; let mut tunnel_url: Option = None; if let Some(ref tun) = tunnel { println!("🔗 Starting {} tunnel...", tun.name()); match tun.start(host, actual_port).await { Ok(url) => { println!("🌐 Tunnel active: {url}"); tunnel_url = Some(url); } Err(e) => { println!("⚠️ Tunnel failed to start: {e}"); println!(" Falling back to local-only mode."); } } } println!("🦀 ZeroClaw Gateway listening on http://{display_addr}"); if let Some(ref url) = tunnel_url { println!(" 🌐 Public URL: {url}"); } println!(" 🌐 Web Dashboard: http://{display_addr}/"); println!(" POST /pair — pair a new client (X-Pairing-Code header)"); println!(" POST /webhook — {{\"message\": \"your prompt\"}}"); println!(" POST /api/chat — {{\"message\": \"...\", \"context\": [...]}} (tools-enabled, OpenClaw compat)"); if whatsapp_channel.is_some() { println!(" GET /whatsapp — Meta webhook verification"); println!(" POST /whatsapp — WhatsApp message webhook"); } if linq_channel.is_some() { println!(" POST /linq — Linq message webhook (iMessage/RCS/SMS)"); } if config.channels_config.github.is_some() { println!(" POST /github — GitHub issue/PR comment webhook"); } if bluebubbles_channel.is_some() { println!(" POST /bluebubbles — BlueBubbles iMessage webhook"); } if wati_channel.is_some() { println!(" GET /wati — WATI webhook verification"); println!(" POST /wati — WATI message webhook"); } if nextcloud_talk_channel.is_some() { println!(" POST /nextcloud-talk — Nextcloud Talk bot webhook"); } if qq_webhook_enabled { println!(" POST /qq — QQ Bot webhook (validation + events)"); } if config.gateway.node_control.enabled { println!(" POST /api/node-control — experimental node-control RPC scaffold"); } println!(" POST /v1/chat/completions — OpenAI-compatible (full agent loop)"); println!(" GET /v1/models — list available models"); println!(" GET /api/* — REST API (bearer token required)"); println!(" GET /ws/chat — WebSocket agent chat"); println!(" GET /health — health check"); println!(" GET /metrics — Prometheus metrics"); if let Some(code) = pairing.pairing_code() { println!(); println!(" 🔐 PAIRING REQUIRED — use this one-time code:"); println!(" ┌──────────────┐"); println!(" │ {code} │"); println!(" └──────────────┘"); println!(" Send: POST /pair with header X-Pairing-Code: {code}"); } else if pairing.require_pairing() { println!(" 🔒 Pairing: ACTIVE (bearer token required)"); } else { println!(" ⚠️ Pairing: DISABLED (all requests accepted)"); } println!(" Press Ctrl+C to stop.\n"); crate::health::mark_component_ok("gateway"); // Fire gateway start hook if let Some(ref hooks) = hooks { hooks.fire_gateway_start(host, actual_port).await; } // Wrap observer with broadcast capability for SSE // Use cost-tracking observer when cost tracking is enabled let base_observer = crate::observability::create_observer_with_cost_tracking( &config.observability, cost_tracker.clone(), &config.cost, ); let broadcast_observer: Arc = Arc::new(sse::BroadcastObserver::new(base_observer, event_tx.clone())); let state = AppState { config: config_state, provider, model, temperature, mem, auto_save: config.memory.auto_save, webhook_secret_hash, pairing, trust_forwarded_headers: config.gateway.trust_forwarded_headers, rate_limiter, idempotency_store, whatsapp: whatsapp_channel, whatsapp_app_secret, linq: linq_channel, linq_signing_secret, bluebubbles: bluebubbles_channel, bluebubbles_webhook_secret, nextcloud_talk: nextcloud_talk_channel, nextcloud_talk_webhook_secret, wati: wati_channel, qq: qq_channel, qq_webhook_enabled, observer: broadcast_observer, tools_registry, tools_registry_exec, multimodal: multimodal_config, max_tool_iterations, cost_tracker, event_tx, }; // Config PUT needs larger body limit (1MB) let config_put_router = Router::new() .route("/api/config", put(api::handle_api_config_put)) .layer(RequestBodyLimitLayer::new(1_048_576)); // The OpenAI-compatible endpoints use a larger body limit (512KB) because // chat histories can be much bigger than the default 64KB webhook limit. // They get their own nested router with a separate body limit layer. // // NOTE: The /v1/chat/completions handler routes through the full agent loop // (run_gateway_chat_with_tools) via openclaw_compat, giving OpenClaw callers // tools + memory support. The original simple-chat handler is preserved in // openai_compat.rs for reference. let openai_compat_routes = Router::new() .route( "/v1/chat/completions", post(openclaw_compat::handle_v1_chat_completions_with_tools), ) .layer(RequestBodyLimitLayer::new( openai_compat::CHAT_COMPLETIONS_MAX_BODY_SIZE, )); // Build router with middleware let app = Router::new() // ── Existing routes ── .route("/health", get(handle_health)) .route("/metrics", get(handle_metrics)) .route("/pair", post(handle_pair)) .route("/webhook", get(handle_webhook_usage).post(handle_webhook)) .route("/whatsapp", get(handle_whatsapp_verify)) .route("/whatsapp", post(handle_whatsapp_message)) .route("/linq", post(handle_linq_webhook)) .route("/github", post(handle_github_webhook)) .route("/bluebubbles", post(handle_bluebubbles_webhook)) .route("/wati", get(handle_wati_verify)) .route("/wati", post(handle_wati_webhook)) .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook)) .route("/qq", post(handle_qq_webhook)) // ── OpenClaw migration: tools-enabled chat endpoint ── .route("/api/chat", post(openclaw_compat::handle_api_chat)) // ── OpenAI-compatible endpoints ── .route("/v1/models", get(openai_compat::handle_v1_models)) .merge(openai_compat_routes) // ── Web Dashboard API routes ── .route("/api/status", get(api::handle_api_status)) .route("/api/config", get(api::handle_api_config_get)) .route("/api/tools", get(api::handle_api_tools)) .route("/api/cron", get(api::handle_api_cron_list)) .route("/api/cron", post(api::handle_api_cron_add)) .route("/api/cron/{id}", delete(api::handle_api_cron_delete)) .route("/api/integrations", get(api::handle_api_integrations)) .route( "/api/doctor", get(api::handle_api_doctor).post(api::handle_api_doctor), ) .route("/api/memory", get(api::handle_api_memory_list)) .route("/api/memory", post(api::handle_api_memory_store)) .route("/api/memory/{key}", delete(api::handle_api_memory_delete)) .route("/api/pairing/devices", get(api::handle_api_pairing_devices)) .route( "/api/pairing/devices/{id}", delete(api::handle_api_pairing_device_revoke), ) .route("/api/cost", get(api::handle_api_cost)) .route("/api/cli-tools", get(api::handle_api_cli_tools)) .route("/api/health", get(api::handle_api_health)) .route("/api/node-control", post(handle_node_control)) // ── SSE event stream ── .route("/api/events", get(sse::handle_sse_events)) // ── WebSocket agent chat ── .route("/ws/chat", get(ws::handle_ws_chat)) // ── Static assets (web dashboard) ── .route("/_app/{*path}", get(static_files::handle_static)) // ── Config PUT with larger body limit ── .merge(config_put_router) .with_state(state) .layer(RequestBodyLimitLayer::new(MAX_BODY_SIZE)) .layer(TimeoutLayer::with_status_code( StatusCode::REQUEST_TIMEOUT, Duration::from_secs(REQUEST_TIMEOUT_SECS), )) // ── SPA fallback: non-API GET requests serve index.html ── .fallback(get(static_files::handle_spa_fallback)); // Run the server let serve_result = axum::serve( listener, app.into_make_service_with_connect_info::(), ) .await; if let Some(ref hooks) = hooks { hooks.fire_gateway_stop().await; } serve_result?; Ok(()) } // ══════════════════════════════════════════════════════════════════════════════ // AXUM HANDLERS // ══════════════════════════════════════════════════════════════════════════════ /// GET /health — always public (no secrets leaked) async fn handle_health(State(state): State) -> impl IntoResponse { let body = serde_json::json!({ "status": "ok", "paired": state.pairing.is_paired(), "require_pairing": state.pairing.require_pairing(), "runtime": crate::health::snapshot_json(), }); Json(body) } /// Prometheus content type for text exposition format. const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"; /// GET /metrics — Prometheus text exposition format async fn handle_metrics( State(state): State, ConnectInfo(peer_addr): ConnectInfo, headers: HeaderMap, ) -> impl IntoResponse { if state.pairing.require_pairing() { let auth = headers .get(header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .unwrap_or(""); let token = auth.strip_prefix("Bearer ").unwrap_or("").trim(); if !state.pairing.is_authenticated(token) { return ( StatusCode::UNAUTHORIZED, [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], String::from( "# unauthorized: provide Authorization: Bearer for /metrics\n", ), ); } } else if !peer_addr.ip().is_loopback() { return ( StatusCode::FORBIDDEN, [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], String::from( "# metrics disabled for non-loopback clients when pairing is not required\n", ), ); } let body = if let Some(prom) = state .observer .as_ref() .as_any() .downcast_ref::() { prom.encode() } else { String::from("# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n") }; ( StatusCode::OK, [(header::CONTENT_TYPE, PROMETHEUS_CONTENT_TYPE)], body, ) } /// POST /pair — exchange one-time code for bearer token #[axum::debug_handler] async fn handle_pair( State(state): State, ConnectInfo(peer_addr): ConnectInfo, headers: HeaderMap, ) -> impl IntoResponse { let rate_key = client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers); if !state.rate_limiter.allow_pair(&rate_key) { tracing::warn!("/pair rate limit exceeded"); let err = serde_json::json!({ "error": "Too many pairing requests. Please retry later.", "retry_after": RATE_LIMIT_WINDOW_SECS, }); return (StatusCode::TOO_MANY_REQUESTS, Json(err)); } let code = headers .get("X-Pairing-Code") .and_then(|v| v.to_str().ok()) .unwrap_or(""); match state.pairing.try_pair(code, &rate_key).await { Ok(Some(token)) => { tracing::info!("🔐 New client paired successfully"); if let Err(err) = persist_pairing_tokens(state.config.clone(), &state.pairing).await { tracing::error!("🔐 Pairing succeeded but token persistence failed: {err:#}"); let body = serde_json::json!({ "paired": true, "persisted": false, "token": token, "message": "Paired for this process, but failed to persist token to config.toml. Check config path and write permissions.", }); return (StatusCode::OK, Json(body)); } let body = serde_json::json!({ "paired": true, "persisted": true, "token": token, "message": "Save this token — use it as Authorization: Bearer " }); (StatusCode::OK, Json(body)) } Ok(None) => { tracing::warn!("🔐 Pairing attempt with invalid code"); let err = serde_json::json!({"error": "Invalid pairing code"}); (StatusCode::FORBIDDEN, Json(err)) } Err(lockout_secs) => { tracing::warn!( "🔐 Pairing locked out — too many failed attempts ({lockout_secs}s remaining)" ); let err = serde_json::json!({ "error": format!("Too many failed attempts. Try again in {lockout_secs}s."), "retry_after": lockout_secs }); (StatusCode::TOO_MANY_REQUESTS, Json(err)) } } } async fn persist_pairing_tokens(config: Arc>, pairing: &PairingGuard) -> Result<()> { let paired_tokens = pairing.tokens(); // This is needed because parking_lot's guard is not Send so we clone the inner // this should be removed once async mutexes are used everywhere let mut updated_cfg = { config.lock().clone() }; updated_cfg.gateway.paired_tokens = paired_tokens; updated_cfg .save() .await .context("Failed to persist paired tokens to config.toml")?; // Keep shared runtime config in sync with persisted tokens. *config.lock() = updated_cfg; Ok(()) } /// Simple chat for webhook endpoint (no tools, for backward compatibility and testing). async fn prepare_gateway_messages_for_provider( state: &AppState, message: &str, ) -> anyhow::Result> { let user_messages = vec![ChatMessage::user(message)]; // Keep webhook/gateway prompts aligned with channel behavior by injecting // workspace-aware system context before model invocation. let system_prompt = { let config_guard = state.config.lock(); crate::channels::build_system_prompt( &config_guard.workspace_dir, &state.model, &[], // tools - empty for simple chat &[], // skills Some(&config_guard.identity), None, // bootstrap_max_chars - use default ) }; let mut messages = Vec::with_capacity(1 + user_messages.len()); messages.push(ChatMessage::system(system_prompt)); messages.extend(user_messages); let multimodal_config = state.config.lock().multimodal.clone(); let prepared = crate::multimodal::prepare_messages_for_provider(&messages, &multimodal_config).await?; Ok(prepared.messages) } /// Simple chat for webhook endpoint (no tools, for backward compatibility and testing). async fn run_gateway_chat_simple(state: &AppState, message: &str) -> anyhow::Result { let prepared_messages = prepare_gateway_messages_for_provider(state, message).await?; state .provider .chat_with_history(&prepared_messages, &state.model, state.temperature) .await } /// Full-featured chat with tools for channel handlers (WhatsApp, Linq, Nextcloud Talk). pub(super) async fn run_gateway_chat_with_tools( state: &AppState, message: &str, session_id: Option<&str>, ) -> anyhow::Result { let config = state.config.lock().clone(); crate::agent::process_message_with_session(config, message, session_id).await } fn gateway_outbound_leak_guard_snapshot( state: &AppState, ) -> crate::config::OutboundLeakGuardConfig { state.config.lock().security.outbound_leak_guard.clone() } fn sanitize_gateway_response( response: &str, tools: &[Box], leak_guard: &crate::config::OutboundLeakGuardConfig, ) -> String { match crate::channels::sanitize_channel_response(response, tools, leak_guard) { crate::channels::ChannelSanitizationResult::Sanitized(sanitized) => { if sanitized.is_empty() && !response.trim().is_empty() { "I encountered malformed tool-call output and could not produce a safe reply. Please try again." .to_string() } else { sanitized } } crate::channels::ChannelSanitizationResult::Blocked { .. } => { "I blocked a draft response because it appeared to contain credential material. Please ask for a redacted summary." .to_string() } } } /// Webhook request body #[derive(serde::Deserialize)] pub struct WebhookBody { pub message: String, #[serde(default)] pub stream: Option, #[serde(default)] pub session_id: Option, } #[derive(Debug, Clone, serde::Deserialize)] pub struct NodeControlRequest { pub method: String, #[serde(default)] pub node_id: Option, #[serde(default)] pub capability: Option, #[serde(default)] pub arguments: serde_json::Value, } fn node_id_allowed(node_id: &str, allowed_node_ids: &[String]) -> bool { if allowed_node_ids.is_empty() { return true; } allowed_node_ids .iter() .any(|candidate| candidate == "*" || candidate == node_id) } /// POST /api/node-control — experimental node-control protocol scaffold. /// /// Supported methods: /// - `node.list` /// - `node.describe` /// - `node.invoke` (stubbed as not implemented) async fn handle_node_control( State(state): State, headers: HeaderMap, body: Result, axum::extract::rejection::JsonRejection>, ) -> impl IntoResponse { // ── Bearer auth (pairing) ── if state.pairing.require_pairing() { let auth = headers .get(header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .unwrap_or(""); let token = auth.strip_prefix("Bearer ").unwrap_or(""); if !state.pairing.is_authenticated(token) { let err = serde_json::json!({ "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer " }); return (StatusCode::UNAUTHORIZED, Json(err)); } } let Json(request) = match body { Ok(body) => body, Err(e) => { tracing::warn!("Node-control JSON parse error: {e}"); let err = serde_json::json!({ "error": "Invalid JSON body for node-control request" }); return (StatusCode::BAD_REQUEST, Json(err)); } }; let node_control = { state.config.lock().gateway.node_control.clone() }; if !node_control.enabled { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Node-control API is disabled"})), ); } // Optional second-factor shared token for node-control endpoints. if let Some(expected_token) = node_control .auth_token .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) { let provided_token = headers .get("X-Node-Control-Token") .and_then(|v| v.to_str().ok()) .map(str::trim) .unwrap_or(""); if !constant_time_eq(expected_token, provided_token) { return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid X-Node-Control-Token"})), ); } } let method = request.method.trim(); match method { "node.list" => { let nodes = node_control .allowed_node_ids .iter() .map(|node_id| { serde_json::json!({ "node_id": node_id, "status": "unpaired", "capabilities": [] }) }) .collect::>(); ( StatusCode::OK, Json(serde_json::json!({ "ok": true, "method": "node.list", "nodes": nodes })), ) } "node.describe" => { let Some(node_id) = request .node_id .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "node_id is required for node.describe"})), ); }; if !node_id_allowed(node_id, &node_control.allowed_node_ids) { return ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "node_id is not allowed"})), ); } ( StatusCode::OK, Json(serde_json::json!({ "ok": true, "method": "node.describe", "node_id": node_id, "description": { "status": "stub", "capabilities": [], "message": "Node descriptor scaffold is enabled; runtime backend is not wired yet." } })), ) } "node.invoke" => { let Some(node_id) = request .node_id .as_deref() .map(str::trim) .filter(|value| !value.is_empty()) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "node_id is required for node.invoke"})), ); }; if !node_id_allowed(node_id, &node_control.allowed_node_ids) { return ( StatusCode::FORBIDDEN, Json(serde_json::json!({"error": "node_id is not allowed"})), ); } ( StatusCode::NOT_IMPLEMENTED, Json(serde_json::json!({ "ok": false, "method": "node.invoke", "node_id": node_id, "capability": request.capability, "arguments": request.arguments, "error": "node.invoke backend is not implemented yet in this scaffold" })), ) } _ => ( StatusCode::BAD_REQUEST, Json(serde_json::json!({ "error": "Unsupported method", "supported_methods": ["node.list", "node.describe", "node.invoke"] })), ), } } /// POST /webhook — main webhook endpoint async fn handle_webhook_usage() -> impl IntoResponse { ( StatusCode::METHOD_NOT_ALLOWED, Json(serde_json::json!({ "error": "Use POST /webhook with a JSON body: {\"message\":\"...\"}", "method": "POST", "path": "/webhook", "example": { "message": "Hello from webhook" } })), ) } fn handle_webhook_streaming( state: AppState, prepared_messages: Vec, provider_label: String, model_label: String, started_at: Instant, ) -> Response { if !state.provider.supports_streaming() { let model_for_call = state.model.clone(); let provider_label_for_call = provider_label.clone(); let model_label_for_call = model_label.clone(); let state_for_call = state.clone(); let messages_for_call = prepared_messages.clone(); let stream = futures_util::stream::once(async move { match state_for_call .provider .chat_with_history( &messages_for_call, &model_for_call, state_for_call.temperature, ) .await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state_for_call); let safe_response = sanitize_gateway_response( &response, state_for_call.tools_registry_exec.as_ref(), &leak_guard_cfg, ); let duration = started_at.elapsed(); state_for_call.observer.record_event( &crate::observability::ObserverEvent::LlmResponse { provider: provider_label_for_call.clone(), model: model_label_for_call.clone(), duration, success: true, error_message: None, input_tokens: None, output_tokens: None, }, ); state_for_call.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state_for_call.observer.record_event( &crate::observability::ObserverEvent::AgentEnd { provider: provider_label_for_call, model: model_label_for_call, duration, tokens_used: None, cost_usd: None, }, ); let payload = serde_json::json!({"response": safe_response, "model": state_for_call.model}); let mut output = format!("data: {payload}\n\n"); output.push_str("data: [DONE]\n\n"); Ok::<_, std::io::Error>(Bytes::from(output)) } Err(e) => { let duration = started_at.elapsed(); let sanitized = providers::sanitize_api_error(&e.to_string()); state_for_call.observer.record_event( &crate::observability::ObserverEvent::LlmResponse { provider: provider_label_for_call.clone(), model: model_label_for_call.clone(), duration, success: false, error_message: Some(sanitized.clone()), input_tokens: None, output_tokens: None, }, ); state_for_call.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state_for_call.observer.record_event( &crate::observability::ObserverEvent::Error { component: "gateway".to_string(), message: sanitized.clone(), }, ); state_for_call.observer.record_event( &crate::observability::ObserverEvent::AgentEnd { provider: provider_label_for_call, model: model_label_for_call, duration, tokens_used: None, cost_usd: None, }, ); tracing::error!("Webhook provider error: {}", sanitized); let mut output = format!( "data: {}\n\n", serde_json::json!({"error": "LLM request failed"}) ); output.push_str("data: [DONE]\n\n"); Ok(Bytes::from(output)) } } }); return Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "text/event-stream") .header(header::CACHE_CONTROL, "no-cache") .header(header::CONNECTION, "keep-alive") .body(Body::from_stream(stream)) .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()); } let provider_stream = state.provider.stream_chat_with_history( &prepared_messages, &state.model, state.temperature, crate::providers::traits::StreamOptions::new(true), ); let state_for_stream = state.clone(); let provider_label_for_stream = provider_label.clone(); let model_label_for_stream = model_label.clone(); let mut stream_failed = false; let sse_stream = provider_stream.map(move |result| match result { Ok(chunk) if chunk.is_final => { if !stream_failed { let duration = started_at.elapsed(); state_for_stream.observer.record_event( &crate::observability::ObserverEvent::LlmResponse { provider: provider_label_for_stream.clone(), model: model_label_for_stream.clone(), duration, success: true, error_message: None, input_tokens: None, output_tokens: None, }, ); state_for_stream.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state_for_stream.observer.record_event( &crate::observability::ObserverEvent::AgentEnd { provider: provider_label_for_stream.clone(), model: model_label_for_stream.clone(), duration, tokens_used: None, cost_usd: None, }, ); } Ok::<_, std::io::Error>(Bytes::from("data: [DONE]\n\n")) } Ok(chunk) => { if chunk.delta.is_empty() { return Ok(Bytes::new()); } let payload = serde_json::json!({ "delta": chunk.delta, "model": model_label_for_stream }); Ok(Bytes::from(format!("data: {payload}\n\n"))) } Err(e) => { stream_failed = true; let duration = started_at.elapsed(); let sanitized = providers::sanitize_api_error(&e.to_string()); state_for_stream.observer.record_event( &crate::observability::ObserverEvent::LlmResponse { provider: provider_label_for_stream.clone(), model: model_label_for_stream.clone(), duration, success: false, error_message: Some(sanitized.clone()), input_tokens: None, output_tokens: None, }, ); state_for_stream.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state_for_stream .observer .record_event(&crate::observability::ObserverEvent::Error { component: "gateway".to_string(), message: sanitized.clone(), }); state_for_stream.observer.record_event( &crate::observability::ObserverEvent::AgentEnd { provider: provider_label_for_stream.clone(), model: model_label_for_stream.clone(), duration, tokens_used: None, cost_usd: None, }, ); tracing::error!("Webhook streaming provider error: {}", sanitized); let output = format!( "data: {}\n\ndata: [DONE]\n\n", serde_json::json!({"error": "LLM request failed"}) ); Ok(Bytes::from(output)) } }); Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "text/event-stream") .header(header::CACHE_CONTROL, "no-cache") .header(header::CONNECTION, "keep-alive") .body(Body::from_stream(sse_stream)) .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response()) } /// POST /webhook — main webhook endpoint async fn handle_webhook( State(state): State, ConnectInfo(peer_addr): ConnectInfo, headers: HeaderMap, body: Result, axum::extract::rejection::JsonRejection>, ) -> Response { let rate_key = client_key_from_request(Some(peer_addr), &headers, state.trust_forwarded_headers); if !state.rate_limiter.allow_webhook(&rate_key) { tracing::warn!("/webhook rate limit exceeded"); let err = serde_json::json!({ "error": "Too many webhook requests. Please retry later.", "retry_after": RATE_LIMIT_WINDOW_SECS, }); return (StatusCode::TOO_MANY_REQUESTS, Json(err)).into_response(); } // Require at least one auth layer for non-loopback traffic. if !state.pairing.require_pairing() && state.webhook_secret_hash.is_none() && !peer_addr.ip().is_loopback() { tracing::warn!( "Webhook: rejected unauthenticated non-loopback request (pairing disabled and no webhook secret configured)" ); let err = serde_json::json!({ "error": "Unauthorized — configure pairing or X-Webhook-Secret for non-local webhook access" }); return (StatusCode::UNAUTHORIZED, Json(err)).into_response(); } // ── Bearer token auth (pairing) ── if state.pairing.require_pairing() { let auth = headers .get(header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .unwrap_or(""); let token = auth.strip_prefix("Bearer ").unwrap_or(""); if !state.pairing.is_authenticated(token) { tracing::warn!("Webhook: rejected — not paired / invalid bearer token"); let err = serde_json::json!({ "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer " }); return (StatusCode::UNAUTHORIZED, Json(err)).into_response(); } } // ── Webhook secret auth (optional, additional layer) ── if let Some(ref secret_hash) = state.webhook_secret_hash { let header_hash = headers .get("X-Webhook-Secret") .and_then(|v| v.to_str().ok()) .map(str::trim) .filter(|value| !value.is_empty()) .map(hash_webhook_secret); match header_hash { Some(val) if constant_time_eq(&val, secret_hash.as_ref()) => {} _ => { tracing::warn!("Webhook: rejected request — invalid or missing X-Webhook-Secret"); let err = serde_json::json!({"error": "Unauthorized — invalid or missing X-Webhook-Secret header"}); return (StatusCode::UNAUTHORIZED, Json(err)).into_response(); } } } // ── Parse body ── let Json(webhook_body) = match body { Ok(b) => b, Err(e) => { tracing::warn!("Webhook JSON parse error: {e}"); let err = serde_json::json!({ "error": "Invalid JSON body. Expected: {\"message\": \"...\"}" }); return (StatusCode::BAD_REQUEST, Json(err)).into_response(); } }; // ── Idempotency (optional) ── if let Some(idempotency_key) = headers .get("X-Idempotency-Key") .and_then(|v| v.to_str().ok()) .map(str::trim) .filter(|value| !value.is_empty()) { if !state.idempotency_store.record_if_new(idempotency_key) { tracing::info!("Webhook duplicate ignored (idempotency key: {idempotency_key})"); let body = serde_json::json!({ "status": "duplicate", "idempotent": true, "message": "Request already processed for this idempotency key" }); return (StatusCode::OK, Json(body)).into_response(); } } let message = webhook_body.message.trim(); let webhook_session_id = webhook_body .session_id .as_deref() .map(str::trim) .filter(|value| !value.is_empty()); if message.is_empty() { let err = serde_json::json!({ "error": "The `message` field is required and must be a non-empty string." }); return (StatusCode::BAD_REQUEST, Json(err)).into_response(); } if state.auto_save { let key = webhook_memory_key(); let _ = state .mem .store( &key, message, MemoryCategory::Conversation, webhook_session_id, ) .await; } let provider_label = state .config .lock() .default_provider .clone() .unwrap_or_else(|| "unknown".to_string()); let model_label = state.model.clone(); let started_at = Instant::now(); state .observer .record_event(&crate::observability::ObserverEvent::AgentStart { provider: provider_label.clone(), model: model_label.clone(), }); state .observer .record_event(&crate::observability::ObserverEvent::LlmRequest { provider: provider_label.clone(), model: model_label.clone(), messages_count: 1, }); if webhook_body.stream.unwrap_or(false) { let prepared_messages = match prepare_gateway_messages_for_provider(&state, message).await { Ok(messages) => messages, Err(e) => { let duration = started_at.elapsed(); let sanitized = providers::sanitize_api_error(&e.to_string()); state .observer .record_event(&crate::observability::ObserverEvent::LlmResponse { provider: provider_label.clone(), model: model_label.clone(), duration, success: false, error_message: Some(sanitized.clone()), input_tokens: None, output_tokens: None, }); state.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state .observer .record_event(&crate::observability::ObserverEvent::Error { component: "gateway".to_string(), message: sanitized.clone(), }); state .observer .record_event(&crate::observability::ObserverEvent::AgentEnd { provider: provider_label, model: model_label, duration, tokens_used: None, cost_usd: None, }); tracing::error!("Webhook streaming setup failed: {}", sanitized); let err = serde_json::json!({"error": "LLM request failed"}); return (StatusCode::INTERNAL_SERVER_ERROR, Json(err)).into_response(); } }; return handle_webhook_streaming( state, prepared_messages, provider_label, model_label, started_at, ); } match run_gateway_chat_simple(&state, message).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); let duration = started_at.elapsed(); state .observer .record_event(&crate::observability::ObserverEvent::LlmResponse { provider: provider_label.clone(), model: model_label.clone(), duration, success: true, error_message: None, input_tokens: None, output_tokens: None, }); state.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state .observer .record_event(&crate::observability::ObserverEvent::AgentEnd { provider: provider_label, model: model_label, duration, tokens_used: None, cost_usd: None, }); let body = serde_json::json!({"response": safe_response, "model": state.model}); (StatusCode::OK, Json(body)).into_response() } Err(e) => { let duration = started_at.elapsed(); let sanitized = providers::sanitize_api_error(&e.to_string()); state .observer .record_event(&crate::observability::ObserverEvent::LlmResponse { provider: provider_label.clone(), model: model_label.clone(), duration, success: false, error_message: Some(sanitized.clone()), input_tokens: None, output_tokens: None, }); state.observer.record_metric( &crate::observability::traits::ObserverMetric::RequestLatency(duration), ); state .observer .record_event(&crate::observability::ObserverEvent::Error { component: "gateway".to_string(), message: sanitized.clone(), }); state .observer .record_event(&crate::observability::ObserverEvent::AgentEnd { provider: provider_label, model: model_label, duration, tokens_used: None, cost_usd: None, }); tracing::error!("Webhook provider error: {}", sanitized); let err = serde_json::json!({"error": "LLM request failed"}); (StatusCode::INTERNAL_SERVER_ERROR, Json(err)).into_response() } } } /// `WhatsApp` verification query params #[derive(serde::Deserialize)] pub struct WhatsAppVerifyQuery { #[serde(rename = "hub.mode")] pub mode: Option, #[serde(rename = "hub.verify_token")] pub verify_token: Option, #[serde(rename = "hub.challenge")] pub challenge: Option, } /// GET /whatsapp — Meta webhook verification async fn handle_whatsapp_verify( State(state): State, Query(params): Query, ) -> impl IntoResponse { let Some(ref wa) = state.whatsapp else { return (StatusCode::NOT_FOUND, "WhatsApp not configured".to_string()); }; // Verify the token matches (constant-time comparison to prevent timing attacks) let token_matches = params .verify_token .as_deref() .is_some_and(|t| constant_time_eq(t, wa.verify_token())); if params.mode.as_deref() == Some("subscribe") && token_matches { if let Some(ch) = params.challenge { tracing::info!("WhatsApp webhook verified successfully"); return (StatusCode::OK, ch); } return (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string()); } tracing::warn!("WhatsApp webhook verification failed — token mismatch"); (StatusCode::FORBIDDEN, "Forbidden".to_string()) } /// Verify `WhatsApp` webhook signature (`X-Hub-Signature-256`). /// Returns true if the signature is valid, false otherwise. /// See: pub fn verify_whatsapp_signature(app_secret: &str, body: &[u8], signature_header: &str) -> bool { use ring::hmac; // Signature format: "sha256=" let Some(hex_sig) = signature_header.strip_prefix("sha256=") else { return false; }; // Decode hex signature let Ok(expected) = hex::decode(hex_sig) else { return false; }; let key = hmac::Key::new(hmac::HMAC_SHA256, app_secret.as_bytes()); hmac::verify(&key, body, &expected).is_ok() } /// POST /whatsapp — incoming message webhook async fn handle_whatsapp_message( State(state): State, headers: HeaderMap, body: Bytes, ) -> impl IntoResponse { let Some(ref wa) = state.whatsapp else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "WhatsApp not configured"})), ); }; // ── Security: Verify X-Hub-Signature-256 if app_secret is configured ── if let Some(ref app_secret) = state.whatsapp_app_secret { let signature = headers .get("X-Hub-Signature-256") .and_then(|v| v.to_str().ok()) .unwrap_or(""); if !verify_whatsapp_signature(app_secret, &body, signature) { tracing::warn!( "WhatsApp webhook signature verification failed (signature: {})", if signature.is_empty() { "missing" } else { "invalid" } ); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid signature"})), ); } } // Parse JSON body let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; // Parse messages from the webhook payload let messages = wa.parse_webhook_payload(&payload); if messages.is_empty() { // Acknowledge the webhook even if no messages (could be status updates) return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } // Process each message for msg in &messages { tracing::info!( "WhatsApp message from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 50) ); let session_id = gateway_message_session_id(msg); // Auto-save to memory if state.auto_save { let key = whatsapp_memory_key(msg); let _ = state .mem .store( &key, &msg.content, MemoryCategory::Conversation, Some(&session_id), ) .await; } match run_gateway_chat_with_tools(&state, &msg.content, Some(&session_id)).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); // Send reply via WhatsApp if let Err(e) = wa .send(&SendMessage::new(safe_response, &msg.reply_target)) .await { tracing::error!("Failed to send WhatsApp reply: {e}"); } } Err(e) => { tracing::error!("LLM error for WhatsApp message: {e:#}"); let _ = wa .send(&SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, )) .await; } } } // Acknowledge the webhook (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } /// POST /linq — incoming message webhook (iMessage/RCS/SMS via Linq) async fn handle_linq_webhook( State(state): State, headers: HeaderMap, body: Bytes, ) -> impl IntoResponse { let Some(ref linq) = state.linq else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Linq not configured"})), ); }; let body_str = String::from_utf8_lossy(&body); // ── Security: Verify X-Webhook-Signature if signing_secret is configured ── if let Some(ref signing_secret) = state.linq_signing_secret { let timestamp = headers .get("X-Webhook-Timestamp") .and_then(|v| v.to_str().ok()) .unwrap_or(""); let signature = headers .get("X-Webhook-Signature") .and_then(|v| v.to_str().ok()) .unwrap_or(""); if !crate::channels::linq::verify_linq_signature( signing_secret, &body_str, timestamp, signature, ) { tracing::warn!( "Linq webhook signature verification failed (signature: {})", if signature.is_empty() { "missing" } else { "invalid" } ); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid signature"})), ); } } // Parse JSON body let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; // Parse messages from the webhook payload let messages = linq.parse_webhook_payload(&payload); if messages.is_empty() { if payload .get("event_type") .and_then(|v| v.as_str()) .is_some_and(|event| event == "message.received") { tracing::warn!( "Linq webhook message.received produced no actionable messages (possible unsupported payload shape)" ); } // Acknowledge the webhook even if no messages (could be status/delivery events) return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } // Process each message for msg in &messages { tracing::info!( "Linq message from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 50) ); let session_id = gateway_message_session_id(msg); // Auto-save to memory if state.auto_save { let key = linq_memory_key(msg); let _ = state .mem .store( &key, &msg.content, MemoryCategory::Conversation, Some(&session_id), ) .await; } // Call the LLM match run_gateway_chat_with_tools(&state, &msg.content, Some(&session_id)).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); // Send reply via Linq if let Err(e) = linq .send(&SendMessage::new(safe_response, &msg.reply_target)) .await { tracing::error!("Failed to send Linq reply: {e}"); } } Err(e) => { tracing::error!("LLM error for Linq message: {e:#}"); let _ = linq .send(&SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, )) .await; } } } // Acknowledge the webhook (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } /// POST /github — incoming GitHub webhook (issue/PR comments) #[allow(clippy::large_futures)] async fn handle_github_webhook( State(state): State, headers: HeaderMap, body: Bytes, ) -> impl IntoResponse { let github_cfg = { let guard = state.config.lock(); guard.channels_config.github.clone() }; let Some(github_cfg) = github_cfg else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "GitHub channel not configured"})), ); }; let access_token = std::env::var("ZEROCLAW_GITHUB_TOKEN") .ok() .map(|v| v.trim().to_string()) .filter(|v| !v.is_empty()) .unwrap_or_else(|| github_cfg.access_token.trim().to_string()); if access_token.is_empty() { tracing::error!( "GitHub webhook received but no access token is configured. \ Set channels_config.github.access_token or ZEROCLAW_GITHUB_TOKEN." ); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "GitHub access token is not configured"})), ); } let webhook_secret = std::env::var("ZEROCLAW_GITHUB_WEBHOOK_SECRET") .ok() .map(|v| v.trim().to_string()) .filter(|v| !v.is_empty()) .or_else(|| { github_cfg .webhook_secret .as_deref() .map(str::trim) .filter(|v| !v.is_empty()) .map(ToOwned::to_owned) }); let event_name = headers .get("X-GitHub-Event") .and_then(|v| v.to_str().ok()) .map(str::trim) .filter(|v| !v.is_empty()); let Some(event_name) = event_name else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Missing X-GitHub-Event header"})), ); }; if let Some(secret) = webhook_secret.as_deref() { let signature = headers .get("X-Hub-Signature-256") .and_then(|v| v.to_str().ok()) .unwrap_or(""); if !crate::channels::github::verify_github_signature(secret, &body, signature) { tracing::warn!( "GitHub webhook signature verification failed (signature: {})", if signature.is_empty() { "missing" } else { "invalid" } ); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid signature"})), ); } } if let Some(delivery_id) = headers .get("X-GitHub-Delivery") .and_then(|v| v.to_str().ok()) .map(str::trim) .filter(|v| !v.is_empty()) { let key = format!("github:{delivery_id}"); if !state.idempotency_store.record_if_new(&key) { tracing::info!("GitHub webhook duplicate ignored (delivery: {delivery_id})"); return ( StatusCode::OK, Json( serde_json::json!({"status":"duplicate","idempotent":true,"delivery_id":delivery_id}), ), ); } } let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; let github = GitHubChannel::new( access_token, github_cfg.api_base_url.clone(), github_cfg.allowed_repos.clone(), ); let messages = github.parse_webhook_payload(event_name, &payload); if messages.is_empty() { return ( StatusCode::OK, Json(serde_json::json!({"status": "ok", "handled": false})), ); } for msg in &messages { tracing::info!( "GitHub webhook message from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 80) ); if state.auto_save { let key = github_memory_key(msg); let _ = state .mem .store(&key, &msg.content, MemoryCategory::Conversation, None) .await; } match run_gateway_chat_with_tools(&state, &msg.content, None).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); if let Err(e) = github .send( &SendMessage::new(safe_response, &msg.reply_target) .in_thread(msg.thread_ts.clone()), ) .await { tracing::error!("Failed to send GitHub reply: {e}"); } } Err(e) => { tracing::error!("LLM error for GitHub webhook message: {e:#}"); let _ = github .send( &SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, ) .in_thread(msg.thread_ts.clone()), ) .await; } } } ( StatusCode::OK, Json(serde_json::json!({"status": "ok", "handled": true})), ) } /// POST /bluebubbles — incoming BlueBubbles iMessage webhook async fn handle_bluebubbles_webhook( State(state): State, headers: HeaderMap, body: Bytes, ) -> impl IntoResponse { let Some(ref bluebubbles) = state.bluebubbles else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "BlueBubbles not configured"})), ); }; // Verify Authorization: Bearer if configured if let Some(ref expected) = state.bluebubbles_webhook_secret { let provided = headers .get("Authorization") .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")); if !provided.is_some_and(|t| constant_time_eq(t, expected.as_ref())) { tracing::warn!("BlueBubbles webhook auth failed (missing or invalid Bearer token)"); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Unauthorized"})), ); } } let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; let messages = bluebubbles.parse_webhook_payload(&payload); if messages.is_empty() { return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } for msg in &messages { tracing::info!( "BlueBubbles iMessage from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 50) ); if state.auto_save { let key = bluebubbles_memory_key(msg); let _ = state .mem .store(&key, &msg.content, MemoryCategory::Conversation, None) .await; } let _ = bluebubbles.start_typing(&msg.reply_target).await; let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); match run_gateway_chat_with_tools(&state, &msg.content, None).await { Ok(response) => { let _ = bluebubbles.stop_typing(&msg.reply_target).await; let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); if let Err(e) = bluebubbles .send(&SendMessage::new(safe_response, &msg.reply_target)) .await { tracing::error!("Failed to send BlueBubbles reply: {e}"); } } Err(e) => { let _ = bluebubbles.stop_typing(&msg.reply_target).await; tracing::error!("LLM error for BlueBubbles message: {e:#}"); let _ = bluebubbles .send(&SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, )) .await; } } } (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } /// GET /wati — WATI webhook verification (echoes hub.challenge) async fn handle_wati_verify( State(state): State, Query(params): Query, ) -> impl IntoResponse { if state.wati.is_none() { return (StatusCode::NOT_FOUND, "WATI not configured".to_string()); } // WATI may use Meta-style webhook verification; echo the challenge if let Some(challenge) = params.challenge { tracing::info!("WATI webhook verified successfully"); return (StatusCode::OK, challenge); } (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string()) } #[derive(Debug, serde::Deserialize)] pub struct WatiVerifyQuery { #[serde(rename = "hub.challenge")] pub challenge: Option, } /// POST /wati — incoming WATI WhatsApp message webhook async fn handle_wati_webhook(State(state): State, body: Bytes) -> impl IntoResponse { let Some(ref wati) = state.wati else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "WATI not configured"})), ); }; // Parse JSON body let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; // Parse messages from the webhook payload let messages = wati.parse_webhook_payload(&payload); if messages.is_empty() { return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } // Process each message for msg in &messages { tracing::info!( "WATI message from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 50) ); let session_id = gateway_message_session_id(msg); // Auto-save to memory if state.auto_save { let key = wati_memory_key(msg); let _ = state .mem .store( &key, &msg.content, MemoryCategory::Conversation, Some(&session_id), ) .await; } // Call the LLM match run_gateway_chat_with_tools(&state, &msg.content, Some(&session_id)).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); // Send reply via WATI if let Err(e) = wati .send(&SendMessage::new(safe_response, &msg.reply_target)) .await { tracing::error!("Failed to send WATI reply: {e}"); } } Err(e) => { tracing::error!("LLM error for WATI message: {e:#}"); let _ = wati .send(&SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, )) .await; } } } // Acknowledge the webhook (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } /// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API) async fn handle_nextcloud_talk_webhook( State(state): State, headers: HeaderMap, body: Bytes, ) -> impl IntoResponse { let Some(ref nextcloud_talk) = state.nextcloud_talk else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Nextcloud Talk not configured"})), ); }; let body_str = String::from_utf8_lossy(&body); // ── Security: Verify Nextcloud Talk HMAC signature if secret is configured ── if let Some(ref webhook_secret) = state.nextcloud_talk_webhook_secret { let random = headers .get("X-Nextcloud-Talk-Random") .and_then(|v| v.to_str().ok()) .unwrap_or(""); let signature = headers .get("X-Nextcloud-Talk-Signature") .and_then(|v| v.to_str().ok()) .unwrap_or(""); if !crate::channels::nextcloud_talk::verify_nextcloud_talk_signature( webhook_secret, random, &body_str, signature, ) { tracing::warn!( "Nextcloud Talk webhook signature verification failed (signature: {})", if signature.is_empty() { "missing" } else { "invalid" } ); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid signature"})), ); } } // Parse JSON body let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; // Parse messages from webhook payload let messages = nextcloud_talk.parse_webhook_payload(&payload); if messages.is_empty() { // Acknowledge webhook even if payload does not contain actionable user messages. return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } for msg in &messages { tracing::info!( "Nextcloud Talk message from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 50) ); let session_id = gateway_message_session_id(msg); if state.auto_save { let key = nextcloud_talk_memory_key(msg); let _ = state .mem .store( &key, &msg.content, MemoryCategory::Conversation, Some(&session_id), ) .await; } match run_gateway_chat_with_tools(&state, &msg.content, Some(&session_id)).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); if let Err(e) = nextcloud_talk .send(&SendMessage::new(safe_response, &msg.reply_target)) .await { tracing::error!("Failed to send Nextcloud Talk reply: {e}"); } } Err(e) => { tracing::error!("LLM error for Nextcloud Talk message: {e:#}"); let _ = nextcloud_talk .send(&SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, )) .await; } } } (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } /// POST /qq — incoming QQ Bot webhook (validation + events) async fn handle_qq_webhook( State(state): State, headers: HeaderMap, body: Bytes, ) -> impl IntoResponse { let Some(ref qq) = state.qq else { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "QQ not configured"})), ); }; if !state.qq_webhook_enabled { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "QQ webhook mode not enabled"})), ); } let app_id_header = headers .get("X-Bot-Appid") .and_then(|v| v.to_str().ok()) .map(str::trim) .unwrap_or(""); if !app_id_header.is_empty() && !constant_time_eq(app_id_header, qq.app_id()) { tracing::warn!("QQ webhook rejected due to mismatched X-Bot-Appid"); return ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Invalid X-Bot-Appid"})), ); } let Ok(payload) = serde_json::from_slice::(&body) else { return ( StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "Invalid JSON payload"})), ); }; if let Some(validation_response) = qq.build_webhook_validation_response(&payload) { tracing::info!("QQ webhook validation challenge accepted"); return (StatusCode::OK, Json(validation_response)); } let messages = qq.parse_webhook_payload(&payload).await; if messages.is_empty() { return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); } for msg in &messages { tracing::info!( "QQ webhook message from {}: {}", msg.sender, truncate_with_ellipsis(&msg.content, 50) ); let session_id = gateway_message_session_id(msg); if state.auto_save { let key = qq_memory_key(msg); let _ = state .mem .store( &key, &msg.content, MemoryCategory::Conversation, Some(&session_id), ) .await; } match run_gateway_chat_with_tools(&state, &msg.content, Some(&session_id)).await { Ok(response) => { let leak_guard_cfg = gateway_outbound_leak_guard_snapshot(&state); let safe_response = sanitize_gateway_response( &response, state.tools_registry_exec.as_ref(), &leak_guard_cfg, ); if let Err(e) = qq .send( &SendMessage::new(safe_response, &msg.reply_target) .in_thread(msg.thread_ts.clone()), ) .await { tracing::error!("Failed to send QQ reply: {e}"); } } Err(e) => { tracing::error!("LLM error for QQ webhook message: {e:#}"); let _ = qq .send( &SendMessage::new( "Sorry, I couldn't process your message right now.", &msg.reply_target, ) .in_thread(msg.thread_ts.clone()), ) .await; } } } (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } #[cfg(test)] mod tests { use super::*; use crate::channels::traits::ChannelMessage; use crate::memory::{Memory, MemoryCategory, MemoryEntry}; use crate::providers::Provider; use async_trait::async_trait; use axum::http::HeaderValue; use axum::response::IntoResponse; use http_body_util::BodyExt; use parking_lot::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; /// Generate a random hex secret at runtime to avoid hard-coded cryptographic values. fn generate_test_secret() -> String { let bytes: [u8; 32] = rand::random(); hex::encode(bytes) } #[test] fn security_body_limit_is_64kb() { assert_eq!(MAX_BODY_SIZE, 65_536); } #[test] fn security_timeout_is_30_seconds() { assert_eq!(REQUEST_TIMEOUT_SECS, 30); } #[test] fn webhook_body_requires_message_field() { let valid = r#"{"message": "hello"}"#; let parsed: Result = serde_json::from_str(valid); assert!(parsed.is_ok()); let parsed = parsed.unwrap(); assert_eq!(parsed.message, "hello"); assert_eq!(parsed.stream, None); let stream_enabled = r#"{"message": "hello", "stream": true}"#; let parsed: Result = serde_json::from_str(stream_enabled); assert!(parsed.is_ok()); assert_eq!(parsed.unwrap().stream, Some(true)); let missing = r#"{"other": "field"}"#; let parsed: Result = serde_json::from_str(missing); assert!(parsed.is_err()); } #[tokio::test] async fn webhook_get_usage_returns_explicit_method_hint() { let response = handle_webhook_usage().await.into_response(); assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); let payload = response.into_body().collect().await.unwrap().to_bytes(); let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); assert_eq!(parsed["method"], "POST"); assert_eq!(parsed["path"], "/webhook"); assert_eq!(parsed["example"]["message"], "Hello from webhook"); } #[test] fn whatsapp_query_fields_are_optional() { let q = WhatsAppVerifyQuery { mode: None, verify_token: None, challenge: None, }; assert!(q.mode.is_none()); } #[test] fn node_id_allowed_with_empty_allowlist_accepts_any() { assert!(node_id_allowed("node-a", &[])); } #[test] fn node_id_allowed_respects_allowlist() { let allow = vec!["node-1".to_string(), "node-2".to_string()]; assert!(node_id_allowed("node-1", &allow)); assert!(!node_id_allowed("node-9", &allow)); } #[test] fn app_state_is_clone() { fn assert_clone() {} assert_clone::(); } #[tokio::test] async fn metrics_endpoint_returns_hint_when_prometheus_is_disabled() { let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider: Arc::new(MockProvider::default()), model: "test-model".into(), temperature: 0.0, mem: Arc::new(MockMemory), auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_metrics(State(state), test_connect_info(), HeaderMap::new()) .await .into_response(); assert_eq!(response.status(), StatusCode::OK); assert_eq!( response .headers() .get(header::CONTENT_TYPE) .and_then(|value| value.to_str().ok()), Some(PROMETHEUS_CONTENT_TYPE) ); let body = response.into_body().collect().await.unwrap().to_bytes(); let text = String::from_utf8(body.to_vec()).unwrap(); assert!(text.contains("Prometheus backend not enabled")); } #[tokio::test] async fn metrics_endpoint_renders_prometheus_output() { let prom = Arc::new( crate::observability::PrometheusObserver::new() .expect("prometheus observer should initialize in tests"), ); crate::observability::Observer::record_event( prom.as_ref(), &crate::observability::ObserverEvent::HeartbeatTick, ); let observer: Arc = prom; let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider: Arc::new(MockProvider::default()), model: "test-model".into(), temperature: 0.0, mem: Arc::new(MockMemory), auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer, tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_metrics(State(state), test_connect_info(), HeaderMap::new()) .await .into_response(); assert_eq!(response.status(), StatusCode::OK); let body = response.into_body().collect().await.unwrap().to_bytes(); let text = String::from_utf8(body.to_vec()).unwrap(); assert!(text.contains("zeroclaw_heartbeat_ticks_total 1")); } #[tokio::test] async fn metrics_endpoint_rejects_public_clients_when_pairing_is_disabled() { let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider: Arc::new(MockProvider::default()), model: "test-model".into(), temperature: 0.0, mem: Arc::new(MockMemory), auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_metrics(State(state), test_public_connect_info(), HeaderMap::new()) .await .into_response(); assert_eq!(response.status(), StatusCode::FORBIDDEN); let body = response.into_body().collect().await.unwrap().to_bytes(); let text = String::from_utf8(body.to_vec()).unwrap(); assert!(text.contains("non-loopback")); } #[tokio::test] async fn metrics_endpoint_requires_bearer_token_when_pairing_is_enabled() { let paired_token = "zc_test_token".to_string(); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider: Arc::new(MockProvider::default()), model: "test-model".into(), temperature: 0.0, mem: Arc::new(MockMemory), auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(true, std::slice::from_ref(&paired_token))), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let unauthorized = handle_metrics(State(state.clone()), test_connect_info(), HeaderMap::new()) .await .into_response(); assert_eq!(unauthorized.status(), StatusCode::UNAUTHORIZED); let mut headers = HeaderMap::new(); headers.insert( header::AUTHORIZATION, HeaderValue::from_str(&format!("Bearer {paired_token}")).unwrap(), ); let authorized = handle_metrics(State(state), test_connect_info(), headers) .await .into_response(); assert_eq!(authorized.status(), StatusCode::OK); } #[test] fn gateway_rate_limiter_blocks_after_limit() { let limiter = GatewayRateLimiter::new(2, 2, 100); assert!(limiter.allow_pair("127.0.0.1")); assert!(limiter.allow_pair("127.0.0.1")); assert!(!limiter.allow_pair("127.0.0.1")); } #[test] fn rate_limiter_sweep_removes_stale_entries() { let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 100); // Add entries for multiple IPs assert!(limiter.allow("ip-1")); assert!(limiter.allow("ip-2")); assert!(limiter.allow("ip-3")); { let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 3); } // Force a sweep by backdating last_sweep { let mut guard = limiter.requests.lock(); guard.1 = Instant::now() .checked_sub(Duration::from_secs(RATE_LIMITER_SWEEP_INTERVAL_SECS + 1)) .unwrap(); // Clear timestamps for ip-2 and ip-3 to simulate stale entries guard.0.get_mut("ip-2").unwrap().clear(); guard.0.get_mut("ip-3").unwrap().clear(); } // Next allow() call should trigger sweep and remove stale entries assert!(limiter.allow("ip-1")); { let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 1, "Stale entries should have been swept"); assert!(guard.0.contains_key("ip-1")); } } #[test] fn rate_limiter_zero_limit_always_allows() { let limiter = SlidingWindowRateLimiter::new(0, Duration::from_secs(60), 10); for _ in 0..100 { assert!(limiter.allow("any-key")); } } #[test] fn idempotency_store_rejects_duplicate_key() { let store = IdempotencyStore::new(Duration::from_secs(30), 10); assert!(store.record_if_new("req-1")); assert!(!store.record_if_new("req-1")); assert!(store.record_if_new("req-2")); } #[test] fn rate_limiter_bounded_cardinality_evicts_oldest_key() { let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 2); assert!(limiter.allow("ip-1")); assert!(limiter.allow("ip-2")); assert!(limiter.allow("ip-3")); let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 2); assert!(guard.0.contains_key("ip-2")); assert!(guard.0.contains_key("ip-3")); } #[test] fn idempotency_store_bounded_cardinality_evicts_oldest_key() { let store = IdempotencyStore::new(Duration::from_secs(300), 2); assert!(store.record_if_new("k1")); std::thread::sleep(Duration::from_millis(2)); assert!(store.record_if_new("k2")); std::thread::sleep(Duration::from_millis(2)); assert!(store.record_if_new("k3")); let keys = store.keys.lock(); assert_eq!(keys.len(), 2); assert!(!keys.contains_key("k1")); assert!(keys.contains_key("k2")); assert!(keys.contains_key("k3")); } #[test] fn client_key_defaults_to_peer_addr_when_untrusted_proxy_mode() { let peer = SocketAddr::from(([10, 0, 0, 5], 42617)); let mut headers = HeaderMap::new(); headers.insert( "X-Forwarded-For", HeaderValue::from_static("198.51.100.10, 203.0.113.11"), ); let key = client_key_from_request(Some(peer), &headers, false); assert_eq!(key, "10.0.0.5"); } #[test] fn client_key_uses_forwarded_ip_only_in_trusted_proxy_mode() { let peer = SocketAddr::from(([10, 0, 0, 5], 42617)); let mut headers = HeaderMap::new(); headers.insert( "X-Forwarded-For", HeaderValue::from_static("198.51.100.10, 203.0.113.11"), ); let key = client_key_from_request(Some(peer), &headers, true); assert_eq!(key, "198.51.100.10"); } #[test] fn client_key_falls_back_to_peer_when_forwarded_header_invalid() { let peer = SocketAddr::from(([10, 0, 0, 5], 42617)); let mut headers = HeaderMap::new(); headers.insert("X-Forwarded-For", HeaderValue::from_static("garbage-value")); let key = client_key_from_request(Some(peer), &headers, true); assert_eq!(key, "10.0.0.5"); } #[test] fn normalize_max_keys_uses_fallback_for_zero() { assert_eq!(normalize_max_keys(0, 10_000), 10_000); assert_eq!(normalize_max_keys(0, 0), 1); } #[test] fn normalize_max_keys_preserves_nonzero_values() { assert_eq!(normalize_max_keys(2_048, 10_000), 2_048); assert_eq!(normalize_max_keys(1, 10_000), 1); } #[tokio::test] async fn persist_pairing_tokens_writes_config_tokens() { let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("config.toml"); let workspace_path = temp.path().join("workspace"); let mut config = Config::default(); config.config_path = config_path.clone(); config.workspace_dir = workspace_path; config.save().await.unwrap(); let guard = PairingGuard::new(true, &[]); let code = guard.pairing_code().unwrap(); let token = guard.try_pair(&code, "test_client").await.unwrap().unwrap(); assert!(guard.is_authenticated(&token)); let shared_config = Arc::new(Mutex::new(config)); persist_pairing_tokens(shared_config.clone(), &guard) .await .unwrap(); let saved = tokio::fs::read_to_string(config_path).await.unwrap(); let parsed: Config = toml::from_str(&saved).unwrap(); assert_eq!(parsed.gateway.paired_tokens.len(), 1); let persisted = &parsed.gateway.paired_tokens[0]; assert!(crate::security::SecretStore::is_encrypted(persisted)); let store = crate::security::SecretStore::new(temp.path(), true); let decrypted = store.decrypt(persisted).unwrap(); assert_eq!(decrypted.len(), 64); assert!(decrypted.chars().all(|c| c.is_ascii_hexdigit())); let in_memory = shared_config.lock(); assert_eq!(in_memory.gateway.paired_tokens.len(), 1); assert_eq!(&in_memory.gateway.paired_tokens[0], &decrypted); } #[test] fn webhook_memory_key_is_unique() { let key1 = webhook_memory_key(); let key2 = webhook_memory_key(); assert!(key1.starts_with("webhook_msg_")); assert!(key2.starts_with("webhook_msg_")); assert_ne!(key1, key2); } #[test] fn whatsapp_memory_key_includes_sender_and_message_id() { let msg = ChannelMessage { id: "wamid-123".into(), sender: "+1234567890".into(), reply_target: "+1234567890".into(), content: "hello".into(), channel: "whatsapp".into(), timestamp: 1, thread_ts: None, }; let key = whatsapp_memory_key(&msg); assert_eq!(key, "whatsapp_+1234567890_wamid-123"); } #[test] fn qq_memory_key_includes_sender_and_message_id() { let msg = ChannelMessage { id: "msg-123".into(), sender: "user_openid".into(), reply_target: "user:user_openid".into(), content: "hello".into(), channel: "qq".into(), timestamp: 1, thread_ts: Some("msg-123".into()), }; let key = qq_memory_key(&msg); assert_eq!(key, "qq_user_openid_msg-123"); } struct MockScheduleTool; #[async_trait] impl Tool for MockScheduleTool { fn name(&self) -> &str { "schedule" } fn description(&self) -> &str { "Mock schedule tool" } fn parameters_schema(&self) -> serde_json::Value { serde_json::json!({ "type": "object", "properties": { "action": { "type": "string" } }, "required": ["action"] }) } async fn execute( &self, _args: serde_json::Value, ) -> anyhow::Result { Ok(crate::tools::ToolResult { success: true, output: "ok".to_string(), error: None, }) } } #[test] fn sanitize_gateway_response_removes_tool_call_tags() { let input = r#"Before {"name":"schedule","arguments":{"action":"create"}} After"#; let leak_guard = crate::config::OutboundLeakGuardConfig::default(); let result = sanitize_gateway_response(input, &[], &leak_guard); let normalized = result .lines() .filter(|line| !line.trim().is_empty()) .collect::>() .join("\n"); assert_eq!(normalized, "Before\nAfter"); assert!(!result.contains("")); assert!(!result.contains("\"name\":\"schedule\"")); } #[test] fn sanitize_gateway_response_removes_isolated_tool_json_artifacts() { let tools: Vec> = vec![Box::new(MockScheduleTool)]; let input = r#"{"name":"schedule","parameters":{"action":"create"}} {"result":{"status":"scheduled"}} Reminder set successfully."#; let leak_guard = crate::config::OutboundLeakGuardConfig::default(); let result = sanitize_gateway_response(input, &tools, &leak_guard); assert_eq!(result, "Reminder set successfully."); assert!(!result.contains("\"name\":\"schedule\"")); assert!(!result.contains("\"result\"")); } #[test] fn sanitize_gateway_response_blocks_detected_credentials_when_configured() { let tools: Vec> = Vec::new(); let leak_guard = crate::config::OutboundLeakGuardConfig { enabled: true, action: crate::config::OutboundLeakGuardAction::Block, sensitivity: 0.7, }; let result = sanitize_gateway_response("Temporary key: AKIAABCDEFGHIJKLMNOP", &tools, &leak_guard); assert!(result.contains("blocked a draft response")); } #[derive(Default)] struct MockMemory; #[async_trait] impl Memory for MockMemory { fn name(&self) -> &str { "mock" } async fn store( &self, _key: &str, _content: &str, _category: MemoryCategory, _session_id: Option<&str>, ) -> anyhow::Result<()> { Ok(()) } async fn recall( &self, _query: &str, _limit: usize, _session_id: Option<&str>, ) -> anyhow::Result> { Ok(Vec::new()) } async fn get(&self, _key: &str) -> anyhow::Result> { Ok(None) } async fn list( &self, _category: Option<&MemoryCategory>, _session_id: Option<&str>, ) -> anyhow::Result> { Ok(Vec::new()) } async fn forget(&self, _key: &str) -> anyhow::Result { Ok(false) } async fn count(&self) -> anyhow::Result { Ok(0) } async fn health_check(&self) -> bool { true } } #[derive(Default)] struct MockProvider { calls: AtomicUsize, } #[async_trait] impl Provider for MockProvider { async fn chat_with_system( &self, _system_prompt: Option<&str>, _message: &str, _model: &str, _temperature: f64, ) -> anyhow::Result { self.calls.fetch_add(1, Ordering::SeqCst); Ok("ok".into()) } } #[derive(Default)] struct TrackingMemory { keys: Mutex>, } #[async_trait] impl Memory for TrackingMemory { fn name(&self) -> &str { "tracking" } async fn store( &self, key: &str, _content: &str, _category: MemoryCategory, _session_id: Option<&str>, ) -> anyhow::Result<()> { self.keys.lock().push(key.to_string()); Ok(()) } async fn recall( &self, _query: &str, _limit: usize, _session_id: Option<&str>, ) -> anyhow::Result> { Ok(Vec::new()) } async fn get(&self, _key: &str) -> anyhow::Result> { Ok(None) } async fn list( &self, _category: Option<&MemoryCategory>, _session_id: Option<&str>, ) -> anyhow::Result> { Ok(Vec::new()) } async fn forget(&self, _key: &str) -> anyhow::Result { Ok(false) } async fn count(&self) -> anyhow::Result { let size = self.keys.lock().len(); Ok(size) } async fn health_check(&self) -> bool { true } } fn test_connect_info() -> ConnectInfo { ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 30_300))) } fn test_public_connect_info() -> ConnectInfo { ConnectInfo(SocketAddr::from(([203, 0, 113, 10], 30_300))) } #[tokio::test] async fn webhook_idempotency_skips_duplicate_provider_calls() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let mut headers = HeaderMap::new(); headers.insert("X-Idempotency-Key", HeaderValue::from_static("abc-123")); let body = Ok(Json(WebhookBody { message: "hello".into(), stream: None, session_id: None, })); let first = handle_webhook( State(state.clone()), test_connect_info(), headers.clone(), body, ) .await .into_response(); assert_eq!(first.status(), StatusCode::OK); let body = Ok(Json(WebhookBody { message: "hello".into(), stream: None, session_id: None, })); let second = handle_webhook(State(state), test_connect_info(), headers, body) .await .into_response(); assert_eq!(second.status(), StatusCode::OK); let payload = second.into_body().collect().await.unwrap().to_bytes(); let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); assert_eq!(parsed["status"], "duplicate"); assert_eq!(parsed["idempotent"], true); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1); } #[tokio::test] async fn webhook_rejects_public_traffic_without_auth_layers() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl; let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_webhook( State(state), test_public_connect_info(), HeaderMap::new(), Ok(Json(WebhookBody { message: "hello".into(), stream: None, session_id: None, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::UNAUTHORIZED); } #[tokio::test] async fn webhook_rejects_empty_message() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_webhook( State(state), test_connect_info(), HeaderMap::new(), Ok(Json(WebhookBody { message: " ".into(), stream: None, session_id: None, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::BAD_REQUEST); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } #[tokio::test] async fn webhook_stream_response_uses_sse_content_type() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_webhook( State(state), test_connect_info(), HeaderMap::new(), Ok(Json(WebhookBody { message: "stream me".into(), stream: Some(true), session_id: None, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::OK); let content_type = response .headers() .get(header::CONTENT_TYPE) .and_then(|v| v.to_str().ok()) .unwrap_or(""); assert!(content_type.starts_with("text/event-stream")); let payload = response.into_body().collect().await.unwrap().to_bytes(); let text = String::from_utf8_lossy(&payload); assert!(text.contains("data: [DONE]")); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1); } #[tokio::test] async fn node_control_returns_not_found_when_disabled() { let provider: Arc = Arc::new(MockProvider::default()); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_node_control( State(state), HeaderMap::new(), Ok(Json(NodeControlRequest { method: "node.list".into(), node_id: None, capability: None, arguments: serde_json::Value::Null, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn node_control_list_returns_stub_nodes_when_enabled() { let provider: Arc = Arc::new(MockProvider::default()); let memory: Arc = Arc::new(MockMemory); let mut config = Config::default(); config.gateway.node_control.enabled = true; config.gateway.node_control.allowed_node_ids = vec!["node-1".into(), "node-2".into()]; let state = AppState { config: Arc::new(Mutex::new(config)), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_node_control( State(state), HeaderMap::new(), Ok(Json(NodeControlRequest { method: "node.list".into(), node_id: None, capability: None, arguments: serde_json::Value::Null, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::OK); let payload = response.into_body().collect().await.unwrap().to_bytes(); let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); assert_eq!(parsed["ok"], true); assert_eq!(parsed["method"], "node.list"); assert_eq!(parsed["nodes"].as_array().map(|v| v.len()), Some(2)); } #[tokio::test] async fn webhook_autosave_stores_distinct_keys_per_request() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let tracking_impl = Arc::new(TrackingMemory::default()); let memory: Arc = tracking_impl.clone(); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: true, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let headers = HeaderMap::new(); let body1 = Ok(Json(WebhookBody { message: "hello one".into(), stream: None, session_id: None, })); let first = handle_webhook( State(state.clone()), test_connect_info(), headers.clone(), body1, ) .await .into_response(); assert_eq!(first.status(), StatusCode::OK); let body2 = Ok(Json(WebhookBody { message: "hello two".into(), stream: None, session_id: None, })); let second = handle_webhook(State(state), test_connect_info(), headers, body2) .await .into_response(); assert_eq!(second.status(), StatusCode::OK); let keys = tracking_impl.keys.lock().clone(); assert_eq!(keys.len(), 2); assert_ne!(keys[0], keys[1]); assert!(keys[0].starts_with("webhook_msg_")); assert!(keys[1].starts_with("webhook_msg_")); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 2); } #[test] fn webhook_secret_hash_is_deterministic_and_nonempty() { let secret_a = generate_test_secret(); let secret_b = generate_test_secret(); let one = hash_webhook_secret(&secret_a); let two = hash_webhook_secret(&secret_a); let other = hash_webhook_secret(&secret_b); assert_eq!(one, two); assert_ne!(one, other); assert_eq!(one.len(), 64); } #[tokio::test] async fn webhook_secret_hash_rejects_missing_header() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let secret = generate_test_secret(); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))), pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_webhook( State(state), test_connect_info(), HeaderMap::new(), Ok(Json(WebhookBody { message: "hello".into(), stream: None, session_id: None, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::UNAUTHORIZED); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } #[tokio::test] async fn webhook_secret_hash_rejects_invalid_header() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let valid_secret = generate_test_secret(); let wrong_secret = generate_test_secret(); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&valid_secret))), pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let mut headers = HeaderMap::new(); headers.insert( "X-Webhook-Secret", HeaderValue::from_str(&wrong_secret).unwrap(), ); let response = handle_webhook( State(state), test_connect_info(), headers, Ok(Json(WebhookBody { message: "hello".into(), stream: None, session_id: None, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::UNAUTHORIZED); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } #[tokio::test] async fn webhook_secret_hash_accepts_valid_header() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let secret = generate_test_secret(); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: Some(Arc::from(hash_webhook_secret(&secret))), pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let mut headers = HeaderMap::new(); headers.insert("X-Webhook-Secret", HeaderValue::from_str(&secret).unwrap()); let response = handle_webhook( State(state), test_connect_info(), headers, Ok(Json(WebhookBody { message: "hello".into(), stream: None, session_id: None, })), ) .await .into_response(); assert_eq!(response.status(), StatusCode::OK); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1); } fn compute_nextcloud_signature_hex(secret: &str, random: &str, body: &str) -> String { use hmac::{Hmac, Mac}; use sha2::Sha256; let payload = format!("{random}{body}"); let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); mac.update(payload.as_bytes()); hex::encode(mac.finalize().into_bytes()) } fn compute_github_signature_header(secret: &str, body: &str) -> String { use hmac::{Hmac, Mac}; use sha2::Sha256; let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); mac.update(body.as_bytes()); format!("sha256={}", hex::encode(mac.finalize().into_bytes())) } #[tokio::test] async fn github_webhook_returns_not_found_when_not_configured() { let provider: Arc = Arc::new(MockProvider::default()); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_github_webhook( State(state), HeaderMap::new(), Bytes::from_static(br#"{"action":"created"}"#), ) .await .into_response(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn github_webhook_rejects_invalid_signature() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let mut config = Config::default(); config.channels_config.github = Some(crate::config::schema::GitHubConfig { access_token: "ghp_test_token".into(), webhook_secret: Some("github-secret".into()), api_base_url: None, allowed_repos: vec!["zeroclaw-labs/zeroclaw".into()], }); let state = AppState { config: Arc::new(Mutex::new(config)), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let body = r#"{ "action":"created", "repository":{"full_name":"zeroclaw-labs/zeroclaw"}, "issue":{"number":2079,"title":"x"}, "comment":{"id":1,"body":"hello","user":{"login":"alice","type":"User"}} }"#; let mut headers = HeaderMap::new(); headers.insert("X-GitHub-Event", HeaderValue::from_static("issue_comment")); headers.insert( "X-Hub-Signature-256", HeaderValue::from_static("sha256=deadbeef"), ); let response = handle_github_webhook(State(state), headers, Bytes::from(body)) .await .into_response(); assert_eq!(response.status(), StatusCode::UNAUTHORIZED); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } #[tokio::test] async fn github_webhook_duplicate_delivery_returns_duplicate_status() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let secret = "github-secret"; let mut config = Config::default(); config.channels_config.github = Some(crate::config::schema::GitHubConfig { access_token: "ghp_test_token".into(), webhook_secret: Some(secret.into()), api_base_url: None, allowed_repos: vec!["zeroclaw-labs/zeroclaw".into()], }); let state = AppState { config: Arc::new(Mutex::new(config)), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let body = r#"{ "action":"created", "repository":{"full_name":"zeroclaw-labs/zeroclaw"}, "issue":{"number":2079,"title":"x"}, "comment":{"id":1,"body":"hello","user":{"login":"alice","type":"User"}} }"#; let signature = compute_github_signature_header(secret, body); let mut headers = HeaderMap::new(); headers.insert("X-GitHub-Event", HeaderValue::from_static("issue_comment")); headers.insert( "X-Hub-Signature-256", HeaderValue::from_str(&signature).unwrap(), ); headers.insert("X-GitHub-Delivery", HeaderValue::from_static("delivery-1")); let first = handle_github_webhook( State(state.clone()), headers.clone(), Bytes::from(body.to_string()), ) .await .into_response(); assert_eq!(first.status(), StatusCode::OK); let second = handle_github_webhook(State(state), headers, Bytes::from(body.to_string())) .await .into_response(); assert_eq!(second.status(), StatusCode::OK); let payload = second.into_body().collect().await.unwrap().to_bytes(); let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); assert_eq!(parsed["status"], "duplicate"); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } #[tokio::test] async fn nextcloud_talk_webhook_returns_not_found_when_not_configured() { let provider: Arc = Arc::new(MockProvider::default()); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_nextcloud_talk_webhook( State(state), HeaderMap::new(), Bytes::from_static(br#"{"type":"message"}"#), ) .await .into_response(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn nextcloud_talk_webhook_rejects_invalid_signature() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let channel = Arc::new(NextcloudTalkChannel::new( "https://cloud.example.com".into(), "app-token".into(), vec!["*".into()], )); let secret = "nextcloud-test-secret"; let random = "seed-value"; let body = r#"{"type":"message","object":{"token":"room-token"},"message":{"actorType":"users","actorId":"user_a","message":"hello"}}"#; let _valid_signature = compute_nextcloud_signature_hex(secret, random, body); let invalid_signature = "deadbeef"; let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: Some(channel), nextcloud_talk_webhook_secret: Some(Arc::from(secret)), wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let mut headers = HeaderMap::new(); headers.insert( "X-Nextcloud-Talk-Random", HeaderValue::from_str(random).unwrap(), ); headers.insert( "X-Nextcloud-Talk-Signature", HeaderValue::from_str(invalid_signature).unwrap(), ); let response = handle_nextcloud_talk_webhook(State(state), headers, Bytes::from(body)) .await .into_response(); assert_eq!(response.status(), StatusCode::UNAUTHORIZED); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } #[tokio::test] async fn qq_webhook_returns_not_found_when_not_configured() { let provider: Arc = Arc::new(MockProvider::default()); let memory: Arc = Arc::new(MockMemory); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: None, qq_webhook_enabled: false, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let response = handle_qq_webhook( State(state), HeaderMap::new(), Bytes::from_static(br#"{"op":13,"d":{"plain_token":"p","event_ts":"1"}}"#), ) .await .into_response(); assert_eq!(response.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn qq_webhook_validation_returns_signed_challenge() { let provider_impl = Arc::new(MockProvider::default()); let provider: Arc = provider_impl.clone(); let memory: Arc = Arc::new(MockMemory); let qq = Arc::new(QQChannel::new( "11111111".into(), "DG5g3B4j9X2KOErG".into(), vec!["*".into()], )); let state = AppState { config: Arc::new(Mutex::new(Config::default())), provider, model: "test-model".into(), temperature: 0.0, mem: memory, auto_save: false, webhook_secret_hash: None, pairing: Arc::new(PairingGuard::new(false, &[])), trust_forwarded_headers: false, rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), whatsapp: None, whatsapp_app_secret: None, linq: None, linq_signing_secret: None, bluebubbles: None, bluebubbles_webhook_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, wati: None, qq: Some(qq), qq_webhook_enabled: true, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), tools_registry_exec: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), max_tool_iterations: 10, cost_tracker: None, event_tx: tokio::sync::broadcast::channel(16).0, }; let mut headers = HeaderMap::new(); headers.insert("X-Bot-Appid", HeaderValue::from_static("11111111")); let response = handle_qq_webhook( State(state), headers, Bytes::from_static( br#"{"op":13,"d":{"plain_token":"Arq0D5A61EgUu4OxUvOp","event_ts":"1725442341"}}"#, ), ) .await .into_response(); assert_eq!(response.status(), StatusCode::OK); let payload = response.into_body().collect().await.unwrap().to_bytes(); let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); assert_eq!(parsed["plain_token"], "Arq0D5A61EgUu4OxUvOp"); assert_eq!( parsed["signature"], "87befc99c42c651b3aac0278e71ada338433ae26fcb24307bdc5ad38c1adc2d01bcfcadc0842edac85e85205028a1132afe09280305f13aa6909ffc2d652c706" ); assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 0); } // ══════════════════════════════════════════════════════════ // WhatsApp Signature Verification Tests (CWE-345 Prevention) // ══════════════════════════════════════════════════════════ fn compute_whatsapp_signature_hex(secret: &str, body: &[u8]) -> String { use hmac::{Hmac, Mac}; use sha2::Sha256; let mut mac = Hmac::::new_from_slice(secret.as_bytes()).unwrap(); mac.update(body); hex::encode(mac.finalize().into_bytes()) } fn compute_whatsapp_signature_header(secret: &str, body: &[u8]) -> String { format!("sha256={}", compute_whatsapp_signature_hex(secret, body)) } #[test] fn whatsapp_signature_valid() { let app_secret = generate_test_secret(); let body = b"test body content"; let signature_header = compute_whatsapp_signature_header(&app_secret, body); assert!(verify_whatsapp_signature( &app_secret, body, &signature_header )); } #[test] fn whatsapp_signature_invalid_wrong_secret() { let app_secret = generate_test_secret(); let wrong_secret = generate_test_secret(); let body = b"test body content"; let signature_header = compute_whatsapp_signature_header(&wrong_secret, body); assert!(!verify_whatsapp_signature( &app_secret, body, &signature_header )); } #[test] fn whatsapp_signature_invalid_wrong_body() { let app_secret = generate_test_secret(); let original_body = b"original body"; let tampered_body = b"tampered body"; let signature_header = compute_whatsapp_signature_header(&app_secret, original_body); // Verify with tampered body should fail assert!(!verify_whatsapp_signature( &app_secret, tampered_body, &signature_header )); } #[test] fn whatsapp_signature_missing_prefix() { let app_secret = generate_test_secret(); let body = b"test body"; // Signature without "sha256=" prefix let signature_header = "abc123def456"; assert!(!verify_whatsapp_signature( &app_secret, body, signature_header )); } #[test] fn whatsapp_signature_empty_header() { let app_secret = generate_test_secret(); let body = b"test body"; assert!(!verify_whatsapp_signature(&app_secret, body, "")); } #[test] fn whatsapp_signature_invalid_hex() { let app_secret = generate_test_secret(); let body = b"test body"; // Invalid hex characters let signature_header = "sha256=not_valid_hex_zzz"; assert!(!verify_whatsapp_signature( &app_secret, body, signature_header )); } #[test] fn whatsapp_signature_empty_body() { let app_secret = generate_test_secret(); let body = b""; let signature_header = compute_whatsapp_signature_header(&app_secret, body); assert!(verify_whatsapp_signature( &app_secret, body, &signature_header )); } #[test] fn whatsapp_signature_unicode_body() { let app_secret = generate_test_secret(); let body = "Hello 🦀 World".as_bytes(); let signature_header = compute_whatsapp_signature_header(&app_secret, body); assert!(verify_whatsapp_signature( &app_secret, body, &signature_header )); } #[test] fn whatsapp_signature_json_payload() { let app_secret = generate_test_secret(); let body = br#"{"entry":[{"changes":[{"value":{"messages":[{"from":"1234567890","text":{"body":"Hello"}}]}}]}]}"#; let signature_header = compute_whatsapp_signature_header(&app_secret, body); assert!(verify_whatsapp_signature( &app_secret, body, &signature_header )); } #[test] fn whatsapp_signature_case_sensitive_prefix() { let app_secret = generate_test_secret(); let body = b"test body"; let hex_sig = compute_whatsapp_signature_hex(&app_secret, body); // Wrong case prefix should fail let wrong_prefix = format!("SHA256={hex_sig}"); assert!(!verify_whatsapp_signature(&app_secret, body, &wrong_prefix)); // Correct prefix should pass let correct_prefix = format!("sha256={hex_sig}"); assert!(verify_whatsapp_signature( &app_secret, body, &correct_prefix )); } #[test] fn whatsapp_signature_truncated_hex() { let app_secret = generate_test_secret(); let body = b"test body"; let hex_sig = compute_whatsapp_signature_hex(&app_secret, body); let truncated = &hex_sig[..32]; // Only half the signature let signature_header = format!("sha256={truncated}"); assert!(!verify_whatsapp_signature( &app_secret, body, &signature_header )); } #[test] fn whatsapp_signature_extra_bytes() { let app_secret = generate_test_secret(); let body = b"test body"; let hex_sig = compute_whatsapp_signature_hex(&app_secret, body); let extended = format!("{hex_sig}deadbeef"); let signature_header = format!("sha256={extended}"); assert!(!verify_whatsapp_signature( &app_secret, body, &signature_header )); } // ══════════════════════════════════════════════════════════ // IdempotencyStore Edge-Case Tests // ══════════════════════════════════════════════════════════ #[test] fn idempotency_store_allows_different_keys() { let store = IdempotencyStore::new(Duration::from_secs(60), 100); assert!(store.record_if_new("key-a")); assert!(store.record_if_new("key-b")); assert!(store.record_if_new("key-c")); assert!(store.record_if_new("key-d")); } #[test] fn idempotency_store_max_keys_clamped_to_one() { let store = IdempotencyStore::new(Duration::from_secs(60), 0); assert!(store.record_if_new("only-key")); assert!(!store.record_if_new("only-key")); } #[test] fn idempotency_store_rapid_duplicate_rejected() { let store = IdempotencyStore::new(Duration::from_secs(300), 100); assert!(store.record_if_new("rapid")); assert!(!store.record_if_new("rapid")); } #[test] fn idempotency_store_accepts_after_ttl_expires() { let store = IdempotencyStore::new(Duration::from_millis(1), 100); assert!(store.record_if_new("ttl-key")); std::thread::sleep(Duration::from_millis(10)); assert!(store.record_if_new("ttl-key")); } #[test] fn idempotency_store_eviction_preserves_newest() { let store = IdempotencyStore::new(Duration::from_secs(300), 1); assert!(store.record_if_new("old-key")); std::thread::sleep(Duration::from_millis(2)); assert!(store.record_if_new("new-key")); let keys = store.keys.lock(); assert_eq!(keys.len(), 1); assert!(!keys.contains_key("old-key")); assert!(keys.contains_key("new-key")); } #[test] fn rate_limiter_allows_after_window_expires() { let window = Duration::from_millis(50); let limiter = SlidingWindowRateLimiter::new(2, window, 100); assert!(limiter.allow("ip-1")); assert!(limiter.allow("ip-1")); assert!(!limiter.allow("ip-1")); // blocked // Wait for window to expire std::thread::sleep(Duration::from_millis(60)); // Should be allowed again assert!(limiter.allow("ip-1")); } #[test] fn rate_limiter_independent_keys_tracked_separately() { let limiter = SlidingWindowRateLimiter::new(2, Duration::from_secs(60), 100); assert!(limiter.allow("ip-1")); assert!(limiter.allow("ip-1")); assert!(!limiter.allow("ip-1")); // ip-1 blocked // ip-2 should still work assert!(limiter.allow("ip-2")); assert!(limiter.allow("ip-2")); assert!(!limiter.allow("ip-2")); // ip-2 now blocked } #[test] fn rate_limiter_exact_boundary_at_max_keys() { let limiter = SlidingWindowRateLimiter::new(10, Duration::from_secs(60), 3); assert!(limiter.allow("ip-1")); assert!(limiter.allow("ip-2")); assert!(limiter.allow("ip-3")); // At capacity now assert!(limiter.allow("ip-4")); // should evict ip-1 let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 3); assert!( !guard.0.contains_key("ip-1"), "ip-1 should have been evicted" ); assert!(guard.0.contains_key("ip-2")); assert!(guard.0.contains_key("ip-3")); assert!(guard.0.contains_key("ip-4")); } #[test] fn gateway_rate_limiter_pair_and_webhook_are_independent() { let limiter = GatewayRateLimiter::new(2, 3, 100); // Exhaust pair limit assert!(limiter.allow_pair("ip-1")); assert!(limiter.allow_pair("ip-1")); assert!(!limiter.allow_pair("ip-1")); // pair blocked // Webhook should still work assert!(limiter.allow_webhook("ip-1")); assert!(limiter.allow_webhook("ip-1")); assert!(limiter.allow_webhook("ip-1")); assert!(!limiter.allow_webhook("ip-1")); // webhook now blocked } #[test] fn rate_limiter_single_key_max_allows_one_request() { let limiter = SlidingWindowRateLimiter::new(5, Duration::from_secs(60), 1); assert!(limiter.allow("ip-1")); assert!(limiter.allow("ip-2")); // evicts ip-1 let guard = limiter.requests.lock(); assert_eq!(guard.0.len(), 1); assert!(guard.0.contains_key("ip-2")); assert!(!guard.0.contains_key("ip-1")); } #[test] fn rate_limiter_concurrent_access_safe() { use std::sync::Arc; let limiter = Arc::new(SlidingWindowRateLimiter::new( 1000, Duration::from_secs(60), 1000, )); let mut handles = Vec::new(); for i in 0..10 { let limiter = limiter.clone(); handles.push(std::thread::spawn(move || { for j in 0..100 { limiter.allow(&format!("thread-{i}-req-{j}")); } })); } for handle in handles { handle.join().unwrap(); } // Should not panic or deadlock let guard = limiter.requests.lock(); assert!(guard.0.len() <= 1000, "should respect max_keys"); } #[test] fn idempotency_store_concurrent_access_safe() { use std::sync::Arc; let store = Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)); let mut handles = Vec::new(); for i in 0..10 { let store = store.clone(); handles.push(std::thread::spawn(move || { for j in 0..100 { store.record_if_new(&format!("thread-{i}-key-{j}")); } })); } for handle in handles { handle.join().unwrap(); } let keys = store.keys.lock(); assert!(keys.len() <= 1000, "should respect max_keys"); } #[test] fn rate_limiter_rapid_burst_then_cooldown() { let limiter = SlidingWindowRateLimiter::new(5, Duration::from_millis(50), 100); // Burst: use all 5 requests for _ in 0..5 { assert!(limiter.allow("burst-ip")); } assert!(!limiter.allow("burst-ip")); // 6th should fail // Cooldown std::thread::sleep(Duration::from_millis(60)); // Should be allowed again assert!(limiter.allow("burst-ip")); } }