Merge pull request #2725 from zeroclaw-labs/issue-2702-matrix-otk-conflict-dev

fix(matrix): break OTK conflict retry loop
This commit is contained in:
Argenis 2026-03-05 01:54:18 -05:00 committed by GitHub
commit 04deae13b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -16,7 +16,10 @@ use matrix_sdk::{
use reqwest::Client;
use serde::Deserialize;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::{mpsc, Mutex, OnceCell, RwLock};
/// Matrix channel for Matrix Client-Server API.
@ -33,6 +36,7 @@ pub struct MatrixChannel {
zeroclaw_dir: Option<PathBuf>,
resolved_room_id_cache: Arc<RwLock<Option<String>>>,
sdk_client: Arc<OnceCell<MatrixSdkClient>>,
otk_conflict_detected: Arc<AtomicBool>,
http_client: Client,
transcription: Option<crate::config::TranscriptionConfig>,
}
@ -111,6 +115,23 @@ impl MatrixChannel {
format!("{error_type} (details redacted)")
}
fn is_otk_conflict_message(message: &str) -> bool {
let lower = message.to_ascii_lowercase();
lower.contains("one time key") && lower.contains("already exists")
}
fn otk_conflict_recovery_message(&self) -> String {
let mut message = String::from(
"Matrix E2EE one-time key upload conflict detected (`one time key ... already exists`). \
ZeroClaw paused Matrix sync to avoid an infinite retry loop. \
Resolve by deregistering the stale Matrix device for this bot account, resetting the local Matrix crypto store, then restarting ZeroClaw.",
);
if let Some(store_dir) = self.matrix_store_dir() {
message.push_str(&format!(" Local crypto store: {}", store_dir.display()));
}
message
}
fn normalize_optional_field(value: Option<String>) -> Option<String> {
value
.map(|entry| entry.trim().to_string())
@ -174,6 +195,7 @@ impl MatrixChannel {
zeroclaw_dir,
resolved_room_id_cache: Arc::new(RwLock::new(None)),
sdk_client: Arc::new(OnceCell::new()),
otk_conflict_detected: Arc::new(AtomicBool::new(false)),
http_client: Client::new(),
transcription: None,
}
@ -525,6 +547,17 @@ impl MatrixChannel {
};
client.restore_session(session).await?;
let holder = client.cross_process_store_locks_holder_name().to_string();
if let Err(error) = client
.encryption()
.enable_cross_process_store_lock(holder)
.await
{
let safe_error = Self::sanitize_error_for_log(&error);
tracing::warn!(
"Matrix failed to enable cross-process crypto-store lock: {safe_error}"
);
}
Ok::<MatrixSdkClient, anyhow::Error>(client)
})
@ -686,6 +719,10 @@ impl Channel for MatrixChannel {
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
if self.otk_conflict_detected.load(Ordering::Relaxed) {
anyhow::bail!("{}", self.otk_conflict_recovery_message());
}
let client = self.matrix_client().await?;
let target_room_id = self.target_room_id().await?;
let target_room: OwnedRoomId = target_room_id.parse()?;
@ -711,6 +748,10 @@ impl Channel for MatrixChannel {
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
if self.otk_conflict_detected.load(Ordering::Relaxed) {
anyhow::bail!("{}", self.otk_conflict_recovery_message());
}
let target_room_id = self.target_room_id().await?;
self.ensure_room_supported(&target_room_id).await?;
@ -891,15 +932,29 @@ impl Channel for MatrixChannel {
});
let sync_settings = SyncSettings::new().timeout(std::time::Duration::from_secs(30));
let otk_conflict_detected = Arc::clone(&self.otk_conflict_detected);
client
.sync_with_result_callback(sync_settings, |sync_result| {
let tx = tx.clone();
let otk_conflict_detected = Arc::clone(&otk_conflict_detected);
async move {
if tx.is_closed() {
return Ok::<LoopCtrl, matrix_sdk::Error>(LoopCtrl::Break);
}
if let Err(error) = sync_result {
let raw_error = error.to_string();
if MatrixChannel::is_otk_conflict_message(&raw_error) {
let first_detection =
!otk_conflict_detected.swap(true, Ordering::SeqCst);
if first_detection {
tracing::error!(
"Matrix detected one-time key upload conflict; stopping listener to avoid retry loop."
);
}
return Ok::<LoopCtrl, matrix_sdk::Error>(LoopCtrl::Break);
}
let safe_error = MatrixChannel::sanitize_error_for_log(&error);
tracing::warn!("Matrix sync error: {safe_error}, retrying...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
@ -910,10 +965,18 @@ impl Channel for MatrixChannel {
})
.await?;
if self.otk_conflict_detected.load(Ordering::Relaxed) {
anyhow::bail!("{}", self.otk_conflict_recovery_message());
}
Ok(())
}
async fn health_check(&self) -> bool {
if self.otk_conflict_detected.load(Ordering::Relaxed) {
return false;
}
let Ok(room_id) = self.target_room_id().await else {
return false;
};
@ -929,7 +992,6 @@ impl Channel for MatrixChannel {
#[cfg(test)]
mod tests {
use super::*;
use matrix_sdk::ruma::{OwnedEventId, OwnedUserId};
fn make_channel() -> MatrixChannel {
MatrixChannel::new(
@ -1055,6 +1117,33 @@ mod tests {
assert!(ch.matrix_store_dir().is_none());
}
#[test]
fn otk_conflict_message_detection_matches_matrix_errors() {
assert!(MatrixChannel::is_otk_conflict_message(
"One time key signed_curve25519:AAAAAAAAAA4 already exists. Old key: ... new key: ..."
));
assert!(!MatrixChannel::is_otk_conflict_message(
"Matrix sync timeout while waiting for long poll"
));
}
#[test]
fn otk_conflict_recovery_message_includes_store_path_when_available() {
let ch = MatrixChannel::new_with_session_hint_and_zeroclaw_dir(
"https://matrix.org".to_string(),
"tok".to_string(),
"!r:m".to_string(),
vec![],
None,
None,
Some(PathBuf::from("/tmp/zeroclaw")),
);
let message = ch.otk_conflict_recovery_message();
assert!(message.contains("one-time key upload conflict"));
assert!(message.contains("/tmp/zeroclaw/state/matrix"));
}
#[test]
fn encode_path_segment_encodes_room_refs() {
assert_eq!(