XNSim/XNMonitorServer/TopicManager.h

295 lines
9.0 KiB
C
Raw Normal View History

/**
* @file TopicManager.h
* @author jinchao
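* @brief Singleton manager for the monitor server's Fast DDS participant, topics, publishers and subscribers.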
* @version 1.0
* @date 2025-03-10
*
* @copyright Copyright (c) 2025 COMAC
*
*/
#pragma once
#include "XNMonitorServer_global.h"
#include "DataReaderListenerImpl.h"
/**
* @brief
*/
class TopicManager
{
public:
/**
* @brief
*/
TopicManager(const TopicManager &) = delete;
/**
* @brief
*/
TopicManager &operator=(const TopicManager &) = delete;
/**
* @brief
* @return TopicManager*:
*/
static TopicManager *Instance()
{
if (instance == nullptr) {
std::lock_guard<std::mutex> locker(instanceMutex);
if (instance == nullptr) { // 双重检查锁定
instance = new TopicManager();
}
}
return instance;
}
/**
* @brief
*/
static void cleanupParticipant()
{
std::lock_guard<std::mutex> locker(instanceMutex);
if (instance != nullptr) {
instance->clearAllTopic(); // 清理所有主题
if (instance->m_Participant != nullptr) {
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()
->delete_participant(instance->m_Participant); // 删除参与者
instance->m_Participant = nullptr; // 设置参与者为空
}
delete instance; // 删除单例
instance = nullptr; // 设置单例为空
}
}
private:
/**
* @brief
*/
explicit TopicManager() {}
/**
* @brief
*/
virtual ~TopicManager() {}
public:
/**
* @brief
* @param domainId: ID
*/
XNDDSErrorCode initializeParticipant(int domainId)
{
XNParticipantQos participantQos = eprosima::fastdds::dds::PARTICIPANT_QOS_DEFAULT;
participantQos.name("XNMonitor"); // 设置参与者名称
m_Participant =
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant(
domainId, participantQos); // 创建参与者
if (m_Participant == nullptr) {
return XNDDSErrorCode::INIT_FAILED;
}
return XNDDSErrorCode::SUCCESS;
};
/**
* @brief
* @tparam T:
* @param topicName:
* @return XNDataWriter*:
*/
template <typename T>
XNDDSErrorCode registerPublisher(const std::string &topicName, XNDataWriter *&dataWriter)
{
std::lock_guard<std::mutex> locker(m_Mutex);
if (topics_.find(topicName) == topics_.end()) {
topics_[topicName] = TopicInfo(); // 创建主题信息
TopicInfo &tmp = topics_[topicName]; // 获取主题信息
XNTypeSupport typeSupport(new T()); // 创建类型支持
typeSupport.register_type(m_Participant); // 注册类型
tmp.topic =
m_Participant->create_topic(topicName.c_str(), typeSupport.get_type_name(),
eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 创建主题
if (tmp.topic == nullptr) {
topics_.erase(topicName); // 移除主题
return XNDDSErrorCode::TOPIC_CREATE_FAILED;
}
}
TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息
if (topicInfo.publisher == nullptr) {
topicInfo.publisher = m_Participant->create_publisher(
eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); // 创建发布者
if (topicInfo.publisher == nullptr) {
return XNDDSErrorCode::PUBLISHER_CREATE_FAILED;
}
}
if (topicInfo.dataWriter == nullptr) {
XNDataWriterQos dataWriterQos;
// 设置数据写入器的持久性策略, 使用瞬态本地持久性
dataWriterQos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS;
// 设置数据写入器的生命周期策略, 设置为5秒
dataWriterQos.lifespan().duration = eprosima::fastdds::dds::Duration_t(5, 0);
topicInfo.dataWriter = topicInfo.publisher->create_datawriter(
topicInfo.topic, dataWriterQos); // 创建数据写入器
if (topicInfo.dataWriter == nullptr) {
return XNDDSErrorCode::DATAWRITER_CREATE_FAILED;
}
}
dataWriter = topicInfo.dataWriter;
return XNDDSErrorCode::SUCCESS;
}
/**
* @brief
* @param topicName:
*/
void unregisterPublisher(const std::string &topicName)
{
std::lock_guard<std::mutex> locker(m_Mutex);
auto it = topics_.find(topicName);
if (it != topics_.end()) {
TopicInfo &topicInfo = it->second; // 获取主题信息
if (topicInfo.dataWriter != nullptr) {
topicInfo.publisher->delete_datawriter(topicInfo.dataWriter); // 删除数据写入器
topicInfo.dataWriter = nullptr; // 设置数据写入器为空
}
if (topicInfo.publisher != nullptr) {
m_Participant->delete_publisher(topicInfo.publisher); // 删除发布者
topicInfo.publisher = nullptr; // 设置发布者为空
}
if (topicInfo.publisher == nullptr && topicInfo.subscriber == nullptr
&& topicInfo.topic != nullptr) {
m_Participant->delete_topic(topicInfo.topic); // 删除主题
topicInfo.topic = nullptr; // 设置主题为空
topics_.erase(it); // 移除主题
}
}
}
/**
* @brief
* @tparam T:
* @param topicName:
* @param fun:
*/
template <typename T>
XNDDSErrorCode registerSubscriber(const std::string &topicName,
std::function<void(const typename T::type &)> fun)
{
std::lock_guard<std::mutex> locker(m_Mutex);
if (topics_.find(topicName) == topics_.end()) {
topics_[topicName] = TopicInfo(); // 创建主题信息
TopicInfo &tmp = topics_[topicName]; // 获取主题信息
XNTypeSupport typeSupport(new T()); // 创建类型支持
typeSupport.register_type(m_Participant); // 注册类型
tmp.topic =
m_Participant->create_topic(topicName.c_str(), typeSupport.get_type_name(),
eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 创建主题
if (tmp.topic == nullptr) {
topics_.erase(topicName); // 移除主题
return XNDDSErrorCode::TOPIC_CREATE_FAILED; // 返回
}
}
TopicInfo &topicInfo = topics_[topicName]; // 获取主题信息
if (topicInfo.subscriber == nullptr) {
topicInfo.subscriber = m_Participant->create_subscriber(
eprosima::fastdds::dds::SUBSCRIBER_QOS_DEFAULT); // 创建订阅者
if (topicInfo.subscriber == nullptr) {
return XNDDSErrorCode::SUBSCRIBER_CREATE_FAILED; // 返回
}
}
if (topicInfo.dataReader == nullptr) {
XNDataReaderQos dataReaderQos;
dataReaderQos.durability().kind =
eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; // 设置数据读取器的持久性策略
topicInfo.listener =
new DataReaderListenerImpl<typename T::type>(fun); // 创建数据读取器监听器
topicInfo.dataReader = topicInfo.subscriber->create_datareader(
topicInfo.topic, dataReaderQos, topicInfo.listener); // 创建数据读取器
if (topicInfo.dataReader == nullptr) {
return XNDDSErrorCode::DATAREADER_CREATE_FAILED; // 返回
}
} else {
auto oldListener = topicInfo.dataReader->get_listener(); // 获取旧的监听器
topicInfo.listener =
new DataReaderListenerImpl<typename T::type>(fun); // 创建新的监听器
topicInfo.dataReader->set_listener(
topicInfo.listener,
eprosima::fastdds::dds::StatusMask::all()); // 设置新的监听器
delete oldListener; // 删除旧的监听器
}
return XNDDSErrorCode::SUCCESS;
}
/**
* @brief
* @param topicName:
*/
void unregisterSubscriber(const std::string &topicName)
{
std::lock_guard<std::mutex> locker(m_Mutex);
auto it = topics_.find(topicName);
if (it != topics_.end()) {
TopicInfo &topicInfo = it->second; // 获取主题信息
if (topicInfo.dataReader != nullptr) {
topicInfo.subscriber->delete_datareader(topicInfo.dataReader); // 删除数据读取器
topicInfo.dataReader = nullptr; // 设置数据读取器为空
}
if (topicInfo.listener != nullptr) {
delete topicInfo.listener; // 删除监听器
topicInfo.listener = nullptr; // 设置监听器为空
}
if (topicInfo.subscriber != nullptr) {
m_Participant->delete_subscriber(topicInfo.subscriber); // 删除订阅者
topicInfo.subscriber = nullptr; // 设置订阅者为空
}
if (topicInfo.subscriber == nullptr && topicInfo.publisher == nullptr
&& topicInfo.topic != nullptr) {
m_Participant->delete_topic(topicInfo.topic); // 删除主题
topicInfo.topic = nullptr; // 设置主题为空
topics_.erase(it); // 移除主题
}
}
}
private:
/**
* @brief
*/
void clearAllTopic()
{
std::lock_guard<std::mutex> locker(m_Mutex);
if (m_Participant != nullptr) {
while (!topics_.empty()) {
unregisterPublisher(topics_.begin()->first); // 注销发布者
unregisterSubscriber(topics_.begin()->first); // 注销订阅者
}
}
}
private:
/**
* @brief
*/
static TopicManager *instance;
/**
* @brief
*/
static std::mutex instanceMutex;
/**
* @brief
*/
XNParticipant *m_Participant = nullptr;
/**
* @brief
*/
std::map<std::string, TopicInfo> topics_;
/**
* @brief 访
*/
std::mutex m_Mutex;
};