diff --git a/server/src/main.rs b/server/src/main.rs index b97cd1c2..0e2f2a16 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -112,9 +112,9 @@ fn main() { // this should go on a thread too? let ws_pool = pool.clone(); - let wsf = ws::start(ws_pool, events); + let wss = ws::server(ws_pool, events); - tokio::runtime::run(wsf.map_err(|_e| ())); + tokio::runtime::run(wss.map_err(|_e| ())); info!("server started"); } diff --git a/server/src/ws.rs b/server/src/ws.rs index 330e3abb..4c79ee9a 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -8,99 +8,41 @@ use cookie::Cookie; use futures::stream::Stream; use futures::Future; use tokio::net::TcpListener; -use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; -use tungstenite::handshake::HandshakeRole; + use tungstenite::http::StatusCode; -use tungstenite::protocol::WebSocket; -use tungstenite::util::NonBlockingResult; + + use tungstenite::protocol::Message; use tokio_tungstenite::accept_hdr_async; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{unbounded}; use serde_cbor::{to_vec}; use std::io::{Error, ErrorKind}; +use futures::Sink; use net::TOKEN_HEADER; use rpc; +use rpc::{RpcMessage}; use mtx; use pg::PgPool; use account; -use account::Account; -use events::{Message as WsMessage, Events}; -// pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { -// let (tx, rx) = unbounded(); -// events.try_send(Message::Connect(tx)).unwrap(); - -// loop { -// match client.read_message().no_block() { -// Ok(msg) => { -// if let Some(msg) = msg { -// match msg { -// Binary(data) => { -// let begin = Instant::now(); -// let db_connection = pool.get() -// .expect("unable to get db connection"); - -// match rpc::receive(data, &db_connection, begin, &account) { -// Ok(reply) => { -// let response = to_vec(&reply) -// .expect("failed to serialize response"); - -// if let Err(e) = client.write_message(Binary(response)) { -// // connection closed -// warn!("{:?}", e); -// return; -// }; - -// // subscriptions.update(&reply).unwrap(); -// }, -// Err(e) => { -// warn!("{:?}", e); -// let response = to_vec(&RpcError { err: e.to_string() }) -// .expect("failed to serialize error response"); - -// if let Err(e) = client.write_message(Binary(response)) { -// // connection closed -// warn!("{:?}", e); -// return; -// }; -// } -// } -// }, -// _ => (), -// } -// }; - -// // match receiver.try_recv() { -// // Ok(n) => handle_message(&subs, n, &mut websocket), -// // Err(_) => (), -// // }; - -// }, -// // connection is closed -// Err(e) => { -// warn!("{:?}", e); -// return; -// } -// }; -// } -// } +use events::{Events}; #[derive(Debug,Clone,Serialize)] struct RpcError { err: String, } -pub fn start(pool: PgPool, events: Events) -> impl Future { +pub fn server(pool: PgPool, events: Events) -> impl Future { let addr = "127.0.0.1:40055".parse().unwrap(); let ws_server = TcpListener::bind(&addr).unwrap(); - let wsf = ws_server.incoming().for_each(move |stream| { + let wss = ws_server.incoming().for_each(move |stream| { let ws_pool = pool.clone(); let events_tx = events.tx.clone(); @@ -133,7 +75,7 @@ pub fn start(pool: PgPool, events: Events) -> impl Future impl Future { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - // client.write_message(Binary(state)).unwrap(); + sink.start_send(Message::Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - // client.write_message(Binary(shop)).unwrap(); + sink.start_send(Message::Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); @@ -166,21 +108,57 @@ pub fn start(pool: PgPool, events: Events) -> impl Future None, }; - let ws_reader = stream.for_each(move |message: Message| { - Ok(()) + let (tx, rx) = unbounded(); + events_tx.try_send(WsMessage::Connect(tx)).unwrap(); + + let message_reader = stream + .filter(|msg| msg.is_binary()) + .and_then(|message: Message| { + let begin = Instant::now(); + let db_connection = ws_pool.get() + .expect("unable to get db connection"); + + match rpc::receive(message.into_data(), &db_connection, begin, &account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); + Ok(response); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); + Ok(response); + } + }; + + Ok(sink) + }); + + let events_reader = rx.map(|message: RpcMessage| { + let response = to_vec(&message) + .expect("failed to serialize event"); + Ok(response); }); + let combined = message_reader.select(events_reader); - tokio::spawn(ws_reader.then(move |_| { - println!("Connection closed."); + let outgoing = combined.fold(sink, |mut sink, message| { + sink.start_send(Message::Binary(message)).unwrap(); + Ok(sink) + }); + + // if let Err(e) = sink.start_send(Message::Binary(response)) { + // // connection closed + // warn!("{:?}", e); + // return Err(e); + // }; + tokio::spawn(outgoing.then(move |_| { + info!("connection closed"); Ok(()) })); - - info!("{:?}", account); Ok(()) - // ws(client, ws_pool, account, events_tx) - }) .map_err(|e| { warn!("Error during the websocket handshake occurred: {}", e); @@ -188,5 +166,5 @@ pub fn start(pool: PgPool, events: Events) -> impl Future