Files
UPC-Resent/templates/sender_tcp.py.tpl
2026-03-22 17:17:44 +08:00

120 lines
4.8 KiB
Smarty
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Auto-generated from template. DO NOT EDIT DIRECTLY!
import sys
import socket
import time
import logging
import telnetlib
def tcp_command_send(upc_addr, upc_command, data):
    """Send ``data`` to the device over TCP and read at most one response.

    A fresh IPv4 TCP connection is opened for every call. After the payload
    has been transmitted, a single ``recv`` of up to 1024 bytes is attempted;
    whatever arrives is only written to the log, never returned.

    Args:
        upc_addr: ``(host, port)`` tuple handed to ``socket.connect``.
        upc_command: Command name; used only in log messages.
        data: Raw bytes to transmit.

    Returns:
        True when the connection was established and the payload was sent,
        even if no response arrived (receive timeouts and receive errors are
        logged and tolerated). False when connecting or sending raised
        ``OSError`` (which includes timeouts).
    """
    timeout = 2  # seconds; applies to connect, send and recv on this socket
    recv_buffer_size = 1024
    try:
        # The context manager closes the socket on every path, including a
        # failed connect or send, so no descriptor is leaked between retries.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            # Per-socket timeout: avoids changing the process-wide default
            # for every other socket created later.
            client.settimeout(timeout)
            logging.debug(f"尝试连接到 {{upc_addr[0]}}:{{upc_addr[1]}}...")
            client.connect(upc_addr)
            logging.debug(f"成功连接到 {{upc_addr[0]}}:{{upc_addr[1]}}。")
            logging.info(f"正在发送命令 '{{upc_command}}' 到 {{upc_addr}}: {{data!r}} (原始字节)")
            # sendall keeps writing until the whole payload is out; a plain
            # send may transmit only part of it.
            client.sendall(data)
            logging.debug("数据发送完成。")
            try:
                logging.debug(f"尝试从 {{upc_addr}} 接收响应数据...")
                response_bytes = client.recv(recv_buffer_size)
            except socket.timeout:
                # A silent device is not treated as a send failure.
                logging.warning(f"从 {{upc_addr}} 接收响应超时。")
            except OSError as recv_e:
                logging.error(f"从 {{upc_addr}} 接收响应时发生网络错误: {{recv_e}}")
            else:
                if response_bytes:
                    logging.info(f"从 {{upc_addr}} 收到原始响应字节: {{response_bytes!r}}")
                    # errors='ignore' drops undecodable bytes, so decoding
                    # cannot fail here.
                    response_str = response_bytes.decode('utf-8', errors='ignore')
                    logging.info(f"从 {{upc_addr}} 收到解码响应字符串: '{{response_str}}'")
                else:
                    logging.info(f"从 {{upc_addr}} 未收到响应数据 (连接可能已关闭或设备未发送)。")
        logging.debug("连接关闭。")
        return True
    except OSError as e:
        logging.error(f"命令 '{{upc_command}}' 发送至 {{upc_addr}} 失败: {{e}}")
        return False
# Command definition table (read from the configuration file).
# NOTE(review): the `command_definitions` name below looks like a template
# placeholder that is filled in when this script is generated -- confirm
# against the generator before editing this line.
# upc_send_command looks a command up here by name and passes the stored
# value to bytes.fromhex, so each value is expected to be a hex string.
COMMAND_DEFINITIONS = {command_definitions}
def _timestamp():
    """Return the current local time formatted for log lines."""
    # ISOTIMEFORMAT is only assigned in the ``__main__`` section of this
    # script; fall back to the same pattern so this also works on import.
    time_format = globals().get("ISOTIMEFORMAT", "%Y-%m-%d %X")
    return time.strftime(time_format, time.localtime())


def upc_send_command(upc_addr, upc_command, log_file):
    """Look up ``upc_command`` and send its payload to the device with retries.

    The command name is resolved through ``COMMAND_DEFINITIONS`` to a hex
    string, converted to bytes, and handed to ``tcp_command_send``. Up to
    three attempts are made, waiting three seconds between attempts.

    Args:
        upc_addr: ``(host, port)`` tuple forwarded to ``tcp_command_send``.
        upc_command: Name of the command to look up and send.
        log_file: Unused here; kept so existing callers keep working.

    Returns:
        None. The outcome is reported through ``logging`` (and ``print`` for
        an unknown command or an invalid definition).
    """
    max_attempts = 3
    retry_delay = 3  # seconds between attempts

    # Guard clause: nothing to send for a command that is not defined.
    if upc_command not in COMMAND_DEFINITIONS:
        datetime_str = _timestamp()
        logstr = f"{{datetime_str}} 未知命令: {{upc_command}},没有对应的数据可发送。"
        logging.warning(logstr)
        print(logstr)
        return

    try:
        data = bytes.fromhex(COMMAND_DEFINITIONS[upc_command])
    except (TypeError, ValueError) as e:
        # A malformed definition would otherwise end the script with a raw
        # traceback and nothing in the log file.
        datetime_str = _timestamp()
        logstr = f"{{datetime_str}} 命令 {{upc_command}} 的指令定义不是有效的十六进制字符串: {{e}}"
        logging.error(logstr)
        print(logstr)
        return

    for attempt in range(1, max_attempts + 1):
        if tcp_command_send(upc_addr, upc_command, data):
            datetime_str = _timestamp()
            logging.info(f"{{datetime_str}} {{upc_command}}命令第{{attempt}}次发送成功")
            return
        datetime_str = _timestamp()
        if attempt < max_attempts:
            logging.warning(
                f"{{datetime_str}} {{upc_command}}命令第{{attempt}}次发送失败"
                f"{{retry_delay}}秒后进行第{{attempt + 1}}次尝试"
            )
            time.sleep(retry_delay)
        else:
            # Last attempt: do not announce or wait for a retry that will
            # never happen.
            logging.warning(f"{{datetime_str}} {{upc_command}}命令第{{attempt}}次发送失败")

    datetime_str = _timestamp()
    logging.error(f"{{datetime_str}} {{upc_command}}命令在{{max_attempts}}次尝试后发送均失败")
if __name__ == '__main__':
    # Usage: <script> <device ip> <port> <command> <log file>
    # strftime pattern for timestamps; read as a module global by
    # upc_send_command, so it must stay at module level.
    ISOTIMEFORMAT = '%Y-%m-%d %X'
    # Seconds to wait for the reachability check; matches the timeout that
    # tcp_command_send uses for the actual transfer.
    CHECK_TIMEOUT = 2

    # Fail with a readable message instead of an IndexError traceback.
    if len(sys.argv) < 5:
        sys.exit(f"用法: {{sys.argv[0]}} <设备IP> <端口> <命令> <日志文件>")
    UPC_DEV_IP, UPC_DEV_PORT, upc_command, LOG_FILE = sys.argv[1:5]
    logging.basicConfig(filename=LOG_FILE, filemode="a", level=logging.DEBUG)

    try:
        UPC_ADDR = (UPC_DEV_IP, int(UPC_DEV_PORT))
    except ValueError:
        logstr = f"!!!!!ERROR!!!!! 端口号无效: {{UPC_DEV_PORT}}!!!!!"
        logging.error(logstr)
        sys.exit(logstr)

    # Verify that the port accepts a connection before sending anything.
    # NOTE(review): telnetlib is deprecated since Python 3.11 and removed in
    # 3.13 (PEP 594); this check only needs a plain TCP connect.
    try:
        logging.debug(f"尝试通过 Telnet 检查端口 {{UPC_ADDR[0]}}:{{UPC_ADDR[1]}} 可访问性...")
        # Without an explicit timeout an unreachable host blocks here until
        # the operating system gives up on the connect.
        tn = telnetlib.Telnet(UPC_DEV_IP, UPC_ADDR[1], timeout=CHECK_TIMEOUT)
        tn.close()
        logging.debug(f"端口 {{UPC_ADDR[0]}}:{{UPC_ADDR[1]}} Telnet 检查通过。")
    except OSError as e:
        datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time()))
        logstr = f"!!!!!ERROR!!!!!{{datetime_str}} {{UPC_ADDR}} 端口无法访问,请检查设备或网络连接: {{e}}!!!!!"
        logging.error(logstr)
        print(logstr)
    except Exception as e:
        # Top-level boundary: record the traceback for anything unexpected.
        datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time()))
        logstr = f"!!!!!ERROR!!!!!{{datetime_str}} {{UPC_ADDR}} Telnet检查时发生未知错误: {{e}}!!!!!"
        logging.error(logstr, exc_info=True)
        print(logstr)
    else:
        # Only send the command once the reachability check has passed.
        upc_send_command(UPC_ADDR, upc_command, LOG_FILE)