diff --git a/server/Cargo.toml b/server/Cargo.toml index e2a27f4e..1a238de3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -30,10 +30,7 @@ bodyparser = "0.8" persistent = "0.4" router = "0.6" cookie = "0.12" -tungstenite = "0.8" -tokio-tungstenite = "0.8" -futures = "0.1" -tokio = "0.1" crossbeam-channel = "0.3" +ws = "0.8" stripe-rust = { version = "0.10.4", features = ["webhooks"] } diff --git a/server/src/main.rs b/server/src/main.rs index 0e2f2a16..ebb549ce 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -25,12 +25,7 @@ extern crate persistent; extern crate router; extern crate cookie; -extern crate futures; -extern crate tokio; - -extern crate tokio_tungstenite; -extern crate tungstenite; - +extern crate ws; extern crate crossbeam_channel; mod account; @@ -54,14 +49,12 @@ mod spec; mod util; mod vbox; mod warden; -mod ws; +mod websocket; use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use futures::Future; - use events::{start as events_start}; use warden::warden; @@ -110,11 +103,8 @@ fn main() { let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - // this should go on a thread too? let ws_pool = pool.clone(); - let wss = ws::server(ws_pool, events); - - tokio::runtime::run(wss.map_err(|_e| ())); + websocket::start(ws_pool, events); info!("server started"); } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 3bbcec7d..76a610d5 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -33,6 +33,8 @@ pub enum RpcMessage { Pong(()), DevResolutions(Resolutions), + + Error(String), } #[derive(Debug,Clone,Serialize,Deserialize)] diff --git a/server/src/ws.rs b/server/src/ws.rs deleted file mode 100644 index 4c79ee9a..00000000 --- a/server/src/ws.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::time::{Instant}; -use std::str; - -use uuid::Uuid; - -use cookie::Cookie; - -use futures::stream::Stream; -use futures::Future; -use tokio::net::TcpListener; -use tungstenite::handshake::server::{Request, ErrorResponse}; - -use tungstenite::http::StatusCode; - - -use tungstenite::protocol::Message; -use tokio_tungstenite::accept_hdr_async; - -use crossbeam_channel::{unbounded}; - -use serde_cbor::{to_vec}; - -use std::io::{Error, ErrorKind}; -use futures::Sink; - -use net::TOKEN_HEADER; -use rpc; -use rpc::{RpcMessage}; - -use mtx; -use pg::PgPool; -use account; - -use events::{Events}; - -#[derive(Debug,Clone,Serialize)] -struct RpcError { - err: String, -} - -pub fn server(pool: PgPool, events: Events) -> impl Future { - let addr = "127.0.0.1:40055".parse().unwrap(); - let ws_server = TcpListener::bind(&addr).unwrap(); - - let wss = ws_server.incoming().for_each(move |stream| { - let ws_pool = pool.clone(); - let events_tx = events.tx.clone(); - - let (acc_s, acc_r) = unbounded(); - - // search through the ws request for the auth cookie - let cb = move |req: &Request| { - let err = || ErrorResponse { - error_code: StatusCode::FORBIDDEN, - headers: None, - body: Some("Unauthorized".into()), - }; - - if let Some(cl) = req.headers.find_first("Cookie") { - let cookie_list = str::from_utf8(cl).or(Err(err()))?; - - for s in cookie_list.split(";").map(|s| s.trim()) { - let cookie = Cookie::parse(s).or(Err(err()))?; - - // got auth token - if cookie.name() == TOKEN_HEADER { - acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; - } - }; - }; - acc_s.send(None).unwrap(); - Ok(None) - }; - - accept_hdr_async(stream, cb).and_then(move |ws_stream| { - info!("new connection"); - - let (mut sink, stream) = ws_stream.split(); - - // get a copy of the account - let account = match acc_r.recv().unwrap() { - Some(t) => { - let db = ws_pool.get() - .expect("unable to get db connection"); - - match account::from_token(&db, t) { - Ok(a) => { - let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - sink.start_send(Message::Binary(state)).unwrap(); - - let mut tx = db.transaction().unwrap(); - let shop = mtx::account_shop(&mut tx, &a).unwrap(); - let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - - sink.start_send(Message::Binary(shop)).unwrap(); - - // tx doesn't change anything - tx.commit().unwrap(); - - Some(a) - }, - Err(e) => { - warn!("{:?}", e); - None - }, - } - }, - None => None, - }; - - let (tx, rx) = unbounded(); - events_tx.try_send(WsMessage::Connect(tx)).unwrap(); - - let message_reader = stream - .filter(|msg| msg.is_binary()) - .and_then(|message: Message| { - let begin = Instant::now(); - let db_connection = ws_pool.get() - .expect("unable to get db connection"); - - match rpc::receive(message.into_data(), &db_connection, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - Ok(response); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); - Ok(response); - } - }; - - Ok(sink) - }); - - let events_reader = rx.map(|message: RpcMessage| { - let response = to_vec(&message) - .expect("failed to serialize event"); - Ok(response); - }); - - let combined = message_reader.select(events_reader); - - let outgoing = combined.fold(sink, |mut sink, message| { - sink.start_send(Message::Binary(message)).unwrap(); - Ok(sink) - }); - - // if let Err(e) = sink.start_send(Message::Binary(response)) { - // // connection closed - // warn!("{:?}", e); - // return Err(e); - // }; - tokio::spawn(outgoing.then(move |_| { - info!("connection closed"); - Ok(()) - })); - - Ok(()) - }) - .map_err(|e| { - warn!("Error during the websocket handshake occurred: {}", e); - Error::new(ErrorKind::Other, e) - }) - }); - - return wss; -}