Compare commits

...

16 Commits

Author SHA1 Message Date
Argenis e387f58579 Merge pull request #3951 from Alix-007/issue-3946-release-memory-postgres
build(release): include memory-postgres in published artifacts
2026-03-19 14:42:15 -04:00
Argenis fa06798926 fix(cron): persist allowed_tools for agent jobs (#3993)
Persist allowed_tools in cron_jobs table, threading it through CLI add/update and cron_add/cron_update tool APIs. Add regression coverage for store, tool, and CLI roundtrip paths.

Fixups over the original PR #3929: add allowed_tools to the all_overdue_jobs SELECT (a gap from the merge) and resolve merge conflicts.

Closes #3920
Supersedes #3929
2026-03-19 14:37:55 -04:00
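The empty-list-to-None mapping this PR threads through the CLI can be sketched as follows (a minimal standalone sketch; the helper name is assumed, not the project's exact API):

```rust
// Minimal sketch of the CLI mapping: an empty --allowed-tool list persists
// as None ("no restriction"), otherwise as Some(list). Name is hypothetical.
fn allowed_tools_arg(allowed_tools: Vec<String>) -> Option<Vec<String>> {
    if allowed_tools.is_empty() {
        None
    } else {
        Some(allowed_tools)
    }
}

fn main() {
    // No flags given: store NULL, meaning the agent job keeps all tools.
    assert_eq!(allowed_tools_arg(Vec::new()), None);
    // Explicit flags: store the restricted tool list.
    assert_eq!(
        allowed_tools_arg(vec!["file_read".to_string()]),
        Some(vec!["file_read".to_string()])
    );
    println!("ok");
}
```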
Alix-007 a4cd4b287e feat(slack): add thread_replies channel option (#3930)
Add a thread_replies option to the Slack channel config (default: true). When false, replies go to the channel root instead of the originating thread.

Closes #3888
2026-03-19 14:32:02 -04:00
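A minimal config sketch of the new option (the `[channels.slack]` table name is an assumption; the `thread_replies` key is the one added by this PR):

```toml
[channels.slack]
bot_token = "xoxb-..."
# Default is true (reply in the originating thread); false posts to channel root.
thread_replies = false
```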
argenis de la rosa 3ce7f2345e merge: resolve conflicts with master, include channel-lark in RELEASE_CARGO_FEATURES
Add channel-lark (merged to master separately) to RELEASE_CARGO_FEATURES
env var. Keep the DRY env-var approach and remove stale Docker build-args.
2026-03-19 14:24:30 -04:00
Argenis eb9dfc04b4 fix(anthropic): always apply cache_control to system prompts (#3990)
* fix: always use Blocks format for system prompts with cache_control

System prompts under 3KB were wrapped in SystemPrompt::String, which
cannot carry cache_control markers, resulting in a 0% cache hit rate
on Haiku 4.5. Always use SystemPrompt::Blocks with ephemeral
cache_control regardless of prompt size.

Fixes #3977

* fix: lower conversation caching threshold from >4 to >1 messages

The previous threshold of >4 non-system messages was too restrictive,
delaying cache benefits until 5+ turns. Lower to >1 so caching kicks
in after the first user+assistant exchange.

Fixes #3977

* test: update anthropic cache tests for new thresholds and Blocks format

- convert_messages_small_system_prompt now expects Blocks with
  cache_control instead of String variant
- should_cache_conversation tests updated for >1 threshold
- backward_compatibility test replaced with blocks-system test
2026-03-19 14:21:45 -04:00
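The threshold change above can be sketched as a single predicate (a hedged sketch; the function name and signature are assumptions, not the provider's exact API):

```rust
// Hypothetical sketch of the lowered conversation-caching threshold.
fn should_cache_conversation(non_system_messages: usize) -> bool {
    // Previously `> 4` (caching only from the 5th turn onward); now `> 1`,
    // so caching kicks in after the first user+assistant exchange.
    non_system_messages > 1
}

fn main() {
    assert!(!should_cache_conversation(1)); // lone user message: not yet
    assert!(should_cache_conversation(2)); // first full exchange: cache
    println!("ok");
}
```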
Argenis 9cc74a2698 fix(security): wire sandbox into shell command execution (#3989)
* fix: add sandbox field to ShellTool struct

Add `sandbox: Arc<dyn Sandbox>` field to `ShellTool` and a
`new_with_sandbox()` constructor so callers can inject the configured
sandbox backend. The existing `new()` constructor defaults to
`NoopSandbox` for backward compatibility.

Ref: #3983

* fix: apply sandbox wrapping in ShellTool::execute()

Call `self.sandbox.wrap_command()` on the underlying std::process::Command
(via `as_std_mut()`) after building the shell command and before clearing
the environment. This ensures every shell command passes through the
configured sandbox backend before execution.

Ref: #3983

* fix: wire up sandbox creation at ShellTool callsites

In `all_tools_with_runtime()`, create a sandbox from
`root_config.security` via `create_sandbox()` and pass it to
`ShellTool::new_with_sandbox()`. The `default_tools_with_runtime()`
path retains `ShellTool::new()` which defaults to `NoopSandbox`.

Ref: #3983

* test: add sandbox integration tests for ShellTool

Verify that ShellTool can be constructed with a sandbox via
`new_with_sandbox()`, that NoopSandbox leaves commands unmodified,
and that command execution works end-to-end with a sandbox attached.

Ref: #3983
2026-03-19 14:21:42 -04:00
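The injection pattern described in this PR can be sketched standalone (trait shape and constructor names follow the commit messages above, but this is a simplified sketch, not the project's actual ShellTool):

```rust
use std::process::Command;
use std::sync::Arc;

// Simplified sandbox trait: backends get a chance to rewrite the command.
trait Sandbox: Send + Sync {
    fn wrap_command(&self, cmd: &mut Command);
}

// NoopSandbox leaves commands unmodified (the backward-compatible default).
struct NoopSandbox;
impl Sandbox for NoopSandbox {
    fn wrap_command(&self, _cmd: &mut Command) {}
}

struct ShellTool {
    sandbox: Arc<dyn Sandbox>,
}

impl ShellTool {
    fn new() -> Self {
        Self::new_with_sandbox(Arc::new(NoopSandbox))
    }
    fn new_with_sandbox(sandbox: Arc<dyn Sandbox>) -> Self {
        Self { sandbox }
    }
    fn execute(&self, program: &str) -> std::io::Result<String> {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg(program);
        // Every shell command passes through the configured sandbox
        // before it is spawned.
        self.sandbox.wrap_command(&mut cmd);
        let out = cmd.output()?;
        Ok(String::from_utf8_lossy(&out.stdout).into_owned())
    }
}

fn main() -> std::io::Result<()> {
    let tool = ShellTool::new();
    assert_eq!(tool.execute("echo sandboxed")?.trim(), "sandboxed");
    println!("ok");
    Ok(())
}
```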
Argenis 133dc46b41 fix(web): restore accidentally deleted logo file (#3988)
* fix: restore accidentally deleted logo file

The logo.png was removed in commit 48bdbde2 but is still referenced
by the web UI components. Restore it from git history.

Fixes #3984

* fix: copy logo to web/dist for rust-embed

The Rust binary embeds files from web/dist/ via rust-embed, so the
logo must also be present there to be served without a rebuild.

Fixes #3984
2026-03-19 14:21:15 -04:00
Argenis ad03605cad Merge pull request #3949 from Alix-007/issue-3817-cron-delivery-context
fix: default cron delivery to the active channel context
2026-03-19 14:20:59 -04:00
Argenis ae1acf9b9c Merge pull request #3950 from Alix-007/issue-3466-homebrew-service-workspace
fix(onboard): warn when Homebrew services use another workspace
2026-03-19 14:20:56 -04:00
Alix-007 cc91f22e9b fix(skills): narrow shell shebang detection (#3944)
Co-authored-by: Alix-007 <267018309+Alix-007@users.noreply.github.com>
2026-03-19 14:10:51 -04:00
Martin 030f5fe288 fix(install): fix guided installer in LXC/container environments (#3947)
Replace subshell-based /dev/stdin probing in guided_input_stream with a
file-descriptor approach (guided_open_input) that works reliably in LXC
containers accessed over SSH.

The previous implementation probed /dev/stdin and /proc/self/fd/0 via
subshells before falling back to /dev/tty. In LXC containers these
probes fail even when FD 0 is perfectly usable, causing the guided
installer to exit with "requires an interactive terminal".

The fix:
- When stdin is a terminal (-t 0), assign GUIDED_FD=0 directly without
  any subshell probing — trusting the kernel's own tty check
- Otherwise, open /dev/tty as an explicit fd (exec {GUIDED_FD}</dev/tty)
- guided_read uses `read -u "$GUIDED_FD"` instead of `< "$file_path"`
- Add echo after silent reads (password prompts) for correct line handling

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 14:10:48 -04:00
Giulio V c47bbcc972 fix(cron): add startup catch-up and drop login shell flag (#3948)
* fix(cron): add startup catch-up and drop login shell flag

Problems:
1. When ZeroClaw started after downtime (late boot, daemon restart),
   overdue jobs were picked up via `due_jobs()` but limited by
   `max_tasks` per poll cycle — with many overdue jobs, catch-up
   could take many cycles.
2. Cron shell jobs used `sh -lc` (login shell), which loads the
   full user profile on every execution — slow and may cause
   unexpected side effects.

Fixes:
- Add `all_overdue_jobs()` store query without `max_tasks` limit
- Add `catch_up_overdue_jobs()` startup phase that runs ALL overdue
  jobs once before entering the normal polling loop
- Extract `build_cron_shell_command()` helper using `sh -c` (non-login)
- Add structured tracing for catch-up progress
- Add tests for all new functions

* feat(cron): make catch-up configurable via API and control panel

Add `catch_up_on_startup` boolean to `[cron]` config (default: true).
When enabled, the scheduler runs all overdue jobs at startup before
entering the normal polling loop. Users can toggle this from:

- The Cron page toggle switch in the control panel
- PATCH /api/cron/settings { "catch_up_on_startup": false }
- The `[cron]` section of the TOML config editor

Also adds GET /api/cron/settings endpoint to read cron subsystem
settings without parsing the full config.

* fix(config): add catch_up_on_startup to CronConfig test constructors

The CI Lint job failed because the `cron_config_serde_roundtrip` test
constructs CronConfig directly and was missing the new field.
2026-03-19 14:10:37 -04:00
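The toggle described above maps onto the `[cron]` keys added later in this diff; a minimal TOML fragment might look like:

```toml
[cron]
enabled = true
# Skip the startup catch-up pass: missed jobs simply wait for their
# next scheduled occurrence instead of firing once at boot.
catch_up_on_startup = false
max_run_history = 50
```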
Alix-007 0d28cca843 build(release): drop stale docker feature args 2026-03-19 19:14:07 +08:00
Alix-007 7ddd2aace3 build(release): ship postgres-capable release artifacts 2026-03-19 15:37:45 +08:00
Alix-007 c7b3b762e0 fix(onboard): warn when Homebrew service uses another workspace 2026-03-19 15:30:40 +08:00
Alix-007 4b00e8ba75 fix(cron): default channel delivery to active reply target 2026-03-19 15:11:47 +08:00
25 changed files with 1363 additions and 117 deletions
+2 -3
@@ -16,6 +16,7 @@ env:
CARGO_TERM_COLOR: always
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
RELEASE_CARGO_FEATURES: channel-matrix,channel-lark,memory-postgres
jobs:
version:
@@ -213,7 +214,7 @@ jobs:
if [ -n "${{ matrix.linker_env || '' }}" ] && [ -n "${{ matrix.linker || '' }}" ]; then
export "${{ matrix.linker_env }}=${{ matrix.linker }}"
fi
cargo build --release --locked --features channel-matrix,channel-lark,memory-postgres --target ${{ matrix.target }}
cargo build --release --locked --features "${{ env.RELEASE_CARGO_FEATURES }}" --target ${{ matrix.target }}
- name: Package (Unix)
if: runner.os != 'Windows'
@@ -345,8 +346,6 @@ jobs:
with:
context: docker-ctx
push: true
build-args: |
ZEROCLAW_CARGO_FEATURES=channel-matrix,channel-lark,memory-postgres
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ needs.version.outputs.tag }}
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:beta
+2 -3
@@ -20,6 +20,7 @@ env:
CARGO_TERM_COLOR: always
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
RELEASE_CARGO_FEATURES: channel-matrix,channel-lark,memory-postgres
jobs:
validate:
@@ -214,7 +215,7 @@ jobs:
if [ -n "${{ matrix.linker_env || '' }}" ] && [ -n "${{ matrix.linker || '' }}" ]; then
export "${{ matrix.linker_env }}=${{ matrix.linker }}"
fi
cargo build --release --locked --features channel-matrix,channel-lark,memory-postgres --target ${{ matrix.target }}
cargo build --release --locked --features "${{ env.RELEASE_CARGO_FEATURES }}" --target ${{ matrix.target }}
- name: Package (Unix)
if: runner.os != 'Windows'
@@ -388,8 +389,6 @@ jobs:
with:
context: docker-ctx
push: true
build-args: |
ZEROCLAW_CARGO_FEATURES=channel-matrix,channel-lark,memory-postgres
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ needs.validate.outputs.tag }}
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
+13 -27
@@ -448,46 +448,32 @@ bool_to_word() {
fi
}
guided_input_stream() {
# Some constrained containers report interactive stdin (-t 0) but deny
# opening /dev/stdin directly. Probe readability before selecting it.
if [[ -t 0 ]] && (: </dev/stdin) 2>/dev/null; then
echo "/dev/stdin"
guided_open_input() {
# Use stdin directly when it is an interactive terminal (e.g. SSH into LXC).
# Subshell probing of /dev/stdin fails in some constrained containers even
# when FD 0 is perfectly usable, so skip the probe and trust -t 0.
if [[ -t 0 ]]; then
GUIDED_FD=0
return 0
fi
if [[ -t 0 ]] && (: </proc/self/fd/0) 2>/dev/null; then
echo "/proc/self/fd/0"
return 0
fi
if (: </dev/tty) 2>/dev/null; then
echo "/dev/tty"
return 0
fi
return 1
# Non-interactive stdin: try to open /dev/tty as an explicit fd.
exec {GUIDED_FD}</dev/tty 2>/dev/null || return 1
}
guided_read() {
local __target_var="$1"
local __prompt="$2"
local __silent="${3:-false}"
local __input_source=""
local __value=""
if ! __input_source="$(guided_input_stream)"; then
return 1
fi
[[ -n "${GUIDED_FD:-}" ]] || guided_open_input || return 1
if [[ "$__silent" == true ]]; then
if ! read -r -s -p "$__prompt" __value <"$__input_source"; then
return 1
fi
read -r -s -u "$GUIDED_FD" -p "$__prompt" __value || return 1
echo
else
if ! read -r -p "$__prompt" __value <"$__input_source"; then
return 1
fi
read -r -u "$GUIDED_FD" -p "$__prompt" __value || return 1
fi
printf -v "$__target_var" '%s' "$__value"
@@ -708,7 +694,7 @@ prompt_model() {
run_guided_installer() {
local os_name="$1"
if ! guided_input_stream >/dev/null; then
if ! guided_open_input >/dev/null; then
error "guided installer requires an interactive terminal."
error "Run from a terminal, or pass --no-guided with explicit flags."
exit 1
+285
@@ -2181,6 +2181,7 @@ pub(crate) async fn agent_turn(
temperature: f64,
silent: bool,
channel_name: &str,
channel_reply_target: Option<&str>,
multimodal_config: &crate::config::MultimodalConfig,
max_tool_iterations: usize,
approval: Option<&ApprovalManager>,
@@ -2200,6 +2201,7 @@ pub(crate) async fn agent_turn(
silent,
approval,
channel_name,
channel_reply_target,
multimodal_config,
max_tool_iterations,
None,
@@ -2213,6 +2215,100 @@ pub(crate) async fn agent_turn(
.await
}
fn maybe_inject_channel_delivery_defaults(
tool_name: &str,
tool_args: &mut serde_json::Value,
channel_name: &str,
channel_reply_target: Option<&str>,
) {
if tool_name != "cron_add" {
return;
}
if !matches!(
channel_name,
"telegram" | "discord" | "slack" | "mattermost" | "matrix"
) {
return;
}
let Some(reply_target) = channel_reply_target
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return;
};
let Some(args) = tool_args.as_object_mut() else {
return;
};
let is_agent_job = args
.get("job_type")
.and_then(serde_json::Value::as_str)
.is_some_and(|job_type| job_type.eq_ignore_ascii_case("agent"))
|| args
.get("prompt")
.and_then(serde_json::Value::as_str)
.is_some_and(|prompt| !prompt.trim().is_empty());
if !is_agent_job {
return;
}
let default_delivery = || {
serde_json::json!({
"mode": "announce",
"channel": channel_name,
"to": reply_target,
})
};
match args.get_mut("delivery") {
None => {
args.insert("delivery".to_string(), default_delivery());
}
Some(serde_json::Value::Null) => {
*args.get_mut("delivery").expect("delivery key exists") = default_delivery();
}
Some(serde_json::Value::Object(delivery)) => {
if delivery
.get("mode")
.and_then(serde_json::Value::as_str)
.is_some_and(|mode| mode.eq_ignore_ascii_case("none"))
{
return;
}
delivery
.entry("mode".to_string())
.or_insert_with(|| serde_json::Value::String("announce".to_string()));
let needs_channel = delivery
.get("channel")
.and_then(serde_json::Value::as_str)
.is_none_or(|value| value.trim().is_empty());
if needs_channel {
delivery.insert(
"channel".to_string(),
serde_json::Value::String(channel_name.to_string()),
);
}
let needs_target = delivery
.get("to")
.and_then(serde_json::Value::as_str)
.is_none_or(|value| value.trim().is_empty());
if needs_target {
delivery.insert(
"to".to_string(),
serde_json::Value::String(reply_target.to_string()),
);
}
}
Some(_) => {}
}
}
async fn execute_one_tool(
call_name: &str,
call_arguments: serde_json::Value,
@@ -2406,6 +2502,7 @@ pub(crate) async fn run_tool_call_loop(
silent: bool,
approval: Option<&ApprovalManager>,
channel_name: &str,
channel_reply_target: Option<&str>,
multimodal_config: &crate::config::MultimodalConfig,
max_tool_iterations: usize,
cancellation_token: Option<CancellationToken>,
@@ -2816,6 +2913,13 @@ pub(crate) async fn run_tool_call_loop(
}
}
maybe_inject_channel_delivery_defaults(
&tool_name,
&mut tool_args,
channel_name,
channel_reply_target,
);
// ── Approval hook ────────────────────────────────
if let Some(mgr) = approval {
if mgr.needs_approval(&tool_name) {
@@ -3558,6 +3662,7 @@ pub async fn run(
false,
approval_manager.as_ref(),
channel_name,
None,
&config.multimodal,
config.agent.max_tool_iterations,
None,
@@ -3784,6 +3889,7 @@ pub async fn run(
false,
approval_manager.as_ref(),
channel_name,
None,
&config.multimodal,
config.agent.max_tool_iterations,
None,
@@ -4163,6 +4269,7 @@ pub async fn process_message(
config.default_temperature,
true,
"daemon",
None,
&config.multimodal,
config.agent.max_tool_iterations,
Some(&approval_manager),
@@ -4481,6 +4588,57 @@ mod tests {
}
}
struct RecordingArgsTool {
name: String,
recorded_args: Arc<Mutex<Vec<serde_json::Value>>>,
}
impl RecordingArgsTool {
fn new(name: &str, recorded_args: Arc<Mutex<Vec<serde_json::Value>>>) -> Self {
Self {
name: name.to_string(),
recorded_args,
}
}
}
#[async_trait]
impl Tool for RecordingArgsTool {
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
"Records tool arguments for regression tests"
}
fn parameters_schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"properties": {
"prompt": { "type": "string" },
"schedule": { "type": "object" },
"delivery": { "type": "object" }
}
})
}
async fn execute(
&self,
args: serde_json::Value,
) -> anyhow::Result<crate::tools::ToolResult> {
self.recorded_args
.lock()
.expect("recorded args lock should be valid")
.push(args.clone());
Ok(crate::tools::ToolResult {
success: true,
output: args.to_string(),
error: None,
})
}
}
struct DelayTool {
name: String,
delay_ms: u64,
@@ -4619,6 +4777,7 @@ mod tests {
true,
None,
"cli",
None,
&crate::config::MultimodalConfig::default(),
3,
None,
@@ -4668,6 +4827,7 @@ mod tests {
true,
None,
"cli",
None,
&multimodal,
3,
None,
@@ -4711,6 +4871,7 @@ mod tests {
true,
None,
"cli",
None,
&crate::config::MultimodalConfig::default(),
3,
None,
@@ -4840,6 +5001,7 @@ mod tests {
true,
Some(&approval_mgr),
"telegram",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -4877,6 +5039,122 @@ mod tests {
);
}
#[tokio::test]
async fn run_tool_call_loop_injects_channel_delivery_defaults_for_cron_add() {
let provider = ScriptedProvider::from_text_responses(vec![
r#"<tool_call>
{"name":"cron_add","arguments":{"job_type":"agent","prompt":"remind me later","schedule":{"kind":"every","every_ms":60000}}}
</tool_call>"#,
"done",
]);
let recorded_args = Arc::new(Mutex::new(Vec::new()));
let tools_registry: Vec<Box<dyn Tool>> = vec![Box::new(RecordingArgsTool::new(
"cron_add",
Arc::clone(&recorded_args),
))];
let mut history = vec![
ChatMessage::system("test-system"),
ChatMessage::user("schedule a reminder"),
];
let observer = NoopObserver;
let result = run_tool_call_loop(
&provider,
&mut history,
&tools_registry,
&observer,
"mock-provider",
"mock-model",
0.0,
true,
None,
"telegram",
Some("chat-42"),
&crate::config::MultimodalConfig::default(),
4,
None,
None,
None,
&[],
&[],
None,
None,
)
.await
.expect("cron_add delivery defaults should be injected");
assert_eq!(result, "done");
let recorded = recorded_args
.lock()
.expect("recorded args lock should be valid");
let delivery = recorded[0]["delivery"].clone();
assert_eq!(
delivery,
serde_json::json!({
"mode": "announce",
"channel": "telegram",
"to": "chat-42",
})
);
}
#[tokio::test]
async fn run_tool_call_loop_preserves_explicit_cron_delivery_none() {
let provider = ScriptedProvider::from_text_responses(vec![
r#"<tool_call>
{"name":"cron_add","arguments":{"job_type":"agent","prompt":"run silently","schedule":{"kind":"every","every_ms":60000},"delivery":{"mode":"none"}}}
</tool_call>"#,
"done",
]);
let recorded_args = Arc::new(Mutex::new(Vec::new()));
let tools_registry: Vec<Box<dyn Tool>> = vec![Box::new(RecordingArgsTool::new(
"cron_add",
Arc::clone(&recorded_args),
))];
let mut history = vec![
ChatMessage::system("test-system"),
ChatMessage::user("schedule a quiet cron job"),
];
let observer = NoopObserver;
let result = run_tool_call_loop(
&provider,
&mut history,
&tools_registry,
&observer,
"mock-provider",
"mock-model",
0.0,
true,
None,
"telegram",
Some("chat-42"),
&crate::config::MultimodalConfig::default(),
4,
None,
None,
None,
&[],
&[],
None,
None,
)
.await
.expect("explicit delivery mode should be preserved");
assert_eq!(result, "done");
let recorded = recorded_args
.lock()
.expect("recorded args lock should be valid");
assert_eq!(recorded[0]["delivery"], serde_json::json!({"mode": "none"}));
}
#[tokio::test]
async fn run_tool_call_loop_deduplicates_repeated_tool_calls() {
let provider = ScriptedProvider::from_text_responses(vec![
@@ -4912,6 +5190,7 @@ mod tests {
true,
None,
"cli",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -4980,6 +5259,7 @@ mod tests {
true,
Some(&approval_mgr),
"telegram",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -5039,6 +5319,7 @@ mod tests {
true,
None,
"cli",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -5118,6 +5399,7 @@ mod tests {
true,
None,
"cli",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -5174,6 +5456,7 @@ mod tests {
true,
None,
"cli",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -5246,6 +5529,7 @@ mod tests {
0.0,
true,
"daemon",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
@@ -7137,6 +7421,7 @@ Let me check the result."#;
true,
None,
"telegram",
None,
&crate::config::MultimodalConfig::default(),
4,
None,
+2
@@ -2240,6 +2240,7 @@ async fn process_channel_message(
true,
Some(&*ctx.approval_manager),
msg.channel.as_str(),
Some(msg.reply_target.as_str()),
&ctx.multimodal,
ctx.max_tool_iterations,
Some(cancellation_token.clone()),
@@ -3379,6 +3380,7 @@ fn collect_configured_channels(
Vec::new(),
sl.allowed_users.clone(),
)
.with_thread_replies(sl.thread_replies.unwrap_or(true))
.with_group_reply_policy(sl.mention_only, Vec::new())
.with_workspace_dir(config.workspace_dir.clone()),
),
+37 -1
@@ -25,6 +25,7 @@ pub struct SlackChannel {
channel_id: Option<String>,
channel_ids: Vec<String>,
allowed_users: Vec<String>,
thread_replies: bool,
mention_only: bool,
group_reply_allowed_sender_ids: Vec<String>,
user_display_name_cache: Mutex<HashMap<String, CachedSlackDisplayName>>,
@@ -75,6 +76,7 @@ impl SlackChannel {
channel_id,
channel_ids,
allowed_users,
thread_replies: true,
mention_only: false,
group_reply_allowed_sender_ids: Vec::new(),
user_display_name_cache: Mutex::new(HashMap::new()),
@@ -94,6 +96,12 @@ impl SlackChannel {
self
}
/// Configure whether outbound replies stay in the originating Slack thread.
pub fn with_thread_replies(mut self, thread_replies: bool) -> Self {
self.thread_replies = thread_replies;
self
}
/// Configure workspace directory used for persisting inbound Slack attachments.
pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
self.workspace_dir = Some(dir);
@@ -122,6 +130,14 @@ impl SlackChannel {
.any(|entry| entry == "*" || entry == user_id)
}
fn outbound_thread_ts<'a>(&self, message: &'a SendMessage) -> Option<&'a str> {
if self.thread_replies {
message.thread_ts.as_deref()
} else {
None
}
}
/// Get the bot's own user ID so we can ignore our own messages
async fn get_bot_user_id(&self) -> Option<String> {
let resp: serde_json::Value = self
@@ -2149,7 +2165,7 @@ impl Channel for SlackChannel {
"text": message.content
});
if let Some(ref ts) = message.thread_ts {
if let Some(ts) = self.outbound_thread_ts(message) {
body["thread_ts"] = serde_json::json!(ts);
}
@@ -2484,10 +2500,30 @@ mod tests {
#[test]
fn slack_group_reply_policy_defaults_to_all_messages() {
let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec!["*".into()]);
assert!(ch.thread_replies);
assert!(!ch.mention_only);
assert!(ch.group_reply_allowed_sender_ids.is_empty());
}
#[test]
fn with_thread_replies_sets_flag() {
let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
.with_thread_replies(false);
assert!(!ch.thread_replies);
}
#[test]
fn outbound_thread_ts_respects_thread_replies_setting() {
let msg = SendMessage::new("hello", "C123").in_thread(Some("1741234567.100001".into()));
let threaded = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]);
assert_eq!(threaded.outbound_thread_ts(&msg), Some("1741234567.100001"));
let channel_root = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
.with_thread_replies(false);
assert_eq!(channel_root.outbound_thread_ts(&msg), None);
}
#[test]
fn with_workspace_dir_sets_field() {
let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
+31
@@ -4143,6 +4143,15 @@ pub struct CronConfig {
/// Enable the cron subsystem. Default: `true`.
#[serde(default = "default_true")]
pub enabled: bool,
/// Run all overdue jobs at scheduler startup. Default: `true`.
///
/// When the machine boots late or the daemon restarts, jobs whose
/// `next_run` is in the past are considered "missed". With this
/// option enabled the scheduler fires them once before entering
/// the normal polling loop. Disable if you prefer missed jobs to
/// simply wait for their next scheduled occurrence.
#[serde(default = "default_true")]
pub catch_up_on_startup: bool,
/// Maximum number of historical cron run records to retain. Default: `50`.
#[serde(default = "default_max_run_history")]
pub max_run_history: u32,
@@ -4156,6 +4165,7 @@ impl Default for CronConfig {
fn default() -> Self {
Self {
enabled: true,
catch_up_on_startup: true,
max_run_history: default_max_run_history(),
}
}
@@ -4630,6 +4640,10 @@ pub struct SlackConfig {
/// cancels the in-flight request and starts a fresh response with preserved history.
#[serde(default)]
pub interrupt_on_new_message: bool,
/// When true (default), replies stay in the originating Slack thread.
/// When false, replies go to the channel root instead.
#[serde(default)]
pub thread_replies: Option<bool>,
/// When true, only respond to messages that @-mention the bot in groups.
/// Direct messages remain allowed.
#[serde(default)]
@@ -8397,11 +8411,13 @@ recipient = "42"
async fn cron_config_serde_roundtrip() {
let c = CronConfig {
enabled: false,
catch_up_on_startup: false,
max_run_history: 100,
};
let json = serde_json::to_string(&c).unwrap();
let parsed: CronConfig = serde_json::from_str(&json).unwrap();
assert!(!parsed.enabled);
assert!(!parsed.catch_up_on_startup);
assert_eq!(parsed.max_run_history, 100);
}
@@ -8415,6 +8431,7 @@ default_temperature = 0.7
let parsed: Config = toml::from_str(toml_str).unwrap();
assert!(parsed.cron.enabled);
assert!(parsed.cron.catch_up_on_startup);
assert_eq!(parsed.cron.max_run_history, 50);
}
@@ -9370,6 +9387,7 @@ allowed_users = ["@ops:matrix.org"]
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
assert!(parsed.allowed_users.is_empty());
assert!(!parsed.interrupt_on_new_message);
assert_eq!(parsed.thread_replies, None);
assert!(!parsed.mention_only);
}
@@ -9379,6 +9397,7 @@ allowed_users = ["@ops:matrix.org"]
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
assert_eq!(parsed.allowed_users, vec!["U111"]);
assert!(!parsed.interrupt_on_new_message);
assert_eq!(parsed.thread_replies, None);
assert!(!parsed.mention_only);
}
@@ -9388,6 +9407,7 @@ allowed_users = ["@ops:matrix.org"]
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
assert!(parsed.mention_only);
assert!(!parsed.interrupt_on_new_message);
assert_eq!(parsed.thread_replies, None);
}
#[test]
@@ -9395,6 +9415,16 @@ allowed_users = ["@ops:matrix.org"]
let json = r#"{"bot_token":"xoxb-tok","interrupt_on_new_message":true}"#;
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
assert!(parsed.interrupt_on_new_message);
assert_eq!(parsed.thread_replies, None);
assert!(!parsed.mention_only);
}
#[test]
async fn slack_config_deserializes_thread_replies() {
let json = r#"{"bot_token":"xoxb-tok","thread_replies":false}"#;
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
assert_eq!(parsed.thread_replies, Some(false));
assert!(!parsed.interrupt_on_new_message);
assert!(!parsed.mention_only);
}
@@ -9418,6 +9448,7 @@ channel_id = "C123"
let parsed: SlackConfig = toml::from_str(toml_str).unwrap();
assert!(parsed.allowed_users.is_empty());
assert!(!parsed.interrupt_on_new_message);
assert_eq!(parsed.thread_replies, None);
assert!(!parsed.mention_only);
assert_eq!(parsed.channel_id.as_deref(), Some("C123"));
}
+144 -8
@@ -14,8 +14,8 @@ pub use schedule::{
};
#[allow(unused_imports)]
pub use store::{
add_agent_job, due_jobs, get_job, list_jobs, list_runs, record_last_run, record_run,
remove_job, reschedule_after_run, update_job,
add_agent_job, all_overdue_jobs, due_jobs, get_job, list_jobs, list_runs, record_last_run,
record_run, remove_job, reschedule_after_run, update_job,
};
pub use types::{
deserialize_maybe_stringified, CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType,
@@ -156,6 +156,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
expression,
tz,
agent,
allowed_tools,
command,
} => {
let schedule = Schedule::Cron {
@@ -172,12 +173,20 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
None,
None,
false,
if allowed_tools.is_empty() {
None
} else {
Some(allowed_tools)
},
)?;
println!("✅ Added agent cron job {}", job.id);
println!(" Expr : {}", job.expression);
println!(" Next : {}", job.next_run.to_rfc3339());
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
} else {
if !allowed_tools.is_empty() {
bail!("--allowed-tool is only supported with --agent cron jobs");
}
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added cron job {}", job.id);
println!(" Expr: {}", job.expression);
@@ -186,7 +195,12 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
}
Ok(())
}
crate::CronCommands::AddAt { at, agent, command } => {
crate::CronCommands::AddAt {
at,
agent,
allowed_tools,
command,
} => {
let at = chrono::DateTime::parse_from_rfc3339(&at)
.map_err(|e| anyhow::anyhow!("Invalid RFC3339 timestamp for --at: {e}"))?
.with_timezone(&chrono::Utc);
@@ -201,11 +215,19 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
None,
None,
true,
if allowed_tools.is_empty() {
None
} else {
Some(allowed_tools)
},
)?;
println!("✅ Added one-shot agent cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
} else {
if !allowed_tools.is_empty() {
bail!("--allowed-tool is only supported with --agent cron jobs");
}
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added one-shot cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
@@ -216,6 +238,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
crate::CronCommands::AddEvery {
every_ms,
agent,
allowed_tools,
command,
} => {
let schedule = Schedule::Every { every_ms };
@@ -229,12 +252,20 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
None,
None,
false,
if allowed_tools.is_empty() {
None
} else {
Some(allowed_tools)
},
)?;
println!("✅ Added interval agent cron job {}", job.id);
println!(" Every(ms): {every_ms}");
println!(" Next : {}", job.next_run.to_rfc3339());
println!(" Prompt : {}", job.prompt.as_deref().unwrap_or_default());
} else {
if !allowed_tools.is_empty() {
bail!("--allowed-tool is only supported with --agent cron jobs");
}
let job = add_shell_job(config, None, schedule, &command)?;
println!("✅ Added interval cron job {}", job.id);
println!(" Every(ms): {every_ms}");
@@ -246,6 +277,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
crate::CronCommands::Once {
delay,
agent,
allowed_tools,
command,
} => {
if agent {
@@ -261,11 +293,19 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
None,
None,
true,
if allowed_tools.is_empty() {
None
} else {
Some(allowed_tools)
},
)?;
println!("✅ Added one-shot agent cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
} else {
if !allowed_tools.is_empty() {
bail!("--allowed-tool is only supported with --agent cron jobs");
}
let job = add_once(config, &delay, &command)?;
println!("✅ Added one-shot cron job {}", job.id);
println!(" At : {}", job.next_run.to_rfc3339());
@@ -279,21 +319,37 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
tz,
command,
name,
allowed_tools,
} => {
if expression.is_none() && tz.is_none() && command.is_none() && name.is_none() {
bail!("At least one of --expression, --tz, --command, or --name must be provided");
if expression.is_none()
&& tz.is_none()
&& command.is_none()
&& name.is_none()
&& allowed_tools.is_empty()
{
bail!(
"At least one of --expression, --tz, --command, --name, or --allowed-tool must be provided"
);
}
let existing = if expression.is_some() || tz.is_some() || !allowed_tools.is_empty() {
Some(get_job(config, &id)?)
} else {
None
};
// Merge expression/tz with the existing schedule so that
// --tz alone updates the timezone and --expression alone
// preserves the existing timezone.
let schedule = if expression.is_some() || tz.is_some() {
let existing = get_job(config, &id)?;
let (existing_expr, existing_tz) = match existing.schedule {
let existing = existing
.as_ref()
.expect("existing job must be loaded when updating schedule");
let (existing_expr, existing_tz) = match &existing.schedule {
Schedule::Cron {
expr,
tz: existing_tz,
} => (expr, existing_tz),
} => (expr.clone(), existing_tz.clone()),
_ => bail!("Cannot update expression/tz on a non-cron schedule"),
};
Some(Schedule::Cron {
@@ -304,10 +360,24 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
None
};
if !allowed_tools.is_empty() {
let existing = existing
.as_ref()
.expect("existing job must be loaded when updating allowed tools");
if existing.job_type != JobType::Agent {
bail!("--allowed-tool is only supported for agent cron jobs");
}
}
let patch = CronJobPatch {
schedule,
command,
name,
allowed_tools: if allowed_tools.is_empty() {
None
} else {
Some(allowed_tools)
},
..CronJobPatch::default()
};
@@ -430,6 +500,7 @@ mod tests {
tz: tz.map(Into::into),
command: command.map(Into::into),
name: name.map(Into::into),
allowed_tools: vec![],
},
config,
)
@@ -778,6 +849,7 @@ mod tests {
expression: "*/15 * * * *".into(),
tz: None,
agent: true,
allowed_tools: vec![],
command: "Check server health: disk space, memory, CPU load".into(),
},
&config,
@@ -808,6 +880,7 @@ mod tests {
expression: "*/15 * * * *".into(),
tz: None,
agent: true,
allowed_tools: vec![],
command: "Check server health: disk space, memory, CPU load".into(),
},
&config,
@@ -819,6 +892,68 @@ mod tests {
assert_eq!(jobs[0].job_type, JobType::Agent);
}
#[test]
fn cli_agent_allowed_tools_persist() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
handle_command(
crate::CronCommands::Add {
expression: "*/15 * * * *".into(),
tz: None,
agent: true,
allowed_tools: vec!["file_read".into(), "web_search".into()],
command: "Check server health".into(),
},
&config,
)
.unwrap();
let jobs = list_jobs(&config).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(
jobs[0].allowed_tools,
Some(vec!["file_read".into(), "web_search".into()])
);
}
#[test]
fn cli_update_agent_allowed_tools_persist() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let job = add_agent_job(
&config,
Some("agent".into()),
Schedule::Cron {
expr: "*/5 * * * *".into(),
tz: None,
},
"original prompt",
SessionTarget::Isolated,
None,
None,
false,
None,
)
.unwrap();
handle_command(
crate::CronCommands::Update {
id: job.id.clone(),
expression: None,
tz: None,
command: None,
name: None,
allowed_tools: vec!["shell".into()],
},
&config,
)
.unwrap();
let updated = get_job(&config, &job.id).unwrap();
assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
}
#[test]
fn cli_without_agent_flag_defaults_to_shell_job() {
let tmp = TempDir::new().unwrap();
@@ -829,6 +964,7 @@ mod tests {
expression: "*/5 * * * *".into(),
tz: None,
agent: false,
allowed_tools: vec![],
command: "echo ok".into(),
},
&config,
+130 -14
@@ -6,8 +6,9 @@ use crate::channels::{
};
use crate::config::Config;
use crate::cron::{
due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job, reschedule_after_run,
update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule, SessionTarget,
all_overdue_jobs, due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job,
reschedule_after_run, update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule,
SessionTarget,
};
use crate::security::SecurityPolicy;
use anyhow::Result;
@@ -33,6 +34,18 @@ pub async fn run(config: Config) -> Result<()> {
crate::health::mark_component_ok(SCHEDULER_COMPONENT);
// ── Startup catch-up: run ALL overdue jobs before entering the
// normal polling loop. The regular loop is capped by `max_tasks`,
// which could leave some overdue jobs waiting across many cycles
// if the machine was off for a while. The catch-up phase fetches
// without the `max_tasks` limit so every missed job fires once.
// Controlled by `[cron] catch_up_on_startup` (default: true).
if config.cron.catch_up_on_startup {
catch_up_overdue_jobs(&config, &security).await;
} else {
tracing::info!("Scheduler startup: catch-up disabled by config");
}
loop {
interval.tick().await;
// Keep scheduler liveness fresh even when there are no due jobs.
@@ -51,6 +64,35 @@ pub async fn run(config: Config) -> Result<()> {
}
}
/// Fetch **all** overdue jobs (ignoring `max_tasks`) and execute them.
///
/// Called once at scheduler startup so that jobs missed during downtime
/// (e.g. late boot, daemon restart) are caught up immediately.
async fn catch_up_overdue_jobs(config: &Config, security: &Arc<SecurityPolicy>) {
let now = Utc::now();
let jobs = match all_overdue_jobs(config, now) {
Ok(jobs) => jobs,
Err(e) => {
tracing::warn!("Startup catch-up query failed: {e}");
return;
}
};
if jobs.is_empty() {
tracing::info!("Scheduler startup: no overdue jobs to catch up");
return;
}
tracing::info!(
count = jobs.len(),
"Scheduler startup: catching up overdue jobs"
);
process_due_jobs(config, security, jobs, SCHEDULER_COMPONENT).await;
tracing::info!("Scheduler startup: catch-up complete");
}
pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
Box::pin(execute_job_with_retry(config, &security, job)).await
@@ -506,18 +548,12 @@ async fn run_job_command_with_timeout(
);
}
let child = match Command::new("sh")
.arg("-lc")
.arg(&job.command)
.current_dir(&config.workspace_dir)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
{
Ok(child) => child,
Err(e) => return (false, format!("spawn error: {e}")),
let child = match build_cron_shell_command(&job.command, &config.workspace_dir) {
Ok(mut cmd) => match cmd.spawn() {
Ok(child) => child,
Err(e) => return (false, format!("spawn error: {e}")),
},
Err(e) => return (false, format!("shell setup error: {e}")),
};
match time::timeout(timeout, child.wait_with_output()).await {
@@ -540,6 +576,35 @@ async fn run_job_command_with_timeout(
}
}
/// Build a shell `Command` for cron job execution.
///
/// Uses `sh -c <command>` (non-login shell). On Windows, ZeroClaw users
/// typically have Git Bash installed, which provides `sh` in PATH, and
/// cron commands are written with Unix shell syntax. The previous `-lc`
/// (login shell) flag was dropped: login shells load the full user
/// profile on every invocation, which is slow and may cause side effects.
///
/// The command is configured with:
/// - `current_dir` set to the workspace
/// - `stdin` piped to `/dev/null` (no interactive input)
/// - `stdout` and `stderr` piped for capture
/// - `kill_on_drop(true)` for safe timeout handling
fn build_cron_shell_command(
command: &str,
workspace_dir: &std::path::Path,
) -> anyhow::Result<Command> {
let mut cmd = Command::new("sh");
cmd.arg("-c")
.arg(command)
.current_dir(workspace_dir)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
Ok(cmd)
}
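As a standalone illustration of the non-login invocation pattern, here is a minimal sketch using `std::process` instead of the tokio `Command` the scheduler actually uses; `run_cron_style` is a hypothetical helper name, and a Unix-like environment with `sh` on PATH is assumed:

```rust
// Minimal sketch of the `sh -c` (non-login) invocation used by
// build_cron_shell_command, with std::process instead of tokio::process.
use std::path::Path;
use std::process::{Command, Stdio};

fn run_cron_style(command: &str, workspace: &Path) -> std::io::Result<String> {
    let output = Command::new("sh")
        .arg("-c") // non-login shell: no profile loading on each run
        .arg(command)
        .current_dir(workspace)
        .stdin(Stdio::null()) // no interactive input, matching the cron setup
        .output()?;
    Ok(String::from_utf8_lossy(&output.stdout).into_owned())
}

fn main() -> std::io::Result<()> {
    let out = run_cron_style("echo cron-ok", &std::env::temp_dir())?;
    assert!(out.contains("cron-ok"));
    Ok(())
}
```

Because `-c` replaces the former `-l` flag, none of the user's login profile scripts run, which keeps per-job startup cheap and deterministic.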
#[cfg(test)]
mod tests {
use super::*;
@@ -900,6 +965,7 @@ mod tests {
None,
None,
true,
None,
)
.unwrap();
let started = Utc::now();
@@ -925,6 +991,7 @@ mod tests {
None,
None,
true,
None,
)
.unwrap();
let started = Utc::now();
@@ -991,6 +1058,7 @@ mod tests {
best_effort: false,
}),
false,
None,
)
.unwrap();
let started = Utc::now();
@@ -1029,6 +1097,7 @@ mod tests {
best_effort: true,
}),
false,
None,
)
.unwrap();
let started = Utc::now();
@@ -1060,6 +1129,7 @@ mod tests {
None,
None,
false,
None,
)
.unwrap();
assert!(!job.delete_after_run);
@@ -1152,4 +1222,50 @@ mod tests {
.to_string()
.contains("matrix delivery channel requires `channel-matrix` feature"));
}
#[test]
fn build_cron_shell_command_uses_sh_non_login() {
let workspace = std::env::temp_dir();
let cmd = build_cron_shell_command("echo cron-test", &workspace).unwrap();
let debug = format!("{cmd:?}");
assert!(debug.contains("echo cron-test"));
assert!(debug.contains("\"sh\""), "should use sh: {debug}");
// Must NOT use login shell (-l) — login shells load full profile
// and are slow/unpredictable for cron jobs.
assert!(
!debug.contains("\"-lc\""),
"must not use login shell: {debug}"
);
}
#[tokio::test]
async fn build_cron_shell_command_executes_successfully() {
let workspace = std::env::temp_dir();
let mut cmd = build_cron_shell_command("echo cron-ok", &workspace).unwrap();
let output = cmd.output().await.unwrap();
assert!(output.status.success());
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(stdout.contains("cron-ok"));
}
#[tokio::test]
async fn catch_up_queries_all_overdue_jobs_ignoring_max_tasks() {
let tmp = TempDir::new().unwrap();
let mut config = test_config(&tmp).await;
config.scheduler.max_tasks = 1; // limit normal polling to 1
// Create 3 jobs with "every minute" schedule
for i in 0..3 {
let _ = cron::add_job(&config, "* * * * *", &format!("echo catchup-{i}")).unwrap();
}
// Verify normal due_jobs is limited to max_tasks=1
let far_future = Utc::now() + ChronoDuration::days(1);
let due = cron::due_jobs(&config, far_future).unwrap();
assert_eq!(due.len(), 1, "due_jobs must respect max_tasks");
// all_overdue_jobs ignores the limit
let overdue = cron::all_overdue_jobs(&config, far_future).unwrap();
assert_eq!(overdue.len(), 3, "all_overdue_jobs must return all");
}
}
+170 -8
@@ -77,6 +77,7 @@ pub fn add_agent_job(
model: Option<String>,
delivery: Option<DeliveryConfig>,
delete_after_run: bool,
allowed_tools: Option<Vec<String>>,
) -> Result<CronJob> {
let now = Utc::now();
validate_schedule(&schedule, now)?;
@@ -90,8 +91,8 @@ pub fn add_agent_job(
conn.execute(
"INSERT INTO cron_jobs (
id, expression, command, schedule, job_type, prompt, name, session_target, model,
enabled, delivery, delete_after_run, created_at, next_run
) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11)",
enabled, delivery, delete_after_run, allowed_tools, created_at, next_run
) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12)",
params![
id,
expression,
@@ -102,6 +103,7 @@ pub fn add_agent_job(
model,
serde_json::to_string(&delivery)?,
if delete_after_run { 1 } else { 0 },
encode_allowed_tools(allowed_tools.as_ref())?,
now.to_rfc3339(),
next_run.to_rfc3339(),
],
@@ -117,7 +119,8 @@ pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
with_connection(config, |conn| {
let mut stmt = conn.prepare(
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
allowed_tools
FROM cron_jobs ORDER BY next_run ASC",
)?;
@@ -135,7 +138,8 @@ pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
with_connection(config, |conn| {
let mut stmt = conn.prepare(
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
allowed_tools
FROM cron_jobs WHERE id = ?1",
)?;
@@ -168,7 +172,8 @@ pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
with_connection(config, |conn| {
let mut stmt = conn.prepare(
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
allowed_tools
FROM cron_jobs
WHERE enabled = 1 AND next_run <= ?1
ORDER BY next_run ASC
@@ -188,6 +193,34 @@ pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
})
}
/// Return **all** enabled overdue jobs without the `max_tasks` limit.
///
/// Used by the scheduler startup catch-up to ensure every missed job is
/// executed at least once after a period of downtime (late boot, daemon
/// restart, etc.).
pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
with_connection(config, |conn| {
let mut stmt = conn.prepare(
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output, allowed_tools
FROM cron_jobs
WHERE enabled = 1 AND next_run <= ?1
ORDER BY next_run ASC",
)?;
let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
let mut jobs = Vec::new();
for row in rows {
match row {
Ok(job) => jobs.push(job),
Err(e) => tracing::warn!("Skipping cron job with unparseable row data: {e}"),
}
}
Ok(jobs)
})
}
pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
let mut job = get_job(config, job_id)?;
let mut schedule_changed = false;
@@ -222,6 +255,9 @@ pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<
if let Some(delete_after_run) = patch.delete_after_run {
job.delete_after_run = delete_after_run;
}
if let Some(allowed_tools) = patch.allowed_tools {
job.allowed_tools = Some(allowed_tools);
}
if schedule_changed {
job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
@@ -232,8 +268,8 @@ pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<
"UPDATE cron_jobs
SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
next_run = ?12
WHERE id = ?13",
allowed_tools = ?12, next_run = ?13
WHERE id = ?14",
params![
job.expression,
job.command,
@@ -246,6 +282,7 @@ pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<
if job.enabled { 1 } else { 0 },
serde_json::to_string(&job.delivery)?,
if job.delete_after_run { 1 } else { 0 },
encode_allowed_tools(job.allowed_tools.as_ref())?,
job.next_run.to_rfc3339(),
job.id,
],
@@ -446,6 +483,7 @@ fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
let next_run_raw: String = row.get(13)?;
let last_run_raw: Option<String> = row.get(14)?;
let created_at_raw: String = row.get(12)?;
let allowed_tools_raw: Option<String> = row.get(17)?;
Ok(CronJob {
id: row.get(0)?,
@@ -468,7 +506,8 @@ fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
},
last_status: row.get(15)?,
last_output: row.get(16)?,
allowed_tools: None,
allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
.map_err(sql_conversion_error)?,
})
}
@@ -502,6 +541,25 @@ fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
Ok(DeliveryConfig::default())
}
fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
allowed_tools
.map(serde_json::to_string)
.transpose()
.context("Failed to serialize cron allowed_tools")
}
fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
if let Some(raw) = raw {
let trimmed = raw.trim();
if !trimmed.is_empty() {
return serde_json::from_str(trimmed)
.map(Some)
.with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
}
}
Ok(None)
}
fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
let mut rows = stmt.query([])?;
@@ -557,6 +615,7 @@ fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>)
enabled INTEGER NOT NULL DEFAULT 1,
delivery TEXT,
delete_after_run INTEGER NOT NULL DEFAULT 0,
allowed_tools TEXT,
created_at TEXT NOT NULL,
next_run TEXT NOT NULL,
last_run TEXT,
@@ -590,6 +649,7 @@ fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>)
add_column_if_missing(&conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
add_column_if_missing(&conn, "delivery", "TEXT")?;
add_column_if_missing(&conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
add_column_if_missing(&conn, "allowed_tools", "TEXT")?;
f(&conn)
}
@@ -704,6 +764,108 @@ mod tests {
assert_eq!(due.len(), 2);
}
#[test]
fn all_overdue_jobs_ignores_max_tasks_limit() {
let tmp = TempDir::new().unwrap();
let mut config = test_config(&tmp);
config.scheduler.max_tasks = 2;
let _ = add_job(&config, "* * * * *", "echo ov-1").unwrap();
let _ = add_job(&config, "* * * * *", "echo ov-2").unwrap();
let _ = add_job(&config, "* * * * *", "echo ov-3").unwrap();
let far_future = Utc::now() + ChronoDuration::days(365);
// due_jobs respects the limit
let due = due_jobs(&config, far_future).unwrap();
assert_eq!(due.len(), 2);
// all_overdue_jobs returns everything
let overdue = all_overdue_jobs(&config, far_future).unwrap();
assert_eq!(overdue.len(), 3);
}
#[test]
fn all_overdue_jobs_excludes_disabled_jobs() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let job = add_job(&config, "* * * * *", "echo disabled").unwrap();
let _ = update_job(
&config,
&job.id,
CronJobPatch {
enabled: Some(false),
..CronJobPatch::default()
},
)
.unwrap();
let far_future = Utc::now() + ChronoDuration::days(365);
let overdue = all_overdue_jobs(&config, far_future).unwrap();
assert!(overdue.is_empty());
}
#[test]
fn add_agent_job_persists_allowed_tools() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let job = add_agent_job(
&config,
Some("agent".into()),
Schedule::Every { every_ms: 60_000 },
"do work",
SessionTarget::Isolated,
None,
None,
false,
Some(vec!["file_read".into(), "web_search".into()]),
)
.unwrap();
assert_eq!(
job.allowed_tools,
Some(vec!["file_read".into(), "web_search".into()])
);
let stored = get_job(&config, &job.id).unwrap();
assert_eq!(stored.allowed_tools, job.allowed_tools);
}
#[test]
fn update_job_persists_allowed_tools_patch() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let job = add_agent_job(
&config,
Some("agent".into()),
Schedule::Every { every_ms: 60_000 },
"do work",
SessionTarget::Isolated,
None,
None,
false,
None,
)
.unwrap();
let updated = update_job(
&config,
&job.id,
CronJobPatch {
allowed_tools: Some(vec!["shell".into()]),
..CronJobPatch::default()
},
)
.unwrap();
assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
assert_eq!(
get_job(&config, &job.id).unwrap().allowed_tools,
Some(vec!["shell".into()])
);
}
#[test]
fn reschedule_after_run_persists_last_status_and_last_run() {
let tmp = TempDir::new().unwrap();
+59
@@ -357,6 +357,65 @@ pub async fn handle_api_cron_delete(
}
}
/// GET /api/cron/settings — return cron subsystem settings
pub async fn handle_api_cron_settings_get(
State(state): State<AppState>,
headers: HeaderMap,
) -> impl IntoResponse {
if let Err(e) = require_auth(&state, &headers) {
return e.into_response();
}
let config = state.config.lock().clone();
Json(serde_json::json!({
"enabled": config.cron.enabled,
"catch_up_on_startup": config.cron.catch_up_on_startup,
"max_run_history": config.cron.max_run_history,
}))
.into_response()
}
/// PATCH /api/cron/settings — update cron subsystem settings
pub async fn handle_api_cron_settings_patch(
State(state): State<AppState>,
headers: HeaderMap,
Json(body): Json<serde_json::Value>,
) -> impl IntoResponse {
if let Err(e) = require_auth(&state, &headers) {
return e.into_response();
}
let mut config = state.config.lock().clone();
if let Some(v) = body.get("enabled").and_then(|v| v.as_bool()) {
config.cron.enabled = v;
}
if let Some(v) = body.get("catch_up_on_startup").and_then(|v| v.as_bool()) {
config.cron.catch_up_on_startup = v;
}
if let Some(v) = body.get("max_run_history").and_then(|v| v.as_u64()) {
config.cron.max_run_history = u32::try_from(v).unwrap_or(u32::MAX);
}
if let Err(e) = config.save().await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("Failed to save config: {e}")})),
)
.into_response();
}
*state.config.lock() = config.clone();
Json(serde_json::json!({
"status": "ok",
"enabled": config.cron.enabled,
"catch_up_on_startup": config.cron.catch_up_on_startup,
"max_run_history": config.cron.max_run_history,
}))
.into_response()
}
/// GET /api/integrations — list all integrations with status
pub async fn handle_api_integrations(
State(state): State<AppState>,
+4
@@ -766,6 +766,10 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
.route("/api/tools", get(api::handle_api_tools))
.route("/api/cron", get(api::handle_api_cron_list))
.route("/api/cron", post(api::handle_api_cron_add))
.route(
"/api/cron/settings",
get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
)
.route("/api/cron/{id}", delete(api::handle_api_cron_delete))
.route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
.route("/api/integrations", get(api::handle_api_integrations))
+15
@@ -299,6 +299,9 @@ Examples:
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
#[arg(long = "allowed-tool")]
allowed_tools: Vec<String>,
/// Command (shell) or prompt (agent) to run
command: String,
},
@@ -317,6 +320,9 @@ Examples:
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
#[arg(long = "allowed-tool")]
allowed_tools: Vec<String>,
/// Command (shell) or prompt (agent) to run
command: String,
},
@@ -335,6 +341,9 @@ Examples:
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
#[arg(long = "allowed-tool")]
allowed_tools: Vec<String>,
/// Command (shell) or prompt (agent) to run
command: String,
},
@@ -355,6 +364,9 @@ Examples:
/// Treat the argument as an agent prompt instead of a shell command
#[arg(long)]
agent: bool,
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
#[arg(long = "allowed-tool")]
allowed_tools: Vec<String>,
/// Command (shell) or prompt (agent) to run
command: String,
},
@@ -388,6 +400,9 @@ Examples:
/// New job name
#[arg(long)]
name: Option<String>,
/// Replace the agent job allowlist with the specified tool names (repeatable)
#[arg(long = "allowed-tool")]
allowed_tools: Vec<String>,
},
/// Pause a scheduled task
Pause {
+98
@@ -463,6 +463,47 @@ fn resolve_quick_setup_dirs_with_home(home: &Path) -> (PathBuf, PathBuf) {
(config_dir.clone(), config_dir.join("workspace"))
}
fn homebrew_prefix_for_exe(exe: &Path) -> Option<&'static str> {
let exe = exe.to_string_lossy();
if exe == "/opt/homebrew/bin/zeroclaw"
|| exe.starts_with("/opt/homebrew/Cellar/zeroclaw/")
|| exe.starts_with("/opt/homebrew/opt/zeroclaw/")
{
return Some("/opt/homebrew");
}
if exe == "/usr/local/bin/zeroclaw"
|| exe.starts_with("/usr/local/Cellar/zeroclaw/")
|| exe.starts_with("/usr/local/opt/zeroclaw/")
{
return Some("/usr/local");
}
None
}
fn quick_setup_homebrew_service_note(
config_path: &Path,
workspace_dir: &Path,
exe: &Path,
) -> Option<String> {
let prefix = homebrew_prefix_for_exe(exe)?;
let service_root = Path::new(prefix).join("var").join("zeroclaw");
let service_config = service_root.join("config.toml");
let service_workspace = service_root.join("workspace");
if config_path == service_config || workspace_dir == service_workspace {
return None;
}
Some(format!(
"Homebrew service note: `brew services` uses {} (config {}) by default. Your onboarding just wrote {}. If you plan to run ZeroClaw as a service, copy or link this workspace first.",
service_workspace.display(),
service_config.display(),
config_path.display(),
))
}
#[allow(clippy::too_many_lines)]
async fn run_quick_setup_with_home(
credential_override: Option<&str>,
@@ -650,6 +691,16 @@ async fn run_quick_setup_with_home(
style("Config saved:").white().bold(),
style(config_path.display()).green()
);
if cfg!(target_os = "macos") {
if let Ok(exe) = std::env::current_exe() {
if let Some(note) =
quick_setup_homebrew_service_note(&config_path, &workspace_dir, &exe)
{
println!();
println!(" {}", style(note).yellow());
}
}
}
println!();
println!(" {}", style("Next steps:").white().bold());
if credential_override.is_none() {
@@ -3913,6 +3964,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
},
allowed_users,
interrupt_on_new_message: false,
thread_replies: None,
mention_only: false,
});
}
@@ -6066,6 +6118,52 @@ mod tests {
assert_eq!(config.config_path, expected_config_path);
}
#[test]
fn homebrew_prefix_for_exe_detects_supported_layouts() {
assert_eq!(
homebrew_prefix_for_exe(Path::new("/opt/homebrew/bin/zeroclaw")),
Some("/opt/homebrew")
);
assert_eq!(
homebrew_prefix_for_exe(Path::new(
"/opt/homebrew/Cellar/zeroclaw/0.5.0/bin/zeroclaw",
)),
Some("/opt/homebrew")
);
assert_eq!(
homebrew_prefix_for_exe(Path::new("/usr/local/bin/zeroclaw")),
Some("/usr/local")
);
assert_eq!(homebrew_prefix_for_exe(Path::new("/tmp/zeroclaw")), None);
}
#[test]
fn quick_setup_homebrew_service_note_mentions_service_workspace() {
let note = quick_setup_homebrew_service_note(
Path::new("/Users/alix/.zeroclaw/config.toml"),
Path::new("/Users/alix/.zeroclaw/workspace"),
Path::new("/opt/homebrew/bin/zeroclaw"),
)
.expect("homebrew installs should emit a service workspace note");
assert!(note.contains("/opt/homebrew/var/zeroclaw/workspace"));
assert!(note.contains("/opt/homebrew/var/zeroclaw/config.toml"));
assert!(note.contains("/Users/alix/.zeroclaw/config.toml"));
}
#[test]
fn quick_setup_homebrew_service_note_skips_matching_service_layout() {
let service_config = Path::new("/opt/homebrew/var/zeroclaw/config.toml");
let service_workspace = Path::new("/opt/homebrew/var/zeroclaw/workspace");
assert!(quick_setup_homebrew_service_note(
service_config,
service_workspace,
Path::new("/opt/homebrew/bin/zeroclaw"),
)
.is_none());
}
// ── scaffold_workspace: basic file creation ─────────────────
#[tokio::test]
+50 -41
@@ -211,9 +211,9 @@ impl AnthropicProvider {
text.len() > 3072
}
/// Cache conversations with more than 4 messages (excluding system)
/// Cache conversations with more than 1 non-system message (i.e. after the first exchange)
fn should_cache_conversation(messages: &[ChatMessage]) -> bool {
messages.iter().filter(|m| m.role != "system").count() > 4
messages.iter().filter(|m| m.role != "system").count() > 1
}
/// Apply cache control to the last message content block
@@ -447,17 +447,13 @@ impl AnthropicProvider {
}
}
// Convert system text to SystemPrompt with cache control if large
// Always use Blocks format with cache_control for system prompts
let system_prompt = system_text.map(|text| {
if Self::should_cache_system(&text) {
SystemPrompt::Blocks(vec![SystemBlock {
block_type: "text".to_string(),
text,
cache_control: Some(CacheControl::ephemeral()),
}])
} else {
SystemPrompt::String(text)
}
SystemPrompt::Blocks(vec![SystemBlock {
block_type: "text".to_string(),
text,
cache_control: Some(CacheControl::ephemeral()),
}])
});
(system_prompt, native_messages)
@@ -1063,12 +1059,8 @@ mod tests {
role: "user".to_string(),
content: "Hello".to_string(),
},
ChatMessage {
role: "assistant".to_string(),
content: "Hi".to_string(),
},
];
// Only 2 non-system messages
// Only 1 non-system message — should not cache
assert!(!AnthropicProvider::should_cache_conversation(&messages));
}
@@ -1078,8 +1070,8 @@ mod tests {
role: "system".to_string(),
content: "System prompt".to_string(),
}];
// Add 5 non-system messages
for i in 0..5 {
// Add 3 non-system messages
for i in 0..3 {
messages.push(ChatMessage {
role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
content: format!("Message {i}"),
@@ -1090,21 +1082,24 @@ mod tests {
#[test]
fn should_cache_conversation_boundary() {
let mut messages = vec![];
// Add exactly 4 non-system messages
for i in 0..4 {
messages.push(ChatMessage {
role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
content: format!("Message {i}"),
});
}
let messages = vec![ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
}];
// Exactly 1 non-system message — should not cache
assert!(!AnthropicProvider::should_cache_conversation(&messages));
// Add one more to cross boundary
messages.push(ChatMessage {
role: "user".to_string(),
content: "One more".to_string(),
});
// Two non-system messages (one full exchange) cross the >1 boundary
let messages = vec![
ChatMessage {
role: "user".to_string(),
content: "Hello".to_string(),
},
ChatMessage {
role: "assistant".to_string(),
content: "Hi".to_string(),
},
];
assert!(AnthropicProvider::should_cache_conversation(&messages));
}
@@ -1217,7 +1212,7 @@ mod tests {
}
#[test]
fn convert_messages_small_system_prompt() {
fn convert_messages_small_system_prompt_uses_blocks_with_cache() {
let messages = vec![ChatMessage {
role: "system".to_string(),
content: "Short system prompt".to_string(),
@@ -1226,10 +1221,17 @@ mod tests {
let (system_prompt, _) = AnthropicProvider::convert_messages(&messages);
match system_prompt.unwrap() {
SystemPrompt::String(s) => {
assert_eq!(s, "Short system prompt");
SystemPrompt::Blocks(blocks) => {
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0].text, "Short system prompt");
assert!(
blocks[0].cache_control.is_some(),
"Small system prompts should have cache_control"
);
}
SystemPrompt::String(_) => {
panic!("Expected Blocks variant with cache_control for small prompt")
}
SystemPrompt::Blocks(_) => panic!("Expected String variant for small prompt"),
}
}
@@ -1254,12 +1256,16 @@ mod tests {
}
#[test]
fn backward_compatibility_native_chat_request() {
// Test that requests without cache_control serialize identically to old format
fn native_chat_request_with_blocks_system() {
// System prompts now always use Blocks format with cache_control
let req = NativeChatRequest {
model: "claude-3-opus".to_string(),
max_tokens: 4096,
system: Some(SystemPrompt::String("System".to_string())),
system: Some(SystemPrompt::Blocks(vec![SystemBlock {
block_type: "text".to_string(),
text: "System".to_string(),
cache_control: Some(CacheControl::ephemeral()),
}])),
messages: vec![NativeMessage {
role: "user".to_string(),
content: vec![NativeContentOut::Text {
@@ -1272,8 +1278,11 @@ mod tests {
};
let json = serde_json::to_string(&req).unwrap();
assert!(!json.contains("cache_control"));
assert!(json.contains(r#""system":"System""#));
assert!(json.contains("System"));
assert!(
json.contains(r#""cache_control":{"type":"ephemeral"}"#),
"System prompt should include cache_control"
);
}
#[tokio::test]
+61 -7
@@ -409,13 +409,43 @@ fn has_shell_shebang(path: &Path) -> bool {
return false;
};
let prefix = &content[..content.len().min(128)];
let shebang = String::from_utf8_lossy(prefix).to_ascii_lowercase();
shebang.starts_with("#!")
&& (shebang.contains("sh")
|| shebang.contains("bash")
|| shebang.contains("zsh")
|| shebang.contains("pwsh")
|| shebang.contains("powershell"))
let shebang_line = String::from_utf8_lossy(prefix)
.lines()
.next()
.unwrap_or_default()
.trim()
.to_ascii_lowercase();
let Some(interpreter) = shebang_interpreter(&shebang_line) else {
return false;
};
matches!(
interpreter,
"sh" | "bash" | "zsh" | "ksh" | "fish" | "pwsh" | "powershell"
)
}
fn shebang_interpreter(line: &str) -> Option<&str> {
let shebang = line.strip_prefix("#!")?.trim();
if shebang.is_empty() {
return None;
}
let mut parts = shebang.split_whitespace();
let first = parts.next()?;
let first_basename = Path::new(first).file_name()?.to_str()?;
if first_basename == "env" {
for part in parts {
if part.starts_with('-') {
continue;
}
return Path::new(part).file_name()?.to_str();
}
return None;
}
Some(first_basename)
}
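The `env`-aware extraction above can be exercised in isolation. A self-contained sketch (duplicating the helper so it compiles on its own; inputs are assumed to already be lowercased by the caller, as in `has_shell_shebang`):

```rust
// Self-contained copy of the shebang-interpreter extraction, handling
// direct interpreter paths, `env` indirection, and env flags like `-S`.
use std::path::Path;

fn shebang_interpreter(line: &str) -> Option<&str> {
    let shebang = line.strip_prefix("#!")?.trim();
    if shebang.is_empty() {
        return None;
    }
    let mut parts = shebang.split_whitespace();
    let first = parts.next()?;
    let first_basename = Path::new(first).file_name()?.to_str()?;
    if first_basename == "env" {
        // Skip env flags (e.g. `-S`) and return the real interpreter.
        for part in parts {
            if part.starts_with('-') {
                continue;
            }
            return Path::new(part).file_name()?.to_str();
        }
        return None;
    }
    Some(first_basename)
}

fn main() {
    assert_eq!(shebang_interpreter("#!/bin/sh"), Some("sh"));
    assert_eq!(shebang_interpreter("#!/usr/bin/env python3"), Some("python3"));
    assert_eq!(shebang_interpreter("#!/usr/bin/env -S bash -x"), Some("bash"));
    assert_eq!(shebang_interpreter("#!"), None);
}
```

Extracting the basename first is what lets `#!/usr/bin/env python3` pass the audit: the old substring check matched the literal `sh` inside paths and docstrings, while this version compares only the resolved interpreter name.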
fn extract_markdown_links(content: &str) -> Vec<String> {
@@ -586,6 +616,30 @@ mod tests {
);
}
#[test]
fn audit_allows_python_shebang_file_when_early_text_contains_sh() {
let dir = tempfile::tempdir().unwrap();
let skill_dir = dir.path().join("python-helper");
let scripts_dir = skill_dir.join("scripts");
std::fs::create_dir_all(&scripts_dir).unwrap();
std::fs::write(skill_dir.join("SKILL.md"), "# Skill\n").unwrap();
std::fs::write(
scripts_dir.join("helper.py"),
"#!/usr/bin/env python3\n\"\"\"Refresh report cache.\"\"\"\n\nprint(\"ok\")\n",
)
.unwrap();
let report = audit_skill_directory(&skill_dir).unwrap();
assert!(
!report
.findings
.iter()
.any(|finding| finding.contains("script-like files are blocked")),
"{:#?}",
report.findings
);
}
#[test]
fn audit_rejects_markdown_escape_links() {
let dir = tempfile::tempdir().unwrap();
+47 -1
@@ -130,6 +130,11 @@ impl Tool for CronAddTool {
"type": "string",
"description": "Optional model override for agent jobs, e.g. 'x-ai/grok-4-1-fast'"
},
"allowed_tools": {
"type": "array",
"items": { "type": "string" },
"description": "Optional allowlist of tool names for agent jobs. When omitted, all tools remain available."
},
"delivery": {
"type": "object",
"description": "Optional delivery config to send job output to a channel after each run. When provided, all three of mode, channel, and to are expected.",
@@ -288,6 +293,19 @@ impl Tool for CronAddTool {
.get("model")
.and_then(serde_json::Value::as_str)
.map(str::to_string);
let allowed_tools = match args.get("allowed_tools") {
Some(v) => match serde_json::from_value::<Vec<String>>(v.clone()) {
Ok(v) => Some(v),
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Invalid allowed_tools: {e}")),
});
}
},
None => None,
};
let delivery = match args.get("delivery") {
Some(v) => match serde_json::from_value::<DeliveryConfig>(v.clone()) {
@@ -316,6 +334,7 @@ impl Tool for CronAddTool {
model,
delivery,
delete_after_run,
allowed_tools,
)
}
};
@@ -329,7 +348,8 @@ impl Tool for CronAddTool {
"job_type": job.job_type,
"schedule": job.schedule,
"next_run": job.next_run,
"enabled": job.enabled,
"allowed_tools": job.allowed_tools
}))?,
error: None,
}),
@@ -612,6 +632,32 @@ mod tests {
.contains("Missing 'prompt'"));
}
#[tokio::test]
async fn agent_job_persists_allowed_tools() {
let tmp = TempDir::new().unwrap();
let cfg = test_config(&tmp).await;
let tool = CronAddTool::new(cfg.clone(), test_security(&cfg));
let result = tool
.execute(json!({
"schedule": { "kind": "cron", "expr": "*/5 * * * *" },
"job_type": "agent",
"prompt": "check status",
"allowed_tools": ["file_read", "web_search"]
}))
.await
.unwrap();
assert!(result.success, "{:?}", result.error);
let jobs = cron::list_jobs(&cfg).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(
jobs[0].allowed_tools,
Some(vec!["file_read".into(), "web_search".into()])
);
}
#[tokio::test]
async fn delivery_schema_includes_matrix_channel() {
let tmp = TempDir::new().unwrap();
+42
@@ -89,6 +89,11 @@ impl Tool for CronUpdateTool {
"type": "string",
"description": "Model override for agent jobs, e.g. 'x-ai/grok-4-1-fast'"
},
"allowed_tools": {
"type": "array",
"items": { "type": "string" },
"description": "Optional replacement allowlist of tool names for agent jobs"
},
"session_target": {
"type": "string",
"enum": ["isolated", "main"],
@@ -403,6 +408,7 @@ mod tests {
"command",
"prompt",
"model",
"allowed_tools",
"session_target",
"delete_after_run",
"schedule",
@@ -501,4 +507,40 @@ mod tests {
.contains("Rate limit exceeded"));
assert!(cron::get_job(&cfg, &job.id).unwrap().enabled);
}
#[tokio::test]
async fn updates_agent_allowed_tools() {
let tmp = TempDir::new().unwrap();
let cfg = test_config(&tmp).await;
let job = cron::add_agent_job(
&cfg,
None,
crate::cron::Schedule::Cron {
expr: "*/5 * * * *".into(),
tz: None,
},
"check status",
crate::cron::SessionTarget::Isolated,
None,
None,
false,
None,
)
.unwrap();
let tool = CronUpdateTool::new(cfg.clone(), test_security(&cfg));
let result = tool
.execute(json!({
"job_id": job.id,
"patch": { "allowed_tools": ["file_read", "web_search"] }
}))
.await
.unwrap();
assert!(result.success, "{:?}", result.error);
assert_eq!(
cron::get_job(&cfg, &job.id).unwrap().allowed_tools,
Some(vec!["file_read".into(), "web_search".into()])
);
}
}
+1
@@ -418,6 +418,7 @@ impl DelegateTool {
true,
None,
"delegate",
None,
&self.multimodal_config,
agent_config.max_iterations,
None,
+7 -2
@@ -146,7 +146,7 @@ pub use workspace_tool::WorkspaceTool;
use crate::config::{Config, DelegateAgentConfig};
use crate::memory::Memory;
use crate::runtime::{NativeRuntime, RuntimeAdapter};
use crate::security::{create_sandbox, SecurityPolicy};
use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::HashMap;
@@ -283,8 +283,13 @@ pub fn all_tools_with_runtime(
root_config: &crate::config::Config,
) -> (Vec<Box<dyn Tool>>, Option<DelegateParentToolsHandle>) {
let has_shell_access = runtime.has_shell_access();
let sandbox = create_sandbox(&root_config.security);
let mut tool_arcs: Vec<Arc<dyn Tool>> = vec![
Arc::new(ShellTool::new_with_sandbox(
security.clone(),
runtime,
sandbox,
)),
Arc::new(FileReadTool::new(security.clone())),
Arc::new(FileWriteTool::new(security.clone())),
Arc::new(FileEditTool::new(security.clone())),
+82 -1
@@ -1,5 +1,6 @@
use super::traits::{Tool, ToolResult};
use crate::runtime::RuntimeAdapter;
use crate::security::traits::Sandbox;
use crate::security::SecurityPolicy;
use async_trait::async_trait;
use serde_json::json;
@@ -44,11 +45,28 @@ const SAFE_ENV_VARS: &[&str] = &[
pub struct ShellTool {
security: Arc<SecurityPolicy>,
runtime: Arc<dyn RuntimeAdapter>,
sandbox: Arc<dyn Sandbox>,
}
impl ShellTool {
pub fn new(security: Arc<SecurityPolicy>, runtime: Arc<dyn RuntimeAdapter>) -> Self {
Self {
security,
runtime,
sandbox: Arc::new(crate::security::NoopSandbox),
}
}
pub fn new_with_sandbox(
security: Arc<SecurityPolicy>,
runtime: Arc<dyn RuntimeAdapter>,
sandbox: Arc<dyn Sandbox>,
) -> Self {
Self {
security,
runtime,
sandbox,
}
}
}
@@ -169,6 +187,14 @@ impl Tool for ShellTool {
});
}
};
// Apply sandbox wrapping before execution.
// The Sandbox trait operates on std::process::Command, so use as_std_mut()
// to get a mutable reference to the underlying command.
self.sandbox
.wrap_command(cmd.as_std_mut())
.map_err(|e| anyhow::anyhow!("Sandbox error: {}", e))?;
cmd.env_clear();
for var in collect_allowed_shell_env_vars(&self.security) {
@@ -690,4 +716,59 @@ mod tests {
|| r2.error.as_deref().unwrap_or("").contains("budget")
);
}
// ── Sandbox integration tests ────────────────────────
#[test]
fn shell_tool_can_be_constructed_with_sandbox() {
use crate::security::NoopSandbox;
let sandbox: Arc<dyn Sandbox> = Arc::new(NoopSandbox);
let tool = ShellTool::new_with_sandbox(
test_security(AutonomyLevel::Supervised),
test_runtime(),
sandbox,
);
assert_eq!(tool.name(), "shell");
}
#[test]
fn noop_sandbox_does_not_modify_command() {
use crate::security::NoopSandbox;
let sandbox = NoopSandbox;
let mut cmd = std::process::Command::new("echo");
cmd.arg("hello");
let program_before = cmd.get_program().to_os_string();
let args_before: Vec<_> = cmd.get_args().map(|a| a.to_os_string()).collect();
sandbox
.wrap_command(&mut cmd)
.expect("wrap_command should succeed");
assert_eq!(cmd.get_program(), program_before);
assert_eq!(
cmd.get_args().map(|a| a.to_os_string()).collect::<Vec<_>>(),
args_before
);
}
#[tokio::test]
async fn shell_executes_with_sandbox() {
use crate::security::NoopSandbox;
let sandbox: Arc<dyn Sandbox> = Arc::new(NoopSandbox);
let tool = ShellTool::new_with_sandbox(
test_security(AutonomyLevel::Supervised),
test_runtime(),
sandbox,
);
let result = tool
.execute(json!({"command": "echo sandbox_test"}))
.await
.expect("command with sandbox should succeed");
assert!(result.success);
assert!(result.output.contains("sandbox_test"));
}
}
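The `Sandbox` trait itself is outside this diff; a minimal sketch of the shape the calls above imply (the trait name, `wrap_command` on a mutable `std::process::Command`, and `NoopSandbox` come from the diff — the error type and trait bounds here are assumptions):

```rust
use std::process::Command;

// Assumed shape: wrap_command may rewrite the command in place, e.g.
// prefixing it with a sandbox launcher. String error is a placeholder.
pub trait Sandbox: Send + Sync {
    fn wrap_command(&self, cmd: &mut Command) -> Result<(), String>;
}

// Mirrors the NoopSandbox used in the tests above: leaves the command as-is.
pub struct NoopSandbox;

impl Sandbox for NoopSandbox {
    fn wrap_command(&self, _cmd: &mut Command) -> Result<(), String> {
        Ok(())
    }
}

fn main() {
    let mut cmd = Command::new("echo");
    cmd.arg("hello");
    NoopSandbox.wrap_command(&mut cmd).unwrap();
    // A no-op wrapper must not change program or args.
    assert_eq!(cmd.get_program(), "echo");
    println!("ok");
}
```

The `as_std_mut()` call in the tool is needed because `ShellTool` builds a `tokio::process::Command`, while the trait (to stay runtime-agnostic) operates on the underlying `std::process::Command`.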
BIN
Two binary image files added (contents not shown; 2.1 MiB each).
+19
@@ -193,6 +193,25 @@ export function getCronRuns(
).then((data) => unwrapField(data, 'runs'));
}
export interface CronSettings {
enabled: boolean;
catch_up_on_startup: boolean;
max_run_history: number;
}
export function getCronSettings(): Promise<CronSettings> {
return apiFetch<CronSettings>('/api/cron/settings');
}
export function patchCronSettings(
patch: Partial<CronSettings>,
): Promise<CronSettings> {
return apiFetch<CronSettings & { status: string }>('/api/cron/settings', {
method: 'PATCH',
body: JSON.stringify(patch),
});
}
// ---------------------------------------------------------------------------
// Integrations
// ---------------------------------------------------------------------------
+62 -1
@@ -12,7 +12,15 @@ import {
RefreshCw,
} from 'lucide-react';
import type { CronJob, CronRun } from '@/types/api';
import {
getCronJobs,
addCronJob,
deleteCronJob,
getCronRuns,
getCronSettings,
patchCronSettings,
} from '@/lib/api';
import type { CronSettings } from '@/lib/api';
import { t } from '@/lib/i18n';
function formatDate(iso: string | null): string {
@@ -143,6 +151,8 @@ export default function Cron() {
const [showForm, setShowForm] = useState(false);
const [confirmDelete, setConfirmDelete] = useState<string | null>(null);
const [expandedJob, setExpandedJob] = useState<string | null>(null);
const [settings, setSettings] = useState<CronSettings | null>(null);
const [togglingCatchUp, setTogglingCatchUp] = useState(false);
// Form state
const [formName, setFormName] = useState('');
@@ -159,8 +169,28 @@ export default function Cron() {
.finally(() => setLoading(false));
};
const fetchSettings = () => {
getCronSettings().then(setSettings).catch(() => {});
};
const toggleCatchUp = async () => {
if (!settings) return;
setTogglingCatchUp(true);
try {
const updated = await patchCronSettings({
catch_up_on_startup: !settings.catch_up_on_startup,
});
setSettings(updated);
} catch {
// silently fail — user can retry
} finally {
setTogglingCatchUp(false);
}
};
useEffect(() => {
fetchJobs();
fetchSettings();
}, []);
const handleAdd = async () => {
@@ -250,6 +280,37 @@ export default function Cron() {
</button>
</div>
{/* Catch-up toggle */}
{settings && (
<div className="glass-card px-4 py-3 flex items-center justify-between">
<div>
<span className="text-sm font-medium text-white">
Catch up missed jobs on startup
</span>
<p className="text-xs text-[#556080] mt-0.5">
Run all overdue jobs when ZeroClaw starts after downtime
</p>
</div>
<button
onClick={toggleCatchUp}
disabled={togglingCatchUp}
className={`relative inline-flex h-6 w-11 items-center rounded-full transition-colors duration-300 focus:outline-none ${
settings.catch_up_on_startup
? 'bg-[#0080ff]'
: 'bg-[#1a1a3e]'
}`}
>
<span
className={`inline-block h-4 w-4 rounded-full bg-white transition-transform duration-300 ${
settings.catch_up_on_startup
? 'translate-x-6'
: 'translate-x-1'
}`}
/>
</button>
</div>
)}
{/* Add Job Form Modal */}
{showForm && (
<div className="fixed inset-0 modal-backdrop flex items-center justify-center z-50">