diff --git a/src/config/schema.rs b/src/config/schema.rs index 6eef9146d..3e5303ea4 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -172,6 +172,10 @@ pub struct Config { #[serde(default)] pub cron: CronConfig, + /// Goal loop configuration for autonomous long-term goal execution (`[goal_loop]`). + #[serde(default)] + pub goal_loop: GoalLoopConfig, + /// Channel configurations: Telegram, Discord, Slack, etc. (`[channels_config]`). #[serde(default)] pub channels_config: ChannelsConfig, @@ -2994,6 +2998,40 @@ impl Default for HeartbeatConfig { } } +// ── Goal Loop Config ──────────────────────────────────────────── + +/// Configuration for the autonomous goal loop engine (`[goal_loop]`). +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct GoalLoopConfig { + /// Enable autonomous goal execution. Default: `false`. + pub enabled: bool, + /// Interval in minutes between goal loop cycles. Default: `10`. + pub interval_minutes: u32, + /// Timeout in seconds for a single step execution. Default: `120`. + pub step_timeout_secs: u64, + /// Maximum steps to execute per cycle. Default: `3`. + pub max_steps_per_cycle: u32, + /// Optional channel to deliver goal events to (e.g. "lark", "telegram"). + #[serde(default)] + pub channel: Option, + /// Optional recipient/chat_id for goal event delivery. + #[serde(default)] + pub target: Option, +} + +impl Default for GoalLoopConfig { + fn default() -> Self { + Self { + enabled: false, + interval_minutes: 10, + step_timeout_secs: 120, + max_steps_per_cycle: 3, + channel: None, + target: None, + } + } +} + // ── Cron ──────────────────────────────────────────────────────── /// Cron job configuration (`[cron]` section). @@ -4523,6 +4561,7 @@ impl Default for Config { embedding_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), cron: CronConfig::default(), + goal_loop: GoalLoopConfig::default(), channels_config: ChannelsConfig::default(), memory: MemoryConfig::default(), storage: StorageConfig::default(), @@ -6711,6 +6750,7 @@ default_temperature = 0.7 to: Some("123456".into()), }, cron: CronConfig::default(), + goal_loop: GoalLoopConfig::default(), channels_config: ChannelsConfig { cli: true, telegram: Some(TelegramConfig { @@ -7109,6 +7149,7 @@ tool_dispatcher = "xml" query_classification: QueryClassificationConfig::default(), heartbeat: HeartbeatConfig::default(), cron: CronConfig::default(), + goal_loop: GoalLoopConfig::default(), channels_config: ChannelsConfig::default(), memory: MemoryConfig::default(), storage: StorageConfig::default(), diff --git a/src/cron/consolidation.rs b/src/cron/consolidation.rs new file mode 100644 index 000000000..2e2d792cd --- /dev/null +++ b/src/cron/consolidation.rs @@ -0,0 +1,183 @@ +use crate::config::Config; +use crate::cron::{add_agent_job, CronJob, Schedule, SessionTarget}; +use anyhow::Result; + +/// Default cron expression: 3:00 AM daily. +const DEFAULT_SCHEDULE_EXPR: &str = "0 3 * * *"; + +/// Job name marker used to identify consolidation jobs. +pub const CONSOLIDATION_JOB_NAME: &str = "__consolidate_nightly"; + +/// The prompt instructs the agent to perform memory consolidation using +/// existing tools (cron_runs, memory_recall, memory_store, file_write). +const CONSOLIDATION_PROMPT: &str = "\ +You are running a nightly memory consolidation job. Your goal is to distill \ +the past 24 hours of operational activity into a concise, actionable summary \ +stored in long-term memory. + +Follow these steps exactly: + +1. Use `cron_runs` to review recent job execution results from the past 24 hours. \ + Note any recurring errors, timeouts, or policy denials. + +2. Use `memory_recall` to retrieve today's Daily memories. Look for patterns, \ + discoveries, and progress toward goals. + +3. Identify and classify findings: + - **Recurring errors**: problems that appeared more than once + - **Successful strategies**: approaches that worked well + - **New discoveries**: information or capabilities learned + - **Blocked goals**: objectives that could not be completed and why + +4. Synthesize a concise summary (max 500 words) of actionable learnings. \ + Focus on what should change going forward, not just what happened. + +5. Store the summary using `memory_store` with category \"core\" and \ + key format \"consolidation_YYYY-MM-DD\" (use today's date). + +6. If the workspace file `MEMORY.md` exists, use `file_read` to read it, \ + then use `file_write` to append a dated section at the end with the \ + top 3 learnings from today's consolidation. Format: + ``` + ## Consolidation — YYYY-MM-DD + 1. + 2. + 3. + ``` + +If there is no meaningful activity to consolidate (no runs, no daily memories), \ +store a brief note confirming the check was performed and skip the MEMORY.md update."; + +/// Create a nightly memory consolidation cron agent job. +/// +/// Schedule: 3:00 AM daily (local time), configurable via `schedule_expr`. +/// Job type: agent with `__consolidate` marker in the name. +/// Session target: isolated (does not disturb main sessions). +pub fn create_consolidation_job(config: &Config) -> Result { + create_consolidation_job_with_schedule(config, DEFAULT_SCHEDULE_EXPR, None) +} + +/// Create a consolidation job with a custom cron expression and optional timezone. +pub fn create_consolidation_job_with_schedule( + config: &Config, + cron_expr: &str, + tz: Option, +) -> Result { + let schedule = Schedule::Cron { + expr: cron_expr.into(), + tz, + }; + + add_agent_job( + config, + Some(CONSOLIDATION_JOB_NAME.into()), + schedule, + CONSOLIDATION_PROMPT, + SessionTarget::Isolated, + None, // use default model + None, // no delivery config + false, // recurring job — do not delete after run + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cron::{JobType, Schedule, SessionTarget}; + use tempfile::TempDir; + + fn test_config(tmp: &TempDir) -> Config { + let config = Config { + workspace_dir: tmp.path().join("workspace"), + config_path: tmp.path().join("config.toml"), + ..Config::default() + }; + std::fs::create_dir_all(&config.workspace_dir).unwrap(); + config + } + + #[test] + fn create_consolidation_job_produces_valid_job() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = create_consolidation_job(&config).unwrap(); + + assert_eq!(job.name.as_deref(), Some(CONSOLIDATION_JOB_NAME)); + assert_eq!(job.job_type, JobType::Agent); + assert_eq!(job.session_target, SessionTarget::Isolated); + assert!(!job.delete_after_run); + assert!(job.enabled); + } + + #[test] + fn create_consolidation_job_uses_correct_schedule() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = create_consolidation_job(&config).unwrap(); + + match &job.schedule { + Schedule::Cron { expr, tz } => { + assert_eq!(expr, DEFAULT_SCHEDULE_EXPR); + assert!(tz.is_none()); + } + other => panic!("Expected Cron schedule, got {other:?}"), + } + } + + #[test] + fn create_consolidation_job_prompt_contains_key_instructions() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = create_consolidation_job(&config).unwrap(); + let prompt = job.prompt.expect("consolidation job must have a prompt"); + + assert!( + prompt.contains("memory_recall"), + "prompt should instruct use of memory_recall" + ); + assert!( + prompt.contains("memory_store"), + "prompt should instruct use of memory_store" + ); + assert!( + prompt.contains("cron_runs"), + "prompt should instruct use of cron_runs" + ); + assert!( + prompt.contains("consolidation_YYYY-MM-DD"), + "prompt should specify key format" + ); + assert!( + prompt.contains("core"), + "prompt should specify core category" + ); + assert!( + prompt.contains("MEMORY.md"), + "prompt should mention MEMORY.md" + ); + } + + #[test] + fn create_consolidation_job_with_custom_schedule_applies_tz() { + let tmp = TempDir::new().unwrap(); + let config = test_config(&tmp); + + let job = create_consolidation_job_with_schedule( + &config, + "0 4 * * *", + Some("America/New_York".into()), + ) + .unwrap(); + + match &job.schedule { + Schedule::Cron { expr, tz } => { + assert_eq!(expr, "0 4 * * *"); + assert_eq!(tz.as_deref(), Some("America/New_York")); + } + other => panic!("Expected Cron schedule, got {other:?}"), + } + } +} diff --git a/src/cron/mod.rs b/src/cron/mod.rs index 49db429d0..d93ce6d98 100644 --- a/src/cron/mod.rs +++ b/src/cron/mod.rs @@ -2,6 +2,7 @@ use crate::config::Config; use crate::security::SecurityPolicy; use anyhow::{bail, Result}; +pub mod consolidation; mod schedule; mod store; mod types; diff --git a/src/goals/engine.rs b/src/goals/engine.rs new file mode 100644 index 000000000..6d12214b6 --- /dev/null +++ b/src/goals/engine.rs @@ -0,0 +1,929 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::fmt::Write as _; +use std::path::{Path, PathBuf}; + +/// Maximum retry attempts per step before marking the goal as blocked. +const MAX_STEP_ATTEMPTS: u32 = 3; + +// ── Data Structures ───────────────────────────────────────────── + +/// Root state persisted to `{workspace}/state/goals.json`. +/// Format matches the `goal-tracker` skill's file layout. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GoalState { + #[serde(default)] + pub goals: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Goal { + pub id: String, + pub description: String, + #[serde(default)] + pub status: GoalStatus, + #[serde(default)] + pub priority: GoalPriority, + #[serde(default)] + pub created_at: String, + #[serde(default)] + pub updated_at: String, + #[serde(default)] + pub steps: Vec, + /// Accumulated context from previous step results. + #[serde(default)] + pub context: String, + /// Last error encountered during step execution. + #[serde(default)] + pub last_error: Option, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum GoalStatus { + #[default] + Pending, + InProgress, + Completed, + Blocked, + Cancelled, +} + +impl<'de> Deserialize<'de> for GoalStatus { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + Ok(match s.as_str() { + "in_progress" => Self::InProgress, + "completed" => Self::Completed, + "blocked" => Self::Blocked, + "cancelled" => Self::Cancelled, + _ => Self::Pending, + }) + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum GoalPriority { + Low = 0, + #[default] + Medium = 1, + High = 2, + Critical = 3, +} + +impl PartialOrd for GoalPriority { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for GoalPriority { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (*self as u8).cmp(&(*other as u8)) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Step { + pub id: String, + pub description: String, + #[serde(default)] + pub status: StepStatus, + #[serde(default)] + pub result: Option, + #[serde(default)] + pub attempts: u32, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum StepStatus { + #[default] + Pending, + InProgress, + Completed, + Failed, + Blocked, +} + +impl<'de> Deserialize<'de> for StepStatus { + fn deserialize>(d: D) -> Result { + let s = String::deserialize(d)?; + Ok(match s.as_str() { + "in_progress" => Self::InProgress, + "completed" => Self::Completed, + "failed" => Self::Failed, + "blocked" => Self::Blocked, + _ => Self::Pending, + }) + } +} + +// ── GoalEngine ────────────────────────────────────────────────── + +pub struct GoalEngine { + state_path: PathBuf, +} + +impl GoalEngine { + pub fn new(workspace_dir: &Path) -> Self { + Self { + state_path: workspace_dir.join("state").join("goals.json"), + } + } + + /// Load goal state from disk. Returns empty state if file doesn't exist. + pub async fn load_state(&self) -> Result { + if !self.state_path.exists() { + return Ok(GoalState::default()); + } + let bytes = tokio::fs::read(&self.state_path).await?; + if bytes.is_empty() { + return Ok(GoalState::default()); + } + let state: GoalState = serde_json::from_slice(&bytes)?; + Ok(state) + } + + /// Atomic save: write to .tmp then rename. + pub async fn save_state(&self, state: &GoalState) -> Result<()> { + if let Some(parent) = self.state_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let tmp = self.state_path.with_extension("json.tmp"); + let data = serde_json::to_vec_pretty(state)?; + tokio::fs::write(&tmp, data).await?; + tokio::fs::rename(&tmp, &self.state_path).await?; + Ok(()) + } + + /// Select the next actionable (goal_index, step_index) pair. + /// + /// Strategy: highest-priority in-progress goal, first pending step + /// that hasn't exceeded `MAX_STEP_ATTEMPTS`. + pub fn select_next_actionable(state: &GoalState) -> Option<(usize, usize)> { + let mut best: Option<(usize, usize, GoalPriority)> = None; + + for (gi, goal) in state.goals.iter().enumerate() { + if goal.status != GoalStatus::InProgress { + continue; + } + if let Some(si) = goal + .steps + .iter() + .position(|s| s.status == StepStatus::Pending && s.attempts < MAX_STEP_ATTEMPTS) + { + match best { + Some((_, _, ref bp)) if goal.priority <= *bp => {} + _ => best = Some((gi, si, goal.priority)), + } + } + } + + best.map(|(gi, si, _)| (gi, si)) + } + + /// Build a focused prompt for the agent to execute one step. + pub fn build_step_prompt(goal: &Goal, step: &Step) -> String { + let mut prompt = String::new(); + + let _ = writeln!( + prompt, + "[Goal Loop] Executing step for goal: {}\n", + goal.description + ); + + // Completed steps summary + let completed: Vec<&Step> = goal + .steps + .iter() + .filter(|s| s.status == StepStatus::Completed) + .collect(); + if !completed.is_empty() { + prompt.push_str("Completed steps:\n"); + for s in &completed { + let _ = writeln!( + prompt, + "- [done] {}: {}", + s.description, + s.result.as_deref().unwrap_or("(no result)") + ); + } + prompt.push('\n'); + } + + // Accumulated context + if !goal.context.is_empty() { + let _ = write!(prompt, "Context so far:\n{}\n\n", goal.context); + } + + // Current step + let _ = write!( + prompt, + "Current step: {}\n\ + Please execute this step. Provide a clear summary of what you did and the outcome.\n", + step.description + ); + + // Retry warning + if step.attempts > 0 { + let _ = write!( + prompt, + "\nWARNING: This step has failed {} time(s) before. \ + Last error: {}\n\ + Try a different approach.\n", + step.attempts, + goal.last_error.as_deref().unwrap_or("unknown") + ); + } + + prompt + } + + /// Simple heuristic: output containing error indicators → failure. + pub fn interpret_result(output: &str) -> bool { + let lower = output.to_ascii_lowercase(); + let failure_indicators = [ + "failed to", + "error:", + "unable to", + "cannot ", + "could not", + "fatal:", + "panic:", + ]; + !failure_indicators.iter().any(|ind| lower.contains(ind)) + } + + pub fn max_step_attempts() -> u32 { + MAX_STEP_ATTEMPTS + } + + /// Find in-progress goals that have no actionable steps remaining. + /// + /// A goal is "stalled" when it is `InProgress` but every step is either + /// completed, blocked, or has exhausted its retry attempts. These goals + /// need a reflection session to decide: add new steps, mark completed, + /// mark blocked, or escalate to the user. + pub fn find_stalled_goals(state: &GoalState) -> Vec { + state + .goals + .iter() + .enumerate() + .filter(|(_, g)| g.status == GoalStatus::InProgress) + .filter(|(_, g)| { + !g.steps.is_empty() + && !g.steps.iter().any(|s| { + s.status == StepStatus::Pending && s.attempts < MAX_STEP_ATTEMPTS + }) + }) + .map(|(i, _)| i) + .collect() + } + + /// Build a reflection prompt for a stalled goal. + /// + /// The agent is asked to review the goal's overall progress and decide + /// what to do next: add new steps, mark the goal completed, or escalate. + pub fn build_reflection_prompt(goal: &Goal) -> String { + let mut prompt = String::new(); + + let _ = writeln!( + prompt, + "[Goal Reflection] Goal: {}\n", + goal.description + ); + + prompt.push_str("All steps have been attempted. Here is the current state:\n\n"); + + for s in &goal.steps { + let status_tag = match s.status { + StepStatus::Completed => "done", + StepStatus::Failed | StepStatus::Blocked => "blocked", + _ if s.attempts >= MAX_STEP_ATTEMPTS => "exhausted", + _ => "pending", + }; + let result = s.result.as_deref().unwrap_or("(no result)"); + let _ = writeln!(prompt, "- [{status_tag}] {}: {result}", s.description); + } + + if !goal.context.is_empty() { + let _ = write!(prompt, "\nAccumulated context:\n{}\n", goal.context); + } + + if let Some(ref err) = goal.last_error { + let _ = write!(prompt, "\nLast error: {err}\n"); + } + + prompt.push_str( + "\nReflect on this goal and take ONE of the following actions:\n\ + 1. If the goal is effectively achieved, update state/goals.json to mark it `completed`.\n\ + 2. If some steps failed but you can try a different approach, add NEW steps to \ + state/goals.json with fresh descriptions (don't reuse failed step IDs).\n\ + 3. If the goal is truly blocked and needs human input, mark it `blocked` in \ + state/goals.json and explain what you need from the user.\n\ + 4. Use memory_store to record what you learned from the failures.\n\n\ + Be decisive. Do not leave the goal in its current state.", + ); + + prompt + } + +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn sample_goal_state() -> GoalState { + GoalState { + goals: vec![ + Goal { + id: "g1".into(), + description: "Build automation platform".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: "2026-01-01T00:00:00Z".into(), + updated_at: "2026-01-01T00:00:00Z".into(), + steps: vec![ + Step { + id: "s1".into(), + description: "Research tools".into(), + status: StepStatus::Completed, + result: Some("Found 3 tools".into()), + attempts: 1, + }, + Step { + id: "s2".into(), + description: "Setup environment".into(), + status: StepStatus::Pending, + result: None, + attempts: 0, + }, + Step { + id: "s3".into(), + description: "Write code".into(), + status: StepStatus::Pending, + result: None, + attempts: 0, + }, + ], + context: "Using Python + Selenium".into(), + last_error: None, + }, + Goal { + id: "g2".into(), + description: "Learn Rust".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::Medium, + created_at: "2026-01-02T00:00:00Z".into(), + updated_at: "2026-01-02T00:00:00Z".into(), + steps: vec![Step { + id: "s1".into(), + description: "Read the book".into(), + status: StepStatus::Pending, + result: None, + attempts: 0, + }], + context: String::new(), + last_error: None, + }, + ], + } + } + + #[test] + fn goal_loop_config_serde_roundtrip() { + let toml_str = r#" +enabled = true +interval_minutes = 15 +step_timeout_secs = 180 +max_steps_per_cycle = 5 +channel = "lark" +target = "oc_test" +"#; + let config: crate::config::GoalLoopConfig = toml::from_str(toml_str).unwrap(); + assert!(config.enabled); + assert_eq!(config.interval_minutes, 15); + assert_eq!(config.step_timeout_secs, 180); + assert_eq!(config.max_steps_per_cycle, 5); + assert_eq!(config.channel.as_deref(), Some("lark")); + assert_eq!(config.target.as_deref(), Some("oc_test")); + } + + #[test] + fn goal_loop_config_defaults() { + let config = crate::config::GoalLoopConfig::default(); + assert!(!config.enabled); + assert_eq!(config.interval_minutes, 10); + assert_eq!(config.step_timeout_secs, 120); + assert_eq!(config.max_steps_per_cycle, 3); + assert!(config.channel.is_none()); + assert!(config.target.is_none()); + } + + #[test] + fn goal_state_serde_roundtrip() { + let state = sample_goal_state(); + let json = serde_json::to_string_pretty(&state).unwrap(); + let parsed: GoalState = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.goals.len(), 2); + assert_eq!(parsed.goals[0].steps.len(), 3); + assert_eq!(parsed.goals[0].steps[0].status, StepStatus::Completed); + } + + #[test] + fn select_next_actionable_picks_highest_priority() { + let state = sample_goal_state(); + let result = GoalEngine::select_next_actionable(&state); + // g1 (High) step s2 should be selected over g2 (Medium) + assert_eq!(result, Some((0, 1))); + } + + #[test] + fn select_next_actionable_skips_exhausted_steps() { + let mut state = sample_goal_state(); + // Exhaust s2 attempts + state.goals[0].steps[1].attempts = MAX_STEP_ATTEMPTS; + let result = GoalEngine::select_next_actionable(&state); + // Should skip s2, pick s3 + assert_eq!(result, Some((0, 2))); + } + + #[test] + fn select_next_actionable_skips_non_in_progress_goals() { + let mut state = sample_goal_state(); + state.goals[0].status = GoalStatus::Completed; + let result = GoalEngine::select_next_actionable(&state); + // g1 completed, should pick g2 s1 + assert_eq!(result, Some((1, 0))); + } + + #[test] + fn select_next_actionable_returns_none_when_nothing_actionable() { + let state = GoalState::default(); + assert!(GoalEngine::select_next_actionable(&state).is_none()); + } + + #[test] + fn build_step_prompt_includes_goal_and_step() { + let state = sample_goal_state(); + let prompt = GoalEngine::build_step_prompt(&state.goals[0], &state.goals[0].steps[1]); + assert!(prompt.contains("Build automation platform")); + assert!(prompt.contains("Setup environment")); + assert!(prompt.contains("Research tools")); + assert!(prompt.contains("Using Python + Selenium")); + assert!(!prompt.contains("WARNING")); // no retries yet + } + + #[test] + fn build_step_prompt_includes_retry_warning() { + let mut state = sample_goal_state(); + state.goals[0].steps[1].attempts = 2; + state.goals[0].last_error = Some("connection refused".into()); + let prompt = GoalEngine::build_step_prompt(&state.goals[0], &state.goals[0].steps[1]); + assert!(prompt.contains("WARNING")); + assert!(prompt.contains("2 time(s)")); + assert!(prompt.contains("connection refused")); + } + + #[test] + fn interpret_result_success() { + assert!(GoalEngine::interpret_result( + "Successfully set up the environment" + )); + assert!(GoalEngine::interpret_result("Done. All tasks completed.")); + } + + #[test] + fn interpret_result_failure() { + assert!(!GoalEngine::interpret_result("Failed to install package")); + assert!(!GoalEngine::interpret_result( + "Error: connection timeout occurred" + )); + assert!(!GoalEngine::interpret_result("Unable to find the resource")); + assert!(!GoalEngine::interpret_result("cannot open file")); + assert!(!GoalEngine::interpret_result("Fatal: repository not found")); + } + + #[tokio::test] + async fn load_save_state_roundtrip() { + let tmp = TempDir::new().unwrap(); + let engine = GoalEngine::new(tmp.path()); + + // Initially empty + let empty = engine.load_state().await.unwrap(); + assert!(empty.goals.is_empty()); + + // Save and reload + let state = sample_goal_state(); + engine.save_state(&state).await.unwrap(); + let loaded = engine.load_state().await.unwrap(); + assert_eq!(loaded.goals.len(), 2); + assert_eq!(loaded.goals[0].id, "g1"); + assert_eq!(loaded.goals[1].priority, GoalPriority::Medium); + } + + #[test] + fn priority_ordering() { + assert!(GoalPriority::Critical > GoalPriority::High); + assert!(GoalPriority::High > GoalPriority::Medium); + assert!(GoalPriority::Medium > GoalPriority::Low); + } + + #[test] + fn goal_status_default_is_pending() { + assert_eq!(GoalStatus::default(), GoalStatus::Pending); + } + + #[test] + fn step_status_default_is_pending() { + assert_eq!(StepStatus::default(), StepStatus::Pending); + } + + #[test] + fn find_stalled_goals_detects_exhausted_steps() { + let state = GoalState { + goals: vec![Goal { + id: "g1".into(), + description: "Stalled goal".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![ + Step { + id: "s1".into(), + description: "Done step".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }, + Step { + id: "s2".into(), + description: "Exhausted step".into(), + status: StepStatus::Pending, + result: None, + attempts: 3, // >= MAX_STEP_ATTEMPTS + }, + ], + context: String::new(), + last_error: Some("step failed 3 times".into()), + }], + }; + + let stalled = GoalEngine::find_stalled_goals(&state); + assert_eq!(stalled, vec![0]); + } + + #[test] + fn find_stalled_goals_ignores_actionable_goals() { + let state = sample_goal_state(); // has pending steps with attempts=0 + let stalled = GoalEngine::find_stalled_goals(&state); + assert!(stalled.is_empty()); + } + + #[test] + fn find_stalled_goals_ignores_completed_goals() { + let state = GoalState { + goals: vec![Goal { + id: "g1".into(), + description: "Done".into(), + status: GoalStatus::Completed, + priority: GoalPriority::Medium, + created_at: String::new(), + updated_at: String::new(), + steps: vec![Step { + id: "s1".into(), + description: "Only step".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }], + context: String::new(), + last_error: None, + }], + }; + + let stalled = GoalEngine::find_stalled_goals(&state); + assert!(stalled.is_empty()); + } + + #[test] + fn build_reflection_prompt_includes_step_summary() { + let goal = Goal { + id: "g1".into(), + description: "Test reflection".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![ + Step { + id: "s1".into(), + description: "Completed step".into(), + status: StepStatus::Completed, + result: Some("worked".into()), + attempts: 1, + }, + Step { + id: "s2".into(), + description: "Failed step".into(), + status: StepStatus::Pending, + result: None, + attempts: 3, + }, + ], + context: "some context".into(), + last_error: Some("policy_denied".into()), + }; + + let prompt = GoalEngine::build_reflection_prompt(&goal); + assert!(prompt.contains("[Goal Reflection]")); + assert!(prompt.contains("Test reflection")); + assert!(prompt.contains("[done] Completed step")); + assert!(prompt.contains("[exhausted] Failed step")); + assert!(prompt.contains("some context")); + assert!(prompt.contains("policy_denied")); + assert!(prompt.contains("memory_store")); + } + + // ── Self-healing deserialization tests ─────────────────────── + + #[test] + fn goal_status_deserializes_all_valid_variants() { + let cases = vec![ + ("\"pending\"", GoalStatus::Pending), + ("\"in_progress\"", GoalStatus::InProgress), + ("\"completed\"", GoalStatus::Completed), + ("\"blocked\"", GoalStatus::Blocked), + ("\"cancelled\"", GoalStatus::Cancelled), + ]; + for (json_str, expected) in cases { + let parsed: GoalStatus = + serde_json::from_str(json_str).unwrap_or_else(|e| panic!("{json_str}: {e}")); + assert_eq!(parsed, expected, "GoalStatus mismatch for {json_str}"); + } + } + + #[test] + fn goal_status_self_healing_unknown_variants() { + for variant in &["\"unknown\"", "\"invalid\"", "\"PENDING\"", "\"IN_PROGRESS\"", "\"\""] { + let parsed: GoalStatus = + serde_json::from_str(variant).unwrap_or_else(|e| panic!("{variant}: {e}")); + assert_eq!(parsed, GoalStatus::Pending); + } + } + + #[test] + fn step_status_deserializes_all_valid_variants() { + let cases = vec![ + ("\"pending\"", StepStatus::Pending), + ("\"in_progress\"", StepStatus::InProgress), + ("\"completed\"", StepStatus::Completed), + ("\"failed\"", StepStatus::Failed), + ("\"blocked\"", StepStatus::Blocked), + ]; + for (json_str, expected) in cases { + let parsed: StepStatus = + serde_json::from_str(json_str).unwrap_or_else(|e| panic!("{json_str}: {e}")); + assert_eq!(parsed, expected, "StepStatus mismatch for {json_str}"); + } + } + + #[test] + fn step_status_self_healing_unknown_variants() { + for variant in &["\"unknown\"", "\"done\"", "\"FAILED\"", "\"\""] { + let parsed: StepStatus = + serde_json::from_str(variant).unwrap_or_else(|e| panic!("{variant}: {e}")); + assert_eq!(parsed, StepStatus::Pending); + } + } + + #[test] + fn goal_status_self_healing_in_full_goal_json() { + let json = r#"{"id":"g1","description":"test","status":"totally_bogus","steps":[]}"#; + let goal: Goal = serde_json::from_str(json).unwrap(); + assert_eq!(goal.status, GoalStatus::Pending); + } + + // ── find_stalled_goals edge cases ─────────────────────────── + + #[test] + fn find_stalled_goals_empty_steps_not_stalled() { + let state = GoalState { + goals: vec![Goal { + id: "g1".into(), + description: "No steps".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![], + context: String::new(), + last_error: None, + }], + }; + assert!(GoalEngine::find_stalled_goals(&state).is_empty()); + } + + #[test] + fn find_stalled_goals_multiple_stalled() { + let stalled_goal = |id: &str| Goal { + id: id.into(), + description: format!("Stalled {id}"), + status: GoalStatus::InProgress, + priority: GoalPriority::Medium, + created_at: String::new(), + updated_at: String::new(), + steps: vec![Step { + id: "s1".into(), + description: "Exhausted".into(), + status: StepStatus::Pending, + result: None, + attempts: MAX_STEP_ATTEMPTS, + }], + context: String::new(), + last_error: None, + }; + let state = GoalState { + goals: vec![stalled_goal("g1"), stalled_goal("g2"), stalled_goal("g3")], + }; + assert_eq!(GoalEngine::find_stalled_goals(&state), vec![0, 1, 2]); + } + + #[test] + fn find_stalled_goals_all_steps_completed_is_stalled() { + let state = GoalState { + goals: vec![Goal { + id: "g1".into(), + description: "All done but still in-progress".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![ + Step { + id: "s1".into(), + description: "Done".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }, + Step { + id: "s2".into(), + description: "Also done".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }, + ], + context: String::new(), + last_error: None, + }], + }; + assert_eq!(GoalEngine::find_stalled_goals(&state), vec![0]); + } + + #[test] + fn find_stalled_goals_mix_completed_and_blocked_steps() { + let state = GoalState { + goals: vec![Goal { + id: "g1".into(), + description: "Mixed".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![ + Step { + id: "s1".into(), + description: "Done".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }, + Step { + id: "s2".into(), + description: "Blocked".into(), + status: StepStatus::Blocked, + result: None, + attempts: 0, + }, + ], + context: String::new(), + last_error: None, + }], + }; + assert_eq!(GoalEngine::find_stalled_goals(&state), vec![0]); + } + + // ── build_reflection_prompt edge cases ─────────────────────── + + #[test] + fn build_reflection_prompt_empty_context_omits_section() { + let goal = Goal { + id: "g1".into(), + description: "Empty context".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![Step { + id: "s1".into(), + description: "Step".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }], + context: String::new(), + last_error: None, + }; + let prompt = GoalEngine::build_reflection_prompt(&goal); + assert!(!prompt.contains("Accumulated context")); + } + + #[test] + fn build_reflection_prompt_no_last_error_omits_section() { + let goal = Goal { + id: "g1".into(), + description: "No error".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![Step { + id: "s1".into(), + description: "Step".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }], + context: "some ctx".into(), + last_error: None, + }; + let prompt = GoalEngine::build_reflection_prompt(&goal); + assert!(!prompt.contains("Last error")); + } + + #[test] + fn build_reflection_prompt_all_done_tags() { + let goal = Goal { + id: "g1".into(), + description: "All done".into(), + status: GoalStatus::InProgress, + priority: GoalPriority::High, + created_at: String::new(), + updated_at: String::new(), + steps: vec![ + Step { + id: "s1".into(), + description: "First".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }, + Step { + id: "s2".into(), + description: "Second".into(), + status: StepStatus::Completed, + result: Some("ok".into()), + attempts: 1, + }, + ], + context: String::new(), + last_error: None, + }; + let prompt = GoalEngine::build_reflection_prompt(&goal); + assert!(prompt.contains("[done] First")); + assert!(prompt.contains("[done] Second")); + assert!(!prompt.contains("[exhausted]")); + assert!(!prompt.contains("[blocked]")); + } + + // ── GoalPriority comparison and serde ──────────────────────── + + #[test] + fn priority_all_comparisons() { + assert!(GoalPriority::Critical > GoalPriority::High); + assert!(GoalPriority::High > GoalPriority::Medium); + assert!(GoalPriority::Medium > GoalPriority::Low); + assert!(GoalPriority::Low < GoalPriority::Critical); + } + + #[test] + fn priority_serde_roundtrip_all_variants() { + for priority in &[ + GoalPriority::Low, + GoalPriority::Medium, + GoalPriority::High, + GoalPriority::Critical, + ] { + let json = serde_json::to_string(priority).unwrap(); + let parsed: GoalPriority = serde_json::from_str(&json).unwrap(); + assert_eq!(*priority, parsed); + } + } +} diff --git a/src/goals/mod.rs b/src/goals/mod.rs new file mode 100644 index 000000000..702e611f1 --- /dev/null +++ b/src/goals/mod.rs @@ -0,0 +1 @@ +pub mod engine; diff --git a/src/lib.rs b/src/lib.rs index 09466d2e8..db3b589db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ pub(crate) mod cron; pub(crate) mod daemon; pub(crate) mod doctor; pub mod gateway; +pub mod goals; pub(crate) mod hardware; pub(crate) mod health; pub(crate) mod heartbeat; diff --git a/src/main.rs b/src/main.rs index ec796b66d..fd5edc3c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,6 +63,7 @@ mod cron; mod daemon; mod doctor; mod gateway; +mod goals; mod hardware; mod health; mod heartbeat; diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index 90f52f3a3..2e38dc944 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -157,6 +157,7 @@ pub async fn run_wizard(force: bool) -> Result { embedding_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), cron: crate::config::CronConfig::default(), + goal_loop: crate::config::schema::GoalLoopConfig::default(), channels_config, memory: memory_config, // User-selected memory backend storage: StorageConfig::default(), @@ -514,6 +515,7 @@ async fn run_quick_setup_with_home( embedding_routes: Vec::new(), heartbeat: HeartbeatConfig::default(), cron: crate::config::CronConfig::default(), + goal_loop: crate::config::schema::GoalLoopConfig::default(), channels_config: ChannelsConfig::default(), memory: memory_config, storage: StorageConfig::default(), diff --git a/src/service/mod.rs b/src/service/mod.rs index 9c3c6da1b..8ba096c43 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -391,6 +391,10 @@ fn install_macos(config: &Config) -> Result<()> { let stdout = logs_dir.join("daemon.stdout.log"); let stderr = logs_dir.join("daemon.stderr.log"); + // Forward provider-related env vars into the launchd plist so the daemon + // can refresh OAuth tokens and authenticate without manual intervention. + let env_block = build_launchd_env_vars(); + let plist = format!( r#" @@ -406,7 +410,7 @@ fn install_macos(config: &Config) -> Result<()> { RunAtLoad KeepAlive - + {env_block} StandardOutPath {stdout} StandardErrorPath @@ -416,6 +420,7 @@ fn install_macos(config: &Config) -> Result<()> { "#, label = SERVICE_LABEL, exe = xml_escape(&exe.display().to_string()), + env_block = env_block, stdout = xml_escape(&stdout.display().to_string()), stderr = xml_escape(&stderr.display().to_string()) ); @@ -441,9 +446,11 @@ fn install_linux_systemd(config: &Config) -> Result<()> { } let exe = std::env::current_exe().context("Failed to resolve current executable")?; + let env_lines = build_systemd_env_vars(); let unit = format!( - "[Unit]\nDescription=ZeroClaw daemon\nAfter=network.target\n\n[Service]\nType=simple\nExecStart={} daemon\nRestart=always\nRestartSec=3\n\n[Install]\nWantedBy=default.target\n", - exe.display() + "[Unit]\nDescription=ZeroClaw daemon\nAfter=network.target\n\n[Service]\nType=simple\nExecStart={exe} daemon\nRestart=always\nRestartSec=3\n{env_lines}\n[Install]\nWantedBy=default.target\n", + exe = exe.display(), + env_lines = env_lines, ); fs::write(&file, unit)?; @@ -1037,6 +1044,61 @@ fn install_windows(config: &Config) -> Result<()> { Ok(()) } +/// Env vars to forward into the launchd / systemd service so that the daemon +/// can refresh OAuth tokens and authenticate with providers automatically. +const SERVICE_ENV_VARS: &[&str] = &[ + "GEMINI_API_KEY", + "GEMINI_CLI_CLIENT_ID", + "GEMINI_CLI_CLIENT_SECRET", + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "OPENROUTER_API_KEY", +]; + +/// Build the `EnvironmentVariables` plist fragment from env vars +/// present in the current process. Returns an empty string if none are set. +fn build_launchd_env_vars() -> String { + let mut entries = Vec::new(); + for &var in SERVICE_ENV_VARS { + if let Ok(val) = std::env::var(var) { + if !val.is_empty() { + entries.push(format!( + " {}\n {}", + xml_escape(var), + xml_escape(&val) + )); + } + } + } + if entries.is_empty() { + String::new() + } else { + format!( + "\n EnvironmentVariables\n \n{}\n ", + entries.join("\n") + ) + } +} + +/// Build `Environment=` lines for a systemd unit from env vars present in the +/// current process. Returns an empty string if none are set. +fn build_systemd_env_vars() -> String { + let mut lines = Vec::new(); + for &var in SERVICE_ENV_VARS { + if let Ok(val) = std::env::var(var) { + if !val.is_empty() { + // systemd Environment values with special chars need quoting + lines.push(format!("Environment=\"{var}={val}\"")); + } + } + } + if lines.is_empty() { + String::new() + } else { + format!("{}\n", lines.join("\n")) + } +} + fn macos_service_file() -> Result { let home = directories::UserDirs::new() .map(|u| u.home_dir().to_path_buf())