From 1702bb2747ea6a54c849d142470887fc36549abe Mon Sep 17 00:00:00 2001 From: Argenis Date: Tue, 24 Mar 2026 01:02:46 -0400 Subject: [PATCH] fix: route WebSocket connections through configured proxy (#4408) * fix: route WebSocket connections through configured proxy (#4408) tokio_tungstenite::connect_async does not honour proxy settings. Add proxy-aware WebSocket connect helpers (HTTP CONNECT and SOCKS5) in config::schema and update all six channel WebSocket connections (discord, discord_history, slack, dingtalk, lark, qq) to use ws_connect_with_proxy instead of connect_async. * fix: update Cargo.lock with tokio-socks dependency --- Cargo.lock | 59 +++--- Cargo.toml | 1 + src/channels/dingtalk.rs | 7 +- src/channels/discord.rs | 7 +- src/channels/discord_history.rs | 7 +- src/channels/lark.rs | 7 +- src/channels/qq.rs | 4 +- src/channels/slack.rs | 8 +- src/config/mod.rs | 4 +- src/config/schema.rs | 311 ++++++++++++++++++++++++++++++++ 10 files changed, 378 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4d4dd568..228862e18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2542,9 +2542,9 @@ checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" [[package]] name = "embed-resource" -version = "3.0.7" +version = "3.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47ec73ddcf6b7f23173d5c3c5a32b5507dc0a734de7730aa14abc5d5e296bb5f" +checksum = "63a1d0de4f2249aa0ff5884d7080814f446bb241a559af6c170a41e878ed2d45" dependencies = [ "cc", "memchr", @@ -2616,18 +2616,18 @@ dependencies = [ [[package]] name = "env_filter" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" dependencies = [ "log", ] [[package]] name = "env_logger" -version = "0.11.9" +version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" dependencies = [ "env_filter", "log", @@ -6876,7 +6876,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ - "toml_edit 0.25.5+spec-1.1.0", + "toml_edit 0.25.8+spec-1.1.0", ] [[package]] @@ -8351,9 +8351,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +checksum = "876ac351060d4f882bb1032b6369eb0aef79ad9df1ea8bc404874d8cc3d0cd98" dependencies = [ "serde_core", ] @@ -9623,7 +9623,7 @@ checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" dependencies = [ "indexmap 2.13.0", "serde_core", - "serde_spanned 1.0.4", + "serde_spanned 1.1.0", "toml_datetime 0.7.5+spec-1.1.0", "toml_parser", "toml_writer", @@ -9632,14 +9632,14 @@ dependencies = [ [[package]] name = "toml" -version = "1.0.7+spec-1.1.0" +version = "1.1.0+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd28d57d8a6f6e458bc0b8784f8fdcc4b99a437936056fa122cb234f18656a96" +checksum = "f8195ca05e4eb728f4ba94f3e3291661320af739c4e43779cbdfae82ab239fcc" dependencies = [ "indexmap 2.13.0", "serde_core", - "serde_spanned 1.0.4", - "toml_datetime 1.0.1+spec-1.1.0", + "serde_spanned 1.1.0", + "toml_datetime 1.1.0+spec-1.1.0", "toml_parser", "toml_writer", "winnow 1.0.0", @@ -9665,9 +9665,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.0.1+spec-1.1.0" +version = "1.1.0+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b320e741db58cac564e26c607d3cc1fdc4a88fd36c879568c07856ed83ff3e9" +checksum = "97251a7c317e03ad83774a8752a7e81fb6067740609f75ea2b585b569a59198f" dependencies = [ "serde_core", ] @@ -9710,21 +9710,21 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.25.5+spec-1.1.0" +version = "0.25.8+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca1a40644a28bce036923f6a431df0b34236949d111cc07cb6dca830c9ef2e1" +checksum = "16bff38f1d86c47f9ff0647e6838d7bb362522bdf44006c7068c2b1e606f1f3c" dependencies = [ "indexmap 2.13.0", - "toml_datetime 1.0.1+spec-1.1.0", + "toml_datetime 1.1.0+spec-1.1.0", "toml_parser", "winnow 1.0.0", ] [[package]] name = "toml_parser" -version = "1.0.10+spec-1.1.0" +version = "1.1.0+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7df25b4befd31c4816df190124375d5a20c6b6921e2cad937316de3fccd63420" +checksum = "2334f11ee363607eb04df9b8fc8a13ca1715a72ba8662a26ac285c98aabb4011" dependencies = [ "winnow 1.0.0", ] @@ -9737,9 +9737,9 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "toml_writer" -version = "1.0.7+spec-1.1.0" +version = "1.1.0+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17aaa1c6e3dc22b1da4b6bba97d066e354c7945cac2f7852d4e4e7ca7a6b56d" +checksum = "d282ade6016312faf3e41e57ebbba0c073e4056dab1232ab1cb624199648f8ed" [[package]] name = "tonic" @@ -9981,9 +9981,9 @@ checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" [[package]] name = "type1-encoding-parser" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3d6cc09e1a99c7e01f2afe4953789311a1c50baebbdac5b477ecf78e2e92a5b" +checksum = "fa10c302f5a53b7ad27fd42a3996e23d096ba39b5b8dd6d9e683a05b01bee749" dependencies = [ "pom", ] @@ -12358,7 +12358,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-test", - "toml 1.0.7+spec-1.1.0", + "toml 1.1.0+spec-1.1.0", "tracing", ] @@ -12436,10 +12436,11 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-serial", + "tokio-socks", "tokio-stream", "tokio-tungstenite 0.29.0", "tokio-util", - "toml 1.0.7+spec-1.1.0", + "toml 1.1.0+spec-1.1.0", "tower", "tower-http", "tracing", @@ -12576,9 +12577,9 @@ dependencies = [ [[package]] name = "zip" -version = "8.3.1" +version = "8.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c546feb4481b0fbafb4ef0d79b6204fc41c6f9884b1b73b1d73f82442fc0845" +checksum = "7756d0206d058333667493c4014f545f4b9603c4330ccd6d9b3f86dcab59f7d9" dependencies = [ "crc32fast", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 8726dac1f..7f5896128 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -150,6 +150,7 @@ which = "8.0" # WebSocket client channels (Discord/Lark/DingTalk/Nostr) tokio-tungstenite = { version = "0.29", features = ["rustls-tls-webpki-roots"] } +tokio-socks = "0.5" futures-util = { version = "0.3", default-features = false, features = ["sink"] } nostr-sdk = { version = "0.44", default-features = false, features = ["nip04", "nip59"], optional = true } regex = "1.10" diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index 916765aff..063cf9de0 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -162,7 +162,12 @@ impl Channel for DingTalkChannel { let ws_url = format!("{}?ticket={}", gw.endpoint, gw.ticket); tracing::info!("DingTalk: connecting to stream WebSocket..."); - let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?; + let (ws_stream, _) = crate::config::ws_connect_with_proxy( + &ws_url, + "channel.dingtalk", + self.proxy_url.as_deref(), + ) + .await?; let (mut write, mut read) = ws_stream.split(); tracing::info!("DingTalk: connected and listening for messages..."); diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 6c1183ec6..e1735a207 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -675,7 +675,12 @@ impl Channel for DiscordChannel { let ws_url = format!("{gw_url}/?v=10&encoding=json"); tracing::info!("Discord: connecting to gateway..."); - let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?; + let (ws_stream, _) = crate::config::ws_connect_with_proxy( + &ws_url, + "channel.discord", + self.proxy_url.as_deref(), + ) + .await?; let (mut write, mut read) = ws_stream.split(); // Read Hello (opcode 10) diff --git a/src/channels/discord_history.rs b/src/channels/discord_history.rs index ac08a7ba2..f2bf30df1 100644 --- a/src/channels/discord_history.rs +++ b/src/channels/discord_history.rs @@ -240,7 +240,12 @@ impl Channel for DiscordHistoryChannel { let ws_url = format!("{gw_url}/?v=10&encoding=json"); tracing::info!("DiscordHistory: connecting to gateway..."); - let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?; + let (ws_stream, _) = crate::config::ws_connect_with_proxy( + &ws_url, + "channel.discord", + self.proxy_url.as_deref(), + ) + .await?; let (mut write, mut read) = ws_stream.split(); // Read Hello (opcode 10) diff --git a/src/channels/lark.rs b/src/channels/lark.rs index ce60e502a..16b1d2ef9 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -734,7 +734,12 @@ impl LarkChannel { .unwrap_or(0); tracing::info!("Lark: connecting to {wss_url}"); - let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url).await?; + let (ws_stream, _) = crate::config::ws_connect_with_proxy( + &wss_url, + "channel.lark", + self.proxy_url.as_deref(), + ) + .await?; let (mut write, mut read) = ws_stream.split(); tracing::info!("Lark: WS connected (service_id={service_id})"); diff --git a/src/channels/qq.rs b/src/channels/qq.rs index e06c9462f..49f5a2b1c 100644 --- a/src/channels/qq.rs +++ b/src/channels/qq.rs @@ -976,7 +976,9 @@ impl Channel for QQChannel { let gw_url = self.get_gateway_url(&token).await?; tracing::info!("QQ: connecting to gateway WebSocket..."); - let (ws_stream, _) = tokio_tungstenite::connect_async(&gw_url).await?; + let (ws_stream, _) = + crate::config::ws_connect_with_proxy(&gw_url, "channel.qq", self.proxy_url.as_deref()) + .await?; let (mut write, mut read) = ws_stream.split(); // Read Hello (opcode 10) diff --git a/src/channels/slack.rs b/src/channels/slack.rs index d854fc5d3..ff7dcc64d 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -2021,7 +2021,13 @@ impl SlackChannel { } }; - let (ws_stream, _) = match tokio_tungstenite::connect_async(&ws_url).await { + let (ws_stream, _) = match crate::config::ws_connect_with_proxy( + &ws_url, + "channel.slack", + self.proxy_url.as_deref(), + ) + .await + { Ok(connection) => { socket_reconnect_attempt = 0; connection diff --git a/src/config/mod.rs b/src/config/mod.rs index 78f614196..ae255155e 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -7,8 +7,8 @@ pub use schema::{ apply_channel_proxy_to_builder, apply_runtime_proxy_to_builder, build_channel_proxy_client, build_channel_proxy_client_with_timeouts, build_runtime_proxy_client, build_runtime_proxy_client_with_timeouts, runtime_proxy_config, set_runtime_proxy_config, - AgentConfig, AssemblyAiSttConfig, AuditConfig, AutonomyConfig, BackupConfig, - BrowserComputerUseConfig, BrowserConfig, BuiltinHooksConfig, ChannelsConfig, + ws_connect_with_proxy, AgentConfig, AssemblyAiSttConfig, AuditConfig, AutonomyConfig, + BackupConfig, BrowserComputerUseConfig, BrowserConfig, BuiltinHooksConfig, ChannelsConfig, ClassificationRule, ClaudeCodeConfig, ClaudeCodeRunnerConfig, CloudOpsConfig, CodexCliConfig, ComposioConfig, Config, ConversationalAiConfig, CostConfig, CronConfig, CronJobDecl, CronScheduleDecl, DataRetentionConfig, DeepgramSttConfig, DelegateAgentConfig, diff --git a/src/config/schema.rs b/src/config/schema.rs index cebd288b7..3bdd0db22 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -4159,6 +4159,317 @@ fn apply_explicit_proxy_to_builder( builder } +// ── Proxy-aware WebSocket connect ──────────────────────────────── +// +// `tokio_tungstenite::connect_async` does not honour proxy settings. +// The helpers below resolve the effective proxy URL for a given service +// key and, when a proxy is active, establish a tunnelled TCP connection +// (HTTP CONNECT for http/https proxies, SOCKS5 for socks5/socks5h) +// before handing the stream to `tokio_tungstenite` for the WebSocket +// handshake. + +/// Combined async IO trait for boxed WebSocket transport streams. +trait AsyncReadWrite: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send {} +impl AsyncReadWrite for T {} + +/// A boxed async IO stream used when a WebSocket connection is tunnelled +/// through a proxy. The concrete type varies depending on the proxy +/// kind (HTTP CONNECT vs SOCKS5) and the target scheme (ws vs wss). +/// +/// We wrap in a newtype so we can implement `AsyncRead` and `AsyncWrite` +/// via delegation, since Rust trait objects cannot combine multiple +/// non-auto traits. +pub struct BoxedIo(Box); + +impl tokio::io::AsyncRead for BoxedIo { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut *self.0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for BoxedIo { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + std::pin::Pin::new(&mut *self.0).poll_write(cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut *self.0).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut *self.0).poll_shutdown(cx) + } +} + +impl Unpin for BoxedIo {} + +/// Convenience alias for the WebSocket stream returned by the proxy-aware +/// connect helpers. +pub type ProxiedWsStream = tokio_tungstenite::WebSocketStream; + +/// Resolve the effective proxy URL for a WebSocket connection to the +/// given `ws_url`, taking into account the per-channel `proxy_url` +/// override, the runtime proxy config, scope and no_proxy list. +fn resolve_ws_proxy_url( + service_key: &str, + ws_url: &str, + channel_proxy_url: Option<&str>, +) -> Option { + // 1. Explicit per-channel proxy always wins. + if let Some(url) = normalize_proxy_url_option(channel_proxy_url) { + return Some(url); + } + + // 2. Consult the runtime proxy config. + let cfg = runtime_proxy_config(); + if !cfg.should_apply_to_service(service_key) { + return None; + } + + // Check the no_proxy list against the WebSocket target host. + if let Ok(parsed) = reqwest::Url::parse(ws_url) { + if let Some(host) = parsed.host_str() { + let no_proxy_entries = cfg.normalized_no_proxy(); + if !no_proxy_entries.is_empty() { + let host_lower = host.to_ascii_lowercase(); + let matches_no_proxy = no_proxy_entries.iter().any(|entry| { + let entry = entry.trim().to_ascii_lowercase(); + if entry == "*" { + return true; + } + if host_lower == entry { + return true; + } + // Support ".example.com" matching "foo.example.com" + if let Some(suffix) = entry.strip_prefix('.') { + return host_lower.ends_with(suffix) || host_lower == suffix; + } + // Support "example.com" also matching "foo.example.com" + host_lower.ends_with(&format!(".{entry}")) + }); + if matches_no_proxy { + return None; + } + } + } + } + + // For wss:// prefer https_proxy, for ws:// prefer http_proxy, fall + // back to all_proxy in both cases. + let is_secure = ws_url.starts_with("wss://") || ws_url.starts_with("wss:"); + let preferred = if is_secure { + normalize_proxy_url_option(cfg.https_proxy.as_deref()) + } else { + normalize_proxy_url_option(cfg.http_proxy.as_deref()) + }; + preferred.or_else(|| normalize_proxy_url_option(cfg.all_proxy.as_deref())) +} + +/// Connect a WebSocket through the configured proxy (if any). +/// +/// When no proxy applies, this is a thin wrapper around +/// `tokio_tungstenite::connect_async`. When a proxy is active the +/// function tunnels the TCP connection through the proxy before +/// performing the WebSocket upgrade. +/// +/// `service_key` is the proxy-service selector (e.g. `"channel.discord"`). +/// `channel_proxy_url` is the optional per-channel proxy override. +pub async fn ws_connect_with_proxy( + ws_url: &str, + service_key: &str, + channel_proxy_url: Option<&str>, +) -> anyhow::Result<( + ProxiedWsStream, + tokio_tungstenite::tungstenite::http::Response>>, +)> { + let proxy_url = resolve_ws_proxy_url(service_key, ws_url, channel_proxy_url); + + match proxy_url { + None => { + // No proxy — delegate directly. + let (stream, resp) = tokio_tungstenite::connect_async(ws_url).await?; + // Re-wrap the inner stream into our boxed type so the caller + // always gets `ProxiedWsStream`. + let inner = stream.into_inner(); + let boxed = BoxedIo(Box::new(inner)); + let ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + boxed, + tokio_tungstenite::tungstenite::protocol::Role::Client, + None, + ) + .await; + Ok((ws, resp)) + } + Some(proxy) => ws_connect_via_proxy(ws_url, &proxy).await, + } +} + +/// Establish a WebSocket connection tunnelled through the given proxy URL. +async fn ws_connect_via_proxy( + ws_url: &str, + proxy_url: &str, +) -> anyhow::Result<( + ProxiedWsStream, + tokio_tungstenite::tungstenite::http::Response>>, +)> { + use tokio::io::{AsyncReadExt, AsyncWriteExt as _}; + use tokio::net::TcpStream; + + let target = + reqwest::Url::parse(ws_url).with_context(|| format!("Invalid WebSocket URL: {ws_url}"))?; + let target_host = target + .host_str() + .ok_or_else(|| anyhow::anyhow!("WebSocket URL has no host: {ws_url}"))? + .to_string(); + let target_port = target + .port_or_known_default() + .unwrap_or(if target.scheme() == "wss" { 443 } else { 80 }); + + let proxy = reqwest::Url::parse(proxy_url) + .with_context(|| format!("Invalid proxy URL: {proxy_url}"))?; + + let stream: BoxedIo = match proxy.scheme() { + "socks5" | "socks5h" | "socks" => { + let proxy_addr = format!( + "{}:{}", + proxy.host_str().unwrap_or("127.0.0.1"), + proxy.port_or_known_default().unwrap_or(1080) + ); + let target_addr = format!("{target_host}:{target_port}"); + let socks_stream = if proxy.username().is_empty() { + tokio_socks::tcp::Socks5Stream::connect(proxy_addr.as_str(), target_addr.as_str()) + .await + .with_context(|| format!("SOCKS5 connect to {target_addr} via {proxy_addr}"))? + } else { + let password = proxy.password().unwrap_or(""); + tokio_socks::tcp::Socks5Stream::connect_with_password( + proxy_addr.as_str(), + target_addr.as_str(), + proxy.username(), + password, + ) + .await + .with_context(|| format!("SOCKS5 auth connect to {target_addr} via {proxy_addr}"))? + }; + let tcp: TcpStream = socks_stream.into_inner(); + BoxedIo(Box::new(tcp)) + } + "http" | "https" => { + let proxy_host = proxy.host_str().unwrap_or("127.0.0.1"); + let proxy_port = proxy.port_or_known_default().unwrap_or(8080); + let proxy_addr = format!("{proxy_host}:{proxy_port}"); + + let mut tcp = TcpStream::connect(&proxy_addr) + .await + .with_context(|| format!("TCP connect to HTTP proxy {proxy_addr}"))?; + + // Send HTTP CONNECT request. + let connect_req = format!( + "CONNECT {target_host}:{target_port} HTTP/1.1\r\nHost: {target_host}:{target_port}\r\n\r\n" + ); + tcp.write_all(connect_req.as_bytes()).await?; + + // Read the response (we only need the status line). + let mut buf = vec![0u8; 4096]; + let mut total = 0usize; + loop { + let n = tcp.read(&mut buf[total..]).await?; + if n == 0 { + anyhow::bail!("HTTP CONNECT proxy closed connection before response"); + } + total += n; + // Look for end of HTTP headers. + if let Some(pos) = find_header_end(&buf[..total]) { + let status_line = std::str::from_utf8(&buf[..pos]) + .unwrap_or("") + .lines() + .next() + .unwrap_or(""); + if !status_line.contains("200") { + anyhow::bail!( + "HTTP CONNECT proxy returned non-200 response: {status_line}" + ); + } + break; + } + if total >= buf.len() { + anyhow::bail!("HTTP CONNECT proxy response too large"); + } + } + + BoxedIo(Box::new(tcp)) + } + scheme => { + anyhow::bail!("Unsupported proxy scheme '{scheme}' for WebSocket connections"); + } + }; + + // If the target is wss://, wrap in TLS. + let is_secure = target.scheme() == "wss"; + let stream: BoxedIo = if is_secure { + let mut root_store = rustls::RootCertStore::empty(); + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let tls_config = std::sync::Arc::new( + rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(), + ); + let connector = tokio_rustls::TlsConnector::from(tls_config); + let server_name = rustls_pki_types::ServerName::try_from(target_host.clone()) + .with_context(|| format!("Invalid TLS server name: {target_host}"))?; + + // `stream` is `BoxedIo` — we need a concrete `AsyncRead + AsyncWrite` + // for `TlsConnector::connect`. Since `BoxedIo` already satisfies + // those bounds we can pass it directly. + let tls_stream = connector + .connect(server_name, stream) + .await + .with_context(|| format!("TLS handshake with {target_host}"))?; + BoxedIo(Box::new(tls_stream)) + } else { + stream + }; + + // Perform the WebSocket client handshake over the tunnelled stream. + let ws_request = tokio_tungstenite::tungstenite::http::Request::builder() + .uri(ws_url) + .header("Host", format!("{target_host}:{target_port}")) + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header( + "Sec-WebSocket-Key", + tokio_tungstenite::tungstenite::handshake::client::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .body(()) + .with_context(|| "Failed to build WebSocket upgrade request")?; + + let (ws_stream, response) = tokio_tungstenite::client_async(ws_request, stream) + .await + .with_context(|| format!("WebSocket handshake failed for {ws_url}"))?; + + Ok((ws_stream, response)) +} + +/// Find the `\r\n\r\n` boundary marking the end of HTTP headers. +fn find_header_end(buf: &[u8]) -> Option { + buf.windows(4).position(|w| w == b"\r\n\r\n").map(|p| p + 4) +} + fn parse_proxy_scope(raw: &str) -> Option { match raw.trim().to_ascii_lowercase().as_str() { "environment" | "env" => Some(ProxyScope::Environment),