diff --git a/src/gateway/api_control_plane.rs b/src/gateway/api_control_plane.rs new file mode 100644 index 000000000..38b373b55 --- /dev/null +++ b/src/gateway/api_control_plane.rs @@ -0,0 +1,172 @@ +//! Control plane REST API handlers. + +use super::control_plane::{NodeCapability, NodeInfo, NodeStatus}; +use super::AppState; +use axum::{ + extract::{Path, State}, + http::{header, HeaderMap, StatusCode}, + response::{IntoResponse, Json}, +}; +use chrono::Utc; +use serde::Deserialize; +use std::collections::HashMap; + +fn require_auth(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, &'static str)> { + if state.pairing.require_pairing() { + let token = headers + .get(header::AUTHORIZATION) + .and_then(|v| v.to_str().ok()) + .and_then(|auth| auth.strip_prefix("Bearer ")) + .unwrap_or(""); + if !state.pairing.is_authenticated(token) { + return Err((StatusCode::UNAUTHORIZED, "Unauthorized")); + } + } + Ok(()) +} + +/// GET /api/control-plane/nodes — list all registered nodes +pub async fn list_nodes( + State(state): State, + headers: HeaderMap, +) -> impl IntoResponse { + if let Err(e) = require_auth(&state, &headers) { + return e.into_response(); + } + + match &state.control_plane { + Some(cp) => { + let nodes = cp.list_nodes(); + let count = nodes.len(); + Json(serde_json::json!({ + "nodes": nodes, + "count": count + })) + .into_response() + } + None => ( + StatusCode::SERVICE_UNAVAILABLE, + "Control plane not enabled", + ) + .into_response(), + } +} + +#[derive(Deserialize)] +pub struct RegisterNodeRequest { + pub id: String, + pub name: Option, + pub address: Option, + pub version: Option, + #[serde(default)] + pub capabilities: Vec, + #[serde(default)] + pub metadata: HashMap, +} + +/// POST /api/control-plane/nodes — register a new node +pub async fn register_node( + State(state): State, + headers: HeaderMap, + Json(body): Json, +) -> impl IntoResponse { + if let Err(e) = require_auth(&state, &headers) { + return e.into_response(); + } + + match &state.control_plane { + Some(cp) => { + let node_id = body.id.clone(); + let info = NodeInfo { + id: body.id, + name: body.name, + address: body.address, + version: body.version, + capabilities: body.capabilities, + status: NodeStatus::Healthy, + registered_at: Utc::now(), + last_heartbeat: Utc::now(), + missed_heartbeats: 0, + metadata: body.metadata, + }; + + if cp.register(info) { + ( + StatusCode::CREATED, + Json(serde_json::json!({ + "message": "Node registered", + "node_id": node_id + })), + ) + .into_response() + } else { + (StatusCode::CONFLICT, "Node capacity reached").into_response() + } + } + None => ( + StatusCode::SERVICE_UNAVAILABLE, + "Control plane not enabled", + ) + .into_response(), + } +} + +/// POST /api/control-plane/nodes/{id}/heartbeat — record a heartbeat +pub async fn node_heartbeat( + State(state): State, + headers: HeaderMap, + Path(node_id): Path, +) -> impl IntoResponse { + if let Err(e) = require_auth(&state, &headers) { + return e.into_response(); + } + + match &state.control_plane { + Some(cp) => { + if cp.heartbeat(&node_id) { + Json(serde_json::json!({ + "message": "Heartbeat recorded", + "node_id": node_id + })) + .into_response() + } else { + (StatusCode::NOT_FOUND, "Node not found").into_response() + } + } + None => ( + StatusCode::SERVICE_UNAVAILABLE, + "Control plane not enabled", + ) + .into_response(), + } +} + +/// DELETE /api/control-plane/nodes/{id} — deregister a node +pub async fn deregister_node( + State(state): State, + headers: HeaderMap, + Path(node_id): Path, +) -> impl IntoResponse { + if let Err(e) = require_auth(&state, &headers) { + return e.into_response(); + } + + match &state.control_plane { + Some(cp) => { + if cp.deregister(&node_id) { + Json(serde_json::json!({ + "message": "Node deregistered", + "node_id": node_id + })) + .into_response() + } else { + (StatusCode::NOT_FOUND, "Node not found").into_response() + } + } + None => ( + StatusCode::SERVICE_UNAVAILABLE, + "Control plane not enabled", + ) + .into_response(), + } +} diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 5f4280286..0585b7a4a 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -8,6 +8,7 @@ //! - Header sanitization (handled by axum/hyper) pub mod api; +pub mod api_control_plane; pub mod api_pairing; pub mod control_plane; pub mod nodes; @@ -804,6 +805,11 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> { "/api/devices/{id}/token/rotate", post(api_pairing::rotate_token), ) + // ── Control Plane API ── + .route("/api/control-plane/nodes", get(api_control_plane::list_nodes)) + .route("/api/control-plane/nodes", post(api_control_plane::register_node)) + .route("/api/control-plane/nodes/{id}/heartbeat", post(api_control_plane::node_heartbeat)) + .route("/api/control-plane/nodes/{id}", delete(api_control_plane::deregister_node)) // ── SSE event stream ── .route("/api/events", get(sse::handle_sse_events)) // ── WebSocket agent chat ──