use std::env; use std::thread::spawn; use uuid::Uuid; use failure::Error; use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use fallible_iterator::{FallibleIterator}; use crossbeam_channel::{Sender}; use events::{Event}; use account; use game; use instance; use rpc::RpcMessage; pub type Db = PooledConnection; pub type PgPool = Pool; const DB_POOL_SIZE: u32 = 20; #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[serde(rename_all(deserialize = "lowercase"))] enum Table { Accounts, Constructs, Instances, Mtx, Players, Games, } #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[serde(rename_all(deserialize = "UPPERCASE"))] enum Action { Insert, Update, Delete, } #[derive(Debug,Clone,Serialize,Deserialize)] struct Notification { table: Table, action: Action, id: Uuid, } pub fn create_pool() -> Pool { let url = env::var("DATABASE_URL") .expect("DATABASE_URL must be set"); let manager = PostgresConnectionManager::new(url, TlsMode::None) .expect("could not instantiate pg manager"); Pool::builder() .max_size(DB_POOL_SIZE) .build(manager) .expect("Failed to create pool.") } fn handle_notification(n: Notification, pool: &PgPool, events: &Sender) { info!("pg received notification={:?}", n); // bang out a thread to do the slow work of fetching the state from db // the thread will notify events let pool = pool.clone(); let events = events.clone(); spawn(move || { // maybe we need it let db = pool.get().unwrap(); let mut tx = db.transaction().unwrap(); let msg = match n.action { Action::Delete => { warn!("unimplemented delete notification {:?}", n); None }, Action::Insert => { warn!("unimplemented insert notification {:?}", n); None }, Action::Update => match n.table { Table::Accounts => Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))), Table::Instances => Some(Event::Push(n.id, instance::instance_state(&mut tx, n.id).unwrap())), Table::Games => Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))), _ => { warn!("unimplemented update notification {:?}", n); None }, }, }; tx.commit().unwrap(); if let Some(msg) = msg { events.send(msg).unwrap(); } }); } // this function gets a dedicated connection // because it has to subscribe and listen for notifications pub fn listen(pool: PgPool, events: Sender) -> Result<(), Error> { let db = pool.get()?; db.execute("LISTEN events;", &[])?; info!("pg listening"); let notifications = db.notifications(); let mut n_iter = notifications.blocking_iter(); // main event loop, checks pg and checks messages loop { // check notifications let n = n_iter.next()?; if let Some(n) = n { match serde_json::from_str::(&n.payload) { Ok(notification) => handle_notification(notification, &pool, &events), Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), }; } } }