NioEventLoopGroup 源码分析

1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看、跟踪、调试 代码,所以在 github上提供 netty 的源码、详细的注释及测试用例。欢迎大家 star、fork !

2. 由于个人水平有限,对源码的分析理解可能存在偏差或不透彻的地方还请大家在评论区指出,谢谢!

从今天开始,就准备进军 ne tty 了,主要的想法是看看 netty4 中一些比较重要的实现,也就是能经常出现在我们面前的东西。主要是: 线程池、通道、管道、编解码器、以及常用的工具类。

然后现在看源码应该不会像之前的 jdk 那么细致了,主要是看了一个类以后就发现 netty 对代码封装太强了,基本一个功能可能封装了七八个类去实现,很多的抽象类但是这些抽象类中的功能还非常的多。所以说主要看这个流程,以及里面写的比较好的代码或者比较新的思想会仔细的去看看。具体的子字段,每个方法不可能做到那么细致。

好,正式开始 netty 源码征战 !

1. 基本思路

这里首先讲一下结论,也就是先说我看这个类的源码整理出来的思路,主要就是因为这些类太杂,一个功能在好几个类中才完全实现。

我们在 new 一个 worker/boss 线程的时候一般是采用的直接使用的无参的构造方法,但是无参的构造方法他创建的线程池的大小是我们 CPU 核心的 2 倍。紧接着就需要 new 这么多个线程放到线程池里面,这里的线程池采用的数据结构是一个数组存放的,每一个线程需要设置一个任务队列,显然任务队列使用的是一个阻塞队列,这里实际采用的是LinkedBlockQueue,然后回想一下在 jdk 中的线程池是不是还有一个比较重要的参数就是线程工厂,对的!这里也有这个东西,他是需要我们手动传入的,但是如果不传则会使用一个默认的线程工厂,里面有一个newThread方法,这个方法实现基本和 jdk 中的实现一模一样,就是创建一个级别为 5 的非 Daemon 线程。对这就是我们在创建一个线程池时候完成的全部工作!

好现在来具体说一下,我们每次创建的是NioEventLoopGroup但是他又继承了 n 个类才实现了线程池,也就是线程池的祖先是ScheduledExecutorService是 jdk 中的线程池的一个接口,其中里面最重要的数据结构就是一个 children 数组,用来装线程的。

然后具体的线程他也是进行了封装的,也就是我们常看到的NioEventLoop。这个类里面有两个比较重要的结构:taskQueue 和 thread 。很明显这个非常类似 jdk 中的线程池。

2. NioEventLoopGroup 线程池分析

首先要创建线程池,传入的线程数为 0,他是一直在调用this()最后追溯到super(nThreads,threadFactory,selectorProvider)也就是使用了MultithreadEventLoopGroup的构造方法,在这一步确定了当传入的线程数为 0 时应该设置的线程数为 CPU 核心的两倍。然后再次上调,调用了MultithreadEventExecutorGroup的构造方法,在这里才是真正的开始了线程池的初始化。

首先设置了线程池工厂,然后初始化 chooser ,接着创建 n 个线程放到 children 数组中,最后设置线程中断的监听事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
复制代码
/*** 这个方法流程:* 1、设置了默认的线程工厂* 2、初始化 chooser* 3、创建nTreads个NioEventLoop对象保存在children数组中* 4、添加中断的监听事件* @param nThreads* @param threadFactory* @param args*/protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}// 默认使用线程工厂是 DefaultThreadFactoryif (threadFactory == null) {threadFactory = newDefaultThreadFactory();}children = new SingleThreadEventExecutor[nThreads];// 二的平方的实现是看 n&-n==n//根据线程个数是否为2的幂次方,采用不同策略初始化chooserif (isPowerOfTwo(children.length)) {chooser = new PowerOfTwoEventExecutorChooser();} else {chooser = new GenericEventExecutorChooser();}//产生nTreads个NioEventLoop对象保存在children数组中for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(threadFactory, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {// 没成功,把已有的线程优雅关闭if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}// 没有完全关闭的线程让它一直等待for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {Thread.currentThread().interrupt();break;}}}}}// 对每一个 children 添加中断线程时候的监听事件,就是将 terminatedChildren 自增// 判断是否到达线程总数,是则更新 terminationFuturefinal FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}}
复制代码

其中有一个 if 分支用来初始化 chooser ,这个 chooser 就是用来选择使用哪个线程来执行哪些操作的。这里用到了判断一个数是否为 2 的次幂的一个方法isPowerOfTwo()实现比较有意思,贴出来。

1
2
3
复制代码
private static boolean isPowerOfTwo(int val) {return (val & -val) == val;
}
复制代码

接下来目光要转向newChild(threadFactory, args),因为在这个类里面这个方法是抽象的,在NioEventLoopGroup得到了实现。其实看到了也非常的简单粗暴,直接 new 了一个NioEventLoop,接下来就应该分析这个线程的包装类了。

1
2
3
4
5
6
复制代码
@Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {// 这里才是重点 也就是真正的线程 被放在自己的 children 数组中return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
复制代码

3. NioEventLoop 线程分析

上面已经看到了,newChild方法就是 new 了一个NioEventLoop。所以有必要好好看看这个线程包装类。

这个类的构造方法是调用了父类SingleThreadEventLoop的构造,接着继续上调SingleThreadEventExecutor构造,在这个类中才真正的实现了线程的构造。里面就做了两件事 :

  1. new 了一个新的线程,新的线程还分配了一个任务,任务的内容就是调用本类中的一个 run 方法,在NioEventLoop中实现。

  2. 设置任务队列为LinkedBlockQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
复制代码
/*** 构造方法主要完成了:* 1、new 一个新的线程执行一个 run 方法* 2、用 LinkedBlockQueue 初始化 taskQueue* @param parent* @param threadFactory* @param addTaskWakesUp*/protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.parent = parent;this.addTaskWakesUp = addTaskWakesUp;// new 了一个新的线程thread = threadFactory.newThread(new Runnable() {@Overridepublic void run() {boolean success = false;updateLastExecutionTime();try {// 调用一个 run 方法SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 让线程关闭for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});// 使用 LinkedBlockQueue 初始化 taskQueuetaskQueue = newTaskQueue();}
复制代码

然后看一下他要执行的 run 方法在NioEventLoop中得到了实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
复制代码
/***'wakenUp.compareAndSet(false, true)' 一般都会在 select.wakeUp() 之前执行* 因为这样可以减少 select.wakeUp() 调用的次数,select.wakeUp() 调用是一个代价* 很高的操作* 注意:如果说我们过早的把 wakenUp 设置为 true,可能导致线程的竞争问题,过早设置的情形如下:1) Selector is waken up between 'wakenUp.set(false)' and'selector.select(...)'. (BAD)2) Selector is waken up between 'selector.select(...)' and'if (wakenUp.get()) { ... }'. (OK)在第一种情况中 wakenUp 被设置为 true 则 select 会立刻被唤醒直到 wakenUp 再次被设置为 false但是wakenUp.compareAndSet(false, true)会失败,并且导致所有希望唤醒他的线程都会失败导致select 进行不必要的休眠为了解决这个问题我们是在 wakenUp 为 true 的时候再次对 select 进行唤醒。*/@Overrideprotected void run() {for (;;) {// 获取之前的线程状态,并让 select 阻塞boolean oldWakenUp = wakenUp.getAndSet(false);try {// 有任务在线程创建之后直接开始 selectif (hasTasks()) {selectNow(); //直接调用了 select 的 selectNow 然后再次唤醒同下面的代码// 没有任务} else {// 自旋进行等待可进行 select 操作select(oldWakenUp);// 再次唤醒,解决并发问题if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;// 都是处理 selected 的通道的数据,并执行所有的任务,只是在 runAllTasks 传的参数不同if (ioRatio == 100) {processSelectedKeys();runAllTasks();} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break;}}} catch (Throwable t) {logger.warn("Unexpected exception in the selector loop.", t);// Prevent possible consecutive immediate failures that lead to// excessive CPU consumption.try {Thread.sleep(1000);} catch (InterruptedException e) {// Ignore.}}}}
复制代码

紧接着就是分析这个 run 方法,也就是线程在被创建之后进行的一系列操作。里面主要做了三件事:

  1. 进行 select

  2. 处理 selectedKeys

  3. 唤醒队列中所有的任务

上面的操作都是在一个循环里面一直执行的,所以说NioEventLoop这个线程的作用就只有一个那就是:进行任务处理。在这个线程被 new 出来时我们就给他分配了线程的任务就是永不停歇的进行上面的操作。

上面的过程说的是有线程安全问题,也就是如果我们过早的把 wakenUp 设置为 true,我们的 select 就会苏醒过来,而其他的线程不清楚这种状态想要设置为 wakenUp 的时候都会失败,导致 select 休眠。主要感觉有点是因为这个东西不是线程间可见的,要是采用 volatile 可能就会解决这个问题,但是 wakenUp 是 final 的不能使用 volatile 关键字修饰。所以作者采用的解决方案就是再次手动唤醒,防止由于其他线程并发设置 wakenUp 的值导致的不必要的休眠。

然后要说一下 select 方法,这个方法的调用主要因为在队列中没有任务,所以就暂时不用 select ,这个方法里面做的就是自旋的去 select ,没有任务就 等待一段时间再去 select。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
复制代码
/*** 这个方法主要干的事情:* 1、如果不需要等待就直接 select* 2、需要等待则等一个超时时间再去 select* 这个过程是不停进行的也就是死循环直达有任务可进行 select 时 select 完毕退出循环* @param oldWakenUp* @throws IOException*/private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {// 不用等待进行一次 select 操作long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// 等一个超时再去选择int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem.logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.",selectCnt);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);}// Harmless exception - log anyway}}
复制代码

接着就是processSelectedKeys();runAllTasks();这两个方法,前一个方法不说就是和我们写 Nio 的时候的步骤差不多,遍历 selectedKeys 处理,然后runAllTasks()执行所有的任务的 run 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码
protected boolean runAllTasks() {fetchFromDelayedQueue();Runnable task = pollTask();if (task == null) {return false;}// 这个循环就是用来循环任务队列中的所有任务for (;;) {try {task.run();} catch (Throwable t) {logger.warn("A task raised an exception.", t);}task = pollTask(); // 循环条件if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();return true;}}}
复制代码

4. 总结

好了其实到这里线程池其实分析的已经差不多了,对于很多的细节问题并没有仔细的去看,单丝我们清楚流程以及里面的结构基本就差不多了。

NioEventLoopGroup中包装了NioEventLoop线程任务。具体包装在了 children 数组中,然后使用 newThread 工厂创建线程,接着给线程分配任务,任务就是进行 select 操作。

在此我向大家推荐一个架构学习交流群。交流学习群号: 744642380, 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良

NioEventLoopGroup 源码分析相关推荐

  1. Netty Pipeline源码分析(2)

    原文链接:https://wangwei.one/posts/net... 前面 ,我们分析了Netty Pipeline的初始化及节点添加与删除逻辑.接下来,我们将来分析Pipeline的事件传播机 ...

  2. netty源码分析系列——EventLoop

    2019独角兽企业重金招聘Python工程师标准>>> 前言 EventLoop也是netty作为一个事件驱动架构的网络框架的重要组成部分,netty主要通过它来实现异步编程,从前面 ...

  3. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  4. 创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动

    前言 前三篇文章分别分析了 Netty 服务端 channel 的初始化.注册以及绑定过程的源码,理论上这篇文章应该开始分析新连接接入过程的源码了,但是在看源码的过程中,发现有一个非常重要的组件:Ni ...

  5. 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

    前言 前面小飞已经讲解了NIO和Netty服务端启动,这一讲是Client的启动过程. 源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Fi ...

  6. AsyncHttpClient源码分析-基于Netty的连接池实现

    原文地址:asynchttpclient源码分析-基于Netty的连接池实现 最近项目重构,有了个机会更多接触一个有别于HttpAsyncClient的异步网络框架AsyncHttpClient,是个 ...

  7. 《RocketMQ源码分析》NameServer如何处理Broker的连接

    <RocketMQ源码分析>NameServer如何处理Broker的连接 NameServer 介绍 NameServer 功能 动态路由发现和注册 服务剔除 创建NameServerC ...

  8. 【Netty系列_3】Netty源码分析之服务端channel

    highlight: androidstudio 前言 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精 ...

  9. 阿里开源一站式分布式事务框架seata源码分析(AT模式下TM与RM分析)

    序言: 对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍.本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo). seata中一个事 ...

最新文章

  1. C++ 重载运算符简单举例
  2. 程序员晒元宵节福利,网友:看了我想砸键盘......
  3. Java Socket实战之一:单线程通信
  4. 实时OLAP分析利器Druid介绍
  5. whea uncorrectable error蓝屏_Windows 10再出“不可选”更新:蓝屏、死机比较烦
  6. linux shell 切割文件,linux shell 将文件按照行数以及顺序拆分成多个文件
  7. ASP.NET Core 集成测试
  8. java jinq_将JINQ与JPA和H2一起使用
  9. SpringBoot2.0 整合 Shiro 框架,实现用户权限管理
  10. shell、ftp、mysql如何连接笔记
  11. Xcode打包踩过的那些坑
  12. win10桌面便签小工具下载,可固定电脑桌面的便签软件
  13. Python歌词解析
  14. 发烧游戏机型的计算机制配单,万元主机配置发烧级游戏设计渲染配置单
  15. 正则表达式 '^[a-zA-Z0-9''-'\s]{1,30}$' 代表什么意思?
  16. 如何让你得声音洪亮结实有磁性
  17. Fedora 15不能正常关机,总是卡死在关机画面上
  18. Web 前端通过调用ActiveX实现LPT1端口小票机打印功能。
  19. 北京化工大学通信工程linux,2020北京化工大学信息与通信工程考研经验考研真题考研分数线考研参考书、目录...
  20. hadoop入门介绍(一)

热门文章

  1. 解决log4j.properties不起作用的问题
  2. 企业基础管理薄弱,激励机制不健全怎么办?
  3. 利用EA根据sql脚本生成数据库文档
  4. PyTorch大更新!谷歌出手帮助开发,正式支持TensorBoard | 附5大开源项目
  5. 苹果无人车裁员200人,收购特斯拉呼声再起
  6. ICLR 2019提交截止,近1600篇论文已全部上线
  7. AI说:你的书法有咖喱味丨看字识国别
  8. 一场谷歌与苹果的合作:TensorFlow Lite开始支持Core ML
  9. AI即开即用,这是悄然推出的“腾讯最新AI技术”小程序
  10. 刚被通用收编的这家创业公司,号称能把LiDAR成本降低近100%