mnml/server/src/events.rs
2019-09-15 15:42:47 +10:00

287 lines
8.8 KiB
Rust

use std::collections::{HashMap, HashSet};
// Db Commons
use uuid::Uuid;
use failure::Error;
use failure::{err_msg, format_err};
use crossbeam_channel::{Sender, Receiver};
use account;
use account::Account;
use game;
use instance;
use names;
use pg::{Db, PgPool};
use rpc::RpcMessage;
use warden::{GameEvent};
use mail::Mail;
pub type EventsTx = Sender<Event>;
type Id = usize;
/// One player's side of a pvp match: the connection id, the channel used to
/// reach that connection, and the player's account id.
///
/// Built by the `Event::Queue` and `Event::Join` handlers and handed to the
/// warden in pairs inside `GameEvent::Match`.
///
/// This is fairly heavyweight (it carries a cloned `Sender`), but it keeps the
/// ergonomics simple and avoids a flurry of extra messages.
/// Good candidate to trim if the mm (presumably matchmaking) proves slow.
#[derive(Debug, Clone)]
pub struct PvpRequest {
// connection id of the requesting client
pub id: Id,
// channel for sending `RpcMessage`s back to that client
pub tx: Sender<RpcMessage>,
// account id of the requesting client
pub account: Uuid,
}
/// Event hub: owns the table of connected clients and processes the `Event`s
/// received on `rx` (see `listen` and `event`).
pub struct Events {
// sender for `Event`s, exposed publicly
// (presumably the sending half of `rx` -- confirm at the construction site)
pub tx: Sender<Event>,
// incoming events, drained by `listen`
rx: Receiver<Event>,
// mail channel; stored by `new` but not read by `listen` or `event`
mail: Sender<Mail>,
// channel to the warden; receives `GameEvent::Match` for paired players
warden: Sender<GameEvent>,
// connected clients keyed by connection id
clients: HashMap<Id, WsClient>,
}
/// Messages processed by `Events::event`.
#[derive(Debug,Clone)]
pub enum Event {
// ws lifecycle
// register a connection: id, optional account, channel back to the client
Connect(Id, Option<Account>, Sender<RpcMessage>),
// drop the connection with this id from the client table
Disconnect(Id),
// add the object id to the connection's subscription set
Subscribe(Id, Uuid),
// remove the object id from the connection's subscription set
Unsubscribe(Id, Uuid),
// notifications
// send the message to every client subscribed to the object id
Push(Uuid, RpcMessage),
// client events
// request a pvp match: pair with a waiting client or start waiting
Queue(Id),
// request an invite code; it is stored on the client and sent back to it
Invite(Id),
// request a match against the client holding the given invite code
Join(Id, String),
// clear the connection's queue flag and invite code
Joined(Id),
}
/// Per-connection state tracked by `Events`.
struct WsClient {
// connection id; same value as this client's key in `Events::clients`
id: Id,
// account id when the connection was made with an account, otherwise `None`
account: Option<Uuid>,
// channel for sending `RpcMessage`s to this connection
tx: Sender<RpcMessage>,
// object ids this connection receives `Event::Push` messages for
subs: HashSet<Uuid>,
// true while the client is waiting in the pvp queue
pvp: bool,
// invite code issued by `Event::Invite`, cleared by `Event::Joined`
invite: Option<String>,
}
impl Events {
/// Builds an `Events` hub around the given channels, starting with an empty
/// client table.
pub fn new(tx: Sender<Event>, rx: Receiver<Event>, warden: Sender<GameEvent>, mail: Sender<Mail>) -> Events {
    // no connections are known until `Event::Connect` messages arrive
    let clients = HashMap::new();

    Events { clients, mail, warden, rx, tx }
}
/// Runs the event loop: blocks on the receiver and applies every incoming
/// `Event` via `event`.
///
/// A failure while handling a single event is logged and the loop continues.
///
/// # Errors
///
/// Returns the receive error once the channel is disconnected. That state is
/// permanent, so retrying would only spin and flood the log.
pub fn listen(mut self) -> Result<(), Error> {
    loop {
        let msg = match self.rx.recv() {
            Ok(m) => m,
            Err(e) => {
                // recv only fails when the channel is empty and disconnected
                warn!("events error err={:?}", e);
                return Err(e.into());
            },
        };

        // one bad event must not take the whole loop down
        if let Err(e) = self.event(msg) {
            warn!("err={:?}", e);
        }
    }
}
/// Looks up the client registered under `id` for mutation.
///
/// # Errors
///
/// Returns a "connection not found" error when no client has that id.
fn get_client(&mut self, id: Id) -> Result<&mut WsClient, Error> {
    self.clients
        .get_mut(&id)
        .ok_or_else(|| format_err!("connection not found id={:?}", id))
}
/// Drops the client registered under `id`; does nothing if the id is unknown.
fn remove_client(&mut self, id: Id) {
    // the removed entry (if any) is not needed by the caller
    let _ = self.clients.remove(&id);
}
/// Applies a single `Event` to the client table.
///
/// * `Connect` / `Disconnect` add and remove connections.
/// * `Subscribe` / `Unsubscribe` edit a connection's subscription set.
/// * `Push` delivers a message to every subscriber of an object and drops
///   connections whose channel can no longer be written to.
/// * `Queue` pairs the requester with another waiting client, or marks the
///   requester as waiting.
/// * `Invite` issues an invite code; `Join` redeems another client's code.
/// * `Joined` clears the connection's queue flag and invite code.
///
/// # Errors
///
/// Returns an error when the referenced connection is unknown, when an
/// anonymous connection issues `Queue`, `Invite` or `Join`, when no other
/// client holds the invite code given to `Join`, or when sending to the
/// warden or to the client fails.
fn event(&mut self, msg: Event) -> Result<(), Error> {
    match msg {
        Event::Connect(id, account, tx) => {
            info!("connect id={:?} account={:?}", id, account);

            let client = WsClient {
                id,
                tx,
                account: account.map(|a| a.id),
                subs: HashSet::new(),
                pvp: false,
                invite: None,
            };
            self.clients.insert(id, client);

            info!("clients={:?}", self.clients.len());
            Ok(())
        },

        Event::Disconnect(id) => {
            info!("disconnect id={:?}", id);
            self.clients.remove(&id);
            info!("clients={:?}", self.clients.len());
            Ok(())
        },

        Event::Subscribe(id, obj) => {
            trace!("subscribe id={:?} object={:?}", id, obj);

            let client = self.clients.get_mut(&id)
                .ok_or_else(|| format_err!("unknown client {:?}", id))?;
            client.subs.insert(obj);

            trace!("client={:?} subscriptions={:?}", id, client.subs.len());
            Ok(())
        },

        Event::Unsubscribe(id, obj) => {
            trace!("unsubscribe id={:?} object={:?}", id, obj);

            let client = self.clients.get_mut(&id)
                .ok_or_else(|| format_err!("unknown client {:?}", id))?;
            client.subs.remove(&obj);

            trace!("unsubscribe subscriptions removed={:?}", client.subs.len());
            Ok(())
        },

        Event::Push(obj, msg) => {
            trace!("push id={:?}", obj);

            let mut subs = 0;
            let mut dead = vec![];

            for (client_id, client) in &self.clients {
                if !client.subs.contains(&obj) {
                    continue;
                }

                subs += 1;
                if let Err(e) = client.tx.send(msg.clone()) {
                    // remember the connection; it is removed after the iteration
                    warn!("unable to send msg to client err={:?}", e);
                    dead.push(*client_id);
                }
            }

            if !dead.is_empty() {
                trace!("dead connections={:?}", dead.len());
                for dead_id in &dead {
                    self.remove_client(*dead_id);
                }
            }

            trace!("push subscribers={:?}", subs);
            Ok(())
        },

        Event::Queue(id) => {
            // check whether request is valid and build the requester's side
            let player_req = {
                let c = self.clients.get(&id)
                    .ok_or_else(|| format_err!("connection not found id={:?}", id))?;
                let account = c.account
                    .ok_or_else(|| err_msg("cannot join pvp queue anonymously"))?;

                info!("pvp queue request id={:?} account={:?}", c.id, c.account);
                PvpRequest { id: c.id, account, tx: c.tx.clone() }
            };

            // look for an already queued opponent; the requester itself is
            // skipped so a repeated Queue cannot match a player against themselves
            let opponent = self.clients.iter_mut()
                .find(|(c_id, c)| **c_id != id && c.pvp)
                .and_then(|(q_id, q)| {
                    q.pvp = false;
                    // only clients with an account are ever flagged, so this is
                    // expected to be Some
                    q.account.map(|account| PvpRequest { id: *q_id, account, tx: q.tx.clone() })
                });

            // combine the requests and send to warden
            if let Some(opp_req) = opponent {
                self.warden.send(GameEvent::Match((opp_req, player_req)))?;
                return Ok(());
            }

            // or flag the requester as pvp ready
            let requester = self.clients.get_mut(&id)
                .ok_or_else(|| format_err!("connection not found id={:?}", id))?;
            requester.pvp = true;

            info!("joined game queue id={:?} account={:?}", requester.id, requester.account);
            Ok(())
        },

        Event::Invite(id) => {
            // check whether request is valid
            let c = self.clients.get_mut(&id)
                .ok_or_else(|| format_err!("connection not found id={:?}", id))?;
            if c.account.is_none() {
                return Err(err_msg("cannot join pvp queue anonymously"));
            }

            // the generated name with its whitespace replaced by dashes
            let code = names::name().split_whitespace().collect::<Vec<&str>>().join("-");
            info!("pvp invite request id={:?} account={:?} code={:?}", c.id, c.account, code);

            c.invite = Some(code.clone());
            c.tx.send(RpcMessage::Invite(code))?;
            Ok(())
        },

        Event::Join(id, code) => {
            // check whether request is valid and build the joiner's side
            let join = {
                let c = self.clients.get(&id)
                    .ok_or_else(|| format_err!("connection not found id={:?}", id))?;
                let account = c.account
                    .ok_or_else(|| err_msg("cannot join pvp queue anonymously"))?;

                info!("pvp join request id={:?} account={:?} code={:?}", c.id, c.account, code);
                PvpRequest { id: c.id, account, tx: c.tx.clone() }
            };

            // find the client holding this invite code; the joiner itself is
            // skipped so nobody can redeem their own invite
            let inv = self.clients.iter()
                .find(|(host_id, host)| **host_id != id && host.invite.as_ref() == Some(&code))
                .and_then(|(_, host)| {
                    host.account.map(|account| PvpRequest { id: host.id, account, tx: host.tx.clone() })
                })
                .ok_or_else(|| format_err!("invite not found code={:?}", code))?;

            self.warden.send(GameEvent::Match((join, inv)))?;
            Ok(())
        },

        Event::Joined(id) => {
            // the client is no longer waiting for a match or holding an invite
            let c = self.clients.get_mut(&id)
                .ok_or_else(|| format_err!("connection not found id={:?}", id))?;
            c.pvp = false;
            c.invite = None;
            Ok(())
        },
    }
}
}