XNSim/XNMonitorServer/DataCollect.cpp

326 lines
9.0 KiB
C++
Raw Permalink Normal View History

#include "DataCollect.h"
#include "DataMonitorFactory.h"
DataCollect::DataCollect()
: m_running(false), m_nextExecuteTime(0), m_collectDuration(0), m_collectFrequency(0)
{
}
DataCollect::~DataCollect()
{
stop();
}
bool DataCollect::Initialize(std::vector<MonitorDataInfo> collectDataInfos, std::string dcsFilePath)
{
m_collectDataInfos = collectDataInfos;
// 打开并读取 dcs 文件
std::ifstream dcsFile(dcsFilePath);
if (!dcsFile.is_open()) {
return false;
}
std::string line;
bool foundCollectList = false;
std::string collectListContent;
// 查找 define collect_list 部分
while (std::getline(dcsFile, line)) {
// 跳过注释行
if (line.empty() || line[0] == '!') {
continue;
}
if (line.find("define collect_list") != std::string::npos) {
foundCollectList = true;
continue;
}
if (foundCollectList) {
// 检查是否到达结束引号
if (line.find("\"") != std::string::npos) {
break;
}
collectListContent += line;
}
}
// 继续读取文件,查找采集时长和频率以及输出文件名
while (std::getline(dcsFile, line)) {
// 跳过注释行
if (line.empty() || line[0] == '!') {
continue;
}
// 查找 "for X at Y" 格式的行
if (line.find("for") != std::string::npos && line.find("at") != std::string::npos) {
std::stringstream ss(line);
std::string token;
ss >> token; // 读取 "for"
ss >> m_collectDuration; // 读取时长
ss >> token; // 读取 "at"
ss >> m_collectFrequency; // 读取频率
}
// 查找输出文件名
if (line.find("put/extend/all result") != std::string::npos) {
std::stringstream ss(line);
std::string token;
ss >> token; // 读取 "put/extend/all"
ss >> token; // 读取 "result"
ss >> m_outputFileName; // 读取文件名
m_outputFileName += ".csv"; // 添加.csv后缀
}
}
// 解析 collect_list 内容
std::stringstream ss(collectListContent);
std::string field;
while (std::getline(ss, field, '-')) {
// 去除字段前后的空白字符
field.erase(0, field.find_first_not_of(" \t\r\n"));
field.erase(field.find_last_not_of(" \t\r\n") + 1);
if (!field.empty()) {
parseHeaderField(field);
}
}
// 关闭 dcs 文件
dcsFile.close();
for (const auto &collectDataInfo : m_collectDataInfos) {
auto dataMonitor = DataMonitorFactory::GetInstance(collectDataInfo.structName);
if (dataMonitor == nullptr) {
return false;
}
if (dataMonitor->isInitialized()) {
m_alreadyStartedMonitors[collectDataInfo.structName] = dataMonitor;
} else {
dataMonitor->Initialize(nullptr, 0, 0);
m_notStartedMonitors[collectDataInfo.structName] = dataMonitor;
}
}
// 获取dcs文件所在目录
std::filesystem::path dcsPath(dcsFilePath);
std::string dcsDir = dcsPath.parent_path().string();
// 将输出文件名设置为dcs文件同目录下的完整路径
m_outputFileName = dcsDir + "/" + m_outputFileName;
if (std::filesystem::exists(m_outputFileName)) {
int suffix = 1;
std::filesystem::path originalPath(m_outputFileName);
std::string stem = originalPath.stem().string();
std::string extension = originalPath.extension().string();
std::string newFileName;
do {
newFileName = stem + "_" + std::to_string(suffix) + extension;
suffix++;
} while (std::filesystem::exists(originalPath.parent_path() / newFileName));
std::filesystem::rename(m_outputFileName,
(originalPath.parent_path() / newFileName).string());
}
m_outputFile.open(m_outputFileName);
if (!m_outputFile.is_open()) {
return false;
}
// 写入表头
m_outputFile << "Time";
for (const auto &headerField : m_headerFields) {
if (headerField.arrayDim == 0) {
m_outputFile << "," << headerField.fieldName;
} else if (headerField.arrayDim == 1) {
m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1
<< ")";
} else if (headerField.arrayDim == 2) {
m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1
<< "_" << headerField.arrayIndex2 + 1 << ")";
}
}
m_outputFile << std::endl;
return true;
}
void DataCollect::parseHeaderField(const std::string &headerField)
{
CSVHeaderField csvHeaderField;
csvHeaderField.fieldName = headerField.substr(0, headerField.find('('));
csvHeaderField.arrayDim = 0;
csvHeaderField.arrayIndex1 = 0;
csvHeaderField.arrayIndex2 = 0;
if (headerField.find('(') != std::string::npos) {
// 处理一维或二维数组格式为interfaceName(1)或interfaceName(1,2)
size_t firstParenPos = headerField.find('(');
size_t lastParenPos = headerField.find(')');
std::string indexStr =
headerField.substr(firstParenPos + 1, lastParenPos - firstParenPos - 1);
if (indexStr.find('_') != std::string::npos) {
// 二维数组
size_t commaPos = indexStr.find('_');
csvHeaderField.arrayIndex1 = std::stoi(indexStr.substr(0, commaPos)) - 1;
if (csvHeaderField.arrayIndex1 < 0) {
return;
}
csvHeaderField.arrayIndex2 = std::stoi(indexStr.substr(commaPos + 1)) - 1;
if (csvHeaderField.arrayIndex2 < 0) {
return;
}
csvHeaderField.arrayDim = 2;
} else {
// 一维数组
csvHeaderField.arrayIndex1 = std::stoi(indexStr) - 1;
if (csvHeaderField.arrayIndex1 < 0) {
return;
}
csvHeaderField.arrayDim = 1;
}
}
for (const auto &collectDataInfo : m_collectDataInfos) {
for (int i = 0; i < collectDataInfo.interfaceNames.size(); i++) {
if (collectDataInfo.interfaceNames[i] == csvHeaderField.fieldName) {
csvHeaderField.structName = collectDataInfo.structName;
if (collectDataInfo.arraySizes[i].second > 1) {
csvHeaderField.arraySize2 = collectDataInfo.arraySizes[i].second;
} else {
csvHeaderField.arraySize2 = 0;
}
break;
}
}
}
if (csvHeaderField.structName.empty()) {
return;
}
m_headerFields.push_back(csvHeaderField);
}
void DataCollect::start()
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_running) {
m_running = true;
m_thread = std::thread(&DataCollect::threadFunc, this);
}
}
void DataCollect::stop()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_running) {
m_running = false;
m_cv.notify_all();
}
}
if (m_thread.joinable()) {
m_thread.join();
}
// 关闭文件
if (m_outputFile.is_open()) {
m_outputFile.close();
}
// 释放未启动的监控器
for (const auto &[structName, dataMonitor] : m_notStartedMonitors) {
DataMonitorFactory::ReleaseInstance(structName);
}
m_notStartedMonitors.clear();
m_alreadyStartedMonitors.clear();
}
void DataCollect::updateData()
{
for (const auto &headerField : m_headerFields) {
if (m_data.find(headerField.structName) != m_data.end()) {
if (m_data[headerField.structName].find(headerField.fieldName)
!= m_data[headerField.structName].end()) {
if (headerField.arrayDim == 0) {
m_outputFile << "," << m_data[headerField.structName][headerField.fieldName];
} else {
std::string value = m_data[headerField.structName][headerField.fieldName];
std::stringstream ss(value);
std::string item;
std::vector<std::string> values;
while (std::getline(ss, item, ',')) {
values.push_back(item);
}
if (headerField.arrayDim == 1) {
m_outputFile << "," << values[headerField.arrayIndex1];
} else if (headerField.arrayDim == 2) {
m_outputFile << ","
<< values[headerField.arrayIndex1 * headerField.arraySize2
+ headerField.arrayIndex2];
}
}
} else {
m_outputFile << "," << "0";
}
} else {
m_outputFile << "," << "0";
}
}
m_outputFile << std::endl;
}
bool DataCollect::isRunning() const
{
return m_running;
}
void DataCollect::threadFunc()
{
auto startTime = std::chrono::steady_clock::now();
auto step = std::chrono::milliseconds(static_cast<int64_t>(1000 / m_collectFrequency));
auto nextTime = step;
auto endTime =
startTime + std::chrono::milliseconds(static_cast<int64_t>(m_collectDuration * 1000));
while (m_running) {
// 等待直到到达执行时间
auto now = std::chrono::steady_clock::now();
auto targetTime = startTime + nextTime;
if (now < targetTime) {
std::this_thread::sleep_until(targetTime);
}
m_data.clear();
// 执行数据注入
for (const auto &monitorDataInfo : m_collectDataInfos) {
DataMonitorBasePtr dataMonitor;
if (m_notStartedMonitors.find(monitorDataInfo.structName)
!= m_notStartedMonitors.end()) {
dataMonitor = m_notStartedMonitors[monitorDataInfo.structName];
} else if (m_alreadyStartedMonitors.find(monitorDataInfo.structName)
!= m_alreadyStartedMonitors.end()) {
dataMonitor = m_alreadyStartedMonitors[monitorDataInfo.structName];
}
if (dataMonitor && m_data.find(monitorDataInfo.structName) == m_data.end()) {
m_data[monitorDataInfo.structName] =
dataMonitor->getStringData(monitorDataInfo.interfaceNames);
}
}
// 写入一行数据
nextTime += step;
if (targetTime > endTime) {
m_running = false;
break;
}
double timeStamp =
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count() / 1000.0;
m_outputFile << std::fixed << std::setprecision(3) << timeStamp;
updateData();
}
}