Compare commits

...

7 Commits

Author SHA1 Message Date
Argenis 93b5a0b824 feat(context): token-based compaction, persistent sessions, and LLM consolidation (#3574)
Comprehensive long-running context upgrades:

- Token-based compaction: replace message-count trigger with token
  estimation (~4 chars/token). Compaction fires when estimated tokens
  exceed max_context_tokens (default 32K) OR message count exceeds
  max_history_messages. Cuts at user-turn boundaries only.

- Persistent sessions: JSONL append-only session files per channel
  sender in {workspace}/sessions/. Sessions survive daemon restarts.
  Hydrates in-memory history from disk on startup.

- LLM-driven memory consolidation: two-phase extraction after each
  conversation turn. Phase 1 writes a timestamped history entry (Daily).
  Phase 2 extracts new facts/preferences to Core memory (if any).
  Replaces raw message auto-save with semantic extraction.

- New config fields: agent.max_context_tokens (32000),
  channels_config.session_persistence (true).

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 09:25:23 -04:00
Argenis 08a67c4a2d chore: bump version to v0.3.2 (#3564)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 06:47:37 -04:00
Argenis c86a0673ba feat(heartbeat): two-phase execution, structured tasks, and auto-routing (#3562)
Upgrade heartbeat system with 4 key improvements:

- Two-phase heartbeat: Phase 1 asks LLM "skip or run?" to save API cost
  on quiet periods. Phase 2 executes only selected tasks.
- Structured task format: `- [priority|status] task text` with
  high/medium/low priority and active/paused/completed status.
- Decision intelligence: LLM-driven smart filtering via structured prompt
  at temperature 0.0 for deterministic decisions.
- Delivery routing: auto-detect best configured channel when no explicit
  target is set (telegram > discord > slack > mattermost).

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 06:11:59 -04:00
Argenis cabf99ba07 Merge pull request #3539 from zeroclaw-labs/cleanup
chore: add .wrangler/ to gitignore
2026-03-14 22:53:12 -04:00
argenis de la rosa 2d978a6b64 chore: add .wrangler/ to gitignore and clean up stale files
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 22:41:17 -04:00
Argenis 4dbc9266c1 Merge pull request #3536 from zeroclaw-labs/test/termux-release-validation
test: add Termux release validation script
2026-03-14 22:21:21 -04:00
argenis de la rosa ea0b3c8c8c test: add Termux release validation script
Validates the aarch64-linux-android release artifact: download, archive
integrity, ELF format, architecture, checksum, and install.sh detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 22:10:33 -04:00
12 changed files with 1337 additions and 91 deletions
+4 -1
View File
@@ -43,4 +43,7 @@ credentials.json
lcov.info
# IDE's stuff
.idea
.idea
# Wrangler cache
.wrangler/
Generated
+1 -1
View File
@@ -7945,7 +7945,7 @@ dependencies = [
[[package]]
name = "zeroclawlabs"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"anyhow",
"async-imap",
+1 -1
View File
@@ -4,7 +4,7 @@ resolver = "2"
[package]
name = "zeroclawlabs"
version = "0.3.1"
version = "0.3.2"
edition = "2021"
authors = ["theonlyhennygod"]
license = "MIT OR Apache-2.0"
+261
View File
@@ -0,0 +1,261 @@
#!/usr/bin/env bash
#
# Validate the aarch64-linux-android release artifact for Termux.
#
# Usage:
#   ./dev/test-termux-release.sh [version]
#
# Examples:
#   ./dev/test-termux-release.sh 0.3.1
#   ./dev/test-termux-release.sh          # auto-detects from Cargo.toml
#
set -euo pipefail

# ANSI color/style escape sequences used by the log helpers.
RESET='\033[0m'
BOLD='\033[1m'
DIM='\033[2m'
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
BLUE='\033[0;34m'
# Log helpers: print a colored status marker, then the message.
# fail() also bumps the global FAILURES counter used by the summary.
# NOTE(review): the original pass()/fail()/info() echoed an *empty* colored
# span (glyph apparently lost in transit — warn() kept its "!"); restored
# visible markers ✓ / ✗ / ▸ here. Confirm against the intended UX.
pass() { echo -e " ${GREEN}✓${RESET} $*"; }
fail() { echo -e " ${RED}✗${RESET} $*"; FAILURES=$((FAILURES + 1)); }
info() { echo -e "${BLUE}▸${RESET} ${BOLD}$*${RESET}"; }
warn() { echo -e "${YELLOW}!${RESET} $*"; }
FAILURES=0
TARGET="aarch64-linux-android"

# Version comes from argv[1]; otherwise parse it out of Cargo.toml.
VERSION="${1:-}"
if [[ -z "$VERSION" && -f Cargo.toml ]]; then
    VERSION=$(sed -n 's/^version = "\([^"]*\)"/\1/p' Cargo.toml | head -1)
fi
if [[ -z "$VERSION" ]]; then
    echo "Usage: $0 <version>"
    echo " e.g. $0 0.3.1"
    exit 1
fi

TAG="v${VERSION}"
ASSET_NAME="zeroclaw-${TARGET}.tar.gz"
ASSET_URL="https://github.com/zeroclaw-labs/zeroclaw/releases/download/${TAG}/${ASSET_NAME}"

# Scratch directory for the downloaded artifact; removed on any exit path.
TEMP_DIR="$(mktemp -d -t zeroclaw-termux-test-XXXXXX)"
cleanup() { rm -rf "$TEMP_DIR"; }
trap cleanup EXIT
echo
echo -e "${BOLD}Termux Release Validation — ${TAG}${RESET}"
echo -e "${DIM}Target: ${TARGET}${RESET}"
echo

# --- Test 1: Release tag exists ---
info "Checking release tag ${TAG}"
if ! gh release view "$TAG" >/dev/null 2>&1; then
    fail "Release ${TAG} not found"
    echo -e "${RED}Release has not been published yet. Wait for the release workflow to complete.${RESET}"
    exit 1
fi
pass "Release ${TAG} exists"

# --- Test 2: Android asset is listed ---
info "Checking for ${ASSET_NAME} in release assets"
ASSETS=$(gh release view "$TAG" --json assets -q '.assets[].name')
if ! echo "$ASSETS" | grep -q "$ASSET_NAME"; then
    fail "Asset ${ASSET_NAME} not found in release"
    echo "Available assets:"
    echo "$ASSETS" | sed 's/^/ /'
    exit 1
fi
pass "Asset ${ASSET_NAME} found in release"

# --- Test 3: Download the asset ---
info "Downloading ${ASSET_NAME}"
if ! curl -fsSL "$ASSET_URL" -o "$TEMP_DIR/$ASSET_NAME"; then
    fail "Download failed from ${ASSET_URL}"
    exit 1
fi
FILESIZE=$(wc -c < "$TEMP_DIR/$ASSET_NAME" | tr -d ' ')
pass "Downloaded successfully (${FILESIZE} bytes)"

# --- Test 4: Archive integrity ---
info "Verifying archive integrity"
if ! tar -tzf "$TEMP_DIR/$ASSET_NAME" >/dev/null 2>&1; then
    fail "Archive is corrupted or not a valid tar.gz"
    exit 1
fi
pass "Archive is a valid gzip tar"
# --- Test 5: Contains zeroclaw binary ---
info "Checking archive contents"
CONTENTS=$(tar -tzf "$TEMP_DIR/$ASSET_NAME")
if echo "$CONTENTS" | grep -q "^zeroclaw$"; then
    pass "Archive contains 'zeroclaw' binary"
else
    fail "Archive does not contain 'zeroclaw' binary"
    echo "Contents:"
    echo "$CONTENTS" | sed 's/^/ /'
fi

# --- Test 6: Extract and inspect binary ---
info "Extracting and inspecting binary"
tar -xzf "$TEMP_DIR/$ASSET_NAME" -C "$TEMP_DIR"
BINARY="$TEMP_DIR/zeroclaw"
if [[ ! -f "$BINARY" ]]; then
    fail "Binary not found after extraction"
    exit 1
fi
pass "Binary extracted"

# --- Test 7: ELF format and architecture ---
info "Checking binary format"
FILE_INFO=$(file "$BINARY")
if echo "$FILE_INFO" | grep -q "ELF"; then
    pass "Binary is ELF format"
else
    fail "Binary is not ELF format: $FILE_INFO"
fi
if echo "$FILE_INFO" | grep -qi "aarch64\|ARM aarch64"; then
    pass "Binary targets aarch64 architecture"
else
    fail "Binary does not target aarch64: $FILE_INFO"
fi
# `file` output does not always name Android; fall back to the ELF program
# interpreter (Bionic's dynamic linker path) via readelf when available.
if echo "$FILE_INFO" | grep -qi "android\|bionic"; then
    pass "Binary is linked for Android/Bionic"
elif command -v readelf >/dev/null 2>&1; then
    INTERP=$(readelf -l "$BINARY" 2>/dev/null | grep -o '/[^ ]*linker[^ ]*' || true)
    if echo "$INTERP" | grep -qi "android\|bionic"; then
        pass "Binary uses Android linker: $INTERP"
    else
        warn "Could not confirm Android linkage (interpreter: ${INTERP:-unknown})"
        warn "file output: $FILE_INFO"
    fi
else
    warn "Could not confirm Android linkage (readelf not available)"
    warn "file output: $FILE_INFO"
fi
# --- Test 8: Binary is stripped ---
info "Checking binary optimization"
# BUGFIX: `file` prints either "stripped" or "not stripped". A bare substring
# match for "stripped" also matches "not stripped", so the warn branch was
# unreachable for ELF binaries. Check the negative phrase first.
if echo "$FILE_INFO" | grep -q "not stripped"; then
    warn "Binary may not be stripped"
elif echo "$FILE_INFO" | grep -q "stripped"; then
    pass "Binary is stripped (release optimized)"
else
    warn "Binary may not be stripped"
fi
# --- Test 9: Binary is not dynamically linked to glibc ---
info "Checking for glibc dependencies"
if command -v readelf >/dev/null 2>&1; then
    NEEDED=$(readelf -d "$BINARY" 2>/dev/null | grep NEEDED || true)
    # BUGFIX: Bionic's libc soname is "libc.so" (no version suffix) while
    # glibc's is "libc.so.6". The old pattern `libc\.so\.` required a trailing
    # dot, so Bionic binaries fell through to "No glibc dependencies detected"
    # instead of being classified. Match "libc.so" without the trailing dot.
    if echo "$NEEDED" | grep -qi "libc\.so\|libpthread\|libdl"; then
        # Distinguish glibc (fatal for Termux) from Bionic.
        if echo "$NEEDED" | grep -qi "libc\.so\.6"; then
            fail "Binary links against glibc (libc.so.6) — will not work on Termux"
        else
            pass "Binary links against libc (likely Bionic)"
        fi
    else
        pass "No glibc dependencies detected"
    fi
else
    warn "readelf not available — skipping dynamic library check"
fi
# --- Test 10: SHA256 checksum verification ---
info "Verifying SHA256 checksum"
CHECKSUMS_URL="https://github.com/zeroclaw-labs/zeroclaw/releases/download/${TAG}/SHA256SUMS"
if curl -fsSL "$CHECKSUMS_URL" -o "$TEMP_DIR/SHA256SUMS" 2>/dev/null; then
    # BUGFIX: under `set -euo pipefail`, a grep miss made this assignment fail
    # and abort the whole script, so the "No checksum entry" warning below was
    # unreachable. `|| true` keeps a missing entry non-fatal.
    EXPECTED=$(grep "$ASSET_NAME" "$TEMP_DIR/SHA256SUMS" | awk '{print $1}' || true)
    if [[ -n "$EXPECTED" ]]; then
        # Prefer GNU sha256sum; fall back to BSD/macOS shasum.
        if command -v sha256sum >/dev/null 2>&1; then
            ACTUAL=$(sha256sum "$TEMP_DIR/$ASSET_NAME" | awk '{print $1}')
        elif command -v shasum >/dev/null 2>&1; then
            ACTUAL=$(shasum -a 256 "$TEMP_DIR/$ASSET_NAME" | awk '{print $1}')
        else
            warn "No sha256sum or shasum available"
            ACTUAL=""
        fi
        if [[ -n "$ACTUAL" && "$ACTUAL" == "$EXPECTED" ]]; then
            pass "SHA256 checksum matches"
        elif [[ -n "$ACTUAL" ]]; then
            fail "SHA256 mismatch: expected=$EXPECTED actual=$ACTUAL"
        fi
    else
        warn "No checksum entry for ${ASSET_NAME} in SHA256SUMS"
    fi
else
    warn "Could not download SHA256SUMS"
fi
# --- Test 11: install.sh Termux detection ---
info "Validating install.sh Termux detection"
INSTALL_SH="install.sh"
if [[ ! -f "$INSTALL_SH" ]]; then
    # Allow running from the dev/ directory as well as the repo root.
    INSTALL_SH="$(dirname "$0")/../install.sh"
fi
if [[ ! -f "$INSTALL_SH" ]]; then
    warn "install.sh not found — skipping detection tests"
else
    if grep -q 'TERMUX_VERSION' "$INSTALL_SH"; then
        pass "install.sh checks TERMUX_VERSION"
    else
        fail "install.sh does not check TERMUX_VERSION"
    fi
    if grep -q 'aarch64-linux-android' "$INSTALL_SH"; then
        pass "install.sh maps to aarch64-linux-android target"
    else
        fail "install.sh does not map to aarch64-linux-android"
    fi
    # Simulate Termux detection (mock uname as Linux since we may run on macOS)
    detect_result=$(
        bash -c '
            TERMUX_VERSION="0.118"
            os="Linux"
            arch="aarch64"
            case "$os:$arch" in
                Linux:aarch64|Linux:arm64)
                    if [[ -n "${TERMUX_VERSION:-}" || -d "/data/data/com.termux" ]]; then
                        echo "aarch64-linux-android"
                    else
                        echo "aarch64-unknown-linux-gnu"
                    fi
                    ;;
            esac
        '
    )
    if [[ "$detect_result" == "aarch64-linux-android" ]]; then
        pass "Termux detection returns correct target (simulated)"
    else
        fail "Termux detection returned: $detect_result (expected aarch64-linux-android)"
    fi
fi

# --- Summary ---
echo
if [[ "$FAILURES" -eq 0 ]]; then
    echo -e "${GREEN}${BOLD}All tests passed!${RESET}"
    echo -e "${DIM}The Termux release artifact for ${TAG} is valid.${RESET}"
else
    echo -e "${RED}${BOLD}${FAILURES} test(s) failed.${RESET}"
    exit 1
fi
+54 -2
View File
@@ -195,6 +195,18 @@ const COMPACTION_MAX_SOURCE_CHARS: usize = 12_000;
/// Max characters retained in stored compaction summary.
const COMPACTION_MAX_SUMMARY_CHARS: usize = 2_000;
/// Estimate token count for a message history using ~4 chars/token heuristic.
/// Includes a small overhead per message for role/framing tokens.
fn estimate_history_tokens(history: &[ChatMessage]) -> usize {
    let mut total = 0usize;
    for message in history {
        // ~4 chars per token (rounded up) plus ~4 framing tokens per message
        // (role marker, delimiters).
        total += message.content.len().div_ceil(4) + 4;
    }
    total
}
/// Minimum interval between progress sends to avoid flooding the draft channel.
pub(crate) const PROGRESS_MIN_INTERVAL_MS: u64 = 500;
@@ -288,6 +300,7 @@ async fn auto_compact_history(
provider: &dyn Provider,
model: &str,
max_history: usize,
max_context_tokens: usize,
) -> Result<bool> {
let has_system = history.first().map_or(false, |m| m.role == "system");
let non_system_count = if has_system {
@@ -296,7 +309,10 @@ async fn auto_compact_history(
history.len()
};
if non_system_count <= max_history {
let estimated_tokens = estimate_history_tokens(history);
// Trigger compaction when either token budget OR message count is exceeded.
if estimated_tokens <= max_context_tokens && non_system_count <= max_history {
return Ok(false);
}
@@ -307,7 +323,16 @@ async fn auto_compact_history(
return Ok(false);
}
let compact_end = start + compact_count;
let mut compact_end = start + compact_count;
// Snap compact_end to a user-turn boundary so we don't split mid-conversation.
while compact_end > start && history.get(compact_end).map_or(false, |m| m.role != "user") {
compact_end -= 1;
}
if compact_end <= start {
return Ok(false);
}
let to_compact: Vec<ChatMessage> = history[start..compact_end].to_vec();
let transcript = build_compaction_transcript(&to_compact);
@@ -3508,6 +3533,7 @@ pub async fn run(
provider.as_ref(),
model_name,
config.agent.max_history_messages,
config.agent.max_context_tokens,
)
.await
{
@@ -6449,4 +6475,30 @@ Let me check the result."#;
let result = filter_tool_specs_for_turn(specs, &groups, "BROWSE the site");
assert_eq!(result.len(), 1);
}
// ── Token-based compaction tests ──────────────────────────
#[test]
fn estimate_history_tokens_empty() {
    // An empty history must estimate to zero tokens.
    let history: [ChatMessage; 0] = [];
    assert_eq!(super::estimate_history_tokens(&history), 0);
}
#[test]
fn estimate_history_tokens_single_message() {
    // "hello world" is 11 chars: ceil(11/4) + 4 framing tokens = 3 + 4 = 7.
    let history = [ChatMessage::user("hello world")];
    assert_eq!(super::estimate_history_tokens(&history), 7);
}
#[test]
fn estimate_history_tokens_multiple_messages() {
    // Per-message estimates: 16 chars → 8, 13 chars → 8, 11 chars → 7; sum 23.
    let history = [
        ChatMessage::system("You are helpful."),
        ChatMessage::user("What is Rust?"),
        ChatMessage::assistant("A language."),
    ];
    assert_eq!(super::estimate_history_tokens(&history), 23);
}
}
+90
View File
@@ -31,6 +31,7 @@ pub mod nextcloud_talk;
#[cfg(feature = "channel-nostr")]
pub mod nostr;
pub mod qq;
pub mod session_store;
pub mod signal;
pub mod slack;
pub mod telegram;
@@ -312,6 +313,7 @@ struct ChannelRuntimeContext {
model_routes: Arc<Vec<crate::config::ModelRouteConfig>>,
ack_reactions: bool,
show_tool_calls: bool,
session_store: Option<Arc<session_store::SessionStore>>,
}
#[derive(Clone)]
@@ -965,6 +967,13 @@ fn proactive_trim_turns(turns: &mut Vec<ChatMessage>, budget: usize) -> usize {
}
fn append_sender_turn(ctx: &ChannelRuntimeContext, sender_key: &str, turn: ChatMessage) {
// Persist to JSONL before adding to in-memory history.
if let Some(ref store) = ctx.session_store {
if let Err(e) = store.append(sender_key, &turn) {
tracing::warn!("Failed to persist session turn: {e}");
}
}
let mut histories = ctx
.conversation_histories
.lock()
@@ -2186,6 +2195,29 @@ async fn process_channel_message(
&history_key,
ChatMessage::assistant(&history_response),
);
// Fire-and-forget LLM-driven memory consolidation.
if ctx.auto_save_memory && msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
let provider = Arc::clone(&ctx.provider);
let model = ctx.model.to_string();
let memory = Arc::clone(&ctx.memory);
let user_msg = msg.content.clone();
let assistant_resp = delivered_response.clone();
tokio::spawn(async move {
if let Err(e) = crate::memory::consolidation::consolidate_turn(
provider.as_ref(),
&model,
memory.as_ref(),
&user_msg,
&assistant_resp,
)
.await
{
tracing::debug!("Memory consolidation skipped: {e}");
}
});
}
println!(
" 🤖 Reply ({}ms): {}",
started_at.elapsed().as_millis(),
@@ -3805,8 +3837,42 @@ pub async fn start_channels(config: Config) -> Result<()> {
model_routes: Arc::new(config.model_routes.clone()),
ack_reactions: config.channels_config.ack_reactions,
show_tool_calls: config.channels_config.show_tool_calls,
session_store: if config.channels_config.session_persistence {
match session_store::SessionStore::new(&config.workspace_dir) {
Ok(store) => {
tracing::info!("📂 Session persistence enabled");
Some(Arc::new(store))
}
Err(e) => {
tracing::warn!("Session persistence disabled: {e}");
None
}
}
} else {
None
},
});
// Hydrate in-memory conversation histories from persisted JSONL session files.
if let Some(ref store) = runtime_ctx.session_store {
let mut hydrated = 0usize;
let mut histories = runtime_ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
for key in store.list_sessions() {
let msgs = store.load(&key);
if !msgs.is_empty() {
hydrated += 1;
histories.insert(key, msgs);
}
}
drop(histories);
if hydrated > 0 {
tracing::info!("📂 Restored {hydrated} session(s) from disk");
}
}
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
// Wait for all channel tasks
@@ -4072,6 +4138,7 @@ mod tests {
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
};
assert!(compact_sender_history(&ctx, &sender));
@@ -4175,6 +4242,7 @@ mod tests {
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
};
append_sender_turn(&ctx, &sender, ChatMessage::user("hello"));
@@ -4234,6 +4302,7 @@ mod tests {
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
};
assert!(rollback_orphan_user_turn(&ctx, &sender, "pending"));
@@ -4751,6 +4820,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -4818,6 +4888,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -4899,6 +4970,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -4965,6 +5037,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5041,6 +5114,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5137,6 +5211,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5215,6 +5290,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5308,6 +5384,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5386,6 +5463,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5454,6 +5532,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -5633,6 +5712,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
@@ -5720,6 +5800,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
@@ -5817,6 +5898,7 @@ BTC is currently around $65,000 based on latest tool output."#
},
ack_reactions: true,
show_tool_calls: true,
session_store: None,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
@@ -5921,6 +6003,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
@@ -6002,6 +6085,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -6068,6 +6152,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -6692,6 +6777,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -6784,6 +6870,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -6876,6 +6963,7 @@ BTC is currently around $65,000 based on latest tool output."#
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
@@ -7432,6 +7520,7 @@ This is an example JSON object for profile settings."#;
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
// Simulate a photo attachment message with [IMAGE:] marker.
@@ -7505,6 +7594,7 @@ This is an example JSON object for profile settings."#;
model_routes: Arc::new(Vec::new()),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
});
process_channel_message(
+200
View File
@@ -0,0 +1,200 @@
//! JSONL-based session persistence for channel conversations.
//!
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
//! are loaded from disk to restore conversation context.
use crate::providers::traits::ChatMessage;
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
/// Append-only JSONL session store for channel conversations.
pub struct SessionStore {
    // Root directory ({workspace}/sessions) holding one `<key>.jsonl` per session.
    sessions_dir: PathBuf,
}

impl SessionStore {
    /// Create a new session store, ensuring the sessions directory exists.
    pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
        let sessions_dir = workspace_dir.join("sessions");
        std::fs::create_dir_all(&sessions_dir)?;
        Ok(Self { sessions_dir })
    }

    /// Compute the file path for a session key, sanitizing for filesystem safety.
    ///
    /// Every character that is not alphanumeric, `_`, or `-` becomes `_`,
    /// so a key can never escape the sessions directory. Note: distinct keys
    /// that differ only in special characters map to the same file.
    fn session_path(&self, session_key: &str) -> PathBuf {
        let safe_key: String = session_key
            .chars()
            .map(|c| match c {
                c if c.is_alphanumeric() => c,
                '_' | '-' => c,
                _ => '_',
            })
            .collect();
        self.sessions_dir.join(format!("{safe_key}.jsonl"))
    }

    /// Load all messages for a session from its JSONL file.
    /// Returns an empty vec if the file does not exist or is unreadable.
    pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
        let Ok(file) = std::fs::File::open(self.session_path(session_key)) else {
            return Vec::new();
        };
        // Skip unreadable, blank, and unparsable lines; keep everything else.
        std::io::BufReader::new(file)
            .lines()
            .filter_map(Result::ok)
            .filter_map(|line| {
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    None
                } else {
                    serde_json::from_str::<ChatMessage>(trimmed).ok()
                }
            })
            .collect()
    }

    /// Append a single message to the session JSONL file.
    pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
        // Open (creating if absent) in append mode — old lines are never rewritten.
        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(self.session_path(session_key))?;
        let encoded = serde_json::to_string(message)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        writeln!(file, "{encoded}")
    }

    /// List all session keys that have files on disk.
    pub fn list_sessions(&self) -> Vec<String> {
        let Ok(entries) = std::fs::read_dir(&self.sessions_dir) else {
            return Vec::new();
        };
        let mut keys = Vec::new();
        for entry in entries.flatten() {
            // Only `*.jsonl` files count; the stem is the session key.
            if let Ok(name) = entry.file_name().into_string() {
                if let Some(key) = name.strip_suffix(".jsonl") {
                    keys.push(key.to_string());
                }
            }
        }
        keys
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn round_trip_append_and_load() {
        // Messages appended in order come back in order with role/content intact.
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();
        let key = "telegram_user123";

        store.append(key, &ChatMessage::user("hello")).unwrap();
        store.append(key, &ChatMessage::assistant("hi there")).unwrap();

        let loaded = store.load(key);
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].role, "user");
        assert_eq!(loaded[0].content, "hello");
        assert_eq!(loaded[1].role, "assistant");
        assert_eq!(loaded[1].content, "hi there");
    }

    #[test]
    fn load_nonexistent_session_returns_empty() {
        // Loading a key with no backing file yields an empty history, not an error.
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();
        assert!(store.load("nonexistent").is_empty());
    }

    #[test]
    fn key_sanitization() {
        // Keys with special chars should be sanitized
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();
        let key = "slack/thread:123/user";

        store.append(key, &ChatMessage::user("test")).unwrap();
        assert_eq!(store.load(key).len(), 1);
    }

    #[test]
    fn list_sessions_returns_keys() {
        // Each appended key shows up (suffix-stripped) in the session listing.
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();

        store.append("telegram_alice", &ChatMessage::user("hi")).unwrap();
        store.append("discord_bob", &ChatMessage::user("hey")).unwrap();

        let mut sessions = store.list_sessions();
        sessions.sort();
        assert_eq!(sessions.len(), 2);
        assert!(sessions.contains(&"discord_bob".to_string()));
        assert!(sessions.contains(&"telegram_alice".to_string()));
    }

    #[test]
    fn append_is_truly_append_only() {
        // Two appends produce exactly two raw JSONL lines on disk.
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();
        let key = "test_session";

        store.append(key, &ChatMessage::user("msg1")).unwrap();
        store.append(key, &ChatMessage::user("msg2")).unwrap();

        let raw = std::fs::read_to_string(store.session_path(key)).unwrap();
        assert_eq!(raw.trim().lines().count(), 2);
    }

    #[test]
    fn handles_corrupt_lines_gracefully() {
        // A malformed line is skipped; valid lines around it still load.
        let tmp = TempDir::new().unwrap();
        let store = SessionStore::new(tmp.path()).unwrap();
        let key = "corrupt_test";

        let path = store.session_path(key);
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        let mut file = std::fs::File::create(&path).unwrap();
        writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
        writeln!(file, "this is not valid json").unwrap();
        writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();

        let loaded = store.load(key);
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].content, "hello");
        assert_eq!(loaded[1].content, "world");
    }
}
+33 -1
View File
@@ -791,6 +791,11 @@ pub struct AgentConfig {
/// Maximum conversation history messages retained per session. Default: `50`.
#[serde(default = "default_agent_max_history_messages")]
pub max_history_messages: usize,
/// Maximum estimated tokens for conversation history before compaction triggers.
/// Uses ~4 chars/token heuristic. When this threshold is exceeded, older messages
/// are summarized to preserve context while staying within budget. Default: `32000`.
#[serde(default = "default_agent_max_context_tokens")]
pub max_context_tokens: usize,
/// Enable parallel tool execution within a single iteration. Default: `false`.
#[serde(default)]
pub parallel_tools: bool,
@@ -817,6 +822,10 @@ fn default_agent_max_history_messages() -> usize {
50
}
/// Serde default for `agent.max_context_tokens` (~32K estimated tokens).
fn default_agent_max_context_tokens() -> usize {
    const DEFAULT_MAX_CONTEXT_TOKENS: usize = 32_000;
    DEFAULT_MAX_CONTEXT_TOKENS
}
fn default_agent_tool_dispatcher() -> String {
"auto".into()
}
@@ -827,6 +836,7 @@ impl Default for AgentConfig {
compact_context: false,
max_tool_iterations: default_agent_max_tool_iterations(),
max_history_messages: default_agent_max_history_messages(),
max_context_tokens: default_agent_max_context_tokens(),
parallel_tools: false,
tool_dispatcher: default_agent_tool_dispatcher(),
tool_call_dedup_exempt: Vec::new(),
@@ -2895,22 +2905,34 @@ pub struct HeartbeatConfig {
pub enabled: bool,
/// Interval in minutes between heartbeat pings. Default: `30`.
pub interval_minutes: u32,
/// Enable two-phase heartbeat: Phase 1 asks LLM whether to run, Phase 2
/// executes only when the LLM decides there is work to do. Saves API cost
/// during quiet periods. Default: `true`.
#[serde(default = "default_two_phase")]
pub two_phase: bool,
/// Optional fallback task text when `HEARTBEAT.md` has no task entries.
#[serde(default)]
pub message: Option<String>,
/// Optional delivery channel for heartbeat output (for example: `telegram`).
/// When omitted, auto-selects the first configured channel.
#[serde(default, alias = "channel")]
pub target: Option<String>,
/// Optional delivery recipient/chat identifier (required when `target` is set).
/// Optional delivery recipient/chat identifier (required when `target` is
/// explicitly set).
#[serde(default, alias = "recipient")]
pub to: Option<String>,
}
/// Serde default for `heartbeat.two_phase`: two-phase execution is on by default.
fn default_two_phase() -> bool {
    true
}
impl Default for HeartbeatConfig {
fn default() -> Self {
Self {
enabled: false,
interval_minutes: 30,
two_phase: true,
message: None,
target: None,
to: None,
@@ -3040,6 +3062,7 @@ impl<T: ChannelConfig> crate::config::traits::ConfigHandle for ConfigWrapper<T>
///
/// Each channel sub-section (e.g. `telegram`, `discord`) is optional;
/// setting it to `Some(...)` enables that channel.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChannelsConfig {
/// Enable the CLI interactive channel. Default: `true`.
@@ -3102,6 +3125,10 @@ pub struct ChannelsConfig {
/// not forwarded as individual channel messages. Default: `true`.
#[serde(default = "default_true")]
pub show_tool_calls: bool,
/// Persist channel conversation history to JSONL files so sessions survive
/// daemon restarts. Files are stored in `{workspace}/sessions/`. Default: `true`.
#[serde(default = "default_true")]
pub session_persistence: bool,
}
impl ChannelsConfig {
@@ -3236,6 +3263,7 @@ impl Default for ChannelsConfig {
message_timeout_secs: default_channel_message_timeout_secs(),
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
}
}
}
@@ -6217,6 +6245,7 @@ default_temperature = 0.7
heartbeat: HeartbeatConfig {
enabled: true,
interval_minutes: 15,
two_phase: true,
message: Some("Check London time".into()),
target: Some("telegram".into()),
to: Some("123456".into()),
@@ -6256,6 +6285,7 @@ default_temperature = 0.7
message_timeout_secs: 300,
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
},
memory: MemoryConfig::default(),
storage: StorageConfig::default(),
@@ -6970,6 +7000,7 @@ allowed_users = ["@ops:matrix.org"]
message_timeout_secs: 300,
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
};
let toml_str = toml::to_string_pretty(&c).unwrap();
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
@@ -7197,6 +7228,7 @@ channel_id = "C123"
message_timeout_secs: 300,
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
};
let toml_str = toml::to_string_pretty(&c).unwrap();
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
+140 -58
View File
@@ -203,14 +203,17 @@ where
}
async fn run_heartbeat_worker(config: Config) -> Result<()> {
use crate::heartbeat::engine::HeartbeatEngine;
let observer: std::sync::Arc<dyn crate::observability::Observer> =
std::sync::Arc::from(crate::observability::create_observer(&config.observability));
let engine = crate::heartbeat::engine::HeartbeatEngine::new(
let engine = HeartbeatEngine::new(
config.heartbeat.clone(),
config.workspace_dir.clone(),
observer,
);
let delivery = heartbeat_delivery_target(&config)?;
let delivery = resolve_heartbeat_delivery(&config)?;
let two_phase = config.heartbeat.two_phase;
let interval_mins = config.heartbeat.interval_minutes.max(5);
let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60));
@@ -218,14 +221,71 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
loop {
interval.tick().await;
let file_tasks = engine.collect_tasks().await?;
let tasks = heartbeat_tasks_for_tick(file_tasks, config.heartbeat.message.as_deref());
// Collect runnable tasks (active only, sorted by priority)
let mut tasks = engine.collect_runnable_tasks().await?;
if tasks.is_empty() {
continue;
// Try fallback message
if let Some(fallback) = config
.heartbeat
.message
.as_deref()
.map(str::trim)
.filter(|m| !m.is_empty())
{
tasks.push(crate::heartbeat::engine::HeartbeatTask {
text: fallback.to_string(),
priority: crate::heartbeat::engine::TaskPriority::Medium,
status: crate::heartbeat::engine::TaskStatus::Active,
});
} else {
continue;
}
}
for task in tasks {
let prompt = format!("[Heartbeat Task] {task}");
// ── Phase 1: LLM decision (two-phase mode) ──────────────
let tasks_to_run = if two_phase {
let decision_prompt = HeartbeatEngine::build_decision_prompt(&tasks);
match crate::agent::run(
config.clone(),
Some(decision_prompt),
None,
None,
0.0, // Low temperature for deterministic decision
vec![],
false,
None,
)
.await
{
Ok(response) => {
let indices = HeartbeatEngine::parse_decision_response(&response, tasks.len());
if indices.is_empty() {
tracing::info!("💓 Heartbeat Phase 1: skip (nothing to do)");
crate::health::mark_component_ok("heartbeat");
continue;
}
tracing::info!(
"💓 Heartbeat Phase 1: run {} of {} tasks",
indices.len(),
tasks.len()
);
indices
.into_iter()
.filter_map(|i| tasks.get(i).cloned())
.collect()
}
Err(e) => {
tracing::warn!("💓 Heartbeat Phase 1 failed, running all tasks: {e}");
tasks
}
}
} else {
tasks
};
// ── Phase 2: Execute selected tasks ─────────────────────
for task in &tasks_to_run {
let prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
let temp = config.default_temperature;
match crate::agent::run(
config.clone(),
@@ -242,7 +302,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
Ok(output) => {
crate::health::mark_component_ok("heartbeat");
let announcement = if output.trim().is_empty() {
"heartbeat task executed".to_string()
format!("💓 heartbeat task completed: {}", task.text)
} else {
output
};
@@ -272,22 +332,8 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
}
}
fn heartbeat_tasks_for_tick(
file_tasks: Vec<String>,
fallback_message: Option<&str>,
) -> Vec<String> {
if !file_tasks.is_empty() {
return file_tasks;
}
fallback_message
.map(str::trim)
.filter(|message| !message.is_empty())
.map(|message| vec![message.to_string()])
.unwrap_or_default()
}
fn heartbeat_delivery_target(config: &Config) -> Result<Option<(String, String)>> {
/// Resolve delivery target: explicit config > auto-detect first configured channel.
fn resolve_heartbeat_delivery(config: &Config) -> Result<Option<(String, String)>> {
let channel = config
.heartbeat
.target
@@ -302,16 +348,45 @@ fn heartbeat_delivery_target(config: &Config) -> Result<Option<(String, String)>
.filter(|value| !value.is_empty());
match (channel, target) {
(None, None) => Ok(None),
(Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
(None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
// Both explicitly set — validate and use.
(Some(channel), Some(target)) => {
validate_heartbeat_channel_config(config, channel)?;
Ok(Some((channel.to_string(), target.to_string())))
}
// Only one set — error.
(Some(_), None) => anyhow::bail!("heartbeat.to is required when heartbeat.target is set"),
(None, Some(_)) => anyhow::bail!("heartbeat.target is required when heartbeat.to is set"),
// Neither set — try auto-detect the first configured channel.
(None, None) => Ok(auto_detect_heartbeat_channel(config)),
}
}
/// Auto-detect the best channel for heartbeat delivery by checking which
/// channels are configured. Returns the first match in priority order.
fn auto_detect_heartbeat_channel(config: &Config) -> Option<(String, String)> {
    // Priority order: telegram > discord > slack > mattermost.
    //
    // Only Telegram is auto-detectable today: its first `allowed_users` entry
    // doubles as a delivery target. Discord, Slack, and Mattermost have no
    // equivalent default recipient, so they require an explicit
    // `heartbeat.target` / `heartbeat.to` pair and never match here.
    config
        .channels_config
        .telegram
        .as_ref()
        .and_then(|tg| tg.allowed_users.first())
        .filter(|user| !user.is_empty())
        .map(|user| ("telegram".to_string(), user.clone()))
}
fn validate_heartbeat_channel_config(config: &Config, channel: &str) -> Result<()> {
match channel.to_ascii_lowercase().as_str() {
"telegram" => {
@@ -487,75 +562,56 @@ mod tests {
}
#[test]
fn heartbeat_tasks_use_file_tasks_when_available() {
let tasks =
heartbeat_tasks_for_tick(vec!["From file".to_string()], Some("Fallback from config"));
assert_eq!(tasks, vec!["From file".to_string()]);
}
#[test]
fn heartbeat_tasks_fall_back_to_config_message() {
let tasks = heartbeat_tasks_for_tick(vec![], Some(" check london time "));
assert_eq!(tasks, vec!["check london time".to_string()]);
}
#[test]
fn heartbeat_tasks_ignore_empty_fallback_message() {
let tasks = heartbeat_tasks_for_tick(vec![], Some(" "));
assert!(tasks.is_empty());
}
#[test]
fn heartbeat_delivery_target_none_when_unset() {
fn resolve_delivery_none_when_unset() {
let config = Config::default();
let target = heartbeat_delivery_target(&config).unwrap();
let target = resolve_heartbeat_delivery(&config).unwrap();
assert!(target.is_none());
}
#[test]
fn heartbeat_delivery_target_requires_to_field() {
fn resolve_delivery_requires_to_field() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
let err = heartbeat_delivery_target(&config).unwrap_err();
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("heartbeat.to is required when heartbeat.target is set"));
}
#[test]
fn heartbeat_delivery_target_requires_target_field() {
fn resolve_delivery_requires_target_field() {
let mut config = Config::default();
config.heartbeat.to = Some("123456".into());
let err = heartbeat_delivery_target(&config).unwrap_err();
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("heartbeat.target is required when heartbeat.to is set"));
}
#[test]
fn heartbeat_delivery_target_rejects_unsupported_channel() {
fn resolve_delivery_rejects_unsupported_channel() {
let mut config = Config::default();
config.heartbeat.target = Some("email".into());
config.heartbeat.to = Some("ops@example.com".into());
let err = heartbeat_delivery_target(&config).unwrap_err();
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("unsupported heartbeat.target channel"));
}
#[test]
fn heartbeat_delivery_target_requires_channel_configuration() {
fn resolve_delivery_requires_channel_configuration() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
config.heartbeat.to = Some("123456".into());
let err = heartbeat_delivery_target(&config).unwrap_err();
let err = resolve_heartbeat_delivery(&config).unwrap_err();
assert!(err
.to_string()
.contains("channels_config.telegram is not configured"));
}
#[test]
fn heartbeat_delivery_target_accepts_telegram_configuration() {
fn resolve_delivery_accepts_telegram_configuration() {
let mut config = Config::default();
config.heartbeat.target = Some("telegram".into());
config.heartbeat.to = Some("123456".into());
@@ -568,7 +624,33 @@ mod tests {
mention_only: false,
});
let target = heartbeat_delivery_target(&config).unwrap();
let target = resolve_heartbeat_delivery(&config).unwrap();
assert_eq!(target, Some(("telegram".to_string(), "123456".to_string())));
}
// Telegram auto-detection uses the first allowed user as the target when
// no explicit heartbeat.target/to pair is configured.
#[test]
fn auto_detect_telegram_when_configured() {
    let mut config = Config::default();
    config.channels_config.telegram = Some(crate::config::TelegramConfig {
        bot_token: "bot-token".into(),
        allowed_users: vec!["user123".into()],
        stream_mode: crate::config::StreamMode::default(),
        draft_update_interval_ms: 1000,
        interrupt_on_new_message: false,
        mention_only: false,
    });
    let target = resolve_heartbeat_delivery(&config).unwrap();
    assert_eq!(
        target,
        Some(("telegram".to_string(), "user123".to_string()))
    );
}

// With no channels configured at all, auto-detection yields None.
#[test]
fn auto_detect_none_when_no_channels() {
    let config = Config::default();
    let target = auto_detect_heartbeat_channel(&config);
    assert!(target.is_none());
}
}
+399 -27
View File
@@ -1,11 +1,75 @@
use crate::config::HeartbeatConfig;
use crate::observability::{Observer, ObserverEvent};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::Path;
use std::sync::Arc;
use tokio::time::{self, Duration};
use tracing::{info, warn};
// ── Structured task types ────────────────────────────────────────
/// Priority level for a heartbeat task.
///
/// Variant order matters: the derived `Ord` ranks `Low < Medium < High`,
/// which the runnable-task sort relies on to run high-priority tasks first.
/// Serialized in lowercase (`"low"`, `"medium"`, `"high"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskPriority {
    /// Lowest urgency; sorted after all other tasks.
    Low,
    /// Default when a task carries no explicit priority tag.
    Medium,
    /// Highest urgency; sorted to the front of each tick.
    High,
}
impl fmt::Display for TaskPriority {
    /// Render the priority as its lowercase metadata keyword,
    /// matching the `#[serde(rename_all = "lowercase")]` wire form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Low => "low",
            Self::Medium => "medium",
            Self::High => "high",
        };
        f.write_str(label)
    }
}
/// Status of a heartbeat task.
///
/// Only `Active` tasks are executed; `Paused` and `Completed` tasks stay in
/// HEARTBEAT.md for bookkeeping but are filtered out by `is_runnable`.
/// Serialized in lowercase (`"active"`, `"paused"`, `"completed"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
    /// Eligible to run on the next heartbeat tick.
    Active,
    /// Temporarily disabled; skipped until reactivated.
    Paused,
    /// Finished; retained only as a record.
    Completed,
}
impl fmt::Display for TaskStatus {
    /// Render the status as its lowercase metadata keyword,
    /// matching the `#[serde(rename_all = "lowercase")]` wire form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Active => "active",
            Self::Paused => "paused",
            Self::Completed => "completed",
        };
        f.write_str(label)
    }
}
/// A structured heartbeat task with priority and status metadata.
///
/// Parsed from a `- [priority|status] text` bullet in HEARTBEAT.md; bullets
/// without bracket metadata default to medium priority and active status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatTask {
    /// The task description (bracket metadata stripped).
    pub text: String,
    /// Scheduling priority; higher sorts earlier within a tick.
    pub priority: TaskPriority,
    /// Lifecycle state; only `Active` tasks execute.
    pub status: TaskStatus,
}
impl HeartbeatTask {
    /// Whether this task should execute on the current tick: only tasks in
    /// the `Active` state run; paused and completed ones are filtered out.
    pub fn is_runnable(&self) -> bool {
        matches!(self.status, TaskStatus::Active)
    }
}
impl fmt::Display for HeartbeatTask {
    /// Render as `[priority] text` — the same shape used in HEARTBEAT.md
    /// metadata and in the Phase 1 decision prompt.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{}] ", self.priority)?;
        f.write_str(&self.text)
    }
}
// ── Engine ───────────────────────────────────────────────────────
/// Heartbeat engine — reads HEARTBEAT.md and executes tasks periodically
pub struct HeartbeatEngine {
config: HeartbeatConfig,
@@ -64,8 +128,8 @@ impl HeartbeatEngine {
Ok(self.collect_tasks().await?.len())
}
/// Read HEARTBEAT.md and return all parsed tasks.
pub async fn collect_tasks(&self) -> Result<Vec<String>> {
/// Read HEARTBEAT.md and return all parsed structured tasks.
pub async fn collect_tasks(&self) -> Result<Vec<HeartbeatTask>> {
let heartbeat_path = self.workspace_dir.join("HEARTBEAT.md");
if !heartbeat_path.exists() {
return Ok(Vec::new());
@@ -74,13 +138,145 @@ impl HeartbeatEngine {
Ok(Self::parse_tasks(&content))
}
/// Parse tasks from HEARTBEAT.md (lines starting with `- `)
fn parse_tasks(content: &str) -> Vec<String> {
/// Collect only runnable (active) tasks, sorted by priority (high first).
///
/// Filters out paused/completed tasks, then orders the remainder so that
/// high-priority tasks run before medium and low ones. The sort is stable,
/// so tasks with equal priority keep their file order.
pub async fn collect_runnable_tasks(&self) -> Result<Vec<HeartbeatTask>> {
    let mut runnable: Vec<HeartbeatTask> = self
        .collect_tasks()
        .await?
        .into_iter()
        .filter(HeartbeatTask::is_runnable)
        .collect();
    // Descending priority: High > Medium > Low.
    runnable.sort_by_key(|task| std::cmp::Reverse(task.priority));
    Ok(runnable)
}
/// Parse tasks from HEARTBEAT.md with structured metadata support.
///
/// Supports both legacy flat format and new structured format:
///
/// Legacy:
/// `- Check email` → medium priority, active status
///
/// Structured:
/// `- [high] Check email` → high priority, active
/// `- [low|paused] Review old PRs` → low priority, paused
/// `- [completed] Old task` → medium priority, completed
fn parse_tasks(content: &str) -> Vec<HeartbeatTask> {
    content
        .lines()
        .filter_map(|line| {
            let trimmed = line.trim();
            // Only `- ` bullet lines are tasks; headings, prose, and bare
            // dashes are ignored. (A stray leftover statement from the old
            // string-based parser was removed here — it was dead residue
            // that broke compilation.)
            let text = trimmed.strip_prefix("- ")?;
            if text.is_empty() {
                return None;
            }
            Some(Self::parse_task_line(text))
        })
        .collect()
}
/// Parse a single task line into a structured `HeartbeatTask`.
///
/// Format: `[priority|status] task text` or just `task text`.
///
/// A leading bracket is treated as metadata ONLY when it contains at least
/// one recognized tag. Previously, any bracketed prefix was stripped, so a
/// legacy task such as `[2026-03-15] Standup` silently lost its prefix;
/// unrecognized brackets are now preserved as part of the task text.
fn parse_task_line(text: &str) -> HeartbeatTask {
    if let Some(rest) = text.strip_prefix('[') {
        if let Some((meta, task_text)) = rest.split_once(']') {
            let task_text = task_text.trim();
            // Tag list mirrors the keywords understood by `parse_meta`.
            let recognized = meta.split('|').any(|part| {
                matches!(
                    part.trim().to_ascii_lowercase().as_str(),
                    "high"
                        | "medium"
                        | "med"
                        | "low"
                        | "active"
                        | "paused"
                        | "pause"
                        | "completed"
                        | "complete"
                        | "done"
                )
            });
            if recognized && !task_text.is_empty() {
                let (priority, status) = Self::parse_meta(meta);
                return HeartbeatTask {
                    text: task_text.to_string(),
                    priority,
                    status,
                };
            }
        }
    }
    // No (recognized) metadata — default to medium/active.
    HeartbeatTask {
        text: text.to_string(),
        priority: TaskPriority::Medium,
        status: TaskStatus::Active,
    }
}
/// Interpret bracket metadata tags such as `high`, `low|paused`, `completed`.
///
/// Unrecognized tags are ignored; omitted fields keep their defaults
/// (`medium` priority, `active` status). Matching is case-insensitive.
fn parse_meta(meta: &str) -> (TaskPriority, TaskStatus) {
    meta.split('|').fold(
        (TaskPriority::Medium, TaskStatus::Active),
        |(priority, status), part| match part.trim().to_ascii_lowercase().as_str() {
            "high" => (TaskPriority::High, status),
            "medium" | "med" => (TaskPriority::Medium, status),
            "low" => (TaskPriority::Low, status),
            "active" => (priority, TaskStatus::Active),
            "paused" | "pause" => (priority, TaskStatus::Paused),
            "completed" | "complete" | "done" => (priority, TaskStatus::Completed),
            _ => (priority, status),
        },
    )
}
/// Build the Phase 1 LLM decision prompt for two-phase heartbeat.
///
/// Lists every candidate task as `N. [priority] text` (1-based — the same
/// numbering `parse_decision_response` expects back) and instructs the model
/// to answer with either `run: <numbers>` or `skip`.
pub fn build_decision_prompt(tasks: &[HeartbeatTask]) -> String {
    // Hoisted out of the loop; the import is purely lexical either way.
    use std::fmt::Write;

    let mut prompt = String::from(
        "You are a heartbeat scheduler. Review the following periodic tasks and decide \
         whether any should be executed right now.\n\n\
         Consider:\n\
         - Task priority (high tasks are more urgent)\n\
         - Whether the task is time-sensitive or can wait\n\
         - Whether running the task now would provide value\n\n\
         Tasks:\n",
    );
    for (i, task) in tasks.iter().enumerate() {
        // writeln! into a String cannot fail; result intentionally ignored.
        let _ = writeln!(prompt, "{}. [{}] {}", i + 1, task.priority, task.text);
    }
    prompt.push_str(
        // Fixed: the last sentence previously read "Be conservative skip if…"
        // (missing punctuation), which garbled the instruction to the model.
        "\nRespond with ONLY one of:\n\
         - `run: 1,2,3` (comma-separated task numbers to execute)\n\
         - `skip` (nothing needs to run right now)\n\n\
         Be conservative — skip if tasks are routine and not time-sensitive.",
    );
    prompt
}
/// Parse the Phase 1 LLM decision response.
///
/// Accepts `skip` (optionally followed by a reason), `run: 1,2,3`,
/// `run 1,2,3`, or bare comma-separated numbers. Numbers are 1-based in the
/// prompt and converted to 0-based indices here; out-of-range values are
/// dropped. Duplicate numbers are deduplicated (order preserved) so a
/// repeated number cannot execute the same task twice.
///
/// Returns indices of tasks to run, or an empty vec if skipped.
pub fn parse_decision_response(response: &str, task_count: usize) -> Vec<usize> {
    let trimmed = response.trim().to_ascii_lowercase();
    // Any response beginning with "skip" means nothing should run.
    // (The old `trimmed == "skip"` check was redundant with starts_with.)
    if trimmed.starts_with("skip") {
        return Vec::new();
    }
    // Strip a "run:" / "run " prefix if present; otherwise try bare numbers.
    let numbers_part = trimmed
        .strip_prefix("run:")
        .or_else(|| trimmed.strip_prefix("run "))
        .map_or(trimmed.as_str(), str::trim);
    let mut indices = Vec::new();
    for part in numbers_part.split(',') {
        let Ok(n) = part.trim().parse::<usize>() else {
            continue; // non-numeric fragments (e.g. prose) are ignored
        };
        // Validate 1-based range, convert to 0-based.
        if n >= 1 && n <= task_count {
            let idx = n - 1;
            if !indices.contains(&idx) {
                indices.push(idx);
            }
        }
    }
    indices
}
@@ -93,10 +289,14 @@ impl HeartbeatEngine {
# Add tasks below (one per line, starting with `- `)\n\
# The agent will check this file on each heartbeat tick.\n\
#\n\
# Format: - [priority|status] Task description\n\
# priority: high, medium (default), low\n\
# status: active (default), paused, completed\n\
#\n\
# Examples:\n\
# - Check my email for important messages\n\
# - [high] Check my email for important messages\n\
# - Review my calendar for upcoming events\n\
# - Check the weather forecast\n";
# - [low|paused] Check the weather forecast\n";
tokio::fs::write(&path, default).await?;
}
Ok(())
@@ -112,9 +312,9 @@ mod tests {
let content = "# Tasks\n\n- Check email\n- Review calendar\nNot a task\n- Third task";
let tasks = HeartbeatEngine::parse_tasks(content);
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0], "Check email");
assert_eq!(tasks[1], "Review calendar");
assert_eq!(tasks[2], "Third task");
assert_eq!(tasks[0].text, "Check email");
assert_eq!(tasks[0].priority, TaskPriority::Medium);
assert_eq!(tasks[0].status, TaskStatus::Active);
}
#[test]
@@ -133,26 +333,21 @@ mod tests {
let content = " - Indented task\n\t- Tab indented";
let tasks = HeartbeatEngine::parse_tasks(content);
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0], "Indented task");
assert_eq!(tasks[1], "Tab indented");
assert_eq!(tasks[0].text, "Indented task");
assert_eq!(tasks[1].text, "Tab indented");
}
#[test]
fn parse_tasks_dash_without_space_ignored() {
let content = "- Real task\n-\n- Another";
let tasks = HeartbeatEngine::parse_tasks(content);
// "-" trimmed = "-", does NOT start with "- " => skipped
// "- Real task" => "Real task"
// "- Another" => "Another"
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0], "Real task");
assert_eq!(tasks[1], "Another");
assert_eq!(tasks[0].text, "Real task");
assert_eq!(tasks[1].text, "Another");
}
#[test]
fn parse_tasks_trailing_space_bullet_trimmed_to_dash() {
// "- " trimmed becomes "-" (trim removes trailing space)
// "-" does NOT start with "- " => skipped
let content = "- ";
let tasks = HeartbeatEngine::parse_tasks(content);
assert_eq!(tasks.len(), 0);
@@ -160,11 +355,10 @@ mod tests {
#[test]
fn parse_tasks_bullet_with_content_after_spaces() {
// "- hello " trimmed becomes "- hello" => starts_with "- " => "hello"
let content = "- hello ";
let tasks = HeartbeatEngine::parse_tasks(content);
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0], "hello");
assert_eq!(tasks[0].text, "hello");
}
#[test]
@@ -172,8 +366,8 @@ mod tests {
let content = "- Check email 📧\n- Review calendar 📅\n- 日本語タスク";
let tasks = HeartbeatEngine::parse_tasks(content);
assert_eq!(tasks.len(), 3);
assert!(tasks[0].contains("📧"));
assert!(tasks[2].contains("日本語"));
assert!(tasks[0].text.contains('📧'));
assert!(tasks[2].text.contains("日本語"));
}
#[test]
@@ -181,15 +375,15 @@ mod tests {
let content = "# Periodic Tasks\n\n## Quick\n- Task A\n\n## Long\n- Task B\n\n* Not a dash bullet\n1. Not numbered";
let tasks = HeartbeatEngine::parse_tasks(content);
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0], "Task A");
assert_eq!(tasks[1], "Task B");
assert_eq!(tasks[0].text, "Task A");
assert_eq!(tasks[1].text, "Task B");
}
#[test]
fn parse_tasks_single_task() {
let tasks = HeartbeatEngine::parse_tasks("- Only one");
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0], "Only one");
assert_eq!(tasks[0].text, "Only one");
}
#[test]
@@ -201,9 +395,153 @@ mod tests {
});
let tasks = HeartbeatEngine::parse_tasks(&content);
assert_eq!(tasks.len(), 100);
assert_eq!(tasks[99], "Task 99");
assert_eq!(tasks[99].text, "Task 99");
}
// ── Structured task parsing tests ────────────────────────────

// `[high]` sets priority; status stays at the Active default.
#[test]
fn parse_task_with_high_priority() {
    let content = "- [high] Urgent email check";
    let tasks = HeartbeatEngine::parse_tasks(content);
    assert_eq!(tasks.len(), 1);
    assert_eq!(tasks[0].text, "Urgent email check");
    assert_eq!(tasks[0].priority, TaskPriority::High);
    assert_eq!(tasks[0].status, TaskStatus::Active);
}

// `priority|status` combined metadata parses both fields.
#[test]
fn parse_task_with_low_paused() {
    let content = "- [low|paused] Review old PRs";
    let tasks = HeartbeatEngine::parse_tasks(content);
    assert_eq!(tasks.len(), 1);
    assert_eq!(tasks[0].text, "Review old PRs");
    assert_eq!(tasks[0].priority, TaskPriority::Low);
    assert_eq!(tasks[0].status, TaskStatus::Paused);
}

// Status-only metadata leaves priority at the Medium default.
#[test]
fn parse_task_completed() {
    let content = "- [completed] Old task";
    let tasks = HeartbeatEngine::parse_tasks(content);
    assert_eq!(tasks.len(), 1);
    assert_eq!(tasks[0].priority, TaskPriority::Medium);
    assert_eq!(tasks[0].status, TaskStatus::Completed);
}

// Legacy bullets without brackets default to medium/active.
#[test]
fn parse_task_without_metadata_defaults() {
    let content = "- Plain task";
    let tasks = HeartbeatEngine::parse_tasks(content);
    assert_eq!(tasks.len(), 1);
    assert_eq!(tasks[0].text, "Plain task");
    assert_eq!(tasks[0].priority, TaskPriority::Medium);
    assert_eq!(tasks[0].status, TaskStatus::Active);
}

// Structured and legacy lines can coexist in one file.
#[test]
fn parse_mixed_structured_and_legacy() {
    let content = "- [high] Urgent\n- Normal task\n- [low|paused] Later";
    let tasks = HeartbeatEngine::parse_tasks(content);
    assert_eq!(tasks.len(), 3);
    assert_eq!(tasks[0].priority, TaskPriority::High);
    assert_eq!(tasks[1].priority, TaskPriority::Medium);
    assert_eq!(tasks[2].priority, TaskPriority::Low);
    assert_eq!(tasks[2].status, TaskStatus::Paused);
}

// Only Active tasks survive the is_runnable filter.
#[test]
fn runnable_filters_paused_and_completed() {
    let content = "- [high] Active\n- [low|paused] Paused\n- [completed] Done";
    let tasks = HeartbeatEngine::parse_tasks(content);
    let runnable: Vec<_> = tasks
        .into_iter()
        .filter(HeartbeatTask::is_runnable)
        .collect();
    assert_eq!(runnable.len(), 1);
    assert_eq!(runnable[0].text, "Active");
}
// ── Two-phase decision tests ────────────────────────────────

// The Phase 1 prompt must enumerate every task (1-based, with priority)
// and spell out both valid answer shapes.
#[test]
fn decision_prompt_includes_all_tasks() {
    let tasks = vec![
        HeartbeatTask {
            text: "Check email".into(),
            priority: TaskPriority::High,
            status: TaskStatus::Active,
        },
        HeartbeatTask {
            text: "Review calendar".into(),
            priority: TaskPriority::Medium,
            status: TaskStatus::Active,
        },
    ];
    let prompt = HeartbeatEngine::build_decision_prompt(&tasks);
    assert!(prompt.contains("1. [high] Check email"));
    assert!(prompt.contains("2. [medium] Review calendar"));
    assert!(prompt.contains("skip"));
    assert!(prompt.contains("run:"));
}

// A bare "skip" answer selects nothing.
#[test]
fn parse_decision_skip() {
    let indices = HeartbeatEngine::parse_decision_response("skip", 3);
    assert!(indices.is_empty());
}

// "skip" followed by explanation text still counts as a skip.
#[test]
fn parse_decision_skip_with_reason() {
    let indices =
        HeartbeatEngine::parse_decision_response("skip — nothing urgent right now", 3);
    assert!(indices.is_empty());
}

// "run: N" yields the zero-based index N-1.
#[test]
fn parse_decision_run_single() {
    let indices = HeartbeatEngine::parse_decision_response("run: 1", 3);
    assert_eq!(indices, vec![0]);
}

// Comma-separated numbers map to multiple indices.
#[test]
fn parse_decision_run_multiple() {
    let indices = HeartbeatEngine::parse_decision_response("run: 1, 3", 3);
    assert_eq!(indices, vec![0, 2]);
}

// Numbers above the task count are silently dropped.
#[test]
fn parse_decision_run_out_of_range_ignored() {
    let indices = HeartbeatEngine::parse_decision_response("run: 1, 5, 2", 3);
    assert_eq!(indices, vec![0, 1]);
}

// Zero is not a valid 1-based task number.
#[test]
fn parse_decision_run_zero_ignored() {
    let indices = HeartbeatEngine::parse_decision_response("run: 0, 1", 3);
    assert_eq!(indices, vec![0]);
}

// ── Task display ────────────────────────────────────────────

// Display renders the same `[priority] text` form used in prompts.
#[test]
fn task_display_format() {
    let task = HeartbeatTask {
        text: "Check email".into(),
        priority: TaskPriority::High,
        status: TaskStatus::Active,
    };
    assert_eq!(format!("{task}"), "[high] Check email");
}

// Derived Ord must rank High above Medium above Low.
#[test]
fn priority_ordering() {
    assert!(TaskPriority::High > TaskPriority::Medium);
    assert!(TaskPriority::Medium > TaskPriority::Low);
}
// ── Async tests ─────────────────────────────────────────────
#[tokio::test]
async fn ensure_heartbeat_file_creates_file() {
let dir = std::env::temp_dir().join("zeroclaw_test_heartbeat");
@@ -216,6 +554,7 @@ mod tests {
assert!(path.exists());
let content = tokio::fs::read_to_string(&path).await.unwrap();
assert!(content.contains("Periodic Tasks"));
assert!(content.contains("[high]"));
let _ = tokio::fs::remove_dir_all(&dir).await;
}
@@ -301,4 +640,37 @@ mod tests {
let result = engine.run().await;
assert!(result.is_ok());
}
// Integration check: collect_runnable_tasks must drop paused entries and
// order the survivors high → medium → low.
#[tokio::test]
async fn collect_runnable_tasks_sorts_by_priority() {
    let dir = std::env::temp_dir().join("zeroclaw_test_runnable_sort");
    let _ = tokio::fs::remove_dir_all(&dir).await;
    tokio::fs::create_dir_all(&dir).await.unwrap();
    tokio::fs::write(
        dir.join("HEARTBEAT.md"),
        "- [low] Low task\n- [high] High task\n- Medium task\n- [low|paused] Skip me",
    )
    .await
    .unwrap();
    let observer: Arc<dyn Observer> = Arc::new(crate::observability::NoopObserver);
    let engine = HeartbeatEngine::new(
        HeartbeatConfig {
            enabled: true,
            interval_minutes: 30,
            ..HeartbeatConfig::default()
        },
        dir.clone(),
        observer,
    );
    let tasks = engine.collect_runnable_tasks().await.unwrap();
    assert_eq!(tasks.len(), 3); // paused one excluded
    assert_eq!(tasks[0].priority, TaskPriority::High);
    assert_eq!(tasks[1].priority, TaskPriority::Medium);
    assert_eq!(tasks[2].priority, TaskPriority::Low);
    let _ = tokio::fs::remove_dir_all(&dir).await;
}
}
+153
View File
@@ -0,0 +1,153 @@
//! LLM-driven memory consolidation.
//!
//! After each conversation turn, extracts structured information:
//! - `history_entry`: A timestamped summary for the daily conversation log.
//! - `memory_update`: New facts, preferences, or decisions worth remembering
//! long-term (or `null` if nothing new was learned).
//!
//! This two-phase approach replaces the naive raw-message auto-save with
//! semantic extraction, similar to Nanobot's `save_memory` tool call pattern.
use crate::memory::traits::{Memory, MemoryCategory};
use crate::providers::traits::Provider;
/// Output of consolidation extraction.
///
/// Deserialized from the JSON object the consolidation LLM is instructed to
/// emit (see `CONSOLIDATION_SYSTEM_PROMPT`).
#[derive(Debug, serde::Deserialize)]
pub struct ConsolidationResult {
    /// Brief timestamped summary for the conversation history log.
    pub history_entry: String,
    /// New facts/preferences/decisions to store long-term, or None.
    /// JSON `null` deserializes to `None`.
    pub memory_update: Option<String>,
}
/// System prompt for the consolidation call: instructs the model to emit
/// strict JSON with a `history_entry` summary and an optional `memory_update`
/// (matching the `ConsolidationResult` fields).
const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are a memory consolidation engine. Given a conversation turn, extract:
1. "history_entry": A brief summary of what happened in this turn (1-2 sentences). Include the key topic or action.
2. "memory_update": Any NEW facts, preferences, decisions, or commitments worth remembering long-term. Return null if nothing new was learned.
Respond ONLY with valid JSON: {"history_entry": "...", "memory_update": "..." or null}
Do not include any text outside the JSON object."#;
/// Run two-phase LLM-driven consolidation on a conversation turn.
///
/// Phase 1: Write a history entry to the Daily memory category.
/// Phase 2: Write a memory update to the Core category (if the LLM identified new facts).
///
/// This function is designed to be called fire-and-forget via `tokio::spawn`.
///
/// # Errors
/// Propagates failures from the provider call or either memory store.
pub async fn consolidate_turn(
    provider: &dyn Provider,
    model: &str,
    memory: &dyn Memory,
    user_message: &str,
    assistant_response: &str,
) -> anyhow::Result<()> {
    let turn_text = format!("User: {user_message}\nAssistant: {assistant_response}");

    // Truncate very long turns to avoid wasting tokens on consolidation.
    // Walk back to a UTF-8 char boundary: the previous `&turn_text[..4000]`
    // panicked whenever byte 4000 fell inside a multibyte character.
    let mut cut = turn_text.len().min(4000);
    while !turn_text.is_char_boundary(cut) {
        cut -= 1;
    }
    let truncated = turn_text[..cut].to_string();

    let raw = provider
        .chat_with_system(Some(CONSOLIDATION_SYSTEM_PROMPT), &truncated, model, 0.1)
        .await?;
    let result: ConsolidationResult = parse_consolidation_response(&raw, &turn_text);

    // Phase 1: Write history entry to Daily category. The UUID suffix keeps
    // multiple turns on the same day from colliding under one key.
    let date = chrono::Local::now().format("%Y-%m-%d").to_string();
    let history_key = format!("daily_{date}_{}", uuid::Uuid::new_v4());
    memory
        .store(
            &history_key,
            &result.history_entry,
            MemoryCategory::Daily,
            None,
        )
        .await?;

    // Phase 2: Write memory update to Core category (if present and non-blank).
    if let Some(update) = result.memory_update.as_deref() {
        if !update.trim().is_empty() {
            let mem_key = format!("core_{}", uuid::Uuid::new_v4());
            memory
                .store(&mem_key, update, MemoryCategory::Core, None)
                .await?;
        }
    }
    Ok(())
}
/// Parse the LLM's consolidation response, with fallback for malformed JSON.
///
/// Strips optional markdown code fences before parsing. When the response is
/// not valid JSON, falls back to using the (truncated) turn text as the
/// history entry with no memory update.
fn parse_consolidation_response(raw: &str, fallback_text: &str) -> ConsolidationResult {
    // Try to extract JSON from the response (LLM may wrap in markdown code blocks).
    let cleaned = raw
        .trim()
        .trim_start_matches("```json")
        .trim_start_matches("```")
        .trim_end_matches("```")
        .trim();
    serde_json::from_str(cleaned).unwrap_or_else(|_| {
        // Fallback: truncate the raw turn text to at most 200 bytes, walking
        // back to a UTF-8 char boundary — the previous `&fallback_text[..200]`
        // panicked on multibyte text. The truncation is marked with "…"
        // (3 bytes), matching the 203-byte cap the unit test documents.
        let summary = if fallback_text.len() > 200 {
            let mut end = 200;
            while !fallback_text.is_char_boundary(end) {
                end -= 1;
            }
            format!("{}…", &fallback_text[..end])
        } else {
            fallback_text.to_string()
        };
        ConsolidationResult {
            history_entry: summary,
            memory_update: None,
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    // Well-formed JSON: both fields round-trip.
    #[test]
    fn parse_valid_json_response() {
        let raw = r#"{"history_entry": "User asked about Rust.", "memory_update": "User prefers Rust over Go."}"#;
        let result = parse_consolidation_response(raw, "fallback");
        assert_eq!(result.history_entry, "User asked about Rust.");
        assert_eq!(
            result.memory_update.as_deref(),
            Some("User prefers Rust over Go.")
        );
    }

    // JSON `null` maps to `None`, so no Core memory write happens upstream.
    #[test]
    fn parse_json_with_null_memory() {
        let raw = r#"{"history_entry": "Routine greeting.", "memory_update": null}"#;
        let result = parse_consolidation_response(raw, "fallback");
        assert_eq!(result.history_entry, "Routine greeting.");
        assert!(result.memory_update.is_none());
    }

    // Markdown code fences around the payload are stripped before parsing.
    #[test]
    fn parse_json_wrapped_in_code_block() {
        let raw =
            "```json\n{\"history_entry\": \"Discussed deployment.\", \"memory_update\": null}\n```";
        let result = parse_consolidation_response(raw, "fallback");
        assert_eq!(result.history_entry, "Discussed deployment.");
    }

    // Non-JSON chatter falls back to the raw turn text as the history entry.
    #[test]
    fn fallback_on_malformed_response() {
        let raw = "I'm sorry, I can't do that.";
        let result = parse_consolidation_response(raw, "User: hello\nAssistant: hi");
        assert_eq!(result.history_entry, "User: hello\nAssistant: hi");
        assert!(result.memory_update.is_none());
    }

    // Long fallback text is capped near 200 bytes.
    #[test]
    fn fallback_truncates_long_text() {
        let long_text = "x".repeat(500);
        let result = parse_consolidation_response("invalid", &long_text);
        // 200 bytes + "…" (3 bytes in UTF-8) = 203
        assert!(result.history_entry.len() <= 203);
    }
}
+1
View File
@@ -1,6 +1,7 @@
pub mod backend;
pub mod chunker;
pub mod cli;
pub mod consolidation;
pub mod embeddings;
pub mod hygiene;
pub mod lucid;