XNSim/XNCore/XNEventManager.cpp

291 lines
7.9 KiB
C++
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "XNEventManager.h"
#include "XNEventManager_p.h"
#include "XNFramework.h"
#include <algorithm>
#include <chrono>
#include <thread>
// Default constructor.
// Passes a newly allocated XNEventManagerPrivate to the XNBaseFrameObject
// constructor, then assigns this object's identifier and name.
XNEventManager::XNEventManager() : XNBaseFrameObject(new XNEventManagerPrivate())
{
// Set the unique identifier
SetUniqueId(7);
// Set the object name
SetObjectName("XNEventManager");
}
// Destructor.
// Signals the worker threads to stop, waits for each of them to finish, and
// then frees any tasks that were still waiting in the queue.
XNEventManager::~XNEventManager()
{
    T_D();
    {
        // The stop flag is part of the condition-variable predicate, so it must
        // be changed while holding the mutex. Otherwise a worker that has just
        // evaluated the predicate but has not blocked yet would miss the
        // notification below and the join would never return.
        std::lock_guard<std::mutex> lock(d->taskMutex);
        d->running = false;
    }
    d->taskCond.notify_all();
    for (auto &worker : d->workerThreads) {
        if (worker.joinable()) {
            worker.join();
        }
    }
    // Workers leave the loop as soon as they observe the stop flag, without
    // draining the queue. The queued tasks are raw heap pointers, so release
    // them here (unexecuted) instead of leaking them.
    std::lock_guard<std::mutex> lock(d->taskMutex);
    while (!d->taskQueue.empty()) {
        delete d->taskQueue.front();
        d->taskQueue.pop();
    }
}
// Protected constructor: forwards the supplied private-data pointer to the
// XNBaseFrameObject constructor and does nothing else.
XNEventManager::XNEventManager(PrivateType *p) : XNBaseFrameObject(p)
{
}
// Initializes the event manager.
// Sizes the ordinary worker pool from the hardware thread count, configures
// the real-time pool with two threads and the SCHED_FIFO priority range, and
// marks the object as Initialized.
// Returns true unconditionally.
bool XNEventManager::Initialize()
{
    T_D();
    // Configure the ordinary thread pool.
    // hardware_concurrency() is only a hint and returns 0 when the value cannot
    // be determined; a pool with zero workers would never run any asynchronous
    // event, so fall back to a single worker in that case.
    const unsigned int hardwareThreads = std::thread::hardware_concurrency();
    SetMaxThreadCount(hardwareThreads > 0 ? static_cast<int>(hardwareThreads) : 1);
    // Configure the real-time thread pool
    SetRTThreadPoolConfig(2,                                    // maximum thread count
                          sched_get_priority_min(SCHED_FIFO),   // minimum priority
                          sched_get_priority_max(SCHED_FIFO));  // maximum priority
    LOG_INFO("D01094001:事件管理器初始化成功!");
    d->_status = XNFrameObjectStatus::Initialized;
    return true;
}
// Prepares the event manager for execution.
// Sets the object status to Ready, logs the transition, and returns true
// unconditionally.
bool XNEventManager::PrepareForExecute()
{
T_D();
d->_status = XNFrameObjectStatus::Ready;
LOG_INFO("D01094002:事件管理器准备就绪!");
return true;
}
// 修改注册事件处理器的实现
int XNEventManager::RegisterEventHandler(const std::string &eventName, XNEventCallback callback,
uint32_t objectId, bool async, XNEvent::Priority priority)
{
T_D();
if (eventName.empty() || !callback) {
LOG_WARNING("A01093003:注册的事件名称或回调函数为空,注册失败!");
return -1;
}
std::lock_guard<std::mutex> locker(d->eventMutex);
// 生成新的本地ID
d->localIdCounter = (d->localIdCounter + 1) & 0xFFFF;
if (d->localIdCounter == 0)
d->localIdCounter = 1; // 避免为0
// 创建处理器信息
EventHandlerInfo handlerInfo;
handlerInfo.callback = callback;
handlerInfo.objectId = objectId;
handlerInfo.localId = d->localIdCounter;
handlerInfo.isAsync = async;
handlerInfo.priority = priority;
// 计算全局处理器ID
int handlerId = handlerInfo.GetHandlerId();
// 添加处理器信息到事件列表
d->eventHandlers[eventName].push_back(handlerInfo);
// 添加反向映射
d->handlerToEvent[handlerId] = eventName;
LOG_INFO("D01094004:注册 %1 事件处理器, 事件名称: %2, 处理器ID: %3 (对象ID: %4, 本地ID: %5)",
async ? "异步" : "同步", eventName, handlerId, objectId, d->localIdCounter);
return handlerId;
}
// Removes a previously registered event handler.
//   eventName - event the handler belongs to; when empty, the event is looked
//               up through the handler-ID -> event-name reverse mapping
//   handlerId - ID returned by RegisterEventHandler
// Returns true when a handler was found and removed, false otherwise (a
// warning is logged in that case). When the last handler of an event is
// removed, the event entry itself is erased.
bool XNEventManager::RemoveEventHandler(const std::string &eventName, int handlerId)
{
    T_D();
    std::lock_guard<std::mutex> locker(d->eventMutex);

    const auto hasRequestedId = [handlerId](const EventHandlerInfo &info) {
        return info.GetHandlerId() == handlerId;
    };

    // An event name was given: the event must exist before we search it
    if (!eventName.empty()) {
        auto it = d->eventHandlers.find(eventName);
        if (it == d->eventHandlers.end()) {
            LOG_WARNING("B01093005:移除事件 %1 时, 事件不存在!", eventName);
            return false;
        }
        // Find and remove the requested handler
        auto &handlers = it->second;
        auto handlerIt = std::find_if(handlers.begin(), handlers.end(), hasRequestedId);
        if (handlerIt != handlers.end()) {
            handlers.erase(handlerIt);
            d->handlerToEvent.erase(handlerId);
            LOG_INFO("D01094006:移除事件 %1 的处理器ID: %2", eventName, handlerId);
            // Drop the whole event once it has no handlers left
            if (handlers.empty()) {
                d->eventHandlers.erase(it);
            }
            return true;
        }
        LOG_WARNING("B01093007:移除事件 %1 的处理器ID: %2 时, 处理器不存在!", eventName, handlerId);
        return false;
    }

    // No event name given: resolve it through the reverse mapping
    auto eventIt = d->handlerToEvent.find(handlerId);
    if (eventIt != d->handlerToEvent.end()) {
        // Copy the name: the mapping entry that owns it is erased below.
        const std::string eventToRemove = eventIt->second;
        // Use find() rather than operator[] so that a mapping pointing at a
        // missing event does not insert an empty handler list as a side effect.
        auto listIt = d->eventHandlers.find(eventToRemove);
        if (listIt != d->eventHandlers.end()) {
            auto &handlers = listIt->second;
            auto handlerIt = std::find_if(handlers.begin(), handlers.end(), hasRequestedId);
            if (handlerIt != handlers.end()) {
                handlers.erase(handlerIt);
                d->handlerToEvent.erase(handlerId);
                LOG_INFO("D01094008:移除事件 %1 的处理器ID: %2", eventToRemove, handlerId);
                // Drop the whole event once it has no handlers left
                if (handlers.empty()) {
                    d->eventHandlers.erase(listIt);
                }
                return true;
            }
        }
    }
    LOG_WARNING("B01093009:移除处理器ID: %1 时, 处理器不存在!", handlerId);
    return false;
}
// 修改触发事件的实现
void XNEventManager::TriggerEvent(const std::string &eventName, const std::any &eventData,
bool forceAsync, XNEvent::Priority priority)
{
T_D();
std::list<EventHandlerInfo> handlers;
{
std::lock_guard<std::mutex> locker(d->eventMutex);
auto it = d->eventHandlers.find(eventName);
if (it == d->eventHandlers.end()) {
return;
}
handlers = it->second;
}
for (const auto &handler : handlers) {
if (forceAsync || handler.isAsync) {
if (priority == XNEvent::Priority::RealTime) {
// 创建实时任务
RTEventTask *task = new RTEventTask(eventName, eventData, handler.callback, this);
d->rtManager.addTask(task);
} else {
// 普通异步任务使用线程池
std::lock_guard<std::mutex> lock(d->taskMutex);
d->taskQueue.push(new AsyncEventTask(eventName, eventData, handler.callback, this));
d->taskCond.notify_one();
}
} else {
// 同步执行
try {
handler.callback(eventData);
EventProcessed(eventName, true);
} catch (const std::exception &e) {
LOG_ERROR("B01092010:事件 %1 的处理器ID: %2 执行时发生异常: %3", eventName,
handler.GetHandlerId(), e.what());
EventProcessed(eventName, false);
}
}
}
}
void XNEventManager::SetMaxThreadCount(int count)
{
T_D();
std::lock_guard<std::mutex> lock(d->taskMutex);
// 停止现有线程
d->running = false;
d->taskCond.notify_all();
for (auto &thread : d->workerThreads) {
if (thread.joinable()) {
thread.join();
}
}
d->workerThreads.clear();
// 创建新线程
d->running = true;
for (int i = 0; i < count; ++i) {
d->workerThreads.emplace_back([this, d]() {
while (d->running) {
BaseEventTask *task = nullptr;
{
std::unique_lock<std::mutex> lock(d->taskMutex);
d->taskCond.wait(lock,
[this, d] { return !d->taskQueue.empty() || !d->running; });
if (!d->running) {
break;
}
task = d->taskQueue.front();
d->taskQueue.pop();
}
if (task) {
task->execute();
delete task;
}
}
});
}
LOG_INFO("D01094011:设置事件处理线程池最大线程数为 %1", count);
}
// Returns the number of threads currently held in the worker-thread
// container, read while holding taskMutex.
int XNEventManager::GetMaxThreadCount() const
{
    T_D();
    std::lock_guard<std::mutex> guard(d->taskMutex);
    const auto workerCount = d->workerThreads.size();
    return static_cast<int>(workerCount);
}
// Blocks until the ordinary asynchronous task queue is empty.
// Only the queue is inspected: a task that a worker has already dequeued may
// still be executing when this function returns.
// NOTE(review): if no worker thread is running, a non-empty queue never
// drains and this call does not return - confirm callers cannot hit that.
void XNEventManager::WaitForAsyncEvents()
{
    T_D();
    // Poll the queue instead of waiting on taskCond. That condition variable is
    // signalled when a task is enqueued or the pool is stopped, not when the
    // queue drains, so waiting on it could block indefinitely; a waiter here
    // could also consume a notify_one() that was meant to wake a worker.
    for (;;) {
        {
            std::lock_guard<std::mutex> lock(d->taskMutex);
            if (d->taskQueue.empty()) {
                return;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
// Restarts the real-time task manager with a new configuration.
//   maxThreads  - passed to the real-time manager's start()
//   minPriority - currently not forwarded to the real-time manager
//   maxPriority - passed to the real-time manager's start()
// The real-time threads are bound to the highest-numbered CPU set in the
// framework's 32-bit CPU affinity mask; CPU 1 is used when the mask is empty.
// If the framework pointer is unavailable, a warning is logged and the current
// real-time manager state is left untouched.
void XNEventManager::SetRTThreadPoolConfig(int maxThreads, int minPriority, int maxPriority)
{
    T_D();

    // Validate the framework before touching the running pool, so that a
    // failed reconfiguration does not leave the real-time manager stopped.
    XNFrameworkPtr framework = GetFramework();
    if (!framework) {
        LOG_WARNING("B01093012:主框架指针为空,无法设置实时线程池配置!");
        return;
    }

    const uint32_t cpuAffinity = framework->GetCpuAffinity();

    // Find the highest CPU index present in the mask (bits 0..31)
    int lastCpu = -1;
    for (int bit = 31; bit >= 0; --bit) {
        if (cpuAffinity & (1u << bit)) {
            lastCpu = bit;
            break;
        }
    }

    if (lastCpu < 0) {
        LOG_WARNING("B01093013:没有可用的CPU使用默认CPU 1");
        lastCpu = 1;
    } else {
        LOG_INFO("D01094014:实时事件处理线程绑定到CPU %1", lastCpu);
    }

    d->rtManager.stop();
    d->rtManager.start(maxThreads, maxPriority, lastCpu);
}
// Called after a synchronously dispatched handler finishes, with the event
// name and whether the handler completed without throwing. The body currently
// performs no work beyond the T_D() macro.
void XNEventManager::EventProcessed(const std::string &eventName, bool success)
{
T_D();
// Event-completion callback logic can be added here
//LOG_INFO("Event " + eventName + " processed " + (success ? "successfully" : "with error"));
}