Skip to content

Commit 0625449

Browse files
add LimitedParallelStream.js
1 parent 1529b97 commit 0625449

File tree

1 file changed

+50
-0
lines changed

1 file changed

+50
-0
lines changed

limitedParallelStream.js

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* 基于stream的异步并行处理解决方案, 有并发总数限制
3+
* */
4+
"use strict";
5+
6+
const stream = require('stream');
7+
8+
class LimitedParallelStream extends stream.Transform {
9+
constructor(concurrency, userTransform) {
10+
super({objectMode: true});
11+
this.concurrency = concurrency;
12+
this.userTransform = userTransform;
13+
this.running = 0;
14+
this.terminateCallback = null;
15+
this.continueCallback = null;
16+
}
17+
18+
_transform(chunk, enc, done) {
19+
this.running++;
20+
this.userTransform(chunk, enc, this.push.bind(this), this._onComplete.bind(this));
21+
if(this.running < this.concurrency) {
22+
done();
23+
} else {
24+
this.continueCallback = done;
25+
}
26+
}
27+
28+
_flush(done) {
29+
if(this.running > 0) {
30+
this.terminateCallback = done;
31+
} else {
32+
done();
33+
}
34+
}
35+
36+
_onComplete(err) {
37+
this.running--;
38+
if(err) {
39+
return this.emit('error', err);
40+
}
41+
const tmpCallback = this.continueCallback;
42+
this.continueCallback = null;
43+
tmpCallback && tmpCallback();
44+
if(this.running === 0) {
45+
this.terminateCallback && this.terminateCallback();
46+
}
47+
}
48+
}
49+
50+
module.exports = LimitedParallelStream;

0 commit comments

Comments
 (0)