fix(acp): stabilize send path, remove unreachable arms, and fix docs tables

This commit is contained in:
argenis de la rosa 2026-02-28 12:35:01 -05:00 committed by Argenis
parent 579f0f3d9a
commit 1b8d747e1f
3 changed files with 36 additions and 31 deletions

View File

@ -630,8 +630,8 @@ rg -n "Matrix|Telegram|Discord|Slack|Mattermost|Signal|WhatsApp|Email|IRC|Lark|D
| QQ | `QQ: connected and identified` | `QQ: ignoring C2C message from unauthorized user:` / `QQ: ignoring group message from unauthorized user:` | `QQ: received Reconnect (op 7)` / `QQ: received Invalid Session (op 9)` / `QQ: message channel closed` |
| Nextcloud Talk (gateway) | `POST /nextcloud-talk — Nextcloud Talk bot webhook` | `Nextcloud Talk webhook signature verification failed` / `Nextcloud Talk: ignoring message from unauthorized actor:` | `Nextcloud Talk send failed:` / `LLM error for Nextcloud Talk message:` |
| iMessage | `iMessage channel listening (AppleScript bridge)...` | (contact allowlist enforced by `allowed_contacts`) | `iMessage poll error:` |
|| ACP | `ACP channel started` | `ACP: ignoring message from unauthorized user:` | `ACP process exited unexpectedly:` / `ACP JSON-RPC timeout:` / `ACP process spawn failed:` |
|| Nostr | `Nostr channel listening as npub1...` | `Nostr: ignoring NIP-04 message from unauthorized pubkey:` / `Nostr: ignoring NIP-17 message from unauthorized pubkey:` | `Failed to decrypt NIP-04 message:` / `Failed to unwrap NIP-17 gift wrap:` / `Nostr relay pool shut down` |
| ACP | `ACP channel started` | `ACP: ignoring message from unauthorized user:` | `ACP process exited unexpectedly:` / `ACP JSON-RPC timeout:` / `ACP process spawn failed:` |
| Nostr | `Nostr channel listening as npub1...` | `Nostr: ignoring NIP-04 message from unauthorized pubkey:` / `Nostr: ignoring NIP-17 message from unauthorized pubkey:` | `Failed to decrypt NIP-04 message:` / `Failed to unwrap NIP-17 gift wrap:` / `Nostr relay pool shut down` |
### 7.3 Runtime supervisor keywords

View File

@ -42,6 +42,8 @@ pub struct AcpChannel {
client: reqwest::Client,
/// Active OpenCode subprocess and its I/O handles
process: Arc<Mutex<Option<AcpProcess>>>,
/// Serializes ACP send operations to avoid concurrent process take/spawn races.
send_operation_lock: Arc<Mutex<()>>,
/// Next message ID for JSON-RPC requests
next_message_id: Arc<AtomicU64>,
/// Optional response channel for sending ACP responses back to original channel
@ -173,6 +175,7 @@ impl AcpChannel {
pairing: None, // TODO: Implement pairing if needed
client: reqwest::Client::new(),
process: Arc::new(Mutex::new(None)),
send_operation_lock: Arc::new(Mutex::new(())),
next_message_id: Arc::new(AtomicU64::new(0)),
response_channel: None,
}
@ -398,6 +401,8 @@ impl Channel for AcpChannel {
}
async fn send(&self, message: &SendMessage) -> Result<()> {
let _send_guard = self.send_operation_lock.lock().await;
// Check if user is allowed
if !self.is_user_allowed(&message.recipient) {
tracing::warn!(
@ -430,38 +435,37 @@ impl Channel for AcpChannel {
};
// Now we have ownership of the process, no lock held
if let Some(mut process) = process_opt {
let session_id = process
.session_id
.as_ref()
.context("No active ACP session")?
.clone();
let response = self
.send_prompt(&mut process, &session_id, &content)
.await?;
let mut process = process_opt.context("ACP process disappeared unexpectedly")?;
let session_id = process
.session_id
.as_ref()
.context("No active ACP session")?
.clone();
let response_result = self.send_prompt(&mut process, &session_id, &content).await;
// Put the process back
// Always restore process ownership, even when prompting fails.
{
let mut process_guard = self.process.lock().await;
*process_guard = Some(process);
}
// Send response back through response_channel if set
if let Some(response_channel) = &self.response_channel {
let response_message = SendMessage::new(response, message.recipient.clone());
if let Err(e) = response_channel.send(&response_message).await {
tracing::warn!(
"Failed to send ACP response through response channel: {}",
e
);
}
} else {
// Log if no response channel configured
tracing::info!(
"ACP response ready (no response channel configured): {}",
response
let response = response_result?;
// Send response back through response_channel if set
if let Some(response_channel) = &self.response_channel {
let response_message = SendMessage::new(response, message.recipient.clone());
if let Err(e) = response_channel.send(&response_message).await {
tracing::warn!(
"Failed to send ACP response through response channel: {}",
e
);
}
// This should not happen because we just created it above
anyhow::bail!("ACP process disappeared unexpectedly");
} else {
// Log if no response channel configured
tracing::info!(
"ACP response ready (no response channel configured): {}",
response
);
}
Ok(())
@ -477,9 +481,9 @@ impl Channel for AcpChannel {
// listening for incoming messages, we implement a minimal listener
// that just keeps the channel alive.
tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; // Sleep for 1 hour
Ok(())
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
}
async fn health_check(&self) -> bool {

View File

@ -518,6 +518,7 @@ impl std::fmt::Debug for Config {
self.channels_config.dingtalk.is_some(),
self.channels_config.napcat.is_some(),
self.channels_config.qq.is_some(),
self.channels_config.acp.is_some(),
self.channels_config.nostr.is_some(),
self.channels_config.clawdtalk.is_some(),
]