feat(agent): add end-to-end team orchestration bundle

chumyin 2026-03-01 12:50:31 +00:00 committed by Chum Yin
parent b64cae9d3d
commit be0f52fce7
6 changed files with 4031 additions and 0 deletions


@ -0,0 +1,260 @@
# Agent Teams Orchestration Evaluation Pack (2026-03-01)
Status: Deep optimization complete, validation evidence captured.
Linear parent: [RMN-284](https://linear.app/zeroclawlabs/issue/RMN-284/improvement-agent-teams-orchestration-research)
Execution slices: RMN-285, RMN-286, RMN-287, RMN-288, RMN-289
## 1) Objective
Define a practical and testable multi-agent orchestration contract that:
- decomposes complex work into parallelizable units,
- constrains communication overhead,
- preserves quality through explicit verification,
- and enforces token-aware execution policies.
## 2) A2A-Lite Protocol Contract
All inter-agent messages MUST follow a small fixed payload shape.
### Required fields
- `run_id`: stable run identifier
- `task_id`: task node identifier in DAG
- `sender`: agent id
- `recipient`: agent id or coordinator
- `status`: `queued|running|blocked|done|failed`
- `confidence`: `0-100`
- `risk_level`: `low|medium|high|critical`
- `summary`: short natural-language summary (token-capped)
- `artifacts`: list of evidence pointers (paths/URIs)
- `needs`: dependency requests or unblocks
- `next_action`: next deterministic action
### Message discipline
- Never forward raw transcripts by default.
- Always send evidence pointers, not full payload dumps.
- Keep summaries bounded by budget profile.
- Escalate to coordinator when risk is `high|critical`.
### Example message
```json
{
"run_id": "run-2026-03-01-001",
"task_id": "task-17",
"sender": "worker-protocol",
"recipient": "lead",
"status": "done",
"confidence": 91,
"risk_level": "medium",
"summary": "Protocol schema validated against three handoff paths; escalation path requires owner signoff.",
"artifacts": [
"docs/project/agent-teams-orchestration-eval-2026-03-01.md#2-a2a-lite-protocol-contract",
"scripts/ci/agent_team_orchestration_eval.py"
],
"needs": [
"scheduler-policy-review"
],
"next_action": "handoff-to-scheduler-owner"
}
```
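The contract above can be enforced with a small validator. This is an illustrative sketch, not the repository's implementation: the field and enum names come from the contract, while the 4-chars-per-token heuristic and the default `summary_cap_tokens` are assumptions.

```python
# Minimal A2A-Lite payload validator (sketch). Field names follow the
# contract; the token estimate (~4 chars/token) and default cap are
# hypothetical placeholders, not repository constants.
REQUIRED_FIELDS = {
    "run_id", "task_id", "sender", "recipient", "status", "confidence",
    "risk_level", "summary", "artifacts", "needs", "next_action",
}
VALID_STATUS = {"queued", "running", "blocked", "done", "failed"}
VALID_RISK = {"low", "medium", "high", "critical"}


def validate_a2a_lite(msg: dict, summary_cap_tokens: int = 120) -> list[str]:
    """Return a list of contract violations; an empty list means valid."""
    errors = []
    missing = REQUIRED_FIELDS - msg.keys()
    if missing:
        errors.append(f"missing fields: {sorted(missing)}")
    if msg.get("status") not in VALID_STATUS:
        errors.append(f"invalid status: {msg.get('status')!r}")
    if msg.get("risk_level") not in VALID_RISK:
        errors.append(f"invalid risk_level: {msg.get('risk_level')!r}")
    confidence = msg.get("confidence")
    if not isinstance(confidence, (int, float)) or not 0 <= confidence <= 100:
        errors.append(f"confidence out of 0-100 range: {confidence!r}")
    # Rough summary cap check: ~4 characters per token.
    if len(str(msg.get("summary", ""))) > summary_cap_tokens * 4:
        errors.append("summary exceeds token cap")
    return errors
```

A coordinator could reject or bounce any message whose violation list is non-empty before it ever reaches a worker.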
## 3) DAG Scheduling + Budget Policy
### Decomposition rules
- Build a DAG first; avoid flat task lists.
- Parallelize only nodes without write-conflict overlap.
- Each node has one owner and explicit acceptance checks.
### Topology policy
- Default: `star` (lead + bounded workers).
- Escalation: temporary peer channels for conflict resolution only.
- Avoid sustained mesh communication unless explicitly justified.
### Budget hierarchy
- Run budget
- Team budget
- Task budget
- Message budget
### Auto-degradation policy (in order)
1. Reduce peer-to-peer communication.
2. Tighten summary caps.
3. Reduce active workers.
4. Switch lower-priority workers to lower-cost model tier.
5. Increase compaction cadence.
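The ordered policy above can be expressed as a simple loop that applies steps until projected spend fits the budget. The step names mirror the list; the per-step cost factors are invented placeholders, not the evaluator's real model.

```python
# Auto-degradation sketch: apply steps in the documented order until the
# projected token spend fits the budget. Factors are hypothetical.
DEGRADATION_STEPS = [
    ("reduce_p2p_communication", 0.90),  # 1. fewer peer messages
    ("tighten_summary_caps",     0.85),  # 2. shorter summaries
    ("reduce_active_workers",    0.80),  # 3. drop a worker
    ("switch_to_economy_tier",   0.70),  # 4. cheaper model tier
    ("increase_compaction",      0.90),  # 5. compact context more often
]


def degrade_until_within_budget(projected_tokens: int, budget_tokens: int):
    """Apply degradation steps in order; return (tokens, actions_taken)."""
    actions = []
    for name, factor in DEGRADATION_STEPS:
        if projected_tokens <= budget_tokens:
            break
        projected_tokens = int(projected_tokens * factor)
        actions.append(name)
    return projected_tokens, actions
```

Because the steps are ordered cheapest-disruption-first, a run under mild pressure only loses peer chatter and summary length, never workers or model quality.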
## 4) KPI Schema
Required metrics per run:
- `throughput` (tasks/day equivalent)
- `pass_rate`
- `defect_escape`
- `total_tokens`
- `coordination_tokens`
- `coordination_ratio`
- `p95_latency_s`
Derived governance checks:
- Coordination overhead target: `coordination_ratio <= 0.20`
- Quality floor: `pass_rate >= 0.80`
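The two governance checks reduce to a few lines of arithmetic over the per-run metrics. A minimal sketch, using the schema's metric names and the thresholds stated above:

```python
# Governance gate sketch over the KPI schema. Thresholds match the targets
# documented above; the metrics dict keys follow the KPI schema.
def governance_gates(metrics: dict) -> dict:
    coordination_ratio = metrics["coordination_tokens"] / max(1, metrics["total_tokens"])
    return {
        "coordination_ratio_ok": coordination_ratio <= 0.20,
        "quality_ok": metrics["pass_rate"] >= 0.80,
    }
```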
## 5) Experiment Matrix
Run all topology modes under `low|medium|high` budget buckets:
- `single`
- `lead_subagent`
- `star_team`
- `mesh_team`
Control variables:
- same workload set
- same task count
- same average task token baseline
Decision output:
- cost-optimal topology
- quality-optimal topology
- production default recommendation
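The matrix is a plain cross-product over topology and budget with the control variables held fixed. A sketch of the driver loop, where `evaluate` stands in for the eval script's entrypoint and the control values echo the documented baseline:

```python
# Experiment-matrix driver sketch. `evaluate` is a stand-in for the real
# evaluation entrypoint; control variables are identical across all cells.
TOPOLOGIES = ("single", "lead_subagent", "star_team", "mesh_team")
BUDGETS = ("low", "medium", "high")


def run_matrix(evaluate):
    """Run every topology under every budget bucket with fixed controls."""
    controls = {"tasks": 24, "avg_task_tokens": 1400}
    return {
        (budget, topology): evaluate(topology=topology, budget=budget, **controls)
        for budget in BUDGETS
        for topology in TOPOLOGIES
    }
```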
## 5.1) Deep Optimization Dimensions
The evaluation engine now supports deeper policy dimensions:
- Workload profiles: `implementation`, `debugging`, `research`, `mixed`
- Protocol modes: `a2a_lite`, `transcript`
- Degradation policies: `none`, `auto`, `aggressive`
- Recommendation modes: `balanced`, `cost`, `quality`
- Gate checks: coordination ratio, pass rate, latency, budget compliance
Observed implications:
- `a2a_lite` keeps summary payload and coordination tokens bounded.
- `transcript` mode can substantially increase coordination overhead and budget risk.
- `auto` degradation can reduce participants and summary size when budget pressure is detected.
## 6) Validation Flow
1. Run simulation script and export JSON report.
2. Run protocol comparison (`a2a_lite` vs `transcript`).
3. Run budget sweep with degradation policy enabled.
4. Validate gating thresholds.
5. Attach output artifacts to the corresponding Linear issue.
6. Promote to rollout only when all acceptance checks pass.
## 7) Local Commands
```bash
python3 scripts/ci/agent_team_orchestration_eval.py --budget medium --json-output -
python3 scripts/ci/agent_team_orchestration_eval.py --budget medium --topologies star_team --enforce-gates
python3 scripts/ci/agent_team_orchestration_eval.py --budget medium --protocol-mode transcript --json-output -
python3 scripts/ci/agent_team_orchestration_eval.py --all-budgets --degradation-policy auto --json-output docs/project/agent-teams-orchestration-eval-sample-2026-03-01.json
python3 -m unittest scripts.ci.tests.test_agent_team_orchestration_eval -v
cargo test team_orchestration --lib
```
## 7.1) Key Validation Findings (2026-03-01)
- Medium budget + `a2a_lite`: recommendation = `star_team`
- Medium budget + `transcript`: recommendation = `lead_subagent` (coordination overhead spikes in larger teams)
- Budget sweep + `auto` degradation: mesh topology can be de-risked via participant reduction + tighter summaries, while `star_team` remains the balanced default
Sample evidence artifact:
- `docs/project/agent-teams-orchestration-eval-sample-2026-03-01.json`
## 7.2) Repository Core Implementation (Rust)
In addition to script-level simulation, the orchestration engine is implemented
as a reusable Rust module:
- `src/agent/team_orchestration.rs`
- `src/agent/mod.rs` (`pub mod team_orchestration;`)
Core capabilities implemented in Rust:
- `A2ALiteMessage` + `HandoffPolicy` validation and compaction
- `TeamTopology` evaluation under budget/workload/protocol dimensions
- `DegradationPolicy` (`none|auto|aggressive`) for pressure handling
- Multi-gate evaluation (`coordination_ratio`, `pass_rate`, `latency`, `budget`)
- Recommendation scoring (`balanced|cost|quality`)
- Budget sweep helper across `low|medium|high`
- DAG planner with conflict-aware batching (`build_conflict_aware_execution_plan`)
- Task budget allocator (`allocate_task_budgets`) for run-budget pressure
- Plan validator (`validate_execution_plan`) with topology/order/budget/lock checks
- Plan diagnostics (`analyze_execution_plan`) for critical path and parallel efficiency
- Batch handoff synthesis (`build_batch_handoff_messages`) for planner->worker A2A-Lite
- End-to-end orchestration API (`orchestrate_task_graph`) linking eval + plan + validation + diagnostics + handoff generation
- Handoff token estimators (`estimate_handoff_tokens`, `estimate_batch_handoff_tokens`) for communication-budget governance
Rust unit-test status:
- `cargo test team_orchestration --lib`
- result: `17 passed; 0 failed`
## 7.3) Concurrency Decomposition Contract (Rust planner)
The Rust planner now provides a deterministic decomposition pipeline:
1. validate task graph (`TaskNodeSpec`, dependency integrity)
2. topological sort with cycle detection
3. budget allocation per task under run budget pressure
4. ownership-lock-aware batch construction for bounded parallelism
Planner outputs:
- `ExecutionPlan.topological_order`
- `ExecutionPlan.budgets`
- `ExecutionPlan.batches`
- `ExecutionPlan.total_estimated_tokens`
This is the repository-native basis for converting complex work into safe
parallel slices while reducing merge/file ownership conflicts and token waste.
Additional hardening added:
- `validate_execution_plan(plan, tasks)` for dependency/topological-order/conflict/budget integrity checks
- `analyze_execution_plan(plan, tasks)` for critical-path and parallel-efficiency diagnostics
- `build_batch_handoff_messages(run_id, plan, tasks, policy)` for planner-to-worker A2A-Lite handoffs
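The batching idea behind `build_conflict_aware_execution_plan` can be sketched in Python (the real implementation is the Rust module above; the `TaskSpec` shape and greedy scheduling here are simplified stand-ins):

```python
# Conflict-aware batching sketch: a task joins a batch only when all its
# dependencies have completed in earlier batches and none of its ownership
# locks collide with tasks already placed in the same batch.
from dataclasses import dataclass, field


@dataclass
class TaskSpec:
    task_id: str
    deps: list[str] = field(default_factory=list)
    locks: frozenset[str] = frozenset()  # files/resources the task writes


def conflict_aware_batches(tasks: list[TaskSpec], max_parallel: int = 3) -> list[list[str]]:
    """Greedily group tasks into dependency- and lock-safe parallel batches."""
    done: set[str] = set()
    remaining = {t.task_id: t for t in tasks}
    batches: list[list[str]] = []
    while remaining:
        batch: list[str] = []
        held: set[str] = set()
        for t in list(remaining.values()):
            ready = all(d in done for d in t.deps)
            if ready and len(batch) < max_parallel and held.isdisjoint(t.locks):
                batch.append(t.task_id)
                held |= t.locks
        if not batch:
            raise ValueError("dependency cycle or unsatisfiable locks")
        for tid in batch:
            done.add(tid)
            del remaining[tid]
        batches.append(batch)
    return batches
```

Two tasks writing the same file therefore land in different batches even when their dependencies would allow them to run together, which is the merge-conflict guarantee the planner section describes.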
## 7.4) End-to-End Orchestration Bundle
`orchestrate_task_graph(...)` now exposes one deterministic orchestration entrypoint:
1. evaluate topology candidates under budget/workload/protocol/degradation gates
2. choose recommended topology
3. derive planner config from selected topology and budget envelope
4. build conflict-aware execution plan
5. validate the plan
6. compute plan diagnostics
7. generate compact A2A-Lite batch handoff messages
8. estimate communication token cost for handoffs
Output contract (`OrchestrationBundle`) includes:
- recommendation report and selected topology evidence
- planner config used for execution
- validated execution plan
- diagnostics (`critical_path_len`, parallelism metrics, lock counts)
- batch handoff messages
- estimated handoff token footprint
## 8) Definition of Done
- Protocol contract documented and example messages included.
- Scheduling and budget degradation policy documented.
- KPI schema and experiment matrix documented.
- Evaluation script and tests passing in local validation.
- Protocol comparison and budget sweep evidence generated.
- Linear evidence links updated for execution traceability.


@ -0,0 +1,730 @@
{
"schema_version": "zeroclaw.agent-team-eval.v1",
"budget_profile": "low",
"inputs": {
"tasks": 24,
"avg_task_tokens": 1400,
"coordination_rounds": 4,
"topologies": [
"single",
"lead_subagent",
"star_team",
"mesh_team"
],
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_policy": "auto",
"recommendation_mode": "balanced",
"max_coordination_ratio": 0.2,
"min_pass_rate": 0.8,
"max_p95_latency": 180.0
},
"results": [
{
"topology": "single",
"participants": 1,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 34608,
"coordination_tokens": 0,
"cache_savings_tokens": 2422,
"total_tokens": 32186,
"coordination_ratio": 0.0,
"estimated_pass_rate": 0.76,
"estimated_defect_escape": 0.24,
"estimated_p95_latency_s": 152.64,
"estimated_throughput_tpd": 13584.91,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 1654,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": false,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": false
},
{
"topology": "lead_subagent",
"participants": 2,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 32877,
"coordination_tokens": 557,
"cache_savings_tokens": 3287,
"total_tokens": 30147,
"coordination_ratio": 0.0185,
"estimated_pass_rate": 0.82,
"estimated_defect_escape": 0.18,
"estimated_p95_latency_s": 152.82,
"estimated_throughput_tpd": 13568.9,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 3693,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "star_team",
"participants": 3,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 12.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 31839,
"coordination_tokens": 1611,
"cache_savings_tokens": 3820,
"total_tokens": 29630,
"coordination_ratio": 0.0544,
"estimated_pass_rate": 0.86,
"estimated_defect_escape": 0.14,
"estimated_p95_latency_s": 76.84,
"estimated_throughput_tpd": 26985.94,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 4210,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "mesh_team",
"participants": 3,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 12.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 33569,
"coordination_tokens": 1611,
"cache_savings_tokens": 4028,
"total_tokens": 31152,
"coordination_ratio": 0.0517,
"estimated_pass_rate": 0.8,
"estimated_defect_escape": 0.2,
"estimated_p95_latency_s": 76.84,
"estimated_throughput_tpd": 26985.94,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 2688,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
}
],
"rankings": {
"cost_asc": [
"star_team",
"lead_subagent",
"mesh_team",
"single"
],
"coordination_ratio_asc": [
"single",
"lead_subagent",
"mesh_team",
"star_team"
],
"latency_asc": [
"star_team",
"mesh_team",
"single",
"lead_subagent"
],
"pass_rate_desc": [
"star_team",
"lead_subagent",
"mesh_team",
"single"
]
},
"recommendation": {
"mode": "balanced",
"recommended_topology": "star_team",
"reason": "weighted_score",
"scores": [
{
"topology": "star_team",
"score": 0.50354,
"gate_pass": true
},
{
"topology": "mesh_team",
"score": 0.45944,
"gate_pass": true
},
{
"topology": "lead_subagent",
"score": 0.38029,
"gate_pass": true
}
],
"used_gate_filtered_pool": true
},
"budget_sweep": [
{
"budget_profile": "low",
"results": [
{
"topology": "single",
"participants": 1,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 34608,
"coordination_tokens": 0,
"cache_savings_tokens": 2422,
"total_tokens": 32186,
"coordination_ratio": 0.0,
"estimated_pass_rate": 0.76,
"estimated_defect_escape": 0.24,
"estimated_p95_latency_s": 152.64,
"estimated_throughput_tpd": 13584.91,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 1654,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": false,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": false
},
{
"topology": "lead_subagent",
"participants": 2,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 32877,
"coordination_tokens": 557,
"cache_savings_tokens": 3287,
"total_tokens": 30147,
"coordination_ratio": 0.0185,
"estimated_pass_rate": 0.82,
"estimated_defect_escape": 0.18,
"estimated_p95_latency_s": 152.82,
"estimated_throughput_tpd": 13568.9,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 3693,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "star_team",
"participants": 3,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 12.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 31839,
"coordination_tokens": 1611,
"cache_savings_tokens": 3820,
"total_tokens": 29630,
"coordination_ratio": 0.0544,
"estimated_pass_rate": 0.86,
"estimated_defect_escape": 0.14,
"estimated_p95_latency_s": 76.84,
"estimated_throughput_tpd": 26985.94,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 4210,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "mesh_team",
"participants": 3,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 12.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 33569,
"coordination_tokens": 1611,
"cache_savings_tokens": 4028,
"total_tokens": 31152,
"coordination_ratio": 0.0517,
"estimated_pass_rate": 0.8,
"estimated_defect_escape": 0.2,
"estimated_p95_latency_s": 76.84,
"estimated_throughput_tpd": 26985.94,
"budget_limit_tokens": 33840,
"budget_headroom_tokens": 2688,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
}
],
"rankings": {
"cost_asc": [
"star_team",
"lead_subagent",
"mesh_team",
"single"
],
"coordination_ratio_asc": [
"single",
"lead_subagent",
"mesh_team",
"star_team"
],
"latency_asc": [
"star_team",
"mesh_team",
"single",
"lead_subagent"
],
"pass_rate_desc": [
"star_team",
"lead_subagent",
"mesh_team",
"single"
]
},
"recommendation": {
"mode": "balanced",
"recommended_topology": "star_team",
"reason": "weighted_score",
"scores": [
{
"topology": "star_team",
"score": 0.50354,
"gate_pass": true
},
{
"topology": "mesh_team",
"score": 0.45944,
"gate_pass": true
},
{
"topology": "lead_subagent",
"score": 0.38029,
"gate_pass": true
}
],
"used_gate_filtered_pool": true
}
},
{
"budget_profile": "medium",
"results": [
{
"topology": "single",
"participants": 1,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 34608,
"coordination_tokens": 0,
"cache_savings_tokens": 2422,
"total_tokens": 32186,
"coordination_ratio": 0.0,
"estimated_pass_rate": 0.79,
"estimated_defect_escape": 0.21,
"estimated_p95_latency_s": 152.64,
"estimated_throughput_tpd": 13584.91,
"budget_limit_tokens": 34080,
"budget_headroom_tokens": 1894,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": false,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": false
},
{
"topology": "lead_subagent",
"participants": 2,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 32877,
"coordination_tokens": 863,
"cache_savings_tokens": 3287,
"total_tokens": 30453,
"coordination_ratio": 0.0283,
"estimated_pass_rate": 0.85,
"estimated_defect_escape": 0.15,
"estimated_p95_latency_s": 152.82,
"estimated_throughput_tpd": 13568.9,
"budget_limit_tokens": 34080,
"budget_headroom_tokens": 3627,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "star_team",
"participants": 5,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 6.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 31839,
"coordination_tokens": 4988,
"cache_savings_tokens": 3820,
"total_tokens": 33007,
"coordination_ratio": 0.1511,
"estimated_pass_rate": 0.89,
"estimated_defect_escape": 0.11,
"estimated_p95_latency_s": 39.2,
"estimated_throughput_tpd": 52897.96,
"budget_limit_tokens": 34080,
"budget_headroom_tokens": 1073,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "mesh_team",
"participants": 4,
"model_tier": "economy",
"tasks": 24,
"tasks_per_worker": 8.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": true,
"degradation_actions": [
"reduce_participants:5->4",
"tighten_summary_scale:0.82",
"switch_model_tier:economy"
],
"execution_tokens": 33569,
"coordination_tokens": 4050,
"cache_savings_tokens": 4028,
"total_tokens": 33591,
"coordination_ratio": 0.1206,
"estimated_pass_rate": 0.82,
"estimated_defect_escape": 0.18,
"estimated_p95_latency_s": 51.92,
"estimated_throughput_tpd": 39938.37,
"budget_limit_tokens": 34080,
"budget_headroom_tokens": 489,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
}
],
"rankings": {
"cost_asc": [
"lead_subagent",
"single",
"star_team",
"mesh_team"
],
"coordination_ratio_asc": [
"single",
"lead_subagent",
"mesh_team",
"star_team"
],
"latency_asc": [
"star_team",
"mesh_team",
"single",
"lead_subagent"
],
"pass_rate_desc": [
"star_team",
"lead_subagent",
"mesh_team",
"single"
]
},
"recommendation": {
"mode": "balanced",
"recommended_topology": "star_team",
"reason": "weighted_score",
"scores": [
{
"topology": "star_team",
"score": 0.55528,
"gate_pass": true
},
{
"topology": "mesh_team",
"score": 0.50105,
"gate_pass": true
},
{
"topology": "lead_subagent",
"score": 0.4152,
"gate_pass": true
}
],
"used_gate_filtered_pool": true
}
},
{
"budget_profile": "high",
"results": [
{
"topology": "single",
"participants": 1,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 34608,
"coordination_tokens": 0,
"cache_savings_tokens": 2422,
"total_tokens": 32186,
"coordination_ratio": 0.0,
"estimated_pass_rate": 0.81,
"estimated_defect_escape": 0.19,
"estimated_p95_latency_s": 152.64,
"estimated_throughput_tpd": 13584.91,
"budget_limit_tokens": 34368,
"budget_headroom_tokens": 2182,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "lead_subagent",
"participants": 2,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 24.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 32877,
"coordination_tokens": 863,
"cache_savings_tokens": 3287,
"total_tokens": 30453,
"coordination_ratio": 0.0283,
"estimated_pass_rate": 0.87,
"estimated_defect_escape": 0.13,
"estimated_p95_latency_s": 152.82,
"estimated_throughput_tpd": 13568.9,
"budget_limit_tokens": 34368,
"budget_headroom_tokens": 3915,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "star_team",
"participants": 5,
"model_tier": "primary",
"tasks": 24,
"tasks_per_worker": 6.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": false,
"degradation_actions": [],
"execution_tokens": 31839,
"coordination_tokens": 4988,
"cache_savings_tokens": 3820,
"total_tokens": 33007,
"coordination_ratio": 0.1511,
"estimated_pass_rate": 0.91,
"estimated_defect_escape": 0.09,
"estimated_p95_latency_s": 39.2,
"estimated_throughput_tpd": 52897.96,
"budget_limit_tokens": 34368,
"budget_headroom_tokens": 1361,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
},
{
"topology": "mesh_team",
"participants": 4,
"model_tier": "economy",
"tasks": 24,
"tasks_per_worker": 8.0,
"workload_profile": "mixed",
"protocol_mode": "a2a_lite",
"degradation_applied": true,
"degradation_actions": [
"reduce_participants:5->4",
"tighten_summary_scale:0.82",
"switch_model_tier:economy"
],
"execution_tokens": 33569,
"coordination_tokens": 4050,
"cache_savings_tokens": 4028,
"total_tokens": 33591,
"coordination_ratio": 0.1206,
"estimated_pass_rate": 0.84,
"estimated_defect_escape": 0.16,
"estimated_p95_latency_s": 51.92,
"estimated_throughput_tpd": 39938.37,
"budget_limit_tokens": 34368,
"budget_headroom_tokens": 777,
"budget_ok": true,
"gates": {
"coordination_ratio_ok": true,
"quality_ok": true,
"latency_ok": true,
"budget_ok": true
},
"gate_pass": true
}
],
"rankings": {
"cost_asc": [
"lead_subagent",
"single",
"star_team",
"mesh_team"
],
"coordination_ratio_asc": [
"single",
"lead_subagent",
"mesh_team",
"star_team"
],
"latency_asc": [
"star_team",
"mesh_team",
"single",
"lead_subagent"
],
"pass_rate_desc": [
"star_team",
"lead_subagent",
"mesh_team",
"single"
]
},
"recommendation": {
"mode": "balanced",
"recommended_topology": "star_team",
"reason": "weighted_score",
"scores": [
{
"topology": "star_team",
"score": 0.56428,
"gate_pass": true
},
{
"topology": "mesh_team",
"score": 0.51005,
"gate_pass": true
},
{
"topology": "lead_subagent",
"score": 0.4242,
"gate_pass": true
},
{
"topology": "single",
"score": 0.37937,
"gate_pass": true
}
],
"used_gate_filtered_pool": true
}
}
]
}


@ -0,0 +1,660 @@
#!/usr/bin/env python3
"""Estimate coordination efficiency across agent-team topologies.
This script remains intentionally lightweight so it can run in local and CI
contexts without external dependencies. It supports:
- topology comparison (`single`, `lead_subagent`, `star_team`, `mesh_team`)
- budget-aware simulation (`low`, `medium`, `high`)
- workload and protocol profiles
- optional degradation policies under budget pressure
- gate enforcement and recommendation output
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass
from typing import Iterable
TOPOLOGIES = ("single", "lead_subagent", "star_team", "mesh_team")
RECOMMENDATION_MODES = ("balanced", "cost", "quality")
DEGRADATION_POLICIES = ("none", "auto", "aggressive")
@dataclass(frozen=True)
class BudgetProfile:
name: str
summary_cap_tokens: int
max_workers: int
compaction_interval_rounds: int
message_budget_per_task: int
quality_modifier: float
@dataclass(frozen=True)
class WorkloadProfile:
name: str
execution_multiplier: float
sync_multiplier: float
summary_multiplier: float
latency_multiplier: float
quality_modifier: float
@dataclass(frozen=True)
class ProtocolProfile:
name: str
summary_multiplier: float
artifact_discount: float
latency_penalty_per_message_s: float
cache_bonus: float
quality_modifier: float
BUDGETS: dict[str, BudgetProfile] = {
"low": BudgetProfile(
name="low",
summary_cap_tokens=80,
max_workers=3,
compaction_interval_rounds=3,
message_budget_per_task=10,
quality_modifier=-0.03,
),
"medium": BudgetProfile(
name="medium",
summary_cap_tokens=120,
max_workers=5,
compaction_interval_rounds=5,
message_budget_per_task=20,
quality_modifier=0.0,
),
"high": BudgetProfile(
name="high",
summary_cap_tokens=180,
max_workers=8,
compaction_interval_rounds=8,
message_budget_per_task=32,
quality_modifier=0.02,
),
}
WORKLOADS: dict[str, WorkloadProfile] = {
"implementation": WorkloadProfile(
name="implementation",
execution_multiplier=1.00,
sync_multiplier=1.00,
summary_multiplier=1.00,
latency_multiplier=1.00,
quality_modifier=0.00,
),
"debugging": WorkloadProfile(
name="debugging",
execution_multiplier=1.12,
sync_multiplier=1.25,
summary_multiplier=1.12,
latency_multiplier=1.18,
quality_modifier=-0.02,
),
"research": WorkloadProfile(
name="research",
execution_multiplier=0.95,
sync_multiplier=0.90,
summary_multiplier=0.95,
latency_multiplier=0.92,
quality_modifier=0.01,
),
"mixed": WorkloadProfile(
name="mixed",
execution_multiplier=1.03,
sync_multiplier=1.08,
summary_multiplier=1.05,
latency_multiplier=1.06,
quality_modifier=0.00,
),
}
PROTOCOLS: dict[str, ProtocolProfile] = {
"a2a_lite": ProtocolProfile(
name="a2a_lite",
summary_multiplier=1.00,
artifact_discount=0.18,
latency_penalty_per_message_s=0.00,
cache_bonus=0.02,
quality_modifier=0.01,
),
"transcript": ProtocolProfile(
name="transcript",
summary_multiplier=2.20,
artifact_discount=0.00,
latency_penalty_per_message_s=0.012,
cache_bonus=-0.01,
quality_modifier=-0.02,
),
}
def _participants(topology: str, budget: BudgetProfile) -> int:
if topology == "single":
return 1
if topology == "lead_subagent":
return 2
if topology in ("star_team", "mesh_team"):
return min(5, budget.max_workers)
raise ValueError(f"unknown topology: {topology}")
def _execution_factor(topology: str) -> float:
factors = {
"single": 1.00,
"lead_subagent": 0.95,
"star_team": 0.92,
"mesh_team": 0.97,
}
return factors[topology]
def _base_pass_rate(topology: str) -> float:
rates = {
"single": 0.78,
"lead_subagent": 0.84,
"star_team": 0.88,
"mesh_team": 0.82,
}
return rates[topology]
def _cache_factor(topology: str) -> float:
factors = {
"single": 0.05,
"lead_subagent": 0.08,
"star_team": 0.10,
"mesh_team": 0.10,
}
return factors[topology]
def _coordination_messages(
*,
topology: str,
rounds: int,
participants: int,
workload: WorkloadProfile,
) -> int:
if topology == "single":
return 0
workers = max(1, participants - 1)
lead_messages = 2 * workers * rounds
if topology == "lead_subagent":
base_messages = lead_messages
elif topology == "star_team":
broadcast = workers * rounds
base_messages = lead_messages + broadcast
elif topology == "mesh_team":
peer_messages = workers * max(0, workers - 1) * rounds
base_messages = lead_messages + peer_messages
else:
raise ValueError(f"unknown topology: {topology}")
return int(round(base_messages * workload.sync_multiplier))
def _compute_result(
*,
topology: str,
tasks: int,
avg_task_tokens: int,
rounds: int,
budget: BudgetProfile,
workload: WorkloadProfile,
protocol: ProtocolProfile,
participants_override: int | None = None,
summary_scale: float = 1.0,
extra_quality_modifier: float = 0.0,
model_tier: str = "primary",
degradation_applied: bool = False,
degradation_actions: list[str] | None = None,
) -> dict[str, object]:
participants = participants_override or _participants(topology, budget)
participants = max(1, participants)
parallelism = 1 if topology == "single" else max(1, participants - 1)
execution_tokens = int(
tasks
* avg_task_tokens
* _execution_factor(topology)
* workload.execution_multiplier
)
summary_tokens = min(
budget.summary_cap_tokens,
max(24, int(avg_task_tokens * 0.08)),
)
summary_tokens = int(summary_tokens * workload.summary_multiplier * protocol.summary_multiplier)
summary_tokens = max(16, int(summary_tokens * summary_scale))
messages = _coordination_messages(
topology=topology,
rounds=rounds,
participants=participants,
workload=workload,
)
raw_coordination_tokens = messages * summary_tokens
compaction_events = rounds // budget.compaction_interval_rounds
compaction_discount = min(0.35, compaction_events * 0.10)
coordination_tokens = int(raw_coordination_tokens * (1.0 - compaction_discount))
coordination_tokens = int(coordination_tokens * (1.0 - protocol.artifact_discount))
cache_factor = _cache_factor(topology) + protocol.cache_bonus
cache_factor = min(0.30, max(0.0, cache_factor))
cache_savings_tokens = int(execution_tokens * cache_factor)
total_tokens = max(1, execution_tokens + coordination_tokens - cache_savings_tokens)
coordination_ratio = coordination_tokens / total_tokens
pass_rate = (
_base_pass_rate(topology)
+ budget.quality_modifier
+ workload.quality_modifier
+ protocol.quality_modifier
+ extra_quality_modifier
)
pass_rate = min(0.99, max(0.0, pass_rate))
defect_escape = round(max(0.0, 1.0 - pass_rate), 4)
base_latency_s = (tasks / parallelism) * 6.0 * workload.latency_multiplier
sync_penalty_s = messages * (0.02 + protocol.latency_penalty_per_message_s)
p95_latency_s = round(base_latency_s + sync_penalty_s, 2)
throughput_tpd = round((tasks / max(1.0, p95_latency_s)) * 86400.0, 2)
budget_limit_tokens = tasks * avg_task_tokens + tasks * budget.message_budget_per_task
budget_ok = total_tokens <= budget_limit_tokens
return {
"topology": topology,
"participants": participants,
"model_tier": model_tier,
"tasks": tasks,
"tasks_per_worker": round(tasks / parallelism, 2),
"workload_profile": workload.name,
"protocol_mode": protocol.name,
"degradation_applied": degradation_applied,
"degradation_actions": degradation_actions or [],
"execution_tokens": execution_tokens,
"coordination_tokens": coordination_tokens,
"cache_savings_tokens": cache_savings_tokens,
"total_tokens": total_tokens,
"coordination_ratio": round(coordination_ratio, 4),
"estimated_pass_rate": round(pass_rate, 4),
"estimated_defect_escape": defect_escape,
"estimated_p95_latency_s": p95_latency_s,
"estimated_throughput_tpd": throughput_tpd,
"budget_limit_tokens": budget_limit_tokens,
"budget_headroom_tokens": budget_limit_tokens - total_tokens,
"budget_ok": budget_ok,
}
def evaluate_topology(
*,
topology: str,
tasks: int,
avg_task_tokens: int,
rounds: int,
budget: BudgetProfile,
workload: WorkloadProfile,
protocol: ProtocolProfile,
degradation_policy: str,
coordination_ratio_hint: float,
) -> dict[str, object]:
base = _compute_result(
topology=topology,
tasks=tasks,
avg_task_tokens=avg_task_tokens,
rounds=rounds,
budget=budget,
workload=workload,
protocol=protocol,
)
if degradation_policy == "none" or topology == "single":
return base
pressure = (not bool(base["budget_ok"])) or (
float(base["coordination_ratio"]) > coordination_ratio_hint
)
if not pressure:
return base
if degradation_policy == "auto":
participant_delta = 1
summary_scale = 0.82
quality_penalty = -0.01
model_tier = "economy"
elif degradation_policy == "aggressive":
participant_delta = 2
summary_scale = 0.65
quality_penalty = -0.03
model_tier = "economy"
else:
raise ValueError(f"unknown degradation policy: {degradation_policy}")
reduced = max(2, int(base["participants"]) - participant_delta)
actions = [
f"reduce_participants:{base['participants']}->{reduced}",
f"tighten_summary_scale:{summary_scale}",
f"switch_model_tier:{model_tier}",
]
return _compute_result(
topology=topology,
tasks=tasks,
avg_task_tokens=avg_task_tokens,
rounds=rounds,
budget=budget,
workload=workload,
protocol=protocol,
participants_override=reduced,
summary_scale=summary_scale,
extra_quality_modifier=quality_penalty,
model_tier=model_tier,
degradation_applied=True,
degradation_actions=actions,
)
def parse_topologies(raw: str) -> list[str]:
items = [x.strip() for x in raw.split(",") if x.strip()]
invalid = sorted(set(items) - set(TOPOLOGIES))
if invalid:
raise ValueError(f"invalid topologies: {', '.join(invalid)}")
if not items:
raise ValueError("topology list is empty")
return items
def _emit_json(path: str, payload: dict[str, object]) -> None:
content = json.dumps(payload, indent=2, sort_keys=False)
if path == "-":
print(content)
return
with open(path, "w", encoding="utf-8") as f:
f.write(content)
f.write("\n")
def _rank(results: Iterable[dict[str, object]], key: str) -> list[str]:
return [x["topology"] for x in sorted(results, key=lambda row: row[key])] # type: ignore[index]
def _score_recommendation(
*,
results: list[dict[str, object]],
mode: str,
) -> dict[str, object]:
if not results:
return {
"mode": mode,
"recommended_topology": None,
"reason": "no_results",
"scores": [],
}
max_tokens = max(int(row["total_tokens"]) for row in results)
max_latency = max(float(row["estimated_p95_latency_s"]) for row in results)
if mode == "balanced":
w_quality, w_cost, w_latency = 0.45, 0.35, 0.20
elif mode == "cost":
w_quality, w_cost, w_latency = 0.25, 0.55, 0.20
elif mode == "quality":
w_quality, w_cost, w_latency = 0.65, 0.20, 0.15
else:
raise ValueError(f"unknown recommendation mode: {mode}")
scored: list[dict[str, object]] = []
for row in results:
quality = float(row["estimated_pass_rate"])
cost_norm = 1.0 - (int(row["total_tokens"]) / max(1, max_tokens))
latency_norm = 1.0 - (float(row["estimated_p95_latency_s"]) / max(1.0, max_latency))
score = (quality * w_quality) + (cost_norm * w_cost) + (latency_norm * w_latency)
scored.append(
{
"topology": row["topology"],
"score": round(score, 5),
"gate_pass": row["gate_pass"],
}
)
scored.sort(key=lambda x: float(x["score"]), reverse=True)
return {
"mode": mode,
"recommended_topology": scored[0]["topology"],
"reason": "weighted_score",
"scores": scored,
}
def _apply_gates(
*,
row: dict[str, object],
max_coordination_ratio: float,
min_pass_rate: float,
max_p95_latency: float,
) -> dict[str, object]:
coord_ok = float(row["coordination_ratio"]) <= max_coordination_ratio
quality_ok = float(row["estimated_pass_rate"]) >= min_pass_rate
latency_ok = float(row["estimated_p95_latency_s"]) <= max_p95_latency
budget_ok = bool(row["budget_ok"])
row["gates"] = {
"coordination_ratio_ok": coord_ok,
"quality_ok": quality_ok,
"latency_ok": latency_ok,
"budget_ok": budget_ok,
}
row["gate_pass"] = coord_ok and quality_ok and latency_ok and budget_ok
return row
def _evaluate_budget(
*,
budget: BudgetProfile,
args: argparse.Namespace,
topologies: list[str],
workload: WorkloadProfile,
protocol: ProtocolProfile,
) -> dict[str, object]:
rows = [
evaluate_topology(
topology=t,
tasks=args.tasks,
avg_task_tokens=args.avg_task_tokens,
rounds=args.coordination_rounds,
budget=budget,
workload=workload,
protocol=protocol,
degradation_policy=args.degradation_policy,
coordination_ratio_hint=args.max_coordination_ratio,
)
for t in topologies
]
rows = [
_apply_gates(
row=r,
max_coordination_ratio=args.max_coordination_ratio,
min_pass_rate=args.min_pass_rate,
max_p95_latency=args.max_p95_latency,
)
for r in rows
]
gate_pass_rows = [r for r in rows if bool(r["gate_pass"])]
recommendation_pool = gate_pass_rows if gate_pass_rows else rows
recommendation = _score_recommendation(
results=recommendation_pool,
mode=args.recommendation_mode,
)
recommendation["used_gate_filtered_pool"] = bool(gate_pass_rows)
return {
"budget_profile": budget.name,
"results": rows,
"rankings": {
"cost_asc": _rank(rows, "total_tokens"),
"coordination_ratio_asc": _rank(rows, "coordination_ratio"),
"latency_asc": _rank(rows, "estimated_p95_latency_s"),
"pass_rate_desc": [
x["topology"]
for x in sorted(rows, key=lambda row: row["estimated_pass_rate"], reverse=True)
],
},
"recommendation": recommendation,
}
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--budget", choices=sorted(BUDGETS.keys()), default="medium")
parser.add_argument("--all-budgets", action="store_true")
parser.add_argument("--tasks", type=int, default=24)
parser.add_argument("--avg-task-tokens", type=int, default=1400)
parser.add_argument("--coordination-rounds", type=int, default=4)
parser.add_argument(
"--topologies",
default=",".join(TOPOLOGIES),
help=f"comma-separated list: {','.join(TOPOLOGIES)}",
)
parser.add_argument("--workload-profile", choices=sorted(WORKLOADS.keys()), default="mixed")
parser.add_argument("--protocol-mode", choices=sorted(PROTOCOLS.keys()), default="a2a_lite")
parser.add_argument(
"--degradation-policy",
choices=DEGRADATION_POLICIES,
default="none",
)
parser.add_argument(
"--recommendation-mode",
choices=RECOMMENDATION_MODES,
default="balanced",
)
parser.add_argument("--max-coordination-ratio", type=float, default=0.20)
parser.add_argument("--min-pass-rate", type=float, default=0.80)
parser.add_argument("--max-p95-latency", type=float, default=180.0)
parser.add_argument("--json-output", default="-")
parser.add_argument("--enforce-gates", action="store_true")
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.tasks <= 0:
parser.error("--tasks must be > 0")
if args.avg_task_tokens <= 0:
parser.error("--avg-task-tokens must be > 0")
if args.coordination_rounds < 0:
parser.error("--coordination-rounds must be >= 0")
if not (0.0 < args.max_coordination_ratio < 1.0):
parser.error("--max-coordination-ratio must be in (0, 1)")
if not (0.0 < args.min_pass_rate <= 1.0):
parser.error("--min-pass-rate must be in (0, 1]")
if args.max_p95_latency <= 0.0:
parser.error("--max-p95-latency must be > 0")
try:
topologies = parse_topologies(args.topologies)
except ValueError as exc:
parser.error(str(exc))
workload = WORKLOADS[args.workload_profile]
protocol = PROTOCOLS[args.protocol_mode]
budget_targets = list(BUDGETS.values()) if args.all_budgets else [BUDGETS[args.budget]]
budget_reports = [
_evaluate_budget(
budget=budget,
args=args,
topologies=topologies,
workload=workload,
protocol=protocol,
)
for budget in budget_targets
]
primary = budget_reports[0]
payload: dict[str, object] = {
"schema_version": "zeroclaw.agent-team-eval.v1",
"budget_profile": primary["budget_profile"],
"inputs": {
"tasks": args.tasks,
"avg_task_tokens": args.avg_task_tokens,
"coordination_rounds": args.coordination_rounds,
"topologies": topologies,
"workload_profile": args.workload_profile,
"protocol_mode": args.protocol_mode,
"degradation_policy": args.degradation_policy,
"recommendation_mode": args.recommendation_mode,
"max_coordination_ratio": args.max_coordination_ratio,
"min_pass_rate": args.min_pass_rate,
"max_p95_latency": args.max_p95_latency,
},
"results": primary["results"],
"rankings": primary["rankings"],
"recommendation": primary["recommendation"],
}
if args.all_budgets:
payload["budget_sweep"] = budget_reports
_emit_json(args.json_output, payload)
if not args.enforce_gates:
return 0
violations: list[str] = []
for report in budget_reports:
budget_name = report["budget_profile"]
for row in report["results"]: # type: ignore[index]
if bool(row["gate_pass"]):
continue
gates = row["gates"]
if not gates["coordination_ratio_ok"]:
violations.append(
f"{budget_name}:{row['topology']}: coordination_ratio={row['coordination_ratio']}"
)
if not gates["quality_ok"]:
violations.append(
f"{budget_name}:{row['topology']}: pass_rate={row['estimated_pass_rate']}"
)
if not gates["latency_ok"]:
violations.append(
f"{budget_name}:{row['topology']}: p95_latency_s={row['estimated_p95_latency_s']}"
)
if not gates["budget_ok"]:
violations.append(f"{budget_name}:{row['topology']}: exceeded budget_limit_tokens")
if violations:
print("gate violations detected:", file=sys.stderr)
for item in violations:
print(f"- {item}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())
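The recommendation logic above weighs quality, cost, and latency per topology. As a minimal standalone sketch of the balanced-mode weights (0.45/0.35/0.20, mirroring `_score_recommendation`), with two hypothetical candidate rows invented for illustration:

```python
# Standalone sketch of balanced-mode scoring, mirroring _score_recommendation.
# The candidate rows below are hypothetical, not real evaluator output.

def balanced_score(rows):
    """Rank rows by 0.45*quality + 0.35*cost_norm + 0.20*latency_norm."""
    max_tokens = max(r["total_tokens"] for r in rows)
    max_latency = max(r["estimated_p95_latency_s"] for r in rows)
    scored = []
    for r in rows:
        # Normalize cost and latency so the cheapest/fastest row scores 1.0.
        cost_norm = 1.0 - r["total_tokens"] / max(1, max_tokens)
        latency_norm = 1.0 - r["estimated_p95_latency_s"] / max(1.0, max_latency)
        score = (
            r["estimated_pass_rate"] * 0.45
            + cost_norm * 0.35
            + latency_norm * 0.20
        )
        scored.append((r["topology"], round(score, 5)))
    return sorted(scored, key=lambda x: x[1], reverse=True)

rows = [
    {"topology": "star_team", "estimated_pass_rate": 0.90,
     "total_tokens": 60_000, "estimated_p95_latency_s": 40.0},
    {"topology": "mesh_team", "estimated_pass_rate": 0.93,
     "total_tokens": 90_000, "estimated_p95_latency_s": 55.0},
]
print(balanced_score(rows)[0][0])  # star_team: cheaper/faster outweighs +0.03 pass rate
```

This illustrates why a higher raw pass rate does not guarantee the recommendation: under balanced weights the cost and latency normalization can flip the ranking.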



@@ -0,0 +1,255 @@
#!/usr/bin/env python3
"""Tests for scripts/ci/agent_team_orchestration_eval.py."""
from __future__ import annotations
import json
import subprocess
import tempfile
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[3]
SCRIPT = ROOT / "scripts" / "ci" / "agent_team_orchestration_eval.py"
def run_cmd(cmd: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
cmd,
cwd=str(ROOT),
text=True,
capture_output=True,
check=False,
)
class AgentTeamOrchestrationEvalTest(unittest.TestCase):
maxDiff = None
def test_json_output_contains_expected_fields(self) -> None:
with tempfile.NamedTemporaryFile(suffix=".json") as out:
proc = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--json-output",
out.name,
]
)
self.assertEqual(proc.returncode, 0, msg=proc.stderr)
payload = json.loads(Path(out.name).read_text(encoding="utf-8"))
self.assertEqual(payload["schema_version"], "zeroclaw.agent-team-eval.v1")
self.assertEqual(payload["budget_profile"], "medium")
self.assertIn("results", payload)
self.assertEqual(len(payload["results"]), 4)
self.assertIn("recommendation", payload)
sample = payload["results"][0]
required_keys = {
"topology",
"participants",
"model_tier",
"tasks",
"execution_tokens",
"coordination_tokens",
"cache_savings_tokens",
"total_tokens",
"coordination_ratio",
"estimated_pass_rate",
"estimated_defect_escape",
"estimated_p95_latency_s",
"estimated_throughput_tpd",
"budget_limit_tokens",
"budget_ok",
"gates",
"gate_pass",
}
self.assertTrue(required_keys.issubset(sample.keys()))
def test_coordination_ratio_increases_with_topology_complexity(self) -> None:
proc = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--json-output",
"-",
]
)
self.assertEqual(proc.returncode, 0, msg=proc.stderr)
payload = json.loads(proc.stdout)
by_topology = {row["topology"]: row for row in payload["results"]}
self.assertLess(
by_topology["single"]["coordination_ratio"],
by_topology["lead_subagent"]["coordination_ratio"],
)
self.assertLess(
by_topology["lead_subagent"]["coordination_ratio"],
by_topology["star_team"]["coordination_ratio"],
)
self.assertLess(
by_topology["star_team"]["coordination_ratio"],
by_topology["mesh_team"]["coordination_ratio"],
)
def test_protocol_transcript_costs_more_coordination_tokens(self) -> None:
base = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--topologies",
"star_team",
"--protocol-mode",
"a2a_lite",
"--json-output",
"-",
]
)
self.assertEqual(base.returncode, 0, msg=base.stderr)
base_payload = json.loads(base.stdout)
transcript = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--topologies",
"star_team",
"--protocol-mode",
"transcript",
"--json-output",
"-",
]
)
self.assertEqual(transcript.returncode, 0, msg=transcript.stderr)
transcript_payload = json.loads(transcript.stdout)
base_tokens = base_payload["results"][0]["coordination_tokens"]
transcript_tokens = transcript_payload["results"][0]["coordination_tokens"]
self.assertGreater(transcript_tokens, base_tokens)
def test_auto_degradation_applies_under_pressure(self) -> None:
no_degrade = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--topologies",
"mesh_team",
"--degradation-policy",
"none",
"--json-output",
"-",
]
)
self.assertEqual(no_degrade.returncode, 0, msg=no_degrade.stderr)
no_degrade_payload = json.loads(no_degrade.stdout)
no_degrade_row = no_degrade_payload["results"][0]
auto_degrade = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--topologies",
"mesh_team",
"--degradation-policy",
"auto",
"--json-output",
"-",
]
)
self.assertEqual(auto_degrade.returncode, 0, msg=auto_degrade.stderr)
auto_payload = json.loads(auto_degrade.stdout)
auto_row = auto_payload["results"][0]
self.assertTrue(auto_row["degradation_applied"])
self.assertLess(auto_row["participants"], no_degrade_row["participants"])
self.assertLess(auto_row["coordination_tokens"], no_degrade_row["coordination_tokens"])
def test_all_budgets_emits_budget_sweep(self) -> None:
proc = run_cmd(
[
"python3",
str(SCRIPT),
"--all-budgets",
"--topologies",
"single,star_team",
"--json-output",
"-",
]
)
self.assertEqual(proc.returncode, 0, msg=proc.stderr)
payload = json.loads(proc.stdout)
self.assertIn("budget_sweep", payload)
self.assertEqual(len(payload["budget_sweep"]), 3)
budgets = [x["budget_profile"] for x in payload["budget_sweep"]]
self.assertEqual(budgets, ["low", "medium", "high"])
def test_gate_fails_for_mesh_under_default_threshold(self) -> None:
proc = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--topologies",
"mesh_team",
"--enforce-gates",
"--max-coordination-ratio",
"0.20",
"--json-output",
"-",
]
)
self.assertEqual(proc.returncode, 1)
self.assertIn("gate violations detected", proc.stderr)
self.assertIn("mesh_team", proc.stderr)
def test_gate_passes_for_star_under_default_threshold(self) -> None:
proc = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--topologies",
"star_team",
"--enforce-gates",
"--max-coordination-ratio",
"0.20",
"--json-output",
"-",
]
)
self.assertEqual(proc.returncode, 0, msg=proc.stderr)
def test_recommendation_prefers_star_for_medium_defaults(self) -> None:
proc = run_cmd(
[
"python3",
str(SCRIPT),
"--budget",
"medium",
"--json-output",
"-",
]
)
self.assertEqual(proc.returncode, 0, msg=proc.stderr)
payload = json.loads(proc.stdout)
self.assertEqual(payload["recommendation"]["recommended_topology"], "star_team")
if __name__ == "__main__":
unittest.main()
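The gate-enforcement tests above check the same four predicates that `_apply_gates` attaches to each result row. As a minimal standalone sketch of that check, using a hypothetical row whose values are invented for illustration:

```python
# Standalone sketch of the gate predicates, mirroring _apply_gates.
# Thresholds match the CLI defaults; the sample row is hypothetical.

def check_gates(row, max_coordination_ratio=0.20, min_pass_rate=0.80,
                max_p95_latency=180.0):
    """Return per-gate results and the overall pass flag."""
    gates = {
        "coordination_ratio_ok": row["coordination_ratio"] <= max_coordination_ratio,
        "quality_ok": row["estimated_pass_rate"] >= min_pass_rate,
        "latency_ok": row["estimated_p95_latency_s"] <= max_p95_latency,
        "budget_ok": bool(row["budget_ok"]),
    }
    return gates, all(gates.values())

# A mesh-like row: good quality and latency, but coordination-heavy.
row = {"coordination_ratio": 0.27, "estimated_pass_rate": 0.88,
       "estimated_p95_latency_s": 95.0, "budget_ok": True}
gates, gate_pass = check_gates(row)
print(gate_pass)  # False: coordination ratio 0.27 exceeds the 0.20 gate
```

A single failing predicate is enough to fail the row, which is why `test_gate_fails_for_mesh_under_default_threshold` expects exit code 1 even though the mesh row may pass the quality, latency, and budget gates.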


@@ -8,6 +8,7 @@ pub mod prompt;
pub mod quota_aware;
pub mod research;
pub mod session;
pub mod team_orchestration;
#[cfg(test)]
mod tests;

File diff suppressed because it is too large