Netty是如何把Channel 从Boss线程传到Work线程的?
在 ServerBootstrapAcceptor#channelRead 方法中调用childGroup 将Channel 注册到work 线程中的。
一、 处理流程
io.netty.channel.nio.NioEventLoop#run
@Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();// 在这里处理 selectedKey} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {// 一开始selectedKeys 为空,返回注册在selector中等待IO操作(及有事件发生)channel的selectionKeyprocessSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// SelectKey 类型是读或者 连接,执行read逻辑unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}
继续查看io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
@Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 从这里开始往后传播,一直到ServerBootstrapAcceptor 中的channelRead pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 就在这里 向work 线程组里投递childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}
一路向后,进入 io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Overridepublic ChannelFuture register(Channel channel) {// 看看next 里有什么return next().register(channel);}@Overridepublic EventExecutor next() {// chooser 是什么玩意啊?模糊!return chooser.next();}
在初始化work 的时候有这么一段
好,继续回到 register 流程,io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}
继续进入 io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 投递完成eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
之后就是work 自己去处理了
Netty是如何把Channel 从Boss线程传到Work线程的?相关推荐
- 一道Netty面试题:boss线程池和worker线程池能不能合在一起?
前言 这篇帖子我估计要反复修改,我不确定面试官是不是随口问的(就是可能他自己也没仔细想过这个问题...),我当时回答的是不能,我确实不大明白为啥要合在一起,合在一起你也是要有线程去处理连接,一部分线程 ...
- Netty入门——组件(Channel)一
目录 一.channel的主要作用 二.EventLoop处理io任务代码示例 2.1.服务端代码示例 2.2.客户端代码示例 2.3.服务端和客户端查看控制台输出结果 三.ChannelFuture ...
- 【操作系统】操作系统知识点整理;C++ 实现线程池与windows 线程池的使用;
文章目录 体系结构 冯诺依曼 存储结构 cache常见的组织结构 cache命中 缓存一致性 硬中断.软中断 操作系统结构 内核 Linux宏内核 内存管理 虚拟内存 内存管理 - 分段 - 分页 - ...
- mongodb线程池_常用高并发网络线程模型设计及MongoDB线程模型优化实践
服务端通常需要支持高并发业务访问,如何设计优秀的服务端网络IO工作线程/进程模型对业务的高并发访问需求起着至关重要的核心作用. 本文总结了了不同场景下的多种网络IO线程/进程模型,并给出了各种模型的优 ...
- hash是线程安全的吗?怎么解决?_这次进程、线程、多线程和线程安全问题,一次性帮你全解决了...
1. 什么是进程 一个软件,在操作系统中运行时,我们称其为进程. 进程是操作系统分配资源的最小单元,线程是操作系统调度的最小单元. 2. 什么是线程 在一个进程中,每个独立的功能都需要独立的去运行,这 ...
- 线程、线程匿名内部类、解决线程不安全的方式
线程 线程:正在运行的程序,是程序的执行路径:多线性 进程:是应用程序的载体,程序运行在虚拟机中.一个应用软件对应一个进程. 一个进程包含多个线程,一个线程对应一个进程. 好处:提高软件的运行效率 多 ...
- 线程池:治理线程的法宝
点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 作者:Oo鲁毅oO juejin.im/post/5e1b1fcc ...
- python多线程的使用(导入线程模块、创建子线程任务、启动子线程任务、获取当前执行的线程号)
1. 导入线程模块 #导入线程模块 import threading 2. 线程类Thread参数说明 Thread([group [, target [, name [, args [, kwarg ...
- python中gil锁和线程锁_Python线程——GIL锁、线程锁(互斥锁)、递归锁(RLock)...
GIL锁 计算机有4核,代表着同一时间,可以干4个任务.如果单核cpu的话,我启动10个线程,我看上去也是并发的,因为是执行了上下文的切换,让看上去是并发的.但是单核永远肯定时串行的,它肯定是串行 ...
最新文章
- legend位置 pyecharts_实验|pyecharts数据可视化分析-1
- 趋势畅想-搭载android系统的智能数码相机
- 第四周项目五-用递归方法求解(求n的阶乘)
- NSMutableArray 记住取不到时要进行强转
- 使用Vsftpd服务传输文件
- 专访阿里云域名与网站业务总经理宋瑛桥:域名未来将更加个性化、生态化和规范化...
- 模块XX.dll已加载,但对DllRegisterServer的调用失败
- Vijos1775 CodeVS1174 NOIP2009 靶形数独
- java将所有的字符串转换为大写或小写
- 手机怎么用java9_java9_java9官方版 32位64位 最新版_天天下载手机版
- 在ubuntu16.04下安装opencv3.4.5(超详细)
- 3月21日阿里云北京峰会的注册二维码
- Java从入门到精通(视频教程+源码)
- (Java)抽象类的基本概念
- SD卡驱动(基于XS128)
- 为什么吃鸡显示连接不到服务器,为什么吃鸡进游戏显示连接不上 | 手游网游页游攻略大全...
- 迈阿密色主题学科导航 HTML5静态开源
- HEVC码率控制算法1TEncRateCtrl
- Qt Qml 汽车仪表
- MDL---Material Design Lite框架推荐
热门文章
- PhpStudy下载安装使用教程,图文教程(超详细)
- 关于医疗区块链(Medical Block Chain)的杂谈,以及其他
- 《京韵大鼓——红梅阁》(唱词文本)(骆玉笙音配像本)
- Android常用透明度代码
- oracle修改外键值,ORACLE 外键约束修改行为
- 【VSCode】【Sass】在vscode中使用sass
- 计算机一级考试题库操作题手机版,全国计算机一级操作考试题库示范
- 这5个超好用的免费网站,你一定要知道
- 本周总结and下周规划
- 微信开放平台【第三方平台】java开发总结:验证票据(component_verify_ticket)(-)