消息发送和接收基本应用
添加jar包依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version>
</dependency>
生产者
public class RocketMqProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//事务消息的时候会用到DefaultMQProducer producer=new DefaultMQProducer("gp_producer_group");producer.setNamesrvAddr("192.168.13.102:9876"); //它会从命名服务器上拿到broker的地址producer.start();int num=0;while(num<20){num++;//Topic//tags -> 标签 (分类) -> (筛选)Message message=new Message("gp_test_topic","TagA",("Hello , RocketMQ:"+num).getBytes());//消息路由策略producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get(0);}},"key-"+num);}}
}
SendResult中,有一个sendStatus状态,表示消息的发送状态。一共有四种状态
1. FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策Ill创立设置成SYNC_FLUSH 才会报这个错误) 。
2. FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
3. SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
4. SEND OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND OK
消费者
consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;同一个group中可以配置多个consumer,可以提高消费端的并发消费能力以及容灾
和kafka一样,多个consumer会对消息做负载均衡,意味着同一个topic下的不同messageQueue会分发给同一个group中的不同consumer。
同时,如果我们希望消息能够达到广播的目的,那么只需要把consumer加入到不同的group就行。
RocketMQ提供了两种消息消费模型,一种是pull主动拉去,另一种是push,被动接收。但实际上RocketMQ都是pull模式,只是push在pull模式上做了一层封装,也就是pull到消息以后触发业务消费者注册到这里的callback. RocketMQ是基于长轮训来实现消息的pull
nameServer的地址:name server地址,用于获取broker、topic信息
public class RocketMqConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("gp_consumer_group");consumer.setNamesrvAddr("192.168.13.102:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("gp_test_topic","*");/*consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("Receive Message: "+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //签收}});*/consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {MessageExt messageExt=list.get(0);//TODO --// Throw Exceptio// 重新发送该消息// DLQ(通用设计)if(messageExt.getReconsumeTimes()==3){ //消息重发了三次//持久化 消息记录表return ConsumeOrderlyStatus.SUCCESS; //签收}return ConsumeOrderlyStatus.SUCCESS; //签收}});consumer.start();}
}
消息发送和接收基本应用相关推荐
- python 网络编程之Socket通信案例消息发送与接收
背景 网络编程是python编程中的一项基本技术.本文将实现一个简单的Socket通信案例消息发送与接收 正文 在python中的socket编程的大致流程图如上所示 我们来首先编写客户端的代码: # ...
- go 实现 kafka 消息发送、接收
引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...
- 使用Akka持久化——消息发送与接收
版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/53929751 前言 在<使用Akka持久化 ...
- RabbitMQ消息发送和接收
1.RabbitMQ的消息发送和接受机制 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中, ...
- 【转】DICOM医学图像处理:DIMSE消息发送与接收“大同小异”之DCMTK fo-dicom mDCM
转自:https://my.oschina.net/zssure/blog/354816 背景: 从DICOM网络传输一文开始,相继介绍了C-ECHO.C-FIND.C-STORE.C-MOVE等DI ...
- JAVA ActiveMQ消息发送和接收
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...
- linux ibm mq 安装,消息发送与接收
下载地址 http://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqadv/ 安装 1.2 解压并安装 1.2 ...
- Golang实现Kafka消息发送、接收
一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...
- Kafka入门教程 Golang实现Kafka消息发送、接收
一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序.具有横向扩展,容错,wicked fast(变态快)等优点. kafka中涉及的名词: 消息记录(r ...
- RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收
文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...
最新文章
- 杰思安全获数千万元A+轮投资,绿盟科技领投,德联资本跟投
- 使用Github搜索开源项目
- java hashset 源码_Java集合源码分析-HashSet和LinkedHashSet
- 消息队列中点对点与发布订阅区别
- PHP 实现列出目录的内容
- 蚂蚁金服终端实验室演进之路
- 20200210:(leetcode 623)在二叉树中增加一行
- 一文读懂特征值分解EVD与奇异值分解SVD
- 神舟七号飞船应用计算机进行飞行状态属于,“神舟七号”飞船应用计算机进行飞行状态调整属于()。...
- 智齿科技推首款智慧客服产品:机器人代替人工
- win10 如何修改 C:\Users\用户名文件夹
- 第十一章 枚举与泛型 总结
- [Irving]字符串相似度-字符编辑距离算法(c#实现)
- 一加8 线刷官方ColorOS尝鲜版遇到的各种问题及解决方案
- 英语词汇服饰篇——Bottoms下装
- 尚硅谷-ShardingSphere
- css 选取第一个标签元素
- 浅析GPU计算——CPU和GPU的选择
- 单源最短路径算法java_数据结构 - 单源最短路径之迪杰斯特拉(Dijkstra)算法详解(Java)...
- WT588E语音芯片+数码管的应用场景介绍
热门文章
- js构造函数的浅薄理解
- 团队-科学技术器-模块测试过程
- Hybris CronJob.
- ListCtrl添加右键菜单(ListCtrl类里编辑,给ListCtrl 发送NM_RCLICK消息)
- POJ--2449--Remmarguts#39; Date【dijkstra_heap+A*】第K短路
- java学习笔记(七)数据库链接字符
- [Angular 2] Template property syntax
- 开辟经济发展的第二战场
- zigbee ti 附带工具使用方法
- ORA-00600: 内部错误代码,参数: [qctcte1], [0], [], [], [], [], [], []