/** * @file TopicManager.h * @author jinchao * @brief 主题管理类 * @version 1.0 * @date 2025-03-10 * * @copyright Copyright (c) 2025 COMAC * */ #pragma once #include #include #include #include #include "DataReaderListenerImpl.h" #include /** * @brief 主题管理类 */ class TopicManager { public: /** * @brief 删除拷贝构造 */ TopicManager(const TopicManager &) = delete; /** * @brief 删除赋值操作 */ TopicManager &operator=(const TopicManager &) = delete; /** * @brief 获取单例 * @return TopicManager*: 主题管理类实例 */ static TopicManager *Instance() { if (instance == nullptr) { std::lock_guard locker(instanceMutex); if (instance == nullptr) { // 双重检查锁定 instance = new TopicManager(); } } return instance; } /** * @brief 清理参与者 */ static void cleanupParticipant() { std::lock_guard locker(instanceMutex); if (instance != nullptr) { instance->clearAllTopic(); // 清理所有主题 if (instance->m_Participant != nullptr) { eprosima::fastdds::dds::DomainParticipantFactory::get_instance() ->delete_participant(instance->m_Participant); // 删除参与者 instance->m_Participant = nullptr; // 设置参与者为空 } delete instance; // 删除单例 instance = nullptr; // 设置单例为空 } } private: /** * @brief 显式构造函数 */ explicit TopicManager() {} /** * @brief 析构函数 */ virtual ~TopicManager() {} public: /** * @brief 初始化参与者 * @param domainId: 域ID */ XNDDSErrorCode initializeParticipant(int domainId) { XNParticipantQos participantQos = eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT; participantQos.name("XNMonitor"); // 设置参与者名称 m_Participant = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( domainId, participantQos); // 创建参与者 if (m_Participant == nullptr) { return XNDDSErrorCode::INIT_FAILED; } return XNDDSErrorCode::SUCCESS; }; /** * @brief 注册发布者模板函数 * @tparam T: 类型 * @param topicName: 主题名称 * @return XNDataWriter*: 数据写入器 */ template XNDDSErrorCode registerPublisher(const std::string &topicName, XNDataWriter *dataWriter = nullptr) { std::lock_guard locker(m_Mutex); if (topics_.find(topicName) == topics_.end()) { topics_[topicName] = TopicInfo(); // 创建主题信息 TopicInfo &tmp = topics_[topicName]; // 获取主题信息 XNTypeSupport typeSupport(new T()); // 创建类型支持 typeSupport.register_type(m_Participant); // 注册类型 tmp.topic = m_Participant->create_topic(topicName.c_str(), typeSupport.get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 创建主题 if (tmp.topic == nullptr) { topics_.erase(topicName); // 移除主题 return XNDDSErrorCode::TOPIC_CREATE_FAILED; } } TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息 if (topicInfo.publisher == nullptr) { topicInfo.publisher = m_Participant->create_publisher( eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); // 创建发布者 if (topicInfo.publisher == nullptr) { return XNDDSErrorCode::PUBLISHER_CREATE_FAILED; } } if (topicInfo.dataWriter == nullptr) { XNDataWriterQos dataWriterQos; // 设置数据写入器的持久性策略, 使用瞬态本地持久性 dataWriterQos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; // 设置数据写入器的生命周期策略, 设置为5秒 dataWriterQos.lifespan().duration = eprosima::fastdds::dds::Duration_t(5, 0); topicInfo.dataWriter = topicInfo.publisher->create_datawriter( topicInfo.topic, dataWriterQos); // 创建数据写入器 if (topicInfo.dataWriter == nullptr) { return XNDDSErrorCode::DATAWRITER_CREATE_FAILED; } } dataWriter = topicInfo.dataWriter; return XNDDSErrorCode::SUCCESS; } /** * @brief 注销发布者 * @param topicName: 主题名称 */ void unregisterPublisher(const std::string &topicName) { std::lock_guard locker(m_Mutex); auto it = topics_.find(topicName); if (it != topics_.end()) { TopicInfo &topicInfo = it->second; // 获取主题信息 if (topicInfo.dataWriter != nullptr) { topicInfo.publisher->delete_datawriter(topicInfo.dataWriter); // 删除数据写入器 topicInfo.dataWriter = nullptr; // 设置数据写入器为空 } if (topicInfo.publisher != nullptr) { m_Participant->delete_publisher(topicInfo.publisher); // 删除发布者 topicInfo.publisher = nullptr; // 设置发布者为空 } if (topicInfo.publisher == nullptr && topicInfo.subscriber == nullptr && topicInfo.topic != nullptr) { m_Participant->delete_topic(topicInfo.topic); // 删除主题 topicInfo.topic = nullptr; // 设置主题为空 topics_.erase(it); // 移除主题 } } } /** * @brief 注册订阅者模板函数 * @tparam T: 类型 * @param topicName: 主题名称 * @param fun: 回调函数 */ template XNDDSErrorCode registerSubscriber(const std::string &topicName, std::function fun) { std::lock_guard locker(m_Mutex); if (topics_.find(topicName) == topics_.end()) { topics_[topicName] = TopicInfo(); // 创建主题信息 TopicInfo &tmp = topics_[topicName]; // 获取主题信息 XNTypeSupport typeSupport(new T()); // 创建类型支持 typeSupport.register_type(m_Participant); // 注册类型 tmp.topic = m_Participant->create_topic(topicName.c_str(), typeSupport.get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 创建主题 if (tmp.topic == nullptr) { topics_.erase(topicName); // 移除主题 return XNDDSErrorCode::TOPIC_CREATE_FAILED; // 返回 } } TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息 if (topicInfo.subscriber == nullptr) { topicInfo.subscriber = m_Participant->create_subscriber( eprosima::fastdds::dds::SUBSCRIBER_QOS_DEFAULT); // 创建订阅者 if (topicInfo.subscriber == nullptr) { return XNDDSErrorCode::SUBSCRIBER_CREATE_FAILED; // 返回 } } if (topicInfo.dataReader == nullptr) { XNDataReaderQos dataReaderQos; dataReaderQos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; // 设置数据读取器的持久性策略 topicInfo.listener = new DataReaderListenerImpl(fun); // 创建数据读取器监听器 topicInfo.dataReader = topicInfo.subscriber->create_datareader( topicInfo.topic, dataReaderQos, topicInfo.listener); // 创建数据读取器 if (topicInfo.dataReader == nullptr) { return XNDDSErrorCode::DATAREADER_CREATE_FAILED; // 返回 } } else { auto oldListener = topicInfo.dataReader->get_listener(); // 获取旧的监听器 topicInfo.listener = new DataReaderListenerImpl(fun); // 创建新的监听器 topicInfo.dataReader->set_listener( topicInfo.listener, eprosima::fastdds::dds::StatusMask::all()); // 设置新的监听器 delete oldListener; // 删除旧的监听器 } return XNDDSErrorCode::SUCCESS; } /** * @brief 注销订阅者 * @param topicName: 主题名称 */ void unregisterSubscriber(const std::string &topicName) { std::lock_guard locker(m_Mutex); auto it = topics_.find(topicName); if (it != topics_.end()) { TopicInfo &topicInfo = it->second; // 获取主题信息 if (topicInfo.dataReader != nullptr) { topicInfo.subscriber->delete_datareader(topicInfo.dataReader); // 删除数据读取器 topicInfo.dataReader = nullptr; // 设置数据读取器为空 } if (topicInfo.listener != nullptr) { delete topicInfo.listener; // 删除监听器 topicInfo.listener = nullptr; // 设置监听器为空 } if (topicInfo.subscriber != nullptr) { m_Participant->delete_subscriber(topicInfo.subscriber); // 删除订阅者 topicInfo.subscriber = nullptr; // 设置订阅者为空 } if (topicInfo.subscriber == nullptr && topicInfo.publisher == nullptr && topicInfo.topic != nullptr) { m_Participant->delete_topic(topicInfo.topic); // 删除主题 topicInfo.topic = nullptr; // 设置主题为空 topics_.erase(it); // 移除主题 } } } private: /** * @brief 清除所有主题 */ void clearAllTopic() { std::lock_guard locker(m_Mutex); if (m_Participant != nullptr) { while (!topics_.empty()) { unregisterPublisher(topics_.begin()->first); // 注销发布者 unregisterSubscriber(topics_.begin()->first); // 注销订阅者 } } } private: /** * @brief 单例指针 */ static TopicManager *instance; /** * @brief 单例互斥锁 */ static std::mutex instanceMutex; /** * @brief 参与者 */ XNParticipant *m_Participant = nullptr; /** * @brief 主题映射 */ std::map topics_; /** * @brief 主题访问互斥锁 */ std::mutex m_Mutex; };