diff --git a/README.md b/README.md index c62beb6..dcdd6dd 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,8 @@ cd UPC-Resent "upc_ip": "192.168.8.73", "upc_port": 502, "listen_port": 10079, - "enabled": true + "enabled": true, + "keep_alive": true // TCP长连接模式(可选,默认false) }, { "id": "dev2", @@ -141,6 +142,7 @@ crontab crontab.txt | `upc_port` | 控制设备的端口(Modbus默认502) | `502` | | `listen_port` | 本机监听端口(每个设备需不同) | `10079` | | `enabled` | 是否启用该设备 | `true` / `false` | +| `keep_alive` | **TCP 专用** 是否保持连接(长连接模式) | `true` / `false` (默认) **协议组合说明**: - `listen_protocol`: 主机监听客户端连接的协议(客户端 -> 主机) diff --git a/config/devices.json b/config/devices.json index fe0d744..5f3dd79 100644 --- a/config/devices.json +++ b/config/devices.json @@ -61,7 +61,8 @@ "upc_ip": "192.168.8.73", "upc_port": 502, "listen_port": 10079, - "enabled": true + "enabled": true, + "keep_alive": true }, { "id": "dev2", diff --git a/scripts/generate.py b/scripts/generate.py index 9181f86..235b753 100644 --- a/scripts/generate.py +++ b/scripts/generate.py @@ -103,6 +103,9 @@ def generate_listener(device, global_config, mappings, command_sets, global_comm # 确定使用哪个 sender 脚本(每个设备独立的 sender) sender_file = f"{global_config['base_dir']}/bin/sender_{device['id']}.py" + # 获取 keep_alive 配置(仅TCP有效,默认False) + keep_alive = device.get('keep_alive', False) + # 替换模板变量 content = template.format( device_id=device['id'], @@ -115,7 +118,8 @@ def generate_listener(device, global_config, mappings, command_sets, global_comm python_path=global_config['python_path'], command_mappings=mappings_str, sender_file=sender_file, - device_protocol=device_protocol.upper() + device_protocol=device_protocol.upper(), + keep_alive='True' if keep_alive else 'False' ) output_file = os.path.join(global_config['base_dir'], 'bin', f"{device['id']}.py") diff --git a/templates/listener_tcp.py.tpl b/templates/listener_tcp.py.tpl index de4a418..6a95aa3 100644 --- a/templates/listener_tcp.py.tpl +++ b/templates/listener_tcp.py.tpl @@ -18,6 +18,7 @@ SENDER_FILE = '{sender_file}' LOG_FILE = '{base_dir}/log/{device_id}.log' PYTHON_PATH = '{python_path}' DEVICE_ID = '{device_id}' +KEEP_ALIVE = {keep_alive} # 是否保持TCP连接 # ========================= BUFSIZE = 1024 @@ -60,61 +61,76 @@ def main(): logging.info("服务启动,监听 %s:%s" % (TMS_SERVER_IP, TMS_PORT)) print(f"[{{DEVICE_ID}}] 服务启动,监听 {{TMS_SERVER_IP}}:{{TMS_PORT}}") - while True: - logging.debug("等待客户端连接...") - connection, address = sock.accept() - logging.info("客户端连接建立: %s:%s" % (address[0], address[1])) + def handle_connection(connection, address): + """处理单个连接,支持 keep_alive 模式""" + logging.info("客户端连接建立: %s:%s, keep_alive=%s" % (address[0], address[1], KEEP_ALIVE)) try: - connection.settimeout(5) - logging.debug("设置超时 5 秒,等待接收数据...") - buf_bytes = connection.recv(BUFSIZE) - logging.debug("收到原始数据: %s (长度=%d)" % (repr(buf_bytes), len(buf_bytes))) - buf = buf_bytes.decode('utf-8').strip() + # 设置超时:keep_alive 模式下超时时间更长 + timeout = 30 if KEEP_ALIVE else 5 + connection.settimeout(timeout) + logging.debug("设置超时 %d 秒" % timeout) + + while True: + logging.debug("等待接收数据...") + buf_bytes = connection.recv(BUFSIZE) + + # 客户端断开连接 + if not buf_bytes: + logging.info("客户端断开连接: %s:%s" % (address[0], address[1])) + break + + logging.debug("收到原始数据: %s (长度=%d)" % (repr(buf_bytes), len(buf_bytes))) + buf = buf_bytes.decode('utf-8').strip() - datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) - logstr = "===" + datetime_str + " 收到客户端 " + address[0] + ":" + str(address[1]) + \ - " 发送的指令: '" + str(buf) + "'" - logging.info(logstr) - print(logstr) - - operation = process_command(buf) - logging.debug("process_command 返回: '%s'" % operation) - - if operation == "status_check": - # 状态查询或心跳检测,直接回复 ok - response = 'ok' - logstr = "===" + datetime_str + " 状态查询/心跳检测,回复: " + response - logging.info(logstr) - print(logstr) - logging.debug("发送响应: '%s'" % response) - connection.send(response.encode('utf-8')) - logging.debug("响应发送完成") - elif operation == "nodata": - response = 'Unknown command: %s' % buf - logstr = "===" + datetime_str + " 未知指令: '" + str(buf) + "'" - logging.warning(logstr) - print(logstr) - logging.debug("发送未知指令响应: '%s'" % response) - connection.send(response.encode('utf-8')) - else: - logstr = "===" + datetime_str + " 映射到内部指令: '" + str(operation) + "'" + datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) + logstr = "===" + datetime_str + " 收到客户端 " + address[0] + ":" + str(address[1]) + \ + " 发送的指令: '" + str(buf) + "'" logging.info(logstr) print(logstr) - os_command = str(PYTHON_PATH) + " " + SENDER_FILE + " " + \ - str(UPC_DEV_IP) + " " + str(UPC_DEV_PORT) + " " + \ - str(operation) + " " + str(LOG_FILE) - logging.info("执行外部命令: %s" % os_command) - logging.debug("开始执行外部命令...") - exit_code = os.system(os_command) - logging.debug("外部命令执行完成,返回码: %d" % exit_code) - - response = 'Command %s processed as %s' % (buf, operation) - logging.debug("发送响应: '%s'" % response) - connection.send(response.encode('utf-8')) - logging.debug("响应发送完成") + operation = process_command(buf) + logging.debug("process_command 返回: '%s'" % operation) + + if operation == "status_check": + # 状态查询或心跳检测,直接回复 ok + response = 'ok' + logstr = "===" + datetime_str + " 状态查询/心跳检测,回复: " + response + logging.info(logstr) + print(logstr) + logging.debug("发送响应: '%s'" % response) + connection.send(response.encode('utf-8')) + logging.debug("响应发送完成") + elif operation == "nodata": + response = 'Unknown command: %s' % buf + logstr = "===" + datetime_str + " 未知指令: '" + str(buf) + "'" + logging.warning(logstr) + print(logstr) + logging.debug("发送未知指令响应: '%s'" % response) + connection.send(response.encode('utf-8')) + else: + logstr = "===" + datetime_str + " 映射到内部指令: '" + str(operation) + "'" + logging.info(logstr) + print(logstr) + + os_command = str(PYTHON_PATH) + " " + SENDER_FILE + " " + \ + str(UPC_DEV_IP) + " " + str(UPC_DEV_PORT) + " " + \ + str(operation) + " " + str(LOG_FILE) + logging.info("执行外部命令: %s" % os_command) + logging.debug("开始执行外部命令...") + exit_code = os.system(os_command) + logging.debug("外部命令执行完成,返回码: %d" % exit_code) + + response = 'Command %s processed as %s' % (buf, operation) + logging.debug("发送响应: '%s'" % response) + connection.send(response.encode('utf-8')) + logging.debug("响应发送完成") + # 非 keep_alive 模式,处理完一个请求后断开连接 + if not KEEP_ALIVE: + logging.debug("非 keep_alive 模式,断开连接") + break + except socket.timeout: datetime_str = time.strftime(ISOTIMEFORMAT, time.localtime(time.time())) logstr = "===" + datetime_str + " 客户端连接超时: " + address[0] + ":" + str(address[1]) + "===" @@ -136,6 +152,11 @@ def main(): logging.info("关闭客户端连接: %s:%s" % (address[0], address[1])) connection.close() + while True: + logging.debug("等待客户端连接...") + connection, address = sock.accept() + handle_connection(connection, address) + if __name__ == '__main__': main()