From 898e102510fb9998e58d5899cb85e55c65ee7ec4 Mon Sep 17 00:00:00 2001 From: argenis de la rosa Date: Sun, 1 Mar 2026 00:50:51 -0500 Subject: [PATCH] feat(heartbeat): add dedupe and per-tick runtime caps --- src/config/schema.rs | 47 +++++++++++++++ src/daemon/mod.rs | 139 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 1 deletion(-) diff --git a/src/config/schema.rs b/src/config/schema.rs index c99c31779..24e6cf253 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -3935,6 +3935,12 @@ pub struct HeartbeatConfig { pub enabled: bool, /// Interval in minutes between heartbeat pings. Default: `30`. pub interval_minutes: u32, + /// Maximum heartbeat tasks to execute per tick. Default: `3`. + #[serde(default = "default_heartbeat_max_tasks_per_tick")] + pub max_tasks_per_tick: usize, + /// Skip duplicate task text within this cooldown window (minutes). Default: `0` (disabled). + #[serde(default = "default_heartbeat_dedupe_window_minutes")] + pub dedupe_window_minutes: u32, /// Optional fallback task text when `HEARTBEAT.md` has no task entries. #[serde(default)] pub message: Option, @@ -3946,11 +3952,21 @@ pub struct HeartbeatConfig { pub to: Option, } +fn default_heartbeat_max_tasks_per_tick() -> usize { + 3 +} + +fn default_heartbeat_dedupe_window_minutes() -> u32 { + 0 +} + impl Default for HeartbeatConfig { fn default() -> Self { Self { enabled: false, interval_minutes: 30, + max_tasks_per_tick: default_heartbeat_max_tasks_per_tick(), + dedupe_window_minutes: default_heartbeat_dedupe_window_minutes(), message: None, target: None, to: None, @@ -7861,6 +7877,12 @@ impl Config { } // Scheduler + if self.heartbeat.interval_minutes == 0 { + anyhow::bail!("heartbeat.interval_minutes must be greater than 0"); + } + if self.heartbeat.max_tasks_per_tick == 0 { + anyhow::bail!("heartbeat.max_tasks_per_tick must be greater than 0"); + } if self.scheduler.max_concurrent == 0 { anyhow::bail!("scheduler.max_concurrent must be greater than 0"); } @@ -9314,6 +9336,8 @@ allowed_roots = [] let h = HeartbeatConfig::default(); assert!(!h.enabled); assert_eq!(h.interval_minutes, 30); + assert_eq!(h.max_tasks_per_tick, 3); + assert_eq!(h.dedupe_window_minutes, 0); assert!(h.message.is_none()); assert!(h.target.is_none()); assert!(h.to.is_none()); @@ -9324,6 +9348,8 @@ allowed_roots = [] let raw = r#" enabled = true interval_minutes = 10 +max_tasks_per_tick = 5 +dedupe_window_minutes = 30 message = "Ping" channel = "telegram" recipient = "42" @@ -9331,6 +9357,8 @@ recipient = "42" let parsed: HeartbeatConfig = toml::from_str(raw).unwrap(); assert!(parsed.enabled); assert_eq!(parsed.interval_minutes, 10); + assert_eq!(parsed.max_tasks_per_tick, 5); + assert_eq!(parsed.dedupe_window_minutes, 30); assert_eq!(parsed.message.as_deref(), Some("Ping")); assert_eq!(parsed.target.as_deref(), Some("telegram")); assert_eq!(parsed.to.as_deref(), Some("42")); @@ -9496,6 +9524,8 @@ ws_url = "ws://127.0.0.1:3002" heartbeat: HeartbeatConfig { enabled: true, interval_minutes: 15, + max_tasks_per_tick: 4, + dedupe_window_minutes: 10, message: Some("Check London time".into()), target: Some("telegram".into()), to: Some("123456".into()), @@ -9583,6 +9613,8 @@ ws_url = "ws://127.0.0.1:3002" assert_eq!(parsed.runtime.kind, "docker"); assert!(parsed.heartbeat.enabled); assert_eq!(parsed.heartbeat.interval_minutes, 15); + assert_eq!(parsed.heartbeat.max_tasks_per_tick, 4); + assert_eq!(parsed.heartbeat.dedupe_window_minutes, 10); assert_eq!( parsed.heartbeat.message.as_deref(), Some("Check London time") @@ -9611,6 +9643,8 @@ default_temperature = 0.7 assert_eq!(parsed.autonomy.level, AutonomyLevel::Supervised); assert_eq!(parsed.runtime.kind, "native"); assert!(!parsed.heartbeat.enabled); + assert_eq!(parsed.heartbeat.max_tasks_per_tick, 3); + assert_eq!(parsed.heartbeat.dedupe_window_minutes, 0); assert!(parsed.channels_config.cli); assert!(parsed.memory.hygiene_enabled); assert_eq!(parsed.memory.archive_after_days, 7); @@ -13703,6 +13737,19 @@ sensitivity = 0.9 assert!(err.to_string().contains("alert_cooldown_secs")); } + #[test] + async fn heartbeat_validation_rejects_zero_max_tasks_per_tick() { + let mut config = Config::default(); + config.heartbeat.max_tasks_per_tick = 0; + + let err = config + .validate() + .expect_err("expected heartbeat max_tasks_per_tick validation failure"); + assert!(err + .to_string() + .contains("heartbeat.max_tasks_per_tick must be greater than 0")); + } + #[test] async fn security_validation_rejects_denied_threshold_above_total_threshold() { let mut config = Config::default(); diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 1d948e51b..f6eceb706 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -1,8 +1,10 @@ use crate::config::Config; use anyhow::{bail, Result}; use chrono::Utc; +use std::collections::HashMap; use std::future::Future; use std::path::PathBuf; +use std::time::Instant; use tokio::task::JoinHandle; use tokio::time::Duration; @@ -200,16 +202,41 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> { let delivery = heartbeat_delivery_target(&config)?; let interval_mins = config.heartbeat.interval_minutes.max(5); + let dedupe_window = Duration::from_secs(u64::from(config.heartbeat.dedupe_window_minutes) * 60); + let mut recently_executed_tasks: HashMap = HashMap::new(); let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60)); loop { interval.tick().await; let file_tasks = engine.collect_tasks().await?; - let tasks = heartbeat_tasks_for_tick(file_tasks, config.heartbeat.message.as_deref()); + let candidate_tasks = + heartbeat_tasks_for_tick(file_tasks, config.heartbeat.message.as_deref()); + let candidate_count = candidate_tasks.len(); + let tasks = apply_heartbeat_runtime_policy( + candidate_tasks, + config.heartbeat.max_tasks_per_tick, + dedupe_window, + &mut recently_executed_tasks, + Instant::now(), + ); if tasks.is_empty() { + if candidate_count > 0 { + tracing::debug!( + "Heartbeat runtime policy skipped all candidate tasks (dedupe/cap active)" + ); + } continue; } + if tasks.len() < candidate_count { + tracing::info!( + selected = tasks.len(), + candidates = candidate_count, + max_tasks_per_tick = config.heartbeat.max_tasks_per_tick, + dedupe_window_minutes = config.heartbeat.dedupe_window_minutes, + "Heartbeat runtime policy filtered candidate tasks" + ); + } for task in tasks { let prompt = format!("[Heartbeat Task] {task}"); @@ -294,6 +321,53 @@ fn heartbeat_tasks_for_tick( .unwrap_or_default() } +fn apply_heartbeat_runtime_policy( + tasks: Vec, + max_tasks_per_tick: usize, + dedupe_window: Duration, + recently_executed_tasks: &mut HashMap, + now: Instant, +) -> Vec { + if max_tasks_per_tick == 0 { + return Vec::new(); + } + + if dedupe_window.is_zero() { + return tasks.into_iter().take(max_tasks_per_tick).collect(); + } + + recently_executed_tasks.retain(|_, seen_at| { + now.checked_duration_since(*seen_at) + .unwrap_or_default() + .lt(&dedupe_window) + }); + + let mut selected = Vec::new(); + for task in tasks { + let dedupe_key = task.trim().to_ascii_lowercase(); + if dedupe_key.is_empty() { + continue; + } + let seen_recently = recently_executed_tasks + .get(&dedupe_key) + .is_some_and(|seen_at| { + now.checked_duration_since(*seen_at) + .unwrap_or_default() + .lt(&dedupe_window) + }); + if seen_recently { + continue; + } + + recently_executed_tasks.insert(dedupe_key, now); + selected.push(task); + if selected.len() >= max_tasks_per_tick { + break; + } + } + selected +} + fn heartbeat_delivery_target(config: &Config) -> Result> { let channel = config .heartbeat @@ -576,6 +650,69 @@ mod tests { assert!(tasks.is_empty()); } + #[test] + fn heartbeat_runtime_policy_limits_tasks_per_tick() { + let now = Instant::now(); + let mut recent = HashMap::new(); + let tasks = apply_heartbeat_runtime_policy( + vec![ + "task-a".to_string(), + "task-b".to_string(), + "task-c".to_string(), + ], + 2, + Duration::ZERO, + &mut recent, + now, + ); + assert_eq!(tasks, vec!["task-a".to_string(), "task-b".to_string()]); + } + + #[test] + fn heartbeat_runtime_policy_dedupes_recent_tasks_case_insensitive() { + let now = Instant::now(); + let mut recent = HashMap::new(); + let window = Duration::from_secs(300); + + let first = apply_heartbeat_runtime_policy( + vec!["Task-A".to_string(), "Task-B".to_string()], + 5, + window, + &mut recent, + now, + ); + assert_eq!(first.len(), 2); + + let second = apply_heartbeat_runtime_policy( + vec!["task-a".to_string(), "task-c".to_string()], + 5, + window, + &mut recent, + now + Duration::from_secs(60), + ); + assert_eq!(second, vec!["task-c".to_string()]); + } + + #[test] + fn heartbeat_runtime_policy_allows_task_after_dedupe_window() { + let now = Instant::now(); + let mut recent = HashMap::new(); + let window = Duration::from_secs(60); + + let first = + apply_heartbeat_runtime_policy(vec!["task-a".to_string()], 5, window, &mut recent, now); + assert_eq!(first, vec!["task-a".to_string()]); + + let second = apply_heartbeat_runtime_policy( + vec!["task-a".to_string()], + 5, + window, + &mut recent, + now + Duration::from_secs(61), + ); + assert_eq!(second, vec!["task-a".to_string()]); + } + #[test] fn heartbeat_announcement_text_skips_no_reply_sentinel() { assert!(heartbeat_announcement_text(" NO_reply ").is_none());