diff --git a/server/Cargo.toml b/server/Cargo.toml index 2ad706a4..1a238de3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -3,13 +3,6 @@ name = "mnml" version = "0.1.0" authors = ["ntr "] -# makes sure to include openssl links in runtime path -# [profile.release] -# rpath = true - -# [profile.dev] -# rpath = true - [dependencies] serde = "1" serde_derive = "1" @@ -37,11 +30,7 @@ bodyparser = "0.8" persistent = "0.4" router = "0.6" cookie = "0.12" -tungstenite = "0.8" crossbeam-channel = "0.3" +ws = "0.8" stripe-rust = { version = "0.10.4", features = ["webhooks"] } - -# [patch.crates-io] -# stripe-rust = { git = "https://github.com/margh/stripe-rs.git" } - diff --git a/server/src/actixnet.rs b/server/src/actixnet.rs deleted file mode 100644 index cbab4b5b..00000000 --- a/server/src/actixnet.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::env; -use std::thread::{spawn, sleep}; -use std::time::{Duration}; - -use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; -use actix_web::error::ResponseError; -use actix_web::http::{Cookie}; -use actix_web::cookie::{SameSite}; -use actix_cors::Cors; - -use r2d2::{Pool}; -use r2d2::{PooledConnection}; -use r2d2_postgres::{TlsMode, PostgresConnectionManager}; - -use warden::{warden}; -use pubsub::{pg_listen}; -use ws::{connect}; -use account; -use payments::{post_stripe_event}; - -pub type Db = PooledConnection; -pub type PgPool = Pool; - -const DB_POOL_SIZE: u32 = 20; - -#[derive(Debug,Clone,Serialize,Deserialize)] -pub struct JsonError { - pub err: String -} - -#[derive(Fail, Debug, Serialize, Deserialize)] -pub enum MnmlHttpError { - // User Facing Errors - #[fail(display="internal server error")] - ServerError, - #[fail(display="unauthorized")] - Unauthorized, - #[fail(display="bad request")] - BadRequest, - #[fail(display="account name taken or invalid")] - AccountNameTaken, - #[fail(display="password unacceptable. must be > 11 characters")] - PasswordUnacceptable, - #[fail(display="invalid code. https://discord.gg/YJJgurM")] - InvalidCode, -} - -impl ResponseError for MnmlHttpError { - fn error_response(&self) -> HttpResponse { - match *self { - MnmlHttpError::ServerError => HttpResponse::InternalServerError() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::BadRequest => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) // 1 week aligns with db set - .finish()) - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::InvalidCode => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - } - } -} - -fn login_res(token: String) -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", token) - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set - .finish()) - .finish() -} - -fn logout_res() -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) - .finish()) - .finish() -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountLoginParams { - name: String, - password: String, -} - -fn login(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::login(&mut tx, ¶ms.name, ¶ms.password) { - Ok(a) => { - let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::Unauthorized) - } - } -} - -fn logout(r: HttpRequest, state: web::Data) -> Result { - match r.cookie("x-auth-token") { - Some(t) => { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - match account::from_token(&mut tx, t.value().to_string()) { - Ok(a) => { - account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - return Ok(logout_res()); - }, - Err(_) => Err(MnmlHttpError::Unauthorized), - } - }, - None => Err(MnmlHttpError::Unauthorized), - } -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountRegisterParams { - name: String, - password: String, - code: String, -} - -fn register(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { - Ok(token) => { - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::BadRequest) - } - } -} - -fn create_pool(url: String) -> Pool { - let manager = PostgresConnectionManager::new(url, TlsMode::None) - .expect("could not instantiate pg manager"); - - Pool::builder() - .max_size(DB_POOL_SIZE) - .build(manager) - .expect("Failed to create pool.") -} - -pub struct State { - pub pool: PgPool, - // pub pubsub: PubSub, -} - -pub fn start() { - let database_url = env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"); - let pool = create_pool(database_url); - - let warden_pool = pool.clone(); - spawn(move || { - loop { - let db_connection = warden_pool.get().expect("unable to get db connection"); - if let Err(e) = warden(db_connection) { - info!("{:?}", e); - } - sleep(Duration::new(1, 0)); - } - }); - - let pubsub_pool = pool.clone(); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); - - HttpServer::new(move || App::new() - .data(State { pool: pool.clone() }) - .wrap(middleware::Logger::default()) - .wrap(Cors::new().supports_credentials()) - .service(web::resource("/api/login").route(web::post().to(login))) - .service(web::resource("/api/logout").route(web::post().to(logout))) - .service(web::resource("/api/register").route(web::post().to(register))) - - .service(web::resource("/api/payments/stripe") - .route(web::post().to(post_stripe_event))) - - // .service(web::resource("/api/payments/crypto") - // .route(web::post().to(post_stripe_payment))) - - .service(web::resource("/api/ws").route(web::get().to(connect)))) - .bind("127.0.0.1:40000").expect("could not bind to port") - .run().expect("could not start http server"); -} diff --git a/server/src/events.rs b/server/src/events.rs new file mode 100644 index 00000000..79bc30d9 --- /dev/null +++ b/server/src/events.rs @@ -0,0 +1,239 @@ +use std::collections::{HashMap, HashSet}; + +// Db Commons +use uuid::Uuid; + +use failure::Error; + +use crossbeam_channel::{unbounded, Sender, Receiver}; + +use account; +use account::Account; +use game; +use instance; +use pg::{Db, PgPool}; +use rpc::RpcMessage; + +type Id = usize; + +pub struct Events { + pub tx: Sender, + rx: Receiver, + pool: PgPool, + + clients: HashMap, +} + +#[derive(Debug,Clone)] +pub enum Event { + Connect(Id, Option, Sender), + Disconnect(Id), + Subscribe(Id, Uuid), + Unsubscribe(Id, Uuid), + + Push(Uuid, RpcMessage), +} + +struct WsClient { + id: Id, + tx: Sender, + subs: HashSet, +} + +impl Events { + pub fn new(pool: PgPool) -> Events { + let (tx, rx) = unbounded(); + Events { + tx, + rx, + pool, + clients: HashMap::new(), + } + } + + pub fn listen(&mut self) -> Result<(), Error> { + loop { + match self.rx.recv() { + Ok(m) => { + self.on_event(m)?; + }, + + // idk if this is a good idea + // possibly just log errors and continue... + Err(e) => { + return Err(format_err!("events error err={:?}", e)); + }, + }; + } + } + + fn remove_client(&mut self, id: Id) { + self.clients.remove(&id); + } + + fn on_event(&mut self, msg: Event) -> Result<(), Error> { + match msg { + Event::Connect(id, account, tx) => { + info!("client connected to events id={:?} account={:?}", id, account); + + let client = WsClient { id, tx, subs: HashSet::new() }; + self.clients.insert(id, client); + + info!("events clients={:?}", self.clients.len()); + Ok(()) + }, + Event::Disconnect(id) => { + info!("client disconnected from events id={:?}", id); + + self.clients.remove(&id); + + info!("events clients={:?}", self.clients.len()); + Ok(()) + } + Event::Subscribe(id, obj) => { + info!("client subscribed to updates from object id={:?} object={:?}", id, obj); + + match self.clients.get_mut(&id) { + Some(client) => { + client.subs.insert(obj); + info!("client subscriptions={:?}", client.subs.len()); + Ok(()) + }, + None => return Err(format_err!("unknown client {:?}", id)) + } + }, + Event::Unsubscribe(id, obj) => { + info!("client subscribed to updates from object id={:?} object={:?}", id, obj); + + match self.clients.get_mut(&id) { + Some(mut client) => { + client.subs.remove(&obj); + info!("client subscriptions={:?}", client.subs.len()); + Ok(()) + }, + None => return Err(format_err!("unknown client {:?}", id)) + } + }, + + Event::Push(id, msg) => { + info!("events received push notification id={:?} msg={:?}", id, msg); + + let mut subs = 0; + for (_client_id, client) in self.clients.iter() { + if client.subs.contains(&id) { + subs += 1; + match client.tx.send(msg.clone()) { + Ok(_) => (), + Err(e) => { + warn!("unable to send msg to client err={:?}", e); + // self.remove_client(*client_id); + }, + }; + } + } + + info!("notification subscribers={:?}", subs); + + Ok(()) + }, + } + } +} + +// #[derive(Debug)] +// struct Subscriptions { +// account: Option, +// game: Option, +// instance: Option, +// // account_instances: Vec, +// } + +// impl Subscriptions { +// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { +// if let Some(a) = account { +// let db = ws_pool.get()?; +// let mut tx = db.transaction()?; + +// // send account constructs +// let account_constructs = account::account_constructs(&mut tx, a)?; +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; + +// // get account instances +// // and send them to the client +// let account_instances = account::account_instances(&mut tx, a)?; +// // let instances = account_instances.iter().map(|i| i.id).collect::>(); +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; + +// // get players +// // add to games +// tx.commit()?; + +// return Ok(Subscriptions { +// account: Some(a.id), +// game: None, +// instance: None, +// }) +// } + +// Ok(Subscriptions { +// account: None, +// game: None, +// instance: None +// }) +// } + +// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { +// match msg { +// RpcMessage::AccountState(a) => self.account = Some(a.id), +// RpcMessage::InstanceState(i) => self.instance = Some(i.id), +// RpcMessage::GameState(g) => self.game = Some(g.id), +// _ => (), +// }; + +// // info!("subscriptions updated {:?}", self); + +// Ok(self) +// } +// } + + +// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { +// if let Some(msg) = match m { +// Message::Account(a) => { +// match subs.account { +// Some(wsa) => match wsa == a.id { +// true => Some(rpc::RpcMessage::AccountState(a)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Instance(i) => { +// match subs.instance { +// Some(ci) => match ci == i.id { +// true => Some(rpc::RpcMessage::InstanceState(i)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Game(g) => { +// match subs.game { +// Some(cg) => match cg == g.id { +// true => Some(rpc::RpcMessage::GameState(g)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Connect(tx) => { +// info!("client connected {:?}", tx); +// None +// }, +// // _ => None, +// } { +// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); +// } +// } + + diff --git a/server/src/main.rs b/server/src/main.rs index 7434ad6f..a53119bb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -24,7 +24,8 @@ extern crate bodyparser; extern crate persistent; extern crate router; extern crate cookie; -extern crate tungstenite; + +extern crate ws; extern crate crossbeam_channel; mod account; @@ -41,22 +42,20 @@ mod net; mod payments; mod pg; mod player; -mod pubsub; +mod events; mod rpc; mod skill; mod spec; mod util; mod vbox; mod warden; -mod ws; +mod websocket; use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use crossbeam_channel::{unbounded}; - -use pubsub::pg_listen; +use events::Events; use warden::warden; fn setup_logger() -> Result<(), fern::InitError> { @@ -85,13 +84,7 @@ fn main() { let pool = pg::create_pool(); - let ws_pool = pool.clone(); - let http_pool = pool.clone(); let warden_pool = pool.clone(); - let pubsub_pool = pool.clone(); - - let (pss, psr) = unbounded(); - spawn(move || { loop { let db_connection = warden_pool.get().expect("unable to get db connection"); @@ -102,15 +95,21 @@ fn main() { } }); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn, pss.clone()) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); - + let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - ws::start(ws_pool, psr); - info!("server started"); + + // create a clone of the tx so ws handler can tell events + // about connection status + // TODO store as an Arc> and make cpuN threads + let mut events = events::Events::new(pool.clone()); + let ws_events_tx = events.tx.clone(); + + let pg_pool = pool.clone(); + let pg_events = events.tx.clone(); + spawn(move || pg::listen(pg_pool, pg_events)); + spawn(move || events.listen()); + + // the main thread becomes this ws listener + let ws_pool = pool.clone(); + websocket::start(ws_pool, ws_events_tx); } diff --git a/server/src/net.rs b/server/src/net.rs index 12bae55a..b67abe78 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -12,7 +12,7 @@ use router::Router; use serde::{Serialize, Deserialize}; // use warden::{warden}; -// use pubsub::{pg_listen}; +// use events::{pg_listen}; // use ws::{connect}; use account; use pg::PgPool; @@ -103,7 +103,7 @@ impl From for IronError { MnmlHttpError::AccountNotFound | MnmlHttpError::BadRequest | MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest), - + MnmlHttpError::PasswordNotMatch | MnmlHttpError::InvalidCode | MnmlHttpError::TokenDoesNotMatch | @@ -260,7 +260,7 @@ const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10; pub struct State { pub pool: PgPool, - // pub pubsub: PubSub, + // pub events: Events, } impl Key for State { type Value = State; } diff --git a/server/src/pg.rs b/server/src/pg.rs index 0b0fe2db..88e05a87 100644 --- a/server/src/pg.rs +++ b/server/src/pg.rs @@ -1,16 +1,53 @@ use std::env; +use std::thread::spawn; + +use uuid::Uuid; + +use failure::Error; use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; +use fallible_iterator::{FallibleIterator}; -// use postgres::transaction::Transaction; +use crossbeam_channel::{Sender}; + +use events::{Event}; +use account; +use game; +use instance; +use rpc::RpcMessage; pub type Db = PooledConnection; pub type PgPool = Pool; const DB_POOL_SIZE: u32 = 20; +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "lowercase"))] +enum Table { + Accounts, + Constructs, + Instances, + Mtx, + Players, + Games, +} + +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "UPPERCASE"))] +enum Action { + Insert, + Update, + Delete, +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct Notification { + table: Table, + action: Action, + id: Uuid, +} pub fn create_pool() -> Pool { let url = env::var("DATABASE_URL") @@ -24,3 +61,70 @@ pub fn create_pool() -> Pool { .build(manager) .expect("Failed to create pool.") } + +fn handle_notification(n: Notification, pool: &PgPool, events: &Sender) { + info!("pg received notification={:?}", n); + + // bang out a thread to do the slow work of fetching the state from db + // the thread will notify events + + let pool = pool.clone(); + let events = events.clone(); + spawn(move || { + // maybe we need it + let db = pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); + + let msg = match n.action { + Action::Delete => { + warn!("unimplemented delete notification {:?}", n); + None + }, + Action::Insert => { + warn!("unimplemented insert notification {:?}", n); + None + }, + Action::Update => match n.table { + Table::Accounts => + Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))), + Table::Instances => + Some(Event::Push(n.id, RpcMessage::InstanceState(instance::instance_get(&mut tx, n.id).unwrap()))), + Table::Games => + Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))), + _ => { + warn!("unimplemented update notification {:?}", n); + None + }, + }, + }; + + tx.commit().unwrap(); + + if let Some(msg) = msg { + events.send(msg).unwrap(); + } + }); +} + +// this function gets a dedicated connection +// because it has to subscribe and listen for notifications +pub fn listen(pool: PgPool, events: Sender) -> Result<(), Error> { + let db = pool.get()?; + db.execute("LISTEN events;", &[])?; + info!("pg listening"); + + let notifications = db.notifications(); + let mut n_iter = notifications.blocking_iter(); + + // main event loop, checks pg and checks messages + loop { + // check notifications + let n = n_iter.next()?; + if let Some(n) = n { + match serde_json::from_str::(&n.payload) { + Ok(notification) => handle_notification(notification, &pool, &events), + Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), + }; + } + } +} diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs deleted file mode 100644 index 6a4cd90b..00000000 --- a/server/src/pubsub.rs +++ /dev/null @@ -1,92 +0,0 @@ -// Db Commons -use uuid::Uuid; -use fallible_iterator::{FallibleIterator}; - -use failure::Error; -use failure::err_msg; - -use crossbeam_channel::{Sender}; - -use pg::{Db}; -use account; -use game; -use instance; - -#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] -#[serde(rename_all(deserialize = "lowercase"))] -enum Table { - Accounts, - Constructs, - Instances, - Mtx, - Players, - Games, -} - -#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] -#[serde(rename_all(deserialize = "UPPERCASE"))] -enum Action { - Insert, - Update, - Delete, -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct Notification { - table: Table, - action: Action, - id: Uuid, -} - -#[derive(Debug,Clone)] -pub enum Message { - Account(account::Account), - Game(game::Game), - Instance(instance::Instance), -} - -fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Result<(), Error> { - info!("pubsub received notification notification={:?}", n); - - // maybe we need it - let mut tx = db.transaction()?; - - let msg = match n.action { - Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), - Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), - Action::Update => match n.table { - Table::Accounts => Message::Account(account::select(db, n.id)?), - Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), - Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), - _ => return Err(format_err!("unimplemented update notification {:?}", n)), - }, - }; - - tx.commit()?; - - match pss.try_send(msg.clone()) { - Ok(()) => info!("pubsub message sent message={:?}", msg), - Err(e) => warn!("pubsub delivery failure err={:?}", e), - }; - - Ok(()) -} - -pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> { - db.execute("LISTEN events;", &[])?; - info!("pubsub listening"); - let notifications = db.notifications(); - let mut n_iter = notifications.blocking_iter(); - loop { - let n = n_iter.next().unwrap(); - if let Some(n) = n { - match serde_json::from_str::(&n.payload) { - Ok(notification) => match handle_notification(notification, &db, &pss) { - Ok(()) => (), - Err(e) => warn!("{:?}", e), - } - Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), - }; - } - } -} diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 897f459d..76a610d5 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -7,7 +7,6 @@ use uuid::Uuid; use failure::Error; use failure::err_msg; -use ws::{Ws}; use pg::{Db}; use construct::{Construct}; use game::{Game, game_state, game_skill, game_ready}; @@ -34,6 +33,8 @@ pub enum RpcMessage { Pong(()), DevResolutions(Resolutions), + + Error(String), } #[derive(Debug,Clone,Serialize,Deserialize)] @@ -68,7 +69,7 @@ enum RpcRequest { VboxReclaim { instance_id: Uuid, index: usize }, } -pub fn receive(data: Vec, db: &Db, _client: &mut Ws, begin: Instant, account: &Option) -> Result { +pub fn receive(data: Vec, db: &Db, begin: Instant, account: &Option) -> Result { // cast the msg to this type to receive method name match from_slice::(&data) { Ok(v) => { diff --git a/server/src/websocket.rs b/server/src/websocket.rs new file mode 100644 index 00000000..0c78b977 --- /dev/null +++ b/server/src/websocket.rs @@ -0,0 +1,175 @@ +use std::time::{Instant}; +use std::thread::spawn; +use std::str; + +use rand::prelude::*; + +use serde_cbor::to_vec; + +use cookie::Cookie; + +use crossbeam_channel::{unbounded, Sender as CbSender}; +use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; + +use account; +use account::{Account}; +use pg::{PgPool}; +use events::Event; + +use rpc::{RpcMessage}; +use rpc; +use mtx; +use net::TOKEN_HEADER; + +struct Connection { + pub id: usize, + pub ws: CbSender, + pool: PgPool, + account: Option, + events: CbSender, +} + +// we unwrap everything in here cause really +// we don't care if this panics +// it's run in a thread so it's supposed to bail +// when it encounters errors +impl Handler for Connection { + fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { + info!("websocket connected account={:?}", self.account); + + // tell events we have connected + self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); + + // if user logged in do some prep work + if let Some(ref a) = self.account { + self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); + self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); + + let db = self.pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); + + // send account constructs + let account_constructs = account::account_constructs(&mut tx, a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap(); + + // get account instances + // and send them to the client + let account_instances = account::account_instances(&mut tx, a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap(); + + let shop = mtx::account_shop(&mut tx, &a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap(); + + // tx should do nothing + tx.commit().unwrap(); + } + + Ok(()) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + match msg { + Message::Binary(msg) => { + let begin = Instant::now(); + let db_connection = self.pool.get().unwrap(); + + match rpc::receive(msg, &db_connection, begin, &self.account) { + Ok(reply) => { + // if the user queries the state of something + // we tell events to push updates to them + match reply { + RpcMessage::AccountState(ref v) => + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), + RpcMessage::GameState(ref v) => + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), + RpcMessage::InstanceState(ref v) => + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), + _ => (), + }; + + self.ws.send(reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); + }, + }; + }, + _ => (), + }; + Ok(()) + } + + fn on_close(&mut self, _: CloseCode, _: &str) { + info!("websocket disconnected account={:?}", self.account); + self.events.send(Event::Disconnect(self.id)).unwrap(); + } + + fn on_request(&mut self, req: &Request) -> Result { + let res = Response::from_request(req)?; + + if let Some(cl) = req.header("Cookie") { + let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec())); + let cookie_list = match str::from_utf8(cl) { + Ok(cl) => cl, + Err(_) => return unauth(), + }; + + for s in cookie_list.split(";").map(|s| s.trim()) { + let cookie = match Cookie::parse(s) { + Ok(c) => c, + Err(_) => return unauth(), + }; + + // got auth token + if cookie.name() == TOKEN_HEADER { + let db = self.pool.get().unwrap(); + match account::from_token(&db, cookie.value().to_string()) { + Ok(a) => self.account = Some(a), + Err(_) => return unauth(), + } + } + }; + }; + + Ok(res) + } +} + + +pub fn start(pool: PgPool, events_tx: CbSender) { + let mut rng = thread_rng(); + listen("127.0.0.1:40055", move |out| { + + // we give the tx half to the connection object + // which in turn passes a clone to the events system + // the rx half goes into a thread where it waits for messages + // that need to be delivered to the client + // both the ws message handler and the events thread must use + // this channel to send messages + let (tx, rx) = unbounded::(); + + spawn(move || { + loop { + match rx.recv() { + Ok(n) => { + let response = to_vec(&n).unwrap(); + out.send(Message::Binary(response)).unwrap(); + } + // we done + Err(_e) => { + break; + }, + }; + } + }); + + Connection { + id: rng.gen::(), + account: None, + ws: tx, + pool: pool.clone(), + events: events_tx.clone(), + } + }).unwrap(); +} diff --git a/server/src/ws.rs b/server/src/ws.rs deleted file mode 100644 index f5ab2412..00000000 --- a/server/src/ws.rs +++ /dev/null @@ -1,266 +0,0 @@ -use std::time::{Instant}; -use std::net::{TcpStream, TcpListener}; -use std::thread::{spawn}; -use std::str; - -use uuid::Uuid; - -use cookie::Cookie; - -use tungstenite::Message::Binary; -use tungstenite::handshake::server::{Request, ErrorResponse}; -use tungstenite::handshake::HandshakeRole; -use tungstenite::http::StatusCode; -use tungstenite::protocol::WebSocket; -use tungstenite::util::NonBlockingResult; -use tungstenite::{accept_hdr}; - -use crossbeam_channel::{unbounded, Receiver}; - -use serde_cbor::{to_vec}; - -use failure::Error; -use failure::{err_msg, format_err}; - -use net::TOKEN_HEADER; -use rpc; -use rpc::{RpcMessage}; - -use mtx; -use pg::PgPool; -use account; -use account::Account; -use pubsub::Message; - -pub type Ws = WebSocket; - -#[derive(Debug,Clone,Serialize)] -struct RpcError { - err: String, -} - -#[derive(Debug)] -struct Subscriptions { - account: Option, - game: Option, - instance: Option, - // account_instances: Vec, -} - -impl Subscriptions { - fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { - if let Some(a) = account { - let db = ws_pool.get()?; - let mut tx = db.transaction()?; - - // send account constructs - let account_constructs = account::account_constructs(&mut tx, a)?; - ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; - - // get account instances - // and send them to the client - let account_instances = account::account_instances(&mut tx, a)?; - // let instances = account_instances.iter().map(|i| i.id).collect::>(); - ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; - - // get players - // add to games - - tx.commit()?; - - return Ok(Subscriptions { - account: Some(a.id), - game: None, - instance: None, - }) - } - - Ok(Subscriptions { - account: None, - game: None, - instance: None - }) - } - - fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { - match msg { - RpcMessage::AccountState(a) => self.account = Some(a.id), - RpcMessage::InstanceState(i) => self.instance = Some(i.id), - RpcMessage::GameState(g) => self.game = Some(g.id), - _ => (), - }; - - // info!("subscriptions updated {:?}", self); - - Ok(self) - } -} - - -fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { - if let Some(msg) = match m { - Message::Account(a) => { - match subs.account { - Some(wsa) => match wsa == a.id { - true => Some(rpc::RpcMessage::AccountState(a)), - false => None, - }, - None => None, - } - }, - Message::Instance(i) => { - match subs.instance { - Some(ci) => match ci == i.id { - true => Some(rpc::RpcMessage::InstanceState(i)), - false => None, - }, - None => None, - } - }, - Message::Game(g) => { - match subs.game { - Some(cg) => match cg == g.id { - true => Some(rpc::RpcMessage::GameState(g)), - false => None, - }, - None => None, - } - }, - // _ => None, - } { - ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); - } -} - -pub fn start(pool: PgPool, psr: Receiver) { - let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); - for stream in ws_server.incoming() { - let ws_pool = pool.clone(); - let ws_psr = psr.clone(); - spawn(move || { - let (acc_s, acc_r) = unbounded(); - - let nb_stream = stream.unwrap(); - nb_stream.set_nonblocking(true).unwrap(); - - // search through the ws request for the auth cookie - let cb = |req: &Request| { - let err = || ErrorResponse { - error_code: StatusCode::FORBIDDEN, - headers: None, - body: Some("Unauthorized".into()), - }; - - if let Some(cl) = req.headers.find_first("Cookie") { - let cookie_list = str::from_utf8(cl).or(Err(err()))?; - - for s in cookie_list.split(";").map(|s| s.trim()) { - let cookie = Cookie::parse(s).or(Err(err()))?; - - // got auth token - if cookie.name() == TOKEN_HEADER { - acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; - } - }; - }; - acc_s.send(None).unwrap(); - Ok(None) - }; - - let mut websocket = accept_hdr(nb_stream, cb).unwrap(); - - // get a copy of the account - let account = match acc_r.recv().unwrap() { - Some(t) => { - let db = ws_pool.get() - .expect("unable to get db connection"); - - - match account::from_token(&db, t) { - Ok(a) => { - let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - websocket.write_message(Binary(state)).unwrap(); - - let mut tx = db.transaction().unwrap(); - let shop = mtx::account_shop(&mut tx, &a).unwrap(); - let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - - websocket.write_message(Binary(shop)).unwrap(); - - // tx doesn't change anything - tx.commit().unwrap(); - - Some(a) - }, - Err(e) => { - warn!("{:?}", e); - return; - }, - } - }, - None => None, - }; - - let mut subs = match Subscriptions::new(&ws_pool, &account, &mut websocket) { - Ok(s) => s, - Err(e) => { - warn!("subscriptions error err={:?}", e); - return; - }, - }; - - loop { - match websocket.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = ws_pool.get() - .expect("unable to get db connection"); - - match rpc::receive(data, &db_connection, &mut websocket, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - - subs.update(&reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); - - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } - } - }, - _ => (), - } - }; - - match ws_psr.try_recv() { - Ok(n) => handle_message(&subs, n, &mut websocket), - Err(_) => (), - }; - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } - }); - } -}