fix(channel): add interruption_scope_id for thread-aware cancellation scoping (#4017)
Add interruption_scope_id to ChannelMessage for thread-aware cancellation. Slack genuine thread replies and Matrix threads get scoped keys, preventing cross-thread cancellation. All other channels preserve existing behavior. Supersedes #3900. Depends on #3891.
This commit is contained in:
parent
5c898246ff
commit
a198b26594
@ -251,6 +251,7 @@ impl BlueskyChannel {
|
||||
channel: "bluesky".to_string(),
|
||||
timestamp,
|
||||
thread_ts: Some(notif.uri.clone()),
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -48,6 +48,7 @@ impl Channel for CliChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(msg).await.is_err() {
|
||||
@ -111,6 +112,7 @@ mod tests {
|
||||
channel: "cli".into(),
|
||||
timestamp: 1_234_567_890,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
assert_eq!(msg.id, "test-id");
|
||||
assert_eq!(msg.sender, "user");
|
||||
@ -130,6 +132,7 @@ mod tests {
|
||||
channel: "ch".into(),
|
||||
timestamp: 0,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
let cloned = msg.clone();
|
||||
assert_eq!(cloned.id, msg.id);
|
||||
|
||||
@ -275,6 +275,7 @@ impl Channel for DingTalkChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -789,6 +789,7 @@ impl Channel for DiscordChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -467,6 +467,7 @@ impl EmailChannel {
|
||||
channel: "email".to_string(),
|
||||
timestamp: email.timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(msg).await.is_err() {
|
||||
|
||||
@ -294,6 +294,7 @@ end tell"#
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(msg).await.is_err() {
|
||||
|
||||
@ -580,6 +580,7 @@ impl Channel for IrcChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -823,6 +823,7 @@ impl LarkChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
tracing::debug!("Lark WS: message in {}", lark_msg.chat_id);
|
||||
@ -1120,6 +1121,7 @@ impl LarkChannel {
|
||||
channel: self.channel_name().to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
});
|
||||
|
||||
messages
|
||||
|
||||
@ -267,6 +267,7 @@ impl LinqChannel {
|
||||
channel: "linq".to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
});
|
||||
|
||||
messages
|
||||
|
||||
@ -893,7 +893,8 @@ impl Channel for MatrixChannel {
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts,
|
||||
thread_ts: thread_ts.clone(),
|
||||
interruption_scope_id: thread_ts,
|
||||
};
|
||||
|
||||
let _ = tx.send(msg).await;
|
||||
|
||||
@ -322,6 +322,7 @@ impl MattermostChannel {
|
||||
#[allow(clippy::cast_sign_loss)]
|
||||
timestamp: (create_at / 1000) as u64,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,6 +198,7 @@ impl Channel for MochatChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -406,7 +406,13 @@ fn followup_thread_id(msg: &traits::ChannelMessage) -> Option<String> {
|
||||
}
|
||||
|
||||
fn interruption_scope_key(msg: &traits::ChannelMessage) -> String {
|
||||
format!("{}_{}_{}", msg.channel, msg.reply_target, msg.sender)
|
||||
match &msg.interruption_scope_id {
|
||||
Some(scope) => format!(
|
||||
"{}_{}_{}_{}",
|
||||
msg.channel, msg.reply_target, msg.sender, scope
|
||||
),
|
||||
None => format!("{}_{}_{}", msg.channel, msg.reply_target, msg.sender),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` when `content` is a `/stop` command (with optional `@botname` suffix).
|
||||
@ -5586,6 +5592,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -5664,6 +5671,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -5756,6 +5764,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 3,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -5833,6 +5842,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -5920,6 +5930,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -6028,6 +6039,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -6117,6 +6129,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 3,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -6221,6 +6234,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 4,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -6310,6 +6324,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -6389,6 +6404,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -6578,6 +6594,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6589,6 +6606,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6677,6 +6695,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6689,6 +6708,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6790,6 +6810,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: Some("1741234567.100001".to_string()),
|
||||
interruption_scope_id: Some("1741234567.100001".to_string()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6802,6 +6823,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: Some("1741234567.100001".to_string()),
|
||||
interruption_scope_id: Some("1741234567.100001".to_string()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6900,6 +6922,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6912,6 +6935,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -6992,6 +7016,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -7069,6 +7094,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -7590,6 +7616,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".into(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123");
|
||||
@ -7605,6 +7632,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".into(),
|
||||
timestamp: 1,
|
||||
thread_ts: Some("1741234567.123456".into()),
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
@ -7623,6 +7651,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "cli".into(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_eq!(followup_thread_id(&msg).as_deref(), Some("msg_abc123"));
|
||||
@ -7638,6 +7667,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".into(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
let msg2 = traits::ChannelMessage {
|
||||
id: "msg_2".into(),
|
||||
@ -7647,6 +7677,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".into(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_ne!(
|
||||
@ -7668,6 +7699,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".into(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
let msg2 = traits::ChannelMessage {
|
||||
id: "msg_2".into(),
|
||||
@ -7677,6 +7709,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "slack".into(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
mem.store(
|
||||
@ -7822,6 +7855,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -7837,6 +7871,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -7949,6 +7984,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -7980,6 +8016,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -8017,6 +8054,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 3,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -8115,6 +8153,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -8218,6 +8257,7 @@ BTC is currently around $65,000 based on latest tool output."#
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -8787,6 +8827,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -8870,6 +8911,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -8885,6 +8927,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "test-channel".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -9028,6 +9071,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -9136,6 +9180,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -9236,6 +9281,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -9356,6 +9402,7 @@ This is an example JSON object for profile settings."#;
|
||||
channel: "telegram".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
CancellationToken::new(),
|
||||
)
|
||||
@ -9494,4 +9541,151 @@ This is an example JSON object for profile settings."#;
|
||||
};
|
||||
assert!(!cfg.enabled_for_channel("discord"));
|
||||
}
|
||||
|
||||
// ── interruption_scope_key tests ──────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn interruption_scope_key_without_scope_id_is_three_component() {
|
||||
let msg = traits::ChannelMessage {
|
||||
id: "1".into(),
|
||||
sender: "alice".into(),
|
||||
reply_target: "room".into(),
|
||||
content: "hi".into(),
|
||||
channel: "matrix".into(),
|
||||
timestamp: 0,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
assert_eq!(interruption_scope_key(&msg), "matrix_room_alice");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interruption_scope_key_with_scope_id_is_four_component() {
|
||||
let msg = traits::ChannelMessage {
|
||||
id: "1".into(),
|
||||
sender: "alice".into(),
|
||||
reply_target: "room".into(),
|
||||
content: "hi".into(),
|
||||
channel: "matrix".into(),
|
||||
timestamp: 0,
|
||||
thread_ts: Some("$thread1".into()),
|
||||
interruption_scope_id: Some("$thread1".into()),
|
||||
};
|
||||
assert_eq!(interruption_scope_key(&msg), "matrix_room_alice_$thread1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn interruption_scope_key_thread_ts_alone_does_not_affect_key() {
|
||||
// thread_ts used for reply anchoring should not bleed into scope key
|
||||
let msg = traits::ChannelMessage {
|
||||
id: "1".into(),
|
||||
sender: "alice".into(),
|
||||
reply_target: "C123".into(),
|
||||
content: "hi".into(),
|
||||
channel: "slack".into(),
|
||||
timestamp: 0,
|
||||
thread_ts: Some("1234567890.000100".into()), // Slack top-level fallback
|
||||
interruption_scope_id: None, // but NOT a thread reply
|
||||
};
|
||||
assert_eq!(interruption_scope_key(&msg), "slack_C123_alice");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn message_dispatch_different_threads_do_not_cancel_each_other() {
|
||||
let channel_impl = Arc::new(SlackRecordingChannel::default());
|
||||
let channel: Arc<dyn Channel> = channel_impl.clone();
|
||||
|
||||
let mut channels_by_name = HashMap::new();
|
||||
channels_by_name.insert(channel.name().to_string(), channel);
|
||||
|
||||
let runtime_ctx = Arc::new(ChannelRuntimeContext {
|
||||
channels_by_name: Arc::new(channels_by_name),
|
||||
provider: Arc::new(SlowProvider {
|
||||
delay: Duration::from_millis(150),
|
||||
}),
|
||||
default_provider: Arc::new("test-provider".to_string()),
|
||||
memory: Arc::new(NoopMemory),
|
||||
tools_registry: Arc::new(vec![]),
|
||||
observer: Arc::new(NoopObserver),
|
||||
system_prompt: Arc::new("test-system-prompt".to_string()),
|
||||
model: Arc::new("test-model".to_string()),
|
||||
temperature: 0.0,
|
||||
auto_save_memory: false,
|
||||
max_tool_iterations: 10,
|
||||
min_relevance_score: 0.0,
|
||||
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
|
||||
pending_new_sessions: Arc::new(Mutex::new(HashSet::new())),
|
||||
provider_cache: Arc::new(Mutex::new(HashMap::new())),
|
||||
route_overrides: Arc::new(Mutex::new(HashMap::new())),
|
||||
api_key: None,
|
||||
api_url: None,
|
||||
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
|
||||
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
|
||||
workspace_dir: Arc::new(std::env::temp_dir()),
|
||||
prompt_config: Arc::new(crate::config::Config::default()),
|
||||
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
|
||||
interrupt_on_new_message: InterruptOnNewMessageConfig {
|
||||
telegram: false,
|
||||
slack: true,
|
||||
discord: false,
|
||||
mattermost: false,
|
||||
},
|
||||
multimodal: crate::config::MultimodalConfig::default(),
|
||||
hooks: None,
|
||||
non_cli_excluded_tools: Arc::new(Vec::new()),
|
||||
autonomy_level: AutonomyLevel::default(),
|
||||
tool_call_dedup_exempt: Arc::new(Vec::new()),
|
||||
model_routes: Arc::new(Vec::new()),
|
||||
query_classification: crate::config::QueryClassificationConfig::default(),
|
||||
ack_reactions: true,
|
||||
show_tool_calls: true,
|
||||
session_store: None,
|
||||
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
|
||||
&crate::config::AutonomyConfig::default(),
|
||||
)),
|
||||
activated_tools: None,
|
||||
});
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
|
||||
let send_task = tokio::spawn(async move {
|
||||
// Two messages from same sender but in different Slack threads —
|
||||
// they must NOT cancel each other.
|
||||
tx.send(traits::ChannelMessage {
|
||||
id: "1741234567.100001".to_string(),
|
||||
sender: "alice".to_string(),
|
||||
reply_target: "C123".to_string(),
|
||||
content: "thread-a question".to_string(),
|
||||
channel: "slack".to_string(),
|
||||
timestamp: 1,
|
||||
thread_ts: Some("1741234567.100001".to_string()),
|
||||
interruption_scope_id: Some("1741234567.100001".to_string()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(30)).await;
|
||||
tx.send(traits::ChannelMessage {
|
||||
id: "1741234567.200002".to_string(),
|
||||
sender: "alice".to_string(),
|
||||
reply_target: "C123".to_string(),
|
||||
content: "thread-b question".to_string(),
|
||||
channel: "slack".to_string(),
|
||||
timestamp: 2,
|
||||
thread_ts: Some("1741234567.200002".to_string()),
|
||||
interruption_scope_id: Some("1741234567.200002".to_string()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
run_message_dispatch_loop(rx, runtime_ctx, 4).await;
|
||||
send_task.await.unwrap();
|
||||
|
||||
// Both tasks should have completed — different threads, no cancellation.
|
||||
let sent_messages = channel_impl.sent_messages.lock().await;
|
||||
assert_eq!(
|
||||
sent_messages.len(),
|
||||
2,
|
||||
"both Slack thread messages should complete, got: {sent_messages:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -193,6 +193,7 @@ impl NextcloudTalkChannel {
|
||||
channel: "nextcloud_talk".to_string(),
|
||||
timestamp: Self::now_unix_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
});
|
||||
|
||||
messages
|
||||
@ -294,6 +295,7 @@ impl NextcloudTalkChannel {
|
||||
channel: "nextcloud_talk".to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
});
|
||||
|
||||
messages
|
||||
|
||||
@ -253,6 +253,7 @@ impl Channel for NostrChannel {
|
||||
channel: "nostr".to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
if tx.send(msg).await.is_err() {
|
||||
tracing::info!("Nostr listener: message bus closed, stopping");
|
||||
|
||||
@ -360,6 +360,7 @@ impl Channel for NotionChannel {
|
||||
channel: "notion".into(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
|
||||
@ -465,6 +465,7 @@ impl Channel for QQChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
@ -503,6 +504,7 @@ impl Channel for QQChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -225,6 +225,7 @@ impl RedditChannel {
|
||||
channel: "reddit".to_string(),
|
||||
timestamp,
|
||||
thread_ts: item.parent_id.clone(),
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -266,6 +266,7 @@ impl SignalChannel {
|
||||
channel: "signal".to_string(),
|
||||
timestamp: timestamp / 1000, // millis → secs
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,6 +165,23 @@ impl SlackChannel {
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
/// Returns the interruption scope identifier for a Slack message.
|
||||
///
|
||||
/// Returns `Some(thread_ts)` only when the message is a genuine thread reply
|
||||
/// (Slack's `thread_ts` field is present and differs from the message's own `ts`).
|
||||
/// Returns `None` for top-level messages and thread parent messages (where
|
||||
/// `thread_ts == ts`), placing them in the 3-component scope key
|
||||
/// (`channel_reply_target_sender`).
|
||||
///
|
||||
/// Intentional: top-level messages and threaded replies are separate conversational
|
||||
/// scopes and should not cancel each other's in-flight tasks.
|
||||
fn inbound_interruption_scope_id(msg: &serde_json::Value, ts: &str) -> Option<String> {
|
||||
msg.get("thread_ts")
|
||||
.and_then(|t| t.as_str())
|
||||
.filter(|&t| t != ts)
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
fn normalized_channel_id(input: Option<&str>) -> Option<String> {
|
||||
input
|
||||
.map(str::trim)
|
||||
@ -1792,6 +1809,7 @@ impl SlackChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: Self::inbound_thread_ts(event, ts),
|
||||
interruption_scope_id: Self::inbound_interruption_scope_id(event, ts),
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
@ -2356,6 +2374,7 @@ impl Channel for SlackChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: Self::inbound_thread_ts(msg, ts),
|
||||
interruption_scope_id: Self::inbound_interruption_scope_id(msg, ts),
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
@ -2440,6 +2459,7 @@ impl Channel for SlackChannel {
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: Some(thread_ts.clone()),
|
||||
interruption_scope_id: Some(thread_ts.clone()),
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -1142,6 +1142,7 @@ Allowlist Telegram username (without '@') or numeric user ID.",
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: thread_id,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
|
||||
@ -1264,6 +1265,7 @@ Allowlist Telegram username (without '@') or numeric user ID.",
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: thread_id,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
|
||||
@ -1425,6 +1427,7 @@ Allowlist Telegram username (without '@') or numeric user ID.",
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
thread_ts: thread_id,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -12,6 +12,11 @@ pub struct ChannelMessage {
|
||||
/// Platform thread identifier (e.g. Slack `ts`, Discord thread ID).
|
||||
/// When set, replies should be posted as threaded responses.
|
||||
pub thread_ts: Option<String>,
|
||||
/// Thread scope identifier for interruption/cancellation grouping.
|
||||
/// Distinct from `thread_ts` (reply anchor): this is `Some` only when the message
|
||||
/// is genuinely inside a reply thread and should be isolated from other threads.
|
||||
/// `None` means top-level — scope is sender+channel only.
|
||||
pub interruption_scope_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Message to send through a channel
|
||||
@ -182,6 +187,7 @@ mod tests {
|
||||
channel: "dummy".into(),
|
||||
timestamp: 123,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e.to_string()))
|
||||
@ -198,6 +204,7 @@ mod tests {
|
||||
channel: "dummy".into(),
|
||||
timestamp: 999,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
let cloned = message.clone();
|
||||
|
||||
@ -288,6 +288,7 @@ impl Channel for TwitterChannel {
|
||||
.get("conversation_id")
|
||||
.and_then(|c| c.as_str())
|
||||
.map(|s| s.to_string()),
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if tx.send(channel_msg).await.is_err() {
|
||||
|
||||
@ -163,6 +163,7 @@ impl WatiChannel {
|
||||
channel: "wati".to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
});
|
||||
|
||||
messages
|
||||
|
||||
@ -237,6 +237,7 @@ impl Channel for WebhookChannel {
|
||||
channel: "webhook".to_string(),
|
||||
timestamp,
|
||||
thread_ts: payload.thread_id,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
if state.tx.send(msg).await.is_err() {
|
||||
|
||||
@ -142,6 +142,7 @@ impl WhatsAppChannel {
|
||||
channel: "whatsapp".to_string(),
|
||||
timestamp,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -741,6 +741,7 @@ impl Channel for WhatsAppWebChannel {
|
||||
content,
|
||||
timestamp: chrono::Utc::now().timestamp() as u64,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
|
||||
@ -2173,6 +2173,7 @@ mod tests {
|
||||
channel: "whatsapp".into(),
|
||||
timestamp: 1,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
let key = whatsapp_memory_key(&msg);
|
||||
|
||||
@ -124,6 +124,7 @@ impl Channel for MatrixTestChannel {
|
||||
channel: self.channel_name.clone(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e.to_string()))
|
||||
@ -564,6 +565,7 @@ fn channel_message_thread_ts_preserved_on_clone() {
|
||||
channel: "slack".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: Some("1700000000.000001".into()),
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
let cloned = msg.clone();
|
||||
@ -580,6 +582,7 @@ fn channel_message_none_thread_ts_preserved() {
|
||||
channel: "telegram".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert!(msg.clone().thread_ts.is_none());
|
||||
@ -633,6 +636,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "telegram".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"discord" => ChannelMessage {
|
||||
id: "dc_1".into(),
|
||||
@ -642,6 +646,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "discord".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"slack" => ChannelMessage {
|
||||
id: "sl_1".into(),
|
||||
@ -651,6 +656,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "slack".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: Some("1700000000.000001".into()),
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"imessage" => ChannelMessage {
|
||||
id: "im_1".into(),
|
||||
@ -660,6 +666,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "imessage".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"irc" => ChannelMessage {
|
||||
id: "irc_1".into(),
|
||||
@ -669,6 +676,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "irc".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"email" => ChannelMessage {
|
||||
id: "email_1".into(),
|
||||
@ -678,6 +686,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "email".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"signal" => ChannelMessage {
|
||||
id: "sig_1".into(),
|
||||
@ -687,6 +696,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "signal".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"mattermost" => ChannelMessage {
|
||||
id: "mm_1".into(),
|
||||
@ -696,6 +706,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "mattermost".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: Some("root_msg_id".into()),
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"whatsapp" => ChannelMessage {
|
||||
id: "wa_1".into(),
|
||||
@ -705,6 +716,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "whatsapp".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"nextcloud_talk" => ChannelMessage {
|
||||
id: "nc_1".into(),
|
||||
@ -714,6 +726,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "nextcloud_talk".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"wecom" => ChannelMessage {
|
||||
id: "wc_1".into(),
|
||||
@ -723,6 +736,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "wecom".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"dingtalk" => ChannelMessage {
|
||||
id: "dt_1".into(),
|
||||
@ -732,6 +746,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "dingtalk".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"qq" => ChannelMessage {
|
||||
id: "qq_1".into(),
|
||||
@ -741,6 +756,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "qq".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"linq" => ChannelMessage {
|
||||
id: "lq_1".into(),
|
||||
@ -750,6 +766,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "linq".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"wati" => ChannelMessage {
|
||||
id: "wt_1".into(),
|
||||
@ -759,6 +776,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "wati".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
"cli" => ChannelMessage {
|
||||
id: "cli_1".into(),
|
||||
@ -768,6 +786,7 @@ fn make_platform_message(platform: &str) -> ChannelMessage {
|
||||
channel: "cli".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
},
|
||||
_ => panic!("Unknown platform: {platform}"),
|
||||
}
|
||||
@ -1068,6 +1087,7 @@ fn channel_message_zero_timestamp() {
|
||||
channel: "ch".into(),
|
||||
timestamp: 0,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
assert_eq!(msg.timestamp, 0);
|
||||
}
|
||||
@ -1082,6 +1102,7 @@ fn channel_message_max_timestamp() {
|
||||
channel: "ch".into(),
|
||||
timestamp: u64::MAX,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
assert_eq!(msg.timestamp, u64::MAX);
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ fn channel_message_sender_field_holds_platform_user_id() {
|
||||
channel: "telegram".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_eq!(msg.sender, "123456789");
|
||||
@ -47,6 +48,7 @@ fn channel_message_reply_target_distinct_from_sender() {
|
||||
channel: "discord".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_ne!(
|
||||
@ -67,6 +69,7 @@ fn channel_message_fields_not_swapped() {
|
||||
channel: "test".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
@ -93,6 +96,7 @@ fn channel_message_preserves_all_fields_on_clone() {
|
||||
channel: "test_channel".into(),
|
||||
timestamp: 1700000001,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
};
|
||||
|
||||
let cloned = original.clone();
|
||||
@ -186,6 +190,7 @@ impl Channel for CapturingChannel {
|
||||
channel: "capturing".into(),
|
||||
timestamp: 1700000000,
|
||||
thread_ts: None,
|
||||
interruption_scope_id: None,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e.to_string()))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user