feat(nodes): add secure HMAC-SHA256 node transport layer (#3654)
Add a new `nodes` module with HMAC-SHA256 authenticated transport for secure inter-node communication over standard HTTPS. Includes replay protection via timestamped nonces and constant-time signature comparison. Also adds `NodeTransportConfig` to the config schema and fixes missing `approval_manager` field in four `ChannelRuntimeContext` test constructors that failed compilation on latest master. Original work by @rareba. Rebased on latest master to resolve merge conflicts (SwarmConfig/SwarmStrategy exports, duplicate MCP validation, test constructor fields). Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
982c3069dd
commit
ffe8bcf61f
@ -13,15 +13,15 @@ pub use schema::{
|
||||
GoogleTtsConfig, HardwareConfig, HardwareTransport, HeartbeatConfig, HooksConfig,
|
||||
HttpRequestConfig, IMessageConfig, IdentityConfig, LarkConfig, MatrixConfig, McpConfig,
|
||||
McpServerConfig, McpTransport, MemoryConfig, Microsoft365Config, ModelRouteConfig,
|
||||
MultimodalConfig, NextcloudTalkConfig, NodesConfig, NotionConfig, ObservabilityConfig,
|
||||
OpenAiTtsConfig, OpenVpnTunnelConfig, OtpConfig, OtpMethod, PeripheralBoardConfig,
|
||||
PeripheralsConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig,
|
||||
ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig,
|
||||
SchedulerConfig, SecretsConfig, SecurityConfig, SkillsConfig, SkillsPromptInjectionMode,
|
||||
SlackConfig, StorageConfig, StorageProviderConfig, StorageProviderSection, StreamMode,
|
||||
SwarmConfig, SwarmStrategy, TelegramConfig, ToolFilterGroup, ToolFilterGroupMode,
|
||||
TranscriptionConfig, TtsConfig, TunnelConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
|
||||
WorkspaceConfig,
|
||||
MultimodalConfig, NextcloudTalkConfig, NodeTransportConfig, NodesConfig, NotionConfig,
|
||||
ObservabilityConfig, OpenAiTtsConfig, OpenVpnTunnelConfig, OtpConfig, OtpMethod,
|
||||
PeripheralBoardConfig, PeripheralsConfig, ProxyConfig, ProxyScope, QdrantConfig,
|
||||
QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig,
|
||||
SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SkillsConfig,
|
||||
SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig,
|
||||
StorageProviderSection, StreamMode, SwarmConfig, SwarmStrategy, TelegramConfig,
|
||||
ToolFilterGroup, ToolFilterGroupMode, TranscriptionConfig, TtsConfig, TunnelConfig,
|
||||
WebFetchConfig, WebSearchConfig, WebhookConfig, WorkspaceConfig,
|
||||
};
|
||||
|
||||
pub fn name_and_presence<T: traits::ChannelConfig>(channel: Option<&T>) -> (&'static str, bool) {
|
||||
|
||||
@ -271,6 +271,10 @@ pub struct Config {
|
||||
/// Notion integration configuration (`[notion]`).
|
||||
#[serde(default)]
|
||||
pub notion: NotionConfig,
|
||||
|
||||
/// Secure inter-node transport configuration (`[node_transport]`).
|
||||
#[serde(default)]
|
||||
pub node_transport: NodeTransportConfig,
|
||||
}
|
||||
|
||||
/// Multi-client workspace isolation configuration.
|
||||
@ -1352,6 +1356,67 @@ impl Default for GatewayConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Secure transport configuration for inter-node communication (`[node_transport]`).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct NodeTransportConfig {
|
||||
/// Enable the secure transport layer.
|
||||
#[serde(default = "default_node_transport_enabled")]
|
||||
pub enabled: bool,
|
||||
/// Shared secret for HMAC authentication between nodes.
|
||||
#[serde(default)]
|
||||
pub shared_secret: String,
|
||||
/// Maximum age of signed requests in seconds (replay protection).
|
||||
#[serde(default = "default_max_request_age")]
|
||||
pub max_request_age_secs: i64,
|
||||
/// Require HTTPS for all node communication.
|
||||
#[serde(default = "default_require_https")]
|
||||
pub require_https: bool,
|
||||
/// Allow specific node IPs/CIDRs.
|
||||
#[serde(default)]
|
||||
pub allowed_peers: Vec<String>,
|
||||
/// Path to TLS certificate file.
|
||||
#[serde(default)]
|
||||
pub tls_cert_path: Option<String>,
|
||||
/// Path to TLS private key file.
|
||||
#[serde(default)]
|
||||
pub tls_key_path: Option<String>,
|
||||
/// Require client certificates (mutual TLS).
|
||||
#[serde(default)]
|
||||
pub mutual_tls: bool,
|
||||
/// Maximum number of connections per peer.
|
||||
#[serde(default = "default_connection_pool_size")]
|
||||
pub connection_pool_size: usize,
|
||||
}
|
||||
|
||||
fn default_node_transport_enabled() -> bool {
|
||||
true
|
||||
}
|
||||
fn default_max_request_age() -> i64 {
|
||||
300
|
||||
}
|
||||
fn default_require_https() -> bool {
|
||||
true
|
||||
}
|
||||
fn default_connection_pool_size() -> usize {
|
||||
4
|
||||
}
|
||||
|
||||
impl Default for NodeTransportConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: default_node_transport_enabled(),
|
||||
shared_secret: String::new(),
|
||||
max_request_age_secs: default_max_request_age(),
|
||||
require_https: default_require_https(),
|
||||
allowed_peers: Vec::new(),
|
||||
tls_cert_path: None,
|
||||
tls_key_path: None,
|
||||
mutual_tls: false,
|
||||
connection_pool_size: default_connection_pool_size(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Composio (managed tool surface) ─────────────────────────────
|
||||
|
||||
/// Composio managed OAuth tools integration (`[composio]` section).
|
||||
@ -4647,6 +4712,7 @@ impl Default for Config {
|
||||
nodes: NodesConfig::default(),
|
||||
workspace: WorkspaceConfig::default(),
|
||||
notion: NotionConfig::default(),
|
||||
node_transport: NodeTransportConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6915,6 +6981,7 @@ default_temperature = 0.7
|
||||
nodes: NodesConfig::default(),
|
||||
workspace: WorkspaceConfig::default(),
|
||||
notion: NotionConfig::default(),
|
||||
node_transport: NodeTransportConfig::default(),
|
||||
};
|
||||
|
||||
let toml_str = toml::to_string_pretty(&config).unwrap();
|
||||
@ -7210,6 +7277,7 @@ tool_dispatcher = "xml"
|
||||
nodes: NodesConfig::default(),
|
||||
workspace: WorkspaceConfig::default(),
|
||||
notion: NotionConfig::default(),
|
||||
node_transport: NodeTransportConfig::default(),
|
||||
};
|
||||
|
||||
config.save().await.unwrap();
|
||||
|
||||
@ -1238,7 +1238,7 @@ async fn handle_whatsapp_message(
|
||||
.await;
|
||||
}
|
||||
|
||||
match run_gateway_chat_with_tools(&state, &msg.content).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
Ok(response) => {
|
||||
// Send reply via WhatsApp
|
||||
if let Err(e) = wa
|
||||
@ -1346,7 +1346,7 @@ async fn handle_linq_webhook(
|
||||
}
|
||||
|
||||
// Call the LLM
|
||||
match run_gateway_chat_with_tools(&state, &msg.content).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
Ok(response) => {
|
||||
// Send reply via Linq
|
||||
if let Err(e) = linq
|
||||
@ -1438,7 +1438,7 @@ async fn handle_wati_webhook(State(state): State<AppState>, body: Bytes) -> impl
|
||||
}
|
||||
|
||||
// Call the LLM
|
||||
match run_gateway_chat_with_tools(&state, &msg.content).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
Ok(response) => {
|
||||
// Send reply via WATI
|
||||
if let Err(e) = wati
|
||||
@ -1542,7 +1542,7 @@ async fn handle_nextcloud_talk_webhook(
|
||||
.await;
|
||||
}
|
||||
|
||||
match run_gateway_chat_with_tools(&state, &msg.content).await {
|
||||
match Box::pin(run_gateway_chat_with_tools(&state, &msg.content)).await {
|
||||
Ok(response) => {
|
||||
if let Err(e) = nextcloud_talk
|
||||
.send(&SendMessage::new(response, &msg.reply_target))
|
||||
|
||||
@ -58,6 +58,7 @@ pub(crate) mod integrations;
|
||||
pub mod memory;
|
||||
pub(crate) mod migration;
|
||||
pub(crate) mod multimodal;
|
||||
pub mod nodes;
|
||||
pub mod observability;
|
||||
pub(crate) mod onboard;
|
||||
pub mod peripherals;
|
||||
|
||||
3
src/nodes/mod.rs
Normal file
3
src/nodes/mod.rs
Normal file
@ -0,0 +1,3 @@
|
||||
pub mod transport;
|
||||
|
||||
pub use transport::NodeTransport;
|
||||
235
src/nodes/transport.rs
Normal file
235
src/nodes/transport.rs
Normal file
@ -0,0 +1,235 @@
|
||||
//! Corporate-friendly secure node transport using standard HTTPS + HMAC-SHA256 authentication.
|
||||
//!
|
||||
//! All inter-node traffic uses plain HTTPS on port 443 — no exotic protocols,
|
||||
//! no custom binary framing, no UDP tunneling. This makes the transport
|
||||
//! compatible with corporate proxies, firewalls, and IT audit expectations.
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use chrono::Utc;
|
||||
use hmac::{Hmac, Mac};
|
||||
use sha2::Sha256;
|
||||
|
||||
type HmacSha256 = Hmac<Sha256>;
|
||||
|
||||
/// Signs a request payload with HMAC-SHA256.
|
||||
///
|
||||
/// Uses `timestamp` + `nonce` alongside the payload to prevent replay attacks.
|
||||
pub fn sign_request(
|
||||
shared_secret: &str,
|
||||
payload: &[u8],
|
||||
timestamp: i64,
|
||||
nonce: &str,
|
||||
) -> Result<String> {
|
||||
let mut mac = HmacSha256::new_from_slice(shared_secret.as_bytes())
|
||||
.map_err(|e| anyhow::anyhow!("HMAC key error: {e}"))?;
|
||||
mac.update(×tamp.to_le_bytes());
|
||||
mac.update(nonce.as_bytes());
|
||||
mac.update(payload);
|
||||
Ok(hex::encode(mac.finalize().into_bytes()))
|
||||
}
|
||||
|
||||
/// Verify a signed request, rejecting stale timestamps for replay protection.
|
||||
pub fn verify_request(
|
||||
shared_secret: &str,
|
||||
payload: &[u8],
|
||||
timestamp: i64,
|
||||
nonce: &str,
|
||||
signature: &str,
|
||||
max_age_secs: i64,
|
||||
) -> Result<bool> {
|
||||
let now = Utc::now().timestamp();
|
||||
if (now - timestamp).abs() > max_age_secs {
|
||||
bail!("Request timestamp too old or too far in future");
|
||||
}
|
||||
|
||||
let expected = sign_request(shared_secret, payload, timestamp, nonce)?;
|
||||
Ok(constant_time_eq(expected.as_bytes(), signature.as_bytes()))
|
||||
}
|
||||
|
||||
/// Constant-time comparison to prevent timing attacks.
|
||||
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
|
||||
if a.len() != b.len() {
|
||||
return false;
|
||||
}
|
||||
a.iter()
|
||||
.zip(b.iter())
|
||||
.fold(0u8, |acc, (x, y)| acc | (x ^ y))
|
||||
== 0
|
||||
}
|
||||
|
||||
// ── Node transport client ───────────────────────────────────────
|
||||
|
||||
/// Sends authenticated HTTPS requests to peer nodes.
|
||||
///
|
||||
/// Every outgoing request carries three custom headers:
|
||||
/// - `X-ZeroClaw-Timestamp` — unix epoch seconds
|
||||
/// - `X-ZeroClaw-Nonce` — random UUID v4
|
||||
/// - `X-ZeroClaw-Signature` — HMAC-SHA256 hex digest
|
||||
///
|
||||
/// Incoming requests are verified with the same scheme via [`Self::verify_incoming`].
|
||||
pub struct NodeTransport {
|
||||
http: reqwest::Client,
|
||||
shared_secret: String,
|
||||
max_request_age_secs: i64,
|
||||
}
|
||||
|
||||
impl NodeTransport {
|
||||
pub fn new(shared_secret: String) -> Self {
|
||||
Self {
|
||||
http: reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
.expect("HTTP client build"),
|
||||
shared_secret,
|
||||
max_request_age_secs: 300, // 5 min replay window
|
||||
}
|
||||
}
|
||||
|
||||
/// Send an authenticated request to a peer node.
|
||||
pub async fn send(
|
||||
&self,
|
||||
node_address: &str,
|
||||
endpoint: &str,
|
||||
payload: serde_json::Value,
|
||||
) -> Result<serde_json::Value> {
|
||||
let body = serde_json::to_vec(&payload)?;
|
||||
let timestamp = Utc::now().timestamp();
|
||||
let nonce = uuid::Uuid::new_v4().to_string();
|
||||
let signature = sign_request(&self.shared_secret, &body, timestamp, &nonce)?;
|
||||
|
||||
let url = format!("https://{node_address}/api/node-control/{endpoint}");
|
||||
let resp = self
|
||||
.http
|
||||
.post(&url)
|
||||
.header("X-ZeroClaw-Timestamp", timestamp.to_string())
|
||||
.header("X-ZeroClaw-Nonce", &nonce)
|
||||
.header("X-ZeroClaw-Signature", &signature)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
bail!(
|
||||
"Node request failed: {} {}",
|
||||
resp.status(),
|
||||
resp.text().await.unwrap_or_default()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(resp.json().await?)
|
||||
}
|
||||
|
||||
/// Verify an incoming request from a peer node.
|
||||
pub fn verify_incoming(
|
||||
&self,
|
||||
payload: &[u8],
|
||||
timestamp_header: &str,
|
||||
nonce_header: &str,
|
||||
signature_header: &str,
|
||||
) -> Result<bool> {
|
||||
let timestamp: i64 = timestamp_header
|
||||
.parse()
|
||||
.map_err(|_| anyhow::anyhow!("Invalid timestamp header"))?;
|
||||
verify_request(
|
||||
&self.shared_secret,
|
||||
payload,
|
||||
timestamp,
|
||||
nonce_header,
|
||||
signature_header,
|
||||
self.max_request_age_secs,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
const TEST_SECRET: &str = "test-shared-secret-key";
|
||||
|
||||
#[test]
|
||||
fn sign_request_deterministic() {
|
||||
let sig1 = sign_request(TEST_SECRET, b"hello", 1_700_000_000, "nonce-1").unwrap();
|
||||
let sig2 = sign_request(TEST_SECRET, b"hello", 1_700_000_000, "nonce-1").unwrap();
|
||||
assert_eq!(sig1, sig2, "Same inputs must produce the same signature");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_request_accepts_valid_signature() {
|
||||
let now = Utc::now().timestamp();
|
||||
let sig = sign_request(TEST_SECRET, b"payload", now, "nonce-a").unwrap();
|
||||
let ok = verify_request(TEST_SECRET, b"payload", now, "nonce-a", &sig, 300).unwrap();
|
||||
assert!(ok, "Valid signature must pass verification");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_request_rejects_tampered_payload() {
|
||||
let now = Utc::now().timestamp();
|
||||
let sig = sign_request(TEST_SECRET, b"original", now, "nonce-b").unwrap();
|
||||
let ok = verify_request(TEST_SECRET, b"tampered", now, "nonce-b", &sig, 300).unwrap();
|
||||
assert!(!ok, "Tampered payload must fail verification");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_request_rejects_expired_timestamp() {
|
||||
let old = Utc::now().timestamp() - 600;
|
||||
let sig = sign_request(TEST_SECRET, b"data", old, "nonce-c").unwrap();
|
||||
let result = verify_request(TEST_SECRET, b"data", old, "nonce-c", &sig, 300);
|
||||
assert!(result.is_err(), "Expired timestamp must be rejected");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_request_rejects_wrong_secret() {
|
||||
let now = Utc::now().timestamp();
|
||||
let sig = sign_request(TEST_SECRET, b"data", now, "nonce-d").unwrap();
|
||||
let ok = verify_request("wrong-secret", b"data", now, "nonce-d", &sig, 300).unwrap();
|
||||
assert!(!ok, "Wrong secret must fail verification");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn constant_time_eq_correctness() {
|
||||
assert!(constant_time_eq(b"abc", b"abc"));
|
||||
assert!(!constant_time_eq(b"abc", b"abd"));
|
||||
assert!(!constant_time_eq(b"abc", b"ab"));
|
||||
assert!(!constant_time_eq(b"", b"a"));
|
||||
assert!(constant_time_eq(b"", b""));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_transport_construction() {
|
||||
let transport = NodeTransport::new("secret-key".into());
|
||||
assert_eq!(transport.max_request_age_secs, 300);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_transport_verify_incoming_valid() {
|
||||
let transport = NodeTransport::new(TEST_SECRET.into());
|
||||
let now = Utc::now().timestamp();
|
||||
let payload = b"test-body";
|
||||
let nonce = "incoming-nonce";
|
||||
let sig = sign_request(TEST_SECRET, payload, now, nonce).unwrap();
|
||||
|
||||
let ok = transport
|
||||
.verify_incoming(payload, &now.to_string(), nonce, &sig)
|
||||
.unwrap();
|
||||
assert!(ok, "Valid incoming request must pass verification");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_transport_verify_incoming_bad_timestamp_header() {
|
||||
let transport = NodeTransport::new(TEST_SECRET.into());
|
||||
let result = transport.verify_incoming(b"body", "not-a-number", "nonce", "sig");
|
||||
assert!(result.is_err(), "Non-numeric timestamp header must error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sign_request_different_nonce_different_signature() {
|
||||
let sig1 = sign_request(TEST_SECRET, b"data", 1_700_000_000, "nonce-1").unwrap();
|
||||
let sig2 = sign_request(TEST_SECRET, b"data", 1_700_000_000, "nonce-2").unwrap();
|
||||
assert_ne!(
|
||||
sig1, sig2,
|
||||
"Different nonces must produce different signatures"
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -181,6 +181,7 @@ pub async fn run_wizard(force: bool) -> Result<Config> {
|
||||
nodes: crate::config::NodesConfig::default(),
|
||||
workspace: crate::config::WorkspaceConfig::default(),
|
||||
notion: crate::config::NotionConfig::default(),
|
||||
node_transport: crate::config::NodeTransportConfig::default(),
|
||||
};
|
||||
|
||||
println!(
|
||||
@ -542,6 +543,7 @@ async fn run_quick_setup_with_home(
|
||||
nodes: crate::config::NodesConfig::default(),
|
||||
workspace: crate::config::WorkspaceConfig::default(),
|
||||
notion: crate::config::NotionConfig::default(),
|
||||
node_transport: crate::config::NodeTransportConfig::default(),
|
||||
};
|
||||
|
||||
config.save().await?;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user