deno.land / x / deno@v1.28.2 / runtime / ops / io.rs
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use deno_core::error::resource_unavailable;use deno_core::error::AnyError;use deno_core::op;use deno_core::parking_lot::Mutex;use deno_core::AsyncMutFuture;use deno_core::AsyncRefCell;use deno_core::AsyncResult;use deno_core::BufMutView;use deno_core::BufView;use deno_core::CancelHandle;use deno_core::CancelTryFuture;use deno_core::Extension;use deno_core::OpState;use deno_core::RcRef;use deno_core::Resource;use deno_core::ResourceId;use deno_core::ZeroCopyBuf;use once_cell::sync::Lazy;use std::borrow::Cow;use std::cell::RefCell;use std::fs::File as StdFile;use std::io::ErrorKind;use std::io::Read;use std::io::Write;use std::rc::Rc;use std::sync::Arc;use tokio::io::AsyncRead;use tokio::io::AsyncReadExt;use tokio::io::AsyncWrite;use tokio::io::AsyncWriteExt;use tokio::process;
#[cfg(unix)]use std::os::unix::io::FromRawFd;
#[cfg(windows)]use { std::os::windows::io::FromRawHandle, winapi::um::{processenv::GetStdHandle, winbase},};
// Store the stdio fd/handles in global statics in order to keep them// alive for the duration of the application since the last handle/fd// being dropped will close the corresponding pipe.#[cfg(unix)]static STDIN_HANDLE: Lazy<StdFile> = Lazy::new(|| { // SAFETY: corresponds to OS stdin unsafe { StdFile::from_raw_fd(0) }});#[cfg(unix)]static STDOUT_HANDLE: Lazy<StdFile> = Lazy::new(|| { // SAFETY: corresponds to OS stdout unsafe { StdFile::from_raw_fd(1) }});#[cfg(unix)]static STDERR_HANDLE: Lazy<StdFile> = Lazy::new(|| { // SAFETY: corresponds to OS stderr unsafe { StdFile::from_raw_fd(2) }});
#[cfg(windows)]static STDIN_HANDLE: Lazy<StdFile> = Lazy::new(|| { // SAFETY: corresponds to OS stdin unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) }});#[cfg(windows)]static STDOUT_HANDLE: Lazy<StdFile> = Lazy::new(|| { // SAFETY: corresponds to OS stdout unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) }});#[cfg(windows)]static STDERR_HANDLE: Lazy<StdFile> = Lazy::new(|| { // SAFETY: corresponds to OS stderr unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) }});
pub fn init() -> Extension { Extension::builder() .ops(vec![op_read_sync::decl(), op_write_sync::decl()]) .build()}
pub enum StdioPipe { Inherit, File(StdFile),}
impl Default for StdioPipe { fn default() -> Self { Self::Inherit }}
impl Clone for StdioPipe { fn clone(&self) -> Self { match self { StdioPipe::Inherit => StdioPipe::Inherit, StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()), } }}
/// Specify how stdin, stdout, and stderr are piped./// By default, inherits from the process.#[derive(Clone, Default)]pub struct Stdio { pub stdin: StdioPipe, pub stdout: StdioPipe, pub stderr: StdioPipe,}
pub fn init_stdio(stdio: Stdio) -> Extension { // todo(dsheret): don't do this? Taking out the writers was necessary to prevent invalid handle panics let stdio = Rc::new(RefCell::new(Some(stdio)));
Extension::builder() .middleware(|op| match op.name { "op_print" => op_print::decl(), _ => op, }) .state(move |state| { let stdio = stdio .borrow_mut() .take() .expect("Extension only supports being used once."); let t = &mut state.resource_table;
let rid = t.add(StdFileResource::stdio( match stdio.stdin { StdioPipe::Inherit => StdFileResourceInner { kind: StdFileResourceKind::Stdin, file: STDIN_HANDLE.try_clone().unwrap(), }, StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stdin", )); assert_eq!(rid, 0, "stdin must have ResourceId 0");
let rid = t.add(StdFileResource::stdio( match stdio.stdout { StdioPipe::Inherit => StdFileResourceInner { kind: StdFileResourceKind::Stdout, file: STDOUT_HANDLE.try_clone().unwrap(), }, StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stdout", )); assert_eq!(rid, 1, "stdout must have ResourceId 1");
let rid = t.add(StdFileResource::stdio( match stdio.stderr { StdioPipe::Inherit => StdFileResourceInner { kind: StdFileResourceKind::Stderr, file: STDERR_HANDLE.try_clone().unwrap(), }, StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stderr", )); assert_eq!(rid, 2, "stderr must have ResourceId 2"); Ok(()) }) .build()}
#[cfg(unix)]use nix::sys::termios;
use super::utils::TaskQueue;
#[derive(Default)]pub struct TtyMetadata { #[cfg(unix)] pub mode: Option<termios::Termios>,}
#[derive(Default)]pub struct FileMetadata { pub tty: TtyMetadata,}
#[derive(Debug)]pub struct WriteOnlyResource<S> { stream: AsyncRefCell<S>,}
impl<S: 'static> From<S> for WriteOnlyResource<S> { fn from(stream: S) -> Self { Self { stream: stream.into(), } }}
impl<S> WriteOnlyResource<S>where S: AsyncWrite + Unpin + 'static,{ pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> { RcRef::map(self, |r| &r.stream).borrow_mut() }
async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { let mut stream = self.borrow_mut().await; let nwritten = stream.write(data).await?; Ok(nwritten) }
async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> { let mut stream = self.borrow_mut().await; stream.shutdown().await?; Ok(()) }
pub fn into_inner(self) -> S { self.stream.into_inner() }}
#[derive(Debug)]pub struct ReadOnlyResource<S> { stream: AsyncRefCell<S>, cancel_handle: CancelHandle,}
impl<S: 'static> From<S> for ReadOnlyResource<S> { fn from(stream: S) -> Self { Self { stream: stream.into(), cancel_handle: Default::default(), } }}
impl<S> ReadOnlyResource<S>where S: AsyncRead + Unpin + 'static,{ pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> { RcRef::map(self, |r| &r.stream).borrow_mut() }
pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { RcRef::map(self, |r| &r.cancel_handle) }
pub fn cancel_read_ops(&self) { self.cancel_handle.cancel() }
async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> { let mut rd = self.borrow_mut().await; let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; Ok(nread) }
pub fn into_inner(self) -> S { self.stream.into_inner() }}
pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>;
impl Resource for ChildStdinResource { fn name(&self) -> Cow<str> { "childStdin".into() }
deno_core::impl_writable!();
fn shutdown(self: Rc<Self>) -> AsyncResult<()> { Box::pin(self.shutdown()) }}
pub type ChildStdoutResource = ReadOnlyResource<process::ChildStdout>;
impl Resource for ChildStdoutResource { deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> { "childStdout".into() }
fn close(self: Rc<Self>) { self.cancel_read_ops(); }}
pub type ChildStderrResource = ReadOnlyResource<process::ChildStderr>;
impl Resource for ChildStderrResource { deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> { "childStderr".into() }
fn close(self: Rc<Self>) { self.cancel_read_ops(); }}
#[derive(Clone, Copy)]enum StdFileResourceKind { File, // For stdout and stderr, we sometimes instead use std::io::stdout() directly, // because we get some Windows specific functionality for free by using Rust // std's wrappers. So we take a bit of a complexity hit in order to not // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs Stdin, Stdout, Stderr,}
struct StdFileResourceInner { kind: StdFileResourceKind, file: StdFile,}
impl StdFileResourceInner { pub fn file(fs_file: StdFile) -> Self { StdFileResourceInner { kind: StdFileResourceKind::File, file: fs_file, } }
pub fn with_file<R>(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R { f(&mut self.file) }
pub fn try_clone(&self) -> Result<Self, std::io::Error> { Ok(Self { kind: self.kind, file: self.file.try_clone()?, }) }
pub fn write_and_maybe_flush( &mut self, buf: &[u8], ) -> Result<usize, AnyError> { // Rust will line buffer and we don't want that behavior // (see https://github.com/denoland/deno/issues/948), so flush stdout and stderr. // Although an alternative solution could be to bypass Rust's std by // using the raw fds/handles, it will cause encoding issues on Windows // that we get solved for free by using Rust's stdio wrappers (see // std/src/sys/windows/stdio.rs in Rust's source code). match self.kind { StdFileResourceKind::File => Ok(self.file.write(buf)?), StdFileResourceKind::Stdin => { Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into()) } StdFileResourceKind::Stdout => { // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); let nwritten = stdout.write(buf)?; stdout.flush()?; Ok(nwritten) } StdFileResourceKind::Stderr => { // bypass the file and use std::io::stderr() let mut stderr = std::io::stderr().lock(); let nwritten = stderr.write(buf)?; stderr.flush()?; Ok(nwritten) } } }
pub fn write_all_and_maybe_flush( &mut self, buf: &[u8], ) -> Result<(), AnyError> { // this method exists instead of using a `Write` implementation // so that we can acquire the locks once and do both actions match self.kind { StdFileResourceKind::File => Ok(self.file.write_all(buf)?), StdFileResourceKind::Stdin => { Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into()) } StdFileResourceKind::Stdout => { // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); stdout.write_all(buf)?; stdout.flush()?; Ok(()) } StdFileResourceKind::Stderr => { // bypass the file and use std::io::stderr() let mut stderr = std::io::stderr().lock(); stderr.write_all(buf)?; stderr.flush()?; Ok(()) } } }}
impl Read for StdFileResourceInner { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { match self.kind { StdFileResourceKind::File | StdFileResourceKind::Stdin => { self.file.read(buf) } StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => { Err(ErrorKind::Unsupported.into()) } } }}
struct StdFileResourceCellValue { inner: StdFileResourceInner, meta_data: Arc<Mutex<FileMetadata>>,}
impl StdFileResourceCellValue { pub fn try_clone(&self) -> Result<Self, std::io::Error> { Ok(Self { inner: self.inner.try_clone()?, meta_data: self.meta_data.clone(), }) }}
pub struct StdFileResource { name: String, // We can't use an AsyncRefCell here because we need to allow // access to the resource synchronously at any time and // asynchronously one at a time in order cell: RefCell<Option<StdFileResourceCellValue>>, // Used to keep async actions in order and only allow one // to occur at a time cell_async_task_queue: TaskQueue,}
impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { cell: RefCell::new(Some(StdFileResourceCellValue { inner, meta_data: Default::default(), })), cell_async_task_queue: Default::default(), name: name.to_string(), } }
pub fn fs_file(fs_file: StdFile) -> Self { Self { cell: RefCell::new(Some(StdFileResourceCellValue { inner: StdFileResourceInner::file(fs_file), meta_data: Default::default(), })), cell_async_task_queue: Default::default(), name: "fsFile".to_string(), } }
fn with_inner_and_metadata<TResult>( self: Rc<Self>, action: impl FnOnce( &mut StdFileResourceInner, &Arc<Mutex<FileMetadata>>, ) -> Result<TResult, AnyError>, ) -> Result<TResult, AnyError> { match self.cell.try_borrow_mut() { Ok(mut cell) => { let mut file = cell.take().unwrap(); let result = action(&mut file.inner, &file.meta_data); cell.replace(file); result } Err(_) => Err(resource_unavailable()), } }
async fn with_inner_blocking_task<F, R: Send + 'static>( self: Rc<Self>, action: F, ) -> R where F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, { // we want to restrict this to one async action at a time let _permit = self.cell_async_task_queue.acquire().await; // we take the value out of the cell, use it on a blocking task, // then put it back into the cell when we're done let mut did_take = false; let mut cell_value = { let mut cell = self.cell.borrow_mut(); match cell.as_mut().unwrap().try_clone() { Ok(value) => value, Err(_) => { did_take = true; cell.take().unwrap() } } }; let (cell_value, result) = tokio::task::spawn_blocking(move || { let result = action(&mut cell_value.inner); (cell_value, result) }) .await .unwrap();
if did_take { // put it back self.cell.borrow_mut().replace(cell_value); }
result }
async fn read_byob( self: Rc<Self>, mut buf: BufMutView, ) -> Result<(usize, BufMutView), AnyError> { self .with_inner_blocking_task(move |inner| { let nread = inner.read(&mut buf)?; Ok((nread, buf)) }) .await }
async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { let buf = data.to_owned(); self .with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf)) .await }
async fn write_all(self: Rc<Self>, data: &[u8]) -> Result<(), AnyError> { let buf = data.to_owned(); self .with_inner_blocking_task(move |inner| { inner.write_all_and_maybe_flush(&buf) }) .await }
fn with_resource<F, R>( state: &mut OpState, rid: ResourceId, f: F, ) -> Result<R, AnyError> where F: FnOnce(Rc<StdFileResource>) -> Result<R, AnyError>, { let resource = state.resource_table.get::<StdFileResource>(rid)?; f(resource) }
pub fn with_file<F, R>( state: &mut OpState, rid: ResourceId, f: F, ) -> Result<R, AnyError> where F: FnOnce(&mut StdFile) -> Result<R, AnyError>, { Self::with_resource(state, rid, move |resource| { resource.with_inner_and_metadata(move |inner, _| inner.with_file(f)) }) }
pub fn with_file_and_metadata<F, R>( state: &mut OpState, rid: ResourceId, f: F, ) -> Result<R, AnyError> where F: FnOnce(&mut StdFile, &Arc<Mutex<FileMetadata>>) -> Result<R, AnyError>, { Self::with_resource(state, rid, move |resource| { resource.with_inner_and_metadata(move |inner, metadata| { inner.with_file(move |file| f(file, metadata)) }) }) }
pub async fn with_file_blocking_task<F, R: Send + 'static>( state: Rc<RefCell<OpState>>, rid: ResourceId, f: F, ) -> Result<R, AnyError> where F: (FnOnce(&mut StdFile) -> Result<R, AnyError>) + Send + 'static, { let resource = state .borrow_mut() .resource_table .get::<StdFileResource>(rid)?;
resource .with_inner_blocking_task(move |inner| inner.with_file(f)) .await }
pub fn clone_file( state: &mut OpState, rid: ResourceId, ) -> Result<StdFile, AnyError> { Self::with_file(state, rid, move |std_file| { std_file.try_clone().map_err(AnyError::from) }) }
pub fn as_stdio( state: &mut OpState, rid: u32, ) -> Result<std::process::Stdio, AnyError> { Self::with_resource(state, rid, |resource| { resource.with_inner_and_metadata(|inner, _| match inner.kind { StdFileResourceKind::File => { let file = inner.file.try_clone()?; Ok(file.into()) } _ => Ok(std::process::Stdio::inherit()), }) }) }}
impl Resource for StdFileResource { fn name(&self) -> Cow<str> { self.name.as_str().into() }
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<deno_core::BufView> { Box::pin(async move { let vec = vec![0; limit]; let buf = BufMutView::from(vec); let (nread, buf) = self.read_byob(buf).await?; let mut vec = buf.unwrap_vec(); if vec.len() != nread { vec.truncate(nread); } Ok(BufView::from(vec)) }) }
fn read_byob( self: Rc<Self>, buf: deno_core::BufMutView, ) -> AsyncResult<(usize, deno_core::BufMutView)> { Box::pin(self.read_byob(buf)) }
deno_core::impl_writable!(with_all);
#[cfg(unix)] fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> { use std::os::unix::io::AsRawFd; self .with_inner_and_metadata(move |std_file, _| { Ok(std_file.with_file(|f| f.as_raw_fd())) }) .ok() }}
// override op_print to use the stdout and stderr in the resource table#[op]pub fn op_print( state: &mut OpState, msg: String, is_err: bool,) -> Result<(), AnyError> { let rid = if is_err { 2 } else { 1 }; StdFileResource::with_resource(state, rid, move |resource| { resource.with_inner_and_metadata(|inner, _| { inner.write_all_and_maybe_flush(msg.as_bytes())?; Ok(()) }) })}
#[op]fn op_read_sync( state: &mut OpState, rid: ResourceId, mut buf: ZeroCopyBuf,) -> Result<u32, AnyError> { StdFileResource::with_resource(state, rid, move |resource| { resource.with_inner_and_metadata(|inner, _| { inner .read(&mut buf) .map(|n: usize| n as u32) .map_err(AnyError::from) }) })}
#[op]fn op_write_sync( state: &mut OpState, rid: ResourceId, buf: ZeroCopyBuf,) -> Result<u32, AnyError> { StdFileResource::with_resource(state, rid, move |resource| { resource.with_inner_and_metadata(|inner, _| { inner .write_and_maybe_flush(&buf) .map(|nwritten: usize| nwritten as u32) .map_err(AnyError::from) }) })}
Version Info