一、概念

早期的 Java API 只支持由本地系统套接字库提供所谓的阻塞函数来支持网络编程由于是阻塞 I/O ,要管理多个并发客户端,需要为每个新的客户端Socket 创建一个 Thread 。这将导致一系列的问题,第一,在任何时候都可能有大量的线程处于休眠状态(不可能每时每刻都有对应的并发数);第二,需要为每个线程的调用栈都分配内存;第三,JVM 在线程的上下文切换所带来的开销会带来麻烦。

Java 在 2002 年引入了非阻塞 I/O,位于 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 实现的关键。它使用了事件通知以确定在一组非阻塞套接字中有哪些已经就绪能够进行 I/O 相关的操作。因为可以在任何的时间检查任意的读操作或者写操作的完成状态,所以如图 1-2 所示,一个单一的线程便可以处理多个并发的连接。

尽管可以直接使用 Java NIO API,但是在高负载下可靠和高效地处理和调度 I/O 操作是一项繁琐而且容易出错的任务,最好还是留给高性能的网络编程专家——Netty。

Netty 是一款异步的事件驱动的网络应用程序框架,支持快速的开发可维护的高性能的瞄向协议的服务端和客户端。它驾驭了Java高级API的能力,并将其隐藏在一个易于使用的API之后。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序性能的最大化和可伸缩性。其次, Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦,简化了开发过程, 同时也最大限度地提高了可测试性、模块化以及代码的可重用性。

tips:面向对象的基本概念—> 用较简单的抽象隐藏底层实现的复杂性。

二、核心组件

  • Channel

Channel是Java NIO的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。以下是常用的Channel:

-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel

  • 回调

    当一个回调被触发时,相应的事件可以被一个interface-ChannelHandler的实现处理。

  • Future

    Netty中所有的I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种在之后的某个时间点确定其结果的方法。

Future 和 回调 是相互补充的机制,提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。

Netty 提供了ChannelFuture,用于在执行异步操作的时候使用。每个Netty的出站I/O操作都会返回一个ChannelFuture。ChannelFuture能够注册一个或者多个ChannelFutureListener 实例。监听器的回调方法operationComplete(),将会在对应的操作完成时被调用。

  • ChannelHandler

    Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态,每个事件都可以被分发给ChannelHandler类中某个用户实现的方法。Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议的ChannelHandler。

现在,事件可以被分发给ChannelHandler类中某个用户实现的方法。那么,如果 ChannelHandler 处理完成后不直接返回给客户端,而是传递给下一个ChannelHandler 继续处理呢?那么就要说到 ChannelPipeline !

ChannelPipeline 提供了 ChannelHandler链 的容器,并定义了用于在该链上传播入站和出站事件流的API。使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些对象接收事件、执行他们所实现的处理逻辑,并将数据传递给链中的下一个ChannelHandler:

1、一个ChannelInitializer的实现被注册到了ServerBootstrap中。
2、当 ChannelInitializer.initChannel()方法被调用时, ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler。
3、ChannelInitializer 将它自己从 ChannelPipeline 中移除。

  • EventLoop

EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。

EventLoop本身只由一个线程驱动,其处理了一个Channel的所有I/O事件,并且在该EventLoop的整个生命周期内都不会改变。这个简单而强大的设计消除了你可能有的在ChannelHandler实现中需要进行同步的任何顾虑。

这里需要说到,EventLoop的管理是通过EventLoopGroup来实现的。还要一点要注意的是,客户端引导类是 Bootstrap,只需要一个EventLoopGroup。服务端引导类是ServerBootstrap,通常需要两个 EventLoopGroup,一个用来接收客户端连接,一个用来处理 I/O 事件(也可以只使用一个 EventLoopGroup,此时其将在两个场景下共用同一个 EventLoopGroup)。

1、一个 EventLoopGroup 包含一个或者多个 EventLoop;
2、一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
3、所有由 EventLoop 处理的 I/O 事件都将在它专有的Thread 上被处理;
4、一个 Channel 在它的生命周期内只注册于一个EventLoop;
5、NIO中,一个 EventLoop 分配给多个 Channel(面对多个Channel,一个 EventLoop 按照事件触发,顺序执行); OIO中,一个 EventLoop 分配给一个 Channel。

tips:Netty 应用程序的一个一般准则:尽可能的重用 EventLoop,以减少线程创建所带来的开销。

  • Bootstrap 和 ServerBootstrap

    BootStarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。

BootStrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。

ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

三、实例

所有的Netty服务端/客户端都至少需要两个部分:

1、至少一个ChannelHandler —— 该组件实现了对数据的处理。

2、引导 —— 这是配置服务器的启动代码。

服务端:

public classEchoServer {private final intport;public EchoServer(intport) {this.port =port;}public void start() throwsInterruptedException {final EchoServerHandler serverHandler = newEchoServerHandler();//1、创建EventLoopGroup以进行事件的处理,如接受新连接以及读/写数据EventLoopGroup group = newNioEventLoopGroup();try{//2、创建ServerBootstrap,引导和绑定服务器ServerBootstrap bootstrap = newServerBootstrap();bootstrap.group(group, group)//3、指定所使用的NIO传输Channel.channel(NioServerSocketChannel.class)//4、使用指定的端口设置套接字地址.localAddress(newInetSocketAddress(port))//5、添加一个 EchoServerHandler 到子 Channel的 ChannelPipeline//当一个新的连接被接受时,一个新的子Channel将会被创建,而 ChannelInitializer 将会把一个你的EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected voidinitChannel(SocketChannel ch) {ChannelPipeline pipeline=ch.pipeline();pipeline.addLast(serverHandler);}});//6、异步地绑定服务器,调用sync()方法阻塞等待直到绑定完成ChannelFuture channelFuture =bootstrap.bind().sync();System.out.println(EchoServer.class.getName() + "started and listening for connections on" +channelFuture.channel().localAddress());//7、获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
channelFuture.channel().closeFuture().sync();}finally{//8、关闭 EventLoopGroup 释放所有的资源
group.shutdownGracefully().sync();}}public static void main(String[] args) throwsInterruptedException {new EchoServer(9999).start();}
}

@ChannelHandler.Sharable //标识一个Channel-Handler 可以被多个Channel安全的共享
public class EchoServerHandler extendsChannelHandlerAdapter {/*** 对于每个传入的消息都要调用**@paramctx*@parammsg*@throwsException*/@Overridepublic voidchannelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in=(ByteBuf) msg;System.out.println("Server received:" +in.toString(CharsetUtil.UTF_8));//将接收到的消息写给发送者,而不冲刷出站消息//ChannelHandlerContext 发送消息。导致消息向下一个ChannelHandler流动//Channel 发送消息将会导致消息从 ChannelPipeline的尾端开始流动
ctx.write(in);}/*** 通知 ChannelHandlerAdapter 最后一次对channel-Read()的调用是当前批量读取中的最后一条消息**@paramctx*@throwsException*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throwsException {//暂存于ChannelOutboundBuffer中的消息,在下一次调用flush()或者writeAndFlush()方法时将会尝试写出到套接字//将这份暂存消息冲刷到远程节点,并且关闭该Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}/*** 在读取操作期间,有异常抛出时会调用**@paramctx*@paramcause*@throwsException*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {cause.printStackTrace();ctx.close();}}

EchoServerHandler.java

客户端:

public classEchoClient {private finalString host;private final intport;public EchoClient(String host, intport) {this.host =host;this.port =port;}public void start() throwsInterruptedException {EventLoopGroup group= newNioEventLoopGroup();try{//创建BootstrapBootstrap bootstrap = newBootstrap();//指定 EventLoopGroup 以处理客户端事件;适应于NIO的实现
bootstrap.group(group)//适用于NIO传输的Channel类型.channel(NioSocketChannel.class).remoteAddress(newInetSocketAddress(host, port))//在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throwsException {ch.pipeline().addLast(newEchoClientHandler());}});//连接到远程节点,阻塞等待直到连接完成ChannelFuture channelFuture =bootstrap.connect().sync();//阻塞,直到Channel 关闭
channelFuture.channel().closeFuture().sync();}finally{//关闭线程池并且释放所有的资源
group.shutdownGracefully().sync();}}public static void main(String[] args) throwsInterruptedException {new EchoClient("127.0.0.1", 9999).start();System.out.println("------------------------------------");new EchoClient("127.0.0.1", 9999).start();System.out.println("------------------------------------");new EchoClient("127.0.0.1", 9999).start();}}

@ChannelHandler.Sharable //标记该类的实例可以被多个Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{/*** 当从服务器接收到一条消息时被调用**@paramctx*@parammsg ByteBuf (Netty 的字节容器) 作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序接收*@throwsException*/@Overrideprotected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throwsException {System.out.println("Client" + ctx.channel().remoteAddress() + "connected");System.out.println(msg.toString(CharsetUtil.UTF_8));}/*** 在到服务器的连接已经建立之后将被调用**@paramctx*@throwsException*/@Overridepublic voidchannelActive(ChannelHandlerContext ctx)  {ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));}/*** 在处理过程中引发异常时被调用**@paramctx*@paramcause*@throwsException*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {cause.printStackTrace();ctx.close();}}

EchoClientHandler.java

四、结语

带着一阵迷糊就开始了Netty学习之旅,学到现在还是对Netty一堆专有名词头大!没办法,只好硬着头皮学下去了,毕竟,熟读唐诗三百首,不会作诗也会吟嘛!

来总结下,一个Netty服务端处理客户端连接的过程:

1、创建一个channel同该用户端进行绑定;
2、channel从EventLoopGroup获得一个EventLoop,并注册到该EventLoop,channel生命周期内都和该EventLoop在一起(注册时获得selectionKey);
3、channel同用户端进行网络连接、关闭和读写,生成相对应的event(改变selectinKey信息),触发eventloop调度线程进行执行;
4、ChannelPipeline 找到对应 ChannelHandler 方法处理用户逻辑。

我们项目中使用的 Netty 服务端启动类:

public classNettyServer {public static final Logger logger = LoggerFactory.getLogger(NettyServer.class);private static Integer LISTENER_PORT = PropertiesLoader.getResourcesLoader().getInteger("nettyPort");private intport;EventLoopGroup boss= null;EventLoopGroup worker= null;ServerBootstrap serverBootstrap= null;public static NettyServer nettyServer = null;public staticNettyServer getInstance() {if (nettyServer == null) {synchronized (NettyServer.class) {if (nettyServer == null) {nettyServer= new NettyServer(LISTENER_PORT==null?9999:LISTENER_PORT);}}}returnnettyServer;}/*** 构造函数**@paramport 端口*/private NettyServer(intport) {this.port =port;}/*** 绑定**@throwsInterruptedException*/public void init() throwsInterruptedException {try{//创建两个线程池//目前服务器CPU为单核8线程,调整线程为8boss = new NioEventLoopGroup(8);worker= new NioEventLoopGroup(8);serverBootstrap= newServerBootstrap();serverBootstrap.group(boss, worker);//两个工作线程serverBootstrap.channel(NioServerSocketChannel.class);//重用缓冲区
serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);//自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费
serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);//当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度,默认值50。serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);//用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false。serverBootstrap.option(ChannelOption.TCP_NODELAY, true);//是否启用心跳保活机制serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//支持tcp协议//bootstrap.childHandler(new TcpChannelInitializer());//支持webSocket协议serverBootstrap.childHandler(newWebSocketChannelInitializer());ChannelFuture f=serverBootstrap.bind(port).sync();if(f.isSuccess()) {logger.info("netty server start...");}//等到服务端监听端口关闭
f.channel().closeFuture().sync();}finally{//优雅释放线程资源
boss.shutdownGracefully().sync();worker.shutdownGracefully().sync();}}/*** 销毁netty相关资源*/public voiddestroy() {try{if (boss != null) {boss.shutdownGracefully();}if (worker != null) {worker.shutdownGracefully();}if (serverBootstrap != null) {serverBootstrap= null;}}catch(Exception e) {logger.error("netty close err:" +e.getMessage(), e);}}
}

NettyServer.java

tips: ServerBootstrap  中增加了一个方法childHandler(),它的目的是添加 ChannelHandler ;Bootstrap 中添加 ChannelHandler 用 handler() 方法。

参考资料:《Netty IN ACTION》

演示源代码:https://github.com/JMCuixy/NettyDemo

转载于:https://www.cnblogs.com/jmcui/p/9154842.html

Netty 系列一(核心组件和实例).相关推荐

  1. java channel源码_彤哥说netty系列之Java NIO核心组件之Channel

    你好,我是彤哥,本篇是netty系列的第五篇. 欢迎来我的工从号彤哥读源码系统地学习源码&架构的知识. 简介 上一章我们一起学习了如何使用Java原生NIO实现群聊系统,这章我们一起来看看Ja ...

  2. java nio attachment_7. 彤哥说netty系列之Java NIO核心组件之Selector

    --日拱一卒,不期而至! 你好,我是彤哥,本篇是netty系列的第七篇. 简介 上一章我们一起学习了Java NIO的核心组件Buffer,它通常跟Channel一起使用,但是它们在网络IO中又该如何 ...

  3. java channel源码_5. 彤哥说netty系列之Java NIO核心组件之Channel

    你好,我是彤哥,本篇是netty系列的第五篇. 简介 上一章我们一起学习了如何使用Java原生NIO实现群聊系统,这章我们一起来看看Java NIO的核心组件之一--Channel. 思维转变 首先, ...

  4. 6. 彤哥说netty系列之Java NIO核心组件之Buffer

    --日拱一卒,不期而至! 你好,我是彤哥,本篇是netty系列的第六篇. 简介 上一章我们一起学习了Java NIO的核心组件Channel,它可以看作是实体与实体之间的连接,而且需要与Buffer交 ...

  5. 【Netty系列_3】Netty源码分析之服务端channel

    highlight: androidstudio 前言 学习源码要有十足的耐性!越是封装完美的框架,内部就越复杂,源码很深很长!不过要抓住要点分析,实在不行多看几遍,配合debug,去一窥优秀框架的精 ...

  6. netty系列之:自定义编码和解码器要注意的问题

    文章目录 简介 自定义编码器和解码器的实现 ReplayingDecoder 总结 简介 在之前的系列文章中,我们提到了netty中的channel只接受ByteBuf类型的对象,如果不是ByteBu ...

  7. netty系列之:netty架构概述

    文章目录 简介 netty架构图 丰富的Buffer数据机构 零拷贝 统一的API 事件驱动 其他优秀的特性 总结 简介 Netty为什么这么优秀,它在JDK本身的NIO基础上又做了什么改进呢?它的架 ...

  8. 【读后感】Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ?

    [读后感]Netty 系列之 Netty 高性能之道 - 相比 Mina 如何 ? 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商 ...

  9. BizTalk学习笔记系列之二:实例说明如何使用BizTalk

    BizTalk学习笔记系列之二:实例说明如何使用BizTalk --.BizTalk学习笔记系列之二<?XML:NAMESPACE PREFIX = O /> Aaron.Gao,2006 ...

  10. Netty系列(三):说说NioEventLoop

    前言 本来想先写下NioServerSocketChannel以及NioSocketChannel的注册流程的,但是最后发现始终离不开NioEventLoop这个类,所以在这之前必须得先讲解下NioE ...

最新文章

  1. 面试题3-二维数组中的查找
  2. 从移动端开发者的角度聊微软的困境和机会
  3. C#事件回调委托EventHandler
  4. osg 倾斜数据纹理_高科技构筑逼真效果——无人机倾斜摄影技术在实景三维建模的应用及展望...
  5. WebFlux响应式编程基础之 4 reactive stream 响应式流
  6. c语言错误spawning,C语言一直出现Error spawning cl.exe的解决办法
  7. Code Review 是一场苦涩但有意思的修行 | 凌云时刻
  8. pc微信登录扫码显示无法连接服务器,WeAuth微信小程序实现PC网站扫码授权登录...
  9. 计算机老年学校讲义,天津老年大学计算机类教学大纲
  10. farm ugly chicken kick(fuck)这个计算机词汇的由来
  11. mysql客户端字符集_设置MySQL客户端连接使用的字符集
  12. JavaScript进阶讲解六—>js函数式编程
  13. TDD测试驱动开发案例【水货】
  14. 矩阵乘法np.dot()及np.multipy()区别
  15. idea 设置字体大小
  16. 【USB】STM32模拟USB鼠标
  17. html网页怎么弄背景 图片,在html网页中如何设置背景图片?网页背景怎么设置?...
  18. gimtehseet工时管理系统介绍
  19. 区块链技术具体要用到什么开发语言?
  20. java.lang.NoSuchMethodError: org.springframework.http.MediaType.getCharSet()Ljava/nio/charset/Charse

热门文章

  1. SQL基础--完整性约束
  2. Codeforces Beta Round #9 (Div. 2 Only) C. Hexadecimal's Numbers dfs
  3. 软考中高项学员:2016年3月14日作业
  4. 以太网单播、组播、广播
  5. 读《c#与算法--快速排序》随笔
  6. Mysql 获取当月和上个月第一天和最后一天的SQL
  7. 跳出误区:Java程序员进阶架构师真的没你想象的那么简单......
  8. 程序员们请收好这本JVM日历:Java 2018大事回顾
  9. 【Android学习笔记】设置App启动页
  10. Window10+VS2015+DevExpress.net 15.1.7完美破解(图)