deno.land / std@0.166.0 / node / _tools / test / parallel / test-stream2-basic.js
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453// deno-fmt-ignore-file// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.// Taken from Node 18.12.0// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
// Copyright Joyent, Inc. and other Node contributors.//// Permission is hereby granted, free of charge, to any person obtaining a// copy of this software and associated documentation files (the// "Software"), to deal in the Software without restriction, including// without limitation the rights to use, copy, modify, merge, publish,// distribute, sublicense, and/or sell copies of the Software, and to permit// persons to whom the Software is furnished to do so, subject to the// following conditions://// The above copyright notice and this permission notice shall be included// in all copies or substantial portions of the Software.//// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
const common = require('../common');const { Readable: R, Writable: W } = require('stream');const assert = require('assert');
const EE = require('events').EventEmitter;
class TestReader extends R { constructor(n) { super(); this._buffer = Buffer.alloc(n || 100, 'x'); this._pos = 0; this._bufs = 10; }
_read(n) { const max = this._buffer.length - this._pos; n = Math.max(n, 0); const toRead = Math.min(n, max); if (toRead === 0) { // Simulate the read buffer filling up with some more bytes some time // in the future. setTimeout(() => { this._pos = 0; this._bufs -= 1; if (this._bufs <= 0) { // read them all! if (!this.ended) this.push(null); } else { // now we have more. // kinda cheating by calling _read, but whatever, // it's just fake anyway. this._read(n); } }, 10); return; }
const ret = this._buffer.slice(this._pos, this._pos + toRead); this._pos += toRead; this.push(ret); }}
class TestWriter extends EE { constructor() { super(); this.received = []; this.flush = false; }
write(c) { this.received.push(c.toString()); this.emit('write', c); return true; }
end(c) { if (c) this.write(c); this.emit('end', this.received); }}
{ // Test basic functionality const r = new TestReader(20);
const reads = []; const expect = [ 'x', 'xx', 'xxx', 'xxxx', 'xxxxx', 'xxxxxxxxx', 'xxxxxxxxxx', 'xxxxxxxxxxxx', 'xxxxxxxxxxxxx', 'xxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxxxxxx', 'xxxxxxxxxxxxxxxxxxxxx' ];
r.on('end', common.mustCall(function() { assert.deepStrictEqual(reads, expect); }));
let readSize = 1; function flow() { let res; while (null !== (res = r.read(readSize++))) { reads.push(res.toString()); } r.once('readable', flow); }
flow();}
{ // Verify pipe const r = new TestReader(5);
const expect = [ 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx' ];
const w = new TestWriter();
w.on('end', common.mustCall(function(received) { assert.deepStrictEqual(received, expect); }));
r.pipe(w);}
[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { // Verify unpipe const r = new TestReader(5);
// Unpipe after 3 writes, then write to another stream instead. let expect = [ 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx' ]; expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
const w = [ new TestWriter(), new TestWriter() ];
let writes = SPLIT; w[0].on('write', function() { if (--writes === 0) { r.unpipe(); assert.deepStrictEqual(r._readableState.pipes, []); w[0].end(); r.pipe(w[1]); assert.deepStrictEqual(r._readableState.pipes, [w[1]]); } });
let ended = 0;
w[0].on('end', common.mustCall(function(results) { ended++; assert.strictEqual(ended, 1); assert.deepStrictEqual(results, expect[0]); }));
w[1].on('end', common.mustCall(function(results) { ended++; assert.strictEqual(ended, 2); assert.deepStrictEqual(results, expect[1]); }));
r.pipe(w[0]);});
{ // Verify both writers get the same data when piping to destinations const r = new TestReader(5); const w = [ new TestWriter(), new TestWriter() ];
const expect = [ 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx' ];
w[0].on('end', common.mustCall(function(received) { assert.deepStrictEqual(received, expect); })); w[1].on('end', common.mustCall(function(received) { assert.deepStrictEqual(received, expect); }));
r.pipe(w[0]); r.pipe(w[1]);}
[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { // Verify multi-unpipe const r = new TestReader(5);
// Unpipe after 3 writes, then write to another stream instead. let expect = [ 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx', 'xxxxx' ]; expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
const w = [ new TestWriter(), new TestWriter(), new TestWriter() ];
let writes = SPLIT; w[0].on('write', function() { if (--writes === 0) { r.unpipe(); w[0].end(); r.pipe(w[1]); } });
let ended = 0;
w[0].on('end', common.mustCall(function(results) { ended++; assert.strictEqual(ended, 1); assert.deepStrictEqual(results, expect[0]); }));
w[1].on('end', common.mustCall(function(results) { ended++; assert.strictEqual(ended, 2); assert.deepStrictEqual(results, expect[1]); }));
r.pipe(w[0]); r.pipe(w[2]);});
{ // Verify that back pressure is respected const r = new R({ objectMode: true }); r._read = common.mustNotCall(); let counter = 0; r.push(['one']); r.push(['two']); r.push(['three']); r.push(['four']); r.push(null);
const w1 = new R(); w1.write = function(chunk) { assert.strictEqual(chunk[0], 'one'); w1.emit('close'); process.nextTick(function() { r.pipe(w2); r.pipe(w3); }); }; w1.end = common.mustNotCall();
r.pipe(w1);
const expected = ['two', 'two', 'three', 'three', 'four', 'four'];
const w2 = new R(); w2.write = function(chunk) { assert.strictEqual(chunk[0], expected.shift()); assert.strictEqual(counter, 0);
counter++;
if (chunk[0] === 'four') { return true; }
setTimeout(function() { counter--; w2.emit('drain'); }, 10);
return false; }; w2.end = common.mustCall();
const w3 = new R(); w3.write = function(chunk) { assert.strictEqual(chunk[0], expected.shift()); assert.strictEqual(counter, 1);
counter++;
if (chunk[0] === 'four') { return true; }
setTimeout(function() { counter--; w3.emit('drain'); }, 50);
return false; }; w3.end = common.mustCall(function() { assert.strictEqual(counter, 2); assert.strictEqual(expected.length, 0); });}
{ // Verify read(0) behavior for ended streams const r = new R(); let written = false; let ended = false; r._read = common.mustNotCall();
r.push(Buffer.from('foo')); r.push(null);
const v = r.read(0);
assert.strictEqual(v, null);
const w = new R(); w.write = function(buffer) { written = true; assert.strictEqual(ended, false); assert.strictEqual(buffer.toString(), 'foo'); };
w.end = common.mustCall(function() { ended = true; assert.strictEqual(written, true); });
r.pipe(w);}
{ // Verify synchronous _read ending const r = new R(); let called = false; r._read = function(n) { r.push(null); };
r.once('end', function() { // Verify that this is called before the next tick called = true; });
r.read();
process.nextTick(function() { assert.strictEqual(called, true); });}
{ // Verify that adding readable listeners trigger data flow const r = new R({ highWaterMark: 5 }); let onReadable = false; let readCalled = 0;
r._read = function(n) { if (readCalled++ === 2) r.push(null); else r.push(Buffer.from('asdf')); };
r.on('readable', function() { onReadable = true; r.read(); });
r.on('end', common.mustCall(function() { assert.strictEqual(readCalled, 3); assert.ok(onReadable); }));}
{ // Verify that streams are chainable const r = new R(); r._read = common.mustCall(); const r2 = r.setEncoding('utf8').pause().resume().pause(); assert.strictEqual(r, r2);}
{ // Verify readableEncoding property assert(Object.hasOwn(R.prototype, 'readableEncoding'));
const r = new R({ encoding: 'utf8' }); assert.strictEqual(r.readableEncoding, 'utf8');}
{ // Verify readableObjectMode property assert(Object.hasOwn(R.prototype, 'readableObjectMode'));
const r = new R({ objectMode: true }); assert.strictEqual(r.readableObjectMode, true);}
{ // Verify writableObjectMode property assert(Object.hasOwn(W.prototype, 'writableObjectMode'));
const w = new W({ objectMode: true }); assert.strictEqual(w.writableObjectMode, true);}
Version Info