use crate::config::Config; use anyhow::Result; use chrono::Utc; use std::future::Future; use std::path::PathBuf; use tokio::task::JoinHandle; use tokio::time::Duration; const STATUS_FLUSH_SECONDS: u64 = 5; /// Wait for shutdown signal (SIGINT or SIGTERM). /// SIGHUP is explicitly ignored so the daemon survives terminal/SSH disconnects. async fn wait_for_shutdown_signal() -> Result<()> { #[cfg(unix)] { use tokio::signal::unix::{signal, SignalKind}; let mut sigint = signal(SignalKind::interrupt())?; let mut sigterm = signal(SignalKind::terminate())?; let mut sighup = signal(SignalKind::hangup())?; loop { tokio::select! { _ = sigint.recv() => { tracing::info!("Received SIGINT, shutting down..."); break; } _ = sigterm.recv() => { tracing::info!("Received SIGTERM, shutting down..."); break; } _ = sighup.recv() => { tracing::info!("Received SIGHUP, ignoring (daemon stays running)"); } } } } #[cfg(not(unix))] { tokio::signal::ctrl_c().await?; tracing::info!("Received Ctrl+C, shutting down..."); } Ok(()) } pub async fn run(config: Config, host: String, port: u16) -> Result<()> { let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1); let max_backoff = config .reliability .channel_max_backoff_secs .max(initial_backoff); crate::health::mark_component_ok("daemon"); if config.heartbeat.enabled { let _ = crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.workspace_dir) .await; } let mut handles: Vec> = vec![spawn_state_writer(config.clone())]; { let gateway_cfg = config.clone(); let gateway_host = host.clone(); handles.push(spawn_component_supervisor( "gateway", initial_backoff, max_backoff, move || { let cfg = gateway_cfg.clone(); let host = gateway_host.clone(); async move { Box::pin(crate::gateway::run_gateway(&host, port, cfg)).await } }, )); } { if has_supervised_channels(&config) { let channels_cfg = config.clone(); handles.push(spawn_component_supervisor( "channels", initial_backoff, max_backoff, move || { let cfg = channels_cfg.clone(); async move { Box::pin(crate::channels::start_channels(cfg)).await } }, )); } else { crate::health::mark_component_ok("channels"); tracing::info!("No real-time channels configured; channel supervisor disabled"); } } if config.heartbeat.enabled { let heartbeat_cfg = config.clone(); handles.push(spawn_component_supervisor( "heartbeat", initial_backoff, max_backoff, move || { let cfg = heartbeat_cfg.clone(); async move { Box::pin(run_heartbeat_worker(cfg)).await } }, )); } if config.cron.enabled { let scheduler_cfg = config.clone(); handles.push(spawn_component_supervisor( "scheduler", initial_backoff, max_backoff, move || { let cfg = scheduler_cfg.clone(); async move { Box::pin(crate::cron::scheduler::run(cfg)).await } }, )); } else { crate::health::mark_component_ok("scheduler"); tracing::info!("Cron disabled; scheduler supervisor not started"); } println!("🧠 ZeroClaw daemon started"); println!(" Gateway: http://{host}:{port}"); println!(" Components: gateway, channels, heartbeat, scheduler"); println!(" Ctrl+C or SIGTERM to stop"); // Wait for shutdown signal (SIGINT or SIGTERM) wait_for_shutdown_signal().await?; crate::health::mark_component_error("daemon", "shutdown requested"); for handle in &handles { handle.abort(); } for handle in handles { let _ = handle.await; } Ok(()) } pub fn state_file_path(config: &Config) -> PathBuf { config .config_path .parent() .map_or_else(|| PathBuf::from("."), PathBuf::from) .join("daemon_state.json") } fn spawn_state_writer(config: Config) -> JoinHandle<()> { tokio::spawn(async move { let path = state_file_path(&config); if let Some(parent) = path.parent() { let _ = tokio::fs::create_dir_all(parent).await; } let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS)); loop { interval.tick().await; let mut json = crate::health::snapshot_json(); if let Some(obj) = json.as_object_mut() { obj.insert( "written_at".into(), serde_json::json!(Utc::now().to_rfc3339()), ); } let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec()); let _ = tokio::fs::write(&path, data).await; } }) } fn spawn_component_supervisor( name: &'static str, initial_backoff_secs: u64, max_backoff_secs: u64, mut run_component: F, ) -> JoinHandle<()> where F: FnMut() -> Fut + Send + 'static, Fut: Future> + Send + 'static, { tokio::spawn(async move { let mut backoff = initial_backoff_secs.max(1); let max_backoff = max_backoff_secs.max(backoff); loop { crate::health::mark_component_ok(name); match run_component().await { Ok(()) => { crate::health::mark_component_error(name, "component exited unexpectedly"); tracing::warn!("Daemon component '{name}' exited unexpectedly"); // Clean exit — reset backoff since the component ran successfully backoff = initial_backoff_secs.max(1); } Err(e) => { crate::health::mark_component_error(name, e.to_string()); tracing::error!("Daemon component '{name}' failed: {e}"); } } crate::health::bump_component_restart(name); tokio::time::sleep(Duration::from_secs(backoff)).await; // Double backoff AFTER sleeping so first error uses initial_backoff backoff = backoff.saturating_mul(2).min(max_backoff); } }) } async fn run_heartbeat_worker(config: Config) -> Result<()> { use crate::heartbeat::engine::{ compute_adaptive_interval, HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus, }; use std::sync::Arc; let observer: std::sync::Arc = std::sync::Arc::from(crate::observability::create_observer(&config.observability)); let engine = HeartbeatEngine::new( config.heartbeat.clone(), config.workspace_dir.clone(), observer, ); let metrics = engine.metrics(); let delivery = resolve_heartbeat_delivery(&config)?; let two_phase = config.heartbeat.two_phase; let adaptive = config.heartbeat.adaptive; let start_time = std::time::Instant::now(); // ── Deadman watcher ────────────────────────────────────────── let deadman_timeout = config.heartbeat.deadman_timeout_minutes; if deadman_timeout > 0 { let dm_metrics = Arc::clone(&metrics); let dm_config = config.clone(); let dm_delivery = delivery.clone(); tokio::spawn(async move { let check_interval = Duration::from_secs(60); let timeout = chrono::Duration::minutes(i64::from(deadman_timeout)); loop { tokio::time::sleep(check_interval).await; let last_tick = dm_metrics.lock().last_tick_at; if let Some(last) = last_tick { if chrono::Utc::now() - last > timeout { let alert = format!( "⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes" ); let (channel, target) = if let Some(ch) = &dm_config.heartbeat.deadman_channel { let to = dm_config .heartbeat .deadman_to .as_deref() .or(dm_config.heartbeat.to.as_deref()) .unwrap_or_default(); (ch.clone(), to.to_string()) } else if let Some((ch, to)) = &dm_delivery { (ch.clone(), to.clone()) } else { continue; }; let _ = crate::cron::scheduler::deliver_announcement( &dm_config, &channel, &target, &alert, ) .await; } } } }); } let base_interval = config.heartbeat.interval_minutes.max(5); let mut sleep_mins = base_interval; loop { tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await; // Update uptime { let mut m = metrics.lock(); m.uptime_secs = start_time.elapsed().as_secs(); } let tick_start = std::time::Instant::now(); // Collect runnable tasks (active only, sorted by priority) let mut tasks = engine.collect_runnable_tasks().await?; let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High); if tasks.is_empty() { if let Some(fallback) = config .heartbeat .message .as_deref() .map(str::trim) .filter(|m| !m.is_empty()) { tasks.push(HeartbeatTask { text: fallback.to_string(), priority: TaskPriority::Medium, status: TaskStatus::Active, }); } else { #[allow(clippy::cast_precision_loss)] let elapsed = tick_start.elapsed().as_millis() as f64; metrics.lock().record_success(elapsed); continue; } } // ── Phase 1: LLM decision (two-phase mode) ────────────── let tasks_to_run = if two_phase { let decision_prompt = HeartbeatEngine::build_decision_prompt(&tasks); match Box::pin(crate::agent::run( config.clone(), Some(decision_prompt), None, None, 0.0, vec![], false, None, None, )) .await { Ok(response) => { let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len()); if indices.is_empty() { tracing::info!("💓 Heartbeat Phase 1: skip (nothing to do)"); crate::health::mark_component_ok("heartbeat"); #[allow(clippy::cast_precision_loss)] let elapsed = tick_start.elapsed().as_millis() as f64; metrics.lock().record_success(elapsed); continue; } tracing::info!( "💓 Heartbeat Phase 1: run {} of {} tasks", indices.len(), tasks.len() ); indices .into_iter() .filter_map(|i| tasks.get(i).cloned()) .collect() } Err(e) => { tracing::warn!("💓 Heartbeat Phase 1 failed, running all tasks: {e}"); tasks } } } else { tasks }; // ── Phase 2: Execute selected tasks ───────────────────── let mut tick_had_error = false; for task in &tasks_to_run { let task_start = std::time::Instant::now(); let prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text); let temp = config.default_temperature; match Box::pin(crate::agent::run( config.clone(), Some(prompt), None, None, temp, vec![], false, None, None, )) .await { Ok(output) => { crate::health::mark_component_ok("heartbeat"); #[allow(clippy::cast_possible_truncation)] let duration_ms = task_start.elapsed().as_millis() as i64; let now = chrono::Utc::now(); let _ = crate::heartbeat::store::record_run( &config.workspace_dir, &task.text, &task.priority.to_string(), now - chrono::Duration::milliseconds(duration_ms), now, "ok", Some(output.as_str()), duration_ms, config.heartbeat.max_run_history, ); let announcement = if output.trim().is_empty() { format!("💓 heartbeat task completed: {}", task.text) } else { output }; if let Some((channel, target)) = &delivery { if let Err(e) = crate::cron::scheduler::deliver_announcement( &config, channel, target, &announcement, ) .await { crate::health::mark_component_error( "heartbeat", format!("delivery failed: {e}"), ); tracing::warn!("Heartbeat delivery failed: {e}"); } } } Err(e) => { tick_had_error = true; #[allow(clippy::cast_possible_truncation)] let duration_ms = task_start.elapsed().as_millis() as i64; let now = chrono::Utc::now(); let _ = crate::heartbeat::store::record_run( &config.workspace_dir, &task.text, &task.priority.to_string(), now - chrono::Duration::milliseconds(duration_ms), now, "error", Some(&e.to_string()), duration_ms, config.heartbeat.max_run_history, ); crate::health::mark_component_error("heartbeat", e.to_string()); tracing::warn!("Heartbeat task failed: {e}"); } } } // Update metrics #[allow(clippy::cast_precision_loss)] let tick_elapsed = tick_start.elapsed().as_millis() as f64; { let mut m = metrics.lock(); if tick_had_error { m.record_failure(tick_elapsed); } else { m.record_success(tick_elapsed); } } // Compute next sleep interval if adaptive { let failures = metrics.lock().consecutive_failures; sleep_mins = compute_adaptive_interval( base_interval, config.heartbeat.min_interval_minutes, config.heartbeat.max_interval_minutes, failures, has_high_priority, ); } else { sleep_mins = base_interval; } } } /// Resolve delivery target: explicit config > auto-detect first configured channel. fn resolve_heartbeat_delivery(config: &Config) -> Result> { let channel = config .heartbeat .target .as_deref() .map(str::trim) .filter(|value| !value.is_empty()); let target = config .heartbeat .to .as_deref() .map(str::trim) .filter(|value| !value.is_empty()); match (channel, target) { // Both explicitly set — validate and use. (Some(channel), Some(target)) => { validate_heartbeat_channel_config(config, channel)?; Ok(Some((channel.to_string(), target.to_string()))) } // Only one set — error. (Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"), (None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"), // Neither set — try auto-detect the first configured channel. (None, None) => Ok(auto_detect_heartbeat_channel(config)), } } /// Auto-detect the best channel for heartbeat delivery by checking which /// channels are configured. Returns the first match in priority order. fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> { // Priority order: telegram > discord > slack > mattermost if let Some(tg) = &config.channels_config.telegram { // Use the first allowed_user as target, or fall back to empty (broadcast) let target = tg.allowed_users.first().cloned().unwrap_or_default(); if !target.is_empty() { return Some(("telegram".to_string(), target)); } } if config.channels_config.discord.is_some() { // Discord requires explicit target — can't auto-detect return None; } if config.channels_config.slack.is_some() { // Slack requires explicit target return None; } if config.channels_config.mattermost.is_some() { // Mattermost requires explicit target return None; } None } fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> { match channel.to_ascii_lowercase().as_str() { "telegram" => { if config.channels_config.telegram.is_none() { anyhow::bail!( "heartbeat.target is set to telegram but channels_config.telegram is not configured" ); } } "discord" => { if config.channels_config.discord.is_none() { anyhow::bail!( "heartbeat.target is set to discord but channels_config.discord is not configured" ); } } "slack" => { if config.channels_config.slack.is_none() { anyhow::bail!( "heartbeat.target is set to slack but channels_config.slack is not configured" ); } } "mattermost" => { if config.channels_config.mattermost.is_none() { anyhow::bail!( "heartbeat.target is set to mattermost but channels_config.mattermost is not configured" ); } } other => anyhow::bail!("unsupported heartbeat.target channel: {other}"), } Ok(()) } fn has_supervised_channels(config: &Config) -> bool { config .channels_config .channels_except_webhook() .iter() .any(|(_, ok)| *ok) } #[cfg(test)] mod tests { use super::*; use tempfile::TempDir; fn test_config(tmp: &TempDir) -> Config { let config = Config { workspace_dir: tmp.path().join("workspace"), config_path: tmp.path().join("config.toml"), ..Config::default() }; std::fs::create_dir_all(&config.workspace_dir).unwrap(); config } #[test] fn state_file_path_uses_config_directory() { let tmp = TempDir::new().unwrap(); let config = test_config(&tmp); let path = state_file_path(&config); assert_eq!(path, tmp.path().join("daemon_state.json")); } #[tokio::test] async fn supervisor_marks_error_and_restart_on_failure() { let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async { anyhow::bail!("boom") }); tokio::time::sleep(Duration::from_millis(50)).await; handle.abort(); let _ = handle.await; let snapshot = crate::health::snapshot_json(); let component = &snapshot["components"]["daemon-test-fail"]; assert_eq!(component["status"], "error"); assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1); assert!(component["last_error"] .as_str() .unwrap_or("") .contains("boom")); } #[tokio::test] async fn supervisor_marks_unexpected_exit_as_error() { let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) }); tokio::time::sleep(Duration::from_millis(50)).await; handle.abort(); let _ = handle.await; let snapshot = crate::health::snapshot_json(); let component = &snapshot["components"]["daemon-test-exit"]; assert_eq!(component["status"], "error"); assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1); assert!(component["last_error"] .as_str() .unwrap_or("") .contains("component exited unexpectedly")); } #[test] fn detects_no_supervised_channels() { let config = Config::default(); assert!(!has_supervised_channels(&config)); } #[test] fn detects_supervised_channels_present() { let mut config = Config::default(); config.channels_config.telegram = Some(crate::config::TelegramConfig { bot_token: "token".into(), allowed_users: vec![], stream_mode: crate::config::StreamMode::default(), draft_update_interval_ms: 1000, interrupt_on_new_message: false, mention_only: false, }); assert!(has_supervised_channels(&config)); } #[test] fn detects_dingtalk_as_supervised_channel() { let mut config = Config::default(); config.channels_config.dingtalk = Some(crate::config::schema::DingTalkConfig { client_id: "client_id".into(), client_secret: "client_secret".into(), allowed_users: vec!["*".into()], }); assert!(has_supervised_channels(&config)); } #[test] fn detects_mattermost_as_supervised_channel() { let mut config = Config::default(); config.channels_config.mattermost = Some(crate::config::schema::MattermostConfig { url: "https://mattermost.example.com".into(), bot_token: "token".into(), channel_id: Some("channel-id".into()), allowed_users: vec!["*".into()], thread_replies: Some(true), mention_only: Some(false), }); assert!(has_supervised_channels(&config)); } #[test] fn detects_qq_as_supervised_channel() { let mut config = Config::default(); config.channels_config.qq = Some(crate::config::schema::QQConfig { app_id: "app-id".into(), app_secret: "app-secret".into(), allowed_users: vec!["*".into()], }); assert!(has_supervised_channels(&config)); } #[test] fn detects_nextcloud_talk_as_supervised_channel() { let mut config = Config::default(); config.channels_config.nextcloud_talk = Some(crate::config::schema::NextcloudTalkConfig { base_url: "https://cloud.example.com".into(), app_token: "app-token".into(), webhook_secret: None, allowed_users: vec!["*".into()], }); assert!(has_supervised_channels(&config)); } #[test] fn resolve_delivery_none_when_unset() { let config = Config::default(); let target = resolve_heartbeat_delivery(&config).unwrap(); assert!(target.is_none()); } #[test] fn resolve_delivery_requires_to_field() { let mut config = Config::default(); config.heartbeat.target = Some("telegram".into()); let err = resolve_heartbeat_delivery(&config).unwrap_err(); assert!(err .to_string() .contains("heartbeat.to is required when heartbeat.target is set")); } #[test] fn resolve_delivery_requires_target_field() { let mut config = Config::default(); config.heartbeat.to = Some("123456".into()); let err = resolve_heartbeat_delivery(&config).unwrap_err(); assert!(err .to_string() .contains("heartbeat.target is required when heartbeat.to is set")); } #[test] fn resolve_delivery_rejects_unsupported_channel() { let mut config = Config::default(); config.heartbeat.target = Some("email".into()); config.heartbeat.to = Some("ops@example.com".into()); let err = resolve_heartbeat_delivery(&config).unwrap_err(); assert!(err .to_string() .contains("unsupported heartbeat.target channel")); } #[test] fn resolve_delivery_requires_channel_configuration() { let mut config = Config::default(); config.heartbeat.target = Some("telegram".into()); config.heartbeat.to = Some("123456".into()); let err = resolve_heartbeat_delivery(&config).unwrap_err(); assert!(err .to_string() .contains("channels_config.telegram is not configured")); } #[test] fn resolve_delivery_accepts_telegram_configuration() { let mut config = Config::default(); config.heartbeat.target = Some("telegram".into()); config.heartbeat.to = Some("123456".into()); config.channels_config.telegram = Some(crate::config::TelegramConfig { bot_token: "bot-token".into(), allowed_users: vec![], stream_mode: crate::config::StreamMode::default(), draft_update_interval_ms: 1000, interrupt_on_new_message: false, mention_only: false, }); let target = resolve_heartbeat_delivery(&config).unwrap(); assert_eq!(target, Some(("telegram".to_string(), "123456".to_string()))); } #[test] fn auto_detect_telegram_when_configured() { let mut config = Config::default(); config.channels_config.telegram = Some(crate::config::TelegramConfig { bot_token: "bot-token".into(), allowed_users: vec!["user123".into()], stream_mode: crate::config::StreamMode::default(), draft_update_interval_ms: 1000, interrupt_on_new_message: false, mention_only: false, }); let target = resolve_heartbeat_delivery(&config).unwrap(); assert_eq!( target, Some(("telegram".to_string(), "user123".to_string())) ); } #[test] fn auto_detect_none_when_no_channels() { let config = Config::default(); let target = auto_detect_heartbeat_channel(&config); assert!(target.is_none()); } /// Verify that SIGHUP does not cause shutdown — the daemon should ignore it /// and only terminate on SIGINT or SIGTERM. #[cfg(unix)] #[tokio::test] async fn sighup_does_not_shut_down_daemon() { use libc; use tokio::time::{timeout, Duration}; let handle = tokio::spawn(wait_for_shutdown_signal()); // Give the signal handler time to register tokio::time::sleep(Duration::from_millis(50)).await; // Send SIGHUP to ourselves — should be ignored by the handler unsafe { libc::raise(libc::SIGHUP) }; // The future should NOT complete within a short window let result = timeout(Duration::from_millis(200), handle).await; assert!( result.is_err(), "wait_for_shutdown_signal should not return after SIGHUP" ); } }