diff --git a/src/config/schema.rs b/src/config/schema.rs index c0f7f6d08..0e8a79ae2 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -1897,10 +1897,11 @@ impl Default for QdrantConfig { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[allow(clippy::struct_excessive_bools)] pub struct MemoryConfig { - /// "sqlite" | "lucid" | "postgres" | "qdrant" | "markdown" | "none" (`none` = explicit no-op memory) + /// "sqlite" | "lucid" | "postgres" | "qdrant" | "sqlite_qdrant_hybrid" | "markdown" | "none" (`none` = explicit no-op memory) /// /// `postgres` requires `[storage.provider.config]` with `db_url` (`dbURL` alias supported). /// `qdrant` uses `[memory.qdrant]` config or `QDRANT_URL` env var. + /// `sqlite_qdrant_hybrid` uses SQLite as authoritative store with Qdrant vector sync; requires `[memory.qdrant]` config. pub backend: String, /// Auto-save user-stated conversation input to memory (assistant output is excluded) pub auto_save: bool, @@ -1973,7 +1974,7 @@ pub struct MemoryConfig { // ── Qdrant backend options ───────────────────────────────── /// Configuration for Qdrant vector database backend. - /// Only used when `backend = "qdrant"`. + /// Used when `backend = "qdrant"` or `backend = "sqlite_qdrant_hybrid"`. #[serde(default)] pub qdrant: QdrantConfig, } diff --git a/src/lib.rs b/src/lib.rs index ace154e6f..22cbfd335 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -381,6 +381,8 @@ pub enum MemoryCommands { }, /// Show memory backend statistics and health Stats, + /// Rebuild indexes and re-sync derived data (FTS5, embeddings, vector stores) + Reindex, /// Clear memories by category, by key, or clear all Clear { /// Delete a single entry by key (supports prefix match) diff --git a/src/main.rs b/src/main.rs index 80a74f8e5..ebe8f0571 100644 --- a/src/main.rs +++ b/src/main.rs @@ -638,6 +638,8 @@ enum MemoryCommands { Get { key: String }, /// Show memory backend statistics and health Stats, + /// Rebuild indexes and re-sync derived data (FTS5, embeddings, vector stores) + Reindex, /// Clear memories by category, by key, or clear all Clear { /// Delete a single entry by key (supports prefix match) diff --git a/src/memory/backend.rs b/src/memory/backend.rs index 353d1b3cd..2c6f78b80 100644 --- a/src/memory/backend.rs +++ b/src/memory/backend.rs @@ -4,6 +4,7 @@ pub enum MemoryBackendKind { Lucid, Postgres, Qdrant, + SqliteQdrantHybrid, Markdown, None, Unknown, @@ -65,6 +66,15 @@ const QDRANT_PROFILE: MemoryBackendProfile = MemoryBackendProfile { optional_dependency: false, }; +const SQLITE_QDRANT_HYBRID_PROFILE: MemoryBackendProfile = MemoryBackendProfile { + key: "sqlite_qdrant_hybrid", + label: "SQLite + Qdrant Hybrid — SQLite authoritative store with Qdrant vector sync", + auto_save_default: true, + uses_sqlite_hygiene: true, + sqlite_based: true, + optional_dependency: false, +}; + const NONE_PROFILE: MemoryBackendProfile = MemoryBackendProfile { key: "none", label: "None — disable persistent memory", @@ -104,6 +114,7 @@ pub fn classify_memory_backend(backend: &str) -> MemoryBackendKind { "lucid" => MemoryBackendKind::Lucid, "postgres" => MemoryBackendKind::Postgres, "qdrant" => MemoryBackendKind::Qdrant, + "sqlite_qdrant_hybrid" => MemoryBackendKind::SqliteQdrantHybrid, "markdown" => MemoryBackendKind::Markdown, "none" => MemoryBackendKind::None, _ => MemoryBackendKind::Unknown, @@ -116,6 +127,7 @@ pub fn memory_backend_profile(backend: &str) -> MemoryBackendProfile { MemoryBackendKind::Lucid => LUCID_PROFILE, MemoryBackendKind::Postgres => POSTGRES_PROFILE, MemoryBackendKind::Qdrant => QDRANT_PROFILE, + MemoryBackendKind::SqliteQdrantHybrid => SQLITE_QDRANT_HYBRID_PROFILE, MemoryBackendKind::Markdown => MARKDOWN_PROFILE, MemoryBackendKind::None => NONE_PROFILE, MemoryBackendKind::Unknown => CUSTOM_PROFILE, @@ -139,6 +151,10 @@ mod tests { MemoryBackendKind::Markdown ); assert_eq!(classify_memory_backend("none"), MemoryBackendKind::None); + assert_eq!( + classify_memory_backend("sqlite_qdrant_hybrid"), + MemoryBackendKind::SqliteQdrantHybrid + ); } #[test] @@ -164,6 +180,15 @@ mod tests { assert!(profile.uses_sqlite_hygiene); } + #[test] + fn sqlite_qdrant_hybrid_profile_is_sqlite_based() { + let profile = memory_backend_profile("sqlite_qdrant_hybrid"); + assert_eq!(profile.key, "sqlite_qdrant_hybrid"); + assert!(profile.sqlite_based); + assert!(profile.uses_sqlite_hygiene); + assert!(profile.auto_save_default); + } + #[test] fn unknown_profile_preserves_extensibility_defaults() { let profile = memory_backend_profile("custom-memory"); diff --git a/src/memory/cli.rs b/src/memory/cli.rs index 6feff1d99..186f393fa 100644 --- a/src/memory/cli.rs +++ b/src/memory/cli.rs @@ -1,7 +1,7 @@ use super::traits::{Memory, MemoryCategory}; use super::{ - classify_memory_backend, create_memory_for_migration, effective_memory_backend_name, - MemoryBackendKind, + classify_memory_backend, create_memory_for_migration, create_memory_with_storage, + effective_memory_backend_name, MemoryBackendKind, }; use crate::config::Config; #[cfg(feature = "memory-postgres")] @@ -20,6 +20,7 @@ pub async fn handle_command(command: crate::MemoryCommands, config: &Config) -> } => handle_list(config, category, session, limit, offset).await, crate::MemoryCommands::Get { key } => handle_get(config, &key).await, crate::MemoryCommands::Stats => handle_stats(config).await, + crate::MemoryCommands::Reindex => handle_reindex(config).await, crate::MemoryCommands::Clear { key, category, yes } => { handle_clear(config, key, category, yes).await } @@ -198,6 +199,31 @@ async fn handle_stats(config: &Config) -> Result<()> { Ok(()) } +async fn handle_reindex(config: &Config) -> Result<()> { + // Reindex needs the full backend (with embeddings) rather than the + // lightweight CLI/migration factory. + let mem = create_memory_with_storage( + &config.memory, + Some(&config.storage.provider.config), + &config.workspace_dir, + config.api_key.as_deref(), + )?; + + println!( + "Reindexing {} backend...\n", + style(mem.name()).white().bold() + ); + + let count = mem.reindex().await?; + + println!( + "\n{} Reindex complete: {count} entries processed.", + style("✓").green().bold() + ); + + Ok(()) +} + async fn handle_clear( config: &Config, key: Option, diff --git a/src/memory/hybrid.rs b/src/memory/hybrid.rs new file mode 100644 index 000000000..328abf498 --- /dev/null +++ b/src/memory/hybrid.rs @@ -0,0 +1,296 @@ +use super::qdrant::QdrantMemory; +use super::sqlite::SqliteMemory; +use super::traits::{Memory, MemoryCategory, MemoryEntry}; +use async_trait::async_trait; + +/// Hybrid memory backend: SQLite (authoritative) + Qdrant (vector sync). +/// +/// SQLite is the source of truth for all memory entries. Qdrant is kept +/// in sync on a best-effort basis to provide semantic vector search. +/// If Qdrant is unavailable during a store, the entry is persisted safely +/// to SQLite and a warning is logged. Running `reindex` recovers Qdrant +/// consistency by re-syncing all SQLite entries. +pub struct SqliteQdrantHybridMemory { + sqlite: SqliteMemory, + qdrant: QdrantMemory, +} + +impl SqliteQdrantHybridMemory { + pub fn new(sqlite: SqliteMemory, qdrant: QdrantMemory) -> Self { + Self { sqlite, qdrant } + } +} + +#[async_trait] +impl Memory for SqliteQdrantHybridMemory { + fn name(&self) -> &str { + "sqlite_qdrant_hybrid" + } + + async fn store( + &self, + key: &str, + content: &str, + category: MemoryCategory, + session_id: Option<&str>, + ) -> anyhow::Result<()> { + // SQLite is authoritative — always store there first. + self.sqlite + .store(key, content, category.clone(), session_id) + .await?; + + // Best-effort sync to Qdrant; warn on failure. + if let Err(e) = self.qdrant.store(key, content, category, session_id).await { + tracing::warn!( + key, + error = %e, + "Qdrant sync failed during store; SQLite remains authoritative" + ); + } + + Ok(()) + } + + async fn recall( + &self, + query: &str, + limit: usize, + session_id: Option<&str>, + ) -> anyhow::Result> { + // Try Qdrant first for semantic search. + match self.qdrant.recall(query, limit, session_id).await { + Ok(results) if !results.is_empty() => Ok(results), + Ok(_) | Err(_) => { + // Fallback to SQLite keyword/hybrid search. + self.sqlite.recall(query, limit, session_id).await + } + } + } + + async fn get(&self, key: &str) -> anyhow::Result> { + // SQLite is authoritative for exact key lookups. + self.sqlite.get(key).await + } + + async fn list( + &self, + category: Option<&MemoryCategory>, + session_id: Option<&str>, + ) -> anyhow::Result> { + self.sqlite.list(category, session_id).await + } + + async fn forget(&self, key: &str) -> anyhow::Result { + let deleted = self.sqlite.forget(key).await?; + + // Best-effort delete from Qdrant. + if let Err(e) = self.qdrant.forget(key).await { + tracing::warn!( + key, + error = %e, + "Qdrant sync failed during forget; SQLite remains authoritative" + ); + } + + Ok(deleted) + } + + async fn count(&self) -> anyhow::Result { + // SQLite is authoritative for entry count. + self.sqlite.count().await + } + + async fn health_check(&self) -> bool { + // Healthy if SQLite is healthy; Qdrant unavailability is degraded + // but not fatal. + self.sqlite.health_check().await + } + + /// Two-phase reindex: + /// + /// **Phase 1** — Rebuild SQLite FTS5 and local embeddings by delegating + /// to the SQLite backend reindex. + /// + /// **Phase 2** — Re-sync every SQLite entry to Qdrant so the vector + /// index matches the authoritative store. Individual Qdrant failures + /// are logged as warnings (best-effort pattern). + async fn reindex(&self) -> anyhow::Result { + // ── Phase 1: SQLite reindex ────────────────────────────────── + tracing::info!("hybrid reindex phase 1: rebuilding SQLite FTS5 and embeddings"); + let sqlite_count = self.sqlite.reindex().await?; + tracing::info!( + "hybrid reindex phase 1 complete: {sqlite_count} embeddings rebuilt in SQLite" + ); + + // ── Phase 2: Re-sync all entries to Qdrant ────────────────── + tracing::info!("hybrid reindex phase 2: re-syncing all entries to Qdrant"); + let entries = self.sqlite.list(None, None).await?; + let total = entries.len(); + let mut synced: usize = 0; + let mut failed: usize = 0; + + for entry in &entries { + match self + .qdrant + .store( + &entry.key, + &entry.content, + entry.category.clone(), + entry.session_id.as_deref(), + ) + .await + { + Ok(()) => synced += 1, + Err(e) => { + failed += 1; + tracing::warn!( + key = %entry.key, + error = %e, + "Qdrant re-sync failed for entry; skipping" + ); + } + } + } + + tracing::info!( + "hybrid reindex phase 2 complete: {synced}/{total} entries synced to Qdrant \ + ({failed} failed)" + ); + + Ok(synced) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::memory::embeddings::{EmbeddingProvider, NoopEmbedding}; + use std::sync::Arc; + use tempfile::TempDir; + + /// Verifies the hybrid backend name is correct. + #[test] + fn hybrid_name() { + let tmp = TempDir::new().unwrap(); + let sqlite = SqliteMemory::new(tmp.path()).unwrap(); + let embedder: Arc = Arc::new(NoopEmbedding); + let qdrant = + QdrantMemory::new_lazy("http://localhost:6333", "test_collection", None, embedder); + let hybrid = SqliteQdrantHybridMemory::new(sqlite, qdrant); + assert_eq!(hybrid.name(), "sqlite_qdrant_hybrid"); + } + + /// Phase 1 (SQLite reindex) succeeds even when Qdrant is unreachable. + /// The test exercises reindex on the hybrid backend against an + /// intentionally unreachable Qdrant endpoint. + #[tokio::test] + async fn reindex_succeeds_with_unreachable_qdrant() { + let tmp = TempDir::new().unwrap(); + let sqlite = SqliteMemory::new(tmp.path()).unwrap(); + let embedder: Arc = Arc::new(NoopEmbedding); + // Point at a non-existent Qdrant instance. + let qdrant = + QdrantMemory::new_lazy("http://127.0.0.1:1", "test_collection", None, embedder); + let hybrid = SqliteQdrantHybridMemory::new(sqlite, qdrant); + + // Store entries directly via the SQLite backend so they exist + // for reindex without requiring Qdrant. + hybrid + .sqlite + .store("key1", "value one", MemoryCategory::Core, None) + .await + .unwrap(); + hybrid + .sqlite + .store("key2", "value two", MemoryCategory::Daily, None) + .await + .unwrap(); + + // Reindex should succeed — phase 1 rebuilds SQLite, phase 2 + // logs warnings for unreachable Qdrant but does not error. + let result = hybrid.reindex().await; + assert!(result.is_ok(), "reindex must not fail when Qdrant is down"); + + // SQLite data should still be intact. + let count = hybrid.count().await.unwrap(); + assert_eq!(count, 2); + } + + /// Store via hybrid falls back gracefully when Qdrant is unreachable. + #[tokio::test] + async fn store_succeeds_with_unreachable_qdrant() { + let tmp = TempDir::new().unwrap(); + let sqlite = SqliteMemory::new(tmp.path()).unwrap(); + let embedder: Arc = Arc::new(NoopEmbedding); + let qdrant = + QdrantMemory::new_lazy("http://127.0.0.1:1", "test_collection", None, embedder); + let hybrid = SqliteQdrantHybridMemory::new(sqlite, qdrant); + + hybrid + .store("lang", "Rust", MemoryCategory::Core, None) + .await + .unwrap(); + + let entry = hybrid.get("lang").await.unwrap(); + assert!(entry.is_some()); + assert_eq!(entry.unwrap().content, "Rust"); + } + + /// Recall falls back to SQLite when Qdrant is unreachable. + #[tokio::test] + async fn recall_falls_back_to_sqlite() { + let tmp = TempDir::new().unwrap(); + let sqlite = SqliteMemory::new(tmp.path()).unwrap(); + let embedder: Arc = Arc::new(NoopEmbedding); + let qdrant = + QdrantMemory::new_lazy("http://127.0.0.1:1", "test_collection", None, embedder); + let hybrid = SqliteQdrantHybridMemory::new(sqlite, qdrant); + + hybrid + .store("note", "Rust is fast", MemoryCategory::Core, None) + .await + .unwrap(); + + let results = hybrid.recall("fast", 10, None).await.unwrap(); + assert!( + results.iter().any(|e| e.content.contains("Rust is fast")), + "SQLite fallback recall should find the entry" + ); + } + + /// Forget succeeds even when Qdrant is unreachable. + #[tokio::test] + async fn forget_succeeds_with_unreachable_qdrant() { + let tmp = TempDir::new().unwrap(); + let sqlite = SqliteMemory::new(tmp.path()).unwrap(); + let embedder: Arc = Arc::new(NoopEmbedding); + let qdrant = + QdrantMemory::new_lazy("http://127.0.0.1:1", "test_collection", None, embedder); + let hybrid = SqliteQdrantHybridMemory::new(sqlite, qdrant); + + hybrid + .store("temp", "temporary data", MemoryCategory::Daily, None) + .await + .unwrap(); + + let deleted = hybrid.forget("temp").await.unwrap(); + assert!(deleted); + + let entry = hybrid.get("temp").await.unwrap(); + assert!(entry.is_none()); + } + + /// Reindex on empty DB returns zero. + #[tokio::test] + async fn reindex_empty_db() { + let tmp = TempDir::new().unwrap(); + let sqlite = SqliteMemory::new(tmp.path()).unwrap(); + let embedder: Arc = Arc::new(NoopEmbedding); + let qdrant = + QdrantMemory::new_lazy("http://127.0.0.1:1", "test_collection", None, embedder); + let hybrid = SqliteQdrantHybridMemory::new(sqlite, qdrant); + + let count = hybrid.reindex().await.unwrap(); + assert_eq!(count, 0); + } +} diff --git a/src/memory/mod.rs b/src/memory/mod.rs index 890a912ec..b42478a31 100644 --- a/src/memory/mod.rs +++ b/src/memory/mod.rs @@ -2,6 +2,7 @@ pub mod backend; pub mod chunker; pub mod cli; pub mod embeddings; +pub mod hybrid; pub mod hygiene; pub mod lucid; pub mod markdown; @@ -20,6 +21,7 @@ pub use backend::{ classify_memory_backend, default_memory_backend_key, memory_backend_profile, selectable_memory_backends, MemoryBackendKind, MemoryBackendProfile, }; +pub use hybrid::SqliteQdrantHybridMemory; pub use lucid::LucidMemory; pub use markdown::MarkdownMemory; pub use none::NoneMemory; @@ -58,6 +60,11 @@ where MemoryBackendKind::Qdrant | MemoryBackendKind::Markdown => { Ok(Box::new(MarkdownMemory::new(workspace_dir))) } + MemoryBackendKind::SqliteQdrantHybrid => { + // In simplified builder contexts (e.g. migration), fall back to + // sqlite-only since the full hybrid requires Qdrant config. + Ok(Box::new(sqlite_builder()?)) + } MemoryBackendKind::None => Ok(Box::new(NoneMemory::new())), MemoryBackendKind::Unknown => { tracing::warn!( @@ -360,6 +367,46 @@ pub fn create_memory_with_storage_and_routes( ))); } + if matches!(backend_kind, MemoryBackendKind::SqliteQdrantHybrid) { + let sqlite = build_sqlite_memory(config, workspace_dir, &resolved_embedding)?; + + let url = config + .qdrant + .url + .clone() + .filter(|s| !s.trim().is_empty()) + .or_else(|| std::env::var("QDRANT_URL").ok()) + .filter(|s| !s.trim().is_empty()) + .context( + "sqlite_qdrant_hybrid backend requires url in [memory.qdrant] or QDRANT_URL env var", + )?; + let collection = std::env::var("QDRANT_COLLECTION") + .ok() + .filter(|s| !s.trim().is_empty()) + .unwrap_or_else(|| config.qdrant.collection.clone()); + let qdrant_api_key = config + .qdrant + .api_key + .clone() + .or_else(|| std::env::var("QDRANT_API_KEY").ok()) + .filter(|s| !s.trim().is_empty()); + let qdrant_embedder: Arc = + Arc::from(embeddings::create_embedding_provider( + &resolved_embedding.provider, + resolved_embedding.api_key.as_deref(), + &resolved_embedding.model, + resolved_embedding.dimensions, + )); + let qdrant = QdrantMemory::new_lazy(&url, &collection, qdrant_api_key, qdrant_embedder); + + tracing::info!( + "📦 sqlite_qdrant_hybrid memory backend configured (qdrant url: {}, collection: {})", + url, + collection + ); + return Ok(Box::new(SqliteQdrantHybridMemory::new(sqlite, qdrant))); + } + create_memory_with_builders( &backend_name, workspace_dir, diff --git a/src/memory/sqlite.rs b/src/memory/sqlite.rs index 3e90ec6dc..ac5a04269 100644 --- a/src/memory/sqlite.rs +++ b/src/memory/sqlite.rs @@ -779,6 +779,11 @@ impl Memory for SqliteMemory { .await .unwrap_or(false) } + + async fn reindex(&self) -> anyhow::Result { + // Delegate to the inherent reindex implementation. + SqliteMemory::reindex(self).await + } } #[cfg(test)] diff --git a/src/memory/traits.rs b/src/memory/traits.rs index de72923d3..ed8eb0d16 100644 --- a/src/memory/traits.rs +++ b/src/memory/traits.rs @@ -92,6 +92,14 @@ pub trait Memory: Send + Sync { /// Health check async fn health_check(&self) -> bool; + + /// Rebuild indexes and re-sync derived data (FTS5, embeddings, vector stores). + /// + /// Returns the number of entries processed. The default implementation + /// returns an error; backends that support reindexing override this. + async fn reindex(&self) -> anyhow::Result { + anyhow::bail!("Reindex not supported by {} backend", self.name()) + } } #[cfg(test)]