前言

如果让我设计一个配置中心,最先想到的两个核心功能:一个是如何将配置存储下来,另一个是怎么能够实时的获取到最新的配置;最简单的方式我们可以直接利用现有的一些中间件:Zookeeper、Redis等;

  • Zookeeper: 本身提供了持久化功能,同时客户端可以监听某个节点,节点数据变更,可以实时推送给客户端;
  • Redis: Redis也提供了持久化的方案,同时可以通过psubscribe提供的订阅功能做到配置的实时推动;

以上两个中间件在客户端进行连接的时候,其实建立的是长连接;长连接就是客户端和服务端建立连接后连接是保持的,这样服务端可以很容易的把最新的配置推送给客户端.

数据交互模式

常见的数据交互模式:Push模式和Pull模式

  • Push模式:服务端主动推送数据给客户端,上面说到的Zookeeper和Redis就是这种模式; 这种模式实时性很高,对客户端来说也简单,接收处理消息即可;缺点就是服务端不知道客户端处理消息的能力,可能会导致数据积压,同时也增加了服务端的工作量,影响服务端的性能;
  • Pull模式: 拉取模式,即客户端主动去服务端拉取数据,主动权在客户端,拉取数据,然后处理数据,再拉取数据,一直循环下去,具体拉取数据的时间间隔不好设定,太短可能会导致大量的连接拉取不到数据,太长导致数据接收不及时;

可以发现两种模式各有优缺点;Apollo既没有使用Push模式也没有使用Pull模式,而是使用了长轮询的数据交互模式;

  • 长轮询模式:通过客户端和服务端的配合,达到主动权在客户端,同时也能保证数据的实时性;长轮询本质上也是轮询,只不过对普通的轮询做了优化处理,服务端在没有数据的时候并不是马上返回数据,会 hold 住请求,等待服务端有数据,或者一直没有数据超时处理;

长轮询模式其实在很多中间件中被广泛使用比如:RocketMQ、Kakfa、Nacos等;当然具体每个中间件是如何实现自己的长轮询方案是不一样的,本文重点介绍的是Apollo如何利用Servlet3.0中提供的异步请求处理机制来实现自己的长轮询;下面先看一下Servlet3.0的异步处理机制原理.

Servlet异步处理

Servlet 3.0开始支持异步处理请求, 在接收到请求之后Servlet线程可以将耗时的操作委派给另一个线程来完成, 这样Servlet线程就可以被释放出来,可以去接收其他的请求,可以提高系统的吞吐量;为了更加清楚的了解异步处理,我们需要了解一下线程模型,下面以常用的容器Tomcat为例,来看一下Tomcat的线程模型;

Tomcat线程模型

Unix系统I/O模型主要包含以下五种类型:同步阻塞I/O、同步非阻塞I/O、I/O多路复用、信号驱动I/O和异步I/O;

目前主流使用的是I/O多路复用模型,像以高性能著称的Netty,以及下面要介绍的Tomcat都是用来此模型;此模型依赖非阻塞Channel,可以 通过一个线程来管理多个数据通道(Channel)的状态,极大的提供了性能; 当然此模型下也演变出多个变种包括:单线程模型、多线程模型、主从多线程模型, 这里就不展开讲了可以参考:Netty 系列之 Netty 线程模型

Tomcat的NioEndpoint主要包括:AcceptorPollerSocketProcessorExecutor几个组件,大致关系如下图所示:

  • Acceptor:默认启动一个线程通过ServerSocketChannel监听连接请求,同时往Selecotr中注册读事件;
  • Poller: 内部其实就是一个选择器Selector,选择哪些SocketChannel已经准备好读写了,这里一般会启动多个pollerThread,也就是我们常见的多Selector模型;
  • Executor:执行器线程池,默认最大可以创建200个线程,是真正处理I/O读写的地方,这个线程很大程度上也影响了系统的吞吐量;其实异步处理也就是释放这里面的线程;
  • SocketProcessor:可以理解就是处理I/O读写的任务,被Executor来调度;

异步处理

常见的同步处理是Executor组件分配一个线程,这个线程同时处理消息的读取、消息的处理以及消息的返回;

而异步处理Executor组件会分配一个线程处理消息的读取,读取完线程就被回收了;接下来消息的处理用户可以使用自己的线程去处理;而消息的返回会再次通过Executor分配一个新的线程来处理; 以下是一个简单的实例:

 public void doGet(HttpServletRequest request, HttpServletResponse response) {//1.获取asyncContextAsyncContext asyncContext = request.startAsync();//设置超时时间,不设置默认30秒asyncContext.setTimeout(5000);//设置监听器asyncContext.addListener(new AsyncListener() {...});//2.异步处理new Thread(() -> {PrintWriter out = asyncContext.getResponse().getWriter();out.println("test");//3.任务完成返回结果asyncContext.complete();}.start();}
复制代码
  • 异步开始: 首先生成一个AsyncContext类保存了requst、response、超时时间以及监听器等信息;以上doGet方法中的内容会在Executor组件分配的第一个线程中执行,这个线程一般会命名为:http-nio-8080-exec-x;
  • 异步处理业务逻辑: 自定义一个线程,用来处理耗时的业务逻辑;这时候自定义线程启动完,Executor就回收了http-nio-8080-exec-x线程; 注意这时处理当前SocketChannelSocketProcessor还在,只是没有线程处理它,导致客户端一直阻塞着;
  • 任务结束: 重新让SocketProcessor参与调度,从Executor组件中分配新的线程,返回客户端结果;这里可以执行的操作包括:completedispatch;当然也可能任务太耗时,导致超过设定的超时时间,从而进入超时处理;
  • 超时处理:Tomcat会专门启动一个AsyncTimeout线程,用来检查等待中的SocketProcessor是否已经超时,如果超时同样会结束阻塞.

大致流程如下图所示:

DeferredResult实现长轮询

Apollo并没有直接使用Servlet异步处理的原生接口,而是使用DeferredResult类,此类其实是对原生异步处理类的包装;从字面意思“延迟结果”也能大概看出来它的作用,同样的也可以在接受到请求之后,处理异步任务,归还主线程,等有了结果通知再次申请主线程,返回结果;

下面首先看一个使用DeferredResult的简短实例:

 public static ExecutorService exec = Executors.newCachedThreadPool();@GetMapping("/DeferredResult")public DeferredResult<String> testDeferredResult() {//1.创建DeferredResult并设置超时时间DeferredResult<String> deferredResult = new DeferredResult<String>(10 * 1000L);//设置一些监听器deferredResult.onTimeout(...)deferredResult.onCompletion(...)deferredResult.onError(...)//2.异步处理new Thread(() -> {//3.异步处理耗时任务Thread.sleep(2000);//4.给deferredResult设置结果deferredResult.setResult("异步线程执行完毕");}.start();}
复制代码

具体的使用结构和使用Servlet异步处理基本一致,setResult()其实和complete()功能一样;

原理分析

DeferredResult使用在Controller中返回的就是一个DeferredResult对象,Spring会根据每种返回的类型有对应的处理器,DeferredResult对应的处理器DeferredResultMethodReturnValueHandler:

@Overridepublic void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {if (returnValue == null) {mavContainer.setRequestHandled(true);return;}DeferredResult<?> result;// 返回类型为DeferredResultif (returnValue instanceof DeferredResult) {result = (DeferredResult<?>) returnValue;}// 返回类型为ListenableFuture做适配转换,可以转换成DeferredResultelse if (returnValue instanceof ListenableFuture) {result = adaptListenableFuture((ListenableFuture<?>) returnValue);}//返回类型为CompletionStage做适配转换,可以转换成DeferredResultelse if (returnValue instanceof CompletionStage) {result = adaptCompletionStage((CompletionStage<?>) returnValue);}else {// Should not happen...throw new IllegalStateException("Unexpected return value type: " + returnValue);}//处理DeferredResultWebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);}
复制代码

接下来就是处理DeferredResult,里面包含设置超时时间、设置监听器等操作,看里面的部分重点代码:

public void startDeferredResultProcessing(final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {...省略部源码(设置超时时间、设置监听器等操作)...//开始异步处理,对request.startAsync()的包装startAsyncProcessing(processingContext);try {interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);//设置结果处理器,在接收到result的时候会执行此处理器,这里面其实就是对complete()、dispatch()方法的包装deferredResult.setResultHandler(result -> {result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);setConcurrentResultAndDispatch(result);});}catch (Throwable ex) {setConcurrentResultAndDispatch(ex);}
}
复制代码

在执行deferredResult.setResult()方法后,就会触发DeferredResultHandler,这里面会请求重新调度,从而返回结果给客户端;

以下是一个大致处理的流程图:

长轮询

在了解了以上这些之后,我们就可以考虑一下Apollo为什么使用DeferredResult来实现长轮询机制;从上面对长轮询的描述中,有几个点很重要分别是:

  • 没有结果能hold住请求,能阻塞多久,这里面是存在一个超时时间的;
  • 在被hold的这段时间内,需要释放资源,主要是线程资源;
  • 有了结果能立即通知,对时效性要求高;

基于以上这三点,使用DeferredResult来实现长轮询是非常合适的:

  • 能够将request和response保存到AsyncContext中,不给Channel响应SocketChannel本身就会阻塞住;
  • 返回一个DeferredResult对象,处理完上下文信息之后,就释放Executor主线程了;
  • 提供了complete和dispatch操作,有结果能实时通知;

Apollo分析

Apollo的长轮询机制完成依赖DeferredResult,重点可以看两个类:NotificationControllerV2ReleaseMessageScanner

  • NotificationControllerV2:主要用来接收请求,并且生成DeferredResult,并且统一保存到Map中;
  • ReleaseMessageScanner:扫描发布记录,哪些namespace有更新,立即通知Map中等待的DeferredResult;

以下是一个大致处理的流程图:

其他长轮询实现

上面提到长轮询模式其实在很多中间件中被广泛使用比如:RocketMQ、Kakfa、Nacos等;

Nacos本身也是使用的DeferredResult机制来实现的长轮询;RocketMQ和Kafka在客户端消费消息的时候,同意也采用了长轮询的方式,虽然没有直接使用DeferredResult,其实实现原理基本一致,下面重点看一下RocketMQ的长轮询机制;

RocketMQ长轮询

消费者在消费消息的时候,如果发现本轮没有消息,其实并不会立即返回,同样会hold住一段时间等待是否有消息;

在看RocketMQ源码到过程中,同样重点关注Broker中的两个类:PullMessageProcessorPullRequestHoldService

PullMessageProcessor从名字就可以看出这是一个拉起消息的处理类,重点看一下在没有拉取到消息是如何处理的

case ResponseCode.PULL_NOT_FOUND:if(brokerAllowSuspend &&hasSuspendFlag){long pollingTimeMills = suspendTimeoutMillisLong;if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}
复制代码

返回编码为PULL_NOT_FOUND,会使用PullRequestHoldService服务,hold住当前的PullRequest,其实就是把PullRequest保存下来;这里面有一个重要的代码就是response 赋值为null,在给SocketChannel发送消息的时候会判断response是否为空,如果我空就返回消息,这时候的SocketChannel其实就处于阻塞状态了,并且当前的线程处理完也释放了;

PullRequestHoldService一方面保存了没有拉取到消息的PullRequest,另一方面内部会启动一个线程检查被hold住的PullRequest释放有新消息,如果有会通知拉取消息,并返回给客户端;

public void run() {log.info("{} service started", this.getServiceName());while (!this.isStopped()) {try {if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {this.waitForRunning(5 * 1000);} else {this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());}long beginLockTimestamp = this.systemClock.now();//异步检查hold住的请求this.checkHoldRequest();long costTime = this.systemClock.now() - beginLockTimestamp;if (costTime > 5 * 1000) {log.info("[NOTIFYME] check hold request cost {} ms.", costTime);}} catch (Throwable e) {log.warn(this.getServiceName() + " service has exception. ", e);}}log.info("{} service end", this.getServiceName());}private void checkHoldRequest() {//遍历所有被保存的没有拉取到消息的pullRequestfor (String key : this.pullRequestTable.keySet()) {String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);if (2 == kArray.length) {String topic = kArray[0];int queueId = Integer.parseInt(kArray[1]);final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);try {//通知拉取最新的消息并且重新分配处理线程,将消息发送给客户端this.notifyMessageArriving(topic, queueId, offset);} catch (Throwable e) {log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);}}}}
复制代码

总结

本文从了解Apollo长轮询机制出发,在了解到使用的是DeferredResult来实现的,深入的过程中发现其底层其实使用的是Servlet的异步处理机制,再深入的过程中发现有释放主线程、申请主线程的过程;这时候就有必要去了解Tomcat的线程模型,这样整个处理流程就串起来了;同时为了了解长轮询机制更多的应用范围,已RocketMQ为例做了一定的分析;这样对整个长轮询机制有更深入的了解.

从Apollo看长轮询相关推荐

  1. 从RocketMQ看长轮询(Long Polling)

    前言 消息队列一般在消费端都会提供push和pull两种模式,RocketMQ同样实现了这两种模式,分别提供了两个实现类:DefaultMQPushConsumer和DefaultMQPullCons ...

  2. Apollo 中的 长轮询 定时机制

    今天这篇文章来介绍一下Nacos配置中心的原理之一:长轮询机制的应用 为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的 ...

  3. 用了这么久配置中心,还不知道长轮询是什么?

    前言 传统的静态配置方式想要修改某个配置时,必须重新启动一次应用,如果是数据库连接串的变更,那可能还容易接受一些,但如果变更的是一些运行时实时感知的配置,如某个功能项的开关,重启应用就显得有点大动干戈 ...

  4. java 长轮询_基于springboot 长轮询的实现操作

    springboot 长轮询实现 基于 @EnableAsync , @Sync @SpringBootApplication @EnableAsync public class DemoApplic ...

  5. 转---谈谈HTTP协议中的短轮询、长轮询、长连接和短连接

    作者:伯乐在线专栏作者 - 左潇龙 http://web.jobbole.com/85541/ 如有好文章投稿,请点击 → 这里了解详情 引言 最近刚到公司不到一个月,正处于熟悉项目和源码的阶段,因此 ...

  6. tcp协议中的长连接和短连接服务器,谈谈HTTP协议中的短轮询、长轮询、长连接和短链接...

    undefined 在之前总结 WebSocket 的时候就已经提到过短长轮询了~~今天看公众号文章,又把长短连接引进来一起分析.感觉这种总结很棒,那么我们一起看看呗 长短连接 听说长短连接的话,应该 ...

  7. .xhr长轮询_使用Spring 3.2的DeferredResult进行长轮询

    .xhr长轮询 在我们的最后一集中 , Agile Cowboys Inc.的首席执行官刚刚雇用了Java / Spring顾问,方法是为他提供最初为女友购买的保时捷. 这位首席执行官的女友因失去保时 ...

  8. spring 长轮询_Spring集成文件轮询和测试

    spring 长轮询 我最近实施了一个小项目,在该项目中,我们必须轮询文件夹中的新文件,然后在文件内容上触发服务流. Spring Integration非常适合此要求,因为它带有一个通道适配器 ,该 ...

  9. ajax长轮询 java web_网页实时聊天之js和jQuery实现ajax长轮询

    众所周知,HTTP协议是无状态的,所以一次的请求都是一个单独的事件,和前后都没有联系.所以我们在解决网页实时聊天时就遇到一个问题,如何保证与服务器的长时间联系,从而源源不段地获取信息. 一直以来的方式 ...

最新文章

  1. csgo 人数文件_学生机简单开一个CSGO的社区服务器
  2. Codeforces round 396(Div. 2) 题解
  3. 全国计算机等级考试题库二级C操作题100套(第48套)
  4. 网页字体设置你了解吗?
  5. 人设崩塌?万茜被盗号甩锅程序员,却两次被官方打脸,网友:作死
  6. python 模拟io_Python 的五种io模型理解
  7. 腾讯起诉西瓜视频直播《王者荣耀》;人人车否认破产;苹果人事大变动! | 极客头条...
  8. OnScrollListener
  9. (转)IBM AIX系统安装
  10. JSON怎么转成Excel
  11. 桃源网盘php,桃源居业主自建论坛 - Powered by PHPWind
  12. iHRM 人力资源管理系统_第7章 POI报表的入门
  13. JavaScript实现随机彩票双色球
  14. CTF学习笔记——Easy Calc
  15. 辞职后五险一金怎么办?史上最全处理办法汇总-千氪
  16. 计算机视觉(十六):目标检测概述
  17. type-c速度测试软件,速度篇—Type-C/USB3.0接口到底谁快_固态硬盘评测-中关村在线...
  18. 2步迁移PC端微信聊天记录
  19. linux中查看隐藏文件夹_如何在Linux中隐藏图像中的文件或文件夹
  20. 香港服务器机房TKO

热门文章

  1. SSL的组成,其中握手协议和记录协议实现、密钥生成过程及其评价
  2. c语言关于奇偶数的计算
  3. Pytorch:自定义构建VGG16网络
  4. 计算n以内中的所有数字包含1的个数
  5. 微信小程序开发扫条码wx.scanCode报scanCode:fail
  6. stm32蓝牙遥控小车(hal库)
  7. centos卸载源码安装的php
  8. Hololens 开发笔记(1)——HelloWorld
  9. matlab如何画曲线的切线,Matlab-如何在曲线上绘制切线
  10. matlab灰度图转伪彩色,[转载]灰度图转伪彩色图和彩虹图,基于OpenCV 2.4.3