Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d928ebc92e | |||
| 7106632b51 | |||
| b834278754 | |||
| 186f6d9797 | |||
| 6cdc92a256 | |||
| 02599dcd3c | |||
| fe64d7ef7e | |||
| 45f953be6d | |||
| 82f29bbcb1 | |||
| 93b5a0b824 | |||
| 08a67c4a2d | |||
| c86a0673ba | |||
| cabf99ba07 | |||
| 2d978a6b64 | |||
| 4dbc9266c1 | |||
| ea0b3c8c8c |
+4
-1
@@ -43,4 +43,7 @@ credentials.json
|
||||
lcov.info
|
||||
|
||||
# IDE's stuff
|
||||
.idea
|
||||
.idea
|
||||
|
||||
# Wrangler cache
|
||||
.wrangler/
|
||||
Generated
+1
-1
@@ -7945,7 +7945,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "zeroclawlabs"
|
||||
version = "0.3.1"
|
||||
version = "0.3.3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-imap",
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ resolver = "2"
|
||||
|
||||
[package]
|
||||
name = "zeroclawlabs"
|
||||
version = "0.3.1"
|
||||
version = "0.3.3"
|
||||
edition = "2021"
|
||||
authors = ["theonlyhennygod"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
||||
Executable
+261
@@ -0,0 +1,261 @@
|
||||
#!/usr/bin/env bash
|
||||
# Termux release validation script
|
||||
# Validates the aarch64-linux-android release artifact for Termux compatibility.
|
||||
#
|
||||
# Usage:
|
||||
# ./dev/test-termux-release.sh [version]
|
||||
#
|
||||
# Examples:
|
||||
# ./dev/test-termux-release.sh 0.3.1
|
||||
# ./dev/test-termux-release.sh # auto-detects from Cargo.toml
|
||||
#
|
||||
set -euo pipefail
|
||||
|
||||
BLUE='\033[0;34m'
|
||||
GREEN='\033[0;32m'
|
||||
RED='\033[0;31m'
|
||||
YELLOW='\033[0;33m'
|
||||
BOLD='\033[1m'
|
||||
DIM='\033[2m'
|
||||
RESET='\033[0m'
|
||||
|
||||
pass() { echo -e " ${GREEN}✓${RESET} $*"; }
|
||||
fail() { echo -e " ${RED}✗${RESET} $*"; FAILURES=$((FAILURES + 1)); }
|
||||
info() { echo -e "${BLUE}→${RESET} ${BOLD}$*${RESET}"; }
|
||||
warn() { echo -e "${YELLOW}!${RESET} $*"; }
|
||||
|
||||
FAILURES=0
|
||||
TARGET="aarch64-linux-android"
|
||||
VERSION="${1:-}"
|
||||
|
||||
if [[ -z "$VERSION" ]]; then
|
||||
if [[ -f Cargo.toml ]]; then
|
||||
VERSION=$(sed -n 's/^version = "\([^"]*\)"/\1/p' Cargo.toml | head -1)
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ -z "$VERSION" ]]; then
|
||||
echo "Usage: $0 <version>"
|
||||
echo " e.g. $0 0.3.1"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
TAG="v${VERSION}"
|
||||
ASSET_NAME="zeroclaw-${TARGET}.tar.gz"
|
||||
ASSET_URL="https://github.com/zeroclaw-labs/zeroclaw/releases/download/${TAG}/${ASSET_NAME}"
|
||||
TEMP_DIR="$(mktemp -d -t zeroclaw-termux-test-XXXXXX)"
|
||||
|
||||
cleanup() { rm -rf "$TEMP_DIR"; }
|
||||
trap cleanup EXIT
|
||||
|
||||
echo
|
||||
echo -e "${BOLD}Termux Release Validation — ${TAG}${RESET}"
|
||||
echo -e "${DIM}Target: ${TARGET}${RESET}"
|
||||
echo
|
||||
|
||||
# --- Test 1: Release tag exists ---
|
||||
info "Checking release tag ${TAG}"
|
||||
if gh release view "$TAG" >/dev/null 2>&1; then
|
||||
pass "Release ${TAG} exists"
|
||||
else
|
||||
fail "Release ${TAG} not found"
|
||||
echo -e "${RED}Release has not been published yet. Wait for the release workflow to complete.${RESET}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Test 2: Android asset is listed ---
|
||||
info "Checking for ${ASSET_NAME} in release assets"
|
||||
ASSETS=$(gh release view "$TAG" --json assets -q '.assets[].name')
|
||||
if echo "$ASSETS" | grep -q "$ASSET_NAME"; then
|
||||
pass "Asset ${ASSET_NAME} found in release"
|
||||
else
|
||||
fail "Asset ${ASSET_NAME} not found in release"
|
||||
echo "Available assets:"
|
||||
echo "$ASSETS" | sed 's/^/ /'
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Test 3: Download the asset ---
|
||||
info "Downloading ${ASSET_NAME}"
|
||||
if curl -fsSL "$ASSET_URL" -o "$TEMP_DIR/$ASSET_NAME"; then
|
||||
FILESIZE=$(wc -c < "$TEMP_DIR/$ASSET_NAME" | tr -d ' ')
|
||||
pass "Downloaded successfully (${FILESIZE} bytes)"
|
||||
else
|
||||
fail "Download failed from ${ASSET_URL}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Test 4: Archive integrity ---
|
||||
info "Verifying archive integrity"
|
||||
if tar -tzf "$TEMP_DIR/$ASSET_NAME" >/dev/null 2>&1; then
|
||||
pass "Archive is a valid gzip tar"
|
||||
else
|
||||
fail "Archive is corrupted or not a valid tar.gz"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Test 5: Contains zeroclaw binary ---
|
||||
info "Checking archive contents"
|
||||
CONTENTS=$(tar -tzf "$TEMP_DIR/$ASSET_NAME")
|
||||
if echo "$CONTENTS" | grep -q "^zeroclaw$"; then
|
||||
pass "Archive contains 'zeroclaw' binary"
|
||||
else
|
||||
fail "Archive does not contain 'zeroclaw' binary"
|
||||
echo "Contents:"
|
||||
echo "$CONTENTS" | sed 's/^/ /'
|
||||
fi
|
||||
|
||||
# --- Test 6: Extract and inspect binary ---
|
||||
info "Extracting and inspecting binary"
|
||||
tar -xzf "$TEMP_DIR/$ASSET_NAME" -C "$TEMP_DIR"
|
||||
BINARY="$TEMP_DIR/zeroclaw"
|
||||
|
||||
if [[ -f "$BINARY" ]]; then
|
||||
pass "Binary extracted"
|
||||
else
|
||||
fail "Binary not found after extraction"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# --- Test 7: ELF format and architecture ---
|
||||
info "Checking binary format"
|
||||
FILE_INFO=$(file "$BINARY")
|
||||
if echo "$FILE_INFO" | grep -q "ELF"; then
|
||||
pass "Binary is ELF format"
|
||||
else
|
||||
fail "Binary is not ELF format: $FILE_INFO"
|
||||
fi
|
||||
|
||||
if echo "$FILE_INFO" | grep -qi "aarch64\|ARM aarch64"; then
|
||||
pass "Binary targets aarch64 architecture"
|
||||
else
|
||||
fail "Binary does not target aarch64: $FILE_INFO"
|
||||
fi
|
||||
|
||||
if echo "$FILE_INFO" | grep -qi "android\|bionic"; then
|
||||
pass "Binary is linked for Android/Bionic"
|
||||
else
|
||||
# Android binaries may not always show "android" in file output,
|
||||
# check with readelf if available
|
||||
if command -v readelf >/dev/null 2>&1; then
|
||||
INTERP=$(readelf -l "$BINARY" 2>/dev/null | grep -o '/[^ ]*linker[^ ]*' || true)
|
||||
if echo "$INTERP" | grep -qi "android\|bionic"; then
|
||||
pass "Binary uses Android linker: $INTERP"
|
||||
else
|
||||
warn "Could not confirm Android linkage (interpreter: ${INTERP:-unknown})"
|
||||
warn "file output: $FILE_INFO"
|
||||
fi
|
||||
else
|
||||
warn "Could not confirm Android linkage (readelf not available)"
|
||||
warn "file output: $FILE_INFO"
|
||||
fi
|
||||
fi
|
||||
|
||||
# --- Test 8: Binary is stripped ---
|
||||
info "Checking binary optimization"
|
||||
if echo "$FILE_INFO" | grep -q "stripped"; then
|
||||
pass "Binary is stripped (release optimized)"
|
||||
else
|
||||
warn "Binary may not be stripped"
|
||||
fi
|
||||
|
||||
# --- Test 9: Binary is not dynamically linked to glibc ---
|
||||
info "Checking for glibc dependencies"
|
||||
if command -v readelf >/dev/null 2>&1; then
|
||||
NEEDED=$(readelf -d "$BINARY" 2>/dev/null | grep NEEDED || true)
|
||||
if echo "$NEEDED" | grep -qi "libc\.so\.\|libpthread\|libdl"; then
|
||||
# Check if it's glibc or bionic
|
||||
if echo "$NEEDED" | grep -qi "libc\.so\.6"; then
|
||||
fail "Binary links against glibc (libc.so.6) — will not work on Termux"
|
||||
else
|
||||
pass "Binary links against libc (likely Bionic)"
|
||||
fi
|
||||
else
|
||||
pass "No glibc dependencies detected"
|
||||
fi
|
||||
else
|
||||
warn "readelf not available — skipping dynamic library check"
|
||||
fi
|
||||
|
||||
# --- Test 10: SHA256 checksum verification ---
|
||||
info "Verifying SHA256 checksum"
|
||||
CHECKSUMS_URL="https://github.com/zeroclaw-labs/zeroclaw/releases/download/${TAG}/SHA256SUMS"
|
||||
if curl -fsSL "$CHECKSUMS_URL" -o "$TEMP_DIR/SHA256SUMS" 2>/dev/null; then
|
||||
EXPECTED=$(grep "$ASSET_NAME" "$TEMP_DIR/SHA256SUMS" | awk '{print $1}')
|
||||
if [[ -n "$EXPECTED" ]]; then
|
||||
if command -v sha256sum >/dev/null 2>&1; then
|
||||
ACTUAL=$(sha256sum "$TEMP_DIR/$ASSET_NAME" | awk '{print $1}')
|
||||
elif command -v shasum >/dev/null 2>&1; then
|
||||
ACTUAL=$(shasum -a 256 "$TEMP_DIR/$ASSET_NAME" | awk '{print $1}')
|
||||
else
|
||||
warn "No sha256sum or shasum available"
|
||||
ACTUAL=""
|
||||
fi
|
||||
|
||||
if [[ -n "$ACTUAL" && "$ACTUAL" == "$EXPECTED" ]]; then
|
||||
pass "SHA256 checksum matches"
|
||||
elif [[ -n "$ACTUAL" ]]; then
|
||||
fail "SHA256 mismatch: expected=$EXPECTED actual=$ACTUAL"
|
||||
fi
|
||||
else
|
||||
warn "No checksum entry for ${ASSET_NAME} in SHA256SUMS"
|
||||
fi
|
||||
else
|
||||
warn "Could not download SHA256SUMS"
|
||||
fi
|
||||
|
||||
# --- Test 11: install.sh Termux detection ---
|
||||
info "Validating install.sh Termux detection"
|
||||
INSTALL_SH="install.sh"
|
||||
if [[ ! -f "$INSTALL_SH" ]]; then
|
||||
INSTALL_SH="$(dirname "$0")/../install.sh"
|
||||
fi
|
||||
|
||||
if [[ -f "$INSTALL_SH" ]]; then
|
||||
if grep -q 'TERMUX_VERSION' "$INSTALL_SH"; then
|
||||
pass "install.sh checks TERMUX_VERSION"
|
||||
else
|
||||
fail "install.sh does not check TERMUX_VERSION"
|
||||
fi
|
||||
|
||||
if grep -q 'aarch64-linux-android' "$INSTALL_SH"; then
|
||||
pass "install.sh maps to aarch64-linux-android target"
|
||||
else
|
||||
fail "install.sh does not map to aarch64-linux-android"
|
||||
fi
|
||||
|
||||
# Simulate Termux detection (mock uname as Linux since we may run on macOS)
|
||||
detect_result=$(
|
||||
bash -c '
|
||||
TERMUX_VERSION="0.118"
|
||||
os="Linux"
|
||||
arch="aarch64"
|
||||
case "$os:$arch" in
|
||||
Linux:aarch64|Linux:arm64)
|
||||
if [[ -n "${TERMUX_VERSION:-}" || -d "/data/data/com.termux" ]]; then
|
||||
echo "aarch64-linux-android"
|
||||
else
|
||||
echo "aarch64-unknown-linux-gnu"
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
'
|
||||
)
|
||||
if [[ "$detect_result" == "aarch64-linux-android" ]]; then
|
||||
pass "Termux detection returns correct target (simulated)"
|
||||
else
|
||||
fail "Termux detection returned: $detect_result (expected aarch64-linux-android)"
|
||||
fi
|
||||
else
|
||||
warn "install.sh not found — skipping detection tests"
|
||||
fi
|
||||
|
||||
# --- Summary ---
|
||||
echo
|
||||
if [[ "$FAILURES" -eq 0 ]]; then
|
||||
echo -e "${GREEN}${BOLD}All tests passed!${RESET}"
|
||||
echo -e "${DIM}The Termux release artifact for ${TAG} is valid.${RESET}"
|
||||
else
|
||||
echo -e "${RED}${BOLD}${FAILURES} test(s) failed.${RESET}"
|
||||
exit 1
|
||||
fi
|
||||
+54
-2
@@ -195,6 +195,18 @@ const COMPACTION_MAX_SOURCE_CHARS: usize = 12_000;
|
||||
/// Max characters retained in stored compaction summary.
|
||||
const COMPACTION_MAX_SUMMARY_CHARS: usize = 2_000;
|
||||
|
||||
/// Estimate token count for a message history using ~4 chars/token heuristic.
|
||||
/// Includes a small overhead per message for role/framing tokens.
|
||||
fn estimate_history_tokens(history: &[ChatMessage]) -> usize {
|
||||
history
|
||||
.iter()
|
||||
.map(|m| {
|
||||
// ~4 chars per token + ~4 framing tokens per message (role, delimiters)
|
||||
m.content.len().div_ceil(4) + 4
|
||||
})
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Minimum interval between progress sends to avoid flooding the draft channel.
|
||||
pub(crate) const PROGRESS_MIN_INTERVAL_MS: u64 = 500;
|
||||
|
||||
@@ -288,6 +300,7 @@ async fn auto_compact_history(
|
||||
provider: &dyn Provider,
|
||||
model: &str,
|
||||
max_history: usize,
|
||||
max_context_tokens: usize,
|
||||
) -> Result<bool> {
|
||||
let has_system = history.first().map_or(false, |m| m.role == "system");
|
||||
let non_system_count = if has_system {
|
||||
@@ -296,7 +309,10 @@ async fn auto_compact_history(
|
||||
history.len()
|
||||
};
|
||||
|
||||
if non_system_count <= max_history {
|
||||
let estimated_tokens = estimate_history_tokens(history);
|
||||
|
||||
// Trigger compaction when either token budget OR message count is exceeded.
|
||||
if estimated_tokens <= max_context_tokens && non_system_count <= max_history {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -307,7 +323,16 @@ async fn auto_compact_history(
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let compact_end = start + compact_count;
|
||||
let mut compact_end = start + compact_count;
|
||||
|
||||
// Snap compact_end to a user-turn boundary so we don't split mid-conversation.
|
||||
while compact_end > start && history.get(compact_end).map_or(false, |m| m.role != "user") {
|
||||
compact_end -= 1;
|
||||
}
|
||||
if compact_end <= start {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let to_compact: Vec<ChatMessage> = history[start..compact_end].to_vec();
|
||||
let transcript = build_compaction_transcript(&to_compact);
|
||||
|
||||
@@ -3508,6 +3533,7 @@ pub async fn run(
|
||||
provider.as_ref(),
|
||||
model_name,
|
||||
config.agent.max_history_messages,
|
||||
config.agent.max_context_tokens,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -6449,4 +6475,30 @@ Let me check the result."#;
|
||||
let result = filter_tool_specs_for_turn(specs, &groups, "BROWSE the site");
|
||||
assert_eq!(result.len(), 1);
|
||||
}
|
||||
|
||||
// ── Token-based compaction tests ──────────────────────────
|
||||
|
||||
#[test]
|
||||
fn estimate_history_tokens_empty() {
|
||||
assert_eq!(super::estimate_history_tokens(&[]), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimate_history_tokens_single_message() {
|
||||
let history = vec![ChatMessage::user("hello world")]; // 11 chars
|
||||
let tokens = super::estimate_history_tokens(&history);
|
||||
// 11.div_ceil(4) + 4 = 3 + 4 = 7
|
||||
assert_eq!(tokens, 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimate_history_tokens_multiple_messages() {
|
||||
let history = vec![
|
||||
ChatMessage::system("You are helpful."), // 16 chars → 4 + 4 = 8
|
||||
ChatMessage::user("What is Rust?"), // 13 chars → 4 + 4 = 8
|
||||
ChatMessage::assistant("A language."), // 11 chars → 3 + 4 = 7
|
||||
];
|
||||
let tokens = super::estimate_history_tokens(&history);
|
||||
assert_eq!(tokens, 23);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -746,7 +746,7 @@ impl Channel for MatrixChannel {
|
||||
MessageType::Notice(content) => (content.body.clone(), None),
|
||||
MessageType::Image(content) => {
|
||||
let dl = media_info(&content.source, &content.body);
|
||||
(format!("[image: {}]", content.body), dl)
|
||||
(format!("[IMAGE:{}]", content.body), dl)
|
||||
}
|
||||
MessageType::File(content) => {
|
||||
let dl = media_info(&content.source, &content.body);
|
||||
|
||||
@@ -31,6 +31,7 @@ pub mod nextcloud_talk;
|
||||
#[cfg(feature = "channel-nostr")]
|
||||
pub mod nostr;
|
||||
pub mod qq;
|
||||
pub mod session_store;
|
||||
pub mod signal;
|
||||
pub mod slack;
|
||||
pub mod telegram;
|
||||
@@ -312,6 +313,7 @@ struct ChannelRuntimeContext {
|
||||
model_routes: Arc<Vec<crate::config::ModelRouteConfig>>,
|
||||
ack_reactions: bool,
|
||||
show_tool_calls: bool,
|
||||
session_store: Option<Arc<session_store::SessionStore>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -965,6 +967,13 @@ fn proactive_trim_turns(turns: &mut Vec<ChatMessage>, budget: usize) -> usize {
|
||||
}
|
||||
|
||||
fn append_sender_turn(ctx: &ChannelRuntimeContext, sender_key: &str, turn: ChatMessage) {
|
||||
// Persist to JSONL before adding to in-memory history.
|
||||
if let Some(ref store) = ctx.session_store {
|
||||
if let Err(e) = store.append(sender_key, &turn) {
|
||||
tracing::warn!("Failed to persist session turn: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
let mut histories = ctx
|
||||
.conversation_histories
|
||||
.lock()
|
||||
@@ -2186,6 +2195,29 @@ async fn process_channel_message(
|
||||
&history_key,
|
||||
ChatMessage::assistant(&history_response),
|
||||
);
|
||||
|
||||
// Fire-and-forget LLM-driven memory consolidation.
|
||||
if ctx.auto_save_memory && msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
|
||||
let provider = Arc::clone(&ctx.provider);
|
||||
let model = ctx.model.to_string();
|
||||
let memory = Arc::clone(&ctx.memory);
|
||||
let user_msg = msg.content.clone();
|
||||
let assistant_resp = delivered_response.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = crate::memory::consolidation::consolidate_turn(
|
||||
provider.as_ref(),
|
||||
&model,
|
||||
memory.as_ref(),
|
||||
&user_msg,
|
||||
&assistant_resp,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::debug!("Memory consolidation skipped: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
println!(
|
||||
" 🤖 Reply ({}ms): {}",
|
||||
started_at.elapsed().as_millis(),
|
||||
@@ -3805,8 +3837,42 @@ pub async fn start_channels(config: Config) -> Result<()> {
|
||||
model_routes: Arc::new(config.model_routes.clone()),
|
||||
ack_reactions: config.channels_config.ack_reactions,
|
||||
show_tool_calls: config.channels_config.show_tool_calls,
|
||||
session_store: if config.channels_config.session_persistence {
|
||||
match session_store::SessionStore::new(&config.workspace_dir) {
|
||||
Ok(store) => {
|
||||
tracing::info!("📂 Session persistence enabled");
|
||||
Some(Arc::new(store))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Session persistence disabled: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
},
|
||||
});
|
||||
|
||||
// Hydrate in-memory conversation histories from persisted JSONL session files.
|
||||
if let Some(ref store) = runtime_ctx.session_store {
|
||||
let mut hydrated = 0usize;
|
||||
let mut histories = runtime_ctx
|
||||
.conversation_histories
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner());
|
||||
for key in store.list_sessions() {
|
||||
let msgs = store.load(&key);
|
||||
if !msgs.is_empty() {
|
||||
hydrated += 1;
|
||||
histories.insert(key, msgs);
|
||||
}
|
||||
}
|
||||
drop(histories);
|
||||
if hydrated > 0 {
|
||||
tracing::info!("📂 Restored {hydrated} session(s) from disk");
|
||||
}
|
||||
}
|
||||
|
||||
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
|
||||
|
||||
// Wait for all channel tasks
|
||||
@@ -4072,6 +4138,7 @@ mod tests {
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
};
|
||||
|
||||
assert!(compact_sender_history(&ctx, &sender));
|
||||
@@ -4175,6 +4242,7 @@ mod tests {
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
};
|
||||
|
||||
append_sender_turn(&ctx, &sender, ChatMessage::user("hello"));
|
||||
@@ -4234,6 +4302,7 @@ mod tests {
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
};
|
||||
|
||||
assert!(rollback_orphan_user_turn(&ctx, &sender, "pending"));
|
||||
@@ -4751,6 +4820,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -4818,6 +4888,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -4899,6 +4970,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -4965,6 +5037,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5041,6 +5114,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5137,6 +5211,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5215,6 +5290,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5308,6 +5384,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5386,6 +5463,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5454,6 +5532,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -5633,6 +5712,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
|
||||
@@ -5720,6 +5800,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
|
||||
@@ -5817,6 +5898,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
},
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
multimodal: crate::config::MultimodalConfig::default(),
|
||||
hooks: None,
|
||||
non_cli_excluded_tools: Arc::new(Vec::new()),
|
||||
@@ -5921,6 +6003,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
|
||||
@@ -6002,6 +6085,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6068,6 +6152,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6692,6 +6777,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6784,6 +6870,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -6876,6 +6963,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
@@ -7432,6 +7520,7 @@ This is an example JSON object for profile settings."#;
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
// Simulate a photo attachment message with [IMAGE:] marker.
|
||||
@@ -7505,6 +7594,7 @@ This is an example JSON object for profile settings."#;
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
});
|
||||
|
||||
process_channel_message(
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
//! JSONL-based session persistence for channel conversations.
|
||||
//!
|
||||
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
|
||||
//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
|
||||
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
|
||||
//! are loaded from disk to restore conversation context.
|
||||
|
||||
use crate::providers::traits::ChatMessage;
|
||||
use std::io::{BufRead, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Append-only JSONL session store for channel conversations.
|
||||
pub struct SessionStore {
|
||||
sessions_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl SessionStore {
|
||||
/// Create a new session store, ensuring the sessions directory exists.
|
||||
pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
|
||||
let sessions_dir = workspace_dir.join("sessions");
|
||||
std::fs::create_dir_all(&sessions_dir)?;
|
||||
Ok(Self { sessions_dir })
|
||||
}
|
||||
|
||||
/// Compute the file path for a session key, sanitizing for filesystem safety.
|
||||
fn session_path(&self, session_key: &str) -> PathBuf {
|
||||
let safe_key: String = session_key
|
||||
.chars()
|
||||
.map(|c| {
|
||||
if c.is_alphanumeric() || c == '_' || c == '-' {
|
||||
c
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
self.sessions_dir.join(format!("{safe_key}.jsonl"))
|
||||
}
|
||||
|
||||
/// Load all messages for a session from its JSONL file.
|
||||
/// Returns an empty vec if the file does not exist or is unreadable.
|
||||
pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
|
||||
let path = self.session_path(session_key);
|
||||
let file = match std::fs::File::open(&path) {
|
||||
Ok(f) => f,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
let reader = std::io::BufReader::new(file);
|
||||
let mut messages = Vec::new();
|
||||
|
||||
for line in reader.lines() {
|
||||
let Ok(line) = line else { continue };
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
|
||||
messages.push(msg);
|
||||
}
|
||||
}
|
||||
|
||||
messages
|
||||
}
|
||||
|
||||
/// Append a single message to the session JSONL file.
|
||||
pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
|
||||
let path = self.session_path(session_key);
|
||||
let mut file = std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&path)?;
|
||||
|
||||
let json = serde_json::to_string(message)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
writeln!(file, "{json}")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List all session keys that have files on disk.
|
||||
pub fn list_sessions(&self) -> Vec<String> {
|
||||
let entries = match std::fs::read_dir(&self.sessions_dir) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
entries
|
||||
.filter_map(|entry| {
|
||||
let entry = entry.ok()?;
|
||||
let name = entry.file_name().into_string().ok()?;
|
||||
name.strip_suffix(".jsonl").map(String::from)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
fn round_trip_append_and_load() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
store
|
||||
.append("telegram_user123", &ChatMessage::user("hello"))
|
||||
.unwrap();
|
||||
store
|
||||
.append("telegram_user123", &ChatMessage::assistant("hi there"))
|
||||
.unwrap();
|
||||
|
||||
let messages = store.load("telegram_user123");
|
||||
assert_eq!(messages.len(), 2);
|
||||
assert_eq!(messages[0].role, "user");
|
||||
assert_eq!(messages[0].content, "hello");
|
||||
assert_eq!(messages[1].role, "assistant");
|
||||
assert_eq!(messages[1].content, "hi there");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_nonexistent_session_returns_empty() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
let messages = store.load("nonexistent");
|
||||
assert!(messages.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn key_sanitization() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
// Keys with special chars should be sanitized
|
||||
store
|
||||
.append("slack/thread:123/user", &ChatMessage::user("test"))
|
||||
.unwrap();
|
||||
|
||||
let messages = store.load("slack/thread:123/user");
|
||||
assert_eq!(messages.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list_sessions_returns_keys() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
|
||||
store
|
||||
.append("telegram_alice", &ChatMessage::user("hi"))
|
||||
.unwrap();
|
||||
store
|
||||
.append("discord_bob", &ChatMessage::user("hey"))
|
||||
.unwrap();
|
||||
|
||||
let mut sessions = store.list_sessions();
|
||||
sessions.sort();
|
||||
assert_eq!(sessions.len(), 2);
|
||||
assert!(sessions.contains(&"discord_bob".to_string()));
|
||||
assert!(sessions.contains(&"telegram_alice".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn append_is_truly_append_only() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
let key = "test_session";
|
||||
|
||||
store.append(key, &ChatMessage::user("msg1")).unwrap();
|
||||
store.append(key, &ChatMessage::user("msg2")).unwrap();
|
||||
|
||||
// Read raw file to verify append-only format
|
||||
let path = store.session_path(key);
|
||||
let content = std::fs::read_to_string(&path).unwrap();
|
||||
let lines: Vec<&str> = content.trim().lines().collect();
|
||||
assert_eq!(lines.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handles_corrupt_lines_gracefully() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let store = SessionStore::new(tmp.path()).unwrap();
|
||||
let key = "corrupt_test";
|
||||
|
||||
// Write valid message + corrupt line + valid message
|
||||
let path = store.session_path(key);
|
||||
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
|
||||
let mut file = std::fs::File::create(&path).unwrap();
|
||||
writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
|
||||
writeln!(file, "this is not valid json").unwrap();
|
||||
writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
|
||||
|
||||
let messages = store.load(key);
|
||||
assert_eq!(messages.len(), 2);
|
||||
assert_eq!(messages[0].content, "hello");
|
||||
assert_eq!(messages[1].content, "world");
|
||||
}
|
||||
}
|
||||
+38
-1
@@ -791,6 +791,11 @@ pub struct AgentConfig {
|
||||
/// Maximum conversation history messages retained per session. Default: `50`.
|
||||
#[serde(default = "default_agent_max_history_messages")]
|
||||
pub max_history_messages: usize,
|
||||
/// Maximum estimated tokens for conversation history before compaction triggers.
|
||||
/// Uses ~4 chars/token heuristic. When this threshold is exceeded, older messages
|
||||
/// are summarized to preserve context while staying within budget. Default: `32000`.
|
||||
#[serde(default = "default_agent_max_context_tokens")]
|
||||
pub max_context_tokens: usize,
|
||||
/// Enable parallel tool execution within a single iteration. Default: `false`.
|
||||
#[serde(default)]
|
||||
pub parallel_tools: bool,
|
||||
@@ -817,6 +822,10 @@ fn default_agent_max_history_messages() -> usize {
|
||||
50
|
||||
}
|
||||
|
||||
fn default_agent_max_context_tokens() -> usize {
|
||||
32_000
|
||||
}
|
||||
|
||||
fn default_agent_tool_dispatcher() -> String {
|
||||
"auto".into()
|
||||
}
|
||||
@@ -827,6 +836,7 @@ impl Default for AgentConfig {
|
||||
compact_context: false,
|
||||
max_tool_iterations: default_agent_max_tool_iterations(),
|
||||
max_history_messages: default_agent_max_history_messages(),
|
||||
max_context_tokens: default_agent_max_context_tokens(),
|
||||
parallel_tools: false,
|
||||
tool_dispatcher: default_agent_tool_dispatcher(),
|
||||
tool_call_dedup_exempt: Vec::new(),
|
||||
@@ -1413,6 +1423,10 @@ pub struct HttpRequestConfig {
|
||||
/// Request timeout in seconds (default: 30)
|
||||
#[serde(default = "default_http_timeout_secs")]
|
||||
pub timeout_secs: u64,
|
||||
/// Allow requests to private/LAN hosts (RFC 1918, loopback, link-local, .local).
|
||||
/// Default: false (deny private hosts for SSRF protection).
|
||||
#[serde(default)]
|
||||
pub allow_private_hosts: bool,
|
||||
}
|
||||
|
||||
impl Default for HttpRequestConfig {
|
||||
@@ -1422,6 +1436,7 @@ impl Default for HttpRequestConfig {
|
||||
allowed_domains: vec![],
|
||||
max_response_size: default_http_max_response_size(),
|
||||
timeout_secs: default_http_timeout_secs(),
|
||||
allow_private_hosts: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2895,22 +2910,34 @@ pub struct HeartbeatConfig {
|
||||
pub enabled: bool,
|
||||
/// Interval in minutes between heartbeat pings. Default: `30`.
|
||||
pub interval_minutes: u32,
|
||||
/// Enable two-phase heartbeat: Phase 1 asks LLM whether to run, Phase 2
|
||||
/// executes only when the LLM decides there is work to do. Saves API cost
|
||||
/// during quiet periods. Default: `true`.
|
||||
#[serde(default = "default_two_phase")]
|
||||
pub two_phase: bool,
|
||||
/// Optional fallback task text when `HEARTBEAT.md` has no task entries.
|
||||
#[serde(default)]
|
||||
pub message: Option<String>,
|
||||
/// Optional delivery channel for heartbeat output (for example: `telegram`).
|
||||
/// When omitted, auto-selects the first configured channel.
|
||||
#[serde(default, alias = "channel")]
|
||||
pub target: Option<String>,
|
||||
/// Optional delivery recipient/chat identifier (required when `target` is set).
|
||||
/// Optional delivery recipient/chat identifier (required when `target` is
|
||||
/// explicitly set).
|
||||
#[serde(default, alias = "recipient")]
|
||||
pub to: Option<String>,
|
||||
}
|
||||
|
||||
fn default_two_phase() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl Default for HeartbeatConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: false,
|
||||
interval_minutes: 30,
|
||||
two_phase: true,
|
||||
message: None,
|
||||
target: None,
|
||||
to: None,
|
||||
@@ -3040,6 +3067,7 @@ impl<T: ChannelConfig> crate::config::traits::ConfigHandle for ConfigWrapper<T>
|
||||
///
|
||||
/// Each channel sub-section (e.g. `telegram`, `discord`) is optional;
|
||||
/// setting it to `Some(...)` enables that channel.
|
||||
#[allow(clippy::struct_excessive_bools)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct ChannelsConfig {
|
||||
/// Enable the CLI interactive channel. Default: `true`.
|
||||
@@ -3102,6 +3130,10 @@ pub struct ChannelsConfig {
|
||||
/// not forwarded as individual channel messages. Default: `true`.
|
||||
#[serde(default = "default_true")]
|
||||
pub show_tool_calls: bool,
|
||||
/// Persist channel conversation history to JSONL files so sessions survive
|
||||
/// daemon restarts. Files are stored in `{workspace}/sessions/`. Default: `true`.
|
||||
#[serde(default = "default_true")]
|
||||
pub session_persistence: bool,
|
||||
}
|
||||
|
||||
impl ChannelsConfig {
|
||||
@@ -3236,6 +3268,7 @@ impl Default for ChannelsConfig {
|
||||
message_timeout_secs: default_channel_message_timeout_secs(),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6217,6 +6250,7 @@ default_temperature = 0.7
|
||||
heartbeat: HeartbeatConfig {
|
||||
enabled: true,
|
||||
interval_minutes: 15,
|
||||
two_phase: true,
|
||||
message: Some("Check London time".into()),
|
||||
target: Some("telegram".into()),
|
||||
to: Some("123456".into()),
|
||||
@@ -6256,6 +6290,7 @@ default_temperature = 0.7
|
||||
message_timeout_secs: 300,
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
},
|
||||
memory: MemoryConfig::default(),
|
||||
storage: StorageConfig::default(),
|
||||
@@ -6970,6 +7005,7 @@ allowed_users = ["@ops:matrix.org"]
|
||||
message_timeout_secs: 300,
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
};
|
||||
let toml_str = toml::to_string_pretty(&c).unwrap();
|
||||
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
|
||||
@@ -7197,6 +7233,7 @@ channel_id = "C123"
|
||||
message_timeout_secs: 300,
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_persistence: true,
|
||||
};
|
||||
let toml_str = toml::to_string_pretty(&c).unwrap();
|
||||
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
|
||||
|
||||
+172
-21
@@ -152,44 +152,122 @@ pub fn handle_command(command: crate::CronCommands, config: &Config) -> Result<(
|
||||
crate::CronCommands::Add {
|
||||
expression,
|
||||
tz,
|
||||
agent,
|
||||
command,
|
||||
} => {
|
||||
let schedule = Schedule::Cron {
|
||||
expr: expression,
|
||||
tz,
|
||||
};
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added cron job {}", job.id);
|
||||
println!(" Expr: {}", job.expression);
|
||||
println!(" Next: {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
if agent {
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
println!("✅ Added agent cron job {}", job.id);
|
||||
println!(" Expr : {}", job.expression);
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added cron job {}", job.id);
|
||||
println!(" Expr: {}", job.expression);
|
||||
println!(" Next: {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::AddAt { at, command } => {
|
||||
crate::CronCommands::AddAt { at, agent, command } => {
|
||||
let at = chrono::DateTime::parse_from_rfc3339(&at)
|
||||
.map_err(|e| anyhow::anyhow!("Invalid RFC3339 timestamp for --at: {e}"))?
|
||||
.with_timezone(&chrono::Utc);
|
||||
let schedule = Schedule::At { at };
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
if agent {
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)?;
|
||||
println!("✅ Added one-shot agent cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::AddEvery { every_ms, command } => {
|
||||
crate::CronCommands::AddEvery {
|
||||
every_ms,
|
||||
agent,
|
||||
command,
|
||||
} => {
|
||||
let schedule = Schedule::Every { every_ms };
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added interval cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
if agent {
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
println!("✅ Added interval agent cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt : {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_shell_job(config, None, schedule, &command)?;
|
||||
println!("✅ Added interval cron job {}", job.id);
|
||||
println!(" Every(ms): {every_ms}");
|
||||
println!(" Next : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::Once { delay, command } => {
|
||||
let job = add_once(config, &delay, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
crate::CronCommands::Once {
|
||||
delay,
|
||||
agent,
|
||||
command,
|
||||
} => {
|
||||
if agent {
|
||||
let duration = parse_delay(&delay)?;
|
||||
let at = chrono::Utc::now() + duration;
|
||||
let schedule = Schedule::At { at };
|
||||
let job = add_agent_job(
|
||||
config,
|
||||
None,
|
||||
schedule,
|
||||
&command,
|
||||
SessionTarget::Isolated,
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)?;
|
||||
println!("✅ Added one-shot agent cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Prompt: {}", job.prompt.as_deref().unwrap_or_default());
|
||||
} else {
|
||||
let job = add_once(config, &delay, &command)?;
|
||||
println!("✅ Added one-shot cron job {}", job.id);
|
||||
println!(" At : {}", job.next_run.to_rfc3339());
|
||||
println!(" Cmd : {}", job.command);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
crate::CronCommands::Update {
|
||||
@@ -686,4 +764,77 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("blocked by security policy"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_agent_flag_creates_agent_job() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
command: "Check server health: disk space, memory, CPU load".into(),
|
||||
},
|
||||
&config,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].job_type, JobType::Agent);
|
||||
assert_eq!(
|
||||
jobs[0].prompt.as_deref(),
|
||||
Some("Check server health: disk space, memory, CPU load")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_agent_flag_bypasses_shell_security_validation() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let mut config = test_config(&tmp);
|
||||
config.autonomy.allowed_commands = vec!["echo".into()];
|
||||
config.autonomy.level = crate::security::AutonomyLevel::Supervised;
|
||||
|
||||
// Without --agent, a natural language string would be blocked by shell
|
||||
// security policy. With --agent, it routes to agent job and skips
|
||||
// shell validation entirely.
|
||||
let result = handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/15 * * * *".into(),
|
||||
tz: None,
|
||||
agent: true,
|
||||
command: "Check server health: disk space, memory, CPU load".into(),
|
||||
},
|
||||
&config,
|
||||
);
|
||||
assert!(result.is_ok());
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].job_type, JobType::Agent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cli_without_agent_flag_defaults_to_shell_job() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let config = test_config(&tmp);
|
||||
|
||||
handle_command(
|
||||
crate::CronCommands::Add {
|
||||
expression: "*/5 * * * *".into(),
|
||||
tz: None,
|
||||
agent: false,
|
||||
command: "echo ok".into(),
|
||||
},
|
||||
&config,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let jobs = list_jobs(&config).unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].job_type, JobType::Shell);
|
||||
assert_eq!(jobs[0].command, "echo ok");
|
||||
}
|
||||
}
|
||||
|
||||
+140
-58
@@ -203,14 +203,17 @@ where
|
||||
}
|
||||
|
||||
async fn run_heartbeat_worker(config: Config) -> Result<()> {
|
||||
use crate::heartbeat::engine::HeartbeatEngine;
|
||||
|
||||
let observer: std::sync::Arc<dyn crate::observability::Observer> =
|
||||
std::sync::Arc::from(crate::observability::create_observer(&config.observability));
|
||||
let engine = crate::heartbeat::engine::HeartbeatEngine::new(
|
||||
let engine = HeartbeatEngine::new(
|
||||
config.heartbeat.clone(),
|
||||
config.workspace_dir.clone(),
|
||||
observer,
|
||||
);
|
||||
let delivery = heartbeat_delivery_target(&config)?;
|
||||
let delivery = resolve_heartbeat_delivery(&config)?;
|
||||
let two_phase = config.heartbeat.two_phase;
|
||||
|
||||
let interval_mins = config.heartbeat.interval_minutes.max(5);
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60));
|
||||
@@ -218,14 +221,71 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let file_tasks = engine.collect_tasks().await?;
|
||||
let tasks = heartbeat_tasks_for_tick(file_tasks, config.heartbeat.message.as_deref());
|
||||
// Collect runnable tasks (active only, sorted by priority)
|
||||
let mut tasks = engine.collect_runnable_tasks().await?;
|
||||
if tasks.is_empty() {
|
||||
continue;
|
||||
// Try fallback message
|
||||
if let Some(fallback) = config
|
||||
.heartbeat
|
||||
.message
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|m| !m.is_empty())
|
||||
{
|
||||
tasks.push(crate::heartbeat::engine::HeartbeatTask {
|
||||
text: fallback.to_string(),
|
||||
priority: crate::heartbeat::engine::TaskPriority::Medium,
|
||||
status: crate::heartbeat::engine::TaskStatus::Active,
|
||||
});
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
for task in tasks {
|
||||
let prompt = format!("[Heartbeat Task] {task}");
|
||||
// ── Phase 1: LLM decision (two-phase mode) ──────────────
|
||||
let tasks_to_run = if two_phase {
|
||||
let decision_prompt = HeartbeatEngine::build_decision_prompt(&tasks);
|
||||
match crate::agent::run(
|
||||
config.clone(),
|
||||
Some(decision_prompt),
|
||||
None,
|
||||
None,
|
||||
0.0, // Low temperature for deterministic decision
|
||||
vec![],
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
|
||||
if indices.is_empty() {
|
||||
tracing::info!("💓 Heartbeat Phase 1: skip (nothing to do)");
|
||||
crate::health::mark_component_ok("heartbeat");
|
||||
continue;
|
||||
}
|
||||
tracing::info!(
|
||||
"💓 Heartbeat Phase 1: run {} of {} tasks",
|
||||
indices.len(),
|
||||
tasks.len()
|
||||
);
|
||||
indices
|
||||
.into_iter()
|
||||
.filter_map(|i| tasks.get(i).cloned())
|
||||
.collect()
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("💓 Heartbeat Phase 1 failed, running all tasks: {e}");
|
||||
tasks
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tasks
|
||||
};
|
||||
|
||||
// ── Phase 2: Execute selected tasks ─────────────────────
|
||||
for task in &tasks_to_run {
|
||||
let prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
|
||||
let temp = config.default_temperature;
|
||||
match crate::agent::run(
|
||||
config.clone(),
|
||||
@@ -242,7 +302,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
|
||||
Ok(output) => {
|
||||
crate::health::mark_component_ok("heartbeat");
|
||||
let announcement = if output.trim().is_empty() {
|
||||
"heartbeat task executed".to_string()
|
||||
format!("💓 heartbeat task completed: {}", task.text)
|
||||
} else {
|
||||
output
|
||||
};
|
||||
@@ -272,22 +332,8 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
fn heartbeat_tasks_for_tick(
|
||||
file_tasks: Vec<String>,
|
||||
fallback_message: Option<&str>,
|
||||
) -> Vec<String> {
|
||||
if !file_tasks.is_empty() {
|
||||
return file_tasks;
|
||||
}
|
||||
|
||||
fallback_message
|
||||
.map(str::trim)
|
||||
.filter(|message| !message.is_empty())
|
||||
.map(|message| vec![message.to_string()])
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn heartbeat_delivery_target(config: &Config) -> Result<Option<(String, String)>> {
|
||||
/// Resolve delivery target: explicit config > auto-detect first configured channel.
|
||||
fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
|
||||
let channel = config
|
||||
.heartbeat
|
||||
.target
|
||||
@@ -302,16 +348,45 @@ fn heartbeat_delivery_target(config: &Config) -> Result<Option<(String, String)>
|
||||
.filter(|value| !value.is_empty());
|
||||
|
||||
match (channel, target) {
|
||||
(None, None) => Ok(None),
|
||||
(Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
|
||||
(None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
|
||||
// Both explicitly set — validate and use.
|
||||
(Some(channel), Some(target)) => {
|
||||
validate_heartbeat_channel_config(config, channel)?;
|
||||
Ok(Some((channel.to_string(), target.to_string())))
|
||||
}
|
||||
// Only one set — error.
|
||||
(Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
|
||||
(None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
|
||||
// Neither set — try auto-detect the first configured channel.
|
||||
(None, None) => Ok(auto_detect_heartbeat_channel(config)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Auto-detect the best channel for heartbeat delivery by checking which
|
||||
/// channels are configured. Returns the first match in priority order.
|
||||
fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
|
||||
// Priority order: telegram > discord > slack > mattermost
|
||||
if let Some(tg) = &config.channels_config.telegram {
|
||||
// Use the first allowed_user as target, or fall back to empty (broadcast)
|
||||
let target = tg.allowed_users.first().cloned().unwrap_or_default();
|
||||
if !target.is_empty() {
|
||||
return Some(("telegram".to_string(), target));
|
||||
}
|
||||
}
|
||||
if config.channels_config.discord.is_some() {
|
||||
// Discord requires explicit target — can't auto-detect
|
||||
return None;
|
||||
}
|
||||
if config.channels_config.slack.is_some() {
|
||||
// Slack requires explicit target
|
||||
return None;
|
||||
}
|
||||
if config.channels_config.mattermost.is_some() {
|
||||
// Mattermost requires explicit target
|
||||
return None;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
|
||||
match channel.to_ascii_lowercase().as_str() {
|
||||
"telegram" => {
|
||||
@@ -487,75 +562,56 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_tasks_use_file_tasks_when_available() {
|
||||
let tasks =
|
||||
heartbeat_tasks_for_tick(vec!["From file".to_string()], Some("Fallback from config"));
|
||||
assert_eq!(tasks, vec!["From file".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_tasks_fall_back_to_config_message() {
|
||||
let tasks = heartbeat_tasks_for_tick(vec![], Some(" check london time "));
|
||||
assert_eq!(tasks, vec!["check london time".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_tasks_ignore_empty_fallback_message() {
|
||||
let tasks = heartbeat_tasks_for_tick(vec![], Some(" "));
|
||||
assert!(tasks.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_delivery_target_none_when_unset() {
|
||||
fn resolve_delivery_none_when_unset() {
|
||||
let config = Config::default();
|
||||
let target = heartbeat_delivery_target(&config).unwrap();
|
||||
let target = resolve_heartbeat_delivery(&config).unwrap();
|
||||
assert!(target.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_delivery_target_requires_to_field() {
|
||||
fn resolve_delivery_requires_to_field() {
|
||||
let mut config = Config::default();
|
||||
config.heartbeat.target = Some("telegram".into());
|
||||
let err = heartbeat_delivery_target(&config).unwrap_err();
|
||||
let err = resolve_heartbeat_delivery(&config).unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("heartbeat.to is required when heartbeat.target is set"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_delivery_target_requires_target_field() {
|
||||
fn resolve_delivery_requires_target_field() {
|
||||
let mut config = Config::default();
|
||||
config.heartbeat.to = Some("123456".into());
|
||||
let err = heartbeat_delivery_target(&config).unwrap_err();
|
||||
let err = resolve_heartbeat_delivery(&config).unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("heartbeat.target is required when heartbeat.to is set"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_delivery_target_rejects_unsupported_channel() {
|
||||
fn resolve_delivery_rejects_unsupported_channel() {
|
||||
let mut config = Config::default();
|
||||
config.heartbeat.target = Some("email".into());
|
||||
config.heartbeat.to = Some("ops@example.com".into());
|
||||
let err = heartbeat_delivery_target(&config).unwrap_err();
|
||||
let err = resolve_heartbeat_delivery(&config).unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("unsupported heartbeat.target channel"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_delivery_target_requires_channel_configuration() {
|
||||
fn resolve_delivery_requires_channel_configuration() {
|
||||
let mut config = Config::default();
|
||||
config.heartbeat.target = Some("telegram".into());
|
||||
config.heartbeat.to = Some("123456".into());
|
||||
let err = heartbeat_delivery_target(&config).unwrap_err();
|
||||
let err = resolve_heartbeat_delivery(&config).unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("channels_config.telegram is not configured"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn heartbeat_delivery_target_accepts_telegram_configuration() {
|
||||
fn resolve_delivery_accepts_telegram_configuration() {
|
||||
let mut config = Config::default();
|
||||
config.heartbeat.target = Some("telegram".into());
|
||||
config.heartbeat.to = Some("123456".into());
|
||||
@@ -568,7 +624,33 @@ mod tests {
|
||||
mention_only: false,
|
||||
});
|
||||
|
||||
let target = heartbeat_delivery_target(&config).unwrap();
|
||||
let target = resolve_heartbeat_delivery(&config).unwrap();
|
||||
assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auto_detect_telegram_when_configured() {
|
||||
let mut config = Config::default();
|
||||
config.channels_config.telegram = Some(crate::config::TelegramConfig {
|
||||
bot_token: "bot-token".into(),
|
||||
allowed_users: vec!["user123".into()],
|
||||
stream_mode: crate::config::StreamMode::default(),
|
||||
draft_update_interval_ms: 1000,
|
||||
interrupt_on_new_message: false,
|
||||
mention_only: false,
|
||||
});
|
||||
|
||||
let target = resolve_heartbeat_delivery(&config).unwrap();
|
||||
assert_eq!(
|
||||
target,
|
||||
Some(("telegram".to_string(), "user123".to_string()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auto_detect_none_when_no_channels() {
|
||||
let config = Config::default();
|
||||
let target = auto_detect_heartbeat_channel(&config);
|
||||
assert!(target.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
+399
-27
@@ -1,11 +1,75 @@
|
||||
use crate::config::HeartbeatConfig;
|
||||
use crate::observability::{Observer, ObserverEvent};
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::{self, Duration};
|
||||
use tracing::{info, warn};
|
||||
|
||||
// ── Structured task types ────────────────────────────────────────
|
||||
|
||||
/// Priority level for a heartbeat task.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum TaskPriority {
|
||||
Low,
|
||||
Medium,
|
||||
High,
|
||||
}
|
||||
|
||||
impl fmt::Display for TaskPriority {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Low => write!(f, "low"),
|
||||
Self::Medium => write!(f, "medium"),
|
||||
Self::High => write!(f, "high"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Status of a heartbeat task.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum TaskStatus {
|
||||
Active,
|
||||
Paused,
|
||||
Completed,
|
||||
}
|
||||
|
||||
impl fmt::Display for TaskStatus {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Active => write!(f, "active"),
|
||||
Self::Paused => write!(f, "paused"),
|
||||
Self::Completed => write!(f, "completed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A structured heartbeat task with priority and status metadata.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HeartbeatTask {
|
||||
pub text: String,
|
||||
pub priority: TaskPriority,
|
||||
pub status: TaskStatus,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
pub fn is_runnable(&self) -> bool {
|
||||
self.status == TaskStatus::Active
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for HeartbeatTask {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "[{}] {}", self.priority, self.text)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Engine ───────────────────────────────────────────────────────
|
||||
|
||||
/// Heartbeat engine — reads HEARTBEAT.md and executes tasks periodically
|
||||
pub struct HeartbeatEngine {
|
||||
config: HeartbeatConfig,
|
||||
@@ -64,8 +128,8 @@ impl HeartbeatEngine {
|
||||
Ok(self.collect_tasks().await?.len())
|
||||
}
|
||||
|
||||
/// Read HEARTBEAT.md and return all parsed tasks.
|
||||
pub async fn collect_tasks(&self) -> Result<Vec<String>> {
|
||||
/// Read HEARTBEAT.md and return all parsed structured tasks.
|
||||
pub async fn collect_tasks(&self) -> Result<Vec<HeartbeatTask>> {
|
||||
let heartbeat_path = self.workspace_dir.join("HEARTBEAT.md");
|
||||
if !heartbeat_path.exists() {
|
||||
return Ok(Vec::new());
|
||||
@@ -74,13 +138,145 @@ impl HeartbeatEngine {
|
||||
Ok(Self::parse_tasks(&content))
|
||||
}
|
||||
|
||||
/// Parse tasks from HEARTBEAT.md (lines starting with `- `)
|
||||
fn parse_tasks(content: &str) -> Vec<String> {
|
||||
/// Collect only runnable (active) tasks, sorted by priority (high first).
|
||||
pub async fn collect_runnable_tasks(&self) -> Result<Vec<HeartbeatTask>> {
|
||||
let mut tasks: Vec<HeartbeatTask> = self
|
||||
.collect_tasks()
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(HeartbeatTask::is_runnable)
|
||||
.collect();
|
||||
// Sort by priority descending (High > Medium > Low)
|
||||
tasks.sort_by(|a, b| b.priority.cmp(&a.priority));
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
/// Parse tasks from HEARTBEAT.md with structured metadata support.
|
||||
///
|
||||
/// Supports both legacy flat format and new structured format:
|
||||
///
|
||||
/// Legacy:
|
||||
/// `- Check email` → medium priority, active status
|
||||
///
|
||||
/// Structured:
|
||||
/// `- [high] Check email` → high priority, active
|
||||
/// `- [low|paused] Review old PRs` → low priority, paused
|
||||
/// `- [completed] Old task` → medium priority, completed
|
||||
fn parse_tasks(content: &str) -> Vec<HeartbeatTask> {
|
||||
content
|
||||
.lines()
|
||||
.filter_map(|line| {
|
||||
let trimmed = line.trim();
|
||||
trimmed.strip_prefix("- ").map(ToString::to_string)
|
||||
let text = trimmed.strip_prefix("- ")?;
|
||||
if text.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(Self::parse_task_line(text))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Parse a single task line into a structured `HeartbeatTask`.
|
||||
///
|
||||
/// Format: `[priority|status] task text` or just `task text`.
|
||||
fn parse_task_line(text: &str) -> HeartbeatTask {
|
||||
if let Some(rest) = text.strip_prefix('[') {
|
||||
if let Some((meta, task_text)) = rest.split_once(']') {
|
||||
let task_text = task_text.trim();
|
||||
if !task_text.is_empty() {
|
||||
let (priority, status) = Self::parse_meta(meta);
|
||||
return HeartbeatTask {
|
||||
text: task_text.to_string(),
|
||||
priority,
|
||||
status,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
// No metadata — default to medium/active
|
||||
HeartbeatTask {
|
||||
text: text.to_string(),
|
||||
priority: TaskPriority::Medium,
|
||||
status: TaskStatus::Active,
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse metadata tags like `high`, `low|paused`, `completed`.
|
||||
fn parse_meta(meta: &str) -> (TaskPriority, TaskStatus) {
|
||||
let mut priority = TaskPriority::Medium;
|
||||
let mut status = TaskStatus::Active;
|
||||
|
||||
for part in meta.split('|') {
|
||||
match part.trim().to_ascii_lowercase().as_str() {
|
||||
"high" => priority = TaskPriority::High,
|
||||
"medium" | "med" => priority = TaskPriority::Medium,
|
||||
"low" => priority = TaskPriority::Low,
|
||||
"active" => status = TaskStatus::Active,
|
||||
"paused" | "pause" => status = TaskStatus::Paused,
|
||||
"completed" | "complete" | "done" => status = TaskStatus::Completed,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
(priority, status)
|
||||
}
|
||||
|
||||
/// Build the Phase 1 LLM decision prompt for two-phase heartbeat.
|
||||
pub fn build_decision_prompt(tasks: &[HeartbeatTask]) -> String {
|
||||
let mut prompt = String::from(
|
||||
"You are a heartbeat scheduler. Review the following periodic tasks and decide \
|
||||
whether any should be executed right now.\n\n\
|
||||
Consider:\n\
|
||||
- Task priority (high tasks are more urgent)\n\
|
||||
- Whether the task is time-sensitive or can wait\n\
|
||||
- Whether running the task now would provide value\n\n\
|
||||
Tasks:\n",
|
||||
);
|
||||
|
||||
for (i, task) in tasks.iter().enumerate() {
|
||||
use std::fmt::Write;
|
||||
let _ = writeln!(prompt, "{}. [{}] {}", i + 1, task.priority, task.text);
|
||||
}
|
||||
|
||||
prompt.push_str(
|
||||
"\nRespond with ONLY one of:\n\
|
||||
- `run: 1,2,3` (comma-separated task numbers to execute)\n\
|
||||
- `skip` (nothing needs to run right now)\n\n\
|
||||
Be conservative — skip if tasks are routine and not time-sensitive.",
|
||||
);
|
||||
|
||||
prompt
|
||||
}
|
||||
|
||||
/// Parse the Phase 1 LLM decision response.
|
||||
///
|
||||
/// Returns indices of tasks to run, or empty vec if skipped.
|
||||
pub fn parse_decision_response(response: &str, task_count: usize) -> Vec<usize> {
|
||||
let trimmed = response.trim().to_ascii_lowercase();
|
||||
|
||||
if trimmed == "skip" || trimmed.starts_with("skip") {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
// Look for "run: 1,2,3" pattern
|
||||
let numbers_part = if let Some(after_run) = trimmed.strip_prefix("run:") {
|
||||
after_run.trim()
|
||||
} else if let Some(after_run) = trimmed.strip_prefix("run ") {
|
||||
after_run.trim()
|
||||
} else {
|
||||
// Try to parse as bare numbers
|
||||
trimmed.as_str()
|
||||
};
|
||||
|
||||
numbers_part
|
||||
.split(',')
|
||||
.filter_map(|s| {
|
||||
let n: usize = s.trim().parse().ok()?;
|
||||
if n >= 1 && n <= task_count {
|
||||
Some(n - 1) // Convert to 0-indexed
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
@@ -93,10 +289,14 @@ impl HeartbeatEngine {
|
||||
# Add tasks below (one per line, starting with `- `)\n\
|
||||
# The agent will check this file on each heartbeat tick.\n\
|
||||
#\n\
|
||||
# Format: - [priority|status] Task description\n\
|
||||
# priority: high, medium (default), low\n\
|
||||
# status: active (default), paused, completed\n\
|
||||
#\n\
|
||||
# Examples:\n\
|
||||
# - Check my email for important messages\n\
|
||||
# - [high] Check my email for important messages\n\
|
||||
# - Review my calendar for upcoming events\n\
|
||||
# - Check the weather forecast\n";
|
||||
# - [low|paused] Check the weather forecast\n";
|
||||
tokio::fs::write(&path, default).await?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -112,9 +312,9 @@ mod tests {
|
||||
let content = "# Tasks\n\n- Check email\n- Review calendar\nNot a task\n- Third task";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 3);
|
||||
assert_eq!(tasks[0], "Check email");
|
||||
assert_eq!(tasks[1], "Review calendar");
|
||||
assert_eq!(tasks[2], "Third task");
|
||||
assert_eq!(tasks[0].text, "Check email");
|
||||
assert_eq!(tasks[0].priority, TaskPriority::Medium);
|
||||
assert_eq!(tasks[0].status, TaskStatus::Active);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -133,26 +333,21 @@ mod tests {
|
||||
let content = " - Indented task\n\t- Tab indented";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 2);
|
||||
assert_eq!(tasks[0], "Indented task");
|
||||
assert_eq!(tasks[1], "Tab indented");
|
||||
assert_eq!(tasks[0].text, "Indented task");
|
||||
assert_eq!(tasks[1].text, "Tab indented");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tasks_dash_without_space_ignored() {
|
||||
let content = "- Real task\n-\n- Another";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
// "-" trimmed = "-", does NOT start with "- " => skipped
|
||||
// "- Real task" => "Real task"
|
||||
// "- Another" => "Another"
|
||||
assert_eq!(tasks.len(), 2);
|
||||
assert_eq!(tasks[0], "Real task");
|
||||
assert_eq!(tasks[1], "Another");
|
||||
assert_eq!(tasks[0].text, "Real task");
|
||||
assert_eq!(tasks[1].text, "Another");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tasks_trailing_space_bullet_trimmed_to_dash() {
|
||||
// "- " trimmed becomes "-" (trim removes trailing space)
|
||||
// "-" does NOT start with "- " => skipped
|
||||
let content = "- ";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 0);
|
||||
@@ -160,11 +355,10 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn parse_tasks_bullet_with_content_after_spaces() {
|
||||
// "- hello " trimmed becomes "- hello" => starts_with "- " => "hello"
|
||||
let content = "- hello ";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0], "hello");
|
||||
assert_eq!(tasks[0].text, "hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -172,8 +366,8 @@ mod tests {
|
||||
let content = "- Check email 📧\n- Review calendar 📅\n- 日本語タスク";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 3);
|
||||
assert!(tasks[0].contains("📧"));
|
||||
assert!(tasks[2].contains("日本語"));
|
||||
assert!(tasks[0].text.contains('📧'));
|
||||
assert!(tasks[2].text.contains("日本語"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -181,15 +375,15 @@ mod tests {
|
||||
let content = "# Periodic Tasks\n\n## Quick\n- Task A\n\n## Long\n- Task B\n\n* Not a dash bullet\n1. Not numbered";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 2);
|
||||
assert_eq!(tasks[0], "Task A");
|
||||
assert_eq!(tasks[1], "Task B");
|
||||
assert_eq!(tasks[0].text, "Task A");
|
||||
assert_eq!(tasks[1].text, "Task B");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tasks_single_task() {
|
||||
let tasks = HeartbeatEngine::parse_tasks("- Only one");
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0], "Only one");
|
||||
assert_eq!(tasks[0].text, "Only one");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -201,9 +395,153 @@ mod tests {
|
||||
});
|
||||
let tasks = HeartbeatEngine::parse_tasks(&content);
|
||||
assert_eq!(tasks.len(), 100);
|
||||
assert_eq!(tasks[99], "Task 99");
|
||||
assert_eq!(tasks[99].text, "Task 99");
|
||||
}
|
||||
|
||||
// ── Structured task parsing tests ────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn parse_task_with_high_priority() {
|
||||
let content = "- [high] Urgent email check";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].text, "Urgent email check");
|
||||
assert_eq!(tasks[0].priority, TaskPriority::High);
|
||||
assert_eq!(tasks[0].status, TaskStatus::Active);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_task_with_low_paused() {
|
||||
let content = "- [low|paused] Review old PRs";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].text, "Review old PRs");
|
||||
assert_eq!(tasks[0].priority, TaskPriority::Low);
|
||||
assert_eq!(tasks[0].status, TaskStatus::Paused);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_task_completed() {
|
||||
let content = "- [completed] Old task";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].priority, TaskPriority::Medium);
|
||||
assert_eq!(tasks[0].status, TaskStatus::Completed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_task_without_metadata_defaults() {
|
||||
let content = "- Plain task";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].text, "Plain task");
|
||||
assert_eq!(tasks[0].priority, TaskPriority::Medium);
|
||||
assert_eq!(tasks[0].status, TaskStatus::Active);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_mixed_structured_and_legacy() {
|
||||
let content = "- [high] Urgent\n- Normal task\n- [low|paused] Later";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
assert_eq!(tasks.len(), 3);
|
||||
assert_eq!(tasks[0].priority, TaskPriority::High);
|
||||
assert_eq!(tasks[1].priority, TaskPriority::Medium);
|
||||
assert_eq!(tasks[2].priority, TaskPriority::Low);
|
||||
assert_eq!(tasks[2].status, TaskStatus::Paused);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runnable_filters_paused_and_completed() {
|
||||
let content = "- [high] Active\n- [low|paused] Paused\n- [completed] Done";
|
||||
let tasks = HeartbeatEngine::parse_tasks(content);
|
||||
let runnable: Vec<_> = tasks
|
||||
.into_iter()
|
||||
.filter(HeartbeatTask::is_runnable)
|
||||
.collect();
|
||||
assert_eq!(runnable.len(), 1);
|
||||
assert_eq!(runnable[0].text, "Active");
|
||||
}
|
||||
|
||||
// ── Two-phase decision tests ────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn decision_prompt_includes_all_tasks() {
|
||||
let tasks = vec![
|
||||
HeartbeatTask {
|
||||
text: "Check email".into(),
|
||||
priority: TaskPriority::High,
|
||||
status: TaskStatus::Active,
|
||||
},
|
||||
HeartbeatTask {
|
||||
text: "Review calendar".into(),
|
||||
priority: TaskPriority::Medium,
|
||||
status: TaskStatus::Active,
|
||||
},
|
||||
];
|
||||
let prompt = HeartbeatEngine::build_decision_prompt(&tasks);
|
||||
assert!(prompt.contains("1. [high] Check email"));
|
||||
assert!(prompt.contains("2. [medium] Review calendar"));
|
||||
assert!(prompt.contains("skip"));
|
||||
assert!(prompt.contains("run:"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_decision_skip() {
|
||||
let indices = HeartbeatEngine::parse_decision_response("skip", 3);
|
||||
assert!(indices.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_decision_skip_with_reason() {
|
||||
let indices =
|
||||
HeartbeatEngine::parse_decision_response("skip — nothing urgent right now", 3);
|
||||
assert!(indices.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_decision_run_single() {
|
||||
let indices = HeartbeatEngine::parse_decision_response("run: 1", 3);
|
||||
assert_eq!(indices, vec![0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_decision_run_multiple() {
|
||||
let indices = HeartbeatEngine::parse_decision_response("run: 1, 3", 3);
|
||||
assert_eq!(indices, vec![0, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_decision_run_out_of_range_ignored() {
|
||||
let indices = HeartbeatEngine::parse_decision_response("run: 1, 5, 2", 3);
|
||||
assert_eq!(indices, vec![0, 1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_decision_run_zero_ignored() {
|
||||
let indices = HeartbeatEngine::parse_decision_response("run: 0, 1", 3);
|
||||
assert_eq!(indices, vec![0]);
|
||||
}
|
||||
|
||||
// ── Task display ────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn task_display_format() {
|
||||
let task = HeartbeatTask {
|
||||
text: "Check email".into(),
|
||||
priority: TaskPriority::High,
|
||||
status: TaskStatus::Active,
|
||||
};
|
||||
assert_eq!(format!("{task}"), "[high] Check email");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn priority_ordering() {
|
||||
assert!(TaskPriority::High > TaskPriority::Medium);
|
||||
assert!(TaskPriority::Medium > TaskPriority::Low);
|
||||
}
|
||||
|
||||
// ── Async tests ─────────────────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn ensure_heartbeat_file_creates_file() {
|
||||
let dir = std::env::temp_dir().join("zeroclaw_test_heartbeat");
|
||||
@@ -216,6 +554,7 @@ mod tests {
|
||||
assert!(path.exists());
|
||||
let content = tokio::fs::read_to_string(&path).await.unwrap();
|
||||
assert!(content.contains("Periodic Tasks"));
|
||||
assert!(content.contains("[high]"));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(&dir).await;
|
||||
}
|
||||
@@ -301,4 +640,37 @@ mod tests {
|
||||
let result = engine.run().await;
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn collect_runnable_tasks_sorts_by_priority() {
|
||||
let dir = std::env::temp_dir().join("zeroclaw_test_runnable_sort");
|
||||
let _ = tokio::fs::remove_dir_all(&dir).await;
|
||||
tokio::fs::create_dir_all(&dir).await.unwrap();
|
||||
|
||||
tokio::fs::write(
|
||||
dir.join("HEARTBEAT.md"),
|
||||
"- [low] Low task\n- [high] High task\n- Medium task\n- [low|paused] Skip me",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let observer: Arc<dyn Observer> = Arc::new(crate::observability::NoopObserver);
|
||||
let engine = HeartbeatEngine::new(
|
||||
HeartbeatConfig {
|
||||
enabled: true,
|
||||
interval_minutes: 30,
|
||||
..HeartbeatConfig::default()
|
||||
},
|
||||
dir.clone(),
|
||||
observer,
|
||||
);
|
||||
|
||||
let tasks = engine.collect_runnable_tasks().await.unwrap();
|
||||
assert_eq!(tasks.len(), 3); // paused one excluded
|
||||
assert_eq!(tasks[0].priority, TaskPriority::High);
|
||||
assert_eq!(tasks[1].priority, TaskPriority::Medium);
|
||||
assert_eq!(tasks[2].priority, TaskPriority::Low);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(&dir).await;
|
||||
}
|
||||
}
|
||||
|
||||
+19
-6
@@ -280,15 +280,19 @@ Times are evaluated in UTC by default; use --tz with an IANA \
|
||||
timezone name to override.
|
||||
|
||||
Examples:
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health'")]
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York --agent
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health' --agent
|
||||
zeroclaw cron add '*/5 * * * *' 'echo ok'")]
|
||||
Add {
|
||||
/// Cron expression
|
||||
expression: String,
|
||||
/// Optional IANA timezone (e.g. America/Los_Angeles)
|
||||
#[arg(long)]
|
||||
tz: Option<String>,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Add a one-shot scheduled task at an RFC3339 timestamp
|
||||
@@ -303,7 +307,10 @@ Examples:
|
||||
AddAt {
|
||||
/// One-shot timestamp in RFC3339 format
|
||||
at: String,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Add a fixed-interval scheduled task
|
||||
@@ -318,7 +325,10 @@ Examples:
|
||||
AddEvery {
|
||||
/// Interval in milliseconds
|
||||
every_ms: u64,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Add a one-shot delayed task (e.g. "30m", "2h", "1d")
|
||||
@@ -335,7 +345,10 @@ Examples:
|
||||
Once {
|
||||
/// Delay duration
|
||||
delay: String,
|
||||
/// Command to run
|
||||
/// Treat the argument as an agent prompt instead of a shell command
|
||||
#[arg(long)]
|
||||
agent: bool,
|
||||
/// Command (shell) or prompt (agent) to run
|
||||
command: String,
|
||||
},
|
||||
/// Remove a scheduled task
|
||||
|
||||
+5
-4
@@ -325,11 +325,12 @@ override with --tz and an IANA timezone name.
|
||||
|
||||
Examples:
|
||||
zeroclaw cron list
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health'
|
||||
zeroclaw cron add-at 2025-01-15T14:00:00Z 'Send reminder'
|
||||
zeroclaw cron add '0 9 * * 1-5' 'Good morning' --tz America/New_York --agent
|
||||
zeroclaw cron add '*/30 * * * *' 'Check system health' --agent
|
||||
zeroclaw cron add '*/5 * * * *' 'echo ok'
|
||||
zeroclaw cron add-at 2025-01-15T14:00:00Z 'Send reminder' --agent
|
||||
zeroclaw cron add-every 60000 'Ping heartbeat'
|
||||
zeroclaw cron once 30m 'Run backup in 30 minutes'
|
||||
zeroclaw cron once 30m 'Run backup in 30 minutes' --agent
|
||||
zeroclaw cron pause <task-id>
|
||||
zeroclaw cron update <task-id> --expression '0 8 * * *' --tz Europe/London")]
|
||||
Cron {
|
||||
|
||||
@@ -0,0 +1,175 @@
|
||||
//! LLM-driven memory consolidation.
|
||||
//!
|
||||
//! After each conversation turn, extracts structured information:
|
||||
//! - `history_entry`: A timestamped summary for the daily conversation log.
|
||||
//! - `memory_update`: New facts, preferences, or decisions worth remembering
|
||||
//! long-term (or `null` if nothing new was learned).
|
||||
//!
|
||||
//! This two-phase approach replaces the naive raw-message auto-save with
|
||||
//! semantic extraction, similar to Nanobot's `save_memory` tool call pattern.
|
||||
|
||||
use crate::memory::traits::{Memory, MemoryCategory};
|
||||
use crate::providers::traits::Provider;
|
||||
|
||||
/// Output of consolidation extraction.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
pub struct ConsolidationResult {
|
||||
/// Brief timestamped summary for the conversation history log.
|
||||
pub history_entry: String,
|
||||
/// New facts/preferences/decisions to store long-term, or None.
|
||||
pub memory_update: Option<String>,
|
||||
}
|
||||
|
||||
const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are a memory consolidation engine. Given a conversation turn, extract:
|
||||
1. "history_entry": A brief summary of what happened in this turn (1-2 sentences). Include the key topic or action.
|
||||
2. "memory_update": Any NEW facts, preferences, decisions, or commitments worth remembering long-term. Return null if nothing new was learned.
|
||||
|
||||
Respond ONLY with valid JSON: {"history_entry": "...", "memory_update": "..." or null}
|
||||
Do not include any text outside the JSON object."#;
|
||||
|
||||
/// Run two-phase LLM-driven consolidation on a conversation turn.
|
||||
///
|
||||
/// Phase 1: Write a history entry to the Daily memory category.
|
||||
/// Phase 2: Write a memory update to the Core category (if the LLM identified new facts).
|
||||
///
|
||||
/// This function is designed to be called fire-and-forget via `tokio::spawn`.
|
||||
pub async fn consolidate_turn(
|
||||
provider: &dyn Provider,
|
||||
model: &str,
|
||||
memory: &dyn Memory,
|
||||
user_message: &str,
|
||||
assistant_response: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let turn_text = format!("User: {user_message}\nAssistant: {assistant_response}");
|
||||
|
||||
// Truncate very long turns to avoid wasting tokens on consolidation.
|
||||
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8 (e.g. CJK text).
|
||||
let truncated = if turn_text.len() > 4000 {
|
||||
let mut end = 4000;
|
||||
while end > 0 && !turn_text.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
format!("{}…", &turn_text[..end])
|
||||
} else {
|
||||
turn_text.clone()
|
||||
};
|
||||
|
||||
let raw = provider
|
||||
.chat_with_system(Some(CONSOLIDATION_SYSTEM_PROMPT), &truncated, model, 0.1)
|
||||
.await?;
|
||||
|
||||
let result: ConsolidationResult = parse_consolidation_response(&raw, &turn_text);
|
||||
|
||||
// Phase 1: Write history entry to Daily category.
|
||||
let date = chrono::Local::now().format("%Y-%m-%d").to_string();
|
||||
let history_key = format!("daily_{date}_{}", uuid::Uuid::new_v4());
|
||||
memory
|
||||
.store(
|
||||
&history_key,
|
||||
&result.history_entry,
|
||||
MemoryCategory::Daily,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Phase 2: Write memory update to Core category (if present).
|
||||
if let Some(ref update) = result.memory_update {
|
||||
if !update.trim().is_empty() {
|
||||
let mem_key = format!("core_{}", uuid::Uuid::new_v4());
|
||||
memory
|
||||
.store(&mem_key, update, MemoryCategory::Core, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parse the LLM's consolidation response, with fallback for malformed JSON.
|
||||
fn parse_consolidation_response(raw: &str, fallback_text: &str) -> ConsolidationResult {
|
||||
// Try to extract JSON from the response (LLM may wrap in markdown code blocks).
|
||||
let cleaned = raw
|
||||
.trim()
|
||||
.trim_start_matches("```json")
|
||||
.trim_start_matches("```")
|
||||
.trim_end_matches("```")
|
||||
.trim();
|
||||
|
||||
serde_json::from_str(cleaned).unwrap_or_else(|_| {
|
||||
// Fallback: use truncated turn text as history entry.
|
||||
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8.
|
||||
let summary = if fallback_text.len() > 200 {
|
||||
let mut end = 200;
|
||||
while end > 0 && !fallback_text.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
format!("{}…", &fallback_text[..end])
|
||||
} else {
|
||||
fallback_text.to_string()
|
||||
};
|
||||
ConsolidationResult {
|
||||
history_entry: summary,
|
||||
memory_update: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_valid_json_response() {
|
||||
let raw = r#"{"history_entry": "User asked about Rust.", "memory_update": "User prefers Rust over Go."}"#;
|
||||
let result = parse_consolidation_response(raw, "fallback");
|
||||
assert_eq!(result.history_entry, "User asked about Rust.");
|
||||
assert_eq!(
|
||||
result.memory_update.as_deref(),
|
||||
Some("User prefers Rust over Go.")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_json_with_null_memory() {
|
||||
let raw = r#"{"history_entry": "Routine greeting.", "memory_update": null}"#;
|
||||
let result = parse_consolidation_response(raw, "fallback");
|
||||
assert_eq!(result.history_entry, "Routine greeting.");
|
||||
assert!(result.memory_update.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_json_wrapped_in_code_block() {
|
||||
let raw =
|
||||
"```json\n{\"history_entry\": \"Discussed deployment.\", \"memory_update\": null}\n```";
|
||||
let result = parse_consolidation_response(raw, "fallback");
|
||||
assert_eq!(result.history_entry, "Discussed deployment.");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fallback_on_malformed_response() {
|
||||
let raw = "I'm sorry, I can't do that.";
|
||||
let result = parse_consolidation_response(raw, "User: hello\nAssistant: hi");
|
||||
assert_eq!(result.history_entry, "User: hello\nAssistant: hi");
|
||||
assert!(result.memory_update.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fallback_truncates_long_text() {
|
||||
let long_text = "x".repeat(500);
|
||||
let result = parse_consolidation_response("invalid", &long_text);
|
||||
// 200 bytes + "…" (3 bytes in UTF-8) = 203
|
||||
assert!(result.history_entry.len() <= 203);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fallback_truncates_cjk_text_without_panic() {
|
||||
// Each CJK character is 3 bytes in UTF-8; byte index 200 may land
|
||||
// inside a character. This must not panic.
|
||||
let cjk_text = "二手书项目".repeat(50); // 250 chars = 750 bytes
|
||||
let result = parse_consolidation_response("invalid", &cjk_text);
|
||||
assert!(result
|
||||
.history_entry
|
||||
.is_char_boundary(result.history_entry.len()));
|
||||
assert!(result.history_entry.ends_with('…'));
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod backend;
|
||||
pub mod chunker;
|
||||
pub mod cli;
|
||||
pub mod consolidation;
|
||||
pub mod embeddings;
|
||||
pub mod hygiene;
|
||||
pub mod lucid;
|
||||
|
||||
+16
-1
@@ -67,7 +67,13 @@ pub fn redact(value: &str) -> String {
|
||||
if value.len() <= 4 {
|
||||
"***".to_string()
|
||||
} else {
|
||||
format!("{}***", &value[..4])
|
||||
// Use char-boundary-safe slicing to prevent panic on multi-byte UTF-8.
|
||||
let prefix = value
|
||||
.char_indices()
|
||||
.nth(4)
|
||||
.map(|(byte_idx, _)| &value[..byte_idx])
|
||||
.unwrap_or(value);
|
||||
format!("{}***", prefix)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,4 +108,13 @@ mod tests {
|
||||
assert_eq!(redact(""), "***");
|
||||
assert_eq!(redact("12345"), "1234***");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn redact_handles_multibyte_utf8_without_panic() {
|
||||
// CJK characters are 3 bytes each; slicing at byte 4 would panic
|
||||
// without char-boundary-safe handling.
|
||||
let result = redact("密码是很长的秘密");
|
||||
assert!(result.ends_with("***"));
|
||||
assert!(result.is_char_boundary(result.len()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ pub struct HttpRequestTool {
|
||||
allowed_domains: Vec<String>,
|
||||
max_response_size: usize,
|
||||
timeout_secs: u64,
|
||||
allow_private_hosts: bool,
|
||||
}
|
||||
|
||||
impl HttpRequestTool {
|
||||
@@ -20,12 +21,14 @@ impl HttpRequestTool {
|
||||
allowed_domains: Vec<String>,
|
||||
max_response_size: usize,
|
||||
timeout_secs: u64,
|
||||
allow_private_hosts: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
security,
|
||||
allowed_domains: normalize_allowed_domains(allowed_domains),
|
||||
max_response_size,
|
||||
timeout_secs,
|
||||
allow_private_hosts,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,7 +55,7 @@ impl HttpRequestTool {
|
||||
|
||||
let host = extract_host(url)?;
|
||||
|
||||
if is_private_or_local_host(&host) {
|
||||
if !self.allow_private_hosts && is_private_or_local_host(&host) {
|
||||
anyhow::bail!("Blocked local/private host: {host}");
|
||||
}
|
||||
|
||||
@@ -454,6 +457,13 @@ mod tests {
|
||||
use crate::security::{AutonomyLevel, SecurityPolicy};
|
||||
|
||||
fn test_tool(allowed_domains: Vec<&str>) -> HttpRequestTool {
|
||||
test_tool_with_private(allowed_domains, false)
|
||||
}
|
||||
|
||||
fn test_tool_with_private(
|
||||
allowed_domains: Vec<&str>,
|
||||
allow_private_hosts: bool,
|
||||
) -> HttpRequestTool {
|
||||
let security = Arc::new(SecurityPolicy {
|
||||
autonomy: AutonomyLevel::Supervised,
|
||||
..SecurityPolicy::default()
|
||||
@@ -463,6 +473,7 @@ mod tests {
|
||||
allowed_domains.into_iter().map(String::from).collect(),
|
||||
1_000_000,
|
||||
30,
|
||||
allow_private_hosts,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -570,7 +581,7 @@ mod tests {
|
||||
#[test]
|
||||
fn validate_requires_allowlist() {
|
||||
let security = Arc::new(SecurityPolicy::default());
|
||||
let tool = HttpRequestTool::new(security, vec![], 1_000_000, 30);
|
||||
let tool = HttpRequestTool::new(security, vec![], 1_000_000, 30, false);
|
||||
let err = tool
|
||||
.validate_url("https://example.com")
|
||||
.unwrap_err()
|
||||
@@ -686,7 +697,7 @@ mod tests {
|
||||
autonomy: AutonomyLevel::ReadOnly,
|
||||
..SecurityPolicy::default()
|
||||
});
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30);
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30, false);
|
||||
let result = tool
|
||||
.execute(json!({"url": "https://example.com"}))
|
||||
.await
|
||||
@@ -701,7 +712,7 @@ mod tests {
|
||||
max_actions_per_hour: 0,
|
||||
..SecurityPolicy::default()
|
||||
});
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30);
|
||||
let tool = HttpRequestTool::new(security, vec!["example.com".into()], 1_000_000, 30, false);
|
||||
let result = tool
|
||||
.execute(json!({"url": "https://example.com"}))
|
||||
.await
|
||||
@@ -724,6 +735,7 @@ mod tests {
|
||||
vec!["example.com".into()],
|
||||
10,
|
||||
30,
|
||||
false,
|
||||
);
|
||||
let text = "hello world this is long";
|
||||
let truncated = tool.truncate_response(text);
|
||||
@@ -738,6 +750,7 @@ mod tests {
|
||||
vec!["example.com".into()],
|
||||
0, // max_response_size = 0 means no limit
|
||||
30,
|
||||
false,
|
||||
);
|
||||
let text = "a".repeat(10_000_000);
|
||||
assert_eq!(tool.truncate_response(&text), text);
|
||||
@@ -750,6 +763,7 @@ mod tests {
|
||||
vec!["example.com".into()],
|
||||
5,
|
||||
30,
|
||||
false,
|
||||
);
|
||||
let text = "hello world";
|
||||
let truncated = tool.truncate_response(text);
|
||||
@@ -935,4 +949,70 @@ mod tests {
|
||||
.to_string();
|
||||
assert!(err.contains("IPv6"));
|
||||
}
|
||||
|
||||
// ── allow_private_hosts opt-in tests ────────────────────────
|
||||
|
||||
#[test]
|
||||
fn default_blocks_private_hosts() {
|
||||
let tool = test_tool(vec!["localhost", "192.168.1.5", "*"]);
|
||||
assert!(tool
|
||||
.validate_url("https://localhost:8080")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
assert!(tool
|
||||
.validate_url("https://192.168.1.5")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
assert!(tool
|
||||
.validate_url("https://10.0.0.1")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_permits_localhost() {
|
||||
let tool = test_tool_with_private(vec!["localhost"], true);
|
||||
assert!(tool.validate_url("https://localhost:8080").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_permits_private_ipv4() {
|
||||
let tool = test_tool_with_private(vec!["192.168.1.5"], true);
|
||||
assert!(tool.validate_url("https://192.168.1.5").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_permits_rfc1918_with_wildcard() {
|
||||
let tool = test_tool_with_private(vec!["*"], true);
|
||||
assert!(tool.validate_url("https://10.0.0.1").is_ok());
|
||||
assert!(tool.validate_url("https://172.16.0.1").is_ok());
|
||||
assert!(tool.validate_url("https://192.168.1.1").is_ok());
|
||||
assert!(tool.validate_url("http://localhost:8123").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_still_requires_allowlist() {
|
||||
let tool = test_tool_with_private(vec!["example.com"], true);
|
||||
let err = tool
|
||||
.validate_url("https://192.168.1.5")
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
assert!(
|
||||
err.contains("allowed_domains"),
|
||||
"Private host should still need allowlist match, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_hosts_false_still_blocks() {
|
||||
let tool = test_tool_with_private(vec!["*"], false);
|
||||
assert!(tool
|
||||
.validate_url("https://localhost:8080")
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("local/private"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,6 +314,7 @@ pub fn all_tools_with_runtime(
|
||||
http_config.allowed_domains.clone(),
|
||||
http_config.max_response_size,
|
||||
http_config.timeout_secs,
|
||||
http_config.allow_private_hosts,
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user