文章目录

  • 顺序消息的概念
  • 顺序消费的原理
  • 消费状态
  • 演示
    • Producer
    • Consumer
  • 代码


顺序消息的概念

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。

RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序


顺序消费的原理

在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);

而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。


消费状态

 package org.apache.rocketmq.client.consumer.listener;public enum ConsumeOrderlyStatus {/*** Success consumption*/SUCCESS,/*** Suspend current queue a moment* 不能跳过消息,等待一下*/SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

演示

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

Producer

package com.artisan.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String[] tags = new String[]{"TagA", "TagC", "TagD"};// 订单列表List<OrderStep> orderList = buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加个时间前缀String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,body.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;  //根据订单id选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}/*** 生成模拟订单数据*/private static List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("购物车");orderList.add(orderDemo);return orderList;}/*** 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}}
}

日志


Consumer

package com.artisan.rocketmq.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 设置消费位置*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");// 有序消费 MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(true);Random random = new Random();for (MessageExt msg : msgs) {// 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId()+ ", content:" + new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}try {//模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

运行日志


代码

请移步:https://github.com/yangshangwei/rocketmqMaster


RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage相关推荐

  1. rocketmq 初体验(二)AsyncProducer No name server address, please set it.

    AsyncProducer No name server address, please set it. 报错log No name server address, please set it. 原因 ...

  2. 全文搜索引擎Elasticsearch的初体验:基本概念和操作

    一.简介 关于Java Web的开发周边技术,搜索引擎也是经常被用到的,其中solr和es是被当作技术选型经常出现的,他们都是基于lucene,但是,你没法直接用 Lucene,必须自己写代码去调用它 ...

  3. 微信小程序初体验-苏州实时公交API

    利用聚合数据API快速写出小程序,过程简单. 1.申请小程序账号 2.进入开发 3.调用API.比如"苏州实时公交"小程序,选择的是苏州实时公交API. 苏州实时公交API文档:h ...

  4. ap接口 php_小白php API初体验 php api文档 php api接口开发 php web ap

    这里的php 写API其实就是指提供一个WebServiceWebSite : 1.以html格式响应返回 2.由用户通过浏览器来接入 WebService : 1.以json/Xml格式返回 2.由 ...

  5. Hadoop3——集群搭建以及初体验

    1. 匹配主机名 2.下载安装hadoop 3. 配置Hadoop环境 4. 启动Hadoop环境 5. Hadoop初体验 建议先整体浏览一遍再做 (关于创建虚拟机的操作日后有需要的话我再补上) 1 ...

  6. RocketMQ-初体验RocketMQ(06)-使用API操作RocketMQ ,理解RocketMQ的存储结构

    文章目录 RocketMQ集群基本信息 目标 知识预习 发送方式 发送结果 环境搭建 使用Java API操作RocketMQ-Simple Message Step1. pom.xml增加依赖和bu ...

  7. RocketMQ-初体验RocketMQ(01)_RocketMQ初体验

    文章目录 RocketMQ的由来 RocketMQ 版本 RocketMQ 基本概念 消息模型 消息生产者(producer) 消息消费者(Consumer) 主题(Topic) 代理服务器(Brok ...

  8. RocketMQ-初体验RocketMQ(05)_RocketMQ架构解读

    文章目录 Rocketmq整体架构 namesrv broker producer & consumer 通信方式 Rocketmq整体架构 RocketMQ-初体验RocketMQ(01)_ ...

  9. RocketMQ-初体验RocketMQ(03)_RocketMQ多机集群部署

    文章目录 环境准备 参考配置 集群搭建 130主机 broker 配置文件 启动namesrv 启动broker Master 和 broker Slave 131主机 broker 配置文件 启动n ...

最新文章

  1. python3安装步骤mac-Mac 安装Python3
  2. Boost:排序的bimap双图的测试程序
  3. 从编码层面对比java和c#
  4. 如何用手机打开dcm格式图片_压缩图片用这个神奇吧(电脑版)
  5. 第十三章 对文本进行排序、单一和重复操作:sort命令、uniq命令
  6. IDEA autowired校验关闭方法
  7. mysql中select后接数字_MySQL SELECT用于从一组数字中排列一个数字
  8. 文字超出显示....省略显示
  9. Java/JSP中使用JDBC连接SQL Server 2005~(2008类似)
  10. TensorFlow by Google CNN分类真实图片 Machine Learning Foundations: Ep #5 - Classifying real-world images
  11. visio2016专业版2018最新密钥和下载方法 整理
  12. 你还在用Rational Rose画图吗?来来来给你介绍一款开源免费上手容易的 BOUML UML画图工具。
  13. 服务器lsass状态代码c0000005,提示lsass.exe失败状态代码c0000005
  14. java 机器学习库_7个最好的Java机器学习开发库
  15. 《一舞醉红尘,一笑歌岁月》
  16. ghost服务器系统镜像文件,带RAID服务器能GHOST备份吗?
  17. CentOS7安装Pure-ftpd
  18. 红黑数和普通的二叉排序树有什么要求
  19. Linux初探之如何查看帮助文档自学命令
  20. Chromedriver适用谷歌浏览器的各个版本

热门文章

  1. 图像识别 43个模型
  2. jsp获取连接池的实时连接数_PHP进阶教程-实现一个简单的MySQL连接池
  3. Leetcode 剑指 Offer 04. 二维数组中的查找 (每日一题 20210727)
  4. python应用实战系列-一文教你深入解读word2vec
  5. Linux从入门到精通系列之线性表链式存储结构-单链表原理解析
  6. MATLAB实战系列(四)- LabVIEW初探
  7. 深度学习核心技术精讲100篇(十二)-DCGAN(对抗生成网络)算法应用及代码实现
  8. ClickHouse 在字节跳动广告场景的应用
  9. 在Tableau中实现表格下钻
  10. Elasticsearch 冷热集群架构实战