Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列
文章目录
- 1 Netty线程模型
- 1.1 传统阻塞 I/O 服务模型
- 1.2 Reactor线程模型
- 1.2.1 单 Reactor 单线程模型
- 1.2.2 单Reactor多线程
- 1.2.3 主从 Reactor 多线程
- 1.2.4 Reactor线程模型小结
- 1.3 Netty线程模型
- 1.3.1 简单版Netty模型
- 1.3.2 进阶版Netty模型
- 1.3.3 详细版Netty模型
- 2 Netty快速入门案例-TCP服务
- 2.1 服务端代码实现
- 2.2 客户端代码实现
- 3 Netty任务队列
- 3.1 用户自定义的普通任务
- 3.2 用户自定义定时任务
1 Netty线程模型
不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先需要了解下 各个线程模 式, 最后看看 Netty 线程模型有什么优越性。目前存在的线程模型有:
- 传统阻塞
I/O
服务模型 Reactor
模式Reactor
单线程;- 单
Reactor
多线程; - 主从
Reactor
多线程
1.1 传统阻塞 I/O 服务模型
- 模型特点
- 采用阻塞 IO 模式获取输入的数据
- 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回
- 存在问题
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费
1.2 Reactor线程模型
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
- 基于
I/O
复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理 - 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
IO复用结合线程池,就是Reactor模式的基本设计思想。
Reactor 模式中核心组成
Reactor
:Reactor
在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO
事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;Handlers
:处理程序执行I/O
事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor
通过调度适当的处理程序来响应I/O
事件,处理程序执行非阻塞操作。
1.2.1 单 Reactor 单线程模型
处理流程
- Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求 ;
- Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发 ;
- 建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对 象处理连接完成后的后续业务处理 ;
- Handler 会完成 Read→业务处理→Send 的完整业务流程。
优缺点
- 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
- 缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况
1.2.2 单Reactor多线程
- 处理流程
- Reactor 对象通过 Select 监控客户端请求事件,收到事件后,通过 Dispatch 进行分发
- 如果建立连接请求,则右 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理完成连接后的各种事件
- 如果不是连接请求,则由 Reactor 分发调用连接对应的 handler 来处理
- handler 只负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
- worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
- handler 收到响应后,通过 send 将结果返回给 client
- 优缺点
- 优点:可以充分的利用多核 cpu 的处理能力
- 缺点:多线程数据共享和访问比较复杂,Reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。
1.2.3 主从 Reactor 多线程
- 处理流程
Reactor
主线程 MainReactor 对象通过 select 监听连接事件,收到事件后,通过 Acceptor 处理连接事件- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor
- subreactor 将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理
- 当有新事件发生时,subreactor 就会调用对应的 handler 处理
- handler 通过 read 读取数据,分发给后面的 worker 线程处理
- worker 线程池分配独立的 worker 线程进行业务处理,并返回结果
- handler 收到响应的结果后,再通过 send 将结果返回给 client
- Reactor 主线程可以对应多个 Reactor 子线程,即 MainRecator 可以关联多个 SubReactor
1.2.4 Reactor线程模型小结
- 3中线程模型生活场景类比
- 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服务
- 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
- 主从 Reactor 多线程,多个前台接待员,多个服务生
- Reactor 模式具有如下的优点
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
- 扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
- 复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性
1.3 Netty线程模型
Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。
1.3.1 简单版Netty模型
BossGroup
线程维护Selector
,只关注Accecpt
- 当接收到
Accept
事件,获取到对应的SocketChannel
,封装成NIOScoketChannel
并注册到Worker
线程(事件循环),并进行维护 - 当
Worker
线程监听到Selector
中通道发生自己感兴趣的事件后,就进行处理(就由handler
),注意handler
已经加入到通道
1.3.2 进阶版Netty模型
- 有两组线程池:
BossGroup
和WorkerGroup
,BossGroup
中的线程专门负责和客户端建立 连接,WorkerGroup
中的线程专门负责处理连接上的读写,BossGroup
和WorkerGroup
类型都是NioEventLoopGroup
; BossGroup
和WorkerGroup
含有多个不断循环的执行事件处理的线程,每个线程都包含一 个 Selector,用于监听注册在其上的 Channel;- 每个
BossGroup
中的线程循环执行以下三个步骤- 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到 WorkerGroup 中某个线程上的 Selector 上
- 处理任务队列的任务,即
runAllTasks
- 每个 WorkerGroup 中的线程循环执行以下三个步骤
- 轮询
read
,write
事件 - 处理
I/O
事件,即read
,write
事件,在对应NioScocketChannel
处理 - 处理任务队列的任务,即
runAllTasks
- 轮询
- 每个
Worker
NIOEventLoop
处理业务时,会使用pipeline
,pipeline
中包含了channel
,即通过pipeline
可以获取到对应通道,管道中维护了很多的处理器
1.3.3 详细版Netty模型
- Netty 抽象出两组线程池:BossGroup 和 WorkerGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的 线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是
NioEventLoopGroup
NioEventLoopGroup
相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就 是一个 NioEventLooNioEventLoop
表示一个不断循环的执行处理任务的线程,每个NioEventLoop
都有一个Selector
,用于监听绑定在其上的socket
的网络通讯NioEventLoopGroup
可以有多个线程,即可以含有多个NioEventLoop
- 每个
BossNioEventLoop
循环执行的步骤有3
步:- 轮询
accept
事件 - 处理
accept
事件,与client
建立连接,生成NioScocketChannel
,并将其注册到某个worker
NIOEventLoop
上的Selector
- 处理任务队列的任务,即
runAllTasks
- 轮询
- 每个
WorkerNIOEventLoop
循环执行的步骤:- 轮询
read
,write
事件 - 处理
I/O
事件,即read
,write
事件,在对应NioScocketChannel
处理 - 处理任务队列的任务,即
runAllTasks
- 轮询
- 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器 (拦截处理器、过滤处理器、自定义处理器等)。
2 Netty快速入门案例-TCP服务
2.1 服务端代码实现
package com.warybee.simple;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @description Netty开发一个服务端*/
public class NettyServer {public static void main(String[] args) throws InterruptedException {//1.创建bossGroup线程组: 处理网络连接事件 线程数默认为: 2 * 处理器线程数EventLoopGroup bossGroup = new NioEventLoopGroup();//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数EventLoopGroup workerGroup = new NioEventLoopGroup();try {//3.创建服务端启动助手ServerBootstrap bootstrap=new ServerBootstrap();//4.设置线程组bootstrap.group(bossGroup,workerGroup)//5.设置服务端通道实现.channel(NioServerSocketChannel.class)//6.参数设置-设置线程队列中等待连接个数.option(ChannelOption.SO_BACKLOG,128)//7.参数设置-设置活跃状态,child是设置workerGroup.childOption(ChannelOption.SO_KEEPALIVE,true)8.创建一个通道初始化对象.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//9.向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new NettyServerHandler());}});System.out.println("服务端就绪");//10.启动服务端并绑定端口,同时将异步改为同步ChannelFuture channelFuture = bootstrap.bind(9999).sync();//11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
Netty服务端自定义Handler
package com.warybee.simple;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;/*** @description 自定义服务端handle*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 读取客户端消息* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("server ctx:"+ctx);ByteBuf byteBuffer=(ByteBuf)msg;Channel channel = ctx.channel();ChannelPipeline pipeline = ctx.pipeline();System.out.println("客户端发送的消息是:"+byteBuffer.toString(StandardCharsets.UTF_8));System.out.println("客户端地址:"+ctx.channel().remoteAddress());}/*** 数据读取完成* @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//发送消息给客户端ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",StandardCharsets.UTF_8));}/*** 通道就绪* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}/*** 异常* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
2.2 客户端代码实现
package com.warybee.simple;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;/*** @description*/
public class NettyClient {public static void main(String[] args) throws InterruptedException {//创建线程组EventLoopGroup eventExecutors = new NioEventLoopGroup();try {//创建客户端启动助手Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventExecutors).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler());}});//启动客户端, 等待连接服务端,ChannelFuture cf = bootstrap.connect("127.0.0.1", 9999).sync();//关闭通道和关闭连接池cf.channel().closeFuture().sync();}finally {eventExecutors.shutdownGracefully();}}
}
Netty客户端自定义Handler
package com.warybee.simple;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.nio.charset.StandardCharsets;/*** @description Netty客户端自定义Handler*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 通道就绪* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client "+ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", StandardCharsets.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf=(ByteBuf) msg;System.out.println("服务端回复的消息:"+byteBuf.toString(StandardCharsets.UTF_8));System.out.println("服务器地址:"+ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
3 Netty任务队列
在使用Netty的时候,我们的业务处理都是放到我们自定义的handler里面,那么如果handler里面有一些执行比较耗时的操作的话,依旧会出现线程阻塞的情况,那么怎么来处理呢?
我们可以回过头去看看Netty的模型图,里面有一块是TaskQueue,这个就是Netty提供给我们的任务队列,可以用来异步处理任务,它是和channel是一一绑定的。
3.1 用户自定义的普通任务
在定义Handler里面通过ChannelHandlerContext
获取channel
/*** 通道就绪* @param ctx* @throws Exception*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {//TODO 这里可以执行耗时任务ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));}});System.out.println("client "+ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", StandardCharsets.UTF_8));
}
3.2 用户自定义定时任务
任务是提交到 scheduleTaskQueue
中
/*** 通道就绪* @param ctx* @throws Exception*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {//用户自定义定时任务,5秒后执行ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {try {//TODO 这里可以执行耗时任务ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));System.out.println("channel code=" + ctx.channel().hashCode());} catch (Exception ex) {System.out.println("发生异常" + ex.getMessage());}}}, 5, TimeUnit.SECONDS);
}
Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列相关推荐
- Netty实战七之EventLoop和线程模型
简单地说,线程模型指定了操作系统.编程语言.框架或者应用程序的上下文中的线程管理的关键方面.Netty的线程模型强大但又易用,并且和Netty的一贯宗旨一样,旨在简化你的应用程序代码,同时最大限度地提 ...
- 2. 彤哥说netty系列之IO的五种模型
你好,我是彤哥,本篇是netty系列的第二篇. 欢迎来我的公从号彤哥读源码系统地学习源码&架构的知识. 简介 本文将介绍linux中的五种IO模型,同时也会介绍阻塞/非阻塞与同步/异步的区别. ...
- Netty工作笔记0034---Netty架构设计--线程模型
技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152
- Netty系列之Netty线程模型
关注点在于:如何灵活的动态绑定IO事件处理,又能进行串行化处理减少锁的使用 摘自:http://www.infoq.com/cn/articles/netty-threading-model 1. 背 ...
- Netty学习(三):Netty线程模型和代码示例
〇.前言 网络编程的基本线程模型,详见:Netty学习(二):线程模型 一.工作原理简图 Netty主要基于主从 Reactors 多线程模型(如下图) 做了一定的改进,其中主从Reactor 多线程 ...
- Netty和RPC框架线程模型分析
<Netty 进阶之路>.<分布式服务框架原理与实践>作者李林锋深入剖析Netty和RPC框架线程模型.李林锋已在 InfoQ 上开设 Netty 专题持续出稿,感兴趣的同学可 ...
- Netty 和 RPC 框架线程模型分析
https://www.infoq.cn/article/9Ib3hbKSgQaALj02-90y 1. 背景 1.1 线程模型的重要性 对于 RPC 框架而言,影响其性能指标的主要有三个要素: I/ ...
- JavaSocket编程之Netty框架线程模型
1.Netty概述 Netty是一个由JBoss提供的高效的Java NIO client-server(客户端-服务器)开发框架,使用Netty可以快速开发网络应用.Netty提供了一种新的方式来使 ...
- reactor线程模型_简单了解Java Netty Reactor三种线程模型
1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接 ...
最新文章
- R筛选dataframe时间范围内的数据
- (摘抄)HTTP 协议详解
- Android静态安全检测 - WebView明文存储密码
- puppeteer执行js_使用Node.js和Puppeteer与表单和网页进行交互– 2
- Keil(MDK-ARM-STM32)系列教程(五)Configuration(Ⅰ)
- 【BZOJ 2957】 2957: 楼房重建 (线段树)
- linux创建删除用户和用户组
- STM8S103之IO复用
- python re.match函数的使用
- HTML+CSS+JavaScript仿京东购物商城网站 web前端制作服装购物商城 html电商购物网站...
- 图像分割阈值选取技术综述
- java学生管理系统购买_GitHub - Xiaoxin-love/StudentSystem: java学生管理系统
- Pytorch简单实现seq2seq+Attention机器人问答
- 深度学习——手写数字识别
- 微信公众号开发----生成带参数的临时二维码
- RISC-V指令集架构------RV32C压缩指令集
- 让curl支持IE代理
- QQ春节红包活动如何应对10亿级流量?看看大佬的复盘总结
- MySQL中文乱码问题处理详解
- 谷歌支付:无法购买您要买的商品。
热门文章
- 线上教育系统平台,企业如何才能运营呢?
- Python中turtle的用法(听课笔记)
- python 伪造源ip_HTTP请求源IP伪造
- DIP-数字图像处理-复习笔记
- C语言 立方体随鼠标转动,HTML5 盒子悬停动效 - 立方体沿鼠标方向翻滚
- 华为鸿蒙P30,华为Mate 30 Lite曝光 或搭载鸿蒙OS9月上市
- 多媒体播放器-VLC media player提供下载
- URL与URLs区别是什么?
- ssm+java计算机毕业设计职业高中智慧教学系统5vuz6(程序+lw+源码+远程部署)
- 二、MySQL——多表查询内容