deno.land / x / deno@v1.28.2 / ext / flash / lib.rs
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
// False positive lint for explicit drops.// https://github.com/rust-lang/rust-clippy/issues/6446#![allow(clippy::await_holding_lock)]// https://github.com/rust-lang/rust-clippy/issues/6353#![allow(clippy::await_holding_refcell_ref)]
use deno_core::error::generic_error;use deno_core::error::type_error;use deno_core::error::AnyError;use deno_core::op;use deno_core::serde_v8;use deno_core::v8;use deno_core::v8::fast_api;use deno_core::ByteString;use deno_core::CancelFuture;use deno_core::CancelHandle;use deno_core::Extension;use deno_core::OpState;use deno_core::StringOrBuffer;use deno_core::ZeroCopyBuf;use deno_core::V8_WRAPPER_OBJECT_INDEX;use deno_tls::load_certs;use deno_tls::load_private_keys;use http::header::HeaderName;use http::header::CONNECTION;use http::header::CONTENT_LENGTH;use http::header::EXPECT;use http::header::TRANSFER_ENCODING;use http::HeaderValue;use log::trace;use mio::net::TcpListener;use mio::Events;use mio::Interest;use mio::Poll;use mio::Token;use mio::Waker;use serde::Deserialize;use serde::Serialize;use socket2::Socket;use std::cell::RefCell;use std::cell::UnsafeCell;use std::collections::HashMap;use std::ffi::c_void;use std::future::Future;use std::intrinsics::transmute;use std::io::BufReader;use std::io::Read;use std::io::Write;use std::marker::PhantomPinned;use std::mem::replace;use std::net::SocketAddr;use std::net::ToSocketAddrs;use std::pin::Pin;use std::rc::Rc;use std::sync::Arc;use std::sync::Mutex;use std::task::Context;use tokio::sync::mpsc;use tokio::sync::oneshot;use tokio::task::JoinHandle;
mod chunked;mod request;#[cfg(unix)]mod sendfile;mod socket;
use request::InnerRequest;use request::Request;use socket::InnerStream;use socket::Stream;
pub struct FlashContext { next_server_id: u32, join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>, pub servers: HashMap<u32, ServerContext>,}
impl Drop for FlashContext { fn drop(&mut self) { // Signal each server instance to shutdown. for (_, server) in self.servers.drain() { let _ = server.waker.wake(); } }}
pub struct ServerContext { _addr: SocketAddr, tx: mpsc::Sender<Request>, rx: Option<mpsc::Receiver<Request>>, requests: HashMap<u32, Request>, next_token: u32, listening_rx: Option<mpsc::Receiver<Result<u16, std::io::Error>>>, cancel_handle: Rc<CancelHandle>, waker: Arc<Waker>,}
#[derive(Debug, Eq, PartialEq)]pub enum ParseStatus { None, Ongoing(usize),}
#[op]fn op_flash_respond( op_state: &mut OpState, server_id: u32, token: u32, response: StringOrBuffer, shutdown: bool,) -> u32 { let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = match flash_ctx.servers.get_mut(&server_id) { Some(ctx) => ctx, None => return 0, }; flash_respond(ctx, token, shutdown, &response)}
#[op(fast)]fn op_try_flash_respond_chuncked( op_state: &mut OpState, server_id: u32, token: u32, response: &[u8], shutdown: bool,) -> u32 { let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let tx = ctx.requests.get_mut(&token).unwrap(); let sock = tx.socket();
// TODO(@littledivy): Use writev when `UnixIoSlice` lands. // https://github.com/denoland/deno/pull/15629 let h = format!("{:x}\r\n", response.len());
let concat = [h.as_bytes(), response, b"\r\n"].concat(); let expected = sock.try_write(&concat); if expected != concat.len() { if expected > 2 { return expected as u32; } return expected as u32; }
if shutdown { // Best case: We've written everything and the stream is done too. let _ = ctx.requests.remove(&token).unwrap(); } 0}
#[op]async fn op_flash_respond_async( state: Rc<RefCell<OpState>>, server_id: u32, token: u32, response: StringOrBuffer, shutdown: bool,) -> Result<(), AnyError> { trace!("op_flash_respond_async");
let mut close = false; let sock = { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = match flash_ctx.servers.get_mut(&server_id) { Some(ctx) => ctx, None => return Ok(()), };
match shutdown { true => { let mut tx = ctx.requests.remove(&token).unwrap(); close = !tx.keep_alive; tx.socket() } // In case of a websocket upgrade or streaming response. false => { let tx = ctx.requests.get_mut(&token).unwrap(); tx.socket() } } };
sock .with_async_stream(|stream| { Box::pin(async move { Ok(tokio::io::AsyncWriteExt::write(stream, &response).await?) }) }) .await?; // server is done writing and request doesn't want to kept alive. if shutdown && close { sock.shutdown(); } Ok(())}
#[op]async fn op_flash_respond_chuncked( op_state: Rc<RefCell<OpState>>, server_id: u32, token: u32, response: Option<ZeroCopyBuf>, shutdown: bool, nwritten: u32,) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let sock = match shutdown { true => { let mut tx = ctx.requests.remove(&token).unwrap(); tx.socket() } // In case of a websocket upgrade or streaming response. false => { let tx = ctx.requests.get_mut(&token).unwrap(); tx.socket() } };
drop(op_state); sock .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; // TODO(@littledivy): Use writev when `UnixIoSlice` lands. // https://github.com/denoland/deno/pull/15629 macro_rules! write_whats_not_written { ($e:expr) => { let e = $e; let n = nwritten as usize; if n < e.len() { stream.write_all(&e[n..]).await?; } }; } if let Some(response) = response { let h = format!("{:x}\r\n", response.len()); write_whats_not_written!(h.as_bytes()); write_whats_not_written!(&response); write_whats_not_written!(b"\r\n"); }
// The last chunk if shutdown { write_whats_not_written!(b"0\r\n\r\n"); }
Ok(()) }) }) .await?; Ok(())}
#[op]async fn op_flash_write_resource( op_state: Rc<RefCell<OpState>>, response: StringOrBuffer, server_id: u32, token: u32, resource_id: deno_core::ResourceId, auto_close: bool,) -> Result<(), AnyError> { let (resource, sock) = { let op_state = &mut op_state.borrow_mut(); let resource = if auto_close { op_state.resource_table.take_any(resource_id)? } else { op_state.resource_table.get_any(resource_id)? }; let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); (resource, ctx.requests.remove(&token).unwrap().socket()) };
let _ = sock.write(&response);
#[cfg(unix)] { use std::os::unix::io::AsRawFd; if let InnerStream::Tcp(stream_handle) = &sock.inner { let stream_handle = stream_handle.as_raw_fd(); if let Some(fd) = resource.clone().backing_fd() { // SAFETY: all-zero byte-pattern is a valid value for libc::stat. let mut stat: libc::stat = unsafe { std::mem::zeroed() }; // SAFETY: call to libc::fstat. if unsafe { libc::fstat(fd, &mut stat) } >= 0 { let _ = sock.write( format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(), ); let tx = sendfile::SendFile { io: (fd, stream_handle), written: 0, }; tx.await?; return Ok(()); } } } }
sock .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; stream .write_all(b"Transfer-Encoding: chunked\r\n\r\n") .await?; loop { let view = resource.clone().read(64 * 1024).await?; // 64KB if view.is_empty() { stream.write_all(b"0\r\n\r\n").await?; break; } // TODO(@littledivy): use vectored writes. stream .write_all(format!("{:x}\r\n", view.len()).as_bytes()) .await?; stream.write_all(&view).await?; stream.write_all(b"\r\n").await?; } resource.close(); Ok(()) }) }) .await?; Ok(())}
pub struct RespondFast;
impl fast_api::FastFunction for RespondFast { fn function(&self) -> *const c_void { op_flash_respond_fast as *const c_void }
fn args(&self) -> &'static [fast_api::Type] { &[ fast_api::Type::V8Value, fast_api::Type::Uint32, fast_api::Type::TypedArray(fast_api::CType::Uint8), fast_api::Type::Bool, ] }
fn return_type(&self) -> fast_api::CType { fast_api::CType::Uint32 }}
fn flash_respond( ctx: &mut ServerContext, token: u32, shutdown: bool, response: &[u8],) -> u32 { let tx = ctx.requests.get_mut(&token).unwrap(); let sock = tx.socket();
sock.read_tx.take(); sock.read_rx.take();
let nwritten = sock.try_write(response);
if shutdown && nwritten == response.len() { if !tx.keep_alive { sock.shutdown(); } ctx.requests.remove(&token).unwrap(); }
nwritten as u32}
unsafe fn op_flash_respond_fast( recv: v8::Local<v8::Object>, token: u32, response: *const fast_api::FastApiTypedArray<u8>, shutdown: bool,) -> u32 { let ptr = recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); let ctx = &mut *(ptr as *mut ServerContext);
let response = &*response; if let Some(response) = response.get_storage_if_aligned() { flash_respond(ctx, token, shutdown, response) } else { todo!(); }}
macro_rules! get_request { ($op_state: ident, $token: ident) => { get_request!($op_state, 0, $token) }; ($op_state: ident, $server_id: expr, $token: ident) => {{ let flash_ctx = $op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap(); ctx.requests.get_mut(&$token).unwrap() }};}
#[repr(u32)]pub enum Method { GET = 0, HEAD, CONNECT, PUT, DELETE, OPTIONS, TRACE, POST, PATCH,}
#[inline]fn get_method(req: &mut Request) -> u32 { let method = match req.method() { "GET" => Method::GET, "POST" => Method::POST, "PUT" => Method::PUT, "DELETE" => Method::DELETE, "OPTIONS" => Method::OPTIONS, "HEAD" => Method::HEAD, "PATCH" => Method::PATCH, "TRACE" => Method::TRACE, "CONNECT" => Method::CONNECT, _ => Method::GET, }; method as u32}
#[op]fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 { let req = get_request!(state, server_id, token); get_method(req)}
#[op]fn op_flash_drive_server( state: &mut OpState, server_id: u32,) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> { let join_handle = { let flash_ctx = state.borrow_mut::<FlashContext>(); flash_ctx .join_handles .remove(&server_id) .ok_or_else(|| type_error("server not found"))? }; Ok(async move { join_handle .await .map_err(|_| type_error("server join error"))??; Ok(()) })}
#[op]fn op_flash_close_server(state: &mut OpState, server_id: u32) { let flash_ctx = state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get(&server_id).unwrap();
// NOTE: We don't drop ServerContext associated with the given `server_id`, // because it may still be in use by some unsettled promise after the flash // thread is finished.
ctx.cancel_handle.cancel(); let _ = ctx.waker.wake();}
#[op]fn op_flash_path( state: Rc<RefCell<OpState>>, server_id: u32, token: u32,) -> String { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); ctx .requests .get(&token) .unwrap() .inner .req .path .unwrap() .to_string()}
#[inline]fn next_request_sync(ctx: &mut ServerContext) -> u32 { let offset = ctx.next_token;
while let Ok(token) = ctx.rx.as_mut().unwrap().try_recv() { ctx.requests.insert(ctx.next_token, token); ctx.next_token += 1; }
ctx.next_token - offset}
pub struct NextRequestFast;
impl fast_api::FastFunction for NextRequestFast { fn function(&self) -> *const c_void { op_flash_next_fast as *const c_void }
fn args(&self) -> &'static [fast_api::Type] { &[fast_api::Type::V8Value] }
fn return_type(&self) -> fast_api::CType { fast_api::CType::Uint32 }}
unsafe fn op_flash_next_fast(recv: v8::Local<v8::Object>) -> u32 { let ptr = recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); let ctx = &mut *(ptr as *mut ServerContext); next_request_sync(ctx)}
pub struct GetMethodFast;
impl fast_api::FastFunction for GetMethodFast { fn function(&self) -> *const c_void { op_flash_get_method_fast as *const c_void }
fn args(&self) -> &'static [fast_api::Type] { &[fast_api::Type::V8Value, fast_api::Type::Uint32] }
fn return_type(&self) -> fast_api::CType { fast_api::CType::Uint32 }}
unsafe fn op_flash_get_method_fast( recv: v8::Local<v8::Object>, token: u32,) -> u32 { let ptr = recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); let ctx = &mut *(ptr as *mut ServerContext); let req = ctx.requests.get_mut(&token).unwrap(); get_method(req)}
// Fast calls#[op(v8)]fn op_flash_make_request<'scope>( scope: &mut v8::HandleScope<'scope>, state: &mut OpState, server_id: u32,) -> serde_v8::Value<'scope> { let object_template = v8::ObjectTemplate::new(scope); assert!(object_template .set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize)); let obj = object_template.new_instance(scope).unwrap(); let ctx = { let flash_ctx = state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); ctx as *mut ServerContext }; obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
// nextRequest { let builder = v8::FunctionTemplate::builder( |_: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue| { let external: v8::Local<v8::External> = args.data().try_into().unwrap(); // SAFETY: This external is guaranteed to be a pointer to a ServerContext let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; rv.set_uint32(next_request_sync(ctx)); }, ) .data(v8::External::new(scope, ctx as *mut _).into());
let func = builder.build_fast(scope, &NextRequestFast, None); let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
let key = v8::String::new(scope, "nextRequest").unwrap(); obj.set(scope, key.into(), func).unwrap(); }
// getMethod { let builder = v8::FunctionTemplate::builder( |scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue| { let external: v8::Local<v8::External> = args.data().try_into().unwrap(); // SAFETY: This external is guaranteed to be a pointer to a ServerContext let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; let token = args.get(0).uint32_value(scope).unwrap(); let req = ctx.requests.get_mut(&token).unwrap(); rv.set_uint32(get_method(req)); }, ) .data(v8::External::new(scope, ctx as *mut _).into());
let func = builder.build_fast(scope, &GetMethodFast, None); let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
let key = v8::String::new(scope, "getMethod").unwrap(); obj.set(scope, key.into(), func).unwrap(); }
// respond { let builder = v8::FunctionTemplate::builder( |scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, mut rv: v8::ReturnValue| { let external: v8::Local<v8::External> = args.data().try_into().unwrap(); // SAFETY: This external is guaranteed to be a pointer to a ServerContext let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
let token = args.get(0).uint32_value(scope).unwrap();
let response: v8::Local<v8::ArrayBufferView> = args.get(1).try_into().unwrap(); let ab = response.buffer(scope).unwrap(); let store = ab.get_backing_store(); let (offset, len) = (response.byte_offset(), response.byte_length()); // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>, // it points to a fixed continuous slice of bytes on the heap. // We assume it's initialized and thus safe to read (though may not contain meaningful data) let response = unsafe { &*(&store[offset..offset + len] as *const _ as *const [u8]) };
let shutdown = args.get(2).boolean_value(scope);
rv.set_uint32(flash_respond(ctx, token, shutdown, response)); }, ) .data(v8::External::new(scope, ctx as *mut _).into());
let func = builder.build_fast(scope, &RespondFast, None); let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
let key = v8::String::new(scope, "respond").unwrap(); obj.set(scope, key.into(), func).unwrap(); }
let value: v8::Local<v8::Value> = obj.into(); value.into()}
#[inline]fn has_body_stream(req: &mut Request) -> bool { let sock = req.socket(); sock.read_rx.is_some()}
#[op]fn op_flash_has_body_stream( op_state: &mut OpState, server_id: u32, token: u32,) -> bool { let req = get_request!(op_state, server_id, token); has_body_stream(req)}
#[op]fn op_flash_headers( state: Rc<RefCell<OpState>>, server_id: u32, token: u32,) -> Result<Vec<(ByteString, ByteString)>, AnyError> { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx .servers .get_mut(&server_id) .ok_or_else(|| type_error("server closed"))?; let inner_req = &ctx .requests .get(&token) .ok_or_else(|| type_error("request closed"))? .inner .req; Ok( inner_req .headers .iter() .map(|h| (h.name.as_bytes().into(), h.value.into())) .collect(), )}
// Remember the first packet we read? It probably also has some body data. This op quickly copies it into// a buffer and sets up channels for streaming the rest.#[op]fn op_flash_first_packet( op_state: &mut OpState, server_id: u32, token: u32,) -> Result<Option<ZeroCopyBuf>, AnyError> { let tx = get_request!(op_state, server_id, token); let sock = tx.socket();
if !tx.te_chunked && tx.content_length.is_none() { return Ok(None); }
if tx.expect_continue { let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n"); tx.expect_continue = false; }
let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len]; // Oh there is nothing here. if buffer.is_empty() { return Ok(Some(ZeroCopyBuf::empty())); }
if tx.te_chunked { let mut buf = vec![0; 1024]; let mut offset = 0; let mut decoder = chunked::Decoder::new( std::io::Cursor::new(buffer), tx.remaining_chunk_size, );
loop { match decoder.read(&mut buf[offset..]) { Ok(n) => { tx.remaining_chunk_size = decoder.remaining_chunks_size; offset += n;
if n == 0 { tx.te_chunked = false; buf.truncate(offset); return Ok(Some(buf.into())); }
if offset < buf.len() && decoder.source.position() < buffer.len() as u64 { continue; }
buf.truncate(offset); return Ok(Some(buf.into())); } Err(e) => { return Err(type_error(format!("{}", e))); } } } }
tx.content_length .ok_or_else(|| type_error("no content-length"))?; tx.content_read += buffer.len();
Ok(Some(buffer.to_vec().into()))}
#[op]async fn op_flash_read_body( state: Rc<RefCell<OpState>>, server_id: u32, token: u32, mut buf: ZeroCopyBuf,) -> usize { // SAFETY: we cannot hold op_state borrow across the await point. The JS caller // is responsible for ensuring this is not called concurrently. let ctx = unsafe { { let op_state = &mut state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); match flash_ctx.servers.get_mut(&server_id) { Some(ctx) => ctx as *mut ServerContext, None => return 0, } } .as_mut() .unwrap() }; let tx = ctx.requests.get_mut(&token).unwrap();
if tx.te_chunked { let mut decoder = chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size); loop { let sock = tx.socket();
let _lock = sock.read_lock.lock().unwrap(); match decoder.read(&mut buf) { Ok(n) => { tx.remaining_chunk_size = decoder.remaining_chunks_size; return n; } Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => { panic!("chunked read error: {}", e); } Err(_) => { drop(_lock); sock.read_rx.as_mut().unwrap().recv().await.unwrap(); } } } }
if let Some(content_length) = tx.content_length { let sock = tx.socket(); let l = sock.read_lock.clone();
loop { let _lock = l.lock().unwrap(); if tx.content_read >= content_length as usize { return 0; } match sock.read(&mut buf) { Ok(n) => { tx.content_read += n; return n; } _ => { drop(_lock); sock.read_rx.as_mut().unwrap().recv().await.unwrap(); } } } }
0}
// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67#[inline]fn from_digits(bytes: &[u8]) -> Option<u64> { // cannot use FromStr for u64, since it allows a signed prefix let mut result = 0u64; const RADIX: u64 = 10; if bytes.is_empty() { return None; } for &b in bytes { // can't use char::to_digit, since we haven't verified these bytes // are utf-8. match b { b'0'..=b'9' => { result = result.checked_mul(RADIX)?; result = result.checked_add((b - b'0') as u64)?; } _ => { return None; } } } Some(result)}
#[inline]fn connection_has(value: &HeaderValue, needle: &str) -> bool { if let Ok(s) = value.to_str() { for val in s.split(',') { if val.trim().eq_ignore_ascii_case(needle) { return true; } } } false}
#[derive(Serialize, Deserialize)]#[serde(rename_all = "camelCase")]pub struct ListenOpts { cert: Option<String>, key: Option<String>, hostname: String, port: u16, reuseport: bool,}
const SERVER_TOKEN: Token = Token(0);// Token reserved for the thread close signal.const WAKER_TOKEN: Token = Token(1);
#[allow(clippy::too_many_arguments)]fn run_server( tx: mpsc::Sender<Request>, listening_tx: mpsc::Sender<Result<u16, std::io::Error>>, addr: SocketAddr, maybe_cert: Option<String>, maybe_key: Option<String>, reuseport: bool, mut poll: Poll, // We put a waker as an unused argument here as it needs to be alive both in // the flash thread and in the main thread (otherwise the notification would // not be caught by the event loop on Linux). // See the comment in mio's example: // https://docs.rs/mio/0.8.4/x86_64-unknown-linux-gnu/mio/struct.Waker.html#examples _waker: Arc<Waker>,) -> Result<(), AnyError> { let mut listener = match listen(addr, reuseport) { Ok(listener) => listener, Err(e) => { listening_tx.blocking_send(Err(e)).unwrap(); return Err(generic_error( "failed to start listening on the specified address", )); } };
// Register server. poll .registry() .register(&mut listener, SERVER_TOKEN, Interest::READABLE) .unwrap();
let tls_context: Option<Arc<rustls::ServerConfig>> = { if let Some(cert) = maybe_cert { let key = maybe_key.unwrap(); let certificate_chain: Vec<rustls::Certificate> = load_certs(&mut BufReader::new(cert.as_bytes()))?; let private_key = load_private_keys(key.as_bytes())?.remove(0);
let config = rustls::ServerConfig::builder() .with_safe_defaults() .with_no_client_auth() .with_single_cert(certificate_chain, private_key) .expect("invalid key or certificate"); Some(Arc::new(config)) } else { None } };
listening_tx .blocking_send(Ok(listener.local_addr().unwrap().port())) .unwrap(); let mut sockets = HashMap::with_capacity(1000); let mut socket_senders = HashMap::with_capacity(1000); let mut counter: usize = 2; let mut events = Events::with_capacity(1024); 'outer: loop { match poll.poll(&mut events, None) { Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, Err(e) => panic!("{}", e), Ok(()) => (), } 'events: for event in &events { let token = event.token(); match token { WAKER_TOKEN => { break 'outer; } SERVER_TOKEN => loop { match listener.accept() { Ok((mut socket, _)) => { counter += 1; let token = Token(counter); poll .registry() .register(&mut socket, token, Interest::READABLE) .unwrap();
let socket = match tls_context { Some(ref tls_conf) => { let connection = rustls::ServerConnection::new(tls_conf.clone()).unwrap(); InnerStream::Tls(Box::new(rustls::StreamOwned::new( connection, socket, ))) } None => InnerStream::Tcp(socket), }; let stream = Box::pin(Stream { inner: socket, detached: false, read_rx: None, read_tx: None, read_lock: Arc::new(Mutex::new(())), parse_done: ParseStatus::None, buffer: UnsafeCell::new(vec![0_u8; 1024]), _pinned: PhantomPinned, });
trace!("New connection: {}", token.0); sockets.insert(token, stream); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, Err(_) => break, } }, token => { let socket = sockets.get_mut(&token).unwrap(); // SAFETY: guarantee that we will never move the data out of the mutable reference. let socket = unsafe { let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket); Pin::get_unchecked_mut(mut_ref) };
if socket.detached { match &mut socket.inner { InnerStream::Tcp(ref mut socket) => { poll.registry().deregister(socket).unwrap(); } InnerStream::Tls(_) => { todo!("upgrade tls not implemented"); } }
let boxed = sockets.remove(&token).unwrap(); std::mem::forget(boxed); socket_senders.remove(&token); trace!("Socket detached: {}", token.0); continue; }
debug_assert!(event.is_readable());
trace!("Socket readable: {}", token.0); if let Some(tx) = &socket.read_tx { { let _l = socket.read_lock.lock().unwrap(); } trace!("Sending readiness notification: {}", token.0); let _ = tx.blocking_send(());
continue; }
let mut headers = vec![httparse::EMPTY_HEADER; 40]; let mut req = httparse::Request::new(&mut headers); let body_offset; let body_len; loop { // SAFETY: It is safe for the read buf to be mutable here. let buffer = unsafe { &mut *socket.buffer.get() }; let offset = match socket.parse_done { ParseStatus::None => 0, ParseStatus::Ongoing(offset) => offset, }; if offset >= buffer.len() { buffer.resize(offset * 2, 0); } let nread = socket.read(&mut buffer[offset..]);
match nread { Ok(0) => { trace!("Socket closed: {}", token.0); // FIXME: don't remove while JS is writing! // sockets.remove(&token); continue 'events; } Ok(read) => { match req.parse(&buffer[..offset + read]) { Ok(httparse::Status::Complete(n)) => { body_offset = n; body_len = offset + read; socket.parse_done = ParseStatus::None; // On Windows, We must keep calling socket.read() until it fails with WouldBlock. // // Mio tries to emulate edge triggered events on Windows. // AFAICT it only rearms the event on WouldBlock, but it doesn't when a partial read happens. // https://github.com/denoland/deno/issues/15549 #[cfg(target_os = "windows")] match &mut socket.inner { InnerStream::Tcp(ref mut socket) => { poll .registry() .reregister(socket, token, Interest::READABLE) .unwrap(); } InnerStream::Tls(ref mut socket) => { poll .registry() .reregister( &mut socket.sock, token, Interest::READABLE, ) .unwrap(); } }; break; } Ok(httparse::Status::Partial) => { socket.parse_done = ParseStatus::Ongoing(offset + read); continue; } Err(_) => { let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); continue 'events; } } } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { break 'events } Err(_) => break 'events, } }
debug_assert_eq!(socket.parse_done, ParseStatus::None); if let Some(method) = &req.method { if method == &"POST" || method == &"PUT" { let (tx, rx) = mpsc::channel(100); socket.read_tx = Some(tx); socket.read_rx = Some(rx); } }
// SAFETY: It is safe for the read buf to be mutable here. let buffer = unsafe { &mut *socket.buffer.get() }; let inner_req = InnerRequest { // SAFETY: backing buffer is pinned and lives as long as the request. req: unsafe { transmute::<httparse::Request<'_, '_>, _>(req) }, // SAFETY: backing buffer is pinned and lives as long as the request. _headers: unsafe { transmute::<Vec<httparse::Header<'_>>, _>(headers) }, buffer: Pin::new( replace(buffer, vec![0_u8; 1024]).into_boxed_slice(), ), body_offset, body_len, }; // h1 // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177 // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127 let mut keep_alive = inner_req.req.version.unwrap() == 1; let mut expect_continue = false; let mut te = false; let mut te_chunked = false; let mut content_length = None; for header in inner_req.req.headers.iter() { match HeaderName::from_bytes(header.name.as_bytes()) { Ok(CONNECTION) => { // SAFETY: illegal bytes are validated by httparse. let value = unsafe { HeaderValue::from_maybe_shared_unchecked(header.value) }; if keep_alive { // 1.1 keep_alive = !connection_has(&value, "close"); } else { // 1.0 keep_alive = connection_has(&value, "keep-alive"); } } Ok(TRANSFER_ENCODING) => { // https://tools.ietf.org/html/rfc7230#section-3.3.3 debug_assert!(inner_req.req.version.unwrap() == 1); // Two states for Transfer-Encoding because we want to make sure Content-Length handling knows it. te = true; content_length = None; // SAFETY: illegal bytes are validated by httparse. let value = unsafe { HeaderValue::from_maybe_shared_unchecked(header.value) }; if let Ok(Some(encoding)) = value.to_str().map(|s| s.rsplit(',').next()) { // Chunked must always be the last encoding if encoding.trim().eq_ignore_ascii_case("chunked") { te_chunked = true; } } } // Transfer-Encoding overrides the Content-Length. Ok(CONTENT_LENGTH) if !te => { if let Some(len) = from_digits(header.value) { if let Some(prev) = content_length { if prev != len { let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); continue 'events; } continue; } content_length = Some(len); } else { let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); continue 'events; } } Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => { expect_continue = header.value.eq_ignore_ascii_case(b"100-continue"); } _ => {} } }
// There is Transfer-Encoding but its not chunked. if te && !te_chunked { let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); continue 'events; }
let (socket_tx, socket_rx) = oneshot::channel();
tx.blocking_send(Request { socket: socket as *mut _, // SAFETY: headers backing buffer outlives the mio event loop ('static) inner: inner_req, keep_alive, te_chunked, remaining_chunk_size: None, content_read: 0, content_length, expect_continue, socket_rx, owned_socket: None, }) .ok();
socket_senders.insert(token, socket_tx); } } } }
// Now the flash thread is about to finish, but there may be some unsettled // promises in the main thread that will use the socket. To make the socket // alive longer enough, we move its ownership to the main thread. for (tok, socket) in sockets { if let Some(sender) = socket_senders.remove(&tok) { // Do nothing if the receiver has already been dropped. _ = sender.send(socket); } }
Ok(())}
#[inline]fn listen( addr: SocketAddr, reuseport: bool,) -> Result<TcpListener, std::io::Error> { let domain = if addr.is_ipv4() { socket2::Domain::IPV4 } else { socket2::Domain::IPV6 }; let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
#[cfg(not(windows))] socket.set_reuse_address(true)?; if reuseport { #[cfg(target_os = "linux")] socket.set_reuse_port(true)?; }
let socket_addr = socket2::SockAddr::from(addr); socket.bind(&socket_addr)?; socket.listen(128)?; socket.set_nonblocking(true)?; let std_listener: std::net::TcpListener = socket.into(); Ok(TcpListener::from_std(std_listener))}
fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { // Default to localhost if given just the port. Example: ":80" if hostname.is_empty() { return ("0.0.0.0", port); }
// If this looks like an ipv6 IP address. Example: "[2001:db8::1]" // Then we remove the brackets. let addr = hostname.trim_start_matches('[').trim_end_matches(']'); (addr, port)}
/// Resolve network address *synchronously*.pub fn resolve_addr_sync( hostname: &str, port: u16,) -> Result<impl Iterator<Item = SocketAddr>, AnyError> { let addr_port_pair = make_addr_port_pair(hostname, port); let result = addr_port_pair.to_socket_addrs()?; Ok(result)}
fn flash_serve<P>( state: &mut OpState, opts: ListenOpts,) -> Result<u32, AnyError>where P: FlashPermissions + 'static,{ state .borrow_mut::<P>() .check_net(&(&opts.hostname, Some(opts.port)), "Deno.serve()")?;
let addr = resolve_addr_sync(&opts.hostname, opts.port)? .next() .ok_or_else(|| generic_error("No resolved address found"))?; let (tx, rx) = mpsc::channel(100); let (listening_tx, listening_rx) = mpsc::channel(1);
let poll = Poll::new()?; let waker = Arc::new(Waker::new(poll.registry(), WAKER_TOKEN).unwrap()); let ctx = ServerContext { _addr: addr, tx, rx: Some(rx), requests: HashMap::with_capacity(1000), next_token: 0, listening_rx: Some(listening_rx), cancel_handle: CancelHandle::new_rc(), waker: waker.clone(), }; let tx = ctx.tx.clone(); let maybe_cert = opts.cert; let maybe_key = opts.key; let reuseport = opts.reuseport; let join_handle = tokio::task::spawn_blocking(move || { run_server( tx, listening_tx, addr, maybe_cert, maybe_key, reuseport, poll, waker, ) }); let flash_ctx = state.borrow_mut::<FlashContext>(); let server_id = flash_ctx.next_server_id; flash_ctx.next_server_id += 1; flash_ctx.join_handles.insert(server_id, join_handle); flash_ctx.servers.insert(server_id, ctx); Ok(server_id)}
#[op]fn op_flash_serve<P>( state: &mut OpState, opts: ListenOpts,) -> Result<u32, AnyError>where P: FlashPermissions + 'static,{ check_unstable(state, "Deno.serve"); flash_serve::<P>(state, opts)}
#[op]fn op_node_unstable_flash_serve<P>( state: &mut OpState, opts: ListenOpts,) -> Result<u32, AnyError>where P: FlashPermissions + 'static,{ flash_serve::<P>(state, opts)}
#[op]async fn op_flash_wait_for_listening( state: Rc<RefCell<OpState>>, server_id: u32,) -> Result<u16, AnyError> { let mut listening_rx = { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let server_ctx = flash_ctx .servers .get_mut(&server_id) .ok_or_else(|| type_error("server not found"))?; server_ctx.listening_rx.take().unwrap() }; match listening_rx.recv().await { Some(Ok(port)) => Ok(port), Some(Err(e)) => Err(e.into()), _ => Err(generic_error( "unknown error occurred while waiting for listening", )), }}
// Asychronous version of op_flash_next. This can be a bottleneck under// heavy load, it should be used as a fallback if there are no buffered// requests i.e `op_flash_next() == 0`.#[op]async fn op_flash_next_async( state: Rc<RefCell<OpState>>, server_id: u32,) -> u32 { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let cancel_handle = ctx.cancel_handle.clone(); let mut rx = ctx.rx.take().unwrap(); // We need to drop the borrow before await point. drop(op_state);
if let Ok(Some(req)) = rx.recv().or_cancel(&cancel_handle).await { let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); ctx.requests.insert(ctx.next_token, req); ctx.next_token += 1; // Set the rx back. ctx.rx = Some(rx); return 1; }
// Set the rx back. let mut op_state = state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); if let Some(ctx) = flash_ctx.servers.get_mut(&server_id) { ctx.rx = Some(rx); } 0}
// Synchronous version of op_flash_next_async. Under heavy load,// this can collect buffered requests from rx channel and return tokens in a single batch.//// perf: please do not add any arguments to this op. With optimizations enabled,// the ContextScope creation is optimized away and the op is as simple as:// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); }#[op]fn op_flash_next(state: &mut OpState) -> u32 { let flash_ctx = state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&0).unwrap(); next_request_sync(ctx)}
// Syncrhonous version of op_flash_next_async. Under heavy load,// this can collect buffered requests from rx channel and return tokens in a single batch.#[op]fn op_flash_next_server(state: &mut OpState, server_id: u32) -> u32 { let flash_ctx = state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); next_request_sync(ctx)}
// Wrapper type for tokio::net::TcpStream that implements// deno_websocket::UpgradedStreamstruct UpgradedStream(tokio::net::TcpStream);impl tokio::io::AsyncRead for UpgradedStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context, buf: &mut tokio::io::ReadBuf, ) -> std::task::Poll<std::result::Result<(), std::io::Error>> { Pin::new(&mut self.get_mut().0).poll_read(cx, buf) }}
impl tokio::io::AsyncWrite for UpgradedStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context, buf: &[u8], ) -> std::task::Poll<Result<usize, std::io::Error>> { Pin::new(&mut self.get_mut().0).poll_write(cx, buf) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context, ) -> std::task::Poll<Result<(), std::io::Error>> { Pin::new(&mut self.get_mut().0).poll_flush(cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context, ) -> std::task::Poll<Result<(), std::io::Error>> { Pin::new(&mut self.get_mut().0).poll_shutdown(cx) }}
impl deno_websocket::Upgraded for UpgradedStream {}
#[inline]pub fn detach_socket( ctx: &mut ServerContext, token: u32,) -> Result<tokio::net::TcpStream, AnyError> { // Two main 'hacks' to get this working: // * make server thread forget about the socket. `detach_ownership` prevents the socket from being // dropped on the server thread. // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we // use raw fds. let mut tx = ctx .requests .remove(&token) .ok_or_else(|| type_error("request closed"))?; let stream = tx.socket(); // prevent socket from being dropped on server thread. // TODO(@littledivy): Box-ify, since there is no overhead. stream.detach_ownership();
#[cfg(unix)] let std_stream = { use std::os::unix::prelude::AsRawFd; use std::os::unix::prelude::FromRawFd; let fd = match stream.inner { InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(), _ => todo!(), }; // SAFETY: `fd` is a valid file descriptor. unsafe { std::net::TcpStream::from_raw_fd(fd) } }; #[cfg(windows)] let std_stream = { use std::os::windows::prelude::AsRawSocket; use std::os::windows::prelude::FromRawSocket; let fd = match stream.inner { InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(), _ => todo!(), }; // SAFETY: `fd` is a valid file descriptor. unsafe { std::net::TcpStream::from_raw_socket(fd) } }; let stream = tokio::net::TcpStream::from_std(std_stream)?; Ok(stream)}
#[op]async fn op_flash_upgrade_websocket( state: Rc<RefCell<OpState>>, server_id: u32, token: u32,) -> Result<deno_core::ResourceId, AnyError> { let stream = { let op_state = &mut state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)? }; deno_websocket::ws_create_server_stream( &state, Box::pin(UpgradedStream(stream)), ) .await}
pub struct Unstable(pub bool);
fn check_unstable(state: &OpState, api_name: &str) { let unstable = state.borrow::<Unstable>();
if !unstable.0 { eprintln!( "Unstable API '{}'. The --unstable flag must be provided.", api_name ); std::process::exit(70); }}
pub trait FlashPermissions { fn check_net<T: AsRef<str>>( &mut self, _host: &(T, Option<u16>), _api_name: &str, ) -> Result<(), AnyError>;}
pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension { Extension::builder() .js(deno_core::include_js_files!( prefix "deno:ext/flash", "01_http.js", )) .ops(vec![ op_flash_serve::decl::<P>(), op_node_unstable_flash_serve::decl::<P>(), op_flash_respond::decl(), op_flash_respond_async::decl(), op_flash_respond_chuncked::decl(), op_flash_method::decl(), op_flash_path::decl(), op_flash_headers::decl(), op_flash_next::decl(), op_flash_next_server::decl(), op_flash_next_async::decl(), op_flash_read_body::decl(), op_flash_upgrade_websocket::decl(), op_flash_wait_for_listening::decl(), op_flash_first_packet::decl(), op_flash_has_body_stream::decl(), op_flash_close_server::decl(), op_flash_drive_server::decl(), op_flash_make_request::decl(), op_flash_write_resource::decl(), op_try_flash_respond_chuncked::decl(), ]) .state(move |op_state| { op_state.put(Unstable(unstable)); op_state.put(FlashContext { next_server_id: 0, join_handles: HashMap::default(), servers: HashMap::default(), }); Ok(()) }) .build()}
Version Info