zeroclaw/src/daemon/mod.rs
Argenis 65cb4fe099
feat(heartbeat): default interval 30→5min + prune heartbeat from auto-save (#3938)
Lower the default heartbeat interval to 5 minutes to match the renewable
partial wake-lock cadence. Add `[heartbeat task` to the memory auto-save
skip filter so heartbeat prompts (both Phase 1 decision and Phase 2 task
execution) do not pollute persistent conversation memory.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 08:17:08 -04:00

820 lines
29 KiB
Rust

use crate::config::Config;
use anyhow::Result;
use chrono::Utc;
use std::future::Future;
use std::path::PathBuf;
use tokio::task::JoinHandle;
use tokio::time::Duration;
const STATUS_FLUSH_SECONDS: u64 = 5;
/// Wait for shutdown signal (SIGINT or SIGTERM).
/// SIGHUP is explicitly ignored so the daemon survives terminal/SSH disconnects.
async fn wait_for_shutdown_signal() -> Result<()> {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
let mut sighup = signal(SignalKind::hangup())?;
loop {
tokio::select! {
_ = sigint.recv() => {
tracing::info!("Received SIGINT, shutting down...");
break;
}
_ = sigterm.recv() => {
tracing::info!("Received SIGTERM, shutting down...");
break;
}
_ = sighup.recv() => {
tracing::info!("Received SIGHUP, ignoring (daemon stays running)");
}
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await?;
tracing::info!("Received Ctrl+C, shutting down...");
}
Ok(())
}
pub async fn run(config: Config, host: String, port: u16) -> Result<()> {
let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
let max_backoff = config
.reliability
.channel_max_backoff_secs
.max(initial_backoff);
crate::health::mark_component_ok("daemon");
if config.heartbeat.enabled {
let _ =
crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.workspace_dir)
.await;
}
let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
{
let gateway_cfg = config.clone();
let gateway_host = host.clone();
handles.push(spawn_component_supervisor(
"gateway",
initial_backoff,
max_backoff,
move || {
let cfg = gateway_cfg.clone();
let host = gateway_host.clone();
async move { Box::pin(crate::gateway::run_gateway(&host, port, cfg)).await }
},
));
}
{
if has_supervised_channels(&config) {
let channels_cfg = config.clone();
handles.push(spawn_component_supervisor(
"channels",
initial_backoff,
max_backoff,
move || {
let cfg = channels_cfg.clone();
async move { Box::pin(crate::channels::start_channels(cfg)).await }
},
));
} else {
crate::health::mark_component_ok("channels");
tracing::info!("No real-time channels configured; channel supervisor disabled");
}
}
if config.heartbeat.enabled {
let heartbeat_cfg = config.clone();
handles.push(spawn_component_supervisor(
"heartbeat",
initial_backoff,
max_backoff,
move || {
let cfg = heartbeat_cfg.clone();
async move { Box::pin(run_heartbeat_worker(cfg)).await }
},
));
}
if config.cron.enabled {
let scheduler_cfg = config.clone();
handles.push(spawn_component_supervisor(
"scheduler",
initial_backoff,
max_backoff,
move || {
let cfg = scheduler_cfg.clone();
async move { Box::pin(crate::cron::scheduler::run(cfg)).await }
},
));
} else {
crate::health::mark_component_ok("scheduler");
tracing::info!("Cron disabled; scheduler supervisor not started");
}
println!("🧠 ZeroClaw daemon started");
println!(" Gateway: http://{host}:{port}");
println!(" Components: gateway, channels, heartbeat, scheduler");
if config.gateway.require_pairing {
println!(" Pairing: enabled (code appears in gateway output above)");
}
println!(" Ctrl+C or SIGTERM to stop");
// Wait for shutdown signal (SIGINT or SIGTERM)
wait_for_shutdown_signal().await?;
crate::health::mark_component_error("daemon", "shutdown requested");
for handle in &handles {
handle.abort();
}
for handle in handles {
let _ = handle.await;
}
Ok(())
}
pub fn state_file_path(config: &Config) -> PathBuf {
config
.config_path
.parent()
.map_or_else(|| PathBuf::from("."), PathBuf::from)
.join("daemon_state.json")
}
fn spawn_state_writer(config: Config) -> JoinHandle<()> {
tokio::spawn(async move {
let path = state_file_path(&config);
if let Some(parent) = path.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
loop {
interval.tick().await;
let mut json = crate::health::snapshot_json();
if let Some(obj) = json.as_object_mut() {
obj.insert(
"written_at".into(),
serde_json::json!(Utc::now().to_rfc3339()),
);
}
let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
let _ = tokio::fs::write(&path, data).await;
}
})
}
fn spawn_component_supervisor<F, Fut>(
name: &'static str,
initial_backoff_secs: u64,
max_backoff_secs: u64,
mut run_component: F,
) -> JoinHandle<()>
where
F: FnMut() -> Fut + Send + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
tokio::spawn(async move {
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
loop {
crate::health::mark_component_ok(name);
match run_component().await {
Ok(()) => {
crate::health::mark_component_error(name, "component exited unexpectedly");
tracing::warn!("Daemon component '{name}' exited unexpectedly");
// Clean exit — reset backoff since the component ran successfully
backoff = initial_backoff_secs.max(1);
}
Err(e) => {
crate::health::mark_component_error(name, e.to_string());
tracing::error!("Daemon component '{name}' failed: {e}");
}
}
crate::health::bump_component_restart(name);
tokio::time::sleep(Duration::from_secs(backoff)).await;
// Double backoff AFTER sleeping so first error uses initial_backoff
backoff = backoff.saturating_mul(2).min(max_backoff);
}
})
}
async fn run_heartbeat_worker(config: Config) -> Result<()> {
use crate::heartbeat::engine::{
compute_adaptive_interval, HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus,
};
use std::sync::Arc;
let observer: std::sync::Arc<dyn crate::observability::Observer> =
std::sync::Arc::from(crate::observability::create_observer(&config.observability));
let engine = HeartbeatEngine::new(
config.heartbeat.clone(),
config.workspace_dir.clone(),
observer,
);
let metrics = engine.metrics();
let delivery = resolve_heartbeat_delivery(&config)?;
let two_phase = config.heartbeat.two_phase;
let adaptive = config.heartbeat.adaptive;
let start_time = std::time::Instant::now();
// ── Deadman watcher ──────────────────────────────────────────
let deadman_timeout = config.heartbeat.deadman_timeout_minutes;
if deadman_timeout > 0 {
let dm_metrics = Arc::clone(&metrics);
let dm_config = config.clone();
let dm_delivery = delivery.clone();
tokio::spawn(async move {
let check_interval = Duration::from_secs(60);
let timeout = chrono::Duration::minutes(i64::from(deadman_timeout));
loop {
tokio::time::sleep(check_interval).await;
let last_tick = dm_metrics.lock().last_tick_at;
if let Some(last) = last_tick {
if chrono::Utc::now() - last > timeout {
let alert = format!(
"⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes"
);
let (channel, target) =
if let Some(ch) = &dm_config.heartbeat.deadman_channel {
let to = dm_config
.heartbeat
.deadman_to
.as_deref()
.or(dm_config.heartbeat.to.as_deref())
.unwrap_or_default();
(ch.clone(), to.to_string())
} else if let Some((ch, to)) = &dm_delivery {
(ch.clone(), to.clone())
} else {
continue;
};
let _ = crate::cron::scheduler::deliver_announcement(
&dm_config, &channel, &target, &alert,
)
.await;
}
}
}
});
}
let base_interval = config.heartbeat.interval_minutes.max(5);
let mut sleep_mins = base_interval;
loop {
tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await;
// Update uptime
{
let mut m = metrics.lock();
m.uptime_secs = start_time.elapsed().as_secs();
}
let tick_start = std::time::Instant::now();
// Collect runnable tasks (active only, sorted by priority)
let mut tasks = engine.collect_runnable_tasks().await?;
let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High);
if tasks.is_empty() {
if let Some(fallback) = config
.heartbeat
.message
.as_deref()
.map(str::trim)
.filter(|m| !m.is_empty())
{
tasks.push(HeartbeatTask {
text: fallback.to_string(),
priority: TaskPriority::Medium,
status: TaskStatus::Active,
});
} else {
#[allow(clippy::cast_precision_loss)]
let elapsed = tick_start.elapsed().as_millis() as f64;
metrics.lock().record_success(elapsed);
continue;
}
}
// ── Phase 1: LLM decision (two-phase mode) ──────────────
let tasks_to_run = if two_phase {
let decision_prompt = format!(
"[Heartbeat Task | decision] {}",
HeartbeatEngine::build_decision_prompt(&tasks),
);
match Box::pin(crate::agent::run(
config.clone(),
Some(decision_prompt),
None,
None,
0.0,
vec![],
false,
None,
None,
))
.await
{
Ok(response) => {
let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
if indices.is_empty() {
tracing::info!("💓 Heartbeat Phase 1: skip (nothing to do)");
crate::health::mark_component_ok("heartbeat");
#[allow(clippy::cast_precision_loss)]
let elapsed = tick_start.elapsed().as_millis() as f64;
metrics.lock().record_success(elapsed);
continue;
}
tracing::info!(
"💓 Heartbeat Phase 1: run {} of {} tasks",
indices.len(),
tasks.len()
);
indices
.into_iter()
.filter_map(|i| tasks.get(i).cloned())
.collect()
}
Err(e) => {
tracing::warn!("💓 Heartbeat Phase 1 failed, running all tasks: {e}");
tasks
}
}
} else {
tasks
};
// ── Phase 2: Execute selected tasks ─────────────────────
let mut tick_had_error = false;
for task in &tasks_to_run {
let task_start = std::time::Instant::now();
let prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
let temp = config.default_temperature;
match Box::pin(crate::agent::run(
config.clone(),
Some(prompt),
None,
None,
temp,
vec![],
false,
None,
None,
))
.await
{
Ok(output) => {
crate::health::mark_component_ok("heartbeat");
#[allow(clippy::cast_possible_truncation)]
let duration_ms = task_start.elapsed().as_millis() as i64;
let now = chrono::Utc::now();
let _ = crate::heartbeat::store::record_run(
&config.workspace_dir,
&task.text,
&task.priority.to_string(),
now - chrono::Duration::milliseconds(duration_ms),
now,
"ok",
Some(output.as_str()),
duration_ms,
config.heartbeat.max_run_history,
);
let announcement = if output.trim().is_empty() {
format!("💓 heartbeat task completed: {}", task.text)
} else {
output
};
if let Some((channel, target)) = &delivery {
if let Err(e) = crate::cron::scheduler::deliver_announcement(
&config,
channel,
target,
&announcement,
)
.await
{
crate::health::mark_component_error(
"heartbeat",
format!("delivery failed: {e}"),
);
tracing::warn!("Heartbeat delivery failed: {e}");
}
}
}
Err(e) => {
tick_had_error = true;
#[allow(clippy::cast_possible_truncation)]
let duration_ms = task_start.elapsed().as_millis() as i64;
let now = chrono::Utc::now();
let _ = crate::heartbeat::store::record_run(
&config.workspace_dir,
&task.text,
&task.priority.to_string(),
now - chrono::Duration::milliseconds(duration_ms),
now,
"error",
Some(&e.to_string()),
duration_ms,
config.heartbeat.max_run_history,
);
crate::health::mark_component_error("heartbeat", e.to_string());
tracing::warn!("Heartbeat task failed: {e}");
}
}
}
// Update metrics
#[allow(clippy::cast_precision_loss)]
let tick_elapsed = tick_start.elapsed().as_millis() as f64;
{
let mut m = metrics.lock();
if tick_had_error {
m.record_failure(tick_elapsed);
} else {
m.record_success(tick_elapsed);
}
}
// Compute next sleep interval
if adaptive {
let failures = metrics.lock().consecutive_failures;
sleep_mins = compute_adaptive_interval(
base_interval,
config.heartbeat.min_interval_minutes,
config.heartbeat.max_interval_minutes,
failures,
has_high_priority,
);
} else {
sleep_mins = base_interval;
}
}
}
/// Resolve delivery target: explicit config > auto-detect first configured channel.
fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
let channel = config
.heartbeat
.target
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty());
let target = config
.heartbeat
.to
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty());
match (channel, target) {
// Both explicitly set — validate and use.
(Some(channel), Some(target)) => {
validate_heartbeat_channel_config(config, channel)?;
Ok(Some((channel.to_string(), target.to_string())))
}
// Only one set — error.
(Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
(None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
// Neither set — try auto-detect the first configured channel.
(None, None) => Ok(auto_detect_heartbeat_channel(config)),
}
}
/// Auto-detect the best channel for heartbeat delivery by checking which
/// channels are configured. Returns the first match in priority order.
fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
// Priority order: telegram > discord > slack > mattermost
if let Some(tg) = &config.channels_config.telegram {
// Use the first allowed_user as target, or fall back to empty (broadcast)
let target = tg.allowed_users.first().cloned().unwrap_or_default();
if !target.is_empty() {
return Some(("telegram".to_string(), target));
}
}
if config.channels_config.discord.is_some() {
// Discord requires explicit target — can't auto-detect
return None;
}
if config.channels_config.slack.is_some() {
// Slack requires explicit target
return None;
}
if config.channels_config.mattermost.is_some() {
// Mattermost requires explicit target
return None;
}
None
}
fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
match channel.to_ascii_lowercase().as_str() {
"telegram" => {
if config.channels_config.telegram.is_none() {
anyhow::bail!(
"heartbeat.target is set to telegram but channels_config.telegram is not configured"
);
}
}
"discord" => {
if config.channels_config.discord.is_none() {
anyhow::bail!(
"heartbeat.target is set to discord but channels_config.discord is not configured"
);
}
}
"slack" => {
if config.channels_config.slack.is_none() {
anyhow::bail!(
"heartbeat.target is set to slack but channels_config.slack is not configured"
);
}
}
"mattermost" => {
if config.channels_config.mattermost.is_none() {
anyhow::bail!(
"heartbeat.target is set to mattermost but channels_config.mattermost is not configured"
);
}
}
other => anyhow::bail!("unsupported heartbeat.target channel: {other}"),
}
Ok(())
}
fn has_supervised_channels(config: &Config) -> bool {
config
.channels_config
.channels_except_webhook()
.iter()
.any(|(_, ok)| *ok)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn test_config(tmp: &TempDir) -> Config {
let config = Config {
workspace_dir: tmp.path().join("workspace"),
config_path: tmp.path().join("config.toml"),
..Config::default()
};
std::fs::create_dir_all(&config.workspace_dir).unwrap();
config
}
#[test]
fn state_file_path_uses_config_directory() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let path = state_file_path(&config);
assert_eq!(path, tmp.path().join("daemon_state.json"));
}
#[tokio::test]
async fn supervisor_marks_error_and_restart_on_failure() {
let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
anyhow::bail!("boom")
});
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["daemon-test-fail"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("boom"));
}
#[tokio::test]
async fn supervisor_marks_unexpected_exit_as_error() {
let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["daemon-test-exit"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("component exited unexpectedly"));
}
#[test]
fn detects_no_supervised_channels() {
let config = Config::default();
assert!(!has_supervised_channels(&config));
}
#[test]
fn detects_supervised_channels_present() {
let mut config = Config::default();
config.channels_config.telegram = Some(crate::config::TelegramConfig {
bot_token: "token".into(),
allowed_users: vec![],
stream_mode: crate::config::StreamMode::default(),
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
ack_reactions: None,
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_dingtalk_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.dingtalk = Some(crate::config::schema::DingTalkConfig {
client_id: "client_id".into(),
client_secret: "client_secret".into(),
allowed_users: vec!["*".into()],
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_mattermost_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.mattermost = Some(crate::config::schema::MattermostConfig {
url: "https://mattermost.example.com".into(),
bot_token: "token".into(),
channel_id: Some("channel-id".into()),
allowed_users: vec!["*".into()],
thread_replies: Some(true),
mention_only: Some(false),
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_qq_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.qq = Some(crate::config::schema::QQConfig {
app_id: "app-id".into(),
app_secret: "app-secret".into(),
allowed_users: vec!["*".into()],
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_nextcloud_talk_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.nextcloud_talk = Some(crate::config::schema::NextcloudTalkConfig {
base_url: "https://cloud.example.com".into(),
app_token: "app-token".into(),
webhook_secret: None,
allowed_users: vec!["*".into()],
});
assert!(has_supervised_channels(&config));
}
#[test]
fn resolve_delivery_none_when_unset() {
let config = Config::default();
let target = resolve_heartbeat_delivery(&config).unwrap();
assert!(target.is_none());
}
#[test]
fn resolve_delivery_requires_to_field() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("heartbeat.to is required when heartbeat.target is set"));
}
#[test]
fn resolve_delivery_requires_target_field() {
let mut config = Config::default();
config.heartbeat.to = Some("123456".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("heartbeat.target is required when heartbeat.to is set"));
}
#[test]
fn resolve_delivery_rejects_unsupported_channel() {
let mut config = Config::default();
config.heartbeat.target = Some("email".into());
config.heartbeat.to = Some("ops@example.com".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("unsupported heartbeat.target channel"));
}
#[test]
fn resolve_delivery_requires_channel_configuration() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
config.heartbeat.to = Some("123456".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("channels_config.telegram is not configured"));
}
#[test]
fn resolve_delivery_accepts_telegram_configuration() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
config.heartbeat.to = Some("123456".into());
config.channels_config.telegram = Some(crate::config::TelegramConfig {
bot_token: "bot-token".into(),
allowed_users: vec![],
stream_mode: crate::config::StreamMode::default(),
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
ack_reactions: None,
});
let target = resolve_heartbeat_delivery(&config).unwrap();
assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
}
#[test]
fn auto_detect_telegram_when_configured() {
let mut config = Config::default();
config.channels_config.telegram = Some(crate::config::TelegramConfig {
bot_token: "bot-token".into(),
allowed_users: vec!["user123".into()],
stream_mode: crate::config::StreamMode::default(),
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
ack_reactions: None,
});
let target = resolve_heartbeat_delivery(&config).unwrap();
assert_eq!(
target,
Some(("telegram".to_string(), "user123".to_string()))
);
}
#[test]
fn auto_detect_none_when_no_channels() {
let config = Config::default();
let target = auto_detect_heartbeat_channel(&config);
assert!(target.is_none());
}
/// Verify that SIGHUP does not cause shutdown — the daemon should ignore it
/// and only terminate on SIGINT or SIGTERM.
#[cfg(unix)]
#[tokio::test]
async fn sighup_does_not_shut_down_daemon() {
use libc;
use tokio::time::{timeout, Duration};
let handle = tokio::spawn(wait_for_shutdown_signal());
// Give the signal handler time to register
tokio::time::sleep(Duration::from_millis(50)).await;
// Send SIGHUP to ourselves — should be ignored by the handler
unsafe { libc::raise(libc::SIGHUP) };
// The future should NOT complete within a short window
let result = timeout(Duration::from_millis(200), handle).await;
assert!(
result.is_err(),
"wait_for_shutdown_signal should not return after SIGHUP"
);
}
}