/** * @file TopicManager.h * @author jinchao * @brief 主题管理类 * @version 1.0 * @date 2025-03-10 * * @copyright Copyright (c) 2025 COMAC * */ #pragma once #include #include #include #include #include #include "DataReaderListenerImpl.h" #include "../TypeDefine.h" #include /** * @brief 主题管理类 */ class TopicManager : public QObject { Q_OBJECT public: /** * @brief 删除拷贝构造 */ TopicManager(const TopicManager &) = delete; /** * @brief 删除赋值操作 */ TopicManager &operator=(const TopicManager &) = delete; /** * @brief 获取单例 * @return TopicManager*: 主题管理类实例 */ static TopicManager *Instance() { if (instance == nullptr) { QMutexLocker locker(&instanceMutex); if (instance == nullptr) { // 双重检查锁定 instance = new TopicManager(); } } return instance; } /** * @brief 清理参与者 */ static void cleanupParticipant() { QMutexLocker locker(&instanceMutex); if (instance != nullptr) { instance->clearAllTopic(); // 清理所有主题 if (instance->m_Participant != nullptr) { eprosima::fastdds::dds::DomainParticipantFactory::get_instance() ->delete_participant(instance->m_Participant); // 删除参与者 instance->m_Participant = nullptr; // 设置参与者为空 } delete instance; // 删除单例 instance = nullptr; // 设置单例为空 } } private: /** * @brief 显式构造函数 * @param parent: 父对象 */ explicit TopicManager(QObject *parent = nullptr) : QObject(parent) { XNParticipantQos participantQos; participantQos.name("XNMonitor"); // 设置参与者名称 m_Participant = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( 10, participantQos); // 创建参与者 } /** * @brief 析构函数 */ virtual ~TopicManager() {} signals: /** * @brief 发送调试信息信号 * @param type: 类型 * @param message: 信息 */ void sendDebugMessage(int type, const QString &message); public: /** * @brief 注册发布者模板函数 * @tparam T: 类型 * @param topicName: 主题名称 * @return XNDataWriter*: 数据写入器 */ template XNDataWriter *registerPublisher(const QString &topicName) { QMutexLocker locker(&m_Mutex); if (!topics_.contains(topicName)) { topics_[topicName] = TopicInfo(); // 创建主题信息 TopicInfo &tmp = topics_[topicName]; // 获取主题信息 XNTypeSupport typeSupport(new T()); // 创建类型支持 typeSupport.register_type(m_Participant); // 注册类型 tmp.topic = m_Participant->create_topic( topicName.toStdString().c_str(), typeSupport.get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 创建主题 if (tmp.topic == nullptr) { emit sendDebugMessage( 1, QString("Failed to create topic %1").arg(topicName)); // 发送调试信息 topics_.remove(topicName); // 移除主题 return nullptr; // 返回空 } } TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息 if (topicInfo.publisher == nullptr) { topicInfo.publisher = m_Participant->create_publisher( eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); // 创建发布者 if (topicInfo.publisher == nullptr) { emit sendDebugMessage( 1, QString("Failed to create publisher %1").arg(topicName)); // 发送调试信息 return nullptr; // 返回空 } } if (topicInfo.dataWriter == nullptr) { XNDataWriterQos dataWriterQos; // 设置数据写入器的持久性策略, 使用瞬态本地持久性 dataWriterQos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; // 设置数据写入器的生命周期策略, 设置为5秒 dataWriterQos.lifespan().duration = eprosima::fastdds::dds::Duration_t(5, 0); topicInfo.dataWriter = topicInfo.publisher->create_datawriter( topicInfo.topic, dataWriterQos); // 创建数据写入器 if (topicInfo.dataWriter == nullptr) { emit sendDebugMessage( 1, QString("Failed to create data writer %1").arg(topicName)); // 发送调试信息 topics_.remove(topicName); // 移除主题 return nullptr; // 返回空 } } return topicInfo.dataWriter; // 返回数据写入器 } /** * @brief 注销发布者 * @param topicName: 主题名称 */ void unregisterPublisher(const QString &topicName) { QMutexLocker locker(&m_Mutex); if (topics_.contains(topicName)) { TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息 if (topicInfo.dataWriter != nullptr) { topicInfo.publisher->delete_datawriter(topicInfo.dataWriter); // 删除数据写入器 topicInfo.dataWriter = nullptr; // 设置数据写入器为空 } if (topicInfo.publisher != nullptr) { m_Participant->delete_publisher(topicInfo.publisher); // 删除发布者 topicInfo.publisher = nullptr; // 设置发布者为空 } if (topicInfo.publisher == nullptr && topicInfo.subscriber == nullptr && topicInfo.topic != nullptr) { m_Participant->delete_topic(topicInfo.topic); // 删除主题 topicInfo.topic = nullptr; // 设置主题为空 topics_.remove(topicName); // 移除主题 } } } /** * @brief 注册订阅者模板函数 * @tparam T: 类型 * @param topicName: 主题名称 * @param fun: 回调函数 */ template void registerSubscriber(const QString &topicName, std::function fun) { QMutexLocker locker(&m_Mutex); if (!topics_.contains(topicName)) { topics_[topicName] = TopicInfo(); // 创建主题信息 TopicInfo &tmp = topics_[topicName]; // 获取主题信息 XNTypeSupport typeSupport(new T()); // 创建类型支持 typeSupport.register_type(m_Participant); // 注册类型 tmp.topic = m_Participant->create_topic( topicName.toStdString().c_str(), typeSupport.get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 创建主题 if (tmp.topic == nullptr) { emit sendDebugMessage( 1, QString("Failed to create topic %1").arg(topicName)); // 发送调试信息 topics_.remove(topicName); // 移除主题 return; // 返回 } } TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息 if (topicInfo.subscriber == nullptr) { topicInfo.subscriber = m_Participant->create_subscriber( eprosima::fastdds::dds::SUBSCRIBER_QOS_DEFAULT); // 创建订阅者 if (topicInfo.subscriber == nullptr) { emit sendDebugMessage( 1, QString("Failed to create subscriber %1").arg(topicName)); // 发送调试信息 return; // 返回 } } if (topicInfo.dataReader == nullptr) { XNDataReaderQos dataReaderQos; dataReaderQos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; // 设置数据读取器的持久性策略 topicInfo.listener = new DataReaderListenerImpl(fun); // 创建数据读取器监听器 topicInfo.dataReader = topicInfo.subscriber->create_datareader( topicInfo.topic, dataReaderQos, topicInfo.listener); // 创建数据读取器 if (topicInfo.dataReader == nullptr) { emit sendDebugMessage( 1, QString("Failed to create data reader %1").arg(topicName)); // 发送调试信息 return; // 返回 } } else { auto oldListener = topicInfo.dataReader->get_listener(); // 获取旧的监听器 topicInfo.listener = new DataReaderListenerImpl(fun); // 创建新的监听器 topicInfo.dataReader->set_listener( topicInfo.listener, eprosima::fastdds::dds::StatusMask::all()); // 设置新的监听器 delete oldListener; // 删除旧的监听器 } } /** * @brief 注销订阅者 * @param topicName: 主题名称 */ void unregisterSubscriber(const QString &topicName) { QMutexLocker locker(&m_Mutex); if (topics_.contains(topicName)) { TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息 if (topicInfo.dataReader != nullptr) { topicInfo.subscriber->delete_datareader(topicInfo.dataReader); // 删除数据读取器 topicInfo.dataReader = nullptr; // 设置数据读取器为空 } if (topicInfo.listener != nullptr) { delete topicInfo.listener; // 删除监听器 topicInfo.listener = nullptr; // 设置监听器为空 } if (topicInfo.subscriber != nullptr) { m_Participant->delete_subscriber(topicInfo.subscriber); // 删除订阅者 topicInfo.subscriber = nullptr; // 设置订阅者为空 } if (topicInfo.subscriber == nullptr && topicInfo.publisher == nullptr && topicInfo.topic != nullptr) { m_Participant->delete_topic(topicInfo.topic); // 删除主题 topicInfo.topic = nullptr; // 设置主题为空 topics_.remove(topicName); // 移除主题 emit sendDebugMessage( 0, QString("unregisterSubscriber %1").arg(topicName)); // 发送调试信息 } } } private: /** * @brief 清除所有主题 */ void clearAllTopic() { QMutexLocker locker(&m_Mutex); if (m_Participant != nullptr) { while (topics_.size() > 0) { emit sendDebugMessage( 0, QString("clearAllTopic %1").arg(topics_.begin().key())); // 发送调试信息 unregisterPublisher(topics_.begin().key()); // 注销发布者 unregisterSubscriber(topics_.begin().key()); // 注销订阅者 } } } private: /** * @brief 单例指针 */ static TopicManager *instance; /** * @brief 单例互斥锁 */ static QMutex instanceMutex; /** * @brief 参与者 */ XNParticipant *m_Participant = nullptr; /** * @brief 主题映射 */ QMap topics_; /** * @brief 主题访问互斥锁 */ QMutex m_Mutex; };