firmware-base/scripts/rate_test_serial.py

260 lines
10 KiB
Python

# scripts/rate_test_serial.py
import serial
import time
import sys
import argparse
import serial.tools.list_ports
import json
import os
from datetime import datetime
# --- Configuration ---
DEFAULT_PORT = "COM12"
BAUD_RATE = 115200
INIT_DELAY = 2.5 # Match original script's 2.5s delay after connection
# 125cmds/sec - 8ms delay
def find_esp_port():
"""Tries to automatically find the ESP device port."""
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
# Look for common ESP32 VID/PID or descriptions
if "CP210x" in desc or "USB Serial Device" in desc or "CH340" in desc or "SER=Serial" in hwid or "VID:PID=10C4:EA60" in hwid:
print(f"Found potential ESP device: {port} ({desc})")
return port
print(f"Could not automatically find ESP device, defaulting to {DEFAULT_PORT}")
return DEFAULT_PORT
def test_command_rate(port, command, min_delay_ms, max_delay_ms, step_ms, num_commands, report_file=None):
"""Tests sending commands at different rates to find the maximum reliable rate."""
ser = None
current_delay = max_delay_ms # Start with the slowest rate (most likely to work)
# Prepare report data
report_data = {
"timestamp": datetime.now().isoformat(),
"command": command,
"port": port,
"baud_rate": BAUD_RATE,
"parameters": {
"min_delay_ms": min_delay_ms,
"max_delay_ms": max_delay_ms,
"step_ms": step_ms,
"commands_per_test": num_commands
},
"tests": [],
"result": {
"max_reliable_rate_cmds_per_sec": None,
"min_reliable_delay_ms": None,
"success": False
}
}
try:
print(f"Opening connection to {port} at {BAUD_RATE} baud...")
ser = serial.Serial(port, BAUD_RATE, timeout=1)
print(f"Connected. Waiting {INIT_DELAY}s for initialization...")
time.sleep(INIT_DELAY) # Initial delay
ser.reset_input_buffer()
# First test with maximum delay to establish baseline
print(f"\n=== Testing with {current_delay}ms delay (baseline) ===")
success, test_data = run_command_sequence(ser, command, current_delay, num_commands)
report_data["tests"].append({
"delay_ms": current_delay,
"success": success,
"data": test_data
})
if not success:
print("Failed even with maximum delay! Device might not be responding correctly.")
return report_data
# Binary search to find the threshold
low = min_delay_ms
high = max_delay_ms
last_success_delay = max_delay_ms
while high - low > step_ms:
mid = (low + high) // 2
print(f"\n=== Testing with {mid}ms delay ===")
success, test_data = run_command_sequence(ser, command, mid, num_commands)
report_data["tests"].append({
"delay_ms": mid,
"success": success,
"data": test_data
})
if success:
high = mid # Try with a shorter delay
last_success_delay = mid
else:
low = mid # Need a longer delay
# Final confirmation of the threshold
print(f"\n=== FINAL TEST with {last_success_delay}ms delay ===")
success, test_data = run_command_sequence(ser, command, last_success_delay, num_commands)
report_data["tests"].append({
"delay_ms": last_success_delay,
"success": success,
"data": test_data,
"is_final_test": True
})
if success:
rate = 1000.0 / last_success_delay
print(f"\n✅ Maximum reliable rate: approx. {rate:.2f} commands per second ({last_success_delay}ms delay)")
report_data["result"]["success"] = True
report_data["result"]["max_reliable_rate_cmds_per_sec"] = rate
report_data["result"]["min_reliable_delay_ms"] = last_success_delay
else:
print(f"\n⚠️ Results inconsistent. Try again with different parameters.")
report_data["result"]["success"] = False
except Exception as e:
print(f"Error: {e}")
report_data["error"] = str(e)
finally:
if ser and ser.is_open:
print("Closing serial port.")
ser.close()
# Save report if requested
if report_file:
# Ensure directory exists
os.makedirs(os.path.dirname(os.path.abspath(report_file)), exist_ok=True)
with open(report_file, 'w') as f:
json.dump(report_data, f, indent=2)
print(f"Report saved to {report_file}")
return report_data
def run_command_sequence(ser, command, delay_ms, count):
"""Runs a sequence of commands with specified delay between them."""
expected_responses = ["V: Bridge::onMessage", "V: PHApp::list", "V: Called method:"]
all_success = True
test_data = []
for i in range(count):
print(f"Sending command #{i+1}/{count} with {delay_ms}ms delay")
success, response_data = send_and_verify(ser, command, expected_responses)
test_data.append({
"command_index": i+1,
"success": success,
"responses": response_data
})
all_success = all_success and success
if not success:
print(f"❌ Failed at attempt #{i+1} - stopping sequence")
return False, test_data
if i < count - 1: # Don't sleep after the last command
time.sleep(delay_ms / 1000.0)
return all_success, test_data
def send_and_verify(ser, command, expected_responses):
"""Sends a command and verifies the response contains expected strings."""
# Send command
print(f" Sending: {command}")
send_time = time.time()
if not command.endswith('\n'):
command += '\n'
ser.write(command.encode('utf-8'))
ser.flush()
# Read response with timeout
start = time.time()
timeout = 2.0 # 2 seconds to receive a response
buffer = ""
found_responses = [False] * len(expected_responses)
response_data = {
"send_time": send_time,
"response_time": None,
"total_response_time_ms": None,
"lines": [],
"expected_responses": {response: False for response in expected_responses},
"all_found": False,
"echo_detected": False
}
while time.time() - start < timeout:
if ser.in_waiting > 0:
data = ser.readline().decode('utf-8', errors='ignore').strip()
if data:
receive_time = time.time()
print(f" Received: {data}")
buffer += data + "\n"
response_data["lines"].append({
"text": data,
"time": receive_time,
"delay_ms": round((receive_time - send_time) * 1000, 2)
})
# First response time
if response_data["response_time"] is None:
response_data["response_time"] = receive_time
response_data["total_response_time_ms"] = round((receive_time - send_time) * 1000, 2)
# Check if this line contains any of our expected responses
for i, expected in enumerate(expected_responses):
if expected in data:
found_responses[i] = True
response_data["expected_responses"][expected] = True
# If we've found all expected responses, we can stop early
if all(found_responses):
print(" ✅ All expected responses received")
response_data["all_found"] = True
return True, response_data
else:
time.sleep(0.01) # Short sleep to avoid busy waiting
# Check if we've seen all expected responses
all_found = all(found_responses)
response_data["all_found"] = all_found
if not all_found:
print(" ❌ Not all expected responses received")
for i, expected in enumerate(expected_responses):
if not found_responses[i]:
print(f" Missing: {expected}")
# Look for echo pattern
if command.strip('\n') in buffer:
print(" ⚠️ Command echo detected - device might be in echo mode or not handling commands")
response_data["echo_detected"] = True
return all_found, response_data
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test maximum command rate for ESP device.")
parser.add_argument("command", help="The command to send (e.g., '<<1;2;64;list:1:0>>')")
parser.add_argument("--port", help=f"Serial port (default: auto-detect or {DEFAULT_PORT})")
parser.add_argument("--min-delay", type=int, default=5, help="Minimum delay between commands (ms)")
parser.add_argument("--max-delay", type=int, default=1000, help="Maximum delay between commands (ms)")
parser.add_argument("--step", type=int, default=5, help="Step size for delay adjustment (ms)")
parser.add_argument("--count", type=int, default=3, help="Number of commands to send in each test sequence")
parser.add_argument("--report", help="Path to save JSON report (default: ./tmp/battle.json)")
args = parser.parse_args()
# Determine port
serial_port = args.port if args.port else find_esp_port()
# Set default report path if not specified
report_path = args.report if args.report else "./tmp/battle.json"
# Run the rate test
test_command_rate(
serial_port,
args.command,
args.min_delay,
args.max_delay,
args.step,
args.count,
report_path
)