Netty writeAndFlush()方法分为两步, 先 write 再 flush

    @Overridepublic ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {DefaultChannelHandlerContext next;next = findContextOutbound(MASK_WRITE);ReferenceCountUtil.touch(msg, next);next.invoker.invokeWrite(next, msg, promise);next = findContextOutbound(MASK_FLUSH);next.invoker.invokeFlush(next);return promise;}

以上是DefaultChannelHandlerContext中的writeAndFlush方法, 可见实际上是先调用了write, 然后调用flush

1. write

write方法从TailHandler开始, 穿过中间自定义的各种handler以后到达HeadHandler, 然后调用了HeadHandler的成员变量Unsafe的write

如下

        @Overridepublic void write(Object msg, ChannelPromise promise) {ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);// release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);return;}outboundBuffer.addMessage(msg, promise);}

最终会把需要write的msg和promise(也就是一个future, 我们拿到手的future, 添加Listener的也是这个)放入到outboundBuffer中, msg和promise在outboundBuffer中的存在形式是一个自定义的结构体Entry.

也就是说调用write方法实际上并不是真的将消息写出去, 而是将消息和此次操作的promise放入到了一个队列中

2. flush

flush也是从Tail开始, 最后到Head, 最终调用的也是Head里的unsafe的flush0()方法, 然后flush0()里再调用doWrite()方法, 如下:

 @Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {int writeSpinCount = -1;for (;;) {Object msg = in.current();if (msg == null) {// Wrote all messages.
                clearOpWrite();break;}if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;int readableBytes = buf.readableBytes();if (readableBytes == 0) {in.remove();continue;}boolean setOpWrite = false;boolean done = false;long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {int localFlushedAmount = doWriteBytes(buf); // 这里才是实际将数据写出去的地方if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (!buf.isReadable()) {done = true;break;}}in.progress(flushedAmount);if (done) {in.remove();} else {incompleteWrite(setOpWrite);break;}} else if (msg instanceof FileRegion) {FileRegion region = (FileRegion) msg;boolean setOpWrite = false;boolean done = false;long flushedAmount = 0;if (writeSpinCount == -1) {writeSpinCount = config().getWriteSpinCount();}for (int i = writeSpinCount - 1; i >= 0; i --) {long localFlushedAmount = doWriteFileRegion(region);if (localFlushedAmount == 0) {setOpWrite = true;break;}flushedAmount += localFlushedAmount;if (region.transfered() >= region.count()) {done = true;break;}}in.progress(flushedAmount);if (done) {in.remove(); // 根据写出的数据的数量情况, 来判断操作是否完成, 如果完成则调用 in.remove()} else {incompleteWrite(setOpWrite);break;}} else {throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));}}}

红字部分就是最后将数据写出去的地方, 这里写数据最终调用的是 GatheringByteChannel 的 write() 方法, 这是个原生Java接口, 具体实现依赖于实现这个接口的Java类, 例如会调用 NIO 的 SocketChannel 的write()方法, 至此, 实际写数据的过程出现了, SocketChannel可以运行在non-blocking模式, 也就是非阻塞异步模式, write数据会马上返回写入的数据数量 (并不一定是所有数据都写入成功, 对于是否写入了所有数据, Netty有自己的处理逻辑, 也就是上面代码中的红字的那段for循环, 具体参看下SocketChannel的javadoc和netty源码).

当所有数据写入SocketChannel成功, 开始调用in.remove(), 这个 in 就是第一步 1. write 里的那个 outboundBuffer, 他的类型是 ChannelOutboundBuffer, 代码如下:

    public final boolean remove() {if (isEmpty()) {return false;}Entry e = buffer[flushed];Object msg = e.msg;if (msg == null) {return false;}ChannelPromise promise = e.promise;int size = e.pendingSize;e.clear();flushed = flushed + 1 & buffer.length - 1;if (!e.cancelled) {// only release message, notify and decrement if it was not canceled before.
            safeRelease(msg);safeSuccess(promise); // 这里, 调用了promise的trySuccess()方法, 触发ListenerdecrementPendingOutboundBytes(size);}return true;}

最后会调用Promise的notifyListeners()操作, 触发Listener完成整个异步流程

---------

最后, 回到我们应用netty的时候的代码

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush(new Object()).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {// do sth} else {// do sth
                }}});}

这就是整个流程

最后提一下, Netty的AbstractNioChannel里封装了selectionKey, 在accept socket的时候, socket会被注册到eventLoop()的Selector, 这个selectionKey就会被赋值,  如下

selectionKey = javaChannel().register(eventLoop().selector, 0, this);

在以后Selector的select()的时候,  则会通过这个key来获取到channel, 然后调用 AbstractChannel 里的 DefaultChannelPipeline 来触发 Handler 的 connect, read, write 等等事件...

转载于:https://www.cnblogs.com/zemliu/p/3667332.html

Netty writeAndFlush() 流程与异步相关推荐

  1. Netty源码分析第1章(Netty启动流程)----第4节: 注册多路复用

    Netty源码分析第1章(Netty启动流程)---->第4节: 注册多路复用 Netty源码分析第一章:Netty启动流程   第四节:注册多路复用 回顾下以上的小节, 我们知道了channe ...

  2. Netty实现的一个异步Socket代码

    本人写的一个使用Netty实现的一个异步Socket代码 package test.core.nio;import com.google.common.util.concurrent.ThreadFa ...

  3. 基于MQ对登录系统核心流程进行异步化改造,提升系统性能-11

    基于MQ对登录系统核心流程进行异步化改造,提升系统性能 1.会员表与实体类定义 2.登录RocketMQ 参数定义 3.登录消息生产者 4.登录服务(LoginService) 5.登录Control ...

  4. Netty在IDEA中搭建HelloWorld服务端并对Netty执行流程与重要组件进行介绍

    场景 什么是Netty Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架. Netty 是基于 Java NIO 的异步事件驱动 ...

  5. Netty : writeAndFlush的线程安全及并发问题

    使用Netty编程时,我们经常会从用户线程,而不是Netty线程池发起write操作,因为我们不能在netty的事件回调中做大量耗时操作.那么问题来了 – 1, writeAndFlush是线程安全的 ...

  6. ktor框架用到了netty吗_Ktor-构建异步服务器和客户端的 Kotlin 框架

    软件简介 Ktor 是一个使用 Kotlin 以最小的成本快速创建 Web 应用程序的框架. Ktor 是一个用于在连接系统(connected systems)中构建异步服务器和客户端的 Kotli ...

  7. Netty实战一之异步和事件驱动

    Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端. 使用Netty你可以并不是很需要网络编程.多线程处理.并发等专业Java知识的积蓄. Net ...

  8. Ajax 详解 网页从输入url到渲染的流程 同步 异步 你想要的全都有

    1  前后端交互流程 1.1   了解服务器      提供某种服务器的机器(计算机) 1.2    了解前端         访问 服务器的几种方式 直接在地址栏输入网址            网页 ...

  9. java 伪异步 netty,大话netty系列之--伪异步BIO

    生意规模扩大 话说,老王和大明的生意越来越好,这就需要两个人增强业务往来,由于天南地北,两个人只能每次运输都需要雇一个人去运货(new 一个线程),一个月下来,两人一算,人力成本太大了,光是雇佣人一个 ...

最新文章

  1. 使用小技巧教你用Selenium获取鼠标指向的元素
  2. Cloud Foundry平台中国唯一云供应商,阿里云持续链接Cloud Foundry/Kubernetes生态
  3. bootstrap5
  4. c语言stdio中null的值,C/C++编程笔记:C语言NULL值和数字 0 值区别及NULL详解
  5. [scala-spark]4. 函数式编程
  6. gorm配置logger显示执行的sql
  7. 缓存机制与局部性原理
  8. linux 下添加,修改,删除路由
  9. 把A表中的a字段和b字段数据 复制到B表中的aa字段和bb字段
  10. 史上超全halcon常见3D算子汇总(一)
  11. python编辑器spyder运行_使用spyder编译器单步调试python
  12. 无法开启计算机,Win7下鼠标右键无法开启计算机属性怎么办?
  13. 秒,在解答这个C语言题目上,我们都败了
  14. 一个农民父亲令人震撼的力量
  15. 获取谷歌浏览器缓存视频方法
  16. linux安装音乐软件教程,Ubuntu 下安装深度音乐播放器
  17. 学习Python的pyecharts的过程中踩到的一些坑
  18. 音频特征(2):时域图、频谱图、语谱图(时频谱图)
  19. lisp横断面数据文件_【干货】横断面测量数据批量转换成断面图,CASS应该如何做?...
  20. 浅谈LANG_ISO 639-1世界语言列表检索

热门文章

  1. jsp 体检信息查询 绕过用户名验证_一篇彻底搞懂jsp
  2. python中history()_keras中的History对象用法
  3. mysql 1308_Mysql恢复数据报ERROR 1308 : LEAVE with no matching label_MySQL
  4. mysql server 5.0安装教程_MySQL Server 5.0安装教程
  5. php删除菜单栏,如何删除WordPress站点健康状态面板和菜单项
  6. 什么是四路串口服务器?
  7. 光端机的分类有哪些?
  8. 【渝粤教育】电大中专测量学作业 题库
  9. 【渝粤教育】电大中专新媒体营销实务 (3)作业 题库
  10. 【渝粤教育】电大中专市场营销管理 (2)作业 题库