当Subscriber 接收到数据的时候,会通过它的Listener或者使能一些Condition通知到Application有一些新的数据是可获取的,然后Application就可以通过其相关的DataReader的操作函数去获取该数据。
关于Subscription模块的类图如下:

有关类图中的关系可以查看:UML类图中的几种关系
由以上类图可以看出:

  • TopicDescription是Topic的父类
  • DataReaderListener是SubscriberListener的父类,继承了其定义的Callback函数
  • ReadCondition是QueryCondition的父类
  • Subscriber可以包含DataReader, QosPolicy, StatusCondition, SubscriberListener
  • DataReader可以包含DataReaderListener,QosPolicy,ReadCondition, TopicDescription, Data, DataSample
  • DataReader可以关联或者包含多个DataSample,每一个DataSample Class有一个Data 和Sample Info
    以上小节分别对其内容进行分析

Subscriber

从Subscriber这个类的定义来看,除了图1类图中的方法之外,它还会继承自Entity的方法,比如get_listener,set_listener,get_qos,set_qos

怎样创建Subscriber

从图1可以看出Subscriber需要调用DomainParticipant 的成员函数创建。

  1. 在domain中创建DomainParticipant
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
{// Errorreturn;
}
  1. 创建CustomSubscriberQos
// A custom SubscriberQos can be provided to the creation method
SubscriberQos custom_qos;// Modify QoS attributes
custom_qos.entity_factory().autoenable_created_entities = false;

SubscriberQos用于控制Subscriber的行为,与Subscriber相关的Qos Policy 主要包含如下:

QosPolicy class 作用
PartitionQosPolicy
PresentationQosPolicy
GroupDataQosPolicy
EntityFactoryQosPolicy
  1. 创建CustomSubscriberListener
CustomSubscriberListener custom_listener;// Modify Listener attributes
// (...)

SubscriberListener 继承自DataReaderListener , 其主要作用是在Subscriber的状态变化或者DataReader的一些特定的Event被触发之后调用其定义的一些callback, 有点儿类似于Hook函数,默认情况下,这部分内容为空,用户使用需要自己定义其内容。

  1. 创建不同的Subscriber
// Create a Subscriber with default SubscriberQos and no Listener
// The value SUBSCRIBER_QOS_DEFAULT is used to denote the default QoS.
Subscriber* subscriber_with_default_qos =participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
if (nullptr == subscriber_with_default_qos)
{// Errorreturn;
}
Subscriber* subscriber_with_custom_qos =participant->create_subscriber(custom_qos);
if (nullptr == subscriber_with_custom_qos)
{// Errorreturn;
}
Subscriber* subscriber_with_default_qos_and_custom_listener =participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT, &custom_listener);
if (nullptr == subscriber_with_default_qos_and_custom_listener)
{// Errorreturn;
}
  1. Subscriber其他属性的更改/获取
// Get the current QoS or create a new one from scratch
SubscriberQos qos = subscriber->get_qos();// Assign the new Qos to the object
subscriber->set_qos(qos1);// Get the current QoS or create a new one from scratch
SubscriberQos qos_type1 = participant->get_default_subscriber_qos();SubscriberQos qos_type2;
// Set as the new default SubscriberQos
if (participant->set_default_subscriber_qos(qos_type2) != ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}

DataReader

怎样创建/删除DataReader

创建DataReader需要Subscriber的成员函数create_datareader,其函数的定义与返回值如下:

DataReader *create_datareader(TopicDescription *topic, const DataReaderQos &reader_qos, DataReaderListener *listener = nullptr, const StatusMask &mask = StatusMask::all())

Parameters: topic – Topic the DataReader will be listening(Mandatory).
reader_qos – QoS of the DataReader(Mandatory).
listener – Pointer to the listener (default: nullptr)(Optional )
mask – StatusMask that holds statuses the listener responds to (default: all)(Optional ).
Returns: Pointer to the created DataReader. nullptr if failed.

reader_qos可以用默认的DataReaderQos也就是DATAREADER_QOS_DEFAULT,也可以用自行创建一个Custom_Qos

A StatusMask that activates or deactivates triggering of individual callbacks on the DataReaderListener. By default all events are enabled.

  1. 创建相应的Subscriber
  2. 创建特定的DataReaderQos
// A custom DataReaderQos can be provided to the creation method
DataReaderQos custom_qos;// Modify QoS attributes
// (...)
  1. 创建特定的CustomDataReaderListener
// CustomDataReaderListener inherits from DataReaderListener.
CustomDataReaderListener custom_listener;
  1. 调用Subscriber的成员函数create_datareader创建,delete_datareader删除
DataReaderQos custom_qos;
CustomDataReaderListener custom_listener;
DataReader* data_reader_with_default_qos_and_custom_listener =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT, &custom_listener);
if (nullptr == data_reader_with_default_qos_and_custom_listener)
{// Errorreturn;
}
// Delete the DataReader
if (subscriber->delete_datareader(data_reader_with_default_qos_and_custom_listener ) != ReturnCode_t::RETCODE_OK)
{// Errorreturn;
}

Access to the data

DataReader, SampleInfo, Instance, Sample之间的关系

首先需要搞清楚DataReader, SampleInfo, Instance, Sample之间的关系如下:

SampleInfo包含的属性如下表:

SampleInfo Sample/Instance Functionality
sample_state sample 表示每个DataReader对每一个收到的Sample是否读取的标志
view_state sample 表示收到的这个sample是否是其对应的Instance的第一个
instance_state Instance 表示一个Instance的状态:
ALIVE:目前还存在
NOT_ALIVE_DISPOSED:DataWriter 舍弃了此Instance
NOT_ALIVE_NO_WRITERS:DataReader舍弃了此Instance因为远程DataWriter未发布此Instance
valid_data sample TRUE:Data sample包含数据和SampleInfo
FALSE:Data sample只有SampleInfo
disposed_generation_count Instance 表示Instance从NOT ALIVE *变成ALIVE的次数
no_writers_generation_count instance 表示Instance从NOT_ALIVE_NO_WRITERS变成ALIVE的次数
sample_rank Instance 表示针对此Instance目前还有多少个未读取的 Sample
generation_rank Instance 表示从接收到一个Instance的sample到此Instance的最新sample(MRSIC)进入Collection,Instance从NOT ALIVE到ALIVE的次数
absolute_generation_rank Instance 表示从接收到一个Instance的sample到接收此Instance的最新的sample(MRSIC)期间,Instance从NOT ALIVE到ALIVE的次数
source_timestamp sample DataWriter发布此sample的时间戳

DataReader怎么取值

目前看来DataReader获取数据主要有如下三种方式:

  1. 利用Read/Take 等DataReader方法
  2. 利用DataReaderListener callback
  3. 利用Wait-Set与Condition
利用Read/Take, SampleInfo获取数据

DataReader::read()函数原型如下:

ReturnCode_t read(LoanableCollection &data_values, SampleInfoSeq &sample_infos, int32_t max_samples = LENGTH_UNLIMITED, SampleStateMask sample_states = ANY_SAMPLE_STATE, ViewStateMask view_states = ANY_VIEW_STATE, InstanceStateMask instance_states = ANY_INSTANCE_STATE)

DataReader::take()函数原型如下:

ReturnCode_t take(LoanableCollection &data_values, SampleInfoSeq &sample_infos, int32_t max_samples = LENGTH_UNLIMITED, SampleStateMask sample_states = ANY_SAMPLE_STATE, ViewStateMask view_states = ANY_VIEW_STATE, InstanceStateMask instance_states = ANY_INSTANCE_STATE)

以上两个函数返回LoanableCollection 和SampleInfoSeq 数据类型,示例如下:

    // Sequences are automatically initialized to be empty (maximum == 0)FooSeq data_seq;SampleInfoSeq info_seq;// with empty sequences, a take() or read() will return loaned// sequence elementsReturnCode_t ret_code = data_reader->take(data_seq, info_seq,LENGTH_UNLIMITED, ANY_SAMPLE_STATE,ANY_VIEW_STATE, ANY_INSTANCE_STATE);// process the returned data// must return the loaned sequences when done processingdata_reader->return_loan(data_seq, info_seq);// process the returned dataif (ret_code == ReturnCode_t::RETCODE_OK){// Both info_seq.length() and data_seq.length() will have the number of samples returnedfor (FooSeq::size_type n = 0; n < info_seq.length(); ++n){// Only samples for which valid_data is true should be accessedif (info_seq[n].valid_data){// Do something with data_seq[n]}}// must return the loaned sequences when done processingdata_reader->return_loan(data_seq, info_seq);}
利用DataReaderListener获取数据

当DataReader收到其配对的DataWriter发送的数据之后,可以通过DataReaderListener的两个callback来通知应用。

inline virtual void on_data_available(DataReader *reader)
Virtual function to be implemented by the user containing the actions to be performed when a new Data Message is received.

class CustomizedDataReaderListener : public DataReaderListener
{public:CustomizedDataReaderListener(): DataReaderListener(){}virtual ~CustomizedDataReaderListener(){}virtual void on_data_available(DataReader* reader){// Create a data and SampleInfo instanceFoo data;SampleInfo info;// Keep taking data until there is nothing to takewhile (reader->take_next_sample(&data, &info) == ReturnCode_t::RETCODE_OK){if (info.valid_data){// Do something with the datastd::cout << "Received new data value for topic "<< reader->get_topicdescription()->get_name()<< std::endl;}else{std::cout << "Remote writer for topic "<< reader->get_topicdescription()->get_name()<< " is dead" << std::endl;}}}};

inline virtual void on_data_on_readers(Subscriber *sub)
Virtual function to be implemented by the user containing the actions to be performed when a new Data Message is available on any reader.

利用Wait-Set获取数据

通过应用起一个线程,利用Wait-Set等到相应的状态满足或者设定的Timeout之后就获取数据。
示例如下:

// Create a DataReader
DataReader* data_reader =subscriber->create_datareader(topic, DATAREADER_QOS_DEFAULT);
if (nullptr == data_reader)
{// Errorreturn;
}// Prepare a wait-set to wait for data on the DataReader
WaitSet wait_set;  // Create a WaitSet instance
StatusCondition& condition = data_reader->get_statuscondition();//定义DataReader的StatusCondition
condition.set_enabled_statuses(StatusMask::data_available());//设置StatusCondition的trigger_value为Communication Status的DATA_AVAILABLE
wait_set.attach_condition(condition);//将定义的Condition添加到wait_set// Create a data and SampleInfo instance
Foo data;
SampleInfo info;//Define a timeout of 5 seconds
eprosima::fastrtps::Duration_t timeout (5, 0);// Loop reading data as it arrives
// This will make the current thread to be dedicated exclusively to
// waiting and reading data until the remote DataWriter dies
while (true)
{ConditionSeq active_conditions;if (ReturnCode_t::RETCODE_OK == wait_set.wait(active_conditions, timeout)){while (ReturnCode_t::RETCODE_OK == data_reader->take_next_sample(&data, &info)){if (info.valid_data) //如果sample中的Data部分有内容{// Do something with the datastd::cout << "Received new data value for topic "<< topic->get_name()<< std::endl;}else{// If the remote writer is not alive, we exit the reading loopstd::cout << "Remote writer for topic "<< topic->get_name()<< " is dead" << std::endl;break;}}}else{std::cout << "No data this time" << std::endl;}
}

DDS之DCPS Subscription模块相关推荐

  1. DDS之DCPS Infrastructure模块

    DCPS Infrastructure Infrastructure Module Entity Entity Identifier QoS policy Listener Status Status ...

  2. 中间件DDS之DCPS模型

    DCPS:Data-Centric Publish-Subscribe 先来些概念的介绍,后续再来分析源码. 概述 DDS规范有两层,分别是数据本地重构层DLRL(Data Local Reconst ...

  3. DDS (Data Distribution Service) 数据分发服务-规范中文翻译_006

    DDS (Data Distribution Service) 数据分发服务-规范中文翻译_006 2.以数据为中心的订阅发布(DCPS) 2.2 平台无关模型(Platform Independen ...

  4. opensplice dds v6.3.2_给你看个宝贝,近乎完美的DDS正弦波信号音生成器

    好文章当然要分享啦~如果您喜欢这篇文章,请联系后台添加白名单,欢迎转载哟~在测试和验证分辨率高于16位的高精度快速模数转换器(ADC)的交流性能时,需要用到近乎完美的正弦波生成器,该生成器至少支持0k ...

  5. DDS、openDDS和fast DDS介绍

    上一篇文章讲了什么是DDS,以及一些技术特点和openDDS下载,今天继续科普下DDS技术特点和其他版本的实现. DDS DDS采用订阅/发布体系结构,以数据为中心,也就是通过订阅/发布这个结构来实现 ...

  6. 基于FPGA的波、幅、频、相可调DDS信号发生器的设计

    声明:本文只对设计原理和过程作粗略的阐述,详细可以研究我贴出来的完整源代码,也可以私信交流. 若干略缩语解释: FPGA(Field Programmable Gate Array):现场可编程逻辑门 ...

  7. 【FPGA Verilog】手把手教你实现一个DDS信号发生器

    信号发⽣器的设计与实现 1.输出波形:⽅波(占空⽐50%).锯⻮波.三⻆波.脉冲信号(占空⽐连续可调).正弦波.任意波等 2.输出频率:100KHz 3.波形选择:使⽤拨码开关选择 思路: 使用FPG ...

  8. CASE_05 基于FPGA的DDS信号发生器

             该系类博客序言和资源简介可浏览该博客:PREFACE FPGA经典案例序言 快速了解该系列博客的内容与可用 资源. 目录 1 简介 2 DDS原理与方案 2.1 方案一:基于CORD ...

  9. Fast DDS入门二、Fast DDS在Windows平台的编译安装

    Fast DDS入门五.在Windows平台创建一个简单的Fast DDS示例程序 1 Fast DDS动态库的编译安装 本节提供了在Windows环境中从源代码安装Fast DDS的说明.将安装以下 ...

最新文章

  1. leetcode C++ 28. 实现 strStr() 实现 strStr() 函数。 给定一个 haystack 字符串和一个 needle 字符串,在 haystack 字符串中找出 need
  2. svn教程----示例二:测试人员拥有读权限
  3. Python中super()和__init__()方法
  4. HNOI2015 实验比较
  5. c语言程序设计指针教学,C语言程序设计中指针教学要点分析
  6. java socketserver多线程_JAVA I/O(五)多线程网络Socket和ServerSocket
  7. python 处理异常_Python异常处理– Python尝试除外
  8. 传统机器学习流程总结
  9. Linux服务器安全配置三要点
  10. PHP团队 编码规范 代码样式风格规范
  11. 图像处理之调整对比度
  12. cad命令栏怎么调出来_Solidworks工具栏,功能区不见了,怎么调出来?
  13. 此计算机不支持动态磁盘,磁盘无法分区提示此操作系统不支持动态磁盘故障原因分析与解决...
  14. 空气质量等级c语言编程,关于SDS011模块(空气中pm2.5及pm10)单片机c程序实现(链接附源码)...
  15. 一名IT民工开通博客
  16. web网站搭建(nginx优化)二
  17. 野渡梅香舟自横,浅吟彼岸情
  18. 图文并解Word插入修改删除批注
  19. 第10课:《ChatGPT提示工程》—— Guidelines(入门)
  20. 安卓面试宝典,2021最新Android知识体系总结,面试资料分享

热门文章

  1. Axon Framework架构概述
  2. Goldwasser-Micali 公钥加密系统
  3. 8.3 开始使用truffle-contract
  4. three.js 导入显示模型的时候自动计算模型缩放比例
  5. vue中,scss样式的三种写法——当前页面直接定义、@import引入样式、main.js引入公共样式 deep和important的写法
  6. python 制作二维码
  7. 新绝代双骄3终极全攻略6
  8. LockSupport 以及 park、unpark 方法
  9. [XJTU计算机网络安全与管理]——第十三讲 攻击与病毒
  10. 黑龙江科学杂志黑龙江科学杂志社黑龙江科学编辑部2022年第24期目录