From ca1609055583cbe26dcf7cfb70873d74b616f289 Mon Sep 17 00:00:00 2001 From: ntr Date: Mon, 22 Jul 2019 16:21:43 +1000 Subject: [PATCH] fix blocking mesasges --- server/src/ws.rs | 84 ++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/server/src/ws.rs b/server/src/ws.rs index 4a485003..f5ab2412 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -7,11 +7,13 @@ use uuid::Uuid; use cookie::Cookie; -use tungstenite::server::accept_hdr; use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; +use tungstenite::handshake::HandshakeRole; use tungstenite::http::StatusCode; use tungstenite::protocol::WebSocket; +use tungstenite::util::NonBlockingResult; +use tungstenite::{accept_hdr}; use crossbeam_channel::{unbounded, Receiver}; @@ -136,9 +138,11 @@ pub fn start(pool: PgPool, psr: Receiver) { let ws_pool = pool.clone(); let ws_psr = psr.clone(); spawn(move || { - let (acc_s, acc_r) = unbounded(); + let nb_stream = stream.unwrap(); + nb_stream.set_nonblocking(true).unwrap(); + // search through the ws request for the auth cookie let cb = |req: &Request| { let err = || ErrorResponse { @@ -163,7 +167,7 @@ pub fn start(pool: PgPool, psr: Receiver) { Ok(None) }; - let mut websocket = accept_hdr(stream.unwrap(), cb).unwrap(); + let mut websocket = accept_hdr(nb_stream, cb).unwrap(); // get a copy of the account let account = match acc_r.recv().unwrap() { @@ -206,54 +210,56 @@ pub fn start(pool: PgPool, psr: Receiver) { }; loop { - match websocket.read_message() { + match websocket.read_message().no_block() { Ok(msg) => { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = ws_pool.get() - .expect("unable to get db connection"); + if let Some(msg) = msg { + match msg { + Binary(data) => { + let begin = Instant::now(); + let db_connection = ws_pool.get() + .expect("unable to get db connection"); - match rpc::receive(data, &db_connection, &mut websocket, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); + match rpc::receive(data, &db_connection, &mut websocket, begin, &account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - debug!("{:?}", e); - return; - }; + if let Err(e) = websocket.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; - subs.update(&reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); + subs.update(&reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - debug!("{:?}", e); - return; - }; + if let Err(e) = websocket.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; + } } - } - }, - _ => (), - } + }, + _ => (), + } + }; + + match ws_psr.try_recv() { + Ok(n) => handle_message(&subs, n, &mut websocket), + Err(_) => (), + }; }, // connection is closed Err(e) => { - debug!("{:?}", e); + warn!("{:?}", e); return; } }; - - match ws_psr.try_recv() { - Ok(n) => handle_message(&subs, n, &mut websocket), - Err(_) => (), - }; } }); }