deno.land / std@0.91.0 / node / _stream / writable.ts
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444// Copyright Node.js contributors. All rights reserved. MIT License.import { Buffer } from "../buffer.ts";import Stream from "./stream.ts";import { captureRejectionSymbol } from "../events.ts";import { ERR_INVALID_ARG_TYPE, ERR_INVALID_OPT_VALUE, ERR_METHOD_NOT_IMPLEMENTED, ERR_STREAM_ALREADY_FINISHED, ERR_STREAM_CANNOT_PIPE, ERR_STREAM_DESTROYED, ERR_STREAM_NULL_VALUES, ERR_STREAM_WRITE_AFTER_END, ERR_UNKNOWN_ENCODING,} from "../_errors.ts";import type { AfterWriteTick, writeV } from "./writable_internal.ts";import { clearBuffer, destroy, errorBuffer, errorOrDestroy, finishMaybe, kOnFinished, nop, onwrite, resetBuffer, writeOrBuffer,} from "./writable_internal.ts";import type { Encodings } from "../_utils.ts";
type WritableEncodings = Encodings | "buffer";
export interface WritableOptions { autoDestroy?: boolean; decodeStrings?: boolean; defaultEncoding?: WritableEncodings; destroy?( this: Writable, error: Error | null, callback: (error: Error | null) => void, ): void; emitClose?: boolean; final?(this: Writable, callback: (error?: Error | null) => void): void; highWaterMark?: number; objectMode?: boolean; write?( this: Writable, // deno-lint-ignore no-explicit-any chunk: any, encoding: WritableEncodings, callback: (error?: Error | null) => void, ): void; writev?( this: Writable, // deno-lint-ignore no-explicit-any chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void, ): void;}
export class WritableState { [kOnFinished]: Array<(error?: Error) => void> = []; afterWriteTickInfo: null | AfterWriteTick = null; allBuffers = true; allNoop = true; autoDestroy: boolean; buffered: Array<{ allBuffers?: boolean; // deno-lint-ignore no-explicit-any chunk: any; encoding: string; callback: (error: Error) => void; }> = []; bufferedIndex = 0; bufferProcessing = false; closed = false; closeEmitted = false; constructed: boolean; corked = 0; decodeStrings: boolean; defaultEncoding: WritableEncodings; destroyed = false; emitClose: boolean; ended = false; ending = false; errored: Error | null = null; errorEmitted = false; finalCalled = false; finished = false; highWaterMark: number; length = 0; needDrain = false; objectMode: boolean; onwrite: (error?: Error | null) => void; pendingcb = 0; prefinished = false; sync = true; writecb: null | ((error: Error) => void) = null; writable = true; writelen = 0; writing = false;
constructor(options: WritableOptions | undefined, stream: Writable) { this.objectMode = !!options?.objectMode;
this.highWaterMark = options?.highWaterMark ?? (this.objectMode ? 16 : 16 * 1024);
if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { this.highWaterMark = Math.floor(this.highWaterMark); } else { throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); }
this.decodeStrings = !options?.decodeStrings === false;
this.defaultEncoding = options?.defaultEncoding || "utf8";
this.onwrite = onwrite.bind(undefined, stream);
resetBuffer(this);
this.emitClose = options?.emitClose ?? true; this.autoDestroy = options?.autoDestroy ?? true; this.constructed = true; }
getBuffer() { return this.buffered.slice(this.bufferedIndex); }
get bufferedRequestCount() { return this.buffered.length - this.bufferedIndex; }}
/** A bit simpler than readable streams.* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all* the drain event emission and buffering.*/class Writable extends Stream { _final?: ( this: Writable, callback: (error?: Error | null | undefined) => void, ) => void; _writableState: WritableState; _writev?: writeV | null = null;
constructor(options?: WritableOptions) { super(); this._writableState = new WritableState(options, this);
if (options) { if (typeof options.write === "function") { this._write = options.write; }
if (typeof options.writev === "function") { this._writev = options.writev; }
if (typeof options.destroy === "function") { this._destroy = options.destroy; }
if (typeof options.final === "function") { this._final = options.final; } } }
[captureRejectionSymbol](err?: Error) { this.destroy(err); }
static WritableState = WritableState;
get destroyed() { return this._writableState ? this._writableState.destroyed : false; }
set destroyed(value) { if (this._writableState) { this._writableState.destroyed = value; } }
get writable() { const w = this._writableState; return !w.destroyed && !w.errored && !w.ending && !w.ended; }
set writable(val) { if (this._writableState) { this._writableState.writable = !!val; } }
get writableFinished() { return this._writableState ? this._writableState.finished : false; }
get writableObjectMode() { return this._writableState ? this._writableState.objectMode : false; }
get writableBuffer() { return this._writableState && this._writableState.getBuffer(); }
get writableEnded() { return this._writableState ? this._writableState.ending : false; }
get writableHighWaterMark() { return this._writableState && this._writableState.highWaterMark; }
get writableCorked() { return this._writableState ? this._writableState.corked : 0; }
get writableLength() { return this._writableState && this._writableState.length; }
_undestroy() { const w = this._writableState; w.constructed = true; w.destroyed = false; w.closed = false; w.closeEmitted = false; w.errored = null; w.errorEmitted = false; w.ended = false; w.ending = false; w.finalCalled = false; w.prefinished = false; w.finished = false; }
_destroy(err: Error | null, cb: (error?: Error | null) => void) { cb(err); }
destroy(err?: Error | null, cb?: () => void) { const state = this._writableState; if (!state.destroyed) { queueMicrotask(() => errorBuffer(state)); } destroy.call(this, err, cb); return this; }
end(cb?: () => void): void; // deno-lint-ignore no-explicit-any end(chunk: any, cb?: () => void): void; // deno-lint-ignore no-explicit-any end(chunk: any, encoding: WritableEncodings, cb?: () => void): void;
end( // deno-lint-ignore no-explicit-any x?: any | (() => void), y?: WritableEncodings | (() => void), z?: () => void, ) { const state = this._writableState; // deno-lint-ignore no-explicit-any let chunk: any | null; let encoding: WritableEncodings | null; let cb: undefined | ((error?: Error) => void);
if (typeof x === "function") { chunk = null; encoding = null; cb = x; } else if (typeof y === "function") { chunk = x; encoding = null; cb = y; } else { chunk = x; encoding = y as WritableEncodings; cb = z; }
if (chunk !== null && chunk !== undefined) { this.write(chunk, encoding); }
if (state.corked) { state.corked = 1; this.uncork(); }
let err: Error | undefined; if (!state.errored && !state.ending) { state.ending = true; finishMaybe(this, state, true); state.ended = true; } else if (state.finished) { err = new ERR_STREAM_ALREADY_FINISHED("end"); } else if (state.destroyed) { err = new ERR_STREAM_DESTROYED("end"); }
if (typeof cb === "function") { if (err || state.finished) { queueMicrotask(() => { (cb as (error?: Error | undefined) => void)(err); }); } else { state[kOnFinished].push(cb); } }
return this; }
_write( // deno-lint-ignore no-explicit-any chunk: any, encoding: string, cb: (error?: Error | null) => void, ): void { if (this._writev) { this._writev([{ chunk, encoding }], cb); } else { throw new ERR_METHOD_NOT_IMPLEMENTED("_write()"); } }
//This signature was changed to keep inheritance coherent pipe(dest: Writable): Writable { errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); return dest; }
// deno-lint-ignore no-explicit-any write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; write( // deno-lint-ignore no-explicit-any chunk: any, encoding: WritableEncodings | null, cb?: (error: Error | null | undefined) => void, ): boolean;
write( // deno-lint-ignore no-explicit-any chunk: any, x?: WritableEncodings | null | ((error: Error | null | undefined) => void), y?: ((error: Error | null | undefined) => void), ) { const state = this._writableState; let encoding: WritableEncodings; let cb: (error?: Error | null) => void;
if (typeof x === "function") { cb = x; encoding = state.defaultEncoding; } else { if (!x) { encoding = state.defaultEncoding; } else if (x !== "buffer" && !Buffer.isEncoding(x)) { throw new ERR_UNKNOWN_ENCODING(x); } else { encoding = x; } if (typeof y !== "function") { cb = nop; } else { cb = y; } }
if (chunk === null) { throw new ERR_STREAM_NULL_VALUES(); } else if (!state.objectMode) { if (typeof chunk === "string") { if (state.decodeStrings !== false) { chunk = Buffer.from(chunk, encoding); encoding = "buffer"; } } else if (chunk instanceof Buffer) { encoding = "buffer"; } else if (Stream._isUint8Array(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); encoding = "buffer"; } else { throw new ERR_INVALID_ARG_TYPE( "chunk", ["string", "Buffer", "Uint8Array"], chunk, ); } }
let err: Error | undefined; if (state.ending) { err = new ERR_STREAM_WRITE_AFTER_END(); } else if (state.destroyed) { err = new ERR_STREAM_DESTROYED("write"); }
if (err) { queueMicrotask(() => cb(err)); errorOrDestroy(this, err, true); return false; } state.pendingcb++; return writeOrBuffer(this, state, chunk, encoding, cb); }
cork() { this._writableState.corked++; }
uncork() { const state = this._writableState;
if (state.corked) { state.corked--;
if (!state.writing) { clearBuffer(this, state); } } }
setDefaultEncoding(encoding: string) { // node::ParseEncoding() requires lower case. if (typeof encoding === "string") { encoding = encoding.toLowerCase(); } if (!Buffer.isEncoding(encoding)) { throw new ERR_UNKNOWN_ENCODING(encoding); } this._writableState.defaultEncoding = encoding as WritableEncodings; return this; }}
export default Writable;
Version Info
External Dependencies