diff --git a/server/src/events.rs b/server/src/events.rs index f90a2e2f..bb19f8bc 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -1,29 +1,53 @@ +use std::collections::{HashMap, HashSet}; + // Db Commons use uuid::Uuid; -use fallible_iterator::{FallibleIterator}; use failure::Error; use crossbeam_channel::{unbounded, Sender, Receiver}; use account; +use account::Account; use game; use instance; use pg::{Db, PgPool}; use rpc::RpcMessage; -use websocket::{WsMessage}; -#[derive(Clone)] +type Id = usize; + pub struct Events { - pub tx: Sender, - rx: Receiver, + pub tx: Sender, + rx: Receiver, pool: PgPool, + + clients: HashMap, +} + +#[derive(Debug,Clone)] +pub enum Event { + WsConnect(Id, Option, Sender), + WsDisconnect(Id), + WsSubscribe(Id, Uuid), + WsUnsubscribe(Id, Uuid), +} + + +struct WsClient { + id: Id, + tx: Sender, + subs: HashSet, } impl Events { pub fn new(pool: PgPool) -> Events { let (tx, rx) = unbounded(); - return Events { tx, rx, pool }; + Events { + tx, + rx, + pool, + clients: HashMap::new(), + } } pub fn listen(&mut self) -> Result<(), Error> { @@ -32,6 +56,9 @@ impl Events { Ok(m) => { self.on_message(m)?; }, + + // idk if this is a good idea + // possibly just log errors and continue... Err(e) => { return Err(format_err!("events error err={:?}", e)); }, @@ -39,17 +66,50 @@ impl Events { } } - fn on_message(&mut self, msg: WsMessage) -> Result<(), Error> { + fn on_message(&mut self, msg: Event) -> Result<(), Error> { match msg { - WsMessage::Connect(tx) => { - info!("client connected {:?}", tx); + Event::WsConnect(id, account, tx) => { + info!("client connected to events id={:?} account={:?}", id, account); + + let client = WsClient { id, tx, subs: HashSet::new() }; + self.clients.insert(id, client); + + info!("events clients={:?}", self.clients.len()); Ok(()) }, - WsMessage::Disconnect => { - info!("client disconnected"); + Event::WsDisconnect(id) => { + info!("client disconnected from events id={:?}", id); + + self.clients.remove(&id); + + info!("events clients={:?}", self.clients.len()); Ok(()) } - _ => return Err(format_err!("events received unhandled msg={:?}", msg)), + Event::WsSubscribe(id, obj) => { + info!("client subscribed to updates from object id={:?} object={:?}", id, obj); + + match self.clients.get_mut(&id) { + Some(client) => { + client.subs.insert(obj); + info!("client subscriptions={:?}", client.subs.len()); + Ok(()) + }, + None => return Err(format_err!("unknown client {:?}", id)) + } + }, + Event::WsUnsubscribe(id, obj) => { + info!("client subscribed to updates from object id={:?} object={:?}", id, obj); + + match self.clients.get_mut(&id) { + Some(mut client) => { + client.subs.remove(&obj); + info!("client subscriptions={:?}", client.subs.len()); + Ok(()) + }, + None => return Err(format_err!("unknown client {:?}", id)) + } + }, + } } } diff --git a/server/src/websocket.rs b/server/src/websocket.rs index b48d593d..8b12004e 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -14,37 +14,55 @@ use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; use account; use account::{Account}; use pg::{PgPool}; +use events::Event; use rpc::{RpcMessage}; use rpc; +use mtx; use net::TOKEN_HEADER; -// these are messages relating to the lifecycle -// of ws clients -#[derive(Debug,Clone)] -pub enum WsMessage { - Connect(CbSender), - Disconnect, -} - struct Connection { pub id: usize, pub ws: CbSender, pool: PgPool, account: Option, - events: CbSender, + events: CbSender, } +// we unwrap everything in here cause really +// we don't care if this panics +// it's run in a thread so it's supposed to bail +// when it encounters errors impl Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { - info!("connected account={:?}", self.account); - - if let Some(ref a) = self.account { - self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); - } + info!("websocket connected account={:?}", self.account); // tell events we have connected - self.events.send(WsMessage::Connect(self.ws.clone())); + self.events.send(Event::WsConnect(self.id, self.account.clone(), self.ws.clone())).unwrap(); + + // if user logged in do some prep work + if let Some(ref a) = self.account { + self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); + self.events.send(Event::WsSubscribe(self.id, a.id)).unwrap(); + + let db = self.pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); + + // send account constructs + let account_constructs = account::account_constructs(&mut tx, a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap(); + + // get account instances + // and send them to the client + let account_instances = account::account_instances(&mut tx, a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap(); + + let shop = mtx::account_shop(&mut tx, &a).unwrap(); + self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap(); + + // tx should do nothing + tx.commit().unwrap(); + } Ok(()) } @@ -57,6 +75,18 @@ impl Handler for Connection { match rpc::receive(msg, &db_connection, begin, &self.account) { Ok(reply) => { + // if the user queries the state of something + // we tell events to push updates to them + match reply { + RpcMessage::AccountState(ref v) => + self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + RpcMessage::GameState(ref v) => + self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + RpcMessage::InstanceState(ref v) => + self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + _ => (), + }; + self.ws.send(reply).unwrap(); }, Err(e) => { @@ -71,8 +101,8 @@ impl Handler for Connection { } fn on_close(&mut self, _: CloseCode, _: &str) { - info!("socket disconnected account={:?}", self.account); - // self.ws.shutdown().unwrap() + info!("websocket disconnected account={:?}", self.account); + self.events.send(Event::WsDisconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> Result { @@ -107,7 +137,7 @@ impl Handler for Connection { } -pub fn start(pool: PgPool, events_tx: CbSender) { +pub fn start(pool: PgPool, events_tx: CbSender) { let mut rng = thread_rng(); listen("127.0.0.1:40055", move |out| {