前言

这将是RocketMQ实战系列的最后一篇文章,该系列的文章列表如下:

《RocketMQ实战(一)》

《RocketMQ实战(二)》

《RocketMQ实战(三):分布式事务》

RocketMQ 3.2.6的事务机制

在上一篇博客中,已经知道RocketMQ 3.0.8是支持事务回查机制,但是在RocketMQ 3.2.6中取消了这个功能,下面我们继续以转账功能分析我们自己如何解决这个问题。

在正常情况下,当然没有问题,如果第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会导致这条转账消息,在A银行完成了操作,但是迟迟对B银行系统不可见!

用户U1从A银行系统转账给B银行系统的用户U2的处理过程如下:

第一步:A银行系统生成一条转账消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息

第二步:写入MQ成功后,回调A银行系统,对T1,T2表进行操作(很显然需要是一个事务)

我们重点关注下T2表,这个表是用来干嘛的呢?每条转账消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)

第三步:完成第二步,接下来发送确认消息给MQ,如果这个确认消息发送成功,那么这条转账消息,将对B银行系统可见。然后B银行系统,会在一个事务中完成对t3,t5的操作。

如果发送确认消息给MQ失败的处理思路:

首先,B银行系统,有一个定时任务(比如说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转账数据。为什么要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转账数据。那么怎么发给A银行系统呢,这个方式比较多,可以考虑在来一个Topic,也可以考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。

这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的作用,在t4中记录一个time字段,每次定时任务启动,先更新time(比如设定为当前系统时间,设置前的的时间为old),然后扫描出t5中大于这个old时间的转账数据,如此循环往复。

其次,A银行系统,也有一个定时任务(可以根据业务消费能力定,可以大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转账消息找出来,更新updatetime并发送给MQ。

这样,我们并没有改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!

其实到这里,你可以发现RocketMQ的一个特点,就是将生产者和MQ绑定,而不需要特别处理消费者,这是为什么呢?因为消息只要发往RocketMQ成功,那么就意味着成功,为什么这么说?

前面,我们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ一定会让消息得到消费,如果消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!

Pull Or Push

在前面的博客已经提到,在RocketMQ中Consumer分为2类:Push Consumer、Pull Consumer。以前的例子都是Push Consumer,接下来,为大家介绍下Pull Consumer。

从表面意思上来看,好像Push是MQ推送给消费者,而Pull是消费者从MQ中拉取;其实本质上都是拉取模式PULL,即消费者从MQ中轮询取得消息。

在Push模式下,Consumer把轮询过程封装了,并注册了MessageListener监听器,取到消息后,唤醒MessageListener监听器中的consumeMessage()进行消费,所以给我们造成了感觉上好像是“推消息”。

在Pull模式下,需要特别注意的是,本质上是从一个Topic下的所有Queue进行拉取,而且每个Queue都必须记录拉取位置,否则会导致重复消费。还有拉取的时间间隔,拉取的大小等等。不过所有的这一切,MQPullConsumerScheduleService都替我们考虑清楚了,提供updateConsumeOffset去更新消费的队列的位置(默认5S同步一次),提供setPullNextDelayTimeMillis设置下次拉取的时间间隔(应该设置的大一些,至少大于5S)。

仔细回想下,对于Push方式的回调   和  Pull方式的回调,还有什么关键区别么?

对于Push而言,不论是基于MessageListenerConcurrently的,还是基于MessageListenerOrderly的,都有返回值的;而Pull的doPullTask的返回值却是void?

这意味,我们需要在pull方式中,注意自己处理每条消息消费的异常情况!

通过运行结果,可以印证上面的观点:为什么每次消费都是4条开始,4条结束呢?因为一个Topic下有4个Queue,而且上面的代码实际上会针对每个Queue开启一个线程去消费!

RocketMQ Filter组件介绍

对于ActiveMQ而言,我们可以通过JMS Selectors机制(就是类似于SQL的语法)来实现过滤,很easy。那么和RocketMQ Filter组件有什么区别呢?

虽然,2者都能实现过滤,但是RocketMQ Filter的性能要更高效些,因为RocketMQ是在broker上将过滤后的数据发往filter,然后消费者直接从filter上取得数据;而ActiveMQ是消费者直接在broker上进行过滤消费!(当然,对于RocketMQ而言,Tag机制已经足够应付日常绝大数的过滤功能,除非你的业务对性能有特别高的要求)

具体怎么做呢?这里我就不演示了,网上有很多例子,这里只说下大致的过程:

第一:broker-xxx.properties中指定filter个数

第二:上传一段JAVA代码,其实就是一个类

到这里,整个RocketMQ实战系列就结束呢,你学到了么,体会到RocketMQ的强大了么?

See u next blog!

本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1921757,如需转载请自行联系原作者

RocketMQ实战(四)相关推荐

  1. RocketMQ实战--大数据平台技术栈06

    回顾:大数据平台技术栈 (ps:可点击查看),今天就来说说其中的RocketMQ! 作者丨张丰哲 www.jianshu.com/p/3afd610a8f7d 阿里巴巴有2大核心的分布式技术,一个是O ...

  2. RocketMQ实战(一)

    阿里巴巴有2大核心的分布式技术,一个是OceanBase,另一个就是RocketMQ.在实际项目中已经领教过RocketMQ的强大,本人计划写一个RocketMQ实战系列,将涵盖RocketMQ的简介 ...

  3. RocketMQ 实战 集群监控平台搭建

    RocketMQ 实战 集群监控平台搭建 概述 RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-consol ...

  4. RocketMQ实战与原理---安装、部署及简单应用

    RocketMQ实战与原理 第一章  安装.部署及简单应用 1. 安装 1.1 下载RocketMQ 2. 部署 2.1 部署nameserver 2.2 部署broker 2.3 新增Topic 3 ...

  5. [python opencv 计算机视觉零基础到实战] 四、了解色彩空间及其详解

    一.学习目标 了解什么是色彩空间 了解opencv中色彩空间的转换 目录 [python opencv 计算机视觉零基础到实战] 一.opencv的helloworld [[python opencv ...

  6. 计算机视觉的深度学习实战四:图像特征提取

    更多精彩内容请关注微信公众号:听潮庭. 计算机视觉的深度学习实战四:图像特征提取 综述: 颜色特征 量化颜色直方图.聚类颜色直方图 几何特征 Edge,Corner,Blob 基于关键点的特征描述子 ...

  7. 《OpenCv视觉之眼》Python图像处理十九:Opencv图像处理实战四之通过OpenCV进行人脸口罩模型训练并进行口罩检测

    本专栏主要介绍如果通过OpenCv-Python进行图像处理,通过原理理解OpenCv-Python的函数处理原型,在具体情况中,针对不同的图像进行不同等级的.不同方法的处理,以达到对图像进行去噪.锐 ...

  8. 《RocketMQ实战专栏》为什么是你学习RocketMQ的最佳资料

    <RocketMQ实战与原理>专栏简介 简介 RocketMQ业界主流的消息中间件之一,承载公司核心业务消息的流转.对RocketMQ核心原理的理解与最佳实践成了开发与运维同学的必备技能. ...

  9. RocketMQ实战与原理解析

    网站 更多书籍点击进入>> CiCi岛 下载 电子版仅供预览及学习交流使用,下载后请24小时内删除,支持正版,喜欢的请购买正版书籍 电子书下载(皮皮云盘-点击"普通下载" ...

最新文章

  1. SpringBoot Actuator监控【转】
  2. 我们的2008。。。。。。
  3. 文巾解题 面试题 03.06. 动物收容所
  4. python str模块
  5. 百度全面开放搜索流量,进击的智能小程序!
  6. MySQL存储过程(一)——存储过程理论基础
  7. xml属于php还是js,xml是什么
  8. Ubuntu下,curl 安装
  9. SAS Marketing Automation 6.3 User’s Guide 学习笔记
  10. 你大学生活最美好的时刻是?
  11. pre-trained模型的使用
  12. 用ps增加照片的气氛--镜头光晕
  13. Html img显示图片的三种方式
  14. Liunx实现超级签名详细攻略(一)超级签名简介
  15. 有没有计算机网课,有没有电脑录制视频工具可以录制网课?
  16. pytorch GradScale 梯度缩放算子
  17. GPU Memory Problems in PyTorch(显卡爆炸与利用率不足)
  18. AD20设计规则检查设置(DRC检查设置)
  19. LVDS、FPD-Link/GMSL、MIPI的区别
  20. Radis 使用详细教程

热门文章

  1. java中值传递机制
  2. JavaScript获取样式值的几种方法学习总结
  3. 35-03沉浸式状态栏例子
  4. 通过UltraISO来提取U盘启动盘的ISO镜像文件
  5. python爬虫:Multipart/form-data POST文件上传详解
  6. BNUOJ 6038 - Reaux! Sham! Beaux!(模拟)
  7. Struts2 单个文件上传/多文件上传
  8. CLR中的IL、CTS和CLS总结
  9. How do use my library
  10. androidstudio 日历视图怎么显示农历_记事日历-记事与时间管理工具