#include "XNEventManager.h"
#include "XNEventManager_p.h"
#include "XNFramework.h"

#include <sched.h>

#include <algorithm>
#include <exception>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

/**
 * @brief Default constructor: creates the private data object and assigns the
 *        fixed unique id (7) and object name ("XNEventManager").
 */
XNEventManager::XNEventManager() : XNBaseFrameObject(new XNEventManagerPrivate())
{
    // Set the unique identifier.
    SetUniqueId(7);
    // Set the object name.
    SetObjectName("XNEventManager");
}

/**
 * @brief Destructor: stops the worker threads, joins them and frees any task
 *        that was queued but never executed.
 */
XNEventManager::~XNEventManager()
{
    T_D();

    // Flip the flag while holding the task mutex so a worker that has just
    // evaluated the wait predicate cannot miss the wake-up below.
    {
        std::lock_guard lock(d->taskMutex);
        d->running = false;
    }
    d->taskCond.notify_all();

    for (auto &thread : d->workerThreads) {
        if (thread.joinable()) {
            thread.join();
        }
    }

    // Workers leave the loop as soon as `running` is false, so tasks may
    // still be queued. They were allocated with `new` in TriggerEvent().
    std::lock_guard lock(d->taskMutex);
    while (!d->taskQueue.empty()) {
        delete d->taskQueue.front();
        d->taskQueue.pop();
    }
}

/**
 * @brief Protected constructor that forwards an externally created private
 *        data object to the base class.
 * @param p Private data object passed on to XNBaseFrameObject.
 */
XNEventManager::XNEventManager(PrivateType *p) : XNBaseFrameObject(p)
{
}

/**
 * @brief Registers a callback for a named event.
 *
 * @param eventName Name of the event; must not be empty.
 * @param callback  Callback to invoke; must be callable (non-empty).
 * @param objectId  Id of the registering object, stored in the handler info.
 * @param async     Whether the handler is dispatched asynchronously.
 * @param priority  Priority stored in the handler info.
 * @return The handler id reported by EventHandlerInfo::GetHandlerId(), or -1
 *         when the arguments are invalid or no unused id could be found.
 */
int XNEventManager::RegisterEventHandler(const std::string &eventName, XNEventCallback callback,
                                         uint32_t objectId, bool async,
                                         XNEvent::Priority priority)
{
    T_D();
    if (eventName.empty() || !callback) {
        LOG_WARNING("Invalid event name or callback!");
        return -1;
    }

    std::lock_guard locker(d->eventMutex);

    // Build the handler description; the local id is filled in below.
    EventHandlerInfo handlerInfo;
    handlerInfo.callback = callback;
    handlerInfo.objectId = objectId;
    handlerInfo.isAsync = async;
    handlerInfo.priority = priority;

    // The local id is a 16-bit counter that wraps around and skips 0. After a
    // wrap it may land on an id that is still registered, which would
    // overwrite the reverse mapping, so keep advancing until the resulting
    // handler id is unused. The loop is bounded by the size of the id space.
    int handlerId = 0;
    bool idFound = false;
    for (int attempt = 0; attempt < 0xFFFF; ++attempt) {
        d->localIdCounter = (d->localIdCounter + 1) & 0xFFFF;
        if (d->localIdCounter == 0) {
            d->localIdCounter = 1; // never hand out 0
        }
        handlerInfo.localId = d->localIdCounter;
        handlerId = handlerInfo.GetHandlerId();
        if (d->handlerToEvent.find(handlerId) == d->handlerToEvent.end()) {
            idFound = true;
            break;
        }
    }
    if (!idFound) {
        LOG_WARNING("No free handler ID available for object " + std::to_string(objectId));
        return -1;
    }

    // Store the handler under its event and record the reverse mapping.
    d->eventHandlers[eventName].push_back(handlerInfo);
    d->handlerToEvent[handlerId] = eventName;

    LOG_INFO("Registered " + std::string(async ? "async" : "sync")
             + " event handler for event: " + eventName
             + ", handler ID: " + std::to_string(handlerId)
             + " (object: " + std::to_string(objectId)
             + ", local: " + std::to_string(d->localIdCounter) + ")");
    return handlerId;
}

/**
 * @brief Removes a previously registered handler.
 *
 * @param eventName Event the handler belongs to. When empty, the event is
 *                  looked up through the handler-id reverse mapping.
 * @param handlerId Id returned by RegisterEventHandler().
 * @return true if a handler was removed, false otherwise (a warning is
 *         logged in that case).
 */
bool XNEventManager::RemoveEventHandler(const std::string &eventName, int handlerId)
{
    T_D();
    std::lock_guard locker(d->eventMutex);

    const bool byName = !eventName.empty();

    // Resolve which event to search.
    std::string targetEvent = eventName;
    if (!byName) {
        auto mappingIt = d->handlerToEvent.find(handlerId);
        if (mappingIt == d->handlerToEvent.end()) {
            LOG_WARNING("Handler ID " + std::to_string(handlerId) + " not found!");
            return false;
        }
        targetEvent = mappingIt->second;
    }

    // Use find() rather than operator[] so a missing event never creates an
    // empty handler list as a side effect.
    auto eventIt = d->eventHandlers.find(targetEvent);
    if (eventIt == d->eventHandlers.end()) {
        if (byName) {
            LOG_WARNING("Event " + eventName + " not found!");
        } else {
            LOG_WARNING("Handler ID " + std::to_string(handlerId) + " not found!");
        }
        return false;
    }

    auto &handlers = eventIt->second;
    auto handlerIt = std::find_if(handlers.begin(), handlers.end(),
                                  [handlerId](const EventHandlerInfo &info) {
                                      return info.GetHandlerId() == handlerId;
                                  });
    if (handlerIt == handlers.end()) {
        if (byName) {
            LOG_WARNING("Handler ID " + std::to_string(handlerId)
                        + " not found in event: " + eventName);
        } else {
            LOG_WARNING("Handler ID " + std::to_string(handlerId) + " not found!");
        }
        return false;
    }

    handlers.erase(handlerIt);
    d->handlerToEvent.erase(handlerId);
    LOG_INFO("Removed handler ID " + std::to_string(handlerId)
             + " from event: " + targetEvent);

    // Drop the event entry entirely once its last handler is gone.
    if (handlers.empty()) {
        d->eventHandlers.erase(eventIt);
    }
    return true;
}

/**
 * @brief Dispatches an event to every handler registered for it.
 *
 * Handlers are copied under the event mutex and invoked after it is released.
 * A handler runs asynchronously when @p forceAsync is set or the handler was
 * registered as async; otherwise it is called inline.
 *
 * @param eventName  Name of the event to trigger; unknown names are ignored.
 * @param eventData  Payload handed to each callback.
 * @param forceAsync Forces asynchronous dispatch for all handlers.
 * @param priority   When equal to XNEvent::Priority::RealTime, asynchronous
 *                   handlers are handed to the real-time manager instead of
 *                   the worker-thread queue.
 */
void XNEventManager::TriggerEvent(const std::string &eventName, const std::any &eventData,
                                  bool forceAsync, XNEvent::Priority priority)
{
    T_D();

    // Snapshot the handler list so callbacks run without the event mutex.
    std::list<EventHandlerInfo> handlers;
    {
        std::lock_guard locker(d->eventMutex);
        auto it = d->eventHandlers.find(eventName);
        if (it == d->eventHandlers.end()) {
            return;
        }
        handlers = it->second;
    }

    for (const auto &handler : handlers) {
        if (forceAsync || handler.isAsync) {
            if (priority == XNEvent::Priority::RealTime) {
                // Real-time task: handed to the RT manager.
                RTEventTask *task = new RTEventTask(eventName, eventData, handler.callback, this);
                d->rtManager.addTask(task);
            } else {
                // Ordinary async task: queued for the worker threads. The task
                // is built outside the lock and stays owned by the unique_ptr
                // until the queue has accepted it.
                auto task = std::make_unique<AsyncEventTask>(eventName, eventData,
                                                             handler.callback, this);
                {
                    std::lock_guard lock(d->taskMutex);
                    d->taskQueue.push(task.get());
                    static_cast<void>(task.release());
                }
                // notify_all, not notify_one: WaitForAsyncEvents() waits on
                // the same condition variable, and a single notification
                // could be consumed by that waiter instead of a worker.
                d->taskCond.notify_all();
            }
        } else {
            // Synchronous execution.
            try {
                handler.callback(eventData);
                EventProcessed(eventName, true);
            } catch (const std::exception &e) {
                LOG_ERROR("Exception in handler " + std::to_string(handler.GetHandlerId())
                          + " for event " + eventName + ": " + e.what());
                EventProcessed(eventName, false);
            }
        }
    }
}

/**
 * @brief Replaces the worker-thread pool with @p count new threads.
 *
 * Existing workers are stopped and joined first. Tasks still queued at that
 * point stay in the queue and are picked up by the new workers. Not intended
 * to be called concurrently with itself or from inside a worker task.
 *
 * @param count Number of worker threads to create; values <= 0 leave the
 *              pool empty.
 */
void XNEventManager::SetMaxThreadCount(int count)
{
    T_D();

    // Detach the current workers from the shared state under the lock, but
    // join them only after releasing it: a worker must re-acquire taskMutex
    // to leave its wait(), so joining while holding the mutex deadlocks.
    decltype(d->workerThreads) retired;
    {
        std::lock_guard lock(d->taskMutex);
        d->running = false;
        retired.swap(d->workerThreads);
    }
    d->taskCond.notify_all();
    for (auto &thread : retired) {
        if (thread.joinable()) {
            thread.join();
        }
    }

    // Start the new workers.
    {
        std::lock_guard lock(d->taskMutex);
        d->running = true;
        for (int i = 0; i < count; ++i) {
            d->workerThreads.emplace_back([d]() {
                for (;;) {
                    std::unique_ptr<BaseEventTask> task;
                    bool drained = false;
                    {
                        std::unique_lock lock(d->taskMutex);
                        d->taskCond.wait(lock, [d] {
                            return !d->taskQueue.empty() || !d->running;
                        });
                        if (!d->running) {
                            break;
                        }
                        task.reset(d->taskQueue.front());
                        d->taskQueue.pop();
                        drained = d->taskQueue.empty();
                    }
                    if (drained) {
                        // Wake WaitForAsyncEvents(), which waits for an empty
                        // queue on this same condition variable.
                        d->taskCond.notify_all();
                    }
                    if (task) {
                        // An exception escaping a thread entry point would
                        // terminate the process, so contain it here.
                        try {
                            task->execute();
                        } catch (const std::exception &e) {
                            LOG_ERROR(std::string("Exception in async event task: ") + e.what());
                        } catch (...) {
                            LOG_ERROR(std::string("Unknown exception in async event task"));
                        }
                    }
                }
            });
        }
    }

    LOG_INFO("Set thread pool max thread count to " + std::to_string(count));
}

/**
 * @brief Returns the number of worker threads currently in the pool.
 */
int XNEventManager::GetMaxThreadCount() const
{
    T_D();
    std::lock_guard lock(d->taskMutex);
    return static_cast<int>(d->workerThreads.size());
}

/**
 * @brief Blocks until the asynchronous task queue is empty.
 *
 * Only the queue is observed: a task that has already been dequeued may
 * still be executing when this returns. If no worker threads are running,
 * a non-empty queue is never drained and this call does not return.
 */
void XNEventManager::WaitForAsyncEvents()
{
    T_D();
    std::unique_lock lock(d->taskMutex);
    d->taskCond.wait(lock, [d] { return d->taskQueue.empty(); });
    LOG_INFO("All async events have been processed");
}

/**
 * @brief Initializes the event manager: starts the worker pool and the
 *        real-time pool and marks the object as Initialized.
 * @return Always true.
 */
bool XNEventManager::Initialize()
{
    T_D();

    // Configure the ordinary thread pool. hardware_concurrency() may return
    // 0 when the value is not computable; fall back to a single worker so
    // async events are still processed.
    const unsigned int hardwareThreads = std::thread::hardware_concurrency();
    SetMaxThreadCount(hardwareThreads > 0 ? static_cast<int>(hardwareThreads) : 1);

    // Configure the real-time thread pool.
    SetRTThreadPoolConfig(2,                                   // max thread count
                          sched_get_priority_min(SCHED_FIFO),  // min priority
                          sched_get_priority_max(SCHED_FIFO)); // max priority

    LOG_INFO("XNEventManager Initialize Success!");
    d->_status = XNFrameObjectStatus::Initialized;
    return true;
}

/**
 * @brief Marks the event manager as Ready for execution.
 * @return Always true.
 */
bool XNEventManager::PrepareForExecute()
{
    T_D();
    d->_status = XNFrameObjectStatus::Ready;
    LOG_INFO("XNEventManager is prepared!");
    return true;
}

/**
 * @brief Restarts the real-time manager with a new configuration.
 *
 * The RT manager is bound to the highest-numbered CPU set in the framework's
 * affinity mask (bits 0..31), or to CPU 1 when the mask is empty.
 *
 * @param maxThreads  Thread count passed to the RT manager.
 * @param minPriority Currently not used by this function.
 * @param maxPriority Priority passed to the RT manager.
 */
void XNEventManager::SetRTThreadPoolConfig(int maxThreads, int minPriority, int maxPriority)
{
    T_D();
    d->rtManager.stop();

    // NOTE(review): when the framework is missing the RT manager stays
    // stopped, because stop() has already run - confirm this is intended.
    XNFrameworkPtr framework = GetFramework();
    if (!framework) {
        LOG_WARNING("XNFramework is nullptr!");
        return;
    }

    uint32_t cpuAffinity = framework->GetCpuAffinity();

    // Find the last available CPU (assumes at most 32 CPUs).
    int lastCpu = -1;
    for (int i = 0; i < 32; i++) {
        if (cpuAffinity & (1u << i)) {
            lastCpu = i;
        }
    }

    if (lastCpu < 0) {
        LOG_WARNING("No available CPU found in affinity mask, using default CPU 1");
        lastCpu = 1;
    } else {
        LOG_INFO("RT thread bound to CPU " + std::to_string(lastCpu));
    }

    d->rtManager.start(maxThreads, maxPriority, lastCpu);
}

/**
 * @brief Hook invoked after a handler has been run for an event.
 * @param eventName Name of the processed event.
 * @param success   Whether the handler completed without throwing.
 */
void XNEventManager::EventProcessed(const std::string &eventName, bool success)
{
    T_D();
    // Completion callback logic for processed events can be added here.
    //LOG_INFO("Event " + eventName + " processed " + (success ? "successfully" : "with error"));
}