#include "CSVDataInjectThread.h"
#include "DataMonitorFactory.h"

#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// NOTE(review): the template argument lists in this file were reconstructed.
// The following are assumptions to confirm against CSVDataInjectThread.h:
//   - Initialize() takes std::vector<InjectDataInfo>;
//   - m_mutex is a std::mutex and m_cv a std::condition_variable;
//   - m_nextExecuteTime holds an int64_t number of milliseconds;
//   - m_data maps struct name -> (interface name -> comma-joined values).

namespace
{

/// Removes leading and trailing spaces, tabs, CR and LF from @p text in place.
void trimInPlace(std::string &text)
{
    static const char *const kWhitespace = " \t\r\n";
    text.erase(0, text.find_first_not_of(kWhitespace));
    text.erase(text.find_last_not_of(kWhitespace) + 1);
}

/// Removes a trailing CR/LF sequence so CRLF files do not leak '\r' into the last cell.
void stripLineEnding(std::string &line)
{
    while (!line.empty() && (line.back() == '\r' || line.back() == '\n')) {
        line.pop_back();
    }
}

} // namespace

/// Stores the CSV path; the file is not opened until Initialize() is called.
CSVDataInjectThread::CSVDataInjectThread(std::string csvFilePath)
    : m_csvFilePath(std::move(csvFilePath)), m_running(false), m_nextExecuteTime(0)
{
}

/// Stops the worker (if any) and releases the resources held by this object.
CSVDataInjectThread::~CSVDataInjectThread()
{
    stop();
}

/// Opens the CSV file, parses its header row and looks up one data monitor per
/// requested struct.
///
/// @param injectDataInfos  Structs/interfaces that should receive CSV data.
/// @return false when the file cannot be opened, has no header row, contains a
///         malformed array index on a column that is being injected, or when a
///         data monitor cannot be obtained; true otherwise.
bool CSVDataInjectThread::Initialize(std::vector<InjectDataInfo> injectDataInfos)
{
    m_csvFile.open(m_csvFilePath);
    if (!m_csvFile.is_open()) {
        return false;
    }

    m_injectDataInfos = std::move(injectDataInfos);

    std::string headerLine;
    if (!std::getline(m_csvFile, headerLine)) {
        return false;
    }

    // Header fields are positional (one entry per CSV column), so start from a
    // clean list in case Initialize() is called more than once.
    m_headerFields.clear();

    std::stringstream headerStream(headerLine);
    std::string field;
    std::getline(headerStream, field, ','); // Skip the first column (timestamp).

    try {
        // Every remaining column is registered, including empty or unknown
        // ones, so that header entries stay aligned with the data columns.
        while (std::getline(headerStream, field, ',')) {
            trimInPlace(field);
            parseHeaderField(field);
        }
    } catch (const std::exception &) {
        // std::stoi rejected an array index such as "name[abc]".
        return false;
    }

    for (const auto &injectDataInfo : m_injectDataInfos) {
        auto dataMonitor = DataMonitorFactory::GetInstance(injectDataInfo.structName);
        if (dataMonitor == nullptr) {
            return false;
        }
        if (dataMonitor->isInitialized()) {
            m_alreadyStartedMonitors[injectDataInfo.structName] = dataMonitor;
        } else {
            m_notStartedMonitors[injectDataInfo.structName] = dataMonitor;
        }
    }

    return true;
}

/// Parses one header cell ("name", "name[i]" or "name[i][j]") and appends the
/// result to m_headerFields.
///
/// Exactly one entry is appended per call. Columns whose name matches no
/// requested interface are appended with an empty structName; updateData()
/// skips those entries but still consumes their CSV cell, which keeps every
/// later column aligned with its header.
///
/// @throws std::invalid_argument / std::out_of_range (from std::stoi) when a
///         matched column carries a non-numeric array index.
void CSVDataInjectThread::parseHeaderField(const std::string &headerField)
{
    const std::size_t firstBracketPos = headerField.find('[');

    CSVHeaderField csvHeaderField;
    csvHeaderField.fieldName = headerField.substr(0, firstBracketPos);
    csvHeaderField.arrayDim = 0;
    csvHeaderField.arrayIndex1 = 0;
    csvHeaderField.arrayIndex2 = 0;
    csvHeaderField.arraySize1 = 0;

    // Find the owning struct. As before, when several structs declare the same
    // interface name the last one in m_injectDataInfos wins.
    for (const auto &injectDataInfo : m_injectDataInfos) {
        for (std::size_t i = 0; i < injectDataInfo.interfaceNames.size(); ++i) {
            if (injectDataInfo.interfaceNames[i] != csvHeaderField.fieldName) {
                continue;
            }
            csvHeaderField.structName = injectDataInfo.structName;
            // arraySize1 is the row stride used to flatten [i][j]. updateData()
            // lays the values out as `first` rows of `second` cells, so the
            // stride has to be the second dimension (1 for column vectors).
            const auto secondDim = injectDataInfo.arraySizes[i].second;
            csvHeaderField.arraySize1 = secondDim > 1 ? secondDim : 1;
            break;
        }
    }

    if (csvHeaderField.structName.empty()) {
        // Unknown column: keep a placeholder so column positions stay aligned.
        m_headerFields.push_back(csvHeaderField);
        return;
    }

    if (firstBracketPos != std::string::npos) {
        // One-dimensional index.
        const std::size_t firstBracketEndPos = headerField.find(']', firstBracketPos);
        csvHeaderField.arrayIndex1 = std::stoi(
            headerField.substr(firstBracketPos + 1, firstBracketEndPos - firstBracketPos - 1));
        csvHeaderField.arrayDim = 1;

        // A second bracket pair makes it a two-dimensional index.
        const std::size_t secondBracketPos = headerField.find('[', firstBracketEndPos);
        if (secondBracketPos != std::string::npos) {
            const std::size_t secondBracketEndPos = headerField.find(']', secondBracketPos);
            csvHeaderField.arrayIndex2 = std::stoi(headerField.substr(
                secondBracketPos + 1, secondBracketEndPos - secondBracketPos - 1));
            csvHeaderField.arrayDim = 2;
        }
    }

    m_headerFields.push_back(csvHeaderField);
}

/// Starts the injection worker if it is not already running.
void CSVDataInjectThread::start()
{
    std::unique_lock<std::mutex> lock(m_mutex);
    if (m_running) {
        return;
    }

    // The previous worker may have ended on its own at end-of-file and never
    // been joined. Assigning over a joinable std::thread calls std::terminate,
    // so reap it first. The join happens without the lock because the worker
    // takes m_mutex while waiting for its next deadline.
    if (m_thread.joinable()) {
        lock.unlock();
        m_thread.join();
        lock.lock();
        if (m_running) {
            return;
        }
    }

    m_running = true;
    m_thread = std::thread(&CSVDataInjectThread::threadFunc, this);
}

/// Stops the worker, waits for it to finish, closes the CSV file and releases
/// the monitors that were recorded as not yet initialized.
void CSVDataInjectThread::stop()
{
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_running) {
            m_running = false;
            m_cv.notify_all(); // Wakes the worker out of its timed wait.
        }
    }

    if (m_thread.joinable()) {
        m_thread.join();
    }

    // Close the file.
    if (m_csvFile.is_open()) {
        m_csvFile.close();
    }

    // Release the monitors that had not been started.
    for (const auto &entry : m_notStartedMonitors) {
        DataMonitorFactory::ReleaseInstance(entry.first);
    }
    m_notStartedMonitors.clear();
    m_alreadyStartedMonitors.clear();
}

/// Reads the next CSV row into m_data and its timestamp into m_nextExecuteTime.
///
/// Blank lines are skipped. m_running is cleared (which ends the worker loop)
/// when the file is exhausted or the timestamp cell is not a finite number.
/// Cells that are missing from a short row, or whose header index falls outside
/// the declared array size, keep the default value "0".
void CSVDataInjectThread::updateData()
{
    // Read the next non-blank row.
    std::string line;
    do {
        if (!std::getline(m_csvFile, line)) {
            // End of file: stop the thread.
            m_running = false;
            return;
        }
        stripLineEnding(line);
    } while (line.find_first_not_of(" \t") == std::string::npos);

    std::stringstream rowStream(line);
    std::string field;
    std::getline(rowStream, field, ',');

    // std::stod throws on non-numeric text; an exception must not escape the
    // worker thread, so a bad timestamp ends the injection instead.
    double timeStamp = 0.0;
    try {
        timeStamp = std::stod(field);
    } catch (const std::exception &) {
        m_running = false;
        return;
    }
    if (!std::isfinite(timeStamp)) {
        m_running = false;
        return;
    }
    m_nextExecuteTime = static_cast<int64_t>(timeStamp * 1000); // Seconds -> milliseconds.

    // Pre-fill every declared interface with "0" (one entry per array element).
    std::unordered_map<std::string, std::unordered_map<std::string, std::vector<std::string>>>
        tempDataMap;
    for (const auto &injectDataInfo : m_injectDataInfos) {
        std::unordered_map<std::string, std::vector<std::string>> dataMap;
        for (std::size_t i = 0; i < injectDataInfo.interfaceNames.size(); ++i) {
            const auto &dims = injectDataInfo.arraySizes[i];
            std::size_t count = 1;
            if (dims.first > 1) {
                count = static_cast<std::size_t>(dims.first);
                if (dims.second > 1) {
                    count *= static_cast<std::size_t>(dims.second);
                }
            }
            auto &values = dataMap[injectDataInfo.interfaceNames[i]];
            values.insert(values.end(), count, std::string("0"));
        }
        tempDataMap[injectDataInfo.structName] = std::move(dataMap);
    }

    // Distribute the row's cells; m_headerFields has one entry per CSV column.
    for (const auto &headerField : m_headerFields) {
        if (!std::getline(rowStream, field, ',')) {
            break; // Short row: the remaining values keep their defaults.
        }
        if (headerField.structName.empty()) {
            continue; // Column is not injected anywhere.
        }

        const auto structIt = tempDataMap.find(headerField.structName);
        if (structIt == tempDataMap.end()) {
            continue;
        }
        const auto valuesIt = structIt->second.find(headerField.fieldName);
        if (valuesIt == structIt->second.end()) {
            continue;
        }
        auto &values = valuesIt->second;

        long long flatIndex = 0;
        if (headerField.arrayDim == 1) {
            flatIndex = headerField.arrayIndex1;
        } else if (headerField.arrayDim == 2) {
            flatIndex = static_cast<long long>(headerField.arrayIndex1) * headerField.arraySize1
                        + headerField.arrayIndex2;
        } else if (headerField.arrayDim != 0) {
            continue;
        }

        // Ignore indices outside the declared array instead of writing past
        // the end of the vector.
        if (flatIndex < 0 || static_cast<std::size_t>(flatIndex) >= values.size()) {
            continue;
        }
        values[static_cast<std::size_t>(flatIndex)] = field;
    }

    // Convert tempDataMap into the m_data format: values joined with ','.
    for (auto &structEntry : tempDataMap) {
        std::unordered_map<std::string, std::string> structData;
        for (const auto &interfaceEntry : structEntry.second) {
            const auto &values = interfaceEntry.second;
            if (values.empty()) {
                structData[interfaceEntry.first] = "0";
                continue;
            }
            std::string joined = values.front();
            for (std::size_t i = 1; i < values.size(); ++i) {
                joined += ',';
                joined += values[i];
            }
            structData[interfaceEntry.first] = std::move(joined);
        }
        m_data[structEntry.first] = std::move(structData);
    }
}

/// Returns the current value of the running flag.
bool CSVDataInjectThread::isRunning() const
{
    return m_running;
}

/// Worker body: replays the CSV rows, injecting each one when its timestamp
/// (relative to the moment the worker started) is reached.
void CSVDataInjectThread::threadFunc()
{
    // Read the first data row.
    updateData();

    const auto startTime = std::chrono::steady_clock::now();

    while (m_running) {
        const int64_t nextTime = m_nextExecuteTime;
        const auto targetTime = startTime + std::chrono::milliseconds(nextTime);

        // Wait for the row's deadline, but wake immediately when stop() clears
        // m_running and notifies m_cv, so stop() does not block for the whole
        // gap between two rows.
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cv.wait_until(lock, targetTime, [this] { return !m_running; });
            if (!m_running) {
                break;
            }
        }

        // Inject the current row into every monitor that was already initialized.
        for (const auto &[structName, dataMonitor] : m_alreadyStartedMonitors) {
            if (!dataMonitor) {
                continue;
            }
            const auto dataIt = m_data.find(structName);
            if (dataIt != m_data.end()) {
                dataMonitor->setDataByString(dataIt->second);
            }
        }

        // Read the next data row.
        updateData();
    }
}