Skip to content

Commit d5d39f7

Browse files
trxcllntTheNeuralBit
authored andcommitted
ARROW-2779: [JS] stream reader fixes
This is a stop-gap fix for node-stream compatibility. Some APIs (like node's `http` module) still only accept Buffer instances, so this PR emits those if we're in node. Also includes a small fix in `fromReadableStream` to ensure it doesn't try to read the length if there's not enough bytes. Author: ptaylor <[email protected]> Closes apache#2201 from trxcllnt/js-stream-reader-fixes and squashes the following commits: d229a55 <ptaylor> Merge branch 'master' into js-stream-reader-fixes 2c2e694 <ptaylor> emit native Buffers in node, else Uint8Arrays ab02407 <ptaylor> silence node's experimental async-iterator warnings f483567 <ptaylor> Merge branch 'js-stream-reader-fixes' of github.com:trxcllnt/arrow into js-stream-reader-fixes 3425104 <ptaylor> Update dependencies 481d15a <ptaylor> guard against reading data from 0-length buffer d411871 <ptaylor> guard against reading data from 0-length buffer 9b199c3 <ptaylor> finish implementing unions in the JS reader 953232c <ptaylor> write schema metadata
1 parent 0175167 commit d5d39f7

File tree

11 files changed

+89
-78
lines changed

11 files changed

+89
-78
lines changed

integration/integration_test.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1123,21 +1123,21 @@ def validate(self, json_path, arrow_path):
11231123
return self._run(self.VALIDATE, arrow_path, json_path, 'VALIDATE')
11241124

11251125
def json_to_file(self, json_path, arrow_path):
1126-
cmd = ['node', self.JSON_TO_ARROW, '-a', arrow_path, '-j', json_path]
1126+
cmd = ['node', '--no-warnings', self.JSON_TO_ARROW, '-a', arrow_path, '-j', json_path]
11271127
cmd = ' '.join(cmd)
11281128
if self.debug:
11291129
print(cmd)
11301130
os.system(cmd)
11311131

11321132
def stream_to_file(self, stream_path, file_path):
1133-
cmd = ['cat', stream_path, '|', 'node', self.STREAM_TO_FILE, '>', file_path]
1133+
cmd = ['cat', stream_path, '|', 'node', '--no-warnings', self.STREAM_TO_FILE, '>', file_path]
11341134
cmd = ' '.join(cmd)
11351135
if self.debug:
11361136
print(cmd)
11371137
os.system(cmd)
11381138

11391139
def file_to_stream(self, file_path, stream_path):
1140-
cmd = ['cat', file_path, '|', 'node', self.FILE_TO_STREAM, '>', stream_path]
1140+
cmd = ['cat', file_path, '|', 'node', '--no-warnings', self.FILE_TO_STREAM, '>', stream_path]
11411141
cmd = ' '.join(cmd)
11421142
if self.debug:
11431143
print(cmd)

js/lerna.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"lerna": "2.0.0",
2+
"lerna": "2.11.0",
33
"version": "0.1.1",
44
"packages": [
55
"targets/ts",

js/package.json

+24-24
Original file line numberDiff line numberDiff line change
@@ -53,53 +53,53 @@
5353
"npm-release.sh"
5454
],
5555
"dependencies": {
56-
"@types/flatbuffers": "1.6.5",
57-
"@types/node": "10.0.8",
56+
"@types/flatbuffers": "1.9.0",
57+
"@types/node": "10.5.1",
5858
"@types/text-encoding-utf-8": "1.0.1",
59-
"command-line-args": "5.0.1",
60-
"command-line-usage": "4.1.0",
61-
"flatbuffers": "trxcllnt/flatbuffers-esm",
59+
"command-line-args": "5.0.2",
60+
"command-line-usage": "5.0.5",
61+
"flatbuffers": "1.9.0",
6262
"json-bignum": "0.0.3",
6363
"text-encoding-utf-8": "1.0.2",
64-
"tslib": "1.9.0"
64+
"tslib": "1.9.3"
6565
},
6666
"devDependencies": {
6767
"@std/esm": "0.26.0",
6868
"@types/glob": "5.0.35",
6969
"@types/jest": "22.2.3",
7070
"babel-jest": "22.4.3",
7171
"benchmark": "2.1.4",
72-
"coveralls": "3.0.0",
72+
"coveralls": "3.0.2",
7373
"del": "3.0.0",
7474
"glob": "7.1.2",
75-
"google-closure-compiler": "20180506.0.0",
75+
"google-closure-compiler": "20180610.0.2",
7676
"gulp": "github:gulpjs/gulp#6d71a658c61edb3090221579d8f97dbe086ba2ed",
7777
"gulp-json-transform": "0.4.5",
78-
"gulp-rename": "1.2.2",
79-
"gulp-sourcemaps": "2.6.3",
80-
"gulp-typescript": "3.2.4",
81-
"ix": "2.3.4",
78+
"gulp-rename": "1.3.0",
79+
"gulp-sourcemaps": "2.6.4",
80+
"gulp-typescript": "4.0.2",
81+
"ix": "2.3.5",
8282
"jest": "22.4.3",
8383
"jest-environment-node-debug": "2.0.0",
8484
"json": "9.0.6",
85-
"lerna": "2.7.1",
86-
"lint-staged": "6.0.1",
87-
"merge2": "1.2.1",
85+
"lerna": "2.11.0",
86+
"lint-staged": "7.2.0",
87+
"merge2": "1.2.2",
8888
"mkdirp": "0.5.1",
89-
"npm-run-all": "4.1.2",
90-
"pump": "1.0.2",
89+
"npm-run-all": "4.1.3",
90+
"pump": "3.0.0",
9191
"rimraf": "2.6.2",
9292
"rxjs": "5.5.6",
93-
"shx": "0.2.2",
93+
"shx": "0.3.1",
9494
"source-map-loader": "0.2.3",
95-
"trash": "4.2.1",
95+
"trash": "4.3.0",
9696
"ts-jest": "22.4.6",
97-
"ts-node": "6.0.3",
98-
"tslint": "5.9.1",
99-
"typedoc": "0.10.0",
100-
"typescript": "2.7.1",
97+
"ts-node": "7.0.0",
98+
"tslint": "5.10.0",
99+
"typedoc": "0.11.1",
100+
"typescript": "2.9.2",
101101
"uglifyjs-webpack-plugin": "1.1.6",
102-
"webpack": "3.10.0",
102+
"webpack": "4.14.0",
103103
"xml2js": "0.4.19"
104104
},
105105
"@std/esm": {

js/src/data.ts

+15-15
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ export class BaseData<T extends DataType = DataType> implements VectorLike {
8686
}
8787
return nullCount;
8888
}
89-
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
90-
return new BaseData(type, length, offset, nullCount);
89+
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
90+
return new BaseData(type, length, offset, nullCount) as any;
9191
}
9292
public slice(offset: number, length: number) {
9393
return length <= 0 ? this : this.sliceInternal(this.clone(
@@ -180,8 +180,8 @@ export class NestedData<T extends NestedType = NestedType> extends BaseData<T> {
180180
this.childData = childData;
181181
this[VectorType.VALIDITY] = toTypedArray(Uint8Array, nullBitmap);
182182
}
183-
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
184-
return new NestedData<R>(type, length, this[VectorType.VALIDITY], this.childData, offset, nullCount);
183+
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
184+
return new NestedData<R>(type, length, this[VectorType.VALIDITY], this.childData, offset, nullCount) as any;
185185
}
186186
protected sliceInternal(clone: this, offset: number, length: number) {
187187
if (!this[VectorType.OFFSET]) {
@@ -208,8 +208,8 @@ export class ListData<T extends ListType> extends SingleNestedData<T> {
208208
super(type, length, nullBitmap, valueChildData, offset, nullCount);
209209
this[VectorType.OFFSET] = toTypedArray(Int32Array, valueOffsets);
210210
}
211-
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
212-
return new ListData<R>(type, length, this[VectorType.VALIDITY], this[VectorType.OFFSET], this._valuesData as any, offset, nullCount);
211+
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
212+
return new ListData(type, length, this[VectorType.VALIDITY], this[VectorType.OFFSET], this._valuesData as any, offset, nullCount) as any;
213213
}
214214
}
215215

@@ -224,24 +224,24 @@ export class UnionData<T extends (DenseUnion | SparseUnion) = any> extends Neste
224224
return (typeIdToChildIndex[typeId] = idx) && typeIdToChildIndex || typeIdToChildIndex;
225225
}, Object.create(null) as { [key: number]: number });
226226
}
227-
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
228-
return new UnionData<R>(type, length, this[VectorType.VALIDITY], this[VectorType.TYPE], this.childData, offset, nullCount);
227+
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
228+
return new UnionData<R>(type, length, this[VectorType.VALIDITY], this[VectorType.TYPE], this.childData, offset, nullCount) as any;
229229
}
230230
}
231231

232232
export class SparseUnionData extends UnionData<SparseUnion> {
233233
constructor(type: SparseUnion, length: number, nullBitmap: Uint8Array | null | undefined, typeIds: Iterable<number>, childData: Data<any>[], offset?: number, nullCount?: number) {
234234
super(type, length, nullBitmap, typeIds, childData, offset, nullCount);
235235
}
236-
public clone<R extends SparseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
236+
public clone<R extends SparseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
237237
return new SparseUnionData(
238238
type,
239239
length,
240240
this[VectorType.VALIDITY],
241241
this[VectorType.TYPE],
242242
this.childData,
243243
offset, nullCount
244-
) as any as UnionData<R>;
244+
) as any;
245245
}
246246
}
247247

@@ -252,7 +252,7 @@ export class DenseUnionData extends UnionData<DenseUnion> {
252252
super(type, length, nullBitmap, typeIds, childData, offset, nullCount);
253253
this[VectorType.OFFSET] = toTypedArray(Int32Array, valueOffsets);
254254
}
255-
public clone<R extends DenseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
255+
public clone<R extends DenseUnion>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
256256
return new DenseUnionData(
257257
type,
258258
length,
@@ -261,7 +261,7 @@ export class DenseUnionData extends UnionData<DenseUnion> {
261261
this[VectorType.OFFSET],
262262
this.childData,
263263
offset, nullCount
264-
) as any as UnionData<R>;
264+
) as any;
265265
}
266266
}
267267

@@ -288,12 +288,12 @@ export class ChunkedData<T extends DataType> extends BaseData<T> {
288288
}
289289
return nullCount;
290290
}
291-
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount) {
292-
return new ChunkedData<R>(
291+
public clone<R extends T>(type: R, length = this.length, offset = this.offset, nullCount = this._nullCount): Data<R> {
292+
return new ChunkedData(
293293
type, length,
294294
this._chunkVectors.map((vec) => vec.clone(vec.data.clone(type))) as any,
295295
offset, nullCount, this._chunkOffsets
296-
);
296+
) as any;
297297
}
298298
protected sliceInternal(clone: this, offset: number, length: number) {
299299
const chunks = this._chunkVectors;

js/src/ipc/reader/node.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export async function* fromReadableStream(stream: NodeJS.ReadableStream) {
5050
return yield bytes;
5151
}
5252

53-
if (messageLength <= 0) {
53+
if (bytes.byteLength > 0 && messageLength <= 0) {
5454
messageLength = new DataView(bytes.buffer).getInt32(0, true);
5555
}
5656

@@ -66,7 +66,7 @@ export async function* fromReadableStream(stream: NodeJS.ReadableStream) {
6666
bytesRead += messageLength + PADDING;
6767
yield bytes.subarray(0, messageLength + PADDING);
6868
bytes = bytes.subarray(messageLength + PADDING);
69-
messageLength = bytes.byteLength <= 0 ? 0 :
69+
messageLength = bytes.byteLength < 4 ? 0 :
7070
new DataView(bytes.buffer).getInt32(bytes.byteOffset, true);
7171
message = null;
7272
}

js/src/util/node.ts

+14-5
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export class PipeIterator<T> implements IterableIterator<T> {
2727
if ((res = this.next()).done) break;
2828
} while (emit(stream, encoding, res.value));
2929
}
30-
return wait(stream, encoding, res && res.done, write);
30+
return wait(stream, res && res.done, write);
3131
};
3232
write();
3333
return stream;
@@ -62,23 +62,32 @@ export class AsyncPipeIterator<T> implements AsyncIterableIterator<T> {
6262
if ((res = await this.next()).done) break;
6363
} while (emit(stream, encoding, res.value));
6464
}
65-
return wait(stream, encoding, res && res.done, write);
65+
return wait(stream, res && res.done, write);
6666
};
6767
write();
6868
return stream;
6969
}
7070
}
7171

72+
const toBufferOrUint8Array = (() => {
73+
// If in node, convert Uint8Arrays to Buffer instances. This is necessary
74+
// because some node APIs ('http' etc.) don't work unless you give them Buffers.
75+
// This eval also defeats closure-compiler, which doesn't recognize the Buffer constructor.
76+
const BufferCtor = eval('typeof Buffer !== "undefined" ? Buffer : null');
77+
return !BufferCtor ? (arr: Uint8Array) => arr :
78+
(arr: Uint8Array) => BufferCtor.from(arr.buffer, arr.byteOffset, arr.byteLength);
79+
})();
80+
7281
function emit(stream: NodeJS.WritableStream, encoding: string, value: any) {
73-
return stream['write']((encoding === 'utf8' ? value + '\n' : value) as any, encoding);
82+
return stream['write']((encoding === 'utf8' ? value + '\n' : toBufferOrUint8Array(value)) as any, encoding);
7483
}
7584

76-
function wait(stream: NodeJS.WritableStream, encoding: string, done: boolean, write: (x?: any) => void) {
85+
function wait(stream: NodeJS.WritableStream, done: boolean, write: (x?: any) => void) {
7786
const p = eval('process'); // defeat closure compiler
7887
if (!done) {
7988
stream['once']('error', write);
8089
stream['once']('drain', write);
8190
} else if (!(!p || stream === p.stdout) && !(stream as any)['isTTY']) {
82-
stream['end'](<any> (encoding === 'utf8' ? '\n' : new Uint8Array(0)));
91+
stream['end'](<any> null);
8392
}
8493
}

js/src/vector.ts

+23-23
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export class Vector<T extends DataType = any> implements VectorLike, View<T>, Vi
4949
this.length = data.length;
5050
let nulls: Uint8Array;
5151
if ((<any> data instanceof ChunkedData) && !(view instanceof ChunkedView)) {
52-
this.view = new ChunkedView(data);
52+
this.view = new ChunkedView(data as any) as any;
5353
} else if (!(view instanceof ValidityView) && (nulls = data.nullBitmap!) && nulls.length > 0 && data.nullCount > 0) {
5454
this.view = new ValidityView(data, view);
5555
} else {
@@ -159,12 +159,12 @@ export abstract class NestedVector<T extends NestedType> extends Vector<T> {
159159
return data as Data<any>[];
160160
} else if (!(<any> (data = this.data) instanceof ChunkedData)) {
161161
// If data isn't chunked, cache and return NestedData's childData
162-
return this._childData = (data as NestedData<T>).childData;
162+
return this._childData = data.childData;
163163
}
164164
// Otherwise if the data is chunked, concatenate the childVectors from each chunk
165165
// to construct a single chunked Vector for each column. Then return the ChunkedData
166166
// instance from each unified chunked column as the childData of a chunked NestedVector
167-
const chunks = ((data as ChunkedData<T>).chunkVectors as NestedVector<T>[]);
167+
const chunks = ((data as any as ChunkedData<T>).chunkVectors as NestedVector<T>[]);
168168
return this._childData = chunks
169169
.reduce<(Vector<T> | null)[][]>((cols, chunk) => chunk.childData
170170
.reduce<(Vector<T> | null)[][]>((cols, _, i) => (
@@ -197,7 +197,7 @@ export class NullVector extends Vector<Null> {
197197

198198
export class BoolVector extends Vector<Bool> {
199199
public static from(data: IterableArrayLike<boolean>) {
200-
return new BoolVector(new BoolData(new Bool(), data.length, null, packBools(data)));
200+
return new BoolVector(new BoolData(new Bool(), data.length, null, packBools(data)) as Data<Bool>);
201201
}
202202
public get values() { return this.data.values; }
203203
constructor(data: Data<Bool>, view: View<Bool> = new BoolView(data)) {
@@ -360,7 +360,7 @@ export class Utf8Vector extends ListVectorBase<Utf8> {
360360
export class ListVector<T extends DataType = DataType> extends ListVectorBase<List<T>> {
361361
// @ts-ignore
362362
public readonly view: ListView<T>;
363-
constructor(data: Data<T>, view: View<List<T>> = new ListView(data)) {
363+
constructor(data: Data<List<T>>, view: ListView<T> = new ListView<T>(data as any)) {
364364
super(data, view);
365365
}
366366
public getChildAt(index: number): Vector<T> | null {
@@ -439,22 +439,22 @@ export const createVector = ((VectorLoader: new <T extends DataType>(data: Data<
439439
<T extends DataType>(data: Data<T>) => TypeVisitor.visitTypeInline(new VectorLoader(data), data.type) as Vector<T>
440440
))(class VectorLoader<T extends DataType> extends TypeVisitor {
441441
constructor(private data: Data<T>) { super(); }
442-
visitNull (_type: Null) { return new NullVector(this.data); }
443-
visitInt (_type: Int) { return new IntVector(this.data); }
444-
visitFloat (_type: Float) { return new FloatVector(this.data); }
445-
visitBinary (_type: Binary) { return new BinaryVector(this.data); }
446-
visitUtf8 (_type: Utf8) { return new Utf8Vector(this.data); }
447-
visitBool (_type: Bool) { return new BoolVector(this.data); }
448-
visitDecimal (_type: Decimal) { return new DecimalVector(this.data); }
449-
visitDate (_type: Date_) { return new DateVector(this.data); }
450-
visitTime (_type: Time) { return new TimeVector(this.data); }
451-
visitTimestamp (_type: Timestamp) { return new TimestampVector(this.data); }
452-
visitInterval (_type: Interval) { return new IntervalVector(this.data); }
453-
visitList (_type: List) { return new ListVector(this.data); }
454-
visitStruct (_type: Struct) { return new StructVector(this.data); }
455-
visitUnion (_type: Union) { return new UnionVector(this.data); }
456-
visitFixedSizeBinary(_type: FixedSizeBinary) { return new FixedSizeBinaryVector(this.data); }
457-
visitFixedSizeList (_type: FixedSizeList) { return new FixedSizeListVector(this.data); }
458-
visitMap (_type: Map_) { return new MapVector(this.data); }
459-
visitDictionary (_type: Dictionary) { return new DictionaryVector(this.data); }
442+
visitNull (_type: Null) { return new NullVector(<any> this.data); }
443+
visitInt (_type: Int) { return new IntVector(<any> this.data); }
444+
visitFloat (_type: Float) { return new FloatVector(<any> this.data); }
445+
visitBinary (_type: Binary) { return new BinaryVector(<any> this.data); }
446+
visitUtf8 (_type: Utf8) { return new Utf8Vector(<any> this.data); }
447+
visitBool (_type: Bool) { return new BoolVector(<any> this.data); }
448+
visitDecimal (_type: Decimal) { return new DecimalVector(<any> this.data); }
449+
visitDate (_type: Date_) { return new DateVector(<any> this.data); }
450+
visitTime (_type: Time) { return new TimeVector(<any> this.data); }
451+
visitTimestamp (_type: Timestamp) { return new TimestampVector(<any> this.data); }
452+
visitInterval (_type: Interval) { return new IntervalVector(<any> this.data); }
453+
visitList (_type: List) { return new ListVector(<any> this.data); }
454+
visitStruct (_type: Struct) { return new StructVector(<any> this.data); }
455+
visitUnion (_type: Union) { return new UnionVector(<any> this.data); }
456+
visitFixedSizeBinary(_type: FixedSizeBinary) { return new FixedSizeBinaryVector(<any> this.data); }
457+
visitFixedSizeList (_type: FixedSizeList) { return new FixedSizeListVector(<any> this.data); }
458+
visitMap (_type: Map_) { return new MapVector(<any> this.data); }
459+
visitDictionary (_type: Dictionary) { return new DictionaryVector(<any> this.data); }
460460
});

js/src/vector/chunked.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
import { ChunkedData } from '../data';
18+
import { ChunkedData, Data } from '../data';
1919
import { View, Vector, NestedVector } from '../vector';
2020
import { DataType, TypedArray, IterableArrayLike } from '../type';
2121

@@ -28,7 +28,7 @@ export class ChunkedView<T extends DataType> implements View<T> {
2828
this.chunkVectors = data.chunkVectors;
2929
this.chunkOffsets = data.chunkOffsets;
3030
}
31-
public clone(data: ChunkedData<T>): this {
31+
public clone(data: ChunkedData<T> & Data<T>): this {
3232
return new ChunkedView(data) as this;
3333
}
3434
public *[Symbol.iterator](): IterableIterator<T['TValue'] | null> {

js/src/vector/dictionary.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class DictionaryView<T extends DataType> implements View<T> {
2626
this.indices = indices;
2727
this.dictionary = dictionary;
2828
}
29-
public clone(data: Data<Dictionary<T>>): this {
29+
public clone(data: Data<Dictionary<T>> & Data<T>): this {
3030
return new DictionaryView(data.dictionary, this.indices.clone(data.indices)) as this;
3131
}
3232
public isValid(index: number): boolean {

js/src/vector/list.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ export abstract class VariableListViewBase<T extends (ListType | FlatListType)>
8383
export class ListView<T extends DataType> extends VariableListViewBase<List<T>> {
8484
public values: Vector<T>;
8585
constructor(data: Data<T>) {
86-
super(data);
87-
this.values = createVector(data.values);
86+
super(data as any);
87+
this.values = createVector((data as any).values);
8888
}
8989
public getChildAt<R extends T = T>(index: number): Vector<R> | null {
9090
return index === 0 ? (this.values as Vector<R>) : null;

0 commit comments

Comments
 (0)