From 8dc4f3722b6caa47a9dbf4325a12ffe2c6ce6119 Mon Sep 17 00:00:00 2001 From: argenis de la rosa Date: Wed, 4 Mar 2026 05:05:10 -0500 Subject: [PATCH] fix(daemon): add shutdown grace window and signal hint parity --- src/daemon/mod.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 5 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index bb0a4720d..6ac634e65 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -7,6 +7,7 @@ use tokio::task::JoinHandle; use tokio::time::Duration; const STATUS_FLUSH_SECONDS: u64 = 5; +const SHUTDOWN_GRACE_SECONDS: u64 = 5; #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum ShutdownSignal { @@ -21,6 +22,16 @@ fn shutdown_reason(signal: ShutdownSignal) -> &'static str { } } +#[cfg(unix)] +fn shutdown_hint() -> &'static str { + "Ctrl+C or SIGTERM to stop" +} + +#[cfg(not(unix))] +fn shutdown_hint() -> &'static str { + "Ctrl+C to stop" +} + async fn wait_for_shutdown_signal() -> Result { #[cfg(unix)] { @@ -32,7 +43,10 @@ async fn wait_for_shutdown_signal() -> Result { ctrl_c?; Ok(ShutdownSignal::CtrlC) } - _ = sigterm.recv() => Ok(ShutdownSignal::SigTerm), + sigterm_result = sigterm.recv() => match sigterm_result { + Some(()) => Ok(ShutdownSignal::SigTerm), + None => bail!("SIGTERM signal stream unexpectedly closed"), + }, } } #[cfg(not(unix))] @@ -140,19 +154,40 @@ pub async fn run(config: Config, host: String, port: u16) -> Result<()> { println!("🧠 ZeroClaw daemon started"); println!(" Gateway: http://{host}:{port}"); println!(" Components: gateway, channels, heartbeat, scheduler"); - println!(" Ctrl+C or SIGTERM to stop"); + println!(" {}", shutdown_hint()); let signal = wait_for_shutdown_signal().await?; crate::health::mark_component_error("daemon", shutdown_reason(signal)); + let aborted = + shutdown_handles_with_grace(handles, Duration::from_secs(SHUTDOWN_GRACE_SECONDS)).await; + if aborted > 0 { + tracing::warn!( + aborted, + grace_seconds = SHUTDOWN_GRACE_SECONDS, + "Forced shutdown for daemon tasks that exceeded graceful drain window" + ); + } + Ok(()) +} + +async fn shutdown_handles_with_grace(handles: Vec>, grace: Duration) -> usize { + let deadline = tokio::time::Instant::now() + grace; + while !handles.iter().all(JoinHandle::is_finished) && tokio::time::Instant::now() < deadline { + tokio::time::sleep(Duration::from_millis(50)).await; + } + + let mut aborted = 0usize; for handle in &handles { - handle.abort(); + if !handle.is_finished() { + handle.abort(); + aborted += 1; + } } for handle in handles { let _ = handle.await; } - - Ok(()) + aborted } pub fn state_file_path(config: &Config) -> PathBuf { @@ -494,6 +529,38 @@ mod tests { ); } + #[test] + fn shutdown_hint_matches_platform_signal_support() { + #[cfg(unix)] + assert_eq!(shutdown_hint(), "Ctrl+C or SIGTERM to stop"); + + #[cfg(not(unix))] + assert_eq!(shutdown_hint(), "Ctrl+C to stop"); + } + + #[tokio::test] + async fn graceful_shutdown_waits_for_completed_handles_without_abort() { + let finished = tokio::spawn(async {}); + let aborted = shutdown_handles_with_grace(vec![finished], Duration::from_millis(20)).await; + assert_eq!(aborted, 0); + } + + #[tokio::test] + async fn graceful_shutdown_aborts_stuck_handles_after_timeout() { + let never_finishes = tokio::spawn(async { + tokio::time::sleep(Duration::from_secs(30)).await; + }); + let started = tokio::time::Instant::now(); + let aborted = + shutdown_handles_with_grace(vec![never_finishes], Duration::from_millis(20)).await; + + assert_eq!(aborted, 1); + assert!( + started.elapsed() < Duration::from_secs(2), + "shutdown should not block indefinitely" + ); + } + #[tokio::test] async fn supervisor_marks_error_and_restart_on_failure() { let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {