Skip to content

Commit 3e7606d

Browse files
authored
feat(ws): add methods taking in a tungstenite config (gakonst#2373)
* feat(ws): add methods taking in a tungstenite config Signed-off-by: Francesco <[email protected]> * fix(ws): change args order in connect_with_config Signed-off-by: Francesco <[email protected]> * fix(ws): use config in reconnect if present Signed-off-by: Francesco <[email protected]> * fix(ws): fix wasm build Signed-off-by: Francesco <[email protected]> * fix(ws): make code more dry Signed-off-by: Francesco <[email protected]> --------- Signed-off-by: Francesco <[email protected]>
1 parent 4b6fc29 commit 3e7606d

File tree

4 files changed

+134
-6
lines changed

4 files changed

+134
-6
lines changed

ethers-providers/src/rpc/transports/ws/backend.rs

+9
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ impl WsBackend {
6969
Ok(Self::new(ws))
7070
}
7171

72+
#[cfg(not(target_arch = "wasm32"))]
73+
pub async fn connect_with_config(
74+
details: ConnectionDetails,
75+
config: WebSocketConfig,
76+
) -> Result<(Self, BackendDriver), WsClientError> {
77+
let ws = connect_async_with_config(details, Some(config)).await?.0.fuse();
78+
Ok(Self::new(ws))
79+
}
80+
7281
pub fn new(server: InternalStream) -> (Self, BackendDriver) {
7382
let (handler, to_handle) = mpsc::unbounded();
7483
let (dispatcher, to_dispatch) = mpsc::unbounded();

ethers-providers/src/rpc/transports/ws/manager.rs

+93-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#[cfg(not(target_arch = "wasm32"))]
2+
use super::WebSocketConfig;
13
use super::{
24
backend::{BackendDriver, WsBackend},
35
ActiveSub, ConnectionDetails, InFlight, Instruction, Notification, PubSubItem, Response, SubId,
@@ -196,6 +198,9 @@ pub struct RequestManager {
196198
backend: BackendDriver,
197199
// The URL and optional auth info for the connection
198200
conn: ConnectionDetails,
201+
#[cfg(not(target_arch = "wasm32"))]
202+
// An Option wrapping a tungstenite WebsocketConfig. If None, the default config is used.
203+
config: Option<WebSocketConfig>,
199204
// Instructions from the user-facing providers
200205
instructions: mpsc::UnboundedReceiver<Instruction>,
201206
}
@@ -209,16 +214,84 @@ impl RequestManager {
209214
Self::connect_with_reconnects(conn, DEFAULT_RECONNECTS).await
210215
}
211216

217+
async fn connect_internal(
218+
conn: ConnectionDetails,
219+
) -> Result<
220+
(
221+
BackendDriver,
222+
(mpsc::UnboundedSender<Instruction>, mpsc::UnboundedReceiver<Instruction>),
223+
SharedChannelMap,
224+
),
225+
WsClientError,
226+
> {
227+
let (ws, backend) = WsBackend::connect(conn).await?;
228+
229+
ws.spawn();
230+
231+
Ok((backend, mpsc::unbounded(), Default::default()))
232+
}
233+
234+
#[cfg(target_arch = "wasm32")]
212235
pub async fn connect_with_reconnects(
213236
conn: ConnectionDetails,
214237
reconnects: usize,
215238
) -> Result<(Self, WsClient), WsClientError> {
216-
let (ws, backend) = WsBackend::connect(conn.clone()).await?;
239+
let (backend, (instructions_tx, instructions_rx), channel_map) =
240+
Self::connect_internal(conn.clone()).await?;
241+
242+
Ok((
243+
Self {
244+
id: Default::default(),
245+
reconnects,
246+
subs: SubscriptionManager::new(channel_map.clone()),
247+
reqs: Default::default(),
248+
backend,
249+
conn,
250+
instructions: instructions_rx,
251+
},
252+
WsClient { instructions: instructions_tx, channel_map },
253+
))
254+
}
217255

218-
let (instructions_tx, instructions_rx) = mpsc::unbounded();
219-
let channel_map: SharedChannelMap = Default::default();
256+
#[cfg(not(target_arch = "wasm32"))]
257+
pub async fn connect_with_reconnects(
258+
conn: ConnectionDetails,
259+
reconnects: usize,
260+
) -> Result<(Self, WsClient), WsClientError> {
261+
let (backend, (instructions_tx, instructions_rx), channel_map) =
262+
Self::connect_internal(conn.clone()).await?;
220263

221-
ws.spawn();
264+
Ok((
265+
Self {
266+
id: Default::default(),
267+
reconnects,
268+
subs: SubscriptionManager::new(channel_map.clone()),
269+
reqs: Default::default(),
270+
backend,
271+
conn,
272+
config: None,
273+
instructions: instructions_rx,
274+
},
275+
WsClient { instructions: instructions_tx, channel_map },
276+
))
277+
}
278+
279+
#[cfg(not(target_arch = "wasm32"))]
280+
pub async fn connect_with_config(
281+
conn: ConnectionDetails,
282+
config: WebSocketConfig,
283+
) -> Result<(Self, WsClient), WsClientError> {
284+
Self::connect_with_config_and_reconnects(conn, config, DEFAULT_RECONNECTS).await
285+
}
286+
287+
#[cfg(not(target_arch = "wasm32"))]
288+
pub async fn connect_with_config_and_reconnects(
289+
conn: ConnectionDetails,
290+
config: WebSocketConfig,
291+
reconnects: usize,
292+
) -> Result<(Self, WsClient), WsClientError> {
293+
let (backend, (instructions_tx, instructions_rx), channel_map) =
294+
Self::connect_internal(conn.clone()).await?;
222295

223296
Ok((
224297
Self {
@@ -228,12 +301,27 @@ impl RequestManager {
228301
reqs: Default::default(),
229302
backend,
230303
conn,
304+
config: Some(config),
231305
instructions: instructions_rx,
232306
},
233307
WsClient { instructions: instructions_tx, channel_map },
234308
))
235309
}
236310

311+
#[cfg(target_arch = "wasm32")]
312+
async fn reconnect_backend(&mut self) -> Result<(WsBackend, BackendDriver), WsClientError> {
313+
WsBackend::connect(self.conn.clone()).await
314+
}
315+
316+
#[cfg(not(target_arch = "wasm32"))]
317+
async fn reconnect_backend(&mut self) -> Result<(WsBackend, BackendDriver), WsClientError> {
318+
if let Some(config) = self.config {
319+
WsBackend::connect_with_config(self.conn.clone(), config).await
320+
} else {
321+
WsBackend::connect(self.conn.clone()).await
322+
}
323+
}
324+
237325
async fn reconnect(&mut self) -> Result<(), WsClientError> {
238326
if self.reconnects == 0 {
239327
return Err(WsClientError::TooManyReconnects)
@@ -242,7 +330,7 @@ impl RequestManager {
242330

243331
tracing::info!(remaining = self.reconnects, url = self.conn.url, "Reconnecting to backend");
244332
// create the new backend
245-
let (s, mut backend) = WsBackend::connect(self.conn.clone()).await?;
333+
let (s, mut backend) = self.reconnect_backend().await?;
246334

247335
// spawn the new backend
248336
s.spawn();

ethers-providers/src/rpc/transports/ws/mod.rs

+30
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,36 @@ impl WsClient {
5050
Ok(this)
5151
}
5252

53+
#[cfg(not(target_arch = "wasm32"))]
54+
/// Establishes a new websocket connection. This method allows specifying a custom websocket
55+
/// configuration, see the [tungstenite docs](https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocketConfig.html) for all avaible options.
56+
pub async fn connect_with_config(
57+
conn: impl Into<ConnectionDetails>,
58+
config: impl Into<WebSocketConfig>,
59+
) -> Result<Self, WsClientError> {
60+
let (man, this) = RequestManager::connect_with_config(conn.into(), config.into()).await?;
61+
man.spawn();
62+
Ok(this)
63+
}
64+
65+
#[cfg(not(target_arch = "wasm32"))]
66+
/// Establishes a new websocket connection with auto-reconnects. This method allows specifying a
67+
/// custom websocket configuration, see the [tungstenite docs](https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocketConfig.html) for all avaible options.
68+
pub async fn connect_with_config_and_reconnects(
69+
conn: impl Into<ConnectionDetails>,
70+
config: impl Into<WebSocketConfig>,
71+
reconnects: usize,
72+
) -> Result<Self, WsClientError> {
73+
let (man, this) = RequestManager::connect_with_config_and_reconnects(
74+
conn.into(),
75+
config.into(),
76+
reconnects,
77+
)
78+
.await?;
79+
man.spawn();
80+
Ok(this)
81+
}
82+
5383
#[tracing::instrument(skip(self, params), err)]
5484
async fn make_request<R>(&self, method: &str, params: Box<RawValue>) -> Result<R, WsClientError>
5585
where

ethers-providers/src/rpc/transports/ws/types.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,11 @@ mod aliases {
257257
#[cfg(not(target_arch = "wasm32"))]
258258
mod aliases {
259259
pub use tokio_tungstenite::{
260-
connect_async,
260+
connect_async, connect_async_with_config,
261261
tungstenite::{self, protocol::CloseFrame},
262262
};
263263
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
264+
pub type WebSocketConfig = tungstenite::protocol::WebSocketConfig;
264265
pub type Message = tungstenite::protocol::Message;
265266
pub type WsError = tungstenite::Error;
266267
pub type WsStreamItem = Result<Message, WsError>;

0 commit comments

Comments
 (0)