feat(heartbeat): add dedupe and per-tick runtime caps
This commit is contained in:
+138
-1
@@ -1,8 +1,10 @@
|
||||
use crate::config::Config;
|
||||
use anyhow::{bail, Result};
|
||||
use chrono::Utc;
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Instant;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
@@ -200,16 +202,41 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
|
||||
let delivery = heartbeat_delivery_target(&config)?;
|
||||
|
||||
let interval_mins = config.heartbeat.interval_minutes.max(5);
|
||||
let dedupe_window = Duration::from_secs(u64::from(config.heartbeat.dedupe_window_minutes) * 60);
|
||||
let mut recently_executed_tasks: HashMap<String, Instant> = HashMap::new();
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let file_tasks = engine.collect_tasks().await?;
|
||||
let tasks = heartbeat_tasks_for_tick(file_tasks, config.heartbeat.message.as_deref());
|
||||
let candidate_tasks =
|
||||
heartbeat_tasks_for_tick(file_tasks, config.heartbeat.message.as_deref());
|
||||
let candidate_count = candidate_tasks.len();
|
||||
let tasks = apply_heartbeat_runtime_policy(
|
||||
candidate_tasks,
|
||||
config.heartbeat.max_tasks_per_tick,
|
||||
dedupe_window,
|
||||
&mut recently_executed_tasks,
|
||||
Instant::now(),
|
||||
);
|
||||
if tasks.is_empty() {
|
||||
if candidate_count > 0 {
|
||||
tracing::debug!(
|
||||
"Heartbeat runtime policy skipped all candidate tasks (dedupe/cap active)"
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if tasks.len() < candidate_count {
|
||||
tracing::info!(
|
||||
selected = tasks.len(),
|
||||
candidates = candidate_count,
|
||||
max_tasks_per_tick = config.heartbeat.max_tasks_per_tick,
|
||||
dedupe_window_minutes = config.heartbeat.dedupe_window_minutes,
|
||||
"Heartbeat runtime policy filtered candidate tasks"
|
||||
);
|
||||
}
|
||||
|
||||
for task in tasks {
|
||||
let prompt = format!("[Heartbeat Task] {task}");
|
||||
@@ -294,6 +321,53 @@ fn heartbeat_tasks_for_tick(
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn apply_heartbeat_runtime_policy(
|
||||
tasks: Vec<String>,
|
||||
max_tasks_per_tick: usize,
|
||||
dedupe_window: Duration,
|
||||
recently_executed_tasks: &mut HashMap<String, Instant>,
|
||||
now: Instant,
|
||||
) -> Vec<String> {
|
||||
if max_tasks_per_tick == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
if dedupe_window.is_zero() {
|
||||
return tasks.into_iter().take(max_tasks_per_tick).collect();
|
||||
}
|
||||
|
||||
recently_executed_tasks.retain(|_, seen_at| {
|
||||
now.checked_duration_since(*seen_at)
|
||||
.unwrap_or_default()
|
||||
.lt(&dedupe_window)
|
||||
});
|
||||
|
||||
let mut selected = Vec::new();
|
||||
for task in tasks {
|
||||
let dedupe_key = task.trim().to_ascii_lowercase();
|
||||
if dedupe_key.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let seen_recently = recently_executed_tasks
|
||||
.get(&dedupe_key)
|
||||
.is_some_and(|seen_at| {
|
||||
now.checked_duration_since(*seen_at)
|
||||
.unwrap_or_default()
|
||||
.lt(&dedupe_window)
|
||||
});
|
||||
if seen_recently {
|
||||
continue;
|
||||
}
|
||||
|
||||
recently_executed_tasks.insert(dedupe_key, now);
|
||||
selected.push(task);
|
||||
if selected.len() >= max_tasks_per_tick {
|
||||
break;
|
||||
}
|
||||
}
|
||||
selected
|
||||
}
|
||||
|
||||
fn heartbeat_delivery_target(config: &Config) -> Result<Option<(String, String)>> {
|
||||
let channel = config
|
||||
.heartbeat
|
||||
@@ -576,6 +650,69 @@ mod tests {
|
||||
assert!(tasks.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_runtime_policy_limits_tasks_per_tick() {
|
||||
let now = Instant::now();
|
||||
let mut recent = HashMap::new();
|
||||
let tasks = apply_heartbeat_runtime_policy(
|
||||
vec![
|
||||
"task-a".to_string(),
|
||||
"task-b".to_string(),
|
||||
"task-c".to_string(),
|
||||
],
|
||||
2,
|
||||
Duration::ZERO,
|
||||
&mut recent,
|
||||
now,
|
||||
);
|
||||
assert_eq!(tasks, vec!["task-a".to_string(), "task-b".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_runtime_policy_dedupes_recent_tasks_case_insensitive() {
|
||||
let now = Instant::now();
|
||||
let mut recent = HashMap::new();
|
||||
let window = Duration::from_secs(300);
|
||||
|
||||
let first = apply_heartbeat_runtime_policy(
|
||||
vec!["Task-A".to_string(), "Task-B".to_string()],
|
||||
5,
|
||||
window,
|
||||
&mut recent,
|
||||
now,
|
||||
);
|
||||
assert_eq!(first.len(), 2);
|
||||
|
||||
let second = apply_heartbeat_runtime_policy(
|
||||
vec!["task-a".to_string(), "task-c".to_string()],
|
||||
5,
|
||||
window,
|
||||
&mut recent,
|
||||
now + Duration::from_secs(60),
|
||||
);
|
||||
assert_eq!(second, vec!["task-c".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_runtime_policy_allows_task_after_dedupe_window() {
|
||||
let now = Instant::now();
|
||||
let mut recent = HashMap::new();
|
||||
let window = Duration::from_secs(60);
|
||||
|
||||
let first =
|
||||
apply_heartbeat_runtime_policy(vec!["task-a".to_string()], 5, window, &mut recent, now);
|
||||
assert_eq!(first, vec!["task-a".to_string()]);
|
||||
|
||||
let second = apply_heartbeat_runtime_policy(
|
||||
vec!["task-a".to_string()],
|
||||
5,
|
||||
window,
|
||||
&mut recent,
|
||||
now + Duration::from_secs(61),
|
||||
);
|
||||
assert_eq!(second, vec!["task-a".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_announcement_text_skips_no_reply_sentinel() {
|
||||
assert!(heartbeat_announcement_text(" NO_reply ").is_none());
|
||||
|
||||
Reference in New Issue
Block a user