文章目录

  • 普通消息
    • 消息发送
      • 同步发送消息
      • 异步发送消息
      • 单向发送消息
  • 代码示例
    • 导入RocketMQ的依赖
    • 定义同步消息的发送者
    • 定义异步消息的发送者
    • 定义消费者

普通消息

消息发送

同步发送消息

同步发送消息是指:Producer发出一条消息后,会在收到MQ返回的ACK后才发送下一条消息。该方式的消息可靠性最高,但是消息发送效率太低。

异步发送消息

异步发送消息是指:Producer发出消息后无须等待MQ返回ACK,直接发送下一条消息。该方式有一定可靠性,发送效率相对同步较高

单向发送消息

单向发送消息是指:Producer仅负责发送消息,不等待、不处理MQ的ACK,该发送方式MQ也不返回ACK。该方式发送效率最高,但可靠性最低。

代码示例

导入RocketMQ的依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId>
</dependency>
定义同步消息的发送者
public class SynProducer {public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//创建一个Producer,参数为ProducerGroup名称DefaultMQProducer producer = new DefaultMQProducer("testGroup");//指定nameserver地址producer.setNamesrvAddr("Rocemq:9876");//设置失败重试次数producer.setRetryTimesWhenSendFailed(3);//设置发送超时时间为5秒producer.setSendMsgTimeout(5000);//开启生产者producer.start();//生产并发送100条消息for (int i = 0;i<100;i++){byte[] body = ("hello"+i).getBytes();//注意不要倒错包,org.apache.rocketmq.common.message.Message;Message msg = new Message("Topic","Tag",body);msg.setKeys("key:"+i);SendResult sendResult = producer.send(msg);System.out.println(sendResult);}producer.shutdown();}
}

发送的状态:

  • SEND_OK:发送成功
  • FLUSH_DISK_TIMEOUT:刷盘超时,仅在同步刷盘出现,异步不会出现
  • FLUSH_SLAVE_TIMEOUT:slave同步超时,同步复制会出现该状态,异步复制不会出现
  • SLAVE_NOT_AVAILABLE:Slave不可用,同步复制会出现该状态,异步复制不会出现
定义异步消息的发送者
public class AsyncProducer {public void Producer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test");producer.setNamesrvAddr("rocket:9876");//指定异步发送失败后不进行重试producer.setRetryTimesWhenSendFailed(0);//指定新创建的Topic的Queue数量为2producer.setDefaultTopicQueueNums(2);producer.start();for (int i =0;i<100;i++){byte []body = ("hi:"+i).getBytes();try{Message msg = new Message("topicA","tag",body);//异步发送,指定回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println(throwable);}});//由于采用的是异步发送,若不sleep,会导致消息还未发送就会将producer给关闭,报错TimeUnit.SECONDS.sleep(3);producer.shutdown();} catch (RemotingException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}
}
定义消费者
public class Consumer {public void Consumer() throws MQClientException {//定义一个pull消费者//DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("test");DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("test");pushConsumer.setNamesrvAddr("rocketmq:9876");//指定从第一条消息开始消费pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//指定topic与TagpushConsumer.subscribe("Topic","*");//指定采用"广播模式"进行消费,默认为集群模式pushConsumer.setMessageModel(MessageModel.BROADCASTING);//注册监听器pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> lists,ConsumeConcurrentlyContext context) {//逐条消费消息for (MessageExt list:lists){System.out.println(list);}//返回消费状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//开启消费者消费pushConsumer.start();System.out.println("Consumer Started");}
}

RocketMQ(十三)——实战-普通消息的发送与消费相关推荐

  1. rocketmq queue_RocketMQ 实战(三) - 消息的有序性

    ■ RocketMQ有序消息的使用 1 为什么需要消息的有序性 比如用户张三终于挣了一百存在在银行卡里存取款,对应两个异步的短信消息,肯定要保证先存后取吧,不然都没钱怎么发了取钱的消息呢! M1 - ...

  2. RocketMQ消息顺序发送和消费问题

    事故现场分析: 由于创新业务产品上线,运营产品想通过一些活动来刺激用户,采用注册邀请机制即可获取积分的相关活动.考虑到后续可能还有其他可能的活动来发放积分,所以设计的时候,采用mq消息模式发放积分,异 ...

  3. 如何在优雅地Spring 中实现消息的发送和消费

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  4. 如何在优雅地Spring 中实现消息的发送和消费 1

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  5. KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

    文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...

  6. rocketmq 消息指定_RocketMq 实际案例–普通消息的发送

    RocketMq 实际案例–普通消息的发送 @(消息中间件)[RocketMq 实例] 学习 rocketMq 最根本的是要先学会用嘛,在创建 rocketMq 的第一个案例的时候,碰到很多坑,可以记 ...

  7. 详解,最新整理,RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  8. RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    消息队列常见问题处理 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用 ...

  9. 使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息

    作者 | 辽天 来源 | 阿里巴巴云原生公众号 导读:本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 sp ...

最新文章

  1. java正则表达式的用法_Java 正则表达式的使用
  2. python入门之函数调用内置函数_第九篇 python基础之函数,递归,内置函数
  3. hdfs/hbase 程序利用Kerberos认证超过ticket_lifetime期限后异常
  4. Paket:一个面向.NET的包管理器
  5. leetcood学习笔记-58-最后一个单词的长度
  6. 10 条真心有趣的 Linux 命令
  7. raid5 合适 多少块硬盘_分析Linux raid6同步成raid5导致数据丢失的情况
  8. 动态规划系列问题—从小白到大佬的入门、进阶之旅!!!
  9. 2011计算机等级考试二级c语言公共基础教程.doc,2011年全国计算机等级考试二级c语言公共基础知识复习100题及答案.doc...
  10. 《作业指导书》的发布管理问题与解决办法
  11. qt界面布局之使窗口显示出现在正中间位置
  12. k均值聚类算法案例 r语言iris_聚类分析—系统聚类
  13. python如何实现网页爬取,并翻译成中文
  14. 边框虚线html代码是,网页虚线代码/表格边框虚线代码大全
  15. 深圳禾正医院自控系统案例|能迪科技canlead中央空调净化系统
  16. Word文档没保存电脑死机了,重启打开文档一片空白怎么办?
  17. Win10耳机插上没反应,外放正常怎么解决?
  18. 海尔智家半年报营收净利双增,卡萨帝、三翼鸟贡献几何?
  19. Ubuntu16.04配置deeplabv3+的pytorch版本
  20. 快速幂取余算法总结详解

热门文章

  1. 2017.5.28 codeforce h题思考记录
  2. sdoi r1前的注意事项
  3. 2017.2.19 loli测试
  4. Pentium II Pentium III架构/微架构/流水线 (7) - 微架构框图
  5. go语言实战_字节跳动年薪50W抢Go开发人才,你还在问该不该学?
  6. 2个css特效冲突了怎么办_学生打扫卫生不积极怎么办?改变自己的观念,从2个方面影响学生...
  7. Unreal Engine 4 —— Ghost Mesh Plugin的开发日志
  8. 基于GPU的粒子系统
  9. 大数据售前的一点感悟
  10. JavaScript--函数