diff --git a/src/channels/whatsapp_web.rs b/src/channels/whatsapp_web.rs index a16ba4338..ad1b4df35 100644 --- a/src/channels/whatsapp_web.rs +++ b/src/channels/whatsapp_web.rs @@ -233,6 +233,48 @@ impl WhatsAppWebChannel { Ok(wa_rs_binary::jid::Jid::pn(digits)) } + + // ── Reconnect state-machine helpers (used by listen() and tested directly) ── + + /// Reconnect retry constants. + const MAX_RETRIES: u32 = 10; + const BASE_DELAY_SECS: u64 = 3; + const MAX_DELAY_SECS: u64 = 300; + + /// Compute the exponential-backoff delay for a given 1-based attempt number. + /// Doubles each attempt from `BASE_DELAY_SECS`, capped at `MAX_DELAY_SECS`. + fn compute_retry_delay(attempt: u32) -> u64 { + std::cmp::min( + Self::BASE_DELAY_SECS.saturating_mul(2u64.saturating_pow(attempt.saturating_sub(1))), + Self::MAX_DELAY_SECS, + ) + } + + /// Determine whether session files should be purged. + /// Returns `true` only when `Event::LoggedOut` was explicitly observed. + fn should_purge_session(session_revoked: &std::sync::atomic::AtomicBool) -> bool { + session_revoked.load(std::sync::atomic::Ordering::Relaxed) + } + + /// Record a reconnect attempt and return `(attempt_number, exceeded_max)`. + fn record_retry(retry_count: &std::sync::atomic::AtomicU32) -> (u32, bool) { + let attempts = retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + (attempts, attempts > Self::MAX_RETRIES) + } + + /// Reset the retry counter (called on `Event::Connected`). + fn reset_retry(retry_count: &std::sync::atomic::AtomicU32) { + retry_count.store(0, std::sync::atomic::Ordering::Relaxed); + } + + /// Return the session file paths to remove (primary + WAL + SHM sidecars). + fn session_file_paths(expanded_session_path: &str) -> [String; 3] { + [ + expanded_session_path.to_string(), + format!("{expanded_session_path}-wal"), + format!("{expanded_session_path}-shm"), + ] + } } #[cfg(feature = "whatsapp-web")] @@ -288,198 +330,287 @@ impl Channel for WhatsAppWebChannel { use wa_rs_tokio_transport::TokioWebSocketTransportFactory; use wa_rs_ureq_http::UreqHttpClient; - tracing::info!( - "WhatsApp Web channel starting (session: {})", - self.session_path - ); + let retry_count = Arc::new(std::sync::atomic::AtomicU32::new(0)); - // Initialize storage backend - let storage = RusqliteStore::new(&self.session_path)?; - let backend = Arc::new(storage); + loop { + let expanded_session_path = shellexpand::tilde(&self.session_path).to_string(); - // Check if we have a saved device to load - let mut device = Device::new(backend.clone()); - if backend.exists().await? { - tracing::info!("WhatsApp Web: found existing session, loading device"); - if let Some(core_device) = backend.load().await? { - device.load_from_serializable(core_device); - } else { - anyhow::bail!("Device exists but failed to load"); - } - } else { tracing::info!( - "WhatsApp Web: no existing session, new device will be created during pairing" + "WhatsApp Web channel starting (session: {})", + expanded_session_path ); - }; - // Create transport factory - let mut transport_factory = TokioWebSocketTransportFactory::new(); - if let Ok(ws_url) = std::env::var("WHATSAPP_WS_URL") { - transport_factory = transport_factory.with_url(ws_url); - } + // Initialize storage backend + let storage = RusqliteStore::new(&expanded_session_path)?; + let backend = Arc::new(storage); - // Create HTTP client for media operations - let http_client = UreqHttpClient::new(); - - // Build the bot - let tx_clone = tx.clone(); - let allowed_numbers = self.allowed_numbers.clone(); - - let mut builder = Bot::builder() - .with_backend(backend) - .with_transport_factory(transport_factory) - .with_http_client(http_client) - .on_event(move |event, _client| { - let tx_inner = tx_clone.clone(); - let allowed_numbers = allowed_numbers.clone(); - async move { - match event { - Event::Message(msg, info) => { - // Extract message content - let text = msg.text_content().unwrap_or(""); - let sender_jid = info.source.sender.clone(); - let sender_alt = info.source.sender_alt.clone(); - let sender = sender_jid.user().to_string(); - let chat = info.source.chat.to_string(); - - tracing::info!( - "WhatsApp Web message from {} in {}: {}", - sender, - chat, - text - ); - - let mapped_phone = if sender_jid.is_lid() { - _client.get_phone_number_from_lid(&sender_jid.user).await - } else { - None - }; - let sender_candidates = Self::sender_phone_candidates( - &sender_jid, - sender_alt.as_ref(), - mapped_phone.as_deref(), - ); - - if let Some(normalized) = sender_candidates - .iter() - .find(|candidate| { - Self::is_number_allowed_for_list(&allowed_numbers, candidate) - }) - .cloned() - { - let trimmed = text.trim(); - if trimmed.is_empty() { - tracing::debug!( - "WhatsApp Web: ignoring empty or non-text message from {}", - normalized - ); - return; - } - - if let Err(e) = tx_inner - .send(ChannelMessage { - id: uuid::Uuid::new_v4().to_string(), - channel: "whatsapp".to_string(), - sender: normalized.clone(), - // Reply to the originating chat JID (DM or group). - reply_target: chat, - content: trimmed.to_string(), - timestamp: chrono::Utc::now().timestamp() as u64, - thread_ts: None, - }) - .await - { - tracing::error!("Failed to send message to channel: {}", e); - } - } else { - tracing::warn!( - "WhatsApp Web: message from {} not in allowed list (candidates: {:?})", - sender_jid, - sender_candidates - ); - } - } - Event::Connected(_) => { - tracing::info!("WhatsApp Web connected successfully"); - } - Event::LoggedOut(_) => { - tracing::warn!("WhatsApp Web was logged out"); - } - Event::StreamError(stream_error) => { - tracing::error!("WhatsApp Web stream error: {:?}", stream_error); - } - Event::PairingCode { code, .. } => { - tracing::info!("WhatsApp Web pair code received: {}", code); - tracing::info!( - "Link your phone by entering this code in WhatsApp > Linked Devices" - ); - } - Event::PairingQrCode { code, .. } => { - tracing::info!( - "WhatsApp Web QR code received (scan with WhatsApp > Linked Devices)" - ); - match Self::render_pairing_qr(&code) { - Ok(rendered) => { - eprintln!(); - eprintln!( - "WhatsApp Web QR code (scan in WhatsApp > Linked Devices):" - ); - eprintln!("{rendered}"); - eprintln!(); - } - Err(err) => { - tracing::warn!( - "WhatsApp Web: failed to render pairing QR in terminal: {}", - err - ); - tracing::info!("WhatsApp Web QR payload: {}", code); - } - } - } - _ => {} - } + // Check if we have a saved device to load + let mut device = Device::new(backend.clone()); + if backend.exists().await? { + tracing::info!("WhatsApp Web: found existing session, loading device"); + if let Some(core_device) = backend.load().await? { + device.load_from_serializable(core_device); + } else { + anyhow::bail!("Device exists but failed to load"); } - }) - ; + } else { + tracing::info!( + "WhatsApp Web: no existing session, new device will be created during pairing" + ); + }; - // Configure pair-code flow when a phone number is provided. - if let Some(ref phone) = self.pair_phone { - tracing::info!("WhatsApp Web: pair-code flow enabled for configured phone number"); - builder = builder.with_pair_code(PairCodeOptions { - phone_number: phone.clone(), - custom_code: self.pair_code.clone(), - ..Default::default() - }); - } else if self.pair_code.is_some() { - tracing::warn!( - "WhatsApp Web: pair_code is set but pair_phone is missing; pair code config is ignored" - ); - } - - let mut bot = builder.build().await?; - *self.client.lock() = Some(bot.client()); - - // Run the bot - let bot_handle = bot.run().await?; - - // Store the bot handle for later shutdown - *self.bot_handle.lock() = Some(bot_handle); - - // Wait for shutdown signal - let (_shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); - - select! { - _ = shutdown_rx.recv() => { - tracing::info!("WhatsApp Web channel shutting down"); + // Create transport factory + let mut transport_factory = TokioWebSocketTransportFactory::new(); + if let Ok(ws_url) = std::env::var("WHATSAPP_WS_URL") { + transport_factory = transport_factory.with_url(ws_url); } - _ = tokio::signal::ctrl_c() => { - tracing::info!("WhatsApp Web channel received Ctrl+C"); - } - } - *self.client.lock() = None; - if let Some(handle) = self.bot_handle.lock().take() { - handle.abort(); + // Create HTTP client for media operations + let http_client = UreqHttpClient::new(); + + // Channel to signal logout from the event handler back to the listen loop. + let (logout_tx, mut logout_rx) = tokio::sync::broadcast::channel::<()>(1); + + // Tracks whether Event::LoggedOut actually fired (vs task crash). + let session_revoked = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // Build the bot + let tx_clone = tx.clone(); + let allowed_numbers = self.allowed_numbers.clone(); + let logout_tx_clone = logout_tx.clone(); + let retry_count_clone = retry_count.clone(); + let session_revoked_clone = session_revoked.clone(); + + let mut builder = Bot::builder() + .with_backend(backend) + .with_transport_factory(transport_factory) + .with_http_client(http_client) + .on_event(move |event, _client| { + let tx_inner = tx_clone.clone(); + let allowed_numbers = allowed_numbers.clone(); + let logout_tx = logout_tx_clone.clone(); + let retry_count = retry_count_clone.clone(); + let session_revoked = session_revoked_clone.clone(); + async move { + match event { + Event::Message(msg, info) => { + // Extract message content + let text = msg.text_content().unwrap_or(""); + let sender_jid = info.source.sender.clone(); + let sender_alt = info.source.sender_alt.clone(); + let sender = sender_jid.user().to_string(); + let chat = info.source.chat.to_string(); + + tracing::info!( + "WhatsApp Web message received (sender_len={}, chat_len={}, text_len={})", + sender.len(), + chat.len(), + text.len() + ); + tracing::debug!( + "WhatsApp Web message content: {}", + text + ); + + let mapped_phone = if sender_jid.is_lid() { + _client.get_phone_number_from_lid(&sender_jid.user).await + } else { + None + }; + let sender_candidates = Self::sender_phone_candidates( + &sender_jid, + sender_alt.as_ref(), + mapped_phone.as_deref(), + ); + + if let Some(normalized) = sender_candidates + .iter() + .find(|candidate| { + Self::is_number_allowed_for_list(&allowed_numbers, candidate) + }) + .cloned() + { + let trimmed = text.trim(); + if trimmed.is_empty() { + tracing::debug!( + "WhatsApp Web: ignoring empty or non-text message from {}", + normalized + ); + return; + } + + if let Err(e) = tx_inner + .send(ChannelMessage { + id: uuid::Uuid::new_v4().to_string(), + channel: "whatsapp".to_string(), + sender: normalized.clone(), + // Reply to the originating chat JID (DM or group). + reply_target: chat, + content: trimmed.to_string(), + timestamp: chrono::Utc::now().timestamp() as u64, + thread_ts: None, + }) + .await + { + tracing::error!("Failed to send message to channel: {}", e); + } + } else { + tracing::warn!( + "WhatsApp Web: message from unrecognized sender not in allowed list (candidates_count={})", + sender_candidates.len() + ); + } + } + Event::Connected(_) => { + tracing::info!("WhatsApp Web connected successfully"); + WhatsAppWebChannel::reset_retry(&retry_count); + } + Event::LoggedOut(_) => { + session_revoked.store(true, std::sync::atomic::Ordering::Relaxed); + tracing::warn!( + "WhatsApp Web was logged out — will clear session and reconnect" + ); + let _ = logout_tx.send(()); + } + Event::StreamError(stream_error) => { + tracing::error!("WhatsApp Web stream error: {:?}", stream_error); + } + Event::PairingCode { code, .. } => { + tracing::info!("WhatsApp Web pair code received"); + tracing::info!( + "Link your phone by entering this code in WhatsApp > Linked Devices" + ); + eprintln!(); + eprintln!("WhatsApp Web pair code: {code}"); + eprintln!(); + } + Event::PairingQrCode { code, .. } => { + tracing::info!( + "WhatsApp Web QR code received (scan with WhatsApp > Linked Devices)" + ); + match Self::render_pairing_qr(&code) { + Ok(rendered) => { + eprintln!(); + eprintln!( + "WhatsApp Web QR code (scan in WhatsApp > Linked Devices):" + ); + eprintln!("{rendered}"); + eprintln!(); + } + Err(err) => { + tracing::warn!( + "WhatsApp Web: failed to render pairing QR in terminal: {}", + err + ); + eprintln!(); + eprintln!("WhatsApp Web QR payload: {code}"); + eprintln!(); + } + } + } + _ => {} + } + } + }); + + // Configure pair-code flow when a phone number is provided. + if let Some(ref phone) = self.pair_phone { + tracing::info!("WhatsApp Web: pair-code flow enabled for configured phone number"); + builder = builder.with_pair_code(PairCodeOptions { + phone_number: phone.clone(), + custom_code: self.pair_code.clone(), + ..Default::default() + }); + } else if self.pair_code.is_some() { + tracing::warn!( + "WhatsApp Web: pair_code is set but pair_phone is missing; pair code config is ignored" + ); + } + + let mut bot = builder.build().await?; + *self.client.lock() = Some(bot.client()); + + // Run the bot + let bot_handle = bot.run().await?; + + // Store the bot handle for later shutdown + *self.bot_handle.lock() = Some(bot_handle); + + // Drop the outer sender so logout_rx.recv() returns Err when the + // bot task ends without emitting LoggedOut (e.g. crash/panic). + drop(logout_tx); + + // Wait for a logout signal or process shutdown. + let should_reconnect = select! { + res = logout_rx.recv() => { + // Both Ok(()) and Err (sender dropped) mean the session ended. + let _ = res; + true + } + _ = tokio::signal::ctrl_c() => { + tracing::info!("WhatsApp Web channel received Ctrl+C"); + false + } + }; + + *self.client.lock() = None; + if let Some(handle) = self.bot_handle.lock().take() { + handle.abort(); + // Await the aborted task so background I/O finishes before + // we delete session files. + let _ = handle.await; + } + + // Drop bot/device so the SQLite connection is closed + // before we remove session files (releases WAL/SHM locks). + // `backend` was moved into the builder, so dropping `bot` + // releases the last Arc reference to the storage backend. + drop(bot); + drop(device); + + if should_reconnect { + let (attempts, exceeded) = Self::record_retry(&retry_count); + if exceeded { + anyhow::bail!( + "WhatsApp Web: exceeded {} reconnect attempts, giving up", + Self::MAX_RETRIES + ); + } + + // Only purge session files when LoggedOut was explicitly observed. + // A transient task crash (Err from recv) should not wipe a valid session. + if Self::should_purge_session(&session_revoked) { + for path in Self::session_file_paths(&expanded_session_path) { + match tokio::fs::remove_file(&path).await { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => tracing::warn!( + "WhatsApp Web: failed to remove session file {}: {e}", + path + ), + } + } + tracing::info!( + "WhatsApp Web: session files removed, restarting for QR pairing" + ); + } else { + tracing::warn!( + "WhatsApp Web: bot stopped without LoggedOut; reconnecting with existing session" + ); + } + + let delay = Self::compute_retry_delay(attempts); + tracing::info!( + "WhatsApp Web: reconnecting in {}s (attempt {}/{})", + delay, + attempts, + Self::MAX_RETRIES + ); + tokio::time::sleep(std::time::Duration::from_secs(delay)).await; + continue; + } + + break; } Ok(()) @@ -720,4 +851,101 @@ mod tests { let ch = make_channel(); assert!(!ch.health_check().await); } + + // ── Reconnect retry state machine tests (exercise production helpers) ── + + #[test] + #[cfg(feature = "whatsapp-web")] + fn compute_retry_delay_doubles_with_cap() { + // Uses the production helper that listen() calls for backoff. + // attempt 1 → 3s, 2 → 6s, 3 → 12s, … 7 → 192s, 8 → 300s (capped) + let expected = [3, 6, 12, 24, 48, 96, 192, 300, 300, 300]; + for (i, &want) in expected.iter().enumerate() { + let attempt = (i + 1) as u32; + assert_eq!( + WhatsAppWebChannel::compute_retry_delay(attempt), + want, + "attempt {attempt}" + ); + } + } + + #[test] + #[cfg(feature = "whatsapp-web")] + fn compute_retry_delay_zero_attempt() { + // Edge case: attempt 0 should still produce BASE (saturating_sub clamps). + assert_eq!( + WhatsAppWebChannel::compute_retry_delay(0), + WhatsAppWebChannel::BASE_DELAY_SECS + ); + } + + #[test] + #[cfg(feature = "whatsapp-web")] + fn record_retry_increments_and_detects_exceeded() { + use std::sync::atomic::AtomicU32; + let counter = AtomicU32::new(0); + + // First MAX_RETRIES attempts should not exceed. + for i in 1..=WhatsAppWebChannel::MAX_RETRIES { + let (attempt, exceeded) = WhatsAppWebChannel::record_retry(&counter); + assert_eq!(attempt, i); + assert!(!exceeded, "attempt {i} should not exceed max"); + } + + // Next attempt exceeds the limit. + let (attempt, exceeded) = WhatsAppWebChannel::record_retry(&counter); + assert_eq!(attempt, WhatsAppWebChannel::MAX_RETRIES + 1); + assert!(exceeded); + } + + #[test] + #[cfg(feature = "whatsapp-web")] + fn reset_retry_clears_counter() { + use std::sync::atomic::{AtomicU32, Ordering}; + let counter = AtomicU32::new(0); + + // Simulate several reconnect attempts via the production helper. + for _ in 0..5 { + WhatsAppWebChannel::record_retry(&counter); + } + assert_eq!(counter.load(Ordering::Relaxed), 5); + + // Event::Connected calls reset_retry — verify it zeroes the counter. + WhatsAppWebChannel::reset_retry(&counter); + assert_eq!(counter.load(Ordering::Relaxed), 0); + + // After reset, record_retry starts from 1 again. + let (attempt, exceeded) = WhatsAppWebChannel::record_retry(&counter); + assert_eq!(attempt, 1); + assert!(!exceeded); + } + + #[test] + #[cfg(feature = "whatsapp-web")] + fn should_purge_session_only_when_revoked() { + use std::sync::atomic::AtomicBool; + let flag = AtomicBool::new(false); + + // Transient crash: flag is false → should NOT purge. + assert!(!WhatsAppWebChannel::should_purge_session(&flag)); + + // Explicit LoggedOut: flag set to true → should purge. + flag.store(true, std::sync::atomic::Ordering::Relaxed); + assert!(WhatsAppWebChannel::should_purge_session(&flag)); + } + + #[test] + #[cfg(feature = "whatsapp-web")] + fn session_file_paths_includes_wal_and_shm() { + let paths = WhatsAppWebChannel::session_file_paths("/tmp/test.db"); + assert_eq!( + paths, + [ + "/tmp/test.db".to_string(), + "/tmp/test.db-wal".to_string(), + "/tmp/test.db-shm".to_string(), + ] + ); + } }