上一篇博客着重讲解了一下RocketMQ中的Producer,那么接下来这篇博客来带大家来了解一下RocketMQ中的Consumer角色

上述就是MQ中有关Consumer的类图,下面来介绍一下每个类

1.MQAdmin:底层类,上篇博客已经提过,就不再此重提

2.MQConsumer:Consumer公共的接口,常用的方法如下

如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间

void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下

PushConsumer:通过注册监听的方式来消费信息

/**

* @FileName: Consumer.java

* @Package:com.test

* @Description: TODO

* @author: LUCKY

* @date:2015年12月28日 下午2:43:23

* @version V1.0

*/

package com.test;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import mon.consumer.ConsumeFromWhere;

import mon.message.Message;

import mon.message.MessageExt;

/**

* @ClassName: Consumer

* @Description: 模拟消费者

* @author: LUCKY

* @date:2015年12月28日 下午2:43:23

*/

public class ConsumerTest {

public static void main(String[] args) {

DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");

consumer.setNamesrvAddr("100.66.154.81:9876");

try {

// 订阅PushTopic下Tag为push的消息,都订阅消息

consumer.subscribe("PushTopic", "push");

// 程序第一次启动从消息队列头获取数据

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//可以修改每次消费消息的数量,默认设置是每次消费一条

// consumer.setConsumeMessageBatchMaxSize(10);

//注册消费的监听

consumer.registerMessageListener(new MessageListenerConcurrently() {

//在此监听中消费信息,并返回消费的状态信息

public ConsumeConcurrentlyStatus consumeMessage(

List msgs,

ConsumeConcurrentlyContext context) {

// msgs中只收集同一个topic,同一个tag,并且key相同的message

// 会把不同的消息分别放置到不同的队列中

for(Message msg:msgs){

System.out.println(new String(msg.getBody()));

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

Thread.sleep(5000);

//5秒后挂载消费端消费

consumer.suspend();

} catch (Exception e) {

e.printStackTrace();

}

}

}

PullConsumer:通过拉去的方式来消费消息

/**

* @FileName: Consumer.java

* @Package:com.test

* @Description: TODO

* @author: LUCKY

* @date:2015年12月28日 下午2:43:23

* @version V1.0

*/

package com.test;

import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;

import com.alibaba.rocketmq.client.consumer.MessageQueueListener;

import mon.message.MessageQueue;

/**

* @ClassName: Consumer

* @Description: 模拟消费者

* @author: LUCKY

* @date:2015年12月28日 下午2:43:23

*/

public class ConsumerPullTest {

public static void main(String[] args) {

DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();

consumer.setNamesrvAddr("100.66.154.81:9876");

consumer.setConsumerGroup("broker");

try {

consumer.start();

Set messageQueues=consumer.fetchSubscribeMessageQueues("PushTopic");

for(MessageQueue messageQueue:messageQueues){

System.out.println(messageQueue.getTopic());

}

//消息队列的监听

consumer.registerMessageQueueListener("", new MessageQueueListener() {

@Override

//消息队列有改变,就会触发

public void messageQueueChanged(String topic, Set mqAll,

Set mqDivided) {

// TODO Auto-generated method stub

}

});

} catch (Exception e) {

e.printStackTrace();

}

}

}一般在应用中都会采用push的方法来自动的消费信息

rocketmq 消费方式_详解RocketMQ中的Consumer相关推荐

  1. rocketmq 消息指定_详解RocketMQ不同类型的消费者

    原标题:详解RocketMQ不同类型的消费者 云栖君导读:本文节选自云栖社区系列丛书<RocketMQ原理与实战解析>,作者:阿里巴巴数据专家杨开元.本节将重点讲解RocketMQ不同类型 ...

  2. java 判断数组已经存满_详解Java中数组判断元素存在几种方式比较

    1. 通过将数组转换成List,然后使用List中的contains进行判断其是否存在 public static boolean useList(String[] arr,String contai ...

  3. sh执行文件 参数传递_详解shell中脚本参数传递的两种方式

    方式一:$0,$1,$2.. 采用$0,$1,$2..等方式获取脚本命令行传入的参数,值得注意的是,$0获取到的是脚本路径以及脚本名,后面按顺序获取参数,当参数超过10个时(包括10个),需要使用${ ...

  4. python3中unicode怎么写_详解python3中ascii与Unicode使用

    这篇文章主要为大家详解python3中ascii与Unicode使用的相关资料,需要的朋友可以参考下# Auther: Aaron Fan ''' ASCII:不支持中文,1个英文占1个字节 Unic ...

  5. java 死锁 内存消耗_详解Java中synchronized关键字的死锁和内存占用问题

    先看一段synchronized 的详解: synchronized 是 java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码. 一.当两个并 ...

  6. python socket自动重连_详解python3中socket套接字的编码问题解决

    一.TCP 1.tcp服务器创建 #创建服务器 from socket import * from time import ctime #导入ctime HOST = '' #任意主机 PORT = ...

  7. java comparator相等_详解Java中Comparable和Comparator接口的区别

    详解Java中Comparable和Comparator接口的区别 发布于 2020-7-20| 复制链接 摘记: 详解Java中Comparable和Comparator接口的区别本文要来详细分析一 ...

  8. [转载] python中for语句用法_详解Python中for循环的使用_python

    参考链接: 在Python中将else条件语句与for循环一起使用 这篇文章主要介绍了Python中for循环的使用,来自于IBM官方网站技术文档,需要的朋友可以参考下 for 循环 本系列前面 &q ...

  9. python中groupby()函数讲解与示例_详解python中groupby函数通俗易懂

    一.groupby 能做什么? python中groupby函数主要的作用是进行数据的分组以及分组后地组内运算! 对于数据的分组和分组运算主要是指groupby函数的应用,具体函数的规则如下: df[ ...

  10. java list for循环遍历_详解Java中list,set,map的遍历与增强for循环

    详解Java中list,set,map的遍历与增强for循环 Java集合类可分为三大块,分别是从Collection接口延伸出的List.Set和以键值对形式作存储的Map类型集合. 关于增强for ...

最新文章

  1. C++中前置操作符和后置操作符的重载
  2. 快速撑握C#知识点之变量,类型及类型转换
  3. leetcode1. 两数之和(两种方法)
  4. C++实现链式存储二叉树
  5. Java多线程同步机制
  6. OpenStack Orchestration service (编排服务Heat)
  7. ubuntu16.04 装机4:安装防火墙ufw
  8. Flex 布局学习笔记
  9. 最简单代码,适合没学编程的人玩
  10. wow服务器文件夹,《60级魔兽世界WTFWDB文件夹全解析 by Qcat》
  11. 网络之华为USG6000防火墙日志清理
  12. Oracle JDK收费理解
  13. 浏览器字体变大|变小怎么办,浏览器字体大小设置方法
  14. make_interp_spline(x, y[1:151])(x_smooth) ValueError: x and y are incompatible.
  15. 【JVM技术专题】 深入学习Parallel Scavenge回收器「 原理篇」
  16. 开发3D游戏建模都需要哪些软件?软件繁多,如何从中挑选学习?
  17. 从头构建自己的Linux系统
  18. 科技云报道:数字化转型提速,深信服有答案
  19. 全球与中国线路型避雷器市场发展前景与投资战略规划分析报告2022~2028年
  20. 前端面试题总结(HTML+CSS部分)

热门文章

  1. Win7下面更改屏幕旋转快捷键的方法
  2. 为什么会有这么多中间表?
  3. mandriva csdn_与Google的双重开源Mandriva告别,还有更多新闻
  4. IOS企业应用出现无法验证,需要网络连接以在这台iPad上验证。接入互联网并重试
  5. win10护眼模式_这6个电脑操作技巧,会让你爱上Win10系统
  6. Faster RCNN代码详解(三):数据处理的整体结构
  7. C语言---内存操作及基础知识
  8. PHP删除多选checkbox,php一次性删除前台checkbox多选内容的简单示例
  9. Moore-Penrose广义逆矩阵
  10. word设置多级标题