firmware-base/scripts/modbus_battle_test.py

367 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Modbus Battle Test - Tests how quickly Modbus registers can be updated
"""
import argparse
import json
import os
import logging
import time
import statistics
from datetime import datetime
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ConnectionException
from pymodbus.pdu import ExceptionResponse
# --- Configuration ---
MODBUS_PORT = 502
MB_BATTLE_COUNTER_REG = 20 # Counter register address
MB_BATTLE_TIMESTAMP_REG = 21 # Timestamp register address
OUTPUT_DIR = "tmp"
OUTPUT_FILE = os.path.join(OUTPUT_DIR, "modbus_battle.json")
LOG_LEVEL = logging.INFO
RETRY_COUNT = 3 # Number of retries for connection errors
RETRY_DELAY = 0.1 # Delay between retries in seconds
# --- Modbus Exception Code Mapping ---
MODBUS_EXCEPTIONS = {
1: "Illegal Function",
2: "Illegal Data Address",
3: "Illegal Data Value",
4: "Slave Device Failure",
5: "Acknowledge",
6: "Slave Device Busy",
7: "Negative Acknowledge",
8: "Memory Parity Error",
10: "Gateway Path Unavailable",
11: "Gateway Target Device Failed to Respond",
}
# --- Setup Logging ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Argument Parsing ---
parser = argparse.ArgumentParser(description='Modbus Battle Test - Test how quickly Modbus registers can be updated')
parser.add_argument('--ip-address', type=str, default='192.168.1.250',
help='IP address of the Modbus TCP server (ESP32), defaults to 192.168.1.250')
parser.add_argument('--count', type=int, default=1000,
help='Number of update iterations to perform')
parser.add_argument('--delay', type=float, default=0.0,
help='Delay in seconds between updates (0 for max speed)')
parser.add_argument('--values', type=str, default='increment',
choices=['increment', 'random', 'alternating'],
help='Values to write: increment (1,2,3...), random, or alternating (0,1,0,1...)')
parser.add_argument('--retries', type=int, default=RETRY_COUNT,
help=f'Number of retries for connection errors (default: {RETRY_COUNT})')
parser.add_argument('--retry-delay', type=float, default=RETRY_DELAY,
help=f'Delay between retries in seconds (default: {RETRY_DELAY})')
parser.add_argument('--log-level', type=str, default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
help='Set logging level (default: INFO)')
args = parser.parse_args()
# Set log level from command line
if args.log_level:
numeric_level = getattr(logging, args.log_level.upper(), None)
if isinstance(numeric_level, int):
logging.getLogger().setLevel(numeric_level)
# Create output directory
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
logging.info(f"Ensured output directory exists: {OUTPUT_DIR}")
except OSError as e:
logging.error(f"Failed to create output directory {OUTPUT_DIR}: {e}")
exit(1)
def get_next_value(current_value, mode, iteration):
"""Returns the next value to write based on the specified mode"""
if mode == 'increment':
return current_value + 1
elif mode == 'random':
import random
return random.randint(1, 65535)
elif mode == 'alternating':
return (iteration % 2) # Returns 0 or 1
return current_value + 1 # Default to increment
def write_with_retry(client, address, value, retries=RETRY_COUNT, retry_delay=RETRY_DELAY):
"""Write a register value with retry logic for connection errors"""
connection_reset = False
for attempt in range(retries + 1):
try:
# If we had a connection reset on previous attempt, reconnect first
if connection_reset and not client.is_socket_open():
logging.debug(f"Reconnecting after connection reset (attempt {attempt+1})")
client.connect()
# Give the server time to register the new connection
time.sleep(retry_delay * 2)
response = client.write_register(address, value)
return response, None
except ConnectionError as e:
# Connection reset errors are common when other clients disconnect
connection_reset = True
if "reset by peer" in str(e).lower() or "connection reset" in str(e).lower():
logging.debug(f"Connection reset detected, likely another client disconnected")
if attempt < retries:
# Use exponential backoff for connection issues
wait_time = retry_delay * (attempt + 1)
logging.debug(f"Retry {attempt+1}/{retries} after write error: {e} (waiting {wait_time:.2f}s)")
time.sleep(wait_time)
else:
return None, e
except Exception as e:
if attempt < retries:
# Use exponential backoff for connection issues
wait_time = retry_delay * (attempt + 1)
logging.debug(f"Retry {attempt+1}/{retries} after write error: {e} (waiting {wait_time:.2f}s)")
time.sleep(wait_time)
else:
return None, e
return None, Exception("Maximum retries exceeded")
def read_with_retry(client, address, count, retries=RETRY_COUNT, retry_delay=RETRY_DELAY):
"""Read register(s) with retry logic for connection errors"""
connection_reset = False
for attempt in range(retries + 1):
try:
# If we had a connection reset on previous attempt, reconnect first
if connection_reset and not client.is_socket_open():
logging.debug(f"Reconnecting after connection reset (attempt {attempt+1})")
client.connect()
# Give the server time to register the new connection
time.sleep(retry_delay * 2)
response = client.read_holding_registers(address=address, count=count)
return response, None
except ConnectionError as e:
# Connection reset errors are common when other clients disconnect
connection_reset = True
if "reset by peer" in str(e).lower() or "connection reset" in str(e).lower():
logging.debug(f"Connection reset detected, likely another client disconnected")
if attempt < retries:
# Use exponential backoff for connection issues
wait_time = retry_delay * (attempt + 1)
logging.debug(f"Retry {attempt+1}/{retries} after read error: {e} (waiting {wait_time:.2f}s)")
time.sleep(wait_time)
else:
return None, e
except Exception as e:
if attempt < retries:
# Use exponential backoff for connection issues
wait_time = retry_delay * (attempt + 1)
logging.debug(f"Retry {attempt+1}/{retries} after read error: {e} (waiting {wait_time:.2f}s)")
time.sleep(wait_time)
else:
return None, e
return None, Exception("Maximum retries exceeded")
def run_battle_test(client, count, delay, value_mode):
"""Performs the Modbus battle test with timing metrics"""
write_times = []
verify_times = []
round_trip_times = []
success_count = 0
current_value = 0
retry_count = args.retries
retry_delay = args.retry_delay
reconnect_count = 0
# First reset the counter to 0
try:
client.write_register(MB_BATTLE_COUNTER_REG, 0)
time.sleep(0.1) # Small delay to ensure reset takes effect
except Exception as e:
logging.error(f"Failed to reset counter: {e}")
return None
logging.info(f"Starting Modbus battle test with {count} iterations, {delay}s delay, and {value_mode} values")
logging.info(f"Using {retry_count} retries with {retry_delay}s delay between retries")
test_start_time = time.time()
for i in range(count):
next_value = get_next_value(current_value, value_mode, i)
iteration_start = time.time()
# Check connection state
if not client.is_socket_open():
logging.warning(f"Connection lost at iteration {i}, attempting to reconnect...")
try:
client.connect()
reconnect_count += 1
logging.info(f"Successfully reconnected (reconnect #{reconnect_count})")
time.sleep(retry_delay * 2) # Give server time to setup connection
except Exception as ce:
logging.error(f"Failed to reconnect: {ce}")
# Write the value and measure time
write_start = time.time()
write_success = False
response, write_error = write_with_retry(client, MB_BATTLE_COUNTER_REG, next_value,
retry_count, retry_delay)
if write_error:
logging.error(f"Exception during write at iteration {i}: {write_error}")
elif response and not response.isError():
write_success = True
current_value = next_value
else:
error_msg = MODBUS_EXCEPTIONS.get(response.exception_code, "Unknown") if response else "No response"
logging.error(f"Write error at iteration {i}: {error_msg}")
write_end = time.time()
write_time = (write_end - write_start) * 1000 # Convert to ms
write_times.append(write_time)
# Verify the value was written correctly
verify_start = time.time()
verify_success = False
if write_success:
read_response, read_error = read_with_retry(client, MB_BATTLE_COUNTER_REG, 1,
retry_count, retry_delay)
if read_error:
logging.error(f"Exception during verification at iteration {i}: {read_error}")
elif read_response and not read_response.isError() and len(read_response.registers) > 0:
read_value = read_response.registers[0]
if read_value == next_value:
verify_success = True
else:
logging.warning(f"Verification mismatch at iteration {i}: Expected {next_value}, got {read_value}")
else:
error_msg = MODBUS_EXCEPTIONS.get(read_response.exception_code, "Unknown") if read_response else "No response"
logging.error(f"Read error at iteration {i}: {error_msg}")
verify_end = time.time()
verify_time = (verify_end - verify_start) * 1000 # Convert to ms
verify_times.append(verify_time)
# Calculate round-trip time
round_trip_time = (verify_end - write_start) * 1000 # Convert to ms
round_trip_times.append(round_trip_time)
# Count successful operations
if write_success and verify_success:
success_count += 1
# Show progress
if i % 10 == 0 or i == count - 1:
elapsed = time.time() - test_start_time
percent_complete = (i+1) / count * 100
estimated_total = elapsed / (i+1) * count if i > 0 else 0
remaining = estimated_total - elapsed if estimated_total > 0 else 0
logging.info(f"Progress: {i+1}/{count} iterations ({percent_complete:.1f}%), " +
f"{success_count} successful, " +
f"ETA: {remaining:.1f}s remaining")
# Apply delay if specified
if delay > 0:
time.sleep(delay)
# Calculate statistics
success_rate = (success_count / count) * 100 if count > 0 else 0
test_elapsed_time = time.time() - test_start_time
results = {
"timestamp": datetime.now().isoformat(),
"parameters": {
"ip_address": args.ip_address,
"count": count,
"delay": delay,
"value_mode": value_mode,
"retries": retry_count,
"retry_delay": retry_delay
},
"results": {
"success_count": success_count,
"success_rate": success_rate,
"reconnect_count": reconnect_count,
"total_elapsed_seconds": test_elapsed_time,
"write_times_ms": {
"min": min(write_times) if write_times else None,
"max": max(write_times) if write_times else None,
"mean": statistics.mean(write_times) if write_times else None,
"median": statistics.median(write_times) if write_times else None,
"percentile_95": statistics.quantiles(write_times, n=20)[18] if len(write_times) >= 20 else None
},
"verify_times_ms": {
"min": min(verify_times) if verify_times else None,
"max": max(verify_times) if verify_times else None,
"mean": statistics.mean(verify_times) if verify_times else None,
"median": statistics.median(verify_times) if verify_times else None,
"percentile_95": statistics.quantiles(verify_times, n=20)[18] if len(verify_times) >= 20 else None
},
"round_trip_times_ms": {
"min": min(round_trip_times) if round_trip_times else None,
"max": max(round_trip_times) if round_trip_times else None,
"mean": statistics.mean(round_trip_times) if round_trip_times else None,
"median": statistics.median(round_trip_times) if round_trip_times else None,
"percentile_95": statistics.quantiles(round_trip_times, n=20)[18] if len(round_trip_times) >= 20 else None
},
"max_update_rate_per_second": 1000 / statistics.mean(round_trip_times) if round_trip_times else None
},
"all_data": {
"write_times": write_times,
"verify_times": verify_times,
"round_trip_times": round_trip_times
}
}
return results
# --- Main Script ---
def main():
client = ModbusTcpClient(args.ip_address, port=MODBUS_PORT)
connection_success = False
results = None
try:
logging.info(f"Connecting to Modbus TCP server at {args.ip_address}:{MODBUS_PORT}...")
connection_success = client.connect()
if connection_success:
logging.info("Connection successful")
results = run_battle_test(client, args.count, args.delay, args.values)
if results:
# Print summary to console
print("\n--- Modbus Battle Test Results ---")
print(f"Iterations: {args.count}")
print(f"Success rate: {results['results']['success_rate']:.2f}%")
print(f"Reconnections: {results['results']['reconnect_count']}")
print(f"Total time: {results['results']['total_elapsed_seconds']:.2f} seconds")
print(f"Write time (avg): {results['results']['write_times_ms']['mean']:.2f} ms")
print(f"Verify time (avg): {results['results']['verify_times_ms']['mean']:.2f} ms")
print(f"Round-trip time (avg): {results['results']['round_trip_times_ms']['mean']:.2f} ms")
print(f"Maximum update rate: {results['results']['max_update_rate_per_second']:.2f} updates/second")
# Print additional connection stats
if results['results']['reconnect_count'] > 0:
print("\nConnection Information:")
print(f" Reconnection events: {results['results']['reconnect_count']}")
print(f" Average time between reconnects: {results['results']['total_elapsed_seconds']/results['results']['reconnect_count']:.2f}s")
print("--------------------------------\n")
# Save results to file
with open(OUTPUT_FILE, 'w') as f:
json.dump(results, f, indent=2)
logging.info(f"Results saved to {OUTPUT_FILE}")
else:
logging.error(f"Failed to connect to Modbus TCP server at {args.ip_address}:{MODBUS_PORT}")
except Exception as e:
logging.error(f"An error occurred: {e}")
finally:
if client.is_socket_open():
client.close()
logging.info("Modbus connection closed")
if __name__ == "__main__":
main()