diff --git a/client/assets/styles/styles.css b/client/assets/styles/styles.css
index 1ba2ee35..a222660a 100644
--- a/client/assets/styles/styles.css
+++ b/client/assets/styles/styles.css
@@ -528,6 +528,9 @@ header {
color: black;
}
+.play-btn {
+ font-size: 150%;
+}
.refresh-btn {
border: 1px solid #222;
diff --git a/client/src/components/nav.jsx b/client/src/components/nav.jsx
index 313ea2c7..30fd9981 100644
--- a/client/src/components/nav.jsx
+++ b/client/src/components/nav.jsx
@@ -138,7 +138,7 @@ function Nav(args) {
-
+
{joined}
{haxSection}
diff --git a/server/src/account.rs b/server/src/account.rs
index a8bc95f7..7232351a 100644
--- a/server/src/account.rs
+++ b/server/src/account.rs
@@ -28,14 +28,14 @@ pub struct Account {
pub subscribed: bool,
}
-pub fn select(tx: &mut Transaction, id: Uuid) -> Result {
+pub fn select(db: &Db, id: Uuid) -> Result {
let query = "
SELECT id, name, balance, subscribed
FROM accounts
WHERE id = $1;
";
- let result = tx
+ let result = db
.query(query, &[&id])?;
let row = result.iter().next()
diff --git a/server/src/main.rs b/server/src/main.rs
index 22f0bf2c..7434ad6f 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -54,6 +54,8 @@ use std::thread::{sleep, spawn};
use std::time::{Duration};
use std::path::{Path};
+use crossbeam_channel::{unbounded};
+
use pubsub::pg_listen;
use warden::warden;
@@ -88,6 +90,8 @@ fn main() {
let warden_pool = pool.clone();
let pubsub_pool = pool.clone();
+ let (pss, psr) = unbounded();
+
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
@@ -100,13 +104,13 @@ fn main() {
spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
- match pg_listen(pubsub_conn) {
+ match pg_listen(pubsub_conn, pss.clone()) {
Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e),
}
});
spawn(move || net::start(http_pool));
- ws::start(ws_pool);
+ ws::start(ws_pool, psr);
info!("server started");
}
diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs
index 5dd31d16..dd8e0a1c 100644
--- a/server/src/pubsub.rs
+++ b/server/src/pubsub.rs
@@ -1,18 +1,81 @@
// Db Commons
+use uuid::Uuid;
use fallible_iterator::{FallibleIterator};
-use postgres::error::Error;
+
+use failure::Error;
+use failure::err_msg;
+
+use crossbeam_channel::{Sender};
use pg::{Db};
+use account;
+use game;
+use instance;
-pub fn pg_listen(connection: Db) -> Result<(), Error> {
- connection.execute("LISTEN events;", &[])?;
+#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
+#[serde(rename_all(deserialize = "lowercase"))]
+enum Table {
+ Accounts,
+ Constructs,
+ Instances,
+ Mtx,
+ Players,
+ Games,
+}
+
+#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
+#[serde(rename_all(deserialize = "UPPERCASE"))]
+enum Action {
+ Insert,
+ Update,
+ Delete,
+}
+
+#[derive(Debug,Clone,Serialize,Deserialize)]
+struct Notification {
+ table: Table,
+ action: Action,
+ id: Uuid,
+}
+
+#[derive(Debug,Clone)]
+pub enum Message {
+ Account(account::Account),
+ Game(game::Game),
+ Instance(instance::Instance),
+}
+
+fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Result<(), Error> {
+ info!("pubsub received notification notification={:?}", n);
+
+ let msg = match n.table {
+ Table::Accounts => Message::Account(account::select(db, n.id)?),
+ _ => unimplemented!(),
+ };
+
+ match pss.try_send(msg.clone()) {
+ Ok(()) => info!("pubsub message sent message={:?}", msg),
+ Err(e) => warn!("pubsub delivery failure err={:?}", e),
+ };
+
+ Ok(())
+}
+
+pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> {
+ db.execute("LISTEN events;", &[])?;
info!("pubsub listening");
- let notifications = connection.notifications();
+ let notifications = db.notifications();
let mut n_iter = notifications.blocking_iter();
loop {
- let n = n_iter.next()?;
+ let n = n_iter.next().unwrap();
if let Some(n) = n {
- info!("{:?}", n);
+ match serde_json::from_str::(&n.payload) {
+ Ok(notification) => match handle_notification(notification, &db, &pss) {
+ Ok(()) => (),
+ Err(e) => warn!("{:?}", e),
+ }
+ Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
+ };
}
}
}
diff --git a/server/src/ws.rs b/server/src/ws.rs
index 7a759246..ed59b821 100644
--- a/server/src/ws.rs
+++ b/server/src/ws.rs
@@ -1,37 +1,67 @@
use std::time::{Instant};
-use std::net::{TcpListener};
+use std::net::{TcpStream, TcpListener};
use std::thread::{spawn};
use std::str;
+use uuid::Uuid;
+
use cookie::Cookie;
use tungstenite::server::accept_hdr;
use tungstenite::Message::Binary;
use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::http::StatusCode;
+use tungstenite::protocol::WebSocket;
-use crossbeam_channel::{unbounded, Sender};
-
-use failure::Error;
-use failure::err_msg;
+use crossbeam_channel::{unbounded, Receiver};
use serde_cbor::{to_vec};
use net::TOKEN_HEADER;
use rpc;
+
use mtx;
use pg::PgPool;
use account;
+use pubsub::Message;
#[derive(Debug,Clone,Serialize)]
struct RpcError {
err: String,
}
-pub fn start(pool: PgPool) {
+#[derive(Debug)]
+struct Subscriptions {
+ account: Option,
+ games: Vec,
+ instances: Vec,
+}
+
+fn handle_message(subs: &Subscriptions, m: Message, ws: &mut WebSocket) {
+ if let Some(msg) = match m {
+ Message::Account(a) => {
+ match (a.id, &subs.account) {
+ (id, Some(b)) => {
+ if id == *b {
+ Some(rpc::RpcResult::AccountState(a))
+ } else {
+ None
+ }
+ }
+ _ => None,
+ }
+ },
+ _ => None,
+ } {
+ ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
+ }
+}
+
+pub fn start(pool: PgPool, psr: Receiver) {
let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
for stream in ws_server.incoming() {
let ws_pool = pool.clone();
+ let ws_psr = psr.clone();
spawn(move || {
let (acc_s, acc_r) = unbounded();
@@ -94,6 +124,15 @@ pub fn start(pool: PgPool) {
None => None,
};
+ let mut subs = Subscriptions {
+ account: match account.as_ref() {
+ Some(a) => Some(a.id),
+ None => None,
+ },
+ games: vec![],
+ instances: vec![],
+ };
+
loop {
match websocket.read_message() {
Ok(msg) => {
@@ -136,6 +175,11 @@ pub fn start(pool: PgPool) {
return;
}
};
+
+ match ws_psr.try_recv() {
+ Ok(n) => handle_message(&subs, n, &mut websocket),
+ Err(_) => (),
+ };
}
});
}