use tungstenite::Message; use tungstenite::protocol::WebSocket; use tungstenite::server::accept; use tungstenite::error::Error; use tungstenite::Message::Binary; use std::net::{TcpListener, TcpStream}; use serde_cbor::{to_vec}; use std::env; use std::thread::{spawn, sleep}; use std::time::Duration; use std::any::Any; use std::panic; use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; static DB_POOL_SIZE: u32 = 20; pub type Db = PooledConnection; use rpc::{Rpc}; use util::{startup}; use warden::{warden}; // struct Server { // client: WebSocket, // rpc: Rpc, // db: Pool, // } #[derive(Debug,Clone,Serialize,Deserialize)] struct RpcErrorResponse { err: String } fn receive(db: Db, rpc: &Rpc, msg: Message, client: &mut WebSocket) -> Result<(), Error> { match rpc.receive(msg, &db, client) { Ok(reply) => { let response = to_vec(&reply) .expect("failed to serialize response"); client.write_message(Binary(response)) }, Err(e) => { println!("{:?}", e); let response = to_vec(&RpcErrorResponse { err: e.to_string() }) .expect("failed to serialize error response"); client.write_message(Binary(response)) } } } pub fn db_connection(url: String) -> Pool { let manager = PostgresConnectionManager::new(url, TlsMode::None) .expect("could not instantiate pg manager"); Pool::builder() .max_size(DB_POOL_SIZE) .build(manager) .expect("Failed to create pool.") } fn print_panic_payload(ctx: &str, payload: &(Any + Send + 'static)) { let d = format!("{:?}", payload); let s = if let Some(s) = payload.downcast_ref::() { &s } else if let Some(s) = payload.downcast_ref::<&str>() { s } else { // "PAYLOAD IS NOT A STRING" d.as_str() }; println!("{}: PANIC OCCURRED: {}", ctx, s); } pub fn start() { panic::set_hook(Box::new(|panic_info| { print_panic_payload("set_hook", panic_info.payload()); if let Some(location) = panic_info.location() { println!("LOCATION: {}:{}", location.file(), location.line()); } else { println!("NO LOCATION INFORMATION"); } })); let database_url = env::var("DATABASE_URL") .expect("DATABASE_URL must be set"); let pool = db_connection(database_url); { let startup_connection = pool.get().expect("unable to get db connection"); startup(startup_connection).unwrap(); } let server = TcpListener::bind("0.0.0.0:40000").unwrap(); let warden_pool = pool.clone(); spawn(move || { loop { let db_connection = warden_pool.get().expect("unable to get db connection"); if let Err(e) = warden(db_connection) { println!("{:?}", e); } sleep(Duration::new(5, 0)); } }); for stream in server.incoming() { let db = pool.clone(); spawn(move || { let mut websocket = accept(stream.unwrap()).unwrap(); let rpc = Rpc {}; loop { match websocket.read_message() { Ok(msg) => { let db_connection = db.get().expect("unable to get db connection"); match receive(db_connection, &rpc, msg, &mut websocket) { Ok(_r) => (), Err(e) => println!("{:?}", e), } }, // connection is closed Err(e) => { println!("{:?}", e); return; } }; } }); } }