RocketMQ API使用简介、拉取机制
分布式开放消息系统(RocketMQ)的原理与实践
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {// 声明并初始化一个producer// 需要一个producer group名字作为构造方法的参数,这里为producer1DefaultMQProducer producer = new DefaultMQProducer("order_producer");producer.setNamesrvAddr("localhost:9876");//producer.createTopic(key, newTopic, queueNum);producer.start();//String[] tags = new String[] {"TagA","TagC","TagD"};Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 1; i <= 5; i++) {try {// 时间戳String body = dateStr + " order_0 " + i;Message msg = new Message("TopicTest",// topic//tags[i%tags.length],// tag"order_0",// tag"KEY"+i,body.getBytes()// body);SendResult sendResult = producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer)arg; return mqs.get(id);}// 0是队列的下标},0);System.out.println(sendResult + ", body:" + body);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}for (int i = 1; i <= 5; i++) {try {// 时间戳String body = dateStr + " order_1 " + i;Message msg = new Message("TopicTest",// topic//tags[i%tags.length],// tag"order_1",// tag"KEY"+i,body.getBytes()// body);SendResult sendResult = producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer)arg; return mqs.get(id);}// 1是队列的下标},1);System.out.println(sendResult + ", body:" + body);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}for (int i = 1; i <= 5; i++) {try {// 时间戳String body = dateStr + " order_2 " + i;Message msg = new Message("TopicTest",// topic//tags[i%tags.length],// tag"order_2",// tag"KEY"+i,body.getBytes()// body);SendResult sendResult = producer.send(msg,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer)arg; return mqs.get(id);}// 2是队列的下标},2);System.out.println(sendResult + ", body:" + body);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer1 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");consumer.setNamesrvAddr("localhost:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/**消费线程池最小数量:默认10**/consumer.setConsumeThreadMin(10);/**消费线程池最大数量:默认120**/consumer.setConsumeThreadMax(20);// 订阅的主题,以及过滤的标签内容consumer.subscribe("TopicTest", "*");//设置一个Listener,主要进行消息的逻辑处理consumer.registerMessageListener(new MessageListenerOrderly() {private Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 设置自动提交context.setAutoCommit(true);for(MessageExt msg: msgs) {System.out.println(msg + ",content:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});//调用start()方法启动consumerconsumer.start();System.out.println("C1 Started.");}
}
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer2 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");consumer.setNamesrvAddr("localhost:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/**消费线程池最小数量:默认10**/consumer.setConsumeThreadMin(10);/**消费线程池最大数量:默认120**/consumer.setConsumeThreadMax(20);// 订阅的主题,以及过滤的标签内容consumer.subscribe("TopicTest", "*");//设置一个Listener,主要进行消息的逻辑处理consumer.registerMessageListener(new MessageListenerOrderly() {private Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 设置自动提交context.setAutoCommit(true);for(MessageExt msg: msgs) {System.out.println(msg + ",content:" + new String(msg.getBody()));}try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});//调用start()方法启动consumerconsumer.start();System.out.println("C2 Started.");}
}
RocketMQ API使用简介、拉取机制相关推荐
- 【RocketMQ】消息的拉取
RocketMQ消息的消费以组为单位,有两种消费模式: 广播模式:同一个消息队列可以分配给组内的每个消费者,每条消息可以被组内的消费者进行消费. 集群模式:同一个消费组下,一个消息队列同一时间只能分配 ...
- 【OpenGL】六、Visual Studio 2019 配置 GitHub ( 提取和拉取简介 | 拉取远程代码 )
文章目录 一.提取和拉取简介 二.拉取远程代码 一.提取和拉取简介 在 " 团队资源管理器 " 主页中 , 选择 " 同步 " 选项 , 在同步页面中 , 有 ...
- rocketmq中的消息拉取及并发消费理解
消息拉取采用单线程形式,便于消息的顺序拉取 默认批量取32个,出现性能考虑,减少网络请求.不能保证会拉取到32个,因为消息队列中的存放的是topic-queueid对应的索引,会包含多个tag,而消息 ...
- 4.4.2 将拉取偏移量作为提交偏移量
4.4.2 将拉取偏移量作为提交偏移量 旧API中,当客户端迭代消费消息时会更新分区信息的已消费偏移量,并且有一个后台线程定时将分区信息的已消费偏移量作为已提交偏移量发送给协调者节点. 新API中,订 ...
- RocketMQ:Consumer概述及启动流程与消息拉取源码分析
文章目录 Consumer 概述 消费者核心类 消费者启动流程 消息拉取 PullMessageService实现机制 ProcessQueue实现机制 消息拉取基本流程 客户端发起消息拉取请求 消息 ...
- 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解
导语 在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...
- RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码
转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...
- RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】
基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码. 此前我们学习了RocketMQ源码(18)-Defau ...
- RocketMq重试及消息不丢失机制
1.消息重试机制 由于MQ经常处于复杂的分布式系统中,考虑网络波动.服务宕机.程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消 ...
最新文章
- 没有匹配 if 的非法 else_求求你,别再写这么多if...else...了
- C语言基本数据结构之四(线性,对分,二叉树查找及二叉树删除)
- C 语言编程 — 基本数据类型
- python libnum库安装使用方法
- 微信公众平台两种消息处理流程对比
- 一文解决new/delete与malloc/free相关问题:区别?注意事项?使用方式?
- eclipse 中 安装git 插件和使用git 插件
- 关于PHPExcel 导出下载表格,调试器响应乱码
- smartSVN 分支与合并
- C++数据结构01--顺序线性表实现
- 转:OWASP发布Web应用程序的十大安全风险
- mac java安全_关于 Java for Mac OS X 10.4 发行版 7 的安全性内容
- HTML快速上手教程
- 记休产假前的一些杂想
- 微信公众号引流的平台有哪些?
- 21.pgsql中的执行计划explain
- OpenGL学习笔记--字体库freetype2、FTGL
- Elasticsearch的DSL搜索
- Zigbee 设置信道,PANID,发射功率现对z-stack里几个网络参数的设置以及如何获取总结一下。
- Python 下opencv 应用: 摄像头参数设置
热门文章
- “许巍日”新歌提前曝光 《爱如少年》10/15温暖登场!
- vuejs构建的单页面应用history模式子页面微信分享在iOS中遇到的问题
- Windows 8 Release Preview 安装秘技两则
- 史上最强福利,阿里云半价+Plesk免费2万台速抢!
- 大数据科学认识与理解论坛全攻略
- [20161031]rman备份与数据文件OS块.txt
- android studio 导入第三方库的记录
- TestNG之注解的生命周期
- JAVA设计模式--辛格尔顿
- MySQL主从复制(Master-Slave)与读写分离(MySQL-Proxy)实践