mnml/server/src/net.rs
2019-06-01 14:59:01 +10:00

129 lines
3.9 KiB
Rust

use failure::{Error, err_msg};
use tungstenite::Message;
use tungstenite::protocol::WebSocket;
use tungstenite::server::accept;
use tungstenite::Message::Binary;
use std::net::{TcpListener, TcpStream};
use serde_cbor::{to_vec};
use std::env;
use std::thread::{spawn, sleep};
use std::time::{Instant, Duration};
use r2d2::{Pool};
use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
/// Upper bound on pooled Postgres connections; passed to the pool builder's `max_size`.
static DB_POOL_SIZE: u32 = 20;
/// A single connection checked out of the r2d2 Postgres pool.
pub type Db = PooledConnection<PostgresConnectionManager>;
use rpc::{Rpc};
use warden::{warden};
/// Payload written back to a client when handling its RPC message fails.
///
/// `err` carries the string form of the error that was raised.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct RpcErrorResponse {
    err: String,
}
/// Dispatches one websocket message to `rpc` and writes the CBOR-encoded outcome to `client`.
///
/// On success the serialized reply is sent and the reply's `method` is returned.
/// On failure an `RpcErrorResponse` holding the error text is sent instead, and the
/// error is handed back to the caller wrapped via `err_msg`.
///
/// # Errors
///
/// Returns an error if `rpc.receive` fails or if writing to the websocket fails.
/// A write failure on the error path takes precedence over the RPC error.
///
/// # Panics
///
/// Panics if the reply (or the error response) cannot be serialized.
fn receive(db: Db, begin: Instant, rpc: &Rpc, msg: Message, client: &mut WebSocket<TcpStream>) -> Result<String, Error> {
    let reply = match rpc.receive(msg, begin, &db, client) {
        Ok(reply) => reply,
        Err(e) => {
            // Tell the client what went wrong before surfacing the error to the caller.
            let payload = to_vec(&RpcErrorResponse { err: e.to_string() })
                .expect("failed to serialize error response");
            client.write_message(Binary(payload))?;
            return Err(err_msg(e));
        }
    };

    let payload = to_vec(&reply).expect("failed to serialize response");
    client.write_message(Binary(payload))?;
    Ok(reply.method)
}
/// Builds the Postgres connection pool for the database at `url`.
///
/// The connection manager is created with `TlsMode::None` and the pool is capped at
/// `DB_POOL_SIZE` connections.
///
/// # Panics
///
/// Panics if the connection manager cannot be created or the pool cannot be built.
pub fn db_connection(url: String) -> Pool<PostgresConnectionManager> {
    let pg_manager = PostgresConnectionManager::new(url, TlsMode::None)
        .expect("could not instantiate pg manager");

    let builder = Pool::builder().max_size(DB_POOL_SIZE);
    let pool = builder.build(pg_manager);
    pool.expect("Failed to create pool.")
}
// fn print_panic_payload(ctx: &str, payload: &(Any + Send + 'static)) {
// let d = format!("{:?}", payload);
// let s = if let Some(s) = payload.downcast_ref::<String>() {
// &s
// } else if let Some(s) = payload.downcast_ref::<&str>() {
// s
// } else {
// // "PAYLOAD IS NOT A STRING"
// d.as_str()
// };
// info!("{}: PANIC OCCURRED: {}", ctx, s);
// }
pub fn start() {
// panic::set_hook(Box::new(|panic_info| {
// print_panic_payload("set_hook", panic_info.payload());
// if let Some(location) = panic_info.location() {
// info!("LOCATION: {}:{}", location.file(), location.line());
// } else {
// info!("NO LOCATION INFORMATION");
// }
// }));
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let pool = db_connection(database_url);
// {
// let startup_connection = pool.get().expect("unable to get db connection");
// startup(startup_connection).unwrap();
// }
let server = TcpListener::bind("0.0.0.0:40000").unwrap();
let warden_pool = pool.clone();
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
if let Err(e) = warden(db_connection) {
info!("{:?}", e);
}
sleep(Duration::new(1, 0));
}
});
for stream in server.incoming() {
let db = pool.clone();
spawn(move || {
let mut websocket = accept(stream.unwrap()).unwrap();
let rpc = Rpc {};
loop {
match websocket.read_message() {
Ok(msg) => {
let begin = Instant::now();
let db_connection = db.get().expect("unable to get db connection");
match receive(db_connection, begin, &rpc, msg, &mut websocket) {
Ok(_) => (),
Err(e) => warn!("{:?}", e),
}
},
// connection is closed
Err(e) => {
info!("{:?}", e);
return;
}
};
}
});
}
}