kafka是如何做到百万级高并发低迟延的?
Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。Kafka到底是如何做到这么高的吞吐量和性能的呢?我们今天来走进kafka的server端探究一下它的Reactor高并发网络模型机制。
1.1、Kafka Reactor模型架构
Kafka客户端和服务端通信采取的是NIO的reactor模式,它是一种事件驱动模式。那么一个常见的单线程Reactor模式下,NIO线程的职责都有哪些呢?我们整理了如下几点:
- 作为NIO服务端,接收客户端的TCP连接
- 作为NIO客户端,向服务端发起TCP连接
- 读取通信对端的请求或者应答消息
- 向通信对端发送消息请求或者应答消息
以上四点对应的一个Reactor模式的架构图如下:
对于一些小容量的业务场景,这种单线程的模式基本够用。但是对于高负载、大并发的应用场景却并不适合,主要原因如下:
性能问题1:一个NIO线程同时处理数十万甚至百万级的链路性能是无法支撑的
性能问题2:如果超时发生重试会加重服务端处理负荷,导致大量处理积压
可靠性问题:单个线程出现故障,整个系统无法使用,造成单点故障
所以一个高并发的处理服务需要对以上架构进行优化改造,例如处理采取多线程模式,将接收线程尽量简化,相当于将接收线程作为一个接入层。那么我们回到主题kafka的reactor模式架构是怎样的?
在上面这个kafka的架构图中可以看出,它包含以下几个流程:
- 客户端请求NIO的连接器Acceptor,同时它还具备事件的转发功能,转发到Processor处理
- 服务端网络事件处理器Processor
- 请求队列RequestChannel,存储了所有待处理的请求信息
- 请求处理线程池(RequestHandler Pool)作为守护线程轮训RequestChannel的请求处理信息,并将其转发给API层对应的处理器处理
- API层处理器将请求处理完成之后放入到Response Queue中,并由Processor从Response Queue取出发送到对应的Client端
需要注意的一点是虽然Broker层包含多个Acceptor,但是kafka的reactor模式里面还是单线程Acceptor多线程handler的模式,这里的多个Acceptor是针对一个服务器下多网卡场景的,每个EndPoint就是一个网卡它对应于一个ip和port的组合,而每个Endpoint只有一个Acceptor。
1.2、Kafka Reactor模型源码详解
按照上面架构图阐述的几个流程,它分别对应着kafka里面的事件接收、处理、响应等几个阶段,我们下面从具体实现这几个阶段的源码层面来分析。
1.2.1、SocketServer
SocketServer是一个标准的NIO服务端实现,它主要包含以下变量:
- RequestChannel:Processor和KafkaRequestHandler 之间数据交换的队列
- Processors:processor的容器,存放的是processor的id和processor对象的映射
- Acceptors:acceptor的容器,存放的是EndPoint和acceptor的映射
- ConnectionQuotas:链接限制器,针对每个IP的链接数进行限制
SocketServer的启动流程如下:
部分源码如下,启动入口:
def startup(startupProcessors: Boolean = true) {this.synchronized {connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}
}
创建Acceptor及Proccessor实现逻辑:
private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {val sendBufferSize = config.socketSendBufferBytesval recvBufferSize = config.socketReceiveBufferBytesval brokerId = config.brokerIdendpoints.foreach { endpoint =>val listenerName = endpoint.listenerNameval securityProtocol = endpoint.securityProtocolval acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}
Proccessor线程启动逻辑:
private def startProcessors(processors: Seq[Processor]): Unit = synchronized {processors.foreach { processor =>KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",processor).start()}}
1.2.2、Acceptor
Acceptor是NIO里面的一个轻量级接入服务,它主要包含如下变量:
- nioSelector:Java的NIO网络选择器
- serverChannel:ip和端口绑定到socket
- Processors:processor的容器,存放的是processor对象
它的主要处理流程如下:
- 将nioSelector注册为OP_ACCEPT
- 轮训从nioSelector读取事件
- 通过RR的模式选择processor
- 接收一个新的链接设置(从serverSocketChannel获取socketChannel,并对它的属性进行设置)
- 移交processor的accept处理
重要逻辑代码如下:
def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0while (isRunning) {try {val ready = nioSelector.select(500)if (ready > 0) {val keys = nioSelector.selectedKeys()val iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.nextiter.remove()if (key.isAcceptable) {val processor = synchronized {
//通过RR选择ProcessorcurrentProcessor = currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor = currentProcessor + 1} catch {case e: Throwable => error("Error while accepting connection", e)}}}}
socketChannel的链接设置逻辑:def accept(key: SelectionKey, processor: Processor) {val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]val socketChannel = serverSocketChannel.accept()try {connectionQuotas.inc(socketChannel.socket().getInetAddress)socketChannel.configureBlocking(false)socketChannel.socket().setTcpNoDelay(true)socketChannel.socket().setKeepAlive(true)if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)socketChannel.socket().setSendBufferSize(sendBufferSize)processor.accept(socketChannel)}
1.2.3、Processor
Processor的主要职责是将来自客户端的网络链接请求封装成RequestContext并发送给RequestChannel,同时需要对handler处理完的响应回执发送给客户端。它主要包括:
- newConnections:是一个线程安全的队列,存放从acceptor接收到的网络新链接
- inflightResponses:已发送客户端的响应,存放了和客户端的链接id(由本地ip、port以及远端ip、port还有额外一个序列值组成)和响应对象的映射
- responseQueue:是一个阻塞队列,存放handler的响应请求
我们在前面使用的kafka reactor模型架构图上改造一下,就得到如下proccessor的核心逻辑架构:
它的核心逻辑如下几个步骤:
- proccessor线程从newConnections中轮询获取socketChannel,并将selector监听事件修改为OP_READ;
- processNewResponses处理新的响应需求,其中类型为SendAction的就是向客户端发送响应,并将发送的响应记录在inflightResponses ,它的核心逻辑是sendResponse如下:
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {val connectionId = response.request.context.connectionIdif (channel(connectionId).isEmpty) {response.request.updateRequestMetrics(0L, response)}if (openOrClosingChannel(connectionId).isDefined) {selector.send(responseSend)inflightResponses += (connectionId -> response)}}
- Selector调用poll从客户端获取到的请求信息,并将获取到的NetworkReceive添加到completedReceives缓存中。
- 而processCompletedReceives负责处理completedReceives中的接收信息,最后封装为RequestChannel.Request,再调用requestChannel将请求添加到发送队列(即requestQueue)当中,源码逻辑如下所示:
private def processCompletedReceives() {selector.completedReceives.asScala.foreach { receive =>try {openOrClosingChannel(receive.source) match {case Some(channel) =>val header = RequestHeader.parse(receive.payload)val context = new RequestContext(header, receive.source, channel.socketAddress,channel.principal, listenerName, securityProtocol)val req = new RequestChannel.Request(processor = id, context = context,startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)requestChannel.sendRequest(req)selector.mute(receive.source)case None =>throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")}} catch {case e: Throwable =>processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)}}}
1.2.4、RequestChannel
requestChannel承载了kafka请求和响应的所有转发,它包含有如下两个变量:
- requestQueue:是一个加锁阻塞队列,RequestChannel传输请求和响应信息的重要组件,上面讲到的RequestChannel.Request就是被放入到这个队列中
- Processors:存储了processorid和processor的映射关系,主要是在response发送的时候从中选择对应的processor
它的两个核心功能是添加请求和发送响应回执,源码逻辑分别如下:
def sendRequest(request: RequestChannel.Request) {requestQueue.put(request)}
发送响应回执和之前processor略有不同,这里只是将response再添加到responseQueue中,之后由processor轮训从里面取出回执发送到客户端。
def sendResponse(response: RequestChannel.Response) {//省略log traceval processor = processors.get(response.processor)if (processor != null) {processor.enqueueResponse(response)}}
1.2.5、KafkaRequestHandler
说到KafkaRequestHandler ,首先要往回聊一聊,看看它是如何产生的。它被KafkaRequestHandlerPool所创建,而pool是在kafkaServer启动的时候创建的,源码如下:
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,config.numIoThreads)
可以看出它创建时已传递了brokerid、requestChannel,那么一个kafkaServer里面就只有一个requestChannel和brokerid对应。而KafkaRequestHandler 是在pool初始化时通过numIoThreads配置来创建起来的,同时将它启动并设置为守护线程,源码逻辑如下:
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)for (i <- 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit = synchronized {runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()}
好了,讲解完KafkaRequestHandler 的创建过程,接下来就是它的处理逻辑了,它的逻辑很简单,流程如下几个步骤:
- 从requestChannel拉取请求
- 判断请求类型,如果是Request类型则调用KafkaApis处理相应的请求
1.3、改进和优化
至此,我们已经将kafka的reactor模型分析完,最后提一个发散性问题,基于kafka实现的这个reactor模型以及源码的分析实现,如果让你来设计,你觉得还有哪些是可能存在性能瓶颈的地方可以做进一步优化的,大家可以在下面留言发表你的看法,下期会把我对这个问题的一些思考分享出来。
kafka是如何做到百万级高并发低迟延的?相关推荐
- 你们说说kafka是如何做到百万级高并发低迟延的?
Kafka是高吞吐低延迟的高并发.高性能的消息中间件,在大数据领域有极为广泛的运用.配置良好的Kafka集群甚至可以做到每秒几十万.上百万的超高并发写入.Kafka到底是如何做到这么高的吞吐量和性能的 ...
- 9基于linux百万级高并发框架Skynet-王桂林-专题视频课程
<9>基于linux百万级高并发框架Skynet-830人已学习 课程介绍 全面介绍一款专门为游戏服务所打造的后台框架skynet,Actor模型的剖析与搭建,通用服务模块 ...
- 【揭秘】12306是如何抗住几亿日活、百万级高并发的?
[欢迎关注微信公众号:厦门微思网络] 微思网络(官网):https://www.xmws.cn/ 每到节假日期间,一二线城市返乡.外出游玩的人们几乎都面临着一个问题:抢火车票! 虽然现在大多数情况下都 ...
- 刘志勇:微博短视频百万级高并发架构
本文来自新浪微博视频平台资深架构师刘志勇在LiveVideoStackCon 2018讲师热身分享,并由LiveVideoStack整理而成.分享中刘志勇从设计及服务可用性方面,详细解析了微博短视频高 ...
- 百万用户的网站访问云服务器,大型网站百万级高并发测试–MySpace云测试CloudTest™...
2009年12月MySpace在新西兰对用户推出了音乐和视频的服务功能,这些新功能包括能够观看音乐录像,艺术家的视频搜索,创建收藏夹列表,等等.因为MySpace网站在任何国家每日的访问量是巨大的,这 ...
- OPPO百万级高并发MongoDB集群性能数十倍提升优化实践
点击蓝色"架构文摘"关注我哟 加个"星标",每天上午 09:25,干货推送! 1. 背景 线上某集群峰值TPS超过100万/秒左右(主要为写流量,读流量很低), ...
- 互联网产品之百万级高并发技术整体架构
高并发是由于移动APP或网站PV(page view)即页面浏览量或点击量大,单台服务器无法承载大量访问所带来的压力,因此会采用服务器集群技术,用N台服务器进行分流,对于每次访问采取负载均衡策略,被分 ...
- 高并发服务器开源项目,百万级高并发WebRTC流媒体服务器设计与开发(示例代码)...
第1章 课程导学与准备工作 本章主要介绍为何要学习WebRTC流媒体服务器开发,以及本门课能为我们带来哪些收获.之后会为大家介绍本课程内容具体安排,最后给出如何学好这门课程的一些学习建议.希望大家都能 ...
- MySQL百万级高并发网站优化
为什么80%的码农都做不了架构师?>>> 一网站以下简称A站,这A站在年后流量猛增从一天的七八十万猛跑到了好几百万的IP,一天下来接近一千万的PV让整个服务器在高压下超负荷的工 ...
最新文章
- 小冰负责人李笛:微软不缺钱,缺对未来的把握
- guava Throwables类文档翻译及用法入门
- [Js]删除数组指定元素
- 用计算机源码计算加法,MFC实现简单计算器(支持加减乘除和括号运算)
- 网络营销——网站在网络营销优化中不收录了怎么办呢?
- linux下read函数缺失字节_机器人、工控机和Linux 网络编程接口能否蹭出火花?
- outlook本地存储设置_商务文档为什么要存储在OneDrive for business 上?
- android 各版本市占率,Android各版本市占率:果冻豆遥遥领先
- opencv图像切割1-KMeans方法
- 墓碑上的字符C语言,C语言编程练习6:墓碑上的字符
- 高级工程师究竟比你“高”在哪?
- 人脸方向学习(一):人脸质量评价-模糊检测方法总结一
- SpringBoot+Vue的租房管理系统(毕设, 包含前后台)
- 机电工程专业技术-测量技术
- win10系统问题记录(一):解决D/E盘根目录出现的msdia80.dll文件
- 07 Halcon 点云平面角测量
- uva1391Astronauts【2-SAT】
- Windows脚本:打开浏览器访问任意网址
- 从老板的裤裆拉链看 Google 管理之道
- Matlab 绘制多条曲线,方法!
热门文章
- 『面试知识集锦100篇』2.linux篇丨shell基础命令全集,我奶奶的速查手册!!
- 论文解读:ViT | AN IMAGE IS WORTH 16X16 WORDS: TRANSFORMERS FOR IMAGE RECOGNITION AT SCALE
- 使用天平3次,从12个乒乓球找唯一1个轻重未知的废品
- Guriddo jqGrid的学习-入门
- 微信小程序单行文本溢出部分省略号显示无效
- 解决 Unexpected lexical declaration in case block.报错
- 猜猜数据结构D301
- TSM,TRN 神经网络模型解析附代码
- 对不起,学习Python的捷径教程有99%的人都已经知道了
- c#4.0捷径教程委托、匿名方法和事件笔记