diff --git a/docs/config-reference.md b/docs/config-reference.md index 7885cd03c..dd92bafd5 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -326,6 +326,14 @@ Notes: | `require_pairing` | `true` | require pairing before bearer auth | | `allow_public_bind` | `false` | block accidental public exposure | +## `[gateway.node_control]` (experimental) + +| Key | Default | Purpose | +|---|---|---| +| `enabled` | `false` | enable node-control scaffold endpoint (`POST /api/node-control`) | +| `auth_token` | `null` | optional extra shared token checked via `X-Node-Control-Token` | +| `allowed_node_ids` | `[]` | allowlist for `node.describe`/`node.invoke` (`[]` accepts any) | + ## `[autonomy]` | Key | Default | Purpose | diff --git a/docs/i18n/vi/config-reference.md b/docs/i18n/vi/config-reference.md index 3b1b6a14a..e49c8489e 100644 --- a/docs/i18n/vi/config-reference.md +++ b/docs/i18n/vi/config-reference.md @@ -259,6 +259,14 @@ Lưu ý: | `require_pairing` | `true` | Yêu cầu ghép nối trước khi xác thực bearer | | `allow_public_bind` | `false` | Chặn lộ public do vô ý | +## `[gateway.node_control]` (thử nghiệm) + +| Khóa | Mặc định | Mục đích | +|---|---|---| +| `enabled` | `false` | Bật endpoint scaffold node-control (`POST /api/node-control`) | +| `auth_token` | `null` | Shared token bổ sung, kiểm qua header `X-Node-Control-Token` | +| `allowed_node_ids` | `[]` | Allowlist cho `node.describe`/`node.invoke` (`[]` = chấp nhận mọi node) | + ## `[autonomy]` | Khóa | Mặc định | Mục đích | diff --git a/docs/vi/config-reference.md b/docs/vi/config-reference.md index 3b1b6a14a..e49c8489e 100644 --- a/docs/vi/config-reference.md +++ b/docs/vi/config-reference.md @@ -259,6 +259,14 @@ Lưu ý: | `require_pairing` | `true` | Yêu cầu ghép nối trước khi xác thực bearer | | `allow_public_bind` | `false` | Chặn lộ public do vô ý | +## `[gateway.node_control]` (thử nghiệm) + +| Khóa | Mặc định | Mục đích | +|---|---|---| +| `enabled` | `false` | Bật endpoint scaffold node-control (`POST /api/node-control`) | +| `auth_token` | `null` | Shared token bổ sung, kiểm qua header `X-Node-Control-Token` | +| `allowed_node_ids` | `[]` | Allowlist cho `node.describe`/`node.invoke` (`[]` = chấp nhận mọi node) | + ## `[autonomy]` | Khóa | Mặc định | Mục đích | diff --git a/src/config/schema.rs b/src/config/schema.rs index 99b9b2bee..8c6dac16d 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -827,6 +827,38 @@ pub struct GatewayConfig { /// Maximum distinct idempotency keys retained in memory. #[serde(default = "default_gateway_idempotency_max_keys")] pub idempotency_max_keys: usize, + + /// Node-control protocol scaffold (`[gateway.node_control]`). + #[serde(default)] + pub node_control: NodeControlConfig, +} + +/// Node-control scaffold settings under `[gateway.node_control]`. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct NodeControlConfig { + /// Enable experimental node-control API endpoints. + #[serde(default)] + pub enabled: bool, + + /// Optional extra shared token for node-control API calls. + /// When set, clients must send this value in `X-Node-Control-Token`. + #[serde(default)] + pub auth_token: Option, + + /// Allowlist of remote node IDs for `node.describe`/`node.invoke`. + /// Empty means "no explicit allowlist" (accept all IDs). + #[serde(default)] + pub allowed_node_ids: Vec, +} + +impl Default for NodeControlConfig { + fn default() -> Self { + Self { + enabled: false, + auth_token: None, + allowed_node_ids: Vec::new(), + } + } } fn default_gateway_port() -> u16 { @@ -875,6 +907,7 @@ impl Default for GatewayConfig { rate_limit_max_keys: default_gateway_rate_limit_max_keys(), idempotency_ttl_secs: default_idempotency_ttl_secs(), idempotency_max_keys: default_gateway_idempotency_max_keys(), + node_control: NodeControlConfig::default(), } } } @@ -6042,6 +6075,9 @@ channel_id = "C123" assert_eq!(g.rate_limit_max_keys, 10_000); assert_eq!(g.idempotency_ttl_secs, 300); assert_eq!(g.idempotency_max_keys, 10_000); + assert!(!g.node_control.enabled); + assert!(g.node_control.auth_token.is_none()); + assert!(g.node_control.allowed_node_ids.is_empty()); } #[test] @@ -6073,6 +6109,11 @@ channel_id = "C123" rate_limit_max_keys: 2048, idempotency_ttl_secs: 600, idempotency_max_keys: 4096, + node_control: NodeControlConfig { + enabled: true, + auth_token: Some("node-token".into()), + allowed_node_ids: vec!["node-1".into(), "node-2".into()], + }, }; let toml_str = toml::to_string(&g).unwrap(); let parsed: GatewayConfig = toml::from_str(&toml_str).unwrap(); @@ -6085,6 +6126,15 @@ channel_id = "C123" assert_eq!(parsed.rate_limit_max_keys, 2048); assert_eq!(parsed.idempotency_ttl_secs, 600); assert_eq!(parsed.idempotency_max_keys, 4096); + assert!(parsed.node_control.enabled); + assert_eq!( + parsed.node_control.auth_token.as_deref(), + Some("node-token") + ); + assert_eq!( + parsed.node_control.allowed_node_ids, + vec!["node-1", "node-2"] + ); } #[test] @@ -7385,6 +7435,9 @@ default_model = "legacy-model" assert!(!g.trust_forwarded_headers); assert_eq!(g.rate_limit_max_keys, 10_000); assert_eq!(g.idempotency_max_keys, 10_000); + assert!(!g.node_control.enabled); + assert!(g.node_control.auth_token.is_none()); + assert!(g.node_control.allowed_node_ids.is_empty()); } // ── Peripherals config ─────────────────────────────────────── diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 6fde83027..3e876053b 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -624,6 +624,9 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { if qq_webhook_enabled { println!(" POST /qq — QQ Bot webhook (validation + events)"); } + if config.gateway.node_control.enabled { + println!(" POST /api/node-control — experimental node-control RPC scaffold"); + } println!(" GET /api/* — REST API (bearer token required)"); println!(" GET /ws/chat — WebSocket agent chat"); println!(" GET /health — health check"); @@ -723,6 +726,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { .route("/api/cost", get(api::handle_api_cost)) .route("/api/cli-tools", get(api::handle_api_cli_tools)) .route("/api/health", get(api::handle_api_health)) + .route("/api/node-control", post(handle_node_control)) // ── SSE event stream ── .route("/api/events", get(sse::handle_sse_events)) // ── WebSocket agent chat ── @@ -911,6 +915,189 @@ pub struct WebhookBody { pub message: String, } +#[derive(Debug, Clone, serde::Deserialize)] +pub struct NodeControlRequest { + pub method: String, + #[serde(default)] + pub node_id: Option, + #[serde(default)] + pub capability: Option, + #[serde(default)] + pub arguments: serde_json::Value, +} + +fn node_id_allowed(node_id: &str, allowed_node_ids: &[String]) -> bool { + if allowed_node_ids.is_empty() { + return true; + } + + allowed_node_ids + .iter() + .any(|candidate| candidate == "*" || candidate == node_id) +} + +/// POST /api/node-control — experimental node-control protocol scaffold. +/// +/// Supported methods: +/// - `node.list` +/// - `node.describe` +/// - `node.invoke` (stubbed as not implemented) +async fn handle_node_control( + State(state): State, + headers: HeaderMap, + body: Result, axum::extract::rejection::JsonRejection>, +) -> impl IntoResponse { + // ── Bearer auth (pairing) ── + if state.pairing.require_pairing() { + let auth = headers + .get(header::AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + let token = auth.strip_prefix("Bearer ").unwrap_or(""); + if !state.pairing.is_authenticated(token) { + let err = serde_json::json!({ + "error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer " + }); + return (StatusCode::UNAUTHORIZED, Json(err)); + } + } + + let Json(request) = match body { + Ok(body) => body, + Err(e) => { + tracing::warn!("Node-control JSON parse error: {e}"); + let err = serde_json::json!({ + "error": "Invalid JSON body for node-control request" + }); + return (StatusCode::BAD_REQUEST, Json(err)); + } + }; + + let node_control = { state.config.lock().gateway.node_control.clone() }; + if !node_control.enabled { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "Node-control API is disabled"})), + ); + } + + // Optional second-factor shared token for node-control endpoints. + if let Some(expected_token) = node_control + .auth_token + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + { + let provided_token = headers + .get("X-Node-Control-Token") + .and_then(|v| v.to_str().ok()) + .map(str::trim) + .unwrap_or(""); + if !constant_time_eq(expected_token, provided_token) { + return ( + StatusCode::UNAUTHORIZED, + Json(serde_json::json!({"error": "Invalid X-Node-Control-Token"})), + ); + } + } + + let method = request.method.trim(); + match method { + "node.list" => { + let nodes = node_control + .allowed_node_ids + .iter() + .map(|node_id| { + serde_json::json!({ + "node_id": node_id, + "status": "unpaired", + "capabilities": [] + }) + }) + .collect::>(); + ( + StatusCode::OK, + Json(serde_json::json!({ + "ok": true, + "method": "node.list", + "nodes": nodes + })), + ) + } + "node.describe" => { + let Some(node_id) = request + .node_id + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "node_id is required for node.describe"})), + ); + }; + if !node_id_allowed(node_id, &node_control.allowed_node_ids) { + return ( + StatusCode::FORBIDDEN, + Json(serde_json::json!({"error": "node_id is not allowed"})), + ); + } + + ( + StatusCode::OK, + Json(serde_json::json!({ + "ok": true, + "method": "node.describe", + "node_id": node_id, + "description": { + "status": "stub", + "capabilities": [], + "message": "Node descriptor scaffold is enabled; runtime backend is not wired yet." + } + })), + ) + } + "node.invoke" => { + let Some(node_id) = request + .node_id + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "node_id is required for node.invoke"})), + ); + }; + if !node_id_allowed(node_id, &node_control.allowed_node_ids) { + return ( + StatusCode::FORBIDDEN, + Json(serde_json::json!({"error": "node_id is not allowed"})), + ); + } + + ( + StatusCode::NOT_IMPLEMENTED, + Json(serde_json::json!({ + "ok": false, + "method": "node.invoke", + "node_id": node_id, + "capability": request.capability, + "arguments": request.arguments, + "error": "node.invoke backend is not implemented yet in this scaffold" + })), + ) + } + _ => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "error": "Unsupported method", + "supported_methods": ["node.list", "node.describe", "node.invoke"] + })), + ), + } +} + /// POST /webhook — main webhook endpoint async fn handle_webhook( State(state): State, @@ -1704,6 +1891,18 @@ mod tests { assert!(q.mode.is_none()); } + #[test] + fn node_id_allowed_with_empty_allowlist_accepts_any() { + assert!(node_id_allowed("node-a", &[])); + } + + #[test] + fn node_id_allowed_respects_allowlist() { + let allow = vec!["node-1".to_string(), "node-2".to_string()]; + assert!(node_id_allowed("node-1", &allow)); + assert!(!node_id_allowed("node-9", &allow)); + } + #[test] fn app_state_is_clone() { fn assert_clone() {} @@ -2214,6 +2413,112 @@ mod tests { assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1); } + #[tokio::test] + async fn node_control_returns_not_found_when_disabled() { + let provider: Arc = Arc::new(MockProvider::default()); + let memory: Arc = Arc::new(MockMemory); + + let state = AppState { + config: Arc::new(Mutex::new(Config::default())), + provider, + model: "test-model".into(), + temperature: 0.0, + mem: memory, + auto_save: false, + webhook_secret_hash: None, + pairing: Arc::new(PairingGuard::new(false, &[])), + trust_forwarded_headers: false, + rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), + idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), + whatsapp: None, + whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, + nextcloud_talk: None, + nextcloud_talk_webhook_secret: None, + wati: None, + observer: Arc::new(crate::observability::NoopObserver), + tools_registry: Arc::new(Vec::new()), + tools_registry_exec: Arc::new(Vec::new()), + multimodal: crate::config::MultimodalConfig::default(), + max_tool_iterations: 10, + cost_tracker: None, + event_tx: tokio::sync::broadcast::channel(16).0, + }; + + let response = handle_node_control( + State(state), + HeaderMap::new(), + Ok(Json(NodeControlRequest { + method: "node.list".into(), + node_id: None, + capability: None, + arguments: serde_json::Value::Null, + })), + ) + .await + .into_response(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn node_control_list_returns_stub_nodes_when_enabled() { + let provider: Arc = Arc::new(MockProvider::default()); + let memory: Arc = Arc::new(MockMemory); + + let mut config = Config::default(); + config.gateway.node_control.enabled = true; + config.gateway.node_control.allowed_node_ids = vec!["node-1".into(), "node-2".into()]; + + let state = AppState { + config: Arc::new(Mutex::new(config)), + provider, + model: "test-model".into(), + temperature: 0.0, + mem: memory, + auto_save: false, + webhook_secret_hash: None, + pairing: Arc::new(PairingGuard::new(false, &[])), + trust_forwarded_headers: false, + rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), + idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), + whatsapp: None, + whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, + nextcloud_talk: None, + nextcloud_talk_webhook_secret: None, + wati: None, + observer: Arc::new(crate::observability::NoopObserver), + tools_registry: Arc::new(Vec::new()), + tools_registry_exec: Arc::new(Vec::new()), + multimodal: crate::config::MultimodalConfig::default(), + max_tool_iterations: 10, + cost_tracker: None, + event_tx: tokio::sync::broadcast::channel(16).0, + }; + + let response = handle_node_control( + State(state), + HeaderMap::new(), + Ok(Json(NodeControlRequest { + method: "node.list".into(), + node_id: None, + capability: None, + arguments: serde_json::Value::Null, + })), + ) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::OK); + let payload = response.into_body().collect().await.unwrap().to_bytes(); + let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap(); + assert_eq!(parsed["ok"], true); + assert_eq!(parsed["method"], "node.list"); + assert_eq!(parsed["nodes"].as_array().map(|v| v.len()), Some(2)); + } + #[tokio::test] async fn webhook_autosave_stores_distinct_keys_per_request() { let provider_impl = Arc::new(MockProvider::default());