From 4726a8c5b33f83dcf2a1da6e80df3ab3bec1565f Mon Sep 17 00:00:00 2001 From: ntr Date: Fri, 26 Jul 2019 15:24:35 +1000 Subject: [PATCH] here we go --- server/src/events.rs | 152 ++++++++-------------------------------- server/src/main.rs | 18 ++--- server/src/pg.rs | 86 ++++++++++++++++++++++- server/src/websocket.rs | 39 ++++++++--- 4 files changed, 153 insertions(+), 142 deletions(-) diff --git a/server/src/events.rs b/server/src/events.rs index bb1ab020..f90a2e2f 100644 --- a/server/src/events.rs +++ b/server/src/events.rs @@ -6,146 +6,54 @@ use failure::Error; use crossbeam_channel::{unbounded, Sender, Receiver}; -use pg::{Db, PgPool}; use account; use game; use instance; - -#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] -#[serde(rename_all(deserialize = "lowercase"))] -enum Table { - Accounts, - Constructs, - Instances, - Mtx, - Players, - Games, -} - -#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] -#[serde(rename_all(deserialize = "UPPERCASE"))] -enum Action { - Insert, - Update, - Delete, -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct Notification { - table: Table, - action: Action, - id: Uuid, -} - -#[derive(Debug,Clone)] -pub enum Message { - // outgoing - Account(account::Account), - Game(game::Game), - Instance(instance::Instance), - - // incoming - Connect(Sender), - Disconnect, -} +use pg::{Db, PgPool}; +use rpc::RpcMessage; +use websocket::{WsMessage}; #[derive(Clone)] pub struct Events { - pub tx: Sender, - rx: Receiver, + pub tx: Sender, + rx: Receiver, + pool: PgPool, } impl Events { - pub fn new() -> Events { + pub fn new(pool: PgPool) -> Events { let (tx, rx) = unbounded(); - return Events { tx, rx }; + return Events { tx, rx, pool }; } - fn receive(&mut self) { - match self.rx.try_recv() { - Ok(m) => self.on_message(m), - Err(_) => (), - } - } - - fn on_message(&mut self, msg: Message) { - match msg { - Message::Connect(tx) => { - info!("client connected {:?}", tx); - }, - Message::Disconnect => { - info!("client disconnected"); - } - _ => panic!("events received unhandled msg={:?}", msg), - } - } -} - -fn pg_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { - info!("events received notification notification={:?}", n); - - // maybe we need it - let mut tx = db.transaction()?; - - let msg = match n.action { - Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), - Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), - Action::Update => match n.table { - Table::Accounts => Message::Account(account::select(db, n.id)?), - Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), - Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), - _ => return Err(format_err!("unimplemented update notification {:?}", n)), - }, - }; - - tx.commit()?; - - info!("got a msg to send to whoever cares {:?}", msg); - - // match events.try_send(msg.clone()) { - // Ok(()) => info!("events message sent message={:?}", msg), - // Err(e) => warn!("events delivery failure err={:?}", e), - // }; - - Ok(()) -} - -pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { - let db = pool.get()?; - db.execute("LISTEN events;", &[])?; - info!("events listening"); - - let notifications = db.notifications(); - let mut n_iter = notifications.blocking_iter(); - - // main event loop, checks pg and checks messages - loop { - // check notifications - let n = n_iter.next()?; - if let Some(n) = n { - match serde_json::from_str::(&n.payload) { - Ok(notification) => match pg_notification(notification, &db, &events) { - Ok(()) => (), - Err(e) => warn!("{:?}", e), - } - Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), + pub fn listen(&mut self) -> Result<(), Error> { + loop { + match self.rx.recv() { + Ok(m) => { + self.on_message(m)?; + }, + Err(e) => { + return Err(format_err!("events error err={:?}", e)); + }, }; } - - events.receive(); } -} -pub fn start(pool: PgPool, mut events: Events) { - loop { - match listen(&pool, &mut events) { - Ok(()) => panic!("events listen returned"), - Err(e) => warn!("events listener error err={:?}", e), + fn on_message(&mut self, msg: WsMessage) -> Result<(), Error> { + match msg { + WsMessage::Connect(tx) => { + info!("client connected {:?}", tx); + Ok(()) + }, + WsMessage::Disconnect => { + info!("client disconnected"); + Ok(()) + } + _ => return Err(format_err!("events received unhandled msg={:?}", msg)), } } } - // #[derive(Debug)] // struct Subscriptions { // account: Option, @@ -240,4 +148,6 @@ pub fn start(pool: PgPool, mut events: Events) { // } { // ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); // } -// } \ No newline at end of file +// } + + diff --git a/server/src/main.rs b/server/src/main.rs index ebb549ce..2206df8f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -55,7 +55,7 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; -use events::{start as events_start}; +use events::Events; use warden::warden; fn setup_logger() -> Result<(), fern::InitError> { @@ -95,16 +95,16 @@ fn main() { } }); - let events = events::Events::new(); - let event_listener = events.clone(); - let events_pool = pool.clone(); - // spawn(move || events_start(events_pool, event_listener)); - let http_pool = pool.clone(); spawn(move || net::start(http_pool)); - let ws_pool = pool.clone(); - websocket::start(ws_pool, events); + // create a clone of the tx so ws handler can tell events + // about connection status + let mut events = events::Events::new(pool.clone()); + let ws_events_tx = events.tx.clone(); + spawn(move || events.listen()); - info!("server started"); + // the main thread becomes this ws listener + let ws_pool = pool.clone(); + websocket::start(ws_pool, ws_events_tx); } diff --git a/server/src/pg.rs b/server/src/pg.rs index 0b0fe2db..b4cc5f3f 100644 --- a/server/src/pg.rs +++ b/server/src/pg.rs @@ -1,16 +1,46 @@ use std::env; +use uuid::Uuid; + +use failure::Error; + use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; +use fallible_iterator::{FallibleIterator}; -// use postgres::transaction::Transaction; +use events::Events; pub type Db = PooledConnection; pub type PgPool = Pool; const DB_POOL_SIZE: u32 = 20; +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "lowercase"))] +enum Table { + Accounts, + Constructs, + Instances, + Mtx, + Players, + Games, +} + +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "UPPERCASE"))] +enum Action { + Insert, + Update, + Delete, +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct Notification { + table: Table, + action: Action, + id: Uuid, +} pub fn create_pool() -> Pool { let url = env::var("DATABASE_URL") @@ -24,3 +54,57 @@ pub fn create_pool() -> Pool { .build(manager) .expect("Failed to create pool.") } + +// fn handle_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> { +// info!("events received notification notification={:?}", n); + +// // maybe we need it +// let mut tx = db.transaction()?; + +// let msg = match n.action { +// Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)), +// Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)), +// Action::Update => match n.table { +// Table::Accounts => Message::Account(account::select(db, n.id)?), +// Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?), +// Table::Games => Message::Game(game::game_get(&mut tx, n.id)?), +// _ => return Err(format_err!("unimplemented update notification {:?}", n)), +// }, +// }; + +// tx.commit()?; + +// info!("got a msg to send to whoever cares {:?}", msg); + +// // match events.try_send(msg.clone()) { +// // Ok(()) => info!("events message sent message={:?}", msg), +// // Err(e) => warn!("events delivery failure err={:?}", e), +// // }; + +// Ok(()) +// } + + +// pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> { +// let db = pool.get()?; +// db.execute("LISTEN events;", &[])?; +// info!("events listening"); + +// let notifications = db.notifications(); +// let mut n_iter = notifications.blocking_iter(); + +// // main event loop, checks pg and checks messages +// loop { +// // check notifications +// let n = n_iter.next()?; +// if let Some(n) = n { +// match serde_json::from_str::(&n.payload) { +// Ok(notification) => match handle_notification(notification, &db, &events) { +// Ok(()) => (), +// Err(e) => warn!("{:?}", e), +// } +// Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), +// }; +// } +// } +// } \ No newline at end of file diff --git a/server/src/websocket.rs b/server/src/websocket.rs index 4f1655de..b48d593d 100644 --- a/server/src/websocket.rs +++ b/server/src/websocket.rs @@ -2,31 +2,37 @@ use std::time::{Instant}; use std::thread::spawn; use std::str; +use rand::prelude::*; + use serde_cbor::to_vec; use cookie::Cookie; -use crossbeam_channel::{unbounded, Sender as CbSender, Receiver as CbReceiver}; -use ws::{ listen, CloseCode, Message, Sender, Handler, Handshake, Result, Request, Response}; +use crossbeam_channel::{unbounded, Sender as CbSender}; +use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; use account; use account::{Account}; use pg::{PgPool}; -use events::{Events}; + use rpc::{RpcMessage}; use rpc; use net::TOKEN_HEADER; -#[derive(Debug,Clone,Serialize)] -struct WsError { - err: String, +// these are messages relating to the lifecycle +// of ws clients +#[derive(Debug,Clone)] +pub enum WsMessage { + Connect(CbSender), + Disconnect, } struct Connection { - ws: CbSender, + pub id: usize, + pub ws: CbSender, pool: PgPool, - events: Events, account: Option, + events: CbSender, } impl Handler for Connection { @@ -37,6 +43,9 @@ impl Handler for Connection { self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); } + // tell events we have connected + self.events.send(WsMessage::Connect(self.ws.clone())); + Ok(()) } @@ -98,8 +107,16 @@ impl Handler for Connection { } -pub fn start(pool: PgPool, events: Events) { +pub fn start(pool: PgPool, events_tx: CbSender) { + let mut rng = thread_rng(); listen("127.0.0.1:40055", move |out| { + + // we give the tx half to the connection object + // which in turn passes a clone to the events system + // the rx half goes into a thread where it waits for messages + // that need to be delivered to the client + // both the ws message handler and the events thread must use + // this channel to send messages let (tx, rx) = unbounded::(); spawn(move || { @@ -118,11 +135,11 @@ pub fn start(pool: PgPool, events: Events) { }); Connection { + id: rng.gen::(), account: None, ws: tx, pool: pool.clone(), - events: events.clone(), + events: events_tx.clone(), } - }).unwrap(); }