fix blocking mesasges

This commit is contained in:
ntr 2019-07-22 16:21:43 +10:00
parent 5275c26ac8
commit ca16090555

View File

@ -7,11 +7,13 @@ use uuid::Uuid;
use cookie::Cookie; use cookie::Cookie;
use tungstenite::server::accept_hdr;
use tungstenite::Message::Binary; use tungstenite::Message::Binary;
use tungstenite::handshake::server::{Request, ErrorResponse}; use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::handshake::HandshakeRole;
use tungstenite::http::StatusCode; use tungstenite::http::StatusCode;
use tungstenite::protocol::WebSocket; use tungstenite::protocol::WebSocket;
use tungstenite::util::NonBlockingResult;
use tungstenite::{accept_hdr};
use crossbeam_channel::{unbounded, Receiver}; use crossbeam_channel::{unbounded, Receiver};
@ -136,9 +138,11 @@ pub fn start(pool: PgPool, psr: Receiver<Message>) {
let ws_pool = pool.clone(); let ws_pool = pool.clone();
let ws_psr = psr.clone(); let ws_psr = psr.clone();
spawn(move || { spawn(move || {
let (acc_s, acc_r) = unbounded(); let (acc_s, acc_r) = unbounded();
let nb_stream = stream.unwrap();
nb_stream.set_nonblocking(true).unwrap();
// search through the ws request for the auth cookie // search through the ws request for the auth cookie
let cb = |req: &Request| { let cb = |req: &Request| {
let err = || ErrorResponse { let err = || ErrorResponse {
@ -163,7 +167,7 @@ pub fn start(pool: PgPool, psr: Receiver<Message>) {
Ok(None) Ok(None)
}; };
let mut websocket = accept_hdr(stream.unwrap(), cb).unwrap(); let mut websocket = accept_hdr(nb_stream, cb).unwrap();
// get a copy of the account // get a copy of the account
let account = match acc_r.recv().unwrap() { let account = match acc_r.recv().unwrap() {
@ -206,54 +210,56 @@ pub fn start(pool: PgPool, psr: Receiver<Message>) {
}; };
loop { loop {
match websocket.read_message() { match websocket.read_message().no_block() {
Ok(msg) => { Ok(msg) => {
match msg { if let Some(msg) = msg {
Binary(data) => { match msg {
let begin = Instant::now(); Binary(data) => {
let db_connection = ws_pool.get() let begin = Instant::now();
.expect("unable to get db connection"); let db_connection = ws_pool.get()
.expect("unable to get db connection");
match rpc::receive(data, &db_connection, &mut websocket, begin, &account) { match rpc::receive(data, &db_connection, &mut websocket, begin, &account) {
Ok(reply) => { Ok(reply) => {
let response = to_vec(&reply) let response = to_vec(&reply)
.expect("failed to serialize response"); .expect("failed to serialize response");
if let Err(e) = websocket.write_message(Binary(response)) { if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed // connection closed
debug!("{:?}", e); warn!("{:?}", e);
return; return;
}; };
subs.update(&reply).unwrap(); subs.update(&reply).unwrap();
}, },
Err(e) => { Err(e) => {
warn!("{:?}", e); warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() }) let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response"); .expect("failed to serialize error response");
if let Err(e) = websocket.write_message(Binary(response)) { if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed // connection closed
debug!("{:?}", e); warn!("{:?}", e);
return; return;
}; };
}
} }
} },
}, _ => (),
_ => (), }
} };
match ws_psr.try_recv() {
Ok(n) => handle_message(&subs, n, &mut websocket),
Err(_) => (),
};
}, },
// connection is closed // connection is closed
Err(e) => { Err(e) => {
debug!("{:?}", e); warn!("{:?}", e);
return; return;
} }
}; };
match ws_psr.try_recv() {
Ok(n) => handle_message(&subs, n, &mut websocket),
Err(_) => (),
};
} }
}); });
} }