Compare commits
6 Commits
master
...
review/pr-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
18cfb4e2fe | ||
|
|
ccabfd7167 | ||
|
|
b91844966d | ||
|
|
e46a19c61c | ||
|
|
e8c61522ea | ||
|
|
a12716a065 |
@ -12,8 +12,6 @@ SOP 审计条目通过 `SopAuditLogger` 持久化到配置的内存后端的 `so
|
||||
- `sop_step_{run_id}_{step_number}`:单步结果
|
||||
- `sop_approval_{run_id}_{step_number}`:操作员审批记录
|
||||
- `sop_timeout_approve_{run_id}_{step_number}`:超时自动审批记录
|
||||
- `sop_gate_decision_{gate_id}_{timestamp_ms}`:门评估器决策记录(启用 `ampersona-gates` 时)
|
||||
- `sop_phase_state`:持久化的信任阶段状态快照(启用 `ampersona-gates` 时)
|
||||
|
||||
## 2. 检查路径
|
||||
|
||||
|
||||
@ -12,8 +12,6 @@ Common key patterns:
|
||||
- `sop_step_{run_id}_{step_number}`: per-step result
|
||||
- `sop_approval_{run_id}_{step_number}`: operator approval record
|
||||
- `sop_timeout_approve_{run_id}_{step_number}`: timeout auto-approval record
|
||||
- `sop_gate_decision_{gate_id}_{timestamp_ms}`: gate evaluator decision record (when `ampersona-gates` is enabled)
|
||||
- `sop_phase_state`: persisted trust-phase state snapshot (when `ampersona-gates` is enabled)
|
||||
|
||||
## 2. Inspection Paths
|
||||
|
||||
|
||||
@ -25,11 +25,11 @@ pub use schema::{
|
||||
ProjectIntelConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig,
|
||||
ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig,
|
||||
SchedulerConfig, SecretsConfig, SecurityConfig, SecurityOpsConfig, SkillCreationConfig,
|
||||
SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig,
|
||||
StorageProviderSection, StreamMode, SwarmConfig, SwarmStrategy, TelegramConfig,
|
||||
TextBrowserConfig, ToolFilterGroup, ToolFilterGroupMode, TranscriptionConfig, TtsConfig,
|
||||
TunnelConfig, VerifiableIntentConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
|
||||
WhatsAppChatPolicy, WhatsAppWebMode, WorkspaceConfig, DEFAULT_GWS_SERVICES,
|
||||
SkillsConfig, SkillsPromptInjectionMode, SlackConfig, SopConfig, StorageConfig,
|
||||
StorageProviderConfig, StorageProviderSection, StreamMode, SwarmConfig, SwarmStrategy,
|
||||
TelegramConfig, TextBrowserConfig, ToolFilterGroup, ToolFilterGroupMode, TranscriptionConfig,
|
||||
TtsConfig, TunnelConfig, VerifiableIntentConfig, WebFetchConfig, WebSearchConfig,
|
||||
WebhookConfig, WhatsAppChatPolicy, WhatsAppWebMode, WorkspaceConfig, DEFAULT_GWS_SERVICES,
|
||||
};
|
||||
|
||||
pub fn name_and_presence<T: traits::ChannelConfig>(channel: Option<&T>) -> (&'static str, bool) {
|
||||
|
||||
@ -379,6 +379,10 @@ pub struct Config {
|
||||
/// Claude Code tool configuration (`[claude_code]`).
|
||||
#[serde(default)]
|
||||
pub claude_code: ClaudeCodeConfig,
|
||||
|
||||
/// Standard Operating Procedures engine configuration (`[sop]`).
|
||||
#[serde(default)]
|
||||
pub sop: SopConfig,
|
||||
}
|
||||
|
||||
/// Multi-client workspace isolation configuration.
|
||||
@ -1248,6 +1252,12 @@ pub struct AgentConfig {
|
||||
/// Default: `[]` (no filtering — all tools included).
|
||||
#[serde(default)]
|
||||
pub tool_filter_groups: Vec<ToolFilterGroup>,
|
||||
/// Maximum characters for the assembled system prompt. When `> 0`, the prompt
|
||||
/// is truncated to this limit after assembly (keeping the top portion which
|
||||
/// contains identity and safety instructions). `0` means unlimited.
|
||||
/// Useful for small-context models (e.g. glm-4.5-air ~8K tokens → set to 8000).
|
||||
#[serde(default = "default_max_system_prompt_chars")]
|
||||
pub max_system_prompt_chars: usize,
|
||||
}
|
||||
|
||||
fn default_agent_max_tool_iterations() -> usize {
|
||||
@ -1266,6 +1276,10 @@ fn default_agent_tool_dispatcher() -> String {
|
||||
"auto".into()
|
||||
}
|
||||
|
||||
fn default_max_system_prompt_chars() -> usize {
|
||||
0
|
||||
}
|
||||
|
||||
impl Default for AgentConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@ -1277,6 +1291,7 @@ impl Default for AgentConfig {
|
||||
tool_dispatcher: default_agent_tool_dispatcher(),
|
||||
tool_call_dedup_exempt: Vec::new(),
|
||||
tool_filter_groups: Vec::new(),
|
||||
max_system_prompt_chars: default_max_system_prompt_chars(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -6815,6 +6830,7 @@ impl Default for Config {
|
||||
locale: None,
|
||||
verifiable_intent: VerifiableIntentConfig::default(),
|
||||
claude_code: ClaudeCodeConfig::default(),
|
||||
sop: SopConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -9267,6 +9283,70 @@ async fn sync_directory(path: &Path) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// ── SOP engine configuration ───────────────────────────────────
|
||||
|
||||
/// Standard Operating Procedures engine configuration (`[sop]`).
|
||||
///
|
||||
/// The `default_execution_mode` field is stored as a plain string and is
|
||||
/// parsed into `SopExecutionMode` (via `sop::parse_execution_mode`) when the
|
||||
/// engine loads SOPs. This avoids circular references between `config` and `sop`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct SopConfig {
|
||||
/// Directory containing SOP definitions (subdirs with SOP.toml + SOP.md).
|
||||
/// Falls back to `<workspace>/sops` when omitted.
|
||||
#[serde(default)]
|
||||
pub sops_dir: Option<String>,
|
||||
|
||||
/// Default execution mode for SOPs that omit `execution_mode`.
|
||||
/// Values: `auto`, `supervised` (default), `step_by_step`,
|
||||
/// `priority_based`, `deterministic`.
|
||||
#[serde(default = "default_sop_execution_mode")]
|
||||
pub default_execution_mode: String,
|
||||
|
||||
/// Maximum total concurrent SOP runs across all SOPs.
|
||||
#[serde(default = "default_sop_max_concurrent_total")]
|
||||
pub max_concurrent_total: usize,
|
||||
|
||||
/// Approval timeout in seconds. When a run waits for approval longer than
|
||||
/// this, Critical/High-priority SOPs auto-approve; others stay waiting.
|
||||
/// Set to 0 to disable timeout.
|
||||
#[serde(default = "default_sop_approval_timeout_secs")]
|
||||
pub approval_timeout_secs: u64,
|
||||
|
||||
/// Maximum number of finished runs kept in memory for status queries.
|
||||
/// Oldest runs are evicted when over capacity. 0 = unlimited.
|
||||
#[serde(default = "default_sop_max_finished_runs")]
|
||||
pub max_finished_runs: usize,
|
||||
}
|
||||
|
||||
fn default_sop_execution_mode() -> String {
|
||||
"supervised".to_string()
|
||||
}
|
||||
|
||||
fn default_sop_max_concurrent_total() -> usize {
|
||||
4
|
||||
}
|
||||
|
||||
fn default_sop_approval_timeout_secs() -> u64 {
|
||||
300
|
||||
}
|
||||
|
||||
fn default_sop_max_finished_runs() -> usize {
|
||||
100
|
||||
}
|
||||
|
||||
impl Default for SopConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sops_dir: None,
|
||||
default_execution_mode: default_sop_execution_mode(),
|
||||
max_concurrent_total: default_sop_max_concurrent_total(),
|
||||
approval_timeout_secs: default_sop_approval_timeout_secs(),
|
||||
max_finished_runs: default_sop_max_finished_runs(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@ -9738,6 +9818,7 @@ default_temperature = 0.7
|
||||
locale: None,
|
||||
verifiable_intent: VerifiableIntentConfig::default(),
|
||||
claude_code: ClaudeCodeConfig::default(),
|
||||
sop: SopConfig::default(),
|
||||
};
|
||||
|
||||
let toml_str = toml::to_string_pretty(&config).unwrap();
|
||||
@ -10119,6 +10200,7 @@ default_temperature = 0.7
|
||||
locale: None,
|
||||
verifiable_intent: VerifiableIntentConfig::default(),
|
||||
claude_code: ClaudeCodeConfig::default(),
|
||||
sop: SopConfig::default(),
|
||||
};
|
||||
|
||||
config.save().await.unwrap();
|
||||
@ -13667,9 +13749,9 @@ require_otp_to_resume = true
|
||||
|
||||
// ── Bootstrap files ─────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
#[tokio::test]
|
||||
async fn ensure_bootstrap_files_creates_missing_files() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
let ws = tmp.path().join("workspace");
|
||||
tokio::fs::create_dir_all(&ws).await.unwrap();
|
||||
|
||||
@ -13683,9 +13765,9 @@ require_otp_to_resume = true
|
||||
assert!(identity.contains("IDENTITY.md"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[tokio::test]
|
||||
async fn ensure_bootstrap_files_does_not_overwrite_existing() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
let ws = tmp.path().join("workspace");
|
||||
tokio::fs::create_dir_all(&ws).await.unwrap();
|
||||
|
||||
|
||||
18
src/lib.rs
18
src/lib.rs
@ -71,6 +71,7 @@ pub mod runtime;
|
||||
pub(crate) mod security;
|
||||
pub(crate) mod service;
|
||||
pub(crate) mod skills;
|
||||
pub mod sop;
|
||||
pub mod tools;
|
||||
pub(crate) mod tunnel;
|
||||
pub(crate) mod util;
|
||||
@ -561,3 +562,20 @@ Examples:
|
||||
/// Flash ZeroClaw firmware to Nucleo-F401RE (builds + probe-rs run)
|
||||
FlashNucleo,
|
||||
}
|
||||
|
||||
/// SOP management subcommands
|
||||
#[derive(Subcommand, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum SopCommands {
|
||||
/// List loaded SOPs
|
||||
List,
|
||||
/// Validate SOP definitions
|
||||
Validate {
|
||||
/// SOP name to validate (all if omitted)
|
||||
name: Option<String>,
|
||||
},
|
||||
/// Show details of an SOP
|
||||
Show {
|
||||
/// Name of the SOP to show
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
@ -107,6 +107,7 @@ mod security;
|
||||
mod service;
|
||||
mod skillforge;
|
||||
mod skills;
|
||||
mod sop;
|
||||
mod tools;
|
||||
mod tunnel;
|
||||
mod util;
|
||||
@ -117,7 +118,7 @@ use config::Config;
|
||||
// Re-export so binary modules can use crate::<CommandEnum> while keeping a single source of truth.
|
||||
pub use zeroclaw::{
|
||||
ChannelCommands, CronCommands, GatewayCommands, HardwareCommands, IntegrationCommands,
|
||||
MigrateCommands, PeripheralCommands, ServiceCommands, SkillCommands,
|
||||
MigrateCommands, PeripheralCommands, ServiceCommands, SkillCommands, SopCommands,
|
||||
};
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
|
||||
|
||||
@ -201,6 +201,7 @@ pub async fn run_wizard(force: bool) -> Result<Config> {
|
||||
locale: None,
|
||||
verifiable_intent: crate::config::VerifiableIntentConfig::default(),
|
||||
claude_code: crate::config::ClaudeCodeConfig::default(),
|
||||
sop: crate::config::SopConfig::default(),
|
||||
};
|
||||
|
||||
println!(
|
||||
@ -624,6 +625,7 @@ async fn run_quick_setup_with_home(
|
||||
locale: None,
|
||||
verifiable_intent: crate::config::VerifiableIntentConfig::default(),
|
||||
claude_code: crate::config::ClaudeCodeConfig::default(),
|
||||
sop: crate::config::SopConfig::default(),
|
||||
};
|
||||
|
||||
config.save().await?;
|
||||
|
||||
@ -78,33 +78,6 @@ impl SopAuditLogger {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Log a gate evaluation decision record.
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
pub async fn log_gate_decision(
|
||||
&self,
|
||||
record: &ampersona_engine::gates::decision::GateDecisionRecord,
|
||||
) -> Result<()> {
|
||||
let timestamp_ms = chrono::Utc::now().timestamp_millis();
|
||||
let key = format!("sop_gate_decision_{}_{timestamp_ms}", record.gate_id);
|
||||
let content = serde_json::to_string_pretty(record)?;
|
||||
self.memory.store(&key, &content, category(), None).await?;
|
||||
info!(
|
||||
gate_id = %record.gate_id,
|
||||
decision = %record.decision,
|
||||
"SOP audit: gate decision logged"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persist (upsert) the current gate phase state.
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
pub async fn log_phase_state(&self, state: &ampersona_core::state::PhaseState) -> Result<()> {
|
||||
let key = "sop_phase_state";
|
||||
let content = serde_json::to_string_pretty(state)?;
|
||||
self.memory.store(key, &content, category(), None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieve a stored run by ID (if it exists in memory).
|
||||
pub async fn get_run(&self, run_id: &str) -> Result<Option<SopRun>> {
|
||||
let key = run_key(run_id);
|
||||
@ -166,6 +139,7 @@ mod tests {
|
||||
completed_at: None,
|
||||
step_results: Vec::new(),
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ pub enum DispatchResult {
|
||||
Started {
|
||||
run_id: String,
|
||||
sop_name: String,
|
||||
action: SopRunAction,
|
||||
action: Box<SopRunAction>,
|
||||
},
|
||||
/// A matching SOP was found but could not start (cooldown / concurrency).
|
||||
Skipped { sop_name: String, reason: String },
|
||||
@ -39,6 +39,8 @@ fn extract_run_id_from_action(action: &SopRunAction) -> &str {
|
||||
match action {
|
||||
SopRunAction::ExecuteStep { run_id, .. }
|
||||
| SopRunAction::WaitApproval { run_id, .. }
|
||||
| SopRunAction::DeterministicStep { run_id, .. }
|
||||
| SopRunAction::CheckpointWait { run_id, .. }
|
||||
| SopRunAction::Completed { run_id, .. }
|
||||
| SopRunAction::Failed { run_id, .. } => run_id,
|
||||
}
|
||||
@ -49,6 +51,8 @@ fn action_label(action: &SopRunAction) -> &'static str {
|
||||
match action {
|
||||
SopRunAction::ExecuteStep { .. } => "ExecuteStep",
|
||||
SopRunAction::WaitApproval { .. } => "WaitApproval",
|
||||
SopRunAction::DeterministicStep { .. } => "DeterministicStep",
|
||||
SopRunAction::CheckpointWait { .. } => "CheckpointWait",
|
||||
SopRunAction::Completed { .. } => "Completed",
|
||||
SopRunAction::Failed { .. } => "Failed",
|
||||
}
|
||||
@ -62,7 +66,6 @@ fn action_label(action: &SopRunAction) -> &'static str {
|
||||
/// 1. Lock → `match_trigger` → collect SOP names → drop lock
|
||||
/// 2. Lock → for each name: `start_run` → collect results → drop lock
|
||||
/// 3. Async (no lock): audit each started run
|
||||
#[tracing::instrument(skip(engine, audit), fields(source = %event.source, topic = ?event.topic))]
|
||||
pub async fn dispatch_sop_event(
|
||||
engine: &Arc<Mutex<SopEngine>>,
|
||||
audit: &SopAuditLogger,
|
||||
@ -124,7 +127,7 @@ pub async fn dispatch_sop_event(
|
||||
results.push(DispatchResult::Started {
|
||||
run_id,
|
||||
sop_name: sop_name.clone(),
|
||||
action,
|
||||
action: Box::new(action),
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
@ -158,14 +161,14 @@ pub async fn dispatch_sop_event(
|
||||
/// approval timeout polling in the scheduler handles progression.
|
||||
/// For `ExecuteStep` actions, the run is started in the engine but steps
|
||||
/// cannot be executed without an agent loop — this is logged as a warning.
|
||||
pub async fn process_headless_results(results: &[DispatchResult]) {
|
||||
pub fn process_headless_results(results: &[DispatchResult]) {
|
||||
for result in results {
|
||||
match result {
|
||||
DispatchResult::Started {
|
||||
run_id,
|
||||
sop_name,
|
||||
action,
|
||||
} => match action {
|
||||
} => match action.as_ref() {
|
||||
SopRunAction::ExecuteStep { step, .. } => {
|
||||
warn!(
|
||||
"SOP headless dispatch: run {run_id} ('{sop_name}') ready for step {} \
|
||||
@ -180,6 +183,24 @@ pub async fn process_headless_results(results: &[DispatchResult]) {
|
||||
step.number, step.title,
|
||||
);
|
||||
}
|
||||
SopRunAction::DeterministicStep { step, .. } => {
|
||||
info!(
|
||||
"SOP headless dispatch: run {run_id} ('{sop_name}') deterministic step {} \
|
||||
'{}'",
|
||||
step.number, step.title,
|
||||
);
|
||||
}
|
||||
SopRunAction::CheckpointWait {
|
||||
step, state_file, ..
|
||||
} => {
|
||||
info!(
|
||||
"SOP headless dispatch: run {run_id} ('{sop_name}') checkpoint at step {} \
|
||||
'{}', state persisted to {}",
|
||||
step.number,
|
||||
step.title,
|
||||
state_file.display(),
|
||||
);
|
||||
}
|
||||
SopRunAction::Completed { .. } => {
|
||||
info!(
|
||||
"SOP headless dispatch: run {run_id} ('{sop_name}') completed immediately"
|
||||
@ -250,7 +271,7 @@ impl SopCronCache {
|
||||
for trigger in &sop.triggers {
|
||||
if let super::types::SopTrigger::Cron { expression } = trigger {
|
||||
// Normalize 5-field crontab to 6-field (prepend seconds)
|
||||
let normalized = match crate::cron::schedule::normalize_expression(expression) {
|
||||
let normalized = match crate::cron::normalize_expression(expression) {
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
@ -349,10 +370,13 @@ mod tests {
|
||||
body: "Do step one".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: crate::sop::SopStepKind::default(),
|
||||
schema: None,
|
||||
}],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 2,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
@ -396,7 +420,7 @@ mod tests {
|
||||
let results = dispatch_sop_event(&engine, &audit, event).await;
|
||||
assert_eq!(results.len(), 1);
|
||||
assert!(
|
||||
matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action, SopRunAction::ExecuteStep { .. }))
|
||||
matches!(&results[0], DispatchResult::Started { sop_name, action, .. } if sop_name == "mqtt-sop" && matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }))
|
||||
);
|
||||
}
|
||||
|
||||
@ -534,7 +558,7 @@ mod tests {
|
||||
assert_eq!(sop_name, "supervised-sop");
|
||||
assert!(!run_id.is_empty());
|
||||
assert!(
|
||||
matches!(action, SopRunAction::WaitApproval { .. }),
|
||||
matches!(action.as_ref(), SopRunAction::WaitApproval { .. }),
|
||||
"Supervised SOP must return WaitApproval, got {:?}",
|
||||
action
|
||||
);
|
||||
@ -561,7 +585,7 @@ mod tests {
|
||||
match &results[0] {
|
||||
DispatchResult::Started { action, .. } => {
|
||||
assert!(
|
||||
matches!(action, SopRunAction::ExecuteStep { .. }),
|
||||
matches!(action.as_ref(), SopRunAction::ExecuteStep { .. }),
|
||||
"Auto SOP must return ExecuteStep, got {:?}",
|
||||
action
|
||||
);
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Write as _;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use tracing::{info, warn};
|
||||
@ -8,8 +8,9 @@ use tracing::{info, warn};
|
||||
use super::condition::evaluate_condition;
|
||||
use super::load_sops;
|
||||
use super::types::{
|
||||
Sop, SopEvent, SopPriority, SopRun, SopRunAction, SopRunStatus, SopStep, SopStepResult,
|
||||
SopStepStatus, SopTrigger, SopTriggerSource,
|
||||
DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
|
||||
SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
|
||||
SopTrigger, SopTriggerSource,
|
||||
};
|
||||
use crate::config::SopConfig;
|
||||
|
||||
@ -21,6 +22,8 @@ pub struct SopEngine {
|
||||
finished_runs: Vec<SopRun>,
|
||||
config: SopConfig,
|
||||
run_counter: u64,
|
||||
/// Cumulative savings from deterministic execution.
|
||||
deterministic_savings: DeterministicSavings,
|
||||
}
|
||||
|
||||
impl SopEngine {
|
||||
@ -32,6 +35,7 @@ impl SopEngine {
|
||||
finished_runs: Vec::new(),
|
||||
config,
|
||||
run_counter: 0,
|
||||
deterministic_savings: DeterministicSavings::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +44,7 @@ impl SopEngine {
|
||||
self.sops = load_sops(
|
||||
workspace_dir,
|
||||
self.config.sops_dir.as_deref(),
|
||||
self.config.default_execution_mode,
|
||||
super::parse_execution_mode(&self.config.default_execution_mode),
|
||||
);
|
||||
info!("SOP engine loaded {} SOPs", self.sops.len());
|
||||
}
|
||||
@ -118,7 +122,15 @@ impl SopEngine {
|
||||
}
|
||||
|
||||
/// Start a new SOP run. Returns the first action to take.
|
||||
/// Deterministic SOPs are automatically routed to `start_deterministic_run`.
|
||||
pub fn start_run(&mut self, sop_name: &str, event: SopEvent) -> Result<SopRunAction> {
|
||||
// Route deterministic SOPs to dedicated path
|
||||
if self.get_sop(sop_name).map_or(false, |s| {
|
||||
s.execution_mode == SopExecutionMode::Deterministic
|
||||
}) {
|
||||
return self.start_deterministic_run(sop_name, event);
|
||||
}
|
||||
|
||||
let sop = self
|
||||
.get_sop(sop_name)
|
||||
.ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
|
||||
@ -154,6 +166,7 @@ impl SopEngine {
|
||||
completed_at: None,
|
||||
step_results: Vec::new(),
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
|
||||
self.active_runs.insert(run_id.clone(), run);
|
||||
@ -283,6 +296,273 @@ impl SopEngine {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Return cumulative deterministic execution savings.
|
||||
pub fn deterministic_savings(&self) -> &DeterministicSavings {
|
||||
&self.deterministic_savings
|
||||
}
|
||||
|
||||
// ── Deterministic execution ─────────────────────────────────
|
||||
|
||||
/// Start a deterministic SOP run. Steps execute sequentially without LLM
|
||||
/// round-trips. Returns the first action (DeterministicStep or CheckpointWait).
|
||||
pub fn start_deterministic_run(
|
||||
&mut self,
|
||||
sop_name: &str,
|
||||
event: SopEvent,
|
||||
) -> Result<SopRunAction> {
|
||||
let sop = self
|
||||
.get_sop(sop_name)
|
||||
.ok_or_else(|| anyhow::anyhow!("SOP not found: {sop_name}"))?
|
||||
.clone();
|
||||
|
||||
if sop.execution_mode != SopExecutionMode::Deterministic {
|
||||
bail!(
|
||||
"SOP '{}' is not in deterministic mode (mode: {})",
|
||||
sop_name,
|
||||
sop.execution_mode
|
||||
);
|
||||
}
|
||||
|
||||
if !self.can_start(sop_name) {
|
||||
bail!(
|
||||
"Cannot start SOP '{}': cooldown or concurrency limit reached",
|
||||
sop_name
|
||||
);
|
||||
}
|
||||
|
||||
if sop.steps.is_empty() {
|
||||
bail!("SOP '{}' has no steps defined", sop_name);
|
||||
}
|
||||
|
||||
self.run_counter += 1;
|
||||
let dur = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default();
|
||||
let epoch_ms = dur.as_secs() * 1000 + u64::from(dur.subsec_millis());
|
||||
let run_id = format!("det-{epoch_ms}-{:04}", self.run_counter);
|
||||
let now = now_iso8601();
|
||||
|
||||
let total_steps = u32::try_from(sop.steps.len()).unwrap_or(u32::MAX);
|
||||
let run = SopRun {
|
||||
run_id: run_id.clone(),
|
||||
sop_name: sop_name.to_string(),
|
||||
trigger_event: event,
|
||||
status: SopRunStatus::Running,
|
||||
current_step: 1,
|
||||
total_steps,
|
||||
started_at: now,
|
||||
completed_at: None,
|
||||
step_results: Vec::new(),
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
|
||||
self.active_runs.insert(run_id.clone(), run);
|
||||
info!(
|
||||
"Deterministic SOP run {} started for '{}'",
|
||||
run_id, sop_name
|
||||
);
|
||||
|
||||
// Produce first step action
|
||||
let step = sop.steps[0].clone();
|
||||
let input = serde_json::Value::Null;
|
||||
self.resolve_deterministic_action(&sop, &run_id, &step, input)
|
||||
}
|
||||
|
||||
/// Advance a deterministic run with the output of the current step.
|
||||
/// The output is piped as input to the next step.
|
||||
pub fn advance_deterministic_step(
|
||||
&mut self,
|
||||
run_id: &str,
|
||||
step_output: serde_json::Value,
|
||||
) -> Result<SopRunAction> {
|
||||
let run = self
|
||||
.active_runs
|
||||
.get_mut(run_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Active run not found: {run_id}"))?;
|
||||
|
||||
let sop = self
|
||||
.sops
|
||||
.iter()
|
||||
.find(|s| s.name == run.sop_name)
|
||||
.ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
|
||||
.clone();
|
||||
|
||||
// Record step result
|
||||
let now = now_iso8601();
|
||||
let step_result = SopStepResult {
|
||||
step_number: run.current_step,
|
||||
status: SopStepStatus::Completed,
|
||||
output: step_output.to_string(),
|
||||
started_at: run.started_at.clone(),
|
||||
completed_at: Some(now),
|
||||
};
|
||||
run.step_results.push(step_result);
|
||||
|
||||
// Each deterministic step saves one LLM call
|
||||
run.llm_calls_saved += 1;
|
||||
|
||||
// Advance to next step
|
||||
let next_step_num = run.current_step + 1;
|
||||
if next_step_num > run.total_steps {
|
||||
info!(
|
||||
"Deterministic SOP run {run_id} completed ({} LLM calls saved)",
|
||||
run.llm_calls_saved
|
||||
);
|
||||
let saved = run.llm_calls_saved;
|
||||
self.deterministic_savings.total_llm_calls_saved += saved;
|
||||
self.deterministic_savings.total_runs += 1;
|
||||
return Ok(self.finish_run(run_id, SopRunStatus::Completed, None));
|
||||
}
|
||||
|
||||
let run = self.active_runs.get_mut(run_id).unwrap();
|
||||
run.current_step = next_step_num;
|
||||
|
||||
let step_idx = (next_step_num - 1) as usize;
|
||||
let step = sop.steps[step_idx].clone();
|
||||
let run_id_owned = run_id.to_string();
|
||||
|
||||
self.resolve_deterministic_action(&sop, &run_id_owned, &step, step_output)
|
||||
}
|
||||
|
||||
/// Resume a deterministic run from persisted state.
|
||||
pub fn resume_deterministic_run(
|
||||
&mut self,
|
||||
state: DeterministicRunState,
|
||||
) -> Result<SopRunAction> {
|
||||
let run = self
|
||||
.active_runs
|
||||
.get_mut(&state.run_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Active run not found: {}", state.run_id))?;
|
||||
|
||||
if run.status != SopRunStatus::PausedCheckpoint {
|
||||
bail!(
|
||||
"Run {} is not paused at checkpoint (status: {})",
|
||||
state.run_id,
|
||||
run.status
|
||||
);
|
||||
}
|
||||
|
||||
let sop = self
|
||||
.sops
|
||||
.iter()
|
||||
.find(|s| s.name == run.sop_name)
|
||||
.ok_or_else(|| anyhow::anyhow!("SOP '{}' no longer loaded", run.sop_name))?
|
||||
.clone();
|
||||
|
||||
run.status = SopRunStatus::Running;
|
||||
run.waiting_since = None;
|
||||
run.llm_calls_saved = state.llm_calls_saved;
|
||||
|
||||
// Resume from the step after the last completed one
|
||||
let next_step_num = state.last_completed_step + 1;
|
||||
if next_step_num > state.total_steps {
|
||||
info!(
|
||||
"Deterministic SOP run {} completed on resume ({} LLM calls saved)",
|
||||
state.run_id, state.llm_calls_saved
|
||||
);
|
||||
self.deterministic_savings.total_llm_calls_saved += state.llm_calls_saved;
|
||||
self.deterministic_savings.total_runs += 1;
|
||||
return Ok(self.finish_run(&state.run_id, SopRunStatus::Completed, None));
|
||||
}
|
||||
|
||||
let run = self.active_runs.get_mut(&state.run_id).unwrap();
|
||||
run.current_step = next_step_num;
|
||||
|
||||
let step_idx = (next_step_num - 1) as usize;
|
||||
let step = sop.steps[step_idx].clone();
|
||||
|
||||
// Use last step's output as input, or Null
|
||||
let last_output = state
|
||||
.step_outputs
|
||||
.get(&state.last_completed_step)
|
||||
.cloned()
|
||||
.unwrap_or(serde_json::Value::Null);
|
||||
|
||||
let run_id = state.run_id.clone();
|
||||
self.resolve_deterministic_action(&sop, &run_id, &step, last_output)
|
||||
}
|
||||
|
||||
/// Resolve the action for a deterministic step (execute or checkpoint).
|
||||
fn resolve_deterministic_action(
|
||||
&mut self,
|
||||
sop: &Sop,
|
||||
run_id: &str,
|
||||
step: &SopStep,
|
||||
input: serde_json::Value,
|
||||
) -> Result<SopRunAction> {
|
||||
if step.kind == SopStepKind::Checkpoint {
|
||||
// Pause at checkpoint — persist state and wait for approval
|
||||
if let Some(run) = self.active_runs.get_mut(run_id) {
|
||||
run.status = SopRunStatus::PausedCheckpoint;
|
||||
run.waiting_since = Some(now_iso8601());
|
||||
}
|
||||
|
||||
let state_file = self.persist_deterministic_state(run_id, sop)?;
|
||||
|
||||
info!(
|
||||
"Deterministic SOP run {run_id}: checkpoint at step {} '{}', state persisted to {}",
|
||||
step.number,
|
||||
step.title,
|
||||
state_file.display()
|
||||
);
|
||||
|
||||
Ok(SopRunAction::CheckpointWait {
|
||||
run_id: run_id.to_string(),
|
||||
step: step.clone(),
|
||||
state_file,
|
||||
})
|
||||
} else {
|
||||
Ok(SopRunAction::DeterministicStep {
|
||||
run_id: run_id.to_string(),
|
||||
step: step.clone(),
|
||||
input,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Persist the current deterministic run state to a JSON file.
|
||||
fn persist_deterministic_state(&self, run_id: &str, sop: &Sop) -> Result<PathBuf> {
|
||||
let run = self
|
||||
.active_runs
|
||||
.get(run_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Run not found: {run_id}"))?;
|
||||
|
||||
let mut step_outputs = HashMap::new();
|
||||
for result in &run.step_results {
|
||||
// Try to parse output as JSON, fall back to string value
|
||||
let value = serde_json::from_str(&result.output)
|
||||
.unwrap_or_else(|_| serde_json::Value::String(result.output.clone()));
|
||||
step_outputs.insert(result.step_number, value);
|
||||
}
|
||||
|
||||
let state = DeterministicRunState {
|
||||
run_id: run_id.to_string(),
|
||||
sop_name: run.sop_name.clone(),
|
||||
last_completed_step: run.current_step.saturating_sub(1),
|
||||
total_steps: run.total_steps,
|
||||
step_outputs,
|
||||
persisted_at: now_iso8601(),
|
||||
llm_calls_saved: run.llm_calls_saved,
|
||||
paused_at_checkpoint: run.status == SopRunStatus::PausedCheckpoint,
|
||||
};
|
||||
|
||||
// Write to the SOP's location directory, or the current working directory if none is set
|
||||
let dir = sop.location.as_deref().unwrap_or_else(|| Path::new("."));
|
||||
let state_file = dir.join(format!("{run_id}.state.json"));
|
||||
let json = serde_json::to_string_pretty(&state)?;
|
||||
std::fs::write(&state_file, json)?;
|
||||
|
||||
Ok(state_file)
|
||||
}
|
||||
|
||||
/// Load a persisted deterministic run state from a JSON file.
|
||||
pub fn load_deterministic_state(path: &Path) -> Result<DeterministicRunState> {
|
||||
let content = std::fs::read_to_string(path)?;
|
||||
let state: DeterministicRunState = serde_json::from_str(&content)?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
// ── Approval timeout ──────────────────────────────────────────
|
||||
|
||||
/// Check all WaitingApproval runs for timeout. For Critical/High-priority SOPs,
|
||||
@ -487,21 +767,21 @@ fn resolve_step_action(sop: &Sop, step: &SopStep, run_id: String, context: Strin
|
||||
}
|
||||
|
||||
let needs_approval = match sop.execution_mode {
|
||||
crate::sop::SopExecutionMode::Auto => false,
|
||||
crate::sop::SopExecutionMode::Supervised => {
|
||||
// Deterministic mode is handled via start_deterministic_run;
|
||||
// if we reach here via the standard path, treat as Auto.
|
||||
SopExecutionMode::Auto | SopExecutionMode::Deterministic => false,
|
||||
SopExecutionMode::Supervised => {
|
||||
// Supervised: approval only before the first step
|
||||
step.number == 1
|
||||
}
|
||||
crate::sop::SopExecutionMode::StepByStep => true,
|
||||
crate::sop::SopExecutionMode::PriorityBased => {
|
||||
match sop.priority {
|
||||
SopPriority::Critical | SopPriority::High => false,
|
||||
SopPriority::Normal | SopPriority::Low => {
|
||||
// Supervised behavior for normal/low
|
||||
step.number == 1
|
||||
}
|
||||
SopExecutionMode::StepByStep => true,
|
||||
SopExecutionMode::PriorityBased => match sop.priority {
|
||||
SopPriority::Critical | SopPriority::High => false,
|
||||
SopPriority::Normal | SopPriority::Low => {
|
||||
// Supervised behavior for normal/low
|
||||
step.number == 1
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
if needs_approval {
|
||||
@ -680,6 +960,8 @@ mod tests {
|
||||
body: "Do step one".into(),
|
||||
suggested_tools: vec!["shell".into()],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
},
|
||||
SopStep {
|
||||
number: 2,
|
||||
@ -687,11 +969,14 @@ mod tests {
|
||||
body: "Do step two".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
},
|
||||
],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
@ -706,6 +991,8 @@ mod tests {
|
||||
match action {
|
||||
SopRunAction::ExecuteStep { run_id, .. }
|
||||
| SopRunAction::WaitApproval { run_id, .. }
|
||||
| SopRunAction::DeterministicStep { run_id, .. }
|
||||
| SopRunAction::CheckpointWait { run_id, .. }
|
||||
| SopRunAction::Completed { run_id, .. }
|
||||
| SopRunAction::Failed { run_id, .. } => run_id,
|
||||
}
|
||||
@ -1359,6 +1646,7 @@ mod tests {
|
||||
completed_at: None,
|
||||
step_results: Vec::new(),
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
let ctx = format_step_context(&sop, &run, &sop.steps[0]);
|
||||
assert!(ctx.contains("pump-shutdown"));
|
||||
|
||||
746
src/sop/gates.rs
746
src/sop/gates.rs
@ -1,746 +0,0 @@
|
||||
//! Gate evaluation state for ampersona trust-phase transitions.
|
||||
//!
|
||||
//! This module is only compiled when the `ampersona-gates` feature is active
|
||||
//! (module declaration in `mod.rs` is behind `#[cfg]`).
|
||||
//!
|
||||
//! Gate decisions do NOT change SOP execution behavior — this is purely
|
||||
//! observation + phase state tracking + audit logging.
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use ampersona_core::spec::gates::Gate;
|
||||
use ampersona_core::state::{PendingTransition, PhaseState, TransitionRecord};
|
||||
use ampersona_core::traits::MetricsProvider;
|
||||
use ampersona_engine::gates::decision::GateDecisionRecord;
|
||||
use ampersona_engine::gates::evaluator::DefaultGateEvaluator;
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::memory::traits::{Memory, MemoryCategory};
|
||||
|
||||
const PHASE_STATE_KEY: &str = "sop_phase_state";
|
||||
|
||||
fn sop_category() -> MemoryCategory {
|
||||
MemoryCategory::Custom("sop".into())
|
||||
}
|
||||
|
||||
// ── Inner state ────────────────────────────────────────────────
|
||||
|
||||
/// Mutable part of the gate evaluator, kept behind the mutex in `GateEvalState`.
struct GateEvalInner {
|
||||
/// Phase state that `tick` evaluates against and `apply_decision` mutates.
phase_state: PhaseState,
|
||||
/// Set at construction and refreshed on every evaluation; `tick` compares
/// its age against `tick_interval`.
last_tick: Instant,
|
||||
}
|
||||
|
||||
// ── GateEvalState ──────────────────────────────────────────────
|
||||
|
||||
/// Manages trust-phase gate evaluation state.
|
||||
///
|
||||
/// Single `Mutex<GateEvalInner>` ensures atomic interval-check + evaluate + apply.
|
||||
/// `DefaultGateEvaluator` is a unit struct — called inline, not stored.
|
||||
pub struct GateEvalState {
|
||||
/// Phase state and last-tick timestamp, guarded together by one lock.
inner: Mutex<GateEvalInner>,
|
||||
/// Backend that `persist` writes the serialized phase state to.
memory: Arc<dyn Memory>,
|
||||
/// Gate definitions handed to the evaluator on every tick.
gates: Vec<Gate>,
|
||||
/// Minimum spacing between evaluations; a zero duration disables `tick`.
tick_interval: Duration,
|
||||
}
|
||||
|
||||
impl GateEvalState {
|
||||
/// Create with fresh (default) phase state.
|
||||
pub fn new(
|
||||
agent_name: &str,
|
||||
gates: Vec<Gate>,
|
||||
interval_secs: u64,
|
||||
memory: Arc<dyn Memory>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(GateEvalInner {
|
||||
phase_state: PhaseState::new(agent_name.to_string()),
|
||||
last_tick: Instant::now(),
|
||||
}),
|
||||
memory,
|
||||
gates,
|
||||
tick_interval: Duration::from_secs(interval_secs),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create with a known phase state (warm-start).
|
||||
pub fn with_state(
|
||||
state: PhaseState,
|
||||
gates: Vec<Gate>,
|
||||
interval_secs: u64,
|
||||
memory: Arc<dyn Memory>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(GateEvalInner {
|
||||
phase_state: state,
|
||||
last_tick: Instant::now(),
|
||||
}),
|
||||
memory,
|
||||
gates,
|
||||
tick_interval: Duration::from_secs(interval_secs),
|
||||
}
|
||||
}
|
||||
|
||||
/// Load gate definitions from a persona JSON file.
|
||||
///
|
||||
/// Expects `{"gates": [...]}` at the top level. Missing file → empty Vec.
|
||||
/// Parse error → warn log + empty Vec.
|
||||
pub fn load_gates_from_file(path: &Path) -> Vec<Gate> {
|
||||
let content = match std::fs::read_to_string(path) {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct PersonaGates {
|
||||
#[serde(default)]
|
||||
gates: Vec<Gate>,
|
||||
}
|
||||
|
||||
match serde_json::from_str::<PersonaGates>(&content) {
|
||||
Ok(parsed) => parsed.gates,
|
||||
Err(e) => {
|
||||
warn!(path = %path.display(), error = %e, "failed to parse gates from persona file");
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rebuild from Memory backend (warm-start).
|
||||
///
|
||||
/// Loads `PhaseState` from Memory key `sop_phase_state`, loads gates from
|
||||
/// file, falls back to fresh state on parse error.
|
||||
pub async fn rebuild_from_memory(
|
||||
memory: Arc<dyn Memory>,
|
||||
agent_name: &str,
|
||||
gates_file: Option<&Path>,
|
||||
interval_secs: u64,
|
||||
) -> Result<Self> {
|
||||
let gates = gates_file
|
||||
.map(Self::load_gates_from_file)
|
||||
.unwrap_or_default();
|
||||
|
||||
let phase_state = match memory.get(PHASE_STATE_KEY).await? {
|
||||
Some(entry) => match serde_json::from_str::<PhaseState>(&entry.content) {
|
||||
Ok(state) => {
|
||||
info!(
|
||||
phase = ?state.current_phase,
|
||||
rev = state.state_rev,
|
||||
"gate eval warm-started from memory"
|
||||
);
|
||||
state
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to parse phase state from memory, using fresh state");
|
||||
PhaseState::new(agent_name.to_string())
|
||||
}
|
||||
},
|
||||
None => PhaseState::new(agent_name.to_string()),
|
||||
};
|
||||
|
||||
Ok(Self::with_state(phase_state, gates, interval_secs, memory))
|
||||
}
|
||||
|
||||
/// Atomic tick: interval check + evaluate + apply under single lock.
|
||||
///
|
||||
/// Returns `Some(record)` if a gate fired, `None` otherwise.
|
||||
pub fn tick(&self, metrics: &dyn MetricsProvider) -> Option<GateDecisionRecord> {
|
||||
let _span = tracing::info_span!("gate_eval_tick", gates = self.gates.len()).entered();
|
||||
|
||||
// interval_secs=0 means disabled
|
||||
if self.tick_interval.is_zero() {
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.inner.is_poisoned() {
|
||||
error!("gate eval mutex poisoned — loss of gate evaluation until restart");
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock().ok()?;
|
||||
|
||||
// Check interval
|
||||
if inner.last_tick.elapsed() < self.tick_interval {
|
||||
return None;
|
||||
}
|
||||
inner.last_tick = Instant::now();
|
||||
|
||||
// Evaluate
|
||||
let record = DefaultGateEvaluator.evaluate(&self.gates, &inner.phase_state, metrics);
|
||||
|
||||
match record {
|
||||
Some(ref record) => {
|
||||
// Apply decision in-place under the same lock
|
||||
apply_decision(&mut inner.phase_state, record);
|
||||
info!(
|
||||
gate_id = %record.gate_id,
|
||||
decision = %record.decision,
|
||||
from = ?record.from_phase,
|
||||
to = %record.to_phase,
|
||||
"gate decision"
|
||||
);
|
||||
}
|
||||
None => {
|
||||
debug!("no gate fired");
|
||||
}
|
||||
}
|
||||
|
||||
record
|
||||
}
|
||||
|
||||
/// Persist current phase state to Memory.
|
||||
pub async fn persist(&self) -> Result<()> {
|
||||
let content = {
|
||||
let inner = self
|
||||
.inner
|
||||
.lock()
|
||||
.map_err(|e| anyhow::anyhow!("gate eval lock poisoned: {e}"))?;
|
||||
serde_json::to_string_pretty(&inner.phase_state)?
|
||||
};
|
||||
self.memory
|
||||
.store(PHASE_STATE_KEY, &content, sop_category(), None)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Snapshot of current phase state (for diagnostics / sop_status).
|
||||
pub fn phase_state_snapshot(&self) -> Option<PhaseState> {
|
||||
self.inner.lock().ok().map(|g| g.phase_state.clone())
|
||||
}
|
||||
|
||||
/// Number of loaded gate definitions.
|
||||
pub fn gate_count(&self) -> usize {
|
||||
self.gates.len()
|
||||
}
|
||||
}
|
||||
|
||||
// ── Decision application ───────────────────────────────────────
|
||||
|
||||
fn apply_decision(state: &mut PhaseState, record: &GateDecisionRecord) {
|
||||
match record.decision.as_str() {
|
||||
"transition" => {
|
||||
state.current_phase = Some(record.to_phase.clone());
|
||||
state.state_rev += 1;
|
||||
state.last_transition = Some(TransitionRecord {
|
||||
gate_id: record.gate_id.clone(),
|
||||
from_phase: record.from_phase.clone(),
|
||||
to_phase: record.to_phase.clone(),
|
||||
at: Utc::now(),
|
||||
decision_id: format!(
|
||||
"{}-{}-{}",
|
||||
record.gate_id, record.state_rev, record.metrics_hash
|
||||
),
|
||||
metrics_hash: Some(record.metrics_hash.clone()),
|
||||
state_rev: state.state_rev,
|
||||
});
|
||||
state.pending_transition = None;
|
||||
state.updated_at = Utc::now();
|
||||
}
|
||||
"observed" => {
|
||||
debug!(
|
||||
gate_id = %record.gate_id,
|
||||
"observed gate — no state change"
|
||||
);
|
||||
}
|
||||
"pending_human" => {
|
||||
state.pending_transition = Some(PendingTransition {
|
||||
gate_id: record.gate_id.clone(),
|
||||
from_phase: record.from_phase.clone(),
|
||||
to_phase: record.to_phase.clone(),
|
||||
decision: record.decision.clone(),
|
||||
metrics_hash: record.metrics_hash.clone(),
|
||||
state_rev: record.state_rev,
|
||||
created_at: Utc::now(),
|
||||
});
|
||||
state.updated_at = Utc::now();
|
||||
}
|
||||
other => {
|
||||
warn!(decision = %other, gate_id = %record.gate_id, "unknown gate decision — skipping");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Tests ──────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use ampersona_core::errors::MetricError;
|
||||
use ampersona_core::spec::gates::Gate;
|
||||
use ampersona_core::traits::{MetricQuery, MetricSample};
|
||||
use ampersona_core::types::{CriterionOp, GateApproval, GateDirection, GateEnforcement};
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
|
||||
// ── Mock MetricsProvider ──────────────────────────────────
|
||||
|
||||
struct MockMetrics {
|
||||
values: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
impl MockMetrics {
|
||||
fn new(values: Vec<(&str, serde_json::Value)>) -> Self {
|
||||
Self {
|
||||
values: values
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k.to_string(), v))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsProvider for MockMetrics {
|
||||
fn get_metric(&self, query: &MetricQuery) -> Result<MetricSample, MetricError> {
|
||||
self.values
|
||||
.get(&query.name)
|
||||
.cloned()
|
||||
.map(|value| MetricSample {
|
||||
name: query.name.clone(),
|
||||
value,
|
||||
sampled_at: Utc::now(),
|
||||
})
|
||||
.ok_or_else(|| MetricError::NotFound(query.name.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────
|
||||
|
||||
fn make_promote_gate(
|
||||
id: &str,
|
||||
metric: &str,
|
||||
op: CriterionOp,
|
||||
value: serde_json::Value,
|
||||
to_phase: &str,
|
||||
) -> Gate {
|
||||
Gate {
|
||||
id: id.into(),
|
||||
direction: GateDirection::Promote,
|
||||
enforcement: GateEnforcement::Enforce,
|
||||
priority: 0,
|
||||
cooldown_seconds: 0,
|
||||
from_phase: None,
|
||||
to_phase: to_phase.into(),
|
||||
criteria: vec![ampersona_core::spec::gates::Criterion {
|
||||
metric: metric.into(),
|
||||
op,
|
||||
value,
|
||||
window_seconds: None,
|
||||
}],
|
||||
metrics_schema: None,
|
||||
approval: GateApproval::Auto,
|
||||
on_pass: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn test_memory() -> Arc<dyn Memory> {
|
||||
let mem_cfg = crate::config::MemoryConfig {
|
||||
backend: "sqlite".into(),
|
||||
..crate::config::MemoryConfig::default()
|
||||
};
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
Arc::from(crate::memory::create_memory(&mem_cfg, tmp.path(), None).unwrap())
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn tick_no_gates_returns_none() {
|
||||
let mem = test_memory();
|
||||
let ge = GateEvalState::new("test-agent", vec![], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![]);
|
||||
// Force past interval
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
assert!(ge.tick(&metrics).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_with_passing_gate_returns_decision() {
|
||||
let mem = test_memory();
|
||||
let gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.9))]);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let record = ge.tick(&metrics);
|
||||
assert!(record.is_some());
|
||||
let record = record.unwrap();
|
||||
assert_eq!(record.gate_id, "g1");
|
||||
assert_eq!(record.to_phase, "active");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_transition_advances_phase() {
|
||||
let mem = test_memory();
|
||||
let gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
ge.tick(&metrics);
|
||||
|
||||
let snap = ge.phase_state_snapshot().unwrap();
|
||||
assert_eq!(snap.current_phase, Some("active".into()));
|
||||
assert!(snap.state_rev > 0);
|
||||
assert!(snap.last_transition.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_observed_no_state_change() {
|
||||
let mem = test_memory();
|
||||
let mut gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
gate.enforcement = GateEnforcement::Observe;
|
||||
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let record = ge.tick(&metrics);
|
||||
assert!(record.is_some());
|
||||
assert_eq!(record.unwrap().decision, "observed");
|
||||
|
||||
let snap = ge.phase_state_snapshot().unwrap();
|
||||
assert!(snap.current_phase.is_none()); // no change
|
||||
assert_eq!(snap.state_rev, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_pending_human_sets_pending() {
|
||||
let mem = test_memory();
|
||||
let mut gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
gate.approval = GateApproval::Human;
|
||||
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let record = ge.tick(&metrics);
|
||||
assert!(record.is_some());
|
||||
assert_eq!(record.unwrap().decision, "pending_human");
|
||||
|
||||
let snap = ge.phase_state_snapshot().unwrap();
|
||||
assert!(snap.pending_transition.is_some());
|
||||
assert_eq!(snap.pending_transition.unwrap().to_phase, "active");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_gates_missing_file_returns_empty() {
|
||||
let gates = GateEvalState::load_gates_from_file(Path::new("/nonexistent/persona.json"));
|
||||
assert!(gates.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_gates_valid_persona() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("persona.json");
|
||||
std::fs::write(
|
||||
&path,
|
||||
r#"{
|
||||
"gates": [{
|
||||
"id": "g1",
|
||||
"direction": "promote",
|
||||
"to_phase": "active",
|
||||
"criteria": [{"metric": "sop.completion_rate", "op": "gte", "value": 0.8}]
|
||||
}]
|
||||
}"#,
|
||||
)
|
||||
.unwrap();
|
||||
let gates = GateEvalState::load_gates_from_file(&path);
|
||||
assert_eq!(gates.len(), 1);
|
||||
assert_eq!(gates[0].id, "g1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_gates_no_gates_key_returns_empty() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("persona.json");
|
||||
std::fs::write(&path, r#"{"name": "test"}"#).unwrap();
|
||||
let gates = GateEvalState::load_gates_from_file(&path);
|
||||
assert!(gates.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_gates_invalid_json_returns_empty() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let path = dir.path().join("persona.json");
|
||||
std::fs::write(&path, "not json at all {{{").unwrap();
|
||||
let gates = GateEvalState::load_gates_from_file(&path);
|
||||
assert!(gates.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn warm_start_roundtrip() {
|
||||
let mem = test_memory();
|
||||
let gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
|
||||
// Create, tick to advance state, persist
|
||||
let ge = GateEvalState::new("test-agent", vec![gate.clone()], 1, Arc::clone(&mem));
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
ge.tick(&metrics);
|
||||
ge.persist().await.unwrap();
|
||||
|
||||
// Write gates file for rebuild
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let gates_path = dir.path().join("persona.json");
|
||||
std::fs::write(
|
||||
&gates_path,
|
||||
serde_json::to_string(&serde_json::json!({"gates": [gate]})).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Rebuild
|
||||
let ge2 = GateEvalState::rebuild_from_memory(
|
||||
Arc::clone(&mem),
|
||||
"test-agent",
|
||||
Some(gates_path.as_path()),
|
||||
1,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let snap = ge2.phase_state_snapshot().unwrap();
|
||||
assert_eq!(snap.current_phase, Some("active".into()));
|
||||
assert!(snap.state_rev > 0);
|
||||
assert_eq!(ge2.gate_count(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn warm_start_empty_memory() {
|
||||
let mem = test_memory();
|
||||
let ge = GateEvalState::rebuild_from_memory(Arc::clone(&mem), "test-agent", None, 60)
|
||||
.await
|
||||
.unwrap();
|
||||
let snap = ge.phase_state_snapshot().unwrap();
|
||||
assert!(snap.current_phase.is_none());
|
||||
assert_eq!(snap.state_rev, 0);
|
||||
assert_eq!(ge.gate_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn demote_priority_over_promote() {
|
||||
let mem = test_memory();
|
||||
let promote = make_promote_gate(
|
||||
"promote-g",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
let mut demote = make_promote_gate(
|
||||
"demote-g",
|
||||
"sop.deviation_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.3),
|
||||
"restricted",
|
||||
);
|
||||
demote.direction = GateDirection::Demote;
|
||||
demote.from_phase = Some("active".into());
|
||||
|
||||
let state = PhaseState {
|
||||
current_phase: Some("active".into()),
|
||||
..PhaseState::new("test-agent".into())
|
||||
};
|
||||
let ge = GateEvalState::with_state(state, vec![promote, demote], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![
|
||||
("sop.completion_rate", json!(0.95)),
|
||||
("sop.deviation_rate", json!(0.5)),
|
||||
]);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let record = ge.tick(&metrics).unwrap();
|
||||
// Demote should fire first (evaluator sorts demote before promote)
|
||||
assert_eq!(record.gate_id, "demote-g");
|
||||
assert_eq!(record.to_phase, "restricted");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn idempotent_tick_after_apply() {
|
||||
let mem = test_memory();
|
||||
let gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
|
||||
|
||||
// First tick — fires
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let first = ge.tick(&metrics);
|
||||
assert!(first.is_some());
|
||||
|
||||
// Second tick with same metrics + updated state_rev — should not fire again
|
||||
// (evaluator idempotency via metrics_hash + state_rev)
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let second = ge.tick(&metrics);
|
||||
assert!(second.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gate_tick_with_real_collector() {
|
||||
use crate::sop::metrics::SopMetricsCollector;
|
||||
use crate::sop::types::{
|
||||
SopEvent, SopRun, SopRunStatus, SopStepResult, SopStepStatus, SopTriggerSource,
|
||||
};
|
||||
|
||||
let mem = test_memory();
|
||||
let collector = SopMetricsCollector::new();
|
||||
|
||||
// Record a completed run
|
||||
let run = SopRun {
|
||||
run_id: "r1".into(),
|
||||
sop_name: "test-sop".into(),
|
||||
trigger_event: SopEvent {
|
||||
source: SopTriggerSource::Manual,
|
||||
topic: None,
|
||||
payload: None,
|
||||
timestamp: "2026-02-19T12:00:00Z".into(),
|
||||
},
|
||||
status: SopRunStatus::Completed,
|
||||
current_step: 1,
|
||||
total_steps: 1,
|
||||
started_at: "2026-02-19T12:00:00Z".into(),
|
||||
completed_at: Some("2026-02-19T12:05:00Z".into()),
|
||||
step_results: vec![SopStepResult {
|
||||
step_number: 1,
|
||||
status: SopStepStatus::Completed,
|
||||
output: "done".into(),
|
||||
started_at: "2026-02-19T12:00:00Z".into(),
|
||||
completed_at: Some("2026-02-19T12:01:00Z".into()),
|
||||
}],
|
||||
waiting_since: None,
|
||||
};
|
||||
collector.record_run_complete(&run);
|
||||
|
||||
let gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
let ge = GateEvalState::new("test-agent", vec![gate], 1, mem);
|
||||
{
|
||||
let mut inner = ge.inner.lock().unwrap();
|
||||
inner.last_tick = Instant::now().checked_sub(Duration::from_secs(10)).unwrap();
|
||||
}
|
||||
let record = ge.tick(&collector);
|
||||
assert!(record.is_some());
|
||||
assert_eq!(record.unwrap().to_phase, "active");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_respects_interval() {
|
||||
let mem = test_memory();
|
||||
let gate = make_promote_gate(
|
||||
"g1",
|
||||
"sop.completion_rate",
|
||||
CriterionOp::Gte,
|
||||
json!(0.8),
|
||||
"active",
|
||||
);
|
||||
|
||||
// Long interval
|
||||
let ge = GateEvalState::new("test-agent", vec![gate.clone()], 3600, mem.clone());
|
||||
let metrics = MockMetrics::new(vec![("sop.completion_rate", json!(0.95))]);
|
||||
// last_tick is Instant::now() — not enough elapsed
|
||||
assert!(ge.tick(&metrics).is_none());
|
||||
|
||||
// Zero interval = disabled
|
||||
let ge_disabled = GateEvalState::new("test-agent", vec![gate], 0, mem);
|
||||
assert!(ge_disabled.tick(&metrics).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ampersona_decision_strings_stable() {
|
||||
// Canary test: verifies that DefaultGateEvaluator produces the decision
|
||||
// strings we expect. If ampersona changes them, this test fails.
|
||||
let state = PhaseState::new("test".into());
|
||||
|
||||
// Enforce promote → "transition"
|
||||
let enforce_gate =
|
||||
make_promote_gate("g-enforce", "m", CriterionOp::Gte, json!(1), "phase-b");
|
||||
let metrics = MockMetrics::new(vec![("m", json!(1))]);
|
||||
let record = DefaultGateEvaluator.evaluate(&[enforce_gate], &state, &metrics);
|
||||
assert_eq!(
|
||||
record.as_ref().map(|r| r.decision.as_str()),
|
||||
Some("transition")
|
||||
);
|
||||
|
||||
// Observe promote → "observed"
|
||||
let mut observe_gate =
|
||||
make_promote_gate("g-observe", "m", CriterionOp::Gte, json!(1), "phase-b");
|
||||
observe_gate.enforcement = GateEnforcement::Observe;
|
||||
let record = DefaultGateEvaluator.evaluate(&[observe_gate], &state, &metrics);
|
||||
assert_eq!(
|
||||
record.as_ref().map(|r| r.decision.as_str()),
|
||||
Some("observed")
|
||||
);
|
||||
|
||||
// RequireApproval promote → "pending_human"
|
||||
let mut approval_gate =
|
||||
make_promote_gate("g-approval", "m", CriterionOp::Gte, json!(1), "phase-b");
|
||||
approval_gate.approval = GateApproval::Human;
|
||||
let record = DefaultGateEvaluator.evaluate(&[approval_gate], &state, &metrics);
|
||||
assert_eq!(
|
||||
record.as_ref().map(|r| r.decision.as_str()),
|
||||
Some("pending_human")
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -413,34 +413,6 @@ impl Default for SopMetricsCollector {
|
||||
}
|
||||
}
|
||||
|
||||
// ── Conditional MetricsProvider impl ───────────────────────────
|
||||
|
||||
#[cfg(feature = "ampersona-gates")]
impl ampersona_core::traits::MetricsProvider for SopMetricsCollector {
    /// Resolve a single metric for the gate evaluator.
    ///
    /// When the query carries a window, the windowed lookup is used;
    /// otherwise the name is looked up as-is.
    ///
    /// # Errors
    ///
    /// - `MetricError::ProviderUnavailable` when the collector's lock is poisoned.
    /// - `MetricError::NotFound` when the lookup yields no value for the name.
    fn get_metric(
        &self,
        query: &ampersona_core::traits::MetricQuery,
    ) -> Result<ampersona_core::traits::MetricSample, ampersona_core::errors::MetricError> {
        if self.inner.is_poisoned() {
            return Err(ampersona_core::errors::MetricError::ProviderUnavailable);
        }
        let value = if let Some(ref window) = query.window {
            // Window specified by evaluator (from Criterion.window_seconds)
            self.get_metric_value_windowed(&query.name, window)
        } else {
            // No window — use name as-is (may include _7d/_30d suffix or be all-time)
            self.get_metric_value(&query.name)
        };
        value
            .map(|v| ampersona_core::traits::MetricSample {
                name: query.name.clone(),
                value: v,
                sampled_at: Utc::now(),
            })
            .ok_or_else(|| ampersona_core::errors::MetricError::NotFound(query.name.clone()))
    }
}
|
||||
|
||||
// ── Helpers ────────────────────────────────────────────────────
|
||||
|
||||
fn build_snapshot(run: &SopRun, human_count: u64, timeout_count: u64) -> RunSnapshot {
|
||||
@ -637,6 +609,9 @@ mod tests {
|
||||
total_steps: u32,
|
||||
step_results: Vec<SopStepResult>,
|
||||
) -> SopRun {
|
||||
let now = Utc::now();
|
||||
let started = (now - chrono::Duration::minutes(5)).to_rfc3339();
|
||||
let completed = now.to_rfc3339();
|
||||
SopRun {
|
||||
run_id: run_id.into(),
|
||||
sop_name: sop_name.into(),
|
||||
@ -644,10 +619,11 @@ mod tests {
|
||||
status,
|
||||
current_step: total_steps,
|
||||
total_steps,
|
||||
started_at: "2026-02-19T12:00:00Z".into(),
|
||||
completed_at: Some("2026-02-19T12:05:00Z".into()),
|
||||
started_at: started,
|
||||
completed_at: Some(completed),
|
||||
step_results,
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1141,43 +1117,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
// ── MetricsProvider impl (ampersona-gates feature) ───────
|
||||
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
#[test]
|
||||
fn metrics_provider_get_metric() {
|
||||
use ampersona_core::traits::{MetricQuery, MetricsProvider};
|
||||
|
||||
let c = SopMetricsCollector::new();
|
||||
let run = make_run(
|
||||
"r1",
|
||||
"test-sop",
|
||||
SopRunStatus::Completed,
|
||||
1,
|
||||
vec![make_step(1, SopStepStatus::Completed)],
|
||||
);
|
||||
c.record_run_complete(&run);
|
||||
|
||||
let query = MetricQuery {
|
||||
name: "sop.runs_completed".into(),
|
||||
window: None,
|
||||
};
|
||||
let sample = c.get_metric(&query).unwrap();
|
||||
assert_eq!(sample.value, json!(1u64));
|
||||
assert_eq!(sample.name, "sop.runs_completed");
|
||||
|
||||
// NotFound for unknown metric
|
||||
let bad_query = MetricQuery {
|
||||
name: "sop.nonexistent".into(),
|
||||
window: None,
|
||||
};
|
||||
let err = c.get_metric(&bad_query).unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
ampersona_core::errors::MetricError::NotFound(_)
|
||||
));
|
||||
}
|
||||
|
||||
// ── Warm-start tests ─────────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
@ -1245,6 +1184,7 @@ mod tests {
|
||||
completed_at: None,
|
||||
step_results: vec![],
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
audit.log_run_start(&run).await.unwrap();
|
||||
|
||||
@ -1342,6 +1282,7 @@ mod tests {
|
||||
completed_at: None,
|
||||
step_results: vec![],
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
audit.log_run_start(&running_run).await.unwrap();
|
||||
audit.log_approval(&running_run, 1).await.unwrap();
|
||||
@ -1385,7 +1326,7 @@ mod tests {
|
||||
assert_eq!(hic_7d, 1);
|
||||
}
|
||||
|
||||
// ── Windowed MetricsProvider tests (ampersona-gates feature) ──
|
||||
// ── Windowed MetricsProvider tests ──
|
||||
|
||||
#[test]
|
||||
fn get_metric_windowed_7d_matches_suffix() {
|
||||
@ -1461,32 +1402,4 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(val, 2);
|
||||
}
|
||||
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
#[test]
|
||||
fn get_metric_provider_window_propagation() {
|
||||
use ampersona_core::traits::{MetricQuery, MetricsProvider};
|
||||
|
||||
let c = SopMetricsCollector::new();
|
||||
let run = make_run(
|
||||
"r1",
|
||||
"test-sop",
|
||||
SopRunStatus::Completed,
|
||||
1,
|
||||
vec![make_step(1, SopStepStatus::Completed)],
|
||||
);
|
||||
c.record_run_complete(&run);
|
||||
|
||||
// Query with window via MetricsProvider trait
|
||||
let query = MetricQuery {
|
||||
name: "sop.runs_completed".into(),
|
||||
window: Some(std::time::Duration::from_secs(7 * 86400)),
|
||||
};
|
||||
let sample = c.get_metric(&query).unwrap();
|
||||
assert_eq!(sample.value, json!(1u64));
|
||||
|
||||
// Same result as suffix-based query
|
||||
let suffix_val = c.get_metric_value("sop.runs_completed_7d");
|
||||
assert_eq!(Some(sample.value), suffix_val);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,20 +2,17 @@ pub mod audit;
|
||||
pub mod condition;
|
||||
pub mod dispatch;
|
||||
pub mod engine;
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
pub mod gates;
|
||||
pub mod metrics;
|
||||
pub mod types;
|
||||
|
||||
pub use audit::SopAuditLogger;
|
||||
pub use engine::SopEngine;
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
pub use gates::GateEvalState;
|
||||
pub use metrics::SopMetricsCollector;
|
||||
#[allow(unused_imports)]
|
||||
pub use types::{
|
||||
Sop, SopEvent, SopExecutionMode, SopPriority, SopRun, SopRunAction, SopRunStatus, SopStep,
|
||||
SopStepResult, SopStepStatus, SopTrigger, SopTriggerSource,
|
||||
DeterministicRunState, DeterministicSavings, Sop, SopEvent, SopExecutionMode, SopPriority,
|
||||
SopRun, SopRunAction, SopRunStatus, SopStep, SopStepKind, SopStepResult, SopStepStatus,
|
||||
SopTrigger, SopTriggerSource, StepSchema,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
@ -24,6 +21,19 @@ use tracing::warn;
|
||||
|
||||
use types::{SopManifest, SopMeta};
|
||||
|
||||
/// Parse an execution mode string into `SopExecutionMode`, falling back to
|
||||
/// `Supervised` for unknown values.
|
||||
pub fn parse_execution_mode(s: &str) -> SopExecutionMode {
|
||||
match s.trim().to_lowercase().as_str() {
|
||||
"auto" => SopExecutionMode::Auto,
|
||||
"step_by_step" => SopExecutionMode::StepByStep,
|
||||
"priority_based" => SopExecutionMode::PriorityBased,
|
||||
"deterministic" => SopExecutionMode::Deterministic,
|
||||
// "supervised" and any unknown value
|
||||
_ => SopExecutionMode::Supervised,
|
||||
}
|
||||
}
|
||||
|
||||
// ── SOP directory helpers ───────────────────────────────────────
|
||||
|
||||
/// Return the default SOPs directory: `<workspace>/sops`.
|
||||
@ -112,19 +122,28 @@ fn load_sop(sop_dir: &Path, default_execution_mode: SopExecutionMode) -> Result<
|
||||
execution_mode,
|
||||
cooldown_secs,
|
||||
max_concurrent,
|
||||
deterministic,
|
||||
} = manifest.sop;
|
||||
|
||||
// When deterministic=true, override execution_mode to Deterministic
|
||||
let effective_mode = if deterministic {
|
||||
SopExecutionMode::Deterministic
|
||||
} else {
|
||||
execution_mode.unwrap_or(default_execution_mode)
|
||||
};
|
||||
|
||||
Ok(Sop {
|
||||
name,
|
||||
description,
|
||||
version,
|
||||
priority,
|
||||
execution_mode: execution_mode.unwrap_or(default_execution_mode),
|
||||
execution_mode: effective_mode,
|
||||
triggers: manifest.triggers,
|
||||
steps,
|
||||
cooldown_secs,
|
||||
max_concurrent,
|
||||
location: Some(sop_dir.to_path_buf()),
|
||||
deterministic,
|
||||
})
|
||||
}
|
||||
|
||||
@ -143,6 +162,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
|
||||
let mut current_body = String::new();
|
||||
let mut current_tools: Vec<String> = Vec::new();
|
||||
let mut current_requires_confirmation = false;
|
||||
let mut current_kind = SopStepKind::Execute;
|
||||
|
||||
for line in md.lines() {
|
||||
let trimmed = line.trim();
|
||||
@ -164,6 +184,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
|
||||
&mut current_body,
|
||||
&mut current_tools,
|
||||
&mut current_requires_confirmation,
|
||||
&mut current_kind,
|
||||
);
|
||||
in_steps_section = false;
|
||||
}
|
||||
@ -184,6 +205,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
|
||||
&mut current_body,
|
||||
&mut current_tools,
|
||||
&mut current_requires_confirmation,
|
||||
&mut current_kind,
|
||||
);
|
||||
|
||||
let step_num = u32::try_from(steps.len())
|
||||
@ -217,6 +239,15 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
|
||||
if let Some(val) = bullet.strip_prefix("requires_confirmation:") {
|
||||
current_requires_confirmation = val.trim().eq_ignore_ascii_case("true");
|
||||
}
|
||||
} else if bullet.starts_with("kind:") {
|
||||
if let Some(val) = bullet.strip_prefix("kind:") {
|
||||
let val = val.trim();
|
||||
if val.eq_ignore_ascii_case("checkpoint") {
|
||||
current_kind = SopStepKind::Checkpoint;
|
||||
} else {
|
||||
current_kind = SopStepKind::Execute;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Continuation body line
|
||||
if !current_body.is_empty() {
|
||||
@ -244,6 +275,7 @@ pub fn parse_steps(md: &str) -> Vec<SopStep> {
|
||||
&mut current_body,
|
||||
&mut current_tools,
|
||||
&mut current_requires_confirmation,
|
||||
&mut current_kind,
|
||||
);
|
||||
|
||||
steps
|
||||
@ -257,6 +289,7 @@ fn flush_step(
|
||||
body: &mut String,
|
||||
tools: &mut Vec<String>,
|
||||
requires_confirmation: &mut bool,
|
||||
kind: &mut SopStepKind,
|
||||
) {
|
||||
if let Some(n) = number.take() {
|
||||
steps.push(SopStep {
|
||||
@ -265,9 +298,12 @@ fn flush_step(
|
||||
body: body.trim().to_string(),
|
||||
suggested_tools: std::mem::take(tools),
|
||||
requires_confirmation: *requires_confirmation,
|
||||
kind: *kind,
|
||||
schema: None,
|
||||
});
|
||||
*body = String::new();
|
||||
*requires_confirmation = false;
|
||||
*kind = SopStepKind::Execute;
|
||||
}
|
||||
}
|
||||
|
||||
@ -349,7 +385,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
|
||||
let sops = load_sops(
|
||||
&config.workspace_dir,
|
||||
sops_dir_override,
|
||||
config.sop.default_execution_mode,
|
||||
parse_execution_mode(&config.sop.default_execution_mode),
|
||||
);
|
||||
if sops.is_empty() {
|
||||
println!("No SOPs found.");
|
||||
@ -393,7 +429,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
|
||||
let sops = load_sops(
|
||||
&config.workspace_dir,
|
||||
sops_dir_override,
|
||||
config.sop.default_execution_mode,
|
||||
parse_execution_mode(&config.sop.default_execution_mode),
|
||||
);
|
||||
let matching: Vec<&Sop> = if let Some(ref name) = name {
|
||||
sops.iter().filter(|s| s.name == *name).collect()
|
||||
@ -443,7 +479,7 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
|
||||
let sops = load_sops(
|
||||
&config.workspace_dir,
|
||||
sops_dir_override,
|
||||
config.sop.default_execution_mode,
|
||||
parse_execution_mode(&config.sop.default_execution_mode),
|
||||
);
|
||||
let sop = sops
|
||||
.iter()
|
||||
@ -474,16 +510,23 @@ pub fn handle_command(command: crate::SopCommands, config: &crate::config::Confi
|
||||
if !sop.steps.is_empty() {
|
||||
println!("Steps:");
|
||||
for step in &sop.steps {
|
||||
let confirm_tag = if step.requires_confirmation {
|
||||
" [requires confirmation]"
|
||||
let mut tags = Vec::new();
|
||||
if step.requires_confirmation {
|
||||
tags.push("requires confirmation");
|
||||
}
|
||||
if step.kind == SopStepKind::Checkpoint {
|
||||
tags.push("checkpoint");
|
||||
}
|
||||
let tag_str = if tags.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
""
|
||||
format!(" [{}]", tags.join(", "))
|
||||
};
|
||||
println!(
|
||||
" {}. {}{}",
|
||||
step.number,
|
||||
console::style(&step.title).bold(),
|
||||
confirm_tag
|
||||
tag_str
|
||||
);
|
||||
if !step.body.is_empty() {
|
||||
for line in step.body.lines() {
|
||||
@ -705,6 +748,7 @@ type = "manual"
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
};
|
||||
|
||||
let warnings = validate_sop(&sop);
|
||||
@ -729,10 +773,13 @@ type = "manual"
|
||||
body: "Do the thing".into(),
|
||||
suggested_tools: vec!["shell".into()],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
}],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
};
|
||||
|
||||
let warnings = validate_sop(&sop);
|
||||
|
||||
113
src/sop/types.rs
113
src/sop/types.rs
@ -1,5 +1,6 @@
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
|
||||
@ -42,6 +43,10 @@ pub enum SopExecutionMode {
|
||||
StepByStep,
|
||||
/// Critical/High → Auto, Normal/Low → Supervised.
|
||||
PriorityBased,
|
||||
/// Execute steps sequentially without LLM round-trips.
|
||||
/// Step outputs are piped as inputs to the next step.
|
||||
/// Checkpoint steps pause for human approval.
|
||||
Deterministic,
|
||||
}
|
||||
|
||||
impl fmt::Display for SopExecutionMode {
|
||||
@ -51,6 +56,7 @@ impl fmt::Display for SopExecutionMode {
|
||||
Self::Supervised => write!(f, "supervised"),
|
||||
Self::StepByStep => write!(f, "step_by_step"),
|
||||
Self::PriorityBased => write!(f, "priority_based"),
|
||||
Self::Deterministic => write!(f, "deterministic"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -93,6 +99,44 @@ impl fmt::Display for SopTrigger {
|
||||
}
|
||||
}
|
||||
|
||||
// ── Step kind ────────────────────────────────────────────────────
|
||||
|
||||
/// The kind of a workflow step.
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum SopStepKind {
|
||||
/// Normal step — executed by the agent (or deterministic handler).
|
||||
#[default]
|
||||
Execute,
|
||||
/// Checkpoint step — pauses execution and waits for human approval.
|
||||
Checkpoint,
|
||||
}
|
||||
|
||||
impl fmt::Display for SopStepKind {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Execute => write!(f, "execute"),
|
||||
Self::Checkpoint => write!(f, "checkpoint"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Typed step parameters ────────────────────────────────────────
|
||||
|
||||
/// JSON Schema fragment for validating step input/output data.
|
||||
///
|
||||
/// Stored as a raw `serde_json::Value` so callers can validate without
|
||||
/// pulling in a full JSON Schema library.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct StepSchema {
|
||||
/// JSON Schema object describing expected input shape.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub input: Option<serde_json::Value>,
|
||||
/// JSON Schema object describing expected output shape.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub output: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
// ── Step ────────────────────────────────────────────────────────
|
||||
|
||||
/// A single step in an SOP procedure, parsed from SOP.md.
|
||||
@ -105,6 +149,12 @@ pub struct SopStep {
|
||||
pub suggested_tools: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub requires_confirmation: bool,
|
||||
/// Step kind: `execute` (default) or `checkpoint`.
|
||||
#[serde(default)]
|
||||
pub kind: SopStepKind,
|
||||
/// Typed input/output schemas for deterministic data flow validation.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub schema: Option<StepSchema>,
|
||||
}
|
||||
|
||||
// ── SOP ─────────────────────────────────────────────────────────
|
||||
@ -125,6 +175,10 @@ pub struct Sop {
|
||||
pub max_concurrent: u32,
|
||||
#[serde(skip)]
|
||||
pub location: Option<PathBuf>,
|
||||
/// When true, sets execution_mode to Deterministic.
|
||||
/// Steps execute sequentially without LLM round-trips.
|
||||
#[serde(default)]
|
||||
pub deterministic: bool,
|
||||
}
|
||||
|
||||
fn default_cooldown_secs() -> u64 {
|
||||
@ -160,6 +214,9 @@ pub(crate) struct SopMeta {
|
||||
pub cooldown_secs: u64,
|
||||
#[serde(default = "default_max_concurrent")]
|
||||
pub max_concurrent: u32,
|
||||
/// Opt-in deterministic execution (no LLM round-trips between steps).
|
||||
#[serde(default)]
|
||||
pub deterministic: bool,
|
||||
}
|
||||
|
||||
fn default_sop_version() -> String {
|
||||
@ -214,6 +271,8 @@ pub enum SopRunStatus {
|
||||
Pending,
|
||||
Running,
|
||||
WaitingApproval,
|
||||
/// Paused at a checkpoint in a deterministic workflow.
|
||||
PausedCheckpoint,
|
||||
Completed,
|
||||
Failed,
|
||||
Cancelled,
|
||||
@ -225,6 +284,7 @@ impl fmt::Display for SopRunStatus {
|
||||
Self::Pending => write!(f, "pending"),
|
||||
Self::Running => write!(f, "running"),
|
||||
Self::WaitingApproval => write!(f, "waiting_approval"),
|
||||
Self::PausedCheckpoint => write!(f, "paused_checkpoint"),
|
||||
Self::Completed => write!(f, "completed"),
|
||||
Self::Failed => write!(f, "failed"),
|
||||
Self::Cancelled => write!(f, "cancelled"),
|
||||
@ -276,6 +336,44 @@ pub struct SopRun {
|
||||
/// ISO-8601 timestamp when the run entered WaitingApproval (for timeout tracking).
|
||||
#[serde(default)]
|
||||
pub waiting_since: Option<String>,
|
||||
/// Number of LLM calls saved by deterministic execution in this run.
|
||||
#[serde(default)]
|
||||
pub llm_calls_saved: u64,
|
||||
}
|
||||
|
||||
// ── Deterministic workflow state (persistence + resume) ──────────
|
||||
|
||||
/// Persisted state for a deterministic workflow run, enabling resume
|
||||
/// after interruption. Serialized to a JSON file alongside the SOP.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DeterministicRunState {
|
||||
/// Identifier of this run.
|
||||
pub run_id: String,
|
||||
/// SOP name this state belongs to.
|
||||
pub sop_name: String,
|
||||
/// Last successfully completed step number (0 = none completed).
|
||||
pub last_completed_step: u32,
|
||||
/// Total steps in the workflow.
|
||||
pub total_steps: u32,
|
||||
/// Output of each completed step, keyed by step number.
|
||||
pub step_outputs: HashMap<u32, serde_json::Value>,
|
||||
/// ISO-8601 timestamp when this state was last persisted.
|
||||
pub persisted_at: String,
|
||||
/// Number of LLM calls that were saved by deterministic execution.
|
||||
pub llm_calls_saved: u64,
|
||||
/// Whether the run is paused at a checkpoint awaiting approval.
|
||||
pub paused_at_checkpoint: bool,
|
||||
}
|
||||
|
||||
// ── Cost savings metric ──────────────────────────────────────────
|
||||
|
||||
/// Tracks how many LLM round-trips were saved by deterministic execution.
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct DeterministicSavings {
|
||||
/// Total LLM calls saved across all deterministic runs.
|
||||
pub total_llm_calls_saved: u64,
|
||||
/// Total deterministic runs completed.
|
||||
pub total_runs: u64,
|
||||
}
|
||||
|
||||
/// What the engine instructs the caller to do next after a state transition.
|
||||
@ -293,6 +391,20 @@ pub enum SopRunAction {
|
||||
step: SopStep,
|
||||
context: String,
|
||||
},
|
||||
/// Execute a step deterministically (no LLM). The `input` is the piped
|
||||
/// output from the previous step (or trigger payload for step 1).
|
||||
DeterministicStep {
|
||||
run_id: String,
|
||||
step: SopStep,
|
||||
input: serde_json::Value,
|
||||
},
|
||||
/// Deterministic workflow hit a checkpoint — pause for human approval.
|
||||
/// Workflow state has been persisted so it can resume after approval.
|
||||
CheckpointWait {
|
||||
run_id: String,
|
||||
step: SopStep,
|
||||
state_file: PathBuf,
|
||||
},
|
||||
/// The SOP run completed successfully.
|
||||
Completed { run_id: String, sop_name: String },
|
||||
/// The SOP run failed.
|
||||
@ -459,6 +571,7 @@ path = "/sop/test"
|
||||
completed_at: Some("2026-02-19T12:00:05Z".into()),
|
||||
}],
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
let json = serde_json::to_string(&run).unwrap();
|
||||
let parsed: SopRun = serde_json::from_str(&json).unwrap();
|
||||
|
||||
@ -76,6 +76,11 @@ pub mod schema;
|
||||
pub mod screenshot;
|
||||
pub mod security_ops;
|
||||
pub mod shell;
|
||||
pub mod sop_advance;
|
||||
pub mod sop_approve;
|
||||
pub mod sop_execute;
|
||||
pub mod sop_list;
|
||||
pub mod sop_status;
|
||||
pub mod swarm;
|
||||
pub mod text_browser;
|
||||
pub mod tool_search;
|
||||
@ -146,6 +151,11 @@ pub use schema::{CleaningStrategy, SchemaCleanr};
|
||||
pub use screenshot::ScreenshotTool;
|
||||
pub use security_ops::SecurityOpsTool;
|
||||
pub use shell::ShellTool;
|
||||
pub use sop_advance::SopAdvanceTool;
|
||||
pub use sop_approve::SopApproveTool;
|
||||
pub use sop_execute::SopExecuteTool;
|
||||
pub use sop_list::SopListTool;
|
||||
pub use sop_status::SopStatusTool;
|
||||
pub use swarm::SwarmTool;
|
||||
pub use text_browser::TextBrowserTool;
|
||||
pub use tool_search::ToolSearchTool;
|
||||
@ -556,6 +566,18 @@ pub fn all_tools_with_runtime(
|
||||
)));
|
||||
}
|
||||
|
||||
// SOP tools (registered when sops_dir is configured)
|
||||
if root_config.sop.sops_dir.is_some() {
|
||||
let sop_engine = Arc::new(std::sync::Mutex::new(crate::sop::SopEngine::new(
|
||||
root_config.sop.clone(),
|
||||
)));
|
||||
tool_arcs.push(Arc::new(SopListTool::new(Arc::clone(&sop_engine))));
|
||||
tool_arcs.push(Arc::new(SopExecuteTool::new(Arc::clone(&sop_engine))));
|
||||
tool_arcs.push(Arc::new(SopAdvanceTool::new(Arc::clone(&sop_engine))));
|
||||
tool_arcs.push(Arc::new(SopApproveTool::new(Arc::clone(&sop_engine))));
|
||||
tool_arcs.push(Arc::new(SopStatusTool::new(Arc::clone(&sop_engine))));
|
||||
}
|
||||
|
||||
if let Some(key) = composio_key {
|
||||
if !key.is_empty() {
|
||||
tool_arcs.push(Arc::new(ComposioTool::new(
|
||||
|
||||
@ -181,6 +181,18 @@ impl Tool for SopAdvanceTool {
|
||||
} => {
|
||||
format!("SOP '{sop_name}' run {run_id} failed: {reason}")
|
||||
}
|
||||
SopRunAction::DeterministicStep { run_id, step, .. } => {
|
||||
format!(
|
||||
"Step recorded. Next deterministic step for run {run_id}: {}",
|
||||
step.title
|
||||
)
|
||||
}
|
||||
SopRunAction::CheckpointWait { run_id, step, .. } => {
|
||||
format!(
|
||||
"Step recorded. Run {run_id} paused at checkpoint: {}",
|
||||
step.title
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(ToolResult {
|
||||
success: true,
|
||||
@ -222,6 +234,8 @@ mod tests {
|
||||
body: "Do step one".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
},
|
||||
SopStep {
|
||||
number: 2,
|
||||
@ -229,11 +243,14 @@ mod tests {
|
||||
body: "Do step two".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
},
|
||||
],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -143,10 +143,13 @@ mod tests {
|
||||
body: "Do it".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
}],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -118,6 +118,18 @@ impl Tool for SopExecuteTool {
|
||||
SopRunAction::Failed { run_id, reason, .. } => {
|
||||
format!("SOP run {run_id} failed: {reason}")
|
||||
}
|
||||
SopRunAction::DeterministicStep { run_id, step, .. } => {
|
||||
format!(
|
||||
"SOP run started (deterministic): {run_id}\nFirst step: {}",
|
||||
step.title
|
||||
)
|
||||
}
|
||||
SopRunAction::CheckpointWait { run_id, step, .. } => {
|
||||
format!(
|
||||
"SOP run started: {run_id} (paused at checkpoint: {})",
|
||||
step.title
|
||||
)
|
||||
}
|
||||
};
|
||||
Ok(ToolResult {
|
||||
success: true,
|
||||
@ -140,7 +152,9 @@ fn action_run_id(action: &SopRunAction) -> Option<&str> {
|
||||
SopRunAction::ExecuteStep { run_id, .. }
|
||||
| SopRunAction::WaitApproval { run_id, .. }
|
||||
| SopRunAction::Completed { run_id, .. }
|
||||
| SopRunAction::Failed { run_id, .. } => Some(run_id),
|
||||
| SopRunAction::Failed { run_id, .. }
|
||||
| SopRunAction::DeterministicStep { run_id, .. }
|
||||
| SopRunAction::CheckpointWait { run_id, .. } => Some(run_id),
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,6 +182,8 @@ mod tests {
|
||||
body: "Do step one".into(),
|
||||
suggested_tools: vec!["shell".into()],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
},
|
||||
SopStep {
|
||||
number: 2,
|
||||
@ -175,11 +191,14 @@ mod tests {
|
||||
body: "Do step two".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
},
|
||||
],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -137,10 +137,13 @@ mod tests {
|
||||
body: "Do it".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
}],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 1,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -11,8 +11,6 @@ use crate::sop::{SopEngine, SopMetricsCollector};
|
||||
pub struct SopStatusTool {
|
||||
engine: Arc<Mutex<SopEngine>>,
|
||||
collector: Option<Arc<SopMetricsCollector>>,
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
gate_eval: Option<Arc<crate::sop::GateEvalState>>,
|
||||
}
|
||||
|
||||
impl SopStatusTool {
|
||||
@ -20,8 +18,6 @@ impl SopStatusTool {
|
||||
Self {
|
||||
engine,
|
||||
collector: None,
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
gate_eval: None,
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,61 +26,11 @@ impl SopStatusTool {
|
||||
self
|
||||
}
|
||||
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
pub fn with_gate_eval(mut self, gate_eval: Arc<crate::sop::GateEvalState>) -> Self {
|
||||
self.gate_eval = Some(gate_eval);
|
||||
self
|
||||
}
|
||||
|
||||
fn append_gate_status(&self, output: &mut String, include_gate_status: bool) {
|
||||
#[cfg(feature = "ampersona-gates")]
|
||||
if include_gate_status {
|
||||
if let Some(ref ge) = self.gate_eval {
|
||||
if let Some(snap) = ge.phase_state_snapshot() {
|
||||
let _ = writeln!(output, "\nGate Status:");
|
||||
let _ = writeln!(
|
||||
output,
|
||||
" current_phase: {}",
|
||||
snap.current_phase.as_deref().unwrap_or("(none)")
|
||||
);
|
||||
let _ = writeln!(output, " state_rev: {}", snap.state_rev);
|
||||
let _ = writeln!(output, " gates_loaded: {}", ge.gate_count());
|
||||
if let Some(ref tr) = snap.last_transition {
|
||||
let _ = writeln!(
|
||||
output,
|
||||
" last_transition: {} ({} → {})",
|
||||
tr.at.to_rfc3339(),
|
||||
tr.from_phase.as_deref().unwrap_or("(none)"),
|
||||
tr.to_phase,
|
||||
);
|
||||
} else {
|
||||
let _ = writeln!(output, " last_transition: none");
|
||||
}
|
||||
if let Some(ref pt) = snap.pending_transition {
|
||||
let _ = writeln!(
|
||||
output,
|
||||
" pending_transition: {} → {} ({})",
|
||||
pt.from_phase.as_deref().unwrap_or("(none)"),
|
||||
pt.to_phase,
|
||||
pt.decision,
|
||||
);
|
||||
} else {
|
||||
let _ = writeln!(output, " pending_transition: none");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let _ = writeln!(
|
||||
output,
|
||||
"\nGate Status: not available (gate eval not configured)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "ampersona-gates"))]
|
||||
if include_gate_status {
|
||||
let _ = writeln!(
|
||||
output,
|
||||
"\nGate Status: not available (ampersona-gates feature not enabled)"
|
||||
"\nGate Status: not available (gate evaluation not supported)"
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -309,10 +255,13 @@ mod tests {
|
||||
body: "Do it".into(),
|
||||
suggested_tools: vec![],
|
||||
requires_confirmation: false,
|
||||
kind: SopStepKind::default(),
|
||||
schema: None,
|
||||
}],
|
||||
cooldown_secs: 0,
|
||||
max_concurrent: 2,
|
||||
location: None,
|
||||
deterministic: false,
|
||||
}
|
||||
}
|
||||
|
||||
@ -431,6 +380,7 @@ mod tests {
|
||||
completed_at: Some("2026-02-19T12:01:00Z".into()),
|
||||
}],
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
collector.record_run_complete(&run);
|
||||
|
||||
@ -466,6 +416,7 @@ mod tests {
|
||||
completed_at: Some("2026-02-19T12:01:00Z".into()),
|
||||
}],
|
||||
waiting_since: None,
|
||||
llm_calls_saved: 0,
|
||||
};
|
||||
collector.record_run_complete(&run);
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user