rocketmq 消费方式_详解RocketMQ中的Consumer
上一篇博客着重讲解了一下RocketMQ中的Producer,那么接下来这篇博客来带大家来了解一下RocketMQ中的Consumer角色
上述就是MQ中有关Consumer的类图,下面来介绍一下每个类
1.MQAdmin:底层类,上篇博客已经提过,就不再此重提
2.MQConsumer:Consumer公共的接口,常用的方法如下
如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法
4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制
在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下
PushConsumer:通过注册监听的方式来消费信息
/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import mon.consumer.ConsumeFromWhere;
import mon.message.Message;
import mon.message.MessageExt;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class ConsumerTest {
public static void main(String[] args) {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
consumer.setNamesrvAddr("100.66.154.81:9876");
try {
// 订阅PushTopic下Tag为push的消息,都订阅消息
consumer.subscribe("PushTopic", "push");
// 程序第一次启动从消息队列头获取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//可以修改每次消费消息的数量,默认设置是每次消费一条
// consumer.setConsumeMessageBatchMaxSize(10);
//注册消费的监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
//在此监听中消费信息,并返回消费的状态信息
public ConsumeConcurrentlyStatus consumeMessage(
List msgs,
ConsumeConcurrentlyContext context) {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
for(Message msg:msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(5000);
//5秒后挂载消费端消费
consumer.suspend();
} catch (Exception e) {
e.printStackTrace();
}
}
}
PullConsumer:通过拉去的方式来消费消息
/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import mon.message.MessageQueue;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class ConsumerPullTest {
public static void main(String[] args) {
DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();
consumer.setNamesrvAddr("100.66.154.81:9876");
consumer.setConsumerGroup("broker");
try {
consumer.start();
Set messageQueues=consumer.fetchSubscribeMessageQueues("PushTopic");
for(MessageQueue messageQueue:messageQueues){
System.out.println(messageQueue.getTopic());
}
//消息队列的监听
consumer.registerMessageQueueListener("", new MessageQueueListener() {
@Override
//消息队列有改变,就会触发
public void messageQueueChanged(String topic, Set mqAll,
Set mqDivided) {
// TODO Auto-generated method stub
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}一般在应用中都会采用push的方法来自动的消费信息
rocketmq 消费方式_详解RocketMQ中的Consumer相关推荐
- rocketmq 消息指定_详解RocketMQ不同类型的消费者
原标题:详解RocketMQ不同类型的消费者 云栖君导读:本文节选自云栖社区系列丛书<RocketMQ原理与实战解析>,作者:阿里巴巴数据专家杨开元.本节将重点讲解RocketMQ不同类型 ...
- java 判断数组已经存满_详解Java中数组判断元素存在几种方式比较
1. 通过将数组转换成List,然后使用List中的contains进行判断其是否存在 public static boolean useList(String[] arr,String contai ...
- sh执行文件 参数传递_详解shell中脚本参数传递的两种方式
方式一:$0,$1,$2.. 采用$0,$1,$2..等方式获取脚本命令行传入的参数,值得注意的是,$0获取到的是脚本路径以及脚本名,后面按顺序获取参数,当参数超过10个时(包括10个),需要使用${ ...
- python3中unicode怎么写_详解python3中ascii与Unicode使用
这篇文章主要为大家详解python3中ascii与Unicode使用的相关资料,需要的朋友可以参考下# Auther: Aaron Fan ''' ASCII:不支持中文,1个英文占1个字节 Unic ...
- java 死锁 内存消耗_详解Java中synchronized关键字的死锁和内存占用问题
先看一段synchronized 的详解: synchronized 是 java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码. 一.当两个并 ...
- python socket自动重连_详解python3中socket套接字的编码问题解决
一.TCP 1.tcp服务器创建 #创建服务器 from socket import * from time import ctime #导入ctime HOST = '' #任意主机 PORT = ...
- java comparator相等_详解Java中Comparable和Comparator接口的区别
详解Java中Comparable和Comparator接口的区别 发布于 2020-7-20| 复制链接 摘记: 详解Java中Comparable和Comparator接口的区别本文要来详细分析一 ...
- [转载] python中for语句用法_详解Python中for循环的使用_python
参考链接: 在Python中将else条件语句与for循环一起使用 这篇文章主要介绍了Python中for循环的使用,来自于IBM官方网站技术文档,需要的朋友可以参考下 for 循环 本系列前面 &q ...
- python中groupby()函数讲解与示例_详解python中groupby函数通俗易懂
一.groupby 能做什么? python中groupby函数主要的作用是进行数据的分组以及分组后地组内运算! 对于数据的分组和分组运算主要是指groupby函数的应用,具体函数的规则如下: df[ ...
- java list for循环遍历_详解Java中list,set,map的遍历与增强for循环
详解Java中list,set,map的遍历与增强for循环 Java集合类可分为三大块,分别是从Collection接口延伸出的List.Set和以键值对形式作存储的Map类型集合. 关于增强for ...
最新文章
- C++中前置操作符和后置操作符的重载
- 快速撑握C#知识点之变量,类型及类型转换
- leetcode1. 两数之和(两种方法)
- C++实现链式存储二叉树
- Java多线程同步机制
- OpenStack Orchestration service (编排服务Heat)
- ubuntu16.04 装机4:安装防火墙ufw
- Flex 布局学习笔记
- 最简单代码,适合没学编程的人玩
- wow服务器文件夹,《60级魔兽世界WTFWDB文件夹全解析 by Qcat》
- 网络之华为USG6000防火墙日志清理
- Oracle JDK收费理解
- 浏览器字体变大|变小怎么办,浏览器字体大小设置方法
- make_interp_spline(x, y[1:151])(x_smooth) ValueError: x and y are incompatible.
- 【JVM技术专题】 深入学习Parallel Scavenge回收器「 原理篇」
- 开发3D游戏建模都需要哪些软件?软件繁多,如何从中挑选学习?
- 从头构建自己的Linux系统
- 科技云报道:数字化转型提速,深信服有答案
- 全球与中国线路型避雷器市场发展前景与投资战略规划分析报告2022~2028年
- 前端面试题总结(HTML+CSS部分)