Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a1af84d992 | |||
| e34a804255 | |||
| 6120b3f705 | |||
| f175261e32 | |||
| fd9f66cad7 | |||
| d928ebc92e | |||
| 9fca9f478a | |||
| 7106632b51 | |||
| b834278754 | |||
| 186f6d9797 | |||
| 6cdc92a256 | |||
| 02599dcd3c | |||
| fe64d7ef7e | |||
| 45f953be6d | |||
| 82f29bbcb1 | |||
| 93b5a0b824 | |||
| 08a67c4a2d |
Generated
+1
-1
@@ -7945,7 +7945,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zeroclawlabs"
|
||||
version = "0.3.1"
|
||||
version = "0.3.3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-imap",
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ resolver = "2"
|
||||
|
||||
[package]
|
||||
name = "zeroclawlabs"
|
||||
version = "0.3.1"
|
||||
version = "0.3.3"
|
||||
edition = "2021"
|
||||
authors = ["theonlyhennygod"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
||||
+60
-6
@@ -195,6 +195,18 @@ const COMPACTION_MAX_SOURCE_CHARS: usize = 12_000;
|
||||
/// Max characters retained in stored compaction summary.
|
||||
const COMPACTION_MAX_SUMMARY_CHARS: usize = 2_000;
|
||||
|
||||
/// Estimate token count for a message history using ~4 chars/token heuristic.
|
||||
/// Includes a small overhead per message for role/framing tokens.
|
||||
fn estimate_history_tokens(history: &[ChatMessage]) -> usize {
|
||||
history
|
||||
.iter()
|
||||
.map(|m| {
|
||||
// ~4 chars per token + ~4 framing tokens per message (role, delimiters)
|
||||
m.content.len().div_ceil(4) + 4
|
||||
})
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Minimum interval between progress sends to avoid flooding the draft channel.
|
||||
pub(crate) const PROGRESS_MIN_INTERVAL_MS: u64 = 500;
|
||||
|
||||
@@ -288,6 +300,7 @@ async fn auto_compact_history(
|
||||
provider: &dyn Provider,
|
||||
model: &str,
|
||||
max_history: usize,
|
||||
max_context_tokens: usize,
|
||||
) -> Result<bool> {
|
||||
let has_system = history.first().map_or(false, |m| m.role == "system");
|
||||
let non_system_count = if has_system {
|
||||
@@ -296,7 +309,10 @@ async fn auto_compact_history(
|
||||
history.len()
|
||||
};
|
||||
|
||||
if non_system_count <= max_history {
|
||||
let estimated_tokens = estimate_history_tokens(history);
|
||||
|
||||
// Trigger compaction when either token budget OR message count is exceeded.
|
||||
if estimated_tokens <= max_context_tokens && non_system_count <= max_history {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -307,7 +323,16 @@ async fn auto_compact_history(
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let compact_end = start + compact_count;
|
||||
let mut compact_end = start + compact_count;
|
||||
|
||||
// Snap compact_end to a user-turn boundary so we don't split mid-conversation.
|
||||
while compact_end > start && history.get(compact_end).map_or(false, |m| m.role != "user") {
|
||||
compact_end -= 1;
|
||||
}
|
||||
if compact_end <= start {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let to_compact: Vec<ChatMessage> = history[start..compact_end].to_vec();
|
||||
let transcript = build_compaction_transcript(&to_compact);
|
||||
|
||||
@@ -2662,11 +2687,13 @@ pub(crate) async fn run_tool_call_loop(
|
||||
arguments: tool_args.clone(),
|
||||
};
|
||||
|
||||
// Only prompt interactively on CLI; auto-approve on other channels.
|
||||
let decision = if channel_name == "cli" {
|
||||
mgr.prompt_cli(&request)
|
||||
// Interactive CLI: prompt the operator.
|
||||
// Non-interactive (channels): auto-deny since no operator
|
||||
// is present to approve.
|
||||
let decision = if mgr.is_non_interactive() {
|
||||
ApprovalResponse::No
|
||||
} else {
|
||||
ApprovalResponse::Yes
|
||||
mgr.prompt_cli(&request)
|
||||
};
|
||||
|
||||
mgr.record_decision(&tool_name, &tool_args, decision, channel_name);
|
||||
@@ -3508,6 +3535,7 @@ pub async fn run(
|
||||
provider.as_ref(),
|
||||
model_name,
|
||||
config.agent.max_history_messages,
|
||||
config.agent.max_context_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -6449,4 +6477,30 @@ Let me check the result."#;
|
||||
let result = filter_tool_specs_for_turn(specs, &groups, "BROWSE the site");
|
||||
assert_eq!(result.len(), 1);
|
||||
}
|
||||
|
||||
// ── Token-based compaction tests ──────────────────────────
|
||||
|
||||
#[test]
|
||||
fn estimate_history_tokens_empty() {
|
||||
assert_eq!(super::estimate_history_tokens(&[]), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimate_history_tokens_single_message() {
|
||||
let history = vec![ChatMessage::user("hello world")]; // 11 chars
|
||||
let tokens = super::estimate_history_tokens(&history);
|
||||
// 11.div_ceil(4) + 4 = 3 + 4 = 7
|
||||
assert_eq!(tokens, 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimate_history_tokens_multiple_messages() {
|
||||
let history = vec![
|
||||
ChatMessage::system("You are helpful."), // 16 chars → 4 + 4 = 8
|
||||
ChatMessage::user("What is Rust?"), // 13 chars → 4 + 4 = 8
|
||||
ChatMessage::assistant("A language."), // 11 chars → 3 + 4 = 7
|
||||
];
|
||||
let tokens = super::estimate_history_tokens(&history);
|
||||
assert_eq!(tokens, 23);
|
||||
}
|
||||
}
|
||||
|
||||
+128
-4
@@ -44,11 +44,18 @@ pub struct ApprovalLogEntry {
|
||||
|
||||
// ── ApprovalManager ──────────────────────────────────────────────
|
||||
|
||||
/// Manages the interactive approval workflow.
|
||||
/// Manages the approval workflow for tool calls.
|
||||
///
|
||||
/// - Checks config-level `auto_approve` / `always_ask` lists
|
||||
/// - Maintains a session-scoped "always" allowlist
|
||||
/// - Records an audit trail of all decisions
|
||||
///
|
||||
/// Two modes:
|
||||
/// - **Interactive** (CLI): tools needing approval trigger a stdin prompt.
|
||||
/// - **Non-interactive** (channels): tools needing approval are auto-denied
|
||||
/// because there is no interactive operator to approve them. `auto_approve`
|
||||
/// policy is still enforced, and `always_ask` / supervised-default tools are
|
||||
/// denied rather than silently allowed.
|
||||
pub struct ApprovalManager {
|
||||
/// Tools that never need approval (from config).
|
||||
auto_approve: HashSet<String>,
|
||||
@@ -56,6 +63,9 @@ pub struct ApprovalManager {
|
||||
always_ask: HashSet<String>,
|
||||
/// Autonomy level from config.
|
||||
autonomy_level: AutonomyLevel,
|
||||
/// When `true`, tools that would require interactive approval are
|
||||
/// auto-denied instead. Used for channel-driven (non-CLI) runs.
|
||||
non_interactive: bool,
|
||||
/// Session-scoped allowlist built from "Always" responses.
|
||||
session_allowlist: Mutex<HashSet<String>>,
|
||||
/// Audit trail of approval decisions.
|
||||
@@ -63,17 +73,40 @@ pub struct ApprovalManager {
|
||||
}
|
||||
|
||||
impl ApprovalManager {
|
||||
/// Create from autonomy config.
|
||||
/// Create an interactive (CLI) approval manager from autonomy config.
|
||||
pub fn from_config(config: &AutonomyConfig) -> Self {
|
||||
Self {
|
||||
auto_approve: config.auto_approve.iter().cloned().collect(),
|
||||
always_ask: config.always_ask.iter().cloned().collect(),
|
||||
autonomy_level: config.level,
|
||||
non_interactive: false,
|
||||
session_allowlist: Mutex::new(HashSet::new()),
|
||||
audit_log: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a non-interactive approval manager for channel-driven runs.
|
||||
///
|
||||
/// Enforces the same `auto_approve` / `always_ask` / supervised policies
|
||||
/// as the CLI manager, but tools that would require interactive approval
|
||||
/// are auto-denied instead of prompting (since there is no operator).
|
||||
pub fn for_non_interactive(config: &AutonomyConfig) -> Self {
|
||||
Self {
|
||||
auto_approve: config.auto_approve.iter().cloned().collect(),
|
||||
always_ask: config.always_ask.iter().cloned().collect(),
|
||||
autonomy_level: config.level,
|
||||
non_interactive: true,
|
||||
session_allowlist: Mutex::new(HashSet::new()),
|
||||
audit_log: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` when this manager operates in non-interactive mode
|
||||
/// (i.e. for channel-driven runs where no operator can approve).
|
||||
pub fn is_non_interactive(&self) -> bool {
|
||||
self.non_interactive
|
||||
}
|
||||
|
||||
/// Check whether a tool call requires interactive approval.
|
||||
///
|
||||
/// Returns `true` if the call needs a prompt, `false` if it can proceed.
|
||||
@@ -147,8 +180,8 @@ impl ApprovalManager {
|
||||
|
||||
/// Prompt the user on the CLI and return their decision.
|
||||
///
|
||||
/// For non-CLI channels, returns `Yes` automatically (interactive
|
||||
/// approval is only supported on CLI for now).
|
||||
/// Only called for interactive (CLI) managers. Non-interactive managers
|
||||
/// auto-deny in the tool-call loop before reaching this point.
|
||||
pub fn prompt_cli(&self, request: &ApprovalRequest) -> ApprovalResponse {
|
||||
prompt_cli_interactive(request)
|
||||
}
|
||||
@@ -401,6 +434,97 @@ mod tests {
|
||||
assert!(summary.contains("just a string"));
|
||||
}
|
||||
|
||||
// ── non-interactive (channel) mode ────────────────────────
|
||||
|
||||
#[test]
|
||||
fn non_interactive_manager_reports_non_interactive() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
|
||||
assert!(mgr.is_non_interactive());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interactive_manager_reports_interactive() {
|
||||
let mgr = ApprovalManager::from_config(&supervised_config());
|
||||
assert!(!mgr.is_non_interactive());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_auto_approve_tools_skip_approval() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
|
||||
// auto_approve tools (file_read, memory_recall) should not need approval.
|
||||
assert!(!mgr.needs_approval("file_read"));
|
||||
assert!(!mgr.needs_approval("memory_recall"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_always_ask_tools_need_approval() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
|
||||
// always_ask tools (shell) still report as needing approval,
|
||||
// so the tool-call loop will auto-deny them in non-interactive mode.
|
||||
assert!(mgr.needs_approval("shell"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_unknown_tools_need_approval_in_supervised() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
|
||||
// Unknown tools in supervised mode need approval (will be auto-denied
|
||||
// by the tool-call loop for non-interactive managers).
|
||||
assert!(mgr.needs_approval("file_write"));
|
||||
assert!(mgr.needs_approval("http_request"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_full_autonomy_never_needs_approval() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&full_config());
|
||||
// Full autonomy means no approval needed, even in non-interactive mode.
|
||||
assert!(!mgr.needs_approval("shell"));
|
||||
assert!(!mgr.needs_approval("file_write"));
|
||||
assert!(!mgr.needs_approval("anything"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_readonly_never_needs_approval() {
|
||||
let config = AutonomyConfig {
|
||||
level: AutonomyLevel::ReadOnly,
|
||||
..AutonomyConfig::default()
|
||||
};
|
||||
let mgr = ApprovalManager::for_non_interactive(&config);
|
||||
// ReadOnly blocks execution elsewhere; approval manager does not prompt.
|
||||
assert!(!mgr.needs_approval("shell"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_session_allowlist_still_works() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
|
||||
assert!(mgr.needs_approval("file_write"));
|
||||
|
||||
// Simulate an "Always" decision (would come from a prior channel run
|
||||
// if the tool was auto-approved somehow, e.g. via config change).
|
||||
mgr.record_decision(
|
||||
"file_write",
|
||||
&serde_json::json!({"path": "test.txt"}),
|
||||
ApprovalResponse::Always,
|
||||
"telegram",
|
||||
);
|
||||
|
||||
assert!(!mgr.needs_approval("file_write"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_interactive_always_ask_overrides_session_allowlist() {
|
||||
let mgr = ApprovalManager::for_non_interactive(&supervised_config());
|
||||
|
||||
mgr.record_decision(
|
||||
"shell",
|
||||
&serde_json::json!({"command": "ls"}),
|
||||
ApprovalResponse::Always,
|
||||
"telegram",
|
||||
);
|
||||
|
||||
// shell is in always_ask, so it still needs approval even after "Always".
|
||||
assert!(mgr.needs_approval("shell"));
|
||||
}
|
||||
|
||||
// ── ApprovalResponse serde ───────────────────────────────
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -746,7 +746,7 @@ impl Channel for MatrixChannel {
|
||||
MessageType::Notice(content) => (content.body.clone(), None),
|
||||
MessageType::Image(content) => {
|
||||
let dl = media_info(&content.source, &content.body);
|
||||
(format!("[image: {}]", content.body), dl)
|
||||
(format!("[IMAGE:{}]", content.body), dl)
|
||||
}
|
||||
MessageType::File(content) => {
|
||||
let dl = media_info(&content.source, &content.body);
|
||||
|
||||
+170
-1
@@ -31,6 +31,7 @@ pub mod nextcloud_talk;
|
||||
#[cfg(feature = "channel-nostr")]
|
||||
pub mod nostr;
|
||||
pub mod qq;
|
||||
pub mod session_store;
|
||||
pub mod signal;
|
||||
pub mod slack;
|
||||
pub mod telegram;
|
||||
@@ -75,6 +76,7 @@ pub use whatsapp::WhatsAppChannel;
|
||||
pub use whatsapp_web::WhatsAppWebChannel;
|
||||
|
||||
use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop, scrub_credentials};
|
||||
use crate::approval::ApprovalManager;
|
||||
use crate::config::Config;
|
||||
use crate::identity;
|
||||
use crate::memory::{self, Memory};
|
||||
@@ -312,6 +314,12 @@ struct ChannelRuntimeContext {
|
||||
model_routes: Arc<Vec<crate::config::ModelRouteConfig>>,
|
||||
ack_reactions: bool,
|
||||
show_tool_calls: bool,
|
||||
session_store: Option<Arc<session_store::SessionStore>>,
|
||||
/// Non-interactive approval manager for channel-driven runs.
|
||||
/// Enforces `auto_approve` / `always_ask` / supervised policy from
|
||||
/// `[autonomy]` config; auto-denies tools that would need interactive
|
||||
/// approval since no operator is present on channel runs.
|
||||
approval_manager: Arc<ApprovalManager>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -965,6 +973,13 @@ fn proactive_trim_turns(turns: &mut Vec<ChatMessage>, budget: usize) -> usize {
|
||||
}
|
||||
|
||||
fn append_sender_turn(ctx: &ChannelRuntimeContext, sender_key: &str, turn: ChatMessage) {
|
||||
// Persist to JSONL before adding to in-memory history.
|
||||
if let Some(ref store) = ctx.session_store {
|
||||
if let Err(e) = store.append(sender_key, &turn) {
|
||||
tracing::warn!("Failed to persist session turn: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
let mut histories = ctx
|
||||
.conversation_histories
|
||||
.lock()
|
||||
@@ -2016,7 +2031,7 @@ async fn process_channel_message(
|
||||
route.model.as_str(),
|
||||
runtime_defaults.temperature,
|
||||
true,
|
||||
None,
|
||||
Some(&*ctx.approval_manager),
|
||||
msg.channel.as_str(),
|
||||
&ctx.multimodal,
|
||||
ctx.max_tool_iterations,
|
||||
@@ -2186,6 +2201,29 @@ async fn process_channel_message(
|
||||
&history_key,
|
||||
ChatMessage::assistant(&history_response),
|
||||
);
|
||||
|
||||
// Fire-and-forget LLM-driven memory consolidation.
|
||||
if ctx.auto_save_memory && msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
|
||||
let provider = Arc::clone(&ctx.provider);
|
||||
let model = ctx.model.to_string();
|
||||
let memory = Arc::clone(&ctx.memory);
|
||||
let user_msg = msg.content.clone();
|
||||
let assistant_resp = delivered_response.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = crate::memory::consolidation::consolidate_turn(
|
||||
provider.as_ref(),
|
||||
&model,
|
||||
memory.as_ref(),
|
||||
&user_msg,
|
||||
&assistant_resp,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::debug!("Memory consolidation skipped: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
println!(
|
||||
" 🤖 Reply ({}ms): {}",
|
||||
started_at.elapsed().as_millis(),
|
||||
@@ -3805,8 +3843,43 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
||||
model_routes: Arc::new(config.model_routes.clone()),
|
||||
ack_reactions: config.channels_config.ack_reactions,
|
||||
show_tool_calls: config.channels_config.show_tool_calls,
|
||||
session_store: if config.channels_config.session_persistence {
|
||||
match session_store::SessionStore::new(&config.workspace_dir) {
|
||||
Ok(store) => {
|
||||
tracing::info!("📂 Session persistence enabled");
|
||||
Some(Arc::new(store))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Session persistence disabled: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
},
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(&config.autonomy)),
|
||||
});
|
||||
|
||||
// Hydrate in-memory conversation histories from persisted JSONL session files.
|
||||
if let Some(ref store) = runtime_ctx.session_store {
|
||||
let mut hydrated = 0usize;
|
||||
let mut histories = runtime_ctx
|
||||
.conversation_histories
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner());
|
||||
for key in store.list_sessions() {
|
||||
let msgs = store.load(&key);
|
||||
if !msgs.is_empty() {
|
||||
hydrated += 1;
|
||||
histories.insert(key, msgs);
|
||||
}
|
||||
}
|
||||
drop(histories);
|
||||
if hydrated > 0 {
|
||||
tracing::info!("📂 Restored {hydrated} session(s) from disk");
|
||||
}
|
||||
}
|
||||
|
||||
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
|
||||
|
||||
// Wait for all channel tasks
|
||||
@@ -4072,6 +4145,10 @@ mod tests {
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
assert!(compact_sender_history(&ctx, &sender));
|
||||
@@ -4175,6 +4252,10 @@ mod tests {
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
append_sender_turn(&ctx, &sender, ChatMessage::user("hello"));
|
||||
@@ -4234,6 +4315,10 @@ mod tests {
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
};
|
||||
|
||||
assert!(rollback_orphan_user_turn(&ctx, &sender, "pending"));
|
||||
@@ -4751,6 +4836,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -4818,6 +4907,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -4899,6 +4992,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -4965,6 +5062,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5041,6 +5142,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5137,6 +5242,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5215,6 +5324,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5308,6 +5421,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5386,6 +5503,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5454,6 +5575,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5633,6 +5758,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
|
||||
@@ -5720,6 +5849,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
|
||||
@@ -5817,11 +5950,15 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
},
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
multimodal: crate::config::MultimodalConfig::default(),
|
||||
hooks: None,
|
||||
non_cli_excluded_tools: Arc::new(Vec::new()),
|
||||
tool_call_dedup_exempt: Arc::new(Vec::new()),
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
|
||||
@@ -5921,6 +6058,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
|
||||
@@ -6002,6 +6143,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6068,6 +6213,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6692,6 +6841,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6784,6 +6937,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6876,6 +7033,10 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -7432,6 +7593,10 @@ This is an example JSON object for profile settings."#;
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
// Simulate a photo attachment message with [IMAGE:] marker.
|
||||
@@ -7505,6 +7670,10 @@ This is an example JSON object for profile settings."#;
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
//! JSONL-based session persistence for channel conversations.
|
||||
//!
|
||||
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
|
||||
//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
|
||||
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
|
||||
//! are loaded from disk to restore conversation context.
|
||||
|
||||
use crate::providers::traits::ChatMessage;
|
||||
use std::io::{BufRead, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Append-only JSONL session store for channel conversations.
|
||||
pub struct SessionStore {
|
||||
sessions_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl SessionStore {
|
||||
/// Create a new session store, ensuring the sessions directory exists.
|
||||
pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
|
||||
let sessions_dir = workspace_dir.join("sessions");
|
||||
std::fs::create_dir_all(&sessions_dir)?;
|
||||
Ok(Self { sessions_dir })
|
||||
}
|
||||
|
||||
/// Compute the file path for a session key, sanitizing for filesystem safety.
|
||||
fn session_path(&self, session_key: &str) -> PathBuf {
|
||||
let safe_key: String = session_key
|
||||
.chars()
|
||||
.map(|c| {
|
||||
if c.is_alphanumeric() || c == '_' || c == '-' {
|
||||
c
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
self.sessions_dir.join(format!("{safe_key}.jsonl"))
|
||||
}
|
||||
|
||||
/// Load all messages for a session from its JSONL file.
|
||||
/// Returns an empty vec if the file does not exist or is unreadable.
|
||||
pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
|
||||
let path = self.session_path(session_key);
|
||||
let file = match std::fs::File::open(&path) {
|
||||
Ok(f) => f,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
let reader = std::io::BufReader::new(file);
|
||||
let mut messages = Vec::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
let Ok(line) = line else { continue };
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
|
||||
messages.push(msg);
|
||||
}
|
||||
}
|
||||
|
||||
messages
|
||||
}
|
||||
|
||||
/// Append a single message to the session JSONL file.
|
||||
pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
|
||||
let path = self.session_path(session_key);
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&path)?;
|
||||
|
||||
let json = serde_json::to_string(message)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
writeln!(file, "{json}")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List all session keys that have files on disk.
|
||||
pub fn list_sessions(&self) -> Vec<String> {
|
||||
let entries = match std::fs::read_dir(&self.sessions_dir) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
entries
|
||||
.filter_map(|entry| {
|
||||
let entry = entry.ok()?;
|
||||
let name = entry.file_name().into_string().ok()?;
|
||||
name.strip_suffix(".jsonl").map(String::from)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn round_trip_append_and_load() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
store
|
||||
.append("telegram_user123", &ChatMessage::user("hello"))
|
||||
.unwrap();
|
||||
store
|
||||
.append("telegram_user123", &ChatMessage::assistant("hi there"))
|
||||
.unwrap();
|
||||
|
||||
let messages = store.load("telegram_user123");
|
||||
assert_eq!(messages.len(), 2);
|
||||
assert_eq!(messages[0].role, "user");
|
||||
assert_eq!(messages[0].content, "hello");
|
||||
assert_eq!(messages[1].role, "assistant");
|
||||
assert_eq!(messages[1].content, "hi there");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_nonexistent_session_returns_empty() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
let messages = store.load("nonexistent");
|
||||
assert!(messages.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn key_sanitization() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
// Keys with special chars should be sanitized
|
||||
store
|
||||
.append("slack/thread:123/user", &ChatMessage::user("test"))
|
||||
.unwrap();
|
||||
|
||||
let messages = store.load("slack/thread:123/user");
|
||||
assert_eq!(messages.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list_sessions_returns_keys() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
store
|
||||
.append("telegram_alice", &ChatMessage::user("hi"))
|
||||
.unwrap();
|
||||
store
|
||||
.append("discord_bob", &ChatMessage::user("hey"))
|
||||
.unwrap();
|
||||
|
||||
let mut sessions = store.list_sessions();
|
||||
sessions.sort();
|
||||
assert_eq!(sessions.len(), 2);
|
||||
assert!(sessions.contains(&"discord_bob".to_string()));
|
||||
assert!(sessions.contains(&"telegram_alice".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn append_is_truly_append_only() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
let key = "test_session";
|
||||
|
||||
store.append(key, &ChatMessage::user("msg1")).unwrap();
|
||||
store.append(key, &ChatMessage::user("msg2")).unwrap();
|
||||
|
||||
// Read raw file to verify append-only format
|
||||
let path = store.session_path(key);
|
||||
let content = std::fs::read_to_string(&path).unwrap();
|
||||
let lines: Vec<&str> = content.trim().lines().collect();
|
||||
assert_eq!(lines.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_corrupt_lines_gracefully() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
let key = "corrupt_test";
|
||||
|
||||
// Write valid message + corrupt line + valid message
|
||||
let path = store.session_path(key);
|
||||
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
|
||||
let mut file = std::fs::File::create(&path).unwrap();
|
||||
writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
|
||||
writeln!(file, "this is not valid json").unwrap();
|
||||
writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
|
||||
|
||||
let messages = store.load(key);
|
||||
assert_eq!(messages.len(), 2);
|
||||
assert_eq!(messages[0].content, "hello");
|
||||
assert_eq!(messages[1].content, "world");
|
||||
}
|
||||
}
|
||||
@@ -791,6 +791,11 @@ pub struct AgentConfig {
|
||||
/// Maximum conversation history messages retained per session. Default: `50`.
|
||||
#[serde(default = "default_agent_max_history_messages")]
|
||||
pub max_history_messages: usize,
|
||||
/// Maximum estimated tokens for conversation history before compaction triggers.
|
||||
/// Uses ~4 chars/token heuristic. When this threshold is exceeded, older messages
|
||||
/// are summarized to preserve context while staying within budget. Default: `32000`.
|
||||
#[serde(default = "default_agent_max_context_tokens")]
|
||||
pub max_context_tokens: usize,
|
||||
/// Enable parallel tool execution within a single iteration. Default: `false`.
|
||||
#[serde(default)]
|
||||
pub parallel_tools: bool,
|
||||
@@ -817,6 +822,10 @@ fn default_agent_max_history_messages() -> usize {
|
||||
50
|
||||
}
|
||||
|
||||
fn default_agent_max_context_tokens() -> usize {
|
||||
32_000
|
||||
}
|
||||
|
||||
fn default_agent_tool_dispatcher() -> String {
|
||||
"auto".into()
|
||||
}
|
||||
@@ -827,6 +836,7 @@ impl Default for AgentConfig {
|
||||
compact_context: false,
|
||||
max_tool_iterations: default_agent_max_tool_iterations(),
|
||||
max_history_messages: default_agent_max_history_messages(),
|
||||
max_context_tokens: default_agent_max_context_tokens(),
|
||||
parallel_tools: false,
|
||||
tool_dispatcher: default_agent_tool_dispatcher(),
|
||||
tool_call_dedup_exempt: Vec::new(),
|
||||
@@ -1413,6 +1423,10 @@ pub struct HttpRequestConfig {
|
||||
/// Request timeout in seconds (default: 30)
|
||||
#[serde(default = "default_http_timeout_secs")]
|
||||
pub timeout_secs: u64,
|
||||
/// Allow requests to private/LAN hosts (RFC 1918, loopback, link-local, .local).
|
||||
/// Default: false (deny private hosts for SSRF protection).
|
||||
#[serde(default)]
|
||||
pub allow_private_hosts: bool,
|
||||
}
|
||||
|
||||
impl Default for HttpRequestConfig {
|
||||
@@ -1422,6 +1436,7 @@ impl Default for HttpRequestConfig {
|
||||
allowed_domains: vec![],
|
||||
max_response_size: default_http_max_response_size(),
|
||||
timeout_secs: default_http_timeout_secs(),
|
||||
allow_private_hosts: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3052,6 +3067,7 @@ impl<T: ChannelConfig> crate::config::traits::ConfigHandle for ConfigWrapper<T>
|
||||
///
|
||||
/// Each channel sub-section (e.g. `telegram`, `discord`) is optional;
|
||||
/// setting it to `Some(...)` enables that channel.
|
||||
#[allow(clippy::struct_excessive_bools)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct ChannelsConfig {
|
||||
/// Enable the CLI interactive channel. Default: `true`.
|
||||
@@ -3114,6 +3130,10 @@ pub struct ChannelsConfig {
|
||||
/// not forwarded as individual channel messages. Default: `true`.
|
||||
#[serde(default = "default_true")]
|
||||
pub show_tool_calls: bool,
|
||||
/// Persist channel conversation history to JSONL files so sessions survive
|
||||
/// daemon restarts. Files are stored in `{workspace}/sessions/`. Default: `true`.
|
||||
#[serde(default = "default_true")]
|
||||
pub session_persistence: bool,
|
||||
}
|
||||
|
||||
impl ChannelsConfig {
|
||||
@@ -3248,6 +3268,7 @@ impl Default for ChannelsConfig {
|
||||
message_timeout_secs: default_channel_message_timeout_secs(),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6269,6 +6290,7 @@ default_temperature = 0.7
|
||||
message_timeout_secs: 300,
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
},
|
||||
memory: MemoryConfig::default(),
|
||||
storage: StorageConfig::default(),
|
||||
@@ -6983,6 +7005,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
message_timeout_secs: 300,
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
};
|
||||
let toml_str = toml::to_string_pretty(&c).unwrap();
|
||||
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
|
||||
@@ -7210,6 +7233,7 @@ channel_id = "C123"
|
||||
message_timeout_secs: 300,
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
};
|
||||
let toml_str = toml::to_string_pretty(&c).unwrap();
|
||||
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
|
||||
|
||||
+172
-21
@@ -152,44 +152,122 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
crate::CronCommands::Add {
|
||||
expression,
|
||||
tz,
|
||||
agent,
|
||||
command,
|
||||
} => {
|
||||
let schedule = Schedule::Cron {
|
||||
expr: expression,
|
||||
tz,
|
||||
};
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added cron job {}", job.id);
|
||||
println!(" Expr: {}", job.expression);
|
||||
println!(" Next: {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
if agent {
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
println!("✅ Added agent cron job {}", job.id);
|
||||
println!(" Expr : {}", job.expression);
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added cron job {}", job.id);
|
||||
println!(" Expr: {}", job.expression);
|
||||
println!(" Next: {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::AddAt { at, command } => {
|
||||
crate::CronCommands::AddAt { at, agent, command } => {
|
||||
let at = chrono::DateTime::parse_from_rfc3339(&at)
|
||||
.map_err(|e| anyhow::anyhow!("Invalid RFC3339 timestamp for --at: {e}"))?
|
||||
.with_timezone(&chrono::Utc);
|
||||
let schedule = Schedule::At { at };
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
if agent {
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)?;
|
||||
println!("✅ Added one-shot agent cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::AddEvery { every_ms, command } => {
|
||||
crate::CronCommands::AddEvery {
|
||||
every_ms,
|
||||
agent,
|
||||
command,
|
||||
} => {
|
||||
let schedule = Schedule::Every { every_ms };
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added interval cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
if agent {
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
println!("✅ Added interval agent cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt : {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added interval cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::Once { delay, command } => {
|
||||
let job = add_once(config, &delay, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
crate::CronCommands::Once {
|
||||
delay,
|
||||
agent,
|
||||
command,
|
||||
} => {
|
||||
if agent {
|
||||
let duration = parse_delay(&delay)?;
|
||||
let at = chrono::Utc::now() + duration;
|
||||
let schedule = Schedule::At { at };
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)?;
|
||||
println!("✅ Added one-shot agent cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_once(config, &delay, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::Update {
|
||||
@@ -686,4 +764,77 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("blocked by security policy"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_agent_flag_creates_agent_job() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
command: "Check server health: disk space, memory, CPU load".into(),
|
||||
},
|
||||
&config,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].job_type, JobType::Agent);
|
||||
assert_eq!(
|
||||
jobs[0].prompt.as_deref(),
|
||||
Some("Check server health: disk space, memory, CPU load")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_agent_flag_bypasses_shell_security_validation() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let mut config = test_config(&tmp);
|
||||
config.autonomy.allowed_commands = vec!["echo".into()];
|
||||
config.autonomy.level = crate::security::AutonomyLevel::Supervised;
|
||||
|
||||
// Without --agent, a natural language string would be blocked by shell
|
||||
// security policy. With --agent, it routes to agent job and skips
|
||||
// shell validation entirely.
|
||||
let result = handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
command: "Check server health: disk space, memory, CPU load".into(),
|
||||
},
|
||||
&config,
|
||||
);
|
||||
assert!(result.is_ok());
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].job_type, JobType::Agent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_without_agent_flag_defaults_to_shell_job() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/5 * * * *".into(),
|
||||
tz: None,
|
||||
agent: false,
|
||||
command: "echo ok".into(),
|
||||
},
|
||||
&config,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].job_type, JobType::Shell);
|
||||
assert_eq!(jobs[0].command, "echo ok");
|
||||
}
|
||||
}
|
||||
|
||||
+19
-6
@@ -280,15 +280,19 @@ Times are evaluated in UTC by default; use --tz with an IANA \
|
||||
timezone name to override.
|
||||
|
||||
Examples:
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health'")]
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York --agent
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health' --agent
|
||||
zeroclaw cron add '*/5 * * * *' 'echo ok'")]
|
||||
Add {
|
||||
/// Cron expression
|
||||
expression: String,
|
||||
/// Optional IANA timezone (e.g. America/Los_Angeles)
|
||||
#[arg(long)]
|
||||
tz: Option<String>,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Add a one-shot scheduled task at an RFC3339 timestamp
|
||||
@@ -303,7 +307,10 @@ Examples:
|
||||
AddAt {
|
||||
/// One-shot timestamp in RFC3339 format
|
||||
at: String,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Add a fixed-interval scheduled task
|
||||
@@ -318,7 +325,10 @@ Examples:
|
||||
AddEvery {
|
||||
/// Interval in milliseconds
|
||||
every_ms: u64,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Add a one-shot delayed task (e.g. "30m", "2h", "1d")
|
||||
@@ -335,7 +345,10 @@ Examples:
|
||||
Once {
|
||||
/// Delay duration
|
||||
delay: String,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Remove a scheduled task
|
||||
|
||||
+5
-4
@@ -325,11 +325,12 @@ override with --tz and an IANA timezone name.
|
||||
|
||||
Examples:
|
||||
zeroclaw cron list
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health'
|
||||
zeroclaw cron add-at 2025-01-15T14:00:00Z 'Send reminder'
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York --agent
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health' --agent
|
||||
zeroclaw cron add '*/5 * * * *' 'echo ok'
|
||||
zeroclaw cron add-at 2025-01-15T14:00:00Z 'Send reminder' --agent
|
||||
zeroclaw cron add-every 60000 'Ping heartbeat'
|
||||
zeroclaw cron once 30m 'Run backup in 30 minutes'
|
||||
zeroclaw cron once 30m 'Run backup in 30 minutes' --agent
|
||||
zeroclaw cron pause <task-id>
|
||||
zeroclaw cron update <task-id> --expression '0 8 * * *' --tz Europe/London")]
|
||||
Cron {
|
||||
|
||||
@@ -0,0 +1,175 @@
|
||||
//! LLM-driven memory consolidation.
|
||||
//!
|
||||
//! After each conversation turn, extracts structured information:
|
||||
//! - `history_entry`: A timestamped summary for the daily conversation log.
|
||||
//! - `memory_update`: New facts, preferences, or decisions worth remembering
|
||||
//! long-term (or `null` if nothing new was learned).
|
||||
//!
|
||||
//! This two-phase approach replaces the naive raw-message auto-save with
|
||||
//! semantic extraction, similar to Nanobot's `save_memory` tool call pattern.
|
||||
|
||||
use crate::memory::traits::{Memory, MemoryCategory};
|
||||
use crate::providers::traits::Provider;
|
||||
|
||||
/// Output of consolidation extraction.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
pub struct ConsolidationResult {
|
||||
/// Brief timestamped summary for the conversation history log.
|
||||
pub history_entry: String,
|
||||
/// New facts/preferences/decisions to store long-term, or None.
|
||||
pub memory_update: Option<String>,
|
||||
}
|
||||
|
||||
const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are a memory consolidation engine. Given a conversation turn, extract:
|
||||
1. "history_entry": A brief summary of what happened in this turn (1-2 sentences). Include the key topic or action.
|
||||
2. "memory_update": Any NEW facts, preferences, decisions, or commitments worth remembering long-term. Return null if nothing new was learned.
|
||||
|
||||
Respond ONLY with valid JSON: {"history_entry": "...", "memory_update": "..." or null}
|
||||
Do not include any text outside the JSON object."#;
|
||||
|
||||
/// Run two-phase LLM-driven consolidation on a conversation turn.
|
||||
///
|
||||
/// Phase 1: Write a history entry to the Daily memory category.
|
||||
/// Phase 2: Write a memory update to the Core category (if the LLM identified new facts).
|
||||
///
|
||||
/// This function is designed to be called fire-and-forget via `tokio::spawn`.
|
||||
pub async fn consolidate_turn(
|
||||
provider: &dyn Provider,
|
||||
model: &str,
|
||||
memory: &dyn Memory,
|
||||
user_message: &str,
|
||||
assistant_response: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let turn_text = format!("User: {user_message}\nAssistant: {assistant_response}");
|
||||
|
||||
// Truncate very long turns to avoid wasting tokens on consolidation.
|
||||
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8 (e.g. CJK text).
|
||||
let truncated = if turn_text.len() > 4000 {
|
||||
let mut end = 4000;
|
||||
while end > 0 && !turn_text.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
format!("{}…", &turn_text[..end])
|
||||
} else {
|
||||
turn_text.clone()
|
||||
};
|
||||
|
||||
let raw = provider
|
||||
.chat_with_system(Some(CONSOLIDATION_SYSTEM_PROMPT), &truncated, model, 0.1)
|
||||
.await?;
|
||||
|
||||
let result: ConsolidationResult = parse_consolidation_response(&raw, &turn_text);
|
||||
|
||||
// Phase 1: Write history entry to Daily category.
|
||||
let date = chrono::Local::now().format("%Y-%m-%d").to_string();
|
||||
let history_key = format!("daily_{date}_{}", uuid::Uuid::new_v4());
|
||||
memory
|
||||
.store(
|
||||
&history_key,
|
||||
&result.history_entry,
|
||||
MemoryCategory::Daily,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Phase 2: Write memory update to Core category (if present).
|
||||
if let Some(ref update) = result.memory_update {
|
||||
if !update.trim().is_empty() {
|
||||
let mem_key = format!("core_{}", uuid::Uuid::new_v4());
|
||||
memory
|
||||
.store(&mem_key, update, MemoryCategory::Core, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parse the LLM's consolidation response, with fallback for malformed JSON.
|
||||
fn parse_consolidation_response(raw: &str, fallback_text: &str) -> ConsolidationResult {
|
||||
// Try to extract JSON from the response (LLM may wrap in markdown code blocks).
|
||||
let cleaned = raw
|
||||
.trim()
|
||||
.trim_start_matches("```json")
|
||||
.trim_start_matches("```")
|
||||
.trim_end_matches("```")
|
||||
.trim();
|
||||
|
||||
serde_json::from_str(cleaned).unwrap_or_else(|_| {
|
||||
// Fallback: use truncated turn text as history entry.
|
||||
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8.
|
||||
let summary = if fallback_text.len() > 200 {
|
||||
let mut end = 200;
|
||||
while end > 0 && !fallback_text.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
format!("{}…", &fallback_text[..end])
|
||||
} else {
|
||||
fallback_text.to_string()
|
||||
};
|
||||
ConsolidationResult {
|
||||
history_entry: summary,
|
||||
memory_update: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_valid_json_response() {
|
||||
let raw = r#"{"history_entry": "User asked about Rust.", "memory_update": "User prefers Rust over Go."}"#;
|
||||
let result = parse_consolidation_response(raw, "fallback");
|
||||
assert_eq!(result.history_entry, "User asked about Rust.");
|
||||
assert_eq!(
|
||||
result.memory_update.as_deref(),
|
||||
Some("User prefers Rust over Go.")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_json_with_null_memory() {
|
||||
let raw = r#"{"history_entry": "Routine greeting.", "memory_update": null}"#;
|
||||
let result = parse_consolidation_response(raw, "fallback");
|
||||
assert_eq!(result.history_entry, "Routine greeting.");
|
||||
assert!(result.memory_update.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_json_wrapped_in_code_block() {
|
||||
let raw =
|
||||
"```json\n{\"history_entry\": \"Discussed deployment.\", \"memory_update\": null}\n```";
|
||||
let result = parse_consolidation_response(raw, "fallback");
|
||||
assert_eq!(result.history_entry, "Discussed deployment.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fallback_on_malformed_response() {
|
||||
let raw = "I'm sorry, I can't do that.";
|
||||
let result = parse_consolidation_response(raw, "User: hello\nAssistant: hi");
|
||||
assert_eq!(result.history_entry, "User: hello\nAssistant: hi");
|
||||
assert!(result.memory_update.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fallback_truncates_long_text() {
|
||||
let long_text = "x".repeat(500);
|
||||
let result = parse_consolidation_response("invalid", &long_text);
|
||||
// 200 bytes + "…" (3 bytes in UTF-8) = 203
|
||||
assert!(result.history_entry.len() <= 203);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fallback_truncates_cjk_text_without_panic() {
|
||||
// Each CJK character is 3 bytes in UTF-8; byte index 200 may land
|
||||
// inside a character. This must not panic.
|
||||
let cjk_text = "二手书项目".repeat(50); // 250 chars = 750 bytes
|
||||
let result = parse_consolidation_response("invalid", &cjk_text);
|
||||
assert!(result
|
||||
.history_entry
|
||||
.is_char_boundary(result.history_entry.len()));
|
||||
assert!(result.history_entry.ends_with('…'));
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod backend;
|
||||
pub mod chunker;
|
||||
pub mod cli;
|
||||
pub mod consolidation;
|
||||
pub mod embeddings;
|
||||
pub mod hygiene;
|
||||
pub mod lucid;
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::multimodal;
|
||||
use crate::providers::traits::{ChatMessage, Provider, ProviderCapabilities};
|
||||
use crate::providers::ProviderRuntimeOptions;
|
||||
use async_trait::async_trait;
|
||||
use futures_util::StreamExt;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
@@ -472,8 +473,24 @@ fn extract_stream_error_message(event: &Value) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Read the response body incrementally via `bytes_stream()` to avoid
|
||||
/// buffering the entire SSE payload in memory. The previous implementation
|
||||
/// used `response.text().await?` which holds the HTTP connection open until
|
||||
/// every byte has arrived — on high-latency links the long-lived connection
|
||||
/// often drops mid-read, producing the "error decoding response body" failure
|
||||
/// reported in #3544.
|
||||
async fn decode_responses_body(response: reqwest::Response) -> anyhow::Result<String> {
|
||||
let body = response.text().await?;
|
||||
let mut body = String::new();
|
||||
let mut stream = response.bytes_stream();
|
||||
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let bytes = chunk
|
||||
.map_err(|err| anyhow::anyhow!("error reading OpenAI Codex response stream: {err}"))?;
|
||||
let text = std::str::from_utf8(&bytes).map_err(|err| {
|
||||
anyhow::anyhow!("OpenAI Codex response contained invalid UTF-8: {err}")
|
||||
})?;
|
||||
body.push_str(text);
|
||||
}
|
||||
|
||||
if let Some(text) = parse_sse_text(&body)? {
|
||||
return Ok(text);
|
||||
|
||||
+16
-1
@@ -67,7 +67,13 @@ pub fn redact(value: &str) -> String {
|
||||
if value.len() <= 4 {
|
||||
"***".to_string()
|
||||
} else {
|
||||
format!("{}***", &value[..4])
|
||||
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8.
|
||||
let prefix = value
|
||||
.char_indices()
|
||||
.nth(4)
|
||||
.map(|(byte_idx, _)| &value[..byte_idx])
|
||||
.unwrap_or(value);
|
||||
format!("{}***", prefix)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,4 +108,13 @@ mod tests {
|
||||
assert_eq!(redact(""), "***");
|
||||
assert_eq!(redact("12345"), "1234***");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn redact_handles_multibyte_utf8_without_panic() {
|
||||
// CJK characters are 3 bytes each; slicing at byte 4 would panic
|
||||
// without char-boundary-safe handling.
|
||||
let result = redact("密码是很长的秘密");
|
||||
assert!(result.ends_with("***"));
|
||||
assert!(result.is_char_boundary(result.len()));
|
||||
}
|
||||
}
|
||||
|
||||
+144
-3
@@ -793,6 +793,8 @@ impl SecurityPolicy {
|
||||
// 1. Allowlist check (is the base command permitted at all?)
|
||||
// 2. Risk classification (high / medium / low)
|
||||
// 3. Policy flags (block_high_risk_commands, require_approval_for_medium_risk)
|
||||
// — explicit allowlist entries exempt a command from the high-risk block,
|
||||
// but the wildcard "*" does NOT grant an exemption.
|
||||
// 4. Autonomy level × approval status (supervised requires explicit approval)
|
||||
// This ordering ensures deny-by-default: unknown commands are rejected
|
||||
// before any risk or autonomy logic runs.
|
||||
@@ -810,7 +812,7 @@ impl SecurityPolicy {
|
||||
let risk = self.command_risk_level(command);
|
||||
|
||||
if risk == CommandRiskLevel::High {
|
||||
if self.block_high_risk_commands {
|
||||
if self.block_high_risk_commands && !self.is_command_explicitly_allowed(command) {
|
||||
return Err("Command blocked: high-risk command is disallowed by policy".into());
|
||||
}
|
||||
if self.autonomy == AutonomyLevel::Supervised && !approved {
|
||||
@@ -834,6 +836,48 @@ impl SecurityPolicy {
|
||||
Ok(risk)
|
||||
}
|
||||
|
||||
/// Check whether **every** segment of a command is explicitly listed in
|
||||
/// `allowed_commands` — i.e., matched by a concrete entry rather than by
|
||||
/// the wildcard `"*"`.
|
||||
///
|
||||
/// This is used to exempt explicitly-allowlisted high-risk commands from
|
||||
/// the `block_high_risk_commands` gate. The wildcard entry intentionally
|
||||
/// does **not** qualify as an explicit allowlist match, so that operators
|
||||
/// who set `allowed_commands = ["*"]` still get the high-risk safety net.
|
||||
fn is_command_explicitly_allowed(&self, command: &str) -> bool {
|
||||
let segments = split_unquoted_segments(command);
|
||||
for segment in &segments {
|
||||
let cmd_part = skip_env_assignments(segment);
|
||||
let mut words = cmd_part.split_whitespace();
|
||||
let executable = strip_wrapping_quotes(words.next().unwrap_or("")).trim();
|
||||
let base_cmd_owned = command_basename(executable).to_ascii_lowercase();
|
||||
let base_cmd = strip_windows_exe_suffix(&base_cmd_owned);
|
||||
|
||||
if base_cmd.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let explicitly_listed = self.allowed_commands.iter().any(|allowed| {
|
||||
let allowed = strip_wrapping_quotes(allowed).trim();
|
||||
// Skip wildcard — it does not count as an explicit entry.
|
||||
if allowed.is_empty() || allowed == "*" {
|
||||
return false;
|
||||
}
|
||||
is_allowlist_entry_match(allowed, executable, base_cmd)
|
||||
});
|
||||
|
||||
if !explicitly_listed {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// At least one real command must be present.
|
||||
segments.iter().any(|s| {
|
||||
let s = skip_env_assignments(s.trim());
|
||||
s.split_whitespace().next().is_some_and(|w| !w.is_empty())
|
||||
})
|
||||
}
|
||||
|
||||
// ── Layered Command Allowlist ──────────────────────────────────────────
|
||||
// Defence-in-depth: five independent gates run in order before the
|
||||
// per-segment allowlist check. Each gate targets a specific bypass
|
||||
@@ -1503,10 +1547,13 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_blocks_high_risk_by_default() {
|
||||
fn validate_command_blocks_high_risk_via_wildcard() {
|
||||
// Wildcard allows the command through is_command_allowed, but
|
||||
// block_high_risk_commands still rejects it because "*" does not
|
||||
// count as an explicit allowlist entry.
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Supervised,
|
||||
allowed_commands: vec!["rm".into()],
|
||||
allowed_commands: vec!["*".into()],
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
@@ -1515,6 +1562,100 @@ mod tests {
|
||||
assert!(result.unwrap_err().contains("high-risk"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_allows_explicitly_listed_high_risk() {
|
||||
// When a high-risk command is explicitly in allowed_commands, the
|
||||
// block_high_risk_commands gate is bypassed — the operator has made
|
||||
// a deliberate decision to permit it.
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Full,
|
||||
allowed_commands: vec!["curl".into()],
|
||||
block_high_risk_commands: true,
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
let result = p.validate_command_execution("curl https://api.example.com/data", true);
|
||||
assert_eq!(result.unwrap(), CommandRiskLevel::High);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_allows_wget_when_explicitly_listed() {
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Full,
|
||||
allowed_commands: vec!["wget".into()],
|
||||
block_high_risk_commands: true,
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
let result =
|
||||
p.validate_command_execution("wget https://releases.example.com/v1.tar.gz", true);
|
||||
assert_eq!(result.unwrap(), CommandRiskLevel::High);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_blocks_non_listed_high_risk_when_another_is_allowed() {
|
||||
// Allowing curl explicitly should not exempt wget.
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Full,
|
||||
allowed_commands: vec!["curl".into()],
|
||||
block_high_risk_commands: true,
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
let result = p.validate_command_execution("wget https://evil.com", true);
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("not allowed"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_explicit_rm_bypasses_high_risk_block() {
|
||||
// Operator explicitly listed "rm" — they accept the risk.
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Full,
|
||||
allowed_commands: vec!["rm".into()],
|
||||
block_high_risk_commands: true,
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
let result = p.validate_command_execution("rm -rf /tmp/test", true);
|
||||
assert_eq!(result.unwrap(), CommandRiskLevel::High);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_high_risk_still_needs_approval_in_supervised() {
|
||||
// Even when explicitly allowed, supervised mode still requires
|
||||
// approval for high-risk commands (the approval gate is separate
|
||||
// from the block gate).
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Supervised,
|
||||
allowed_commands: vec!["curl".into()],
|
||||
block_high_risk_commands: true,
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
let denied = p.validate_command_execution("curl https://api.example.com", false);
|
||||
assert!(denied.is_err());
|
||||
assert!(denied.unwrap_err().contains("requires explicit approval"));
|
||||
|
||||
let allowed = p.validate_command_execution("curl https://api.example.com", true);
|
||||
assert_eq!(allowed.unwrap(), CommandRiskLevel::High);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_pipe_needs_all_segments_explicitly_allowed() {
|
||||
// When a pipeline contains a high-risk command, every segment
|
||||
// must be explicitly allowed for the exemption to apply.
|
||||
let p = SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Full,
|
||||
allowed_commands: vec!["curl".into(), "grep".into()],
|
||||
block_high_risk_commands: true,
|
||||
..SecurityPolicy::default()
|
||||
};
|
||||
|
||||
let result = p.validate_command_execution("curl https://api.example.com | grep data", true);
|
||||
assert_eq!(result.unwrap(), CommandRiskLevel::High);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_command_full_mode_skips_medium_risk_approval_gate() {
|
||||
let p = SecurityPolicy {
|
||||
|
||||
@@ -12,6 +12,7 @@ pub struct HttpRequestTool {
|
||||
allowed_domains: Vec<String>,
|
||||
max_response_size: usize,
|
||||
timeout_secs: u64,
|
||||
allow_private_hosts: bool,
|
||||
}
|
||||
|
||||
impl HttpRequestTool {
|
||||
@@ -20,12 +21,14 @@ impl HttpRequestTool {
|
||||
allowed_domains: Vec<String>,
|
||||
max_response_size: usize,
|
||||
timeout_secs: u64,
|
||||
allow_private_hosts: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
security,
|
||||
allowed_domains: normalize_allowed_domains(allowed_domains),
|
||||
max_response_size,
|
||||
timeout_secs,
|
||||
allow_private_hosts,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +55,7 @@ impl HttpRequestTool {
|
||||
|
||||
let host = extract_host(url)?;
|
||||
|
||||
if is_private_or_local_host(&host) {
|
||||
if !self.allow_private_hosts && is_private_or_local_host(&host) {
|
||||
anyhow::bail!("Blocked local/private host: {host}");
|
||||
}
|
||||
|
||||
@@ -454,6 +457,13 @@ mod tests {
|
||||
use crate::security::{AutonomyLevel, SecurityPolicy};
|
||||
|
||||
fn test_tool(allowed_domains: Vec<&str>) -> HttpRequestTool {
|
||||
test_tool_with_private(allowed_domains, false)
|
||||
}
|
||||
|
||||
fn test_tool_with_private(
|
||||
allowed_domains: Vec<&str>,
|
||||
allow_private_hosts: bool,
|
||||
) -> HttpRequestTool {
|
||||
let security = Arc::new(SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Supervised,
|
||||
..SecurityPolicy::default()
|
||||
@@ -463,6 +473,7 @@ mod tests {
|
||||
allowed_domains.into_iter().map(String::from).collect(),
|
||||
1_000_000,
|
||||
30,
|
||||
allow_private_hosts,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -570,7 +581,7 @@ mod tests {
|
||||
#[test]
|
||||
fn validate_requires_allowlist() {
|
||||
let security = Arc::new(SecurityPolicy::default());
|
||||
let tool = HttpRequestTool::new(security, vec![], 1_000_000, 30);
|
||||
let tool = HttpRequestTool::new(security, vec![], 1_000_000, 30, false);
|
||||
let err = tool
|
||||
.validate_url("https://example.com")
|
||||
.unwrap_err()
|
||||
@@ -686,7 +697,7 @@ mod tests {
|
||||
autonomy: AutonomyLevel::ReadOnly,
|
||||
..SecurityPolicy::default()
|
||||
});
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30);
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30, false);
|
||||
let result = tool
|
||||
.execute(json!({"url": "https://example.com"}))
|
||||
.await
|
||||
@@ -701,7 +712,7 @@ mod tests {
|
||||
max_actions_per_hour: 0,
|
||||
..SecurityPolicy::default()
|
||||
});
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30);
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30, false);
|
||||
let result = tool
|
||||
.execute(json!({"url": "https://example.com"}))
|
||||
.await
|
||||
@@ -724,6 +735,7 @@ mod tests {
|
||||
vec!["example.com".into()],
|
||||
10,
|
||||
30,
|
||||
false,
|
||||
);
|
||||
let text = "hello world this is long";
|
||||
let truncated = tool.truncate_response(text);
|
||||
@@ -738,6 +750,7 @@ mod tests {
|
||||
vec!["example.com".into()],
|
||||
0, // max_response_size = 0 means no limit
|
||||
30,
|
||||
false,
|
||||
);
|
||||
let text = "a".repeat(10_000_000);
|
||||
assert_eq!(tool.truncate_response(&text), text);
|
||||
@@ -750,6 +763,7 @@ mod tests {
|
||||
vec!["example.com".into()],
|
||||
5,
|
||||
30,
|
||||
false,
|
||||
);
|
||||
let text = "hello world";
|
||||
let truncated = tool.truncate_response(text);
|
||||
@@ -935,4 +949,70 @@ mod tests {
|
||||
.to_string();
|
||||
assert!(err.contains("IPv6"));
|
||||
}
|
||||
|
||||
// ── allow_private_hosts opt-in tests ────────────────────────
|
||||
|
||||
#[test]
|
||||
fn default_blocks_private_hosts() {
|
||||
let tool = test_tool(vec!["localhost", "192.168.1.5", "*"]);
|
||||
assert!(tool
|
||||
.validate_url("https://localhost:8080")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
assert!(tool
|
||||
.validate_url("https://192.168.1.5")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
assert!(tool
|
||||
.validate_url("https://10.0.0.1")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_permits_localhost() {
|
||||
let tool = test_tool_with_private(vec!["localhost"], true);
|
||||
assert!(tool.validate_url("https://localhost:8080").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_permits_private_ipv4() {
|
||||
let tool = test_tool_with_private(vec!["192.168.1.5"], true);
|
||||
assert!(tool.validate_url("https://192.168.1.5").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_permits_rfc1918_with_wildcard() {
|
||||
let tool = test_tool_with_private(vec!["*"], true);
|
||||
assert!(tool.validate_url("https://10.0.0.1").is_ok());
|
||||
assert!(tool.validate_url("https://172.16.0.1").is_ok());
|
||||
assert!(tool.validate_url("https://192.168.1.1").is_ok());
|
||||
assert!(tool.validate_url("http://localhost:8123").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_still_requires_allowlist() {
|
||||
let tool = test_tool_with_private(vec!["example.com"], true);
|
||||
let err = tool
|
||||
.validate_url("https://192.168.1.5")
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
assert!(
|
||||
err.contains("allowed_domains"),
|
||||
"Private host should still need allowlist match, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_false_still_blocks() {
|
||||
let tool = test_tool_with_private(vec!["*"], false);
|
||||
assert!(tool
|
||||
.validate_url("https://localhost:8080")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,6 +314,7 @@ pub fn all_tools_with_runtime(
|
||||
http_config.allowed_domains.clone(),
|
||||
http_config.max_response_size,
|
||||
http_config.timeout_secs,
|
||||
http_config.allow_private_hosts,
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user