const express = require('express'); const router = express.Router(); const { spawn, exec } = require('child_process'); const path = require('path'); const fs = require('fs'); const fsPromises = require('fs').promises; const util = require('util'); const execPromise = util.promisify(exec); const { getRunningXNEngineProcess, saveXNEngineProcess, updateXNEngineProcessStatus, deleteXNEngineProcess, getLatestRunningXNEngineProcess } = require('../utils/xnengine-process-utils'); const { getXNCorePath } = require('../utils/file-utils'); // 存储正在运行的仿真进程 const runningSimulations = new Map(); // 存储SSE连接 const sseClients = new Map(); // SSE中间件 function setupSSE(req, res) { // 设置SSE headers res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' }); // 发送初始注释以保持连接 res.write(':\n\n'); // 定期发送注释以保持连接 const keepAliveId = setInterval(() => { res.write(':\n\n'); }, 15000); // 当客户端断开连接时清理 req.on('close', () => { clearInterval(keepAliveId); // 如果存在simulationId,从SSE客户端列表中移除 const simulationId = req.params.id; if (simulationId && sseClients.has(simulationId)) { sseClients.get(simulationId).delete(res); if (sseClients.get(simulationId).size === 0) { sseClients.delete(simulationId); } } }); return { res, keepAliveId }; } // 向SSE客户端发送消息 function sendSSEMessage(simulationId, event, data) { if (!sseClients.has(simulationId)) return; const clients = sseClients.get(simulationId); const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; for (const client of clients) { client.write(message); } } // 检查进程是否在运行 async function isProcessRunning(pid) { try { process.kill(pid, 0); return true; } catch (error) { return false; } } // 检查进程是否为XNEngine async function isXNEngineProcess(pid) { try { const { stdout } = await execPromise(`ps -p ${pid} -o comm=`); return stdout.trim() === 'XNEngine'; } catch (error) { return false; } } // 获取实时输出的SSE接口 router.get('/simulation-output/:id', async (req, res) => { const simulationId = req.params.id; // 验证仿真ID if (!simulationId) { return res.status(400).json({ error: '缺少仿真ID' }); } // 初始化SSE连接 const { res: sseRes } = setupSSE(req, res); // 将该连接添加到SSE客户端列表 if (!sseClients.has(simulationId)) { sseClients.set(simulationId, new Set()); } sseClients.get(simulationId).add(sseRes); try { // 从数据库获取进程信息 const processInfo = await getRunningXNEngineProcess(simulationId); if (processInfo) { // 检查进程是否真的在运行且是XNEngine进程 const isRunning = await isProcessRunning(processInfo.pid); const isXNEngine = await isXNEngineProcess(processInfo.pid); if (isRunning && isXNEngine) { // 发送运行状态 sendSSEMessage(simulationId, 'status', { running: true, pid: processInfo.pid, startTime: processInfo.start_time, message: '已连接到运行中的XNEngine进程' }); // 将进程添加到runningSimulations const simulation = runningSimulations.get(simulationId) || { pid: processInfo.pid, startTime: new Date(processInfo.start_time).getTime(), output: '', errorOutput: '', logFile: processInfo.log_file }; runningSimulations.set(simulationId, simulation); // 使用tail命令来跟踪日志文件 const tailProcess = spawn('tail', ['-f', processInfo.log_file], { stdio: ['ignore', 'pipe', 'pipe'] }); // 保存tail进程引用 simulation.tailProcess = tailProcess; // 收集标准输出 tailProcess.stdout.on('data', (data) => { const chunk = data.toString('utf8'); simulation.output += chunk; // 推送到SSE客户端 sendSSEMessage(simulationId, 'output', { type: 'stdout', data: chunk }); }); // 当客户端断开连接时清理 req.on('close', () => { if (simulation.tailProcess) { simulation.tailProcess.kill(); simulation.tailProcess = null; } }); // 当tail进程结束时 tailProcess.on('close', (code) => { if (simulation.tailProcess === tailProcess) { simulation.tailProcess = null; } runningSimulations.delete(simulationId); sendSSEMessage(simulationId, 'status', { running: false, message: '仿真进程已结束' }); }); return; } else { // 进程已结束或不是XNEngine进程,删除数据库记录 await deleteXNEngineProcess(simulationId); } } // 如果没有找到运行中的进程,发送仿真不存在或已结束的消息 sendSSEMessage(simulationId, 'status', { running: false, message: '仿真不存在或已结束' }); } catch (error) { console.error('处理SSE连接失败:', error); sendSSEMessage(simulationId, 'status', { running: false, message: '连接失败: ' + error.message }); } }); // 修改run-simulation路由 router.post('/run-simulation', async (req, res) => { try { const { args, id } = req.body; const simulationId = id || Date.now().toString(); if (!args || !Array.isArray(args)) { return res.status(400).json({ error: '缺少必要参数', message: '缺少args参数或格式不正确' }); } // 检查是否有XNEngine进程在运行 try { const { stdout } = await execPromise('ps -ef | grep XNEngine | grep -v grep || true'); if (stdout.trim()) { const processes = stdout.trim().split('\n'); // 按启动时间排序,最早的进程通常是主进程 const sortedProcesses = processes.map(line => { const parts = line.trim().split(/\s+/); return { pid: parts[1], ppid: parts[2], startTime: parts[4], cmd: parts.slice(7).join(' ') }; }).sort((a, b) => a.startTime.localeCompare(b.startTime)); // 找到主进程 const mainProcess = sortedProcesses[0]; // 检查主进程是否真的在运行且是XNEngine进程 const isRunning = await isProcessRunning(mainProcess.pid); const isXNEngine = await isXNEngineProcess(mainProcess.pid); if (isRunning && isXNEngine) { // 检查数据库中是否已有该进程记录 const existingProcess = await getRunningXNEngineProcess(mainProcess.pid); if (!existingProcess) { // 创建日志文件 const logDir = path.join(process.cwd(), 'logs'); await fsPromises.mkdir(logDir, { recursive: true }); const logFile = path.join(logDir, `xnengine_${mainProcess.pid}.log`); // 从命令行参数中提取配置文件路径 const scenarioFile = args[1]; // 将进程信息写入数据库 await saveXNEngineProcess({ pid: mainProcess.pid, log_file: logFile, start_time: new Date().toISOString(), cmd: mainProcess.cmd, scenario_file: scenarioFile }); } // 返回成功响应 res.json({ success: true, message: '已连接到运行中的仿真', simulationId: mainProcess.pid.toString(), isExisting: true, startTime: mainProcess.startTime, totalProcesses: sortedProcesses.length, scenarioFile: (existingProcess && existingProcess.scenario_file) || args[1] }); return; } else { // 进程已结束或不是XNEngine进程,删除数据库记录 await deleteXNEngineProcess(mainProcess.pid); } } } catch (error) { console.error('检查XNEngine进程失败:', error); } // 如果没有找到运行中的进程,则启动新的仿真 // 获取XNCore路径 const xnCorePath = getXNCorePath(); if (!xnCorePath) { return res.status(500).json({ error: 'XNCore未设置', message: '无法找到XNEngine可执行程序' }); } // 构建XNEngine路径 const enginePath = path.join(xnCorePath, 'XNEngine'); // 检查引擎程序是否存在 try { await fsPromises.access(enginePath); } catch (error) { return res.status(404).json({ error: 'XNEngine不存在', message: `${enginePath} 不存在或无法访问` }); } // 创建日志文件 const logDir = path.join(process.cwd(), 'logs'); await fsPromises.mkdir(logDir, { recursive: true }); const logFile = path.join(logDir, `xnengine_${simulationId}.log`); // 使用nohup启动进程,将输出重定向到日志文件 const cmd = `nohup ${enginePath} ${args.join(' ')} > ${logFile} 2>&1 & echo $!`; const { stdout: pid } = await execPromise(cmd); const processId = parseInt(pid.trim()); // 等待一小段时间确保进程启动 await new Promise(resolve => setTimeout(resolve, 1000)); // 检查进程是否成功启动且是XNEngine进程 const isRunning = await isProcessRunning(processId); const isXNEngine = await isXNEngineProcess(processId); // 检查是否是测试模式 const isTestMode = args.includes('-test'); if ((isRunning && isXNEngine) || isTestMode) { // 将进程信息写入数据库 await saveXNEngineProcess({ pid: processId, log_file: logFile, start_time: new Date().toISOString(), cmd: `${enginePath} ${args.join(' ')}`, scenario_file: args[1] }); // 保存到运行中的仿真Map,但不启动tail进程 runningSimulations.set(simulationId, { pid: processId, startTime: Date.now(), output: '', errorOutput: '', logFile: logFile }); // 立即响应,返回仿真ID res.json({ success: true, message: isTestMode ? '测试已启动' : '仿真已启动', simulationId: processId.toString(), scenarioFile: args[1], isTestMode: isTestMode, logFile: logFile }); } else { // 进程启动失败或不是XNEngine进程 await deleteXNEngineProcess(processId); res.status(500).json({ error: '启动失败', message: '进程启动后立即退出或不是XNEngine进程' }); } } catch (error) { res.status(500).json({ error: '运行仿真失败', message: error.message }); } }); // 修改stop-simulation路由 router.post('/stop-simulation', async (req, res) => { try { const { id } = req.body; if (!id) { return res.status(400).json({ error: '缺少必要参数', message: '缺少仿真ID' }); } // 从数据库获取进程信息 const processInfo = await getRunningXNEngineProcess(id); if (processInfo) { // 检查进程是否在运行且是XNEngine进程 const isRunning = await isProcessRunning(processInfo.pid); const isXNEngine = await isXNEngineProcess(processInfo.pid); if (isRunning && isXNEngine) { // 终止进程 try { process.kill(processInfo.pid, 'SIGTERM'); // 等待进程终止 await new Promise(resolve => setTimeout(resolve, 1000)); // 检查进程是否真的终止了 const stillRunning = await isProcessRunning(processInfo.pid); if (stillRunning) { // 如果还在运行,强制终止 process.kill(processInfo.pid, 'SIGKILL'); } // 删除数据库中的进程记录 await deleteXNEngineProcess(id); // 推送终止事件到SSE客户端 sendSSEMessage(id, 'terminated', { message: '仿真已终止' }); // 从运行中的仿真Map移除 runningSimulations.delete(id); res.json({ success: true, message: '仿真已终止' }); } catch (error) { console.error('终止进程失败:', error); res.status(500).json({ error: '终止仿真失败', message: error.message }); } } else { // 进程已经不在运行或不是XNEngine进程,删除数据库记录 await deleteXNEngineProcess(id); res.json({ success: true, message: '仿真进程已经停止' }); } } else { res.json({ success: true, message: '没有找到运行中的仿真进程' }); } } catch (error) { res.status(500).json({ error: '终止仿真失败', message: error.message }); } }); // 修改check-xnengine路由 router.get('/check-xnengine', async (req, res) => { try { // 从数据库获取最新的运行中进程 const processInfo = await getLatestRunningXNEngineProcess(); if (processInfo) { // 检查进程是否真的在运行且是XNEngine进程 const isRunning = await isProcessRunning(processInfo.pid); const isXNEngine = await isXNEngineProcess(processInfo.pid); if (isRunning && isXNEngine) { res.json({ running: true, pid: processInfo.pid, startTime: processInfo.start_time, cmd: processInfo.cmd, message: 'XNEngine进程正在运行' }); } else { // 进程已结束或不是XNEngine进程,删除数据库记录 await deleteXNEngineProcess(processInfo.pid); res.json({ running: false, message: 'XNEngine进程已停止' }); } } else { res.json({ running: false, message: '未找到运行中的XNEngine进程' }); } } catch (error) { console.error('检查XNEngine进程失败:', error); res.status(500).json({ error: '检查XNEngine进程失败', message: error.message }); } }); // 添加清理仿真资源的接口 router.post('/cleanup-simulation/:id', (req, res) => { const simulationId = req.params.id; // 清理该仿真ID对应的所有资源 if (runningSimulations.has(simulationId)) { // 清理tail进程 const simulation = runningSimulations.get(simulationId); if (simulation.tailProcess) { try { simulation.tailProcess.kill(); } catch (error) { console.error('清理tail进程失败:', error); } } // 清理其他资源 runningSimulations.delete(simulationId); } res.json({ success: true, message: '仿真资源已清理' }); }); // 添加读取日志文件的路由 router.get('/read-log-file', async (req, res) => { try { const { file, position } = req.query; if (!file) { return res.status(400).json({ error: '缺少日志文件路径' }); } const startPosition = parseInt(position) || 0; // 读取文件 const stats = await fsPromises.stat(file); if (stats.size < startPosition) { // 文件被截断,从头开始读取 startPosition = 0; } // 使用二进制模式读取,以保留ANSI颜色代码 const stream = fs.createReadStream(file, { start: startPosition, encoding: null // 使用二进制模式 }); let content = ''; for await (const chunk of stream) { // 将二进制数据转换为字符串,保留ANSI颜色代码 content += chunk.toString('utf8'); } res.json({ content }); } catch (error) { console.error('读取日志文件失败:', error); res.status(500).json({ error: '读取日志文件失败', message: error.message }); } }); // 添加检查进程状态的路由 router.get('/check-process/:pid', async (req, res) => { try { const pid = req.params.pid; if (!pid) { return res.status(400).json({ error: '缺少进程ID' }); } // 检查进程是否在运行 const isRunning = await isProcessRunning(pid); const isXNEngine = await isXNEngineProcess(pid); if (isRunning && isXNEngine) { res.json({ running: true }); } else { // 进程已结束,检查日志文件是否有错误 const logFile = path.join(process.cwd(), 'logs', `xnengine_${pid}.log`); try { const content = await fsPromises.readFile(logFile, 'utf8'); const hasError = content.includes('Error:') || content.includes('error:'); res.json({ running: false, success: !hasError, message: hasError ? '测试执行失败' : '测试执行成功' }); } catch (error) { res.json({ running: false, success: false, message: '无法读取日志文件' }); } } } catch (error) { console.error('检查进程状态失败:', error); res.status(500).json({ error: '检查进程状态失败', message: error.message }); } }); module.exports = router;