150 lines
3.5 KiB
C++
150 lines
3.5 KiB
C++
#include "CSVDataInjectThread.h"
|
|
#include "DataMonitorFactory.h"
|
|
|
|
CSVDataInjectThread::CSVDataInjectThread(std::string csvFilePath)
|
|
: m_csvFilePath(csvFilePath), m_running(false), m_nextExecuteTime(0)
|
|
{
|
|
}
|
|
|
|
CSVDataInjectThread::~CSVDataInjectThread()
|
|
{
|
|
stop();
|
|
}
|
|
|
|
bool CSVDataInjectThread::Initialize(std::vector<std::string> structNames)
|
|
{
|
|
m_csvFile.open(m_csvFilePath);
|
|
if (!m_csvFile.is_open()) {
|
|
return false;
|
|
}
|
|
|
|
std::string headerLine;
|
|
if (!std::getline(m_csvFile, headerLine)) {
|
|
return false;
|
|
}
|
|
|
|
std::vector<std::string> interfaceNames;
|
|
std::stringstream ss(headerLine);
|
|
std::string field;
|
|
std::getline(ss, field, ',');
|
|
while (std::getline(ss, field, ',')) {
|
|
interfaceNames.push_back(field);
|
|
}
|
|
|
|
// 将结构体和接口名称一一对应
|
|
if (structNames.size() != interfaceNames.size()) {
|
|
return false;
|
|
}
|
|
|
|
for (int i = 0; i < structNames.size(); i++) {
|
|
m_structInterfaceMap[structNames[i]].push_back(interfaceNames[i]);
|
|
}
|
|
|
|
for (const auto &[structName, interfaceNames] : m_structInterfaceMap) {
|
|
auto dataMonitor = DataMonitorFactory::GetInstance(structName);
|
|
if (dataMonitor == nullptr) {
|
|
return false;
|
|
}
|
|
if (dataMonitor->isInitialized()) {
|
|
m_alreadyStartedMonitors[structName] = dataMonitor;
|
|
} else {
|
|
m_notStartedMonitors[structName] = dataMonitor;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void CSVDataInjectThread::start()
|
|
{
|
|
std::lock_guard<std::mutex> lock(m_mutex);
|
|
if (!m_running) {
|
|
m_running = true;
|
|
m_thread = std::thread(&CSVDataInjectThread::threadFunc, this);
|
|
}
|
|
}
|
|
|
|
void CSVDataInjectThread::stop()
|
|
{
|
|
{
|
|
std::lock_guard<std::mutex> lock(m_mutex);
|
|
if (m_running) {
|
|
m_running = false;
|
|
m_cv.notify_all();
|
|
}
|
|
}
|
|
|
|
if (m_thread.joinable()) {
|
|
m_thread.join();
|
|
}
|
|
|
|
// 关闭文件
|
|
if (m_csvFile.is_open()) {
|
|
m_csvFile.close();
|
|
}
|
|
|
|
// 释放未启动的监控器
|
|
for (const auto &[structName, dataMonitor] : m_notStartedMonitors) {
|
|
DataMonitorFactory::ReleaseInstance(structName);
|
|
}
|
|
m_notStartedMonitors.clear();
|
|
m_alreadyStartedMonitors.clear();
|
|
}
|
|
|
|
void CSVDataInjectThread::updateData()
|
|
{
|
|
// 读取下一行数据
|
|
std::string line;
|
|
if (!std::getline(m_csvFile, line)) {
|
|
// 文件读取完毕,停止线程
|
|
m_running = false;
|
|
return;
|
|
}
|
|
|
|
// 解析数据
|
|
std::stringstream ss(line);
|
|
std::string field;
|
|
std::getline(ss, field, ',');
|
|
double timeStamp = std::stod(field);
|
|
m_nextExecuteTime = static_cast<int64_t>(timeStamp * 1000); // 转换为毫秒
|
|
|
|
// 解析每个结构体的数据
|
|
for (const auto &[structName, interfaceNames] : m_structInterfaceMap) {
|
|
std::unordered_map<std::string, std::string> dataMap;
|
|
for (const auto &interfaceName : interfaceNames) {
|
|
std::getline(ss, field, ',');
|
|
dataMap[interfaceName] = field;
|
|
}
|
|
m_data[structName] = dataMap;
|
|
}
|
|
}
|
|
|
|
void CSVDataInjectThread::threadFunc()
|
|
{
|
|
// 读取第一行数据
|
|
updateData();
|
|
auto startTime = std::chrono::steady_clock::now();
|
|
|
|
while (m_running) {
|
|
int64_t nextTime = m_nextExecuteTime;
|
|
|
|
// 等待直到到达执行时间
|
|
auto now = std::chrono::steady_clock::now();
|
|
auto elapsed =
|
|
std::chrono::duration_cast<std::chrono::milliseconds>(now - startTime).count();
|
|
auto targetTime = startTime + std::chrono::milliseconds(nextTime);
|
|
|
|
if (now < targetTime) {
|
|
std::this_thread::sleep_until(targetTime);
|
|
}
|
|
|
|
// 执行数据注入
|
|
for (const auto &[structName, dataMonitor] : m_alreadyStartedMonitors) {
|
|
if (dataMonitor && m_data.find(structName) != m_data.end()) {
|
|
dataMonitor->setDataByString(m_data[structName]);
|
|
}
|
|
}
|
|
|
|
// 读取下一行数据
|
|
updateData();
|
|
}
|
|
} |