#!/usr/bin/env python3 # -*- coding: UTF-8 -*- """ 代码生成器:根据配置文件自动生成 Python 执行文件 支持 TCP 和 UDP 协议 用法: python scripts/generate.py [--check] --check: 仅检查配置,不生成文件 """ import json import os import sys import argparse def load_config(config_file='config/devices.json'): """加载配置文件""" with open(config_file, 'r', encoding='utf-8') as f: return json.load(f) def ensure_dirs(base_dir): """确保必要的目录存在""" dirs = [ os.path.join(base_dir, 'bin'), os.path.join(base_dir, 'crond'), os.path.join(base_dir, 'log') ] for d in dirs: os.makedirs(d, exist_ok=True) def dict_to_python_string(d): """将字典转换为 Python 代码字符串""" lines = ['{'] for k, v in d.items(): if isinstance(v, str): lines.append(f' "{k}": "{v}",') else: lines.append(f' "{k}": {v},') lines.append('}') return '\n'.join(lines) def get_template_path(protocol, template_type): """ 根据协议类型和模板类型获取模板文件路径 protocol: 'tcp' 或 'udp' template_type: 'listener', 'crond', 'sender' """ protocol = protocol.lower() if template_type == 'listener': return f'templates/listener_{protocol}.py.tpl' elif template_type == 'crond': return f'templates/crond_{protocol}.py.tpl' elif template_type == 'sender': return f'templates/sender_{protocol}.py.tpl' else: raise ValueError(f"未知的模板类型: {template_type}") def generate_listener(device, global_config, mappings, command_sets, global_commands): """生成监听服务文件""" listen_protocol = device.get('listen_protocol', 'tcp').lower() device_protocol = device.get('device_protocol', 'tcp').lower() template_file = get_template_path(listen_protocol, 'listener') # 检查模板文件是否存在 if not os.path.exists(template_file): print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备") template_file = 'templates/listener_tcp.py.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() # 获取该设备的指令集 command_set_id = device.get('command_set') if command_set_id and command_set_id in command_sets: device_commands = command_sets[command_set_id].get('commands', {}) else: # 兼容旧配置 device_commands = global_commands # 构建命令映射字典字符串 # 1. 首先添加该设备指令集中所有指令的默认映射(指令名映射到自身) device_mappings = {} for cmd in device_commands.keys(): device_mappings[cmd] = cmd # 2. 然后添加全局 mappings 中的映射(外部命令 -> 内部指令) # 但只添加目标指令在该设备指令集中存在的映射 for external_cmd, internal_cmd in mappings.items(): if internal_cmd in device_commands: device_mappings[external_cmd] = internal_cmd # 3. 生成映射表字符串 mappings_str = "{\n" for cmd, op in device_mappings.items(): mappings_str += f' "{cmd}": "{op}",\n' mappings_str += " }" # 确定使用哪个 sender 脚本(每个设备独立的 sender) sender_file = f"{global_config['base_dir']}/bin/sender_{device['id']}.py" # 获取 keep_alive 配置(仅TCP有效,默认False) keep_alive = device.get('keep_alive', False) # 替换模板变量 content = template.format( device_id=device['id'], device_name=device['name'], tms_server_ip=global_config['tms_server_ip'], listen_port=device['listen_port'], upc_ip=device['upc_ip'], upc_port=device['upc_port'], base_dir=global_config['base_dir'], python_path=global_config['python_path'], command_mappings=mappings_str, sender_file=sender_file, device_protocol=device_protocol.upper(), keep_alive='True' if keep_alive else 'False' ) output_file = os.path.join(global_config['base_dir'], 'bin', f"{device['id']}.py") with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file} (监听:{listen_protocol.upper()}, 设备:{device_protocol.upper()})") return output_file def generate_crond(device, global_config): """生成保活检查文件""" listen_protocol = device.get('listen_protocol', 'tcp').lower() device_protocol = device.get('device_protocol', 'tcp').lower() template_file = get_template_path(listen_protocol, 'crond') # 检查模板文件是否存在 if not os.path.exists(template_file): print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备") template_file = 'templates/crond_tcp.py.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() content = template.format( device_id=device['id'], device_name=device['name'], tms_server_ip=global_config['tms_server_ip'], listen_port=device['listen_port'], base_dir=global_config['base_dir'], python_path=global_config['python_path'], device_protocol=device_protocol.upper() ) output_file = os.path.join(global_config['base_dir'], 'crond', f"upcrond-{device['id']}.py") with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file} (监听:{listen_protocol.upper()}, 设备:{device_protocol.upper()})") return output_file def generate_sender_for_device(device, command_set, global_config): """为指定设备生成独立的发送模块文件""" device_id = device['id'] device_protocol = device.get('device_protocol', device.get('protocol', 'tcp')).lower() template_file = get_template_path(device_protocol, 'sender') # 检查模板文件是否存在 if not os.path.exists(template_file): print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备") template_file = 'templates/sender_tcp.py.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() # 构建指令定义字典字符串 commands = command_set.get('commands', {}) cmd_defs = "{\n" for cmd, hex_str in commands.items(): cmd_defs += f' "{cmd}": "{hex_str}",\n' cmd_defs += "}" content = template.format( command_definitions=cmd_defs ) # 为每个设备生成独立的 sender 文件 output_file = os.path.join(global_config['base_dir'], 'bin', f'sender_{device_id}.py') with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) print(f"[生成] {output_file} ({device_protocol.upper()}, 指令集: {command_set.get('name', '未命名')})") return output_file def generate_all_senders(config, global_config): """为所有设备生成独立的 sender 文件""" command_sets = config.get('command_sets', {}) devices = config['devices'] generated = [] for device in devices: if not device.get('enabled', True): continue # 获取设备使用的指令集 command_set_id = device.get('command_set', 'default') if command_set_id not in command_sets: # 兼容旧配置:如果找不到指令集,使用全局 commands if 'commands' in config: command_set = {'name': '全局指令', 'commands': config['commands']} else: print(f"[!] 警告: 设备 {device['id']} 指定的指令集 '{command_set_id}' 不存在") continue else: command_set = command_sets[command_set_id] generated.append(generate_sender_for_device(device, command_set, global_config)) return generated def generate_crontab(devices, global_config): """生成 crontab 配置""" lines = [] for device in devices: if not device.get('enabled', True): continue # 每分钟执行一次保活检查 lines.append(f"* */1 * * * {global_config['python_path']} {global_config['base_dir']}/crond/upcrond-{device['id']}.py") # 添加日志清理任务 lines.append(f"0 0 * * * {global_config['base_dir']}/cutlog.sh") content = '\n'.join(lines) + '\n' output_file = 'crontab.txt' with open(output_file, 'w', encoding='utf-8') as f: f.write(content) # 同时复制到 base_dir 目录 base_dir = global_config['base_dir'] target_file = os.path.join(base_dir, 'crontab.txt') with open(target_file, 'w', encoding='utf-8') as f: f.write(content) print(f"[生成] {output_file} -> {target_file}") return output_file def generate_control_script(devices, global_config): """生成控制脚本 (start/stop/restart/status)""" template_file = 'templates/control.sh.tpl' with open(template_file, 'r', encoding='utf-8') as f: template = f.read() device_list = [] for device in devices: if device.get('enabled', True): listen_protocol = device.get('listen_protocol', device.get('protocol', 'tcp')).upper() device_protocol = device.get('device_protocol', device.get('protocol', 'tcp')).upper() protocol_str = f"{listen_protocol}->{device_protocol}" device_list.append({ 'id': device['id'], 'name': device['name'], 'protocol': protocol_str }) # 构建设备列表字符串: ID 名称 协议 device_defs = "\n".join([ f" \"{d['id']}\" \"{d['name']}\" \"{d['protocol']}\"" for d in device_list ]) # 替换模板变量(使用 replace 避免与 bash 变量冲突) content = template.replace('{base_dir}', global_config['base_dir']) content = content.replace('{python_path}', global_config['python_path']) content = content.replace('{device_defs}', device_defs) # 生成到项目根目录 output_file = 'control.sh' with open(output_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(output_file, 0o755) # 同时复制到 base_dir 目录 base_dir = global_config['base_dir'] target_file = os.path.join(base_dir, 'control.sh') with open(target_file, 'w', encoding='utf-8') as f: f.write(content) os.chmod(target_file, 0o755) print(f"[生成] {output_file} -> {target_file}") return output_file def check_config(config): """检查配置是否合法""" errors = [] warnings = [] global_cfg = config.get('global', {}) devices = config.get('devices', []) command_sets = config.get('command_sets', {}) commands = config.get('commands', {}) # 兼容旧配置 mappings = config.get('mappings', {}) # 检查全局配置 required_globals = ['tms_server_ip', 'python_path', 'base_dir'] for key in required_globals: if key not in global_cfg: errors.append(f"缺少全局配置: {key}") # 检查设备配置 if not devices: errors.append("没有配置任何设备") device_ids = [] listen_ports = [] # 收集所有指令(用于验证映射) all_commands = set() if command_sets: for set_id, cmd_set in command_sets.items(): if 'commands' in cmd_set: all_commands.update(cmd_set['commands'].keys()) if commands: all_commands.update(commands.keys()) for device in devices: device_id = device.get('id') if not device_id: errors.append("设备缺少 id 字段") continue if device_id in device_ids: errors.append(f"设备 id 重复: {device_id}") device_ids.append(device_id) required_device_fields = ['name', 'upc_ip', 'upc_port', 'listen_port'] for field in required_device_fields: if field not in device: errors.append(f"设备 {device_id} 缺少字段: {field}") # 检查协议类型(支持新旧配置兼容) listen_protocol = device.get('listen_protocol', device.get('protocol', 'tcp')).lower() device_protocol = device.get('device_protocol', device.get('protocol', 'tcp')).lower() if listen_protocol not in ['tcp', 'udp']: errors.append(f"设备 {device_id} 的监听协议类型无效: {listen_protocol} (必须是 tcp 或 udp)") if device_protocol not in ['tcp', 'udp']: errors.append(f"设备 {device_id} 的设备协议类型无效: {device_protocol} (必须是 tcp 或 udp)") # 检查指令集 if 'command_set' in device: command_set_id = device['command_set'] if command_set_id not in command_sets: errors.append(f"设备 {device_id} 指定的指令集 '{command_set_id}' 不存在") elif not commands and not command_sets: warnings.append(f"设备 {device_id} 未指定指令集,且没有全局指令") if 'listen_port' in device: port = device['listen_port'] if port in listen_ports: errors.append(f"设备 {device_id} 的监听端口 {port} 与其他设备冲突") listen_ports.append(port) # 检查指令集 if not command_sets and not commands: warnings.append("没有定义任何指令集或指令") # 检查映射 for cmd, op in mappings.items(): if op not in all_commands and not any(op == f"open{i}" or op == f"close{i}" for i in range(1, 9)): warnings.append(f"映射 '{cmd}' -> '{op}' 的目标指令未在任何指令集中定义") # 检查模板文件是否存在 listen_protocols = set() device_protocols = set() for device in devices: if device.get('enabled', True): listen_protocols.add(device.get('listen_protocol', device.get('protocol', 'tcp')).lower()) device_protocols.add(device.get('device_protocol', device.get('protocol', 'tcp')).lower()) # 检查监听协议模板 for protocol in listen_protocols: for template_type in ['listener', 'crond']: template_file = get_template_path(protocol, template_type) if not os.path.exists(template_file): warnings.append(f"模板文件不存在: {template_file}") # 检查设备协议模板(sender) for protocol in device_protocols: template_file = get_template_path(protocol, 'sender') if not os.path.exists(template_file): warnings.append(f"模板文件不存在: {template_file}") # 输出检查结果 if errors: print("\n[!] 配置错误:") for e in errors: print(f" - {e}") if warnings: print("\n[*] 配置警告:") for w in warnings: print(f" - {w}") if not errors and not warnings: print("\n[✓] 配置检查通过") return True return len(errors) == 0 def main(): parser = argparse.ArgumentParser(description='UPC Resent 代码生成器 (支持 TCP/UDP)') parser.add_argument('--check', action='store_true', help='仅检查配置,不生成文件') args = parser.parse_args() print("=" * 50) print("UPC Resent 代码生成器 (支持 TCP/UDP)") print("=" * 50) # 加载配置 try: config = load_config() print("\n[✓] 配置文件加载成功") except Exception as e: print(f"\n[!] 加载配置文件失败: {e}") sys.exit(1) # 检查配置 if not check_config(config): print("\n[!] 配置检查失败,请修正后重试") sys.exit(1) if args.check: print("\n[✓] 仅检查模式,不生成文件") sys.exit(0) # 提取配置 global_config = config['global'] devices = [d for d in config['devices'] if d.get('enabled', True)] # 确保目录存在 ensure_dirs(global_config['base_dir']) print(f"[✓] 目录检查完成: {global_config['base_dir']}") print("\n" + "-" * 50) print("开始生成文件...") print("-" * 50) # 生成文件 generated_files = [] # 1. 为所有使用到的协议生成 sender.py print("\n[生成发送模块]") generated_files.extend(generate_all_senders(config, global_config)) # 2. 为每个设备生成监听服务和保活脚本 print("\n[生成设备服务]") command_sets = config.get('command_sets', {}) global_commands = config.get('commands', {}) for device in devices: print(f"\n处理设备: {device['name']} ({device['id']}) - {device.get('protocol', 'tcp').upper()}") generated_files.append(generate_listener(device, global_config, config['mappings'], command_sets, global_commands)) generated_files.append(generate_crond(device, global_config)) # 3. 生成 crontab print("\n[生成配置文件]") generated_files.append(generate_crontab(config['devices'], global_config)) # 4. 生成控制脚本 generated_files.append(generate_control_script(devices, global_config)) print("\n" + "=" * 50) print(f"[✓] 代码生成完成!共生成 {len(generated_files)} 个文件") print("=" * 50) print(f"\n部署目录: {global_config['base_dir']}") print("\n使用方法:") print(f" 1. 控制服务: ./{generated_files[-1]} {{start|stop|restart|status}}") print(f" 2. 配置 crontab: crontab crontab.txt") print("\n支持的协议:") protocols = set(d.get('protocol', 'tcp').upper() for d in devices) for p in protocols: print(f" - {p}") if __name__ == '__main__': main()