Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e387f58579 | |||
| fa06798926 | |||
| a4cd4b287e | |||
| 3ce7f2345e | |||
| eb9dfc04b4 | |||
| 9cc74a2698 | |||
| 133dc46b41 | |||
| ad03605cad | |||
| ae1acf9b9c | |||
| cc91f22e9b | |||
| 030f5fe288 | |||
| c47bbcc972 | |||
| 0d28cca843 | |||
| 7ddd2aace3 | |||
| c7b3b762e0 | |||
| 4b00e8ba75 |
@@ -16,6 +16,7 @@ env:
|
||||
CARGO_TERM_COLOR: always
|
||||
REGISTRY: ghcr.io
|
||||
IMAGE_NAME: ${{ github.repository }}
|
||||
RELEASE_CARGO_FEATURES: channel-matrix,channel-lark,memory-postgres
|
||||
|
||||
jobs:
|
||||
version:
|
||||
@@ -213,7 +214,7 @@ jobs:
|
||||
if [ -n "${{ matrix.linker_env || '' }}" ] && [ -n "${{ matrix.linker || '' }}" ]; then
|
||||
export "${{ matrix.linker_env }}=${{ matrix.linker }}"
|
||||
fi
|
||||
cargo build --release --locked --features channel-matrix,channel-lark,memory-postgres --target ${{ matrix.target }}
|
||||
cargo build --release --locked --features "${{ env.RELEASE_CARGO_FEATURES }}" --target ${{ matrix.target }}
|
||||
|
||||
- name: Package (Unix)
|
||||
if: runner.os != 'Windows'
|
||||
@@ -345,8 +346,6 @@ jobs:
|
||||
with:
|
||||
context: docker-ctx
|
||||
push: true
|
||||
build-args: |
|
||||
ZEROCLAW_CARGO_FEATURES=channel-matrix,channel-lark,memory-postgres
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ needs.version.outputs.tag }}
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:beta
|
||||
|
||||
@@ -20,6 +20,7 @@ env:
|
||||
CARGO_TERM_COLOR: always
|
||||
REGISTRY: ghcr.io
|
||||
IMAGE_NAME: ${{ github.repository }}
|
||||
RELEASE_CARGO_FEATURES: channel-matrix,channel-lark,memory-postgres
|
||||
|
||||
jobs:
|
||||
validate:
|
||||
@@ -214,7 +215,7 @@ jobs:
|
||||
if [ -n "${{ matrix.linker_env || '' }}" ] && [ -n "${{ matrix.linker || '' }}" ]; then
|
||||
export "${{ matrix.linker_env }}=${{ matrix.linker }}"
|
||||
fi
|
||||
cargo build --release --locked --features channel-matrix,channel-lark,memory-postgres --target ${{ matrix.target }}
|
||||
cargo build --release --locked --features "${{ env.RELEASE_CARGO_FEATURES }}" --target ${{ matrix.target }}
|
||||
|
||||
- name: Package (Unix)
|
||||
if: runner.os != 'Windows'
|
||||
@@ -388,8 +389,6 @@ jobs:
|
||||
with:
|
||||
context: docker-ctx
|
||||
push: true
|
||||
build-args: |
|
||||
ZEROCLAW_CARGO_FEATURES=channel-matrix,channel-lark,memory-postgres
|
||||
tags: |
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ needs.validate.outputs.tag }}
|
||||
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
|
||||
|
||||
+13
-27
@@ -448,46 +448,32 @@ bool_to_word() {
|
||||
fi
|
||||
}
|
||||
|
||||
guided_input_stream() {
|
||||
# Some constrained containers report interactive stdin (-t 0) but deny
|
||||
# opening /dev/stdin directly. Probe readability before selecting it.
|
||||
if [[ -t 0 ]] && (: </dev/stdin) 2>/dev/null; then
|
||||
echo "/dev/stdin"
|
||||
guided_open_input() {
|
||||
# Use stdin directly when it is an interactive terminal (e.g. SSH into LXC).
|
||||
# Subshell probing of /dev/stdin fails in some constrained containers even
|
||||
# when FD 0 is perfectly usable, so skip the probe and trust -t 0.
|
||||
if [[ -t 0 ]]; then
|
||||
GUIDED_FD=0
|
||||
return 0
|
||||
fi
|
||||
|
||||
if [[ -t 0 ]] && (: </proc/self/fd/0) 2>/dev/null; then
|
||||
echo "/proc/self/fd/0"
|
||||
return 0
|
||||
fi
|
||||
|
||||
if (: </dev/tty) 2>/dev/null; then
|
||||
echo "/dev/tty"
|
||||
return 0
|
||||
fi
|
||||
|
||||
return 1
|
||||
# Non-interactive stdin: try to open /dev/tty as an explicit fd.
|
||||
exec {GUIDED_FD}</dev/tty 2>/dev/null || return 1
|
||||
}
|
||||
|
||||
guided_read() {
|
||||
local __target_var="$1"
|
||||
local __prompt="$2"
|
||||
local __silent="${3:-false}"
|
||||
local __input_source=""
|
||||
local __value=""
|
||||
|
||||
if ! __input_source="$(guided_input_stream)"; then
|
||||
return 1
|
||||
fi
|
||||
[[ -n "${GUIDED_FD:-}" ]] || guided_open_input || return 1
|
||||
|
||||
if [[ "$__silent" == true ]]; then
|
||||
if ! read -r -s -p "$__prompt" __value <"$__input_source"; then
|
||||
return 1
|
||||
fi
|
||||
read -r -s -u "$GUIDED_FD" -p "$__prompt" __value || return 1
|
||||
echo
|
||||
else
|
||||
if ! read -r -p "$__prompt" __value <"$__input_source"; then
|
||||
return 1
|
||||
fi
|
||||
read -r -u "$GUIDED_FD" -p "$__prompt" __value || return 1
|
||||
fi
|
||||
|
||||
printf -v "$__target_var" '%s' "$__value"
|
||||
@@ -708,7 +694,7 @@ prompt_model() {
|
||||
run_guided_installer() {
|
||||
local os_name="$1"
|
||||
|
||||
if ! guided_input_stream >/dev/null; then
|
||||
if ! guided_open_input >/dev/null; then
|
||||
error "guided installer requires an interactive terminal."
|
||||
error "Run from a terminal, or pass --no-guided with explicit flags."
|
||||
exit 1
|
||||
|
||||
@@ -2181,6 +2181,7 @@ pub(crate) async fn agent_turn(
|
||||
temperature: f64,
|
||||
silent: bool,
|
||||
channel_name: &str,
|
||||
channel_reply_target: Option<&str>,
|
||||
multimodal_config: &crate::config::MultimodalConfig,
|
||||
max_tool_iterations: usize,
|
||||
approval: Option<&ApprovalManager>,
|
||||
@@ -2200,6 +2201,7 @@ pub(crate) async fn agent_turn(
|
||||
silent,
|
||||
approval,
|
||||
channel_name,
|
||||
channel_reply_target,
|
||||
multimodal_config,
|
||||
max_tool_iterations,
|
||||
None,
|
||||
@@ -2213,6 +2215,100 @@ pub(crate) async fn agent_turn(
|
||||
.await
|
||||
}
|
||||
|
||||
fn maybe_inject_channel_delivery_defaults(
|
||||
tool_name: &str,
|
||||
tool_args: &mut serde_json::Value,
|
||||
channel_name: &str,
|
||||
channel_reply_target: Option<&str>,
|
||||
) {
|
||||
if tool_name != "cron_add" {
|
||||
return;
|
||||
}
|
||||
|
||||
if !matches!(
|
||||
channel_name,
|
||||
"telegram" | "discord" | "slack" | "mattermost" | "matrix"
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(reply_target) = channel_reply_target
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(args) = tool_args.as_object_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let is_agent_job = args
|
||||
.get("job_type")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.is_some_and(|job_type| job_type.eq_ignore_ascii_case("agent"))
|
||||
|| args
|
||||
.get("prompt")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.is_some_and(|prompt| !prompt.trim().is_empty());
|
||||
if !is_agent_job {
|
||||
return;
|
||||
}
|
||||
|
||||
let default_delivery = || {
|
||||
serde_json::json!({
|
||||
"mode": "announce",
|
||||
"channel": channel_name,
|
||||
"to": reply_target,
|
||||
})
|
||||
};
|
||||
|
||||
match args.get_mut("delivery") {
|
||||
None => {
|
||||
args.insert("delivery".to_string(), default_delivery());
|
||||
}
|
||||
Some(serde_json::Value::Null) => {
|
||||
*args.get_mut("delivery").expect("delivery key exists") = default_delivery();
|
||||
}
|
||||
Some(serde_json::Value::Object(delivery)) => {
|
||||
if delivery
|
||||
.get("mode")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.is_some_and(|mode| mode.eq_ignore_ascii_case("none"))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
delivery
|
||||
.entry("mode".to_string())
|
||||
.or_insert_with(|| serde_json::Value::String("announce".to_string()));
|
||||
|
||||
let needs_channel = delivery
|
||||
.get("channel")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.is_none_or(|value| value.trim().is_empty());
|
||||
if needs_channel {
|
||||
delivery.insert(
|
||||
"channel".to_string(),
|
||||
serde_json::Value::String(channel_name.to_string()),
|
||||
);
|
||||
}
|
||||
|
||||
let needs_target = delivery
|
||||
.get("to")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.is_none_or(|value| value.trim().is_empty());
|
||||
if needs_target {
|
||||
delivery.insert(
|
||||
"to".to_string(),
|
||||
serde_json::Value::String(reply_target.to_string()),
|
||||
);
|
||||
}
|
||||
}
|
||||
Some(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_one_tool(
|
||||
call_name: &str,
|
||||
call_arguments: serde_json::Value,
|
||||
@@ -2406,6 +2502,7 @@ pub(crate) async fn run_tool_call_loop(
|
||||
silent: bool,
|
||||
approval: Option<&ApprovalManager>,
|
||||
channel_name: &str,
|
||||
channel_reply_target: Option<&str>,
|
||||
multimodal_config: &crate::config::MultimodalConfig,
|
||||
max_tool_iterations: usize,
|
||||
cancellation_token: Option<CancellationToken>,
|
||||
@@ -2816,6 +2913,13 @@ pub(crate) async fn run_tool_call_loop(
|
||||
}
|
||||
}
|
||||
|
||||
maybe_inject_channel_delivery_defaults(
|
||||
&tool_name,
|
||||
&mut tool_args,
|
||||
channel_name,
|
||||
channel_reply_target,
|
||||
);
|
||||
|
||||
// ── Approval hook ────────────────────────────────
|
||||
if let Some(mgr) = approval {
|
||||
if mgr.needs_approval(&tool_name) {
|
||||
@@ -3558,6 +3662,7 @@ pub async fn run(
|
||||
false,
|
||||
approval_manager.as_ref(),
|
||||
channel_name,
|
||||
None,
|
||||
&config.multimodal,
|
||||
config.agent.max_tool_iterations,
|
||||
None,
|
||||
@@ -3784,6 +3889,7 @@ pub async fn run(
|
||||
false,
|
||||
approval_manager.as_ref(),
|
||||
channel_name,
|
||||
None,
|
||||
&config.multimodal,
|
||||
config.agent.max_tool_iterations,
|
||||
None,
|
||||
@@ -4163,6 +4269,7 @@ pub async fn process_message(
|
||||
config.default_temperature,
|
||||
true,
|
||||
"daemon",
|
||||
None,
|
||||
&config.multimodal,
|
||||
config.agent.max_tool_iterations,
|
||||
Some(&approval_manager),
|
||||
@@ -4481,6 +4588,57 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
struct RecordingArgsTool {
|
||||
name: String,
|
||||
recorded_args: Arc<Mutex<Vec<serde_json::Value>>>,
|
||||
}
|
||||
|
||||
impl RecordingArgsTool {
|
||||
fn new(name: &str, recorded_args: Arc<Mutex<Vec<serde_json::Value>>>) -> Self {
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
recorded_args,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for RecordingArgsTool {
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Records tool arguments for regression tests"
|
||||
}
|
||||
|
||||
fn parameters_schema(&self) -> serde_json::Value {
|
||||
serde_json::json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"prompt": { "type": "string" },
|
||||
"schedule": { "type": "object" },
|
||||
"delivery": { "type": "object" }
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
args: serde_json::Value,
|
||||
) -> anyhow::Result<crate::tools::ToolResult> {
|
||||
self.recorded_args
|
||||
.lock()
|
||||
.expect("recorded args lock should be valid")
|
||||
.push(args.clone());
|
||||
Ok(crate::tools::ToolResult {
|
||||
success: true,
|
||||
output: args.to_string(),
|
||||
error: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct DelayTool {
|
||||
name: String,
|
||||
delay_ms: u64,
|
||||
@@ -4619,6 +4777,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
3,
|
||||
None,
|
||||
@@ -4668,6 +4827,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&multimodal,
|
||||
3,
|
||||
None,
|
||||
@@ -4711,6 +4871,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
3,
|
||||
None,
|
||||
@@ -4840,6 +5001,7 @@ mod tests {
|
||||
true,
|
||||
Some(&approval_mgr),
|
||||
"telegram",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -4877,6 +5039,122 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn run_tool_call_loop_injects_channel_delivery_defaults_for_cron_add() {
|
||||
let provider = ScriptedProvider::from_text_responses(vec![
|
||||
r#"<tool_call>
|
||||
{"name":"cron_add","arguments":{"job_type":"agent","prompt":"remind me later","schedule":{"kind":"every","every_ms":60000}}}
|
||||
</tool_call>"#,
|
||||
"done",
|
||||
]);
|
||||
|
||||
let recorded_args = Arc::new(Mutex::new(Vec::new()));
|
||||
let tools_registry: Vec<Box<dyn Tool>> = vec![Box::new(RecordingArgsTool::new(
|
||||
"cron_add",
|
||||
Arc::clone(&recorded_args),
|
||||
))];
|
||||
|
||||
let mut history = vec![
|
||||
ChatMessage::system("test-system"),
|
||||
ChatMessage::user("schedule a reminder"),
|
||||
];
|
||||
let observer = NoopObserver;
|
||||
|
||||
let result = run_tool_call_loop(
|
||||
&provider,
|
||||
&mut history,
|
||||
&tools_registry,
|
||||
&observer,
|
||||
"mock-provider",
|
||||
"mock-model",
|
||||
0.0,
|
||||
true,
|
||||
None,
|
||||
"telegram",
|
||||
Some("chat-42"),
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
&[],
|
||||
&[],
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("cron_add delivery defaults should be injected");
|
||||
|
||||
assert_eq!(result, "done");
|
||||
|
||||
let recorded = recorded_args
|
||||
.lock()
|
||||
.expect("recorded args lock should be valid");
|
||||
let delivery = recorded[0]["delivery"].clone();
|
||||
assert_eq!(
|
||||
delivery,
|
||||
serde_json::json!({
|
||||
"mode": "announce",
|
||||
"channel": "telegram",
|
||||
"to": "chat-42",
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn run_tool_call_loop_preserves_explicit_cron_delivery_none() {
|
||||
let provider = ScriptedProvider::from_text_responses(vec![
|
||||
r#"<tool_call>
|
||||
{"name":"cron_add","arguments":{"job_type":"agent","prompt":"run silently","schedule":{"kind":"every","every_ms":60000},"delivery":{"mode":"none"}}}
|
||||
</tool_call>"#,
|
||||
"done",
|
||||
]);
|
||||
|
||||
let recorded_args = Arc::new(Mutex::new(Vec::new()));
|
||||
let tools_registry: Vec<Box<dyn Tool>> = vec![Box::new(RecordingArgsTool::new(
|
||||
"cron_add",
|
||||
Arc::clone(&recorded_args),
|
||||
))];
|
||||
|
||||
let mut history = vec![
|
||||
ChatMessage::system("test-system"),
|
||||
ChatMessage::user("schedule a quiet cron job"),
|
||||
];
|
||||
let observer = NoopObserver;
|
||||
|
||||
let result = run_tool_call_loop(
|
||||
&provider,
|
||||
&mut history,
|
||||
&tools_registry,
|
||||
&observer,
|
||||
"mock-provider",
|
||||
"mock-model",
|
||||
0.0,
|
||||
true,
|
||||
None,
|
||||
"telegram",
|
||||
Some("chat-42"),
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
&[],
|
||||
&[],
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("explicit delivery mode should be preserved");
|
||||
|
||||
assert_eq!(result, "done");
|
||||
|
||||
let recorded = recorded_args
|
||||
.lock()
|
||||
.expect("recorded args lock should be valid");
|
||||
assert_eq!(recorded[0]["delivery"], serde_json::json!({"mode": "none"}));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn run_tool_call_loop_deduplicates_repeated_tool_calls() {
|
||||
let provider = ScriptedProvider::from_text_responses(vec![
|
||||
@@ -4912,6 +5190,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -4980,6 +5259,7 @@ mod tests {
|
||||
true,
|
||||
Some(&approval_mgr),
|
||||
"telegram",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -5039,6 +5319,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -5118,6 +5399,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -5174,6 +5456,7 @@ mod tests {
|
||||
true,
|
||||
None,
|
||||
"cli",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -5246,6 +5529,7 @@ mod tests {
|
||||
0.0,
|
||||
true,
|
||||
"daemon",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
@@ -7137,6 +7421,7 @@ Let me check the result."#;
|
||||
true,
|
||||
None,
|
||||
"telegram",
|
||||
None,
|
||||
&crate::config::MultimodalConfig::default(),
|
||||
4,
|
||||
None,
|
||||
|
||||
@@ -2240,6 +2240,7 @@ async fn process_channel_message(
|
||||
true,
|
||||
Some(&*ctx.approval_manager),
|
||||
msg.channel.as_str(),
|
||||
Some(msg.reply_target.as_str()),
|
||||
&ctx.multimodal,
|
||||
ctx.max_tool_iterations,
|
||||
Some(cancellation_token.clone()),
|
||||
@@ -3379,6 +3380,7 @@ fn collect_configured_channels(
|
||||
Vec::new(),
|
||||
sl.allowed_users.clone(),
|
||||
)
|
||||
.with_thread_replies(sl.thread_replies.unwrap_or(true))
|
||||
.with_group_reply_policy(sl.mention_only, Vec::new())
|
||||
.with_workspace_dir(config.workspace_dir.clone()),
|
||||
),
|
||||
|
||||
+37
-1
@@ -25,6 +25,7 @@ pub struct SlackChannel {
|
||||
channel_id: Option<String>,
|
||||
channel_ids: Vec<String>,
|
||||
allowed_users: Vec<String>,
|
||||
thread_replies: bool,
|
||||
mention_only: bool,
|
||||
group_reply_allowed_sender_ids: Vec<String>,
|
||||
user_display_name_cache: Mutex<HashMap<String, CachedSlackDisplayName>>,
|
||||
@@ -75,6 +76,7 @@ impl SlackChannel {
|
||||
channel_id,
|
||||
channel_ids,
|
||||
allowed_users,
|
||||
thread_replies: true,
|
||||
mention_only: false,
|
||||
group_reply_allowed_sender_ids: Vec::new(),
|
||||
user_display_name_cache: Mutex::new(HashMap::new()),
|
||||
@@ -94,6 +96,12 @@ impl SlackChannel {
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure whether outbound replies stay in the originating Slack thread.
|
||||
pub fn with_thread_replies(mut self, thread_replies: bool) -> Self {
|
||||
self.thread_replies = thread_replies;
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure workspace directory used for persisting inbound Slack attachments.
|
||||
pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
|
||||
self.workspace_dir = Some(dir);
|
||||
@@ -122,6 +130,14 @@ impl SlackChannel {
|
||||
.any(|entry| entry == "*" || entry == user_id)
|
||||
}
|
||||
|
||||
fn outbound_thread_ts<'a>(&self, message: &'a SendMessage) -> Option<&'a str> {
|
||||
if self.thread_replies {
|
||||
message.thread_ts.as_deref()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the bot's own user ID so we can ignore our own messages
|
||||
async fn get_bot_user_id(&self) -> Option<String> {
|
||||
let resp: serde_json::Value = self
|
||||
@@ -2149,7 +2165,7 @@ impl Channel for SlackChannel {
|
||||
"text": message.content
|
||||
});
|
||||
|
||||
if let Some(ref ts) = message.thread_ts {
|
||||
if let Some(ts) = self.outbound_thread_ts(message) {
|
||||
body["thread_ts"] = serde_json::json!(ts);
|
||||
}
|
||||
|
||||
@@ -2484,10 +2500,30 @@ mod tests {
|
||||
#[test]
|
||||
fn slack_group_reply_policy_defaults_to_all_messages() {
|
||||
let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec!["*".into()]);
|
||||
assert!(ch.thread_replies);
|
||||
assert!(!ch.mention_only);
|
||||
assert!(ch.group_reply_allowed_sender_ids.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn with_thread_replies_sets_flag() {
|
||||
let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
|
||||
.with_thread_replies(false);
|
||||
assert!(!ch.thread_replies);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn outbound_thread_ts_respects_thread_replies_setting() {
|
||||
let msg = SendMessage::new("hello", "C123").in_thread(Some("1741234567.100001".into()));
|
||||
|
||||
let threaded = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]);
|
||||
assert_eq!(threaded.outbound_thread_ts(&msg), Some("1741234567.100001"));
|
||||
|
||||
let channel_root = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
|
||||
.with_thread_replies(false);
|
||||
assert_eq!(channel_root.outbound_thread_ts(&msg), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn with_workspace_dir_sets_field() {
|
||||
let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![])
|
||||
|
||||
@@ -4143,6 +4143,15 @@ pub struct CronConfig {
|
||||
/// Enable the cron subsystem. Default: `true`.
|
||||
#[serde(default = "default_true")]
|
||||
pub enabled: bool,
|
||||
/// Run all overdue jobs at scheduler startup. Default: `true`.
|
||||
///
|
||||
/// When the machine boots late or the daemon restarts, jobs whose
|
||||
/// `next_run` is in the past are considered "missed". With this
|
||||
/// option enabled the scheduler fires them once before entering
|
||||
/// the normal polling loop. Disable if you prefer missed jobs to
|
||||
/// simply wait for their next scheduled occurrence.
|
||||
#[serde(default = "default_true")]
|
||||
pub catch_up_on_startup: bool,
|
||||
/// Maximum number of historical cron run records to retain. Default: `50`.
|
||||
#[serde(default = "default_max_run_history")]
|
||||
pub max_run_history: u32,
|
||||
@@ -4156,6 +4165,7 @@ impl Default for CronConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: true,
|
||||
catch_up_on_startup: true,
|
||||
max_run_history: default_max_run_history(),
|
||||
}
|
||||
}
|
||||
@@ -4630,6 +4640,10 @@ pub struct SlackConfig {
|
||||
/// cancels the in-flight request and starts a fresh response with preserved history.
|
||||
#[serde(default)]
|
||||
pub interrupt_on_new_message: bool,
|
||||
/// When true (default), replies stay in the originating Slack thread.
|
||||
/// When false, replies go to the channel root instead.
|
||||
#[serde(default)]
|
||||
pub thread_replies: Option<bool>,
|
||||
/// When true, only respond to messages that @-mention the bot in groups.
|
||||
/// Direct messages remain allowed.
|
||||
#[serde(default)]
|
||||
@@ -8397,11 +8411,13 @@ recipient = "42"
|
||||
async fn cron_config_serde_roundtrip() {
|
||||
let c = CronConfig {
|
||||
enabled: false,
|
||||
catch_up_on_startup: false,
|
||||
max_run_history: 100,
|
||||
};
|
||||
let json = serde_json::to_string(&c).unwrap();
|
||||
let parsed: CronConfig = serde_json::from_str(&json).unwrap();
|
||||
assert!(!parsed.enabled);
|
||||
assert!(!parsed.catch_up_on_startup);
|
||||
assert_eq!(parsed.max_run_history, 100);
|
||||
}
|
||||
|
||||
@@ -8415,6 +8431,7 @@ default_temperature = 0.7
|
||||
|
||||
let parsed: Config = toml::from_str(toml_str).unwrap();
|
||||
assert!(parsed.cron.enabled);
|
||||
assert!(parsed.cron.catch_up_on_startup);
|
||||
assert_eq!(parsed.cron.max_run_history, 50);
|
||||
}
|
||||
|
||||
@@ -9370,6 +9387,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
|
||||
assert!(parsed.allowed_users.is_empty());
|
||||
assert!(!parsed.interrupt_on_new_message);
|
||||
assert_eq!(parsed.thread_replies, None);
|
||||
assert!(!parsed.mention_only);
|
||||
}
|
||||
|
||||
@@ -9379,6 +9397,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
|
||||
assert_eq!(parsed.allowed_users, vec!["U111"]);
|
||||
assert!(!parsed.interrupt_on_new_message);
|
||||
assert_eq!(parsed.thread_replies, None);
|
||||
assert!(!parsed.mention_only);
|
||||
}
|
||||
|
||||
@@ -9388,6 +9407,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
|
||||
assert!(parsed.mention_only);
|
||||
assert!(!parsed.interrupt_on_new_message);
|
||||
assert_eq!(parsed.thread_replies, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -9395,6 +9415,16 @@ allowed_users = ["@ops:matrix.org"]
|
||||
let json = r#"{"bot_token":"xoxb-tok","interrupt_on_new_message":true}"#;
|
||||
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
|
||||
assert!(parsed.interrupt_on_new_message);
|
||||
assert_eq!(parsed.thread_replies, None);
|
||||
assert!(!parsed.mention_only);
|
||||
}
|
||||
|
||||
#[test]
|
||||
async fn slack_config_deserializes_thread_replies() {
|
||||
let json = r#"{"bot_token":"xoxb-tok","thread_replies":false}"#;
|
||||
let parsed: SlackConfig = serde_json::from_str(json).unwrap();
|
||||
assert_eq!(parsed.thread_replies, Some(false));
|
||||
assert!(!parsed.interrupt_on_new_message);
|
||||
assert!(!parsed.mention_only);
|
||||
}
|
||||
|
||||
@@ -9418,6 +9448,7 @@ channel_id = "C123"
|
||||
let parsed: SlackConfig = toml::from_str(toml_str).unwrap();
|
||||
assert!(parsed.allowed_users.is_empty());
|
||||
assert!(!parsed.interrupt_on_new_message);
|
||||
assert_eq!(parsed.thread_replies, None);
|
||||
assert!(!parsed.mention_only);
|
||||
assert_eq!(parsed.channel_id.as_deref(), Some("C123"));
|
||||
}
|
||||
|
||||
+144
-8
@@ -14,8 +14,8 @@ pub use schedule::{
|
||||
};
|
||||
#[allow(unused_imports)]
|
||||
pub use store::{
|
||||
add_agent_job, due_jobs, get_job, list_jobs, list_runs, record_last_run, record_run,
|
||||
remove_job, reschedule_after_run, update_job,
|
||||
add_agent_job, all_overdue_jobs, due_jobs, get_job, list_jobs, list_runs, record_last_run,
|
||||
record_run, remove_job, reschedule_after_run, update_job,
|
||||
};
|
||||
pub use types::{
|
||||
deserialize_maybe_stringified, CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType,
|
||||
@@ -156,6 +156,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
expression,
|
||||
tz,
|
||||
agent,
|
||||
allowed_tools,
|
||||
command,
|
||||
} => {
|
||||
let schedule = Schedule::Cron {
|
||||
@@ -172,12 +173,20 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
if allowed_tools.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(allowed_tools)
|
||||
},
|
||||
)?;
|
||||
println!("✅ Added agent cron job {}", job.id);
|
||||
println!(" Expr : {}", job.expression);
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
if !allowed_tools.is_empty() {
|
||||
bail!("--allowed-tool is only supported with --agent cron jobs");
|
||||
}
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added cron job {}", job.id);
|
||||
println!(" Expr: {}", job.expression);
|
||||
@@ -186,7 +195,12 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::AddAt { at, agent, command } => {
|
||||
crate::CronCommands::AddAt {
|
||||
at,
|
||||
agent,
|
||||
allowed_tools,
|
||||
command,
|
||||
} => {
|
||||
let at = chrono::DateTime::parse_from_rfc3339(&at)
|
||||
.map_err(|e| anyhow::anyhow!("Invalid RFC3339 timestamp for --at: {e}"))?
|
||||
.with_timezone(&chrono::Utc);
|
||||
@@ -201,11 +215,19 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
if allowed_tools.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(allowed_tools)
|
||||
},
|
||||
)?;
|
||||
println!("✅ Added one-shot agent cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
if !allowed_tools.is_empty() {
|
||||
bail!("--allowed-tool is only supported with --agent cron jobs");
|
||||
}
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
@@ -216,6 +238,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
crate::CronCommands::AddEvery {
|
||||
every_ms,
|
||||
agent,
|
||||
allowed_tools,
|
||||
command,
|
||||
} => {
|
||||
let schedule = Schedule::Every { every_ms };
|
||||
@@ -229,12 +252,20 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
if allowed_tools.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(allowed_tools)
|
||||
},
|
||||
)?;
|
||||
println!("✅ Added interval agent cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt : {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
if !allowed_tools.is_empty() {
|
||||
bail!("--allowed-tool is only supported with --agent cron jobs");
|
||||
}
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added interval cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
@@ -246,6 +277,7 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
crate::CronCommands::Once {
|
||||
delay,
|
||||
agent,
|
||||
allowed_tools,
|
||||
command,
|
||||
} => {
|
||||
if agent {
|
||||
@@ -261,11 +293,19 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
if allowed_tools.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(allowed_tools)
|
||||
},
|
||||
)?;
|
||||
println!("✅ Added one-shot agent cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
if !allowed_tools.is_empty() {
|
||||
bail!("--allowed-tool is only supported with --agent cron jobs");
|
||||
}
|
||||
let job = add_once(config, &delay, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
@@ -279,21 +319,37 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
tz,
|
||||
command,
|
||||
name,
|
||||
allowed_tools,
|
||||
} => {
|
||||
if expression.is_none() && tz.is_none() && command.is_none() && name.is_none() {
|
||||
bail!("At least one of --expression, --tz, --command, or --name must be provided");
|
||||
if expression.is_none()
|
||||
&& tz.is_none()
|
||||
&& command.is_none()
|
||||
&& name.is_none()
|
||||
&& allowed_tools.is_empty()
|
||||
{
|
||||
bail!(
|
||||
"At least one of --expression, --tz, --command, --name, or --allowed-tool must be provided"
|
||||
);
|
||||
}
|
||||
|
||||
let existing = if expression.is_some() || tz.is_some() || !allowed_tools.is_empty() {
|
||||
Some(get_job(config, &id)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Merge expression/tz with the existing schedule so that
|
||||
// --tz alone updates the timezone and --expression alone
|
||||
// preserves the existing timezone.
|
||||
let schedule = if expression.is_some() || tz.is_some() {
|
||||
let existing = get_job(config, &id)?;
|
||||
let (existing_expr, existing_tz) = match existing.schedule {
|
||||
let existing = existing
|
||||
.as_ref()
|
||||
.expect("existing job must be loaded when updating schedule");
|
||||
let (existing_expr, existing_tz) = match &existing.schedule {
|
||||
Schedule::Cron {
|
||||
expr,
|
||||
tz: existing_tz,
|
||||
} => (expr, existing_tz),
|
||||
} => (expr.clone(), existing_tz.clone()),
|
||||
_ => bail!("Cannot update expression/tz on a non-cron schedule"),
|
||||
};
|
||||
Some(Schedule::Cron {
|
||||
@@ -304,10 +360,24 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
None
|
||||
};
|
||||
|
||||
if !allowed_tools.is_empty() {
|
||||
let existing = existing
|
||||
.as_ref()
|
||||
.expect("existing job must be loaded when updating allowed tools");
|
||||
if existing.job_type != JobType::Agent {
|
||||
bail!("--allowed-tool is only supported for agent cron jobs");
|
||||
}
|
||||
}
|
||||
|
||||
let patch = CronJobPatch {
|
||||
schedule,
|
||||
command,
|
||||
name,
|
||||
allowed_tools: if allowed_tools.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(allowed_tools)
|
||||
},
|
||||
..CronJobPatch::default()
|
||||
};
|
||||
|
||||
@@ -430,6 +500,7 @@ mod tests {
|
||||
tz: tz.map(Into::into),
|
||||
command: command.map(Into::into),
|
||||
name: name.map(Into::into),
|
||||
allowed_tools: vec![],
|
||||
},
|
||||
config,
|
||||
)
|
||||
@@ -778,6 +849,7 @@ mod tests {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
allowed_tools: vec![],
|
||||
command: "Check server health: disk space, memory, CPU load".into(),
|
||||
},
|
||||
&config,
|
||||
@@ -808,6 +880,7 @@ mod tests {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
allowed_tools: vec![],
|
||||
command: "Check server health: disk space, memory, CPU load".into(),
|
||||
},
|
||||
&config,
|
||||
@@ -819,6 +892,68 @@ mod tests {
|
||||
assert_eq!(jobs[0].job_type, JobType::Agent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_agent_allowed_tools_persist() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
allowed_tools: vec!["file_read".into(), "web_search".into()],
|
||||
command: "Check server health".into(),
|
||||
},
|
||||
&config,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(
|
||||
jobs[0].allowed_tools,
|
||||
Some(vec!["file_read".into(), "web_search".into()])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_update_agent_allowed_tools_persist() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
let job = add_agent_job(
|
||||
&config,
|
||||
Some("agent".into()),
|
||||
Schedule::Cron {
|
||||
expr: "*/5 * * * *".into(),
|
||||
tz: None,
|
||||
},
|
||||
"original prompt",
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
handle_command(
|
||||
crate::CronCommands::Update {
|
||||
id: job.id.clone(),
|
||||
expression: None,
|
||||
tz: None,
|
||||
command: None,
|
||||
name: None,
|
||||
allowed_tools: vec!["shell".into()],
|
||||
},
|
||||
&config,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let updated = get_job(&config, &job.id).unwrap();
|
||||
assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_without_agent_flag_defaults_to_shell_job() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
@@ -829,6 +964,7 @@ mod tests {
|
||||
expression: "*/5 * * * *".into(),
|
||||
tz: None,
|
||||
agent: false,
|
||||
allowed_tools: vec![],
|
||||
command: "echo ok".into(),
|
||||
},
|
||||
&config,
|
||||
|
||||
+130
-14
@@ -6,8 +6,9 @@ use crate::channels::{
|
||||
};
|
||||
use crate::config::Config;
|
||||
use crate::cron::{
|
||||
due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job, reschedule_after_run,
|
||||
update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule, SessionTarget,
|
||||
all_overdue_jobs, due_jobs, next_run_for_schedule, record_last_run, record_run, remove_job,
|
||||
reschedule_after_run, update_job, CronJob, CronJobPatch, DeliveryConfig, JobType, Schedule,
|
||||
SessionTarget,
|
||||
};
|
||||
use crate::security::SecurityPolicy;
|
||||
use anyhow::Result;
|
||||
@@ -33,6 +34,18 @@ pub async fn run(config: Config) -> Result<()> {
|
||||
|
||||
crate::health::mark_component_ok(SCHEDULER_COMPONENT);
|
||||
|
||||
// ── Startup catch-up: run ALL overdue jobs before entering the
|
||||
// normal polling loop. The regular loop is capped by `max_tasks`,
|
||||
// which could leave some overdue jobs waiting across many cycles
|
||||
// if the machine was off for a while. The catch-up phase fetches
|
||||
// without the `max_tasks` limit so every missed job fires once.
|
||||
// Controlled by `[cron] catch_up_on_startup` (default: true).
|
||||
if config.cron.catch_up_on_startup {
|
||||
catch_up_overdue_jobs(&config, &security).await;
|
||||
} else {
|
||||
tracing::info!("Scheduler startup: catch-up disabled by config");
|
||||
}
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
// Keep scheduler liveness fresh even when there are no due jobs.
|
||||
@@ -51,6 +64,35 @@ pub async fn run(config: Config) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch **all** overdue jobs (ignoring `max_tasks`) and execute them.
|
||||
///
|
||||
/// Called once at scheduler startup so that jobs missed during downtime
|
||||
/// (e.g. late boot, daemon restart) are caught up immediately.
|
||||
async fn catch_up_overdue_jobs(config: &Config, security: &Arc<SecurityPolicy>) {
|
||||
let now = Utc::now();
|
||||
let jobs = match all_overdue_jobs(config, now) {
|
||||
Ok(jobs) => jobs,
|
||||
Err(e) => {
|
||||
tracing::warn!("Startup catch-up query failed: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if jobs.is_empty() {
|
||||
tracing::info!("Scheduler startup: no overdue jobs to catch up");
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
count = jobs.len(),
|
||||
"Scheduler startup: catching up overdue jobs"
|
||||
);
|
||||
|
||||
process_due_jobs(config, security, jobs, SCHEDULER_COMPONENT).await;
|
||||
|
||||
tracing::info!("Scheduler startup: catch-up complete");
|
||||
}
|
||||
|
||||
pub async fn execute_job_now(config: &Config, job: &CronJob) -> (bool, String) {
|
||||
let security = SecurityPolicy::from_config(&config.autonomy, &config.workspace_dir);
|
||||
Box::pin(execute_job_with_retry(config, &security, job)).await
|
||||
@@ -506,18 +548,12 @@ async fn run_job_command_with_timeout(
|
||||
);
|
||||
}
|
||||
|
||||
let child = match Command::new("sh")
|
||||
.arg("-lc")
|
||||
.arg(&job.command)
|
||||
.current_dir(&config.workspace_dir)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.kill_on_drop(true)
|
||||
.spawn()
|
||||
{
|
||||
Ok(child) => child,
|
||||
Err(e) => return (false, format!("spawn error: {e}")),
|
||||
let child = match build_cron_shell_command(&job.command, &config.workspace_dir) {
|
||||
Ok(mut cmd) => match cmd.spawn() {
|
||||
Ok(child) => child,
|
||||
Err(e) => return (false, format!("spawn error: {e}")),
|
||||
},
|
||||
Err(e) => return (false, format!("shell setup error: {e}")),
|
||||
};
|
||||
|
||||
match time::timeout(timeout, child.wait_with_output()).await {
|
||||
@@ -540,6 +576,35 @@ async fn run_job_command_with_timeout(
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a shell `Command` for cron job execution.
|
||||
///
|
||||
/// Uses `sh -c <command>` (non-login shell). On Windows, ZeroClaw users
|
||||
/// typically have Git Bash installed which provides `sh` in PATH, and
|
||||
/// cron commands are written with Unix shell syntax. The previous `-lc`
|
||||
/// (login shell) flag was dropped: login shells load the full user
|
||||
/// profile on every invocation which is slow and may cause side effects.
|
||||
///
|
||||
/// The command is configured with:
|
||||
/// - `current_dir` set to the workspace
|
||||
/// - `stdin` piped to `/dev/null` (no interactive input)
|
||||
/// - `stdout` and `stderr` piped for capture
|
||||
/// - `kill_on_drop(true)` for safe timeout handling
|
||||
fn build_cron_shell_command(
|
||||
command: &str,
|
||||
workspace_dir: &std::path::Path,
|
||||
) -> anyhow::Result<Command> {
|
||||
let mut cmd = Command::new("sh");
|
||||
cmd.arg("-c")
|
||||
.arg(command)
|
||||
.current_dir(workspace_dir)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.kill_on_drop(true);
|
||||
|
||||
Ok(cmd)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -900,6 +965,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let started = Utc::now();
|
||||
@@ -925,6 +991,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let started = Utc::now();
|
||||
@@ -991,6 +1058,7 @@ mod tests {
|
||||
best_effort: false,
|
||||
}),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let started = Utc::now();
|
||||
@@ -1029,6 +1097,7 @@ mod tests {
|
||||
best_effort: true,
|
||||
}),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let started = Utc::now();
|
||||
@@ -1060,6 +1129,7 @@ mod tests {
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(!job.delete_after_run);
|
||||
@@ -1152,4 +1222,50 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("matrix delivery channel requires `channel-matrix` feature"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_cron_shell_command_uses_sh_non_login() {
|
||||
let workspace = std::env::temp_dir();
|
||||
let cmd = build_cron_shell_command("echo cron-test", &workspace).unwrap();
|
||||
let debug = format!("{cmd:?}");
|
||||
assert!(debug.contains("echo cron-test"));
|
||||
assert!(debug.contains("\"sh\""), "should use sh: {debug}");
|
||||
// Must NOT use login shell (-l) — login shells load full profile
|
||||
// and are slow/unpredictable for cron jobs.
|
||||
assert!(
|
||||
!debug.contains("\"-lc\""),
|
||||
"must not use login shell: {debug}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_cron_shell_command_executes_successfully() {
|
||||
let workspace = std::env::temp_dir();
|
||||
let mut cmd = build_cron_shell_command("echo cron-ok", &workspace).unwrap();
|
||||
let output = cmd.output().await.unwrap();
|
||||
assert!(output.status.success());
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
assert!(stdout.contains("cron-ok"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn catch_up_queries_all_overdue_jobs_ignoring_max_tasks() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let mut config = test_config(&tmp).await;
|
||||
config.scheduler.max_tasks = 1; // limit normal polling to 1
|
||||
|
||||
// Create 3 jobs with "every minute" schedule
|
||||
for i in 0..3 {
|
||||
let _ = cron::add_job(&config, "* * * * *", &format!("echo catchup-{i}")).unwrap();
|
||||
}
|
||||
|
||||
// Verify normal due_jobs is limited to max_tasks=1
|
||||
let far_future = Utc::now() + ChronoDuration::days(1);
|
||||
let due = cron::due_jobs(&config, far_future).unwrap();
|
||||
assert_eq!(due.len(), 1, "due_jobs must respect max_tasks");
|
||||
|
||||
// all_overdue_jobs ignores the limit
|
||||
let overdue = cron::all_overdue_jobs(&config, far_future).unwrap();
|
||||
assert_eq!(overdue.len(), 3, "all_overdue_jobs must return all");
|
||||
}
|
||||
}
|
||||
|
||||
+170
-8
@@ -77,6 +77,7 @@ pub fn add_agent_job(
|
||||
model: Option<String>,
|
||||
delivery: Option<DeliveryConfig>,
|
||||
delete_after_run: bool,
|
||||
allowed_tools: Option<Vec<String>>,
|
||||
) -> Result<CronJob> {
|
||||
let now = Utc::now();
|
||||
validate_schedule(&schedule, now)?;
|
||||
@@ -90,8 +91,8 @@ pub fn add_agent_job(
|
||||
conn.execute(
|
||||
"INSERT INTO cron_jobs (
|
||||
id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
||||
enabled, delivery, delete_after_run, created_at, next_run
|
||||
) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11)",
|
||||
enabled, delivery, delete_after_run, allowed_tools, created_at, next_run
|
||||
) VALUES (?1, ?2, '', ?3, 'agent', ?4, ?5, ?6, ?7, 1, ?8, ?9, ?10, ?11, ?12)",
|
||||
params![
|
||||
id,
|
||||
expression,
|
||||
@@ -102,6 +103,7 @@ pub fn add_agent_job(
|
||||
model,
|
||||
serde_json::to_string(&delivery)?,
|
||||
if delete_after_run { 1 } else { 0 },
|
||||
encode_allowed_tools(allowed_tools.as_ref())?,
|
||||
now.to_rfc3339(),
|
||||
next_run.to_rfc3339(),
|
||||
],
|
||||
@@ -117,7 +119,8 @@ pub fn list_jobs(config: &Config) -> Result<Vec<CronJob>> {
|
||||
with_connection(config, |conn| {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
|
||||
allowed_tools
|
||||
FROM cron_jobs ORDER BY next_run ASC",
|
||||
)?;
|
||||
|
||||
@@ -135,7 +138,8 @@ pub fn get_job(config: &Config, job_id: &str) -> Result<CronJob> {
|
||||
with_connection(config, |conn| {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
|
||||
allowed_tools
|
||||
FROM cron_jobs WHERE id = ?1",
|
||||
)?;
|
||||
|
||||
@@ -168,7 +172,8 @@ pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
|
||||
with_connection(config, |conn| {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output,
|
||||
allowed_tools
|
||||
FROM cron_jobs
|
||||
WHERE enabled = 1 AND next_run <= ?1
|
||||
ORDER BY next_run ASC
|
||||
@@ -188,6 +193,34 @@ pub fn due_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Return **all** enabled overdue jobs without the `max_tasks` limit.
|
||||
///
|
||||
/// Used by the scheduler startup catch-up to ensure every missed job is
|
||||
/// executed at least once after a period of downtime (late boot, daemon
|
||||
/// restart, etc.).
|
||||
pub fn all_overdue_jobs(config: &Config, now: DateTime<Utc>) -> Result<Vec<CronJob>> {
|
||||
with_connection(config, |conn| {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, expression, command, schedule, job_type, prompt, name, session_target, model,
|
||||
enabled, delivery, delete_after_run, created_at, next_run, last_run, last_status, last_output, allowed_tools
|
||||
FROM cron_jobs
|
||||
WHERE enabled = 1 AND next_run <= ?1
|
||||
ORDER BY next_run ASC",
|
||||
)?;
|
||||
|
||||
let rows = stmt.query_map(params![now.to_rfc3339()], map_cron_job_row)?;
|
||||
|
||||
let mut jobs = Vec::new();
|
||||
for row in rows {
|
||||
match row {
|
||||
Ok(job) => jobs.push(job),
|
||||
Err(e) => tracing::warn!("Skipping cron job with unparseable row data: {e}"),
|
||||
}
|
||||
}
|
||||
Ok(jobs)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<CronJob> {
|
||||
let mut job = get_job(config, job_id)?;
|
||||
let mut schedule_changed = false;
|
||||
@@ -222,6 +255,9 @@ pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<
|
||||
if let Some(delete_after_run) = patch.delete_after_run {
|
||||
job.delete_after_run = delete_after_run;
|
||||
}
|
||||
if let Some(allowed_tools) = patch.allowed_tools {
|
||||
job.allowed_tools = Some(allowed_tools);
|
||||
}
|
||||
|
||||
if schedule_changed {
|
||||
job.next_run = next_run_for_schedule(&job.schedule, Utc::now())?;
|
||||
@@ -232,8 +268,8 @@ pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<
|
||||
"UPDATE cron_jobs
|
||||
SET expression = ?1, command = ?2, schedule = ?3, job_type = ?4, prompt = ?5, name = ?6,
|
||||
session_target = ?7, model = ?8, enabled = ?9, delivery = ?10, delete_after_run = ?11,
|
||||
next_run = ?12
|
||||
WHERE id = ?13",
|
||||
allowed_tools = ?12, next_run = ?13
|
||||
WHERE id = ?14",
|
||||
params![
|
||||
job.expression,
|
||||
job.command,
|
||||
@@ -246,6 +282,7 @@ pub fn update_job(config: &Config, job_id: &str, patch: CronJobPatch) -> Result<
|
||||
if job.enabled { 1 } else { 0 },
|
||||
serde_json::to_string(&job.delivery)?,
|
||||
if job.delete_after_run { 1 } else { 0 },
|
||||
encode_allowed_tools(job.allowed_tools.as_ref())?,
|
||||
job.next_run.to_rfc3339(),
|
||||
job.id,
|
||||
],
|
||||
@@ -446,6 +483,7 @@ fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
|
||||
let next_run_raw: String = row.get(13)?;
|
||||
let last_run_raw: Option<String> = row.get(14)?;
|
||||
let created_at_raw: String = row.get(12)?;
|
||||
let allowed_tools_raw: Option<String> = row.get(17)?;
|
||||
|
||||
Ok(CronJob {
|
||||
id: row.get(0)?,
|
||||
@@ -468,7 +506,8 @@ fn map_cron_job_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CronJob> {
|
||||
},
|
||||
last_status: row.get(15)?,
|
||||
last_output: row.get(16)?,
|
||||
allowed_tools: None,
|
||||
allowed_tools: decode_allowed_tools(allowed_tools_raw.as_deref())
|
||||
.map_err(sql_conversion_error)?,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -502,6 +541,25 @@ fn decode_delivery(delivery_raw: Option<&str>) -> Result<DeliveryConfig> {
|
||||
Ok(DeliveryConfig::default())
|
||||
}
|
||||
|
||||
fn encode_allowed_tools(allowed_tools: Option<&Vec<String>>) -> Result<Option<String>> {
|
||||
allowed_tools
|
||||
.map(serde_json::to_string)
|
||||
.transpose()
|
||||
.context("Failed to serialize cron allowed_tools")
|
||||
}
|
||||
|
||||
fn decode_allowed_tools(raw: Option<&str>) -> Result<Option<Vec<String>>> {
|
||||
if let Some(raw) = raw {
|
||||
let trimmed = raw.trim();
|
||||
if !trimmed.is_empty() {
|
||||
return serde_json::from_str(trimmed)
|
||||
.map(Some)
|
||||
.with_context(|| format!("Failed to parse cron allowed_tools JSON: {trimmed}"));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn add_column_if_missing(conn: &Connection, name: &str, sql_type: &str) -> Result<()> {
|
||||
let mut stmt = conn.prepare("PRAGMA table_info(cron_jobs)")?;
|
||||
let mut rows = stmt.query([])?;
|
||||
@@ -557,6 +615,7 @@ fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>)
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
delivery TEXT,
|
||||
delete_after_run INTEGER NOT NULL DEFAULT 0,
|
||||
allowed_tools TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
next_run TEXT NOT NULL,
|
||||
last_run TEXT,
|
||||
@@ -590,6 +649,7 @@ fn with_connection<T>(config: &Config, f: impl FnOnce(&Connection) -> Result<T>)
|
||||
add_column_if_missing(&conn, "enabled", "INTEGER NOT NULL DEFAULT 1")?;
|
||||
add_column_if_missing(&conn, "delivery", "TEXT")?;
|
||||
add_column_if_missing(&conn, "delete_after_run", "INTEGER NOT NULL DEFAULT 0")?;
|
||||
add_column_if_missing(&conn, "allowed_tools", "TEXT")?;
|
||||
|
||||
f(&conn)
|
||||
}
|
||||
@@ -704,6 +764,108 @@ mod tests {
|
||||
assert_eq!(due.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_overdue_jobs_ignores_max_tasks_limit() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let mut config = test_config(&tmp);
|
||||
config.scheduler.max_tasks = 2;
|
||||
|
||||
let _ = add_job(&config, "* * * * *", "echo ov-1").unwrap();
|
||||
let _ = add_job(&config, "* * * * *", "echo ov-2").unwrap();
|
||||
let _ = add_job(&config, "* * * * *", "echo ov-3").unwrap();
|
||||
|
||||
let far_future = Utc::now() + ChronoDuration::days(365);
|
||||
// due_jobs respects the limit
|
||||
let due = due_jobs(&config, far_future).unwrap();
|
||||
assert_eq!(due.len(), 2);
|
||||
// all_overdue_jobs returns everything
|
||||
let overdue = all_overdue_jobs(&config, far_future).unwrap();
|
||||
assert_eq!(overdue.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_overdue_jobs_excludes_disabled_jobs() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
let job = add_job(&config, "* * * * *", "echo disabled").unwrap();
|
||||
let _ = update_job(
|
||||
&config,
|
||||
&job.id,
|
||||
CronJobPatch {
|
||||
enabled: Some(false),
|
||||
..CronJobPatch::default()
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let far_future = Utc::now() + ChronoDuration::days(365);
|
||||
let overdue = all_overdue_jobs(&config, far_future).unwrap();
|
||||
assert!(overdue.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_agent_job_persists_allowed_tools() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
let job = add_agent_job(
|
||||
&config,
|
||||
Some("agent".into()),
|
||||
Schedule::Every { every_ms: 60_000 },
|
||||
"do work",
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
Some(vec!["file_read".into(), "web_search".into()]),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
job.allowed_tools,
|
||||
Some(vec!["file_read".into(), "web_search".into()])
|
||||
);
|
||||
|
||||
let stored = get_job(&config, &job.id).unwrap();
|
||||
assert_eq!(stored.allowed_tools, job.allowed_tools);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn update_job_persists_allowed_tools_patch() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
let job = add_agent_job(
|
||||
&config,
|
||||
Some("agent".into()),
|
||||
Schedule::Every { every_ms: 60_000 },
|
||||
"do work",
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let updated = update_job(
|
||||
&config,
|
||||
&job.id,
|
||||
CronJobPatch {
|
||||
allowed_tools: Some(vec!["shell".into()]),
|
||||
..CronJobPatch::default()
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(updated.allowed_tools, Some(vec!["shell".into()]));
|
||||
assert_eq!(
|
||||
get_job(&config, &job.id).unwrap().allowed_tools,
|
||||
Some(vec!["shell".into()])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reschedule_after_run_persists_last_status_and_last_run() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
|
||||
@@ -357,6 +357,65 @@ pub async fn handle_api_cron_delete(
|
||||
}
|
||||
}
|
||||
|
||||
/// GET /api/cron/settings — return cron subsystem settings
|
||||
pub async fn handle_api_cron_settings_get(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
) -> impl IntoResponse {
|
||||
if let Err(e) = require_auth(&state, &headers) {
|
||||
return e.into_response();
|
||||
}
|
||||
|
||||
let config = state.config.lock().clone();
|
||||
Json(serde_json::json!({
|
||||
"enabled": config.cron.enabled,
|
||||
"catch_up_on_startup": config.cron.catch_up_on_startup,
|
||||
"max_run_history": config.cron.max_run_history,
|
||||
}))
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// PATCH /api/cron/settings — update cron subsystem settings
|
||||
pub async fn handle_api_cron_settings_patch(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
Json(body): Json<serde_json::Value>,
|
||||
) -> impl IntoResponse {
|
||||
if let Err(e) = require_auth(&state, &headers) {
|
||||
return e.into_response();
|
||||
}
|
||||
|
||||
let mut config = state.config.lock().clone();
|
||||
|
||||
if let Some(v) = body.get("enabled").and_then(|v| v.as_bool()) {
|
||||
config.cron.enabled = v;
|
||||
}
|
||||
if let Some(v) = body.get("catch_up_on_startup").and_then(|v| v.as_bool()) {
|
||||
config.cron.catch_up_on_startup = v;
|
||||
}
|
||||
if let Some(v) = body.get("max_run_history").and_then(|v| v.as_u64()) {
|
||||
config.cron.max_run_history = u32::try_from(v).unwrap_or(u32::MAX);
|
||||
}
|
||||
|
||||
if let Err(e) = config.save().await {
|
||||
return (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to save config: {e}")})),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
*state.config.lock() = config.clone();
|
||||
|
||||
Json(serde_json::json!({
|
||||
"status": "ok",
|
||||
"enabled": config.cron.enabled,
|
||||
"catch_up_on_startup": config.cron.catch_up_on_startup,
|
||||
"max_run_history": config.cron.max_run_history,
|
||||
}))
|
||||
.into_response()
|
||||
}
|
||||
|
||||
/// GET /api/integrations — list all integrations with status
|
||||
pub async fn handle_api_integrations(
|
||||
State(state): State<AppState>,
|
||||
|
||||
@@ -766,6 +766,10 @@ pub async fn run_gateway(host: &str, port: u16, config: Config) -> Result<()> {
|
||||
.route("/api/tools", get(api::handle_api_tools))
|
||||
.route("/api/cron", get(api::handle_api_cron_list))
|
||||
.route("/api/cron", post(api::handle_api_cron_add))
|
||||
.route(
|
||||
"/api/cron/settings",
|
||||
get(api::handle_api_cron_settings_get).patch(api::handle_api_cron_settings_patch),
|
||||
)
|
||||
.route("/api/cron/{id}", delete(api::handle_api_cron_delete))
|
||||
.route("/api/cron/{id}/runs", get(api::handle_api_cron_runs))
|
||||
.route("/api/integrations", get(api::handle_api_integrations))
|
||||
|
||||
+15
@@ -299,6 +299,9 @@ Examples:
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
|
||||
#[arg(long = "allowed-tool")]
|
||||
allowed_tools: Vec<String>,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
@@ -317,6 +320,9 @@ Examples:
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
|
||||
#[arg(long = "allowed-tool")]
|
||||
allowed_tools: Vec<String>,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
@@ -335,6 +341,9 @@ Examples:
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
|
||||
#[arg(long = "allowed-tool")]
|
||||
allowed_tools: Vec<String>,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
@@ -355,6 +364,9 @@ Examples:
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Restrict agent cron jobs to the specified tool names (repeatable, agent-only)
|
||||
#[arg(long = "allowed-tool")]
|
||||
allowed_tools: Vec<String>,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
@@ -388,6 +400,9 @@ Examples:
|
||||
/// New job name
|
||||
#[arg(long)]
|
||||
name: Option<String>,
|
||||
/// Replace the agent job allowlist with the specified tool names (repeatable)
|
||||
#[arg(long = "allowed-tool")]
|
||||
allowed_tools: Vec<String>,
|
||||
},
|
||||
/// Pause a scheduled task
|
||||
Pause {
|
||||
|
||||
@@ -463,6 +463,47 @@ fn resolve_quick_setup_dirs_with_home(home: &Path) -> (PathBuf, PathBuf) {
|
||||
(config_dir.clone(), config_dir.join("workspace"))
|
||||
}
|
||||
|
||||
fn homebrew_prefix_for_exe(exe: &Path) -> Option<&'static str> {
|
||||
let exe = exe.to_string_lossy();
|
||||
if exe == "/opt/homebrew/bin/zeroclaw"
|
||||
|| exe.starts_with("/opt/homebrew/Cellar/zeroclaw/")
|
||||
|| exe.starts_with("/opt/homebrew/opt/zeroclaw/")
|
||||
{
|
||||
return Some("/opt/homebrew");
|
||||
}
|
||||
|
||||
if exe == "/usr/local/bin/zeroclaw"
|
||||
|| exe.starts_with("/usr/local/Cellar/zeroclaw/")
|
||||
|| exe.starts_with("/usr/local/opt/zeroclaw/")
|
||||
{
|
||||
return Some("/usr/local");
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn quick_setup_homebrew_service_note(
|
||||
config_path: &Path,
|
||||
workspace_dir: &Path,
|
||||
exe: &Path,
|
||||
) -> Option<String> {
|
||||
let prefix = homebrew_prefix_for_exe(exe)?;
|
||||
let service_root = Path::new(prefix).join("var").join("zeroclaw");
|
||||
let service_config = service_root.join("config.toml");
|
||||
let service_workspace = service_root.join("workspace");
|
||||
|
||||
if config_path == service_config || workspace_dir == service_workspace {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(format!(
|
||||
"Homebrew service note: `brew services` uses {} (config {}) by default. Your onboarding just wrote {}. If you plan to run ZeroClaw as a service, copy or link this workspace first.",
|
||||
service_workspace.display(),
|
||||
service_config.display(),
|
||||
config_path.display(),
|
||||
))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_lines)]
|
||||
async fn run_quick_setup_with_home(
|
||||
credential_override: Option<&str>,
|
||||
@@ -650,6 +691,16 @@ async fn run_quick_setup_with_home(
|
||||
style("Config saved:").white().bold(),
|
||||
style(config_path.display()).green()
|
||||
);
|
||||
if cfg!(target_os = "macos") {
|
||||
if let Ok(exe) = std::env::current_exe() {
|
||||
if let Some(note) =
|
||||
quick_setup_homebrew_service_note(&config_path, &workspace_dir, &exe)
|
||||
{
|
||||
println!();
|
||||
println!(" {}", style(note).yellow());
|
||||
}
|
||||
}
|
||||
}
|
||||
println!();
|
||||
println!(" {}", style("Next steps:").white().bold());
|
||||
if credential_override.is_none() {
|
||||
@@ -3913,6 +3964,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
|
||||
},
|
||||
allowed_users,
|
||||
interrupt_on_new_message: false,
|
||||
thread_replies: None,
|
||||
mention_only: false,
|
||||
});
|
||||
}
|
||||
@@ -6066,6 +6118,52 @@ mod tests {
|
||||
assert_eq!(config.config_path, expected_config_path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn homebrew_prefix_for_exe_detects_supported_layouts() {
|
||||
assert_eq!(
|
||||
homebrew_prefix_for_exe(Path::new("/opt/homebrew/bin/zeroclaw")),
|
||||
Some("/opt/homebrew")
|
||||
);
|
||||
assert_eq!(
|
||||
homebrew_prefix_for_exe(Path::new(
|
||||
"/opt/homebrew/Cellar/zeroclaw/0.5.0/bin/zeroclaw",
|
||||
)),
|
||||
Some("/opt/homebrew")
|
||||
);
|
||||
assert_eq!(
|
||||
homebrew_prefix_for_exe(Path::new("/usr/local/bin/zeroclaw")),
|
||||
Some("/usr/local")
|
||||
);
|
||||
assert_eq!(homebrew_prefix_for_exe(Path::new("/tmp/zeroclaw")), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn quick_setup_homebrew_service_note_mentions_service_workspace() {
|
||||
let note = quick_setup_homebrew_service_note(
|
||||
Path::new("/Users/alix/.zeroclaw/config.toml"),
|
||||
Path::new("/Users/alix/.zeroclaw/workspace"),
|
||||
Path::new("/opt/homebrew/bin/zeroclaw"),
|
||||
)
|
||||
.expect("homebrew installs should emit a service workspace note");
|
||||
|
||||
assert!(note.contains("/opt/homebrew/var/zeroclaw/workspace"));
|
||||
assert!(note.contains("/opt/homebrew/var/zeroclaw/config.toml"));
|
||||
assert!(note.contains("/Users/alix/.zeroclaw/config.toml"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn quick_setup_homebrew_service_note_skips_matching_service_layout() {
|
||||
let service_config = Path::new("/opt/homebrew/var/zeroclaw/config.toml");
|
||||
let service_workspace = Path::new("/opt/homebrew/var/zeroclaw/workspace");
|
||||
|
||||
assert!(quick_setup_homebrew_service_note(
|
||||
service_config,
|
||||
service_workspace,
|
||||
Path::new("/opt/homebrew/bin/zeroclaw"),
|
||||
)
|
||||
.is_none());
|
||||
}
|
||||
|
||||
// ── scaffold_workspace: basic file creation ─────────────────
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
+50
-41
@@ -211,9 +211,9 @@ impl AnthropicProvider {
|
||||
text.len() > 3072
|
||||
}
|
||||
|
||||
/// Cache conversations with more than 4 messages (excluding system)
|
||||
/// Cache conversations with more than 1 non-system message (i.e. after first exchange)
|
||||
fn should_cache_conversation(messages: &[ChatMessage]) -> bool {
|
||||
messages.iter().filter(|m| m.role != "system").count() > 4
|
||||
messages.iter().filter(|m| m.role != "system").count() > 1
|
||||
}
|
||||
|
||||
/// Apply cache control to the last message content block
|
||||
@@ -447,17 +447,13 @@ impl AnthropicProvider {
|
||||
}
|
||||
}
|
||||
|
||||
// Convert system text to SystemPrompt with cache control if large
|
||||
// Always use Blocks format with cache_control for system prompts
|
||||
let system_prompt = system_text.map(|text| {
|
||||
if Self::should_cache_system(&text) {
|
||||
SystemPrompt::Blocks(vec![SystemBlock {
|
||||
block_type: "text".to_string(),
|
||||
text,
|
||||
cache_control: Some(CacheControl::ephemeral()),
|
||||
}])
|
||||
} else {
|
||||
SystemPrompt::String(text)
|
||||
}
|
||||
SystemPrompt::Blocks(vec![SystemBlock {
|
||||
block_type: "text".to_string(),
|
||||
text,
|
||||
cache_control: Some(CacheControl::ephemeral()),
|
||||
}])
|
||||
});
|
||||
|
||||
(system_prompt, native_messages)
|
||||
@@ -1063,12 +1059,8 @@ mod tests {
|
||||
role: "user".to_string(),
|
||||
content: "Hello".to_string(),
|
||||
},
|
||||
ChatMessage {
|
||||
role: "assistant".to_string(),
|
||||
content: "Hi".to_string(),
|
||||
},
|
||||
];
|
||||
// Only 2 non-system messages
|
||||
// Only 1 non-system message — should not cache
|
||||
assert!(!AnthropicProvider::should_cache_conversation(&messages));
|
||||
}
|
||||
|
||||
@@ -1078,8 +1070,8 @@ mod tests {
|
||||
role: "system".to_string(),
|
||||
content: "System prompt".to_string(),
|
||||
}];
|
||||
// Add 5 non-system messages
|
||||
for i in 0..5 {
|
||||
// Add 3 non-system messages
|
||||
for i in 0..3 {
|
||||
messages.push(ChatMessage {
|
||||
role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
|
||||
content: format!("Message {i}"),
|
||||
@@ -1090,21 +1082,24 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn should_cache_conversation_boundary() {
|
||||
let mut messages = vec![];
|
||||
// Add exactly 4 non-system messages
|
||||
for i in 0..4 {
|
||||
messages.push(ChatMessage {
|
||||
role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
|
||||
content: format!("Message {i}"),
|
||||
});
|
||||
}
|
||||
let messages = vec![ChatMessage {
|
||||
role: "user".to_string(),
|
||||
content: "Hello".to_string(),
|
||||
}];
|
||||
// Exactly 1 non-system message — should not cache
|
||||
assert!(!AnthropicProvider::should_cache_conversation(&messages));
|
||||
|
||||
// Add one more to cross boundary
|
||||
messages.push(ChatMessage {
|
||||
role: "user".to_string(),
|
||||
content: "One more".to_string(),
|
||||
});
|
||||
// Add one more to cross boundary (>1)
|
||||
let messages = vec![
|
||||
ChatMessage {
|
||||
role: "user".to_string(),
|
||||
content: "Hello".to_string(),
|
||||
},
|
||||
ChatMessage {
|
||||
role: "assistant".to_string(),
|
||||
content: "Hi".to_string(),
|
||||
},
|
||||
];
|
||||
assert!(AnthropicProvider::should_cache_conversation(&messages));
|
||||
}
|
||||
|
||||
@@ -1217,7 +1212,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn convert_messages_small_system_prompt() {
|
||||
fn convert_messages_small_system_prompt_uses_blocks_with_cache() {
|
||||
let messages = vec![ChatMessage {
|
||||
role: "system".to_string(),
|
||||
content: "Short system prompt".to_string(),
|
||||
@@ -1226,10 +1221,17 @@ mod tests {
|
||||
let (system_prompt, _) = AnthropicProvider::convert_messages(&messages);
|
||||
|
||||
match system_prompt.unwrap() {
|
||||
SystemPrompt::String(s) => {
|
||||
assert_eq!(s, "Short system prompt");
|
||||
SystemPrompt::Blocks(blocks) => {
|
||||
assert_eq!(blocks.len(), 1);
|
||||
assert_eq!(blocks[0].text, "Short system prompt");
|
||||
assert!(
|
||||
blocks[0].cache_control.is_some(),
|
||||
"Small system prompts should have cache_control"
|
||||
);
|
||||
}
|
||||
SystemPrompt::String(_) => {
|
||||
panic!("Expected Blocks variant with cache_control for small prompt")
|
||||
}
|
||||
SystemPrompt::Blocks(_) => panic!("Expected String variant for small prompt"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1254,12 +1256,16 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backward_compatibility_native_chat_request() {
|
||||
// Test that requests without cache_control serialize identically to old format
|
||||
fn native_chat_request_with_blocks_system() {
|
||||
// System prompts now always use Blocks format with cache_control
|
||||
let req = NativeChatRequest {
|
||||
model: "claude-3-opus".to_string(),
|
||||
max_tokens: 4096,
|
||||
system: Some(SystemPrompt::String("System".to_string())),
|
||||
system: Some(SystemPrompt::Blocks(vec![SystemBlock {
|
||||
block_type: "text".to_string(),
|
||||
text: "System".to_string(),
|
||||
cache_control: Some(CacheControl::ephemeral()),
|
||||
}])),
|
||||
messages: vec![NativeMessage {
|
||||
role: "user".to_string(),
|
||||
content: vec![NativeContentOut::Text {
|
||||
@@ -1272,8 +1278,11 @@ mod tests {
|
||||
};
|
||||
|
||||
let json = serde_json::to_string(&req).unwrap();
|
||||
assert!(!json.contains("cache_control"));
|
||||
assert!(json.contains(r#""system":"System""#));
|
||||
assert!(json.contains("System"));
|
||||
assert!(
|
||||
json.contains(r#""cache_control":{"type":"ephemeral"}"#),
|
||||
"System prompt should include cache_control"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
+61
-7
@@ -409,13 +409,43 @@ fn has_shell_shebang(path: &Path) -> bool {
|
||||
return false;
|
||||
};
|
||||
let prefix = &content[..content.len().min(128)];
|
||||
let shebang = String::from_utf8_lossy(prefix).to_ascii_lowercase();
|
||||
shebang.starts_with("#!")
|
||||
&& (shebang.contains("sh")
|
||||
|| shebang.contains("bash")
|
||||
|| shebang.contains("zsh")
|
||||
|| shebang.contains("pwsh")
|
||||
|| shebang.contains("powershell"))
|
||||
let shebang_line = String::from_utf8_lossy(prefix)
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or_default()
|
||||
.trim()
|
||||
.to_ascii_lowercase();
|
||||
let Some(interpreter) = shebang_interpreter(&shebang_line) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
matches!(
|
||||
interpreter,
|
||||
"sh" | "bash" | "zsh" | "ksh" | "fish" | "pwsh" | "powershell"
|
||||
)
|
||||
}
|
||||
|
||||
fn shebang_interpreter(line: &str) -> Option<&str> {
|
||||
let shebang = line.strip_prefix("#!")?.trim();
|
||||
if shebang.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut parts = shebang.split_whitespace();
|
||||
let first = parts.next()?;
|
||||
let first_basename = Path::new(first).file_name()?.to_str()?;
|
||||
|
||||
if first_basename == "env" {
|
||||
for part in parts {
|
||||
if part.starts_with('-') {
|
||||
continue;
|
||||
}
|
||||
return Path::new(part).file_name()?.to_str();
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(first_basename)
|
||||
}
|
||||
|
||||
fn extract_markdown_links(content: &str) -> Vec<String> {
|
||||
@@ -586,6 +616,30 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn audit_allows_python_shebang_file_when_early_text_contains_sh() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let skill_dir = dir.path().join("python-helper");
|
||||
let scripts_dir = skill_dir.join("scripts");
|
||||
std::fs::create_dir_all(&scripts_dir).unwrap();
|
||||
std::fs::write(skill_dir.join("SKILL.md"), "# Skill\n").unwrap();
|
||||
std::fs::write(
|
||||
scripts_dir.join("helper.py"),
|
||||
"#!/usr/bin/env python3\n\"\"\"Refresh report cache.\"\"\"\n\nprint(\"ok\")\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let report = audit_skill_directory(&skill_dir).unwrap();
|
||||
assert!(
|
||||
!report
|
||||
.findings
|
||||
.iter()
|
||||
.any(|finding| finding.contains("script-like files are blocked")),
|
||||
"{:#?}",
|
||||
report.findings
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn audit_rejects_markdown_escape_links() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
|
||||
+47
-1
@@ -130,6 +130,11 @@ impl Tool for CronAddTool {
|
||||
"type": "string",
|
||||
"description": "Optional model override for agent jobs, e.g. 'x-ai/grok-4-1-fast'"
|
||||
},
|
||||
"allowed_tools": {
|
||||
"type": "array",
|
||||
"items": { "type": "string" },
|
||||
"description": "Optional allowlist of tool names for agent jobs. When omitted, all tools remain available."
|
||||
},
|
||||
"delivery": {
|
||||
"type": "object",
|
||||
"description": "Optional delivery config to send job output to a channel after each run. When provided, all three of mode, channel, and to are expected.",
|
||||
@@ -288,6 +293,19 @@ impl Tool for CronAddTool {
|
||||
.get("model")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_string);
|
||||
let allowed_tools = match args.get("allowed_tools") {
|
||||
Some(v) => match serde_json::from_value::<Vec<String>>(v.clone()) {
|
||||
Ok(v) => Some(v),
|
||||
Err(e) => {
|
||||
return Ok(ToolResult {
|
||||
success: false,
|
||||
output: String::new(),
|
||||
error: Some(format!("Invalid allowed_tools: {e}")),
|
||||
});
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
let delivery = match args.get("delivery") {
|
||||
Some(v) => match serde_json::from_value::<DeliveryConfig>(v.clone()) {
|
||||
@@ -316,6 +334,7 @@ impl Tool for CronAddTool {
|
||||
model,
|
||||
delivery,
|
||||
delete_after_run,
|
||||
allowed_tools,
|
||||
)
|
||||
}
|
||||
};
|
||||
@@ -329,7 +348,8 @@ impl Tool for CronAddTool {
|
||||
"job_type": job.job_type,
|
||||
"schedule": job.schedule,
|
||||
"next_run": job.next_run,
|
||||
"enabled": job.enabled
|
||||
"enabled": job.enabled,
|
||||
"allowed_tools": job.allowed_tools
|
||||
}))?,
|
||||
error: None,
|
||||
}),
|
||||
@@ -612,6 +632,32 @@ mod tests {
|
||||
.contains("Missing 'prompt'"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn agent_job_persists_allowed_tools() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let cfg = test_config(&tmp).await;
|
||||
let tool = CronAddTool::new(cfg.clone(), test_security(&cfg));
|
||||
|
||||
let result = tool
|
||||
.execute(json!({
|
||||
"schedule": { "kind": "cron", "expr": "*/5 * * * *" },
|
||||
"job_type": "agent",
|
||||
"prompt": "check status",
|
||||
"allowed_tools": ["file_read", "web_search"]
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(result.success, "{:?}", result.error);
|
||||
|
||||
let jobs = cron::list_jobs(&cfg).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(
|
||||
jobs[0].allowed_tools,
|
||||
Some(vec!["file_read".into(), "web_search".into()])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delivery_schema_includes_matrix_channel() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
|
||||
@@ -89,6 +89,11 @@ impl Tool for CronUpdateTool {
|
||||
"type": "string",
|
||||
"description": "Model override for agent jobs, e.g. 'x-ai/grok-4-1-fast'"
|
||||
},
|
||||
"allowed_tools": {
|
||||
"type": "array",
|
||||
"items": { "type": "string" },
|
||||
"description": "Optional replacement allowlist of tool names for agent jobs"
|
||||
},
|
||||
"session_target": {
|
||||
"type": "string",
|
||||
"enum": ["isolated", "main"],
|
||||
@@ -403,6 +408,7 @@ mod tests {
|
||||
"command",
|
||||
"prompt",
|
||||
"model",
|
||||
"allowed_tools",
|
||||
"session_target",
|
||||
"delete_after_run",
|
||||
"schedule",
|
||||
@@ -501,4 +507,40 @@ mod tests {
|
||||
.contains("Rate limit exceeded"));
|
||||
assert!(cron::get_job(&cfg, &job.id).unwrap().enabled);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn updates_agent_allowed_tools() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let cfg = test_config(&tmp).await;
|
||||
let job = cron::add_agent_job(
|
||||
&cfg,
|
||||
None,
|
||||
crate::cron::Schedule::Cron {
|
||||
expr: "*/5 * * * *".into(),
|
||||
tz: None,
|
||||
},
|
||||
"check status",
|
||||
crate::cron::SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let tool = CronUpdateTool::new(cfg.clone(), test_security(&cfg));
|
||||
|
||||
let result = tool
|
||||
.execute(json!({
|
||||
"job_id": job.id,
|
||||
"patch": { "allowed_tools": ["file_read", "web_search"] }
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(result.success, "{:?}", result.error);
|
||||
assert_eq!(
|
||||
cron::get_job(&cfg, &job.id).unwrap().allowed_tools,
|
||||
Some(vec!["file_read".into(), "web_search".into()])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,6 +418,7 @@ impl DelegateTool {
|
||||
true,
|
||||
None,
|
||||
"delegate",
|
||||
None,
|
||||
&self.multimodal_config,
|
||||
agent_config.max_iterations,
|
||||
None,
|
||||
|
||||
+7
-2
@@ -146,7 +146,7 @@ pub use workspace_tool::WorkspaceTool;
|
||||
use crate::config::{Config, DelegateAgentConfig};
|
||||
use crate::memory::Memory;
|
||||
use crate::runtime::{NativeRuntime, RuntimeAdapter};
|
||||
use crate::security::SecurityPolicy;
|
||||
use crate::security::{create_sandbox, SecurityPolicy};
|
||||
use async_trait::async_trait;
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
@@ -283,8 +283,13 @@ pub fn all_tools_with_runtime(
|
||||
root_config: &crate::config::Config,
|
||||
) -> (Vec<Box<dyn Tool>>, Option<DelegateParentToolsHandle>) {
|
||||
let has_shell_access = runtime.has_shell_access();
|
||||
let sandbox = create_sandbox(&root_config.security);
|
||||
let mut tool_arcs: Vec<Arc<dyn Tool>> = vec![
|
||||
Arc::new(ShellTool::new(security.clone(), runtime)),
|
||||
Arc::new(ShellTool::new_with_sandbox(
|
||||
security.clone(),
|
||||
runtime,
|
||||
sandbox,
|
||||
)),
|
||||
Arc::new(FileReadTool::new(security.clone())),
|
||||
Arc::new(FileWriteTool::new(security.clone())),
|
||||
Arc::new(FileEditTool::new(security.clone())),
|
||||
|
||||
+82
-1
@@ -1,5 +1,6 @@
|
||||
use super::traits::{Tool, ToolResult};
|
||||
use crate::runtime::RuntimeAdapter;
|
||||
use crate::security::traits::Sandbox;
|
||||
use crate::security::SecurityPolicy;
|
||||
use async_trait::async_trait;
|
||||
use serde_json::json;
|
||||
@@ -44,11 +45,28 @@ const SAFE_ENV_VARS: &[&str] = &[
|
||||
pub struct ShellTool {
|
||||
security: Arc<SecurityPolicy>,
|
||||
runtime: Arc<dyn RuntimeAdapter>,
|
||||
sandbox: Arc<dyn Sandbox>,
|
||||
}
|
||||
|
||||
impl ShellTool {
|
||||
pub fn new(security: Arc<SecurityPolicy>, runtime: Arc<dyn RuntimeAdapter>) -> Self {
|
||||
Self { security, runtime }
|
||||
Self {
|
||||
security,
|
||||
runtime,
|
||||
sandbox: Arc::new(crate::security::NoopSandbox),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_sandbox(
|
||||
security: Arc<SecurityPolicy>,
|
||||
runtime: Arc<dyn RuntimeAdapter>,
|
||||
sandbox: Arc<dyn Sandbox>,
|
||||
) -> Self {
|
||||
Self {
|
||||
security,
|
||||
runtime,
|
||||
sandbox,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,6 +187,14 @@ impl Tool for ShellTool {
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Apply sandbox wrapping before execution.
|
||||
// The Sandbox trait operates on std::process::Command, so use as_std_mut()
|
||||
// to get a mutable reference to the underlying command.
|
||||
self.sandbox
|
||||
.wrap_command(cmd.as_std_mut())
|
||||
.map_err(|e| anyhow::anyhow!("Sandbox error: {}", e))?;
|
||||
|
||||
cmd.env_clear();
|
||||
|
||||
for var in collect_allowed_shell_env_vars(&self.security) {
|
||||
@@ -690,4 +716,59 @@ mod tests {
|
||||
|| r2.error.as_deref().unwrap_or("").contains("budget")
|
||||
);
|
||||
}
|
||||
|
||||
// ── Sandbox integration tests ────────────────────────
|
||||
|
||||
#[test]
|
||||
fn shell_tool_can_be_constructed_with_sandbox() {
|
||||
use crate::security::NoopSandbox;
|
||||
|
||||
let sandbox: Arc<dyn Sandbox> = Arc::new(NoopSandbox);
|
||||
let tool = ShellTool::new_with_sandbox(
|
||||
test_security(AutonomyLevel::Supervised),
|
||||
test_runtime(),
|
||||
sandbox,
|
||||
);
|
||||
assert_eq!(tool.name(), "shell");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn noop_sandbox_does_not_modify_command() {
|
||||
use crate::security::NoopSandbox;
|
||||
|
||||
let sandbox = NoopSandbox;
|
||||
let mut cmd = std::process::Command::new("echo");
|
||||
cmd.arg("hello");
|
||||
|
||||
let program_before = cmd.get_program().to_os_string();
|
||||
let args_before: Vec<_> = cmd.get_args().map(|a| a.to_os_string()).collect();
|
||||
|
||||
sandbox
|
||||
.wrap_command(&mut cmd)
|
||||
.expect("wrap_command should succeed");
|
||||
|
||||
assert_eq!(cmd.get_program(), program_before);
|
||||
assert_eq!(
|
||||
cmd.get_args().map(|a| a.to_os_string()).collect::<Vec<_>>(),
|
||||
args_before
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shell_executes_with_sandbox() {
|
||||
use crate::security::NoopSandbox;
|
||||
|
||||
let sandbox: Arc<dyn Sandbox> = Arc::new(NoopSandbox);
|
||||
let tool = ShellTool::new_with_sandbox(
|
||||
test_security(AutonomyLevel::Supervised),
|
||||
test_runtime(),
|
||||
sandbox,
|
||||
);
|
||||
let result = tool
|
||||
.execute(json!({"command": "echo sandbox_test"}))
|
||||
.await
|
||||
.expect("command with sandbox should succeed");
|
||||
assert!(result.success);
|
||||
assert!(result.output.contains("sandbox_test"));
|
||||
}
|
||||
}
|
||||
|
||||
Vendored
BIN
Binary file not shown.
|
After Width: | Height: | Size: 2.1 MiB |
Binary file not shown.
|
After Width: | Height: | Size: 2.1 MiB |
@@ -193,6 +193,25 @@ export function getCronRuns(
|
||||
).then((data) => unwrapField(data, 'runs'));
|
||||
}
|
||||
|
||||
export interface CronSettings {
|
||||
enabled: boolean;
|
||||
catch_up_on_startup: boolean;
|
||||
max_run_history: number;
|
||||
}
|
||||
|
||||
export function getCronSettings(): Promise<CronSettings> {
|
||||
return apiFetch<CronSettings>('/api/cron/settings');
|
||||
}
|
||||
|
||||
export function patchCronSettings(
|
||||
patch: Partial<CronSettings>,
|
||||
): Promise<CronSettings> {
|
||||
return apiFetch<CronSettings & { status: string }>('/api/cron/settings', {
|
||||
method: 'PATCH',
|
||||
body: JSON.stringify(patch),
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Integrations
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
+62
-1
@@ -12,7 +12,15 @@ import {
|
||||
RefreshCw,
|
||||
} from 'lucide-react';
|
||||
import type { CronJob, CronRun } from '@/types/api';
|
||||
import { getCronJobs, addCronJob, deleteCronJob, getCronRuns } from '@/lib/api';
|
||||
import {
|
||||
getCronJobs,
|
||||
addCronJob,
|
||||
deleteCronJob,
|
||||
getCronRuns,
|
||||
getCronSettings,
|
||||
patchCronSettings,
|
||||
} from '@/lib/api';
|
||||
import type { CronSettings } from '@/lib/api';
|
||||
import { t } from '@/lib/i18n';
|
||||
|
||||
function formatDate(iso: string | null): string {
|
||||
@@ -143,6 +151,8 @@ export default function Cron() {
|
||||
const [showForm, setShowForm] = useState(false);
|
||||
const [confirmDelete, setConfirmDelete] = useState<string | null>(null);
|
||||
const [expandedJob, setExpandedJob] = useState<string | null>(null);
|
||||
const [settings, setSettings] = useState<CronSettings | null>(null);
|
||||
const [togglingCatchUp, setTogglingCatchUp] = useState(false);
|
||||
|
||||
// Form state
|
||||
const [formName, setFormName] = useState('');
|
||||
@@ -159,8 +169,28 @@ export default function Cron() {
|
||||
.finally(() => setLoading(false));
|
||||
};
|
||||
|
||||
const fetchSettings = () => {
|
||||
getCronSettings().then(setSettings).catch(() => {});
|
||||
};
|
||||
|
||||
const toggleCatchUp = async () => {
|
||||
if (!settings) return;
|
||||
setTogglingCatchUp(true);
|
||||
try {
|
||||
const updated = await patchCronSettings({
|
||||
catch_up_on_startup: !settings.catch_up_on_startup,
|
||||
});
|
||||
setSettings(updated);
|
||||
} catch {
|
||||
// silently fail — user can retry
|
||||
} finally {
|
||||
setTogglingCatchUp(false);
|
||||
}
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
fetchJobs();
|
||||
fetchSettings();
|
||||
}, []);
|
||||
|
||||
const handleAdd = async () => {
|
||||
@@ -250,6 +280,37 @@ export default function Cron() {
|
||||
</button>
|
||||
</div>
|
||||
|
||||
{/* Catch-up toggle */}
|
||||
{settings && (
|
||||
<div className="glass-card px-4 py-3 flex items-center justify-between">
|
||||
<div>
|
||||
<span className="text-sm font-medium text-white">
|
||||
Catch up missed jobs on startup
|
||||
</span>
|
||||
<p className="text-xs text-[#556080] mt-0.5">
|
||||
Run all overdue jobs when ZeroClaw starts after downtime
|
||||
</p>
|
||||
</div>
|
||||
<button
|
||||
onClick={toggleCatchUp}
|
||||
disabled={togglingCatchUp}
|
||||
className={`relative inline-flex h-6 w-11 items-center rounded-full transition-colors duration-300 focus:outline-none ${
|
||||
settings.catch_up_on_startup
|
||||
? 'bg-[#0080ff]'
|
||||
: 'bg-[#1a1a3e]'
|
||||
}`}
|
||||
>
|
||||
<span
|
||||
className={`inline-block h-4 w-4 rounded-full bg-white transition-transform duration-300 ${
|
||||
settings.catch_up_on_startup
|
||||
? 'translate-x-6'
|
||||
: 'translate-x-1'
|
||||
}`}
|
||||
/>
|
||||
</button>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Add Job Form Modal */}
|
||||
{showForm && (
|
||||
<div className="fixed inset-0 modal-backdrop flex items-center justify-center z-50">
|
||||
|
||||
Reference in New Issue
Block a user