Compare commits

...

18 Commits

Author SHA1 Message Date
Argenis a2c9e3cb29 Merge branch 'master' into work-issues/3417-fix-tilde-home-dir-expansion 2026-03-13 14:15:47 -04:00
SimianAstronaut7 e3e711073a feat(providers): support custom HTTP headers for LLM API requests (#3423)
Add `extra_headers` config field and `ZEROCLAW_EXTRA_HEADERS` env var
support so users can specify custom HTTP headers for provider API
requests. This enables connecting to providers that require specific
headers (e.g., User-Agent, HTTP-Referer, X-Title) without a reverse
proxy.

Config file headers serve as the base; env var headers override them.
Format: `Key:Value,Key2:Value2`
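The `Key:Value,Key2:Value2` format could be parsed with a sketch like the following (the helper name `parse_extra_headers` is illustrative, not necessarily the merged implementation; splitting on the first `:` only keeps colons inside values, such as URLs, intact):

```rust
use std::collections::HashMap;

/// Parse a `Key:Value,Key2:Value2` string into a header map.
/// Splits each pair on the first `:` only, so values may contain colons.
fn parse_extra_headers(raw: &str) -> HashMap<String, String> {
    raw.split(',')
        .filter_map(|pair| {
            let (key, value) = pair.split_once(':')?;
            let key = key.trim();
            if key.is_empty() {
                return None;
            }
            Some((key.to_string(), value.trim().to_string()))
        })
        .collect()
}

fn main() {
    let headers = parse_extra_headers("HTTP-Referer:https://example.com,X-Title:ZeroClaw");
    // The URL value survives despite its embedded colon.
    assert_eq!(headers["HTTP-Referer"], "https://example.com");
    assert_eq!(headers["X-Title"], "ZeroClaw");
}
```

Merging env-var headers over config-file headers then reduces to inserting the env map into the config map last, since `HashMap::insert` overwrites existing keys.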

Closes #3189

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Argenis <theonlyhennygod@gmail.com>
2026-03-13 14:15:42 -04:00
SimianAstronaut7 1033287f38 fix(gateway): skip pairing dialog when require_pairing is disabled (#3422)
When `require_pairing = false` in config, the dashboard showed the
pairing dialog even though no pairing code exists, creating a deadlock.

Add `requiresPairing` field to AuthState (defaults to `true` as a safe
fallback) and update it from the `/health` endpoint response. Gate the
pairing dialog in App.tsx on both `!isAuthenticated` and
`requiresPairing` so the dashboard loads directly when pairing is
disabled.

Closes #3267

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 14:08:59 -04:00
simianastronaut 320171c66a fix(daemon): expand tilde to home directory in file paths
Rust treats `~` as a literal path character, not a home directory
shorthand. Several config resolution paths used `PathBuf::from()` on
user-provided strings without expanding `~` first, causing a literal
`~` folder to be created in the working directory.

Apply `shellexpand::tilde()` to all user-facing path inputs:
- ZEROCLAW_CONFIG_DIR env var (config/schema.rs, onboard/wizard.rs)
- ZEROCLAW_WORKSPACE env var (config/schema.rs, onboard/wizard.rs,
  channels/matrix.rs)
- active_workspace.toml marker file config_dir (config/schema.rs)

The WhatsApp Web session_path was already correctly expanded via
shellexpand::tilde() in whatsapp_web.rs.
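For the common case, `shellexpand::tilde()` behaves roughly like this dependency-free sketch (home directory passed in explicitly for determinism; the crate resolves it from the environment):

```rust
/// Minimal sketch of tilde expansion: replace a leading `~` or `~/`
/// with the home directory. A `~` anywhere else stays a literal path
/// character — exactly the behavior the fix relies on.
fn expand_tilde(path: &str, home: &str) -> String {
    if path == "~" {
        return home.to_string();
    }
    if let Some(rest) = path.strip_prefix("~/") {
        return format!("{home}/{rest}");
    }
    path.to_string()
}

fn main() {
    assert_eq!(expand_tilde("~/workspace", "/home/demo"), "/home/demo/workspace");
    // A mid-path `~` is literal, which is why unexpanded input created a `~` folder.
    assert_eq!(expand_tilde("/data/~cache", "/home/demo"), "/data/~cache");
}
```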

Closes #3417

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 11:07:40 -04:00
Argenis 6a4ccaeb73 fix: strip stale tool_result from conversation history and memory context (#3418)
Prevent orphan `<tool_result>` blocks from leaking into LLM sessions:

- Strip `<tool_result>` blocks from cached prior turns in
  `process_channel_message` so the LLM never sees a tool result
  without a preceding tool call (Case A — in-memory accumulation).
- Skip memory entries containing `<tool_result` in both
  `should_skip_memory_context_entry` (channel path) and
  `build_context` (agent path) so SQLite-recalled tool output
  is never injected as memory context (Case B — post-restart).
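The merged code strips these blocks with a regex; the same idea can be sketched without dependencies using plain string search (helper name illustrative):

```rust
/// Remove `<tool_result ...>...</tool_result>` blocks from a cached turn
/// so the LLM never sees a tool result without a preceding tool call.
fn strip_tool_results(text: &str) -> String {
    let mut out = String::new();
    let mut rest = text;
    while let Some(start) = rest.find("<tool_result") {
        out.push_str(&rest[..start]);
        match rest[start..].find("</tool_result>") {
            Some(end) => rest = &rest[start + end + "</tool_result>".len()..],
            None => {
                // Unterminated block: drop the tail rather than leak it.
                rest = "";
                break;
            }
        }
    }
    out.push_str(rest);
    out.trim().to_string()
}

fn main() {
    assert_eq!(
        strip_tool_results("a<tool_result>stale output</tool_result>b"),
        "ab"
    );
}
```

The real fix additionally drops an entry entirely when only a `[Tool results]` header remains after stripping.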

Closes #3402
2026-03-13 09:55:57 -04:00
Argenis 4aead04916 fix: skip documentation URLs in cloudflare tunnel URL parser (#3416)
The URL parser captured the first https:// URL found in cloudflared
stderr output. When cloudflared emits a quic-go UDP buffer warning
containing a github.com link, that documentation URL was incorrectly
captured as the tunnel's public URL.

Extract URL parsing into a testable helper function that skips known
documentation domains (github.com, cloudflare.com/docs,
developers.cloudflare.com) and recognises tunnel-specific log prefixes
("Visit it at", "Route at", "Registered tunnel connection") and the
.trycloudflare.com domain.
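The domain filter described above can be sketched as a small predicate (function name illustrative; the domain list mirrors the commit message):

```rust
/// Accept only URLs that look like a tunnel endpoint; reject URLs on
/// known documentation domains that cloudflared may log in warnings.
fn is_tunnel_url(url: &str) -> bool {
    const DOC_DOMAINS: [&str; 3] = [
        "github.com",
        "cloudflare.com/docs",
        "developers.cloudflare.com",
    ];
    if DOC_DOMAINS.iter().any(|d| url.contains(d)) {
        return false;
    }
    url.contains(".trycloudflare.com")
}

fn main() {
    assert!(is_tunnel_url("https://abc-def.trycloudflare.com"));
    // A quic-go warning linking to GitHub must not be captured as the tunnel URL.
    assert!(!is_tunnel_url("https://github.com/quic-go/quic-go/wiki/UDP-Buffer-Sizes"));
}
```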

Closes #3413
2026-03-13 09:40:02 -04:00
Argenis 7b23c8934c docs: clarify master-only branch policy and clean up stale references (#3415)
Add a prominent migration notice to CONTRIBUTING.md with explicit
instructions for contributors who still have local or forked main
branches. Fix the last remaining main branch reference in
python/pyproject.toml. Stale merged branches and main-related remote
branches have been deleted.

Refs: #2929, #3061
2026-03-13 09:38:11 -04:00
Argenis 5d1543100d fix: support Linq 2026-02-03 webhook payload shape (#3337) (#3410)
Closes #3337

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:31:53 -04:00
Argenis e3a91bc805 fix: gate prometheus and fix AtomicU64 for 32-bit targets (#3409)
* fix: gate prometheus and fix AtomicU64 for 32-bit targets (#3335)

Closes #3335

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style: fix import ordering for cfg-gated atomics

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:31:25 -04:00
Argenis 833fdefbe5 fix: restore MCP support missing from master branch (#3412)
MCP (Model Context Protocol) config and tool modules were added on the
old `main` branch but never made it to `master`. This restores the full
MCP subsystem: config schema, transport layer (stdio/HTTP/SSE), client
registry, tool wrapper, config validation, and channel wiring.

Closes #3379

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:20:37 -04:00
Argenis 13f74f0ecc fix: restore web dashboard by auto-building frontend in build.rs (#3408)
The build.rs was reduced to only creating an empty web/dist/ directory,
which meant rust-embed would embed no files and the SPA fallback handler
would return 404 for every request including `/`. This is a regression
from v0.1.8 where web/dist/ was still tracked in git.

Update build.rs to detect when web/dist/index.html is missing and
automatically run `npm ci && npm run build` if npm is available. The
build is best-effort: when Node.js is absent the Rust build still
succeeds with an empty dist directory (release workflows pre-build the
frontend separately).

Closes #3386

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:20:34 -04:00
Argenis 9ff045d2e9 fix: resolve install.sh prebuilt download 404 by querying releases API (#3406)
The /releases/latest/download/ URL only resolves to the latest non-prerelease,
non-draft release. When that release has no binary assets (e.g. v0.1.9a),
--prebuilt-only fails with a 404. This adds resolve_asset_url() which queries
the GitHub releases API for the newest release (including prereleases) that
actually contains the requested asset, falling back to /releases/latest/ if
the API call fails.

Closes #3389

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:20:31 -04:00
Argenis 6fe8e3a5bb fix: gracefully handle reasoning_enabled for unsupported Ollama models (#3411)
When reasoning_enabled is configured, the Ollama provider sends
think=true to all models. Models that don't support the think parameter
(e.g. qwen3.5:0.8b) cause request failures that the reliable provider
classifies as retryable, leading to an infinite retry loop.

Fix: when a request with think=true fails, automatically retry once
with think omitted. This lets the call succeed on models that lack
reasoning support while preserving thinking for capable models.
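The retry-once shape can be sketched generically; here `send` stands in for the Ollama request (the real code inspects the provider error, not a simulated closure):

```rust
/// Send with `think = Some(true)`; on failure retry exactly once with
/// `think` omitted. A single bounded retry, so no infinite retry loop.
fn request_with_think_fallback<F>(mut send: F) -> Result<String, String>
where
    F: FnMut(Option<bool>) -> Result<String, String>,
{
    match send(Some(true)) {
        Ok(resp) => Ok(resp),
        Err(_) => send(None),
    }
}

fn main() {
    // Simulate a model that rejects the `think` parameter.
    let model = |think: Option<bool>| match think {
        Some(true) => Err("unsupported parameter: think".to_string()),
        _ => Ok("plain completion".to_string()),
    };
    assert_eq!(
        request_with_think_fallback(model),
        Ok("plain completion".to_string())
    );
}
```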

Closes #3183
Related #850

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:16:03 -04:00
Argenis 5dc1750df7 fix: add crypto.randomUUID fallback for older browsers (#3407)
Replace direct `crypto.randomUUID()` calls in the web dashboard with a
`generateUUID()` utility that falls back to a manual UUID v4 implementation
using `crypto.getRandomValues()` when `randomUUID` is unavailable (older
Safari, some Electron builds, Raspberry Pi browsers).
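The manual UUID v4 construction is language-agnostic: take 16 random bytes, force the version and variant bits, and hex-format with dashes. A Rust sketch of the same bit manipulation the dashboard's fallback performs on `crypto.getRandomValues()` output (deterministic input here for illustration):

```rust
/// Build a UUID v4 string from 16 bytes by setting the version (4) and
/// RFC 4122 variant (10xx) bits, then formatting as 8-4-4-4-12 hex.
fn uuid_v4_from_bytes(mut b: [u8; 16]) -> String {
    b[6] = (b[6] & 0x0f) | 0x40; // version 4
    b[8] = (b[8] & 0x3f) | 0x80; // RFC 4122 variant
    let hex: String = b.iter().map(|x| format!("{x:02x}")).collect();
    format!(
        "{}-{}-{}-{}-{}",
        &hex[0..8],
        &hex[8..12],
        &hex[12..16],
        &hex[16..20],
        &hex[20..32]
    )
}

fn main() {
    let id = uuid_v4_from_bytes([0u8; 16]);
    assert_eq!(id, "00000000-0000-4000-8000-000000000000");
    assert_eq!(id.len(), 36);
}
```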

Closes #3303
Closes #3261

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 09:16:00 -04:00
SimianAstronaut7 b40c9e77af Merge pull request #3365 from zeroclaw-labs/ci/fix-glibc-cache-mismatch
ci: pin release workflows to ubuntu-latest to fix glibc cache mismatch
2026-03-12 17:44:42 -04:00
simianastronaut 34cac3d9dd ci: pin release workflows to ubuntu-latest to fix glibc cache mismatch
CI workflows use ubuntu-latest (24.04, glibc 2.39) while release
workflows used ubuntu-22.04 (glibc 2.35). Swatinem/rust-cache keys
on runner.os ("Linux"), not the specific version, so cached build
scripts compiled on 24.04 would fail on 22.04 with `GLIBC_2.39 not
found` errors.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 17:37:31 -04:00
SimianAstronaut7 badf96dcab Merge pull request #3363 from zeroclaw-labs/ci/faster-apple-build
ci: use thin LTO profile for faster CI builds
2026-03-12 17:31:29 -04:00
simianastronaut c1e1228fb0 ci: use thin LTO profile for faster CI builds
The release profile uses fat LTO + codegen-units=1, which is
optimal for distribution binaries but unnecessarily slow for CI
validation builds. Add a dedicated `ci` profile with thin LTO and
codegen-units=16, and use it in both CI workflows.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 17:18:36 -04:00
35 changed files with 2704 additions and 107 deletions
+1 -1
@@ -96,7 +96,7 @@ jobs:
- name: Build release
shell: bash
run: cargo build --release --locked --target ${{ matrix.target }}
run: cargo build --profile ci --locked --target ${{ matrix.target }}
env:
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER: clang
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
+1 -1
@@ -124,7 +124,7 @@ jobs:
- name: Build release
shell: bash
run: cargo build --release --locked --target ${{ matrix.target }}
run: cargo build --profile ci --locked --target ${{ matrix.target }}
env:
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER: clang
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
+2 -2
@@ -65,11 +65,11 @@ jobs:
fail-fast: false
matrix:
include:
- os: ubuntu-22.04
- os: ubuntu-latest
target: x86_64-unknown-linux-gnu
artifact: zeroclaw
ext: tar.gz
- os: ubuntu-22.04
- os: ubuntu-latest
target: aarch64-unknown-linux-gnu
artifact: zeroclaw
ext: tar.gz
+2 -2
@@ -83,11 +83,11 @@ jobs:
fail-fast: false
matrix:
include:
- os: ubuntu-22.04
- os: ubuntu-latest
target: x86_64-unknown-linux-gnu
artifact: zeroclaw
ext: tar.gz
- os: ubuntu-22.04
- os: ubuntu-latest
target: aarch64-unknown-linux-gnu
artifact: zeroclaw
ext: tar.gz
+27 -7
@@ -2,20 +2,41 @@
Thanks for your interest in contributing to ZeroClaw! This guide will help you get started.
---
## ⚠️ Branch Migration Notice (March 2026)
**`master` is the ONLY default branch. The `main` branch no longer exists.**
If you have an existing fork or local clone that tracks `main`, you **must** update it:
```bash
# Update your local clone to track master
git checkout master
git branch -D main 2>/dev/null # delete local main if it exists
git remote set-head origin master
git fetch origin --prune # remove stale remote refs
# If your fork still has a main branch, delete it
git push origin --delete main 2>/dev/null
```
All PRs must target **`master`**. PRs targeting `main` will be rejected.
**Background:** ZeroClaw previously used `main` in some documentation and scripts, which caused 404 errors, broken CI refs, and contributor confusion (see [#2929](https://github.com/zeroclaw-labs/zeroclaw/issues/2929), [#3061](https://github.com/zeroclaw-labs/zeroclaw/issues/3061), [#3194](https://github.com/zeroclaw-labs/zeroclaw/pull/3194)). As of March 2026, all references have been corrected, stale branches cleaned up, and the `main` branch permanently deleted.
---
## Branching Model
> **Important — `master` is the default branch.**
>
> ZeroClaw uses **`master`** as its single source-of-truth branch. The `main` branch has been removed.
>
> Previously, some documentation and scripts referenced a `main` branch, which caused 404 errors and contributor confusion (see [#2929](https://github.com/zeroclaw-labs/zeroclaw/issues/2929), [#3061](https://github.com/zeroclaw-labs/zeroclaw/issues/3061), [#3194](https://github.com/zeroclaw-labs/zeroclaw/pull/3194)). As of March 2026, all references have been corrected and the `main` branch deleted.
> **`master`** is the single source-of-truth branch.
>
> **How contributors should work:**
> 1. Fork the repository
> 2. Create a `feat/*` or `fix/*` branch from `master`
> 3. Open a PR targeting `master`
>
> Do **not** create or push to a `main` branch.
> Do **not** create or push to a `main` branch. There is no `main` branch — it will not work.
## First-Time Contributors
@@ -559,4 +580,3 @@ Recommended scope keys in commit titles:
## License
By contributing, you agree that your contributions will be licensed under the MIT License.
# Contributing Guide Update
+9 -2
@@ -48,8 +48,8 @@ schemars = "1.2"
tracing = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi", "env-filter"] }
# Observability - Prometheus metrics
prometheus = { version = "0.14", default-features = false }
# Observability - Prometheus metrics (optional; requires AtomicU64, unavailable on 32-bit)
prometheus = { version = "0.14", default-features = false, optional = true }
# Base64 encoding (screenshots, image data)
base64 = "0.22"
@@ -205,6 +205,8 @@ sandbox-landlock = ["dep:landlock"]
sandbox-bubblewrap = []
# Backward-compatible alias for older invocations
landlock = ["sandbox-landlock"]
# Prometheus metrics observer (requires 64-bit atomics; disable on 32-bit targets)
metrics = ["dep:prometheus"]
# probe = probe-rs for Nucleo memory read (adds ~50 deps; optional)
probe = ["dep:probe-rs"]
# rag-pdf = PDF ingestion for datasheet RAG
@@ -225,6 +227,11 @@ inherits = "release"
codegen-units = 8 # Parallel codegen for faster builds on powerful machines (16GB+ RAM recommended)
# Use: cargo build --profile release-fast
[profile.ci]
inherits = "release"
lto = "thin" # Much faster than fat LTO; still catches release-mode issues
codegen-units = 16 # Full parallelism for CI runners
[profile.dist]
inherits = "release"
opt-level = "z"
+107 -3
@@ -1,6 +1,110 @@
use std::path::Path;
use std::process::Command;
fn main() {
let dir = std::path::Path::new("web/dist");
if !dir.exists() {
std::fs::create_dir_all(dir).expect("failed to create web/dist/");
let dist_dir = Path::new("web/dist");
let web_dir = Path::new("web");
// Tell Cargo to re-run this script when web source files change.
println!("cargo:rerun-if-changed=web/src");
println!("cargo:rerun-if-changed=web/index.html");
println!("cargo:rerun-if-changed=web/package.json");
println!("cargo:rerun-if-changed=web/vite.config.ts");
// Attempt to build the web frontend if npm is available and web/dist is
// missing or stale. The build is best-effort: when Node.js is not
// installed (e.g. CI containers, cross-compilation, minimal dev setups)
// we fall back to the existing stub/empty dist directory so the Rust
// build still succeeds.
let needs_build = !dist_dir.join("index.html").exists();
if needs_build && web_dir.join("package.json").exists() {
if let Ok(npm) = which_npm() {
eprintln!("cargo:warning=Building web frontend (web/dist is missing or stale)...");
// npm ci / npm install
let install_status = Command::new(&npm)
.args(["ci", "--ignore-scripts"])
.current_dir(web_dir)
.status();
match install_status {
Ok(s) if s.success() => {}
Ok(s) => {
// Fall back to `npm install` if `npm ci` fails (no lockfile, etc.)
eprintln!("cargo:warning=npm ci exited with {s}, trying npm install...");
let fallback = Command::new(&npm)
.args(["install"])
.current_dir(web_dir)
.status();
if !matches!(fallback, Ok(s) if s.success()) {
eprintln!("cargo:warning=npm install failed — skipping web build");
ensure_dist_dir(dist_dir);
return;
}
}
Err(e) => {
eprintln!("cargo:warning=Could not run npm: {e} — skipping web build");
ensure_dist_dir(dist_dir);
return;
}
}
// npm run build
let build_status = Command::new(&npm)
.args(["run", "build"])
.current_dir(web_dir)
.status();
match build_status {
Ok(s) if s.success() => {
eprintln!("cargo:warning=Web frontend built successfully.");
}
Ok(s) => {
eprintln!(
"cargo:warning=npm run build exited with {s} — web dashboard may be unavailable"
);
}
Err(e) => {
eprintln!(
"cargo:warning=Could not run npm build: {e} — web dashboard may be unavailable"
);
}
}
}
}
ensure_dist_dir(dist_dir);
}
/// Ensure the dist directory exists so `rust-embed` does not fail at compile
/// time even when the web frontend is not built.
fn ensure_dist_dir(dist_dir: &Path) {
if !dist_dir.exists() {
std::fs::create_dir_all(dist_dir).expect("failed to create web/dist/");
}
}
/// Locate the `npm` binary on the system PATH.
fn which_npm() -> Result<String, ()> {
let cmd = if cfg!(target_os = "windows") {
"where"
} else {
"which"
};
Command::new(cmd)
.arg("npm")
.output()
.ok()
.and_then(|output| {
if output.status.success() {
String::from_utf8(output.stdout)
.ok()
.map(|s| s.lines().next().unwrap_or("npm").trim().to_string())
} else {
None
}
})
.ok_or(())
}
+38 -3
@@ -211,8 +211,35 @@ should_attempt_prebuilt_for_resources() {
return 1
}
resolve_asset_url() {
local asset_name="$1"
local api_url="https://api.github.com/repos/zeroclaw-labs/zeroclaw/releases"
local releases_json download_url
# Fetch up to 10 recent releases (includes prereleases) and find the first
# one that contains the requested asset.
releases_json="$(curl -fsSL "${api_url}?per_page=10" 2>/dev/null || true)"
if [[ -z "$releases_json" ]]; then
return 1
fi
# Parse with simple grep/sed — avoids jq dependency.
download_url="$(printf '%s\n' "$releases_json" \
| tr ',' '\n' \
| grep '"browser_download_url"' \
| sed 's/.*"browser_download_url"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/' \
| grep "/${asset_name}\$" \
| head -n 1)"
if [[ -z "$download_url" ]]; then
return 1
fi
echo "$download_url"
}
install_prebuilt_binary() {
local target archive_url temp_dir archive_path extracted_bin install_dir
local target archive_url temp_dir archive_path extracted_bin install_dir asset_name
if ! have_cmd curl; then
warn "curl is required for pre-built binary installation."
@@ -229,9 +256,17 @@ install_prebuilt_binary() {
return 1
fi
archive_url="https://github.com/zeroclaw-labs/zeroclaw/releases/latest/download/zeroclaw-${target}.tar.gz"
asset_name="zeroclaw-${target}.tar.gz"
# Try the GitHub API first to find the newest release (including prereleases)
# that actually contains the asset, then fall back to /releases/latest/.
archive_url="$(resolve_asset_url "$asset_name" || true)"
if [[ -z "$archive_url" ]]; then
archive_url="https://github.com/zeroclaw-labs/zeroclaw/releases/latest/download/${asset_name}"
fi
temp_dir="$(mktemp -d -t zeroclaw-prebuilt-XXXXXX)"
archive_path="$temp_dir/zeroclaw-${target}.tar.gz"
archive_path="$temp_dir/${asset_name}"
info "Attempting pre-built binary install for target: $target"
if ! curl -fsSL "$archive_url" -o "$archive_path"; then
+1 -1
@@ -52,7 +52,7 @@ dev = [
[project.urls]
Homepage = "https://github.com/zeroclaw-labs/zeroclaw"
Documentation = "https://github.com/zeroclaw-labs/zeroclaw/tree/main/python"
Documentation = "https://github.com/zeroclaw-labs/zeroclaw/tree/master/python"
Repository = "https://github.com/zeroclaw-labs/zeroclaw"
Issues = "https://github.com/zeroclaw-labs/zeroclaw/issues"
+8
@@ -267,6 +267,12 @@ async fn build_context(mem: &dyn Memory, user_msg: &str, min_relevance_score: f6
if memory::is_assistant_autosave_key(&entry.key) {
continue;
}
// Skip entries containing tool_result blocks — they can leak
// stale tool output from previous heartbeat ticks into new
// sessions, presenting the LLM with orphan tool_result data.
if entry.content.contains("<tool_result") {
continue;
}
let _ = writeln!(context, "- {}: {}", entry.key, entry.content);
}
if context == "[Memory context]\n" {
@@ -2892,6 +2898,7 @@ pub async fn run(
secrets_encrypt: config.secrets.encrypt,
reasoning_enabled: config.runtime.reasoning_enabled,
provider_timeout_secs: Some(config.provider_timeout_secs),
extra_headers: config.extra_headers.clone(),
};
let provider: Box<dyn Provider> = providers::create_routed_provider_with_options(
@@ -3364,6 +3371,7 @@ pub async fn process_message(config: Config, message: &str) -> Result<String> {
secrets_encrypt: config.secrets.encrypt,
reasoning_enabled: config.runtime.reasoning_enabled,
provider_timeout_secs: Some(config.provider_timeout_secs),
extra_headers: config.extra_headers.clone(),
};
let provider: Box<dyn Provider> = providers::create_routed_provider_with_options(
provider_name,
+8 -1
@@ -1,6 +1,10 @@
use crate::channels::traits::{Channel, ChannelMessage, SendMessage};
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(not(target_has_atomic = "64"))]
use std::sync::atomic::AtomicU32;
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex};
@@ -13,7 +17,10 @@ use tokio_rustls::rustls;
const READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300);
/// Monotonic counter to ensure unique message IDs under burst traffic.
#[cfg(target_has_atomic = "64")]
static MSG_SEQ: AtomicU64 = AtomicU64::new(0);
#[cfg(not(target_has_atomic = "64"))]
static MSG_SEQ: AtomicU32 = AtomicU32::new(0);
/// IRC over TLS channel.
///
+289 -22
@@ -61,20 +61,33 @@ impl LinqChannel {
/// Parse an incoming webhook payload from Linq and extract messages.
///
/// Linq webhook envelope:
/// Supports two webhook formats:
///
/// **New format (webhook_version 2026-02-03):**
/// ```json
/// {
/// "api_version": "v3",
/// "webhook_version": "2026-02-03",
/// "event_type": "message.received",
/// "data": {
/// "id": "msg-...",
/// "direction": "inbound",
/// "sender_handle": { "handle": "+1...", "is_me": false },
/// "chat": { "id": "chat-..." },
/// "parts": [{ "type": "text", "value": "..." }]
/// }
/// }
/// ```
///
/// **Legacy format (webhook_version 2025-01-01):**
/// ```json
/// {
/// "api_version": "v3",
/// "event_type": "message.received",
/// "event_id": "...",
/// "created_at": "...",
/// "trace_id": "...",
/// "data": {
/// "chat_id": "...",
/// "from": "+1...",
/// "recipient_phone": "+1...",
/// "is_from_me": false,
/// "service": "iMessage",
/// "message": {
/// "id": "...",
/// "parts": [{ "type": "text", "value": "..." }]
@@ -99,18 +112,44 @@ impl LinqChannel {
return messages;
};
// Detect format: new format has `sender_handle`, legacy has `from`.
let is_new_format = data.get("sender_handle").is_some();
// Skip messages sent by the bot itself
if data
.get("is_from_me")
.and_then(|v| v.as_bool())
.unwrap_or(false)
{
let is_from_me = if is_new_format {
// New format: data.sender_handle.is_me or data.direction == "outbound"
data.get("sender_handle")
.and_then(|sh| sh.get("is_me"))
.and_then(|v| v.as_bool())
.unwrap_or(false)
|| data
.get("direction")
.and_then(|d| d.as_str())
.is_some_and(|d| d == "outbound")
} else {
// Legacy format: data.is_from_me
data.get("is_from_me")
.and_then(|v| v.as_bool())
.unwrap_or(false)
};
if is_from_me {
tracing::debug!("Linq: skipping is_from_me message");
return messages;
}
// Get sender phone number
let Some(from) = data.get("from").and_then(|f| f.as_str()) else {
let from = if is_new_format {
// New format: data.sender_handle.handle
data.get("sender_handle")
.and_then(|sh| sh.get("handle"))
.and_then(|h| h.as_str())
} else {
// Legacy format: data.from
data.get("from").and_then(|f| f.as_str())
};
let Some(from) = from else {
return messages;
};
@@ -132,18 +171,33 @@ impl LinqChannel {
}
// Get chat_id for reply routing
let chat_id = data
.get("chat_id")
.and_then(|c| c.as_str())
.unwrap_or("")
.to_string();
// Extract text from message parts
let Some(message) = data.get("message") else {
return messages;
let chat_id = if is_new_format {
// New format: data.chat.id
data.get("chat")
.and_then(|c| c.get("id"))
.and_then(|id| id.as_str())
.unwrap_or("")
.to_string()
} else {
// Legacy format: data.chat_id
data.get("chat_id")
.and_then(|c| c.as_str())
.unwrap_or("")
.to_string()
};
let Some(parts) = message.get("parts").and_then(|p| p.as_array()) else {
// Extract message parts
let parts = if is_new_format {
// New format: data.parts (directly on data)
data.get("parts").and_then(|p| p.as_array())
} else {
// Legacy format: data.message.parts
data.get("message")
.and_then(|m| m.get("parts"))
.and_then(|p| p.as_array())
};
let Some(parts) = parts else {
return messages;
};
@@ -790,4 +844,217 @@ mod tests {
let ch = make_channel();
assert_eq!(ch.phone_number(), "+15551234567");
}
// ---- New format (2026-02-03) tests ----
#[test]
fn linq_parse_new_format_text_message() {
let ch = make_channel();
let payload = serde_json::json!({
"api_version": "v3",
"webhook_version": "2026-02-03",
"event_type": "message.received",
"event_id": "evt-123",
"created_at": "2026-03-01T12:00:00Z",
"trace_id": "trace-456",
"data": {
"id": "msg-abc",
"direction": "inbound",
"sender_handle": {
"handle": "+1234567890",
"is_me": false
},
"chat": { "id": "chat-789" },
"service": "iMessage",
"parts": [{
"type": "text",
"value": "Hello from new format!"
}]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].sender, "+1234567890");
assert_eq!(msgs[0].content, "Hello from new format!");
assert_eq!(msgs[0].channel, "linq");
assert_eq!(msgs[0].reply_target, "chat-789");
}
#[test]
fn linq_parse_new_format_skip_is_me() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "outbound",
"sender_handle": {
"handle": "+15551234567",
"is_me": true
},
"chat": { "id": "chat-789" },
"parts": [{ "type": "text", "value": "My own message" }]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(
msgs.is_empty(),
"is_me messages should be skipped in new format"
);
}
#[test]
fn linq_parse_new_format_skip_outbound_direction() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "outbound",
"sender_handle": {
"handle": "+15551234567",
"is_me": false
},
"chat": { "id": "chat-789" },
"parts": [{ "type": "text", "value": "Outbound" }]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(msgs.is_empty(), "outbound direction should be skipped");
}
#[test]
fn linq_parse_new_format_unauthorized_sender() {
let ch = make_channel();
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "inbound",
"sender_handle": {
"handle": "+9999999999",
"is_me": false
},
"chat": { "id": "chat-789" },
"parts": [{ "type": "text", "value": "Spam" }]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert!(
msgs.is_empty(),
"Unauthorized senders should be filtered in new format"
);
}
#[test]
fn linq_parse_new_format_media_image() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "inbound",
"sender_handle": {
"handle": "+1234567890",
"is_me": false
},
"chat": { "id": "chat-789" },
"parts": [{
"type": "media",
"url": "https://example.com/photo.png",
"mime_type": "image/png"
}]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].content, "[IMAGE:https://example.com/photo.png]");
}
#[test]
fn linq_parse_new_format_multiple_parts() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "inbound",
"sender_handle": {
"handle": "+1234567890",
"is_me": false
},
"chat": { "id": "chat-789" },
"parts": [
{ "type": "text", "value": "Check this out" },
{ "type": "media", "url": "https://example.com/img.jpg", "mime_type": "image/jpeg" }
]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(
msgs[0].content,
"Check this out\n[IMAGE:https://example.com/img.jpg]"
);
}
#[test]
fn linq_parse_new_format_fallback_reply_target_when_no_chat() {
let ch = LinqChannel::new("tok".into(), "+15551234567".into(), vec!["*".into()]);
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "inbound",
"sender_handle": {
"handle": "+1234567890",
"is_me": false
},
"parts": [{ "type": "text", "value": "Hi" }]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].reply_target, "+1234567890");
}
#[test]
fn linq_parse_new_format_normalizes_phone() {
let ch = LinqChannel::new(
"tok".into(),
"+15551234567".into(),
vec!["+1234567890".into()],
);
let payload = serde_json::json!({
"event_type": "message.received",
"webhook_version": "2026-02-03",
"data": {
"id": "msg-abc",
"direction": "inbound",
"sender_handle": {
"handle": "1234567890",
"is_me": false
},
"chat": { "id": "chat-789" },
"parts": [{ "type": "text", "value": "Hi" }]
}
});
let msgs = ch.parse_webhook_payload(&payload);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].sender, "+1234567890");
}
}
+5 -2
@@ -765,8 +765,11 @@ impl Channel for MatrixChannel {
// Download media to workspace if present
let body = if let Some((url, filename)) = media_download {
let workspace = std::path::PathBuf::from(
std::env::var("ZEROCLAW_WORKSPACE")
.unwrap_or_else(|_| "/tmp/zeroclaw-uploads".to_string()),
shellexpand::tilde(
&std::env::var("ZEROCLAW_WORKSPACE")
.unwrap_or_else(|_| "/tmp/zeroclaw-uploads".to_string()),
)
.as_ref(),
);
let _ = tokio::fs::create_dir_all(&workspace).await;
let dest = workspace.join(&filename);
+116 -4
@@ -89,7 +89,11 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[cfg(not(target_has_atomic = "64"))]
use std::sync::atomic::AtomicU32;
#[cfg(target_has_atomic = "64")]
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use tokio_util::sync::CancellationToken;
@@ -571,6 +575,25 @@ fn normalize_cached_channel_turns(turns: Vec<ChatMessage>) -> Vec<ChatMessage> {
normalized
}
/// Remove `<tool_result …>…</tool_result>` blocks (and a leading `[Tool results]`
/// header, if present) from a conversation-history entry so that stale tool
/// output is never presented to the LLM without the corresponding `<tool_call>`.
fn strip_tool_result_content(text: &str) -> String {
static TOOL_RESULT_RE: std::sync::LazyLock<regex::Regex> = std::sync::LazyLock::new(|| {
regex::Regex::new(r"(?s)<tool_result[^>]*>.*?</tool_result>").unwrap()
});
let cleaned = TOOL_RESULT_RE.replace_all(text, "");
let cleaned = cleaned.trim();
// If the only remaining content is the header, drop it entirely.
if cleaned == "[Tool results]" || cleaned.is_empty() {
return String::new();
}
cleaned.to_string()
}
fn supports_runtime_model_switch(channel_name: &str) -> bool {
matches!(channel_name, "telegram" | "discord" | "matrix")
}
@@ -952,6 +975,14 @@ fn should_skip_memory_context_entry(key: &str, content: &str) -> bool {
return true;
}
// Skip entries containing tool_result blocks. After a daemon restart
// these can be recalled from SQLite and injected as memory context,
// presenting the LLM with a `<tool_result>` without a preceding
// `<tool_call>` and triggering hallucinated output.
if content.contains("<tool_result") {
return true;
}
content.chars().count() > MEMORY_CONTEXT_MAX_CHARS
}
@@ -1758,6 +1789,15 @@ async fn process_channel_message(
.unwrap_or_default();
let mut prior_turns = normalize_cached_channel_turns(prior_turns_raw);
// Strip stale tool_result blocks from cached turns so the LLM never
// sees a `<tool_result>` without a preceding `<tool_call>`, which
// causes hallucinated output on subsequent heartbeat ticks or sessions.
for turn in &mut prior_turns {
if turn.content.contains("<tool_result") {
turn.content = strip_tool_result_content(&turn.content);
}
}
// Only enrich with memory context when there is no prior conversation
// history. Follow-up turns already include context from previous messages.
if !had_prior_history {
@@ -2305,7 +2345,10 @@ async fn run_message_dispatch_loop(
String,
InFlightSenderTaskState,
>::new()));
#[cfg(target_has_atomic = "64")]
let task_sequence = Arc::new(AtomicU64::new(1));
#[cfg(not(target_has_atomic = "64"))]
let task_sequence = Arc::new(AtomicU32::new(1));
while let Some(msg) = rx.recv().await {
let permit = match Arc::clone(&semaphore).acquire_owned().await {
@@ -2323,7 +2366,7 @@ async fn run_message_dispatch_loop(
let sender_scope_key = interruption_scope_key(&msg);
let cancellation_token = CancellationToken::new();
let completion = Arc::new(InFlightTaskCompletion::new());
let task_id = task_sequence.fetch_add(1, Ordering::Relaxed) as u64;
if interrupt_enabled {
let previous = {
@@ -3305,6 +3348,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
secrets_encrypt: config.secrets.encrypt,
reasoning_enabled: config.runtime.reasoning_enabled,
provider_timeout_secs: Some(config.provider_timeout_secs),
extra_headers: config.extra_headers.clone(),
};
let provider: Arc<dyn Provider> = Arc::from(
create_resilient_provider_nonblocking(
@@ -3364,7 +3408,7 @@ pub async fn start_channels(config: Config) -> Result<()> {
};
// Build system prompt from workspace identity files + skills
let workspace = config.workspace_dir.clone();
let mut built_tools = tools::all_tools_with_runtime(
Arc::new(config.clone()),
&security,
runtime,
@@ -3378,7 +3422,44 @@ pub async fn start_channels(config: Config) -> Result<()> {
&config.agents,
config.api_key.as_deref(),
&config,
);
// Wire MCP tools into the registry before freezing — non-fatal.
if config.mcp.enabled && !config.mcp.servers.is_empty() {
tracing::info!(
"Initializing MCP client — {} server(s) configured",
config.mcp.servers.len()
);
match crate::tools::mcp_client::McpRegistry::connect_all(&config.mcp.servers).await {
Ok(registry) => {
let registry = std::sync::Arc::new(registry);
let names = registry.tool_names();
let mut registered = 0usize;
for name in names {
if let Some(def) = registry.get_tool_def(&name).await {
let wrapper = crate::tools::mcp_tool::McpToolWrapper::new(
name,
def,
std::sync::Arc::clone(&registry),
);
built_tools.push(Box::new(wrapper));
registered += 1;
}
}
tracing::info!(
"MCP: {} tool(s) registered from {} server(s)",
registered,
registry.server_count()
);
}
Err(e) => {
// Non-fatal — daemon continues with the tools registered above.
tracing::error!("MCP registry failed to initialize: {e:#}");
}
}
}
let tools_registry = Arc::new(built_tools);
let skills = crate::skills::load_skills_with_config(&workspace, &config);
@@ -3719,6 +3800,37 @@ mod tests {
"telegram_user_msg_101",
"Please describe the image"
));
// Entries containing tool_result blocks must be skipped (#3402).
assert!(should_skip_memory_context_entry(
"telegram_user_msg_200",
r#"[Tool results]
<tool_result name="shell">Mon Feb 20</tool_result>"#
));
assert!(!should_skip_memory_context_entry(
"telegram_user_msg_201",
"plain text without tool results"
));
}
#[test]
fn strip_tool_result_content_removes_blocks_and_header() {
let input = r#"[Tool results]
<tool_result name="shell">Mon Feb 20</tool_result>
<tool_result name="http_request">{"status":200}</tool_result>"#;
assert_eq!(strip_tool_result_content(input), "");
let mixed = "Some context\n<tool_result name=\"shell\">ok</tool_result>\nMore text";
let cleaned = strip_tool_result_content(mixed);
assert!(cleaned.contains("Some context"));
assert!(cleaned.contains("More text"));
assert!(!cleaned.contains("tool_result"));
assert_eq!(
strip_tool_result_content("no tool results here"),
"no tool results here"
);
assert_eq!(strip_tool_result_content(""), "");
}
#[test]
@@ -10,12 +10,13 @@ pub use schema::{
CronConfig, DelegateAgentConfig, DiscordConfig, DockerRuntimeConfig, EdgeTtsConfig,
ElevenLabsTtsConfig, EmbeddingRouteConfig, EstopConfig, FeishuConfig, GatewayConfig,
GoogleTtsConfig, HardwareConfig, HardwareTransport, HeartbeatConfig, HooksConfig,
HttpRequestConfig, IMessageConfig, IdentityConfig, LarkConfig, MatrixConfig, McpConfig,
McpServerConfig, McpTransport, MemoryConfig, ModelRouteConfig, MultimodalConfig,
NextcloudTalkConfig, ObservabilityConfig, OpenAiTtsConfig, OtpConfig, OtpMethod,
PeripheralBoardConfig, PeripheralsConfig, ProxyConfig, ProxyScope, QdrantConfig,
QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig,
SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SkillsConfig,
SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig,
StorageProviderSection, StreamMode, TelegramConfig, TranscriptionConfig, TtsConfig,
TunnelConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
};
@@ -97,6 +97,17 @@ pub struct Config {
#[serde(default = "default_provider_timeout_secs")]
pub provider_timeout_secs: u64,
/// Extra HTTP headers to include in LLM provider API requests.
///
/// Some providers require specific headers (e.g., `User-Agent`, `HTTP-Referer`,
/// `X-Title`) for request routing or policy enforcement. Headers defined here
/// augment (and override) the program's default headers.
///
/// Can also be set via `ZEROCLAW_EXTRA_HEADERS` environment variable using
/// the format `Key:Value,Key2:Value2`. Env var headers override config file headers.
#[serde(default)]
pub extra_headers: HashMap<String, String>,
/// Observability backend configuration (`[observability]`).
#[serde(default)]
pub observability: ObservabilityConfig,
@@ -232,6 +243,10 @@ pub struct Config {
/// Text-to-Speech configuration (`[tts]`).
#[serde(default)]
pub tts: TtsConfig,
/// External MCP server connections (`[mcp]`).
#[serde(default, alias = "mcpServers")]
pub mcp: McpConfig,
}
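For illustration, a config-file sketch of the new field (header names and values here are made up); at runtime, `ZEROCLAW_EXTRA_HEADERS="X-Title:Other"` would override `X-Title` while keeping `User-Agent` from the file:

```toml
[extra_headers]
User-Agent = "MyApp/1.0"
X-Title = "My App"
```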
/// Named provider profile definition compatible with Codex app-server style config.
@@ -455,6 +470,60 @@ impl Default for TranscriptionConfig {
}
}
// ── MCP ─────────────────────────────────────────────────────────
/// Transport type for MCP server connections.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum McpTransport {
/// Spawn a local process and communicate over stdin/stdout.
#[default]
Stdio,
/// Connect via HTTP POST.
Http,
/// Connect via HTTP + Server-Sent Events.
Sse,
}
/// Configuration for a single external MCP server.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
pub struct McpServerConfig {
/// Display name used as a tool prefix (`<server>__<tool>`).
pub name: String,
/// Transport type (default: stdio).
#[serde(default)]
pub transport: McpTransport,
/// URL for HTTP/SSE transports.
#[serde(default)]
pub url: Option<String>,
/// Executable to spawn for stdio transport.
#[serde(default)]
pub command: String,
/// Command arguments for stdio transport.
#[serde(default)]
pub args: Vec<String>,
/// Optional environment variables for stdio transport.
#[serde(default)]
pub env: HashMap<String, String>,
/// Optional HTTP headers for HTTP/SSE transports.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Optional per-call timeout in seconds (hard capped in validation).
#[serde(default)]
pub tool_timeout_secs: Option<u64>,
}
/// External MCP client configuration (`[mcp]` section).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
pub struct McpConfig {
/// Enable MCP tool loading.
#[serde(default)]
pub enabled: bool,
/// Configured MCP servers.
#[serde(default, alias = "mcpServers")]
pub servers: Vec<McpServerConfig>,
}
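A hypothetical `[mcp]` section exercising these fields (server names, commands, and URLs are illustrative, not real servers):

```toml
[mcp]
enabled = true

[[mcp.servers]]
name = "files"
transport = "stdio"
command = "mcp-server-filesystem"
args = ["--root", "/tmp"]

[[mcp.servers]]
name = "search"
transport = "sse"
url = "https://mcp.example.com/sse"
tool_timeout_secs = 60
```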
// ── TTS (Text-to-Speech) ─────────────────────────────────────────
fn default_tts_provider() -> String {
@@ -1634,6 +1703,65 @@ fn service_selector_matches(selector: &str, service_key: &str) -> bool {
false
}
const MCP_MAX_TOOL_TIMEOUT_SECS: u64 = 600;
fn validate_mcp_config(config: &McpConfig) -> Result<()> {
let mut seen_names = std::collections::HashSet::new();
for (i, server) in config.servers.iter().enumerate() {
let name = server.name.trim();
if name.is_empty() {
anyhow::bail!("mcp.servers[{i}].name must not be empty");
}
if !seen_names.insert(name.to_ascii_lowercase()) {
anyhow::bail!("mcp.servers contains duplicate name: {name}");
}
if let Some(timeout) = server.tool_timeout_secs {
if timeout == 0 {
anyhow::bail!("mcp.servers[{i}].tool_timeout_secs must be greater than 0");
}
if timeout > MCP_MAX_TOOL_TIMEOUT_SECS {
anyhow::bail!(
"mcp.servers[{i}].tool_timeout_secs exceeds max {MCP_MAX_TOOL_TIMEOUT_SECS}"
);
}
}
match server.transport {
McpTransport::Stdio => {
if server.command.trim().is_empty() {
anyhow::bail!(
"mcp.servers[{i}] with transport=stdio requires non-empty command"
);
}
}
McpTransport::Http | McpTransport::Sse => {
let url = server
.url
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
anyhow::anyhow!(
"mcp.servers[{i}] with transport={} requires url",
match server.transport {
McpTransport::Http => "http",
McpTransport::Sse => "sse",
McpTransport::Stdio => "stdio",
}
)
})?;
let parsed = reqwest::Url::parse(url)
.with_context(|| format!("mcp.servers[{i}].url is not a valid URL"))?;
if !matches!(parsed.scheme(), "http" | "https") {
anyhow::bail!("mcp.servers[{i}].url must use http/https");
}
}
}
}
Ok(())
}
fn validate_proxy_url(field: &str, url: &str) -> Result<()> {
let parsed = reqwest::Url::parse(url)
.with_context(|| format!("Invalid {field} URL: '{url}' is not a valid URL"))?;
@@ -3851,6 +3979,7 @@ impl Default for Config {
model_providers: HashMap::new(),
default_temperature: default_temperature(),
provider_timeout_secs: default_provider_timeout_secs(),
extra_headers: HashMap::new(),
observability: ObservabilityConfig::default(),
autonomy: AutonomyConfig::default(),
security: SecurityConfig::default(),
@@ -3885,6 +4014,7 @@ impl Default for Config {
query_classification: QueryClassificationConfig::default(),
transcription: TranscriptionConfig::default(),
tts: TtsConfig::default(),
mcp: McpConfig::default(),
}
}
}
@@ -3960,7 +4090,8 @@ async fn load_persisted_workspace_dirs(
return Ok(None);
}
let expanded_dir = shellexpand::tilde(raw_config_dir);
let parsed_dir = PathBuf::from(expanded_dir.as_ref());
let config_dir = if parsed_dir.is_absolute() {
parsed_dir
} else {
@@ -4103,7 +4234,7 @@ async fn resolve_runtime_config_dirs(
if let Ok(custom_config_dir) = std::env::var("ZEROCLAW_CONFIG_DIR") {
let custom_config_dir = custom_config_dir.trim();
if !custom_config_dir.is_empty() {
let zeroclaw_dir = PathBuf::from(shellexpand::tilde(custom_config_dir).as_ref());
return Ok((
zeroclaw_dir.clone(),
zeroclaw_dir.join("workspace"),
@@ -4114,8 +4245,9 @@ async fn resolve_runtime_config_dirs(
if let Ok(custom_workspace) = std::env::var("ZEROCLAW_WORKSPACE") {
if !custom_workspace.is_empty() {
let expanded = shellexpand::tilde(&custom_workspace);
let (zeroclaw_dir, workspace_dir) =
resolve_config_dir_for_workspace(&PathBuf::from(expanded.as_ref()));
return Ok((
zeroclaw_dir,
workspace_dir,
@@ -4237,6 +4369,34 @@ fn has_ollama_cloud_credential(config_api_key: Option<&str>) -> bool {
})
}
/// Parse the `ZEROCLAW_EXTRA_HEADERS` environment variable value.
///
/// Format: `Key:Value,Key2:Value2`
///
/// Entries without a colon or with an empty key are skipped with a warning.
/// Leading/trailing whitespace on both key and value is trimmed.
pub fn parse_extra_headers_env(raw: &str) -> Vec<(String, String)> {
let mut result = Vec::new();
for entry in raw.split(',') {
let entry = entry.trim();
if entry.is_empty() {
continue;
}
if let Some((key, value)) = entry.split_once(':') {
let key = key.trim();
let value = value.trim();
if key.is_empty() {
tracing::warn!("Ignoring extra header with empty name in ZEROCLAW_EXTRA_HEADERS");
continue;
}
result.push((key.to_string(), value.to_string()));
} else {
tracing::warn!("Ignoring malformed extra header entry (missing ':'): {entry}");
}
}
result
}
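As a standalone illustration of the parsing rules above (split entries on `,`, split each entry on the first `:`, trim whitespace, skip empty keys and colon-less entries), here is a simplified stand-in without the tracing warnings; `parse_headers` is a hypothetical name, not the function above:

```rust
fn parse_headers(raw: &str) -> Vec<(String, String)> {
    raw.split(',')
        .filter_map(|entry| {
            let entry = entry.trim();
            // `split_once` splits on the FIRST ':' only, so URL values survive.
            let (key, value) = entry.split_once(':')?;
            let key = key.trim();
            if key.is_empty() {
                return None; // empty header name, skip
            }
            Some((key.to_string(), value.trim().to_string()))
        })
        .collect()
}

fn main() {
    let parsed = parse_headers("X-Title: zeroclaw , HTTP-Referer:https://example.com, bad-entry");
    assert_eq!(parsed.len(), 2); // "bad-entry" has no ':' and is dropped
    assert_eq!(parsed[1], ("HTTP-Referer".into(), "https://example.com".into()));
    println!("{parsed:?}");
}
```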
fn normalize_wire_api(raw: &str) -> Option<&'static str> {
match raw.trim().to_ascii_lowercase().as_str() {
"responses" | "openai-responses" | "open-ai-responses" => Some("responses"),
@@ -4844,6 +5004,11 @@ impl Config {
}
}
// MCP
if self.mcp.enabled {
validate_mcp_config(&self.mcp)?;
}
// Proxy (delegate to existing validation)
self.proxy.validate()?;
@@ -4916,14 +5081,24 @@ impl Config {
}
}
// Extra provider headers: ZEROCLAW_EXTRA_HEADERS
// Format: "Key:Value,Key2:Value2"
// Env var headers override config file headers with the same name.
if let Ok(raw) = std::env::var("ZEROCLAW_EXTRA_HEADERS") {
for header in parse_extra_headers_env(&raw) {
self.extra_headers.insert(header.0, header.1);
}
}
// Apply named provider profile remapping (Codex app-server compatibility).
self.apply_named_model_provider_profile();
// Workspace directory: ZEROCLAW_WORKSPACE
if let Ok(workspace) = std::env::var("ZEROCLAW_WORKSPACE") {
if !workspace.is_empty() {
let expanded = shellexpand::tilde(&workspace);
let (_, workspace_dir) =
resolve_config_dir_for_workspace(&PathBuf::from(expanded.as_ref()));
self.workspace_dir = workspace_dir;
}
}
@@ -5759,6 +5934,7 @@ default_temperature = 0.7
model_providers: HashMap::new(),
default_temperature: 0.5,
provider_timeout_secs: 120,
extra_headers: HashMap::new(),
observability: ObservabilityConfig {
backend: "log".into(),
..ObservabilityConfig::default()
@@ -5850,6 +6026,7 @@ default_temperature = 0.7
hardware: HardwareConfig::default(),
transcription: TranscriptionConfig::default(),
tts: TtsConfig::default(),
mcp: McpConfig::default(),
};
let toml_str = toml::to_string_pretty(&config).unwrap();
@@ -5913,6 +6090,97 @@ provider_timeout_secs = 300
assert_eq!(parsed.provider_timeout_secs, 300);
}
#[test]
fn parse_extra_headers_env_basic() {
let headers = parse_extra_headers_env("User-Agent:MyApp/1.0,X-Title:zeroclaw");
assert_eq!(headers.len(), 2);
assert_eq!(
headers[0],
("User-Agent".to_string(), "MyApp/1.0".to_string())
);
assert_eq!(headers[1], ("X-Title".to_string(), "zeroclaw".to_string()));
}
#[test]
fn parse_extra_headers_env_with_url_value() {
let headers =
parse_extra_headers_env("HTTP-Referer:https://github.com/zeroclaw-labs/zeroclaw");
assert_eq!(headers.len(), 1);
// Only splits on first colon, preserving URL colons in value
assert_eq!(headers[0].0, "HTTP-Referer");
assert_eq!(headers[0].1, "https://github.com/zeroclaw-labs/zeroclaw");
}
#[test]
fn parse_extra_headers_env_empty_string() {
let headers = parse_extra_headers_env("");
assert!(headers.is_empty());
}
#[test]
fn parse_extra_headers_env_whitespace_trimming() {
let headers = parse_extra_headers_env(" X-Title : zeroclaw , User-Agent : cli/1.0 ");
assert_eq!(headers.len(), 2);
assert_eq!(headers[0], ("X-Title".to_string(), "zeroclaw".to_string()));
assert_eq!(
headers[1],
("User-Agent".to_string(), "cli/1.0".to_string())
);
}
#[test]
fn parse_extra_headers_env_skips_malformed() {
let headers = parse_extra_headers_env("X-Valid:value,no-colon-here,Another:ok");
assert_eq!(headers.len(), 2);
assert_eq!(headers[0], ("X-Valid".to_string(), "value".to_string()));
assert_eq!(headers[1], ("Another".to_string(), "ok".to_string()));
}
#[test]
fn parse_extra_headers_env_skips_empty_key() {
let headers = parse_extra_headers_env(":value,X-Valid:ok");
assert_eq!(headers.len(), 1);
assert_eq!(headers[0], ("X-Valid".to_string(), "ok".to_string()));
}
#[test]
fn parse_extra_headers_env_allows_empty_value() {
let headers = parse_extra_headers_env("X-Empty:");
assert_eq!(headers.len(), 1);
assert_eq!(headers[0], ("X-Empty".to_string(), String::new()));
}
#[test]
fn parse_extra_headers_env_trailing_comma() {
let headers = parse_extra_headers_env("X-Title:zeroclaw,");
assert_eq!(headers.len(), 1);
assert_eq!(headers[0], ("X-Title".to_string(), "zeroclaw".to_string()));
}
#[test]
fn extra_headers_parses_from_toml() {
let raw = r#"
default_temperature = 0.7
[extra_headers]
User-Agent = "MyApp/1.0"
X-Title = "zeroclaw"
"#;
let parsed: Config = toml::from_str(raw).unwrap();
assert_eq!(parsed.extra_headers.len(), 2);
assert_eq!(parsed.extra_headers.get("User-Agent").unwrap(), "MyApp/1.0");
assert_eq!(parsed.extra_headers.get("X-Title").unwrap(), "zeroclaw");
}
#[test]
fn extra_headers_defaults_to_empty() {
let raw = r#"
default_temperature = 0.7
"#;
let parsed: Config = toml::from_str(raw).unwrap();
assert!(parsed.extra_headers.is_empty());
}
#[test]
fn storage_provider_dburl_alias_deserializes() {
let raw = r#"
@@ -6012,6 +6280,7 @@ tool_dispatcher = "xml"
model_providers: HashMap::new(),
default_temperature: 0.9,
provider_timeout_secs: 120,
extra_headers: HashMap::new(),
observability: ObservabilityConfig::default(),
autonomy: AutonomyConfig::default(),
security: SecurityConfig::default(),
@@ -6046,6 +6315,7 @@ tool_dispatcher = "xml"
hardware: HardwareConfig::default(),
transcription: TranscriptionConfig::default(),
tts: TtsConfig::default(),
mcp: McpConfig::default(),
};
config.save().await.unwrap();
@@ -352,6 +352,7 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
secrets_encrypt: config.secrets.encrypt,
reasoning_enabled: config.runtime.reasoning_enabled,
provider_timeout_secs: Some(config.provider_timeout_secs),
extra_headers: config.extra_headers.clone(),
},
)?);
let model = config
@@ -748,15 +749,25 @@ const PROMETHEUS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8"
/// GET /metrics — Prometheus text exposition format
async fn handle_metrics(State(state): State<AppState>) -> impl IntoResponse {
let body = {
#[cfg(feature = "metrics")]
{
if let Some(prom) = state
.observer
.as_ref()
.as_any()
.downcast_ref::<crate::observability::PrometheusObserver>()
{
prom.encode()
} else {
String::from("# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n")
}
}
#[cfg(not(feature = "metrics"))]
{
let _ = &state;
String::from("# Prometheus backend not enabled. Set [observability] backend = \"prometheus\" in config.\n")
}
};
(
@@ -1739,6 +1750,7 @@ mod tests {
assert!(text.contains("Prometheus backend not enabled"));
}
#[cfg(feature = "metrics")]
#[tokio::test]
async fn metrics_endpoint_renders_prometheus_output() {
let prom = Arc::new(crate::observability::PrometheusObserver::new());
@@ -3,6 +3,7 @@ pub mod multi;
pub mod noop;
#[cfg(feature = "observability-otel")]
pub mod otel;
#[cfg(feature = "metrics")]
pub mod prometheus;
pub mod runtime_trace;
pub mod traits;
@@ -15,6 +16,7 @@ pub use self::multi::MultiObserver;
pub use noop::NoopObserver;
#[cfg(feature = "observability-otel")]
pub use otel::OtelObserver;
#[cfg(feature = "metrics")]
pub use prometheus::PrometheusObserver;
pub use traits::{Observer, ObserverEvent};
#[allow(unused_imports)]
@@ -26,7 +28,19 @@ use crate::config::ObservabilityConfig;
pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
match config.backend.as_str() {
"log" => Box::new(LogObserver::new()),
"prometheus" => Box::new(PrometheusObserver::new()),
"prometheus" => {
#[cfg(feature = "metrics")]
{
Box::new(PrometheusObserver::new())
}
#[cfg(not(feature = "metrics"))]
{
tracing::warn!(
"Prometheus backend requested but this build was compiled without `metrics`; falling back to noop."
);
Box::new(NoopObserver)
}
}
"otel" | "opentelemetry" | "otlp" => {
#[cfg(feature = "observability-otel")]
match OtelObserver::new(
@@ -104,7 +118,12 @@ mod tests {
backend: "prometheus".into(),
..ObservabilityConfig::default()
};
let expected = if cfg!(feature = "metrics") {
"prometheus"
} else {
"noop"
};
assert_eq!(create_observer(&cfg).name(), expected);
}
#[test]
@@ -139,6 +139,7 @@ pub async fn run_wizard(force: bool) -> Result<Config> {
model_providers: std::collections::HashMap::new(),
default_temperature: 0.7,
provider_timeout_secs: 120,
extra_headers: std::collections::HashMap::new(),
observability: ObservabilityConfig::default(),
autonomy: AutonomyConfig::default(),
security: crate::config::SecurityConfig::default(),
@@ -173,6 +174,7 @@ pub async fn run_wizard(force: bool) -> Result<Config> {
query_classification: crate::config::QueryClassificationConfig::default(),
transcription: crate::config::TranscriptionConfig::default(),
tts: crate::config::TtsConfig::default(),
mcp: crate::config::McpConfig::default(),
};
println!(
@@ -423,7 +425,7 @@ fn resolve_quick_setup_dirs_with_home(home: &Path) -> (PathBuf, PathBuf) {
if let Ok(custom_config_dir) = std::env::var("ZEROCLAW_CONFIG_DIR") {
let trimmed = custom_config_dir.trim();
if !trimmed.is_empty() {
let config_dir = PathBuf::from(shellexpand::tilde(trimmed).as_ref());
return (config_dir.clone(), config_dir.join("workspace"));
}
}
@@ -431,8 +433,9 @@ fn resolve_quick_setup_dirs_with_home(home: &Path) -> (PathBuf, PathBuf) {
if let Ok(custom_workspace) = std::env::var("ZEROCLAW_WORKSPACE") {
let trimmed = custom_workspace.trim();
if !trimmed.is_empty() {
let expanded = shellexpand::tilde(trimmed);
return crate::config::schema::resolve_config_dir_for_workspace(&PathBuf::from(
expanded.as_ref(),
));
}
}
@@ -492,6 +495,7 @@ async fn run_quick_setup_with_home(
model_providers: std::collections::HashMap::new(),
default_temperature: 0.7,
provider_timeout_secs: 120,
extra_headers: std::collections::HashMap::new(),
observability: ObservabilityConfig::default(),
autonomy: AutonomyConfig::default(),
security: crate::config::SecurityConfig::default(),
@@ -526,6 +530,7 @@ async fn run_quick_setup_with_home(
query_classification: crate::config::QueryClassificationConfig::default(),
transcription: crate::config::TranscriptionConfig::default(),
tts: crate::config::TtsConfig::default(),
mcp: crate::config::McpConfig::default(),
};
config.save().await?;
@@ -39,6 +39,8 @@ pub struct OpenAiCompatibleProvider {
native_tool_calling: bool,
/// HTTP request timeout in seconds for LLM API calls. Default: 120.
timeout_secs: u64,
/// Extra HTTP headers to include in all API requests.
extra_headers: std::collections::HashMap<String, String>,
}
/// How the provider expects the API key to be sent.
@@ -173,6 +175,7 @@ impl OpenAiCompatibleProvider {
merge_system_into_user,
native_tool_calling: !merge_system_into_user,
timeout_secs: 120,
extra_headers: std::collections::HashMap::new(),
}
}
@@ -182,6 +185,15 @@ impl OpenAiCompatibleProvider {
self
}
/// Set extra HTTP headers to include in all API requests.
pub fn with_extra_headers(
mut self,
headers: std::collections::HashMap<String, String>,
) -> Self {
self.extra_headers = headers;
self
}
/// Collect all `system` role messages, concatenate their content,
/// and prepend to the first `user` message. Drop all system messages.
/// Used for providers (e.g. MiniMax) that reject `role: system`.
@@ -215,10 +227,28 @@ impl OpenAiCompatibleProvider {
fn http_client(&self) -> Client {
let timeout = self.timeout_secs;
let has_user_agent = self.user_agent.is_some();
let has_extra_headers = !self.extra_headers.is_empty();
if has_user_agent || has_extra_headers {
let mut headers = HeaderMap::new();
if let Some(ua) = self.user_agent.as_deref() {
if let Ok(value) = HeaderValue::from_str(ua) {
headers.insert(USER_AGENT, value);
}
}
for (key, value) in &self.extra_headers {
match (
reqwest::header::HeaderName::from_bytes(key.as_bytes()),
HeaderValue::from_str(value),
) {
(Ok(name), Ok(val)) => {
headers.insert(name, val);
}
_ => {
tracing::warn!(header = key, "Skipping invalid extra header name or value");
}
}
}
let builder = Client::builder()
@@ -229,7 +259,9 @@ impl OpenAiCompatibleProvider {
crate::config::apply_runtime_proxy_to_builder(builder, "provider.compatible");
return builder.build().unwrap_or_else(|error| {
tracing::warn!(
"Failed to build proxied timeout client with custom headers: {error}"
);
Client::new()
});
}
@@ -2921,4 +2953,62 @@ mod tests {
let p = make_provider("test", "https://example.com", None).with_timeout_secs(300);
assert_eq!(p.timeout_secs, 300);
}
#[test]
fn extra_headers_default_empty() {
let p = make_provider("test", "https://example.com", None);
assert!(p.extra_headers.is_empty());
}
#[test]
fn with_extra_headers_sets_headers() {
let mut headers = std::collections::HashMap::new();
headers.insert("X-Title".to_string(), "zeroclaw".to_string());
headers.insert(
"HTTP-Referer".to_string(),
"https://example.com".to_string(),
);
let p = make_provider("test", "https://example.com", None).with_extra_headers(headers);
assert_eq!(p.extra_headers.len(), 2);
assert_eq!(p.extra_headers.get("X-Title").unwrap(), "zeroclaw");
assert_eq!(
p.extra_headers.get("HTTP-Referer").unwrap(),
"https://example.com"
);
}
#[test]
fn http_client_with_extra_headers_builds_successfully() {
let mut headers = std::collections::HashMap::new();
headers.insert("X-Title".to_string(), "zeroclaw".to_string());
headers.insert("User-Agent".to_string(), "TestAgent/1.0".to_string());
let p = make_provider("test", "https://example.com", None).with_extra_headers(headers);
// Should not panic
let _client = p.http_client();
}
#[test]
fn http_client_without_extra_headers_or_user_agent() {
let p = make_provider("test", "https://example.com", None);
// Should use the cached proxy client path
let _client = p.http_client();
}
#[test]
fn extra_headers_combined_with_user_agent() {
let mut headers = std::collections::HashMap::new();
headers.insert("X-Title".to_string(), "zeroclaw".to_string());
let p = OpenAiCompatibleProvider::new_with_user_agent(
"test",
"https://example.com",
None,
AuthStyle::Bearer,
"CustomAgent/1.0",
)
.with_extra_headers(headers);
assert_eq!(p.user_agent.as_deref(), Some("CustomAgent/1.0"));
assert_eq!(p.extra_headers.len(), 1);
// Should not panic
let _client = p.http_client();
}
}
@@ -680,6 +680,9 @@ pub struct ProviderRuntimeOptions {
/// HTTP request timeout in seconds for LLM provider API calls.
/// `None` uses the provider's built-in default (120s for compatible providers).
pub provider_timeout_secs: Option<u64>,
/// Extra HTTP headers to include in provider API requests.
/// These are merged from the config file and `ZEROCLAW_EXTRA_HEADERS` env var.
pub extra_headers: std::collections::HashMap<String, String>,
}
impl Default for ProviderRuntimeOptions {
@@ -691,6 +694,7 @@ impl Default for ProviderRuntimeOptions {
secrets_encrypt: true,
reasoning_enabled: None,
provider_timeout_secs: None,
extra_headers: std::collections::HashMap::new(),
}
}
}
@@ -997,15 +1001,20 @@ fn create_provider_with_url_and_options(
api_url: Option<&str>,
options: &ProviderRuntimeOptions,
) -> anyhow::Result<Box<dyn Provider>> {
// Closure to optionally apply the configured provider timeout and extra
// headers to OpenAI-compatible providers before boxing them as trait objects.
let compat = {
let timeout = options.provider_timeout_secs;
let extra_headers = options.extra_headers.clone();
move |p: OpenAiCompatibleProvider| -> Box<dyn Provider> {
let mut p = p;
if let Some(t) = timeout {
p = p.with_timeout_secs(t);
}
if !extra_headers.is_empty() {
p = p.with_extra_headers(extra_headers.clone());
}
Box::new(p)
}
};
@@ -3069,4 +3078,22 @@ mod tests {
assert_eq!(check_api_key_prefix("openai", "my-custom-key-123"), None);
assert_eq!(check_api_key_prefix("anthropic", "some-random-key"), None);
}
#[test]
fn provider_runtime_options_default_has_empty_extra_headers() {
let options = ProviderRuntimeOptions::default();
assert!(options.extra_headers.is_empty());
}
#[test]
fn provider_runtime_options_extra_headers_passed_through() {
let mut extra_headers = std::collections::HashMap::new();
extra_headers.insert("X-Title".to_string(), "zeroclaw".to_string());
let options = ProviderRuntimeOptions {
extra_headers,
..ProviderRuntimeOptions::default()
};
assert_eq!(options.extra_headers.len(), 1);
assert_eq!(options.extra_headers.get("X-Title").unwrap(), "zeroclaw");
}
}
@@ -27,7 +27,7 @@ struct ChatRequest {
tools: Option<Vec<serde_json::Value>>,
}
#[derive(Debug, Clone, Serialize)]
struct Message {
role: String,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -40,14 +40,14 @@ struct Message {
tool_name: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
struct OutgoingToolCall {
#[serde(rename = "type")]
kind: String,
function: OutgoingFunction,
}
#[derive(Debug, Clone, Serialize)]
struct OutgoingFunction {
name: String,
arguments: serde_json::Value,
@@ -258,13 +258,31 @@ impl OllamaProvider {
model: &str,
temperature: f64,
tools: Option<&[serde_json::Value]>,
) -> ChatRequest {
self.build_chat_request_with_think(
messages,
model,
temperature,
tools,
self.reasoning_enabled,
)
}
/// Build a chat request with an explicit `think` value.
fn build_chat_request_with_think(
&self,
messages: Vec<Message>,
model: &str,
temperature: f64,
tools: Option<&[serde_json::Value]>,
think: Option<bool>,
) -> ChatRequest {
ChatRequest {
model: model.to_string(),
messages,
stream: false,
options: Options { temperature },
think,
tools: tools.map(|t| t.to_vec()),
}
}
@@ -396,17 +414,18 @@ impl OllamaProvider {
.collect()
}
/// Send a single HTTP request to Ollama and parse the response.
async fn send_request_inner(
&self,
messages: &[Message],
model: &str,
temperature: f64,
should_auth: bool,
tools: Option<&[serde_json::Value]>,
think: Option<bool>,
) -> anyhow::Result<ApiChatResponse> {
let request =
self.build_chat_request_with_think(messages.to_vec(), model, temperature, tools, think);
let url = format!("{}/api/chat", self.base_url);
@@ -466,6 +485,59 @@ impl OllamaProvider {
Ok(chat_response)
}
/// Send a request to Ollama and get the parsed response.
/// Pass `tools` to enable native function-calling for models that support it.
///
/// When `reasoning_enabled` (`think`) is set to `true`, the first request
/// includes `think: true`. If that request fails (the model may not support
/// the `think` parameter), we automatically retry once with `think` omitted
/// so the call succeeds instead of entering an infinite retry loop.
async fn send_request(
&self,
messages: Vec<Message>,
model: &str,
temperature: f64,
should_auth: bool,
tools: Option<&[serde_json::Value]>,
) -> anyhow::Result<ApiChatResponse> {
let result = self
.send_request_inner(
&messages,
model,
temperature,
should_auth,
tools,
self.reasoning_enabled,
)
.await;
match result {
Ok(resp) => Ok(resp),
Err(first_err) if self.reasoning_enabled == Some(true) => {
tracing::warn!(
model = model,
error = %first_err,
"Ollama request failed with think=true; retrying without reasoning \
(model may not support it)"
);
// Retry with think omitted from the request entirely.
self.send_request_inner(&messages, model, temperature, should_auth, tools, None)
.await
.map_err(|retry_err| {
// Both attempts failed — return the original error for clarity.
tracing::error!(
model = model,
original_error = %first_err,
retry_error = %retry_err,
"Ollama request also failed without think; returning original error"
);
first_err
})
}
Err(e) => Err(e),
}
}
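The retry flow above is a general try-then-degrade pattern: attempt once with the optional capability, and on failure retry once without it, surfacing the original error if both attempts fail. A minimal stdlib-only sketch (the `request` function here is a hypothetical stand-in, not part of this codebase):

```rust
// Hypothetical stand-in for a request that may reject an optional flag.
fn request(flag: Option<bool>) -> Result<String, String> {
    match flag {
        Some(true) => Err("think parameter not supported".to_string()),
        _ => Ok("ok".to_string()),
    }
}

fn request_with_fallback(flag: Option<bool>) -> Result<String, String> {
    match request(flag) {
        Ok(resp) => Ok(resp),
        // Only fall back when the degradable flag was actually set.
        Err(first_err) if flag == Some(true) => {
            // Keep the original error if the retry also fails.
            request(None).map_err(|_retry_err| first_err)
        }
        Err(e) => Err(e),
    }
}

fn main() {
    assert_eq!(request_with_fallback(Some(true)), Ok("ok".to_string()));
    assert_eq!(request_with_fallback(Some(false)), Ok("ok".to_string()));
    println!("fallback ok");
}
```

Retrying at most once is what prevents the infinite-retry loop the doc comment warns about.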
/// Convert Ollama tool calls to the JSON format expected by parse_tool_calls in loop_.rs
///
/// Handles quirky model behavior where tool calls are wrapped:
@@ -1018,6 +1018,7 @@ data: [DONE]
auth_profile_override: None,
reasoning_enabled: None,
provider_timeout_secs: None,
extra_headers: std::collections::HashMap::new(),
};
let provider =
OpenAiCodexProvider::new(&options, None).expect("provider should initialize");
@@ -0,0 +1,357 @@
//! MCP (Model Context Protocol) client — connects to external tool servers.
//!
//! Supports multiple transports: stdio (spawn local process), HTTP, and SSE.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use anyhow::{anyhow, bail, Context, Result};
use serde_json::json;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration};
use crate::config::schema::McpServerConfig;
use crate::tools::mcp_protocol::{
JsonRpcRequest, McpToolDef, McpToolsListResult, MCP_PROTOCOL_VERSION,
};
use crate::tools::mcp_transport::{create_transport, McpTransportConn};
/// Timeout for receiving a response from an MCP server during init/list.
/// Prevents a hung server from blocking the daemon indefinitely.
const RECV_TIMEOUT_SECS: u64 = 30;
/// Default timeout for tool calls (seconds) when not configured per-server.
const DEFAULT_TOOL_TIMEOUT_SECS: u64 = 180;
/// Maximum allowed tool call timeout (seconds) — hard safety ceiling.
const MAX_TOOL_TIMEOUT_SECS: u64 = 600;
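These constants combine as a default-then-clamp when `call_tool` resolves a per-server timeout; a tiny sketch of the resolution rule:

```rust
// Sketch: resolve a per-server tool timeout with a default and a hard cap,
// mirroring unwrap_or(DEFAULT_TOOL_TIMEOUT_SECS).min(MAX_TOOL_TIMEOUT_SECS).
fn effective_timeout_secs(configured: Option<u64>) -> u64 {
    configured.unwrap_or(180).min(600)
}

fn main() {
    assert_eq!(effective_timeout_secs(None), 180); // default applies
    assert_eq!(effective_timeout_secs(Some(60)), 60); // within the cap
    assert_eq!(effective_timeout_secs(Some(3600)), 600); // clamped to ceiling
    println!("timeout ok");
}
```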
// ── Internal server state ──────────────────────────────────────────────────
struct McpServerInner {
config: McpServerConfig,
transport: Box<dyn McpTransportConn>,
next_id: AtomicU64,
tools: Vec<McpToolDef>,
}
// ── McpServer ──────────────────────────────────────────────────────────────
/// A live connection to one MCP server (any transport).
#[derive(Clone)]
pub struct McpServer {
inner: Arc<Mutex<McpServerInner>>,
}
impl McpServer {
/// Connect to the server, perform the initialize handshake, and fetch the tool list.
pub async fn connect(config: McpServerConfig) -> Result<Self> {
// Create transport based on config
let mut transport = create_transport(&config).with_context(|| {
format!(
"failed to create transport for MCP server `{}`",
config.name
)
})?;
// Initialize handshake
let id = 1u64;
let init_req = JsonRpcRequest::new(
id,
"initialize",
json!({
"protocolVersion": MCP_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {
"name": "zeroclaw",
"version": env!("CARGO_PKG_VERSION")
}
}),
);
let init_resp = timeout(
Duration::from_secs(RECV_TIMEOUT_SECS),
transport.send_and_recv(&init_req),
)
.await
.with_context(|| {
format!(
"MCP server `{}` timed out after {}s waiting for initialize response",
config.name, RECV_TIMEOUT_SECS
)
})??;
if init_resp.error.is_some() {
bail!(
"MCP server `{}` rejected initialize: {:?}",
config.name,
init_resp.error
);
}
// Notify server that client is initialized (no response expected for notifications)
let notif = JsonRpcRequest::notification("notifications/initialized", json!({}));
// Best effort - ignore errors for notifications
let _ = transport.send_and_recv(&notif).await;
// Fetch available tools
let id = 2u64;
let list_req = JsonRpcRequest::new(id, "tools/list", json!({}));
let list_resp = timeout(
Duration::from_secs(RECV_TIMEOUT_SECS),
transport.send_and_recv(&list_req),
)
.await
.with_context(|| {
format!(
"MCP server `{}` timed out after {}s waiting for tools/list response",
config.name, RECV_TIMEOUT_SECS
)
})??;
let result = list_resp
.result
.ok_or_else(|| anyhow!("tools/list returned no result from `{}`", config.name))?;
let tool_list: McpToolsListResult = serde_json::from_value(result)
.with_context(|| format!("failed to parse tools/list from `{}`", config.name))?;
let tool_count = tool_list.tools.len();
let inner = McpServerInner {
config,
transport,
next_id: AtomicU64::new(3), // Start at 3 since we used 1 and 2
tools: tool_list.tools,
};
tracing::info!(
"MCP server `{}` connected — {} tool(s) available",
inner.config.name,
tool_count
);
Ok(Self {
inner: Arc::new(Mutex::new(inner)),
})
}
/// Tools advertised by this server.
pub async fn tools(&self) -> Vec<McpToolDef> {
self.inner.lock().await.tools.clone()
}
/// Server display name.
#[allow(dead_code)]
pub async fn name(&self) -> String {
self.inner.lock().await.config.name.clone()
}
/// Call a tool on this server. Returns the raw JSON result.
pub async fn call_tool(
&self,
tool_name: &str,
arguments: serde_json::Value,
) -> Result<serde_json::Value> {
let mut inner = self.inner.lock().await;
let id = inner.next_id.fetch_add(1, Ordering::Relaxed);
let req = JsonRpcRequest::new(
id,
"tools/call",
json!({ "name": tool_name, "arguments": arguments }),
);
// Use per-server tool timeout if configured, otherwise default.
// Cap at MAX_TOOL_TIMEOUT_SECS for safety.
let tool_timeout = inner
.config
.tool_timeout_secs
.unwrap_or(DEFAULT_TOOL_TIMEOUT_SECS)
.min(MAX_TOOL_TIMEOUT_SECS);
let resp = timeout(
Duration::from_secs(tool_timeout),
inner.transport.send_and_recv(&req),
)
.await
.map_err(|_| {
anyhow!(
"MCP server `{}` timed out after {}s during tool call `{tool_name}`",
inner.config.name,
tool_timeout
)
})?
.with_context(|| {
format!(
"MCP server `{}` error during tool call `{tool_name}`",
inner.config.name
)
})?;
if let Some(err) = resp.error {
bail!("MCP tool `{tool_name}` error {}: {}", err.code, err.message);
}
Ok(resp.result.unwrap_or(serde_json::Value::Null))
}
}
// ── McpRegistry ───────────────────────────────────────────────────────────
/// Registry of all connected MCP servers, with a flat tool index.
pub struct McpRegistry {
servers: Vec<McpServer>,
/// prefixed_name -> (server_index, original_tool_name)
tool_index: HashMap<String, (usize, String)>,
}
impl McpRegistry {
/// Connect to all configured servers. Non-fatal: failures are logged and skipped.
pub async fn connect_all(configs: &[McpServerConfig]) -> Result<Self> {
let mut servers = Vec::new();
let mut tool_index = HashMap::new();
for config in configs {
match McpServer::connect(config.clone()).await {
Ok(server) => {
let server_idx = servers.len();
// Collect tools while holding the lock once, then release
let tools = server.tools().await;
for tool in &tools {
// Prefix prevents name collisions across servers
let prefixed = format!("{}__{}", config.name, tool.name);
tool_index.insert(prefixed, (server_idx, tool.name.clone()));
}
servers.push(server);
}
// Non-fatal — log and continue with remaining servers
Err(e) => {
tracing::error!("Failed to connect to MCP server `{}`: {:#}", config.name, e);
}
}
}
Ok(Self {
servers,
tool_index,
})
}
/// All prefixed tool names across all connected servers.
pub fn tool_names(&self) -> Vec<String> {
self.tool_index.keys().cloned().collect()
}
/// Tool definition for a given prefixed name (cloned).
pub async fn get_tool_def(&self, prefixed_name: &str) -> Option<McpToolDef> {
let (server_idx, original_name) = self.tool_index.get(prefixed_name)?;
let inner = self.servers[*server_idx].inner.lock().await;
inner
.tools
.iter()
.find(|t| &t.name == original_name)
.cloned()
}
/// Execute a tool by prefixed name.
pub async fn call_tool(
&self,
prefixed_name: &str,
arguments: serde_json::Value,
) -> Result<String> {
let (server_idx, original_name) = self
.tool_index
.get(prefixed_name)
.ok_or_else(|| anyhow!("unknown MCP tool `{prefixed_name}`"))?;
let result = self.servers[*server_idx]
.call_tool(original_name, arguments)
.await?;
serde_json::to_string_pretty(&result)
.with_context(|| format!("failed to serialize result of MCP tool `{prefixed_name}`"))
}
pub fn is_empty(&self) -> bool {
self.servers.is_empty()
}
pub fn server_count(&self) -> usize {
self.servers.len()
}
pub fn tool_count(&self) -> usize {
self.tool_index.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::schema::McpTransport;
#[test]
fn tool_name_prefix_format() {
let prefixed = format!("{}__{}", "filesystem", "read_file");
assert_eq!(prefixed, "filesystem__read_file");
}
#[tokio::test]
async fn connect_nonexistent_command_fails_cleanly() {
// A command that doesn't exist should fail at spawn, not panic.
let config = McpServerConfig {
name: "nonexistent".to_string(),
command: "/usr/bin/this_binary_does_not_exist_zeroclaw_test".to_string(),
args: vec![],
env: HashMap::default(),
tool_timeout_secs: None,
transport: McpTransport::Stdio,
url: None,
headers: HashMap::default(),
};
let result = McpServer::connect(config).await;
assert!(result.is_err());
let msg = result.err().unwrap().to_string();
assert!(msg.contains("failed to create transport"), "got: {msg}");
}
#[tokio::test]
async fn connect_all_nonfatal_on_single_failure() {
// If one server config is bad, connect_all should succeed (with 0 servers).
let configs = vec![McpServerConfig {
name: "bad".to_string(),
command: "/usr/bin/does_not_exist_zc_test".to_string(),
args: vec![],
env: HashMap::default(),
tool_timeout_secs: None,
transport: McpTransport::Stdio,
url: None,
headers: HashMap::default(),
}];
let registry = McpRegistry::connect_all(&configs)
.await
.expect("connect_all should not fail");
assert!(registry.is_empty());
assert_eq!(registry.tool_count(), 0);
}
#[test]
fn http_transport_requires_url() {
let config = McpServerConfig {
name: "test".into(),
transport: McpTransport::Http,
..Default::default()
};
let result = create_transport(&config);
assert!(result.is_err());
}
#[test]
fn sse_transport_requires_url() {
let config = McpServerConfig {
name: "test".into(),
transport: McpTransport::Sse,
..Default::default()
};
let result = create_transport(&config);
assert!(result.is_err());
}
}
@@ -0,0 +1,130 @@
//! MCP (Model Context Protocol) JSON-RPC 2.0 protocol types.
//! Protocol version: 2024-11-05
//! Adapted from ops-mcp-server/src/protocol.rs for client use.
//! Both Serialize and Deserialize are derived — the client both sends (Serialize)
//! and receives (Deserialize) JSON-RPC messages.
use serde::{Deserialize, Serialize};
pub const JSONRPC_VERSION: &str = "2.0";
pub const MCP_PROTOCOL_VERSION: &str = "2024-11-05";
// Standard JSON-RPC 2.0 error codes
#[allow(dead_code)]
pub const PARSE_ERROR: i32 = -32700;
#[allow(dead_code)]
pub const INVALID_REQUEST: i32 = -32600;
#[allow(dead_code)]
pub const METHOD_NOT_FOUND: i32 = -32601;
#[allow(dead_code)]
pub const INVALID_PARAMS: i32 = -32602;
pub const INTERNAL_ERROR: i32 = -32603;
/// Outbound JSON-RPC request (client -> MCP server).
/// Used for both method calls (with id) and notifications (id = None).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<serde_json::Value>,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
}
impl JsonRpcRequest {
/// Create a method call request with a numeric id.
pub fn new(id: u64, method: impl Into<String>, params: serde_json::Value) -> Self {
Self {
jsonrpc: JSONRPC_VERSION.to_string(),
id: Some(serde_json::Value::Number(id.into())),
method: method.into(),
params: Some(params),
}
}
/// Create a notification — no id, no response expected from server.
pub fn notification(method: impl Into<String>, params: serde_json::Value) -> Self {
Self {
jsonrpc: JSONRPC_VERSION.to_string(),
id: None,
method: method.into(),
params: Some(params),
}
}
}
/// Inbound JSON-RPC response (MCP server -> client).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcResponse {
pub jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcError>,
}
/// JSON-RPC error object embedded in a response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcError {
pub code: i32,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
}
/// A tool advertised by an MCP server (from `tools/list` response).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpToolDef {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(rename = "inputSchema")]
pub input_schema: serde_json::Value,
}
/// Expected shape of the `tools/list` result payload.
#[derive(Debug, Deserialize)]
pub struct McpToolsListResult {
pub tools: Vec<McpToolDef>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn request_serializes_with_id() {
let req = JsonRpcRequest::new(1, "tools/list", serde_json::json!({}));
let s = serde_json::to_string(&req).unwrap();
assert!(s.contains("\"id\":1"));
assert!(s.contains("\"method\":\"tools/list\""));
assert!(s.contains("\"jsonrpc\":\"2.0\""));
}
#[test]
fn notification_omits_id() {
let notif =
JsonRpcRequest::notification("notifications/initialized", serde_json::json!({}));
let s = serde_json::to_string(&notif).unwrap();
assert!(!s.contains("\"id\""));
}
#[test]
fn response_deserializes() {
let json = r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#;
let resp: JsonRpcResponse = serde_json::from_str(json).unwrap();
assert!(resp.result.is_some());
assert!(resp.error.is_none());
}
#[test]
fn tool_def_deserializes_input_schema() {
let json = r#"{"name":"read_file","description":"Read a file","inputSchema":{"type":"object","properties":{"path":{"type":"string"}}}}"#;
let def: McpToolDef = serde_json::from_str(json).unwrap();
assert_eq!(def.name, "read_file");
assert!(def.input_schema.is_object());
}
}
@@ -0,0 +1,68 @@
//! Wraps a discovered MCP tool as a zeroclaw [`Tool`] so it is dispatched
//! through the existing tool registry and agent loop without modification.
use std::sync::Arc;
use async_trait::async_trait;
use crate::tools::mcp_client::McpRegistry;
use crate::tools::mcp_protocol::McpToolDef;
use crate::tools::traits::{Tool, ToolResult};
/// A zeroclaw [`Tool`] backed by an MCP server tool.
///
/// The `prefixed_name` (e.g. `filesystem__read_file`) is what the agent loop
/// sees. The registry knows how to route it to the correct server.
pub struct McpToolWrapper {
/// Prefixed name: `<server_name>__<tool_name>`.
prefixed_name: String,
/// Description extracted from the MCP tool definition. Stored as an owned
/// String so that `description()` can return `&str` with self's lifetime.
description: String,
/// JSON schema for the tool's input parameters.
input_schema: serde_json::Value,
/// Shared registry — used to dispatch actual tool calls.
registry: Arc<McpRegistry>,
}
impl McpToolWrapper {
pub fn new(prefixed_name: String, def: McpToolDef, registry: Arc<McpRegistry>) -> Self {
let description = def.description.unwrap_or_else(|| "MCP tool".to_string());
Self {
prefixed_name,
description,
input_schema: def.input_schema,
registry,
}
}
}
#[async_trait]
impl Tool for McpToolWrapper {
fn name(&self) -> &str {
&self.prefixed_name
}
fn description(&self) -> &str {
&self.description
}
fn parameters_schema(&self) -> serde_json::Value {
self.input_schema.clone()
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
match self.registry.call_tool(&self.prefixed_name, args).await {
Ok(output) => Ok(ToolResult {
success: true,
output,
error: None,
}),
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(e.to_string()),
}),
}
}
}
@@ -0,0 +1,868 @@
//! MCP transport abstraction — supports stdio, SSE, and HTTP transports.
use std::borrow::Cow;
use anyhow::{anyhow, bail, Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{oneshot, Mutex, Notify};
use tokio::time::{timeout, Duration};
use tokio_stream::StreamExt;
use crate::config::schema::{McpServerConfig, McpTransport};
use crate::tools::mcp_protocol::{JsonRpcError, JsonRpcRequest, JsonRpcResponse, INTERNAL_ERROR};
/// Maximum bytes for a single JSON-RPC response.
const MAX_LINE_BYTES: usize = 4 * 1024 * 1024; // 4 MB
/// Timeout for init/list operations.
const RECV_TIMEOUT_SECS: u64 = 30;
// ── Transport Trait ──────────────────────────────────────────────────────
/// Abstract transport for MCP communication.
#[async_trait::async_trait]
pub trait McpTransportConn: Send + Sync {
/// Send a JSON-RPC request and receive the response.
async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse>;
/// Close the connection.
async fn close(&mut self) -> Result<()>;
}
// ── Stdio Transport ──────────────────────────────────────────────────────
/// Stdio-based transport (spawn local process).
pub struct StdioTransport {
_child: Child,
stdin: tokio::process::ChildStdin,
stdout_lines: tokio::io::Lines<BufReader<tokio::process::ChildStdout>>,
}
impl StdioTransport {
pub fn new(config: &McpServerConfig) -> Result<Self> {
let mut child = Command::new(&config.command)
.args(&config.args)
.envs(&config.env)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.kill_on_drop(true)
.spawn()
.with_context(|| format!("failed to spawn MCP server `{}`", config.name))?;
let stdin = child
.stdin
.take()
.ok_or_else(|| anyhow!("no stdin on MCP server `{}`", config.name))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("no stdout on MCP server `{}`", config.name))?;
let stdout_lines = BufReader::new(stdout).lines();
Ok(Self {
_child: child,
stdin,
stdout_lines,
})
}
async fn send_raw(&mut self, line: &str) -> Result<()> {
self.stdin
.write_all(line.as_bytes())
.await
.context("failed to write to MCP server stdin")?;
self.stdin
.write_all(b"\n")
.await
.context("failed to write newline to MCP server stdin")?;
self.stdin.flush().await.context("failed to flush stdin")?;
Ok(())
}
async fn recv_raw(&mut self) -> Result<String> {
let line = self
.stdout_lines
.next_line()
.await?
.ok_or_else(|| anyhow!("MCP server closed stdout"))?;
if line.len() > MAX_LINE_BYTES {
bail!("MCP response too large: {} bytes", line.len());
}
Ok(line)
}
}
#[async_trait::async_trait]
impl McpTransportConn for StdioTransport {
async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
let line = serde_json::to_string(request)?;
self.send_raw(&line).await?;
if request.id.is_none() {
return Ok(JsonRpcResponse {
jsonrpc: crate::tools::mcp_protocol::JSONRPC_VERSION.to_string(),
id: None,
result: None,
error: None,
});
}
let resp_line = timeout(Duration::from_secs(RECV_TIMEOUT_SECS), self.recv_raw())
.await
.context("timeout waiting for MCP response")??;
let resp: JsonRpcResponse = serde_json::from_str(&resp_line)
.with_context(|| format!("invalid JSON-RPC response: {}", resp_line))?;
Ok(resp)
}
async fn close(&mut self) -> Result<()> {
let _ = self.stdin.shutdown().await;
Ok(())
}
}
// ── HTTP Transport ───────────────────────────────────────────────────────
/// HTTP-based transport (POST requests).
pub struct HttpTransport {
url: String,
client: reqwest::Client,
headers: std::collections::HashMap<String, String>,
}
impl HttpTransport {
pub fn new(config: &McpServerConfig) -> Result<Self> {
let url = config
.url
.as_ref()
.ok_or_else(|| anyhow!("URL required for HTTP transport"))?
.clone();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(120))
.build()
.context("failed to build HTTP client")?;
Ok(Self {
url,
client,
headers: config.headers.clone(),
})
}
}
#[async_trait::async_trait]
impl McpTransportConn for HttpTransport {
async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
let body = serde_json::to_string(request)?;
let mut req = self.client.post(&self.url).body(body);
for (key, value) in &self.headers {
req = req.header(key, value);
}
let resp = req
.send()
.await
.context("HTTP request to MCP server failed")?;
if !resp.status().is_success() {
bail!("MCP server returned HTTP {}", resp.status());
}
if request.id.is_none() {
return Ok(JsonRpcResponse {
jsonrpc: crate::tools::mcp_protocol::JSONRPC_VERSION.to_string(),
id: None,
result: None,
error: None,
});
}
let resp_text = resp.text().await.context("failed to read HTTP response")?;
let mcp_resp: JsonRpcResponse = serde_json::from_str(&resp_text)
.with_context(|| format!("invalid JSON-RPC response: {}", resp_text))?;
Ok(mcp_resp)
}
async fn close(&mut self) -> Result<()> {
Ok(())
}
}
// ── SSE Transport ─────────────────────────────────────────────────────────
/// SSE-based transport (HTTP POST for requests, SSE for responses).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum SseStreamState {
Unknown,
Connected,
Unsupported,
}
pub struct SseTransport {
sse_url: String,
server_name: String,
client: reqwest::Client,
headers: std::collections::HashMap<String, String>,
stream_state: SseStreamState,
shared: std::sync::Arc<Mutex<SseSharedState>>,
notify: std::sync::Arc<Notify>,
shutdown_tx: Option<oneshot::Sender<()>>,
reader_task: Option<tokio::task::JoinHandle<()>>,
}
impl SseTransport {
pub fn new(config: &McpServerConfig) -> Result<Self> {
let sse_url = config
.url
.as_ref()
.ok_or_else(|| anyhow!("URL required for SSE transport"))?
.clone();
let client = reqwest::Client::builder()
.build()
.context("failed to build HTTP client")?;
Ok(Self {
sse_url,
server_name: config.name.clone(),
client,
headers: config.headers.clone(),
stream_state: SseStreamState::Unknown,
shared: std::sync::Arc::new(Mutex::new(SseSharedState::default())),
notify: std::sync::Arc::new(Notify::new()),
shutdown_tx: None,
reader_task: None,
})
}
async fn ensure_connected(&mut self) -> Result<()> {
if self.stream_state == SseStreamState::Unsupported {
return Ok(());
}
if let Some(task) = &self.reader_task {
if !task.is_finished() {
self.stream_state = SseStreamState::Connected;
return Ok(());
}
}
let mut req = self
.client
.get(&self.sse_url)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache");
for (key, value) in &self.headers {
req = req.header(key, value);
}
let resp = req.send().await.context("SSE GET to MCP server failed")?;
if resp.status() == reqwest::StatusCode::NOT_FOUND
|| resp.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED
{
self.stream_state = SseStreamState::Unsupported;
return Ok(());
}
if !resp.status().is_success() {
return Err(anyhow!("MCP server returned HTTP {}", resp.status()));
}
let is_event_stream = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
if !is_event_stream {
self.stream_state = SseStreamState::Unsupported;
return Ok(());
}
let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
self.shutdown_tx = Some(shutdown_tx);
let shared = self.shared.clone();
let notify = self.notify.clone();
let sse_url = self.sse_url.clone();
let server_name = self.server_name.clone();
self.reader_task = Some(tokio::spawn(async move {
let stream = resp
.bytes_stream()
.map(|item| item.map_err(std::io::Error::other));
let reader = tokio_util::io::StreamReader::new(stream);
let mut lines = BufReader::new(reader).lines();
let mut cur_event: Option<String> = None;
let mut cur_id: Option<String> = None;
let mut cur_data: Vec<String> = Vec::new();
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
line = lines.next_line() => {
let Ok(line_opt) = line else { break; };
let Some(mut line) = line_opt else { break; };
if line.ends_with('\r') {
line.pop();
}
if line.is_empty() {
if cur_event.is_none() && cur_id.is_none() && cur_data.is_empty() {
continue;
}
let event = cur_event.take();
let data = cur_data.join("\n");
cur_data.clear();
let id = cur_id.take();
handle_sse_event(
&server_name,
&sse_url,
&shared,
&notify,
event.as_deref(),
id.as_deref(),
data,
)
.await;
continue;
}
if line.starts_with(':') {
continue;
}
if let Some(rest) = line.strip_prefix("event:") {
cur_event = Some(rest.trim().to_string());
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
let rest = rest.strip_prefix(' ').unwrap_or(rest);
cur_data.push(rest.to_string());
continue;
}
if let Some(rest) = line.strip_prefix("id:") {
cur_id = Some(rest.trim().to_string());
}
}
}
}
let pending = {
let mut guard = shared.lock().await;
std::mem::take(&mut guard.pending)
};
for (_, tx) in pending {
let _ = tx.send(JsonRpcResponse {
jsonrpc: crate::tools::mcp_protocol::JSONRPC_VERSION.to_string(),
id: None,
result: None,
error: Some(JsonRpcError {
code: INTERNAL_ERROR,
message: "SSE connection closed".to_string(),
data: None,
}),
});
}
}));
self.stream_state = SseStreamState::Connected;
Ok(())
}
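The reader task above implements standard SSE framing: `event:` and `data:` field lines accumulate until a blank line dispatches one event, and `:`-prefixed lines are comments. A self-contained sketch of that framing (simplified: no `id:` handling, events collected into a `Vec` instead of dispatched):

```rust
// Parse an SSE byte stream (as text) into (event_name, data) pairs.
fn parse_sse(input: &str) -> Vec<(String, String)> {
    let mut events = Vec::new();
    let mut event: Option<String> = None;
    let mut data: Vec<String> = Vec::new();
    // Chain a trailing blank line so a final unterminated event still flushes.
    for line in input.lines().chain(std::iter::once("")) {
        let line = line.trim_end_matches('\r');
        if line.is_empty() {
            if event.is_some() || !data.is_empty() {
                // Blank line: dispatch the accumulated event.
                events.push((
                    event.take().unwrap_or_else(|| "message".to_string()),
                    data.join("\n"),
                ));
                data.clear();
            }
            continue;
        }
        if line.starts_with(':') {
            continue; // comment line
        }
        if let Some(rest) = line.strip_prefix("event:") {
            event = Some(rest.trim().to_string());
        } else if let Some(rest) = line.strip_prefix("data:") {
            // Per the SSE spec, a single leading space after "data:" is dropped.
            data.push(rest.strip_prefix(' ').unwrap_or(rest).to_string());
        }
    }
    events
}

fn main() {
    let input = "event: endpoint\ndata: /messages\n\ndata: {\"id\":1}\n";
    let events = parse_sse(input);
    assert_eq!(events[0], ("endpoint".to_string(), "/messages".to_string()));
    assert_eq!(events[1], ("message".to_string(), "{\"id\":1}".to_string()));
    println!("sse ok");
}
```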
async fn get_message_url(&self) -> Result<(String, bool)> {
let guard = self.shared.lock().await;
if let Some(url) = &guard.message_url {
return Ok((url.clone(), guard.message_url_from_endpoint));
}
drop(guard);
let derived = derive_message_url(&self.sse_url, "messages")
.or_else(|| derive_message_url(&self.sse_url, "message"))
.ok_or_else(|| anyhow!("invalid SSE URL"))?;
let mut guard = self.shared.lock().await;
if guard.message_url.is_none() {
guard.message_url = Some(derived.clone());
guard.message_url_from_endpoint = false;
}
Ok((derived, false))
}
}
#[derive(Default)]
struct SseSharedState {
message_url: Option<String>,
message_url_from_endpoint: bool,
pending: std::collections::HashMap<u64, oneshot::Sender<JsonRpcResponse>>,
}
fn derive_message_url(sse_url: &str, message_path: &str) -> Option<String> {
let url = reqwest::Url::parse(sse_url).ok()?;
let mut segments: Vec<&str> = url.path_segments()?.collect();
if segments.is_empty() {
return None;
}
if segments.last().copied() == Some("sse") {
segments.pop();
segments.push(message_path);
let mut new_url = url.clone();
new_url.set_path(&format!("/{}", segments.join("/")));
return Some(new_url.to_string());
}
let mut new_url = url.clone();
let mut path = url.path().trim_end_matches('/').to_string();
path.push('/');
path.push_str(message_path);
new_url.set_path(&path);
Some(new_url.to_string())
}
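Under this fallback rule, a trailing `sse` path segment is replaced by the message path and any other path has it appended. A stdlib string sketch of the same rule (the real function goes through `reqwest::Url`, which also handles query strings and proper path segmentation):

```rust
// Simplified string version of derive_message_url: swap a trailing
// "/sse" segment for the message path, otherwise append the message path.
fn derive(url: &str, message_path: &str) -> String {
    if let Some(base) = url.strip_suffix("/sse") {
        format!("{base}/{message_path}")
    } else {
        format!("{}/{message_path}", url.trim_end_matches('/'))
    }
}

fn main() {
    assert_eq!(derive("http://host/sse", "messages"), "http://host/messages");
    assert_eq!(derive("http://host/api/", "messages"), "http://host/api/messages");
    println!("derive ok");
}
```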
async fn handle_sse_event(
server_name: &str,
sse_url: &str,
shared: &std::sync::Arc<Mutex<SseSharedState>>,
notify: &std::sync::Arc<Notify>,
event: Option<&str>,
_id: Option<&str>,
data: String,
) {
let event = event.unwrap_or("message");
let trimmed = data.trim();
if trimmed.is_empty() {
return;
}
if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint") {
if let Some(url) = parse_endpoint_from_data(sse_url, trimmed) {
let mut guard = shared.lock().await;
guard.message_url = Some(url);
guard.message_url_from_endpoint = true;
drop(guard);
notify.notify_waiters();
}
return;
}
if !event.eq_ignore_ascii_case("message") {
return;
}
let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed) else {
return;
};
let Ok(resp) = serde_json::from_value::<JsonRpcResponse>(value.clone()) else {
let _ = serde_json::from_value::<JsonRpcRequest>(value);
return;
};
let Some(id_val) = resp.id.clone() else {
return;
};
let id = match id_val.as_u64() {
Some(v) => v,
None => return,
};
let tx = {
let mut guard = shared.lock().await;
guard.pending.remove(&id)
};
if let Some(tx) = tx {
let _ = tx.send(resp);
} else {
tracing::debug!(
"MCP SSE `{}` received response for unknown id {}",
server_name,
id
);
}
}
fn parse_endpoint_from_data(sse_url: &str, data: &str) -> Option<String> {
if data.starts_with('{') {
let v: serde_json::Value = serde_json::from_str(data).ok()?;
let endpoint = v.get("endpoint")?.as_str()?;
return parse_endpoint_from_data(sse_url, endpoint);
}
if data.starts_with("http://") || data.starts_with("https://") {
return Some(data.to_string());
}
let base = reqwest::Url::parse(sse_url).ok()?;
base.join(data).ok().map(|u| u.to_string())
}
fn extract_json_from_sse_text(resp_text: &str) -> Cow<'_, str> {
let text = resp_text.trim_start_matches('\u{feff}');
let mut current_data_lines: Vec<&str> = Vec::new();
let mut last_event_data_lines: Vec<&str> = Vec::new();
for raw_line in text.lines() {
let line = raw_line.trim_end_matches('\r').trim_start();
if line.is_empty() {
if !current_data_lines.is_empty() {
last_event_data_lines = std::mem::take(&mut current_data_lines);
}
continue;
}
if line.starts_with(':') {
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
let rest = rest.strip_prefix(' ').unwrap_or(rest);
current_data_lines.push(rest);
}
}
if !current_data_lines.is_empty() {
last_event_data_lines = current_data_lines;
}
if last_event_data_lines.is_empty() {
return Cow::Borrowed(text.trim());
}
if last_event_data_lines.len() == 1 {
return Cow::Borrowed(last_event_data_lines[0].trim());
}
let joined = last_event_data_lines.join("\n");
Cow::Owned(joined.trim().to_string())
}
async fn read_first_jsonrpc_from_sse_response(
resp: reqwest::Response,
) -> Result<Option<JsonRpcResponse>> {
let stream = resp
.bytes_stream()
.map(|item| item.map_err(std::io::Error::other));
let reader = tokio_util::io::StreamReader::new(stream);
let mut lines = BufReader::new(reader).lines();
let mut cur_event: Option<String> = None;
let mut cur_data: Vec<String> = Vec::new();
while let Ok(line_opt) = lines.next_line().await {
let Some(mut line) = line_opt else { break };
if line.ends_with('\r') {
line.pop();
}
if line.is_empty() {
if cur_event.is_none() && cur_data.is_empty() {
continue;
}
let event = cur_event.take();
let data = cur_data.join("\n");
cur_data.clear();
let event = event.unwrap_or_else(|| "message".to_string());
if event.eq_ignore_ascii_case("endpoint") || event.eq_ignore_ascii_case("mcp-endpoint")
{
continue;
}
if !event.eq_ignore_ascii_case("message") {
continue;
}
let trimmed = data.trim();
if trimmed.is_empty() {
continue;
}
let json_str = extract_json_from_sse_text(trimmed);
if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(json_str.as_ref()) {
return Ok(Some(resp));
}
continue;
}
if line.starts_with(':') {
continue;
}
if let Some(rest) = line.strip_prefix("event:") {
cur_event = Some(rest.trim().to_string());
continue;
}
if let Some(rest) = line.strip_prefix("data:") {
let rest = rest.strip_prefix(' ').unwrap_or(rest);
cur_data.push(rest.to_string());
}
}
Ok(None)
}
#[async_trait::async_trait]
impl McpTransportConn for SseTransport {
async fn send_and_recv(&mut self, request: &JsonRpcRequest) -> Result<JsonRpcResponse> {
self.ensure_connected().await?;
let id = request.id.as_ref().and_then(|v| v.as_u64());
let body = serde_json::to_string(request)?;
let (mut message_url, mut from_endpoint) = self.get_message_url().await?;
if self.stream_state == SseStreamState::Connected && !from_endpoint {
for _ in 0..3 {
{
let guard = self.shared.lock().await;
if guard.message_url_from_endpoint {
if let Some(url) = &guard.message_url {
message_url = url.clone();
from_endpoint = true;
break;
}
}
}
let _ = timeout(Duration::from_millis(300), self.notify.notified()).await;
}
}
let primary_url = if from_endpoint {
message_url.clone()
} else {
self.sse_url.clone()
};
let secondary_url = if message_url == self.sse_url {
None
} else if primary_url == message_url {
Some(self.sse_url.clone())
} else {
Some(message_url.clone())
};
let has_secondary = secondary_url.is_some();
let mut rx = None;
if let Some(id) = id {
if self.stream_state == SseStreamState::Connected {
let (tx, ch) = oneshot::channel();
{
let mut guard = self.shared.lock().await;
guard.pending.insert(id, tx);
}
rx = Some((id, ch));
}
}
let mut got_direct = None;
let mut last_status = None;
for (i, url) in std::iter::once(primary_url)
.chain(secondary_url.into_iter())
.enumerate()
{
let mut req = self
.client
.post(&url)
.timeout(Duration::from_secs(120))
.body(body.clone())
.header("Content-Type", "application/json");
for (key, value) in &self.headers {
req = req.header(key, value);
}
if !self
.headers
.keys()
.any(|k| k.eq_ignore_ascii_case("Accept"))
{
req = req.header("Accept", "application/json, text/event-stream");
}
let resp = req.send().await.context("SSE POST to MCP server failed")?;
let status = resp.status();
last_status = Some(status);
if (status == reqwest::StatusCode::NOT_FOUND
|| status == reqwest::StatusCode::METHOD_NOT_ALLOWED)
&& i == 0
{
continue;
}
if !status.is_success() {
break;
}
if request.id.is_none() {
got_direct = Some(JsonRpcResponse {
jsonrpc: crate::tools::mcp_protocol::JSONRPC_VERSION.to_string(),
id: None,
result: None,
error: None,
});
break;
}
let is_sse = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.is_some_and(|v| v.to_ascii_lowercase().contains("text/event-stream"));
if is_sse {
if i == 0 && has_secondary {
match timeout(
Duration::from_secs(3),
read_first_jsonrpc_from_sse_response(resp),
)
.await
{
Ok(res) => {
if let Some(resp) = res? {
got_direct = Some(resp);
}
break;
}
Err(_) => continue,
}
}
if let Some(resp) = read_first_jsonrpc_from_sse_response(resp).await? {
got_direct = Some(resp);
}
break;
}
let text = if i == 0 && has_secondary {
match timeout(Duration::from_secs(3), resp.text()).await {
Ok(Ok(t)) => t,
Ok(Err(_)) => String::new(),
Err(_) => continue,
}
} else {
resp.text().await.unwrap_or_default()
};
let trimmed = text.trim();
if !trimmed.is_empty() {
let json_str = if trimmed.contains("\ndata:") || trimmed.starts_with("data:") {
extract_json_from_sse_text(trimmed)
} else {
Cow::Borrowed(trimmed)
};
if let Ok(mcp_resp) = serde_json::from_str::<JsonRpcResponse>(json_str.as_ref()) {
got_direct = Some(mcp_resp);
}
}
break;
}
if let Some((id, _)) = rx.as_ref() {
if got_direct.is_some() {
let mut guard = self.shared.lock().await;
guard.pending.remove(id);
} else if let Some(status) = last_status {
if !status.is_success() {
let mut guard = self.shared.lock().await;
guard.pending.remove(id);
}
}
}
if let Some(resp) = got_direct {
return Ok(resp);
}
if let Some(status) = last_status {
if !status.is_success() {
bail!("MCP server returned HTTP {}", status);
}
} else {
bail!("MCP request not sent");
}
let Some((_id, rx)) = rx else {
bail!("MCP server returned no response");
};
rx.await.map_err(|_| anyhow!("SSE response channel closed"))
}
async fn close(&mut self) -> Result<()> {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
if let Some(task) = self.reader_task.take() {
task.abort();
}
Ok(())
}
}
// ── Factory ──────────────────────────────────────────────────────────────
/// Create a transport based on config.
pub fn create_transport(config: &McpServerConfig) -> Result<Box<dyn McpTransportConn>> {
match config.transport {
McpTransport::Stdio => Ok(Box::new(StdioTransport::new(config)?)),
McpTransport::Http => Ok(Box::new(HttpTransport::new(config)?)),
McpTransport::Sse => Ok(Box::new(SseTransport::new(config)?)),
}
}
// ── Tests ─────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_transport_default_is_stdio() {
let config = McpServerConfig::default();
assert_eq!(config.transport, McpTransport::Stdio);
}
#[test]
fn test_http_transport_requires_url() {
let config = McpServerConfig {
name: "test".into(),
transport: McpTransport::Http,
..Default::default()
};
assert!(HttpTransport::new(&config).is_err());
}
#[test]
fn test_sse_transport_requires_url() {
let config = McpServerConfig {
name: "test".into(),
transport: McpTransport::Sse,
..Default::default()
};
assert!(SseTransport::new(&config).is_err());
}
#[test]
fn test_extract_json_from_sse_data_no_space() {
let input = "data:{\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
let extracted = extract_json_from_sse_text(input);
let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
}
#[test]
fn test_extract_json_from_sse_with_event_and_id() {
let input = "id: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
let extracted = extract_json_from_sse_text(input);
let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
}
#[test]
fn test_extract_json_from_sse_multiline_data() {
let input = "event: message\ndata: {\ndata: \"jsonrpc\": \"2.0\",\ndata: \"result\": {}\ndata: }\n\n";
let extracted = extract_json_from_sse_text(input);
let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
}
#[test]
fn test_extract_json_from_sse_skips_bom_and_leading_whitespace() {
let input = "\u{feff}\n\n data: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
let extracted = extract_json_from_sse_text(input);
let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
}
#[test]
fn test_extract_json_from_sse_uses_last_event_with_data() {
let input =
": keep-alive\n\nid: 1\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"result\":{}}\n\n";
let extracted = extract_json_from_sse_text(input);
let _: JsonRpcResponse = serde_json::from_str(extracted.as_ref()).unwrap();
}
}
@@ -40,6 +40,10 @@ pub mod hardware_memory_map;
pub mod hardware_memory_read;
pub mod http_request;
pub mod image_info;
+pub mod mcp_client;
+pub mod mcp_protocol;
+pub mod mcp_tool;
+pub mod mcp_transport;
pub mod memory_forget;
pub mod memory_recall;
pub mod memory_store;
@@ -341,6 +345,7 @@ pub fn all_tools_with_runtime(
secrets_encrypt: root_config.secrets.encrypt,
reasoning_enabled: root_config.runtime.reasoning_enabled,
provider_timeout_secs: Some(root_config.provider_timeout_secs),
+extra_headers: root_config.extra_headers.clone(),
},
)
.with_parent_tools(parent_tools)
@@ -3,6 +3,34 @@ use anyhow::{bail, Result};
use tokio::io::AsyncBufReadExt;
use tokio::process::Command;
+/// Try to extract a real tunnel URL from a cloudflared log line.
+///
+/// Returns `Some(url)` when the line contains a genuine tunnel endpoint,
+/// skipping documentation and warning URLs (quic-go GitHub links,
+/// Cloudflare docs pages, etc.).
+fn extract_tunnel_url(line: &str) -> Option<String> {
+let idx = line.find("https://")?;
+let url_part = &line[idx..];
+let end = url_part
+.find(|c: char| c.is_whitespace())
+.unwrap_or(url_part.len());
+let candidate = &url_part[..end];
+let is_tunnel_line = line.contains("Visit it at")
+|| line.contains("Route at")
+|| line.contains("Registered tunnel connection");
+let is_tunnel_domain = candidate.contains(".trycloudflare.com");
+let is_docs_url = candidate.contains("github.com")
+|| candidate.contains("cloudflare.com/docs")
+|| candidate.contains("developers.cloudflare.com");
+if is_tunnel_line || is_tunnel_domain || !is_docs_url {
+Some(candidate.to_string())
+} else {
+None
+}
+}
/// Cloudflare Tunnel — wraps the `cloudflared` binary.
///
/// Requires `cloudflared` installed and a tunnel token from the
@@ -62,13 +90,8 @@ impl Tunnel for CloudflareTunnel {
match line {
Ok(Ok(Some(l))) => {
tracing::debug!("cloudflared: {l}");
// Look for the URL pattern in cloudflared output
-if let Some(idx) = l.find("https://") {
-let url_part = &l[idx..];
-let end = url_part
-.find(|c: char| c.is_whitespace())
-.unwrap_or(url_part.len());
-public_url = url_part[..end].to_string();
+if let Some(url) = extract_tunnel_url(&l) {
+public_url = url;
break;
}
}
@@ -138,4 +161,55 @@ mod tests {
let tunnel = CloudflareTunnel::new("cf-token".into());
assert!(!tunnel.health_check().await);
}
+#[test]
+fn extract_skips_quic_go_github_url() {
+let line = "2024-01-01T00:00:00Z WRN failed to sufficiently increase receive buffer size. See https://github.com/quic-go/quic-go/wiki/UDP-Buffer-Sizes for details.";
+assert_eq!(extract_tunnel_url(line), None);
+}
+#[test]
+fn extract_skips_cloudflare_docs_url() {
+let line = "2024-01-01T00:00:00Z INF For more info see https://cloudflare.com/docs/tunnels";
+assert_eq!(extract_tunnel_url(line), None);
+}
+#[test]
+fn extract_skips_developers_cloudflare_url() {
+let line = "2024-01-01T00:00:00Z INF See https://developers.cloudflare.com/cloudflare-one/connections/connect-apps";
+assert_eq!(extract_tunnel_url(line), None);
+}
+#[test]
+fn extract_captures_trycloudflare_url() {
+let line = "2024-01-01T00:00:00Z INF Visit it at https://my-tunnel-abc.trycloudflare.com";
+assert_eq!(
+extract_tunnel_url(line),
+Some("https://my-tunnel-abc.trycloudflare.com".into())
+);
+}
+#[test]
+fn extract_captures_url_on_visit_it_at_line() {
+let line = "2024-01-01T00:00:00Z INF Visit it at https://some-custom-domain.example.com";
+assert_eq!(
+extract_tunnel_url(line),
+Some("https://some-custom-domain.example.com".into())
+);
+}
+#[test]
+fn extract_captures_url_on_route_at_line() {
+let line = "2024-01-01T00:00:00Z INF Route at https://tunnel.example.com/path";
+assert_eq!(
+extract_tunnel_url(line),
+Some("https://tunnel.example.com/path".into())
+);
+}
+#[test]
+fn extract_returns_none_for_line_without_url() {
+let line = "2024-01-01T00:00:00Z INF Starting tunnel";
+assert_eq!(extract_tunnel_url(line), None);
+}
}
@@ -152,6 +152,7 @@ async fn openai_codex_second_vision_support() -> Result<()> {
secrets_encrypt: false,
reasoning_enabled: None,
provider_timeout_secs: None,
+extra_headers: std::collections::HashMap::new(),
};
let provider = zeroclaw::providers::create_provider_with_options("openai-codex", None, &opts)?;
@@ -80,7 +80,7 @@ function PairingDialog({ onPair }: { onPair: (code: string) => Promise<void> })
}
function AppContent() {
-const { isAuthenticated, loading, pair, logout } = useAuth();
+const { isAuthenticated, requiresPairing, loading, pair, logout } = useAuth();
const [locale, setLocaleState] = useState('tr');
const setAppLocale = (newLocale: string) => {
@@ -105,7 +105,7 @@ function AppContent() {
);
}
-if (!isAuthenticated) {
+if (!isAuthenticated && requiresPairing) {
return <PairingDialog onPair={pair} />;
}
@@ -24,6 +24,8 @@ export interface AuthState {
token: string | null;
/** Whether the user is currently authenticated. */
isAuthenticated: boolean;
+/** Whether the server requires pairing. Defaults to true (safe fallback). */
+requiresPairing: boolean;
/** True while the initial auth check is in progress. */
loading: boolean;
/** Pair with the agent using a pairing code. Stores the token on success. */
@@ -45,6 +47,7 @@ export interface AuthProviderProps {
export function AuthProvider({ children }: AuthProviderProps) {
const [token, setTokenState] = useState<string | null>(readToken);
const [authenticated, setAuthenticated] = useState<boolean>(checkAuth);
+const [requiresPairing, setRequiresPairing] = useState<boolean>(true);
const [loading, setLoading] = useState<boolean>(!checkAuth());
// On mount: check if server requires pairing at all
@@ -55,6 +58,7 @@ export function AuthProvider({ children }: AuthProviderProps) {
.then((health) => {
if (cancelled) return;
if (!health.require_pairing) {
+setRequiresPairing(false);
setAuthenticated(true);
}
})
@@ -98,6 +102,7 @@ export function AuthProvider({ children }: AuthProviderProps) {
const value: AuthState = {
token,
isAuthenticated: authenticated,
+requiresPairing,
loading,
pair,
logout,
@@ -0,0 +1,27 @@
+/**
+* Generate a UUID v4 string.
+*
+* Uses `crypto.randomUUID()` when available (modern browsers, secure contexts)
+* and falls back to a manual implementation backed by `crypto.getRandomValues()`
+* for older browsers (e.g. Safari < 15.4, some Electron/Raspberry-Pi builds).
+*
+* Closes #3303, #3261.
+*/
+export function generateUUID(): string {
+if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') {
+return crypto.randomUUID();
+}
+// Fallback: RFC 4122 version 4 UUID via getRandomValues
+// crypto must exist if we reached here (only randomUUID is missing)
+const c = globalThis.crypto;
+const bytes = new Uint8Array(16);
+c.getRandomValues(bytes);
+// Set version (4) and variant (10xx) bits per RFC 4122
+bytes[6] = (bytes[6]! & 0x0f) | 0x40;
+bytes[8] = (bytes[8]! & 0x3f) | 0x80;
+const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join('');
+return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`;
+}
@@ -1,5 +1,6 @@
import type { WsMessage } from '../types/api';
import { getToken } from './auth';
+import { generateUUID } from './uuid';
export type WsMessageHandler = (msg: WsMessage) => void;
export type WsOpenHandler = () => void;
@@ -26,7 +27,7 @@ const SESSION_STORAGE_KEY = 'zeroclaw_session_id';
function getOrCreateSessionId(): string {
let id = sessionStorage.getItem(SESSION_STORAGE_KEY);
if (!id) {
-id = crypto.randomUUID();
+id = generateUUID();
sessionStorage.setItem(SESSION_STORAGE_KEY, id);
}
return id;
@@ -2,6 +2,7 @@ import { useState, useEffect, useRef, useCallback } from 'react';
import { Send, Bot, User, AlertCircle, Copy, Check } from 'lucide-react';
import type { WsMessage } from '@/types/api';
import { WebSocketClient } from '@/lib/ws';
+import { generateUUID } from '@/lib/uuid';
interface ChatMessage {
id: string;
@@ -53,7 +54,7 @@ export default function AgentChat() {
setMessages((prev) => [
...prev,
{
-id: crypto.randomUUID(),
+id: generateUUID(),
role: 'agent',
content,
timestamp: new Date(),
@@ -69,7 +70,7 @@ export default function AgentChat() {
setMessages((prev) => [
...prev,
{
-id: crypto.randomUUID(),
+id: generateUUID(),
role: 'agent',
content: `[Tool Call] ${msg.name ?? 'unknown'}(${JSON.stringify(msg.args ?? {})})`,
timestamp: new Date(),
@@ -81,7 +82,7 @@ export default function AgentChat() {
setMessages((prev) => [
...prev,
{
-id: crypto.randomUUID(),
+id: generateUUID(),
role: 'agent',
content: `[Tool Result] ${msg.output ?? ''}`,
timestamp: new Date(),
@@ -93,7 +94,7 @@ export default function AgentChat() {
setMessages((prev) => [
...prev,
{
-id: crypto.randomUUID(),
+id: generateUUID(),
role: 'agent',
content: `[Error] ${msg.message ?? 'Unknown error'}`,
timestamp: new Date(),
@@ -124,7 +125,7 @@ export default function AgentChat() {
setMessages((prev) => [
...prev,
{
-id: crypto.randomUUID(),
+id: generateUUID(),
role: 'user',
content: trimmed,
timestamp: new Date(),