feat(channels): scaffold bridge websocket channel for #2816

(cherry picked from commit e8e314f69e396d86ad97a4817532a351cd7c1365)
This commit is contained in:
argenis de la rosa 2026-03-05 05:54:37 -05:00 committed by Argenis
parent c350a8a7f8
commit 4cf1adfd7d
4 changed files with 201 additions and 0 deletions

View File

@ -0,0 +1,12 @@
# ZeroClaw Issue Fix Checklist
> 状态约定:`TODO` = 待处理,`IN PROGRESS` = 进行中。
- #2816 — Bridge WebSocket channel — TODO (`IN PROGRESS`)
- #2810 — TODO
- #2735 — TODO
- #2721 — TODO
- #2403 — TODO
- #2451 — TODO
- #2404 — TODO
- #2401 — TODO

83
src/channels/bridge.rs Normal file
View File

@ -0,0 +1,83 @@
use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
use crate::config::schema::BridgeConfig;
use async_trait::async_trait;
use tokio::sync::mpsc;
/// Bridge WebSocket channel scaffold.
///
/// This MVP wires config + channel lifecycle into the runtime while the
/// full websocket transport is implemented incrementally.
#[derive(Debug, Clone)]
pub struct BridgeChannel {
config: BridgeConfig,
}
impl BridgeChannel {
pub fn new(config: BridgeConfig) -> Self {
Self { config }
}
#[must_use]
pub fn config(&self) -> &BridgeConfig {
&self.config
}
#[must_use]
pub fn endpoint_url(&self) -> String {
format!(
"ws://{}:{}{}",
self.config.bind_host, self.config.bind_port, self.config.path
)
}
}
#[async_trait]
impl Channel for BridgeChannel {
fn name(&self) -> &str {
"bridge"
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
tracing::info!(
recipient = %message.recipient,
subject = ?message.subject,
bytes = message.content.len(),
endpoint = %self.endpoint_url(),
"Bridge channel scaffold send invoked (no-op)"
);
Ok(())
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
tracing::info!(
endpoint = %self.endpoint_url(),
"Bridge channel scaffold listener started (waiting for shutdown)"
);
// Keep task alive so supervised listener doesn't hot-restart while
// websocket transport is being implemented.
tx.closed().await;
Ok(())
}
async fn health_check(&self) -> bool {
!self.config.bind_host.trim().is_empty()
&& self.config.bind_host == "127.0.0.1"
&& self.config.bind_port > 0
&& self.config.path.starts_with('/')
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn bridge_channel_name_and_endpoint_from_config() {
let channel = BridgeChannel::new(BridgeConfig::default());
assert_eq!(channel.name(), "bridge");
assert_eq!(channel.endpoint_url(), "ws://127.0.0.1:8765/ws");
assert_eq!(channel.config().bind_host, "127.0.0.1");
}
}

View File

@ -14,6 +14,7 @@
//! To add a new channel, implement [`Channel`] in a new submodule and wire it into
//! [`start_channels`]. See `AGENTS.md` §7.2 for the full change playbook.
pub mod bridge;
pub mod clawdtalk;
pub mod cli;
pub mod dingtalk;
@ -42,6 +43,7 @@ pub mod whatsapp_storage;
#[cfg(feature = "whatsapp-web")]
pub mod whatsapp_web;
pub use bridge::BridgeChannel;
pub use clawdtalk::ClawdTalkChannel;
pub use cli::CliChannel;
pub use dingtalk::DingTalkChannel;
@ -4346,6 +4348,13 @@ fn collect_configured_channels(
let _ = matrix_skip_context;
let mut channels = Vec::new();
if let Some(ref bridge_cfg) = config.channels_config.bridge {
channels.push(ConfiguredChannel {
display_name: "Bridge",
channel: Arc::new(BridgeChannel::new(bridge_cfg.clone())),
});
}
if let Some(ref tg) = config.channels_config.telegram {
let mut telegram = TelegramChannel::new(
tg.bot_token.clone(),
@ -10118,6 +10127,19 @@ BTC is currently around $65,000 based on latest tool output."#;
.any(|entry| entry.channel.name() == "mattermost"));
}
#[test]
fn collect_configured_channels_includes_bridge_when_configured() {
let mut config = Config::default();
config.channels_config.bridge = Some(crate::config::schema::BridgeConfig::default());
let channels = collect_configured_channels(&config, "test");
assert!(channels.iter().any(|entry| entry.display_name == "Bridge"));
assert!(channels
.iter()
.any(|entry| entry.channel.name() == "bridge"));
}
struct AlwaysFailChannel {
name: &'static str,
calls: Arc<AtomicUsize>,

View File

@ -408,6 +408,7 @@ impl std::fmt::Debug for Config {
self.model_providers.keys().map(String::as_str).collect();
let delegate_agent_ids: Vec<&str> = self.agents.keys().map(String::as_str).collect();
let enabled_channel_count = [
self.channels_config.bridge.is_some(),
self.channels_config.telegram.is_some(),
self.channels_config.discord.is_some(),
self.channels_config.slack.is_some(),
@ -3292,6 +3293,8 @@ impl<T: ChannelConfig> crate::config::traits::ConfigHandle for ConfigWrapper<T>
pub struct ChannelsConfig {
/// Enable the CLI interactive channel. Default: `true`.
pub cli: bool,
/// Local bridge websocket channel configuration.
pub bridge: Option<BridgeConfig>,
/// Telegram bot channel configuration.
pub telegram: Option<TelegramConfig>,
/// Discord bot channel configuration.
@ -3345,6 +3348,10 @@ impl ChannelsConfig {
#[rustfmt::skip]
pub fn channels_except_webhook(&self) -> Vec<(Box<dyn super::traits::ConfigHandle>, bool)> {
vec![
(
Box::new(ConfigWrapper::new(self.bridge.as_ref())),
self.bridge.is_some(),
),
(
Box::new(ConfigWrapper::new(self.telegram.as_ref())),
self.telegram.is_some(),
@ -3444,6 +3451,7 @@ impl Default for ChannelsConfig {
fn default() -> Self {
Self {
cli: true,
bridge: None,
telegram: None,
discord: None,
slack: None,
@ -3541,6 +3549,54 @@ fn clone_group_reply_allowed_sender_ids(group_reply: Option<&GroupReplyConfig>)
.unwrap_or_default()
}
fn default_bridge_bind_host() -> String {
"127.0.0.1".into()
}
fn default_bridge_bind_port() -> u16 {
8765
}
fn default_bridge_path() -> String {
"/ws".into()
}
/// Bridge WebSocket channel configuration.
///
/// This listener is local-only by default (`127.0.0.1`) for safety.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct BridgeConfig {
/// Local bind host for the bridge listener.
#[serde(default = "default_bridge_bind_host")]
pub bind_host: String,
/// TCP port for incoming websocket bridge clients.
#[serde(default = "default_bridge_bind_port")]
pub bind_port: u16,
/// HTTP path for websocket upgrade requests.
#[serde(default = "default_bridge_path")]
pub path: String,
}
impl Default for BridgeConfig {
fn default() -> Self {
Self {
bind_host: default_bridge_bind_host(),
bind_port: default_bridge_bind_port(),
path: default_bridge_path(),
}
}
}
impl ChannelConfig for BridgeConfig {
fn name() -> &'static str {
"Bridge"
}
fn desc() -> &'static str {
"Local websocket bridge"
}
}
/// Telegram bot channel configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct TelegramConfig {
@ -7182,6 +7238,7 @@ default_temperature = 0.7
goal_loop: GoalLoopConfig::default(),
channels_config: ChannelsConfig {
cli: true,
bridge: None,
telegram: Some(TelegramConfig {
bot_token: "123:ABC".into(),
allowed_users: vec!["user1".into()],
@ -8181,6 +8238,7 @@ allowed_users = ["@ops:matrix.org"]
async fn channels_config_with_imessage_and_matrix() {
let c = ChannelsConfig {
cli: true,
bridge: None,
telegram: None,
discord: None,
slack: None,
@ -8228,6 +8286,31 @@ allowed_users = ["@ops:matrix.org"]
assert!(c.matrix.is_none());
}
#[test]
async fn bridge_config_deserializes_with_safe_defaults() {
let parsed: BridgeConfig = serde_json::from_str("{}").unwrap();
assert_eq!(parsed.bind_host, "127.0.0.1");
assert_eq!(parsed.bind_port, 8765);
assert_eq!(parsed.path, "/ws");
}
#[test]
async fn channels_config_supports_bridge_section() {
let toml_str = r#"
cli = true
[bridge]
bind_host = "127.0.0.1"
bind_port = 9010
path = "/bridge"
"#;
let parsed: ChannelsConfig = toml::from_str(toml_str).unwrap();
let bridge = parsed.bridge.expect("bridge should be present");
assert_eq!(bridge.bind_host, "127.0.0.1");
assert_eq!(bridge.bind_port, 9010);
assert_eq!(bridge.path, "/bridge");
}
// ── Edge cases: serde(default) for allowed_users ─────────
#[test]
@ -8460,6 +8543,7 @@ channel_id = "C123"
async fn channels_config_with_whatsapp() {
let c = ChannelsConfig {
cli: true,
bridge: None,
telegram: None,
discord: None,
slack: None,