deno.land / std@0.91.0 / node / _stream / from.ts

نووسراو ببینە
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright Node.js contributors. All rights reserved. MIT License.import { Buffer } from "../buffer.ts";import Readable from "./readable.ts";import type { ReadableOptions } from "./readable.ts";import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts";
export default function from( // deno-lint-ignore no-explicit-any iterable: Iterable<any> | AsyncIterable<any>, opts?: ReadableOptions,) { let iterator: // deno-lint-ignore no-explicit-any | Iterator<any, any, undefined> // deno-lint-ignore no-explicit-any | AsyncIterator<any, any, undefined>; if (typeof iterable === "string" || iterable instanceof Buffer) { return new Readable({ objectMode: true, ...opts, read() { this.push(iterable); this.push(null); }, }); }
if (Symbol.asyncIterator in iterable) { // deno-lint-ignore no-explicit-any iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator](); } else if (Symbol.iterator in iterable) { // deno-lint-ignore no-explicit-any iterator = (iterable as Iterable<any>)[Symbol.iterator](); } else { throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable); }
const readable = new Readable({ objectMode: true, highWaterMark: 1, ...opts, });
// Reading boolean to protect against _read // being called before last iteration completion. let reading = false;
// needToClose boolean if iterator needs to be explicitly closed let needToClose = false;
readable._read = function () { if (!reading) { reading = true; next(); } };
readable._destroy = function (error, cb) { if (needToClose) { needToClose = false; close().then( () => queueMicrotask(() => cb(error)), (e) => queueMicrotask(() => cb(error || e)), ); } else { cb(error); } };
async function close() { if (typeof iterator.return === "function") { const { value } = await iterator.return(); await value; } }
async function next() { try { needToClose = false; const { value, done } = await iterator.next(); needToClose = !done; if (done) { readable.push(null); } else if (readable.destroyed) { await close(); } else { const res = await value; if (res === null) { reading = false; throw new ERR_STREAM_NULL_VALUES(); } else if (readable.push(res)) { next(); } else { reading = false; } } } catch (err) { readable.destroy(err); } } return readable;}
std

Version Info

Tagged at
3 years ago

External Dependencies

No external dependencies 🎉