- Add SessionBackend trait abstracting over storage backends (load,
append, remove_last, list, search, cleanup_stale, compact)
- Add SqliteSessionBackend with WAL mode, FTS5 full-text search,
session metadata tracking, and TTL-based cleanup
- Add remove_last() and compact() to JSONL SessionStore
- Implement SessionBackend for both JSONL and SQLite backends
- Add automatic JSONL-to-SQLite migration (renames .jsonl → .jsonl.migrated)
- Add config: session_backend ("jsonl"/"sqlite"), session_ttl_hours
- SQLite is the new default backend; JSONL preserved for backward compat
312 lines
10 KiB
Rust
312 lines
10 KiB
Rust
//! JSONL-based session persistence for channel conversations.
|
|
//!
|
|
//! Each session (keyed by `channel_sender` or `channel_thread_sender`) is stored
|
|
//! as an append-only JSONL file in `{workspace}/sessions/`. Messages are appended
|
|
//! one-per-line as JSON, never modifying old lines. On daemon restart, sessions
|
|
//! are loaded from disk to restore conversation context.
|
|
|
|
use crate::channels::session_backend::SessionBackend;
|
|
use crate::providers::traits::ChatMessage;
|
|
use std::io::{BufRead, Write};
|
|
use std::path::{Path, PathBuf};
|
|
|
|
/// Append-only JSONL session store for channel conversations.
|
|
pub struct SessionStore {
|
|
sessions_dir: PathBuf,
|
|
}
|
|
|
|
impl SessionStore {
|
|
/// Create a new session store, ensuring the sessions directory exists.
|
|
pub fn new(workspace_dir: &Path) -> std::io::Result<Self> {
|
|
let sessions_dir = workspace_dir.join("sessions");
|
|
std::fs::create_dir_all(&sessions_dir)?;
|
|
Ok(Self { sessions_dir })
|
|
}
|
|
|
|
/// Compute the file path for a session key, sanitizing for filesystem safety.
|
|
fn session_path(&self, session_key: &str) -> PathBuf {
|
|
let safe_key: String = session_key
|
|
.chars()
|
|
.map(|c| {
|
|
if c.is_alphanumeric() || c == '_' || c == '-' {
|
|
c
|
|
} else {
|
|
'_'
|
|
}
|
|
})
|
|
.collect();
|
|
self.sessions_dir.join(format!("{safe_key}.jsonl"))
|
|
}
|
|
|
|
/// Load all messages for a session from its JSONL file.
|
|
/// Returns an empty vec if the file does not exist or is unreadable.
|
|
pub fn load(&self, session_key: &str) -> Vec<ChatMessage> {
|
|
let path = self.session_path(session_key);
|
|
let file = match std::fs::File::open(&path) {
|
|
Ok(f) => f,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
|
|
let reader = std::io::BufReader::new(file);
|
|
let mut messages = Vec::new();
|
|
|
|
for line in reader.lines() {
|
|
let Ok(line) = line else { continue };
|
|
let trimmed = line.trim();
|
|
if trimmed.is_empty() {
|
|
continue;
|
|
}
|
|
if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
|
|
messages.push(msg);
|
|
}
|
|
}
|
|
|
|
messages
|
|
}
|
|
|
|
/// Append a single message to the session JSONL file.
|
|
pub fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
|
|
let path = self.session_path(session_key);
|
|
let mut file = std::fs::OpenOptions::new()
|
|
.create(true)
|
|
.append(true)
|
|
.open(&path)?;
|
|
|
|
let json = serde_json::to_string(message)
|
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
|
|
|
writeln!(file, "{json}")?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Remove the last message from a session's JSONL file.
|
|
///
|
|
/// Rewrite approach: load all messages, drop the last, rewrite. This is
|
|
/// O(n) but rollbacks are rare.
|
|
pub fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
|
|
let mut messages = self.load(session_key);
|
|
if messages.is_empty() {
|
|
return Ok(false);
|
|
}
|
|
messages.pop();
|
|
self.rewrite(session_key, &messages)?;
|
|
Ok(true)
|
|
}
|
|
|
|
/// Compact a session file by rewriting only valid messages (removes corrupt lines).
|
|
pub fn compact(&self, session_key: &str) -> std::io::Result<()> {
|
|
let messages = self.load(session_key);
|
|
self.rewrite(session_key, &messages)
|
|
}
|
|
|
|
fn rewrite(&self, session_key: &str, messages: &[ChatMessage]) -> std::io::Result<()> {
|
|
let path = self.session_path(session_key);
|
|
let mut file = std::fs::File::create(&path)?;
|
|
for msg in messages {
|
|
let json = serde_json::to_string(msg)
|
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
|
|
writeln!(file, "{json}")?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// List all session keys that have files on disk.
|
|
pub fn list_sessions(&self) -> Vec<String> {
|
|
let entries = match std::fs::read_dir(&self.sessions_dir) {
|
|
Ok(e) => e,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
|
|
entries
|
|
.filter_map(|entry| {
|
|
let entry = entry.ok()?;
|
|
let name = entry.file_name().into_string().ok()?;
|
|
name.strip_suffix(".jsonl").map(String::from)
|
|
})
|
|
.collect()
|
|
}
|
|
}
|
|
|
|
impl SessionBackend for SessionStore {
|
|
fn load(&self, session_key: &str) -> Vec<ChatMessage> {
|
|
self.load(session_key)
|
|
}
|
|
|
|
fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
|
|
self.append(session_key, message)
|
|
}
|
|
|
|
fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
|
|
self.remove_last(session_key)
|
|
}
|
|
|
|
fn list_sessions(&self) -> Vec<String> {
|
|
self.list_sessions()
|
|
}
|
|
|
|
fn compact(&self, session_key: &str) -> std::io::Result<()> {
|
|
self.compact(session_key)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use tempfile::TempDir;
|
|
|
|
#[test]
|
|
fn round_trip_append_and_load() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
|
|
store
|
|
.append("telegram_user123", &ChatMessage::user("hello"))
|
|
.unwrap();
|
|
store
|
|
.append("telegram_user123", &ChatMessage::assistant("hi there"))
|
|
.unwrap();
|
|
|
|
let messages = store.load("telegram_user123");
|
|
assert_eq!(messages.len(), 2);
|
|
assert_eq!(messages[0].role, "user");
|
|
assert_eq!(messages[0].content, "hello");
|
|
assert_eq!(messages[1].role, "assistant");
|
|
assert_eq!(messages[1].content, "hi there");
|
|
}
|
|
|
|
#[test]
|
|
fn load_nonexistent_session_returns_empty() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
|
|
let messages = store.load("nonexistent");
|
|
assert!(messages.is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn key_sanitization() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
|
|
// Keys with special chars should be sanitized
|
|
store
|
|
.append("slack/thread:123/user", &ChatMessage::user("test"))
|
|
.unwrap();
|
|
|
|
let messages = store.load("slack/thread:123/user");
|
|
assert_eq!(messages.len(), 1);
|
|
}
|
|
|
|
#[test]
|
|
fn list_sessions_returns_keys() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
|
|
store
|
|
.append("telegram_alice", &ChatMessage::user("hi"))
|
|
.unwrap();
|
|
store
|
|
.append("discord_bob", &ChatMessage::user("hey"))
|
|
.unwrap();
|
|
|
|
let mut sessions = store.list_sessions();
|
|
sessions.sort();
|
|
assert_eq!(sessions.len(), 2);
|
|
assert!(sessions.contains(&"discord_bob".to_string()));
|
|
assert!(sessions.contains(&"telegram_alice".to_string()));
|
|
}
|
|
|
|
#[test]
|
|
fn append_is_truly_append_only() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
let key = "test_session";
|
|
|
|
store.append(key, &ChatMessage::user("msg1")).unwrap();
|
|
store.append(key, &ChatMessage::user("msg2")).unwrap();
|
|
|
|
// Read raw file to verify append-only format
|
|
let path = store.session_path(key);
|
|
let content = std::fs::read_to_string(&path).unwrap();
|
|
let lines: Vec<&str> = content.trim().lines().collect();
|
|
assert_eq!(lines.len(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn remove_last_drops_final_message() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
|
|
store
|
|
.append("rm_test", &ChatMessage::user("first"))
|
|
.unwrap();
|
|
store
|
|
.append("rm_test", &ChatMessage::user("second"))
|
|
.unwrap();
|
|
|
|
assert!(store.remove_last("rm_test").unwrap());
|
|
let messages = store.load("rm_test");
|
|
assert_eq!(messages.len(), 1);
|
|
assert_eq!(messages[0].content, "first");
|
|
}
|
|
|
|
#[test]
|
|
fn remove_last_empty_returns_false() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
assert!(!store.remove_last("nonexistent").unwrap());
|
|
}
|
|
|
|
#[test]
|
|
fn compact_removes_corrupt_lines() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
let key = "compact_test";
|
|
|
|
let path = store.session_path(key);
|
|
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
|
|
let mut file = std::fs::File::create(&path).unwrap();
|
|
writeln!(file, r#"{{"role":"user","content":"ok"}}"#).unwrap();
|
|
writeln!(file, "corrupt line").unwrap();
|
|
writeln!(file, r#"{{"role":"assistant","content":"hi"}}"#).unwrap();
|
|
|
|
store.compact(key).unwrap();
|
|
|
|
let raw = std::fs::read_to_string(&path).unwrap();
|
|
assert_eq!(raw.trim().lines().count(), 2);
|
|
}
|
|
|
|
#[test]
|
|
fn session_backend_trait_works_via_dyn() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
let backend: &dyn SessionBackend = &store;
|
|
|
|
backend
|
|
.append("trait_test", &ChatMessage::user("hello"))
|
|
.unwrap();
|
|
let msgs = backend.load("trait_test");
|
|
assert_eq!(msgs.len(), 1);
|
|
}
|
|
|
|
#[test]
|
|
fn handles_corrupt_lines_gracefully() {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
let key = "corrupt_test";
|
|
|
|
// Write valid message + corrupt line + valid message
|
|
let path = store.session_path(key);
|
|
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
|
|
let mut file = std::fs::File::create(&path).unwrap();
|
|
writeln!(file, r#"{{"role":"user","content":"hello"}}"#).unwrap();
|
|
writeln!(file, "this is not valid json").unwrap();
|
|
writeln!(file, r#"{{"role":"assistant","content":"world"}}"#).unwrap();
|
|
|
|
let messages = store.load(key);
|
|
assert_eq!(messages.len(), 2);
|
|
assert_eq!(messages[0].content, "hello");
|
|
assert_eq!(messages[1].content, "world");
|
|
}
|
|
}
|