XNSim/XNMonitorServer/CSVDataInjectThread.cpp

260 lines
7.1 KiB
C++
Raw Normal View History

#include "CSVDataInjectThread.h"

#include <cstddef>
#include <exception>
#include <mutex>
#include <utility>

#include "DataMonitorFactory.h"
/**
 * @brief Creates an injector bound to the given CSV file.
 *
 * Only records the path and resets the run state; the file itself is opened
 * later by Initialize().
 *
 * @param csvFilePath Path of the CSV file that Initialize() will open.
 */
CSVDataInjectThread::CSVDataInjectThread(std::string csvFilePath)
    // The parameter is already a private copy, so move it instead of copying again.
    : m_csvFilePath(std::move(csvFilePath)), m_running(false), m_nextExecuteTime(0)
{
}
/**
 * @brief Tears the injector down by delegating all cleanup to stop().
 */
CSVDataInjectThread::~CSVDataInjectThread()
{
    this->stop();
}
/**
 * @brief Opens the CSV file, parses its header row and acquires the data monitors.
 *
 * The first header column is treated as the timestamp column and skipped; every
 * other non-empty column name is handed to parseHeaderField(). Afterwards one
 * monitor per entry of @p injectDataInfos is fetched from DataMonitorFactory and
 * filed under m_alreadyStartedMonitors or m_notStartedMonitors depending on
 * what isInitialized() reports.
 *
 * @param injectDataInfos Structures (with their interface names and array sizes)
 *                        that should receive data from the CSV file.
 * @return true on success; false if the file cannot be opened, has no header
 *         line, or a monitor cannot be obtained for one of the structures.
 */
bool CSVDataInjectThread::Initialize(std::vector<InjectDataInfo> injectDataInfos)
{
    m_csvFile.open(m_csvFilePath);
    if (!m_csvFile.is_open()) {
        return false;
    }

    // The parameter is already a private copy: take over its storage instead of
    // deep-copying every InjectDataInfo a second time.
    m_injectDataInfos = std::move(injectDataInfos);

    std::string headerLine;
    if (!std::getline(m_csvFile, headerLine)) {
        return false;
    }

    // Parse the CSV header.
    std::stringstream headerStream(headerLine);
    std::string field;
    std::getline(headerStream, field, ','); // skip the first column (timestamp)

    // getline() also yields the last column, which has no trailing comma.
    while (std::getline(headerStream, field, ',')) {
        // Strip surrounding whitespace (including the '\r' of CRLF files).
        field.erase(0, field.find_first_not_of(" \t\r\n"));
        field.erase(field.find_last_not_of(" \t\r\n") + 1);
        if (!field.empty()) {
            parseHeaderField(field);
        }
    }

    for (const auto &injectDataInfo : m_injectDataInfos) {
        auto dataMonitor = DataMonitorFactory::GetInstance(injectDataInfo.structName);
        if (dataMonitor == nullptr) {
            return false;
        }
        if (dataMonitor->isInitialized()) {
            m_alreadyStartedMonitors[injectDataInfo.structName] = dataMonitor;
        } else {
            m_notStartedMonitors[injectDataInfo.structName] = dataMonitor;
        }
    }
    return true;
}
/**
 * @brief Translates one CSV header column name into an entry of m_headerFields.
 *
 * Accepted forms are `name` (scalar), `name(i)` (1-D array element) and
 * `name(i_j)` (2-D array element). Indices in the file are 1-based and are
 * stored 0-based. The two indices are separated by '_' rather than ',' because
 * ',' is the CSV column separator.
 *
 * Exactly one entry is appended per call. updateData() reads one CSV cell per
 * entry of m_headerFields and only stores cells whose arrayDim is 0, 1 or 2.
 * A column that cannot be mapped (unknown interface name, unparsable or
 * non-positive index) is therefore recorded as a placeholder with an empty
 * structName and arrayDim = -1: its cell is still consumed but never stored,
 * so the columns that follow it stay aligned with their data.
 *
 * @param headerField Trimmed column name taken from the CSV header row.
 */
void CSVDataInjectThread::parseHeaderField(const std::string &headerField)
{
    // arrayDim value of a column that is present in the file but not injected.
    constexpr int kIgnoredColumnDim = -1;

    const std::size_t openParenPos = headerField.find('(');

    CSVHeaderField csvHeaderField;
    csvHeaderField.fieldName = headerField.substr(0, openParenPos);
    csvHeaderField.arrayDim = 0;
    csvHeaderField.arrayIndex1 = 0;
    csvHeaderField.arrayIndex2 = 0;
    csvHeaderField.arraySize1 = 0;

    bool usable = true;

    if (openParenPos != std::string::npos) {
        // Array element: interfaceName(1) or interfaceName(1_2).
        // A missing ')' makes the count wrap to a huge value, so the rest of
        // the string is taken as the index text.
        const std::size_t closeParenPos = headerField.find(')', openParenPos + 1);
        const std::string indexStr =
            headerField.substr(openParenPos + 1, closeParenPos - openParenPos - 1);
        const std::size_t separatorPos = indexStr.find('_');

        try {
            if (separatorPos != std::string::npos) {
                // Two-dimensional array element.
                csvHeaderField.arrayIndex1 = std::stoi(indexStr.substr(0, separatorPos)) - 1;
                csvHeaderField.arrayIndex2 = std::stoi(indexStr.substr(separatorPos + 1)) - 1;
                csvHeaderField.arrayDim = 2;
            } else {
                // One-dimensional array element.
                csvHeaderField.arrayIndex1 = std::stoi(indexStr) - 1;
                csvHeaderField.arrayDim = 1;
            }
        } catch (const std::exception &) {
            // std::stoi throws on text such as "" or "x"; a malformed header
            // must not abort initialization.
            usable = false;
        }

        if (usable && (csvHeaderField.arrayIndex1 < 0 || csvHeaderField.arrayIndex2 < 0)) {
            usable = false; // file indices start at 1
        }
    }

    if (usable) {
        // Every structure is scanned, so if several structures expose the same
        // interface name the last one wins.
        for (const auto &injectDataInfo : m_injectDataInfos) {
            for (std::size_t i = 0; i < injectDataInfo.interfaceNames.size(); ++i) {
                if (injectDataInfo.interfaceNames[i] == csvHeaderField.fieldName) {
                    csvHeaderField.structName = injectDataInfo.structName;
                    if (injectDataInfo.arraySizes[i].second > 1) {
                        // updateData() computes the flat position of a 2-D
                        // element as arrayIndex1 * arraySize1 + arrayIndex2, so
                        // this must be the row stride, i.e. the size of the
                        // SECOND dimension. Using the first dimension makes
                        // elements of non-square arrays collide or fall outside
                        // the value list.
                        // NOTE(review): assumes the consumer expects row-major
                        // order - confirm against setDataByString().
                        csvHeaderField.arraySize1 = injectDataInfo.arraySizes[i].second;
                    } else {
                        csvHeaderField.arraySize1 = 0;
                    }
                    break;
                }
            }
        }
        usable = !csvHeaderField.structName.empty();
    }

    if (!usable) {
        csvHeaderField.structName.clear();
        csvHeaderField.arrayDim = kIgnoredColumnDim;
    }
    m_headerFields.push_back(csvHeaderField);
}
/**
 * @brief Starts the injection thread unless it is already running.
 *
 * When the worker has ended by itself (updateData() clears m_running at the end
 * of the file), m_thread is still joinable. Assigning a new std::thread to a
 * joinable one calls std::terminate(), so the finished worker is joined first.
 */
void CSVDataInjectThread::start()
{
    std::unique_lock<std::mutex> lock(m_mutex);
    if (m_running) {
        return;
    }

    if (m_thread.joinable()) {
        // Join without holding the mutex: joining with it held would deadlock
        // if the worker ever needs m_mutex on its way out.
        lock.unlock();
        m_thread.join();
        lock.lock();
        if (m_running) {
            return; // started by someone else in the meantime
        }
    }

    m_running = true;
    m_thread = std::thread(&CSVDataInjectThread::threadFunc, this);
}
/**
 * @brief Stops the injection thread and releases what the injector holds.
 *
 * Clears the running flag (waking anything waiting on m_cv), joins the worker,
 * closes the CSV file, hands the monitors in m_notStartedMonitors back to
 * DataMonitorFactory and empties both monitor maps. Monitors in
 * m_alreadyStartedMonitors are only dropped from the map, not released.
 */
void CSVDataInjectThread::stop()
{
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_running) {
            m_running = false;
            m_cv.notify_all();
        }
    }

    if (m_thread.joinable()) {
        m_thread.join();
    }

    // Close the file.
    if (m_csvFile.is_open()) {
        m_csvFile.close();
    }

    // Release the monitors that had not been started.
    for (const auto &entry : m_notStartedMonitors) {
        DataMonitorFactory::ReleaseInstance(entry.first);
    }

    m_notStartedMonitors.clear();
    m_alreadyStartedMonitors.clear();
}
/**
 * @brief Reads the next CSV row and prepares it in m_data for injection.
 *
 * The first cell is the timestamp (seconds); it is stored in m_nextExecuteTime
 * as milliseconds. Every interface of every structure starts out as "0" (one
 * "0" per array element) and is then overwritten by the cells of the row, which
 * are matched positionally against m_headerFields. Array values are finally
 * joined with ',' into one string per interface.
 *
 * Clears m_running when the file is exhausted or the timestamp cannot be
 * parsed. Blank lines are skipped. Cells whose header entry is not a scalar,
 * 1-D or 2-D element, or whose index lies outside the interface's value list,
 * are read but not stored. Cells missing from a short row keep their "0".
 */
void CSVDataInjectThread::updateData()
{
    // Fetch the next non-blank line.
    std::string line;
    for (;;) {
        if (!std::getline(m_csvFile, line)) {
            // End of file: stop the thread.
            m_running = false;
            return;
        }
        // Drop trailing whitespace, in particular the '\r' of CRLF files.
        const std::size_t lastUseful = line.find_last_not_of(" \t\r\n");
        line.erase(lastUseful == std::string::npos ? 0 : lastUseful + 1);
        if (!line.empty()) {
            break;
        }
    }

    std::stringstream rowStream(line);
    std::string cell;
    std::getline(rowStream, cell, ',');

    double timeStamp = 0.0;
    try {
        timeStamp = std::stod(cell);
    } catch (const std::exception &) {
        // std::stod throws on a malformed timestamp. This runs on the worker
        // thread, where an escaping exception would terminate the process, so
        // treat the row as the end of usable data instead.
        m_running = false;
        return;
    }
    m_nextExecuteTime = static_cast<int64_t>(timeStamp * 1000); // seconds -> milliseconds

    // Default every interface of every structure to "0".
    std::unordered_map<std::string, std::unordered_map<std::string, std::vector<std::string>>>
        rowValues;
    for (const auto &injectDataInfo : m_injectDataInfos) {
        std::unordered_map<std::string, std::vector<std::string>> dataMap;
        for (std::size_t i = 0; i < injectDataInfo.interfaceNames.size(); ++i) {
            const auto &dims = injectDataInfo.arraySizes[i];
            // Scalars get one slot, 1-D arrays `first`, 2-D arrays `first * second`.
            std::size_t slotCount = 1;
            if (dims.first > 1) {
                slotCount = static_cast<std::size_t>(dims.first);
                if (dims.second > 1) {
                    slotCount *= static_cast<std::size_t>(dims.second);
                }
            }
            auto &slots = dataMap[injectDataInfo.interfaceNames[i]];
            slots.insert(slots.end(), slotCount, "0");
        }
        rowValues[injectDataInfo.structName] = std::move(dataMap);
    }

    // Distribute the cells of this row, one per header entry.
    for (const auto &header : m_headerFields) {
        if (!std::getline(rowStream, cell, ',')) {
            // Short row: a failed getline() leaves `cell` untouched, so going
            // on would copy the previous cell into the remaining columns.
            break;
        }

        long long flatIndex = 0;
        if (header.arrayDim == 0) {
            flatIndex = 0;
        } else if (header.arrayDim == 1) {
            flatIndex = static_cast<long long>(header.arrayIndex1);
        } else if (header.arrayDim == 2) {
            flatIndex = static_cast<long long>(header.arrayIndex1)
                            * static_cast<long long>(header.arraySize1)
                        + static_cast<long long>(header.arrayIndex2);
        } else {
            continue; // not an injectable column; its cell is simply consumed
        }

        const auto structIt = rowValues.find(header.structName);
        if (structIt == rowValues.end()) {
            continue;
        }
        const auto fieldIt = structIt->second.find(header.fieldName);
        if (fieldIt == structIt->second.end()) {
            continue;
        }
        auto &slots = fieldIt->second;
        // The index comes from the CSV header and may not fit the declared
        // array size; never write outside the value list.
        if (flatIndex < 0 || static_cast<unsigned long long>(flatIndex) >= slots.size()) {
            continue;
        }
        slots[static_cast<std::size_t>(flatIndex)] = cell;
    }

    // Convert the per-element values into the m_data format.
    for (const auto &[structName, dataMap] : rowValues) {
        std::unordered_map<std::string, std::string> structData;
        for (const auto &[interfaceName, values] : dataMap) {
            std::string joined;
            if (values.empty()) {
                joined = "0";
            } else {
                for (std::size_t i = 0; i < values.size(); ++i) {
                    if (i != 0) {
                        joined += ',';
                    }
                    joined += values[i];
                }
            }
            structData[interfaceName] = std::move(joined);
        }
        m_data[structName] = std::move(structData);
    }
}
bool CSVDataInjectThread::isRunning() const
{
return m_running;
}
void CSVDataInjectThread::threadFunc()
{
// 读取第一行数据
updateData();
auto startTime = std::chrono::steady_clock::now();
while (m_running) {
int64_t nextTime = m_nextExecuteTime;
// 等待直到到达执行时间
auto now = std::chrono::steady_clock::now();
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count();
auto targetTime = startTime + std::chrono::milliseconds(nextTime);
if (now < targetTime) {
std::this_thread::sleep_until(targetTime);
}
// 执行数据注入
for (const auto &[structName, dataMonitor] : m_alreadyStartedMonitors) {
if (dataMonitor && m_data.find(structName) != m_data.end()) {
dataMonitor->setDataByString(m_data[structName]);
}
}
// 读取下一行数据
updateData();
}
}