From 21ccb9e13aefff5c55825955e79931b5e8570282 Mon Sep 17 00:00:00 2001 From: Aleksandr Prilipko Date: Thu, 19 Feb 2026 21:26:54 +0700 Subject: [PATCH] feat(channel): add voice message transcription via Whisper API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add voice-to-text transcription for Telegram voice/audio messages using any Whisper-compatible API (Groq by default, configurable endpoint). - New TranscriptionConfig in config schema (enabled, api_url, model, language, max_duration_secs) with serde defaults - New transcription module: MIME detection, .oga→.ogg normalization, size/format validation, Whisper API client - Telegram: voice download pipeline (getFile → CDN download → transcribe), listen loop fallback for voice messages, [Voice] prefix on transcribed text - Proxy support via "transcription.groq" service key - 18 new tests (MIME mapping, normalization, config roundtrip, voice metadata parsing, builder wiring, format/size rejection) Disabled by default (enabled: false). Fail-fast validation order: size → format → API key. Co-Authored-By: Claude Opus 4.6 --- src/channels/mod.rs | 4 +- src/channels/telegram.rs | 257 +++++++++++++++++++++++++++++++++- src/channels/transcription.rs | 218 ++++++++++++++++++++++++++++ src/config/mod.rs | 4 +- src/config/schema.rs | 100 ++++++++++++- src/onboard/wizard.rs | 2 + 6 files changed, 576 insertions(+), 9 deletions(-) create mode 100644 src/channels/transcription.rs diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 0980f1a80..ec3cc6a0e 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -32,6 +32,7 @@ pub mod signal; pub mod slack; pub mod telegram; pub mod traits; +pub mod transcription; pub mod whatsapp; #[cfg(feature = "whatsapp-web")] pub mod whatsapp_storage; @@ -2308,7 +2309,8 @@ fn collect_configured_channels( tg.allowed_users.clone(), tg.mention_only, ) - .with_streaming(tg.stream_mode, tg.draft_update_interval_ms), + .with_streaming(tg.stream_mode, tg.draft_update_interval_ms) + .with_transcription(config.transcription.clone()), ), }); } diff --git a/src/channels/telegram.rs b/src/channels/telegram.rs index 2a8d583ae..e72d3426e 100644 --- a/src/channels/telegram.rs +++ b/src/channels/telegram.rs @@ -329,6 +329,7 @@ pub struct TelegramChannel { /// Base URL for the Telegram Bot API. Defaults to `https://api.telegram.org`. /// Override for local Bot API servers or testing. api_base: String, + transcription: Option, } impl TelegramChannel { @@ -357,6 +358,8 @@ impl TelegramChannel { mention_only, bot_username: Mutex::new(None), api_base: "https://api.telegram.org".to_string(), + transcription: None, + api_base: "https://api.telegram.org".to_string(), } } @@ -378,6 +381,14 @@ impl TelegramChannel { self } + /// Configure voice transcription. + pub fn with_transcription(mut self, config: crate::config::TranscriptionConfig) -> Self { + if config.enabled { + self.transcription = Some(config); + } + self + } + /// Parse reply_target into (chat_id, optional thread_id). fn parse_reply_target(reply_target: &str) -> (String, Option) { if let Some((chat_id, thread_id)) = reply_target.split_once(':') { @@ -767,6 +778,176 @@ Allowlist Telegram username (without '@') or numeric user ID.", } } + /// Get the file path for a Telegram file ID via the Bot API. + async fn get_file_path(&self, file_id: &str) -> anyhow::Result { + let url = self.api_url("getFile"); + let resp = self + .http_client() + .get(&url) + .query(&[("file_id", file_id)]) + .send() + .await + .context("Failed to call Telegram getFile")?; + + let data: serde_json::Value = resp.json().await?; + data.get("result") + .and_then(|r| r.get("file_path")) + .and_then(serde_json::Value::as_str) + .map(String::from) + .context("Telegram getFile: missing file_path in response") + } + + /// Download a file from the Telegram CDN. + async fn download_file(&self, file_path: &str) -> anyhow::Result> { + let url = format!( + "https://api.telegram.org/file/bot{}/{file_path}", + self.bot_token + ); + let resp = self + .http_client() + .get(&url) + .send() + .await + .context("Failed to download Telegram file")?; + + if !resp.status().is_success() { + anyhow::bail!("Telegram file download failed: {}", resp.status()); + } + + Ok(resp.bytes().await?.to_vec()) + } + + /// Extract (file_id, duration) from a voice or audio message. + fn parse_voice_metadata(message: &serde_json::Value) -> Option<(String, u64)> { + let voice = message.get("voice").or_else(|| message.get("audio"))?; + let file_id = voice.get("file_id")?.as_str()?.to_string(); + let duration = voice + .get("duration") + .and_then(serde_json::Value::as_u64) + .unwrap_or(0); + Some((file_id, duration)) + } + + /// Attempt to parse a Telegram update as a voice message and transcribe it. + /// + /// Returns `None` if the message is not a voice message, transcription is disabled, + /// or the message exceeds duration limits. + async fn try_parse_voice_message(&self, update: &serde_json::Value) -> Option { + let config = self.transcription.as_ref()?; + let message = update.get("message")?; + + let (file_id, duration) = Self::parse_voice_metadata(message)?; + + if duration > config.max_duration_secs { + tracing::info!( + "Skipping voice message: duration {duration}s exceeds limit {}s", + config.max_duration_secs + ); + return None; + } + + // Extract sender info (same logic as parse_update_message) + let username = message + .get("from") + .and_then(|from| from.get("username")) + .and_then(serde_json::Value::as_str) + .unwrap_or("unknown") + .to_string(); + + let sender_id = message + .get("from") + .and_then(|from| from.get("id")) + .and_then(serde_json::Value::as_i64) + .map(|id| id.to_string()); + + let sender_identity = if username == "unknown" { + sender_id.clone().unwrap_or_else(|| "unknown".to_string()) + } else { + username.clone() + }; + + let mut identities = vec![username.as_str()]; + if let Some(id) = sender_id.as_deref() { + identities.push(id); + } + + if !self.is_any_user_allowed(identities.iter().copied()) { + return None; + } + + let chat_id = message + .get("chat") + .and_then(|chat| chat.get("id")) + .and_then(serde_json::Value::as_i64) + .map(|id| id.to_string())?; + + let message_id = message + .get("message_id") + .and_then(serde_json::Value::as_i64) + .unwrap_or(0); + + let thread_id = message + .get("message_thread_id") + .and_then(serde_json::Value::as_i64) + .map(|id| id.to_string()); + + let reply_target = if let Some(tid) = thread_id { + format!("{}:{}", chat_id, tid) + } else { + chat_id.clone() + }; + + // Download and transcribe + let file_path = match self.get_file_path(&file_id).await { + Ok(p) => p, + Err(e) => { + tracing::warn!("Failed to get voice file path: {e}"); + return None; + } + }; + + let file_name = file_path + .rsplit('/') + .next() + .unwrap_or("voice.ogg") + .to_string(); + + let audio_data = match self.download_file(&file_path).await { + Ok(d) => d, + Err(e) => { + tracing::warn!("Failed to download voice file: {e}"); + return None; + } + }; + + let text = + match super::transcription::transcribe_audio(audio_data, &file_name, config).await { + Ok(t) => t, + Err(e) => { + tracing::warn!("Voice transcription failed: {e}"); + return None; + } + }; + + if text.trim().is_empty() { + tracing::info!("Voice transcription returned empty text, skipping"); + return None; + } + + Some(ChannelMessage { + id: format!("telegram_{chat_id}_{message_id}"), + sender: sender_identity, + reply_target, + content: format!("[Voice] {text}"), + channel: "telegram".to_string(), + timestamp: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + thread_ts: None, + }) + } + fn parse_update_message( &self, update: &serde_json::Value, @@ -1900,10 +2081,15 @@ Ensure only one `zeroclaw` process is using this bot token." offset = uid + 1; } - let Some((mut msg, photo_file_id)) = self.parse_update_message(update) else { - self.handle_unauthorized_message(update).await; - continue; - }; + let (mut msg, photo_file_id) = + if let Some(parsed) = self.parse_update_message(update) { + parsed + } else if let Some(voice_msg) = self.try_parse_voice_message(update).await { + (voice_msg, None) + } else { + self.handle_unauthorized_message(update).await; + continue; + }; // Resolve photo file_id to data URI and inject as IMAGE marker if let Some(file_id) = photo_file_id { @@ -3102,4 +3288,67 @@ mod tests { assert!(part.len() <= TELEGRAM_MAX_MESSAGE_LENGTH); } } + + #[test] + fn parse_voice_metadata_extracts_voice() { + let msg = serde_json::json!({ + "voice": { + "file_id": "abc123", + "duration": 5 + } + }); + let (file_id, dur) = TelegramChannel::parse_voice_metadata(&msg).unwrap(); + assert_eq!(file_id, "abc123"); + assert_eq!(dur, 5); + } + + #[test] + fn parse_voice_metadata_extracts_audio() { + let msg = serde_json::json!({ + "audio": { + "file_id": "audio456", + "duration": 30 + } + }); + let (file_id, dur) = TelegramChannel::parse_voice_metadata(&msg).unwrap(); + assert_eq!(file_id, "audio456"); + assert_eq!(dur, 30); + } + + #[test] + fn parse_voice_metadata_returns_none_for_text() { + let msg = serde_json::json!({ + "text": "hello" + }); + assert!(TelegramChannel::parse_voice_metadata(&msg).is_none()); + } + + #[test] + fn parse_voice_metadata_defaults_duration_to_zero() { + let msg = serde_json::json!({ + "voice": { + "file_id": "no_dur" + } + }); + let (_, dur) = TelegramChannel::parse_voice_metadata(&msg).unwrap(); + assert_eq!(dur, 0); + } + + #[test] + fn with_transcription_sets_config_when_enabled() { + let mut tc = crate::config::TranscriptionConfig::default(); + tc.enabled = true; + + let ch = + TelegramChannel::new("token".into(), vec!["*".into()], false).with_transcription(tc); + assert!(ch.transcription.is_some()); + } + + #[test] + fn with_transcription_skips_when_disabled() { + let tc = crate::config::TranscriptionConfig::default(); // enabled = false + let ch = + TelegramChannel::new("token".into(), vec!["*".into()], false).with_transcription(tc); + assert!(ch.transcription.is_none()); + } } diff --git a/src/channels/transcription.rs b/src/channels/transcription.rs new file mode 100644 index 000000000..a7533c0a1 --- /dev/null +++ b/src/channels/transcription.rs @@ -0,0 +1,218 @@ +use anyhow::{bail, Context, Result}; +use reqwest::multipart::{Form, Part}; + +use crate::config::TranscriptionConfig; + +/// Maximum upload size accepted by the Groq Whisper API (25 MB). +const MAX_AUDIO_BYTES: usize = 25 * 1024 * 1024; + +/// Map file extension to MIME type for Whisper-compatible transcription APIs. +fn mime_for_audio(extension: &str) -> Option<&'static str> { + match extension.to_ascii_lowercase().as_str() { + "flac" => Some("audio/flac"), + "mp3" | "mpeg" | "mpga" => Some("audio/mpeg"), + "mp4" | "m4a" => Some("audio/mp4"), + "ogg" | "oga" => Some("audio/ogg"), + "opus" => Some("audio/opus"), + "wav" => Some("audio/wav"), + "webm" => Some("audio/webm"), + _ => None, + } +} + +/// Normalize audio filename for Whisper-compatible APIs. +/// +/// Groq validates the filename extension — `.oga` (Opus-in-Ogg) is not in +/// its accepted list, so we rewrite it to `.ogg`. +fn normalize_audio_filename(file_name: &str) -> String { + match file_name.rsplit_once('.') { + Some((stem, ext)) if ext.eq_ignore_ascii_case("oga") => format!("{stem}.ogg"), + _ => file_name.to_string(), + } +} + +/// Transcribe audio bytes via a Whisper-compatible transcription API. +/// +/// Returns the transcribed text on success. Requires `GROQ_API_KEY` in the +/// environment. The caller is responsible for enforcing duration limits +/// *before* downloading the file; this function enforces the byte-size cap. +pub async fn transcribe_audio( + audio_data: Vec, + file_name: &str, + config: &TranscriptionConfig, +) -> Result { + if audio_data.len() > MAX_AUDIO_BYTES { + bail!( + "Audio file too large ({} bytes, max {MAX_AUDIO_BYTES})", + audio_data.len() + ); + } + + let normalized_name = normalize_audio_filename(file_name); + let extension = normalized_name + .rsplit_once('.') + .map(|(_, e)| e) + .unwrap_or(""); + let mime = mime_for_audio(extension).ok_or_else(|| { + anyhow::anyhow!( + "Unsupported audio format '.{extension}' — accepted: flac, mp3, mp4, mpeg, mpga, m4a, ogg, opus, wav, webm" + ) + })?; + + let api_key = std::env::var("GROQ_API_KEY").context( + "GROQ_API_KEY environment variable is not set — required for voice transcription", + )?; + + let client = crate::config::build_runtime_proxy_client("transcription.groq"); + + let file_part = Part::bytes(audio_data) + .file_name(normalized_name) + .mime_str(mime)?; + + let mut form = Form::new() + .part("file", file_part) + .text("model", config.model.clone()) + .text("response_format", "json"); + + if let Some(ref lang) = config.language { + form = form.text("language", lang.clone()); + } + + let resp = client + .post(&config.api_url) + .bearer_auth(&api_key) + .multipart(form) + .send() + .await + .context("Failed to send transcription request")?; + + let status = resp.status(); + let body: serde_json::Value = resp + .json() + .await + .context("Failed to parse transcription response")?; + + if !status.is_success() { + let error_msg = body["error"]["message"].as_str().unwrap_or("unknown error"); + bail!("Transcription API error ({}): {}", status, error_msg); + } + + let text = body["text"] + .as_str() + .context("Transcription response missing 'text' field")? + .to_string(); + + Ok(text) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn rejects_oversized_audio() { + let big = vec![0u8; MAX_AUDIO_BYTES + 1]; + let config = TranscriptionConfig::default(); + + let err = transcribe_audio(big, "test.ogg", &config) + .await + .unwrap_err(); + assert!( + err.to_string().contains("too large"), + "expected size error, got: {err}" + ); + } + + #[tokio::test] + async fn rejects_missing_api_key() { + // Ensure the key is absent for this test + std::env::remove_var("GROQ_API_KEY"); + + let data = vec![0u8; 100]; + let config = TranscriptionConfig::default(); + + let err = transcribe_audio(data, "test.ogg", &config) + .await + .unwrap_err(); + assert!( + err.to_string().contains("GROQ_API_KEY"), + "expected missing-key error, got: {err}" + ); + } + + #[test] + fn mime_for_audio_maps_accepted_formats() { + let cases = [ + ("flac", "audio/flac"), + ("mp3", "audio/mpeg"), + ("mpeg", "audio/mpeg"), + ("mpga", "audio/mpeg"), + ("mp4", "audio/mp4"), + ("m4a", "audio/mp4"), + ("ogg", "audio/ogg"), + ("oga", "audio/ogg"), + ("opus", "audio/opus"), + ("wav", "audio/wav"), + ("webm", "audio/webm"), + ]; + for (ext, expected) in cases { + assert_eq!( + mime_for_audio(ext), + Some(expected), + "failed for extension: {ext}" + ); + } + } + + #[test] + fn mime_for_audio_case_insensitive() { + assert_eq!(mime_for_audio("OGG"), Some("audio/ogg")); + assert_eq!(mime_for_audio("MP3"), Some("audio/mpeg")); + assert_eq!(mime_for_audio("Opus"), Some("audio/opus")); + } + + #[test] + fn mime_for_audio_rejects_unknown() { + assert_eq!(mime_for_audio("txt"), None); + assert_eq!(mime_for_audio("pdf"), None); + assert_eq!(mime_for_audio("aac"), None); + assert_eq!(mime_for_audio(""), None); + } + + #[test] + fn normalize_audio_filename_rewrites_oga() { + assert_eq!(normalize_audio_filename("voice.oga"), "voice.ogg"); + assert_eq!(normalize_audio_filename("file.OGA"), "file.ogg"); + } + + #[test] + fn normalize_audio_filename_preserves_accepted() { + assert_eq!(normalize_audio_filename("voice.ogg"), "voice.ogg"); + assert_eq!(normalize_audio_filename("track.mp3"), "track.mp3"); + assert_eq!(normalize_audio_filename("clip.opus"), "clip.opus"); + } + + #[test] + fn normalize_audio_filename_no_extension() { + assert_eq!(normalize_audio_filename("voice"), "voice"); + } + + #[tokio::test] + async fn rejects_unsupported_audio_format() { + let data = vec![0u8; 100]; + let config = TranscriptionConfig::default(); + + let err = transcribe_audio(data, "recording.aac", &config) + .await + .unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("Unsupported audio format"), + "expected unsupported-format error, got: {msg}" + ); + assert!( + msg.contains(".aac"), + "error should mention the rejected extension, got: {msg}" + ); + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs index c40053d45..f329d8a96 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -13,8 +13,8 @@ pub use schema::{ ProxyConfig, ProxyScope, QueryClassificationConfig, ReliabilityConfig, ResourceLimitsConfig, RuntimeConfig, SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig, - StorageProviderSection, StreamMode, TelegramConfig, TunnelConfig, WebSearchConfig, - WebhookConfig, + StorageProviderSection, StreamMode, TelegramConfig, TranscriptionConfig, TunnelConfig, + WebSearchConfig, WebhookConfig, }; #[cfg(test)] diff --git a/src/config/schema.rs b/src/config/schema.rs index cb7ad82f1..7f5eb429f 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -38,10 +38,17 @@ const SUPPORTED_PROXY_SERVICE_KEYS: &[&str] = &[ "tool.pushover", "memory.embeddings", "tunnel.custom", + "transcription.groq", ]; -const SUPPORTED_PROXY_SERVICE_SELECTORS: &[&str] = - &["provider.*", "channel.*", "tool.*", "memory.*", "tunnel.*"]; +const SUPPORTED_PROXY_SERVICE_SELECTORS: &[&str] = &[ + "provider.*", + "channel.*", + "tool.*", + "memory.*", + "tunnel.*", + "transcription.*", +]; static RUNTIME_PROXY_CONFIG: OnceLock> = OnceLock::new(); static RUNTIME_PROXY_CLIENT_CACHE: OnceLock>> = @@ -186,6 +193,10 @@ pub struct Config { /// Hardware configuration (wizard-driven physical world setup). #[serde(default)] pub hardware: HardwareConfig, + + /// Voice transcription configuration (Whisper API via Groq). + #[serde(default)] + pub transcription: TranscriptionConfig, } // ── Delegate Agents ────────────────────────────────────────────── @@ -298,6 +309,52 @@ impl Default for HardwareConfig { } } +// ── Transcription ──────────────────────────────────────────────── + +fn default_transcription_api_url() -> String { + "https://api.groq.com/openai/v1/audio/transcriptions".into() +} + +fn default_transcription_model() -> String { + "whisper-large-v3-turbo".into() +} + +fn default_transcription_max_duration_secs() -> u64 { + 120 +} + +/// Voice transcription configuration (Whisper API via Groq). +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct TranscriptionConfig { + /// Enable voice transcription for channels that support it. + #[serde(default)] + pub enabled: bool, + /// Whisper API endpoint URL. + #[serde(default = "default_transcription_api_url")] + pub api_url: String, + /// Whisper model name. + #[serde(default = "default_transcription_model")] + pub model: String, + /// Optional language hint (ISO-639-1, e.g. "en", "ru"). + #[serde(default)] + pub language: Option, + /// Maximum voice duration in seconds (messages longer than this are skipped). + #[serde(default = "default_transcription_max_duration_secs")] + pub max_duration_secs: u64, +} + +impl Default for TranscriptionConfig { + fn default() -> Self { + Self { + enabled: false, + api_url: default_transcription_api_url(), + model: default_transcription_model(), + language: None, + max_duration_secs: default_transcription_max_duration_secs(), + } + } +} + /// Agent orchestration configuration (`[agent]` section). #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct AgentConfig { @@ -2849,6 +2906,7 @@ impl Default for Config { agents: HashMap::new(), hardware: HardwareConfig::default(), query_classification: QueryClassificationConfig::default(), + transcription: TranscriptionConfig::default(), } } } @@ -4000,6 +4058,7 @@ default_temperature = 0.7 peripherals: PeripheralsConfig::default(), agents: HashMap::new(), hardware: HardwareConfig::default(), + transcription: TranscriptionConfig::default(), }; let toml_str = toml::to_string_pretty(&config).unwrap(); @@ -4169,6 +4228,7 @@ tool_dispatcher = "xml" peripherals: PeripheralsConfig::default(), agents: HashMap::new(), hardware: HardwareConfig::default(), + transcription: TranscriptionConfig::default(), }; config.save().await.unwrap(); @@ -6146,4 +6206,40 @@ default_model = "legacy-model" "Test setup: file should be world-readable (mode {mode:o})" ); } + + #[test] + async fn transcription_config_defaults() { + let tc = TranscriptionConfig::default(); + assert!(!tc.enabled); + assert!(tc.api_url.contains("groq.com")); + assert_eq!(tc.model, "whisper-large-v3-turbo"); + assert!(tc.language.is_none()); + assert_eq!(tc.max_duration_secs, 120); + } + + #[test] + async fn config_roundtrip_with_transcription() { + let mut config = Config::default(); + config.transcription.enabled = true; + config.transcription.language = Some("en".into()); + + let toml_str = toml::to_string_pretty(&config).unwrap(); + let parsed: Config = toml::from_str(&toml_str).unwrap(); + + assert!(parsed.transcription.enabled); + assert_eq!(parsed.transcription.language.as_deref(), Some("en")); + assert_eq!(parsed.transcription.model, "whisper-large-v3-turbo"); + } + + #[test] + async fn config_without_transcription_uses_defaults() { + let toml_str = r#" + default_provider = "openrouter" + default_model = "test-model" + default_temperature = 0.7 + "#; + let parsed: Config = toml::from_str(toml_str).unwrap(); + assert!(!parsed.transcription.enabled); + assert_eq!(parsed.transcription.max_duration_secs, 120); + } } diff --git a/src/onboard/wizard.rs b/src/onboard/wizard.rs index b9119d9e4..d622ae7ff 100644 --- a/src/onboard/wizard.rs +++ b/src/onboard/wizard.rs @@ -185,6 +185,7 @@ pub async fn run_wizard(force: bool) -> Result { agents: std::collections::HashMap::new(), hardware: hardware_config, query_classification: crate::config::QueryClassificationConfig::default(), + transcription: crate::config::TranscriptionConfig::default(), }; println!( @@ -428,6 +429,7 @@ async fn run_quick_setup_with_home( agents: std::collections::HashMap::new(), hardware: crate::config::HardwareConfig::default(), query_classification: crate::config::QueryClassificationConfig::default(), + transcription: crate::config::TranscriptionConfig::default(), }; config.save().await?;