// Source listing: 326 lines, 9.0 KiB, C++
#include "DataCollect.h"

#include <stdexcept>
#include <system_error>
#include <utility>

#include "DataMonitorFactory.h"
|
||
|
||
/// Builds an idle collector: the running flag is cleared and the scheduling
/// and sampling parameters are zeroed until Initialize() fills them in.
DataCollect::DataCollect()
    : m_running(false)
    , m_nextExecuteTime(0)
    , m_collectDuration(0)
    , m_collectFrequency(0)
{
    // Intentionally empty: all state is set in the initializer list above.
}
|
||
|
||
/// Tears the collector down by delegating to stop(), which joins the worker
/// thread, closes the output file and releases the monitors it acquired.
DataCollect::~DataCollect()
{
    this->stop();
}
|
||
|
||
bool DataCollect::Initialize(std::vector<MonitorDataInfo> collectDataInfos, std::string dcsFilePath)
|
||
{
|
||
m_collectDataInfos = collectDataInfos;
|
||
|
||
// 打开并读取 dcs 文件
|
||
std::ifstream dcsFile(dcsFilePath);
|
||
if (!dcsFile.is_open()) {
|
||
return false;
|
||
}
|
||
|
||
std::string line;
|
||
bool foundCollectList = false;
|
||
std::string collectListContent;
|
||
|
||
// 查找 define collect_list 部分
|
||
while (std::getline(dcsFile, line)) {
|
||
// 跳过注释行
|
||
if (line.empty() || line[0] == '!') {
|
||
continue;
|
||
}
|
||
|
||
if (line.find("define collect_list") != std::string::npos) {
|
||
foundCollectList = true;
|
||
continue;
|
||
}
|
||
|
||
if (foundCollectList) {
|
||
// 检查是否到达结束引号
|
||
if (line.find("\"") != std::string::npos) {
|
||
break;
|
||
}
|
||
collectListContent += line;
|
||
}
|
||
}
|
||
|
||
// 继续读取文件,查找采集时长和频率以及输出文件名
|
||
while (std::getline(dcsFile, line)) {
|
||
// 跳过注释行
|
||
if (line.empty() || line[0] == '!') {
|
||
continue;
|
||
}
|
||
|
||
// 查找 "for X at Y" 格式的行
|
||
if (line.find("for") != std::string::npos && line.find("at") != std::string::npos) {
|
||
std::stringstream ss(line);
|
||
std::string token;
|
||
ss >> token; // 读取 "for"
|
||
ss >> m_collectDuration; // 读取时长
|
||
ss >> token; // 读取 "at"
|
||
ss >> m_collectFrequency; // 读取频率
|
||
}
|
||
|
||
// 查找输出文件名
|
||
if (line.find("put/extend/all result") != std::string::npos) {
|
||
std::stringstream ss(line);
|
||
std::string token;
|
||
ss >> token; // 读取 "put/extend/all"
|
||
ss >> token; // 读取 "result"
|
||
ss >> m_outputFileName; // 读取文件名
|
||
m_outputFileName += ".csv"; // 添加.csv后缀
|
||
}
|
||
}
|
||
|
||
// 解析 collect_list 内容
|
||
std::stringstream ss(collectListContent);
|
||
std::string field;
|
||
while (std::getline(ss, field, '-')) {
|
||
// 去除字段前后的空白字符
|
||
field.erase(0, field.find_first_not_of(" \t\r\n"));
|
||
field.erase(field.find_last_not_of(" \t\r\n") + 1);
|
||
|
||
if (!field.empty()) {
|
||
parseHeaderField(field);
|
||
}
|
||
}
|
||
|
||
// 关闭 dcs 文件
|
||
dcsFile.close();
|
||
|
||
for (const auto &collectDataInfo : m_collectDataInfos) {
|
||
auto dataMonitor = DataMonitorFactory::GetInstance(collectDataInfo.structName);
|
||
if (dataMonitor == nullptr) {
|
||
return false;
|
||
}
|
||
if (dataMonitor->isInitialized()) {
|
||
m_alreadyStartedMonitors[collectDataInfo.structName] = dataMonitor;
|
||
} else {
|
||
dataMonitor->Initialize(nullptr, 0, 0);
|
||
m_notStartedMonitors[collectDataInfo.structName] = dataMonitor;
|
||
}
|
||
}
|
||
|
||
// 获取dcs文件所在目录
|
||
std::filesystem::path dcsPath(dcsFilePath);
|
||
std::string dcsDir = dcsPath.parent_path().string();
|
||
|
||
// 将输出文件名设置为dcs文件同目录下的完整路径
|
||
m_outputFileName = dcsDir + "/" + m_outputFileName;
|
||
if (std::filesystem::exists(m_outputFileName)) {
|
||
int suffix = 1;
|
||
std::filesystem::path originalPath(m_outputFileName);
|
||
std::string stem = originalPath.stem().string();
|
||
std::string extension = originalPath.extension().string();
|
||
std::string newFileName;
|
||
|
||
do {
|
||
newFileName = stem + "_" + std::to_string(suffix) + extension;
|
||
suffix++;
|
||
} while (std::filesystem::exists(originalPath.parent_path() / newFileName));
|
||
|
||
std::filesystem::rename(m_outputFileName,
|
||
(originalPath.parent_path() / newFileName).string());
|
||
}
|
||
m_outputFile.open(m_outputFileName);
|
||
if (!m_outputFile.is_open()) {
|
||
return false;
|
||
}
|
||
|
||
// 写入表头
|
||
m_outputFile << "Time";
|
||
|
||
for (const auto &headerField : m_headerFields) {
|
||
if (headerField.arrayDim == 0) {
|
||
m_outputFile << "," << headerField.fieldName;
|
||
} else if (headerField.arrayDim == 1) {
|
||
m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1
|
||
<< ")";
|
||
} else if (headerField.arrayDim == 2) {
|
||
m_outputFile << "," << headerField.fieldName << "(" << headerField.arrayIndex1 + 1
|
||
<< "_" << headerField.arrayIndex2 + 1 << ")";
|
||
}
|
||
}
|
||
m_outputFile << std::endl;
|
||
|
||
return true;
|
||
}
|
||
|
||
void DataCollect::parseHeaderField(const std::string &headerField)
|
||
{
|
||
CSVHeaderField csvHeaderField;
|
||
csvHeaderField.fieldName = headerField.substr(0, headerField.find('('));
|
||
csvHeaderField.arrayDim = 0;
|
||
csvHeaderField.arrayIndex1 = 0;
|
||
csvHeaderField.arrayIndex2 = 0;
|
||
|
||
if (headerField.find('(') != std::string::npos) {
|
||
// 处理一维或二维数组,格式为interfaceName(1)或interfaceName(1,2)
|
||
size_t firstParenPos = headerField.find('(');
|
||
size_t lastParenPos = headerField.find(')');
|
||
std::string indexStr =
|
||
headerField.substr(firstParenPos + 1, lastParenPos - firstParenPos - 1);
|
||
if (indexStr.find('_') != std::string::npos) {
|
||
// 二维数组
|
||
size_t commaPos = indexStr.find('_');
|
||
csvHeaderField.arrayIndex1 = std::stoi(indexStr.substr(0, commaPos)) - 1;
|
||
if (csvHeaderField.arrayIndex1 < 0) {
|
||
return;
|
||
}
|
||
csvHeaderField.arrayIndex2 = std::stoi(indexStr.substr(commaPos + 1)) - 1;
|
||
if (csvHeaderField.arrayIndex2 < 0) {
|
||
return;
|
||
}
|
||
csvHeaderField.arrayDim = 2;
|
||
} else {
|
||
// 一维数组
|
||
csvHeaderField.arrayIndex1 = std::stoi(indexStr) - 1;
|
||
if (csvHeaderField.arrayIndex1 < 0) {
|
||
return;
|
||
}
|
||
csvHeaderField.arrayDim = 1;
|
||
}
|
||
}
|
||
|
||
for (const auto &collectDataInfo : m_collectDataInfos) {
|
||
for (int i = 0; i < collectDataInfo.interfaceNames.size(); i++) {
|
||
if (collectDataInfo.interfaceNames[i] == csvHeaderField.fieldName) {
|
||
csvHeaderField.structName = collectDataInfo.structName;
|
||
if (collectDataInfo.arraySizes[i].second > 1) {
|
||
csvHeaderField.arraySize2 = collectDataInfo.arraySizes[i].second;
|
||
} else {
|
||
csvHeaderField.arraySize2 = 0;
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
if (csvHeaderField.structName.empty()) {
|
||
return;
|
||
}
|
||
|
||
m_headerFields.push_back(csvHeaderField);
|
||
}
|
||
|
||
void DataCollect::start()
|
||
{
|
||
std::lock_guard<std::mutex> lock(m_mutex);
|
||
if (!m_running) {
|
||
m_running = true;
|
||
m_thread = std::thread(&DataCollect::threadFunc, this);
|
||
}
|
||
}
|
||
|
||
void DataCollect::stop()
|
||
{
|
||
{
|
||
std::lock_guard<std::mutex> lock(m_mutex);
|
||
if (m_running) {
|
||
m_running = false;
|
||
m_cv.notify_all();
|
||
}
|
||
}
|
||
|
||
if (m_thread.joinable()) {
|
||
m_thread.join();
|
||
}
|
||
|
||
// 关闭文件
|
||
if (m_outputFile.is_open()) {
|
||
m_outputFile.close();
|
||
}
|
||
|
||
// 释放未启动的监控器
|
||
for (const auto &[structName, dataMonitor] : m_notStartedMonitors) {
|
||
DataMonitorFactory::ReleaseInstance(structName);
|
||
}
|
||
m_notStartedMonitors.clear();
|
||
m_alreadyStartedMonitors.clear();
|
||
}
|
||
|
||
void DataCollect::updateData()
|
||
{
|
||
for (const auto &headerField : m_headerFields) {
|
||
if (m_data.find(headerField.structName) != m_data.end()) {
|
||
if (m_data[headerField.structName].find(headerField.fieldName)
|
||
!= m_data[headerField.structName].end()) {
|
||
if (headerField.arrayDim == 0) {
|
||
m_outputFile << "," << m_data[headerField.structName][headerField.fieldName];
|
||
} else {
|
||
std::string value = m_data[headerField.structName][headerField.fieldName];
|
||
std::stringstream ss(value);
|
||
std::string item;
|
||
std::vector<std::string> values;
|
||
while (std::getline(ss, item, ',')) {
|
||
values.push_back(item);
|
||
}
|
||
if (headerField.arrayDim == 1) {
|
||
m_outputFile << "," << values[headerField.arrayIndex1];
|
||
} else if (headerField.arrayDim == 2) {
|
||
m_outputFile << ","
|
||
<< values[headerField.arrayIndex1 * headerField.arraySize2
|
||
+ headerField.arrayIndex2];
|
||
}
|
||
}
|
||
} else {
|
||
m_outputFile << "," << "0";
|
||
}
|
||
} else {
|
||
m_outputFile << "," << "0";
|
||
}
|
||
}
|
||
m_outputFile << std::endl;
|
||
}
|
||
|
||
bool DataCollect::isRunning() const
|
||
{
|
||
return m_running;
|
||
}
|
||
|
||
void DataCollect::threadFunc()
|
||
{
|
||
auto startTime = std::chrono::steady_clock::now();
|
||
auto step = std::chrono::milliseconds(static_cast<int64_t>(1000 / m_collectFrequency));
|
||
auto nextTime = step;
|
||
auto endTime =
|
||
startTime + std::chrono::milliseconds(static_cast<int64_t>(m_collectDuration * 1000));
|
||
|
||
while (m_running) {
|
||
// 等待直到到达执行时间
|
||
auto now = std::chrono::steady_clock::now();
|
||
auto targetTime = startTime + nextTime;
|
||
|
||
if (now < targetTime) {
|
||
std::this_thread::sleep_until(targetTime);
|
||
}
|
||
m_data.clear();
|
||
// 执行数据注入
|
||
for (const auto &monitorDataInfo : m_collectDataInfos) {
|
||
DataMonitorBasePtr dataMonitor;
|
||
if (m_notStartedMonitors.find(monitorDataInfo.structName)
|
||
!= m_notStartedMonitors.end()) {
|
||
dataMonitor = m_notStartedMonitors[monitorDataInfo.structName];
|
||
} else if (m_alreadyStartedMonitors.find(monitorDataInfo.structName)
|
||
!= m_alreadyStartedMonitors.end()) {
|
||
dataMonitor = m_alreadyStartedMonitors[monitorDataInfo.structName];
|
||
}
|
||
if (dataMonitor && m_data.find(monitorDataInfo.structName) == m_data.end()) {
|
||
m_data[monitorDataInfo.structName] =
|
||
dataMonitor->getStringData(monitorDataInfo.interfaceNames);
|
||
}
|
||
}
|
||
|
||
// 写入一行数据
|
||
nextTime += step;
|
||
if (targetTime > endTime) {
|
||
m_running = false;
|
||
break;
|
||
}
|
||
double timeStamp =
|
||
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count() / 1000.0;
|
||
m_outputFile << std::fixed << std::setprecision(3) << timeStamp;
|
||
updateData();
|
||
}
|
||
} |