feat(agent): run independent tool calls concurrently in runtime loop

This commit is contained in:
Chummy
2026-02-20 18:48:19 +08:00
parent b26bf262b8
commit f7b2f7a7d7
2 changed files with 425 additions and 82 deletions
+2
View File
@@ -75,6 +75,8 @@ Notes:
- Setting `max_tool_iterations = 0` falls back to safe default `10`.
- If a channel message exceeds this value, the runtime returns: `Agent exceeded maximum tool iterations (<value>)`.
- In CLI, gateway, and channel tool loops, multiple independent tool calls are executed concurrently by default when the pending calls do not require approval gating; result order remains stable.
- `parallel_tools` applies to the `Agent::turn()` API surface. It does not gate the runtime loop used by CLI, gateway, or channel handlers.
## `[agents.<name>]`
+423 -82
View File
@@ -889,6 +889,145 @@ pub(crate) async fn agent_turn(
.await
}
/// Execute a single parsed tool call and render its outcome as a string.
///
/// Unknown tools, tool-reported failures, and execution errors are all
/// surfaced as `Ok(message)` so the loop can feed them back to the model;
/// only cancellation propagates as `Err` (`ToolLoopCancelled`).
async fn execute_one_tool(
    call_name: &str,
    call_arguments: serde_json::Value,
    tools_registry: &[Box<dyn Tool>],
    observer: &dyn Observer,
    cancellation_token: Option<&CancellationToken>,
) -> Result<String> {
    let Some(tool) = find_tool(tools_registry, call_name) else {
        return Ok(format!("Unknown tool: {call_name}"));
    };
    observer.record_event(&ObserverEvent::ToolCallStart {
        tool: call_name.to_string(),
    });
    let started = Instant::now();
    // Race the tool against cancellation when a token is provided; a
    // cancelled run aborts the whole loop without a completion event.
    let outcome = match cancellation_token {
        Some(token) => tokio::select! {
            () = token.cancelled() => return Err(ToolLoopCancelled.into()),
            res = tool.execute(call_arguments) => res,
        },
        None => tool.execute(call_arguments).await,
    };
    // One completion event covers both arms; success is false on Err.
    observer.record_event(&ObserverEvent::ToolCall {
        tool: call_name.to_string(),
        duration: started.elapsed(),
        success: outcome.as_ref().map_or(false, |r| r.success),
    });
    Ok(match outcome {
        Ok(r) if r.success => scrub_credentials(&r.output),
        Ok(r) => format!("Error: {}", r.error.unwrap_or_else(|| r.output)),
        Err(e) => format!("Error executing {call_name}: {e}"),
    })
}
/// Decide whether the pending tool calls may run concurrently.
///
/// Returns `true` only for batches of two or more calls in which no call
/// is approval-gated. Approval-gated calls must keep sequential handling
/// so the caller can enforce CLI prompt/deny policy consistently.
fn should_execute_tools_in_parallel(
    tool_calls: &[ParsedToolCall],
    approval: Option<&ApprovalManager>,
) -> bool {
    tool_calls.len() > 1
        && approval.map_or(true, |mgr| {
            tool_calls.iter().all(|call| !mgr.needs_approval(&call.name))
        })
}
/// Run every tool call concurrently and return outputs in input order.
///
/// `join_all` yields results in the order of its input futures, so output
/// slot `i` always corresponds to `tool_calls[i]`. A cancellation `Err`
/// from any call propagates once all futures have settled.
async fn execute_tools_parallel(
    tool_calls: &[ParsedToolCall],
    tools_registry: &[Box<dyn Tool>],
    observer: &dyn Observer,
    cancellation_token: Option<&CancellationToken>,
) -> Result<Vec<String>> {
    futures::future::join_all(tool_calls.iter().map(|call| {
        execute_one_tool(
            &call.name,
            call.arguments.clone(),
            tools_registry,
            observer,
            cancellation_token,
        )
    }))
    .await
    .into_iter()
    .collect()
}
/// Run tool calls one at a time, applying the approval gate per call.
///
/// On the CLI channel a gated call prompts the user interactively; every
/// other channel denies it outright. Denied calls contribute
/// "Denied by user." so output order stays aligned with `tool_calls`.
async fn execute_tools_sequential(
    tool_calls: &[ParsedToolCall],
    tools_registry: &[Box<dyn Tool>],
    observer: &dyn Observer,
    approval: Option<&ApprovalManager>,
    channel_name: &str,
    cancellation_token: Option<&CancellationToken>,
) -> Result<Vec<String>> {
    let mut outputs: Vec<String> = Vec::with_capacity(tool_calls.len());
    for call in tool_calls {
        // ── Approval hook ────────────────────────────────
        if let Some(mgr) = approval.filter(|m| m.needs_approval(&call.name)) {
            let request = ApprovalRequest {
                tool_name: call.name.clone(),
                arguments: call.arguments.clone(),
            };
            // Interactive prompting only exists on CLI; other channels
            // deny to respect the supervised autonomy setting.
            let decision = match channel_name {
                "cli" => mgr.prompt_cli(&request),
                _ => ApprovalResponse::No,
            };
            mgr.record_decision(&call.name, &call.arguments, decision, channel_name);
            if decision == ApprovalResponse::No {
                outputs.push("Denied by user.".to_string());
                continue;
            }
        }
        let output = execute_one_tool(
            &call.name,
            call.arguments.clone(),
            tools_registry,
            observer,
            cancellation_token,
        )
        .await?;
        outputs.push(output);
    }
    Ok(outputs)
}
// ── Agent Tool-Call Loop ──────────────────────────────────────────────────
// Core agentic iteration: send conversation to the LLM, parse any tool
// calls from the response, execute them, append results to history, and
@@ -1085,86 +1224,34 @@ pub(crate) async fn run_tool_call_loop(
let _ = std::io::stdout().flush();
}
// Execute each tool call and build results.
// `individual_results` tracks per-call output so that native-mode history
// can emit one `role: tool` message per tool call with the correct ID.
// Execute tool calls and build results. `individual_results` tracks per-call output so
// native-mode history can emit one role=tool message per tool call with the correct ID.
//
// When multiple tool calls are present and interactive CLI approval is not needed, run
// tool executions concurrently for lower wall-clock latency.
let mut tool_results = String::new();
let mut individual_results: Vec<String> = Vec::new();
for call in &tool_calls {
// ── Approval hook ────────────────────────────────
if let Some(mgr) = approval {
if mgr.needs_approval(&call.name) {
let request = ApprovalRequest {
tool_name: call.name.clone(),
arguments: call.arguments.clone(),
};
let should_parallel = should_execute_tools_in_parallel(&tool_calls, approval);
let individual_results = if should_parallel {
execute_tools_parallel(
&tool_calls,
tools_registry,
observer,
cancellation_token.as_ref(),
)
.await?
} else {
execute_tools_sequential(
&tool_calls,
tools_registry,
observer,
approval,
channel_name,
cancellation_token.as_ref(),
)
.await?
};
// On CLI, prompt interactively. On other channels where
// interactive approval is not possible, deny the call to
// respect the supervised autonomy setting.
let decision = if channel_name == "cli" {
mgr.prompt_cli(&request)
} else {
ApprovalResponse::No
};
mgr.record_decision(&call.name, &call.arguments, decision, channel_name);
if decision == ApprovalResponse::No {
let denied = "Denied by user.".to_string();
individual_results.push(denied.clone());
let _ = writeln!(
tool_results,
"<tool_result name=\"{}\">\n{denied}\n</tool_result>",
call.name
);
continue;
}
}
}
observer.record_event(&ObserverEvent::ToolCallStart {
tool: call.name.clone(),
});
let start = Instant::now();
let result = if let Some(tool) = find_tool(tools_registry, &call.name) {
let tool_future = tool.execute(call.arguments.clone());
let tool_result = if let Some(token) = cancellation_token.as_ref() {
tokio::select! {
() = token.cancelled() => return Err(ToolLoopCancelled.into()),
result = tool_future => result,
}
} else {
tool_future.await
};
match tool_result {
Ok(r) => {
observer.record_event(&ObserverEvent::ToolCall {
tool: call.name.clone(),
duration: start.elapsed(),
success: r.success,
});
if r.success {
scrub_credentials(&r.output)
} else {
format!("Error: {}", r.error.unwrap_or_else(|| r.output))
}
}
Err(e) => {
observer.record_event(&ObserverEvent::ToolCall {
tool: call.name.clone(),
duration: start.elapsed(),
success: false,
});
format!("Error executing {}: {e}", call.name)
}
}
} else {
format!("Unknown tool: {}", call.name)
};
individual_results.push(result.clone());
for (call, result) in tool_calls.iter().zip(individual_results.iter()) {
let _ = writeln!(
tool_results,
"<tool_result name=\"{}\">\n{}\n</tool_result>",
@@ -1608,9 +1695,7 @@ pub async fn run(
}
// Auto-save conversation turns (skip short/trivial messages)
if config.memory.auto_save
&& user_input.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS
{
if config.memory.auto_save && user_input.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
let user_key = autosave_memory_key("user_msg");
let _ = mem
.store(&user_key, &user_input, MemoryCategory::Conversation, None)
@@ -1881,8 +1966,10 @@ mod tests {
use super::*;
use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD, Engine as _};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[test]
fn test_scrub_credentials() {
@@ -1973,6 +2060,121 @@ mod tests {
}
}
/// Test provider that replays a fixed queue of canned chat responses.
struct ScriptedProvider {
    // FIFO of scripted responses; one is popped per `chat` call.
    responses: Arc<Mutex<VecDeque<ChatResponse>>>,
}
impl ScriptedProvider {
    /// Build a provider whose scripted responses are plain text
    /// (no native tool calls attached).
    fn from_text_responses(responses: Vec<&str>) -> Self {
        let queue: VecDeque<ChatResponse> = responses
            .iter()
            .map(|&text| ChatResponse {
                text: Some(text.to_string()),
                tool_calls: Vec::new(),
            })
            .collect();
        Self {
            responses: Arc::new(Mutex::new(queue)),
        }
    }
}
#[async_trait]
impl Provider for ScriptedProvider {
    async fn chat_with_system(
        &self,
        _system_prompt: Option<&str>,
        _message: &str,
        _model: &str,
        _temperature: f64,
    ) -> anyhow::Result<String> {
        // Scripted tests drive the loop through `chat`; this path is unsupported.
        anyhow::bail!("chat_with_system should not be used in scripted provider tests");
    }
    async fn chat(
        &self,
        _request: ChatRequest<'_>,
        _model: &str,
        _temperature: f64,
    ) -> anyhow::Result<ChatResponse> {
        let mut queue = self
            .responses
            .lock()
            .expect("responses lock should be valid");
        // Pop the next scripted response; running dry is a test bug.
        match queue.pop_front() {
            Some(response) => Ok(response),
            None => Err(anyhow::anyhow!("scripted provider exhausted responses")),
        }
    }
}
/// Test tool that sleeps for a fixed duration while tracking concurrency.
struct DelayTool {
    // Registered tool name (tests use a distinct name per instance).
    name: String,
    // How long `execute` sleeps, in milliseconds.
    delay_ms: u64,
    // Count of currently-running executions, shared across instances.
    active: Arc<AtomicUsize>,
    // High-water mark of `active`; lets tests prove executions overlapped.
    max_active: Arc<AtomicUsize>,
}
impl DelayTool {
    /// Construct a delay tool wired to the shared concurrency counters.
    fn new(
        name: &str,
        delay_ms: u64,
        active: Arc<AtomicUsize>,
        max_active: Arc<AtomicUsize>,
    ) -> Self {
        Self {
            name: name.to_owned(),
            delay_ms,
            active,
            max_active,
        }
    }
}
#[async_trait]
impl Tool for DelayTool {
    fn name(&self) -> &str {
        &self.name
    }
    fn description(&self) -> &str {
        "Delay tool for testing parallel tool execution"
    }
    fn parameters_schema(&self) -> serde_json::Value {
        serde_json::json!({
            "type": "object",
            "properties": {
                "value": { "type": "string" }
            },
            "required": ["value"]
        })
    }
    async fn execute(
        &self,
        args: serde_json::Value,
    ) -> anyhow::Result<crate::tools::ToolResult> {
        // Bump the in-flight counter and fold it into the shared maximum so
        // the parallel test can assert that executions actually overlapped.
        let in_flight = self.active.fetch_add(1, Ordering::SeqCst) + 1;
        self.max_active.fetch_max(in_flight, Ordering::SeqCst);
        tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
        self.active.fetch_sub(1, Ordering::SeqCst);
        // Echo the "value" argument back so callers can match results.
        let echoed = args
            .get("value")
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default()
            .to_owned();
        Ok(crate::tools::ToolResult {
            success: true,
            output: format!("ok:{echoed}"),
            error: None,
        })
    }
}
#[tokio::test]
async fn run_tool_call_loop_returns_structured_error_for_non_vision_provider() {
let calls = Arc::new(AtomicUsize::new(0));
@@ -2091,6 +2293,145 @@ mod tests {
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[test]
fn should_execute_tools_in_parallel_returns_false_for_single_call() {
    // A lone call has nothing to run alongside, even without approvals.
    let single = vec![ParsedToolCall {
        name: "file_read".to_string(),
        arguments: serde_json::json!({"path": "a.txt"}),
    }];
    assert!(!should_execute_tools_in_parallel(&single, None));
}
#[test]
fn should_execute_tools_in_parallel_returns_false_when_approval_is_required() {
    let gated_calls = vec![
        ParsedToolCall {
            name: "shell".to_string(),
            arguments: serde_json::json!({"command": "pwd"}),
        },
        ParsedToolCall {
            name: "http_request".to_string(),
            arguments: serde_json::json!({"url": "https://example.com"}),
        },
    ];
    // Under the default autonomy config at least one of these calls is
    // approval-gated, which must force the sequential path.
    let mgr = ApprovalManager::from_config(&crate::config::AutonomyConfig::default());
    assert!(!should_execute_tools_in_parallel(&gated_calls, Some(&mgr)));
}
#[test]
fn should_execute_tools_in_parallel_returns_true_when_cli_has_no_interactive_approvals() {
    let free_calls = vec![
        ParsedToolCall {
            name: "shell".to_string(),
            arguments: serde_json::json!({"command": "pwd"}),
        },
        ParsedToolCall {
            name: "http_request".to_string(),
            arguments: serde_json::json!({"url": "https://example.com"}),
        },
    ];
    // Full autonomy removes approval gating, so parallel execution is allowed.
    let cfg = crate::config::AutonomyConfig {
        level: crate::security::AutonomyLevel::Full,
        ..crate::config::AutonomyConfig::default()
    };
    let mgr = ApprovalManager::from_config(&cfg);
    assert!(should_execute_tools_in_parallel(&free_calls, Some(&mgr)));
}
/// End-to-end check that two independent tool calls overlap in time and
/// that their results are emitted in the original call order.
#[tokio::test]
async fn run_tool_call_loop_executes_multiple_tools_in_parallel_with_ordered_results() {
    // First scripted response issues two tool calls; "done" ends the loop.
    let provider = ScriptedProvider::from_text_responses(vec![
        r#"<tool_call>
{"name":"delay_a","arguments":{"value":"A"}}
</tool_call>
<tool_call>
{"name":"delay_b","arguments":{"value":"B"}}
</tool_call>"#,
        "done",
    ]);
    // Shared counters: both DelayTools report into the same high-water mark.
    let active = Arc::new(AtomicUsize::new(0));
    let max_active = Arc::new(AtomicUsize::new(0));
    let tools_registry: Vec<Box<dyn Tool>> = vec![
        Box::new(DelayTool::new(
            "delay_a",
            200,
            Arc::clone(&active),
            Arc::clone(&max_active),
        )),
        Box::new(DelayTool::new(
            "delay_b",
            200,
            Arc::clone(&active),
            Arc::clone(&max_active),
        )),
    ];
    // Full autonomy: no approval gating, so the parallel path is eligible.
    let approval_cfg = crate::config::AutonomyConfig {
        level: crate::security::AutonomyLevel::Full,
        ..crate::config::AutonomyConfig::default()
    };
    let approval_mgr = ApprovalManager::from_config(&approval_cfg);
    let mut history = vec![
        ChatMessage::system("test-system"),
        ChatMessage::user("run tool calls"),
    ];
    let observer = NoopObserver;
    let started = std::time::Instant::now();
    let result = run_tool_call_loop(
        &provider,
        &mut history,
        &tools_registry,
        &observer,
        "mock-provider",
        "mock-model",
        0.0,
        true,
        Some(&approval_mgr),
        "telegram",
        &crate::config::MultimodalConfig::default(),
        4,
        None,
        None,
    )
    .await
    .expect("parallel execution should complete");
    let elapsed = started.elapsed();
    assert_eq!(result, "done");
    // Two 200 ms tools run back-to-back would take >= 400 ms; finishing
    // under 350 ms implies overlap. NOTE(review): wall-clock bounds can be
    // flaky on loaded CI — confirm this margin holds in practice.
    assert!(
        elapsed < Duration::from_millis(350),
        "parallel execution should be faster than sequential fallback; elapsed={elapsed:?}"
    );
    assert!(
        max_active.load(Ordering::SeqCst) >= 2,
        "both tools should overlap in execution"
    );
    // The loop folds outputs into a user message starting with
    // "[Tool results]"; the <tool_result> entries must keep call order.
    let tool_results_message = history
        .iter()
        .find(|msg| msg.role == "user" && msg.content.starts_with("[Tool results]"))
        .expect("tool results message should be present");
    let idx_a = tool_results_message
        .content
        .find("name=\"delay_a\"")
        .expect("delay_a result should be present");
    let idx_b = tool_results_message
        .content
        .find("name=\"delay_b\"")
        .expect("delay_b result should be present");
    assert!(
        idx_a < idx_b,
        "tool results should preserve input order for tool call mapping"
    );
}
#[test]
fn parse_tool_calls_extracts_single_call() {
let response = r#"Let me check that.