Add three new tools in src/tools/sessions.rs: - sessions_list: lists active sessions with channel, message count, last activity - sessions_history: reads last N messages from a session by ID - sessions_send: appends a message to a session for inter-agent communication All tools operate on the SessionBackend trait, using the JSONL SessionStore by default. Registered unconditionally in all_tools_with_runtime when the sessions directory is accessible. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
481 lines
16 KiB
Rust
481 lines
16 KiB
Rust
//! Session-to-session messaging tools for inter-agent communication.
|
|
//!
|
|
//! Provides three tools:
|
|
//! - `sessions_list` — list active sessions with metadata
|
|
//! - `sessions_history` — read message history from a specific session
|
|
//! - `sessions_send` — send a message to a specific session
|
|
|
|
use super::traits::{Tool, ToolResult};
|
|
use crate::channels::session_backend::SessionBackend;
|
|
use async_trait::async_trait;
|
|
use serde_json::json;
|
|
use std::fmt::Write;
|
|
use std::sync::Arc;
|
|
|
|
// ── SessionsListTool ────────────────────────────────────────────────
|
|
|
|
/// Lists active sessions with their channel, last activity time, and message count.
|
|
pub struct SessionsListTool {
|
|
backend: Arc<dyn SessionBackend>,
|
|
}
|
|
|
|
impl SessionsListTool {
|
|
pub fn new(backend: Arc<dyn SessionBackend>) -> Self {
|
|
Self { backend }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for SessionsListTool {
|
|
fn name(&self) -> &str {
|
|
"sessions_list"
|
|
}
|
|
|
|
fn description(&self) -> &str {
|
|
"List all active conversation sessions with their channel, last activity time, and message count."
|
|
}
|
|
|
|
fn parameters_schema(&self) -> serde_json::Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"limit": {
|
|
"type": "integer",
|
|
"description": "Max sessions to return (default: 50)"
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
|
|
#[allow(clippy::cast_possible_truncation)]
|
|
let limit = args
|
|
.get("limit")
|
|
.and_then(serde_json::Value::as_u64)
|
|
.map_or(50, |v| v as usize);
|
|
|
|
let metadata = self.backend.list_sessions_with_metadata();
|
|
|
|
if metadata.is_empty() {
|
|
return Ok(ToolResult {
|
|
success: true,
|
|
output: "No active sessions found.".into(),
|
|
error: None,
|
|
});
|
|
}
|
|
|
|
let capped: Vec<_> = metadata.into_iter().take(limit).collect();
|
|
let mut output = format!("Found {} session(s):\n", capped.len());
|
|
for meta in &capped {
|
|
// Extract channel from key (convention: channel__identifier)
|
|
let channel = meta.key.split("__").next().unwrap_or(&meta.key);
|
|
let _ = writeln!(
|
|
output,
|
|
"- {}: channel={}, messages={}, last_activity={}",
|
|
meta.key, channel, meta.message_count, meta.last_activity
|
|
);
|
|
}
|
|
|
|
Ok(ToolResult {
|
|
success: true,
|
|
output,
|
|
error: None,
|
|
})
|
|
}
|
|
}
|
|
|
|
// ── SessionsHistoryTool ─────────────────────────────────────────────
|
|
|
|
/// Reads the message history of a specific session by ID.
|
|
pub struct SessionsHistoryTool {
|
|
backend: Arc<dyn SessionBackend>,
|
|
}
|
|
|
|
impl SessionsHistoryTool {
|
|
pub fn new(backend: Arc<dyn SessionBackend>) -> Self {
|
|
Self { backend }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for SessionsHistoryTool {
|
|
fn name(&self) -> &str {
|
|
"sessions_history"
|
|
}
|
|
|
|
fn description(&self) -> &str {
|
|
"Read the message history of a specific session by its session ID. Returns the last N messages."
|
|
}
|
|
|
|
fn parameters_schema(&self) -> serde_json::Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"session_id": {
|
|
"type": "string",
|
|
"description": "The session ID to read history from (e.g. telegram__user123)"
|
|
},
|
|
"limit": {
|
|
"type": "integer",
|
|
"description": "Max messages to return, from most recent (default: 20)"
|
|
}
|
|
},
|
|
"required": ["session_id"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
|
|
let session_id = args
|
|
.get("session_id")
|
|
.and_then(|v| v.as_str())
|
|
.ok_or_else(|| anyhow::anyhow!("Missing 'session_id' parameter"))?;
|
|
|
|
#[allow(clippy::cast_possible_truncation)]
|
|
let limit = args
|
|
.get("limit")
|
|
.and_then(serde_json::Value::as_u64)
|
|
.map_or(20, |v| v as usize);
|
|
|
|
let messages = self.backend.load(session_id);
|
|
|
|
if messages.is_empty() {
|
|
return Ok(ToolResult {
|
|
success: true,
|
|
output: format!("No messages found for session '{session_id}'."),
|
|
error: None,
|
|
});
|
|
}
|
|
|
|
// Take the last `limit` messages
|
|
let start = messages.len().saturating_sub(limit);
|
|
let tail = &messages[start..];
|
|
|
|
let mut output = format!(
|
|
"Session '{}': showing {}/{} messages\n",
|
|
session_id,
|
|
tail.len(),
|
|
messages.len()
|
|
);
|
|
for msg in tail {
|
|
let _ = writeln!(output, "[{}] {}", msg.role, msg.content);
|
|
}
|
|
|
|
Ok(ToolResult {
|
|
success: true,
|
|
output,
|
|
error: None,
|
|
})
|
|
}
|
|
}
|
|
|
|
// ── SessionsSendTool ────────────────────────────────────────────────
|
|
|
|
/// Sends a message to a specific session, enabling inter-agent communication.
|
|
pub struct SessionsSendTool {
|
|
backend: Arc<dyn SessionBackend>,
|
|
}
|
|
|
|
impl SessionsSendTool {
|
|
pub fn new(backend: Arc<dyn SessionBackend>) -> Self {
|
|
Self { backend }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for SessionsSendTool {
|
|
fn name(&self) -> &str {
|
|
"sessions_send"
|
|
}
|
|
|
|
fn description(&self) -> &str {
|
|
"Send a message to a specific session by its session ID. The message is appended to the session's conversation history as a 'user' message, enabling inter-agent communication."
|
|
}
|
|
|
|
fn parameters_schema(&self) -> serde_json::Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"session_id": {
|
|
"type": "string",
|
|
"description": "The target session ID (e.g. telegram__user123)"
|
|
},
|
|
"message": {
|
|
"type": "string",
|
|
"description": "The message content to send"
|
|
}
|
|
},
|
|
"required": ["session_id", "message"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
|
|
let session_id = args
|
|
.get("session_id")
|
|
.and_then(|v| v.as_str())
|
|
.ok_or_else(|| anyhow::anyhow!("Missing 'session_id' parameter"))?;
|
|
|
|
let message = args
|
|
.get("message")
|
|
.and_then(|v| v.as_str())
|
|
.ok_or_else(|| anyhow::anyhow!("Missing 'message' parameter"))?;
|
|
|
|
if message.trim().is_empty() {
|
|
return Ok(ToolResult {
|
|
success: false,
|
|
output: String::new(),
|
|
error: Some("Message content must not be empty.".into()),
|
|
});
|
|
}
|
|
|
|
let chat_msg = crate::providers::traits::ChatMessage::user(message);
|
|
|
|
match self.backend.append(session_id, &chat_msg) {
|
|
Ok(()) => Ok(ToolResult {
|
|
success: true,
|
|
output: format!("Message sent to session '{session_id}'."),
|
|
error: None,
|
|
}),
|
|
Err(e) => Ok(ToolResult {
|
|
success: false,
|
|
output: String::new(),
|
|
error: Some(format!("Failed to send message: {e}")),
|
|
}),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::channels::session_store::SessionStore;
|
|
use crate::providers::traits::ChatMessage;
|
|
use tempfile::TempDir;
|
|
|
|
fn test_backend() -> (TempDir, Arc<dyn SessionBackend>) {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
(tmp, Arc::new(store))
|
|
}
|
|
|
|
fn seeded_backend() -> (TempDir, Arc<dyn SessionBackend>) {
|
|
let tmp = TempDir::new().unwrap();
|
|
let store = SessionStore::new(tmp.path()).unwrap();
|
|
store
|
|
.append("telegram__alice", &ChatMessage::user("Hello from Alice"))
|
|
.unwrap();
|
|
store
|
|
.append(
|
|
"telegram__alice",
|
|
&ChatMessage::assistant("Hi Alice, how can I help?"),
|
|
)
|
|
.unwrap();
|
|
store
|
|
.append("discord__bob", &ChatMessage::user("Hey from Bob"))
|
|
.unwrap();
|
|
(tmp, Arc::new(store))
|
|
}
|
|
|
|
// ── SessionsListTool tests ──────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn list_empty_sessions() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsListTool::new(backend);
|
|
let result = tool.execute(json!({})).await.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("No active sessions"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn list_sessions_shows_all() {
|
|
let (_tmp, backend) = seeded_backend();
|
|
let tool = SessionsListTool::new(backend);
|
|
let result = tool.execute(json!({})).await.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("2 session(s)"));
|
|
assert!(result.output.contains("telegram__alice"));
|
|
assert!(result.output.contains("discord__bob"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn list_sessions_respects_limit() {
|
|
let (_tmp, backend) = seeded_backend();
|
|
let tool = SessionsListTool::new(backend);
|
|
let result = tool.execute(json!({"limit": 1})).await.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("1 session(s)"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn list_sessions_extracts_channel() {
|
|
let (_tmp, backend) = seeded_backend();
|
|
let tool = SessionsListTool::new(backend);
|
|
let result = tool.execute(json!({})).await.unwrap();
|
|
assert!(result.output.contains("channel=telegram"));
|
|
assert!(result.output.contains("channel=discord"));
|
|
}
|
|
|
|
#[test]
|
|
fn list_tool_name_and_schema() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsListTool::new(backend);
|
|
assert_eq!(tool.name(), "sessions_list");
|
|
assert!(tool.parameters_schema()["properties"]["limit"].is_object());
|
|
}
|
|
|
|
// ── SessionsHistoryTool tests ───────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn history_empty_session() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsHistoryTool::new(backend);
|
|
let result = tool
|
|
.execute(json!({"session_id": "nonexistent"}))
|
|
.await
|
|
.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("No messages found"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn history_returns_messages() {
|
|
let (_tmp, backend) = seeded_backend();
|
|
let tool = SessionsHistoryTool::new(backend);
|
|
let result = tool
|
|
.execute(json!({"session_id": "telegram__alice"}))
|
|
.await
|
|
.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("showing 2/2 messages"));
|
|
assert!(result.output.contains("[user] Hello from Alice"));
|
|
assert!(result.output.contains("[assistant] Hi Alice"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn history_respects_limit() {
|
|
let (_tmp, backend) = seeded_backend();
|
|
let tool = SessionsHistoryTool::new(backend);
|
|
let result = tool
|
|
.execute(json!({"session_id": "telegram__alice", "limit": 1}))
|
|
.await
|
|
.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("showing 1/2 messages"));
|
|
// Should show only the last message
|
|
assert!(result.output.contains("[assistant]"));
|
|
assert!(!result.output.contains("[user] Hello from Alice"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn history_missing_session_id() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsHistoryTool::new(backend);
|
|
let result = tool.execute(json!({})).await;
|
|
assert!(result.is_err());
|
|
assert!(result.unwrap_err().to_string().contains("session_id"));
|
|
}
|
|
|
|
#[test]
|
|
fn history_tool_name_and_schema() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsHistoryTool::new(backend);
|
|
assert_eq!(tool.name(), "sessions_history");
|
|
let schema = tool.parameters_schema();
|
|
assert!(schema["properties"]["session_id"].is_object());
|
|
assert!(schema["required"]
|
|
.as_array()
|
|
.unwrap()
|
|
.contains(&json!("session_id")));
|
|
}
|
|
|
|
// ── SessionsSendTool tests ──────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn send_appends_message() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsSendTool::new(backend.clone());
|
|
let result = tool
|
|
.execute(json!({
|
|
"session_id": "telegram__alice",
|
|
"message": "Hello from another agent"
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("Message sent"));
|
|
|
|
// Verify message was appended
|
|
let messages = backend.load("telegram__alice");
|
|
assert_eq!(messages.len(), 1);
|
|
assert_eq!(messages[0].role, "user");
|
|
assert_eq!(messages[0].content, "Hello from another agent");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn send_to_existing_session() {
|
|
let (_tmp, backend) = seeded_backend();
|
|
let tool = SessionsSendTool::new(backend.clone());
|
|
let result = tool
|
|
.execute(json!({
|
|
"session_id": "telegram__alice",
|
|
"message": "Inter-agent message"
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
assert!(result.success);
|
|
|
|
let messages = backend.load("telegram__alice");
|
|
assert_eq!(messages.len(), 3);
|
|
assert_eq!(messages[2].content, "Inter-agent message");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn send_rejects_empty_message() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsSendTool::new(backend);
|
|
let result = tool
|
|
.execute(json!({
|
|
"session_id": "telegram__alice",
|
|
"message": " "
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
assert!(!result.success);
|
|
assert!(result.error.unwrap().contains("empty"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn send_missing_session_id() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsSendTool::new(backend);
|
|
let result = tool.execute(json!({"message": "hi"})).await;
|
|
assert!(result.is_err());
|
|
assert!(result.unwrap_err().to_string().contains("session_id"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn send_missing_message() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsSendTool::new(backend);
|
|
let result = tool.execute(json!({"session_id": "telegram__alice"})).await;
|
|
assert!(result.is_err());
|
|
assert!(result.unwrap_err().to_string().contains("message"));
|
|
}
|
|
|
|
#[test]
|
|
fn send_tool_name_and_schema() {
|
|
let (_tmp, backend) = test_backend();
|
|
let tool = SessionsSendTool::new(backend);
|
|
assert_eq!(tool.name(), "sessions_send");
|
|
let schema = tool.parameters_schema();
|
|
assert!(schema["required"]
|
|
.as_array()
|
|
.unwrap()
|
|
.contains(&json!("session_id")));
|
|
assert!(schema["required"]
|
|
.as_array()
|
|
.unwrap()
|
|
.contains(&json!("message")));
|
|
}
|
|
}
|