account state pubsub

This commit is contained in:
ntr 2019-07-16 21:45:54 +10:00
parent a4436c9669
commit 4d3e4a843e
6 changed files with 131 additions and 17 deletions

View File

@ -528,6 +528,9 @@ header {
color: black;
}
.play-btn {
font-size: 150%;
}
.refresh-btn {
border: 1px solid #222;

View File

@ -138,7 +138,7 @@ function Nav(args) {
<h1 class="header-title">mnml.gg</h1>
<AccountStatus />
<hr />
<button disabled={canJoin} onClick={() => navTo('list')}>Play</button>
<button class="play-btn" disabled={canJoin} onClick={() => navTo('list')}>Play</button>
<hr />
{joined}
{haxSection}

View File

@ -28,14 +28,14 @@ pub struct Account {
pub subscribed: bool,
}
pub fn select(tx: &mut Transaction, id: Uuid) -> Result<Account, Error> {
pub fn select(db: &Db, id: Uuid) -> Result<Account, Error> {
let query = "
SELECT id, name, balance, subscribed
FROM accounts
WHERE id = $1;
";
let result = tx
let result = db
.query(query, &[&id])?;
let row = result.iter().next()

View File

@ -54,6 +54,8 @@ use std::thread::{sleep, spawn};
use std::time::{Duration};
use std::path::{Path};
use crossbeam_channel::{unbounded};
use pubsub::pg_listen;
use warden::warden;
@ -88,6 +90,8 @@ fn main() {
let warden_pool = pool.clone();
let pubsub_pool = pool.clone();
let (pss, psr) = unbounded();
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
@ -100,13 +104,13 @@ fn main() {
spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn) {
match pg_listen(pubsub_conn, pss.clone()) {
Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e),
}
});
spawn(move || net::start(http_pool));
ws::start(ws_pool);
ws::start(ws_pool, psr);
info!("server started");
}

View File

@ -1,18 +1,81 @@
// Db Commons
use uuid::Uuid;
use fallible_iterator::{FallibleIterator};
use postgres::error::Error;
use failure::Error;
use failure::err_msg;
use crossbeam_channel::{Sender};
use pg::{Db};
use account;
use game;
use instance;
pub fn pg_listen(connection: Db) -> Result<(), Error> {
connection.execute("LISTEN events;", &[])?;
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "lowercase"))]
enum Table {
Accounts,
Constructs,
Instances,
Mtx,
Players,
Games,
}
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "UPPERCASE"))]
enum Action {
Insert,
Update,
Delete,
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Notification {
table: Table,
action: Action,
id: Uuid,
}
#[derive(Debug,Clone)]
pub enum Message {
Account(account::Account),
Game(game::Game),
Instance(instance::Instance),
}
fn handle_notification(n: Notification, db: &Db, pss: &Sender<Message>) -> Result<(), Error> {
info!("pubsub received notification notification={:?}", n);
let msg = match n.table {
Table::Accounts => Message::Account(account::select(db, n.id)?),
_ => unimplemented!(),
};
match pss.try_send(msg.clone()) {
Ok(()) => info!("pubsub message sent message={:?}", msg),
Err(e) => warn!("pubsub delivery failure err={:?}", e),
};
Ok(())
}
pub fn pg_listen(db: Db, pss: Sender<Message>) -> Result<(), Error> {
db.execute("LISTEN events;", &[])?;
info!("pubsub listening");
let notifications = connection.notifications();
let notifications = db.notifications();
let mut n_iter = notifications.blocking_iter();
loop {
let n = n_iter.next()?;
let n = n_iter.next().unwrap();
if let Some(n) = n {
info!("{:?}", n);
match serde_json::from_str::<Notification>(&n.payload) {
Ok(notification) => match handle_notification(notification, &db, &pss) {
Ok(()) => (),
Err(e) => warn!("{:?}", e),
}
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
};
}
}
}

View File

@ -1,37 +1,67 @@
use std::time::{Instant};
use std::net::{TcpListener};
use std::net::{TcpStream, TcpListener};
use std::thread::{spawn};
use std::str;
use uuid::Uuid;
use cookie::Cookie;
use tungstenite::server::accept_hdr;
use tungstenite::Message::Binary;
use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::http::StatusCode;
use tungstenite::protocol::WebSocket;
use crossbeam_channel::{unbounded, Sender};
use failure::Error;
use failure::err_msg;
use crossbeam_channel::{unbounded, Receiver};
use serde_cbor::{to_vec};
use net::TOKEN_HEADER;
use rpc;
use mtx;
use pg::PgPool;
use account;
use pubsub::Message;
#[derive(Debug,Clone,Serialize)]
struct RpcError {
err: String,
}
pub fn start(pool: PgPool) {
#[derive(Debug)]
struct Subscriptions {
account: Option<Uuid>,
games: Vec<Uuid>,
instances: Vec<Uuid>,
}
fn handle_message(subs: &Subscriptions, m: Message, ws: &mut WebSocket<TcpStream>) {
if let Some(msg) = match m {
Message::Account(a) => {
match (a.id, &subs.account) {
(id, Some(b)) => {
if id == *b {
Some(rpc::RpcResult::AccountState(a))
} else {
None
}
}
_ => None,
}
},
_ => None,
} {
ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
}
}
pub fn start(pool: PgPool, psr: Receiver<Message>) {
let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
for stream in ws_server.incoming() {
let ws_pool = pool.clone();
let ws_psr = psr.clone();
spawn(move || {
let (acc_s, acc_r) = unbounded();
@ -94,6 +124,15 @@ pub fn start(pool: PgPool) {
None => None,
};
let mut subs = Subscriptions {
account: match account.as_ref() {
Some(a) => Some(a.id),
None => None,
},
games: vec![],
instances: vec![],
};
loop {
match websocket.read_message() {
Ok(msg) => {
@ -136,6 +175,11 @@ pub fn start(pool: PgPool) {
return;
}
};
match ws_psr.try_recv() {
Ok(n) => handle_message(&subs, n, &mut websocket),
Err(_) => (),
};
}
});
}