//! Sub-agent spawn tool for background execution. //! //! Implements the `subagent_spawn` tool that launches delegate agents //! asynchronously via `tokio::spawn`, returning a session ID immediately. //! See `AGENTS.md` ยง7.3 for the tool change playbook. use super::agent_load_tracker::AgentLoadTracker; use super::agent_selection::{select_agent_with_load, AgentSelectionPolicy}; use super::orchestration_settings::load_orchestration_settings; use super::subagent_registry::{SubAgentRegistry, SubAgentSession, SubAgentStatus}; use super::traits::{Tool, ToolResult}; use crate::config::{DelegateAgentConfig, SubAgentsConfig}; use crate::observability::traits::{Observer, ObserverEvent, ObserverMetric}; use crate::providers::{self, ChatMessage, Provider}; use crate::security::policy::ToolOperation; use crate::security::SecurityPolicy; use async_trait::async_trait; use chrono::Utc; use serde_json::json; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; /// Default timeout for background sub-agent provider calls. const SPAWN_TIMEOUT_SECS: u64 = 300; /// Tool that spawns a delegate agent in the background, returning immediately /// with a session ID. The sub-agent runs asynchronously and stores its result /// in the shared [`SubAgentRegistry`]. 
pub struct SubAgentSpawnTool {
    /// Delegate agent configurations keyed by agent name.
    agents: Arc<HashMap<String, DelegateAgentConfig>>,
    /// Security policy consulted before any spawn (spawn counts as an "act" operation).
    security: Arc<SecurityPolicy>,
    /// Credential used when an agent config does not carry its own `api_key`.
    fallback_credential: Option<String>,
    provider_runtime_options: providers::ProviderRuntimeOptions,
    /// Shared registry where background sessions and their results are stored.
    registry: Arc<SubAgentRegistry>,
    /// Tools inherited from the parent agent; filtered per-agent for agentic runs.
    parent_tools: Arc<Vec<Arc<dyn Tool>>>,
    multimodal_config: crate::config::MultimodalConfig,
    /// Baseline settings; may be overridden at runtime via `runtime_config_path`.
    subagent_settings: SubAgentsConfig,
    /// Tracks per-agent in-flight load and recent outcomes for selection scoring.
    load_tracker: AgentLoadTracker,
    /// Optional config file that is hot-reloaded on every spawn attempt.
    runtime_config_path: Option<PathBuf>,
}

impl SubAgentSpawnTool {
    /// Build the tool. `max_concurrent_subagents` is clamped to at least 1.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        agents: HashMap<String, DelegateAgentConfig>,
        fallback_credential: Option<String>,
        security: Arc<SecurityPolicy>,
        provider_runtime_options: providers::ProviderRuntimeOptions,
        registry: Arc<SubAgentRegistry>,
        parent_tools: Arc<Vec<Arc<dyn Tool>>>,
        multimodal_config: crate::config::MultimodalConfig,
        subagents_enabled: bool,
        max_concurrent_subagents: usize,
        auto_activate: bool,
        runtime_config_path: Option<PathBuf>,
    ) -> Self {
        // Struct-update syntax instead of default-then-reassign
        // (avoids clippy::field_reassign_with_default).
        let subagent_settings = SubAgentsConfig {
            enabled: subagents_enabled,
            max_concurrent: max_concurrent_subagents.max(1),
            auto_activate,
            ..SubAgentsConfig::default()
        };
        Self {
            agents: Arc::new(agents),
            security,
            fallback_credential,
            provider_runtime_options,
            registry,
            parent_tools,
            multimodal_config,
            subagent_settings,
            load_tracker: AgentLoadTracker::new(),
            runtime_config_path,
        }
    }

    /// Reuse a shared runtime load tracker.
pub fn with_load_tracker(mut self, load_tracker: AgentLoadTracker) -> Self {
    self.load_tracker = load_tracker;
    self
}

/// Resolve the effective subagent settings for this spawn attempt.
///
/// Starts from the construction-time settings, then hot-reloads
/// `runtime_config_path` (if set) so runtime edits take effect without a
/// restart. All clamp-to-1 floors are re-applied after the reload because
/// the file contents are untrusted.
fn runtime_subagent_settings(&self) -> SubAgentsConfig {
    let mut settings = self.subagent_settings.clone();
    settings.max_concurrent = settings.max_concurrent.max(1);
    settings.load_window_secs = settings.load_window_secs.max(1);
    settings.queue_poll_ms = settings.queue_poll_ms.max(1);
    if let Some(path) = self.runtime_config_path.as_deref() {
        match load_orchestration_settings(path) {
            Ok((_teams, subagents)) => {
                settings = subagents;
                settings.max_concurrent = settings.max_concurrent.max(1);
                settings.load_window_secs = settings.load_window_secs.max(1);
                settings.queue_poll_ms = settings.queue_poll_ms.max(1);
            }
            Err(error) => {
                // A broken runtime config must not block spawning; keep the
                // baseline settings and log at debug level only.
                tracing::debug!(
                    path = %path.display(),
                    "subagent_spawn: failed to hot-reload orchestration settings: {error}"
                );
            }
        }
    }
    settings
}

/// Try to register `session` in the registry, polling for a free slot until
/// `queue_wait_ms` elapses.
///
/// Returns `Ok(())` once the session is inserted, or `Err(running_count)`
/// when the concurrency limit was still saturated at the deadline. The
/// registry hands the session back on a failed insert so it can be retried
/// without cloning.
async fn wait_for_slot_and_insert(
    &self,
    mut session: SubAgentSession,
    settings: &SubAgentsConfig,
) -> Result<(), usize> {
    let max_concurrent = settings.max_concurrent.max(1);
    match self.registry.try_insert(session, max_concurrent) {
        Ok(()) => return Ok(()),
        Err((running, returned)) => {
            // queue_wait_ms == 0 means "fail fast, never queue".
            if settings.queue_wait_ms == 0 {
                return Err(running);
            }
            session = *returned;
        }
    }
    let poll_ms = settings.queue_poll_ms.max(1);
    let wait_deadline = tokio::time::Instant::now()
        + Duration::from_millis(u64::try_from(settings.queue_wait_ms).unwrap_or(u64::MAX));
    let poll_duration = Duration::from_millis(u64::try_from(poll_ms).unwrap_or(1));
    let mut last_running = self.registry.running_count();
    while tokio::time::Instant::now() < wait_deadline {
        tokio::time::sleep(poll_duration).await;
        match self.registry.try_insert(session, max_concurrent) {
            Ok(()) => return Ok(()),
            Err((running, returned)) => {
                last_running = running;
                session = *returned;
            }
        }
    }
    Err(last_running)
}
}

#[async_trait]
impl Tool for SubAgentSpawnTool {
    fn name(&self) -> &str {
        "subagent_spawn"
    }

    fn description(&self) -> &str {
        "Spawn a delegate agent in the background. Returns immediately with a session_id. \
         `agent` can be omitted or set to `auto` when subagent auto-activation is enabled. \
         Use subagent_list to check progress and subagent_manage to steer or kill."
    }

    fn parameters_schema(&self) -> serde_json::Value {
        // Advertise the configured agent names directly in the schema so the
        // model can pick a valid one without a separate listing call.
        let agent_names: Vec<&str> = self.agents.keys().map(|s: &String| s.as_str()).collect();
        json!({
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "agent": {
                    "type": "string",
                    "minLength": 1,
                    "description": format!(
                        "Name of the agent to spawn. Available: {}",
                        if agent_names.is_empty() {
                            "(none configured)".to_string()
                        } else {
                            agent_names.join(", ")
                        }
                    )
                },
                "task": {
                    "type": "string",
                    "minLength": 1,
                    "description": "The task/prompt to send to the sub-agent"
                },
                "context": {
                    "type": "string",
                    "description": "Optional context to prepend (e.g. relevant code, prior findings)"
                }
            },
            "required": ["task"]
        })
    }

    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
        let requested_agent = args.get("agent").and_then(|v| v.as_str()).map(str::trim);
        let task = args
            .get("task")
            .and_then(|v| v.as_str())
            .map(str::trim)
            .ok_or_else(|| anyhow::anyhow!("Missing 'task' parameter"))?;
        if task.is_empty() {
            return Ok(ToolResult {
                success: false,
                output: String::new(),
                error: Some("'task' parameter must not be empty".into()),
            });
        }
        let context = args
            .get("context")
            .and_then(|v| v.as_str())
            .map(str::trim)
            .unwrap_or("");
        // Hot-reload settings per call so runtime enable/disable is honored.
        let subagent_settings = self.runtime_subagent_settings();
        if !subagent_settings.enabled {
            return Ok(ToolResult {
                success: false,
                output: String::new(),
                error: Some(
                    "Subagents are currently disabled. Re-enable with model_routing_config action set_orchestration."
                        .to_string(),
                ),
            });
        }
        // Security enforcement: spawn is a write operation
        if let Err(error) = self
            .security
            .enforce_tool_operation(ToolOperation::Act, "subagent_spawn")
        {
            return Ok(ToolResult {
                success: false,
                output: String::new(),
                error: Some(error),
            });
        }
        let load_window_secs = u64::try_from(subagent_settings.load_window_secs).unwrap_or(1);
        let load_snapshot = self
            .load_tracker
            .snapshot(Duration::from_secs(load_window_secs.max(1)));
        let selection_policy = AgentSelectionPolicy {
            strategy: subagent_settings.strategy,
            inflight_penalty: subagent_settings.inflight_penalty,
            recent_selection_penalty: subagent_settings.recent_selection_penalty,
            recent_failure_penalty: subagent_settings.recent_failure_penalty,
        };
        // Resolve the target agent: explicit name, or load-aware auto-selection
        // when auto_activate allows it.
        let selection = match select_agent_with_load(
            self.agents.as_ref(),
            requested_agent,
            task,
            context,
            subagent_settings.auto_activate,
            None,
            Some(&load_snapshot),
            selection_policy,
        ) {
            Ok(selection) => selection,
            Err(error) => {
                return Ok(ToolResult {
                    success: false,
                    output: String::new(),
                    error: Some(error.to_string()),
                });
            }
        };
        let agent_name = selection.agent_name.clone();
        let Some(agent_config) = self.agents.get(&agent_name).cloned() else {
            return Ok(ToolResult {
                success: false,
                output: String::new(),
                error: Some(format!("Resolved agent '{agent_name}' is unavailable")),
            });
        };
        // Create provider for this agent
        let provider_credential_owned = agent_config
            .api_key
            .clone()
            .or_else(|| self.fallback_credential.clone());
        // `as_deref` is the idiomatic Option<String> -> Option<&str> borrow
        // (replaces as_ref().map(String::as_str) and its clippy allow).
        let provider_credential = provider_credential_owned.as_deref();
        let provider: Box<dyn Provider> = match providers::create_provider_with_options(
            &agent_config.provider,
            provider_credential,
            &self.provider_runtime_options,
        ) {
            Ok(p) => p,
            Err(e) => {
                self.load_tracker.record_failure(&agent_name);
                return Ok(ToolResult {
                    success: false,
                    output: String::new(),
                    error: Some(format!(
                        "Failed to create provider '{}' for agent '{agent_name}': {e}",
                        agent_config.provider
                    )),
                });
            }
        };
        // Build the message
        let
full_prompt = if context.is_empty() {
    task.to_string()
} else {
    format!("[Context]\n{context}\n\n[Task]\n{task}")
};
let session_id = uuid::Uuid::new_v4().to_string();
let agent_name_owned = agent_name.clone();
let task_owned = task.to_string();
// Determine if agentic mode
let is_agentic = agent_config.agentic;
let parent_tools = self.parent_tools.clone();
let multimodal_config = self.multimodal_config.clone();
// Start the load lease before inserting so a queued/rejected spawn is
// visible to the selection scorer as a failure.
let mut load_lease = self.load_tracker.start(&agent_name_owned);
// Atomically check concurrent limit and register session to prevent race conditions.
let session = SubAgentSession {
    id: session_id.clone(),
    agent_name: agent_name_owned.clone(),
    task: task_owned,
    status: SubAgentStatus::Running,
    started_at: Utc::now(),
    completed_at: None,
    result: None,
    handle: None,
};
if let Err(running) = self
    .wait_for_slot_and_insert(session, &subagent_settings)
    .await
{
    load_lease.mark_failure();
    return Ok(ToolResult {
        success: false,
        output: String::new(),
        error: Some(format!(
            "Maximum concurrent sub-agents reached ({limit}), currently running {running}. \
             Wait for running agents to complete, tune queue_wait_ms/queue_poll_ms, or kill some.",
            limit = subagent_settings.max_concurrent
        )),
    });
}
// Clone what we need for the spawned task
let registry = self.registry.clone();
let sid = session_id.clone();
let mut bg_load_lease = load_lease;
// Detached background task: outcome is written into the registry, not
// returned to the caller.
let handle = tokio::spawn(async move {
    let result = if is_agentic {
        run_agentic_background(
            &agent_name_owned,
            &agent_config,
            &*provider,
            &full_prompt,
            &parent_tools,
            &multimodal_config,
        )
        .await
    } else {
        run_simple_background(&agent_name_owned, &agent_config, &*provider, &full_prompt)
            .await
    };
    match result {
        Ok(tool_result) => {
            if tool_result.success {
                registry.complete(&sid, tool_result);
                bg_load_lease.mark_success();
            } else {
                registry.fail(
                    &sid,
                    tool_result
                        .error
                        .unwrap_or_else(|| "Unknown error".to_string()),
                );
                bg_load_lease.mark_failure();
            }
        }
        Err(e) => {
            registry.fail(&sid, format!("Agent '{agent_name_owned}' error: {e}"));
            bg_load_lease.mark_failure();
        }
    }
});
// Store the handle for cancellation
// NOTE(review): the spawned task can finish before set_handle runs; assumes
// the registry tolerates setting a handle on a completed session — confirm.
self.registry.set_handle(&session_id, handle);
Ok(ToolResult {
    success: true,
    output: json!({
        "session_id": session_id,
        "agent": agent_name,
        "selection_mode": selection.selection_mode,
        "selection_score": selection.score,
        "max_concurrent": subagent_settings.max_concurrent,
        "queue_wait_ms": subagent_settings.queue_wait_ms,
        "queue_poll_ms": subagent_settings.queue_poll_ms,
        "status": "running",
        "message": "Sub-agent spawned in background. Use subagent_list or subagent_manage to check progress."
    })
    .to_string(),
    error: None,
})
}
}

/// Run a non-agentic sub-agent: a single chat completion with the agent's
/// system prompt, bounded by [`SPAWN_TIMEOUT_SECS`]. Errors and timeouts are
/// reported as unsuccessful `ToolResult`s rather than `Err`, so the caller
/// can store them in the registry verbatim.
async fn run_simple_background(
    agent_name: &str,
    agent_config: &DelegateAgentConfig,
    provider: &dyn Provider,
    full_prompt: &str,
) -> anyhow::Result<ToolResult> {
    let temperature = agent_config.temperature.unwrap_or(0.7);
    let result = tokio::time::timeout(
        Duration::from_secs(SPAWN_TIMEOUT_SECS),
        provider.chat_with_system(
            agent_config.system_prompt.as_deref(),
            full_prompt,
            &agent_config.model,
            temperature,
        ),
    )
    .await;
    // Outer Err = timeout elapsed; inner Result = provider outcome.
    let result = match result {
        Ok(inner) => inner,
        Err(_elapsed) => {
            return Ok(ToolResult {
                success: false,
                output: String::new(),
                error: Some(format!(
                    "Agent '{agent_name}' timed out after {SPAWN_TIMEOUT_SECS}s"
                )),
            });
        }
    };
    match result {
        Ok(response) => {
            let rendered = if response.trim().is_empty() {
                "[Empty response]".to_string()
            } else {
                response
            };
            Ok(ToolResult {
                success: true,
                output: format!(
                    "[Agent '{agent_name}' ({provider}/{model})]\n{rendered}",
                    provider = agent_config.provider,
                    model = agent_config.model
                ),
                error: None,
            })
        }
        Err(e) => Ok(ToolResult {
            success: false,
            output: String::new(),
            error: Some(format!("Agent '{agent_name}' failed: {e}")),
        }),
    }
}

/// Adapter that lets an `Arc<dyn Tool>` be used where a `Box<dyn Tool>` is
/// required (the agentic loop takes boxed tools; the parent shares arcs).
struct ToolArcRef {
    inner: Arc<dyn Tool>,
}

impl ToolArcRef {
    fn new(inner: Arc<dyn Tool>) -> Self {
        Self { inner }
    }
}

#[async_trait]
impl Tool for ToolArcRef {
    fn name(&self) -> &str {
        self.inner.name()
    }
    fn description(&self) -> &str {
        self.inner.description()
    }
    fn parameters_schema(&self) -> serde_json::Value {
        self.inner.parameters_schema()
    }
    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
        self.inner.execute(args).await
    }
}

/// Observer that discards all events/metrics; background sub-agents do not
/// report into the parent's observability pipeline.
struct NoopObserver;

impl Observer for NoopObserver {
    fn record_event(&self, _event: &ObserverEvent) {}
    fn record_metric(&self, _metric: &ObserverMetric) {}
    fn name(&self) -> &str {
        "noop"
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

/// Run an agentic sub-agent: a full tool-call loop restricted to the agent's
/// allowlisted tools, bounded by [`SPAWN_TIMEOUT_SECS`].
async fn run_agentic_background(
    agent_name: &str,
    agent_config: &DelegateAgentConfig,
    provider: &dyn Provider,
    full_prompt: &str,
    parent_tools: &[Arc<dyn Tool>],
    multimodal_config:
    &crate::config::MultimodalConfig,
) -> anyhow::Result<ToolResult> {
    // An agentic agent with no allowlist is a configuration error.
    if agent_config.allowed_tools.is_empty() {
        return Ok(ToolResult {
            success: false,
            output: String::new(),
            error: Some(format!(
                "Agent '{agent_name}' has agentic=true but allowed_tools is empty"
            )),
        });
    }
    let allowed = agent_config
        .allowed_tools
        .iter()
        .map(|name| name.trim())
        .filter(|name| !name.is_empty())
        .collect::<std::collections::HashSet<_>>();
    // Orchestration tools are excluded regardless of the allowlist so a
    // sub-agent cannot recursively spawn or manage other sub-agents.
    let sub_tools: Vec<Box<dyn Tool>> = parent_tools
        .iter()
        .filter(|tool| allowed.contains(tool.name()))
        .filter(|tool| {
            tool.name() != "delegate"
                && tool.name() != "subagent_spawn"
                && tool.name() != "subagent_manage"
        })
        .map(|tool| Box::new(ToolArcRef::new(tool.clone())) as Box<dyn Tool>)
        .collect();
    if sub_tools.is_empty() {
        return Ok(ToolResult {
            success: false,
            output: String::new(),
            error: Some(format!(
                "Agent '{agent_name}' has no executable tools after filtering allowlist ({})",
                agent_config.allowed_tools.join(", ")
            )),
        });
    }
    let temperature = agent_config.temperature.unwrap_or(0.7);
    let mut history = Vec::new();
    if let Some(system_prompt) = agent_config.system_prompt.as_ref() {
        history.push(ChatMessage::system(system_prompt.clone()));
    }
    history.push(ChatMessage::user(full_prompt.to_string()));
    let noop_observer = NoopObserver;
    let result = tokio::time::timeout(
        Duration::from_secs(SPAWN_TIMEOUT_SECS),
        crate::agent::loop_::run_tool_call_loop(
            provider,
            &mut history,
            &sub_tools,
            &noop_observer,
            &agent_config.provider,
            &agent_config.model,
            temperature,
            true,
            None,
            "subagent_spawn",
            multimodal_config,
            agent_config.max_iterations,
            None,
            None,
            None,
            &[],
        ),
    )
    .await;
    // Ok(Ok) = loop finished, Ok(Err) = loop failed, Err = timeout elapsed.
    match result {
        Ok(Ok(response)) => {
            let rendered = if response.trim().is_empty() {
                "[Empty response]".to_string()
            } else {
                response
            };
            Ok(ToolResult {
                success: true,
                output: format!(
                    "[Agent '{agent_name}' ({provider}/{model}, agentic)]\n{rendered}",
                    provider = agent_config.provider,
                    model = agent_config.model
                ),
                error: None,
            })
        }
        Ok(Err(e)) => Ok(ToolResult {
            success: false,
            output: String::new(),
            error: Some(format!("Agent '{agent_name}' failed: {e}")),
        }),
        Err(_) => Ok(ToolResult {
            success: false,
            output: String::new(),
            error: Some(format!(
                "Agent '{agent_name}' timed out after {SPAWN_TIMEOUT_SECS}s"
            )),
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::security::{AutonomyLevel, SecurityPolicy};
    use tempfile::TempDir;

    /// Default (permissive) security policy for tests.
    fn test_security() -> Arc<SecurityPolicy> {
        Arc::new(SecurityPolicy::default())
    }

    /// Single non-agentic "researcher" agent on the ollama provider.
    fn sample_agents() -> HashMap<String, DelegateAgentConfig> {
        let mut agents = HashMap::new();
        agents.insert(
            "researcher".to_string(),
            DelegateAgentConfig {
                provider: "ollama".to_string(),
                model: "llama3".to_string(),
                system_prompt: Some("You are a research assistant.".to_string()),
                api_key: None,
                enabled: true,
                capabilities: vec!["research".to_string()],
                priority: 0,
                temperature: Some(0.3),
                max_depth: 3,
                agentic: false,
                allowed_tools: Vec::new(),
                max_iterations: 10,
            },
        );
        agents
    }

    /// Write a minimal runtime config file exercising the hot-reload path.
    #[allow(clippy::fn_params_excessive_bools)]
    fn write_runtime_orchestration_config(
        path: &std::path::Path,
        teams_enabled: bool,
        teams_auto_activate: bool,
        teams_max_agents: usize,
        subagents_enabled: bool,
        subagents_auto_activate: bool,
        subagents_max_concurrent: usize,
    ) {
        let contents = format!(
            r#"
default_provider = "openrouter"
default_model = "anthropic/claude-sonnet-4.6"
default_temperature = 0.7

[agent.teams]
enabled = {teams_enabled}
auto_activate = {teams_auto_activate}
max_agents = {teams_max_agents}

[agent.subagents]
enabled = {subagents_enabled}
auto_activate = {subagents_auto_activate}
max_concurrent = {subagents_max_concurrent}
queue_wait_ms = 0
queue_poll_ms = 10
"#
        );
        std::fs::write(path, contents).unwrap();
    }

    /// Tool with subagents enabled, limit 10, auto-activation, no runtime config.
    fn make_tool(
        agents: HashMap<String, DelegateAgentConfig>,
        security: Arc<SecurityPolicy>,
    ) -> SubAgentSpawnTool {
        SubAgentSpawnTool::new(
            agents,
            None,
            security,
            providers::ProviderRuntimeOptions::default(),
            Arc::new(SubAgentRegistry::new()),
            Arc::new(Vec::new()),
            crate::config::MultimodalConfig::default(),
            true,
            10,
            true,
            None,
        )
    }

    #[test]
    fn name_and_schema() {
        let tool = make_tool(sample_agents(), test_security());
        assert_eq!(tool.name(), "subagent_spawn");
        let schema =
tool.parameters_schema();
        assert!(schema["properties"]["agent"].is_object());
        assert!(schema["properties"]["task"].is_object());
        assert!(schema["properties"]["context"].is_object());
        let required = schema["required"].as_array().unwrap();
        assert!(required.contains(&json!("task")));
        assert_eq!(schema["additionalProperties"], json!(false));
    }

    #[test]
    fn description_not_empty() {
        let tool = make_tool(sample_agents(), test_security());
        assert!(!tool.description().is_empty());
    }

    // With auto-activation on, omitting 'agent' is allowed (auto-selection).
    #[tokio::test]
    async fn missing_agent_param() {
        let tool = make_tool(sample_agents(), test_security());
        let result = tool.execute(json!({"task": "test"})).await;
        assert!(result.is_ok());
    }

    // 'task' is the only hard-required parameter; omitting it is an Err.
    #[tokio::test]
    async fn missing_task_param() {
        let tool = make_tool(sample_agents(), test_security());
        let result = tool.execute(json!({"agent": "researcher"})).await;
        assert!(result.is_err());
    }

    // A whitespace-only agent name trims to empty and falls back to
    // auto-selection, which resolves the only configured agent.
    #[tokio::test]
    async fn blank_agent_uses_auto_selection() {
        let tool = make_tool(sample_agents(), test_security());
        let result = tool
            .execute(json!({"agent": " ", "task": "test"}))
            .await
            .unwrap();
        if result.success {
            let output: serde_json::Value = serde_json::from_str(&result.output).unwrap();
            assert_eq!(output["agent"], json!("researcher"));
            assert!(output["selection_mode"].is_string());
        }
    }

    #[tokio::test]
    async fn blank_task_rejected() {
        let tool = make_tool(sample_agents(), test_security());
        let result = tool
            .execute(json!({"agent": "researcher", "task": " "}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result.error.unwrap().contains("must not be empty"));
    }

    #[tokio::test]
    async fn unknown_agent_returns_error() {
        let tool = make_tool(sample_agents(), test_security());
        let result = tool
            .execute(json!({"agent": "nonexistent", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result.error.unwrap().contains("Unknown agent"));
    }

    // Spawn is an "act" operation, so read-only autonomy must block it.
    #[tokio::test]
    async fn spawn_blocked_in_readonly_mode() {
        let readonly = Arc::new(SecurityPolicy {
            autonomy: AutonomyLevel::ReadOnly,
            ..SecurityPolicy::default()
        });
        let tool = make_tool(sample_agents(), readonly);
        let result = tool
            .execute(json!({"agent": "researcher", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result
            .error
            .as_deref()
            .unwrap_or("")
            .contains("read-only mode"));
    }

    #[tokio::test]
    async fn spawn_blocked_when_rate_limited() {
        let limited = Arc::new(SecurityPolicy {
            max_actions_per_hour: 0,
            ..SecurityPolicy::default()
        });
        let tool = make_tool(sample_agents(), limited);
        let result = tool
            .execute(json!({"agent": "researcher", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result
            .error
            .as_deref()
            .unwrap_or("")
            .contains("Rate limit exceeded"));
    }

    // Runtime config disables subagents even though the tool was constructed
    // with subagents_enabled = true.
    #[tokio::test]
    async fn runtime_subagent_disable_blocks_spawn() {
        let tmp = TempDir::new().unwrap();
        let config_path = tmp.path().join("config.toml");
        write_runtime_orchestration_config(&config_path, true, true, 8, false, true, 2);
        let tool = SubAgentSpawnTool::new(
            sample_agents(),
            None,
            test_security(),
            providers::ProviderRuntimeOptions::default(),
            Arc::new(SubAgentRegistry::new()),
            Arc::new(Vec::new()),
            crate::config::MultimodalConfig::default(),
            true,
            10,
            true,
            Some(config_path),
        );
        let result = tool
            .execute(json!({"agent": "researcher", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result
            .error
            .unwrap_or_default()
            .contains("Subagents are currently disabled"));
    }

    // With auto-activation disabled at runtime, omitting 'agent' must fail.
    #[tokio::test]
    async fn runtime_subagent_auto_activation_disable_requires_agent() {
        let tmp = TempDir::new().unwrap();
        let config_path = tmp.path().join("config.toml");
        write_runtime_orchestration_config(&config_path, true, true, 8, true, false, 2);
        let tool = SubAgentSpawnTool::new(
            sample_agents(),
            None,
            test_security(),
            providers::ProviderRuntimeOptions::default(),
            Arc::new(SubAgentRegistry::new()),
            Arc::new(Vec::new()),
            crate::config::MultimodalConfig::default(),
            true,
            10,
            true,
            Some(config_path),
        );
        let result = tool.execute(json!({"task": "test"})).await.unwrap();
        assert!(!result.success);
        assert!(result
            .error
            .unwrap_or_default()
            .contains("automatic activation is disabled"));
    }

    #[tokio::test]
    async fn spawn_returns_session_id() {
        // The agent has an invalid provider so the background task will fail,
        // but spawn itself returns immediately with a session_id.
        let tool = make_tool(sample_agents(), test_security());
        let result = tool
            .execute(json!({"agent": "researcher", "task": "test task"}))
            .await
            .unwrap();
        // Spawn may fail at provider creation if the provider is invalid
        // For ollama, it should successfully create the provider even without a running server
        // The result could succeed (spawn) or fail (invalid provider) depending on environment
        if result.success {
            let output: serde_json::Value = serde_json::from_str(&result.output).unwrap();
            assert!(output["session_id"].is_string());
            assert_eq!(output["status"], "running");
        }
        // Either way, no panic
    }

    #[tokio::test]
    async fn spawn_no_agents_configured() {
        let tool = make_tool(HashMap::new(), test_security());
        let result = tool
            .execute(json!({"agent": "any", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result
            .error
            .unwrap_or_default()
            .contains("No delegate agents are configured"));
    }

    // queue_wait_ms = 0 in the runtime config means a saturated registry
    // rejects immediately instead of queueing.
    #[tokio::test]
    async fn spawn_respects_concurrent_limit() {
        let max_concurrent = 3usize;
        let registry = Arc::new(SubAgentRegistry::new());
        let tmp = TempDir::new().unwrap();
        let config_path = tmp.path().join("config.toml");
        write_runtime_orchestration_config(&config_path, true, true, 8, true, true, max_concurrent);
        // Fill up the registry with running sessions
        for i in 0..max_concurrent {
            registry.insert(SubAgentSession {
                id: format!("s{i}"),
                agent_name: "agent".to_string(),
                task: "task".to_string(),
                status: SubAgentStatus::Running,
                started_at: Utc::now(),
                completed_at: None,
                result: None,
                handle: None,
            });
        }
        let tool = SubAgentSpawnTool::new(
            sample_agents(),
            None,
            test_security(),
            providers::ProviderRuntimeOptions::default(),
            registry,
            Arc::new(Vec::new()),
            crate::config::MultimodalConfig::default(),
            true,
            max_concurrent,
            true,
            Some(config_path),
        );
        let result = tool
            .execute(json!({"agent": "researcher", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result.error.unwrap().contains("Maximum concurrent"));
    }

    // The runtime config's max_concurrent (1) overrides the constructor's (10).
    #[tokio::test]
    async fn runtime_max_concurrent_override_is_applied() {
        let tmp = TempDir::new().unwrap();
        let config_path = tmp.path().join("config.toml");
        write_runtime_orchestration_config(&config_path, true, true, 8, true, true, 1);
        let registry = Arc::new(SubAgentRegistry::new());
        registry.insert(SubAgentSession {
            id: "existing".to_string(),
            agent_name: "researcher".to_string(),
            task: "task".to_string(),
            status: SubAgentStatus::Running,
            started_at: Utc::now(),
            completed_at: None,
            result: None,
            handle: None,
        });
        let tool = SubAgentSpawnTool::new(
            sample_agents(),
            None,
            test_security(),
            providers::ProviderRuntimeOptions::default(),
            registry,
            Arc::new(Vec::new()),
            crate::config::MultimodalConfig::default(),
            true,
            10,
            true,
            Some(config_path),
        );
        let result = tool
            .execute(json!({"agent": "researcher", "task": "test"}))
            .await
            .unwrap();
        assert!(!result.success);
        assert!(result
            .error
            .unwrap_or_default()
            .contains("Maximum concurrent sub-agents reached (1)"));
    }

    #[tokio::test]
    async fn schema_lists_agent_names() {
        let tool = make_tool(sample_agents(), test_security());
        let schema = tool.parameters_schema();
        let desc = schema["properties"]["agent"]["description"]
            .as_str()
            .unwrap();
        assert!(desc.contains("researcher"));
    }
}