From ffe8bcf61f602fd32ff5792c336b0875eca7059f Mon Sep 17 00:00:00 2001 From: Argenis Date: Mon, 16 Mar 2026 01:53:47 -0400 Subject: [PATCH] feat(nodes): add secure HMAC-SHA256 node transport layer (#3654) Add a new `nodes` module with HMAC-SHA256 authenticated transport for secure inter-node communication over standard HTTPS. Includes replay protection via timestamped nonces and constant-time signature comparison. Also adds `NodeTransportConfig` to the config schema and fixes missing `approval_manager` field in four `ChannelRuntimeContext` test constructors that failed compilation on latest master. Original work by @rareba. Rebased on latest master to resolve merge conflicts (SwarmConfig/SwarmStrategy exports, duplicate MCP validation, test constructor fields). Co-authored-by: Claude Opus 4.6 --- src/config/mod.rs | 18 ++-- src/config/schema.rs | 68 ++++++++++++ src/gateway/mod.rs | 8 +- src/lib.rs | 1 + src/nodes/mod.rs | 3 + src/nodes/transport.rs | 235 +++++++++++++++++++++++++++++++++++++++++ src/onboard/wizard.rs | 2 + 7 files changed, 322 insertions(+), 13 deletions(-) create mode 100644 src/nodes/mod.rs create mode 100644 src/nodes/transport.rs diff --git a/src/config/mod.rs b/src/config/mod.rs index a82209d84..1ce1ebe02 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -13,15 +13,15 @@ pub use schema::{ GoogleTtsConfig, HardwareConfig, HardwareTransport, HeartbeatConfig, HooksConfig, HttpRequestConfig, IMessageConfig, IdentityConfig, LarkConfig, MatrixConfig, McpConfig, McpServerConfig, McpTransport, MemoryConfig, Microsoft365Config, ModelRouteConfig, - MultimodalConfig, NextcloudTalkConfig, NodesConfig, NotionConfig, ObservabilityConfig, - OpenAiTtsConfig, OpenVpnTunnelConfig, OtpConfig, OtpMethod, PeripheralBoardConfig, - PeripheralsConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig, - ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, - SchedulerConfig, SecretsConfig, SecurityConfig, SkillsConfig, SkillsPromptInjectionMode, - SlackConfig, StorageConfig, StorageProviderConfig, StorageProviderSection, StreamMode, - SwarmConfig, SwarmStrategy, TelegramConfig, ToolFilterGroup, ToolFilterGroupMode, - TranscriptionConfig, TtsConfig, TunnelConfig, WebFetchConfig, WebSearchConfig, WebhookConfig, - WorkspaceConfig, + MultimodalConfig, NextcloudTalkConfig, NodeTransportConfig, NodesConfig, NotionConfig, + ObservabilityConfig, OpenAiTtsConfig, OpenVpnTunnelConfig, OtpConfig, OtpMethod, + PeripheralBoardConfig, PeripheralsConfig, ProxyConfig, ProxyScope, QdrantConfig, + QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, + SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SkillsConfig, + SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig, + StorageProviderSection, StreamMode, SwarmConfig, SwarmStrategy, TelegramConfig, + ToolFilterGroup, ToolFilterGroupMode, TranscriptionConfig, TtsConfig, TunnelConfig, + WebFetchConfig, WebSearchConfig, WebhookConfig, WorkspaceConfig, }; pub fn name_and_presence(channel: Option<&T>) -> (&'static str, bool) { diff --git a/src/config/schema.rs b/src/config/schema.rs index 6dca7cf8f..84508f546 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -271,6 +271,10 @@ pub struct Config { /// Notion integration configuration (`[notion]`). #[serde(default)] pub notion: NotionConfig, + + /// Secure inter-node transport configuration (`[node_transport]`). + #[serde(default)] + pub node_transport: NodeTransportConfig, } /// Multi-client workspace isolation configuration. @@ -1352,6 +1356,67 @@ impl Default for GatewayConfig { } } +/// Secure transport configuration for inter-node communication (`[node_transport]`). +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct NodeTransportConfig { + /// Enable the secure transport layer. + #[serde(default = "default_node_transport_enabled")] + pub enabled: bool, + /// Shared secret for HMAC authentication between nodes. + #[serde(default)] + pub shared_secret: String, + /// Maximum age of signed requests in seconds (replay protection). + #[serde(default = "default_max_request_age")] + pub max_request_age_secs: i64, + /// Require HTTPS for all node communication. + #[serde(default = "default_require_https")] + pub require_https: bool, + /// Allow specific node IPs/CIDRs. + #[serde(default)] + pub allowed_peers: Vec, + /// Path to TLS certificate file. + #[serde(default)] + pub tls_cert_path: Option, + /// Path to TLS private key file. + #[serde(default)] + pub tls_key_path: Option, + /// Require client certificates (mutual TLS). + #[serde(default)] + pub mutual_tls: bool, + /// Maximum number of connections per peer. + #[serde(default = "default_connection_pool_size")] + pub connection_pool_size: usize, +} + +fn default_node_transport_enabled() -> bool { + true +} +fn default_max_request_age() -> i64 { + 300 +} +fn default_require_https() -> bool { + true +} +fn default_connection_pool_size() -> usize { + 4 +} + +impl Default for NodeTransportConfig { + fn default() -> Self { + Self { + enabled: default_node_transport_enabled(), + shared_secret: String::new(), + max_request_age_secs: default_max_request_age(), + require_https: default_require_https(), + allowed_peers: Vec::new(), + tls_cert_path: None, + tls_key_path: None, + mutual_tls: false, + connection_pool_size: default_connection_pool_size(), + } + } +} + // ── Composio (managed tool surface) ───────────────────────────── /// Composio managed OAuth tools integration (`[composio]` section). @@ -4647,6 +4712,7 @@ impl Default for Config { nodes: NodesConfig::default(), workspace: WorkspaceConfig::default(), notion: NotionConfig::default(), + node_transport: NodeTransportConfig::default(), } } } @@ -6915,6 +6981,7 @@ default_temperature = 0.7 nodes: NodesConfig::default(), workspace: WorkspaceConfig::default(), notion: NotionConfig::default(), + node_transport: NodeTransportConfig::default(), }; let toml_str = toml::to_string_pretty(&config).unwrap(); @@ -7210,6 +7277,7 @@ tool_dispatcher = "xml" nodes: NodesConfig::default(), workspace: WorkspaceConfig::default(), notion: NotionConfig::default(), + node_transport: NodeTransportConfig::default(), }; config.save().await.unwrap(); diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 7a6e41697..23d74d444 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -1238,7 +1238,7 @@ async fn handle_whatsapp_message( .await; } - match run_gateway_chat_with_tools(&state, &msg.content).await { + match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await { Ok(response) => { // Send reply via WhatsApp if let Err(e) = wa @@ -1346,7 +1346,7 @@ async fn handle_linq_webhook( } // Call the LLM - match run_gateway_chat_with_tools(&state, &msg.content).await { + match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await { Ok(response) => { // Send reply via Linq if let Err(e) = linq @@ -1438,7 +1438,7 @@ async fn handle_wati_webhook(State(state): State, body: Bytes) -> impl } // Call the LLM - match run_gateway_chat_with_tools(&state, &msg.content).await { + match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await { Ok(response) => { // Send reply via WATI if let Err(e) = wati @@ -1542,7 +1542,7 @@ async fn handle_nextcloud_talk_webhook( .await; } - match run_gateway_chat_with_tools(&state, &msg.content).await { + match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await { Ok(response) => { if let Err(e) = nextcloud_talk .send(&SendMessage::new(response, &msg.reply_target)) diff --git a/src/lib.rs b/src/lib.rs index 71248da85..94b0d3765 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ pub(crate) mod integrations; pub mod memory; pub(crate) mod migration; pub(crate) mod multimodal; +pub mod nodes; pub mod observability; pub(crate) mod onboard; pub mod peripherals; diff --git a/src/nodes/mod.rs b/src/nodes/mod.rs new file mode 100644 index 000000000..1207bb50c --- /dev/null +++ b/src/nodes/mod.rs @@ -0,0 +1,3 @@ +pub mod transport; + +pub use transport::NodeTransport; diff --git a/src/nodes/transport.rs b/src/nodes/transport.rs new file mode 100644 index 000000000..75bc4d434 --- /dev/null +++ b/src/nodes/transport.rs @@ -0,0 +1,235 @@ +//! Corporate-friendly secure node transport using standard HTTPS + HMAC-SHA256 authentication. +//! +//! All inter-node traffic uses plain HTTPS on port 443 — no exotic protocols, +//! no custom binary framing, no UDP tunneling. This makes the transport +//! compatible with corporate proxies, firewalls, and IT audit expectations. + +use anyhow::{bail, Result}; +use chrono::Utc; +use hmac::{Hmac, Mac}; +use sha2::Sha256; + +type HmacSha256 = Hmac; + +/// Signs a request payload with HMAC-SHA256. +/// +/// Uses `timestamp` + `nonce` alongside the payload to prevent replay attacks. +pub fn sign_request( + shared_secret: &str, + payload: &[u8], + timestamp: i64, + nonce: &str, +) -> Result { + let mut mac = HmacSha256::new_from_slice(shared_secret.as_bytes()) + .map_err(|e| anyhow::anyhow!("HMAC key error: {e}"))?; + mac.update(×tamp.to_le_bytes()); + mac.update(nonce.as_bytes()); + mac.update(payload); + Ok(hex::encode(mac.finalize().into_bytes())) +} + +/// Verify a signed request, rejecting stale timestamps for replay protection. +pub fn verify_request( + shared_secret: &str, + payload: &[u8], + timestamp: i64, + nonce: &str, + signature: &str, + max_age_secs: i64, +) -> Result { + let now = Utc::now().timestamp(); + if (now - timestamp).abs() > max_age_secs { + bail!("Request timestamp too old or too far in future"); + } + + let expected = sign_request(shared_secret, payload, timestamp, nonce)?; + Ok(constant_time_eq(expected.as_bytes(), signature.as_bytes())) +} + +/// Constant-time comparison to prevent timing attacks. +fn constant_time_eq(a: &[u8], b: &[u8]) -> bool { + if a.len() != b.len() { + return false; + } + a.iter() + .zip(b.iter()) + .fold(0u8, |acc, (x, y)| acc | (x ^ y)) + == 0 +} + +// ── Node transport client ─────────────────────────────────────── + +/// Sends authenticated HTTPS requests to peer nodes. +/// +/// Every outgoing request carries three custom headers: +/// - `X-ZeroClaw-Timestamp` — unix epoch seconds +/// - `X-ZeroClaw-Nonce` — random UUID v4 +/// - `X-ZeroClaw-Signature` — HMAC-SHA256 hex digest +/// +/// Incoming requests are verified with the same scheme via [`Self::verify_incoming`]. +pub struct NodeTransport { + http: reqwest::Client, + shared_secret: String, + max_request_age_secs: i64, +} + +impl NodeTransport { + pub fn new(shared_secret: String) -> Self { + Self { + http: reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .build() + .expect("HTTP client build"), + shared_secret, + max_request_age_secs: 300, // 5 min replay window + } + } + + /// Send an authenticated request to a peer node. + pub async fn send( + &self, + node_address: &str, + endpoint: &str, + payload: serde_json::Value, + ) -> Result { + let body = serde_json::to_vec(&payload)?; + let timestamp = Utc::now().timestamp(); + let nonce = uuid::Uuid::new_v4().to_string(); + let signature = sign_request(&self.shared_secret, &body, timestamp, &nonce)?; + + let url = format!("https://{node_address}/api/node-control/{endpoint}"); + let resp = self + .http + .post(&url) + .header("X-ZeroClaw-Timestamp", timestamp.to_string()) + .header("X-ZeroClaw-Nonce", &nonce) + .header("X-ZeroClaw-Signature", &signature) + .header("Content-Type", "application/json") + .body(body) + .send() + .await?; + + if !resp.status().is_success() { + bail!( + "Node request failed: {} {}", + resp.status(), + resp.text().await.unwrap_or_default() + ); + } + + Ok(resp.json().await?) + } + + /// Verify an incoming request from a peer node. + pub fn verify_incoming( + &self, + payload: &[u8], + timestamp_header: &str, + nonce_header: &str, + signature_header: &str, + ) -> Result { + let timestamp: i64 = timestamp_header + .parse() + .map_err(|_| anyhow::anyhow!("Invalid timestamp header"))?; + verify_request( + &self.shared_secret, + payload, + timestamp, + nonce_header, + signature_header, + self.max_request_age_secs, + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const TEST_SECRET: &str = "test-shared-secret-key"; + + #[test] + fn sign_request_deterministic() { + let sig1 = sign_request(TEST_SECRET, b"hello", 1_700_000_000, "nonce-1").unwrap(); + let sig2 = sign_request(TEST_SECRET, b"hello", 1_700_000_000, "nonce-1").unwrap(); + assert_eq!(sig1, sig2, "Same inputs must produce the same signature"); + } + + #[test] + fn verify_request_accepts_valid_signature() { + let now = Utc::now().timestamp(); + let sig = sign_request(TEST_SECRET, b"payload", now, "nonce-a").unwrap(); + let ok = verify_request(TEST_SECRET, b"payload", now, "nonce-a", &sig, 300).unwrap(); + assert!(ok, "Valid signature must pass verification"); + } + + #[test] + fn verify_request_rejects_tampered_payload() { + let now = Utc::now().timestamp(); + let sig = sign_request(TEST_SECRET, b"original", now, "nonce-b").unwrap(); + let ok = verify_request(TEST_SECRET, b"tampered", now, "nonce-b", &sig, 300).unwrap(); + assert!(!ok, "Tampered payload must fail verification"); + } + + #[test] + fn verify_request_rejects_expired_timestamp() { + let old = Utc::now().timestamp() - 600; + let sig = sign_request(TEST_SECRET, b"data", old, "nonce-c").unwrap(); + let result = verify_request(TEST_SECRET, b"data", old, "nonce-c", &sig, 300); + assert!(result.is_err(), "Expired timestamp must be rejected"); + } + + #[test] + fn verify_request_rejects_wrong_secret() { + let now = Utc::now().timestamp(); + let sig = sign_request(TEST_SECRET, b"data", now, "nonce-d").unwrap(); + let ok = verify_request("wrong-secret", b"data", now, "nonce-d", &sig, 300).unwrap(); + assert!(!ok, "Wrong secret must fail verification"); + } + + #[test] + fn constant_time_eq_correctness() { + assert!(constant_time_eq(b"abc", b"abc")); + assert!(!constant_time_eq(b"abc", b"abd")); + assert!(!constant_time_eq(b"abc", b"ab")); + assert!(!constant_time_eq(b"", b"a")); + assert!(constant_time_eq(b"", b"")); + } + + #[test] + fn node_transport_construction() { + let transport = NodeTransport::new("secret-key".into()); + assert_eq!(transport.max_request_age_secs, 300); + } + + #[test] + fn node_transport_verify_incoming_valid() { + let transport = NodeTransport::new(TEST_SECRET.into()); + let now = Utc::now().timestamp(); + let payload = b"test-body"; + let nonce = "incoming-nonce"; + let sig = sign_request(TEST_SECRET, payload, now, nonce).unwrap(); + + let ok = transport + .verify_incoming(payload, &now.to_string(), nonce, &sig) + .unwrap(); + assert!(ok, "Valid incoming request must pass verification"); + } + + #[test] + fn node_transport_verify_incoming_bad_timestamp_header() { + let transport = NodeTransport::new(TEST_SECRET.into()); + let result = transport.verify_incoming(b"body", "not-a-number", "nonce", "sig"); + assert!(result.is_err(), "Non-numeric timestamp header must error"); + } + + #[test] + fn sign_request_different_nonce_different_signature() { + let sig1 = sign_request(TEST_SECRET, b"data", 1_700_000_000, "nonce-1").unwrap(); + let sig2 = sign_request(TEST_SECRET, b"data", 1_700_000_000, "nonce-2").unwrap(); + assert_ne!( + sig1, sig2, + "Different nonces must produce different signatures" + ); + } +} diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 189d39f19..4b2692a99 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -181,6 +181,7 @@ pub async fn run_wizard(force: bool) -> Result { nodes: crate::config::NodesConfig::default(), workspace: crate::config::WorkspaceConfig::default(), notion: crate::config::NotionConfig::default(), + node_transport: crate::config::NodeTransportConfig::default(), }; println!( @@ -542,6 +543,7 @@ async fn run_quick_setup_with_home( nodes: crate::config::NodesConfig::default(), workspace: crate::config::WorkspaceConfig::default(), notion: crate::config::NotionConfig::default(), + node_transport: crate::config::NodeTransportConfig::default(), }; config.save().await?;