• 代码示例
  • 代码阐释
  • 运行效果
  • 内容补充

讲个9·3阅兵时程序员间流传的笑话:

同学们,现在向我们走来的是程序员方阵!他们穿着拖鞋,披着毛巾,左手拿着键盘,右手举着鼠标,腋下夹着USB转换器。他们因睡眠不足而显得精神不振,喊着微弱的口号走过主席台,主席问候:程序员们辛苦了!程序员方队异口同声地答道:Hello World!

——研究一项技术,如果不提及“Hello World”,那指定是外行。

上篇博客,搭了一个最简单的集群——双主群集,这篇博客就利用这个环境,写一个简单的生产者、消费者,来快速体验一下RocketMQ的HelloWorld。


代码示例

Maven配置
引一下jar包,这里还是用3.2.6这一比较经典的版本

<dependency><groupId>com.alibaba.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>3.2.6</version>
</dependency>

pom.xml

  • 生产者
    写一个简单的Producer类,来发送消息:
/*** Producer,发送消息*/
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("group_name");producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");producer.start();for (int i = 0; i < 100; i++) {try {Message msg = new Message("TopicTest",              // topic"TagA",                                     // tag("HelloWorld - RocketMQ" + i).getBytes()    // body);SendResult sendResult = producer.send(msg);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}

生产者

  • 消费者
    写一个简单的Consumer类,来接收消息:
/*** Consumer,订阅消息*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg: msgs) {try {String topic = msg.getTopic();String tags = msg.getTags();String msgBody = new String(msg.getBody(),"utf-8");System.out.println("收到消息--" + " topic:" + topic + " ,tags:" + tags + " ,msg:" +msgBody);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}

消费者


代码阐释

根据上面的生产者和消费者,说明几点内容:

  • GroupName
    无论生产者、消费者都必须给出GroupName,而且具有唯一性!

  • Topic、Tag
    生产到哪个Topic的哪个Tag下,消费者也是从Topic的哪个Tag进行消费,可见这个Tag有点类似于JMS Selector机制,即实现消息的过滤。

  • NameServer
    生产者、消费者需要设置NameServer地址。
    消费方式:这里,采用的是Consumer Push的方式,即设置Listener机制回调,相当于开启了一个线程。


运行效果

光说不练嘴把式,来看一下运行效果:

  • 生产者


生产者运行结果
仔细看看生产者结果输出,就会发现,有的消息发往broker-a,有的在broker-b上,自动实现了消息的负载均衡!

  • 消费者


消费者运行结果
这里消费消息是没有什么顺序的,以后我们在来谈消息的顺序性。

  • 管控台


消费前
会发现消息分布在2个broker上。

消费后


内容补充

  • 启动顺序
    务必保证先启动消费者进行Topic订阅,然后在启动生产者进行生产(否则极有可能导致消息的重复消费,重复消费,重复消费!重要的事情说三遍!)。而且在实际开发中,有时候不会批量的处理消息,而是原子性的,单线程的去一条一条的处理消息,这样就是实时的在处理消息。(批量的处理海量的消息,可以考虑Kafka)

  • 持久化
    在ActiveMQ中,生产消息的时候会提供是否持久化的选择,但是对于RocketMQ而言,消息是一定会被持久化的!

  • 宕机处理
    在多Master模式中,如果某个Master进程挂了,显然这台broker将不可用,上面的消息也将无法消费,要知道开源版本的RocketMQ是没有提供切换程序,来自动恢复故障的,因此在实际开发中,我们一般提供一个监听程序,用于监控Master的状态。

  • 单批次消息消费数量
    上面的消费者采用的是Push Consumer的方式,那么监听的Listener中的消息List到底是多少条呢?虽然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),实际上即使设置了批量的条数,但是注意了,是最大是10,并不意味着每次batch的都是10,只有在消息有挤压的情况下才有可能。而且Push Consumer的最佳实践方式就是一条条的消费,如果需要batch,可以使用Pull Consumer。来做个测试:
    1) 改一下消费者代码


    单批次消息消费数量测试代码
    2) 运行效果–先启动消费者,再启动生产者

    先启消费者后启生产者运行效果
    3) 运行效果–先启动生产者(这样消息会有挤压),再启动消费者

    先启生产者后启消费者运行效果

RocketMQ(三)——HelloWorld相关推荐

  1. RocketMQ 三种发消息的方式

    本文主要 使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种(同步.异步.oneway)发送消息的方式.Producer在进行消息发送时可以是阻塞的,也可以 ...

  2. rocketmq(三 java操作rocket API, rocketmq 幂等性)

    JAVA操作rocketmq: 1.导入rocketmq所需要的依赖: <dependency><groupId>com.alibaba.rocketmq</groupI ...

  3. RabbitMQ(三) HelloWorld 单生产者单消费者示例实现

    一.创建Maven工程,引入RabbitMQ依赖. pom.xml 如下: <?xml version="1.0" encoding="UTF-8"?&g ...

  4. IDEA Git操作(三)使用 cherry-pick、交互式 rebase 自由修改提交树

    说明 本教程按照 git在线练习 顺序进行,将在线测试的命令操作落地到 IDEA,使用开发工具来实现所有在线练习中的操作. 你可以结合 git在线练习 来学习本教程,先在线学习git命令,再在 IDE ...

  5. RocketMQ 部署不当导致磁盘空间不释放

    背景 生产环境采用 RocketMQ 三主三从集群搭建,6 个实例部署在 3 台 Linux 服务器上(节省资源),每台服务器部署一主一从,生产上运行一段时间后,发现磁盘空间报警,发现df与du显示的 ...

  6. Windows RGBDS 及 BGB 的安装 及 HelloWorld

    本文私用,不对外公开 相关文件下载链接: https://wwm.lanzouv.com/b03vdncpi 密码:e4qi 一. 安装RGBDS 测试RGBDS是否安装成功: 1."win ...

  7. SpringBoot——入门(HelloWorld和探究HelloWorld)

    一.简介 Spring Boot 是由 Pivotal 团队提供的全新框架,其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定 ...

  8. RocketMQ吐血整理

    RocketMQ消息队列 本博文借鉴RocketMQ官网https://www.aliyun.com/product/rocketmq?spm=a2c4g.11186623.2.26.1d81a08b ...

  9. RocketMq(一)-下载安装

    本篇记录RocketMq下载.安装以及常用命令. 一.下载 (1)官网下载地址:https://rocketmq.apache.org/dowloading/releases/ 本篇以4.9.4版本为 ...

最新文章

  1. 10个你值得收藏的牛逼开源后台控制面板
  2. 分享一个测试图片的方式
  3. UiPickerView基本使用方法
  4. ConcurrentHashMap的实现原理和源码分析
  5. 一个jsp能取到父类jsp的值吗_「Javaweb」ssm整合权限控制框架shiro,你知道怎么做吗?...
  6. linux mysql端口启动失败怎么办,Linux下apache mysql等服务修改默认端口后无法正常启动解决办法...
  7. 用jquery实现图片轮播
  8. Item 22. 模板方法与曲线救国(Template Method)
  9. 推荐10个高效好用的办公软件,极大提升效率
  10. 佳能Canon imageCLASS MF4712 打印机驱动
  11. Android / iOS 招聘
  12. android手机api等级_什么是Android API 级别?
  13. 【总结】线性代数的本质 - 3
  14. SVN强制编写注释才能提交,提交中不允许删除文件操作。
  15. 开源进企业8月27日活动
  16. 互联网舆情监测公司监测哪些内容,TOOM北京舆情监测公司
  17. ROUGE和pyrouge的安装
  18. 第13章:直方图处理
  19. 2016全国房价趋势
  20. 彻底搞懂编码 GBK 和 UTF8

热门文章

  1. JAVA基本程序设计规范
  2. 优橙教育内推岗位—5G网络优化工程
  3. 计算机基础在线阅读,计算机基础答案.docx
  4. 关于企业微信代开发网页授权问题(OAuth2)
  5. 泛在电力物联网技术及战略解读:一个战略 两个领域 三个阶段
  6. 毕业设计 STM32的智能饮水机控制系统(源码+硬件+论文)
  7. remote call
  8. 犯罪心理学Seasons one
  9. CompileFlow 学习与使用--第一节
  10. 美国历任总统的演讲视频