firmware-base/scripts/modbus_read_coils.py

127 lines
4.7 KiB
Python

import argparse
import json
import os
import logging
import time
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ConnectionException
from pymodbus.pdu import ExceptionResponse
# --- Configuration ---
MODBUS_PORT = 502
OUTPUT_DIR = "tmp"
OUTPUT_FILE = os.path.join(OUTPUT_DIR, "mbc-test.json")
LOG_LEVEL = logging.INFO
# --- Modbus Exception Code Mapping ---
MODBUS_EXCEPTIONS = {
1: "Illegal Function",
2: "Illegal Data Address",
3: "Illegal Data Value",
4: "Slave Device Failure",
5: "Acknowledge",
6: "Slave Device Busy",
7: "Negative Acknowledge",
8: "Memory Parity Error",
10: "Gateway Path Unavailable",
11: "Gateway Target Device Failed to Respond",
}
# --- Setup Logging ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Argument Parsing ---
parser = argparse.ArgumentParser(description='Connect to Modbus TCP server and read coils.')
parser.add_argument('--ip-address', type=str, default='192.168.1.250', \
help='IP address of the Modbus TCP server (ESP32), defaults to 192.168.1.250')
parser.add_argument('--address', type=int, required=True, \
help='Starting coil address to read.')
parser.add_argument('--count', type=int, default=1, \
help='Number of coils to read (default: 1).')
args = parser.parse_args()
# Use arguments for address and count
COIL_ADDRESS = args.address
COIL_COUNT = args.count
# --- Main Script ---
client = ModbusTcpClient(args.ip_address, port=MODBUS_PORT)
connection_success = False
results = None
read_success = False
# Create output directory
try:
os.makedirs(OUTPUT_DIR, exist_ok=True)
logging.info(f"Ensured output directory exists: {OUTPUT_DIR}")
except OSError as e:
logging.error(f"Failed to create output directory {OUTPUT_DIR}: {e}")
exit(1)
try:
logging.info(f"Attempting to connect to Modbus TCP server at {args.ip_address}:{MODBUS_PORT}...")
connection_success = client.connect()
if connection_success:
logging.info("Connection successful.")
logging.info(f"Reading {COIL_COUNT} coils starting from address {COIL_ADDRESS}...")
try:
# Read Coils (Function Code 01)
response = client.read_coils(address=COIL_ADDRESS, count=COIL_COUNT)
if not response.isError():
# Access the list of boolean values
coil_values = response.bits[:COIL_COUNT]
logging.info(f"Successfully read {len(coil_values)} coil values.")
results = coil_values
read_success = True
# Print the results to the console
print("\n--- Read Coil Values ---")
for i, value in enumerate(coil_values):
print(f"Coil {COIL_ADDRESS + i}: {value}")
print("------------------------\n")
else:
# Handle Modbus logical errors
error_code = getattr(response, 'exception_code', None)
error_message = MODBUS_EXCEPTIONS.get(error_code, f"Unknown error code {error_code}")
logging.error(f"Modbus error reading coils: Code {error_code} - {error_message}. Response: {response}")
except ConnectionException as ce:
# Handle connection errors during read
logging.error(f"Connection error during read: {ce}")
except Exception as e:
# Handle other unexpected errors during read attempt
logging.error(f"An unexpected error occurred during read: {e}")
else:
logging.error(f"Failed to connect to the Modbus TCP server at {args.ip_address}:{MODBUS_PORT}.")
except Exception as e:
# Catch errors during initial connection or loop setup
logging.error(f"An unexpected error occurred: {e}")
finally:
if client.is_socket_open():
client.close()
logging.info("Modbus connection closed.")
# Write results to JSON file only if read was successful
if read_success and results is not None:
try:
with open(OUTPUT_FILE, 'w') as f:
json.dump(results, f, indent=4)
logging.info(f"Coil results successfully written to {OUTPUT_FILE}")
except IOError as e:
logging.error(f"Failed to write results to {OUTPUT_FILE}: {e}")
else:
if connection_success and not read_success:
logging.warning("Read operation failed, JSON file not written.")
elif not connection_success:
logging.warning("Connection failed, JSON file not written.")
# Exit with error code if connection or read failed
if not connection_success or not read_success:
exit(1) # Indicate failure