Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。Kafka到底是如何做到这么高的吞吐量和性能的呢?我们今天来走进kafka的server端探究一下它的Reactor高并发网络模型机制。

1.1、Kafka Reactor模型架构

Kafka客户端和服务端通信采取的是NIO的reactor模式,它是一种事件驱动模式。那么一个常见的单线程Reactor模式下,NIO线程的职责都有哪些呢?我们整理了如下几点:

  • 作为NIO服务端,接收客户端的TCP连接
  • 作为NIO客户端,向服务端发起TCP连接
  • 读取通信对端的请求或者应答消息
  • 向通信对端发送消息请求或者应答消息

以上四点对应的一个Reactor模式的架构图如下:

对于一些小容量的业务场景,这种单线程的模式基本够用。但是对于高负载、大并发的应用场景却并不适合,主要原因如下:

性能问题1:一个NIO线程同时处理数十万甚至百万级的链路性能是无法支撑的

性能问题2:如果超时发生重试会加重服务端处理负荷,导致大量处理积压

可靠性问题:单个线程出现故障,整个系统无法使用,造成单点故障

所以一个高并发的处理服务需要对以上架构进行优化改造,例如处理采取多线程模式,将接收线程尽量简化,相当于将接收线程作为一个接入层。那么我们回到主题kafka的reactor模式架构是怎样的?

在上面这个kafka的架构图中可以看出,它包含以下几个流程:

  • 客户端请求NIO的连接器Acceptor,同时它还具备事件的转发功能,转发到Processor处理
  • 服务端网络事件处理器Processor
  • 请求队列RequestChannel,存储了所有待处理的请求信息
  • 请求处理线程池(RequestHandler Pool)作为守护线程轮训RequestChannel的请求处理信息,并将其转发给API层对应的处理器处理
  • API层处理器将请求处理完成之后放入到Response Queue中,并由Processor从Response Queue取出发送到对应的Client端

需要注意的一点是虽然Broker层包含多个Acceptor,但是kafka的reactor模式里面还是单线程Acceptor多线程handler的模式,这里的多个Acceptor是针对一个服务器下多网卡场景的,每个EndPoint就是一个网卡它对应于一个ip和port的组合,而每个Endpoint只有一个Acceptor。

1.2、Kafka Reactor模型源码详解

按照上面架构图阐述的几个流程,它分别对应着kafka里面的事件接收、处理、响应等几个阶段,我们下面从具体实现这几个阶段的源码层面来分析。

1.2.1、SocketServer

SocketServer是一个标准的NIO服务端实现,它主要包含以下变量:

  • RequestChannel:Processor和KafkaRequestHandler 之间数据交换的队列
  • Processors:processor的容器,存放的是processor的id和processor对象的映射
  • Acceptors:acceptor的容器,存放的是EndPoint和acceptor的映射
  • ConnectionQuotas:链接限制器,针对每个IP的链接数进行限制

SocketServer的启动流程如下:

部分源码如下,启动入口:

def startup(startupProcessors: Boolean = true) {this.synchronized {connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}
}

创建Acceptor及Proccessor实现逻辑:

private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {val sendBufferSize = config.socketSendBufferBytesval recvBufferSize = config.socketReceiveBufferBytesval brokerId = config.brokerIdendpoints.foreach { endpoint =>val listenerName = endpoint.listenerNameval securityProtocol = endpoint.securityProtocolval acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}

Proccessor线程启动逻辑:

private def startProcessors(processors: Seq[Processor]): Unit = synchronized {processors.foreach { processor =>KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",processor).start()}}

1.2.2、Acceptor 

Acceptor是NIO里面的一个轻量级接入服务,它主要包含如下变量:

  • nioSelector:Java的NIO网络选择器
  • serverChannel:ip和端口绑定到socket
  • Processors:processor的容器,存放的是processor对象

它的主要处理流程如下:

  1. 将nioSelector注册为OP_ACCEPT
  2. 轮训从nioSelector读取事件
  3. 通过RR的模式选择processor
  4. 接收一个新的链接设置(从serverSocketChannel获取socketChannel,并对它的属性进行设置)
  5. 移交processor的accept处理

重要逻辑代码如下:

def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {
//通过RR选择ProcessorcurrentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}
socketChannel的链接设置逻辑:def accept(key: SelectionKey, processor: Processor) {val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]val socketChannel = serverSocketChannel.accept()try {connectionQuotas.inc(socketChannel.socket().getInetAddress)socketChannel.configureBlocking(false)socketChannel.socket().setTcpNoDelay(true)socketChannel.socket().setKeepAlive(true)if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)socketChannel.socket().setSendBufferSize(sendBufferSize)processor.accept(socketChannel)} 

1.2.3、Processor 

Processor的主要职责是将来自客户端的网络链接请求封装成RequestContext并发送给RequestChannel,同时需要对handler处理完的响应回执发送给客户端。它主要包括:

  • newConnections:是一个线程安全的队列,存放从acceptor接收到的网络新链接
  • inflightResponses:已发送客户端的响应,存放了和客户端的链接id(由本地ip、port以及远端ip、port还有额外一个序列值组成)和响应对象的映射
  • responseQueue:是一个阻塞队列,存放handler的响应请求

我们在前面使用的kafka reactor模型架构图上改造一下,就得到如下proccessor的核心逻辑架构:

它的核心逻辑如下几个步骤:

  • proccessor线程从newConnections中轮询获取socketChannel,并将selector监听事件修改为OP_READ;
  • processNewResponses处理新的响应需求,其中类型为SendAction的就是向客户端发送响应,并将发送的响应记录在inflightResponses ,它的核心逻辑是sendResponse如下:
 protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {val connectionId = response.request.context.connectionIdif (channel(connectionId).isEmpty) {response.request.updateRequestMetrics(0L, response)}if (openOrClosingChannel(connectionId).isDefined) {selector.send(responseSend)inflightResponses += (connectionId -> response)}}
  • Selector调用poll从客户端获取到的请求信息,并将获取到的NetworkReceive添加到completedReceives缓存中。
  • 而processCompletedReceives负责处理completedReceives中的接收信息,最后封装为RequestChannel.Request,再调用requestChannel将请求添加到发送队列(即requestQueue)当中,源码逻辑如下所示:
 private def processCompletedReceives() {selector.completedReceives.asScala.foreach { receive =>try {openOrClosingChannel(receive.source) match {case Some(channel) =>val header = RequestHeader.parse(receive.payload)val context = new RequestContext(header, receive.source, channel.socketAddress,channel.principal, listenerName, securityProtocol)val req = new RequestChannel.Request(processor = id, context = context,startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)requestChannel.sendRequest(req)selector.mute(receive.source)case None =>throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")}} catch {case e: Throwable =>processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)}}}

1.2.4、RequestChannel

requestChannel承载了kafka请求和响应的所有转发,它包含有如下两个变量:

  • requestQueue:是一个加锁阻塞队列,RequestChannel传输请求和响应信息的重要组件,上面讲到的RequestChannel.Request就是被放入到这个队列中
  • Processors:存储了processorid和processor的映射关系,主要是在response发送的时候从中选择对应的processor

它的两个核心功能是添加请求和发送响应回执,源码逻辑分别如下:

def sendRequest(request: RequestChannel.Request) {requestQueue.put(request)}

发送响应回执和之前processor略有不同,这里只是将response再添加到responseQueue中,之后由processor轮训从里面取出回执发送到客户端。

  def sendResponse(response: RequestChannel.Response) {//省略log traceval processor = processors.get(response.processor)if (processor != null) {processor.enqueueResponse(response)}}

1.2.5、KafkaRequestHandler 

说到KafkaRequestHandler ,首先要往回聊一聊,看看它是如何产生的。它被KafkaRequestHandlerPool所创建,而pool是在kafkaServer启动的时候创建的,源码如下:

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,config.numIoThreads)

可以看出它创建时已传递了brokerid、requestChannel,那么一个kafkaServer里面就只有一个requestChannel和brokerid对应。而KafkaRequestHandler 是在pool初始化时通过numIoThreads配置来创建起来的,同时将它启动并设置为守护线程,源码逻辑如下:

 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}

好了,讲解完KafkaRequestHandler 的创建过程,接下来就是它的处理逻辑了,它的逻辑很简单,流程如下几个步骤:

  • 从requestChannel拉取请求
  • 判断请求类型,如果是Request类型则调用KafkaApis处理相应的请求

1.3、改进和优化

至此,我们已经将kafka的reactor模型分析完,最后提一个发散性问题,基于kafka实现的这个reactor模型以及源码的分析实现,如果让你来设计,你觉得还有哪些是可能存在性能瓶颈的地方可以做进一步优化的,大家可以在下面留言发表你的看法,下期会把我对这个问题的一些思考分享出来。

kafka是如何做到百万级高并发低迟延的?相关推荐

  1. 你们说说kafka是如何做到百万级高并发低迟延的?

    Kafka是高吞吐低延迟的高并发.高性能的消息中间件,在大数据领域有极为广泛的运用.配置良好的Kafka集群甚至可以做到每秒几十万.上百万的超高并发写入.Kafka到底是如何做到这么高的吞吐量和性能的 ...

  2. 9基于linux百万级高并发框架Skynet-王桂林-专题视频课程

    <9>基于linux百万级高并发框架Skynet-830人已学习 课程介绍         全面介绍一款专门为游戏服务所打造的后台框架skynet,Actor模型的剖析与搭建,通用服务模块 ...

  3. 【揭秘】12306是如何抗住几亿日活、百万级高并发的?

    [欢迎关注微信公众号:厦门微思网络] 微思网络(官网):https://www.xmws.cn/ 每到节假日期间,一二线城市返乡.外出游玩的人们几乎都面临着一个问题:抢火车票! 虽然现在大多数情况下都 ...

  4. 刘志勇:微博短视频百万级高并发架构

    本文来自新浪微博视频平台资深架构师刘志勇在LiveVideoStackCon 2018讲师热身分享,并由LiveVideoStack整理而成.分享中刘志勇从设计及服务可用性方面,详细解析了微博短视频高 ...

  5. 百万用户的网站访问云服务器,大型网站百万级高并发测试–MySpace云测试CloudTest™...

    2009年12月MySpace在新西兰对用户推出了音乐和视频的服务功能,这些新功能包括能够观看音乐录像,艺术家的视频搜索,创建收藏夹列表,等等.因为MySpace网站在任何国家每日的访问量是巨大的,这 ...

  6. OPPO百万级高并发MongoDB集群性能数十倍提升优化实践

    点击蓝色"架构文摘"关注我哟 加个"星标",每天上午 09:25,干货推送! 1. 背景 线上某集群峰值TPS超过100万/秒左右(主要为写流量,读流量很低), ...

  7. 互联网产品之百万级高并发技术整体架构

    高并发是由于移动APP或网站PV(page view)即页面浏览量或点击量大,单台服务器无法承载大量访问所带来的压力,因此会采用服务器集群技术,用N台服务器进行分流,对于每次访问采取负载均衡策略,被分 ...

  8. 高并发服务器开源项目,百万级高并发WebRTC流媒体服务器设计与开发(示例代码)...

    第1章 课程导学与准备工作 本章主要介绍为何要学习WebRTC流媒体服务器开发,以及本门课能为我们带来哪些收获.之后会为大家介绍本课程内容具体安排,最后给出如何学好这门课程的一些学习建议.希望大家都能 ...

  9. MySQL百万级高并发网站优化

    为什么80%的码农都做不了架构师?>>>    一网站以下简称A站,这A站在年后流量猛增从一天的七八十万猛跑到了好几百万的IP,一天下来接近一千万的PV让整个服务器在高压下超负荷的工 ...

最新文章

  1. 小冰负责人李笛:微软不缺钱,缺对未来的把握
  2. guava Throwables类文档翻译及用法入门
  3. [Js]删除数组指定元素
  4. 用计算机源码计算加法,MFC实现简单计算器(支持加减乘除和括号运算)
  5. 网络营销——网站在网络营销优化中不收录了怎么办呢?
  6. linux下read函数缺失字节_机器人、工控机和Linux 网络编程接口能否蹭出火花?
  7. outlook本地存储设置_商务文档为什么要存储在OneDrive for business 上?
  8. android 各版本市占率,Android各版本市占率:果冻豆遥遥领先
  9. opencv图像切割1-KMeans方法
  10. 墓碑上的字符C语言,C语言编程练习6:墓碑上的字符
  11. 高级工程师究竟比你“高”在哪?
  12. 人脸方向学习(一):人脸质量评价-模糊检测方法总结一
  13. SpringBoot+Vue的租房管理系统(毕设, 包含前后台)
  14. 机电工程专业技术-测量技术
  15. win10系统问题记录(一):解决D/E盘根目录出现的msdia80.dll文件
  16. 07 Halcon 点云平面角测量
  17. uva1391Astronauts【2-SAT】
  18. Windows脚本:打开浏览器访问任意网址
  19. 从老板的裤裆拉链看 Google 管理之道
  20. Matlab 绘制多条曲线,方法!

热门文章

  1. 『面试知识集锦100篇』2.linux篇丨shell基础命令全集,我奶奶的速查手册!!
  2. 论文解读:ViT | AN IMAGE IS WORTH 16X16 WORDS: TRANSFORMERS FOR IMAGE RECOGNITION AT SCALE
  3. 使用天平3次,从12个乒乓球找唯一1个轻重未知的废品
  4. Guriddo jqGrid的学习-入门
  5. 微信小程序单行文本溢出部分省略号显示无效
  6. 解决 Unexpected lexical declaration in case block.报错
  7. 猜猜数据结构D301
  8. TSM,TRN 神经网络模型解析附代码
  9. 对不起,学习Python的捷径教程有99%的人都已经知道了
  10. c#4.0捷径教程委托、匿名方法和事件笔记