diff --git a/server/src/events.rs b/server/src/events.rs index bb19f8bc..d47d412e 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -26,12 +26,13 @@ pub struct Events { #[derive(Debug,Clone)] pub enum Event { - WsConnect(Id, Option, Sender), - WsDisconnect(Id), - WsSubscribe(Id, Uuid), - WsUnsubscribe(Id, Uuid), -} + Connect(Id, Option, Sender), + Disconnect(Id), + Subscribe(Id, Uuid), + Unsubscribe(Id, Uuid), + Push(Uuid, RpcMessage), +} struct WsClient { id: Id, @@ -54,7 +55,7 @@ impl Events { loop { match self.rx.recv() { Ok(m) => { - self.on_message(m)?; + self.on_event(m)?; }, // idk if this is a good idea @@ -66,9 +67,13 @@ impl Events { } } - fn on_message(&mut self, msg: Event) -> Result<(), Error> { + fn remove_client(&mut self, id: Id) { + self.clients.remove(&id); + } + + fn on_event(&mut self, msg: Event) -> Result<(), Error> { match msg { - Event::WsConnect(id, account, tx) => { + Event::Connect(id, account, tx) => { info!("client connected to events id={:?} account={:?}", id, account); let client = WsClient { id, tx, subs: HashSet::new() }; @@ -77,7 +82,7 @@ impl Events { info!("events clients={:?}", self.clients.len()); Ok(()) }, - Event::WsDisconnect(id) => { + Event::Disconnect(id) => { info!("client disconnected from events id={:?}", id); self.clients.remove(&id); @@ -85,7 +90,7 @@ impl Events { info!("events clients={:?}", self.clients.len()); Ok(()) } - Event::WsSubscribe(id, obj) => { + Event::Subscribe(id, obj) => { info!("client subscribed to updates from object id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { @@ -97,7 +102,7 @@ impl Events { None => return Err(format_err!("unknown client {:?}", id)) } }, - Event::WsUnsubscribe(id, obj) => { + Event::Unsubscribe(id, obj) => { info!("client subscribed to updates from object id={:?} object={:?}", id, obj); match self.clients.get_mut(&id) { @@ -110,6 +115,27 @@ impl Events { } }, + Event::Push(id, msg) => { + info!("events received push notification id={:?} msg={:?}", id, msg); + + let mut subs = 0; + for (_client_id, client) in self.clients.iter() { + if client.subs.contains(&id) { + subs += 1; + match client.tx.send(msg.clone()) { + Ok(_) => (), + Err(e) => { + warn!("unable to send msg to client err={:?}", e); + // self.remove_client(*client_id); + }, + }; + } + } + + info!("notification subscribers {:?}", subs); + + Ok(()) + }, } } } diff --git a/server/src/main.rs b/server/src/main.rs index 2206df8f..a53119bb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -100,8 +100,13 @@ fn main() { // create a clone of the tx so ws handler can tell events // about connection status + // TODO store as an Arc> and make cpuN threads let mut events = events::Events::new(pool.clone()); let ws_events_tx = events.tx.clone(); + + let pg_pool = pool.clone(); + let pg_events = events.tx.clone(); + spawn(move || pg::listen(pg_pool, pg_events)); spawn(move || events.listen()); // the main thread becomes this ws listener diff --git a/server/src/pg.rs b/server/src/pg.rs index b4cc5f3f..88e05a87 100644 --- a/server/src/pg.rs +++ b/server/src/pg.rs @@ -1,4 +1,5 @@ use std::env; +use std::thread::spawn; use uuid::Uuid; @@ -9,7 +10,13 @@ use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use fallible_iterator::{FallibleIterator}; -use events::Events; +use crossbeam_channel::{Sender}; + +use events::{Event}; +use account; +use game; +use instance; +use rpc::RpcMessage; pub type Db = PooledConnection; pub type PgPool = Pool; @@ -55,56 +62,69 @@ pub fn create_pool() -> Pool { .expect("Failed to create pool.") } -// fn handle_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { -// info!("events received notification notification={:?}", n); +fn handle_notification(n: Notification, pool: &PgPool, events: &Sender) { + info!("pg received notification={:?}", n); -// // maybe we need it -// let mut tx = db.transaction()?; + // bang out a thread to do the slow work of fetching the state from db + // the thread will notify events -// let msg = match n.action { -// Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), -// Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), -// Action::Update => match n.table { -// Table::Accounts => Message::Account(account::select(db, n.id)?), -// Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), -// Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), -// _ => return Err(format_err!("unimplemented update notification {:?}", n)), -// }, -// }; + let pool = pool.clone(); + let events = events.clone(); + spawn(move || { + // maybe we need it + let db = pool.get().unwrap(); + let mut tx = db.transaction().unwrap(); -// tx.commit()?; + let msg = match n.action { + Action::Delete => { + warn!("unimplemented delete notification {:?}", n); + None + }, + Action::Insert => { + warn!("unimplemented insert notification {:?}", n); + None + }, + Action::Update => match n.table { + Table::Accounts => + Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))), + Table::Instances => + Some(Event::Push(n.id, RpcMessage::InstanceState(instance::instance_get(&mut tx, n.id).unwrap()))), + Table::Games => + Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))), + _ => { + warn!("unimplemented update notification {:?}", n); + None + }, + }, + }; -// info!("got a msg to send to whoever cares {:?}", msg); + tx.commit().unwrap(); -// // match events.try_send(msg.clone()) { -// // Ok(()) => info!("events message sent message={:?}", msg), -// // Err(e) => warn!("events delivery failure err={:?}", e), -// // }; + if let Some(msg) = msg { + events.send(msg).unwrap(); + } + }); +} -// Ok(()) -// } +// this function gets a dedicated connection +// because it has to subscribe and listen for notifications +pub fn listen(pool: PgPool, events: Sender) -> Result<(), Error> { + let db = pool.get()?; + db.execute("LISTEN events;", &[])?; + info!("pg listening"); + let notifications = db.notifications(); + let mut n_iter = notifications.blocking_iter(); -// pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { -// let db = pool.get()?; -// db.execute("LISTEN events;", &[])?; -// info!("events listening"); - -// let notifications = db.notifications(); -// let mut n_iter = notifications.blocking_iter(); - -// // main event loop, checks pg and checks messages -// loop { -// // check notifications -// let n = n_iter.next()?; -// if let Some(n) = n { -// match serde_json::from_str::(&n.payload) { -// Ok(notification) => match handle_notification(notification, &db, &events) { -// Ok(()) => (), -// Err(e) => warn!("{:?}", e), -// } -// Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), -// }; -// } -// } -// } \ No newline at end of file + // main event loop, checks pg and checks messages + loop { + // check notifications + let n = n_iter.next()?; + if let Some(n) = n { + match serde_json::from_str::(&n.payload) { + Ok(notification) => handle_notification(notification, &pool, &events), + Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), + }; + } + } +} diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 8b12004e..0c78b977 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -38,12 +38,12 @@ impl Handler for Connection { info!("websocket connected account={:?}", self.account); // tell events we have connected - self.events.send(Event::WsConnect(self.id, self.account.clone(), self.ws.clone())).unwrap(); + self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); // if user logged in do some prep work if let Some(ref a) = self.account { self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); - self.events.send(Event::WsSubscribe(self.id, a.id)).unwrap(); + self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); let db = self.pool.get().unwrap(); let mut tx = db.transaction().unwrap(); @@ -79,11 +79,11 @@ impl Handler for Connection { // we tell events to push updates to them match reply { RpcMessage::AccountState(ref v) => - self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::GameState(ref v) => - self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::InstanceState(ref v) => - self.events.send(Event::WsSubscribe(self.id, v.id)).unwrap(), + self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), _ => (), }; @@ -102,7 +102,7 @@ impl Handler for Connection { fn on_close(&mut self, _: CloseCode, _: &str) { info!("websocket disconnected account={:?}", self.account); - self.events.send(Event::WsDisconnect(self.id)).unwrap(); + self.events.send(Event::Disconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> Result {