XNSim/XNMonitorServer/CSVDataInjectThread.cpp

275 lines
7.6 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "CSVDataInjectThread.h"

#include <utility>

#include "DataMonitorFactory.h"
/// Creates an injector bound to the CSV file at @p csvFilePath.
///
/// Only stores the path and resets the run flag and the next execution time;
/// the file itself is opened later by Initialize().
///
/// @param csvFilePath Path of the CSV file to replay.
CSVDataInjectThread::CSVDataInjectThread(std::string csvFilePath)
    // The path is received by value, so move it into the member instead of copying again.
    : m_csvFilePath(std::move(csvFilePath)), m_running(false), m_nextExecuteTime(0)
{
}
/// Destructor: delegates all cleanup to stop(), so the worker thread is joined,
/// the CSV file is closed and the monitors are released even when the owner
/// never called stop() explicitly.
CSVDataInjectThread::~CSVDataInjectThread()
{
stop();
}
/// Opens the CSV file, parses its header row and acquires one data monitor for
/// every struct listed in @p injectDataInfos.
///
/// @param injectDataInfos Structs (with their interface names and array sizes)
///                        whose values are to be injected from the CSV file.
/// @return true on success; false when the file cannot be opened, the header
///         row cannot be read, or DataMonitorFactory returns no monitor for one
///         of the requested structs.
bool CSVDataInjectThread::Initialize(std::vector<MonitorDataInfo> injectDataInfos)
{
    m_csvFile.open(m_csvFilePath);
    if (!m_csvFile.is_open()) {
        return false;
    }

    // The argument is already a private copy; hand its storage over rather than copying twice.
    m_injectDataInfos = std::move(injectDataInfos);

    std::string headerLine;
    if (!std::getline(m_csvFile, headerLine)) {
        return false;
    }

    // Parse the header row. The first column is the timestamp and carries no interface name.
    std::stringstream headerStream(headerLine);
    std::string field;
    std::getline(headerStream, field, ',');

    static const char *const kWhitespace = " \t\r\n";
    // getline also yields the last cell, which is not followed by a comma.
    while (std::getline(headerStream, field, ',')) {
        // Strip surrounding whitespace (this also removes the '\r' of CRLF files).
        field.erase(0, field.find_first_not_of(kWhitespace));
        field.erase(field.find_last_not_of(kWhitespace) + 1);
        if (!field.empty()) {
            parseHeaderField(field);
        }
    }

    // Sort the monitors by whether somebody else had already initialized them:
    // only the ones initialized here are released again in stop().
    for (const auto &injectDataInfo : m_injectDataInfos) {
        auto dataMonitor = DataMonitorFactory::GetInstance(injectDataInfo.structName);
        if (dataMonitor == nullptr) {
            return false;
        }
        if (dataMonitor->isInitialized()) {
            m_alreadyStartedMonitors[injectDataInfo.structName] = dataMonitor;
        } else {
            dataMonitor->Initialize(nullptr, 0, 0);
            m_notStartedMonitors[injectDataInfo.structName] = dataMonitor;
        }
    }
    return true;
}
/// Parses one CSV header cell and, when it names an interface of a struct in
/// m_injectDataInfos, appends the resulting descriptor to m_headerFields.
///
/// Accepted forms (indices are 1-based in the file and stored 0-based):
///   - `name`       scalar value
///   - `name(i)`    element of a one-dimensional array
///   - `name(i_j)`  element of a two-dimensional array; the indices are separated
///                  by '_' (presumably because ',' is the CSV delimiter)
///
/// The cell is silently skipped when an index is not a number, an index is
/// smaller than 1, or no struct declares an interface with that name.
///
/// @param headerField Header cell text, already stripped of surrounding whitespace.
void CSVDataInjectThread::parseHeaderField(const std::string &headerField)
{
    const size_t openParenPos = headerField.find('(');

    CSVHeaderField csvHeaderField;
    csvHeaderField.fieldName = headerField.substr(0, openParenPos);
    csvHeaderField.arrayDim = 0;
    csvHeaderField.arrayIndex1 = 0;
    csvHeaderField.arrayIndex2 = 0;

    if (openParenPos != std::string::npos) {
        // Take the text between the parentheses; without a ')' use everything after '('.
        const size_t closeParenPos = headerField.find(')', openParenPos);
        const size_t indexLength = (closeParenPos == std::string::npos)
                                       ? std::string::npos
                                       : closeParenPos - openParenPos - 1;
        const std::string indexStr = headerField.substr(openParenPos + 1, indexLength);
        const size_t separatorPos = indexStr.find('_');

        try {
            if (separatorPos != std::string::npos) {
                // Two-dimensional array element.
                const int firstIndex = std::stoi(indexStr.substr(0, separatorPos));
                if (firstIndex < 1) {
                    return;
                }
                const int secondIndex = std::stoi(indexStr.substr(separatorPos + 1));
                if (secondIndex < 1) {
                    return;
                }
                csvHeaderField.arrayIndex1 = firstIndex - 1;
                csvHeaderField.arrayIndex2 = secondIndex - 1;
                csvHeaderField.arrayDim = 2;
            } else {
                // One-dimensional array element.
                const int firstIndex = std::stoi(indexStr);
                if (firstIndex < 1) {
                    return;
                }
                csvHeaderField.arrayIndex1 = firstIndex - 1;
                csvHeaderField.arrayDim = 1;
            }
        } catch (const std::exception &) {
            // std::stoi throws on non-numeric or out-of-range text; a malformed
            // header cell must not abort Initialize(), so just ignore the column.
            return;
        }
    }

    // Find the struct that owns this interface. Only the inner loop is left on a
    // match, so when several structs declare the same interface name the last
    // one in m_injectDataInfos wins.
    for (const auto &injectDataInfo : m_injectDataInfos) {
        for (size_t i = 0; i < injectDataInfo.interfaceNames.size(); ++i) {
            if (injectDataInfo.interfaceNames[i] != csvHeaderField.fieldName) {
                continue;
            }
            csvHeaderField.structName = injectDataInfo.structName;
            // arraySize2 is the row stride used for 2-D indexing; 0 means "no second dimension".
            if (injectDataInfo.arraySizes[i].second > 1) {
                csvHeaderField.arraySize2 = injectDataInfo.arraySizes[i].second;
            } else {
                csvHeaderField.arraySize2 = 0;
            }
            break;
        }
    }

    if (csvHeaderField.structName.empty()) {
        return;
    }
    m_headerFields.push_back(csvHeaderField);
}
/// Starts the worker thread that replays the CSV file; does nothing when the
/// worker is already running.
void CSVDataInjectThread::start()
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_running) {
        return;
    }
    // A worker that reached the end of the file clears m_running by itself but is
    // never joined. Assigning a new std::thread over a still-joinable one calls
    // std::terminate, so reap the finished worker first.
    if (m_thread.joinable()) {
        m_thread.join();
    }
    m_running = true;
    m_thread = std::thread(&CSVDataInjectThread::threadFunc, this);
}
void CSVDataInjectThread::stop()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_running) {
m_running = false;
m_cv.notify_all();
}
}
if (m_thread.joinable()) {
m_thread.join();
}
// 关闭文件
if (m_csvFile.is_open()) {
m_csvFile.close();
}
// 释放未启动的监控器
for (const auto &[structName, dataMonitor] : m_notStartedMonitors) {
DataMonitorFactory::ReleaseInstance(structName);
}
m_notStartedMonitors.clear();
m_alreadyStartedMonitors.clear();
}
/// Reads the next CSV row and converts it into the per-struct string data held
/// in m_data, which threadFunc() passes to the monitors.
///
/// - At end of file m_running is cleared so the worker loop terminates.
/// - When the first column is not a number a message is printed and the row is
///   skipped; m_nextExecuteTime and m_data keep their previous contents.
/// - Otherwise m_nextExecuteTime is set to the timestamp multiplied by 1000
///   (milliseconds) and every interface value is rebuilt: elements start as "0"
///   and are overwritten by the cells present in the row. Array interfaces are
///   stored as one comma-separated string.
///
/// NOTE(review): cells are consumed positionally, one per m_headerFields entry.
/// parseHeaderField() drops header columns it cannot match, so a CSV file that
/// contains such columns has its following values shifted. Fixing this needs a
/// column index in CSVHeaderField.
void CSVDataInjectThread::updateData()
{
    // Fetch the next row; running out of rows ends the replay.
    std::string line;
    if (!std::getline(m_csvFile, line)) {
        m_running = false;
        return;
    }

    std::stringstream lineStream(line);
    std::string field;

    // First column: timestamp.
    std::getline(lineStream, field, ',');
    double timeStamp = 0;
    try {
        timeStamp = std::stod(field);
    } catch (const std::exception &) {
        std::cout << "field: " << field << " is not a number" << std::endl;
        return;
    }
    m_nextExecuteTime = static_cast<int64_t>(timeStamp * 1000); // convert to milliseconds

    // Pre-fill every interface of every struct with "0" so that elements missing
    // from the CSV file still get a value.
    std::unordered_map<std::string, std::unordered_map<std::string, std::vector<std::string>>>
        tempDataMap;
    for (const auto &injectDataInfo : m_injectDataInfos) {
        auto &dataMap = tempDataMap[injectDataInfo.structName];
        dataMap.clear(); // a repeated struct name starts over, as with a fresh map
        for (size_t i = 0; i < injectDataInfo.interfaceNames.size(); ++i) {
            const auto &dims = injectDataInfo.arraySizes[i];
            size_t elementCount = 1;
            if (dims.first > 1) {
                elementCount = static_cast<size_t>(dims.first);
                if (dims.second > 1) {
                    elementCount *= static_cast<size_t>(dims.second);
                }
            }
            auto &values = dataMap[injectDataInfo.interfaceNames[i]];
            values.insert(values.end(), elementCount, "0");
        }
    }

    static const char *const kWhitespace = " \t\r\n";
    const auto trim = [](std::string &text) {
        text.erase(0, text.find_first_not_of(kWhitespace));
        text.erase(text.find_last_not_of(kWhitespace) + 1);
    };

    // Copy the row's cells into their slots.
    for (const auto &header : m_headerFields) {
        if (!std::getline(lineStream, field, ',')) {
            // Short row: a failed getline leaves `field` untouched, so stop here
            // instead of re-using the previous cell; the rest stays "0".
            break;
        }
        // Same trimming as for the header row (drops the '\r' of CRLF files).
        trim(field);
        if (field.empty()) {
            continue; // empty cell: keep the "0" default
        }

        const auto structIt = tempDataMap.find(header.structName);
        if (structIt == tempDataMap.end()) {
            continue;
        }
        const auto valuesIt = structIt->second.find(header.fieldName);
        if (valuesIt == structIt->second.end()) {
            continue;
        }
        auto &values = valuesIt->second;

        size_t slot = 0;
        if (header.arrayDim == 1) {
            slot = static_cast<size_t>(header.arrayIndex1);
        } else if (header.arrayDim == 2) {
            // Row-major layout: arraySize2 is the row length.
            slot = static_cast<size_t>(header.arrayIndex1) * static_cast<size_t>(header.arraySize2)
                   + static_cast<size_t>(header.arrayIndex2);
        } else if (header.arrayDim != 0) {
            continue;
        }

        // The indices come from the CSV header and may exceed the declared array
        // size; never write outside the vector.
        if (slot < values.size()) {
            values[slot] = field;
        }
    }

    // Flatten each interface's elements into one comma-separated string and publish to m_data.
    for (const auto &[structName, dataMap] : tempDataMap) {
        std::unordered_map<std::string, std::string> structData;
        for (const auto &[interfaceName, values] : dataMap) {
            if (values.empty()) {
                structData[interfaceName] = "0";
                continue;
            }
            std::string joined = values.front();
            for (size_t i = 1; i < values.size(); ++i) {
                joined += ',';
                joined += values[i];
            }
            structData[interfaceName] = std::move(joined);
        }
        m_data[structName] = std::move(structData);
    }
}
/// Returns the current value of the run flag: set by start(), cleared by stop()
/// and by updateData() once the CSV file has no more rows.
bool CSVDataInjectThread::isRunning() const
{
return m_running;
}
void CSVDataInjectThread::threadFunc()
{
// 读取第一行数据
updateData();
auto startTime = std::chrono::steady_clock::now();
while (m_running) {
int64_t nextTime = m_nextExecuteTime;
// 等待直到到达执行时间
auto now = std::chrono::steady_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count();
auto targetTime = startTime + std::chrono::milliseconds(nextTime);
if (now < targetTime) {
std::this_thread::sleep_until(targetTime);
}
// 执行数据注入
for (const auto &monitorDataInfo : m_injectDataInfos) {
DataMonitorBasePtr dataMonitor;
if (m_notStartedMonitors.find(monitorDataInfo.structName)
!= m_notStartedMonitors.end()) {
dataMonitor = m_notStartedMonitors[monitorDataInfo.structName];
} else if (m_alreadyStartedMonitors.find(monitorDataInfo.structName)
!= m_alreadyStartedMonitors.end()) {
dataMonitor = m_alreadyStartedMonitors[monitorDataInfo.structName];
}
if (dataMonitor && m_data.find(monitorDataInfo.structName) != m_data.end()) {
dataMonitor->setDataByString(m_data[monitorDataInfo.structName]);
}
}
// 读取下一行数据
updateData();
}
}