diff --git a/server/src/actixnet.rs b/server/src/actixnet.rs deleted file mode 100644 index cbab4b5b..00000000 --- a/server/src/actixnet.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::env; -use std::thread::{spawn, sleep}; -use std::time::{Duration}; - -use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; -use actix_web::error::ResponseError; -use actix_web::http::{Cookie}; -use actix_web::cookie::{SameSite}; -use actix_cors::Cors; - -use r2d2::{Pool}; -use r2d2::{PooledConnection}; -use r2d2_postgres::{TlsMode, PostgresConnectionManager}; - -use warden::{warden}; -use pubsub::{pg_listen}; -use ws::{connect}; -use account; -use payments::{post_stripe_event}; - -pub type Db = PooledConnection; -pub type PgPool = Pool; - -const DB_POOL_SIZE: u32 = 20; - -#[derive(Debug,Clone,Serialize,Deserialize)] -pub struct JsonError { - pub err: String -} - -#[derive(Fail, Debug, Serialize, Deserialize)] -pub enum MnmlHttpError { - // User Facing Errors - #[fail(display="internal server error")] - ServerError, - #[fail(display="unauthorized")] - Unauthorized, - #[fail(display="bad request")] - BadRequest, - #[fail(display="account name taken or invalid")] - AccountNameTaken, - #[fail(display="password unacceptable. must be > 11 characters")] - PasswordUnacceptable, - #[fail(display="invalid code. https://discord.gg/YJJgurM")] - InvalidCode, -} - -impl ResponseError for MnmlHttpError { - fn error_response(&self) -> HttpResponse { - match *self { - MnmlHttpError::ServerError => HttpResponse::InternalServerError() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::BadRequest => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) // 1 week aligns with db set - .finish()) - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::InvalidCode => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - } - } -} - -fn login_res(token: String) -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", token) - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set - .finish()) - .finish() -} - -fn logout_res() -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) - .finish()) - .finish() -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountLoginParams { - name: String, - password: String, -} - -fn login(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::login(&mut tx, ¶ms.name, ¶ms.password) { - Ok(a) => { - let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::Unauthorized) - } - } -} - -fn logout(r: HttpRequest, state: web::Data) -> Result { - match r.cookie("x-auth-token") { - Some(t) => { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - match account::from_token(&mut tx, t.value().to_string()) { - Ok(a) => { - account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - return Ok(logout_res()); - }, - Err(_) => Err(MnmlHttpError::Unauthorized), - } - }, - None => Err(MnmlHttpError::Unauthorized), - } -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountRegisterParams { - name: String, - password: String, - code: String, -} - -fn register(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { - Ok(token) => { - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::BadRequest) - } - } -} - -fn create_pool(url: String) -> Pool { - let manager = PostgresConnectionManager::new(url, TlsMode::None) - .expect("could not instantiate pg manager"); - - Pool::builder() - .max_size(DB_POOL_SIZE) - .build(manager) - .expect("Failed to create pool.") -} - -pub struct State { - pub pool: PgPool, - // pub pubsub: PubSub, -} - -pub fn start() { - let database_url = env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"); - let pool = create_pool(database_url); - - let warden_pool = pool.clone(); - spawn(move || { - loop { - let db_connection = warden_pool.get().expect("unable to get db connection"); - if let Err(e) = warden(db_connection) { - info!("{:?}", e); - } - sleep(Duration::new(1, 0)); - } - }); - - let pubsub_pool = pool.clone(); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); - - HttpServer::new(move || App::new() - .data(State { pool: pool.clone() }) - .wrap(middleware::Logger::default()) - .wrap(Cors::new().supports_credentials()) - .service(web::resource("/api/login").route(web::post().to(login))) - .service(web::resource("/api/logout").route(web::post().to(logout))) - .service(web::resource("/api/register").route(web::post().to(register))) - - .service(web::resource("/api/payments/stripe") - .route(web::post().to(post_stripe_event))) - - // .service(web::resource("/api/payments/crypto") - // .route(web::post().to(post_stripe_payment))) - - .service(web::resource("/api/ws").route(web::get().to(connect)))) - .bind("127.0.0.1:40000").expect("could not bind to port") - .run().expect("could not start http server"); -} diff --git a/server/src/pubsub.rs b/server/src/events.rs similarity index 53% rename from server/src/pubsub.rs rename to server/src/events.rs index 38e1d301..d71adfba 100644 --- a/server/src/pubsub.rs +++ b/server/src/events.rs @@ -3,28 +3,14 @@ use uuid::Uuid; use fallible_iterator::{FallibleIterator}; use failure::Error; -use failure::err_msg; use crossbeam_channel::{unbounded, Sender, Receiver}; -use pg::{Db}; +use pg::{Db, PgPool}; use account; use game; use instance; -#[derive(Clone)] -pub struct PubSub { - pub s: Sender, - r: Receiver, -} - -impl PubSub { - pub fn new() -> PubSub { - let (s, r) = unbounded(); - return PubSub { s, r }; - } -} - #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[serde(rename_all(deserialize = "lowercase"))] enum Table { @@ -53,13 +39,50 @@ struct Notification { #[derive(Debug,Clone)] pub enum Message { + // outgoing Account(account::Account), Game(game::Game), Instance(instance::Instance), + + // incoming + Connect(Sender), + Disconnect, } -fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Result<(), Error> { - info!("pubsub received notification notification={:?}", n); +#[derive(Clone)] +pub struct Events { + pub tx: Sender, + rx: Receiver, +} + +impl Events { + pub fn new() -> Events { + let (tx, rx) = unbounded(); + return Events { tx, rx }; + } + + fn receive(&mut self) { + match self.rx.try_recv() { + Ok(m) => self.on_message(m), + Err(_) => (), + } + } + + fn on_message(&mut self, msg: Message) { + match msg { + Message::Connect(tx) => { + info!("client connected {:?}", tx); + }, + Message::Disconnect => { + info!("client disconnected"); + } + _ => panic!("events received unhandled msg={:?}", msg), + } + } +} + +fn pg_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { + info!("events received notification notification={:?}", n); // maybe we need it let mut tx = db.transaction()?; @@ -77,29 +100,47 @@ fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Resul tx.commit()?; - match pss.try_send(msg.clone()) { - Ok(()) => info!("pubsub message sent message={:?}", msg), - Err(e) => warn!("pubsub delivery failure err={:?}", e), - }; + info!("got a msg to send to whoever cares {:?}", msg); + + // match events.try_send(msg.clone()) { + // Ok(()) => info!("events message sent message={:?}", msg), + // Err(e) => warn!("events delivery failure err={:?}", e), + // }; Ok(()) } -pub fn pg_listen(db: Db, ps: &PubSub) -> Result<(), Error> { +pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { + let db = pool.get()?; db.execute("LISTEN events;", &[])?; - info!("pubsub listening"); + info!("events listening"); + let notifications = db.notifications(); let mut n_iter = notifications.blocking_iter(); + + // main event loop, checks pg and checks messages loop { - let n = n_iter.next().unwrap(); + // check notifications + let n = n_iter.next()?; if let Some(n) = n { match serde_json::from_str::(&n.payload) { - Ok(notification) => match handle_notification(notification, &db, &ps.s) { + Ok(notification) => match pg_notification(notification, &db, &events) { Ok(()) => (), Err(e) => warn!("{:?}", e), } Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), }; } + + events.receive(); + } +} + +pub fn start(pool: PgPool, mut events: Events) { + loop { + match listen(&pool, &mut events) { + Ok(()) => panic!("events listen returned"), + Err(e) => warn!("events listener error err={:?}", e), + } } } diff --git a/server/src/main.rs b/server/src/main.rs index 531c2729..e6ea1b01 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -41,7 +41,7 @@ mod net; mod payments; mod pg; mod player; -mod pubsub; +mod events; mod rpc; mod skill; mod spec; @@ -54,9 +54,7 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use crossbeam_channel::{unbounded}; - -use pubsub::pg_listen; +use events::{start as events_start}; use warden::warden; fn setup_logger() -> Result<(), fern::InitError> { @@ -85,13 +83,7 @@ fn main() { let pool = pg::create_pool(); - let ws_pool = pool.clone(); - let http_pool = pool.clone(); let warden_pool = pool.clone(); - let pubsub_pool = pool.clone(); - - let pubsub = pubsub::PubSub::new(); - spawn(move || { loop { let db_connection = warden_pool.get().expect("unable to get db connection"); @@ -102,16 +94,16 @@ fn main() { } }); - let pg_listener = pubsub.clone(); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn, &pg_listener) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); + let events = events::Events::new(); + let event_listener = events.clone(); + let events_pool = pool.clone(); + // spawn(move || events_start(events_pool, event_listener)); + let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - ws::start(ws_pool, pubsub); + + // this should go on a thread too? + let ws_pool = pool.clone(); + ws::start(ws_pool, events); info!("server started"); } diff --git a/server/src/net.rs b/server/src/net.rs index 12bae55a..b67abe78 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -12,7 +12,7 @@ use router::Router; use serde::{Serialize, Deserialize}; // use warden::{warden}; -// use pubsub::{pg_listen}; +// use events::{pg_listen}; // use ws::{connect}; use account; use pg::PgPool; @@ -103,7 +103,7 @@ impl From for IronError { MnmlHttpError::AccountNotFound | MnmlHttpError::BadRequest | MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest), - + MnmlHttpError::PasswordNotMatch | MnmlHttpError::InvalidCode | MnmlHttpError::TokenDoesNotMatch | @@ -260,7 +260,7 @@ const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10; pub struct State { pub pool: PgPool, - // pub pubsub: PubSub, + // pub events: Events, } impl Key for State { type Value = State; } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index bf4ce82b..3bbcec7d 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -7,7 +7,6 @@ use uuid::Uuid; use failure::Error; use failure::err_msg; -use ws::{Ws}; use pg::{Db}; use construct::{Construct}; use game::{Game, game_state, game_skill, game_ready}; diff --git a/server/src/ws.rs b/server/src/ws.rs index d0c43bb9..1ffcc887 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -24,85 +24,69 @@ use failure::{err_msg, format_err}; use net::TOKEN_HEADER; use rpc; -use rpc::{RpcMessage}; use mtx; use pg::PgPool; use account; use account::Account; -use pubsub::{Message, PubSub}; +use events::{Message, Events}; -pub struct Ws { - client: WebSocket, - sender: Sender, - receiver: Receiver, - pool: PgPool, - account: Option, -} +pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { + let (tx, rx) = unbounded(); + events.try_send(Message::Connect(tx)).unwrap(); -impl Ws { - pub fn new(client: WebSocket, pool: PgPool, account: Option) -> Ws { - let (sender, receiver) = unbounded(); - return Ws { sender, receiver, client, pool, account }; - } + loop { + match client.read_message().no_block() { + Ok(msg) => { + if let Some(msg) = msg { + match msg { + Binary(data) => { + let begin = Instant::now(); + let db_connection = pool.get() + .expect("unable to get db connection"); - fn start(&mut self) { - loop { - match self.client.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = self.pool.get() - .expect("unable to get db connection"); + match rpc::receive(data, &db_connection, begin, &account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); - match rpc::receive(data, &db_connection, begin, &self.account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - - if let Err(e) = self.client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - - // self.subscriptions.update(&reply).unwrap(); - }, - Err(e) => { + if let Err(e) = client.write_message(Binary(response)) { + // connection closed warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); + return; + }; - if let Err(e) = self.client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } + // subscriptions.update(&reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); + + if let Err(e) = client.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; } - }, - _ => (), - } - }; + } + }, + _ => (), + } + }; - self.receive(); - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } - } + // match receiver.try_recv() { + // Ok(n) => handle_message(&subs, n, &mut websocket), + // Err(_) => (), + // }; - fn receive(&mut self) { - // match self.receiver.try_recv() { - // Ok(n) => handle_message(&subs, n, &mut websocket), - // Err(_) => (), - // }; + }, + // connection is closed + Err(e) => { + warn!("{:?}", e); + return; + } + }; } } @@ -111,103 +95,108 @@ struct RpcError { err: String, } -#[derive(Debug)] -struct Subscriptions { - account: Option, - game: Option, - instance: Option, - // account_instances: Vec, -} +// #[derive(Debug)] +// struct Subscriptions { +// account: Option, +// game: Option, +// instance: Option, +// // account_instances: Vec, +// } -impl Subscriptions { - fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { - if let Some(a) = account { - let db = ws_pool.get()?; - let mut tx = db.transaction()?; +// impl Subscriptions { +// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { +// if let Some(a) = account { +// let db = ws_pool.get()?; +// let mut tx = db.transaction()?; - // send account constructs - let account_constructs = account::account_constructs(&mut tx, a)?; - ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; +// // send account constructs +// let account_constructs = account::account_constructs(&mut tx, a)?; +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; - // get account instances - // and send them to the client - let account_instances = account::account_instances(&mut tx, a)?; - // let instances = account_instances.iter().map(|i| i.id).collect::>(); - ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; +// // get account instances +// // and send them to the client +// let account_instances = account::account_instances(&mut tx, a)?; +// // let instances = account_instances.iter().map(|i| i.id).collect::>(); +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; - // get players - // add to games +// // get players +// // add to games +// tx.commit()?; - tx.commit()?; +// return Ok(Subscriptions { +// account: Some(a.id), +// game: None, +// instance: None, +// }) +// } - return Ok(Subscriptions { - account: Some(a.id), - game: None, - instance: None, - }) - } +// Ok(Subscriptions { +// account: None, +// game: None, +// instance: None +// }) +// } - Ok(Subscriptions { - account: None, - game: None, - instance: None - }) - } +// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { +// match msg { +// RpcMessage::AccountState(a) => self.account = Some(a.id), +// RpcMessage::InstanceState(i) => self.instance = Some(i.id), +// RpcMessage::GameState(g) => self.game = Some(g.id), +// _ => (), +// }; - fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { - match msg { - RpcMessage::AccountState(a) => self.account = Some(a.id), - RpcMessage::InstanceState(i) => self.instance = Some(i.id), - RpcMessage::GameState(g) => self.game = Some(g.id), - _ => (), - }; +// // info!("subscriptions updated {:?}", self); - // info!("subscriptions updated {:?}", self); - - Ok(self) - } -} +// Ok(self) +// } +// } -fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { - if let Some(msg) = match m { - Message::Account(a) => { - match subs.account { - Some(wsa) => match wsa == a.id { - true => Some(rpc::RpcMessage::AccountState(a)), - false => None, - }, - None => None, - } - }, - Message::Instance(i) => { - match subs.instance { - Some(ci) => match ci == i.id { - true => Some(rpc::RpcMessage::InstanceState(i)), - false => None, - }, - None => None, - } - }, - Message::Game(g) => { - match subs.game { - Some(cg) => match cg == g.id { - true => Some(rpc::RpcMessage::GameState(g)), - false => None, - }, - None => None, - } - }, - // _ => None, - } { - ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); - } -} +// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { +// if let Some(msg) = match m { +// Message::Account(a) => { +// match subs.account { +// Some(wsa) => match wsa == a.id { +// true => Some(rpc::RpcMessage::AccountState(a)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Instance(i) => { +// match subs.instance { +// Some(ci) => match ci == i.id { +// true => Some(rpc::RpcMessage::InstanceState(i)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Game(g) => { +// match subs.game { +// Some(cg) => match cg == g.id { +// true => Some(rpc::RpcMessage::GameState(g)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Connect(tx) => { +// info!("client connected {:?}", tx); +// None +// }, +// // _ => None, +// } { +// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); +// } +// } -pub fn start(pool: PgPool, ps: PubSub) { +pub fn start(pool: PgPool, events: Events) { let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); for stream in ws_server.incoming() { let ws_pool = pool.clone(); + let events_tx = events.tx.clone(); + spawn(move || { let (acc_s, acc_r) = unbounded(); @@ -272,8 +261,7 @@ pub fn start(pool: PgPool, ps: PubSub) { None => None, }; - Ws::new(client, ws_pool, account) - .start() + ws(client, ws_pool, account, events_tx) }); } }