一.并发消费使用示例:

public class BalanceComuser {public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");// 指定Namesrv地址信息.consumer.setNamesrvAddr("127.0.0.1:9876");// 订阅Topicconsumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {for(MessageExt msg : msgs) {String topic = msg.getTopic();String msgBody = new String(msg.getBody(), "utf-8");String tags = msg.getTags();System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消息者consumer.start();System.out.printf("Consumer Started.%n");}
}

简单回顾一下,上面就是我们开发者使用消费者去消费RocketMQ中的消息的一个简单例子,而我们重点的业务逻辑通常就是写在消费回调函数中

二.源码分析并发消费服务ConsumeMessageConcurrentlyService

如果我们是使用MessageListenerConcurrently这个消费监听回调的话,那么在消费者启动的时候,内部会对应创建一个消费服务,这个消费服务就是ConsumeMessageConcurrentlyService,下面我们来看下这个并发消费服务的源码

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

该方法就是拉取消息服务拉取到消息之后,会把消息提交到并发消费服务,此时就会调用到这个方法,其中参数一就是待消费的消息,参数二是拉取mq对应的ProcessQueue,参数三是拉取的mq对象,参数四顺序消费才会用到

/*** 提交消费请求任务给消费线程池* @param msgs* @param processQueue* @param messageQueue* @param dispatchToConsume*/
@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {// 并发消费线程池中每一个线程最多消费多少条消息,默认1条final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 如果拉取到的消息数量 <= 每一个线程最多消费的消息数量,那么直接就创建一个消费请求任务放到并发消费线程池中执行就可以了if (msgs.size() <= consumeBatchSize) {// 创建消费请求任务ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 把消费请求任务放到并发消费线程池中this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}}// 否则如果拉取到的消息数量 > 每一个线程最多消费的消息数量,那么对待消费的消息进行分批放入到并发消费线程池中else {for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}
}

可以看到首先会将待消费的消息按照consumeMessageBatchMaxSize的大小去进行切分成数量相同的消息集合,然后对每一个集合都创建一个消费请求任务,接着就把消费任务放到并发消费线程池中

我们来看下这个消费请求任务

/*** 消费请求任务*/
class ConsumeRequest implements Runnable {/*** 要消费的消息集合*/private final List<MessageExt> msgs;/*** 消费消息的队列快照*/private final ProcessQueue processQueue;/*** 消费消息所属的mq*/private final MessageQueue messageQueue;public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {this.msgs = msgs;this.processQueue = processQueue;this.messageQueue = messageQueue;}public List<MessageExt> getMsgs() {return msgs;}public ProcessQueue getProcessQueue() {return processQueue;}@Overridepublic void run() {// 如果队列被dropped,那么直接返回,不再进行消费if (this.processQueue.isDropped()) {log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());ConsumeMessageContext consumeMessageContext = null;// 执行hookif (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {// 对每一个消息设置 一个开始消费时间MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}// 消费回调方法,执行具体消费业务逻辑...status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);// 消费回调抛出异常,hasException = truehasException = true;}// 计算消费的消耗时间long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}}// 如果一批消息的消费时间 > 15分钟,则returnType == ConsumeReturnType.TIME_OUTelse if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;}// 如果回调接口返回的状态是RECONSUME_LATER,那么returnType = ConsumeReturnType.FAILEDelse if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {returnType = ConsumeReturnType.FAILED;}// 如果回调接口返回的状态是CONSUME_SUCCESS,那么returnType = ConsumeReturnType.SUCCESSelse if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}// 如果回调接口返回null,或者回调方法抛异常了,那么status = ConsumeConcurrentlyStatus.RECONSUME_LATERif (null == status) {log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);if (!processQueue.isDropped()) {// 处理消费结果ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}public MessageQueue getMessageQueue() {return messageQueue;}}

可以看到这个消费请求任务就是一个runnable对象,所以原理就是通过将待消费的一批消息切分成数量相同的多批消息,然后切分后的每一批消息都会创建一个消费请求任务让并发消费线程池去进行消费,而并发消费线程池是如何消费这一批批的消息的呢,我们来看它的run方法:

1.判断消费的mq是否已经被dropped了,如果已经被dropped,那么就return,停止消费

2.创建消费上下文对象,执行消费前置钩子方法,并且填充一部分信息到ConsumeMessageContext中

3.对每一个待消费的消息设置一个开始消费时间,然后执行监听消息的回调方法,也就是用户自己的业务逻辑

4.根据监听消息回调方法的返回值去得到returnType的值,并且执行消费后置钩子方法,继续填充ConsumeMessageContext对象

5.执行processConsumeResult方法进行消费结果的处理

上面的步骤比较重要的就是第5点对消费结果的处理,所以直接来到processConsumeResult方法

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest
) {// 默认是Integer.MAX_VALUE,该值需要配合设置批量消费去使用int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;switch (status) {// 消费成功case CONSUME_SUCCESS:// 对于批量消费,如果用户设置的ackIndex大于批量消费消息数,那么ackIndex = 消费数 - 1if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;// 统计消费成功的消息数量this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);// 统计消费失败的消息数量this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;// 消费失败,ackIndex = -1case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());// 这里有两种情况:// 1.在回调方法中返回了RECONSUME_LATER,表示此次消费失败,那么无论是单条消费还是批量消费都会对所有的消息进行回退// 2.在回调方法中返回了CONSUME_SUCCESS,表示此次消费成功,那么对于单条消费来说是不会对这条消息进行回退的,//   但是如果是批量消费,并且指定了ackIndex,就算是返回了CONSUME_SUCCESS,也会对索引的消息进行回退// 举个例子,如果用户设置了批量消费 3 条数据,回调方法的返回值是CONSUME_SUCCESS,ackIndex = 0// 第一次遍历 i = 0 + 1 = 1, 1 < 3?,条件成立,所以索引为0的消息就需要进行消息回退,i++// 第二次遍历 i = 1 + 1 = 2, 2 < 3?,条件成立,所以索引为1的消息就需要进行消息回退,i++// 第三次遍历 i = 2 + 1 = 3, 3 < 3?,条件不成立,所以索引为2的消息不需要进行消息回退,跳出循环// 也就是说对于批量消费,ackIndex的意思就是该索引本身及(从0开始)之后的消息都需要进行消息回退for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 向broker发起消息回退请求// 什么是消息回退?当消费失败之后,消费者会重新向broker发送一个延迟消息,当该消息到达到期时间的时候就又会被消费者所重新消费,到达了消费重试的目的boolean result = this.sendMessageBack(msg, context);// 请求失败,重试if (!result) {// 消息重消费次数 + 1msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);// 消息加入到响应失败集合msgBackFailed.add(msg);}}// 把响应失败的消息从consumeRequest中移除if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 把响应失败的消息延迟5s后重新放到消费服务线程中进行再次消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 把被真正消费成功的消息从msgTreeMap中移除,怎样才算真正的消费成功? 这里消费成功的 或者消费失败但是消息回退成功都算是真正的消费成功// 如果移除完msg之后msgTreeMap已经没有数据了,那么返回offset就等于当前ProcessQueue最大偏移量 + 1, 反之返回的offset就等于当前ProcessQueue最小偏移量long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新本地内存中该mq的已消费偏移量this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
}

首先就是会根据用户在监听消费回调方法的返回值去进行不同的逻辑判断,这里用户能够返回的枚举只有两个,一个是CONSUME_SUCCESS表示消费成功,另一个是RECONSUME_LATER表示消费失败,可能需要重消费。我们先来看CONSUME_SUCCESS的处理

case CONSUME_SUCCESS:// 对于批量消费,如果用户设置的ackIndex大于批量消费消息数,那么ackIndex = 消费数 - 1if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;// 统计消费成功的消息数量this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);// 统计消费失败的消息数量this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;

如果消费成功,那么ackIndex就等于待消费的msg长度 -1(默认ackIndex == Integer.MAX_VALUE),而这个ackIndex有什么用呢?我们跳到下面的代码

for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 向broker发起消息回退请求// 什么是消息回退?当消费失败之后,消费者会重新向broker发送一个延迟消息,当该消息到达到期时间的时候就又会被消费者所重新消费,到达了消费重试的目的boolean result = this.sendMessageBack(msg, context);// 请求失败,重试if (!result) {// 消息重消费次数 + 1msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);// 消息加入到响应失败集合msgBackFailed.add(msg);}
}// 把响应失败的消息从consumeRequest中移除
if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 把响应失败的消息延迟5s后重新放到消费服务线程中进行再次消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

可以看到如果ackIndex = msg的长度 - 1,那么这个for循环就不会进来了,同样的我们看RECONSUME_LATER的处理

case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;

当消费回调接口返回RECONSUME_LATER的时候,ackIndex直接就等于-1了,而当ackIndex == -1的时候,上面的for循环就能把每一个待消费的msg都遍历一遍,那么这里for循环中都对每一个msg做什么呢?答案就是去做消息的回退,也就是消费者会把消费失败的消息当做一个延迟消息发送给broker,之后当延时时间结束之后消费者就能够从broker中再次拿到这个消息进行重消费,所以这就是为什么返回值叫RECONSUME_LATER,而不是CONSUME_FAIL了吧?因为消费失败之后,消费者是不会就此罢休的,而是通过延迟消息的方式去再次对消息进行重消费。这里我们先不去看消息回退的如何实现的,后面再说~,紧接着就是需要去更新消费进度了

// 把被真正消费成功的消息从msgTreeMap中移除,怎样才算真正的消费成功? 这里消费成功的 或者消费失败但是消息回退成功都算是真正的消费成功
// 如果移除完msg之后msgTreeMap已经没有数据了,那么返回offset就等于当前ProcessQueue最大偏移量 + 1, 反之返回的offset就等于当前ProcessQueue最小偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新本地内存中该mq的已消费偏移量this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

先从ProcessQueue中移除掉已经消费的消息,这里需要注意的是,消息回退失败的那些消息是不会被移除的,只有消费成功的,或者消费重试并且消息回退成功的消息才能从ProcessQueue中被移除,我们看一下移除的逻辑org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage

/*** 从msgTreeMap中移除消息* @param msgs  要移除的msg* @return  如果移除完msg之后msgTreeMap已经没有数据了,那么返回offset就等于当前ProcessQueue最大偏移量 + 1, 反之返回的offset就等于当前ProcessQueue最小偏移量*/
public long removeMessage(final List<MessageExt> msgs) {long result = -1;final long now = System.currentTimeMillis();try {// 加写锁this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {if (!msgTreeMap.isEmpty()) {// 如果移除完msg之后,msgTreeMap已经为空了,那么就返回ProcessQueue中最大偏移量 + 1result = this.queueOffsetMax + 1;int removedCnt = 0;// 把消息从msgTreeMap中删除,并且更新msgSize,msgCountfor (MessageExt msg : msgs) {MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());if (prev != null) {removedCnt--;msgSize.addAndGet(0 - msg.getBody().length);}}msgCount.addAndGet(removedCnt);// 如果移除完了msg之后,msgTreeMap不为空,那么就返回ProcessQueue中最小偏移量if (!msgTreeMap.isEmpty()) {result = msgTreeMap.firstKey();}}} finally {this.lockTreeMap.writeLock().unlock();}} catch (Throwable t) {log.error("removeMessage exception", t);}return result;
}

可以看到当移除了这部分消息之后,如果ProcessQueue中已经没有消息了,那么就返回ProcessQueue中最大偏移量 + 1,反之如果processQueue还有消息,就返回ProcessQueue最小偏移量

if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新本地内存中该mq的已消费偏移量this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

拿到要更新的消费进度之后,就通过消费进度组件去进行更新了,这里我们只针对集群模式来说,在集群模式下消费者本地先会保存消费进度,然后消费者启动的时候会启动一个定时任务去把本地内存中保存的消费进度发送到broker端去进行持久化,而根据上面返回的消费进度逻辑来看,是有可能会造成消费的重复消费的,因为消费的时候是线程池多线程去消费的,也就是一堆顺序的消息在经过多线程去执行消费的时候,有可能是会造成后面的消息先消费完然后就去更新消费进度,此时ProcessQueue还有着前面的消息(还没消费完),那么就会返回前面第一条消息的偏移量并且还会持久化到broker中,所以当有该消费者组的其他消费者实例分配到该mq去消费的时候,从broker端拿到的该mq已消费偏移量就是之前正在消费的消息了,也就是造成了重复消费。

如何提高并发消费的速度?

可以增加并发消费线程池的线程数量,修改consumeThreadMin以及consumeThreadMax;默认并发消费时每一个线程每次只会消费一个消息,所以我们也可以修改参数使得每一个线程每一次消费一批消息,通过修改consumeMessageBatchMaxSize,并且使用了批量消费之后,还可以通过context.setAckIndex()方法去指定这批消息中哪些消息是消费失败需要被重试。

RocketMQ之消费者并发消费源码解析相关推荐

  1. java 并发框架源码_Java并发编程高阶技术-高性能并发框架源码解析与实战

    Java并发编程高阶技术-高性能并发框架源码解析与实战 1 _0 Z' @+ l: s3 f6 r% t|____资料3 Z9 P- I2 x8 T6 ^ |____coding-275-master ...

  2. 跟我学RocketMQ之批量消息发送源码解析

    上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送.本文中,我们就一起来集中分析一下批量消息的发送是怎样的 ...

  3. java 并发框架源码_某网Java并发编程高阶技术-高性能并发框架源码解析与实战(云盘下载)...

    第1章 课程介绍(Java并发编程进阶课程) 什么是Disruptor?它一个高性能的异步处理框架,号称"单线程每秒可处理600W个订单"的神器,本课程目标:彻底精通一个如此优秀的 ...

  4. RocketMQ:消息ACK机制源码解析

    消息消费进度 概述 消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可. 消息模式分为两种: 集群 ...

  5. 使用信号灯法,标志位解决测试生产者消费者问题(源码解析、建议收藏)

    测试生产者消费者问题2-信号灯法,标志位解决(Java) package src.thread;//测试生产者消费者问题2:信号灯法,标志位解决 public class TestPC2 {publi ...

  6. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  7. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  8. Kafka生产者和消费者分区策略部分源码解析

    之前我在看其他的博客时,发现对于kafka consumer的RoundRobin的缺点分析中,有两种观点,一种认为缺点在于如果消费者组中消费者消费的主题不同,或者消费者线程数不同,那么会造成消费者消 ...

  9. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  10. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

最新文章

  1. Python:urllib2模块Handler处理器 和 自定义Opener
  2. java好过去前一天日期_Java-日期保存为前一天
  3. html分析python字典_从python字典到html-lis
  4. OpenStack Pike Minimal安装:三、镜像管理
  5. 【错误记录】VMware 虚拟机报错 ( VMWare 中的 Ubuntu 虚拟机网络设置 | 第一次网络设置 )
  6. 实时计算Flink——产品安全
  7. 笔记本电脑排行_笔记本电脑性价比排行2020
  8. 【收藏】k8s使用securityContext和sysctl
  9. delimited mysql_在MySQL中存儲逗號分隔的數據
  10. 编写第一个Linux环境下程序的编译,下载记录
  11. setState同步异步场景
  12. (剑指Offer)面试题49:把字符串转换为整数
  13. 微软想让所有人都成为开发者?
  14. android通过点击播放视频,Android通过MediaPlayer实现播放视频实例
  15. PPC软件字体太小的调整
  16. yaw公式_横摆角速度(Yaw Rate)估算(上)
  17. maven module 路径_解决maven项目中-Dmaven.multiModuleProjectDirectory报错问题
  18. 内部投资回报率IRR
  19. 原生JS实现登录框邮箱提示
  20. iOS从零开始,用Swift:iOS上的数据持久性和沙箱

热门文章

  1. 185.部门工资前三高的员工
  2. 矩阵论7,8,9作业
  3. linux桌面系统开启wifi,8089B开启Wifi的方法(默认的红旗linux系统)
  4. 个税倒推收入的计算器_手把手教你做个税计算器(1)
  5. python中popen阻塞怎么办_对Python subprocess.Popen子进程管道阻塞详解
  6. [译]直观理解信息论
  7. 主子式大于等于零的矩阵是半正定矩阵的证明方法之一
  8. P5018 对称二叉树
  9. Unity 用ml-agents机器学习造个游戏AI吧(1)(Windows环境配置)
  10. 预科阶段:快速实战入门