#pragma once #include "XNBaseFrameObject_p.h" #include "XNEventManager.h" #include #include #include #include #include #include #include #include #include #include #include #include // 事件处理器信息结构 struct EventHandlerInfo { std::function callback; // 回调函数 uint32_t objectId; // 对象ID uint32_t localId; // 本地ID bool isAsync; // 是否异步处理 XNEvent::Priority priority; // 事件优先级 uint32_t threadPriority; // 线程优先级 // 获取全局处理器ID uint32_t GetHandlerId() const { return (objectId << 16) | (localId & 0xFFFF); } // 从全局处理器ID中提取对象ID static uint32_t GetObjectId(uint32_t handlerId) { return handlerId >> 16; } // 从全局处理器ID中提取本地ID static uint32_t GetLocalId(uint32_t handlerId) { return handlerId & 0xFFFF; } }; // 事件任务基类 class BaseEventTask { public: /** * @brief 构造函数 * @param name 事件名称 * @param data 事件数据 * @param callback 回调函数 * @param manager 事件管理器指针 */ BaseEventTask(const std::string &name, const std::any &data, std::function callback, XNEventManager *manager) : eventName(name), eventData(data), eventCallback(callback), eventManager(manager) { } virtual ~BaseEventTask() = default; /** * @brief 执行任务 */ virtual void execute() = 0; protected: std::string eventName; std::any eventData; std::function eventCallback; XNEventManager *eventManager; }; // 异步事件任务 class AsyncEventTask : public BaseEventTask { public: /** * @brief 构造函数 * @param name 事件名称 * @param data 事件数据 * @param callback 回调函数 * @param manager 事件管理器指针 */ AsyncEventTask(const std::string &name, const std::any &data, std::function callback, XNEventManager *manager) : BaseEventTask(name, data, callback, manager) { } /** * @brief 执行任务 */ void execute() override { try { eventCallback(eventData); if (eventManager) { eventManager->EventProcessed(eventName, true); } } catch (const std::exception &e) { LOG_ERROR("Async event handler exception for " + eventName + ": " + e.what()); if (eventManager) { eventManager->EventProcessed(eventName, false); } } } }; // 实时事件任务 class RTEventTask : public BaseEventTask { public: /** * @brief 构造函数 * @param name 事件名称 * @param data 事件数据 * @param callback 回调函数 * @param manager 事件管理器指针 */ RTEventTask(const std::string &name, const std::any &data, std::function callback, XNEventManager *manager) : BaseEventTask(name, data, callback, manager) { } /** * @brief 执行任务 */ void execute() override { try { eventCallback(eventData); if (eventManager) { eventManager->EventProcessed(eventName, true); } } catch (const std::exception &e) { LOG_ERROR("RT event handler exception for " + eventName + ": " + e.what()); if (eventManager) { eventManager->EventProcessed(eventName, false); } } } }; // 实时线程管理器 class RTThreadManager { public: RTThreadManager() : running(false) {} ~RTThreadManager() { stop(); } void start(int threadCount, int priority, int cpuCore) { running = true; for (int i = 0; i < threadCount; ++i) { pthread_t thread; pthread_create(&thread, nullptr, threadFunction, this); // 设置线程优先级 struct sched_param param; param.sched_priority = priority; pthread_setschedparam(thread, SCHED_FIFO, ¶m); // 设置CPU亲和性 cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(cpuCore, &cpuset); pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); threads.push_back(thread); } } void stop() { running = false; taskCond.notify_all(); for (auto thread : threads) { pthread_join(thread, nullptr); } threads.clear(); } void addTask(RTEventTask *task) { std::lock_guard lock(taskMutex); taskQueue.push(task); taskCond.notify_one(); } private: static void *threadFunction(void *arg) { auto manager = static_cast(arg); manager->processTask(); return nullptr; } void processTask() { while (running) { RTEventTask *task = nullptr; { std::unique_lock lock(taskMutex); taskCond.wait(lock, [this] { return !taskQueue.empty() || !running; }); if (!running) break; task = taskQueue.front(); taskQueue.pop(); } if (task) { task->execute(); delete task; } } } bool running; std::vector threads; std::queue taskQueue; std::mutex taskMutex; std::condition_variable taskCond; }; // 事件管理器的私有实现类 struct XNEventManagerPrivate : public XNBaseFrameObjectPrivate { // 存储事件及其对应的处理器信息列表 // key: 事件名称 // value: 该事件对应的所有处理器信息列表 std::map> eventHandlers; // 处理器ID到事件名称的反向映射,用于快速查找 std::map handlerToEvent; // 本地ID计数器 int localIdCounter = 0; // 互斥锁,用于保护事件处理器表的线程安全访问 std::mutex eventMutex; // 线程池相关 std::vector workerThreads; std::queue taskQueue; std::mutex taskMutex; std::condition_variable taskCond; bool running = true; RTThreadManager rtManager; };