diff --git a/server/Cargo.toml b/server/Cargo.toml index 2ad706a4..e2a27f4e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -3,13 +3,6 @@ name = "mnml" version = "0.1.0" authors = ["ntr "] -# makes sure to include openssl links in runtime path -# [profile.release] -# rpath = true - -# [profile.dev] -# rpath = true - [dependencies] serde = "1" serde_derive = "1" @@ -38,10 +31,9 @@ persistent = "0.4" router = "0.6" cookie = "0.12" tungstenite = "0.8" +tokio-tungstenite = "0.8" +futures = "0.1" +tokio = "0.1" crossbeam-channel = "0.3" stripe-rust = { version = "0.10.4", features = ["webhooks"] } - -# [patch.crates-io] -# stripe-rust = { git = "https://github.com/margh/stripe-rs.git" } - diff --git a/server/src/events.rs b/server/src/events.rs index d71adfba..bb1ab020 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -144,3 +144,100 @@ pub fn start(pool: PgPool, mut events: Events) { } } } + + +// #[derive(Debug)] +// struct Subscriptions { +// account: Option, +// game: Option, +// instance: Option, +// // account_instances: Vec, +// } + +// impl Subscriptions { +// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { +// if let Some(a) = account { +// let db = ws_pool.get()?; +// let mut tx = db.transaction()?; + +// // send account constructs +// let account_constructs = account::account_constructs(&mut tx, a)?; +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; + +// // get account instances +// // and send them to the client +// let account_instances = account::account_instances(&mut tx, a)?; +// // let instances = account_instances.iter().map(|i| i.id).collect::>(); +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; + +// // get players +// // add to games +// tx.commit()?; + +// return Ok(Subscriptions { +// account: Some(a.id), +// game: None, +// instance: None, +// }) +// } + +// Ok(Subscriptions { +// account: None, +// game: None, +// instance: None +// }) +// } + +// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { +// match msg { +// RpcMessage::AccountState(a) => self.account = Some(a.id), +// RpcMessage::InstanceState(i) => self.instance = Some(i.id), +// RpcMessage::GameState(g) => self.game = Some(g.id), +// _ => (), +// }; + +// // info!("subscriptions updated {:?}", self); + +// Ok(self) +// } +// } + + +// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { +// if let Some(msg) = match m { +// Message::Account(a) => { +// match subs.account { +// Some(wsa) => match wsa == a.id { +// true => Some(rpc::RpcMessage::AccountState(a)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Instance(i) => { +// match subs.instance { +// Some(ci) => match ci == i.id { +// true => Some(rpc::RpcMessage::InstanceState(i)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Game(g) => { +// match subs.game { +// Some(cg) => match cg == g.id { +// true => Some(rpc::RpcMessage::GameState(g)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Connect(tx) => { +// info!("client connected {:?}", tx); +// None +// }, +// // _ => None, +// } { +// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); +// } +// } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index e6ea1b01..b97cd1c2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -24,7 +24,13 @@ extern crate bodyparser; extern crate persistent; extern crate router; extern crate cookie; + +extern crate futures; +extern crate tokio; + +extern crate tokio_tungstenite; extern crate tungstenite; + extern crate crossbeam_channel; mod account; @@ -54,6 +60,8 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; +use futures::Future; + use events::{start as events_start}; use warden::warden; @@ -104,6 +112,9 @@ fn main() { // this should go on a thread too? let ws_pool = pool.clone(); - ws::start(ws_pool, events); + let wsf = ws::start(ws_pool, events); + + tokio::runtime::run(wsf.map_err(|_e| ())); + info!("server started"); } diff --git a/server/src/ws.rs b/server/src/ws.rs index 1ffcc887..330e3abb 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -1,26 +1,27 @@ use std::time::{Instant}; -use std::net::{TcpStream, TcpListener}; -use std::thread::{spawn}; use std::str; use uuid::Uuid; use cookie::Cookie; +use futures::stream::Stream; +use futures::Future; +use tokio::net::TcpListener; use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; use tungstenite::handshake::HandshakeRole; use tungstenite::http::StatusCode; use tungstenite::protocol::WebSocket; use tungstenite::util::NonBlockingResult; -use tungstenite::{accept_hdr}; +use tungstenite::protocol::Message; +use tokio_tungstenite::accept_hdr_async; use crossbeam_channel::{unbounded, Receiver, Sender}; use serde_cbor::{to_vec}; -use failure::Error; -use failure::{err_msg, format_err}; +use std::io::{Error, ErrorKind}; use net::TOKEN_HEADER; use rpc; @@ -29,205 +30,110 @@ use mtx; use pg::PgPool; use account; use account::Account; -use events::{Message, Events}; +use events::{Message as WsMessage, Events}; -pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { - let (tx, rx) = unbounded(); - events.try_send(Message::Connect(tx)).unwrap(); +// pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { +// let (tx, rx) = unbounded(); +// events.try_send(Message::Connect(tx)).unwrap(); - loop { - match client.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = pool.get() - .expect("unable to get db connection"); +// loop { +// match client.read_message().no_block() { +// Ok(msg) => { +// if let Some(msg) = msg { +// match msg { +// Binary(data) => { +// let begin = Instant::now(); +// let db_connection = pool.get() +// .expect("unable to get db connection"); - match rpc::receive(data, &db_connection, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); +// match rpc::receive(data, &db_connection, begin, &account) { +// Ok(reply) => { +// let response = to_vec(&reply) +// .expect("failed to serialize response"); - if let Err(e) = client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; +// if let Err(e) = client.write_message(Binary(response)) { +// // connection closed +// warn!("{:?}", e); +// return; +// }; - // subscriptions.update(&reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); +// // subscriptions.update(&reply).unwrap(); +// }, +// Err(e) => { +// warn!("{:?}", e); +// let response = to_vec(&RpcError { err: e.to_string() }) +// .expect("failed to serialize error response"); - if let Err(e) = client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } - } - }, - _ => (), - } - }; +// if let Err(e) = client.write_message(Binary(response)) { +// // connection closed +// warn!("{:?}", e); +// return; +// }; +// } +// } +// }, +// _ => (), +// } +// }; - // match receiver.try_recv() { - // Ok(n) => handle_message(&subs, n, &mut websocket), - // Err(_) => (), - // }; +// // match receiver.try_recv() { +// // Ok(n) => handle_message(&subs, n, &mut websocket), +// // Err(_) => (), +// // }; - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } -} +// }, +// // connection is closed +// Err(e) => { +// warn!("{:?}", e); +// return; +// } +// }; +// } +// } #[derive(Debug,Clone,Serialize)] struct RpcError { err: String, } -// #[derive(Debug)] -// struct Subscriptions { -// account: Option, -// game: Option, -// instance: Option, -// // account_instances: Vec, -// } +pub fn start(pool: PgPool, events: Events) -> impl Future { + let addr = "127.0.0.1:40055".parse().unwrap(); + let ws_server = TcpListener::bind(&addr).unwrap(); -// impl Subscriptions { -// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { -// if let Some(a) = account { -// let db = ws_pool.get()?; -// let mut tx = db.transaction()?; - -// // send account constructs -// let account_constructs = account::account_constructs(&mut tx, a)?; -// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; - -// // get account instances -// // and send them to the client -// let account_instances = account::account_instances(&mut tx, a)?; -// // let instances = account_instances.iter().map(|i| i.id).collect::>(); -// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; - -// // get players -// // add to games -// tx.commit()?; - -// return Ok(Subscriptions { -// account: Some(a.id), -// game: None, -// instance: None, -// }) -// } - -// Ok(Subscriptions { -// account: None, -// game: None, -// instance: None -// }) -// } - -// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { -// match msg { -// RpcMessage::AccountState(a) => self.account = Some(a.id), -// RpcMessage::InstanceState(i) => self.instance = Some(i.id), -// RpcMessage::GameState(g) => self.game = Some(g.id), -// _ => (), -// }; - -// // info!("subscriptions updated {:?}", self); - -// Ok(self) -// } -// } - - -// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { -// if let Some(msg) = match m { -// Message::Account(a) => { -// match subs.account { -// Some(wsa) => match wsa == a.id { -// true => Some(rpc::RpcMessage::AccountState(a)), -// false => None, -// }, -// None => None, -// } -// }, -// Message::Instance(i) => { -// match subs.instance { -// Some(ci) => match ci == i.id { -// true => Some(rpc::RpcMessage::InstanceState(i)), -// false => None, -// }, -// None => None, -// } -// }, -// Message::Game(g) => { -// match subs.game { -// Some(cg) => match cg == g.id { -// true => Some(rpc::RpcMessage::GameState(g)), -// false => None, -// }, -// None => None, -// } -// }, -// Message::Connect(tx) => { -// info!("client connected {:?}", tx); -// None -// }, -// // _ => None, -// } { -// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); -// } -// } - -pub fn start(pool: PgPool, events: Events) { - let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); - for stream in ws_server.incoming() { + let wsf = ws_server.incoming().for_each(move |stream| { let ws_pool = pool.clone(); let events_tx = events.tx.clone(); - spawn(move || { - let (acc_s, acc_r) = unbounded(); + let (acc_s, acc_r) = unbounded(); - let nb_stream = stream.unwrap(); - nb_stream.set_nonblocking(true).unwrap(); - - // search through the ws request for the auth cookie - let cb = |req: &Request| { - let err = || ErrorResponse { - error_code: StatusCode::FORBIDDEN, - headers: None, - body: Some("Unauthorized".into()), - }; - - if let Some(cl) = req.headers.find_first("Cookie") { - let cookie_list = str::from_utf8(cl).or(Err(err()))?; - - for s in cookie_list.split(";").map(|s| s.trim()) { - let cookie = Cookie::parse(s).or(Err(err()))?; - - // got auth token - if cookie.name() == TOKEN_HEADER { - acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; - } - }; - }; - acc_s.send(None).unwrap(); - Ok(None) + // search through the ws request for the auth cookie + let cb = move |req: &Request| { + let err = || ErrorResponse { + error_code: StatusCode::FORBIDDEN, + headers: None, + body: Some("Unauthorized".into()), }; - let mut client = accept_hdr(nb_stream, cb).unwrap(); + if let Some(cl) = req.headers.find_first("Cookie") { + let cookie_list = str::from_utf8(cl).or(Err(err()))?; + + for s in cookie_list.split(";").map(|s| s.trim()) { + let cookie = Cookie::parse(s).or(Err(err()))?; + + // got auth token + if cookie.name() == TOKEN_HEADER { + acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; + } + }; + }; + acc_s.send(None).unwrap(); + Ok(None) + }; + + accept_hdr_async(stream, cb).and_then(move |ws_stream| { + info!("new connection"); + + let (sink, stream) = ws_stream.split(); // get a copy of the account let account = match acc_r.recv().unwrap() { @@ -235,17 +141,16 @@ pub fn start(pool: PgPool, events: Events) { let db = ws_pool.get() .expect("unable to get db connection"); - match account::from_token(&db, t) { Ok(a) => { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - client.write_message(Binary(state)).unwrap(); + // client.write_message(Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - client.write_message(Binary(shop)).unwrap(); + // client.write_message(Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); @@ -254,14 +159,34 @@ pub fn start(pool: PgPool, events: Events) { }, Err(e) => { warn!("{:?}", e); - return; + None }, } }, None => None, }; - ws(client, ws_pool, account, events_tx) - }); - } + let ws_reader = stream.for_each(move |message: Message| { + Ok(()) + }); + + + tokio::spawn(ws_reader.then(move |_| { + println!("Connection closed."); + Ok(()) + })); + + + info!("{:?}", account); + Ok(()) + // ws(client, ws_pool, account, events_tx) + + }) + .map_err(|e| { + warn!("Error during the websocket handshake occurred: {}", e); + Error::new(ErrorKind::Other, e) + }) + }); + + return wsf; }