XNSim/Release/include/XNCore/XNDDSManager.h

156 lines
5.5 KiB
C++

#pragma once
#include "XNBaseFrameObject.h"
#include "XNBaseFrameObject_p.h"
struct PublisherInfo {
FAST_DDS_MACRO::Publisher *publisher;
FAST_DDS_MACRO::DataWriter *dataWriter;
};
struct SubscriberInfo {
FAST_DDS_MACRO::Subscriber *subscriber;
FAST_DDS_MACRO::DataReader *dataReader;
};
struct TopicInfo {
FAST_DDS_MACRO::Topic *topic;
std::map<uint32_t, PublisherInfo> publishers_;
std::map<uint32_t, SubscriberInfo> subscribers_;
};
template <typename T>
class DataReaderListenerImpl : public FAST_DDS_MACRO::DataReaderListener
{
public:
DataReaderListenerImpl(std::function<void(const T &)> callback) : callback_(callback) {}
void on_data_available(FAST_DDS_MACRO::DataReader *reader) override
{
FAST_DDS_MACRO::SampleInfo info;
if (reader->take_next_sample(&data_, &info) == FAST_DDS_MACRO::RETCODE_OK
&& info.valid_data) {
callback_(data_);
}
}
private:
T data_;
std::function<void(const T &)> callback_;
};
struct XNDDSManagerPrivate : public XNBaseFrameObjectPrivate {
FAST_DDS_MACRO::DomainParticipant *participant_;
std::map<std::string, TopicInfo> topics_;
std::mutex mutex_;
};
class XNDDSManager : public XNBaseFrameObject
{
XN_METATYPE(XNDDSManager, XNBaseFrameObject)
XN_DECLARE_PRIVATE(XNDDSManager)
public:
XNDDSManager();
~XNDDSManager();
protected:
XNDDSManager(PrivateType *p);
public:
virtual bool Initialize() override;
virtual bool PrepareForExecute() override;
void SetDomainID(uint32_t domainID);
public:
template <typename T>
FAST_DDS_MACRO::DataWriter *RegisterPublisher(const std::string &topicName,
uint32_t publisherID)
{
T_D();
std::lock_guard<std::mutex> lock(d->mutex_);
if (d->topics_.find(topicName) == d->topics_.end()) {
d->topics_[topicName] = TopicInfo();
TopicInfo &tmp = d->topics_[topicName];
FAST_DDS_MACRO::TypeSupport typeSupport(new T());
typeSupport.register_type(d->participant_);
tmp.topic = d->participant_->create_topic(topicName, typeSupport.get_type_name(),
FAST_DDS_MACRO::TOPIC_QOS_DEFAULT);
if (tmp.topic == nullptr) {
LOG_ERROR("0x2130 Create Topic %1 Failed!", topicName);
d->topics_.erase(topicName);
return nullptr;
}
}
TopicInfo &tmp = d->topics_[topicName];
tmp.publishers_[publisherID] = PublisherInfo();
tmp.publishers_[publisherID].publisher =
d->participant_->create_publisher(FAST_DDS_MACRO::PUBLISHER_QOS_DEFAULT, nullptr);
if (tmp.publishers_[publisherID].publisher == nullptr) {
LOG_ERROR("0x2131 Create Publisher %1 for Topic %2 Failed!", publisherID, topicName);
return nullptr;
}
// 设置数据写入器QoS策略
FAST_DDS_MACRO::DataWriterQos dataWriterQos;
// 设置数据写入器的历史记录策略, 只保留最新的一个数据
// dataWriterQos.history().kind = FAST_DDS_MACRO::KEEP_LAST_HISTORY_QOS;
// dataWriterQos.history().depth = 1;
// 设置数据写入器的可靠性策略, 使用最佳努力可靠性
// dataWriterQos.reliability().kind = FAST_DDS_MACRO::BEST_EFFORT_RELIABILITY_QOS;
// 设置数据写入器的持久性策略, 使用瞬态本地持久性
dataWriterQos.durability().kind = FAST_DDS_MACRO::VOLATILE_DURABILITY_QOS;
// 设置数据写入器的生命周期策略, 设置为5秒
dataWriterQos.lifespan().duration = FAST_DDS_MACRO::Duration_t(5, 0);
tmp.publishers_[publisherID].dataWriter =
tmp.publishers_[publisherID].publisher->create_datawriter(tmp.topic, dataWriterQos,
nullptr);
if (tmp.publishers_[publisherID].dataWriter == nullptr) {
LOG_ERROR("0x2132 Create DataWriter %1 for Topic %2 Failed!", publisherID, topicName);
return nullptr;
}
LOG_INFO("0x2133 Create Publisher %1 for Topic %2 Success!", publisherID, topicName);
return tmp.publishers_[publisherID].dataWriter;
}
template <typename T>
void RegisterSubscriber(const std::string &topicName, uint32_t subscriberID,
std::function<void(const typename T::type &)> fun)
{
T_D();
std::lock_guard<std::mutex> lock(d->mutex_);
if (d->topics_.find(topicName) == d->topics_.end()) {
d->topics_[topicName] = TopicInfo();
TopicInfo &tmp = d->topics_[topicName];
FAST_DDS_MACRO::TypeSupport typeSupport(new T());
typeSupport.register_type(d->participant_);
tmp.topic = d->participant_->create_topic(topicName, typeSupport.get_type_name(),
FAST_DDS_MACRO::TOPIC_QOS_DEFAULT);
if (tmp.topic == nullptr) {
LOG_ERROR("0x2130 Create Topic %1 Failed!", topicName);
d->topics_.erase(topicName);
return;
}
}
TopicInfo &tmp = d->topics_[topicName];
tmp.subscribers_[subscriberID] = SubscriberInfo();
tmp.subscribers_[subscriberID].subscriber =
d->participant_->create_subscriber(FAST_DDS_MACRO::SUBSCRIBER_QOS_DEFAULT, nullptr);
if (tmp.subscribers_[subscriberID].subscriber == nullptr) {
LOG_ERROR("0x2135 Create Subscriber %1 for Topic %2 Failed!", subscriberID, topicName);
}
FAST_DDS_MACRO::DataReaderQos dataReaderQos;
dataReaderQos.durability().kind = FAST_DDS_MACRO::VOLATILE_DURABILITY_QOS;
FAST_DDS_MACRO::DataReaderListener *listener =
new DataReaderListenerImpl<typename T::type>(fun);
tmp.subscribers_[subscriberID].dataReader =
tmp.subscribers_[subscriberID].subscriber->create_datareader(tmp.topic, dataReaderQos,
listener);
if (tmp.subscribers_[subscriberID].dataReader == nullptr) {
LOG_ERROR("0x2136 Create DataReader %1 for Topic %2 Failed!", subscriberID, topicName);
}
LOG_INFO("0x2137 Create Subscriber %1 for Topic %2 Success!", subscriberID, topicName);
}
};