From c94b6f134c23f09d51065fe4dfe16bc02b9386d2 Mon Sep 17 00:00:00 2001 From: ntr Date: Tue, 30 Jul 2019 00:48:40 +1000 Subject: [PATCH] sheeeeeeeeeeeeeeeeeeeeeeeeeeit --- client/src/components/list.jsx | 5 +- client/src/events.jsx | 7 +- client/src/socket.jsx | 12 +- .../sites-available/mnml.gg.DEV.nginx.conf | 3 + server/src/account.rs | 2 +- server/src/events.rs | 49 ++- server/src/instance.rs | 8 +- server/src/main.rs | 25 +- server/src/mtx.rs | 2 +- server/src/rpc.rs | 349 +++++++++++++----- server/src/warden.rs | 44 ++- server/src/websocket.rs | 175 --------- 12 files changed, 357 insertions(+), 324 deletions(-) delete mode 100644 server/src/websocket.rs diff --git a/client/src/components/list.jsx b/client/src/components/list.jsx index 1d7a1c0c..26cd07a0 100644 --- a/client/src/components/list.jsx +++ b/client/src/components/list.jsx @@ -64,7 +64,6 @@ const addState = connect( function List(args) { const { team, - constructs, constructRename, clearMtxRename, setConstructRename, @@ -73,9 +72,7 @@ function List(args) { sendConstructAvatarReroll, } = args; - const constructPanels = constructs - .filter(c => team.includes(c.id)) - .sort(idSort) + const constructPanels = team .map(construct => { const constructName = constructRename === construct.id ? diff --git a/client/src/events.jsx b/client/src/events.jsx index a32251b0..abe1b065 100644 --- a/client/src/events.jsx +++ b/client/src/events.jsx @@ -10,7 +10,6 @@ function registerEvents(store) { } function setTeam(team) { - localStorage.setItem('team', JSON.stringify(team)); store.dispatch(actions.setTeam(team)); } @@ -179,11 +178,6 @@ function registerEvents(store) { return store.dispatch(actions.setItemInfo(v)); } - const team = JSON.parse(localStorage.getItem('team')); - if (team && team.every(t => t)) { - store.dispatch(actions.setTeam(team)); - } - // events.on('SET_PLAYER', setInstance); // events.on('SEND_SKILL', function skillActive(gameId, constructId, targetConstructId, skill) { @@ -234,6 +228,7 @@ function registerEvents(store) { setItemInfo, setPing, setShop, + setTeam, setWs, }; } diff --git a/client/src/socket.jsx b/client/src/socket.jsx index eed3385d..78ebd771 100644 --- a/client/src/socket.jsx +++ b/client/src/socket.jsx @@ -113,10 +113,9 @@ function createSocket(events) { } function sendInstanceQueue() { - send(['InstancePractice', {}]); + send(['InstanceQueue', {}]); } - function sendInstanceReady(instanceId) { send(['InstanceReady', { instance_id: instanceId }]); } @@ -155,6 +154,10 @@ function createSocket(events) { events.setConstructList(constructs); } + function onAccountTeam(constructs) { + events.setTeam(constructs); + } + function onConstructSpawn(construct) { events.setNewConstruct(construct); } @@ -186,6 +189,7 @@ function createSocket(events) { const handlers = { AccountState: onAccount, AccountConstructs: onAccountConstructs, + AccountTeam: onAccountTeam, AccountInstances: onAccountInstances, AccountShop: onAccountShop, ConstructSpawn: onConstructSpawn, @@ -193,6 +197,10 @@ function createSocket(events) { InstanceState: onInstanceState, ItemInfo: onItemInfo, Pong: onPong, + + QueueRequested: () => console.log('pvp queue request received'), + QueueJoined: () => console.log('you have joined the pvp queue'), + Error: errHandler, }; diff --git a/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf b/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf index dc7c219d..9b6b8c97 100644 --- a/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf +++ b/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf @@ -20,6 +20,9 @@ server { root /var/lib/mnml/public/current; index index.html; try_files $uri $uri/ index.html; + + add_header 'Cache-Control' 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0'; + expires off; } location /imgs/ { diff --git a/server/src/account.rs b/server/src/account.rs index cb1f74b9..009ba195 100644 --- a/server/src/account.rs +++ b/server/src/account.rs @@ -265,7 +265,7 @@ pub fn create(name: &String, password: &String, code: &String, tx: &mut Transact Ok(token) } -pub fn account_constructs(tx: &mut Transaction, account: &Account) -> Result, Error> { +pub fn constructs(tx: &mut Transaction, account: &Account) -> Result, Error> { let query = " SELECT data FROM constructs diff --git a/server/src/events.rs b/server/src/events.rs index 51c1e456..95cf18e6 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -5,7 +5,7 @@ use uuid::Uuid; use failure::Error; -use crossbeam_channel::{unbounded, Sender, Receiver}; +use crossbeam_channel::{Sender, Receiver}; use account; use account::Account; @@ -19,12 +19,23 @@ use warden::{GameEvent}; pub type EventsTx = Sender; type Id = usize; +// this is pretty heavyweight +// but it makes the ergonomics easy +// and prevents message mania +// good candidate to trim if the mm is slow +#[derive(Debug, Clone)] +pub struct PvpRequest { + pub id: Id, + pub tx: Sender, + pub account: Account, +} + pub struct Events { pub tx: Sender, rx: Receiver, warden: Sender, - queue: Option, + queue: Option, clients: HashMap, } @@ -41,7 +52,7 @@ pub enum Event { Push(Uuid, RpcMessage), // client events - Queue(Id, Uuid), + Queue(PvpRequest), } struct WsClient { @@ -51,8 +62,7 @@ struct WsClient { } impl Events { - pub fn new(warden: Sender) -> Events { - let (tx, rx) = unbounded(); + pub fn new(tx: Sender, rx: Receiver, warden: Sender) -> Events { Events { tx, rx, @@ -66,11 +76,14 @@ impl Events { loop { match self.rx.recv() { Ok(m) => { - self.on_event(m)?; + match self.event(m) { + Ok(()) => (), // :) + Err(e) => { + warn!("err={:?}", e); + } + } }, - // idk if this is a good idea - // possibly just log errors and continue... Err(e) => { return Err(format_err!("events error err={:?}", e)); }, @@ -89,7 +102,7 @@ impl Events { self.clients.remove(&id); } - fn on_event(&mut self, msg: Event) -> Result<(), Error> { + fn event(&mut self, msg: Event) -> Result<(), Error> { match msg { Event::Connect(id, account, tx) => { info!("connect id={:?} account={:?}", id, account); @@ -100,6 +113,7 @@ impl Events { info!("clients={:?}", self.clients.len()); Ok(()) }, + Event::Disconnect(id) => { info!("disconnect id={:?}", id); @@ -107,7 +121,8 @@ impl Events { info!("clients={:?}", self.clients.len()); Ok(()) - } + }, + Event::Subscribe(id, obj) => { info!("subscribe id={:?} object={:?}", id, obj); @@ -120,6 +135,7 @@ impl Events { None => return Err(format_err!("unknown client {:?}", id)) } }, + Event::Unsubscribe(id, obj) => { info!("unsubscribe id={:?} object={:?}", id, obj); @@ -162,17 +178,18 @@ impl Events { Ok(()) }, - Event::Queue(id, account) => { - info!("queue id={:?} account={:?}", id, account); + Event::Queue(req) => { + info!("queue id={:?} account={:?}", req.id, req.account); self.queue = match self.queue { - Some(id) => { - info!("game queue pair a={:?} b={:?}", account, id); + Some(ref q_req) => { + info!("game queue pair found a={:?} b={:?}", req.account, q_req.account); + self.warden.send(GameEvent::Match((req, q_req.clone())))?; None }, None => { - info!("joined game queue id={:?} account={:?}", id, account); - Some(account) + info!("joined game queue id={:?} account={:?}", req.id, req.account); + Some(req) }, }; diff --git a/server/src/instance.rs b/server/src/instance.rs index 6e8f760e..e6c7ad11 100644 --- a/server/src/instance.rs +++ b/server/src/instance.rs @@ -13,7 +13,7 @@ use chrono::Duration; use account::Account; use account; -use events::EventsTx; +use events::{EventsTx, Event}; use player::{Player, player_create}; use construct::{Construct, construct_get}; use mob::{bot_player, instance_mobs}; @@ -692,7 +692,7 @@ pub fn instance_practice(tx: &mut Transaction, account: &Account) -> Result Result { // } -pub fn instance_pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result { +pub fn pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result { let mut instance = Instance::new() // TODO generate nice game names .set_name("PVP".to_string())?; @@ -747,10 +747,6 @@ pub fn instance_game_finished(tx: &mut Transaction, game: &Game, instance_id: Uu Ok(()) } -pub fn instance_queue(events: &EventsTx, account: &Account) -> Result { - -} - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/main.rs b/server/src/main.rs index 044df941..77682917 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -49,11 +49,12 @@ mod spec; mod util; mod vbox; mod warden; -mod websocket; use std::thread::{spawn}; use std::path::{Path}; +use crossbeam_channel::{unbounded}; + fn setup_logger() -> Result<(), fern::InitError> { fern::Dispatch::new() .format(|out, message, record| { @@ -80,20 +81,22 @@ fn main() { setup_logger().unwrap(); let pool = pg::create_pool(); - - let warden = warden::Warden::new(pool.clone()); - let warden_tx = warden.tx.clone(); - let warden_tick_tx = warden.tx.clone(); - let http_pool = pool.clone(); + let (events_tx, events_rx) = unbounded(); + let pg_events_tx = events_tx.clone(); + let rpc_events_tx = events_tx.clone(); + + let (warden_tx, warden_rx) = unbounded(); + let events_warden_tx = warden_tx.clone(); + let warden_tick_tx = warden_tx.clone(); + // create a clone of the tx so ws handler can tell events // about connection status - let events = events::Events::new(warden_tx); - let ws_events_tx = events.tx.clone(); + let events = events::Events::new(events_tx, events_rx, events_warden_tx); + let warden = warden::Warden::new(warden_tx, warden_rx, events.tx.clone(), pool.clone()); let pg_pool = pool.clone(); - let pg_events_tx = events.tx.clone(); spawn(move || net::start(http_pool)); spawn(move || warden.listen()); @@ -102,6 +105,6 @@ fn main() { spawn(move || events.listen()); // the main thread becomes this ws listener - let ws_pool = pool.clone(); - websocket::start(ws_pool, ws_events_tx); + let rpc_pool = pool.clone(); + rpc::start(rpc_pool, rpc_events_tx); } diff --git a/server/src/mtx.rs b/server/src/mtx.rs index d20e97fc..41f95601 100644 --- a/server/src/mtx.rs +++ b/server/src/mtx.rs @@ -159,7 +159,7 @@ pub fn apply(tx: &mut Transaction, account: &Account, variant: MtxVariant, const }; construct_write(tx, construct)?; - account::account_constructs(tx, account) + account::constructs(tx, account) } pub fn select(tx: &mut Transaction, variant: MtxVariant, account: Uuid) -> Result { diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 726622b0..1316d78f 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -1,31 +1,38 @@ - use std::time::{Instant}; +use std::thread::spawn; +use std::str; -use serde_cbor::{from_slice}; use uuid::Uuid; +use rand::prelude::*; + use failure::Error; use failure::err_msg; -use crossbeam_channel::{Sender}; +use serde_cbor::{from_slice, to_vec}; +use cookie::Cookie; +use crossbeam_channel::{unbounded, Sender as CbSender}; +use ws::{listen, CloseCode, Message, Handler, Request, Response}; + +use account::{Account}; use account; -use pg::{Db}; -use events::{Event}; use construct::{Construct}; +use events::{Event, PvpRequest}; use game::{Game, game_state, game_skill, game_ready}; -use account::{Account, account_constructs}; -use skill::{Skill, dev_resolve, Resolutions}; -use instance::{Instance, instance_state, instance_practice, instance_ready, instance_queue}; -use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip}; +use instance::{Instance, instance_state, instance_practice, instance_ready}; use item::{Item, ItemInfoCtr, item_info}; -use websocket::{Connection}; - use mtx; +use pg::{Db}; +use pg::{PgPool}; +use skill::{Skill, dev_resolve, Resolutions}; +use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip}; +use net::TOKEN_HEADER; #[derive(Debug,Clone,Serialize,Deserialize)] pub enum RpcMessage { AccountState(Account), AccountConstructs(Vec), + AccountTeam(Vec), AccountInstances(Vec), AccountShop(mtx::Shop), ConstructSpawn(Construct), @@ -38,9 +45,9 @@ pub enum RpcMessage { DevResolutions(Resolutions), - QueueRequested, - QueueJoined, - QueueCancelled, + QueueRequested(()), + QueueJoined(()), + QueueCancelled(()), Error(String), } @@ -78,105 +85,271 @@ enum RpcRequest { VboxReclaim { instance_id: Uuid, index: usize }, } -pub fn receive(data: Vec, db: &Db, connection: &Connection, begin: Instant, account: &Option) -> Result { - // cast the msg to this type to receive method name - match from_slice::(&data) { - Ok(v) => { +struct Connection { + pub id: usize, + pub ws: CbSender, + pool: PgPool, + account: Option, + events: CbSender, +} - // non authenticated - // non transactional reqs - match v { - RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())), - RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())), - RpcRequest::DevResolve {a, b, skill } => - return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))), - _ => (), - }; +impl Connection { + fn receive(&self, data: Vec, db: &Db, begin: Instant) -> Result { + // cast the msg to this type to receive method name + match from_slice::(&data) { + Ok(v) => { - // check for authorization now - let account = match account { - Some(account) => account, - None => return Err(err_msg("auth required")), - }; + // non authenticated + // non transactional reqs + match v { + RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())), + RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())), + RpcRequest::DevResolve {a, b, skill } => + return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))), + _ => (), + }; - // all good, let's make a tx and process - let mut tx = db.transaction()?; + // check for authorization now + let account = match self.account { + Some(ref account) => account, + None => return Err(err_msg("auth required")), + }; - let request = v.clone(); + // evented but authorization required + match v { + RpcRequest::InstanceQueue {} => { + let pvp = PvpRequest { id: self.id, account: account.clone(), tx: self.ws.clone() }; + self.events.send(Event::Queue(pvp))?; + return Ok(RpcMessage::QueueRequested(())); + }, + _ => (), + }; - let response = match v { - RpcRequest::AccountState {} => - return Ok(RpcMessage::AccountState(account.clone())), - RpcRequest::AccountConstructs {} => - Ok(RpcMessage::AccountConstructs(account_constructs(&mut tx, &account)?)), + // all good, let's make a tx and process + let mut tx = db.transaction()?; - RpcRequest::AccountSetTeam { ids } => - Ok(RpcMessage::AccountConstructs(account::set_team(&mut tx, &account, ids)?)), + let request = v.clone(); - // RpcRequest::AccountShop {} => - // Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)), + let response = match v { + RpcRequest::AccountState {} => + return Ok(RpcMessage::AccountState(account.clone())), + RpcRequest::AccountConstructs {} => + Ok(RpcMessage::AccountConstructs(account::constructs(&mut tx, &account)?)), - // RpcRequest::ConstructDelete" => handle_construct_delete(data, &mut tx, account), + RpcRequest::AccountSetTeam { ids } => + Ok(RpcMessage::AccountTeam(account::set_team(&mut tx, &account, ids)?)), - RpcRequest::GameState { id } => - Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)), + // RpcRequest::AccountShop {} => + // Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)), - RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } => - Ok(RpcMessage::GameState(game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)), + // RpcRequest::ConstructDelete" => handle_construct_delete(data, &mut tx, account), - RpcRequest::GameReady { id } => - Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)), + RpcRequest::GameState { id } => + Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)), - RpcRequest::InstanceQueue {} => - Ok(RpcMessage::QueueRequested), - RpcRequest::InstancePractice {} => - Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)), + RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } => + Ok(RpcMessage::GameState(game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)), - // these two can return GameState or InstanceState - RpcRequest::InstanceReady { instance_id } => - Ok(instance_ready(&mut tx, account, instance_id)?), - RpcRequest::InstanceState { instance_id } => - Ok(instance_state(&mut tx, account, instance_id)?), + RpcRequest::GameReady { id } => + Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)), - RpcRequest::VboxAccept { instance_id, group, index } => - Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index)?)), + RpcRequest::InstancePractice {} => + Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)), - RpcRequest::VboxApply { instance_id, construct_id, index } => - Ok(RpcMessage::InstanceState(vbox_apply(&mut tx, account, instance_id, construct_id, index)?)), + // these two can return GameState or InstanceState + RpcRequest::InstanceReady { instance_id } => + Ok(instance_ready(&mut tx, account, instance_id)?), + RpcRequest::InstanceState { instance_id } => + Ok(instance_state(&mut tx, account, instance_id)?), - RpcRequest::VboxCombine { instance_id, indices } => - Ok(RpcMessage::InstanceState(vbox_combine(&mut tx, account, instance_id, indices)?)), + RpcRequest::VboxAccept { instance_id, group, index } => + Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index)?)), - RpcRequest::VboxDiscard { instance_id } => - Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)), + RpcRequest::VboxApply { instance_id, construct_id, index } => + Ok(RpcMessage::InstanceState(vbox_apply(&mut tx, account, instance_id, construct_id, index)?)), - RpcRequest::VboxReclaim { instance_id, index } => - Ok(RpcMessage::InstanceState(vbox_reclaim(&mut tx, account, instance_id, index)?)), + RpcRequest::VboxCombine { instance_id, indices } => + Ok(RpcMessage::InstanceState(vbox_combine(&mut tx, account, instance_id, indices)?)), - RpcRequest::VboxUnequip { instance_id, construct_id, target } => - Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target)?)), + RpcRequest::VboxDiscard { instance_id } => + Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)), - RpcRequest::MtxConstructSpawn {} => - Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)), + RpcRequest::VboxReclaim { instance_id, index } => + Ok(RpcMessage::InstanceState(vbox_reclaim(&mut tx, account, instance_id, index)?)), - RpcRequest::MtxConstructApply { mtx, construct_id, name } => - Ok(RpcMessage::AccountConstructs(mtx::apply(&mut tx, account, mtx, construct_id, name)?)), + RpcRequest::VboxUnequip { instance_id, construct_id, target } => + Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target)?)), - RpcRequest::MtxBuy { mtx } => - Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)), + RpcRequest::MtxConstructSpawn {} => + Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)), - _ => Err(format_err!("unknown request request={:?}", request)), - }; + RpcRequest::MtxConstructApply { mtx, construct_id, name } => + Ok(RpcMessage::AccountConstructs(mtx::apply(&mut tx, account, mtx, construct_id, name)?)), - tx.commit()?; + RpcRequest::MtxBuy { mtx } => + Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)), - info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed()); + _ => Err(format_err!("unknown request request={:?}", request)), + }; - return response; - }, - Err(e) => { - warn!("{:?}", e); - Err(err_msg("invalid message")) - }, + tx.commit()?; + + info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed()); + + return response; + }, + Err(e) => { + warn!("{:?}", e); + Err(err_msg("invalid message")) + }, + } } } + +// we unwrap everything in here cause really +// we don't care if this panics +// it's run in a thread so it's supposed to bail +// when it encounters errors +impl Handler for Connection { + fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { + info!("websocket connected account={:?}", self.account); + + // tell events we have connected + self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); + + // if user logged in do some prep work + if let Some(ref a) = self.account { + self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); + self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); + + let db = self.pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); + + // send account constructs + let account_constructs = account::constructs(&mut tx, a).unwrap(); + self.ws.send(RpcMessage::AccountConstructs(account_constructs)).unwrap(); + + // get account instances + // and send them to the client + let account_instances = account::account_instances(&mut tx, a).unwrap(); + self.ws.send(RpcMessage::AccountInstances(account_instances)).unwrap(); + + let shop = mtx::account_shop(&mut tx, &a).unwrap(); + self.ws.send(RpcMessage::AccountShop(shop)).unwrap(); + + let team = account::account_team(&mut tx, &a).unwrap(); + self.ws.send(RpcMessage::AccountTeam(team)).unwrap(); + + // tx should do nothing + tx.commit().unwrap(); + } + + Ok(()) + } + + fn on_message(&mut self, msg: Message) -> ws::Result<()> { + match msg { + Message::Binary(msg) => { + let begin = Instant::now(); + let db_connection = self.pool.get().unwrap(); + + match self.receive(msg, &db_connection, begin) { + Ok(reply) => { + // if the user queries the state of something + // we tell events to push updates to them + match reply { + RpcMessage::AccountState(ref v) => + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), + RpcMessage::GameState(ref v) => + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), + RpcMessage::InstanceState(ref v) => + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), + _ => (), + }; + + self.ws.send(reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); + }, + }; + }, + _ => (), + }; + Ok(()) + } + + fn on_close(&mut self, _: CloseCode, _: &str) { + info!("websocket disconnected account={:?}", self.account); + self.events.send(Event::Disconnect(self.id)).unwrap(); + } + + fn on_request(&mut self, req: &Request) -> ws::Result { + let res = Response::from_request(req)?; + + if let Some(cl) = req.header("Cookie") { + let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec())); + let cookie_list = match str::from_utf8(cl) { + Ok(cl) => cl, + Err(_) => return unauth(), + }; + + for s in cookie_list.split(";").map(|s| s.trim()) { + let cookie = match Cookie::parse(s) { + Ok(c) => c, + Err(_) => return unauth(), + }; + + // got auth token + if cookie.name() == TOKEN_HEADER { + let db = self.pool.get().unwrap(); + match account::from_token(&db, cookie.value().to_string()) { + Ok(a) => self.account = Some(a), + Err(_) => return unauth(), + } + } + }; + }; + + Ok(res) + } +} + +pub fn start(pool: PgPool, events_tx: CbSender) { + let mut rng = thread_rng(); + listen("127.0.0.1:40055", move |out| { + + // we give the tx half to the connection object + // which in turn passes a clone to the events system + // the rx half goes into a thread where it waits for messages + // that need to be delivered to the client + // both the ws message handler and the events thread must use + // this channel to send messages + let (tx, rx) = unbounded::(); + + spawn(move || { + loop { + match rx.recv() { + Ok(n) => { + let response = to_vec(&n).unwrap(); + out.send(Message::Binary(response)).unwrap(); + } + // we done + Err(_e) => { + break; + }, + }; + } + }); + + Connection { + id: rng.gen::(), + account: None, + ws: tx, + pool: pool.clone(), + events: events_tx.clone(), + } + }).unwrap(); +} + diff --git a/server/src/warden.rs b/server/src/warden.rs index c37cf024..f1ebd731 100644 --- a/server/src/warden.rs +++ b/server/src/warden.rs @@ -2,18 +2,21 @@ use std::time::{Duration}; use uuid::Uuid; -use crossbeam_channel::{unbounded, tick, Sender, Receiver}; +use crossbeam_channel::{tick, Sender, Receiver}; // Db Commons use postgres::transaction::Transaction; use failure::Error; use game::{games_need_upkeep, game_update, game_write, game_delete}; +use instance; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; use pg::{Db, PgPool}; +use events::{Event, EventsTx, PvpRequest}; +use rpc::{RpcMessage}; type Id = usize; -type Pair = ((Id, Uuid), (Id, Uuid)); +type Pair = (PvpRequest, PvpRequest); pub enum GameEvent { Upkeep, @@ -27,15 +30,16 @@ pub struct Warden { pub tx: Sender, rx: Receiver, + events: EventsTx, pool: PgPool, } impl Warden { - pub fn new(pool: PgPool) -> Warden { - let (tx, rx) = unbounded(); + pub fn new(tx: Sender, rx: Receiver, events: EventsTx, pool: PgPool) -> Warden { Warden { tx, rx, + events, pool, } } @@ -44,13 +48,16 @@ impl Warden { loop { match self.rx.recv() { Ok(m) => { - self.event(m)?; + match self.event(m) { + Ok(()) => (), // :) + Err(e) => { + warn!("err={:?}", e); + } + } }, - // idk if this is a good idea - // possibly just log errors and continue... Err(e) => { - return Err(format_err!("events error err={:?}", e)); + return Err(format_err!("err={:?}", e)); }, }; } @@ -80,15 +87,24 @@ impl Warden { } fn on_match(&mut self, pair: Pair) -> Result<(), Error> { - let db = self.pool.get()?; - let tx = db.transaction()?; - info!("received pair={:?}", pair); + let db = self.pool.get()?; + let mut tx = db.transaction()?; + let instance = instance::pvp(&mut tx, &pair.0.account, &pair.1.account)?; + tx.commit()?; + + // subscribe users to instance events + self.events.send(Event::Subscribe(pair.0.id, instance.id))?; + self.events.send(Event::Subscribe(pair.1.id, instance.id))?; + + // send them the new instance state + let msg = RpcMessage::InstanceState(instance); + pair.0.tx.send(msg.clone())?; + pair.1.tx.send(msg)?; + Ok(()) } - - } fn fetch_games(mut tx: Transaction) -> Result { @@ -131,4 +147,4 @@ pub fn upkeep_tick(warden: Sender) { ticker.recv().unwrap(); warden.send(GameEvent::Upkeep).unwrap(); } -} \ No newline at end of file +} diff --git a/server/src/websocket.rs b/server/src/websocket.rs deleted file mode 100644 index 016a2ae9..00000000 --- a/server/src/websocket.rs +++ /dev/null @@ -1,175 +0,0 @@ -use std::time::{Instant}; -use std::thread::spawn; -use std::str; - -use rand::prelude::*; - -use serde_cbor::to_vec; - -use cookie::Cookie; - -use crossbeam_channel::{unbounded, Sender as CbSender}; -use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; - -use account; -use account::{Account}; -use pg::{PgPool}; -use events::Event; - -use rpc::{RpcMessage}; -use rpc; -use mtx; -use net::TOKEN_HEADER; - -struct Connection { - pub id: usize, - pub ws: CbSender, - pool: PgPool, - account: Option, - events: CbSender, -} - -// we unwrap everything in here cause really -// we don't care if this panics -// it's run in a thread so it's supposed to bail -// when it encounters errors -impl Handler for Connection { - fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { - info!("websocket connected account={:?}", self.account); - - // tell events we have connected - self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); - - // if user logged in do some prep work - if let Some(ref a) = self.account { - self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); - self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); - - let db = self.pool.get().unwrap(); - let mut tx = db.transaction().unwrap(); - - // send account constructs - let account_constructs = account::account_constructs(&mut tx, a).unwrap(); - self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap(); - - // get account instances - // and send them to the client - let account_instances = account::account_instances(&mut tx, a).unwrap(); - self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap(); - - let shop = mtx::account_shop(&mut tx, &a).unwrap(); - self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap(); - - // tx should do nothing - tx.commit().unwrap(); - } - - Ok(()) - } - - fn on_message(&mut self, msg: Message) -> Result<()> { - match msg { - Message::Binary(msg) => { - let begin = Instant::now(); - let db_connection = self.pool.get().unwrap(); - - match rpc::receive(msg, &db_connection, &self, begin, &self.account) { - Ok(reply) => { - // if the user queries the state of something - // we tell events to push updates to them - match reply { - RpcMessage::AccountState(ref v) => - self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), - RpcMessage::GameState(ref v) => - self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), - RpcMessage::InstanceState(ref v) => - self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), - _ => (), - }; - - self.ws.send(reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); - }, - }; - }, - _ => (), - }; - Ok(()) - } - - fn on_close(&mut self, _: CloseCode, _: &str) { - info!("websocket disconnected account={:?}", self.account); - self.events.send(Event::Disconnect(self.id)).unwrap(); - } - - fn on_request(&mut self, req: &Request) -> Result { - let res = Response::from_request(req)?; - - if let Some(cl) = req.header("Cookie") { - let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec())); - let cookie_list = match str::from_utf8(cl) { - Ok(cl) => cl, - Err(_) => return unauth(), - }; - - for s in cookie_list.split(";").map(|s| s.trim()) { - let cookie = match Cookie::parse(s) { - Ok(c) => c, - Err(_) => return unauth(), - }; - - // got auth token - if cookie.name() == TOKEN_HEADER { - let db = self.pool.get().unwrap(); - match account::from_token(&db, cookie.value().to_string()) { - Ok(a) => self.account = Some(a), - Err(_) => return unauth(), - } - } - }; - }; - - Ok(res) - } -} - - -pub fn start(pool: PgPool, events_tx: CbSender) { - let mut rng = thread_rng(); - listen("127.0.0.1:40055", move |out| { - - // we give the tx half to the connection object - // which in turn passes a clone to the events system - // the rx half goes into a thread where it waits for messages - // that need to be delivered to the client - // both the ws message handler and the events thread must use - // this channel to send messages - let (tx, rx) = unbounded::(); - - spawn(move || { - loop { - match rx.recv() { - Ok(n) => { - let response = to_vec(&n).unwrap(); - out.send(Message::Binary(response)).unwrap(); - } - // we done - Err(_e) => { - break; - }, - }; - } - }); - - Connection { - id: rng.gen::(), - account: None, - ws: tx, - pool: pool.clone(), - events: events_tx.clone(), - } - }).unwrap(); -}