use std::collections::{HashMap, HashSet}; // Db Commons use uuid::Uuid; use failure::Error; use crossbeam_channel::{Sender, Receiver}; use account; use account::Account; use game; use instance; use pg::{Db, PgPool}; use rpc::RpcMessage; use warden::{GameEvent}; use mail::Mail; pub type EventsTx = Sender; type Id = usize; // this is pretty heavyweight // but it makes the ergonomics easy // and prevents message mania // good candidate to trim if the mm is slow #[derive(Debug, Clone)] pub struct PvpRequest { pub id: Id, pub tx: Sender, pub account: Account, } pub struct Events { pub tx: Sender, rx: Receiver, mail: Sender, warden: Sender, queue: Option, clients: HashMap, } #[derive(Debug,Clone)] pub enum Event { // ws lifecycle Connect(Id, Option, Sender), Disconnect(Id), Subscribe(Id, Uuid), Unsubscribe(Id, Uuid), // notifications Push(Uuid, RpcMessage), // client events Queue(PvpRequest), } struct WsClient { id: Id, tx: Sender, subs: HashSet, } impl Events { pub fn new(tx: Sender, rx: Receiver, warden: Sender, mail: Sender) -> Events { Events { tx, rx, warden, mail, queue: None, clients: HashMap::new(), } } pub fn listen(mut self) -> Result<(), Error> { loop { match self.rx.recv() { Ok(m) => { match self.event(m) { Ok(()) => (), // :) Err(e) => { warn!("err={:?}", e); } } }, Err(e) => { return Err(format_err!("events error err={:?}", e)); }, }; } } fn get_client(&mut self, id: Id) -> Result<&mut WsClient, Error> { match self.clients.get_mut(&id) { Some(c) => Ok(c), None => Err(format_err!("connection not found id={:?}", id)), } } fn remove_client(&mut self, id: Id) { self.clients.remove(&id); } fn event(&mut self, msg: Event) -> Result<(), Error> { match msg { Event::Connect(id, account, tx) => { info!("connect id={:?} account={:?}", id, account); let client = WsClient { id, tx, subs: HashSet::new() }; self.clients.insert(id, client); info!("clients={:?}", self.clients.len()); Ok(()) }, Event::Disconnect(id) => { info!("disconnect id={:?}", id); self.clients.remove(&id); info!("clients={:?}", self.clients.len()); Ok(()) }, Event::Subscribe(id, obj) => { info!("subscribe id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { Some(client) => { client.subs.insert(obj); info!("client subscriptions={:?}", client.subs.len()); Ok(()) }, None => return Err(format_err!("unknown client {:?}", id)) } }, Event::Unsubscribe(id, obj) => { info!("unsubscribe id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { Some(mut client) => { client.subs.remove(&obj); info!("unsubscribe subscriptions removed={:?}", client.subs.len()); Ok(()) }, None => return Err(format_err!("unknown client {:?}", id)) } }, Event::Push(id, msg) => { info!("push id={:?}", id); let mut subs = 0; let mut dead = vec![]; for (client_id, client) in self.clients.iter() { if client.subs.contains(&id) { subs += 1; match client.tx.send(msg.clone()) { Ok(_) => (), Err(e) => { warn!("unable to send msg to client err={:?}", e); dead.push(*client_id); }, }; } } if !dead.is_empty() { info!("dead connections={:?}", dead.len()); dead.iter().for_each(|id| self.remove_client(*id)); } info!("push subscribers={:?}", subs); Ok(()) }, Event::Queue(req) => { info!("queue id={:?} account={:?}", req.id, req.account); self.queue = match self.queue { Some(ref q_req) => { info!("game queue pair found a={:?} b={:?}", req.account, q_req.account); self.warden.send(GameEvent::Match((req, q_req.clone())))?; None }, None => { info!("joined game queue id={:?} account={:?}", req.id, req.account); Some(req) }, }; Ok(()) }, } } }