在 ServerBootstrapAcceptor#channelRead 方法中调用childGroup 将Channel 注册到work 线程中的。

一、 处理流程
io.netty.channel.nio.NioEventLoop#run

 @Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();// 在这里处理 selectedKey} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
 private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {// 一开始selectedKeys 为空,返回注册在selector中等待IO操作(及有事件发生)channel的selectionKeyprocessSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop != this || eventLoop == null) {return;}// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// SelectKey 类型是读或者 连接,执行read逻辑unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

继续查看io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

     @Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 从这里开始往后传播,一直到ServerBootstrapAcceptor 中的channelRead pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
     @Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 就在这里 向work 线程组里投递childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

一路向后,进入 io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)

  @Overridepublic ChannelFuture register(Channel channel) {// 看看next 里有什么return next().register(channel);}@Overridepublic EventExecutor next() {// chooser 是什么玩意啊?模糊!return chooser.next();}

在初始化work 的时候有这么一段

好,继续回到 register 流程,io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)

    @Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}

继续进入 io.netty.channel.AbstractChannel.AbstractUnsafe#register

     @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {// 投递完成eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}

之后就是work 自己去处理了

Netty是如何把Channel 从Boss线程传到Work线程的?相关推荐

  1. 一道Netty面试题:boss线程池和worker线程池能不能合在一起?

    前言 这篇帖子我估计要反复修改,我不确定面试官是不是随口问的(就是可能他自己也没仔细想过这个问题...),我当时回答的是不能,我确实不大明白为啥要合在一起,合在一起你也是要有线程去处理连接,一部分线程 ...

  2. Netty入门——组件(Channel)一

    目录 一.channel的主要作用 二.EventLoop处理io任务代码示例 2.1.服务端代码示例 2.2.客户端代码示例 2.3.服务端和客户端查看控制台输出结果 三.ChannelFuture ...

  3. 【操作系统】操作系统知识点整理;C++ 实现线程池与windows 线程池的使用;

    文章目录 体系结构 冯诺依曼 存储结构 cache常见的组织结构 cache命中 缓存一致性 硬中断.软中断 操作系统结构 内核 Linux宏内核 内存管理 虚拟内存 内存管理 - 分段 - 分页 - ...

  4. mongodb线程池_常用高并发网络线程模型设计及MongoDB线程模型优化实践

    服务端通常需要支持高并发业务访问,如何设计优秀的服务端网络IO工作线程/进程模型对业务的高并发访问需求起着至关重要的核心作用. 本文总结了了不同场景下的多种网络IO线程/进程模型,并给出了各种模型的优 ...

  5. hash是线程安全的吗?怎么解决?_这次进程、线程、多线程和线程安全问题,一次性帮你全解决了...

    1. 什么是进程 一个软件,在操作系统中运行时,我们称其为进程. 进程是操作系统分配资源的最小单元,线程是操作系统调度的最小单元. 2. 什么是线程 在一个进程中,每个独立的功能都需要独立的去运行,这 ...

  6. 线程、线程匿名内部类、解决线程不安全的方式

    线程 线程:正在运行的程序,是程序的执行路径:多线性 进程:是应用程序的载体,程序运行在虚拟机中.一个应用软件对应一个进程. 一个进程包含多个线程,一个线程对应一个进程. 好处:提高软件的运行效率 多 ...

  7. 线程池:治理线程的法宝

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 作者:Oo鲁毅oO juejin.im/post/5e1b1fcc ...

  8. python多线程的使用(导入线程模块、创建子线程任务、启动子线程任务、获取当前执行的线程号)

    1. 导入线程模块 #导入线程模块 import threading 2. 线程类Thread参数说明 Thread([group [, target [, name [, args [, kwarg ...

  9. python中gil锁和线程锁_Python线程——GIL锁、线程锁(互斥锁)、递归锁(RLock)...

    GIL锁 ​ 计算机有4核,代表着同一时间,可以干4个任务.如果单核cpu的话,我启动10个线程,我看上去也是并发的,因为是执行了上下文的切换,让看上去是并发的.但是单核永远肯定时串行的,它肯定是串行 ...

最新文章

  1. legend位置 pyecharts_实验|pyecharts数据可视化分析-1
  2. 趋势畅想-搭载android系统的智能数码相机
  3. 第四周项目五-用递归方法求解(求n的阶乘)
  4. NSMutableArray 记住取不到时要进行强转
  5. 使用Vsftpd服务传输文件
  6. 专访阿里云域名与网站业务总经理宋瑛桥:域名未来将更加个性化、生态化和规范化...
  7. 模块XX.dll已加载,但对DllRegisterServer的调用失败
  8. Vijos1775 CodeVS1174 NOIP2009 靶形数独
  9. java将所有的字符串转换为大写或小写
  10. 手机怎么用java9_java9_java9官方版 32位64位 最新版_天天下载手机版
  11. 在ubuntu16.04下安装opencv3.4.5(超详细)
  12. 3月21日阿里云北京峰会的注册二维码
  13. Java从入门到精通(视频教程+源码)
  14. (Java)抽象类的基本概念
  15. SD卡驱动(基于XS128)
  16. 为什么吃鸡显示连接不到服务器,为什么吃鸡进游戏显示连接不上 | 手游网游页游攻略大全...
  17. 迈阿密色主题学科导航 HTML5静态开源
  18. HEVC码率控制算法1TEncRateCtrl
  19. Qt Qml 汽车仪表
  20. MDL---Material Design Lite框架推荐

热门文章

  1. PhpStudy下载安装使用教程,图文教程(超详细)
  2. 关于医疗区块链(Medical Block Chain)的杂谈,以及其他
  3. 《京韵大鼓——红梅阁》(唱词文本)(骆玉笙音配像本)
  4. Android常用透明度代码
  5. oracle修改外键值,ORACLE 外键约束修改行为
  6. 【VSCode】【Sass】在vscode中使用sass
  7. 计算机一级考试题库操作题手机版,全国计算机一级操作考试题库示范
  8. 这5个超好用的免费网站,你一定要知道
  9. 本周总结and下周规划
  10. 微信开放平台【第三方平台】java开发总结:验证票据(component_verify_ticket)(-)