#include "DataCollect.h"
#include "DataMonitorFactory.h"

#include <chrono>
#include <cstddef>
#include <exception>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <mutex>
#include <sstream>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

/// Creates an idle collector: not running, with duration and frequency set to 0.
DataCollect::DataCollect()
    : m_running(false), m_nextExecuteTime(0), m_collectDuration(0), m_collectFrequency(0)
{
}

/// Stops the worker thread (if any), closes the output file and releases monitors.
DataCollect::~DataCollect()
{
    stop();
}

/// Parses a dcs script, acquires the data monitors and opens the CSV output file.
///
/// The dcs file is read line by line; empty lines and lines starting with '!'
/// are skipped. Three things are extracted from it:
///   - the text following a "define collect_list" line, up to (not including)
///     the next line that contains a double quote; it is split on '-' into
///     field specifications that are handed to parseHeaderField();
///   - a "for <duration> at <frequency>" line;
///   - a "put/extend/all result <name>" line; "<name>.csv" is created next to
///     the dcs file. An already existing file of that name is first renamed to
///     "<name>_<n>.csv" using the first free n.
///
/// @param collectDataInfos  Description of the structures/interfaces to collect.
///                          NOTE(review): the element type is declared in
///                          DataCollect.h and is not visible here, so the
///                          parameter type is spelled via decltype of the member
///                          it is stored in; replace with the explicit
///                          std::vector<...> type if preferred.
/// @param dcsFilePath       Path of the dcs script to read.
/// @return true when the output file is open and its header row is written;
///         false if the dcs file cannot be opened, no output file name was
///         found, a monitor cannot be obtained, an existing output file cannot
///         be renamed, or the output file cannot be opened.
bool DataCollect::Initialize(decltype(m_collectDataInfos) collectDataInfos, std::string dcsFilePath)
{
    m_collectDataInfos = std::move(collectDataInfos);

    // Open the dcs file.
    std::ifstream dcsFile(dcsFilePath);
    if (!dcsFile.is_open()) {
        return false;
    }

    std::string line;
    bool foundCollectList = false;
    std::string collectListContent;

    // Locate the "define collect_list" section and gather its body.
    while (std::getline(dcsFile, line)) {
        // Skip empty lines and comment lines.
        if (line.empty() || line[0] == '!') {
            continue;
        }

        if (line.find("define collect_list") != std::string::npos) {
            foundCollectList = true;
            continue;
        }

        if (foundCollectList) {
            // A line containing a quote terminates the list.
            if (line.find('"') != std::string::npos) {
                break;
            }
            collectListContent += line;
        }
    }

    // Keep reading: collection duration/frequency and the output file name.
    while (std::getline(dcsFile, line)) {
        // Skip empty lines and comment lines.
        if (line.empty() || line[0] == '!') {
            continue;
        }

        // Line of the form "for X at Y".
        if (line.find("for") != std::string::npos && line.find("at") != std::string::npos) {
            std::stringstream lineStream(line);
            std::string forToken;
            std::string atToken;
            auto duration = m_collectDuration;
            auto frequency = m_collectFrequency;
            // Commit only a fully parsed line: an unrelated line that merely
            // contains "for" and "at" must not zero out values read earlier.
            if (lineStream >> forToken >> duration >> atToken >> frequency) {
                m_collectDuration = duration;
                m_collectFrequency = frequency;
            }
        }

        // Line of the form "put/extend/all result <name>".
        if (line.find("put/extend/all result") != std::string::npos) {
            std::stringstream lineStream(line);
            std::string command;
            std::string target;
            std::string fileName;
            if (lineStream >> command >> target >> fileName) {
                m_outputFileName = fileName + ".csv";
            }
        }
    }

    // Split the collect list on '-' and register every non-empty field.
    std::stringstream listStream(collectListContent);
    std::string field;
    while (std::getline(listStream, field, '-')) {
        // Trim surrounding whitespace.
        field.erase(0, field.find_first_not_of(" \t\r\n"));
        field.erase(field.find_last_not_of(" \t\r\n") + 1);

        if (!field.empty()) {
            parseHeaderField(field);
        }
    }

    dcsFile.close();

    // Without a file name the output path would be the dcs directory itself.
    if (m_outputFileName.empty()) {
        return false;
    }

    // Obtain one monitor per structure and remember whether it was already
    // initialized before this collector asked for it.
    for (const auto &collectDataInfo : m_collectDataInfos) {
        auto dataMonitor = DataMonitorFactory::GetInstance(collectDataInfo.structName);
        if (dataMonitor == nullptr) {
            return false;
        }
        if (dataMonitor->isInitialized()) {
            m_alreadyStartedMonitors[collectDataInfo.structName] = dataMonitor;
        } else {
            dataMonitor->Initialize(nullptr, 0, 0);
            m_notStartedMonitors[collectDataInfo.structName] = dataMonitor;
        }
    }

    // Place the output file in the directory of the dcs file. operator/ is used
    // so that an empty parent directory yields a relative path instead of "/<name>".
    const std::filesystem::path dcsPath(dcsFilePath);
    const std::filesystem::path outputPath = dcsPath.parent_path() / m_outputFileName;
    m_outputFileName = outputPath.string();

    // Move an existing output file out of the way: <stem>_<n><ext>, first free n.
    std::error_code ec;
    if (std::filesystem::exists(outputPath, ec)) {
        const std::string stem = outputPath.stem().string();
        const std::string extension = outputPath.extension().string();
        std::filesystem::path backupPath;
        int suffix = 1;
        do {
            backupPath = outputPath.parent_path() / (stem + "_" + std::to_string(suffix) + extension);
            ++suffix;
        } while (std::filesystem::exists(backupPath, ec));

        std::filesystem::rename(outputPath, backupPath, ec);
        if (ec) {
            return false;
        }
    }

    m_outputFile.open(m_outputFileName);
    if (!m_outputFile.is_open()) {
        return false;
    }

    // Header row: "Time" followed by one column per registered field. Array
    // indices are printed 1-based, two-dimensional ones as "(i_j)".
    m_outputFile << "Time";
    for (const auto &headerField : m_headerFields) {
        if (headerField.arrayDim == 0) {
            m_outputFile << "," << headerField.fieldName;
        } else if (headerField.arrayDim == 1) {
            m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1 << ")";
        } else if (headerField.arrayDim == 2) {
            m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1 << "_"
                         << headerField.arrayIndex2 + 1 << ")";
        }
    }
    m_outputFile << std::endl;

    return true;
}

/// Parses one collect-list entry and appends it to m_headerFields.
///
/// Accepted forms are "name", "name(i)" and "name(i_j)" with 1-based indices;
/// the indices are stored 0-based. The entry is silently dropped when an index
/// is not a number, is smaller than 1, or when no entry of m_collectDataInfos
/// lists "name" among its interfaceNames.
///
/// @param headerField  A single, already trimmed entry of the collect list.
void DataCollect::parseHeaderField(const std::string &headerField)
{
    CSVHeaderField csvHeaderField;
    const std::size_t openPos = headerField.find('(');
    csvHeaderField.fieldName = headerField.substr(0, openPos);
    csvHeaderField.arrayDim = 0;
    csvHeaderField.arrayIndex1 = 0;
    csvHeaderField.arrayIndex2 = 0;
    csvHeaderField.arraySize2 = 0;

    if (openPos != std::string::npos) {
        // Text between the parentheses; with no ')' the rest of the string is used.
        const std::size_t closePos = headerField.find(')');
        const std::string indexStr = headerField.substr(openPos + 1, closePos - openPos - 1);

        try {
            const std::size_t separatorPos = indexStr.find('_');
            if (separatorPos != std::string::npos) {
                // Two-dimensional: "i_j".
                csvHeaderField.arrayIndex1 = std::stoi(indexStr.substr(0, separatorPos)) - 1;
                if (csvHeaderField.arrayIndex1 < 0) {
                    return;
                }
                csvHeaderField.arrayIndex2 = std::stoi(indexStr.substr(separatorPos + 1)) - 1;
                if (csvHeaderField.arrayIndex2 < 0) {
                    return;
                }
                csvHeaderField.arrayDim = 2;
            } else {
                // One-dimensional: "i".
                csvHeaderField.arrayIndex1 = std::stoi(indexStr) - 1;
                if (csvHeaderField.arrayIndex1 < 0) {
                    return;
                }
                csvHeaderField.arrayDim = 1;
            }
        } catch (const std::exception &) {
            // std::stoi throws on non-numeric or out-of-range text; treat such
            // an entry like any other invalid one and skip it.
            return;
        }
    }

    // Find the structure that provides this interface. Every structure is
    // scanned, so when several list the same name the last one wins.
    for (const auto &collectDataInfo : m_collectDataInfos) {
        const auto &interfaceNames = collectDataInfo.interfaceNames;
        for (std::size_t i = 0; i < interfaceNames.size(); ++i) {
            if (interfaceNames[i] != csvHeaderField.fieldName) {
                continue;
            }
            csvHeaderField.structName = collectDataInfo.structName;
            // The second array extent is recorded only when it is greater than 1.
            if (i < collectDataInfo.arraySizes.size() && collectDataInfo.arraySizes[i].second > 1) {
                csvHeaderField.arraySize2 = collectDataInfo.arraySizes[i].second;
            } else {
                csvHeaderField.arraySize2 = 0;
            }
            break;
        }
    }

    if (csvHeaderField.structName.empty()) {
        return;
    }

    m_headerFields.push_back(csvHeaderField);
}

/// Starts the collection thread unless collection is already running.
void DataCollect::start()
{
    std::lock_guard lock(m_mutex);
    if (m_running) {
        return;
    }
    // A worker that ended by itself is still joinable; assigning a new thread
    // to a joinable std::thread would call std::terminate, so reap it first.
    if (m_thread.joinable()) {
        m_thread.join();
    }
    m_running = true;
    m_thread = std::thread(&DataCollect::threadFunc, this);
}

/// Requests the worker to stop, waits for it, closes the output file and
/// releases the monitors this collector initialized itself.
void DataCollect::stop()
{
    {
        std::lock_guard lock(m_mutex);
        if (m_running) {
            m_running = false;
            m_cv.notify_all();
        }
    }

    if (m_thread.joinable()) {
        m_thread.join();
    }

    // Close the output file.
    if (m_outputFile.is_open()) {
        m_outputFile.close();
    }

    // Only monitors that were not initialized before Initialize() are released.
    for (const auto &monitorEntry : m_notStartedMonitors) {
        DataMonitorFactory::ReleaseInstance(monitorEntry.first);
    }
    m_notStartedMonitors.clear();
    m_alreadyStartedMonitors.clear();
}

/// Appends the value columns of the current sample (m_data) to the output row
/// and terminates the row.
///
/// For every header field one ",<value>" cell is written. Array fields are
/// stored as a comma-separated string from which the requested element is
/// picked (row-major, using arraySize2, for two-dimensional fields). A cell is
/// "0" when the structure or field is missing from m_data or when the
/// requested element lies beyond the available values.
void DataCollect::updateData()
{
    for (const auto &headerField : m_headerFields) {
        const auto structIt = m_data.find(headerField.structName);
        if (structIt == m_data.end()) {
            m_outputFile << "," << "0";
            continue;
        }

        const auto fieldIt = structIt->second.find(headerField.fieldName);
        if (fieldIt == structIt->second.end()) {
            m_outputFile << "," << "0";
            continue;
        }

        if (headerField.arrayDim == 0) {
            m_outputFile << "," << fieldIt->second;
            continue;
        }

        // Split the comma-separated array representation into its elements.
        std::stringstream valueStream(fieldIt->second);
        std::string item;
        std::vector<std::string> values;
        while (std::getline(valueStream, item, ',')) {
            values.push_back(item);
        }

        if (headerField.arrayDim == 1 || headerField.arrayDim == 2) {
            std::size_t index = static_cast<std::size_t>(headerField.arrayIndex1);
            if (headerField.arrayDim == 2) {
                index = index * static_cast<std::size_t>(headerField.arraySize2)
                        + static_cast<std::size_t>(headerField.arrayIndex2);
            }
            if (index < values.size()) {
                m_outputFile << "," << values[index];
            } else {
                // Requested element does not exist in the sampled data.
                m_outputFile << "," << "0";
            }
        }
    }
    // std::endl also flushes, so every completed row reaches the file.
    m_outputFile << std::endl;
}

/// @return the current value of the running flag.
bool DataCollect::isRunning() const
{
    return m_running;
}

/// Worker loop: samples all monitors every 1000 / m_collectFrequency
/// milliseconds and writes one CSV row per sample until m_collectDuration
/// seconds have elapsed or the running flag is cleared.
void DataCollect::threadFunc()
{
    // A non-positive frequency would divide by zero below.
    if (!(m_collectFrequency > 0)) {
        m_running = false;
        return;
    }

    const auto startTime = std::chrono::steady_clock::now();
    auto step = std::chrono::milliseconds(static_cast<long long>(1000 / m_collectFrequency));
    // Frequencies above 1 kHz truncate to a 0 ms step, which would never
    // advance the schedule and therefore never reach endTime.
    if (step < std::chrono::milliseconds(1)) {
        step = std::chrono::milliseconds(1);
    }
    auto nextTime = step;
    const auto endTime =
        startTime + std::chrono::milliseconds(static_cast<long long>(m_collectDuration * 1000));

    while (m_running) {
        // Wait until the scheduled sampling instant.
        const auto now = std::chrono::steady_clock::now();
        const auto targetTime = startTime + nextTime;
        if (now < targetTime) {
            std::this_thread::sleep_until(targetTime);
        }

        // Read the current data of every structure once.
        m_data.clear();
        for (const auto &monitorDataInfo : m_collectDataInfos) {
            DataMonitorBasePtr dataMonitor;
            const auto notStartedIt = m_notStartedMonitors.find(monitorDataInfo.structName);
            if (notStartedIt != m_notStartedMonitors.end()) {
                dataMonitor = notStartedIt->second;
            } else {
                const auto startedIt = m_alreadyStartedMonitors.find(monitorDataInfo.structName);
                if (startedIt != m_alreadyStartedMonitors.end()) {
                    dataMonitor = startedIt->second;
                }
            }

            if (dataMonitor && m_data.find(monitorDataInfo.structName) == m_data.end()) {
                m_data[monitorDataInfo.structName] =
                    dataMonitor->getStringData(monitorDataInfo.interfaceNames);
            }
        }

        nextTime += step;

        // Stop once the schedule has run past the configured duration.
        if (targetTime > endTime) {
            m_running = false;
            break;
        }

        // Write one row: time stamp in seconds with millisecond precision,
        // followed by the value columns.
        // NOTE(review): the stamp is the clock reading taken before the sleep,
        // so it precedes the actual sampling instant — confirm this is intended.
        const double timeStamp =
            std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count() / 1000.0;
        m_outputFile << std::fixed << std::setprecision(3) << timeStamp;
        updateData();
    }
}