feat(gateway): add experimental node-control scaffold API
This commit is contained in:
parent
56ffcd4477
commit
c876a03819
@ -326,6 +326,14 @@ Notes:
|
||||
| `require_pairing` | `true` | require pairing before bearer auth |
|
||||
| `allow_public_bind` | `false` | block accidental public exposure |
|
||||
|
||||
## `[gateway.node_control]` (experimental)
|
||||
|
||||
| Key | Default | Purpose |
|
||||
|---|---|---|
|
||||
| `enabled` | `false` | enable node-control scaffold endpoint (`POST /api/node-control`) |
|
||||
| `auth_token` | `null` | optional extra shared token checked via `X-Node-Control-Token` |
|
||||
| `allowed_node_ids` | `[]` | allowlist for `node.describe`/`node.invoke` (`[]` accepts any) |
|
||||
|
||||
## `[autonomy]`
|
||||
|
||||
| Key | Default | Purpose |
|
||||
|
||||
@ -259,6 +259,14 @@ Lưu ý:
|
||||
| `require_pairing` | `true` | Yêu cầu ghép nối trước khi xác thực bearer |
|
||||
| `allow_public_bind` | `false` | Chặn lộ public do vô ý |
|
||||
|
||||
## `[gateway.node_control]` (thử nghiệm)
|
||||
|
||||
| Khóa | Mặc định | Mục đích |
|
||||
|---|---|---|
|
||||
| `enabled` | `false` | Bật endpoint scaffold node-control (`POST /api/node-control`) |
|
||||
| `auth_token` | `null` | Shared token bổ sung, kiểm qua header `X-Node-Control-Token` |
|
||||
| `allowed_node_ids` | `[]` | Allowlist cho `node.describe`/`node.invoke` (`[]` = chấp nhận mọi node) |
|
||||
|
||||
## `[autonomy]`
|
||||
|
||||
| Khóa | Mặc định | Mục đích |
|
||||
|
||||
@ -259,6 +259,14 @@ Lưu ý:
|
||||
| `require_pairing` | `true` | Yêu cầu ghép nối trước khi xác thực bearer |
|
||||
| `allow_public_bind` | `false` | Chặn lộ public do vô ý |
|
||||
|
||||
## `[gateway.node_control]` (thử nghiệm)
|
||||
|
||||
| Khóa | Mặc định | Mục đích |
|
||||
|---|---|---|
|
||||
| `enabled` | `false` | Bật endpoint scaffold node-control (`POST /api/node-control`) |
|
||||
| `auth_token` | `null` | Shared token bổ sung, kiểm qua header `X-Node-Control-Token` |
|
||||
| `allowed_node_ids` | `[]` | Allowlist cho `node.describe`/`node.invoke` (`[]` = chấp nhận mọi node) |
|
||||
|
||||
## `[autonomy]`
|
||||
|
||||
| Khóa | Mặc định | Mục đích |
|
||||
|
||||
@ -827,6 +827,38 @@ pub struct GatewayConfig {
|
||||
/// Maximum distinct idempotency keys retained in memory.
|
||||
#[serde(default = "default_gateway_idempotency_max_keys")]
|
||||
pub idempotency_max_keys: usize,
|
||||
|
||||
/// Node-control protocol scaffold (`[gateway.node_control]`).
|
||||
#[serde(default)]
|
||||
pub node_control: NodeControlConfig,
|
||||
}
|
||||
|
||||
/// Node-control scaffold settings under `[gateway.node_control]`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct NodeControlConfig {
|
||||
/// Enable experimental node-control API endpoints.
|
||||
#[serde(default)]
|
||||
pub enabled: bool,
|
||||
|
||||
/// Optional extra shared token for node-control API calls.
|
||||
/// When set, clients must send this value in `X-Node-Control-Token`.
|
||||
#[serde(default)]
|
||||
pub auth_token: Option<String>,
|
||||
|
||||
/// Allowlist of remote node IDs for `node.describe`/`node.invoke`.
|
||||
/// Empty means "no explicit allowlist" (accept all IDs).
|
||||
#[serde(default)]
|
||||
pub allowed_node_ids: Vec<String>,
|
||||
}
|
||||
|
||||
impl Default for NodeControlConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: false,
|
||||
auth_token: None,
|
||||
allowed_node_ids: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_gateway_port() -> u16 {
|
||||
@ -875,6 +907,7 @@ impl Default for GatewayConfig {
|
||||
rate_limit_max_keys: default_gateway_rate_limit_max_keys(),
|
||||
idempotency_ttl_secs: default_idempotency_ttl_secs(),
|
||||
idempotency_max_keys: default_gateway_idempotency_max_keys(),
|
||||
node_control: NodeControlConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6042,6 +6075,9 @@ channel_id = "C123"
|
||||
assert_eq!(g.rate_limit_max_keys, 10_000);
|
||||
assert_eq!(g.idempotency_ttl_secs, 300);
|
||||
assert_eq!(g.idempotency_max_keys, 10_000);
|
||||
assert!(!g.node_control.enabled);
|
||||
assert!(g.node_control.auth_token.is_none());
|
||||
assert!(g.node_control.allowed_node_ids.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -6073,6 +6109,11 @@ channel_id = "C123"
|
||||
rate_limit_max_keys: 2048,
|
||||
idempotency_ttl_secs: 600,
|
||||
idempotency_max_keys: 4096,
|
||||
node_control: NodeControlConfig {
|
||||
enabled: true,
|
||||
auth_token: Some("node-token".into()),
|
||||
allowed_node_ids: vec!["node-1".into(), "node-2".into()],
|
||||
},
|
||||
};
|
||||
let toml_str = toml::to_string(&g).unwrap();
|
||||
let parsed: GatewayConfig = toml::from_str(&toml_str).unwrap();
|
||||
@ -6085,6 +6126,15 @@ channel_id = "C123"
|
||||
assert_eq!(parsed.rate_limit_max_keys, 2048);
|
||||
assert_eq!(parsed.idempotency_ttl_secs, 600);
|
||||
assert_eq!(parsed.idempotency_max_keys, 4096);
|
||||
assert!(parsed.node_control.enabled);
|
||||
assert_eq!(
|
||||
parsed.node_control.auth_token.as_deref(),
|
||||
Some("node-token")
|
||||
);
|
||||
assert_eq!(
|
||||
parsed.node_control.allowed_node_ids,
|
||||
vec!["node-1", "node-2"]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -7385,6 +7435,9 @@ default_model = "legacy-model"
|
||||
assert!(!g.trust_forwarded_headers);
|
||||
assert_eq!(g.rate_limit_max_keys, 10_000);
|
||||
assert_eq!(g.idempotency_max_keys, 10_000);
|
||||
assert!(!g.node_control.enabled);
|
||||
assert!(g.node_control.auth_token.is_none());
|
||||
assert!(g.node_control.allowed_node_ids.is_empty());
|
||||
}
|
||||
|
||||
// ── Peripherals config ───────────────────────────────────────
|
||||
|
||||
@ -624,6 +624,9 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
if qq_webhook_enabled {
|
||||
println!(" POST /qq — QQ Bot webhook (validation + events)");
|
||||
}
|
||||
if config.gateway.node_control.enabled {
|
||||
println!(" POST /api/node-control — experimental node-control RPC scaffold");
|
||||
}
|
||||
println!(" GET /api/* — REST API (bearer token required)");
|
||||
println!(" GET /ws/chat — WebSocket agent chat");
|
||||
println!(" GET /health — health check");
|
||||
@ -723,6 +726,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
.route("/api/cost", get(api::handle_api_cost))
|
||||
.route("/api/cli-tools", get(api::handle_api_cli_tools))
|
||||
.route("/api/health", get(api::handle_api_health))
|
||||
.route("/api/node-control", post(handle_node_control))
|
||||
// ── SSE event stream ──
|
||||
.route("/api/events", get(sse::handle_sse_events))
|
||||
// ── WebSocket agent chat ──
|
||||
@ -911,6 +915,189 @@ pub struct WebhookBody {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Deserialize)]
|
||||
pub struct NodeControlRequest {
|
||||
pub method: String,
|
||||
#[serde(default)]
|
||||
pub node_id: Option<String>,
|
||||
#[serde(default)]
|
||||
pub capability: Option<String>,
|
||||
#[serde(default)]
|
||||
pub arguments: serde_json::Value,
|
||||
}
|
||||
|
||||
fn node_id_allowed(node_id: &str, allowed_node_ids: &[String]) -> bool {
|
||||
if allowed_node_ids.is_empty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
allowed_node_ids
|
||||
.iter()
|
||||
.any(|candidate| candidate == "*" || candidate == node_id)
|
||||
}
|
||||
|
||||
/// POST /api/node-control — experimental node-control protocol scaffold.
|
||||
///
|
||||
/// Supported methods:
|
||||
/// - `node.list`
|
||||
/// - `node.describe`
|
||||
/// - `node.invoke` (stubbed as not implemented)
|
||||
async fn handle_node_control(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
body: Result<Json<NodeControlRequest>, axum::extract::rejection::JsonRejection>,
|
||||
) -> impl IntoResponse {
|
||||
// ── Bearer auth (pairing) ──
|
||||
if state.pairing.require_pairing() {
|
||||
let auth = headers
|
||||
.get(header::AUTHORIZATION)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("");
|
||||
let token = auth.strip_prefix("Bearer ").unwrap_or("");
|
||||
if !state.pairing.is_authenticated(token) {
|
||||
let err = serde_json::json!({
|
||||
"error": "Unauthorized — pair first via POST /pair, then send Authorization: Bearer <token>"
|
||||
});
|
||||
return (StatusCode::UNAUTHORIZED, Json(err));
|
||||
}
|
||||
}
|
||||
|
||||
let Json(request) = match body {
|
||||
Ok(body) => body,
|
||||
Err(e) => {
|
||||
tracing::warn!("Node-control JSON parse error: {e}");
|
||||
let err = serde_json::json!({
|
||||
"error": "Invalid JSON body for node-control request"
|
||||
});
|
||||
return (StatusCode::BAD_REQUEST, Json(err));
|
||||
}
|
||||
};
|
||||
|
||||
let node_control = { state.config.lock().gateway.node_control.clone() };
|
||||
if !node_control.enabled {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "Node-control API is disabled"})),
|
||||
);
|
||||
}
|
||||
|
||||
// Optional second-factor shared token for node-control endpoints.
|
||||
if let Some(expected_token) = node_control
|
||||
.auth_token
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
{
|
||||
let provided_token = headers
|
||||
.get("X-Node-Control-Token")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(str::trim)
|
||||
.unwrap_or("");
|
||||
if !constant_time_eq(expected_token, provided_token) {
|
||||
return (
|
||||
StatusCode::UNAUTHORIZED,
|
||||
Json(serde_json::json!({"error": "Invalid X-Node-Control-Token"})),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let method = request.method.trim();
|
||||
match method {
|
||||
"node.list" => {
|
||||
let nodes = node_control
|
||||
.allowed_node_ids
|
||||
.iter()
|
||||
.map(|node_id| {
|
||||
serde_json::json!({
|
||||
"node_id": node_id,
|
||||
"status": "unpaired",
|
||||
"capabilities": []
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({
|
||||
"ok": true,
|
||||
"method": "node.list",
|
||||
"nodes": nodes
|
||||
})),
|
||||
)
|
||||
}
|
||||
"node.describe" => {
|
||||
let Some(node_id) = request
|
||||
.node_id
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
else {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "node_id is required for node.describe"})),
|
||||
);
|
||||
};
|
||||
if !node_id_allowed(node_id, &node_control.allowed_node_ids) {
|
||||
return (
|
||||
StatusCode::FORBIDDEN,
|
||||
Json(serde_json::json!({"error": "node_id is not allowed"})),
|
||||
);
|
||||
}
|
||||
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({
|
||||
"ok": true,
|
||||
"method": "node.describe",
|
||||
"node_id": node_id,
|
||||
"description": {
|
||||
"status": "stub",
|
||||
"capabilities": [],
|
||||
"message": "Node descriptor scaffold is enabled; runtime backend is not wired yet."
|
||||
}
|
||||
})),
|
||||
)
|
||||
}
|
||||
"node.invoke" => {
|
||||
let Some(node_id) = request
|
||||
.node_id
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
else {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "node_id is required for node.invoke"})),
|
||||
);
|
||||
};
|
||||
if !node_id_allowed(node_id, &node_control.allowed_node_ids) {
|
||||
return (
|
||||
StatusCode::FORBIDDEN,
|
||||
Json(serde_json::json!({"error": "node_id is not allowed"})),
|
||||
);
|
||||
}
|
||||
|
||||
(
|
||||
StatusCode::NOT_IMPLEMENTED,
|
||||
Json(serde_json::json!({
|
||||
"ok": false,
|
||||
"method": "node.invoke",
|
||||
"node_id": node_id,
|
||||
"capability": request.capability,
|
||||
"arguments": request.arguments,
|
||||
"error": "node.invoke backend is not implemented yet in this scaffold"
|
||||
})),
|
||||
)
|
||||
}
|
||||
_ => (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({
|
||||
"error": "Unsupported method",
|
||||
"supported_methods": ["node.list", "node.describe", "node.invoke"]
|
||||
})),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// POST /webhook — main webhook endpoint
|
||||
async fn handle_webhook(
|
||||
State(state): State<AppState>,
|
||||
@ -1704,6 +1891,18 @@ mod tests {
|
||||
assert!(q.mode.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_id_allowed_with_empty_allowlist_accepts_any() {
|
||||
assert!(node_id_allowed("node-a", &[]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_id_allowed_respects_allowlist() {
|
||||
let allow = vec!["node-1".to_string(), "node-2".to_string()];
|
||||
assert!(node_id_allowed("node-1", &allow));
|
||||
assert!(!node_id_allowed("node-9", &allow));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn app_state_is_clone() {
|
||||
fn assert_clone<T: Clone>() {}
|
||||
@ -2214,6 +2413,112 @@ mod tests {
|
||||
assert_eq!(provider_impl.calls.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn node_control_returns_not_found_when_disabled() {
|
||||
let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
|
||||
let memory: Arc<dyn Memory> = Arc::new(MockMemory);
|
||||
|
||||
let state = AppState {
|
||||
config: Arc::new(Mutex::new(Config::default())),
|
||||
provider,
|
||||
model: "test-model".into(),
|
||||
temperature: 0.0,
|
||||
mem: memory,
|
||||
auto_save: false,
|
||||
webhook_secret_hash: None,
|
||||
pairing: Arc::new(PairingGuard::new(false, &[])),
|
||||
trust_forwarded_headers: false,
|
||||
rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
|
||||
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
|
||||
whatsapp: None,
|
||||
whatsapp_app_secret: None,
|
||||
linq: None,
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
tools_registry_exec: Arc::new(Vec::new()),
|
||||
multimodal: crate::config::MultimodalConfig::default(),
|
||||
max_tool_iterations: 10,
|
||||
cost_tracker: None,
|
||||
event_tx: tokio::sync::broadcast::channel(16).0,
|
||||
};
|
||||
|
||||
let response = handle_node_control(
|
||||
State(state),
|
||||
HeaderMap::new(),
|
||||
Ok(Json(NodeControlRequest {
|
||||
method: "node.list".into(),
|
||||
node_id: None,
|
||||
capability: None,
|
||||
arguments: serde_json::Value::Null,
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.into_response();
|
||||
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn node_control_list_returns_stub_nodes_when_enabled() {
|
||||
let provider: Arc<dyn Provider> = Arc::new(MockProvider::default());
|
||||
let memory: Arc<dyn Memory> = Arc::new(MockMemory);
|
||||
|
||||
let mut config = Config::default();
|
||||
config.gateway.node_control.enabled = true;
|
||||
config.gateway.node_control.allowed_node_ids = vec!["node-1".into(), "node-2".into()];
|
||||
|
||||
let state = AppState {
|
||||
config: Arc::new(Mutex::new(config)),
|
||||
provider,
|
||||
model: "test-model".into(),
|
||||
temperature: 0.0,
|
||||
mem: memory,
|
||||
auto_save: false,
|
||||
webhook_secret_hash: None,
|
||||
pairing: Arc::new(PairingGuard::new(false, &[])),
|
||||
trust_forwarded_headers: false,
|
||||
rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
|
||||
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
|
||||
whatsapp: None,
|
||||
whatsapp_app_secret: None,
|
||||
linq: None,
|
||||
linq_signing_secret: None,
|
||||
nextcloud_talk: None,
|
||||
nextcloud_talk_webhook_secret: None,
|
||||
wati: None,
|
||||
observer: Arc::new(crate::observability::NoopObserver),
|
||||
tools_registry: Arc::new(Vec::new()),
|
||||
tools_registry_exec: Arc::new(Vec::new()),
|
||||
multimodal: crate::config::MultimodalConfig::default(),
|
||||
max_tool_iterations: 10,
|
||||
cost_tracker: None,
|
||||
event_tx: tokio::sync::broadcast::channel(16).0,
|
||||
};
|
||||
|
||||
let response = handle_node_control(
|
||||
State(state),
|
||||
HeaderMap::new(),
|
||||
Ok(Json(NodeControlRequest {
|
||||
method: "node.list".into(),
|
||||
node_id: None,
|
||||
capability: None,
|
||||
arguments: serde_json::Value::Null,
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.into_response();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
let payload = response.into_body().collect().await.unwrap().to_bytes();
|
||||
let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
|
||||
assert_eq!(parsed["ok"], true);
|
||||
assert_eq!(parsed["method"], "node.list");
|
||||
assert_eq!(parsed["nodes"].as_array().map(|v| v.len()), Some(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn webhook_autosave_stores_distinct_keys_per_request() {
|
||||
let provider_impl = Arc::new(MockProvider::default());
|
||||
|
||||
Loading…
Reference in New Issue
Block a user