here we go

This commit is contained in:
ntr 2019-07-26 15:24:35 +10:00
parent 8adcd08daf
commit 4726a8c5b3
4 changed files with 153 additions and 142 deletions

View File

@ -6,146 +6,54 @@ use failure::Error;
use crossbeam_channel::{unbounded, Sender, Receiver}; use crossbeam_channel::{unbounded, Sender, Receiver};
use pg::{Db, PgPool};
use account; use account;
use game; use game;
use instance; use instance;
use pg::{Db, PgPool};
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] use rpc::RpcMessage;
#[serde(rename_all(deserialize = "lowercase"))] use websocket::{WsMessage};
enum Table {
Accounts,
Constructs,
Instances,
Mtx,
Players,
Games,
}
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "UPPERCASE"))]
enum Action {
Insert,
Update,
Delete,
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Notification {
table: Table,
action: Action,
id: Uuid,
}
#[derive(Debug,Clone)]
pub enum Message {
// outgoing
Account(account::Account),
Game(game::Game),
Instance(instance::Instance),
// incoming
Connect(Sender<Message>),
Disconnect,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Events { pub struct Events {
pub tx: Sender<Message>, pub tx: Sender<WsMessage>,
rx: Receiver<Message>, rx: Receiver<WsMessage>,
pool: PgPool,
} }
impl Events { impl Events {
pub fn new() -> Events { pub fn new(pool: PgPool) -> Events {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
return Events { tx, rx }; return Events { tx, rx, pool };
} }
fn receive(&mut self) { pub fn listen(&mut self) -> Result<(), Error> {
match self.rx.try_recv() { loop {
Ok(m) => self.on_message(m), match self.rx.recv() {
Err(_) => (), Ok(m) => {
} self.on_message(m)?;
}
fn on_message(&mut self, msg: Message) {
match msg {
Message::Connect(tx) => {
info!("client connected {:?}", tx);
}, },
Message::Disconnect => { Err(e) => {
info!("client disconnected"); return Err(format_err!("events error err={:?}", e));
}
_ => panic!("events received unhandled msg={:?}", msg),
}
}
}
fn pg_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> {
info!("events received notification notification={:?}", n);
// maybe we need it
let mut tx = db.transaction()?;
let msg = match n.action {
Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)),
Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)),
Action::Update => match n.table {
Table::Accounts => Message::Account(account::select(db, n.id)?),
Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?),
Table::Games => Message::Game(game::game_get(&mut tx, n.id)?),
_ => return Err(format_err!("unimplemented update notification {:?}", n)),
}, },
}; };
}
}
tx.commit()?; fn on_message(&mut self, msg: WsMessage) -> Result<(), Error> {
match msg {
info!("got a msg to send to whoever cares {:?}", msg); WsMessage::Connect(tx) => {
info!("client connected {:?}", tx);
// match events.try_send(msg.clone()) { Ok(())
// Ok(()) => info!("events message sent message={:?}", msg), },
// Err(e) => warn!("events delivery failure err={:?}", e), WsMessage::Disconnect => {
// }; info!("client disconnected");
Ok(()) Ok(())
} }
_ => return Err(format_err!("events received unhandled msg={:?}", msg)),
pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> {
let db = pool.get()?;
db.execute("LISTEN events;", &[])?;
info!("events listening");
let notifications = db.notifications();
let mut n_iter = notifications.blocking_iter();
// main event loop, checks pg and checks messages
loop {
// check notifications
let n = n_iter.next()?;
if let Some(n) = n {
match serde_json::from_str::<Notification>(&n.payload) {
Ok(notification) => match pg_notification(notification, &db, &events) {
Ok(()) => (),
Err(e) => warn!("{:?}", e),
}
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
};
}
events.receive();
}
}
pub fn start(pool: PgPool, mut events: Events) {
loop {
match listen(&pool, &mut events) {
Ok(()) => panic!("events listen returned"),
Err(e) => warn!("events listener error err={:?}", e),
} }
} }
} }
// #[derive(Debug)] // #[derive(Debug)]
// struct Subscriptions { // struct Subscriptions {
// account: Option<Uuid>, // account: Option<Uuid>,
@ -241,3 +149,5 @@ pub fn start(pool: PgPool, mut events: Events) {
// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); // ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
// } // }
// } // }

View File

@ -55,7 +55,7 @@ use std::thread::{sleep, spawn};
use std::time::{Duration}; use std::time::{Duration};
use std::path::{Path}; use std::path::{Path};
use events::{start as events_start}; use events::Events;
use warden::warden; use warden::warden;
fn setup_logger() -> Result<(), fern::InitError> { fn setup_logger() -> Result<(), fern::InitError> {
@ -95,16 +95,16 @@ fn main() {
} }
}); });
let events = events::Events::new();
let event_listener = events.clone();
let events_pool = pool.clone();
// spawn(move || events_start(events_pool, event_listener));
let http_pool = pool.clone(); let http_pool = pool.clone();
spawn(move || net::start(http_pool)); spawn(move || net::start(http_pool));
let ws_pool = pool.clone(); // create a clone of the tx so ws handler can tell events
websocket::start(ws_pool, events); // about connection status
let mut events = events::Events::new(pool.clone());
let ws_events_tx = events.tx.clone();
spawn(move || events.listen());
info!("server started"); // the main thread becomes this ws listener
let ws_pool = pool.clone();
websocket::start(ws_pool, ws_events_tx);
} }

View File

@ -1,16 +1,46 @@
use std::env; use std::env;
use uuid::Uuid;
use failure::Error;
use r2d2::{Pool}; use r2d2::{Pool};
use r2d2::{PooledConnection}; use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use fallible_iterator::{FallibleIterator};
// use postgres::transaction::Transaction; use events::Events;
pub type Db = PooledConnection<PostgresConnectionManager>; pub type Db = PooledConnection<PostgresConnectionManager>;
pub type PgPool = Pool<PostgresConnectionManager>; pub type PgPool = Pool<PostgresConnectionManager>;
const DB_POOL_SIZE: u32 = 20; const DB_POOL_SIZE: u32 = 20;
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "lowercase"))]
enum Table {
Accounts,
Constructs,
Instances,
Mtx,
Players,
Games,
}
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "UPPERCASE"))]
enum Action {
Insert,
Update,
Delete,
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Notification {
table: Table,
action: Action,
id: Uuid,
}
pub fn create_pool() -> Pool<PostgresConnectionManager> { pub fn create_pool() -> Pool<PostgresConnectionManager> {
let url = env::var("DATABASE_URL") let url = env::var("DATABASE_URL")
@ -24,3 +54,57 @@ pub fn create_pool() -> Pool<PostgresConnectionManager> {
.build(manager) .build(manager)
.expect("Failed to create pool.") .expect("Failed to create pool.")
} }
// fn handle_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> {
// info!("events received notification notification={:?}", n);
// // maybe we need it
// let mut tx = db.transaction()?;
// let msg = match n.action {
// Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)),
// Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)),
// Action::Update => match n.table {
// Table::Accounts => Message::Account(account::select(db, n.id)?),
// Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?),
// Table::Games => Message::Game(game::game_get(&mut tx, n.id)?),
// _ => return Err(format_err!("unimplemented update notification {:?}", n)),
// },
// };
// tx.commit()?;
// info!("got a msg to send to whoever cares {:?}", msg);
// // match events.try_send(msg.clone()) {
// // Ok(()) => info!("events message sent message={:?}", msg),
// // Err(e) => warn!("events delivery failure err={:?}", e),
// // };
// Ok(())
// }
// pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> {
// let db = pool.get()?;
// db.execute("LISTEN events;", &[])?;
// info!("events listening");
// let notifications = db.notifications();
// let mut n_iter = notifications.blocking_iter();
// // main event loop, checks pg and checks messages
// loop {
// // check notifications
// let n = n_iter.next()?;
// if let Some(n) = n {
// match serde_json::from_str::<Notification>(&n.payload) {
// Ok(notification) => match handle_notification(notification, &db, &events) {
// Ok(()) => (),
// Err(e) => warn!("{:?}", e),
// }
// Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
// };
// }
// }
// }

View File

@ -2,31 +2,37 @@ use std::time::{Instant};
use std::thread::spawn; use std::thread::spawn;
use std::str; use std::str;
use rand::prelude::*;
use serde_cbor::to_vec; use serde_cbor::to_vec;
use cookie::Cookie; use cookie::Cookie;
use crossbeam_channel::{unbounded, Sender as CbSender, Receiver as CbReceiver}; use crossbeam_channel::{unbounded, Sender as CbSender};
use ws::{ listen, CloseCode, Message, Sender, Handler, Handshake, Result, Request, Response}; use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response};
use account; use account;
use account::{Account}; use account::{Account};
use pg::{PgPool}; use pg::{PgPool};
use events::{Events};
use rpc::{RpcMessage}; use rpc::{RpcMessage};
use rpc; use rpc;
use net::TOKEN_HEADER; use net::TOKEN_HEADER;
#[derive(Debug,Clone,Serialize)] // these are messages relating to the lifecycle
struct WsError { // of ws clients
err: String, #[derive(Debug,Clone)]
pub enum WsMessage {
Connect(CbSender<RpcMessage>),
Disconnect,
} }
struct Connection { struct Connection {
ws: CbSender<RpcMessage>, pub id: usize,
pub ws: CbSender<RpcMessage>,
pool: PgPool, pool: PgPool,
events: Events,
account: Option<Account>, account: Option<Account>,
events: CbSender<WsMessage>,
} }
impl Handler for Connection { impl Handler for Connection {
@ -37,6 +43,9 @@ impl Handler for Connection {
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
} }
// tell events we have connected
self.events.send(WsMessage::Connect(self.ws.clone()));
Ok(()) Ok(())
} }
@ -98,8 +107,16 @@ impl Handler for Connection {
} }
pub fn start(pool: PgPool, events: Events) { pub fn start(pool: PgPool, events_tx: CbSender<WsMessage>) {
let mut rng = thread_rng();
listen("127.0.0.1:40055", move |out| { listen("127.0.0.1:40055", move |out| {
// we give the tx half to the connection object
// which in turn passes a clone to the events system
// the rx half goes into a thread where it waits for messages
// that need to be delivered to the client
// both the ws message handler and the events thread must use
// this channel to send messages
let (tx, rx) = unbounded::<RpcMessage>(); let (tx, rx) = unbounded::<RpcMessage>();
spawn(move || { spawn(move || {
@ -118,11 +135,11 @@ pub fn start(pool: PgPool, events: Events) {
}); });
Connection { Connection {
id: rng.gen::<usize>(),
account: None, account: None,
ws: tx, ws: tx,
pool: pool.clone(), pool: pool.clone(),
events: events.clone(), events: events_tx.clone(),
} }
}).unwrap(); }).unwrap();
} }