From b0232c505740104cb93e263e128ffc627cd1f5e8 Mon Sep 17 00:00:00 2001 From: Argenis Date: Mon, 23 Mar 2026 15:14:11 -0400 Subject: [PATCH] feat(channels): add automatic media understanding pipeline (#4402) * feat(channels): add automatic media understanding pipeline for inbound messages Add MediaPipeline that pre-processes inbound channel message attachments before the agent sees them: - Audio: transcribed via existing transcription infrastructure, annotated as [Audio transcription: ...] - Images: annotated with [Image: attached] (vision-aware) - Video: annotated with [Video: attached] (placeholder for future API) The pipeline is opt-in via [media_pipeline] config section (default: disabled). Individual media types can be toggled independently. Changes: - New src/channels/media_pipeline.rs with MediaPipeline struct and tests - New MediaPipelineConfig in config/schema.rs - Added attachments field to ChannelMessage for media pass-through - Wired pipeline into process_channel_message after hooks, before agent Co-Authored-By: Claude Opus 4.6 (1M context) * fix(channels): add attachments field to integration test fixtures Add missing `attachments: vec![]` to all ChannelMessage struct literals in channel_matrix.rs and channel_routing.rs after the new attachments field was added to the struct in traits.rs. Also fix schema.rs test compilation: make TempDir import unconditional and add explicit type annotations on tokio::fs calls to resolve type inference errors in the bootstrap file tests. Co-Authored-By: Claude Opus 4.6 (1M context) * fix(channels): add missing attachments field to gmail_push and discord_history constructors These channels were added to master after the media pipeline PR was originally branched. The ChannelMessage struct now requires an attachments field, so initialise it to an empty Vec for channels that do not yet extract attachments. 
--------- Co-authored-by: Giulio V Co-authored-by: Claude Opus 4.6 (1M context) --- src/channels/bluesky.rs | 1 + src/channels/cli.rs | 3 + src/channels/dingtalk.rs | 1 + src/channels/discord.rs | 1 + src/channels/discord_history.rs | 1 + src/channels/email_channel.rs | 1 + src/channels/gmail_push.rs | 1 + src/channels/imessage.rs | 1 + src/channels/irc.rs | 1 + src/channels/lark.rs | 2 + src/channels/linq.rs | 1 + src/channels/matrix.rs | 1 + src/channels/mattermost.rs | 1 + src/channels/media_pipeline.rs | 409 +++++++++++++++++++++++++++ src/channels/mochat.rs | 1 + src/channels/mod.rs | 124 ++++++++ src/channels/nextcloud_talk.rs | 2 + src/channels/nostr.rs | 1 + src/channels/notion.rs | 1 + src/channels/qq.rs | 2 + src/channels/reddit.rs | 1 + src/channels/signal.rs | 1 + src/channels/slack.rs | 5 + src/channels/telegram.rs | 3 + src/channels/traits.rs | 6 + src/channels/twitter.rs | 1 + src/channels/wati.rs | 1 + src/channels/webhook.rs | 1 + src/channels/whatsapp.rs | 1 + src/channels/whatsapp_web.rs | 1 + src/config/mod.rs | 8 +- src/config/schema.rs | 45 +++ src/gateway/mod.rs | 1 + src/onboard/wizard.rs | 2 + tests/integration/channel_matrix.rs | 21 ++ tests/integration/channel_routing.rs | 5 + 36 files changed, 655 insertions(+), 4 deletions(-) create mode 100644 src/channels/media_pipeline.rs diff --git a/src/channels/bluesky.rs b/src/channels/bluesky.rs index 7dafafb83..681586114 100644 --- a/src/channels/bluesky.rs +++ b/src/channels/bluesky.rs @@ -252,6 +252,7 @@ impl BlueskyChannel { timestamp, thread_ts: Some(notif.uri.clone()), interruption_scope_id: None, + attachments: vec![], }) } diff --git a/src/channels/cli.rs b/src/channels/cli.rs index 69cb5367d..7ade5a900 100644 --- a/src/channels/cli.rs +++ b/src/channels/cli.rs @@ -49,6 +49,7 @@ impl Channel for CliChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(msg).await.is_err() { @@ -113,6 +114,7 @@ mod tests { timestamp: 1_234_567_890, 
thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(msg.id, "test-id"); assert_eq!(msg.sender, "user"); @@ -133,6 +135,7 @@ mod tests { timestamp: 0, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; let cloned = msg.clone(); assert_eq!(cloned.id, msg.id); diff --git a/src/channels/dingtalk.rs b/src/channels/dingtalk.rs index 34fb96ee3..916765aff 100644 --- a/src/channels/dingtalk.rs +++ b/src/channels/dingtalk.rs @@ -285,6 +285,7 @@ impl Channel for DingTalkChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/discord.rs b/src/channels/discord.rs index 639d8fcb1..1b7115388 100644 --- a/src/channels/discord.rs +++ b/src/channels/discord.rs @@ -914,6 +914,7 @@ impl Channel for DiscordChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/discord_history.rs b/src/channels/discord_history.rs index a40993671..ac08a7ba2 100644 --- a/src/channels/discord_history.rs +++ b/src/channels/discord_history.rs @@ -494,6 +494,7 @@ impl Channel for DiscordHistoryChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: Vec::new(), }; if tx.send(channel_msg).await.is_err() { break; diff --git a/src/channels/email_channel.rs b/src/channels/email_channel.rs index 81d996e20..c30ba6577 100644 --- a/src/channels/email_channel.rs +++ b/src/channels/email_channel.rs @@ -468,6 +468,7 @@ impl EmailChannel { timestamp: email.timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(msg).await.is_err() { diff --git a/src/channels/gmail_push.rs b/src/channels/gmail_push.rs index 7528db7bf..588ea2f2f 100644 --- a/src/channels/gmail_push.rs +++ b/src/channels/gmail_push.rs @@ -494,6 +494,7 @@ impl GmailPushChannel { timestamp, thread_ts: Some(gmail_msg.thread_id), 
interruption_scope_id: None, + attachments: Vec::new(), }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/imessage.rs b/src/channels/imessage.rs index 75935c906..0354061e1 100644 --- a/src/channels/imessage.rs +++ b/src/channels/imessage.rs @@ -295,6 +295,7 @@ end tell"# .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(msg).await.is_err() { diff --git a/src/channels/irc.rs b/src/channels/irc.rs index 562399c4d..0ac87e274 100644 --- a/src/channels/irc.rs +++ b/src/channels/irc.rs @@ -581,6 +581,7 @@ impl Channel for IrcChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/lark.rs b/src/channels/lark.rs index 63386fa54..70c0ae5b2 100644 --- a/src/channels/lark.rs +++ b/src/channels/lark.rs @@ -1019,6 +1019,7 @@ impl LarkChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; tracing::debug!("Lark WS: message in {}", lark_msg.chat_id); @@ -1753,6 +1754,7 @@ impl LarkChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); messages diff --git a/src/channels/linq.rs b/src/channels/linq.rs index 0a78cb7ea..1a1e96f67 100644 --- a/src/channels/linq.rs +++ b/src/channels/linq.rs @@ -268,6 +268,7 @@ impl LinqChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); messages diff --git a/src/channels/matrix.rs b/src/channels/matrix.rs index c4e396048..42a29ebfd 100644 --- a/src/channels/matrix.rs +++ b/src/channels/matrix.rs @@ -974,6 +974,7 @@ impl Channel for MatrixChannel { .as_secs(), thread_ts: thread_ts.clone(), interruption_scope_id: thread_ts, + attachments: vec![], }; let _ = tx.send(msg).await; diff --git a/src/channels/mattermost.rs b/src/channels/mattermost.rs index 38775be5c..5197c6a29 100644 --- a/src/channels/mattermost.rs +++ b/src/channels/mattermost.rs @@ -464,6 +464,7 @@ impl 
MattermostChannel {
             timestamp: (create_at / 1000) as u64,
             thread_ts: None,
             interruption_scope_id: None,
+            attachments: vec![],
         })
     }
 }
diff --git a/src/channels/media_pipeline.rs b/src/channels/media_pipeline.rs
new file mode 100644
index 000000000..9bb699bd2
--- /dev/null
+++ b/src/channels/media_pipeline.rs
@@ -0,0 +1,409 @@
+//! Automatic media understanding pipeline for inbound channel messages.
+//!
+//! Pre-processes media attachments (audio, images, video) before the agent sees
+//! the message, enriching the text with human-readable annotations:
+//!
+//! - **Audio**: transcribed via the existing [`super::transcription`] infrastructure,
+//!   prepended as `[Audio transcription: ...]`.
+//! - **Images**: annotated as `[Image: <file> attached]`; when the provider supports
+//!   vision the annotation adds `, will be processed by vision model`.
+//! - **Video**: annotated as `[Video: <file> attached]` (placeholder; no video
+//!   analysis API is integrated yet).
+//!
+//! The pipeline is **opt-in** via `[media_pipeline] enabled = true` in config.
+
+use crate::config::{MediaPipelineConfig, TranscriptionConfig};
+
+/// Classifies an attachment by MIME type or file extension.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum MediaKind {
+    Audio,
+    Image,
+    Video,
+    Unknown,
+}
+
+/// A single media attachment on an inbound message.
+#[derive(Debug, Clone)]
+pub struct MediaAttachment {
+    /// Original file name (e.g. `voice.ogg`, `photo.jpg`).
+    pub file_name: String,
+    /// Raw bytes of the attachment.
+    pub data: Vec<u8>,
+    /// MIME type if known (e.g. `audio/ogg`, `image/jpeg`).
+    pub mime_type: Option<String>,
+}
+
+impl MediaAttachment {
+    /// Classify this attachment into a [`MediaKind`].
+    pub fn kind(&self) -> MediaKind {
+        // Try MIME type first.
+        if let Some(ref mime) = self.mime_type {
+            let lower = mime.to_ascii_lowercase();
+            if lower.starts_with("audio/") {
+                return MediaKind::Audio;
+            }
+            if lower.starts_with("image/") {
+                return MediaKind::Image;
+            }
+            if lower.starts_with("video/") {
+                return MediaKind::Video;
+            }
+        }
+
+        // Fall back to file extension.
+        let ext = self
+            .file_name
+            .rsplit_once('.')
+            .map(|(_, e)| e.to_ascii_lowercase())
+            .unwrap_or_default();
+
+        match ext.as_str() {
+            "flac" | "mp3" | "mpeg" | "mpga" | "m4a" | "ogg" | "oga" | "opus" | "wav" | "webm" => {
+                MediaKind::Audio
+            }
+            "png" | "jpg" | "jpeg" | "gif" | "bmp" | "webp" | "heic" | "tiff" | "svg" => {
+                MediaKind::Image
+            }
+            "mp4" | "mkv" | "avi" | "mov" | "wmv" | "flv" => MediaKind::Video,
+            _ => MediaKind::Unknown,
+        }
+    }
+}
+
+/// The media understanding pipeline.
+///
+/// Consumes a message's text and attachments, returning enriched text with
+/// media annotations prepended.
+pub struct MediaPipeline<'a> {
+    config: &'a MediaPipelineConfig,
+    transcription_config: &'a TranscriptionConfig,
+    vision_available: bool,
+}
+
+impl<'a> MediaPipeline<'a> {
+    /// Create a new pipeline. `vision_available` indicates whether the current
+    /// provider supports vision (image description).
+    pub fn new(
+        config: &'a MediaPipelineConfig,
+        transcription_config: &'a TranscriptionConfig,
+        vision_available: bool,
+    ) -> Self {
+        Self {
+            config,
+            transcription_config,
+            vision_available,
+        }
+    }
+
+    /// Process a message's attachments and return enriched text.
+    ///
+    /// If the pipeline is disabled via config, returns `original_text` unchanged.
+    pub async fn process(&self, original_text: &str, attachments: &[MediaAttachment]) -> String {
+        if !self.config.enabled || attachments.is_empty() {
+            return original_text.to_string();
+        }
+
+        let mut annotations = Vec::new();
+
+        for attachment in attachments {
+            match attachment.kind() {
+                MediaKind::Audio if self.config.transcribe_audio => {
+                    let annotation = self.process_audio(attachment).await;
+                    annotations.push(annotation);
+                }
+                MediaKind::Image if self.config.describe_images => {
+                    let annotation = self.process_image(attachment);
+                    annotations.push(annotation);
+                }
+                MediaKind::Video if self.config.summarize_video => {
+                    let annotation = self.process_video(attachment);
+                    annotations.push(annotation);
+                }
+                _ => {}
+            }
+        }
+
+        if annotations.is_empty() {
+            return original_text.to_string();
+        }
+
+        let mut enriched = String::with_capacity(
+            annotations.iter().map(|a| a.len() + 1).sum::<usize>() + original_text.len() + 2,
+        );
+
+        for annotation in &annotations {
+            enriched.push_str(annotation);
+            enriched.push('\n');
+        }
+
+        if !original_text.is_empty() {
+            enriched.push('\n');
+            enriched.push_str(original_text);
+        }
+
+        enriched.trim().to_string()
+    }
+
+    /// Transcribe an audio attachment using the existing transcription infra.
+    async fn process_audio(&self, attachment: &MediaAttachment) -> String {
+        if !self.transcription_config.enabled {
+            return "[Audio: attached]".to_string();
+        }
+
+        match super::transcription::transcribe_audio(
+            attachment.data.clone(),
+            &attachment.file_name,
+            self.transcription_config,
+        )
+        .await
+        {
+            Ok(text) => {
+                let trimmed = text.trim();
+                if trimmed.is_empty() {
+                    "[Audio transcription: (empty)]".to_string()
+                } else {
+                    format!("[Audio transcription: {trimmed}]")
+                }
+            }
+            Err(err) => {
+                tracing::warn!(
+                    file = %attachment.file_name,
+                    error = %err,
+                    "Media pipeline: audio transcription failed"
+                );
+                "[Audio: transcription failed]".to_string()
+            }
+        }
+    }
+
+    /// Describe an image attachment.
+    ///
+    /// When vision is available, the image will be passed through to the
+    /// provider as an `[IMAGE:]` marker and described by the model in the
+    /// normal flow. Here we only add a placeholder annotation so the agent
+    /// knows an image is present.
+    fn process_image(&self, attachment: &MediaAttachment) -> String {
+        if self.vision_available {
+            format!(
+                "[Image: {} attached, will be processed by vision model]",
+                attachment.file_name
+            )
+        } else {
+            format!("[Image: {} attached]", attachment.file_name)
+        }
+    }
+
+    /// Summarize a video attachment.
+    ///
+    /// Video analysis requires external APIs not currently integrated.
+    /// For now we add a placeholder annotation.
+    fn process_video(&self, attachment: &MediaAttachment) -> String {
+        format!("[Video: {} attached]", attachment.file_name)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn default_pipeline_config(enabled: bool) -> MediaPipelineConfig {
+        MediaPipelineConfig {
+            enabled,
+            transcribe_audio: true,
+            describe_images: true,
+            summarize_video: true,
+        }
+    }
+
+    fn sample_audio() -> MediaAttachment {
+        MediaAttachment {
+            file_name: "voice.ogg".to_string(),
+            data: vec![0u8; 100],
+            mime_type: Some("audio/ogg".to_string()),
+        }
+    }
+
+    fn sample_image() -> MediaAttachment {
+        MediaAttachment {
+            file_name: "photo.jpg".to_string(),
+            data: vec![0u8; 50],
+            mime_type: Some("image/jpeg".to_string()),
+        }
+    }
+
+    fn sample_video() -> MediaAttachment {
+        MediaAttachment {
+            file_name: "clip.mp4".to_string(),
+            data: vec![0u8; 200],
+            mime_type: Some("video/mp4".to_string()),
+        }
+    }
+
+    #[test]
+    fn media_kind_from_mime() {
+        let audio = MediaAttachment {
+            file_name: "file".to_string(),
+            data: vec![],
+            mime_type: Some("audio/ogg".to_string()),
+        };
+        assert_eq!(audio.kind(), MediaKind::Audio);
+
+        let image = MediaAttachment {
+            file_name: "file".to_string(),
+            data: vec![],
+            mime_type: Some("image/png".to_string()),
+        };
+        assert_eq!(image.kind(), MediaKind::Image);
+
+        let video = MediaAttachment {
+            file_name: "file".to_string(),
+            data: vec![],
+            mime_type: Some("video/mp4".to_string()),
+        };
+        assert_eq!(video.kind(), MediaKind::Video);
+    }
+
+    #[test]
+    fn media_kind_from_extension() {
+        let audio = MediaAttachment {
+            file_name: "voice.ogg".to_string(),
+            data: vec![],
+            mime_type: None,
+        };
+        assert_eq!(audio.kind(), MediaKind::Audio);
+
+        let image = MediaAttachment {
+            file_name: "photo.png".to_string(),
+            data: vec![],
+            mime_type: None,
+        };
+        assert_eq!(image.kind(), MediaKind::Image);
+
+        let video = MediaAttachment {
+            file_name: "clip.mp4".to_string(),
+            data: vec![],
+            mime_type: None,
+        };
+        assert_eq!(video.kind(), MediaKind::Video);
+
+        let unknown = MediaAttachment {
+            file_name: "data.bin".to_string(),
+            data: vec![],
+            mime_type: None,
+        };
+        assert_eq!(unknown.kind(), MediaKind::Unknown);
+    }
+
+    #[tokio::test]
+    async fn disabled_pipeline_returns_original_text() {
+        let config = default_pipeline_config(false);
+        let tc = TranscriptionConfig::default();
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let result = pipeline.process("hello", &[sample_audio()]).await;
+        assert_eq!(result, "hello");
+    }
+
+    #[tokio::test]
+    async fn empty_attachments_returns_original_text() {
+        let config = default_pipeline_config(true);
+        let tc = TranscriptionConfig::default();
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let result = pipeline.process("hello", &[]).await;
+        assert_eq!(result, "hello");
+    }
+
+    #[tokio::test]
+    async fn image_annotation_with_vision() {
+        let config = default_pipeline_config(true);
+        let tc = TranscriptionConfig::default();
+        let pipeline = MediaPipeline::new(&config, &tc, true);
+
+        let result = pipeline.process("check this", &[sample_image()]).await;
+        assert!(
+            result.contains("[Image: photo.jpg attached, will be processed by vision model]"),
+            "expected vision annotation, got: {result}"
+        );
+        assert!(result.contains("check this"));
+    }
+
+    #[tokio::test]
+    async fn image_annotation_without_vision() {
+        let config = default_pipeline_config(true);
+        let tc = TranscriptionConfig::default();
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let result = pipeline.process("check this", &[sample_image()]).await;
+        assert!(
+            result.contains("[Image: photo.jpg attached]"),
+            "expected basic image annotation, got: {result}"
+        );
+    }
+
+    #[tokio::test]
+    async fn video_annotation() {
+        let config = default_pipeline_config(true);
+        let tc = TranscriptionConfig::default();
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let result = pipeline.process("watch", &[sample_video()]).await;
+        assert!(
+            result.contains("[Video: clip.mp4 attached]"),
+            "expected video annotation, got: {result}"
+        );
+    }
+
+    #[tokio::test]
+    async fn audio_without_transcription_enabled() {
+        let config = default_pipeline_config(true);
+        let mut tc = TranscriptionConfig::default();
+        tc.enabled = false;
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let result = pipeline.process("", &[sample_audio()]).await;
+        assert_eq!(result, "[Audio: attached]");
+    }
+
+    #[tokio::test]
+    async fn multiple_attachments_produce_multiple_annotations() {
+        let config = default_pipeline_config(true);
+        let mut tc = TranscriptionConfig::default();
+        tc.enabled = false;
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let attachments = vec![sample_audio(), sample_image(), sample_video()];
+        let result = pipeline.process("context", &attachments).await;
+
+        assert!(
+            result.contains("[Audio: attached]"),
+            "missing audio annotation"
+        );
+        assert!(
+            result.contains("[Image: photo.jpg attached]"),
+            "missing image annotation"
+        );
+        assert!(
+            result.contains("[Video: clip.mp4 attached]"),
+            "missing video annotation"
+        );
+        assert!(result.contains("context"), "missing original text");
+    }
+
+    #[tokio::test]
+    async fn disabled_sub_features_skip_processing() {
+        let config = MediaPipelineConfig {
+            enabled: true,
+            transcribe_audio: false,
+            describe_images: false,
+            summarize_video: false,
+        };
+        let tc = TranscriptionConfig::default();
+        let pipeline = MediaPipeline::new(&config, &tc, false);
+
+        let attachments = vec![sample_audio(), sample_image(), sample_video()];
+        let result = pipeline.process("hello", &attachments).await;
+        assert_eq!(result, "hello");
+    }
+}
diff --git a/src/channels/mochat.rs b/src/channels/mochat.rs
index 6084d45db..bd0dd5627 100644
--- a/src/channels/mochat.rs
+++ b/src/channels/mochat.rs
@@ -199,6 +199,7 @@ impl Channel for MochatChannel {
                         .as_secs(),
                     thread_ts: None,
                     interruption_scope_id: None,
+                    attachments: vec![],
                 };
 
                 if tx.send(channel_msg).await.is_err() {
diff --git a/src/channels/mod.rs b/src/channels/mod.rs
index 9eb49b996..875e08d5f 100644
--- a/src/channels/mod.rs
+++ b/src/channels/mod.rs
@@ -31,6 +31,7 @@ pub mod linq;
 #[cfg(feature = "channel-matrix")]
 pub mod matrix;
 pub mod mattermost;
+pub mod media_pipeline;
 pub mod mochat;
 pub mod nextcloud_talk;
 #[cfg(feature = "channel-nostr")]
@@ -367,6 +368,8 @@ struct ChannelRuntimeContext {
     message_timeout_secs: u64,
     interrupt_on_new_message: InterruptOnNewMessageConfig,
     multimodal: crate::config::MultimodalConfig,
+    media_pipeline: crate::config::MediaPipelineConfig,
+    transcription_config: crate::config::TranscriptionConfig,
     hooks: Option>,
     non_cli_excluded_tools: Arc>,
     autonomy_level: AutonomyLevel,
@@ -2065,6 +2068,17 @@ async fn process_channel_message(
         msg
     };
 
+    // ── Media pipeline: enrich inbound message with media annotations ──
+    if ctx.media_pipeline.enabled && !msg.attachments.is_empty() {
+        let vision = ctx.provider.supports_vision();
+        let pipeline = media_pipeline::MediaPipeline::new(
+            &ctx.media_pipeline,
+            &ctx.transcription_config,
+            vision,
+        );
+        msg.content = Box::pin(pipeline.process(&msg.content, &msg.attachments)).await;
+    }
+
     // ── Link enricher: prepend URL summaries before agent sees the message ──
     let le_config = &ctx.prompt_config.link_enricher;
     if le_config.enabled {
@@
-4744,6 +4758,8 @@ pub async fn start_channels(config: Config) -> Result<()> { matrix: interrupt_on_new_message_matrix, }, multimodal: config.multimodal.clone(), + media_pipeline: config.media_pipeline.clone(), + transcription_config: config.transcription.clone(), hooks: if config.hooks.enabled { let mut runner = crate::hooks::HookRunner::new(); if config.hooks.builtin.command_logger { @@ -5115,6 +5131,8 @@ mod tests { matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, provider_runtime_options: providers::ProviderRuntimeOptions::default(), workspace_dir: Arc::new(std::env::temp_dir()), @@ -5232,6 +5250,8 @@ mod tests { matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, provider_runtime_options: providers::ProviderRuntimeOptions::default(), workspace_dir: Arc::new(std::env::temp_dir()), @@ -5305,6 +5325,8 @@ mod tests { matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, provider_runtime_options: providers::ProviderRuntimeOptions::default(), workspace_dir: Arc::new(std::env::temp_dir()), @@ -5397,6 +5419,8 @@ mod tests { matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, provider_runtime_options: providers::ProviderRuntimeOptions::default(), workspace_dir: Arc::new(std::env::temp_dir()), @@ -5943,6 +5967,8 @@ BTC is currently around $65,000 based on latest tool output."# 
non_cli_excluded_tools: Arc::new(Vec::new()), tool_call_dedup_exempt: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, model_routes: Arc::new(Vec::new()), query_classification: crate::config::QueryClassificationConfig::default(), @@ -5968,6 +5994,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6025,6 +6052,8 @@ BTC is currently around $65,000 based on latest tool output."# autonomy_level: AutonomyLevel::default(), tool_call_dedup_exempt: Arc::new(Vec::new()), multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, model_routes: Arc::new(Vec::new()), query_classification: crate::config::QueryClassificationConfig::default(), @@ -6050,6 +6079,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6118,6 +6148,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6146,6 +6178,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 3, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6199,6 +6232,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, 
multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6227,6 +6262,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6290,6 +6326,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6318,6 +6356,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6402,6 +6441,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6430,6 +6471,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6495,6 +6537,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, 
non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6523,6 +6567,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 3, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6603,6 +6648,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6631,6 +6678,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 4, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6696,6 +6744,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6727,6 +6777,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6782,6 +6833,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -6813,6 +6866,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, 
interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -6986,6 +7040,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -7013,6 +7069,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .unwrap(); @@ -7025,6 +7082,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .unwrap(); @@ -7089,6 +7147,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -7117,6 +7177,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .unwrap(); @@ -7130,6 +7191,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .unwrap(); @@ -7210,6 +7272,8 @@ BTC is currently around $65,000 based on latest tool output."# show_tool_calls: true, session_store: None, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: 
Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -7235,6 +7299,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: Some("1741234567.100001".to_string()), interruption_scope_id: Some("1741234567.100001".to_string()), + attachments: vec![], }) .await .unwrap(); @@ -7248,6 +7313,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: Some("1741234567.100001".to_string()), interruption_scope_id: Some("1741234567.100001".to_string()), + attachments: vec![], }) .await .unwrap(); @@ -7322,6 +7388,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -7350,6 +7418,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .unwrap(); @@ -7363,6 +7432,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .unwrap(); @@ -7419,6 +7489,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -7447,6 +7519,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -7500,6 +7573,8 @@ BTC is currently around $65,000 
based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -7528,6 +7603,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8057,6 +8133,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123"); @@ -8073,6 +8150,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: Some("1741234567.123456".into()), interruption_scope_id: None, + attachments: vec![], }; assert_eq!( @@ -8092,6 +8170,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(followup_thread_id(&msg).as_deref(), Some("msg_abc123")); @@ -8108,6 +8187,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; let msg2 = traits::ChannelMessage { id: "msg_2".into(), @@ -8118,6 +8198,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_ne!( @@ -8140,6 +8221,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; let msg2 = traits::ChannelMessage { id: "msg_2".into(), @@ -8150,6 +8232,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, 
interruption_scope_id: None, + attachments: vec![], }; mem.store( @@ -8271,6 +8354,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -8299,6 +8384,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8315,6 +8401,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8403,6 +8490,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -8431,6 +8520,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8463,6 +8553,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8501,6 +8592,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 3, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8575,6 +8667,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: 
crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -8603,6 +8697,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -8684,6 +8779,8 @@ BTC is currently around $65,000 based on latest tool output."# matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -8712,6 +8809,7 @@ BTC is currently around $65,000 based on latest tool output."# timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9257,6 +9355,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -9286,6 +9386,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9345,6 +9446,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: 
Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -9373,6 +9476,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9389,6 +9493,7 @@ This is an example JSON object for profile settings."#; timestamp: 2, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9508,6 +9613,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -9536,6 +9643,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9620,6 +9728,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -9648,6 +9758,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9724,6 +9835,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), 
@@ -9752,6 +9865,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -9848,6 +9962,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -9876,6 +9992,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, CancellationToken::new(), ) @@ -10033,6 +10150,7 @@ This is an example JSON object for profile settings."#; timestamp: 0, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(interruption_scope_key(&msg), "matrix_room_alice"); } @@ -10048,6 +10166,7 @@ This is an example JSON object for profile settings."#; timestamp: 0, thread_ts: Some("$thread1".into()), interruption_scope_id: Some("$thread1".into()), + attachments: vec![], }; assert_eq!(interruption_scope_key(&msg), "matrix_room_alice_$thread1"); } @@ -10064,6 +10183,7 @@ This is an example JSON object for profile settings."#; timestamp: 0, thread_ts: Some("1234567890.000100".into()), // Slack top-level fallback interruption_scope_id: None, // but NOT a thread reply + attachments: vec![], }; assert_eq!(interruption_scope_key(&msg), "slack_C123_alice"); } @@ -10110,6 +10230,8 @@ This is an example JSON object for profile settings."#; matrix: false, }, multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), + transcription_config: crate::config::TranscriptionConfig::default(), hooks: None, non_cli_excluded_tools: Arc::new(Vec::new()), autonomy_level: AutonomyLevel::default(), @@ -10140,6 
+10262,7 @@ This is an example JSON object for profile settings."#; timestamp: 1, thread_ts: Some("1741234567.100001".to_string()), interruption_scope_id: Some("1741234567.100001".to_string()), + attachments: vec![], }) .await .unwrap(); @@ -10153,6 +10276,7 @@ This is an example JSON object for profile settings."#; timestamp: 2, thread_ts: Some("1741234567.200002".to_string()), interruption_scope_id: Some("1741234567.200002".to_string()), + attachments: vec![], }) .await .unwrap(); diff --git a/src/channels/nextcloud_talk.rs b/src/channels/nextcloud_talk.rs index 8b08d5233..071558378 100644 --- a/src/channels/nextcloud_talk.rs +++ b/src/channels/nextcloud_talk.rs @@ -206,6 +206,7 @@ impl NextcloudTalkChannel { timestamp: Self::now_unix_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); messages @@ -308,6 +309,7 @@ impl NextcloudTalkChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); messages diff --git a/src/channels/nostr.rs b/src/channels/nostr.rs index 97fb40db3..a04a01619 100644 --- a/src/channels/nostr.rs +++ b/src/channels/nostr.rs @@ -254,6 +254,7 @@ impl Channel for NostrChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(msg).await.is_err() { tracing::info!("Nostr listener: message bus closed, stopping"); diff --git a/src/channels/notion.rs b/src/channels/notion.rs index ce00989f3..7586d3d4c 100644 --- a/src/channels/notion.rs +++ b/src/channels/notion.rs @@ -361,6 +361,7 @@ impl Channel for NotionChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .is_err() diff --git a/src/channels/qq.rs b/src/channels/qq.rs index c0831c072..e06c9462f 100644 --- a/src/channels/qq.rs +++ b/src/channels/qq.rs @@ -1139,6 +1139,7 @@ impl Channel for QQChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { @@ -1178,6 +1179,7 @@ 
impl Channel for QQChannel { .as_secs(), thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/reddit.rs b/src/channels/reddit.rs index a26450d66..2f0a42f2d 100644 --- a/src/channels/reddit.rs +++ b/src/channels/reddit.rs @@ -226,6 +226,7 @@ impl RedditChannel { timestamp, thread_ts: item.parent_id.clone(), interruption_scope_id: None, + attachments: vec![], }) } } diff --git a/src/channels/signal.rs b/src/channels/signal.rs index 6abdb3743..9901688a6 100644 --- a/src/channels/signal.rs +++ b/src/channels/signal.rs @@ -280,6 +280,7 @@ impl SignalChannel { timestamp: timestamp / 1000, // millis → secs thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) } } diff --git a/src/channels/slack.rs b/src/channels/slack.rs index 061f439dd..2c5c0e85f 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -2027,6 +2027,7 @@ impl SlackChannel { Self::inbound_thread_ts_genuine_only(event) }, interruption_scope_id: Self::inbound_interruption_scope_id(event, ts), + attachments: vec![], }; // Track thread context so start_typing can set assistant status. 
@@ -2701,6 +2702,7 @@ impl Channel for SlackChannel { Self::inbound_thread_ts_genuine_only(msg) }, interruption_scope_id: Self::inbound_interruption_scope_id(msg, ts), + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { @@ -2786,6 +2788,7 @@ impl Channel for SlackChannel { .as_secs(), thread_ts: Some(thread_ts.clone()), interruption_scope_id: Some(thread_ts.clone()), + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { @@ -3738,6 +3741,7 @@ mod tests { timestamp: 0, thread_ts: None, // thread_replies=false → no fallback to ts interruption_scope_id: None, + attachments: vec![], }; let msg1 = make_msg("100.000"); @@ -3763,6 +3767,7 @@ mod tests { timestamp: 0, thread_ts: Some(ts.to_string()), // thread_replies=true → ts as thread_ts interruption_scope_id: None, + attachments: vec![], }; let msg1 = make_msg("100.000"); diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index 9175e1b07..a6faa87ad 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -1157,6 +1157,7 @@ Allowlist Telegram username (without '@') or numeric user ID.", .as_secs(), thread_ts: thread_id, interruption_scope_id: None, + attachments: vec![], }) } @@ -1287,6 +1288,7 @@ Allowlist Telegram username (without '@') or numeric user ID.", .as_secs(), thread_ts: thread_id, interruption_scope_id: None, + attachments: vec![], }) } @@ -1491,6 +1493,7 @@ Allowlist Telegram username (without '@') or numeric user ID.", .as_secs(), thread_ts: thread_id, interruption_scope_id: None, + attachments: vec![], }) } diff --git a/src/channels/traits.rs b/src/channels/traits.rs index 861c3c12c..5dcbdf524 100644 --- a/src/channels/traits.rs +++ b/src/channels/traits.rs @@ -17,6 +17,10 @@ pub struct ChannelMessage { /// is genuinely inside a reply thread and should be isolated from other threads. /// `None` means top-level — scope is sender+channel only. pub interruption_scope_id: Option, + /// Media attachments (audio, images, video) for the media pipeline. 
+ /// Channels populate this when they receive media alongside a text message. + /// Defaults to empty — existing channels are unaffected. + pub attachments: Vec, } /// Message to send through a channel @@ -188,6 +192,7 @@ mod tests { timestamp: 123, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .map_err(|e| anyhow::anyhow!(e.to_string())) @@ -205,6 +210,7 @@ mod tests { timestamp: 999, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; let cloned = message.clone(); diff --git a/src/channels/twitter.rs b/src/channels/twitter.rs index fa955e8dc..78efe030e 100644 --- a/src/channels/twitter.rs +++ b/src/channels/twitter.rs @@ -289,6 +289,7 @@ impl Channel for TwitterChannel { .and_then(|c| c.as_str()) .map(|s| s.to_string()), interruption_scope_id: None, + attachments: vec![], }; if tx.send(channel_msg).await.is_err() { diff --git a/src/channels/wati.rs b/src/channels/wati.rs index 2eecba154..4e3cecbd9 100644 --- a/src/channels/wati.rs +++ b/src/channels/wati.rs @@ -211,6 +211,7 @@ impl WatiChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); messages diff --git a/src/channels/webhook.rs b/src/channels/webhook.rs index 4355eb1ae..ef005e62d 100644 --- a/src/channels/webhook.rs +++ b/src/channels/webhook.rs @@ -238,6 +238,7 @@ impl Channel for WebhookChannel { timestamp, thread_ts: payload.thread_id, interruption_scope_id: None, + attachments: vec![], }; if state.tx.send(msg).await.is_err() { diff --git a/src/channels/whatsapp.rs b/src/channels/whatsapp.rs index 22261cf0f..0ef8761de 100644 --- a/src/channels/whatsapp.rs +++ b/src/channels/whatsapp.rs @@ -152,6 +152,7 @@ impl WhatsAppChannel { timestamp, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }); } } diff --git a/src/channels/whatsapp_web.rs b/src/channels/whatsapp_web.rs index 45b047923..d0581a074 100644 --- a/src/channels/whatsapp_web.rs +++ b/src/channels/whatsapp_web.rs @@ -822,6 +822,7 @@ impl 
Channel for WhatsAppWebChannel { timestamp: chrono::Utc::now().timestamp() as u64, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await { diff --git a/src/config/mod.rs b/src/config/mod.rs index 986249940..298849f43 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -19,10 +19,10 @@ pub use schema::{ ImageProviderFluxConfig, ImageProviderImagenConfig, ImageProviderStabilityConfig, JiraConfig, KnowledgeConfig, LarkConfig, LinkEnricherConfig, LinkedInConfig, LinkedInContentConfig, LinkedInImageConfig, LocalWhisperConfig, MatrixConfig, McpConfig, McpServerConfig, - McpTransport, MemoryConfig, MemoryPolicyConfig, Microsoft365Config, ModelRouteConfig, - MultimodalConfig, NextcloudTalkConfig, NodeTransportConfig, NodesConfig, NotionConfig, - ObservabilityConfig, OpenAiSttConfig, OpenAiTtsConfig, OpenVpnTunnelConfig, OtpConfig, - OtpMethod, PacingConfig, PeripheralBoardConfig, PeripheralsConfig, PiperTtsConfig, + McpTransport, MediaPipelineConfig, MemoryConfig, MemoryPolicyConfig, Microsoft365Config, + ModelRouteConfig, MultimodalConfig, NextcloudTalkConfig, NodeTransportConfig, NodesConfig, + NotionConfig, ObservabilityConfig, OpenAiSttConfig, OpenAiTtsConfig, OpenVpnTunnelConfig, + OtpConfig, OtpMethod, PacingConfig, PeripheralBoardConfig, PeripheralsConfig, PiperTtsConfig, PluginsConfig, ProjectIntelConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, diff --git a/src/config/schema.rs b/src/config/schema.rs index 22e691e83..8a4dbc6c9 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -261,6 +261,10 @@ pub struct Config { #[serde(default)] pub multimodal: MultimodalConfig, + /// Automatic media understanding pipeline (`[media_pipeline]`). + #[serde(default)] + pub media_pipeline: MediaPipelineConfig, + /// Web fetch tool configuration (`[web_fetch]`). 
#[serde(default)] pub web_fetch: WebFetchConfig, @@ -1519,6 +1523,44 @@ impl Default for MultimodalConfig { } } +// ── Media Pipeline ────────────────────────────────────────────── + +/// Automatic media understanding pipeline configuration (`[media_pipeline]`). +/// +/// When enabled, inbound channel messages with media attachments are +/// pre-processed before reaching the agent: audio is transcribed, images are +/// annotated, and videos are summarised. +#[allow(clippy::struct_excessive_bools)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct MediaPipelineConfig { + /// Master toggle for the media pipeline (default: false). + #[serde(default)] + pub enabled: bool, + + /// Transcribe audio attachments using the configured transcription provider. + #[serde(default = "default_true")] + pub transcribe_audio: bool, + + /// Add image descriptions when a vision-capable model is active. + #[serde(default = "default_true")] + pub describe_images: bool, + + /// Summarize video attachments (placeholder — requires external API). + #[serde(default = "default_true")] + pub summarize_video: bool, +} + +impl Default for MediaPipelineConfig { + fn default() -> Self { + Self { + enabled: false, + transcribe_audio: true, + describe_images: true, + summarize_video: true, + } + } +} + // ── Identity (AIEOS / OpenClaw format) ────────────────────────── /// Identity format configuration (`[identity]` section). 
@@ -7266,6 +7308,7 @@ impl Default for Config { browser_delegate: crate::tools::browser_delegate::BrowserDelegateConfig::default(), http_request: HttpRequestConfig::default(), multimodal: MultimodalConfig::default(), + media_pipeline: MediaPipelineConfig::default(), web_fetch: WebFetchConfig::default(), link_enricher: LinkEnricherConfig::default(), text_browser: TextBrowserConfig::default(), @@ -10254,6 +10297,7 @@ default_temperature = 0.7 browser_delegate: crate::tools::browser_delegate::BrowserDelegateConfig::default(), http_request: HttpRequestConfig::default(), multimodal: MultimodalConfig::default(), + media_pipeline: MediaPipelineConfig::default(), web_fetch: WebFetchConfig::default(), link_enricher: LinkEnricherConfig::default(), text_browser: TextBrowserConfig::default(), @@ -10771,6 +10815,7 @@ default_temperature = 0.7 browser_delegate: crate::tools::browser_delegate::BrowserDelegateConfig::default(), http_request: HttpRequestConfig::default(), multimodal: MultimodalConfig::default(), + media_pipeline: MediaPipelineConfig::default(), web_fetch: WebFetchConfig::default(), link_enricher: LinkEnricherConfig::default(), text_browser: TextBrowserConfig::default(), diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 42b47c024..7407807dd 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -2394,6 +2394,7 @@ mod tests { timestamp: 1, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; let key = whatsapp_memory_key(&msg); diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index bfdb7d8b4..340ddd86c 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -172,6 +172,7 @@ pub async fn run_wizard(force: bool) -> Result { browser_delegate: crate::tools::browser_delegate::BrowserDelegateConfig::default(), http_request: crate::config::HttpRequestConfig::default(), multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), web_fetch: 
crate::config::WebFetchConfig::default(), link_enricher: crate::config::LinkEnricherConfig::default(), text_browser: crate::config::TextBrowserConfig::default(), @@ -605,6 +606,7 @@ async fn run_quick_setup_with_home( browser_delegate: crate::tools::browser_delegate::BrowserDelegateConfig::default(), http_request: crate::config::HttpRequestConfig::default(), multimodal: crate::config::MultimodalConfig::default(), + media_pipeline: crate::config::MediaPipelineConfig::default(), web_fetch: crate::config::WebFetchConfig::default(), link_enricher: crate::config::LinkEnricherConfig::default(), text_browser: crate::config::TextBrowserConfig::default(), diff --git a/tests/integration/channel_matrix.rs b/tests/integration/channel_matrix.rs index 419c66807..8ead8450b 100644 --- a/tests/integration/channel_matrix.rs +++ b/tests/integration/channel_matrix.rs @@ -125,6 +125,7 @@ impl Channel for MatrixTestChannel { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .map_err(|e| anyhow::anyhow!(e.to_string())) @@ -566,6 +567,7 @@ fn channel_message_thread_ts_preserved_on_clone() { timestamp: 1700000000, thread_ts: Some("1700000000.000001".into()), interruption_scope_id: None, + attachments: vec![], }; let cloned = msg.clone(); @@ -583,6 +585,7 @@ fn channel_message_none_thread_ts_preserved() { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert!(msg.clone().thread_ts.is_none()); @@ -637,6 +640,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "discord" => ChannelMessage { id: "dc_1".into(), @@ -647,6 +651,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "slack" => ChannelMessage { id: "sl_1".into(), @@ -657,6 +662,7 @@ fn make_platform_message(platform: &str) 
-> ChannelMessage { timestamp: 1700000000, thread_ts: Some("1700000000.000001".into()), interruption_scope_id: None, + attachments: vec![], }, "imessage" => ChannelMessage { id: "im_1".into(), @@ -667,6 +673,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "irc" => ChannelMessage { id: "irc_1".into(), @@ -677,6 +684,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "email" => ChannelMessage { id: "email_1".into(), @@ -687,6 +695,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "signal" => ChannelMessage { id: "sig_1".into(), @@ -697,6 +706,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "mattermost" => ChannelMessage { id: "mm_1".into(), @@ -707,6 +717,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: Some("root_msg_id".into()), interruption_scope_id: None, + attachments: vec![], }, "whatsapp" => ChannelMessage { id: "wa_1".into(), @@ -717,6 +728,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "nextcloud_talk" => ChannelMessage { id: "nc_1".into(), @@ -727,6 +739,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "wecom" => ChannelMessage { id: "wc_1".into(), @@ -737,6 +750,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "dingtalk" => 
ChannelMessage { id: "dt_1".into(), @@ -747,6 +761,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "qq" => ChannelMessage { id: "qq_1".into(), @@ -757,6 +772,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "linq" => ChannelMessage { id: "lq_1".into(), @@ -767,6 +783,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "wati" => ChannelMessage { id: "wt_1".into(), @@ -777,6 +794,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, "cli" => ChannelMessage { id: "cli_1".into(), @@ -787,6 +805,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }, _ => panic!("Unknown platform: {platform}"), } @@ -1088,6 +1107,7 @@ fn channel_message_zero_timestamp() { timestamp: 0, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(msg.timestamp, 0); } @@ -1103,6 +1123,7 @@ fn channel_message_max_timestamp() { timestamp: u64::MAX, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(msg.timestamp, u64::MAX); } diff --git a/tests/integration/channel_routing.rs b/tests/integration/channel_routing.rs index 8d48b5d91..93fdeacf8 100644 --- a/tests/integration/channel_routing.rs +++ b/tests/integration/channel_routing.rs @@ -26,6 +26,7 @@ fn channel_message_sender_field_holds_platform_user_id() { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!(msg.sender, "123456789"); @@ -49,6 +50,7 @@ fn 
channel_message_reply_target_distinct_from_sender() { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_ne!( @@ -70,6 +72,7 @@ fn channel_message_fields_not_swapped() { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; assert_eq!( @@ -97,6 +100,7 @@ fn channel_message_preserves_all_fields_on_clone() { timestamp: 1700000001, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }; let cloned = original.clone(); @@ -191,6 +195,7 @@ impl Channel for CapturingChannel { timestamp: 1700000000, thread_ts: None, interruption_scope_id: None, + attachments: vec![], }) .await .map_err(|e| anyhow::anyhow!(e.to_string()))