From 1b8d747e1f62a9b7c348a62ea3a9e9339fd0b8bd Mon Sep 17 00:00:00 2001 From: argenis de la rosa Date: Sat, 28 Feb 2026 12:35:01 -0500 Subject: [PATCH] fix(acp): stabilize send path, remove unreachable arms, and fix docs tables --- docs/channels-reference.md | 4 +-- src/channels/acp.rs | 62 ++++++++++++++++++++------------------ src/config/schema.rs | 1 + 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/docs/channels-reference.md b/docs/channels-reference.md index 3b2619a56..759e36da6 100644 --- a/docs/channels-reference.md +++ b/docs/channels-reference.md @@ -630,8 +630,8 @@ rg -n "Matrix|Telegram|Discord|Slack|Mattermost|Signal|WhatsApp|Email|IRC|Lark|D | QQ | `QQ: connected and identified` | `QQ: ignoring C2C message from unauthorized user:` / `QQ: ignoring group message from unauthorized user:` | `QQ: received Reconnect (op 7)` / `QQ: received Invalid Session (op 9)` / `QQ: message channel closed` | | Nextcloud Talk (gateway) | `POST /nextcloud-talk — Nextcloud Talk bot webhook` | `Nextcloud Talk webhook signature verification failed` / `Nextcloud Talk: ignoring message from unauthorized actor:` | `Nextcloud Talk send failed:` / `LLM error for Nextcloud Talk message:` | | iMessage | `iMessage channel listening (AppleScript bridge)...` | (contact allowlist enforced by `allowed_contacts`) | `iMessage poll error:` | -|| ACP | `ACP channel started` | `ACP: ignoring message from unauthorized user:` | `ACP process exited unexpectedly:` / `ACP JSON-RPC timeout:` / `ACP process spawn failed:` | -|| Nostr | `Nostr channel listening as npub1...` | `Nostr: ignoring NIP-04 message from unauthorized pubkey:` / `Nostr: ignoring NIP-17 message from unauthorized pubkey:` | `Failed to decrypt NIP-04 message:` / `Failed to unwrap NIP-17 gift wrap:` / `Nostr relay pool shut down` | +| ACP | `ACP channel started` | `ACP: ignoring message from unauthorized user:` | `ACP process exited unexpectedly:` / `ACP JSON-RPC timeout:` / `ACP process spawn failed:` | +| Nostr | `Nostr channel listening as npub1...` | `Nostr: ignoring NIP-04 message from unauthorized pubkey:` / `Nostr: ignoring NIP-17 message from unauthorized pubkey:` | `Failed to decrypt NIP-04 message:` / `Failed to unwrap NIP-17 gift wrap:` / `Nostr relay pool shut down` | ### 7.3 Runtime supervisor keywords diff --git a/src/channels/acp.rs b/src/channels/acp.rs index cfcf6db5d..0f72203ed 100644 --- a/src/channels/acp.rs +++ b/src/channels/acp.rs @@ -42,6 +42,8 @@ pub struct AcpChannel { client: reqwest::Client, /// Active OpenCode subprocess and its I/O handles process: Arc>>, + /// Serializes ACP send operations to avoid concurrent process take/spawn races. + send_operation_lock: Arc>, /// Next message ID for JSON-RPC requests next_message_id: Arc, /// Optional response channel for sending ACP responses back to original channel @@ -173,6 +175,7 @@ impl AcpChannel { pairing: None, // TODO: Implement pairing if needed client: reqwest::Client::new(), process: Arc::new(Mutex::new(None)), + send_operation_lock: Arc::new(Mutex::new(())), next_message_id: Arc::new(AtomicU64::new(0)), response_channel: None, } @@ -398,6 +401,8 @@ impl Channel for AcpChannel { } async fn send(&self, message: &SendMessage) -> Result<()> { + let _send_guard = self.send_operation_lock.lock().await; + // Check if user is allowed if !self.is_user_allowed(&message.recipient) { tracing::warn!( @@ -430,38 +435,37 @@ impl Channel for AcpChannel { }; // Now we have ownership of the process, no lock held - if let Some(mut process) = process_opt { - let session_id = process - .session_id - .as_ref() - .context("No active ACP session")? - .clone(); - let response = self - .send_prompt(&mut process, &session_id, &content) - .await?; + let mut process = process_opt.context("ACP process disappeared unexpectedly")?; + let session_id = process + .session_id + .as_ref() + .context("No active ACP session")? + .clone(); + let response_result = self.send_prompt(&mut process, &session_id, &content).await; - // Put the process back + // Always restore process ownership, even when prompting fails. + { let mut process_guard = self.process.lock().await; *process_guard = Some(process); + } - // Send response back through response_channel if set - if let Some(response_channel) = &self.response_channel { - let response_message = SendMessage::new(response, message.recipient.clone()); - if let Err(e) = response_channel.send(&response_message).await { - tracing::warn!( - "Failed to send ACP response through response channel: {}", - e - ); - } - } else { - // Log if no response channel configured - tracing::info!( - "ACP response ready (no response channel configured): {}", - response + let response = response_result?; + + // Send response back through response_channel if set + if let Some(response_channel) = &self.response_channel { + let response_message = SendMessage::new(response, message.recipient.clone()); + if let Err(e) = response_channel.send(&response_message).await { + tracing::warn!( + "Failed to send ACP response through response channel: {}", + e ); } - // This should not happen because we just created it above - anyhow::bail!("ACP process disappeared unexpectedly"); + } else { + // Log if no response channel configured + tracing::info!( + "ACP response ready (no response channel configured): {}", + response + ); } Ok(()) @@ -477,9 +481,9 @@ impl Channel for AcpChannel { // listening for incoming messages, we implement a minimal listener // that just keeps the channel alive. - tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; // Sleep for 1 hour - - Ok(()) + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + } } async fn health_check(&self) -> bool { diff --git a/src/config/schema.rs b/src/config/schema.rs index 87f4c950d..64ea284bc 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -518,6 +518,7 @@ impl std::fmt::Debug for Config { self.channels_config.dingtalk.is_some(), self.channels_config.napcat.is_some(), self.channels_config.qq.is_some(), + self.channels_config.acp.is_some(), self.channels_config.nostr.is_some(), self.channels_config.clawdtalk.is_some(), ]