367 lines
17 KiB
Python
367 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Modbus Battle Test - Tests how quickly Modbus registers can be updated
|
|
"""
|
|
import argparse
|
|
import json
|
|
import os
|
|
import logging
|
|
import time
|
|
import statistics
|
|
from datetime import datetime
|
|
from pymodbus.client import ModbusTcpClient
|
|
from pymodbus.exceptions import ConnectionException
|
|
from pymodbus.pdu import ExceptionResponse
|
|
|
|
# --- Configuration ---
|
|
MODBUS_PORT = 502
|
|
MB_BATTLE_COUNTER_REG = 20 # Counter register address
|
|
MB_BATTLE_TIMESTAMP_REG = 21 # Timestamp register address
|
|
OUTPUT_DIR = "tmp"
|
|
OUTPUT_FILE = os.path.join(OUTPUT_DIR, "modbus_battle.json")
|
|
LOG_LEVEL = logging.INFO
|
|
RETRY_COUNT = 3 # Number of retries for connection errors
|
|
RETRY_DELAY = 0.1 # Delay between retries in seconds
|
|
|
|
# --- Modbus Exception Code Mapping ---
|
|
MODBUS_EXCEPTIONS = {
|
|
1: "Illegal Function",
|
|
2: "Illegal Data Address",
|
|
3: "Illegal Data Value",
|
|
4: "Slave Device Failure",
|
|
5: "Acknowledge",
|
|
6: "Slave Device Busy",
|
|
7: "Negative Acknowledge",
|
|
8: "Memory Parity Error",
|
|
10: "Gateway Path Unavailable",
|
|
11: "Gateway Target Device Failed to Respond",
|
|
}
|
|
|
|
# --- Setup Logging ---
|
|
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
# --- Argument Parsing ---
|
|
parser = argparse.ArgumentParser(description='Modbus Battle Test - Test how quickly Modbus registers can be updated')
|
|
parser.add_argument('--ip-address', type=str, default='192.168.1.250',
|
|
help='IP address of the Modbus TCP server (ESP32), defaults to 192.168.1.250')
|
|
parser.add_argument('--count', type=int, default=1000,
|
|
help='Number of update iterations to perform')
|
|
parser.add_argument('--delay', type=float, default=0.0,
|
|
help='Delay in seconds between updates (0 for max speed)')
|
|
parser.add_argument('--values', type=str, default='increment',
|
|
choices=['increment', 'random', 'alternating'],
|
|
help='Values to write: increment (1,2,3...), random, or alternating (0,1,0,1...)')
|
|
parser.add_argument('--retries', type=int, default=RETRY_COUNT,
|
|
help=f'Number of retries for connection errors (default: {RETRY_COUNT})')
|
|
parser.add_argument('--retry-delay', type=float, default=RETRY_DELAY,
|
|
help=f'Delay between retries in seconds (default: {RETRY_DELAY})')
|
|
parser.add_argument('--log-level', type=str, default='INFO',
|
|
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
|
|
help='Set logging level (default: INFO)')
|
|
args = parser.parse_args()
|
|
|
|
# Set log level from command line
|
|
if args.log_level:
|
|
numeric_level = getattr(logging, args.log_level.upper(), None)
|
|
if isinstance(numeric_level, int):
|
|
logging.getLogger().setLevel(numeric_level)
|
|
|
|
# Create output directory
|
|
try:
|
|
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
logging.info(f"Ensured output directory exists: {OUTPUT_DIR}")
|
|
except OSError as e:
|
|
logging.error(f"Failed to create output directory {OUTPUT_DIR}: {e}")
|
|
exit(1)
|
|
|
|
def get_next_value(current_value, mode, iteration):
|
|
"""Returns the next value to write based on the specified mode"""
|
|
if mode == 'increment':
|
|
return current_value + 1
|
|
elif mode == 'random':
|
|
import random
|
|
return random.randint(1, 65535)
|
|
elif mode == 'alternating':
|
|
return (iteration % 2) # Returns 0 or 1
|
|
return current_value + 1 # Default to increment
|
|
|
|
def write_with_retry(client, address, value, retries=RETRY_COUNT, retry_delay=RETRY_DELAY):
|
|
"""Write a register value with retry logic for connection errors"""
|
|
connection_reset = False
|
|
for attempt in range(retries + 1):
|
|
try:
|
|
# If we had a connection reset on previous attempt, reconnect first
|
|
if connection_reset and not client.is_socket_open():
|
|
logging.debug(f"Reconnecting after connection reset (attempt {attempt+1})")
|
|
client.connect()
|
|
# Give the server time to register the new connection
|
|
time.sleep(retry_delay * 2)
|
|
|
|
response = client.write_register(address, value)
|
|
return response, None
|
|
except ConnectionError as e:
|
|
# Connection reset errors are common when other clients disconnect
|
|
connection_reset = True
|
|
if "reset by peer" in str(e).lower() or "connection reset" in str(e).lower():
|
|
logging.debug(f"Connection reset detected, likely another client disconnected")
|
|
|
|
if attempt < retries:
|
|
# Use exponential backoff for connection issues
|
|
wait_time = retry_delay * (attempt + 1)
|
|
logging.debug(f"Retry {attempt+1}/{retries} after write error: {e} (waiting {wait_time:.2f}s)")
|
|
time.sleep(wait_time)
|
|
else:
|
|
return None, e
|
|
except Exception as e:
|
|
if attempt < retries:
|
|
# Use exponential backoff for connection issues
|
|
wait_time = retry_delay * (attempt + 1)
|
|
logging.debug(f"Retry {attempt+1}/{retries} after write error: {e} (waiting {wait_time:.2f}s)")
|
|
time.sleep(wait_time)
|
|
else:
|
|
return None, e
|
|
return None, Exception("Maximum retries exceeded")
|
|
|
|
def read_with_retry(client, address, count, retries=RETRY_COUNT, retry_delay=RETRY_DELAY):
|
|
"""Read register(s) with retry logic for connection errors"""
|
|
connection_reset = False
|
|
for attempt in range(retries + 1):
|
|
try:
|
|
# If we had a connection reset on previous attempt, reconnect first
|
|
if connection_reset and not client.is_socket_open():
|
|
logging.debug(f"Reconnecting after connection reset (attempt {attempt+1})")
|
|
client.connect()
|
|
# Give the server time to register the new connection
|
|
time.sleep(retry_delay * 2)
|
|
|
|
response = client.read_holding_registers(address=address, count=count)
|
|
return response, None
|
|
except ConnectionError as e:
|
|
# Connection reset errors are common when other clients disconnect
|
|
connection_reset = True
|
|
if "reset by peer" in str(e).lower() or "connection reset" in str(e).lower():
|
|
logging.debug(f"Connection reset detected, likely another client disconnected")
|
|
|
|
if attempt < retries:
|
|
# Use exponential backoff for connection issues
|
|
wait_time = retry_delay * (attempt + 1)
|
|
logging.debug(f"Retry {attempt+1}/{retries} after read error: {e} (waiting {wait_time:.2f}s)")
|
|
time.sleep(wait_time)
|
|
else:
|
|
return None, e
|
|
except Exception as e:
|
|
if attempt < retries:
|
|
# Use exponential backoff for connection issues
|
|
wait_time = retry_delay * (attempt + 1)
|
|
logging.debug(f"Retry {attempt+1}/{retries} after read error: {e} (waiting {wait_time:.2f}s)")
|
|
time.sleep(wait_time)
|
|
else:
|
|
return None, e
|
|
return None, Exception("Maximum retries exceeded")
|
|
|
|
def run_battle_test(client, count, delay, value_mode):
|
|
"""Performs the Modbus battle test with timing metrics"""
|
|
write_times = []
|
|
verify_times = []
|
|
round_trip_times = []
|
|
success_count = 0
|
|
current_value = 0
|
|
retry_count = args.retries
|
|
retry_delay = args.retry_delay
|
|
reconnect_count = 0
|
|
|
|
# First reset the counter to 0
|
|
try:
|
|
client.write_register(MB_BATTLE_COUNTER_REG, 0)
|
|
time.sleep(0.1) # Small delay to ensure reset takes effect
|
|
except Exception as e:
|
|
logging.error(f"Failed to reset counter: {e}")
|
|
return None
|
|
|
|
logging.info(f"Starting Modbus battle test with {count} iterations, {delay}s delay, and {value_mode} values")
|
|
logging.info(f"Using {retry_count} retries with {retry_delay}s delay between retries")
|
|
|
|
test_start_time = time.time()
|
|
|
|
for i in range(count):
|
|
next_value = get_next_value(current_value, value_mode, i)
|
|
iteration_start = time.time()
|
|
|
|
# Check connection state
|
|
if not client.is_socket_open():
|
|
logging.warning(f"Connection lost at iteration {i}, attempting to reconnect...")
|
|
try:
|
|
client.connect()
|
|
reconnect_count += 1
|
|
logging.info(f"Successfully reconnected (reconnect #{reconnect_count})")
|
|
time.sleep(retry_delay * 2) # Give server time to setup connection
|
|
except Exception as ce:
|
|
logging.error(f"Failed to reconnect: {ce}")
|
|
|
|
# Write the value and measure time
|
|
write_start = time.time()
|
|
write_success = False
|
|
|
|
response, write_error = write_with_retry(client, MB_BATTLE_COUNTER_REG, next_value,
|
|
retry_count, retry_delay)
|
|
|
|
if write_error:
|
|
logging.error(f"Exception during write at iteration {i}: {write_error}")
|
|
elif response and not response.isError():
|
|
write_success = True
|
|
current_value = next_value
|
|
else:
|
|
error_msg = MODBUS_EXCEPTIONS.get(response.exception_code, "Unknown") if response else "No response"
|
|
logging.error(f"Write error at iteration {i}: {error_msg}")
|
|
|
|
write_end = time.time()
|
|
write_time = (write_end - write_start) * 1000 # Convert to ms
|
|
write_times.append(write_time)
|
|
|
|
# Verify the value was written correctly
|
|
verify_start = time.time()
|
|
verify_success = False
|
|
|
|
if write_success:
|
|
read_response, read_error = read_with_retry(client, MB_BATTLE_COUNTER_REG, 1,
|
|
retry_count, retry_delay)
|
|
|
|
if read_error:
|
|
logging.error(f"Exception during verification at iteration {i}: {read_error}")
|
|
elif read_response and not read_response.isError() and len(read_response.registers) > 0:
|
|
read_value = read_response.registers[0]
|
|
if read_value == next_value:
|
|
verify_success = True
|
|
else:
|
|
logging.warning(f"Verification mismatch at iteration {i}: Expected {next_value}, got {read_value}")
|
|
else:
|
|
error_msg = MODBUS_EXCEPTIONS.get(read_response.exception_code, "Unknown") if read_response else "No response"
|
|
logging.error(f"Read error at iteration {i}: {error_msg}")
|
|
|
|
verify_end = time.time()
|
|
verify_time = (verify_end - verify_start) * 1000 # Convert to ms
|
|
verify_times.append(verify_time)
|
|
|
|
# Calculate round-trip time
|
|
round_trip_time = (verify_end - write_start) * 1000 # Convert to ms
|
|
round_trip_times.append(round_trip_time)
|
|
|
|
# Count successful operations
|
|
if write_success and verify_success:
|
|
success_count += 1
|
|
|
|
# Show progress
|
|
if i % 10 == 0 or i == count - 1:
|
|
elapsed = time.time() - test_start_time
|
|
percent_complete = (i+1) / count * 100
|
|
estimated_total = elapsed / (i+1) * count if i > 0 else 0
|
|
remaining = estimated_total - elapsed if estimated_total > 0 else 0
|
|
|
|
logging.info(f"Progress: {i+1}/{count} iterations ({percent_complete:.1f}%), " +
|
|
f"{success_count} successful, " +
|
|
f"ETA: {remaining:.1f}s remaining")
|
|
|
|
# Apply delay if specified
|
|
if delay > 0:
|
|
time.sleep(delay)
|
|
|
|
# Calculate statistics
|
|
success_rate = (success_count / count) * 100 if count > 0 else 0
|
|
test_elapsed_time = time.time() - test_start_time
|
|
|
|
results = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"parameters": {
|
|
"ip_address": args.ip_address,
|
|
"count": count,
|
|
"delay": delay,
|
|
"value_mode": value_mode,
|
|
"retries": retry_count,
|
|
"retry_delay": retry_delay
|
|
},
|
|
"results": {
|
|
"success_count": success_count,
|
|
"success_rate": success_rate,
|
|
"reconnect_count": reconnect_count,
|
|
"total_elapsed_seconds": test_elapsed_time,
|
|
"write_times_ms": {
|
|
"min": min(write_times) if write_times else None,
|
|
"max": max(write_times) if write_times else None,
|
|
"mean": statistics.mean(write_times) if write_times else None,
|
|
"median": statistics.median(write_times) if write_times else None,
|
|
"percentile_95": statistics.quantiles(write_times, n=20)[18] if len(write_times) >= 20 else None
|
|
},
|
|
"verify_times_ms": {
|
|
"min": min(verify_times) if verify_times else None,
|
|
"max": max(verify_times) if verify_times else None,
|
|
"mean": statistics.mean(verify_times) if verify_times else None,
|
|
"median": statistics.median(verify_times) if verify_times else None,
|
|
"percentile_95": statistics.quantiles(verify_times, n=20)[18] if len(verify_times) >= 20 else None
|
|
},
|
|
"round_trip_times_ms": {
|
|
"min": min(round_trip_times) if round_trip_times else None,
|
|
"max": max(round_trip_times) if round_trip_times else None,
|
|
"mean": statistics.mean(round_trip_times) if round_trip_times else None,
|
|
"median": statistics.median(round_trip_times) if round_trip_times else None,
|
|
"percentile_95": statistics.quantiles(round_trip_times, n=20)[18] if len(round_trip_times) >= 20 else None
|
|
},
|
|
"max_update_rate_per_second": 1000 / statistics.mean(round_trip_times) if round_trip_times else None
|
|
},
|
|
"all_data": {
|
|
"write_times": write_times,
|
|
"verify_times": verify_times,
|
|
"round_trip_times": round_trip_times
|
|
}
|
|
}
|
|
|
|
return results
|
|
|
|
# --- Main Script ---
|
|
def main():
|
|
client = ModbusTcpClient(args.ip_address, port=MODBUS_PORT)
|
|
connection_success = False
|
|
results = None
|
|
|
|
try:
|
|
logging.info(f"Connecting to Modbus TCP server at {args.ip_address}:{MODBUS_PORT}...")
|
|
connection_success = client.connect()
|
|
|
|
if connection_success:
|
|
logging.info("Connection successful")
|
|
results = run_battle_test(client, args.count, args.delay, args.values)
|
|
if results:
|
|
# Print summary to console
|
|
print("\n--- Modbus Battle Test Results ---")
|
|
print(f"Iterations: {args.count}")
|
|
print(f"Success rate: {results['results']['success_rate']:.2f}%")
|
|
print(f"Reconnections: {results['results']['reconnect_count']}")
|
|
print(f"Total time: {results['results']['total_elapsed_seconds']:.2f} seconds")
|
|
print(f"Write time (avg): {results['results']['write_times_ms']['mean']:.2f} ms")
|
|
print(f"Verify time (avg): {results['results']['verify_times_ms']['mean']:.2f} ms")
|
|
print(f"Round-trip time (avg): {results['results']['round_trip_times_ms']['mean']:.2f} ms")
|
|
print(f"Maximum update rate: {results['results']['max_update_rate_per_second']:.2f} updates/second")
|
|
|
|
# Print additional connection stats
|
|
if results['results']['reconnect_count'] > 0:
|
|
print("\nConnection Information:")
|
|
print(f" Reconnection events: {results['results']['reconnect_count']}")
|
|
print(f" Average time between reconnects: {results['results']['total_elapsed_seconds']/results['results']['reconnect_count']:.2f}s")
|
|
|
|
print("--------------------------------\n")
|
|
|
|
# Save results to file
|
|
with open(OUTPUT_FILE, 'w') as f:
|
|
json.dump(results, f, indent=2)
|
|
logging.info(f"Results saved to {OUTPUT_FILE}")
|
|
else:
|
|
logging.error(f"Failed to connect to Modbus TCP server at {args.ip_address}:{MODBUS_PORT}")
|
|
|
|
except Exception as e:
|
|
logging.error(f"An error occurred: {e}")
|
|
finally:
|
|
if client.is_socket_open():
|
|
client.close()
|
|
logging.info("Modbus connection closed")
|
|
|
|
if __name__ == "__main__":
|
|
main() |