fix(channels): address critical security bugs in Gmail Pub/Sub push (#4200)
* feat(channels): add Gmail Pub/Sub push notifications for real-time email
Add GmailPushChannel that replaces IMAP polling with Google's Pub/Sub
push notification system for real-time email-driven automation.
- New channel at src/channels/gmail_push.rs implementing the Channel trait
- Registers Gmail watch subscription (POST /gmail/v1/users/me/watch)
with automatic renewal before the 7-day expiry
- Handles incoming Pub/Sub notifications at POST /webhook/gmail
- Fetches new messages via Gmail History API (startHistoryId-based)
- Dispatches email messages to the agent with full metadata
- Sends replies via Gmail messages.send API
- Config: gmail_push.enabled, topic, label_filter, oauth_token,
allowed_senders, webhook_url
- OAuth token encrypted at rest via existing secret store
- Webhook endpoint added to gateway router
- 30+ unit tests covering notification parsing, header extraction,
body decoding, sender allowlist, and config serialization
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix(config): fix pre-existing test compilation errors in schema.rs
- Remove #[cfg(unix)] gate on `use tempfile::TempDir` import since
TempDir is used unconditionally in bootstrap file tests
- Add explicit type annotations on tokio::fs::* calls to resolve
type inference failures (create_dir_all, write, read_to_string)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix(channels): fix extract_body_text_plain test
Gmail API sends base64url without padding. The decode_body function
converted URL-safe chars back to standard base64 but did not restore
the padding, causing STANDARD decoder to fail and falling back to
snippet. Add padding restoration before decoding.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix(channels): address critical security bugs in Gmail Pub/Sub push
- Add webhook authentication via shared secret (webhook_secret config
field or GMAIL_PUSH_WEBHOOK_SECRET env var), preventing unauthorized
message injection through the unauthenticated webhook endpoint
- Add 1MB body size limit on webhook endpoint to prevent memory exhaustion
- Fix race condition in handle_notification: hold history_id lock across
the read-fetch-update cycle to prevent duplicate message processing
when concurrent webhook notifications arrive
- Sanitize RFC 2822 headers (To/Subject) to prevent CRLF injection
attacks that could add arbitrary headers to outgoing emails
- Fix extract_email_from_header panic on malformed angle brackets by
using rfind('>') and validating bracket ordering
- Add 30s default HTTP client timeout for all Gmail API calls,
preventing indefinite hangs
- Clone tx sender before message processing loop to avoid holding
the mutex lock across network calls
---------
Co-authored-by: Giulio V <vannini.gv@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8e81d44d54
commit
35a5451a17
1146
src/channels/gmail_push.rs
Normal file
1146
src/channels/gmail_push.rs
Normal file
File diff suppressed because it is too large
Load Diff
@ -20,6 +20,7 @@ pub mod cli;
|
||||
pub mod dingtalk;
|
||||
pub mod discord;
|
||||
pub mod email_channel;
|
||||
pub mod gmail_push;
|
||||
pub mod imessage;
|
||||
pub mod irc;
|
||||
#[cfg(feature = "channel-lark")]
|
||||
@ -62,6 +63,7 @@ pub use cli::CliChannel;
|
||||
pub use dingtalk::DingTalkChannel;
|
||||
pub use discord::DiscordChannel;
|
||||
pub use email_channel::EmailChannel;
|
||||
pub use gmail_push::GmailPushChannel;
|
||||
pub use imessage::IMessageChannel;
|
||||
pub use irc::IrcChannel;
|
||||
#[cfg(feature = "channel-lark")]
|
||||
@ -3970,6 +3972,15 @@ fn collect_configured_channels(
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(ref gp_cfg) = config.channels_config.gmail_push {
|
||||
if gp_cfg.enabled {
|
||||
channels.push(ConfiguredChannel {
|
||||
display_name: "Gmail Push",
|
||||
channel: Arc::new(GmailPushChannel::new(gp_cfg.clone())),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref irc) = config.channels_config.irc {
|
||||
channels.push(ConfiguredChannel {
|
||||
display_name: "IRC",
|
||||
|
||||
@ -4944,6 +4944,8 @@ pub struct ChannelsConfig {
|
||||
pub nextcloud_talk: Option<NextcloudTalkConfig>,
|
||||
/// Email channel configuration.
|
||||
pub email: Option<crate::channels::email_channel::EmailConfig>,
|
||||
/// Gmail Pub/Sub push notification channel configuration.
|
||||
pub gmail_push: Option<crate::channels::gmail_push::GmailPushConfig>,
|
||||
/// IRC channel configuration.
|
||||
pub irc: Option<IrcConfig>,
|
||||
/// Lark channel configuration.
|
||||
@ -5053,6 +5055,10 @@ impl ChannelsConfig {
|
||||
Box::new(ConfigWrapper::new(self.email.as_ref())),
|
||||
self.email.is_some(),
|
||||
),
|
||||
(
|
||||
Box::new(ConfigWrapper::new(self.gmail_push.as_ref())),
|
||||
self.gmail_push.is_some(),
|
||||
),
|
||||
(
|
||||
Box::new(ConfigWrapper::new(self.irc.as_ref())),
|
||||
self.irc.is_some()
|
||||
@ -5137,6 +5143,7 @@ impl Default for ChannelsConfig {
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
gmail_push: None,
|
||||
irc: None,
|
||||
lark: None,
|
||||
feishu: None,
|
||||
@ -7741,6 +7748,13 @@ impl Config {
|
||||
"config.channels_config.email.password",
|
||||
)?;
|
||||
}
|
||||
if let Some(ref mut gp) = config.channels_config.gmail_push {
|
||||
decrypt_secret(
|
||||
&store,
|
||||
&mut gp.oauth_token,
|
||||
"config.channels_config.gmail_push.oauth_token",
|
||||
)?;
|
||||
}
|
||||
if let Some(ref mut irc) = config.channels_config.irc {
|
||||
decrypt_optional_secret(
|
||||
&store,
|
||||
@ -9172,6 +9186,13 @@ impl Config {
|
||||
"config.channels_config.email.password",
|
||||
)?;
|
||||
}
|
||||
if let Some(ref mut gp) = config_to_save.channels_config.gmail_push {
|
||||
encrypt_secret(
|
||||
&store,
|
||||
&mut gp.oauth_token,
|
||||
"config.channels_config.gmail_push.oauth_token",
|
||||
)?;
|
||||
}
|
||||
if let Some(ref mut irc) = config_to_save.channels_config.irc {
|
||||
encrypt_optional_secret(
|
||||
&store,
|
||||
@ -9810,6 +9831,7 @@ default_temperature = 0.7
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
gmail_push: None,
|
||||
irc: None,
|
||||
lark: None,
|
||||
feishu: None,
|
||||
@ -10659,6 +10681,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
gmail_push: None,
|
||||
irc: None,
|
||||
lark: None,
|
||||
feishu: None,
|
||||
@ -10980,6 +11003,7 @@ channel_id = "C123"
|
||||
wati: None,
|
||||
nextcloud_talk: None,
|
||||
email: None,
|
||||
gmail_push: None,
|
||||
irc: None,
|
||||
lark: None,
|
||||
feishu: None,
|
||||
|
||||
@ -1429,6 +1429,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
|
||||
@ -18,8 +18,8 @@ pub mod static_files;
|
||||
pub mod ws;
|
||||
|
||||
use crate::channels::{
|
||||
session_backend::SessionBackend, session_sqlite::SqliteSessionBackend, Channel, LinqChannel,
|
||||
NextcloudTalkChannel, SendMessage, WatiChannel, WhatsAppChannel,
|
||||
session_backend::SessionBackend, session_sqlite::SqliteSessionBackend, Channel,
|
||||
GmailPushChannel, LinqChannel, NextcloudTalkChannel, SendMessage, WatiChannel, WhatsAppChannel,
|
||||
};
|
||||
use crate::config::Config;
|
||||
use crate::cost::CostTracker;
|
||||
@ -338,6 +338,8 @@ pub struct AppState {
|
||||
/// Nextcloud Talk webhook secret for signature verification
|
||||
pub nextcloud_talk_webhook_secret: Option<Arc<str>>,
|
||||
pub wati: Option<Arc<WatiChannel>>,
|
||||
/// Gmail Pub/Sub push notification channel
|
||||
pub gmail_push: Option<Arc<GmailPushChannel>>,
|
||||
/// Observability backend for metrics scraping
|
||||
pub observer: Arc<dyn crate::observability::Observer>,
|
||||
/// Registered tool specs (for web dashboard tools page)
|
||||
@ -636,6 +638,14 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
})
|
||||
.map(Arc::from);
|
||||
|
||||
// Gmail Push channel (if configured and enabled)
|
||||
let gmail_push_channel: Option<Arc<GmailPushChannel>> = config
|
||||
.channels_config
|
||||
.gmail_push
|
||||
.as_ref()
|
||||
.filter(|gp| gp.enabled)
|
||||
.map(|gp| Arc::new(GmailPushChannel::new(gp.clone())));
|
||||
|
||||
// ── Session persistence for WS chat ─────────────────────
|
||||
let session_backend: Option<Arc<dyn SessionBackend>> = if config.gateway.session_persistence {
|
||||
match SqliteSessionBackend::new(&config.workspace_dir) {
|
||||
@ -807,6 +817,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
nextcloud_talk: nextcloud_talk_channel,
|
||||
nextcloud_talk_webhook_secret,
|
||||
wati: wati_channel,
|
||||
gmail_push: gmail_push_channel,
|
||||
observer: broadcast_observer,
|
||||
tools_registry,
|
||||
cost_tracker,
|
||||
@ -842,6 +853,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
.route("/wati", get(handle_wati_verify))
|
||||
.route("/wati", post(handle_wati_webhook))
|
||||
.route("/nextcloud-talk", post(handle_nextcloud_talk_webhook))
|
||||
.route("/webhook/gmail", post(handle_gmail_push_webhook))
|
||||
// ── Web Dashboard API routes ──
|
||||
.route("/api/status", get(api::handle_api_status))
|
||||
.route("/api/config", get(api::handle_api_config_get))
|
||||
@ -1834,6 +1846,74 @@ async fn handle_nextcloud_talk_webhook(
|
||||
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
|
||||
}
|
||||
|
||||
/// Maximum request body size for the Gmail webhook endpoint (1 MB).
|
||||
/// Google Pub/Sub messages are typically under 10 KB.
|
||||
const GMAIL_WEBHOOK_MAX_BODY: usize = 1024 * 1024;
|
||||
|
||||
/// POST /webhook/gmail — incoming Gmail Pub/Sub push notification
|
||||
async fn handle_gmail_push_webhook(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
body: Bytes,
|
||||
) -> impl IntoResponse {
|
||||
let Some(ref gmail_push) = state.gmail_push else {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "Gmail push not configured"})),
|
||||
);
|
||||
};
|
||||
|
||||
// Enforce body size limit.
|
||||
if body.len() > GMAIL_WEBHOOK_MAX_BODY {
|
||||
return (
|
||||
StatusCode::PAYLOAD_TOO_LARGE,
|
||||
Json(serde_json::json!({"error": "Request body too large"})),
|
||||
);
|
||||
}
|
||||
|
||||
// Authenticate the webhook request using a shared secret.
|
||||
let secret = gmail_push.resolve_webhook_secret();
|
||||
if !secret.is_empty() {
|
||||
let provided = headers
|
||||
.get(axum::http::header::AUTHORIZATION)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|auth| auth.strip_prefix("Bearer "))
|
||||
.unwrap_or("");
|
||||
|
||||
if provided != secret {
|
||||
tracing::warn!("Gmail push webhook: unauthorized request");
|
||||
return (
|
||||
StatusCode::UNAUTHORIZED,
|
||||
Json(serde_json::json!({"error": "Unauthorized"})),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let body_str = String::from_utf8_lossy(&body);
|
||||
let envelope: crate::channels::gmail_push::PubSubEnvelope =
|
||||
match serde_json::from_str(&body_str) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!("Gmail push webhook: invalid payload: {e}");
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Invalid Pub/Sub envelope"})),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// Process the notification asynchronously (non-blocking for the webhook response)
|
||||
let channel = Arc::clone(gmail_push);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = channel.handle_notification(&envelope).await {
|
||||
tracing::error!("Gmail push notification processing failed: {e:#}");
|
||||
}
|
||||
});
|
||||
|
||||
// Acknowledge immediately — Google Pub/Sub requires a 2xx within ~10s
|
||||
(StatusCode::OK, Json(serde_json::json!({"status": "ok"})))
|
||||
}
|
||||
|
||||
// ══════════════════════════════════════════════════════════════════════════════
|
||||
// ADMIN HANDLERS (for CLI management)
|
||||
// ══════════════════════════════════════════════════════════════════════════════
|
||||
@ -2022,6 +2102,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2079,6 +2160,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer,
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2465,6 +2547,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2536,6 +2619,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2619,6 +2703,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2674,6 +2759,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2734,6 +2820,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2799,6 +2886,7 @@ mod tests {
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
@ -2860,6 +2948,7 @@ mod tests {
|
||||
nextcloud_talk: Some(channel),
|
||||
nextcloud_talk_webhook_secret: Some(Arc::from(secret)),
|
||||
wati: None,
|
||||
gmail_push: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
cost_tracker: None,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user