mnml/server/src/pg.rs

131 lines
3.6 KiB
Rust

use std::env;
use std::thread::spawn;
use uuid::Uuid;
use failure::Error;
use r2d2::{Pool};
use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use fallible_iterator::{FallibleIterator};
use crossbeam_channel::{Sender};
use events::{Event};
use account;
use game;
use instance;
use rpc::RpcMessage;
/// A single postgres connection checked out of the r2d2 pool.
pub type Db = PooledConnection<PostgresConnectionManager>;
/// The r2d2 pool of postgres connections used by this module.
pub type PgPool = Pool<PostgresConnectionManager>;
/// Passed to the pool builder as `max_size` in `create_pool`.
const DB_POOL_SIZE: u32 = 20;
/// Table named in a notification payload.
///
/// When deserializing, variants are matched against their lowercase names
/// (see the `rename_all` attribute below).
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all(deserialize = "lowercase"))]
enum Table {
    Accounts,
    Constructs,
    Instances,
    Mtx,
    Players,
    Games,
}
/// Kind of row change named in a notification payload.
///
/// When deserializing, variants are matched against their uppercase names
/// (see the `rename_all` attribute below).
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all(deserialize = "UPPERCASE"))]
enum Action {
    Insert,
    Update,
    Delete,
}
/// A decoded notification: which table changed, how it changed, and the id
/// carried alongside the change.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Notification {
    /// Table the change happened on.
    table: Table,
    /// What kind of change it was.
    action: Action,
    /// Id carried by the payload; used as the lookup key and push target
    /// when the notification is handled.
    id: Uuid,
}
/// Builds the postgres connection pool from the `DATABASE_URL` environment variable.
///
/// Connections are made without TLS and the pool is capped at `DB_POOL_SIZE`.
///
/// # Panics
///
/// Panics if `DATABASE_URL` is not set, if the connection manager cannot be
/// instantiated, or if the pool cannot be built.
pub fn create_pool() -> Pool<PostgresConnectionManager> {
    let conn_manager = PostgresConnectionManager::new(
        env::var("DATABASE_URL").expect("DATABASE_URL must be set"),
        TlsMode::None,
    )
    .expect("could not instantiate pg manager");

    let builder = Pool::builder().max_size(DB_POOL_SIZE);
    builder
        .build(conn_manager)
        .expect("Failed to create pool.")
}
fn handle_notification(n: Notification, pool: &PgPool, events: &Sender<Event>) {
info!("pg received notification={:?}", n);
// bang out a thread to do the slow work of fetching the state from db
// the thread will notify events
let pool = pool.clone();
let events = events.clone();
spawn(move || {
// maybe we need it
let db = pool.get().unwrap();
let mut tx = db.transaction().unwrap();
let msg = match n.action {
Action::Delete => {
warn!("unimplemented delete notification {:?}", n);
None
},
Action::Insert => {
warn!("unimplemented insert notification {:?}", n);
None
},
Action::Update => match n.table {
Table::Accounts =>
Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))),
Table::Instances =>
Some(Event::Push(n.id, instance::instance_state(&mut tx, n.id).unwrap())),
Table::Games =>
Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))),
_ => {
warn!("unimplemented update notification {:?}", n);
None
},
},
};
tx.commit().unwrap();
if let Some(msg) = msg {
events.send(msg).unwrap();
}
});
}
/// Subscribes to the `events` channel on postgres and dispatches every
/// notification that arrives.
///
/// A connection is taken from the pool and held for the lifetime of the call,
/// because it has to stay subscribed and block waiting for notifications.
/// Payloads that cannot be deserialized are logged and skipped.
///
/// # Errors
///
/// Returns an error if the connection cannot be obtained, if the `LISTEN`
/// statement fails, or if reading the next notification fails. The loop has
/// no other exit, so this function never returns `Ok`.
pub fn listen(pool: PgPool, events: Sender<Event>) -> Result<(), Error> {
    let conn = pool.get()?;
    conn.execute("LISTEN events;", &[])?;

    info!("pg listening");

    let pending = conn.notifications();
    let mut incoming = pending.blocking_iter();

    // main event loop: block until pg hands us the next notification
    loop {
        let raw = match incoming.next()? {
            Some(raw) => raw,
            None => continue,
        };

        match serde_json::from_str::<Notification>(&raw.payload) {
            Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", raw.payload, e),
            Ok(parsed) => handle_notification(parsed, &pool, &events),
        }
    }
}