在进行项目开发的过程中,需要进行阶段性的项目进行整理和汇总;

在进行MQ的使用的时候,使用RocketMq进行相关的开发的过程中,需要对MQ的用法进行整体的总结和相关的操作。MQ 顾明司仪就是消息队列,整个消息队列在项目的耦合过程中,整个RocketMq和RabbitMq还是有本质的差距的,在进行相关的MQ对比的时候其实之前一直对RocketMq了解的不是很清楚,在见相关的使用的时候,一直认为这两个MQ的差距不是很大,其实差距是非常明显的,就整个MQ的使用规范来说,RocketMq是没有交换机的概念的

MQ 基础概念

消息模型(Message Model) RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息, Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。 ConsumerGroup 由多个Consumer 实例构成。

消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异 步方式均需要Broker返回确认信息,单向发送不需要。 生产者中,会把同一类Producer组成一个集合,叫做生产者组,这类Producer发送同一类消息且发送 逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组 的其他生产者实例以提交或回溯消费。

消息消费者(Consumer) 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提 供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。 拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控 制。一旦获取了批量消息,应用就会启动消费过程。 推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。 消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息 且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的 是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费 (Clustering)和广播消费(Broadcasting)。 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

主题(Topic) 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息 订阅的基本单位。 同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。 MessageQueue是生产者发送消息与消费者消费消息的最小单位。

代理服务器(Broker Server) 消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的 消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、 消费进度偏移和主题和队列消息等。 Broker Server是RocketMQ真正的业务核心,

包含了多个重要的子模块: Remoting Module:整个Broker的实体,负责处理来自clients端的请求。

Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息 Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快 速查询。 而Broker Server要保证高可用需要搭建主从集群架构。

RocketMQ中有两种Broker架构模式: 普通集群: 这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。 slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同 步和异步同步。 这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用 了。

Dledger高可用集群: Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个 节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。

Dledger技术做的事情:1、接管Broker的CommitLog消息存储 2、从集群中选举出master节点 3、完 成master节点往slave节点的消息同步。

生产者为什么也有组的概念,生产者就是发消息,搞一个组的概念,基本就是满足事务消息机制,生产者如果只有一个,事务消息的流程就断了,一个生产者挂了就没办法回查,生产者组的作用,有了组的概念,就保证了容错的几率

名字服务(Name Server) 名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信 息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找 各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。 这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会 有影响。当然,这里不考虑节点的负载情况。

消息(Message) 消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。 RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过 Message ID和Key查询消息的功能。 并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一 业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰 度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻 辑,实现更好的扩展性。

MQ的作用主要有以下三个方面:

异步 例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需要把快递放到菜鸟 驿站,就可以继续发其他快递去了。客户再按自己的时间安排去菜鸟驿站取快递。 作用:异步能提高系统的响应速度、吞吐量。快递员就是生产者,客户就是消费者,快递就是消息

解耦 例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编辑社,将文章翻译 成其他语言,这样就可以完成英语与其他语言的交流。

作用: 1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。 2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消 费,并且消费者的增加或者减少对生产者没有影响。

削峰 例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。引入三峡大坝后, 可以把水储存起来,下游慢慢排水。 作用:以稳定的系统资源应对突发的流量冲击。

MQ的优缺点

上面MQ的所用也就是使用MQ的优点。

但是引入MQ也是有他的缺点的:

系统可用性降低 系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑 如何保证MQ的高可用。

系统复杂度提高 引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异 步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不 会被重复调用?怎么保证消息的顺序性等问题。

消息一致性问题 A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处 理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。

几大MQ产品特点比较

RocketMq的架构布局特点:

RocketMq的集群架构

1、Broke配置

2、客户端的公共配置

3、Produce配置

4、PushConsumer配置

5、PullConsumer配置

6、 Message数据结构

在RocketMq中的NameServrClustr 这个就类似于Zookeeper,Broker是实际处理业务的核心组件,实际处理消息的节点,在进行对应的列表的订阅的时候会从对应的NameServr中取拿取对应的列表,然后再去和Broker进行交互;

RocketMq的编程模型

RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,

掌握这个固定的步骤

消息发送者的固定步骤

1.创建消息生产者producer,并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象,指定主题Topic、Tag和消息体

5.发送消息

6.关闭生产者producer

消息消费者的固定步骤

1.创建消费者Consumer,制定消费者组名

2.指定Nameserver地址

3.订阅主题Topic和Tag

4.设置回调函数,处理消息

5.启动消费者consumer

消息的发送方式

基本样例

基本样例部分我们使用消息生产者分别通过三种方式发送消息

,同步发送、异步发送以及单向发送。 然后使用消费者来消费这些消息。

单向发送消息是没有消息的返回值和状态的,主要是用的API是sendOneWay();

同步发送,是这生产者往MQ发送消息,消息有没有发送到MQ,生产者二回进行重试;调用的是Send方法,会返回一个SendResult()的方法状态

  DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//        producer.setNamesrvAddr("192.168.232.128:9876");producer.start();for (int i = 0; i < 20; i++)try {{Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));//同步传递消息,消息会发给集群中的一个Broker节点。
//                    SendResult sendResult = producer.send(msg);
//                    System.out.printf("%s%n", sendResult);producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}

异步发送,生产者,往Mq发送消息之后,后面会继续处理自己的业务,但是会给MQ 一个回调函数,就是MQ 执行完之后,自己主动去回去调用生产者的回调函数,什么时候执行这个回调,生产者就不处理和不控制了

//简单样例:异步发送消息
public class AsyncProducer {public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
//        producer.setNamesrvAddr("192.168.232.128:9876");producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;//由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});System.out.println("消息发送完成");} catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}

通常情况下,在进行客户端和服务端进行划分的时候,生产者往往是作为一个客户端进行处理的,而Broker是作为的是一个服务端进行处理的,由客户端往服务端发起一个请求,RocketMq同样的也需要往生产者进行请求,一般的回调就是Broker往生产者进行请求,Broker服务正常才可以进行回调,双向交互,没有固定位置,生产者既可以做客户端也可以做服务端,而Broker有可以做服务端或者客户端

消息的发送方式

=========================================================================

界面Tag分析

通过分析不同的brokerr中的集群就可以看到相关的Broker中的内容

在使用MQ进行发送的是Topic就是我们要发送的目标,内容就是hi对应的Body里面的消息体里面的内容,tags,key其实就是针对消息分类的信息,这个可以这样子进行下简单的理解,在进行相关的MQ的消息的区分的时候,需要进行相关的Topic,tag,keys进行区分,只有这种三级区分才可以进行下一步的进行划分操作,配置选项中 可以将 autocreatTopic自动创建Topic;同步发送和异步发送,SendOneWay是同步发送的是没有返回结果的,send 方法会返回一个sendResult,会返回一个结果,会判断当前的消息是否会发送成功过;

Send_Ok发送成功,同步发送,会等待Broker给的一个状态,在进行异步发送的时候,producer在进行执行produce.send(msg,new SendCallBack)会执行一个消息回调的方法,会提供一个SeendCallBack,CountDownLaunch,的作用,就是上面的回调没执行一次就会进行一次减1操作,这个就是你的所有的CountDownLaunch全部执行完了,这个CountDownLaunch的Produc就可以进行关闭了

同步发送,只是消息发送到Broker,至于消费不消费,生产者是不管的

消费者主动去Broker上拉取消息,Brokere主动推送给消费者,

=========================================================================

消息的接受方式

使用消费者消费消息。

消费者消费消息有两种模式,

一种是消费者主动去Broker上拉取消息的拉模式,

另一种是消费者等待 Broker把消息推送过来的推模式。

拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer

拉模式下,消费者主动去Broker主动拉取,RocketMq中没有交换机的概念,队列是消息的最小概念,在进行相关的消息的存储的时候直接就是放在了队列的里面,没有消息的交换机,为什么要设置offset,要把消息存起来,存起来offset,存到队列里面,才可也进行后续的消费和处理工作,


public class PullConsumer {private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("192.168.232.128:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");for (MessageQueue mq : mqs) {System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {try {PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}} catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = OFFSE_TABLE.get(mq);if (offset != null)return offset;return 0;}private static void putMessageQueueOffset(MessageQueue mq, long offset) {OFFSE_TABLE.put(mq, offset);}}

推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer

推模式,Broker往消费者去推送消息,注册消息监听,有Broker主动往这个方法来推,里面consumerMessaee方法中推去,消费者执行完,之后返回给Brokere一个状态,

public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
//        consumer.setNamesrvAddr("192.168.232.128:9876");consumer.subscribe("TopicTest", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//wrong time format 2017_0422_221800consumer.setConsumeTimestamp("20181109221800");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

推模式下的,消费者代码,这个主要是Consumer配置一个监听,消费者进行监听Broker里面的内容,当有消息来的时候就进行相关的监听的操作Broker将消息推给Consumer,Consumer没有放Broker上发送请求,

拉模式,是指Cosumer去对应的Broker上主动去拉取对应的信息,

从图上可知这个Topic是一个一个不同的队列,而不同的队列又是不部署在不同的Broker上,MessageQueue是订阅发布的最小单位,单位是分布在不同的Broker,

去遍历不同的MessagQueue,进行过滤,遍历完之后就可以拿到对应的Queues上的内容,拉消息,一次拉多消息,一次拉32条,offset.MessageQueue,记录一个偏移量,当队列中有很多消息的时候,上次从1开始进行记录的消费,下一次就不用重新开始消费了,从消费的历史消费节点往后的偏移量位置开始消费,offset就是记录消费的位置,

 =========================================================================

顺序消息

顺序消息的10个订单,都要进行指定顺序的消费,启动一个消费者进行顺序的消费,进行相关的消息的消费,启动两个消费者的实例,进行相关的消息的消费,可以看到对应的结果,两个消费诶者的消费清,顺序消息里面消费的取模运输,ListMessagQueuee,这个是当前所有消息的MessageeQueue所有的消息的消费队列,Msg代表当前的消息,这个select方法指的就是给这条消息找一个目标的MessageQueue,arg,就是传的参数,orderId,穿进去,取模,order为5,肯定在一个队列里面,在一个MQ里面,相同的OrderId,放在一个队列,消息是分散在不同的broker中的,MessageQueue,是存储消息的最小单位,顺序消息就是横着拿,并发的消息就是竖着拿,保证局部有序,不保证全局有序;需要注意的是10个订单是分散到两台机器上的

MESSAGEQUE,的选择器,选择将消息传到那个对应的 队列中,保证orderID的消息全部存储到同一个队列中,MessageQueue

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//            producer.setNamesrvAddr("192.168.232.128:9876");producer.start();for (int i = 0; i < 10; i++) {int orderId = i;for(int j = 0 ; j <= 5 ; j ++){Message msg =new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}

,RocketMq 的顺序消费只能保证局部有序,不能保证整体有序,例如聊天窗口里面的内容都是局部有序,根据什么来排序,发送消息有没有什么不同,业务参数,ordrderID,要指定同一个order执行指定的消费的顺序,在进行执行的时候需要指定的MessageQueueSelecter进行指定的顺序的安排和指定顺序的发展的方式,在对应的select方法里面取进行操作的步骤都有哪些,取到对应的orderId,然后再和对应的mqs.size()进行相关的取模操作,就可以拿到对应的索引的位置,就保证了同条orderID的消息保证存储到了同一条队列里面

orderId,算出来对应的orderId会放到同一个队列上去,如何保证从一个队列中一个队列中拿,消费者端注册一个MessageListnerOrderly的监听器就可以了,消费者端就可以帮你做这种事情,而另外一种MessageCurrntly这个就是从不同队列中拿的就是乱拿的,顺序有序需要生产者和消费者进行配合才可以进行有序的处理,包装全局有序,就保证一个topic里面就一个队列就可以了,顺序消费就是Rocket里面一个非常好用的场景。

Topic下面有很多MessagQueue,他做了那些事情呢,他吧下面所有Order相同的消息,有很多不同的order_Id的消息,监听如何保证顺序,到消费者端,还是按顺序来,不一定保证顺序有序,一次一次拿,指定一个队列一个队列,先把一个messagQueeue中的消息拿完,在哪一下一个messageQueue中的消息保证消费者端消息不会乱的,这样就保证局部有序了,一个队列一个队列来取,

所有的MQ 都是只能保证局部有序,局部有序和全局有序,保证去取消息的时候,一个队列一个队列的方式取去

顺序有序的去监听

 public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("192.168.232.128:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("OrderTopicTest", "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for(MessageExt msg:msgs){System.out.println("收到消息内容 "+new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//        这样是保证不了最终消费顺序的。
//        consumer.registerMessageListener(new MessageListenerConcurrently() {
//            @Override
//            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//                for(MessageExt msg:msgs){
//                    System.out.println("收到消息内容 "+new String(msg.getBody()));
//                }
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            }
//        });consumer.start();System.out.printf("Consumer Started.%n");}}

下面的Concurreently就不是顺序消息监听的,

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});

广播消息

所有的 消费者都消费到,不管是不是集群,所有的Consumer之间共享,而不是值往对应的组里面共享


public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}

延时消息

延时消息不需要设置延时的工具,只需要设置对应的延时等级,延时级别就可以了

public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new
DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled
message " + i).getBytes());
// This message will be delivered to consumer 10 seconds
later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}

延时级别指的是,生产者要隔多久才能到消费者哪里进行消费,商业版可也任意定制时间,

在配置对应的Topic下面有很多主题,其中有一个系统主题

延时消息的消息队列等级

批量消息

批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞 吐量。 批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和 org.apache.rocketmq.example.batch.SplitBatchProducer

相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次 发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB 实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但 是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限 制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息 等。

  public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");producer.start();//If you just send messages of no more than 1MiB at a time, it is easy to use batch//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule supportString topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));producer.send(messages);producer.shutdown();}

过滤消息:

在进行消息的对应的过滤的时候,需要进行相关的消息的,给每个Message进行过滤,subscribe进行过滤,

在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。

使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer

使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer

主要是看消息消费者。consumer.subscribe("TagFilterTest", "TagA || TagC"); 这句只订阅TagA 和TagC的消息。 TAG是RocketMQ中特有的一个消息属性。

RocketMQ的最佳实践中就建议,使用RocketMQ时, 一个应用可以就用一个Topic,

而应用中的不同业务就用TAG来区分。保证过滤的效率,

但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不 足了。 这时候,可以使用SQL表达式来对消息进行过滤。

事务消息:

针对RocketMq 的,只跟发送者,就是生产者有关,专有的事务生产者,指定一个事务监听器,

RocketMq 如果出现分布式事务的问题,事务消费机制,一个消息分布式的问题,事务消息的大概理解,事务消息保证了消息中间件一半的理解,分布式中的消息轨迹,开关打开完之后,消息轨迹,声明生产者的时候,消息轨迹的部分需要跟踪对应的消息,全链路监控,MQ,进行数据的监控,

SpringBoot整合MQ

在进行整合,都是由一个RocektMqTemplate进行发送的和监听的,指定对应的SpringBootConsumer消费者的信息,整合的消费者的注解有一个RocketMqMessagListener

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}
}

SpringBoot集成RocketMq的精髓进行后续生产者进行开发的过程中,需要进行的

@Component
public class SpringProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);}public void sendMessageInTransaction(String topic,String msg) throws InterruptedException {String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {//尝试在Header中加入一些自定义的属性。Message<String> message = MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i)//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。.setHeader(RocketMQHeaders.TAGS,tags[i % tags.length])//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。.setHeader("MyProp","MyProp_"+i).build();String destination =topic+":"+tags[i % tags.length];//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);System.out.printf("%s%n", sendResult);Thread.sleep(10);}}
}

在进行发送的时候会有一个conveerSendMessage,进行转换的时候,需要进行转换

总结:

SpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置 的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。 SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,

这在 使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依 赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指 定。 最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于 RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。

SpringCloudStream整合RocketMQ!!!!

SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模 型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(ScRocketMQApplication.class,args);
}
}

注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的 Binder配置。

进行配置之后的设置代码量更少,在进行设置的时候,MQTestController的设置

@RestController
@RequestMapping("/MQTest")
public class MQTestController {@Resourceprivate ScProducer producer;@RequestMapping("/sendMessage")public String sendMessage(String message){producer.sendMessage(message);return "消息发送完成";}
}

消费者的信息绑定Consumer进行相关的配置更加简单

@Component
public class ScConsumer {@StreamListener(Sink.INPUT)public void onMessage(String messsage) {System.out.println("received message:" + messsage + " from binding:" +Sink.INPUT);}
}

生产者的信息配置设置Producer

@Component
public class ScProducer {@Resourceprivate Source source;public void sendMessage(String msg){Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_TAGS, "testTag");MessageHeaders messageHeaders = new MessageHeaders(headers);Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);this.source.output().send(message);}
}

这个source.output,之前前面都是组建一个消息,sourc是一个接口,什么都没有,比SpringBoot集成的方式更简单,

启动类的配置Application

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ScRocketMQApplication {public static void main(String[] args) {SpringApplication.run(ScRocketMQApplication.class,args);}
}

对应的source.class,Sink.class里面的内容进行查看可以看出来分别是一个input,output

public interface Sink {String INPUT = "input";@Input("input")SubscribableChannel input();
}public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}public interface Processor extends Source, Sink {
}public class DirectWithAttributesChannel extends DirectChannel {private final Map<String, Object> attributes = new HashMap();public DirectWithAttributesChannel() {}public void setAttribute(String key, Object value) {this.attributes.put(key, value);}public Object getAttribute(String key) {return this.attributes.get(key);}public String getBeanName() {return this.getComponentName();}public boolean subscribe(MessageHandler handler) {return this.getDispatcher().getHandlerCount() == 1 ? false : super.subscribe(handler);}
}

这个框架牛逼的地方如果需要更换对应的消息队列的配置,只需要进行修改对应的pom依赖,在进行相关的配置的修改,不需要改任何的业务代码,spring框架维护的,

列如换成你想用的依赖,

SpringCloudStream,把MQ抽象成了一个非常简单的模型,一个Binder就代表了与消息中间件产品的链接,Bindings,代表了与消息中间件沟通的通道,通过这个链接可以收消息和发消息,而所谓的Message,就是对应的Producee和Consumer就是看成了所谓的产品

Binders就是产品,Bindings就是通道,Meessag就是消息;

inputchannel通道代表的是收消息,outputchannel通道代表的是发消息

SpringCloudStream才是主流的配置,集中的处理业务

总结 关于SpringCloudStream。

这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到 对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于 各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务 模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。

例如RocketMQ的个性化 属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消 息、排序消息、事务消息等个性化功能。 SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴 自己来维护。

这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时 着重强调的版本问题,在使用SpringCloudStream中被放大了很多。

spring-cloud-starterstream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个 差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重, SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。 总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方 面跟kafka和Rabbit还没法比。所以使用时要慎重。

项目总结---05(RocketMq使用对比)相关推荐

  1. 朱大能求职之旅-为什么使用消息队列?消息队列有什么优缺点?Kafka/Activemq/Rabbitmq/Rocketmq优缺点对比?

    1. 面试题 为什么使用消息队列啊?消息队列有什么优点和缺点啊?kafka.activemq.rabbitmq.rocketmq都有什么区别以及适合哪些场景? 2. 面试官心理分析 其实面试官主要是想 ...

  2. Kafka与RocketMQ的对比分析

    本文来说下Kafka与RocketMQ的对比分析 文章目录 概述 概述

  3. 项目开发周期与数据库设计对比

    项目开发周期与数据库设计对比 项目开发周期 数据库设计 需求分析 分析客户的业务和数据处理需求. 1.         收集信息(不怕多,但怕漏): 2.         标识对象: 3.       ...

  4. java ssm实现修改密码,SSM项目store_SSM_v1 05修改密码

    SSM项目store_SSM_v1 05修改密码 文章目录 1 点击修改密码跳转 1.1 前端加入注解/user/updatePwd 修改密码 1.2 跳转到密码修改页面 UserController ...

  5. 产品化软件开发与项目化软件开发的对比

    对比指标 产品化特点 项目化特点  主要定位  为产品使用目标群体提供有价值的服务,致力于提升产品的价值和服务水平 提供达到客户预期目标的整套解决方案,以客户的用户为中心,以项目客户的利益为核心  服 ...

  6. Scrum 项目 4.0-5.0-约教网站开发(一)

    ----------------------------------4.0----------------------------------------------- 一.项目任务 1.准备看板. ...

  7. 【SSH网上商城项目实战05】完成数据库的级联查询和分页

    上一节我们完成了EasyUI菜单的实现.这一节我们主要来写一下CategoryServiceImpl实现类,完成数据库的级联查询.一般项目从后往前做,先做service(我们没有抽取Dao,最后再抽取 ...

  8. java项目开发的工具选型对比,这10条建议你一定要关注!

    现在越来越多的项目要用报表工具,但国内市场上报表工具鱼龙混杂,很难只从一个方面就分出高低优劣,必须多方面比较. 在报表选型这方面我也吃过开源工具和第三方的亏,特意总结了十点经验,你可以参考下. 建议一 ...

  9. 集成底座项目典型数据下发方式对比说明

    随着企业信息化的不断发展.不断升级,越来越多的业务系统在满足企业业务发展的同时,往往又会成为信息化建设和业务操作上的瓶颈,无论是频繁进行业务系统切换,还是跨系统之间的基础数据的维护与打通,都难以应对企 ...

最新文章

  1. itoa函数的递归实现(二级指针实现)
  2. ARP欺骗原理与模拟
  3. 计算机视觉,基于skimage对图像阈值分割的学习
  4. CodeForces 572A,B,C
  5. JS的typeof力所能及已经力所不及
  6. python现在时间减去过去时间等于20分钟怎么写_获取当前时间减去10分钟的话SQL语句怎么写...
  7. icache的方面以及使用
  8. linux redis图形界面,linux安装redis和windows安装可视化工具
  9. mysql分层_MySql中的分层数据
  10. Redis实战(七)
  11. matlab晶闸管整流电路,整流电路MATLAB仿真实验
  12. 富士施乐m115b怎么连接电脑_富士施乐 Fuji Xerox DocuPrint M118w/M118z打印机无线连接设置详解...
  13. 从0到1搭建大数据平台之数据采集篇
  14. 新学两个新汇编指令:bic和orr
  15. 使用磁性霍尔传感器实现门锁报警
  16. MS-DOS系统的操作命令
  17. 软件测试顶岗实习实习总结
  18. 客户体验改善计划的用户注销通知导致服务器自动重启
  19. spring boot 引用 shiro 认证AD域
  20. 华为鸿蒙dba,人生中最重要的决策|读在职博士DBA

热门文章

  1. ZED-F9K使用:录制数据+地图查看
  2. 成人高考可以报计算机专业吗,没有基础可以报成人高考计算机专业吗
  3. 在Ubuntu系统上安装Windows 10(真实有效)
  4. 笔记31-JDBC连接池JDBCTemplate
  5. mysql数据库用户建立_Mysql数据库之创建用户
  6. WordPress主题-B2美化通用子主题商业运营版
  7. 杭州市民卡面试题【杭州多测师】【杭州多测师_王sir】
  8. 8.7 python 日学 线程进阶、协程
  9. 软件著作权人享有的权利有哪些
  10. 不能转正的开发实习,要不要去?