XNSim/XNMonitorServer/DataCollect.cpp

326 lines
9.0 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "DataCollect.h"

#include <system_error>

#include "DataMonitorFactory.h"
/// Constructs an idle collector: the running flag is cleared and the
/// timing/duration/frequency members are zeroed.
DataCollect::DataCollect()
    : m_running(false),
      m_nextExecuteTime(0),
      m_collectDuration(0),
      m_collectFrequency(0)
{
}
/// Shuts the collector down through stop() before the members are destroyed.
DataCollect::~DataCollect()
{
    this->stop();
}
/**
 * @brief Prepares a collection run from a dcs script.
 *
 * Parses the script for the collect list, the "for <duration> at <frequency>"
 * line and the "put/extend/all result <name>" line, obtains a data monitor for
 * every requested structure, and opens "<name>.csv" next to the script with a
 * header row already written.
 *
 * @param collectDataInfos Structures and interface names to collect; stored and
 *                         used to resolve the fields named in the collect list.
 * @param dcsFilePath      Path of the dcs script. The CSV is created in the
 *                         same directory.
 * @return true when the output file is open and the header has been written;
 *         false if the script cannot be opened, a monitor cannot be obtained,
 *         an existing output file cannot be moved aside, or the output file
 *         cannot be opened.
 */
bool DataCollect::Initialize(std::vector<MonitorDataInfo> collectDataInfos, std::string dcsFilePath)
{
    m_collectDataInfos = collectDataInfos;

    // Open and read the dcs file.
    std::ifstream dcsFile(dcsFilePath);
    if (!dcsFile.is_open()) {
        return false;
    }

    std::string line;
    bool foundCollectList = false;
    std::string collectListContent;

    // Locate the "define collect_list" section and gather the lines that follow it.
    while (std::getline(dcsFile, line)) {
        // Skip empty lines and '!' comment lines.
        if (line.empty() || line[0] == '!') {
            continue;
        }
        if (line.find("define collect_list") != std::string::npos) {
            foundCollectList = true;
            continue;
        }
        if (foundCollectList) {
            // A line containing a double quote ends the list; that line itself
            // is not added to the collected content.
            if (line.find('"') != std::string::npos) {
                break;
            }
            collectListContent += line;
        }
    }

    // Keep reading: look for the duration/frequency line and the output file name.
    while (std::getline(dcsFile, line)) {
        if (line.empty() || line[0] == '!') {
            continue;
        }

        // "for X at Y": parse into temporaries and commit only when the whole
        // pattern matches. A substring test alone also fires on unrelated lines
        // (e.g. a file name containing "for" and "at"), and a failed numeric
        // extraction would then overwrite the duration that was already parsed.
        if (line.find("for") != std::string::npos && line.find("at") != std::string::npos) {
            std::stringstream ss(line);
            std::string forToken;
            std::string atToken;
            decltype(m_collectDuration) duration{};
            decltype(m_collectFrequency) frequency{};
            if ((ss >> forToken >> duration >> atToken >> frequency) && forToken == "for"
                && atToken == "at") {
                m_collectDuration = duration;
                m_collectFrequency = frequency;
            }
        }

        // Output file name: third token of the "put/extend/all result <name>" line.
        if (line.find("put/extend/all result") != std::string::npos) {
            std::stringstream ss(line);
            std::string token;
            ss >> token;                // "put/extend/all"
            ss >> token;                // "result"
            ss >> m_outputFileName;     // file name
            m_outputFileName += ".csv"; // append the .csv suffix
        }
    }

    // Split the collect list on '-' and register every non-empty, trimmed field.
    std::stringstream ss(collectListContent);
    std::string field;
    while (std::getline(ss, field, '-')) {
        field.erase(0, field.find_first_not_of(" \t\r\n"));
        field.erase(field.find_last_not_of(" \t\r\n") + 1);
        if (!field.empty()) {
            parseHeaderField(field);
        }
    }

    dcsFile.close();

    // Obtain one monitor per structure. Monitors that this call had to
    // initialise are tracked separately from those that were already running.
    for (const auto &collectDataInfo : m_collectDataInfos) {
        auto dataMonitor = DataMonitorFactory::GetInstance(collectDataInfo.structName);
        if (dataMonitor == nullptr) {
            return false;
        }
        if (dataMonitor->isInitialized()) {
            m_alreadyStartedMonitors[collectDataInfo.structName] = dataMonitor;
        } else {
            dataMonitor->Initialize(nullptr, 0, 0);
            m_notStartedMonitors[collectDataInfo.structName] = dataMonitor;
        }
    }

    // Place the output file in the directory of the dcs file. Joining with
    // path::operator/ keeps a script path without a directory component
    // relative to the working directory; plain string concatenation with "/"
    // would turn it into "/<name>", i.e. a file in the filesystem root.
    const std::filesystem::path dcsDir = std::filesystem::path(dcsFilePath).parent_path();
    const std::filesystem::path outputPath = dcsDir / m_outputFileName;
    m_outputFileName = outputPath.string();

    // If a previous result exists, move it aside to "<stem>_<n><ext>" using the
    // first free n. The error_code overloads are used so a filesystem problem
    // is reported through the return value instead of an exception.
    std::error_code ec;
    if (std::filesystem::exists(outputPath, ec)) {
        const std::string stem = outputPath.stem().string();
        const std::string extension = outputPath.extension().string();
        std::filesystem::path backupPath;
        int suffix = 1;
        do {
            backupPath = outputPath.parent_path() / (stem + "_" + std::to_string(suffix) + extension);
            ++suffix;
        } while (std::filesystem::exists(backupPath, ec));

        std::filesystem::rename(outputPath, backupPath, ec);
        if (ec) {
            // Opening the file now would truncate the previous result.
            return false;
        }
    }

    m_outputFile.open(m_outputFileName);
    if (!m_outputFile.is_open()) {
        return false;
    }

    // Header row: "Time" followed by one column per parsed field. Array
    // indices are written 1-based; 2D indices are joined with '_'.
    m_outputFile << "Time";
    for (const auto &headerField : m_headerFields) {
        if (headerField.arrayDim == 0) {
            m_outputFile << "," << headerField.fieldName;
        } else if (headerField.arrayDim == 1) {
            m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1
                         << ")";
        } else if (headerField.arrayDim == 2) {
            m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1
                         << "_" << headerField.arrayIndex2 + 1 << ")";
        }
    }
    m_outputFile << std::endl;
    return true;
}
/**
 * @brief Parses one collect-list entry and appends it to m_headerFields.
 *
 * Accepted forms are "name", "name(i)" and "name(i_j)", where i and j are
 * 1-based indices (stored 0-based). The entry is dropped silently when an
 * index is missing, malformed or less than 1, or when no entry of
 * m_collectDataInfos lists an interface with that name.
 *
 * @param headerField Trimmed field text taken from the collect list.
 */
void DataCollect::parseHeaderField(const std::string &headerField)
{
    // Converts a 1-based index string into a 0-based value. Returns false on
    // malformed, out-of-range or non-positive input; std::stoi would throw in
    // the first two cases.
    const auto parseIndex = [](const std::string &text, int &zeroBased) {
        std::stringstream ss(text);
        int oneBased = 0;
        if (!(ss >> oneBased) || oneBased < 1) {
            return false;
        }
        zeroBased = oneBased - 1;
        return true;
    };

    const size_t openParenPos = headerField.find('(');

    CSVHeaderField csvHeaderField;
    csvHeaderField.fieldName = headerField.substr(0, openParenPos);
    csvHeaderField.arrayDim = 0;
    csvHeaderField.arrayIndex1 = 0;
    csvHeaderField.arrayIndex2 = 0;
    csvHeaderField.arraySize2 = 0;

    if (openParenPos != std::string::npos) {
        // Array element: interfaceName(1) or interfaceName(1_2).
        // If ')' is missing, everything after '(' is treated as the index text.
        const size_t closeParenPos = headerField.find(')');
        const std::string indexStr =
            headerField.substr(openParenPos + 1, closeParenPos - openParenPos - 1);

        const size_t separatorPos = indexStr.find('_');
        if (separatorPos != std::string::npos) {
            // Two-dimensional element.
            int index1 = 0;
            int index2 = 0;
            if (!parseIndex(indexStr.substr(0, separatorPos), index1)
                || !parseIndex(indexStr.substr(separatorPos + 1), index2)) {
                return;
            }
            csvHeaderField.arrayIndex1 = index1;
            csvHeaderField.arrayIndex2 = index2;
            csvHeaderField.arrayDim = 2;
        } else {
            // One-dimensional element.
            int index1 = 0;
            if (!parseIndex(indexStr, index1)) {
                return;
            }
            csvHeaderField.arrayIndex1 = index1;
            csvHeaderField.arrayDim = 1;
        }
    }

    // Resolve the owning structure. Only the inner search stops at a match, so
    // when several structures expose the same interface name the last one in
    // m_collectDataInfos wins.
    for (const auto &collectDataInfo : m_collectDataInfos) {
        const auto &interfaceNames = collectDataInfo.interfaceNames;
        for (size_t i = 0; i < interfaceNames.size(); ++i) {
            if (interfaceNames[i] != csvHeaderField.fieldName) {
                continue;
            }
            csvHeaderField.structName = collectDataInfo.structName;
            // arraySize2 is the second-dimension extent, kept only when it is
            // greater than 1. A missing arraySizes entry is treated as
            // "no second dimension" rather than read out of bounds.
            if (i < collectDataInfo.arraySizes.size() && collectDataInfo.arraySizes[i].second > 1) {
                csvHeaderField.arraySize2 = collectDataInfo.arraySizes[i].second;
            } else {
                csvHeaderField.arraySize2 = 0;
            }
            break;
        }
    }

    // Unknown interface name: nothing to collect for this entry.
    if (csvHeaderField.structName.empty()) {
        return;
    }
    m_headerFields.push_back(csvHeaderField);
}
/**
 * @brief Starts the collection thread if it is not already running.
 *
 * Does nothing while m_running is set. Otherwise sets m_running and launches
 * threadFunc() on m_thread.
 */
void DataCollect::start()
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_running) {
        return;
    }

    // threadFunc() clears m_running itself when the collection time is over,
    // which leaves m_thread finished but still joinable. Assigning a new
    // std::thread over a joinable one calls std::terminate, so reap it first.
    if (m_thread.joinable()) {
        m_thread.join();
    }

    m_running = true;
    m_thread = std::thread(&DataCollect::threadFunc, this);
}
/**
 * @brief Stops collection and releases what the collector holds.
 *
 * Clears the running flag and notifies m_cv, joins the worker thread, closes
 * the output file, releases through DataMonitorFactory every monitor recorded
 * in m_notStartedMonitors, and empties both monitor maps.
 */
void DataCollect::stop()
{
    // Request shutdown; the lock is released before joining.
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_running) {
            m_running = false;
            m_cv.notify_all();
        }
    }

    // Wait for the worker to finish.
    if (m_thread.joinable()) {
        m_thread.join();
    }

    // Close the output file.
    if (m_outputFile.is_open()) {
        m_outputFile.close();
    }

    // Release the monitors recorded as not started; only the key is needed.
    for (const auto &entry : m_notStartedMonitors) {
        DataMonitorFactory::ReleaseInstance(entry.first);
    }
    m_notStartedMonitors.clear();
    m_alreadyStartedMonitors.clear();
}
/**
 * @brief Appends the value columns of one CSV row and terminates the row.
 *
 * For every entry of m_headerFields the value is looked up in m_data by
 * structure name and field name. Scalar fields are written as stored. Array
 * fields are stored as a comma-separated list from which the requested element
 * is selected (2D elements are addressed as index1 * arraySize2 + index2).
 * "0" is written when the structure, the field or the element is not
 * available, so the row always has as many columns as the header.
 */
void DataCollect::updateData()
{
    for (const auto &headerField : m_headerFields) {
        m_outputFile << ",";

        const auto structIt = m_data.find(headerField.structName);
        if (structIt == m_data.end()) {
            m_outputFile << "0";
            continue;
        }
        const auto fieldIt = structIt->second.find(headerField.fieldName);
        if (fieldIt == structIt->second.end()) {
            m_outputFile << "0";
            continue;
        }

        if (headerField.arrayDim == 0) {
            m_outputFile << fieldIt->second;
            continue;
        }

        // Array field: split the comma-separated list into its elements.
        const std::string &value = fieldIt->second;
        std::stringstream ss(value);
        std::string item;
        std::vector<std::string> values;
        while (std::getline(ss, item, ',')) {
            values.push_back(item);
        }

        // Start out of range so an unexpected arrayDim falls back to "0".
        size_t index = values.size();
        if (headerField.arrayDim == 1) {
            index = static_cast<size_t>(headerField.arrayIndex1);
        } else if (headerField.arrayDim == 2) {
            index = static_cast<size_t>(headerField.arrayIndex1)
                        * static_cast<size_t>(headerField.arraySize2)
                    + static_cast<size_t>(headerField.arrayIndex2);
        }

        // The list may hold fewer elements than the requested index; indexing
        // past the end of the vector would be undefined behaviour.
        if (index < values.size()) {
            m_outputFile << values[index];
        } else {
            m_outputFile << "0";
        }
    }
    // std::endl also flushes, so every row reaches the file as it is written.
    m_outputFile << std::endl;
}
bool DataCollect::isRunning() const
{
return m_running;
}
void DataCollect::threadFunc()
{
auto startTime = std::chrono::steady_clock::now();
auto step = std::chrono::milliseconds(static_cast<int64_t>(1000 / m_collectFrequency));
auto nextTime = step;
auto endTime =
startTime + std::chrono::milliseconds(static_cast<int64_t>(m_collectDuration * 1000));
while (m_running) {
// 等待直到到达执行时间
auto now = std::chrono::steady_clock::now();
auto targetTime = startTime + nextTime;
if (now < targetTime) {
std::this_thread::sleep_until(targetTime);
}
m_data.clear();
// 执行数据注入
for (const auto &monitorDataInfo : m_collectDataInfos) {
DataMonitorBasePtr dataMonitor;
if (m_notStartedMonitors.find(monitorDataInfo.structName)
!= m_notStartedMonitors.end()) {
dataMonitor = m_notStartedMonitors[monitorDataInfo.structName];
} else if (m_alreadyStartedMonitors.find(monitorDataInfo.structName)
!= m_alreadyStartedMonitors.end()) {
dataMonitor = m_alreadyStartedMonitors[monitorDataInfo.structName];
}
if (dataMonitor && m_data.find(monitorDataInfo.structName) == m_data.end()) {
m_data[monitorDataInfo.structName] =
dataMonitor->getStringData(monitorDataInfo.interfaceNames);
}
}
// 写入一行数据
nextTime += step;
if (targetTime > endTime) {
m_running = false;
break;
}
double timeStamp =
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count() / 1000.0;
m_outputFile << std::fixed << std::setprecision(3) << timeStamp;
updateData();
}
}