在【NIO】IO模型,这节课中,我们提到了5种IO模型。第四种,SIGIO一般都是在进程间使用信号通讯的时候的手段,在Java中不是很适用,我就不深入去讲了。第五种,linux 服务器上的典型代表是 aio,但在Java中也没有对应的内容。不过,有一种非常通用的模型,也应该归到第五种的纯异步模型中去,这就是Callback模型。

回调函数

Callback函数,翻译成回调函数。假如我们是A,在通知B去做一件事情的时候,告诉他,你做完的时候通过XXX方式通知我你做完了。比如说,这个XXX可能是打电话,可能是微信,可能是短信,可能是喊一嗓子。这个都是A向B注册的回调函数。

回调函数还可以做更多。我再举个例子,老板告诉员工说去查一下某个客户的联系方式。员工问,我查到以后呢?老板说你查到了就给客户打电话通知系统升级。好,这个过程中,员工问,我查到以后做什么,这其实就是请求注册一个回调函数。老板说你查到了再干某某事,这个某某事就是真正注册的回调函数。员工做完了第一件事情之后,就可以使用第一件事情的结果(电话号码)去执行第二件事了。这个过程老板甚至都不需要关心第一件和第二件事之间的切换,老板只需要通过注册回调告诉员工就行了。这可就是真正的异步模型了。

Callback的例子非常多,最出名,可能还是node.js了。例如:

//异步示例
var fs = require('fs');
fs.unlink('/tmp/hello', function(err){ // this function is callbackif (err) throw err;console.log('successfully deleted /tmp/hello');
});
console.log("hello world");

这是nodejs 删除一个文件所用的代码,如果运行它的话可以看到hello world,会在 successfully 之前。这是因为fs.unlink是一个非阻塞的函数,一调用就立即返回。然后就会去打印hello world,而等到操作系统真的把文件删除的操作完成了,就会调用后面的那个匿名方法。javascript中,函数是可以做为参数传递给另一个函数的。这个注册的匿名方法就是callback。

实际上,主线程根本不知道,操作系统完成删除一个文件的操作需要多长时间。这个回调函数在什么时刻会被执行也是完全不能预定的。这是一个纯异步的模型。

Java中的实现

其实回调函数的概念出现的很早,比Java的历史都长。C语言时代就已经有了,C是用函数指针去实现。我们先不去管它,我们思考一下Java怎么实现。

C可以传递函数指针,javascript 和 python 干脆就直接传递 function 做为参数。Java都不支持啊,怎么办?不怕,我们可以使用对象封装嘛。还是上节课所说的除法的例子,我把它改成Callback形式的:

public class CallbackServer
{public void run() throws IOException {ServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.configureBlocking(false);serverChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8000));Selector selector = Selector.open();serverChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();Iterator ite = selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey)ite.next();if (key.isAcceptable()) {ServerSocketChannel s = (ServerSocketChannel)key.channel();SocketChannel clientSocket = s.accept();System.out.println("Got a new Connection");clientSocket.configureBlocking(false);SelectionKey newKey = clientSocket.register(selector, SelectionKey.OP_WRITE);CommonClient client = new CommonClient(clientSocket, newKey);newKey.attach(client);System.out.println("client waiting");}else if (key.isReadable()) {CommonClient client = (CommonClient) key.attachment();client.onRead();}else if (key.isWritable()) {CommonClient client = (CommonClient) key.attachment();client.onWrite();}ite.remove();}}}public static void main( String[] args ) throws Exception{CallbackServer server = new CallbackServer();server.run();}
}class CommonClient {private SocketChannel clientSocket;private ByteBuffer recvBuffer;private SelectionKey key;private Callback callback;private String msg;public CommonClient(SocketChannel clientSocket, SelectionKey key) {this.clientSocket = clientSocket;this.key = key;recvBuffer = ByteBuffer.allocate(8);try {this.clientSocket.configureBlocking(false);key.interestOps(SelectionKey.OP_WRITE);} catch (IOException e) {}}public void close() {try {clientSocket.close();key.cancel();}catch (IOException e){};}// an rpc to notify client to send a numberpublic void sendMessage(String msg, Callback cback)  {this.callback = cback;try {try {recvBuffer.clear();recvBuffer.put(msg.getBytes());recvBuffer.flip();clientSocket.write(recvBuffer);key.interestOps(SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}catch (Exception e) {}}// when key is writable, resume the fiber to continue// to write.public void onWrite() {sendMessage("divident", new Callback() {@Overridepublic void onSucceed(int data) {int a = data;sendMessage("divisor", new Callback() {@Overridepublic void onSucceed(int data) {int b = data;sendMessage(String.valueOf(a / b), null);}});}});}public void onRead() {int res = 0;try {try {recvBuffer.clear();// read may fail even SelectionKey is readable// when read fails, the fiber should suspend, waiting for next// time the key is ready.int n = clientSocket.read(recvBuffer);while (n == 0) {n = clientSocket.read(recvBuffer);}if (n == -1) {close();return;}System.out.println("received " + n + " bytes from client");} catch (IOException e) {e.printStackTrace();}recvBuffer.flip();res = getInt(recvBuffer);// when read ends, we are no longer interested in reading,// but in writing.key.interestOps(SelectionKey.OP_WRITE);} catch (Exception e) {}this.callback.onSucceed(res);}public int getInt(ByteBuffer buf) {int r = 0;while (buf.hasRemaining()) {r *= 10;r += buf.get() - '0';}return r;}
}interface Callback {public void onSucceed(int data);
}

服务端的核心逻辑就在于那个sendMessage,注意看,这个方法的定义是不是很像javascript的文件操作函数了?向客户端发送一个字符串,并且注册一个Callback方法。

callback的调用是在数据从客户端发送到服务端的时候,我们通过onRead方法从网络上取到数据,然后调用callback.succeed,在第一次的callback里,我们又调了一次sendMessage。第二次send得到了b,然后计算 a / b。

这里有一点小技巧。注意看,a 和 b 都不是成员变量,而是局部变量。**b 做为内嵌匿名类能够访问到定义它的匿名类的那个方法里的局部变量。**这种东西叫闭包。当然在Java语言中,我们不这样去讲它,我们讲,b 在 a 的词法作用域中,所以在定义 b 的地方还是可以访问到 a。如果是js,python,我们通常会叫它闭包了。

本质上,这个程序仍然是selector在驱动,但经过了一层封装以后,我们已经把状态机模型转成了Callback模型。我们已经不再手动去维护服务端的状态迁移了,而是通过Callback来定义一件事情完成以后要做什么。组织Callback的负担要比状态机的负担轻。

相应地,把客户端程序也做了一些修改:

public class CallbackClient {public static void main(String[] args) {try {SocketChannel socketChannel = SocketChannel.open();socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));ByteBuffer writeBuffer = ByteBuffer.allocate(32);ByteBuffer readBuffer = ByteBuffer.allocate(32);getMessage(readBuffer, socketChannel);sendRandomInt(writeBuffer, socketChannel, 1000);getMessage(readBuffer, socketChannel);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}sendRandomInt(writeBuffer, socketChannel, 10);getMessage(readBuffer, socketChannel);socketChannel.close();} catch (IOException e) {}}public static void sendRandomInt(ByteBuffer writeBuffer, SocketChannel socketChannel, int bound) {Random r = new Random();int d = 0;d = r.nextInt(bound);if (d == 0)d = 1;System.out.println(d);writeBuffer.clear();writeBuffer.put(String.valueOf(d).getBytes());writeBuffer.flip();try {socketChannel.write(writeBuffer);} catch (IOException e) {e.printStackTrace();}}public static void getMessage(ByteBuffer readBuffer, SocketChannel socketChannel) {readBuffer.clear();byte[] buf = new byte[16];try {socketChannel.read(readBuffer);} catch (IOException e) {}readBuffer.flip();readBuffer.get(buf, 0, readBuffer.remaining());System.out.println(new String(buf));}
}

好了,今天就到这里了,我已经把Java中的IO模型都讲完了。大家一定要多动手练习,想通它,异步编程非常复杂,也非常重要。一定要掌握它。

【NIO】异步模型之Callback -- 封装NIO相关推荐

  1. 面试必会系列 - 5.1 网络BIO、NIO、epoll,同步/异步模型、阻塞/非阻塞模型,你能分清吗?

    本文已收录至 Github(MD-Notes),若博客中图片模糊或打不开,可以来我的 Github 仓库,包含了完整图文:https://github.com/HanquanHq/MD-Notes,涵 ...

  2. BIO、伪异步 IO、AIO和NIO

    BIO 采用 BIO 通信模型的服务端, 通常由一个独立的 Acceptor 线程负责监听客户端的连接, 它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理, 处理完成之后, 通过输出 ...

  3. NIO Reactor模型

    NIO Reactor模型 Reactor三种模型 单线程模型 多线程模型 主从多线程模型 Netty线程模型 1 线程组 2 ChannelPipeline 3 异步非阻塞 Reactor模式是基于 ...

  4. Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式

    本文介绍了Java中的四种I/O模型,同步阻塞,同步非阻塞,多路复用,异步阻塞.同时将NIO和BIO进行了对比,并详细分析了基于NIO的Reactor模式,包括经典单线程模型以及多线程模式和多Reac ...

  5. 【面试】迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章(快快珍藏)...

    网上有很多讲同步/异步/阻塞/非阻塞/BIO/NIO/AIO的文章,但是都没有达到我的心里预期,于是自己写一篇出来. 常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTTP接口拿到详情数 ...

  6. 迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章

    来源:编程新说 网上有很多讲同步/异步/阻塞/非阻塞/BIO/NIO/AIO的文章,但是都没有达到我的心里预期,于是自己写一篇出来. 常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTT ...

  7. 一个故事讲清楚BIO NIO 异步

    转载请引用:一个故事讲清楚NIO 假设某银行只有10个职员.该银行的业务流程分为以下4个步骤: 1) 顾客填申请表(5分钟): 2) 职员审核(1分钟): 3) 职员叫保安去金库取钱(3分钟): 4) ...

  8. cpu 被挂起和阻塞_迄今为止把同步/异步/阻塞/非阻塞/BIO/NIO/AIO讲的这么清楚的好文章...

    网上有很多讲同步/异步/阻塞/非阻塞/BIO/NIO/AIO的文章,但是都没有达到我的心里预期,于是自己写一篇出来. 常规的误区 假设有一个展示用户详情的需求,分两步,先调用一个HTTP接口拿到详情数 ...

  9. 响应式web(一):什么是响应式web,异步调用,callback的本质,servlet3的异步

    Reactive 同步与异步 传统的web模型 "命令式编程":阻塞式模型,一个请求发过来,也许你会开启新的线程,但最后需要等待完成所有操作之后,才能返回response. 一个一 ...

最新文章

  1. Spring 工厂的相关的方法
  2. 程序员修神之路--分布式系统使用网关到底是好还是坏?
  3. nmon Analyser分析仪
  4. 2018 蓝桥杯省赛 B 组模拟赛(一)I. 天上的星星(二维前缀和)
  5. Hive(Impala)测试数据生成与加载
  6. 64位系统使用Access数据库文件的彻底解决方法
  7. jQuery操作DOM节点的相关方法
  8. 使用Cocos creator开发一个文字游戏
  9. C# 繁体转简体 简体转繁体
  10. Backtrader量化回测8——手续费
  11. 纳米机器人最新进展(2021年)
  12. 如何从Gitlab上拉取代码
  13. 魔术轮胎,dugoff轮胎建模 采用模块化建模方法,搭建非线性魔术轮胎PAC2002,dugoff模型
  14. iframe标签控制视频大小及自动播放
  15. 考研秘籍——考研流程及初试备战
  16. app ui设计规范
  17. 问题分享:Word中图片显示不全的解决方法
  18. DEV03-GBase 8a MPP Cluster OLAP 函数
  19. python属于什么部门_如何理解简历中部门名称的模式?
  20. 在 Ubuntu 上安装 jstest-gtk 手柄测试

热门文章

  1. 微生物组领域最高质量的资源全在这
  2. Nature:好导师的16个标准
  3. 生态统计学里的数据转化与标准化
  4. js弹出一段html,html js 弹出层
  5. Python使用matplotlib可视化散点图、使用seaborn中的lmplot函数可视化不同分组散点图的最优线性回归拟合曲线(Scatter plot with regression line)
  6. Python使用matplotlib可视化两个时间序列的交叉相关性图、交叉相关图显示了两个时间序列之间的滞后性(Cross Correlation plot)
  7. R语言使用lm构建线性回归模型、并将目标变量对数化实战:模型训练集和测试集的残差总结信息(residiual summary)、模型训练(测试)集自由度计算、模型训练(测试)集残差标准误计算
  8. R语言使用ggplot2可视化互相覆盖的直方图实战(Overlaying histograms)
  9. UserWarning: Label not :NUMBER: is present in all training examples
  10. 介绍一下K近邻(KNN)算法,KNeighbors和RadiusNeighbors的差异是什么?各有什么优势?