feat: add WATI WhatsApp Business API channel (#1472)
Add a new WATI channel for WhatsApp Business API integration via the WATI managed platform. WATI simplifies WhatsApp integration with its own REST API and webhook system. - New WatiChannel implementation (webhook mode, REST send) - WatiConfig with api_token, api_url, tenant_id, allowed_numbers - Gateway routes: GET/POST /wati for webhook verification and messages - Flexible webhook parsing handles WATI's variable field names - 15 unit tests covering parsing, allowlist, timestamps, phone normalization
This commit is contained in:
parent
10dd428de1
commit
ecc8865cb7
@ -35,6 +35,7 @@ pub mod slack;
|
||||
pub mod telegram;
|
||||
pub mod traits;
|
||||
pub mod transcription;
|
||||
pub mod wati;
|
||||
pub mod whatsapp;
|
||||
#[cfg(feature = "whatsapp-web")]
|
||||
pub mod whatsapp_storage;
|
||||
@ -61,6 +62,7 @@ pub use signal::SignalChannel;
|
||||
pub use slack::SlackChannel;
|
||||
pub use telegram::TelegramChannel;
|
||||
pub use traits::{Channel, SendMessage};
|
||||
pub use wati::WatiChannel;
|
||||
pub use whatsapp::WhatsAppChannel;
|
||||
#[cfg(feature = "whatsapp-web")]
|
||||
pub use whatsapp_web::WhatsAppWebChannel;
|
||||
@ -2813,6 +2815,18 @@ fn collect_configured_channels(
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(ref wati_cfg) = config.channels_config.wati {
|
||||
channels.push(ConfiguredChannel {
|
||||
display_name: "WATI",
|
||||
channel: Arc::new(WatiChannel::new(
|
||||
wati_cfg.api_token.clone(),
|
||||
wati_cfg.api_url.clone(),
|
||||
wati_cfg.tenant_id.clone(),
|
||||
wati_cfg.allowed_numbers.clone(),
|
||||
)),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(ref nc) = config.channels_config.nextcloud_talk {
|
||||
channels.push(ConfiguredChannel {
|
||||
display_name: "Nextcloud Talk",
|
||||
|
||||
505
src/channels/wati.rs
Normal file
505
src/channels/wati.rs
Normal file
@ -0,0 +1,505 @@
|
||||
use super::traits::{Channel, ChannelMessage, SendMessage};
|
||||
use async_trait::async_trait;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// WATI WhatsApp Business API channel.
|
||||
///
|
||||
/// This channel operates in webhook mode (push-based) rather than polling.
|
||||
/// Messages are received via the gateway's `/wati` webhook endpoint.
|
||||
/// The `listen` method here is a keepalive placeholder; actual message handling
|
||||
/// happens in the gateway when WATI sends webhook events.
|
||||
pub struct WatiChannel {
|
||||
api_token: String,
|
||||
api_url: String,
|
||||
tenant_id: Option<String>,
|
||||
allowed_numbers: Vec<String>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl WatiChannel {
|
||||
pub fn new(
|
||||
api_token: String,
|
||||
api_url: String,
|
||||
tenant_id: Option<String>,
|
||||
allowed_numbers: Vec<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
api_token,
|
||||
api_url,
|
||||
tenant_id,
|
||||
allowed_numbers,
|
||||
client: crate::config::build_runtime_proxy_client("channel.wati"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a phone number is allowed (E.164 format: +1234567890).
|
||||
fn is_number_allowed(&self, phone: &str) -> bool {
|
||||
self.allowed_numbers.iter().any(|n| n == "*" || n == phone)
|
||||
}
|
||||
|
||||
/// Build the target field for the WATI API, prefixing with tenant_id if set.
|
||||
fn build_target(&self, phone: &str) -> String {
|
||||
// Strip leading '+' — WATI expects bare digits
|
||||
let bare = phone.strip_prefix('+').unwrap_or(phone);
|
||||
if let Some(ref tid) = self.tenant_id {
|
||||
if bare.starts_with(&format!("{tid}:")) {
|
||||
bare.to_string()
|
||||
} else {
|
||||
format!("{tid}:{bare}")
|
||||
}
|
||||
} else {
|
||||
bare.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse an incoming webhook payload from WATI and extract messages.
|
||||
///
|
||||
/// WATI's webhook payloads have variable field names depending on the API
|
||||
/// version and configuration, so we try multiple paths for each field.
|
||||
pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Vec<ChannelMessage> {
|
||||
let mut messages = Vec::new();
|
||||
|
||||
// Extract text — try multiple field paths
|
||||
let text = payload
|
||||
.get("text")
|
||||
.and_then(|v| v.as_str())
|
||||
.or_else(|| {
|
||||
payload
|
||||
.get("message")
|
||||
.and_then(|m| m.get("text").or_else(|| m.get("body")))
|
||||
.and_then(|v| v.as_str())
|
||||
})
|
||||
.unwrap_or("")
|
||||
.trim();
|
||||
|
||||
if text.is_empty() {
|
||||
return messages;
|
||||
}
|
||||
|
||||
// Check fromMe — skip outgoing messages
|
||||
let from_me = payload
|
||||
.get("fromMe")
|
||||
.or_else(|| payload.get("from_me"))
|
||||
.or_else(|| payload.get("owner"))
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
if from_me {
|
||||
tracing::debug!("WATI: skipping fromMe message");
|
||||
return messages;
|
||||
}
|
||||
|
||||
// Extract waId (sender phone number)
|
||||
let wa_id = payload
|
||||
.get("waId")
|
||||
.or_else(|| payload.get("wa_id"))
|
||||
.or_else(|| payload.get("from"))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("")
|
||||
.trim();
|
||||
|
||||
if wa_id.is_empty() {
|
||||
return messages;
|
||||
}
|
||||
|
||||
// Normalize phone to E.164 format
|
||||
let normalized_phone = if wa_id.starts_with('+') {
|
||||
wa_id.to_string()
|
||||
} else {
|
||||
format!("+{wa_id}")
|
||||
};
|
||||
|
||||
// Check allowlist
|
||||
if !self.is_number_allowed(&normalized_phone) {
|
||||
tracing::warn!(
|
||||
"WATI: ignoring message from unauthorized sender: {normalized_phone}. \
|
||||
Add to channels.wati.allowed_numbers in config.toml, \
|
||||
or run `zeroclaw onboard --channels-only` to configure interactively."
|
||||
);
|
||||
return messages;
|
||||
}
|
||||
|
||||
// Extract timestamp — handle unix seconds, unix ms, or ISO string
|
||||
let timestamp = payload
|
||||
.get("timestamp")
|
||||
.or_else(|| payload.get("created"))
|
||||
.map(|t| {
|
||||
if let Some(secs) = t.as_u64() {
|
||||
// Distinguish seconds from milliseconds (ms > 10_000_000_000)
|
||||
if secs > 10_000_000_000 {
|
||||
secs / 1000
|
||||
} else {
|
||||
secs
|
||||
}
|
||||
} else if let Some(s) = t.as_str() {
|
||||
chrono::DateTime::parse_from_rfc3339(s)
|
||||
.ok()
|
||||
.map(|dt| dt.timestamp().cast_unsigned())
|
||||
.unwrap_or_else(|| {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
})
|
||||
} else {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|| {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
});
|
||||
|
||||
messages.push(ChannelMessage {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
reply_target: normalized_phone.clone(),
|
||||
sender: normalized_phone,
|
||||
content: text.to_string(),
|
||||
channel: "wati".to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
});
|
||||
|
||||
messages
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Channel for WatiChannel {
|
||||
fn name(&self) -> &str {
|
||||
"wati"
|
||||
}
|
||||
|
||||
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
|
||||
let target = self.build_target(&message.recipient);
|
||||
|
||||
let body = serde_json::json!({
|
||||
"target": target,
|
||||
"text": message.content
|
||||
});
|
||||
|
||||
let url = format!("{}/api/ext/v3/conversations/messages/text", self.api_url);
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
.post(&url)
|
||||
.bearer_auth(&self.api_token)
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&body)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let status = resp.status();
|
||||
let error_body = resp.text().await.unwrap_or_default();
|
||||
tracing::error!("WATI send failed: {status} — {error_body}");
|
||||
anyhow::bail!("WATI API error: {status}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn listen(&self, _tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
|
||||
// WATI uses webhooks (push-based), not polling.
|
||||
// Messages are received via the gateway's /wati endpoint.
|
||||
tracing::info!(
|
||||
"WATI channel active (webhook mode). \
|
||||
Configure WATI webhook to POST to your gateway's /wati endpoint."
|
||||
);
|
||||
|
||||
// Keep the task alive — it will be cancelled when the channel shuts down
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn health_check(&self) -> bool {
|
||||
let url = format!("{}/api/ext/v3/contacts/count", self.api_url);
|
||||
|
||||
self.client
|
||||
.get(&url)
|
||||
.bearer_auth(&self.api_token)
|
||||
.send()
|
||||
.await
|
||||
.map(|r| r.status().is_success())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
|
||||
// WATI API does not support typing indicators
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
|
||||
// WATI API does not support typing indicators
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn make_channel() -> WatiChannel {
|
||||
WatiChannel {
|
||||
api_token: "test-token".into(),
|
||||
api_url: "https://live-mt-server.wati.io".into(),
|
||||
tenant_id: None,
|
||||
allowed_numbers: vec!["+1234567890".into()],
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_wildcard_channel() -> WatiChannel {
|
||||
WatiChannel {
|
||||
api_token: "test-token".into(),
|
||||
api_url: "https://live-mt-server.wati.io".into(),
|
||||
tenant_id: None,
|
||||
allowed_numbers: vec!["*".into()],
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_channel_name() {
|
||||
let ch = make_channel();
|
||||
assert_eq!(ch.name(), "wati");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_number_allowed_exact() {
|
||||
let ch = make_channel();
|
||||
assert!(ch.is_number_allowed("+1234567890"));
|
||||
assert!(!ch.is_number_allowed("+9876543210"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_number_allowed_wildcard() {
|
||||
let ch = make_wildcard_channel();
|
||||
assert!(ch.is_number_allowed("+1234567890"));
|
||||
assert!(ch.is_number_allowed("+9999999999"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_number_allowed_empty() {
|
||||
let ch = WatiChannel {
|
||||
api_token: "tok".into(),
|
||||
api_url: "https://live-mt-server.wati.io".into(),
|
||||
tenant_id: None,
|
||||
allowed_numbers: vec![],
|
||||
client: reqwest::Client::new(),
|
||||
};
|
||||
assert!(!ch.is_number_allowed("+1234567890"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_build_target_with_tenant() {
|
||||
let ch = WatiChannel {
|
||||
api_token: "tok".into(),
|
||||
api_url: "https://live-mt-server.wati.io".into(),
|
||||
tenant_id: Some("tenant1".into()),
|
||||
allowed_numbers: vec![],
|
||||
client: reqwest::Client::new(),
|
||||
};
|
||||
assert_eq!(ch.build_target("+1234567890"), "tenant1:1234567890");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_build_target_without_tenant() {
|
||||
let ch = make_channel();
|
||||
assert_eq!(ch.build_target("+1234567890"), "1234567890");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_build_target_already_prefixed() {
|
||||
let ch = WatiChannel {
|
||||
api_token: "tok".into(),
|
||||
api_url: "https://live-mt-server.wati.io".into(),
|
||||
tenant_id: Some("tenant1".into()),
|
||||
allowed_numbers: vec![],
|
||||
client: reqwest::Client::new(),
|
||||
};
|
||||
// If the phone already has the tenant prefix, don't double it
|
||||
assert_eq!(ch.build_target("tenant1:1234567890"), "tenant1:1234567890");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_valid_message() {
|
||||
let ch = make_channel();
|
||||
let payload = serde_json::json!({
|
||||
"text": "Hello from WATI!",
|
||||
"waId": "1234567890",
|
||||
"fromMe": false,
|
||||
"timestamp": 1705320000u64
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].sender, "+1234567890");
|
||||
assert_eq!(msgs[0].content, "Hello from WATI!");
|
||||
assert_eq!(msgs[0].channel, "wati");
|
||||
assert_eq!(msgs[0].reply_target, "+1234567890");
|
||||
assert_eq!(msgs[0].timestamp, 1705320000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_skip_from_me() {
|
||||
let ch = make_wildcard_channel();
|
||||
let payload = serde_json::json!({
|
||||
"text": "My own message",
|
||||
"waId": "1234567890",
|
||||
"fromMe": true
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert!(msgs.is_empty(), "fromMe messages should be skipped");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_skip_no_text() {
|
||||
let ch = make_wildcard_channel();
|
||||
let payload = serde_json::json!({
|
||||
"waId": "1234567890",
|
||||
"fromMe": false
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert!(msgs.is_empty(), "Messages without text should be skipped");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_alternative_field_names() {
|
||||
let ch = make_wildcard_channel();
|
||||
|
||||
// wa_id instead of waId, message.body instead of text
|
||||
let payload = serde_json::json!({
|
||||
"message": { "body": "Alt field test" },
|
||||
"wa_id": "1234567890",
|
||||
"from_me": false,
|
||||
"timestamp": 1705320000u64
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "Alt field test");
|
||||
assert_eq!(msgs[0].sender, "+1234567890");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_timestamp_seconds() {
|
||||
let ch = make_wildcard_channel();
|
||||
let payload = serde_json::json!({
|
||||
"text": "Test",
|
||||
"waId": "1234567890",
|
||||
"timestamp": 1705320000u64
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs[0].timestamp, 1705320000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_timestamp_milliseconds() {
|
||||
let ch = make_wildcard_channel();
|
||||
let payload = serde_json::json!({
|
||||
"text": "Test",
|
||||
"waId": "1234567890",
|
||||
"timestamp": 1705320000000u64
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs[0].timestamp, 1705320000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_timestamp_iso() {
|
||||
let ch = make_wildcard_channel();
|
||||
let payload = serde_json::json!({
|
||||
"text": "Test",
|
||||
"waId": "1234567890",
|
||||
"timestamp": "2025-01-15T12:00:00Z"
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs[0].timestamp, 1736942400);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_normalizes_phone() {
|
||||
let ch = WatiChannel {
|
||||
api_token: "tok".into(),
|
||||
api_url: "https://live-mt-server.wati.io".into(),
|
||||
tenant_id: None,
|
||||
allowed_numbers: vec!["+1234567890".into()],
|
||||
client: reqwest::Client::new(),
|
||||
};
|
||||
|
||||
// Phone without + prefix
|
||||
let payload = serde_json::json!({
|
||||
"text": "Hi",
|
||||
"waId": "1234567890",
|
||||
"fromMe": false
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].sender, "+1234567890");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_empty_payload() {
|
||||
let ch = make_channel();
|
||||
let payload = serde_json::json!({});
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert!(msgs.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_from_field_fallback() {
|
||||
let ch = make_wildcard_channel();
|
||||
// Uses "from" instead of "waId"
|
||||
let payload = serde_json::json!({
|
||||
"text": "Fallback test",
|
||||
"from": "1234567890",
|
||||
"fromMe": false
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].sender, "+1234567890");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_message_text_fallback() {
|
||||
let ch = make_wildcard_channel();
|
||||
// Uses "message.text" instead of top-level "text"
|
||||
let payload = serde_json::json!({
|
||||
"message": { "text": "Nested text" },
|
||||
"waId": "1234567890",
|
||||
"fromMe": false
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "Nested text");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wati_parse_owner_field_as_from_me() {
|
||||
let ch = make_wildcard_channel();
|
||||
// Uses "owner" field as fromMe indicator
|
||||
let payload = serde_json::json!({
|
||||
"text": "Test",
|
||||
"waId": "1234567890",
|
||||
"owner": true
|
||||
});
|
||||
|
||||
let msgs = ch.parse_webhook_payload(&payload);
|
||||
assert!(msgs.is_empty(), "owner=true messages should be skipped");
|
||||
}
|
||||
}
|
||||
@ -33,6 +33,7 @@ const SUPPORTED_PROXY_SERVICE_KEYS: &[&str] = &[
|
||||
"channel.signal",
|
||||
"channel.slack",
|
||||
"channel.telegram",
|
||||
"channel.wati",
|
||||
"channel.whatsapp",
|
||||
"tool.browser",
|
||||
"tool.composio",
|
||||
@ -2444,6 +2445,8 @@ pub struct ChannelsConfig {
|
||||
pub whatsapp: Option<WhatsAppConfig>,
|
||||
/// Linq Partner API channel configuration.
|
||||
pub linq: Option<LinqConfig>,
|
||||
/// WATI WhatsApp Business API channel configuration.
|
||||
pub wati: Option<WatiConfig>,
|
||||
/// Nextcloud Talk bot channel configuration.
|
||||
pub nextcloud_talk: Option<NextcloudTalkConfig>,
|
||||
/// Email channel configuration.
|
||||
@ -2511,6 +2514,10 @@ impl ChannelsConfig {
|
||||
Box::new(ConfigWrapper::new(&self.linq)),
|
||||
self.linq.is_some(),
|
||||
),
|
||||
(
|
||||
Box::new(ConfigWrapper::new(&self.wati)),
|
||||
self.wati.is_some(),
|
||||
),
|
||||
(
|
||||
Box::new(ConfigWrapper::new(&self.nextcloud_talk)),
|
||||
self.nextcloud_talk.is_some(),
|
||||
@ -2578,6 +2585,7 @@ impl Default for ChannelsConfig {
|
||||
signal: None,
|
||||
whatsapp: None,
|
||||
linq: None,
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
irc: None,
|
||||
@ -2887,6 +2895,35 @@ impl ChannelConfig for LinqConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// WATI WhatsApp Business API channel configuration.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct WatiConfig {
|
||||
/// WATI API token (Bearer auth).
|
||||
pub api_token: String,
|
||||
/// WATI API base URL (default: https://live-mt-server.wati.io).
|
||||
#[serde(default = "default_wati_api_url")]
|
||||
pub api_url: String,
|
||||
/// Tenant ID for multi-channel setups (optional).
|
||||
#[serde(default)]
|
||||
pub tenant_id: Option<String>,
|
||||
/// Allowed phone numbers (E.164 format) or "*" for all.
|
||||
#[serde(default)]
|
||||
pub allowed_numbers: Vec<String>,
|
||||
}
|
||||
|
||||
fn default_wati_api_url() -> String {
|
||||
"https://live-mt-server.wati.io".to_string()
|
||||
}
|
||||
|
||||
impl ChannelConfig for WatiConfig {
|
||||
fn name() -> &'static str {
|
||||
"WATI"
|
||||
}
|
||||
fn desc() -> &'static str {
|
||||
"WhatsApp via WATI Business API"
|
||||
}
|
||||
}
|
||||
|
||||
/// Nextcloud Talk bot configuration (webhook receive + OCS send API).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct NextcloudTalkConfig {
|
||||
@ -4704,6 +4741,7 @@ default_temperature = 0.7
|
||||
signal: None,
|
||||
whatsapp: None,
|
||||
linq: None,
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
irc: None,
|
||||
@ -5258,6 +5296,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
signal: None,
|
||||
whatsapp: None,
|
||||
linq: None,
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
irc: None,
|
||||
@ -5471,6 +5510,7 @@ channel_id = "C123"
|
||||
allowed_numbers: vec!["+1".into()],
|
||||
}),
|
||||
linq: None,
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
irc: None,
|
||||
|
||||
@ -12,7 +12,7 @@ pub mod sse;
|
||||
pub mod static_files;
|
||||
pub mod ws;
|
||||
|
||||
use crate::channels::{Channel, LinqChannel, NextcloudTalkChannel, SendMessage, WhatsAppChannel};
|
||||
use crate::channels::{Channel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel, WhatsAppChannel};
|
||||
use crate::config::Config;
|
||||
use crate::cost::CostTracker;
|
||||
use crate::memory::{self, Memory, MemoryCategory};
|
||||
@ -64,6 +64,10 @@ fn linq_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
|
||||
format!("linq_{}_{}", msg.sender, msg.id)
|
||||
}
|
||||
|
||||
fn wati_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
|
||||
format!("wati_{}_{}", msg.sender, msg.id)
|
||||
}
|
||||
|
||||
fn nextcloud_talk_memory_key(msg: &crate::channels::traits::ChannelMessage) -> String {
|
||||
format!("nextcloud_talk_{}_{}", msg.sender, msg.id)
|
||||
}
|
||||
@ -295,6 +299,7 @@ pub struct AppState {
|
||||
pub nextcloud_talk: Option<Arc<NextcloudTalkChannel>>,
|
||||
/// Nextcloud Talk webhook secret for signature verification
|
||||
pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
|
||||
pub wati: Option<Arc<WatiChannel>>,
|
||||
/// Observability backend for metrics scraping
|
||||
pub observer: Arc<dyn crate::observability::Observer>,
|
||||
/// Registered tool specs (for web dashboard tools page)
|
||||
@ -474,6 +479,17 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
})
|
||||
.map(Arc::from);
|
||||
|
||||
// WATI channel (if configured)
|
||||
let wati_channel: Option<Arc<WatiChannel>> =
|
||||
config.channels_config.wati.as_ref().map(|wati_cfg| {
|
||||
Arc::new(WatiChannel::new(
|
||||
wati_cfg.api_token.clone(),
|
||||
wati_cfg.api_url.clone(),
|
||||
wati_cfg.tenant_id.clone(),
|
||||
wati_cfg.allowed_numbers.clone(),
|
||||
))
|
||||
});
|
||||
|
||||
// Nextcloud Talk channel (if configured)
|
||||
let nextcloud_talk_channel: Option<Arc<NextcloudTalkChannel>> =
|
||||
config.channels_config.nextcloud_talk.as_ref().map(|nc| {
|
||||
@ -563,6 +579,10 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
if linq_channel.is_some() {
|
||||
println!(" POST /linq — Linq message webhook (iMessage/RCS/SMS)");
|
||||
}
|
||||
if wati_channel.is_some() {
|
||||
println!(" GET /wati — WATI webhook verification");
|
||||
println!(" POST /wati — WATI message webhook");
|
||||
}
|
||||
if nextcloud_talk_channel.is_some() {
|
||||
println!(" POST /nextcloud-talk — Nextcloud Talk bot webhook");
|
||||
}
|
||||
@ -616,6 +636,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
linq_signing_secret,
|
||||
nextcloud_talk: nextcloud_talk_channel,
|
||||
nextcloud_talk_webhook_secret,
|
||||
wati: wati_channel,
|
||||
observer: broadcast_observer,
|
||||
tools_registry,
|
||||
cost_tracker,
|
||||
@ -637,6 +658,8 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
.route("/whatsapp", get(handle_whatsapp_verify))
|
||||
.route("/whatsapp", post(handle_whatsapp_message))
|
||||
.route("/linq", post(handle_linq_webhook))
|
||||
.route("/wati", get(handle_wati_verify))
|
||||
.route("/wati", post(handle_wati_webhook))
|
||||
.route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
|
||||
// ── Web Dashboard API routes ──
|
||||
.route("/api/status", get(api::handle_api_status))
|
||||
@ -1293,6 +1316,101 @@ async fn handle_linq_webhook(
|
||||
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
|
||||
}
|
||||
|
||||
/// GET /wati — WATI webhook verification (echoes hub.challenge)
|
||||
async fn handle_wati_verify(
|
||||
State(state): State<AppState>,
|
||||
Query(params): Query<WatiVerifyQuery>,
|
||||
) -> impl IntoResponse {
|
||||
if state.wati.is_none() {
|
||||
return (StatusCode::NOT_FOUND, "WATI not configured".to_string());
|
||||
}
|
||||
|
||||
// WATI may use Meta-style webhook verification; echo the challenge
|
||||
if let Some(challenge) = params.challenge {
|
||||
tracing::info!("WATI webhook verified successfully");
|
||||
return (StatusCode::OK, challenge);
|
||||
}
|
||||
|
||||
(StatusCode::BAD_REQUEST, "Missing hub.challenge".to_string())
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct WatiVerifyQuery {
|
||||
#[serde(rename = "hub.challenge")]
|
||||
pub challenge: Option<String>,
|
||||
}
|
||||
|
||||
/// POST /wati — incoming WATI WhatsApp message webhook
|
||||
async fn handle_wati_webhook(
|
||||
State(state): State<AppState>,
|
||||
body: Bytes,
|
||||
) -> impl IntoResponse {
|
||||
let Some(ref wati) = state.wati else {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "WATI not configured"})),
|
||||
);
|
||||
};
|
||||
|
||||
// Parse JSON body
|
||||
let Ok(payload) = serde_json::from_slice::<serde_json::Value>(&body) else {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Invalid JSON payload"})),
|
||||
);
|
||||
};
|
||||
|
||||
// Parse messages from the webhook payload
|
||||
let messages = wati.parse_webhook_payload(&payload);
|
||||
|
||||
if messages.is_empty() {
|
||||
return (StatusCode::OK, Json(serde_json::json!({"status": "ok"})));
|
||||
}
|
||||
|
||||
// Process each message
|
||||
for msg in &messages {
|
||||
tracing::info!(
|
||||
"WATI message from {}: {}",
|
||||
msg.sender,
|
||||
truncate_with_ellipsis(&msg.content, 50)
|
||||
);
|
||||
|
||||
// Auto-save to memory
|
||||
if state.auto_save {
|
||||
let key = wati_memory_key(msg);
|
||||
let _ = state
|
||||
.mem
|
||||
.store(&key, &msg.content, MemoryCategory::Conversation, None)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Call the LLM
|
||||
match run_gateway_chat_with_tools(&state, &msg.content).await {
|
||||
Ok(response) => {
|
||||
// Send reply via WATI
|
||||
if let Err(e) = wati
|
||||
.send(&SendMessage::new(response, &msg.reply_target))
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to send WATI reply: {e}");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("LLM error for WATI message: {e:#}");
|
||||
let _ = wati
|
||||
.send(&SendMessage::new(
|
||||
"Sorry, I couldn't process your message right now.",
|
||||
&msg.reply_target,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Acknowledge the webhook
|
||||
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
|
||||
}
|
||||
|
||||
/// POST /nextcloud-talk — incoming message webhook (Nextcloud Talk bot API)
|
||||
async fn handle_nextcloud_talk_webhook(
|
||||
State(state): State<AppState>,
|
||||
@ -1472,6 +1590,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -1885,6 +2004,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -1948,6 +2068,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2023,6 +2144,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2070,6 +2192,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2122,6 +2245,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2179,6 +2303,7 @@ mod tests {
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user