Compare commits
5 Commits
master
...
feat/contr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0087b13fa0 | ||
|
|
21d6dfd8d9 | ||
|
|
7bef57cba3 | ||
|
|
2b3ab578b5 | ||
|
|
7b2c3dfeb5 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -7981,6 +7981,7 @@ version = "0.5.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-imap",
|
"async-imap",
|
||||||
|
"async-stream",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum",
|
"axum",
|
||||||
"base64",
|
"base64",
|
||||||
|
|||||||
@ -42,6 +42,7 @@ clap_complete = "4.5"
|
|||||||
tokio = { version = "1.50", default-features = false, features = ["rt-multi-thread", "macros", "time", "net", "io-util", "sync", "process", "io-std", "fs", "signal"] }
|
tokio = { version = "1.50", default-features = false, features = ["rt-multi-thread", "macros", "time", "net", "io-util", "sync", "process", "io-std", "fs", "signal"] }
|
||||||
tokio-util = { version = "0.7", default-features = false }
|
tokio-util = { version = "0.7", default-features = false }
|
||||||
tokio-stream = { version = "0.1.18", default-features = false, features = ["fs", "sync"] }
|
tokio-stream = { version = "0.1.18", default-features = false, features = ["fs", "sync"] }
|
||||||
|
async-stream = "0.3"
|
||||||
|
|
||||||
# HTTP client - minimal features
|
# HTTP client - minimal features
|
||||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking", "multipart", "stream", "socks"] }
|
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking", "multipart", "stream", "socks"] }
|
||||||
|
|||||||
153
src/gateway/api_control_plane.rs
Normal file
153
src/gateway/api_control_plane.rs
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
//! Control plane REST API handlers.
|
||||||
|
|
||||||
|
use super::control_plane::{NodeCapability, NodeInfo, NodeStatus};
|
||||||
|
use super::AppState;
|
||||||
|
use axum::{
|
||||||
|
extract::{Path, State},
|
||||||
|
http::{header, HeaderMap, StatusCode},
|
||||||
|
response::{IntoResponse, Json},
|
||||||
|
};
|
||||||
|
use chrono::Utc;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
fn require_auth(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, &'static str)> {
|
||||||
|
if state.pairing.require_pairing() {
|
||||||
|
let token = headers
|
||||||
|
.get(header::AUTHORIZATION)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.and_then(|auth| auth.strip_prefix("Bearer "))
|
||||||
|
.unwrap_or("");
|
||||||
|
if !state.pairing.is_authenticated(token) {
|
||||||
|
return Err((StatusCode::UNAUTHORIZED, "Unauthorized"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET /api/control-plane/nodes — list all registered nodes
|
||||||
|
pub async fn list_nodes(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
|
||||||
|
if let Err(e) = require_auth(&state, &headers) {
|
||||||
|
return e.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
match &state.control_plane {
|
||||||
|
Some(cp) => {
|
||||||
|
let nodes = cp.list_nodes();
|
||||||
|
let count = nodes.len();
|
||||||
|
Json(serde_json::json!({
|
||||||
|
"nodes": nodes,
|
||||||
|
"count": count
|
||||||
|
}))
|
||||||
|
.into_response()
|
||||||
|
}
|
||||||
|
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct RegisterNodeRequest {
|
||||||
|
pub id: String,
|
||||||
|
pub name: Option<String>,
|
||||||
|
pub address: Option<String>,
|
||||||
|
pub version: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub capabilities: Vec<NodeCapability>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub metadata: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POST /api/control-plane/nodes — register a new node
|
||||||
|
pub async fn register_node(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
headers: HeaderMap,
|
||||||
|
Json(body): Json<RegisterNodeRequest>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
if let Err(e) = require_auth(&state, &headers) {
|
||||||
|
return e.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
match &state.control_plane {
|
||||||
|
Some(cp) => {
|
||||||
|
let node_id = body.id.clone();
|
||||||
|
let info = NodeInfo {
|
||||||
|
id: body.id,
|
||||||
|
name: body.name,
|
||||||
|
address: body.address,
|
||||||
|
version: body.version,
|
||||||
|
capabilities: body.capabilities,
|
||||||
|
status: NodeStatus::Healthy,
|
||||||
|
registered_at: Utc::now(),
|
||||||
|
last_heartbeat: Utc::now(),
|
||||||
|
missed_heartbeats: 0,
|
||||||
|
metadata: body.metadata,
|
||||||
|
};
|
||||||
|
|
||||||
|
if cp.register(info) {
|
||||||
|
(
|
||||||
|
StatusCode::CREATED,
|
||||||
|
Json(serde_json::json!({
|
||||||
|
"message": "Node registered",
|
||||||
|
"node_id": node_id
|
||||||
|
})),
|
||||||
|
)
|
||||||
|
.into_response()
|
||||||
|
} else {
|
||||||
|
(StatusCode::CONFLICT, "Node capacity reached").into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POST /api/control-plane/nodes/{id}/heartbeat — record a heartbeat
|
||||||
|
pub async fn node_heartbeat(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
headers: HeaderMap,
|
||||||
|
Path(node_id): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
if let Err(e) = require_auth(&state, &headers) {
|
||||||
|
return e.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
match &state.control_plane {
|
||||||
|
Some(cp) => {
|
||||||
|
if cp.heartbeat(&node_id) {
|
||||||
|
Json(serde_json::json!({
|
||||||
|
"message": "Heartbeat recorded",
|
||||||
|
"node_id": node_id
|
||||||
|
}))
|
||||||
|
.into_response()
|
||||||
|
} else {
|
||||||
|
(StatusCode::NOT_FOUND, "Node not found").into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// DELETE /api/control-plane/nodes/{id} — deregister a node
|
||||||
|
pub async fn deregister_node(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
headers: HeaderMap,
|
||||||
|
Path(node_id): Path<String>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
if let Err(e) = require_auth(&state, &headers) {
|
||||||
|
return e.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
match &state.control_plane {
|
||||||
|
Some(cp) => {
|
||||||
|
if cp.deregister(&node_id) {
|
||||||
|
Json(serde_json::json!({
|
||||||
|
"message": "Node deregistered",
|
||||||
|
"node_id": node_id
|
||||||
|
}))
|
||||||
|
.into_response()
|
||||||
|
} else {
|
||||||
|
(StatusCode::NOT_FOUND, "Node not found").into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
|
||||||
|
}
|
||||||
|
}
|
||||||
116
src/gateway/backoff.rs
Normal file
116
src/gateway/backoff.rs
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
//! Exponential backoff for channel restart and retry logic.
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// Retry delay generator: the delay grows by `factor` on every attempt,
/// receives jitter, and is capped at `max`.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    /// Number of delays handed out since creation or the last reset (starts at 0).
    attempt: u32,
    /// Delay used for the very first retry.
    base: Duration,
    /// Upper bound on any returned delay.
    max: Duration,
    /// Growth multiplier applied once per attempt.
    factor: f64,
}
|
||||||
|
|
||||||
|
impl ExponentialBackoff {
|
||||||
|
/// Create a new backoff starting at `base` delay, capped at `max`.
|
||||||
|
pub fn new(base: Duration, max: Duration) -> Self {
|
||||||
|
Self {
|
||||||
|
attempt: 0,
|
||||||
|
base,
|
||||||
|
max,
|
||||||
|
factor: 2.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create with a custom multiplier factor.
|
||||||
|
pub fn with_factor(mut self, factor: f64) -> Self {
|
||||||
|
self.factor = factor.max(1.0);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the next delay and advance the attempt counter.
|
||||||
|
pub fn next_delay(&mut self) -> Duration {
|
||||||
|
let delay = self.base.mul_f64(self.factor.powi(self.attempt as i32));
|
||||||
|
self.attempt = self.attempt.saturating_add(1);
|
||||||
|
|
||||||
|
// Add jitter (+-25%)
|
||||||
|
let jitter_factor = 0.75 + (pseudo_random() * 0.5);
|
||||||
|
let jittered = delay.mul_f64(jitter_factor);
|
||||||
|
|
||||||
|
jittered.min(self.max)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reset the backoff counter (e.g., after a successful connection).
|
||||||
|
pub fn reset(&mut self) {
|
||||||
|
self.attempt = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Current attempt number.
|
||||||
|
pub fn attempt(&self) -> u32 {
|
||||||
|
self.attempt
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Async sleep for the next backoff duration.
|
||||||
|
pub async fn wait(&mut self) {
|
||||||
|
let delay = self.next_delay();
|
||||||
|
tokio::time::sleep(delay).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ExponentialBackoff {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new(
|
||||||
|
Duration::from_secs(1),
|
||||||
|
Duration::from_secs(300), // 5 minutes max
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cheap, non-cryptographic pseudo-random value in `[0, 1)` for jitter
/// (no external dep needed).
///
/// The sub-second clock reading is mixed through a randomly keyed std hasher.
/// Using the raw low digits of the clock is not enough: on platforms whose
/// system clock ticks in microseconds or 100 ns steps those digits are
/// constant or take only a few values, which would collapse the jitter.
fn pseudo_random() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::SystemTime;

    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos();

    // Every `RandomState` carries its own keys, so the output varies even if
    // the clock reading repeats.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u32(nanos);

    // Keep the top 53 bits: they are exactly representable in an f64, and
    // dividing by 2^53 gives a value strictly below 1.0.
    let bits = hasher.finish() >> 11;
    bits as f64 / (1u64 << 53) as f64
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// The first delay stays close to the base and later ones respect the default cap.
    #[test]
    fn test_default_backoff() {
        let mut backoff = ExponentialBackoff::default();
        let first = backoff.next_delay();
        let _second = backoff.next_delay();
        let third = backoff.next_delay();
        // Jitter makes exact values unpredictable, so only bounds are checked.
        assert!(first < Duration::from_secs(5));
        assert!(third <= Duration::from_secs(300));
    }

    /// No delay may ever exceed the configured maximum.
    #[test]
    fn test_max_cap() {
        let cap = Duration::from_secs(120);
        let mut backoff = ExponentialBackoff::new(Duration::from_secs(60), cap);
        for _ in 0..20 {
            assert!(backoff.next_delay() <= cap);
        }
    }

    /// `reset` brings the attempt counter back to zero.
    #[test]
    fn test_reset() {
        let mut backoff = ExponentialBackoff::default();
        backoff.next_delay();
        backoff.next_delay();
        assert_eq!(backoff.attempt(), 2);

        backoff.reset();
        assert_eq!(backoff.attempt(), 0);
    }
}
|
||||||
332
src/gateway/control_plane.rs
Normal file
332
src/gateway/control_plane.rs
Normal file
@ -0,0 +1,332 @@
|
|||||||
|
//! Control plane for multi-node orchestration.
|
||||||
|
//!
|
||||||
|
//! Wraps the existing `NodeRegistry` to add capability tracking,
|
||||||
|
//! health status, and control events without replacing the underlying registry.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// Capabilities that a node can advertise.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum NodeCapability {
|
||||||
|
/// Can run LLM provider inference
|
||||||
|
Inference,
|
||||||
|
/// Can execute tools (shell, file, etc.)
|
||||||
|
ToolExecution,
|
||||||
|
/// Has hardware peripherals attached
|
||||||
|
Hardware,
|
||||||
|
/// Can serve as a gateway
|
||||||
|
Gateway,
|
||||||
|
/// Has memory/storage backend
|
||||||
|
Storage,
|
||||||
|
/// Custom capability
|
||||||
|
Custom(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Health status of a node.
|
||||||
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum NodeStatus {
|
||||||
|
/// Node is healthy and responding
|
||||||
|
Healthy,
|
||||||
|
/// Node has missed recent heartbeats
|
||||||
|
Degraded,
|
||||||
|
/// Node is not responding
|
||||||
|
Unreachable,
|
||||||
|
/// Node is intentionally offline
|
||||||
|
Offline,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extended information about a registered node.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct NodeInfo {
|
||||||
|
/// Unique node identifier
|
||||||
|
pub id: String,
|
||||||
|
/// Human-readable node name
|
||||||
|
pub name: Option<String>,
|
||||||
|
/// Node's advertised address (host:port)
|
||||||
|
pub address: Option<String>,
|
||||||
|
/// Node version
|
||||||
|
pub version: Option<String>,
|
||||||
|
/// Capabilities the node supports
|
||||||
|
pub capabilities: Vec<NodeCapability>,
|
||||||
|
/// Current health status
|
||||||
|
pub status: NodeStatus,
|
||||||
|
/// When this node was first registered
|
||||||
|
pub registered_at: DateTime<Utc>,
|
||||||
|
/// Last successful heartbeat
|
||||||
|
pub last_heartbeat: DateTime<Utc>,
|
||||||
|
/// Consecutive missed heartbeats
|
||||||
|
pub missed_heartbeats: u32,
|
||||||
|
/// Arbitrary metadata
|
||||||
|
pub metadata: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Events emitted by the control plane.
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
|
#[allow(clippy::enum_variant_names)]
|
||||||
|
pub enum ControlEvent {
|
||||||
|
/// A node registered
|
||||||
|
NodeRegistered {
|
||||||
|
node_id: String,
|
||||||
|
name: Option<String>,
|
||||||
|
capabilities: Vec<NodeCapability>,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
/// A node was deregistered
|
||||||
|
NodeDeregistered {
|
||||||
|
node_id: String,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
/// A node's status changed
|
||||||
|
NodeStatusChanged {
|
||||||
|
node_id: String,
|
||||||
|
old_status: NodeStatus,
|
||||||
|
new_status: NodeStatus,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
/// A node sent a heartbeat
|
||||||
|
NodeHeartbeat {
|
||||||
|
node_id: String,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The control plane manages node lifecycle and health.
|
||||||
|
pub struct ControlPlane {
|
||||||
|
nodes: Mutex<HashMap<String, NodeInfo>>,
|
||||||
|
max_nodes: usize,
|
||||||
|
event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
||||||
|
/// How many missed heartbeats before marking degraded
|
||||||
|
degraded_threshold: u32,
|
||||||
|
/// How many missed heartbeats before marking unreachable
|
||||||
|
unreachable_threshold: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ControlPlane {
|
||||||
|
pub fn new(
|
||||||
|
max_nodes: usize,
|
||||||
|
event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
nodes: Mutex::new(HashMap::new()),
|
||||||
|
max_nodes,
|
||||||
|
event_tx,
|
||||||
|
degraded_threshold: 3,
|
||||||
|
unreachable_threshold: 10,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a new node. Returns false if capacity is reached.
|
||||||
|
pub fn register(&self, info: NodeInfo) -> bool {
|
||||||
|
let mut nodes = self.nodes.lock();
|
||||||
|
if nodes.len() >= self.max_nodes && !nodes.contains_key(&info.id) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
let event = ControlEvent::NodeRegistered {
|
||||||
|
node_id: info.id.clone(),
|
||||||
|
name: info.name.clone(),
|
||||||
|
capabilities: info.capabilities.clone(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
};
|
||||||
|
nodes.insert(info.id.clone(), info);
|
||||||
|
self.emit_event(&event);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deregister a node by ID.
|
||||||
|
pub fn deregister(&self, node_id: &str) -> bool {
|
||||||
|
let mut nodes = self.nodes.lock();
|
||||||
|
if nodes.remove(node_id).is_some() {
|
||||||
|
self.emit_event(&ControlEvent::NodeDeregistered {
|
||||||
|
node_id: node_id.to_string(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
});
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record a heartbeat from a node.
|
||||||
|
pub fn heartbeat(&self, node_id: &str) -> bool {
|
||||||
|
let mut nodes = self.nodes.lock();
|
||||||
|
if let Some(node) = nodes.get_mut(node_id) {
|
||||||
|
let old_status = node.status;
|
||||||
|
node.last_heartbeat = Utc::now();
|
||||||
|
node.missed_heartbeats = 0;
|
||||||
|
node.status = NodeStatus::Healthy;
|
||||||
|
|
||||||
|
if old_status != NodeStatus::Healthy {
|
||||||
|
self.emit_event(&ControlEvent::NodeStatusChanged {
|
||||||
|
node_id: node_id.to_string(),
|
||||||
|
old_status,
|
||||||
|
new_status: NodeStatus::Healthy,
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
self.emit_event(&ControlEvent::NodeHeartbeat {
|
||||||
|
node_id: node_id.to_string(),
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
});
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all registered nodes.
|
||||||
|
pub fn list_nodes(&self) -> Vec<NodeInfo> {
|
||||||
|
self.nodes.lock().values().cloned().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a specific node by ID.
|
||||||
|
pub fn get_node(&self, node_id: &str) -> Option<NodeInfo> {
|
||||||
|
self.nodes.lock().get(node_id).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Health monitor tick — increment missed heartbeats and update statuses.
|
||||||
|
/// Should be called periodically (e.g., every 30s).
|
||||||
|
pub fn health_tick(&self) {
|
||||||
|
let mut nodes = self.nodes.lock();
|
||||||
|
for node in nodes.values_mut() {
|
||||||
|
if node.status == NodeStatus::Offline {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
node.missed_heartbeats += 1;
|
||||||
|
let old_status = node.status;
|
||||||
|
let new_status = if node.missed_heartbeats >= self.unreachable_threshold {
|
||||||
|
NodeStatus::Unreachable
|
||||||
|
} else if node.missed_heartbeats >= self.degraded_threshold {
|
||||||
|
NodeStatus::Degraded
|
||||||
|
} else {
|
||||||
|
NodeStatus::Healthy
|
||||||
|
};
|
||||||
|
|
||||||
|
if old_status != new_status {
|
||||||
|
node.status = new_status;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run the health monitor background loop.
|
||||||
|
pub async fn health_monitor(self: Arc<Self>, interval_secs: u64) {
|
||||||
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
self.health_tick();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Node count.
|
||||||
|
pub fn node_count(&self) -> usize {
|
||||||
|
self.nodes.lock().len()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit_event(&self, event: &ControlEvent) {
|
||||||
|
if let Ok(json) = serde_json::to_value(event) {
|
||||||
|
let _ = self.event_tx.send(json);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Control plane with room for 100 nodes and a throwaway event channel.
    fn make_control_plane() -> ControlPlane {
        let (tx, _) = tokio::sync::broadcast::channel(16);
        ControlPlane::new(100, tx)
    }

    /// Healthy node fixture with the given id.
    fn make_node(id: &str) -> NodeInfo {
        let capabilities = vec![NodeCapability::Inference, NodeCapability::ToolExecution];
        NodeInfo {
            id: id.to_string(),
            name: Some(format!("node-{id}")),
            address: Some("127.0.0.1:42618".to_string()),
            version: Some("0.4.3".to_string()),
            capabilities,
            status: NodeStatus::Healthy,
            registered_at: Utc::now(),
            last_heartbeat: Utc::now(),
            missed_heartbeats: 0,
            metadata: HashMap::new(),
        }
    }

    /// Run `ticks` consecutive health ticks.
    fn tick_times(cp: &ControlPlane, ticks: usize) {
        for _ in 0..ticks {
            cp.health_tick();
        }
    }

    #[test]
    fn test_register_and_list() {
        let cp = make_control_plane();
        assert!(cp.register(make_node("a")));
        assert!(cp.register(make_node("b")));
        assert_eq!(cp.node_count(), 2);
        assert_eq!(cp.list_nodes().len(), 2);
    }

    #[test]
    fn test_deregister() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        assert!(cp.deregister("a"));
        // A second removal finds nothing.
        assert!(!cp.deregister("a"));
        assert_eq!(cp.node_count(), 0);
    }

    #[test]
    fn test_heartbeat() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        assert!(cp.heartbeat("a"));
        assert!(!cp.heartbeat("nonexistent"));
    }

    #[test]
    fn test_health_tick_degraded() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        tick_times(&cp, 3);
        let node = cp.get_node("a").unwrap();
        assert_eq!(node.status, NodeStatus::Degraded);
    }

    #[test]
    fn test_health_tick_unreachable() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        tick_times(&cp, 10);
        let node = cp.get_node("a").unwrap();
        assert_eq!(node.status, NodeStatus::Unreachable);
    }

    #[test]
    fn test_heartbeat_resets_status() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        tick_times(&cp, 5);
        assert_eq!(cp.get_node("a").unwrap().status, NodeStatus::Degraded);

        cp.heartbeat("a");
        assert_eq!(cp.get_node("a").unwrap().status, NodeStatus::Healthy);
    }

    #[test]
    fn test_capacity_limit() {
        let (tx, _) = tokio::sync::broadcast::channel(16);
        let cp = ControlPlane::new(2, tx);
        assert!(cp.register(make_node("a")));
        assert!(cp.register(make_node("b")));
        // The third distinct node exceeds the limit of two.
        assert!(!cp.register(make_node("c")));
    }
}
|
||||||
@ -8,8 +8,12 @@
|
|||||||
//! - Header sanitization (handled by axum/hyper)
|
//! - Header sanitization (handled by axum/hyper)
|
||||||
|
|
||||||
pub mod api;
|
pub mod api;
|
||||||
|
pub mod api_control_plane;
|
||||||
pub mod api_pairing;
|
pub mod api_pairing;
|
||||||
|
pub mod backoff;
|
||||||
|
pub mod control_plane;
|
||||||
pub mod nodes;
|
pub mod nodes;
|
||||||
|
pub mod session_keys;
|
||||||
pub mod sse;
|
pub mod sse;
|
||||||
pub mod static_files;
|
pub mod static_files;
|
||||||
pub mod ws;
|
pub mod ws;
|
||||||
@ -339,6 +343,8 @@ pub struct AppState {
|
|||||||
pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
|
pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
|
||||||
/// Pending pairing request store
|
/// Pending pairing request store
|
||||||
pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
|
pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
|
||||||
|
/// Control plane for multi-node orchestration (None if nodes.enabled is false)
|
||||||
|
pub control_plane: Option<Arc<control_plane::ControlPlane>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
|
/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
|
||||||
@ -704,6 +710,16 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Control plane for multi-node orchestration
|
||||||
|
let control_plane = if config.nodes.enabled {
|
||||||
|
Some(Arc::new(control_plane::ControlPlane::new(
|
||||||
|
config.nodes.max_nodes,
|
||||||
|
event_tx.clone(),
|
||||||
|
)))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let state = AppState {
|
let state = AppState {
|
||||||
config: config_state,
|
config: config_state,
|
||||||
provider,
|
provider,
|
||||||
@ -732,8 +748,17 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
|||||||
session_backend,
|
session_backend,
|
||||||
device_registry,
|
device_registry,
|
||||||
pending_pairings,
|
pending_pairings,
|
||||||
|
control_plane: control_plane.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Spawn control plane health monitor if enabled
|
||||||
|
if let Some(ref cp) = control_plane {
|
||||||
|
let cp_clone = Arc::clone(cp);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
cp_clone.health_monitor(30).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Config PUT needs larger body limit (1MB)
|
// Config PUT needs larger body limit (1MB)
|
||||||
let config_put_router = Router::new()
|
let config_put_router = Router::new()
|
||||||
.route("/api/config", put(api::handle_api_config_put))
|
.route("/api/config", put(api::handle_api_config_put))
|
||||||
@ -790,6 +815,20 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
|||||||
"/api/devices/{id}/token/rotate",
|
"/api/devices/{id}/token/rotate",
|
||||||
post(api_pairing::rotate_token),
|
post(api_pairing::rotate_token),
|
||||||
)
|
)
|
||||||
|
// ── Control Plane API ──
|
||||||
|
.route("/api/nodes", get(api_control_plane::list_nodes))
|
||||||
|
.route(
|
||||||
|
"/api/nodes/register",
|
||||||
|
post(api_control_plane::register_node),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/nodes/{id}/heartbeat",
|
||||||
|
post(api_control_plane::node_heartbeat),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/nodes/{id}",
|
||||||
|
delete(api_control_plane::deregister_node),
|
||||||
|
)
|
||||||
// ── SSE event stream ──
|
// ── SSE event stream ──
|
||||||
.route("/api/events", get(sse::handle_sse_events))
|
.route("/api/events", get(sse::handle_sse_events))
|
||||||
// ── WebSocket agent chat ──
|
// ── WebSocket agent chat ──
|
||||||
@ -1895,6 +1934,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = handle_metrics(State(state)).await.into_response();
|
let response = handle_metrics(State(state)).await.into_response();
|
||||||
@ -1950,6 +1990,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = handle_metrics(State(state)).await.into_response();
|
let response = handle_metrics(State(state)).await.into_response();
|
||||||
@ -2329,6 +2370,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
@ -2398,6 +2440,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let headers = HeaderMap::new();
|
let headers = HeaderMap::new();
|
||||||
@ -2479,6 +2522,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = handle_webhook(
|
let response = handle_webhook(
|
||||||
@ -2532,6 +2576,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
@ -2590,6 +2635,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
@ -2653,6 +2699,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let response = Box::pin(handle_nextcloud_talk_webhook(
|
let response = Box::pin(handle_nextcloud_talk_webhook(
|
||||||
@ -2712,6 +2759,7 @@ mod tests {
|
|||||||
session_backend: None,
|
session_backend: None,
|
||||||
device_registry: None,
|
device_registry: None,
|
||||||
pending_pairings: None,
|
pending_pairings: None,
|
||||||
|
control_plane: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut headers = HeaderMap::new();
|
let mut headers = HeaderMap::new();
|
||||||
|
|||||||
124
src/gateway/session_keys.rs
Normal file
124
src/gateway/session_keys.rs
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
//! Session key parsing for scoped gateway access.
|
||||||
|
//!
|
||||||
|
//! Session keys encode a scope that limits what operations the bearer
|
||||||
|
//! can perform. Format: `zcs_<scope>_<random>` where scope is one of:
|
||||||
|
//! `main`, `sender_<id>`, `cron`, `subagent`.
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// The scope encoded in a session key.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum SessionScope {
|
||||||
|
/// Full access (equivalent to the main bearer token)
|
||||||
|
Main,
|
||||||
|
/// Scoped to a specific sender/channel
|
||||||
|
Sender(String),
|
||||||
|
/// Scoped to cron job execution
|
||||||
|
Cron,
|
||||||
|
/// Scoped to a sub-agent delegation
|
||||||
|
SubAgent,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A parsed session key.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct SessionKey {
|
||||||
|
pub scope: SessionScope,
|
||||||
|
pub raw: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionKey {
|
||||||
|
/// Parse a session key string.
|
||||||
|
/// Returns None if the format is invalid.
|
||||||
|
pub fn parse(key: &str) -> Option<Self> {
|
||||||
|
let key = key.trim();
|
||||||
|
if !key.starts_with("zcs_") {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let rest = &key[4..]; // skip "zcs_"
|
||||||
|
let scope = if rest.starts_with("main_") {
|
||||||
|
SessionScope::Main
|
||||||
|
} else if let Some(sender_rest) = rest.strip_prefix("sender_") {
|
||||||
|
// Format: zcs_sender_<id>_<random>
|
||||||
|
let parts: Vec<&str> = sender_rest.splitn(2, '_').collect();
|
||||||
|
if parts.len() < 2 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
SessionScope::Sender(parts[0].to_string())
|
||||||
|
} else if rest.starts_with("cron_") {
|
||||||
|
SessionScope::Cron
|
||||||
|
} else if rest.starts_with("subagent_") {
|
||||||
|
SessionScope::SubAgent
|
||||||
|
} else {
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(Self {
|
||||||
|
scope,
|
||||||
|
raw: key.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if this key's scope allows a given operation.
|
||||||
|
pub fn allows(&self, operation: &str) -> bool {
|
||||||
|
match &self.scope {
|
||||||
|
SessionScope::Main => true, // full access
|
||||||
|
SessionScope::Sender(_) => {
|
||||||
|
matches!(operation, "chat" | "memory_read" | "status")
|
||||||
|
}
|
||||||
|
SessionScope::Cron => {
|
||||||
|
matches!(
|
||||||
|
operation,
|
||||||
|
"chat" | "tool_execute" | "memory_read" | "memory_write"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
SessionScope::SubAgent => {
|
||||||
|
matches!(operation, "chat" | "tool_execute" | "memory_read")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Main-scoped keys parse and permit every operation.
    #[test]
    fn test_parse_main() {
        let parsed = SessionKey::parse("zcs_main_abc123def456").unwrap();
        assert_eq!(parsed.scope, SessionScope::Main);
        for operation in ["chat", "anything"] {
            assert!(parsed.allows(operation));
        }
    }

    /// Sender-scoped keys carry the sender id and cannot execute tools.
    #[test]
    fn test_parse_sender() {
        let parsed = SessionKey::parse("zcs_sender_telegram123_abc456").unwrap();
        let expected = SessionScope::Sender("telegram123".to_string());
        assert_eq!(parsed.scope, expected);
        assert!(parsed.allows("chat"));
        assert!(!parsed.allows("tool_execute"));
    }

    /// Cron-scoped keys may chat and execute tools but not administer.
    #[test]
    fn test_parse_cron() {
        let parsed = SessionKey::parse("zcs_cron_random789").unwrap();
        assert_eq!(parsed.scope, SessionScope::Cron);
        assert!(parsed.allows("chat"));
        assert!(parsed.allows("tool_execute"));
        assert!(!parsed.allows("admin"));
    }

    /// Sub-agent keys decode to the sub-agent scope.
    #[test]
    fn test_parse_subagent() {
        let parsed = SessionKey::parse("zcs_subagent_xyz").unwrap();
        assert_eq!(parsed.scope, SessionScope::SubAgent);
    }

    /// Wrong prefix, empty input, and unknown scopes are all rejected.
    #[test]
    fn test_invalid_prefix() {
        for bad in ["zc_main_abc", "", "zcs_unknown_abc"] {
            assert!(SessionKey::parse(bad).is_none());
        }
    }
}
|
||||||
@ -236,7 +236,8 @@ async fn handle_socket(socket: WebSocket, state: AppState, session_id: Option<St
|
|||||||
let user_msg = crate::providers::ChatMessage::user(&content);
|
let user_msg = crate::providers::ChatMessage::user(&content);
|
||||||
let _ = backend.append(&session_key, &user_msg);
|
let _ = backend.append(&session_key, &user_msg);
|
||||||
}
|
}
|
||||||
process_chat_message(&state, &mut agent, &mut sender, &content, &session_key).await;
|
process_chat_message(&state, &mut agent, &mut sender, &content, &session_key)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user