#pragma once #include "XNBaseFrameObject_p.h" #include "XNEventManager.h" #include #include #include #include #include #include #include #include #include #include #include // 事件处理器信息结构 struct EventHandlerInfo { std::function callback; // 回调函数 quint32 objectId; // 对象ID int localId; // 本地ID bool isAsync; // 是否异步处理 XNEvent::Priority priority; // 事件优先级 int threadPriority; // 线程优先级 // 获取全局处理器ID int GetHandlerId() const { return (objectId << 16) | (localId & 0xFFFF); } // 从全局处理器ID中提取对象ID static quint32 GetObjectId(int handlerId) { return handlerId >> 16; } // 从全局处理器ID中提取本地ID static int GetLocalId(int handlerId) { return handlerId & 0xFFFF; } }; // 事件任务基类 class BaseEventTask : public QRunnable { public: BaseEventTask(const QString &name, const QVariant &data, std::function callback, XNEventManager *manager) : eventName(name), eventData(data), eventCallback(callback), eventManager(manager) { setAutoDelete(true); } virtual ~BaseEventTask() = default; protected: QString eventName; QVariant eventData; std::function eventCallback; XNEventManager *eventManager; }; // 实时事件任务 class RTEventTask { public: RTEventTask(const QString &name, const QVariant &data, std::function callback, XNEventManager *manager) : eventName(name), eventData(data), eventCallback(callback), eventManager(manager) { } void execute() { try { eventCallback(eventData); if (eventManager) { QMetaObject::invokeMethod(eventManager, "EventProcessed", Qt::QueuedConnection, Q_ARG(QString, eventName), Q_ARG(bool, true)); } } catch (const std::exception &e) { LOG_ERROR( QString("RT event handler exception for %1: %2").arg(eventName).arg(e.what())); if (eventManager) { QMetaObject::invokeMethod(eventManager, "EventProcessed", Qt::QueuedConnection, Q_ARG(QString, eventName), Q_ARG(bool, false)); } } } private: QString eventName; QVariant eventData; std::function eventCallback; XNEventManager *eventManager; }; // 实时线程管理器 class RTThreadManager { public: RTThreadManager() : running(false) {} ~RTThreadManager() { stop(); } void start(int threadCount, int priority, int cpuCore) { running = true; for (int i = 0; i < threadCount; ++i) { pthread_t thread; pthread_create(&thread, nullptr, threadFunction, this); // 设置线程优先级 struct sched_param param; param.sched_priority = priority; pthread_setschedparam(thread, SCHED_FIFO, ¶m); // 设置CPU亲和性 cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(cpuCore, &cpuset); pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset); threads.push_back(thread); } } void stop() { running = false; taskCond.notify_all(); for (auto thread : threads) { pthread_join(thread, nullptr); } threads.clear(); } void addTask(RTEventTask *task) { std::lock_guard lock(taskMutex); taskQueue.push(task); taskCond.notify_one(); } private: static void *threadFunction(void *arg) { auto manager = static_cast(arg); manager->processTask(); return nullptr; } void processTask() { while (running) { RTEventTask *task = nullptr; { std::unique_lock lock(taskMutex); taskCond.wait(lock, [this] { return !taskQueue.empty() || !running; }); if (!running) break; task = taskQueue.front(); taskQueue.pop(); } if (task) { task->execute(); delete task; } } } bool running; std::vector threads; std::queue taskQueue; std::mutex taskMutex; std::condition_variable taskCond; }; // 事件管理器的私有实现类 class XNEventManagerPrivate : public XNBaseFrameObjectPrivate { public: // 声明公共接口类 Q_DECLARE_PUBLIC(XNEventManager) // 构造函数,初始化私有实现 explicit XNEventManagerPrivate(XNEventManager *q) : XNBaseFrameObjectPrivate(q) {} // 存储事件及其对应的处理器信息列表 // key: 事件名称 // value: 该事件对应的所有处理器信息列表 QMap> eventHandlers; // 处理器ID到事件名称的反向映射,用于快速查找 QMap handlerToEvent; // 本地ID计数器 int localIdCounter = 0; // 互斥锁,用于保护事件处理器表的线程安全访问 QMutex eventMutex; // 线程池,用于异步执行事件处理器 QThreadPool threadPool; // 实时线程池 QThreadPool rtThreadPool; RTThreadManager rtManager; QThreadPool normalThreadPool; // 用于非实时任务 };