First, the server side: class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer

The method below implements start() from com.alibaba.rocketmq.remoting.RemotingService. It starts the communication server.

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnetManageHandler(),
                            new NettyServerHandler());
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecuter.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Exception e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

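The key parts here are the last two handlers in the pipeline. The first, NettyConnetManageHandler, needs little explanation: it handles connection lifecycle events (registration, active/inactive, idle timeouts, and exceptions).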

    class NettyConnetManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
            super.channelRegistered(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
            super.channelUnregistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(
                    new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel()));
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(
                    new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent evnet = (IdleStateEvent) evt;
                if (evnet.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        NettyRemotingServer.this.putNettyEvent(
                            new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this.putNettyEvent(
                    new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
            }

            RemotingUtil.closeChannel(ctx.channel());
        }
    }

The second handler, NettyServerHandler, is the one to focus on: it implements the message-processing logic.

    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

processMessageReceived(ctx, msg) is a method that NettyRemotingServer inherits from NettyRemotingAbstract. Here is that method in NettyRemotingAbstract, which dispatches on the command type:

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    plog.error("process request over, but response failed", e);
                                    plog.error(cmd.toString());
                                    plog.error(response.toString());
                                }
                            } else {
                            }
                        }
                    } catch (Throwable e) {
                        if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                            .equals(e.getClass().getCanonicalName())) {
                            plog.error("process request exception", e);
                            plog.error(cmd.toString());
                        }

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(
                                RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(
                    RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    plog.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(
                        RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            plog.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

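Here, final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()) selects a processor by request code, falling back to defaultRequestProcessor when no processor is registered for that code. The selected processor's processRequest(ctx, cmd) method is then invoked on the ExecutorService paired with it.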
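
The lookup-with-fallback step can be sketched with plain JDK types. This is only an illustration under stated assumptions, not RocketMQ's code: SketchProcessor, ProcessorEntry, and ProcessorTableSketch are hypothetical stand-ins for NettyRequestProcessor, Pair, and the processorTable field, and requests and responses are reduced to strings.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for NettyRequestProcessor: maps a request body to a response. */
interface SketchProcessor {
    String process(String body);
}

/** Hypothetical stand-in for Pair<NettyRequestProcessor, ExecutorService>. */
final class ProcessorEntry {
    final SketchProcessor processor;
    final ExecutorService executor;

    ProcessorEntry(SketchProcessor processor, ExecutorService executor) {
        this.processor = processor;
        this.executor = executor;
    }
}

final class ProcessorTableSketch {
    private final Map<Integer, ProcessorEntry> processorTable = new ConcurrentHashMap<>();
    private volatile ProcessorEntry defaultProcessor; // stays null until registered

    void register(int code, SketchProcessor processor, ExecutorService executor) {
        processorTable.put(code, new ProcessorEntry(processor, executor));
    }

    void registerDefault(SketchProcessor processor, ExecutorService executor) {
        defaultProcessor = new ProcessorEntry(processor, executor);
    }

    /**
     * Looks up the processor for a request code, falls back to the default one,
     * and runs it on its paired executor. Returns null when nothing can handle
     * the code (the listing above answers REQUEST_CODE_NOT_SUPPORTED in that case).
     */
    Future<String> dispatch(int code, final String body) {
        ProcessorEntry matched = processorTable.get(code);
        final ProcessorEntry entry = (matched == null) ? defaultProcessor : matched;
        if (entry == null) {
            return null;
        }
        Callable<String> task = () -> entry.processor.process(body);
        return entry.executor.submit(task);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ProcessorTableSketch table = new ProcessorTableSketch();
            table.register(11, body -> "pull:" + body, pool);
            table.registerDefault(body -> "default:" + body, pool);
            System.out.println(table.dispatch(11, "a").get());
            System.out.println(table.dispatch(99, "b").get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Pairing each processor with its own executor lets slow request types be isolated on a dedicated thread pool instead of blocking the I/O threads; the sketch leaves out the rejectRequest() and RejectedExecutionException handling shown in the listing.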

Let's look at the processor that handles pull-message requests: PullMessageProcessor implements NettyRequestProcessor

    @Override
    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        return this.processRequest(ctx.channel(), request, true);
    }

    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
            throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        // The opaque must be set here because several branches below return directly.
        response.setOpaque(request.getOpaque());

        if (log.isDebugEnabled()) {
            log.debug("receive PullMessage request command, " + request);
        }

        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] pulling message is forbidden");
            return response;
        }

        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());

        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            log.error("the topic " + requestHeader.getTopic() + " not exist, consumer: "
                + RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }

        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
            return response;
        }

        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
            String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
                + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
            log.warn(errorInfo);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }

        SubscriptionData subscriptionData = null;
        if (hasSubscriptionFlag) {
            try {
                subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getSubscription());
            } catch (Exception e) {
                log.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                    requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        } else {
            ConsumerGroupInfo consumerGroupInfo =
                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return response;
            }

            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
            if (null == subscriptionData) {
                log.warn("the consumer's subscription not exist, group: {}", requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
                return response;
            }

            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                log.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
                    subscriptionData.getSubString());
                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                response.setRemark("the consumer's subscription not latest");
                return response;
            }
        }

        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow, redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (getMessageResult.getStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);
                    break;
                case MESSAGE_WAS_REMOVING:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_MATCHED_LOGIC_QUEUE:
                case NO_MESSAGE_IN_QUEUE:
                    if (0 != requestHeader.getQueueOffset()) {
                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);

                        // XXX: warn and notify me
                        log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                            requestHeader.getQueueOffset(),
                            getMessageResult.getNextBeginOffset(),
                            requestHeader.getTopic(),
                            requestHeader.getQueueId(),
                            requestHeader.getConsumerGroup()
                        );
                    } else {
                        response.setCode(ResponseCode.PULL_NOT_FOUND);
                    }
                    break;
                case NO_MATCHED_MESSAGE:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case OFFSET_FOUND_NULL:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_OVERFLOW_BADLY:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    // XXX: warn and notify me
                    log.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
                        + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                    break;
                case OFFSET_OVERFLOW_ONE:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_TOO_SMALL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                        getMessageResult.getMinOffset(), channel.remoteAddress());
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasConsumeMessageHook()) {
                ConsumeMessageContext context = new ConsumeMessageContext();
                context.setConsumerGroup(requestHeader.getConsumerGroup());
                context.setTopic(requestHeader.getTopic());
                context.setQueueId(requestHeader.getQueueId());

                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

                switch (response.getCode()) {
                    case ResponseCode.SUCCESS:
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                        context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
                        context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                        context.setCommercialOwner(owner);
                        break;
                    case ResponseCode.PULL_NOT_FOUND:
                        if (!brokerAllowSuspend) {
                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            context.setCommercialRcvTimes(1);
                            context.setCommercialOwner(owner);
                        }
                        break;
                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    case ResponseCode.PULL_OFFSET_MOVED:
                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        context.setCommercialRcvTimes(1);
                        context.setCommercialOwner(owner);
                        break;
                    default:
                        assert false;
                        break;
                }

                this.executeConsumeMessageHookBefore(context);
            }

            switch (response.getCode()) {
                case ResponseCode.SUCCESS:
                    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), getMessageResult.getMessageCount());
                    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(),
                        requestHeader.getTopic(), getMessageResult.getBufferTotalSize());
                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());

                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        final long beginTimeMills = this.brokerController.getMessageStore().now();
                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                            requestHeader.getTopic(), requestHeader.getQueueId());
                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                            requestHeader.getTopic(), requestHeader.getQueueId(),
                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                        response.setBody(r);
                    } else {
                        try {
                            FileRegion fileRegion =
                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception {
                                    getMessageResult.release();
                                    if (!future.isSuccess()) {
                                        log.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
                                    }
                                }
                            });
                        } catch (Throwable e) {
                            log.error("transfer many message by pagecache exception", e);
                            getMessageResult.release();
                        }

                        response = null;
                    }
                    break;
                case ResponseCode.PULL_NOT_FOUND:
                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }

                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                            this.brokerController.getMessageStore().now(), offset, subscriptionData);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }

                case ResponseCode.PULL_RETRY_IMMEDIATELY:
                    break;
                case ResponseCode.PULL_OFFSET_MOVED:
                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                        MessageQueue mq = new MessageQueue();
                        mq.setTopic(requestHeader.getTopic());
                        mq.setQueueId(requestHeader.getQueueId());
                        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

                        OffsetMovedEvent event = new OffsetMovedEvent();
                        event.setConsumerGroup(requestHeader.getConsumerGroup());
                        event.setMessageQueue(mq);
                        event.setOffsetRequest(requestHeader.getQueueOffset());
                        event.setOffsetNew(getMessageResult.getNextBeginOffset());
                        this.generateOffsetMovedEvent(event);
                        log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
                            responseHeader.getSuggestWhichBrokerId());
                    } else {
                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
                            responseHeader.getSuggestWhichBrokerId());
                    }

                    break;
                default:
                    assert false;
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store getMessage return null");
        }

        boolean storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
        storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
                requestHeader.getCommitOffset());
        }
        return response;
    }
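
The status-to-response-code switch in the middle of the listing is easier to see in isolation. The following is a condensed sketch of that mapping only, assuming hypothetical enums StoreStatus and PullCode in place of the store's status type and the integer ResponseCode constants. The later adjustments made in the listing (slave read settings, long-polling suspension, offset correction) are deliberately left out.

```java
/** Hypothetical mirror of the store statuses that appear in the listing. */
enum StoreStatus {
    FOUND, MESSAGE_WAS_REMOVING, NO_MATCHED_LOGIC_QUEUE, NO_MESSAGE_IN_QUEUE,
    NO_MATCHED_MESSAGE, OFFSET_FOUND_NULL, OFFSET_OVERFLOW_BADLY,
    OFFSET_OVERFLOW_ONE, OFFSET_TOO_SMALL
}

/** Hypothetical mirror of the pull-related response codes in the listing. */
enum PullCode {
    SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED
}

final class PullStatusMappingSketch {
    /** Maps a store status to the response code chosen by the first switch in the listing. */
    static PullCode toPullCode(StoreStatus status, long requestOffset) {
        switch (status) {
            case FOUND:
                return PullCode.SUCCESS;
            case MESSAGE_WAS_REMOVING:
            case NO_MATCHED_MESSAGE:
                return PullCode.PULL_RETRY_IMMEDIATELY;
            case NO_MATCHED_LOGIC_QUEUE:
            case NO_MESSAGE_IN_QUEUE:
                // An empty queue only counts as "offset moved" when the consumer asked for a non-zero offset.
                return requestOffset != 0 ? PullCode.PULL_OFFSET_MOVED : PullCode.PULL_NOT_FOUND;
            case OFFSET_FOUND_NULL:
            case OFFSET_OVERFLOW_ONE:
                return PullCode.PULL_NOT_FOUND;
            case OFFSET_OVERFLOW_BADLY:
            case OFFSET_TOO_SMALL:
                return PullCode.PULL_OFFSET_MOVED;
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        for (StoreStatus status : StoreStatus.values()) {
            System.out.println(status + " -> " + toPullCode(status, 100L));
        }
    }
}
```

Read this way, the codes fall into three groups: SUCCESS carries messages, PULL_NOT_FOUND may be held for long polling when brokerAllowSuspend and the suspend flag are both set, and PULL_OFFSET_MOVED tells the consumer that its requested offset is no longer valid.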

Reposted from: https://www.cnblogs.com/guazi/p/6835221.html
