zeroclaw/src/observability/runtime_trace.rs
2026-03-10 01:48:19 -04:00

415 lines
12 KiB
Rust

use crate::config::ObservabilityConfig;
use anyhow::Result;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, RwLock};
use uuid::Uuid;
const DEFAULT_TRACE_REL_PATH: &str = "state/runtime-trace.jsonl";
/// Runtime trace storage policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeTraceStorageMode {
None,
Rolling,
Full,
}
impl RuntimeTraceStorageMode {
fn from_raw(raw: &str) -> Self {
match raw.trim().to_ascii_lowercase().as_str() {
"rolling" => Self::Rolling,
"full" => Self::Full,
_ => Self::None,
}
}
}
/// Structured runtime trace event for tool-call and model-reply diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeTraceEvent {
pub id: String,
pub timestamp: String,
pub event_type: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub provider: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub turn_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub success: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(default)]
pub payload: Value,
}
struct RuntimeTraceLogger {
mode: RuntimeTraceStorageMode,
max_entries: usize,
path: PathBuf,
write_lock: std::sync::Mutex<()>,
}
impl RuntimeTraceLogger {
fn new(mode: RuntimeTraceStorageMode, max_entries: usize, path: PathBuf) -> Self {
Self {
mode,
max_entries: max_entries.max(1),
path,
write_lock: std::sync::Mutex::new(()),
}
}
fn append(&self, event: &RuntimeTraceEvent) -> Result<()> {
if self.mode == RuntimeTraceStorageMode::None {
return Ok(());
}
let _guard = self.write_lock.lock().unwrap_or_else(|e| e.into_inner());
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent)?;
}
let line = serde_json::to_string(event)?;
let mut options = OpenOptions::new();
options.create(true).append(true);
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
options.mode(0o600);
}
let mut file = options.open(&self.path)?;
writeln!(file, "{line}")?;
file.sync_data()?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = fs::set_permissions(&self.path, std::fs::Permissions::from_mode(0o600));
}
if self.mode == RuntimeTraceStorageMode::Rolling {
self.trim_to_last_entries()?;
}
Ok(())
}
fn trim_to_last_entries(&self) -> Result<()> {
let raw = fs::read_to_string(&self.path).unwrap_or_default();
let lines: Vec<&str> = raw
.lines()
.map(str::trim)
.filter(|line| !line.is_empty())
.collect();
if lines.len() <= self.max_entries {
return Ok(());
}
let keep_from = lines.len().saturating_sub(self.max_entries);
let kept = &lines[keep_from..];
let mut rewritten = kept.join("\n");
rewritten.push('\n');
let tmp = self.path.with_extension(format!(
"tmp.{}.{}",
std::process::id(),
Utc::now().timestamp_nanos_opt().unwrap_or_default()
));
fs::write(&tmp, rewritten)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = fs::set_permissions(&tmp, std::fs::Permissions::from_mode(0o600));
}
fs::rename(tmp, &self.path)?;
Ok(())
}
}
static TRACE_LOGGER: LazyLock<RwLock<Option<Arc<RuntimeTraceLogger>>>> =
LazyLock::new(|| RwLock::new(None));
/// Resolve runtime trace storage mode from config.
pub fn storage_mode_from_config(config: &ObservabilityConfig) -> RuntimeTraceStorageMode {
let mode = RuntimeTraceStorageMode::from_raw(&config.runtime_trace_mode);
if mode == RuntimeTraceStorageMode::None
&& !config.runtime_trace_mode.trim().is_empty()
&& !config.runtime_trace_mode.eq_ignore_ascii_case("none")
{
tracing::warn!(
mode = %config.runtime_trace_mode,
"Unknown observability.runtime_trace_mode; falling back to none"
);
}
mode
}
/// Resolve runtime trace path from config.
pub fn resolve_trace_path(config: &ObservabilityConfig, workspace_dir: &Path) -> PathBuf {
let raw = config.runtime_trace_path.trim();
let fallback = workspace_dir.join(DEFAULT_TRACE_REL_PATH);
if raw.is_empty() {
return fallback;
}
let configured = PathBuf::from(raw);
if configured.is_absolute() {
configured
} else {
workspace_dir.join(configured)
}
}
/// Initialize (or disable) runtime trace logging.
pub fn init_from_config(config: &ObservabilityConfig, workspace_dir: &Path) {
let mode = storage_mode_from_config(config);
let logger = if mode == RuntimeTraceStorageMode::None {
None
} else {
Some(Arc::new(RuntimeTraceLogger::new(
mode,
config.runtime_trace_max_entries.max(1),
resolve_trace_path(config, workspace_dir),
)))
};
let mut guard = TRACE_LOGGER.write().unwrap_or_else(|e| e.into_inner());
*guard = logger;
}
/// Record a runtime trace event.
pub fn record_event(
event_type: &str,
channel: Option<&str>,
provider: Option<&str>,
model: Option<&str>,
turn_id: Option<&str>,
success: Option<bool>,
message: Option<&str>,
payload: Value,
) {
let logger = TRACE_LOGGER
.read()
.unwrap_or_else(|e| e.into_inner())
.clone();
let Some(logger) = logger else {
return;
};
let event = RuntimeTraceEvent {
id: Uuid::new_v4().to_string(),
timestamp: Utc::now().to_rfc3339(),
event_type: event_type.to_string(),
channel: channel.map(str::to_string),
provider: provider.map(str::to_string),
model: model.map(str::to_string),
turn_id: turn_id.map(str::to_string),
success,
message: message.map(str::to_string),
payload,
};
if let Err(err) = logger.append(&event) {
tracing::warn!("Failed to write runtime trace event: {err}");
}
}
/// Load recent runtime trace events from storage.
pub fn load_events(
path: &Path,
limit: usize,
event_filter: Option<&str>,
contains: Option<&str>,
) -> Result<Vec<RuntimeTraceEvent>> {
if !path.exists() {
return Ok(Vec::new());
}
let raw = fs::read_to_string(path)?;
let mut events = Vec::new();
for line in raw.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
match serde_json::from_str::<RuntimeTraceEvent>(trimmed) {
Ok(event) => events.push(event),
Err(err) => tracing::warn!("Skipping malformed runtime trace line: {err}"),
}
}
if let Some(filter) = event_filter.map(str::trim).filter(|f| !f.is_empty()) {
let normalized = filter.to_ascii_lowercase();
events.retain(|event| event.event_type.to_ascii_lowercase() == normalized);
}
if let Some(needle) = contains.map(str::trim).filter(|s| !s.is_empty()) {
let needle = needle.to_ascii_lowercase();
events.retain(|event| {
let mut haystack = format!(
"{} {} {}",
event.event_type,
event.message.as_deref().unwrap_or_default(),
event.payload
);
if let Some(channel) = &event.channel {
haystack.push_str(channel);
}
if let Some(provider) = &event.provider {
haystack.push_str(provider);
}
if let Some(model) = &event.model {
haystack.push_str(model);
}
haystack.to_ascii_lowercase().contains(&needle)
});
}
if events.len() > limit {
let keep_from = events.len() - limit;
events = events.split_off(keep_from);
}
events.reverse();
Ok(events)
}
/// Find a runtime trace event by id.
pub fn find_event_by_id(path: &Path, id: &str) -> Result<Option<RuntimeTraceEvent>> {
if !path.exists() {
return Ok(None);
}
let raw = fs::read_to_string(path)?;
for line in raw.lines().rev() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(event) = serde_json::from_str::<RuntimeTraceEvent>(trimmed) {
if event.id == id {
return Ok(Some(event));
}
}
}
Ok(None)
}
#[cfg(test)]
mod tests {
use super::*;
fn test_observability_config() -> ObservabilityConfig {
ObservabilityConfig {
backend: "none".to_string(),
otel_endpoint: None,
otel_service_name: None,
runtime_trace_mode: "rolling".to_string(),
runtime_trace_path: "state/runtime-trace.jsonl".to_string(),
runtime_trace_max_entries: 3,
}
}
#[test]
fn resolve_trace_path_relative_joins_workspace() {
let cfg = test_observability_config();
let workspace = tempfile::tempdir().unwrap();
let path = resolve_trace_path(&cfg, workspace.path());
assert_eq!(path, workspace.path().join("state/runtime-trace.jsonl"));
}
#[test]
fn storage_mode_parses_known_values() {
let mut cfg = test_observability_config();
cfg.runtime_trace_mode = "none".into();
assert_eq!(
storage_mode_from_config(&cfg),
RuntimeTraceStorageMode::None
);
cfg.runtime_trace_mode = "rolling".into();
assert_eq!(
storage_mode_from_config(&cfg),
RuntimeTraceStorageMode::Rolling
);
cfg.runtime_trace_mode = "full".into();
assert_eq!(
storage_mode_from_config(&cfg),
RuntimeTraceStorageMode::Full
);
}
#[test]
fn rolling_mode_keeps_latest_entries() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("trace.jsonl");
let logger = RuntimeTraceLogger::new(RuntimeTraceStorageMode::Rolling, 2, path.clone());
for i in 0..5 {
let event = RuntimeTraceEvent {
id: format!("id-{i}"),
timestamp: Utc::now().to_rfc3339(),
event_type: "test".into(),
channel: None,
provider: None,
model: None,
turn_id: None,
success: None,
message: Some(format!("event-{i}")),
payload: serde_json::json!({ "i": i }),
};
logger.append(&event).unwrap();
}
let events = load_events(&path, 10, None, None).unwrap();
assert_eq!(events.len(), 2);
assert_eq!(events[0].message.as_deref(), Some("event-4"));
assert_eq!(events[1].message.as_deref(), Some("event-3"));
}
#[test]
fn find_event_by_id_returns_match() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("trace.jsonl");
let logger = RuntimeTraceLogger::new(RuntimeTraceStorageMode::Full, 100, path.clone());
let target_id = "target-event";
let event = RuntimeTraceEvent {
id: target_id.into(),
timestamp: Utc::now().to_rfc3339(),
event_type: "tool_call_result".into(),
channel: Some("telegram".into()),
provider: Some("openrouter".into()),
model: Some("x".into()),
turn_id: Some("turn-1".into()),
success: Some(false),
message: Some("boom".into()),
payload: serde_json::json!({ "error": "boom" }),
};
logger.append(&event).unwrap();
let found = find_event_by_id(&path, target_id).unwrap();
assert!(found.is_some());
assert_eq!(found.unwrap().id, target_id);
}
}