Compare commits

17 Commits

Author SHA1 Message Date
simianastronaut a1af84d992 fix(security): enforce approval policy for channel-driven runs
Channel-driven runs (Telegram, Matrix, Discord, etc.) previously bypassed
the ApprovalManager entirely — `None` was passed into the tool-call loop,
so `auto_approve`, `always_ask`, and supervised approval checks were
silently skipped for all non-CLI execution paths.

Add a non-interactive mode to ApprovalManager that enforces the same
autonomy config policies but auto-denies tools requiring interactive
approval (since no operator is present on channel runs). Specifically:

- Add `ApprovalManager::for_non_interactive()` constructor that creates
  a manager which auto-denies tools needing approval instead of prompting
- Add `is_non_interactive()` method so the tool-call loop can distinguish
  interactive (CLI prompt) from non-interactive (auto-deny) managers
- Update tool-call loop: non-interactive managers auto-deny instead of
  the previous auto-approve behavior for non-CLI channels
- Wire the non-interactive approval manager into ChannelRuntimeContext
  so channel runs enforce the full approval policy
- Add 8 tests covering non-interactive approval behavior

Security implications:
- `always_ask` tools are now denied on channels (previously bypassed)
- Supervised-mode unknown tools are now denied on channels (previously
  bypassed)
- `auto_approve` tools continue to work on channels unchanged
- `full` autonomy mode is unaffected (no approval needed regardless)
- `read_only` mode is unaffected (blocks execution elsewhere)

Closes #3487

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 15:56:57 -04:00
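The policy described in commit a1af84d992 can be sketched as a small decision function. This is a simplified illustration and not the project's `ApprovalManager`: the `Policy`, `Autonomy`, and `Outcome` types and the `supervised` helper are hypothetical names, and the session-scoped allowlist is left out. The tool names mirror those used in the commit's tests.

```rust
use std::collections::HashSet;

/// Autonomy levels named in the commit message (variant names are assumed).
#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Autonomy {
    ReadOnly,
    Supervised,
    Full,
}

/// Outcome of the approval check for one tool call.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    /// The tool may run without asking anyone.
    Run,
    /// Interactive manager: ask the operator on the CLI.
    Prompt,
    /// Non-interactive manager: nobody can answer, so deny.
    Deny,
}

/// Hypothetical, simplified stand-in for the approval manager.
struct Policy {
    auto_approve: HashSet<String>,
    always_ask: HashSet<String>,
    level: Autonomy,
    non_interactive: bool,
}

impl Policy {
    /// Returns true when the call would need an operator's approval.
    fn needs_approval(&self, tool: &str) -> bool {
        match self.level {
            // Full autonomy never asks; read-only is blocked elsewhere.
            Autonomy::Full | Autonomy::ReadOnly => false,
            Autonomy::Supervised => {
                self.always_ask.contains(tool) || !self.auto_approve.contains(tool)
            }
        }
    }

    /// Maps the approval requirement to an outcome for this kind of run.
    fn decide(&self, tool: &str) -> Outcome {
        if !self.needs_approval(tool) {
            Outcome::Run
        } else if self.non_interactive {
            Outcome::Deny
        } else {
            Outcome::Prompt
        }
    }
}

/// Builds a supervised policy with illustrative tool lists.
fn supervised(non_interactive: bool) -> Policy {
    Policy {
        auto_approve: ["file_read", "memory_recall"]
            .iter()
            .map(|s| s.to_string())
            .collect(),
        always_ask: ["shell"].iter().map(|s| s.to_string()).collect(),
        level: Autonomy::Supervised,
        non_interactive,
    }
}
```

With `supervised(true)` (a channel run), `decide("file_read")` yields `Outcome::Run`, while `decide("shell")` and an unlisted tool such as `file_write` yield `Outcome::Deny`. The same calls on `supervised(false)` yield `Outcome::Prompt` for the latter two.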
SimianAstronaut7 e34a804255 Merge pull request #3632 from zeroclaw-labs/work-issues/3544-fix-codex-sse-buffering
fix(provider): use incremental SSE stream reading for openai-codex responses
2026-03-15 15:34:39 -04:00
SimianAstronaut7 6120b3f705 Merge pull request #3630 from zeroclaw-labs/work-issues/3567-allow-commands-bypass-high-risk
fix(security): let explicit allowed_commands bypass high-risk block
2026-03-15 15:34:37 -04:00
SimianAstronaut7 f175261e32 Merge pull request #3631 from zeroclaw-labs/work-issues/3486-fix-matrix-image-marker
fix(channels): use canonical IMAGE marker in Matrix channel
2026-03-15 15:34:31 -04:00
simianastronaut fd9f66cad7 fix(provider): use incremental SSE stream reading for openai-codex responses
Replace full-body buffering (`response.text().await`) in
`decode_responses_body()` with incremental `bytes_stream()` chunk
processing. The previous approach held the HTTP connection open until
every byte had arrived; on high-latency links the long-lived connection
would frequently drop mid-read, producing the "error decoding response
body" failure on the first attempt (succeeding only after retry).

Reading chunks incrementally lets each network segment complete within
its own timeout window, eliminating the systematic first-attempt failure.

Closes #3544

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 15:22:55 -04:00
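The idea behind commit fd9f66cad7 is to process each chunk as it arrives instead of waiting for the full body. The sketch below is a standard-library-only illustration and not the project's `decode_responses_body()`: `SseLineBuffer` and `push_chunk` are hypothetical names, and the chunks are assumed to come from something like a `bytes_stream()` loop that is not shown here.

```rust
/// Accumulates raw network chunks and yields complete SSE `data:` payloads.
#[derive(Default)]
struct SseLineBuffer {
    /// Bytes received so far that do not yet form a complete line.
    pending: Vec<u8>,
}

impl SseLineBuffer {
    /// Feeds one chunk and returns the `data:` payload of every line that
    /// this chunk completed. A trailing partial line stays buffered.
    fn push_chunk(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut payloads = Vec::new();

        // Split on the newline byte before decoding, so a multi-byte UTF-8
        // character that straddles two chunks is never decoded in halves.
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            let raw = String::from_utf8_lossy(&line);
            let text = raw.trim_end_matches(|c: char| c == '\n' || c == '\r');

            if let Some(data) = text.strip_prefix("data:") {
                // The SSE format allows one optional space after the colon.
                let data = data.strip_prefix(' ').unwrap_or(data);
                payloads.push(data.to_string());
            }
        }
        payloads
    }
}
```

Each call handles only the bytes received so far, so the caller can act on early events while later ones are still in flight.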
simianastronaut d928ebc92e fix(channels): use canonical IMAGE marker in Matrix channel
Matrix image messages used lowercase `[image: ...]` format instead of
the canonical `[IMAGE:...]` marker used by all other channels (Telegram,
Slack, Discord, QQ, LinQ). This caused Matrix image attachments to
bypass the multimodal vision pipeline which looks for `[IMAGE:...]`.

Closes #3486

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 15:14:53 -04:00
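To illustrate why the lowercase form was missed, here is a minimal sketch of a case-sensitive marker scan. The function names `image_marker` and `extract_image_refs` are hypothetical, and the real vision pipeline's matching logic is not shown in this diff. Only the `[IMAGE:...]` format itself comes from the commit message.

```rust
/// Canonical marker prefix, as quoted in the commit message.
const IMAGE_MARKER_PREFIX: &str = "[IMAGE:";

/// Formats an attachment reference with the canonical marker.
fn image_marker(body: &str) -> String {
    format!("{}{}]", IMAGE_MARKER_PREFIX, body)
}

/// Hypothetical detector: returns the payload of every canonical marker.
fn extract_image_refs(text: &str) -> Vec<&str> {
    let mut refs = Vec::new();
    let mut rest = text;
    while let Some(start) = rest.find(IMAGE_MARKER_PREFIX) {
        let after = &rest[start + IMAGE_MARKER_PREFIX.len()..];
        match after.find(']') {
            Some(end) => {
                refs.push(&after[..end]);
                rest = &after[end + 1..];
            }
            // Unterminated marker: stop scanning.
            None => break,
        }
    }
    refs
}
```

A scan of this kind finds `[IMAGE:cat.png]` but returns nothing for `[image: cat.png]`, which matches the symptom described in the commit.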
simianastronaut 9fca9f478a fix(security): let explicit allowed_commands bypass high-risk block
When `block_high_risk_commands = true`, commands like `curl` and `wget`
were unconditionally blocked even if explicitly listed in
`allowed_commands`. This made it impossible to use legitimate API calls
in full autonomy mode.

Now, if a command is explicitly named in `allowed_commands` (not via
the wildcard `*`), it is exempt from the `block_high_risk_commands`
gate. The wildcard entry intentionally does NOT grant this exemption,
preserving the safety net for broad allowlists.

Other security gates (supervised-mode approval, rate limiting, path
policy, argument validation) remain fully enforced.

Closes #3567

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 15:13:32 -04:00
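The exemption rule in commit 9fca9f478a can be sketched as follows. This is an illustration under assumptions: `CommandPolicy` and `is_allowed` are hypothetical names, the `HIGH_RISK` list contains only the two commands the commit message mentions, and the other gates (approval, rate limiting, path policy, argument validation) are omitted.

```rust
/// Hypothetical, simplified policy; field names follow the config keys
/// quoted in the commit message.
struct CommandPolicy {
    allowed_commands: Vec<String>,
    block_high_risk_commands: bool,
}

/// Illustrative high-risk list; the project's real list is not shown here.
const HIGH_RISK: &[&str] = &["curl", "wget"];

impl CommandPolicy {
    /// Returns true when the command passes the allowlist and the
    /// high-risk gate.
    fn is_allowed(&self, command: &str) -> bool {
        let named = self.allowed_commands.iter().any(|c| c == command);
        let wildcard = self.allowed_commands.iter().any(|c| c == "*");
        if !named && !wildcard {
            return false;
        }

        let high_risk = HIGH_RISK.iter().any(|h| *h == command);
        if self.block_high_risk_commands && high_risk {
            // Only an explicit entry lifts the block; `*` does not.
            return named;
        }
        true
    }
}
```

With `allowed_commands = ["curl"]` and the block enabled, `curl` is allowed and `wget` is not. With `allowed_commands = ["*"]`, `curl` stays blocked while an ordinary command such as `ls` passes.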
SimianAstronaut7 7106632b51 Merge pull request #3627 from zeroclaw-labs/work-issues/3533-fix-utf8-slice-panic
fix(agent): use char-boundary-safe slicing to prevent CJK text panic
2026-03-15 14:36:49 -04:00
SimianAstronaut7 b834278754 Merge pull request #3626 from zeroclaw-labs/work-issues/3563-fix-cron-add-nl-security
fix(cron): add --agent flag so natural language prompts bypass shell security
2026-03-15 14:36:46 -04:00
SimianAstronaut7 186f6d9797 Merge pull request #3625 from zeroclaw-labs/work-issues/3568-http-request-private-hosts
feat(tool): add allow_private_hosts option to http_request tool
2026-03-15 14:36:44 -04:00
simianastronaut 6cdc92a256 fix(agent): use char-boundary-safe slicing to prevent CJK text panic
Replace unsafe byte-index string slicing (`&text[..N]`) with
char-boundary-safe alternatives in memory consolidation and security
redaction to prevent panics when multi-byte UTF-8 characters (e.g.
Chinese/Japanese/Korean) span the slice boundary.

Fixes the same class of bug as the prior fix in `execute_one_tool`
(commit 8fcbb6eb), applied to two remaining instances:
- `src/memory/consolidation.rs`: truncation at byte 4000 and 200
- `src/security/mod.rs`: `redact()` prefix at byte 4

Adds regression tests with CJK input for both locations.

Closes #3533

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 14:27:09 -04:00
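A minimal sketch of char-boundary-safe truncation, the class of fix described in commit 6cdc92a256. The helper name `truncate_at_char_boundary` is hypothetical and the project's actual helpers may differ. It relies only on `str::is_char_boundary` from the standard library.

```rust
/// Returns the longest prefix of `text` that is at most `max_bytes` long
/// and ends on a UTF-8 character boundary.
fn truncate_at_char_boundary(text: &str, max_bytes: usize) -> &str {
    if text.len() <= max_bytes {
        return text;
    }
    let mut end = max_bytes;
    // Index 0 is always a boundary, so this loop terminates.
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    &text[..end]
}
```

By contrast, `&text[..4]` on a string of three-byte CJK characters panics, because byte 4 falls inside the second character. The helper backs up to byte 3 instead.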
simianastronaut 02599dcd3c fix(cron): add --agent flag to CLI cron commands to bypass shell security validation
The CLI `cron add` command always routed the second positional argument
through shell security policy validation, which blocked natural language
prompts like "Check server health: disk space, memory, CPU load". This
adds an `--agent` flag to `cron add`, `cron add-at`, `cron add-every`,
and `cron once` so that natural language prompts are correctly stored as
agent jobs without shell command validation.

Closes #3563

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 14:26:45 -04:00
simianastronaut fe64d7ef7e feat(tool): add allow_private_hosts option to http_request tool (#3568)
The http_request tool unconditionally blocked all private/LAN hosts with
no opt-out, preventing legitimate use cases like calling a local Home
Assistant instance or internal APIs. This adds an `allow_private_hosts`
config flag (default: false) under `[http_request]` that, when set to
true, skips the private-host SSRF check while still enforcing the domain
allowlist.

Closes #3568

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 14:23:54 -04:00
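The interaction between the two checks in commit fe64d7ef7e can be sketched like this. It is an illustration under assumptions: `is_private_or_local` and `host_permitted` are hypothetical names, the private-range test covers only a few well-known ranges, and the caller is assumed to have resolved the host to an IP address already.

```rust
use std::net::IpAddr;

/// Rough private/LAN test; a real SSRF check would cover more ranges.
fn is_private_or_local(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => v4.is_private() || v4.is_loopback() || v4.is_link_local(),
        IpAddr::V6(v6) => v6.is_loopback(),
    }
}

/// The domain allowlist is always enforced; the private-host check is
/// skipped only when `allow_private_hosts` is true.
fn host_permitted(
    host: &str,
    ip: IpAddr,
    allowed_domains: &[&str],
    allow_private_hosts: bool,
) -> bool {
    if !allowed_domains.iter().any(|d| *d == host) {
        return false;
    }
    allow_private_hosts || !is_private_or_local(ip)
}
```

A LAN host on the allowlist is rejected with the default `allow_private_hosts = false` and accepted once the flag is set. A host missing from the allowlist is rejected either way.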
Argenis 45f953be6d Merge pull request #3578 from zeroclaw-labs/chore/bump-v0.3.3
chore: bump version to v0.3.3
2026-03-15 09:53:13 -04:00
argenis de la rosa 82f29bbcb1 chore: bump version to v0.3.3
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 09:41:22 -04:00
Argenis 93b5a0b824 feat(context): token-based compaction, persistent sessions, and LLM consolidation (#3574)
Comprehensive long-running context upgrades:

- Token-based compaction: replace message-count trigger with token
  estimation (~4 chars/token). Compaction fires when estimated tokens
  exceed max_context_tokens (default 32K) OR message count exceeds
  max_history_messages. Cuts at user-turn boundaries only.

- Persistent sessions: JSONL append-only session files per channel
  sender in {workspace}/sessions/. Sessions survive daemon restarts.
  Hydrates in-memory history from disk on startup.

- LLM-driven memory consolidation: two-phase extraction after each
  conversation turn. Phase 1 writes a timestamped history entry (Daily).
  Phase 2 extracts new facts/preferences to Core memory (if any).
  Replaces raw message auto-save with semantic extraction.

- New config fields: agent.max_context_tokens (32000),
  channels_config.session_persistence (true).

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 09:25:23 -04:00
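The compaction trigger and the user-turn boundary rule from commit 93b5a0b824 can be sketched in isolation. This is a simplified illustration: `Msg`, `should_compact`, and `snap_to_user_turn` are hypothetical names, and unlike the real `auto_compact_history` the sketch does not exclude the system message from the message count.

```rust
/// Minimal stand-in for a chat message.
#[derive(Clone, Debug)]
struct Msg {
    role: String,
    content: String,
}

/// ~4 chars per token plus ~4 framing tokens per message.
fn estimate_tokens(history: &[Msg]) -> usize {
    history
        .iter()
        .map(|m| m.content.len().div_ceil(4) + 4)
        .sum()
}

/// Compaction fires when either the token budget or the message count
/// is exceeded.
fn should_compact(history: &[Msg], max_messages: usize, max_context_tokens: usize) -> bool {
    estimate_tokens(history) > max_context_tokens || history.len() > max_messages
}

/// Moves `end` back until `history[end]` is a user message, so the
/// compacted range `start..end` stops right before a user turn.
/// Returns `None` when no such cut point exists after `start`.
fn snap_to_user_turn(history: &[Msg], start: usize, mut end: usize) -> Option<usize> {
    while end > start && history.get(end).map_or(false, |m| m.role != "user") {
        end -= 1;
    }
    if end <= start {
        None
    } else {
        Some(end)
    }
}
```

For a history of user, assistant, user, assistant, a proposed cut at index 3 snaps back to index 2, so the first user/assistant exchange is compacted as a whole.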
Argenis 08a67c4a2d chore: bump version to v0.3.2 (#3564)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 06:47:37 -04:00
18 changed files with 1220 additions and 54 deletions
Generated
+1 -1
@@ -7945,7 +7945,7 @@ dependencies = [
[[package]]
name = "zeroclawlabs"
-version = "0.3.1"
+version = "0.3.3"
dependencies = [
"anyhow",
"async-imap",
+1 -1
@@ -4,7 +4,7 @@ resolver = "2"
[package]
name = "zeroclawlabs"
-version = "0.3.1"
+version = "0.3.3"
edition = "2021"
authors = ["theonlyhennygod"]
license = "MIT OR Apache-2.0"
+60 -6
@@ -195,6 +195,18 @@ const COMPACTION_MAX_SOURCE_CHARS: usize = 12_000;
/// Max characters retained in stored compaction summary.
const COMPACTION_MAX_SUMMARY_CHARS: usize = 2_000;
/// Estimate token count for a message history using ~4 chars/token heuristic.
/// Includes a small overhead per message for role/framing tokens.
fn estimate_history_tokens(history: &[ChatMessage]) -> usize {
history
.iter()
.map(|m| {
// ~4 chars per token + ~4 framing tokens per message (role, delimiters)
m.content.len().div_ceil(4) + 4
})
.sum()
}
/// Minimum interval between progress sends to avoid flooding the draft channel.
pub(crate) const PROGRESS_MIN_INTERVAL_MS: u64 = 500;
@@ -288,6 +300,7 @@ async fn auto_compact_history(
provider: &dyn Provider,
model: &str,
max_history: usize,
max_context_tokens: usize,
) -> Result<bool> {
let has_system = history.first().map_or(false, |m| m.role == "system");
let non_system_count = if has_system {
@@ -296,7 +309,10 @@ async fn auto_compact_history(
history.len()
};
-if non_system_count <= max_history {
+let estimated_tokens = estimate_history_tokens(history);
+// Trigger compaction when either token budget OR message count is exceeded.
+if estimated_tokens <= max_context_tokens && non_system_count <= max_history {
return Ok(false);
}
@@ -307,7 +323,16 @@ async fn auto_compact_history(
return Ok(false);
}
-let compact_end = start + compact_count;
+let mut compact_end = start + compact_count;
+// Snap compact_end to a user-turn boundary so we don't split mid-conversation.
+while compact_end > start && history.get(compact_end).map_or(false, |m| m.role != "user") {
+compact_end -= 1;
+}
+if compact_end <= start {
+return Ok(false);
+}
let to_compact: Vec<ChatMessage> = history[start..compact_end].to_vec();
let transcript = build_compaction_transcript(&to_compact);
@@ -2662,11 +2687,13 @@ pub(crate) async fn run_tool_call_loop(
arguments: tool_args.clone(),
};
-// Only prompt interactively on CLI; auto-approve on other channels.
-let decision = if channel_name == "cli" {
-mgr.prompt_cli(&request)
+// Interactive CLI: prompt the operator.
+// Non-interactive (channels): auto-deny since no operator
+// is present to approve.
+let decision = if mgr.is_non_interactive() {
+ApprovalResponse::No
} else {
-ApprovalResponse::Yes
+mgr.prompt_cli(&request)
};
mgr.record_decision(&tool_name, &tool_args, decision, channel_name);
@@ -3508,6 +3535,7 @@ pub async fn run(
provider.as_ref(),
model_name,
config.agent.max_history_messages,
config.agent.max_context_tokens,
)
.await
{
@@ -6449,4 +6477,30 @@ Let me check the result."#;
let result = filter_tool_specs_for_turn(specs, &groups, "BROWSE the site");
assert_eq!(result.len(), 1);
}
// ── Token-based compaction tests ──────────────────────────
#[test]
fn estimate_history_tokens_empty() {
assert_eq!(super::estimate_history_tokens(&[]), 0);
}
#[test]
fn estimate_history_tokens_single_message() {
let history = vec![ChatMessage::user("hello world")]; // 11 chars
let tokens = super::estimate_history_tokens(&history);
// 11.div_ceil(4) + 4 = 3 + 4 = 7
assert_eq!(tokens, 7);
}
#[test]
fn estimate_history_tokens_multiple_messages() {
let history = vec![
ChatMessage::system("You are helpful."), // 16 chars → 4 + 4 = 8
ChatMessage::user("What is Rust?"), // 13 chars → 4 + 4 = 8
ChatMessage::assistant("A language."), // 11 chars → 3 + 4 = 7
];
let tokens = super::estimate_history_tokens(&history);
assert_eq!(tokens, 23);
}
}
+128 -4
@@ -44,11 +44,18 @@ pub struct ApprovalLogEntry {
// ── ApprovalManager ──────────────────────────────────────────────
-/// Manages the interactive approval workflow.
+/// Manages the approval workflow for tool calls.
///
/// - Checks config-level `auto_approve` / `always_ask` lists
/// - Maintains a session-scoped "always" allowlist
/// - Records an audit trail of all decisions
///
/// Two modes:
/// - **Interactive** (CLI): tools needing approval trigger a stdin prompt.
/// - **Non-interactive** (channels): tools needing approval are auto-denied
/// because there is no interactive operator to approve them. `auto_approve`
/// policy is still enforced, and `always_ask` / supervised-default tools are
/// denied rather than silently allowed.
pub struct ApprovalManager {
/// Tools that never need approval (from config).
auto_approve: HashSet<String>,
@@ -56,6 +63,9 @@ pub struct ApprovalManager {
always_ask: HashSet<String>,
/// Autonomy level from config.
autonomy_level: AutonomyLevel,
/// When `true`, tools that would require interactive approval are
/// auto-denied instead. Used for channel-driven (non-CLI) runs.
non_interactive: bool,
/// Session-scoped allowlist built from "Always" responses.
session_allowlist: Mutex<HashSet<String>>,
/// Audit trail of approval decisions.
@@ -63,17 +73,40 @@ pub struct ApprovalManager {
}
impl ApprovalManager {
-/// Create from autonomy config.
+/// Create an interactive (CLI) approval manager from autonomy config.
pub fn from_config(config: &AutonomyConfig) -> Self {
Self {
auto_approve: config.auto_approve.iter().cloned().collect(),
always_ask: config.always_ask.iter().cloned().collect(),
autonomy_level: config.level,
non_interactive: false,
session_allowlist: Mutex::new(HashSet::new()),
audit_log: Mutex::new(Vec::new()),
}
}
/// Create a non-interactive approval manager for channel-driven runs.
///
/// Enforces the same `auto_approve` / `always_ask` / supervised policies
/// as the CLI manager, but tools that would require interactive approval
/// are auto-denied instead of prompting (since there is no operator).
pub fn for_non_interactive(config: &AutonomyConfig) -> Self {
Self {
auto_approve: config.auto_approve.iter().cloned().collect(),
always_ask: config.always_ask.iter().cloned().collect(),
autonomy_level: config.level,
non_interactive: true,
session_allowlist: Mutex::new(HashSet::new()),
audit_log: Mutex::new(Vec::new()),
}
}
/// Returns `true` when this manager operates in non-interactive mode
/// (i.e. for channel-driven runs where no operator can approve).
pub fn is_non_interactive(&self) -> bool {
self.non_interactive
}
/// Check whether a tool call requires interactive approval.
///
/// Returns `true` if the call needs a prompt, `false` if it can proceed.
@@ -147,8 +180,8 @@ impl ApprovalManager {
/// Prompt the user on the CLI and return their decision.
///
-/// For non-CLI channels, returns `Yes` automatically (interactive
-/// approval is only supported on CLI for now).
+/// Only called for interactive (CLI) managers. Non-interactive managers
+/// auto-deny in the tool-call loop before reaching this point.
pub fn prompt_cli(&self, request: &ApprovalRequest) -> ApprovalResponse {
prompt_cli_interactive(request)
}
@@ -401,6 +434,97 @@ mod tests {
assert!(summary.contains("just a string"));
}
// ── non-interactive (channel) mode ────────────────────────
#[test]
fn non_interactive_manager_reports_non_interactive() {
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
assert!(mgr.is_non_interactive());
}
#[test]
fn interactive_manager_reports_interactive() {
let mgr = ApprovalManager::from_config(&supervised_config());
assert!(!mgr.is_non_interactive());
}
#[test]
fn non_interactive_auto_approve_tools_skip_approval() {
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
// auto_approve tools (file_read, memory_recall) should not need approval.
assert!(!mgr.needs_approval("file_read"));
assert!(!mgr.needs_approval("memory_recall"));
}
#[test]
fn non_interactive_always_ask_tools_need_approval() {
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
// always_ask tools (shell) still report as needing approval,
// so the tool-call loop will auto-deny them in non-interactive mode.
assert!(mgr.needs_approval("shell"));
}
#[test]
fn non_interactive_unknown_tools_need_approval_in_supervised() {
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
// Unknown tools in supervised mode need approval (will be auto-denied
// by the tool-call loop for non-interactive managers).
assert!(mgr.needs_approval("file_write"));
assert!(mgr.needs_approval("http_request"));
}
#[test]
fn non_interactive_full_autonomy_never_needs_approval() {
let mgr = ApprovalManager::for_non_interactive(&full_config());
// Full autonomy means no approval needed, even in non-interactive mode.
assert!(!mgr.needs_approval("shell"));
assert!(!mgr.needs_approval("file_write"));
assert!(!mgr.needs_approval("anything"));
}
#[test]
fn non_interactive_readonly_never_needs_approval() {
let config = AutonomyConfig {
level: AutonomyLevel::ReadOnly,
..AutonomyConfig::default()
};
let mgr = ApprovalManager::for_non_interactive(&config);
// ReadOnly blocks execution elsewhere; approval manager does not prompt.
assert!(!mgr.needs_approval("shell"));
}
#[test]
fn non_interactive_session_allowlist_still_works() {
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
assert!(mgr.needs_approval("file_write"));
// Simulate an "Always" decision (would come from a prior channel run
// if the tool was auto-approved somehow, e.g. via config change).
mgr.record_decision(
"file_write",
&serde_json::json!({"path": "test.txt"}),
ApprovalResponse::Always,
"telegram",
);
assert!(!mgr.needs_approval("file_write"));
}
#[test]
fn non_interactive_always_ask_overrides_session_allowlist() {
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
mgr.record_decision(
"shell",
&serde_json::json!({"command": "ls"}),
ApprovalResponse::Always,
"telegram",
);
// shell is in always_ask, so it still needs approval even after "Always".
assert!(mgr.needs_approval("shell"));
}
// ── ApprovalResponse serde ───────────────────────────────
#[test]
+1 -1
@@ -746,7 +746,7 @@ impl Channel for MatrixChannel {
MessageType::Notice(content) => (content.body.clone(), None),
MessageType::Image(content) => {
let dl = media_info(&content.source, &content.body);
-(format!("[image: {}]", content.body), dl)
+(format!("[IMAGE:{}]", content.body), dl)
}
MessageType::File(content) => {
let dl = media_info(&content.source, &content.body);
+170 -1
@@ -31,6 +31,7 @@ pub mod nextcloud_talk;
#[cfg(feature = "channel-nostr")]
pub mod nostr;
pub mod qq;
pub mod session_store;
pub mod signal;
pub mod slack;
pub mod telegram;
@@ -75,6 +76,7 @@ pub use whatsapp::WhatsAppChannel;
pub use whatsapp_web::WhatsAppWebChannel;
use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop, scrub_credentials};
use crate::approval::ApprovalManager;
use crate::config::Config;
use crate::identity;
use crate::memory::{self, Memory};
@@ -312,6 +314,12 @@ struct ChannelRuntimeContext {
model_routes: Arc<Vec<crate::config::ModelRouteConfig>>,
ack_reactions: bool,
show_tool_calls: bool,
session_store: Option<Arc<session_store::SessionStore>>,
/// Non-interactive approval manager for channel-driven runs.
/// Enforces `auto_approve` / `always_ask` / supervised policy from
/// `[autonomy]` config; auto-denies tools that would need interactive
/// approval since no operator is present on channel runs.
approval_manager: Arc<ApprovalManager>,
}
#[derive(Clone)]
@@ -965,6 +973,13 @@ fn proactive_trim_turns(turns: &mut Vec<ChatMessage>, budget: usize) -> usize {
}
fn append_sender_turn(ctx: &ChannelRuntimeContext, sender_key: &str, turn: ChatMessage) {
// Persist to JSONL before adding to in-memory history.
if let Some(ref store) = ctx.session_store {
if let Err(e) = store.append(sender_key, &turn) {
tracing::warn!("Failed to persist session turn: {e}");
}
}
let mut histories = ctx
.conversation_histories
.lock()
@@ -2016,7 +2031,7 @@ async fn process_channel_message(
route.model.as_str(),
runtime_defaults.temperature,
true,
-None,
+Some(&*ctx.approval_manager),
msg.channel.as_str(),
&ctx.multimodal,
ctx.max_tool_iterations,
@@ -2186,6 +2201,29 @@ async fn process_channel_message(
&history_key,
ChatMessage::assistant(&history_response),
);
// Fire-and-forget LLM-driven memory consolidation.
if ctx.auto_save_memory && msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
let provider = Arc::clone(&ctx.provider);
let model = ctx.model.to_string();
let memory = Arc::clone(&ctx.memory);
let user_msg = msg.content.clone();
let assistant_resp = delivered_response.clone();
tokio::spawn(async move {
if let Err(e) = crate::memory::consolidation::consolidate_turn(
provider.as_ref(),
&model,
memory.as_ref(),
&user_msg,
&assistant_resp,
)
.await
{
tracing::debug!("Memory consolidation skipped: {e}");
}
});
}
println!(
" 🤖 Reply ({}ms): {}",
started_at.elapsed().as_millis(),
@@ -3805,8 +3843,43 @@ pub async fn start_channels(config: Config) -> Result<()> {
model_routes: Arc::new(config.model_routes.clone()),
ack_reactions: config.channels_config.ack_reactions,
show_tool_calls: config.channels_config.show_tool_calls,
session_store: if config.channels_config.session_persistence {
match session_store::SessionStore::new(&config.workspace_dir) {
Ok(store) => {
tracing::info!("📂 Session persistence enabled");
Some(Arc::new(store))
}
Err(e) => {
tracing::warn!("Session persistence disabled: {e}");
None
}
}
} else {
None
},
approval_manager: Arc::new(ApprovalManager::for_non_interactive(&config.autonomy)),
});
// Hydrate in-memory conversation histories from persisted JSONL session files.
if let Some(ref store) = runtime_ctx.session_store {
let mut hydrated = 0usize;
let mut histories = runtime_ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
for key in store.list_sessions() {
let msgs = store.load(&key);
if !msgs.is_empty() {
hydrated += 1;
histories.insert(key, msgs);
}
}
drop(histories);
if hydrated > 0 {
tracing::info!("📂 Restored {hydrated} session(s) from disk");
}
}
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
// Wait for all channel tasks
@@ -4072,6 +4145,10 @@ mod tests {
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
};
assert!(compact_sender_history(&ctx, &sender));
@@ -4175,6 +4252,10 @@ mod tests {
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
};
append_sender_turn(&ctx, &sender, ChatMessage::user("hello"));
@@ -4234,6 +4315,10 @@ mod tests {
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
};
assert!(rollback_orphan_user_turn(&ctx, &sender, "pending"));
@@ -4751,6 +4836,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -4818,6 +4907,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -4899,6 +4992,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -4965,6 +5062,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5041,6 +5142,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5137,6 +5242,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5215,6 +5324,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5308,6 +5421,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5386,6 +5503,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5454,6 +5575,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -5633,6 +5758,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
@@ -5720,6 +5849,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
@@ -5817,11 +5950,15 @@ BTC is currently around $65,000 based on latest tool output."#
},
ack_reactions: true,
show_tool_calls: true,
session_store: None,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
tool_call_dedup_exempt: Arc::new(Vec::new()),
model_routes: Arc::new(Vec::new()),
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
@@ -5921,6 +6058,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
@@ -6002,6 +6143,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -6068,6 +6213,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -6692,6 +6841,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -6784,6 +6937,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -6876,6 +7033,10 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
@@ -7432,6 +7593,10 @@ This is an example JSON object for profile settings."#;
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
// Simulate a photo attachment message with [IMAGE:] marker.
@@ -7505,6 +7670,10 @@ This is an example JSON object for profile settings."#;
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&crate::config::AutonomyConfig::default(),
)),
});
process_channel_message(
+200
@@ -0,0 +1,200 @@
//! JSONL-based session persistence for channel conversations.
//!
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
//! are loaded from disk to restore conversation context.
use crate::providers::traits::ChatMessage;
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
/// Append-only JSONL session store for channel conversations.
pub struct SessionStore {
sessions_dir: PathBuf,
}
impl SessionStore {
/// Create a new session store, ensuring the sessions directory exists.
pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
let sessions_dir = workspace_dir.join("sessions");
std::fs::create_dir_all(&sessions_dir)?;
Ok(Self { sessions_dir })
}
/// Compute the file path for a session key, sanitizing for filesystem safety.
fn session_path(&self, session_key: &str) -> PathBuf {
let safe_key: String = session_key
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '_' || c == '-' {
c
} else {
'_'
}
})
.collect();
self.sessions_dir.join(format!("{safe_key}.jsonl"))
}
/// Load all messages for a session from its JSONL file.
/// Returns an empty vec if the file does not exist or cannot be opened.
/// Lines that fail to read or parse as a `ChatMessage` are skipped.
pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
let path = self.session_path(session_key);
let file = match std::fs::File::open(&path) {
Ok(f) => f,
Err(_) => return Vec::new(),
};
let reader = std::io::BufReader::new(file);
let mut messages = Vec::new();
for line in reader.lines() {
let Ok(line) = line else { continue };
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
messages.push(msg);
}
}
messages
}
/// Append a single message to the session JSONL file.
pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
let path = self.session_path(session_key);
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)?;
let json = serde_json::to_string(message)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
writeln!(file, "{json}")?;
Ok(())
}
/// List all session keys that have files on disk.
pub fn list_sessions(&self) -> Vec<String> {
let entries = match std::fs::read_dir(&self.sessions_dir) {
Ok(e) => e,
Err(_) => return Vec::new(),
};
entries
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name().into_string().ok()?;
name.strip_suffix(".jsonl").map(String::from)
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn round_trip_append_and_load() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
store
.append("telegram_user123", &ChatMessage::user("hello"))
.unwrap();
store
.append("telegram_user123", &ChatMessage::assistant("hi there"))
.unwrap();
let messages = store.load("telegram_user123");
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, "user");
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[1].role, "assistant");
assert_eq!(messages[1].content, "hi there");
}
#[test]
fn load_nonexistent_session_returns_empty() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
let messages = store.load("nonexistent");
assert!(messages.is_empty());
}
#[test]
fn key_sanitization() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
// Keys with special chars should be sanitized
store
.append("slack/thread:123/user", &ChatMessage::user("test"))
.unwrap();
let messages = store.load("slack/thread:123/user");
assert_eq!(messages.len(), 1);
}
#[test]
fn list_sessions_returns_keys() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
store
.append("telegram_alice", &ChatMessage::user("hi"))
.unwrap();
store
.append("discord_bob", &ChatMessage::user("hey"))
.unwrap();
let mut sessions = store.list_sessions();
sessions.sort();
assert_eq!(sessions.len(), 2);
assert!(sessions.contains(&"discord_bob".to_string()));
assert!(sessions.contains(&"telegram_alice".to_string()));
}
#[test]
fn append_is_truly_append_only() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
let key = "test_session";
store.append(key, &ChatMessage::user("msg1")).unwrap();
store.append(key, &ChatMessage::user("msg2")).unwrap();
// Read raw file to verify append-only format
let path = store.session_path(key);
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<&str> = content.trim().lines().collect();
assert_eq!(lines.len(), 2);
}
#[test]
fn handles_corrupt_lines_gracefully() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
let key = "corrupt_test";
// Write valid message + corrupt line + valid message
let path = store.session_path(key);
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
let mut file = std::fs::File::create(&path).unwrap();
writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
writeln!(file, "this is not valid json").unwrap();
writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
let messages = store.load(key);
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[1].content, "world");
}
}
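Note on key sanitization: the mapping in `session_path` is lossy, so two different session keys can resolve to the same JSONL file. The sketch below copies the character mapping into a standalone function to make that visible. `sanitize_session_key` and `keys_collide` are hypothetical names used only for illustration; they are not part of this diff.

```rust
/// Standalone copy of the character mapping used by `SessionStore::session_path`.
/// Hypothetical helper name, for illustration only.
fn sanitize_session_key(session_key: &str) -> String {
    session_key
        .chars()
        .map(|c| {
            if c.is_alphanumeric() || c == '_' || c == '-' {
                c
            } else {
                '_'
            }
        })
        .collect()
}

/// Returns true when two different keys would share one JSONL file.
fn keys_collide(a: &str, b: &str) -> bool {
    a != b && sanitize_session_key(a) == sanitize_session_key(b)
}
```

For example, `keys_collide("slack/thread:123/user", "slack_thread_123_user")` holds under this mapping. It also means `list_sessions` can only return the sanitized form of a key, not the original.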
+24
@@ -791,6 +791,11 @@ pub struct AgentConfig {
/// Maximum conversation history messages retained per session. Default: `50`.
#[serde(default = "default_agent_max_history_messages")]
pub max_history_messages: usize,
/// Maximum estimated tokens for conversation history before compaction triggers.
/// Uses ~4 chars/token heuristic. When this threshold is exceeded, older messages
/// are summarized to preserve context while staying within budget. Default: `32000`.
#[serde(default = "default_agent_max_context_tokens")]
pub max_context_tokens: usize,
/// Enable parallel tool execution within a single iteration. Default: `false`.
#[serde(default)]
pub parallel_tools: bool,
@@ -817,6 +822,10 @@ fn default_agent_max_history_messages() -> usize {
50
}
fn default_agent_max_context_tokens() -> usize {
32_000
}
fn default_agent_tool_dispatcher() -> String {
"auto".into()
}
@@ -827,6 +836,7 @@ impl Default for AgentConfig {
compact_context: false,
max_tool_iterations: default_agent_max_tool_iterations(),
max_history_messages: default_agent_max_history_messages(),
max_context_tokens: default_agent_max_context_tokens(),
parallel_tools: false,
tool_dispatcher: default_agent_tool_dispatcher(),
tool_call_dedup_exempt: Vec::new(),
@@ -1413,6 +1423,10 @@ pub struct HttpRequestConfig {
/// Request timeout in seconds (default: 30)
#[serde(default = "default_http_timeout_secs")]
pub timeout_secs: u64,
/// Allow requests to private/LAN hosts (RFC 1918, loopback, link-local, .local).
/// Default: false (deny private hosts for SSRF protection).
#[serde(default)]
pub allow_private_hosts: bool,
}
impl Default for HttpRequestConfig {
@@ -1422,6 +1436,7 @@ impl Default for HttpRequestConfig {
allowed_domains: vec![],
max_response_size: default_http_max_response_size(),
timeout_secs: default_http_timeout_secs(),
allow_private_hosts: false,
}
}
}
@@ -3052,6 +3067,7 @@ impl<T: ChannelConfig> crate::config::traits::ConfigHandle for ConfigWrapper<T>
///
/// Each channel sub-section (e.g. `telegram`, `discord`) is optional;
/// setting it to `Some(...)` enables that channel.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChannelsConfig {
/// Enable the CLI interactive channel. Default: `true`.
@@ -3114,6 +3130,10 @@ pub struct ChannelsConfig {
/// not forwarded as individual channel messages. Default: `true`.
#[serde(default = "default_true")]
pub show_tool_calls: bool,
/// Persist channel conversation history to JSONL files so sessions survive
/// daemon restarts. Files are stored in `{workspace}/sessions/`. Default: `true`.
#[serde(default = "default_true")]
pub session_persistence: bool,
}
impl ChannelsConfig {
@@ -3248,6 +3268,7 @@ impl Default for ChannelsConfig {
message_timeout_secs: default_channel_message_timeout_secs(),
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
}
}
}
@@ -6269,6 +6290,7 @@ default_temperature = 0.7
message_timeout_secs: 300,
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
},
memory: MemoryConfig::default(),
storage: StorageConfig::default(),
@@ -6983,6 +7005,7 @@ allowed_users = ["@ops:matrix.org"]
message_timeout_secs: 300,
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
};
let toml_str = toml::to_string_pretty(&c).unwrap();
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
@@ -7210,6 +7233,7 @@ channel_id = "C123"
message_timeout_secs: 300,
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
};
let toml_str = toml::to_string_pretty(&c).unwrap();
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
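Note on `max_context_tokens`: the doc comment describes a "~4 chars/token" heuristic, but the estimator itself is not part of this diff. A minimal sketch of what such a check could look like follows. The function names are hypothetical, and counting `char`s (rather than bytes) and rounding up are assumptions.

```rust
/// Rough token estimate using a ~4 chars/token heuristic.
/// Assumption: counts Unicode scalar values and rounds up.
fn estimate_tokens(text: &str) -> usize {
    (text.chars().count() + 3) / 4
}

/// Returns true when the estimated size of the history exceeds the budget,
/// i.e. when compaction would be triggered under this sketch.
fn needs_compaction(history: &[&str], max_context_tokens: usize) -> bool {
    let total: usize = history.iter().map(|m| estimate_tokens(m)).sum();
    total > max_context_tokens
}
```

With the default budget of `32_000`, roughly 128,000 characters of history would be the tipping point under these assumptions.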
+172 -21
@@ -152,44 +152,122 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
crate::CronCommands::Add {
expression,
tz,
agent,
command,
} => {
let schedule = Schedule::Cron {
expr: expression,
tz,
};
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added cron job {}", job.id);
println!(" Expr: {}", job.expression);
println!(" Next: {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
if agent {
let job = add_agent_job(
config,
None,
schedule,
&command,
SessionTarget::Isolated,
None,
None,
false,
)?;
println!("✅ Added agent cron job {}", job.id);
println!(" Expr : {}", job.expression);
println!(" Next : {}", job.next_run.to_rfc3339());
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
} else {
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added cron job {}", job.id);
println!(" Expr: {}", job.expression);
println!(" Next: {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
}
Ok(())
}
crate::CronCommands::AddAt { at, command } => {
crate::CronCommands::AddAt { at, agent, command } => {
let at = chrono::DateTime::parse_from_rfc3339(&at)
.map_err(|e| anyhow::anyhow!("Invalid RFC3339 timestamp for --at: {e}"))?
.with_timezone(&chrono::Utc);
let schedule = Schedule::At { at };
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added one-shot cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
if agent {
let job = add_agent_job(
config,
None,
schedule,
&command,
SessionTarget::Isolated,
None,
None,
true,
)?;
println!("✅ Added one-shot agent cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
} else {
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added one-shot cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
}
Ok(())
}
crate::CronCommands::AddEvery { every_ms, command } => {
crate::CronCommands::AddEvery {
every_ms,
agent,
command,
} => {
let schedule = Schedule::Every { every_ms };
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added interval cron job {}", job.id);
println!(" Every(ms): {every_ms}");
println!(" Next : {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
if agent {
let job = add_agent_job(
config,
None,
schedule,
&command,
SessionTarget::Isolated,
None,
None,
false,
)?;
println!("✅ Added interval agent cron job {}", job.id);
println!(" Every(ms): {every_ms}");
println!(" Next : {}", job.next_run.to_rfc3339());
println!(" Prompt : {}", job.prompt.as_deref().unwrap_or_default());
} else {
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added interval cron job {}", job.id);
println!(" Every(ms): {every_ms}");
println!(" Next : {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
}
Ok(())
}
crate::CronCommands::Once { delay, command } => {
let job = add_once(config, &delay, &command)?;
println!("✅ Added one-shot cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
crate::CronCommands::Once {
delay,
agent,
command,
} => {
if agent {
let duration = parse_delay(&delay)?;
let at = chrono::Utc::now() + duration;
let schedule = Schedule::At { at };
let job = add_agent_job(
config,
None,
schedule,
&command,
SessionTarget::Isolated,
None,
None,
true,
)?;
println!("✅ Added one-shot agent cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
} else {
let job = add_once(config, &delay, &command)?;
println!("✅ Added one-shot cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Cmd : {}", job.command);
}
Ok(())
}
crate::CronCommands::Update {
@@ -686,4 +764,77 @@ mod tests {
.to_string()
.contains("blocked by security policy"));
}
#[test]
fn cli_agent_flag_creates_agent_job() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
handle_command(
crate::CronCommands::Add {
expression: "*/15 * * * *".into(),
tz: None,
agent: true,
command: "Check server health: disk space, memory, CPU load".into(),
},
&config,
)
.unwrap();
let jobs = list_jobs(&config).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].job_type, JobType::Agent);
assert_eq!(
jobs[0].prompt.as_deref(),
Some("Check server health: disk space, memory, CPU load")
);
}
#[test]
fn cli_agent_flag_bypasses_shell_security_validation() {
let tmp = TempDir::new().unwrap();
let mut config = test_config(&tmp);
config.autonomy.allowed_commands = vec!["echo".into()];
config.autonomy.level = crate::security::AutonomyLevel::Supervised;
// Without --agent, a natural language string would be blocked by shell
// security policy. With --agent, it routes to agent job and skips
// shell validation entirely.
let result = handle_command(
crate::CronCommands::Add {
expression: "*/15 * * * *".into(),
tz: None,
agent: true,
command: "Check server health: disk space, memory, CPU load".into(),
},
&config,
);
assert!(result.is_ok());
let jobs = list_jobs(&config).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].job_type, JobType::Agent);
}
#[test]
fn cli_without_agent_flag_defaults_to_shell_job() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
handle_command(
crate::CronCommands::Add {
expression: "*/5 * * * *".into(),
tz: None,
agent: false,
command: "echo ok".into(),
},
&config,
)
.unwrap();
let jobs = list_jobs(&config).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].job_type, JobType::Shell);
assert_eq!(jobs[0].command, "echo ok");
}
}
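Note on `cron once --agent`: that branch calls `parse_delay(&delay)?`, which this diff does not show. The sketch below is a hypothetical parser for the delay formats named in the CLI help ("30m", "2h", "1d"). The `s` unit, the error strings, and the use of `std::time::Duration` are assumptions; the real function may return a `chrono` duration and accept other forms.

```rust
use std::time::Duration;

/// Hypothetical delay parser for strings like "30m", "2h", "1d".
/// The real `parse_delay` is not shown in this diff and may differ.
fn parse_delay_sketch(input: &str) -> Result<Duration, String> {
    let input = input.trim();
    // Split at the first non-digit character: digits, then the unit suffix.
    let split_at = input
        .find(|c: char| !c.is_ascii_digit())
        .ok_or_else(|| format!("missing unit in delay: {input:?}"))?;
    let (digits, unit) = input.split_at(split_at);
    let amount: u64 = digits
        .parse()
        .map_err(|_| format!("invalid number in delay: {input:?}"))?;
    let secs_per_unit = match unit {
        "s" => 1,
        "m" => 60,
        "h" => 3_600,
        "d" => 86_400,
        other => return Err(format!("unknown delay unit: {other:?}")),
    };
    // Guard against overflow before building the duration.
    amount
        .checked_mul(secs_per_unit)
        .map(Duration::from_secs)
        .ok_or_else(|| format!("delay too large: {input:?}"))
}
```

Rejecting a bare number such as `"30"` is a deliberate choice in this sketch, so that a missing unit is an error rather than a silent default.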
+19 -6
@@ -280,15 +280,19 @@ Times are evaluated in UTC by default; use --tz with an IANA \
timezone name to override.
Examples:
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York
zeroclaw cron add '*/30 * * * *' 'Check system health'")]
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York --agent
zeroclaw cron add '*/30 * * * *' 'Check system health' --agent
zeroclaw cron add '*/5 * * * *' 'echo ok'")]
Add {
/// Cron expression
expression: String,
/// Optional IANA timezone (e.g. America/Los_Angeles)
#[arg(long)]
tz: Option<String>,
/// Command to run
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Command (shell) or prompt (agent) to run
command: String,
},
/// Add a one-shot scheduled task at an RFC3339 timestamp
@@ -303,7 +307,10 @@ Examples:
AddAt {
/// One-shot timestamp in RFC3339 format
at: String,
/// Command to run
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Command (shell) or prompt (agent) to run
command: String,
},
/// Add a fixed-interval scheduled task
@@ -318,7 +325,10 @@ Examples:
AddEvery {
/// Interval in milliseconds
every_ms: u64,
/// Command to run
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Command (shell) or prompt (agent) to run
command: String,
},
/// Add a one-shot delayed task (e.g. "30m", "2h", "1d")
@@ -335,7 +345,10 @@ Examples:
Once {
/// Delay duration
delay: String,
/// Command to run
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Command (shell) or prompt (agent) to run
command: String,
},
/// Remove a scheduled task
+5 -4
@@ -325,11 +325,12 @@ override with --tz and an IANA timezone name.
Examples:
zeroclaw cron list
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York
zeroclaw cron add '*/30 * * * *' 'Check system health'
zeroclaw cron add-at 2025-01-15T14:00:00Z 'Send reminder'
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York --agent
zeroclaw cron add '*/30 * * * *' 'Check system health' --agent
zeroclaw cron add '*/5 * * * *' 'echo ok'
zeroclaw cron add-at 2025-01-15T14:00:00Z 'Send reminder' --agent
zeroclaw cron add-every 60000 'Ping heartbeat'
zeroclaw cron once 30m 'Run backup in 30 minutes'
zeroclaw cron once 30m 'Run backup in 30 minutes' --agent
zeroclaw cron pause <task-id>
zeroclaw cron update <task-id> --expression '0 8 * * *' --tz Europe/London")]
Cron {
+175
@@ -0,0 +1,175 @@
//! LLM-driven memory consolidation.
//!
//! After each conversation turn, extracts structured information:
//! - `history_entry`: A timestamped summary for the daily conversation log.
//! - `memory_update`: New facts, preferences, or decisions worth remembering
//! long-term (or `null` if nothing new was learned).
//!
//! This two-phase approach replaces the naive raw-message auto-save with
//! semantic extraction, similar to Nanobot's `save_memory` tool call pattern.
use crate::memory::traits::{Memory, MemoryCategory};
use crate::providers::traits::Provider;
/// Output of consolidation extraction.
#[derive(Debug, serde::Deserialize)]
pub struct ConsolidationResult {
/// Brief timestamped summary for the conversation history log.
pub history_entry: String,
/// New facts/preferences/decisions to store long-term, or None.
pub memory_update: Option<String>,
}
const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are a memory consolidation engine. Given a conversation turn, extract:
1. "history_entry": A brief summary of what happened in this turn (1-2 sentences). Include the key topic or action.
2. "memory_update": Any NEW facts, preferences, decisions, or commitments worth remembering long-term. Return null if nothing new was learned.
Respond ONLY with valid JSON: {"history_entry": "...", "memory_update": "..." or null}
Do not include any text outside the JSON object."#;
/// Run two-phase LLM-driven consolidation on a conversation turn.
///
/// Phase 1: Write a history entry to the Daily memory category.
/// Phase 2: Write a memory update to the Core category (if the LLM identified new facts).
///
/// This function is designed to be called fire-and-forget via `tokio::spawn`.
pub async fn consolidate_turn(
provider: &dyn Provider,
model: &str,
memory: &dyn Memory,
user_message: &str,
assistant_response: &str,
) -> anyhow::Result<()> {
let turn_text = format!("User: {user_message}\nAssistant: {assistant_response}");
// Truncate very long turns to avoid wasting tokens on consolidation.
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8 (e.g. CJK text).
let truncated = if turn_text.len() > 4000 {
let mut end = 4000;
while end > 0 && !turn_text.is_char_boundary(end) {
end -= 1;
}
format!("{}…", &turn_text[..end])
} else {
turn_text.clone()
};
let raw = provider
.chat_with_system(Some(CONSOLIDATION_SYSTEM_PROMPT), &truncated, model, 0.1)
.await?;
let result: ConsolidationResult = parse_consolidation_response(&raw, &turn_text);
// Phase 1: Write history entry to Daily category.
let date = chrono::Local::now().format("%Y-%m-%d").to_string();
let history_key = format!("daily_{date}_{}", uuid::Uuid::new_v4());
memory
.store(
&history_key,
&result.history_entry,
MemoryCategory::Daily,
None,
)
.await?;
// Phase 2: Write memory update to Core category (if present).
if let Some(ref update) = result.memory_update {
if !update.trim().is_empty() {
let mem_key = format!("core_{}", uuid::Uuid::new_v4());
memory
.store(&mem_key, update, MemoryCategory::Core, None)
.await?;
}
}
Ok(())
}
/// Parse the LLM's consolidation response, with fallback for malformed JSON.
fn parse_consolidation_response(raw: &str, fallback_text: &str) -> ConsolidationResult {
// Try to extract JSON from the response (LLM may wrap in markdown code blocks).
let cleaned = raw
.trim()
.trim_start_matches("```json")
.trim_start_matches("```")
.trim_end_matches("```")
.trim();
serde_json::from_str(cleaned).unwrap_or_else(|_| {
// Fallback: use truncated turn text as history entry.
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8.
let summary = if fallback_text.len() > 200 {
let mut end = 200;
while end > 0 && !fallback_text.is_char_boundary(end) {
end -= 1;
}
format!("{}…", &fallback_text[..end])
} else {
fallback_text.to_string()
};
ConsolidationResult {
history_entry: summary,
memory_update: None,
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_valid_json_response() {
let raw = r#"{"history_entry": "User asked about Rust.", "memory_update": "User prefers Rust over Go."}"#;
let result = parse_consolidation_response(raw, "fallback");
assert_eq!(result.history_entry, "User asked about Rust.");
assert_eq!(
result.memory_update.as_deref(),
Some("User prefers Rust over Go.")
);
}
#[test]
fn parse_json_with_null_memory() {
let raw = r#"{"history_entry": "Routine greeting.", "memory_update": null}"#;
let result = parse_consolidation_response(raw, "fallback");
assert_eq!(result.history_entry, "Routine greeting.");
assert!(result.memory_update.is_none());
}
#[test]
fn parse_json_wrapped_in_code_block() {
let raw =
"```json\n{\"history_entry\": \"Discussed deployment.\", \"memory_update\": null}\n```";
let result = parse_consolidation_response(raw, "fallback");
assert_eq!(result.history_entry, "Discussed deployment.");
}
#[test]
fn fallback_on_malformed_response() {
let raw = "I'm sorry, I can't do that.";
let result = parse_consolidation_response(raw, "User: hello\nAssistant: hi");
assert_eq!(result.history_entry, "User: hello\nAssistant: hi");
assert!(result.memory_update.is_none());
}
#[test]
fn fallback_truncates_long_text() {
let long_text = "x".repeat(500);
let result = parse_consolidation_response("invalid", &long_text);
// 200 bytes + "…" (3 bytes in UTF-8) = 203
assert!(result.history_entry.len() <= 203);
}
#[test]
fn fallback_truncates_cjk_text_without_panic() {
// Each CJK character is 3 bytes in UTF-8; byte index 200 may land
// inside a character. This must not panic.
let cjk_text = "二手书项目".repeat(50); // 250 chars = 750 bytes
let result = parse_consolidation_response("invalid", &cjk_text);
assert!(result
.history_entry
.is_char_boundary(result.history_entry.len()));
assert!(result.history_entry.ends_with('…'));
}
}
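Note on truncation: `consolidate_turn` and `parse_consolidation_response` both walk back from a byte limit to the nearest char boundary before slicing. A minimal sketch of that pattern as one shared helper is shown below. `truncate_with_ellipsis` is a hypothetical name and is not part of this diff; appending `…` follows what the tests in this file expect.

```rust
/// Truncate `text` to at most `max_bytes` bytes on a char boundary and append
/// an ellipsis when anything was cut. Hypothetical helper, not part of the diff.
fn truncate_with_ellipsis(text: &str, max_bytes: usize) -> String {
    if text.len() <= max_bytes {
        return text.to_string();
    }
    // Walk back until the cut point no longer lands inside a multi-byte character.
    let mut end = max_bytes;
    while end > 0 && !text.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &text[..end])
}
```

For 3-byte CJK characters and a limit of 200, the cut lands at byte 198, so the result is 201 bytes including the ellipsis.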
+1
@@ -1,6 +1,7 @@
pub mod backend;
pub mod chunker;
pub mod cli;
pub mod consolidation;
pub mod embeddings;
pub mod hygiene;
pub mod lucid;
+18 -1
@@ -4,6 +4,7 @@ use crate::multimodal;
use crate::providers::traits::{ChatMessage, Provider, ProviderCapabilities};
use crate::providers::ProviderRuntimeOptions;
use async_trait::async_trait;
use futures_util::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -472,8 +473,24 @@ fn extract_stream_error_message(event: &Value) -> Option<String> {
None
}
/// Read the response body incrementally via `bytes_stream()` instead of a
/// single `response.text().await?` call. The full SSE payload is still
/// accumulated before parsing, but each chunk is consumed as it arrives and a
/// stream error is reported with its own message. With `response.text()`, the
/// long-lived connection on high-latency links often dropped mid-read,
/// producing the "error decoding response body" failure reported in #3544.
async fn decode_responses_body(response: reqwest::Response) -> anyhow::Result<String> {
let body = response.text().await?;
let mut raw = Vec::new();
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let bytes = chunk
.map_err(|err| anyhow::anyhow!("error reading OpenAI Codex response stream: {err}"))?;
// Buffer raw bytes: a multi-byte UTF-8 character can straddle a chunk
// boundary, so decoding each chunk on its own could fail spuriously.
raw.extend_from_slice(&bytes);
}
let body = String::from_utf8(raw).map_err(|err| {
anyhow::anyhow!("OpenAI Codex response contained invalid UTF-8: {err}")
})?;
if let Some(text) = parse_sse_text(&body)? {
return Ok(text);
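Note on decoding streamed bytes: network chunk boundaries are not aligned to UTF-8 character boundaries, so validating each chunk in isolation can reject a payload that is valid as a whole. The sketch below contrasts the two strategies using only the standard library. Both function names are hypothetical and the chunks are simulated with byte slices rather than a real `reqwest` stream.

```rust
/// Decode every chunk on its own. Fails if a multi-byte character is split
/// across two chunks, even though the concatenated bytes are valid UTF-8.
fn decode_per_chunk(chunks: &[&[u8]]) -> Result<String, std::str::Utf8Error> {
    let mut body = String::new();
    for chunk in chunks {
        body.push_str(std::str::from_utf8(chunk)?);
    }
    Ok(body)
}

/// Buffer all bytes first, then validate and decode once at the end.
fn decode_buffered(chunks: &[&[u8]]) -> Result<String, std::string::FromUtf8Error> {
    let mut raw = Vec::new();
    for chunk in chunks {
        raw.extend_from_slice(chunk);
    }
    String::from_utf8(raw)
}
```

Splitting the two bytes of `"é"` across separate chunks is enough to make the per-chunk variant return an error while the buffered variant succeeds.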
+16 -1
@@ -67,7 +67,13 @@ pub fn redact(value: &str) -> String {
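// Compare by character count, not byte length: a short multi-byte value
// (e.g. two CJK characters, 6 bytes) must not fall through and be shown whole.
if value.chars().count() <= 4 {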
"***".to_string()
} else {
format!("{}***", &value[..4])
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8.
let prefix = value
.char_indices()
.nth(4)
.map(|(byte_idx, _)| &value[..byte_idx])
.unwrap_or(value);
format!("{}***", prefix)
}
}
@@ -102,4 +108,13 @@ mod tests {
assert_eq!(redact(""), "***");
assert_eq!(redact("12345"), "1234***");
}
#[test]
fn redact_handles_multibyte_utf8_without_panic() {
// CJK characters are 3 bytes each; slicing at byte 4 would panic
// without char-boundary-safe handling.
let result = redact("密码是很长的秘密");
assert!(result.ends_with("***"));
assert!(result.is_char_boundary(result.len()));
}
}
+144 -3
@@ -793,6 +793,8 @@ impl SecurityPolicy {
// 1. Allowlist check (is the base command permitted at all?)
// 2. Risk classification (high / medium / low)
// 3. Policy flags (block_high_risk_commands, require_approval_for_medium_risk)
// — explicit allowlist entries exempt a command from the high-risk block,
// but the wildcard "*" does NOT grant an exemption.
// 4. Autonomy level × approval status (supervised requires explicit approval)
// This ordering ensures deny-by-default: unknown commands are rejected
// before any risk or autonomy logic runs.
@@ -810,7 +812,7 @@ impl SecurityPolicy {
let risk = self.command_risk_level(command);
if risk == CommandRiskLevel::High {
if self.block_high_risk_commands {
if self.block_high_risk_commands && !self.is_command_explicitly_allowed(command) {
return Err("Command blocked: high-risk command is disallowed by policy".into());
}
if self.autonomy == AutonomyLevel::Supervised && !approved {
@@ -834,6 +836,48 @@ impl SecurityPolicy {
Ok(risk)
}
/// Check whether **every** segment of a command is explicitly listed in
/// `allowed_commands` — i.e., matched by a concrete entry rather than by
/// the wildcard `"*"`.
///
/// This is used to exempt explicitly-allowlisted high-risk commands from
/// the `block_high_risk_commands` gate. The wildcard entry intentionally
/// does **not** qualify as an explicit allowlist match, so that operators
/// who set `allowed_commands = ["*"]` still get the high-risk safety net.
fn is_command_explicitly_allowed(&self, command: &str) -> bool {
let segments = split_unquoted_segments(command);
for segment in &segments {
let cmd_part = skip_env_assignments(segment);
let mut words = cmd_part.split_whitespace();
let executable = strip_wrapping_quotes(words.next().unwrap_or("")).trim();
let base_cmd_owned = command_basename(executable).to_ascii_lowercase();
let base_cmd = strip_windows_exe_suffix(&base_cmd_owned);
if base_cmd.is_empty() {
continue;
}
let explicitly_listed = self.allowed_commands.iter().any(|allowed| {
let allowed = strip_wrapping_quotes(allowed).trim();
// Skip wildcard — it does not count as an explicit entry.
if allowed.is_empty() || allowed == "*" {
return false;
}
is_allowlist_entry_match(allowed, executable, base_cmd)
});
if !explicitly_listed {
return false;
}
}
// At least one real command must be present.
segments.iter().any(|s| {
let s = skip_env_assignments(s.trim());
s.split_whitespace().next().is_some_and(|w| !w.is_empty())
})
}
// ── Layered Command Allowlist ──────────────────────────────────────────
// Defence-in-depth: five independent gates run in order before the
// per-segment allowlist check. Each gate targets a specific bypass
@@ -1503,10 +1547,13 @@ mod tests {
}
#[test]
fn validate_command_blocks_high_risk_by_default() {
fn validate_command_blocks_high_risk_via_wildcard() {
// Wildcard allows the command through is_command_allowed, but
// block_high_risk_commands still rejects it because "*" does not
// count as an explicit allowlist entry.
let p = SecurityPolicy {
autonomy: AutonomyLevel::Supervised,
allowed_commands: vec!["rm".into()],
allowed_commands: vec!["*".into()],
..SecurityPolicy::default()
};
@@ -1515,6 +1562,100 @@ mod tests {
assert!(result.unwrap_err().contains("high-risk"));
}
#[test]
fn validate_command_allows_explicitly_listed_high_risk() {
// When a high-risk command is explicitly in allowed_commands, the
// block_high_risk_commands gate is bypassed — the operator has made
// a deliberate decision to permit it.
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
allowed_commands: vec!["curl".into()],
block_high_risk_commands: true,
..SecurityPolicy::default()
};
let result = p.validate_command_execution("curl https://api.example.com/data", true);
assert_eq!(result.unwrap(), CommandRiskLevel::High);
}
#[test]
fn validate_command_allows_wget_when_explicitly_listed() {
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
allowed_commands: vec!["wget".into()],
block_high_risk_commands: true,
..SecurityPolicy::default()
};
let result =
p.validate_command_execution("wget https://releases.example.com/v1.tar.gz", true);
assert_eq!(result.unwrap(), CommandRiskLevel::High);
}
#[test]
fn validate_command_blocks_non_listed_high_risk_when_another_is_allowed() {
// Allowing curl explicitly should not exempt wget.
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
allowed_commands: vec!["curl".into()],
block_high_risk_commands: true,
..SecurityPolicy::default()
};
let result = p.validate_command_execution("wget https://evil.com", true);
assert!(result.is_err());
assert!(result.unwrap_err().contains("not allowed"));
}
#[test]
fn validate_command_explicit_rm_bypasses_high_risk_block() {
// Operator explicitly listed "rm" — they accept the risk.
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
allowed_commands: vec!["rm".into()],
block_high_risk_commands: true,
..SecurityPolicy::default()
};
let result = p.validate_command_execution("rm -rf /tmp/test", true);
assert_eq!(result.unwrap(), CommandRiskLevel::High);
}
#[test]
fn validate_command_high_risk_still_needs_approval_in_supervised() {
// Even when explicitly allowed, supervised mode still requires
// approval for high-risk commands (the approval gate is separate
// from the block gate).
let p = SecurityPolicy {
autonomy: AutonomyLevel::Supervised,
allowed_commands: vec!["curl".into()],
block_high_risk_commands: true,
..SecurityPolicy::default()
};
let denied = p.validate_command_execution("curl https://api.example.com", false);
assert!(denied.is_err());
assert!(denied.unwrap_err().contains("requires explicit approval"));
let allowed = p.validate_command_execution("curl https://api.example.com", true);
assert_eq!(allowed.unwrap(), CommandRiskLevel::High);
}
#[test]
fn validate_command_pipe_needs_all_segments_explicitly_allowed() {
// When a pipeline contains a high-risk command, every segment
// must be explicitly allowed for the exemption to apply.
let p = SecurityPolicy {
autonomy: AutonomyLevel::Full,
allowed_commands: vec!["curl".into(), "grep".into()],
block_high_risk_commands: true,
..SecurityPolicy::default()
};
let result = p.validate_command_execution("curl https://api.example.com | grep data", true);
assert_eq!(result.unwrap(), CommandRiskLevel::High);
}
#[test]
fn validate_command_full_mode_skips_medium_risk_approval_gate() {
let p = SecurityPolicy {
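Note on the explicit-allowlist exemption: the rule in `is_command_explicitly_allowed` is that every segment must match a concrete `allowed_commands` entry and that `"*"` never counts. The sketch below is a deliberately simplified model of that rule. It splits on `|` only and compares the first word of each segment, whereas the real code also handles quotes, env assignments, basenames, and other separators. `is_explicitly_allowed` is a hypothetical name.

```rust
/// Simplified model of the explicit-allowlist check (assumption: pipelines
/// only, no quoting, no env assignments, no path handling).
fn is_explicitly_allowed(allowed_commands: &[&str], command: &str) -> bool {
    let mut saw_command = false;
    for segment in command.split('|') {
        let Some(executable) = segment.split_whitespace().next() else {
            continue; // empty segment, nothing to check
        };
        saw_command = true;
        // The wildcard is skipped on purpose: it is not an explicit entry.
        let listed = allowed_commands
            .iter()
            .any(|allowed| *allowed != "*" && allowed.eq_ignore_ascii_case(executable));
        if !listed {
            return false;
        }
    }
    // At least one real command must be present.
    saw_command
}
```

Under this model `["curl", "grep"]` exempts `curl ... | grep data`, while `["*"]` exempts nothing, which matches the intent of the tests in this hunk.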
+84 -4
@@ -12,6 +12,7 @@ pub struct HttpRequestTool {
allowed_domains: Vec<String>,
max_response_size: usize,
timeout_secs: u64,
allow_private_hosts: bool,
}
impl HttpRequestTool {
@@ -20,12 +21,14 @@ impl HttpRequestTool {
allowed_domains: Vec<String>,
max_response_size: usize,
timeout_secs: u64,
allow_private_hosts: bool,
) -> Self {
Self {
security,
allowed_domains: normalize_allowed_domains(allowed_domains),
max_response_size,
timeout_secs,
allow_private_hosts,
}
}
@@ -52,7 +55,7 @@ impl HttpRequestTool {
let host = extract_host(url)?;
if is_private_or_local_host(&host) {
if !self.allow_private_hosts && is_private_or_local_host(&host) {
anyhow::bail!("Blocked local/private host: {host}");
}
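Note on `allow_private_hosts`: the gate above only changes whether `is_private_or_local_host` is consulted; that function is not shown in this diff. The sketch below is a hypothetical stand-in built on `std::net`, covering the categories named in the config doc comment (RFC 1918, loopback, link-local, `.local`). The exact set of checks in the real function is an assumption.

```rust
use std::net::IpAddr;

/// Hypothetical stand-in for `is_private_or_local_host`; the real function
/// is not shown in this diff and likely covers more cases.
fn is_private_or_local_host_sketch(host: &str) -> bool {
    let host = host.trim_end_matches('.').to_ascii_lowercase();
    if host == "localhost" || host.ends_with(".localhost") || host.ends_with(".local") {
        return true;
    }
    match host.parse::<IpAddr>() {
        Ok(IpAddr::V4(v4)) => {
            v4.is_private() || v4.is_loopback() || v4.is_link_local() || v4.is_unspecified()
        }
        Ok(IpAddr::V6(v6)) => v6.is_loopback() || v6.is_unspecified(),
        Err(_) => false, // not an IP literal and not a known local name
    }
}

/// Mirrors the gate in the diff: private hosts are blocked unless opted in.
fn host_allowed(host: &str, allow_private_hosts: bool) -> bool {
    allow_private_hosts || !is_private_or_local_host_sketch(host)
}
```

The opt-in only lifts the private-host block in this sketch; the domain allowlist check in `validate_url` is a separate step and is not modeled here.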
@@ -454,6 +457,13 @@ mod tests {
use crate::security::{AutonomyLevel, SecurityPolicy};
fn test_tool(allowed_domains: Vec<&str>) -> HttpRequestTool {
test_tool_with_private(allowed_domains, false)
}
fn test_tool_with_private(
allowed_domains: Vec<&str>,
allow_private_hosts: bool,
) -> HttpRequestTool {
let security = Arc::new(SecurityPolicy {
autonomy: AutonomyLevel::Supervised,
..SecurityPolicy::default()
@@ -463,6 +473,7 @@ mod tests {
allowed_domains.into_iter().map(String::from).collect(),
1_000_000,
30,
allow_private_hosts,
)
}
@@ -570,7 +581,7 @@ mod tests {
#[test]
fn validate_requires_allowlist() {
let security = Arc::new(SecurityPolicy::default());
-let tool = HttpRequestTool::new(security, vec![], 1_000_000, 30);
+let tool = HttpRequestTool::new(security, vec![], 1_000_000, 30, false);
let err = tool
.validate_url("https://example.com")
.unwrap_err()
@@ -686,7 +697,7 @@ mod tests {
autonomy: AutonomyLevel::ReadOnly,
..SecurityPolicy::default()
});
-let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30);
+let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30, false);
let result = tool
.execute(json!({"url": "https://example.com"}))
.await
@@ -701,7 +712,7 @@ mod tests {
max_actions_per_hour: 0,
..SecurityPolicy::default()
});
-let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30);
+let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30, false);
let result = tool
.execute(json!({"url": "https://example.com"}))
.await
@@ -724,6 +735,7 @@ mod tests {
vec!["example.com".into()],
10,
30,
false,
);
let text = "hello world this is long";
let truncated = tool.truncate_response(text);
@@ -738,6 +750,7 @@ mod tests {
vec!["example.com".into()],
0, // max_response_size = 0 means no limit
30,
false,
);
let text = "a".repeat(10_000_000);
assert_eq!(tool.truncate_response(&text), text);
@@ -750,6 +763,7 @@ mod tests {
vec!["example.com".into()],
5,
30,
false,
);
let text = "hello world";
let truncated = tool.truncate_response(text);
@@ -935,4 +949,70 @@ mod tests {
.to_string();
assert!(err.contains("IPv6"));
}
// ── allow_private_hosts opt-in tests ────────────────────────
#[test]
fn default_blocks_private_hosts() {
let tool = test_tool(vec!["localhost", "192.168.1.5", "*"]);
assert!(tool
.validate_url("https://localhost:8080")
.unwrap_err()
.to_string()
.contains("local/private"));
assert!(tool
.validate_url("https://192.168.1.5")
.unwrap_err()
.to_string()
.contains("local/private"));
assert!(tool
.validate_url("https://10.0.0.1")
.unwrap_err()
.to_string()
.contains("local/private"));
}
#[test]
fn allow_private_hosts_permits_localhost() {
let tool = test_tool_with_private(vec!["localhost"], true);
assert!(tool.validate_url("https://localhost:8080").is_ok());
}
#[test]
fn allow_private_hosts_permits_private_ipv4() {
let tool = test_tool_with_private(vec!["192.168.1.5"], true);
assert!(tool.validate_url("https://192.168.1.5").is_ok());
}
#[test]
fn allow_private_hosts_permits_rfc1918_with_wildcard() {
let tool = test_tool_with_private(vec!["*"], true);
assert!(tool.validate_url("https://10.0.0.1").is_ok());
assert!(tool.validate_url("https://172.16.0.1").is_ok());
assert!(tool.validate_url("https://192.168.1.1").is_ok());
assert!(tool.validate_url("http://localhost:8123").is_ok());
}
#[test]
fn allow_private_hosts_still_requires_allowlist() {
let tool = test_tool_with_private(vec!["example.com"], true);
let err = tool
.validate_url("https://192.168.1.5")
.unwrap_err()
.to_string();
assert!(
err.contains("allowed_domains"),
"Private host should still need allowlist match, got: {err}"
);
}
#[test]
fn allow_private_hosts_false_still_blocks() {
let tool = test_tool_with_private(vec!["*"], false);
assert!(tool
.validate_url("https://localhost:8080")
.unwrap_err()
.to_string()
.contains("local/private"));
}
}
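The tests above pin down two properties: `allow_private_hosts` only lifts the local/private block, and an allowlist match is still required either way. A minimal, self-contained sketch of that gate is shown below. The function `check_host`, the simplified `is_private_or_local_host`, and the exact-match allowlist logic are all stand-ins written for illustration. The real tool's helpers (`extract_host`, `normalize_allowed_domains`, subdomain matching) are not reproduced here, and the check order is an assumption taken from the hunk at `@@ -52,7 +55,7 @@`.

```rust
use std::net::IpAddr;

/// Simplified stand-in (assumption): treats `localhost`, loopback,
/// RFC 1918 private, and link-local IPv4 addresses as local/private.
fn is_private_or_local_host(host: &str) -> bool {
    if host.eq_ignore_ascii_case("localhost") {
        return true;
    }
    match host.parse::<IpAddr>() {
        Ok(IpAddr::V4(v4)) => v4.is_private() || v4.is_loopback() || v4.is_link_local(),
        Ok(IpAddr::V6(v6)) => v6.is_loopback(),
        Err(_) => false,
    }
}

/// Hypothetical gate: the private-host block is skipped only when the
/// operator opted in, and the allowlist is enforced in both cases.
fn check_host(allowed: &[&str], allow_private_hosts: bool, host: &str) -> Result<(), String> {
    if !allow_private_hosts && is_private_or_local_host(host) {
        return Err(format!("Blocked local/private host: {host}"));
    }
    // Exact match or "*" wildcard only; real domain matching is richer.
    let listed = allowed
        .iter()
        .any(|d| *d == "*" || d.eq_ignore_ascii_case(host));
    if !listed {
        return Err(format!("Host '{host}' is not in allowed_domains"));
    }
    Ok(())
}
```

With this shape, opting in never widens the allowlist: a private address that is not listed is rejected with an `allowed_domains` error, not a local/private one.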
+1
@@ -314,6 +314,7 @@ pub fn all_tools_with_runtime(
http_config.allowed_domains.clone(),
http_config.max_response_size,
http_config.timeout_secs,
http_config.allow_private_hosts,
)));
}