RocketMQ消费者中有一个consumeFromWhere属性,该属性从语义上来看就是说该消费者从队列的哪里开始消费,并且可以通过setConsumeFormWhere方法去进行设置,可设置的有三个值,分别是CONSUME_FROM_LAST_OFFSET,CONSUME_FROM_FIRST_OFFSET,CONSUME_FORM_TIMESTAMP,按照语义上来理解就是从队列的最后开始消费,从队列的初始位置开始消费,从消息指定的时间戳开始消费,但是事实上真的是这么简单吗,下面我们就来看一下这三个值是怎么实现的。

首先一个消费者开始消费之前,先要分配到自己订阅的主题对应的mq,这个过程就需要对所有的mq去进行负载均衡了,具体负载均衡的过程不是这篇文章的重点,我们直接来到计算开始消费位点的地方

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {// 条件成立: 说明这个mq是新分配的if (!this.processQueueTable.containsKey(mq)) {// 如果当前消费者实例是顺序消费,那么就会先对新分配到的mq进行broker端加锁,如果加锁不成功,直接跳过if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}// 内存中可能有该队列的一些脏数据,所以要把这些脏数据移除this.removeDirtyOffset(mq);// 创建该新分配的mq对应的队列快照对象ProcessQueue pq = new ProcessQueue();// 根据用户设置的ConsumeFromWhere去获取新分配的mq下一次起始消费偏移量,ConsumeFromWhere根据setConsumeFromWhere()方法进行设置long nextOffset = this.computePullFromWhere(mq);// 如果起始消费偏移量 >= 0, 就创建拉取消息的任务if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}}// 如果起始消费偏移量 < 0,就什么都不做,只打印个日志else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}
}
// 把新分配的队列快照分发给PullMessageService线程
this.dispatchPullRequest(pullRequestList);

上面的代码是updateProcessQueueTableInRebalance方法的一部分,updateProcessQueueTableInRebalance方法是从负载均衡中调用过来的,mqSet这个集合是负载均衡之后分配给当前消费者实例的最新mq集合,processQueueTable里面保存了当前消费者之前被分配过的mq以及对应的ProcessQueue。首先去遍历所有新分配过来的mq,每一个新分配的mq与processQueueTable中的mq进行对比,如果发现processQueueTable中不存在这个mq,则表示这个mq是新分配过来的,此时就需要去创建该mq对应的ProcessQueue,然后放到processQueueTable中,这里注意有一行代码long nextOffset = this.computePullFromWhere(mq);这行代码返回了一个nextOffset,消费者根据这个nextOffset去构建出一个拉取消息请求给拉取消息服务,拉取消息服务根据这个nextOffset的值去决定从队列中的那个位置开始拉取消息,所以这行代码就是去计算队列的消费位置的了,我们进去看一看

org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere

该方法是一个抽象方法,如果我们使用的是DefaultMQPushConsumer这个消费者实例,那么底层使用的就是RebalancePushImpl去进行消费开始位点的计算

public long computePullFromWhere(MessageQueue mq) {long result = -1;final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();switch (consumeFromWhere) {case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:case CONSUME_FROM_MIN_OFFSET:case CONSUME_FROM_MAX_OFFSET:case CONSUME_FROM_LAST_OFFSET: {// 从broker端获取到当前消费者组对这个mq的已消费进度long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);// 条件成立,这里有两种场景:// 1.当lastOffset > 0时, 说明当前消费组已经有实例消费过这个mq,直接返回broker端记录的消费进度// 2.当lastOffset == 0时, 表示当前这个消费者组第一次消费这个mq,并且这个mq的consumequeue文件还未曾被删除过,以及此时该mq内的消息是“热数据”,此时返回lastOffset == 0if (lastOffset >= 0) {result = lastOffset;}// 条件成立:表示当前这个消费者组第一次消费这个mq,但是该mq的consumequeue文件被删除过,或者 此时该mq内的消息是“冷数据”else if (-1 == lastOffset) {// 重试主题if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {result = 0L;}// 非重试主题else {try {// 取这个mq的最大偏移位(consumequeue目录中的最大偏移位)result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;}case CONSUME_FROM_FIRST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {result = 0L;} else {result = -1;}break;}case CONSUME_FROM_TIMESTAMP: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}} else {try {long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),UtilAll.YYYYMMDDHHMMSS).getTime();result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;}default:break;}return result;
}

参数就是待计算开始消费位点的mq,这里有三个分支,分别就是我们上面说的三个枚举不同的计算方式,当我们设置的开始消费位点方式是CONSUME_FROM_LAST_OFFSET的时候会来到第一个分支

case CONSUME_FROM_LAST_OFFSET: {// 从broker端获取到当前消费者组对这个mq的已消费进度long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);// 条件成立,这里有两种场景:// 1.当lastOffset > 0时, 说明当前消费组已经有实例消费过这个mq,直接返回broker端记录的消费进度// 2.当lastOffset == 0时, 表示当前这个消费者组第一次消费这个mq,并且这个mq的consumequeue文件还未曾被删除过,以及此时该mq内的消息是“热数据”,此时返回lastOffset == 0if (lastOffset >= 0) {result = lastOffset;}// 条件成立:表示当前这个消费者组第一次消费这个mq,但是该mq的consumequeue文件被删除过,或者 此时该mq内的消息是“冷数据”else if (-1 == lastOffset) {// 重试主题if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {result = 0L;}// 非重试主题else {try {// 取这个mq的最大偏移位(consumequeue目录中的最大偏移位)result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;
}

首先会通过消费进度组件(集群模式是RemoteBrokerOffsetStore)去从broker端获取这个mq的已消费进度

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#readOffset

/*** 获取mq的已消费偏移量* @param mq    mq对象* @param type  获取类型,内存获取 or broker端获取* @return*/
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {if (mq != null) {switch (type) {case MEMORY_FIRST_THEN_STORE:// 从内存中拿mq已消费偏移量,case READ_FROM_MEMORY: {AtomicLong offset = this.offsetTable.get(mq);if (offset != null) {return offset.get();} else if (ReadOffsetType.READ_FROM_MEMORY == type) {return -1;}}// 从远程broker端拿mq已消费偏移量case READ_FROM_STORE: {try {// 从broker中查询指定mq已消费偏移量long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);// 更新offsetTable表this.updateOffset(mq, offset.get(), false);return brokerOffset;}// 查询失败fetchConsumeOffsetFromBroker方法会抛出异常,直接返回 -1catch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}}default:break;}}return -1;
}

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker

*** 从broker中获取到指定mq已消费偏移量* @param mq    指定的mq* @return  指定的mq的已消费偏移量* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException* @throws MQClientException*/private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,InterruptedException, MQClientException {// 根据mq所属的broker组名找到其中的主节点或者从节点地址FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());}// 往broker发送查询该队列的消费偏移量的请求if (findBrokerResult != null) {// 创建请求头对象QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();// mq所属主题requestHeader.setTopic(mq.getTopic());// 消费者组名requestHeader.setConsumerGroup(this.groupName);// queueIdrequestHeader.setQueueId(mq.getQueueId());// 查询失败抛出异常return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);} else {throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}}
}

org.apache.rocketmq.client.impl.MQClientAPIImpl#queryConsumerOffset

/*** 查询指定队列的消费偏移量* @param addr  broker地址* @param requestHeader 请求头对象* @param timeoutMillis 请求超时时间* @return  队列已消费偏移量* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/
public long queryConsumerOffset(final String addr,final QueryConsumerOffsetRequestHeader requestHeader,final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {// 查询成功case ResponseCode.SUCCESS: {QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);return responseHeader.getOffset();}default:break;}// 返回QUERY_NOT_FOUND响应码,表示查询失败,抛出异常throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

可以看到上面就是向broker发起查询指定mq的已消费进度的请求,所以我们下面要去到broker服务端看是怎么处理返回的

org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset

private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);final QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.readCustomHeader();final QueryConsumerOffsetRequestHeader requestHeader =(QueryConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);// 从broker内存中查询到指定消费者组 在指定主题和指定mq的 已消费偏移量, 如果这个消费者组是第一次消费该mq,那么就查询不到对应的消费进度了,此时返回-1long offset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());// 查询成功,直接返回给客户端if (offset >= 0) {responseHeader.setOffset(offset);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);}// 如果查询不到,上面的offset则会返回 -1, 那么会进此条件分支else {// 获取mq中最小的偏移位long minOffset =this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),requestHeader.getQueueId());// 条件成立: minOffset == 0(说明该mq的consumequeue文件还未曾被删除过),并且此时该mq的消息并不是“冷数据”,那么这种情况返回的offset就等于0if (minOffset <= 0&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {responseHeader.setOffset(0L);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);}// 上面条件不成立,返回查询未找到else {response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");}}return response;
}

在broker中首先会从内存中查询出指定消费者组在指定主题和指定mq的已消费偏移量

/*** 查询指定消费者组,指定topic,指定mq的已消费进度,如果查询不到,返回-1* @param group 指定的消费者组名* @param topic 指定的主题名称* @param queueId   指定的mq的queueId* @return  对应的已消费进度*/
public long queryOffset(final String group, final String topic, final int queueId) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null != map) {Long offset = map.get(queueId);if (offset != null)return offset;}return -1;
}

根据topic和消费者组名作为key去获取指定mq的已消费进度,也就是说同一个topic对于不同的消费者组来说,他们对于同一个mq的消费进度是不同的。最后如果没有查询到就返回-1,那么什么时候会没有查询到?就是这个消费组第一次消费这个topic中的这个mq。接下来就会去判断上面查询到的offset

if (offset >= 0) {responseHeader.setOffset(offset);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);
}

如果查询到的话这个if条件就会成立,那么此时就立刻返回给消费者,但是如果查询不到就会进入else分支

else {long minOffset =this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),requestHeader.getQueueId());if (minOffset <= 0&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {responseHeader.setOffset(0L);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);}// 上面条件不成立,返回查询未找到else {response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");}
}

在else分支中会去查询到这个mq此时的最小偏移位,并且对返回的这个mq的最小偏移位进行判断,这里关键要搞清楚这个条件是如何成立的。第一个判断minOffset<=0,那么minOffset是什么时候等于0的呢?其实就是这个mq的consumequeue文件没有被删除过(定时任务删除或者手动删除),第二个判断是判断此时consumequeue文件中的数据是否是“冷数据”,如果不是“冷数据”这个判断就成立,所以综合来说整个if判断成立的条件是:该mq的consumequeue文件还未曾被删除过,并且此时该mq的消息并不是“冷数据”,那么这个条件符合之后,返回给消费者的offset就是0,反之如果if条件不成立,就返回给消费者一个QUERY_NOT_FOUND查询未找到的响应码

分析完broker服务端之后,我们再回到消费者端看拿到返回值之后如何处理

public long queryConsumerOffset(final String addr,final QueryConsumerOffsetRequestHeader requestHeader,final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {// 查询成功case ResponseCode.SUCCESS: {QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);return responseHeader.getOffset();}default:break;}// 返回QUERY_NOT_FOUND响应码,表示查询失败,抛出异常throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

可以看到对于返回SUCCESS响应码的情况,直接就拿到broker返回的offset并返回,而对于其他响应码,也就是QUERY_NOT_FOUND这个响应码都会抛出异常,然后我们再看上层调用

case READ_FROM_STORE: {try {// 从broker中查询指定mq已消费偏移量long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);AtomicLong offset = new AtomicLong(brokerOffset);// 更新offsetTable表this.updateOffset(mq, offset.get(), false);return brokerOffset;}// 查询失败fetchConsumeOffsetFromBroker方法会抛出异常,直接返回 -1catch (MQBrokerException e) {return -1;}//Other exceptionscatch (Exception e) {log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);return -2;}
}

一直到这个上层调用的try catch代码块,如果查询成功就直接返回出去了,否则对于查询失败的情况由于底层会抛出异常,所以这里的catch语句块就能够执行,返回-1

分析到这里,我们再回到computePullFromWhere方法

case CONSUME_FROM_LAST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);result = lastOffset;}if (lastOffset >= 0) {result = lastOffset;}else if (-1 == lastOffset) {// 重试队列if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {result = 0L;}// 非重试队列else {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;
}

上面说到从broker服务端获取到mq已消费的偏移量的细节,那么这里就要对获取到的已消费偏移量去进行判断,从而得到正确的值返回给上层,这里先说明一下,在上层的调用中如果该方法返回的mq已消费偏移量是小于0的,那么是不会对该mq进行拉取消息的。

根据上面的分析,可以知道下面的三个if条件分支分别是什么情况下会发生:

1.lastOffset>=0,什么情况下返回的mq已消费偏移量才会大于等于0?这里就分为大于0和等于0这两种情况了:

当lastOffset > 0时, 表示当前消费组已经有实例消费过这个mq,直接返回broker端记录的消费进度

        当lastOffset == 0时, 表示当前这个消费者组第一次消费这个mq,并且这个mq的consumequeue文件还未曾被删除过,以及此时该mq内的消息是“热数据”

2.lastOffset==-1

当这个消费者组第一次消费这个mq,但是该mq的consumequeue文件被删除过,或者 此时该mq内的消息是“冷数据”的时候

3.else other

其他情况都返回-1

所以此时这里就会有两个问题了:

1.当启动一个新的消费者组实例去消费消息,此时该消费者实例对于所分配到的mq是从哪里开始消费?

根据上面的分析,我们可以知道,此时是有可能会进入第一个if分支或者是第二个if分支,关键就在于此时该mq的consumequeue文件是否有被删除过,或者是该mq内的数据是“冷数据”还是“热数据”,如果该消费者实例启动得比较晚,此时定时任务已经把过期的consumequeue文件都删除了,那么就从该mq的最新消息开始消费了,又或者是此时该mq中的消息堆积得比较多了,已经超过了总体内存的30%(冷数据),那么也会从该mq的最新消息开始消费,其余情况都会重新消费该mq的消息

2.此时已经有一个消费者组正在消费,新增一个该消费者组的实例,那么此时该消费者实例对于所分配到的mq是从哪里开始消费?

这种情况下,第一个if条件就会成立,所以是从broker返回的该mq已消费偏移量开始消费

我们有了上面分析,接着再去分析其他两个值

CONSUME_FROM_FIRST_OFFSET

case CONSUME_FROM_FIRST_OFFSET: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {result = 0L;} else {result = -1;}break;
}

可以看到CONSUME_FROM_FIRST_OFFSET的判断与CONSUME_FROM_LAST_OFFSET的判断大同小异,唯一不同的就是对于lastOffset等于-1的这种情况,这种情况统一都是返回0,也就是说当这个消费者组第一次消费这个mq,但是该mq的consumequeue文件被删除过,或者 此时该mq内的消息是“冷数据”的时候就会重新消费该mq的消息,所以可以看到,对于设置了CONSUME_FROM_FIRST_OFFSET的消费者,如果当前是已经有对应的消费组的实例在消费了,那么此时该值并不会生效

CONSUME_FROM_TIMESTAMP

case CONSUME_FROM_TIMESTAMP: {long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {result = lastOffset;} else if (-1 == lastOffset) {if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {try {result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {result = -1;}} else {try {long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),UtilAll.YYYYMMDDHHMMSS).getTime();result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);} catch (MQClientException e) {result = -1;}}} else {result = -1;}break;
}

CONSUME_FROM_TIMESTAMP中对于lastOffset>=0的处理是一样的,不同的就是last等于-1的时候,是会根据getConsumeTimestamp指定的时间去找到这个时间的消息偏移量并返回出去,也就是说对于设置了CONSUME_FROM_TIMESTAMP的消费者,如果当前是已经有对应的消费组的实例在消费了,那么此时该值也是不会生效

总结:我们可以看到,根据上面的分析,消费者设置这三个值所起到的效果并不会跟其语义是完全一致的,如果启动的消费者所属的消费者组中已经有实例对mq进行消费了,那么这个消费者消费的起始位置就是已消费偏移量,反之,如果这个消费者所属的消费者组是第一次对mq进行消费,就可能会根据自己的语义来决定了

关于RocketMQ消费者消费队列的消费起始位置源码分析相关推荐

  1. 阻塞队列(ArrayBlockingQueue) 迭代器源码分析

    文章目录 为什么 ArrayBlockingQueue 迭代器复杂呢? 提出几个 ArrayBlockingQueue 迭代器的问题用于下面代码分析时进行思考 Itrs Itr 1.重要变量 2. 构 ...

  2. RocketMQ:Producer启动流程与消息发送源码分析

    文章目录 Producer 1.方法和属性 2.启动流程 3.消息发送 3.1验证消息 3.2查找路由 3.3选择队列 3.4发送消息 3.5发送批量消息 Producer 在RocketMQ中,消息 ...

  3. 并发编程(九)—— Java 并发队列 BlockingQueue 实现之 LinkedBlockingQueue 源码分析...

    LinkedBlockingQueue 在看源码之前,通过查询API发现对LinkedBlockingQueue特点的简单介绍: 1.LinkedBlockingQueue是一个由链表实现的有界队列阻 ...

  4. 9. 源码分析之消息消费

    源码分析之消息消费 Rebalance(针对集群消费模式) (1)消费Group下的所有消费者 (2)Topic的所有Queue队列 (3)Queue分配策略 触发时机 (1)消费者启动 (2)消费者 ...

  5. 源码分析Dubbo服务消费端启动流程

    通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生 ...

  6. 从源码分析RocketMQ系列-RocketMQ消息持久化源码详解

    导语   在上篇分析中,提到了一个概念处理器,并且在进入到最终NettyIO的时候看到了一个Pair的对象,这个对象存储了两个对象,一个是执行器,一个是处理器,在进入Runable对象的时候看到封装到 ...

  7. 从源码分析RocketMQ系列-消息拉取PullMessageProcessor详解

    导语   在之前的分析中分析了关于SendMessageProcessor,并且提供了对应的源码分析分析对于消息持久化的问题,下面来看另外一个PullMessageProcessor,在RocketM ...

  8. 详细讲解:RocketMQ的限时订单实战与RocketMQ的源码分析!

    目录 一.限时订单实战 1.1.什么是限时订单 1.2.如何实现限时订单 1.2.1.限时订单的流程 1.2.2.限时订单实现的关键 1.2.3.轮询数据库? 1.2.4.Java 本身的提供的解决方 ...

  9. RocketMQ源码分析之延迟消息

    文章目录 前言 一.延迟消息 1.特点 2.使用场景 3.demo 二.发送延迟消息 三.broker端存储延迟消息 四.总结 1.延迟消息工作原理 2.延迟消息在消费者消费重试中的应用 前言 本篇文 ...

  10. RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)

    在阅读本文前,若您对RocketMQ技术感兴趣,请加入 RocketMQ技术交流群 根据上文的描述,发送事务消息的入口为: TransactionMQProducer#sendMessageInTra ...

最新文章

  1. React 项目----className 样式 (13)
  2. 微软翻译突破百种语言和方言大关
  3. 使用阿里云的maven私服的setting.xml, 提高maven项目jar下载速度
  4. Xubunbtu远程桌面的tab键
  5. 帝国cms微信小程序多语言相亲交友制作记录难点使用的小程序组件分析
  6. JAVA基础整理-100.Java 多线编程
  7. 集成学习-Boosting集成学习算法LightGBM
  8. 渗透测试中dns log的使用
  9. 746. 使用最小花费爬楼梯 golang 动态规划
  10. 爬取IMDBTOP250
  11. Java B2B2C多用户商城 springboot架构 (五)springboot整合 beatlsql
  12. jQuery1.9+ 废弃的函数和方法 升级Jquery版本遇到的问题
  13. win7和win8双系统的问题
  14. Linux查看内存,负载状态
  15. POI导出之我的实践篇
  16. 芝麻二维码,安卓和苹果二维码合并和统计工具
  17. 怎么把计算机加入网络打印机共享打印机共享,打印机共享怎么设置
  18. 使用WireShark生成地理位置数据地图
  19. html按钮点击后无效,关于html中按钮的单击事件,第一次单击可以运行,再次单击不能运行的解决方法...
  20. 小型机与PC服务器的区别(phpc)

热门文章

  1. 易筋SpringBoot 2.1 | 第十七篇:SpringBoot的事务Transaction
  2. 2021-09-10 Bagging[7](并 行)和Boosting[8](串行)是两种常见的集成学习方法
  3. 420.强密码检测器
  4. python函数 range()和arange()
  5. 2002年4月计算机二级c语言题,全国计算机等级考试二级C语言真题2004年4月
  6. 克隆的虚拟机一直重复出现登录界面_QQ空间里的装扮怎样才能克隆到自己空间?...
  7. Numpy快速入门教程
  8. 广义线性模型(Generalized Linear Models, GLM)
  9. JAVA的对象创建与调用的内存解析
  10. php 编译安装gd 失败,编译安装php5 解决编译安装的php加载不了gd