文章目录

  • 一、 Netty 模型
  • 二、 异步模型
  • 三、 Future-Listener 机制
  • 四、 Future-Listener 机制代码示例

一、 Netty 模型


以服务器端为例

1 . 线程池 : Netty 模型核心就是两个线程池 , BossGroup 线程池 和 WorkerGroup 线程池 ;

① BossGroup 线程池 : 负责维护客户端连接操作 ;

② WorkerGroup 线程池 : 负责与客户端进行数据交互 ;

③ 线程池类型 : 上述两个线程池 ( BossGroup / WorkerGroup ) 都是 NioEventLoopGroup 类型的 ;

④ 线程池中的线程 : NioEventLoopGroup 线程池中维护了多个 NioEventLoop 线程 ;

2 . 线程池中的线程 : NioEventLoopGroup 线程池中维护了若干 NioEventLoop 线程 , 这相当于主从反应器 ( Reactor ) 模型中的反应器 , 每个 NioEventLoop 中都有一个 选择器 ( Selector ) , 用于监听 Socket IO 事件 , 如 建立连接 , 数据读写 等 ;

3 . NioEventLoop 工作流程 :

NioEventLoop 中可以按照一定顺序进行数据处理 , 如数据到来后 , 按照下面的流程执行一系列操作 ;

读取数据 -> 数据解码 -> 业务逻辑处理 -> 数据编码 -> 数据发送

4 . NioEventLoop 中封装内容 :

  • 选择器 Selector
  • 任务队列 TaskQueue
  • 调度任务队列 ScheduleTaskQueue
  • NIO 通道 NioChannel
  • 管道 ChannelPipeline

上面是 Netty 的模型的总体架构 , 下面重点介绍 Netty 模型中的异步模型 , Netty 中的每次绑定端口 , 连接远程端口 , 读写数据都要涉及到异步操作 ;

二、 异步模型


1 . 异步操作概念 : 调用者调用一个异步操作后 , 并不能马上知道该操作的返回值 , 该操作也不会马上执行完成 , 该操作完成后 , 会通过回调机制 , 如 通知 , 注册的回调函数等机制通知调用者 ;

2 . Netty 中的异步操作与 ChannelFuture 返回值 :

① 异步操作 : Netty 模型中凡是关于 IO 的操作 , 如绑定端口 ( Bind ) , 远程连接 ( Connect ) , 读取数据 ( Read ) , 写出数据 ( Write ) 等操作都是异步操作 ;

② 异步操作返回值 : 上述 IO 操作返回值都是 ChannelFuture 类型实例 , ChannelFuture 是异步 IO 操作的返回结果 ;

③ 在服务器端绑定端口号时 , 调用 Bootstrap 的 bind 方法 , 会返回 ChannelFuture 对象 ;

④ 在客户端调用 Bootstrap 的 connect 方法 , 也会返回 ChannelFuture 对象 ;

3 . Netty 中的异步操作机制 :

① Future-Listener 机制 : Future 表示当前不知道结果 , 在未来的某个时刻才知道结果 , Listener 表示监听操作 , 监听返回的结果 ;

② Netty 异步模型的两个基础 : Future ( ChannelFuture 未来知道结果 ) , Callback ( 监听回调 ) ;

4 . 以客户端写出数据到服务器端为例 :

客户端写出数据 : 客户端调用写出数据方法 ChannelFuture writeAndFlush(Object msg) , 向服务器写出数据 ;

操作耗时 : 假设在服务器中接收到该数据后 , 要执行一个非常耗时的操作才能返回结果 , 就是操作非常耗时 ;

客户端不等待 : 客户端这里写出了数据 , 肯定不能阻塞等待写出操作的结果 , 需要立刻执行下面的操作 , 因此该方法是异步的 ;

客户端监听 : writeAndFlush 方法返回一个 ChannelFuture 对象 , 如果客户端需要该操作的返回结果 , 那么通过 ChannelFuture 可以监听该写出方法是否成功 ;

5 . 异步操作返回结果 :

① 返回结果 : Future 表示异步 IO 操作执行结果 , 通过该 Future 提供的 检索 , 计算 等方法检查异步操作是否执行完成 ;

② 常用接口 : ChannelFuture 继承了 Future , 也是一个接口 , 可以为该接口对象注册监听器 , 当异步任务完成后会回调该监听器方法 ;

public interface ChannelFuture extends Future<Void>

6 . Future 链式操作 : 这里以读取数据 , 处理后返回结果为例 ;

  • 数据读取操作 ;

  • 对读取的数据进行解码处理 ;

  • 执行业务逻辑

  • 将数据编码 ;

  • 将编码后的数据写出 ;

上述 555 个步骤 , 每个数据处理操作 , 都有与之对应的 Handler 处理器 ;

异步机制 : 在 Handler 处理器中需要实现异步机制 , 一般使用 Callback 回调 , 或 Future 机制 ;

链式操作优势 : 上述的链式操作 , 简洁 , 高效 , 可以让开发者快速开发高性能 , 高可靠性服务器 , 只关注业务逻辑 , 不用过多的将精力浪费在网络基础功能开发上 ;

这里的网络基础功能就是高可靠性 , 高性能的网络传输模块 ;

三、 Future-Listener 机制


1 . Future-Listener 机制 :

① Future 返回值 : 在 Netty 中执行 IO 操作 , 如 bind , read , write , connect 等方法 , 会立刻返回 ChannelFuture 对象 ;

② ChannelFuture 返回时状态 : 调用 IO 方法后 , 立刻返回 ChannelFuture 对象 , 此时该操作未完成 ;

③ 注册监听器 : ChannelFuture 可以设置 ChannelFutureListener 监听器 , 监听该 IO 操作完成状态 , 如果 IO 操作完成 , 那么会回调其 public void operationComplete(ChannelFuture future) throws Exception 接口实现方法 ;

④ IO 操作执行状态判定 : 在 operationComplete 方法中通过 调用 ChannelFuture future 参数的如下方法 , 判定当前 IO 操作完成状态 ;

  • future.isDone() : IO 操作是否完成 ;
  • future.isSuccess() : IO 操作是否成功 ; ( 常用 )
  • future.isCancelled() : IO 操作是否被取消 ;
  • future.cause() : IO 操作的失败原因 ;

2 . IO 操作的同步与异步 :

① 同步 IO 操作 : BIO 中的同步 IO 操作 , 会阻塞当前的线程 , IO 操作返回前 , 处于阻塞状态 , 不能执行其它操作 ;

② 异步 IO 操作 : 异步 IO 操作不会阻塞当前的线程 , 调用 IO 操作之后 , 可以立即执行其它操作 , 不会阻塞当前线程 , 该机制非常适用于高并发的场景 , 开发稳定 , 并发 , 高吞吐量 的服务器 ;

3 . 核心代码示例 :

// 监听绑定操作的结果
// 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isDone()){System.out.println("绑定端口完成");}if(future.isSuccess()){System.out.println("绑定端口成功");}else{System.out.println("绑定端口失败");}if(future.isCancelled()){System.out.println("绑定端口取消");}System.out.println("失败原因 : " + future.cause());}
});

四、 Future-Listener 机制代码示例


1 . 代码示例 : 这里以服务器程序为例 , 客户端程序就不贴了 ;

package kim.hsl.netty;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Netty 案例服务器端*/
public class Server {public static void main(String[] args) {// 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程//     NioEventLoop 线程中执行无限循环操作// BossGroup 线程池 : 负责客户端的连接// 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1EventLoopGroup bossGroup = new NioEventLoopGroup(1);// WorkerGroup 线程池 : 负责客户端连接的数据读写EventLoopGroup workerGroup = new NioEventLoopGroup();// 2. 服务器启动对象, 需要为该对象配置各种参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor.channel(NioServerSocketChannel.class)  // 设置 NIO 网络套接字通道类型.option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列维护的连接个数.childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置连接状态行为, 保持连接状态.childHandler(  // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handlernew ChannelInitializer<SocketChannel>() {// 创建通道初始化对象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 该方法在服务器与客户端连接建立成功后会回调// 为 管道 Pipeline 设置处理器 Hanedler// 这里暂时设置为 null , 执行不会失败 , 服务器绑定端口会成功ch.pipeline().addLast(null);}});System.out.println("服务器准备完毕 ...");ChannelFuture channelFuture = null;try {// 绑定本地端口, 进行同步操作 , 并返回 ChannelFuturechannelFuture = bootstrap.bind(8888).sync();System.out.println("服务器开始监听 8888 端口 ...");// ( 本次示例核心代码 ) ----------------------------------------------------------// 监听绑定操作的结果 ( 本次示例核心代码 )// 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if(future.isDone()){System.out.println("绑定端口完成");}if(future.isSuccess()){System.out.println("绑定端口成功");}else{System.out.println("绑定端口失败");}if(future.isCancelled()){System.out.println("绑定端口取消");}System.out.println("失败原因 : " + future.cause());}});// ( 本次示例核心代码 ) ----------------------------------------------------------// 关闭通道 , 开始监听操作channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 出现异常后, 优雅的关闭bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

2 . 执行结果 : 执行上述服务器 , 由此可见 绑定 bind 操作执行完成 , 并且执行成功 , 没有失败 , 因此失败原因为 null ;

【Netty】Netty 异步任务模型 及 Future-Listener 机制相关推荐

  1. 【Netty】利用Netty实现心跳检测和重连机制

    一.前言 心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制.   我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维 ...

  2. Netty(一) SpringBoot 整合长连接心跳机制

    https://github.com/crossoverJie/JCSprout 原创: crossoverJie 阅读原文 前言 Netty 是一个高性能的 NIO 网络框架,本文基于 Spring ...

  3. JavaScript学习笔记(五)---cookie、Proxy、服务器、PHP语言、http协议、同步异步、事件轮循机制、ajax编写、接口

    JavaScript学习笔记(五)---cookie.Proxy.服务器.PHP语言.http协议.同步异步.事件轮循机制.ajax编写.接口 1.cookie 1.1cookie概念 1.2cook ...

  4. tt协议号服务器,TTIot: TTIoT云端物联网Iot组件;面向JAVA;netty;mqtt;异步推送;以事件为驱动;为设备提供安全可靠的连接通信能力;...

    TTIoT云端物联网组件;面向JAVA;以事件为驱动;为设备提供安全可靠的连接通信能力 TTIoT简介 TTIOT的Broker采用MQTT协议与设备进行交互,可以应用在数据采集.能源监控.智能生活. ...

  5. netty(异步非阻塞、实时、高效率)

    1.为什么使用netty 简单,再也不用编写复杂的代码逻辑去实现通信,再也不用考虑性能问题,不需要考虑编解码问题,半包读写问题. netty运用于Hadoop的RPC框架Avro,JMS框架Rocke ...

  6. 【Netty】 异步任务调度 ( TaskQueue | ScheduleTaskQueue | SocketChannel 管理 )

    文章目录 一. 任务队列 TaskQueue 二. 处理器 Handler 同步异步操作 三. 异步任务 ( 用户自定义任务 ) 四. 异步任务 ( 用户自定义定时任务 ) 五. 异步任务 ( 其它线 ...

  7. 通过 netty 实现异步任务回调获取执行结果

    目前java 8 之前的异步并发编程的API(callable.future.futuretask),都有一个共同的问题,就是要获取结果必须阻塞等待.netty提供了一种通过回调的方式获得结果的办法: ...

  8. Netty学习笔记(六)Pipeline的传播机制

    前面简单提到了下Pipeline的传播机制,这里再详细分析下 Pipeline的传播机制中有两个非常重要的属性inbound和outbound(AbstractChannelHandlerContex ...

  9. Netty -Netty心跳检测机制案例,Netty通过WebSocket编程实现服务器和客户端长链接

    Netty心跳检测机制案例 案例要求 编写一个Netty心跳检测机制案例,当服务器超过3秒没有读时,就提示读空闲 当服务器超过5秒没有写操作时,提示写空闲 服务器超过7秒没有读或者写操作时,就提示读写 ...

最新文章

  1. JavaScript 表单编程
  2. 用户体验改善案例_改善用户体验研究的5种习惯
  3. java.lang.UnsupportedOperationException 异常分析
  4. 读书感悟之,从术到道
  5. phpcms 指定id范围 调用_Dubbogo 源码笔记(二)客户端调用过程
  6. iptables介绍与实践
  7. 您的计算机性能不足 无法运行,绝地求生进不去游戏提示运行引擎需要DX11特性等级10.0,大神救我,刚做的系统W10 64位的,游戏也是刚下的...
  8. Unity中制作图片字体
  9. 受众引擎亮相百度世界 联盟产品进入3.0时代
  10. html肤质测试,皮肤致敏试验
  11. java三色球问题_C语言三色球问题代码解析
  12. android 图片处理过程中添加进度条,[Android] 随时拍图像处理部分总结及源码分......
  13. 华尔街最“伟大”骗子排行榜!
  14. 游戏策划小白笔记——Common Sense(一)
  15. ubuntu16.04下ORB_SLAM2的配置
  16. 步进电机和步进驱动器的介绍、接线、细分和控制方法
  17. Not Shading
  18. 【原创】ARM LINUX 外部RTC实时时钟驱动移植(RX8025)
  19. 一文秒懂什么是DDoS攻击
  20. in与exist的区别

热门文章

  1. HTML5 应用的现状与前景
  2. Python学习之路-12 (递归)
  3. 5.1深入理解计算机系统——系统级I/O
  4. java追加文本到文件末尾
  5. login控件“您的登录尝试不成功。请重试”的解决方法
  6. SC-控制Windows服务的命令
  7. 关于.h .lib .dll的总结
  8. 阻止表中出现重复项——SQL UNIQUE 约束
  9. 多分辨率图像的快速查询
  10. sqlserver中实现split分割字符串函数