This commit is contained in:
ntr 2019-07-26 16:39:55 +10:00
parent 4726a8c5b3
commit 7c619a8bec
2 changed files with 120 additions and 30 deletions

View File

@ -1,29 +1,53 @@
use std::collections::{HashMap, HashSet};
// Db Commons
use uuid::Uuid;
use fallible_iterator::{FallibleIterator};
use failure::Error;
use crossbeam_channel::{unbounded, Sender, Receiver};
use account;
use account::Account;
use game;
use instance;
use pg::{Db, PgPool};
use rpc::RpcMessage;
use websocket::{WsMessage};
#[derive(Clone)]
type Id = usize;
pub struct Events {
pub tx: Sender<WsMessage>,
rx: Receiver<WsMessage>,
pub tx: Sender<Event>,
rx: Receiver<Event>,
pool: PgPool,
clients: HashMap<Id, WsClient>,
}
#[derive(Debug,Clone)]
pub enum Event {
WsConnect(Id, Option<Account>, Sender<RpcMessage>),
WsDisconnect(Id),
WsSubscribe(Id, Uuid),
WsUnsubscribe(Id, Uuid),
}
struct WsClient {
id: Id,
tx: Sender<RpcMessage>,
subs: HashSet<Uuid>,
}
impl Events {
pub fn new(pool: PgPool) -> Events {
let (tx, rx) = unbounded();
return Events { tx, rx, pool };
Events {
tx,
rx,
pool,
clients: HashMap::new(),
}
}
pub fn listen(&mut self) -> Result<(), Error> {
@ -32,6 +56,9 @@ impl Events {
Ok(m) => {
self.on_message(m)?;
},
// idk if this is a good idea
// possibly just log errors and continue...
Err(e) => {
return Err(format_err!("events error err={:?}", e));
},
@ -39,17 +66,50 @@ impl Events {
}
}
fn on_message(&mut self, msg: WsMessage) -> Result<(), Error> {
fn on_message(&mut self, msg: Event) -> Result<(), Error> {
match msg {
WsMessage::Connect(tx) => {
info!("client connected {:?}", tx);
Event::WsConnect(id, account, tx) => {
info!("client connected to events id={:?} account={:?}", id, account);
let client = WsClient { id, tx, subs: HashSet::new() };
self.clients.insert(id, client);
info!("events clients={:?}", self.clients.len());
Ok(())
},
WsMessage::Disconnect => {
info!("client disconnected");
Event::WsDisconnect(id) => {
info!("client disconnected from events id={:?}", id);
self.clients.remove(&id);
info!("events clients={:?}", self.clients.len());
Ok(())
}
_ => return Err(format_err!("events received unhandled msg={:?}", msg)),
Event::WsSubscribe(id, obj) => {
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
match self.clients.get_mut(&id) {
Some(client) => {
client.subs.insert(obj);
info!("client subscriptions={:?}", client.subs.len());
Ok(())
},
None => return Err(format_err!("unknown client {:?}", id))
}
},
Event::WsUnsubscribe(id, obj) => {
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
match self.clients.get_mut(&id) {
Some(mut client) => {
client.subs.remove(&obj);
info!("client subscriptions={:?}", client.subs.len());
Ok(())
},
None => return Err(format_err!("unknown client {:?}", id))
}
},
}
}
}

View File

@ -14,37 +14,55 @@ use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response};
use account;
use account::{Account};
use pg::{PgPool};
use events::Event;
use rpc::{RpcMessage};
use rpc;
use mtx;
use net::TOKEN_HEADER;
// these are messages relating to the lifecycle
// of ws clients
#[derive(Debug,Clone)]
pub enum WsMessage {
Connect(CbSender<RpcMessage>),
Disconnect,
}
struct Connection {
pub id: usize,
pub ws: CbSender<RpcMessage>,
pool: PgPool,
account: Option<Account>,
events: CbSender<WsMessage>,
events: CbSender<Event>,
}
// we unwrap everything in here cause really
// we don't care if this panics
// it's run in a thread so it's supposed to bail
// when it encounters errors
impl Handler for Connection {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
info!("connected account={:?}", self.account);
if let Some(ref a) = self.account {
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
}
info!("websocket connected account={:?}", self.account);
// tell events we have connected
self.events.send(WsMessage::Connect(self.ws.clone()));
self.events.send(Event::WsConnect(self.id, self.account.clone(), self.ws.clone())).unwrap();
// if user logged in do some prep work
if let Some(ref a) = self.account {
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
self.events.send(Event::WsSubscribe(self.id, a.id)).unwrap();
let db = self.pool.get().unwrap();
let mut tx = db.transaction().unwrap();
// send account constructs
let account_constructs = account::account_constructs(&mut tx, a).unwrap();
self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap();
// get account instances
// and send them to the client
let account_instances = account::account_instances(&mut tx, a).unwrap();
self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap();
let shop = mtx::account_shop(&mut tx, &a).unwrap();
self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap();
// tx should do nothing
tx.commit().unwrap();
}
Ok(())
}
@ -57,6 +75,18 @@ impl Handler for Connection {
match rpc::receive(msg, &db_connection, begin, &self.account) {
Ok(reply) => {
// if the user queries the state of something
// we tell events to push updates to them
match reply {
RpcMessage::AccountState(ref v) =>
self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(),
RpcMessage::GameState(ref v) =>
self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(),
RpcMessage::InstanceState(ref v) =>
self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(),
_ => (),
};
self.ws.send(reply).unwrap();
},
Err(e) => {
@ -71,8 +101,8 @@ impl Handler for Connection {
}
fn on_close(&mut self, _: CloseCode, _: &str) {
info!("socket disconnected account={:?}", self.account);
// self.ws.shutdown().unwrap()
info!("websocket disconnected account={:?}", self.account);
self.events.send(Event::WsDisconnect(self.id)).unwrap();
}
fn on_request(&mut self, req: &Request) -> Result<Response> {
@ -107,7 +137,7 @@ impl Handler for Connection {
}
pub fn start(pool: PgPool, events_tx: CbSender<WsMessage>) {
pub fn start(pool: PgPool, events_tx: CbSender<Event>) {
let mut rng = thread_rng();
listen("127.0.0.1:40055", move |out| {