From ecc8865cb773eb48a28236b85bc7973421b5fd95 Mon Sep 17 00:00:00 2001 From: Ken Yeung Date: Mon, 23 Feb 2026 21:02:00 +0800 Subject: [PATCH] feat: add WATI WhatsApp Business API channel (#1472) Add a new WATI channel for WhatsApp Business API integration via the WATI managed platform. WATI simplifies WhatsApp integration with its own REST API and webhook system. - New WatiChannel implementation (webhook mode, REST send) - WatiConfig with api_token, api_url, tenant_id, allowed_numbers - Gateway routes: GET/POST /wati for webhook verification and messages - Flexible webhook parsing handles WATI's variable field names - 15 unit tests covering parsing, allowlist, timestamps, phone normalization --- src/channels/mod.rs | 14 ++ src/channels/wati.rs | 505 +++++++++++++++++++++++++++++++++++++++++++ src/config/schema.rs | 40 ++++ src/gateway/mod.rs | 127 ++++++++++- 4 files changed, 685 insertions(+), 1 deletion(-) create mode 100644 src/channels/wati.rs diff --git a/src/channels/mod.rs b/src/channels/mod.rs index e8093c839..9e6e18b98 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -35,6 +35,7 @@ pub mod slack; pub mod telegram; pub mod traits; pub mod transcription; +pub mod wati; pub mod whatsapp; #[cfg(feature = "whatsapp-web")] pub mod whatsapp_storage; @@ -61,6 +62,7 @@ pub use signal::SignalChannel; pub use slack::SlackChannel; pub use telegram::TelegramChannel; pub use traits::{Channel, SendMessage}; +pub use wati::WatiChannel; pub use whatsapp::WhatsAppChannel; #[cfg(feature = "whatsapp-web")] pub use whatsapp_web::WhatsAppWebChannel; @@ -2813,6 +2815,18 @@ fn collect_configured_channels( }); } + if let Some(ref wati_cfg) = config.channels_config.wati { + channels.push(ConfiguredChannel { + display_name: "WATI", + channel: Arc::new(WatiChannel::new( + wati_cfg.api_token.clone(), + wati_cfg.api_url.clone(), + wati_cfg.tenant_id.clone(), + wati_cfg.allowed_numbers.clone(), + )), + }); + } + if let Some(ref nc) = config.channels_config.nextcloud_talk { channels.push(ConfiguredChannel { display_name: "Nextcloud Talk", diff --git a/src/channels/wati.rs b/src/channels/wati.rs new file mode 100644 index 000000000..6e3037027 --- /dev/null +++ b/src/channels/wati.rs @@ -0,0 +1,505 @@ +use super::traits::{Channel, ChannelMessage, SendMessage}; +use async_trait::async_trait; +use uuid::Uuid; + +/// WATI WhatsApp Business API channel. +/// +/// This channel operates in webhook mode (push-based) rather than polling. +/// Messages are received via the gateway's `/wati` webhook endpoint. +/// The `listen` method here is a keepalive placeholder; actual message handling +/// happens in the gateway when WATI sends webhook events. +pub struct WatiChannel { + api_token: String, + api_url: String, + tenant_id: Option, + allowed_numbers: Vec, + client: reqwest::Client, +} + +impl WatiChannel { + pub fn new( + api_token: String, + api_url: String, + tenant_id: Option, + allowed_numbers: Vec, + ) -> Self { + Self { + api_token, + api_url, + tenant_id, + allowed_numbers, + client: crate::config::build_runtime_proxy_client("channel.wati"), + } + } + + /// Check if a phone number is allowed (E.164 format: +1234567890). + fn is_number_allowed(&self, phone: &str) -> bool { + self.allowed_numbers.iter().any(|n| n == "*" || n == phone) + } + + /// Build the target field for the WATI API, prefixing with tenant_id if set. + fn build_target(&self, phone: &str) -> String { + // Strip leading '+' — WATI expects bare digits + let bare = phone.strip_prefix('+').unwrap_or(phone); + if let Some(ref tid) = self.tenant_id { + if bare.starts_with(&format!("{tid}:")) { + bare.to_string() + } else { + format!("{tid}:{bare}") + } + } else { + bare.to_string() + } + } + + /// Parse an incoming webhook payload from WATI and extract messages. + /// + /// WATI's webhook payloads have variable field names depending on the API + /// version and configuration, so we try multiple paths for each field. + pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Vec { + let mut messages = Vec::new(); + + // Extract text — try multiple field paths + let text = payload + .get("text") + .and_then(|v| v.as_str()) + .or_else(|| { + payload + .get("message") + .and_then(|m| m.get("text").or_else(|| m.get("body"))) + .and_then(|v| v.as_str()) + }) + .unwrap_or("") + .trim(); + + if text.is_empty() { + return messages; + } + + // Check fromMe — skip outgoing messages + let from_me = payload + .get("fromMe") + .or_else(|| payload.get("from_me")) + .or_else(|| payload.get("owner")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if from_me { + tracing::debug!("WATI: skipping fromMe message"); + return messages; + } + + // Extract waId (sender phone number) + let wa_id = payload + .get("waId") + .or_else(|| payload.get("wa_id")) + .or_else(|| payload.get("from")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .trim(); + + if wa_id.is_empty() { + return messages; + } + + // Normalize phone to E.164 format + let normalized_phone = if wa_id.starts_with('+') { + wa_id.to_string() + } else { + format!("+{wa_id}") + }; + + // Check allowlist + if !self.is_number_allowed(&normalized_phone) { + tracing::warn!( + "WATI: ignoring message from unauthorized sender: {normalized_phone}. \ + Add to channels.wati.allowed_numbers in config.toml, \ + or run `zeroclaw onboard --channels-only` to configure interactively." + ); + return messages; + } + + // Extract timestamp — handle unix seconds, unix ms, or ISO string + let timestamp = payload + .get("timestamp") + .or_else(|| payload.get("created")) + .map(|t| { + if let Some(secs) = t.as_u64() { + // Distinguish seconds from milliseconds (ms > 10_000_000_000) + if secs > 10_000_000_000 { + secs / 1000 + } else { + secs + } + } else if let Some(s) = t.as_str() { + chrono::DateTime::parse_from_rfc3339(s) + .ok() + .map(|dt| dt.timestamp().cast_unsigned()) + .unwrap_or_else(|| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + }) + } else { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + } + }) + .unwrap_or_else(|| { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + }); + + messages.push(ChannelMessage { + id: Uuid::new_v4().to_string(), + reply_target: normalized_phone.clone(), + sender: normalized_phone, + content: text.to_string(), + channel: "wati".to_string(), + timestamp, + thread_ts: None, + }); + + messages + } +} + +#[async_trait] +impl Channel for WatiChannel { + fn name(&self) -> &str { + "wati" + } + + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + let target = self.build_target(&message.recipient); + + let body = serde_json::json!({ + "target": target, + "text": message.content + }); + + let url = format!("{}/api/ext/v3/conversations/messages/text", self.api_url); + + let resp = self + .client + .post(&url) + .bearer_auth(&self.api_token) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let status = resp.status(); + let error_body = resp.text().await.unwrap_or_default(); + tracing::error!("WATI send failed: {status} — {error_body}"); + anyhow::bail!("WATI API error: {status}"); + } + + Ok(()) + } + + async fn listen(&self, _tx: tokio::sync::mpsc::Sender) -> anyhow::Result<()> { + // WATI uses webhooks (push-based), not polling. + // Messages are received via the gateway's /wati endpoint. + tracing::info!( + "WATI channel active (webhook mode). \ + Configure WATI webhook to POST to your gateway's /wati endpoint." + ); + + // Keep the task alive — it will be cancelled when the channel shuts down + loop { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + } + } + + async fn health_check(&self) -> bool { + let url = format!("{}/api/ext/v3/contacts/count", self.api_url); + + self.client + .get(&url) + .bearer_auth(&self.api_token) + .send() + .await + .map(|r| r.status().is_success()) + .unwrap_or(false) + } + + async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> { + // WATI API does not support typing indicators + Ok(()) + } + + async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { + // WATI API does not support typing indicators + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_channel() -> WatiChannel { + WatiChannel { + api_token: "test-token".into(), + api_url: "https://live-mt-server.wati.io".into(), + tenant_id: None, + allowed_numbers: vec!["+1234567890".into()], + client: reqwest::Client::new(), + } + } + + fn make_wildcard_channel() -> WatiChannel { + WatiChannel { + api_token: "test-token".into(), + api_url: "https://live-mt-server.wati.io".into(), + tenant_id: None, + allowed_numbers: vec!["*".into()], + client: reqwest::Client::new(), + } + } + + #[test] + fn wati_channel_name() { + let ch = make_channel(); + assert_eq!(ch.name(), "wati"); + } + + #[test] + fn wati_number_allowed_exact() { + let ch = make_channel(); + assert!(ch.is_number_allowed("+1234567890")); + assert!(!ch.is_number_allowed("+9876543210")); + } + + #[test] + fn wati_number_allowed_wildcard() { + let ch = make_wildcard_channel(); + assert!(ch.is_number_allowed("+1234567890")); + assert!(ch.is_number_allowed("+9999999999")); + } + + #[test] + fn wati_number_allowed_empty() { + let ch = WatiChannel { + api_token: "tok".into(), + api_url: "https://live-mt-server.wati.io".into(), + tenant_id: None, + allowed_numbers: vec![], + client: reqwest::Client::new(), + }; + assert!(!ch.is_number_allowed("+1234567890")); + } + + #[test] + fn wati_build_target_with_tenant() { + let ch = WatiChannel { + api_token: "tok".into(), + api_url: "https://live-mt-server.wati.io".into(), + tenant_id: Some("tenant1".into()), + allowed_numbers: vec![], + client: reqwest::Client::new(), + }; + assert_eq!(ch.build_target("+1234567890"), "tenant1:1234567890"); + } + + #[test] + fn wati_build_target_without_tenant() { + let ch = make_channel(); + assert_eq!(ch.build_target("+1234567890"), "1234567890"); + } + + #[test] + fn wati_build_target_already_prefixed() { + let ch = WatiChannel { + api_token: "tok".into(), + api_url: "https://live-mt-server.wati.io".into(), + tenant_id: Some("tenant1".into()), + allowed_numbers: vec![], + client: reqwest::Client::new(), + }; + // If the phone already has the tenant prefix, don't double it + assert_eq!(ch.build_target("tenant1:1234567890"), "tenant1:1234567890"); + } + + #[test] + fn wati_parse_valid_message() { + let ch = make_channel(); + let payload = serde_json::json!({ + "text": "Hello from WATI!", + "waId": "1234567890", + "fromMe": false, + "timestamp": 1705320000u64 + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sender, "+1234567890"); + assert_eq!(msgs[0].content, "Hello from WATI!"); + assert_eq!(msgs[0].channel, "wati"); + assert_eq!(msgs[0].reply_target, "+1234567890"); + assert_eq!(msgs[0].timestamp, 1705320000); + } + + #[test] + fn wati_parse_skip_from_me() { + let ch = make_wildcard_channel(); + let payload = serde_json::json!({ + "text": "My own message", + "waId": "1234567890", + "fromMe": true + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "fromMe messages should be skipped"); + } + + #[test] + fn wati_parse_skip_no_text() { + let ch = make_wildcard_channel(); + let payload = serde_json::json!({ + "waId": "1234567890", + "fromMe": false + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "Messages without text should be skipped"); + } + + #[test] + fn wati_parse_alternative_field_names() { + let ch = make_wildcard_channel(); + + // wa_id instead of waId, message.body instead of text + let payload = serde_json::json!({ + "message": { "body": "Alt field test" }, + "wa_id": "1234567890", + "from_me": false, + "timestamp": 1705320000u64 + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].content, "Alt field test"); + assert_eq!(msgs[0].sender, "+1234567890"); + } + + #[test] + fn wati_parse_timestamp_seconds() { + let ch = make_wildcard_channel(); + let payload = serde_json::json!({ + "text": "Test", + "waId": "1234567890", + "timestamp": 1705320000u64 + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs[0].timestamp, 1705320000); + } + + #[test] + fn wati_parse_timestamp_milliseconds() { + let ch = make_wildcard_channel(); + let payload = serde_json::json!({ + "text": "Test", + "waId": "1234567890", + "timestamp": 1705320000000u64 + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs[0].timestamp, 1705320000); + } + + #[test] + fn wati_parse_timestamp_iso() { + let ch = make_wildcard_channel(); + let payload = serde_json::json!({ + "text": "Test", + "waId": "1234567890", + "timestamp": "2025-01-15T12:00:00Z" + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs[0].timestamp, 1736942400); + } + + #[test] + fn wati_parse_normalizes_phone() { + let ch = WatiChannel { + api_token: "tok".into(), + api_url: "https://live-mt-server.wati.io".into(), + tenant_id: None, + allowed_numbers: vec!["+1234567890".into()], + client: reqwest::Client::new(), + }; + + // Phone without + prefix + let payload = serde_json::json!({ + "text": "Hi", + "waId": "1234567890", + "fromMe": false + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sender, "+1234567890"); + } + + #[test] + fn wati_parse_empty_payload() { + let ch = make_channel(); + let payload = serde_json::json!({}); + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn wati_parse_from_field_fallback() { + let ch = make_wildcard_channel(); + // Uses "from" instead of "waId" + let payload = serde_json::json!({ + "text": "Fallback test", + "from": "1234567890", + "fromMe": false + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sender, "+1234567890"); + } + + #[test] + fn wati_parse_message_text_fallback() { + let ch = make_wildcard_channel(); + // Uses "message.text" instead of top-level "text" + let payload = serde_json::json!({ + "message": { "text": "Nested text" }, + "waId": "1234567890", + "fromMe": false + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].content, "Nested text"); + } + + #[test] + fn wati_parse_owner_field_as_from_me() { + let ch = make_wildcard_channel(); + // Uses "owner" field as fromMe indicator + let payload = serde_json::json!({ + "text": "Test", + "waId": "1234567890", + "owner": true + }); + + let msgs = ch.parse_webhook_payload(&payload); + assert!(msgs.is_empty(), "owner=true messages should be skipped"); + } +} diff --git a/src/config/schema.rs b/src/config/schema.rs index fcc00a1ba..b22068baf 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -33,6 +33,7 @@ const SUPPORTED_PROXY_SERVICE_KEYS: &[&str] = &[ "channel.signal", "channel.slack", "channel.telegram", + "channel.wati", "channel.whatsapp", "tool.browser", "tool.composio", @@ -2444,6 +2445,8 @@ pub struct ChannelsConfig { pub whatsapp: Option, /// Linq Partner API channel configuration. pub linq: Option, + /// WATI WhatsApp Business API channel configuration. + pub wati: Option, /// Nextcloud Talk bot channel configuration. pub nextcloud_talk: Option, /// Email channel configuration. @@ -2511,6 +2514,10 @@ impl ChannelsConfig { Box::new(ConfigWrapper::new(&self.linq)), self.linq.is_some(), ), + ( + Box::new(ConfigWrapper::new(&self.wati)), + self.wati.is_some(), + ), ( Box::new(ConfigWrapper::new(&self.nextcloud_talk)), self.nextcloud_talk.is_some(), @@ -2578,6 +2585,7 @@ impl Default for ChannelsConfig { signal: None, whatsapp: None, linq: None, + wati: None, nextcloud_talk: None, email: None, irc: None, @@ -2887,6 +2895,35 @@ impl ChannelConfig for LinqConfig { } } +/// WATI WhatsApp Business API channel configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct WatiConfig { + /// WATI API token (Bearer auth). + pub api_token: String, + /// WATI API base URL (default: https://live-mt-server.wati.io). + #[serde(default = "default_wati_api_url")] + pub api_url: String, + /// Tenant ID for multi-channel setups (optional). + #[serde(default)] + pub tenant_id: Option, + /// Allowed phone numbers (E.164 format) or "*" for all. + #[serde(default)] + pub allowed_numbers: Vec, +} + +fn default_wati_api_url() -> String { + "https://live-mt-server.wati.io".to_string() +} + +impl ChannelConfig for WatiConfig { + fn name() -> &'static str { + "WATI" + } + fn desc() -> &'static str { + "WhatsApp via WATI Business API" + } +} + /// Nextcloud Talk bot configuration (webhook receive + OCS send API). #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct NextcloudTalkConfig { @@ -4704,6 +4741,7 @@ default_temperature = 0.7 signal: None, whatsapp: None, linq: None, + wati: None, nextcloud_talk: None, email: None, irc: None, @@ -5258,6 +5296,7 @@ allowed_users = ["@ops:matrix.org"] signal: None, whatsapp: None, linq: None, + wati: None, nextcloud_talk: None, email: None, irc: None, @@ -5471,6 +5510,7 @@ channel_id = "C123" allowed_numbers: vec!["+1".into()], }), linq: None, + wati: None, nextcloud_talk: None, email: None, irc: None, diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 78d347ee9..133781ffc 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -12,7 +12,7 @@ pub mod sse; pub mod static_files; pub mod ws; -use crate::channels::{Channel, LinqChannel, NextcloudTalkChannel, SendMessage, WhatsAppChannel}; +use crate::channels::{Channel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel, WhatsAppChannel}; use crate::config::Config; use crate::cost::CostTracker; use crate::memory::{self, Memory, MemoryCategory}; @@ -64,6 +64,10 @@ fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("linq_{}_{}", msg.sender, msg.id) } +fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { + format!("wati_{}_{}", msg.sender, msg.id) +} + fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String { format!("nextcloud_talk_{}_{}", msg.sender, msg.id) } @@ -295,6 +299,7 @@ pub struct AppState { pub nextcloud_talk: Option>, /// Nextcloud Talk webhook secret for signature verification pub nextcloud_talk_webhook_secret: Option>, + pub wati: Option>, /// Observability backend for metrics scraping pub observer: Arc, /// Registered tool specs (for web dashboard tools page) @@ -474,6 +479,17 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { }) .map(Arc::from); + // WATI channel (if configured) + let wati_channel: Option> = + config.channels_config.wati.as_ref().map(|wati_cfg| { + Arc::new(WatiChannel::new( + wati_cfg.api_token.clone(), + wati_cfg.api_url.clone(), + wati_cfg.tenant_id.clone(), + wati_cfg.allowed_numbers.clone(), + )) + }); + // Nextcloud Talk channel (if configured) let nextcloud_talk_channel: Option> = config.channels_config.nextcloud_talk.as_ref().map(|nc| { @@ -563,6 +579,10 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { if linq_channel.is_some() { println!(" POST /linq — Linq message webhook (iMessage/RCS/SMS)"); } + if wati_channel.is_some() { + println!(" GET /wati — WATI webhook verification"); + println!(" POST /wati — WATI message webhook"); + } if nextcloud_talk_channel.is_some() { println!(" POST /nextcloud-talk — Nextcloud Talk bot webhook"); } @@ -616,6 +636,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { linq_signing_secret, nextcloud_talk: nextcloud_talk_channel, nextcloud_talk_webhook_secret, + wati: wati_channel, observer: broadcast_observer, tools_registry, cost_tracker, @@ -637,6 +658,8 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { .route("/whatsapp", get(handle_whatsapp_verify)) .route("/whatsapp", post(handle_whatsapp_message)) .route("/linq", post(handle_linq_webhook)) + .route("/wati", get(handle_wati_verify)) + .route("/wati", post(handle_wati_webhook)) .route("/nextcloud-talk", post(handle_nextcloud_talk_webhook)) // ── Web Dashboard API routes ── .route("/api/status", get(api::handle_api_status)) @@ -1293,6 +1316,101 @@ async fn handle_linq_webhook( (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) } +/// GET /wati — WATI webhook verification (echoes hub.challenge) +async fn handle_wati_verify( + State(state): State, + Query(params): Query, +) -> impl IntoResponse { + if state.wati.is_none() { + return (StatusCode::NOT_FOUND, "WATI not configured".to_string()); + } + + // WATI may use Meta-style webhook verification; echo the challenge + if let Some(challenge) = params.challenge { + tracing::info!("WATI webhook verified successfully"); + return (StatusCode::OK, challenge); + } + + (StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string()) +} + +#[derive(Debug, Deserialize)] +pub struct WatiVerifyQuery { + #[serde(rename = "hub.challenge")] + pub challenge: Option, +} + +/// POST /wati — incoming WATI WhatsApp message webhook +async fn handle_wati_webhook( + State(state): State, + body: Bytes, +) -> impl IntoResponse { + let Some(ref wati) = state.wati else { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "WATI not configured"})), + ); + }; + + // Parse JSON body + let Ok(payload) = serde_json::from_slice::(&body) else { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid JSON payload"})), + ); + }; + + // Parse messages from the webhook payload + let messages = wati.parse_webhook_payload(&payload); + + if messages.is_empty() { + return (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))); + } + + // Process each message + for msg in &messages { + tracing::info!( + "WATI message from {}: {}", + msg.sender, + truncate_with_ellipsis(&msg.content, 50) + ); + + // Auto-save to memory + if state.auto_save { + let key = wati_memory_key(msg); + let _ = state + .mem + .store(&key, &msg.content, MemoryCategory::Conversation, None) + .await; + } + + // Call the LLM + match run_gateway_chat_with_tools(&state, &msg.content).await { + Ok(response) => { + // Send reply via WATI + if let Err(e) = wati + .send(&SendMessage::new(response, &msg.reply_target)) + .await + { + tracing::error!("Failed to send WATI reply: {e}"); + } + } + Err(e) => { + tracing::error!("LLM error for WATI message: {e:#}"); + let _ = wati + .send(&SendMessage::new( + "Sorry, I couldn't process your message right now.", + &msg.reply_target, + )) + .await; + } + } + } + + // Acknowledge the webhook + (StatusCode::OK, Json(serde_json::json!({"status": "ok"}))) +} + /// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API) async fn handle_nextcloud_talk_webhook( State(state): State, @@ -1472,6 +1590,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None, @@ -1885,6 +2004,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None, @@ -1948,6 +2068,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None, @@ -2023,6 +2144,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None, @@ -2070,6 +2192,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None, @@ -2122,6 +2245,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None, @@ -2179,6 +2303,7 @@ mod tests { linq_signing_secret: None, nextcloud_talk: None, nextcloud_talk_webhook_secret: None, + wati: None, observer: Arc::new(crate::observability::NoopObserver), tools_registry: Arc::new(Vec::new()), cost_tracker: None,