RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
文章目录
- 顺序消息的概念
- 顺序消费的原理
- 消费状态
- 演示
- Producer
- Consumer
- 代码
顺序消息的概念
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。
RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理
在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);
而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。
当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
消费状态
package org.apache.rocketmq.client.consumer.listener;public enum ConsumeOrderlyStatus {/*** Success consumption*/SUCCESS,/*** Suspend current queue a moment* 不能跳过消息,等待一下*/SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
演示
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
Producer
package com.artisan.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String[] tags = new String[]{"TagA", "TagC", "TagD"};// 订单列表List<OrderStep> orderList = buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加个时间前缀String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,body.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg; //根据订单id选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}/*** 生成模拟订单数据*/private static List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("购物车");orderList.add(orderDemo);return orderList;}/*** 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}}
}
日志
Consumer
package com.artisan.rocketmq.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 设置消费位置*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");// 有序消费 MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(true);Random random = new Random();for (MessageExt msg : msgs) {// 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId()+ ", content:" + new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}
运行日志
代码
请移步:https://github.com/yangshangwei/rocketmqMaster
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage相关推荐
- rocketmq 初体验(二)AsyncProducer No name server address, please set it.
AsyncProducer No name server address, please set it. 报错log No name server address, please set it. 原因 ...
- 全文搜索引擎Elasticsearch的初体验:基本概念和操作
一.简介 关于Java Web的开发周边技术,搜索引擎也是经常被用到的,其中solr和es是被当作技术选型经常出现的,他们都是基于lucene,但是,你没法直接用 Lucene,必须自己写代码去调用它 ...
- 微信小程序初体验-苏州实时公交API
利用聚合数据API快速写出小程序,过程简单. 1.申请小程序账号 2.进入开发 3.调用API.比如"苏州实时公交"小程序,选择的是苏州实时公交API. 苏州实时公交API文档:h ...
- ap接口 php_小白php API初体验 php api文档 php api接口开发 php web ap
这里的php 写API其实就是指提供一个WebServiceWebSite : 1.以html格式响应返回 2.由用户通过浏览器来接入 WebService : 1.以json/Xml格式返回 2.由 ...
- Hadoop3——集群搭建以及初体验
1. 匹配主机名 2.下载安装hadoop 3. 配置Hadoop环境 4. 启动Hadoop环境 5. Hadoop初体验 建议先整体浏览一遍再做 (关于创建虚拟机的操作日后有需要的话我再补上) 1 ...
- RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构
文章目录 RocketMQ集群基本信息 目标 知识预习 发送方式 发送结果 环境搭建 使用Java API操作RocketMQ-Simple Message Step1. pom.xml增加依赖和bu ...
- RocketMQ-初体验RocketMQ(01)_RocketMQ初体验
文章目录 RocketMQ的由来 RocketMQ 版本 RocketMQ 基本概念 消息模型 消息生产者(producer) 消息消费者(Consumer) 主题(Topic) 代理服务器(Brok ...
- RocketMQ-初体验RocketMQ(05)_RocketMQ架构解读
文章目录 Rocketmq整体架构 namesrv broker producer & consumer 通信方式 Rocketmq整体架构 RocketMQ-初体验RocketMQ(01)_ ...
- RocketMQ-初体验RocketMQ(03)_RocketMQ多机集群部署
文章目录 环境准备 参考配置 集群搭建 130主机 broker 配置文件 启动namesrv 启动broker Master 和 broker Slave 131主机 broker 配置文件 启动n ...
最新文章
- python3安装步骤mac-Mac 安装Python3
- Boost:排序的bimap双图的测试程序
- 从编码层面对比java和c#
- 如何用手机打开dcm格式图片_压缩图片用这个神奇吧(电脑版)
- 第十三章 对文本进行排序、单一和重复操作:sort命令、uniq命令
- IDEA autowired校验关闭方法
- mysql中select后接数字_MySQL SELECT用于从一组数字中排列一个数字
- 文字超出显示....省略显示
- Java/JSP中使用JDBC连接SQL Server 2005~(2008类似)
- TensorFlow by Google CNN分类真实图片 Machine Learning Foundations: Ep #5 - Classifying real-world images
- visio2016专业版2018最新密钥和下载方法 整理
- 你还在用Rational Rose画图吗?来来来给你介绍一款开源免费上手容易的 BOUML UML画图工具。
- 服务器lsass状态代码c0000005,提示lsass.exe失败状态代码c0000005
- java 机器学习库_7个最好的Java机器学习开发库
- 《一舞醉红尘,一笑歌岁月》
- ghost服务器系统镜像文件,带RAID服务器能GHOST备份吗?
- CentOS7安装Pure-ftpd
- 红黑数和普通的二叉排序树有什么要求
- Linux初探之如何查看帮助文档自学命令
- Chromedriver适用谷歌浏览器的各个版本
热门文章
- 图像识别 43个模型
- jsp获取连接池的实时连接数_PHP进阶教程-实现一个简单的MySQL连接池
- Leetcode 剑指 Offer 04. 二维数组中的查找 (每日一题 20210727)
- python应用实战系列-一文教你深入解读word2vec
- Linux从入门到精通系列之线性表链式存储结构-单链表原理解析
- MATLAB实战系列(四)- LabVIEW初探
- 深度学习核心技术精讲100篇(十二)-DCGAN(对抗生成网络)算法应用及代码实现
- ClickHouse 在字节跳动广告场景的应用
- 在Tableau中实现表格下钻
- Elasticsearch 冷热集群架构实战