feat(hardware): add pico flash and runtime code toolchain

This commit is contained in:
argenis de la rosa 2026-02-28 20:15:40 -05:00
parent bcaf4c4156
commit 50372c116a
6 changed files with 1487 additions and 0 deletions

102
src/firmware/pico/main.py Normal file
View File

@ -0,0 +1,102 @@
# ZeroClaw Pico firmware — serial JSON protocol handler
# MicroPython — Raspberry Pi Pico (RP2040)
#
# Wire protocol:
# Host → Device: {"cmd":"gpio_write","params":{"pin":25,"value":1}}\n
# Device → Host: {"ok":true,"data":{"pin":25,"value":1,"state":"HIGH"}}\n
#
# Pin direction policy:
# gpio_write always configures the pin as OUTPUT and caches it.
# gpio_read uses the cached Pin object if one already exists, so a pin
# that was set via gpio_write retains its OUTPUT direction — it is NOT
# reconfigured to INPUT. If no cached Pin exists the pin is opened as
# INPUT and the new Pin is cached for subsequent reads.
import sys
import json
from machine import Pin
# Onboard LED — GPIO 25 on Pico 1
led = Pin(25, Pin.OUT)
# Cache of Pin objects keyed by pin number (excludes the onboard LED on 25).
# gpio_write stores pins as OUTPUT; gpio_read reuses the existing Pin if one
# is cached rather than clobbering its direction.
pins_cache = {}
def handle(msg):
cmd = msg.get("cmd")
params = msg.get("params", {})
if cmd == "ping":
# data.firmware must equal "zeroclaw" for ping_handshake() to pass
return {"ok": True, "data": {"firmware": "zeroclaw", "version": "1.0.0"}}
elif cmd == "gpio_write":
pin_num = params.get("pin")
value = params.get("value")
if pin_num is None or value is None:
return {"ok": False, "error": "missing pin or value"}
# Validate/cast pin_num to int (JSON may deliver it as float or string)
try:
pin_num = int(pin_num)
except (TypeError, ValueError):
return {"ok": False, "error": "invalid pin"}
if pin_num < 0:
return {"ok": False, "error": "invalid pin"}
# Normalize value: accept bool or int, must resolve to 0 or 1.
if isinstance(value, bool):
value = int(value)
if not isinstance(value, int) or value not in (0, 1):
return {"ok": False, "error": "invalid value: must be 0 or 1"}
if pin_num == 25:
led.value(value)
else:
# Reuse a cached Pin object when available to avoid repeated
# allocations; re-initialise direction to OUT in case it was
# previously opened as IN by gpio_read.
if pin_num in pins_cache:
pins_cache[pin_num].init(mode=Pin.OUT)
else:
pins_cache[pin_num] = Pin(pin_num, Pin.OUT)
pins_cache[pin_num].value(value)
state = "HIGH" if value == 1 else "LOW"
return {"ok": True, "data": {"pin": pin_num, "value": value, "state": state}}
elif cmd == "gpio_read":
pin_num = params.get("pin")
if pin_num is None:
return {"ok": False, "error": "missing pin"}
# Validate/cast pin_num to int
try:
pin_num = int(pin_num)
except (TypeError, ValueError):
return {"ok": False, "error": "invalid pin"}
if pin_num < 0:
return {"ok": False, "error": "invalid pin"}
value = led.value() if pin_num == 25 else (
pins_cache[pin_num].value() if pin_num in pins_cache
else pins_cache.setdefault(pin_num, Pin(pin_num, Pin.IN)).value()
)
state = "HIGH" if value == 1 else "LOW"
return {"ok": True, "data": {"pin": pin_num, "value": value, "state": state}}
else:
return {"ok": False, "error": "unknown cmd: {}".format(cmd)}
while True:
try:
line = sys.stdin.readline().strip()
if not line:
continue
msg = json.loads(line)
result = handle(msg)
print(json.dumps(result))
except (ValueError, KeyError, TypeError, OSError, AttributeError) as e:
# ValueError — json.loads() on malformed input
# KeyError — unexpected missing key in a message dict
# TypeError — wrong type in an operation
# OSError — GPIO/hardware errors from Pin()/Pin.value()
# AttributeError — msg.get(...) called on non-dict JSON value
# Any other exception propagates so bugs surface during development.
print(json.dumps({"ok": False, "error": str(e)}))

Binary file not shown.

View File

@ -23,6 +23,15 @@ pub mod introspect;
#[cfg(feature = "hardware")]
pub mod serial;
#[cfg(feature = "hardware")]
pub mod uf2;
#[cfg(feature = "hardware")]
pub mod pico_flash;
#[cfg(feature = "hardware")]
pub mod pico_code;
use crate::config::Config;
use anyhow::Result;
@ -40,6 +49,12 @@ pub use protocol::{ZcCommand, ZcResponse};
#[allow(unused_imports)]
pub use transport::{Transport, TransportError, TransportKind};
#[cfg(feature = "hardware")]
#[allow(unused_imports)]
pub use pico_code::{device_code_tools, DeviceExecTool, DeviceReadCodeTool, DeviceWriteCodeTool};
#[cfg(feature = "hardware")]
#[allow(unused_imports)]
pub use pico_flash::PicoFlashTool;
#[cfg(feature = "hardware")]
#[allow(unused_imports)]
pub use serial::HardwareSerialTransport;

723
src/hardware/pico_code.rs Normal file
View File

@ -0,0 +1,723 @@
//! Phase 7 — Dynamic code tools: `device_read_code`, `device_write_code`, `device_exec`.
//!
//! These tools let the LLM read, write, and execute code on any connected
//! hardware device. The `DeviceRuntime` on each device determines which
//! host-side tooling is used:
//!
//! - **MicroPython / CircuitPython** — `mpremote` for code read/write/exec.
//! - **Arduino / Nucleus / Linux** — not yet implemented; returns a clear error.
//!
//! When the `device` parameter is omitted, each tool auto-selects the device
//! only when **exactly one** device is registered. If multiple devices are
//! present the tool returns an error and requires an explicit `device` parameter.
use super::device::{DeviceRegistry, DeviceRuntime};
use crate::tools::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Default timeout for `mpremote` operations (seconds).
const MPREMOTE_TIMEOUT_SECS: u64 = 30;
/// Maximum time to wait for the serial port after a reset (seconds).
const PORT_WAIT_SECS: u64 = 15;
/// Polling interval when waiting for a serial port (ms).
const PORT_POLL_MS: u64 = 200;
// ── helpers ───────────────────────────────────────────────────────────────────
/// Resolve the serial port path and runtime for a device.
///
/// If `device_alias` is provided, look it up; otherwise auto-selects the device
/// only when exactly one device is registered. With multiple devices present,
/// returns an error requiring an explicit alias.
/// Returns `(alias, port, runtime)` or an error `ToolResult`.
async fn resolve_device_port(
registry: &RwLock<DeviceRegistry>,
device_alias: Option<&str>,
) -> Result<(String, String, DeviceRuntime), ToolResult> {
let reg = registry.read().await;
let alias: String = match device_alias {
Some(a) => a.to_string(),
None => {
// Auto-select the first device.
let all_aliases: Vec<String> =
reg.aliases().into_iter().map(|a| a.to_string()).collect();
match all_aliases.as_slice() {
[single] => single.clone(),
[] => {
return Err(ToolResult {
success: false,
output: String::new(),
error: Some("no device found — is a board connected via USB?".to_string()),
});
}
multiple => {
return Err(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"multiple devices found ({}); specify the \"device\" parameter",
multiple.join(", ")
)),
});
}
}
}
};
let device = reg.get_device(&alias).ok_or_else(|| ToolResult {
success: false,
output: String::new(),
error: Some(format!("device '{alias}' not found in registry")),
})?;
let runtime = device.runtime;
let port = device.port().ok_or_else(|| ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"device '{alias}' has no serial port — is it connected?"
)),
})?;
Ok((alias, port.to_string(), runtime))
}
/// Return an unsupported-runtime error `ToolResult` for a given tool name.
fn unsupported_runtime(runtime: &DeviceRuntime, tool: &str) -> ToolResult {
ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"{runtime} runtime is not yet supported for {tool} — coming soon"
)),
}
}
/// Run an `mpremote` command with a timeout and return (stdout, stderr).
async fn run_mpremote(args: &[&str], timeout_secs: u64) -> Result<(String, String), String> {
use tokio::time::timeout;
let result = timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::process::Command::new("mpremote").args(args).output(),
)
.await;
match result {
Ok(Ok(output)) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if output.status.success() {
Ok((stdout, stderr))
} else {
Err(format!(
"mpremote failed (exit {}): {}",
output.status,
stderr.trim()
))
}
}
Ok(Err(e)) => Err(format!(
"mpremote not found or could not start ({e}). \
Install it with: pip install mpremote"
)),
Err(_) => Err(format!(
"mpremote timed out after {timeout_secs}s — \
the device may be unresponsive"
)),
}
}
// ── DeviceReadCodeTool ────────────────────────────────────────────────────────
/// Tool: read the current `main.py` from a connected device.
///
/// The LLM uses this to understand the current program before modifying it.
pub struct DeviceReadCodeTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceReadCodeTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceReadCodeTool {
fn name(&self) -> &str {
"device_read_code"
}
fn description(&self) -> &str {
"Read the current program (main.py) running on a connected device. \
Use this before writing new code so you understand the current state."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
}
},
"required": []
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_read_code")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, "reading main.py from device");
match run_mpremote(
&["connect", &port, "cat", ":main.py"],
MPREMOTE_TIMEOUT_SECS,
)
.await
{
Ok((stdout, _stderr)) => Ok(ToolResult {
success: true,
output: if stdout.trim().is_empty() {
format!("main.py on {alias} is empty or not found.")
} else {
format!(
"Current main.py on {alias}:\n\n```python\n{}\n```",
stdout.trim()
)
},
error: None,
}),
Err(e) => {
// mpremote cat fails if main.py doesn't exist — not a fatal error.
if e.contains("OSError") || e.contains("no such file") || e.contains("ENOENT") {
Ok(ToolResult {
success: true,
output: format!(
"No main.py found on {alias} — the device has no program yet."
),
error: None,
})
} else {
Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to read code from {alias}: {e}")),
})
}
}
}
}
}
// ── DeviceWriteCodeTool ───────────────────────────────────────────────────────
/// Tool: write a complete program to a device as `main.py`.
///
/// This replaces the current `main.py` on the device and resets it so the new
/// program starts executing immediately.
pub struct DeviceWriteCodeTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceWriteCodeTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceWriteCodeTool {
fn name(&self) -> &str {
"device_write_code"
}
fn description(&self) -> &str {
"Write a complete program to a device — replaces main.py and restarts the device. \
Always read the current code first with device_read_code."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
},
"code": {
"type": "string",
"description": "Complete program to write as main.py"
}
},
"required": ["code"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let code = match args.get("code").and_then(|v| v.as_str()) {
Some(c) => c,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("missing required parameter: code".to_string()),
});
}
};
if code.trim().is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("code parameter is empty — provide a program to write".to_string()),
});
}
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_write_code")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, code_len = code.len(), "writing main.py to device");
// Write code to an atomic, owner-only temp file via tempfile crate.
let named_tmp = match tokio::task::spawn_blocking(|| {
tempfile::Builder::new()
.prefix("zeroclaw_main_")
.suffix(".py")
.tempfile()
})
.await
{
Ok(Ok(f)) => f,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to create temp file: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("temp file task failed: {e}")),
});
}
};
let tmp_path = named_tmp.path().to_path_buf();
let tmp_str = tmp_path.to_string_lossy().to_string();
if let Err(e) = tokio::fs::write(&tmp_path, code).await {
// named_tmp dropped here — auto-removes the file.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to write temp file: {e}")),
});
}
// Deploy via mpremote: copy + reset.
let result = run_mpremote(
&["connect", &port, "cp", &tmp_str, ":main.py", "+", "reset"],
MPREMOTE_TIMEOUT_SECS,
)
.await;
// Explicit cleanup — log if removal fails rather than silently ignoring.
if let Err(e) = named_tmp.close() {
tracing::warn!(path = %tmp_str, err = %e, "failed to clean up temp file");
}
match result {
Ok((_stdout, _stderr)) => {
tracing::info!(alias = %alias, "main.py deployed and device reset");
// Wait for the serial port to reappear after reset.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let port_reappeared = wait_for_port(
&port,
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
if port_reappeared {
Ok(ToolResult {
success: true,
output: format!(
"Code deployed to {alias} — main.py updated and device reset. \
{alias} is back online."
),
error: None,
})
} else {
Ok(ToolResult {
success: true,
output: format!(
"Code deployed to {alias} — main.py updated and device reset. \
Note: serial port did not reappear within {PORT_WAIT_SECS}s; \
the device may still be booting."
),
error: None,
})
}
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to deploy code to {alias}: {e}")),
}),
}
}
}
// ── DeviceExecTool ────────────────────────────────────────────────────────────
/// Tool: run a one-off code snippet on a device without modifying `main.py`.
///
/// Good for one-time commands, sensor reads, and testing code before committing.
pub struct DeviceExecTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceExecTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceExecTool {
fn name(&self) -> &str {
"device_exec"
}
fn description(&self) -> &str {
"Execute a code snippet on a connected device without modifying main.py. \
Good for one-time actions, sensor reads, and testing before writing permanent code."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
},
"code": {
"type": "string",
"description": "Code to execute. Output is captured and returned."
}
},
"required": ["code"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let code = match args.get("code").and_then(|v| v.as_str()) {
Some(c) => c,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("missing required parameter: code".to_string()),
});
}
};
if code.trim().is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"code parameter is empty — provide a code snippet to execute".to_string(),
),
});
}
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_exec")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, code_len = code.len(), "executing snippet on device");
// Write snippet to an atomic, owner-only temp file via tempfile crate.
let named_tmp = match tokio::task::spawn_blocking(|| {
tempfile::Builder::new()
.prefix("zeroclaw_exec_")
.suffix(".py")
.tempfile()
})
.await
{
Ok(Ok(f)) => f,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to create temp file: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("temp file task failed: {e}")),
});
}
};
let tmp_path = named_tmp.path().to_path_buf();
let tmp_str = tmp_path.to_string_lossy().to_string();
if let Err(e) = tokio::fs::write(&tmp_path, code).await {
// named_tmp dropped here — auto-removes the file.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to write temp file: {e}")),
});
}
// Execute via mpremote run (does NOT modify main.py).
let result =
run_mpremote(&["connect", &port, "run", &tmp_str], MPREMOTE_TIMEOUT_SECS).await;
// Explicit cleanup — log if removal fails rather than silently ignoring.
if let Err(e) = named_tmp.close() {
tracing::warn!(path = %tmp_str, err = %e, "failed to clean up temp file");
}
match result {
Ok((stdout, stderr)) => {
let output = if stdout.trim().is_empty() && !stderr.trim().is_empty() {
// Some MicroPython output goes to stderr (e.g. exceptions).
stderr.trim().to_string()
} else {
stdout.trim().to_string()
};
Ok(ToolResult {
success: true,
output: if output.is_empty() {
format!("Code executed on {alias} — no output produced.")
} else {
format!("Output from {alias}:\n{output}")
},
error: None,
})
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to execute code on {alias}: {e}")),
}),
}
}
}
// ── port wait helper ──────────────────────────────────────────────────────────
/// Poll for a specific serial port to reappear after a device reset.
///
/// Returns `true` if the port exists within the timeout, `false` otherwise.
async fn wait_for_port(
port_path: &str,
timeout: std::time::Duration,
interval: std::time::Duration,
) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
while tokio::time::Instant::now() < deadline {
if std::path::Path::new(port_path).exists() {
return true;
}
tokio::time::sleep(interval).await;
}
false
}
/// Factory function: create all Phase 7 dynamic code tools.
pub fn device_code_tools(registry: Arc<RwLock<DeviceRegistry>>) -> Vec<Box<dyn Tool>> {
vec![
Box::new(DeviceReadCodeTool::new(registry.clone())),
Box::new(DeviceWriteCodeTool::new(registry.clone())),
Box::new(DeviceExecTool::new(registry)),
]
}
#[cfg(test)]
mod tests {
use super::*;
fn empty_registry() -> Arc<RwLock<DeviceRegistry>> {
Arc::new(RwLock::new(DeviceRegistry::new()))
}
// ── DeviceReadCodeTool ───────────────────────────────────────────
#[test]
fn device_read_code_name() {
let tool = DeviceReadCodeTool::new(empty_registry());
assert_eq!(tool.name(), "device_read_code");
}
#[test]
fn device_read_code_schema_valid() {
let tool = DeviceReadCodeTool::new(empty_registry());
let schema = tool.parameters_schema();
assert_eq!(schema["type"], "object");
assert!(schema["properties"]["device"].is_object());
}
#[tokio::test]
async fn device_read_code_no_device_returns_error() {
let tool = DeviceReadCodeTool::new(empty_registry());
let result = tool.execute(json!({})).await.unwrap();
assert!(!result.success);
assert!(
result.error.as_deref().unwrap_or("").contains("no device"),
"expected 'no device' error; got: {:?}",
result.error
);
}
// ── DeviceWriteCodeTool ──────────────────────────────────────────
#[test]
fn device_write_code_name() {
let tool = DeviceWriteCodeTool::new(empty_registry());
assert_eq!(tool.name(), "device_write_code");
}
#[test]
fn device_write_code_schema_requires_code() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let schema = tool.parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("code")),
"code should be required"
);
}
#[tokio::test]
async fn device_write_code_empty_code_rejected() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let result = tool.execute(json!({"code": ""})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("empty"));
}
#[tokio::test]
async fn device_write_code_no_device_returns_error() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let result = tool
.execute(json!({"code": "print('hello')"}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("no device"),);
}
// ── DeviceExecTool ───────────────────────────────────────────────
#[test]
fn device_exec_name() {
let tool = DeviceExecTool::new(empty_registry());
assert_eq!(tool.name(), "device_exec");
}
#[test]
fn device_exec_schema_requires_code() {
let tool = DeviceExecTool::new(empty_registry());
let schema = tool.parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("code")),
"code should be required"
);
}
#[tokio::test]
async fn device_exec_empty_code_rejected() {
let tool = DeviceExecTool::new(empty_registry());
let result = tool.execute(json!({"code": " "})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("empty"));
}
#[tokio::test]
async fn device_exec_no_device_returns_error() {
let tool = DeviceExecTool::new(empty_registry());
let result = tool.execute(json!({"code": "print(1+1)"})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("no device"),);
}
// ── Factory ──────────────────────────────────────────────────────
#[test]
fn factory_returns_three_tools() {
let reg = empty_registry();
let tools = device_code_tools(reg);
assert_eq!(tools.len(), 3);
let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
assert!(names.contains(&"device_read_code"));
assert!(names.contains(&"device_write_code"));
assert!(names.contains(&"device_exec"));
}
#[test]
fn all_specs_valid() {
let reg = empty_registry();
let tools = device_code_tools(reg);
for tool in &tools {
let spec = tool.spec();
assert!(!spec.name.is_empty());
assert!(!spec.description.is_empty());
assert_eq!(spec.parameters["type"], "object");
}
}
}

296
src/hardware/pico_flash.rs Normal file
View File

@ -0,0 +1,296 @@
//! `pico_flash` tool — flash ZeroClaw firmware to a Pico in BOOTSEL mode.
//!
//! # Happy path
//! 1. User holds BOOTSEL while plugging in Pico → RPI-RP2 drive appears.
//! 2. User asks "flash my pico".
//! 3. LLM calls `pico_flash(confirm=true)`.
//! 4. Tool copies UF2 to RPI-RP2 drive; Pico reboots into MicroPython.
//! 5. Tool waits up to 20 s for `/dev/cu.usbmodem*` to appear.
//! 6. Tool deploys `main.py` via `mpremote` and resets the Pico.
//! 7. Tool waits for the serial port to reappear after reset.
//! 8. Tool returns success; user restarts ZeroClaw to get `pico0`.
use super::device::DeviceRegistry;
use super::uf2;
use crate::tools::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
/// How long to wait for the Pico serial port after flashing (seconds).
const PORT_WAIT_SECS: u64 = 20;
/// How often to poll for the serial port.
const PORT_POLL_MS: u64 = 500;
// ── PicoFlashTool ─────────────────────────────────────────────────────────────
/// Tool: flash ZeroClaw MicroPython firmware to a Pico in BOOTSEL mode.
///
/// The Pico must be connected with BOOTSEL held so it mounts as `RPI-RP2`.
/// After flashing, the tool deploys `main.py` via `mpremote`, then reconnects
/// the serial transport in the [`DeviceRegistry`] so subsequent `gpio_write`
/// calls work immediately without restarting ZeroClaw.
pub struct PicoFlashTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl PicoFlashTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for PicoFlashTool {
fn name(&self) -> &str {
"pico_flash"
}
fn description(&self) -> &str {
"Flash ZeroClaw firmware to a Raspberry Pi Pico in BOOTSEL mode. \
The Pico must be connected with the BOOTSEL button held (shows as RPI-RP2 drive in Finder). \
After flashing the Pico reboots, main.py is deployed, and the serial \
connection is refreshed automatically no restart needed."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"confirm": {
"type": "boolean",
"description": "Set to true to confirm flashing the Pico firmware"
}
},
"required": ["confirm"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
// ── 1. Require explicit confirmation ──────────────────────────────
let confirmed = args
.get("confirm")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if !confirmed {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"Set confirm=true to proceed with flashing. \
This will overwrite the firmware on the connected Pico."
.to_string(),
),
});
}
// ── 2. Detect BOOTSEL-mode Pico ───────────────────────────────────
let mount = match uf2::find_rpi_rp2_mount() {
Some(m) => m,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"No Pico in BOOTSEL mode found (RPI-RP2 drive not detected). \
Hold the BOOTSEL button while plugging the Pico in via USB, \
then try again."
.to_string(),
),
});
}
};
tracing::info!(mount = %mount.display(), "RPI-RP2 volume found");
// ── 3. Ensure firmware files are extracted ────────────────────────
let firmware_dir = match uf2::ensure_firmware_dir() {
Ok(d) => d,
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("firmware error: {e}")),
});
}
};
// ── 4. Flash UF2 ─────────────────────────────────────────────────
if let Err(e) = uf2::flash_uf2(&mount, &firmware_dir).await {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("flash failed: {e}")),
});
}
// ── 5. Wait for serial port to appear ─────────────────────────────
let port = uf2::wait_for_serial_port(
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
let port = match port {
Some(p) => p,
None => {
// Flash likely succeeded even if port didn't appear in time —
// some host systems are slower to enumerate the new port.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"UF2 copied to {} but serial port did not appear within {PORT_WAIT_SECS}s. \
Unplug and replug the Pico, then run:\n \
mpremote connect <port> cp ~/.zeroclaw/firmware/pico/main.py :main.py + reset",
mount.display()
)),
});
}
};
tracing::info!(port = %port.display(), "Pico serial port online after UF2 flash");
// ── 6. Deploy main.py via mpremote ────────────────────────────────
if let Err(e) = uf2::deploy_main_py(&port, &firmware_dir).await {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("main.py deploy failed: {e}")),
});
}
// ── 7. Wait for serial port after mpremote reset ──────────────────
//
// mpremote resets the Pico so the serial port disappears briefly.
// Give the OS a moment to drop the old entry before polling.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let final_port = uf2::wait_for_serial_port(
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
// ── 8. Reconnect serial transport in DeviceRegistry ──────────────
//
// The old transport still points at a stale port handle from before
// the flash. Reconnect so gpio_write works immediately.
let reconnect_result = match &final_port {
Some(p) => {
let port_str = p.to_string_lossy();
let mut reg = self.registry.write().await;
// Try to find a pico alias in the registry.
match reg.aliases().into_iter().find(|a| a.starts_with("pico")) {
Some(a) => {
let alias = a.to_string();
reg.reconnect(&alias, Some(&port_str)).await
}
None => Err(anyhow::anyhow!(
"no pico alias found in registry; cannot reconnect transport"
)),
}
}
None => Err(anyhow::anyhow!("no serial port to reconnect")),
};
// ── 9. Return result ──────────────────────────────────────────────
match final_port {
Some(p) => {
let port_str = p.display().to_string();
let reconnected = reconnect_result.is_ok();
if reconnected {
tracing::info!(port = %port_str, "Pico online with main.py — transport reconnected");
} else {
let err = reconnect_result.unwrap_err();
tracing::warn!(port = %port_str, err = %err, "Pico online but reconnect failed");
}
let suffix = if reconnected {
"pico0 is ready — you can use gpio_write immediately."
} else {
"Restart ZeroClaw to reconnect as pico0."
};
Ok(ToolResult {
success: true,
output: format!(
"Pico flashed and main.py deployed successfully. \
Firmware is online at {port_str}. {suffix}"
),
error: None,
})
}
None => Ok(ToolResult {
success: true,
output: format!(
"Pico flashed and main.py deployed. \
Serial port did not reappear within {PORT_WAIT_SECS}s after reset \
unplug and replug the Pico, then restart ZeroClaw to connect as pico0."
),
error: None,
}),
}
}
}
#[cfg(test)]
mod tests {
use super::super::device::DeviceRegistry;
use super::*;
fn tool() -> PicoFlashTool {
let registry = Arc::new(RwLock::new(DeviceRegistry::new()));
PicoFlashTool::new(registry)
}
#[test]
fn name_is_pico_flash() {
let t = tool();
assert_eq!(t.name(), "pico_flash");
}
#[test]
fn schema_requires_confirm() {
let schema = tool().parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("confirm")),
"confirm should be required"
);
}
#[tokio::test]
async fn execute_without_confirm_returns_error() {
let result = tool()
.execute(serde_json::json!({"confirm": false}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.is_some());
let err = result.error.unwrap();
assert!(
err.contains("confirm=true"),
"error should mention confirm=true; got: {err}"
);
}
#[tokio::test]
async fn execute_missing_confirm_returns_error() {
let result = tool().execute(serde_json::json!({})).await.unwrap();
assert!(!result.success);
}
#[tokio::test]
async fn execute_with_confirm_true_but_no_pico_returns_error() {
// In CI there's no Pico attached — the tool should report missing device, not panic.
let result = tool()
.execute(serde_json::json!({"confirm": true}))
.await
.unwrap();
// Either success (if a Pico happens to be connected) or the BOOTSEL error.
// What must NOT happen: panic or anyhow error propagation.
let _ = result; // just verify it didn't panic
}
}

351
src/hardware/uf2.rs Normal file
View File

@ -0,0 +1,351 @@
//! UF2 flashing support — detect BOOTSEL-mode Pico and deploy firmware.
//!
//! # Workflow
//! 1. [`find_rpi_rp2_mount`] — check well-known mount points for the RPI-RP2 volume
//! that appears when a Pico is held in BOOTSEL mode.
//! 2. [`ensure_firmware_dir`] — extract the bundled firmware files to
//! `~/.zeroclaw/firmware/pico/` if they aren't there yet.
//! 3. [`flash_uf2`] — copy the UF2 to the mount point; the Pico reboots automatically.
//!
//! # Embedded assets
//! Both firmware files are compiled into the binary with `include_bytes!` so
//! users never need to download them separately.
use anyhow::{bail, Result};
use std::path::{Path, PathBuf};
// ── Embedded firmware ─────────────────────────────────────────────────────────
/// MicroPython UF2 binary — copied to RPI-RP2 to install the base runtime.
const PICO_UF2: &[u8] = include_bytes!("../firmware/pico/zeroclaw-pico.uf2");
/// ZeroClaw serial protocol handler — written to the Pico after MicroPython boots.
pub const PICO_MAIN_PY: &[u8] = include_bytes!("../firmware/pico/main.py");
/// UF2 magic word 1 (little-endian bytes at offset 0 of every UF2 block).
const UF2_MAGIC1: [u8; 4] = [0x55, 0x46, 0x32, 0x0A];
// ── Volume detection ──────────────────────────────────────────────────────────
/// Find the RPI-RP2 mount point if a Pico is connected in BOOTSEL mode.
///
/// Checks:
/// - macOS: `/Volumes/RPI-RP2`
/// - Linux: `/media/*/RPI-RP2` and `/run/media/*/RPI-RP2`
pub fn find_rpi_rp2_mount() -> Option<PathBuf> {
// macOS
let mac = PathBuf::from("/Volumes/RPI-RP2");
if mac.exists() {
return Some(mac);
}
// Linux — /media/<user>/RPI-RP2 or /run/media/<user>/RPI-RP2
for base in &["/media", "/run/media"] {
if let Ok(entries) = std::fs::read_dir(base) {
for entry in entries.flatten() {
let candidate = entry.path().join("RPI-RP2");
if candidate.exists() {
return Some(candidate);
}
}
}
}
None
}
// ── Firmware directory management ─────────────────────────────────────────────
/// Ensure `~/.zeroclaw/firmware/pico/` exists and contains the bundled assets.
///
/// Files are only written if they are absent — existing files are never overwritten
/// so users can substitute their own firmware.
///
/// Returns the firmware directory path.
pub fn ensure_firmware_dir() -> Result<PathBuf> {
use directories::BaseDirs;
let base = BaseDirs::new().ok_or_else(|| anyhow::anyhow!("cannot determine home directory"))?;
let firmware_dir = base
.home_dir()
.join(".zeroclaw")
.join("firmware")
.join("pico");
std::fs::create_dir_all(&firmware_dir)?;
// UF2 — validate magic before writing so a broken stub is caught early.
let uf2_path = firmware_dir.join("zeroclaw-pico.uf2");
if !uf2_path.exists() {
if PICO_UF2.len() < 8 || PICO_UF2[..4] != UF2_MAGIC1 {
bail!(
"Bundled UF2 is a placeholder — download the real MicroPython UF2 from \
https://micropython.org/download/RPI_PICO/ and place it at \
src/firmware/pico/zeroclaw-pico.uf2, then rebuild ZeroClaw."
);
}
std::fs::write(&uf2_path, PICO_UF2)?;
tracing::info!(path = %uf2_path.display(), "extracted bundled UF2");
}
// main.py — always check UF2 magic even if path already exists (user may
// have placed a stub). main.py has no such check — it's just text.
let main_py_path = firmware_dir.join("main.py");
if !main_py_path.exists() {
std::fs::write(&main_py_path, PICO_MAIN_PY)?;
tracing::info!(path = %main_py_path.display(), "extracted bundled main.py");
}
Ok(firmware_dir)
}
// ── Flashing ──────────────────────────────────────────────────────────────────
/// Copy the UF2 file to the RPI-RP2 mount point.
///
/// macOS often returns "Operation not permitted" for `std::fs::copy` on FAT
/// volumes presented by BOOTSEL-mode Picos. We try four approaches in order
/// and return a clear manual-fallback message if all fail:
///
/// 1. `std::fs::copy` — fast, no subprocess; works on most Linux setups.
/// 2. `cp <src> <dst>` — bypasses some macOS VFS permission layers.
/// 3. `sudo cp …` — escalates for locked volumes.
/// 4. Error — instructs the user to run the `sudo cp` manually.
pub async fn flash_uf2(mount_point: &Path, firmware_dir: &Path) -> Result<()> {
let uf2_src = firmware_dir.join("zeroclaw-pico.uf2");
let uf2_dst = mount_point.join("firmware.uf2");
let src_str = uf2_src.to_string_lossy().into_owned();
let dst_str = uf2_dst.to_string_lossy().into_owned();
tracing::info!(
src = %src_str,
dst = %dst_str,
"flashing UF2"
);
// Validate UF2 magic before any copy attempt — prevents flashing a stub.
let data = std::fs::read(&uf2_src)?;
if data.len() < 8 || data[..4] != UF2_MAGIC1 {
bail!(
"UF2 at {} does not look like a valid UF2 file (magic mismatch). \
Download from https://micropython.org/download/RPI_PICO/ and delete \
the existing file so ZeroClaw can re-extract it.",
uf2_src.display()
);
}
// ── Attempt 1: std::fs::copy (works on Linux, sometimes blocked on macOS) ─
{
let src = uf2_src.clone();
let dst = uf2_dst.clone();
let result = tokio::task::spawn_blocking(move || std::fs::copy(&src, &dst))
.await
.map_err(|e| anyhow::anyhow!("copy task panicked: {e}"));
match result {
Ok(Ok(_)) => {
tracing::info!("UF2 copy complete (std::fs::copy) — Pico will reboot");
return Ok(());
}
Ok(Err(e)) => tracing::warn!("std::fs::copy failed ({}), trying cp", e),
Err(e) => tracing::warn!("std::fs::copy task failed ({}), trying cp", e),
}
}
// ── Attempt 2: cp via subprocess ──────────────────────────────────────────
{
/// Timeout for subprocess copy attempts (seconds).
const CP_TIMEOUT_SECS: u64 = 10;
let out = tokio::time::timeout(
std::time::Duration::from_secs(CP_TIMEOUT_SECS),
tokio::process::Command::new("cp")
.arg(&src_str)
.arg(&dst_str)
.output(),
)
.await;
match out {
Err(_elapsed) => {
tracing::warn!("cp timed out after {}s, trying sudo cp", CP_TIMEOUT_SECS);
}
Ok(Ok(o)) if o.status.success() => {
tracing::info!("UF2 copy complete (cp) — Pico will reboot");
return Ok(());
}
Ok(Ok(o)) => {
let stderr = String::from_utf8_lossy(&o.stderr);
tracing::warn!("cp failed ({}), trying sudo cp", stderr.trim());
}
Ok(Err(e)) => tracing::warn!("cp spawn failed ({}), trying sudo cp", e),
}
}
// ── Attempt 3: sudo cp (non-interactive) ─────────────────────────────────
{
const SUDO_CP_TIMEOUT_SECS: u64 = 10;
let out = tokio::time::timeout(
std::time::Duration::from_secs(SUDO_CP_TIMEOUT_SECS),
tokio::process::Command::new("sudo")
.args(["-n", "cp", &src_str, &dst_str])
.output(),
)
.await;
match out {
Err(_elapsed) => {
tracing::warn!("sudo cp timed out after {}s", SUDO_CP_TIMEOUT_SECS);
}
Ok(Ok(o)) if o.status.success() => {
tracing::info!("UF2 copy complete (sudo cp) — Pico will reboot");
return Ok(());
}
Ok(Ok(o)) => {
let stderr = String::from_utf8_lossy(&o.stderr);
tracing::warn!("sudo cp failed: {}", stderr.trim());
}
Ok(Err(e)) => tracing::warn!("sudo cp spawn failed: {}", e),
}
}
// ── All attempts failed — give the user a clear manual command ────────────
bail!(
"All copy methods failed. Run this command manually, then restart ZeroClaw:\n\
\n sudo cp {src_str} {dst_str}\n"
)
}
/// Wait for `/dev/cu.usbmodem*` (macOS) or `/dev/ttyACM*` (Linux) to appear.
///
/// Polls every `interval` for up to `timeout`. Returns the first matching path
/// found, or `None` if the deadline expires.
pub async fn wait_for_serial_port(
timeout: std::time::Duration,
interval: std::time::Duration,
) -> Option<PathBuf> {
#[cfg(target_os = "macos")]
let patterns = &["/dev/cu.usbmodem*"];
#[cfg(target_os = "linux")]
let patterns = &["/dev/ttyACM*"];
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
let patterns: &[&str] = &[];
let deadline = tokio::time::Instant::now() + timeout;
loop {
for pattern in *patterns {
if let Ok(mut hits) = glob::glob(pattern) {
if let Some(Ok(path)) = hits.next() {
return Some(path);
}
}
}
if tokio::time::Instant::now() >= deadline {
return None;
}
tokio::time::sleep(interval).await;
}
}
// ── Deploy main.py via mpremote ───────────────────────────────────────────────
/// Copy `main.py` to the Pico's MicroPython filesystem and soft-reset it.
///
/// After the UF2 is flashed the Pico reboots into MicroPython but has no
/// `main.py` on its internal filesystem. This function uses `mpremote` to
/// upload the bundled `main.py` and issue a reset so it starts executing
/// immediately.
///
/// Returns `Ok(())` on success or an error with a helpful fallback command.
pub async fn deploy_main_py(port: &Path, firmware_dir: &Path) -> Result<()> {
let main_py_src = firmware_dir.join("main.py");
let src_str = main_py_src.to_string_lossy().into_owned();
let port_str = port.to_string_lossy().into_owned();
if !main_py_src.exists() {
bail!(
"main.py not found at {} — run ensure_firmware_dir() first",
main_py_src.display()
);
}
tracing::info!(
src = %src_str,
port = %port_str,
"deploying main.py via mpremote"
);
let out = tokio::process::Command::new("mpremote")
.args([
"connect", &port_str, "cp", &src_str, ":main.py", "+", "reset",
])
.output()
.await;
match out {
Ok(o) if o.status.success() => {
tracing::info!("main.py deployed and Pico reset via mpremote");
Ok(())
}
Ok(o) => {
let stderr = String::from_utf8_lossy(&o.stderr);
bail!(
"mpremote failed (exit {}): {}.\n\
Run manually:\n mpremote connect {port_str} cp {src_str} :main.py + reset",
o.status,
stderr.trim()
)
}
Err(e) => {
bail!(
"mpremote not found or could not start ({e}).\n\
Install it with: pip install mpremote\n\
Then run: mpremote connect {port_str} cp {src_str} :main.py + reset"
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pico_uf2_has_valid_magic() {
assert!(
PICO_UF2.len() >= 8,
"bundled UF2 too small ({} bytes) — replace with real MicroPython UF2",
PICO_UF2.len()
);
assert_eq!(
&PICO_UF2[..4],
&UF2_MAGIC1,
"bundled UF2 has wrong magic — replace with real MicroPython UF2 from \
https://micropython.org/download/RPI_PICO/"
);
}
#[test]
fn pico_main_py_is_non_empty() {
assert!(!PICO_MAIN_PY.is_empty(), "bundled main.py is empty");
}
#[test]
fn pico_main_py_contains_zeroclaw_marker() {
let src = std::str::from_utf8(PICO_MAIN_PY).expect("main.py is not valid UTF-8");
assert!(
src.contains("zeroclaw"),
"main.py should contain 'zeroclaw' firmware marker"
);
}
#[test]
fn find_rpi_rp2_mount_returns_none_when_not_connected() {
// This test runs on CI without a Pico attached — just verify it doesn't panic.
let _ = find_rpi_rp2_mount(); // may be Some or None depending on environment
}
}