From 248348bd80c446006fe8cc404e9e24064fc9aa23 Mon Sep 17 00:00:00 2001 From: Abdullah Imad Date: Wed, 11 Mar 2026 19:07:49 -0400 Subject: [PATCH] feat(matrix): reaction and threading support (#3219) Implement add_reaction/remove_reaction for Matrix channel using ReactionEventContent and redaction. Add threading support via Relation::Thread in send() and thread_ts extraction from incoming messages, enabling threaded conversations. Co-authored-by: Claude Opus 4.6 Co-authored-by: Argenis --- src/channels/matrix.rs | 87 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 4 deletions(-) diff --git a/src/channels/matrix.rs b/src/channels/matrix.rs index e6d4c3836..42c7e8335 100644 --- a/src/channels/matrix.rs +++ b/src/channels/matrix.rs @@ -1,12 +1,17 @@ use crate::channels::traits::{Channel, ChannelMessage, SendMessage}; use async_trait::async_trait; +use std::collections::HashMap; use matrix_sdk::{ authentication::matrix::MatrixSession, config::SyncSettings, ruma::{ events::room::message::{ - MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, + MessageType, OriginalSyncRoomMessageEvent, Relation, RoomMessageEventContent, }, + events::reaction::ReactionEventContent, + events::relation::{Annotation, InReplyTo, Thread}, + events::room::MediaSource, + OwnedEventId, OwnedRoomId, OwnedUserId, }, Client as MatrixSdkClient, LoopCtrl, Room, RoomState, SessionMeta, SessionTokens, @@ -31,6 +36,7 @@ pub struct MatrixChannel { resolved_room_id_cache: Arc>>, sdk_client: Arc>, http_client: Client, + reaction_events: Arc>>, } impl std::fmt::Debug for MatrixChannel { @@ -163,6 +169,7 @@ impl MatrixChannel { resolved_room_id_cache: Arc::new(RwLock::new(None)), sdk_client: Arc::new(OnceCell::new()), http_client: Client::new(), + reaction_events: Arc::new(RwLock::new(HashMap::new())), } } @@ -547,8 +554,18 @@ impl Channel for MatrixChannel { anyhow::bail!("Matrix room '{}' is not in joined state", target_room_id); } - room.send(RoomMessageEventContent::text_markdown(&message.content)) - .await?; + let mut content = RoomMessageEventContent::text_markdown(&message.content); + + if let Some(ref thread_ts) = message.thread_ts { + if let Ok(thread_root) = thread_ts.parse::() { + content.relates_to = Some(Relation::Thread(Thread::plain( + thread_root.clone(), + thread_root, + ))); + } + } + + room.send(content).await?; Ok(()) } @@ -634,6 +651,10 @@ impl Channel for MatrixChannel { } } + let thread_ts = match &event.content.relates_to { + Some(Relation::Thread(thread)) => Some(thread.event_id.to_string()), + _ => None, + }; let msg = ChannelMessage { id: event_id, sender: sender.clone(), @@ -644,7 +665,7 @@ impl Channel for MatrixChannel { .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs(), - thread_ts: None, + thread_ts, }; let _ = tx.send(msg).await; @@ -684,6 +705,64 @@ impl Channel for MatrixChannel { self.matrix_client().await.is_ok() } + + async fn add_reaction( + &self, + _channel_id: &str, + message_id: &str, + emoji: &str, + ) -> anyhow::Result<()> { + let client = self.matrix_client().await?; + let target_room_id = self.target_room_id().await?; + let target_room: OwnedRoomId = target_room_id.parse()?; + + let room = client + .get_room(&target_room) + .ok_or_else(|| anyhow::anyhow!("Matrix room not found for reaction"))?; + + let event_id: OwnedEventId = message_id + .parse() + .map_err(|_| anyhow::anyhow!("Invalid event ID for reaction: {}", message_id))?; + + let reaction = ReactionEventContent::new(Annotation::new(event_id, emoji.to_string())); + let response = room.send(reaction).await?; + + let key = format!("{}:{}", message_id, emoji); + self.reaction_events + .write() + .await + .insert(key, response.event_id.to_string()); + + Ok(()) + } + + async fn remove_reaction( + &self, + _channel_id: &str, + message_id: &str, + emoji: &str, + ) -> anyhow::Result<()> { + let key = format!("{}:{}", message_id, emoji); + let reaction_event_id = self.reaction_events.write().await.remove(&key); + + if let Some(reaction_event_id) = reaction_event_id { + let client = self.matrix_client().await?; + let target_room_id = self.target_room_id().await?; + let target_room: OwnedRoomId = target_room_id.parse()?; + + let room = client + .get_room(&target_room) + .ok_or_else(|| anyhow::anyhow!("Matrix room not found for reaction removal"))?; + + let event_id: OwnedEventId = reaction_event_id.parse().map_err(|_| { + anyhow::anyhow!("Invalid reaction event ID: {}", reaction_event_id) + })?; + + room.redact(&event_id, None, None).await?; + } + + Ok(()) + } } #[cfg(test)]