use failure::{Error, err_msg}; use tungstenite::Message; use tungstenite::protocol::WebSocket; use tungstenite::server::accept; use tungstenite::Message::Binary; use std::net::{TcpListener, TcpStream}; use serde_cbor::{to_vec}; use std::env; use std::thread::{spawn, sleep}; use std::time::{Instant, Duration}; use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; static DB_POOL_SIZE: u32 = 20; pub type Db = PooledConnection; use rpc::{Rpc}; use warden::{warden}; #[derive(Debug,Clone,Serialize,Deserialize)] struct RpcErrorResponse { err: String } fn receive(db: Db, begin: Instant, rpc: &Rpc, msg: Message, client: &mut WebSocket) -> Result { match rpc.receive(msg, begin, &db, client) { Ok(reply) => { let response = to_vec(&reply) .expect("failed to serialize response"); client.write_message(Binary(response))?; return Ok(reply.method); }, Err(e) => { let response = to_vec(&RpcErrorResponse { err: e.to_string() }) .expect("failed to serialize error response"); client.write_message(Binary(response))?; return Err(err_msg(e)); } } } pub fn db_connection(url: String) -> Pool { let manager = PostgresConnectionManager::new(url, TlsMode::None) .expect("could not instantiate pg manager"); Pool::builder() .max_size(DB_POOL_SIZE) .build(manager) .expect("Failed to create pool.") } // fn print_panic_payload(ctx: &str, payload: &(Any + Send + 'static)) { // let d = format!("{:?}", payload); // let s = if let Some(s) = payload.downcast_ref::() { // &s // } else if let Some(s) = payload.downcast_ref::<&str>() { // s // } else { // // "PAYLOAD IS NOT A STRING" // d.as_str() // }; // info!("{}: PANIC OCCURRED: {}", ctx, s); // } pub fn start() { // panic::set_hook(Box::new(|panic_info| { // print_panic_payload("set_hook", panic_info.payload()); // if let Some(location) = panic_info.location() { // info!("LOCATION: {}:{}", location.file(), location.line()); // } else { // info!("NO LOCATION INFORMATION"); // } // })); let database_url = env::var("DATABASE_URL") .expect("DATABASE_URL must be set"); let pool = db_connection(database_url); // { // let startup_connection = pool.get().expect("unable to get db connection"); // startup(startup_connection).unwrap(); // } let server = TcpListener::bind("0.0.0.0:40000").unwrap(); let warden_pool = pool.clone(); spawn(move || { loop { let db_connection = warden_pool.get().expect("unable to get db connection"); if let Err(e) = warden(db_connection) { info!("{:?}", e); } sleep(Duration::new(1, 0)); } }); for stream in server.incoming() { let db = pool.clone(); spawn(move || { let mut websocket = accept(stream.unwrap()).unwrap(); let rpc = Rpc {}; loop { match websocket.read_message() { Ok(msg) => { let begin = Instant::now(); let db_connection = db.get().expect("unable to get db connection"); match receive(db_connection, begin, &rpc, msg, &mut websocket) { Ok(_) => (), Err(e) => warn!("{:?}", e), } }, // connection is closed Err(e) => { debug!("{:?}", e); return; } }; } }); } }