diff --git a/src/channels/mod.rs b/src/channels/mod.rs index 17eb4379c..0cc571f16 100644 --- a/src/channels/mod.rs +++ b/src/channels/mod.rs @@ -404,6 +404,18 @@ fn interruption_scope_key(msg: &traits::ChannelMessage) -> String { format!("{}_{}_{}", msg.channel, msg.reply_target, msg.sender) } +/// Returns `true` when `content` is a `/stop` command (with optional `@botname` suffix). +/// Not gated on channel type — all non-CLI channels support `/stop`. +fn is_stop_command(content: &str) -> bool { + let trimmed = content.trim(); + if !trimmed.starts_with('/') { + return false; + } + let cmd = trimmed.split_whitespace().next().unwrap_or(""); + let base = cmd.split('@').next().unwrap_or(cmd); + base.eq_ignore_ascii_case("/stop") +} + /// Strip tool-call XML tags from outgoing messages. /// /// LLM responses may contain ``, ``, @@ -2755,6 +2767,49 @@ async fn run_message_dispatch_loop( let task_sequence = Arc::new(AtomicU32::new(1)); while let Some(msg) = rx.recv().await { + // Fast path: /stop cancels the in-flight task for this sender scope without + // spawning a worker or registering a new task. Handled here — before semaphore + // acquisition — so the target task is still in the store and is never replaced. + if msg.channel != "cli" && is_stop_command(&msg.content) { + let scope_key = interruption_scope_key(&msg); + let previous = { + let mut active = in_flight_by_sender.lock().await; + active.remove(&scope_key) + }; + let reply = if let Some(state) = previous { + state.cancellation.cancel(); + "Stop signal sent.".to_string() + } else { + "No in-flight task for this sender scope.".to_string() + }; + let channel = ctx + .channels_by_name + .get(&msg.channel) + .or_else(|| { + // Multi-room channels use "name:qualifier" format (e.g. "matrix:!roomId"); + // fall back to base channel name for routing. + msg.channel + .split_once(':') + .and_then(|(base, _)| ctx.channels_by_name.get(base)) + }) + .cloned(); + if let Some(channel) = channel { + let reply_target = msg.reply_target.clone(); + let thread_ts = msg.thread_ts.clone(); + tokio::spawn(async move { + let _ = channel + .send(&SendMessage::new(reply, &reply_target).in_thread(thread_ts)) + .await; + }); + } else { + tracing::warn!( + channel = %msg.channel, + "stop command: no registered channel found for reply" + ); + } + continue; + } + let permit = match Arc::clone(&semaphore).acquire_owned().await { Ok(permit) => permit, Err(_) => break, @@ -2773,7 +2828,12 @@ async fn run_message_dispatch_loop( let completion = Arc::new(InFlightTaskCompletion::new()); let task_id = task_sequence.fetch_add(1, Ordering::Relaxed) as u64; - if interrupt_enabled { + // Register all non-CLI tasks in the in-flight store so /stop can reach them. + // This is a deliberate broadening from the previous behaviour where only + // interrupt_enabled (Telegram/Slack) channels registered tasks. + let register_in_flight = msg.channel != "cli"; + + if register_in_flight { let previous = { let mut active = in_flight.lock().await; active.insert( @@ -2786,20 +2846,22 @@ async fn run_message_dispatch_loop( ) }; - if let Some(previous) = previous { - tracing::info!( - channel = %msg.channel, - sender = %msg.sender, - "Interrupting previous in-flight request for sender" - ); - previous.cancellation.cancel(); - previous.completion.wait().await; + if interrupt_enabled { + if let Some(previous) = previous { + tracing::info!( + channel = %msg.channel, + sender = %msg.sender, + "Interrupting previous in-flight request for sender" + ); + previous.cancellation.cancel(); + previous.completion.wait().await; + } } } process_channel_message(worker_ctx, msg, cancellation_token).await; - if interrupt_enabled { + if register_in_flight { let mut active = in_flight.lock().await; if active .get(&sender_scope_key) @@ -9197,4 +9259,47 @@ This is an example JSON object for profile settings."#; Err(e) => panic!("should succeed when telegram is configured: {e}"), } } + + // ── is_stop_command tests ───────────────────────────────────────────── + + #[test] + fn is_stop_command_matches_bare_slash_stop() { + assert!(is_stop_command("/stop")); + } + + #[test] + fn is_stop_command_matches_with_leading_trailing_whitespace() { + assert!(is_stop_command(" /stop ")); + } + + #[test] + fn is_stop_command_is_case_insensitive() { + assert!(is_stop_command("/STOP")); + assert!(is_stop_command("/Stop")); + } + + #[test] + fn is_stop_command_matches_with_bot_suffix() { + assert!(is_stop_command("/stop@zeroclaw_bot")); + } + + #[test] + fn is_stop_command_rejects_other_slash_commands() { + assert!(!is_stop_command("/new")); + assert!(!is_stop_command("/model gpt-4")); + assert!(!is_stop_command("/models")); + } + + #[test] + fn is_stop_command_rejects_plain_text() { + assert!(!is_stop_command("stop")); + assert!(!is_stop_command("please stop")); + assert!(!is_stop_command("")); + } + + #[test] + fn is_stop_command_rejects_stop_as_substring() { + assert!(!is_stop_command("/stopwatch")); + assert!(!is_stop_command("/stop-all")); + } }