131 lines
3.6 KiB
Rust
131 lines
3.6 KiB
Rust
use std::env;
|
|
use std::thread::spawn;
|
|
|
|
use uuid::Uuid;
|
|
|
|
use failure::Error;
|
|
|
|
use r2d2::{Pool};
|
|
use r2d2::{PooledConnection};
|
|
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
|
|
use fallible_iterator::{FallibleIterator};
|
|
|
|
use crossbeam_channel::{Sender};
|
|
|
|
use events::{Event};
|
|
use account;
|
|
use game;
|
|
use instance;
|
|
use rpc::RpcMessage;
|
|
|
|
pub type Db = PooledConnection<PostgresConnectionManager>;
|
|
pub type PgPool = Pool<PostgresConnectionManager>;
|
|
|
|
const DB_POOL_SIZE: u32 = 20;
|
|
|
|
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
|
|
#[serde(rename_all(deserialize = "lowercase"))]
|
|
enum Table {
|
|
Accounts,
|
|
Constructs,
|
|
Instances,
|
|
Mtx,
|
|
Players,
|
|
Games,
|
|
}
|
|
|
|
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
|
|
#[serde(rename_all(deserialize = "UPPERCASE"))]
|
|
enum Action {
|
|
Insert,
|
|
Update,
|
|
Delete,
|
|
}
|
|
|
|
#[derive(Debug,Clone,Serialize,Deserialize)]
|
|
struct Notification {
|
|
table: Table,
|
|
action: Action,
|
|
id: Uuid,
|
|
}
|
|
|
|
pub fn create_pool() -> Pool<PostgresConnectionManager> {
|
|
let url = env::var("DATABASE_URL")
|
|
.expect("DATABASE_URL must be set");
|
|
|
|
let manager = PostgresConnectionManager::new(url, TlsMode::None)
|
|
.expect("could not instantiate pg manager");
|
|
|
|
Pool::builder()
|
|
.max_size(DB_POOL_SIZE)
|
|
.build(manager)
|
|
.expect("Failed to create pool.")
|
|
}
|
|
|
|
fn handle_notification(n: Notification, pool: &PgPool, events: &Sender<Event>) {
|
|
info!("pg received notification={:?}", n);
|
|
|
|
// bang out a thread to do the slow work of fetching the state from db
|
|
// the thread will notify events
|
|
|
|
let pool = pool.clone();
|
|
let events = events.clone();
|
|
spawn(move || {
|
|
// maybe we need it
|
|
let db = pool.get().unwrap();
|
|
let mut tx = db.transaction().unwrap();
|
|
|
|
let msg = match n.action {
|
|
Action::Delete => {
|
|
warn!("unimplemented delete notification {:?}", n);
|
|
None
|
|
},
|
|
Action::Insert => {
|
|
warn!("unimplemented insert notification {:?}", n);
|
|
None
|
|
},
|
|
Action::Update => match n.table {
|
|
Table::Accounts =>
|
|
Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))),
|
|
Table::Instances =>
|
|
Some(Event::Push(n.id, instance::instance_state(&mut tx, n.id).unwrap())),
|
|
Table::Games =>
|
|
Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))),
|
|
_ => {
|
|
warn!("unimplemented update notification {:?}", n);
|
|
None
|
|
},
|
|
},
|
|
};
|
|
|
|
tx.commit().unwrap();
|
|
|
|
if let Some(msg) = msg {
|
|
events.send(msg).unwrap();
|
|
}
|
|
});
|
|
}
|
|
|
|
// this function gets a dedicated connection
|
|
// because it has to subscribe and listen for notifications
|
|
pub fn listen(pool: PgPool, events: Sender<Event>) -> Result<(), Error> {
|
|
let db = pool.get()?;
|
|
db.execute("LISTEN events;", &[])?;
|
|
info!("pg listening");
|
|
|
|
let notifications = db.notifications();
|
|
let mut n_iter = notifications.blocking_iter();
|
|
|
|
// main event loop, checks pg and checks messages
|
|
loop {
|
|
// check notifications
|
|
let n = n_iter.next()?;
|
|
if let Some(n) = n {
|
|
match serde_json::from_str::<Notification>(&n.payload) {
|
|
Ok(notification) => handle_notification(notification, &pool, &events),
|
|
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
|
|
};
|
|
}
|
|
}
|
|
}
|