先总结一下。
线程是最容易编写的并发方式,操作系统也提供了最好的支持;协程可以做到更强的并发能力,但需要实现调度器;回调是开销最小的,它本来不是特别为并发来设计的,它是通用的异步化操作的实现模型。
注意线程和协程本身也是使用异步来模拟同步,线程由操作系统来模拟,协程由用户级调度器模拟。模拟的过程是:发起事件请求、挂起当前执行过程(线程或协程)、响应事件请求、恢复挂起的执行过程。回调没有这么复杂,你需要自己把连续的执行过程分解成多步操作。
线程就不讨论了,用起来比较简单;协程之前简单研究了一下,切换开销比线程有很大改进,但还是有点大,用作IO事件调度还可以,粒度更小的操作就显得开销过大了,想象一下极端情况下每次调度只处理一字节;回调看起来是个不错的方式,但也有许多问题需要解决。
以上次的代码为例,这里简化一下:
- void server_loop(int fd) {
- while(true) {
- register_read_event(fd);
- wait_read_event();
- int client = accept(fd);
- close(client);
- }
- }
复制代码
改成对应的callback方式,为了让代码更像是工具转过来的,我把上一篇的代码修改一下:
- void server_loop(int fd) {
- register_read_event(fd);
- wait_read_event({
- int clientfd = accept(fd);
- close(clientfd);
- server_loop(fd);
- });
- }
复制代码
这里不打算讨论while循环的转换问题,虽然这也是个难题,但可以通过设置一个变量来避开。
看一下循环里面是如何转换的?只要把wait_read_event后面的代码放进一个closure就可以了。看起来很简单,如果可以通过宏或其它语言设施通知编译器把某行后面的代码构造一个代码块,的确是会简单一些,不过目前我还不知道有哪种语言可以这样。如果没有语言可以做到这样(或者你用的语言做不到),像上面一样手工构造这个代码块也是可以接受的,毕竟它提供了高性能的并发模式。
问题还没有彻底解决,构造这个代码块实际是撕裂了整个逻辑,想象一下,如果要把register_read_event/wait_read_event和accept封装起来,隐藏这个实现过程,结果要类似这样:
- void server_loop(int fd) {
- int client = register_read_event_AND_wait_AND_accept(fd);
- close(client);
- server_loop(fd);
- }
复制代码
问题来了,由于不是线程和协程模式,而是异步回调模式,这么写是不可能的,你只能再构造一个代码块:
- void register_read_event_AND_wait_AND_accept(int fd, void delegate(int) dg) {
- register_read_event(fd);
- wait_read_event({
- int client = accept(fd);
- dg(client);
- }
- }
- void server_loop(int fd) {
- register_read_event_AND_wait_AND_accept(fd, (int client){
- close(client);
- server_loop(fd);
- }
- }
复制代码
原以为把回调过程封装起来,以后直接调用就漂亮了,但结果却是每一次包装都没有减少它,你的逻辑必须做成closure传递进去,脱离不了这种不连贯的方式。
或许让编译器来转换?还是写成这样:
- void server_loop(int fd) {
- int client = register_read_event_AND_wait_AND_accept(fd);
- close(client);
- server_loop(fd);
- }
复制代码
它自动转成:
- void server_loop(int fd) {
- register_read_event_AND_wait_AND_accept(fd, (int client){
- close(client);
- server_loop(fd);
- }
- }
复制代码
如果修改编译器或许能让它做到,把register_read_event_AND_wait_AND_accept声明成 asynchronized,它就自动把该调用以及之后的代码进行转换。或者只要把wait_read_event这样声明,所有调用它的方法都自动把这个属性向外传播。。。。。。
一种新的语言?
本文介绍一个使用大概10行代码, 实现一个类似老赵JSCEX[1]的工具
首先来看一下下面这段Node.js代码:
- var copy = function(src, dst) {
- var fd_src = fs.open(src, "r");
- var fd_dst = fs.open(dst, "w");
- while (true) {
- var buffer_size = 4096;
- var buffer = new Buffer(buffer_size);
- var bytes_read = fs.read(fd_src, buffer, 0, buffer_size);
- if (bytes_read > 0) {
- var write_at = 0;
- while (write_at < bytes_read) {
- write_at += fs.write(fd_dst, buffer, write_at, bytes_read - write_at);
- }
- } else {
- break;
- }
- }
- };
复制代码
这段看起来像是一个文件拷贝的函数, 但是这个函数在Node里面是无法运行的, 因为Node的所有函数调用都必须是异步的, 这里用到的fs.open, fs.read, fs.write都必须修改为异步调用, 修改后可运行的代码如下:
- var copy = function(src, dst) {
- fs.open(src, "r", undefined, function(err, fd_src) {
- fs.open(dst, "w", undefined, function(err, fd_dst) {
- var copy_rec = function(position) {
- var buffer_size = 4096;
- var buffer = new Buffer(4096);
- fs.read(fd_src, buffer, 0, buffer_size, position, function(err, bytes_read) {
- if (bytes_read > 0) {
- var write_rec = function(write_at) {
- if (write_at < bytes_read) {
- fs.write(fd_dst, buffer, 0, bytes_read, position, function(err, written) {
- if (written > 0) {
- write_rec(write_at + written);
- }
- });
- } else {
- copy_rec(position + bytes_read);
- }
- };
- write_rec(0);
- }
- });
- };
- copy_rec(0);
- });
- });
- };
复制代码
看到这里, 我想第一次接触Node的同学就已经要崩溃了, 一个简单的文件拷贝函数竟然需要嵌套4~5层函数回调, 那么更复杂的比如web servlet岂不是要嵌套10多层函数回调了?
难道没有方法让最上面的简洁明了的代码运行起来么? 下面就介绍一种方法, 不修改最开始那个简洁明了的拷贝函数, 让它直接跑起来, 并且不会阻塞整个JS引擎
首先分析一下同步调用与异步调用的区别
- // sync
- var fd = fs.read (name, mode);
- // use fd
- // async
- fs.read (name, mode, function (fd) {
- // use fd
- });
复制代码
仔细观察这两个调用, 会发现其实这就是一个CPS变换[2], 所以, 我们只需要做一个CPS变换的包装就可以实现同步的文件操作, 具体实现如下:
- var efs = {
- open: function (name, mode) {
- return call_cc(function (continuation) {
- fs.open(name, mode, undefined, function(err, fd_src) {
- continuation(fd_src);
- });
- });
- },
- read: function (fd, buffer, offset, length) {
- return call_cc(function (continuation) {
- fs.read(fd, buffer, offset, length, null, function(err, bytes_read) {
- continuation(bytes_read);
- });
- });
- },
- write: function (fd, buffer, offset, length) {
- return call_cc(function (continuation) {
- fs.write(fd, buffer, offset, length, null, function(err, written) {
- continuation(written);
- });
- });
- }
- };
复制代码
这个实现的核心就是call/cc函数[3], 如果了解lisp或者scheme的同学可能会非常熟悉这个函数, 这个函数作用是把当前正在运行的线程挂起, 然后执行call/cc本体, call/cc函数不会返回, 直到主动调用参数传进来的那个回调函数(本例中continuation变量绑定的函数), call/cc函数的返回值就是传给回调函数的参数. call/cc函数确实很难理解, 下面举例说明一下:
- util.log("before call/cc");
- var result = call_cc(function (continuation) {
- util.log("in call/cc before set timer");
- setTimeout(function () {
- util.log("before callback");
- continuation("hello world!");
- util.log("after callback");
- }, 1000);
- util.log("in call/cc after set timer");
- });
- util.log("after call/cc " + result);
复制代码
这段代码的输出为:
- 31 Oct 13:01:33 - before call/cc
- // 运行call/cc函数主体
- 31 Oct 13:01:33 - in call/cc before set timer
- 31 Oct 13:01:33 - in call/cc after set timer
- // 休息1秒, 进入setTimeout的回调
- 31 Oct 13:01:34 - before callback
- // 进入continuation函数, 调用continuation函数就相当于call/cc函数返回了"hello world!", 从call/cc返回开始继续执行
- 31 Oct 13:01:34 - after call/cc hello world!
- // 执行setTimeout回调剩下部分
- 31 Oct 13:01:34 - after callback
复制代码
现在我们已经使用CPS转换得到了"同步"的文件处理函数, 虽然说看起来是同步的, 但是内部使用的是call/cc实现, 实际上是不会阻塞JS引擎的, 其他回调仍然能正常执行. 有了这些函数的支持, 本文开头给出的那个文件拷贝函数就能够不作任何修改正常执行, 现在一切问题都已经解决了, 唯一剩下的问题就是call/cc函数的实现, 标准的node.js引擎是没有call/cc函数的, 这里我们使用node-fibers[4]实现call/cc函数.
- var call_cc = function(cont) {
- var fiber = Fiber.current;
- cont(function (a) {
- fiber.run(a);
- });
- return Fiber["yield"]();
- };
复制代码
由于node-fiber的限制, 这个copy函数必须运行在fiber环境中, 完整代码如下:
- var fs = require("fs");
- require("fibers");
- var call_cc = function(cont) {
- var fiber = Fiber.current;
- cont(function (a) {
- fiber.run(a);
- });
- return Fiber["yield"]();
- };
- var efs = {
- open: function (name, mode) {
- return call_cc(function (continuation) {
- fs.open(name, mode, undefined, function(err, fd_src) {
- continuation(fd_src);
- });
- });
- },
- read: function (fd, buffer, offset, length) {
- return call_cc(function (continuation) {
- fs.read(fd, buffer, offset, length, null, function(err, bytes_read) {
- continuation(bytes_read);
- });
- });
- },
- write: function (fd, buffer, offset, length) {
- return call_cc(function (continuation) {
- fs.write(fd, buffer, offset, length, null, function(err, written) {
- continuation(written);
- });
- });
- }
- };
- var copy = function(src, dst) {
- var fd_src = efs.open(src, "r");
- var fd_dst = efs.open(dst, "w");
- while (true) {
- var buffer_size = 4096;
- var buffer = new Buffer(buffer_size);
- var bytes_read = efs.read(fd_src, buffer, 0, buffer_size);
- if (bytes_read > 0) {
- var write_at = 0;
- while (write_at < bytes_read) {
- write_at += efs.write(fd_dst, buffer, write_at, bytes_read - write_at);
- }
- } else {
- break;
- }
- }
- };
- Fiber(function () {
- copy("src", "dst");
- }).run();
复制代码
最后一点空间, 我想简单评论一下老赵的JSCEX[1], 简单来说, 就是一点: 简直就是在重复发明轮子.
有关函数式编程, 在lisp界已经有40~50年的研究, 各个方面都已经有非常完善的体系, 当然这种异步同步函数之间互相转换早已经有体系化, 可证明的研究[5], 实现这样一个异步类库, 最好是从底层做起, 首先实现fiber[4]或者shift/reset[6]这样的底层函数, 然后再其之上, 构建一个异步框架. 像老赵这样, 一上来就搞JS再编译, 那是绕了十万八千里的远路, 不仅本身会引入bug, 也会给用户的调试造成很大困扰, 性能问题也无法保证.
1. [1] JavaScript版本的AsyncEnumerator http://blog.zhaojie.me/2010/11/a ... -in-javascript.html ↩
2. [2] Continuation-passing style http://en.wikipedia.org/wiki/Continuation-passing_style ↩
3. [3] Call-with-current-continuation http://en.wikipedia.org/wiki/Call-with-current-continuation ↩
4. [4] Fiber support for v8 and Node https://github.com/laverdet/node-fibers ↩
5. [5] Olivier Danvy and Andrzej Filinski. 1990. Abstracting control. In Proceedings of the 1990 ACM conference on LISP and functional programming (LFP '90). ACM, New York, NY, USA, 151-160. DOI=10.1145/91556.91622 http://doi.acm.org/10.1145/91556.91622 ↩
6. [6] Tiark Rompf, Ingo Maier, and Martin Odersky. 2009. Implementing first-class polymorphic delimited continuations by a type-directed selective CPS-transform. In Proceedings of the 14th ACM SIGPLAN international conference on Functional programming (ICFP '09). ACM, New York, NY, USA, 317-328. DOI=10.1145/1596550.1596596 http://doi.acm.org/10.1145/1596550.1596596 ↩
http://blog.kghost.info/index.php/2011/10/callcc-and-node-js/
|