上文我们从netty-example的Discard服务器端示例分析了netty的组件,今天我们从另一个简单的示例Echo客户端分析一下上个示例中没有出现的netty组件。

1. 服务端的连接处理,读写处理

echo客户端代码:

/*** Sends one message when a connection is open and echoes back any received* data to the server.  Simply put, the echo client initiates the ping-pong* traffic between the echo client and server by sending the first message to* the server.*/
public final class EchoClient {static final boolean SSL = System.getProperty("ssl") != null;static final String HOST = System.getProperty("host", "127.0.0.1");static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));public static void main(String[] args) throws Exception {// Configure SSL.gitfinal SslContext sslCtx;if (SSL) {sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();} else {sslCtx = null;}// Configure the client.EventLoopGroup group = new NioEventLoopGroup();try {
            Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));}//p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler());}});// Start the client.ChannelFuture f = b.connect(HOST, PORT).sync();// Wait until the connection is closed.
            f.channel().closeFuture().sync();} finally {// Shut down the event loop to terminate all threads.
            group.shutdownGracefully();}}
}

从上面的代码可以看出,discard的服务端代码和echo的客户端代码基本相似,不同的是一个使用ServerBootStrap,另一个使用BootStrap而已。先看一下连接过程

NioEventLoop处理key的过程,

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());return;}try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write  ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

2.1 连接流程

调用AbstractNioByteChannel的finishConnect()方法

        @Overridepublic final void finishConnect() {// Note this method is invoked by the event loop only if the connection attempt was// neither cancelled nor timed out.assert eventLoop().inEventLoop();try {boolean wasActive = isActive();doFinishConnect();fulfillConnectPromise(connectPromise, wasActive);} catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }

触发channelActive操作:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {if (promise == null) {// Closed via cancellation and the promise has been notified already.return;}// trySuccess() will return false if a user cancelled the connection attempt.boolean promiseSet = promise.trySuccess();// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,// because what happened is what happened.if (!wasActive && isActive()) {pipeline().fireChannelActive();}// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().if (!promiseSet) { close(voidPromise()); } }

2.2 读操作流程

调用AbstractNioByteChannel的read()方法,

  典型的autoRead流程如下:

  1. 当socket建立连接时,Netty触发一个inbound事件channelActive,然后提交一个read()请求给本身(参考DefaultChannelPipeline.fireChannelActive())

  2. 接收到read()请求后,Netty从socket读取消息。

  3. 当读取到消息时,Netty触发channelRead()。

  4. 当读取不到消息后,Netty触发ChannelReadCompleted().

  5. Netty提交另外一个read()请求来继续从socket中读取消息。

@Overridepublic final void read() {final ChannelConfig config = config();if (!config.isAutoRead() && !isReadPending()) {// ChannelConfig.setAutoRead(false) was called in the meantimeremoveReadOp();return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; try { boolean needReadPendingReset = true; do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer.  byteBuf.release(); byteBuf = null; break; } allocHandle.incMessagesRead(1); if (needReadPendingReset) { needReadPendingReset = false; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (allocHandle.lastBytesRead() < 0) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }

触发读操作

    @Overridepublic ChannelHandlerContext fireChannelRead(Object msg) {AbstractChannelHandlerContext next = findContextInbound();next.invoker().invokeChannelRead(next, pipeline.touch(msg, next));return this;}

读完触发完成事件

    @Overridepublic ChannelPipeline fireChannelReadComplete() {head.fireChannelReadComplete();if (channel.config().isAutoRead()) {read();}return this;}@Overridepublic ChannelHandlerContext fireChannelReadComplete() {AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelReadComplete(next); return this; }

2.3 写操作流程

写操作

 @SuppressWarnings("deprecation")protected void flush0() {if (inFlush0) {// Avoid re-entrancereturn;}final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }

写操作具体实现(以NioSocketChannel为例):

 @Overrideprotected void doWrite(ChannelOutboundBuffer in) throws Exception {for (;;) {int size = in.size();if (size == 0) {// All written so clear OP_WRITEclearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // Only one ByteBuf so use non-gathering write ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer.  in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely.  incompleteWrite(setOpWrite); break; } } }

2. ChannelInboundHandler和ChannelInboundHandler

Echo的handler代码如下:

/*** Handler implementation for the echo client.  It initiates the ping-pong* traffic between the echo client and server by sending the first message to* the server.*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {private final ByteBuf firstMessage;/*** Creates a client-side handler.*/public EchoClientHandler() {firstMessage = Unpooled.buffer(EchoClient.SIZE);for (int i = 0; i < firstMessage.capacity(); i ++) {firstMessage.writeByte((byte) i);}}@Overridepublic void channelActive(ChannelHandlerContext ctx) {ctx.writeAndFlush(firstMessage);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// Close the connection when an exception is raised.
        cause.printStackTrace();ctx.close();}

上面的代码出现了两个重要的netty组件:ChannelInboundHandlerAdapter和ByteBuf。其中ByteBuf在另一篇文章已经讲到。我们这次重点分析一下    ChannelInboundHandlerAdapter及其相关类。

  ChannelInboundHandlerAdapter继承了ChannelInboundHandler,它的作用是将operation转到ChannelPipeline中的下一个ChannelHandler。子类可以重写一个方法的实现来改变。注意:在方法#channelRead(ChannelHandlerContext, Object)自动返回前,message不会释放。若需要一个可以自动释放接收消息的ChannelInboundHandler实现时,请考虑SimpleChannelInboundHandler。

  ChannelOutboundHandlerAdapter继承了ChannelOutboundHandler,它仅通过调用ChannelHandlerContext跳转到每个方法。

  ChannelInboundHandler处理输入的事件,事件由外部事件源产生,例如从一个socket接收到数据。

  ChannelOutboundHandler解析你自己应用提交的操作。

 2.1 ChannelInboundHandler.channelActive() 

从源码角度看一下,Netty触发一个inbound事件channelActive(以LoggingHandler为例):

   @Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if (logger.isEnabled(internalLevel)) {logger.log(internalLevel, format(ctx, "ACTIVE"));}ctx.fireChannelActive();}

触发操作如下:

     @Overridepublic ChannelHandlerContext fireChannelActive() {AbstractChannelHandlerContext next = findContextInbound();next.invoker().invokeChannelActive(next);return this;}private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while (!ctx.inbound);return ctx;}

invokeChannelActive方法实现:

    @Overridepublic void invokeChannelActive(final ChannelHandlerContext ctx) {if (executor.inEventLoop()) {invokeChannelActiveNow(ctx);} else {executor.execute(new OneTimeTask() {@Overridepublic void run() {invokeChannelActiveNow(ctx);}});}}public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {try {((ChannelInboundHandler) ctx.handler()).channelActive(ctx);} catch (Throwable t) {notifyHandlerException(ctx, t);}}

2.2 ChannelOutboundHandler.Read()

读的流程:

    @Overridepublic ChannelHandlerContext read() {AbstractChannelHandlerContext next = findContextOutbound();next.invoker().invokeRead(next);return this;}

查找outbound的过程:

    private AbstractChannelHandlerContext findContextOutbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.prev;} while (!ctx.outbound);return ctx;}

触发读操作:

    @Overridepublic void invokeRead(final ChannelHandlerContext ctx) {if (executor.inEventLoop()) {invokeReadNow(ctx);} else {AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;Runnable task = dctx.invokeReadTask;if (task == null) {dctx.invokeReadTask = task = new Runnable() {@Overridepublic void run() {invokeReadNow(ctx);}};}executor.execute(task);}}

2.3 ChannelOutboundHandler.write()

以实现类LoggingHandler为例:

    @Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {if (logger.isEnabled(internalLevel)) {logger.log(internalLevel, format(ctx, "WRITE", msg));}ctx.write(msg, promise);}

具体实现:

    @Overridepublic ChannelFuture write(Object msg, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound();next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise);return promise;}

写操作的触发

    @Overridepublic void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {if (msg == null) {throw new NullPointerException("msg");}if (!validatePromise(ctx, promise, true)) {// promise cancelled
            ReferenceCountUtil.release(msg);return;}if (executor.inEventLoop()) {invokeWriteNow(ctx, msg, promise);} else {safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);}}

立刻触发

    public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {try {((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}

小结:

  Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:

参考文献

【1】http://blog.csdn.net/u013252773/article/details/21195593

【2】http://stackoverflow.com/questions/22354135/in-netty4-why-read-and-write-both-in-outboundhandler

转载于:https://www.cnblogs.com/davidwang456/p/5046406.html

从netty-example分析Netty组件续相关推荐

  1. 从netty-example分析Netty组件

    分析netty从源码开始 准备工作: 1.下载源代码:https://github.com/netty/netty.git 我下载的版本为4.1 2. eclipse导入maven工程. netty提 ...

  2. 56. Netty源代码分析-服务器初始化 NioEventLoopGroup实例化

    一. 代码下载 Netty代码下载和编译参考前一篇Netty文章 https://blog.51cto.com/483181/2112163 二. 服务器代码分析 2.1 服务器代码编写 一般Nett ...

  3. 【Netty】Netty 入门案例分析 ( Netty 模型解析 | Netty 服务器端代码 | Netty 客户端代码 )

    文章目录 一. Netty 模型代码解析 二. Netty 案例服务器端代码 1 . 服务器主程序 2 . 服务器自定义 Handler 处理者 三. Netty 案例客户端代码 1 . 客户端主程序 ...

  4. 为什么选择Netty作为基础通信组件?

    以下内容根据网上资料和自己整理总结而成 一.什么是Netty? Netty是一个高性能 事件驱动.异步非堵塞的IO(NIO)Java开源框架,Jboss提供,用于建立TCP等底层的连接,基于Netty ...

  5. Netty系列之Netty基础概念与组件

    什么是Netty,Netty各个组件介绍 本部分转载自 Java技术债务[什么是Netty?为什么使用Netty?Netty有哪些组件?] 原文链接:https://blog.csdn.net/qq_ ...

  6. 【网络】Wireshark分析Netty建链过程( tcp三次握手、osi模型)

    文章目录 1. osi模型简述 2. tcp三次握手 3. 验证三次握手 系列文章: <Wireshark分析Netty建链过程( tcp三次握手.osi模型)> <IPV4数据报头 ...

  7. 【Netty】Netty 入门案例分析 ( Netty 线程模型 | Netty 案例需求 | IntelliJ IDEA 项目导入 Netty 开发库 )

    文章目录 一. Netty 线程模型 二. Netty 案例需求 三. IntelliJ IDEA 引入 Netty 包 一. Netty 线程模型 1 . Netty 中的线程池 : Netty 中 ...

  8. Netty入门之Netty的基本介绍和IO模型

    一.Netty介绍和应用场景 1.简介 Netty是由JBOSS提供的一个java开源框架,现为Github上的独立项目 Netty是一个异步的.基于事件驱动的网络应用框架,用以快速开发高性能.高可靠 ...

  9. 【读后感】Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ?

    [读后感]Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ? 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商 ...

  10. Netty 快速开始(netty websocket客户端使用流程)

    文章目录 一.网络IO的基本知识与概念 1. 同步.异步.阻塞.非阻塞概念 2. IO模型 3. NIO和IO有什么区别? 4. Java NIO 工作流程 二.netty 1. 什么是netty? ...

最新文章

  1. 电子合同的履行_什么是电子合同履行?怎么履行电子合同?
  2. 【VS+QT开发】找不到到qhostinfo.h文件
  3. 深度学习核心技术精讲100篇(四十二)-Seq2seq框架下的文本生成
  4. C#中动态加载卸载类库
  5. 2.1.4 什么是ASCII码?汉字的表示和编码是怎样的?
  6. CodeForces - 1328E Tree Queries(dfs序/LCA)
  7. c语言如何控制电脑串口,C语言直接驱动硬件实现PC机的串口操作
  8. 新的一年,推荐一些好书给大家
  9. 京东拼多多全面封杀电子烟,淘宝仍正常销售...
  10. vim 基本操作总结
  11. 【译】x86程序员手册13-第5章 内存管理
  12. 记录linux deploy如何进行分区安装centos7
  13. hdu 5053 the Sum of Cube(水)
  14. 李宏毅老师《机器学习》课程笔记-1深度学习简介
  15. Revit二次开发之绘制钢筋
  16. Java——文本框设置背景颜色、字体样式和颜色
  17. 循环神经网络中梯度爆炸的原因
  18. BGP通过团体和MED属性灵活控制回程路由
  19. FFmpeg[32] - x264 [error]: high422 profile doesn‘t support lossless
  20. 数据结构(七)高级排序算法——归并、快速排序

热门文章

  1. python高通滤波器设计_python实现直方图均衡化,理想高通滤波与高斯低通滤波
  2. 会员直推奖php程序_PHP自适应卡益源码 前台直销源码 报单费 直推奖 有内部商城...
  3. python用表达式解密密文_基于Python解密仿射密码
  4. Qt工程pro文件配置详解
  5. 客户端界面实现及登录功能实现
  6. 计算器界面分析及界面程序实现
  7. python构建网站flask_30分钟搭建Python的Flask框架并在上面编写第一个应用
  8. “纯金”卫星,撞向我们的“蛋壳时代”
  9. Linux下CMake简明教程(10) 定义宏来控制打印的信息
  10. python主线程执行_python 并发执行之多线程