From d38d706c8e9cd31ca3f6cbb6a4d87cc0187e3323 Mon Sep 17 00:00:00 2001 From: Joe Hoyle Date: Sat, 21 Mar 2026 05:32:31 -0400 Subject: [PATCH] feat(channel): add Slack Assistants API status indicators (#4105) Implement start_typing/stop_typing for Slack using the Assistants API assistant.threads.setStatus method. Tracks thread context from assistant_thread_started events and inbound messages, then sets "is thinking..." status during processing. Status auto-clears when the bot sends a reply via chat.postMessage. Co-authored-by: Claude Opus 4.6 (1M context) --- src/channels/slack.rs | 117 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 1 deletion(-) diff --git a/src/channels/slack.rs b/src/channels/slack.rs index 1b686b626..b4c6c25e4 100644 --- a/src/channels/slack.rs +++ b/src/channels/slack.rs @@ -30,6 +30,8 @@ pub struct SlackChannel { group_reply_allowed_sender_ids: Vec, user_display_name_cache: Mutex>, workspace_dir: Option, + /// Maps channel_id -> thread_ts for active assistant threads (used for status indicators). + active_assistant_thread: Mutex>, } const SLACK_HISTORY_MAX_RETRIES: u32 = 3; @@ -118,6 +120,7 @@ impl SlackChannel { group_reply_allowed_sender_ids: Vec::new(), user_display_name_cache: Mutex::new(HashMap::new()), workspace_dir: None, + active_assistant_thread: Mutex::new(HashMap::new()), } } @@ -1784,7 +1787,34 @@ impl SlackChannel { else { continue; }; - if event.get("type").and_then(|v| v.as_str()) != Some("message") { + let event_type = event + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + + // Track assistant thread context for Assistants API status indicators. + if event_type == "assistant_thread_started" + || event_type == "assistant_thread_context_changed" + { + if let Some(thread) = event.get("assistant_thread") { + let ch = thread + .get("channel_id") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + let tts = thread + .get("thread_ts") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + if !ch.is_empty() && !tts.is_empty() { + if let Ok(mut map) = self.active_assistant_thread.lock() { + map.insert(ch.to_string(), tts.to_string()); + } + } + } + continue; + } + + if event_type != "message" { continue; } let subtype = event.get("subtype").and_then(|v| v.as_str()); @@ -1864,6 +1894,13 @@ impl SlackChannel { interruption_scope_id: Self::inbound_interruption_scope_id(event, ts), }; + // Track thread context so start_typing can set assistant status. + if let Some(ref tts) = channel_msg.thread_ts { + if let Ok(mut map) = self.active_assistant_thread.lock() { + map.insert(channel_id.clone(), tts.clone()); + } + } + if tx.send(channel_msg).await.is_err() { return Ok(()); } @@ -2639,6 +2676,49 @@ impl Channel for SlackChannel { }; Self::evaluate_health(bot_ok, socket_mode_enabled, socket_mode_ok) } + + async fn start_typing(&self, recipient: &str) -> anyhow::Result<()> { + let thread_ts = { + let map = self + .active_assistant_thread + .lock() + .map_err(|e| anyhow::anyhow!("lock poisoned: {e}"))?; + match map.get(recipient) { + Some(ts) => ts.clone(), + None => return Ok(()), + } + }; + + let body = serde_json::json!({ + "channel_id": recipient, + "thread_ts": thread_ts, + "status": "is thinking...", + }); + + // Gracefully ignore errors — non-assistant contexts will return errors. + if let Ok(resp) = self + .http_client() + .post("https://slack.com/api/assistant.threads.setStatus") + .bearer_auth(&self.bot_token) + .json(&body) + .send() + .await + { + if !resp.status().is_success() { + tracing::debug!( + "assistant.threads.setStatus returned {}; ignoring", + resp.status() + ); + } + } + + Ok(()) + } + + async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { + // Status auto-clears when the bot sends a message via chat.postMessage. + Ok(()) + } } #[cfg(test)] @@ -3549,4 +3629,39 @@ mod tests { let key2 = super::super::conversation_history_key(&msg2); assert_ne!(key1, key2, "session key should differ per thread"); } + + #[tokio::test] + async fn start_typing_requires_thread_context() { + let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]); + // No thread_ts tracked for "C999" — start_typing should be a no-op (Ok). + let result = ch.start_typing("C999").await; + assert!( + result.is_ok(), + "start_typing should succeed as no-op without thread context" + ); + } + + #[test] + fn assistant_thread_tracking() { + let ch = SlackChannel::new("xoxb-fake".into(), None, None, vec![], vec![]); + + // Initially empty. + { + let map = ch.active_assistant_thread.lock().unwrap(); + assert!(map.is_empty()); + } + + // Simulate storing a thread_ts (as listen_socket_mode would). + { + let mut map = ch.active_assistant_thread.lock().unwrap(); + map.insert("C123".to_string(), "1741234567.000100".to_string()); + } + + // Verify retrieval. + { + let map = ch.active_assistant_thread.lock().unwrap(); + assert_eq!(map.get("C123"), Some(&"1741234567.000100".to_string()),); + assert_eq!(map.get("C999"), None); + } + } }