XNSim/XNMonitorServer/CSVDataInjectThread.cpp

148 lines
3.4 KiB
C++

#include "CSVDataInjectThread.h"

#include <cstddef>
#include <stdexcept>
#include <utility>

#include "DataMonitorFactory.h"
/// Creates an idle injector for the given CSV file.
///
/// The file is not opened here; that happens in Initialize(). The worker
/// thread is not started until start() is called.
///
/// @param csvFilePath Path of the CSV file to replay. Taken by value and moved
///                    into the member so the caller's string is copied at most once.
CSVDataInjectThread::CSVDataInjectThread(std::string csvFilePath)
    : m_csvFilePath(std::move(csvFilePath)), m_running(false), m_nextExecuteTime(0)
{
}
/// Tears the injector down by delegating to stop().
CSVDataInjectThread::~CSVDataInjectThread()
{
    this->stop();
}
/// Opens the CSV file, reads its header row and acquires one data monitor per
/// structure name.
///
/// The first header column is skipped (updateData() parses the first column of
/// every data row as the timestamp). The remaining header columns are
/// interface names and are paired positionally with @p structNames.
///
/// @param structNames Structure name for each interface column, in column
///                    order. Its size must equal the number of interface columns.
/// @return true on success; false if the file cannot be opened, the header
///         cannot be read, the column count does not match @p structNames, or
///         DataMonitorFactory::GetInstance() returns nullptr for a structure.
bool CSVDataInjectThread::Initialize(std::vector<std::string> structNames)
{
    m_csvFile.open(m_csvFilePath);
    if (!m_csvFile.is_open()) {
        return false;
    }

    std::string headerLine;
    if (!std::getline(m_csvFile, headerLine)) {
        return false;
    }
    // A CRLF-terminated file would otherwise leave '\r' glued to the last
    // interface name.
    if (!headerLine.empty() && headerLine.back() == '\r') {
        headerLine.pop_back();
    }

    std::vector<std::string> columnNames;
    std::stringstream ss(headerLine);
    std::string field;
    std::getline(ss, field, ','); // discard the first column
    while (std::getline(ss, field, ',')) {
        columnNames.push_back(field);
    }

    // Pair structures and interface names one-to-one by position.
    if (structNames.size() != columnNames.size()) {
        return false;
    }
    // NOTE(review): only the per-structure grouping is stored; the original
    // column position is not. updateData() consumes row fields in
    // m_structInterfaceMap iteration order, which matches the CSV only if that
    // order equals the header's column order - confirm against the map type.
    for (std::size_t i = 0; i < structNames.size(); ++i) {
        m_structInterfaceMap[structNames[i]].push_back(columnNames[i]);
    }

    // Sort each structure's monitor by whether it is already initialized.
    for (const auto &entry : m_structInterfaceMap) {
        const auto &structName = entry.first;
        auto dataMonitor = DataMonitorFactory::GetInstance(structName);
        if (dataMonitor == nullptr) {
            return false;
        }
        if (dataMonitor->isInitialized()) {
            m_alreadyStartedMonitors[structName] = dataMonitor;
        } else {
            m_notStartedMonitors[structName] = dataMonitor;
        }
    }
    return true;
}
/// Starts the worker thread running threadFunc().
///
/// Does nothing if the worker is already running. Also does nothing if a
/// previous worker has ended but has not yet been joined by stop(): the worker
/// clears m_running itself when the CSV is exhausted, and move-assigning a new
/// std::thread onto a still-joinable one would call std::terminate.
void CSVDataInjectThread::start()
{
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_running || m_thread.joinable()) {
        return;
    }
    m_running = true;
    m_thread = std::thread(&CSVDataInjectThread::threadFunc, this);
}
/// Stops the worker and releases what this object holds.
///
/// Steps, in order: clear the running flag and notify m_cv (under m_mutex),
/// join the worker thread, close the CSV file, hand every monitor in
/// m_notStartedMonitors back to DataMonitorFactory, then empty both monitor maps.
void CSVDataInjectThread::stop()
{
    std::unique_lock<std::mutex> guard(m_mutex);
    if (m_running) {
        m_running = false;
        m_cv.notify_all();
    }
    // Release the mutex before joining the worker.
    guard.unlock();

    if (m_thread.joinable()) {
        m_thread.join();
    }

    // Close the file.
    if (m_csvFile.is_open()) {
        m_csvFile.close();
    }

    // Release the monitors that were not yet started.
    for (const auto &entry : m_notStartedMonitors) {
        DataMonitorFactory::ReleaseInstance(entry.first);
    }
    m_notStartedMonitors.clear();
    m_alreadyStartedMonitors.clear();
}
// Parses a timestamp field given in seconds into whole milliseconds.
// Returns false (leaving outMs untouched) when std::stod rejects the text.
static bool parseTimestampMs(const std::string &text, int64_t &outMs)
{
    try {
        outMs = static_cast<int64_t>(std::stod(text) * 1000); // seconds -> milliseconds
        return true;
    } catch (const std::invalid_argument &) {
        return false;
    } catch (const std::out_of_range &) {
        return false;
    }
}

/// Reads the next CSV row into m_nextExecuteTime and m_data.
///
/// The first field is the timestamp in seconds, stored as milliseconds. The
/// remaining fields are assigned, in order, to the interfaces of each
/// structure in m_structInterfaceMap.
///
/// Clears m_running and returns when the file has no more rows, or when the
/// timestamp cannot be parsed (for example a blank line). A parse failure must
/// not throw here because this runs on the worker thread, where an escaping
/// exception would terminate the process.
void CSVDataInjectThread::updateData()
{
    // Read the next row.
    std::string line;
    if (!std::getline(m_csvFile, line)) {
        // End of file: stop the thread.
        m_running = false;
        return;
    }
    // Drop the '\r' of a CRLF line ending so it does not end up in the last value.
    if (!line.empty() && line.back() == '\r') {
        line.pop_back();
    }

    // Parse the timestamp.
    std::stringstream ss(line);
    std::string field;
    std::getline(ss, field, ',');
    int64_t timestampMs = 0;
    if (!parseTimestampMs(field, timestampMs)) {
        m_running = false;
        return;
    }
    m_nextExecuteTime = timestampMs;

    // Parse the values of each structure.
    // NOTE(review): fields are consumed in m_structInterfaceMap iteration
    // order, which matches the CSV only if that order equals the header's
    // column order - confirm against the map type and column grouping.
    for (const auto &[structName, interfaceNames] : m_structInterfaceMap) {
        std::unordered_map<std::string, std::string> dataMap;
        for (const auto &interfaceName : interfaceNames) {
            // A failed read leaves `field` unchanged; clear it so a short row
            // yields an empty value instead of repeating the previous one.
            if (!std::getline(ss, field, ',')) {
                field.clear();
            }
            dataMap[interfaceName] = field;
        }
        m_data[structName] = std::move(dataMap);
    }
}
void CSVDataInjectThread::threadFunc()
{
// 读取第一行数据
updateData();
while (m_running) {
int64_t nextTime = m_nextExecuteTime;
// 等待直到到达执行时间
auto now = std::chrono::system_clock::now();
auto targetTime = std::chrono::system_clock::from_time_t(nextTime / 1000)
+ std::chrono::milliseconds(nextTime % 1000);
if (now < targetTime) {
std::this_thread::sleep_until(targetTime);
}
// 执行数据注入
for (const auto &[structName, dataMonitor] : m_alreadyStartedMonitors) {
if (dataMonitor && m_data.find(structName) != m_data.end()) {
dataMonitor->setDataByString(m_data[structName]);
}
}
// 读取下一行数据
updateData();
}
}