205 lines
5.5 KiB
Rust
205 lines
5.5 KiB
Rust
use std::collections::{HashMap, HashSet};
|
|
|
|
// Db Commons
|
|
use uuid::Uuid;
|
|
|
|
use failure::Error;
|
|
|
|
use crossbeam_channel::{Sender, Receiver};
|
|
|
|
use account;
|
|
use account::Account;
|
|
use game;
|
|
use instance;
|
|
|
|
use pg::{Db, PgPool};
|
|
use rpc::RpcMessage;
|
|
use warden::{GameEvent};
|
|
use mail::Mail;
|
|
|
|
pub type EventsTx = Sender<Event>;
|
|
type Id = usize;
|
|
|
|
// this is pretty heavyweight
|
|
// but it makes the ergonomics easy
|
|
// and prevents message mania
|
|
// good candidate to trim if the mm is slow
|
|
#[derive(Debug, Clone)]
|
|
pub struct PvpRequest {
|
|
pub id: Id,
|
|
pub tx: Sender<RpcMessage>,
|
|
pub account: Account,
|
|
}
|
|
|
|
pub struct Events {
|
|
pub tx: Sender<Event>,
|
|
rx: Receiver<Event>,
|
|
|
|
mail: Sender<Mail>,
|
|
warden: Sender<GameEvent>,
|
|
queue: Option<PvpRequest>,
|
|
|
|
clients: HashMap<Id, WsClient>,
|
|
}
|
|
|
|
#[derive(Debug,Clone)]
|
|
pub enum Event {
|
|
// ws lifecycle
|
|
Connect(Id, Option<Account>, Sender<RpcMessage>),
|
|
Disconnect(Id),
|
|
Subscribe(Id, Uuid),
|
|
Unsubscribe(Id, Uuid),
|
|
|
|
// notifications
|
|
Push(Uuid, RpcMessage),
|
|
|
|
// client events
|
|
Queue(PvpRequest),
|
|
}
|
|
|
|
struct WsClient {
|
|
id: Id,
|
|
tx: Sender<RpcMessage>,
|
|
subs: HashSet<Uuid>,
|
|
}
|
|
|
|
impl Events {
|
|
pub fn new(tx: Sender<Event>, rx: Receiver<Event>, warden: Sender<GameEvent>, mail: Sender<Mail>) -> Events {
|
|
Events {
|
|
tx,
|
|
rx,
|
|
warden,
|
|
mail,
|
|
queue: None,
|
|
clients: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
pub fn listen(mut self) -> Result<(), Error> {
|
|
loop {
|
|
match self.rx.recv() {
|
|
Ok(m) => {
|
|
match self.event(m) {
|
|
Ok(()) => (), // :)
|
|
Err(e) => {
|
|
warn!("err={:?}", e);
|
|
}
|
|
}
|
|
},
|
|
|
|
Err(e) => {
|
|
return Err(format_err!("events error err={:?}", e));
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
fn get_client(&mut self, id: Id) -> Result<&mut WsClient, Error> {
|
|
match self.clients.get_mut(&id) {
|
|
Some(c) => Ok(c),
|
|
None => Err(format_err!("connection not found id={:?}", id)),
|
|
}
|
|
}
|
|
|
|
fn remove_client(&mut self, id: Id) {
|
|
self.clients.remove(&id);
|
|
}
|
|
|
|
fn event(&mut self, msg: Event) -> Result<(), Error> {
|
|
match msg {
|
|
Event::Connect(id, account, tx) => {
|
|
info!("connect id={:?} account={:?}", id, account);
|
|
|
|
let client = WsClient { id, tx, subs: HashSet::new() };
|
|
self.clients.insert(id, client);
|
|
|
|
info!("clients={:?}", self.clients.len());
|
|
Ok(())
|
|
},
|
|
|
|
Event::Disconnect(id) => {
|
|
info!("disconnect id={:?}", id);
|
|
|
|
self.clients.remove(&id);
|
|
|
|
info!("clients={:?}", self.clients.len());
|
|
Ok(())
|
|
},
|
|
|
|
Event::Subscribe(id, obj) => {
|
|
info!("subscribe id={:?} object={:?}", id, obj);
|
|
|
|
match self.clients.get_mut(&id) {
|
|
Some(client) => {
|
|
client.subs.insert(obj);
|
|
info!("client={:?} subscriptions={:?}", id, client.subs.len());
|
|
Ok(())
|
|
},
|
|
None => return Err(format_err!("unknown client {:?}", id))
|
|
}
|
|
},
|
|
|
|
Event::Unsubscribe(id, obj) => {
|
|
info!("unsubscribe id={:?} object={:?}", id, obj);
|
|
|
|
match self.clients.get_mut(&id) {
|
|
Some(mut client) => {
|
|
client.subs.remove(&obj);
|
|
info!("unsubscribe subscriptions removed={:?}", client.subs.len());
|
|
Ok(())
|
|
},
|
|
None => return Err(format_err!("unknown client {:?}", id))
|
|
}
|
|
},
|
|
|
|
Event::Push(id, msg) => {
|
|
info!("push id={:?}", id);
|
|
|
|
let mut subs = 0;
|
|
let mut dead = vec![];
|
|
|
|
for (client_id, client) in self.clients.iter() {
|
|
if client.subs.contains(&id) {
|
|
subs += 1;
|
|
match client.tx.send(msg.clone()) {
|
|
Ok(_) => (),
|
|
Err(e) => {
|
|
warn!("unable to send msg to client err={:?}", e);
|
|
dead.push(*client_id);
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
if !dead.is_empty() {
|
|
info!("dead connections={:?}", dead.len());
|
|
dead.iter().for_each(|id| self.remove_client(*id));
|
|
}
|
|
|
|
info!("push subscribers={:?}", subs);
|
|
|
|
Ok(())
|
|
},
|
|
|
|
Event::Queue(req) => {
|
|
info!("queue id={:?} account={:?}", req.id, req.account);
|
|
|
|
self.queue = match self.queue {
|
|
Some(ref q_req) => {
|
|
info!("game queue pair found a={:?} b={:?}", req.account, q_req.account);
|
|
self.warden.send(GameEvent::Match((req, q_req.clone())))?;
|
|
None
|
|
},
|
|
None => {
|
|
info!("joined game queue id={:?} account={:?}", req.id, req.account);
|
|
Some(req)
|
|
},
|
|
};
|
|
|
|
Ok(())
|
|
},
|
|
|
|
}
|
|
}
|
|
}
|