From 4cf1adfd7dfc6ead759ebd3be410bafb85f5beab Mon Sep 17 00:00:00 2001 From: argenis de la rosa Date: Thu, 5 Mar 2026 05:54:37 -0500 Subject: [PATCH] feat(channels): scaffold bridge websocket channel for #2816 (cherry picked from commit e8e314f69e396d86ad97a4817532a351cd7c1365) --- docs/issue-fix-checklist.md | 12 ++++++ src/channels/bridge.rs | 83 ++++++++++++++++++++++++++++++++++++ src/channels/mod.rs | 22 ++++++++++ src/config/schema.rs | 84 +++++++++++++++++++++++++++++++++++++ 4 files changed, 201 insertions(+) create mode 100644 docs/issue-fix-checklist.md create mode 100644 src/channels/bridge.rs diff --git a/docs/issue-fix-checklist.md b/docs/issue-fix-checklist.md new file mode 100644 index 000000000..2bdc12ccc --- /dev/null +++ b/docs/issue-fix-checklist.md @@ -0,0 +1,12 @@ +# ZeroClaw Issue Fix Checklist + +> 状态约定:`TODO` = 待处理,`IN PROGRESS` = 进行中。 + +- #2816 — Bridge WebSocket channel — TODO (`IN PROGRESS`) +- #2810 — TODO +- #2735 — TODO +- #2721 — TODO +- #2403 — TODO +- #2451 — TODO +- #2404 — TODO +- #2401 — TODO diff --git a/src/channels/bridge.rs b/src/channels/bridge.rs new file mode 100644 index 000000000..2275b3198 --- /dev/null +++ b/src/channels/bridge.rs @@ -0,0 +1,83 @@ +use crate::channels::traits::{Channel, ChannelMessage, SendMessage}; +use crate::config::schema::BridgeConfig; +use async_trait::async_trait; +use tokio::sync::mpsc; + +/// Bridge WebSocket channel scaffold. +/// +/// This MVP wires config + channel lifecycle into the runtime while the +/// full websocket transport is implemented incrementally. +#[derive(Debug, Clone)] +pub struct BridgeChannel { + config: BridgeConfig, +} + +impl BridgeChannel { + pub fn new(config: BridgeConfig) -> Self { + Self { config } + } + + #[must_use] + pub fn config(&self) -> &BridgeConfig { + &self.config + } + + #[must_use] + pub fn endpoint_url(&self) -> String { + format!( + "ws://{}:{}{}", + self.config.bind_host, self.config.bind_port, self.config.path + ) + } +} + +#[async_trait] +impl Channel for BridgeChannel { + fn name(&self) -> &str { + "bridge" + } + + async fn send(&self, message: &SendMessage) -> anyhow::Result<()> { + tracing::info!( + recipient = %message.recipient, + subject = ?message.subject, + bytes = message.content.len(), + endpoint = %self.endpoint_url(), + "Bridge channel scaffold send invoked (no-op)" + ); + Ok(()) + } + + async fn listen(&self, tx: mpsc::Sender) -> anyhow::Result<()> { + tracing::info!( + endpoint = %self.endpoint_url(), + "Bridge channel scaffold listener started (waiting for shutdown)" + ); + + // Keep task alive so supervised listener doesn't hot-restart while + // websocket transport is being implemented. + tx.closed().await; + Ok(()) + } + + async fn health_check(&self) -> bool { + !self.config.bind_host.trim().is_empty() + && self.config.bind_host == "127.0.0.1" + && self.config.bind_port > 0 + && self.config.path.starts_with('/') + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn bridge_channel_name_and_endpoint_from_config() { + let channel = BridgeChannel::new(BridgeConfig::default()); + + assert_eq!(channel.name(), "bridge"); + assert_eq!(channel.endpoint_url(), "ws://127.0.0.1:8765/ws"); + assert_eq!(channel.config().bind_host, "127.0.0.1"); + } +} diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 66df26bee..11750c10b 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -14,6 +14,7 @@ //! To add a new channel, implement [`Channel`] in a new submodule and wire it into //! [`start_channels`]. See `AGENTS.md` §7.2 for the full change playbook. +pub mod bridge; pub mod clawdtalk; pub mod cli; pub mod dingtalk; @@ -42,6 +43,7 @@ pub mod whatsapp_storage; #[cfg(feature = "whatsapp-web")] pub mod whatsapp_web; +pub use bridge::BridgeChannel; pub use clawdtalk::ClawdTalkChannel; pub use cli::CliChannel; pub use dingtalk::DingTalkChannel; @@ -4346,6 +4348,13 @@ fn collect_configured_channels( let _ = matrix_skip_context; let mut channels = Vec::new(); + if let Some(ref bridge_cfg) = config.channels_config.bridge { + channels.push(ConfiguredChannel { + display_name: "Bridge", + channel: Arc::new(BridgeChannel::new(bridge_cfg.clone())), + }); + } + if let Some(ref tg) = config.channels_config.telegram { let mut telegram = TelegramChannel::new( tg.bot_token.clone(), @@ -10118,6 +10127,19 @@ BTC is currently around $65,000 based on latest tool output."#; .any(|entry| entry.channel.name() == "mattermost")); } + #[test] + fn collect_configured_channels_includes_bridge_when_configured() { + let mut config = Config::default(); + config.channels_config.bridge = Some(crate::config::schema::BridgeConfig::default()); + + let channels = collect_configured_channels(&config, "test"); + + assert!(channels.iter().any(|entry| entry.display_name == "Bridge")); + assert!(channels + .iter() + .any(|entry| entry.channel.name() == "bridge")); + } + struct AlwaysFailChannel { name: &'static str, calls: Arc, diff --git a/src/config/schema.rs b/src/config/schema.rs index 24887efa6..2c22ffba9 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -408,6 +408,7 @@ impl std::fmt::Debug for Config { self.model_providers.keys().map(String::as_str).collect(); let delegate_agent_ids: Vec<&str> = self.agents.keys().map(String::as_str).collect(); let enabled_channel_count = [ + self.channels_config.bridge.is_some(), self.channels_config.telegram.is_some(), self.channels_config.discord.is_some(), self.channels_config.slack.is_some(), @@ -3292,6 +3293,8 @@ impl crate::config::traits::ConfigHandle for ConfigWrapper pub struct ChannelsConfig { /// Enable the CLI interactive channel. Default: `true`. pub cli: bool, + /// Local bridge websocket channel configuration. + pub bridge: Option, /// Telegram bot channel configuration. pub telegram: Option, /// Discord bot channel configuration. @@ -3345,6 +3348,10 @@ impl ChannelsConfig { #[rustfmt::skip] pub fn channels_except_webhook(&self) -> Vec<(Box, bool)> { vec![ + ( + Box::new(ConfigWrapper::new(self.bridge.as_ref())), + self.bridge.is_some(), + ), ( Box::new(ConfigWrapper::new(self.telegram.as_ref())), self.telegram.is_some(), @@ -3444,6 +3451,7 @@ impl Default for ChannelsConfig { fn default() -> Self { Self { cli: true, + bridge: None, telegram: None, discord: None, slack: None, @@ -3541,6 +3549,54 @@ fn clone_group_reply_allowed_sender_ids(group_reply: Option<&GroupReplyConfig>) .unwrap_or_default() } +fn default_bridge_bind_host() -> String { + "127.0.0.1".into() +} + +fn default_bridge_bind_port() -> u16 { + 8765 +} + +fn default_bridge_path() -> String { + "/ws".into() +} + +/// Bridge WebSocket channel configuration. +/// +/// This listener is local-only by default (`127.0.0.1`) for safety. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct BridgeConfig { + /// Local bind host for the bridge listener. + #[serde(default = "default_bridge_bind_host")] + pub bind_host: String, + /// TCP port for incoming websocket bridge clients. + #[serde(default = "default_bridge_bind_port")] + pub bind_port: u16, + /// HTTP path for websocket upgrade requests. + #[serde(default = "default_bridge_path")] + pub path: String, +} + +impl Default for BridgeConfig { + fn default() -> Self { + Self { + bind_host: default_bridge_bind_host(), + bind_port: default_bridge_bind_port(), + path: default_bridge_path(), + } + } +} + +impl ChannelConfig for BridgeConfig { + fn name() -> &'static str { + "Bridge" + } + + fn desc() -> &'static str { + "Local websocket bridge" + } +} + /// Telegram bot channel configuration. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct TelegramConfig { @@ -7182,6 +7238,7 @@ default_temperature = 0.7 goal_loop: GoalLoopConfig::default(), channels_config: ChannelsConfig { cli: true, + bridge: None, telegram: Some(TelegramConfig { bot_token: "123:ABC".into(), allowed_users: vec!["user1".into()], @@ -8181,6 +8238,7 @@ allowed_users = ["@ops:matrix.org"] async fn channels_config_with_imessage_and_matrix() { let c = ChannelsConfig { cli: true, + bridge: None, telegram: None, discord: None, slack: None, @@ -8228,6 +8286,31 @@ allowed_users = ["@ops:matrix.org"] assert!(c.matrix.is_none()); } + #[test] + async fn bridge_config_deserializes_with_safe_defaults() { + let parsed: BridgeConfig = serde_json::from_str("{}").unwrap(); + assert_eq!(parsed.bind_host, "127.0.0.1"); + assert_eq!(parsed.bind_port, 8765); + assert_eq!(parsed.path, "/ws"); + } + + #[test] + async fn channels_config_supports_bridge_section() { + let toml_str = r#" +cli = true + +[bridge] +bind_host = "127.0.0.1" +bind_port = 9010 +path = "/bridge" +"#; + let parsed: ChannelsConfig = toml::from_str(toml_str).unwrap(); + let bridge = parsed.bridge.expect("bridge should be present"); + assert_eq!(bridge.bind_host, "127.0.0.1"); + assert_eq!(bridge.bind_port, 9010); + assert_eq!(bridge.path, "/bridge"); + } + // ── Edge cases: serde(default) for allowed_users ───────── #[test] @@ -8460,6 +8543,7 @@ channel_id = "C123" async fn channels_config_with_whatsapp() { let c = ChannelsConfig { cli: true, + bridge: None, telegram: None, discord: None, slack: None,