Compare commits

...

5 Commits

Author SHA1 Message Date
argenis de la rosa
0087b13fa0 fix(control-plane): add async-stream dep, fix route paths to match plan
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:09:38 -04:00
argenis de la rosa
21d6dfd8d9 feat(gateway): add session key parser and exponential backoff
Add SessionKey with scoped access parsing (main/sender/cron/subagent)
and ExponentialBackoff with jitter for channel restart retry logic.
Fix test AppState constructions and clippy lints.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:04:07 -04:00
argenis de la rosa
7bef57cba3 feat(gateway): spawn health monitor background task
Start the control plane health monitor loop (30s tick) when
nodes are enabled, integrating status changes with SSE events.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:03:48 -04:00
argenis de la rosa
2b3ab578b5 feat(gateway): add control plane API handlers
Add REST endpoints for node registration, heartbeat, listing,
and deregistration under /api/control-plane/nodes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:03:32 -04:00
argenis de la rosa
7b2c3dfeb5 feat(gateway): add control plane struct and node types
Introduce ControlPlane with capability tracking, health status
monitoring, and control event emission for multi-node orchestration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:03:07 -04:00
8 changed files with 777 additions and 1 deletion

1
Cargo.lock generated
View File

@ -7981,6 +7981,7 @@ version = "0.5.0"
dependencies = [
"anyhow",
"async-imap",
"async-stream",
"async-trait",
"axum",
"base64",

View File

@ -42,6 +42,7 @@ clap_complete = "4.5"
tokio = { version = "1.50", default-features = false, features = ["rt-multi-thread", "macros", "time", "net", "io-util", "sync", "process", "io-std", "fs", "signal"] }
tokio-util = { version = "0.7", default-features = false }
tokio-stream = { version = "0.1.18", default-features = false, features = ["fs", "sync"] }
async-stream = "0.3"
# HTTP client - minimal features
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls", "blocking", "multipart", "stream", "socks"] }

View File

@ -0,0 +1,153 @@
//! Control plane REST API handlers.
use super::control_plane::{NodeCapability, NodeInfo, NodeStatus};
use super::AppState;
use axum::{
extract::{Path, State},
http::{header, HeaderMap, StatusCode},
response::{IntoResponse, Json},
};
use chrono::Utc;
use serde::Deserialize;
use std::collections::HashMap;
/// Enforce bearer-token auth when pairing mode requires it.
///
/// Returns `Ok(())` when pairing is disabled or the `Authorization: Bearer`
/// token is valid; otherwise `Err` with a `401 Unauthorized` pair for the
/// caller to turn into a response.
fn require_auth(state: &AppState, headers: &HeaderMap) -> Result<(), (StatusCode, &'static str)> {
    // Pairing disabled — every caller is allowed through.
    if !state.pairing.require_pairing() {
        return Ok(());
    }
    // Extract the bearer token; any malformed/missing header degrades to "".
    let token = headers
        .get(header::AUTHORIZATION)
        .and_then(|value| value.to_str().ok())
        .and_then(|auth| auth.strip_prefix("Bearer "))
        .unwrap_or_default();
    if state.pairing.is_authenticated(token) {
        Ok(())
    } else {
        Err((StatusCode::UNAUTHORIZED, "Unauthorized"))
    }
}
/// GET /api/nodes — list all registered nodes.
///
/// NOTE(review): the gateway router registers this handler at `/api/nodes`,
/// not `/api/control-plane/nodes`; the doc path is updated to match.
///
/// Responds with `{ "nodes": [...], "count": N }`, `401` if pairing auth
/// fails, or `503` when the control plane is not enabled.
pub async fn list_nodes(State(state): State<AppState>, headers: HeaderMap) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }
    match &state.control_plane {
        Some(cp) => {
            let nodes = cp.list_nodes();
            let count = nodes.len();
            Json(serde_json::json!({
                "nodes": nodes,
                "count": count
            }))
            .into_response()
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
    }
}
/// Request body for node registration (`POST /api/nodes/register`).
#[derive(Deserialize)]
pub struct RegisterNodeRequest {
    /// Unique node identifier (required).
    pub id: String,
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Optional advertised address — presumably `host:port`; verify against callers.
    pub address: Option<String>,
    /// Optional node software version.
    pub version: Option<String>,
    /// Capabilities the node advertises; defaults to empty when omitted.
    #[serde(default)]
    pub capabilities: Vec<NodeCapability>,
    /// Arbitrary key/value metadata; defaults to empty when omitted.
    #[serde(default)]
    pub metadata: HashMap<String, String>,
}
/// POST /api/nodes/register — register a new node.
///
/// NOTE(review): the gateway router registers this handler at
/// `/api/nodes/register`, not `/api/control-plane/nodes`; doc path updated.
///
/// New nodes start `Healthy` with fresh registration/heartbeat timestamps.
/// Responds `201` on success, `409` when node capacity is reached, `401` if
/// pairing auth fails, or `503` when the control plane is not enabled.
pub async fn register_node(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(body): Json<RegisterNodeRequest>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }
    match &state.control_plane {
        Some(cp) => {
            // Keep a copy of the id for the response; `body.id` moves into `info`.
            let node_id = body.id.clone();
            let info = NodeInfo {
                id: body.id,
                name: body.name,
                address: body.address,
                version: body.version,
                capabilities: body.capabilities,
                status: NodeStatus::Healthy,
                registered_at: Utc::now(),
                last_heartbeat: Utc::now(),
                missed_heartbeats: 0,
                metadata: body.metadata,
            };
            if cp.register(info) {
                (
                    StatusCode::CREATED,
                    Json(serde_json::json!({
                        "message": "Node registered",
                        "node_id": node_id
                    })),
                )
                    .into_response()
            } else {
                // register() only refuses when at capacity for a brand-new id.
                (StatusCode::CONFLICT, "Node capacity reached").into_response()
            }
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
    }
}
/// POST /api/nodes/{id}/heartbeat — record a heartbeat.
///
/// NOTE(review): the gateway router registers this handler at
/// `/api/nodes/{id}/heartbeat`, not under `/api/control-plane`; doc path updated.
///
/// Responds `200` when the heartbeat is recorded, `404` for an unknown node,
/// `401` if pairing auth fails, or `503` when the control plane is disabled.
pub async fn node_heartbeat(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(node_id): Path<String>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }
    match &state.control_plane {
        Some(cp) => {
            if cp.heartbeat(&node_id) {
                Json(serde_json::json!({
                    "message": "Heartbeat recorded",
                    "node_id": node_id
                }))
                .into_response()
            } else {
                (StatusCode::NOT_FOUND, "Node not found").into_response()
            }
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
    }
}
/// DELETE /api/nodes/{id} — deregister a node.
///
/// NOTE(review): the gateway router registers this handler at
/// `/api/nodes/{id}`, not under `/api/control-plane`; doc path updated.
///
/// Responds `200` when removed, `404` for an unknown node, `401` if pairing
/// auth fails, or `503` when the control plane is disabled.
pub async fn deregister_node(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(node_id): Path<String>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }
    match &state.control_plane {
        Some(cp) => {
            if cp.deregister(&node_id) {
                Json(serde_json::json!({
                    "message": "Node deregistered",
                    "node_id": node_id
                }))
                .into_response()
            } else {
                (StatusCode::NOT_FOUND, "Node not found").into_response()
            }
        }
        None => (StatusCode::SERVICE_UNAVAILABLE, "Control plane not enabled").into_response(),
    }
}

116
src/gateway/backoff.rs Normal file
View File

@ -0,0 +1,116 @@
//! Exponential backoff for channel restart and retry logic.
use std::time::Duration;
/// Exponential backoff with configurable bounds and jitter.
///
/// Delay grows as `base * factor^attempt` (±25% jitter), capped at `max`.
/// Call `reset()` after a success to restart the schedule.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    /// Current attempt number (0-indexed); counts `next_delay` calls since
    /// construction or the last `reset()`.
    attempt: u32,
    /// Base delay for the first retry
    base: Duration,
    /// Maximum delay cap
    max: Duration,
    /// Multiplier per attempt (`with_factor` clamps this to >= 1.0)
    factor: f64,
}
impl ExponentialBackoff {
    /// Create a new backoff starting at `base` delay, capped at `max`,
    /// doubling per attempt.
    pub fn new(base: Duration, max: Duration) -> Self {
        Self {
            attempt: 0,
            base,
            max,
            factor: 2.0,
        }
    }

    /// Create with a custom multiplier factor (clamped to at least 1.0 so
    /// delays never shrink between attempts).
    pub fn with_factor(mut self, factor: f64) -> Self {
        self.factor = factor.max(1.0);
        self
    }

    /// Get the next delay and advance the attempt counter.
    ///
    /// The math is done in f64 seconds and saturated at `max` so that a
    /// long-running retry loop cannot panic: the previous formulation
    /// (`base.mul_f64(factor.powi(attempt))`) overflows `Duration` and
    /// panics once `factor^attempt` grows large (around attempt 65 with the
    /// default 1s base / factor 2).
    pub fn next_delay(&mut self) -> Duration {
        // powi overflows to +inf for huge exponents; inf propagates through
        // the jitter multiply and is handled by the saturation check below.
        let growth = self.factor.powi(self.attempt.min(i32::MAX as u32) as i32);
        self.attempt = self.attempt.saturating_add(1);
        // Jitter in [0.75, 1.25) to de-synchronize concurrent retriers.
        let jitter = 0.75 + (pseudo_random() * 0.5);
        let secs = self.base.as_secs_f64() * growth * jitter;
        if secs >= self.max.as_secs_f64() {
            // Saturate at the cap without converting a possibly-huge f64
            // back into a Duration (which would panic on overflow).
            self.max
        } else {
            Duration::from_secs_f64(secs)
        }
    }

    /// Reset the backoff counter (e.g., after a successful connection).
    pub fn reset(&mut self) {
        self.attempt = 0;
    }

    /// Current attempt number (how many delays have been handed out).
    pub fn attempt(&self) -> u32 {
        self.attempt
    }

    /// Async sleep for the next backoff duration.
    pub async fn wait(&mut self) {
        let delay = self.next_delay();
        tokio::time::sleep(delay).await;
    }
}
impl Default for ExponentialBackoff {
    /// Default schedule: 1-second initial delay, capped at 5 minutes.
    fn default() -> Self {
        let base = Duration::from_secs(1);
        let max = Duration::from_secs(5 * 60);
        Self::new(base, max)
    }
}
/// Simple pseudo-random value in [0, 1) for jitter, derived from the
/// sub-second part of the wall clock (no external dep needed).
fn pseudo_random() -> f64 {
    use std::time::SystemTime;
    let since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH);
    // A clock before the epoch degrades to 0 rather than panicking.
    let nanos = since_epoch.map(|d| d.subsec_nanos()).unwrap_or(0);
    f64::from(nanos % 1000) / 1000.0
}
#[cfg(test)]
mod tests {
    use super::*;

    // Delays from the default config stay within sane bounds despite jitter.
    #[test]
    fn test_default_backoff() {
        let mut b = ExponentialBackoff::default();
        let d1 = b.next_delay();
        let _d2 = b.next_delay();
        let d3 = b.next_delay();
        // Each delay should generally increase (with jitter)
        assert!(d1 < Duration::from_secs(5));
        assert!(d3 <= Duration::from_secs(300));
    }

    // The cap holds even after many attempts.
    #[test]
    fn test_max_cap() {
        let mut b = ExponentialBackoff::new(Duration::from_secs(60), Duration::from_secs(120));
        for _ in 0..20 {
            let d = b.next_delay();
            assert!(d <= Duration::from_secs(120));
        }
    }

    // reset() returns the attempt counter to zero.
    #[test]
    fn test_reset() {
        let mut b = ExponentialBackoff::default();
        b.next_delay();
        b.next_delay();
        assert_eq!(b.attempt(), 2);
        b.reset();
        assert_eq!(b.attempt(), 0);
    }
}

View File

@ -0,0 +1,332 @@
//! Control plane for multi-node orchestration.
//!
//! Wraps the existing `NodeRegistry` to add capability tracking,
//! health status, and control events without replacing the underlying registry.
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
/// Capabilities that a node can advertise.
///
/// Serialized in snake_case (e.g. `tool_execution`) in the REST API and in
/// control events.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum NodeCapability {
    /// Can run LLM provider inference
    Inference,
    /// Can execute tools (shell, file, etc.)
    ToolExecution,
    /// Has hardware peripherals attached
    Hardware,
    /// Can serve as a gateway
    Gateway,
    /// Has memory/storage backend
    Storage,
    /// Custom capability (free-form name supplied by the node)
    Custom(String),
}
/// Health status of a node.
///
/// `heartbeat()` resets a node to `Healthy`; `health_tick()` escalates to
/// `Degraded`/`Unreachable` as heartbeats are missed and skips `Offline`
/// nodes entirely.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NodeStatus {
    /// Node is healthy and responding
    Healthy,
    /// Node has missed recent heartbeats
    Degraded,
    /// Node is not responding
    Unreachable,
    /// Node is intentionally offline (exempt from health monitoring)
    Offline,
}
/// Extended information about a registered node.
///
/// `status`, `last_heartbeat`, and `missed_heartbeats` are maintained by
/// `ControlPlane::heartbeat` and `ControlPlane::health_tick`; the rest is
/// supplied at registration time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Unique node identifier
    pub id: String,
    /// Human-readable node name
    pub name: Option<String>,
    /// Node's advertised address (host:port)
    pub address: Option<String>,
    /// Node version
    pub version: Option<String>,
    /// Capabilities the node supports
    pub capabilities: Vec<NodeCapability>,
    /// Current health status
    pub status: NodeStatus,
    /// When this node was first registered
    pub registered_at: DateTime<Utc>,
    /// Last successful heartbeat
    pub last_heartbeat: DateTime<Utc>,
    /// Consecutive missed heartbeats
    pub missed_heartbeats: u32,
    /// Arbitrary metadata
    pub metadata: HashMap<String, String>,
}
/// Events emitted by the control plane.
///
/// Serialized with an internal `"type"` tag in snake_case (e.g.
/// `{"type":"node_registered", ...}`) and broadcast as JSON values.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
// All variants intentionally share the `Node` prefix; silence the clippy lint.
#[allow(clippy::enum_variant_names)]
pub enum ControlEvent {
    /// A node registered
    NodeRegistered {
        node_id: String,
        name: Option<String>,
        capabilities: Vec<NodeCapability>,
        timestamp: DateTime<Utc>,
    },
    /// A node was deregistered
    NodeDeregistered {
        node_id: String,
        timestamp: DateTime<Utc>,
    },
    /// A node's status changed
    NodeStatusChanged {
        node_id: String,
        old_status: NodeStatus,
        new_status: NodeStatus,
        timestamp: DateTime<Utc>,
    },
    /// A node sent a heartbeat
    NodeHeartbeat {
        node_id: String,
        timestamp: DateTime<Utc>,
    },
}
/// The control plane manages node lifecycle and health.
///
/// Thread-safe via an internal mutex; events are broadcast as JSON on the
/// shared channel (send failures, e.g. no subscribers, are ignored).
pub struct ControlPlane {
    /// Registered nodes, keyed by node id.
    nodes: Mutex<HashMap<String, NodeInfo>>,
    /// Registration capacity; `register` refuses new ids beyond this.
    max_nodes: usize,
    /// Broadcast channel for serialized `ControlEvent`s.
    event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
    /// How many missed heartbeats before marking degraded (default 3)
    degraded_threshold: u32,
    /// How many missed heartbeats before marking unreachable (default 10)
    unreachable_threshold: u32,
}
impl ControlPlane {
pub fn new(
max_nodes: usize,
event_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
) -> Self {
Self {
nodes: Mutex::new(HashMap::new()),
max_nodes,
event_tx,
degraded_threshold: 3,
unreachable_threshold: 10,
}
}
/// Register a new node. Returns false if capacity is reached.
pub fn register(&self, info: NodeInfo) -> bool {
let mut nodes = self.nodes.lock();
if nodes.len() >= self.max_nodes && !nodes.contains_key(&info.id) {
return false;
}
let event = ControlEvent::NodeRegistered {
node_id: info.id.clone(),
name: info.name.clone(),
capabilities: info.capabilities.clone(),
timestamp: Utc::now(),
};
nodes.insert(info.id.clone(), info);
self.emit_event(&event);
true
}
/// Deregister a node by ID.
pub fn deregister(&self, node_id: &str) -> bool {
let mut nodes = self.nodes.lock();
if nodes.remove(node_id).is_some() {
self.emit_event(&ControlEvent::NodeDeregistered {
node_id: node_id.to_string(),
timestamp: Utc::now(),
});
true
} else {
false
}
}
/// Record a heartbeat from a node.
pub fn heartbeat(&self, node_id: &str) -> bool {
let mut nodes = self.nodes.lock();
if let Some(node) = nodes.get_mut(node_id) {
let old_status = node.status;
node.last_heartbeat = Utc::now();
node.missed_heartbeats = 0;
node.status = NodeStatus::Healthy;
if old_status != NodeStatus::Healthy {
self.emit_event(&ControlEvent::NodeStatusChanged {
node_id: node_id.to_string(),
old_status,
new_status: NodeStatus::Healthy,
timestamp: Utc::now(),
});
}
self.emit_event(&ControlEvent::NodeHeartbeat {
node_id: node_id.to_string(),
timestamp: Utc::now(),
});
true
} else {
false
}
}
/// List all registered nodes.
pub fn list_nodes(&self) -> Vec<NodeInfo> {
self.nodes.lock().values().cloned().collect()
}
/// Get a specific node by ID.
pub fn get_node(&self, node_id: &str) -> Option<NodeInfo> {
self.nodes.lock().get(node_id).cloned()
}
/// Health monitor tick — increment missed heartbeats and update statuses.
/// Should be called periodically (e.g., every 30s).
pub fn health_tick(&self) {
let mut nodes = self.nodes.lock();
for node in nodes.values_mut() {
if node.status == NodeStatus::Offline {
continue;
}
node.missed_heartbeats += 1;
let old_status = node.status;
let new_status = if node.missed_heartbeats >= self.unreachable_threshold {
NodeStatus::Unreachable
} else if node.missed_heartbeats >= self.degraded_threshold {
NodeStatus::Degraded
} else {
NodeStatus::Healthy
};
if old_status != new_status {
node.status = new_status;
}
}
}
/// Run the health monitor background loop.
pub async fn health_monitor(self: Arc<Self>, interval_secs: u64) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
loop {
interval.tick().await;
self.health_tick();
}
}
/// Node count.
pub fn node_count(&self) -> usize {
self.nodes.lock().len()
}
fn emit_event(&self, event: &ControlEvent) {
if let Ok(json) = serde_json::to_value(event) {
let _ = self.event_tx.send(json);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Control plane with plenty of capacity and a throwaway event channel.
    fn make_control_plane() -> ControlPlane {
        let (tx, _) = tokio::sync::broadcast::channel(16);
        ControlPlane::new(100, tx)
    }

    // A healthy test node with two capabilities and fresh timestamps.
    fn make_node(id: &str) -> NodeInfo {
        NodeInfo {
            id: id.to_string(),
            name: Some(format!("node-{id}")),
            address: Some("127.0.0.1:42618".to_string()),
            version: Some("0.4.3".to_string()),
            capabilities: vec![NodeCapability::Inference, NodeCapability::ToolExecution],
            status: NodeStatus::Healthy,
            registered_at: Utc::now(),
            last_heartbeat: Utc::now(),
            missed_heartbeats: 0,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn test_register_and_list() {
        let cp = make_control_plane();
        assert!(cp.register(make_node("a")));
        assert!(cp.register(make_node("b")));
        assert_eq!(cp.node_count(), 2);
        let nodes = cp.list_nodes();
        assert_eq!(nodes.len(), 2);
    }

    #[test]
    fn test_deregister() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        assert!(cp.deregister("a"));
        assert!(!cp.deregister("a")); // already removed
        assert_eq!(cp.node_count(), 0);
    }

    #[test]
    fn test_heartbeat() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        assert!(cp.heartbeat("a"));
        assert!(!cp.heartbeat("nonexistent"));
    }

    // Three ticks hits the degraded threshold (3 missed heartbeats).
    #[test]
    fn test_health_tick_degraded() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        for _ in 0..3 {
            cp.health_tick();
        }
        let node = cp.get_node("a").unwrap();
        assert_eq!(node.status, NodeStatus::Degraded);
    }

    // Ten ticks hits the unreachable threshold (10 missed heartbeats).
    #[test]
    fn test_health_tick_unreachable() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        for _ in 0..10 {
            cp.health_tick();
        }
        let node = cp.get_node("a").unwrap();
        assert_eq!(node.status, NodeStatus::Unreachable);
    }

    // A heartbeat returns a degraded node to Healthy and clears the counter.
    #[test]
    fn test_heartbeat_resets_status() {
        let cp = make_control_plane();
        cp.register(make_node("a"));
        for _ in 0..5 {
            cp.health_tick();
        }
        assert_eq!(cp.get_node("a").unwrap().status, NodeStatus::Degraded);
        cp.heartbeat("a");
        assert_eq!(cp.get_node("a").unwrap().status, NodeStatus::Healthy);
    }

    // Registration of NEW ids is refused once max_nodes is reached.
    #[test]
    fn test_capacity_limit() {
        let (tx, _) = tokio::sync::broadcast::channel(16);
        let cp = ControlPlane::new(2, tx);
        assert!(cp.register(make_node("a")));
        assert!(cp.register(make_node("b")));
        assert!(!cp.register(make_node("c"))); // at capacity
    }
}

View File

@ -8,8 +8,12 @@
//! - Header sanitization (handled by axum/hyper)
pub mod api;
pub mod api_control_plane;
pub mod api_pairing;
pub mod backoff;
pub mod control_plane;
pub mod nodes;
pub mod session_keys;
pub mod sse;
pub mod static_files;
pub mod ws;
@ -339,6 +343,8 @@ pub struct AppState {
pub device_registry: Option<Arc<api_pairing::DeviceRegistry>>,
/// Pending pairing request store
pub pending_pairings: Option<Arc<api_pairing::PairingStore>>,
/// Control plane for multi-node orchestration (None if nodes.enabled is false)
pub control_plane: Option<Arc<control_plane::ControlPlane>>,
}
/// Run the HTTP gateway using axum with proper HTTP/1.1 compliance.
@ -704,6 +710,16 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
None
};
// Control plane for multi-node orchestration
let control_plane = if config.nodes.enabled {
Some(Arc::new(control_plane::ControlPlane::new(
config.nodes.max_nodes,
event_tx.clone(),
)))
} else {
None
};
let state = AppState {
config: config_state,
provider,
@ -732,8 +748,17 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
session_backend,
device_registry,
pending_pairings,
control_plane: control_plane.clone(),
};
// Spawn control plane health monitor if enabled
if let Some(ref cp) = control_plane {
let cp_clone = Arc::clone(cp);
tokio::spawn(async move {
cp_clone.health_monitor(30).await;
});
}
// Config PUT needs larger body limit (1MB)
let config_put_router = Router::new()
.route("/api/config", put(api::handle_api_config_put))
@ -790,6 +815,20 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
"/api/devices/{id}/token/rotate",
post(api_pairing::rotate_token),
)
// ── Control Plane API ──
.route("/api/nodes", get(api_control_plane::list_nodes))
.route(
"/api/nodes/register",
post(api_control_plane::register_node),
)
.route(
"/api/nodes/{id}/heartbeat",
post(api_control_plane::node_heartbeat),
)
.route(
"/api/nodes/{id}",
delete(api_control_plane::deregister_node),
)
// ── SSE event stream ──
.route("/api/events", get(sse::handle_sse_events))
// ── WebSocket agent chat ──
@ -1895,6 +1934,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let response = handle_metrics(State(state)).await.into_response();
@ -1950,6 +1990,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let response = handle_metrics(State(state)).await.into_response();
@ -2329,6 +2370,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let mut headers = HeaderMap::new();
@ -2398,6 +2440,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let headers = HeaderMap::new();
@ -2479,6 +2522,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let response = handle_webhook(
@ -2532,6 +2576,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let mut headers = HeaderMap::new();
@ -2590,6 +2635,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let mut headers = HeaderMap::new();
@ -2653,6 +2699,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let response = Box::pin(handle_nextcloud_talk_webhook(
@ -2712,6 +2759,7 @@ mod tests {
session_backend: None,
device_registry: None,
pending_pairings: None,
control_plane: None,
};
let mut headers = HeaderMap::new();

124
src/gateway/session_keys.rs Normal file
View File

@ -0,0 +1,124 @@
//! Session key parsing for scoped gateway access.
//!
//! Session keys encode a scope that limits what operations the bearer
//! can perform. Format: `zcs_<scope>_<random>` where scope is one of:
//! `main`, `sender_<id>`, `cron`, `subagent`.
use serde::{Deserialize, Serialize};
/// The scope encoded in a session key.
///
/// Determines which operations `SessionKey::allows` permits for the bearer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionScope {
    /// Full access (equivalent to the main bearer token)
    Main,
    /// Scoped to a specific sender/channel (the sender id from the key)
    Sender(String),
    /// Scoped to cron job execution
    Cron,
    /// Scoped to a sub-agent delegation
    SubAgent,
}
/// A parsed session key.
#[derive(Debug, Clone)]
pub struct SessionKey {
    /// The access scope decoded from the key's `zcs_<scope>_...` prefix.
    pub scope: SessionScope,
    /// The original (trimmed) key string, kept for later auth checks/logging.
    pub raw: String,
}
impl SessionKey {
    /// Parse a session key string.
    ///
    /// Accepted forms (after trimming surrounding whitespace):
    /// - `zcs_main_<random>`
    /// - `zcs_sender_<id>_<random>`
    /// - `zcs_cron_<random>`
    /// - `zcs_subagent_<random>`
    ///
    /// Returns None if the format is invalid.
    pub fn parse(key: &str) -> Option<Self> {
        let key = key.trim();
        // strip_prefix both checks and removes "zcs_" in one step, replacing
        // the starts_with + manual byte-slice (`&key[4..]`) pair.
        let rest = key.strip_prefix("zcs_")?;
        let scope = if rest.starts_with("main_") {
            SessionScope::Main
        } else if let Some(sender_rest) = rest.strip_prefix("sender_") {
            // Format: zcs_sender_<id>_<random> — id runs up to the next '_';
            // a random segment must follow (split_once is None otherwise).
            let (id, _random) = sender_rest.split_once('_')?;
            SessionScope::Sender(id.to_string())
        } else if rest.starts_with("cron_") {
            SessionScope::Cron
        } else if rest.starts_with("subagent_") {
            SessionScope::SubAgent
        } else {
            return None;
        };
        Some(Self {
            scope,
            raw: key.to_string(),
        })
    }

    /// Check if this key's scope allows a given operation.
    ///
    /// `Main` allows everything; the other scopes allow a fixed whitelist of
    /// operation names.
    pub fn allows(&self, operation: &str) -> bool {
        match &self.scope {
            SessionScope::Main => true, // full access
            SessionScope::Sender(_) => {
                matches!(operation, "chat" | "memory_read" | "status")
            }
            SessionScope::Cron => {
                matches!(
                    operation,
                    "chat" | "tool_execute" | "memory_read" | "memory_write"
                )
            }
            SessionScope::SubAgent => {
                matches!(operation, "chat" | "tool_execute" | "memory_read")
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Main scope parses and grants unrestricted access.
    #[test]
    fn test_parse_main() {
        let key = SessionKey::parse("zcs_main_abc123def456").unwrap();
        assert_eq!(key.scope, SessionScope::Main);
        assert!(key.allows("chat"));
        assert!(key.allows("anything"));
    }

    // Sender scope captures the sender id and is read-mostly.
    #[test]
    fn test_parse_sender() {
        let key = SessionKey::parse("zcs_sender_telegram123_abc456").unwrap();
        assert_eq!(key.scope, SessionScope::Sender("telegram123".to_string()));
        assert!(key.allows("chat"));
        assert!(!key.allows("tool_execute"));
    }

    // Cron scope may execute tools but is not admin.
    #[test]
    fn test_parse_cron() {
        let key = SessionKey::parse("zcs_cron_random789").unwrap();
        assert_eq!(key.scope, SessionScope::Cron);
        assert!(key.allows("chat"));
        assert!(key.allows("tool_execute"));
        assert!(!key.allows("admin"));
    }

    #[test]
    fn test_parse_subagent() {
        let key = SessionKey::parse("zcs_subagent_xyz").unwrap();
        assert_eq!(key.scope, SessionScope::SubAgent);
    }

    // Wrong prefix, empty input, and unknown scopes are all rejected.
    #[test]
    fn test_invalid_prefix() {
        assert!(SessionKey::parse("zc_main_abc").is_none());
        assert!(SessionKey::parse("").is_none());
        assert!(SessionKey::parse("zcs_unknown_abc").is_none());
    }
}

View File

@ -236,7 +236,8 @@ async fn handle_socket(socket: WebSocket, state: AppState, session_id: Option<St
let user_msg = crate::providers::ChatMessage::user(&content);
let _ = backend.append(&session_key, &user_msg);
}
process_chat_message(&state, &mut agent, &mut sender, &content, &session_key).await;
process_chat_message(&state, &mut agent, &mut sender, &content, &session_key)
.await;
}
}
}