zeroclaw/src/channels/session_store.rs
Argenis 93b5a0b824
feat(context): token-based compaction, persistent sessions, and LLM consolidation (#3574)
Comprehensive long-running context upgrades:

- Token-based compaction: replace message-count trigger with token
  estimation (~4 chars/token). Compaction fires when estimated tokens
  exceed max_context_tokens (default 32K) OR message count exceeds
  max_history_messages. Cuts at user-turn boundaries only.

- Persistent sessions: JSONL append-only session files per channel
  sender in {workspace}/sessions/. Sessions survive daemon restarts.
  Hydrates in-memory history from disk on startup.

- LLM-driven memory consolidation: two-phase extraction after each
  conversation turn. Phase 1 writes a timestamped history entry (Daily).
  Phase 2 extracts new facts/preferences to Core memory (if any).
  Replaces raw message auto-save with semantic extraction.

- New config fields: agent.max_context_tokens (32000),
  channels_config.session_persistence (true).

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 09:25:23 -04:00

201 lines
6.6 KiB
Rust

//! JSONL-based session persistence for channel conversations.
//!
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
//! are loaded from disk to restore conversation context.
use crate::providers::traits::ChatMessage;
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};
/// Append-only JSONL session store for channel conversations.
pub struct SessionStore {
sessions_dir: PathBuf,
}
impl SessionStore {
/// Create a new session store, ensuring the sessions directory exists.
pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
let sessions_dir = workspace_dir.join("sessions");
std::fs::create_dir_all(&sessions_dir)?;
Ok(Self { sessions_dir })
}
/// Compute the file path for a session key, sanitizing for filesystem safety.
fn session_path(&self, session_key: &str) -> PathBuf {
let safe_key: String = session_key
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '_' || c == '-' {
c
} else {
'_'
}
})
.collect();
self.sessions_dir.join(format!("{safe_key}.jsonl"))
}
/// Load all messages for a session from its JSONL file.
/// Returns an empty vec if the file does not exist or is unreadable.
pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
let path = self.session_path(session_key);
let file = match std::fs::File::open(&path) {
Ok(f) => f,
Err(_) => return Vec::new(),
};
let reader = std::io::BufReader::new(file);
let mut messages = Vec::new();
for line in reader.lines() {
let Ok(line) = line else { continue };
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
messages.push(msg);
}
}
messages
}
/// Append a single message to the session JSONL file.
pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
let path = self.session_path(session_key);
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)?;
let json = serde_json::to_string(message)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
writeln!(file, "{json}")?;
Ok(())
}
/// List all session keys that have files on disk.
pub fn list_sessions(&self) -> Vec<String> {
let entries = match std::fs::read_dir(&self.sessions_dir) {
Ok(e) => e,
Err(_) => return Vec::new(),
};
entries
.filter_map(|entry| {
let entry = entry.ok()?;
let name = entry.file_name().into_string().ok()?;
name.strip_suffix(".jsonl").map(String::from)
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn round_trip_append_and_load() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
store
.append("telegram_user123", &ChatMessage::user("hello"))
.unwrap();
store
.append("telegram_user123", &ChatMessage::assistant("hi there"))
.unwrap();
let messages = store.load("telegram_user123");
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, "user");
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[1].role, "assistant");
assert_eq!(messages[1].content, "hi there");
}
#[test]
fn load_nonexistent_session_returns_empty() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
let messages = store.load("nonexistent");
assert!(messages.is_empty());
}
#[test]
fn key_sanitization() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
// Keys with special chars should be sanitized
store
.append("slack/thread:123/user", &ChatMessage::user("test"))
.unwrap();
let messages = store.load("slack/thread:123/user");
assert_eq!(messages.len(), 1);
}
#[test]
fn list_sessions_returns_keys() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
store
.append("telegram_alice", &ChatMessage::user("hi"))
.unwrap();
store
.append("discord_bob", &ChatMessage::user("hey"))
.unwrap();
let mut sessions = store.list_sessions();
sessions.sort();
assert_eq!(sessions.len(), 2);
assert!(sessions.contains(&"discord_bob".to_string()));
assert!(sessions.contains(&"telegram_alice".to_string()));
}
#[test]
fn append_is_truly_append_only() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
let key = "test_session";
store.append(key, &ChatMessage::user("msg1")).unwrap();
store.append(key, &ChatMessage::user("msg2")).unwrap();
// Read raw file to verify append-only format
let path = store.session_path(key);
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<&str> = content.trim().lines().collect();
assert_eq!(lines.len(), 2);
}
#[test]
fn handles_corrupt_lines_gracefully() {
let tmp = TempDir::new().unwrap();
let store = SessionStore::new(tmp.path()).unwrap();
let key = "corrupt_test";
// Write valid message + corrupt line + valid message
let path = store.session_path(key);
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
let mut file = std::fs::File::create(&path).unwrap();
writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
writeln!(file, "this is not valid json").unwrap();
writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
let messages = store.load(key);
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[1].content, "world");
}
}