feat(config): add ProgressMode enum for streaming channel draft updates

This commit is contained in:
argenis de la rosa 2026-02-28 20:18:40 -05:00 committed by Argenis
parent 84b43ba4b2
commit bfacba20cb
8 changed files with 266 additions and 93 deletions

View File

@ -1,5 +1,5 @@
use crate::approval::{ApprovalManager, ApprovalRequest, ApprovalResponse};
use crate::config::Config;
use crate::config::{Config, ProgressMode};
use crate::memory::{self, Memory, MemoryCategory};
use crate::multimodal;
use crate::observability::{self, runtime_trace, Observer, ObserverEvent};
@ -290,6 +290,7 @@ tokio::task_local! {
static TOOL_LOOP_NON_CLI_APPROVAL_CONTEXT: Option<NonCliApprovalContext>;
static LOOP_DETECTION_CONFIG: LoopDetectionConfig;
static SAFETY_HEARTBEAT_CONFIG: Option<SafetyHeartbeatConfig>;
static TOOL_LOOP_PROGRESS_MODE: ProgressMode;
}
/// Configuration for periodic safety-constraint re-injection (heartbeat).
@ -305,6 +306,14 @@ fn should_inject_safety_heartbeat(counter: usize, interval: usize) -> bool {
interval > 0 && counter > 0 && counter % interval == 0
}
fn should_emit_verbose_progress(mode: ProgressMode) -> bool {
mode == ProgressMode::Verbose
}
fn should_emit_tool_progress(mode: ProgressMode) -> bool {
mode != ProgressMode::Off
}
/// Extract a short hint from tool call arguments for progress display.
fn truncate_tool_args_for_progress(name: &str, args: &serde_json::Value, max_len: usize) -> String {
let hint = match name {
@ -654,27 +663,31 @@ pub(crate) async fn run_tool_call_loop_with_reply_target(
on_delta: Option<tokio::sync::mpsc::Sender<String>>,
hooks: Option<&crate::hooks::HookRunner>,
excluded_tools: &[String],
progress_mode: ProgressMode,
) -> Result<String> {
TOOL_LOOP_REPLY_TARGET
TOOL_LOOP_PROGRESS_MODE
.scope(
reply_target.map(str::to_string),
run_tool_call_loop(
provider,
history,
tools_registry,
observer,
provider_name,
model,
temperature,
silent,
approval,
channel_name,
multimodal_config,
max_tool_iterations,
cancellation_token,
on_delta,
hooks,
excluded_tools,
progress_mode,
TOOL_LOOP_REPLY_TARGET.scope(
reply_target.map(str::to_string),
run_tool_call_loop(
provider,
history,
tools_registry,
observer,
provider_name,
model,
temperature,
silent,
approval,
channel_name,
multimodal_config,
max_tool_iterations,
cancellation_token,
on_delta,
hooks,
excluded_tools,
),
),
)
.await
@ -809,6 +822,9 @@ pub(crate) async fn run_tool_call_loop(
.try_with(Clone::clone)
.ok()
.flatten();
let progress_mode = TOOL_LOOP_PROGRESS_MODE
.try_with(|mode| *mode)
.unwrap_or(ProgressMode::Verbose);
let bypass_non_cli_approval_for_turn =
approval.is_some_and(|mgr| channel_name != "cli" && mgr.consume_non_cli_allow_all_once());
if bypass_non_cli_approval_for_turn {
@ -870,13 +886,15 @@ pub(crate) async fn run_tool_call_loop(
}
// ── Progress: LLM thinking ────────────────────────────
if let Some(ref tx) = on_delta {
let phase = if iteration == 0 {
"\u{1f914} Thinking...\n".to_string()
} else {
format!("\u{1f914} Thinking (round {})...\n", iteration + 1)
};
let _ = tx.send(format!("{DRAFT_PROGRESS_SENTINEL}{phase}")).await;
if should_emit_verbose_progress(progress_mode) {
if let Some(ref tx) = on_delta {
let phase = if iteration == 0 {
"\u{1f914} Thinking...\n".to_string()
} else {
format!("\u{1f914} Thinking (round {})...\n", iteration + 1)
};
let _ = tx.send(format!("{DRAFT_PROGRESS_SENTINEL}{phase}")).await;
}
}
observer.record_event(&ObserverEvent::LlmRequest {
@ -1078,15 +1096,17 @@ pub(crate) async fn run_tool_call_loop(
};
// ── Progress: LLM responded ─────────────────────────────
if let Some(ref tx) = on_delta {
let llm_secs = llm_started_at.elapsed().as_secs();
if !tool_calls.is_empty() {
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}\u{1f4ac} Got {} tool call(s) ({llm_secs}s)\n",
tool_calls.len()
))
.await;
if should_emit_verbose_progress(progress_mode) {
if let Some(ref tx) = on_delta {
let llm_secs = llm_started_at.elapsed().as_secs();
if !tool_calls.is_empty() {
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}\u{1f4ac} Got {} tool call(s) ({llm_secs}s)\n",
tool_calls.len()
))
.await;
}
}
}
@ -1120,12 +1140,14 @@ pub(crate) async fn run_tool_call_loop(
}),
);
if let Some(ref tx) = on_delta {
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}\u{21bb} Retrying: response deferred action without a tool call\n"
))
.await;
if should_emit_verbose_progress(progress_mode) {
if let Some(ref tx) = on_delta {
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}\u{21bb} Retrying: response deferred action without a tool call\n"
))
.await;
}
}
continue;
@ -1422,18 +1444,19 @@ pub(crate) async fn run_tool_call_loop(
}),
);
// ── Progress: tool start ────────────────────────────
if let Some(ref tx) = on_delta {
let hint = truncate_tool_args_for_progress(&tool_name, &tool_args, 60);
let progress = if hint.is_empty() {
format!("\u{23f3} {}\n", tool_name)
} else {
format!("\u{23f3} {}: {hint}\n", tool_name)
};
tracing::debug!(tool = %tool_name, "Sending progress start to draft");
let _ = tx
.send(format!("{DRAFT_PROGRESS_SENTINEL}{progress}"))
.await;
if should_emit_tool_progress(progress_mode) {
if let Some(ref tx) = on_delta {
let hint = truncate_tool_args_for_progress(&tool_name, &tool_args, 60);
let progress = if hint.is_empty() {
format!("\u{23f3} {}\n", tool_name)
} else {
format!("\u{23f3} {}: {hint}\n", tool_name)
};
tracing::debug!(tool = %tool_name, "Sending progress start to draft");
let _ = tx
.send(format!("{DRAFT_PROGRESS_SENTINEL}{progress}"))
.await;
}
}
executable_indices.push(idx);
@ -1514,21 +1537,22 @@ pub(crate) async fn run_tool_call_loop(
.await;
}
// ── Progress: tool completion ───────────────────────
if let Some(ref tx) = on_delta {
let secs = outcome.duration.as_secs();
let icon = if outcome.success {
"\u{2705}"
} else {
"\u{274c}"
};
tracing::debug!(tool = %call.name, secs, "Sending progress complete to draft");
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}{icon} {} ({secs}s)\n",
call.name
))
.await;
if should_emit_tool_progress(progress_mode) {
if let Some(ref tx) = on_delta {
let secs = outcome.duration.as_secs();
let icon = if outcome.success {
"\u{2705}"
} else {
"\u{274c}"
};
tracing::debug!(tool = %call.name, secs, "Sending progress complete to draft");
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}{icon} {} ({secs}s)\n",
call.name
))
.await;
}
}
// ── Loop detection: record call ──────────────────────
@ -1597,12 +1621,14 @@ pub(crate) async fn run_tool_call_loop(
Some("loop pattern detected, injecting self-correction prompt"),
serde_json::json!({ "iteration": iteration + 1, "warning": &warning }),
);
if let Some(ref tx) = on_delta {
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}\u{26a0}\u{fe0f} Loop detected, attempting self-correction\n"
))
.await;
if should_emit_verbose_progress(progress_mode) {
if let Some(ref tx) = on_delta {
let _ = tx
.send(format!(
"{DRAFT_PROGRESS_SENTINEL}\u{26a0}\u{fe0f} Loop detected, attempting self-correction\n"
))
.await;
}
}
loop_detection_prompt = Some(warning);
}
@ -5644,4 +5670,16 @@ Let me check the result."#;
assert_eq!(parsed["content"].as_str(), Some("answer"));
assert!(parsed.get("reasoning_content").is_none());
}
#[test]
fn progress_mode_gates_work_as_expected() {
assert!(should_emit_verbose_progress(ProgressMode::Verbose));
assert!(!should_emit_verbose_progress(ProgressMode::Compact));
assert!(!should_emit_verbose_progress(ProgressMode::Off));
assert!(should_emit_tool_progress(ProgressMode::Verbose));
assert!(should_emit_tool_progress(ProgressMode::Compact));
assert!(!should_emit_tool_progress(ProgressMode::Off));
}
}

View File

@ -82,7 +82,7 @@ use crate::agent::loop_::{
};
use crate::agent::session::{resolve_session_id, shared_session_manager, Session, SessionManager};
use crate::approval::{ApprovalManager, ApprovalResponse, PendingApprovalError};
use crate::config::{Config, NonCliNaturalLanguageApprovalMode};
use crate::config::{Config, NonCliNaturalLanguageApprovalMode, ProgressMode};
use crate::identity;
use crate::memory::{self, Memory};
use crate::observability::{self, runtime_trace, Observer};
@ -166,6 +166,23 @@ fn clear_live_channels() {
.clear();
}
fn runtime_telegram_progress_mode_store() -> &'static Mutex<ProgressMode> {
static STORE: OnceLock<Mutex<ProgressMode>> = OnceLock::new();
STORE.get_or_init(|| Mutex::new(ProgressMode::default()))
}
fn set_runtime_telegram_progress_mode(mode: ProgressMode) {
*runtime_telegram_progress_mode_store()
.lock()
.unwrap_or_else(|e| e.into_inner()) = mode;
}
fn runtime_telegram_progress_mode() -> ProgressMode {
*runtime_telegram_progress_mode_store()
.lock()
.unwrap_or_else(|e| e.into_inner())
}
pub(crate) fn get_live_channel(name: &str) -> Option<Arc<dyn Channel>> {
live_channels_registry()
.lock()
@ -683,6 +700,27 @@ fn split_internal_progress_delta(delta: &str) -> (bool, &str) {
}
}
fn effective_progress_mode_for_message(
channel_name: &str,
expose_internal_tool_details: bool,
) -> ProgressMode {
if channel_name.eq_ignore_ascii_case("cli") || expose_internal_tool_details {
ProgressMode::Verbose
} else if channel_name.eq_ignore_ascii_case("telegram") {
runtime_telegram_progress_mode()
} else {
ProgressMode::Off
}
}
fn is_verbose_only_progress_line(delta: &str) -> bool {
let trimmed = delta.trim_start();
trimmed.starts_with("\u{1f914} Thinking")
|| trimmed.starts_with("\u{1f4ac} Got ")
|| trimmed.starts_with("\u{21bb} Retrying")
|| trimmed.starts_with("\u{26a0}\u{fe0f} Loop detected")
}
fn build_channel_system_prompt(
base_prompt: &str,
channel_name: &str,
@ -3460,6 +3498,8 @@ or tune thresholds in config.",
let expose_internal_tool_details =
msg.channel == "cli" || should_expose_internal_tool_details(&msg.content);
let progress_mode =
effective_progress_mode_for_message(msg.channel.as_str(), expose_internal_tool_details);
let excluded_tools_snapshot = if msg.channel == "cli" {
Vec::new()
} else {
@ -3527,7 +3567,7 @@ or tune thresholds in config.",
let channel = Arc::clone(channel_ref);
let reply_target = msg.reply_target.clone();
let draft_id = draft_id_ref.to_string();
let suppress_internal_progress = !expose_internal_tool_details;
let mode = progress_mode;
Some(tokio::spawn(async move {
let mut accumulated = String::new();
while let Some(delta) = rx.recv().await {
@ -3536,10 +3576,15 @@ or tune thresholds in config.",
continue;
}
let (is_internal_progress, visible_delta) = split_internal_progress_delta(&delta);
if suppress_internal_progress && is_internal_progress {
continue;
if is_internal_progress {
if mode == ProgressMode::Off {
continue;
}
if mode == ProgressMode::Compact && is_verbose_only_progress_line(visible_delta)
{
continue;
}
}
accumulated.push_str(visible_delta);
if let Err(e) = channel
.update_draft(&reply_target, &draft_id, &accumulated)
@ -3605,6 +3650,7 @@ or tune thresholds in config.",
delta_tx,
ctx.hooks.as_deref(),
&excluded_tools_snapshot,
progress_mode,
),
) => LlmExecutionResult::Completed(result),
};
@ -5482,6 +5528,13 @@ pub async fn start_channels(config: Config) -> Result<()> {
.telegram
.as_ref()
.is_some_and(|tg| tg.interrupt_on_new_message);
let telegram_progress_mode = config
.channels_config
.telegram
.as_ref()
.map(|tg| tg.progress_mode)
.unwrap_or_default();
set_runtime_telegram_progress_mode(telegram_progress_mode);
let session_manager = shared_session_manager(&config.agent.session, &config.workspace_dir)?
.map(|mgr| mgr as Arc<dyn SessionManager + Send + Sync>);
@ -11160,6 +11213,32 @@ Done reminder set for 1:38 AM."#;
assert_eq!(plain, "final answer");
}
#[test]
fn effective_progress_mode_defaults_non_telegram_to_off() {
assert_eq!(
effective_progress_mode_for_message("draft-streaming-channel", false),
ProgressMode::Off
);
assert_eq!(
effective_progress_mode_for_message("draft-streaming-channel", true),
ProgressMode::Verbose
);
}
#[test]
fn effective_progress_mode_uses_telegram_runtime_setting() {
set_runtime_telegram_progress_mode(ProgressMode::Compact);
assert_eq!(
effective_progress_mode_for_message("telegram", false),
ProgressMode::Compact
);
set_runtime_telegram_progress_mode(ProgressMode::Off);
assert_eq!(
effective_progress_mode_for_message("telegram", false),
ProgressMode::Off
);
}
#[test]
fn build_channel_system_prompt_includes_visibility_policy() {
let hidden = build_channel_system_prompt("base", "telegram", "chat", false);

View File

@ -18,15 +18,15 @@ pub use schema::{
MatrixConfig, MemoryConfig, ModelRouteConfig, MultimodalConfig, NextcloudTalkConfig,
NonCliNaturalLanguageApprovalMode, ObservabilityConfig, OtpChallengeDelivery, OtpConfig,
OtpMethod, OutboundLeakGuardAction, OutboundLeakGuardConfig, PeripheralBoardConfig,
PeripheralsConfig, PerplexityFilterConfig, PluginEntryConfig, PluginsConfig, ProviderConfig,
ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig, ReliabilityConfig,
ResearchPhaseConfig, ResearchTrigger, ResourceLimitsConfig, RuntimeConfig, SandboxBackend,
SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig, SecurityRoleConfig,
SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig, StorageProviderConfig,
StorageProviderSection, StreamMode, SyscallAnomalyConfig, TelegramConfig, TranscriptionConfig,
TunnelConfig, UrlAccessConfig, WasmCapabilityEscalationMode, WasmConfig, WasmModuleHashPolicy,
WasmRuntimeConfig, WasmSecurityConfig, WebFetchConfig, WebSearchConfig, WebhookConfig,
DEFAULT_MODEL_FALLBACK,
PeripheralsConfig, PerplexityFilterConfig, PluginEntryConfig, PluginsConfig, ProgressMode,
ProviderConfig, ProxyConfig, ProxyScope, QdrantConfig, QueryClassificationConfig,
ReliabilityConfig, ResearchPhaseConfig, ResearchTrigger, ResourceLimitsConfig, RuntimeConfig,
SandboxBackend, SandboxConfig, SchedulerConfig, SecretsConfig, SecurityConfig,
SecurityRoleConfig, SkillsConfig, SkillsPromptInjectionMode, SlackConfig, StorageConfig,
StorageProviderConfig, StorageProviderSection, StreamMode, SyscallAnomalyConfig,
TelegramConfig, TranscriptionConfig, TunnelConfig, UrlAccessConfig,
WasmCapabilityEscalationMode, WasmConfig, WasmModuleHashPolicy, WasmRuntimeConfig,
WasmSecurityConfig, WebFetchConfig, WebSearchConfig, WebhookConfig, DEFAULT_MODEL_FALLBACK,
};
pub fn name_and_presence<T: traits::ChannelConfig>(channel: Option<&T>) -> (&'static str, bool) {
@ -55,6 +55,7 @@ mod tests {
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
group_reply: None,
base_url: None,
ack_enabled: true,

View File

@ -4285,6 +4285,19 @@ pub enum StreamMode {
Partial,
}
/// Progress verbosity for channels that support draft streaming.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum ProgressMode {
/// Show all progress lines (thinking rounds, tool-count lines, tool lifecycle).
Verbose,
/// Show only tool lifecycle lines (start + completion).
#[default]
Compact,
/// Suppress progress lines and stream only final answer text.
Off,
}
fn default_draft_update_interval_ms() -> u64 {
1000
}
@ -4530,6 +4543,9 @@ pub struct TelegramConfig {
/// Direct messages are always processed.
#[serde(default)]
pub mention_only: bool,
/// Draft progress verbosity for streaming updates.
#[serde(default)]
pub progress_mode: ProgressMode,
/// Group-chat trigger controls.
#[serde(default)]
pub group_reply: Option<GroupReplyConfig>,
@ -8972,6 +8988,7 @@ mod tests {
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,
@ -9399,6 +9416,7 @@ ws_url = "ws://127.0.0.1:3002"
draft_update_interval_ms: default_draft_update_interval_ms(),
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,
@ -9880,6 +9898,7 @@ tool_dispatcher = "xml"
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,
@ -10064,6 +10083,7 @@ tool_dispatcher = "xml"
draft_update_interval_ms: 500,
interrupt_on_new_message: true,
mention_only: false,
progress_mode: ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,
@ -10082,6 +10102,7 @@ tool_dispatcher = "xml"
let json = r#"{"bot_token":"tok","allowed_users":[]}"#;
let parsed: TelegramConfig = serde_json::from_str(json).unwrap();
assert_eq!(parsed.stream_mode, StreamMode::Off);
assert_eq!(parsed.progress_mode, ProgressMode::Compact);
assert_eq!(parsed.draft_update_interval_ms, 1000);
assert!(!parsed.interrupt_on_new_message);
assert!(parsed.base_url.is_none());
@ -10099,6 +10120,31 @@ tool_dispatcher = "xml"
assert_eq!(parsed.base_url, Some("https://tapi.bale.ai".to_string()));
}
#[test]
async fn progress_mode_deserializes_variants() {
let verbose: ProgressMode = serde_json::from_str(r#""verbose""#).unwrap();
let compact: ProgressMode = serde_json::from_str(r#""compact""#).unwrap();
let off: ProgressMode = serde_json::from_str(r#""off""#).unwrap();
assert_eq!(verbose, ProgressMode::Verbose);
assert_eq!(compact, ProgressMode::Compact);
assert_eq!(off, ProgressMode::Off);
}
#[test]
async fn telegram_config_deserializes_progress_mode_verbose() {
let json = r#"{"bot_token":"tok","allowed_users":[],"progress_mode":"verbose"}"#;
let parsed: TelegramConfig = serde_json::from_str(json).unwrap();
assert_eq!(parsed.progress_mode, ProgressMode::Verbose);
}
#[test]
async fn telegram_config_deserializes_progress_mode_off() {
let json = r#"{"bot_token":"tok","allowed_users":[],"progress_mode":"off"}"#;
let parsed: TelegramConfig = serde_json::from_str(json).unwrap();
assert_eq!(parsed.progress_mode, ProgressMode::Off);
}
#[test]
async fn telegram_group_reply_config_overrides_legacy_mention_only() {
let json = r#"{

View File

@ -498,6 +498,7 @@ mod tests {
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: crate::config::ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,
@ -667,6 +668,7 @@ mod tests {
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: crate::config::ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,

View File

@ -770,7 +770,9 @@ pub fn all_integrations() -> Vec<IntegrationEntry> {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::schema::{IMessageConfig, MatrixConfig, StreamMode, TelegramConfig};
use crate::config::schema::{
IMessageConfig, MatrixConfig, ProgressMode, StreamMode, TelegramConfig,
};
use crate::config::Config;
#[test]
@ -837,6 +839,7 @@ mod tests {
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,

View File

@ -1289,7 +1289,9 @@ fn backup_target_config(config_path: &Path) -> Result<Option<PathBuf>> {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{Config, DelegateAgentConfig, MemoryConfig, StreamMode, TelegramConfig};
use crate::config::{
Config, DelegateAgentConfig, MemoryConfig, ProgressMode, StreamMode, TelegramConfig,
};
use crate::memory::{Memory, SqliteMemory};
use rusqlite::params;
use serde_json::json;
@ -1454,6 +1456,7 @@ mod tests {
draft_update_interval_ms: 1_500,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
ack_enabled: true,
group_reply: None,
base_url: None,

View File

@ -1,7 +1,7 @@
use crate::config::schema::{
default_nostr_relays, DingTalkConfig, IrcConfig, LarkReceiveMode, LinqConfig,
NextcloudTalkConfig, NostrConfig, QQConfig, QQEnvironment, QQReceiveMode, SignalConfig,
StreamMode, WhatsAppConfig,
NextcloudTalkConfig, NostrConfig, ProgressMode, QQConfig, QQEnvironment, QQReceiveMode,
SignalConfig, StreamMode, WhatsAppConfig,
};
use crate::config::{
AutonomyConfig, BrowserConfig, ChannelsConfig, ComposioConfig, Config, DiscordConfig,
@ -4436,6 +4436,7 @@ fn setup_channels() -> Result<ChannelsConfig> {
draft_update_interval_ms: 1000,
interrupt_on_new_message: false,
mention_only: false,
progress_mode: ProgressMode::default(),
group_reply: None,
base_url: None,
ack_enabled: true,