Files
UPC-Resent/templates/sender_udp.py.tpl
“zj” acf0306080 feat: sender 模板支持字符串和16进制两种格式发送
- 添加 is_valid_hex() 函数检测16进制格式
- 有效的16进制(偶数长度+0-9a-fA-F)按 bytes.fromhex() 发送
- 非16进制字符串按 UTF-8 编码直接发送
- 日志中标注发送类型 (hex/string)
2026-04-05 14:49:07 +08:00

130 lines
4.8 KiB
Smarty
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Auto-generated from template. DO NOT EDIT DIRECTLY!
import sys
import socket
import time
import logging
import re
def is_valid_hex(hex_str):
"""检查字符串是否为有效的16进制格式只包含0-9,a-f,A-F且长度为偶数"""
if not hex_str:
return False
# 必须是偶数长度
if len(hex_str) % 2 != 0:
return False
# 只能包含16进制字符
return bool(re.match(r'^[0-9a-fA-F]+$', hex_str))
def udp_command_send(upc_addr, upc_command, data, timeout=2):
"""通过UDP发送数据并接收响应"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try:
logging.debug(f"尝试发送UDP数据到 {{upc_addr[0]}}:{{upc_addr[1]}}...")
logging.info(f"正在发送UDP命令 '{{upc_command}}' 到 {{upc_addr}}: {{data!r}} (原始字节)")
sock.sendto(data, upc_addr)
logging.debug(f"UDP数据发送完成。")
# 尝试接收响应
recv_buffer_size = 1024
try:
logging.debug(f"等待从 {{upc_addr}} 接收UDP响应...")
response_bytes, addr = sock.recvfrom(recv_buffer_size)
if response_bytes:
logging.info(f"从 {{addr}} 收到UDP原始响应字节: {{response_bytes!r}}")
try:
response_str = response_bytes.decode('utf-8', errors='ignore')
logging.info(f"从 {{addr}} 收到UDP解码响应字符串: '{{response_str}}'")
except Exception as decode_e:
logging.warning(f"解码UDP响应时发生错误: {{decode_e}}")
else:
logging.info(f"未收到UDP响应数据。")
except socket.timeout:
logging.warning(f"接收UDP响应超时设备可能不返回响应这通常是正常的。")
except OSError as recv_e:
logging.error(f"接收UDP响应时发生网络错误: {{recv_e}}")
sock.close()
logging.debug(f"UDP socket关闭。")
return True
except OSError as e:
logging.error(f"UDP命令 '{{upc_command}}' 发送至 {{upc_addr}} 失败: {{e}}")
return False
finally:
try:
sock.close()
except:
pass
# 指令定义表(从配置文件读取)
COMMAND_DEFINITIONS = {command_definitions}
def upc_send_command(upc_addr, upc_command, log_file):
"""将命令发送给终端设备"""
data = None
send_type = "unknown"
# 查找指令定义
if upc_command in COMMAND_DEFINITIONS:
cmd_value = COMMAND_DEFINITIONS[upc_command]
# 判断是否为有效的16进制格式
if is_valid_hex(cmd_value):
data = bytes.fromhex(cmd_value)
send_type = "hex"
logging.debug(f"命令 '{{upc_command}}' 的值 '{{cmd_value}}' 识别为16进制格式")
else:
# 作为纯字符串发送
data = cmd_value.encode('utf-8')
send_type = "string"
logging.debug(f"命令 '{{upc_command}}' 的值 '{{cmd_value}}' 识别为字符串格式")
if data is None:
datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time()))
logstr = f"{{datetime_str}} 未知命令: {{upc_command}},没有对应的数据可发送。"
logging.warning(logstr)
print(logstr)
else:
i = 1
while i <= 3: # 尝试发送3次
if udp_command_send(upc_addr, upc_command, data):
datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time()))
logstr = f"{{datetime_str}} {{upc_command}}命令第{{i}}次发送成功 ({{send_type}})"
logging.info(logstr)
return
else:
datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time()))
logstr = f"{{datetime_str}} {{upc_command}}命令第{{i}}次发送失败3秒后进行第{{i+1}}次尝试"
logging.warning(logstr)
time.sleep(3)
i = i + 1
datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time()))
logstr = f"{{datetime_str}} {{upc_command}}命令在3次尝试后发送均失败"
logging.error(logstr)
if __name__ == '__main__':
ISOTIMEFORMAT = '%Y-%m-%d %X'
UPC_DEV_IP = sys.argv[1]
UPC_DEV_PORT = int(sys.argv[2])
upc_command = sys.argv[3]
LOG_FILE = sys.argv[4]
logging.basicConfig(filename=LOG_FILE, filemode="a", level=logging.DEBUG)
UPC_ADDR = (UPC_DEV_IP, UPC_DEV_PORT)
# UDP 是无连接的无需像TCP那样预先检查端口连通性
# 直接发送数据
logging.info(f"UDP模式准备发送命令到 {{UPC_ADDR}}")
upc_send_command(UPC_ADDR, upc_command, LOG_FILE)