sender分析之Selector
Selector是kafka自己实现的一个NIO 异步非阻塞的网络IO操作。使用一条单独的线程管理多条网络连接上的连接、读、写操作
一 核心字段
java.nio.channels.Selector nioSelector: 用来监听网络I/O事件
Map<String, KafkaChannel> channels: 维护了NodeId和KafkaChannel之间的映射关系,KafkaChannel是针对SocketChannel的进一步封装
List<Send> completedSends: 保存哪些请求已经完全发送出去
List<String> failedSends: 保存哪些请求发送失败
List<NetworkReceive> completedReceives: 保存已经完全接收到的请求
Map<KafkaChannel,Deque<NetworkReceive>> stagedReceives: 暂存一次OP_READ事件处理过程中读取到的全部请求,当一次OP_READ事件处理完成之后,会将stagedReceives集合中的请求保存到completedReceives集合中
List<String> disconnected: 记录poll过程中断开的连接
List<String> connected:记录poll过程中新建的连接
ChannelBuilder channelBuilder: 用于创建KafkaChannel的工具,根据不同配置创建不同的TransportLayer的子类,然后创建KafkaChannel
int maxReceiveSize: 能够接受请求的最大字节是多大
二 重要方法
2.1 connect
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (this.channels.containsKey(id))
throw new IllegalStateException("There isalready a connection for id " + id);
// 创建SocketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置成非阻塞模式
socketChannel.configureBlocking(false);
// 获取Socket对象
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);// 设置为长连接
// 设置发送buffer的大小
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
// 设置接收buffe的大小
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
boolean connected;
try {
// 因为是非阻塞式的,所以SocketChannel.connect方法是发起一个连接,connect方法在连接正式建立
// 之前就可能返回,在后面会通过Selector.finishConnect方法确认连接是否真正的建立
connected = socketChannel.connect(address);
} catch (UnresolvedAddressException e) {
socketChannel.close();
throw new IOException("Can't resolve address:" + address, e);
} catch (IOException e) {
socketChannel.close();
throw e;
}
// 将这个SocketChannel注册到nioSelector上,并关注OP_CONNECT事件
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
// 创建KafkaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
// 将kafkachannel注册到key
key.attach(channel);
// 将NodeId和KafkaChannel绑定,放到channels中管理
this.channels.put(id, channel);
if (connected) {
// OP_CONNECTwon't trigger for immediately connected channels
log.debug("Immediately connected tonode {}", channel.id());
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
}
2.2 poll 真正执行网络IO,它会调用nioSelector.select方法等待I/O事件发生
public void poll(long timeout) throws IOException {if (timeout < 0)throw new IllegalArgumentException("timeout should be >= 0");clear(); // 将上一次poll的结果清除掉if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())timeout = 0;/* check ready keys */long startSelect = time.nanoseconds();// 调用nioSelect.select方法等待I/O事件发生int readyKeys = select(timeout);long endSelect = time.nanoseconds();this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());// 处理I/O事件if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);}// 将stagedReceives复制到completedReceivesaddToCompletedReceives();long endIo = time.nanoseconds();this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());// 关闭长期空闲连接maybeCloseOldestConnection(endSelect); }
2.3 pollSelectionKeys 处理OP_CONNECT、OP_READ、OP_WRITE事件,并且会检测连接状态
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,boolean isImmediatelyConnected, long currentTimeNanos) {Iterator<SelectionKey> iterator = selectionKeys.iterator();// 遍历SelectionKeywhile (iterator.hasNext()) {// 获取每一个SelectionKeySelectionKey key = iterator.next();// 从Iterator中移除iterator.remove();// 之前创建连接时,将kafkachannel注册到key上,就是为了在这里获取KafkaChannel channel = channel(key);sensors.maybeRegisterConnectionMetrics(channel.id());if (idleExpiryManager != null)idleExpiryManager.update(channel.id(), currentTimeNanos);try {// 对connect方法返回true或者OP_CONNECTION事件进行处理if (isImmediatelyConnected || key.isConnectable()) {// 会检测SocketChannel是否建立完成,建立后会取消对OP_CONNECT事件的关注,开始// 关注OP_READ事件if (channel.finishConnect()) {// 添加到已连接的集合中this.connected.add(channel.id());this.sensors.connectionCreated.record();SocketChannel socketChannel = (SocketChannel) key.channel();log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",socketChannel.socket().getReceiveBufferSize(),socketChannel.socket().getSendBufferSize(),socketChannel.socket().getSoTimeout(),channel.id());} elsecontinue;}// 调用KafkaChannel的prepare方法进行身份验证if (channel.isConnected() && !channel.ready())channel.prepare();// 处理OP_READ事件if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {NetworkReceive networkReceive;while ((networkReceive = channel.read()) != null)// read方法读到一个完整的NetworkReceive,则将其添加到stagedReceives中保存// 如读取不到一个完整的NetworkReceive,则返回null,下次处理OP_READ事件时,继续读取// 直到读取到一个完整的NetworkReceiveaddToStagedReceives(channel, networkReceive);}// 处理OP_WRITE事件if (channel.ready() && key.isWritable()) {Send send = channel.write();// 上面的write方法将KafkaChannel的send字段发送出去,如果发送未完成,则返回null// 如果发送完成则返回Send,并添加到completedSends集合中带后续处理if (send != null) {this.completedSends.add(send);this.sensors.recordBytesSent(channel.id(), send.size());}}// 如果key无效。则关闭KafkaChannel.并且添加这个channel到断开的连接的集合中if (!key.isValid()) {close(channel);this.disconnected.add(channel.id());}} catch (Exception e) {String desc = channel.socketDescription();if (e instanceof IOException)log.debug("Connection with {} disconnected", desc, e);elselog.warn("Unexpected error from {}; closing connection", desc, e);close(channel);this.disconnected.add(channel.id());}} }
2.4 具体的读写操作交给了KafkaChannel
private boolean send(Send send) throws IOException {// 如果send在一次write调用时没有发送完,SelectionKey的OP_WRITE事件还没有取消,还会继续监听此channel的op_write事件// 直到整个send请求发送完毕才取消send.writeTo(transportLayer);// 判断发送是否完成是通过ByteBuffer中是否还有剩余的字节来判断的if (send.completed())transportLayer.removeInterestOps(SelectionKey.OP_WRITE);return send.completed(); }
public NetworkReceive read() throws IOException {NetworkReceive result = null;// 初始化NetworkReceiveif (receive == null) {receive = new NetworkReceive(maxReceiveSize, id);}// 从TransportLayer中读取数据到NetworkReceive对象中,如果没有读完一个完整的NetworkReceive// 则下次触发OP_READ事件时将继续填充此NetworkReceive对象;如果读完了则将此receive置为空,下次// 触发读操作的时候,创建新的NetworkReceive对象receive(receive);if (receive.complete()) {receive.payload().rewind();result = receive;receive = null;}return result; }
sender分析之Selector相关推荐
- sender分析之创建请求
一 Sender run方法调用流程 # 从Metadata获取集群元数据 # 调用RecordAccumulator.ready方法,根据RecordAccumulator的缓存情况,选出可以向哪些 ...
- Kafka Producer 实现源码分析
前言 拥抱变化接手了 Kafka 平台,遂学习 0.10.0 线上版本的设计与实现.限于篇幅,本文不会逐行解析源码,而是从逻辑流程.设计模式.并发安全等方面学习各组件,笔记仅供个人 Review 一: ...
- 【Python数据分析】波士顿房价分析小例子
%matplotlib inline #将生成的图片嵌入网页中 import matplotlib.pyplot as plt from sklearn imp ...
- 使用NSCondition实现多线程同步
iOS中实现多线程技术有非常多方法. 这里说说使用NSCondition实现多线程同步的问题,也就是解决生产者消费者问题(如收发同步等等). 问题流程例如以下: 消费者取得锁,取产品,假设没有,则wa ...
- 【安全开发】IOS安全编码规范
申明:本文非笔者原创,原文转载自:https://github.com/SecurityPaper/SecurityPaper-web/blob/master/_posts/2.SDL%E8%A7%8 ...
- iphone开发笔记和技巧总结
在iphone程序中实现截屏的一种方法: //导入头文件 #importQuartzCore/QuartzCore.h //将整个self.view大小的图层形式创建一张图片imageUIGrap ...
- C#体育彩票选号器案例
案例分析: 对象:选号器 属性:1.随机数 2.选号器集合(String数组的泛型集合) 要在LIstView中展示 方法:1.单组选号:随机生成7个随机数 2.随机单组:根据输入的组数,生成相 ...
- UI学习第二篇 (控件)
UIbutton 也是一个控件,它属于UIControl 用的最多的就是事件响应 1. //创建按钮对象 UIButton * _botton = [UIButton buttonWithType:U ...
- UIMenuController的简单使用
2019独角兽企业重金招聘Python工程师标准>>> // // ZsyTextField.m // UIMenuController使用简介 // // Created by Z ...
最新文章
- 2022-2028年中国乙酸钴行业发展现状调研及市场前景规划报告
- 通过聚合数据API获取微信精选文章
- 栈的输出_栈和队列--十进制转化为二进制
- python代码块-python一些常用代码块
- gcc交叉编译的实现
- iOS 文字样式处理总结(字体、前背景色、斜体、加粗、对齐、行间距、段间距、动态获取字符串label宽高等)...
- malloc和free
- 李宏毅自然语言处理——多语言BERT
- Android10(Q,API-29)以上版本无法在存储卡目录创建文件夹的问题
- QT绘制同心扇形(Paintevent实现)
- IntelliJ IDEA 更换背景图和背景颜色
- google code 代码托管 用git创建仓库
- Latex beamer 常用操作记录
- C++游戏game | 井字棋游戏坤坤版(配资源+视频)【赋源码,双人对战】
- android 游戏语言设置在哪里设置中文版,使命召唤手游语言变更方法 怎么设置中文...
- 激荡20年,芯片产能从零起步到反超美国,中国制造的又一大成就
- html好友页面,好友列表.html
- Jetpack MVVM 七宗罪之四: 使用 LiveData/StateFlow 发送 Events
- 漫话:什么是平衡(AVL)树?这应该是把AVL树讲的最好的文章了
- java判断两个数互质_AcWing 458. 比例简化-java(无需判断互质)
热门文章
- android日历编程,设置日历并添加 1天_android-calendar_开发99编程知识库
- JavaScript常用工具Date对象和Math介绍介绍
- Python进制转换(利用栈)
- 云服务器cpu系列,云服务器cpu系列
- java提升权限运行_提升代码的运行权限,实现模拟管理员身份的功能
- java 序列化 写入mysql_java 序列化到mysql数据库中
- linux安装mysql默认的配置文件_[转]关于Linux安装mysql默认配置文件位置
- 小学三年级计算机导学案,小学三年级学科导学案.doc
- python matplotlib使用ax绘图
- jmeter java 关联_使用Jmeter进行数据关联和并发用户