开如启动我们有Netty的服务端,用的方法对象是ServerBootstrap对象
具体的Netty的服务端的代码如下

public class MyNettyServer {public static void main(String[] args) {// 因为我们的Netty用的是主从模式的开发模式,所以要设置二个主从组给这个服务器引擎,// 同时这个二个group可以在创建的时候可以指定这个组当中可以有多少个线程,默认是1个NioEventLoopGroup parent = new NioEventLoopGroup();NioEventLoopGroup chlid = new NioEventLoopGroup();try {// 先进行构建我们的服务器引擎ServerBootStrap,直接new也就可以了ServerBootstrap serverBootStrap = new ServerBootstrap();serverBootStrap.group(parent, chlid);// 进行配置这个服务器引擎ServerBootStrap,刚创建时是不能直接启动这个这个服务器引擎的,要配置三个基本的配置的// 这个三个基本的配置如下:channel,handler,option// 这个表示的是如果我们的服务器如果如果不过来这么多的通道,最多只能让1024个通道时进行阻塞等待serverBootStrap.option(ChannelOption.SO_BACKLOG, 1024);serverBootStrap.option(ChannelOption.SO_KEEPALIVE, true);// 给我们的引擎设置一下我们的通道serverBootStrap.channel(NioServerSocketChannel.class);// 最后再注册一个handler处理器对象,进行注册一个通道初始化器对象serverBootStrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// TODO Auto-generated method stubSystem.out.println("进行通道的初始化操作,这个方法是在我们的通道向Selector注册时,就会执行这个方法");ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,Delimiters.lineDelimiter()[0]));ch.pipeline().addLast(new StringDecoder(Charset.defaultCharset()));//进行数据的编码,然后发送给客户端ch.pipeline().addLast(new StringEncoder(Charset.defaultCharset()));//进行具体的一个管道调用处理操作ch.pipeline().addFirst(new MyHandlerTest());}});//现在进行绑定端口,同时用的是同步的方法ChannelFuture future=serverBootStrap.bind(8888).sync();//进行给我们的客户端发送响应数据
//          future.channel().writeAndFlush("开明乾坤,照大地...\r\n");ByteBuf bu=PooledByteBufAllocator.DEFAULT.buffer(1024);bu.writeBytes("开明乾坤,照大地".getBytes(Charset.defaultCharset()));future.channel().writeAndFlush(bu);//进行关闭操作,这个方法是一个阻塞的方法,线程会被阻塞到这里future.channel().closeFuture().sync();//进行那么操作// 最后进行关闭这个组} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}finally{parent.shutdownGracefully();chlid.shutdownGracefully();}}}

具体的服务器的handler的代码如下:

public class MyHandlerTest extends ChannelInboundHandlerAdapter {//这个方法是在我们的channel向我们的Select注册时调用这个方法@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {super.channelRegistered(ctx);}//这个方法是在我们的通道进行有数据发送过来时进行调用这个方法去取得通道中数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {super.channelRead(ctx, msg);System.out.println("有数据过来了,你要接收这个数据了");//进行判断一下我们的数据的类型if(msg instanceof ByteBuf){//进行具体数据处理过程如下:ByteBuf buf=(ByteBuf) msg;//进行数据的转换操作,也可以Charset.defaultCharset()System.out.println("我们的服务器接收到客户端发过来的数据为:"+buf.toString(Charset.forName("UTF-8")));}}}

具体的客户端的代码如下:

public class MyNettyClient {public static void main(String[] args) {//进行创建事件组对象NioEventLoopGroup group=new NioEventLoopGroup();try {//创建一个客户端启动引擎,用的是BootstrapBootstrap bootStrap=new Bootstrap();//进行增加组对象bootStrap.group(group);//进行客户端启动引擎的基本的配置如下bootStrap.option(ChannelOption.SO_KEEPALIVE,true)//保持存活.option(ChannelOption.SO_RCVBUF,1024) //设置接受缓冲区的大小.channel(NioSocketChannel.class)       //设置突然我们客户端的通道为SocketChannel.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//进行基本的客户端的初始化操作System.out.println("我们的客户端开始工作了...");//增加一些基本的处理器操作//增加数据分割符,数据后面增加一个分割符,也就是//Delimiters.lineDelimiter()[0],这个lineDelimiter返回有二个元素//的数据,第一个表示的是\r\n,第二个是\n,这里用的是第一个ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,Delimiters.lineDelimiter()[0]));//进行增加一些数据的解码和编码的操作,这个Charset.defaultCharset()就是UTF-8ch.pipeline().addLast(new StringEncoder());//这个进行接收字符串的数据的操作ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new MyReadDataHandler());}});//进行启动我们的客户端引擎对象,使用的是同步的启动方法ChannelFuture future=bootStrap.connect("localhost",8888).sync();//进行给服务器发送数据,Delimiters.lineDelimiter()这个表示数据有\r\n时就把这个数据发送给服务器
//          future.channel().write("杨海龙".getBytes(Charset.forName("UTF-8")));//这种方式不可以
//          future.channel().writeAndFlush(Delimiters.lineDelimiter()[0]);//不可以future.channel().writeAndFlush("杨海龙");//注意这个不要增加回换行在数据的后面,因为在定义编码的时候默认增加上去了
//          future.channel().writeAndFlush(Delimiters.lineDelimiter()[0]);//不可以//第二种数据写出的方式ByteBuf buf=PooledByteBufAllocator.DEFAULT.buffer(1024);//进行写入数据buf.writeBytes("李四".getBytes(Charset.defaultCharset()));//最后进行写出数据future.channel().writeAndFlush(buf);//进行通道等待关闭的操作future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally{group.shutdownGracefully();}}
}

我们有客户端的Handler的代码如下:

public class MyReadDataHandler extends ChannelInboundHandlerAdapter {//这个方法是用来注册我们有Channnel时调用这的方法 @Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {// TODO Auto-generated method stubsuper.channelRegistered(ctx);}//我们的服务器有发送数据过来时,就会调用这个方法进行读取这个这个数据@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {super.channelRead(ctx, msg);//进行数据的读取System.out.println("读取服务器发送过来的数据...");//进行接收数据了if(msg instanceof ByteBuf){ByteBuf buf=(ByteBuf) msg;System.out.println("我们的从服务器接收到的数据为:"+buf.toString(Charset.defaultCharset()));}}}

现在开始一步一步的调度我们有Netty的服务器端的代码

// 先进行构建我们的服务器引擎ServerBootStrap,直接new也就可以了
ServerBootstrap serverBootStrap = new ServerBootstrap();

上面这个调用创建一个服务器的启动辅助类,加载了这个ServerBootStrap当中的所有的静态的成员的方法,其中有重要的ServerBootstrap的配置类ServerBootstrapConfig

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);//也会调用这个类的构造方法public ServerBootstrap() { }
}

同时我们的这个ServerBootstrap继承于AbstractBootstrap,所以也会调用我们有AbstractBootstrap的构造方法

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {volatile EventLoopGroup group;@SuppressWarnings("deprecation")private volatile ChannelFactory<? extends C> channelFactory;private volatile SocketAddress localAddress;private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();private volatile ChannelHandler handler;AbstractBootstrap() {// Disallow extending from a different package.}

执行到这个方法时会绑定我们有线程的模型

serverBootStrap.group(parent, chlid);

这个方法先把这个parent加入到当前类的父类AbstractBootstrap当中的group里面,然后再把child加入到serverBootstrap当中的group当中

    /*** The {@link EventLoopGroup} which is used to handle all the events for the to-be-created* {@link Channel}*/@SuppressWarnings("unchecked")public B group(EventLoopGroup group) {if (group == null) {throw new NullPointerException("group");}if (this.group != null) {throw new IllegalStateException("group set already");}this.group = group;return (B) this;}

ServerBootstrap当中的group方法

    /*** Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and* {@link Channel}'s.*/public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;}

然后再进行我们有TCP参数的绑定操作:

            // 这个表示的是如果我们的服务器如果如果不过来这么多的通道,最多只能让1024个通道时进行阻塞等待serverBootStrap.option(ChannelOption.SO_BACKLOG, 1024);serverBootStrap.option(ChannelOption.SO_KEEPALIVE, true);// 给我们的引擎设置一下我们的通道serverBootStrap.channel(NioServerSocketChannel.class);

下面的方法调用了父类AbstractBootstrap的option的方法

    /*** Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got* created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.*/@SuppressWarnings("unchecked")public <T> B option(ChannelOption<T> option, T value) {if (option == null) {throw new NullPointerException("option");}if (value == null) {synchronized (options) {options.remove(option);}} else {synchronized (options) {options.put(option, value);}}return (B) this;}

上面的这个options是我们父类AbstractBootstrap的静态的linkedHashMap的结构体

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

对应的结果为:options={SO_BACKLOG=1024, SO_KEEPALIVE=true}
接下来开始设置我们有服务器的通道的方法了

// 给我们的引擎设置一下我们的通道
serverBootStrap.channel(NioServerSocketChannel.class);

具体的调用代码如下:
一样还是调用我们有ServerBootstrap的channel的方法,下面这个channelFactory(通道工厂类)是专门用我们调用这个方法时传递进来的字节类名来进行反射创建出来具体的通道类,也就是创建出来了NioServerSocketChannel类对象

    /*** The {@link Class} which is used to create {@link Channel} instances from.* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your* {@link Channel} implementation has no no-args constructor.*/public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}

接下来进行具体的handler的注册工作

// 最后再注册一个handler处理器对象,进行注册一个通道初始化器对象serverBootStrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// TODO Auto-generated method stubSystem.out.println("进行通道的初始化操作,这个方法是在我们的通道向Selector注册时,就会执行这个方法");ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,Delimiters.lineDelimiter()[0]));ch.pipeline().addLast(new StringDecoder(Charset.defaultCharset()));//进行数据的编码,然后发送给客户端ch.pipeline().addLast(new StringEncoder(Charset.defaultCharset()));//进行具体的一个管道调用处理操作ch.pipeline().addFirst(new MyHandlerTest());}});
     * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.*/public ServerBootstrap childHandler(ChannelHandler childHandler) {if (childHandler == null) {throw new NullPointerException("childHandler");}this.childHandler = childHandler;return this;}

接下是开始绑定我们有数据的了

//现在进行绑定端口,同时用的是同步的方法
ChannelFuture future=serverBootStrap.bind(8888).sync();

还是调用了我们有AbstractBootstrap的方法

    /*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(InetAddress inetHost, int inetPort) {return bind(new InetSocketAddress(inetHost, inetPort));}/*** Create a new {@link Channel} and bind it.*/public ChannelFuture bind(SocketAddress localAddress) {validate();if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);}private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();//这个方法具体的去调用我们有工厂的方法,然后调用在newChannel的方法当中调用了一个空构造方法创建出来我们ServerSocketChannel的通道类final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}

现在这个validate这个方法是我们ServerBootstrap类的方法的具体的实现如下

    /*** Validate all the parameters. Sub-classes may override this, but should* call the super method in that case.*/@SuppressWarnings("unchecked")public B validate() {if (group == null) {throw new IllegalStateException("group not set");}if (channelFactory == null) {throw new IllegalStateException("channel or channelFactory not set");}return (B) this;}

在我们有上面的这个DoBind方法当中调用了一个initAndRegister();

final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully//    added to the event loop's task queue for later execution.//    i.e. It's safe to attempt bind() or connect() now://         because bind() or connect() will be executed *after* the scheduled registration task is executed//         because register(), bind(), and connect() are all bound to the same thread.return regFuture;}

//这个方法具体的去调用我们有工厂的方法,然后调用在newChannel的方法当中调用了一个空构造方法创建出来我们ServerSocketChannel的通道类

    @Overridepublic T newChannel() {try {return clazz.getConstructor().newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);}}

调用这个初始化init的方法代码的操作的方法如下

@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();//得到我们有在服务器当中设置的可选的参数synchronized (options) {setChannelOptions(channel, options, logger);}//我们没有设置服务器的att,所以这个可以不用看final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}//得到一个默认的DefaultChannelPipelineChannelPipeline p = channel.pipeline();//下面的这些方法都是我们有基本一些设置操作final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}//下面这些操作都是我们有handler的基本的设置p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}

进行具体的设置我们有服务器的可选的参数

    static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {//进行for取出我们有已经设置的optionfor (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {setChannelOption(channel, e.getKey(), e.getValue(), logger);}}@SuppressWarnings("unchecked")private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {try {//修改我们的配置文件,把这些数据设置进行我们有配置类当中if (!channel.config().setOption((ChannelOption<Object>) option, value)) {logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);}} catch (Throwable t) {logger.warn("Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);}}

其他的看后续文章

Netty的启动执行过程分析(一)相关推荐

  1. Ansible执行过程分析、异步模式和速度优化

    Ansible系列(七):执行过程分析.异步模式和速度优化 分类: Linux服务篇 undefined 我写了更完善的Ansible专栏文章:一步到位玩儿透Ansible Ansible系列文章:h ...

  2. 源码剖析 Netty 服务启动 NIO

    如果这个文章看不懂的话 , 建议反复阅读 Netty 与 Reactor 开篇立意.引用网友好的建议.看源码要针对性的看,最佳实践就是,带着明确的目的去看源码.抓主要问题,放弃小问题.主要的逻辑理解了 ...

  3. 【Netty之旅四】你一定看得懂的Netty客户端启动源码分析!

    前言 前面小飞已经讲解了NIO和Netty服务端启动,这一讲是Client的启动过程. 源码系列的文章依旧还是遵循大白话+画图的风格来讲解,本文Netty源码及以后的文章版本都基于:4.1.22.Fi ...

  4. ART运行时Semi-Space(SS)和Generational Semi-Space(GSS)GC执行过程分析

    Semi-Space(SS)GC和Generational Semi-Space(GSS)GC是ART运行时引进的两个Compacting GC.它们的共同特点是都具有一个From Space和一个T ...

  5. SparkStreaming在启动执行步鄹和DStream的理解

    目录: SparkStreaming启动执行步鄹: DStream和Rdd的理解 Linux中发送数据 SparkStreaming代码如下 结果展示: SparkStreaming启动执行步鄹: 1 ...

  6. Netty服务器启动源码剖析

    Netty服务器启动源码剖析 文章目录 Netty服务器启动源码剖析 1.Netty服务器启动源码剖析 1.1.执行new NioEventLoopGroup()时发生了什么 1.1.1.NioEve ...

  7. spring启动执行_执行器的Spring启动和安全性事件

    spring启动执行 Spring Boot Actuator提供了审核功能,用于在启用了Spring Security的Spring Boot应用程序中发布和侦听与安全相关的事件. 默认事件是身份验 ...

  8. PXE预启动执行环境简介

    预启动执行环境(Preboot eXecution Environment,PXE)也被称为预执行环境,提供了一种使用网络接口(Network Interface)启动计算机的机制.这种机制让计算机的 ...

  9. Linux中设置开机启动执行命令和普通用户配置环境变量开机启动生效

    记录:343 场景:在CentOS 7.9操作系统上,开机启动就执行自定义的命令,配置rc.local文件达到需求:在普通用户中配置环境变量开机启动生效,使用profile实现. 版本: 操作系统:C ...

最新文章

  1. 按下enter键在各个文本框中切换焦点_你真的了解Enter键吗?请先学习本文后再回答...
  2. Android仿QQ界面
  3. 【C语言】数据结构C语言版 实验5 递归
  4. eplan单线原理图多线原理图_EPLAN-文本
  5. 分布式文件系统HDFS原理篇
  6. 解决AndroidStudio报错问题:Missing essential plugin
  7. Tkinter 的text使用方法
  8. DirectX11.2前哨战 R7/R9显卡性能首测
  9. 服务器维护lol3.23,LOL3月23日测试服更新公告 3月23日更新内容介绍
  10. GitHub克隆下载加速
  11. 天泉湖酒店式养生社区服务中心漏电火灾监控系统的设计与应用
  12. 做结构化怎样选择文档类型
  13. 数据库机房管理系统的设计(SQL)
  14. SAS编程|SDTM-DM人口学域
  15. 《痞子衡嵌入式半月刊》 第 47 期
  16. 将16x2 LCD与Arduino连接方法
  17. JavaWeb 中的cookie
  18. 设计模式 状态模式 以自动售货机为例
  19. Intel IPP 介绍与Demo程序
  20. python编写一个判断完数的函数过程_第4章-30 找完数 (20分)python

热门文章

  1. 工作十年悟出的15条职场道理
  2. 四:以理论结合实践方式梳理前端 React 框架 ——— React 高级语法
  3. 折腾一天安装Centos7,以及后面恢复Win7引导的曲折历程
  4. win10Ie重置.html默认应用设置,win10 edge浏览器怎么重置?重置Microsoft Edge浏览器为Win10默认设置方法...
  5. 深度学习项目实战——Kaggle竞赛(Humpback Whale Identification)
  6. 突发!ChatGPT 开始大面积封号,注册功能关闭!亚洲成重灾区,网友自救喊话:不要登录,不要登录...
  7. 如果微信小程序中图片过多过大怎么办
  8. Android运行C/C++程序,无需ROOT!
  9. 名画343 张若澄《燕山八景图》
  10. Burden of Proof