一、前言
对于前端工程师,准确来说对于javascript开发来说,异步流程太多,导致回调嵌套过深,一则代码不好看,二则读起来也比较晦涩难懂。另外异步并发如果串行执行,效率不高,要实现并行处理统一回调就只能自己封装代码。
页面开发这样的问题一直存在,只是以前一般只需要和cgi进行交互,流程处理相对比较简单,需求没有那么迫切。现在我们很多页面都是内嵌到客户端,不仅需要和cgi交互,也需要和客户端进行交互,流程变得越来越复杂和多样,流程控制组件的使用就变得比较迫切
本文是分析step.js的用法以及其实现细节。
二、step的使用
Step.js是控制流程工具,特点是简单、轻量、源码少易于扩展。
适用于Ajax请求、读文件、查询数据库等回调函数相互依赖,或者分别获取内容最后组合数据返回等应用情景。能够解决代码中的“回调地域”等问题
流程控制可以简单地分为“串行”和“并行”,以及一种支持不确定异步函数数量时“并行”的操作。
(一)、串行
Step.js提供了一个Step(fn1,fn2,....)
方法,参数是多个需要串行执行的函数。
demo如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| Step( function readText(){ fs.readFile('a.txt', this); }, function getText(err, text) { text = text + '23333'; return text; }, function showText(ntext) { console.log(ntext); } )
|
从上面的代码中可以看到,Step.js有两种控制控制流程的方式:使用this
和return
。
两者的区别在于,只要是同步的工作,我们只需直接 return 即可。如果需要异步的话,则调用对象指针 this,此时 this 是一个函数类型的对象。
还有一个需要注意的地方是:
Step 使用this
的时候回调函数的第一个参数总是 err,第二个才是值。如果上一个步骤发生异常,那么异常对象将被送入到下一个步骤中。
但是在我们公共库的Step.js的源码中,这部分被修改了,没有对异常做捕获,即使用this的时候第一个参数并不是err
(二)、并行
并行执行就是函数同时执行,程序等待所有回调产生结果并把所有结果按调用次序组成数组,作为参数传给最后一个函数处理。
1 2 3 4 5 6 7 8 9 10
| Step( function readSelf(){ fs.readFile('a.txt', this.parallel()); //读取文本a的值 fs.readFile('b.txt', this.parallel()); //读取文本b的值 }, function showIt(err,a,b) { //函数接收的参数是按照第一个为err,后面依次按照调用this.parallel()的次数和顺序来的 console.log(a+b); } )
|
this.parallel() 可以帮助解决我们解决只能产生一次异步函数的场景,适合多个异步函数的分发,最后完成阶段由最后一个回调按照调用顺序收集结果数据。所以调用了n次的 this.parallel(),就调用了n次的收集 results 的函数。
(三)、this.group()
有些时候,我们想要遍历某个文件夹目录下面的文件,但文件数量又对我们不透明时,我们便需要使用this.group()来解决这种不确定多个异步任务数量的场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Step( function readDir() { fs.readdir(__dirname, this); }, function readFiles(err, results) { if (err) throw err; var group = this.group(); //创建 group,其实内部创建一个results数组,保存结果 results.forEach(function (filename) { if (/\.js$/.test(filename)) { fs.readFile(__dirname + "/" + filename, 'utf8', group()); //这里传入的回调函数是group(),其内部逻辑与 this.parallel相似。 } }); }, function showAll(err , files) { //这里参数files即是上面回调函数的结果集 if (err) throw err; console.dir(files); } );
|
this.group 与 this.parallel 非常类似都是用于异步场景,从表面上看它们之间的区别有点象 fn.call() 与 fn.apply() 之间的区别。
三、源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
| function Step() { var steps = Array.prototype.slice.call(arguments), // 参数列表 counter, pending, results, lock; // 全局的四个变量
//定义主流程函数 function next() { counter = pending = 0; // counter 和 pendig分别是保存数据的索引和是否执行完毕的计数器
// 看看是否还有剩余的 steps if (steps.length === 0) { // 当全部流程走完后,发现还存在参数列表,则说明该参数列表中保存的为异常信息 if (arguments[0]) { //将异常抛出 throw arguments[0]; } //整个流程正常结束 return; }
// 获取下一个要执行的step var fn = steps.shift();
//保存并行执行时的结果集 results = [];
//此处捕获异常(我们公共库中没有做异常捕获) try { lock = true; var result = fn.apply(next, arguments); // 此时 args 是 next 的参数列表 } catch (e) { // 如果有异常,把捕捉到的异常送入下一个回调 next(e); }
if (counter > 0 && pending == 0) { //couter > 0 表示有并行任务,pending == 0 表示全部运行完毕 // 如果执行了并行任务,而且全部分支都同步执行完毕后,立刻执行下一步 next.apply(null, results); // 注意这里是 results 数组。此时results已经包含了全部的结果 } else if (result !== undefined) { // 如发现有return执行,将其送入到回调 next(undefined, result); } lock = false; }
// 用于并行的生成器 next.parallel = function () { var index = 1 + counter++; //这里计数器先从1开始,是给error预留了一个位置。(我们公共库中没有对异常进行捕获,所以也没有对这个部分的处理) pending++; // 开启了一个新的异步任务
return function () { pending--;// 计算器减 1,表示执行完毕 // 如果有错误,则保存在结果数组的第一个元素中 if (arguments[0]) { results[0] = arguments[0]; } // 按次序保存结果 results[index] = arguments[1]; if (!lock && pending === 0) {// 最后才执行 // 当所有分支都搞定,执行最后一个回调。 next.apply(null, results); } }; };
next.group = function () { var localCallback = next.parallel(); var counter = 0; var pending = 0; var result = []; var error = undefined;
function check() { if (pending === 0) { //复用this.parallel()的方式将结果传回。 localCallback(error, result); } } process.nextTick(check);//每次待函数执行完后,检查一次是否所有结果都返回
return function () { var index = counter++; pending++; return function () { //触发一次回调 pending--; if (arguments[0]) { //结果集中保存error信息 error = arguments[0]; } // 把结果(从下标1开始,0保存错误信息)存到结果集中 result[index] = arguments[1]; if (!lock) { check(); } }; }; };
// 开始工作流 next(); }
//对外暴露的一个函数,按我的理解是用来包装步骤列表的 Step.fn = function StepFn() { var steps = Array.prototype.slice.call(arguments); return function () { var args = Array.prototype.slice.call(arguments); var toRun = [function () { this.apply(null, args); }].concat(steps); if (typeof args[args.length-1] === 'function') { toRun.push(args.pop()); } Step.apply(null, toRun); } }
// CommonJS模块系统的钩子 if (typeof module !== 'undefined' && "exports" in module) { module.exports = Step; }
|