From 5851f20211ba449eea2c40a31a2621e0a0bf80cc Mon Sep 17 00:00:00 2001 From: ntr Date: Wed, 24 Jul 2019 16:35:33 +1000 Subject: [PATCH 01/11] wip --- server/src/main.rs | 7 +- server/src/pubsub.rs | 19 +++++- server/src/rpc.rs | 2 +- server/src/ws.rs | 155 +++++++++++++++++++++++-------------------- 4 files changed, 105 insertions(+), 78 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 7434ad6f..531c2729 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -90,7 +90,7 @@ fn main() { let warden_pool = pool.clone(); let pubsub_pool = pool.clone(); - let (pss, psr) = unbounded(); + let pubsub = pubsub::PubSub::new(); spawn(move || { loop { @@ -102,15 +102,16 @@ fn main() { } }); + let pg_listener = pubsub.clone(); spawn(move || loop { let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn, pss.clone()) { + match pg_listen(pubsub_conn, &pg_listener) { Ok(_) => warn!("pg listen closed"), Err(e) => warn!("pg_listen error {:?}", e), } }); spawn(move || net::start(http_pool)); - ws::start(ws_pool, psr); + ws::start(ws_pool, pubsub); info!("server started"); } diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index 6a4cd90b..38e1d301 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -5,13 +5,26 @@ use fallible_iterator::{FallibleIterator}; use failure::Error; use failure::err_msg; -use crossbeam_channel::{Sender}; +use crossbeam_channel::{unbounded, Sender, Receiver}; use pg::{Db}; use account; use game; use instance; +#[derive(Clone)] +pub struct PubSub { + pub s: Sender, + r: Receiver, +} + +impl PubSub { + pub fn new() -> PubSub { + let (s, r) = unbounded(); + return PubSub { s, r }; + } +} + #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[serde(rename_all(deserialize = "lowercase"))] enum Table { @@ -72,7 +85,7 @@ fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Resul Ok(()) } -pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> { +pub fn pg_listen(db: Db, ps: &PubSub) -> Result<(), Error> { db.execute("LISTEN events;", &[])?; info!("pubsub listening"); let notifications = db.notifications(); @@ -81,7 +94,7 @@ pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> { let n = n_iter.next().unwrap(); if let Some(n) = n { match serde_json::from_str::(&n.payload) { - Ok(notification) => match handle_notification(notification, &db, &pss) { + Ok(notification) => match handle_notification(notification, &db, &ps.s) { Ok(()) => (), Err(e) => warn!("{:?}", e), } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 897f459d..bf4ce82b 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -68,7 +68,7 @@ enum RpcRequest { VboxReclaim { instance_id: Uuid, index: usize }, } -pub fn receive(data: Vec, db: &Db, _client: &mut Ws, begin: Instant, account: &Option) -> Result { +pub fn receive(data: Vec, db: &Db, begin: Instant, account: &Option) -> Result { // cast the msg to this type to receive method name match from_slice::(&data) { Ok(v) => { diff --git a/server/src/ws.rs b/server/src/ws.rs index f5ab2412..d0c43bb9 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -15,7 +15,7 @@ use tungstenite::protocol::WebSocket; use tungstenite::util::NonBlockingResult; use tungstenite::{accept_hdr}; -use crossbeam_channel::{unbounded, Receiver}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use serde_cbor::{to_vec}; @@ -30,9 +30,81 @@ use mtx; use pg::PgPool; use account; use account::Account; -use pubsub::Message; +use pubsub::{Message, PubSub}; -pub type Ws = WebSocket; +pub struct Ws { + client: WebSocket, + sender: Sender, + receiver: Receiver, + pool: PgPool, + account: Option, +} + +impl Ws { + pub fn new(client: WebSocket, pool: PgPool, account: Option) -> Ws { + let (sender, receiver) = unbounded(); + return Ws { sender, receiver, client, pool, account }; + } + + fn start(&mut self) { + loop { + match self.client.read_message().no_block() { + Ok(msg) => { + if let Some(msg) = msg { + match msg { + Binary(data) => { + let begin = Instant::now(); + let db_connection = self.pool.get() + .expect("unable to get db connection"); + + match rpc::receive(data, &db_connection, begin, &self.account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); + + if let Err(e) = self.client.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; + + // self.subscriptions.update(&reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); + + if let Err(e) = self.client.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; + } + } + }, + _ => (), + } + }; + + self.receive(); + }, + // connection is closed + Err(e) => { + warn!("{:?}", e); + return; + } + }; + } + } + + fn receive(&mut self) { + // match self.receiver.try_recv() { + // Ok(n) => handle_message(&subs, n, &mut websocket), + // Err(_) => (), + // }; + } +} #[derive(Debug,Clone,Serialize)] struct RpcError { @@ -55,13 +127,13 @@ impl Subscriptions { // send account constructs let account_constructs = account::account_constructs(&mut tx, a)?; - ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; + ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; // get account instances // and send them to the client let account_instances = account::account_instances(&mut tx, a)?; // let instances = account_instances.iter().map(|i| i.id).collect::>(); - ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; + ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; // get players // add to games @@ -128,15 +200,14 @@ fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { }, // _ => None, } { - ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); + ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); } } -pub fn start(pool: PgPool, psr: Receiver) { +pub fn start(pool: PgPool, ps: PubSub) { let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); for stream in ws_server.incoming() { let ws_pool = pool.clone(); - let ws_psr = psr.clone(); spawn(move || { let (acc_s, acc_r) = unbounded(); @@ -167,7 +238,7 @@ pub fn start(pool: PgPool, psr: Receiver) { Ok(None) }; - let mut websocket = accept_hdr(nb_stream, cb).unwrap(); + let mut client = accept_hdr(nb_stream, cb).unwrap(); // get a copy of the account let account = match acc_r.recv().unwrap() { @@ -179,13 +250,13 @@ pub fn start(pool: PgPool, psr: Receiver) { match account::from_token(&db, t) { Ok(a) => { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - websocket.write_message(Binary(state)).unwrap(); + client.write_message(Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - websocket.write_message(Binary(shop)).unwrap(); + client.write_message(Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); @@ -201,66 +272,8 @@ pub fn start(pool: PgPool, psr: Receiver) { None => None, }; - let mut subs = match Subscriptions::new(&ws_pool, &account, &mut websocket) { - Ok(s) => s, - Err(e) => { - warn!("subscriptions error err={:?}", e); - return; - }, - }; - - loop { - match websocket.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = ws_pool.get() - .expect("unable to get db connection"); - - match rpc::receive(data, &db_connection, &mut websocket, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - - subs.update(&reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); - - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } - } - }, - _ => (), - } - }; - - match ws_psr.try_recv() { - Ok(n) => handle_message(&subs, n, &mut websocket), - Err(_) => (), - }; - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } + Ws::new(client, ws_pool, account) + .start() }); } } From a77bec2c9fbbf7dc501aaded90878623de7c047b Mon Sep 17 00:00:00 2001 From: ntr Date: Thu, 25 Jul 2019 14:33:03 +1000 Subject: [PATCH 02/11] wip --- server/src/actixnet.rs | 221 --------------------- server/src/{pubsub.rs => events.rs} | 91 ++++++--- server/src/main.rs | 30 ++- server/src/net.rs | 6 +- server/src/rpc.rs | 1 - server/src/ws.rs | 286 +++++++++++++--------------- 6 files changed, 217 insertions(+), 418 deletions(-) delete mode 100644 server/src/actixnet.rs rename server/src/{pubsub.rs => events.rs} (53%) diff --git a/server/src/actixnet.rs b/server/src/actixnet.rs deleted file mode 100644 index cbab4b5b..00000000 --- a/server/src/actixnet.rs +++ /dev/null @@ -1,221 +0,0 @@ -use std::env; -use std::thread::{spawn, sleep}; -use std::time::{Duration}; - -use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; -use actix_web::error::ResponseError; -use actix_web::http::{Cookie}; -use actix_web::cookie::{SameSite}; -use actix_cors::Cors; - -use r2d2::{Pool}; -use r2d2::{PooledConnection}; -use r2d2_postgres::{TlsMode, PostgresConnectionManager}; - -use warden::{warden}; -use pubsub::{pg_listen}; -use ws::{connect}; -use account; -use payments::{post_stripe_event}; - -pub type Db = PooledConnection; -pub type PgPool = Pool; - -const DB_POOL_SIZE: u32 = 20; - -#[derive(Debug,Clone,Serialize,Deserialize)] -pub struct JsonError { - pub err: String -} - -#[derive(Fail, Debug, Serialize, Deserialize)] -pub enum MnmlHttpError { - // User Facing Errors - #[fail(display="internal server error")] - ServerError, - #[fail(display="unauthorized")] - Unauthorized, - #[fail(display="bad request")] - BadRequest, - #[fail(display="account name taken or invalid")] - AccountNameTaken, - #[fail(display="password unacceptable. must be > 11 characters")] - PasswordUnacceptable, - #[fail(display="invalid code. https://discord.gg/YJJgurM")] - InvalidCode, -} - -impl ResponseError for MnmlHttpError { - fn error_response(&self) -> HttpResponse { - match *self { - MnmlHttpError::ServerError => HttpResponse::InternalServerError() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::BadRequest => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) // 1 week aligns with db set - .finish()) - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - - MnmlHttpError::InvalidCode => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), - } - } -} - -fn login_res(token: String) -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", token) - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set - .finish()) - .finish() -} - -fn logout_res() -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) - .finish()) - .finish() -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountLoginParams { - name: String, - password: String, -} - -fn login(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::login(&mut tx, ¶ms.name, ¶ms.password) { - Ok(a) => { - let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::Unauthorized) - } - } -} - -fn logout(r: HttpRequest, state: web::Data) -> Result { - match r.cookie("x-auth-token") { - Some(t) => { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - match account::from_token(&mut tx, t.value().to_string()) { - Ok(a) => { - account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - return Ok(logout_res()); - }, - Err(_) => Err(MnmlHttpError::Unauthorized), - } - }, - None => Err(MnmlHttpError::Unauthorized), - } -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountRegisterParams { - name: String, - password: String, - code: String, -} - -fn register(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { - Ok(token) => { - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::BadRequest) - } - } -} - -fn create_pool(url: String) -> Pool { - let manager = PostgresConnectionManager::new(url, TlsMode::None) - .expect("could not instantiate pg manager"); - - Pool::builder() - .max_size(DB_POOL_SIZE) - .build(manager) - .expect("Failed to create pool.") -} - -pub struct State { - pub pool: PgPool, - // pub pubsub: PubSub, -} - -pub fn start() { - let database_url = env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"); - let pool = create_pool(database_url); - - let warden_pool = pool.clone(); - spawn(move || { - loop { - let db_connection = warden_pool.get().expect("unable to get db connection"); - if let Err(e) = warden(db_connection) { - info!("{:?}", e); - } - sleep(Duration::new(1, 0)); - } - }); - - let pubsub_pool = pool.clone(); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); - - HttpServer::new(move || App::new() - .data(State { pool: pool.clone() }) - .wrap(middleware::Logger::default()) - .wrap(Cors::new().supports_credentials()) - .service(web::resource("/api/login").route(web::post().to(login))) - .service(web::resource("/api/logout").route(web::post().to(logout))) - .service(web::resource("/api/register").route(web::post().to(register))) - - .service(web::resource("/api/payments/stripe") - .route(web::post().to(post_stripe_event))) - - // .service(web::resource("/api/payments/crypto") - // .route(web::post().to(post_stripe_payment))) - - .service(web::resource("/api/ws").route(web::get().to(connect)))) - .bind("127.0.0.1:40000").expect("could not bind to port") - .run().expect("could not start http server"); -} diff --git a/server/src/pubsub.rs b/server/src/events.rs similarity index 53% rename from server/src/pubsub.rs rename to server/src/events.rs index 38e1d301..d71adfba 100644 --- a/server/src/pubsub.rs +++ b/server/src/events.rs @@ -3,28 +3,14 @@ use uuid::Uuid; use fallible_iterator::{FallibleIterator}; use failure::Error; -use failure::err_msg; use crossbeam_channel::{unbounded, Sender, Receiver}; -use pg::{Db}; +use pg::{Db, PgPool}; use account; use game; use instance; -#[derive(Clone)] -pub struct PubSub { - pub s: Sender, - r: Receiver, -} - -impl PubSub { - pub fn new() -> PubSub { - let (s, r) = unbounded(); - return PubSub { s, r }; - } -} - #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[serde(rename_all(deserialize = "lowercase"))] enum Table { @@ -53,13 +39,50 @@ struct Notification { #[derive(Debug,Clone)] pub enum Message { + // outgoing Account(account::Account), Game(game::Game), Instance(instance::Instance), + + // incoming + Connect(Sender), + Disconnect, } -fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Result<(), Error> { - info!("pubsub received notification notification={:?}", n); +#[derive(Clone)] +pub struct Events { + pub tx: Sender, + rx: Receiver, +} + +impl Events { + pub fn new() -> Events { + let (tx, rx) = unbounded(); + return Events { tx, rx }; + } + + fn receive(&mut self) { + match self.rx.try_recv() { + Ok(m) => self.on_message(m), + Err(_) => (), + } + } + + fn on_message(&mut self, msg: Message) { + match msg { + Message::Connect(tx) => { + info!("client connected {:?}", tx); + }, + Message::Disconnect => { + info!("client disconnected"); + } + _ => panic!("events received unhandled msg={:?}", msg), + } + } +} + +fn pg_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { + info!("events received notification notification={:?}", n); // maybe we need it let mut tx = db.transaction()?; @@ -77,29 +100,47 @@ fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Resul tx.commit()?; - match pss.try_send(msg.clone()) { - Ok(()) => info!("pubsub message sent message={:?}", msg), - Err(e) => warn!("pubsub delivery failure err={:?}", e), - }; + info!("got a msg to send to whoever cares {:?}", msg); + + // match events.try_send(msg.clone()) { + // Ok(()) => info!("events message sent message={:?}", msg), + // Err(e) => warn!("events delivery failure err={:?}", e), + // }; Ok(()) } -pub fn pg_listen(db: Db, ps: &PubSub) -> Result<(), Error> { +pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { + let db = pool.get()?; db.execute("LISTEN events;", &[])?; - info!("pubsub listening"); + info!("events listening"); + let notifications = db.notifications(); let mut n_iter = notifications.blocking_iter(); + + // main event loop, checks pg and checks messages loop { - let n = n_iter.next().unwrap(); + // check notifications + let n = n_iter.next()?; if let Some(n) = n { match serde_json::from_str::(&n.payload) { - Ok(notification) => match handle_notification(notification, &db, &ps.s) { + Ok(notification) => match pg_notification(notification, &db, &events) { Ok(()) => (), Err(e) => warn!("{:?}", e), } Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), }; } + + events.receive(); + } +} + +pub fn start(pool: PgPool, mut events: Events) { + loop { + match listen(&pool, &mut events) { + Ok(()) => panic!("events listen returned"), + Err(e) => warn!("events listener error err={:?}", e), + } } } diff --git a/server/src/main.rs b/server/src/main.rs index 531c2729..e6ea1b01 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -41,7 +41,7 @@ mod net; mod payments; mod pg; mod player; -mod pubsub; +mod events; mod rpc; mod skill; mod spec; @@ -54,9 +54,7 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use crossbeam_channel::{unbounded}; - -use pubsub::pg_listen; +use events::{start as events_start}; use warden::warden; fn setup_logger() -> Result<(), fern::InitError> { @@ -85,13 +83,7 @@ fn main() { let pool = pg::create_pool(); - let ws_pool = pool.clone(); - let http_pool = pool.clone(); let warden_pool = pool.clone(); - let pubsub_pool = pool.clone(); - - let pubsub = pubsub::PubSub::new(); - spawn(move || { loop { let db_connection = warden_pool.get().expect("unable to get db connection"); @@ -102,16 +94,16 @@ fn main() { } }); - let pg_listener = pubsub.clone(); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn, &pg_listener) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); + let events = events::Events::new(); + let event_listener = events.clone(); + let events_pool = pool.clone(); + // spawn(move || events_start(events_pool, event_listener)); + let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - ws::start(ws_pool, pubsub); + + // this should go on a thread too? + let ws_pool = pool.clone(); + ws::start(ws_pool, events); info!("server started"); } diff --git a/server/src/net.rs b/server/src/net.rs index 12bae55a..b67abe78 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -12,7 +12,7 @@ use router::Router; use serde::{Serialize, Deserialize}; // use warden::{warden}; -// use pubsub::{pg_listen}; +// use events::{pg_listen}; // use ws::{connect}; use account; use pg::PgPool; @@ -103,7 +103,7 @@ impl From for IronError { MnmlHttpError::AccountNotFound | MnmlHttpError::BadRequest | MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest), - + MnmlHttpError::PasswordNotMatch | MnmlHttpError::InvalidCode | MnmlHttpError::TokenDoesNotMatch | @@ -260,7 +260,7 @@ const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10; pub struct State { pub pool: PgPool, - // pub pubsub: PubSub, + // pub events: Events, } impl Key for State { type Value = State; } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index bf4ce82b..3bbcec7d 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -7,7 +7,6 @@ use uuid::Uuid; use failure::Error; use failure::err_msg; -use ws::{Ws}; use pg::{Db}; use construct::{Construct}; use game::{Game, game_state, game_skill, game_ready}; diff --git a/server/src/ws.rs b/server/src/ws.rs index d0c43bb9..1ffcc887 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -24,85 +24,69 @@ use failure::{err_msg, format_err}; use net::TOKEN_HEADER; use rpc; -use rpc::{RpcMessage}; use mtx; use pg::PgPool; use account; use account::Account; -use pubsub::{Message, PubSub}; +use events::{Message, Events}; -pub struct Ws { - client: WebSocket, - sender: Sender, - receiver: Receiver, - pool: PgPool, - account: Option, -} +pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { + let (tx, rx) = unbounded(); + events.try_send(Message::Connect(tx)).unwrap(); -impl Ws { - pub fn new(client: WebSocket, pool: PgPool, account: Option) -> Ws { - let (sender, receiver) = unbounded(); - return Ws { sender, receiver, client, pool, account }; - } + loop { + match client.read_message().no_block() { + Ok(msg) => { + if let Some(msg) = msg { + match msg { + Binary(data) => { + let begin = Instant::now(); + let db_connection = pool.get() + .expect("unable to get db connection"); - fn start(&mut self) { - loop { - match self.client.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = self.pool.get() - .expect("unable to get db connection"); + match rpc::receive(data, &db_connection, begin, &account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); - match rpc::receive(data, &db_connection, begin, &self.account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - - if let Err(e) = self.client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - - // self.subscriptions.update(&reply).unwrap(); - }, - Err(e) => { + if let Err(e) = client.write_message(Binary(response)) { + // connection closed warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); + return; + }; - if let Err(e) = self.client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } + // subscriptions.update(&reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); + + if let Err(e) = client.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; } - }, - _ => (), - } - }; + } + }, + _ => (), + } + }; - self.receive(); - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } - } + // match receiver.try_recv() { + // Ok(n) => handle_message(&subs, n, &mut websocket), + // Err(_) => (), + // }; - fn receive(&mut self) { - // match self.receiver.try_recv() { - // Ok(n) => handle_message(&subs, n, &mut websocket), - // Err(_) => (), - // }; + }, + // connection is closed + Err(e) => { + warn!("{:?}", e); + return; + } + }; } } @@ -111,103 +95,108 @@ struct RpcError { err: String, } -#[derive(Debug)] -struct Subscriptions { - account: Option, - game: Option, - instance: Option, - // account_instances: Vec, -} +// #[derive(Debug)] +// struct Subscriptions { +// account: Option, +// game: Option, +// instance: Option, +// // account_instances: Vec, +// } -impl Subscriptions { - fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { - if let Some(a) = account { - let db = ws_pool.get()?; - let mut tx = db.transaction()?; +// impl Subscriptions { +// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { +// if let Some(a) = account { +// let db = ws_pool.get()?; +// let mut tx = db.transaction()?; - // send account constructs - let account_constructs = account::account_constructs(&mut tx, a)?; - ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; +// // send account constructs +// let account_constructs = account::account_constructs(&mut tx, a)?; +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; - // get account instances - // and send them to the client - let account_instances = account::account_instances(&mut tx, a)?; - // let instances = account_instances.iter().map(|i| i.id).collect::>(); - ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; +// // get account instances +// // and send them to the client +// let account_instances = account::account_instances(&mut tx, a)?; +// // let instances = account_instances.iter().map(|i| i.id).collect::>(); +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; - // get players - // add to games +// // get players +// // add to games +// tx.commit()?; - tx.commit()?; +// return Ok(Subscriptions { +// account: Some(a.id), +// game: None, +// instance: None, +// }) +// } - return Ok(Subscriptions { - account: Some(a.id), - game: None, - instance: None, - }) - } +// Ok(Subscriptions { +// account: None, +// game: None, +// instance: None +// }) +// } - Ok(Subscriptions { - account: None, - game: None, - instance: None - }) - } +// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { +// match msg { +// RpcMessage::AccountState(a) => self.account = Some(a.id), +// RpcMessage::InstanceState(i) => self.instance = Some(i.id), +// RpcMessage::GameState(g) => self.game = Some(g.id), +// _ => (), +// }; - fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { - match msg { - RpcMessage::AccountState(a) => self.account = Some(a.id), - RpcMessage::InstanceState(i) => self.instance = Some(i.id), - RpcMessage::GameState(g) => self.game = Some(g.id), - _ => (), - }; +// // info!("subscriptions updated {:?}", self); - // info!("subscriptions updated {:?}", self); - - Ok(self) - } -} +// Ok(self) +// } +// } -fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { - if let Some(msg) = match m { - Message::Account(a) => { - match subs.account { - Some(wsa) => match wsa == a.id { - true => Some(rpc::RpcMessage::AccountState(a)), - false => None, - }, - None => None, - } - }, - Message::Instance(i) => { - match subs.instance { - Some(ci) => match ci == i.id { - true => Some(rpc::RpcMessage::InstanceState(i)), - false => None, - }, - None => None, - } - }, - Message::Game(g) => { - match subs.game { - Some(cg) => match cg == g.id { - true => Some(rpc::RpcMessage::GameState(g)), - false => None, - }, - None => None, - } - }, - // _ => None, - } { - ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); - } -} +// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { +// if let Some(msg) = match m { +// Message::Account(a) => { +// match subs.account { +// Some(wsa) => match wsa == a.id { +// true => Some(rpc::RpcMessage::AccountState(a)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Instance(i) => { +// match subs.instance { +// Some(ci) => match ci == i.id { +// true => Some(rpc::RpcMessage::InstanceState(i)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Game(g) => { +// match subs.game { +// Some(cg) => match cg == g.id { +// true => Some(rpc::RpcMessage::GameState(g)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Connect(tx) => { +// info!("client connected {:?}", tx); +// None +// }, +// // _ => None, +// } { +// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); +// } +// } -pub fn start(pool: PgPool, ps: PubSub) { +pub fn start(pool: PgPool, events: Events) { let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); for stream in ws_server.incoming() { let ws_pool = pool.clone(); + let events_tx = events.tx.clone(); + spawn(move || { let (acc_s, acc_r) = unbounded(); @@ -272,8 +261,7 @@ pub fn start(pool: PgPool, ps: PubSub) { None => None, }; - Ws::new(client, ws_pool, account) - .start() + ws(client, ws_pool, account, events_tx) }); } } From be7a3888127a5fefd30d0e77d886d0d0f564b253 Mon Sep 17 00:00:00 2001 From: ntr Date: Thu, 25 Jul 2019 17:11:56 +1000 Subject: [PATCH 03/11] YEH --- server/Cargo.toml | 14 +- server/src/events.rs | 97 ++++++++++++++ server/src/main.rs | 13 +- server/src/ws.rs | 305 ++++++++++++++++--------------------------- 4 files changed, 227 insertions(+), 202 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 2ad706a4..e2a27f4e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -3,13 +3,6 @@ name = "mnml" version = "0.1.0" authors = ["ntr "] -# makes sure to include openssl links in runtime path -# [profile.release] -# rpath = true - -# [profile.dev] -# rpath = true - [dependencies] serde = "1" serde_derive = "1" @@ -38,10 +31,9 @@ persistent = "0.4" router = "0.6" cookie = "0.12" tungstenite = "0.8" +tokio-tungstenite = "0.8" +futures = "0.1" +tokio = "0.1" crossbeam-channel = "0.3" stripe-rust = { version = "0.10.4", features = ["webhooks"] } - -# [patch.crates-io] -# stripe-rust = { git = "https://github.com/margh/stripe-rs.git" } - diff --git a/server/src/events.rs b/server/src/events.rs index d71adfba..bb1ab020 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -144,3 +144,100 @@ pub fn start(pool: PgPool, mut events: Events) { } } } + + +// #[derive(Debug)] +// struct Subscriptions { +// account: Option, +// game: Option, +// instance: Option, +// // account_instances: Vec, +// } + +// impl Subscriptions { +// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { +// if let Some(a) = account { +// let db = ws_pool.get()?; +// let mut tx = db.transaction()?; + +// // send account constructs +// let account_constructs = account::account_constructs(&mut tx, a)?; +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; + +// // get account instances +// // and send them to the client +// let account_instances = account::account_instances(&mut tx, a)?; +// // let instances = account_instances.iter().map(|i| i.id).collect::>(); +// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; + +// // get players +// // add to games +// tx.commit()?; + +// return Ok(Subscriptions { +// account: Some(a.id), +// game: None, +// instance: None, +// }) +// } + +// Ok(Subscriptions { +// account: None, +// game: None, +// instance: None +// }) +// } + +// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { +// match msg { +// RpcMessage::AccountState(a) => self.account = Some(a.id), +// RpcMessage::InstanceState(i) => self.instance = Some(i.id), +// RpcMessage::GameState(g) => self.game = Some(g.id), +// _ => (), +// }; + +// // info!("subscriptions updated {:?}", self); + +// Ok(self) +// } +// } + + +// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { +// if let Some(msg) = match m { +// Message::Account(a) => { +// match subs.account { +// Some(wsa) => match wsa == a.id { +// true => Some(rpc::RpcMessage::AccountState(a)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Instance(i) => { +// match subs.instance { +// Some(ci) => match ci == i.id { +// true => Some(rpc::RpcMessage::InstanceState(i)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Game(g) => { +// match subs.game { +// Some(cg) => match cg == g.id { +// true => Some(rpc::RpcMessage::GameState(g)), +// false => None, +// }, +// None => None, +// } +// }, +// Message::Connect(tx) => { +// info!("client connected {:?}", tx); +// None +// }, +// // _ => None, +// } { +// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); +// } +// } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index e6ea1b01..b97cd1c2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -24,7 +24,13 @@ extern crate bodyparser; extern crate persistent; extern crate router; extern crate cookie; + +extern crate futures; +extern crate tokio; + +extern crate tokio_tungstenite; extern crate tungstenite; + extern crate crossbeam_channel; mod account; @@ -54,6 +60,8 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; +use futures::Future; + use events::{start as events_start}; use warden::warden; @@ -104,6 +112,9 @@ fn main() { // this should go on a thread too? let ws_pool = pool.clone(); - ws::start(ws_pool, events); + let wsf = ws::start(ws_pool, events); + + tokio::runtime::run(wsf.map_err(|_e| ())); + info!("server started"); } diff --git a/server/src/ws.rs b/server/src/ws.rs index 1ffcc887..330e3abb 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -1,26 +1,27 @@ use std::time::{Instant}; -use std::net::{TcpStream, TcpListener}; -use std::thread::{spawn}; use std::str; use uuid::Uuid; use cookie::Cookie; +use futures::stream::Stream; +use futures::Future; +use tokio::net::TcpListener; use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; use tungstenite::handshake::HandshakeRole; use tungstenite::http::StatusCode; use tungstenite::protocol::WebSocket; use tungstenite::util::NonBlockingResult; -use tungstenite::{accept_hdr}; +use tungstenite::protocol::Message; +use tokio_tungstenite::accept_hdr_async; use crossbeam_channel::{unbounded, Receiver, Sender}; use serde_cbor::{to_vec}; -use failure::Error; -use failure::{err_msg, format_err}; +use std::io::{Error, ErrorKind}; use net::TOKEN_HEADER; use rpc; @@ -29,205 +30,110 @@ use mtx; use pg::PgPool; use account; use account::Account; -use events::{Message, Events}; +use events::{Message as WsMessage, Events}; -pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { - let (tx, rx) = unbounded(); - events.try_send(Message::Connect(tx)).unwrap(); +// pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { +// let (tx, rx) = unbounded(); +// events.try_send(Message::Connect(tx)).unwrap(); - loop { - match client.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = pool.get() - .expect("unable to get db connection"); +// loop { +// match client.read_message().no_block() { +// Ok(msg) => { +// if let Some(msg) = msg { +// match msg { +// Binary(data) => { +// let begin = Instant::now(); +// let db_connection = pool.get() +// .expect("unable to get db connection"); - match rpc::receive(data, &db_connection, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); +// match rpc::receive(data, &db_connection, begin, &account) { +// Ok(reply) => { +// let response = to_vec(&reply) +// .expect("failed to serialize response"); - if let Err(e) = client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; +// if let Err(e) = client.write_message(Binary(response)) { +// // connection closed +// warn!("{:?}", e); +// return; +// }; - // subscriptions.update(&reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); +// // subscriptions.update(&reply).unwrap(); +// }, +// Err(e) => { +// warn!("{:?}", e); +// let response = to_vec(&RpcError { err: e.to_string() }) +// .expect("failed to serialize error response"); - if let Err(e) = client.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } - } - }, - _ => (), - } - }; +// if let Err(e) = client.write_message(Binary(response)) { +// // connection closed +// warn!("{:?}", e); +// return; +// }; +// } +// } +// }, +// _ => (), +// } +// }; - // match receiver.try_recv() { - // Ok(n) => handle_message(&subs, n, &mut websocket), - // Err(_) => (), - // }; +// // match receiver.try_recv() { +// // Ok(n) => handle_message(&subs, n, &mut websocket), +// // Err(_) => (), +// // }; - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } -} +// }, +// // connection is closed +// Err(e) => { +// warn!("{:?}", e); +// return; +// } +// }; +// } +// } #[derive(Debug,Clone,Serialize)] struct RpcError { err: String, } -// #[derive(Debug)] -// struct Subscriptions { -// account: Option, -// game: Option, -// instance: Option, -// // account_instances: Vec, -// } +pub fn start(pool: PgPool, events: Events) -> impl Future { + let addr = "127.0.0.1:40055".parse().unwrap(); + let ws_server = TcpListener::bind(&addr).unwrap(); -// impl Subscriptions { -// fn new(ws_pool: &PgPool, account: &Option, ws: &mut Ws) -> Result { -// if let Some(a) = account { -// let db = ws_pool.get()?; -// let mut tx = db.transaction()?; - -// // send account constructs -// let account_constructs = account::account_constructs(&mut tx, a)?; -// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; - -// // get account instances -// // and send them to the client -// let account_instances = account::account_instances(&mut tx, a)?; -// // let instances = account_instances.iter().map(|i| i.id).collect::>(); -// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; - -// // get players -// // add to games -// tx.commit()?; - -// return Ok(Subscriptions { -// account: Some(a.id), -// game: None, -// instance: None, -// }) -// } - -// Ok(Subscriptions { -// account: None, -// game: None, -// instance: None -// }) -// } - -// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { -// match msg { -// RpcMessage::AccountState(a) => self.account = Some(a.id), -// RpcMessage::InstanceState(i) => self.instance = Some(i.id), -// RpcMessage::GameState(g) => self.game = Some(g.id), -// _ => (), -// }; - -// // info!("subscriptions updated {:?}", self); - -// Ok(self) -// } -// } - - -// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { -// if let Some(msg) = match m { -// Message::Account(a) => { -// match subs.account { -// Some(wsa) => match wsa == a.id { -// true => Some(rpc::RpcMessage::AccountState(a)), -// false => None, -// }, -// None => None, -// } -// }, -// Message::Instance(i) => { -// match subs.instance { -// Some(ci) => match ci == i.id { -// true => Some(rpc::RpcMessage::InstanceState(i)), -// false => None, -// }, -// None => None, -// } -// }, -// Message::Game(g) => { -// match subs.game { -// Some(cg) => match cg == g.id { -// true => Some(rpc::RpcMessage::GameState(g)), -// false => None, -// }, -// None => None, -// } -// }, -// Message::Connect(tx) => { -// info!("client connected {:?}", tx); -// None -// }, -// // _ => None, -// } { -// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); -// } -// } - -pub fn start(pool: PgPool, events: Events) { - let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); - for stream in ws_server.incoming() { + let wsf = ws_server.incoming().for_each(move |stream| { let ws_pool = pool.clone(); let events_tx = events.tx.clone(); - spawn(move || { - let (acc_s, acc_r) = unbounded(); + let (acc_s, acc_r) = unbounded(); - let nb_stream = stream.unwrap(); - nb_stream.set_nonblocking(true).unwrap(); - - // search through the ws request for the auth cookie - let cb = |req: &Request| { - let err = || ErrorResponse { - error_code: StatusCode::FORBIDDEN, - headers: None, - body: Some("Unauthorized".into()), - }; - - if let Some(cl) = req.headers.find_first("Cookie") { - let cookie_list = str::from_utf8(cl).or(Err(err()))?; - - for s in cookie_list.split(";").map(|s| s.trim()) { - let cookie = Cookie::parse(s).or(Err(err()))?; - - // got auth token - if cookie.name() == TOKEN_HEADER { - acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; - } - }; - }; - acc_s.send(None).unwrap(); - Ok(None) + // search through the ws request for the auth cookie + let cb = move |req: &Request| { + let err = || ErrorResponse { + error_code: StatusCode::FORBIDDEN, + headers: None, + body: Some("Unauthorized".into()), }; - let mut client = accept_hdr(nb_stream, cb).unwrap(); + if let Some(cl) = req.headers.find_first("Cookie") { + let cookie_list = str::from_utf8(cl).or(Err(err()))?; + + for s in cookie_list.split(";").map(|s| s.trim()) { + let cookie = Cookie::parse(s).or(Err(err()))?; + + // got auth token + if cookie.name() == TOKEN_HEADER { + acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; + } + }; + }; + acc_s.send(None).unwrap(); + Ok(None) + }; + + accept_hdr_async(stream, cb).and_then(move |ws_stream| { + info!("new connection"); + + let (sink, stream) = ws_stream.split(); // get a copy of the account let account = match acc_r.recv().unwrap() { @@ -235,17 +141,16 @@ pub fn start(pool: PgPool, events: Events) { let db = ws_pool.get() .expect("unable to get db connection"); - match account::from_token(&db, t) { Ok(a) => { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - client.write_message(Binary(state)).unwrap(); + // client.write_message(Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - client.write_message(Binary(shop)).unwrap(); + // client.write_message(Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); @@ -254,14 +159,34 @@ pub fn start(pool: PgPool, events: Events) { }, Err(e) => { warn!("{:?}", e); - return; + None }, } }, None => None, }; - ws(client, ws_pool, account, events_tx) - }); - } + let ws_reader = stream.for_each(move |message: Message| { + Ok(()) + }); + + + tokio::spawn(ws_reader.then(move |_| { + println!("Connection closed."); + Ok(()) + })); + + + info!("{:?}", account); + Ok(()) + // ws(client, ws_pool, account, events_tx) + + }) + .map_err(|e| { + warn!("Error during the websocket handshake occurred: {}", e); + Error::new(ErrorKind::Other, e) + }) + }); + + return wsf; } From 066c61c227104ace85e96fe0677a09d2439117f4 Mon Sep 17 00:00:00 2001 From: ntr Date: Thu, 25 Jul 2019 21:45:27 +1000 Subject: [PATCH 04/11] wip --- server/src/main.rs | 4 +- server/src/ws.rs | 136 +++++++++++++++++++-------------------------- 2 files changed, 59 insertions(+), 81 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index b97cd1c2..0e2f2a16 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -112,9 +112,9 @@ fn main() { // this should go on a thread too? let ws_pool = pool.clone(); - let wsf = ws::start(ws_pool, events); + let wss = ws::server(ws_pool, events); - tokio::runtime::run(wsf.map_err(|_e| ())); + tokio::runtime::run(wss.map_err(|_e| ())); info!("server started"); } diff --git a/server/src/ws.rs b/server/src/ws.rs index 330e3abb..4c79ee9a 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -8,99 +8,41 @@ use cookie::Cookie; use futures::stream::Stream; use futures::Future; use tokio::net::TcpListener; -use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; -use tungstenite::handshake::HandshakeRole; + use tungstenite::http::StatusCode; -use tungstenite::protocol::WebSocket; -use tungstenite::util::NonBlockingResult; + + use tungstenite::protocol::Message; use tokio_tungstenite::accept_hdr_async; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{unbounded}; use serde_cbor::{to_vec}; use std::io::{Error, ErrorKind}; +use futures::Sink; use net::TOKEN_HEADER; use rpc; +use rpc::{RpcMessage}; use mtx; use pg::PgPool; use account; -use account::Account; -use events::{Message as WsMessage, Events}; -// pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { -// let (tx, rx) = unbounded(); -// events.try_send(Message::Connect(tx)).unwrap(); - -// loop { -// match client.read_message().no_block() { -// Ok(msg) => { -// if let Some(msg) = msg { -// match msg { -// Binary(data) => { -// let begin = Instant::now(); -// let db_connection = pool.get() -// .expect("unable to get db connection"); - -// match rpc::receive(data, &db_connection, begin, &account) { -// Ok(reply) => { -// let response = to_vec(&reply) -// .expect("failed to serialize response"); - -// if let Err(e) = client.write_message(Binary(response)) { -// // connection closed -// warn!("{:?}", e); -// return; -// }; - -// // subscriptions.update(&reply).unwrap(); -// }, -// Err(e) => { -// warn!("{:?}", e); -// let response = to_vec(&RpcError { err: e.to_string() }) -// .expect("failed to serialize error response"); - -// if let Err(e) = client.write_message(Binary(response)) { -// // connection closed -// warn!("{:?}", e); -// return; -// }; -// } -// } -// }, -// _ => (), -// } -// }; - -// // match receiver.try_recv() { -// // Ok(n) => handle_message(&subs, n, &mut websocket), -// // Err(_) => (), -// // }; - -// }, -// // connection is closed -// Err(e) => { -// warn!("{:?}", e); -// return; -// } -// }; -// } -// } +use events::{Events}; #[derive(Debug,Clone,Serialize)] struct RpcError { err: String, } -pub fn start(pool: PgPool, events: Events) -> impl Future { +pub fn server(pool: PgPool, events: Events) -> impl Future { let addr = "127.0.0.1:40055".parse().unwrap(); let ws_server = TcpListener::bind(&addr).unwrap(); - let wsf = ws_server.incoming().for_each(move |stream| { + let wss = ws_server.incoming().for_each(move |stream| { let ws_pool = pool.clone(); let events_tx = events.tx.clone(); @@ -133,7 +75,7 @@ pub fn start(pool: PgPool, events: Events) -> impl Future impl Future { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - // client.write_message(Binary(state)).unwrap(); + sink.start_send(Message::Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - // client.write_message(Binary(shop)).unwrap(); + sink.start_send(Message::Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); @@ -166,21 +108,57 @@ pub fn start(pool: PgPool, events: Events) -> impl Future None, }; - let ws_reader = stream.for_each(move |message: Message| { - Ok(()) + let (tx, rx) = unbounded(); + events_tx.try_send(WsMessage::Connect(tx)).unwrap(); + + let message_reader = stream + .filter(|msg| msg.is_binary()) + .and_then(|message: Message| { + let begin = Instant::now(); + let db_connection = ws_pool.get() + .expect("unable to get db connection"); + + match rpc::receive(message.into_data(), &db_connection, begin, &account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); + Ok(response); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); + Ok(response); + } + }; + + Ok(sink) + }); + + let events_reader = rx.map(|message: RpcMessage| { + let response = to_vec(&message) + .expect("failed to serialize event"); + Ok(response); }); + let combined = message_reader.select(events_reader); - tokio::spawn(ws_reader.then(move |_| { - println!("Connection closed."); + let outgoing = combined.fold(sink, |mut sink, message| { + sink.start_send(Message::Binary(message)).unwrap(); + Ok(sink) + }); + + // if let Err(e) = sink.start_send(Message::Binary(response)) { + // // connection closed + // warn!("{:?}", e); + // return Err(e); + // }; + tokio::spawn(outgoing.then(move |_| { + info!("connection closed"); Ok(()) })); - - info!("{:?}", account); Ok(()) - // ws(client, ws_pool, account, events_tx) - }) .map_err(|e| { warn!("Error during the websocket handshake occurred: {}", e); @@ -188,5 +166,5 @@ pub fn start(pool: PgPool, events: Events) -> impl Future Date: Thu, 25 Jul 2019 23:59:46 +1000 Subject: [PATCH 05/11] carnnnn cunt --- server/Cargo.toml | 5 +- server/src/main.rs | 16 +---- server/src/rpc.rs | 2 + server/src/ws.rs | 170 --------------------------------------------- 4 files changed, 6 insertions(+), 187 deletions(-) delete mode 100644 server/src/ws.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index e2a27f4e..1a238de3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -30,10 +30,7 @@ bodyparser = "0.8" persistent = "0.4" router = "0.6" cookie = "0.12" -tungstenite = "0.8" -tokio-tungstenite = "0.8" -futures = "0.1" -tokio = "0.1" crossbeam-channel = "0.3" +ws = "0.8" stripe-rust = { version = "0.10.4", features = ["webhooks"] } diff --git a/server/src/main.rs b/server/src/main.rs index 0e2f2a16..ebb549ce 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -25,12 +25,7 @@ extern crate persistent; extern crate router; extern crate cookie; -extern crate futures; -extern crate tokio; - -extern crate tokio_tungstenite; -extern crate tungstenite; - +extern crate ws; extern crate crossbeam_channel; mod account; @@ -54,14 +49,12 @@ mod spec; mod util; mod vbox; mod warden; -mod ws; +mod websocket; use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use futures::Future; - use events::{start as events_start}; use warden::warden; @@ -110,11 +103,8 @@ fn main() { let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - // this should go on a thread too? let ws_pool = pool.clone(); - let wss = ws::server(ws_pool, events); - - tokio::runtime::run(wss.map_err(|_e| ())); + websocket::start(ws_pool, events); info!("server started"); } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 3bbcec7d..76a610d5 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -33,6 +33,8 @@ pub enum RpcMessage { Pong(()), DevResolutions(Resolutions), + + Error(String), } #[derive(Debug,Clone,Serialize,Deserialize)] diff --git a/server/src/ws.rs b/server/src/ws.rs deleted file mode 100644 index 4c79ee9a..00000000 --- a/server/src/ws.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::time::{Instant}; -use std::str; - -use uuid::Uuid; - -use cookie::Cookie; - -use futures::stream::Stream; -use futures::Future; -use tokio::net::TcpListener; -use tungstenite::handshake::server::{Request, ErrorResponse}; - -use tungstenite::http::StatusCode; - - -use tungstenite::protocol::Message; -use tokio_tungstenite::accept_hdr_async; - -use crossbeam_channel::{unbounded}; - -use serde_cbor::{to_vec}; - -use std::io::{Error, ErrorKind}; -use futures::Sink; - -use net::TOKEN_HEADER; -use rpc; -use rpc::{RpcMessage}; - -use mtx; -use pg::PgPool; -use account; - -use events::{Events}; - -#[derive(Debug,Clone,Serialize)] -struct RpcError { - err: String, -} - -pub fn server(pool: PgPool, events: Events) -> impl Future { - let addr = "127.0.0.1:40055".parse().unwrap(); - let ws_server = TcpListener::bind(&addr).unwrap(); - - let wss = ws_server.incoming().for_each(move |stream| { - let ws_pool = pool.clone(); - let events_tx = events.tx.clone(); - - let (acc_s, acc_r) = unbounded(); - - // search through the ws request for the auth cookie - let cb = move |req: &Request| { - let err = || ErrorResponse { - error_code: StatusCode::FORBIDDEN, - headers: None, - body: Some("Unauthorized".into()), - }; - - if let Some(cl) = req.headers.find_first("Cookie") { - let cookie_list = str::from_utf8(cl).or(Err(err()))?; - - for s in cookie_list.split(";").map(|s| s.trim()) { - let cookie = Cookie::parse(s).or(Err(err()))?; - - // got auth token - if cookie.name() == TOKEN_HEADER { - acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; - } - }; - }; - acc_s.send(None).unwrap(); - Ok(None) - }; - - accept_hdr_async(stream, cb).and_then(move |ws_stream| { - info!("new connection"); - - let (mut sink, stream) = ws_stream.split(); - - // get a copy of the account - let account = match acc_r.recv().unwrap() { - Some(t) => { - let db = ws_pool.get() - .expect("unable to get db connection"); - - match account::from_token(&db, t) { - Ok(a) => { - let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - sink.start_send(Message::Binary(state)).unwrap(); - - let mut tx = db.transaction().unwrap(); - let shop = mtx::account_shop(&mut tx, &a).unwrap(); - let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - - sink.start_send(Message::Binary(shop)).unwrap(); - - // tx doesn't change anything - tx.commit().unwrap(); - - Some(a) - }, - Err(e) => { - warn!("{:?}", e); - None - }, - } - }, - None => None, - }; - - let (tx, rx) = unbounded(); - events_tx.try_send(WsMessage::Connect(tx)).unwrap(); - - let message_reader = stream - .filter(|msg| msg.is_binary()) - .and_then(|message: Message| { - let begin = Instant::now(); - let db_connection = ws_pool.get() - .expect("unable to get db connection"); - - match rpc::receive(message.into_data(), &db_connection, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - Ok(response); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); - Ok(response); - } - }; - - Ok(sink) - }); - - let events_reader = rx.map(|message: RpcMessage| { - let response = to_vec(&message) - .expect("failed to serialize event"); - Ok(response); - }); - - let combined = message_reader.select(events_reader); - - let outgoing = combined.fold(sink, |mut sink, message| { - sink.start_send(Message::Binary(message)).unwrap(); - Ok(sink) - }); - - // if let Err(e) = sink.start_send(Message::Binary(response)) { - // // connection closed - // warn!("{:?}", e); - // return Err(e); - // }; - tokio::spawn(outgoing.then(move |_| { - info!("connection closed"); - Ok(()) - })); - - Ok(()) - }) - .map_err(|e| { - warn!("Error during the websocket handshake occurred: {}", e); - Error::new(ErrorKind::Other, e) - }) - }); - - return wss; -} From 86d77d23bb29872732e0bdef0533b32a2aa2690b Mon Sep 17 00:00:00 2001 From: ntr Date: Thu, 25 Jul 2019 23:59:51 +1000 Subject: [PATCH 06/11] carnnnn cunt --- server/src/websocket.rs | 120 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 server/src/websocket.rs diff --git a/server/src/websocket.rs b/server/src/websocket.rs new file mode 100644 index 00000000..38569a55 --- /dev/null +++ b/server/src/websocket.rs @@ -0,0 +1,120 @@ +use std::time::{Instant}; +use std::thread::spawn; +use std::str; + +use serde_cbor::to_vec; + +use cookie::Cookie; + +use crossbeam_channel::{unbounded, Sender as CbSender, Receiver as CbReceiver}; +use ws::{ listen, CloseCode, Message, Sender, Handler, Handshake, Result, Request, Response}; + +use account; +use account::{Account}; +use pg::{PgPool}; +use events::{Events}; +use rpc::{RpcMessage}; +use rpc; +use net::TOKEN_HEADER; + +#[derive(Debug,Clone,Serialize)] +struct WsError { + err: String, +} + +struct Connection { + ws: CbSender, + pool: PgPool, + events: Events, + account: Option, +} + +impl Handler for Connection { + fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { + info!("connected account={:?}", self.account); + Ok(()) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + match msg { + Message::Binary(msg) => { + let begin = Instant::now(); + let db_connection = self.pool.get().unwrap(); + + match rpc::receive(msg, &db_connection, begin, &self.account) { + Ok(reply) => { + self.ws.send(reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); + }, + }; + }, + _ => (), + }; + Ok(()) + } + + fn on_close(&mut self, _: CloseCode, _: &str) { + info!("socket disconnected account={:?}", self.account); + // self.ws.shutdown().unwrap() + } + + fn on_request(&mut self, req: &Request) -> Result { + let res = Response::from_request(req)?; + + if let Some(cl) = req.header("Cookie") { + let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec())); + let cookie_list = match str::from_utf8(cl) { + Ok(cl) => cl, + Err(_) => return unauth(), + }; + + for s in cookie_list.split(";").map(|s| s.trim()) { + let cookie = match Cookie::parse(s) { + Ok(c) => c, + Err(_) => return unauth(), + }; + + // got auth token + if cookie.name() == TOKEN_HEADER { + let db = self.pool.get().unwrap(); + match account::from_token(&db, cookie.value().to_string()) { + Ok(a) => self.account = Some(a), + Err(_) => return unauth(), + } + } + }; + }; + + Ok(res) + } +} + + +pub fn start(pool: PgPool, events: Events) { + listen("127.0.0.1:40055", move |out| { + let (tx, rx) = unbounded::(); + + spawn(move || { + loop { + match rx.recv() { + Ok(n) => { + let response = to_vec(&n).unwrap(); + out.send(Message::Binary(response)).unwrap(); + } + Err(_) => (), + }; + } + }); + + Connection { + account: None, + ws: tx, + pool: pool.clone(), + events: events.clone(), + } + + }).unwrap(); +} From 8adcd08daf4ff695707d10dbb2449334ec5abe8e Mon Sep 17 00:00:00 2001 From: ntr Date: Fri, 26 Jul 2019 00:17:45 +1000 Subject: [PATCH 07/11] we hawt --- server/src/websocket.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 38569a55..4f1655de 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -32,6 +32,11 @@ struct Connection { impl Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { info!("connected account={:?}", self.account); + + if let Some(ref a) = self.account { + self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); + } + Ok(()) } @@ -104,7 +109,10 @@ pub fn start(pool: PgPool, events: Events) { let response = to_vec(&n).unwrap(); out.send(Message::Binary(response)).unwrap(); } - Err(_) => (), + // we done + Err(_e) => { + break; + }, }; } }); From 4726a8c5b33f83dcf2a1da6e80df3ab3bec1565f Mon Sep 17 00:00:00 2001 From: ntr Date: Fri, 26 Jul 2019 15:24:35 +1000 Subject: [PATCH 08/11] here we go --- server/src/events.rs | 152 ++++++++-------------------------------- server/src/main.rs | 18 ++--- server/src/pg.rs | 86 ++++++++++++++++++++++- server/src/websocket.rs | 39 ++++++++--- 4 files changed, 153 insertions(+), 142 deletions(-) diff --git a/server/src/events.rs b/server/src/events.rs index bb1ab020..f90a2e2f 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -6,146 +6,54 @@ use failure::Error; use crossbeam_channel::{unbounded, Sender, Receiver}; -use pg::{Db, PgPool}; use account; use game; use instance; - -#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] -#[serde(rename_all(deserialize = "lowercase"))] -enum Table { - Accounts, - Constructs, - Instances, - Mtx, - Players, - Games, -} - -#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] -#[serde(rename_all(deserialize = "UPPERCASE"))] -enum Action { - Insert, - Update, - Delete, -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct Notification { - table: Table, - action: Action, - id: Uuid, -} - -#[derive(Debug,Clone)] -pub enum Message { - // outgoing - Account(account::Account), - Game(game::Game), - Instance(instance::Instance), - - // incoming - Connect(Sender), - Disconnect, -} +use pg::{Db, PgPool}; +use rpc::RpcMessage; +use websocket::{WsMessage}; #[derive(Clone)] pub struct Events { - pub tx: Sender, - rx: Receiver, + pub tx: Sender, + rx: Receiver, + pool: PgPool, } impl Events { - pub fn new() -> Events { + pub fn new(pool: PgPool) -> Events { let (tx, rx) = unbounded(); - return Events { tx, rx }; + return Events { tx, rx, pool }; } - fn receive(&mut self) { - match self.rx.try_recv() { - Ok(m) => self.on_message(m), - Err(_) => (), - } - } - - fn on_message(&mut self, msg: Message) { - match msg { - Message::Connect(tx) => { - info!("client connected {:?}", tx); - }, - Message::Disconnect => { - info!("client disconnected"); - } - _ => panic!("events received unhandled msg={:?}", msg), - } - } -} - -fn pg_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { - info!("events received notification notification={:?}", n); - - // maybe we need it - let mut tx = db.transaction()?; - - let msg = match n.action { - Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), - Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), - Action::Update => match n.table { - Table::Accounts => Message::Account(account::select(db, n.id)?), - Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), - Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), - _ => return Err(format_err!("unimplemented update notification {:?}", n)), - }, - }; - - tx.commit()?; - - info!("got a msg to send to whoever cares {:?}", msg); - - // match events.try_send(msg.clone()) { - // Ok(()) => info!("events message sent message={:?}", msg), - // Err(e) => warn!("events delivery failure err={:?}", e), - // }; - - Ok(()) -} - -pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { - let db = pool.get()?; - db.execute("LISTEN events;", &[])?; - info!("events listening"); - - let notifications = db.notifications(); - let mut n_iter = notifications.blocking_iter(); - - // main event loop, checks pg and checks messages - loop { - // check notifications - let n = n_iter.next()?; - if let Some(n) = n { - match serde_json::from_str::(&n.payload) { - Ok(notification) => match pg_notification(notification, &db, &events) { - Ok(()) => (), - Err(e) => warn!("{:?}", e), - } - Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), + pub fn listen(&mut self) -> Result<(), Error> { + loop { + match self.rx.recv() { + Ok(m) => { + self.on_message(m)?; + }, + Err(e) => { + return Err(format_err!("events error err={:?}", e)); + }, }; } - - events.receive(); } -} -pub fn start(pool: PgPool, mut events: Events) { - loop { - match listen(&pool, &mut events) { - Ok(()) => panic!("events listen returned"), - Err(e) => warn!("events listener error err={:?}", e), + fn on_message(&mut self, msg: WsMessage) -> Result<(), Error> { + match msg { + WsMessage::Connect(tx) => { + info!("client connected {:?}", tx); + Ok(()) + }, + WsMessage::Disconnect => { + info!("client disconnected"); + Ok(()) + } + _ => return Err(format_err!("events received unhandled msg={:?}", msg)), } } } - // #[derive(Debug)] // struct Subscriptions { // account: Option, @@ -240,4 +148,6 @@ pub fn start(pool: PgPool, mut events: Events) { // } { // ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); // } -// } \ No newline at end of file +// } + + diff --git a/server/src/main.rs b/server/src/main.rs index ebb549ce..2206df8f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -55,7 +55,7 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use events::{start as events_start}; +use events::Events; use warden::warden; fn setup_logger() -> Result<(), fern::InitError> { @@ -95,16 +95,16 @@ fn main() { } }); - let events = events::Events::new(); - let event_listener = events.clone(); - let events_pool = pool.clone(); - // spawn(move || events_start(events_pool, event_listener)); - let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - let ws_pool = pool.clone(); - websocket::start(ws_pool, events); + // create a clone of the tx so ws handler can tell events + // about connection status + let mut events = events::Events::new(pool.clone()); + let ws_events_tx = events.tx.clone(); + spawn(move || events.listen()); - info!("server started"); + // the main thread becomes this ws listener + let ws_pool = pool.clone(); + websocket::start(ws_pool, ws_events_tx); } diff --git a/server/src/pg.rs b/server/src/pg.rs index 0b0fe2db..b4cc5f3f 100644 --- a/server/src/pg.rs +++ b/server/src/pg.rs @@ -1,16 +1,46 @@ use std::env; +use uuid::Uuid; + +use failure::Error; + use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; +use fallible_iterator::{FallibleIterator}; -// use postgres::transaction::Transaction; +use events::Events; pub type Db = PooledConnection; pub type PgPool = Pool; const DB_POOL_SIZE: u32 = 20; +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "lowercase"))] +enum Table { + Accounts, + Constructs, + Instances, + Mtx, + Players, + Games, +} + +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "UPPERCASE"))] +enum Action { + Insert, + Update, + Delete, +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct Notification { + table: Table, + action: Action, + id: Uuid, +} pub fn create_pool() -> Pool { let url = env::var("DATABASE_URL") @@ -24,3 +54,57 @@ pub fn create_pool() -> Pool { .build(manager) .expect("Failed to create pool.") } + +// fn handle_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { +// info!("events received notification notification={:?}", n); + +// // maybe we need it +// let mut tx = db.transaction()?; + +// let msg = match n.action { +// Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), +// Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), +// Action::Update => match n.table { +// Table::Accounts => Message::Account(account::select(db, n.id)?), +// Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), +// Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), +// _ => return Err(format_err!("unimplemented update notification {:?}", n)), +// }, +// }; + +// tx.commit()?; + +// info!("got a msg to send to whoever cares {:?}", msg); + +// // match events.try_send(msg.clone()) { +// // Ok(()) => info!("events message sent message={:?}", msg), +// // Err(e) => warn!("events delivery failure err={:?}", e), +// // }; + +// Ok(()) +// } + + +// pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { +// let db = pool.get()?; +// db.execute("LISTEN events;", &[])?; +// info!("events listening"); + +// let notifications = db.notifications(); +// let mut n_iter = notifications.blocking_iter(); + +// // main event loop, checks pg and checks messages +// loop { +// // check notifications +// let n = n_iter.next()?; +// if let Some(n) = n { +// match serde_json::from_str::(&n.payload) { +// Ok(notification) => match handle_notification(notification, &db, &events) { +// Ok(()) => (), +// Err(e) => warn!("{:?}", e), +// } +// Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), +// }; +// } +// } +// } \ No newline at end of file diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 4f1655de..b48d593d 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -2,31 +2,37 @@ use std::time::{Instant}; use std::thread::spawn; use std::str; +use rand::prelude::*; + use serde_cbor::to_vec; use cookie::Cookie; -use crossbeam_channel::{unbounded, Sender as CbSender, Receiver as CbReceiver}; -use ws::{ listen, CloseCode, Message, Sender, Handler, Handshake, Result, Request, Response}; +use crossbeam_channel::{unbounded, Sender as CbSender}; +use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; use account; use account::{Account}; use pg::{PgPool}; -use events::{Events}; + use rpc::{RpcMessage}; use rpc; use net::TOKEN_HEADER; -#[derive(Debug,Clone,Serialize)] -struct WsError { - err: String, +// these are messages relating to the lifecycle +// of ws clients +#[derive(Debug,Clone)] +pub enum WsMessage { + Connect(CbSender), + Disconnect, } struct Connection { - ws: CbSender, + pub id: usize, + pub ws: CbSender, pool: PgPool, - events: Events, account: Option, + events: CbSender, } impl Handler for Connection { @@ -37,6 +43,9 @@ impl Handler for Connection { self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); } + // tell events we have connected + self.events.send(WsMessage::Connect(self.ws.clone())); + Ok(()) } @@ -98,8 +107,16 @@ impl Handler for Connection { } -pub fn start(pool: PgPool, events: Events) { +pub fn start(pool: PgPool, events_tx: CbSender) { + let mut rng = thread_rng(); listen("127.0.0.1:40055", move |out| { + + // we give the tx half to the connection object + // which in turn passes a clone to the events system + // the rx half goes into a thread where it waits for messages + // that need to be delivered to the client + // both the ws message handler and the events thread must use + // this channel to send messages let (tx, rx) = unbounded::(); spawn(move || { @@ -118,11 +135,11 @@ pub fn start(pool: PgPool, events: Events) { }); Connection { + id: rng.gen::(), account: None, ws: tx, pool: pool.clone(), - events: events.clone(), + events: events_tx.clone(), } - }).unwrap(); } From 7c619a8becfac1608cba558bba0aab65a2086a8e Mon Sep 17 00:00:00 2001 From: ntr Date: Fri, 26 Jul 2019 16:39:55 +1000 Subject: [PATCH 09/11] HAWT --- server/src/events.rs | 84 +++++++++++++++++++++++++++++++++++------ server/src/websocket.rs | 66 +++++++++++++++++++++++--------- 2 files changed, 120 insertions(+), 30 deletions(-) diff --git a/server/src/events.rs b/server/src/events.rs index f90a2e2f..bb19f8bc 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -1,29 +1,53 @@ +use std::collections::{HashMap, HashSet}; + // Db Commons use uuid::Uuid; -use fallible_iterator::{FallibleIterator}; use failure::Error; use crossbeam_channel::{unbounded, Sender, Receiver}; use account; +use account::Account; use game; use instance; use pg::{Db, PgPool}; use rpc::RpcMessage; -use websocket::{WsMessage}; -#[derive(Clone)] +type Id = usize; + pub struct Events { - pub tx: Sender, - rx: Receiver, + pub tx: Sender, + rx: Receiver, pool: PgPool, + + clients: HashMap, +} + +#[derive(Debug,Clone)] +pub enum Event { + WsConnect(Id, Option, Sender), + WsDisconnect(Id), + WsSubscribe(Id, Uuid), + WsUnsubscribe(Id, Uuid), +} + + +struct WsClient { + id: Id, + tx: Sender, + subs: HashSet, } impl Events { pub fn new(pool: PgPool) -> Events { let (tx, rx) = unbounded(); - return Events { tx, rx, pool }; + Events { + tx, + rx, + pool, + clients: HashMap::new(), + } } pub fn listen(&mut self) -> Result<(), Error> { @@ -32,6 +56,9 @@ impl Events { Ok(m) => { self.on_message(m)?; }, + + // idk if this is a good idea + // possibly just log errors and continue... Err(e) => { return Err(format_err!("events error err={:?}", e)); }, @@ -39,17 +66,50 @@ impl Events { } } - fn on_message(&mut self, msg: WsMessage) -> Result<(), Error> { + fn on_message(&mut self, msg: Event) -> Result<(), Error> { match msg { - WsMessage::Connect(tx) => { - info!("client connected {:?}", tx); + Event::WsConnect(id, account, tx) => { + info!("client connected to events id={:?} account={:?}", id, account); + + let client = WsClient { id, tx, subs: HashSet::new() }; + self.clients.insert(id, client); + + info!("events clients={:?}", self.clients.len()); Ok(()) }, - WsMessage::Disconnect => { - info!("client disconnected"); + Event::WsDisconnect(id) => { + info!("client disconnected from events id={:?}", id); + + self.clients.remove(&id); + + info!("events clients={:?}", self.clients.len()); Ok(()) } - _ => return Err(format_err!("events received unhandled msg={:?}", msg)), + Event::WsSubscribe(id, obj) => { + info!("client subscribed to updates from object id={:?} object={:?}", id, obj); + + match self.clients.get_mut(&id) { + Some(client) => { + client.subs.insert(obj); + info!("client subscriptions={:?}", client.subs.len()); + Ok(()) + }, + None => return Err(format_err!("unknown client {:?}", id)) + } + }, + Event::WsUnsubscribe(id, obj) => { + info!("client subscribed to updates from object id={:?} object={:?}", id, obj); + + match self.clients.get_mut(&id) { + Some(mut client) => { + client.subs.remove(&obj); + info!("client subscriptions={:?}", client.subs.len()); + Ok(()) + }, + None => return Err(format_err!("unknown client {:?}", id)) + } + }, + } } } diff --git a/server/src/websocket.rs b/server/src/websocket.rs index b48d593d..8b12004e 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -14,37 +14,55 @@ use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; use account; use account::{Account}; use pg::{PgPool}; +use events::Event; use rpc::{RpcMessage}; use rpc; +use mtx; use net::TOKEN_HEADER; -// these are messages relating to the lifecycle -// of ws clients -#[derive(Debug,Clone)] -pub enum WsMessage { - Connect(CbSender), - Disconnect, -} - struct Connection { pub id: usize, pub ws: CbSender, pool: PgPool, account: Option, - events: CbSender, + events: CbSender, } +// we unwrap everything in here cause really +// we don't care if this panics +// it's run in a thread so it's supposed to bail +// when it encounters errors impl Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { - info!("connected account={:?}", self.account); - - if let Some(ref a) = self.account { - self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); - } + info!("websocket connected account={:?}", self.account); // tell events we have connected - self.events.send(WsMessage::Connect(self.ws.clone())); + self.events.send(Event::WsConnect(self.id, self.account.clone(), self.ws.clone())).unwrap(); + + // if user logged in do some prep work + if let Some(ref a) = self.account { + self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); + self.events.send(Event::WsSubscribe(self.id, a.id)).unwrap(); + + let db = self.pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); + + // send account constructs + let account_constructs = account::account_constructs(&mut tx, a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap(); + + // get account instances + // and send them to the client + let account_instances = account::account_instances(&mut tx, a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap(); + + let shop = mtx::account_shop(&mut tx, &a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap(); + + // tx should do nothing + tx.commit().unwrap(); + } Ok(()) } @@ -57,6 +75,18 @@ impl Handler for Connection { match rpc::receive(msg, &db_connection, begin, &self.account) { Ok(reply) => { + // if the user queries the state of something + // we tell events to push updates to them + match reply { + RpcMessage::AccountState(ref v) => + self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + RpcMessage::GameState(ref v) => + self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + RpcMessage::InstanceState(ref v) => + self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + _ => (), + }; + self.ws.send(reply).unwrap(); }, Err(e) => { @@ -71,8 +101,8 @@ impl Handler for Connection { } fn on_close(&mut self, _: CloseCode, _: &str) { - info!("socket disconnected account={:?}", self.account); - // self.ws.shutdown().unwrap() + info!("websocket disconnected account={:?}", self.account); + self.events.send(Event::WsDisconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> Result { @@ -107,7 +137,7 @@ impl Handler for Connection { } -pub fn start(pool: PgPool, events_tx: CbSender) { +pub fn start(pool: PgPool, events_tx: CbSender) { let mut rng = thread_rng(); listen("127.0.0.1:40055", move |out| { From 813fc0184fdb2ad8ce132c838ee7700bcd147141 Mon Sep 17 00:00:00 2001 From: ntr Date: Fri, 26 Jul 2019 17:27:38 +1000 Subject: [PATCH 10/11] yes --- server/src/events.rs | 48 +++++++++++++---- server/src/main.rs | 5 ++ server/src/pg.rs | 112 +++++++++++++++++++++++----------------- server/src/websocket.rs | 12 ++--- 4 files changed, 114 insertions(+), 63 deletions(-) diff --git a/server/src/events.rs b/server/src/events.rs index bb19f8bc..d47d412e 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -26,12 +26,13 @@ pub struct Events { #[derive(Debug,Clone)] pub enum Event { - WsConnect(Id, Option, Sender), - WsDisconnect(Id), - WsSubscribe(Id, Uuid), - WsUnsubscribe(Id, Uuid), -} + Connect(Id, Option, Sender), + Disconnect(Id), + Subscribe(Id, Uuid), + Unsubscribe(Id, Uuid), + Push(Uuid, RpcMessage), +} struct WsClient { id: Id, @@ -54,7 +55,7 @@ impl Events { loop { match self.rx.recv() { Ok(m) => { - self.on_message(m)?; + self.on_event(m)?; }, // idk if this is a good idea @@ -66,9 +67,13 @@ impl Events { } } - fn on_message(&mut self, msg: Event) -> Result<(), Error> { + fn remove_client(&mut self, id: Id) { + self.clients.remove(&id); + } + + fn on_event(&mut self, msg: Event) -> Result<(), Error> { match msg { - Event::WsConnect(id, account, tx) => { + Event::Connect(id, account, tx) => { info!("client connected to events id={:?} account={:?}", id, account); let client = WsClient { id, tx, subs: HashSet::new() }; @@ -77,7 +82,7 @@ impl Events { info!("events clients={:?}", self.clients.len()); Ok(()) }, - Event::WsDisconnect(id) => { + Event::Disconnect(id) => { info!("client disconnected from events id={:?}", id); self.clients.remove(&id); @@ -85,7 +90,7 @@ impl Events { info!("events clients={:?}", self.clients.len()); Ok(()) } - Event::WsSubscribe(id, obj) => { + Event::Subscribe(id, obj) => { info!("client subscribed to updates from object id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { @@ -97,7 +102,7 @@ impl Events { None => return Err(format_err!("unknown client {:?}", id)) } }, - Event::WsUnsubscribe(id, obj) => { + Event::Unsubscribe(id, obj) => { info!("client subscribed to updates from object id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { @@ -110,6 +115,27 @@ impl Events { } }, + Event::Push(id, msg) => { + info!("events received push notification id={:?} msg={:?}", id, msg); + + let mut subs = 0; + for (_client_id, client) in self.clients.iter() { + if client.subs.contains(&id) { + subs += 1; + match client.tx.send(msg.clone()) { + Ok(_) => (), + Err(e) => { + warn!("unable to send msg to client err={:?}", e); + // self.remove_client(*client_id); + }, + }; + } + } + + info!("notification subscribers {:?}", subs); + + Ok(()) + }, } } } diff --git a/server/src/main.rs b/server/src/main.rs index 2206df8f..a53119bb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -100,8 +100,13 @@ fn main() { // create a clone of the tx so ws handler can tell events // about connection status + // TODO store as an Arc> and make cpuN threads let mut events = events::Events::new(pool.clone()); let ws_events_tx = events.tx.clone(); + + let pg_pool = pool.clone(); + let pg_events = events.tx.clone(); + spawn(move || pg::listen(pg_pool, pg_events)); spawn(move || events.listen()); // the main thread becomes this ws listener diff --git a/server/src/pg.rs b/server/src/pg.rs index b4cc5f3f..88e05a87 100644 --- a/server/src/pg.rs +++ b/server/src/pg.rs @@ -1,4 +1,5 @@ use std::env; +use std::thread::spawn; use uuid::Uuid; @@ -9,7 +10,13 @@ use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use fallible_iterator::{FallibleIterator}; -use events::Events; +use crossbeam_channel::{Sender}; + +use events::{Event}; +use account; +use game; +use instance; +use rpc::RpcMessage; pub type Db = PooledConnection; pub type PgPool = Pool; @@ -55,56 +62,69 @@ pub fn create_pool() -> Pool { .expect("Failed to create pool.") } -// fn handle_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { -// info!("events received notification notification={:?}", n); +fn handle_notification(n: Notification, pool: &PgPool, events: &Sender) { + info!("pg received notification={:?}", n); -// // maybe we need it -// let mut tx = db.transaction()?; + // bang out a thread to do the slow work of fetching the state from db + // the thread will notify events -// let msg = match n.action { -// Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), -// Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), -// Action::Update => match n.table { -// Table::Accounts => Message::Account(account::select(db, n.id)?), -// Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), -// Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), -// _ => return Err(format_err!("unimplemented update notification {:?}", n)), -// }, -// }; + let pool = pool.clone(); + let events = events.clone(); + spawn(move || { + // maybe we need it + let db = pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); -// tx.commit()?; + let msg = match n.action { + Action::Delete => { + warn!("unimplemented delete notification {:?}", n); + None + }, + Action::Insert => { + warn!("unimplemented insert notification {:?}", n); + None + }, + Action::Update => match n.table { + Table::Accounts => + Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))), + Table::Instances => + Some(Event::Push(n.id, RpcMessage::InstanceState(instance::instance_get(&mut tx, n.id).unwrap()))), + Table::Games => + Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))), + _ => { + warn!("unimplemented update notification {:?}", n); + None + }, + }, + }; -// info!("got a msg to send to whoever cares {:?}", msg); + tx.commit().unwrap(); -// // match events.try_send(msg.clone()) { -// // Ok(()) => info!("events message sent message={:?}", msg), -// // Err(e) => warn!("events delivery failure err={:?}", e), -// // }; + if let Some(msg) = msg { + events.send(msg).unwrap(); + } + }); +} -// Ok(()) -// } +// this function gets a dedicated connection +// because it has to subscribe and listen for notifications +pub fn listen(pool: PgPool, events: Sender) -> Result<(), Error> { + let db = pool.get()?; + db.execute("LISTEN events;", &[])?; + info!("pg listening"); + let notifications = db.notifications(); + let mut n_iter = notifications.blocking_iter(); -// pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { -// let db = pool.get()?; -// db.execute("LISTEN events;", &[])?; -// info!("events listening"); - -// let notifications = db.notifications(); -// let mut n_iter = notifications.blocking_iter(); - -// // main event loop, checks pg and checks messages -// loop { -// // check notifications -// let n = n_iter.next()?; -// if let Some(n) = n { -// match serde_json::from_str::(&n.payload) { -// Ok(notification) => match handle_notification(notification, &db, &events) { -// Ok(()) => (), -// Err(e) => warn!("{:?}", e), -// } -// Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), -// }; -// } -// } -// } \ No newline at end of file + // main event loop, checks pg and checks messages + loop { + // check notifications + let n = n_iter.next()?; + if let Some(n) = n { + match serde_json::from_str::(&n.payload) { + Ok(notification) => handle_notification(notification, &pool, &events), + Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), + }; + } + } +} diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 8b12004e..0c78b977 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -38,12 +38,12 @@ impl Handler for Connection { info!("websocket connected account={:?}", self.account); // tell events we have connected - self.events.send(Event::WsConnect(self.id, self.account.clone(), self.ws.clone())).unwrap(); + self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); // if user logged in do some prep work if let Some(ref a) = self.account { self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); - self.events.send(Event::WsSubscribe(self.id, a.id)).unwrap(); + self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); let db = self.pool.get().unwrap(); let mut tx = db.transaction().unwrap(); @@ -79,11 +79,11 @@ impl Handler for Connection { // we tell events to push updates to them match reply { RpcMessage::AccountState(ref v) => - self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::GameState(ref v) => - self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::InstanceState(ref v) => - self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), _ => (), }; @@ -102,7 +102,7 @@ impl Handler for Connection { fn on_close(&mut self, _: CloseCode, _: &str) { info!("websocket disconnected account={:?}", self.account); - self.events.send(Event::WsDisconnect(self.id)).unwrap(); + self.events.send(Event::Disconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> Result { From d03c07cc5987c4de394b9f8288d1fd0ea2438306 Mon Sep 17 00:00:00 2001 From: ntr Date: Fri, 26 Jul 2019 17:28:16 +1000 Subject: [PATCH 11/11] = --- server/src/events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/events.rs b/server/src/events.rs index d47d412e..79bc30d9 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -132,7 +132,7 @@ impl Events { } } - info!("notification subscribers {:?}", subs); + info!("notification subscribers={:?}", subs); Ok(()) },