Compare commits

...

10 Commits

Author SHA1 Message Date
argenis de la rosa 3d007f6b55 docs: add Scoop and AUR workflows to CI map and release process 2026-03-16 14:17:18 -04:00
argenis de la rosa f349de78ed ci(aur): add AUR PKGBUILD template and publishing workflow
Adds Arch Linux distribution via AUR:
- dist/aur/PKGBUILD: package build template with cargo dist profile
- dist/aur/.SRCINFO: AUR metadata
- .github/workflows/pub-aur.yml: manual workflow to push to AUR
2026-03-16 14:16:30 -04:00
argenis de la rosa cd40051f4c ci(scoop): add Scoop manifest template and publishing workflow
Adds Windows package distribution via Scoop:
- dist/scoop/zeroclaw.json: manifest template with checkver/autoupdate
- .github/workflows/pub-scoop.yml: manual workflow to update Scoop bucket
2026-03-16 14:15:29 -04:00
argenis de la rosa 6e4b1ede28 ci(homebrew): restore Homebrew core formula publishing workflow
Re-adds the manual-dispatch workflow for bumping the zeroclaw formula
in Homebrew/homebrew-core via a bot-owned fork. Improved from the
previously removed version with safer env-var handling.

Requires secrets: HOMEBREW_CORE_BOT_TOKEN or HOMEBREW_UPSTREAM_PR_TOKEN
Requires variables: HOMEBREW_CORE_BOT_FORK_REPO, HOMEBREW_CORE_BOT_EMAIL
2026-03-16 14:12:48 -04:00
Argenis 566e3cf35b Merge pull request #3712 from zeroclaw-labs/feat/competitive-edge-heartbeat-sessions-caching
feat: competitive edge — heartbeat metrics, SQLite sessions, two-tier prompt cache
2026-03-16 13:47:42 -04:00
Argenis d642b0f3c8 fix(ci): decouple tweet from crates.io and fix duplicate publish handling (#3716)
Drop crates-io from the tweet job's needs so the release announcement
goes out with GitHub release, Docker, and website — not blocked by
crates.io publish timing.

Also fix the duplicate detection: the previous curl-based check used
a URL that didn't match the actual crate, causing cargo publish to
hit "already exists" and fail the whole job. Now we just run cargo
publish and treat "already exists" as success.
2026-03-16 13:46:05 -04:00
Argenis 8153992a11 fix(ci): scope rust-cache by OS image in stable release workflow (#3711)
Same glibc cache mismatch fix as the beta workflow — ubuntu-22.04 builds
must not share cached build-script binaries with ubuntu-latest (24.04).
2026-03-16 12:56:30 -04:00
argenis de la rosa 98688c61ff feat(cache): wire two-tier response cache, multi-provider token tracking, and cache analytics
- Two-tier response cache: in-memory LRU (hot) + SQLite (warm) with TTL-aware eviction
- Wire response cache into agent turn loop (temp==0.0, text-only responses only)
- Parse Anthropic cache_creation_input_tokens/cache_read_input_tokens
- Parse OpenAI prompt_tokens_details.cached_tokens
- Add cached_input_tokens to TokenUsage, prompt_caching to ProviderCapabilities
- Add CacheHit/CacheMiss observer events with Prometheus counters
- Add response_cache_hot_entries config field (default: 256)
2026-03-16 12:44:48 -04:00
argenis de la rosa 9ba5ba5632 feat(sessions): add SQLite backend with FTS5, trait abstraction, and migration
- Add SessionBackend trait abstracting over storage backends (load,
  append, remove_last, list, search, cleanup_stale, compact)
- Add SqliteSessionBackend with WAL mode, FTS5 full-text search,
  session metadata tracking, and TTL-based cleanup
- Add remove_last() and compact() to JSONL SessionStore
- Implement SessionBackend for both JSONL and SQLite backends
- Add automatic JSONL-to-SQLite migration (renames .jsonl → .jsonl.migrated)
- Add config: session_backend ("jsonl"/"sqlite"), session_ttl_hours
- SQLite is the new default backend; JSONL preserved for backward compat
2026-03-16 12:23:18 -04:00
argenis de la rosa 318ed8e9f1 feat(heartbeat): add health metrics, adaptive intervals, and task history
- Add HeartbeatMetrics struct with uptime, consecutive success/failure
  counts, EMA tick duration, and total ticks
- Add compute_adaptive_interval() for exponential backoff on failures
  and faster polling when high-priority tasks are present
- Add SQLite-backed task run history (src/heartbeat/store.rs) mirroring
  the cron/store.rs pattern with output truncation and pruning
- Add dead-man's switch that alerts if heartbeat stops ticking
- Wire metrics, history recording, and adaptive sleep into daemon worker
- Add config fields: adaptive, min/max_interval_minutes,
  deadman_timeout_minutes, deadman_channel, deadman_to, max_run_history
- All new fields are backward-compatible with serde defaults
2026-03-16 12:08:32 -04:00
37 changed files with 2370 additions and 48 deletions
+155
View File
@@ -0,0 +1,155 @@
name: Pub AUR Package

on:
  workflow_dispatch:
    inputs:
      release_tag:
        description: "Existing release tag (vX.Y.Z)"
        required: true
        type: string
      dry_run:
        description: "Generate PKGBUILD only (no push)"
        required: false
        default: true
        type: boolean

# Use a static group: keying by github.run_id can never collide, so it
# would not serialize anything. One AUR publish at a time, never cancelled.
concurrency:
  group: aur-publish
  cancel-in-progress: false

permissions:
  contents: read

jobs:
  publish-aur:
    name: Update AUR Package
    runs-on: ubuntu-latest
    env:
      RELEASE_TAG: ${{ inputs.release_tag }}
      DRY_RUN: ${{ inputs.dry_run }}
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Validate the tag format, then download the source tarball once to
      # compute the sha256 that PKGBUILD/.SRCINFO will pin.
      - name: Validate and compute metadata
        id: meta
        shell: bash
        run: |
          set -euo pipefail
          if [[ ! "$RELEASE_TAG" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
            echo "::error::release_tag must be vX.Y.Z format."
            exit 1
          fi
          version="${RELEASE_TAG#v}"
          tarball_url="https://github.com/${GITHUB_REPOSITORY}/archive/refs/tags/${RELEASE_TAG}.tar.gz"
          tarball_sha="$(curl -fsSL "$tarball_url" | sha256sum | awk '{print $1}')"
          if [[ -z "$tarball_sha" ]]; then
            echo "::error::Could not compute SHA256 for source tarball."
            exit 1
          fi
          {
            echo "version=$version"
            echo "tarball_url=$tarball_url"
            echo "tarball_sha=$tarball_sha"
          } >> "$GITHUB_OUTPUT"
          {
            echo "### AUR Package Metadata"
            echo "- version: \`${version}\`"
            echo "- tarball_url: \`${tarball_url}\`"
            echo "- tarball_sha: \`${tarball_sha}\`"
          } >> "$GITHUB_STEP_SUMMARY"

      # Patch the checked-in template with the release version and hash.
      - name: Generate PKGBUILD
        id: pkgbuild
        shell: bash
        env:
          VERSION: ${{ steps.meta.outputs.version }}
          TARBALL_SHA: ${{ steps.meta.outputs.tarball_sha }}
        run: |
          set -euo pipefail
          pkgbuild_file="$(mktemp)"
          sed -e "s/^pkgver=.*/pkgver=${VERSION}/" \
              -e "s/^sha256sums=.*/sha256sums=('${TARBALL_SHA}')/" \
              dist/aur/PKGBUILD > "$pkgbuild_file"
          echo "pkgbuild_file=$pkgbuild_file" >> "$GITHUB_OUTPUT"
          echo "### Generated PKGBUILD" >> "$GITHUB_STEP_SUMMARY"
          echo '```bash' >> "$GITHUB_STEP_SUMMARY"
          cat "$pkgbuild_file" >> "$GITHUB_STEP_SUMMARY"
          echo '```' >> "$GITHUB_STEP_SUMMARY"

      # NOTE(review): .SRCINFO is patched with sed to avoid needing makepkg
      # on the runner; canonical regeneration would be `makepkg --printsrcinfo`.
      - name: Generate .SRCINFO
        id: srcinfo
        shell: bash
        env:
          VERSION: ${{ steps.meta.outputs.version }}
          TARBALL_SHA: ${{ steps.meta.outputs.tarball_sha }}
        run: |
          set -euo pipefail
          srcinfo_file="$(mktemp)"
          sed -e "s/pkgver = .*/pkgver = ${VERSION}/" \
              -e "s/sha256sums = .*/sha256sums = ${TARBALL_SHA}/" \
              -e "s|zeroclaw-[0-9.]*.tar.gz|zeroclaw-${VERSION}.tar.gz|g" \
              -e "s|/v[0-9.]*\.tar\.gz|/v${VERSION}.tar.gz|g" \
              dist/aur/.SRCINFO > "$srcinfo_file"
          echo "srcinfo_file=$srcinfo_file" >> "$GITHUB_OUTPUT"

      - name: Push to AUR
        if: inputs.dry_run == false
        shell: bash
        env:
          AUR_SSH_KEY: ${{ secrets.AUR_SSH_KEY }}
          PKGBUILD_FILE: ${{ steps.pkgbuild.outputs.pkgbuild_file }}
          SRCINFO_FILE: ${{ steps.srcinfo.outputs.srcinfo_file }}
          VERSION: ${{ steps.meta.outputs.version }}
        run: |
          set -euo pipefail
          if [[ -z "${AUR_SSH_KEY}" ]]; then
            echo "::error::Secret AUR_SSH_KEY is required for non-dry-run."
            exit 1
          fi
          mkdir -p ~/.ssh
          echo "$AUR_SSH_KEY" > ~/.ssh/aur
          chmod 600 ~/.ssh/aur
          cat >> ~/.ssh/config <<SSH_CONFIG
          Host aur.archlinux.org
            IdentityFile ~/.ssh/aur
            User aur
            StrictHostKeyChecking accept-new
          SSH_CONFIG
          tmp_dir="$(mktemp -d)"
          git clone ssh://aur@aur.archlinux.org/zeroclaw.git "$tmp_dir/aur"
          cp "$PKGBUILD_FILE" "$tmp_dir/aur/PKGBUILD"
          cp "$SRCINFO_FILE" "$tmp_dir/aur/.SRCINFO"
          cd "$tmp_dir/aur"
          git config user.name "zeroclaw-bot"
          git config user.email "bot@zeroclaw.dev"
          git add PKGBUILD .SRCINFO
          git commit -m "zeroclaw ${VERSION}"
          git push origin HEAD
          echo "AUR package updated to ${VERSION}"

      - name: Summary
        shell: bash
        run: |
          if [[ "$DRY_RUN" == "true" ]]; then
            echo "Dry run complete: PKGBUILD generated, no push performed."
          else
            echo "Publish complete: AUR package pushed."
          fi
+206
View File
@@ -0,0 +1,206 @@
name: Pub Homebrew Core

on:
  workflow_dispatch:
    inputs:
      release_tag:
        description: "Existing release tag to publish (vX.Y.Z)"
        required: true
        type: string
      dry_run:
        description: "Patch formula only (no push/PR)"
        required: false
        default: true
        type: boolean

# Static group: keying by github.run_id can never collide, so it would not
# serialize concurrent publishes. Queue runs instead of cancelling them.
concurrency:
  group: homebrew-core-publish
  cancel-in-progress: false

permissions:
  contents: read

jobs:
  publish-homebrew-core:
    name: Publish Homebrew Core PR
    runs-on: ubuntu-latest
    env:
      UPSTREAM_REPO: Homebrew/homebrew-core
      FORMULA_PATH: Formula/z/zeroclaw.rb
      RELEASE_TAG: ${{ inputs.release_tag }}
      DRY_RUN: ${{ inputs.dry_run }}
      BOT_FORK_REPO: ${{ vars.HOMEBREW_CORE_BOT_FORK_REPO }}
      BOT_EMAIL: ${{ vars.HOMEBREW_CORE_BOT_EMAIL }}
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Validate release tag and version alignment
        id: release_meta
        shell: bash
        run: |
          set -euo pipefail
          semver_pattern='^v[0-9]+\.[0-9]+\.[0-9]+([.-][0-9A-Za-z.-]+)?$'
          if [[ ! "$RELEASE_TAG" =~ $semver_pattern ]]; then
            echo "::error::release_tag must match semver-like format (vX.Y.Z[-suffix])."
            exit 1
          fi
          # Ensure the tag is present locally; re-verify after fetching so a
          # missing tag fails here with a clear error, not later in git show.
          if ! git rev-parse "refs/tags/${RELEASE_TAG}" >/dev/null 2>&1; then
            git fetch --tags origin
            git rev-parse "refs/tags/${RELEASE_TAG}" >/dev/null
          fi
          tag_version="${RELEASE_TAG#v}"
          cargo_version="$(git show "${RELEASE_TAG}:Cargo.toml" \
            | sed -n 's/^version = "\([^"]*\)"/\1/p' | head -n1)"
          if [[ -z "$cargo_version" ]]; then
            echo "::error::Unable to read Cargo.toml version from tag ${RELEASE_TAG}."
            exit 1
          fi
          if [[ "$cargo_version" != "$tag_version" ]]; then
            echo "::error::Tag ${RELEASE_TAG} does not match Cargo.toml version (${cargo_version})."
            exit 1
          fi
          tarball_url="https://github.com/${GITHUB_REPOSITORY}/archive/refs/tags/${RELEASE_TAG}.tar.gz"
          tarball_sha="$(curl -fsSL "$tarball_url" | sha256sum | awk '{print $1}')"
          {
            echo "tag_version=$tag_version"
            echo "tarball_url=$tarball_url"
            echo "tarball_sha=$tarball_sha"
          } >> "$GITHUB_OUTPUT"
          {
            echo "### Release Metadata"
            echo "- release_tag: \`${RELEASE_TAG}\`"
            echo "- cargo_version: \`${cargo_version}\`"
            echo "- tarball_sha256: \`${tarball_sha}\`"
            echo "- dry_run: ${DRY_RUN}"
          } >> "$GITHUB_STEP_SUMMARY"

      - name: Patch Homebrew formula
        id: patch_formula
        shell: bash
        env:
          HOMEBREW_CORE_BOT_TOKEN: ${{ secrets.HOMEBREW_UPSTREAM_PR_TOKEN || secrets.HOMEBREW_CORE_BOT_TOKEN }}
          GH_TOKEN: ${{ secrets.HOMEBREW_UPSTREAM_PR_TOKEN || secrets.HOMEBREW_CORE_BOT_TOKEN }}
          # $GITHUB_OUTPUT is a fresh file per step, so a previous step's
          # outputs cannot be grepped out of it — pass them via env instead.
          TARBALL_URL: ${{ steps.release_meta.outputs.tarball_url }}
          TARBALL_SHA: ${{ steps.release_meta.outputs.tarball_sha }}
        run: |
          set -euo pipefail
          tmp_repo="$(mktemp -d)"
          echo "tmp_repo=$tmp_repo" >> "$GITHUB_OUTPUT"
          if [[ "$DRY_RUN" == "true" ]]; then
            git clone --depth=1 "https://github.com/${UPSTREAM_REPO}.git" "$tmp_repo/homebrew-core"
          else
            if [[ -z "${BOT_FORK_REPO}" ]]; then
              echo "::error::Repository variable HOMEBREW_CORE_BOT_FORK_REPO is required when dry_run=false."
              exit 1
            fi
            if [[ -z "${HOMEBREW_CORE_BOT_TOKEN}" ]]; then
              echo "::error::Repository secret HOMEBREW_CORE_BOT_TOKEN is required when dry_run=false."
              exit 1
            fi
            if [[ "$BOT_FORK_REPO" != */* ]]; then
              echo "::error::HOMEBREW_CORE_BOT_FORK_REPO must be in owner/repo format."
              exit 1
            fi
            if ! gh api "repos/${BOT_FORK_REPO}" >/dev/null 2>&1; then
              echo "::error::HOMEBREW_CORE_BOT_TOKEN cannot access ${BOT_FORK_REPO}."
              exit 1
            fi
            gh repo clone "${BOT_FORK_REPO}" "$tmp_repo/homebrew-core" -- --depth=1
          fi
          repo_dir="$tmp_repo/homebrew-core"
          formula_file="$repo_dir/$FORMULA_PATH"
          if [[ ! -f "$formula_file" ]]; then
            echo "::error::Formula file not found: $FORMULA_PATH"
            exit 1
          fi
          if [[ "$DRY_RUN" == "false" ]]; then
            if git -C "$repo_dir" remote get-url upstream >/dev/null 2>&1; then
              git -C "$repo_dir" remote set-url upstream "https://github.com/${UPSTREAM_REPO}.git"
            else
              git -C "$repo_dir" remote add upstream "https://github.com/${UPSTREAM_REPO}.git"
            fi
            # Detect the upstream default branch (homebrew-core uses master).
            if git -C "$repo_dir" ls-remote --exit-code --heads upstream main >/dev/null 2>&1; then
              upstream_ref="main"
            else
              upstream_ref="master"
            fi
            git -C "$repo_dir" fetch --depth=1 upstream "$upstream_ref"
            branch_name="zeroclaw-${RELEASE_TAG}-${GITHUB_RUN_ID}"
            git -C "$repo_dir" checkout -B "$branch_name" "upstream/$upstream_ref"
            echo "branch_name=$branch_name" >> "$GITHUB_OUTPUT"
            # Expose the detected base branch so the PR step targets it
            # instead of hardcoding "main".
            echo "base_ref=$upstream_ref" >> "$GITHUB_OUTPUT"
          fi
          perl -0pi -e "s|^  url \".*\"|  url \"${TARBALL_URL}\"|m" "$formula_file"
          perl -0pi -e "s|^  sha256 \".*\"|  sha256 \"${TARBALL_SHA}\"|m" "$formula_file"
          perl -0pi -e "s|^  license \".*\"|  license \"Apache-2.0 OR MIT\"|m" "$formula_file"
          git -C "$repo_dir" diff -- "$FORMULA_PATH" > "$tmp_repo/formula.diff"
          if [[ ! -s "$tmp_repo/formula.diff" ]]; then
            echo "::error::No formula changes generated. Nothing to publish."
            exit 1
          fi
          {
            echo "### Formula Diff"
            echo '```diff'
            cat "$tmp_repo/formula.diff"
            echo '```'
          } >> "$GITHUB_STEP_SUMMARY"

      - name: Push branch and open Homebrew PR
        if: inputs.dry_run == false
        shell: bash
        env:
          GH_TOKEN: ${{ secrets.HOMEBREW_UPSTREAM_PR_TOKEN || secrets.HOMEBREW_CORE_BOT_TOKEN }}
          TMP_REPO: ${{ steps.patch_formula.outputs.tmp_repo }}
          BRANCH_NAME: ${{ steps.patch_formula.outputs.branch_name }}
          BASE_REF: ${{ steps.patch_formula.outputs.base_ref }}
          TAG_VERSION: ${{ steps.release_meta.outputs.tag_version }}
          TARBALL_URL: ${{ steps.release_meta.outputs.tarball_url }}
          TARBALL_SHA: ${{ steps.release_meta.outputs.tarball_sha }}
        run: |
          set -euo pipefail
          repo_dir="${TMP_REPO}/homebrew-core"
          fork_owner="${BOT_FORK_REPO%%/*}"
          bot_email="${BOT_EMAIL:-${fork_owner}@users.noreply.github.com}"
          git -C "$repo_dir" config user.name "$fork_owner"
          git -C "$repo_dir" config user.email "$bot_email"
          git -C "$repo_dir" add "$FORMULA_PATH"
          git -C "$repo_dir" commit -m "zeroclaw ${TAG_VERSION}"
          gh auth setup-git
          git -C "$repo_dir" push --set-upstream origin "$BRANCH_NAME"
          pr_body="Automated formula bump from ZeroClaw release workflow.
          - Release tag: ${RELEASE_TAG}
          - Source tarball: ${TARBALL_URL}
          - Source sha256: ${TARBALL_SHA}"
          gh pr create \
            --repo "$UPSTREAM_REPO" \
            --base "${BASE_REF:-master}" \
            --head "${fork_owner}:${BRANCH_NAME}" \
            --title "zeroclaw ${TAG_VERSION}" \
            --body "$pr_body"

      - name: Summary
        shell: bash
        run: |
          if [[ "$DRY_RUN" == "true" ]]; then
            echo "Dry run complete: formula diff generated, no push/PR performed."
          else
            echo "Publish complete: branch pushed and PR opened from bot fork."
          fi
+151
View File
@@ -0,0 +1,151 @@
name: Pub Scoop Manifest

on:
  workflow_dispatch:
    inputs:
      release_tag:
        description: "Existing release tag (vX.Y.Z)"
        required: true
        type: string
      dry_run:
        description: "Generate manifest only (no push)"
        required: false
        default: true
        type: boolean

# Static group: keying by github.run_id can never collide, so it would not
# serialize anything. One bucket update at a time, never cancelled.
concurrency:
  group: scoop-publish
  cancel-in-progress: false

permissions:
  contents: read

jobs:
  publish-scoop:
    name: Update Scoop Manifest
    runs-on: ubuntu-latest
    env:
      RELEASE_TAG: ${{ inputs.release_tag }}
      DRY_RUN: ${{ inputs.dry_run }}
      SCOOP_BUCKET_REPO: ${{ vars.SCOOP_BUCKET_REPO }}
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Validate the tag and pull the Windows zip hash out of the release's
      # SHA256SUMS asset (no need to download the binary itself).
      - name: Validate and compute metadata
        id: meta
        shell: bash
        run: |
          set -euo pipefail
          if [[ ! "$RELEASE_TAG" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
            echo "::error::release_tag must be vX.Y.Z format."
            exit 1
          fi
          version="${RELEASE_TAG#v}"
          zip_url="https://github.com/${GITHUB_REPOSITORY}/releases/download/${RELEASE_TAG}/zeroclaw-x86_64-pc-windows-msvc.zip"
          sums_url="https://github.com/${GITHUB_REPOSITORY}/releases/download/${RELEASE_TAG}/SHA256SUMS"
          sha256="$(curl -fsSL "$sums_url" | grep 'zeroclaw-x86_64-pc-windows-msvc.zip' | awk '{print $1}')"
          if [[ -z "$sha256" ]]; then
            echo "::error::Could not find Windows binary hash in SHA256SUMS for ${RELEASE_TAG}."
            exit 1
          fi
          {
            echo "version=$version"
            echo "zip_url=$zip_url"
            echo "sha256=$sha256"
          } >> "$GITHUB_OUTPUT"
          {
            echo "### Scoop Manifest Metadata"
            echo "- version: \`${version}\`"
            echo "- zip_url: \`${zip_url}\`"
            echo "- sha256: \`${sha256}\`"
          } >> "$GITHUB_STEP_SUMMARY"

      # Render the manifest; jq round-trip both validates the JSON and
      # normalizes formatting. \$version is escaped so Scoop's autoupdate
      # placeholder survives the shell heredoc.
      - name: Generate manifest
        id: manifest
        shell: bash
        env:
          VERSION: ${{ steps.meta.outputs.version }}
          ZIP_URL: ${{ steps.meta.outputs.zip_url }}
          SHA256: ${{ steps.meta.outputs.sha256 }}
        run: |
          set -euo pipefail
          manifest_file="$(mktemp)"
          cat > "$manifest_file" <<MANIFEST
          {
            "version": "${VERSION}",
            "description": "Zero overhead. Zero compromise. 100% Rust. The fastest, smallest AI assistant.",
            "homepage": "https://github.com/zeroclaw-labs/zeroclaw",
            "license": "MIT|Apache-2.0",
            "architecture": {
              "64bit": {
                "url": "${ZIP_URL}",
                "hash": "${SHA256}",
                "bin": "zeroclaw.exe"
              }
            },
            "checkver": {
              "github": "https://github.com/zeroclaw-labs/zeroclaw"
            },
            "autoupdate": {
              "architecture": {
                "64bit": {
                  "url": "https://github.com/zeroclaw-labs/zeroclaw/releases/download/v\$version/zeroclaw-x86_64-pc-windows-msvc.zip"
                }
              },
              "hash": {
                "url": "https://github.com/zeroclaw-labs/zeroclaw/releases/download/v\$version/SHA256SUMS",
                "regex": "([a-f0-9]{64})\\\\s+zeroclaw-x86_64-pc-windows-msvc\\\\.zip"
              }
            }
          }
          MANIFEST
          jq '.' "$manifest_file" > "${manifest_file}.formatted"
          mv "${manifest_file}.formatted" "$manifest_file"
          echo "manifest_file=$manifest_file" >> "$GITHUB_OUTPUT"
          echo "### Generated Manifest" >> "$GITHUB_STEP_SUMMARY"
          echo '```json' >> "$GITHUB_STEP_SUMMARY"
          cat "$manifest_file" >> "$GITHUB_STEP_SUMMARY"
          echo '```' >> "$GITHUB_STEP_SUMMARY"

      - name: Push to Scoop bucket
        if: inputs.dry_run == false
        shell: bash
        env:
          GH_TOKEN: ${{ secrets.SCOOP_BUCKET_TOKEN }}
          MANIFEST_FILE: ${{ steps.manifest.outputs.manifest_file }}
          VERSION: ${{ steps.meta.outputs.version }}
        run: |
          set -euo pipefail
          if [[ -z "${SCOOP_BUCKET_REPO}" ]]; then
            echo "::error::Repository variable SCOOP_BUCKET_REPO is required (e.g. zeroclaw-labs/scoop-zeroclaw)."
            exit 1
          fi
          tmp_dir="$(mktemp -d)"
          gh repo clone "${SCOOP_BUCKET_REPO}" "$tmp_dir/bucket" -- --depth=1
          mkdir -p "$tmp_dir/bucket/bucket"
          cp "$MANIFEST_FILE" "$tmp_dir/bucket/bucket/zeroclaw.json"
          cd "$tmp_dir/bucket"
          git config user.name "zeroclaw-bot"
          git config user.email "bot@zeroclaw.dev"
          git add bucket/zeroclaw.json
          git commit -m "zeroclaw ${VERSION}"
          gh auth setup-git
          git push origin HEAD
          echo "Scoop manifest updated to ${VERSION}"
+11 -6
View File
@@ -190,6 +190,8 @@ jobs:
targets: ${{ matrix.target }}
- uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2
if: runner.os != 'Windows'
with:
prefix-key: ${{ matrix.os }}-${{ matrix.target }}
- uses: actions/download-artifact@v4
with:
@@ -305,13 +307,16 @@ jobs:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
VERSION: ${{ inputs.version }}
run: |
# Skip if this version is already on crates.io (auto-sync may have published it)
# Publish to crates.io; treat "already exists" as success
# (auto-publish workflow may have already published this version)
CRATE_NAME=$(sed -n 's/^name = "\([^"]*\)"/\1/p' Cargo.toml | head -1)
if curl -sfL "https://crates.io/api/v1/crates/${CRATE_NAME}/${VERSION}" | grep -q '"version"'; then
echo "::notice::${CRATE_NAME}@${VERSION} already published on crates.io — skipping"
else
cargo publish --locked --allow-dirty --no-verify
OUTPUT=$(cargo publish --locked --allow-dirty --no-verify 2>&1) && exit 0
echo "$OUTPUT"
if echo "$OUTPUT" | grep -q 'already exists'; then
echo "::notice::${CRATE_NAME}@${VERSION} already on crates.io — skipping"
exit 0
fi
exit 1
redeploy-website:
name: Trigger Website Redeploy
@@ -359,7 +364,7 @@ jobs:
# ── Post-publish: only run after ALL artifacts are live ──────────────
tweet:
name: Tweet Release
needs: [validate, publish, docker, crates-io, redeploy-website]
needs: [validate, publish, docker, redeploy-website]
uses: ./.github/workflows/tweet-release.yml
with:
release_tag: ${{ needs.validate.outputs.tag }}
+16
View File
@@ -0,0 +1,16 @@
pkgbase = zeroclaw
pkgdesc = Zero overhead. Zero compromise. 100% Rust. The fastest, smallest AI assistant.
pkgver = 0.4.0
pkgrel = 1
url = https://github.com/zeroclaw-labs/zeroclaw
arch = x86_64
license = MIT
license = Apache-2.0
makedepends = cargo
makedepends = git
depends = gcc-libs
depends = openssl
source = zeroclaw-0.4.0.tar.gz::https://github.com/zeroclaw-labs/zeroclaw/archive/refs/tags/v0.4.0.tar.gz
sha256sums = SKIP
pkgname = zeroclaw
+32
View File
@@ -0,0 +1,32 @@
# Maintainer: zeroclaw-labs <bot@zeroclaw.dev>
pkgname=zeroclaw
pkgver=0.4.0
pkgrel=1
pkgdesc="Zero overhead. Zero compromise. 100% Rust. The fastest, smallest AI assistant."
arch=('x86_64')
url="https://github.com/zeroclaw-labs/zeroclaw"
license=('MIT' 'Apache-2.0')
depends=('gcc-libs' 'openssl')
makedepends=('cargo' 'git')
source=("${pkgname}-${pkgver}.tar.gz::https://github.com/zeroclaw-labs/zeroclaw/archive/refs/tags/v${pkgver}.tar.gz")
sha256sums=('SKIP')

prepare() {
  cd "${pkgname}-${pkgver}"
  export RUSTUP_TOOLCHAIN=stable
  # Vendor dependencies up front so build() can run offline with --frozen.
  cargo fetch --locked --target "$(rustc -vV | sed -n 's/host: //p')"
}

build() {
  cd "${pkgname}-${pkgver}"
  export RUSTUP_TOOLCHAIN=stable
  export CARGO_TARGET_DIR=target
  # --profile dist selects the project's release ("dist") profile; passing
  # --release as well is a cargo error ("conflicting usage of --profile=dist
  # and --release"), so it must not be combined. Output lands in target/dist/.
  cargo build --frozen --profile dist
}

package() {
  cd "${pkgname}-${pkgver}"
  install -Dm0755 -t "${pkgdir}/usr/bin/" "target/dist/zeroclaw"
  install -Dm0644 LICENSE-MIT "${pkgdir}/usr/share/licenses/${pkgname}/LICENSE-MIT"
  install -Dm0644 LICENSE-APACHE "${pkgdir}/usr/share/licenses/${pkgname}/LICENSE-APACHE"
}
+27
View File
@@ -0,0 +1,27 @@
{
"version": "0.4.0",
"description": "Zero overhead. Zero compromise. 100% Rust. The fastest, smallest AI assistant.",
"homepage": "https://github.com/zeroclaw-labs/zeroclaw",
"license": "MIT|Apache-2.0",
"architecture": {
"64bit": {
"url": "https://github.com/zeroclaw-labs/zeroclaw/releases/download/v0.4.0/zeroclaw-x86_64-pc-windows-msvc.zip",
"hash": "",
"bin": "zeroclaw.exe"
}
},
"checkver": {
"github": "https://github.com/zeroclaw-labs/zeroclaw"
},
"autoupdate": {
"architecture": {
"64bit": {
"url": "https://github.com/zeroclaw-labs/zeroclaw/releases/download/v$version/zeroclaw-x86_64-pc-windows-msvc.zip"
}
},
"hash": {
"url": "https://github.com/zeroclaw-labs/zeroclaw/releases/download/v$version/SHA256SUMS",
"regex": "([a-f0-9]{64})\\s+zeroclaw-x86_64-pc-windows-msvc\\.zip"
}
}
}
+16 -6
View File
@@ -37,6 +37,12 @@ Merge-blocking checks should stay small and deterministic. Optional checks are u
- `.github/workflows/pub-homebrew-core.yml` (`Pub Homebrew Core`)
- Purpose: manual, bot-owned Homebrew core formula bump PR flow for tagged releases
- Guardrail: release tag must match `Cargo.toml` version
- `.github/workflows/pub-scoop.yml` (`Pub Scoop Manifest`)
- Purpose: manual Scoop bucket manifest update for Windows distribution
- Guardrail: release tag must be `vX.Y.Z` format; Windows binary hash extracted from `SHA256SUMS`
- `.github/workflows/pub-aur.yml` (`Pub AUR Package`)
- Purpose: manual AUR PKGBUILD push for Arch Linux distribution
- Guardrail: release tag must be `vX.Y.Z` format; source tarball SHA256 computed at publish time
- `.github/workflows/pr-label-policy-check.yml` (`Label Policy Sanity`)
- Purpose: validate shared contributor-tier policy in `.github/label-policy.json` and ensure label workflows consume that policy
- `.github/workflows/test-rust-build.yml` (`Rust Reusable Job`)
@@ -75,6 +81,8 @@ Merge-blocking checks should stay small and deterministic. Optional checks are u
- `Docker`: tag push (`v*`) for publish, matching PRs to `master` for smoke build, manual dispatch for smoke only
- `Release`: tag push (`v*`), weekly schedule (verification-only), manual dispatch (verification or publish)
- `Pub Homebrew Core`: manual dispatch only
- `Pub Scoop Manifest`: manual dispatch only
- `Pub AUR Package`: manual dispatch only
- `Security Audit`: push to `master`, PRs to `master`, weekly schedule
- `Sec Vorpal Reviewdog`: manual dispatch only
- `Workflow Sanity`: PR/push when `.github/workflows/**`, `.github/*.yml`, or `.github/*.yaml` change
@@ -92,12 +100,14 @@ Merge-blocking checks should stay small and deterministic. Optional checks are u
2. Docker failures on PRs: inspect `.github/workflows/pub-docker-img.yml` `pr-smoke` job.
3. Release failures (tag/manual/scheduled): inspect `.github/workflows/pub-release.yml` and the `prepare` job outputs.
4. Homebrew formula publish failures: inspect `.github/workflows/pub-homebrew-core.yml` summary output and bot token/fork variables.
5. Security failures: inspect `.github/workflows/sec-audit.yml` and `deny.toml`.
6. Workflow syntax/lint failures: inspect `.github/workflows/workflow-sanity.yml`.
7. PR intake failures: inspect `.github/workflows/pr-intake-checks.yml` sticky comment and run logs.
8. Label policy parity failures: inspect `.github/workflows/pr-label-policy-check.yml`.
9. Docs failures in CI: inspect `docs-quality` job logs in `.github/workflows/ci-run.yml`.
10. Strict delta lint failures in CI: inspect `lint-strict-delta` job logs and compare with `BASE_SHA` diff scope.
5. Scoop manifest publish failures: inspect `.github/workflows/pub-scoop.yml` summary output and `SCOOP_BUCKET_REPO`/`SCOOP_BUCKET_TOKEN` settings.
6. AUR package publish failures: inspect `.github/workflows/pub-aur.yml` summary output and `AUR_SSH_KEY` secret.
7. Security failures: inspect `.github/workflows/sec-audit.yml` and `deny.toml`.
8. Workflow syntax/lint failures: inspect `.github/workflows/workflow-sanity.yml`.
9. PR intake failures: inspect `.github/workflows/pr-intake-checks.yml` sticky comment and run logs.
10. Label policy parity failures: inspect `.github/workflows/pr-label-policy-check.yml`.
11. Docs failures in CI: inspect `docs-quality` job logs in `.github/workflows/ci-run.yml`.
12. Strict delta lint failures in CI: inspect `lint-strict-delta` job logs and compare with `BASE_SHA` diff scope.
## Maintenance Rules
+37
View File
@@ -23,6 +23,8 @@ Release automation lives in:
- `.github/workflows/pub-release.yml`
- `.github/workflows/pub-homebrew-core.yml` (manual Homebrew formula PR, bot-owned)
- `.github/workflows/pub-scoop.yml` (manual Scoop bucket manifest update)
- `.github/workflows/pub-aur.yml` (manual AUR PKGBUILD push)
Modes:
@@ -115,6 +117,41 @@ Workflow guardrails:
- formula license is normalized to `Apache-2.0 OR MIT`
- PR is opened from the bot fork into `Homebrew/homebrew-core:master`
### 7) Publish Scoop manifest (Windows)
Run `Pub Scoop Manifest` manually:
- `release_tag`: `vX.Y.Z`
- `dry_run`: `true` first, then `false`
Required repository settings for non-dry-run:
- secret: `SCOOP_BUCKET_TOKEN` (PAT with push access to the bucket repo)
- variable: `SCOOP_BUCKET_REPO` (for example `zeroclaw-labs/scoop-zeroclaw`)
Workflow guardrails:
- release tag must be `vX.Y.Z` format
- Windows binary SHA256 extracted from `SHA256SUMS` release asset
- manifest pushed to `bucket/zeroclaw.json` in the Scoop bucket repo
### 8) Publish AUR package (Arch Linux)
Run `Pub AUR Package` manually:
- `release_tag`: `vX.Y.Z`
- `dry_run`: `true` first, then `false`
Required repository settings for non-dry-run:
- secret: `AUR_SSH_KEY` (SSH private key registered with AUR)
Workflow guardrails:
- release tag must be `vX.Y.Z` format
- source tarball SHA256 computed from the tagged release
- PKGBUILD and .SRCINFO pushed to AUR `zeroclaw` package
## Emergency / Recovery Path
If tag-push release fails after artifacts are validated:
+78
View File
@@ -38,6 +38,7 @@ pub struct Agent {
available_hints: Vec<String>,
route_model_by_hint: HashMap<String, String>,
allowed_tools: Option<Vec<String>>,
response_cache: Option<Arc<crate::memory::response_cache::ResponseCache>>,
}
pub struct AgentBuilder {
@@ -60,6 +61,7 @@ pub struct AgentBuilder {
available_hints: Option<Vec<String>>,
route_model_by_hint: Option<HashMap<String, String>>,
allowed_tools: Option<Vec<String>>,
response_cache: Option<Arc<crate::memory::response_cache::ResponseCache>>,
}
impl AgentBuilder {
@@ -84,6 +86,7 @@ impl AgentBuilder {
available_hints: None,
route_model_by_hint: None,
allowed_tools: None,
response_cache: None,
}
}
@@ -188,6 +191,14 @@ impl AgentBuilder {
self
}
pub fn response_cache(
mut self,
cache: Option<Arc<crate::memory::response_cache::ResponseCache>>,
) -> Self {
self.response_cache = cache;
self
}
pub fn build(self) -> Result<Agent> {
let mut tools = self
.tools
@@ -236,6 +247,7 @@ impl AgentBuilder {
available_hints: self.available_hints.unwrap_or_default(),
route_model_by_hint: self.route_model_by_hint.unwrap_or_default(),
allowed_tools: allowed,
response_cache: self.response_cache,
})
}
}
@@ -330,11 +342,25 @@ impl Agent {
.collect();
let available_hints: Vec<String> = route_model_by_hint.keys().cloned().collect();
let response_cache = if config.memory.response_cache_enabled {
crate::memory::response_cache::ResponseCache::with_hot_cache(
&config.workspace_dir,
config.memory.response_cache_ttl_minutes,
config.memory.response_cache_max_entries,
config.memory.response_cache_hot_entries,
)
.ok()
.map(Arc::new)
} else {
None
};
Agent::builder()
.provider(provider)
.tools(tools)
.memory(memory)
.observer(observer)
.response_cache(response_cache)
.tool_dispatcher(tool_dispatcher)
.memory_loader(Box::new(DefaultMemoryLoader::new(
5,
@@ -513,6 +539,47 @@ impl Agent {
for _ in 0..self.config.max_tool_iterations {
let messages = self.tool_dispatcher.to_provider_messages(&self.history);
// Response cache: check before LLM call (only for deterministic, text-only prompts)
let cache_key = if self.temperature == 0.0 {
self.response_cache.as_ref().map(|_| {
let last_user = messages
.iter()
.rfind(|m| m.role == "user")
.map(|m| m.content.as_str())
.unwrap_or("");
let system = messages
.iter()
.find(|m| m.role == "system")
.map(|m| m.content.as_str());
crate::memory::response_cache::ResponseCache::cache_key(
&effective_model,
system,
last_user,
)
})
} else {
None
};
if let (Some(ref cache), Some(ref key)) = (&self.response_cache, &cache_key) {
if let Ok(Some(cached)) = cache.get(key) {
self.observer.record_event(&ObserverEvent::CacheHit {
cache_type: "response".into(),
tokens_saved: 0,
});
self.history
.push(ConversationMessage::Chat(ChatMessage::assistant(
cached.clone(),
)));
self.trim_history();
return Ok(cached);
}
self.observer.record_event(&ObserverEvent::CacheMiss {
cache_type: "response".into(),
});
}
let response = match self
.provider
.chat(
@@ -541,6 +608,17 @@ impl Agent {
text
};
// Store in response cache (text-only, no tool calls)
if let (Some(ref cache), Some(ref key)) = (&self.response_cache, &cache_key) {
let token_count = response
.usage
.as_ref()
.and_then(|u| u.output_tokens)
.unwrap_or(0);
#[allow(clippy::cast_possible_truncation)]
let _ = cache.put(key, &effective_model, &final_text, token_count as u32);
}
self.history
.push(ConversationMessage::Chat(ChatMessage::assistant(
final_text.clone(),
+1
View File
@@ -3977,6 +3977,7 @@ mod tests {
ProviderCapabilities {
native_tool_calling: false,
vision: true,
prompt_caching: false,
}
}
+2
View File
@@ -32,6 +32,8 @@ pub mod nextcloud_talk;
pub mod nostr;
pub mod notion;
pub mod qq;
pub mod session_backend;
pub mod session_sqlite;
pub mod session_store;
pub mod signal;
pub mod slack;
+103
View File
@@ -0,0 +1,103 @@
//! Trait abstraction for session persistence backends.
//!
//! Backends store per-sender conversation histories. The trait is intentionally
//! minimal — load, append, remove_last, list — so that JSONL and SQLite (and
//! future backends) share a common interface.
use crate::providers::traits::ChatMessage;
use chrono::{DateTime, Utc};
/// Metadata about a persisted session.
#[derive(Debug, Clone)]
pub struct SessionMetadata {
/// Session key (e.g. `telegram_user123`).
pub key: String,
/// When the session was first created.
pub created_at: DateTime<Utc>,
/// When the last message was appended.
pub last_activity: DateTime<Utc>,
/// Total number of messages in the session.
pub message_count: usize,
}
/// Query parameters for listing sessions.
#[derive(Debug, Clone, Default)]
pub struct SessionQuery {
/// Keyword to search in session messages (FTS5 if available).
pub keyword: Option<String>,
/// Maximum number of sessions to return.
pub limit: Option<usize>,
}
/// Trait for session persistence backends.
///
/// Implementations must be `Send + Sync` for sharing across async tasks.
pub trait SessionBackend: Send + Sync {
    /// Load all messages for a session. Returns empty vec if session doesn't exist.
    fn load(&self, session_key: &str) -> Vec<ChatMessage>;
    /// Append a single message to a session.
    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()>;
    /// Remove the last message from a session. Returns `true` if a message was removed.
    fn remove_last(&self, session_key: &str) -> std::io::Result<bool>;
    /// List all session keys.
    fn list_sessions(&self) -> Vec<String>;
    /// List sessions with metadata.
    ///
    /// The default implementation loads every session in full just to count
    /// its messages, and — because the trait exposes no stored timestamps —
    /// fills `created_at` and `last_activity` with `Utc::now()` as
    /// placeholders. Backends that persist real timestamps (e.g. SQLite)
    /// should override this for both accuracy and efficiency.
    fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
        // Default: construct metadata from messages (backends can override for efficiency)
        self.list_sessions()
            .into_iter()
            .map(|key| {
                let messages = self.load(&key);
                SessionMetadata {
                    key,
                    // Placeholder timestamps: the trait has no way to recover
                    // the real creation/activity times here.
                    created_at: Utc::now(),
                    last_activity: Utc::now(),
                    message_count: messages.len(),
                }
            })
            .collect()
    }
    /// Compact a session file (remove duplicates/corruption). No-op by default.
    fn compact(&self, _session_key: &str) -> std::io::Result<()> {
        Ok(())
    }
    /// Remove sessions that haven't been active within the given TTL hours.
    /// Default implementation removes nothing and reports `0`.
    fn cleanup_stale(&self, _ttl_hours: u32) -> std::io::Result<usize> {
        Ok(0)
    }
    /// Search sessions by keyword. Default returns empty (backends with FTS override).
    fn search(&self, _query: &SessionQuery) -> Vec<SessionMetadata> {
        Vec::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The metadata struct is plain data — building it by hand must work.
    #[test]
    fn session_metadata_is_constructible() {
        let now = Utc::now();
        let meta = SessionMetadata {
            key: String::from("test"),
            created_at: now,
            last_activity: now,
            message_count: 5,
        };
        assert_eq!(meta.key, "test");
        assert_eq!(meta.message_count, 5);
    }

    /// A default query carries no filters at all.
    #[test]
    fn session_query_defaults() {
        let query = SessionQuery::default();
        assert!(query.keyword.is_none());
        assert!(query.limit.is_none());
    }
}
+503
View File
@@ -0,0 +1,503 @@
//! SQLite-backed session persistence with FTS5 search.
//!
//! Stores sessions in `{workspace}/sessions/sessions.db` using WAL mode.
//! Provides full-text search via FTS5 and automatic TTL-based cleanup.
//! Designed as the default backend, replacing JSONL for new installations.
use crate::channels::session_backend::{SessionBackend, SessionMetadata, SessionQuery};
use crate::providers::traits::ChatMessage;
use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Utc};
use parking_lot::Mutex;
use rusqlite::{params, Connection};
use std::path::{Path, PathBuf};
/// SQLite-backed session store with FTS5 and WAL mode.
///
/// All access goes through a single connection guarded by a `parking_lot`
/// mutex, so every trait method is effectively serialized.
pub struct SqliteSessionBackend {
    // Single shared connection; callers lock per operation.
    conn: Mutex<Connection>,
    // Retained for diagnostics; not read after construction in this file.
    #[allow(dead_code)]
    db_path: PathBuf,
}
impl SqliteSessionBackend {
    /// Open or create the sessions database.
    ///
    /// Creates `{workspace}/sessions/sessions.db`, applies performance pragmas
    /// (WAL journaling, relaxed fsync, in-memory temp store, a 4 MiB mmap
    /// window), then idempotently creates the schema:
    ///
    /// - `sessions`: one row per message, insertion-ordered by rowid `id`;
    /// - `session_metadata`: one row per session key with activity stats;
    /// - `sessions_fts`: external-content FTS5 index over `sessions`, kept in
    ///   sync by `AFTER INSERT` / `AFTER DELETE` triggers.
    ///
    /// # Errors
    /// Fails if the directory or DB file cannot be created, or if schema
    /// initialization fails (e.g. an SQLite build without FTS5 support).
    pub fn new(workspace_dir: &Path) -> Result<Self> {
        let sessions_dir = workspace_dir.join("sessions");
        std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
        let db_path = sessions_dir.join("sessions.db");
        let conn = Connection::open(&db_path)
            .with_context(|| format!("Failed to open session DB: {}", db_path.display()))?;
        // Performance pragmas; applied per-connection (journal_mode persists in the file).
        conn.execute_batch(
            "PRAGMA journal_mode = WAL;
            PRAGMA synchronous = NORMAL;
            PRAGMA temp_store = MEMORY;
            PRAGMA mmap_size = 4194304;",
        )?;
        // Schema is fully IF NOT EXISTS, so re-opening an existing DB is a no-op.
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_key TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_sessions_key ON sessions(session_key);
            CREATE INDEX IF NOT EXISTS idx_sessions_key_id ON sessions(session_key, id);
            CREATE TABLE IF NOT EXISTS session_metadata (
                session_key TEXT PRIMARY KEY,
                created_at TEXT NOT NULL,
                last_activity TEXT NOT NULL,
                message_count INTEGER NOT NULL DEFAULT 0
            );
            CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
                session_key, content, content=sessions, content_rowid=id
            );
            CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
                INSERT INTO sessions_fts(rowid, session_key, content)
                VALUES (new.id, new.session_key, new.content);
            END;
            CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
                INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
                VALUES ('delete', old.id, old.session_key, old.content);
            END;",
        )
        .context("Failed to initialize session schema")?;
        Ok(Self {
            conn: Mutex::new(conn),
            db_path,
        })
    }
    /// Migrate JSONL session files into SQLite. Renames migrated files to `.jsonl.migrated`.
    ///
    /// Scans `{workspace}/sessions/*.jsonl`; each parseable line is appended
    /// via [`Self::append`] (which also maintains `session_metadata`).
    /// Unreadable entries, non-UTF-8 names, and unparseable lines are skipped
    /// silently. A file is renamed only if at least one message was imported.
    /// Returns the number of files migrated.
    ///
    /// NOTE(review): rows are inserted unconditionally, so if a `.jsonl` file
    /// for an already-migrated key reappears, re-running this duplicates its
    /// messages — confirm callers only invoke this once per installation.
    pub fn migrate_from_jsonl(&self, workspace_dir: &Path) -> Result<usize> {
        let sessions_dir = workspace_dir.join("sessions");
        // Missing directory means nothing to migrate — not an error.
        let entries = match std::fs::read_dir(&sessions_dir) {
            Ok(e) => e,
            Err(_) => return Ok(0),
        };
        let mut migrated = 0;
        for entry in entries {
            let entry = match entry {
                Ok(e) => e,
                Err(_) => continue,
            };
            let name = match entry.file_name().into_string() {
                Ok(n) => n,
                Err(_) => continue,
            };
            // The session key is the file stem; anything not *.jsonl is ignored.
            let Some(key) = name.strip_suffix(".jsonl") else {
                continue;
            };
            let path = entry.path();
            let file = match std::fs::File::open(&path) {
                Ok(f) => f,
                Err(_) => continue,
            };
            let reader = std::io::BufReader::new(file);
            let mut count = 0;
            for line in std::io::BufRead::lines(reader) {
                let Ok(line) = line else { continue };
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    continue;
                }
                // Best-effort: corrupt lines are dropped, valid ones imported.
                if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
                    if self.append(key, &msg).is_ok() {
                        count += 1;
                    }
                }
            }
            if count > 0 {
                // Rename (not delete) so the original data remains recoverable.
                let migrated_path = path.with_extension("jsonl.migrated");
                let _ = std::fs::rename(&path, &migrated_path);
                migrated += 1;
            }
        }
        Ok(migrated)
    }
}
impl SessionBackend for SqliteSessionBackend {
    // Returns messages in insertion order (rowid ASC). Any SQL error degrades
    // to an empty history rather than propagating, matching the trait contract
    // that load() has no error channel.
    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
        let conn = self.conn.lock();
        let mut stmt = match conn
            .prepare("SELECT role, content FROM sessions WHERE session_key = ?1 ORDER BY id ASC")
        {
            Ok(s) => s,
            Err(_) => return Vec::new(),
        };
        let rows = match stmt.query_map(params![session_key], |row| {
            Ok(ChatMessage {
                role: row.get(0)?,
                content: row.get(1)?,
            })
        }) {
            Ok(r) => r,
            Err(_) => return Vec::new(),
        };
        // Silently drop rows that fail to decode.
        rows.filter_map(|r| r.ok()).collect()
    }
    // Inserts the message row (FTS index updated by the AFTER INSERT trigger)
    // and upserts per-session metadata.
    //
    // NOTE(review): the two statements are not wrapped in a transaction; if
    // the metadata upsert fails after the insert succeeds, message_count can
    // drift from the actual row count — confirm whether that is acceptable.
    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
        let conn = self.conn.lock();
        let now = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO sessions (session_key, role, content, created_at)
            VALUES (?1, ?2, ?3, ?4)",
            params![session_key, message.role, message.content, now],
        )
        .map_err(std::io::Error::other)?;
        // Upsert metadata. In the DO UPDATE arm, the bare `message_count` on
        // the right-hand side refers to the existing row's value, so this is
        // an in-place increment; created_at is preserved from the first insert.
        conn.execute(
            "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count)
            VALUES (?1, ?2, ?3, 1)
            ON CONFLICT(session_key) DO UPDATE SET
                last_activity = excluded.last_activity,
                message_count = message_count + 1",
            params![session_key, now, now],
        )
        .map_err(std::io::Error::other)?;
        Ok(())
    }
    // Deletes the highest-rowid message for the key (the AFTER DELETE trigger
    // removes its FTS entry) and decrements the metadata count, floored at 0.
    // last_activity is intentionally left untouched.
    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
        let conn = self.conn.lock();
        let last_id: Option<i64> = conn
            .query_row(
                "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
                params![session_key],
                |row| row.get(0),
            )
            .ok();
        let Some(id) = last_id else {
            // No rows for this key — nothing removed.
            return Ok(false);
        };
        conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])
            .map_err(std::io::Error::other)?;
        // Update metadata count
        conn.execute(
            "UPDATE session_metadata SET message_count = MAX(0, message_count - 1)
            WHERE session_key = ?1",
            params![session_key],
        )
        .map_err(std::io::Error::other)?;
        Ok(true)
    }
    // Keys come from the metadata table (one row per session), most recently
    // active first. Errors degrade to an empty list.
    fn list_sessions(&self) -> Vec<String> {
        let conn = self.conn.lock();
        let mut stmt = match conn
            .prepare("SELECT session_key FROM session_metadata ORDER BY last_activity DESC")
        {
            Ok(s) => s,
            Err(_) => return Vec::new(),
        };
        let rows = match stmt.query_map([], |row| row.get(0)) {
            Ok(r) => r,
            Err(_) => return Vec::new(),
        };
        rows.filter_map(|r| r.ok()).collect()
    }
    // Override of the trait default: reads real timestamps/counts straight
    // from session_metadata instead of loading every session.
    fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
        let conn = self.conn.lock();
        let mut stmt = match conn.prepare(
            "SELECT session_key, created_at, last_activity, message_count
            FROM session_metadata ORDER BY last_activity DESC",
        ) {
            Ok(s) => s,
            Err(_) => return Vec::new(),
        };
        let rows = match stmt.query_map([], |row| {
            let key: String = row.get(0)?;
            let created_str: String = row.get(1)?;
            let activity_str: String = row.get(2)?;
            let count: i64 = row.get(3)?;
            // Timestamps are stored as RFC 3339 text; unparseable values fall
            // back to "now" rather than failing the whole listing.
            let created = DateTime::parse_from_rfc3339(&created_str)
                .map(|dt| dt.with_timezone(&Utc))
                .unwrap_or_else(|_| Utc::now());
            let activity = DateTime::parse_from_rfc3339(&activity_str)
                .map(|dt| dt.with_timezone(&Utc))
                .unwrap_or_else(|_| Utc::now());
            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
            Ok(SessionMetadata {
                key,
                created_at: created,
                last_activity: activity,
                message_count: count as usize,
            })
        }) {
            Ok(r) => r,
            Err(_) => return Vec::new(),
        };
        rows.filter_map(|r| r.ok()).collect()
    }
    // Deletes all rows for sessions whose last_activity predates the TTL
    // cutoff (RFC 3339 strings compare correctly lexicographically). Row
    // deletes fire the FTS cleanup trigger. Per-key delete failures are
    // ignored; the returned count is the number of stale keys found.
    fn cleanup_stale(&self, ttl_hours: u32) -> std::io::Result<usize> {
        let conn = self.conn.lock();
        let cutoff = (Utc::now() - Duration::hours(i64::from(ttl_hours))).to_rfc3339();
        // Find stale sessions
        let stale_keys: Vec<String> = {
            let mut stmt = conn
                .prepare("SELECT session_key FROM session_metadata WHERE last_activity < ?1")
                .map_err(std::io::Error::other)?;
            let rows = stmt
                .query_map(params![cutoff], |row| row.get(0))
                .map_err(std::io::Error::other)?;
            rows.filter_map(|r| r.ok()).collect()
        };
        let count = stale_keys.len();
        for key in &stale_keys {
            let _ = conn.execute("DELETE FROM sessions WHERE session_key = ?1", params![key]);
            let _ = conn.execute(
                "DELETE FROM session_metadata WHERE session_key = ?1",
                params![key],
            );
        }
        Ok(count)
    }
    // Keyword search via FTS5; a query without a keyword is just a listing.
    fn search(&self, query: &SessionQuery) -> Vec<SessionMetadata> {
        let Some(keyword) = &query.keyword else {
            return self.list_sessions_with_metadata();
        };
        let conn = self.conn.lock();
        #[allow(clippy::cast_possible_wrap)]
        let limit = query.limit.unwrap_or(50) as i64;
        // FTS5 search. The bare `sessions_fts` in the WHERE clause resolves to
        // the FTS5 table's hidden column of the same name on alias `f`, which
        // is the documented way to apply MATCH. LIMIT applies after DISTINCT,
        // so it bounds the number of distinct session keys returned.
        let mut stmt = match conn.prepare(
            "SELECT DISTINCT f.session_key
            FROM sessions_fts f
            WHERE sessions_fts MATCH ?1
            LIMIT ?2",
        ) {
            Ok(s) => s,
            Err(_) => return Vec::new(),
        };
        // Quote each word for FTS5 and OR-join so any term may match.
        // NOTE(review): a double-quote inside the keyword would break the
        // generated FTS expression — confirm inputs are sanitized upstream.
        let fts_query: String = keyword
            .split_whitespace()
            .map(|w| format!("\"{w}\""))
            .collect::<Vec<_>>()
            .join(" OR ");
        let keys: Vec<String> = match stmt.query_map(params![fts_query, limit], |row| row.get(0)) {
            Ok(r) => r.filter_map(|r| r.ok()).collect(),
            Err(_) => return Vec::new(),
        };
        // Look up metadata for matched sessions; keys lacking a metadata row
        // are silently dropped.
        keys.iter()
            .filter_map(|key| {
                conn.query_row(
                    "SELECT created_at, last_activity, message_count FROM session_metadata WHERE session_key = ?1",
                    params![key],
                    |row| {
                        let created_str: String = row.get(0)?;
                        let activity_str: String = row.get(1)?;
                        let count: i64 = row.get(2)?;
                        Ok(SessionMetadata {
                            key: key.clone(),
                            created_at: DateTime::parse_from_rfc3339(&created_str)
                                .map(|dt| dt.with_timezone(&Utc))
                                .unwrap_or_else(|_| Utc::now()),
                            last_activity: DateTime::parse_from_rfc3339(&activity_str)
                                .map(|dt| dt.with_timezone(&Utc))
                                .unwrap_or_else(|_| Utc::now()),
                            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
                            message_count: count as usize,
                        })
                    },
                )
                .ok()
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    // Basic append/load round trip preserves order and roles.
    #[test]
    fn round_trip_sqlite() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        backend
            .append("user1", &ChatMessage::user("hello"))
            .unwrap();
        backend
            .append("user1", &ChatMessage::assistant("hi"))
            .unwrap();
        let msgs = backend.load("user1");
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].role, "user");
        assert_eq!(msgs[1].role, "assistant");
    }
    // remove_last drops only the newest message.
    #[test]
    fn remove_last_sqlite() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        backend.append("u", &ChatMessage::user("a")).unwrap();
        backend.append("u", &ChatMessage::user("b")).unwrap();
        assert!(backend.remove_last("u").unwrap());
        let msgs = backend.load("u");
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "a");
    }
    // remove_last on an unknown key reports false instead of erroring.
    #[test]
    fn remove_last_empty_sqlite() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        assert!(!backend.remove_last("nonexistent").unwrap());
    }
    // Each appended key produces one entry in list_sessions.
    #[test]
    fn list_sessions_sqlite() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        backend.append("a", &ChatMessage::user("hi")).unwrap();
        backend.append("b", &ChatMessage::user("hey")).unwrap();
        let sessions = backend.list_sessions();
        assert_eq!(sessions.len(), 2);
    }
    // The metadata upsert increments message_count per append.
    #[test]
    fn metadata_tracks_counts() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        backend.append("s1", &ChatMessage::user("a")).unwrap();
        backend.append("s1", &ChatMessage::user("b")).unwrap();
        backend.append("s1", &ChatMessage::user("c")).unwrap();
        let meta = backend.list_sessions_with_metadata();
        assert_eq!(meta.len(), 1);
        assert_eq!(meta[0].message_count, 3);
    }
    // FTS5 keyword search matches message content, not just keys.
    #[test]
    fn fts5_search_finds_content() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        backend
            .append(
                "code_chat",
                &ChatMessage::user("How do I parse JSON in Rust?"),
            )
            .unwrap();
        backend
            .append("weather", &ChatMessage::user("What's the weather today?"))
            .unwrap();
        let results = backend.search(&SessionQuery {
            keyword: Some("Rust".into()),
            limit: Some(10),
        });
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].key, "code_chat");
    }
    // Sessions older than the TTL cutoff are removed; fresh ones survive.
    #[test]
    fn cleanup_stale_removes_old_sessions() {
        let tmp = TempDir::new().unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        // Insert a session with old timestamp (raw SQL to bypass append(),
        // which would stamp last_activity with "now").
        {
            let conn = backend.conn.lock();
            let old_time = (Utc::now() - Duration::hours(100)).to_rfc3339();
            conn.execute(
                "INSERT INTO sessions (session_key, role, content, created_at) VALUES (?1, ?2, ?3, ?4)",
                params!["old_session", "user", "ancient", old_time],
            ).unwrap();
            conn.execute(
                "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count) VALUES (?1, ?2, ?3, 1)",
                params!["old_session", old_time, old_time],
            ).unwrap();
        }
        backend
            .append("new_session", &ChatMessage::user("fresh"))
            .unwrap();
        let cleaned = backend.cleanup_stale(48).unwrap(); // 48h TTL
        assert_eq!(cleaned, 1);
        let sessions = backend.list_sessions();
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0], "new_session");
    }
    // JSONL migration imports messages and renames the source file.
    #[test]
    fn migrate_from_jsonl_imports_and_renames() {
        let tmp = TempDir::new().unwrap();
        let sessions_dir = tmp.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();
        // Create a JSONL file
        let jsonl_path = sessions_dir.join("test_user.jsonl");
        std::fs::write(
            &jsonl_path,
            "{\"role\":\"user\",\"content\":\"hello\"}\n{\"role\":\"assistant\",\"content\":\"hi\"}\n",
        )
        .unwrap();
        let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
        let migrated = backend.migrate_from_jsonl(tmp.path()).unwrap();
        assert_eq!(migrated, 1);
        // JSONL should be renamed
        assert!(!jsonl_path.exists());
        assert!(sessions_dir.join("test_user.jsonl.migrated").exists());
        // Messages should be in SQLite
        let msgs = backend.load("test_user");
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, "hello");
    }
}
+111
View File
@@ -5,6 +5,7 @@
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
//! are loaded from disk to restore conversation context.
use crate::channels::session_backend::SessionBackend;
use crate::providers::traits::ChatMessage;
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
@@ -78,6 +79,37 @@ impl SessionStore {
Ok(())
}
/// Remove the last message from a session's JSONL file.
///
/// Rewrite approach: load all messages, drop the last, rewrite. This is
/// O(n) but rollbacks are rare.
pub fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
    let mut history = self.load(session_key);
    match history.pop() {
        // Nothing stored for this key — nothing to remove, no file touched.
        None => Ok(false),
        Some(_) => {
            self.rewrite(session_key, &history)?;
            Ok(true)
        }
    }
}
/// Compact a session file by rewriting only valid messages (removes corrupt lines).
///
/// `load` already skips unparseable lines, so a load-then-rewrite pass is
/// all that is needed to scrub the file.
pub fn compact(&self, session_key: &str) -> std::io::Result<()> {
    self.rewrite(session_key, &self.load(session_key))
}
fn rewrite(&self, session_key: &str, messages: &[ChatMessage]) -> std::io::Result<()> {
let path = self.session_path(session_key);
let mut file = std::fs::File::create(&path)?;
for msg in messages {
let json = serde_json::to_string(msg)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
writeln!(file, "{json}")?;
}
Ok(())
}
/// List all session keys that have files on disk.
pub fn list_sessions(&self) -> Vec<String> {
let entries = match std::fs::read_dir(&self.sessions_dir) {
@@ -95,6 +127,28 @@ impl SessionStore {
}
}
// Bridge the inherent `SessionStore` methods onto the `SessionBackend` trait
// so the JSONL store can be used behind `dyn SessionBackend` alongside the
// SQLite backend. Each method is a direct delegation (inherent methods take
// precedence over trait methods in resolution, so `self.load(...)` below is
// not recursive). `list_sessions_with_metadata`, `cleanup_stale`, and
// `search` intentionally fall back to the trait's default implementations.
impl SessionBackend for SessionStore {
    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
        self.load(session_key)
    }
    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
        self.append(session_key, message)
    }
    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
        self.remove_last(session_key)
    }
    fn list_sessions(&self) -> Vec<String> {
        self.list_sessions()
    }
    fn compact(&self, session_key: &str) -> std::io::Result<()> {
        self.compact(session_key)
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -178,6 +232,63 @@ mod tests {
assert_eq!(lines.len(), 2);
}
// remove_last keeps earlier messages and deletes only the newest.
#[test]
fn remove_last_drops_final_message() {
    let tmp = TempDir::new().unwrap();
    let store = SessionStore::new(tmp.path()).unwrap();
    store
        .append("rm_test", &ChatMessage::user("first"))
        .unwrap();
    store
        .append("rm_test", &ChatMessage::user("second"))
        .unwrap();
    assert!(store.remove_last("rm_test").unwrap());
    let messages = store.load("rm_test");
    assert_eq!(messages.len(), 1);
    assert_eq!(messages[0].content, "first");
}
// remove_last on a missing session is a clean `false`, not an error.
#[test]
fn remove_last_empty_returns_false() {
    let tmp = TempDir::new().unwrap();
    let store = SessionStore::new(tmp.path()).unwrap();
    assert!(!store.remove_last("nonexistent").unwrap());
}
// compact rewrites the file with only the parseable lines.
#[test]
fn compact_removes_corrupt_lines() {
    let tmp = TempDir::new().unwrap();
    let store = SessionStore::new(tmp.path()).unwrap();
    let key = "compact_test";
    // Write the file by hand so we can plant a corrupt middle line.
    let path = store.session_path(key);
    std::fs::create_dir_all(path.parent().unwrap()).unwrap();
    let mut file = std::fs::File::create(&path).unwrap();
    writeln!(file, r#"{{"role":"user","content":"ok"}}"#).unwrap();
    writeln!(file, "corrupt line").unwrap();
    writeln!(file, r#"{{"role":"assistant","content":"hi"}}"#).unwrap();
    store.compact(key).unwrap();
    let raw = std::fs::read_to_string(&path).unwrap();
    assert_eq!(raw.trim().lines().count(), 2);
}
// The JSONL store is usable through a `&dyn SessionBackend`.
#[test]
fn session_backend_trait_works_via_dyn() {
    let tmp = TempDir::new().unwrap();
    let store = SessionStore::new(tmp.path()).unwrap();
    let backend: &dyn SessionBackend = &store;
    backend
        .append("trait_test", &ChatMessage::user("hello"))
        .unwrap();
    let msgs = backend.load("trait_test");
    assert_eq!(msgs.len(), 1);
}
#[test]
fn handles_corrupt_lines_gracefully() {
let tmp = TempDir::new().unwrap();
+71
View File
@@ -2650,6 +2650,9 @@ pub struct MemoryConfig {
/// Max number of cached responses before LRU eviction (default: 5000)
#[serde(default = "default_response_cache_max")]
pub response_cache_max_entries: usize,
/// Max in-memory hot cache entries for the two-tier response cache (default: 256)
#[serde(default = "default_response_cache_hot_entries")]
pub response_cache_hot_entries: usize,
// ── Memory Snapshot (soul backup to Markdown) ─────────────
/// Enable periodic export of core memories to MEMORY_SNAPSHOT.md
@@ -2718,6 +2721,10 @@ fn default_response_cache_max() -> usize {
5_000
}
/// Serde default for `MemoryConfig::response_cache_hot_entries`: size of the
/// in-memory hot tier of the two-tier response cache.
fn default_response_cache_hot_entries() -> usize {
    256
}
impl Default for MemoryConfig {
fn default() -> Self {
Self {
@@ -2738,6 +2745,7 @@ impl Default for MemoryConfig {
response_cache_enabled: false,
response_cache_ttl_minutes: default_response_cache_ttl(),
response_cache_max_entries: default_response_cache_max(),
response_cache_hot_entries: default_response_cache_hot_entries(),
snapshot_enabled: false,
snapshot_on_hygiene: false,
auto_hydrate: true,
@@ -3344,12 +3352,48 @@ pub struct HeartbeatConfig {
/// explicitly set).
#[serde(default, alias = "recipient")]
pub to: Option<String>,
/// Enable adaptive intervals that back off on failures and speed up for
/// high-priority tasks. Default: `false`.
#[serde(default)]
pub adaptive: bool,
/// Minimum interval in minutes when adaptive mode is enabled. Default: `5`.
#[serde(default = "default_heartbeat_min_interval")]
pub min_interval_minutes: u32,
/// Maximum interval in minutes when adaptive mode backs off. Default: `120`.
#[serde(default = "default_heartbeat_max_interval")]
pub max_interval_minutes: u32,
/// Dead-man's switch timeout in minutes. If the heartbeat has not ticked
/// within this window, an alert is sent. `0` disables. Default: `0`.
#[serde(default)]
pub deadman_timeout_minutes: u32,
/// Channel for dead-man's switch alerts (e.g. `telegram`). Falls back to
/// the heartbeat delivery channel.
#[serde(default)]
pub deadman_channel: Option<String>,
/// Recipient for dead-man's switch alerts. Falls back to `to`.
#[serde(default)]
pub deadman_to: Option<String>,
/// Maximum number of heartbeat run history records to retain. Default: `100`.
#[serde(default = "default_heartbeat_max_run_history")]
pub max_run_history: u32,
}
/// Serde default: two-phase heartbeat evaluation is enabled unless turned off.
fn default_two_phase() -> bool {
    true
}
/// Serde default for `HeartbeatConfig::min_interval_minutes` (adaptive floor).
fn default_heartbeat_min_interval() -> u32 {
    5
}
/// Serde default for `HeartbeatConfig::max_interval_minutes` (adaptive back-off cap).
fn default_heartbeat_max_interval() -> u32 {
    120
}
/// Serde default for `HeartbeatConfig::max_run_history` (retained run records).
fn default_heartbeat_max_run_history() -> u32 {
    100
}
impl Default for HeartbeatConfig {
fn default() -> Self {
Self {
@@ -3359,6 +3403,13 @@ impl Default for HeartbeatConfig {
message: None,
target: None,
to: None,
adaptive: false,
min_interval_minutes: default_heartbeat_min_interval(),
max_interval_minutes: default_heartbeat_max_interval(),
deadman_timeout_minutes: 0,
deadman_channel: None,
deadman_to: None,
max_run_history: default_heartbeat_max_run_history(),
}
}
}
@@ -3587,6 +3638,13 @@ pub struct ChannelsConfig {
/// daemon restarts. Files are stored in `{workspace}/sessions/`. Default: `true`.
#[serde(default = "default_true")]
pub session_persistence: bool,
/// Session persistence backend: `"jsonl"` (legacy) or `"sqlite"` (new default).
/// SQLite provides FTS5 search, metadata tracking, and TTL cleanup.
#[serde(default = "default_session_backend")]
pub session_backend: String,
/// Auto-archive stale sessions older than this many hours. `0` disables. Default: `0`.
#[serde(default)]
pub session_ttl_hours: u32,
}
impl ChannelsConfig {
@@ -3692,6 +3750,10 @@ fn default_channel_message_timeout_secs() -> u64 {
300
}
/// Serde default for `ChannelsConfig::session_backend`: SQLite is the default
/// for new installations; `"jsonl"` selects the legacy store.
fn default_session_backend() -> String {
    "sqlite".into()
}
impl Default for ChannelsConfig {
fn default() -> Self {
Self {
@@ -3722,6 +3784,8 @@ impl Default for ChannelsConfig {
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
session_backend: default_session_backend(),
session_ttl_hours: 0,
}
}
}
@@ -7358,6 +7422,7 @@ default_temperature = 0.7
message: Some("Check London time".into()),
target: Some("telegram".into()),
to: Some("123456".into()),
..HeartbeatConfig::default()
},
cron: CronConfig::default(),
channels_config: ChannelsConfig {
@@ -7395,6 +7460,8 @@ default_temperature = 0.7
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
session_backend: default_session_backend(),
session_ttl_hours: 0,
},
memory: MemoryConfig::default(),
storage: StorageConfig::default(),
@@ -8127,6 +8194,8 @@ allowed_users = ["@ops:matrix.org"]
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
session_backend: default_session_backend(),
session_ttl_hours: 0,
};
let toml_str = toml::to_string_pretty(&c).unwrap();
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
@@ -8355,6 +8424,8 @@ channel_id = "C123"
ack_reactions: true,
show_tool_calls: true,
session_persistence: true,
session_backend: default_session_backend(),
session_ttl_hours: 0,
};
let toml_str = toml::to_string_pretty(&c).unwrap();
let parsed: ChannelsConfig = toml::from_str(&toml_str).unwrap();
+128 -9
View File
@@ -203,7 +203,10 @@ where
}
async fn run_heartbeat_worker(config: Config) -> Result<()> {
use crate::heartbeat::engine::HeartbeatEngine;
use crate::heartbeat::engine::{
compute_adaptive_interval, HeartbeatEngine, HeartbeatTask, TaskPriority, TaskStatus,
};
use std::sync::Arc;
let observer: std::sync::Arc<dyn crate::observability::Observer> =
std::sync::Arc::from(crate::observability::create_observer(&config.observability));
@@ -212,19 +215,72 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
config.workspace_dir.clone(),
observer,
);
let metrics = engine.metrics();
let delivery = resolve_heartbeat_delivery(&config)?;
let two_phase = config.heartbeat.two_phase;
let adaptive = config.heartbeat.adaptive;
let start_time = std::time::Instant::now();
let interval_mins = config.heartbeat.interval_minutes.max(5);
let mut interval = tokio::time::interval(Duration::from_secs(u64::from(interval_mins) * 60));
// ── Deadman watcher ──────────────────────────────────────────
let deadman_timeout = config.heartbeat.deadman_timeout_minutes;
if deadman_timeout > 0 {
let dm_metrics = Arc::clone(&metrics);
let dm_config = config.clone();
let dm_delivery = delivery.clone();
tokio::spawn(async move {
let check_interval = Duration::from_secs(60);
let timeout = chrono::Duration::minutes(i64::from(deadman_timeout));
loop {
tokio::time::sleep(check_interval).await;
let last_tick = dm_metrics.lock().last_tick_at;
if let Some(last) = last_tick {
if chrono::Utc::now() - last > timeout {
let alert = format!(
"⚠️ Heartbeat dead-man's switch: no tick in {deadman_timeout} minutes"
);
let (channel, target) =
if let Some(ch) = &dm_config.heartbeat.deadman_channel {
let to = dm_config
.heartbeat
.deadman_to
.as_deref()
.or(dm_config.heartbeat.to.as_deref())
.unwrap_or_default();
(ch.clone(), to.to_string())
} else if let Some((ch, to)) = &dm_delivery {
(ch.clone(), to.clone())
} else {
continue;
};
let _ = crate::cron::scheduler::deliver_announcement(
&dm_config, &channel, &target, &alert,
)
.await;
}
}
}
});
}
let base_interval = config.heartbeat.interval_minutes.max(5);
let mut sleep_mins = base_interval;
loop {
interval.tick().await;
tokio::time::sleep(Duration::from_secs(u64::from(sleep_mins) * 60)).await;
// Update uptime
{
let mut m = metrics.lock();
m.uptime_secs = start_time.elapsed().as_secs();
}
let tick_start = std::time::Instant::now();
// Collect runnable tasks (active only, sorted by priority)
let mut tasks = engine.collect_runnable_tasks().await?;
let has_high_priority = tasks.iter().any(|t| t.priority == TaskPriority::High);
if tasks.is_empty() {
// Try fallback message
if let Some(fallback) = config
.heartbeat
.message
@@ -232,12 +288,15 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
.map(str::trim)
.filter(|m| !m.is_empty())
{
tasks.push(crate::heartbeat::engine::HeartbeatTask {
tasks.push(HeartbeatTask {
text: fallback.to_string(),
priority: crate::heartbeat::engine::TaskPriority::Medium,
status: crate::heartbeat::engine::TaskStatus::Active,
priority: TaskPriority::Medium,
status: TaskStatus::Active,
});
} else {
#[allow(clippy::cast_precision_loss)]
let elapsed = tick_start.elapsed().as_millis() as f64;
metrics.lock().record_success(elapsed);
continue;
}
}
@@ -250,7 +309,7 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
Some(decision_prompt),
None,
None,
0.0, // Low temperature for deterministic decision
0.0,
vec![],
false,
None,
@@ -263,6 +322,9 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
if indices.is_empty() {
tracing::info!("💓 Heartbeat Phase 1: skip (nothing to do)");
crate::health::mark_component_ok("heartbeat");
#[allow(clippy::cast_precision_loss)]
let elapsed = tick_start.elapsed().as_millis() as f64;
metrics.lock().record_success(elapsed);
continue;
}
tracing::info!(
@@ -285,7 +347,9 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
};
// ── Phase 2: Execute selected tasks ─────────────────────
let mut tick_had_error = false;
for task in &tasks_to_run {
let task_start = std::time::Instant::now();
let prompt = format!("[Heartbeat Task | {}] {}", task.priority, task.text);
let temp = config.default_temperature;
match Box::pin(crate::agent::run(
@@ -303,6 +367,20 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
{
Ok(output) => {
crate::health::mark_component_ok("heartbeat");
#[allow(clippy::cast_possible_truncation)]
let duration_ms = task_start.elapsed().as_millis() as i64;
let now = chrono::Utc::now();
let _ = crate::heartbeat::store::record_run(
&config.workspace_dir,
&task.text,
&task.priority.to_string(),
now - chrono::Duration::milliseconds(duration_ms),
now,
"ok",
Some(output.as_str()),
duration_ms,
config.heartbeat.max_run_history,
);
let announcement = if output.trim().is_empty() {
format!("💓 heartbeat task completed: {}", task.text)
} else {
@@ -326,11 +404,52 @@ async fn run_heartbeat_worker(config: Config) -> Result<()> {
}
}
Err(e) => {
tick_had_error = true;
#[allow(clippy::cast_possible_truncation)]
let duration_ms = task_start.elapsed().as_millis() as i64;
let now = chrono::Utc::now();
let _ = crate::heartbeat::store::record_run(
&config.workspace_dir,
&task.text,
&task.priority.to_string(),
now - chrono::Duration::milliseconds(duration_ms),
now,
"error",
Some(&e.to_string()),
duration_ms,
config.heartbeat.max_run_history,
);
crate::health::mark_component_error("heartbeat", e.to_string());
tracing::warn!("Heartbeat task failed: {e}");
}
}
}
// Update metrics
#[allow(clippy::cast_precision_loss)]
let tick_elapsed = tick_start.elapsed().as_millis() as f64;
{
let mut m = metrics.lock();
if tick_had_error {
m.record_failure(tick_elapsed);
} else {
m.record_success(tick_elapsed);
}
}
// Compute next sleep interval
if adaptive {
let failures = metrics.lock().consecutive_failures;
sleep_mins = compute_adaptive_interval(
base_interval,
config.heartbeat.min_interval_minutes,
config.heartbeat.max_interval_minutes,
failures,
has_high_priority,
);
} else {
sleep_mins = base_interval;
}
}
}
+177
View File
@@ -1,6 +1,8 @@
use crate::config::HeartbeatConfig;
use crate::observability::{Observer, ObserverEvent};
use anyhow::Result;
use chrono::{DateTime, Utc};
use parking_lot::Mutex as ParkingMutex;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::Path;
@@ -68,6 +70,99 @@ impl fmt::Display for HeartbeatTask {
}
}
// ── Health Metrics ───────────────────────────────────────────────
/// Live health metrics for the heartbeat subsystem.
///
/// Shared via `Arc<ParkingMutex<>>` between the heartbeat worker,
/// deadman watcher, and API consumers.
///
/// `Default` is derived: every counter starts at zero, `last_tick_at` at
/// `None`, and the duration EMA at `0.0` — field-for-field identical to the
/// previous hand-written impl, with one less block to keep in sync as fields
/// are added.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct HeartbeatMetrics {
    /// Monotonic uptime since the heartbeat loop started.
    pub uptime_secs: u64,
    /// Consecutive successful ticks (resets on failure).
    pub consecutive_successes: u64,
    /// Consecutive failed ticks (resets on success).
    pub consecutive_failures: u64,
    /// Timestamp of the most recent tick (UTC RFC 3339).
    pub last_tick_at: Option<DateTime<Utc>>,
    /// Exponential moving average of tick durations in milliseconds.
    pub avg_tick_duration_ms: f64,
    /// Total number of ticks executed since startup.
    pub total_ticks: u64,
}
impl HeartbeatMetrics {
    /// Record a successful tick with the given duration.
    pub fn record_success(&mut self, duration_ms: f64) {
        self.consecutive_successes += 1;
        self.consecutive_failures = 0;
        self.note_tick(duration_ms);
    }
    /// Record a failed tick with the given duration.
    pub fn record_failure(&mut self, duration_ms: f64) {
        self.consecutive_failures += 1;
        self.consecutive_successes = 0;
        self.note_tick(duration_ms);
    }
    /// Shared per-tick bookkeeping (previously duplicated in both record
    /// paths): stamp the tick time, count it, and fold the duration into
    /// the moving average. Order matters: `total_ticks` must be incremented
    /// before `update_avg_duration` so the first-tick seeding check works.
    fn note_tick(&mut self, duration_ms: f64) {
        self.last_tick_at = Some(Utc::now());
        self.total_ticks += 1;
        self.update_avg_duration(duration_ms);
    }
    /// Exponential moving average of tick durations: the first tick seeds
    /// the average directly; later ticks blend with `ALPHA` weight on the
    /// newest sample.
    fn update_avg_duration(&mut self, duration_ms: f64) {
        const ALPHA: f64 = 0.3; // EMA smoothing factor
        if self.total_ticks == 1 {
            self.avg_tick_duration_ms = duration_ms;
        } else {
            self.avg_tick_duration_ms =
                ALPHA * duration_ms + (1.0 - ALPHA) * self.avg_tick_duration_ms;
        }
    }
}
/// Compute the adaptive interval for the next heartbeat tick.
///
/// Strategy:
/// - On failures: exponential back-off `base * 2^failures`, capped at
///   `max_minutes` and floored at `min_minutes`.
/// - When high-priority tasks are present: `min_minutes` (never below 5).
/// - Otherwise: `base_minutes` bounded to `[min_minutes, max_minutes]`.
///
/// Bounds are applied with `.min(max).max(min)` — the floor wins on a
/// misconfigured `min_minutes > max_minutes` — instead of `Ord::clamp`,
/// which panics in that case. The failure path already used the panic-free
/// form; the base path now matches it.
pub fn compute_adaptive_interval(
    base_minutes: u32,
    min_minutes: u32,
    max_minutes: u32,
    consecutive_failures: u64,
    has_high_priority_tasks: bool,
) -> u32 {
    if consecutive_failures > 0 {
        // 2^failures multiplier; the shift amount is capped at 10 (x1024) so
        // `checked_shl` never sees an out-of-range shift, and the multiply
        // saturates instead of overflowing.
        #[allow(clippy::cast_possible_truncation)]
        let multiplier = 1u32
            .checked_shl(consecutive_failures.min(10) as u32)
            .unwrap_or(u32::MAX);
        let backoff = base_minutes.saturating_mul(multiplier);
        return backoff.min(max_minutes).max(min_minutes);
    }
    if has_high_priority_tasks {
        return min_minutes.max(5); // never go below 5 minutes
    }
    // Panic-free equivalent of clamp(min, max) whenever min <= max.
    base_minutes.min(max_minutes).max(min_minutes)
}
// ── Engine ───────────────────────────────────────────────────────
/// Heartbeat engine — reads HEARTBEAT.md and executes tasks periodically
@@ -75,6 +170,7 @@ pub struct HeartbeatEngine {
config: HeartbeatConfig,
workspace_dir: std::path::PathBuf,
observer: Arc<dyn Observer>,
metrics: Arc<ParkingMutex<HeartbeatMetrics>>,
}
impl HeartbeatEngine {
@@ -87,9 +183,15 @@ impl HeartbeatEngine {
config,
workspace_dir,
observer,
metrics: Arc::new(ParkingMutex::new(HeartbeatMetrics::default())),
}
}
/// Get a shared handle to the live heartbeat metrics.
pub fn metrics(&self) -> Arc<ParkingMutex<HeartbeatMetrics>> {
Arc::clone(&self.metrics)
}
/// Start the heartbeat loop (runs until cancelled)
pub async fn run(&self) -> Result<()> {
if !self.config.enabled {
@@ -673,4 +775,79 @@ mod tests {
let _ = tokio::fs::remove_dir_all(&dir).await;
}
// ── HeartbeatMetrics tests ───────────────────────────────────
#[test]
fn metrics_record_success_updates_fields() {
let mut m = HeartbeatMetrics::default();
m.record_success(100.0);
assert_eq!(m.consecutive_successes, 1);
assert_eq!(m.consecutive_failures, 0);
assert_eq!(m.total_ticks, 1);
assert!(m.last_tick_at.is_some());
assert!((m.avg_tick_duration_ms - 100.0).abs() < f64::EPSILON);
}
#[test]
fn metrics_record_failure_resets_successes() {
let mut m = HeartbeatMetrics::default();
m.record_success(50.0);
m.record_success(50.0);
m.record_failure(200.0);
assert_eq!(m.consecutive_successes, 0);
assert_eq!(m.consecutive_failures, 1);
assert_eq!(m.total_ticks, 3);
}
#[test]
fn metrics_ema_smoothing() {
let mut m = HeartbeatMetrics::default();
m.record_success(100.0);
assert!((m.avg_tick_duration_ms - 100.0).abs() < f64::EPSILON);
m.record_success(200.0);
// EMA: 0.3 * 200 + 0.7 * 100 = 130
assert!((m.avg_tick_duration_ms - 130.0).abs() < f64::EPSILON);
}
// ── Adaptive interval tests ─────────────────────────────────
#[test]
fn adaptive_uses_base_when_no_failures() {
let result = compute_adaptive_interval(30, 5, 120, 0, false);
assert_eq!(result, 30);
}
#[test]
fn adaptive_uses_min_for_high_priority() {
let result = compute_adaptive_interval(30, 5, 120, 0, true);
assert_eq!(result, 5);
}
#[test]
fn adaptive_backs_off_on_failures() {
// 1 failure: 30 * 2 = 60
assert_eq!(compute_adaptive_interval(30, 5, 120, 1, false), 60);
// 2 failures: 30 * 4 = 120 (capped at max)
assert_eq!(compute_adaptive_interval(30, 5, 120, 2, false), 120);
// 3 failures: 30 * 8 = 240 → capped at 120
assert_eq!(compute_adaptive_interval(30, 5, 120, 3, false), 120);
}
#[test]
fn adaptive_backoff_respects_min() {
// Even with failures, must be >= min
assert!(compute_adaptive_interval(5, 10, 120, 0, false) >= 10);
}
// ── Engine metrics accessor ─────────────────────────────────
#[test]
fn engine_exposes_shared_metrics() {
let observer: Arc<dyn Observer> = Arc::new(crate::observability::NoopObserver);
let engine =
HeartbeatEngine::new(HeartbeatConfig::default(), std::env::temp_dir(), observer);
let metrics = engine.metrics();
assert_eq!(metrics.lock().total_ticks, 0);
}
}
+1
View File
@@ -1,4 +1,5 @@
pub mod engine;
pub mod store;
#[cfg(test)]
mod tests {
+305
View File
@@ -0,0 +1,305 @@
//! SQLite persistence for heartbeat task execution history.
//!
//! Mirrors the `cron/store.rs` pattern: fresh connection per call, schema
//! auto-created, output truncated, history pruned to a configurable limit.
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection};
use std::path::{Path, PathBuf};
const MAX_OUTPUT_BYTES: usize = 16 * 1024;
const TRUNCATED_MARKER: &str = "\n...[truncated]";
/// A single heartbeat task execution record.
///
/// Persisted in `heartbeat/history.db`; timestamps are stored as RFC 3339
/// text and parsed back with `parse_rfc3339` on read.
#[derive(Debug, Clone)]
pub struct HeartbeatRun {
    /// SQLite rowid of this record.
    pub id: i64,
    /// The task's text, as recorded at execution time.
    pub task_text: String,
    /// Priority label for the task (tests use "high"/"medium"/"low").
    pub task_priority: String,
    /// When execution began (UTC).
    pub started_at: DateTime<Utc>,
    /// When execution finished (UTC).
    pub finished_at: DateTime<Utc>,
    pub status: String, // "ok" or "error"
    /// Captured output, truncated to `MAX_OUTPUT_BYTES` on write.
    pub output: Option<String>,
    /// Execution duration in milliseconds.
    pub duration_ms: i64,
}
/// Record a heartbeat task execution and prune old entries.
///
/// The insert and the prune run in one transaction, so the history never
/// transiently exceeds `max_history` rows for other readers.
///
/// * `output` is bounded via `truncate_output` before being stored.
/// * `max_history` is clamped to at least 1, so the run just recorded is
///   always retained.
/// * Timestamps are stored as RFC 3339 text.
///
/// # Errors
/// Fails if the database cannot be opened/initialized, or if the insert,
/// prune, or commit fails.
pub fn record_run(
    workspace_dir: &Path,
    task_text: &str,
    task_priority: &str,
    started_at: DateTime<Utc>,
    finished_at: DateTime<Utc>,
    status: &str,
    output: Option<&str>,
    duration_ms: i64,
    max_history: u32,
) -> Result<()> {
    // Truncate before opening the connection so oversized outputs never hit SQLite.
    let bounded_output = output.map(truncate_output);
    with_connection(workspace_dir, |conn| {
        // NOTE(review): `unchecked_transaction` is used because only `&Connection`
        // is available here; safe while the connection is private to this call —
        // revisit if the connection is ever shared.
        let tx = conn.unchecked_transaction()?;
        tx.execute(
            "INSERT INTO heartbeat_runs
             (task_text, task_priority, started_at, finished_at, status, output, duration_ms)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                task_text,
                task_priority,
                started_at.to_rfc3339(),
                finished_at.to_rfc3339(),
                status,
                bounded_output.as_deref(),
                duration_ms,
            ],
        )
        .context("Failed to insert heartbeat run")?;
        // Keep only the newest `max_history` rows (minimum 1).
        let keep = i64::from(max_history.max(1));
        tx.execute(
            "DELETE FROM heartbeat_runs
             WHERE id NOT IN (
                 SELECT id FROM heartbeat_runs
                 ORDER BY started_at DESC, id DESC
                 LIMIT ?1
             )",
            params![keep],
        )
        .context("Failed to prune heartbeat run history")?;
        tx.commit()
            .context("Failed to commit heartbeat run transaction")?;
        Ok(())
    })
}
/// List the most recent heartbeat runs.
pub fn list_runs(workspace_dir: &Path, limit: usize) -> Result<Vec<HeartbeatRun>> {
with_connection(workspace_dir, |conn| {
let lim = i64::try_from(limit.max(1)).context("Run history limit overflow")?;
let mut stmt = conn.prepare(
"SELECT id, task_text, task_priority, started_at, finished_at, status, output, duration_ms
FROM heartbeat_runs
ORDER BY started_at DESC, id DESC
LIMIT ?1",
)?;
let rows = stmt.query_map(params![lim], |row| {
Ok(HeartbeatRun {
id: row.get(0)?,
task_text: row.get(1)?,
task_priority: row.get(2)?,
started_at: parse_rfc3339(&row.get::<_, String>(3)?).map_err(sql_err)?,
finished_at: parse_rfc3339(&row.get::<_, String>(4)?).map_err(sql_err)?,
status: row.get(5)?,
output: row.get(6)?,
duration_ms: row.get(7)?,
})
})?;
let mut runs = Vec::new();
for row in rows {
runs.push(row?);
}
Ok(runs)
})
}
/// Get aggregate stats: (total_runs, total_ok, total_error).
pub fn run_stats(workspace_dir: &Path) -> Result<(u64, u64, u64)> {
with_connection(workspace_dir, |conn| {
let total: i64 = conn.query_row("SELECT COUNT(*) FROM heartbeat_runs", [], |r| r.get(0))?;
let ok: i64 = conn.query_row(
"SELECT COUNT(*) FROM heartbeat_runs WHERE status = 'ok'",
[],
|r| r.get(0),
)?;
let err: i64 = conn.query_row(
"SELECT COUNT(*) FROM heartbeat_runs WHERE status = 'error'",
[],
|r| r.get(0),
)?;
#[allow(clippy::cast_sign_loss)]
Ok((total as u64, ok as u64, err as u64))
})
}
/// Location of the heartbeat history database inside the workspace:
/// `<workspace>/heartbeat/history.db`.
fn db_path(workspace_dir: &Path) -> PathBuf {
    let heartbeat_dir = workspace_dir.join("heartbeat");
    heartbeat_dir.join("history.db")
}
/// Open the history database, ensure the schema exists, then run `f`.
///
/// A fresh connection is opened per call (mirroring the `cron/store.rs`
/// pattern noted in the module docs), so there is no pooling or cross-call
/// state. Schema creation is idempotent and safe to repeat on every call.
///
/// # Errors
/// Fails if the parent directory cannot be created, the database cannot be
/// opened, schema initialization fails, or `f` itself returns an error.
fn with_connection<T>(workspace_dir: &Path, f: impl FnOnce(&Connection) -> Result<T>) -> Result<T> {
    let path = db_path(workspace_dir);
    // Ensure `<workspace>/heartbeat/` exists before opening the DB file.
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).with_context(|| {
            format!("Failed to create heartbeat directory: {}", parent.display())
        })?;
    }
    let conn = Connection::open(&path)
        .with_context(|| format!("Failed to open heartbeat history DB: {}", path.display()))?;
    // WAL + NORMAL synchronous + in-memory temp store, then idempotent schema.
    conn.execute_batch(
        "PRAGMA journal_mode = WAL;
         PRAGMA synchronous = NORMAL;
         PRAGMA temp_store = MEMORY;
         CREATE TABLE IF NOT EXISTS heartbeat_runs (
             id INTEGER PRIMARY KEY AUTOINCREMENT,
             task_text TEXT NOT NULL,
             task_priority TEXT NOT NULL,
             started_at TEXT NOT NULL,
             finished_at TEXT NOT NULL,
             status TEXT NOT NULL,
             output TEXT,
             duration_ms INTEGER
         );
         CREATE INDEX IF NOT EXISTS idx_hb_runs_started ON heartbeat_runs(started_at);
         CREATE INDEX IF NOT EXISTS idx_hb_runs_task ON heartbeat_runs(task_text);",
    )
    .context("Failed to initialize heartbeat history schema")?;
    f(&conn)
}
/// Bound `output` to `MAX_OUTPUT_BYTES`, appending `TRUNCATED_MARKER` when cut.
///
/// The cut point is walked back to a UTF-8 character boundary so the stored
/// string is always valid, and the result never exceeds `MAX_OUTPUT_BYTES`.
fn truncate_output(output: &str) -> String {
    if output.len() <= MAX_OUTPUT_BYTES {
        return output.to_string();
    }
    // Defensive: if the marker alone fills the budget, store only the marker.
    if MAX_OUTPUT_BYTES <= TRUNCATED_MARKER.len() {
        return TRUNCATED_MARKER.to_string();
    }
    let budget = MAX_OUTPUT_BYTES - TRUNCATED_MARKER.len();
    // Largest index <= budget that lands on a char boundary; index 0 is
    // always a boundary, so the search cannot fail.
    let cutoff = (0..=budget)
        .rev()
        .find(|&i| output.is_char_boundary(i))
        .unwrap_or(0);
    format!("{}{}", &output[..cutoff], TRUNCATED_MARKER)
}
/// Parse a stored RFC 3339 timestamp and normalize it to UTC.
fn parse_rfc3339(raw: &str) -> Result<DateTime<Utc>> {
    DateTime::parse_from_rfc3339(raw)
        .map(|parsed| parsed.with_timezone(&Utc))
        .with_context(|| format!("Invalid RFC3339 timestamp in heartbeat DB: {raw}"))
}
/// Adapt an `anyhow` error so it can surface from a `rusqlite` row-mapping
/// closure, which must return `rusqlite::Error`.
// NOTE(review): `ToSqlConversionFailure` is used as a generic boxed-error
// carrier even though the failure happens while *reading* a value; confirm
// there is no better-fitting variant in the rusqlite version in use.
fn sql_err(err: anyhow::Error) -> rusqlite::Error {
    rusqlite::Error::ToSqlConversionFailure(err.into())
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration as ChronoDuration;
    use tempfile::TempDir;

    /// Three inserted runs come back intact and newest-first.
    #[test]
    fn record_and_list_runs() {
        let tmp = TempDir::new().unwrap();
        let base = Utc::now();
        for i in 0..3 {
            // Stagger start times so the DESC ordering is deterministic.
            let start = base + ChronoDuration::seconds(i);
            let end = start + ChronoDuration::milliseconds(100);
            record_run(
                tmp.path(),
                &format!("Task {i}"),
                "medium",
                start,
                end,
                "ok",
                Some("done"),
                100,
                50,
            )
            .unwrap();
        }
        let runs = list_runs(tmp.path(), 10).unwrap();
        assert_eq!(runs.len(), 3);
        // Most recent first
        assert!(runs[0].task_text.contains('2'));
    }

    /// With `max_history = 2`, only the two newest rows survive pruning.
    #[test]
    fn prunes_old_runs() {
        let tmp = TempDir::new().unwrap();
        let base = Utc::now();
        for i in 0..5 {
            let start = base + ChronoDuration::seconds(i);
            let end = start + ChronoDuration::milliseconds(50);
            record_run(
                tmp.path(),
                "Task",
                "high",
                start,
                end,
                "ok",
                None,
                50,
                2, // keep only 2
            )
            .unwrap();
        }
        let runs = list_runs(tmp.path(), 10).unwrap();
        assert_eq!(runs.len(), 2);
    }

    /// `run_stats` splits counts by the stored status string.
    #[test]
    fn run_stats_counts_correctly() {
        let tmp = TempDir::new().unwrap();
        let now = Utc::now();
        record_run(tmp.path(), "A", "high", now, now, "ok", None, 10, 50).unwrap();
        record_run(
            tmp.path(),
            "B",
            "low",
            now,
            now,
            "error",
            Some("fail"),
            20,
            50,
        )
        .unwrap();
        record_run(tmp.path(), "C", "medium", now, now, "ok", None, 15, 50).unwrap();
        let (total, ok, err) = run_stats(tmp.path()).unwrap();
        assert_eq!(total, 3);
        assert_eq!(ok, 2);
        assert_eq!(err, 1);
    }

    /// Oversized output is stored bounded and ends with the marker.
    #[test]
    fn truncates_large_output() {
        let tmp = TempDir::new().unwrap();
        let now = Utc::now();
        let big = "x".repeat(MAX_OUTPUT_BYTES + 512);
        record_run(
            tmp.path(),
            "T",
            "medium",
            now,
            now,
            "ok",
            Some(&big),
            10,
            50,
        )
        .unwrap();
        let runs = list_runs(tmp.path(), 1).unwrap();
        let stored = runs[0].output.as_deref().unwrap_or_default();
        assert!(stored.ends_with(TRUNCATED_MARKER));
        assert!(stored.len() <= MAX_OUTPUT_BYTES);
    }
}
+130 -27
View File
@@ -10,23 +10,45 @@ use chrono::{Duration, Local};
use parking_lot::Mutex;
use rusqlite::{params, Connection};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
/// Response cache backed by a dedicated SQLite database.
/// An in-memory hot cache entry for the two-tier response cache.
struct InMemoryEntry {
    /// The cached response text.
    response: String,
    /// Token count recorded when the entry was stored — carried along on
    /// promotion between tiers; assumed to feed savings stats — TODO confirm.
    token_count: u32,
    /// When the entry was first created; drives TTL expiry (the refresh path
    /// deliberately preserves it).
    created_at: std::time::Instant,
    /// When the entry was last read; drives least-recently-accessed eviction.
    accessed_at: std::time::Instant,
}
/// Two-tier response cache: in-memory LRU (hot) + SQLite (warm).
///
/// The hot cache avoids SQLite round-trips for frequently repeated prompts.
/// On miss from hot cache, falls through to SQLite. On hit from SQLite,
/// the entry is promoted to the hot cache.
pub struct ResponseCache {
    /// Connection to the warm-tier SQLite database.
    conn: Mutex<Connection>,
    /// Path of the backing database file; currently unused at runtime.
    #[allow(dead_code)]
    db_path: PathBuf,
    /// Entry time-to-live in minutes, enforced at read time on both tiers.
    ttl_minutes: i64,
    /// Maximum entries retained in the SQLite tier — NOTE(review): pruning is
    /// not visible in this view; confirm where it is enforced.
    max_entries: usize,
    /// Hot tier: prompt-hash key → in-memory entry.
    hot_cache: Mutex<HashMap<String, InMemoryEntry>>,
    /// Hot-tier capacity; `0` disables the hot tier entirely.
    hot_max_entries: usize,
}
impl ResponseCache {
    /// Open (or create) the response cache database.
    ///
    /// Uses a default hot-cache capacity of 256 entries; use
    /// `with_hot_cache` to choose a different capacity.
    pub fn new(workspace_dir: &Path, ttl_minutes: u32, max_entries: usize) -> Result<Self> {
        Self::with_hot_cache(workspace_dir, ttl_minutes, max_entries, 256)
    }
/// Open (or create) the response cache database with a custom hot cache size.
pub fn with_hot_cache(
workspace_dir: &Path,
ttl_minutes: u32,
max_entries: usize,
hot_max_entries: usize,
) -> Result<Self> {
let db_dir = workspace_dir.join("memory");
std::fs::create_dir_all(&db_dir)?;
let db_path = db_dir.join("response_cache.db");
@@ -58,6 +80,8 @@ impl ResponseCache {
db_path,
ttl_minutes: i64::from(ttl_minutes),
max_entries,
hot_cache: Mutex::new(HashMap::new()),
hot_max_entries,
})
}
@@ -76,35 +100,77 @@ impl ResponseCache {
}
/// Look up a cached response. Returns `None` on miss or expired entry.
///
/// Two-tier lookup: checks the in-memory hot cache first, then falls
/// through to SQLite. On a SQLite hit the entry is promoted to hot cache.
#[allow(clippy::cast_sign_loss)]
pub fn get(&self, key: &str) -> Result<Option<String>> {
let conn = self.conn.lock();
let now = Local::now();
let cutoff = (now - Duration::minutes(self.ttl_minutes)).to_rfc3339();
let mut stmt = conn.prepare(
"SELECT response FROM response_cache
WHERE prompt_hash = ?1 AND created_at > ?2",
)?;
let result: Option<String> = stmt.query_row(params![key, cutoff], |row| row.get(0)).ok();
if result.is_some() {
// Bump hit count and accessed_at
let now_str = now.to_rfc3339();
conn.execute(
"UPDATE response_cache
SET accessed_at = ?1, hit_count = hit_count + 1
WHERE prompt_hash = ?2",
params![now_str, key],
)?;
// Tier 1: hot cache (with TTL check)
{
let mut hot = self.hot_cache.lock();
if let Some(entry) = hot.get_mut(key) {
let ttl = std::time::Duration::from_secs(self.ttl_minutes as u64 * 60);
if entry.created_at.elapsed() > ttl {
hot.remove(key);
} else {
entry.accessed_at = std::time::Instant::now();
let response = entry.response.clone();
drop(hot);
// Still bump SQLite hit count for accurate stats
let conn = self.conn.lock();
let now_str = Local::now().to_rfc3339();
conn.execute(
"UPDATE response_cache
SET accessed_at = ?1, hit_count = hit_count + 1
WHERE prompt_hash = ?2",
params![now_str, key],
)?;
return Ok(Some(response));
}
}
}
Ok(result)
// Tier 2: SQLite (warm)
let result: Option<(String, u32)> = {
let conn = self.conn.lock();
let now = Local::now();
let cutoff = (now - Duration::minutes(self.ttl_minutes)).to_rfc3339();
let mut stmt = conn.prepare(
"SELECT response, token_count FROM response_cache
WHERE prompt_hash = ?1 AND created_at > ?2",
)?;
let result: Option<(String, u32)> = stmt
.query_row(params![key, cutoff], |row| Ok((row.get(0)?, row.get(1)?)))
.ok();
if result.is_some() {
let now_str = now.to_rfc3339();
conn.execute(
"UPDATE response_cache
SET accessed_at = ?1, hit_count = hit_count + 1
WHERE prompt_hash = ?2",
params![now_str, key],
)?;
}
result
};
if let Some((ref response, token_count)) = result {
self.promote_to_hot(key, response, token_count);
}
Ok(result.map(|(r, _)| r))
}
/// Store a response in the cache.
/// Store a response in the cache (both hot and warm tiers).
pub fn put(&self, key: &str, model: &str, response: &str, token_count: u32) -> Result<()> {
// Write to hot cache
self.promote_to_hot(key, response, token_count);
// Write to SQLite (warm)
let conn = self.conn.lock();
let now = Local::now().to_rfc3339();
@@ -138,6 +204,43 @@ impl ResponseCache {
Ok(())
}
/// Promote an entry to the in-memory hot cache, evicting the oldest if full.
fn promote_to_hot(&self, key: &str, response: &str, token_count: u32) {
let mut hot = self.hot_cache.lock();
// If already present, just update (keep original created_at for TTL)
if let Some(entry) = hot.get_mut(key) {
entry.response = response.to_string();
entry.token_count = token_count;
entry.accessed_at = std::time::Instant::now();
return;
}
// Evict oldest entry if at capacity
if self.hot_max_entries > 0 && hot.len() >= self.hot_max_entries {
if let Some(oldest_key) = hot
.iter()
.min_by_key(|(_, v)| v.accessed_at)
.map(|(k, _)| k.clone())
{
hot.remove(&oldest_key);
}
}
if self.hot_max_entries > 0 {
let now = std::time::Instant::now();
hot.insert(
key.to_string(),
InMemoryEntry {
response: response.to_string(),
token_count,
created_at: now,
accessed_at: now,
},
);
}
}
/// Return cache statistics: (total_entries, total_hits, total_tokens_saved).
pub fn stats(&self) -> Result<(usize, u64, u64)> {
let conn = self.conn.lock();
@@ -163,8 +266,8 @@ impl ResponseCache {
/// Wipe the entire cache (useful for `zeroclaw cache clear`).
pub fn clear(&self) -> Result<usize> {
self.hot_cache.lock().clear();
let conn = self.conn.lock();
let affected = conn.execute("DELETE FROM response_cache", [])?;
Ok(affected)
}
+9
View File
@@ -47,6 +47,15 @@ impl Observer for LogObserver {
ObserverEvent::HeartbeatTick => {
info!("heartbeat.tick");
}
ObserverEvent::CacheHit {
cache_type,
tokens_saved,
} => {
info!(cache_type = %cache_type, tokens_saved = tokens_saved, "cache.hit");
}
ObserverEvent::CacheMiss { cache_type } => {
info!(cache_type = %cache_type, "cache.miss");
}
ObserverEvent::Error { component, message } => {
info!(component = %component, error = %message, "error");
}
+42
View File
@@ -16,6 +16,9 @@ pub struct PrometheusObserver {
channel_messages: IntCounterVec,
heartbeat_ticks: prometheus::IntCounter,
errors: IntCounterVec,
cache_hits: IntCounterVec,
cache_misses: IntCounterVec,
cache_tokens_saved: IntCounterVec,
// Histograms
agent_duration: HistogramVec,
@@ -81,6 +84,27 @@ impl PrometheusObserver {
)
.expect("valid metric");
let cache_hits = IntCounterVec::new(
prometheus::Opts::new("zeroclaw_cache_hits_total", "Total response cache hits"),
&["cache_type"],
)
.expect("valid metric");
let cache_misses = IntCounterVec::new(
prometheus::Opts::new("zeroclaw_cache_misses_total", "Total response cache misses"),
&["cache_type"],
)
.expect("valid metric");
let cache_tokens_saved = IntCounterVec::new(
prometheus::Opts::new(
"zeroclaw_cache_tokens_saved_total",
"Total tokens saved by response cache",
),
&["cache_type"],
)
.expect("valid metric");
let agent_duration = HistogramVec::new(
HistogramOpts::new(
"zeroclaw_agent_duration_seconds",
@@ -139,6 +163,9 @@ impl PrometheusObserver {
registry.register(Box::new(channel_messages.clone())).ok();
registry.register(Box::new(heartbeat_ticks.clone())).ok();
registry.register(Box::new(errors.clone())).ok();
registry.register(Box::new(cache_hits.clone())).ok();
registry.register(Box::new(cache_misses.clone())).ok();
registry.register(Box::new(cache_tokens_saved.clone())).ok();
registry.register(Box::new(agent_duration.clone())).ok();
registry.register(Box::new(tool_duration.clone())).ok();
registry.register(Box::new(request_latency.clone())).ok();
@@ -156,6 +183,9 @@ impl PrometheusObserver {
channel_messages,
heartbeat_ticks,
errors,
cache_hits,
cache_misses,
cache_tokens_saved,
agent_duration,
tool_duration,
request_latency,
@@ -245,6 +275,18 @@ impl Observer for PrometheusObserver {
ObserverEvent::HeartbeatTick => {
self.heartbeat_ticks.inc();
}
ObserverEvent::CacheHit {
cache_type,
tokens_saved,
} => {
self.cache_hits.with_label_values(&[cache_type]).inc();
self.cache_tokens_saved
.with_label_values(&[cache_type])
.inc_by(*tokens_saved);
}
ObserverEvent::CacheMiss { cache_type } => {
self.cache_misses.with_label_values(&[cache_type]).inc();
}
ObserverEvent::Error {
component,
message: _,
+12
View File
@@ -61,6 +61,18 @@ pub enum ObserverEvent {
},
/// Periodic heartbeat tick from the runtime keep-alive loop.
HeartbeatTick,
/// Response cache hit — an LLM call was avoided.
CacheHit {
/// `"hot"` (in-memory) or `"warm"` (SQLite).
cache_type: String,
/// Estimated tokens saved by this cache hit.
tokens_saved: u64,
},
/// Response cache miss — the prompt was not found in cache.
CacheMiss {
/// `"response"` cache layer that was checked.
cache_type: String,
},
/// An error occurred in a named component.
Error {
/// Subsystem where the error originated (e.g., `"provider"`, `"gateway"`).
+1
View File
@@ -402,6 +402,7 @@ fn memory_config_defaults_for_backend(backend: &str) -> MemoryConfig {
response_cache_enabled: false,
response_cache_ttl_minutes: 60,
response_cache_max_entries: 5_000,
response_cache_hot_entries: 256,
snapshot_enabled: false,
snapshot_on_hygiene: false,
auto_hydrate: true,
+6
View File
@@ -149,6 +149,10 @@ struct AnthropicUsage {
input_tokens: Option<u64>,
#[serde(default)]
output_tokens: Option<u64>,
#[serde(default)]
cache_creation_input_tokens: Option<u64>,
#[serde(default)]
cache_read_input_tokens: Option<u64>,
}
#[derive(Debug, Deserialize)]
@@ -475,6 +479,7 @@ impl AnthropicProvider {
let usage = response.usage.map(|u| TokenUsage {
input_tokens: u.input_tokens,
output_tokens: u.output_tokens,
cached_input_tokens: u.cache_read_input_tokens,
});
for block in response.content {
@@ -614,6 +619,7 @@ impl Provider for AnthropicProvider {
ProviderCapabilities {
native_tool_calling: true,
vision: true,
prompt_caching: true,
}
}
+3
View File
@@ -312,6 +312,7 @@ impl Provider for AzureOpenAiProvider {
ProviderCapabilities {
native_tool_calling: true,
vision: true,
prompt_caching: false,
}
}
@@ -431,6 +432,7 @@ impl Provider for AzureOpenAiProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let message = native_response
.choices
@@ -491,6 +493,7 @@ impl Provider for AzureOpenAiProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let message = native_response
.choices
+2
View File
@@ -832,6 +832,7 @@ impl BedrockProvider {
let usage = response.usage.map(|u| TokenUsage {
input_tokens: u.input_tokens,
output_tokens: u.output_tokens,
cached_input_tokens: None,
});
if let Some(output) = response.output {
@@ -967,6 +968,7 @@ impl Provider for BedrockProvider {
ProviderCapabilities {
native_tool_calling: true,
vision: true,
prompt_caching: false,
}
}
+3
View File
@@ -1193,6 +1193,7 @@ impl Provider for OpenAiCompatibleProvider {
crate::providers::traits::ProviderCapabilities {
native_tool_calling: self.native_tool_calling,
vision: self.supports_vision,
prompt_caching: false,
}
}
@@ -1514,6 +1515,7 @@ impl Provider for OpenAiCompatibleProvider {
let usage = chat_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let choice = chat_response
.choices
@@ -1657,6 +1659,7 @@ impl Provider for OpenAiCompatibleProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let message = native_response
.choices
+1
View File
@@ -353,6 +353,7 @@ impl CopilotProvider {
let usage = api_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let choice = api_response
.choices
+1
View File
@@ -1128,6 +1128,7 @@ impl GeminiProvider {
let usage = result.usage_metadata.map(|u| TokenUsage {
input_tokens: u.prompt_token_count,
output_tokens: u.candidates_token_count,
cached_input_tokens: None,
});
let text = result
+2
View File
@@ -632,6 +632,7 @@ impl Provider for OllamaProvider {
ProviderCapabilities {
native_tool_calling: true,
vision: true,
prompt_caching: false,
}
}
@@ -764,6 +765,7 @@ impl Provider for OllamaProvider {
Some(TokenUsage {
input_tokens: response.prompt_eval_count,
output_tokens: response.eval_count,
cached_input_tokens: None,
})
} else {
None
+10
View File
@@ -135,6 +135,14 @@ struct UsageInfo {
prompt_tokens: Option<u64>,
#[serde(default)]
completion_tokens: Option<u64>,
#[serde(default)]
prompt_tokens_details: Option<PromptTokensDetails>,
}
#[derive(Debug, Deserialize)]
struct PromptTokensDetails {
#[serde(default)]
cached_tokens: Option<u64>,
}
#[derive(Debug, Deserialize)]
@@ -385,6 +393,7 @@ impl Provider for OpenAiProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: u.prompt_tokens_details.and_then(|d| d.cached_tokens),
});
let message = native_response
.choices
@@ -448,6 +457,7 @@ impl Provider for OpenAiProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: u.prompt_tokens_details.and_then(|d| d.cached_tokens),
});
let message = native_response
.choices
+1
View File
@@ -640,6 +640,7 @@ impl Provider for OpenAiCodexProvider {
ProviderCapabilities {
native_tool_calling: false,
vision: true,
prompt_caching: false,
}
}
+3
View File
@@ -306,6 +306,7 @@ impl Provider for OpenRouterProvider {
ProviderCapabilities {
native_tool_calling: true,
vision: true,
prompt_caching: false,
}
}
@@ -463,6 +464,7 @@ impl Provider for OpenRouterProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let message = native_response
.choices
@@ -554,6 +556,7 @@ impl Provider for OpenRouterProvider {
let usage = native_response.usage.map(|u| TokenUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cached_input_tokens: None,
});
let message = native_response
.choices
+11
View File
@@ -54,6 +54,9 @@ pub struct ToolCall {
pub struct TokenUsage {
pub input_tokens: Option<u64>,
pub output_tokens: Option<u64>,
/// Tokens served from the provider's prompt cache (Anthropic `cache_read_input_tokens`,
/// OpenAI `prompt_tokens_details.cached_tokens`).
pub cached_input_tokens: Option<u64>,
}
/// An LLM response that may contain text, tool calls, or both.
@@ -233,6 +236,9 @@ pub struct ProviderCapabilities {
pub native_tool_calling: bool,
/// Whether the provider supports vision / image inputs.
pub vision: bool,
/// Whether the provider supports prompt caching (Anthropic cache_control,
/// OpenAI automatic prompt caching).
pub prompt_caching: bool,
}
/// Provider-specific tool payload formats.
@@ -498,6 +504,7 @@ mod tests {
ProviderCapabilities {
native_tool_calling: true,
vision: true,
prompt_caching: false,
}
}
@@ -568,6 +575,7 @@ mod tests {
usage: Some(TokenUsage {
input_tokens: Some(100),
output_tokens: Some(50),
cached_input_tokens: None,
}),
reasoning_content: None,
};
@@ -613,14 +621,17 @@ mod tests {
let caps1 = ProviderCapabilities {
native_tool_calling: true,
vision: false,
prompt_caching: false,
};
let caps2 = ProviderCapabilities {
native_tool_calling: true,
vision: false,
prompt_caching: false,
};
let caps3 = ProviderCapabilities {
native_tool_calling: false,
vision: false,
prompt_caching: false,
};
assert_eq!(caps1, caps2);
+2
View File
@@ -166,6 +166,7 @@ impl Provider for TraceLlmProvider {
usage: Some(TokenUsage {
input_tokens: Some(input_tokens),
output_tokens: Some(output_tokens),
cached_input_tokens: None,
}),
reasoning_content: None,
}),
@@ -188,6 +189,7 @@ impl Provider for TraceLlmProvider {
usage: Some(TokenUsage {
input_tokens: Some(input_tokens),
output_tokens: Some(output_tokens),
cached_input_tokens: None,
}),
reasoning_content: None,
})