feat(memory): add mem0 (OpenMemory) backend integration (#3965)

* feat(memory): add mem0 (OpenMemory) backend integration

- Implement Mem0Memory struct with full Memory trait
- Add history() audit trail, recall_filtered() with time/metadata filters
- Add store_procedural() for conversation trace extraction
- Add ProceduralMessage type to Memory trait with default no-op
- Feature-gated behind `memory-mem0` flag
- 9 unit tests covering edge cases

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: apply cargo fmt

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(memory): add extraction_prompt config, deploy scripts, and timing instrumentation

- Add `extraction_prompt` field to `Mem0Config` for custom LLM fact
  extraction prompts (e.g. Cantonese/Chinese content), with
  `MEM0_EXTRACTION_PROMPT` env var fallback
- Pass `custom_instructions` in mem0 store requests so the server
  uses the client-supplied prompt over its default
- Add timing instrumentation to channel message pipeline
  (mem_recall_ms, elapsed_before_llm_ms, llm_call_ms, total_ms)
- Add `deploy/mem0/` with self-hosted mem0 + reranker GPU server
  scripts, fully configurable via environment variables
- Update config reference docs (EN, zh-CN, VI) with `[memory.mem0]`
  subsection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

# Conflicts:
#	src/channels/mod.rs

* chore: remove accidentally staged worktree from index

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
khhjoe 2026-03-21 07:22:44 +09:00 committed by GitHub
parent 9e8a478254
commit 00209dd899
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1466 additions and 20 deletions

View File

@ -0,0 +1,97 @@
# Mem0 Integration: Dual-Scope Recall + Per-Turn Memory
## Context
Mem0 auto-save works but the integration is missing key features from mem0 best practices: per-turn recall, multi-level scoping, and proper context injection. This causes the bot to "forget" on follow-up turns and not differentiate users.
## What's Missing (vs mem0 docs)
1. **Per-turn recall** — only first turn gets memory context, follow-ups get nothing
2. **Dual-scope** — no sender vs group distinction. All memories use single hardcoded `user_id`
3. **System prompt injection** — memory prepended to user message (pollutes session history)
4. **`agent_id` scoping** — mem0 supports agent-level patterns, not used
## Changes
### 1. `src/memory/mem0.rs` — Use session_id for multi-level scoping
Map zeroclaw's `session_id` param to mem0's `user_id`. This enables per-user and per-group memory namespaces without changing the `Memory` trait.
```rust
// Add helper:
fn effective_user_id(&self, session_id: Option<&str>) -> &str {
session_id.filter(|s| !s.is_empty()).unwrap_or(&self.user_id)
}
// In store(): use effective_user_id(session_id) as mem0 user_id
// In recall(): use effective_user_id(session_id) as mem0 user_id
// In list(): use effective_user_id(session_id) as mem0 user_id
```
### 2. `src/channels/mod.rs` ~line 2229 — Per-turn dual-scope recall
Remove `if !had_prior_history` gate. Always recall from both sender scope and group scope (for group chats).
```rust
// Detect group chat
let is_group = msg.reply_target.contains("@g.us")
|| msg.reply_target.starts_with("group:");
// Sender-scope recall (always)
let sender_context = build_memory_context(
ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score,
Some(&msg.sender),
).await;
// Group-scope recall (groups only)
let group_context = if is_group {
build_memory_context(
ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score,
Some(&history_key),
).await
} else {
String::new()
};
// Merge (deduplicate by checking substring overlap)
let memory_context = merge_memory_contexts(&sender_context, &group_context);
```
### 3. `src/channels/mod.rs` ~line 2244 — Inject into system prompt
Move memory context from user message to system prompt. Re-fetched each turn, doesn't pollute session.
```rust
let mut system_prompt = build_channel_system_prompt(...);
if !memory_context.is_empty() {
system_prompt.push_str(&format!("\n\n{memory_context}"));
}
let mut history = vec![ChatMessage::system(system_prompt)];
```
### 4. `src/channels/mod.rs` — Dual-scope auto-save
Find existing auto-save call. For group messages, store twice:
- `store(key, content, category, Some(&msg.sender))` — personal facts
- `store(key, content, category, Some(&history_key))` — group context
Both async, non-blocking. DMs only store to sender scope.
### 5. `src/memory/mem0.rs` — Add `agent_id` support (optional)
Pass `self.app_name` as `agent_id` param to mem0 API for agent behavior tracking.
## Files to Modify
1. `src/memory/mem0.rs` — session_id → user_id mapping
2. `src/channels/mod.rs` — per-turn recall, dual-scope, system prompt injection, dual-scope save
## Verification
1. `cargo check --features whatsapp-web,memory-mem0`
2. `cargo test --features whatsapp-web,memory-mem0`
3. Deploy to Synology
4. Test DM: "我鍾意食壽司" → next turn "我鍾意食咩" → should recall
5. Test group: Joe says "我鍾意食壽司" → someone else asks "Joe 鍾意食咩" → should recall from group scope
6. Check mem0 server logs: GET with `user_id=sender` AND `user_id=group_key`
7. Check mem0 server logs: POST with both user_ids for group messages

View File

@ -225,6 +225,8 @@ channel-matrix = ["dep:matrix-sdk"]
channel-lark = ["dep:prost"]
channel-feishu = ["channel-lark"] # Alias for Feishu users (Lark and Feishu are the same platform)
memory-postgres = ["dep:postgres"]
# memory-mem0 = Mem0 (OpenMemory) memory backend via REST API
memory-mem0 = []
observability-prometheus = ["dep:prometheus"]
observability-otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"]
peripheral-rpi = ["rppal"]

80
deploy/mem0/mem0-gpu-start.sh Executable file
View File

@ -0,0 +1,80 @@
#!/bin/bash
# Start mem0 + reranker GPU container for ZeroClaw memory backend.
#
# Required env vars:
# MEM0_LLM_API_KEY or ZAI_API_KEY — API key for the LLM used in fact extraction
#
# Optional env vars (with defaults):
# MEM0_LLM_PROVIDER — mem0 LLM provider (default: "openai" i.e. OpenAI-compatible)
# MEM0_LLM_MODEL — LLM model for fact extraction (default: "glm-5-turbo")
# MEM0_LLM_BASE_URL — LLM API base URL (default: "https://api.z.ai/api/coding/paas/v4")
# MEM0_EMBEDDER_MODEL — embedding model (default: "BAAI/bge-m3")
# MEM0_EMBEDDER_DIMS — embedding dimensions (default: "1024")
# MEM0_EMBEDDER_DEVICE — "cuda", "cpu", or "auto" (default: "cuda")
# MEM0_VECTOR_COLLECTION — Qdrant collection name (default: "zeroclaw_mem0")
# RERANKER_MODEL — reranker model (default: "BAAI/bge-reranker-v2-m3")
# RERANKER_DEVICE — "cuda" or "cpu" (default: "cuda")
# MEM0_PORT — mem0 server port (default: 8765)
# RERANKER_PORT — reranker server port (default: 8678)
# CONTAINER_IMAGE — base container image (default: docker.io/kyuz0/amd-strix-halo-comfyui:latest)
# CONTAINER_NAME — container name (default: mem0-gpu)
# DATA_DIR — host path for Qdrant data (default: ~/mem0-data)
# SCRIPT_DIR — host path for server scripts (default: directory of this script)
set -e
# Resolve script directory for mounting server scripts
SCRIPT_DIR="${SCRIPT_DIR:-$(cd "$(dirname "$0")" && pwd)}"
# API key — accept either name
export MEM0_LLM_API_KEY="${MEM0_LLM_API_KEY:-${ZAI_API_KEY:?MEM0_LLM_API_KEY or ZAI_API_KEY must be set}}"
# Defaults
MEM0_LLM_MODEL="${MEM0_LLM_MODEL:-glm-5-turbo}"
MEM0_LLM_BASE_URL="${MEM0_LLM_BASE_URL:-https://api.z.ai/api/coding/paas/v4}"
MEM0_PORT="${MEM0_PORT:-8765}"
RERANKER_PORT="${RERANKER_PORT:-8678}"
CONTAINER_IMAGE="${CONTAINER_IMAGE:-docker.io/kyuz0/amd-strix-halo-comfyui:latest}"
CONTAINER_NAME="${CONTAINER_NAME:-mem0-gpu}"
DATA_DIR="${DATA_DIR:-$HOME/mem0-data}"
# Stop existing CPU services (if any)
kill -9 $(pgrep -f "mem0-server.py") 2>/dev/null || true
kill -9 $(pgrep -f "reranker-server.py") 2>/dev/null || true
# Stop existing container
podman stop "$CONTAINER_NAME" 2>/dev/null || true
podman rm "$CONTAINER_NAME" 2>/dev/null || true
podman run -d --name "$CONTAINER_NAME" \
--device /dev/dri --device /dev/kfd \
--group-add video --group-add render \
--restart unless-stopped \
-p "$MEM0_PORT:$MEM0_PORT" -p "$RERANKER_PORT:$RERANKER_PORT" \
-v "$DATA_DIR":/root/mem0-data:Z \
-v "$SCRIPT_DIR/mem0-server.py":/app/mem0-server.py:ro,Z \
-v "$SCRIPT_DIR/reranker-server.py":/app/reranker-server.py:ro,Z \
-v "$HOME/.cache/huggingface":/root/.cache/huggingface:Z \
-e MEM0_LLM_API_KEY="$MEM0_LLM_API_KEY" \
-e ZAI_API_KEY="$MEM0_LLM_API_KEY" \
-e MEM0_LLM_MODEL="$MEM0_LLM_MODEL" \
-e MEM0_LLM_BASE_URL="$MEM0_LLM_BASE_URL" \
${MEM0_LLM_PROVIDER:+-e MEM0_LLM_PROVIDER="$MEM0_LLM_PROVIDER"} \
${MEM0_EMBEDDER_MODEL:+-e MEM0_EMBEDDER_MODEL="$MEM0_EMBEDDER_MODEL"} \
${MEM0_EMBEDDER_DIMS:+-e MEM0_EMBEDDER_DIMS="$MEM0_EMBEDDER_DIMS"} \
${MEM0_EMBEDDER_DEVICE:+-e MEM0_EMBEDDER_DEVICE="$MEM0_EMBEDDER_DEVICE"} \
${MEM0_VECTOR_COLLECTION:+-e MEM0_VECTOR_COLLECTION="$MEM0_VECTOR_COLLECTION"} \
${RERANKER_MODEL:+-e RERANKER_MODEL="$RERANKER_MODEL"} \
${RERANKER_DEVICE:+-e RERANKER_DEVICE="$RERANKER_DEVICE"} \
-e RERANKER_PORT="$RERANKER_PORT" \
-e RERANKER_URL="http://127.0.0.1:$RERANKER_PORT/rerank" \
-e TORCH_ROCM_AOTRITON_ENABLE_EXPERIMENTAL=1 \
-e HOME=/root \
"$CONTAINER_IMAGE" \
bash -c "pip install -q FlagEmbedding mem0ai flask httpx qdrant-client 2>&1 | tail -3; echo '=== Starting reranker (GPU) on :$RERANKER_PORT ==='; python3 /app/reranker-server.py & sleep 3; echo '=== Starting mem0 (GPU) on :$MEM0_PORT ==='; exec python3 /app/mem0-server.py"
echo "Container started, waiting for init..."
sleep 15
echo "=== Container logs ==="
podman logs "$CONTAINER_NAME" 2>&1 | tail -25
echo "=== Port check ==="
ss -tlnp | grep "$MEM0_PORT\|$RERANKER_PORT" || echo "Ports not yet ready, check: podman logs $CONTAINER_NAME"

288
deploy/mem0/mem0-server.py Normal file
View File

@ -0,0 +1,288 @@
"""Minimal OpenMemory-compatible REST server wrapping mem0 Python SDK."""
import asyncio
import json, os, uuid, httpx
from datetime import datetime, timezone
from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import Optional
from mem0 import Memory
app = FastAPI()
RERANKER_URL = os.environ.get("RERANKER_URL", "http://127.0.0.1:8678/rerank")
CUSTOM_EXTRACTION_PROMPT = """You are a memory extraction specialist for a Cantonese/Chinese chat assistant.
Extract ONLY important, persistent facts from the conversation. Rules:
1. Extract personal preferences, habits, relationships, names, locations
2. Extract decisions, plans, and commitments people make
3. SKIP small talk, greetings, reactions ("ok", "哈哈", "係呀")
4. SKIP temporary states ("我依家食緊飯") unless they reveal a habit
5. Keep facts in the ORIGINAL language (Cantonese/Chinese/English)
6. For each fact, note WHO it's about (use their name or identifier if known)
7. Merge/update existing facts rather than creating duplicates
Return a list of facts in JSON format: {"facts": ["fact1", "fact2", ...]}
"""
PROCEDURAL_EXTRACTION_PROMPT = """You are a procedural memory specialist for an AI assistant.
Extract HOW-TO patterns and reusable procedures from the conversation trace. Rules:
1. Identify step-by-step procedures the assistant followed to accomplish a task
2. Extract tool usage patterns: which tools were called, in what order, with what arguments
3. Capture decision points: why the assistant chose one approach over another
4. Note error-recovery patterns: what failed, how it was fixed
5. Keep the procedure generic enough to apply to similar future tasks
6. Preserve technical details (commands, file paths, API calls) that are reusable
7. SKIP greetings, small talk, and conversational filler
8. Format each procedure as: "To [goal]: [step1] -> [step2] -> ... -> [result]"
Return a list of procedures in JSON format: {"facts": ["procedure1", "procedure2", ...]}
"""
# ── Configurable via environment variables ─────────────────────────
# LLM (for fact extraction when infer=true)
MEM0_LLM_PROVIDER = os.environ.get("MEM0_LLM_PROVIDER", "openai") # "openai" (compatible), "anthropic", etc.
MEM0_LLM_MODEL = os.environ.get("MEM0_LLM_MODEL", "glm-5-turbo")
MEM0_LLM_API_KEY = os.environ.get("MEM0_LLM_API_KEY") or os.environ.get("ZAI_API_KEY", "")
MEM0_LLM_BASE_URL = os.environ.get("MEM0_LLM_BASE_URL", "https://api.z.ai/api/coding/paas/v4")
# Embedder
MEM0_EMBEDDER_PROVIDER = os.environ.get("MEM0_EMBEDDER_PROVIDER", "huggingface") # "huggingface", "openai", etc.
MEM0_EMBEDDER_MODEL = os.environ.get("MEM0_EMBEDDER_MODEL", "BAAI/bge-m3")
MEM0_EMBEDDER_DIMS = int(os.environ.get("MEM0_EMBEDDER_DIMS", "1024"))
MEM0_EMBEDDER_DEVICE = os.environ.get("MEM0_EMBEDDER_DEVICE", "cuda") # "cuda", "cpu", "auto"
# Vector store
MEM0_VECTOR_PROVIDER = os.environ.get("MEM0_VECTOR_PROVIDER", "qdrant") # "qdrant", "chroma", etc.
MEM0_VECTOR_COLLECTION = os.environ.get("MEM0_VECTOR_COLLECTION", "zeroclaw_mem0")
MEM0_VECTOR_PATH = os.environ.get("MEM0_VECTOR_PATH", os.path.expanduser("~/mem0-data/qdrant"))
config = {
"llm": {
"provider": MEM0_LLM_PROVIDER,
"config": {
"model": MEM0_LLM_MODEL,
"api_key": MEM0_LLM_API_KEY,
"openai_base_url": MEM0_LLM_BASE_URL,
},
},
"embedder": {
"provider": MEM0_EMBEDDER_PROVIDER,
"config": {
"model": MEM0_EMBEDDER_MODEL,
"embedding_dims": MEM0_EMBEDDER_DIMS,
"model_kwargs": {"device": MEM0_EMBEDDER_DEVICE},
},
},
"vector_store": {
"provider": MEM0_VECTOR_PROVIDER,
"config": {
"collection_name": MEM0_VECTOR_COLLECTION,
"embedding_model_dims": MEM0_EMBEDDER_DIMS,
"path": MEM0_VECTOR_PATH,
},
},
"custom_fact_extraction_prompt": CUSTOM_EXTRACTION_PROMPT,
}
m = Memory.from_config(config)
def rerank_results(query: str, items: list, top_k: int = 10) -> list:
"""Rerank search results using bge-reranker-v2-m3."""
if not items:
return items
documents = [item.get("memory", "") for item in items]
try:
resp = httpx.post(
RERANKER_URL,
json={"query": query, "documents": documents, "top_k": top_k},
timeout=10.0,
)
resp.raise_for_status()
ranked = resp.json().get("results", [])
return [items[r["index"]] for r in ranked]
except Exception as e:
print(f"Reranker failed, using original order: {e}")
return items
class AddMemoryRequest(BaseModel):
user_id: str
text: str
metadata: Optional[dict] = None
infer: bool = True
app: Optional[str] = None
custom_instructions: Optional[str] = None
@app.post("/api/v1/memories/")
async def add_memory(req: AddMemoryRequest):
# Use client-supplied prompt, fall back to server default, then mem0 SDK default
prompt = req.custom_instructions or CUSTOM_EXTRACTION_PROMPT
result = await asyncio.to_thread(m.add, req.text, user_id=req.user_id, metadata=req.metadata or {}, prompt=prompt)
return {"id": str(uuid.uuid4()), "status": "ok", "result": result}
class ProceduralMemoryRequest(BaseModel):
user_id: str
messages: list[dict]
metadata: Optional[dict] = None
@app.post("/api/v1/memories/procedural")
async def add_procedural_memory(req: ProceduralMemoryRequest):
"""Store a conversation trace as procedural memory.
Accepts a list of messages (role/content dicts) representing a full
conversation turn including tool calls, then uses mem0's native
procedural memory extraction to learn reusable "how to" patterns.
"""
# Build metadata with procedural type marker
meta = {"type": "procedural"}
if req.metadata:
meta.update(req.metadata)
# Use mem0's native message list support + procedural prompt
result = await asyncio.to_thread(m.add,
req.messages,
user_id=req.user_id,
metadata=meta,
prompt=PROCEDURAL_EXTRACTION_PROMPT,
)
return {"id": str(uuid.uuid4()), "status": "ok", "result": result}
def _parse_mem0_results(raw_results) -> list:
raw = raw_results.get("results", raw_results) if isinstance(raw_results, dict) else raw_results
items = []
for r in raw:
item = r if isinstance(r, dict) else {"memory": str(r)}
items.append({
"id": item.get("id", str(uuid.uuid4())),
"memory": item.get("memory", item.get("text", "")),
"created_at": item.get("created_at", datetime.now(timezone.utc).isoformat()),
"metadata_": item.get("metadata", {}),
})
return items
def _parse_iso_timestamp(value: str) -> Optional[datetime]:
"""Parse an ISO 8601 timestamp string, returning None on failure."""
try:
dt = datetime.fromisoformat(value)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (ValueError, TypeError):
return None
def _item_created_at(item: dict) -> Optional[datetime]:
"""Extract created_at from an item as a timezone-aware datetime."""
raw = item.get("created_at")
if raw is None:
return None
if isinstance(raw, datetime):
if raw.tzinfo is None:
raw = raw.replace(tzinfo=timezone.utc)
return raw
return _parse_iso_timestamp(str(raw))
def _apply_post_filters(
items: list,
created_after: Optional[str],
created_before: Optional[str],
) -> list:
"""Filter items by created_after / created_before timestamps (post-query)."""
after_dt = _parse_iso_timestamp(created_after) if created_after else None
before_dt = _parse_iso_timestamp(created_before) if created_before else None
if after_dt is None and before_dt is None:
return items
filtered = []
for item in items:
ts = _item_created_at(item)
if ts is None:
# Keep items without a parseable timestamp
filtered.append(item)
continue
if after_dt and ts < after_dt:
continue
if before_dt and ts > before_dt:
continue
filtered.append(item)
return filtered
@app.get("/api/v1/memories/")
async def list_or_search_memories(
user_id: str = Query(...),
search_query: Optional[str] = Query(None),
size: int = Query(10),
rerank: bool = Query(True),
created_after: Optional[str] = Query(None),
created_before: Optional[str] = Query(None),
metadata_filter: Optional[str] = Query(None),
):
# Build mem0 SDK filters dict from metadata_filter JSON param
sdk_filters = None
if metadata_filter:
try:
sdk_filters = json.loads(metadata_filter)
except json.JSONDecodeError:
sdk_filters = None
if search_query:
# Fetch more results than needed so reranker has candidates to work with
fetch_size = min(size * 3, 50)
results = await asyncio.to_thread(m.search,
search_query,
user_id=user_id,
limit=fetch_size,
filters=sdk_filters,
)
items = _parse_mem0_results(results)
items = _apply_post_filters(items, created_after, created_before)
if rerank and items:
items = rerank_results(search_query, items, top_k=size)
else:
items = items[:size]
return {"items": items, "total": len(items)}
else:
results = await asyncio.to_thread(m.get_all,user_id=user_id, filters=sdk_filters)
items = _parse_mem0_results(results)
items = _apply_post_filters(items, created_after, created_before)
return {"items": items, "total": len(items)}
@app.delete("/api/v1/memories/{memory_id}")
async def delete_memory(memory_id: str):
try:
await asyncio.to_thread(m.delete, memory_id)
except Exception:
pass
return {"status": "ok"}
@app.get("/api/v1/memories/{memory_id}/history")
async def get_memory_history(memory_id: str):
"""Return the edit history of a specific memory."""
try:
history = await asyncio.to_thread(m.history, memory_id)
# Normalize to list of dicts
entries = []
raw = history if isinstance(history, list) else history.get("results", history) if isinstance(history, dict) else [history]
for h in raw:
entry = h if isinstance(h, dict) else {"event": str(h)}
entries.append(entry)
return {"memory_id": memory_id, "history": entries}
except Exception as e:
return {"memory_id": memory_id, "history": [], "error": str(e)}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8765)

View File

@ -0,0 +1,50 @@
from flask import Flask, request, jsonify
from FlagEmbedding import FlagReranker
import os, torch
app = Flask(__name__)
reranker = None
# ── Configurable via environment variables ─────────────────────────
RERANKER_MODEL = os.environ.get("RERANKER_MODEL", "BAAI/bge-reranker-v2-m3")
RERANKER_DEVICE = os.environ.get("RERANKER_DEVICE", "cuda" if torch.cuda.is_available() else "cpu")
RERANKER_PORT = int(os.environ.get("RERANKER_PORT", "8678"))
def get_reranker():
global reranker
if reranker is None:
reranker = FlagReranker(RERANKER_MODEL, use_fp16=True, device=RERANKER_DEVICE)
return reranker
@app.route('/rerank', methods=['POST'])
def rerank():
data = request.json
query = data.get('query', '')
documents = data.get('documents', [])
top_k = data.get('top_k', len(documents))
if not query or not documents:
return jsonify({'error': 'query and documents required'}), 400
pairs = [[query, doc] for doc in documents]
scores = get_reranker().compute_score(pairs)
if isinstance(scores, float):
scores = [scores]
results = sorted(
[{'index': i, 'document': doc, 'score': score}
for i, (doc, score) in enumerate(zip(documents, scores))],
key=lambda x: x['score'], reverse=True
)[:top_k]
return jsonify({'results': results})
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'ok', 'model': RERANKER_MODEL, 'device': RERANKER_DEVICE})
if __name__ == '__main__':
print(f'Loading reranker model ({RERANKER_MODEL}) on {RERANKER_DEVICE}...')
get_reranker()
print(f'Reranker server ready on :{RERANKER_PORT}')
app.run(host='0.0.0.0', port=RERANKER_PORT)

View File

@ -411,6 +411,30 @@ allowed_roots = [\"~/Desktop/projects\", \"/opt/shared-repo\"]
- 内存上下文注入忽略旧的 `assistant_resp*` 自动保存键,以防止旧模型生成的摘要被视为事实。
### `[memory.mem0]`
Mem0 (OpenMemory) 后端 — 连接自托管 mem0 服务器,提供基于向量的记忆存储和 LLM 事实提取。构建时需要 `memory-mem0` feature flag配置需设置 `backend = "mem0"`
| 键 | 默认值 | 环境变量 | 用途 |
|---|---|---|---|
| `url` | `http://localhost:8765` | `MEM0_URL` | OpenMemory 服务器地址 |
| `user_id` | `zeroclaw` | `MEM0_USER_ID` | 记忆作用域的用户 ID |
| `app_name` | `zeroclaw` | `MEM0_APP_NAME` | 在 mem0 中注册的应用名称 |
| `infer` | `true` | — | 使用 LLM 从存储文本中提取事实 (`true`) 或原样存储 (`false`) |
| `extraction_prompt` | 未设置 | `MEM0_EXTRACTION_PROMPT` | 自定义 LLM 事实提取提示词(如适用于非英文内容) |
```toml
[memory]
backend = "mem0"
[memory.mem0]
url = "http://192.168.0.171:8765"
user_id = "zeroclaw-bot"
extraction_prompt = "用原始语言提取事实..."
```
服务器部署脚本位于 `deploy/mem0/`
## `[[model_routes]]``[[embedding_routes]]`
使用路由提示,以便集成可以在模型 ID 演变时保持稳定的名称。

View File

@ -463,6 +463,30 @@ Notes:
- Memory context injection ignores legacy `assistant_resp*` auto-save keys to prevent old model-authored summaries from being treated as facts.
### `[memory.mem0]`
Mem0 (OpenMemory) backend — connects to a self-hosted mem0 server for vector-based memory with LLM-powered fact extraction. Requires feature flag `memory-mem0` at build time and `backend = "mem0"` in config.
| Key | Default | Env var | Purpose |
|---|---|---|---|
| `url` | `http://localhost:8765` | `MEM0_URL` | OpenMemory server URL |
| `user_id` | `zeroclaw` | `MEM0_USER_ID` | User ID for scoping memories |
| `app_name` | `zeroclaw` | `MEM0_APP_NAME` | Application name registered in mem0 |
| `infer` | `true` | — | Use LLM to extract facts from stored text (`true`) or store raw (`false`) |
| `extraction_prompt` | unset | `MEM0_EXTRACTION_PROMPT` | Custom prompt for LLM fact extraction (e.g. for non-English content) |
```toml
[memory]
backend = "mem0"
[memory.mem0]
url = "http://192.168.0.171:8765"
user_id = "zeroclaw-bot"
extraction_prompt = "Extract facts in the original language..."
```
Server deployment scripts are in `deploy/mem0/`.
## `[[model_routes]]` and `[[embedding_routes]]`
Use route hints so integrations can keep stable names while model IDs evolve.

View File

@ -337,6 +337,30 @@ Lưu ý:
- Chèn ngữ cảnh memory bỏ qua khóa auto-save `assistant_resp*` kiểu cũ để tránh tóm tắt do model tạo bị coi là sự thật.
### `[memory.mem0]`
Backend Mem0 (OpenMemory) — kết nối đến server mem0 tự host, cung cấp bộ nhớ vector với trích xuất sự kiện bằng LLM. Cần feature flag `memory-mem0` khi build và `backend = "mem0"` trong config.
| Khóa | Mặc định | Biến môi trường | Mục đích |
|---|---|---|---|
| `url` | `http://localhost:8765` | `MEM0_URL` | URL server OpenMemory |
| `user_id` | `zeroclaw` | `MEM0_USER_ID` | User ID để phân vùng memory |
| `app_name` | `zeroclaw` | `MEM0_APP_NAME` | Tên ứng dụng đăng ký trong mem0 |
| `infer` | `true` | — | Dùng LLM trích xuất sự kiện từ text (`true`) hoặc lưu nguyên (`false`) |
| `extraction_prompt` | chưa đặt | `MEM0_EXTRACTION_PROMPT` | Prompt tùy chỉnh cho trích xuất sự kiện LLM (vd: cho nội dung không phải tiếng Anh) |
```toml
[memory]
backend = "mem0"
[memory.mem0]
url = "http://192.168.0.171:8765"
user_id = "zeroclaw-bot"
extraction_prompt = "Trích xuất sự kiện bằng ngôn ngữ gốc..."
```
Script triển khai server nằm trong `deploy/mem0/`.
## `[[model_routes]]``[[embedding_routes]]`
Route hint giúp tên tích hợp ổn định khi model ID thay đổi.

View File

@ -2203,30 +2203,63 @@ async fn process_channel_message(
);
}
// Only enrich with memory context when there is no prior conversation
// history. Follow-up turns already include context from previous messages.
if !had_prior_history {
let memory_context = build_memory_context(
// ── Dual-scope memory recall ──────────────────────────────────
// Always recall before each LLM call (not just first turn).
// For group chats: merge sender-scope + group-scope memories.
// For DMs: sender-scope only.
let is_group_chat =
msg.reply_target.contains("@g.us") || msg.reply_target.starts_with("group:");
let mem_recall_start = Instant::now();
let sender_memory_fut = build_memory_context(
ctx.memory.as_ref(),
&msg.content,
ctx.min_relevance_score,
Some(&msg.sender),
);
let (sender_memory, group_memory) = if is_group_chat {
let group_memory_fut = build_memory_context(
ctx.memory.as_ref(),
&msg.content,
ctx.min_relevance_score,
Some(&history_key),
)
.await;
if let Some(last_turn) = prior_turns.last_mut() {
if last_turn.role == "user" && !memory_context.is_empty() {
last_turn.content = format!("{memory_context}{}", msg.content);
}
}
}
);
tokio::join!(sender_memory_fut, group_memory_fut)
} else {
(sender_memory_fut.await, String::new())
};
#[allow(clippy::cast_possible_truncation)]
let mem_recall_ms = mem_recall_start.elapsed().as_millis() as u64;
tracing::info!(
mem_recall_ms,
sender_empty = sender_memory.is_empty(),
group_empty = group_memory.is_empty(),
"⏱ Memory recall completed"
);
// Merge sender + group memories, avoiding duplicates
let memory_context = if group_memory.is_empty() {
sender_memory
} else if sender_memory.is_empty() {
group_memory
} else {
format!("{sender_memory}\n{group_memory}")
};
// Use refreshed system prompt for new sessions (master's /new support),
// and inject memory into system prompt (not user message) so it
// doesn't pollute session history and is re-fetched each turn.
let base_system_prompt = if had_prior_history {
ctx.system_prompt.as_str().to_string()
} else {
refreshed_new_session_system_prompt(ctx.as_ref())
};
let system_prompt =
let mut system_prompt =
build_channel_system_prompt(&base_system_prompt, &msg.channel, &msg.reply_target);
if !memory_context.is_empty() {
let _ = write!(system_prompt, "\n\n{memory_context}");
}
let mut history = vec![ChatMessage::system(system_prompt)];
history.extend(prior_turns);
let use_streaming = target_channel
@ -2360,6 +2393,10 @@ async fn process_channel_message(
let timeout_budget_secs =
channel_message_timeout_budget_secs(ctx.message_timeout_secs, ctx.max_tool_iterations);
let llm_call_start = Instant::now();
#[allow(clippy::cast_possible_truncation)]
let elapsed_before_llm_ms = started_at.elapsed().as_millis() as u64;
tracing::info!(elapsed_before_llm_ms, "⏱ Starting LLM call");
let llm_result = tokio::select! {
() = cancellation_token.cancelled() => LlmExecutionResult::Cancelled,
result = tokio::time::timeout(
@ -2410,6 +2447,12 @@ async fn process_channel_message(
let _ = handle.await;
}
#[allow(clippy::cast_possible_truncation)]
let llm_call_ms = llm_call_start.elapsed().as_millis() as u64;
#[allow(clippy::cast_possible_truncation)]
let total_ms = started_at.elapsed().as_millis() as u64;
tracing::info!(llm_call_ms, total_ms, "⏱ LLM call completed");
if let Some(token) = typing_cancellation.as_ref() {
token.cancel();
}
@ -2518,6 +2561,7 @@ async fn process_channel_message(
} else {
sanitized_response
};
runtime_trace::record_event(
"channel_message_outbound",
Some(msg.channel.as_str()),
@ -2592,7 +2636,7 @@ async fn process_channel_message(
}
} else if let Err(e) = channel
.send(
&SendMessage::new(delivered_response, &msg.reply_target)
&SendMessage::new(&delivered_response, &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await
@ -8223,10 +8267,12 @@ BTC is currently around $65,000 based on latest tool output."#
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].len(), 2);
// Memory context is injected into the system prompt, not the user message.
assert_eq!(calls[0][0].0, "system");
assert!(calls[0][0].1.contains("[Memory context]"));
assert!(calls[0][0].1.contains("Age is 45"));
assert_eq!(calls[0][1].0, "user");
assert!(calls[0][1].1.contains("[Memory context]"));
assert!(calls[0][1].1.contains("Age is 45"));
assert!(calls[0][1].1.contains("hello"));
assert_eq!(calls[0][1].1, "hello");
let histories = runtime_ctx
.conversation_histories

View File

@ -3501,6 +3501,77 @@ impl Default for QdrantConfig {
}
}
/// Configuration for the mem0 (OpenMemory) memory backend.
///
/// Connects to a self-hosted OpenMemory server via its REST API.
/// Deploy OpenMemory with `docker compose up` from the mem0 repo,
/// then point `url` at the API (default `http://localhost:8765`).
///
/// ```toml
/// [memory]
/// backend = "mem0"
///
/// [memory.mem0]
/// url = "http://localhost:8765"
/// user_id = "zeroclaw"
/// app_name = "zeroclaw"
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct Mem0Config {
/// OpenMemory server URL (e.g. `http://localhost:8765`).
/// Falls back to `MEM0_URL` env var if not set.
#[serde(default = "default_mem0_url")]
pub url: String,
/// User ID for scoping memories within mem0.
/// Falls back to `MEM0_USER_ID` env var, or default `"zeroclaw"`.
#[serde(default = "default_mem0_user_id")]
pub user_id: String,
/// Application name registered in mem0.
/// Falls back to `MEM0_APP_NAME` env var, or default `"zeroclaw"`.
#[serde(default = "default_mem0_app_name")]
pub app_name: String,
/// Whether mem0 should use its built-in LLM to extract facts from
/// stored text (`infer = true`) or store raw text as-is (`false`).
#[serde(default = "default_mem0_infer")]
pub infer: bool,
/// Custom prompt for guiding LLM-based fact extraction when `infer = true`.
/// Useful for non-English content (e.g. Cantonese/Chinese).
/// Falls back to `MEM0_EXTRACTION_PROMPT` env var.
/// If unset, the mem0 server uses its built-in default prompt.
#[serde(default = "default_mem0_extraction_prompt")]
pub extraction_prompt: Option<String>,
}
fn default_mem0_url() -> String {
std::env::var("MEM0_URL").unwrap_or_else(|_| "http://localhost:8765".into())
}
fn default_mem0_user_id() -> String {
std::env::var("MEM0_USER_ID").unwrap_or_else(|_| "zeroclaw".into())
}
fn default_mem0_app_name() -> String {
std::env::var("MEM0_APP_NAME").unwrap_or_else(|_| "zeroclaw".into())
}
fn default_mem0_infer() -> bool {
true
}
fn default_mem0_extraction_prompt() -> Option<String> {
std::env::var("MEM0_EXTRACTION_PROMPT")
.ok()
.filter(|s| !s.trim().is_empty())
}
impl Default for Mem0Config {
fn default() -> Self {
Self {
url: default_mem0_url(),
user_id: default_mem0_user_id(),
app_name: default_mem0_app_name(),
infer: default_mem0_infer(),
extraction_prompt: default_mem0_extraction_prompt(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[allow(clippy::struct_excessive_bools)]
pub struct MemoryConfig {
@ -3586,6 +3657,13 @@ pub struct MemoryConfig {
/// Only used when `backend = "qdrant"`.
#[serde(default)]
pub qdrant: QdrantConfig,
// ── Mem0 backend options ─────────────────────────────────
/// Configuration for mem0 (OpenMemory) backend.
/// Only used when `backend = "mem0"`.
/// Requires `--features memory-mem0` at build time.
#[serde(default)]
pub mem0: Mem0Config,
}
fn default_embedding_provider() -> String {
@ -3661,6 +3739,7 @@ impl Default for MemoryConfig {
auto_hydrate: true,
sqlite_open_timeout_secs: None,
qdrant: QdrantConfig::default(),
mem0: Mem0Config::default(),
}
}
}
@ -6907,8 +6986,45 @@ impl Config {
)
.context("Failed to deserialize config file")?;
// Warn about each unknown config key
// Warn about each unknown config key.
// serde_ignored + #[serde(default)] on nested structs can produce
// false positives: parent-level fields get re-reported under the
// nested key (e.g. "memory.mem0.auto_hydrate" even though
// auto_hydrate belongs to MemoryConfig, not Mem0Config). We
// suppress these by checking whether the leaf key is a known field
// on the parent struct.
let known_memory_fields: &[&str] = &[
"backend",
"auto_save",
"hygiene_enabled",
"archive_after_days",
"purge_after_days",
"conversation_retention_days",
"embedding_provider",
"embedding_model",
"embedding_dimensions",
"vector_weight",
"keyword_weight",
"min_relevance_score",
"embedding_cache_size",
"chunk_max_tokens",
"response_cache_enabled",
"response_cache_ttl_minutes",
"response_cache_max_entries",
"response_cache_hot_entries",
"snapshot_enabled",
"snapshot_on_hygiene",
"auto_hydrate",
"sqlite_open_timeout_secs",
];
for path in ignored_paths {
// Skip false positives from nested memory sub-sections
if path.starts_with("memory.mem0.") || path.starts_with("memory.qdrant.") {
let leaf = path.rsplit('.').next().unwrap_or("");
if known_memory_fields.contains(&leaf) {
continue;
}
}
tracing::warn!(
"Unknown config key ignored: \"{}\". Check config.toml for typos or deprecated options.",
path

View File

@ -4,6 +4,7 @@ pub enum MemoryBackendKind {
Lucid,
Postgres,
Qdrant,
Mem0,
Markdown,
None,
Unknown,
@ -65,6 +66,15 @@ const QDRANT_PROFILE: MemoryBackendProfile = MemoryBackendProfile {
optional_dependency: false,
};
const MEM0_PROFILE: MemoryBackendProfile = MemoryBackendProfile {
key: "mem0",
label: "Mem0 (OpenMemory) — semantic memory with LLM fact extraction via [memory.mem0]",
auto_save_default: true,
uses_sqlite_hygiene: false,
sqlite_based: false,
optional_dependency: true,
};
const NONE_PROFILE: MemoryBackendProfile = MemoryBackendProfile {
key: "none",
label: "None — disable persistent memory",
@ -104,6 +114,7 @@ pub fn classify_memory_backend(backend: &str) -> MemoryBackendKind {
"lucid" => MemoryBackendKind::Lucid,
"postgres" => MemoryBackendKind::Postgres,
"qdrant" => MemoryBackendKind::Qdrant,
"mem0" | "openmemory" => MemoryBackendKind::Mem0,
"markdown" => MemoryBackendKind::Markdown,
"none" => MemoryBackendKind::None,
_ => MemoryBackendKind::Unknown,
@ -116,6 +127,7 @@ pub fn memory_backend_profile(backend: &str) -> MemoryBackendProfile {
MemoryBackendKind::Lucid => LUCID_PROFILE,
MemoryBackendKind::Postgres => POSTGRES_PROFILE,
MemoryBackendKind::Qdrant => QDRANT_PROFILE,
MemoryBackendKind::Mem0 => MEM0_PROFILE,
MemoryBackendKind::Markdown => MARKDOWN_PROFILE,
MemoryBackendKind::None => NONE_PROFILE,
MemoryBackendKind::Unknown => CUSTOM_PROFILE,

631
src/memory/mem0.rs Normal file
View File

@ -0,0 +1,631 @@
//! Mem0 (OpenMemory) memory backend.
//!
//! Connects to a self-hosted OpenMemory server via its REST API
//! and implements the [`Memory`] trait for seamless integration with
//! ZeroClaw's auto-save, auto-recall, and hygiene lifecycle.
//!
//! Deploy OpenMemory: `docker compose up` from the mem0 repo.
//! Default endpoint: `http://localhost:8765`.
use super::traits::{Memory, MemoryCategory, MemoryEntry, ProceduralMessage};
use crate::config::schema::Mem0Config;
use async_trait::async_trait;
use reqwest::Client;
use serde::{Deserialize, Serialize};
/// Memory backend backed by a mem0 (OpenMemory) REST API.
pub struct Mem0Memory {
client: Client,
base_url: String,
user_id: String,
app_name: String,
infer: bool,
extraction_prompt: Option<String>,
}
// ── mem0 API request/response types ────────────────────────────────
#[derive(Serialize)]
struct AddMemoryRequest<'a> {
user_id: &'a str,
text: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
metadata: Option<Mem0Metadata<'a>>,
infer: bool,
#[serde(skip_serializing_if = "Option::is_none")]
app: Option<&'a str>,
#[serde(skip_serializing_if = "Option::is_none")]
custom_instructions: Option<&'a str>,
}
#[derive(Serialize)]
struct Mem0Metadata<'a> {
key: &'a str,
category: &'a str,
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<&'a str>,
}
#[derive(Serialize)]
struct AddProceduralRequest<'a> {
user_id: &'a str,
messages: &'a [ProceduralMessage],
#[serde(skip_serializing_if = "Option::is_none")]
metadata: Option<serde_json::Value>,
}
#[derive(Serialize)]
struct DeleteMemoriesRequest<'a> {
memory_ids: Vec<&'a str>,
user_id: &'a str,
}
#[derive(Deserialize)]
struct Mem0MemoryItem {
id: String,
#[serde(alias = "content", alias = "text", default)]
memory: String,
#[serde(default)]
created_at: Option<serde_json::Value>,
#[serde(default, rename = "metadata_")]
metadata: Option<Mem0ResponseMetadata>,
#[serde(alias = "relevance_score", default)]
score: Option<f64>,
}
#[derive(Deserialize, Default)]
struct Mem0ResponseMetadata {
#[serde(default)]
key: Option<String>,
#[serde(default)]
category: Option<String>,
#[serde(default)]
session_id: Option<String>,
}
#[derive(Deserialize)]
struct Mem0ListResponse {
#[serde(default)]
items: Vec<Mem0MemoryItem>,
#[serde(default)]
total: usize,
}
// ── Implementation ─────────────────────────────────────────────────
impl Mem0Memory {
/// Create a new mem0 memory backend from config.
pub fn new(config: &Mem0Config) -> anyhow::Result<Self> {
let base_url = config.url.trim_end_matches('/').to_string();
if base_url.is_empty() {
anyhow::bail!("mem0 URL is empty; set [memory.mem0] url or MEM0_URL env var");
}
let client = Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()?;
Ok(Self {
client,
base_url,
user_id: config.user_id.clone(),
app_name: config.app_name.clone(),
infer: config.infer,
extraction_prompt: config.extraction_prompt.clone(),
})
}
fn api_url(&self, path: &str) -> String {
format!("{}/api/v1{}", self.base_url, path)
}
/// Use `session_id` as the effective mem0 `user_id` when provided,
/// falling back to the configured default. This enables per-user
/// and per-group memory scoping via the existing `Memory` trait.
fn effective_user_id<'a>(&'a self, session_id: Option<&'a str>) -> &'a str {
session_id
.filter(|s| !s.trim().is_empty())
.unwrap_or(&self.user_id)
}
/// Recall memories with optional search filters.
///
/// - `created_after` / `created_before`: ISO 8601 timestamps for time-range filtering.
/// - `metadata_filter`: arbitrary JSON object passed to the mem0 SDK `filters` param.
pub async fn recall_filtered(
&self,
query: &str,
limit: usize,
session_id: Option<&str>,
created_after: Option<&str>,
created_before: Option<&str>,
metadata_filter: Option<&serde_json::Value>,
) -> anyhow::Result<Vec<MemoryEntry>> {
let effective_user = self.effective_user_id(session_id);
let limit_str = limit.to_string();
let mut params: Vec<(&str, &str)> = vec![
("user_id", effective_user),
("search_query", query),
("size", &limit_str),
];
if let Some(after) = created_after {
params.push(("created_after", after));
}
if let Some(before) = created_before {
params.push(("created_before", before));
}
let meta_json;
if let Some(mf) = metadata_filter {
meta_json = serde_json::to_string(mf)?;
params.push(("metadata_filter", &meta_json));
}
let resp = self
.client
.get(self.api_url("/memories/"))
.query(&params)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("mem0 recall failed ({status}): {text}");
}
let list: Mem0ListResponse = resp.json().await?;
Ok(list.items.into_iter().map(|i| self.to_entry(i)).collect())
}
fn to_entry(&self, item: Mem0MemoryItem) -> MemoryEntry {
let meta = item.metadata.unwrap_or_default();
let timestamp = match item.created_at {
Some(serde_json::Value::Number(n)) => {
// Unix timestamp → ISO 8601
if let Some(ts) = n.as_i64() {
chrono::DateTime::from_timestamp(ts, 0)
.map(|dt| dt.to_rfc3339())
.unwrap_or_default()
} else {
String::new()
}
}
Some(serde_json::Value::String(s)) => s,
_ => String::new(),
};
let category = match meta.category.as_deref() {
Some("daily") => MemoryCategory::Daily,
Some("conversation") => MemoryCategory::Conversation,
Some(other) if other != "core" => MemoryCategory::Custom(other.to_string()),
// "core" or None → default
_ => MemoryCategory::Core,
};
MemoryEntry {
id: item.id,
key: meta.key.unwrap_or_default(),
content: item.memory,
category,
timestamp,
session_id: meta.session_id,
score: item.score,
}
}
/// Store a conversation trace as procedural memory.
///
/// Sends the message history (user input, tool calls, assistant response)
/// to the mem0 procedural endpoint so that "how to" patterns can be
/// extracted and stored for future recall.
pub async fn store_procedural(
&self,
messages: &[ProceduralMessage],
session_id: Option<&str>,
) -> anyhow::Result<()> {
if messages.is_empty() {
return Ok(());
}
let effective_user = self.effective_user_id(session_id);
let body = AddProceduralRequest {
user_id: effective_user,
messages,
metadata: None,
};
let resp = self
.client
.post(self.api_url("/memories/procedural"))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("mem0 store_procedural failed ({status}): {text}");
}
Ok(())
}
}
// ── History API types ─────────────────────────────────────────────
#[derive(Deserialize)]
struct Mem0HistoryResponse {
#[serde(default)]
history: Vec<serde_json::Value>,
#[serde(default)]
error: Option<String>,
}
impl Mem0Memory {
/// Retrieve the edit history (audit trail) for a specific memory by ID.
pub async fn history(&self, memory_id: &str) -> anyhow::Result<String> {
let url = self.api_url(&format!("/memories/{memory_id}/history"));
let resp = self.client.get(&url).send().await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("mem0 history failed ({status}): {text}");
}
let body: Mem0HistoryResponse = resp.json().await?;
if let Some(err) = body.error {
anyhow::bail!("mem0 history error: {err}");
}
if body.history.is_empty() {
return Ok(format!("No history found for memory {memory_id}."));
}
let mut lines = Vec::with_capacity(body.history.len() + 1);
lines.push(format!("History for memory {memory_id}:"));
for (i, entry) in body.history.iter().enumerate() {
let event = entry
.get("event")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let old_memory = entry
.get("old_memory")
.and_then(|v| v.as_str())
.unwrap_or("-");
let new_memory = entry
.get("new_memory")
.and_then(|v| v.as_str())
.unwrap_or("-");
let timestamp = entry
.get("created_at")
.or_else(|| entry.get("timestamp"))
.and_then(|v| v.as_str())
.unwrap_or("unknown");
lines.push(format!(
" {idx}. [{event}] at {timestamp}\n old: {old_memory}\n new: {new_memory}",
idx = i + 1,
));
}
Ok(lines.join("\n"))
}
}
#[async_trait]
impl Memory for Mem0Memory {
fn name(&self) -> &str {
"mem0"
}
async fn store(
&self,
key: &str,
content: &str,
category: MemoryCategory,
session_id: Option<&str>,
) -> anyhow::Result<()> {
let cat_str = category.to_string();
let effective_user = self.effective_user_id(session_id);
let body = AddMemoryRequest {
user_id: effective_user,
text: content,
metadata: Some(Mem0Metadata {
key,
category: &cat_str,
session_id,
}),
infer: self.infer,
app: Some(&self.app_name),
custom_instructions: self.extraction_prompt.as_deref(),
};
let resp = self
.client
.post(self.api_url("/memories/"))
.json(&body)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("mem0 store failed ({status}): {text}");
}
Ok(())
}
async fn recall(
&self,
query: &str,
limit: usize,
session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>> {
self.recall_filtered(query, limit, session_id, None, None, None)
.await
}
async fn get(&self, key: &str) -> anyhow::Result<Option<MemoryEntry>> {
// mem0 doesn't have a get-by-key API, so we search by key in metadata
let results = self.recall(key, 1, None).await?;
Ok(results.into_iter().find(|e| e.key == key))
}
async fn list(
&self,
category: Option<&MemoryCategory>,
session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>> {
let effective_user = self.effective_user_id(session_id);
let resp = self
.client
.get(self.api_url("/memories/"))
.query(&[("user_id", effective_user), ("size", "100")])
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("mem0 list failed ({status}): {text}");
}
let list: Mem0ListResponse = resp.json().await?;
let entries: Vec<MemoryEntry> = list.items.into_iter().map(|i| self.to_entry(i)).collect();
// Client-side category filter (mem0 API doesn't filter by metadata)
match category {
Some(cat) => Ok(entries.into_iter().filter(|e| &e.category == cat).collect()),
None => Ok(entries),
}
}
async fn forget(&self, key: &str) -> anyhow::Result<bool> {
// Find the memory ID by key first
let entry = self.get(key).await?;
let entry = match entry {
Some(e) => e,
None => return Ok(false),
};
let body = DeleteMemoriesRequest {
memory_ids: vec![&entry.id],
user_id: &self.user_id,
};
let resp = self
.client
.delete(self.api_url("/memories/"))
.json(&body)
.send()
.await?;
Ok(resp.status().is_success())
}
async fn count(&self) -> anyhow::Result<usize> {
let resp = self
.client
.get(self.api_url("/memories/"))
.query(&[
("user_id", self.user_id.as_str()),
("size", "1"),
("page", "1"),
])
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("mem0 count failed ({status}): {text}");
}
let list: Mem0ListResponse = resp.json().await?;
Ok(list.total)
}
async fn health_check(&self) -> bool {
self.client
.get(self.api_url("/memories/"))
.query(&[
("user_id", self.user_id.as_str()),
("size", "1"),
("page", "1"),
])
.send()
.await
.is_ok_and(|r| r.status().is_success())
}
async fn store_procedural(
&self,
messages: &[ProceduralMessage],
session_id: Option<&str>,
) -> anyhow::Result<()> {
Mem0Memory::store_procedural(self, messages, session_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> Mem0Config {
Mem0Config {
url: "http://localhost:8765".into(),
user_id: "test-user".into(),
app_name: "test-app".into(),
infer: true,
extraction_prompt: None,
}
}
#[test]
fn new_rejects_empty_url() {
let config = Mem0Config {
url: String::new(),
..test_config()
};
assert!(Mem0Memory::new(&config).is_err());
}
#[test]
fn new_trims_trailing_slash() {
let config = Mem0Config {
url: "http://localhost:8765/".into(),
..test_config()
};
let mem = Mem0Memory::new(&config).unwrap();
assert_eq!(mem.base_url, "http://localhost:8765");
}
#[test]
fn api_url_builds_correct_path() {
let mem = Mem0Memory::new(&test_config()).unwrap();
assert_eq!(
mem.api_url("/memories/"),
"http://localhost:8765/api/v1/memories/"
);
}
#[test]
fn to_entry_maps_unix_timestamp() {
let mem = Mem0Memory::new(&test_config()).unwrap();
let item = Mem0MemoryItem {
id: "id-1".into(),
memory: "hello".into(),
created_at: Some(serde_json::json!(1_700_000_000)),
metadata: Some(Mem0ResponseMetadata {
key: Some("k1".into()),
category: Some("core".into()),
session_id: None,
}),
score: Some(0.95),
};
let entry = mem.to_entry(item);
assert_eq!(entry.id, "id-1");
assert_eq!(entry.key, "k1");
assert_eq!(entry.category, MemoryCategory::Core);
assert!(!entry.timestamp.is_empty());
assert_eq!(entry.score, Some(0.95));
}
#[test]
fn to_entry_maps_string_timestamp() {
let mem = Mem0Memory::new(&test_config()).unwrap();
let item = Mem0MemoryItem {
id: "id-2".into(),
memory: "world".into(),
created_at: Some(serde_json::json!("2024-01-01T00:00:00Z")),
metadata: None,
score: None,
};
let entry = mem.to_entry(item);
assert_eq!(entry.timestamp, "2024-01-01T00:00:00Z");
assert_eq!(entry.category, MemoryCategory::Core); // default
}
#[test]
fn to_entry_handles_missing_metadata() {
let mem = Mem0Memory::new(&test_config()).unwrap();
let item = Mem0MemoryItem {
id: "id-3".into(),
memory: "bare".into(),
created_at: None,
metadata: None,
score: None,
};
let entry = mem.to_entry(item);
assert_eq!(entry.key, "");
assert_eq!(entry.category, MemoryCategory::Core);
assert!(entry.timestamp.is_empty());
assert_eq!(entry.score, None);
}
#[test]
fn to_entry_custom_category() {
let mem = Mem0Memory::new(&test_config()).unwrap();
let item = Mem0MemoryItem {
id: "id-4".into(),
memory: "custom".into(),
created_at: None,
metadata: Some(Mem0ResponseMetadata {
key: Some("k".into()),
category: Some("project_notes".into()),
session_id: Some("s1".into()),
}),
score: None,
};
let entry = mem.to_entry(item);
assert_eq!(
entry.category,
MemoryCategory::Custom("project_notes".into())
);
assert_eq!(entry.session_id.as_deref(), Some("s1"));
}
#[test]
fn name_returns_mem0() {
let mem = Mem0Memory::new(&test_config()).unwrap();
assert_eq!(mem.name(), "mem0");
}
#[test]
fn procedural_request_serializes_messages() {
let messages = vec![
ProceduralMessage {
role: "user".into(),
content: "How do I deploy?".into(),
name: None,
},
ProceduralMessage {
role: "tool".into(),
content: "deployment started".into(),
name: Some("shell".into()),
},
ProceduralMessage {
role: "assistant".into(),
content: "Deployment complete.".into(),
name: None,
},
];
let req = AddProceduralRequest {
user_id: "test-user",
messages: &messages,
metadata: None,
};
let json = serde_json::to_value(&req).unwrap();
assert_eq!(json["user_id"], "test-user");
let msgs = json["messages"].as_array().unwrap();
assert_eq!(msgs.len(), 3);
assert_eq!(msgs[0]["role"], "user");
assert_eq!(msgs[1]["name"], "shell");
// metadata should be absent when None
assert!(json.get("metadata").is_none());
}
}

View File

@ -7,6 +7,8 @@ pub mod hygiene;
pub mod knowledge_graph;
pub mod lucid;
pub mod markdown;
#[cfg(feature = "memory-mem0")]
pub mod mem0;
pub mod none;
#[cfg(feature = "memory-postgres")]
pub mod postgres;
@ -24,6 +26,8 @@ pub use backend::{
};
pub use lucid::LucidMemory;
pub use markdown::MarkdownMemory;
#[cfg(feature = "memory-mem0")]
pub use mem0::Mem0Memory;
pub use none::NoneMemory;
#[cfg(feature = "memory-postgres")]
pub use postgres::PostgresMemory;
@ -32,7 +36,7 @@ pub use response_cache::ResponseCache;
pub use sqlite::SqliteMemory;
pub use traits::Memory;
#[allow(unused_imports)]
pub use traits::{MemoryCategory, MemoryEntry};
pub use traits::{MemoryCategory, MemoryEntry, ProceduralMessage};
use crate::config::{EmbeddingRouteConfig, MemoryConfig, StorageProviderConfig};
use anyhow::Context;
@ -57,7 +61,7 @@ where
Ok(Box::new(LucidMemory::new(workspace_dir, local)))
}
MemoryBackendKind::Postgres => postgres_builder(),
MemoryBackendKind::Qdrant | MemoryBackendKind::Markdown => {
MemoryBackendKind::Qdrant | MemoryBackendKind::Mem0 | MemoryBackendKind::Markdown => {
Ok(Box::new(MarkdownMemory::new(workspace_dir)))
}
MemoryBackendKind::None => Ok(Box::new(NoneMemory::new())),
@ -336,6 +340,28 @@ pub fn create_memory_with_storage_and_routes(
);
}
#[cfg(feature = "memory-mem0")]
fn build_mem0_memory(config: &crate::config::MemoryConfig) -> anyhow::Result<Box<dyn Memory>> {
let mem = Mem0Memory::new(&config.mem0)?;
tracing::info!(
"📦 Mem0 memory backend configured (url: {}, user: {})",
config.mem0.url,
config.mem0.user_id
);
Ok(Box::new(mem))
}
#[cfg(not(feature = "memory-mem0"))]
fn build_mem0_memory(_config: &crate::config::MemoryConfig) -> anyhow::Result<Box<dyn Memory>> {
anyhow::bail!(
"memory backend 'mem0' requested but this build was compiled without `memory-mem0`; rebuild with `--features memory-mem0`"
);
}
if matches!(backend_kind, MemoryBackendKind::Mem0) {
return build_mem0_memory(config);
}
if matches!(backend_kind, MemoryBackendKind::Qdrant) {
let url = config
.qdrant

View File

@ -1,6 +1,18 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
/// A single message in a conversation trace for procedural memory.
///
/// Used to capture "how to" patterns from tool-calling turns so that
/// backends that support procedural storage (e.g. mem0) can learn from them.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProceduralMessage {
pub role: String,
pub content: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
/// A single memory entry
#[derive(Clone, Serialize, Deserialize)]
pub struct MemoryEntry {
@ -109,6 +121,19 @@ pub trait Memory: Send + Sync {
/// Health check
async fn health_check(&self) -> bool;
/// Store a conversation trace as procedural memory.
///
/// Backends that support procedural storage (e.g. mem0) override this
/// to extract "how to" patterns from tool-calling turns. The default
/// implementation is a no-op.
async fn store_procedural(
&self,
_messages: &[ProceduralMessage],
_session_id: Option<&str>,
) -> anyhow::Result<()> {
Ok(())
}
}
#[cfg(test)]

View File

@ -418,6 +418,7 @@ fn memory_config_defaults_for_backend(backend: &str) -> MemoryConfig {
auto_hydrate: true,
sqlite_open_timeout_secs: None,
qdrant: crate::config::QdrantConfig::default(),
mem0: crate::config::schema::Mem0Config::default(),
}
}