Compare commits

6 Commits

Author SHA1 Message Date
argenis de la rosa
18cfb4e2fe fix(config): restore max_system_prompt_chars field removed by merge conflict
The PR branch was based on an older master and the merge incorrectly
removed the max_system_prompt_chars field from AgentConfig, which was
added by PR #4185. This restores the field, its default function, and
the Default impl entry to prevent a build break on merge.
2026-03-21 20:00:20 -04:00
rareba
ccabfd7167 fix(sop): wire SOP tools into tool registry
The five SOP tools (sop_list, sop_advance, sop_execute, sop_approve,
sop_status) existed as source files but were never registered in
all_tools_with_runtime. They are now conditionally registered when
sop.sops_dir is configured.

Also fixes:
- Add mod sop + SopCommands re-export to main.rs (binary crate)
- Handle new DeterministicStep/CheckpointWait variants in match arms
- Add missing struct fields (deterministic, kind, schema, llm_calls_saved)
  to test constructors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 21:06:49 +01:00
Giulio V
b91844966d style(sop): fix formatting in metrics.rs
2026-03-21 18:55:02 +01:00
Giulio V
e46a19c61c fix(sop): remove non-functional ampersona-gates feature flag
The ampersona-gates feature flag referenced ampersona_core and
ampersona_engine crates that do not exist, causing cargo check
--all-features to fail. Remove the feature flag and all gated code:

- Remove ampersona-gates from Cargo.toml [features]
- Delete src/sop/gates.rs (entire module behind cfg gate)
- Remove gated methods from audit.rs (log_gate_decision, log_phase_state)
- Remove gated MetricsProvider impl and tests from metrics.rs
- Simplify sop_status.rs gate_eval field and append_gate_status
- Update observability docs (EN + zh-CN)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 18:39:56 +01:00
Giulio V
e8c61522ea fix(sop): resolve clippy warnings and fix metrics test failures
- Add `ampersona-gates = []` feature declaration to Cargo.toml to fix
  clippy `unexpected cfg condition value` errors in sop/mod.rs,
  sop/audit.rs, and sop/metrics.rs.
- Use dynamic Utc::now() timestamps in metrics test helper `make_run()`
  instead of hardcoded 2026-02-19 dates, which had drifted outside the
  7-day/30-day metric windows, causing 7 test failures.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 18:08:07 +01:00
Giulio V
a12716a065 feat(sop): add deterministic execution mode with typed steps and approval checkpoints
Add opt-in deterministic execution to the SOP workflow engine, inspired
by OpenClaw's Lobster engine. Deterministic mode bypasses LLM round-trips
for step transitions, executing steps sequentially with piped outputs.

Key additions:
- SopExecutionMode::Deterministic variant and `deterministic: true` SOP.toml flag
- SopStepKind enum (Execute/Checkpoint) for marking approval pause points
- StepSchema for typed input/output validation (JSON Schema fragments)
- DeterministicRunState for persisting/resuming interrupted workflows
- DeterministicSavings for tracking LLM calls saved
- SopRunAction::DeterministicStep and CheckpointWait action variants
- SopRunStatus::PausedCheckpoint status
- Engine methods: start_deterministic_run, advance_deterministic_step,
  resume_deterministic_run, persist/load_deterministic_state
- SopConfig in config/schema.rs with sops_dir, default_execution_mode,
  max_concurrent_total, approval_timeout_secs, max_finished_runs
- Wire `pub mod sop` in lib.rs (previously dead/uncompiled module)
- Fix pre-existing test issues: TempDir import, async test annotations

All 86 SOP core tests pass (engine: 42, mod: 17, dispatch: 13, types: 14).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 16:06:09 +01:00
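The deterministic flow described in the feature commit (sequential steps, each output piped into the next step, a pause at every checkpoint) can be pictured with a small self-contained sketch. Everything in it is hypothetical and simplified: `StepKind`, `Action`, and `Run` are illustrative stand-ins rather than the PR's `SopStepKind`, `SopRunAction`, or `SopEngine`, outputs are plain strings instead of `serde_json::Value`, and no state is persisted to disk.

```rust
// Illustrative sketch only: simplified stand-ins, not the PR's types.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StepKind {
    Execute,
    Checkpoint,
}

#[derive(Debug, PartialEq, Eq)]
enum Action {
    /// Run step `number`, feeding it the previous step's output.
    Step { number: usize, input: Option<String> },
    /// Pause at step `number` until an operator lets the run continue.
    CheckpointWait { number: usize },
    /// All steps are done.
    Completed { llm_calls_saved: u32 },
}

struct Run {
    steps: Vec<StepKind>,
    current: usize, // 1-based index of the step being worked on
    last_output: Option<String>,
    llm_calls_saved: u32,
}

impl Run {
    /// Start a run; returns `None` when there are no steps (the PR bails here).
    fn start(steps: Vec<StepKind>) -> Option<(Run, Action)> {
        if steps.is_empty() {
            return None;
        }
        let run = Run { steps, current: 1, last_output: None, llm_calls_saved: 0 };
        let first = run.action_for_current();
        Some((run, first))
    }

    /// Decide what the caller should do for the current step.
    fn action_for_current(&self) -> Action {
        match self.steps[self.current - 1] {
            StepKind::Checkpoint => Action::CheckpointWait { number: self.current },
            StepKind::Execute => Action::Step {
                number: self.current,
                input: self.last_output.clone(),
            },
        }
    }

    /// Record the current step's output and move on. Each advanced step counts
    /// as one saved LLM round-trip. Callers should stop after `Completed`.
    fn advance(&mut self, output: String) -> Action {
        self.llm_calls_saved += 1;
        self.last_output = Some(output);
        if self.current == self.steps.len() {
            return Action::Completed { llm_calls_saved: self.llm_calls_saved };
        }
        self.current += 1;
        self.action_for_current()
    }
}

fn main() {
    let steps = vec![StepKind::Execute, StepKind::Checkpoint, StepKind::Execute];
    let (mut run, first) = Run::start(steps).expect("at least one step");
    println!("{first:?}");
    println!("{:?}", run.advance("fetched 3 rows".to_string()));
    println!("{:?}", run.advance("approved by operator".to_string()));
    println!("{:?}", run.advance("report written".to_string()));
}
```

In this sketch a checkpoint is released by calling `advance` with an approval note as its output; how the real engine wires approval to `resume_deterministic_run` is not shown here.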
20 changed files with 704 additions and 977 deletions

View File

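@ -12,8 +12,6 @@ SOP audit entries are persisted via `SopAuditLogger` to the configured memory backend's `so
- `sop_step_{run_id}_{step_number}`: per-step result
- `sop_approval_{run_id}_{step_number}`: operator approval record
- `sop_timeout_approve_{run_id}_{step_number}`: timeout auto-approval record
- `sop_gate_decision_{gate_id}_{timestamp_ms}`: gate evaluator decision record (when `ampersona-gates` is enabled)
- `sop_phase_state`: persisted trust-phase state snapshot (when `ampersona-gates` is enabled)
## 2. Inspection Paths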

View File

@ -12,8 +12,6 @@ Common key patterns:
- `sop_step_{run_id}_{step_number}`: per-step result
- `sop_approval_{run_id}_{step_number}`: operator approval record
- `sop_timeout_approve_{run_id}_{step_number}`: timeout auto-approval record
- `sop_gate_decision_{gate_id}_{timestamp_ms}`: gate evaluator decision record (when `ampersona-gates` is enabled)
- `sop_phase_state`: persisted trust-phase state snapshot (when `ampersona-gates` is enabled)
## 2. Inspection Paths

View File

@ -25,11 +25,11 @@ pub use schema::{
ProjectIntelConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig,
ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig,
SchedulerConfig, SecretsConfig, SecurityConfig, SecurityOpsConfig, SkillCreationConfig,
SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig,
StorageProviderSection, StreamMode, SwarmConfig, SwarmStrategy, TelegramConfig,
TextBrowserConfig, ToolFilterGroup, ToolFilterGroupMode, TranscriptionConfig, TtsConfig,
TunnelConfig, VerifiableIntentConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
WhatsAppChatPolicy, WhatsAppWebMode, WorkspaceConfig, DEFAULT_GWS_SERVICES,
SkillsConfig, SkillsPromptInjectionMode, SlackConfig, SopConfig, StorageConfig,
StorageProviderConfig, StorageProviderSection, StreamMode, SwarmConfig, SwarmStrategy,
TelegramConfig, TextBrowserConfig, ToolFilterGroup, ToolFilterGroupMode, TranscriptionConfig,
TtsConfig, TunnelConfig, VerifiableIntentConfig, WebFetchConfig, WebSearchConfig,
WebhookConfig, WhatsAppChatPolicy, WhatsAppWebMode, WorkspaceConfig, DEFAULT_GWS_SERVICES,
};
pub fn name_and_presence<T: traits::ChannelConfig>(channel: Option<&T>) -> (&'static str, bool) {

View File

@ -379,6 +379,10 @@ pub struct Config {
/// Claude Code tool configuration (`[claude_code]`).
#[serde(default)]
pub claude_code: ClaudeCodeConfig,
/// Standard Operating Procedures engine configuration (`[sop]`).
#[serde(default)]
pub sop: SopConfig,
}
/// Multi-client workspace isolation configuration.
@ -1248,6 +1252,12 @@ pub struct AgentConfig {
/// Default: `[]` (no filtering — all tools included).
#[serde(default)]
pub tool_filter_groups: Vec<ToolFilterGroup>,
/// Maximum characters for the assembled system prompt. When `> 0`, the prompt
/// is truncated to this limit after assembly (keeping the top portion which
/// contains identity and safety instructions). `0` means unlimited.
/// Useful for small-context models (e.g. glm-4.5-air ~8K tokens → set to 8000).
#[serde(default = "default_max_system_prompt_chars")]
pub max_system_prompt_chars: usize,
}
fn default_agent_max_tool_iterations() -> usize {
@ -1266,6 +1276,10 @@ fn default_agent_tool_dispatcher() -> String {
"auto".into()
}
fn default_max_system_prompt_chars() -> usize {
0
}
impl Default for AgentConfig {
fn default() -> Self {
Self {
@ -1277,6 +1291,7 @@ impl Default for AgentConfig {
tool_dispatcher: default_agent_tool_dispatcher(),
tool_call_dedup_exempt: Vec::new(),
tool_filter_groups: Vec::new(),
max_system_prompt_chars: default_max_system_prompt_chars(),
}
}
}
@ -6815,6 +6830,7 @@ impl Default for Config {
locale: None,
verifiable_intent: VerifiableIntentConfig::default(),
claude_code: ClaudeCodeConfig::default(),
sop: SopConfig::default(),
}
}
}
@ -9267,6 +9283,70 @@ async fn sync_directory(path: &Path) -> Result<()> {
}
}
// ── SOP engine configuration ───────────────────────────────────
/// Standard Operating Procedures engine configuration (`[sop]`).
///
/// The `default_execution_mode` field is stored as a plain string and parsed
/// into `SopExecutionMode` (defined in `sop::types`) by `sop::parse_execution_mode`
/// when SOPs are loaded, so the config module does not depend on `sop`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SopConfig {
/// Directory containing SOP definitions (subdirs with SOP.toml + SOP.md).
/// Falls back to `<workspace>/sops` when omitted.
#[serde(default)]
pub sops_dir: Option<String>,
/// Default execution mode for SOPs that omit `execution_mode`.
/// Values: `auto`, `supervised` (default), `step_by_step`,
/// `priority_based`, `deterministic`.
#[serde(default = "default_sop_execution_mode")]
pub default_execution_mode: String,
/// Maximum total concurrent SOP runs across all SOPs.
#[serde(default = "default_sop_max_concurrent_total")]
pub max_concurrent_total: usize,
/// Approval timeout in seconds. When a run waits for approval longer than
/// this, Critical/High-priority SOPs auto-approve; others stay waiting.
/// Set to 0 to disable timeout.
#[serde(default = "default_sop_approval_timeout_secs")]
pub approval_timeout_secs: u64,
/// Maximum number of finished runs kept in memory for status queries.
/// Oldest runs are evicted when over capacity. 0 = unlimited.
#[serde(default = "default_sop_max_finished_runs")]
pub max_finished_runs: usize,
}
fn default_sop_execution_mode() -> String {
"supervised".to_string()
}
fn default_sop_max_concurrent_total() -> usize {
4
}
fn default_sop_approval_timeout_secs() -> u64 {
300
}
fn default_sop_max_finished_runs() -> usize {
100
}
impl Default for SopConfig {
fn default() -> Self {
Self {
sops_dir: None,
default_execution_mode: default_sop_execution_mode(),
max_concurrent_total: default_sop_max_concurrent_total(),
approval_timeout_secs: default_sop_approval_timeout_secs(),
max_finished_runs: default_sop_max_finished_runs(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -9738,6 +9818,7 @@ default_temperature = 0.7
locale: None,
verifiable_intent: VerifiableIntentConfig::default(),
claude_code: ClaudeCodeConfig::default(),
sop: SopConfig::default(),
};
let toml_str = toml::to_string_pretty(&config).unwrap();
@ -10119,6 +10200,7 @@ default_temperature = 0.7
locale: None,
verifiable_intent: VerifiableIntentConfig::default(),
claude_code: ClaudeCodeConfig::default(),
sop: SopConfig::default(),
};
config.save().await.unwrap();
@ -13667,9 +13749,9 @@ require_otp_to_resume = true
// ── Bootstrap files ─────────────────────────────────────
#[test]
#[tokio::test]
async fn ensure_bootstrap_files_creates_missing_files() {
let tmp = TempDir::new().unwrap();
let tmp = tempfile::TempDir::new().unwrap();
let ws = tmp.path().join("workspace");
tokio::fs::create_dir_all(&ws).await.unwrap();
@ -13683,9 +13765,9 @@ require_otp_to_resume = true
assert!(identity.contains("IDENTITY.md"));
}
#[test]
#[tokio::test]
async fn ensure_bootstrap_files_does_not_overwrite_existing() {
let tmp = TempDir::new().unwrap();
let tmp = tempfile::TempDir::new().unwrap();
let ws = tmp.path().join("workspace");
tokio::fs::create_dir_all(&ws).await.unwrap();
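Because `SopConfig::default_execution_mode` is a `String` while the engine works with `SopExecutionMode`, the value has to be converted when SOPs are loaded (the engine diff calls `super::parse_execution_mode` for this). A minimal sketch of such a conversion follows. The enum and function are hypothetical stand-ins, and both the case-insensitive matching and the fallback to `Supervised` for unknown values are assumptions based on the documented default, not confirmed behavior of the real helper.

```rust
// Hypothetical stand-in for the PR's `SopExecutionMode`; the variants follow
// the values listed in the `SopConfig` doc comment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionMode {
    Auto,
    Supervised,
    StepByStep,
    PriorityBased,
    Deterministic,
}

/// Map a config string to a mode.
/// Assumption: unknown values fall back to `Supervised`, the documented default.
fn parse_mode(raw: &str) -> ExecutionMode {
    match raw.trim().to_ascii_lowercase().as_str() {
        "auto" => ExecutionMode::Auto,
        "supervised" => ExecutionMode::Supervised,
        "step_by_step" => ExecutionMode::StepByStep,
        "priority_based" => ExecutionMode::PriorityBased,
        "deterministic" => ExecutionMode::Deterministic,
        // Unrecognized input: use the most conservative documented default.
        _ => ExecutionMode::Supervised,
    }
}

fn main() {
    for raw in ["auto", "Deterministic", "step_by_step", "typo"] {
        println!("{raw:?} -> {:?}", parse_mode(raw));
    }
}
```

Keeping the field as a string leaves the config schema independent of the `sop` module, at the cost of deferring validation until the value is parsed.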

View File

@ -71,6 +71,7 @@ pub mod runtime;
pub(crate) mod security;
pub(crate) mod service;
pub(crate) mod skills;
pub mod sop;
pub mod tools;
pub(crate) mod tunnel;
pub(crate) mod util;
@ -561,3 +562,20 @@ Examples:
/// Flash ZeroClaw firmware to Nucleo-F401RE (builds + probe-rs run)
FlashNucleo,
}
/// SOP management subcommands
#[derive(Subcommand, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SopCommands {
/// List loaded SOPs
List,
/// Validate SOP definitions
Validate {
/// SOP name to validate (all if omitted)
name: Option<String>,
},
/// Show details of an SOP
Show {
/// Name of the SOP to show
name: String,
},
}

View File

@ -107,6 +107,7 @@ mod security;
mod service;
mod skillforge;
mod skills;
mod sop;
mod tools;
mod tunnel;
mod util;
@ -117,7 +118,7 @@ use config::Config;
// Re-export so binary modules can use crate::<CommandEnum> while keeping a single source of truth.
pub use zeroclaw::{
ChannelCommands, CronCommands, GatewayCommands, HardwareCommands, IntegrationCommands,
MigrateCommands, PeripheralCommands, ServiceCommands, SkillCommands,
MigrateCommands, PeripheralCommands, ServiceCommands, SkillCommands, SopCommands,
};
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]

View File

@ -201,6 +201,7 @@ pub async fn run_wizard(force: bool) -> Result<Config> {
locale: None,
verifiable_intent: crate::config::VerifiableIntentConfig::default(),
claude_code: crate::config::ClaudeCodeConfig::default(),
sop: crate::config::SopConfig::default(),
};
println!(
@ -624,6 +625,7 @@ async fn run_quick_setup_with_home(
locale: None,
verifiable_intent: crate::config::VerifiableIntentConfig::default(),
claude_code: crate::config::ClaudeCodeConfig::default(),
sop: crate::config::SopConfig::default(),
};
config.save().await?;

View File

@ -78,33 +78,6 @@ impl SopAuditLogger {
Ok(())
}
/// Log a gate evaluation decision record.
#[cfg(feature = "ampersona-gates")]
pub async fn log_gate_decision(
&self,
record: &ampersona_engine::gates::decision::GateDecisionRecord,
) -> Result<()> {
let timestamp_ms = chrono::Utc::now().timestamp_millis();
let key = format!("sop_gate_decision_{}_{timestamp_ms}", record.gate_id);
let content = serde_json::to_string_pretty(record)?;
self.memory.store(&key, &content, category(), None).await?;
info!(
gate_id = %record.gate_id,
decision = %record.decision,
"SOP audit: gate decision logged"
);
Ok(())
}
/// Persist (upsert) the current gate phase state.
#[cfg(feature = "ampersona-gates")]
pub async fn log_phase_state(&self, state: &ampersona_core::state::PhaseState) -> Result<()> {
let key = "sop_phase_state";
let content = serde_json::to_string_pretty(state)?;
self.memory.store(key, &content, category(), None).await?;
Ok(())
}
/// Retrieve a stored run by ID (if it exists in memory).
pub async fn get_run(&self, run_id: &str) -> Result<Option<SopRun>> {
let key = run_key(run_id);
@ -166,6 +139,7 @@ mod tests {
completed_at: None,
step_results: Vec::new(),
waiting_since: None,
llm_calls_saved: 0,
}
}

View File

@ -24,7 +24,7 @@ pub enum DispatchResult {
Started {
run_id: String,
sop_name: String,
action: SopRunAction,
action: Box<SopRunAction>,
},
/// A matching SOP was found but could not start (cooldown / concurrency).
Skipped { sop_name: String, reason: String },
@ -39,6 +39,8 @@ fn extract_run_id_from_action(action: &SopRunAction) -> &str {
match action {
SopRunAction::ExecuteStep { run_id, .. }
| SopRunAction::WaitApproval { run_id, .. }
| SopRunAction::DeterministicStep { run_id, .. }
| SopRunAction::CheckpointWait { run_id, .. }
| SopRunAction::Completed { run_id, .. }
| SopRunAction::Failed { run_id, .. } => run_id,
}
@ -49,6 +51,8 @@ fn action_label(action: &SopRunAction) -> &'static str {
match action {
SopRunAction::ExecuteStep { .. } => "ExecuteStep",
SopRunAction::WaitApproval { .. } => "WaitApproval",
SopRunAction::DeterministicStep { .. } => "DeterministicStep",
SopRunAction::CheckpointWait { .. } => "CheckpointWait",
SopRunAction::Completed { .. } => "Completed",
SopRunAction::Failed { .. } => "Failed",
}
@ -62,7 +66,6 @@ fn action_label(action: &SopRunAction) -> &'static str {
/// 1. Lock → `match_trigger` → collect SOP names → drop lock
/// 2. Lock → for each name: `start_run` → collect results → drop lock
/// 3. Async (no lock): audit each started run
#[tracing::instrument(skip(engine, audit), fields(source = %event.source, topic = ?event.topic))]
pub async fn dispatch_sop_event(
engine: &Arc<Mutex<SopEngine>>,
audit: &SopAuditLogger,
@ -124,7 +127,7 @@ pub async fn dispatch_sop_event(
results.push(DispatchResult::Started {
run_id,
sop_name: sop_name.clone(),
action,
action: Box::new(action),
});
}
Err(e) => {
@ -158,14 +161,14 @@ pub async fn dispatch_sop_event(
/// approval timeout polling in the scheduler handles progression.
/// For `ExecuteStep` actions, the run is started in the engine but steps
/// cannot be executed without an agent loop — this is logged as a warning.
pub async fn process_headless_results(results: &[DispatchResult]) {
pub fn process_headless_results(results: &[DispatchResult]) {
for result in results {
match result {
DispatchResult::Started {
run_id,
sop_name,
action,
} => match action {
} => match action.as_ref() {
SopRunAction::ExecuteStep { step, .. } => {
warn!(
"SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \
@ -180,6 +183,24 @@ pub async fn process_headless_results(results: &[DispatchResult]) {
step.number, step.title,
);
}
SopRunAction::DeterministicStep { step, .. } => {
info!(
"SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \
'{}'",
step.number, step.title,
);
}
SopRunAction::CheckpointWait {
step, state_file, ..
} => {
info!(
"SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \
'{}', state persisted to {}",
step.number,
step.title,
state_file.display(),
);
}
SopRunAction::Completed { .. } => {
info!(
"SOP headless dispatch: run {run_id} ('{sop_name}') completed immediately"
@ -250,7 +271,7 @@ impl SopCronCache {
for trigger in &sop.triggers {
if let super::types::SopTrigger::Cron { expression } = trigger {
// Normalize 5-field crontab to 6-field (prepend seconds)
let normalized = match crate::cron::schedule::normalize_expression(expression) {
let normalized = match crate::cron::normalize_expression(expression) {
Ok(n) => n,
Err(e) => {
warn!(
@ -349,10 +370,13 @@ mod tests {
body: "Do step one".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: crate::sop::SopStepKind::default(),
schema: None,
}],
cooldown_secs: 0,
max_concurrent: 2,
location: None,
deterministic: false,
}
}
@ -396,7 +420,7 @@ mod tests {
let results = dispatch_sop_event(&engine, &audit, event).await;
assert_eq!(results.len(), 1);
assert!(
matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action, SopRunAction::ExecuteStep { .. }))
matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }))
);
}
@ -534,7 +558,7 @@ mod tests {
assert_eq!(sop_name, "supervised-sop");
assert!(!run_id.is_empty());
assert!(
matches!(action, SopRunAction::WaitApproval { .. }),
matches!(action.as_ref(), SopRunAction::WaitApproval { .. }),
"Supervised SOP must return WaitApproval, got {:?}",
action
);
@ -561,7 +585,7 @@ mod tests {
match &results[0] {
DispatchResult::Started { action, .. } => {
assert!(
matches!(action, SopRunAction::ExecuteStep { .. }),
matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }),
"Auto SOP must return ExecuteStep, got {:?}",
action
);
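The dispatch changes above box the `action` payload of `DispatchResult::Started` and then match on it through `as_ref()`. A common reason for this kind of change is to keep an enum small when one variant carries a large payload (Clippy's `large_enum_variant` lint flags that situation), although the PR does not state its motivation. The sketch below shows the pattern with hypothetical stand-in types; the 256-byte payload is only there to make the size difference visible.

```rust
// Hypothetical stand-ins; the real `SopRunAction` carries steps, paths, and JSON.
#[allow(dead_code)]
#[derive(Debug)]
enum RunAction {
    Execute { run_id: String, payload: [u8; 256] },
    Completed { run_id: String },
}

/// Payload stored inline: every value of this enum is as large as `RunAction`.
#[allow(dead_code)]
#[derive(Debug)]
enum Unboxed {
    Started { action: RunAction },
    Skipped { reason: String },
}

/// Payload stored behind a pointer: the enum itself stays small.
#[allow(dead_code)]
#[derive(Debug)]
enum Boxed {
    Started { action: Box<RunAction> },
    Skipped { reason: String },
}

/// Matching through the box: `as_ref()` yields `&RunAction`, so the inner
/// patterns look the same as they would without the box.
fn label(result: &Boxed) -> &'static str {
    match result {
        Boxed::Started { action } => match action.as_ref() {
            RunAction::Execute { .. } => "execute",
            RunAction::Completed { .. } => "completed",
        },
        Boxed::Skipped { .. } => "skipped",
    }
}

fn main() {
    let started = Boxed::Started {
        action: Box::new(RunAction::Execute { run_id: "run-1".into(), payload: [0; 256] }),
    };
    println!("{}", label(&started));
    println!(
        "unboxed: {} bytes, boxed: {} bytes",
        std::mem::size_of::<Unboxed>(),
        std::mem::size_of::<Boxed>()
    );
}
```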

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fmt::Write as _;
use std::path::Path;
use std::path::{Path, PathBuf};
use anyhow::{bail, Result};
use tracing::{info, warn};
@ -8,8 +8,9 @@ use tracing::{info, warn};
use super::condition::evaluate_condition;
use super::load_sops;
use super::types::{
Sop, SopEvent, SopPriority, SopRun, SopRunAction, SopRunStatus, SopStep, SopStepResult,
SopStepStatus, SopTrigger, SopTriggerSource,
DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
SopTrigger, SopTriggerSource,
};
use crate::config::SopConfig;
@ -21,6 +22,8 @@ pub struct SopEngine {
finished_runs: Vec<SopRun>,
config: SopConfig,
run_counter: u64,
/// Cumulative savings from deterministic execution.
deterministic_savings: DeterministicSavings,
}
impl SopEngine {
@ -32,6 +35,7 @@ impl SopEngine {
finished_runs: Vec::new(),
config,
run_counter: 0,
deterministic_savings: DeterministicSavings::default(),
}
}
@ -40,7 +44,7 @@ impl SopEngine {
self.sops = load_sops(
workspace_dir,
self.config.sops_dir.as_deref(),
self.config.default_execution_mode,
super::parse_execution_mode(&self.config.default_execution_mode),
);
info!("SOP engine loaded {} SOPs", self.sops.len());
}
@ -118,7 +122,15 @@ impl SopEngine {
}
/// Start a new SOP run. Returns the first action to take.
/// Deterministic SOPs are automatically routed to `start_deterministic_run`.
pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result<SopRunAction> {
// Route deterministic SOPs to dedicated path
if self.get_sop(sop_name).map_or(false, |s| {
s.execution_mode == SopExecutionMode::Deterministic
}) {
return self.start_deterministic_run(sop_name, event);
}
let sop = self
.get_sop(sop_name)
.ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
@ -154,6 +166,7 @@ impl SopEngine {
completed_at: None,
step_results: Vec::new(),
waiting_since: None,
llm_calls_saved: 0,
};
self.active_runs.insert(run_id.clone(), run);
@ -283,6 +296,273 @@ impl SopEngine {
.collect()
}
/// Return cumulative deterministic execution savings.
pub fn deterministic_savings(&self) -> &DeterministicSavings {
&self.deterministic_savings
}
// ── Deterministic execution ─────────────────────────────────
/// Start a deterministic SOP run. Steps execute sequentially without LLM
/// round-trips. Returns the first action (DeterministicStep or CheckpointWait).
pub fn start_deterministic_run(
&mut self,
sop_name: &str,
event: SopEvent,
) -> Result<SopRunAction> {
let sop = self
.get_sop(sop_name)
.ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
.clone();
if sop.execution_mode != SopExecutionMode::Deterministic {
bail!(
"SOP '{}' is not in deterministic mode (mode: {})",
sop_name,
sop.execution_mode
);
}
if !self.can_start(sop_name) {
bail!(
"Cannot start SOP '{}': cooldown or concurrency limit reached",
sop_name
);
}
if sop.steps.is_empty() {
bail!("SOP '{}' has no steps defined", sop_name);
}
self.run_counter += 1;
let dur = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter);
let now = now_iso8601();
let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX);
let run = SopRun {
run_id: run_id.clone(),
sop_name: sop_name.to_string(),
trigger_event: event,
status: SopRunStatus::Running,
current_step: 1,
total_steps,
started_at: now,
completed_at: None,
step_results: Vec::new(),
waiting_since: None,
llm_calls_saved: 0,
};
self.active_runs.insert(run_id.clone(), run);
info!(
"Deterministic SOP run {} started for '{}'",
run_id, sop_name
);
// Produce first step action
let step = sop.steps[0].clone();
let input = serde_json::Value::Null;
self.resolve_deterministic_action(&sop, &run_id, &step, input)
}
/// Advance a deterministic run with the output of the current step.
/// The output is piped as input to the next step.
pub fn advance_deterministic_step(
&mut self,
run_id: &str,
step_output: serde_json::Value,
) -> Result<SopRunAction> {
let run = self
.active_runs
.get_mut(run_id)
.ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
let sop = self
.sops
.iter()
.find(|s| s.name == run.sop_name)
.ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
.clone();
// Record step result
let now = now_iso8601();
let step_result = SopStepResult {
step_number: run.current_step,
status: SopStepStatus::Completed,
output: step_output.to_string(),
started_at: run.started_at.clone(),
completed_at: Some(now),
};
run.step_results.push(step_result);
// Each deterministic step saves one LLM call
run.llm_calls_saved += 1;
// Advance to next step
let next_step_num = run.current_step + 1;
if next_step_num > run.total_steps {
info!(
"Deterministic SOP run {run_id} completed ({} LLM calls saved)",
run.llm_calls_saved
);
let saved = run.llm_calls_saved;
self.deterministic_savings.total_llm_calls_saved += saved;
self.deterministic_savings.total_runs += 1;
return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
}
let run = self.active_runs.get_mut(run_id).unwrap();
run.current_step = next_step_num;
let step_idx = (next_step_num - 1) as usize;
let step = sop.steps[step_idx].clone();
let run_id_owned = run_id.to_string();
self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output)
}
/// Resume a deterministic run from persisted state.
pub fn resume_deterministic_run(
&mut self,
state: DeterministicRunState,
) -> Result<SopRunAction> {
let run = self
.active_runs
.get_mut(&state.run_id)
.ok_or_else(|| anyhow::anyhow!("Active run not found: {}", state.run_id))?;
if run.status != SopRunStatus::PausedCheckpoint {
bail!(
"Run {} is not paused at checkpoint (status: {})",
state.run_id,
run.status
);
}
let sop = self
.sops
.iter()
.find(|s| s.name == run.sop_name)
.ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
.clone();
run.status = SopRunStatus::Running;
run.waiting_since = None;
run.llm_calls_saved = state.llm_calls_saved;
// Resume from the step after the last completed one
let next_step_num = state.last_completed_step + 1;
if next_step_num > state.total_steps {
info!(
"Deterministic SOP run {} completed on resume ({} LLM calls saved)",
state.run_id, state.llm_calls_saved
);
self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved;
self.deterministic_savings.total_runs += 1;
return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None));
}
let run = self.active_runs.get_mut(&state.run_id).unwrap();
run.current_step = next_step_num;
let step_idx = (next_step_num - 1) as usize;
let step = sop.steps[step_idx].clone();
// Use last step's output as input, or Null
let last_output = state
.step_outputs
.get(&state.last_completed_step)
.cloned()
.unwrap_or(serde_json::Value::Null);
let run_id = state.run_id.clone();
self.resolve_deterministic_action(&sop, &run_id, &step, last_output)
}
/// Resolve the action for a deterministic step (execute or checkpoint).
fn resolve_deterministic_action(
&mut self,
sop: &Sop,
run_id: &str,
step: &SopStep,
input: serde_json::Value,
) -> Result<SopRunAction> {
if step.kind == SopStepKind::Checkpoint {
// Pause at checkpoint — persist state and wait for approval
if let Some(run) = self.active_runs.get_mut(run_id) {
run.status = SopRunStatus::PausedCheckpoint;
run.waiting_since = Some(now_iso8601());
}
let state_file = self.persist_deterministic_state(run_id, sop)?;
info!(
"Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}",
step.number,
step.title,
state_file.display()
);
Ok(SopRunAction::CheckpointWait {
run_id: run_id.to_string(),
step: step.clone(),
state_file,
})
} else {
Ok(SopRunAction::DeterministicStep {
run_id: run_id.to_string(),
step: step.clone(),
input,
})
}
}
/// Persist the current deterministic run state to a JSON file.
fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result<PathBuf> {
let run = self
.active_runs
.get(run_id)
.ok_or_else(|| anyhow::anyhow!("Run not found: {run_id}"))?;
let mut step_outputs = HashMap::new();
for result in &run.step_results {
// Try to parse output as JSON, fall back to string value
let value = serde_json::from_str(&result.output)
.unwrap_or_else(|_| serde_json::Value::String(result.output.clone()));
step_outputs.insert(result.step_number, value);
}
let state = DeterministicRunState {
run_id: run_id.to_string(),
sop_name: run.sop_name.clone(),
last_completed_step: run.current_step.saturating_sub(1),
total_steps: run.total_steps,
step_outputs,
persisted_at: now_iso8601(),
llm_calls_saved: run.llm_calls_saved,
paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint,
};
// Write to the SOP location directory, or the current working directory if unset
let dir = sop.location.as_deref().unwrap_or_else(|| Path::new("."));
let state_file = dir.join(format!("{run_id}.state.json"));
let json = serde_json::to_string_pretty(&state)?;
std::fs::write(&state_file, json)?;
Ok(state_file)
}
/// Load a persisted deterministic run state from a JSON file.
pub fn load_deterministic_state(path: &Path) -> Result<DeterministicRunState> {
let content = std::fs::read_to_string(path)?;
let state: DeterministicRunState = serde_json::from_str(&content)?;
Ok(state)
}
// ── Approval timeout ──────────────────────────────────────────
/// Check all WaitingApproval runs for timeout. For Critical/High-priority SOPs,
@ -487,21 +767,21 @@ fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: Strin
}
let needs_approval = match sop.execution_mode {
crate::sop::SopExecutionMode::Auto => false,
crate::sop::SopExecutionMode::Supervised => {
// Deterministic mode is handled via start_deterministic_run;
// if we reach here via the standard path, treat as Auto.
SopExecutionMode::Auto | SopExecutionMode::Deterministic => false,
SopExecutionMode::Supervised => {
// Supervised: approval only before the first step
step.number == 1
}
crate::sop::SopExecutionMode::StepByStep => true,
crate::sop::SopExecutionMode::PriorityBased => {
match sop.priority {
SopPriority::Critical | SopPriority::High => false,
SopPriority::Normal | SopPriority::Low => {
// Supervised behavior for normal/low
step.number == 1
}
SopExecutionMode::StepByStep => true,
SopExecutionMode::PriorityBased => match sop.priority {
SopPriority::Critical | SopPriority::High => false,
SopPriority::Normal | SopPriority::Low => {
// Supervised behavior for normal/low
step.number == 1
}
}
},
};
if needs_approval {
@ -680,6 +960,8 @@ mod tests {
body: "Do step one".into(),
suggested_tools: vec!["shell".into()],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
},
SopStep {
number: 2,
@ -687,11 +969,14 @@ mod tests {
body: "Do step two".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
},
],
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
}
}
@ -706,6 +991,8 @@ mod tests {
match action {
SopRunAction::ExecuteStep { run_id, .. }
| SopRunAction::WaitApproval { run_id, .. }
| SopRunAction::DeterministicStep { run_id, .. }
| SopRunAction::CheckpointWait { run_id, .. }
| SopRunAction::Completed { run_id, .. }
| SopRunAction::Failed { run_id, .. } => run_id,
}
@ -1359,6 +1646,7 @@ mod tests {
completed_at: None,
step_results: Vec::new(),
waiting_since: None,
llm_calls_saved: 0,
};
let ctx = format_step_context(&sop, &run, &sop.steps[0]);
assert!(ctx.contains("pump-shutdown"));
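The approval rule in the `resolve_step_action` hunk above reduces to a pure function of execution mode, priority, and step number. The sketch below restates it with hypothetical stand-in enums (`Mode`, `Priority`) so the rule can be checked in isolation. It mirrors the match arms shown in the diff but is not the engine's code.

```rust
// Hypothetical stand-ins for `SopExecutionMode` and `SopPriority`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Auto,
    Supervised,
    StepByStep,
    PriorityBased,
    Deterministic,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Priority {
    Critical,
    High,
    Normal,
    Low,
}

/// Whether a step must wait for operator approval before it runs.
fn needs_approval(mode: Mode, priority: Priority, step_number: u32) -> bool {
    match mode {
        // Deterministic runs normally take their own path; if one reaches the
        // standard path it is treated like Auto.
        Mode::Auto | Mode::Deterministic => false,
        // Supervised: approval only before the first step.
        Mode::Supervised => step_number == 1,
        // Step-by-step: approval before every step.
        Mode::StepByStep => true,
        // Priority-based: urgent SOPs run freely, the rest behave as Supervised.
        Mode::PriorityBased => match priority {
            Priority::Critical | Priority::High => false,
            Priority::Normal | Priority::Low => step_number == 1,
        },
    }
}

fn main() {
    let modes = [
        Mode::Auto,
        Mode::Supervised,
        Mode::StepByStep,
        Mode::PriorityBased,
        Mode::Deterministic,
    ];
    let priorities = [Priority::Critical, Priority::High, Priority::Normal, Priority::Low];
    for mode in modes {
        for priority in priorities {
            println!(
                "{mode:?} / {priority:?}: step 1 -> {}, step 2 -> {}",
                needs_approval(mode, priority, 1),
                needs_approval(mode, priority, 2)
            );
        }
    }
}
```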

View File

@ -1,746 +0,0 @@
//! Gate evaluation state for ampersona trust-phase transitions.
//!
//! This module is only compiled when the `ampersona-gates` feature is active
//! (module declaration in `mod.rs` is behind `#[cfg]`).
//!
//! Gate decisions do NOT change SOP execution behavior — this is purely
//! observation + phase state tracking + audit logging.
use std::path::Path;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use ampersona_core::spec::gates::Gate;
use ampersona_core::state::{PendingTransition, PhaseState, TransitionRecord};
use ampersona_core::traits::MetricsProvider;
use ampersona_engine::gates::decision::GateDecisionRecord;
use ampersona_engine::gates::evaluator::DefaultGateEvaluator;
use anyhow::Result;
use chrono::Utc;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::memory::traits::{Memory, MemoryCategory};
const PHASE_STATE_KEY: &str = "sop_phase_state";
fn sop_category() -> MemoryCategory {
MemoryCategory::Custom("sop".into())
}
// ── Inner state ────────────────────────────────────────────────
struct GateEvalInner {
phase_state: PhaseState,
last_tick: Instant,
}
// ── GateEvalState ──────────────────────────────────────────────
/// Manages trust-phase gate evaluation state.
///
/// Single `Mutex<GateEvalInner>` ensures atomic interval-check + evaluate + apply.
/// `DefaultGateEvaluator` is a unit struct — called inline, not stored.
pub struct GateEvalState {
inner: Mutex<GateEvalInner>,
memory: Arc<dyn Memory>,
gates: Vec<Gate>,
tick_interval: Duration,
}
impl GateEvalState {
/// Create with fresh (default) phase state.
pub fn new(
agent_name: &str,
gates: Vec<Gate>,
interval_secs: u64,
memory: Arc<dyn Memory>,
) -> Self {
Self {
inner: Mutex::new(GateEvalInner {
phase_state: PhaseState::new(agent_name.to_string()),
last_tick: Instant::now(),
}),
memory,
gates,
tick_interval: Duration::from_secs(interval_secs),
}
}
/// Create with a known phase state (warm-start).
pub fn with_state(
state: PhaseState,
gates: Vec<Gate>,
interval_secs: u64,
memory: Arc<dyn Memory>,
) -> Self {
Self {
inner: Mutex::new(GateEvalInner {
phase_state: state,
last_tick: Instant::now(),
}),
memory,
gates,
tick_interval: Duration::from_secs(interval_secs),
}
}
/// Load gate definitions from a persona JSON file.
///
/// Expects `{"gates": [...]}` at the top level. Missing file → empty Vec.
/// Parse error → warn log + empty Vec.
pub fn load_gates_from_file(path: &Path) -> Vec<Gate> {
let content = match std::fs::read_to_string(path) {
Ok(c) => c,
Err(_) => return Vec::new(),
};
#[derive(serde::Deserialize)]
struct PersonaGates {
#[serde(default)]
gates: Vec<Gate>,
}
match serde_json::from_str::<PersonaGates>(&content) {
Ok(parsed) => parsed.gates,
Err(e) => {
warn!(path = %path.display(), error = %e, "failed to parse gates from persona file");
Vec::new()
}
}
}
/// Rebuild from Memory backend (warm-start).
///
/// Loads `PhaseState` from Memory key `sop_phase_state`, loads gates from
/// file, falls back to fresh state on parse error.
pub async fn rebuild_from_memory(
memory: Arc<dyn Memory>,
agent_name: &str,
gates_file: Option<&Path>,
interval_secs: u64,
) -> Result<Self> {
let gates = gates_file
.map(Self::load_gates_from_file)
.unwrap_or_default();
let phase_state = match memory.get(PHASE_STATE_KEY).await? {
Some(entry) => match serde_json::from_str::<PhaseState>(&entry.content) {
Ok(state) => {
info!(
phase = ?state.current_phase,
rev = state.state_rev,
"gate eval warm-started from memory"
);
state
}
Err(e) => {
warn!(error = %e, "failed to parse phase state from memory, using fresh state");
PhaseState::new(agent_name.to_string())
}
},
None => PhaseState::new(agent_name.to_string()),
};
Ok(Self::with_state(phase_state, gates, interval_secs, memory))
}
/// Atomic tick: interval check + evaluate + apply under single lock.
///
/// Returns `Some(record)` if a gate fired, `None` otherwise.
pub fn tick(&self, metrics: &dyn MetricsProvider) -> Option<GateDecisionRecord> {
let _span = tracing::info_span!("gate_eval_tick", gates = self.gates.len()).entered();
// interval_secs=0 means disabled
if self.tick_interval.is_zero() {
return None;
}
if self.inner.is_poisoned() {
error!("gate eval mutex poisoned — loss of gate evaluation until restart");
return None;
}
let mut inner = self.inner.lock().ok()?;
// Check interval
if inner.last_tick.elapsed() < self.tick_interval {
return None;
}
inner.last_tick = Instant::now();
// Evaluate
let record = DefaultGateEvaluator.evaluate(&self.gates, &inner.phase_state, metrics);
match record {
Some(ref record) => {
// Apply decision in-place under the same lock
apply_decision(&mut inner.phase_state, record);
info!(
gate_id = %record.gate_id,
decision = %record.decision,
from = ?record.from_phase,
to = %record.to_phase,
"gate decision"
);
}
None => {
debug!("no gate fired");
}
}
record
}
/// Persist current phase state to Memory.
pub async fn persist(&self) -> Result<()> {
let content = {
let inner = self
.inner
.lock()
.map_err(|e| anyhow::anyhow!("gate eval lock poisoned: {e}"))?;
serde_json::to_string_pretty(&inner.phase_state)?
};
self.memory
.store(PHASE_STATE_KEY, &content, sop_category(), None)
.await?;
Ok(())
}
/// Snapshot of current phase state (for diagnostics / sop_status).
pub fn phase_state_snapshot(&self) -> Option<PhaseState> {
self.inner.lock().ok().map(|g| g.phase_state.clone())
}
/// Number of loaded gate definitions.
pub fn gate_count(&self) -> usize {
self.gates.len()
}
}
// ── Decision application ───────────────────────────────────────
fn apply_decision(state: &mut PhaseState, record: &GateDecisionRecord) {
match record.decision.as_str() {
"transition" => {
state.current_phase = Some(record.to_phase.clone());
state.state_rev += 1;
state.last_transition = Some(TransitionRecord {
gate_id: record.gate_id.clone(),
from_phase: record.from_phase.clone(),
to_phase: record.to_phase.clone(),
at: Utc::now(),
decision_id: format!(
"{}-{}-{}",
record.gate_id, record.state_rev, record.metrics_hash
),
metrics_hash: Some(record.metrics_hash.clone()),
state_rev: state.state_rev,
});
state.pending_transition = None;
state.updated_at = Utc::now();
}
"observed" => {
debug!(
gate_id = %record.gate_id,
"observed gate — no state change"
);
}
"pending_human" => {
state.pending_transition = Some(PendingTransition {
gate_id: record.gate_id.clone(),
from_phase: record.from_phase.clone(),
to_phase: record.to_phase.clone(),
decision: record.decision.clone(),
metrics_hash: record.metrics_hash.clone(),
state_rev: record.state_rev,
created_at: Utc::now(),
});
state.updated_at = Utc::now();
}
other => {
warn!(decision = %other, gate_id = %record.gate_id, "unknown gate decision — skipping");
}
}
}
// ── Tests ──────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use ampersona_core::errors::MetricError;
use ampersona_core::spec::gates::Gate;
use ampersona_core::traits::{MetricQuery, MetricSample};
use ampersona_core::types::{CriterionOp, GateApproval, GateDirection, GateEnforcement};
use serde_json::json;
use std::collections::HashMap;
// ── Mock MetricsProvider ──────────────────────────────────
struct MockMetrics {
values: HashMap<String, serde_json::Value>,
}
impl MockMetrics {
fn new(values: Vec<(&str, serde_json::Value)>) -> Self {
Self {
values: values
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect(),
}
}
}
impl MetricsProvider for MockMetrics {
fn get_metric(&self, query: &MetricQuery) -> Result<MetricSample, MetricError> {
self.values
.get(&query.name)
.cloned()
.map(|value| MetricSample {
name: query.name.clone(),
value,
sampled_at: Utc::now(),
})
.ok_or_else(|| MetricError::NotFound(query.name.clone()))
}
}
// ── Helpers ───────────────────────────────────────────────
fn make_promote_gate(
id: &str,
metric: &str,
op: CriterionOp,
value: serde_json::Value,
to_phase: &str,
) -> Gate {
Gate {
id: id.into(),
direction: GateDirection::Promote,
enforcement: GateEnforcement::Enforce,
priority: 0,
cooldown_seconds: 0,
from_phase: None,
to_phase: to_phase.into(),
criteria: vec![ampersona_core::spec::gates::Criterion {
metric: metric.into(),
op,
value,
window_seconds: None,
}],
metrics_schema: None,
approval: GateApproval::Auto,
on_pass: None,
}
}
fn test_memory() -> Arc<dyn Memory> {
let mem_cfg = crate::config::MemoryConfig {
backend: "sqlite".into(),
..crate::config::MemoryConfig::default()
};
let tmp = tempfile::tempdir().unwrap();
Arc::from(crate::memory::create_memory(&mem_cfg, tmp.path(), None).unwrap())
}
// ── Tests ─────────────────────────────────────────────────
#[test]
fn tick_no_gates_returns_none() {
let mem = test_memory();
let ge = GateEvalState::new("test-agent", vec![], 1, mem);
let metrics = MockMetrics::new(vec![]);
// Force past interval
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
assert!(ge.tick(&metrics).is_none());
}
#[test]
fn tick_with_passing_gate_returns_decision() {
let mem = test_memory();
let gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.9))]);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let record = ge.tick(&metrics);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.gate_id, "g1");
assert_eq!(record.to_phase, "active");
}
#[test]
fn tick_transition_advances_phase() {
let mem = test_memory();
let gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
ge.tick(&metrics);
let snap = ge.phase_state_snapshot().unwrap();
assert_eq!(snap.current_phase, Some("active".into()));
assert!(snap.state_rev > 0);
assert!(snap.last_transition.is_some());
}
#[test]
fn tick_observed_no_state_change() {
let mem = test_memory();
let mut gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
gate.enforcement = GateEnforcement::Observe;
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let record = ge.tick(&metrics);
assert!(record.is_some());
assert_eq!(record.unwrap().decision, "observed");
let snap = ge.phase_state_snapshot().unwrap();
assert!(snap.current_phase.is_none()); // no change
assert_eq!(snap.state_rev, 0);
}
#[test]
fn tick_pending_human_sets_pending() {
let mem = test_memory();
let mut gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
gate.approval = GateApproval::Human;
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let record = ge.tick(&metrics);
assert!(record.is_some());
assert_eq!(record.unwrap().decision, "pending_human");
let snap = ge.phase_state_snapshot().unwrap();
assert!(snap.pending_transition.is_some());
assert_eq!(snap.pending_transition.unwrap().to_phase, "active");
}
#[test]
fn load_gates_missing_file_returns_empty() {
let gates = GateEvalState::load_gates_from_file(Path::new("/nonexistent/persona.json"));
assert!(gates.is_empty());
}
#[test]
fn load_gates_valid_persona() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("persona.json");
std::fs::write(
&path,
r#"{
"gates": [{
"id": "g1",
"direction": "promote",
"to_phase": "active",
"criteria": [{"metric": "sop.completion_rate", "op": "gte", "value": 0.8}]
}]
}"#,
)
.unwrap();
let gates = GateEvalState::load_gates_from_file(&path);
assert_eq!(gates.len(), 1);
assert_eq!(gates[0].id, "g1");
}
#[test]
fn load_gates_no_gates_key_returns_empty() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("persona.json");
std::fs::write(&path, r#"{"name": "test"}"#).unwrap();
let gates = GateEvalState::load_gates_from_file(&path);
assert!(gates.is_empty());
}
#[test]
fn load_gates_invalid_json_returns_empty() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("persona.json");
std::fs::write(&path, "not json at all {{{").unwrap();
let gates = GateEvalState::load_gates_from_file(&path);
assert!(gates.is_empty());
}
#[tokio::test]
async fn warm_start_roundtrip() {
let mem = test_memory();
let gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
// Create, tick to advance state, persist
let ge = GateEvalState::new("test-agent", vec![gate.clone()], 1, Arc::clone(&mem));
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
ge.tick(&metrics);
ge.persist().await.unwrap();
// Write gates file for rebuild
let dir = tempfile::tempdir().unwrap();
let gates_path = dir.path().join("persona.json");
std::fs::write(
&gates_path,
serde_json::to_string(&serde_json::json!({"gates": [gate]})).unwrap(),
)
.unwrap();
// Rebuild
let ge2 = GateEvalState::rebuild_from_memory(
Arc::clone(&mem),
"test-agent",
Some(gates_path.as_path()),
1,
)
.await
.unwrap();
let snap = ge2.phase_state_snapshot().unwrap();
assert_eq!(snap.current_phase, Some("active".into()));
assert!(snap.state_rev > 0);
assert_eq!(ge2.gate_count(), 1);
}
#[tokio::test]
async fn warm_start_empty_memory() {
let mem = test_memory();
let ge = GateEvalState::rebuild_from_memory(Arc::clone(&mem), "test-agent", None, 60)
.await
.unwrap();
let snap = ge.phase_state_snapshot().unwrap();
assert!(snap.current_phase.is_none());
assert_eq!(snap.state_rev, 0);
assert_eq!(ge.gate_count(), 0);
}
#[test]
fn demote_priority_over_promote() {
let mem = test_memory();
let promote = make_promote_gate(
"promote-g",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
let mut demote = make_promote_gate(
"demote-g",
"sop.deviation_rate",
CriterionOp::Gte,
json!(0.3),
"restricted",
);
demote.direction = GateDirection::Demote;
demote.from_phase = Some("active".into());
let state = PhaseState {
current_phase: Some("active".into()),
..PhaseState::new("test-agent".into())
};
let ge = GateEvalState::with_state(state, vec![promote, demote], 1, mem);
let metrics = MockMetrics::new(vec![
("sop.completion_rate", json!(0.95)),
("sop.deviation_rate", json!(0.5)),
]);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let record = ge.tick(&metrics).unwrap();
// Demote should fire first (evaluator sorts demote before promote)
assert_eq!(record.gate_id, "demote-g");
assert_eq!(record.to_phase, "restricted");
}
#[test]
fn idempotent_tick_after_apply() {
let mem = test_memory();
let gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
// First tick — fires
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let first = ge.tick(&metrics);
assert!(first.is_some());
// Second tick with same metrics + updated state_rev — should not fire again
// (evaluator idempotency via metrics_hash + state_rev)
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let second = ge.tick(&metrics);
assert!(second.is_none());
}
#[test]
fn gate_tick_with_real_collector() {
use crate::sop::metrics::SopMetricsCollector;
use crate::sop::types::{
SopEvent, SopRun, SopRunStatus, SopStepResult, SopStepStatus, SopTriggerSource,
};
let mem = test_memory();
let collector = SopMetricsCollector::new();
// Record a completed run
let run = SopRun {
run_id: "r1".into(),
sop_name: "test-sop".into(),
trigger_event: SopEvent {
source: SopTriggerSource::Manual,
topic: None,
payload: None,
timestamp: "2026-02-19T12:00:00Z".into(),
},
status: SopRunStatus::Completed,
current_step: 1,
total_steps: 1,
started_at: "2026-02-19T12:00:00Z".into(),
completed_at: Some("2026-02-19T12:05:00Z".into()),
step_results: vec![SopStepResult {
step_number: 1,
status: SopStepStatus::Completed,
output: "done".into(),
started_at: "2026-02-19T12:00:00Z".into(),
completed_at: Some("2026-02-19T12:01:00Z".into()),
}],
waiting_since: None,
};
collector.record_run_complete(&run);
let gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
{
let mut inner = ge.inner.lock().unwrap();
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
}
let record = ge.tick(&collector);
assert!(record.is_some());
assert_eq!(record.unwrap().to_phase, "active");
}
#[test]
fn tick_respects_interval() {
let mem = test_memory();
let gate = make_promote_gate(
"g1",
"sop.completion_rate",
CriterionOp::Gte,
json!(0.8),
"active",
);
// Long interval
let ge = GateEvalState::new("test-agent", vec![gate.clone()], 3600, mem.clone());
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
// last_tick is Instant::now() — not enough elapsed
assert!(ge.tick(&metrics).is_none());
// Zero interval = disabled
let ge_disabled = GateEvalState::new("test-agent", vec![gate], 0, mem);
assert!(ge_disabled.tick(&metrics).is_none());
}
#[test]
fn ampersona_decision_strings_stable() {
// Canary test: verifies that DefaultGateEvaluator produces the decision
// strings we expect. If ampersona changes them, this test fails.
let state = PhaseState::new("test".into());
// Enforce promote → "transition"
let enforce_gate =
make_promote_gate("g-enforce", "m", CriterionOp::Gte, json!(1), "phase-b");
let metrics = MockMetrics::new(vec![("m", json!(1))]);
let record = DefaultGateEvaluator.evaluate(&[enforce_gate], &state, &metrics);
assert_eq!(
record.as_ref().map(|r| r.decision.as_str()),
Some("transition")
);
// Observe promote → "observed"
let mut observe_gate =
make_promote_gate("g-observe", "m", CriterionOp::Gte, json!(1), "phase-b");
observe_gate.enforcement = GateEnforcement::Observe;
let record = DefaultGateEvaluator.evaluate(&[observe_gate], &state, &metrics);
assert_eq!(
record.as_ref().map(|r| r.decision.as_str()),
Some("observed")
);
// Human-approval promote (GateApproval::Human) → "pending_human"
let mut approval_gate =
make_promote_gate("g-approval", "m", CriterionOp::Gte, json!(1), "phase-b");
approval_gate.approval = GateApproval::Human;
let record = DefaultGateEvaluator.evaluate(&[approval_gate], &state, &metrics);
assert_eq!(
record.as_ref().map(|r| r.decision.as_str()),
Some("pending_human")
);
}
}
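
The `tick` method in the deleted module performs its interval check, evaluation, and state update under one mutex. A minimal sketch of that pattern, using only the standard library, might look like the following. `Ticker`, `Inner`, `fired`, and `tick_at` are hypothetical names introduced for illustration; a counter stands in for `PhaseState`, and the injected `now` argument is an assumption added so the behaviour can be checked without sleeping.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `GateEvalInner`: a counter replaces `PhaseState`.
struct Inner {
    fired: u32,
    last_tick: Instant,
}

/// Hypothetical stand-in for `GateEvalState`.
pub struct Ticker {
    inner: Mutex<Inner>,
    interval: Duration,
}

impl Ticker {
    pub fn new(interval_secs: u64) -> Self {
        Self {
            inner: Mutex::new(Inner {
                fired: 0,
                last_tick: Instant::now(),
            }),
            interval: Duration::from_secs(interval_secs),
        }
    }

    /// Tick against the real clock.
    pub fn tick(&self) -> Option<u32> {
        self.tick_at(Instant::now())
    }

    /// Interval check and state update under a single lock.
    /// `now` is injected (an assumption of this sketch) to keep it testable.
    pub fn tick_at(&self, now: Instant) -> Option<u32> {
        // A zero interval means "disabled", mirroring the diff above.
        if self.interval.is_zero() {
            return None;
        }
        // A poisoned lock yields `None` instead of panicking.
        let mut inner = self.inner.lock().ok()?;
        if now.saturating_duration_since(inner.last_tick) < self.interval {
            return None;
        }
        inner.last_tick = now;
        inner.fired += 1;
        Some(inner.fired)
    }
}
```

Because the check and the update share one guard, two concurrent callers cannot both pass the interval test for the same window.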


@ -413,34 +413,6 @@ impl Default for SopMetricsCollector {
}
}
// ── Conditional MetricsProvider impl ───────────────────────────
#[cfg(feature = "ampersona-gates")]
impl ampersona_core::traits::MetricsProvider for SopMetricsCollector {
fn get_metric(
&self,
query: &ampersona_core::traits::MetricQuery,
) -> Result<ampersona_core::traits::MetricSample, ampersona_core::errors::MetricError> {
if self.inner.is_poisoned() {
return Err(ampersona_core::errors::MetricError::ProviderUnavailable);
}
let value = if let Some(ref window) = query.window {
// Window specified by evaluator (from Criterion.window_seconds)
self.get_metric_value_windowed(&query.name, window)
} else {
// No window — use name as-is (may include _7d/_30d suffix or be all-time)
self.get_metric_value(&query.name)
};
value
.map(|v| ampersona_core::traits::MetricSample {
name: query.name.clone(),
value: v,
sampled_at: Utc::now(),
})
.ok_or_else(|| ampersona_core::errors::MetricError::NotFound(query.name.clone()))
}
}
// ── Helpers ────────────────────────────────────────────────────
fn build_snapshot(run: &SopRun, human_count: u64, timeout_count: u64) -> RunSnapshot {
@ -637,6 +609,9 @@ mod tests {
total_steps: u32,
step_results: Vec<SopStepResult>,
) -> SopRun {
let now = Utc::now();
let started = (now - chrono::Duration::minutes(5)).to_rfc3339();
let completed = now.to_rfc3339();
SopRun {
run_id: run_id.into(),
sop_name: sop_name.into(),
@ -644,10 +619,11 @@ mod tests {
status,
current_step: total_steps,
total_steps,
started_at: "2026-02-19T12:00:00Z".into(),
completed_at: Some("2026-02-19T12:05:00Z".into()),
started_at: started,
completed_at: Some(completed),
step_results,
waiting_since: None,
llm_calls_saved: 0,
}
}
@ -1141,43 +1117,6 @@ mod tests {
);
}
// ── MetricsProvider impl (ampersona-gates feature) ───────
#[cfg(feature = "ampersona-gates")]
#[test]
fn metrics_provider_get_metric() {
use ampersona_core::traits::{MetricQuery, MetricsProvider};
let c = SopMetricsCollector::new();
let run = make_run(
"r1",
"test-sop",
SopRunStatus::Completed,
1,
vec![make_step(1, SopStepStatus::Completed)],
);
c.record_run_complete(&run);
let query = MetricQuery {
name: "sop.runs_completed".into(),
window: None,
};
let sample = c.get_metric(&query).unwrap();
assert_eq!(sample.value, json!(1u64));
assert_eq!(sample.name, "sop.runs_completed");
// NotFound for unknown metric
let bad_query = MetricQuery {
name: "sop.nonexistent".into(),
window: None,
};
let err = c.get_metric(&bad_query).unwrap_err();
assert!(matches!(
err,
ampersona_core::errors::MetricError::NotFound(_)
));
}
// ── Warm-start tests ─────────────────────────────────────
#[tokio::test]
@ -1245,6 +1184,7 @@ mod tests {
completed_at: None,
step_results: vec![],
waiting_since: None,
llm_calls_saved: 0,
};
audit.log_run_start(&run).await.unwrap();
@ -1342,6 +1282,7 @@ mod tests {
completed_at: None,
step_results: vec![],
waiting_since: None,
llm_calls_saved: 0,
};
audit.log_run_start(&running_run).await.unwrap();
audit.log_approval(&running_run, 1).await.unwrap();
@ -1385,7 +1326,7 @@ mod tests {
assert_eq!(hic_7d, 1);
}
// ── Windowed MetricsProvider tests (ampersona-gates feature) ──
// ── Windowed MetricsProvider tests ──
#[test]
fn get_metric_windowed_7d_matches_suffix() {
@ -1461,32 +1402,4 @@ mod tests {
.unwrap();
assert_eq!(val, 2);
}
#[cfg(feature = "ampersona-gates")]
#[test]
fn get_metric_provider_window_propagation() {
use ampersona_core::traits::{MetricQuery, MetricsProvider};
let c = SopMetricsCollector::new();
let run = make_run(
"r1",
"test-sop",
SopRunStatus::Completed,
1,
vec![make_step(1, SopStepStatus::Completed)],
);
c.record_run_complete(&run);
// Query with window via MetricsProvider trait
let query = MetricQuery {
name: "sop.runs_completed".into(),
window: Some(std::time::Duration::from_secs(7 * 86400)),
};
let sample = c.get_metric(&query).unwrap();
assert_eq!(sample.value, json!(1u64));
// Same result as suffix-based query
let suffix_val = c.get_metric_value("sop.runs_completed_7d");
assert_eq!(Some(sample.value), suffix_val);
}
}
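
The removed `get_metric_provider_window_propagation` test asserts that a 7-day window query returns the same value as the `_7d` suffixed name. One way such an equivalence could be expressed is sketched below. `windowed_name` is a hypothetical helper, not a function from this repository; the body of `get_metric_value_windowed` is not shown in the diff, so the fallback for other window lengths is an assumption.

```rust
use std::time::Duration;

/// Hypothetical helper: map an optional query window onto a suffixed metric name.
/// Only the `_7d` and `_30d` suffixes mentioned in the diff are handled.
pub fn windowed_name(name: &str, window: Option<Duration>) -> String {
    const DAY_SECS: u64 = 86_400;
    match window.map(|w| w.as_secs()) {
        Some(s) if s == 7 * DAY_SECS => format!("{name}_7d"),
        Some(s) if s == 30 * DAY_SECS => format!("{name}_30d"),
        // Assumption: no window, or an unrecognised one, uses the name as-is.
        _ => name.to_string(),
    }
}
```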


@ -2,20 +2,17 @@ pub mod audit;
pub mod condition;
pub mod dispatch;
pub mod engine;
#[cfg(feature = "ampersona-gates")]
pub mod gates;
pub mod metrics;
pub mod types;
pub use audit::SopAuditLogger;
pub use engine::SopEngine;
#[cfg(feature = "ampersona-gates")]
pub use gates::GateEvalState;
pub use metrics::SopMetricsCollector;
#[allow(unused_imports)]
pub use types::{
Sop, SopEvent, SopExecutionMode, SopPriority, SopRun, SopRunAction, SopRunStatus, SopStep,
SopStepResult, SopStepStatus, SopTrigger, SopTriggerSource,
DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
SopTrigger, SopTriggerSource, StepSchema,
};
use anyhow::Result;
@ -24,6 +21,19 @@ use tracing::warn;
use types::{SopManifest, SopMeta};
/// Parse an execution mode string into `SopExecutionMode`, falling back to
/// `Supervised` for unknown values.
pub fn parse_execution_mode(s: &str) -> SopExecutionMode {
match s.trim().to_lowercase().as_str() {
"auto" => SopExecutionMode::Auto,
"step_by_step" => SopExecutionMode::StepByStep,
"priority_based" => SopExecutionMode::PriorityBased,
"deterministic" => SopExecutionMode::Deterministic,
// "supervised" and any unknown value
_ => SopExecutionMode::Supervised,
}
}
// ── SOP directory helpers ───────────────────────────────────────
/// Return the default SOPs directory: `<workspace>/sops`.
@ -112,19 +122,28 @@ fn load_sop(sop_dir: &Path, default_execution_mode: SopExecutionMode) -> Result<
execution_mode,
cooldown_secs,
max_concurrent,
deterministic,
} = manifest.sop;
// When deterministic=true, override execution_mode to Deterministic
let effective_mode = if deterministic {
SopExecutionMode::Deterministic
} else {
execution_mode.unwrap_or(default_execution_mode)
};
Ok(Sop {
name,
description,
version,
priority,
execution_mode: execution_mode.unwrap_or(default_execution_mode),
execution_mode: effective_mode,
triggers: manifest.triggers,
steps,
cooldown_secs,
max_concurrent,
location: Some(sop_dir.to_path_buf()),
deterministic,
})
}
@ -143,6 +162,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
let mut current_body = String::new();
let mut current_tools: Vec<String> = Vec::new();
let mut current_requires_confirmation = false;
let mut current_kind = SopStepKind::Execute;
for line in md.lines() {
let trimmed = line.trim();
@ -164,6 +184,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
&mut current_body,
&mut current_tools,
&mut current_requires_confirmation,
&mut current_kind,
);
in_steps_section = false;
}
@ -184,6 +205,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
&mut current_body,
&mut current_tools,
&mut current_requires_confirmation,
&mut current_kind,
);
let step_num = u32::try_from(steps.len())
@ -217,6 +239,15 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
if let Some(val) = bullet.strip_prefix("requires_confirmation:") {
current_requires_confirmation = val.trim().eq_ignore_ascii_case("true");
}
} else if bullet.starts_with("kind:") {
if let Some(val) = bullet.strip_prefix("kind:") {
let val = val.trim();
if val.eq_ignore_ascii_case("checkpoint") {
current_kind = SopStepKind::Checkpoint;
} else {
current_kind = SopStepKind::Execute;
}
}
} else {
// Continuation body line
if !current_body.is_empty() {
@ -244,6 +275,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
&mut current_body,
&mut current_tools,
&mut current_requires_confirmation,
&mut current_kind,
);
steps
@ -257,6 +289,7 @@ fn flush_step(
body: &mut String,
tools: &mut Vec<String>,
requires_confirmation: &mut bool,
kind: &mut SopStepKind,
) {
if let Some(n) = number.take() {
steps.push(SopStep {
@ -265,9 +298,12 @@ fn flush_step(
body: body.trim().to_string(),
suggested_tools: std::mem::take(tools),
requires_confirmation: *requires_confirmation,
kind: *kind,
schema: None,
});
*body = String::new();
*requires_confirmation = false;
*kind = SopStepKind::Execute;
}
}
@ -349,7 +385,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
let sops = load_sops(
&config.workspace_dir,
sops_dir_override,
config.sop.default_execution_mode,
parse_execution_mode(&config.sop.default_execution_mode),
);
if sops.is_empty() {
println!("No SOPs found.");
@ -393,7 +429,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
let sops = load_sops(
&config.workspace_dir,
sops_dir_override,
config.sop.default_execution_mode,
parse_execution_mode(&config.sop.default_execution_mode),
);
let matching: Vec<&Sop> = if let Some(ref name) = name {
sops.iter().filter(|s| s.name == *name).collect()
@ -443,7 +479,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
let sops = load_sops(
&config.workspace_dir,
sops_dir_override,
config.sop.default_execution_mode,
parse_execution_mode(&config.sop.default_execution_mode),
);
let sop = sops
.iter()
@ -474,16 +510,23 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
if !sop.steps.is_empty() {
println!("Steps:");
for step in &sop.steps {
let confirm_tag = if step.requires_confirmation {
" [requires confirmation]"
let mut tags = Vec::new();
if step.requires_confirmation {
tags.push("requires confirmation");
}
if step.kind == SopStepKind::Checkpoint {
tags.push("checkpoint");
}
let tag_str = if tags.is_empty() {
String::new()
} else {
""
format!(" [{}]", tags.join(", "))
};
println!(
" {}. {}{}",
step.number,
console::style(&step.title).bold(),
confirm_tag
tag_str
);
if !step.body.is_empty() {
for line in step.body.lines() {
@ -705,6 +748,7 @@ type = "manual"
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
};
let warnings = validate_sop(&sop);
@ -729,10 +773,13 @@ type = "manual"
body: "Do the thing".into(),
suggested_tools: vec!["shell".into()],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
}],
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
};
let warnings = validate_sop(&sop);
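
The mode resolution introduced in this file has two parts: `parse_execution_mode` falls back to `Supervised` for unknown strings, and `load_sop` lets `deterministic = true` override whatever mode the manifest or config specifies. A self-contained sketch of that precedence follows. `Mode`, `parse_mode`, and `effective_mode` are illustrative stand-ins for `SopExecutionMode`, `parse_execution_mode`, and the inline logic in `load_sop`; they are not the crate's own definitions.

```rust
/// Local stand-in for `SopExecutionMode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Auto,
    Supervised,
    StepByStep,
    PriorityBased,
    Deterministic,
}

/// Mirrors `parse_execution_mode`: trim, lowercase, default to `Supervised`.
pub fn parse_mode(s: &str) -> Mode {
    match s.trim().to_lowercase().as_str() {
        "auto" => Mode::Auto,
        "step_by_step" => Mode::StepByStep,
        "priority_based" => Mode::PriorityBased,
        "deterministic" => Mode::Deterministic,
        // "supervised" and any unknown value
        _ => Mode::Supervised,
    }
}

/// Mirrors the precedence in `load_sop`: the `deterministic` flag wins,
/// then the manifest's own mode, then the configured default.
pub fn effective_mode(deterministic: bool, manifest_mode: Option<Mode>, default_mode: Mode) -> Mode {
    if deterministic {
        Mode::Deterministic
    } else {
        manifest_mode.unwrap_or(default_mode)
    }
}
```

Note that a typo in the config string silently resolves to `Supervised`, the most conservative mode shown here, rather than producing an error.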


@ -1,5 +1,6 @@
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;
@ -42,6 +43,10 @@ pub enum SopExecutionMode {
StepByStep,
/// Critical/High → Auto, Normal/Low → Supervised.
PriorityBased,
/// Execute steps sequentially without LLM round-trips.
/// Step outputs are piped as inputs to the next step.
/// Checkpoint steps pause for human approval.
Deterministic,
}
impl fmt::Display for SopExecutionMode {
@ -51,6 +56,7 @@ impl fmt::Display for SopExecutionMode {
Self::Supervised => write!(f, "supervised"),
Self::StepByStep => write!(f, "step_by_step"),
Self::PriorityBased => write!(f, "priority_based"),
Self::Deterministic => write!(f, "deterministic"),
}
}
}
@ -93,6 +99,44 @@ impl fmt::Display for SopTrigger {
}
}
// ── Step kind ────────────────────────────────────────────────────
/// The kind of a workflow step.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SopStepKind {
/// Normal step — executed by the agent (or deterministic handler).
#[default]
Execute,
/// Checkpoint step — pauses execution and waits for human approval.
Checkpoint,
}
impl fmt::Display for SopStepKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Execute => write!(f, "execute"),
Self::Checkpoint => write!(f, "checkpoint"),
}
}
}
// ── Typed step parameters ────────────────────────────────────────
/// JSON Schema fragment for validating step input/output data.
///
/// Stored as a raw `serde_json::Value` so callers can validate without
/// pulling in a full JSON Schema library.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StepSchema {
/// JSON Schema object describing expected input shape.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input: Option<serde_json::Value>,
/// JSON Schema object describing expected output shape.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output: Option<serde_json::Value>,
}
// ── Step ────────────────────────────────────────────────────────
/// A single step in an SOP procedure, parsed from SOP.md.
@ -105,6 +149,12 @@ pub struct SopStep {
pub suggested_tools: Vec<String>,
#[serde(default)]
pub requires_confirmation: bool,
/// Step kind: `execute` (default) or `checkpoint`.
#[serde(default)]
pub kind: SopStepKind,
/// Typed input/output schemas for deterministic data flow validation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schema: Option<StepSchema>,
}
// ── SOP ─────────────────────────────────────────────────────────
@ -125,6 +175,10 @@ pub struct Sop {
pub max_concurrent: u32,
#[serde(skip)]
pub location: Option<PathBuf>,
/// When true, sets execution_mode to Deterministic.
/// Steps execute sequentially without LLM round-trips.
#[serde(default)]
pub deterministic: bool,
}
fn default_cooldown_secs() -> u64 {
@ -160,6 +214,9 @@ pub(crate) struct SopMeta {
pub cooldown_secs: u64,
#[serde(default = "default_max_concurrent")]
pub max_concurrent: u32,
/// Opt-in deterministic execution (no LLM round-trips between steps).
#[serde(default)]
pub deterministic: bool,
}
fn default_sop_version() -> String {
@ -214,6 +271,8 @@ pub enum SopRunStatus {
Pending,
Running,
WaitingApproval,
/// Paused at a checkpoint in a deterministic workflow.
PausedCheckpoint,
Completed,
Failed,
Cancelled,
@ -225,6 +284,7 @@ impl fmt::Display for SopRunStatus {
Self::Pending => write!(f, "pending"),
Self::Running => write!(f, "running"),
Self::WaitingApproval => write!(f, "waiting_approval"),
Self::PausedCheckpoint => write!(f, "paused_checkpoint"),
Self::Completed => write!(f, "completed"),
Self::Failed => write!(f, "failed"),
Self::Cancelled => write!(f, "cancelled"),
@ -276,6 +336,44 @@ pub struct SopRun {
/// ISO-8601 timestamp when the run entered WaitingApproval (for timeout tracking).
#[serde(default)]
pub waiting_since: Option<String>,
/// Number of LLM calls saved by deterministic execution in this run.
#[serde(default)]
pub llm_calls_saved: u64,
}
// ── Deterministic workflow state (persistence + resume) ──────────
/// Persisted state for a deterministic workflow run, enabling resume
/// after interruption. Serialized to a JSON file alongside the SOP.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeterministicRunState {
/// Identifier of this run.
pub run_id: String,
/// SOP name this state belongs to.
pub sop_name: String,
/// Last successfully completed step number (0 = none completed).
pub last_completed_step: u32,
/// Total steps in the workflow.
pub total_steps: u32,
/// Output of each completed step, keyed by step number.
pub step_outputs: HashMap<u32, serde_json::Value>,
/// ISO-8601 timestamp when this state was last persisted.
pub persisted_at: String,
/// Number of LLM calls that were saved by deterministic execution.
pub llm_calls_saved: u64,
/// Whether the run is paused at a checkpoint awaiting approval.
pub paused_at_checkpoint: bool,
}
// ── Cost savings metric ──────────────────────────────────────────
/// Tracks how many LLM round-trips were saved by deterministic execution.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeterministicSavings {
/// Total LLM calls saved across all deterministic runs.
pub total_llm_calls_saved: u64,
/// Total deterministic runs completed.
pub total_runs: u64,
}
/// What the engine instructs the caller to do next after a state transition.
@ -293,6 +391,20 @@ pub enum SopRunAction {
step: SopStep,
context: String,
},
/// Execute a step deterministically (no LLM). The `input` is the piped
/// output from the previous step (or trigger payload for step 1).
DeterministicStep {
run_id: String,
step: SopStep,
input: serde_json::Value,
},
/// Deterministic workflow hit a checkpoint — pause for human approval.
/// Workflow state has been persisted so it can resume after approval.
CheckpointWait {
run_id: String,
step: SopStep,
state_file: PathBuf,
},
/// The SOP run completed successfully.
Completed { run_id: String, sop_name: String },
/// The SOP run failed.
@ -459,6 +571,7 @@ path = "/sop/test"
completed_at: Some("2026-02-19T12:00:05Z".into()),
}],
waiting_since: None,
llm_calls_saved: 0,
};
let json = serde_json::to_string(&run).unwrap();
let parsed: SopRun = serde_json::from_str(&json).unwrap();
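Review note: the doc comments on `DeterministicRunState` describe resume-after-interruption, but this diff does not show the resume logic itself. The following is a minimal sketch of how the fields could drive it, under stated assumptions: `RunState` is a simplified stand-in (step outputs are `String` rather than `serde_json::Value` so the example stays std-only), `Resume`, `next_on_resume`, and `record_step` are hypothetical names, and "one LLM call saved per deterministic step" is an assumption, not something the diff states.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `DeterministicRunState`.
#[derive(Debug, Clone)]
pub struct RunState {
    pub last_completed_step: u32, // 0 = none completed, as in the diff
    pub total_steps: u32,
    pub step_outputs: HashMap<u32, String>,
    pub llm_calls_saved: u64,
    pub paused_at_checkpoint: bool,
}

/// Hypothetical decision produced when a persisted run is reloaded.
#[derive(Debug, PartialEq, Eq)]
pub enum Resume {
    /// Still waiting for a human to approve the checkpoint.
    AwaitApproval,
    /// Run this step; `input` is the previous step's output.
    /// `None` means "use the trigger payload" (step 1).
    RunStep { number: u32, input: Option<String> },
    /// Every step has already completed.
    Done,
}

pub fn next_on_resume(state: &RunState) -> Resume {
    if state.paused_at_checkpoint {
        return Resume::AwaitApproval;
    }
    if state.last_completed_step >= state.total_steps {
        return Resume::Done;
    }
    let number = state.last_completed_step + 1;
    // Pipe the output of the last completed step into the next one.
    let input = state.step_outputs.get(&state.last_completed_step).cloned();
    Resume::RunStep { number, input }
}

/// Record a finished step so that a later resume starts after it.
pub fn record_step(state: &mut RunState, number: u32, output: String) {
    state.step_outputs.insert(number, output);
    state.last_completed_step = number;
    state.llm_calls_saved += 1; // assumption: one call saved per step
}
```

Keying `step_outputs` by step number means the piped input for step N+1 can be recovered from the persisted file alone, without replaying earlier steps.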


@ -76,6 +76,11 @@ pub mod schema;
pub mod screenshot;
pub mod security_ops;
pub mod shell;
pub mod sop_advance;
pub mod sop_approve;
pub mod sop_execute;
pub mod sop_list;
pub mod sop_status;
pub mod swarm;
pub mod text_browser;
pub mod tool_search;
@ -146,6 +151,11 @@ pub use schema::{CleaningStrategy, SchemaCleanr};
pub use screenshot::ScreenshotTool;
pub use security_ops::SecurityOpsTool;
pub use shell::ShellTool;
pub use sop_advance::SopAdvanceTool;
pub use sop_approve::SopApproveTool;
pub use sop_execute::SopExecuteTool;
pub use sop_list::SopListTool;
pub use sop_status::SopStatusTool;
pub use swarm::SwarmTool;
pub use text_browser::TextBrowserTool;
pub use tool_search::ToolSearchTool;
@ -556,6 +566,18 @@ pub fn all_tools_with_runtime(
)));
}
// SOP tools (registered when sops_dir is configured)
if root_config.sop.sops_dir.is_some() {
let sop_engine = Arc::new(std::sync::Mutex::new(crate::sop::SopEngine::new(
root_config.sop.clone(),
)));
tool_arcs.push(Arc::new(SopListTool::new(Arc::clone(&sop_engine))));
tool_arcs.push(Arc::new(SopExecuteTool::new(Arc::clone(&sop_engine))));
tool_arcs.push(Arc::new(SopAdvanceTool::new(Arc::clone(&sop_engine))));
tool_arcs.push(Arc::new(SopApproveTool::new(Arc::clone(&sop_engine))));
tool_arcs.push(Arc::new(SopStatusTool::new(Arc::clone(&sop_engine))));
}
if let Some(key) = composio_key {
if !key.is_empty() {
tool_arcs.push(Arc::new(ComposioTool::new(
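Review note: the registration hunk above builds one `SopEngine` behind `Arc<Mutex<..>>` and hands a clone of the `Arc` to each of the five tools, so all tools observe the same run state. A std-only sketch of that pattern follows; `Engine`, `Tool`, `NamedSopTool`, and `register_sop_tools` are stand-ins invented for illustration and are not the crate's types.

```rust
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

/// Stand-in for `SopEngine`.
pub struct Engine {
    pub sops_dir: PathBuf,
}

/// Stand-in for the crate's `Tool` trait.
pub trait Tool {
    fn name(&self) -> &'static str;
}

/// Stand-in for the five concrete SOP tool types.
pub struct NamedSopTool {
    pub name: &'static str,
    pub engine: Arc<Mutex<Engine>>,
}

impl Tool for NamedSopTool {
    fn name(&self) -> &'static str {
        self.name
    }
}

/// Register the SOP tools only when a SOP directory is configured.
/// Returns the shared engine handle, or `None` if nothing was registered.
pub fn register_sop_tools(
    sops_dir: Option<PathBuf>,
    tools: &mut Vec<Arc<dyn Tool>>,
) -> Option<Arc<Mutex<Engine>>> {
    let dir = sops_dir?; // no directory configured: register nothing
    let engine = Arc::new(Mutex::new(Engine { sops_dir: dir }));
    for name in ["sop_list", "sop_execute", "sop_advance", "sop_approve", "sop_status"] {
        // Each tool holds its own `Arc` clone of the same engine.
        tools.push(Arc::new(NamedSopTool {
            name,
            engine: Arc::clone(&engine),
        }));
    }
    Some(engine)
}
```

One design point worth checking in the real code: the engine is created inside `all_tools_with_runtime`, so if that function is called more than once, each call would get its own engine and the tool sets would not share runs.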


@ -181,6 +181,18 @@ impl Tool for SopAdvanceTool {
} => {
format!("SOP '{sop_name}' run {run_id} failed: {reason}")
}
SopRunAction::DeterministicStep { run_id, step, .. } => {
format!(
"Step recorded. Next deterministic step for run {run_id}: {}",
step.title
)
}
SopRunAction::CheckpointWait { run_id, step, .. } => {
format!(
"Step recorded. Run {run_id} paused at checkpoint: {}",
step.title
)
}
};
Ok(ToolResult {
success: true,
@ -222,6 +234,8 @@ mod tests {
body: "Do step one".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
},
SopStep {
number: 2,
@ -229,11 +243,14 @@ mod tests {
body: "Do step two".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
},
],
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
}
}


@ -143,10 +143,13 @@ mod tests {
body: "Do it".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
}],
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
}
}


@ -118,6 +118,18 @@ impl Tool for SopExecuteTool {
SopRunAction::Failed { run_id, reason, .. } => {
format!("SOP run {run_id} failed: {reason}")
}
SopRunAction::DeterministicStep { run_id, step, .. } => {
format!(
"SOP run started (deterministic): {run_id}\nFirst step: {}",
step.title
)
}
SopRunAction::CheckpointWait { run_id, step, .. } => {
format!(
"SOP run started: {run_id} (paused at checkpoint: {})",
step.title
)
}
};
Ok(ToolResult {
success: true,
@ -140,7 +152,9 @@ fn action_run_id(action: &SopRunAction) -> Option<&str> {
SopRunAction::ExecuteStep { run_id, .. }
| SopRunAction::WaitApproval { run_id, .. }
| SopRunAction::Completed { run_id, .. }
-| SopRunAction::Failed { run_id, .. } => Some(run_id),
+| SopRunAction::Failed { run_id, .. }
+| SopRunAction::DeterministicStep { run_id, .. }
+| SopRunAction::CheckpointWait { run_id, .. } => Some(run_id),
}
}
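Review note: `action_run_id` uses a single or-pattern arm, so every variant that carries a `run_id` binds the same name and shares one body. A std-only sketch of the technique is below. `Action` is a cut-down stand-in for `SopRunAction` (the real variants also carry a `SopStep` and other fields), and this version returns `&str` directly rather than `Option<&str>` because every stand-in variant has a `run_id`.

```rust
use std::path::PathBuf;

/// Cut-down stand-in for `SopRunAction`.
pub enum Action {
    ExecuteStep { run_id: String },
    WaitApproval { run_id: String },
    Completed { run_id: String, sop_name: String },
    Failed { run_id: String, reason: String },
    DeterministicStep { run_id: String, input: String },
    CheckpointWait { run_id: String, state_file: PathBuf },
}

/// Extract the run id from any action variant.
pub fn run_id(action: &Action) -> &str {
    match action {
        // Every alternative must bind `run_id` with the same type;
        // `..` ignores whatever other fields a variant has.
        Action::ExecuteStep { run_id, .. }
        | Action::WaitApproval { run_id, .. }
        | Action::Completed { run_id, .. }
        | Action::Failed { run_id, .. }
        | Action::DeterministicStep { run_id, .. }
        | Action::CheckpointWait { run_id, .. } => run_id.as_str(),
    }
}
```

With no wildcard arm, the compiler rejects the `match` as soon as a variant is added without being listed, which is how the two new variants in this PR surfaced in `sop_advance` and `sop_execute`.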
@ -168,6 +182,8 @@ mod tests {
body: "Do step one".into(),
suggested_tools: vec!["shell".into()],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
},
SopStep {
number: 2,
@ -175,11 +191,14 @@ mod tests {
body: "Do step two".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
},
],
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
}
}


@ -137,10 +137,13 @@ mod tests {
body: "Do it".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
}],
cooldown_secs: 0,
max_concurrent: 1,
location: None,
deterministic: false,
}
}


@ -11,8 +11,6 @@ use crate::sop::{SopEngine, SopMetricsCollector};
pub struct SopStatusTool {
engine: Arc<Mutex<SopEngine>>,
collector: Option<Arc<SopMetricsCollector>>,
-#[cfg(feature = "ampersona-gates")]
-gate_eval: Option<Arc<crate::sop::GateEvalState>>,
}
impl SopStatusTool {
@ -20,8 +18,6 @@ impl SopStatusTool {
Self {
engine,
collector: None,
-#[cfg(feature = "ampersona-gates")]
-gate_eval: None,
}
}
@ -30,61 +26,11 @@ impl SopStatusTool {
self
}
-#[cfg(feature = "ampersona-gates")]
-pub fn with_gate_eval(mut self, gate_eval: Arc<crate::sop::GateEvalState>) -> Self {
-self.gate_eval = Some(gate_eval);
-self
-}
fn append_gate_status(&self, output: &mut String, include_gate_status: bool) {
-#[cfg(feature = "ampersona-gates")]
-if include_gate_status {
-if let Some(ref ge) = self.gate_eval {
-if let Some(snap) = ge.phase_state_snapshot() {
-let _ = writeln!(output, "\nGate Status:");
-let _ = writeln!(
-output,
-" current_phase: {}",
-snap.current_phase.as_deref().unwrap_or("(none)")
-);
-let _ = writeln!(output, " state_rev: {}", snap.state_rev);
-let _ = writeln!(output, " gates_loaded: {}", ge.gate_count());
-if let Some(ref tr) = snap.last_transition {
-let _ = writeln!(
-output,
-" last_transition: {} ({} → {})",
-tr.at.to_rfc3339(),
-tr.from_phase.as_deref().unwrap_or("(none)"),
-tr.to_phase,
-);
-} else {
-let _ = writeln!(output, " last_transition: none");
-}
-if let Some(ref pt) = snap.pending_transition {
-let _ = writeln!(
-output,
-" pending_transition: {} → {} ({})",
-pt.from_phase.as_deref().unwrap_or("(none)"),
-pt.to_phase,
-pt.decision,
-);
-} else {
-let _ = writeln!(output, " pending_transition: none");
-}
-}
-} else {
-let _ = writeln!(
-output,
-"\nGate Status: not available (gate eval not configured)"
-);
-}
-}
-#[cfg(not(feature = "ampersona-gates"))]
if include_gate_status {
let _ = writeln!(
output,
-"\nGate Status: not available (ampersona-gates feature not enabled)"
+"\nGate Status: not available (gate evaluation not supported)"
);
}
}
@ -309,10 +255,13 @@ mod tests {
body: "Do it".into(),
suggested_tools: vec![],
requires_confirmation: false,
kind: SopStepKind::default(),
schema: None,
}],
cooldown_secs: 0,
max_concurrent: 2,
location: None,
deterministic: false,
}
}
@ -431,6 +380,7 @@ mod tests {
completed_at: Some("2026-02-19T12:01:00Z".into()),
}],
waiting_since: None,
llm_calls_saved: 0,
};
collector.record_run_complete(&run);
@ -466,6 +416,7 @@ mod tests {
completed_at: Some("2026-02-19T12:01:00Z".into()),
}],
waiting_since: None,
llm_calls_saved: 0,
};
collector.record_run_complete(&run);