1.初始化和启动 SocketServer

var socketServer: SocketServer = null
。。。
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

1.1.初始化 SocketServer

/*** An NIO socket server. The threading model is*   1 Acceptor thread that handles new connections*   Acceptor has N Processor threads that each have their own selector and read requests from sockets*   M Handler threads that handle requests and produce responses back to the processor threads for writing.*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {private val maxQueuedRequests = config.queuedMaxRequestsprivate val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")this.logIdent = logContext.logPrefixprivate val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONEval requestChannel = new RequestChannel(maxQueuedRequests)private val processors = new ConcurrentHashMap[Int, Processor]()private var nextProcessorId = 0private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()private var connectionQuotas: ConnectionQuotas = _private var stoppedProcessingRequests = false。。。
//省略部分代码
}

1.2.启动 SocketServer

  /*** Start the socket server. Acceptors for all the listeners are started. Processors* are started if `startupProcessors` is true. If not, processors are only started when* [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors* is used to delay processing client connections until server is fully initialized, e.g.* to ensure that all credentials have been loaded before authentications are performed.* Acceptors are always started during `startup` so that the bound port is known when this* method completes even when ephemeral ports are used. Incoming connections on this server* are processed when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].** @param startupProcessors Flag indicating whether `Processor`s must be started.*/def startup(startupProcessors: Boolean = true) {this.synchronized {connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}newGauge("NetworkProcessorAvgIdlePercent",new Gauge[Double] {def value = SocketServer.this.synchronized {val ioWaitRatioMetricNames = processors.values.asScala.map { p =>metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)}ioWaitRatioMetricNames.map { metricName =>Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))}.sum / processors.size}})newGauge("MemoryPoolAvailable",new Gauge[Long] {def value = memoryPool.availableMemory()})newGauge("MemoryPoolUsed",new Gauge[Long] {def value = memoryPool.size() - memoryPool.availableMemory()})info("Started " + acceptors.size + " acceptor threads")}

Kafka核心源码解析 - SocketServer源码解析相关推荐

  1. Kafka 核心源码解读【一】--日志模块

    文章目录 1 日志段:保存消息文件的对象是怎么实现的? 1.1 Kafka 日志结构概览 1.2 日志段代码解析 1.3 日志段类声明 1.4 append 方法 1.5 read 方法 1.6 re ...

  2. SpringKafka原理解析及源码学习-Spring生态(一)

    Kafka 作为老牌消息中间件, 高吞吐是它的拿手好戏. Spring 的生态中也有官方提供的 spring-kafka.jar. 本文主要学习 Spring 官方提供的 spring-kafka.j ...

  3. 自己实现多线程的socket,socketserver源码剖析

    1,IO多路复用 三种多路复用的机制:select.poll.epoll 用的多的两个:select和epoll 简单的说就是: 1,select和poll所有平台都支持,epoll只有linux支持 ...

  4. Java定时任务(一) Timer及TimerTask的案例解析及源码分析

    Java定时任务(一)  Timer及TimerTask的案例解析及源码分析 一.概述: 定时任务这个概念在Java的学习以及项目的开发中并不陌生,应用场景也是多种多样.比如我们会注意到12306网站 ...

  5. Python之socketserver源码分析

    一.socketserver简介 socketserver是一个创建服务器的框架,封装了许多功能用来处理来自客户端的请求,简化了自己写服务端代码.比如说对于基本的套接字服务器(socket-based ...

  6. php项目素材,PHP素材资源解析平台源码V8.0(thinkPHP框架内核)

    PHP素材资源解析平台源码V8.0 第三方平台下载千图网千库网等素材网站下载站 V8版本.十月一日最 新更新,全新的解析架构. 小白问题一:为什么我不能解析? 答:解析是需要开通目标站VIP的. 小白 ...

  7. Go实现的5G核心网开源项目free5gc源码分析系列 | Gopher Daily (2021.01.08) ʕ◔ϖ◔ʔ

    每日一谚:"Abstractions should be discovered, not created." Go技术新闻 Go实现的5G核心网开源项目free5gc源码分析系列 ...

  8. graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五)

    graylog+kafka+zookeeper(单机测试及源码),graylog设置URL报警方式(五) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylog组件 ...

  9. graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二)

    graylog+kafka+zookeeper(单机测试及源码),kafka+zookeeper组件部署(二) 问题背景 graylog+kafka+zookeeper(单机测试及源码),graylo ...

最新文章

  1. springboot + rabbitmq 用了消息确认机制,感觉掉坑里了
  2. kotlin条件表达式
  3. 硬盘为何会丢失数据?
  4. python自动化发送邮件_python接口自动化(三十三)-python自动发邮件总结及实例说明番外篇——下(详解)...
  5. 并查集+二分-hdu-4750-Count The Pairs
  6. 每天天一个linux命令echo,每天一个linux命令-echo(示例代码)
  7. LeetCode 第 35 场双周赛(216/2839,前7.61%)
  8. Django配置数据库读写分离
  9. 理想汽车回应被列入“预摘牌名单”:积极配合审计底稿相关的工作
  10. 单/多文档的窗体类属性修改(VC_MFC)
  11. java调用百度api进行身份证识别
  12. 【转载】DirectX支配游戏!历代GPU架构全解析
  13. Linux下的离线词典,Linux下星际译王离线词库
  14. 央行数字货币离我们还有多远?
  15. PHP:编写标准体重计算器
  16. 塞力斯是鸿蒙系统吗,赛力斯“驼峰”智能增程系统为何物?一亮相就引起行业沸腾...
  17. Verilog 实现千兆网UDP协议 基于88E1111--板级验证--增加ARP
  18. 条码和自动识别的基础知识
  19. leetcode 1419 数青蛙
  20. 20190122——回首向来萧瑟处,无人等在灯火阑珊处。 Java责任链

热门文章

  1. BZOJ1423 : Optimus Prime
  2. MQTT的学习之Mosquitto安装使用(1)
  3. 3 django系列之Form表单在前端web界面渲染与入库保存
  4. javascript实现定时器四秒后跳转到秋秋淘衣坊首页(setInterval计时器)
  5. ROS系列之初识gmapping
  6. 用calloc()函数分配内存
  7. 模板匹配中差值的平方和(SSD)与互相关准则的关系
  8. 7-5 素数判断 (10 分)
  9. MyBatis复习(四):#{}占位符与SQL传参
  10. java命令行参数写哪里_Java的命令行参数