Skip to content

Commit 1529b97

Browse files
add ParalleStream.js
1 parent 391eef7 commit 1529b97

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed

ParallelStream.js

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* 基于stream的异步并行处理解决方案
3+
* */
4+
5+
"use strict";
6+
7+
const stream = require('stream');
8+
9+
class ParallelStream extends stream.Transform {
10+
constructor(userTransform) {
11+
super({objectMode: true});
12+
this.userTransform = userTransform;
13+
this.running = 0;
14+
this.terminateCallback = null;
15+
}
16+
17+
_transform(chunk, enc, done) {
18+
this.running++;
19+
this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
20+
done();
21+
}
22+
23+
_flush(done) {
24+
if(this.running > 0) {
25+
this.terminateCallback = done;
26+
} else {
27+
done();
28+
}
29+
}
30+
31+
_onComplete(err) {
32+
this.running--;
33+
if(err) {
34+
return this.emit('error', err);
35+
}
36+
if(this.running === 0) {
37+
this.terminateCallback && this.terminateCallback();
38+
}
39+
}
40+
}
41+
42+
module.exports = ParallelStream;

0 commit comments

Comments
 (0)