#!/usr/bin/env python3 # -*- coding: UTF-8 -*- """ 代码生成器:根据配置文件自动生成 Python 执行文件 支持 TCP 和 UDP 协议 用法: python scripts/generate.py [--check] --check: 仅检查配置,不生成文件 """ import json import os import sys import argparse def load_config(config_file='config/devices.json'): """加载配置文件""" with open(config_file, 'r', encoding='utf-8') as f: return json.load(f) def ensure_dirs(base_dir): """确保必要的目录存在""" dirs = [ os.path.join(base_dir, 'bin'), os.path.join(base_dir, 'crond'), os.path.join(base_dir, 'log') ] for d in dirs: os.makedirs(d, exist_ok=True) def dict_to_python_string(d): """将字典转换为 Python 代码字符串""" lines = ['{'] for k, v in d.items(): if isinstance(v, str): lines.append(f' "{k}": "{v}",') else: lines.append(f' "{k}": {v},') lines.append('}') return '\n'.join(lines) def get_template_path(protocol, template_type): """ 根据协议类型和模板类型获取模板文件路径 protocol: 'tcp' 或 'udp' template_type: 'listener', 'crond', 'sender' """ protocol = protocol.lower() if template_type == 'listener': return f'templates/listener_{protocol}.py.tpl' elif template_type == 'crond': return f'templates/crond_{protocol}.py.tpl' elif template_type == 'sender': return f'templates/sender_{protocol}.py.tpl' else: raise ValueError(f"未知的模板类型: {template_type}") def generate_listener(device, global_config, mappings): """生成监听服务文件""" protocol = device.get('protocol', 'tcp').lower() template_file = get_template_path(protocol, 'listener') # 检查模板文件是否存在 if not os.path.exists(template_file): print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备") template_file = 'templates/listener_tcp.py.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() # 构建命令映射字典字符串 mappings_str = "{\n" for cmd, op in mappings.items(): mappings_str += f' "{cmd}": "{op}",\n' # 添加默认映射(open1-open8, close1-close8) for i in range(1, 9): mappings_str += f' "open{i}": "open{i}",\n' mappings_str += f' "close{i}": "close{i}",\n' mappings_str += " }" # 替换模板变量 content = template.format( device_id=device['id'], device_name=device['name'], tms_server_ip=global_config['tms_server_ip'], listen_port=device['listen_port'], upc_ip=device['upc_ip'], upc_port=device['upc_port'], base_dir=global_config['base_dir'], python_path=global_config['python_path'], command_mappings=mappings_str ) output_file = os.path.join(global_config['base_dir'], 'bin', f"{device['id']}.py") with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file} ({protocol.upper()})") return output_file def generate_crond(device, global_config): """生成保活检查文件""" protocol = device.get('protocol', 'tcp').lower() template_file = get_template_path(protocol, 'crond') # 检查模板文件是否存在 if not os.path.exists(template_file): print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备") template_file = 'templates/crond_tcp.py.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() content = template.format( device_id=device['id'], device_name=device['name'], tms_server_ip=global_config['tms_server_ip'], listen_port=device['listen_port'], base_dir=global_config['base_dir'], python_path=global_config['python_path'] ) output_file = os.path.join(global_config['base_dir'], 'crond', f"upcrond-{device['id']}.py") with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file} ({protocol.upper()})") return output_file def generate_sender(commands, global_config, protocol='tcp'): """生成发送模块文件""" protocol = protocol.lower() template_file = get_template_path(protocol, 'sender') # 检查模板文件是否存在 if not os.path.exists(template_file): print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备") template_file = 'templates/sender_tcp.py.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() # 构建指令定义字典字符串 cmd_defs = "{\n" for cmd, hex_str in commands.items(): cmd_defs += f' "{cmd}": "{hex_str}",\n' cmd_defs += "}" content = template.format( command_definitions=cmd_defs ) # 根据协议生成不同的文件名 output_file = os.path.join(global_config['base_dir'], 'bin', f'sender_{protocol}.py') with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file} ({protocol.upper()})") return output_file def generate_all_senders(config, global_config): """为所有使用到的协议生成 sender 文件""" commands = config['commands'] devices = config['devices'] # 收集所有使用的协议类型 protocols = set() for device in devices: if device.get('enabled', True): protocol = device.get('protocol', 'tcp').lower() protocols.add(protocol) generated = [] for protocol in protocols: generated.append(generate_sender(commands, global_config, protocol)) return generated def generate_crontab(devices, global_config): """生成 crontab 配置""" lines = [] for device in devices: if not device.get('enabled', True): continue # 每分钟执行一次保活检查 lines.append(f"* */1 * * * {global_config['python_path']} {global_config['base_dir']}/crond/upcrond-{device['id']}.py") # 添加日志清理任务 lines.append(f"0 0 * * * {global_config['base_dir']}/cutlog.sh") content = '\n'.join(lines) + '\n' output_file = 'crontab.txt' with open(output_file, 'w', encoding='utf-8') as f: f.write(content) print(f"[生成] {output_file}") return output_file def generate_start_script(devices, global_config): """生成启动脚本""" lines = ['#!/bin/bash'] lines.append('# Auto-generated start script') lines.append('') lines.append('# 启动所有监听服务') for device in devices: if not device.get('enabled', True): lines.append(f"# {device['name']} ({device['id']}) - 已禁用") continue protocol = device.get('protocol', 'tcp').upper() script_path = f"{global_config['base_dir']}/bin/{device['id']}.py" lines.append(f"echo '启动 {device['name']} ({device['id']}) - {protocol}...'") lines.append(f"su root -c \"{global_config['python_path']} {script_path} &\"") lines.append('') lines.append('echo "所有服务已启动"') content = '\n'.join(lines) + '\n' output_file = 'start.sh' with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file}") return output_file def check_config(config): """检查配置是否合法""" errors = [] warnings = [] global_cfg = config.get('global', {}) devices = config.get('devices', []) commands = config.get('commands', {}) mappings = config.get('mappings', {}) # 检查全局配置 required_globals = ['tms_server_ip', 'python_path', 'base_dir'] for key in required_globals: if key not in global_cfg: errors.append(f"缺少全局配置: {key}") # 检查设备配置 if not devices: errors.append("没有配置任何设备") device_ids = [] listen_ports = [] for device in devices: device_id = device.get('id') if not device_id: errors.append("设备缺少 id 字段") continue if device_id in device_ids: errors.append(f"设备 id 重复: {device_id}") device_ids.append(device_id) required_device_fields = ['name', 'upc_ip', 'upc_port', 'listen_port'] for field in required_device_fields: if field not in device: errors.append(f"设备 {device_id} 缺少字段: {field}") # 检查协议类型 protocol = device.get('protocol', 'tcp').lower() if protocol not in ['tcp', 'udp']: errors.append(f"设备 {device_id} 的协议类型无效: {protocol} (必须是 tcp 或 udp)") if 'listen_port' in device: port = device['listen_port'] if port in listen_ports: errors.append(f"设备 {device_id} 的监听端口 {port} 与其他设备冲突") listen_ports.append(port) # 检查指令定义 if not commands: warnings.append("没有定义任何指令") # 检查映射 for cmd, op in mappings.items(): if op not in commands and not any(op == f"open{i}" or op == f"close{i}" for i in range(1, 9)): warnings.append(f"映射 '{cmd}' -> '{op}' 的目标指令未在 commands 中定义") # 检查模板文件是否存在 protocols = set() for device in devices: if device.get('enabled', True): protocols.add(device.get('protocol', 'tcp').lower()) for protocol in protocols: for template_type in ['listener', 'crond', 'sender']: template_file = get_template_path(protocol, template_type) if not os.path.exists(template_file): warnings.append(f"模板文件不存在: {template_file}") # 输出检查结果 if errors: print("\n[!] 配置错误:") for e in errors: print(f" - {e}") if warnings: print("\n[*] 配置警告:") for w in warnings: print(f" - {w}") if not errors and not warnings: print("\n[✓] 配置检查通过") return True return len(errors) == 0 def main(): parser = argparse.ArgumentParser(description='UPC Resent 代码生成器 (支持 TCP/UDP)') parser.add_argument('--check', action='store_true', help='仅检查配置,不生成文件') args = parser.parse_args() print("=" * 50) print("UPC Resent 代码生成器 (支持 TCP/UDP)") print("=" * 50) # 加载配置 try: config = load_config() print("\n[✓] 配置文件加载成功") except Exception as e: print(f"\n[!] 加载配置文件失败: {e}") sys.exit(1) # 检查配置 if not check_config(config): print("\n[!] 配置检查失败,请修正后重试") sys.exit(1) if args.check: print("\n[✓] 仅检查模式,不生成文件") sys.exit(0) # 提取配置 global_config = config['global'] devices = [d for d in config['devices'] if d.get('enabled', True)] # 确保目录存在 ensure_dirs(global_config['base_dir']) print(f"[✓] 目录检查完成: {global_config['base_dir']}") print("\n" + "-" * 50) print("开始生成文件...") print("-" * 50) # 生成文件 generated_files = [] # 1. 为所有使用到的协议生成 sender.py print("\n[生成发送模块]") generated_files.extend(generate_all_senders(config, global_config)) # 2. 为每个设备生成监听服务和保活脚本 print("\n[生成设备服务]") for device in devices: print(f"\n处理设备: {device['name']} ({device['id']}) - {device.get('protocol', 'tcp').upper()}") generated_files.append(generate_listener(device, global_config, config['mappings'])) generated_files.append(generate_crond(device, global_config)) # 3. 生成 crontab print("\n[生成配置文件]") generated_files.append(generate_crontab(config['devices'], global_config)) # 4. 生成启动脚本 generated_files.append(generate_start_script(devices, global_config)) print("\n" + "=" * 50) print(f"[✓] 代码生成完成!共生成 {len(generated_files)} 个文件") print("=" * 50) print(f"\n部署目录: {global_config['base_dir']}") print("\n使用方法:") print(f" 1. 启动服务: ./{generated_files[-1]}") print(f" 2. 配置 crontab: crontab crontab.txt") print("\n支持的协议:") protocols = set(d.get('protocol', 'tcp').upper() for d in devices) for p in protocols: print(f" - {p}") if __name__ == '__main__': main()