fix(cron): persist delivery for api-created cron jobs (#4087)

Resolves merge conflicts from PR #4064. Uses typed DeliveryConfig in
CronAddBody and passes delivery directly to add_shell_job_with_approval
and add_agent_job instead of post-creation patching. Preserves master's
richer API fields (session_target, model, allowed_tools, delete_after_run).
This commit is contained in:
Argenis 2026-03-20 15:42:00 -04:00 committed by Roman Tataurov
parent a22cec8adf
commit a3dbf19d72
No known key found for this signature in database
GPG Key ID: 70A51EF3185C334B
6 changed files with 515 additions and 77 deletions

View File

@ -45,6 +45,39 @@ pub(crate) fn validate_shell_command_with_security(
.map_err(|reason| anyhow!("blocked by security policy: {reason}"))
}
pub(crate) fn validate_delivery_config(delivery: Option<&DeliveryConfig>) -> Result<()> {
let Some(delivery) = delivery else {
return Ok(());
};
if delivery.mode.eq_ignore_ascii_case("none") {
return Ok(());
}
if !delivery.mode.eq_ignore_ascii_case("announce") {
bail!("unsupported delivery mode: {}", delivery.mode);
}
let channel = delivery.channel.as_deref().map(str::trim);
let Some(channel) = channel.filter(|value| !value.is_empty()) else {
bail!("delivery.channel is required for announce mode");
};
match channel.to_ascii_lowercase().as_str() {
"telegram" | "discord" | "slack" | "mattermost" | "signal" | "matrix" => {}
other => bail!("unsupported delivery channel: {other}"),
}
let has_target = delivery
.to
.as_deref()
.map(str::trim)
.is_some_and(|value| !value.is_empty());
if !has_target {
bail!("delivery.to is required for announce mode");
}
Ok(())
}
/// Create a validated shell job, enforcing security policy before persistence.
///
/// All entrypoints that create shell cron jobs should route through this
@ -54,10 +87,12 @@ pub fn add_shell_job_with_approval(
name: Option<String>,
schedule: Schedule,
command: &str,
delivery: Option<DeliveryConfig>,
approved: bool,
) -> Result<CronJob> {
validate_shell_command(config, command, approved)?;
store::add_shell_job(config, name, schedule, command)
validate_delivery_config(delivery.as_ref())?;
store::add_shell_job(config, name, schedule, command, delivery)
}
/// Update a shell job's command with security validation.
@ -95,7 +130,7 @@ pub fn add_once_at_validated(
approved: bool,
) -> Result<CronJob> {
let schedule = Schedule::At { at };
add_shell_job_with_approval(config, None, schedule, command, approved)
add_shell_job_with_approval(config, None, schedule, command, None, approved)
}
// Convenience wrappers for CLI paths (default approved=false).
@ -106,7 +141,7 @@ pub(crate) fn add_shell_job(
schedule: Schedule,
command: &str,
) -> Result<CronJob> {
add_shell_job_with_approval(config, name, schedule, command, false)
add_shell_job_with_approval(config, name, schedule, command, None, false)
}
pub(crate) fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJob> {
@ -680,6 +715,7 @@ mod tests {
tz: None,
},
"touch cron-medium-risk",
None,
true,
);
assert!(approved.is_ok(), "{approved:?}");
@ -812,6 +848,7 @@ mod tests {
tz: None,
},
"curl https://example.com",
None,
false,
);
assert!(result.is_err());

View File

@ -1,7 +1,7 @@
use crate::config::Config;
use crate::cron::{
next_run_for_schedule, schedule_cron_expression, validate_schedule, CronJob, CronJobPatch,
CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule,
CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget,
};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
@ -24,7 +24,7 @@ pub fn add_job(config: &Config, expression: &str, command: &str) -> Result<CronJ
expr: expression.to_string(),
tz: None,
};
add_shell_job(config, None, schedule, command)
add_shell_job(config, None, schedule, command, None)
}
pub fn add_shell_job(
@ -32,13 +32,16 @@ pub fn add_shell_job(
name: Option<String>,
schedule: Schedule,
command: &str,
delivery: Option<DeliveryConfig>,
) -> Result<CronJob> {
let now = Utc::now();
validate_schedule(&schedule, now)?;
validate_delivery_config(delivery.as_ref())?;
let next_run = next_run_for_schedule(&schedule, now)?;
let id = Uuid::new_v4().to_string();
let expression = schedule_cron_expression(&schedule).unwrap_or_default();
let schedule_json = serde_json::to_string(&schedule)?;
let delivery = delivery.unwrap_or_default();
let delete_after_run = matches!(schedule, Schedule::At { .. });
@ -54,7 +57,7 @@ pub fn add_shell_job(
command,
schedule_json,
name,
serde_json::to_string(&DeliveryConfig::default())?,
serde_json::to_string(&delivery)?,
if delete_after_run { 1 } else { 0 },
now.to_rfc3339(),
next_run.to_rfc3339(),
@ -81,6 +84,7 @@ pub fn add_agent_job(
) -> Result<CronJob> {
let now = Utc::now();
validate_schedule(&schedule, now)?;
validate_delivery_config(delivery.as_ref())?;
let next_run = next_run_for_schedule(&schedule, now)?;
let id = Uuid::new_v4().to_string();
let expression = schedule_cron_expression(&schedule).unwrap_or_default();
@ -694,6 +698,7 @@ mod tests {
at: Utc::now() + ChronoDuration::minutes(10),
},
"echo once",
None,
)
.unwrap();
assert!(one_shot.delete_after_run);
@ -703,11 +708,98 @@ mod tests {
None,
Schedule::Every { every_ms: 60_000 },
"echo recurring",
None,
)
.unwrap();
assert!(!recurring.delete_after_run);
}
#[test]
fn add_shell_job_persists_delivery() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let job = add_shell_job(
&config,
Some("deliver-shell".into()),
Schedule::Cron {
expr: "*/5 * * * *".into(),
tz: None,
},
"echo delivered",
Some(DeliveryConfig {
mode: "announce".into(),
channel: Some("discord".into()),
to: Some("1234567890".into()),
best_effort: true,
}),
)
.unwrap();
assert_eq!(job.delivery.mode, "announce");
assert_eq!(job.delivery.channel.as_deref(), Some("discord"));
assert_eq!(job.delivery.to.as_deref(), Some("1234567890"));
let stored = get_job(&config, &job.id).unwrap();
assert_eq!(stored.delivery.mode, "announce");
assert_eq!(stored.delivery.channel.as_deref(), Some("discord"));
assert_eq!(stored.delivery.to.as_deref(), Some("1234567890"));
}
#[test]
fn add_agent_job_rejects_invalid_announce_delivery() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let err = add_agent_job(
&config,
Some("deliver-agent".into()),
Schedule::Cron {
expr: "*/5 * * * *".into(),
tz: None,
},
"summarize logs",
SessionTarget::Isolated,
None,
Some(DeliveryConfig {
mode: "announce".into(),
channel: Some("discord".into()),
to: None,
best_effort: true,
}),
false,
None,
)
.unwrap_err();
assert!(err.to_string().contains("delivery.to is required"));
}
#[test]
fn add_shell_job_rejects_invalid_delivery_mode() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let err = add_shell_job(
&config,
Some("deliver-shell".into()),
Schedule::Cron {
expr: "*/5 * * * *".into(),
tz: None,
},
"echo delivered",
Some(DeliveryConfig {
mode: "annouce".into(),
channel: Some("discord".into()),
to: Some("1234567890".into()),
best_effort: true,
}),
)
.unwrap_err();
assert!(err.to_string().contains("unsupported delivery mode"));
}
#[test]
fn add_list_remove_roundtrip() {
let tmp = TempDir::new().unwrap();
@ -1034,7 +1126,7 @@ mod tests {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let at = Utc::now() + ChronoDuration::minutes(10);
let job = add_shell_job(&config, None, Schedule::At { at }, "echo once").unwrap();
let job = add_shell_job(&config, None, Schedule::At { at }, "echo once", None).unwrap();
reschedule_after_run(&config, &job, true, "done").unwrap();
@ -1051,7 +1143,7 @@ mod tests {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let at = Utc::now() + ChronoDuration::minutes(10);
let job = add_shell_job(&config, None, Schedule::At { at }, "echo once").unwrap();
let job = add_shell_job(&config, None, Schedule::At { at }, "echo once", None).unwrap();
reschedule_after_run(&config, &job, false, "failed").unwrap();

View File

@ -71,7 +71,7 @@ pub struct CronAddBody {
pub command: Option<String>,
pub job_type: Option<String>,
pub prompt: Option<String>,
pub delivery: Option<serde_json::Value>,
pub delivery: Option<crate::cron::DeliveryConfig>,
pub session_target: Option<String>,
pub model: Option<String>,
pub allowed_tools: Option<Vec<String>>,
@ -239,11 +239,11 @@ pub async fn handle_api_cron_list(
"command": job.command,
"prompt": job.prompt,
"schedule": job.schedule,
"delivery": job.delivery,
"next_run": job.next_run.to_rfc3339(),
"last_run": job.last_run.map(|t| t.to_rfc3339()),
"last_status": job.last_status,
"enabled": job.enabled,
"delivery": job.delivery,
})
})
.collect();
@ -267,18 +267,38 @@ pub async fn handle_api_cron_add(
return e.into_response();
}
let CronAddBody {
name,
schedule,
command,
job_type,
prompt,
delivery,
session_target,
model,
allowed_tools,
delete_after_run,
} = body;
let config = state.config.lock().clone();
let schedule = crate::cron::Schedule::Cron {
expr: body.schedule,
expr: schedule,
tz: None,
};
if let Err(e) = crate::cron::validate_delivery_config(delivery.as_ref()) {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})),
)
.into_response();
}
// Determine job type: explicit field, or infer "agent" when prompt is provided.
let is_agent = matches!(body.job_type.as_deref(), Some("agent"))
|| (body.job_type.is_none() && body.prompt.is_some());
let is_agent =
matches!(job_type.as_deref(), Some("agent")) || (job_type.is_none() && prompt.is_some());
let result = if is_agent {
let prompt = match body.prompt.as_deref() {
let prompt = match prompt.as_deref() {
Some(p) if !p.trim().is_empty() => p,
_ => {
return (
@ -289,42 +309,27 @@ pub async fn handle_api_cron_add(
}
};
let session_target = body
.session_target
let session_target = session_target
.as_deref()
.map(crate::cron::SessionTarget::parse)
.unwrap_or_default();
let delivery_config = match &body.delivery {
Some(v) => match serde_json::from_value::<crate::cron::DeliveryConfig>(v.clone()) {
Ok(cfg) => Some(cfg),
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": format!("Invalid delivery config: {e}")})),
)
.into_response();
}
},
None => None,
};
let default_delete = matches!(schedule, crate::cron::Schedule::At { .. });
let delete_after_run = body.delete_after_run.unwrap_or(default_delete);
let delete_after_run = delete_after_run.unwrap_or(default_delete);
crate::cron::add_agent_job(
&config,
body.name,
name,
schedule,
prompt,
session_target,
body.model,
delivery_config,
model,
delivery,
delete_after_run,
body.allowed_tools,
allowed_tools,
)
} else {
let command = match body.command.as_deref() {
let command = match command.as_deref() {
Some(c) if !c.trim().is_empty() => c,
_ => {
return (
@ -335,30 +340,7 @@ pub async fn handle_api_cron_add(
}
};
let mut job_result =
crate::cron::add_shell_job_with_approval(&config, body.name, schedule, command, false);
// If delivery was provided, patch the created job to persist delivery config.
if let (Ok(ref job), Some(ref delivery_val)) = (&job_result, &body.delivery) {
match serde_json::from_value::<crate::cron::DeliveryConfig>(delivery_val.clone()) {
Ok(delivery_cfg) => {
let patch = crate::cron::CronJobPatch {
delivery: Some(delivery_cfg),
..crate::cron::CronJobPatch::default()
};
job_result = crate::cron::update_job(&config, &job.id, patch);
}
Err(e) => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": format!("Invalid delivery config: {e}")})),
)
.into_response();
}
}
}
job_result
crate::cron::add_shell_job_with_approval(&config, name, schedule, command, delivery, false)
};
match result {
@ -371,8 +353,8 @@ pub async fn handle_api_cron_add(
"command": job.command,
"prompt": job.prompt,
"schedule": job.schedule,
"delivery": job.delivery,
"enabled": job.enabled,
"delivery": job.delivery,
}
}))
.into_response(),
@ -1305,6 +1287,125 @@ pub async fn handle_api_session_delete(
#[cfg(test)]
mod tests {
use super::*;
use crate::gateway::{nodes, AppState, GatewayRateLimiter, IdempotencyStore};
use crate::memory::{Memory, MemoryCategory, MemoryEntry};
use crate::providers::Provider;
use crate::security::pairing::PairingGuard;
use async_trait::async_trait;
use axum::response::IntoResponse;
use http_body_util::BodyExt;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
struct MockMemory;
#[async_trait]
impl Memory for MockMemory {
fn name(&self) -> &str {
"mock"
}
async fn store(
&self,
_key: &str,
_content: &str,
_category: MemoryCategory,
_session_id: Option<&str>,
) -> anyhow::Result<()> {
Ok(())
}
async fn recall(
&self,
_query: &str,
_limit: usize,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>> {
Ok(Vec::new())
}
async fn get(&self, _key: &str) -> anyhow::Result<Option<MemoryEntry>> {
Ok(None)
}
async fn list(
&self,
_category: Option<&MemoryCategory>,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>> {
Ok(Vec::new())
}
async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
Ok(false)
}
async fn count(&self) -> anyhow::Result<usize> {
Ok(0)
}
async fn health_check(&self) -> bool {
true
}
}
struct MockProvider;
#[async_trait]
impl Provider for MockProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("ok".to_string())
}
}
fn test_state(config: crate::config::Config) -> AppState {
AppState {
config: Arc::new(Mutex::new(config)),
provider: Arc::new(MockProvider),
model: "test-model".into(),
temperature: 0.0,
mem: Arc::new(MockMemory),
auto_save: false,
webhook_secret_hash: None,
pairing: Arc::new(PairingGuard::new(false, &[])),
trust_forwarded_headers: false,
rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)),
idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)),
whatsapp: None,
whatsapp_app_secret: None,
linq: None,
linq_signing_secret: None,
nextcloud_talk: None,
nextcloud_talk_webhook_secret: None,
wati: None,
observer: Arc::new(crate::observability::NoopObserver),
tools_registry: Arc::new(Vec::new()),
cost_tracker: None,
event_tx: tokio::sync::broadcast::channel(16).0,
shutdown_tx: tokio::sync::watch::channel(false).0,
node_registry: Arc::new(nodes::NodeRegistry::new(16)),
session_backend: None,
device_registry: None,
pending_pairings: None,
}
}
async fn response_json(response: axum::response::Response) -> serde_json::Value {
let body = response
.into_body()
.collect()
.await
.expect("response body")
.to_bytes();
serde_json::from_slice(&body).expect("valid json response")
}
#[test]
fn masking_keeps_toml_valid_and_preserves_api_keys_type() {
@ -1712,4 +1813,174 @@ mod tests {
.iter()
.all(|route| route.api_key.as_deref() != Some(MASKED_SECRET)));
}
#[tokio::test]
async fn cron_api_shell_roundtrip_includes_delivery() {
let tmp = tempfile::TempDir::new().unwrap();
let config = crate::config::Config {
workspace_dir: tmp.path().join("workspace"),
config_path: tmp.path().join("config.toml"),
..crate::config::Config::default()
};
std::fs::create_dir_all(&config.workspace_dir).unwrap();
let state = test_state(config);
let add_response = handle_api_cron_add(
State(state.clone()),
HeaderMap::new(),
Json(
serde_json::from_value::<CronAddBody>(serde_json::json!({
"name": "test-job",
"schedule": "*/5 * * * *",
"command": "echo hello",
"delivery": {
"mode": "announce",
"channel": "discord",
"to": "1234567890",
"best_effort": true
}
}))
.expect("body should deserialize"),
),
)
.await
.into_response();
let add_json = response_json(add_response).await;
assert_eq!(add_json["status"], "ok");
assert_eq!(add_json["job"]["delivery"]["mode"], "announce");
assert_eq!(add_json["job"]["delivery"]["channel"], "discord");
assert_eq!(add_json["job"]["delivery"]["to"], "1234567890");
let list_response = handle_api_cron_list(State(state), HeaderMap::new())
.await
.into_response();
let list_json = response_json(list_response).await;
let jobs = list_json["jobs"].as_array().expect("jobs array");
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0]["delivery"]["mode"], "announce");
assert_eq!(jobs[0]["delivery"]["channel"], "discord");
assert_eq!(jobs[0]["delivery"]["to"], "1234567890");
}
#[tokio::test]
async fn cron_api_accepts_agent_jobs() {
let tmp = tempfile::TempDir::new().unwrap();
let config = crate::config::Config {
workspace_dir: tmp.path().join("workspace"),
config_path: tmp.path().join("config.toml"),
..crate::config::Config::default()
};
std::fs::create_dir_all(&config.workspace_dir).unwrap();
let state = test_state(config);
let response = handle_api_cron_add(
State(state.clone()),
HeaderMap::new(),
Json(
serde_json::from_value::<CronAddBody>(serde_json::json!({
"name": "agent-job",
"schedule": "*/5 * * * *",
"job_type": "agent",
"command": "ignored shell command",
"prompt": "summarize the latest logs"
}))
.expect("body should deserialize"),
),
)
.await
.into_response();
let json = response_json(response).await;
assert_eq!(json["status"], "ok");
let config = state.config.lock().clone();
let jobs = crate::cron::list_jobs(&config).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].job_type, crate::cron::JobType::Agent);
assert_eq!(jobs[0].prompt.as_deref(), Some("summarize the latest logs"));
}
#[tokio::test]
async fn cron_api_rejects_announce_delivery_without_target() {
let tmp = tempfile::TempDir::new().unwrap();
let config = crate::config::Config {
workspace_dir: tmp.path().join("workspace"),
config_path: tmp.path().join("config.toml"),
..crate::config::Config::default()
};
std::fs::create_dir_all(&config.workspace_dir).unwrap();
let state = test_state(config);
let response = handle_api_cron_add(
State(state.clone()),
HeaderMap::new(),
Json(
serde_json::from_value::<CronAddBody>(serde_json::json!({
"name": "invalid-delivery-job",
"schedule": "*/5 * * * *",
"command": "echo hello",
"delivery": {
"mode": "announce",
"channel": "discord"
}
}))
.expect("body should deserialize"),
),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let json = response_json(response).await;
assert!(json["error"]
.as_str()
.unwrap_or_default()
.contains("delivery.to is required"));
let config = state.config.lock().clone();
assert!(crate::cron::list_jobs(&config).unwrap().is_empty());
}
#[tokio::test]
async fn cron_api_rejects_announce_delivery_with_unsupported_channel() {
let tmp = tempfile::TempDir::new().unwrap();
let config = crate::config::Config {
workspace_dir: tmp.path().join("workspace"),
config_path: tmp.path().join("config.toml"),
..crate::config::Config::default()
};
std::fs::create_dir_all(&config.workspace_dir).unwrap();
let state = test_state(config);
let response = handle_api_cron_add(
State(state.clone()),
HeaderMap::new(),
Json(
serde_json::from_value::<CronAddBody>(serde_json::json!({
"name": "invalid-delivery-job",
"schedule": "*/5 * * * *",
"command": "echo hello",
"delivery": {
"mode": "announce",
"channel": "email",
"to": "alerts@example.com"
}
}))
.expect("body should deserialize"),
),
)
.await
.into_response();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let json = response_json(response).await;
assert!(json["error"]
.as_str()
.unwrap_or_default()
.contains("unsupported delivery channel"));
let config = state.config.lock().clone();
assert!(crate::cron::list_jobs(&config).unwrap().is_empty());
}
}

View File

@ -235,6 +235,19 @@ impl Tool for CronAddTool {
.get("approved")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
let delivery = match args.get("delivery") {
Some(v) => match serde_json::from_value::<DeliveryConfig>(v.clone()) {
Ok(cfg) => Some(cfg),
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Invalid delivery config: {e}")),
});
}
},
None => None,
};
let result = match job_type {
JobType::Shell => {
@ -261,7 +274,14 @@ impl Tool for CronAddTool {
return Ok(blocked);
}
cron::add_shell_job_with_approval(&self.config, name, schedule, command, approved)
cron::add_shell_job_with_approval(
&self.config,
name,
schedule,
command,
delivery,
approved,
)
}
JobType::Agent => {
let prompt = match args.get("prompt").and_then(serde_json::Value::as_str) {
@ -307,20 +327,6 @@ impl Tool for CronAddTool {
None => None,
};
let delivery = match args.get("delivery") {
Some(v) => match serde_json::from_value::<DeliveryConfig>(v.clone()) {
Ok(cfg) => Some(cfg),
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Invalid delivery config: {e}")),
});
}
},
None => None,
};
if let Some(blocked) = self.enforce_mutation_allowed("cron_add") {
return Ok(blocked);
}
@ -406,6 +412,36 @@ mod tests {
assert!(result.output.contains("next_run"));
}
#[tokio::test]
async fn shell_job_persists_delivery() {
let tmp = TempDir::new().unwrap();
let cfg = test_config(&tmp).await;
let tool = CronAddTool::new(cfg.clone(), test_security(&cfg));
let result = tool
.execute(json!({
"schedule": { "kind": "cron", "expr": "*/5 * * * *" },
"job_type": "shell",
"command": "echo ok",
"delivery": {
"mode": "announce",
"channel": "discord",
"to": "1234567890",
"best_effort": true
}
}))
.await
.unwrap();
assert!(result.success, "{:?}", result.error);
let jobs = cron::list_jobs(&cfg).unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].delivery.mode, "announce");
assert_eq!(jobs[0].delivery.channel.as_deref(), Some("discord"));
assert_eq!(jobs[0].delivery.to.as_deref(), Some("1234567890"));
assert!(jobs[0].delivery.best_effort);
}
#[tokio::test]
async fn blocks_disallowed_shell_command() {
let tmp = TempDir::new().unwrap();

View File

@ -244,6 +244,7 @@ mod tests {
tz: None,
},
"touch cron-run-approval",
None,
true,
)
.unwrap();

View File

@ -315,6 +315,7 @@ impl ScheduleTool {
tz: None,
},
command,
None,
approved,
) {
Ok(job) => job,