use std::collections::{HashMap, HashSet}; use std::thread::{spawn, sleep}; use std::time; // Db Commons use uuid::Uuid; use failure::Error; use failure::{err_msg, format_err}; use crossbeam_channel::{Sender, Receiver}; use account; use account::Account; use game; use instance; use names; use pg::{Db, PgPool}; use rpc::RpcMessage; use warden::{GameEvent}; use mail::Mail; pub type EventsTx = Sender; type Id = usize; // this is pretty heavyweight // but it makes the ergonomics easy // and prevents message mania // good candidate to trim if the mm is slow #[derive(Debug, Clone)] pub struct PvpRequest { pub id: Id, pub tx: Sender, pub account: Uuid, } pub struct Events { pub tx: Sender, rx: Receiver, mail: Sender, warden: Sender, clients: HashMap, } #[derive(Debug,Clone)] pub enum Event { // ws lifecycle Connect(Id, Option, Sender), Disconnect(Id), Subscribe(Id, Uuid), Unsubscribe(Id, Uuid), // notifications Push(Uuid, RpcMessage), // client events Queue(Id), Invite(Id), Join(Id, String), Joined(Id), Chat(Id, Uuid, String), ChatClear(Id, Uuid), } struct WsClient { id: Id, account: Option, tx: Sender, subs: HashSet, chat: Option<(Uuid, String)>, pvp: bool, invite: Option, } impl Events { pub fn new(tx: Sender, rx: Receiver, warden: Sender, mail: Sender) -> Events { Events { tx, rx, warden, mail, clients: HashMap::new(), } } pub fn listen(mut self) -> Result<(), Error> { loop { match self.rx.recv() { Ok(m) => { match self.event(m) { Ok(()) => (), // :) Err(e) => { warn!("err={:?}", e); } } }, Err(e) => { warn!("events error err={:?}", e); }, }; } } fn get_client(&mut self, id: Id) -> Result<&mut WsClient, Error> { match self.clients.get_mut(&id) { Some(c) => Ok(c), None => Err(format_err!("connection not found id={:?}", id)), } } fn remove_client(&mut self, id: Id) { self.clients.remove(&id); } fn event(&mut self, msg: Event) -> Result<(), Error> { match msg { Event::Connect(id, account, tx) => { info!("connect id={:?} account={:?}", id, account); let account_id = match account { Some(a) => Some(a.id), None => None, }; let client = WsClient { id, tx, account: account_id, subs: HashSet::new(), pvp: false, invite: None, chat: None, }; self.clients.insert(id, client); info!("clients={:?}", self.clients.len()); Ok(()) }, Event::Disconnect(id) => { info!("disconnect id={:?}", id); self.clients.remove(&id); info!("clients={:?}", self.clients.len()); Ok(()) }, Event::Subscribe(id, obj) => { trace!("subscribe id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { Some(client) => { client.subs.insert(obj); trace!("client={:?} subscriptions={:?}", id, client.subs.len()); Ok(()) }, None => return Err(format_err!("unknown client {:?}", id)) } }, Event::Unsubscribe(id, obj) => { trace!("unsubscribe id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { Some(mut client) => { client.subs.remove(&obj); trace!("unsubscribe subscriptions removed={:?}", client.subs.len()); Ok(()) }, None => return Err(format_err!("unknown client {:?}", id)) } }, Event::Push(id, msg) => { trace!("push id={:?}", id); let mut subs = 0; let mut dead = vec![]; for (client_id, client) in self.clients.iter() { if client.subs.contains(&id) { subs += 1; let redacted = match client.account { Some(a) => match msg { RpcMessage::InstanceState(ref i) => RpcMessage::InstanceState(i.clone().redact(a)), RpcMessage::GameState(ref i) => RpcMessage::GameState(i.clone().redact(a)), _ => msg.clone(), } None => msg.clone(), }; match client.tx.send(redacted) { Ok(_) => (), Err(e) => { warn!("unable to send msg to client err={:?}", e); dead.push(*client_id); }, }; } } if !dead.is_empty() { trace!("dead connections={:?}", dead.len()); dead.iter().for_each(|id| self.remove_client(*id)); } trace!("push subscribers={:?}", subs); Ok(()) }, Event::Queue(id) => { // check whether request is valid { let c = self.clients.get(&id) .ok_or(format_err!("connection not found id={:?}", id))?; if let None = c.account { return Err(err_msg("cannot join pvp queue anonymously")); } info!("pvp queue request id={:?} account={:?}", c.id, c.account); } // create the req for the already queued opponent if let Some(opp_req) = match self.clients.iter_mut().find(|(c_id, c)| c.pvp && **c_id != id) { Some((q_id, q)) => { q.pvp = false; Some(PvpRequest { id: *q_id, account: q.account.unwrap(), tx: q.tx.clone() }) }, None => None, } { // combine the requests and send to warden let c = self.clients.get_mut(&id) .ok_or(format_err!("connection not found id={:?}", id))?; let player_req = PvpRequest { id: c.id, account: c.account.unwrap(), tx: c.tx.clone() }; self.warden.send(GameEvent::Match((opp_req, player_req)))?; return Ok(()) } // or flag the requester as pvp ready let requester = self.clients.get_mut(&id).unwrap(); requester.pvp = true; info!("joined game queue id={:?} account={:?}", requester.id, requester.account); return Ok(()); }, Event::Invite(id) => { // check whether request is valid let c = self.clients.get_mut(&id) .ok_or(format_err!("connection not found id={:?}", id))?; if let None = c.account { return Err(err_msg("cannot join pvp queue anonymously")); } let code = names::name().split_whitespace().collect::>().join("-"); info!("pvp invite request id={:?} account={:?} code={:?}", c.id, c.account, code); c.invite = Some(code.clone()); c.tx.send(RpcMessage::Invite(code))?; return Ok(()); }, Event::Join(id, code) => { // check whether request is valid let c = self.clients.get(&id) .ok_or(format_err!("connection not found id={:?}", id))?; if let None = c.account { return Err(err_msg("cannot join pvp queue anonymously")); } info!("pvp join request id={:?} account={:?} code={:?}", c.id, c.account, code); let inv = self.clients.iter() .filter(|(_id, c)| c.invite.is_some()) .find(|(_id, c)| match c.invite { Some(ref c) => *c == code, None => false, }) .map(|(_id, c)| PvpRequest { id: c.id, account: c.account.unwrap(), tx: c.tx.clone() }) .ok_or(format_err!("invite expired code={:?}", code))?; let join = PvpRequest { id: c.id, account: c.account.unwrap(), tx: c.tx.clone() }; self.warden.send(GameEvent::Match((join, inv)))?; return Ok(()); }, Event::Joined(id) => { // check whether request is valid let c = self.clients.get_mut(&id) .ok_or(format_err!("connection not found id={:?}", id))?; c.pvp = false; c.invite = None; return Ok(()); }, Event::Chat(id, instance, msg) => { // set the chat state of this connection { let c = self.clients.get_mut(&id) .ok_or(format_err!("connection not found id={:?}", id))?; if c.chat.is_some() { return Ok(()); } c.chat = Some((instance, msg)); let events_tx = self.tx.clone(); spawn(move || { sleep(time::Duration::from_secs(3)); events_tx.send(Event::ChatClear(id, instance)).unwrap(); }); } // now collect all listeners of this instance let chat_state: HashMap = self.clients.iter() .filter(|(_id, c)| c.account.is_some()) .filter(|(_id, c)| match c.chat { Some(ref chat) => chat.0 == instance, None => false, }) .map(|(_id, c)| (c.account.unwrap(), c.chat.clone().unwrap().1)) .collect(); return self.event(Event::Push(instance, RpcMessage::InstanceChat(chat_state))); }, Event::ChatClear(id, instance) => { { match self.clients.get_mut(&id) { Some(c) => c.chat = None, None => (), }; } let chat_state: HashMap = self.clients.iter() .filter(|(_id, c)| c.account.is_some()) .filter(|(_id, c)| match c.chat { Some(ref chat) => chat.0 == instance, None => false, }) .map(|(_id, c)| (c.account.unwrap(), c.chat.clone().unwrap().1)) .collect(); return self.event(Event::Push(instance, RpcMessage::InstanceChat(chat_state))); } } } }