Merge branch 'develop' of ssh://mnml.gg:40022/~/mnml into develop

This commit is contained in:
Mashy 2019-07-16 21:53:28 +10:00
commit 9edf7635d1
6 changed files with 131 additions and 17 deletions

View File

@ -528,6 +528,9 @@ header {
color: black; color: black;
} }
.play-btn {
font-size: 150%;
}
.refresh-btn { .refresh-btn {
border: 1px solid #222; border: 1px solid #222;

View File

@ -138,7 +138,7 @@ function Nav(args) {
<h1 class="header-title">mnml.gg</h1> <h1 class="header-title">mnml.gg</h1>
<AccountStatus /> <AccountStatus />
<hr /> <hr />
<button disabled={canJoin} onClick={() => navTo('list')}>Play</button> <button class="play-btn" disabled={canJoin} onClick={() => navTo('list')}>Play</button>
<hr /> <hr />
{joined} {joined}
{haxSection} {haxSection}

View File

@ -28,14 +28,14 @@ pub struct Account {
pub subscribed: bool, pub subscribed: bool,
} }
pub fn select(tx: &mut Transaction, id: Uuid) -> Result<Account, Error> { pub fn select(db: &Db, id: Uuid) -> Result<Account, Error> {
let query = " let query = "
SELECT id, name, balance, subscribed SELECT id, name, balance, subscribed
FROM accounts FROM accounts
WHERE id = $1; WHERE id = $1;
"; ";
let result = tx let result = db
.query(query, &[&id])?; .query(query, &[&id])?;
let row = result.iter().next() let row = result.iter().next()

View File

@ -54,6 +54,8 @@ use std::thread::{sleep, spawn};
use std::time::{Duration}; use std::time::{Duration};
use std::path::{Path}; use std::path::{Path};
use crossbeam_channel::{unbounded};
use pubsub::pg_listen; use pubsub::pg_listen;
use warden::warden; use warden::warden;
@ -88,6 +90,8 @@ fn main() {
let warden_pool = pool.clone(); let warden_pool = pool.clone();
let pubsub_pool = pool.clone(); let pubsub_pool = pool.clone();
let (pss, psr) = unbounded();
spawn(move || { spawn(move || {
loop { loop {
let db_connection = warden_pool.get().expect("unable to get db connection"); let db_connection = warden_pool.get().expect("unable to get db connection");
@ -100,13 +104,13 @@ fn main() {
spawn(move || loop { spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn) { match pg_listen(pubsub_conn, pss.clone()) {
Ok(_) => warn!("pg listen closed"), Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e), Err(e) => warn!("pg_listen error {:?}", e),
} }
}); });
spawn(move || net::start(http_pool)); spawn(move || net::start(http_pool));
ws::start(ws_pool); ws::start(ws_pool, psr);
info!("server started"); info!("server started");
} }

View File

@ -1,18 +1,81 @@
// Db Commons // Db Commons
use uuid::Uuid;
use fallible_iterator::{FallibleIterator}; use fallible_iterator::{FallibleIterator};
use postgres::error::Error;
use failure::Error;
use failure::err_msg;
use crossbeam_channel::{Sender};
use pg::{Db}; use pg::{Db};
use account;
use game;
use instance;
pub fn pg_listen(connection: Db) -> Result<(), Error> { #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
connection.execute("LISTEN events;", &[])?; #[serde(rename_all(deserialize = "lowercase"))]
enum Table {
Accounts,
Constructs,
Instances,
Mtx,
Players,
Games,
}
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "UPPERCASE"))]
enum Action {
Insert,
Update,
Delete,
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Notification {
table: Table,
action: Action,
id: Uuid,
}
#[derive(Debug,Clone)]
pub enum Message {
Account(account::Account),
Game(game::Game),
Instance(instance::Instance),
}
fn handle_notification(n: Notification, db: &Db, pss: &Sender<Message>) -> Result<(), Error> {
info!("pubsub received notification notification={:?}", n);
let msg = match n.table {
Table::Accounts => Message::Account(account::select(db, n.id)?),
_ => unimplemented!(),
};
match pss.try_send(msg.clone()) {
Ok(()) => info!("pubsub message sent message={:?}", msg),
Err(e) => warn!("pubsub delivery failure err={:?}", e),
};
Ok(())
}
pub fn pg_listen(db: Db, pss: Sender<Message>) -> Result<(), Error> {
db.execute("LISTEN events;", &[])?;
info!("pubsub listening"); info!("pubsub listening");
let notifications = connection.notifications(); let notifications = db.notifications();
let mut n_iter = notifications.blocking_iter(); let mut n_iter = notifications.blocking_iter();
loop { loop {
let n = n_iter.next()?; let n = n_iter.next().unwrap();
if let Some(n) = n { if let Some(n) = n {
info!("{:?}", n); match serde_json::from_str::<Notification>(&n.payload) {
Ok(notification) => match handle_notification(notification, &db, &pss) {
Ok(()) => (),
Err(e) => warn!("{:?}", e),
}
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
};
} }
} }
} }

View File

@ -1,37 +1,67 @@
use std::time::{Instant}; use std::time::{Instant};
use std::net::{TcpListener}; use std::net::{TcpStream, TcpListener};
use std::thread::{spawn}; use std::thread::{spawn};
use std::str; use std::str;
use uuid::Uuid;
use cookie::Cookie; use cookie::Cookie;
use tungstenite::server::accept_hdr; use tungstenite::server::accept_hdr;
use tungstenite::Message::Binary; use tungstenite::Message::Binary;
use tungstenite::handshake::server::{Request, ErrorResponse}; use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::http::StatusCode; use tungstenite::http::StatusCode;
use tungstenite::protocol::WebSocket;
use crossbeam_channel::{unbounded, Sender}; use crossbeam_channel::{unbounded, Receiver};
use failure::Error;
use failure::err_msg;
use serde_cbor::{to_vec}; use serde_cbor::{to_vec};
use net::TOKEN_HEADER; use net::TOKEN_HEADER;
use rpc; use rpc;
use mtx; use mtx;
use pg::PgPool; use pg::PgPool;
use account; use account;
use pubsub::Message;
#[derive(Debug,Clone,Serialize)] #[derive(Debug,Clone,Serialize)]
struct RpcError { struct RpcError {
err: String, err: String,
} }
pub fn start(pool: PgPool) { #[derive(Debug)]
struct Subscriptions {
account: Option<Uuid>,
games: Vec<Uuid>,
instances: Vec<Uuid>,
}
fn handle_message(subs: &Subscriptions, m: Message, ws: &mut WebSocket<TcpStream>) {
if let Some(msg) = match m {
Message::Account(a) => {
match (a.id, &subs.account) {
(id, Some(b)) => {
if id == *b {
Some(rpc::RpcResult::AccountState(a))
} else {
None
}
}
_ => None,
}
},
_ => None,
} {
ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
}
}
pub fn start(pool: PgPool, psr: Receiver<Message>) {
let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
for stream in ws_server.incoming() { for stream in ws_server.incoming() {
let ws_pool = pool.clone(); let ws_pool = pool.clone();
let ws_psr = psr.clone();
spawn(move || { spawn(move || {
let (acc_s, acc_r) = unbounded(); let (acc_s, acc_r) = unbounded();
@ -94,6 +124,15 @@ pub fn start(pool: PgPool) {
None => None, None => None,
}; };
let mut subs = Subscriptions {
account: match account.as_ref() {
Some(a) => Some(a.id),
None => None,
},
games: vec![],
instances: vec![],
};
loop { loop {
match websocket.read_message() { match websocket.read_message() {
Ok(msg) => { Ok(msg) => {
@ -136,6 +175,11 @@ pub fn start(pool: PgPool) {
return; return;
} }
}; };
match ws_psr.try_recv() {
Ok(n) => handle_message(&subs, n, &mut websocket),
Err(_) => (),
};
} }
}); });
} }