deno.land / x / wasm@wasmer-sdk-v0.6.0 / src / net.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc,};
use tokio::sync::mpsc;use virtual_net::{meta::MessageRequest, RemoteNetworkingClient};use wasm_bindgen_futures::JsFuture;
use crate::{utils::GlobalScope, ws::WebSocket};
pub(crate) fn connect_networking(connect: String) -> RemoteNetworkingClient { let (recv_tx, recv_rx) = mpsc::channel(100); let (send_tx, send_rx) = mpsc::channel(100); let send_tx2 = send_tx.clone();
let (client, driver) = virtual_net::RemoteNetworkingClient::new_from_mpsc(send_tx, recv_rx); wasm_bindgen_futures::spawn_local(driver);
let send_rx = Arc::new(tokio::sync::Mutex::new(send_rx));
wasm_bindgen_futures::spawn_local(async move { let backoff = Arc::new(AtomicUsize::new(0)); loop { // Exponential backoff prevents thrashing of the connection let backoff_ms = backoff.load(Ordering::SeqCst); if backoff_ms > 0 { let promise = GlobalScope::current().sleep(backoff_ms as i32); JsFuture::from(promise).await.ok(); } let new_backoff = 8000usize.min((backoff_ms * 2) + 100); backoff.store(new_backoff, Ordering::SeqCst);
// Establish a websocket connection to the edge network let mut ws = match WebSocket::new(connect.as_str()) { Ok(ws) => ws, Err(err) => { tracing::error!("failed to establish web socket connection - {}", err); continue; } };
// Wire up the events let (relay_tx, mut relay_rx) = mpsc::unbounded_channel(); let (connected_tx, mut connected_rx) = mpsc::unbounded_channel(); ws.set_onopen({ let connect = connect.clone(); let connected_tx = connected_tx.clone(); Box::new(move || { tracing::debug!(url = connect, "networking web-socket opened"); connected_tx.send(true).ok(); }) }); ws.set_onclose({ let connect = connect.clone();
let connected_tx = connected_tx.clone(); let relay_tx = relay_tx.clone(); Box::new(move || { tracing::debug!(url = connect, "networking web-socket closed"); relay_tx.send(Vec::new()).ok(); connected_tx.send(false).ok(); }) }); ws.set_onmessage({ Box::new(move |data| { relay_tx.send(data).unwrap(); }) });
// Wait for it to connect and setup the rest of the callbacks if !connected_rx.recv().await.unwrap_or_default() { continue; } backoff.store(100, Ordering::SeqCst);
// We process any backends wasm_bindgen_futures::spawn_local({ let send_tx2 = send_tx2.clone(); let recv_tx = recv_tx.clone(); async move { while let Some(data) = relay_rx.recv().await { if data.is_empty() { break; } let data = match bincode::deserialize(&data) { Ok(d) => d, Err(err) => { tracing::error!( "failed to deserialize networking message - {}", err ); break; } }; if recv_tx.send(data).await.is_err() { break; } } send_tx2.try_send(MessageRequest::Reconnect).ok(); } });
while let Some(data) = send_rx.lock().await.recv().await { if let MessageRequest::Reconnect = &data { tracing::info!("websocket will reconnect"); break; } let data = match bincode::serialize(&data) { Ok(d) => d, Err(err) => { tracing::error!("failed to serialize networking message - {}", err); break; } }; if let Err(err) = ws.send(data) { tracing::error!("websocket has failed - {}", err); break; } } } }); client}
wasm

Version Info

Tagged at
4 months ago