网络编程概念

两个程序通过一个双向的通信连接实现数据的交换,连接的一端称为一个socket

  • Socket是一个语言无标准,可以实现网络编程语言都有Socket
  • 通过·IP+Port通信
  • BIO、NIO、AIO适用场景

BIO:连接数少且固定的框架

NIO: 连接数多且连接时间短

AIO(NIO.2): 连接数多且连接时间长

Java IO流程图

Socket连接步骤

  1. 服务器监听
  2. 客户端请求
  3. 连接确定
  • Tips:连接的时候三次握手,断开连接四次挥手

同步和异步(OS底层操作)

  • 同步:使用同步IO时,Java自己处理IO读写
  • 异步:使用异步Io时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小给OS(用户数据),OS需要支持异步IO操作API

阻塞和非阻塞(程序阻塞代码块)

  • 阻塞:使用阻塞IO时,Java调用会一直阻塞到读写完成才返回。
  • 非阻塞:使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器通知可读写时再继续进行读写,不循环直到读写完成。

BIO编程

  • Blocking IO:同步阻塞的编程方式
  • BIO编程方式通常是在JDK1.4版本之前常用的编程方式。
  • 编程实现过程:首先服务端启动一个ServerSocket来监听网络请求,客户端启动Socket发起网络请求,默认情况下ServerSocket会建立一个线程来处理这个请求,如果服务端没有线程可用,客户端会阻塞等待或遭到拒绝。(可以加入线程池)

  • 改善

阻塞BIO案例

Server:

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/*** BIO服务端源码* @author shenj* @version 1.0*/
public final class ServerNormal {//默认的端口号private static int DEFAULT_PORT = 12345;//单例的ServerSocketprivate static ServerSocket server;//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值public static void start() throws IOException{//使用默认值start(DEFAULT_PORT);}//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了public synchronized static void start(int port) throws IOException{if(server != null) return;try{//通过构造函数创建ServerSocket//如果端口合法且空闲,服务端就监听成功server = new ServerSocket(port);System.out.println("服务器已启动,端口号:" + port);//通过无线循环监听客户端连接//如果没有客户端接入,将阻塞在accept操作上。while(true){Socket socket = server.accept();//当有新的客户端接入时,会执行下面的代码//然后创建一个新的线程处理这条Socket链路new Thread(new ServerHandler(socket)).start();}}finally{//一些必要的清理工作if(server != null){System.out.println("服务器已关闭。");server.close();server = null;}}}
}

ServerHandler


package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;import com.anxpp.io.utils.Calculator;
/*** 客户端线程* @author shenj* 用于处理一个客户端的Socket链路*/
public class ServerHandler implements Runnable{private Socket socket;public ServerHandler(Socket socket) {this.socket = socket;}@Overridepublic void run() {BufferedReader in = null;PrintWriter out = null;try{in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(),true);String expression;String result;while(true){//通过BufferedReader读取一行//如果已经读到输入流尾部,返回null,退出循环//如果得到非空值,就尝试计算结果并返回if((expression = in.readLine())==null) break;System.out.println("服务器收到消息:" + expression);try{result = Calculator.cal(expression).toString();}catch(Exception e){result = "计算错误:" + e.getMessage();}out.println(result);}}catch(Exception e){e.printStackTrace();}finally{//一些必要的清理工作if(in != null){try {in.close();} catch (IOException e) {e.printStackTrace();}in = null;}if(out != null){out.close();out = null;}if(socket != null){try {socket.close();} catch (IOException e) {e.printStackTrace();}socket = null;}}}
}

client

package com.anxpp.io.calculator.bio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/*** 阻塞式I/O创建的客户端* @author shenj* @version 1.0*/
public class Client {//默认的端口号private static int DEFAULT_SERVER_PORT = 12345;private static String DEFAULT_SERVER_IP = "127.0.0.1";public static void send(String expression){send(DEFAULT_SERVER_PORT,expression);}public static void send(int port,String expression){System.out.println("算术表达式为:" + expression);Socket socket = null;BufferedReader in = null;PrintWriter out = null;try{socket = new Socket(DEFAULT_SERVER_IP,port);in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(),true);out.println(expression);System.out.println("___结果为:" + in.readLine());}catch(Exception e){e.printStackTrace();}finally{//一下必要的清理工作if(in != null){try {in.close();} catch (IOException e) {e.printStackTrace();}in = null;}if(out != null){out.close();out = null;}if(socket != null){try {socket.close();} catch (IOException e) {e.printStackTrace();}socket = null;}}}
}

测试代码,放在一个程序中

package com.anxpp.io.calculator.bio;
import java.io.IOException;
import java.util.Random;
/*** 测试方法* @author shenj* @version 1.0*/
public class Test {//测试主方法public static void main(String[] args) throws InterruptedException {//运行服务器new Thread(new Runnable() {@Overridepublic void run() {try {ServerBetter.start();} catch (IOException e) {e.printStackTrace();}}}).start();//避免客户端先于服务器启动前执行代码Thread.sleep(100);//运行客户端 char operators[] = {'+','-','*','/'};Random random = new Random(System.currentTimeMillis());new Thread(new Runnable() {@SuppressWarnings("static-access")@Overridepublic void run() {while(true){//随机产生算术表达式String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);Client.send(expression);try {Thread.currentThread().sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}
}

NIO编程

  • Ubblocking(New IO):同步阻塞的编程方式
  • 主要解决BIO的大并发问题,NIO基于Reactor,面向Buffer(缓存区)编程
  • 不能完全解决BIO上的问题,当并发上来的话,还是会有BIO一样的问题

  • 同步非阻塞,服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器器上,多路复用器轮询到有I/O操作请求时才启动一个线程进行处理
  • NIO方式使用于连接数目多且比较短(轻操作)的架构,比如:聊天服务器,并发局限于应用中,编程复杂,JDK1.8开始支持
  • NIO核心三大组件:Selector(多路复用器)、Channel(通道)、Buffer(缓冲区)
  • NIO核心三大组件之间的关系
1.每个 channel都会对应一个 Buffer
2.Selector对应一个线程,一个线程对应多个 channel(连接)
3.该图反应了有三个 channel?注册到该 selector/程序
4.程序切换到哪个 channel是有事件决定的, Event就是一个重要的概念
5.Selector会根据不同的事件,在各个通道上切换
6.Buffer就是一个内存块,底层是有一个数组
7.数据的读取写入是通过 Buffer,这个和BIO,BIO中要么是输入流,或者是输出流,不能双向,但是NIO的 Buffer是可以读也可以写,需要ip方法切换
8.channel是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系通道就是双向的
  • Buffer有7个子类(没有BooleanBuffer):ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer
  1. 最常用的是ByteBuffer
  • Buffer重要的四个属性:
  1. mark:标志位
  2. position:下标指针
  3. limit:当前缓冲区的终点
  4. capacity:缓冲区容量
  • flip通过改变这四个属性的值达到反转Buffer状态的功能

Buffer常用方法:

yteBuffer常用方法:

缓冲区案例

  • 缓冲区代码实例:
package icu.lookyousmileface.nio.basic;import java.nio.IntBuffer;/*** @author shenj* @title: NioBuffer* @projectName NettyPro* @date 2020/11/30 14:41*/
public class NioBuffer {public static void main(String[] args) {//IntBuffer.allocate(5);表示Buffer的空间为5,并且Buffer缓冲区类型为IntIntBuffer intBuffer = IntBuffer.allocate(5);//intBuffer.capacity()表示获得Buffer的大小for(int i = 0;i< intBuffer.capacity();i++){intBuffer.put(i*2);}//buffer进行过写操作之后需要读操作的时候需要flip进行状态反转intBuffer.flip();//hasRemaining()返回剩余的可用长度while (intBuffer.hasRemaining()){System.out.println(intBuffer.get());}}
}

Channel(通道):

FileChannel:

  • TransferTo拷贝的比较快底层实现是零拷贝
  • ByteBuffer+FileChannel实现文字输入到文件中:
/*** @author shenj* @title: NioBufferChannelFileWrite* @projectName NettyPro* @date 2020/11/30 18:05*/
public class NioBufferChannelFileWrite {private static final String msg = "怒发冲冠,凭栏处、潇潇雨歇。抬望眼、仰天长啸,壮怀激烈。三十功名尘与土,八千里路云和月。莫等闲、白了少年头,空悲切。";public static void main(String[] args) {try {FileOutputStream fileOutputStream = new FileOutputStream(new File("src/main/resources/filedata/data1.txt"));FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();ByteBuffer fileDataBuffer = ByteBuffer.allocate(1024);ByteBuffer putData = fileDataBuffer.put(msg.getBytes());//反转putData.flip();fileOutputStreamChannel.write(putData);fileOutputStream.close();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
}

实现的流程图

ByteBuffer+FileChannel实现从文件中读取到控制台:

/*** @author shenj* @title: NioBufferChannelRead* @projectName NettyPro* @date 2020/11/30 18:55*/
public class NioBufferChannelRead {private static final String filePath = "src/main/resources/filedata/data1.txt";public static void main(String[] args) {File file = new File(filePath);try {FileInputStream fileInputStream = new FileInputStream(file);FileChannel fileInputStreamChannel = fileInputStream.getChannel();//获取file的大小,避免浪费内存ByteBuffer byteDataBuffer = ByteBuffer.allocate((int) file.length());fileInputStreamChannel.read(byteDataBuffer);//byteDataBuffer.array()将byteBuffer缓冲区中的data变成数组System.out.println(new String(byteDataBuffer.array()));fileInputStream.close();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
}

实现的流程图

Buffer的分散和聚焦

  • Buffer的分散和聚焦:

Scattering:将数据写入到buffer时,可以采取buffer数组,依次写入【 分散】

Gathering:从buffer读取数据时,可以采用buffer数组,依次读取【聚焦】

  • 实例代码(Kotlin):
/*** @title: NioScatingAndGething* @projectName NettyPro* @author shenj* @date 2020/12/1 13:23*/
fun main(args: Array<String>): Unit {val serverSocketChannel = ServerSocketChannel.open();val inetSocketAddress = InetSocketAddress(9948);//绑定端口到socket上,并启动serverSocketChannel.socket().bind(inetSocketAddress);//buffer数组,NIO会自动将数据放到数组中,无需操心val byteBuffer = arrayOfNulls<ByteBuffer>(2)byteBuffer[0] = ByteBuffer.allocate(5)byteBuffer[1] = ByteBuffer.allocate(3)val scoketChannel = serverSocketChannel.accept();var messageLight = 8;while (true) {var byteRead: Int = 0while (byteRead < messageLight) {var read = scoketChannel.read(byteBuffer)println("byteRead:" + read)byteRead += read.toInt()Arrays.asList<ByteBuffer>(*byteBuffer).stream().map { buffer: ByteBuffer -> "potion:" + buffer.position() + "limit:" + buffer.limit() }.forEach { x: String? -> println(x) }}Arrays.asList<ByteBuffer>(*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.flip() })var byteWrite: Long = 0;while (byteWrite < messageLight) {var write = scoketChannel.write(byteBuffer)byteWrite += write}Arrays.asList<ByteBuffer>(*byteBuffer).forEach(Consumer { buffer: ByteBuffer -> buffer.clear() })println("byteRead:" + byteRead + "byteWrite:" + byteWrite + "messagelenght:" + messageLight)}
}

一个Buffer实现文件的复制:

/*** @author shenj* @title: NioBufferOnlyOneWriteAndRead* @projectName NettyPro* @date 2020/11/30 19:18*/
public class NioBufferOnlyOneWriteAndRead {private static final String filePath1 = "src/main/resources/filedata/data1.txt";private static final String filePath2 = "src/main/resources/filedata/data2.txt";public static void main(String[] args) {File file1 = new File(filePath1);File file2 = new File(filePath2);try {FileInputStream fileInputStream = new FileInputStream(file1);FileOutputStream fileOutputStream = new FileOutputStream(file2);FileChannel fileInputStreamChannel = fileInputStream.getChannel();FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();ByteBuffer dataBuffer = ByteBuffer.allocate(512);while (-1 != fileInputStreamChannel.read(dataBuffer)) {//读和写之间需要切换dataBuffer.flip();fileOutputStreamChannel.write(dataBuffer);//清空Buffer缓冲区的数据dataBuffer.clear();}fileInputStream.close();fileOutputStream.close();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
}

利用transferFrom拷贝文件:

/*** @author shenj* @title: NioBufferTransorf* @projectName NettyPro* @date 2020/11/30 20:18*/
public class NioBufferTransorf {private static final String filepath1 = "src/main/resources/filedata/data1.txt";private static final String filepath2 = "src/main/resources/filedata/data2.txt";public static void main(String[] args) {try {FileInputStream fileInputStream = new FileInputStream(new File(filepath1));FileOutputStream fileOutputStream = new FileOutputStream(new File(filepath2));FileChannel fileInputStreamChannel = fileInputStream.getChannel();FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();//将目标的通道的数据复制到当前通道,Channel自带数据有效长度size获取fileOutputStreamChannel.transferFrom(fileInputStreamChannel, 0, fileInputStreamChannel.size());fileInputStreamChannel.close();fileOutputStreamChannel.close();fileInputStream.close();fileOutputStream.close();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}}

利用transferFrom拷贝Image(Kotlin):

fun main(args: Array<String>):Unit {val  imagePath = "src/main/resources/filedata/sky.jpg";val  copyImageToPath = "src/main/resources/filedata/sky_copy.jpg"val accessFile = RandomAccessFile(imagePath, "rw")val accessFile_copy = RandomAccessFile(copyImageToPath, "rw")val accessFile_channle = accessFile.channelval accesssFile_copy_channel = accessFile_copy.channelaccesssFile_copy_channel.transferFrom(accessFile_channle,0,accessFile_channle.size())
}

Buffer和Channel的注意事项:

NIO案例

  • 实例代码:

    • NioServer.java
package bigdata.studynio;public class NioServer {public static void main(String[] args) {int port = 8080;if(args != null && args.length < 0){//port = Integer.valueOf(args[0]);    }//创建服务器线程NioServerWork nioServerWork = new NioServerWork(port);new Thread(nioServerWork, "server").start();}
}

NioServerWork.java

package bigdata.studynio;import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class NioServerWork implements Runnable {//多路复用器 Selector会对注册在其上面的channel进行;轮询,当某个channel发生读写操作时,//就会处于相应的就绪状态,通过SelectionKey的值急性IO 操作private Selector selector;//多路复用器private ServerSocketChannel channel;private volatile boolean stop;/*** @param port* 构造函数*/public NioServerWork(int port) {try {selector = Selector.open();//打开多路复用器channel = ServerSocketChannel.open();//打开socketchannelchannel.configureBlocking(false);//配置通道为非阻塞的状态channel.socket().bind(new InetSocketAddress(port), 1024);//通道socket绑定地址和端口channel.register(selector, SelectionKey.OP_ACCEPT);//将通道channel在多路复用器selector上注册为接收操作System.out.println("NIO 服务启动 端口: "+ port);} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}        }public void stop(){this.stop=true;}@Overridepublic void run() {//线程的Runnable程序System.out.println("NIO 服务  run()");while(!stop){try {selector.select(1000);//最大阻塞时间1s//获取多路复用器的事件值SelectionKey,并存放在迭代器中Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectedKeys.iterator();SelectionKey key =null;//System.out.println("NIO 服务  try");while(iterator.hasNext()){System.out.println("NIO 服务  iterator.hasNext()");key = iterator.next();iterator.remove();//获取后冲迭代器中删除此值try {handleinput(key);//根据SelectionKey的值进行相应的读写操作              } catch (Exception e) {if(key!=null){key.cancel();if(key.channel()!=null)key.channel().close();                           }}                                  }                           } catch (IOException e) {System.out.println("NIO 服务  run  catch IOException");e.printStackTrace();System.exit(1);}}       }/*** @param key* @throws IOException* 根据SelectionKey的值进行相应的读写操作*/private void handleinput(SelectionKey key) throws IOException {System.out.println("NIO 服务  handleinput");if(key.isValid()){//判断所传的SelectionKey值是否可用if(key.isAcceptable()){//在构造函数中注册的key值为OP_ACCEPT,,在判断是否为接收操作ServerSocketChannel  ssc = (ServerSocketChannel)key.channel();//获取key值所对应的channelSocketChannel sc = ssc.accept();//设置为接收非阻塞通道sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);//并把这个通道注册为OP_READ           }if(key.isReadable()){//判断所传的SelectionKey值是否为OP_READ,通过上面的注册后,经过轮询后就会是此操作SocketChannel sc = (SocketChannel)key.channel();//获取key对应的channelByteBuffer readbuf = ByteBuffer.allocate(1024);int readbytes = sc.read(readbuf);//从channel中读取byte数据并存放readbufif(readbytes > 0){readbuf.flip();//检测时候为完整的内容,若不是则返回完整的byte[] bytes = new byte[readbuf.remaining()];readbuf.get(bytes);String string = new String(bytes, "UTF-8");//把读取的数据转换成stringSystem.out.println("服务器接受到命令 :"+ string); //"查询时间"就是读取的命令,此字符串要与客户端发送的一致,才能获取当前时间,否则就是bad orderString currenttime = "查询时间".equalsIgnoreCase(string) ? new java.util.Date(System.currentTimeMillis()).toString() : "bad order";dowrite(sc,currenttime);//获取到当前时间后,就需要把当前时间的字符串发送出去}else if (readbytes < 0){key.cancel();sc.close();                  }else{}             }           }       }/*** @param sc* @param currenttime* @throws IOException* 服务器的业务操作,将当前时间写到通道内*/private void dowrite(SocketChannel sc, String currenttime) throws IOException {System.out.println("服务器 dowrite  currenttime"+  currenttime);if(currenttime !=null && currenttime.trim().length()>0){byte[] bytes = currenttime.getBytes();//将当前时间序列化ByteBuffer writebuf = ByteBuffer.allocate(bytes.length);writebuf.put(bytes);//将序列化的内容写入分配的内存writebuf.flip();sc.write(writebuf);   //将此内容写入通道      }}
}

NioClient.java

package bigdata.studynio;public class NioClient {public static void main(String[] args) {int port = 8080;if(args !=null && args.length > 0){try {//port = Integer.valueOf(args[0]);} catch (Exception e) {// TODO: handle exception}}//创建客户端线程new Thread(new NioClientWork("127.0.0.1",port),"client").start();}}

NioClientWork.java

package bigdata.studynio;import java.io.BufferedReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class NioClientWork implements Runnable {private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean stop;/*** @param string* @param port* 构造函数*/public NioClientWork(String string, int port) {this.host = string == null ? "127.0.0.1":string;this.port = port;try {selector= Selector.open();//打开多路复用器socketChannel=SocketChannel.open();//打开socketchannelsocketChannel.configureBlocking(false);System.out.println("NIO 客户端启动 端口: "+ port);} catch (IOException e) {e.printStackTrace();System.exit(1);}}/* (non-Javadoc)* @see java.lang.Runnable#run()*/@Overridepublic void run() {try {doConnect();//客户端线程需要连接服务器} catch (Exception e) {e.printStackTrace();System.exit(1);}while(!stop){try {selector.select(1000);//最大阻塞时间1s//获取多路复用器的事件值SelectionKey,并存放在迭代器中Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectedKeys.iterator();SelectionKey key =null;while (iterator.hasNext()) {key = iterator.next();iterator.remove();try {handleinput(key);//获取多路复用器的事件值SelectionKey,并存放在迭代器中                  } catch (Exception e) {if(key == null){key.cancel();if(socketChannel ==null)socketChannel.close();                          }}                  }} catch (IOException e) {e.printStackTrace();System.exit(1);}          }if(selector !=null){try {selector.close();} catch (Exception e) {e.printStackTrace();}}}/*** @throws IOException* 线程连接服务器,并注册操作* */private void doConnect() throws IOException {if(socketChannel.connect(new InetSocketAddress(host, port))){//检测通道是否连接到服务器 System.out.println("NIO 客户端 idoConnect OP_READ ");socketChannel.register(selector, SelectionKey.OP_READ);//如果已经连接到了服务器,就把通道在selector注册为OP_READdowrite(socketChannel);}else{System.out.println("NIO 客户端 doConnect OP_CONNECT ");socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果客户端未连接到服务器,则将通道注册为OP_CONNECT操作}    }/*** @param key* @throws IOException* 根据SelectionKey的值进行相应的读写操作*/private void handleinput(SelectionKey key) throws IOException {//System.out.println("NIO 客户端 handleinput ");if(key.isValid()){//判断所传的SelectionKey值是否可用//System.out.println("NIO 客户端 isValid() ");SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){//一开始的时候,客户端需要连接服务器操作,所以检测是否为连接状态System.out.println("NIO 客户端 isConnectable ");if(sc.finishConnect()){//是否完成连接System.out.println("NIO 客户端 finishConnect ");dowrite(sc);//向通道内发送数据,就是“查询时间” 的命令,读写通道与通道注册事件类型无关,注册事件只是当有事件来了,就会去处理相应事件sc.register(selector, SelectionKey.OP_READ);//如果完成了连接,就把通道注册为 OP_READ操作,用于接收服务器出过来的数据}else{System.out.println("NIO 客户端 not finishConnect ");System.exit(1);              }}if(key.isReadable()){//根据上面注册的selector的通道读事件,进行操作System.out.println("NIO 客户端 isReadable() ");ByteBuffer readbuf = ByteBuffer.allocate(1024);int readbytes = sc.read(readbuf);//获取通道从服务器发过来的数据,并反序列化if(readbytes > 0){readbuf.flip();byte[] bytes=new byte[readbuf.remaining()];readbuf.get(bytes);String string = new String(bytes, "UTF-8");System.out.println("时间是: " + string);this.stop=true;    //操作完毕后,关闭所有的操作              }else if (readbytes < 0){key.cancel();sc.close();}else{}         }               }       }private void dowrite(SocketChannel sc) throws IOException {byte[] req = "查询时间".getBytes();ByteBuffer writebuf = ByteBuffer.allocate(req.length);writebuf.put(req);writebuf.flip();sc.write(writebuf);if(!writebuf.hasRemaining()){System.out.println("向服务器发送命令成功 ");}    }
}

asReadOnlyBuffer例子:

/*** @author shenj* @title: NioBufferOnlyRead* @projectName NettyPro* @date 2020/12/1 9:54*/
public class NioBufferOnlyRead {public static void main(String[] args) {ByteBuffer dataBuffer = ByteBuffer.allocate(5);for (int i = 0; i < dataBuffer.capacity() - 1; i++) {dataBuffer.put((byte) (i * 2));}dataBuffer.flip();//可以从一个创建的Buffer获取OnlyReadBufferByteBuffer onlyByteBuffer = dataBuffer.asReadOnlyBuffer();while (onlyByteBuffer.hasRemaining()) {System.out.println(onlyByteBuffer.get());}//无法往OnlyReadBuffer写数据
//        onlyByteBuffer.put((byte)(2));//可以往dataBuffer中写数据dataBuffer.put((byte) (2));}
}
  • 指定Buffer的读写操作,会生成一个新的Buffer
  • MappedByteBuffer:可以实现文件在内存(堆外内存)直接修改,而操作系统无需再拷贝一份,实例代码:
/*** @author shenj* @title: NioBufferAsReadOnlyBuffer* @projectName NettyPro* @date 2020/12/1 11:35*/
public class NioBufferAsReadOnlyBuffer {public static void main(String[] args) {try {RandomAccessFile randomAccessFile = new RandomAccessFile("src/main/resources/filedata/data1.txt", "rw");FileChannel randomAccessFileChannel = randomAccessFile.getChannel();MappedByteBuffer map = randomAccessFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, randomAccessFileChannel.size());map.put(0, (byte) ('H'));randomAccessFile.close();randomAccessFileChannel.close();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}
}

RandomAccessFile处理文本,mode的源码:

String name = (file != null ? file.getPath() : null);int imode = -1;if (mode.equals("r"))imode = O_RDONLY;else if (mode.startsWith("rw")) {imode = O_RDWR;rw = true;if (mode.length() > 2) {if (mode.equals("rws"))imode |= O_SYNC;else if (mode.equals("rwd"))imode |= O_DSYNC;elseimode = -1;}

由源码可知mode的四种模式对应的功能如下:

MappedByteBuffer属性解析:

public abstract MappedByteBuffer map(MapMode mode,long position, long size)throws IOException;

mode的三种模式:

/*** Mode for a read-only mapping.*/public static final MapMode READ_ONLY= new MapMode("READ_ONLY");/*** Mode for a read/write mapping.*/public static final MapMode READ_WRITE= new MapMode("READ_WRITE");/*** Mode for a private (copy-on-write) mapping.*/public static final MapMode PRIVATE= new MapMode("PRIVATE");

多路复用

  • Selector(多路复用器)
  1. Java的Nio,用到非阻塞的IO方式。可以用一个线程,处理多个客户端连接,就会使用到Selector(选择器)
  2. Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式注册到同一个Selector),如果有事件发生,便获取时间让后针对每个事件进行相应的处理,这样就可以只用一个单线程区管理多个通道,也就是管理多个连接和请求
  • Selector相关方法
//实现Closeable接口表示拥有自动关闭流的功能
public abstract class Selector implements Closeable {
//表示获得一个Selector实例
public static Selector open() throws IOException {
//表示设置超时时间,非阻塞,当有事件发生的时候将将注册到相应的SelectionLey中
public abstract int select(long timeout)
//表示获得已经注册了所有的Selectkey
public abstract Set<SelectionKey> selectedKeys();
//阻塞的方法
public abstract int select() throws IOException;
//在未超过select(long timeout)的范围中,可以wakeup唤醒selector
public abstract Selector wakeup();
//不阻塞,立即返回
public abstract int selectNow() throws IOException;
  • 获得了SelectionKey相当于获得了对应的Channel,SelectionKey和Channel之间是注册关系
  • Selectort、SelectionKey、ServerSocketChannel、SocketChannel

对上图的说明:

1.当客户端连接时,会通过ServerSocketChannel得到SocketChannel
2.将socketChannel注册到Selector上,register(Selector sel, int ops)一个selector上可以注册多个SocketChannel
3.注册后返回一个SelectionKey,会和该Selector关联(集合)
4.Selector进行监听select方法,返回有事件发生的通道的个数
5.进一步得到各个SelectionKey(有事件发生)
6.在通过SelectionKey反向获取SocketChannel,方法channel()
7.可以通过得到channel,完成业务处理

使用三大核心组件编写Server-Client实现上图业务逻辑:

  • Server(Java):
/*** @author shenj* @title: NioServer* @projectName NettyPro* @date 2020/12/1 17:57*/
public class NioServer {public static void main(String[] args) {try {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();Selector selector = Selector.open();serverSocketChannel.socket().bind(new InetSocketAddress(7798));//设置未非阻塞serverSocketChannel.configureBlocking(false);//注册selector,设置关注事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {if (selector.select(1000) == 0) {System.out.println("超时1s!");continue;}//selector>0表示触发了关注事件Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();iterator.forEachRemaining(s -> {//监听连接事件if (s.isAcceptable()) {try {SocketChannel socketChannel = serverSocketChannel.accept();//非阻塞socketChannel.configureBlocking(false);System.out.println("一个客户端连接成功"+socketChannel.hashCode());socketChannel.register(selector, OP_READ, ByteBuffer.allocate(1024));} catch (IOException e) {e.printStackTrace();}}//监听读事件if (s.isReadable()) {SocketChannel socketChannel = (SocketChannel)s.channel();ByteBuffer dataBuffer = (ByteBuffer)s.attachment();try {socketChannel.read(dataBuffer);System.out.println("来自客户端:"+new String(dataBuffer.array()));} catch (IOException e) {e.printStackTrace();}dataBuffer.clear();}iterator.remove();});}} catch (IOException e) {e.printStackTrace();}}
}

Client(Kotlin):

/*** @title: NioClient* @projectName NettyPro* @author shenj* @date 2020/12/1 18:57*/
fun main(args: Array<String>):Unit {val data = "我深深地熟悉你脚步的韵律,它在我心中敲击."val socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);val inetSocketAddress = InetSocketAddress("127.0.0.1",7798)if(!socketChannel.connect(inetSocketAddress)){while (!socketChannel.finishConnect()){System.out.println("连接服务器,需要时间....出去溜达会吧~")}}val dataBuffer = ByteBuffer.wrap(data.toByteArray(Charsets.UTF_8))socketChannel.write(dataBuffer)System.`in`.read()
}

SelectionKey API

//注册channel对应的SelectionKey
public abstract class SelectionKey {
//得到与之与之关联的selector
public abstract Selector selector();
//得到与之关联的channel
public abstract SelectableChannel channel();
//是否可以读
public final boolean isReadable()
//是否可以写
public final boolean isWritable()
//是否可以accept
public final boolean isAcceptable()
//获得与之关联的数据
public final Object attachment()
//改变监听事件
public abstract SelectionKey interestOps(int ops);

SelectionKey的四个重要属性,<<表示位运算向左移动

public static final int OP_READ = 1 << 0;//1 读操作
public static final int OP_WRITE = 1 << 2;//4 写操作
public static final int OP_CONNECT = 1 << 3;//8 已经连接
public static final int OP_ACCEPT = 1 << 4;//16 有新的网络可以accept

SocketChannel API

  1. SocketChannel,网络IO通道,具体负责进行读写操作,NIO把缓冲区的数据写入通道,或者把通道的数据读到缓冲区
public abstract class SocketChannelextends AbstractSelectableChannelimplements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel
{
//获得一个SocketChannel通道
public static SocketChannel open() throws IOException {
//设置阻塞或非阻塞
public abstract boolean isConnectionPending();
//连接服务器
public abstract boolean connect(SocketAddress remote) throws IOException;
//如果connect方法连接失败,接下来通过该方法进行完成连接操作
public abstract boolean finishConnect() throws IOException;
//往通道内写数据
public abstract int write(ByteBuffer src) throws IOException;
//往通道里读数据
public abstract int read(ByteBuffer dst) throws IOException;
//注册一个选择器斌设置监听事件。最后一个参数可以设置共享数据
public final SelectionKey register(Selector sel, int ops,Object att)

多人聊天室(NIO)

  • 多人聊天室

    • Server
/*** @author shenj* @title: GroupToChatWithServer* @projectName NettyPro* @date 2020/12/1 22:53*/
public class GroupToChatWithServer {private Selector selector;private ServerSocketChannel serverSocketChannel;private static final int Port = 7799;/*** @author shenj* @title:  GroupToChatWithServer* @date 2020/12/2  14:18* 无参构造 * */public GroupToChatWithServer() {try {selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(Port));serverSocketChannel.configureBlocking(false);serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}/*** @author shenj* @title: listen* @date 2020/12/2  14:13*多路复用器监听channel通道的事件*/public void listen() {try {while (true) {int status = selector.select();if (status > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> key_Iterator = selectionKeys.iterator();key_Iterator.forEachRemaining(s -> {if (s.isAcceptable()) {try {SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);System.out.println(socketChannel.getRemoteAddress()+"上线了");} catch (IOException e) {e.printStackTrace();}}if(s.isReadable()){readData(s);}key_Iterator.remove();});} else {
//                    System.out.println("等待中....");}}} catch (Exception e) {e.printStackTrace();} finally {}}/*** @author shenj* @title: readData* @date 2020/12/2  14:16* Listen监听到通道的read事件,读取信息*/private void readData(SelectionKey key){SocketChannel socketChannel = null;try{socketChannel = (SocketChannel) key.channel();ByteBuffer dataBuffer = ByteBuffer.allocate(1024);int readCount = socketChannel.read(dataBuffer);if (readCount>0){String msg = new String(dataBuffer.array());System.out.println("from 客户端"+msg);sendInfo(msg,socketChannel);}}catch (Exception e){try {System.out.println(socketChannel.getRemoteAddress()+"下线了");key.cancel();socketChannel.close();} catch (IOException e2) {e2.printStackTrace();}}}/*** @author shenj* @title: sendInfo* @date 2020/12/2  14:17* 发送信息到其他的客户端*/private void sendInfo(String msg, SocketChannel self){System.out.println("服务器发送信息中...");selector.selectedKeys().stream().forEach(s->{Channel targetChannel = s.channel();//排除自己发送if (targetChannel instanceof SocketChannel && targetChannel != self){SocketChannel dest = (SocketChannel)targetChannel;ByteBuffer dataBuffer = ByteBuffer.wrap(msg.getBytes());try {dest.write(dataBuffer);} catch (IOException e) {e.printStackTrace();}}});}/*** @author shenj* @title: main* @date 2020/12/2  14:19* 主方法*/public static void main(String[] args) {GroupToChatWithServer chatWithServer = new GroupToChatWithServer();chatWithServer.listen();}
}

Client

/*** @author shenj* @title: GroupClient* @projectName NettyPro* @date 2020/12/2 0:04*/
public class GroupClient {private final String host = "127.0.0.1";private final int port = 7799;private Selector selector;private SocketChannel socketChannel;private String username;/*** @author shenj* @title: GroupClient* @date 2020/12/2  14:20 * 无参构造*/public GroupClient() {try {selector = Selector.open();socketChannel = SocketChannel.open(new InetSocketAddress(host,port));socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);username = socketChannel.getLocalAddress().toString().substring(1);System.out.println(username+"is ok!!!");} catch (IOException e) {e.printStackTrace();}}/*** @author shenj* @title: sendInfo* @date 2020/12/2  14:12* 发送信息到服务端*/public  void sendInfo(String info){info = username+"说:"+info;try {socketChannel.write(ByteBuffer.wrap(info.getBytes()));} catch (IOException e) {e.printStackTrace();}}/*** @author shenj* @title: readInfo* @date 2020/12/2  14:11* 读取服务器发送的信息*/public void readInfo(){try {int status = selector.select();if (status>0){Iterator<SelectionKey> key_iterator = selector.selectedKeys().iterator();while (key_iterator.hasNext()) {SelectionKey s = key_iterator.next();if (s.isReadable()) {SocketChannel socketChannel = (SocketChannel) s.channel();ByteBuffer dataBuffer = ByteBuffer.allocate(1024);try {socketChannel.read(dataBuffer);String msg = new String(dataBuffer.array());System.out.println(msg.trim());} catch (IOException e) {e.printStackTrace();}}}key_iterator.remove();}else {
//                System.out.println("没有可用的通道...");}} catch (IOException e) {e.printStackTrace();}}/*** @author shenj* @title: main* @date 2020/12/2  14:20* 主方法*/public static void main(String[] args) {GroupClient chatClient = new GroupClient();new Thread(){@Overridepublic void run() {while (true){chatClient.readInfo();try {sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}}.start();//发送给服务端Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()){String str = scanner.nextLine();chatClient.sendInfo(str);}}
}

零拷贝

  • 零拷贝:是指没有CPU拷贝

    • 实在网络传输优化的重要手段,从OS角度来看,内核缓冲区只有唯一的一份数据,不存在重复,更少使用CPU缓存伪共享以及无CPU校验和计算

常用两种零拷贝mmap(内存映射)和sendFile

  • 传统io

  • 三次上下文切换
  • Hard driver→kernel buffer→user buffer→socket buffer→protocol engine
  • kernel buffer、user buffer、socket buffer(两次cpu拷贝)

mmao优化

  • 两次上下文切换
  • Hard driver→kernel buffer→socket buffer→protocol engine
  • mmap直接将Hard driver文件映射到kernel buffer上,kernel buffer和user buffer共享数据,通过直接操作kernel buffer(内核内存)数据,实现文件的操作
  • kernel buffer、socket buffer(一次cpu拷贝)

sendFile

Linux2.1

  • 两次上文切换
  • Hard driver→kernel buffer→socket buffer→protocol engine
  • kernel buffer 、socket buffer(一次cpu拷贝(拷贝的是元数据,比如:数据长度等))
  • CPU copy:cpu拷贝,DMA copy:内存拷贝
  • Linux2.4

  • Hard driver →kernel buffer—→(socket buffer(copy kernel buffer元数据,比如lenght等,可以忽略))→protocol engine
  • kernel buffer 可以直接通过内存拷贝到协议栈protocol engin,但是还是有少量数据需要cpu copy到socket buffer上

使用.transferTo零拷贝传输文件

Server

/*** @author shenj* @title: NioServer* @projectName NettyPro* @date 2020/12/2 10:47*/
public class NioServer implements Serializable {private static final String host = "127.0.0.1";private static final int port = 8899;public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(host, port));ByteBuffer dataBuffer = ByteBuffer.allocate(4098);while (true) {SocketChannel socketChannel = serverSocketChannel.accept();int readCount = 0;while (-1 != readCount) {try {//统计读取字节readCount = socketChannel.read(dataBuffer);}catch (IOException e){break;}//可以让dataBuffer复用,使position = 0;mark = -1作废dataBuffer.rewind();}}}
}
  • .rewind:表示buffer的倒带,也就是position=0,mark标志位无效

Client:

/*** @author shenj* @title: NioClient* @projectName NettyPro* @date 2020/12/2 14:38*/
public class NioClient implements Serializable {private static final  String fileName = "protoc-3.6.1-win32.zip";private static final String  host = "127.0.0.1";private static final int port = 8899;public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();//客户端使用connect连接socketChannel.connect(new InetSocketAddress(host,port));//文件传输通道FileChannel fileChannel = new FileInputStream(new File(fileName)).getChannel();long start = System.currentTimeMillis();long byteNum = fileChannel.transferTo(0, fileChannel.size(), socketChannel);System.out.println("传输的字节数:"+byteNum+"耗时:"+(System.currentTimeMillis()-start));fileChannel.close();}
}

一文带你由浅入深Netty异步非阻塞世界相关推荐

  1. Netty异步非阻塞事件驱动及组件原理详解

    本文基于 Netty 4.1 展开介绍相关理论模型,使用场景,基本组件.整体架构,知其然且知其所以然,希望给大家在实际开发实践.学习开源项目方面提供参考. Netty 是一个异步事件驱动的网络应用程序 ...

  2. Netty异步非阻塞事件驱动及原理详解

    本文基于 Netty 4.1 展开介绍相关理论模型.使用场景.基本组件.整体架构,知其然且知其所以然,希望给大家在实际开发实践.学习开源项目方面提供参考.        Netty 是一个异步事件驱动 ...

  3. netty(异步非阻塞、实时、高效率)

    1.为什么使用netty 简单,再也不用编写复杂的代码逻辑去实现通信,再也不用考虑性能问题,不需要考虑编解码问题,半包读写问题. netty运用于Hadoop的RPC框架Avro,JMS框架Rocke ...

  4. 可以真正带你理清同步阻塞与同步非阻塞与异步阻塞与异步非阻塞的文章

    看了很多这方面的博客,发现说的都云里雾里,有些甚至矛盾了起来,自己理了一下这些文章,结合自己的理解说一说其到底是什么. 首先我们都知道阻塞与非阻塞,同步与异步是两个概念.而且是不是有一种它们本质上不是 ...

  5. node异步非阻塞的杂谈

    引言 node作为服务器的优势就在于适合处理高并发的请求,对于web网站后台这种I/O密集型的后台尤其有优势,其核心就在于node是一个异步非阻塞模型.关于异步,同步,阻塞,非阻塞这些概念,本文不做讨 ...

  6. 同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO

    IO的方式通常分为几种,同步阻塞的BIO.同步非阻塞的NIO.异步非阻塞的AIO. 一.BIO 在JDK1.4出来之前,我们建立网络连接的时候采用BIO模式,需要先在服务端启动一个ServerSock ...

  7. Java异步非阻塞编程的几种方式

    简介: Java异步非阻塞编程的几种方式 一. 从一个同步的Http调用说起 一个很简单的业务逻辑,其他后端服务提供了一个接口,我们需要通过接口调用,获取到响应的数据. 逆地理接口:通过经纬度获取这个 ...

  8. Node.js:海量数据大行其道的今天 node.js 在IO方面如何异步非阻塞

    以其高性能,异步IO著称,当然node.js 在 stream 上的异步也非常到位. 我们一般理解的异步请求是这样的: 同时处理多件件事件 A,B,C,D,E,F,G .... 如果用.net语言去处 ...

  9. AIO,BIO,NIO:同步阻塞式IO,同步非阻塞IO,异步非阻塞IO

    BIO,同步阻塞式IO,简单理解:一个连接一个线程 NIO,同步非阻塞IO,简单理解:一个请求一个线程 AIO,异步非阻塞IO,简单理解:一个有效请求一个线程 IO:阻塞IO BIO:同步阻塞IO.服 ...

  10. 使用tornado让你的请求异步非阻塞

    2019独角兽企业重金招聘Python工程师标准>>> 前言 也许有同学很迷惑:tornado不是标榜异步非阻塞解决10K问题的嘛?但是我却发现不是torando不好,而是你用错了. ...

最新文章

  1. 鸟哥的Linux私房菜(基础篇)- 第二十五章、 Linux 备份策略
  2. 剑指Offer:反转链表【24】
  3. c++string替换指定位置字符_Lua 字符串
  4. 前端学习(3142):react-hello-react之父组件render
  5. fwrite在任意位置写入文件,并可修改文件内容
  6. python 类与类之间的关系
  7. python的zip()函数,压缩多个可迭代对象
  8. Vue报错Module build failed Error Node Sass version 6.0.1 is incompatible with ^4.0.0.解决方案
  9. python 将一个列表切割成随机指定长度的多个列表
  10. java 中 byte[]、File、InputStream 互相转换
  11. Android存储(1)-- 你还在乱用Android存储嘛!!!
  12. 1.1 经典车间生产调度问题模型及其算法
  13. Android Intent FLAG标识
  14. 一次性餐巾行业调研报告 - 市场现状分析与发展前景预测
  15. Java8新特性 方法引用(四)
  16. 火车头采集的文件发布到服务器上,火车头采集器图片采集上传设置
  17. VB中常用的函数(一)
  18. 计算机组成原理 CPU 结构和功能
  19. 7-7 社交集群 (30 分) (集合数组的方法)
  20. 基于php的养生食疗,夏季养生宜吃的8种食物

热门文章

  1. 在VMware下安装中标麒麟操作系统7.0以及Neokylin基础常用知识
  2. android:viewpager+photoview实现图片查看器
  3. 图片批量上传至服务器/华为云obs 前台采用webuploader.js div+css布局 图片.zip华为云obs浏览器下载
  4. 专利写作技巧以及流程
  5. PowerPoint 消除所有动画VBA指令
  6. npm install 报错 npm ERR! errno ETIMEDOUT
  7. 煮酒论AI,看看大牛怎么说
  8. Tensorflow SavedModel 模型的保存和加载
  9. Python设置excel单元格格式
  10. 吾欲使汝为恶,则恶不可为;使汝为善,则我不为恶。