之前用nio写了一个聊天室的简单demo,发现思路很乱,然后就开始看zookeeper是怎么做的,发现思路很妙,于是就仿照着写了一个很简单的客户端版本,就是输入一个加法的字符串,然后让服务器计算,再返回。

客户端:

package client;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Scanner;public class Client {private ClientIO cIO;public Client(){init();}private void init(){cIO = new ClientIO();}class ClientIO{private Queue<Packet> outgoingQueue;private Queue<Packet> pendingQueue;private Selector selector;private SocketChannel channel;public ClientIO(){outgoingQueue = new LinkedList<>();pendingQueue = new LinkedList<>();try {selector = Selector.open();channel = SocketChannel.open();channel.connect(new InetSocketAddress("localhost", 20002));channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_CONNECT);} catch (IOException e) {e.printStackTrace();}new Thread(new SendThread()).start();}public void submit(Packet packet){outgoingQueue.add(packet);}class SendThread implements Runnable{private void connect(){try {channel.register(selector, SelectionKey.OP_WRITE);} catch (ClosedChannelException e) {e.printStackTrace();}}private void read(){if(pendingQueue.isEmpty())return;ByteBuffer buffer = ByteBuffer.allocate(1024);try {channel.read(buffer);} catch (IOException e) {e.printStackTrace();}buffer.flip();String r = new String(buffer.array());Packet packet = pendingQueue.poll();packet.setResult(r);synchronized (packet) {packet.notifyAll();}}private void updateInterest(){try {if(!outgoingQueue.isEmpty() && !pendingQueue.isEmpty()){channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}else if (!outgoingQueue.isEmpty() && pendingQueue.isEmpty()) {channel.register(selector, SelectionKey.OP_WRITE);}else if (outgoingQueue.isEmpty() && !pendingQueue.isEmpty()) {channel.register(selector, SelectionKey.OP_READ);}else {channel.register(selector, SelectionKey.OP_WRITE);}} catch (ClosedChannelException e) {e.printStackTrace();}}private void write(){if(outgoingQueue.isEmpty())return;Packet packet = outgoingQueue.poll();ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put(packet.getOp().getBytes());buffer.flip();try {channel.write(buffer);} catch (IOException e) {e.printStackTrace();}pendingQueue.add(packet);}@Overridepublic void run() {try {while(true){selector.select();Iterator<SelectionKey> itr = selector.selectedKeys().iterator();while(itr.hasNext()){SelectionKey key = itr.next();if(key.isConnectable()){connect();}else if(key.isReadable()){read();}else if(key.isWritable()){write();}itr.remove();}updateInterest();}} catch (IOException e) {e.printStackTrace();}}}}class Packet{private String op;private String result;public void setOp(String op){this.op = op;}public void setResult(String result){this.result = result;}public String getOp(){return this.op;}public String getResult(){return this.result;}}private void loop(){Scanner scanner = null;try {scanner = new Scanner(System.in);while(true){System.out.println("Plz input: ");String op = scanner.next();System.out.println(getResult(op));}} finally {scanner.close();}}private String getResult(String op){Packet packet = new Packet();packet.setOp(op);cIO.submit(packet);synchronized (packet) {try {packet.wait();} catch (InterruptedException e) {e.printStackTrace();}}return packet.getResult();}public static void main(String[] args) {new Client().loop();}}

服务端:

没有什么,还是自己写的,完了研究了zookeeper的再更一个

package server;import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class Server {private Selector selector;private static final int PORT = 20002;public void connect(SelectionKey key){ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();System.out.println("log: client connects...");try {SocketChannel socketChannel = serverChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}public String compute(String m){String[] op = m.split("\\+");BigDecimal a1 = new BigDecimal(op[0]);BigDecimal a2 = new BigDecimal(op[1]);return a1.add(a2).toString();}public void read(SelectionKey key){SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);StringBuffer sb = new StringBuffer();int c = 0;try {while((c = socketChannel.read(buffer)) > 0){buffer.flip();int size = buffer.remaining();byte[] bytes = new byte[size];buffer.get(bytes, 0, size);sb.append(new String(bytes));}if(c == -1){System.out.println("log: client closes");socketChannel.close();return;}else{System.out.println("log: msg-> " + sb.toString());String r = compute(sb.toString());SocketChannel sc = (SocketChannel) key.channel();sc.register(selector, SelectionKey.OP_WRITE, r);}} catch (IOException e) {e.printStackTrace();}}private void write(SelectionKey key){SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);buffer.put(((String)(key.attachment())).getBytes());buffer.flip();try {sc.write(buffer);sc.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}public void start(){try {this.selector = Selector.open();ServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.configureBlocking(false);serverChannel.bind(new InetSocketAddress(PORT));serverChannel.register(selector, SelectionKey.OP_ACCEPT, "server");System.out.println("log: sever starts...");while(selector.select() > 0){Iterator<SelectionKey> itr = selector.selectedKeys().iterator();while(itr.hasNext()){SelectionKey key = itr.next();if(key.isAcceptable()){connect(key);}else if(key.isReadable()){read(key);}else if(key.isWritable()){write(key);}itr.remove();}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {new Server().start();}}

最常使用的场景是请求响应模式,服务器和客户端处于不同的角色。服务端是被动方,客户端是主动方,所以客户端的特征是先写后读,而服务端是先读后写。再结合nio中可以监听的四种事件,accept是服务端专有的,也是服务端最开始监听的事件,而connect是客户端专有的,也是客户端最开始监听的事件。然后服务端一旦监听到了连接,就需要为这个新的socket设置read事件来等待客户端的请求,而客户端connect以后一般是设置write事件,等待用户输入然后向服务端发送请求。

nio的另一个特征是服务端的请求响应或者客户端的请求响应都是处于两个监听周期的。

对于服务端,肯定是先从read周期获取用户的请求,然后再在write周期返回,就涉及到了一个缓存结果,然后在下一个周期write的过程,其实这个有一个比较简单的做法就是每一次read周期内,解析了请求,然后得到了相应的资源结果,然后就需要注册一个写事件写入结果,那么可以在注册写事件的同时,把结果使用register的attachment来附加到key上,那么下一个写周期的时候,服务器就可以直接从key里面取到这个attachment,直接写入。这个办法应对简单的场景个人感觉还是很好的,因为nio的思路是使用一个select来管理所有的channel或者连接,所以,每一个read周期都有多个channel的请求,我们缓存结果的时候需要考虑或者维护一个映射关系,以便于在下一个write周期的时候知道每一个write事件就绪的channel要写入的是哪一份结果,这个维护过程我们可以使用attachment机制来应对,attachment本身也就是用来标识或者绑定与这个channel相关的一些东西,所以感觉这个办法是对请求响应服务器模型的一个很好的帮助。

所以个人感觉一个比较简单的nio服务器模式是:首先在serversocketchannel上注册一个accept事件,然后有了连接,就先注册一个读事件等待请求,然后一旦有了请求,就处理,计算结果,然后就注册一个写事件,并且把结果attach上,注意这里的事件都是单独的事件,尤其是写入结果的时候写事件不能和读事件一起,因为如果我们的while循环里如果先处理的是读事件,那么就会覆盖之前的结果。这个模型可以应对请求响应模式。如果是聊天那种,可能需要维护一个channel和session的关系。这也是我最近写了两个ni的简单服务器额感悟。

下面重点说下nio客户端怎么写。我感觉客户端还是最原始的bio比较好写。因为客户端通常吧只有一个socket,而且客户端通常会有一个客户端线程来梳理ui,每一个按钮都会对应一次发送请求并等待结果的过程。所以很自然地,就是使用bio会比较简单,就是用outstream发,然后用instream收。nio还是用在服务器端比较好,因为服务器要处理多个socket,所以使用bio会对应很多线程,才引入了nio的select机制。个人感觉客户端nio写起来并不是很舒服。

所以我研究了一下zookeeper的nio客户端的实现,大致写出了上面的那个客户端,感觉还是比较舒服的,当然还有很多需要完善的地方。

思路:

这里主要针对阻塞式的客户端模式,比如说客户端界面线程要获取服务端的数据来显示,这种场景一般都是阻塞式的,界面画一个圈然后转,知道服务器返回了资源,再显示。

(1)首先,必须将io的东西和客户端线程后者业务解耦,所以我们可以建立一个clientIO的内部类来处理网络io。然后封装一个发送的最小单位packet。所以客户端的每一次请求都会生成一个packet。

(2)客户端和io唯一通讯的是两个队列,一个outgoing和一个pending。

outgoing:是将要发送的packet;

pending:是发送完的packet,等待服务器的结果。

所以客户端每一次的packet会先进入outgoing队列,然后发送完以后再进入pending队列,当收到服务器相应以后再从pending移除,最后再通知客户端处理完毕。

(3)clientIO内部会建立这两个队列,还会有一个nio的线程来处理队列信息。首先在channel上注册一个connect事件,一旦连接就注册一个write事件,一旦有write事件,就调用write方法把outgoing队列中取一个packet发出去,把packet移到pending中,然后再注册一个读事件,等待结果。这样在下一个读事件里,就可以从pending中拿一个事件处理,最后把结果写入packet,通知客户端线程。这个过程每一次都是从队列的开始拿,从队列的尾部放,因为这个是tcp协议,所以最终会按顺序到达服务器,那么从服务器返回的结果,也就一定是pending队列中的第一个packet,所以这个对应关系正好解决了客户端的一个难题,那就是客户端通常是先写后读的,那么下一次读的时候怎么知道结果对应的是哪一个packet呢?这个双队列正好解决了这个对应问题,而且把客户端业务和io完完全无案解耦了,感觉非常好的一个设计。

所以clietIO的主要职责是:

维护两个队列,向客户端提供一个接口来向队列扔packet;

启动一个线程来处理列队中的packet,每一个写入周期把outgoing发送出去,每一个读周期把pending的packet附上结果。然后通知唤醒客户端线程。

那么客户端要做的就是:

每当业务方法需要io时,就构造一个packet,扔到io的队列里,然后阻塞在这个packet上,知道被clientIO唤醒,返回结果。

过程大致如下:

 private String getResult(String op){Packet packet = new Packet();packet.setOp(op);cIO.submit(packet);synchronized (packet) {try {packet.wait();} catch (InterruptedException e) {e.printStackTrace();}}return packet.getResult();}

【Java】NIO 仿照zookeeper 写的 nio客户端相关推荐

  1. java nio 客户端_Java网络编程:Netty框架学习(二)---Java NIO,实现简单的服务端客户端消息传输...

    概述 上篇中已经讲到Java中的NIO类库,Java中也称New IO,类库的目标就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO), ...

  2. 【Java NIO】一文了解NIO

    [Java NIO]一文了解NIO Java NIO 1 背景介绍 在上一篇文章中我们介绍了Java基本IO,也就是阻塞式IO(BIO),在JDK1.4版本后推出了新的IO系统(NIO),也可以理解为 ...

  3. java reactor模式例子_JAVA BIO,NIO,Reactor模式总结

    传统同步阻塞I/O(BIO) 在NIO之前编写服务器使用的是同步阻塞I/O(Blocking I/O).下面是一个典型的线程池客服端服务器示例代码,这段代码在连接数急剧上升的情况下,这个服务器代码就会 ...

  4. 【Java基础】从Java语言层面理解BIO,NIO,AIO(二)

    文章目录 零.从网络层面理解BIO,NIO,AIO 一.相关概念 1.什么是socket? 2.IP地址(IP Address) 3.端口(Port) 4.协议(Protocol) 4.1.协议简介 ...

  5. Java IO模型:BIO、NIO、AIO讲解

    文章目录 IO 首先:什么是IO? 为什么要改进IO? BIO.NIO.AIO BIO NIO NIO实现原理 Channel(通道) : Buffer(缓冲区): Selector(选择器) : A ...

  6. java中的nio_Java中的NIO基础知识

    上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫 Java NIO 由3个核心组成,分别是Channels,Buffers,Selectors.本文主要介绍着三个 ...

  7. Java网络编程(6)NIO - Channel详解

    前言 NIO的三个核心组件:Buffer.Channel.Selector Java网络编程(4)NIO的理解与NIO的三个组件完成了大概的了解 Java网络编程(5)NIO - Buffer详解详细 ...

  8. 手写一个NIO群聊系统

    一.浅谈NIO 1. 什么是NIO? ​​Java NIO​​:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 ​​I/ ...

  9. Java IO框架之BIO、NIO、AIO

    我是傲骄鹿先生,沉淀.学习.分享.成长. 如果你觉得文章内容还可以的话,希望不吝您的「一键三连」,文章里面有不足的地方希望各位在评论区补充疑惑.见解以及面试中遇到的奇葩问法 目录 一.概述 二.BIO ...

  10. zookeeper简介以及C客户端用法

    zookeeper简介以及C客户端用法 前言 简介 zookeeper保证 理解zookeeper的顺序一致性 zookeeper 接口 安装 zoo.cfg参数详解 常用命令 C API zooke ...

最新文章

  1. 认识DOM的三大节点:元素节点,文本节点,属性节点以及nodeName,nodeType,nodeValue的区别
  2. 将你的Apache速度提高十倍的经验分享
  3. 创建性设计模式之2--建造者模式
  4. TCP协议疑难杂症全景解析|硬核
  5. [css] 用css3实现伪3D的文字效果
  6. MORMOT数据库连接池
  7. 前端之同源策略 Jsonp 与 CORS
  8. ROS入门-9.订阅者Subscriber的编程实现
  9. android 编译 c 程序,Android上通过gcc编译普通的C程序
  10. Google gae部署php简单说明
  11. 阿里巴巴100%云上双11
  12. 计算机CG技术未来发展前景,CG就业前景怎么样?
  13. Final Scrum
  14. 《翻译与本地化CAT软件实用教程》目录
  15. 物联网和工业物联网有什么区别?
  16. Tayga NAT64 IPv6与IPv4互访解决方案
  17. 雅虎邮箱pop服务器,使用Yahoo.com.cn的POP和SMTP
  18. 基于python+django框架+Mysql数据库的校园运动场地预约系设计与实现
  19. item_history_price - 获取京东商品历史价格信息
  20. 仿微信翻译----本地短信翻译。

热门文章

  1. android两个popwindow背景,Android PopWindow 设置背景亮度的实例
  2. 白帽子讲web安全笔记-xss总结
  3. JAVA电影购票系统
  4. 购票系统c语言座位分配,铁路购票系统的简单座位分配算法
  5. 常用的SQL注入语句
  6. Java开发工具 IntelliJ IDEA(idea使用教程,手把手教学)内容很全,一篇管够!!!
  7. 联想主板9针开关接线图_收藏丨34个电气控制接线图、电子元件工作原理图
  8. 软件测试及自动化测试
  9. C专家编程--随记(二)
  10. imshow、image以及imagesc的区别