deno.land / x / deno@v1.28.2 / ext / flash / socket.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use deno_core::error::AnyError;use mio::net::TcpStream;use std::cell::UnsafeCell;use std::future::Future;use std::io::Read;use std::io::Write;use std::marker::PhantomPinned;use std::pin::Pin;use std::sync::Arc;use std::sync::Mutex;use tokio::sync::mpsc;
use crate::ParseStatus;
/// A rustls server-side TLS session that owns the `mio` TCP stream it runs over.
type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
/// The transport that backs a [`Stream`]: either a plain TCP connection or a
/// TLS session layered on top of one.
#[derive(Debug)]
pub enum InnerStream {
  /// Unencrypted `mio` TCP stream.
  Tcp(TcpStream),
  /// TLS stream; boxed so the enum stays small regardless of the TLS state size.
  Tls(Box<TlsTcpStream>),
}
/// A connection handled by this module, together with its per-connection state.
///
/// The type carries a [`PhantomPinned`] marker, so it is `!Unpin`.
#[derive(Debug)]
pub struct Stream {
  /// Underlying transport (plain TCP or TLS).
  pub inner: InnerStream,
  /// Set to `true` by [`Stream::detach_ownership`].
  pub detached: bool,
  /// Optional receiving half of a unit-valued channel.
  // NOTE(review): presumably used to wait for read-readiness signals; confirm
  // against the code that creates the channel.
  pub read_rx: Option<mpsc::Receiver<()>>,
  /// Optional sending half of the same kind of unit-valued channel.
  pub read_tx: Option<mpsc::Sender<()>>,
  /// Parse status of the data received on this stream (see `ParseStatus`).
  pub parse_done: ParseStatus,
  /// Byte buffer with interior mutability.
  // NOTE(review): `UnsafeCell` gives no synchronization; callers are presumably
  // expected to coordinate access themselves (possibly via `read_lock`) - verify.
  pub buffer: UnsafeCell<Vec<u8>>,
  /// Shared mutex guarding no data of its own.
  pub read_lock: Arc<Mutex<()>>,
  /// Marker that opts the struct out of `Unpin`.
  pub _pinned: PhantomPinned,
}
impl Stream { pub fn detach_ownership(&mut self) { self.detached = true; }
/// Try to write to the socket. #[inline] pub fn try_write(&mut self, buf: &[u8]) -> usize { let mut nwritten = 0; while nwritten < buf.len() { match self.write(&buf[nwritten..]) { Ok(n) => nwritten += n, Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { break; } Err(e) => { log::trace!("Error writing to socket: {}", e); break; } } } nwritten }
#[inline] pub fn shutdown(&mut self) { match &mut self.inner { InnerStream::Tcp(stream) => { // Typically shutdown shouldn't fail. let _ = stream.shutdown(std::net::Shutdown::Both); } InnerStream::Tls(stream) => { let _ = stream.sock.shutdown(std::net::Shutdown::Both); } } }
pub fn as_std(&mut self) -> std::net::TcpStream { #[cfg(unix)] let std_stream = { use std::os::unix::prelude::AsRawFd; use std::os::unix::prelude::FromRawFd; let fd = match self.inner { InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(), _ => todo!(), }; // SAFETY: `fd` is a valid file descriptor. unsafe { std::net::TcpStream::from_raw_fd(fd) } }; #[cfg(windows)] let std_stream = { use std::os::windows::prelude::AsRawSocket; use std::os::windows::prelude::FromRawSocket; let fd = match self.inner { InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(), _ => todo!(), }; // SAFETY: `fd` is a valid file descriptor. unsafe { std::net::TcpStream::from_raw_socket(fd) } }; std_stream }
#[inline] pub async fn with_async_stream<F, T>(&mut self, f: F) -> Result<T, AnyError> where F: FnOnce( &mut tokio::net::TcpStream, ) -> Pin<Box<dyn '_ + Future<Output = Result<T, AnyError>>>>, { let mut async_stream = tokio::net::TcpStream::from_std(self.as_std())?; let result = f(&mut async_stream).await?; forget_stream(async_stream.into_std()?); Ok(result) }}
/// Releases `stream` without closing the socket it wraps.
///
/// The stream is converted into its raw file descriptor (Unix) or raw socket
/// (Windows) and that raw value is discarded, so no destructor ever closes the
/// handle. Use this for streams that alias a handle owned elsewhere.
#[inline]
pub fn forget_stream(stream: std::net::TcpStream) {
  #[cfg(unix)]
  {
    use std::os::unix::prelude::IntoRawFd;
    let _released_fd = IntoRawFd::into_raw_fd(stream);
  }
  #[cfg(windows)]
  {
    use std::os::windows::prelude::IntoRawSocket;
    let _released_socket = IntoRawSocket::into_raw_socket(stream);
  }
}
/// Byte-sink implementation that forwards to whichever transport backs the
/// stream.
impl Write for Stream {
  /// Writes `buf` to the plain TCP stream or through the TLS session.
  #[inline]
  fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
    match &mut self.inner {
      InnerStream::Tcp(tcp) => tcp.write(buf),
      InnerStream::Tls(tls) => tls.write(buf),
    }
  }

  /// Flushes the active transport.
  #[inline]
  fn flush(&mut self) -> std::io::Result<()> {
    match &mut self.inner {
      InnerStream::Tcp(tcp) => tcp.flush(),
      InnerStream::Tls(tls) => tls.flush(),
    }
  }
}
/// Byte-source implementation that forwards to whichever transport backs the
/// stream.
impl Read for Stream {
  /// Reads into `buf` from the plain TCP stream or through the TLS session.
  #[inline]
  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
    match &mut self.inner {
      InnerStream::Tcp(tcp) => tcp.read(buf),
      InnerStream::Tls(tls) => tls.read(buf),
    }
  }
}
deno

Version Info

Tagged at
a year ago