之前在上一章Netty专题-(2)NIO三大核心中介绍了NIO的三大核心类Selector 、 Channel 和 Buffer,这一章我们将利用这些核心进行编程实现相关的一些功能。在正式进入编程之前我们还需要介绍一些概念。

1 NIO网络编程关系图


(1)当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
(2)Selector 进行监听 select 方法, 返回有事件发生的通道的个数
(3)将 socketChannel 注册到 Selector 上, register(Selector sel, int ops), 一个 selector 上可以注册多个 SocketChannel
(4)注册后返回一个 SelectionKey, 会和该 Selector 关联(集合)
(5)进一步得到各个 SelectionKey (有事件发生)
(6)在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
(7)通过得到的 channel完成业务处理

2 NIO网络编程入门案例

为了让大家更好的对上面的流程理解,这里首先给大家一个样例,使用NIO实现服务器端和客户端之间的数据简单通讯(非阻塞),希望大家好好看完和自己动手操作,这样对大家理解 NIO 非阻塞网络编程机制有很大的帮助。我们直接上代码:
首先是服务端:

        //创建ServerSocketChannel -> ServerSocketServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//得到一个Selecor对象Selector selector = Selector.open();//绑定一个端口6666, 在服务器端监听serverSocketChannel.socket().bind(new InetSocketAddress(6666));//设置为非阻塞serverSocketChannel.configureBlocking(false);//把 serverSocketChannel 注册到  selector 关心 事件为 OP_ACCEPTserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); //循环等待客户端连接while (true) {//这里我们等待1秒,如果没有事件发生, 返回if(selector.select(1000) == 0) { //没有事件发生System.out.println("服务器等待了1秒,无连接");continue;}//如果返回的>0, 就获取到相关的 selectionKey集合//1.如果返回的>0, 表示已经获取到关注的事件//2. selector.selectedKeys() 返回关注事件的集合//   通过 selectionKeys 反向获取通道Set<SelectionKey> selectionKeys = selector.selectedKeys();System.out.println("selectionKeys 数量 = " + selectionKeys.size());//遍历 Set<SelectionKey>, 使用迭代器遍历Iterator<SelectionKey> keyIterator = selectionKeys.iterator();while (keyIterator.hasNext()) {//获取到SelectionKeySelectionKey key = keyIterator.next();//根据key 对应的通道发生的事件做相应处理if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接//该该客户端生成一个 SocketChannelSocketChannel socketChannel = serverSocketChannel.accept();System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());//将  SocketChannel 设置为非阻塞socketChannel.configureBlocking(false);//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel关联一个BuffersocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..}if(key.isReadable()) {  //发生 OP_READ//通过key 反向获取到对应channelSocketChannel channel = (SocketChannel)key.channel();//获取到该channel关联的bufferByteBuffer buffer = (ByteBuffer)key.attachment();channel.read(buffer);System.out.println("form 客户端 " + new String(buffer.array()));}//手动从集合中移动当前的selectionKey, 防止重复操作keyIterator.remove();}}

然后是客户端:

    //得到一个网络通道SocketChannel socketChannel = SocketChannel.open();//设置非阻塞socketChannel.configureBlocking(false);//提供服务器端的ip 和 端口InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);//连接服务器if (!socketChannel.connect(inetSocketAddress)) {while (!socketChannel.finishConnect()) {System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");}}//...如果连接成功,就发送数据String str = "hello, likangmin~";//Wraps a byte array into a bufferByteBuffer buffer = ByteBuffer.wrap(str.getBytes());//发送数据,将 buffer 数据写入 channelsocketChannel.write(buffer);System.in.read();

以上创建的步骤基本是按照之前介绍的流程走的,大家应该可以看的明白。之后我们首先启动服务端;

然后启动客户端:

成功接收。

3 NIO相关类介绍

3.1 SelectionKey

(1)SelectionKey基本介绍
SelectionKey,表示 Selector 和网络通道的注册关系, 共四种:

int OP_ACCEPT:有新的网络连接可以 accept,值为 16
int OP_CONNECT:代表连接已经建立,值为 8
int OP_READ:代表读操作,值为 1
int OP_WRITE:代表写操作,值为 4

在其源码中表示为:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

(2)SelectionKey 相关方法

3.2 ServerSocketChannel

(1)ServerSocketChannel 在服务器端监听新的客户端 Socket 连接
(2)ServerSocketChannel相关方法

3.3 SocketChannel

(1)SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数 据读到缓冲区。
(2)SocketChannel相关方法

4 NIO应用实例-群聊系统

了解了NIO的网络编程以后,接下来我们具体实现一个比较难一点的要求:

  1. 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发 得到)

实现的效果图大致如下:

大家在往下看之前可以自己先动手实现一下,然后跟我这里给出的示例进行对比,这样更有助于大家进一步理解 NIO 非阻塞网络编程机制。
同样的首先编写服务端代码:

//定义属性private Selector selector;private ServerSocketChannel listenChannel;private static final int PORT = 6667;//构造器//初始化工作public GroupChatServer() {try {//得到选择器selector = Selector.open();//ServerSocketChannellistenChannel =  ServerSocketChannel.open();//绑定端口listenChannel.socket().bind(new InetSocketAddress(PORT));//设置非阻塞模式listenChannel.configureBlocking(false);//将该listenChannel 注册到selectorlistenChannel.register(selector, SelectionKey.OP_ACCEPT);}catch (IOException e) {e.printStackTrace();}}//监听public void listen() {System.out.println("监听线程: " + Thread.currentThread().getName());try {//循环处理while (true) {int count = selector.select();if(count > 0) {//有事件处理//遍历得到selectionKey 集合Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {//取出selectionkeySelectionKey key = iterator.next();//监听到acceptif(key.isAcceptable()) {SocketChannel sc = listenChannel.accept();sc.configureBlocking(false);//将该 sc 注册到seletorsc.register(selector, SelectionKey.OP_READ);//提示System.out.println(sc.getRemoteAddress() + " 上线 ");}if(key.isReadable()) { //通道发送read事件,即通道是可读的状态//处理读 (专门写方法..)readData(key);}//当前的key 删除,防止重复处理iterator.remove();}} else {System.out.println("等待....");}}}catch (Exception e) {e.printStackTrace();}finally {//发生异常处理....}}//读取客户端消息private void readData(SelectionKey key) {//取到关联的channleSocketChannel channel = null;try {//得到channelchannel = (SocketChannel) key.channel();//创建bufferByteBuffer buffer = ByteBuffer.allocate(1024);int count = channel.read(buffer);//根据count的值做处理if(count > 0) {//把缓存区的数据转成字符串String msg = new String(buffer.array());//输出该消息System.out.println("form 客户端: " + msg);//向其它的客户端转发消息(去掉自己), 专门写一个方法来处理sendInfoToOtherClients(msg, channel);}}catch (IOException e) {try {System.out.println(channel.getRemoteAddress() + " 离线了..");//取消注册key.cancel();//关闭通道channel.close();}catch (IOException e2) {e2.printStackTrace();;}}}//转发消息给其它客户(通道)private void sendInfoToOtherClients(String msg, SocketChannel self ) throws  IOException{System.out.println("服务器转发消息中...");System.out.println("服务器转发数据给客户端线程: " + Thread.currentThread().getName());//遍历 所有注册到selector 上的 SocketChannel,并排除 selffor(SelectionKey key: selector.keys()) {//通过 key  取出对应的 SocketChannelChannel targetChannel = key.channel();//排除自己if(targetChannel instanceof  SocketChannel && targetChannel != self) {//转型SocketChannel dest = (SocketChannel)targetChannel;//将msg 存储到bufferByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());//将buffer 的数据写入 通道dest.write(buffer);}}}

然后编写客户端的代码:

//定义相关的属性private final String HOST = "127.0.0.1"; // 服务器的ipprivate final int PORT = 6667; //服务器端口private Selector selector;private SocketChannel socketChannel;private String username;//构造器, 完成初始化工作public GroupChatClient() throws IOException {selector = Selector.open();//连接服务器socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));//设置非阻塞socketChannel.configureBlocking(false);//将channel 注册到selectorsocketChannel.register(selector, SelectionKey.OP_READ);//得到usernameusername = socketChannel.getLocalAddress().toString().substring(1);System.out.println(username + " is ok...");}//向服务器发送消息public void sendInfo(String info) {info = username + " 说:" + info;try {socketChannel.write(ByteBuffer.wrap(info.getBytes()));}catch (IOException e) {e.printStackTrace();}}//读取从服务器端回复的消息public void readInfo() {try {int readChannels = selector.select();if(readChannels > 0) {//有可以用的通道Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();if(key.isReadable()) {//得到相关的通道SocketChannel sc = (SocketChannel) key.channel();//得到一个BufferByteBuffer buffer = ByteBuffer.allocate(1024);//读取sc.read(buffer);//把读到的缓冲区的数据转成字符串String msg = new String(buffer.array());System.out.println(msg.trim());}}iterator.remove(); //删除当前的selectionKey, 防止重复操作} else {//System.out.println("没有可以用的通道...");}}catch (Exception e) {e.printStackTrace();}}

然后在服务端的main方法中增加如下代码:

         //创建服务器对象GroupChatServer groupChatServer = new GroupChatServer();groupChatServer.listen();

同样的在客户端的main方法中增加如下代码:

        //启动我们客户端GroupChatClient chatClient = new GroupChatClient();//启动一个线程, 每个3秒,读取从服务器发送数据new Thread() {public void run() {while (true) {chatClient.readInfo();try {Thread.currentThread().sleep(3000);}catch (InterruptedException e) {e.printStackTrace();}}}}.start();//发送数据给服务器端Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String s = scanner.nextLine();chatClient.sendInfo(s);}

然后就可以启动服务端了:

之后我们可以启动多个客户端进行测试,可能有的没有开启相关的配置,将客户端run起来以后不能开启第二个,这里只需要进行一下简单的配置即可,笔者用的是IDEA,所以介绍一下IDEA的开启方式。大家按照我的方式即可。



我们开启两个客户端进行测试:



然后发消息进行测试,在63886中发送:你好,我是63886:

可以看到服务器对消息进行了转发:

同时63939也收到了来自63886的消息:

同样的,我们在客户端63939中发送消息:你好,我是63939:

同样服务器也进行了转发:

客户端63886也收到了63939的消息:

使用NIO实现简单的群聊基本已经实现,大家看一下自己的与我这里提供的有什么差别。当然有些功能可能不完善,这里只是作为一个简单的案例,太复杂的功能可以学习完以后在进行添加。下一章我们将介绍一下NIO的零拷贝。

猜你感兴趣
Netty专题-(1)初识Netty
Netty专题-(2)NIO三大核心
Netty专题-(3)NIO网络编程
相关专题持续更新中,敬请期待…

更多文章请点击:更多…

Netty专题-(3)NIO网络编程相关推荐

  1. 你对Java网络编程了解的如何?Java NIO 网络编程 | Netty前期知识(二)

    本文主要讲解NIO的简介.NIO和传统阻塞I/O有什么区别.NIO模型和传统I/O模型之间的对比.以及围绕NIO的三大组件来讲解,理论代码相结合. 很喜欢一句话:"沉下去,再浮上来" ...

  2. Netty:Java 领域网络编程的王者

    一.简介 1. 课程背景 分布式系统的根基在于网络编程,而 Netty 是 Java 领域网络编程的王者. 2. 课程内容 第一部分 NIO 编程,三大组件 第二部分 Netty 入门学习,Event ...

  3. 用Netty开发中间件:网络编程基础

    用Netty开发中间件:网络编程基础 <Netty权威指南>在网上的评价不是很高,尤其是第一版,第二版能稍好些?入手后快速翻看了大半本,不免还是想对<Netty权威指南(第二版)&g ...

  4. 【Netty】第二章 网络编程和 IO 概念剖析

    [Netty]第二章 网络编程 文章目录 [Netty]第二章 网络编程 一.网络编程 1.模拟阻塞模式下服务器单线程处理请求 2.模拟非阻塞模式下服务器单线程处理请求 3.使用 Selector 改 ...

  5. NIO网络编程实战之简单多人聊天室

    NIO网络编程实战 利用NIO编程知识,实现多人聊天室. 1. NIO编程实现步骤 第一步:创建Selector 第二步:创建ServerSocketChannel,并绑定监听端口 第三步:将Chan ...

  6. BIO,Socket网络编程入门代码示例,NIO网络编程入门代码示例,AIO 网络编程

    BIO,Socket网络编程入门代码示例 1.BIO服务器端程序 package cn.itcast.bio;import java.io.InputStream; import java.io.Ou ...

  7. Netty学习笔记二网络编程

    Netty学习笔记二 二. 网络编程 1. 阻塞模式 阻塞主要表现为: 连接时阻塞 读取数据时阻塞 缺点: 阻塞单线程在没有连接时会阻塞等待连接的到达,连接到了以后,要进行读取数据,如果没有数据,还要 ...

  8. Java NIO网络编程之群聊系统

    概述 对ServerSocketChannel.SocketChannel.SelectionKey有一定的理解和了解对应API. NIO非阻塞网络编程相关关系梳理: 以下概念: ServerSock ...

  9. java.nio包网络编程_Java NIO网络编程

    来自孙卫琴的Java网络编程,第四章,这段代码感觉有问题啊,第45行的 synchronized(gate){} 这句代码怎么理解啊,可以这样写吗? 1.[文件] EchoServer.java ~  ...

最新文章

  1. mysql数据类型默认长度_mysql数据类型长度
  2. json / 简介及结构
  3. navicat 结构同步会加锁吗_被柜员怠慢的张小波,真的会永久地转走几个亿的结构性存款吗?...
  4. python队列长度_[python模块]队列queue
  5. python编程单词排序_Python编程20:字典的遍历和排序
  6. 秒、毫秒和年月日的转换
  7. linux那些事之页迁移(page migratiom)
  8. deepface:最先进轻量级人脸识别和人脸属性分析框架讲解
  9. maven项目转gradle
  10. div+css静态网页设计游戏网站设计——仿君海游戏官网(13页) HTML+CSS大作业_ 手游网页制作作业_网游网页设计...
  11. 基于微信小程序的自来水收费系统设计与实现-计算机毕业设计源码+LW文档
  12. css-doodle 学习第三天, selectors的使用
  13. 一行python代码,带你重温经典小游戏
  14. 手机如何设置每个月23日固定时间定时提醒
  15. 电大本科计算机上机考试题,电大计算机上机考试模拟题及答案
  16. 贺州教师评职称计算机考试,职称资格考试贺州
  17. 使用Electron打造跨平台桌面应用
  18. 基于AI的5G技术-研究方向与范例-学习笔记
  19. 共享茶楼预订小程序开发前景
  20. Django设置首页

热门文章

  1. optee userpace TA程序中的栈的设置
  2. boost库安装编译指南
  3. 【模拟】P1067 多项式输出
  4. Windows保护模式学习笔记(三)—— 长调用/短调用/调用门
  5. windbg基本简单步骤
  6. 两种选择排序算法:简单选择排序、堆排序
  7. keil5详细的安装流程和设置
  8. Java中工程、包、类介绍
  9. Dubbo服务引用原理
  10. Freemarker条件判断