Files
UPC-Resent/scripts/generate.py
2026-03-23 15:57:58 +08:00

482 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
代码生成器:根据配置文件自动生成 Python 执行文件
支持 TCP 和 UDP 协议
用法: python scripts/generate.py [--check]
--check: 仅检查配置,不生成文件
"""
import json
import os
import sys
import argparse
def load_config(config_file='config/devices.json'):
    """Read the JSON configuration file and return the parsed content.

    Args:
        config_file: Path of the JSON file; it is opened as UTF-8 text.

    Returns:
        The object produced by parsing the file content as JSON.
    """
    with open(config_file, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def ensure_dirs(base_dir):
    """Create the ``bin``, ``crond`` and ``log`` directories under *base_dir*.

    Directories that already exist are left untouched.

    Args:
        base_dir: Directory under which the three sub-directories are created.
    """
    for subdir in ('bin', 'crond', 'log'):
        os.makedirs(os.path.join(base_dir, subdir), exist_ok=True)
def dict_to_python_string(d):
    """Render a dictionary as the source text of a Python dict literal.

    Keys are always emitted as double-quoted strings. String values are
    emitted as double-quoted strings with quotes, backslashes and control
    characters escaped, so the generated text stays valid Python. Any other
    value is emitted using its default string formatting.

    Args:
        d: Mapping to render.

    Returns:
        A multi-line string: ``{``, one ``"key": value,`` line per item, ``}``.
    """
    lines = ['{']
    for k, v in d.items():
        # json.dumps yields a double-quoted, escaped literal; ensure_ascii=False
        # keeps non-ASCII text readable, so plain strings render as before.
        key_text = json.dumps(str(k), ensure_ascii=False)
        if isinstance(v, str):
            value_text = json.dumps(v, ensure_ascii=False)
        else:
            value_text = f'{v}'
        lines.append(f' {key_text}: {value_text},')
    lines.append('}')
    return '\n'.join(lines)
def get_template_path(protocol, template_type):
    """Return the relative path of the template for a protocol and kind.

    Args:
        protocol: Protocol name such as 'tcp' or 'udp'; it is lower-cased
            before being placed in the file name.
        template_type: One of 'listener', 'crond' or 'sender'.

    Returns:
        A path of the form ``templates/<template_type>_<protocol>.py.tpl``.

    Raises:
        ValueError: If *template_type* is not one of the three known kinds.
    """
    proto = protocol.lower()
    if template_type not in ('listener', 'crond', 'sender'):
        raise ValueError(f"未知的模板类型: {template_type}")
    return f'templates/{template_type}_{proto}.py.tpl'
def generate_listener(device, global_config, mappings, command_sets, global_commands):
    """Render the listener service script for one device.

    The listener template for the device's listening protocol is filled in
    and written to ``<base_dir>/bin/<device id>.py`` with mode 0o755. If the
    protocol-specific template file does not exist, the TCP listener template
    is used instead and a warning is printed.

    Args:
        device: Device entry from the configuration. Reads ``id``, ``name``,
            ``listen_port``, ``upc_ip``, ``upc_port`` and the optional
            ``listen_protocol``, ``device_protocol``, ``protocol`` and
            ``command_set`` keys.
        global_config: Global settings. Reads ``base_dir``, ``python_path``
            and ``tms_server_ip``.
        mappings: External command name -> internal command name.
        command_sets: Command-set id -> command-set definition.
        global_commands: Commands used when the device names no known
            command set (legacy configuration layout).

    Returns:
        The path of the generated file.
    """
    # Honour the legacy single ``protocol`` key when the split keys are
    # absent, as check_config and the sender generator already do.
    legacy_protocol = device.get('protocol', 'tcp')
    listen_protocol = device.get('listen_protocol', legacy_protocol).lower()
    device_protocol = device.get('device_protocol', legacy_protocol).lower()

    template_file = get_template_path(listen_protocol, 'listener')
    if not os.path.exists(template_file):
        print(f"[!] 警告: 模板文件不存在: {template_file}使用TCP模板作为后备")
        template_file = 'templates/listener_tcp.py.tpl'
    with open(template_file, 'r', encoding='utf-8') as f:
        template = f.read()

    # Pick the commands this device understands.
    command_set_id = device.get('command_set')
    if command_set_id and command_set_id in command_sets:
        device_commands = command_sets[command_set_id].get('commands', {})
    else:
        # Legacy configuration: fall back to the global command table.
        device_commands = global_commands

    # 1. Every command of the device maps to itself by default.
    device_mappings = {cmd: cmd for cmd in device_commands}
    # 2. Add global external -> internal mappings, but only those whose
    #    target command exists for this device.
    for external_cmd, internal_cmd in mappings.items():
        if internal_cmd in device_commands:
            device_mappings[external_cmd] = internal_cmd

    # 3. Render the mapping table as dict-literal text for the template.
    mapping_lines = ''.join(
        f' "{cmd}": "{op}",\n' for cmd, op in device_mappings.items()
    )
    mappings_str = "{\n" + mapping_lines + " }"

    # Each device has its own sender script.
    sender_file = f"{global_config['base_dir']}/bin/sender_{device['id']}.py"

    content = template.format(
        device_id=device['id'],
        device_name=device['name'],
        tms_server_ip=global_config['tms_server_ip'],
        listen_port=device['listen_port'],
        upc_ip=device['upc_ip'],
        upc_port=device['upc_port'],
        base_dir=global_config['base_dir'],
        python_path=global_config['python_path'],
        command_mappings=mappings_str,
        sender_file=sender_file,
        device_protocol=device_protocol.upper(),
    )

    output_file = os.path.join(global_config['base_dir'], 'bin', f"{device['id']}.py")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(content)
    os.chmod(output_file, 0o755)
    print(f"[生成] {output_file} (监听:{listen_protocol.upper()}, 设备:{device_protocol.upper()})")
    return output_file
def generate_crond(device, global_config):
    """Render the keep-alive check script for one device.

    The crond template for the device's listening protocol is filled in and
    written to ``<base_dir>/crond/upcrond-<device id>.py`` with mode 0o755.
    If the protocol-specific template file does not exist, the TCP crond
    template is used instead and a warning is printed.

    Args:
        device: Device entry from the configuration. Reads ``id``, ``name``,
            ``listen_port`` and the optional ``listen_protocol``,
            ``device_protocol`` and ``protocol`` keys.
        global_config: Global settings. Reads ``base_dir``, ``python_path``
            and ``tms_server_ip``.

    Returns:
        The path of the generated file.
    """
    # Honour the legacy single ``protocol`` key when the split keys are
    # absent, as check_config and the sender generator already do.
    legacy_protocol = device.get('protocol', 'tcp')
    listen_protocol = device.get('listen_protocol', legacy_protocol).lower()
    device_protocol = device.get('device_protocol', legacy_protocol).lower()

    template_file = get_template_path(listen_protocol, 'crond')
    if not os.path.exists(template_file):
        print(f"[!] 警告: 模板文件不存在: {template_file}使用TCP模板作为后备")
        template_file = 'templates/crond_tcp.py.tpl'
    with open(template_file, 'r', encoding='utf-8') as f:
        template = f.read()

    content = template.format(
        device_id=device['id'],
        device_name=device['name'],
        tms_server_ip=global_config['tms_server_ip'],
        listen_port=device['listen_port'],
        base_dir=global_config['base_dir'],
        python_path=global_config['python_path'],
        device_protocol=device_protocol.upper(),
    )

    output_file = os.path.join(global_config['base_dir'], 'crond', f"upcrond-{device['id']}.py")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(content)
    os.chmod(output_file, 0o755)
    print(f"[生成] {output_file} (监听:{listen_protocol.upper()}, 设备:{device_protocol.upper()})")
    return output_file
def generate_sender_for_device(device, command_set, global_config):
    """Write the sender module belonging to a single device.

    The sender template for the device-side protocol is filled with the
    command definitions of *command_set* and written to
    ``<base_dir>/bin/sender_<device id>.py`` with mode 0o755. If the
    protocol-specific template file does not exist, the TCP sender template
    is used instead and a warning is printed.

    Args:
        device: Device entry. Reads ``id`` and the optional
            ``device_protocol`` / ``protocol`` keys.
        command_set: Command-set definition. Reads the optional ``commands``
            and ``name`` keys.
        global_config: Global settings. Reads ``base_dir``.

    Returns:
        The path of the generated file.
    """
    dev_id = device['id']
    proto = device.get('device_protocol', device.get('protocol', 'tcp')).lower()

    tpl_path = get_template_path(proto, 'sender')
    if not os.path.exists(tpl_path):
        print(f"[!] 警告: 模板文件不存在: {tpl_path}使用TCP模板作为后备")
        tpl_path = 'templates/sender_tcp.py.tpl'
    with open(tpl_path, 'r', encoding='utf-8') as tpl:
        template_text = tpl.read()

    # Render the command table as dict-literal text for the template.
    definition_lines = [
        f' "{name}": "{payload}",\n'
        for name, payload in command_set.get('commands', {}).items()
    ]
    rendered = template_text.format(
        command_definitions="{\n" + "".join(definition_lines) + "}"
    )

    # One sender file per device.
    target = os.path.join(global_config['base_dir'], 'bin', f'sender_{dev_id}.py')
    with open(target, 'w', encoding='utf-8') as out:
        out.write(rendered)
    os.chmod(target, 0o755)
    print(f"[生成] {target} ({proto.upper()}, 指令集: {command_set.get('name', '未命名')})")
    return target
def generate_all_senders(config, global_config):
    """Generate one sender file for every enabled device.

    A device uses the command set named by its ``command_set`` key
    ('default' when the key is absent). If that set is not defined, the
    top-level ``commands`` table is used when present; otherwise a warning
    is printed and the device is skipped.

    Args:
        config: Full configuration. Reads ``devices`` and the optional
            ``command_sets`` and ``commands`` keys.
        global_config: Global settings passed on to the per-device generator.

    Returns:
        List of the generated file paths, in device order.
    """
    known_sets = config.get('command_sets', {})
    outputs = []
    for dev in config['devices']:
        if not dev.get('enabled', True):
            continue

        set_id = dev.get('command_set', 'default')
        if set_id in known_sets:
            chosen = known_sets[set_id]
        elif 'commands' in config:
            # Legacy layout: a single global command table.
            chosen = {'name': '全局指令', 'commands': config['commands']}
        else:
            print(f"[!] 警告: 设备 {dev['id']} 指定的指令集 '{set_id}' 不存在")
            continue

        outputs.append(generate_sender_for_device(dev, chosen, global_config))
    return outputs
def generate_crontab(devices, global_config):
    """Write ``crontab.txt`` in the current working directory.

    The file holds one keep-alive entry per enabled device followed by one
    log clean-up entry that runs ``<base_dir>/cutlog.sh``.

    Args:
        devices: Device entries; entries with ``enabled`` set to a false
            value are skipped.
        global_config: Global settings. Reads ``python_path`` and ``base_dir``.

    Returns:
        The name of the written file, ``'crontab.txt'``.
    """
    entries = [
        f"* */1 * * * {global_config['python_path']} {global_config['base_dir']}/crond/upcrond-{dev['id']}.py"
        for dev in devices
        if dev.get('enabled', True)
    ]
    # Log clean-up job goes last.
    entries.append(f"0 0 * * * {global_config['base_dir']}/cutlog.sh")

    output_file = 'crontab.txt'
    with open(output_file, 'w', encoding='utf-8') as out:
        out.write('\n'.join(entries) + '\n')
    print(f"[生成] {output_file}")
    return output_file
def generate_control_script(devices, global_config):
    """Write ``control.sh`` (start/stop/restart/status) in the working directory.

    The text of ``templates/control.sh.tpl`` is read and its ``{base_dir}``,
    ``{python_path}`` and ``{device_defs}`` markers are substituted. The
    result is written to ``control.sh`` with mode 0o755.

    Args:
        devices: Device entries; entries with ``enabled`` set to a false
            value are skipped. Reads ``id``, ``name`` and the optional
            ``listen_protocol``, ``device_protocol`` and ``protocol`` keys.
        global_config: Global settings. Reads ``base_dir`` and ``python_path``.

    Returns:
        The name of the written file, ``'control.sh'``.
    """
    with open('templates/control.sh.tpl', 'r', encoding='utf-8') as tpl:
        script = tpl.read()

    # One line per enabled device: id, name, "LISTEN->DEVICE" protocol pair.
    rows = []
    for dev in devices:
        if not dev.get('enabled', True):
            continue
        listen = dev.get('listen_protocol', dev.get('protocol', 'tcp')).upper()
        target = dev.get('device_protocol', dev.get('protocol', 'tcp')).upper()
        dev_id = dev['id']
        dev_name = dev['name']
        rows.append(f' "{dev_id}" "{dev_name}" "{listen}->{target}"')
    device_defs = "\n".join(rows)

    # Plain text replacement instead of str.format, so the shell's own
    # ${...} syntax in the template is left alone.
    substitutions = (
        ('{base_dir}', global_config['base_dir']),
        ('{python_path}', global_config['python_path']),
        ('{device_defs}', device_defs),
    )
    for marker, value in substitutions:
        script = script.replace(marker, value)

    output_file = 'control.sh'
    with open(output_file, 'w', encoding='utf-8') as out:
        out.write(script)
    os.chmod(output_file, 0o755)
    print(f"[生成] {output_file}")
    return output_file
def check_config(config):
    """Validate the loaded configuration and print the findings.

    Checks, in order: required global keys; per-device id uniqueness,
    required fields, protocol names, command-set reference and listen-port
    uniqueness; presence of any command definitions; mapping targets; and
    existence of the template files needed by the enabled devices.

    Args:
        config: Parsed configuration. Reads the optional ``global``,
            ``devices``, ``command_sets``, ``commands`` and ``mappings`` keys.

    Returns:
        True when no errors were found (warnings alone do not fail the
        check), False otherwise.
    """
    problems = []
    notices = []

    global_cfg = config.get('global', {})
    devices = config.get('devices', [])
    command_sets = config.get('command_sets', {})
    legacy_commands = config.get('commands', {})  # legacy configuration layout
    mappings = config.get('mappings', {})

    # --- global settings ---
    problems.extend(
        f"缺少全局配置: {key}"
        for key in ('tms_server_ip', 'python_path', 'base_dir')
        if key not in global_cfg
    )

    # --- devices ---
    if not devices:
        problems.append("没有配置任何设备")

    # Every command name defined anywhere; used to validate the mappings.
    known_commands = set()
    if command_sets:
        for cmd_set in command_sets.values():
            if 'commands' in cmd_set:
                known_commands.update(cmd_set['commands'].keys())
    if legacy_commands:
        known_commands.update(legacy_commands.keys())

    seen_ids = []
    seen_ports = []
    allowed_protocols = ('tcp', 'udp')
    for device in devices:
        device_id = device.get('id')
        if not device_id:
            problems.append("设备缺少 id 字段")
            continue
        if device_id in seen_ids:
            problems.append(f"设备 id 重复: {device_id}")
        seen_ids.append(device_id)

        for field in ('name', 'upc_ip', 'upc_port', 'listen_port'):
            if field not in device:
                problems.append(f"设备 {device_id} 缺少字段: {field}")

        # Protocol names; the legacy single ``protocol`` key is the fallback.
        listen_protocol = device.get('listen_protocol', device.get('protocol', 'tcp')).lower()
        device_protocol = device.get('device_protocol', device.get('protocol', 'tcp')).lower()
        if listen_protocol not in allowed_protocols:
            problems.append(f"设备 {device_id} 的监听协议类型无效: {listen_protocol} (必须是 tcp 或 udp)")
        if device_protocol not in allowed_protocols:
            problems.append(f"设备 {device_id} 的设备协议类型无效: {device_protocol} (必须是 tcp 或 udp)")

        # Command-set reference.
        if 'command_set' not in device:
            if not legacy_commands and not command_sets:
                notices.append(f"设备 {device_id} 未指定指令集,且没有全局指令")
        else:
            command_set_id = device['command_set']
            if command_set_id not in command_sets:
                problems.append(f"设备 {device_id} 指定的指令集 '{command_set_id}' 不存在")

        if 'listen_port' in device:
            port = device['listen_port']
            if port in seen_ports:
                problems.append(f"设备 {device_id} 的监听端口 {port} 与其他设备冲突")
            seen_ports.append(port)

    # --- command definitions ---
    if not (command_sets or legacy_commands):
        notices.append("没有定义任何指令集或指令")

    # --- mappings ---
    # open1..open8 / close1..close8 are accepted even when not defined.
    implicit_targets = {f"{verb}{n}" for verb in ('open', 'close') for n in range(1, 9)}
    for cmd, op in mappings.items():
        if op not in known_commands and op not in implicit_targets:
            notices.append(f"映射 '{cmd}' -> '{op}' 的目标指令未在任何指令集中定义")

    # --- template files needed by the enabled devices ---
    listen_protocols = set()
    device_protocols = set()
    for device in devices:
        if device.get('enabled', True):
            listen_protocols.add(device.get('listen_protocol', device.get('protocol', 'tcp')).lower())
            device_protocols.add(device.get('device_protocol', device.get('protocol', 'tcp')).lower())

    needed_templates = [
        get_template_path(protocol, template_type)
        for protocol in listen_protocols
        for template_type in ('listener', 'crond')
    ]
    needed_templates += [get_template_path(protocol, 'sender') for protocol in device_protocols]
    for template_file in needed_templates:
        if not os.path.exists(template_file):
            notices.append(f"模板文件不存在: {template_file}")

    # --- report ---
    for heading, findings in (("\n[!] 配置错误:", problems), ("\n[*] 配置警告:", notices)):
        if findings:
            print(heading)
            for finding in findings:
                print(f" - {finding}")

    if problems or notices:
        return len(problems) == 0
    print("\n[✓] 配置检查通过")
    return True
def main():
    """Command-line entry point: load, validate and generate.

    Loads the configuration, validates it with check_config, and, unless
    ``--check`` was given, generates the per-device sender, listener and
    keep-alive scripts plus ``crontab.txt`` and ``control.sh``.

    Exits with status 1 when the configuration cannot be loaded or fails
    validation, and with status 0 after a successful ``--check`` run.
    """
    parser = argparse.ArgumentParser(description='UPC Resent 代码生成器 (支持 TCP/UDP)')
    parser.add_argument('--check', action='store_true', help='仅检查配置,不生成文件')
    args = parser.parse_args()

    print("=" * 50)
    print("UPC Resent 代码生成器 (支持 TCP/UDP)")
    print("=" * 50)

    # Load the configuration. This is the top-level boundary, so any
    # failure (missing file, invalid JSON, ...) is reported and ends the run.
    try:
        config = load_config()
    except Exception as e:
        print(f"\n[!] 加载配置文件失败: {e}")
        sys.exit(1)
    print("\n[✓] 配置文件加载成功")

    # Validate the configuration.
    if not check_config(config):
        print("\n[!] 配置检查失败,请修正后重试")
        sys.exit(1)
    if args.check:
        print("\n[✓] 仅检查模式,不生成文件")
        sys.exit(0)

    global_config = config['global']
    devices = [d for d in config['devices'] if d.get('enabled', True)]

    ensure_dirs(global_config['base_dir'])
    print(f"[✓] 目录检查完成: {global_config['base_dir']}")
    print("\n" + "-" * 50)
    print("开始生成文件...")
    print("-" * 50)

    generated_files = []

    # 1. One sender module per enabled device.
    print("\n[生成发送模块]")
    generated_files.extend(generate_all_senders(config, global_config))

    # 2. Listener service and keep-alive script for every enabled device.
    print("\n[生成设备服务]")
    command_sets = config.get('command_sets', {})
    global_commands = config.get('commands', {})
    # check_config treats ``mappings`` as optional, so it may be absent here.
    mappings = config.get('mappings', {})
    used_protocols = set()
    for device in devices:
        # Resolve the protocols as the generators do: split keys first,
        # then the legacy single ``protocol`` key, then TCP.
        legacy_protocol = device.get('protocol', 'tcp')
        listen_protocol = device.get('listen_protocol', legacy_protocol).upper()
        device_protocol = device.get('device_protocol', legacy_protocol).upper()
        used_protocols.update((listen_protocol, device_protocol))
        print(f"\n处理设备: {device['name']} ({device['id']}) - {listen_protocol}->{device_protocol}")
        generated_files.append(
            generate_listener(device, global_config, mappings, command_sets, global_commands)
        )
        generated_files.append(generate_crond(device, global_config))

    # 3. crontab entries.
    print("\n[生成配置文件]")
    generated_files.append(generate_crontab(devices, global_config))

    # 4. Control script; kept last because the usage hint below prints
    #    generated_files[-1].
    generated_files.append(generate_control_script(devices, global_config))

    print("\n" + "=" * 50)
    print(f"[✓] 代码生成完成!共生成 {len(generated_files)} 个文件")
    print("=" * 50)
    print(f"\n部署目录: {global_config['base_dir']}")
    print("\n使用方法:")
    print(f" 1. 控制服务: ./{generated_files[-1]} {{start|stop|restart|status}}")
    print(" 2. 配置 crontab: crontab crontab.txt")
    print("\n支持的协议:")
    for p in sorted(used_protocols):
        print(f" - {p}")
# Run the generator only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()