This commit is contained in:
ntr 2019-07-26 17:27:38 +10:00
parent 7c619a8bec
commit 813fc0184f
4 changed files with 114 additions and 63 deletions

View File

@ -26,12 +26,13 @@ pub struct Events {
#[derive(Debug,Clone)]
pub enum Event {
WsConnect(Id, Option<Account>, Sender<RpcMessage>),
WsDisconnect(Id),
WsSubscribe(Id, Uuid),
WsUnsubscribe(Id, Uuid),
}
Connect(Id, Option<Account>, Sender<RpcMessage>),
Disconnect(Id),
Subscribe(Id, Uuid),
Unsubscribe(Id, Uuid),
Push(Uuid, RpcMessage),
}
struct WsClient {
id: Id,
@ -54,7 +55,7 @@ impl Events {
loop {
match self.rx.recv() {
Ok(m) => {
self.on_message(m)?;
self.on_event(m)?;
},
// idk if this is a good idea
@ -66,9 +67,13 @@ impl Events {
}
}
fn on_message(&mut self, msg: Event) -> Result<(), Error> {
fn remove_client(&mut self, id: Id) {
self.clients.remove(&id);
}
fn on_event(&mut self, msg: Event) -> Result<(), Error> {
match msg {
Event::WsConnect(id, account, tx) => {
Event::Connect(id, account, tx) => {
info!("client connected to events id={:?} account={:?}", id, account);
let client = WsClient { id, tx, subs: HashSet::new() };
@ -77,7 +82,7 @@ impl Events {
info!("events clients={:?}", self.clients.len());
Ok(())
},
Event::WsDisconnect(id) => {
Event::Disconnect(id) => {
info!("client disconnected from events id={:?}", id);
self.clients.remove(&id);
@ -85,7 +90,7 @@ impl Events {
info!("events clients={:?}", self.clients.len());
Ok(())
}
Event::WsSubscribe(id, obj) => {
Event::Subscribe(id, obj) => {
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
match self.clients.get_mut(&id) {
@ -97,7 +102,7 @@ impl Events {
None => return Err(format_err!("unknown client {:?}", id))
}
},
Event::WsUnsubscribe(id, obj) => {
Event::Unsubscribe(id, obj) => {
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
match self.clients.get_mut(&id) {
@ -110,6 +115,27 @@ impl Events {
}
},
Event::Push(id, msg) => {
info!("events received push notification id={:?} msg={:?}", id, msg);
let mut subs = 0;
for (_client_id, client) in self.clients.iter() {
if client.subs.contains(&id) {
subs += 1;
match client.tx.send(msg.clone()) {
Ok(_) => (),
Err(e) => {
warn!("unable to send msg to client err={:?}", e);
// self.remove_client(*client_id);
},
};
}
}
info!("notification subscribers {:?}", subs);
Ok(())
},
}
}
}

View File

@ -100,8 +100,13 @@ fn main() {
// create a clone of the tx so ws handler can tell events
// about connection status
// TODO store as an Arc<Mutex<Events>> and make cpuN threads
let mut events = events::Events::new(pool.clone());
let ws_events_tx = events.tx.clone();
let pg_pool = pool.clone();
let pg_events = events.tx.clone();
spawn(move || pg::listen(pg_pool, pg_events));
spawn(move || events.listen());
// the main thread becomes this ws listener

View File

@ -1,4 +1,5 @@
use std::env;
use std::thread::spawn;
use uuid::Uuid;
@ -9,7 +10,13 @@ use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use fallible_iterator::{FallibleIterator};
use events::Events;
use crossbeam_channel::{Sender};
use events::{Event};
use account;
use game;
use instance;
use rpc::RpcMessage;
pub type Db = PooledConnection<PostgresConnectionManager>;
pub type PgPool = Pool<PostgresConnectionManager>;
@ -55,56 +62,69 @@ pub fn create_pool() -> Pool<PostgresConnectionManager> {
.expect("Failed to create pool.")
}
// fn handle_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> {
// info!("events received notification notification={:?}", n);
fn handle_notification(n: Notification, pool: &PgPool, events: &Sender<Event>) {
info!("pg received notification={:?}", n);
// // maybe we need it
// let mut tx = db.transaction()?;
// bang out a thread to do the slow work of fetching the state from db
// the thread will notify events
// let msg = match n.action {
// Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)),
// Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)),
// Action::Update => match n.table {
// Table::Accounts => Message::Account(account::select(db, n.id)?),
// Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?),
// Table::Games => Message::Game(game::game_get(&mut tx, n.id)?),
// _ => return Err(format_err!("unimplemented update notification {:?}", n)),
// },
// };
let pool = pool.clone();
let events = events.clone();
spawn(move || {
// maybe we need it
let db = pool.get().unwrap();
let mut tx = db.transaction().unwrap();
// tx.commit()?;
let msg = match n.action {
Action::Delete => {
warn!("unimplemented delete notification {:?}", n);
None
},
Action::Insert => {
warn!("unimplemented insert notification {:?}", n);
None
},
Action::Update => match n.table {
Table::Accounts =>
Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))),
Table::Instances =>
Some(Event::Push(n.id, RpcMessage::InstanceState(instance::instance_get(&mut tx, n.id).unwrap()))),
Table::Games =>
Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))),
_ => {
warn!("unimplemented update notification {:?}", n);
None
},
},
};
// info!("got a msg to send to whoever cares {:?}", msg);
tx.commit().unwrap();
// // match events.try_send(msg.clone()) {
// // Ok(()) => info!("events message sent message={:?}", msg),
// // Err(e) => warn!("events delivery failure err={:?}", e),
// // };
if let Some(msg) = msg {
events.send(msg).unwrap();
}
});
}
// Ok(())
// }
// this function gets a dedicated connection
// because it has to subscribe and listen for notifications
pub fn listen(pool: PgPool, events: Sender<Event>) -> Result<(), Error> {
let db = pool.get()?;
db.execute("LISTEN events;", &[])?;
info!("pg listening");
let notifications = db.notifications();
let mut n_iter = notifications.blocking_iter();
// pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> {
// let db = pool.get()?;
// db.execute("LISTEN events;", &[])?;
// info!("events listening");
// let notifications = db.notifications();
// let mut n_iter = notifications.blocking_iter();
// // main event loop, checks pg and checks messages
// loop {
// // check notifications
// let n = n_iter.next()?;
// if let Some(n) = n {
// match serde_json::from_str::<Notification>(&n.payload) {
// Ok(notification) => match handle_notification(notification, &db, &events) {
// Ok(()) => (),
// Err(e) => warn!("{:?}", e),
// }
// Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
// };
// }
// }
// }
// main event loop, checks pg and checks messages
loop {
// check notifications
let n = n_iter.next()?;
if let Some(n) = n {
match serde_json::from_str::<Notification>(&n.payload) {
Ok(notification) => handle_notification(notification, &pool, &events),
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
};
}
}
}

View File

@ -38,12 +38,12 @@ impl Handler for Connection {
info!("websocket connected account={:?}", self.account);
// tell events we have connected
self.events.send(Event::WsConnect(self.id, self.account.clone(), self.ws.clone())).unwrap();
self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
// if user logged in do some prep work
if let Some(ref a) = self.account {
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
self.events.send(Event::WsSubscribe(self.id, a.id)).unwrap();
self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
let db = self.pool.get().unwrap();
let mut tx = db.transaction().unwrap();
@ -79,11 +79,11 @@ impl Handler for Connection {
// we tell events to push updates to them
match reply {
RpcMessage::AccountState(ref v) =>
self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(),
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
RpcMessage::GameState(ref v) =>
self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(),
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
RpcMessage::InstanceState(ref v) =>
self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(),
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
_ => (),
};
@ -102,7 +102,7 @@ impl Handler for Connection {
fn on_close(&mut self, _: CloseCode, _: &str) {
info!("websocket disconnected account={:?}", self.account);
self.events.send(Event::WsDisconnect(self.id)).unwrap();
self.events.send(Event::Disconnect(self.id)).unwrap();
}
fn on_request(&mut self, req: &Request) -> Result<Response> {