XNSim/XNSimHtml/routes/run-simulation.js

590 lines
21 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const express = require('express');
const router = express.Router();
const { spawn, exec } = require('child_process');
const path = require('path');
const fs = require('fs');
const fsPromises = require('fs').promises;
const util = require('util');
const execPromise = util.promisify(exec);
const {
getRunningXNEngineProcess,
saveXNEngineProcess,
updateXNEngineProcessStatus,
deleteXNEngineProcess,
getLatestRunningXNEngineProcess
} = require('../utils/xnengine-process-utils');
const { getXNCorePath } = require('../utils/file-utils');
// 存储正在运行的仿真进程
const runningSimulations = new Map();
// 存储SSE连接
const sseClients = new Map();
// SSE中间件
function setupSSE(req, res) {
// 设置SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 发送初始注释以保持连接
res.write(':\n\n');
// 定期发送注释以保持连接
const keepAliveId = setInterval(() => {
res.write(':\n\n');
}, 15000);
// 当客户端断开连接时清理
req.on('close', () => {
clearInterval(keepAliveId);
// 如果存在simulationId从SSE客户端列表中移除
const simulationId = req.params.id;
if (simulationId && sseClients.has(simulationId)) {
sseClients.get(simulationId).delete(res);
if (sseClients.get(simulationId).size === 0) {
sseClients.delete(simulationId);
}
}
});
return { res, keepAliveId };
}
// 向SSE客户端发送消息
function sendSSEMessage(simulationId, event, data) {
if (!sseClients.has(simulationId)) return;
const clients = sseClients.get(simulationId);
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
for (const client of clients) {
client.write(message);
}
}
// 检查进程是否在运行
async function isProcessRunning(pid) {
try {
process.kill(pid, 0);
return true;
} catch (error) {
return false;
}
}
// 检查进程是否为XNEngine
async function isXNEngineProcess(pid) {
try {
const { stdout } = await execPromise(`ps -p ${pid} -o comm=`);
return stdout.trim() === 'XNEngine';
} catch (error) {
return false;
}
}
// 获取实时输出的SSE接口
router.get('/simulation-output/:id', async (req, res) => {
const simulationId = req.params.id;
// 验证仿真ID
if (!simulationId) {
return res.status(400).json({ error: '缺少仿真ID' });
}
// 初始化SSE连接
const { res: sseRes } = setupSSE(req, res);
// 将该连接添加到SSE客户端列表
if (!sseClients.has(simulationId)) {
sseClients.set(simulationId, new Set());
}
sseClients.get(simulationId).add(sseRes);
try {
// 从数据库获取进程信息
const processInfo = await getRunningXNEngineProcess(simulationId);
if (processInfo) {
// 检查进程是否真的在运行且是XNEngine进程
const isRunning = await isProcessRunning(processInfo.pid);
const isXNEngine = await isXNEngineProcess(processInfo.pid);
if (isRunning && isXNEngine) {
// 发送运行状态
sendSSEMessage(simulationId, 'status', {
running: true,
pid: processInfo.pid,
startTime: processInfo.start_time,
message: '已连接到运行中的XNEngine进程'
});
// 将进程添加到runningSimulations
const simulation = runningSimulations.get(simulationId) || {
pid: processInfo.pid,
startTime: new Date(processInfo.start_time).getTime(),
output: '',
errorOutput: '',
logFile: processInfo.log_file
};
runningSimulations.set(simulationId, simulation);
// 使用tail命令来跟踪日志文件
const tailProcess = spawn('tail', ['-f', processInfo.log_file], {
stdio: ['ignore', 'pipe', 'pipe']
});
// 保存tail进程引用
simulation.tailProcess = tailProcess;
// 收集标准输出
tailProcess.stdout.on('data', (data) => {
const chunk = data.toString('utf8');
simulation.output += chunk;
// 推送到SSE客户端
sendSSEMessage(simulationId, 'output', {
type: 'stdout',
data: chunk
});
});
// 当客户端断开连接时清理
req.on('close', () => {
if (simulation.tailProcess) {
simulation.tailProcess.kill();
simulation.tailProcess = null;
}
});
// 当tail进程结束时
tailProcess.on('close', (code) => {
if (simulation.tailProcess === tailProcess) {
simulation.tailProcess = null;
}
runningSimulations.delete(simulationId);
sendSSEMessage(simulationId, 'status', {
running: false,
message: '仿真进程已结束'
});
});
return;
} else {
// 进程已结束或不是XNEngine进程删除数据库记录
await deleteXNEngineProcess(simulationId);
}
}
// 如果没有找到运行中的进程,发送仿真不存在或已结束的消息
sendSSEMessage(simulationId, 'status', {
running: false,
message: '仿真不存在或已结束'
});
} catch (error) {
console.error('处理SSE连接失败:', error);
sendSSEMessage(simulationId, 'status', {
running: false,
message: '连接失败: ' + error.message
});
}
});
// 修改run-simulation路由
router.post('/run-simulation', async (req, res) => {
try {
const { args, id } = req.body;
const simulationId = id || Date.now().toString();
if (!args || !Array.isArray(args)) {
return res.status(400).json({
error: '缺少必要参数',
message: '缺少args参数或格式不正确'
});
}
// 检查是否有XNEngine进程在运行
try {
const { stdout } = await execPromise('ps -ef | grep XNEngine | grep -v grep || true');
if (stdout.trim()) {
const processes = stdout.trim().split('\n');
// 按启动时间排序,最早的进程通常是主进程
const sortedProcesses = processes.map(line => {
const parts = line.trim().split(/\s+/);
return {
pid: parts[1],
ppid: parts[2],
startTime: parts[4],
cmd: parts.slice(7).join(' ')
};
}).sort((a, b) => a.startTime.localeCompare(b.startTime));
// 找到主进程
const mainProcess = sortedProcesses[0];
// 检查主进程是否真的在运行且是XNEngine进程
const isRunning = await isProcessRunning(mainProcess.pid);
const isXNEngine = await isXNEngineProcess(mainProcess.pid);
if (isRunning && isXNEngine) {
// 检查数据库中是否已有该进程记录
const existingProcess = await getRunningXNEngineProcess(mainProcess.pid);
if (!existingProcess) {
// 创建日志文件
const logDir = path.join(process.cwd(), 'logs');
await fsPromises.mkdir(logDir, { recursive: true });
const logFile = path.join(logDir, `xnengine_${mainProcess.pid}.log`);
// 从命令行参数中提取配置文件路径
const scenarioFile = args[1];
// 将进程信息写入数据库
await saveXNEngineProcess({
pid: mainProcess.pid,
log_file: logFile,
start_time: new Date().toISOString(),
cmd: mainProcess.cmd,
scenario_file: scenarioFile
});
}
// 返回成功响应
res.json({
success: true,
message: '已连接到运行中的仿真',
simulationId: mainProcess.pid.toString(),
isExisting: true,
startTime: mainProcess.startTime,
totalProcesses: sortedProcesses.length,
scenarioFile: (existingProcess && existingProcess.scenario_file) || args[1]
});
return;
} else {
// 进程已结束或不是XNEngine进程删除数据库记录
await deleteXNEngineProcess(mainProcess.pid);
}
}
} catch (error) {
console.error('检查XNEngine进程失败:', error);
}
// 如果没有找到运行中的进程,则启动新的仿真
// 获取XNCore路径
const xnCorePath = getXNCorePath();
if (!xnCorePath) {
return res.status(500).json({
error: 'XNCore未设置',
message: '无法找到XNEngine可执行程序'
});
}
// 构建XNEngine路径
const enginePath = path.join(xnCorePath, 'XNEngine');
// 检查引擎程序是否存在
try {
await fsPromises.access(enginePath);
} catch (error) {
return res.status(404).json({
error: 'XNEngine不存在',
message: `${enginePath} 不存在或无法访问`
});
}
// 创建日志文件
const logDir = path.join(process.cwd(), 'logs');
await fsPromises.mkdir(logDir, { recursive: true });
const logFile = path.join(logDir, `xnengine_${simulationId}.log`);
// 使用nohup启动进程将输出重定向到日志文件
const cmd = `nohup ${enginePath} ${args.join(' ')} > ${logFile} 2>&1 & echo $!`;
const { stdout: pid } = await execPromise(cmd);
const processId = parseInt(pid.trim());
// 等待一小段时间确保进程启动
await new Promise(resolve => setTimeout(resolve, 1000));
// 检查进程是否成功启动且是XNEngine进程
const isRunning = await isProcessRunning(processId);
const isXNEngine = await isXNEngineProcess(processId);
// 检查是否是测试模式
const isTestMode = args.includes('-test');
if ((isRunning && isXNEngine) || isTestMode) {
// 将进程信息写入数据库
await saveXNEngineProcess({
pid: processId,
log_file: logFile,
start_time: new Date().toISOString(),
cmd: `${enginePath} ${args.join(' ')}`,
scenario_file: args[1]
});
// 保存到运行中的仿真Map但不启动tail进程
runningSimulations.set(simulationId, {
pid: processId,
startTime: Date.now(),
output: '',
errorOutput: '',
logFile: logFile
});
// 立即响应返回仿真ID
res.json({
success: true,
message: isTestMode ? '测试已启动' : '仿真已启动',
simulationId: processId.toString(),
scenarioFile: args[1],
isTestMode: isTestMode,
logFile: logFile
});
} else {
// 进程启动失败或不是XNEngine进程
await deleteXNEngineProcess(processId);
res.status(500).json({
error: '启动失败',
message: '进程启动后立即退出或不是XNEngine进程'
});
}
} catch (error) {
res.status(500).json({
error: '运行仿真失败',
message: error.message
});
}
});
// 修改stop-simulation路由
router.post('/stop-simulation', async (req, res) => {
try {
const { id } = req.body;
if (!id) {
return res.status(400).json({
error: '缺少必要参数',
message: '缺少仿真ID'
});
}
// 从数据库获取进程信息
const processInfo = await getRunningXNEngineProcess(id);
if (processInfo) {
// 检查进程是否在运行且是XNEngine进程
const isRunning = await isProcessRunning(processInfo.pid);
const isXNEngine = await isXNEngineProcess(processInfo.pid);
if (isRunning && isXNEngine) {
// 终止进程
try {
process.kill(processInfo.pid, 'SIGTERM');
// 等待进程终止
await new Promise(resolve => setTimeout(resolve, 1000));
// 检查进程是否真的终止了
const stillRunning = await isProcessRunning(processInfo.pid);
if (stillRunning) {
// 如果还在运行,强制终止
process.kill(processInfo.pid, 'SIGKILL');
}
// 删除数据库中的进程记录
await deleteXNEngineProcess(id);
// 推送终止事件到SSE客户端
sendSSEMessage(id, 'terminated', {
message: '仿真已终止'
});
// 从运行中的仿真Map移除
runningSimulations.delete(id);
res.json({
success: true,
message: '仿真已终止'
});
} catch (error) {
console.error('终止进程失败:', error);
res.status(500).json({
error: '终止仿真失败',
message: error.message
});
}
} else {
// 进程已经不在运行或不是XNEngine进程删除数据库记录
await deleteXNEngineProcess(id);
res.json({
success: true,
message: '仿真进程已经停止'
});
}
} else {
res.json({
success: true,
message: '没有找到运行中的仿真进程'
});
}
} catch (error) {
res.status(500).json({
error: '终止仿真失败',
message: error.message
});
}
});
// 修改check-xnengine路由
router.get('/check-xnengine', async (req, res) => {
try {
// 从数据库获取最新的运行中进程
const processInfo = await getLatestRunningXNEngineProcess();
if (processInfo) {
// 检查进程是否真的在运行且是XNEngine进程
const isRunning = await isProcessRunning(processInfo.pid);
const isXNEngine = await isXNEngineProcess(processInfo.pid);
if (isRunning && isXNEngine) {
res.json({
running: true,
pid: processInfo.pid,
startTime: processInfo.start_time,
cmd: processInfo.cmd,
message: 'XNEngine进程正在运行'
});
} else {
// 进程已结束或不是XNEngine进程删除数据库记录
await deleteXNEngineProcess(processInfo.pid);
res.json({
running: false,
message: 'XNEngine进程已停止'
});
}
} else {
res.json({
running: false,
message: '未找到运行中的XNEngine进程'
});
}
} catch (error) {
console.error('检查XNEngine进程失败:', error);
res.status(500).json({
error: '检查XNEngine进程失败',
message: error.message
});
}
});
// 添加清理仿真资源的接口
router.post('/cleanup-simulation/:id', (req, res) => {
const simulationId = req.params.id;
// 清理该仿真ID对应的所有资源
if (runningSimulations.has(simulationId)) {
// 清理tail进程
const simulation = runningSimulations.get(simulationId);
if (simulation.tailProcess) {
try {
simulation.tailProcess.kill();
} catch (error) {
console.error('清理tail进程失败:', error);
}
}
// 清理其他资源
runningSimulations.delete(simulationId);
}
res.json({ success: true, message: '仿真资源已清理' });
});
// 添加读取日志文件的路由
router.get('/read-log-file', async (req, res) => {
try {
const { file, position } = req.query;
if (!file) {
return res.status(400).json({ error: '缺少日志文件路径' });
}
const startPosition = parseInt(position) || 0;
// 读取文件
const stats = await fsPromises.stat(file);
if (stats.size < startPosition) {
// 文件被截断,从头开始读取
startPosition = 0;
}
// 使用二进制模式读取以保留ANSI颜色代码
const stream = fs.createReadStream(file, {
start: startPosition,
encoding: null // 使用二进制模式
});
let content = '';
for await (const chunk of stream) {
// 将二进制数据转换为字符串保留ANSI颜色代码
content += chunk.toString('utf8');
}
res.json({ content });
} catch (error) {
console.error('读取日志文件失败:', error);
res.status(500).json({
error: '读取日志文件失败',
message: error.message
});
}
});
// 添加检查进程状态的路由
router.get('/check-process/:pid', async (req, res) => {
try {
const pid = req.params.pid;
if (!pid) {
return res.status(400).json({ error: '缺少进程ID' });
}
// 检查进程是否在运行
const isRunning = await isProcessRunning(pid);
const isXNEngine = await isXNEngineProcess(pid);
if (isRunning && isXNEngine) {
res.json({ running: true });
} else {
// 进程已结束,检查日志文件是否有错误
const logFile = path.join(process.cwd(), 'logs', `xnengine_${pid}.log`);
try {
const content = await fsPromises.readFile(logFile, 'utf8');
const hasError = content.includes('Error:') || content.includes('error:');
res.json({
running: false,
success: !hasError,
message: hasError ? '测试执行失败' : '测试执行成功'
});
} catch (error) {
res.json({
running: false,
success: false,
message: '无法读取日志文件'
});
}
}
} catch (error) {
console.error('检查进程状态失败:', error);
res.status(500).json({
error: '检查进程状态失败',
message: error.message
});
}
});
module.exports = router;