Collections 集合处理

async.forEachOf  |  eachOf(object, iterator, callback)

  • 实现功能:遍历object对象执行iterator,报错或遍历执行完成时调用callback(error);callback(error)函数的触发时机需要手动在iterator中设置。
  • 源码解读:_keyIterator函数借用闭包遍历对象或数组(闭包中缓存数组的index值、或对象属性集合的index值),执行时获取不同的index值展开遍历,遍历终止时返回null。iterator若为异步延迟函数,每次启动执行时completed+1,延迟完成调用only_once(done)函数时completed-1,completed在源码中发挥作用没那么明确。iterator函数的尾参only_once(done)通过内部函数封装后输出给使用者,实现类似Promise模块将resolve、reject函数封装后输出给模块的调用者。callback函数在内部的执行条件是在外部调用过程中携带error错误,或者key=null、completed=0,即遍历执行完毕。
async.forEachOf =async.eachOf = function (object, iterator, callback) {callback = _once(callback || noop);object = object || [];var iter = _keyIterator(object);var key, completed = 0;while ((key = iter()) != null) {// 反复执行闭包函数iter遍历对象completed += 1;// only_once(done)中调用外部函数callback,机理同Promise-resolve相似// 内部函数对外部函数的影响是参数iterator(object[key], key, only_once(done));}if (completed === 0) callback(null);function done(err) {completed--;if (err) {callback(err);}// Check key is null in case iterator isn't exhausted// and done resolved synchronously.else if (key === null && completed <= 0) {callback(null);}}};
  • 主要问题:报错不影响后续iterator函数执行,参看async.some方法实现的中断回调的执行,捕获到err时,生成status=false状态,根据status状态调用回调函数callback。参看async.applyEach针对数组元素为函数的特殊情况,函数参数相同。
  • 官方示例:遍历读取json文件,报错或单次读取操作完成时执行callback回调,区别是携带参数为err或null。
var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
var configs = {};async.forEachOf(obj, function (value, key, callback) {fs.readFile(__dirname + value, "utf8", function (err, data) {if (err) return callback(err);// 报错时执行callbacktry {configs[key] = JSON.parse(data);} catch (e) {return callback(e);}callback();// 执行完成时调用function(err)});
}, function (err) {if (err) console.error(err.message);// configs is now a map of JSON datadoSomethingWith(configs);
});

async.forEach  |  each(arr, iterator, callback)

  • 实现功能:实现功能同async.eachOf,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值。
  • 源码解读:_without函数用于去除iterator函数携带的数组index值参数。
async.forEach =
async.each = function (arr, iterator, callback) {return async.eachOf(arr, _withoutIndex(iterator), callback);
};
  • 官方示例:遍历文件信息。
async.each(openFiles, function(file, callback) {// Perform operation on file here.console.log('Processing file ' + file);if( file.length > 32 ) {console.log('This file name is too long');callback('File name too long');} else {// Do work to process file hereconsole.log('File processed');callback();}
}, function(err) {// if any of the file processing produced an error, err would equal that errorif( err ) {// One of the iterations produced an error.// All processing will now stop.console.log('A file failed to process');} else {console.log('All files have been processed successfully');}
});

async.forEachOfSeries |  eachOfSeries(obj, iterator, callback)

  • 实现功能:async.eachOf对obj遍历调用的iterator函数有延迟时,同步执行该延迟。eachOfSeries方法则在一个延迟函数iterator完成后再调用下一个延迟函数iterator,实现依赖于使用者调用尾参only_once(done)函数的时机。和async.eachOf方法不同的另一点是,callback回调触发时机为捕获到错误或者遍历执行完毕时。
  • 源码解读:通过_keyIterator函数控制遍历对象或数组的节奏,在延迟执行完成后获取下一元素项执行iterator函数,或者通过完整遍历依次获取元素项执行iterator函数(async.eachOf方法中实现),通常的each方法不能做到。
async.forEachOfSeries =async.eachOfSeries = function (obj, iterator, callback) {callback = _once(callback || noop);obj = obj || [];var nextKey = _keyIterator(obj);var key = nextKey();function iterate() {var sync = true;if (key === null) {return callback(null);}iterator(obj[key], key, only_once(function (err) {if (err) {callback(err);}else {key = nextKey();if (key === null) {return callback(null);} else {if (sync) {async.setImmediate(iterate);} else {iterate();}}}}));sync = false;}iterate();};
  • 示例:阻塞式读取文件,遍历完成或报错时打印错误消息,终止遍历。
var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
var configs = {};async.forEachOf(obj, function (value, key, callback) {fs.readFile(__dirname + value, "utf8", function (err, data) {if (err) return callback(err);try {configs[key] = JSON.parse(data);callback(); // 启动下一次读取} catch (e) {return callback(e);}});
}, function (err) {if (err) console.error(err.message);// configs is now a map of JSON datadoSomethingWith(configs);
});

async.forEachSeries |  eachSeries(arr, iterator, callback)

  • 实现功能:实现功能同async.eachOfSeries,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值。
  • 源码解读:_without函数用于去除iterator函数携带的数组index值参数。
async.forEachSeries =
async.eachSeries = function (arr, iterator, callback) {return async.eachOfSeries(arr, _withoutIndex(iterator), callback);
};

async.forEachOfLimit  |  eachOfLimit(obj, iterator, callback)

  • 实现功能:当limit=1时,eachOfLimit方法同async.eachOfSeries相似,也是上一次延时执行完成后再执行下一次延时函数,callback回调的触发时机也是报错或遍历完成后。当limit=obj.lenth(数组的长度,或对象属性集合的长度)时,eachOfLimit方法同async.eachOf相似,延时函数iterator同步执行,callback回调的触发时机是报错或遍历完成,在延时过程中报错,没法阻止下一个iterator函数的执行。limit在1和obj.length之间,limit个数的iterator同步启动,当一个延时执行完成,加载下一个iterator函数。iterator同时执行个数限制为limit。
  • 源码解读:
async.forEachOfLimit =async.eachOfLimit = function (obj, limit, iterator, callback) {_eachOfLimit(limit)(obj, iterator, callback);};function _eachOfLimit(limit) {return function (obj, iterator, callback) {callback = _once(callback || noop);obj = obj || [];var nextKey = _keyIterator(obj);if (limit <= 0) {return callback(null);}var done = false;var running = 0;// 记录iterator执行个数,iterator启动前+1,执行完成后-1var errored = false;(function replenish () {if (done && running <= 0) {return callback(null);}while (running < limit && !errored) {var key = nextKey();if (key === null) {done = true;if (running <= 0) {callback(null);}return;}running += 1;iterator(obj[key], key, only_once(function (err) {running -= 1;if (err) {callback(err);errored = true;}else {replenish();}}));}})();};}

async.forEachLimit  |  eachLimit(arr, iterator, callback)

  • 实现功能:实现功能同async.eachOfLimit,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值。
  • 源码解读:_without函数用于去除iterator函数携带的数组index值参数。
async.forEachLimit =async.eachLimit = function (arr, limit, iterator, callback) {return _eachOfLimit(limit)(arr, _withoutIndex(iterator), callback);};

async.map  |  mapSeries  |  mapLimit(arr, iterator, callback)

  • 实现功能:async.map调用async.eachOf方法(延时函数同时执行),async.mapSeries调用async.eachOfSeries方法(延时函数逐个执行),async.mapLimit调用async.eachOfLimit方法(限制延时函数同时执行的个数)。三者均遍历obj执行iterator函数,主要目的是获取处理后的最终值results,并传给最终的回调函数callback进行处理。
  • 源码:
function doParallel(fn) {return function (obj, iterator, callback) {return fn(async.eachOf, obj, iterator, callback);};
}
function doParallelLimit(fn) {return function (obj, limit, iterator, callback) {return fn(_eachOfLimit(limit), obj, iterator, callback);};
}
function doSeries(fn) {return function (obj, iterator, callback) {return fn(async.eachOfSeries, obj, iterator, callback);};
}function _asyncMap(eachfn, arr, iterator, callback) {callback = _once(callback || noop);arr = arr || [];var results = _isArrayLike(arr) ? [] : {};eachfn(arr, function (value, index, callback) {iterator(value, function (err, v) {results[index] = v;callback(err);});}, function (err) {callback(err, results);});
}async.map = doParallel(_asyncMap);
async.mapSeries = doSeries(_asyncMap);
async.mapLimit = doParallelLimit(_asyncMap);
  • 源码:iterator加工获得results,并传给callback作后续处理
async.map(['file1','file2','file3'], fs.stat, function(err, results) {// results is now an array of stats for each file
});

async.inject  |  foldl  |  reduce(arr, memo, iterator, callback)

  • 实现功能:调用async.eachOfSeries方法(延时函数逐个执行),三者均遍历obj执行iterator函数,主要目的是处理并更新memo,并传给最终的回调函数callback进行处理。
  • 源码:
async.inject =async.foldl =async.reduce = function (arr, memo, iterator, callback) {async.eachOfSeries(arr, function (x, i, callback) {iterator(memo, x, function (err, v) {memo = v;// 更新memocallback(err);// 错误处理});}, function (err) {callback(err, memo);//memo不是通过传值或者闭包驻留,由上级作用域赋予});};
  • 官方示例
async.reduce([1,2,3], 0, function(memo, item, callback) {// pointless async:process.nextTick(function() {callback(null, memo + item) // 更新memo等});
}, function(err, result) {// result is now equal to the last value of memo, which is 6
});

async.foldr  |  reduceRight(arr, memo, iterator, callback)

  • 实现功能:同async.reduce相似,区别是自右向左遍历。
  • 源码解读:
async.foldr =async.reduceRight = function (arr, memo, iterator, callback) {var reversed = _map(arr, identity).reverse();async.reduce(reversed, memo, iterator, callback);};

async.transform(arr, memo, iterator, callback)

  • 实现功能:同async.reduce方法相似,transform方法在每次延时执行完成后处理memo,等到各延时均执行完成后,调用callback获取并处理memo。transform方法内部调用async.eachOf方法,因此各延时函数近乎同一时间执行,非阻塞式实现。
  • 源码解读:
async.transform = function (arr, memo, iterator, callback) {if (arguments.length === 3) {callback = iterator;iterator = memo;memo = _isArray(arr) ? [] : {};}async.eachOf(arr, function(v, k, cb) {iterator(memo, v, k, cb);}, function(err) {callback(err, memo);});};
  • 官方示例:
async.transform([1,2,3], function(acc, item, index, callback) {// pointless async:process.nextTick(function() {acc.push(item * 2)callback(null)});
}, function(err, result) {// result is now equal to [2, 4, 6]
});
async.transform({a: 1, b: 2, c: 3}, function (obj, val, key, callback) {setImmediate(function () {obj[key] = val * 2;callback();})
}, function (err, result) {// result is equal to {a: 2, b: 4, c: 6}
})

async.select | filter |  selectSeries | filterSeries  |  selectLimit | filterLimit(arr, iterator, callback)

  • 实现功能:async.filter调用async.eachOf方法(延时函数同时执行),async.filterSeries调用async.eachOfSeries方法(延时函数逐个执行),async.filterLimit调用async.eachOfLimit方法(限制延时函数同时执行的个数)。三者均遍历arr执行iterator函数,主要目的是iterator回调传参v过滤arr,arr的最终值是由原来的元素项构成的数组形式,arr最初为对象时,最终值由对象的键值构成数组,并将该最终值传给最终的回调函数callback进行处理。
  • 源码解读:
function _filter(eachfn, arr, iterator, callback) {var results = [];eachfn(arr, function (x, index, callback) {iterator(x, function (v) {if (v) {results.push({index: index, value: x});}callback();});}, function () {callback(_map(results.sort(function (a, b) { // 由回调函数重置数组的元素项为results的value键值return a.index - b.index;}), function (x) {return x.value;}));});}
    // 调用async.eachOf方法遍历obj执行iterator,通过iterator的回调传参v过滤,返回数组results// 经过处理后,构成按序排列原元素值的数组,最终传给callback回调async.select =async.filter = doParallel(_filter);async.selectLimit =async.filterLimit = doParallelLimit(_filter);async.selectSeries =async.filterSeries = doSeries(_filter);
  • 官方示例:
async.filter(['file1','file2','file3'], function(filePath, callback) {fs.access(filePath, function(err) {callback(null, !err)});
}, function(err, results) {// results now equals an array of the existing files 以数组形式返回存在的文件
});

async.reject  |  rejectSeries   |  rejectLimit(arr, iterator, callback)

  • 实现功能:与async.filter的不同之处是,async.filter方法凭借iterator的回调参数v为真值保留arr原始项,async.reject凭借v为否时保留arr原始项。
  • 源码解读:
function _reject(eachfn, arr, iterator, callback) {_filter(eachfn, arr, function(value, cb) {iterator(value, function(v) {cb(!v);});}, callback);}// 与async.filter的不同是,reject方法的过滤条件是,iterator的回调传参v为否值async.reject = doParallel(_reject);async.rejectLimit = doParallelLimit(_reject);async.rejectSeries = doSeries(_reject);
  • 官方示例:
async.reject(['file1','file2','file3'], function(filePath, callback) {fs.access(filePath, function(err) {callback(null, !err)});
}, function(err, results) {// results now equals an array of missing files 为不存在的文件进行后续生成文件操作createFiles(results);
});

async.any  |  some(arr, iterator, callback)

  • 实现功能:async.any|some方法调用ansyc.eachOf方法,当arr中有一项符合条件时,回调函数获得参数即为true。回调触发时机为遍历的每个阶段arr有一项符合条件时,或者在遍历执行完成时,当遍历执行完成时触发回调函数,回调函数不携带参数。
  • 源码解读:遍历的每个阶段有符合条件项时都能触发回调的原由是,以getResult(true, x)替换eachOf方法中的err,err为真,也即触发回调。不符合条件时,err为否,也即不触发回调,除非遍历过程执行完毕。中断遍历的实现是,arr中有一项符合条件,将cb改写为false,调用callback()回调,因为没有携带参数,eachOf方法相关代码也便不执行了。
// check、getResult均为传递函数,一个转化为布尔值,一个原样输出function _createTester(eachfn, check, getResult) {return function(arr, limit, iterator, cb) {function done() {if (cb) cb(getResult(false, void 0));}function iteratee(x, _, callback) {if (!cb) return callback();// 参数function(v){}由内部函数传给外部,作为iterator的回调,执行done回调iterator(x, function (v) {// v由外部函数传入,x是arr的元素项if (cb && check(v)) {// v为真、cb存在时执行cb回调,cb和iterator都赋值为false// 因为iteratee中首先判断cb是否为否值,后续遍历直接进入callback回调// 进入callback回调,callback在遍历执行完成时才有意义cb(getResult(true, x));cb = iterator = false;}callback();});}if (arguments.length > 3) {eachfn(arr, limit, iteratee, done);} else {cb = iterator;iterator = limit;eachfn(arr, iteratee, done);}};}// 调用async.eachOf方法,arr参数中有一项满足条件时,回调获得结果为真// 回调时机为遍历的每个阶段执行完毕async.any =async.some = _createTester(async.eachOf, toBool, identity);
  • 官方示例:原示例callback首参为null,将无机会触发回调。
async.some(['file1','file2','file3'], function(filePath, callback) {fs.access(filePath, function(err) {callback(!err)});
}, function(result) {// 有一项满足条件时,result为true// if result is true then at least one of the files exists
});

async.someLimit(arr, limit, iterator, callback)

  • 实现功能:async.someLimit方法同ansyc.some方法,只是对同时触发的iterator个数作了限制。
  • 源码解读:
async.someLimit = _createTester(async.eachOfLimit, toBool, identity);

async.all  |  every(arr, iterator, callback)

  • 实现功能:async.all|every方法调用ansyc.eachOf方法,当arr中每一项符合条件时,回调函数获得参数即为true。回调触发时机为在遍历执行完成时。当遍历每个阶段执行完成时,回调函数不携带参数,因而也就不触发回调。
  • 源码解读:有一项不满足条件中断回调的原由是,cb传入否值,以及callback参数为空,回调不得执行。最末一次时,cb赋值为空,done函数不得执行,回调也不能顺利执行。
// check、getResult均为传递函数,取反function _createTester(eachfn, check, getResult) {return function(arr, limit, iterator, cb) {function done() {// cb回调为真值,遍历完成后才触发执行if (cb) cb(getResult(false, void 0));}function iteratee(x, _, callback) {if (!cb) return callback();// 参数function(v){}由内部函数传给外部,作为iterator的回调,执行done回调iterator(x, function (v) {// v由外部函数传入,x是arr的元素项// v传入真值,check永远为否,cb回调不执行,callback只在遍历完成时执行// v为否值,cb回调参数为否值,回调不执行if (cb && check(v)) {cb(getResult(true, x));cb = iterator = false;}callback();});}if (arguments.length > 3) {eachfn(arr, limit, iteratee, done);} else {cb = iterator;iterator = limit;eachfn(arr, iteratee, done);}};}// 调用async.eachOf方法,arr参数中每一项满足条件时,回调获得结果为真// 回调时机为遍历执行完成时async.all =async.every = _createTester(async.eachOf, notId, notId);
  • 官方示例:原示例callback首参为null,将无机会触发回调。
async.some(['file1','file2','file3'], function(filePath, callback) {fs.access(filePath, function(err) {callback(!err)});
}, function(result) {// 每一项满足条件时,result为true
});

async.everyLimit(arr, limit, iterator, callback)

  • 实现功能:async.everyLimit方法同ansyc.every方法,只是对同时触发的iterator个数作了限制。
  • 源码解读:
async.everyLimit = _createTester(async.eachOfLimit, notId, notId);

async.detect | detectSeries (arr, iterator, callback)

async.detectLimit (arr, iterator, callback)

  • 实现功能:async.detect方法同ansyc.some方法相同,当arr中有一项满足条件,触发回调,不同于some方法的是,some方法传递给回调函数的参数为v判断条件,detect传参为arr中第一个符合条件的元素项。detectSeries方法上一个延时函数完成后调用下一个延时函数;detectLimit方法对同时执行的迭代器iterator作了限制。
  • 源码解读:
function _findGetResult(v, x) {return x;}// 与some方法相同,调用了_createTester函数,差别是some传参为v判断条件,detect传参为arr的元素项async.detect = _createTester(async.eachOf, identity, _findGetResult);async.detectSeries = _createTester(async.eachOfSeries, identity, _findGetResult);async.detectLimit = _createTester(async.eachOfLimit, identity, _findGetResult);
  • 官方示例:
async.detect(['file1','file2','file3'], function(filePath, callback) {fs.access(filePath, function(err) {callback(!err)});
}, function(result) {// result now equals the first file in the list that exists
});

async.sortBy (arr, iterator, callback)

  • 实现功能 :async.sortBy实现arr的排序,遍历执行完成后将重新排序的arr传给回调。
  • 源码解读:
// 根据用户设置的criteria排序async.sortBy = function (arr, iterator, callback) {async.map(arr, function (x, callback) {iterator(x, function (err, criteria) {if (err) {callback(err);}else {callback(null, {value: x, criteria: criteria});}});}, function (err, results) {if (err) {return callback(err);}else {callback(null, _map(results.sort(comparator), function (x) {return x.value;}));}});function comparator(left, right) {var a = left.criteria, b = right.criteria;return a < b ? -1 : a > b ? 1 : 0;}};
  • 官方示例:
async.sortBy(['file1','file2','file3'], function(file, callback) {fs.stat(file, function(err, stats) {callback(err, stats.mtime);});
}, function(err, results,result) {// results is now the original array of files sorted by// modified date 根据修改时间排序的文件
});
async.sortBy([1,9,3,5], function(x, callback) {callback(null, x*-1);    //<- x*-1 instead of x, turns the order around
}, function(err,result) {// result callback
});

async.concat  |  concatSeries(arr, iterator, callback)

  • 实现功能 :async.concat方法用于遍历arr执行iterator函数过程中没有报错的项
  • 源码解读:
// 拼接arr元素项传参给回调,前提是没有捕获到错误,捕获到错误则略过function _concat(eachfn, arr, fn, callback) {var result = [];eachfn(arr, function (x, index, cb) {fn(x, function (err, y) {result = result.concat(y || []);cb(err);});}, function (err) {callback(err, result);});}async.concat = doParallel(_concat);async.concatSeries = doSeries(_concat);
  • 官方示例
async.concat(['dir1','dir2','dir3'], fs.readdir, function(err, files) {// files is now a list of filenames that exist in the 3 directories// 对存在的目录名进行拼接
});

Controls Flow 流程管控

async.parallel  |  parallelLimit  |  series(tasks, callback)

  • 实现功能:async.parallel调用async.eachOf方法,遍历执行tasks中的任务函数,根据tasks的数据类型(数组或对象形式)构建最终值results,传给最终回调callback函数。async.parallelLimit调用async.eachOfLimit方法,async.series调用async.eachOfSeries方法,其他原理相同。tasks或者以键值对形式存储任务函数,或者以数组形式存储任务函数。results通过作用域实现在task任务函数和最终回调callback中传递。
  • 源码解读:
function _parallel(eachfn, tasks, callback) {callback = callback || noop;var results = _isArrayLike(tasks) ? [] : {};eachfn(tasks, function (task, key, callback) {task(_restParam(function (err, args) {if (args.length <= 1) {args = args[0];}results[key] = args;// 根据tasks数据类型构建results,传给最终回调,results根据作用域传递callback(err);}));}, function (err) {callback(err, results);});}// 遍历执行tasks中任务函数,数组或对象形式构建results,传给回调函数callbackasync.parallel = function (tasks, callback) {_parallel(async.eachOf, tasks, callback);};async.parallelLimit = function(tasks, limit, callback) {_parallel(_eachOfLimit(limit), tasks, callback);};async.series = function(tasks, callback) {_parallel(async.eachOfSeries, tasks, callback);};
  • 官方示例:示例一为数组形式,数组二为对象形式。
async.parallel([function(callback) {setTimeout(function() {callback(null, 'one');}, 200);},function(callback) {setTimeout(function() {callback(null, 'two');}, 100);}
],
// optional callback
function(err, results) {// the results array will equal ['one','two'] even though// the second function had a shorter timeout.
});
// an example using an object instead of an array
async.parallel({one: function(callback) {setTimeout(function() {callback(null, 1);}, 200);},two: function(callback) {setTimeout(function() {callback(null, 2);}, 100);}
}, function(err, results) {// results is now equals to: {one: 1, two: 2}
});

async.applyEach  |  applyEachSeries(fns, args, callback)

  • 实现功能:async.applyEach方法调用async.eachOf遍历执行fns数组中的函数元素项,fns函数参数均相同,且为args、callback。async.applyEachSeries方法调用async.eachOfSeries,为阻塞式调用。
  • 源码解读:
 function _applyEach(eachfn) {return _restParam(function(fns, args) {var go = _restParam(function(args) {var that = this;var callback = args.pop();// 尾项为回调函数return eachfn(fns, function (fn, _, cb) {fn.apply(that, args.concat([cb]));},callback);});if (args.length) {return go.apply(this, args);}else {return go;}});}// 调用async.eachOf遍历执行fns函数,fns函数参数均相同,且为args、callbackasync.applyEach = _applyEach(async.eachOf);async.applyEachSeries = _applyEach(async.eachOfSeries);
  • 官方示例:
async.applyEach([enableSearch, updateSchema], 'bucket', callback);// partial application example:
async.each(buckets,async.applyEach([enableSearch, updateSchema]),callback
);

async.auto(tasks, concurrency, callback)

  • 实现功能:由tasks中处理函数获得结果results,该结果值传给最终处理函数callback作为参数,再作后续处理。特殊情况下,当tasks中任务函数需要在另一函数执行完毕后调用,async.auto会自动根据依赖关系,率先执行无依赖的任务函数,接着执行依赖函数已经执行的任务函数。当函数执行过程中报错时,最终回调callback捕获到错误err,而results由先前及本次执行函数的结果值构成。concurrency限制同时执行的任务个数。async.auto用来处理异步延时函数时,需要设置task[task.length - 1]尾项任务函数中callback回调的触发时机,进而调用依赖函数执行。
  • 源码解读:对依赖函数的处理,若tasks中任务函数无依赖时,执行task[task.length - 1](taskCallback, results),即tasks任务对象中每个键值下的尾项数组元素function(callback, results),该函数中,调用并执行taskCallback(err,result)函数,其一是更新results[k]=result输出给最终回调的参数值,其二是调用taskComplete遍历依赖函数队列listeners,加载其中依赖函数已经执行的任务函数。若tasks中任务函数有依赖时,且依赖函数尚未执行,将该任务函数存放到依赖函数队列listeners中。
// 自动以无依赖的函数到依赖执行完成的函数顺序执行函数// tasks以函数映射名为键,函数队列为值,尾参是处理results的回调函数async.auto = function (tasks, concurrency, callback) {if (typeof arguments[1] === 'function') {// concurrency is optional, shift the args.callback = concurrency;concurrency = null;}callback = _once(callback || noop);var keys = _keys(tasks);var remainingTasks = keys.length;// tasks对象共有多少键,即共多少任务// 没有任务,直接调用回调函数if (!remainingTasks) {return callback(null);}if (!concurrency) {concurrency = remainingTasks;// concurrency同时执行的任务数,默认为tasks对象有多少键}var results = {};var runningTasks = 0;// 跑的任务数,与concurrency比较以对同时跑的任务数作限制var hasError = false;// 需要依赖函数完成加载后才执行的函数,通过addListener方法添加到listeners队列// 每当一个函数加载完成,调用taskComplete执行依赖加载完成的函数项,调用removeListener更新listeners队列var listeners = [];function addListener(fn) {listeners.unshift(fn);}function removeListener(fn) {var idx = _indexOf(listeners, fn);if (idx >= 0) listeners.splice(idx, 1);}// remainingTasks-1,遍历listeners执行依赖加载完成的函数项function taskComplete() {remainingTasks--;_arrayEach(listeners.slice(0), function (fn) {fn();});}// tasks中任务函数已执行完毕的情况下,remainingTasks=0,执行最终回调函数callbackaddListener(function () {if (!remainingTasks) {callback(null, results);}});_arrayEach(keys, function (k) {if (hasError) return;var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];// 因为startIndex为0,_restParam作用是将taskCallback赋值为function(err,args),处理resultsvar taskCallback = _restParam(function(err, args) {runningTasks--;if (args.length <= 1) {args = args[0];}if (err) {var safeResults = {};_forEachOf(results, function(val, rkey) {safeResults[rkey] = val;});safeResults[k] = args;// 关于k值的驻留???hasError = true;// 报错直接调用最终回调函数callback(err, safeResults);// 没有错误,执行taskComplete,遍历listeners执行依赖加载完成的函数项}else {results[k] = args;// results由tasks的末尾项task[task.length - 1]参数带入async.setImmediate(taskComplete);}});// 检查依赖的函数是否存在,依赖可以是数组形式,但不能依赖自身var requires = task.slice(0, task.length - 1);// 排除最后一项// prevent dead-locksvar len = requires.length;var dep;while (len--) {if (!(dep = tasks[requires[len]])) {throw new Error('Has nonexistent dependency in ' + requires.join(', '));}if (_isArray(dep) && _indexOf(dep, k) >= 0) {throw new Error('Has cyclic dependencies');}}// 依赖函数x在results中,执行函数k不在results中,且同时跑的函数个数runningTasks小于限制function ready() {// _reduce遍历数组项对尾参memo作处理,迭代器函数iterator中首参为memo,次参为元素项return runningTasks < concurrency && _reduce(requires, function (a, x) {return (a && results.hasOwnProperty(x));}, true) && !results.hasOwnProperty(k);}// task[task.length - 1]末尾项为函数,之前为依赖// taskCallback,通过调用taskComplete,遍历listeners执行依赖加载完成的函数项if (ready()) {runningTasks++;task[task.length - 1](taskCallback, results);}else {addListener(listener);}// 添加listeners函数队列的依赖函数,执行时runningTasks+1,删除listeners队列依赖function listener() {if (ready()) {runningTasks++;removeListener(listener);task[task.length - 1](taskCallback, results);}}});};
  • 官方示例:tasks键值对任务函数末尾项参数为callback、results,官方示例写反
async.auto({// this function will just be passed a callbackreadData: async.apply(fs.readFile, 'data.txt', 'utf-8'),showData: ['readData', function(cb,results) {// results.readData is the file's contents// ...}]
}, callback);
async.auto({get_data: function(callback) {console.log('in get_data');// async code to get some datacallback(null, 'data', 'converted to array');},make_folder: function(callback) {console.log('in make_folder');// async code to create a directory to store a file in// this is run at the same time as getting the datacallback(null, 'folder');},write_file: ['get_data', 'make_folder', function(callback, results)) {console.log('in write_file', JSON.stringify(results));// once there is some data and the directory exists,// write the data to a file in the directorycallback(null, 'filename');}],email_link: ['write_file', function(callback, results) {console.log('in email_link', JSON.stringify(results));// once the file is written let's email a link to it...// results.write_file contains the filename returned by write_file.callback(null, {'file':results.write_file, 'email':'user@example.com'});}]
}, function(err, results) {console.log('err = ', err);console.log('results = ', results);
});

async.retry(times, task, callback)

  • 实现功能:task任务报错时以times设置的重置执行次数,反复调用task任务并执行,直到task任务成功时或执行到最末一个task任务时,调用最终回调函数callback。async.retry方法通过调用async.series方法实现,当times中设置了重复执行task任务的等待时间interval时,async.series方法的任务队列中添加setTimeout(fn,interval)延迟函数,fn中调用下一个任务队列函数seriesCallback(null)。成功时终止执行任务队列函数、报错时继续执行下一个任务队列函数的实现,利用async.series方法任务队列函数中内部回调函数seriesCallback传参为否时,调用下一个任务队列函数,为真时,调用最终回调。因此,当task执行成功时,seriesCallback传参为真,终止执行任务队列函数;当task任务函数执行失败时,seriesCallback传参为否,继续执行下一个任务队列函数。
  • 源码解读:
// 报错时重复尝试task任务函数,直到最末一个task任务或某个task任务执行成功时,跳向最终回调async.retry = function(times, task, callback) {var DEFAULT_TIMES = 5;var DEFAULT_INTERVAL = 0;var attempts = [];var opts = {times: DEFAULT_TIMES,// task执行次数interval: DEFAULT_INTERVAL// task执行完成间隙时间};function parseTimes(acc, t){if(typeof t === 'number'){acc.times = parseInt(t, 10) || DEFAULT_TIMES;} else if(typeof t === 'object'){acc.times = parseInt(t.times, 10) || DEFAULT_TIMES;acc.interval = parseInt(t.interval, 10) || DEFAULT_INTERVAL;} else {throw new Error('Unsupported argument type for \'times\': ' + typeof t);}}var length = arguments.length;if (length < 1 || length > 3) {throw new Error('Invalid arguments - must be either (task), (task, callback), (times, task) or (times, task, callback)');} else if (length <= 2 && typeof times === 'function') {callback = task;task = times;}if (typeof times !== 'function') {parseTimes(opts, times);}opts.callback = callback;opts.task = task;function wrappedTask(wrappedCallback, wrappedResults) {function retryAttempt(task, finalAttempt) {// seriesCallback为task任务执行完成后回调函数,通过async.series方法设置// seriesCallback首参等同async.series方法中执行函数的回调函数参数err,正常执行需为否// 为真跳到最终回调// seriesCallback在async.series方法的意义是调用下一个task或最终回调return function(seriesCallback) {// task首参为函数,该函数中调用async.series方法seriesCallback,也就是设置回调时机task(function(err, result){// err为否,执行成功的前提下,跳到最终回调,结束task任务的反复执行// err为真,忽略该错误,继续执行task任务// 到最末一个task任务时,执行完毕即调用最终回调seriesCallback(!err || finalAttempt, {err: err, result: result});}, wrappedResults);};}// 两个task任务之间添加等待时间function retryInterval(interval){return function(seriesCallback){setTimeout(function(){seriesCallback(null);}, interval);};}while (opts.times) {var finalAttempt = !(opts.times-=1);attempts.push(retryAttempt(opts.task, finalAttempt));// 两个task任务之间添加等待时间if(!finalAttempt && opts.interval > 0){attempts.push(retryInterval(opts.interval));}}// function(done,data)作为async.series方法的最终回调,done为是否报错,data为传入数据async.series(attempts, function(done, data){// data为tasks处理获得的最终值,获取尾项,其余项为中间处理值data = data[data.length - 1];(wrappedCallback || opts.callback)(data.err, data.result);});}// If a callback is passed, run this as a controll flow// 没有callback参数项时,输出为函数,需要传递wrappedCallback与wrappedResultsreturn opts.callback ? wrappedTask() : wrappedTask;};
  • 示例:
function(callback) {async.retry({times:5, interval:1000}, function(cb) {do_task();var some_err = '';var some_result = '';cb(some_err, some_result);// 启用下一个任务函数或最终回调}, function(err, result) {callback(err, result);});},

async.iterator( tasks )

  • 实现功能:async模块内部_keyIterator函数遍历数组或对象执行时机是可控。当遍历对象是数组时,执行一次,去除数组元素的首项,执行第二次,取出数组元素的第二项。async.itertor方法同该数组、对象遍历函数相似,以可控的方式遍历执行tasks函数队列。调用async.itertor方法返回tasks队列首函数的执行函数,又通过执行函数返回次函数的执行函数,构成链式调用。特别的,可以当前执行函数的next方法获取下一个任务函数的执行函数,在async.waterfall方法中有使用。内部_keyIterator函数通过闭包实现,async.itertor方法通过返回函数实现。
  • 源码解读:
// 以可控的方式遍历执行tasks函数队列// makeCallback构建执行函数,并作为返回值// 执行函数调用过程中,执行tasks的任务函数,并调用makeCallback函数构建下一个任务函数的执行函数作为返回值// 执行函数的next方法获取下一个任务函数的执行函数async.iterator = function (tasks) {function makeCallback(index) {function fn() {// 执行函数if (tasks.length) {tasks[index].apply(null, arguments);}return fn.next();}fn.next = function () {return (index < tasks.length - 1) ? makeCallback(index + 1): null;};return fn;}return makeCallback(0);};
  • 示例:
var a=async.intertor([function(){console.log(1)
},function(){console.log(2)
}]);
var b=a();// 打印1
b(); // 打印2
a.next()() // 打印2

async.waterfall(tasks, callback)

  • 实现功能:async.intertor(tasks)方法为顺序执行tasks任务函数,上一个任务函数和下一个任务函数的关联只是影响执行时机,对下一个任务函数的参数无影响。async.auto(tasks, concurrency, callback)方法可以设置下一组任务函数的参数,但是最终回调函数callback的参数由多个任务函数处理值拼接后形成。async.parallel | series方法,最终回调函数获得参数同auto方法,由历次任务函数结果值拼接成数组或对象构成。比较下,async.waterfall(tasks, callback)方法也可以设置下一个任务函数的参数,并且最终回调callback的参数也由最后一个任务函数传参形成,似水珠似滚落,修改后传给下一个回调;async.auto像滚雪球,越滚越大。
  • 源码解读:
// 顺序执行tasks任务函数,并为任务函数传递参数,报错或任务函数执行完毕调用callbackasync.waterfall = function (tasks, callback) {callback = _once(callback || noop);if (!_isArray(tasks)) {var err = new Error('First argument to waterfall must be an array of functions');return callback(err);}if (!tasks.length) {return callback();}function wrapIterator(iterator) {return _restParam(function (err, args) {if (err) {callback.apply(null, [err].concat(args));}else {var next = iterator.next();// async.iterator方法获取下一个执行函数if (next) {args.push(wrapIterator(next));}else {args.push(callback);}// 调用ensureAsync避免堆栈溢出// 执行iterator函数,函数尾参为下一个iterator函数或最终回调callback// 同时为下一个iterator函数或最终回调callback传递参数ensureAsync(iterator).apply(null, args);}});}wrapIterator(async.iterator(tasks))();};
  • 官方示例:
async.waterfall([function(callback) {callback(null, 'one', 'two');},function(arg1, arg2, callback) {// arg1 now equals 'one' and arg2 now equals 'two'callback(null, 'three');},function(arg1, callback) {// arg1 now equals 'three'callback(null, 'done');}
], function (err, result) {// result now equals 'done'
});

async.whilst ( test, intertor, callback)

async.doWhilst ( intertor, test, callback)

async.until ( test, intertor, callback)

async.doUntil ( intertor, test, callback)

  • 实现功能:根据test校验结果执行intertor,test返回为真,执行intertor,test为否,执行最终回调callback。async.whilst方法和async.doWhilst方法的差别是前者可能执行也可能不执行intertor,完成看test校验结果,后者intertor必然执行一遍,随后看test校验结果。test获得参数和intertor获得参数相同,由使用者提供。until方法与whilst方法不同之处是,until方法作取反校验。
  • 源码解读:
// test合格后,执行iterator,否则跳到最终回调callback,目的是处理变量async.whilst = function (test, iterator, callback) {callback = callback || noop;if (test()) {var next = _restParam(function(err, args) {// args由外部函数传入if (err) {callback(err);} else if (test.apply(this, args)) {iterator(next);// 调用下一个iterator} else {callback.apply(null, [null].concat(args));// 执行完成后最终回调,参数为null、args}});iterator(next);// 第一个iterator} else {callback(null);}};// iterator必然执行一次,其他看test检验是否合格async.doWhilst = function (iterator, test, callback) {var calls = 0;return async.whilst(function() {return ++calls <= 1 || test.apply(this, arguments);}, iterator, callback);};// 取反校验async.until = function (test, iterator, callback) {return async.whilst(function() {return !test.apply(this, arguments);}, iterator, callback);};async.doUntil = function (iterator, test, callback) {return async.doWhilst(iterator, function() {return !test.apply(this, arguments);}, callback);};
  • 官方示例:
var count = 0;
async.whilst(function() { return count < 5; },function(callback) {count++;setTimeout(function() {callback(null, count);}, 1000);},function (err, n) {// 5 seconds have passed, n = 5}
);

async.during ( test, intertor, callback)

async.doDuring ( intertor, test, callback)

  • 实现功能:相比whilst方法,during方法支持异步校验,test通过传递check函数作为参数的方式,校验check函数的参数是否为真,check函数的执行时机由使用者把握,whilst方法中test函数执行时机固定。
  • 源码解读:
// 同whilst,不同的是whilst方法校验立即执行,during方法校验需要等待使用者向test回调函数中注入参数// 可以实现异步校验,test回调参数通过延时函数获得async.during = function (test, iterator, callback) {callback = callback || noop;var next = _restParam(function(err, args) {if (err) {callback(err);} else {args.push(check);// args可以在同步处理或异步处理后使用test.apply(this, args);// args尾参中传入check,使用者调用执行}});var check = function(err, truth) {if (err) {callback(err);} else if (truth) {iterator(next);} else {callback(null);}};test(check);};async.doDuring = function (iterator, test, callback) {var calls = 0;async.during(function(next) {if (calls++ < 1) {next(null, true);} else {test.apply(this, arguments);}}, iterator, callback);};
  • 官方示例:
var count = 0;async.during(function (callback) {// 可设置异步函数,在异步函数中执行callback校验回调,实现异步校验return callback(null, count < 5);},function (callback) {count++;setTimeout(callback, 1000);},function (err) {// 5 seconds have passed}
);

async.queue(worker, concurrency)

async.cargo(worker, payload)

  • 实现功能:async.queue添加任务函数并执行,提供暂停和重启功能。通过构建queue对象实现,任务函数在woker函数执行过程中启动。queue方法中参数concurrency用以限制同步执行的任务函数个数,cargo方法中参数concurrency设为1,即一次执行一个任务单元;cargo方法中参数payload设置预加载的任务单元中包含几个任务函数,这几条任务函数和下一个任务单元为同步执行关系,queue方法中payload设置为1,即一个任务单元只有一条任务函数。
  • 源码解读:实现过程中queue.tasks缓存待执行的任务函数,push方法调用_insert注册任务函数时添加,process执行任务单元时减少,kill方法清空,idle方法判断任务函数是否执行完毕;workers正在执行任务单元个数,process方法自增1,process方法中设置的回调函数_next函数自减1,表示该任务单元执行完成;workersList正在执行的任务单元,process方法添加,_next函数清除。unshift、push方法添加任务函数更新queue.tasks任务函数队列,并调用process方法执行任务函数;process方法设置任务单元,任务单元中包含的函数,调用worker方法执行任务函数的回调,启动下一个任务函数的执行;pause方法终止执行;resume方法恢复执行。saturated、empty、drain方法通过改写实现其功能,saturated方法当任务单元执行数量达到限制时执行,empty方法当process方法校验任务函数队列queue.tasks为空时执行,drain方法当所有任务执行完成后调用。
// 加载或执行任务函数(回调函数),任务函数的目的是启动任务函数中的回调函数function _queue(worker, concurrency, payload) {if (concurrency == null) {concurrency = 1;}else if(concurrency === 0) {throw new Error('Concurrency must not be zero');}// q.tasks待执行的任务函数队列添加任务function _insert(q, data, pos, callback) {// pos头部插入任务队列q.tasksif (callback != null && typeof callback !== "function") {throw new Error("task callback must be a function");}q.started = true;if (!_isArray(data)) {data = [data];}if(data.length === 0 && q.idle()) {// call drain immediately if there are no tasksreturn async.setImmediate(function() {q.drain();});}_arrayEach(data, function(task) {var item = {data: task,callback: callback || noop};if (pos) {q.tasks.unshift(item);} else {q.tasks.push(item);}if (q.tasks.length === q.concurrency) {q.saturated();}});async.setImmediate(q.process);}// 一个任务执行完成后回调,执行任务数workers-1,更新执行任务队列workersList// 执行任务固有的回调函数task.callback,调用执行下一个任务function _next(q, tasks) {return function(){workers -= 1;var removed = false;var args = arguments;// 更新workersList执行任务队列_arrayEach(tasks, function (task) {_arrayEach(workersList, function (worker, index) {if (worker === task && !removed) {workersList.splice(index, 1);removed = true;}});// 执行任务固有的回调函数task.callback.apply(task, args);});if (q.tasks.length + workers === 0) {q.drain();}q.process();};}var workers = 0;// 执行中的任务数量var workersList = [];// 执行中的任务var q = {tasks: [],// 任务函数队列concurrency: concurrency,// 同步执行的worker个数payload: payload,saturated: noop,// 空函数,改写后实现功能,同步执行达到限制时执行empty: noop,// 改写实现功能,任务为空时执行drain: noop,// 改写实现功能,所有任务都执行完成后调用started: false,paused: false,push: function (data, callback) {// 尾部插入_insert(q, data, false, callback);},kill: function () {// 清空q.tasks任务函数队列q.drain = noop;q.tasks = [];},unshift: function (data, callback) {// 头部插入_insert(q, data, true, callback);},process: function () {while(!q.paused && workers < q.concurrency && q.tasks.length){// 从q.tasks中取出任务,并更新q.tasks需要执行的任务函数队列// tasks一个任务单元,任务单元中的任务同步执行var tasks = q.payload ?q.tasks.splice(0, q.payload) :q.tasks.splice(0, q.tasks.length);// 预加载的任务函数数量var data = _map(tasks, function (task) {return task.data;});if (q.tasks.length === 0) {q.empty();}workers += 1;workersList.push(tasks[0]);var cb = only_once(_next(q, tasks));worker(data, cb);// _queue函数参数worker执行过程中调用cb函数执行下一个任务函数}},length: function () {// 需要执行的任务return q.tasks.length;},running: function () {// 同步执行的任务数return workers;},workersList: function () {// 同步执行的任务return workersList;},idle: function() {// 所有任务都已执行完成return q.tasks.length + workers === 0;},pause: function () {// 中断process函数执行q.paused = true;},resume: function () {// 恢复执行if (q.paused === false) { return; }q.paused = false;var resumeCount = Math.min(q.concurrency, q.tasks.length);// Need to call q.process once per concurrent// worker to preserve full concurrency after pausefor (var w = 1; w <= resumeCount; w++) {async.setImmediate(q.process);}}};return q;}// 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数// concurrency限制同步执行的任务个数async.queue = function (worker, concurrency) {// push进queue对象的任务函数通过调用worker执行其功能var q = _queue(function (items, cb) {worker(items[0], cb);}, concurrency, 1);return q;};// 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数// payload单次执行的任务单元中包含几条任务函数,这几条任务函数和下一个任务单元同步执行async.cargo = function (worker, payload) {return _queue(worker, 1, payload);};
  • 官方示例:
// create a queue object with concurrency 2
var q = async.queue(function(task, callback) {console.log('hello ' + task.name);callback();
}, 2);// assign a callback
q.drain = function() {console.log('all items have been processed');
};// add some items to the queue 添加任务函数并执行,任务函数在queue方法首参callback中执行
q.push({name: 'foo'}, function(err) {// 先打印task.name 即foo,后打印finished processing fooconsole.log('finished processing foo');
});
q.push({name: 'bar'}, function (err) {console.log('finished processing bar');
});// add some items to the queue (batch-wise)
q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {console.log('finished processing item');
});// add some items to the front of the queue
q.unshift({name: 'bar'}, function (err) {console.log('finished processing bar');
});
// create a cargo object with payload 2
var cargo = async.cargo(function(tasks, callback) {for (var i=0; i<tasks.length; i++) {console.log('hello ' + tasks[i].name);}callback();
}, 2);// add some items
cargo.push({name: 'foo'}, function(err) {console.log('finished processing foo');
});
cargo.push({name: 'bar'}, function(err) {console.log('finished processing bar');
});
cargo.push({name: 'baz'}, function(err) {console.log('finished processing baz');
});

async.priorityQueue(worker, concurrency)

  • 实现功能:priorityQueue方法同queue方法基本相同,同时可以设置任务函数执行的优先级。priorityQueue方法内部调用queue方法,同时删除queue.unshift方法,改写queue.push方法。
  • 源码解读:通过中值法以及移位运算符将优先级较低的任务函数插到queue.tasks合适的位置值得留意
 // 内部调用async.queue方法返回queue对象,只是删除了unshift方法、改写了push方法// 设置任务函数执行的优先级async.priorityQueue = function (worker, concurrency) {// 比较优先级,越低越高优先级function _compareTasks(a, b){return a.priority - b.priority;}// 通过不断和拆分数组的中值作比较,得出item应该插到sequence数组中哪个位置function _binarySearch(sequence, item, compare) {var beg = -1,end = sequence.length - 1;while (beg < end) {var mid = beg + ((end - beg + 1) >>> 1);// 无符号右移,即除2,忽略余数if (compare(item, sequence[mid]) >= 0) {beg = mid;} else {end = mid - 1;}}return beg;}function _insert(q, data, priority, callback) {if (callback != null && typeof callback !== "function") {throw new Error("task callback must be a function");}q.started = true;if (!_isArray(data)) {data = [data];}if(data.length === 0) {// call drain immediately if there are no tasksreturn async.setImmediate(function() {q.drain();});}_arrayEach(data, function(task) {var item = {data: task,priority: priority,callback: typeof callback === 'function' ? callback : noop};q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);if (q.tasks.length === q.concurrency) {q.saturated();}async.setImmediate(q.process);});}// Start with a normal queuevar q = async.queue(worker, concurrency);// Override push to accept second parameter representing priorityq.push = function (data, priority, callback) {_insert(q, data, priority, callback);};// Remove unshift functiondelete q.unshift;return q;};
  • 示例:
var q = async.priorityQueue(function(task, callback) {console.log('hello ' + task.name);callback();
}, 2);q.drain = function() {console.log('all items have been processed');
};q.push({name: 'foo'}, 1,function(err) {console.log('finished processing foo');
});
q.push({name: 'bar'},2, function (err) {console.log('finished processing bar');
});q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}],3, function(err) {console.log('finished processing item');
});

async.times | timesSeries | timesLimit( times, intertor, callback)

  • 实现功能:times方法调用map方法,timesSeries调用mapSeries方法,timesLimit调用mapLimit方法,设置iterator执行次数,传参为index值,即index遍历。
  • 源码解读:
function _times(mapper) {return function (count, iterator, callback) {// _range(count)输出为0到count构成数组mapper(_range(count), iterator, callback);};}// 设置iterator的调用次数countasync.times = _times(async.map);async.timesSeries = _times(async.mapSeries);async.timesLimit = function (count, limit, iterator, callback) {return async.mapLimit(_range(count), limit, iterator, callback);};
  • 官方示例:
// Pretend this is some complicated async factory
var createUser = function(id, callback) {callback(null, {id: 'user' + id});
};// generate 5 users
async.times(5, function(n, next) {createUser(n, function(err, user) {next(err, user);});
}, function(err, users) {// we should now have 5 users
});

async.seq | compose(fns)

  • 实现功能:返回函数,函数参数传递memo,seq方法本质调用async.reduce方法实现,遍历fns函数并执行,各回调处理memo后,将memo传递给最终回调callback函数。compose反向遍历并执行fns函数。
  • 源码解读:
// async.reduce的简化方案,遍历执行函数,对传递数据作处理后传给各回调async.seq = function (/* functions... */) {var fns = arguments;return _restParam(function (args) {var that = this;var callback = args[args.length - 1];if (typeof callback == 'function') {args.pop();} else {callback = noop;}async.reduce(fns, args, function (newargs, fn, cb) {fn.apply(that, newargs.concat([_restParam(function (err, nextargs) {cb(err, nextargs);})]));},function (err, results) {callback.apply(that, [err].concat(results));});});};async.compose = function (/* functions... */) {return async.seq.apply(null, Array.prototype.reverse.call(arguments));};

官方示例:

// Requires lodash (or underscore), express3 and dresende's orm2.
// Part of an app, that fetches cats of the logged user.
// This example uses `seq` function to avoid overnesting and error
// handling clutter.
app.get('/cats', function(request, response) {var User = request.models.User;async.seq(_.bind(User.get, User),  // 'User.get' has signature (id, callback(err, data))function(user, fn) {user.getCats(fn);      // 'getCats' has signature (callback(err, data))})(req.session.user_id, function (err, cats) {if (err) {console.error(err);response.json({ status: 'error', message: err.message });} else {response.json({ status: 'ok', message: 'Cats found', data: cats });}});
});

async.forever( fn, callback)

  • 实现功能:fn执行过程中,next回调执行调用fn自身,当报错时调用callback回调。
  • 源码解读:
// fn执行完毕,使用next函数,或者调用自身,或者报错时执行回调async.forever = function (fn, callback) {var done = only_once(callback || noop);var task = ensureAsync(fn);function next(err) {if (err) {return done(err);}task(next);}next();};
  • 官方示例:
async.forever(function(next) {// next is suitable for passing to things that need a callback(err [, whatever]);// it will result in this function being called again.},function(err) {// if next is called with a value in its first parameter, it will appear// in here as 'err', and execution will stop.}
);

Utils 工具方法

async.ensureAsync(fn)

  • 实现功能:根据fn(args,callback)是否延时函数,包装callback函数后传给fn。实现过程,ensureAsync方法接收fn的参数,改写后,再传递给fn。改写callback的目的是,当fn为同步函数时,使用async.setImmediate方法立即执行(该方法执行完成后也许立即回收堆栈),可以避免内存的浪费。
  • 源码解读:
function ensureAsync(fn) {// 输入参数最后一项是回调函数,sync为真同步执行状态,为否异步执行状态// 改写回调项,async.setImmediate跟普通调用callback的区别是没有堆栈溢出?return _restParam(function (args) {var callback = args.pop();args.push(function () {var innerArgs = arguments;if (sync) {async.setImmediate(function () {callback.apply(null, innerArgs);});} else {callback.apply(null, innerArgs);}});var sync = true;fn.apply(this, args);sync = false;});}// 改写参数函数fn中参数的回调函数callback设置,callback启动时机依然需使用者触发// 改写callback的目的避免同步函数的内存浪费async.ensureAsync = ensureAsync;
  • 官方示例:
function sometimesAsync(arg, callback) {if (cache[arg]) {return callback(null, cache[arg]); // this would be synchronous!!} else {doSomeIO(arg, callback); // this IO would be asynchronous}
}// this has a risk of stack overflows if many results are cached in a row
// 堆栈溢出
async.mapSeries(args, sometimesAsync, done);// this will defer sometimesAsync's callback if necessary,
// preventing stack overflows
// 避免堆栈溢出
async.mapSeries(args, async.ensureAsync(sometimesAsync), done);

async.ensureAsync(fn)

  • 实现功能:重设fn的参数。通过改造函数ensureAsync方法向fn中传递参数,返回fn的包装执行函数,该包装执行函数的参数也传给fn作为参数。
  • 源码解读:
// 返回fn的执行函数,目的是重设fn的参数async.apply = _restParam(function (fn, args) {return _restParam(function (callArgs) {return fn.apply(null, args.concat(callArgs));});});
  • 官方示例:
node> var fn = async.apply(sys.puts, 'one');
node> fn('two', 'three');
one
two
three// sys.puts获得参数"one","two","three"

async.log | dir(fn,args)

  • 实现功能:在异步函数执行完成后打印结果,dir方法调用console.dir以dom视图打印。实现思路是,调用_resetParam重设fn函数的参数,最后一项为回调函数callback,其余项为log、dir方法的args,callback在内部封装后输出给外部,外部设置该回调的执行时机以及参数。
  • 源码解读:
function _console_fn(name) {return _restParam(function (fn, args) {// 通过内部函数向外部函数传递回调函数function(err,args)作为参数// 外部函数中设置回调函数的执行时机// 实际意义似乎和在异步函数中直接调用console.log()没差别fn.apply(null, args.concat([_restParam(function (err, args) {if (typeof console === 'object') {if (err) {if (console.error) {console.error(err);}}else if (console[name]) {_arrayEach(args, function (x) {console[name](x);});}}})]));});}async.log = _console_fn('log');async.dir = _console_fn('dir');// 显示dom试图/*async.info = _console_fn('info');async.warn = _console_fn('warn');async.error = _console_fn('error');*/
  • 官方示例:
// in a module
var hello = function(name, callback) {setTimeout(function() {callback(null, 'hello ' + name);}, 1000);
};// in the node repl
node> async.log(hello, 'world');
'hello world'
// in a module
var hello = function(name, callback) {setTimeout(function() {callback(null, {hello: name});}, 1000);
};// in the node repl
node> async.dir(hello, 'world');
{hello: 'world'}

async.memoize(fn,hasher)

async.unmemoize(fn)

  • 实现功能:若fn已执行,且传参相同,重新调用fn的回调函数,若fn执行过程中,且fn传参相同,添加fn的回调函数。实现思路是使用queues记录fn参数下所用回调函数,执行完成时在回调函数中清空queues,当fn执行过程中,且传参相同,queues相应键值下就有值,可以做比较;在fn执行完成的回调中,使用memo记录fn参数下回调函数使用的参数,再次用同样参数执行fn时,memo相应键值下为真;初次调用作普通函数处理。memoize方法返回memoized函数。unmemoize方法将memoized函数作为普通函数处理。
  • 源码解读:
// 传参相同,fn已执行的状态下,重复调用回调函数,或fn执行中,添加回调函数async.memoize = function (fn, hasher) {var memo = {};var queues = {};var has = Object.prototype.hasOwnProperty;hasher = hasher || identity;var memoized = _restParam(function memoized(args) {var callback = args.pop();var key = hasher.apply(null, args);if (has.call(memo, key)) {   // 已执行,立即执行fn的回调async.setImmediate(function () {callback.apply(null, memo[key]);});}else if (has.call(queues, key)) {// fn执行过程中,参数args相同,添加回调callbackqueues[key].push(callback);}else {queues[key] = [callback];// 执行前fn回调添加到queues队列fn.apply(null, args.concat([_restParam(function (args) {memo[key] = args;// fn执行完成更新memo记录已执行,对象的键可以是数组、对象等数据类型var q = queues[key];delete queues[key];// 执行完清空该fn回调队列queues[key]for (var i = 0, l = q.length; i < l; i++) {q[i].apply(null, args);// 执行fn的回调}})]));}});memoized.memo = memo;memoized.unmemoized = fn;// 没有memo记录的直接调用return memoized;};// 对async.memoize返回对象或普通函数fn作处理,视为普通函数处理async.unmemoize = function (fn) {return function () {return (fn.unmemoized || fn).apply(null, arguments);};};
  • 官方示例:
var slow_fn = function(name, callback) {// do somethingcallback(null, result);
};
var fn = async.memoize(slow_fn);// fn can now be used as if it were slow_fn   初次使用fn
fn('some name', function() {// callbackconsole.log(1)
});// 添加fn回调
fn('some name', function() {// callbackconsole.log(2)
});

async.constant(value)

  • 实现功能:使回调函数的首参err值为null,次参为value。
  • 源码解读:
// 重设callback首参err为nullasync.constant = _restParam(function(values) {var args = [null].concat(values);return function (callback) {return callback.apply(this, args);};});
  • 官方示例:
async.waterfall([async.constant(42),function (value, next) {// value === 42},//...
], callback);async.waterfall([async.constant(filename, "utf8"),fs.readFile,function (fileData, next) {//...}//...
], callback);async.auto({hostname: async.constant("https://server.net/"),port: findFreePort,launchServer: ["hostname", "port", function (options, cb) {startServer(options, cb);}],//...
}, callback);

async.wrapSync | asyncify(func)

  • 实现功能:返回函数中传入func函数的参数(取出最末一个回调函数callback),执行func,若返回值不是带有then方法的thenable对象,直接执行回调函数callback;若返回值是thenable对象,执行该thenable对象的then方法。
  • 源码解读:
// 执行func,若返回值为thenable对象,再次执行该thenable对象的then方法,若不是,直接调用回调函数async.wrapSync =async.asyncify = function asyncify(func) {return _restParam(function (args) {var callback = args.pop();var result;try {result = func.apply(this, args);} catch (e) {return callback(e);}// if result is Promise objectif (_isObject(result) && typeof result.then === "function") {result.then(function(value) {callback(null, value);})["catch"](function(err) {callback(err.message ? err : new Error(err));});} else {callback(null, result);}});};
  • 官方示例:
// passing a regular synchronous function
async.waterfall([async.apply(fs.readFile, filename, "utf8"),async.asyncify(JSON.parse),function (data, next) {// data is the result of parsing the text.// If there was a parsing error, it would have been caught.}
], callback);// passing a function returning a promise
async.waterfall([async.apply(fs.readFile, filename, "utf8"),async.asyncify(function (contents) {return db.model.create(contents);}),function (model, next) {// `model` is the instantiated model object.// If there was an error, this function would be skipped.}
], callback);// es6 example
var q = async.queue(async.asyncify(async function(file) {var intermediateStep = await processFile(file);return await somePromise(intermediateStep)
}));q.push(files);

整体代码

/*!* async* https://github.com/caolan/async** Copyright 2010-2014 Caolan McMahon* Released under the MIT license*/
(function () {var async = {};function noop() {}function identity(v) {return v;}function toBool(v) {return !!v;}function notId(v) {return !v;}// global on the server, window in the browservar previous_async;// Establish the root object, `window` (`self`) in the browser, `global`// on the server, or `this` in some virtual machines. We use `self`// instead of `window` for `WebWorker` support.var root = typeof self === 'object' && self.self === self && self ||typeof global === 'object' && global.global === global && global ||this;if (root != null) {previous_async = root.async;}async.noConflict = function () {root.async = previous_async;return async;};// 包装执行函数,执行完成后清空function only_once(fn) {return function() {if (fn === null) throw new Error("Callback was already called.");fn.apply(this, arguments);fn = null;};}// 包装执行函数,执行完成后清空function _once(fn) {return function() {if (fn === null) return;fn.apply(this, arguments);fn = null;};}cross-browser compatiblity functions // 转化为字符串var _toString = Object.prototype.toString;// 是否数组var _isArray = Array.isArray || function (obj) {return _toString.call(obj) === '[object Array]';};// 是否对象var _isObject = function(obj) {var type = typeof obj;return type === 'function' || type === 'object' && !!obj;};// 是否数组或伪数组function _isArrayLike(arr) {return _isArray(arr) || (// has a positive integer length propertytypeof arr.length === "number" &&arr.length >= 0 &&arr.length % 1 === 0);}// 遍历数组执行回调function _arrayEach(arr, iterator) {var index = -1,length = arr.length;while (++index < length) {iterator(arr[index], index, arr);}}// 由回调函数重置数组的元素项function _map(arr, iterator) {var index = -1,length = arr.length,result = Array(length);while (++index < length) {result[index] = iterator(arr[index], index, arr);}return result;}// 由0到count构成数组function _range(count) {return _map(Array(count), function (v, i) { return i; });}// 遍历数组项用回调对memo作处理function _reduce(arr, iterator, memo) {_arrayEach(arr, function (x, i, a) {memo = iterator(memo, x, i, a);});return memo;}// 遍历对象的属性执行回调function _forEachOf(object, iterator) {_arrayEach(_keys(object), function (key) {iterator(object[key], key);});}// 获取元素的index,不存在是返回-1function _indexOf(arr, item) {for (var i = 0; i < arr.length; i++) {if (arr[i] === item) return i;}return -1;}// 获取对象的属性var _keys = Object.keys || function (obj) {var keys = [];for (var k in obj) {if (obj.hasOwnProperty(k)) {keys.push(k);}}return keys;};// 返回以数组元素项序号或对象主键作为返回值的函数,每执行一次通过闭包i+1,构成遍历function _keyIterator(coll) {var i = -1;var len;var keys;if (_isArrayLike(coll)) {len = coll.length;return function next() {i++;return i < len ? i : null;};} else {keys = _keys(coll);len = keys.length;return function next() {i++;return i < len ? keys[i] : null;};}}// 约定从返回函数的第几项开始获取参数,0、1有效,返回函数末参为index值// startIndex为0时,func传参为返回函数的arguments,默认为0// startIndex为1时,func传参为返回函数的arguments[0]、其余项function _restParam(func, startIndex) {startIndex = startIndex == null ? func.length - 1 : +startIndex;return function() {var length = Math.max(arguments.length - startIndex, 0);var rest = Array(length);for (var index = 0; index < length; index++) {rest[index] = arguments[index + startIndex];}switch (startIndex) {case 0: return func.call(this, rest);case 1: return func.call(this, arguments[0], rest);}// Currently unused but handle cases outside of the switch statement:// var args = Array(startIndex + 1);// for (index = 0; index < startIndex; index++) {//     args[index] = arguments[index];// }// args[startIndex] = rest;// return func.apply(this, args);};}// 没有index的数组项处理函数function _withoutIndex(iterator) {return function (value, index, callback) {return iterator(value, callback);};}exported async module functions nextTick implementation with browser-compatible fallback // capture the global reference to guard against fakeTimer mocksvar _setImmediate = typeof setImmediate === 'function' && setImmediate;var _delay = _setImmediate ? function(fn) {// not a direct alias for IE10 compatibility_setImmediate(fn);} : function(fn) {setTimeout(fn, 0);};if (typeof process === 'object' && typeof process.nextTick === 'function') {async.nextTick = process.nextTick;} else {async.nextTick = _delay;}// 浏览器使用setTimeout、服务器端使用process.nextTick,在setImmediate函数不存在的前提下async.setImmediate = _setImmediate ? _delay : async.nextTick;// 实现功能同async.eachOf,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值async.forEach =async.each = function (arr, iterator, callback) {return async.eachOf(arr, _withoutIndex(iterator), callback);};// 实现功能同async.eachOfSeries,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值async.forEachSeries =async.eachSeries = function (arr, iterator, callback) {return async.eachOfSeries(arr, _withoutIndex(iterator), callback);};// 实现功能同async.eachOfLimit,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值async.forEachLimit =async.eachLimit = function (arr, limit, iterator, callback) {return _eachOfLimit(limit)(arr, _withoutIndex(iterator), callback);};// 遍历object对象执行iterator,报错或遍历完成后调用callback(error)// callback(error)函数的触发时机需要手动在iterator中设置/*官方示例var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};var configs = {};async.forEachOf(obj, function (value, key, callback) {fs.readFile(__dirname + value, "utf8", function (err, data) {if (err) return callback(err);try {configs[key] = JSON.parse(data);} catch (e) {return callback(e);}callback();});}, function (err) {if (err) console.error(err.message);// configs is now a map of JSON datadoSomethingWith(configs);});*/async.forEachOf =async.eachOf = function (object, iterator, callback) {callback = _once(callback || noop);object = object || [];var iter = _keyIterator(object);var key, completed = 0;while ((key = iter()) != null) {// 反复执行闭包函数iter遍历对象completed += 1;// only_once(done)中调用外部函数callback,机理同Promise-resolve相似iterator(object[key], key, only_once(done));}if (completed === 0) callback(null);function done(err) {completed--;if (err) {callback(err);}// Check key is null in case iterator isn't exhausted// and done resolved synchronously.else if (key === null && completed <= 0) {callback(null);}}};// 本次iterator延迟执行完毕后,通过回调函数only_once(done)触发下一个iterator执行// _keyIterator函数的意义遍历的节奏变得可控,可终止,可触发,通常的each方法则不能async.forEachOfSeries =async.eachOfSeries = function (obj, iterator, callback) {callback = _once(callback || noop);obj = obj || [];var nextKey = _keyIterator(obj);var key = nextKey();function iterate() {var sync = true;if (key === null) {return callback(null);}iterator(obj[key], key, only_once(function (err) {if (err) {callback(err);}else {key = nextKey();if (key === null) {return callback(null);} else {if (sync) {async.setImmediate(iterate);} else {iterate();}}}}));sync = false;}iterate();};// iterator同时执行个数限制为limit,延迟完成后加载下一个// limit=1时,eachOfLimit方法同async.eachOfSeries// limit=数组长度或对象属性集合的长度时,eachOfLimit方法同async.eachOfasync.forEachOfLimit =async.eachOfLimit = function (obj, limit, iterator, callback) {_eachOfLimit(limit)(obj, iterator, callback);};function _eachOfLimit(limit) {return function (obj, iterator, callback) {callback = _once(callback || noop);obj = obj || [];var nextKey = _keyIterator(obj);if (limit <= 0) {return callback(null);}var done = false;var running = 0;// 记录iterator执行个数,iterator启动前+1,执行完成后-1var errored = false;(function replenish () {if (done && running <= 0) {return callback(null);}while (running < limit && !errored) {var key = nextKey();if (key === null) {done = true;if (running <= 0) {callback(null);}return;}running += 1;iterator(obj[key], key, only_once(function (err) {running -= 1;if (err) {callback(err);errored = true;}else {replenish();}}));}})();};}function doParallel(fn) {return function (obj, iterator, callback) {return fn(async.eachOf, obj, iterator, callback);};}function doParallelLimit(fn) {return function (obj, limit, iterator, callback) {return fn(_eachOfLimit(limit), obj, iterator, callback);};}function doSeries(fn) {return function (obj, iterator, callback) {return fn(async.eachOfSeries, obj, iterator, callback);};}function _asyncMap(eachfn, arr, iterator, callback) {callback = _once(callback || noop);arr = arr || [];var results = _isArrayLike(arr) ? [] : {};eachfn(arr, function (value, index, callback) {iterator(value, function (err, v) {results[index] = v;callback(err);});}, function (err) {callback(err, results);});}// 使用async.eachOf方法遍历obj执行iterator,主要目的是获取results,传给最终回调函数callbackasync.map = doParallel(_asyncMap);async.mapSeries = doSeries(_asyncMap);async.mapLimit = doParallelLimit(_asyncMap);// 调用eachOfSeries遍历arr,延迟完成后更新memo,最后将memo传给最终的回调函数callbackasync.inject =async.foldl =async.reduce = function (arr, memo, iterator, callback) {async.eachOfSeries(arr, function (x, i, callback) {iterator(memo, x, function (err, v) {memo = v;// 更新memocallback(err);// 错误处理});}, function (err) {callback(err, memo);//memo不是通过传值或者闭包驻留,由上级作用域赋予});};// 自右向左遍历async.foldr =async.reduceRight = function (arr, memo, iterator, callback) {var reversed = _map(arr, identity).reverse();async.reduce(reversed, memo, iterator, callback);};// 同async.reduce,主要功能是延迟后处理memo,再将memo交给callback回调,只是不阻塞延时函数执行async.transform = function (arr, memo, iterator, callback) {if (arguments.length === 3) {callback = iterator;iterator = memo;memo = _isArray(arr) ? [] : {};}async.eachOf(arr, function(v, k, cb) {iterator(memo, v, k, cb);}, function(err) {callback(err, memo);});};function _filter(eachfn, arr, iterator, callback) {var results = [];eachfn(arr, function (x, index, callback) {iterator(x, function (v) {if (v) {results.push({index: index, value: x});}callback();});}, function () {callback(_map(results.sort(function (a, b) { // 由回调函数重置数组的元素项为results的value键值return a.index - b.index;}), function (x) {return x.value;}));});}// 调用async.eachOf方法遍历obj执行iterator,通过iterator的回调传参v过滤,返回数组results// 经过处理后,构成按序排列原元素值的数组,最终传给callback回调async.select =async.filter = doParallel(_filter);async.selectLimit =async.filterLimit = doParallelLimit(_filter);async.selectSeries =async.filterSeries = doSeries(_filter);function _reject(eachfn, arr, iterator, callback) {_filter(eachfn, arr, function(value, cb) {// cb在_filter函数中写就,cb为引用iterator(value, function(v) {cb(!v);});}, callback);}// 与async.filter的不同是,reject方法的过滤条件是,iterator的回调传参v为否值async.reject = doParallel(_reject);async.rejectLimit = doParallelLimit(_reject);async.rejectSeries = doSeries(_reject);// 以async.some方法解读// check、getResult均为传递函数,一个转化为布尔值,一个原样输出function _createTester(eachfn, check, getResult) {return function(arr, limit, iterator, cb) {function done() {if (cb) cb(getResult(false, void 0));}function iteratee(x, _, callback) {if (!cb) return callback();// 参数function(v){}由内部函数传给外部,作为iterator的回调,执行done回调iterator(x, function (v) {// v由外部函数传入,x是arr的元素项if (cb && check(v)) {// v为真、cb存在时执行cb回调,cb和iterator都赋值为false// 因为iteratee中首先判断cb是否为否值,后续遍历直接进入callback回调// 进入callback回调,callback在遍历执行完成时才有意义cb(getResult(true, x));cb = iterator = false;}callback();});}if (arguments.length > 3) {eachfn(arr, limit, iteratee, done);} else {cb = iterator;iterator = limit;eachfn(arr, iteratee, done);}};}// 调用async.eachOf方法,arr参数中有一项满足条件时,回调获得结果为真// 回调时机为遍历的每个阶段执行完毕async.any =async.some = _createTester(async.eachOf, toBool, identity);async.someLimit = _createTester(async.eachOfLimit, toBool, identity);// 调用async.eachOf方法,arr参数中每一项满足条件时,回调获得结果为真// 回调时机为遍历执行完成时async.all =async.every = _createTester(async.eachOf, notId, notId);async.everyLimit = _createTester(async.eachOfLimit, notId, notId);function _findGetResult(v, x) {return x;}// 与some方法相同,调用了_createTester函数,差别是some传参为v判断条件,detect传参为arr的元素项async.detect = _createTester(async.eachOf, identity, _findGetResult);async.detectSeries = _createTester(async.eachOfSeries, identity, _findGetResult);async.detectLimit = _createTester(async.eachOfLimit, identity, _findGetResult);// 根据用户设置的criteria排序async.sortBy = function (arr, iterator, callback) {async.map(arr, function (x, callback) {iterator(x, function (err, criteria) {if (err) {callback(err);}else {callback(null, {value: x, criteria: criteria});}});}, function (err, results) {if (err) {return callback(err);}else {callback(null, _map(results.sort(comparator), function (x) {return x.value;}));}});function comparator(left, right) {var a = left.criteria, b = right.criteria;return a < b ? -1 : a > b ? 1 : 0;}};/*---------------------------------------------------------------------------------*/// 自动以无依赖的函数到依赖执行完成的函数顺序执行函数// tasks以函数映射名为键,函数队列为值,尾参是处理results的回调函数async.auto = function (tasks, concurrency, callback) {if (typeof arguments[1] === 'function') {// concurrency is optional, shift the args.callback = concurrency;concurrency = null;}callback = _once(callback || noop);var keys = _keys(tasks);var remainingTasks = keys.length;// tasks对象共有多少键,即共多少任务// 没有任务,直接调用回调函数if (!remainingTasks) {return callback(null);}if (!concurrency) {concurrency = remainingTasks;// concurrency同时执行的任务数,默认为tasks对象有多少键}var results = {};var runningTasks = 0;// 跑的任务数,与concurrency比较以对同时跑的任务数作限制var hasError = false;// 需要依赖函数完成加载后才执行的函数,通过addListener方法添加到listeners队列// 每当一个函数加载完成,调用taskComplete执行依赖加载完成的函数项,调用removeListener更新listeners队列var listeners = [];function addListener(fn) {listeners.unshift(fn);}function removeListener(fn) {var idx = _indexOf(listeners, fn);if (idx >= 0) listeners.splice(idx, 1);}// remainingTasks-1,遍历listeners执行依赖加载完成的函数项function taskComplete() {remainingTasks--;_arrayEach(listeners.slice(0), function (fn) {fn();});}// tasks中任务函数已执行完毕的情况下,remainingTasks=0,执行最终回调函数callbackaddListener(function () {if (!remainingTasks) {callback(null, results);}});_arrayEach(keys, function (k) {if (hasError) return;var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];// 因为startIndex为0,_restParam作用是将taskCallback赋值为function(err,args),处理resultsvar taskCallback = _restParam(function(err, args) {runningTasks--;if (args.length <= 1) {args = args[0];}if (err) {var safeResults = {};_forEachOf(results, function(val, rkey) {safeResults[rkey] = val;});safeResults[k] = args;// 关于k值的驻留???hasError = true;// 报错直接调用最终回调函数callback(err, safeResults);// 没有错误,执行taskComplete,遍历listeners执行依赖加载完成的函数项}else {results[k] = args;// results由tasks的末尾项task[task.length - 1]参数带入async.setImmediate(taskComplete);}});// 检查依赖的函数是否存在,依赖可以是数组形式,但不能依赖自身var requires = task.slice(0, task.length - 1);// 排除最后一项// prevent dead-locksvar len = requires.length;var dep;while (len--) {if (!(dep = tasks[requires[len]])) {throw new Error('Has nonexistent dependency in ' + requires.join(', '));}if (_isArray(dep) && _indexOf(dep, k) >= 0) {throw new Error('Has cyclic dependencies');}}// 依赖函数x在results中,执行函数k不在results中,且同时跑的函数个数runningTasks小于限制function ready() {// _reduce遍历数组项对尾参memo作处理,迭代器函数iterator中首参为memo,次参为元素项return runningTasks < concurrency && _reduce(requires, function (a, x) {return (a && results.hasOwnProperty(x));}, true) && !results.hasOwnProperty(k);}// task[task.length - 1]末尾项为函数,之前为依赖// taskCallback,通过调用taskComplete,遍历listeners执行依赖加载完成的函数项if (ready()) {runningTasks++;task[task.length - 1](taskCallback, results);}else {addListener(listener);}// 添加listeners函数队列的依赖函数,执行时runningTasks+1,删除listeners队列依赖function listener() {if (ready()) {runningTasks++;removeListener(listener);task[task.length - 1](taskCallback, results);}}});};// 报错时重复尝试task任务函数,直到最末一个task任务或某个task任务执行成功时,跳向最终回调async.retry = function(times, task, callback) {var DEFAULT_TIMES = 5;var DEFAULT_INTERVAL = 0;var attempts = [];var opts = {times: DEFAULT_TIMES,// task执行次数interval: DEFAULT_INTERVAL// task执行完成间隙时间};function parseTimes(acc, t){if(typeof t === 'number'){acc.times = parseInt(t, 10) || DEFAULT_TIMES;} else if(typeof t === 'object'){acc.times = parseInt(t.times, 10) || DEFAULT_TIMES;acc.interval = parseInt(t.interval, 10) || DEFAULT_INTERVAL;} else {throw new Error('Unsupported argument type for \'times\': ' + typeof t);}}var length = arguments.length;if (length < 1 || length > 3) {throw new Error('Invalid arguments - must be either (task), (task, callback), (times, task) or (times, task, callback)');} else if (length <= 2 && typeof times === 'function') {callback = task;task = times;}if (typeof times !== 'function') {parseTimes(opts, times);}opts.callback = callback;opts.task = task;function wrappedTask(wrappedCallback, wrappedResults) {function retryAttempt(task, finalAttempt) {// seriesCallback为task任务执行完成后回调函数,通过async.series方法设置// seriesCallback首参等同async.series方法中执行函数的回调函数参数err,正常执行需为否// 为真跳到最终回调// seriesCallback在async.series方法的意义是调用下一个task或最终回调return function(seriesCallback) {// task首参为函数,该函数中调用async.series方法seriesCallback,也就是设置回调时机task(function(err, result){// err为否,执行成功的前提下,跳到最终回调,结束task任务的反复执行// err为真,忽略该错误,继续执行task任务// 到最末一个task任务时,执行完毕即调用最终回调seriesCallback(!err || finalAttempt, {err: err, result: result});}, wrappedResults);};}// 两个task任务之间添加等待时间function retryInterval(interval){return function(seriesCallback){setTimeout(function(){seriesCallback(null);}, interval);};}while (opts.times) {var finalAttempt = !(opts.times-=1);attempts.push(retryAttempt(opts.task, finalAttempt));// 两个task任务之间添加等待时间if(!finalAttempt && opts.interval > 0){attempts.push(retryInterval(opts.interval));}}// function(done,data)作为async.series方法的最终回调,done为是否报错,data为传入数据async.series(attempts, function(done, data){// data为tasks处理获得的最终值,获取尾项,其余项为中间处理值data = data[data.length - 1];(wrappedCallback || opts.callback)(data.err, data.result);});}// If a callback is passed, run this as a controll flow// 没有callback参数项时,输出为函数,需要传递wrappedCallback与wrappedResultsreturn opts.callback ? wrappedTask() : wrappedTask;};// 顺序执行tasks任务函数,并为任务函数传递参数,报错或任务函数执行完毕调用callbackasync.waterfall = function (tasks, callback) {callback = _once(callback || noop);if (!_isArray(tasks)) {var err = new Error('First argument to waterfall must be an array of functions');return callback(err);}if (!tasks.length) {return callback();}function wrapIterator(iterator) {return _restParam(function (err, args) {if (err) {callback.apply(null, [err].concat(args));}else {var next = iterator.next();// async.iterator方法获取下一个执行函数if (next) {args.push(wrapIterator(next));}else {args.push(callback);}// 调用ensureAsync避免堆栈溢出// 执行iterator函数,函数尾参为下一个iterator函数或最终回调callback// 同时为下一个iterator函数或最终回调callback传递参数ensureAsync(iterator).apply(null, args);}});}wrapIterator(async.iterator(tasks))();};function _parallel(eachfn, tasks, callback) {callback = callback || noop;var results = _isArrayLike(tasks) ? [] : {};eachfn(tasks, function (task, key, callback) {task(_restParam(function (err, args) {if (args.length <= 1) {args = args[0];}results[key] = args;// 根据tasks数据类型构建results,传给最终回调,results根据作用域传递callback(err);}));}, function (err) {callback(err, results);});}// 遍历执行tasks中任务函数,数组或对象形式构建results,传给回调函数callbackasync.parallel = function (tasks, callback) {_parallel(async.eachOf, tasks, callback);};async.parallelLimit = function(tasks, limit, callback) {_parallel(_eachOfLimit(limit), tasks, callback);};async.series = function(tasks, callback) {_parallel(async.eachOfSeries, tasks, callback);};// 以可控的方式遍历执行tasks函数队列// makeCallback构建执行函数,并作为返回值// 执行函数调用过程中,执行tasks的任务函数,并调用makeCallback函数构建下一个任务函数的执行函数作为返回值// 执行函数的next方法获取下一个任务函数的执行函数async.iterator = function (tasks) {function makeCallback(index) {function fn() {// 执行函数if (tasks.length) {tasks[index].apply(null, arguments);}return fn.next();}fn.next = function () {return (index < tasks.length - 1) ? makeCallback(index + 1): null;};return fn;}return makeCallback(0);};// 返回fn的执行函数,目的是重设fn的参数async.apply = _restParam(function (fn, args) {return _restParam(function (callArgs) {return fn.apply(null, args.concat(callArgs));});});// 拼接arr元素项传参给回调,前提是没有捕获到错误,捕获到错误则略过function _concat(eachfn, arr, fn, callback) {var result = [];eachfn(arr, function (x, index, cb) {fn(x, function (err, y) {result = result.concat(y || []);cb(err);});}, function (err) {callback(err, result);});}async.concat = doParallel(_concat);async.concatSeries = doSeries(_concat);// test合格后,执行iterator,否则跳到最终回调callback,目的是处理变量async.whilst = function (test, iterator, callback) {callback = callback || noop;if (test()) {var next = _restParam(function(err, args) {// args由外部函数传入if (err) {callback(err);} else if (test.apply(this, args)) {iterator(next);// 调用下一个iterator} else {callback.apply(null, [null].concat(args));// 执行完成后最终回调,参数为null、args}});iterator(next);// 第一个iterator} else {callback(null);}};// iterator必然执行一次,其他看test检验是否合格async.doWhilst = function (iterator, test, callback) {var calls = 0;return async.whilst(function() {return ++calls <= 1 || test.apply(this, arguments);}, iterator, callback);};// 取反校验async.until = function (test, iterator, callback) {return async.whilst(function() {return !test.apply(this, arguments);}, iterator, callback);};async.doUntil = function (iterator, test, callback) {return async.doWhilst(iterator, function() {return !test.apply(this, arguments);}, callback);};// 同whilst,不同的是whilst方法校验立即执行,during方法校验需要等待使用者向test回调函数中注入参数// 可以实现异步校验,test回调参数通过延时函数获得async.during = function (test, iterator, callback) {callback = callback || noop;var next = _restParam(function(err, args) {if (err) {callback(err);} else {args.push(check);// args可以在同步处理或异步处理后使用test.apply(this, args);// args尾参中传入check,使用者调用执行}});var check = function(err, truth) {if (err) {callback(err);} else if (truth) {iterator(next);} else {callback(null);}};test(check);};async.doDuring = function (iterator, test, callback) {var calls = 0;async.during(function(next) {if (calls++ < 1) {next(null, true);} else {test.apply(this, arguments);}}, iterator, callback);};// 加载或执行任务函数(回调函数),任务函数的目的是启动任务函数中的回调函数function _queue(worker, concurrency, payload) {if (concurrency == null) {concurrency = 1;}else if(concurrency === 0) {throw new Error('Concurrency must not be zero');}// q.tasks待执行的任务函数队列添加任务function _insert(q, data, pos, callback) {// pos头部插入任务队列q.tasksif (callback != null && typeof callback !== "function") {throw new Error("task callback must be a function");}q.started = true;if (!_isArray(data)) {data = [data];}if(data.length === 0 && q.idle()) {// call drain immediately if there are no tasksreturn async.setImmediate(function() {q.drain();});}_arrayEach(data, function(task) {var item = {data: task,callback: callback || noop};if (pos) {q.tasks.unshift(item);} else {q.tasks.push(item);}if (q.tasks.length === q.concurrency) {q.saturated();}});async.setImmediate(q.process);}// 一个任务执行完成后回调,执行任务数workers-1,更新执行任务队列workersList// 执行任务固有的回调函数task.callback,调用执行下一个任务function _next(q, tasks) {return function(){workers -= 1;var removed = false;var args = arguments;// 更新workersList执行任务队列_arrayEach(tasks, function (task) {_arrayEach(workersList, function (worker, index) {if (worker === task && !removed) {workersList.splice(index, 1);removed = true;}});// 执行任务固有的回调函数task.callback.apply(task, args);});if (q.tasks.length + workers === 0) {q.drain();}q.process();};}var workers = 0;// 执行中的任务数量var workersList = [];// 执行中的任务var q = {tasks: [],// 任务函数队列concurrency: concurrency,// 同步执行的worker个数payload: payload,saturated: noop,// 空函数,改写后实现功能,同步执行达到限制时执行empty: noop,// 改写实现功能,任务为空时执行drain: noop,// 改写实现功能,所有任务都执行完成后调用started: false,paused: false,push: function (data, callback) {// 尾部插入_insert(q, data, false, callback);},kill: function () {// 清空q.tasks任务函数队列q.drain = noop;q.tasks = [];},unshift: function (data, callback) {// 头部插入_insert(q, data, true, callback);},process: function () {while(!q.paused && workers < q.concurrency && q.tasks.length){// 从q.tasks中取出任务,并更新q.tasks需要执行的任务函数队列// tasks一个任务单元,任务单元中的任务同步执行var tasks = q.payload ?q.tasks.splice(0, q.payload) :q.tasks.splice(0, q.tasks.length);// 预加载的任务函数数量var data = _map(tasks, function (task) {return task.data;});if (q.tasks.length === 0) {q.empty();}workers += 1;workersList.push(tasks[0]);var cb = only_once(_next(q, tasks));worker(data, cb);// _queue函数参数worker执行过程中调用cb函数执行下一个任务函数}},length: function () {// 需要执行的任务return q.tasks.length;},running: function () {// 同步执行的任务数return workers;},workersList: function () {// 同步执行的任务return workersList;},idle: function() {// 所有任务都已执行完成return q.tasks.length + workers === 0;},pause: function () {// 中断process函数执行q.paused = true;},resume: function () {// 恢复执行if (q.paused === false) { return; }q.paused = false;var resumeCount = Math.min(q.concurrency, q.tasks.length);// Need to call q.process once per concurrent// worker to preserve full concurrency after pausefor (var w = 1; w <= resumeCount; w++) {async.setImmediate(q.process);}}};return q;}// 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数// concurrency限制同步执行的任务个数async.queue = function (worker, concurrency) {// push进queue对象的任务函数通过调用worker执行其功能var q = _queue(function (items, cb) {worker(items[0], cb);}, concurrency, 1);return q;};// 内部调用async.queue方法返回queue对象,只是删除了unshift方法、改写了push方法// 设置任务函数执行的优先级async.priorityQueue = function (worker, concurrency) {// 比较优先级,越低越高优先级function _compareTasks(a, b){return a.priority - b.priority;}// 通过不断和拆分数组的中值作比较,得出item应该插到sequence数组中哪个位置function _binarySearch(sequence, item, compare) {var beg = -1,end = sequence.length - 1;while (beg < end) {var mid = beg + ((end - beg + 1) >>> 1);// 无符号右移,即除2,忽略余数if (compare(item, sequence[mid]) >= 0) {beg = mid;} else {end = mid - 1;}}return beg;}function _insert(q, data, priority, callback) {if (callback != null && typeof callback !== "function") {throw new Error("task callback must be a function");}q.started = true;if (!_isArray(data)) {data = [data];}if(data.length === 0) {// call drain immediately if there are no tasksreturn async.setImmediate(function() {q.drain();});}_arrayEach(data, function(task) {var item = {data: task,priority: priority,callback: typeof callback === 'function' ? callback : noop};q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);if (q.tasks.length === q.concurrency) {q.saturated();}async.setImmediate(q.process);});}// Start with a normal queuevar q = async.queue(worker, concurrency);// Override push to accept second parameter representing priorityq.push = function (data, priority, callback) {_insert(q, data, priority, callback);};// Remove unshift functiondelete q.unshift;return q;};// 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数// payload单次执行的任务单元中包含几条任务函数,这几条任务函数和下一个任务单元同步执行async.cargo = function (worker, payload) {return _queue(worker, 1, payload);};function _console_fn(name) {return _restParam(function (fn, args) {// 通过内部函数向外部函数传递回调函数function(err,args)作为参数// 外部函数中设置回调函数的执行时机// 实际意义似乎和在异步函数中直接调用console.log()没差别fn.apply(null, args.concat([_restParam(function (err, args) {if (typeof console === 'object') {if (err) {if (console.error) {console.error(err);}}else if (console[name]) {_arrayEach(args, function (x) {console[name](x);});}}})]));});}async.log = _console_fn('log');async.dir = _console_fn('dir');// 显示dom试图/*async.info = _console_fn('info');async.warn = _console_fn('warn');async.error = _console_fn('error');*/// 传参相同,fn已执行的状态下,重复调用回调函数,或fn执行中,添加回调函数async.memoize = function (fn, hasher) {var memo = {};var queues = {};var has = Object.prototype.hasOwnProperty;hasher = hasher || identity;var memoized = _restParam(function memoized(args) {var callback = args.pop();var key = hasher.apply(null, args);if (has.call(memo, key)) {   // 已执行,立即执行fn的回调async.setImmediate(function () {callback.apply(null, memo[key]);});}else if (has.call(queues, key)) {// fn执行过程中,参数args相同,添加回调callbackqueues[key].push(callback);}else {queues[key] = [callback];// 执行前fn回调添加到queues队列fn.apply(null, args.concat([_restParam(function (args) {memo[key] = args;// fn执行完成更新memo记录已执行,对象的键可以是数组、对象等数据类型var q = queues[key];delete queues[key];// 执行完清空该fn回调队列queues[key]for (var i = 0, l = q.length; i < l; i++) {q[i].apply(null, args);// 执行fn的回调}})]));}});memoized.memo = memo;memoized.unmemoized = fn;// 没有memo记录的直接调用return memoized;};// 对async.memoize返回对象或普通函数fn作处理,视为普通函数处理async.unmemoize = function (fn) {return function () {return (fn.unmemoized || fn).apply(null, arguments);};};function _times(mapper) {return function (count, iterator, callback) {// _range(count)输出为0到count构成数组mapper(_range(count), iterator, callback);};}// 设置iterator的调用次数countasync.times = _times(async.map);async.timesSeries = _times(async.mapSeries);async.timesLimit = function (count, limit, iterator, callback) {return async.mapLimit(_range(count), limit, iterator, callback);};// async.reduce的简化方案,遍历执行函数,对传递数据作处理后传给各回调async.seq = function (/* functions... */) {var fns = arguments;return _restParam(function (args) {var that = this;var callback = args[args.length - 1];if (typeof callback == 'function') {args.pop();} else {callback = noop;}async.reduce(fns, args, function (newargs, fn, cb) {fn.apply(that, newargs.concat([_restParam(function (err, nextargs) {cb(err, nextargs);})]));},function (err, results) {callback.apply(that, [err].concat(results));});});};async.compose = function (/* functions... */) {return async.seq.apply(null, Array.prototype.reverse.call(arguments));};function _applyEach(eachfn) {return _restParam(function(fns, args) {var go = _restParam(function(args) {var that = this;var callback = args.pop();// 尾项为回调函数return eachfn(fns, function (fn, _, cb) {fn.apply(that, args.concat([cb]));},callback);});if (args.length) {return go.apply(this, args);}else {return go;}});}// 调用async.eachOf遍历执行fns函数,fns函数参数均相同,且为args、callbackasync.applyEach = _applyEach(async.eachOf);async.applyEachSeries = _applyEach(async.eachOfSeries);// fn执行完毕,使用next函数,或者调用自身,或者报错时执行回调async.forever = function (fn, callback) {var done = only_once(callback || noop);var task = ensureAsync(fn);function next(err) {if (err) {return done(err);}task(next);}next();};function ensureAsync(fn) {// 输入参数最后一项是回调函数,sync为真同步执行状态,为否异步执行状态// 改写回调项,async.setImmediate跟普通调用callback的区别是没有堆栈溢出?return _restParam(function (args) {var callback = args.pop();args.push(function () {var innerArgs = arguments;if (sync) {async.setImmediate(function () {callback.apply(null, innerArgs);});} else {callback.apply(null, innerArgs);}});var sync = true;fn.apply(this, args);sync = false;});}// 改写参数函数fn中参数的回调函数callback设置,callback启动时机依然需使用者触发// 改写callback的目的避免同步函数的内存浪费async.ensureAsync = ensureAsync;// 重设callback首参err为nullasync.constant = _restParam(function(values) {var args = [null].concat(values);return function (callback) {return callback.apply(this, args);};});// 执行func,若返回值为thenable对象,再次执行该thenable对象的then方法,若不是,直接调用回调函数async.wrapSync =async.asyncify = function asyncify(func) {return _restParam(function (args) {var callback = args.pop();var result;try {result = func.apply(this, args);} catch (e) {return callback(e);}// if result is Promise objectif (_isObject(result) && typeof result.then === "function") {result.then(function(value) {callback(null, value);})["catch"](function(err) {callback(err.message ? err : new Error(err));});} else {callback(null, result);}});};// Node.jsif (typeof module === 'object' && module.exports) {module.exports = async;}// AMD / RequireJSelse if (typeof define === 'function' && define.amd) {define([], function () {return async;});}// included directly via <script> tagelse {root.async = async;}}());

node.js async循环数组的方法相关推荐

  1. js中的数组方法以及循环数组的方法

    目录 前言 一.数组是什么? 二.数组基本操作方法 1.创建数组 2.常用数组方法 三.循环数组的方法 总结 前言 数组是用来存储元素的一种非常重要的方式,掌握常用的数组方法以及学会遍历数组是学习编程 ...

  2. Linux事件循环阻塞,深入浅析Node.js 事件循环、定时器和process.nextTick()

    什么是事件循环 尽管JavaScript是单线程的,但通过尽可能将操作放到系统内核执行,事件循环允许Node.js执行非阻塞I/O操作. 由于现代大多数内核都是多线程的,因此它们可以处理在后台执行的多 ...

  3. js便利json 数组的方法

    js便利json 数组的方法 通过Jason对象获取里面某个键的值方法: 1,对象["键"]. 2,对象.键. 这篇文章主要介绍了JQuery遍历json数组的3种方法,本文分别给 ...

  4. html设置数组的方法,js改变原数组的方法有哪些?

    js改变原数组的方法 1.pop() pop():删除 arrayObject 的最后一个元素,把数组长度减 1,并且返回它删除的元素的值.如果数组已经为空,则 pop() 不 改变数组,并返回 un ...

  5. JS 复制新数组的方法

    JS 复制新数组的方法 需求:将旧数组复制新数组,不是引用 方法1: arr = [1,2,3]; let newArr = new Array(); arr.forEach((item) => ...

  6. node.js取参四种方法req.body,req.params,req.param,req.body

    node.js取参四种方法req.body,req.params,req.param,req.body 参考:https://my.oschina.net/u/2519530/blog/535309 ...

  7. js对象转数组的方法一种方法

    js对象转数组的方法一种方法 let obj = { name: '张三', age: 18, sex: '男' };let a = Object.values(obj) //属性值 // ['张三' ...

  8. node.js中的url.parse方法

    学习node.js中的url.parse方法 文章目录 前言 一.URL模块之parse方法详解 1.参数 2.实例 例子1:url.parse只传一个参数的情况 例子2:url.parse第二个参数 ...

  9. 八七、Node.js事件循环与多进程

    nodejs事件循环与多进程 why 事件循环对于深入理解nodejs异步至关重要 fs, net,http,events 事件循环是企业面试中的最高频考题之一 能驾驭nodejs多进程是一名资深前端 ...

最新文章

  1. 向量算子优化Vector Operation Optimization
  2. 从Java到Spring为何独得青睐Spring Summit 2017不可不知的那些事儿
  3. 报名丨2019全球AI文创大赛启动仪式邀您参加!
  4. OpenAI的GPT-3花费了1200万美元,现在放出商用API,人人皆可用
  5. c语言funcode空格消失的函数,01北科大暑期计算机实践FunCode游戏设计+C++课程设计 - 海底世界 - 图文...
  6. Android网络传输中必用的两个加密算法:MD5 和 RSA (附java完成测试代码)
  7. XPS reader for Silverlight
  8. zuul路由前缀配置
  9. Qt Creator导出QML
  10. 6-7 求链表的倒数第m个元素 (25 分)
  11. 最大流,最小费用最大流:解析 + 各种板子
  12. Reddit程序员的酒后真言
  13. 【嵌入式Linux】嵌入式Linux驱动开发基础知识之总线设备驱动模型
  14. POJ NOI MATH-7827 质数的和与积
  15. JAVA类型转换系列文章一
  16. 腾讯X5 内核 的导入
  17. 网页版微博HTML解析和提取,爬虫聚焦——以新浪微博为例
  18. 02时态(2):一般现在时、疑问句主语相同的句子
  19. 对角化求可逆矩阵_「线性代数」求可逆矩阵P,使得相似矩阵对角化
  20. 情商高手与小白的言辞,差别究竟在哪里?

热门文章

  1. Ubuntu 18.04 Server 设置静态IP
  2. FLUENT-UDF日记-10-DEFINE_RW_FILE
  3. Spring源码学习(一)源码如何变成可编辑
  4. 录音转文字助手,会议记录的好帮手,学生也能轻松完成笔记记录!
  5. 打破双亲委派机制有什么用_tomcat打破双亲委派机制
  6. java游戏super赛亚人传说,龙珠激斗之赛亚人传说
  7. GOLDENGATE运维手册
  8. 我们眼中土的掉渣的白球鞋,却在法国成为时尚界的传奇
  9. c语言程序设计实训目的,c语言程序设计实验报告(三)
  10. TX2的开机测试 刷机过程 功耗模式选择 源更新 语言修改 拼音输入法设置