XNSim/Release/include/XNCore/XNEventManager_p.h

243 lines
5.6 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include "XNBaseFrameObject_p.h"
#include "XNEventManager.h"
#include <functional>
#include <sys/types.h>
#include <bits/pthreadtypes.h>
#include <bits/sched.h>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <string>
#include <any>
#include <map>
#include <list>
// 事件处理器信息结构
struct EventHandlerInfo {
std::function<void(const std::any &)> callback; // 回调函数
uint32_t objectId; // 对象ID
uint32_t localId; // 本地ID
bool isAsync; // 是否异步处理
XNEvent::Priority priority; // 事件优先级
uint32_t threadPriority; // 线程优先级
// 获取全局处理器ID
uint32_t GetHandlerId() const { return (objectId << 16) | (localId & 0xFFFF); }
// 从全局处理器ID中提取对象ID
static uint32_t GetObjectId(uint32_t handlerId) { return handlerId >> 16; }
// 从全局处理器ID中提取本地ID
static uint32_t GetLocalId(uint32_t handlerId) { return handlerId & 0xFFFF; }
};
// 事件任务基类
class BaseEventTask
{
public:
/**
* @brief 构造函数
* @param name 事件名称
* @param data 事件数据
* @param callback 回调函数
* @param manager 事件管理器指针
*/
BaseEventTask(const std::string &name, const std::any &data,
std::function<void(const std::any &)> callback, XNEventManager *manager)
: eventName(name), eventData(data), eventCallback(callback), eventManager(manager)
{
}
virtual ~BaseEventTask() = default;
/**
* @brief 执行任务
*/
virtual void execute() = 0;
protected:
std::string eventName;
std::any eventData;
std::function<void(const std::any &)> eventCallback;
XNEventManager *eventManager;
};
// 异步事件任务
class AsyncEventTask : public BaseEventTask
{
public:
/**
* @brief 构造函数
* @param name 事件名称
* @param data 事件数据
* @param callback 回调函数
* @param manager 事件管理器指针
*/
AsyncEventTask(const std::string &name, const std::any &data,
std::function<void(const std::any &)> callback, XNEventManager *manager)
: BaseEventTask(name, data, callback, manager)
{
}
/**
* @brief 执行任务
*/
void execute() override
{
try {
eventCallback(eventData);
if (eventManager) {
eventManager->EventProcessed(eventName, true);
}
} catch (const std::exception &e) {
LOG_ERROR("Async event handler exception for " + eventName + ": " + e.what());
if (eventManager) {
eventManager->EventProcessed(eventName, false);
}
}
}
};
// 实时事件任务
class RTEventTask : public BaseEventTask
{
public:
/**
* @brief 构造函数
* @param name 事件名称
* @param data 事件数据
* @param callback 回调函数
* @param manager 事件管理器指针
*/
RTEventTask(const std::string &name, const std::any &data,
std::function<void(const std::any &)> callback, XNEventManager *manager)
: BaseEventTask(name, data, callback, manager)
{
}
/**
* @brief 执行任务
*/
void execute() override
{
try {
eventCallback(eventData);
if (eventManager) {
eventManager->EventProcessed(eventName, true);
}
} catch (const std::exception &e) {
LOG_ERROR("RT event handler exception for " + eventName + ": " + e.what());
if (eventManager) {
eventManager->EventProcessed(eventName, false);
}
}
}
};
// 实时线程管理器
class RTThreadManager
{
public:
RTThreadManager() : running(false) {}
~RTThreadManager() { stop(); }
void start(int threadCount, int priority, int cpuCore)
{
running = true;
for (int i = 0; i < threadCount; ++i) {
pthread_t thread;
pthread_create(&thread, nullptr, threadFunction, this);
// 设置线程优先级
struct sched_param param;
param.sched_priority = priority;
pthread_setschedparam(thread, SCHED_FIFO, &param);
// 设置CPU亲和性
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpuCore, &cpuset);
pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
threads.push_back(thread);
}
}
void stop()
{
running = false;
taskCond.notify_all();
for (auto thread : threads) {
pthread_join(thread, nullptr);
}
threads.clear();
}
void addTask(RTEventTask *task)
{
std::lock_guard<std::mutex> lock(taskMutex);
taskQueue.push(task);
taskCond.notify_one();
}
private:
static void *threadFunction(void *arg)
{
auto manager = static_cast<RTThreadManager *>(arg);
manager->processTask();
return nullptr;
}
void processTask()
{
while (running) {
RTEventTask *task = nullptr;
{
std::unique_lock<std::mutex> lock(taskMutex);
taskCond.wait(lock, [this] { return !taskQueue.empty() || !running; });
if (!running)
break;
task = taskQueue.front();
taskQueue.pop();
}
if (task) {
task->execute();
delete task;
}
}
}
bool running;
std::vector<pthread_t> threads;
std::queue<RTEventTask *> taskQueue;
std::mutex taskMutex;
std::condition_variable taskCond;
};
// 事件管理器的私有实现类
struct XNEventManagerPrivate : public XNBaseFrameObjectPrivate {
// 存储事件及其对应的处理器信息列表
// key: 事件名称
// value: 该事件对应的所有处理器信息列表
std::map<std::string, std::list<EventHandlerInfo>> eventHandlers;
// 处理器ID到事件名称的反向映射用于快速查找
std::map<int, std::string> handlerToEvent;
// 本地ID计数器
int localIdCounter = 0;
// 互斥锁,用于保护事件处理器表的线程安全访问
std::mutex eventMutex;
// 线程池相关
std::vector<std::thread> workerThreads;
std::queue<BaseEventTask *> taskQueue;
std::mutex taskMutex;
std::condition_variable taskCond;
bool running = true;
RTThreadManager rtManager;
};