diff --git a/docs/i18n/zh-CN/reference/sop/observability.zh-CN.md b/docs/i18n/zh-CN/reference/sop/observability.zh-CN.md index 511116e27..565376551 100644 --- a/docs/i18n/zh-CN/reference/sop/observability.zh-CN.md +++ b/docs/i18n/zh-CN/reference/sop/observability.zh-CN.md @@ -12,8 +12,6 @@ SOP 审计条目通过 `SopAuditLogger` 持久化到配置的内存后端的 `so - `sop_step_{run_id}_{step_number}`:单步结果 - `sop_approval_{run_id}_{step_number}`:操作员审批记录 - `sop_timeout_approve_{run_id}_{step_number}`:超时自动审批记录 -- `sop_gate_decision_{gate_id}_{timestamp_ms}`:门评估器决策记录(启用 `ampersona-gates` 时) -- `sop_phase_state`:持久化的信任阶段状态快照(启用 `ampersona-gates` 时) ## 2. 检查路径 diff --git a/docs/reference/sop/observability.md b/docs/reference/sop/observability.md index eaa4f7999..6745228c8 100644 --- a/docs/reference/sop/observability.md +++ b/docs/reference/sop/observability.md @@ -12,8 +12,6 @@ Common key patterns: - `sop_step_{run_id}_{step_number}`: per-step result - `sop_approval_{run_id}_{step_number}`: operator approval record - `sop_timeout_approve_{run_id}_{step_number}`: timeout auto-approval record -- `sop_gate_decision_{gate_id}_{timestamp_ms}`: gate evaluator decision record (when `ampersona-gates` is enabled) -- `sop_phase_state`: persisted trust-phase state snapshot (when `ampersona-gates` is enabled) ## 2. Inspection Paths diff --git a/src/channels/lark.rs b/src/channels/lark.rs index 70c0ae5b2..a3b30a43a 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -1546,6 +1546,7 @@ impl LarkChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }] } diff --git a/src/channels/voice_wake.rs b/src/channels/voice_wake.rs index 4990b9125..1d2b7dcc8 100644 --- a/src/channels/voice_wake.rs +++ b/src/channels/voice_wake.rs @@ -238,6 +238,7 @@ impl Channel for VoiceWakeChannel { timestamp: ts, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if let Err(e) = tx.send(msg).await { diff --git a/src/channels/wati.rs b/src/channels/wati.rs index 4e3cecbd9..8d6c53623 100644 --- a/src/channels/wati.rs +++ b/src/channels/wati.rs @@ -354,6 +354,7 @@ impl WatiChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); messages diff --git a/src/config/mod.rs b/src/config/mod.rs index 298849f43..dd3dcc4da 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -27,11 +27,11 @@ pub use schema::{ QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SecurityOpsConfig, SkillCreationConfig, SkillsConfig, SkillsPromptInjectionMode, SlackConfig, - StorageConfig, StorageProviderConfig, StorageProviderSection, StreamMode, SwarmConfig, - SwarmStrategy, TelegramConfig, TextBrowserConfig, ToolFilterGroup, ToolFilterGroupMode, - TranscriptionConfig, TtsConfig, TunnelConfig, VerifiableIntentConfig, WebFetchConfig, - WebSearchConfig, WebhookConfig, WhatsAppChatPolicy, WhatsAppWebMode, WorkspaceConfig, - DEFAULT_GWS_SERVICES, + SopConfig, StorageConfig, StorageProviderConfig, StorageProviderSection, StreamMode, + SwarmConfig, SwarmStrategy, TelegramConfig, TextBrowserConfig, ToolFilterGroup, + ToolFilterGroupMode, TranscriptionConfig, TtsConfig, TunnelConfig, VerifiableIntentConfig, + WebFetchConfig, WebSearchConfig, WebhookConfig, WhatsAppChatPolicy, WhatsAppWebMode, + WorkspaceConfig, DEFAULT_GWS_SERVICES, }; pub fn name_and_presence(channel: Option<&T>) -> (&'static str, bool) { diff --git a/src/config/schema.rs b/src/config/schema.rs index 8a4dbc6c9..0782c98f5 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -391,6 +391,10 @@ pub struct Config { /// Claude Code tool configuration (`[claude_code]`). #[serde(default)] pub claude_code: ClaudeCodeConfig, + + /// Standard Operating Procedures engine configuration (`[sop]`). + #[serde(default)] + pub sop: SopConfig, } /// Multi-client workspace isolation configuration. @@ -7340,6 +7344,7 @@ impl Default for Config { locale: None, verifiable_intent: VerifiableIntentConfig::default(), claude_code: ClaudeCodeConfig::default(), + sop: SopConfig::default(), } } } @@ -9850,6 +9855,70 @@ async fn sync_directory(path: &Path) -> Result<()> { } } +// ── SOP engine configuration ─────────────────────────────────── + +/// Standard Operating Procedures engine configuration (`[sop]`). +/// +/// The `default_execution_mode` field uses the `SopExecutionMode` type from +/// `sop::types` (re-exported via `sop::SopExecutionMode`). To avoid circular +/// module references, config stores it using the same enum definition. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct SopConfig { + /// Directory containing SOP definitions (subdirs with SOP.toml + SOP.md). + /// Falls back to `/sops` when omitted. + #[serde(default)] + pub sops_dir: Option, + + /// Default execution mode for SOPs that omit `execution_mode`. + /// Values: `auto`, `supervised` (default), `step_by_step`, + /// `priority_based`, `deterministic`. + #[serde(default = "default_sop_execution_mode")] + pub default_execution_mode: String, + + /// Maximum total concurrent SOP runs across all SOPs. + #[serde(default = "default_sop_max_concurrent_total")] + pub max_concurrent_total: usize, + + /// Approval timeout in seconds. When a run waits for approval longer than + /// this, Critical/High-priority SOPs auto-approve; others stay waiting. + /// Set to 0 to disable timeout. + #[serde(default = "default_sop_approval_timeout_secs")] + pub approval_timeout_secs: u64, + + /// Maximum number of finished runs kept in memory for status queries. + /// Oldest runs are evicted when over capacity. 0 = unlimited. + #[serde(default = "default_sop_max_finished_runs")] + pub max_finished_runs: usize, +} + +fn default_sop_execution_mode() -> String { + "supervised".to_string() +} + +fn default_sop_max_concurrent_total() -> usize { + 4 +} + +fn default_sop_approval_timeout_secs() -> u64 { + 300 +} + +fn default_sop_max_finished_runs() -> usize { + 100 +} + +impl Default for SopConfig { + fn default() -> Self { + Self { + sops_dir: None, + default_execution_mode: default_sop_execution_mode(), + max_concurrent_total: default_sop_max_concurrent_total(), + approval_timeout_secs: default_sop_approval_timeout_secs(), + max_finished_runs: default_sop_max_finished_runs(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -10330,6 +10399,7 @@ default_temperature = 0.7 locale: None, verifiable_intent: VerifiableIntentConfig::default(), claude_code: ClaudeCodeConfig::default(), + sop: SopConfig::default(), }; let toml_str = toml::to_string_pretty(&config).unwrap(); @@ -10848,6 +10918,7 @@ default_temperature = 0.7 locale: None, verifiable_intent: VerifiableIntentConfig::default(), claude_code: ClaudeCodeConfig::default(), + sop: SopConfig::default(), }; config.save().await.unwrap(); @@ -14411,9 +14482,9 @@ require_otp_to_resume = true // ── Bootstrap files ───────────────────────────────────── - #[test] + #[tokio::test] async fn ensure_bootstrap_files_creates_missing_files() { - let tmp = TempDir::new().unwrap(); + let tmp = tempfile::TempDir::new().unwrap(); let ws = tmp.path().join("workspace"); let _: () = tokio::fs::create_dir_all(&ws).await.unwrap(); @@ -14427,9 +14498,9 @@ require_otp_to_resume = true assert!(identity.contains("IDENTITY.md")); } - #[test] + #[tokio::test] async fn ensure_bootstrap_files_does_not_overwrite_existing() { - let tmp = TempDir::new().unwrap(); + let tmp = tempfile::TempDir::new().unwrap(); let ws = tmp.path().join("workspace"); let _: () = tokio::fs::create_dir_all(&ws).await.unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 6068dd4a9..f3548bc5a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,7 @@ pub mod runtime; pub(crate) mod security; pub(crate) mod service; pub(crate) mod skills; +pub mod sop; pub mod tools; pub(crate) mod tunnel; pub(crate) mod util; @@ -561,3 +562,20 @@ Examples: /// Flash ZeroClaw firmware to Nucleo-F401RE (builds + probe-rs run) FlashNucleo, } + +/// SOP management subcommands +#[derive(Subcommand, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum SopCommands { + /// List loaded SOPs + List, + /// Validate SOP definitions + Validate { + /// SOP name to validate (all if omitted) + name: Option, + }, + /// Show details of an SOP + Show { + /// Name of the SOP to show + name: String, + }, +} diff --git a/src/main.rs b/src/main.rs index 026524bb6..b0ffe4840 100644 --- a/src/main.rs +++ b/src/main.rs @@ -107,6 +107,7 @@ mod security; mod service; mod skillforge; mod skills; +mod sop; mod tools; mod tunnel; mod util; @@ -117,7 +118,7 @@ use config::Config; // Re-export so binary modules can use crate:: while keeping a single source of truth. pub use zeroclaw::{ ChannelCommands, CronCommands, GatewayCommands, HardwareCommands, IntegrationCommands, - MigrateCommands, PeripheralCommands, ServiceCommands, SkillCommands, + MigrateCommands, PeripheralCommands, ServiceCommands, SkillCommands, SopCommands, }; #[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)] diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index d486cbc87..e9a6b293a 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -204,6 +204,7 @@ pub async fn run_wizard(force: bool) -> Result { locale: None, verifiable_intent: crate::config::VerifiableIntentConfig::default(), claude_code: crate::config::ClaudeCodeConfig::default(), + sop: crate::config::SopConfig::default(), }; println!( @@ -638,6 +639,7 @@ async fn run_quick_setup_with_home( locale: None, verifiable_intent: crate::config::VerifiableIntentConfig::default(), claude_code: crate::config::ClaudeCodeConfig::default(), + sop: crate::config::SopConfig::default(), }; config.save().await?; diff --git a/src/sop/audit.rs b/src/sop/audit.rs index 19d6f1115..d51cef403 100644 --- a/src/sop/audit.rs +++ b/src/sop/audit.rs @@ -78,33 +78,6 @@ impl SopAuditLogger { Ok(()) } - /// Log a gate evaluation decision record. - #[cfg(feature = "ampersona-gates")] - pub async fn log_gate_decision( - &self, - record: &ersona_engine::gates::decision::GateDecisionRecord, - ) -> Result<()> { - let timestamp_ms = chrono::Utc::now().timestamp_millis(); - let key = format!("sop_gate_decision_{}_{timestamp_ms}", record.gate_id); - let content = serde_json::to_string_pretty(record)?; - self.memory.store(&key, &content, category(), None).await?; - info!( - gate_id = %record.gate_id, - decision = %record.decision, - "SOP audit: gate decision logged" - ); - Ok(()) - } - - /// Persist (upsert) the current gate phase state. - #[cfg(feature = "ampersona-gates")] - pub async fn log_phase_state(&self, state: &ersona_core::state::PhaseState) -> Result<()> { - let key = "sop_phase_state"; - let content = serde_json::to_string_pretty(state)?; - self.memory.store(key, &content, category(), None).await?; - Ok(()) - } - /// Retrieve a stored run by ID (if it exists in memory). pub async fn get_run(&self, run_id: &str) -> Result> { let key = run_key(run_id); @@ -166,6 +139,7 @@ mod tests { completed_at: None, step_results: Vec::new(), waiting_since: None, + llm_calls_saved: 0, } } diff --git a/src/sop/dispatch.rs b/src/sop/dispatch.rs index 75f50ff5e..aaa00af78 100644 --- a/src/sop/dispatch.rs +++ b/src/sop/dispatch.rs @@ -24,7 +24,7 @@ pub enum DispatchResult { Started { run_id: String, sop_name: String, - action: SopRunAction, + action: Box, }, /// A matching SOP was found but could not start (cooldown / concurrency). Skipped { sop_name: String, reason: String }, @@ -39,6 +39,8 @@ fn extract_run_id_from_action(action: &SopRunAction) -> &str { match action { SopRunAction::ExecuteStep { run_id, .. } | SopRunAction::WaitApproval { run_id, .. } + | SopRunAction::DeterministicStep { run_id, .. } + | SopRunAction::CheckpointWait { run_id, .. } | SopRunAction::Completed { run_id, .. } | SopRunAction::Failed { run_id, .. } => run_id, } @@ -49,6 +51,8 @@ fn action_label(action: &SopRunAction) -> &'static str { match action { SopRunAction::ExecuteStep { .. } => "ExecuteStep", SopRunAction::WaitApproval { .. } => "WaitApproval", + SopRunAction::DeterministicStep { .. } => "DeterministicStep", + SopRunAction::CheckpointWait { .. } => "CheckpointWait", SopRunAction::Completed { .. } => "Completed", SopRunAction::Failed { .. } => "Failed", } @@ -62,7 +66,6 @@ fn action_label(action: &SopRunAction) -> &'static str { /// 1. Lock → `match_trigger` → collect SOP names → drop lock /// 2. Lock → for each name: `start_run` → collect results → drop lock /// 3. Async (no lock): audit each started run -#[tracing::instrument(skip(engine, audit), fields(source = %event.source, topic = ?event.topic))] pub async fn dispatch_sop_event( engine: &Arc>, audit: &SopAuditLogger, @@ -124,7 +127,7 @@ pub async fn dispatch_sop_event( results.push(DispatchResult::Started { run_id, sop_name: sop_name.clone(), - action, + action: Box::new(action), }); } Err(e) => { @@ -158,14 +161,14 @@ pub async fn dispatch_sop_event( /// approval timeout polling in the scheduler handles progression. /// For `ExecuteStep` actions, the run is started in the engine but steps /// cannot be executed without an agent loop — this is logged as a warning. -pub async fn process_headless_results(results: &[DispatchResult]) { +pub fn process_headless_results(results: &[DispatchResult]) { for result in results { match result { DispatchResult::Started { run_id, sop_name, action, - } => match action { + } => match action.as_ref() { SopRunAction::ExecuteStep { step, .. } => { warn!( "SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \ @@ -180,6 +183,24 @@ pub async fn process_headless_results(results: &[DispatchResult]) { step.number, step.title, ); } + SopRunAction::DeterministicStep { step, .. } => { + info!( + "SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \ + '{}'", + step.number, step.title, + ); + } + SopRunAction::CheckpointWait { + step, state_file, .. + } => { + info!( + "SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \ + '{}', state persisted to {}", + step.number, + step.title, + state_file.display(), + ); + } SopRunAction::Completed { .. } => { info!( "SOP headless dispatch: run {run_id} ('{sop_name}') completed immediately" @@ -250,7 +271,7 @@ impl SopCronCache { for trigger in &sop.triggers { if let super::types::SopTrigger::Cron { expression } = trigger { // Normalize 5-field crontab to 6-field (prepend seconds) - let normalized = match crate::cron::schedule::normalize_expression(expression) { + let normalized = match crate::cron::normalize_expression(expression) { Ok(n) => n, Err(e) => { warn!( @@ -349,10 +370,13 @@ mod tests { body: "Do step one".into(), suggested_tools: vec![], requires_confirmation: false, + kind: crate::sop::SopStepKind::default(), + schema: None, }], cooldown_secs: 0, max_concurrent: 2, location: None, + deterministic: false, } } @@ -396,7 +420,7 @@ mod tests { let results = dispatch_sop_event(&engine, &audit, event).await; assert_eq!(results.len(), 1); assert!( - matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action, SopRunAction::ExecuteStep { .. })) + matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. })) ); } @@ -534,7 +558,7 @@ mod tests { assert_eq!(sop_name, "supervised-sop"); assert!(!run_id.is_empty()); assert!( - matches!(action, SopRunAction::WaitApproval { .. }), + matches!(action.as_ref(), SopRunAction::WaitApproval { .. }), "Supervised SOP must return WaitApproval, got {:?}", action ); @@ -561,7 +585,7 @@ mod tests { match &results[0] { DispatchResult::Started { action, .. } => { assert!( - matches!(action, SopRunAction::ExecuteStep { .. }), + matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }), "Auto SOP must return ExecuteStep, got {:?}", action ); diff --git a/src/sop/engine.rs b/src/sop/engine.rs index fde3a69fc..4e2eca06e 100644 --- a/src/sop/engine.rs +++ b/src/sop/engine.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::fmt::Write as _; -use std::path::Path; +use std::path::{Path, PathBuf}; use anyhow::{bail, Result}; use tracing::{info, warn}; @@ -8,8 +8,9 @@ use tracing::{info, warn}; use super::condition::evaluate_condition; use super::load_sops; use super::types::{ - Sop, SopEvent, SopPriority, SopRun, SopRunAction, SopRunStatus, SopStep, SopStepResult, - SopStepStatus, SopTrigger, SopTriggerSource, + DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority, + SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus, + SopTrigger, SopTriggerSource, }; use crate::config::SopConfig; @@ -21,6 +22,8 @@ pub struct SopEngine { finished_runs: Vec, config: SopConfig, run_counter: u64, + /// Cumulative savings from deterministic execution. + deterministic_savings: DeterministicSavings, } impl SopEngine { @@ -32,6 +35,7 @@ impl SopEngine { finished_runs: Vec::new(), config, run_counter: 0, + deterministic_savings: DeterministicSavings::default(), } } @@ -40,7 +44,7 @@ impl SopEngine { self.sops = load_sops( workspace_dir, self.config.sops_dir.as_deref(), - self.config.default_execution_mode, + super::parse_execution_mode(&self.config.default_execution_mode), ); info!("SOP engine loaded {} SOPs", self.sops.len()); } @@ -118,7 +122,15 @@ impl SopEngine { } /// Start a new SOP run. Returns the first action to take. + /// Deterministic SOPs are automatically routed to `start_deterministic_run`. pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result { + // Route deterministic SOPs to dedicated path + if self.get_sop(sop_name).map_or(false, |s| { + s.execution_mode == SopExecutionMode::Deterministic + }) { + return self.start_deterministic_run(sop_name, event); + } + let sop = self .get_sop(sop_name) .ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))? @@ -154,6 +166,7 @@ impl SopEngine { completed_at: None, step_results: Vec::new(), waiting_since: None, + llm_calls_saved: 0, }; self.active_runs.insert(run_id.clone(), run); @@ -283,6 +296,277 @@ impl SopEngine { .collect() } + /// Return cumulative deterministic execution savings. + pub fn deterministic_savings(&self) -> &DeterministicSavings { + &self.deterministic_savings + } + + // ── Deterministic execution ───────────────────────────────── + + /// Start a deterministic SOP run. Steps execute sequentially without LLM + /// round-trips. Returns the first action (DeterministicStep or CheckpointWait). + pub fn start_deterministic_run( + &mut self, + sop_name: &str, + event: SopEvent, + ) -> Result { + let sop = self + .get_sop(sop_name) + .ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))? + .clone(); + + if sop.execution_mode != SopExecutionMode::Deterministic { + bail!( + "SOP '{}' is not in deterministic mode (mode: {})", + sop_name, + sop.execution_mode + ); + } + + if !self.can_start(sop_name) { + bail!( + "Cannot start SOP '{}': cooldown or concurrency limit reached", + sop_name + ); + } + + if sop.steps.is_empty() { + bail!("SOP '{}' has no steps defined", sop_name); + } + + self.run_counter += 1; + let dur = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis()); + let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter); + let now = now_iso8601(); + + let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX); + let run = SopRun { + run_id: run_id.clone(), + sop_name: sop_name.to_string(), + trigger_event: event, + status: SopRunStatus::Running, + current_step: 1, + total_steps, + started_at: now, + completed_at: None, + step_results: Vec::new(), + waiting_since: None, + llm_calls_saved: 0, + }; + + self.active_runs.insert(run_id.clone(), run); + info!( + "Deterministic SOP run {} started for '{}'", + run_id, sop_name + ); + + // Produce first step action + let step = sop.steps[0].clone(); + let input = serde_json::Value::Null; + self.resolve_deterministic_action(&sop, &run_id, &step, input) + } + + /// Advance a deterministic run with the output of the current step. + /// The output is piped as input to the next step. + pub fn advance_deterministic_step( + &mut self, + run_id: &str, + step_output: serde_json::Value, + ) -> Result { + let run = self + .active_runs + .get_mut(run_id) + .ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?; + + let sop = self + .sops + .iter() + .find(|s| s.name == run.sop_name) + .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))? + .clone(); + + // Record step result + let now = now_iso8601(); + let step_result = SopStepResult { + step_number: run.current_step, + status: SopStepStatus::Completed, + output: step_output.to_string(), + started_at: run.started_at.clone(), + completed_at: Some(now), + }; + run.step_results.push(step_result); + + // Each deterministic step saves one LLM call + run.llm_calls_saved += 1; + + // Advance to next step + let next_step_num = run.current_step + 1; + if next_step_num > run.total_steps { + info!( + "Deterministic SOP run {run_id} completed ({} LLM calls saved)", + run.llm_calls_saved + ); + let saved = run.llm_calls_saved; + self.deterministic_savings.total_llm_calls_saved += saved; + self.deterministic_savings.total_runs += 1; + return Ok(self.finish_run(run_id, SopRunStatus::Completed, None)); + } + + let run = self.active_runs.get_mut(run_id).unwrap(); + run.current_step = next_step_num; + + let step_idx = (next_step_num - 1) as usize; + let step = sop.steps[step_idx].clone(); + let run_id_owned = run_id.to_string(); + + self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output) + } + + /// Resume a deterministic run from persisted state. + pub fn resume_deterministic_run( + &mut self, + state: DeterministicRunState, + ) -> Result { + let run = self + .active_runs + .get_mut(&state.run_id) + .ok_or_else(|| anyhow::anyhow!("Active run not found: {}", state.run_id))?; + + if run.status != SopRunStatus::PausedCheckpoint { + bail!( + "Run {} is not paused at checkpoint (status: {})", + state.run_id, + run.status + ); + } + + let sop = self + .sops + .iter() + .find(|s| s.name == run.sop_name) + .ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))? + .clone(); + + run.status = SopRunStatus::Running; + run.waiting_since = None; + run.llm_calls_saved = state.llm_calls_saved; + + // Resume from the step after the last completed one + let next_step_num = state.last_completed_step + 1; + if next_step_num > state.total_steps { + info!( + "Deterministic SOP run {} completed on resume ({} LLM calls saved)", + state.run_id, state.llm_calls_saved + ); + self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved; + self.deterministic_savings.total_runs += 1; + return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None)); + } + + let run = self.active_runs.get_mut(&state.run_id).unwrap(); + run.current_step = next_step_num; + + let step_idx = (next_step_num - 1) as usize; + let step = sop.steps[step_idx].clone(); + + // Use last step's output as input, or Null + let last_output = state + .step_outputs + .get(&state.last_completed_step) + .cloned() + .unwrap_or(serde_json::Value::Null); + + let run_id = state.run_id.clone(); + self.resolve_deterministic_action(&sop, &run_id, &step, last_output) + } + + /// Resolve the action for a deterministic step (execute or checkpoint). + fn resolve_deterministic_action( + &mut self, + sop: &Sop, + run_id: &str, + step: &SopStep, + input: serde_json::Value, + ) -> Result { + if step.kind == SopStepKind::Checkpoint { + // Pause at checkpoint — persist state and wait for approval + if let Some(run) = self.active_runs.get_mut(run_id) { + run.status = SopRunStatus::PausedCheckpoint; + run.waiting_since = Some(now_iso8601()); + } + + let state_file = self.persist_deterministic_state(run_id, sop)?; + + info!( + "Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}", + step.number, + step.title, + state_file.display() + ); + + Ok(SopRunAction::CheckpointWait { + run_id: run_id.to_string(), + step: step.clone(), + state_file, + }) + } else { + Ok(SopRunAction::DeterministicStep { + run_id: run_id.to_string(), + step: step.clone(), + input, + }) + } + } + + /// Persist the current deterministic run state to a JSON file. + fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result { + let run = self + .active_runs + .get(run_id) + .ok_or_else(|| anyhow::anyhow!("Run not found: {run_id}"))?; + + let mut step_outputs = HashMap::new(); + for result in &run.step_results { + // Try to parse output as JSON, fall back to string value + let value = serde_json::from_str(&result.output) + .unwrap_or_else(|_| serde_json::Value::String(result.output.clone())); + step_outputs.insert(result.step_number, value); + } + + let state = DeterministicRunState { + run_id: run_id.to_string(), + sop_name: run.sop_name.clone(), + last_completed_step: run.current_step.saturating_sub(1), + total_steps: run.total_steps, + step_outputs, + persisted_at: now_iso8601(), + llm_calls_saved: run.llm_calls_saved, + paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint, + }; + + // Write to SOP location directory, or system temp dir + let temp_dir = std::env::temp_dir(); + let dir = sop + .location + .as_deref() + .unwrap_or_else(|| temp_dir.as_path()); + let state_file = dir.join(format!("{run_id}.state.json")); + let json = serde_json::to_string_pretty(&state)?; + std::fs::write(&state_file, json)?; + + Ok(state_file) + } + + /// Load a persisted deterministic run state from a JSON file. + pub fn load_deterministic_state(path: &Path) -> Result { + let content = std::fs::read_to_string(path)?; + let state: DeterministicRunState = serde_json::from_str(&content)?; + Ok(state) + } + // ── Approval timeout ────────────────────────────────────────── /// Check all WaitingApproval runs for timeout. For Critical/High-priority SOPs, @@ -487,21 +771,21 @@ fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: Strin } let needs_approval = match sop.execution_mode { - crate::sop::SopExecutionMode::Auto => false, - crate::sop::SopExecutionMode::Supervised => { + // Deterministic mode is handled via start_deterministic_run; + // if we reach here via the standard path, treat as Auto. + SopExecutionMode::Auto | SopExecutionMode::Deterministic => false, + SopExecutionMode::Supervised => { // Supervised: approval only before the first step step.number == 1 } - crate::sop::SopExecutionMode::StepByStep => true, - crate::sop::SopExecutionMode::PriorityBased => { - match sop.priority { - SopPriority::Critical | SopPriority::High => false, - SopPriority::Normal | SopPriority::Low => { - // Supervised behavior for normal/low - step.number == 1 - } + SopExecutionMode::StepByStep => true, + SopExecutionMode::PriorityBased => match sop.priority { + SopPriority::Critical | SopPriority::High => false, + SopPriority::Normal | SopPriority::Low => { + // Supervised behavior for normal/low + step.number == 1 } - } + }, }; if needs_approval { @@ -680,6 +964,8 @@ mod tests { body: "Do step one".into(), suggested_tools: vec!["shell".into()], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }, SopStep { number: 2, @@ -687,11 +973,14 @@ mod tests { body: "Do step two".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }, ], cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, } } @@ -706,6 +995,8 @@ mod tests { match action { SopRunAction::ExecuteStep { run_id, .. } | SopRunAction::WaitApproval { run_id, .. } + | SopRunAction::DeterministicStep { run_id, .. } + | SopRunAction::CheckpointWait { run_id, .. } | SopRunAction::Completed { run_id, .. } | SopRunAction::Failed { run_id, .. } => run_id, } @@ -1359,6 +1650,7 @@ mod tests { completed_at: None, step_results: Vec::new(), waiting_since: None, + llm_calls_saved: 0, }; let ctx = format_step_context(&sop, &run, &sop.steps[0]); assert!(ctx.contains("pump-shutdown")); @@ -1628,4 +1920,171 @@ mod tests { assert_eq!(run.status, SopRunStatus::Running); assert!(run.waiting_since.is_none()); } + + // ── Deterministic execution ───────────────────────── + + fn deterministic_sop(name: &str) -> Sop { + Sop { + name: name.into(), + description: format!("Deterministic SOP: {name}"), + version: "1.0.0".into(), + priority: SopPriority::Normal, + execution_mode: SopExecutionMode::Deterministic, + triggers: vec![SopTrigger::Manual], + steps: vec![ + SopStep { + number: 1, + title: "Step one".into(), + body: "Do step one".into(), + suggested_tools: vec![], + requires_confirmation: false, + kind: SopStepKind::Execute, + schema: None, + }, + SopStep { + number: 2, + title: "Checkpoint".into(), + body: "Pause for approval".into(), + suggested_tools: vec![], + requires_confirmation: false, + kind: SopStepKind::Checkpoint, + schema: None, + }, + SopStep { + number: 3, + title: "Step three".into(), + body: "Final step".into(), + suggested_tools: vec![], + requires_confirmation: false, + kind: SopStepKind::Execute, + schema: None, + }, + ], + cooldown_secs: 0, + max_concurrent: 1, + location: None, + deterministic: true, + } + } + + #[test] + fn deterministic_start_returns_deterministic_step() { + let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]); + let action = engine.start_run("det-sop", manual_event()).unwrap(); + assert!( + matches!(action, SopRunAction::DeterministicStep { ref step, .. } if step.number == 1), + "First action should be DeterministicStep for step 1" + ); + let run_id = extract_run_id(&action).to_string(); + assert!(run_id.starts_with("det-")); + } + + #[test] + fn deterministic_start_routes_through_start_run() { + let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]); + // start_run should auto-route to start_deterministic_run + let action = engine.start_run("det-sop", manual_event()).unwrap(); + assert!(matches!(action, SopRunAction::DeterministicStep { .. })); + } + + #[test] + fn deterministic_advance_pipes_output() { + let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]); + let action = engine.start_run("det-sop", manual_event()).unwrap(); + let run_id = extract_run_id(&action).to_string(); + + // Advance step 1 with output + let output = serde_json::json!({"result": "step1_done"}); + let action = engine + .advance_deterministic_step(&run_id, output.clone()) + .unwrap(); + + // Step 2 is a checkpoint — should pause + assert!( + matches!(action, SopRunAction::CheckpointWait { ref step, .. } if step.number == 2), + "Step 2 (checkpoint) should return CheckpointWait" + ); + } + + #[test] + fn deterministic_checkpoint_pauses_run() { + let mut engine = engine_with_sops(vec![deterministic_sop("det-sop")]); + let action = engine.start_run("det-sop", manual_event()).unwrap(); + let run_id = extract_run_id(&action).to_string(); + + // Complete step 1 + let action = engine + .advance_deterministic_step(&run_id, serde_json::json!({"ok": true})) + .unwrap(); + + // Should be at checkpoint + assert!(matches!(action, SopRunAction::CheckpointWait { .. })); + + // Run should be PausedCheckpoint + let run = engine.get_run(&run_id).unwrap(); + assert_eq!(run.status, SopRunStatus::PausedCheckpoint); + assert!(run.waiting_since.is_some()); + } + + #[test] + fn deterministic_completion_tracks_savings() { + let mut sop = deterministic_sop("det-sop"); + // Simplify: 2 execute steps, no checkpoint + sop.steps = vec![ + SopStep { + number: 1, + title: "Step one".into(), + body: "Do it".into(), + suggested_tools: vec![], + requires_confirmation: false, + kind: SopStepKind::Execute, + schema: None, + }, + SopStep { + number: 2, + title: "Step two".into(), + body: "Do it too".into(), + suggested_tools: vec![], + requires_confirmation: false, + kind: SopStepKind::Execute, + schema: None, + }, + ]; + let mut engine = engine_with_sops(vec![sop]); + + let action = engine.start_run("det-sop", manual_event()).unwrap(); + let run_id = extract_run_id(&action).to_string(); + + // Complete step 1 + let action = engine + .advance_deterministic_step(&run_id, serde_json::json!("s1")) + .unwrap(); + assert!(matches!(action, SopRunAction::DeterministicStep { .. })); + + // Complete step 2 + let action = engine + .advance_deterministic_step(&run_id, serde_json::json!("s2")) + .unwrap(); + assert!(matches!(action, SopRunAction::Completed { .. })); + + // Check savings + let savings = engine.deterministic_savings(); + assert_eq!(savings.total_runs, 1); + assert_eq!(savings.total_llm_calls_saved, 2); + } + + #[test] + fn deterministic_non_deterministic_sop_rejected() { + let mut engine = engine_with_sops(vec![test_sop( + "s1", + SopExecutionMode::Auto, + SopPriority::Normal, + )]); + let result = engine.start_deterministic_run("s1", manual_event()); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("not in deterministic mode")); + } } diff --git a/src/sop/gates.rs b/src/sop/gates.rs deleted file mode 100644 index a2ba4c5d2..000000000 --- a/src/sop/gates.rs +++ /dev/null @@ -1,746 +0,0 @@ -//! Gate evaluation state for ampersona trust-phase transitions. -//! -//! This module is only compiled when the `ampersona-gates` feature is active -//! (module declaration in `mod.rs` is behind `#[cfg]`). -//! -//! Gate decisions do NOT change SOP execution behavior — this is purely -//! observation + phase state tracking + audit logging. - -use std::path::Path; -use std::sync::Mutex; -use std::time::{Duration, Instant}; - -use ampersona_core::spec::gates::Gate; -use ampersona_core::state::{PendingTransition, PhaseState, TransitionRecord}; -use ampersona_core::traits::MetricsProvider; -use ampersona_engine::gates::decision::GateDecisionRecord; -use ampersona_engine::gates::evaluator::DefaultGateEvaluator; -use anyhow::Result; -use chrono::Utc; -use std::sync::Arc; -use tracing::{debug, error, info, warn}; - -use crate::memory::traits::{Memory, MemoryCategory}; - -const PHASE_STATE_KEY: &str = "sop_phase_state"; - -fn sop_category() -> MemoryCategory { - MemoryCategory::Custom("sop".into()) -} - -// ── Inner state ──────────────────────────────────────────────── - -struct GateEvalInner { - phase_state: PhaseState, - last_tick: Instant, -} - -// ── GateEvalState ────────────────────────────────────────────── - -/// Manages trust-phase gate evaluation state. -/// -/// Single `Mutex` ensures atomic interval-check + evaluate + apply. -/// `DefaultGateEvaluator` is a unit struct — called inline, not stored. -pub struct GateEvalState { - inner: Mutex, - memory: Arc, - gates: Vec, - tick_interval: Duration, -} - -impl GateEvalState { - /// Create with fresh (default) phase state. - pub fn new( - agent_name: &str, - gates: Vec, - interval_secs: u64, - memory: Arc, - ) -> Self { - Self { - inner: Mutex::new(GateEvalInner { - phase_state: PhaseState::new(agent_name.to_string()), - last_tick: Instant::now(), - }), - memory, - gates, - tick_interval: Duration::from_secs(interval_secs), - } - } - - /// Create with a known phase state (warm-start). - pub fn with_state( - state: PhaseState, - gates: Vec, - interval_secs: u64, - memory: Arc, - ) -> Self { - Self { - inner: Mutex::new(GateEvalInner { - phase_state: state, - last_tick: Instant::now(), - }), - memory, - gates, - tick_interval: Duration::from_secs(interval_secs), - } - } - - /// Load gate definitions from a persona JSON file. - /// - /// Expects `{"gates": [...]}` at the top level. Missing file → empty Vec. - /// Parse error → warn log + empty Vec. - pub fn load_gates_from_file(path: &Path) -> Vec { - let content = match std::fs::read_to_string(path) { - Ok(c) => c, - Err(_) => return Vec::new(), - }; - - #[derive(serde::Deserialize)] - struct PersonaGates { - #[serde(default)] - gates: Vec, - } - - match serde_json::from_str::(&content) { - Ok(parsed) => parsed.gates, - Err(e) => { - warn!(path = %path.display(), error = %e, "failed to parse gates from persona file"); - Vec::new() - } - } - } - - /// Rebuild from Memory backend (warm-start). - /// - /// Loads `PhaseState` from Memory key `sop_phase_state`, loads gates from - /// file, falls back to fresh state on parse error. - pub async fn rebuild_from_memory( - memory: Arc, - agent_name: &str, - gates_file: Option<&Path>, - interval_secs: u64, - ) -> Result { - let gates = gates_file - .map(Self::load_gates_from_file) - .unwrap_or_default(); - - let phase_state = match memory.get(PHASE_STATE_KEY).await? { - Some(entry) => match serde_json::from_str::(&entry.content) { - Ok(state) => { - info!( - phase = ?state.current_phase, - rev = state.state_rev, - "gate eval warm-started from memory" - ); - state - } - Err(e) => { - warn!(error = %e, "failed to parse phase state from memory, using fresh state"); - PhaseState::new(agent_name.to_string()) - } - }, - None => PhaseState::new(agent_name.to_string()), - }; - - Ok(Self::with_state(phase_state, gates, interval_secs, memory)) - } - - /// Atomic tick: interval check + evaluate + apply under single lock. - /// - /// Returns `Some(record)` if a gate fired, `None` otherwise. - pub fn tick(&self, metrics: &dyn MetricsProvider) -> Option { - let _span = tracing::info_span!("gate_eval_tick", gates = self.gates.len()).entered(); - - // interval_secs=0 means disabled - if self.tick_interval.is_zero() { - return None; - } - - if self.inner.is_poisoned() { - error!("gate eval mutex poisoned — loss of gate evaluation until restart"); - return None; - } - - let mut inner = self.inner.lock().ok()?; - - // Check interval - if inner.last_tick.elapsed() < self.tick_interval { - return None; - } - inner.last_tick = Instant::now(); - - // Evaluate - let record = DefaultGateEvaluator.evaluate(&self.gates, &inner.phase_state, metrics); - - match record { - Some(ref record) => { - // Apply decision in-place under the same lock - apply_decision(&mut inner.phase_state, record); - info!( - gate_id = %record.gate_id, - decision = %record.decision, - from = ?record.from_phase, - to = %record.to_phase, - "gate decision" - ); - } - None => { - debug!("no gate fired"); - } - } - - record - } - - /// Persist current phase state to Memory. - pub async fn persist(&self) -> Result<()> { - let content = { - let inner = self - .inner - .lock() - .map_err(|e| anyhow::anyhow!("gate eval lock poisoned: {e}"))?; - serde_json::to_string_pretty(&inner.phase_state)? - }; - self.memory - .store(PHASE_STATE_KEY, &content, sop_category(), None) - .await?; - Ok(()) - } - - /// Snapshot of current phase state (for diagnostics / sop_status). - pub fn phase_state_snapshot(&self) -> Option { - self.inner.lock().ok().map(|g| g.phase_state.clone()) - } - - /// Number of loaded gate definitions. - pub fn gate_count(&self) -> usize { - self.gates.len() - } -} - -// ── Decision application ─────────────────────────────────────── - -fn apply_decision(state: &mut PhaseState, record: &GateDecisionRecord) { - match record.decision.as_str() { - "transition" => { - state.current_phase = Some(record.to_phase.clone()); - state.state_rev += 1; - state.last_transition = Some(TransitionRecord { - gate_id: record.gate_id.clone(), - from_phase: record.from_phase.clone(), - to_phase: record.to_phase.clone(), - at: Utc::now(), - decision_id: format!( - "{}-{}-{}", - record.gate_id, record.state_rev, record.metrics_hash - ), - metrics_hash: Some(record.metrics_hash.clone()), - state_rev: state.state_rev, - }); - state.pending_transition = None; - state.updated_at = Utc::now(); - } - "observed" => { - debug!( - gate_id = %record.gate_id, - "observed gate — no state change" - ); - } - "pending_human" => { - state.pending_transition = Some(PendingTransition { - gate_id: record.gate_id.clone(), - from_phase: record.from_phase.clone(), - to_phase: record.to_phase.clone(), - decision: record.decision.clone(), - metrics_hash: record.metrics_hash.clone(), - state_rev: record.state_rev, - created_at: Utc::now(), - }); - state.updated_at = Utc::now(); - } - other => { - warn!(decision = %other, gate_id = %record.gate_id, "unknown gate decision — skipping"); - } - } -} - -// ── Tests ────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use ampersona_core::errors::MetricError; - use ampersona_core::spec::gates::Gate; - use ampersona_core::traits::{MetricQuery, MetricSample}; - use ampersona_core::types::{CriterionOp, GateApproval, GateDirection, GateEnforcement}; - use serde_json::json; - use std::collections::HashMap; - - // ── Mock MetricsProvider ────────────────────────────────── - - struct MockMetrics { - values: HashMap, - } - - impl MockMetrics { - fn new(values: Vec<(&str, serde_json::Value)>) -> Self { - Self { - values: values - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(), - } - } - } - - impl MetricsProvider for MockMetrics { - fn get_metric(&self, query: &MetricQuery) -> Result { - self.values - .get(&query.name) - .cloned() - .map(|value| MetricSample { - name: query.name.clone(), - value, - sampled_at: Utc::now(), - }) - .ok_or_else(|| MetricError::NotFound(query.name.clone())) - } - } - - // ── Helpers ─────────────────────────────────────────────── - - fn make_promote_gate( - id: &str, - metric: &str, - op: CriterionOp, - value: serde_json::Value, - to_phase: &str, - ) -> Gate { - Gate { - id: id.into(), - direction: GateDirection::Promote, - enforcement: GateEnforcement::Enforce, - priority: 0, - cooldown_seconds: 0, - from_phase: None, - to_phase: to_phase.into(), - criteria: vec![ampersona_core::spec::gates::Criterion { - metric: metric.into(), - op, - value, - window_seconds: None, - }], - metrics_schema: None, - approval: GateApproval::Auto, - on_pass: None, - } - } - - fn test_memory() -> Arc { - let mem_cfg = crate::config::MemoryConfig { - backend: "sqlite".into(), - ..crate::config::MemoryConfig::default() - }; - let tmp = tempfile::tempdir().unwrap(); - Arc::from(crate::memory::create_memory(&mem_cfg, tmp.path(), None).unwrap()) - } - - // ── Tests ───────────────────────────────────────────────── - - #[test] - fn tick_no_gates_returns_none() { - let mem = test_memory(); - let ge = GateEvalState::new("test-agent", vec![], 1, mem); - let metrics = MockMetrics::new(vec![]); - // Force past interval - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - assert!(ge.tick(&metrics).is_none()); - } - - #[test] - fn tick_with_passing_gate_returns_decision() { - let mem = test_memory(); - let gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - let ge = GateEvalState::new("test-agent", vec![gate], 1, mem); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.9))]); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let record = ge.tick(&metrics); - assert!(record.is_some()); - let record = record.unwrap(); - assert_eq!(record.gate_id, "g1"); - assert_eq!(record.to_phase, "active"); - } - - #[test] - fn tick_transition_advances_phase() { - let mem = test_memory(); - let gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - let ge = GateEvalState::new("test-agent", vec![gate], 1, mem); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - ge.tick(&metrics); - - let snap = ge.phase_state_snapshot().unwrap(); - assert_eq!(snap.current_phase, Some("active".into())); - assert!(snap.state_rev > 0); - assert!(snap.last_transition.is_some()); - } - - #[test] - fn tick_observed_no_state_change() { - let mem = test_memory(); - let mut gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - gate.enforcement = GateEnforcement::Observe; - let ge = GateEvalState::new("test-agent", vec![gate], 1, mem); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let record = ge.tick(&metrics); - assert!(record.is_some()); - assert_eq!(record.unwrap().decision, "observed"); - - let snap = ge.phase_state_snapshot().unwrap(); - assert!(snap.current_phase.is_none()); // no change - assert_eq!(snap.state_rev, 0); - } - - #[test] - fn tick_pending_human_sets_pending() { - let mem = test_memory(); - let mut gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - gate.approval = GateApproval::Human; - let ge = GateEvalState::new("test-agent", vec![gate], 1, mem); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let record = ge.tick(&metrics); - assert!(record.is_some()); - assert_eq!(record.unwrap().decision, "pending_human"); - - let snap = ge.phase_state_snapshot().unwrap(); - assert!(snap.pending_transition.is_some()); - assert_eq!(snap.pending_transition.unwrap().to_phase, "active"); - } - - #[test] - fn load_gates_missing_file_returns_empty() { - let gates = GateEvalState::load_gates_from_file(Path::new("/nonexistent/persona.json")); - assert!(gates.is_empty()); - } - - #[test] - fn load_gates_valid_persona() { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("persona.json"); - std::fs::write( - &path, - r#"{ - "gates": [{ - "id": "g1", - "direction": "promote", - "to_phase": "active", - "criteria": [{"metric": "sop.completion_rate", "op": "gte", "value": 0.8}] - }] - }"#, - ) - .unwrap(); - let gates = GateEvalState::load_gates_from_file(&path); - assert_eq!(gates.len(), 1); - assert_eq!(gates[0].id, "g1"); - } - - #[test] - fn load_gates_no_gates_key_returns_empty() { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("persona.json"); - std::fs::write(&path, r#"{"name": "test"}"#).unwrap(); - let gates = GateEvalState::load_gates_from_file(&path); - assert!(gates.is_empty()); - } - - #[test] - fn load_gates_invalid_json_returns_empty() { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("persona.json"); - std::fs::write(&path, "not json at all {{{").unwrap(); - let gates = GateEvalState::load_gates_from_file(&path); - assert!(gates.is_empty()); - } - - #[tokio::test] - async fn warm_start_roundtrip() { - let mem = test_memory(); - let gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - - // Create, tick to advance state, persist - let ge = GateEvalState::new("test-agent", vec![gate.clone()], 1, Arc::clone(&mem)); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - ge.tick(&metrics); - ge.persist().await.unwrap(); - - // Write gates file for rebuild - let dir = tempfile::tempdir().unwrap(); - let gates_path = dir.path().join("persona.json"); - std::fs::write( - &gates_path, - serde_json::to_string(&serde_json::json!({"gates": [gate]})).unwrap(), - ) - .unwrap(); - - // Rebuild - let ge2 = GateEvalState::rebuild_from_memory( - Arc::clone(&mem), - "test-agent", - Some(gates_path.as_path()), - 1, - ) - .await - .unwrap(); - - let snap = ge2.phase_state_snapshot().unwrap(); - assert_eq!(snap.current_phase, Some("active".into())); - assert!(snap.state_rev > 0); - assert_eq!(ge2.gate_count(), 1); - } - - #[tokio::test] - async fn warm_start_empty_memory() { - let mem = test_memory(); - let ge = GateEvalState::rebuild_from_memory(Arc::clone(&mem), "test-agent", None, 60) - .await - .unwrap(); - let snap = ge.phase_state_snapshot().unwrap(); - assert!(snap.current_phase.is_none()); - assert_eq!(snap.state_rev, 0); - assert_eq!(ge.gate_count(), 0); - } - - #[test] - fn demote_priority_over_promote() { - let mem = test_memory(); - let promote = make_promote_gate( - "promote-g", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - let mut demote = make_promote_gate( - "demote-g", - "sop.deviation_rate", - CriterionOp::Gte, - json!(0.3), - "restricted", - ); - demote.direction = GateDirection::Demote; - demote.from_phase = Some("active".into()); - - let state = PhaseState { - current_phase: Some("active".into()), - ..PhaseState::new("test-agent".into()) - }; - let ge = GateEvalState::with_state(state, vec![promote, demote], 1, mem); - let metrics = MockMetrics::new(vec![ - ("sop.completion_rate", json!(0.95)), - ("sop.deviation_rate", json!(0.5)), - ]); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let record = ge.tick(&metrics).unwrap(); - // Demote should fire first (evaluator sorts demote before promote) - assert_eq!(record.gate_id, "demote-g"); - assert_eq!(record.to_phase, "restricted"); - } - - #[test] - fn idempotent_tick_after_apply() { - let mem = test_memory(); - let gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - let ge = GateEvalState::new("test-agent", vec![gate], 1, mem); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]); - - // First tick — fires - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let first = ge.tick(&metrics); - assert!(first.is_some()); - - // Second tick with same metrics + updated state_rev — should not fire again - // (evaluator idempotency via metrics_hash + state_rev) - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let second = ge.tick(&metrics); - assert!(second.is_none()); - } - - #[test] - fn gate_tick_with_real_collector() { - use crate::sop::metrics::SopMetricsCollector; - use crate::sop::types::{ - SopEvent, SopRun, SopRunStatus, SopStepResult, SopStepStatus, SopTriggerSource, - }; - - let mem = test_memory(); - let collector = SopMetricsCollector::new(); - - // Record a completed run - let run = SopRun { - run_id: "r1".into(), - sop_name: "test-sop".into(), - trigger_event: SopEvent { - source: SopTriggerSource::Manual, - topic: None, - payload: None, - timestamp: "2026-02-19T12:00:00Z".into(), - }, - status: SopRunStatus::Completed, - current_step: 1, - total_steps: 1, - started_at: "2026-02-19T12:00:00Z".into(), - completed_at: Some("2026-02-19T12:05:00Z".into()), - step_results: vec![SopStepResult { - step_number: 1, - status: SopStepStatus::Completed, - output: "done".into(), - started_at: "2026-02-19T12:00:00Z".into(), - completed_at: Some("2026-02-19T12:01:00Z".into()), - }], - waiting_since: None, - }; - collector.record_run_complete(&run); - - let gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - let ge = GateEvalState::new("test-agent", vec![gate], 1, mem); - { - let mut inner = ge.inner.lock().unwrap(); - inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap(); - } - let record = ge.tick(&collector); - assert!(record.is_some()); - assert_eq!(record.unwrap().to_phase, "active"); - } - - #[test] - fn tick_respects_interval() { - let mem = test_memory(); - let gate = make_promote_gate( - "g1", - "sop.completion_rate", - CriterionOp::Gte, - json!(0.8), - "active", - ); - - // Long interval - let ge = GateEvalState::new("test-agent", vec![gate.clone()], 3600, mem.clone()); - let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]); - // last_tick is Instant::now() — not enough elapsed - assert!(ge.tick(&metrics).is_none()); - - // Zero interval = disabled - let ge_disabled = GateEvalState::new("test-agent", vec![gate], 0, mem); - assert!(ge_disabled.tick(&metrics).is_none()); - } - - #[test] - fn ampersona_decision_strings_stable() { - // Canary test: verifies that DefaultGateEvaluator produces the decision - // strings we expect. If ampersona changes them, this test fails. - let state = PhaseState::new("test".into()); - - // Enforce promote → "transition" - let enforce_gate = - make_promote_gate("g-enforce", "m", CriterionOp::Gte, json!(1), "phase-b"); - let metrics = MockMetrics::new(vec![("m", json!(1))]); - let record = DefaultGateEvaluator.evaluate(&[enforce_gate], &state, &metrics); - assert_eq!( - record.as_ref().map(|r| r.decision.as_str()), - Some("transition") - ); - - // Observe promote → "observed" - let mut observe_gate = - make_promote_gate("g-observe", "m", CriterionOp::Gte, json!(1), "phase-b"); - observe_gate.enforcement = GateEnforcement::Observe; - let record = DefaultGateEvaluator.evaluate(&[observe_gate], &state, &metrics); - assert_eq!( - record.as_ref().map(|r| r.decision.as_str()), - Some("observed") - ); - - // RequireApproval promote → "pending_human" - let mut approval_gate = - make_promote_gate("g-approval", "m", CriterionOp::Gte, json!(1), "phase-b"); - approval_gate.approval = GateApproval::Human; - let record = DefaultGateEvaluator.evaluate(&[approval_gate], &state, &metrics); - assert_eq!( - record.as_ref().map(|r| r.decision.as_str()), - Some("pending_human") - ); - } -} diff --git a/src/sop/metrics.rs b/src/sop/metrics.rs index fd8f464dd..64fb5ca80 100644 --- a/src/sop/metrics.rs +++ b/src/sop/metrics.rs @@ -413,34 +413,6 @@ impl Default for SopMetricsCollector { } } -// ── Conditional MetricsProvider impl ─────────────────────────── - -#[cfg(feature = "ampersona-gates")] -impl ampersona_core::traits::MetricsProvider for SopMetricsCollector { - fn get_metric( - &self, - query: &ersona_core::traits::MetricQuery, - ) -> Result { - if self.inner.is_poisoned() { - return Err(ampersona_core::errors::MetricError::ProviderUnavailable); - } - let value = if let Some(ref window) = query.window { - // Window specified by evaluator (from Criterion.window_seconds) - self.get_metric_value_windowed(&query.name, window) - } else { - // No window — use name as-is (may include _7d/_30d suffix or be all-time) - self.get_metric_value(&query.name) - }; - value - .map(|v| ampersona_core::traits::MetricSample { - name: query.name.clone(), - value: v, - sampled_at: Utc::now(), - }) - .ok_or_else(|| ampersona_core::errors::MetricError::NotFound(query.name.clone())) - } -} - // ── Helpers ──────────────────────────────────────────────────── fn build_snapshot(run: &SopRun, human_count: u64, timeout_count: u64) -> RunSnapshot { @@ -637,6 +609,9 @@ mod tests { total_steps: u32, step_results: Vec, ) -> SopRun { + let now = Utc::now(); + let started = (now - chrono::Duration::minutes(5)).to_rfc3339(); + let completed = now.to_rfc3339(); SopRun { run_id: run_id.into(), sop_name: sop_name.into(), @@ -644,10 +619,11 @@ mod tests { status, current_step: total_steps, total_steps, - started_at: "2026-02-19T12:00:00Z".into(), - completed_at: Some("2026-02-19T12:05:00Z".into()), + started_at: started, + completed_at: Some(completed), step_results, waiting_since: None, + llm_calls_saved: 0, } } @@ -1141,43 +1117,6 @@ mod tests { ); } - // ── MetricsProvider impl (ampersona-gates feature) ─────── - - #[cfg(feature = "ampersona-gates")] - #[test] - fn metrics_provider_get_metric() { - use ampersona_core::traits::{MetricQuery, MetricsProvider}; - - let c = SopMetricsCollector::new(); - let run = make_run( - "r1", - "test-sop", - SopRunStatus::Completed, - 1, - vec![make_step(1, SopStepStatus::Completed)], - ); - c.record_run_complete(&run); - - let query = MetricQuery { - name: "sop.runs_completed".into(), - window: None, - }; - let sample = c.get_metric(&query).unwrap(); - assert_eq!(sample.value, json!(1u64)); - assert_eq!(sample.name, "sop.runs_completed"); - - // NotFound for unknown metric - let bad_query = MetricQuery { - name: "sop.nonexistent".into(), - window: None, - }; - let err = c.get_metric(&bad_query).unwrap_err(); - assert!(matches!( - err, - ampersona_core::errors::MetricError::NotFound(_) - )); - } - // ── Warm-start tests ───────────────────────────────────── #[tokio::test] @@ -1245,6 +1184,7 @@ mod tests { completed_at: None, step_results: vec![], waiting_since: None, + llm_calls_saved: 0, }; audit.log_run_start(&run).await.unwrap(); @@ -1342,6 +1282,7 @@ mod tests { completed_at: None, step_results: vec![], waiting_since: None, + llm_calls_saved: 0, }; audit.log_run_start(&running_run).await.unwrap(); audit.log_approval(&running_run, 1).await.unwrap(); @@ -1385,7 +1326,7 @@ mod tests { assert_eq!(hic_7d, 1); } - // ── Windowed MetricsProvider tests (ampersona-gates feature) ── + // ── Windowed MetricsProvider tests ── #[test] fn get_metric_windowed_7d_matches_suffix() { @@ -1461,32 +1402,4 @@ mod tests { .unwrap(); assert_eq!(val, 2); } - - #[cfg(feature = "ampersona-gates")] - #[test] - fn get_metric_provider_window_propagation() { - use ampersona_core::traits::{MetricQuery, MetricsProvider}; - - let c = SopMetricsCollector::new(); - let run = make_run( - "r1", - "test-sop", - SopRunStatus::Completed, - 1, - vec![make_step(1, SopStepStatus::Completed)], - ); - c.record_run_complete(&run); - - // Query with window via MetricsProvider trait - let query = MetricQuery { - name: "sop.runs_completed".into(), - window: Some(std::time::Duration::from_secs(7 * 86400)), - }; - let sample = c.get_metric(&query).unwrap(); - assert_eq!(sample.value, json!(1u64)); - - // Same result as suffix-based query - let suffix_val = c.get_metric_value("sop.runs_completed_7d"); - assert_eq!(Some(sample.value), suffix_val); - } } diff --git a/src/sop/mod.rs b/src/sop/mod.rs index be9f45d54..eba59dcbf 100644 --- a/src/sop/mod.rs +++ b/src/sop/mod.rs @@ -2,20 +2,17 @@ pub mod audit; pub mod condition; pub mod dispatch; pub mod engine; -#[cfg(feature = "ampersona-gates")] -pub mod gates; pub mod metrics; pub mod types; pub use audit::SopAuditLogger; pub use engine::SopEngine; -#[cfg(feature = "ampersona-gates")] -pub use gates::GateEvalState; pub use metrics::SopMetricsCollector; #[allow(unused_imports)] pub use types::{ - Sop, SopEvent, SopExecutionMode, SopPriority, SopRun, SopRunAction, SopRunStatus, SopStep, - SopStepResult, SopStepStatus, SopTrigger, SopTriggerSource, + DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority, + SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus, + SopTrigger, SopTriggerSource, StepSchema, }; use anyhow::Result; @@ -24,6 +21,19 @@ use tracing::warn; use types::{SopManifest, SopMeta}; +/// Parse an execution mode string into `SopExecutionMode`, falling back to +/// `Supervised` for unknown values. +pub fn parse_execution_mode(s: &str) -> SopExecutionMode { + match s.trim().to_lowercase().as_str() { + "auto" => SopExecutionMode::Auto, + "step_by_step" => SopExecutionMode::StepByStep, + "priority_based" => SopExecutionMode::PriorityBased, + "deterministic" => SopExecutionMode::Deterministic, + // "supervised" and any unknown value + _ => SopExecutionMode::Supervised, + } +} + // ── SOP directory helpers ─────────────────────────────────────── /// Return the default SOPs directory: `/sops`. @@ -112,19 +122,28 @@ fn load_sop(sop_dir: &Path, default_execution_mode: SopExecutionMode) -> Result< execution_mode, cooldown_secs, max_concurrent, + deterministic, } = manifest.sop; + // When deterministic=true, override execution_mode to Deterministic + let effective_mode = if deterministic { + SopExecutionMode::Deterministic + } else { + execution_mode.unwrap_or(default_execution_mode) + }; + Ok(Sop { name, description, version, priority, - execution_mode: execution_mode.unwrap_or(default_execution_mode), + execution_mode: effective_mode, triggers: manifest.triggers, steps, cooldown_secs, max_concurrent, location: Some(sop_dir.to_path_buf()), + deterministic, }) } @@ -143,6 +162,7 @@ pub fn parse_steps(md: &str) -> Vec { let mut current_body = String::new(); let mut current_tools: Vec = Vec::new(); let mut current_requires_confirmation = false; + let mut current_kind = SopStepKind::Execute; for line in md.lines() { let trimmed = line.trim(); @@ -164,6 +184,7 @@ pub fn parse_steps(md: &str) -> Vec { &mut current_body, &mut current_tools, &mut current_requires_confirmation, + &mut current_kind, ); in_steps_section = false; } @@ -184,6 +205,7 @@ pub fn parse_steps(md: &str) -> Vec { &mut current_body, &mut current_tools, &mut current_requires_confirmation, + &mut current_kind, ); let step_num = u32::try_from(steps.len()) @@ -217,6 +239,15 @@ pub fn parse_steps(md: &str) -> Vec { if let Some(val) = bullet.strip_prefix("requires_confirmation:") { current_requires_confirmation = val.trim().eq_ignore_ascii_case("true"); } + } else if bullet.starts_with("kind:") { + if let Some(val) = bullet.strip_prefix("kind:") { + let val = val.trim(); + if val.eq_ignore_ascii_case("checkpoint") { + current_kind = SopStepKind::Checkpoint; + } else { + current_kind = SopStepKind::Execute; + } + } } else { // Continuation body line if !current_body.is_empty() { @@ -244,6 +275,7 @@ pub fn parse_steps(md: &str) -> Vec { &mut current_body, &mut current_tools, &mut current_requires_confirmation, + &mut current_kind, ); steps @@ -257,6 +289,7 @@ fn flush_step( body: &mut String, tools: &mut Vec, requires_confirmation: &mut bool, + kind: &mut SopStepKind, ) { if let Some(n) = number.take() { steps.push(SopStep { @@ -265,9 +298,12 @@ fn flush_step( body: body.trim().to_string(), suggested_tools: std::mem::take(tools), requires_confirmation: *requires_confirmation, + kind: *kind, + schema: None, }); *body = String::new(); *requires_confirmation = false; + *kind = SopStepKind::Execute; } } @@ -349,7 +385,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi let sops = load_sops( &config.workspace_dir, sops_dir_override, - config.sop.default_execution_mode, + parse_execution_mode(&config.sop.default_execution_mode), ); if sops.is_empty() { println!("No SOPs found."); @@ -393,7 +429,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi let sops = load_sops( &config.workspace_dir, sops_dir_override, - config.sop.default_execution_mode, + parse_execution_mode(&config.sop.default_execution_mode), ); let matching: Vec<&Sop> = if let Some(ref name) = name { sops.iter().filter(|s| s.name == *name).collect() @@ -443,7 +479,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi let sops = load_sops( &config.workspace_dir, sops_dir_override, - config.sop.default_execution_mode, + parse_execution_mode(&config.sop.default_execution_mode), ); let sop = sops .iter() @@ -474,16 +510,23 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi if !sop.steps.is_empty() { println!("Steps:"); for step in &sop.steps { - let confirm_tag = if step.requires_confirmation { - " [requires confirmation]" + let mut tags = Vec::new(); + if step.requires_confirmation { + tags.push("requires confirmation"); + } + if step.kind == SopStepKind::Checkpoint { + tags.push("checkpoint"); + } + let tag_str = if tags.is_empty() { + String::new() } else { - "" + format!(" [{}]", tags.join(", ")) }; println!( " {}. {}{}", step.number, console::style(&step.title).bold(), - confirm_tag + tag_str ); if !step.body.is_empty() { for line in step.body.lines() { @@ -705,6 +748,7 @@ type = "manual" cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, }; let warnings = validate_sop(&sop); @@ -729,10 +773,13 @@ type = "manual" body: "Do the thing".into(), suggested_tools: vec!["shell".into()], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }], cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, }; let warnings = validate_sop(&sop); @@ -813,4 +860,75 @@ type = "manual" )); assert!(matches!(manifest.triggers[4], SopTrigger::Manual)); } + + #[test] + fn deterministic_flag_overrides_execution_mode() { + let dir = tempfile::tempdir().unwrap(); + let sop_dir = dir.path().join("det-sop"); + fs::create_dir_all(&sop_dir).unwrap(); + + fs::write( + sop_dir.join("SOP.toml"), + r#" +[sop] +name = "det-sop" +description = "A deterministic SOP" +deterministic = true + +[[triggers]] +type = "manual" +"#, + ) + .unwrap(); + + fs::write( + sop_dir.join("SOP.md"), + r#"# Det SOP + +## Steps + +1. **Step one** — First step. + - kind: execute + +2. **Checkpoint** — Pause for approval. + - kind: checkpoint + +3. **Step three** — Final step. +"#, + ) + .unwrap(); + + let sops = load_sops_from_directory(dir.path(), SopExecutionMode::Supervised); + assert_eq!(sops.len(), 1); + + let sop = &sops[0]; + assert_eq!(sop.name, "det-sop"); + assert_eq!(sop.execution_mode, SopExecutionMode::Deterministic); + assert!(sop.deterministic); + assert_eq!(sop.steps.len(), 3); + assert_eq!(sop.steps[0].kind, SopStepKind::Execute); + assert_eq!(sop.steps[1].kind, SopStepKind::Checkpoint); + assert_eq!(sop.steps[2].kind, SopStepKind::Execute); + } + + #[test] + fn parse_steps_with_checkpoint_kind() { + let md = r#"## Steps + +1. **Read data** — Read from sensor. + - tools: gpio_read + - kind: execute + +2. **Review** — Human review checkpoint. + - kind: checkpoint + +3. **Apply** — Apply changes. +"#; + let steps = parse_steps(md); + assert_eq!(steps.len(), 3); + assert_eq!(steps[0].kind, SopStepKind::Execute); + assert_eq!(steps[1].kind, SopStepKind::Checkpoint); + // Default kind should be Execute + assert_eq!(steps[2].kind, SopStepKind::Execute); + } } diff --git a/src/sop/types.rs b/src/sop/types.rs index 0c995e04b..21eba034a 100644 --- a/src/sop/types.rs +++ b/src/sop/types.rs @@ -1,5 +1,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fmt; use std::path::PathBuf; @@ -42,6 +43,10 @@ pub enum SopExecutionMode { StepByStep, /// Critical/High → Auto, Normal/Low → Supervised. PriorityBased, + /// Execute steps sequentially without LLM round-trips. + /// Step outputs are piped as inputs to the next step. + /// Checkpoint steps pause for human approval. + Deterministic, } impl fmt::Display for SopExecutionMode { @@ -51,6 +56,7 @@ impl fmt::Display for SopExecutionMode { Self::Supervised => write!(f, "supervised"), Self::StepByStep => write!(f, "step_by_step"), Self::PriorityBased => write!(f, "priority_based"), + Self::Deterministic => write!(f, "deterministic"), } } } @@ -93,6 +99,44 @@ impl fmt::Display for SopTrigger { } } +// ── Step kind ──────────────────────────────────────────────────── + +/// The kind of a workflow step. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SopStepKind { + /// Normal step — executed by the agent (or deterministic handler). + #[default] + Execute, + /// Checkpoint step — pauses execution and waits for human approval. + Checkpoint, +} + +impl fmt::Display for SopStepKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Execute => write!(f, "execute"), + Self::Checkpoint => write!(f, "checkpoint"), + } + } +} + +// ── Typed step parameters ──────────────────────────────────────── + +/// JSON Schema fragment for validating step input/output data. +/// +/// Stored as a raw `serde_json::Value` so callers can validate without +/// pulling in a full JSON Schema library. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StepSchema { + /// JSON Schema object describing expected input shape. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub input: Option, + /// JSON Schema object describing expected output shape. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub output: Option, +} + // ── Step ──────────────────────────────────────────────────────── /// A single step in an SOP procedure, parsed from SOP.md. @@ -105,6 +149,12 @@ pub struct SopStep { pub suggested_tools: Vec, #[serde(default)] pub requires_confirmation: bool, + /// Step kind: `execute` (default) or `checkpoint`. + #[serde(default)] + pub kind: SopStepKind, + /// Typed input/output schemas for deterministic data flow validation. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub schema: Option, } // ── SOP ───────────────────────────────────────────────────────── @@ -125,6 +175,10 @@ pub struct Sop { pub max_concurrent: u32, #[serde(skip)] pub location: Option, + /// When true, sets execution_mode to Deterministic. + /// Steps execute sequentially without LLM round-trips. + #[serde(default)] + pub deterministic: bool, } fn default_cooldown_secs() -> u64 { @@ -160,6 +214,9 @@ pub(crate) struct SopMeta { pub cooldown_secs: u64, #[serde(default = "default_max_concurrent")] pub max_concurrent: u32, + /// Opt-in deterministic execution (no LLM round-trips between steps). + #[serde(default)] + pub deterministic: bool, } fn default_sop_version() -> String { @@ -214,6 +271,8 @@ pub enum SopRunStatus { Pending, Running, WaitingApproval, + /// Paused at a checkpoint in a deterministic workflow. + PausedCheckpoint, Completed, Failed, Cancelled, @@ -225,6 +284,7 @@ impl fmt::Display for SopRunStatus { Self::Pending => write!(f, "pending"), Self::Running => write!(f, "running"), Self::WaitingApproval => write!(f, "waiting_approval"), + Self::PausedCheckpoint => write!(f, "paused_checkpoint"), Self::Completed => write!(f, "completed"), Self::Failed => write!(f, "failed"), Self::Cancelled => write!(f, "cancelled"), @@ -276,6 +336,44 @@ pub struct SopRun { /// ISO-8601 timestamp when the run entered WaitingApproval (for timeout tracking). #[serde(default)] pub waiting_since: Option, + /// Number of LLM calls saved by deterministic execution in this run. + #[serde(default)] + pub llm_calls_saved: u64, +} + +// ── Deterministic workflow state (persistence + resume) ────────── + +/// Persisted state for a deterministic workflow run, enabling resume +/// after interruption. Serialized to a JSON file alongside the SOP. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeterministicRunState { + /// Identifier of this run. + pub run_id: String, + /// SOP name this state belongs to. + pub sop_name: String, + /// Last successfully completed step number (0 = none completed). + pub last_completed_step: u32, + /// Total steps in the workflow. + pub total_steps: u32, + /// Output of each completed step, keyed by step number. + pub step_outputs: HashMap, + /// ISO-8601 timestamp when this state was last persisted. + pub persisted_at: String, + /// Number of LLM calls that were saved by deterministic execution. + pub llm_calls_saved: u64, + /// Whether the run is paused at a checkpoint awaiting approval. + pub paused_at_checkpoint: bool, +} + +// ── Cost savings metric ────────────────────────────────────────── + +/// Tracks how many LLM round-trips were saved by deterministic execution. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct DeterministicSavings { + /// Total LLM calls saved across all deterministic runs. + pub total_llm_calls_saved: u64, + /// Total deterministic runs completed. + pub total_runs: u64, } /// What the engine instructs the caller to do next after a state transition. @@ -293,6 +391,20 @@ pub enum SopRunAction { step: SopStep, context: String, }, + /// Execute a step deterministically (no LLM). The `input` is the piped + /// output from the previous step (or trigger payload for step 1). + DeterministicStep { + run_id: String, + step: SopStep, + input: serde_json::Value, + }, + /// Deterministic workflow hit a checkpoint — pause for human approval. + /// Workflow state has been persisted so it can resume after approval. + CheckpointWait { + run_id: String, + step: SopStep, + state_file: PathBuf, + }, /// The SOP run completed successfully. Completed { run_id: String, sop_name: String }, /// The SOP run failed. @@ -378,6 +490,62 @@ condition = "$.value > 85" ); } + #[test] + fn step_kind_display() { + assert_eq!(SopStepKind::Execute.to_string(), "execute"); + assert_eq!(SopStepKind::Checkpoint.to_string(), "checkpoint"); + } + + #[test] + fn step_kind_serde_roundtrip() { + let json = serde_json::to_string(&SopStepKind::Checkpoint).unwrap(); + assert_eq!(json, "\"checkpoint\""); + let parsed: SopStepKind = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, SopStepKind::Checkpoint); + } + + #[test] + fn execution_mode_deterministic_roundtrip() { + let json = serde_json::to_string(&SopExecutionMode::Deterministic).unwrap(); + assert_eq!(json, "\"deterministic\""); + let parsed: SopExecutionMode = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, SopExecutionMode::Deterministic); + } + + #[test] + fn deterministic_run_state_serde() { + let state = DeterministicRunState { + run_id: "det-001".into(), + sop_name: "test-sop".into(), + last_completed_step: 2, + total_steps: 5, + step_outputs: { + let mut m = std::collections::HashMap::new(); + m.insert(1, serde_json::json!({"result": "ok"})); + m.insert(2, serde_json::json!("step2_done")); + m + }, + persisted_at: "2026-03-01T00:00:00Z".into(), + llm_calls_saved: 2, + paused_at_checkpoint: true, + }; + let json = serde_json::to_string(&state).unwrap(); + let parsed: DeterministicRunState = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.run_id, "det-001"); + assert_eq!(parsed.last_completed_step, 2); + assert_eq!(parsed.llm_calls_saved, 2); + assert!(parsed.paused_at_checkpoint); + assert_eq!(parsed.step_outputs.len(), 2); + } + + #[test] + fn run_status_paused_checkpoint_display() { + assert_eq!( + SopRunStatus::PausedCheckpoint.to_string(), + "paused_checkpoint" + ); + } + #[test] fn step_defaults() { let step: SopStep = @@ -459,6 +627,7 @@ path = "/sop/test" completed_at: Some("2026-02-19T12:00:05Z".into()), }], waiting_since: None, + llm_calls_saved: 0, }; let json = serde_json::to_string(&run).unwrap(); let parsed: SopRun = serde_json::from_str(&json).unwrap(); diff --git a/src/tools/mod.rs b/src/tools/mod.rs index d0664c363..a74f05ed6 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -85,6 +85,11 @@ pub mod sessions; pub mod shell; pub mod skill_http; pub mod skill_tool; +pub mod sop_advance; +pub mod sop_approve; +pub mod sop_execute; +pub mod sop_list; +pub mod sop_status; pub mod swarm; pub mod text_browser; pub mod tool_search; @@ -166,6 +171,11 @@ pub use shell::ShellTool; pub use skill_http::SkillHttpTool; #[allow(unused_imports)] pub use skill_tool::SkillShellTool; +pub use sop_advance::SopAdvanceTool; +pub use sop_approve::SopApproveTool; +pub use sop_execute::SopExecuteTool; +pub use sop_list::SopListTool; +pub use sop_status::SopStatusTool; pub use swarm::SwarmTool; pub use text_browser::TextBrowserTool; pub use tool_search::ToolSearchTool; @@ -702,6 +712,18 @@ pub fn all_tools_with_runtime( Arc::clone(&channel_map_handle), ))); + // SOP tools (registered when sops_dir is configured) + if root_config.sop.sops_dir.is_some() { + let sop_engine = Arc::new(std::sync::Mutex::new(crate::sop::SopEngine::new( + root_config.sop.clone(), + ))); + tool_arcs.push(Arc::new(SopListTool::new(Arc::clone(&sop_engine)))); + tool_arcs.push(Arc::new(SopExecuteTool::new(Arc::clone(&sop_engine)))); + tool_arcs.push(Arc::new(SopAdvanceTool::new(Arc::clone(&sop_engine)))); + tool_arcs.push(Arc::new(SopApproveTool::new(Arc::clone(&sop_engine)))); + tool_arcs.push(Arc::new(SopStatusTool::new(Arc::clone(&sop_engine)))); + } + if let Some(key) = composio_key { if !key.is_empty() { tool_arcs.push(Arc::new(ComposioTool::new( diff --git a/src/tools/sop_advance.rs b/src/tools/sop_advance.rs index e31b75d65..249bbffc5 100644 --- a/src/tools/sop_advance.rs +++ b/src/tools/sop_advance.rs @@ -181,6 +181,18 @@ impl Tool for SopAdvanceTool { } => { format!("SOP '{sop_name}' run {run_id} failed: {reason}") } + SopRunAction::DeterministicStep { run_id, step, .. } => { + format!( + "Step recorded. Next deterministic step for run {run_id}: {}", + step.title + ) + } + SopRunAction::CheckpointWait { run_id, step, .. } => { + format!( + "Step recorded. Run {run_id} paused at checkpoint: {}", + step.title + ) + } }; Ok(ToolResult { success: true, @@ -222,6 +234,8 @@ mod tests { body: "Do step one".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }, SopStep { number: 2, @@ -229,11 +243,14 @@ mod tests { body: "Do step two".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }, ], cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, } } diff --git a/src/tools/sop_approve.rs b/src/tools/sop_approve.rs index 204831241..e407935e9 100644 --- a/src/tools/sop_approve.rs +++ b/src/tools/sop_approve.rs @@ -143,10 +143,13 @@ mod tests { body: "Do it".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }], cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, } } diff --git a/src/tools/sop_execute.rs b/src/tools/sop_execute.rs index 5d4235af6..328e6d2af 100644 --- a/src/tools/sop_execute.rs +++ b/src/tools/sop_execute.rs @@ -118,6 +118,18 @@ impl Tool for SopExecuteTool { SopRunAction::Failed { run_id, reason, .. } => { format!("SOP run {run_id} failed: {reason}") } + SopRunAction::DeterministicStep { run_id, step, .. } => { + format!( + "SOP run started (deterministic): {run_id}\nFirst step: {}", + step.title + ) + } + SopRunAction::CheckpointWait { run_id, step, .. } => { + format!( + "SOP run started: {run_id} (paused at checkpoint: {})", + step.title + ) + } }; Ok(ToolResult { success: true, @@ -140,7 +152,9 @@ fn action_run_id(action: &SopRunAction) -> Option<&str> { SopRunAction::ExecuteStep { run_id, .. } | SopRunAction::WaitApproval { run_id, .. } | SopRunAction::Completed { run_id, .. } - | SopRunAction::Failed { run_id, .. } => Some(run_id), + | SopRunAction::Failed { run_id, .. } + | SopRunAction::DeterministicStep { run_id, .. } + | SopRunAction::CheckpointWait { run_id, .. } => Some(run_id), } } @@ -168,6 +182,8 @@ mod tests { body: "Do step one".into(), suggested_tools: vec!["shell".into()], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }, SopStep { number: 2, @@ -175,11 +191,14 @@ mod tests { body: "Do step two".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }, ], cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, } } diff --git a/src/tools/sop_list.rs b/src/tools/sop_list.rs index 048ac4d24..a04ff6485 100644 --- a/src/tools/sop_list.rs +++ b/src/tools/sop_list.rs @@ -137,10 +137,13 @@ mod tests { body: "Do it".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }], cooldown_secs: 0, max_concurrent: 1, location: None, + deterministic: false, } } diff --git a/src/tools/sop_status.rs b/src/tools/sop_status.rs index cf0255325..3b897c7f0 100644 --- a/src/tools/sop_status.rs +++ b/src/tools/sop_status.rs @@ -11,8 +11,6 @@ use crate::sop::{SopEngine, SopMetricsCollector}; pub struct SopStatusTool { engine: Arc>, collector: Option>, - #[cfg(feature = "ampersona-gates")] - gate_eval: Option>, } impl SopStatusTool { @@ -20,8 +18,6 @@ impl SopStatusTool { Self { engine, collector: None, - #[cfg(feature = "ampersona-gates")] - gate_eval: None, } } @@ -30,61 +26,11 @@ impl SopStatusTool { self } - #[cfg(feature = "ampersona-gates")] - pub fn with_gate_eval(mut self, gate_eval: Arc) -> Self { - self.gate_eval = Some(gate_eval); - self - } - fn append_gate_status(&self, output: &mut String, include_gate_status: bool) { - #[cfg(feature = "ampersona-gates")] - if include_gate_status { - if let Some(ref ge) = self.gate_eval { - if let Some(snap) = ge.phase_state_snapshot() { - let _ = writeln!(output, "\nGate Status:"); - let _ = writeln!( - output, - " current_phase: {}", - snap.current_phase.as_deref().unwrap_or("(none)") - ); - let _ = writeln!(output, " state_rev: {}", snap.state_rev); - let _ = writeln!(output, " gates_loaded: {}", ge.gate_count()); - if let Some(ref tr) = snap.last_transition { - let _ = writeln!( - output, - " last_transition: {} ({} → {})", - tr.at.to_rfc3339(), - tr.from_phase.as_deref().unwrap_or("(none)"), - tr.to_phase, - ); - } else { - let _ = writeln!(output, " last_transition: none"); - } - if let Some(ref pt) = snap.pending_transition { - let _ = writeln!( - output, - " pending_transition: {} → {} ({})", - pt.from_phase.as_deref().unwrap_or("(none)"), - pt.to_phase, - pt.decision, - ); - } else { - let _ = writeln!(output, " pending_transition: none"); - } - } - } else { - let _ = writeln!( - output, - "\nGate Status: not available (gate eval not configured)" - ); - } - } - - #[cfg(not(feature = "ampersona-gates"))] if include_gate_status { let _ = writeln!( output, - "\nGate Status: not available (ampersona-gates feature not enabled)" + "\nGate Status: not available (gate evaluation not supported)" ); } } @@ -309,10 +255,13 @@ mod tests { body: "Do it".into(), suggested_tools: vec![], requires_confirmation: false, + kind: SopStepKind::default(), + schema: None, }], cooldown_secs: 0, max_concurrent: 2, location: None, + deterministic: false, } } @@ -431,6 +380,7 @@ mod tests { completed_at: Some("2026-02-19T12:01:00Z".into()), }], waiting_since: None, + llm_calls_saved: 0, }; collector.record_run_complete(&run); @@ -466,6 +416,7 @@ mod tests { completed_at: Some("2026-02-19T12:01:00Z".into()), }], waiting_since: None, + llm_calls_saved: 0, }; collector.record_run_complete(&run); diff --git a/web/dist/.gitkeep b/web/dist/.gitkeep new file mode 100644 index 000000000..e69de29bb