Compare commits
5 Commits
master
...
feat/contr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0087b13fa0 | ||
|
|
21d6dfd8d9 | ||
|
|
7bef57cba3 | ||
|
|
2b3ab578b5 | ||
|
|
7b2c3dfeb5 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -7981,6 +7981,7 @@ version = "0.5.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-imap",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64",
|
||||
|
||||
@ -42,6 +42,7 @@ clap_complete = "4.5"
|
||||
tokio = { version = "1.50", default-features = false, features = ["rt-multi-thread", "macros", "time", "net", "io-util", "sync", "process", "io-std", "fs", "signal"] }
|
||||
tokio-util = { version = "0.7", default-features = false }
|
||||
tokio-stream = { version = "0.1.18", default-features = false, features = ["fs", "sync"] }
|
||||
async-stream = "0.3"
|
||||
|
||||
# HTTP client - minimal features
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking", "multipart", "stream", "socks"] }
|
||||
|
||||
153
src/gateway/api_control_plane.rs
Normal file
153
src/gateway/api_control_plane.rs
Normal file
@ -0,0 +1,153 @@
|
||||
//! Control plane REST API handlers.
|
||||
|
||||
use super::control_plane::{NodeCapability, NodeInfo, NodeStatus};
|
||||
use super::AppState;
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
http::{header, HeaderMap, StatusCode},
|
||||
response::{IntoResponse, Json},
|
||||
};
|
||||
use chrono::Utc;
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
fn require_auth(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, &'static str)> {
|
||||
if state.pairing.require_pairing() {
|
||||
let token = headers
|
||||
.get(header::AUTHORIZATION)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.and_then(|auth| auth.strip_prefix("Bearer "))
|
||||
.unwrap_or("");
|
||||
if !state.pairing.is_authenticated(token) {
|
||||
return Err((StatusCode::UNAUTHORIZED, "Unauthorized"));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// GET /api/control-plane/nodes — list all registered nodes
|
||||
pub async fn list_nodes(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
|
||||
if let Err(e) = require_auth(&state, &headers) {
|
||||
return e.into_response();
|
||||
}
|
||||
|
||||
match &state.control_plane {
|
||||
Some(cp) => {
|
||||
let nodes = cp.list_nodes();
|
||||
let count = nodes.len();
|
||||
Json(serde_json::json!({
|
||||
"nodes": nodes,
|
||||
"count": count
|
||||
}))
|
||||
.into_response()
|
||||
}
|
||||
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct RegisterNodeRequest {
|
||||
pub id: String,
|
||||
pub name: Option<String>,
|
||||
pub address: Option<String>,
|
||||
pub version: Option<String>,
|
||||
#[serde(default)]
|
||||
pub capabilities: Vec<NodeCapability>,
|
||||
#[serde(default)]
|
||||
pub metadata: HashMap<String, String>,
|
||||
}
|
||||
|
||||
/// POST /api/control-plane/nodes — register a new node
|
||||
pub async fn register_node(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
Json(body): Json<RegisterNodeRequest>,
|
||||
) -> impl IntoResponse {
|
||||
if let Err(e) = require_auth(&state, &headers) {
|
||||
return e.into_response();
|
||||
}
|
||||
|
||||
match &state.control_plane {
|
||||
Some(cp) => {
|
||||
let node_id = body.id.clone();
|
||||
let info = NodeInfo {
|
||||
id: body.id,
|
||||
name: body.name,
|
||||
address: body.address,
|
||||
version: body.version,
|
||||
capabilities: body.capabilities,
|
||||
status: NodeStatus::Healthy,
|
||||
registered_at: Utc::now(),
|
||||
last_heartbeat: Utc::now(),
|
||||
missed_heartbeats: 0,
|
||||
metadata: body.metadata,
|
||||
};
|
||||
|
||||
if cp.register(info) {
|
||||
(
|
||||
StatusCode::CREATED,
|
||||
Json(serde_json::json!({
|
||||
"message": "Node registered",
|
||||
"node_id": node_id
|
||||
})),
|
||||
)
|
||||
.into_response()
|
||||
} else {
|
||||
(StatusCode::CONFLICT, "Node capacity reached").into_response()
|
||||
}
|
||||
}
|
||||
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
/// POST /api/control-plane/nodes/{id}/heartbeat — record a heartbeat
|
||||
pub async fn node_heartbeat(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
Path(node_id): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
if let Err(e) = require_auth(&state, &headers) {
|
||||
return e.into_response();
|
||||
}
|
||||
|
||||
match &state.control_plane {
|
||||
Some(cp) => {
|
||||
if cp.heartbeat(&node_id) {
|
||||
Json(serde_json::json!({
|
||||
"message": "Heartbeat recorded",
|
||||
"node_id": node_id
|
||||
}))
|
||||
.into_response()
|
||||
} else {
|
||||
(StatusCode::NOT_FOUND, "Node not found").into_response()
|
||||
}
|
||||
}
|
||||
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
/// DELETE /api/control-plane/nodes/{id} — deregister a node
|
||||
pub async fn deregister_node(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
Path(node_id): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
if let Err(e) = require_auth(&state, &headers) {
|
||||
return e.into_response();
|
||||
}
|
||||
|
||||
match &state.control_plane {
|
||||
Some(cp) => {
|
||||
if cp.deregister(&node_id) {
|
||||
Json(serde_json::json!({
|
||||
"message": "Node deregistered",
|
||||
"node_id": node_id
|
||||
}))
|
||||
.into_response()
|
||||
} else {
|
||||
(StatusCode::NOT_FOUND, "Node not found").into_response()
|
||||
}
|
||||
}
|
||||
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||
}
|
||||
}
|
||||
116
src/gateway/backoff.rs
Normal file
116
src/gateway/backoff.rs
Normal file
@ -0,0 +1,116 @@
|
||||
//! Exponential backoff for channel restart and retry logic.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
/// Retry-delay generator: each attempt multiplies the base delay by `factor`,
/// applies jitter, and caps the result at `max`.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    /// Number of delays handed out since creation or the last reset (starts at 0).
    attempt: u32,
    /// Delay used for the first retry, before jitter.
    base: Duration,
    /// Upper bound on any returned delay.
    max: Duration,
    /// Growth multiplier applied once per attempt.
    factor: f64,
}
|
||||
|
||||
impl ExponentialBackoff {
|
||||
/// Create a new backoff starting at `base` delay, capped at `max`.
|
||||
pub fn new(base: Duration, max: Duration) -> Self {
|
||||
Self {
|
||||
attempt: 0,
|
||||
base,
|
||||
max,
|
||||
factor: 2.0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create with a custom multiplier factor.
|
||||
pub fn with_factor(mut self, factor: f64) -> Self {
|
||||
self.factor = factor.max(1.0);
|
||||
self
|
||||
}
|
||||
|
||||
/// Get the next delay and advance the attempt counter.
|
||||
pub fn next_delay(&mut self) -> Duration {
|
||||
let delay = self.base.mul_f64(self.factor.powi(self.attempt as i32));
|
||||
self.attempt = self.attempt.saturating_add(1);
|
||||
|
||||
// Add jitter (+-25%)
|
||||
let jitter_factor = 0.75 + (pseudo_random() * 0.5);
|
||||
let jittered = delay.mul_f64(jitter_factor);
|
||||
|
||||
jittered.min(self.max)
|
||||
}
|
||||
|
||||
/// Reset the backoff counter (e.g., after a successful connection).
|
||||
pub fn reset(&mut self) {
|
||||
self.attempt = 0;
|
||||
}
|
||||
|
||||
/// Current attempt number.
|
||||
pub fn attempt(&self) -> u32 {
|
||||
self.attempt
|
||||
}
|
||||
|
||||
/// Async sleep for the next backoff duration.
|
||||
pub async fn wait(&mut self) {
|
||||
let delay = self.next_delay();
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ExponentialBackoff {
|
||||
fn default() -> Self {
|
||||
Self::new(
|
||||
Duration::from_secs(1),
|
||||
Duration::from_secs(300), // 5 minutes max
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Cheap, non-cryptographic jitter source in `[0.0, 1.0)` (no external dep needed).
///
/// The value is NOT deterministic: it hashes the current sub-second clock
/// reading with a freshly keyed standard-library hasher. Using the hasher
/// (instead of `nanos % 1000` directly) keeps the output varied even on
/// platforms whose clock resolution is coarser than one nanosecond, where the
/// low digits of the timestamp are constant.
fn pseudo_random() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::SystemTime;

    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();

    // Each `RandomState::new()` carries different keys, so consecutive calls
    // differ even when the clock reading does not.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u32(nanos);

    // Keep the top 32 bits: they convert to f64 losslessly.
    let [a, b, c, d, ..] = hasher.finish().to_be_bytes();
    let bits = u32::from_be_bytes([a, b, c, d]);
    f64::from(bits) / (f64::from(u32::MAX) + 1.0)
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// The first default delay stays close to the 1s base and later delays
    /// respect the 5 minute cap.
    #[test]
    fn test_default_backoff() {
        let mut backoff = ExponentialBackoff::default();
        let first = backoff.next_delay();
        backoff.next_delay();
        let third = backoff.next_delay();

        assert!(first < Duration::from_secs(5));
        assert!(third <= Duration::from_secs(300));
    }

    /// No delay may exceed the configured maximum, however many attempts occur.
    #[test]
    fn test_max_cap() {
        let cap = Duration::from_secs(120);
        let mut backoff = ExponentialBackoff::new(Duration::from_secs(60), cap);
        for _ in 0..20 {
            assert!(backoff.next_delay() <= cap);
        }
    }

    /// `reset` returns the attempt counter to zero.
    #[test]
    fn test_reset() {
        let mut backoff = ExponentialBackoff::default();
        for _ in 0..2 {
            backoff.next_delay();
        }
        assert_eq!(backoff.attempt(), 2);

        backoff.reset();
        assert_eq!(backoff.attempt(), 0);
    }
}
|
||||
332
src/gateway/control_plane.rs
Normal file
332
src/gateway/control_plane.rs
Normal file
@ -0,0 +1,332 @@
|
||||
//! Control plane for multi-node orchestration.
|
||||
//!
|
||||
//! Keeps its own in-memory node map alongside the existing `NodeRegistry`,
//! adding capability tracking, health status, and control events without
//! replacing that registry.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Capabilities that a node can advertise.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum NodeCapability {
|
||||
/// Can run LLM provider inference
|
||||
Inference,
|
||||
/// Can execute tools (shell, file, etc.)
|
||||
ToolExecution,
|
||||
/// Has hardware peripherals attached
|
||||
Hardware,
|
||||
/// Can serve as a gateway
|
||||
Gateway,
|
||||
/// Has memory/storage backend
|
||||
Storage,
|
||||
/// Custom capability
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
/// Health status of a node.
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum NodeStatus {
|
||||
/// Node is healthy and responding
|
||||
Healthy,
|
||||
/// Node has missed recent heartbeats
|
||||
Degraded,
|
||||
/// Node is not responding
|
||||
Unreachable,
|
||||
/// Node is intentionally offline
|
||||
Offline,
|
||||
}
|
||||
|
||||
/// Extended information about a registered node.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct NodeInfo {
|
||||
/// Unique node identifier
|
||||
pub id: String,
|
||||
/// Human-readable node name
|
||||
pub name: Option<String>,
|
||||
/// Node's advertised address (host:port)
|
||||
pub address: Option<String>,
|
||||
/// Node version
|
||||
pub version: Option<String>,
|
||||
/// Capabilities the node supports
|
||||
pub capabilities: Vec<NodeCapability>,
|
||||
/// Current health status
|
||||
pub status: NodeStatus,
|
||||
/// When this node was first registered
|
||||
pub registered_at: DateTime<Utc>,
|
||||
/// Last successful heartbeat
|
||||
pub last_heartbeat: DateTime<Utc>,
|
||||
/// Consecutive missed heartbeats
|
||||
pub missed_heartbeats: u32,
|
||||
/// Arbitrary metadata
|
||||
pub metadata: HashMap<String, String>,
|
||||
}
|
||||
|
||||
/// Events emitted by the control plane.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum ControlEvent {
|
||||
/// A node registered
|
||||
NodeRegistered {
|
||||
node_id: String,
|
||||
name: Option<String>,
|
||||
capabilities: Vec<NodeCapability>,
|
||||
timestamp: DateTime<Utc>,
|
||||
},
|
||||
/// A node was deregistered
|
||||
NodeDeregistered {
|
||||
node_id: String,
|
||||
timestamp: DateTime<Utc>,
|
||||
},
|
||||
/// A node's status changed
|
||||
NodeStatusChanged {
|
||||
node_id: String,
|
||||
old_status: NodeStatus,
|
||||
new_status: NodeStatus,
|
||||
timestamp: DateTime<Utc>,
|
||||
},
|
||||
/// A node sent a heartbeat
|
||||
NodeHeartbeat {
|
||||
node_id: String,
|
||||
timestamp: DateTime<Utc>,
|
||||
},
|
||||
}
|
||||
|
||||
/// The control plane manages node lifecycle and health.
|
||||
pub struct ControlPlane {
|
||||
nodes: Mutex<HashMap<String, NodeInfo>>,
|
||||
max_nodes: usize,
|
||||
event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
||||
/// How many missed heartbeats before marking degraded
|
||||
degraded_threshold: u32,
|
||||
/// How many missed heartbeats before marking unreachable
|
||||
unreachable_threshold: u32,
|
||||
}
|
||||
|
||||
impl ControlPlane {
|
||||
pub fn new(
|
||||
max_nodes: usize,
|
||||
event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
||||
) -> Self {
|
||||
Self {
|
||||
nodes: Mutex::new(HashMap::new()),
|
||||
max_nodes,
|
||||
event_tx,
|
||||
degraded_threshold: 3,
|
||||
unreachable_threshold: 10,
|
||||
}
|
||||
}
|
||||
|
||||
/// Register a new node. Returns false if capacity is reached.
|
||||
pub fn register(&self, info: NodeInfo) -> bool {
|
||||
let mut nodes = self.nodes.lock();
|
||||
if nodes.len() >= self.max_nodes && !nodes.contains_key(&info.id) {
|
||||
return false;
|
||||
}
|
||||
let event = ControlEvent::NodeRegistered {
|
||||
node_id: info.id.clone(),
|
||||
name: info.name.clone(),
|
||||
capabilities: info.capabilities.clone(),
|
||||
timestamp: Utc::now(),
|
||||
};
|
||||
nodes.insert(info.id.clone(), info);
|
||||
self.emit_event(&event);
|
||||
true
|
||||
}
|
||||
|
||||
/// Deregister a node by ID.
|
||||
pub fn deregister(&self, node_id: &str) -> bool {
|
||||
let mut nodes = self.nodes.lock();
|
||||
if nodes.remove(node_id).is_some() {
|
||||
self.emit_event(&ControlEvent::NodeDeregistered {
|
||||
node_id: node_id.to_string(),
|
||||
timestamp: Utc::now(),
|
||||
});
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a heartbeat from a node.
|
||||
pub fn heartbeat(&self, node_id: &str) -> bool {
|
||||
let mut nodes = self.nodes.lock();
|
||||
if let Some(node) = nodes.get_mut(node_id) {
|
||||
let old_status = node.status;
|
||||
node.last_heartbeat = Utc::now();
|
||||
node.missed_heartbeats = 0;
|
||||
node.status = NodeStatus::Healthy;
|
||||
|
||||
if old_status != NodeStatus::Healthy {
|
||||
self.emit_event(&ControlEvent::NodeStatusChanged {
|
||||
node_id: node_id.to_string(),
|
||||
old_status,
|
||||
new_status: NodeStatus::Healthy,
|
||||
timestamp: Utc::now(),
|
||||
});
|
||||
}
|
||||
|
||||
self.emit_event(&ControlEvent::NodeHeartbeat {
|
||||
node_id: node_id.to_string(),
|
||||
timestamp: Utc::now(),
|
||||
});
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// List all registered nodes.
|
||||
pub fn list_nodes(&self) -> Vec<NodeInfo> {
|
||||
self.nodes.lock().values().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get a specific node by ID.
|
||||
pub fn get_node(&self, node_id: &str) -> Option<NodeInfo> {
|
||||
self.nodes.lock().get(node_id).cloned()
|
||||
}
|
||||
|
||||
/// Health monitor tick — increment missed heartbeats and update statuses.
|
||||
/// Should be called periodically (e.g., every 30s).
|
||||
pub fn health_tick(&self) {
|
||||
let mut nodes = self.nodes.lock();
|
||||
for node in nodes.values_mut() {
|
||||
if node.status == NodeStatus::Offline {
|
||||
continue;
|
||||
}
|
||||
node.missed_heartbeats += 1;
|
||||
let old_status = node.status;
|
||||
let new_status = if node.missed_heartbeats >= self.unreachable_threshold {
|
||||
NodeStatus::Unreachable
|
||||
} else if node.missed_heartbeats >= self.degraded_threshold {
|
||||
NodeStatus::Degraded
|
||||
} else {
|
||||
NodeStatus::Healthy
|
||||
};
|
||||
|
||||
if old_status != new_status {
|
||||
node.status = new_status;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run the health monitor background loop.
|
||||
pub async fn health_monitor(self: Arc<Self>, interval_secs: u64) {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
self.health_tick();
|
||||
}
|
||||
}
|
||||
|
||||
/// Node count.
|
||||
pub fn node_count(&self) -> usize {
|
||||
self.nodes.lock().len()
|
||||
}
|
||||
|
||||
fn emit_event(&self, event: &ControlEvent) {
|
||||
if let Ok(json) = serde_json::to_value(event) {
|
||||
let _ = self.event_tx.send(json);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Control plane with the given capacity and a throwaway event channel.
    fn control_plane_with_capacity(max_nodes: usize) -> ControlPlane {
        let (tx, _) = tokio::sync::broadcast::channel(16);
        ControlPlane::new(max_nodes, tx)
    }

    /// Control plane with room for 100 nodes.
    fn make_control_plane() -> ControlPlane {
        control_plane_with_capacity(100)
    }

    /// Healthy node fixture with two capabilities and no metadata.
    fn make_node(id: &str) -> NodeInfo {
        NodeInfo {
            id: id.to_string(),
            name: Some(format!("node-{id}")),
            address: Some("127.0.0.1:42618".to_string()),
            version: Some("0.4.3".to_string()),
            capabilities: vec![NodeCapability::Inference, NodeCapability::ToolExecution],
            status: NodeStatus::Healthy,
            registered_at: Utc::now(),
            last_heartbeat: Utc::now(),
            missed_heartbeats: 0,
            metadata: HashMap::new(),
        }
    }

    /// Run `ticks` health ticks against a fresh plane holding node "a".
    fn plane_after_ticks(ticks: usize) -> ControlPlane {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        for _ in 0..ticks {
            cp.health_tick();
        }
        cp
    }

    #[test]
    fn test_register_and_list() {
        let cp = make_control_plane();
        for id in ["a", "b"] {
            assert!(cp.register(make_node(id)));
        }
        assert_eq!(cp.node_count(), 2);
        assert_eq!(cp.list_nodes().len(), 2);
    }

    #[test]
    fn test_deregister() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        assert!(cp.deregister("a"));
        // A second removal finds nothing.
        assert!(!cp.deregister("a"));
        assert_eq!(cp.node_count(), 0);
    }

    #[test]
    fn test_heartbeat() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        assert!(cp.heartbeat("a"));
        assert!(!cp.heartbeat("nonexistent"));
    }

    #[test]
    fn test_health_tick_degraded() {
        let cp = plane_after_ticks(3);
        let node = cp.get_node("a").unwrap();
        assert_eq!(node.status, NodeStatus::Degraded);
    }

    #[test]
    fn test_health_tick_unreachable() {
        let cp = plane_after_ticks(10);
        let node = cp.get_node("a").unwrap();
        assert_eq!(node.status, NodeStatus::Unreachable);
    }

    #[test]
    fn test_heartbeat_resets_status() {
        let cp = plane_after_ticks(5);
        assert_eq!(cp.get_node("a").unwrap().status, NodeStatus::Degraded);
        cp.heartbeat("a");
        assert_eq!(cp.get_node("a").unwrap().status, NodeStatus::Healthy);
    }

    #[test]
    fn test_capacity_limit() {
        let cp = control_plane_with_capacity(2);
        assert!(cp.register(make_node("a")));
        assert!(cp.register(make_node("b")));
        // Third distinct id exceeds the capacity of two.
        assert!(!cp.register(make_node("c")));
    }
}
|
||||
@ -8,8 +8,12 @@
|
||||
//! - Header sanitization (handled by axum/hyper)
|
||||
|
||||
pub mod api;
|
||||
pub mod api_control_plane;
|
||||
pub mod api_pairing;
|
||||
pub mod backoff;
|
||||
pub mod control_plane;
|
||||
pub mod nodes;
|
||||
pub mod session_keys;
|
||||
pub mod sse;
|
||||
pub mod static_files;
|
||||
pub mod ws;
|
||||
@ -339,6 +343,8 @@ pub struct AppState {
|
||||
pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
|
||||
/// Pending pairing request store
|
||||
pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
|
||||
/// Control plane for multi-node orchestration (None if nodes.enabled is false)
|
||||
pub control_plane: Option<Arc<control_plane::ControlPlane>>,
|
||||
}
|
||||
|
||||
/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
|
||||
@ -704,6 +710,16 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
None
|
||||
};
|
||||
|
||||
// Control plane for multi-node orchestration
|
||||
let control_plane = if config.nodes.enabled {
|
||||
Some(Arc::new(control_plane::ControlPlane::new(
|
||||
config.nodes.max_nodes,
|
||||
event_tx.clone(),
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let state = AppState {
|
||||
config: config_state,
|
||||
provider,
|
||||
@ -732,8 +748,17 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
session_backend,
|
||||
device_registry,
|
||||
pending_pairings,
|
||||
control_plane: control_plane.clone(),
|
||||
};
|
||||
|
||||
// Spawn control plane health monitor if enabled
|
||||
if let Some(ref cp) = control_plane {
|
||||
let cp_clone = Arc::clone(cp);
|
||||
tokio::spawn(async move {
|
||||
cp_clone.health_monitor(30).await;
|
||||
});
|
||||
}
|
||||
|
||||
// Config PUT needs larger body limit (1MB)
|
||||
let config_put_router = Router::new()
|
||||
.route("/api/config", put(api::handle_api_config_put))
|
||||
@ -790,6 +815,20 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
"/api/devices/{id}/token/rotate",
|
||||
post(api_pairing::rotate_token),
|
||||
)
|
||||
// ── Control Plane API ──
|
||||
.route("/api/nodes", get(api_control_plane::list_nodes))
|
||||
.route(
|
||||
"/api/nodes/register",
|
||||
post(api_control_plane::register_node),
|
||||
)
|
||||
.route(
|
||||
"/api/nodes/{id}/heartbeat",
|
||||
post(api_control_plane::node_heartbeat),
|
||||
)
|
||||
.route(
|
||||
"/api/nodes/{id}",
|
||||
delete(api_control_plane::deregister_node),
|
||||
)
|
||||
// ── SSE event stream ──
|
||||
.route("/api/events", get(sse::handle_sse_events))
|
||||
// ── WebSocket agent chat ──
|
||||
@ -1895,6 +1934,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let response = handle_metrics(State(state)).await.into_response();
|
||||
@ -1950,6 +1990,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let response = handle_metrics(State(state)).await.into_response();
|
||||
@ -2329,6 +2370,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
@ -2398,6 +2440,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let headers = HeaderMap::new();
|
||||
@ -2479,6 +2522,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let response = handle_webhook(
|
||||
@ -2532,6 +2576,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
@ -2590,6 +2635,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
@ -2653,6 +2699,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let response = Box::pin(handle_nextcloud_talk_webhook(
|
||||
@ -2712,6 +2759,7 @@ mod tests {
|
||||
session_backend: None,
|
||||
device_registry: None,
|
||||
pending_pairings: None,
|
||||
control_plane: None,
|
||||
};
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
|
||||
124
src/gateway/session_keys.rs
Normal file
124
src/gateway/session_keys.rs
Normal file
@ -0,0 +1,124 @@
|
||||
//! Session key parsing for scoped gateway access.
|
||||
//!
|
||||
//! Session keys encode a scope that limits what operations the bearer
|
||||
//! can perform. Format: `zcs_<scope>_<random>` where scope is one of:
|
||||
//! `main`, `sender_<id>`, `cron`, `subagent`.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// The scope encoded in a session key.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum SessionScope {
|
||||
/// Full access (equivalent to the main bearer token)
|
||||
Main,
|
||||
/// Scoped to a specific sender/channel
|
||||
Sender(String),
|
||||
/// Scoped to cron job execution
|
||||
Cron,
|
||||
/// Scoped to a sub-agent delegation
|
||||
SubAgent,
|
||||
}
|
||||
|
||||
/// A parsed session key.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SessionKey {
|
||||
pub scope: SessionScope,
|
||||
pub raw: String,
|
||||
}
|
||||
|
||||
impl SessionKey {
|
||||
/// Parse a session key string.
|
||||
/// Returns None if the format is invalid.
|
||||
pub fn parse(key: &str) -> Option<Self> {
|
||||
let key = key.trim();
|
||||
if !key.starts_with("zcs_") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let rest = &key[4..]; // skip "zcs_"
|
||||
let scope = if rest.starts_with("main_") {
|
||||
SessionScope::Main
|
||||
} else if let Some(sender_rest) = rest.strip_prefix("sender_") {
|
||||
// Format: zcs_sender_<id>_<random>
|
||||
let parts: Vec<&str> = sender_rest.splitn(2, '_').collect();
|
||||
if parts.len() < 2 {
|
||||
return None;
|
||||
}
|
||||
SessionScope::Sender(parts[0].to_string())
|
||||
} else if rest.starts_with("cron_") {
|
||||
SessionScope::Cron
|
||||
} else if rest.starts_with("subagent_") {
|
||||
SessionScope::SubAgent
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
scope,
|
||||
raw: key.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Check if this key's scope allows a given operation.
|
||||
pub fn allows(&self, operation: &str) -> bool {
|
||||
match &self.scope {
|
||||
SessionScope::Main => true, // full access
|
||||
SessionScope::Sender(_) => {
|
||||
matches!(operation, "chat" | "memory_read" | "status")
|
||||
}
|
||||
SessionScope::Cron => {
|
||||
matches!(
|
||||
operation,
|
||||
"chat" | "tool_execute" | "memory_read" | "memory_write"
|
||||
)
|
||||
}
|
||||
SessionScope::SubAgent => {
|
||||
matches!(operation, "chat" | "tool_execute" | "memory_read")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a key that the test expects to be valid.
    fn parsed(raw: &str) -> SessionKey {
        SessionKey::parse(raw).unwrap()
    }

    #[test]
    fn test_parse_main() {
        let key = parsed("zcs_main_abc123def456");
        assert_eq!(key.scope, SessionScope::Main);
        // Main scope permits any operation name.
        for op in ["chat", "anything"] {
            assert!(key.allows(op));
        }
    }

    #[test]
    fn test_parse_sender() {
        let key = parsed("zcs_sender_telegram123_abc456");
        assert_eq!(key.scope, SessionScope::Sender("telegram123".to_string()));
        assert!(key.allows("chat"));
        assert!(!key.allows("tool_execute"));
    }

    #[test]
    fn test_parse_cron() {
        let key = parsed("zcs_cron_random789");
        assert_eq!(key.scope, SessionScope::Cron);
        assert!(key.allows("chat"));
        assert!(key.allows("tool_execute"));
        assert!(!key.allows("admin"));
    }

    #[test]
    fn test_parse_subagent() {
        let key = parsed("zcs_subagent_xyz");
        assert_eq!(key.scope, SessionScope::SubAgent);
    }

    #[test]
    fn test_invalid_prefix() {
        for raw in ["zc_main_abc", "", "zcs_unknown_abc"] {
            assert!(SessionKey::parse(raw).is_none());
        }
    }
}
|
||||
@ -236,7 +236,8 @@ async fn handle_socket(socket: WebSocket, state: AppState, session_id: Option<St
|
||||
let user_msg = crate::providers::ChatMessage::user(&content);
|
||||
let _ = backend.append(&session_key, &user_msg);
|
||||
}
|
||||
process_chat_message(&state, &mut agent, &mut sender, &content, &session_key).await;
|
||||
process_chat_message(&state, &mut agent, &mut sender, &content, &session_key)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user