首先先了解下DefaultMQPushConsumer 参数:
https://blog.csdn.net/weixin_41715077/article/details/85170893
以及消费模式:
https://blog.csdn.net/qq_36804701/article/details/81481343

简单例子

package com.swk.springboot.rocketmq;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public class MQPushConsumer {public static void main(String[] args) throws MQClientException {String groupName = "rocketMqGroup1";// 用于把多个Consumer组织到一起,提高并发处理能力DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);// 设置nameServer地址,多个以;分隔consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");/*** 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/*** CLUSTERING:默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所*  订阅topic整体,从而达到负载均衡的目的* BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。* */consumer.setMessageModel(MessageModel.BROADCASTING);// 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息consumer.subscribe("order-topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> mgs,ConsumeConcurrentlyContext consumeconcurrentlycontext) {System.out.println(Thread.currentThread().getName()+"Receive New Messages:"+mgs);// ConsumeConcurrentlyStatus.RECONSUME_LATER boker会根据设置的messageDelayLevel发起重试,默认16次return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

实战例子

@Slf4j
@Component
public class RocketMqConsumer implements FactoryBean<DefaultMQPushConsumer>, InitializingBean, DisposableBean {private DefaultMQPushConsumer consumer;private final MessageModel messageModel = MessageModel.CLUSTERING;@Autowiredprivate YourService yourService;@Value("${rocketmq.consumer.groupName}")private String consumerGroupName;@Value("${rocketmq.consumer.instanceName}")private String instanceName;@Value("${rocketmq.consumer.nameServerAddr}")private String nameServerAddr;@Value("${rocketmq.consumer.consumeThreadMin}")private int consumeThreadMin;@Value("${rocketmq.consumer.consumeThreadMax}")private int consumeThreadMax;@Value("${rocketmq.consumer.subscribes.topicAndTags}")private String topicAndTags;@Value("${rocketmq.consumer.pullThresholdForQueue}")private int pullThresholdForQueue;@Value("${rocketmq.consumer.pullBatchSize}")private int pullBatchSize;@Overridepublic void destroy() throws Exception {if (null != this.consumer) {consumer.shutdown();log.info("rocketMq consumer shutdown");}}@Overridepublic DefaultMQPushConsumer getObject() throws Exception {return this.consumer;}@Overridepublic Class<?> getObjectType() {return DefaultMQPushConsumer.class;}@Overridepublic void afterPropertiesSet() throws Exception {consumer = new DefaultMQPushConsumer(consumerGroupName);consumer.setInstanceName(instanceName);consumer.setMessageModel(messageModel);consumer.setNamesrvAddr(nameServerAddr);consumer.setConsumeThreadMax(consumeThreadMax);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setPullBatchSize(pullBatchSize);consumer.setPullThresholdForQueue(pullThresholdForQueue);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅的 topic 和 tagsconsumer.subscribe(topicAndTags, topicAndTags);// 注册监听方式 有序consumer.registerMessageListener((MessageListenerOrderly) (messageExtList, consumeOrderlyContext) -> {MessageExt msg = messageExtList.get(0);String messageBody = new String(msg.getBody());log.info("consume message, msgId={}, topic={}, tags={}, keys={},\n body={}", msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getKeys(), messageBody);// todo: business code to consume message//yourService.xxxreturn ConsumeOrderlyStatus.SUCCESS;});consumer.start();log.info("rocketMQ consumer group [{}] started", consumerGroupName);}
}

RocketMqConsumer 消费者使用范例相关推荐

  1. 用java编写验证码程序_编写,验证和分析实时Java应用程序

    本文是" 用实时Java开发"系列的第三篇也是最后一部分,展示了如何设计,编写,验证和分析基本的实时应用程序. 我们将说明: 应用程序的时间和性能要求. 为什么传统的非实时Java ...

  2. java多线程 生产者消费者_java多线程之生产者消费者经典问题 - 很不错的范例

    /**生产者消费者问题,涉及到几个类 * 第一,这个问题本身就是一个类,即主类 * 第二,既然是生产者.消费者,那么生产者类和消费者类就是必须的 * 第三,生产什么,消费什么,所以物品类是必须的,这里 ...

  3. 如何在 Java 中正确使用 wait, notify 和 notifyAll – 以生产者消费者模型为例

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. wait, notify 和 noti ...

  4. 优秀HTML5网站学习范例:从“饥饿游戏浏览器”谈用户体验

    继影片<饥饿游戏>获得票房成功后,<饥饿游戏2:火星燎原>也于2012年宣布开拍,将在今年的11月22日登陆全球各大院线.值此之际,微软携手美国狮门影业公司和 RED Inte ...

  5. Find-Sec-Bugs 漏洞范例

    第一章 Find-sec-bugs简介 插件介绍: Find-Sec-Bugs 是一款本地 bug 扫描插件 "FindBugs-IDEA" 的 Java 安全漏洞规则扩展库,它支 ...

  6. 消息队列:生产者/消费者模式

    1.什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接 ...

  7. 设计模式——生产者消费者模式

    1 基本概括 2 主要介绍 2.1 概念 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消 ...

  8. 生产者消费者模式三种实现方式

    目录 1.什么是生产者消费者模式: 2.生产者消费者模型的实现: 第一种:使用 synchronized和wait.notify 第二种:使用 Lock和await.signal 第三种:使用 阻塞队 ...

  9. 文案怎么写?文案范例一次学会!

    对于写作,我知道你有满满的疑问 「写文案有什么好处?」.「作文不好,可以写文案吗?」.「写不好会不会被笑?」.「没有灵感,不知道要写什么?」.「词汇少,能写得出金句吗?」.「文案会不会很难学啊?」.「 ...

最新文章

  1. IOS基础使用PCH文件全局定义宏常量
  2. Java OCR tesseract 图像智能字符识别技术
  3. C++ Primer 第五版 第6章 6.1——函数及函数定义及调用阅读笔记
  4. ClickHouse在字节跳动推荐和广告业务部门的最佳实践
  5. 高级数据结构及应用 —— 使用 bitmap 进行字符串去重
  6. doubango的帧率太低,怎么解决?
  7. 【zk开发】让eclipse识别×.zul文件为xml格式
  8. 高等代数(第三版)北大(参考答案)
  9. ARCore快速入门-02导入ARCore For Unity
  10. 2019IDEA破解安装
  11. EXCEL密码清除——巧用RAR
  12. SVN提交出错--URL access forbidden for unknown reason
  13. word图文设计:如何用图片水印功能制作日历画册
  14. 人工智能守护青山绿水 内蒙古环保厅引入阿里云ET环境大脑
  15. 关于正向级数收敛而它的平方也收敛的证明
  16. 小程序 canvas画图片
  17. 数据库概述09(数据库中的锁机制)
  18. Java基础查漏补缺(个人向)
  19. Getaverse,走向Web3的远方桥梁
  20. 计算机电源大小,电源功率到底选多大?老司机告诉你电源功率怎么选?

热门文章

  1. 卧槽!算法岗又爆了!
  2. Transformer一统江湖:自然语言处理三大特征抽取器比较
  3. Wampserver安装提示没有找到 msvcp120.dll mysql.exe
  4. 商业模式分析——以“我养你App”为例
  5. Android控件全解手册 - 官方SearchView使用/属性/监听/搭配Toolbar/样式/其他/开源项目
  6. 初学Substance Painter:最基本的工作流程
  7. 魔域口袋版服务器维护,《魔域手游》13日例行维护
  8. 聊城大学格创计算机协会干事纳新工作取得圆满成功
  9. poi(1)---概述
  10. Kotlin Contract