XNSim/XNCore/XNEventManager.cpp

296 lines
7.9 KiB
C++
Executable File

#include "XNEventManager.h"
#include "XNEventManager_p.h"
#include "XNFramework.h"
#include <algorithm>
#include <chrono>
#include <memory>
#include <thread>
// Default constructor: hands a freshly allocated XNEventManagerPrivate to the
// base class, then assigns this object its unique ID and object name.
XNEventManager::XNEventManager() : XNBaseFrameObject(new XNEventManagerPrivate())
{
    // Unique identifier used for this manager
    constexpr int kUniqueId = 7;
    SetUniqueId(kUniqueId);

    // Object name
    SetObjectName("XNEventManager");
}
// Destructor: stops the worker threads, waits for them to exit and releases
// any asynchronous tasks that were queued but never picked up.
XNEventManager::~XNEventManager()
{
    T_D();

    // Clear the run flag while holding the task mutex. Workers evaluate their
    // wait predicate under this mutex, so changing the flag without it could
    // slip in between a worker's predicate check and its blocking wait, making
    // the worker miss the notification below and hang the join().
    {
        std::lock_guard<std::mutex> lock(d->taskMutex);
        d->running = false;
    }
    d->taskCond.notify_all();

    for (auto &thread : d->workerThreads) {
        if (thread.joinable()) {
            thread.join();
        }
    }

    // Workers exit as soon as they see the cleared flag, even if tasks remain
    // queued. Those tasks are owned through raw pointers, so free them here.
    std::lock_guard<std::mutex> lock(d->taskMutex);
    while (!d->taskQueue.empty()) {
        delete d->taskQueue.front();
        d->taskQueue.pop();
    }
}
// Protected constructor: forwards an externally supplied private object to
// the base class and does nothing else.
XNEventManager::XNEventManager(PrivateType *p)
    : XNBaseFrameObject(p)
{}
// Registers a callback for the named event.
//
// Parameters:
//   eventName - name of the event; must not be empty.
//   callback  - handler to invoke; must be callable.
//   objectId  - ID of the registering object, stored with the handler.
//   async     - whether the handler is marked for asynchronous execution.
//   priority  - priority stored with the handler.
//
// Returns the handler ID computed by EventHandlerInfo::GetHandlerId(), or -1
// when the arguments are invalid or no unused handler ID could be found.
int XNEventManager::RegisterEventHandler(const std::string &eventName, XNEventCallback callback,
                                         uint32_t objectId, bool async, XNEvent::Priority priority)
{
    T_D();
    if (eventName.empty() || !callback) {
        LOG_WARNING("Invalid event name or callback!");
        return -1;
    }

    std::lock_guard<std::mutex> locker(d->eventMutex);

    // Build the handler record
    EventHandlerInfo handlerInfo;
    handlerInfo.callback = callback;
    handlerInfo.objectId = objectId;
    handlerInfo.isAsync  = async;
    handlerInfo.priority = priority;

    // The local ID counter is 16 bits wide and wraps around (skipping 0), so
    // after enough registrations it can land on an ID that is still in use.
    // Reusing it would overwrite the reverse mapping and give two live
    // handlers the same ID, so keep advancing until an unused ID is found.
    // NOTE(review): assumes GetHandlerId() is derived from objectId/localId - confirm.
    constexpr int kMaxLocalIds = 0xFFFF; // local IDs range over 1..0xFFFF
    int handlerId              = 0;
    bool idIsFree              = false;
    for (int attempt = 0; attempt < kMaxLocalIds && !idIsFree; ++attempt) {
        d->localIdCounter = (d->localIdCounter + 1) & 0xFFFF;
        if (d->localIdCounter == 0)
            d->localIdCounter = 1; // never hand out 0
        handlerInfo.localId = d->localIdCounter;
        handlerId           = handlerInfo.GetHandlerId();
        idIsFree            = d->handlerToEvent.find(handlerId) == d->handlerToEvent.end();
    }
    if (!idIsFree) {
        LOG_WARNING("No free handler ID available for event: " + eventName);
        return -1;
    }

    // Store the handler under its event and record the reverse mapping
    d->eventHandlers[eventName].push_back(handlerInfo);
    d->handlerToEvent[handlerId] = eventName;

    LOG_INFO("Registered " + std::string(async ? "async" : "sync") + " event handler for event: "
             + eventName + ", handler ID: " + std::to_string(handlerId) + " (object: "
             + std::to_string(objectId) + ", local: " + std::to_string(d->localIdCounter) + ")");
    return handlerId;
}
// Removes a previously registered handler.
//
// Parameters:
//   eventName - event the handler belongs to; when empty, the event is looked
//               up through the handler-ID-to-event reverse mapping.
//   handlerId - ID returned by RegisterEventHandler.
//
// Returns true when a handler was removed, false when the event or handler
// could not be found. An event whose last handler is removed is erased too.
bool XNEventManager::RemoveEventHandler(const std::string &eventName, int handlerId)
{
    T_D();
    std::lock_guard<std::mutex> locker(d->eventMutex);

    const bool byName = !eventName.empty();

    // Work out which event to search
    std::string targetEvent = eventName;
    if (!byName) {
        auto mappingIt = d->handlerToEvent.find(handlerId);
        if (mappingIt == d->handlerToEvent.end()) {
            LOG_WARNING("Handler ID " + std::to_string(handlerId) + " not found!");
            return false;
        }
        targetEvent = mappingIt->second;
    }

    // Look the event up with find() rather than operator[], so a stale reverse
    // mapping cannot create an empty event entry as a side effect.
    auto eventIt = d->eventHandlers.find(targetEvent);
    if (eventIt == d->eventHandlers.end()) {
        if (byName) {
            LOG_WARNING("Event " + eventName + " not found!");
        } else {
            // The reverse mapping pointed at an event that no longer exists
            d->handlerToEvent.erase(handlerId);
            LOG_WARNING("Handler ID " + std::to_string(handlerId) + " not found!");
        }
        return false;
    }

    auto &handlers = eventIt->second;
    auto handlerIt = std::find_if(
        handlers.begin(), handlers.end(),
        [handlerId](const EventHandlerInfo &info) { return info.GetHandlerId() == handlerId; });
    if (handlerIt == handlers.end()) {
        if (byName) {
            LOG_WARNING("Handler ID " + std::to_string(handlerId)
                        + " not found in event: " + eventName);
        } else {
            // The reverse mapping pointed at an event that lacks this handler
            d->handlerToEvent.erase(handlerId);
            LOG_WARNING("Handler ID " + std::to_string(handlerId) + " not found!");
        }
        return false;
    }

    handlers.erase(handlerIt);
    d->handlerToEvent.erase(handlerId);
    LOG_INFO("Removed handler ID " + std::to_string(handlerId) + " from event: " + targetEvent);

    // Drop the event entry once it has no handlers left
    if (handlers.empty()) {
        d->eventHandlers.erase(eventIt);
    }
    return true;
}
// 修改触发事件的实现
void XNEventManager::TriggerEvent(const std::string &eventName, const std::any &eventData,
bool forceAsync, XNEvent::Priority priority)
{
T_D();
std::list<EventHandlerInfo> handlers;
{
std::lock_guard<std::mutex> locker(d->eventMutex);
auto it = d->eventHandlers.find(eventName);
if (it == d->eventHandlers.end()) {
return;
}
handlers = it->second;
}
for (const auto &handler : handlers) {
if (forceAsync || handler.isAsync) {
if (priority == XNEvent::Priority::RealTime) {
// 创建实时任务
RTEventTask *task = new RTEventTask(eventName, eventData, handler.callback, this);
d->rtManager.addTask(task);
} else {
// 普通异步任务使用线程池
std::lock_guard<std::mutex> lock(d->taskMutex);
d->taskQueue.push(new AsyncEventTask(eventName, eventData, handler.callback, this));
d->taskCond.notify_one();
}
} else {
// 同步执行
try {
handler.callback(eventData);
EventProcessed(eventName, true);
} catch (const std::exception &e) {
LOG_ERROR("Exception in handler " + std::to_string(handler.GetHandlerId())
+ " for event " + eventName + ": " + e.what());
EventProcessed(eventName, false);
}
}
}
}
void XNEventManager::SetMaxThreadCount(int count)
{
T_D();
std::lock_guard<std::mutex> lock(d->taskMutex);
// 停止现有线程
d->running = false;
d->taskCond.notify_all();
for (auto &thread : d->workerThreads) {
if (thread.joinable()) {
thread.join();
}
}
d->workerThreads.clear();
// 创建新线程
d->running = true;
for (int i = 0; i < count; ++i) {
d->workerThreads.emplace_back([this, d]() {
while (d->running) {
BaseEventTask *task = nullptr;
{
std::unique_lock<std::mutex> lock(d->taskMutex);
d->taskCond.wait(lock,
[this, d] { return !d->taskQueue.empty() || !d->running; });
if (!d->running) {
break;
}
task = d->taskQueue.front();
d->taskQueue.pop();
}
if (task) {
task->execute();
delete task;
}
}
});
}
LOG_INFO("Set thread pool max thread count to " + std::to_string(count));
}
// Returns the number of worker threads currently held by the pool, read while
// holding the task mutex.
int XNEventManager::GetMaxThreadCount() const
{
    T_D();
    std::lock_guard<std::mutex> guard(d->taskMutex);
    const auto workerCount = d->workerThreads.size();
    return static_cast<int>(workerCount);
}
// Blocks until the asynchronous task queue is empty.
//
// The queue is polled under the task mutex rather than waited on through
// taskCond, for two reasons: the workers do not signal the condition variable
// after draining the queue, so a wait on it could block forever; and a waiter
// here could consume a notify_one() meant to wake a worker.
//
// NOTE: this only waits for the queue to drain. A worker removes a task from
// the queue before executing it, so tasks already dequeued may still be
// running when this function returns.
void XNEventManager::WaitForAsyncEvents()
{
    T_D();
    for (;;) {
        {
            std::lock_guard<std::mutex> lock(d->taskMutex);
            if (d->taskQueue.empty()) {
                break;
            }
        }
        // Short sleep between checks so the workers can take the mutex
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    LOG_INFO("All async events have been processed");
}
// 初始化事件管理器
bool XNEventManager::Initialize()
{
T_D();
// 配置普通线程池
SetMaxThreadCount(std::thread::hardware_concurrency());
// 配置实时线程池
SetRTThreadPoolConfig(2, // 最大线程数
sched_get_priority_min(SCHED_FIFO), // 最小优先级
sched_get_priority_max(SCHED_FIFO)); // 最大优先级
LOG_INFO("XNEventManager Initialize Success!");
d->_status = XNFrameObjectStatus::Initialized;
return true;
}
// Prepares for execution: switches the object status to Ready, logs it and
// returns true.
bool XNEventManager::PrepareForExecute()
{
    T_D();
    const auto readyStatus = XNFrameObjectStatus::Ready;
    d->_status             = readyStatus;
    LOG_INFO("XNEventManager is prepared!");
    return true;
}
// Restarts the real-time manager with a new configuration.
//
// Parameters:
//   maxThreads  - passed to rtManager.start().
//   minPriority - accepted but not used by this function.
//   maxPriority - passed to rtManager.start().
//
// The CPU passed to rtManager.start() is the highest-numbered bit set in the
// framework's CPU affinity mask (bits 0..31); if no bit is set, CPU 1 is used.
// The manager is stopped before the framework is checked, so when no framework
// is available the function returns with the manager left stopped.
void XNEventManager::SetRTThreadPoolConfig(int maxThreads, int minPriority, int maxPriority)
{
    T_D();
    d->rtManager.stop();

    XNFrameworkPtr framework = GetFramework();
    if (!framework) {
        LOG_WARNING("XNFramework is nullptr!");
        return;
    }

    const uint32_t affinityMask = framework->GetCpuAffinity();

    // Scan from the top bit down; the first set bit is the last available CPU.
    // Only 32 CPUs are considered.
    int boundCpu = -1;
    for (int bit = 31; bit >= 0; --bit) {
        if ((affinityMask >> bit) & 1u) {
            boundCpu = bit;
            break;
        }
    }

    if (boundCpu >= 0) {
        LOG_INFO("RT thread bound to CPU " + std::to_string(boundCpu));
    } else {
        LOG_WARNING("No available CPU found in affinity mask, using default CPU 1");
        boundCpu = 1;
    }

    d->rtManager.start(maxThreads, maxPriority, boundCpu);
}
// Called after an event handler has run; currently it only logs the outcome.
//
// Parameters:
//   eventName - name of the processed event.
//   success   - true if the handler completed, false if it raised an error.
void XNEventManager::EventProcessed(const std::string &eventName, bool success)
{
    T_D();
    // Completion callback logic for processed events can be added here
    const char *outcome = success ? "successfully" : "with error";
    LOG_INFO("Event " + eventName + " processed " + outcome);
}