From c611ffa43b4ecc8a7bad28077990472c81ee9ecc Mon Sep 17 00:00:00 2001 From: Chummy Date: Fri, 20 Feb 2026 21:33:23 +0800 Subject: [PATCH] fix(scheduler): harden idle health heartbeat behavior --- src/cron/scheduler.rs | 75 +++++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index 09b288ea0..b6f4d690c 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -17,6 +17,7 @@ use tokio::time::{self, Duration}; const MIN_POLL_SECONDS: u64 = 5; const SHELL_JOB_TIMEOUT_SECS: u64 = 120; +const SCHEDULER_COMPONENT: &str = "scheduler"; pub async fn run(config: Config) -> Result<()> { let poll_secs = config.reliability.scheduler_poll_secs.max(MIN_POLL_SECONDS); @@ -27,23 +28,23 @@ pub async fn run(config: Config) -> Result<()> { &config.workspace_dir, )); - crate::health::mark_component_ok("scheduler"); + crate::health::mark_component_ok(SCHEDULER_COMPONENT); loop { interval.tick().await; // Keep scheduler liveness fresh even when there are no due jobs. - crate::health::mark_component_ok("scheduler"); + crate::health::mark_component_ok(SCHEDULER_COMPONENT); let jobs = match due_jobs(&config, Utc::now()) { Ok(jobs) => jobs, Err(e) => { - crate::health::mark_component_error("scheduler", e.to_string()); + crate::health::mark_component_error(SCHEDULER_COMPONENT, e.to_string()); tracing::warn!("Scheduler query failed: {e}"); continue; } }; - process_due_jobs(&config, &security, jobs).await; + process_due_jobs(&config, &security, jobs, SCHEDULER_COMPONENT).await; } } @@ -87,14 +88,28 @@ async fn execute_job_with_retry( (false, last_output) } -async fn process_due_jobs(config: &Config, security: &Arc, jobs: Vec) { +async fn process_due_jobs( + config: &Config, + security: &Arc, + jobs: Vec, + component: &str, +) { + // Refresh scheduler health on every successful poll cycle, including idle cycles. + crate::health::mark_component_ok(component); + let max_concurrent = config.scheduler.max_concurrent.max(1); - let mut in_flight = stream::iter(jobs.into_iter().map(|job| { - let config = config.clone(); - let security = Arc::clone(security); - async move { execute_and_persist_job(&config, security.as_ref(), &job).await } - })) - .buffer_unordered(max_concurrent); + let mut in_flight = + stream::iter( + jobs.into_iter().map(|job| { + let config = config.clone(); + let security = Arc::clone(security); + let component = component.to_owned(); + async move { + execute_and_persist_job(&config, security.as_ref(), &job, &component).await + } + }), + ) + .buffer_unordered(max_concurrent); while let Some((job_id, success)) = in_flight.next().await { if !success { @@ -107,8 +122,9 @@ async fn execute_and_persist_job( config: &Config, security: &SecurityPolicy, job: &CronJob, + component: &str, ) -> (String, bool) { - crate::health::mark_component_ok("scheduler"); + crate::health::mark_component_ok(component); warn_if_high_frequency_agent_job(job); let started_at = Utc::now(); @@ -539,6 +555,10 @@ mod tests { } } + fn unique_component(prefix: &str) -> String { + format!("{prefix}-{}", uuid::Uuid::new_v4()) + } + #[tokio::test] async fn run_job_command_success() { let tmp = TempDir::new().unwrap(); @@ -720,7 +740,27 @@ mod tests { } #[tokio::test] - async fn process_due_jobs_failure_does_not_mark_scheduler_unhealthy() { + async fn process_due_jobs_marks_component_ok_even_when_idle() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp).await; + let security = Arc::new(SecurityPolicy::from_config( + &config.autonomy, + &config.workspace_dir, + )); + let component = unique_component("scheduler-idle"); + + crate::health::mark_component_error(&component, "pre-existing error"); + process_due_jobs(&config, &security, Vec::new(), &component).await; + + let snapshot = crate::health::snapshot_json(); + let entry = &snapshot["components"][component.as_str()]; + assert_eq!(entry["status"], "ok"); + assert!(entry["last_ok"].as_str().is_some()); + assert!(entry["last_error"].is_null()); + } + + #[tokio::test] + async fn process_due_jobs_failure_does_not_mark_component_unhealthy() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp).await; let job = test_job("ls definitely_missing_file_for_scheduler_component_health_test"); @@ -728,13 +768,14 @@ mod tests { &config.autonomy, &config.workspace_dir, )); + let component = unique_component("scheduler-fail"); - crate::health::mark_component_ok("scheduler"); - process_due_jobs(&config, &security, vec![job]).await; + crate::health::mark_component_ok(&component); + process_due_jobs(&config, &security, vec![job], &component).await; let snapshot = crate::health::snapshot_json(); - let scheduler = &snapshot["components"]["scheduler"]; - assert_eq!(scheduler["status"], "ok"); + let entry = &snapshot["components"][component.as_str()]; + assert_eq!(entry["status"], "ok"); } #[tokio::test]