411 lines
13 KiB
Python
411 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: UTF-8 -*-
|
||
"""
|
||
代码生成器:根据配置文件自动生成 Python 执行文件
|
||
支持 TCP 和 UDP 协议
|
||
|
||
用法: python scripts/generate.py [--check]
|
||
--check: 仅检查配置,不生成文件
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import argparse
|
||
|
||
|
||
def load_config(config_file='config/devices.json'):
|
||
"""加载配置文件"""
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
|
||
|
||
def ensure_dirs(base_dir):
|
||
"""确保必要的目录存在"""
|
||
dirs = [
|
||
os.path.join(base_dir, 'bin'),
|
||
os.path.join(base_dir, 'crond'),
|
||
os.path.join(base_dir, 'log')
|
||
]
|
||
for d in dirs:
|
||
os.makedirs(d, exist_ok=True)
|
||
|
||
|
||
def dict_to_python_string(d):
|
||
"""将字典转换为 Python 代码字符串"""
|
||
lines = ['{']
|
||
for k, v in d.items():
|
||
if isinstance(v, str):
|
||
lines.append(f' "{k}": "{v}",')
|
||
else:
|
||
lines.append(f' "{k}": {v},')
|
||
lines.append('}')
|
||
return '\n'.join(lines)
|
||
|
||
|
||
def get_template_path(protocol, template_type):
|
||
"""
|
||
根据协议类型和模板类型获取模板文件路径
|
||
protocol: 'tcp' 或 'udp'
|
||
template_type: 'listener', 'crond', 'sender'
|
||
"""
|
||
protocol = protocol.lower()
|
||
if template_type == 'listener':
|
||
return f'templates/listener_{protocol}.py.tpl'
|
||
elif template_type == 'crond':
|
||
return f'templates/crond_{protocol}.py.tpl'
|
||
elif template_type == 'sender':
|
||
return f'templates/sender_{protocol}.py.tpl'
|
||
else:
|
||
raise ValueError(f"未知的模板类型: {template_type}")
|
||
|
||
|
||
def generate_listener(device, global_config, mappings):
|
||
"""生成监听服务文件"""
|
||
protocol = device.get('protocol', 'tcp').lower()
|
||
template_file = get_template_path(protocol, 'listener')
|
||
|
||
# 检查模板文件是否存在
|
||
if not os.path.exists(template_file):
|
||
print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备")
|
||
template_file = 'templates/listener_tcp.py.tpl'
|
||
|
||
with open(template_file, 'r', encoding='utf-8') as f:
|
||
template = f.read()
|
||
|
||
# 构建命令映射字典字符串
|
||
mappings_str = "{\n"
|
||
for cmd, op in mappings.items():
|
||
mappings_str += f' "{cmd}": "{op}",\n'
|
||
# 添加默认映射(open1-open8, close1-close8)
|
||
for i in range(1, 9):
|
||
mappings_str += f' "open{i}": "open{i}",\n'
|
||
mappings_str += f' "close{i}": "close{i}",\n'
|
||
mappings_str += " }"
|
||
|
||
# 替换模板变量
|
||
content = template.format(
|
||
device_id=device['id'],
|
||
device_name=device['name'],
|
||
tms_server_ip=global_config['tms_server_ip'],
|
||
listen_port=device['listen_port'],
|
||
upc_ip=device['upc_ip'],
|
||
upc_port=device['upc_port'],
|
||
base_dir=global_config['base_dir'],
|
||
python_path=global_config['python_path'],
|
||
command_mappings=mappings_str
|
||
)
|
||
|
||
output_file = os.path.join(global_config['base_dir'], 'bin', f"{device['id']}.py")
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
os.chmod(output_file, 0o755)
|
||
print(f"[生成] {output_file} ({protocol.upper()})")
|
||
return output_file
|
||
|
||
|
||
def generate_crond(device, global_config):
|
||
"""生成保活检查文件"""
|
||
protocol = device.get('protocol', 'tcp').lower()
|
||
template_file = get_template_path(protocol, 'crond')
|
||
|
||
# 检查模板文件是否存在
|
||
if not os.path.exists(template_file):
|
||
print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备")
|
||
template_file = 'templates/crond_tcp.py.tpl'
|
||
|
||
with open(template_file, 'r', encoding='utf-8') as f:
|
||
template = f.read()
|
||
|
||
content = template.format(
|
||
device_id=device['id'],
|
||
device_name=device['name'],
|
||
tms_server_ip=global_config['tms_server_ip'],
|
||
listen_port=device['listen_port'],
|
||
base_dir=global_config['base_dir'],
|
||
python_path=global_config['python_path']
|
||
)
|
||
|
||
output_file = os.path.join(global_config['base_dir'], 'crond', f"upcrond-{device['id']}.py")
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
os.chmod(output_file, 0o755)
|
||
print(f"[生成] {output_file} ({protocol.upper()})")
|
||
return output_file
|
||
|
||
|
||
def generate_sender(commands, global_config, protocol='tcp'):
|
||
"""生成发送模块文件"""
|
||
protocol = protocol.lower()
|
||
template_file = get_template_path(protocol, 'sender')
|
||
|
||
# 检查模板文件是否存在
|
||
if not os.path.exists(template_file):
|
||
print(f"[!] 警告: 模板文件不存在: {template_file},使用TCP模板作为后备")
|
||
template_file = 'templates/sender_tcp.py.tpl'
|
||
|
||
with open(template_file, 'r', encoding='utf-8') as f:
|
||
template = f.read()
|
||
|
||
# 构建指令定义字典字符串
|
||
cmd_defs = "{\n"
|
||
for cmd, hex_str in commands.items():
|
||
cmd_defs += f' "{cmd}": "{hex_str}",\n'
|
||
cmd_defs += "}"
|
||
|
||
content = template.format(
|
||
command_definitions=cmd_defs
|
||
)
|
||
|
||
# 根据协议生成不同的文件名
|
||
output_file = os.path.join(global_config['base_dir'], 'bin', f'sender_{protocol}.py')
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
os.chmod(output_file, 0o755)
|
||
print(f"[生成] {output_file} ({protocol.upper()})")
|
||
return output_file
|
||
|
||
|
||
def generate_all_senders(config, global_config):
|
||
"""为所有使用到的协议生成 sender 文件"""
|
||
commands = config['commands']
|
||
devices = config['devices']
|
||
|
||
# 收集所有使用的协议类型
|
||
protocols = set()
|
||
for device in devices:
|
||
if device.get('enabled', True):
|
||
protocol = device.get('protocol', 'tcp').lower()
|
||
protocols.add(protocol)
|
||
|
||
generated = []
|
||
for protocol in protocols:
|
||
generated.append(generate_sender(commands, global_config, protocol))
|
||
|
||
return generated
|
||
|
||
|
||
def generate_crontab(devices, global_config):
|
||
"""生成 crontab 配置"""
|
||
lines = []
|
||
|
||
for device in devices:
|
||
if not device.get('enabled', True):
|
||
continue
|
||
# 每分钟执行一次保活检查
|
||
lines.append(f"* */1 * * * {global_config['python_path']} {global_config['base_dir']}/crond/upcrond-{device['id']}.py")
|
||
|
||
# 添加日志清理任务
|
||
lines.append(f"0 0 * * * {global_config['base_dir']}/cutlog.sh")
|
||
|
||
content = '\n'.join(lines) + '\n'
|
||
output_file = 'crontab.txt'
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
print(f"[生成] {output_file}")
|
||
return output_file
|
||
|
||
|
||
def generate_control_script(devices, global_config):
|
||
"""生成控制脚本 (start/stop/restart/status)"""
|
||
|
||
template_file = 'templates/control.sh.tpl'
|
||
with open(template_file, 'r', encoding='utf-8') as f:
|
||
template = f.read()
|
||
|
||
device_list = []
|
||
for device in devices:
|
||
if device.get('enabled', True):
|
||
device_list.append({
|
||
'id': device['id'],
|
||
'name': device['name'],
|
||
'protocol': device.get('protocol', 'tcp').upper()
|
||
})
|
||
|
||
# 构建设备列表字符串
|
||
device_defs = "\n".join([
|
||
f" \"{d['id']}\" \"{d['name']}\" \"{d['protocol']}\""
|
||
for d in device_list
|
||
])
|
||
|
||
# 替换模板变量(使用 replace 避免与 bash 变量冲突)
|
||
content = template.replace('{base_dir}', global_config['base_dir'])
|
||
content = content.replace('{python_path}', global_config['python_path'])
|
||
content = content.replace('{device_defs}', device_defs)
|
||
|
||
output_file = 'control.sh'
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
os.chmod(output_file, 0o755)
|
||
print(f"[生成] {output_file}")
|
||
return output_file
|
||
|
||
|
||
def check_config(config):
|
||
"""检查配置是否合法"""
|
||
errors = []
|
||
warnings = []
|
||
|
||
global_cfg = config.get('global', {})
|
||
devices = config.get('devices', [])
|
||
commands = config.get('commands', {})
|
||
mappings = config.get('mappings', {})
|
||
|
||
# 检查全局配置
|
||
required_globals = ['tms_server_ip', 'python_path', 'base_dir']
|
||
for key in required_globals:
|
||
if key not in global_cfg:
|
||
errors.append(f"缺少全局配置: {key}")
|
||
|
||
# 检查设备配置
|
||
if not devices:
|
||
errors.append("没有配置任何设备")
|
||
|
||
device_ids = []
|
||
listen_ports = []
|
||
|
||
for device in devices:
|
||
device_id = device.get('id')
|
||
if not device_id:
|
||
errors.append("设备缺少 id 字段")
|
||
continue
|
||
|
||
if device_id in device_ids:
|
||
errors.append(f"设备 id 重复: {device_id}")
|
||
device_ids.append(device_id)
|
||
|
||
required_device_fields = ['name', 'upc_ip', 'upc_port', 'listen_port']
|
||
for field in required_device_fields:
|
||
if field not in device:
|
||
errors.append(f"设备 {device_id} 缺少字段: {field}")
|
||
|
||
# 检查协议类型
|
||
protocol = device.get('protocol', 'tcp').lower()
|
||
if protocol not in ['tcp', 'udp']:
|
||
errors.append(f"设备 {device_id} 的协议类型无效: {protocol} (必须是 tcp 或 udp)")
|
||
|
||
if 'listen_port' in device:
|
||
port = device['listen_port']
|
||
if port in listen_ports:
|
||
errors.append(f"设备 {device_id} 的监听端口 {port} 与其他设备冲突")
|
||
listen_ports.append(port)
|
||
|
||
# 检查指令定义
|
||
if not commands:
|
||
warnings.append("没有定义任何指令")
|
||
|
||
# 检查映射
|
||
for cmd, op in mappings.items():
|
||
if op not in commands and not any(op == f"open{i}" or op == f"close{i}" for i in range(1, 9)):
|
||
warnings.append(f"映射 '{cmd}' -> '{op}' 的目标指令未在 commands 中定义")
|
||
|
||
# 检查模板文件是否存在
|
||
protocols = set()
|
||
for device in devices:
|
||
if device.get('enabled', True):
|
||
protocols.add(device.get('protocol', 'tcp').lower())
|
||
|
||
for protocol in protocols:
|
||
for template_type in ['listener', 'crond', 'sender']:
|
||
template_file = get_template_path(protocol, template_type)
|
||
if not os.path.exists(template_file):
|
||
warnings.append(f"模板文件不存在: {template_file}")
|
||
|
||
# 输出检查结果
|
||
if errors:
|
||
print("\n[!] 配置错误:")
|
||
for e in errors:
|
||
print(f" - {e}")
|
||
|
||
if warnings:
|
||
print("\n[*] 配置警告:")
|
||
for w in warnings:
|
||
print(f" - {w}")
|
||
|
||
if not errors and not warnings:
|
||
print("\n[✓] 配置检查通过")
|
||
return True
|
||
|
||
return len(errors) == 0
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='UPC Resent 代码生成器 (支持 TCP/UDP)')
|
||
parser.add_argument('--check', action='store_true', help='仅检查配置,不生成文件')
|
||
args = parser.parse_args()
|
||
|
||
print("=" * 50)
|
||
print("UPC Resent 代码生成器 (支持 TCP/UDP)")
|
||
print("=" * 50)
|
||
|
||
# 加载配置
|
||
try:
|
||
config = load_config()
|
||
print("\n[✓] 配置文件加载成功")
|
||
except Exception as e:
|
||
print(f"\n[!] 加载配置文件失败: {e}")
|
||
sys.exit(1)
|
||
|
||
# 检查配置
|
||
if not check_config(config):
|
||
print("\n[!] 配置检查失败,请修正后重试")
|
||
sys.exit(1)
|
||
|
||
if args.check:
|
||
print("\n[✓] 仅检查模式,不生成文件")
|
||
sys.exit(0)
|
||
|
||
# 提取配置
|
||
global_config = config['global']
|
||
devices = [d for d in config['devices'] if d.get('enabled', True)]
|
||
|
||
# 确保目录存在
|
||
ensure_dirs(global_config['base_dir'])
|
||
print(f"[✓] 目录检查完成: {global_config['base_dir']}")
|
||
|
||
print("\n" + "-" * 50)
|
||
print("开始生成文件...")
|
||
print("-" * 50)
|
||
|
||
# 生成文件
|
||
generated_files = []
|
||
|
||
# 1. 为所有使用到的协议生成 sender.py
|
||
print("\n[生成发送模块]")
|
||
generated_files.extend(generate_all_senders(config, global_config))
|
||
|
||
# 2. 为每个设备生成监听服务和保活脚本
|
||
print("\n[生成设备服务]")
|
||
for device in devices:
|
||
print(f"\n处理设备: {device['name']} ({device['id']}) - {device.get('protocol', 'tcp').upper()}")
|
||
generated_files.append(generate_listener(device, global_config, config['mappings']))
|
||
generated_files.append(generate_crond(device, global_config))
|
||
|
||
# 3. 生成 crontab
|
||
print("\n[生成配置文件]")
|
||
generated_files.append(generate_crontab(config['devices'], global_config))
|
||
|
||
# 4. 生成控制脚本
|
||
generated_files.append(generate_control_script(devices, global_config))
|
||
|
||
print("\n" + "=" * 50)
|
||
print(f"[✓] 代码生成完成!共生成 {len(generated_files)} 个文件")
|
||
print("=" * 50)
|
||
print(f"\n部署目录: {global_config['base_dir']}")
|
||
print("\n使用方法:")
|
||
print(f" 1. 控制服务: ./{generated_files[-1]} {{start|stop|restart|status}}")
|
||
print(f" 2. 配置 crontab: crontab crontab.txt")
|
||
print("\n支持的协议:")
|
||
protocols = set(d.get('protocol', 'tcp').upper() for d in devices)
|
||
for p in protocols:
|
||
print(f" - {p}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|