#pragma once #include "XNBaseFrameObject.h" #include "XNBaseFrameObject_p.h" struct PublisherInfo { FAST_DDS_MACRO::Publisher *publisher; FAST_DDS_MACRO::DataWriter *dataWriter; }; struct SubscriberInfo { FAST_DDS_MACRO::Subscriber *subscriber; FAST_DDS_MACRO::DataReader *dataReader; }; struct TopicInfo { FAST_DDS_MACRO::Topic *topic; std::map publishers_; std::map subscribers_; }; template class DataReaderListenerImpl : public FAST_DDS_MACRO::DataReaderListener { public: DataReaderListenerImpl(std::function callback) : callback_(callback) {} void on_data_available(FAST_DDS_MACRO::DataReader *reader) override { FAST_DDS_MACRO::SampleInfo info; if (reader->take_next_sample(&data_, &info) == FAST_DDS_MACRO::RETCODE_OK && info.valid_data) { callback_(data_); } } private: T data_; std::function callback_; }; struct XNDDSManagerPrivate : public XNBaseFrameObjectPrivate { FAST_DDS_MACRO::DomainParticipant *participant_; std::map topics_; std::mutex mutex_; }; class XNDDSManager : public XNBaseFrameObject { XN_METATYPE(XNDDSManager, XNBaseFrameObject) XN_DECLARE_PRIVATE(XNDDSManager) public: XNDDSManager(); ~XNDDSManager(); protected: XNDDSManager(PrivateType *p); public: virtual bool Initialize() override; virtual bool PrepareForExecute() override; void SetDomainID(uint32_t domainID); public: template FAST_DDS_MACRO::DataWriter *RegisterPublisher(const std::string &topicName, uint32_t publisherID) { T_D(); std::lock_guard lock(d->mutex_); if (d->topics_.find(topicName) == d->topics_.end()) { d->topics_[topicName] = TopicInfo(); TopicInfo &tmp = d->topics_[topicName]; FAST_DDS_MACRO::TypeSupport typeSupport(new T()); typeSupport.register_type(d->participant_); tmp.topic = d->participant_->create_topic(topicName, typeSupport.get_type_name(), FAST_DDS_MACRO::TOPIC_QOS_DEFAULT); if (tmp.topic == nullptr) { LOG_ERROR("0x2130 Create Topic %1 Failed!", topicName); d->topics_.erase(topicName); return nullptr; } } TopicInfo &tmp = d->topics_[topicName]; tmp.publishers_[publisherID] = PublisherInfo(); tmp.publishers_[publisherID].publisher = d->participant_->create_publisher(FAST_DDS_MACRO::PUBLISHER_QOS_DEFAULT, nullptr); if (tmp.publishers_[publisherID].publisher == nullptr) { LOG_ERROR("0x2131 Create Publisher %1 for Topic %2 Failed!", publisherID, topicName); return nullptr; } // 设置数据写入器QoS策略 FAST_DDS_MACRO::DataWriterQos dataWriterQos; // 设置数据写入器的历史记录策略, 只保留最新的一个数据 // dataWriterQos.history().kind = FAST_DDS_MACRO::KEEP_LAST_HISTORY_QOS; // dataWriterQos.history().depth = 1; // 设置数据写入器的可靠性策略, 使用最佳努力可靠性 // dataWriterQos.reliability().kind = FAST_DDS_MACRO::BEST_EFFORT_RELIABILITY_QOS; // 设置数据写入器的持久性策略, 使用瞬态本地持久性 dataWriterQos.durability().kind = FAST_DDS_MACRO::VOLATILE_DURABILITY_QOS; // 设置数据写入器的生命周期策略, 设置为5秒 dataWriterQos.lifespan().duration = FAST_DDS_MACRO::Duration_t(5, 0); tmp.publishers_[publisherID].dataWriter = tmp.publishers_[publisherID].publisher->create_datawriter(tmp.topic, dataWriterQos, nullptr); if (tmp.publishers_[publisherID].dataWriter == nullptr) { LOG_ERROR("0x2132 Create DataWriter %1 for Topic %2 Failed!", publisherID, topicName); return nullptr; } LOG_INFO("0x2133 Create Publisher %1 for Topic %2 Success!", publisherID, topicName); return tmp.publishers_[publisherID].dataWriter; } template void RegisterSubscriber(const std::string &topicName, uint32_t subscriberID, std::function fun) { T_D(); std::lock_guard lock(d->mutex_); if (d->topics_.find(topicName) == d->topics_.end()) { d->topics_[topicName] = TopicInfo(); TopicInfo &tmp = d->topics_[topicName]; FAST_DDS_MACRO::TypeSupport typeSupport(new T()); typeSupport.register_type(d->participant_); tmp.topic = d->participant_->create_topic(topicName, typeSupport.get_type_name(), FAST_DDS_MACRO::TOPIC_QOS_DEFAULT); if (tmp.topic == nullptr) { LOG_ERROR("0x2130 Create Topic %1 Failed!", topicName); d->topics_.erase(topicName); return; } } TopicInfo &tmp = d->topics_[topicName]; tmp.subscribers_[subscriberID] = SubscriberInfo(); tmp.subscribers_[subscriberID].subscriber = d->participant_->create_subscriber(FAST_DDS_MACRO::SUBSCRIBER_QOS_DEFAULT, nullptr); if (tmp.subscribers_[subscriberID].subscriber == nullptr) { LOG_ERROR("0x2135 Create Subscriber %1 for Topic %2 Failed!", subscriberID, topicName); } FAST_DDS_MACRO::DataReaderQos dataReaderQos; dataReaderQos.durability().kind = FAST_DDS_MACRO::VOLATILE_DURABILITY_QOS; FAST_DDS_MACRO::DataReaderListener *listener = new DataReaderListenerImpl(fun); tmp.subscribers_[subscriberID].dataReader = tmp.subscribers_[subscriberID].subscriber->create_datareader(tmp.topic, dataReaderQos, listener); if (tmp.subscribers_[subscriberID].dataReader == nullptr) { LOG_ERROR("0x2136 Create DataReader %1 for Topic %2 Failed!", subscriberID, topicName); } LOG_INFO("0x2137 Create Subscriber %1 for Topic %2 Success!", subscriberID, topicName); } };