添加jar包依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version>
</dependency> 

生产者

public class RocketMqProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//事务消息的时候会用到DefaultMQProducer producer=new DefaultMQProducer("gp_producer_group");producer.setNamesrvAddr("192.168.13.102:9876"); //它会从命名服务器上拿到broker的地址producer.start();int num=0;while(num<20){num++;//Topic//tags -> 标签 (分类) -> (筛选)Message message=new Message("gp_test_topic","TagA",("Hello , RocketMQ:"+num).getBytes());//消息路由策略producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get(0);}},"key-"+num);}}
}

SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态

1. FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成SYNC_FLUSH 才会报这个错误) 。

2. FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。

3. SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。

4. SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK

消费者

consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾

和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。

同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。
 
RocketMQ提供了两种消息消费模型,一种是pull主动拉去,另一种是push,被动接收。但实际上RocketMQ都是pull模式,只是push在pull模式上做了一层封装,也就是pull到消息以后触发业务消费者注册到这里的callback. RocketMQ是基于长轮训来实现消息的pull

nameServer的地址:name server地址,用于获取broker、topic信息

public class RocketMqConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("gp_consumer_group");consumer.setNamesrvAddr("192.168.13.102:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("gp_test_topic","*");/*consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("Receive Message: "+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //签收}});*/consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {MessageExt  messageExt=list.get(0);//TODO  --// Throw Exceptio// 重新发送该消息// DLQ(通用设计)if(messageExt.getReconsumeTimes()==3){  //消息重发了三次//持久化 消息记录表return ConsumeOrderlyStatus.SUCCESS; //签收}return ConsumeOrderlyStatus.SUCCESS; //签收}});consumer.start();}
}

消息发送和接收基本应用相关推荐

  1. python 网络编程之Socket通信案例消息发送与接收

    背景 网络编程是python编程中的一项基本技术.本文将实现一个简单的Socket通信案例消息发送与接收 正文 在python中的socket编程的大致流程图如上所示 我们来首先编写客户端的代码: # ...

  2. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  3. 使用Akka持久化——消息发送与接收

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/53929751 前言 在<使用Akka持久化 ...

  4. RabbitMQ消息发送和接收

    1.RabbitMQ的消息发送和接受机制 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中, ...

  5. 【转】DICOM医学图像处理:DIMSE消息发送与接收“大同小异”之DCMTK fo-dicom mDCM

    转自:https://my.oschina.net/zssure/blog/354816 背景: 从DICOM网络传输一文开始,相继介绍了C-ECHO.C-FIND.C-STORE.C-MOVE等DI ...

  6. JAVA ActiveMQ消息发送和接收

    JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...

  7. linux ibm mq 安装,消息发送与接收

    下载地址 http://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqadv/ 安装 1.2 解压并安装 1.2 ...

  8. Golang实现Kafka消息发送、接收

    一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...

  9. Kafka入门教程 Golang实现Kafka消息发送、接收

    一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...

  10. RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

    文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...

最新文章

  1. 杰思安全获数千万元A+轮投资,绿盟科技领投,德联资本跟投
  2. 使用Github搜索开源项目
  3. java hashset 源码_Java集合源码分析-HashSet和LinkedHashSet
  4. 消息队列中点对点与发布订阅区别
  5. PHP 实现列出目录的内容
  6. 蚂蚁金服终端实验室演进之路
  7. 20200210:(leetcode 623)在二叉树中增加一行
  8. 一文读懂特征值分解EVD与奇异值分解SVD
  9. 神舟七号飞船应用计算机进行飞行状态属于,“神舟七号”飞船应用计算机进行飞行状态调整属于()。...
  10. 智齿科技推首款智慧客服产品:机器人代替人工
  11. win10 如何修改 C:\Users\用户名文件夹
  12. 第十一章 枚举与泛型 总结
  13. [Irving]字符串相似度-字符编辑距离算法(c#实现)
  14. 一加8 线刷官方ColorOS尝鲜版遇到的各种问题及解决方案
  15. 英语词汇服饰篇——Bottoms下装
  16. 尚硅谷-ShardingSphere
  17. css 选取第一个标签元素
  18. 浅析GPU计算——CPU和GPU的选择
  19. 单源最短路径算法java_数据结构 - 单源最短路径之迪杰斯特拉(Dijkstra)算法详解(Java)...
  20. WT588E语音芯片+数码管的应用场景介绍

热门文章

  1. js构造函数的浅薄理解
  2. 团队-科学技术器-模块测试过程
  3. Hybris CronJob.
  4. ListCtrl添加右键菜单(ListCtrl类里编辑,给ListCtrl 发送NM_RCLICK消息)
  5. POJ--2449--Remmarguts#39; Date【dijkstra_heap+A*】第K短路
  6. java学习笔记(七)数据库链接字符
  7. [Angular 2] Template property syntax
  8. 开辟经济发展的第二战场
  9. zigbee ti 附带工具使用方法
  10. ORA-00600: 内部错误代码,参数: [qctcte1], [0], [], [], [], [], [], []