diff --git a/templates/sender_tcp.py.tpl b/templates/sender_tcp.py.tpl index ab5bb44..f42870c 100644 --- a/templates/sender_tcp.py.tpl +++ b/templates/sender_tcp.py.tpl @@ -7,6 +7,18 @@ import socket import time import logging import telnetlib +import re + + +def is_valid_hex(hex_str): + """检查字符串是否为有效的16进制格式(只包含0-9,a-f,A-F且长度为偶数)""" + if not hex_str: + return False + # 必须是偶数长度 + if len(hex_str) % 2 != 0: + return False + # 只能包含16进制字符 + return bool(re.match(r'^[0-9a-fA-F]+$', hex_str)) def tcp_command_send(upc_addr, upc_command, data): @@ -57,11 +69,21 @@ COMMAND_DEFINITIONS = {command_definitions} def upc_send_command(upc_addr, upc_command, log_file): """将命令发送给终端设备""" data = None + send_type = "unknown" # 查找指令定义 if upc_command in COMMAND_DEFINITIONS: - hex_str = COMMAND_DEFINITIONS[upc_command] - data = bytes.fromhex(hex_str) + cmd_value = COMMAND_DEFINITIONS[upc_command] + # 判断是否为有效的16进制格式 + if is_valid_hex(cmd_value): + data = bytes.fromhex(cmd_value) + send_type = "hex" + logging.debug(f"命令 '{{upc_command}}' 的值 '{{cmd_value}}' 识别为16进制格式") + else: + # 作为纯字符串发送 + data = cmd_value.encode('utf-8') + send_type = "string" + logging.debug(f"命令 '{{upc_command}}' 的值 '{{cmd_value}}' 识别为字符串格式") if data is None: datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) @@ -73,7 +95,7 @@ def upc_send_command(upc_addr, upc_command, log_file): while i <= 3: # 尝试发送3次 if tcp_command_send(upc_addr, upc_command, data): datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) - logstr = f"{{datetime_str}} {{upc_command}}命令第{{i}}次发送成功" + logstr = f"{{datetime_str}} {{upc_command}}命令第{{i}}次发送成功 ({{send_type}})" logging.info(logstr) return else: diff --git a/templates/sender_udp.py.tpl b/templates/sender_udp.py.tpl index 7ea0a77..392cb0d 100644 --- a/templates/sender_udp.py.tpl +++ b/templates/sender_udp.py.tpl @@ -6,6 +6,18 @@ import sys import socket import time import logging +import re + + +def is_valid_hex(hex_str): + """检查字符串是否为有效的16进制格式(只包含0-9,a-f,A-F且长度为偶数)""" + if not hex_str: + return False + # 必须是偶数长度 + if len(hex_str) % 2 != 0: + return False + # 只能包含16进制字符 + return bool(re.match(r'^[0-9a-fA-F]+$', hex_str)) def udp_command_send(upc_addr, upc_command, data, timeout=2): @@ -59,11 +71,21 @@ COMMAND_DEFINITIONS = {command_definitions} def upc_send_command(upc_addr, upc_command, log_file): """将命令发送给终端设备""" data = None + send_type = "unknown" # 查找指令定义 if upc_command in COMMAND_DEFINITIONS: - hex_str = COMMAND_DEFINITIONS[upc_command] - data = bytes.fromhex(hex_str) + cmd_value = COMMAND_DEFINITIONS[upc_command] + # 判断是否为有效的16进制格式 + if is_valid_hex(cmd_value): + data = bytes.fromhex(cmd_value) + send_type = "hex" + logging.debug(f"命令 '{{upc_command}}' 的值 '{{cmd_value}}' 识别为16进制格式") + else: + # 作为纯字符串发送 + data = cmd_value.encode('utf-8') + send_type = "string" + logging.debug(f"命令 '{{upc_command}}' 的值 '{{cmd_value}}' 识别为字符串格式") if data is None: datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) @@ -75,7 +97,7 @@ def upc_send_command(upc_addr, upc_command, log_file): while i <= 3: # 尝试发送3次 if udp_command_send(upc_addr, upc_command, data): datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) - logstr = f"{{datetime_str}} {{upc_command}}命令第{{i}}次发送成功" + logstr = f"{{datetime_str}} {{upc_command}}命令第{{i}}次发送成功 ({{send_type}})" logging.info(logstr) return else: