diff --git a/src/cron/mod.rs b/src/cron/mod.rs index 7153fbd25..7e389a7aa 100644 --- a/src/cron/mod.rs +++ b/src/cron/mod.rs @@ -45,6 +45,39 @@ pub(crate) fn validate_shell_command_with_security( .map_err(|reason| anyhow!("blocked by security policy: {reason}")) } +pub(crate) fn validate_delivery_config(delivery: Option<&DeliveryConfig>) -> Result<()> { + let Some(delivery) = delivery else { + return Ok(()); + }; + + if delivery.mode.eq_ignore_ascii_case("none") { + return Ok(()); + } + if !delivery.mode.eq_ignore_ascii_case("announce") { + bail!("unsupported delivery mode: {}", delivery.mode); + } + + let channel = delivery.channel.as_deref().map(str::trim); + let Some(channel) = channel.filter(|value| !value.is_empty()) else { + bail!("delivery.channel is required for announce mode"); + }; + match channel.to_ascii_lowercase().as_str() { + "telegram" | "discord" | "slack" | "mattermost" | "signal" | "matrix" => {} + other => bail!("unsupported delivery channel: {other}"), + } + + let has_target = delivery + .to + .as_deref() + .map(str::trim) + .is_some_and(|value| !value.is_empty()); + if !has_target { + bail!("delivery.to is required for announce mode"); + } + + Ok(()) +} + /// Create a validated shell job, enforcing security policy before persistence. /// /// All entrypoints that create shell cron jobs should route through this @@ -54,10 +87,12 @@ pub fn add_shell_job_with_approval( name: Option, schedule: Schedule, command: &str, + delivery: Option, approved: bool, ) -> Result { validate_shell_command(config, command, approved)?; - store::add_shell_job(config, name, schedule, command) + validate_delivery_config(delivery.as_ref())?; + store::add_shell_job(config, name, schedule, command, delivery) } /// Update a shell job's command with security validation. @@ -95,7 +130,7 @@ pub fn add_once_at_validated( approved: bool, ) -> Result { let schedule = Schedule::At { at }; - add_shell_job_with_approval(config, None, schedule, command, approved) + add_shell_job_with_approval(config, None, schedule, command, None, approved) } // Convenience wrappers for CLI paths (default approved=false). @@ -106,7 +141,7 @@ pub(crate) fn add_shell_job( schedule: Schedule, command: &str, ) -> Result { - add_shell_job_with_approval(config, name, schedule, command, false) + add_shell_job_with_approval(config, name, schedule, command, None, false) } pub(crate) fn add_job(config: &Config, expression: &str, command: &str) -> Result { @@ -680,6 +715,7 @@ mod tests { tz: None, }, "touch cron-medium-risk", + None, true, ); assert!(approved.is_ok(), "{approved:?}"); @@ -812,6 +848,7 @@ mod tests { tz: None, }, "curl https://example.com", + None, false, ); assert!(result.is_err()); diff --git a/src/cron/store.rs b/src/cron/store.rs index ef5599b55..81c93820a 100644 --- a/src/cron/store.rs +++ b/src/cron/store.rs @@ -1,7 +1,7 @@ use crate::config::Config; use crate::cron::{ - next_run_for_schedule, schedule_cron_expression, validate_schedule, CronJob, CronJobPatch, - CronRun, DeliveryConfig, JobType, Schedule, SessionTarget, + next_run_for_schedule, schedule_cron_expression, validate_delivery_config, validate_schedule, + CronJob, CronJobPatch, CronRun, DeliveryConfig, JobType, Schedule, SessionTarget, }; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; @@ -24,7 +24,7 @@ pub fn add_job(config: &Config, expression: &str, command: &str) -> Result, schedule: Schedule, command: &str, + delivery: Option, ) -> Result { let now = Utc::now(); validate_schedule(&schedule, now)?; + validate_delivery_config(delivery.as_ref())?; let next_run = next_run_for_schedule(&schedule, now)?; let id = Uuid::new_v4().to_string(); let expression = schedule_cron_expression(&schedule).unwrap_or_default(); let schedule_json = serde_json::to_string(&schedule)?; + let delivery = delivery.unwrap_or_default(); let delete_after_run = matches!(schedule, Schedule::At { .. }); @@ -54,7 +57,7 @@ pub fn add_shell_job( command, schedule_json, name, - serde_json::to_string(&DeliveryConfig::default())?, + serde_json::to_string(&delivery)?, if delete_after_run { 1 } else { 0 }, now.to_rfc3339(), next_run.to_rfc3339(), @@ -81,6 +84,7 @@ pub fn add_agent_job( ) -> Result { let now = Utc::now(); validate_schedule(&schedule, now)?; + validate_delivery_config(delivery.as_ref())?; let next_run = next_run_for_schedule(&schedule, now)?; let id = Uuid::new_v4().to_string(); let expression = schedule_cron_expression(&schedule).unwrap_or_default(); @@ -694,6 +698,7 @@ mod tests { at: Utc::now() + ChronoDuration::minutes(10), }, "echo once", + None, ) .unwrap(); assert!(one_shot.delete_after_run); @@ -703,11 +708,98 @@ mod tests { None, Schedule::Every { every_ms: 60_000 }, "echo recurring", + None, ) .unwrap(); assert!(!recurring.delete_after_run); } + #[test] + fn add_shell_job_persists_delivery() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = add_shell_job( + &config, + Some("deliver-shell".into()), + Schedule::Cron { + expr: "*/5 * * * *".into(), + tz: None, + }, + "echo delivered", + Some(DeliveryConfig { + mode: "announce".into(), + channel: Some("discord".into()), + to: Some("1234567890".into()), + best_effort: true, + }), + ) + .unwrap(); + + assert_eq!(job.delivery.mode, "announce"); + assert_eq!(job.delivery.channel.as_deref(), Some("discord")); + assert_eq!(job.delivery.to.as_deref(), Some("1234567890")); + + let stored = get_job(&config, &job.id).unwrap(); + assert_eq!(stored.delivery.mode, "announce"); + assert_eq!(stored.delivery.channel.as_deref(), Some("discord")); + assert_eq!(stored.delivery.to.as_deref(), Some("1234567890")); + } + + #[test] + fn add_agent_job_rejects_invalid_announce_delivery() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let err = add_agent_job( + &config, + Some("deliver-agent".into()), + Schedule::Cron { + expr: "*/5 * * * *".into(), + tz: None, + }, + "summarize logs", + SessionTarget::Isolated, + None, + Some(DeliveryConfig { + mode: "announce".into(), + channel: Some("discord".into()), + to: None, + best_effort: true, + }), + false, + None, + ) + .unwrap_err(); + + assert!(err.to_string().contains("delivery.to is required")); + } + + #[test] + fn add_shell_job_rejects_invalid_delivery_mode() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let err = add_shell_job( + &config, + Some("deliver-shell".into()), + Schedule::Cron { + expr: "*/5 * * * *".into(), + tz: None, + }, + "echo delivered", + Some(DeliveryConfig { + mode: "annouce".into(), + channel: Some("discord".into()), + to: Some("1234567890".into()), + best_effort: true, + }), + ) + .unwrap_err(); + + assert!(err.to_string().contains("unsupported delivery mode")); + } + #[test] fn add_list_remove_roundtrip() { let tmp = TempDir::new().unwrap(); @@ -1034,7 +1126,7 @@ mod tests { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let at = Utc::now() + ChronoDuration::minutes(10); - let job = add_shell_job(&config, None, Schedule::At { at }, "echo once").unwrap(); + let job = add_shell_job(&config, None, Schedule::At { at }, "echo once", None).unwrap(); reschedule_after_run(&config, &job, true, "done").unwrap(); @@ -1051,7 +1143,7 @@ mod tests { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let at = Utc::now() + ChronoDuration::minutes(10); - let job = add_shell_job(&config, None, Schedule::At { at }, "echo once").unwrap(); + let job = add_shell_job(&config, None, Schedule::At { at }, "echo once", None).unwrap(); reschedule_after_run(&config, &job, false, "failed").unwrap(); diff --git a/src/gateway/api.rs b/src/gateway/api.rs index bfbee26fd..ee5d4fd94 100644 --- a/src/gateway/api.rs +++ b/src/gateway/api.rs @@ -71,7 +71,7 @@ pub struct CronAddBody { pub command: Option, pub job_type: Option, pub prompt: Option, - pub delivery: Option, + pub delivery: Option, pub session_target: Option, pub model: Option, pub allowed_tools: Option>, @@ -239,11 +239,11 @@ pub async fn handle_api_cron_list( "command": job.command, "prompt": job.prompt, "schedule": job.schedule, - "delivery": job.delivery, "next_run": job.next_run.to_rfc3339(), "last_run": job.last_run.map(|t| t.to_rfc3339()), "last_status": job.last_status, "enabled": job.enabled, + "delivery": job.delivery, }) }) .collect(); @@ -267,18 +267,38 @@ pub async fn handle_api_cron_add( return e.into_response(); } + let CronAddBody { + name, + schedule, + command, + job_type, + prompt, + delivery, + session_target, + model, + allowed_tools, + delete_after_run, + } = body; + let config = state.config.lock().clone(); let schedule = crate::cron::Schedule::Cron { - expr: body.schedule, + expr: schedule, tz: None, }; + if let Err(e) = crate::cron::validate_delivery_config(delivery.as_ref()) { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": format!("Failed to add cron job: {e}")})), + ) + .into_response(); + } // Determine job type: explicit field, or infer "agent" when prompt is provided. - let is_agent = matches!(body.job_type.as_deref(), Some("agent")) - || (body.job_type.is_none() && body.prompt.is_some()); + let is_agent = + matches!(job_type.as_deref(), Some("agent")) || (job_type.is_none() && prompt.is_some()); let result = if is_agent { - let prompt = match body.prompt.as_deref() { + let prompt = match prompt.as_deref() { Some(p) if !p.trim().is_empty() => p, _ => { return ( @@ -289,42 +309,27 @@ pub async fn handle_api_cron_add( } }; - let session_target = body - .session_target + let session_target = session_target .as_deref() .map(crate::cron::SessionTarget::parse) .unwrap_or_default(); - let delivery_config = match &body.delivery { - Some(v) => match serde_json::from_value::(v.clone()) { - Ok(cfg) => Some(cfg), - Err(e) => { - return ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": format!("Invalid delivery config: {e}")})), - ) - .into_response(); - } - }, - None => None, - }; - let default_delete = matches!(schedule, crate::cron::Schedule::At { .. }); - let delete_after_run = body.delete_after_run.unwrap_or(default_delete); + let delete_after_run = delete_after_run.unwrap_or(default_delete); crate::cron::add_agent_job( &config, - body.name, + name, schedule, prompt, session_target, - body.model, - delivery_config, + model, + delivery, delete_after_run, - body.allowed_tools, + allowed_tools, ) } else { - let command = match body.command.as_deref() { + let command = match command.as_deref() { Some(c) if !c.trim().is_empty() => c, _ => { return ( @@ -335,30 +340,7 @@ pub async fn handle_api_cron_add( } }; - let mut job_result = - crate::cron::add_shell_job_with_approval(&config, body.name, schedule, command, false); - - // If delivery was provided, patch the created job to persist delivery config. - if let (Ok(ref job), Some(ref delivery_val)) = (&job_result, &body.delivery) { - match serde_json::from_value::(delivery_val.clone()) { - Ok(delivery_cfg) => { - let patch = crate::cron::CronJobPatch { - delivery: Some(delivery_cfg), - ..crate::cron::CronJobPatch::default() - }; - job_result = crate::cron::update_job(&config, &job.id, patch); - } - Err(e) => { - return ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": format!("Invalid delivery config: {e}")})), - ) - .into_response(); - } - } - } - - job_result + crate::cron::add_shell_job_with_approval(&config, name, schedule, command, delivery, false) }; match result { @@ -371,8 +353,8 @@ pub async fn handle_api_cron_add( "command": job.command, "prompt": job.prompt, "schedule": job.schedule, - "delivery": job.delivery, "enabled": job.enabled, + "delivery": job.delivery, } })) .into_response(), @@ -1305,6 +1287,125 @@ pub async fn handle_api_session_delete( #[cfg(test)] mod tests { use super::*; + use crate::gateway::{nodes, AppState, GatewayRateLimiter, IdempotencyStore}; + use crate::memory::{Memory, MemoryCategory, MemoryEntry}; + use crate::providers::Provider; + use crate::security::pairing::PairingGuard; + use async_trait::async_trait; + use axum::response::IntoResponse; + use http_body_util::BodyExt; + use parking_lot::Mutex; + use std::sync::Arc; + use std::time::Duration; + + struct MockMemory; + + #[async_trait] + impl Memory for MockMemory { + fn name(&self) -> &str { + "mock" + } + + async fn store( + &self, + _key: &str, + _content: &str, + _category: MemoryCategory, + _session_id: Option<&str>, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn recall( + &self, + _query: &str, + _limit: usize, + _session_id: Option<&str>, + ) -> anyhow::Result> { + Ok(Vec::new()) + } + + async fn get(&self, _key: &str) -> anyhow::Result> { + Ok(None) + } + + async fn list( + &self, + _category: Option<&MemoryCategory>, + _session_id: Option<&str>, + ) -> anyhow::Result> { + Ok(Vec::new()) + } + + async fn forget(&self, _key: &str) -> anyhow::Result { + Ok(false) + } + + async fn count(&self) -> anyhow::Result { + Ok(0) + } + + async fn health_check(&self) -> bool { + true + } + } + + struct MockProvider; + + #[async_trait] + impl Provider for MockProvider { + async fn chat_with_system( + &self, + _system_prompt: Option<&str>, + _message: &str, + _model: &str, + _temperature: f64, + ) -> anyhow::Result { + Ok("ok".to_string()) + } + } + + fn test_state(config: crate::config::Config) -> AppState { + AppState { + config: Arc::new(Mutex::new(config)), + provider: Arc::new(MockProvider), + model: "test-model".into(), + temperature: 0.0, + mem: Arc::new(MockMemory), + auto_save: false, + webhook_secret_hash: None, + pairing: Arc::new(PairingGuard::new(false, &[])), + trust_forwarded_headers: false, + rate_limiter: Arc::new(GatewayRateLimiter::new(100, 100, 100)), + idempotency_store: Arc::new(IdempotencyStore::new(Duration::from_secs(300), 1000)), + whatsapp: None, + whatsapp_app_secret: None, + linq: None, + linq_signing_secret: None, + nextcloud_talk: None, + nextcloud_talk_webhook_secret: None, + wati: None, + observer: Arc::new(crate::observability::NoopObserver), + tools_registry: Arc::new(Vec::new()), + cost_tracker: None, + event_tx: tokio::sync::broadcast::channel(16).0, + shutdown_tx: tokio::sync::watch::channel(false).0, + node_registry: Arc::new(nodes::NodeRegistry::new(16)), + session_backend: None, + device_registry: None, + pending_pairings: None, + } + } + + async fn response_json(response: axum::response::Response) -> serde_json::Value { + let body = response + .into_body() + .collect() + .await + .expect("response body") + .to_bytes(); + serde_json::from_slice(&body).expect("valid json response") + } #[test] fn masking_keeps_toml_valid_and_preserves_api_keys_type() { @@ -1712,4 +1813,174 @@ mod tests { .iter() .all(|route| route.api_key.as_deref() != Some(MASKED_SECRET))); } + + #[tokio::test] + async fn cron_api_shell_roundtrip_includes_delivery() { + let tmp = tempfile::TempDir::new().unwrap(); + let config = crate::config::Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..crate::config::Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let state = test_state(config); + + let add_response = handle_api_cron_add( + State(state.clone()), + HeaderMap::new(), + Json( + serde_json::from_value::(serde_json::json!({ + "name": "test-job", + "schedule": "*/5 * * * *", + "command": "echo hello", + "delivery": { + "mode": "announce", + "channel": "discord", + "to": "1234567890", + "best_effort": true + } + })) + .expect("body should deserialize"), + ), + ) + .await + .into_response(); + + let add_json = response_json(add_response).await; + assert_eq!(add_json["status"], "ok"); + assert_eq!(add_json["job"]["delivery"]["mode"], "announce"); + assert_eq!(add_json["job"]["delivery"]["channel"], "discord"); + assert_eq!(add_json["job"]["delivery"]["to"], "1234567890"); + + let list_response = handle_api_cron_list(State(state), HeaderMap::new()) + .await + .into_response(); + let list_json = response_json(list_response).await; + let jobs = list_json["jobs"].as_array().expect("jobs array"); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0]["delivery"]["mode"], "announce"); + assert_eq!(jobs[0]["delivery"]["channel"], "discord"); + assert_eq!(jobs[0]["delivery"]["to"], "1234567890"); + } + + #[tokio::test] + async fn cron_api_accepts_agent_jobs() { + let tmp = tempfile::TempDir::new().unwrap(); + let config = crate::config::Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..crate::config::Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let state = test_state(config); + + let response = handle_api_cron_add( + State(state.clone()), + HeaderMap::new(), + Json( + serde_json::from_value::(serde_json::json!({ + "name": "agent-job", + "schedule": "*/5 * * * *", + "job_type": "agent", + "command": "ignored shell command", + "prompt": "summarize the latest logs" + })) + .expect("body should deserialize"), + ), + ) + .await + .into_response(); + + let json = response_json(response).await; + assert_eq!(json["status"], "ok"); + + let config = state.config.lock().clone(); + let jobs = crate::cron::list_jobs(&config).unwrap(); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].job_type, crate::cron::JobType::Agent); + assert_eq!(jobs[0].prompt.as_deref(), Some("summarize the latest logs")); + } + + #[tokio::test] + async fn cron_api_rejects_announce_delivery_without_target() { + let tmp = tempfile::TempDir::new().unwrap(); + let config = crate::config::Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..crate::config::Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let state = test_state(config); + + let response = handle_api_cron_add( + State(state.clone()), + HeaderMap::new(), + Json( + serde_json::from_value::(serde_json::json!({ + "name": "invalid-delivery-job", + "schedule": "*/5 * * * *", + "command": "echo hello", + "delivery": { + "mode": "announce", + "channel": "discord" + } + })) + .expect("body should deserialize"), + ), + ) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + let json = response_json(response).await; + assert!(json["error"] + .as_str() + .unwrap_or_default() + .contains("delivery.to is required")); + + let config = state.config.lock().clone(); + assert!(crate::cron::list_jobs(&config).unwrap().is_empty()); + } + + #[tokio::test] + async fn cron_api_rejects_announce_delivery_with_unsupported_channel() { + let tmp = tempfile::TempDir::new().unwrap(); + let config = crate::config::Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..crate::config::Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + let state = test_state(config); + + let response = handle_api_cron_add( + State(state.clone()), + HeaderMap::new(), + Json( + serde_json::from_value::(serde_json::json!({ + "name": "invalid-delivery-job", + "schedule": "*/5 * * * *", + "command": "echo hello", + "delivery": { + "mode": "announce", + "channel": "email", + "to": "alerts@example.com" + } + })) + .expect("body should deserialize"), + ), + ) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + let json = response_json(response).await; + assert!(json["error"] + .as_str() + .unwrap_or_default() + .contains("unsupported delivery channel")); + + let config = state.config.lock().clone(); + assert!(crate::cron::list_jobs(&config).unwrap().is_empty()); + } } diff --git a/src/tools/cron_add.rs b/src/tools/cron_add.rs index 6bc2d5237..293ad5c08 100644 --- a/src/tools/cron_add.rs +++ b/src/tools/cron_add.rs @@ -235,6 +235,19 @@ impl Tool for CronAddTool { .get("approved") .and_then(serde_json::Value::as_bool) .unwrap_or(false); + let delivery = match args.get("delivery") { + Some(v) => match serde_json::from_value::(v.clone()) { + Ok(cfg) => Some(cfg), + Err(e) => { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!("Invalid delivery config: {e}")), + }); + } + }, + None => None, + }; let result = match job_type { JobType::Shell => { @@ -261,7 +274,14 @@ impl Tool for CronAddTool { return Ok(blocked); } - cron::add_shell_job_with_approval(&self.config, name, schedule, command, approved) + cron::add_shell_job_with_approval( + &self.config, + name, + schedule, + command, + delivery, + approved, + ) } JobType::Agent => { let prompt = match args.get("prompt").and_then(serde_json::Value::as_str) { @@ -307,20 +327,6 @@ impl Tool for CronAddTool { None => None, }; - let delivery = match args.get("delivery") { - Some(v) => match serde_json::from_value::(v.clone()) { - Ok(cfg) => Some(cfg), - Err(e) => { - return Ok(ToolResult { - success: false, - output: String::new(), - error: Some(format!("Invalid delivery config: {e}")), - }); - } - }, - None => None, - }; - if let Some(blocked) = self.enforce_mutation_allowed("cron_add") { return Ok(blocked); } @@ -406,6 +412,36 @@ mod tests { assert!(result.output.contains("next_run")); } + #[tokio::test] + async fn shell_job_persists_delivery() { + let tmp = TempDir::new().unwrap(); + let cfg = test_config(&tmp).await; + let tool = CronAddTool::new(cfg.clone(), test_security(&cfg)); + let result = tool + .execute(json!({ + "schedule": { "kind": "cron", "expr": "*/5 * * * *" }, + "job_type": "shell", + "command": "echo ok", + "delivery": { + "mode": "announce", + "channel": "discord", + "to": "1234567890", + "best_effort": true + } + })) + .await + .unwrap(); + + assert!(result.success, "{:?}", result.error); + + let jobs = cron::list_jobs(&cfg).unwrap(); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].delivery.mode, "announce"); + assert_eq!(jobs[0].delivery.channel.as_deref(), Some("discord")); + assert_eq!(jobs[0].delivery.to.as_deref(), Some("1234567890")); + assert!(jobs[0].delivery.best_effort); + } + #[tokio::test] async fn blocks_disallowed_shell_command() { let tmp = TempDir::new().unwrap(); diff --git a/src/tools/cron_run.rs b/src/tools/cron_run.rs index deed1d2c2..0bb55b944 100644 --- a/src/tools/cron_run.rs +++ b/src/tools/cron_run.rs @@ -244,6 +244,7 @@ mod tests { tz: None, }, "touch cron-run-approval", + None, true, ) .unwrap(); diff --git a/src/tools/schedule.rs b/src/tools/schedule.rs index 6502f1016..fcffd9601 100644 --- a/src/tools/schedule.rs +++ b/src/tools/schedule.rs @@ -315,6 +315,7 @@ impl ScheduleTool { tz: None, }, command, + None, approved, ) { Ok(job) => job,