337 lines
11 KiB
Rust
337 lines
11 KiB
Rust
use std::collections::{HashMap, HashSet};
|
|
use std::thread::{spawn, sleep};
|
|
use std::time;
|
|
|
|
// Db Commons
|
|
use uuid::Uuid;
|
|
|
|
use failure::Error;
|
|
use failure::{format_err};
|
|
|
|
use crossbeam_channel::{Sender, Receiver};
|
|
|
|
use account::Account;
|
|
use names;
|
|
|
|
use rpc::RpcMessage;
|
|
use warden::{GameEvent};
|
|
|
|
pub type EventsTx = Sender<Event>;
|
|
type Id = Uuid;
|
|
|
|
// this is pretty heavyweight
|
|
// but it makes the ergonomics easy
|
|
// and prevents message mania
|
|
// good candidate to trim if the mm is slow
|
|
#[derive(Debug, Clone)]
|
|
pub struct PvpRequest {
|
|
pub id: Id,
|
|
pub tx: Sender<RpcMessage>,
|
|
pub account: Uuid,
|
|
}
|
|
|
|
pub struct Events {
|
|
pub tx: Sender<Event>,
|
|
rx: Receiver<Event>,
|
|
|
|
warden: Sender<GameEvent>,
|
|
clients: HashMap<Id, WsClient>,
|
|
}
|
|
|
|
#[derive(Debug,Clone)]
|
|
pub enum Event {
|
|
// ws lifecycle
|
|
Connect(Id, Account, Sender<RpcMessage>),
|
|
Disconnect(Id),
|
|
Subscribe(Id, Uuid),
|
|
Unsubscribe(Id, Uuid),
|
|
|
|
// notifications
|
|
Push(Uuid, RpcMessage),
|
|
|
|
// client events
|
|
Queue(Id),
|
|
Invite(Id),
|
|
Join(Id, String),
|
|
Joined(Id),
|
|
Leave(Id),
|
|
|
|
Chat(Id, Uuid, String),
|
|
ChatClear(Id, Uuid),
|
|
}
|
|
|
|
struct WsClient {
|
|
id: Id,
|
|
tx: Sender<RpcMessage>,
|
|
subs: HashSet<Uuid>,
|
|
chat: Option<(Uuid, String)>,
|
|
pvp: bool,
|
|
invite: Option<String>,
|
|
}
|
|
|
|
impl Events {
|
|
pub fn new(tx: Sender<Event>, rx: Receiver<Event>, warden: Sender<GameEvent>) -> Events {
|
|
Events {
|
|
tx,
|
|
rx,
|
|
warden,
|
|
clients: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
pub fn listen(mut self) -> Result<(), Error> {
|
|
loop {
|
|
match self.rx.recv() {
|
|
Ok(m) => {
|
|
match self.event(m) {
|
|
Ok(()) => (), // :)
|
|
Err(e) => {
|
|
warn!("err={:?}", e);
|
|
}
|
|
}
|
|
},
|
|
|
|
Err(e) => {
|
|
warn!("events error err={:?}", e);
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
fn event(&mut self, msg: Event) -> Result<(), Error> {
|
|
match msg {
|
|
Event::Connect(id, account, tx) => {
|
|
info!("connect id={:?} account={:?}", id, account);
|
|
|
|
let client = WsClient { id,
|
|
tx,
|
|
subs: HashSet::new(),
|
|
pvp: false,
|
|
invite: None,
|
|
chat: None,
|
|
};
|
|
|
|
self.clients.insert(id, client);
|
|
|
|
info!("clients={:?}", self.clients.len());
|
|
Ok(())
|
|
},
|
|
|
|
Event::Disconnect(id) => {
|
|
info!("disconnect id={:?}", id);
|
|
|
|
self.clients.remove(&id);
|
|
|
|
info!("clients={:?}", self.clients.len());
|
|
Ok(())
|
|
},
|
|
|
|
Event::Subscribe(id, obj) => {
|
|
trace!("subscribe id={:?} object={:?}", id, obj);
|
|
|
|
match self.clients.get_mut(&id) {
|
|
Some(client) => {
|
|
client.subs.insert(obj);
|
|
trace!("client={:?} subscriptions={:?}", id, client.subs.len());
|
|
Ok(())
|
|
},
|
|
None => return Err(format_err!("unknown client {:?}", id))
|
|
}
|
|
},
|
|
|
|
Event::Unsubscribe(id, obj) => {
|
|
trace!("unsubscribe id={:?} object={:?}", id, obj);
|
|
|
|
match self.clients.get_mut(&id) {
|
|
Some(client) => {
|
|
client.subs.remove(&obj);
|
|
trace!("unsubscribe subscriptions removed={:?}", client.subs.len());
|
|
Ok(())
|
|
},
|
|
None => return Err(format_err!("unknown client {:?}", id))
|
|
}
|
|
},
|
|
|
|
Event::Push(id, msg) => {
|
|
trace!("push id={:?}", id);
|
|
|
|
let mut subs = 0;
|
|
let mut dead = vec![];
|
|
|
|
for (client_id, client) in self.clients.iter() {
|
|
if client.subs.contains(&id) {
|
|
subs += 1;
|
|
|
|
let redacted = match msg {
|
|
RpcMessage::InstanceState(ref i) => RpcMessage::InstanceState(i.clone().redact(client.id)),
|
|
RpcMessage::GameState(ref i) => RpcMessage::GameState(i.clone().redact(client.id)),
|
|
_ => msg.clone(),
|
|
};
|
|
|
|
match client.tx.send(redacted) {
|
|
Ok(_) => (),
|
|
Err(e) => {
|
|
warn!("unable to send msg to client err={:?}", e);
|
|
dead.push(*client_id);
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
if !dead.is_empty() {
|
|
trace!("dead connections={:?}", dead.len());
|
|
for id in dead.iter() {
|
|
self.clients.remove(id);
|
|
}
|
|
}
|
|
|
|
trace!("push subscribers={:?}", subs);
|
|
|
|
Ok(())
|
|
},
|
|
|
|
Event::Queue(id) => {
|
|
// check whether request is valid
|
|
{
|
|
let c = self.clients.get(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
info!("pvp queue request id={:?} account={:?}", c.id, c.id);
|
|
}
|
|
|
|
// create the req for the already queued opponent
|
|
if let Some(opp_req) = match self.clients.iter_mut().find(|(c_id, c)| c.pvp && **c_id != id) {
|
|
Some((q_id, q)) => {
|
|
q.pvp = false;
|
|
Some(PvpRequest { id: *q_id, account: q.id, tx: q.tx.clone() })
|
|
},
|
|
None => None,
|
|
} {
|
|
// combine the requests and send to warden
|
|
let c = self.clients.get_mut(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
let player_req = PvpRequest { id: c.id, account: c.id, tx: c.tx.clone() };
|
|
self.warden.send(GameEvent::Match((opp_req, player_req)))?;
|
|
|
|
return Ok(())
|
|
}
|
|
|
|
// or flag the requester as pvp ready
|
|
let requester = self.clients.get_mut(&id).unwrap();
|
|
requester.pvp = true;
|
|
requester.tx.send(RpcMessage::QueueJoined(()))?;
|
|
info!("joined game queue id={:?} account={:?}", requester.id, requester.id);
|
|
return Ok(());
|
|
},
|
|
|
|
Event::Invite(id) => {
|
|
// check whether request is valid
|
|
let c = self.clients.get_mut(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
let code = names::name().split_whitespace().collect::<Vec<&str>>().join("-");
|
|
info!("pvp invite request id={:?} account={:?} code={:?}", c.id, c.id, code);
|
|
c.invite = Some(code.clone());
|
|
c.tx.send(RpcMessage::Invite(code))?;
|
|
return Ok(());
|
|
},
|
|
|
|
Event::Join(id, code) => {
|
|
// check whether request is valid
|
|
let c = self.clients.get(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
info!("pvp join request id={:?} account={:?} code={:?}", c.id, c.id, code);
|
|
|
|
let inv = self.clients.iter()
|
|
.filter(|(_id, c)| c.invite.is_some())
|
|
.find(|(_id, c)| match c.invite {
|
|
Some(ref c) => *c == code,
|
|
None => false,
|
|
})
|
|
.map(|(_id, c)| PvpRequest { id: c.id, account: c.id, tx: c.tx.clone() })
|
|
.ok_or(format_err!("invite expired code={:?}", code))?;
|
|
|
|
let join = PvpRequest { id: c.id, account: c.id, tx: c.tx.clone() };
|
|
|
|
self.warden.send(GameEvent::Match((join, inv)))?;
|
|
return Ok(());
|
|
},
|
|
|
|
Event::Joined(id) => {
|
|
// check whether request is valid
|
|
let c = self.clients.get_mut(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
c.pvp = false;
|
|
c.invite = None;
|
|
return Ok(());
|
|
},
|
|
|
|
Event::Leave(id) => {
|
|
// check whether request is valid
|
|
let c = self.clients.get_mut(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
c.pvp = false;
|
|
c.tx.send(RpcMessage::QueueLeft(()))?;
|
|
info!("left game queue id={:?} account={:?}", c.id, c.id);
|
|
return Ok(());
|
|
},
|
|
|
|
|
|
Event::Chat(id, instance, msg) => {
|
|
// set the chat state of this connection
|
|
{
|
|
let c = self.clients.get_mut(&id)
|
|
.ok_or(format_err!("connection not found id={:?}", id))?;
|
|
|
|
if c.chat.is_some() {
|
|
return Ok(());
|
|
}
|
|
|
|
c.chat = Some((instance, msg));
|
|
|
|
let events_tx = self.tx.clone();
|
|
spawn(move || {
|
|
sleep(time::Duration::from_secs(3));
|
|
events_tx.send(Event::ChatClear(id, instance)).unwrap();
|
|
});
|
|
}
|
|
|
|
// now collect all listeners of this instance
|
|
|
|
let chat_state: HashMap<Uuid, String> = self.clients.iter()
|
|
.filter(|(_id, c)| match c.chat {
|
|
Some(ref chat) => chat.0 == instance,
|
|
None => false,
|
|
})
|
|
.map(|(_id, c)| (c.id, c.chat.clone().unwrap().1))
|
|
.collect();
|
|
|
|
return self.event(Event::Push(instance, RpcMessage::InstanceChat(chat_state)));
|
|
},
|
|
|
|
Event::ChatClear(id, instance) => {
|
|
{
|
|
match self.clients.get_mut(&id) {
|
|
Some(c) => c.chat = None,
|
|
None => (),
|
|
};
|
|
}
|
|
|
|
let chat_state: HashMap<Uuid, String> = self.clients.iter()
|
|
.filter(|(_id, c)| match c.chat {
|
|
Some(ref chat) => chat.0 == instance,
|
|
None => false,
|
|
})
|
|
.map(|(_id, c)| (c.id, c.chat.clone().unwrap().1))
|
|
.collect();
|
|
|
|
return self.event(Event::Push(instance, RpcMessage::InstanceChat(chat_state)));
|
|
}
|
|
}
|
|
}
|
|
}
|