fix(hardware): add missing serial/uf2/pico modules declared in mod.rs

cargo fmt was exiting with code 1 because mod.rs declared pub mod serial,
uf2, pico_flash, pico_code but those files were missing from the branch.
Also apply auto-formatting to loader.rs.
This commit is contained in:
ehushubhamshaw 2026-03-14 21:58:01 -04:00
parent eb518adb38
commit 2fbf4cd15c
5 changed files with 1679 additions and 4 deletions

View File

@ -152,12 +152,19 @@ fn load_one_plugin(plugin_dir: &Path, manifest_path: &Path) -> Result<LoadedPlug
}
// Validate binary path: must exist, be a regular file, and reside within plugin_dir.
let canonical_plugin_dir = plugin_dir
.canonicalize()
.map_err(|e| anyhow::anyhow!("cannot canonicalize plugin dir {}: {}", plugin_dir.display(), e))?;
let canonical_plugin_dir = plugin_dir.canonicalize().map_err(|e| {
anyhow::anyhow!(
"cannot canonicalize plugin dir {}: {}",
plugin_dir.display(),
e
)
})?;
let raw_binary_path = plugin_dir.join(&manifest.exec.binary);
if !raw_binary_path.exists() {
anyhow::bail!("manifest exec binary not found: {}", raw_binary_path.display());
anyhow::bail!(
"manifest exec binary not found: {}",
raw_binary_path.display()
);
}
let binary_path = raw_binary_path.canonicalize().map_err(|e| {
anyhow::anyhow!(

723
src/hardware/pico_code.rs Normal file
View File

@ -0,0 +1,723 @@
//! Phase 7 — Dynamic code tools: `device_read_code`, `device_write_code`, `device_exec`.
//!
//! These tools let the LLM read, write, and execute code on any connected
//! hardware device. The `DeviceRuntime` on each device determines which
//! host-side tooling is used:
//!
//! - **MicroPython / CircuitPython** — `mpremote` for code read/write/exec.
//! - **Arduino / Nucleus / Linux** — not yet implemented; returns a clear error.
//!
//! When the `device` parameter is omitted, each tool auto-selects the device
//! only when **exactly one** device is registered. If multiple devices are
//! present the tool returns an error and requires an explicit `device` parameter.
use super::device::{DeviceRegistry, DeviceRuntime};
use crate::tools::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Default timeout for `mpremote` operations (seconds).
const MPREMOTE_TIMEOUT_SECS: u64 = 30;
/// Maximum time to wait for the serial port after a reset (seconds).
const PORT_WAIT_SECS: u64 = 15;
/// Polling interval when waiting for a serial port (ms).
const PORT_POLL_MS: u64 = 200;
// ── helpers ───────────────────────────────────────────────────────────────────
/// Resolve the serial port path and runtime for a device.
///
/// If `device_alias` is provided, look it up; otherwise auto-selects the device
/// only when exactly one device is registered. With multiple devices present,
/// returns an error requiring an explicit alias.
/// Returns `(alias, port, runtime)` or an error `ToolResult`.
async fn resolve_device_port(
registry: &RwLock<DeviceRegistry>,
device_alias: Option<&str>,
) -> Result<(String, String, DeviceRuntime), ToolResult> {
let reg = registry.read().await;
let alias: String = match device_alias {
Some(a) => a.to_string(),
None => {
// Auto-select the first device.
let all_aliases: Vec<String> =
reg.aliases().into_iter().map(|a| a.to_string()).collect();
match all_aliases.as_slice() {
[single] => single.clone(),
[] => {
return Err(ToolResult {
success: false,
output: String::new(),
error: Some("no device found — is a board connected via USB?".to_string()),
});
}
multiple => {
return Err(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"multiple devices found ({}); specify the \"device\" parameter",
multiple.join(", ")
)),
});
}
}
}
};
let device = reg.get_device(&alias).ok_or_else(|| ToolResult {
success: false,
output: String::new(),
error: Some(format!("device '{alias}' not found in registry")),
})?;
let runtime = device.runtime;
let port = device.port().ok_or_else(|| ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"device '{alias}' has no serial port — is it connected?"
)),
})?;
Ok((alias, port.to_string(), runtime))
}
/// Return an unsupported-runtime error `ToolResult` for a given tool name.
fn unsupported_runtime(runtime: &DeviceRuntime, tool: &str) -> ToolResult {
ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"{runtime} runtime is not yet supported for {tool} — coming soon"
)),
}
}
/// Run an `mpremote` command with a timeout and return (stdout, stderr).
async fn run_mpremote(args: &[&str], timeout_secs: u64) -> Result<(String, String), String> {
use tokio::time::timeout;
let result = timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::process::Command::new("mpremote").args(args).output(),
)
.await;
match result {
Ok(Ok(output)) => {
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if output.status.success() {
Ok((stdout, stderr))
} else {
Err(format!(
"mpremote failed (exit {}): {}",
output.status,
stderr.trim()
))
}
}
Ok(Err(e)) => Err(format!(
"mpremote not found or could not start ({e}). \
Install it with: pip install mpremote"
)),
Err(_) => Err(format!(
"mpremote timed out after {timeout_secs}s — \
the device may be unresponsive"
)),
}
}
// ── DeviceReadCodeTool ────────────────────────────────────────────────────────
/// Tool: read the current `main.py` from a connected device.
///
/// The LLM uses this to understand the current program before modifying it.
pub struct DeviceReadCodeTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceReadCodeTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceReadCodeTool {
fn name(&self) -> &str {
"device_read_code"
}
fn description(&self) -> &str {
"Read the current program (main.py) running on a connected device. \
Use this before writing new code so you understand the current state."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
}
},
"required": []
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_read_code")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, "reading main.py from device");
match run_mpremote(
&["connect", &port, "cat", ":main.py"],
MPREMOTE_TIMEOUT_SECS,
)
.await
{
Ok((stdout, _stderr)) => Ok(ToolResult {
success: true,
output: if stdout.trim().is_empty() {
format!("main.py on {alias} is empty or not found.")
} else {
format!(
"Current main.py on {alias}:\n\n```python\n{}\n```",
stdout.trim()
)
},
error: None,
}),
Err(e) => {
// mpremote cat fails if main.py doesn't exist — not a fatal error.
if e.contains("OSError") || e.contains("no such file") || e.contains("ENOENT") {
Ok(ToolResult {
success: true,
output: format!(
"No main.py found on {alias} — the device has no program yet."
),
error: None,
})
} else {
Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to read code from {alias}: {e}")),
})
}
}
}
}
}
// ── DeviceWriteCodeTool ───────────────────────────────────────────────────────
/// Tool: write a complete program to a device as `main.py`.
///
/// This replaces the current `main.py` on the device and resets it so the new
/// program starts executing immediately.
pub struct DeviceWriteCodeTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceWriteCodeTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceWriteCodeTool {
fn name(&self) -> &str {
"device_write_code"
}
fn description(&self) -> &str {
"Write a complete program to a device — replaces main.py and restarts the device. \
Always read the current code first with device_read_code."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
},
"code": {
"type": "string",
"description": "Complete program to write as main.py"
}
},
"required": ["code"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let code = match args.get("code").and_then(|v| v.as_str()) {
Some(c) => c,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("missing required parameter: code".to_string()),
});
}
};
if code.trim().is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("code parameter is empty — provide a program to write".to_string()),
});
}
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_write_code")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, code_len = code.len(), "writing main.py to device");
// Write code to an atomic, owner-only temp file via tempfile crate.
let named_tmp = match tokio::task::spawn_blocking(|| {
tempfile::Builder::new()
.prefix("zeroclaw_main_")
.suffix(".py")
.tempfile()
})
.await
{
Ok(Ok(f)) => f,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to create temp file: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("temp file task failed: {e}")),
});
}
};
let tmp_path = named_tmp.path().to_path_buf();
let tmp_str = tmp_path.to_string_lossy().to_string();
if let Err(e) = tokio::fs::write(&tmp_path, code).await {
// named_tmp dropped here — auto-removes the file.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to write temp file: {e}")),
});
}
// Deploy via mpremote: copy + reset.
let result = run_mpremote(
&["connect", &port, "cp", &tmp_str, ":main.py", "+", "reset"],
MPREMOTE_TIMEOUT_SECS,
)
.await;
// Explicit cleanup — log if removal fails rather than silently ignoring.
if let Err(e) = named_tmp.close() {
tracing::warn!(path = %tmp_str, err = %e, "failed to clean up temp file");
}
match result {
Ok((_stdout, _stderr)) => {
tracing::info!(alias = %alias, "main.py deployed and device reset");
// Wait for the serial port to reappear after reset.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let port_reappeared = wait_for_port(
&port,
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
if port_reappeared {
Ok(ToolResult {
success: true,
output: format!(
"Code deployed to {alias} — main.py updated and device reset. \
{alias} is back online."
),
error: None,
})
} else {
Ok(ToolResult {
success: true,
output: format!(
"Code deployed to {alias} — main.py updated and device reset. \
Note: serial port did not reappear within {PORT_WAIT_SECS}s; \
the device may still be booting."
),
error: None,
})
}
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to deploy code to {alias}: {e}")),
}),
}
}
}
// ── DeviceExecTool ────────────────────────────────────────────────────────────
/// Tool: run a one-off code snippet on a device without modifying `main.py`.
///
/// Good for one-time commands, sensor reads, and testing code before committing.
pub struct DeviceExecTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl DeviceExecTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for DeviceExecTool {
fn name(&self) -> &str {
"device_exec"
}
fn description(&self) -> &str {
"Execute a code snippet on a connected device without modifying main.py. \
Good for one-time actions, sensor reads, and testing before writing permanent code."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"additionalProperties": false,
"properties": {
"device": {
"type": "string",
"description": "Device alias e.g. pico0, esp0. Auto-selected if only one device is connected."
},
"code": {
"type": "string",
"description": "Code to execute. Output is captured and returned."
}
},
"required": ["code"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let code = match args.get("code").and_then(|v| v.as_str()) {
Some(c) => c,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("missing required parameter: code".to_string()),
});
}
};
if code.trim().is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"code parameter is empty — provide a code snippet to execute".to_string(),
),
});
}
let device_alias = args.get("device").and_then(|v| v.as_str());
let (alias, port, runtime) = match resolve_device_port(&self.registry, device_alias).await {
Ok(v) => v,
Err(tool_result) => return Ok(tool_result),
};
// Runtime dispatch.
match runtime {
DeviceRuntime::MicroPython | DeviceRuntime::CircuitPython => {}
other => return Ok(unsupported_runtime(&other, "device_exec")),
}
tracing::info!(alias = %alias, port = %port, runtime = %runtime, code_len = code.len(), "executing snippet on device");
// Write snippet to an atomic, owner-only temp file via tempfile crate.
let named_tmp = match tokio::task::spawn_blocking(|| {
tempfile::Builder::new()
.prefix("zeroclaw_exec_")
.suffix(".py")
.tempfile()
})
.await
{
Ok(Ok(f)) => f,
Ok(Err(e)) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to create temp file: {e}")),
});
}
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("temp file task failed: {e}")),
});
}
};
let tmp_path = named_tmp.path().to_path_buf();
let tmp_str = tmp_path.to_string_lossy().to_string();
if let Err(e) = tokio::fs::write(&tmp_path, code).await {
// named_tmp dropped here — auto-removes the file.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("failed to write temp file: {e}")),
});
}
// Execute via mpremote run (does NOT modify main.py).
let result =
run_mpremote(&["connect", &port, "run", &tmp_str], MPREMOTE_TIMEOUT_SECS).await;
// Explicit cleanup — log if removal fails rather than silently ignoring.
if let Err(e) = named_tmp.close() {
tracing::warn!(path = %tmp_str, err = %e, "failed to clean up temp file");
}
match result {
Ok((stdout, stderr)) => {
let output = if stdout.trim().is_empty() && !stderr.trim().is_empty() {
// Some MicroPython output goes to stderr (e.g. exceptions).
stderr.trim().to_string()
} else {
stdout.trim().to_string()
};
Ok(ToolResult {
success: true,
output: if output.is_empty() {
format!("Code executed on {alias} — no output produced.")
} else {
format!("Output from {alias}:\n{output}")
},
error: None,
})
}
Err(e) => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("Failed to execute code on {alias}: {e}")),
}),
}
}
}
// ── port wait helper ──────────────────────────────────────────────────────────
/// Poll for a specific serial port to reappear after a device reset.
///
/// Returns `true` if the port exists within the timeout, `false` otherwise.
async fn wait_for_port(
port_path: &str,
timeout: std::time::Duration,
interval: std::time::Duration,
) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
while tokio::time::Instant::now() < deadline {
if std::path::Path::new(port_path).exists() {
return true;
}
tokio::time::sleep(interval).await;
}
false
}
/// Factory function: create all Phase 7 dynamic code tools.
pub fn device_code_tools(registry: Arc<RwLock<DeviceRegistry>>) -> Vec<Box<dyn Tool>> {
vec![
Box::new(DeviceReadCodeTool::new(registry.clone())),
Box::new(DeviceWriteCodeTool::new(registry.clone())),
Box::new(DeviceExecTool::new(registry)),
]
}
#[cfg(test)]
mod tests {
use super::*;
fn empty_registry() -> Arc<RwLock<DeviceRegistry>> {
Arc::new(RwLock::new(DeviceRegistry::new()))
}
// ── DeviceReadCodeTool ───────────────────────────────────────────
#[test]
fn device_read_code_name() {
let tool = DeviceReadCodeTool::new(empty_registry());
assert_eq!(tool.name(), "device_read_code");
}
#[test]
fn device_read_code_schema_valid() {
let tool = DeviceReadCodeTool::new(empty_registry());
let schema = tool.parameters_schema();
assert_eq!(schema["type"], "object");
assert!(schema["properties"]["device"].is_object());
}
#[tokio::test]
async fn device_read_code_no_device_returns_error() {
let tool = DeviceReadCodeTool::new(empty_registry());
let result = tool.execute(json!({})).await.unwrap();
assert!(!result.success);
assert!(
result.error.as_deref().unwrap_or("").contains("no device"),
"expected 'no device' error; got: {:?}",
result.error
);
}
// ── DeviceWriteCodeTool ──────────────────────────────────────────
#[test]
fn device_write_code_name() {
let tool = DeviceWriteCodeTool::new(empty_registry());
assert_eq!(tool.name(), "device_write_code");
}
#[test]
fn device_write_code_schema_requires_code() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let schema = tool.parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("code")),
"code should be required"
);
}
#[tokio::test]
async fn device_write_code_empty_code_rejected() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let result = tool.execute(json!({"code": ""})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("empty"));
}
#[tokio::test]
async fn device_write_code_no_device_returns_error() {
let tool = DeviceWriteCodeTool::new(empty_registry());
let result = tool
.execute(json!({"code": "print('hello')"}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("no device"),);
}
// ── DeviceExecTool ───────────────────────────────────────────────
#[test]
fn device_exec_name() {
let tool = DeviceExecTool::new(empty_registry());
assert_eq!(tool.name(), "device_exec");
}
#[test]
fn device_exec_schema_requires_code() {
let tool = DeviceExecTool::new(empty_registry());
let schema = tool.parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("code")),
"code should be required"
);
}
#[tokio::test]
async fn device_exec_empty_code_rejected() {
let tool = DeviceExecTool::new(empty_registry());
let result = tool.execute(json!({"code": " "})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("empty"));
}
#[tokio::test]
async fn device_exec_no_device_returns_error() {
let tool = DeviceExecTool::new(empty_registry());
let result = tool.execute(json!({"code": "print(1+1)"})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_deref().unwrap_or("").contains("no device"),);
}
// ── Factory ──────────────────────────────────────────────────────
#[test]
fn factory_returns_three_tools() {
let reg = empty_registry();
let tools = device_code_tools(reg);
assert_eq!(tools.len(), 3);
let names: Vec<&str> = tools.iter().map(|t| t.name()).collect();
assert!(names.contains(&"device_read_code"));
assert!(names.contains(&"device_write_code"));
assert!(names.contains(&"device_exec"));
}
#[test]
fn all_specs_valid() {
let reg = empty_registry();
let tools = device_code_tools(reg);
for tool in &tools {
let spec = tool.spec();
assert!(!spec.name.is_empty());
assert!(!spec.description.is_empty());
assert_eq!(spec.parameters["type"], "object");
}
}
}

296
src/hardware/pico_flash.rs Normal file
View File

@ -0,0 +1,296 @@
//! `pico_flash` tool — flash ZeroClaw firmware to a Pico in BOOTSEL mode.
//!
//! # Happy path
//! 1. User holds BOOTSEL while plugging in Pico → RPI-RP2 drive appears.
//! 2. User asks "flash my pico".
//! 3. LLM calls `pico_flash(confirm=true)`.
//! 4. Tool copies UF2 to RPI-RP2 drive; Pico reboots into MicroPython.
//! 5. Tool waits up to 20 s for `/dev/cu.usbmodem*` to appear.
//! 6. Tool deploys `main.py` via `mpremote` and resets the Pico.
//! 7. Tool waits for the serial port to reappear after reset.
//! 8. Tool returns success; user restarts ZeroClaw to get `pico0`.
use super::device::DeviceRegistry;
use super::uf2;
use crate::tools::traits::{Tool, ToolResult};
use async_trait::async_trait;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
/// How long to wait for the Pico serial port after flashing (seconds).
const PORT_WAIT_SECS: u64 = 20;
/// How often to poll for the serial port.
const PORT_POLL_MS: u64 = 500;
// ── PicoFlashTool ─────────────────────────────────────────────────────────────
/// Tool: flash ZeroClaw MicroPython firmware to a Pico in BOOTSEL mode.
///
/// The Pico must be connected with BOOTSEL held so it mounts as `RPI-RP2`.
/// After flashing, the tool deploys `main.py` via `mpremote`, then reconnects
/// the serial transport in the [`DeviceRegistry`] so subsequent `gpio_write`
/// calls work immediately without restarting ZeroClaw.
pub struct PicoFlashTool {
registry: Arc<RwLock<DeviceRegistry>>,
}
impl PicoFlashTool {
pub fn new(registry: Arc<RwLock<DeviceRegistry>>) -> Self {
Self { registry }
}
}
#[async_trait]
impl Tool for PicoFlashTool {
fn name(&self) -> &str {
"pico_flash"
}
fn description(&self) -> &str {
"Flash ZeroClaw firmware to a Raspberry Pi Pico in BOOTSEL mode. \
The Pico must be connected with the BOOTSEL button held (shows as RPI-RP2 drive in Finder). \
After flashing the Pico reboots, main.py is deployed, and the serial \
connection is refreshed automatically no restart needed."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"confirm": {
"type": "boolean",
"description": "Set to true to confirm flashing the Pico firmware"
}
},
"required": ["confirm"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
// ── 1. Require explicit confirmation ──────────────────────────────
let confirmed = args
.get("confirm")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if !confirmed {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"Set confirm=true to proceed with flashing. \
This will overwrite the firmware on the connected Pico."
.to_string(),
),
});
}
// ── 2. Detect BOOTSEL-mode Pico ───────────────────────────────────
let mount = match uf2::find_rpi_rp2_mount() {
Some(m) => m,
None => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(
"No Pico in BOOTSEL mode found (RPI-RP2 drive not detected). \
Hold the BOOTSEL button while plugging the Pico in via USB, \
then try again."
.to_string(),
),
});
}
};
tracing::info!(mount = %mount.display(), "RPI-RP2 volume found");
// ── 3. Ensure firmware files are extracted ────────────────────────
let firmware_dir = match uf2::ensure_firmware_dir() {
Ok(d) => d,
Err(e) => {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("firmware error: {e}")),
});
}
};
// ── 4. Flash UF2 ─────────────────────────────────────────────────
if let Err(e) = uf2::flash_uf2(&mount, &firmware_dir).await {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("flash failed: {e}")),
});
}
// ── 5. Wait for serial port to appear ─────────────────────────────
let port = uf2::wait_for_serial_port(
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
let port = match port {
Some(p) => p,
None => {
// Flash likely succeeded even if port didn't appear in time —
// some host systems are slower to enumerate the new port.
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"UF2 copied to {} but serial port did not appear within {PORT_WAIT_SECS}s. \
Unplug and replug the Pico, then run:\n \
mpremote connect <port> cp ~/.zeroclaw/firmware/pico/main.py :main.py + reset",
mount.display()
)),
});
}
};
tracing::info!(port = %port.display(), "Pico serial port online after UF2 flash");
// ── 6. Deploy main.py via mpremote ────────────────────────────────
if let Err(e) = uf2::deploy_main_py(&port, &firmware_dir).await {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!("main.py deploy failed: {e}")),
});
}
// ── 7. Wait for serial port after mpremote reset ──────────────────
//
// mpremote resets the Pico so the serial port disappears briefly.
// Give the OS a moment to drop the old entry before polling.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let final_port = uf2::wait_for_serial_port(
std::time::Duration::from_secs(PORT_WAIT_SECS),
std::time::Duration::from_millis(PORT_POLL_MS),
)
.await;
// ── 8. Reconnect serial transport in DeviceRegistry ──────────────
//
// The old transport still points at a stale port handle from before
// the flash. Reconnect so gpio_write works immediately.
let reconnect_result = match &final_port {
Some(p) => {
let port_str = p.to_string_lossy();
let mut reg = self.registry.write().await;
// Try to find a pico alias in the registry.
match reg.aliases().into_iter().find(|a| a.starts_with("pico")) {
Some(a) => {
let alias = a.to_string();
reg.reconnect(&alias, Some(&port_str)).await
}
None => Err(anyhow::anyhow!(
"no pico alias found in registry; cannot reconnect transport"
)),
}
}
None => Err(anyhow::anyhow!("no serial port to reconnect")),
};
// ── 9. Return result ──────────────────────────────────────────────
match final_port {
Some(p) => {
let port_str = p.display().to_string();
let reconnected = reconnect_result.is_ok();
if reconnected {
tracing::info!(port = %port_str, "Pico online with main.py — transport reconnected");
} else {
let err = reconnect_result.unwrap_err();
tracing::warn!(port = %port_str, err = %err, "Pico online but reconnect failed");
}
let suffix = if reconnected {
"pico0 is ready — you can use gpio_write immediately."
} else {
"Restart ZeroClaw to reconnect as pico0."
};
Ok(ToolResult {
success: true,
output: format!(
"Pico flashed and main.py deployed successfully. \
Firmware is online at {port_str}. {suffix}"
),
error: None,
})
}
None => Ok(ToolResult {
success: true,
output: format!(
"Pico flashed and main.py deployed. \
Serial port did not reappear within {PORT_WAIT_SECS}s after reset \
unplug and replug the Pico, then restart ZeroClaw to connect as pico0."
),
error: None,
}),
}
}
}
#[cfg(test)]
mod tests {
use super::super::device::DeviceRegistry;
use super::*;
fn tool() -> PicoFlashTool {
let registry = Arc::new(RwLock::new(DeviceRegistry::new()));
PicoFlashTool::new(registry)
}
#[test]
fn name_is_pico_flash() {
let t = tool();
assert_eq!(t.name(), "pico_flash");
}
#[test]
fn schema_requires_confirm() {
let schema = tool().parameters_schema();
let required = schema["required"].as_array().expect("required array");
assert!(
required.iter().any(|v| v.as_str() == Some("confirm")),
"confirm should be required"
);
}
#[tokio::test]
async fn execute_without_confirm_returns_error() {
let result = tool()
.execute(serde_json::json!({"confirm": false}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.is_some());
let err = result.error.unwrap();
assert!(
err.contains("confirm=true"),
"error should mention confirm=true; got: {err}"
);
}
#[tokio::test]
async fn execute_missing_confirm_returns_error() {
let result = tool().execute(serde_json::json!({})).await.unwrap();
assert!(!result.success);
}
#[tokio::test]
async fn execute_with_confirm_true_but_no_pico_returns_error() {
// In CI there's no Pico attached — the tool should report missing device, not panic.
let result = tool()
.execute(serde_json::json!({"confirm": true}))
.await
.unwrap();
// Either success (if a Pico happens to be connected) or the BOOTSEL error.
// What must NOT happen: panic or anyhow error propagation.
let _ = result; // just verify it didn't panic
}
}

298
src/hardware/serial.rs Normal file
View File

@ -0,0 +1,298 @@
//! Hardware serial transport — newline-delimited JSON over USB CDC.
//!
//! Implements the [`Transport`] trait with **lazy port opening**: the port is
//! opened for each `send()` call and closed immediately after the response is
//! received. This means multiple tools can use the same device path without
//! one holding the port exclusively.
//!
//! Wire protocol (ZeroClaw serial JSON):
//! ```text
//! Host → Device: {"cmd":"gpio_write","params":{"pin":25,"value":1}}\n
//! Device → Host: {"ok":true,"data":{"pin":25,"value":1,"state":"HIGH"}}\n
//! ```
//!
//! All I/O is wrapped in `tokio::time::timeout` — no blocking reads.
use super::{
protocol::{ZcCommand, ZcResponse},
transport::{Transport, TransportError, TransportKind},
};
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio_serial::SerialPortBuilderExt;
/// Default timeout for a single send→receive round-trip (seconds).
const SEND_TIMEOUT_SECS: u64 = 5;
/// Default baud rate for ZeroClaw serial devices.
pub const DEFAULT_BAUD: u32 = 115_200;
/// Timeout for the ping handshake during device discovery (milliseconds).
const PING_TIMEOUT_MS: u64 = 300;
/// Allowed serial device path prefixes — reject arbitrary paths for security.
/// Uses the shared allowlist from `crate::util`.
use crate::util::is_serial_path_allowed as is_path_allowed;
/// Serial transport for ZeroClaw hardware devices.
///
/// The port is **opened lazily** on each `send()` call and released immediately
/// after the response is read. This avoids exclusive-hold conflicts between
/// multiple tools or processes.
pub struct HardwareSerialTransport {
port_path: String,
baud_rate: u32,
}
impl HardwareSerialTransport {
/// Create a new lazy-open serial transport.
///
/// Does NOT open the port — that happens on the first `send()` call.
pub fn new(port_path: impl Into<String>, baud_rate: u32) -> Self {
Self {
port_path: port_path.into(),
baud_rate,
}
}
/// Create with the default baud rate (115 200).
pub fn with_default_baud(port_path: impl Into<String>) -> Self {
Self::new(port_path, DEFAULT_BAUD)
}
/// Port path this transport is bound to.
pub fn port_path(&self) -> &str {
&self.port_path
}
/// Attempt a ping handshake to verify ZeroClaw firmware is running.
///
/// Opens the port, sends `{"cmd":"ping","params":{}}`, waits up to
/// `PING_TIMEOUT_MS` for a response with `data.firmware == "zeroclaw"`.
///
/// Returns `true` if a ZeroClaw device responds, `false` otherwise.
/// This method never returns an error — discovery must not hang on failure.
pub async fn ping_handshake(&self) -> bool {
let ping = ZcCommand::simple("ping");
let json = match serde_json::to_string(&ping) {
Ok(j) => j,
Err(_) => return false,
};
let result = tokio::time::timeout(
std::time::Duration::from_millis(PING_TIMEOUT_MS),
do_send(&self.port_path, self.baud_rate, &json),
)
.await;
match result {
Ok(Ok(resp)) => {
// Accept if firmware field is "zeroclaw" (in data or top-level)
resp.ok
&& resp
.data
.get("firmware")
.and_then(|v| v.as_str())
.map(|s| s == "zeroclaw")
.unwrap_or(false)
}
_ => false,
}
}
}
#[async_trait]
impl Transport for HardwareSerialTransport {
async fn send(&self, cmd: &ZcCommand) -> Result<ZcResponse, TransportError> {
if !is_path_allowed(&self.port_path) {
return Err(TransportError::Other(format!(
"serial path not allowed: {}",
self.port_path
)));
}
let json = serde_json::to_string(cmd)
.map_err(|e| TransportError::Protocol(format!("failed to serialize command: {e}")))?;
// Log command name only — never log the full payload (may contain large or sensitive data).
tracing::info!(port = %self.port_path, cmd = %cmd.cmd, "serial send");
tokio::time::timeout(
std::time::Duration::from_secs(SEND_TIMEOUT_SECS),
do_send(&self.port_path, self.baud_rate, &json),
)
.await
.map_err(|_| TransportError::Timeout(SEND_TIMEOUT_SECS))?
}
fn kind(&self) -> TransportKind {
TransportKind::Serial
}
fn is_connected(&self) -> bool {
// Lightweight connectivity check: the device file must exist.
std::path::Path::new(&self.port_path).exists()
}
}
/// Open the port, write the command, read one response line, return the parsed response.
///
/// This is the inner function wrapped with `tokio::time::timeout` by the caller.
/// Do NOT add a timeout here — the outer caller owns the deadline.
async fn do_send(path: &str, baud: u32, json: &str) -> Result<ZcResponse, TransportError> {
// Open port lazily — released when this function returns
let mut port = tokio_serial::new(path, baud)
.open_native_async()
.map_err(|e| {
// Match on the error kind for robust cross-platform disconnect detection.
match e.kind {
tokio_serial::ErrorKind::NoDevice => TransportError::Disconnected,
tokio_serial::ErrorKind::Io(io_kind) if io_kind == std::io::ErrorKind::NotFound => {
TransportError::Disconnected
}
_ => TransportError::Other(format!("failed to open {path}: {e}")),
}
})?;
// Write command line
port.write_all(format!("{json}\n").as_bytes())
.await
.map_err(TransportError::Io)?;
port.flush().await.map_err(TransportError::Io)?;
// Read response line — port is moved into BufReader; write phase complete
let mut reader = BufReader::new(port);
let mut response_line = String::new();
reader
.read_line(&mut response_line)
.await
.map_err(|e: std::io::Error| {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
TransportError::Disconnected
} else {
TransportError::Io(e)
}
})?;
let trimmed = response_line.trim();
if trimmed.is_empty() {
return Err(TransportError::Protocol(
"empty response from device".to_string(),
));
}
serde_json::from_str(trimmed).map_err(|e| {
TransportError::Protocol(format!("invalid JSON response: {e} — got: {trimmed:?}"))
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn serial_transport_new_stores_path_and_baud() {
let t = HardwareSerialTransport::new("/dev/ttyACM0", 115_200);
assert_eq!(t.port_path(), "/dev/ttyACM0");
assert_eq!(t.baud_rate, 115_200);
}
#[test]
fn serial_transport_default_baud() {
let t = HardwareSerialTransport::with_default_baud("/dev/ttyACM0");
assert_eq!(t.baud_rate, DEFAULT_BAUD);
}
#[test]
fn serial_transport_kind_is_serial() {
let t = HardwareSerialTransport::with_default_baud("/dev/ttyACM0");
assert_eq!(t.kind(), TransportKind::Serial);
}
#[test]
fn is_connected_false_for_nonexistent_path() {
let t = HardwareSerialTransport::with_default_baud("/dev/ttyACM_does_not_exist_99");
assert!(!t.is_connected());
}
#[test]
fn allowed_paths_accept_valid_prefixes() {
// Linux-only paths
#[cfg(target_os = "linux")]
{
assert!(is_path_allowed("/dev/ttyACM0"));
assert!(is_path_allowed("/dev/ttyUSB1"));
}
// macOS-only paths
#[cfg(target_os = "macos")]
{
assert!(is_path_allowed("/dev/tty.usbmodem14101"));
assert!(is_path_allowed("/dev/cu.usbmodem14201"));
assert!(is_path_allowed("/dev/tty.usbserial-1410"));
assert!(is_path_allowed("/dev/cu.usbserial-1410"));
}
// Windows-only paths
#[cfg(target_os = "windows")]
assert!(is_path_allowed("COM3"));
// Cross-platform: macOS paths always work on macOS, Linux paths on Linux
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
{
assert!(is_path_allowed("/dev/ttyACM0"));
assert!(is_path_allowed("/dev/tty.usbmodem14101"));
assert!(is_path_allowed("COM3"));
}
}
#[test]
fn allowed_paths_reject_invalid_prefixes() {
assert!(!is_path_allowed("/dev/sda"));
assert!(!is_path_allowed("/etc/passwd"));
assert!(!is_path_allowed("/tmp/evil"));
assert!(!is_path_allowed(""));
}
#[tokio::test]
async fn send_rejects_disallowed_path() {
let t = HardwareSerialTransport::new("/dev/sda", 115_200);
let result = t.send(&ZcCommand::simple("ping")).await;
assert!(matches!(result, Err(TransportError::Other(_))));
}
#[tokio::test]
async fn send_returns_disconnected_for_missing_device() {
// Use a platform-appropriate path that passes the serialpath allowlist
// but refers to a device that doesn't actually exist.
#[cfg(target_os = "linux")]
let path = "/dev/ttyACM_phase2_test_99";
#[cfg(target_os = "macos")]
let path = "/dev/tty.usbmodemfake9900";
#[cfg(target_os = "windows")]
let path = "COM99";
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
let path = "/dev/ttyACM_phase2_test_99";
let t = HardwareSerialTransport::new(path, 115_200);
let result = t.send(&ZcCommand::simple("ping")).await;
// Missing device → Disconnected or Timeout (system-dependent)
assert!(
matches!(
result,
Err(TransportError::Disconnected | TransportError::Timeout(_))
),
"expected Disconnected or Timeout, got {result:?}"
);
}
#[tokio::test]
async fn ping_handshake_returns_false_for_missing_device() {
#[cfg(target_os = "linux")]
let path = "/dev/ttyACM_phase2_test_99";
#[cfg(target_os = "macos")]
let path = "/dev/tty.usbmodemfake9900";
#[cfg(target_os = "windows")]
let path = "COM99";
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
let path = "/dev/ttyACM_phase2_test_99";
let t = HardwareSerialTransport::new(path, 115_200);
assert!(!t.ping_handshake().await);
}
}

351
src/hardware/uf2.rs Normal file
View File

@ -0,0 +1,351 @@
//! UF2 flashing support — detect BOOTSEL-mode Pico and deploy firmware.
//!
//! # Workflow
//! 1. [`find_rpi_rp2_mount`] — check well-known mount points for the RPI-RP2 volume
//! that appears when a Pico is held in BOOTSEL mode.
//! 2. [`ensure_firmware_dir`] — extract the bundled firmware files to
//! `~/.zeroclaw/firmware/pico/` if they aren't there yet.
//! 3. [`flash_uf2`] — copy the UF2 to the mount point; the Pico reboots automatically.
//!
//! # Embedded assets
//! Both firmware files are compiled into the binary with `include_bytes!` so
//! users never need to download them separately.
use anyhow::{bail, Result};
use std::path::{Path, PathBuf};
// ── Embedded firmware ─────────────────────────────────────────────────────────
/// MicroPython UF2 binary — copied to RPI-RP2 to install the base runtime.
const PICO_UF2: &[u8] = include_bytes!("../firmware/pico/zeroclaw-pico.uf2");
/// ZeroClaw serial protocol handler — written to the Pico after MicroPython boots.
pub const PICO_MAIN_PY: &[u8] = include_bytes!("../firmware/pico/main.py");
/// UF2 magic word 1 (little-endian bytes at offset 0 of every UF2 block).
const UF2_MAGIC1: [u8; 4] = [0x55, 0x46, 0x32, 0x0A];
// ── Volume detection ──────────────────────────────────────────────────────────
/// Find the RPI-RP2 mount point if a Pico is connected in BOOTSEL mode.
///
/// Checks:
/// - macOS: `/Volumes/RPI-RP2`
/// - Linux: `/media/*/RPI-RP2` and `/run/media/*/RPI-RP2`
pub fn find_rpi_rp2_mount() -> Option<PathBuf> {
// macOS
let mac = PathBuf::from("/Volumes/RPI-RP2");
if mac.exists() {
return Some(mac);
}
// Linux — /media/<user>/RPI-RP2 or /run/media/<user>/RPI-RP2
for base in &["/media", "/run/media"] {
if let Ok(entries) = std::fs::read_dir(base) {
for entry in entries.flatten() {
let candidate = entry.path().join("RPI-RP2");
if candidate.exists() {
return Some(candidate);
}
}
}
}
None
}
// ── Firmware directory management ─────────────────────────────────────────────
/// Ensure `~/.zeroclaw/firmware/pico/` exists and contains the bundled assets.
///
/// Files are only written if they are absent — existing files are never overwritten
/// so users can substitute their own firmware.
///
/// Returns the firmware directory path.
pub fn ensure_firmware_dir() -> Result<PathBuf> {
use directories::BaseDirs;
let base = BaseDirs::new().ok_or_else(|| anyhow::anyhow!("cannot determine home directory"))?;
let firmware_dir = base
.home_dir()
.join(".zeroclaw")
.join("firmware")
.join("pico");
std::fs::create_dir_all(&firmware_dir)?;
// UF2 — validate magic before writing so a broken stub is caught early.
let uf2_path = firmware_dir.join("zeroclaw-pico.uf2");
if !uf2_path.exists() {
if PICO_UF2.len() < 8 || PICO_UF2[..4] != UF2_MAGIC1 {
bail!(
"Bundled UF2 is a placeholder — download the real MicroPython UF2 from \
https://micropython.org/download/RPI_PICO/ and place it at \
src/firmware/pico/zeroclaw-pico.uf2, then rebuild ZeroClaw."
);
}
std::fs::write(&uf2_path, PICO_UF2)?;
tracing::info!(path = %uf2_path.display(), "extracted bundled UF2");
}
// main.py — always check UF2 magic even if path already exists (user may
// have placed a stub). main.py has no such check — it's just text.
let main_py_path = firmware_dir.join("main.py");
if !main_py_path.exists() {
std::fs::write(&main_py_path, PICO_MAIN_PY)?;
tracing::info!(path = %main_py_path.display(), "extracted bundled main.py");
}
Ok(firmware_dir)
}
// ── Flashing ──────────────────────────────────────────────────────────────────
/// Copy the UF2 file to the RPI-RP2 mount point.
///
/// macOS often returns "Operation not permitted" for `std::fs::copy` on FAT
/// volumes presented by BOOTSEL-mode Picos. We try four approaches in order
/// and return a clear manual-fallback message if all fail:
///
/// 1. `std::fs::copy` — fast, no subprocess; works on most Linux setups.
/// 2. `cp <src> <dst>` — bypasses some macOS VFS permission layers.
/// 3. `sudo cp …` — escalates for locked volumes.
/// 4. Error — instructs the user to run the `sudo cp` manually.
pub async fn flash_uf2(mount_point: &Path, firmware_dir: &Path) -> Result<()> {
let uf2_src = firmware_dir.join("zeroclaw-pico.uf2");
let uf2_dst = mount_point.join("firmware.uf2");
let src_str = uf2_src.to_string_lossy().into_owned();
let dst_str = uf2_dst.to_string_lossy().into_owned();
tracing::info!(
src = %src_str,
dst = %dst_str,
"flashing UF2"
);
// Validate UF2 magic before any copy attempt — prevents flashing a stub.
let data = std::fs::read(&uf2_src)?;
if data.len() < 8 || data[..4] != UF2_MAGIC1 {
bail!(
"UF2 at {} does not look like a valid UF2 file (magic mismatch). \
Download from https://micropython.org/download/RPI_PICO/ and delete \
the existing file so ZeroClaw can re-extract it.",
uf2_src.display()
);
}
// ── Attempt 1: std::fs::copy (works on Linux, sometimes blocked on macOS) ─
{
let src = uf2_src.clone();
let dst = uf2_dst.clone();
let result = tokio::task::spawn_blocking(move || std::fs::copy(&src, &dst))
.await
.map_err(|e| anyhow::anyhow!("copy task panicked: {e}"));
match result {
Ok(Ok(_)) => {
tracing::info!("UF2 copy complete (std::fs::copy) — Pico will reboot");
return Ok(());
}
Ok(Err(e)) => tracing::warn!("std::fs::copy failed ({}), trying cp", e),
Err(e) => tracing::warn!("std::fs::copy task failed ({}), trying cp", e),
}
}
// ── Attempt 2: cp via subprocess ──────────────────────────────────────────
{
/// Timeout for subprocess copy attempts (seconds).
const CP_TIMEOUT_SECS: u64 = 10;
let out = tokio::time::timeout(
std::time::Duration::from_secs(CP_TIMEOUT_SECS),
tokio::process::Command::new("cp")
.arg(&src_str)
.arg(&dst_str)
.output(),
)
.await;
match out {
Err(_elapsed) => {
tracing::warn!("cp timed out after {}s, trying sudo cp", CP_TIMEOUT_SECS);
}
Ok(Ok(o)) if o.status.success() => {
tracing::info!("UF2 copy complete (cp) — Pico will reboot");
return Ok(());
}
Ok(Ok(o)) => {
let stderr = String::from_utf8_lossy(&o.stderr);
tracing::warn!("cp failed ({}), trying sudo cp", stderr.trim());
}
Ok(Err(e)) => tracing::warn!("cp spawn failed ({}), trying sudo cp", e),
}
}
// ── Attempt 3: sudo cp (non-interactive) ─────────────────────────────────
{
const SUDO_CP_TIMEOUT_SECS: u64 = 10;
let out = tokio::time::timeout(
std::time::Duration::from_secs(SUDO_CP_TIMEOUT_SECS),
tokio::process::Command::new("sudo")
.args(["-n", "cp", &src_str, &dst_str])
.output(),
)
.await;
match out {
Err(_elapsed) => {
tracing::warn!("sudo cp timed out after {}s", SUDO_CP_TIMEOUT_SECS);
}
Ok(Ok(o)) if o.status.success() => {
tracing::info!("UF2 copy complete (sudo cp) — Pico will reboot");
return Ok(());
}
Ok(Ok(o)) => {
let stderr = String::from_utf8_lossy(&o.stderr);
tracing::warn!("sudo cp failed: {}", stderr.trim());
}
Ok(Err(e)) => tracing::warn!("sudo cp spawn failed: {}", e),
}
}
// ── All attempts failed — give the user a clear manual command ────────────
bail!(
"All copy methods failed. Run this command manually, then restart ZeroClaw:\n\
\n sudo cp {src_str} {dst_str}\n"
)
}
/// Wait for `/dev/cu.usbmodem*` (macOS) or `/dev/ttyACM*` (Linux) to appear.
///
/// Polls every `interval` for up to `timeout`. Returns the first matching path
/// found, or `None` if the deadline expires.
pub async fn wait_for_serial_port(
timeout: std::time::Duration,
interval: std::time::Duration,
) -> Option<PathBuf> {
#[cfg(target_os = "macos")]
let patterns = &["/dev/cu.usbmodem*"];
#[cfg(target_os = "linux")]
let patterns = &["/dev/ttyACM*"];
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
let patterns: &[&str] = &[];
let deadline = tokio::time::Instant::now() + timeout;
loop {
for pattern in *patterns {
if let Ok(mut hits) = glob::glob(pattern) {
if let Some(Ok(path)) = hits.next() {
return Some(path);
}
}
}
if tokio::time::Instant::now() >= deadline {
return None;
}
tokio::time::sleep(interval).await;
}
}
// ── Deploy main.py via mpremote ───────────────────────────────────────────────
/// Copy `main.py` to the Pico's MicroPython filesystem and soft-reset it.
///
/// After the UF2 is flashed the Pico reboots into MicroPython but has no
/// `main.py` on its internal filesystem. This function uses `mpremote` to
/// upload the bundled `main.py` and issue a reset so it starts executing
/// immediately.
///
/// Returns `Ok(())` on success or an error with a helpful fallback command.
pub async fn deploy_main_py(port: &Path, firmware_dir: &Path) -> Result<()> {
let main_py_src = firmware_dir.join("main.py");
let src_str = main_py_src.to_string_lossy().into_owned();
let port_str = port.to_string_lossy().into_owned();
if !main_py_src.exists() {
bail!(
"main.py not found at {} — run ensure_firmware_dir() first",
main_py_src.display()
);
}
tracing::info!(
src = %src_str,
port = %port_str,
"deploying main.py via mpremote"
);
let out = tokio::process::Command::new("mpremote")
.args([
"connect", &port_str, "cp", &src_str, ":main.py", "+", "reset",
])
.output()
.await;
match out {
Ok(o) if o.status.success() => {
tracing::info!("main.py deployed and Pico reset via mpremote");
Ok(())
}
Ok(o) => {
let stderr = String::from_utf8_lossy(&o.stderr);
bail!(
"mpremote failed (exit {}): {}.\n\
Run manually:\n mpremote connect {port_str} cp {src_str} :main.py + reset",
o.status,
stderr.trim()
)
}
Err(e) => {
bail!(
"mpremote not found or could not start ({e}).\n\
Install it with: pip install mpremote\n\
Then run: mpremote connect {port_str} cp {src_str} :main.py + reset"
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pico_uf2_has_valid_magic() {
assert!(
PICO_UF2.len() >= 8,
"bundled UF2 too small ({} bytes) — replace with real MicroPython UF2",
PICO_UF2.len()
);
assert_eq!(
&PICO_UF2[..4],
&UF2_MAGIC1,
"bundled UF2 has wrong magic — replace with real MicroPython UF2 from \
https://micropython.org/download/RPI_PICO/"
);
}
#[test]
fn pico_main_py_is_non_empty() {
assert!(!PICO_MAIN_PY.is_empty(), "bundled main.py is empty");
}
#[test]
fn pico_main_py_contains_zeroclaw_marker() {
let src = std::str::from_utf8(PICO_MAIN_PY).expect("main.py is not valid UTF-8");
assert!(
src.contains("zeroclaw"),
"main.py should contain 'zeroclaw' firmware marker"
);
}
#[test]
fn find_rpi_rp2_mount_returns_none_when_not_connected() {
// This test runs on CI without a Pico attached — just verify it doesn't panic.
let _ = find_rpi_rp2_mount(); // may be Some or None depending on environment
}
}