Step.js源码分析

简单、易用、轻量的流程控制利器Step.js

Posted by Mr.Harry on 2016-11-15

一、前言

对于前端工程师,准确来说对于javascript开发来说,异步流程太多,导致回调嵌套过深,一则代码不好看,二则读起来也比较晦涩难懂。另外异步并发如果串行执行,效率不高,要实现并行处理统一回调就只能自己封装代码。

页面开发这样的问题一直存在,只是以前一般只需要和cgi进行交互,流程处理相对比较简单,需求没有那么迫切。现在我们很多页面都是内嵌到客户端,不仅需要和cgi交互,也需要和客户端进行交互,流程变得越来越复杂和多样,流程控制组件的使用就变得比较迫切

本文是分析step.js的用法以及其实现细节。

二、step的使用

Step.js是控制流程工具,特点是简单、轻量、源码少易于扩展。
适用于Ajax请求、读文件、查询数据库等回调函数相互依赖,或者分别获取内容最后组合数据返回等应用情景。能够解决代码中的“回调地域”等问题

流程控制可以简单地分为“串行”和“并行”,以及一种支持不确定异步函数数量时“并行”的操作。

(一)、串行

Step.js提供了一个Step(fn1,fn2,....)方法,参数是多个需要串行执行的函数。
demo如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
Step(
function readText(){
fs.readFile('a.txt', this);
//这里是读取本地文件,传入this作为回调函数(this即为流程中的下一个函数,在这里是getText,并有接收相应的参数)
},
function getText(err, text) {
text = text + '23333';
return text; //这里是使用return来控制流程,表示将return的值传递到下一个需要被流程执行的函数中(这里是showText)
},
function showText(ntext) {
console.log(ntext);
}
)

从上面的代码中可以看到,Step.js有两种控制控制流程的方式:使用thisreturn
两者的区别在于,只要是同步的工作,我们只需直接 return 即可。如果需要异步的话,则调用对象指针 this,此时 this 是一个函数类型的对象。

还有一个需要注意的地方是:
Step 使用this的时候回调函数的第一个参数总是 err,第二个才是值。如果上一个步骤发生异常,那么异常对象将被送入到下一个步骤中。
但是在我们公共库的Step.js的源码中,这部分被修改了,没有对异常做捕获,即使用this的时候第一个参数并不是err

(二)、并行

并行执行就是函数同时执行,程序等待所有回调产生结果并把所有结果按调用次序组成数组,作为参数传给最后一个函数处理。

1
2
3
4
5
6
7
8
9
10
Step(
function readSelf(){
fs.readFile('a.txt', this.parallel()); //读取文本a的值
fs.readFile('b.txt', this.parallel()); //读取文本b的值
},
function showIt(err,a,b) {
//函数接收的参数是按照第一个为err,后面依次按照调用this.parallel()的次数和顺序来的
console.log(a+b);
}
)

this.parallel() 可以帮助解决我们解决只能产生一次异步函数的场景,适合多个异步函数的分发,最后完成阶段由最后一个回调按照调用顺序收集结果数据。所以调用了n次的 this.parallel(),就调用了n次的收集 results 的函数。

(三)、this.group()

有些时候,我们想要遍历某个文件夹目录下面的文件,但文件数量又对我们不透明时,我们便需要使用this.group()来解决这种不确定多个异步任务数量的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Step(
function readDir() {
fs.readdir(__dirname, this);
},
function readFiles(err, results) {
if (err) throw err;
var group = this.group(); //创建 group,其实内部创建一个results数组,保存结果
results.forEach(function (filename) {
if (/\.js$/.test(filename)) {
fs.readFile(__dirname + "/" + filename, 'utf8', group()); //这里传入的回调函数是group(),其内部逻辑与 this.parallel相似。
}
});
},
function showAll(err , files) {
//这里参数files即是上面回调函数的结果集
if (err) throw err;
console.dir(files);
}
);

this.group 与 this.parallel 非常类似都是用于异步场景,从表面上看它们之间的区别有点象 fn.call() 与 fn.apply() 之间的区别。

三、源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
function Step() {
var steps = Array.prototype.slice.call(arguments), // 参数列表
counter, pending, results, lock; // 全局的四个变量

//定义主流程函数
function next() {
counter = pending = 0; // counter 和 pendig分别是保存数据的索引和是否执行完毕的计数器

// 看看是否还有剩余的 steps
if (steps.length === 0) {
// 当全部流程走完后,发现还存在参数列表,则说明该参数列表中保存的为异常信息
if (arguments[0]) {
//将异常抛出
throw arguments[0];
}
//整个流程正常结束
return;
}

// 获取下一个要执行的step
var fn = steps.shift();

//保存并行执行时的结果集
results = [];

//此处捕获异常(我们公共库中没有做异常捕获)
try {
lock = true;
var result = fn.apply(next, arguments); // 此时 args 是 next 的参数列表
} catch (e) {
// 如果有异常,把捕捉到的异常送入下一个回调
next(e);
}

if (counter > 0 && pending == 0) {
//couter > 0 表示有并行任务,pending == 0 表示全部运行完毕
// 如果执行了并行任务,而且全部分支都同步执行完毕后,立刻执行下一步
next.apply(null, results); // 注意这里是 results 数组。此时results已经包含了全部的结果
} else if (result !== undefined) {
// 如发现有return执行,将其送入到回调
next(undefined, result);
}
lock = false;
}

// 用于并行的生成器
next.parallel = function () {
var index = 1 + counter++; //这里计数器先从1开始,是给error预留了一个位置。(我们公共库中没有对异常进行捕获,所以也没有对这个部分的处理)
pending++; // 开启了一个新的异步任务

return function () {
pending--;// 计算器减 1,表示执行完毕
// 如果有错误,则保存在结果数组的第一个元素中
if (arguments[0]) {
results[0] = arguments[0];
}
// 按次序保存结果
results[index] = arguments[1];
if (!lock && pending === 0) {// 最后才执行
// 当所有分支都搞定,执行最后一个回调。
next.apply(null, results);
}
};
};

next.group = function () {
var localCallback = next.parallel();
var counter = 0;
var pending = 0;
var result = [];
var error = undefined;

function check() {
if (pending === 0) {
//复用this.parallel()的方式将结果传回。
localCallback(error, result);
}
}
process.nextTick(check);//每次待函数执行完后,检查一次是否所有结果都返回

return function () {
var index = counter++;
pending++;
return function () {
//触发一次回调
pending--;
if (arguments[0]) {
//结果集中保存error信息
error = arguments[0];
}
// 把结果(从下标1开始,0保存错误信息)存到结果集中
result[index] = arguments[1];
if (!lock) { check(); }
};
};
};

// 开始工作流
next();
}

//对外暴露的一个函数,按我的理解是用来包装步骤列表的
Step.fn = function StepFn() {
var steps = Array.prototype.slice.call(arguments);
return function () {
var args = Array.prototype.slice.call(arguments);
var toRun = [function () {
this.apply(null, args);
}].concat(steps);
if (typeof args[args.length-1] === 'function') {
toRun.push(args.pop());
}
Step.apply(null, toRun);
}
}

// CommonJS模块系统的钩子
if (typeof module !== 'undefined' && "exports" in module) {
module.exports = Step;
}