275 lines
7.6 KiB
C++
275 lines
7.6 KiB
C++
#include "CSVDataInjectThread.h"

#include <utility>

#include "DataMonitorFactory.h"
|
||
|
||
/// Creates an injector bound to the CSV file at @p csvFilePath.
///
/// Only the path is stored here; the file itself is opened by Initialize().
/// The object starts out not running, with a next-execute time of 0.
///
/// @param csvFilePath  Path of the CSV file to replay (taken by value and moved
///                     into the member, so an rvalue argument is not copied).
CSVDataInjectThread::CSVDataInjectThread(std::string csvFilePath)
    : m_csvFilePath(std::move(csvFilePath)), m_running(false), m_nextExecuteTime(0)
{
}
|
||
|
||
/// Shuts the injector down on destruction by delegating to stop(), which joins
/// the worker thread, closes the CSV file and releases the monitors this object
/// initialized.
CSVDataInjectThread::~CSVDataInjectThread()
{
    this->stop();
}
|
||
|
||
bool CSVDataInjectThread::Initialize(std::vector<MonitorDataInfo> injectDataInfos)
|
||
{
|
||
m_csvFile.open(m_csvFilePath);
|
||
if (!m_csvFile.is_open()) {
|
||
return false;
|
||
}
|
||
|
||
m_injectDataInfos = injectDataInfos;
|
||
|
||
std::string headerLine;
|
||
if (!std::getline(m_csvFile, headerLine)) {
|
||
return false;
|
||
}
|
||
|
||
// 修改CSV头解析逻辑
|
||
std::stringstream ss(headerLine);
|
||
std::string field;
|
||
std::getline(ss, field, ','); // 跳过第一列时间戳
|
||
|
||
// 使用getline读取所有字段,包括最后一个
|
||
while (std::getline(ss, field, ',')) {
|
||
// 去除字段前后的空白字符
|
||
field.erase(0, field.find_first_not_of(" \t\r\n"));
|
||
field.erase(field.find_last_not_of(" \t\r\n") + 1);
|
||
|
||
if (!field.empty()) {
|
||
parseHeaderField(field);
|
||
}
|
||
}
|
||
|
||
for (const auto &injectDataInfo : m_injectDataInfos) {
|
||
auto dataMonitor = DataMonitorFactory::GetInstance(injectDataInfo.structName);
|
||
if (dataMonitor == nullptr) {
|
||
return false;
|
||
}
|
||
if (dataMonitor->isInitialized()) {
|
||
m_alreadyStartedMonitors[injectDataInfo.structName] = dataMonitor;
|
||
} else {
|
||
dataMonitor->Initialize(nullptr, 0, 0);
|
||
m_notStartedMonitors[injectDataInfo.structName] = dataMonitor;
|
||
}
|
||
}
|
||
return true;
|
||
}
|
||
|
||
void CSVDataInjectThread::parseHeaderField(const std::string &headerField)
|
||
{
|
||
CSVHeaderField csvHeaderField;
|
||
csvHeaderField.fieldName = headerField.substr(0, headerField.find('('));
|
||
csvHeaderField.arrayDim = 0;
|
||
csvHeaderField.arrayIndex1 = 0;
|
||
csvHeaderField.arrayIndex2 = 0;
|
||
|
||
if (headerField.find('(') != std::string::npos) {
|
||
// 处理一维或二维数组,格式为interfaceName(1)或interfaceName(1,2)
|
||
size_t firstParenPos = headerField.find('(');
|
||
size_t lastParenPos = headerField.find(')');
|
||
std::string indexStr =
|
||
headerField.substr(firstParenPos + 1, lastParenPos - firstParenPos - 1);
|
||
if (indexStr.find('_') != std::string::npos) {
|
||
// 二维数组
|
||
size_t commaPos = indexStr.find('_');
|
||
csvHeaderField.arrayIndex1 = std::stoi(indexStr.substr(0, commaPos)) - 1;
|
||
if (csvHeaderField.arrayIndex1 < 0) {
|
||
return;
|
||
}
|
||
csvHeaderField.arrayIndex2 = std::stoi(indexStr.substr(commaPos + 1)) - 1;
|
||
if (csvHeaderField.arrayIndex2 < 0) {
|
||
return;
|
||
}
|
||
csvHeaderField.arrayDim = 2;
|
||
} else {
|
||
// 一维数组
|
||
csvHeaderField.arrayIndex1 = std::stoi(indexStr) - 1;
|
||
if (csvHeaderField.arrayIndex1 < 0) {
|
||
return;
|
||
}
|
||
csvHeaderField.arrayDim = 1;
|
||
}
|
||
}
|
||
|
||
for (const auto &injectDataInfo : m_injectDataInfos) {
|
||
for (int i = 0; i < injectDataInfo.interfaceNames.size(); i++) {
|
||
if (injectDataInfo.interfaceNames[i] == csvHeaderField.fieldName) {
|
||
csvHeaderField.structName = injectDataInfo.structName;
|
||
if (injectDataInfo.arraySizes[i].second > 1) {
|
||
csvHeaderField.arraySize2 = injectDataInfo.arraySizes[i].second;
|
||
} else {
|
||
csvHeaderField.arraySize2 = 0;
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
if (csvHeaderField.structName.empty()) {
|
||
return;
|
||
}
|
||
|
||
m_headerFields.push_back(csvHeaderField);
|
||
}
|
||
|
||
void CSVDataInjectThread::start()
|
||
{
|
||
std::lock_guard<std::mutex> lock(m_mutex);
|
||
if (!m_running) {
|
||
m_running = true;
|
||
m_thread = std::thread(&CSVDataInjectThread::threadFunc, this);
|
||
}
|
||
}
|
||
|
||
void CSVDataInjectThread::stop()
|
||
{
|
||
{
|
||
std::lock_guard<std::mutex> lock(m_mutex);
|
||
if (m_running) {
|
||
m_running = false;
|
||
m_cv.notify_all();
|
||
}
|
||
}
|
||
|
||
if (m_thread.joinable()) {
|
||
m_thread.join();
|
||
}
|
||
|
||
// 关闭文件
|
||
if (m_csvFile.is_open()) {
|
||
m_csvFile.close();
|
||
}
|
||
|
||
// 释放未启动的监控器
|
||
for (const auto &[structName, dataMonitor] : m_notStartedMonitors) {
|
||
DataMonitorFactory::ReleaseInstance(structName);
|
||
}
|
||
m_notStartedMonitors.clear();
|
||
m_alreadyStartedMonitors.clear();
|
||
}
|
||
|
||
void CSVDataInjectThread::updateData()
|
||
{
|
||
// 读取下一行数据
|
||
std::string line;
|
||
if (!std::getline(m_csvFile, line)) {
|
||
// 文件读取完毕,停止线程
|
||
m_running = false;
|
||
return;
|
||
}
|
||
|
||
// 解析数据
|
||
std::stringstream ss(line);
|
||
std::string field;
|
||
std::getline(ss, field, ',');
|
||
double timeStamp = 0;
|
||
try {
|
||
timeStamp = std::stod(field);
|
||
} catch (const std::exception &e) {
|
||
std::cout << "field: " << field << " is not a number" << std::endl;
|
||
return;
|
||
}
|
||
m_nextExecuteTime = static_cast<int64_t>(timeStamp * 1000); // 转换为毫秒
|
||
|
||
// 为每个结构体初始化数据
|
||
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<std::string>>>
|
||
tempDataMap;
|
||
for (const auto &injectDataInfo : m_injectDataInfos) {
|
||
std::unordered_map<std::string, std::vector<std::string>> dataMap;
|
||
for (int i = 0; i < injectDataInfo.interfaceNames.size(); i++) {
|
||
if (injectDataInfo.arraySizes[i].first > 1) {
|
||
for (int j = 0; j < injectDataInfo.arraySizes[i].first; j++) {
|
||
if (injectDataInfo.arraySizes[i].second > 1) {
|
||
for (int k = 0; k < injectDataInfo.arraySizes[i].second; k++) {
|
||
dataMap[injectDataInfo.interfaceNames[i]].push_back("0");
|
||
}
|
||
} else {
|
||
dataMap[injectDataInfo.interfaceNames[i]].push_back("0");
|
||
}
|
||
}
|
||
} else {
|
||
dataMap[injectDataInfo.interfaceNames[i]].push_back("0");
|
||
}
|
||
}
|
||
tempDataMap[injectDataInfo.structName] = dataMap;
|
||
}
|
||
|
||
// 读取下一行数据
|
||
for (int i = 0; i < m_headerFields.size(); i++) {
|
||
std::getline(ss, field, ',');
|
||
if (m_headerFields[i].arrayDim == 0) {
|
||
tempDataMap[m_headerFields[i].structName][m_headerFields[i].fieldName][0] = field;
|
||
} else if (m_headerFields[i].arrayDim == 1) {
|
||
tempDataMap[m_headerFields[i].structName][m_headerFields[i].fieldName]
|
||
[m_headerFields[i].arrayIndex1] = field;
|
||
} else if (m_headerFields[i].arrayDim == 2) {
|
||
tempDataMap[m_headerFields[i].structName][m_headerFields[i].fieldName]
|
||
[m_headerFields[i].arrayIndex1 * m_headerFields[i].arraySize2
|
||
+ m_headerFields[i].arrayIndex2] = field;
|
||
}
|
||
}
|
||
|
||
// 将tempDataMap转换为m_data格式
|
||
for (const auto &[structName, dataMap] : tempDataMap) {
|
||
std::unordered_map<std::string, std::string> structData;
|
||
for (const auto &[interfaceName, values] : dataMap) {
|
||
if (values.empty()) {
|
||
structData[interfaceName] = "0";
|
||
} else {
|
||
std::stringstream ss;
|
||
for (size_t i = 0; i < values.size(); ++i) {
|
||
ss << values[i];
|
||
if (i < values.size() - 1) {
|
||
ss << ",";
|
||
}
|
||
}
|
||
structData[interfaceName] = ss.str();
|
||
}
|
||
}
|
||
m_data[structName] = structData;
|
||
}
|
||
}
|
||
|
||
bool CSVDataInjectThread::isRunning() const
|
||
{
|
||
return m_running;
|
||
}
|
||
|
||
void CSVDataInjectThread::threadFunc()
|
||
{
|
||
// 读取第一行数据
|
||
updateData();
|
||
auto startTime = std::chrono::steady_clock::now();
|
||
|
||
while (m_running) {
|
||
int64_t nextTime = m_nextExecuteTime;
|
||
|
||
// 等待直到到达执行时间
|
||
auto now = std::chrono::steady_clock::now();
|
||
auto elapsed =
|
||
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count();
|
||
auto targetTime = startTime + std::chrono::milliseconds(nextTime);
|
||
|
||
if (now < targetTime) {
|
||
std::this_thread::sleep_until(targetTime);
|
||
}
|
||
|
||
// 执行数据注入
|
||
for (const auto &monitorDataInfo : m_injectDataInfos) {
|
||
DataMonitorBasePtr dataMonitor;
|
||
if (m_notStartedMonitors.find(monitorDataInfo.structName)
|
||
!= m_notStartedMonitors.end()) {
|
||
dataMonitor = m_notStartedMonitors[monitorDataInfo.structName];
|
||
} else if (m_alreadyStartedMonitors.find(monitorDataInfo.structName)
|
||
!= m_alreadyStartedMonitors.end()) {
|
||
dataMonitor = m_alreadyStartedMonitors[monitorDataInfo.structName];
|
||
}
|
||
if (dataMonitor && m_data.find(monitorDataInfo.structName) != m_data.end()) {
|
||
dataMonitor->setDataByString(m_data[monitorDataInfo.structName]);
|
||
}
|
||
}
|
||
|
||
// 读取下一行数据
|
||
updateData();
|
||
}
|
||
} |