最近在学些BIO,NIO相关的知识,也学习了下Netty和它的源码,做个记录,方便以后继续学习,如果有错误的地方欢迎指正

如果不了解BIO,NIO这些基础知识,可以看下我的如下博客

IO中的阻塞、非阻塞、同步、异步概念分析详解
Linux 网络 I/O 模型简介
Java NIO 介绍和基本demo

关于Netty的设计以及高性能的原因,可以看下李林锋老师的netty设计原理有个大概的理解

先贴下Netty启动客户端的代码

  EventLoopGroup group = null;final RpcClientHandler handler = new RpcClientHandler();try{group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class);//添加客户端的处理器bootstrap.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()//自定义协议解码器.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定义协议编码器.addLast("frameEncoder", new LengthFieldPrepender(4))//对象参数类型编码器.addLast("encoder", new ObjectEncoder())// 对象参数类型解码器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(handler);}});ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();

大致可以分为五步:

1.创建EventLoopGroup,本质上是一个线程池
2.创建Bootstrap并设置channel类型,客户端设置的是NioSocketChannel,服务端设置的是NioServerSocketChannel
3.设置option参数
4.设置handler处理器
5.connect服务器ip,端口 并启动

我们从connect方法为入口看下netty的源码是怎么实现的

其调用链如下:

connect(String inetHost, int inetPort) --> connect(SocketAddress remoteAddress) --> doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress)

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {//调用initAndRegister执行Channel的初始化和注册ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {return !regFuture.isSuccess() ? regFuture : this.doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();Bootstrap.this.doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}final ChannelFuture initAndRegister() {Channel channel = null;//创建channelchannel = this.channelFactory.newChannel();//初始化channelthis.init(channel);...//注册channelChannelFuture regFuture = this.config().group().register(channel);...return regFuture;}

客户端Channel实例化

通过channel = this.channelFactory.newChannel();获取channel实例

调用了ReflectiveChannelFactory的newChannel方法,最后通过反射创建了channel对象

public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");} else {this.clazz = clazz;}}public T newChannel() {try {//调用clazz默认的构造方法返回channel对象,这里的clazz就是上面构造函数里的clazzreturn (Channel)this.clazz.newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);}}

这里创建的channel就是如下

bootstrap.group(group).channel(NioSocketChannel.class);

里设置的Channel类型,我们可以看下这里channel方法的实现

  public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");} else {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));}}

不出所料,把我们的设置的channel类型通过构造方法传递给了ReflectiveChannelFactory,所以我们创建的channel其实就是NioSocketChannel
可以看下该channel的无参构造方法

public NioSocketChannel() {this(DEFAULT_SELECTOR_PROVIDER);}public NioSocketChannel(SelectorProvider provider) {this(newSocket(provider));}//通过SelectorProvider返回一个jdk的SocketChannelImpl对象private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {try {return provider.openSocketChannel();} catch (IOException var2) {throw new ChannelException("Failed to open a socket.", var2);}}public NioSocketChannel(java.nio.channels.SocketChannel socket) {this((Channel)null, socket);}//这里的socket就是jdk的SocketChannel的实现类对象 并接着调用父类的构造方法public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) {super(parent, socket);//创建NioSocketChannelConfig对象this.config = new NioSocketChannel.NioSocketChannelConfig(this, socket.socket());}

NioSocketChannel父类的的构造方法如下

//这里的parent为Null,ch是刚刚通过provider创建的jdk的SocketChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch,SelectionKey.OP_READ);}

再向上调用,会调用AbstractNioChannel的构造方法,并传入实际的参数readInterestOp = SelectionKey.OP_READ

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;this.readInterestOp = readInterestOp;...//设置channel使用非阻塞模式,这是因为在java里Nio默认采用阻塞模式ch.configureBlocking(false);...}

再向上调用,会调用channel的顶层父类AbstractChannel

protected AbstractChannel(Channel parent) {this.parent = parent;this.id = this.newId();this.unsafe = this.newUnsafe();this.pipeline = this.newChannelPipeline();}

这里的parent跟据传入的参数一直是null
id:每个 Channel 都会有一个唯一的 id,用于标识
unsafe:通过 newUnsafe()实例化一个 unsafe 对象,它的类型是NioScoketChannel的NioSocketChannelUnsafe 内部类。
                 

Unsafe接口定义了很多对Channel(socket)进行操作的方法
pipeline:通过 new DefaultChannelPipeline(this) 创建的实例(这里的this是当前创建的channel对象)

    Each channel has its own pipeline and itis created automatically whena new channel is created

其构造方法如下:

    protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}

从属性可以推测pipeline应该是一个链表结构,一个以AbstractChannelHandlerContext为节点的双向链表(TailContext和HeadContext都继承了AbstractChannelHandlerContext)

注册Channel到Selector

final ChannelFuture initAndRegister() {Channel channel = null;//创建channelchannel = this.channelFactory.newChannel();//初始化channelthis.init(channel);...//注册channelChannelFuture regFuture = this.config().group().register(channel);...return regFuture;}

在创建Channel实例之后,Netty会先对Channle进行初始化配置,然后再调用register方法进行注册

      private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();void init(Channel channel) throws Exception {//创建pipelineChannelPipeline p = channel.pipeline();//添加Handlerp.addLast(config.handler());//获取options参数并赋值到前面初始化的NioSocketChannelConfig对象里//这里返回的options对象里的键值对就是通过Bootstrap.option链式调用设值的final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {...if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}...}}//这里的attrs也类似,在Bootstrap设值,然后在init里保存到channel里final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}}}final Map<ChannelOption<?>, Object> options0() {return options;}

接下来就看下register方法里是怎么进行注册的

调用链如下:

MultithreadEventLoopGroup.register(Channel channel)  --> SingleThreadEventLoop.register(Channel channel)
-->  AbstractChannel.AbstractUnsafe register(EventLoop eventLoop, final ChannelPromise promise)
public final void register(EventLoop eventLoop, final ChannelPromise promise) {...//对eventLoop属性进行赋值AbstractChannel.this.eventLoop = eventLoop;//如果这里的eventLoop是当前线程,则直接执行register0方法if (eventLoop.inEventLoop()) {register0(promise);} else {//否则通过当前eventLoop来执行register0方法try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {...}}}

register0方法又调用了doRegister方法

    private void register0(ChannelPromise promise) {...// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;//调用jdk底层的register方法doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);//回调通知 channel注册成功pipeline.fireChannelRegistered();if (isActive()) {//第一次注册则回调channelActiveif (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {//注册成功后会开始read操作// This channel was registered before and autoRead() is set. This means we need to begin readbeginRead();}}...}
 @Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {selectionKey = javaChannel().register(eventLoop().selector, 0, this);return;} catch (CancelledKeyException e) {if (!selected) {eventLoop().selectNow();selected = true;} else {throw e;}}}}

这里的javaChannel()返回的是一个 Java NIO 的SocketChannel 对象,然后就将 SocketChannel 注册到与 eventLoop 关联的selector 上,并将当前Channel 作为attachment 与SocketChannel 关联。

那么注册完成后Netty会帮我们做什么呢?

首先是调用pipeline.invokeHandlerAddedIfNeeded();然后会调用一个回调方法pipeline.fireChannelRegistered(),从head开始,会找满足Inbound为true的context执行其channelRegistered方法

接着会调用beginRead方法从channel进行数据读取,关键方法如下:

@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}

这里的readInterestOp是在AbstractNioChannel赋值的,应该是SelectionKey.OP_READ,而我们之前通过doRegister()注册的事件号是0,所以这里就是向selector注册SelectionKey.OP_READ事件,表示这个Channel已经可以开始处理read事件了

总结一下,Netty的注册过程主要实现了以下操作

Handler的添加

netty客户端的demo除了上面分析的方法,还有一个很重要的handler的链式调用,在里面定义了一些编解码器以及处理逻辑

.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()               .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)).addLast("frameEncoder", new LengthFieldPrepender(4)).addLast("encoder", new ObjectEncoder()).addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(handler);}});                   

这里就简单讲下其思路,具体的分析放到Pipeline的分析里

这里的handler方法主要就是添加一个ChannelHandler , ChannelInitializer实现了ChannelHandler接口,看下它的源码

//这里的initChannel(ChannelHandlerContext ctx)方法调用了我们实现的initChannel方法
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.try {//先按照我们自己实现的initChannel方法向pipeline中添加各种handler或者编解码器initChannel((C) ctx.channel());} catch (Throwable cause) {// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).// We do so to prevent multiple calls to initChannel(...).exceptionCaught(ctx, cause);} finally {//调用ctx.pipeline().remove(this)方法将自己从ChannelPipeline 中删除remove(ctx);}return true;}return false;}//然后上述initChannel是在handlerAdded里被调用@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {// This should always be true with our current DefaultChannelPipeline implementation.// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers// will be added in the expected order.if (initChannel(ctx)) {// We are done with init the Channel, removing the initializer now.removeState(ctx);}}}

简单来说就是先将ChannelInitializer加入到Pipeline,然后在上述回调通知的handlerAdded()方法中,调用 initChannel()方法,从而调用我们实现的ChannelInitializer的initChannel()方法,这样就可以执行我们定义的一系列addLast操作,将各种 handler 添加到ChannelPipeline 中,最后调用 ctx.pipeline().remove(this)方法将ChannelInitializer从ChannelPipeline 中删除。

客户端发起Connect请求

按照前面的调用链

connect(String inetHost, int inetPort) --> connect(SocketAddress remoteAddress) --> doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) --> Bootstrap doResolveAndConnect0() --> Bootstrap doConnect()

private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {//获取对应的channelfinal Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {//在channel调用connect方法if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}

然后调用pipeline的connect方法,最后调用tail的connect方法,实际调用了AbstractChannelHandlerContext(TailContext的父类)的connect

 @Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {return tail.connect(remoteAddress, localAddress, promise);}
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (remoteAddress == null) {throw new NullPointerException("remoteAddress");}if (!validatePromise(promise, false)) {// cancelledreturn promise;}//从当前context(TailContext)向前遍历找到第一个outbound属性为true的contextfinal AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {//然后执行其invokeConnectnext.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, null);}return promise;}private AbstractChannelHandlerContext findContextOutbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.prev;} while (!ctx.outbound);return ctx;}

invokeConnect方法如下:

 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {connect(remoteAddress, localAddress, promise);}}

这里的connect最终会调用HeadContext的connect方法,然后调用了unsafe的connect方法(具体分析也放在Pipeline的分析里)

public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {unsafe.connect(remoteAddress, localAddress, promise);}

实际是调用了AbstractNioChannel里的connect方法

链路如下:

HeadContext connect()-->unsafte connect() -->AbstractNioChannel connect() --> AbstractNioChannel doConnect() --> NioSocketChannel doConnect()

这里就是对Java 的NIO api的调用了

  @Overrideprotected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {if (localAddress != null) {doBind0(localAddress);}boolean success = false;try {//建立连接,如果不成功就注册连接事件boolean connected = javaChannel().connect(remoteAddress);if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT);}success = true;return connected;} finally {if (!success) {doClose();}}}

至此客户端启动的基本流程就走完了

Netty学习笔记(一)Netty客户端源码分析相关推荐

  1. Nginx学习笔记(五) 源码分析内存模块内存对齐

    Nginx源码分析&内存模块 今天总结了下C语言的内存分配问题,那么就看看Nginx的内存分配相关模型的具体实现.还有内存对齐的内容~~不懂的可以看看~~ src/os/unix/Ngx_al ...

  2. 【SLAM学习笔记】12-ORB_SLAM3关键源码分析⑩ Optimizer(七)地图融合优化

    2021SC@SDUSC 目录 1.前言 2.代码分析 1.前言 这一部分代码量巨大,查阅了很多资料结合来看的代码,将分为以下部分进行分析 单帧优化 局部地图优化 全局优化 尺度与重力优化 sim3优 ...

  3. 【SLAM学习笔记】6-ORB_SLAM3关键源码分析④ Optimizer(一)单帧优化

    2021SC@SDUSC 目录 1.前言 2.代码分析 1.前言 Optimizer是非常重要的代码文件!! 这一部分代码量巨大,查阅了很多资料结合来看的代码,将分为以下部分进行分析 1. 单帧优化 ...

  4. 【SLAM学习笔记】11-ORB_SLAM3关键源码分析⑨ Optimizer(六)地图回环优化

    2021SC@SDUSC 目录 1.前言 2.代码分析 1.前言 这一部分代码量巨大,查阅了很多资料结合来看的代码,将分为以下部分进行分析 单帧优化 局部地图优化 全局优化 尺度与重力优化 sim3优 ...

  5. netty中的future和promise源码分析(二)

    前面一篇netty中的future和promise源码分析(一)中对future进行了重点分析,接下来讲一讲promise. promise是可写的future,从future的分析中可以发现在其中没 ...

  6. Android学习笔记-常用的一些源码,防止忘记了

    Android学习笔记-常用的一些源码,防止忘记了... 设置拨打电话 StringdialUri="tell:"+m_currentTelNumble; IntentcallIn ...

  7. Ceph 学习——OSD读写流程与源码分析(一)

    消息从客户端发送而来,之前几节介绍了 客户端下 对象存储.块存储库的实现以及他们在客户端下API请求的发送过程(Ceph学习--Librados与Osdc实现源码解析 . Ceph学习--客户端读写操 ...

  8. Eoe客户端源码分析---SlidingMenu的使用

    Eoe客户端源码分析及代码注释 使用滑动菜单SlidingMenu,单击滑动菜单的不同选项,可以通过ViewPager和PagerIndicator显示对应的数据内容. 0  BaseSlidingF ...

  9. grpc-go客户端源码分析

    grpc-go客户端源码分析 代码讲解基于v1.37.0版本. 和grpc-go服务端源码分析一样,我们先看一段示例代码, const (address = "localhost:50051 ...

  10. TeamTalk客户端源码分析七

    TeamTalk客户端源码分析七 一,CBaseSocket类 二,select模型 三,样例分析:登录功能 上篇文章我们分析了network模块中的引用计数,智能锁,异步回调机制以及数据的序列化和反 ...

最新文章

  1. 如何把后台返回数据的根据某个选项去重新排序?
  2. @JsonSerialize 与 @JsonDeserialize 使用
  3. js正则匹配闭合标签_正则表达式匹配封闭html标签
  4. php 子文件夹如何定义,php-子文件夹的重写规则
  5. pytorchgpu测试_pytorch学习(十)—训练并测试CNN网络
  6. servlet和action中获取URL中的汉字(解决URL中汉字为乱码的问题) .
  7. php扩展zval,PHP扩展开发(7):zval结构
  8. 群论在计算机应用技术,群论在计算机全领域中应用.ppt
  9. 深入浅出mysql_深入浅出MySQL读书笔记(一)
  10. opencv 修改图像像素
  11. TCP/IP网络编程之多进程服务端(一)
  12. 求一个n*n矩阵对角线元素之和C语言,求一个n*n矩阵主对角线之和,次对角线元素之和.用指针完成...
  13. github的博客搭建以及标签的自动化
  14. 【Java -- Map】
  15. 基于gmssl SM2 签名验签测试程序
  16. 60+CSS技巧教程资源大全
  17. java毕业设计项目源代码javaweb租车汽车租赁汽车出租管理系统
  18. BigDecumal
  19. 2022-2028年中国甲基三苯基溴化膦行业市场经营管理及投资机会分析报告
  20. java3D桌球7723_极品桌球3D豪华版

热门文章

  1. IDEA同时使用maven和gradle
  2. 3.2Python的循环结构语句:
  3. webpack2.x 中文文档 翻译 之 出口Output
  4. 抑郁症的前期体现有哪些
  5. ASP.NET MVC+EF框架+EasyUI实现权限管理系列(13)-权限设计
  6. Unknown media type in type ‘all/all’ 的解决办法
  7. VS2008如何添加 OLE/COM 对象查看器 .
  8. PAT (Basic Level) 1091 N-自守数(模拟+stl)
  9. Linux中高斯分布的参数设置,华为openGauss 配置操作系统参数
  10. matlab白噪声模块,matlab白噪声实现