From 4d3e4a843e49d46f5ef8f5e6dda743a4c47c8cf8 Mon Sep 17 00:00:00 2001 From: ntr Date: Tue, 16 Jul 2019 21:45:54 +1000 Subject: [PATCH] account state pubsub --- client/assets/styles/styles.css | 3 ++ client/src/components/nav.jsx | 2 +- server/src/account.rs | 4 +- server/src/main.rs | 8 +++- server/src/pubsub.rs | 75 ++++++++++++++++++++++++++++++--- server/src/ws.rs | 56 +++++++++++++++++++++--- 6 files changed, 131 insertions(+), 17 deletions(-) diff --git a/client/assets/styles/styles.css b/client/assets/styles/styles.css index 1ba2ee35..a222660a 100644 --- a/client/assets/styles/styles.css +++ b/client/assets/styles/styles.css @@ -528,6 +528,9 @@ header { color: black; } +.play-btn { + font-size: 150%; +} .refresh-btn { border: 1px solid #222; diff --git a/client/src/components/nav.jsx b/client/src/components/nav.jsx index 313ea2c7..30fd9981 100644 --- a/client/src/components/nav.jsx +++ b/client/src/components/nav.jsx @@ -138,7 +138,7 @@ function Nav(args) {

mnml.gg


- +
{joined} {haxSection} diff --git a/server/src/account.rs b/server/src/account.rs index a8bc95f7..7232351a 100644 --- a/server/src/account.rs +++ b/server/src/account.rs @@ -28,14 +28,14 @@ pub struct Account { pub subscribed: bool, } -pub fn select(tx: &mut Transaction, id: Uuid) -> Result { +pub fn select(db: &Db, id: Uuid) -> Result { let query = " SELECT id, name, balance, subscribed FROM accounts WHERE id = $1; "; - let result = tx + let result = db .query(query, &[&id])?; let row = result.iter().next() diff --git a/server/src/main.rs b/server/src/main.rs index 22f0bf2c..7434ad6f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -54,6 +54,8 @@ use std::thread::{sleep, spawn}; use std::time::{Duration}; use std::path::{Path}; +use crossbeam_channel::{unbounded}; + use pubsub::pg_listen; use warden::warden; @@ -88,6 +90,8 @@ fn main() { let warden_pool = pool.clone(); let pubsub_pool = pool.clone(); + let (pss, psr) = unbounded(); + spawn(move || { loop { let db_connection = warden_pool.get().expect("unable to get db connection"); @@ -100,13 +104,13 @@ fn main() { spawn(move || loop { let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn) { + match pg_listen(pubsub_conn, pss.clone()) { Ok(_) => warn!("pg listen closed"), Err(e) => warn!("pg_listen error {:?}", e), } }); spawn(move || net::start(http_pool)); - ws::start(ws_pool); + ws::start(ws_pool, psr); info!("server started"); } diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index 5dd31d16..dd8e0a1c 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -1,18 +1,81 @@ // Db Commons +use uuid::Uuid; use fallible_iterator::{FallibleIterator}; -use postgres::error::Error; + +use failure::Error; +use failure::err_msg; + +use crossbeam_channel::{Sender}; use pg::{Db}; +use account; +use game; +use instance; -pub fn pg_listen(connection: Db) -> Result<(), Error> { - connection.execute("LISTEN events;", &[])?; +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "lowercase"))] +enum Table { + Accounts, + Constructs, + Instances, + Mtx, + Players, + Games, +} + +#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] +#[serde(rename_all(deserialize = "UPPERCASE"))] +enum Action { + Insert, + Update, + Delete, +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct Notification { + table: Table, + action: Action, + id: Uuid, +} + +#[derive(Debug,Clone)] +pub enum Message { + Account(account::Account), + Game(game::Game), + Instance(instance::Instance), +} + +fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Result<(), Error> { + info!("pubsub received notification notification={:?}", n); + + let msg = match n.table { + Table::Accounts => Message::Account(account::select(db, n.id)?), + _ => unimplemented!(), + }; + + match pss.try_send(msg.clone()) { + Ok(()) => info!("pubsub message sent message={:?}", msg), + Err(e) => warn!("pubsub delivery failure err={:?}", e), + }; + + Ok(()) +} + +pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> { + db.execute("LISTEN events;", &[])?; info!("pubsub listening"); - let notifications = connection.notifications(); + let notifications = db.notifications(); let mut n_iter = notifications.blocking_iter(); loop { - let n = n_iter.next()?; + let n = n_iter.next().unwrap(); if let Some(n) = n { - info!("{:?}", n); + match serde_json::from_str::(&n.payload) { + Ok(notification) => match handle_notification(notification, &db, &pss) { + Ok(()) => (), + Err(e) => warn!("{:?}", e), + } + Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), + }; } } } diff --git a/server/src/ws.rs b/server/src/ws.rs index 7a759246..ed59b821 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -1,37 +1,67 @@ use std::time::{Instant}; -use std::net::{TcpListener}; +use std::net::{TcpStream, TcpListener}; use std::thread::{spawn}; use std::str; +use uuid::Uuid; + use cookie::Cookie; use tungstenite::server::accept_hdr; use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; use tungstenite::http::StatusCode; +use tungstenite::protocol::WebSocket; -use crossbeam_channel::{unbounded, Sender}; - -use failure::Error; -use failure::err_msg; +use crossbeam_channel::{unbounded, Receiver}; use serde_cbor::{to_vec}; use net::TOKEN_HEADER; use rpc; + use mtx; use pg::PgPool; use account; +use pubsub::Message; #[derive(Debug,Clone,Serialize)] struct RpcError { err: String, } -pub fn start(pool: PgPool) { +#[derive(Debug)] +struct Subscriptions { + account: Option, + games: Vec, + instances: Vec, +} + +fn handle_message(subs: &Subscriptions, m: Message, ws: &mut WebSocket) { + if let Some(msg) = match m { + Message::Account(a) => { + match (a.id, &subs.account) { + (id, Some(b)) => { + if id == *b { + Some(rpc::RpcResult::AccountState(a)) + } else { + None + } + } + _ => None, + } + }, + _ => None, + } { + ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); + } +} + +pub fn start(pool: PgPool, psr: Receiver) { let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); for stream in ws_server.incoming() { let ws_pool = pool.clone(); + let ws_psr = psr.clone(); spawn(move || { let (acc_s, acc_r) = unbounded(); @@ -94,6 +124,15 @@ pub fn start(pool: PgPool) { None => None, }; + let mut subs = Subscriptions { + account: match account.as_ref() { + Some(a) => Some(a.id), + None => None, + }, + games: vec![], + instances: vec![], + }; + loop { match websocket.read_message() { Ok(msg) => { @@ -136,6 +175,11 @@ pub fn start(pool: PgPool) { return; } }; + + match ws_psr.try_recv() { + Ok(n) => handle_message(&subs, n, &mut websocket), + Err(_) => (), + }; } }); }