zeroclaw/src/daemon/mod.rs
Argenis d9ab017df0
feat(tools): add Microsoft 365 integration via Graph API (#3653)
Add Microsoft 365 tool providing access to Outlook mail, Teams messages,
Calendar events, OneDrive files, and SharePoint search via Microsoft
Graph API. Includes OAuth2 token caching (client credentials and device
code flows), security policy enforcement, and config validation.

Rebased on latest master, resolving conflicts with SwarmConfig exports
and adding approval_manager to ChannelRuntimeContext test constructors.

Original work by @rareba.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 01:44:39 -04:00

659 lines
22 KiB
Rust

use crate::config::Config;
use anyhow::Result;
use chrono::Utc;
use std::future::Future;
use std::path::PathBuf;
use tokio::task::JoinHandle;
use tokio::time::Duration;
const STATUS_FLUSH_SECONDS: u64 = 5;
/// Wait for shutdown signal (SIGINT or SIGTERM)
async fn wait_for_shutdown_signal() -> Result<()> {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = sigint.recv() => {
tracing::info!("Received SIGINT, shutting down...");
}
_ = sigterm.recv() => {
tracing::info!("Received SIGTERM, shutting down...");
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await?;
tracing::info!("Received Ctrl+C, shutting down...");
}
Ok(())
}
pub async fn run(config: Config, host: String, port: u16) -> Result<()> {
let initial_backoff = config.reliability.channel_initial_backoff_secs.max(1);
let max_backoff = config
.reliability
.channel_max_backoff_secs
.max(initial_backoff);
crate::health::mark_component_ok("daemon");
if config.heartbeat.enabled {
let _ =
crate::heartbeat::engine::HeartbeatEngine::ensure_heartbeat_file(&config.workspace_dir)
.await;
}
let mut handles: Vec<JoinHandle<()>> = vec![spawn_state_writer(config.clone())];
{
let gateway_cfg = config.clone();
let gateway_host = host.clone();
handles.push(spawn_component_supervisor(
"gateway",
initial_backoff,
max_backoff,
move || {
let cfg = gateway_cfg.clone();
let host = gateway_host.clone();
async move { crate::gateway::run_gateway(&host, port, cfg).await }
},
));
}
{
if has_supervised_channels(&config) {
let channels_cfg = config.clone();
handles.push(spawn_component_supervisor(
"channels",
initial_backoff,
max_backoff,
move || {
let cfg = channels_cfg.clone();
async move { Box::pin(crate::channels::start_channels(cfg)).await }
},
));
} else {
crate::health::mark_component_ok("channels");
tracing::info!("No real-time channels configured; channel supervisor disabled");
}
}
if config.heartbeat.enabled {
let heartbeat_cfg = config.clone();
handles.push(spawn_component_supervisor(
"heartbeat",
initial_backoff,
max_backoff,
move || {
let cfg = heartbeat_cfg.clone();
async move { Box::pin(run_heartbeat_worker(cfg)).await }
},
));
}
if config.cron.enabled {
let scheduler_cfg = config.clone();
handles.push(spawn_component_supervisor(
"scheduler",
initial_backoff,
max_backoff,
move || {
let cfg = scheduler_cfg.clone();
async move { crate::cron::scheduler::run(cfg).await }
},
));
} else {
crate::health::mark_component_ok("scheduler");
tracing::info!("Cron disabled; scheduler supervisor not started");
}
println!("🧠 ZeroClaw daemon started");
println!(" Gateway: http://{host}:{port}");
println!(" Components: gateway, channels, heartbeat, scheduler");
println!(" Ctrl+C or SIGTERM to stop");
// Wait for shutdown signal (SIGINT or SIGTERM)
wait_for_shutdown_signal().await?;
crate::health::mark_component_error("daemon", "shutdown requested");
for handle in &handles {
handle.abort();
}
for handle in handles {
let _ = handle.await;
}
Ok(())
}
pub fn state_file_path(config: &Config) -> PathBuf {
config
.config_path
.parent()
.map_or_else(|| PathBuf::from("."), PathBuf::from)
.join("daemon_state.json")
}
fn spawn_state_writer(config: Config) -> JoinHandle<()> {
tokio::spawn(async move {
let path = state_file_path(&config);
if let Some(parent) = path.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
let mut interval = tokio::time::interval(Duration::from_secs(STATUS_FLUSH_SECONDS));
loop {
interval.tick().await;
let mut json = crate::health::snapshot_json();
if let Some(obj) = json.as_object_mut() {
obj.insert(
"written_at".into(),
serde_json::json!(Utc::now().to_rfc3339()),
);
}
let data = serde_json::to_vec_pretty(&json).unwrap_or_else(|_| b"{}".to_vec());
let _ = tokio::fs::write(&path, data).await;
}
})
}
fn spawn_component_supervisor<F, Fut>(
name: &'static str,
initial_backoff_secs: u64,
max_backoff_secs: u64,
mut run_component: F,
) -> JoinHandle<()>
where
F: FnMut() -> Fut + Send + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
tokio::spawn(async move {
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
loop {
crate::health::mark_component_ok(name);
match run_component().await {
Ok(()) => {
crate::health::mark_component_error(name, "component exited unexpectedly");
tracing::warn!("Daemon component '{name}' exited unexpectedly");
// Clean exit — reset backoff since the component ran successfully
backoff = initial_backoff_secs.max(1);
}
Err(e) => {
crate::health::mark_component_error(name, e.to_string());
tracing::error!("Daemon component '{name}' failed: {e}");
}
}
crate::health::bump_component_restart(name);
tokio::time::sleep(Duration::from_secs(backoff)).await;
// Double backoff AFTER sleeping so first error uses initial_backoff
backoff = backoff.saturating_mul(2).min(max_backoff);
}
})
}
async fn run_heartbeat_worker(config: Config) -> Result<()> {
use crate::heartbeat::engine::HeartbeatEngine;
let observer: std::sync::Arc<dyn crate::observability::Observer> =
std::sync::Arc::from(crate::observability::create_observer(&config.observability));
let engine = HeartbeatEngine::new(
config.heartbeat.clone(),
config.workspace_dir.clone(),
observer,
);
let delivery = resolve_heartbeat_delivery(&config)?;
let two_phase = config.heartbeat.two_phase;
let interval_mins = config.heartbeat.interval_minutes.max(5);
let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60));
loop {
interval.tick().await;
// Collect runnable tasks (active only, sorted by priority)
let mut tasks = engine.collect_runnable_tasks().await?;
if tasks.is_empty() {
// Try fallback message
if let Some(fallback) = config
.heartbeat
.message
.as_deref()
.map(str::trim)
.filter(|m| !m.is_empty())
{
tasks.push(crate::heartbeat::engine::HeartbeatTask {
text: fallback.to_string(),
priority: crate::heartbeat::engine::TaskPriority::Medium,
status: crate::heartbeat::engine::TaskStatus::Active,
});
} else {
continue;
}
}
// ── Phase 1: LLM decision (two-phase mode) ──────────────
let tasks_to_run = if two_phase {
let decision_prompt = HeartbeatEngine::build_decision_prompt(&tasks);
match Box::pin(crate::agent::run(
config.clone(),
Some(decision_prompt),
None,
None,
0.0, // Low temperature for deterministic decision
vec![],
false,
None,
None,
))
.await
{
Ok(response) => {
let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
if indices.is_empty() {
tracing::info!("💓 Heartbeat Phase 1: skip (nothing to do)");
crate::health::mark_component_ok("heartbeat");
continue;
}
tracing::info!(
"💓 Heartbeat Phase 1: run {} of {} tasks",
indices.len(),
tasks.len()
);
indices
.into_iter()
.filter_map(|i| tasks.get(i).cloned())
.collect()
}
Err(e) => {
tracing::warn!("💓 Heartbeat Phase 1 failed, running all tasks: {e}");
tasks
}
}
} else {
tasks
};
// ── Phase 2: Execute selected tasks ─────────────────────
for task in &tasks_to_run {
let prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
let temp = config.default_temperature;
match Box::pin(crate::agent::run(
config.clone(),
Some(prompt),
None,
None,
temp,
vec![],
false,
None,
None,
))
.await
{
Ok(output) => {
crate::health::mark_component_ok("heartbeat");
let announcement = if output.trim().is_empty() {
format!("💓 heartbeat task completed: {}", task.text)
} else {
output
};
if let Some((channel, target)) = &delivery {
if let Err(e) = crate::cron::scheduler::deliver_announcement(
&config,
channel,
target,
&announcement,
)
.await
{
crate::health::mark_component_error(
"heartbeat",
format!("delivery failed: {e}"),
);
tracing::warn!("Heartbeat delivery failed: {e}");
}
}
}
Err(e) => {
crate::health::mark_component_error("heartbeat", e.to_string());
tracing::warn!("Heartbeat task failed: {e}");
}
}
}
}
}
/// Resolve delivery target: explicit config > auto-detect first configured channel.
fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
let channel = config
.heartbeat
.target
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty());
let target = config
.heartbeat
.to
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty());
match (channel, target) {
// Both explicitly set — validate and use.
(Some(channel), Some(target)) => {
validate_heartbeat_channel_config(config, channel)?;
Ok(Some((channel.to_string(), target.to_string())))
}
// Only one set — error.
(Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
(None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
// Neither set — try auto-detect the first configured channel.
(None, None) => Ok(auto_detect_heartbeat_channel(config)),
}
}
/// Auto-detect the best channel for heartbeat delivery by checking which
/// channels are configured. Returns the first match in priority order.
fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
// Priority order: telegram > discord > slack > mattermost
if let Some(tg) = &config.channels_config.telegram {
// Use the first allowed_user as target, or fall back to empty (broadcast)
let target = tg.allowed_users.first().cloned().unwrap_or_default();
if !target.is_empty() {
return Some(("telegram".to_string(), target));
}
}
if config.channels_config.discord.is_some() {
// Discord requires explicit target — can't auto-detect
return None;
}
if config.channels_config.slack.is_some() {
// Slack requires explicit target
return None;
}
if config.channels_config.mattermost.is_some() {
// Mattermost requires explicit target
return None;
}
None
}
fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
match channel.to_ascii_lowercase().as_str() {
"telegram" => {
if config.channels_config.telegram.is_none() {
anyhow::bail!(
"heartbeat.target is set to telegram but channels_config.telegram is not configured"
);
}
}
"discord" => {
if config.channels_config.discord.is_none() {
anyhow::bail!(
"heartbeat.target is set to discord but channels_config.discord is not configured"
);
}
}
"slack" => {
if config.channels_config.slack.is_none() {
anyhow::bail!(
"heartbeat.target is set to slack but channels_config.slack is not configured"
);
}
}
"mattermost" => {
if config.channels_config.mattermost.is_none() {
anyhow::bail!(
"heartbeat.target is set to mattermost but channels_config.mattermost is not configured"
);
}
}
other => anyhow::bail!("unsupported heartbeat.target channel: {other}"),
}
Ok(())
}
fn has_supervised_channels(config: &Config) -> bool {
config
.channels_config
.channels_except_webhook()
.iter()
.any(|(_, ok)| *ok)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn test_config(tmp: &TempDir) -> Config {
let config = Config {
workspace_dir: tmp.path().join("workspace"),
config_path: tmp.path().join("config.toml"),
..Config::default()
};
std::fs::create_dir_all(&config.workspace_dir).unwrap();
config
}
#[test]
fn state_file_path_uses_config_directory() {
let tmp = TempDir::new().unwrap();
let config = test_config(&tmp);
let path = state_file_path(&config);
assert_eq!(path, tmp.path().join("daemon_state.json"));
}
#[tokio::test]
async fn supervisor_marks_error_and_restart_on_failure() {
let handle = spawn_component_supervisor("daemon-test-fail", 1, 1, || async {
anyhow::bail!("boom")
});
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["daemon-test-fail"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("boom"));
}
#[tokio::test]
async fn supervisor_marks_unexpected_exit_as_error() {
let handle = spawn_component_supervisor("daemon-test-exit", 1, 1, || async { Ok(()) });
tokio::time::sleep(Duration::from_millis(50)).await;
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["daemon-test-exit"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("component exited unexpectedly"));
}
#[test]
fn detects_no_supervised_channels() {
let config = Config::default();
assert!(!has_supervised_channels(&config));
}
#[test]
fn detects_supervised_channels_present() {
let mut config = Config::default();
config.channels_config.telegram = Some(crate::config::TelegramConfig {
bot_token: "token".into(),
allowed_users: vec![],
stream_mode: crate::config::StreamMode::default(),
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_dingtalk_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.dingtalk = Some(crate::config::schema::DingTalkConfig {
client_id: "client_id".into(),
client_secret: "client_secret".into(),
allowed_users: vec!["*".into()],
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_mattermost_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.mattermost = Some(crate::config::schema::MattermostConfig {
url: "https://mattermost.example.com".into(),
bot_token: "token".into(),
channel_id: Some("channel-id".into()),
allowed_users: vec!["*".into()],
thread_replies: Some(true),
mention_only: Some(false),
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_qq_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.qq = Some(crate::config::schema::QQConfig {
app_id: "app-id".into(),
app_secret: "app-secret".into(),
allowed_users: vec!["*".into()],
});
assert!(has_supervised_channels(&config));
}
#[test]
fn detects_nextcloud_talk_as_supervised_channel() {
let mut config = Config::default();
config.channels_config.nextcloud_talk = Some(crate::config::schema::NextcloudTalkConfig {
base_url: "https://cloud.example.com".into(),
app_token: "app-token".into(),
webhook_secret: None,
allowed_users: vec!["*".into()],
});
assert!(has_supervised_channels(&config));
}
#[test]
fn resolve_delivery_none_when_unset() {
let config = Config::default();
let target = resolve_heartbeat_delivery(&config).unwrap();
assert!(target.is_none());
}
#[test]
fn resolve_delivery_requires_to_field() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("heartbeat.to is required when heartbeat.target is set"));
}
#[test]
fn resolve_delivery_requires_target_field() {
let mut config = Config::default();
config.heartbeat.to = Some("123456".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("heartbeat.target is required when heartbeat.to is set"));
}
#[test]
fn resolve_delivery_rejects_unsupported_channel() {
let mut config = Config::default();
config.heartbeat.target = Some("email".into());
config.heartbeat.to = Some("ops@example.com".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("unsupported heartbeat.target channel"));
}
#[test]
fn resolve_delivery_requires_channel_configuration() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
config.heartbeat.to = Some("123456".into());
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("channels_config.telegram is not configured"));
}
#[test]
fn resolve_delivery_accepts_telegram_configuration() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
config.heartbeat.to = Some("123456".into());
config.channels_config.telegram = Some(crate::config::TelegramConfig {
bot_token: "bot-token".into(),
allowed_users: vec![],
stream_mode: crate::config::StreamMode::default(),
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
});
let target = resolve_heartbeat_delivery(&config).unwrap();
assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
}
#[test]
fn auto_detect_telegram_when_configured() {
let mut config = Config::default();
config.channels_config.telegram = Some(crate::config::TelegramConfig {
bot_token: "bot-token".into(),
allowed_users: vec!["user123".into()],
stream_mode: crate::config::StreamMode::default(),
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
});
let target = resolve_heartbeat_delivery(&config).unwrap();
assert_eq!(
target,
Some(("telegram".to_string(), "user123".to_string()))
);
}
#[test]
fn auto_detect_none_when_no_channels() {
let config = Config::default();
let target = auto_detect_heartbeat_channel(&config);
assert!(target.is_none());
}
}