Merge pull request #2323 from zeroclaw-labs/pr-1837-s34-main-rebased

feat(hardware): replay pico toolchain + prompt wiring on main [RMN-1837]
This commit is contained in:
Argenis 2026-03-05 01:53:06 -05:00 committed by GitHub
commit d22657fac0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1633 additions and 17 deletions

View File

@ -1196,6 +1196,49 @@ pub(super) fn parse_glm_shortened_body(body: &str) -> Option<ParsedToolCall> {
None
}
/// Parse shorthand tag body format `tool_name{...}` used by some models
/// inside `<tool_call>...</tool_call>` wrappers.
///
/// Example:
/// `<tool_call>shell{"command":"ls -la"}</tool_call>`
fn parse_shorthand_tag_call(body: &str) -> Option<ParsedToolCall> {
let body = body.trim();
if body.is_empty() {
return None;
}
let open_brace = body.find('{')?;
let close_brace = body.rfind('}')?;
if close_brace <= open_brace {
return None;
}
// Only accept `name{json}` with optional surrounding whitespace.
if !body[close_brace + 1..].trim().is_empty() {
return None;
}
let raw_name = body[..open_brace].trim().trim_end_matches(':').trim();
if raw_name.is_empty()
|| !raw_name
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
{
return None;
}
let args = serde_json::from_str::<serde_json::Value>(&body[open_brace..=close_brace]).ok()?;
if !args.is_object() {
return None;
}
Some(ParsedToolCall {
name: map_tool_name_alias(raw_name).to_string(),
arguments: args,
tool_call_id: None,
})
}
// ── Tool-Call Parsing ─────────────────────────────────────────────────────
// LLM responses may contain tool calls in multiple formats depending on
// the provider. Parsing follows a priority chain:
@ -1288,9 +1331,20 @@ pub(super) fn parse_tool_calls(response: &str) -> (String, Vec<ParsedToolCall>)
}
}
if !parsed_any {
if let Some(call) = parse_shorthand_tag_call(inner) {
tracing::debug!(
tool = %call.name,
"parsed shorthand tool call body inside <tool_call>"
);
calls.push(call);
parsed_any = true;
}
}
if !parsed_any {
tracing::warn!(
"Malformed <tool_call>: expected tool-call object in tag body (JSON/XML/GLM)"
"Malformed <tool_call>: expected tool-call object in tag body (JSON/XML/GLM/shorthand)"
);
}
@ -1330,6 +1384,13 @@ pub(super) fn parse_tool_calls(response: &str) -> (String, Vec<ParsedToolCall>)
}
}
if !parsed_any {
if let Some(call) = parse_shorthand_tag_call(inner) {
calls.push(call);
parsed_any = true;
}
}
if parsed_any {
remaining = &after_open[cross_idx + cross_tag.len()..];
resolved = true;
@ -1720,3 +1781,24 @@ pub(super) fn parse_structured_tool_calls(
result
}
#[cfg(test)]
mod tests {
use super::parse_tool_calls;
#[test]
fn parse_tool_calls_accepts_shorthand_object_in_tag_body() {
let response = r#"<tool_call>shell{"command":"echo hi"}</tool_call>"#;
let (_text, calls) = parse_tool_calls(response);
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].name, "shell");
assert_eq!(calls[0].arguments["command"], "echo hi");
}
#[test]
fn parse_tool_calls_rejects_non_object_shorthand_payload() {
let response = r#"<tool_call>shell["echo hi"]</tool_call>"#;
let (_text, calls) = parse_tool_calls(response);
assert!(calls.is_empty());
}
}

View File

@ -4513,25 +4513,72 @@ pub fn build_system_prompt_with_mode(
prompt.push('\n');
}
// ── 1b. Hardware (when gpio/arduino tools present) ───────────
let has_hardware = tools.iter().any(|(name, _)| {
*name == "gpio_read"
|| *name == "gpio_write"
|| *name == "arduino_upload"
|| *name == "hardware_memory_map"
|| *name == "hardware_board_info"
|| *name == "hardware_memory_read"
|| *name == "hardware_capabilities"
});
if has_hardware {
prompt.push_str(
// ── 1b. Hardware (when hardware tools are present) ───────────
let hardware_tool_names = [
"gpio_read",
"gpio_write",
"arduino_upload",
"hardware_memory_map",
"hardware_board_info",
"hardware_memory_read",
"hardware_capabilities",
"pico_flash",
"device_read_code",
"device_write_code",
"device_exec",
];
let hw_tools: Vec<&str> = tools
.iter()
.filter_map(|(name, _)| {
if hardware_tool_names.contains(name) {
Some(*name)
} else {
None
}
})
.collect();
if !hw_tools.is_empty() {
let hw_names = hw_tools.join(", ");
let _ = writeln!(
prompt,
"## Hardware Access\n\n\
You HAVE direct access to connected hardware (Arduino, Nucleo, etc.). The user owns this system and has configured it.\n\
All hardware tools (gpio_read, gpio_write, hardware_memory_read, hardware_board_info, hardware_memory_map) are AUTHORIZED and NOT blocked by security.\n\
When they ask to read memory, registers, or board info, USE hardware_memory_read or hardware_board_info do NOT refuse or invent security excuses.\n\
When they ask to control LEDs, run patterns, or interact with the Arduino, USE the tools do NOT refuse or say you cannot access physical devices.\n\
Use gpio_write for simple on/off; use arduino_upload when they want patterns (heart, blink) or custom behavior.\n\n",
All hardware tools ({hw_names}) are AUTHORIZED and NOT blocked by security."
);
if hw_tools
.iter()
.any(|name| *name == "hardware_memory_read" || *name == "hardware_board_info")
{
prompt.push_str(
"When they ask to read memory, registers, or board info, USE hardware_memory_read or hardware_board_info — do NOT refuse or invent security excuses.\n",
);
}
if hw_tools
.iter()
.any(|name| *name == "gpio_read" || *name == "gpio_write" || *name == "arduino_upload")
{
prompt.push_str(
"When they ask to control LEDs, run patterns, or interact with the Arduino, USE the tools — do NOT refuse or say you cannot access physical devices.\n",
);
}
if hw_tools.contains(&"gpio_write") && hw_tools.contains(&"arduino_upload") {
prompt.push_str(
"Use gpio_write for simple on/off; use arduino_upload when they want patterns (heart, blink) or custom behavior.\n",
);
}
if hw_tools.contains(&"gpio_write") {
prompt.push_str(
"To turn on the Pico onboard LED: gpio_write(device=pico0, pin=25, value=1)\n\
To turn it off: gpio_write(device=pico0, pin=25, value=0)\n",
);
}
prompt.push('\n');
}
// ── 1c. Action instruction (avoid meta-summary) ───────────────

102
src/firmware/pico/main.py Normal file
View File

@ -0,0 +1,102 @@
# ZeroClaw Pico firmware — serial JSON protocol handler
# MicroPython — Raspberry Pi Pico (RP2040)
#
# Wire protocol:
# Host → Device: {"cmd":"gpio_write","params":{"pin":25,"value":1}}\n
# Device → Host: {"ok":true,"data":{"pin":25,"value":1,"state":"HIGH"}}\n
#
# Pin direction policy:
# gpio_write always configures the pin as OUTPUT and caches it.
# gpio_read uses the cached Pin object if one already exists, so a pin
# that was set via gpio_write retains its OUTPUT direction — it is NOT
# reconfigured to INPUT. If no cached Pin exists the pin is opened as
# INPUT and the new Pin is cached for subsequent reads.
import sys
import json
from machine import Pin
# Onboard LED — GPIO 25 on Pico 1
led = Pin(25, Pin.OUT)
# Cache of Pin objects keyed by pin number (excludes the onboard LED on 25).
# gpio_write stores pins as OUTPUT; gpio_read reuses the existing Pin if one
# is cached rather than clobbering its direction.
pins_cache = {}
def handle(msg):
cmd = msg.get("cmd")
params = msg.get("params", {})
if cmd == "ping":
# data.firmware must equal "zeroclaw" for ping_handshake() to pass
return {"ok": True, "data": {"firmware": "zeroclaw", "version": "1.0.0"}}
elif cmd == "gpio_write":
pin_num = params.get("pin")
value = params.get("value")
if pin_num is None or value is None:
return {"ok": False, "error": "missing pin or value"}
# Validate/cast pin_num to int (JSON may deliver it as float or string)
try:
pin_num = int(pin_num)
except (TypeError, ValueError):
return {"ok": False, "error": "invalid pin"}
if pin_num < 0:
return {"ok": False, "error": "invalid pin"}
# Normalize value: accept bool or int, must resolve to 0 or 1.
if isinstance(value, bool):
value = int(value)
if not isinstance(value, int) or value not in (0, 1):
return {"ok": False, "error": "invalid value: must be 0 or 1"}
if pin_num == 25:
led.value(value)
else:
# Reuse a cached Pin object when available to avoid repeated
# allocations; re-initialise direction to OUT in case it was
# previously opened as IN by gpio_read.
if pin_num in pins_cache:
pins_cache[pin_num].init(mode=Pin.OUT)
else:
pins_cache[pin_num] = Pin(pin_num, Pin.OUT)
pins_cache[pin_num].value(value)
state = "HIGH" if value == 1 else "LOW"
return {"ok": True, "data": {"pin": pin_num, "value": value, "state": state}}
elif cmd == "gpio_read":
pin_num = params.get("pin")
if pin_num is None:
return {"ok": False, "error": "missing pin"}
# Validate/cast pin_num to int
try:
pin_num = int(pin_num)
except (TypeError, ValueError):
return {"ok": False, "error": "invalid pin"}
if pin_num < 0:
return {"ok": False, "error": "invalid pin"}
value = led.value() if pin_num == 25 else (
pins_cache[pin_num].value() if pin_num in pins_cache
else pins_cache.setdefault(pin_num, Pin(pin_num, Pin.IN)).value()
)
state = "HIGH" if value == 1 else "LOW"
return {"ok": True, "data": {"pin": pin_num, "value": value, "state": state}}
else:
return {"ok": False, "error": "unknown cmd: {}".format(cmd)}
while True:
try:
line = sys.stdin.readline().strip()
if not line:
continue
msg = json.loads(line)
result = handle(msg)
print(json.dumps(result))
except (ValueError, KeyError, TypeError, OSError, AttributeError) as e:
# ValueError — json.loads() on malformed input
# KeyError — unexpected missing key in a message dict
# TypeError — wrong type in an operation
# OSError — GPIO/hardware errors from Pin()/Pin.value()
# AttributeError — msg.get(...) called on non-dict JSON value
# Any other exception propagates so bugs surface during development.
print(json.dumps({"ok": False, "error": str(e)}))

Binary file not shown.

View File

@ -23,6 +23,15 @@ pub mod introspect;
#[cfg(feature = "hardware")]
pub mod serial;
#[cfg(feature = "hardware")]
pub mod uf2;
#[cfg(feature = "hardware")]
pub mod pico_flash;
#[cfg(feature = "hardware")]
pub mod pico_code;
use crate::config::Config;
use anyhow::Result;
@ -40,6 +49,12 @@ pub use protocol::{ZcCommand, ZcResponse};
#[allow(unused_imports)]
pub use transport::{Transport, TransportError, TransportKind};
#[cfg(feature = "hardware")]
#[allow(unused_imports)]
pub use pico_code::{device_code_tools, DeviceExecTool, DeviceReadCodeTool, DeviceWriteCodeTool};
#[cfg(feature = "hardware")]
#[allow(unused_imports)]
pub use pico_flash::PicoFlashTool;
#[cfg(feature = "hardware")]
#[allow(unused_imports)]
pub use serial::HardwareSerialTransport;

723
src/hardware/pico_code.rs Normal file
View File

@ -0,0 +1,723 @@
//! Phase 7 — Dynamic code tools: `device_read_code`, `device_write_code`, `device_exec`.
//!
//! These tools let the LLM read, write, and execute code on any connected
//! hardware device. The `DeviceRuntime` on each device determines which
//! host-side tooling is used:
//!
//! - **MicroPython / CircuitPython** — `mpremote` for code read/write/exec.
//! - **Arduino / Nucleus / Linux** — not yet implemented; returns a clear error.
//!
//! When the `device` parameter is omitted, each tool auto-selects the device
//! only when **exactly one** device is registered. If multiple devices are
//! present the tool returns an error and requires an explicit `device` parameter.
use super::device::{DeviceRegistry, DeviceRuntime};
use crate::tools::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Default timeout for `mpremote` operations (seconds).
const MPREMOTE_TIMEOUT_SECS: u64 = 30;
/// Maximum time to wait for the serial port after a reset (seconds).
const PORT_WAIT_SECS: u64 = 15;
/// Polling interval when waiting for a serial port (ms).
const PORT_POLL_MS: u64 = 200;
// ── helpers ───────────────────────────────────────────────────────────────────
/// Resolve the serial port path and runtime for a device.
///
/// If `device_alias` is provided, look it up; otherwise auto-selects the device
/// only when exactly one device is registered. With multiple devices present,
/// returns an error requiring an explicit alias.
/// Returns `(alias, port, runtime)` or an error `ToolResult`.
async fn resolve_device_port(
registry: &RwLock<DeviceRegistry>,
device_alias: Option<&str>,
) -> Result<(String, String, DeviceRuntime), ToolResult> {
let reg = registry.read().await;
let alias: String = match device_alias {
Some(a) => a.to_string(),
None => {
// Auto-select the first device.
let all_aliases: Vec<String> =
reg.aliases().into_iter().map(|a| a.to_string()).collect();
match all_aliases.as_slice() {
[single] => single.clone(),
[] => {
return Err(ToolResult {
success: false,
output: String::new(),
error: Some("no device found — is a board connected via USB?".to_string()),
});
}
multiple => {
return Err(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"multiple devices found ({}); specify the \"device\" parameter",
multiple.join(", ")
)),
});
}
}
}
};
let device = reg.get_device(&alias).ok_or_else(|| ToolResult {
success: false,
output: String::new(),
error: Some(format!("device '{alias}' not found in registry")),
})?;
let runtime = device.runtime;
let port = device.port().ok_or_else(|| ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"device '{alias}' has no serial port — is it connected?"
)),
})?;
Ok((alias, port.to_string(), runtime))
}
/// Return an unsupported-runtime error `ToolResult` for a given tool name.
fn unsupported_runtime(runtime: &DeviceRuntime, tool: &str) -> ToolResult {
ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"{runtime} runtime is not yet supported for {tool} — coming soon"
)),
}
}
/// Run an `mpremote` command with a timeout and return (stdout, stderr).
async fn run_mpremote(args: &[&str], timeout_secs: u64) -> Result<(String, String), String> {
use tokio::time::timeout;
let result = timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::process::Command::new("mpremote").args(args).output(),
)
.await;
match result {
Ok(Ok(output)) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if output.status.success() {
Ok((stdout, stderr))
} else {
Err(format!(
"mpremote failed (exit {}): {}",
output.status,
stderr.trim()
))
}
}
Ok(Err(e)) => Err(format!(
"mpremote not found or could not start ({e}). \
Install it with: pip install mpremote"
)),
Err(_) => Err(format!(
"mpremote timed out after {timeout_secs}s — \
the device may be unresponsive"
)),
}
}
// ── DeviceReadCodeTool ────────────────────────────────────────────────────────
/// Tool: read the current `main.py` from a connected device.
///
/// The LLM uses this to understand the current program before modifying it.
pub struct DeviceReadCodeTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceReadCodeTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceReadCodeTool {
fn name(&self) -> &str {
"device_read_code"
}
fn description(&self) -> &str {
"Read the current program (main.py) running on a connected device. \
Use this before writing new code so you understand the current state."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
}
},
"required": []
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_read_code")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, "reading main.py from device");
match run_mpremote(
&["connect", &port, "cat", ":main.py"],
MPREMOTE_TIMEOUT_SECS,
)
.await
{
Ok((stdout, _stderr)) => Ok(ToolResult {
success: true,
output: if stdout.trim().is_empty() {
format!("main.py on {alias} is empty or not found.")
} else {
format!(
"Current main.py on {alias}:\n\n```python\n{}\n```",
stdout.trim()
)
},
error: None,
}),
Err(e) => {
// mpremote cat fails if main.py doesn't exist — not a fatal error.
if e.contains("OSError") || e.contains("no such file") || e.contains("ENOENT") {
Ok(ToolResult {
success: true,
output: format!(
"No main.py found on {alias} — the device has no program yet."
),
error: None,
})
} else {
Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to read code from {alias}: {e}")),
})
}
}
}
}
}
// ── DeviceWriteCodeTool ───────────────────────────────────────────────────────
/// Tool: write a complete program to a device as `main.py`.
///
/// This replaces the current `main.py` on the device and resets it so the new
/// program starts executing immediately.
pub struct DeviceWriteCodeTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceWriteCodeTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceWriteCodeTool {
fn name(&self) -> &str {
"device_write_code"
}
fn description(&self) -> &str {
"Write a complete program to a device — replaces main.py and restarts the device. \
Always read the current code first with device_read_code."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
},
"code": {
"type": "string",
"description": "Complete program to write as main.py"
}
},
"required": ["code"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let code = match args.get("code").and_then(|v| v.as_str()) {
Some(c) => c,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("missing required parameter: code".to_string()),
});
}
};
if code.trim().is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("code parameter is empty — provide a program to write".to_string()),
});
}
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_write_code")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, code_len = code.len(), "writing main.py to device");
// Write code to an atomic, owner-only temp file via tempfile crate.
let named_tmp = match tokio::task::spawn_blocking(|| {
tempfile::Builder::new()
.prefix("zeroclaw_main_")
.suffix(".py")
.tempfile()
})
.await
{
Ok(Ok(f)) => f,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to create temp file: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("temp file task failed: {e}")),
});
}
};
let tmp_path = named_tmp.path().to_path_buf();
let tmp_str = tmp_path.to_string_lossy().to_string();
if let Err(e) = tokio::fs::write(&tmp_path, code).await {
// named_tmp dropped here — auto-removes the file.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to write temp file: {e}")),
});
}
// Deploy via mpremote: copy + reset.
let result = run_mpremote(
&["connect", &port, "cp", &tmp_str, ":main.py", "+", "reset"],
MPREMOTE_TIMEOUT_SECS,
)
.await;
// Explicit cleanup — log if removal fails rather than silently ignoring.
if let Err(e) = named_tmp.close() {
tracing::warn!(path = %tmp_str, err = %e, "failed to clean up temp file");
}
match result {
Ok((_stdout, _stderr)) => {
tracing::info!(alias = %alias, "main.py deployed and device reset");
// Wait for the serial port to reappear after reset.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let port_reappeared = wait_for_port(
&port,
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
if port_reappeared {
Ok(ToolResult {
success: true,
output: format!(
"Code deployed to {alias} — main.py updated and device reset. \
{alias} is back online."
),
error: None,
})
} else {
Ok(ToolResult {
success: true,
output: format!(
"Code deployed to {alias} — main.py updated and device reset. \
Note: serial port did not reappear within {PORT_WAIT_SECS}s; \
the device may still be booting."
),
error: None,
})
}
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to deploy code to {alias}: {e}")),
}),
}
}
}
// ── DeviceExecTool ────────────────────────────────────────────────────────────
/// Tool: run a one-off code snippet on a device without modifying `main.py`.
///
/// Good for one-time commands, sensor reads, and testing code before committing.
pub struct DeviceExecTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceExecTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceExecTool {
fn name(&self) -> &str {
"device_exec"
}
fn description(&self) -> &str {
"Execute a code snippet on a connected device without modifying main.py. \
Good for one-time actions, sensor reads, and testing before writing permanent code."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
},
"code": {
"type": "string",
"description": "Code to execute. Output is captured and returned."
}
},
"required": ["code"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let code = match args.get("code").and_then(|v| v.as_str()) {
Some(c) => c,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("missing required parameter: code".to_string()),
});
}
};
if code.trim().is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"code parameter is empty — provide a code snippet to execute".to_string(),
),
});
}
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_exec")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, code_len = code.len(), "executing snippet on device");
// Write snippet to an atomic, owner-only temp file via tempfile crate.
let named_tmp = match tokio::task::spawn_blocking(|| {
tempfile::Builder::new()
.prefix("zeroclaw_exec_")
.suffix(".py")
.tempfile()
})
.await
{
Ok(Ok(f)) => f,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to create temp file: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("temp file task failed: {e}")),
});
}
};
let tmp_path = named_tmp.path().to_path_buf();
let tmp_str = tmp_path.to_string_lossy().to_string();
if let Err(e) = tokio::fs::write(&tmp_path, code).await {
// named_tmp dropped here — auto-removes the file.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to write temp file: {e}")),
});
}
// Execute via mpremote run (does NOT modify main.py).
let result =
run_mpremote(&["connect", &port, "run", &tmp_str], MPREMOTE_TIMEOUT_SECS).await;
// Explicit cleanup — log if removal fails rather than silently ignoring.
if let Err(e) = named_tmp.close() {
tracing::warn!(path = %tmp_str, err = %e, "failed to clean up temp file");
}
match result {
Ok((stdout, stderr)) => {
let output = if stdout.trim().is_empty() && !stderr.trim().is_empty() {
// Some MicroPython output goes to stderr (e.g. exceptions).
stderr.trim().to_string()
} else {
stdout.trim().to_string()
};
Ok(ToolResult {
success: true,
output: if output.is_empty() {
format!("Code executed on {alias} — no output produced.")
} else {
format!("Output from {alias}:\n{output}")
},
error: None,
})
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to execute code on {alias}: {e}")),
}),
}
}
}
// ── port wait helper ──────────────────────────────────────────────────────────
/// Poll for a specific serial port to reappear after a device reset.
///
/// Returns `true` if the port exists within the timeout, `false` otherwise.
async fn wait_for_port(
port_path: &str,
timeout: std::time::Duration,
interval: std::time::Duration,
) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
while tokio::time::Instant::now() < deadline {
if std::path::Path::new(port_path).exists() {
return true;
}
tokio::time::sleep(interval).await;
}
false
}
/// Factory function: create all Phase 7 dynamic code tools.
pub fn device_code_tools(registry: Arc<RwLock<DeviceRegistry>>) -> Vec<Box<dyn Tool>> {
vec![
Box::new(DeviceReadCodeTool::new(registry.clone())),
Box::new(DeviceWriteCodeTool::new(registry.clone())),
Box::new(DeviceExecTool::new(registry)),
]
}
#[cfg(test)]
mod tests {
use super::*;
fn empty_registry() -> Arc<RwLock<DeviceRegistry>> {
Arc::new(RwLock::new(DeviceRegistry::new()))
}
// ── DeviceReadCodeTool ───────────────────────────────────────────
#[test]
fn device_read_code_name() {
let tool = DeviceReadCodeTool::new(empty_registry());
assert_eq!(tool.name(), "device_read_code");
}
#[test]
fn device_read_code_schema_valid() {
let tool = DeviceReadCodeTool::new(empty_registry());
let schema = tool.parameters_schema();
assert_eq!(schema["type"], "object");
assert!(schema["properties"]["device"].is_object());
}
#[tokio::test]
async fn device_read_code_no_device_returns_error() {
let tool = DeviceReadCodeTool::new(empty_registry());
let result = tool.execute(json!({})).await.unwrap();
assert!(!result.success);
assert!(
result.error.as_deref().unwrap_or("").contains("no device"),
"expected 'no device' error; got: {:?}",
result.error
);
}
// ── DeviceWriteCodeTool ──────────────────────────────────────────
#[test]
fn device_write_code_name() {
let tool = DeviceWriteCodeTool::new(empty_registry());
assert_eq!(tool.name(), "device_write_code");
}
#[test]
fn device_write_code_schema_requires_code() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let schema = tool.parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("code")),
"code should be required"
);
}
#[tokio::test]
async fn device_write_code_empty_code_rejected() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let result = tool.execute(json!({"code": ""})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("empty"));
}
#[tokio::test]
async fn device_write_code_no_device_returns_error() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let result = tool
.execute(json!({"code": "print('hello')"}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("no device"),);
}
// ── DeviceExecTool ───────────────────────────────────────────────
#[test]
fn device_exec_name() {
let tool = DeviceExecTool::new(empty_registry());
assert_eq!(tool.name(), "device_exec");
}
#[test]
fn device_exec_schema_requires_code() {
let tool = DeviceExecTool::new(empty_registry());
let schema = tool.parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("code")),
"code should be required"
);
}
#[tokio::test]
async fn device_exec_empty_code_rejected() {
let tool = DeviceExecTool::new(empty_registry());
let result = tool.execute(json!({"code": " "})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("empty"));
}
#[tokio::test]
async fn device_exec_no_device_returns_error() {
let tool = DeviceExecTool::new(empty_registry());
let result = tool.execute(json!({"code": "print(1+1)"})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("no device"),);
}
// ── Factory ──────────────────────────────────────────────────────
#[test]
fn factory_returns_three_tools() {
let reg = empty_registry();
let tools = device_code_tools(reg);
assert_eq!(tools.len(), 3);
let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
assert!(names.contains(&"device_read_code"));
assert!(names.contains(&"device_write_code"));
assert!(names.contains(&"device_exec"));
}
#[test]
fn all_specs_valid() {
let reg = empty_registry();
let tools = device_code_tools(reg);
for tool in &tools {
let spec = tool.spec();
assert!(!spec.name.is_empty());
assert!(!spec.description.is_empty());
assert_eq!(spec.parameters["type"], "object");
}
}
}

296
src/hardware/pico_flash.rs Normal file
View File

@ -0,0 +1,296 @@
//! `pico_flash` tool — flash ZeroClaw firmware to a Pico in BOOTSEL mode.
//!
//! # Happy path
//! 1. User holds BOOTSEL while plugging in Pico → RPI-RP2 drive appears.
//! 2. User asks "flash my pico".
//! 3. LLM calls `pico_flash(confirm=true)`.
//! 4. Tool copies UF2 to RPI-RP2 drive; Pico reboots into MicroPython.
//! 5. Tool waits up to 20 s for `/dev/cu.usbmodem*` to appear.
//! 6. Tool deploys `main.py` via `mpremote` and resets the Pico.
//! 7. Tool waits for the serial port to reappear after reset.
//! 8. Tool returns success; user restarts ZeroClaw to get `pico0`.
use super::device::DeviceRegistry;
use super::uf2;
use crate::tools::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
/// How long to wait for the Pico serial port after flashing (seconds).
const PORT_WAIT_SECS: u64 = 20;
/// How often to poll for the serial port.
const PORT_POLL_MS: u64 = 500;
// ── PicoFlashTool ─────────────────────────────────────────────────────────────
/// Tool: flash ZeroClaw MicroPython firmware to a Pico in BOOTSEL mode.
///
/// The Pico must be connected with BOOTSEL held so it mounts as `RPI-RP2`.
/// After flashing, the tool deploys `main.py` via `mpremote`, then reconnects
/// the serial transport in the [`DeviceRegistry`] so subsequent `gpio_write`
/// calls work immediately without restarting ZeroClaw.
pub struct PicoFlashTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl PicoFlashTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for PicoFlashTool {
fn name(&self) -> &str {
"pico_flash"
}
fn description(&self) -> &str {
"Flash ZeroClaw firmware to a Raspberry Pi Pico in BOOTSEL mode. \
The Pico must be connected with the BOOTSEL button held (shows as RPI-RP2 drive in Finder). \
After flashing the Pico reboots, main.py is deployed, and the serial \
connection is refreshed automatically no restart needed."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"confirm": {
"type": "boolean",
"description": "Set to true to confirm flashing the Pico firmware"
}
},
"required": ["confirm"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
// ── 1. Require explicit confirmation ──────────────────────────────
let confirmed = args
.get("confirm")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if !confirmed {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"Set confirm=true to proceed with flashing. \
This will overwrite the firmware on the connected Pico."
.to_string(),
),
});
}
// ── 2. Detect BOOTSEL-mode Pico ───────────────────────────────────
let mount = match uf2::find_rpi_rp2_mount() {
Some(m) => m,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"No Pico in BOOTSEL mode found (RPI-RP2 drive not detected). \
Hold the BOOTSEL button while plugging the Pico in via USB, \
then try again."
.to_string(),
),
});
}
};
tracing::info!(mount = %mount.display(), "RPI-RP2 volume found");
// ── 3. Ensure firmware files are extracted ────────────────────────
let firmware_dir = match uf2::ensure_firmware_dir() {
Ok(d) => d,
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("firmware error: {e}")),
});
}
};
// ── 4. Flash UF2 ─────────────────────────────────────────────────
if let Err(e) = uf2::flash_uf2(&mount, &firmware_dir).await {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("flash failed: {e}")),
});
}
// ── 5. Wait for serial port to appear ─────────────────────────────
let port = uf2::wait_for_serial_port(
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
let port = match port {
Some(p) => p,
None => {
// Flash likely succeeded even if port didn't appear in time —
// some host systems are slower to enumerate the new port.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"UF2 copied to {} but serial port did not appear within {PORT_WAIT_SECS}s. \
Unplug and replug the Pico, then run:\n \
mpremote connect <port> cp ~/.zeroclaw/firmware/pico/main.py :main.py + reset",
mount.display()
)),
});
}
};
tracing::info!(port = %port.display(), "Pico serial port online after UF2 flash");
// ── 6. Deploy main.py via mpremote ────────────────────────────────
if let Err(e) = uf2::deploy_main_py(&port, &firmware_dir).await {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("main.py deploy failed: {e}")),
});
}
// ── 7. Wait for serial port after mpremote reset ──────────────────
//
// mpremote resets the Pico so the serial port disappears briefly.
// Give the OS a moment to drop the old entry before polling.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let final_port = uf2::wait_for_serial_port(
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
// ── 8. Reconnect serial transport in DeviceRegistry ──────────────
//
// The old transport still points at a stale port handle from before
// the flash. Reconnect so gpio_write works immediately.
let reconnect_result = match &final_port {
Some(p) => {
let port_str = p.to_string_lossy();
let mut reg = self.registry.write().await;
// Try to find a pico alias in the registry.
match reg.aliases().into_iter().find(|a| a.starts_with("pico")) {
Some(a) => {
let alias = a.to_string();
reg.reconnect(&alias, Some(&port_str)).await
}
None => Err(anyhow::anyhow!(
"no pico alias found in registry; cannot reconnect transport"
)),
}
}
None => Err(anyhow::anyhow!("no serial port to reconnect")),
};
// ── 9. Return result ──────────────────────────────────────────────
match final_port {
Some(p) => {
let port_str = p.display().to_string();
let reconnected = reconnect_result.is_ok();
if reconnected {
tracing::info!(port = %port_str, "Pico online with main.py — transport reconnected");
} else {
let err = reconnect_result.unwrap_err();
tracing::warn!(port = %port_str, err = %err, "Pico online but reconnect failed");
}
let suffix = if reconnected {
"pico0 is ready — you can use gpio_write immediately."
} else {
"Restart ZeroClaw to reconnect as pico0."
};
Ok(ToolResult {
success: true,
output: format!(
"Pico flashed and main.py deployed successfully. \
Firmware is online at {port_str}. {suffix}"
),
error: None,
})
}
None => Ok(ToolResult {
success: true,
output: format!(
"Pico flashed and main.py deployed. \
Serial port did not reappear within {PORT_WAIT_SECS}s after reset \
unplug and replug the Pico, then restart ZeroClaw to connect as pico0."
),
error: None,
}),
}
}
}
#[cfg(test)]
mod tests {
use super::super::device::DeviceRegistry;
use super::*;
fn tool() -> PicoFlashTool {
let registry = Arc::new(RwLock::new(DeviceRegistry::new()));
PicoFlashTool::new(registry)
}
#[test]
fn name_is_pico_flash() {
let t = tool();
assert_eq!(t.name(), "pico_flash");
}
#[test]
fn schema_requires_confirm() {
let schema = tool().parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("confirm")),
"confirm should be required"
);
}
#[tokio::test]
async fn execute_without_confirm_returns_error() {
let result = tool()
.execute(serde_json::json!({"confirm": false}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.is_some());
let err = result.error.unwrap();
assert!(
err.contains("confirm=true"),
"error should mention confirm=true; got: {err}"
);
}
#[tokio::test]
async fn execute_missing_confirm_returns_error() {
let result = tool().execute(serde_json::json!({})).await.unwrap();
assert!(!result.success);
}
#[tokio::test]
async fn execute_with_confirm_true_but_no_pico_returns_error() {
// In CI there's no Pico attached — the tool should report missing device, not panic.
let result = tool()
.execute(serde_json::json!({"confirm": true}))
.await
.unwrap();
// Either success (if a Pico happens to be connected) or the BOOTSEL error.
// What must NOT happen: panic or anyhow error propagation.
let _ = result; // just verify it didn't panic
}
}

351
src/hardware/uf2.rs Normal file
View File

@ -0,0 +1,351 @@
//! UF2 flashing support — detect BOOTSEL-mode Pico and deploy firmware.
//!
//! # Workflow
//! 1. [`find_rpi_rp2_mount`] — check well-known mount points for the RPI-RP2 volume
//! that appears when a Pico is held in BOOTSEL mode.
//! 2. [`ensure_firmware_dir`] — extract the bundled firmware files to
//! `~/.zeroclaw/firmware/pico/` if they aren't there yet.
//! 3. [`flash_uf2`] — copy the UF2 to the mount point; the Pico reboots automatically.
//!
//! # Embedded assets
//! Both firmware files are compiled into the binary with `include_bytes!` so
//! users never need to download them separately.
use anyhow::{bail, Result};
use std::path::{Path, PathBuf};
// ── Embedded firmware ─────────────────────────────────────────────────────────
/// MicroPython UF2 binary — copied to RPI-RP2 to install the base runtime.
const PICO_UF2: &[u8] = include_bytes!("../firmware/pico/zeroclaw-pico.uf2");
/// ZeroClaw serial protocol handler — written to the Pico after MicroPython boots.
pub const PICO_MAIN_PY: &[u8] = include_bytes!("../firmware/pico/main.py");
/// UF2 magic word 1 (little-endian bytes at offset 0 of every UF2 block).
const UF2_MAGIC1: [u8; 4] = [0x55, 0x46, 0x32, 0x0A];
// ── Volume detection ──────────────────────────────────────────────────────────
/// Find the RPI-RP2 mount point if a Pico is connected in BOOTSEL mode.
///
/// Checks:
/// - macOS: `/Volumes/RPI-RP2`
/// - Linux: `/media/*/RPI-RP2` and `/run/media/*/RPI-RP2`
pub fn find_rpi_rp2_mount() -> Option<PathBuf> {
// macOS
let mac = PathBuf::from("/Volumes/RPI-RP2");
if mac.exists() {
return Some(mac);
}
// Linux — /media/<user>/RPI-RP2 or /run/media/<user>/RPI-RP2
for base in &["/media", "/run/media"] {
if let Ok(entries) = std::fs::read_dir(base) {
for entry in entries.flatten() {
let candidate = entry.path().join("RPI-RP2");
if candidate.exists() {
return Some(candidate);
}
}
}
}
None
}
// ── Firmware directory management ─────────────────────────────────────────────
/// Ensure `~/.zeroclaw/firmware/pico/` exists and contains the bundled assets.
///
/// Files are only written if they are absent — existing files are never overwritten
/// so users can substitute their own firmware.
///
/// Returns the firmware directory path.
pub fn ensure_firmware_dir() -> Result<PathBuf> {
use directories::BaseDirs;
let base = BaseDirs::new().ok_or_else(|| anyhow::anyhow!("cannot determine home directory"))?;
let firmware_dir = base
.home_dir()
.join(".zeroclaw")
.join("firmware")
.join("pico");
std::fs::create_dir_all(&firmware_dir)?;
// UF2 — validate magic before writing so a broken stub is caught early.
let uf2_path = firmware_dir.join("zeroclaw-pico.uf2");
if !uf2_path.exists() {
if PICO_UF2.len() < 8 || PICO_UF2[..4] != UF2_MAGIC1 {
bail!(
"Bundled UF2 is a placeholder — download the real MicroPython UF2 from \
https://micropython.org/download/RPI_PICO/ and place it at \
src/firmware/pico/zeroclaw-pico.uf2, then rebuild ZeroClaw."
);
}
std::fs::write(&uf2_path, PICO_UF2)?;
tracing::info!(path = %uf2_path.display(), "extracted bundled UF2");
}
// main.py — always check UF2 magic even if path already exists (user may
// have placed a stub). main.py has no such check — it's just text.
let main_py_path = firmware_dir.join("main.py");
if !main_py_path.exists() {
std::fs::write(&main_py_path, PICO_MAIN_PY)?;
tracing::info!(path = %main_py_path.display(), "extracted bundled main.py");
}
Ok(firmware_dir)
}
// ── Flashing ──────────────────────────────────────────────────────────────────
/// Copy the UF2 file to the RPI-RP2 mount point.
///
/// macOS often returns "Operation not permitted" for `std::fs::copy` on FAT
/// volumes presented by BOOTSEL-mode Picos. We try four approaches in order
/// and return a clear manual-fallback message if all fail:
///
/// 1. `std::fs::copy` — fast, no subprocess; works on most Linux setups.
/// 2. `cp <src> <dst>` — bypasses some macOS VFS permission layers.
/// 3. `sudo cp …` — escalates for locked volumes.
/// 4. Error — instructs the user to run the `sudo cp` manually.
pub async fn flash_uf2(mount_point: &Path, firmware_dir: &Path) -> Result<()> {
let uf2_src = firmware_dir.join("zeroclaw-pico.uf2");
let uf2_dst = mount_point.join("firmware.uf2");
let src_str = uf2_src.to_string_lossy().into_owned();
let dst_str = uf2_dst.to_string_lossy().into_owned();
tracing::info!(
src = %src_str,
dst = %dst_str,
"flashing UF2"
);
// Validate UF2 magic before any copy attempt — prevents flashing a stub.
let data = std::fs::read(&uf2_src)?;
if data.len() < 8 || data[..4] != UF2_MAGIC1 {
bail!(
"UF2 at {} does not look like a valid UF2 file (magic mismatch). \
Download from https://micropython.org/download/RPI_PICO/ and delete \
the existing file so ZeroClaw can re-extract it.",
uf2_src.display()
);
}
// ── Attempt 1: std::fs::copy (works on Linux, sometimes blocked on macOS) ─
{
let src = uf2_src.clone();
let dst = uf2_dst.clone();
let result = tokio::task::spawn_blocking(move || std::fs::copy(&src, &dst))
.await
.map_err(|e| anyhow::anyhow!("copy task panicked: {e}"));
match result {
Ok(Ok(_)) => {
tracing::info!("UF2 copy complete (std::fs::copy) — Pico will reboot");
return Ok(());
}
Ok(Err(e)) => tracing::warn!("std::fs::copy failed ({}), trying cp", e),
Err(e) => tracing::warn!("std::fs::copy task failed ({}), trying cp", e),
}
}
// ── Attempt 2: cp via subprocess ──────────────────────────────────────────
{
/// Timeout for subprocess copy attempts (seconds).
const CP_TIMEOUT_SECS: u64 = 10;
let out = tokio::time::timeout(
std::time::Duration::from_secs(CP_TIMEOUT_SECS),
tokio::process::Command::new("cp")
.arg(&src_str)
.arg(&dst_str)
.output(),
)
.await;
match out {
Err(_elapsed) => {
tracing::warn!("cp timed out after {}s, trying sudo cp", CP_TIMEOUT_SECS);
}
Ok(Ok(o)) if o.status.success() => {
tracing::info!("UF2 copy complete (cp) — Pico will reboot");
return Ok(());
}
Ok(Ok(o)) => {
let stderr = String::from_utf8_lossy(&o.stderr);
tracing::warn!("cp failed ({}), trying sudo cp", stderr.trim());
}
Ok(Err(e)) => tracing::warn!("cp spawn failed ({}), trying sudo cp", e),
}
}
// ── Attempt 3: sudo cp (non-interactive) ─────────────────────────────────
{
const SUDO_CP_TIMEOUT_SECS: u64 = 10;
let out = tokio::time::timeout(
std::time::Duration::from_secs(SUDO_CP_TIMEOUT_SECS),
tokio::process::Command::new("sudo")
.args(["-n", "cp", &src_str, &dst_str])
.output(),
)
.await;
match out {
Err(_elapsed) => {
tracing::warn!("sudo cp timed out after {}s", SUDO_CP_TIMEOUT_SECS);
}
Ok(Ok(o)) if o.status.success() => {
tracing::info!("UF2 copy complete (sudo cp) — Pico will reboot");
return Ok(());
}
Ok(Ok(o)) => {
let stderr = String::from_utf8_lossy(&o.stderr);
tracing::warn!("sudo cp failed: {}", stderr.trim());
}
Ok(Err(e)) => tracing::warn!("sudo cp spawn failed: {}", e),
}
}
// ── All attempts failed — give the user a clear manual command ────────────
bail!(
"All copy methods failed. Run this command manually, then restart ZeroClaw:\n\
\n sudo cp {src_str} {dst_str}\n"
)
}
/// Wait for `/dev/cu.usbmodem*` (macOS) or `/dev/ttyACM*` (Linux) to appear.
///
/// Polls every `interval` for up to `timeout`. Returns the first matching path
/// found, or `None` if the deadline expires.
pub async fn wait_for_serial_port(
timeout: std::time::Duration,
interval: std::time::Duration,
) -> Option<PathBuf> {
#[cfg(target_os = "macos")]
let patterns = &["/dev/cu.usbmodem*"];
#[cfg(target_os = "linux")]
let patterns = &["/dev/ttyACM*"];
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
let patterns: &[&str] = &[];
let deadline = tokio::time::Instant::now() + timeout;
loop {
for pattern in *patterns {
if let Ok(mut hits) = glob::glob(pattern) {
if let Some(Ok(path)) = hits.next() {
return Some(path);
}
}
}
if tokio::time::Instant::now() >= deadline {
return None;
}
tokio::time::sleep(interval).await;
}
}
// ── Deploy main.py via mpremote ───────────────────────────────────────────────
/// Copy `main.py` to the Pico's MicroPython filesystem and soft-reset it.
///
/// After the UF2 is flashed the Pico reboots into MicroPython but has no
/// `main.py` on its internal filesystem. This function uses `mpremote` to
/// upload the bundled `main.py` and issue a reset so it starts executing
/// immediately.
///
/// Returns `Ok(())` on success or an error with a helpful fallback command.
pub async fn deploy_main_py(port: &Path, firmware_dir: &Path) -> Result<()> {
let main_py_src = firmware_dir.join("main.py");
let src_str = main_py_src.to_string_lossy().into_owned();
let port_str = port.to_string_lossy().into_owned();
if !main_py_src.exists() {
bail!(
"main.py not found at {} — run ensure_firmware_dir() first",
main_py_src.display()
);
}
tracing::info!(
src = %src_str,
port = %port_str,
"deploying main.py via mpremote"
);
let out = tokio::process::Command::new("mpremote")
.args([
"connect", &port_str, "cp", &src_str, ":main.py", "+", "reset",
])
.output()
.await;
match out {
Ok(o) if o.status.success() => {
tracing::info!("main.py deployed and Pico reset via mpremote");
Ok(())
}
Ok(o) => {
let stderr = String::from_utf8_lossy(&o.stderr);
bail!(
"mpremote failed (exit {}): {}.\n\
Run manually:\n mpremote connect {port_str} cp {src_str} :main.py + reset",
o.status,
stderr.trim()
)
}
Err(e) => {
bail!(
"mpremote not found or could not start ({e}).\n\
Install it with: pip install mpremote\n\
Then run: mpremote connect {port_str} cp {src_str} :main.py + reset"
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pico_uf2_has_valid_magic() {
assert!(
PICO_UF2.len() >= 8,
"bundled UF2 too small ({} bytes) — replace with real MicroPython UF2",
PICO_UF2.len()
);
assert_eq!(
&PICO_UF2[..4],
&UF2_MAGIC1,
"bundled UF2 has wrong magic — replace with real MicroPython UF2 from \
https://micropython.org/download/RPI_PICO/"
);
}
#[test]
fn pico_main_py_is_non_empty() {
assert!(!PICO_MAIN_PY.is_empty(), "bundled main.py is empty");
}
#[test]
fn pico_main_py_contains_zeroclaw_marker() {
let src = std::str::from_utf8(PICO_MAIN_PY).expect("main.py is not valid UTF-8");
assert!(
src.contains("zeroclaw"),
"main.py should contain 'zeroclaw' firmware marker"
);
}
#[test]
fn find_rpi_rp2_mount_returns_none_when_not_connected() {
// This test runs on CI without a Pico attached — just verify it doesn't panic.
let _ = find_rpi_rp2_mount(); // may be Some or None depending on environment
}
}