diff --git a/server/Cargo.toml b/server/Cargo.toml index c65db8cc..9a167f4e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,10 +26,13 @@ failure = "0.1" log = "0.4" fern = "0.5" -actix = "0.8.2" -actix-web = "1.0.0" -actix-web-actors = "1.0.0" -actix-cors = "0.1.0" +iron = "0.6" +bodyparser = "0.8" +persistent = "0.4" +router = "0.6" +cookie = "0.12" +tungstenite = "0.8" +crossbeam-channel = "0.3" stripe-rust = { version = "0.10.4", features = ["webhooks"] } diff --git a/server/src/actixnet.rs b/server/src/actixnet.rs new file mode 100644 index 00000000..cbab4b5b --- /dev/null +++ b/server/src/actixnet.rs @@ -0,0 +1,221 @@ +use std::env; +use std::thread::{spawn, sleep}; +use std::time::{Duration}; + +use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; +use actix_web::error::ResponseError; +use actix_web::http::{Cookie}; +use actix_web::cookie::{SameSite}; +use actix_cors::Cors; + +use r2d2::{Pool}; +use r2d2::{PooledConnection}; +use r2d2_postgres::{TlsMode, PostgresConnectionManager}; + +use warden::{warden}; +use pubsub::{pg_listen}; +use ws::{connect}; +use account; +use payments::{post_stripe_event}; + +pub type Db = PooledConnection; +pub type PgPool = Pool; + +const DB_POOL_SIZE: u32 = 20; + +#[derive(Debug,Clone,Serialize,Deserialize)] +pub struct JsonError { + pub err: String +} + +#[derive(Fail, Debug, Serialize, Deserialize)] +pub enum MnmlHttpError { + // User Facing Errors + #[fail(display="internal server error")] + ServerError, + #[fail(display="unauthorized")] + Unauthorized, + #[fail(display="bad request")] + BadRequest, + #[fail(display="account name taken or invalid")] + AccountNameTaken, + #[fail(display="password unacceptable. must be > 11 characters")] + PasswordUnacceptable, + #[fail(display="invalid code. https://discord.gg/YJJgurM")] + InvalidCode, +} + +impl ResponseError for MnmlHttpError { + fn error_response(&self) -> HttpResponse { + match *self { + MnmlHttpError::ServerError => HttpResponse::InternalServerError() + .json(JsonError { err: self.to_string() }), + + MnmlHttpError::BadRequest => HttpResponse::BadRequest() + .json(JsonError { err: self.to_string() }), + + MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() + .cookie(Cookie::build("x-auth-token", "") + // .secure(secure) + .http_only(true) + .same_site(SameSite::Strict) + .max_age(-1) // 1 week aligns with db set + .finish()) + .json(JsonError { err: self.to_string() }), + + MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest() + .json(JsonError { err: self.to_string() }), + + MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest() + .json(JsonError { err: self.to_string() }), + + MnmlHttpError::InvalidCode => HttpResponse::BadRequest() + .json(JsonError { err: self.to_string() }), + } + } +} + +fn login_res(token: String) -> HttpResponse { + HttpResponse::Ok() + .cookie(Cookie::build("x-auth-token", token) + // .secure(secure) + .http_only(true) + .same_site(SameSite::Strict) + .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set + .finish()) + .finish() +} + +fn logout_res() -> HttpResponse { + HttpResponse::Ok() + .cookie(Cookie::build("x-auth-token", "") + // .secure(secure) + .http_only(true) + .same_site(SameSite::Strict) + .max_age(-1) + .finish()) + .finish() +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct AccountLoginParams { + name: String, + password: String, +} + +fn login(state: web::Data, params: web::Json::) -> Result { + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; + + match account::login(&mut tx, ¶ms.name, ¶ms.password) { + Ok(a) => { + let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?; + tx.commit().or(Err(MnmlHttpError::ServerError))?; + Ok(login_res(token)) + }, + Err(e) => { + info!("{:?}", e); + Err(MnmlHttpError::Unauthorized) + } + } +} + +fn logout(r: HttpRequest, state: web::Data) -> Result { + match r.cookie("x-auth-token") { + Some(t) => { + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; + match account::from_token(&mut tx, t.value().to_string()) { + Ok(a) => { + account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?; + tx.commit().or(Err(MnmlHttpError::ServerError))?; + return Ok(logout_res()); + }, + Err(_) => Err(MnmlHttpError::Unauthorized), + } + }, + None => Err(MnmlHttpError::Unauthorized), + } +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct AccountRegisterParams { + name: String, + password: String, + code: String, +} + +fn register(state: web::Data, params: web::Json::) -> Result { + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; + + match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { + Ok(token) => { + tx.commit().or(Err(MnmlHttpError::ServerError))?; + Ok(login_res(token)) + }, + Err(e) => { + info!("{:?}", e); + Err(MnmlHttpError::BadRequest) + } + } +} + +fn create_pool(url: String) -> Pool { + let manager = PostgresConnectionManager::new(url, TlsMode::None) + .expect("could not instantiate pg manager"); + + Pool::builder() + .max_size(DB_POOL_SIZE) + .build(manager) + .expect("Failed to create pool.") +} + +pub struct State { + pub pool: PgPool, + // pub pubsub: PubSub, +} + +pub fn start() { + let database_url = env::var("DATABASE_URL") + .expect("DATABASE_URL must be set"); + let pool = create_pool(database_url); + + let warden_pool = pool.clone(); + spawn(move || { + loop { + let db_connection = warden_pool.get().expect("unable to get db connection"); + if let Err(e) = warden(db_connection) { + info!("{:?}", e); + } + sleep(Duration::new(1, 0)); + } + }); + + let pubsub_pool = pool.clone(); + spawn(move || loop { + let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); + match pg_listen(pubsub_conn) { + Ok(_) => warn!("pg listen closed"), + Err(e) => warn!("pg_listen error {:?}", e), + } + }); + + HttpServer::new(move || App::new() + .data(State { pool: pool.clone() }) + .wrap(middleware::Logger::default()) + .wrap(Cors::new().supports_credentials()) + .service(web::resource("/api/login").route(web::post().to(login))) + .service(web::resource("/api/logout").route(web::post().to(logout))) + .service(web::resource("/api/register").route(web::post().to(register))) + + .service(web::resource("/api/payments/stripe") + .route(web::post().to(post_stripe_event))) + + // .service(web::resource("/api/payments/crypto") + // .route(web::post().to(post_stripe_payment))) + + .service(web::resource("/api/ws").route(web::get().to(connect)))) + .bind("127.0.0.1:40000").expect("could not bind to port") + .run().expect("could not start http server"); +} diff --git a/server/src/error.rs b/server/src/error.rs new file mode 100644 index 00000000..e09bac41 --- /dev/null +++ b/server/src/error.rs @@ -0,0 +1,107 @@ +// This example illustrates the error flow of a Request in the middleware Chain. +// Here is the chain used and the path of the request through the middleware pieces: +// +// Normal Flow : __[ErrorProducer::before]__ [ErrorRecover::before] __[handle::HelloWorldHandler]__[ErrorProducer::after]__ [ErrorRecover::after] __ ... +// Error Flow : [ErrorProducer::catch ] |__[ErrorRecover::catch ]__| [ErrorProducer::catch] |__[ErrorRecover::catch]__| +// +// --------------- BEFORE MIDDLEWARE ----------------- || --------- HANDLER -------- || ---------------- AFTER MIDDLEWARE -------------- + +extern crate iron; + +use iron::prelude::*; +use iron::StatusCode; +use iron::{AfterMiddleware, BeforeMiddleware, Handler}; + +use std::error::Error; +use std::fmt::{self, Debug}; + +struct HelloWorldHandler; +struct ErrorProducer; +struct ErrorRecover; + +#[derive(Debug)] +struct StringError(String); + +impl fmt::Display for StringError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self, f) + } +} + +impl Error for StringError { + fn description(&self) -> &str { + &*self.0 + } +} + +impl Handler for HelloWorldHandler { + fn handle(&self, _: &mut Request) -> IronResult { + // This will be called since we are in the normal flow before reaching the Handler. + // However, the AfterMiddleware chain will override the Response. + println!("The HelloWorldHandler has been called !"); + Ok(Response::with((StatusCode::OK, "Hello world !"))) + } +} + +impl BeforeMiddleware for ErrorProducer { + fn before(&self, _: &mut Request) -> IronResult<()> { + // The error produced here switches to the error flow. + // The catch method of following middleware pieces will be called. + // The Handler will be skipped unless the error is handled by another middleware piece. + // IronError::error tells the next middleware what went wrong. + // IronError::response is the Response that will be sent back to the client if this error is not handled. + // Here status::BadRequest acts as modifier, thus we can put more there than just a status. + Err(IronError::new( + StringError("Error in ErrorProducer BeforeMiddleware".to_string()), + StatusCode::BAD_REQUEST, + )) + } +} + +impl AfterMiddleware for ErrorProducer { + fn after(&self, _: &mut Request, _: Response) -> IronResult { + // The behavior here is the same as in ErrorProducer::before. + // The previous response (from the Handler) is discarded and replaced with a new response (created from the modifier). + Err(IronError::new( + StringError("Error in ErrorProducer AfterMiddleware".to_string()), + (StatusCode::BAD_REQUEST, "Response created in ErrorProducer"), + )) + } +} + +impl BeforeMiddleware for ErrorRecover { + fn catch(&self, _: &mut Request, err: IronError) -> IronResult<()> { + // We can use the IronError from previous middleware to decide what to do. + // Returning Ok() from a catch method resumes the normal flow and + // passes the Request forward to the next middleware piece in the chain (here the HelloWorldHandler). + println!("{} caught in ErrorRecover BeforeMiddleware.", err.error); + match err.response.status { + Some(StatusCode::BAD_REQUEST) => Ok(()), + _ => Err(err), + } + } +} + +impl AfterMiddleware for ErrorRecover { + fn catch(&self, _: &mut Request, err: IronError) -> IronResult { + // Just like in the BeforeMiddleware, we can return Ok(Response) here to return to the normal flow. + // In this case, ErrorRecover is the last middleware in the chain + // and the Response created in the ErrorProducer is modified and sent back to the client. + println!("{} caught in ErrorRecover AfterMiddleware.", err.error); + match err.response.status { + Some(StatusCode::BAD_REQUEST) => Ok(err.response.set(StatusCode::OK)), + _ => Err(err), + } + } +} + +fn main() { + let mut chain = Chain::new(HelloWorldHandler); + chain.link_before(ErrorProducer); + chain.link_before(ErrorRecover); + + chain.link_after(ErrorProducer); + chain.link_after(ErrorRecover); + + Iron::new(chain).http("localhost:3000"); +} diff --git a/server/src/main.rs b/server/src/main.rs index dfa19ada..9f7f3460 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -9,11 +9,6 @@ extern crate r2d2; extern crate r2d2_postgres; extern crate fallible_iterator; -extern crate actix; -extern crate actix_cors; -extern crate actix_web; -extern crate actix_web_actors; - extern crate serde; extern crate serde_cbor; #[macro_use] extern crate serde_derive; @@ -24,6 +19,13 @@ extern crate fern; extern crate stripe; +extern crate iron; +extern crate bodyparser; +extern crate persistent; +extern crate router; +extern crate cookie; +extern crate tungstenite; + mod account; mod construct; mod effect; @@ -35,7 +37,8 @@ mod mob; mod mtx; mod names; mod net; -mod payments; +// mod payments; +mod pg; mod player; mod pubsub; mod rpc; @@ -46,8 +49,8 @@ mod vbox; mod warden; mod ws; +use std::thread::spawn; use dotenv::dotenv; -use net::{start}; fn setup_logger() -> Result<(), fern::InitError> { fern::Dispatch::new() @@ -73,6 +76,12 @@ fn main() { dotenv().ok(); setup_logger().unwrap(); + let pool = pg::create_pool(); + + let ws_pool = pool.clone(); + let http_pool = pool.clone(); + + spawn(move || net::start(http_pool)); + ws::start(ws_pool); info!("server started"); - start() } diff --git a/server/src/net.rs b/server/src/net.rs index cbab4b5b..ef4e78da 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -1,38 +1,33 @@ -use std::env; -use std::thread::{spawn, sleep}; -use std::time::{Duration}; -use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; -use actix_web::error::ResponseError; -use actix_web::http::{Cookie}; -use actix_web::cookie::{SameSite}; -use actix_cors::Cors; +use iron::status; +use iron::headers::{SetCookie}; +use iron::prelude::*; +use persistent::Read; +use iron::typemap::Key; +use router::Router; +use iron::{AfterMiddleware}; +use cookie::{Cookie, SameSite}; -use r2d2::{Pool}; -use r2d2::{PooledConnection}; -use r2d2_postgres::{TlsMode, PostgresConnectionManager}; +use chrono::Duration; -use warden::{warden}; -use pubsub::{pg_listen}; -use ws::{connect}; +use failure::Fail; + +// use warden::{warden}; +// use pubsub::{pg_listen}; +// use ws::{connect}; use account; -use payments::{post_stripe_event}; +use pg::PgPool; +// use payments::{post_stripe_event}; -pub type Db = PooledConnection; -pub type PgPool = Pool; - -const DB_POOL_SIZE: u32 = 20; - -#[derive(Debug,Clone,Serialize,Deserialize)] -pub struct JsonError { - pub err: String -} +pub const TOKEN_HEADER: &str = "x-auth-token"; #[derive(Fail, Debug, Serialize, Deserialize)] pub enum MnmlHttpError { // User Facing Errors #[fail(display="internal server error")] ServerError, + #[fail(display="internal server error")] + DbError, #[fail(display="unauthorized")] Unauthorized, #[fail(display="bad request")] @@ -45,67 +40,121 @@ pub enum MnmlHttpError { InvalidCode, } -impl ResponseError for MnmlHttpError { - fn error_response(&self) -> HttpResponse { - match *self { - MnmlHttpError::ServerError => HttpResponse::InternalServerError() - .json(JsonError { err: self.to_string() }), +impl From for IronError { + fn from(m_err: MnmlHttpError) -> Self { + let (err, res) = match m_err { + MnmlHttpError::ServerError => (m_err.compat(), status::InternalServerError), + MnmlHttpError::DbError => (m_err.compat(), status::InternalServerError), + MnmlHttpError::Unauthorized => (m_err.compat(), status::Unauthorized), + MnmlHttpError::BadRequest => (m_err.compat(), status::BadRequest), + MnmlHttpError::AccountNameTaken => (m_err.compat(), status::BadRequest), + MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest), + MnmlHttpError::InvalidCode => (m_err.compat(), status::Unauthorized), + }; - MnmlHttpError::BadRequest => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), + IronError::new(err, res) + } +} - MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) +struct ErrorHandler; +impl AfterMiddleware for ErrorHandler { + fn catch(&self, _: &mut Request, mut err: IronError) -> IronResult { + + // on unauthorized we clear the auth token + match err.response.status { + Some(status::Unauthorized) => { + let v = Cookie::build(TOKEN_HEADER, "") .http_only(true) .same_site(SameSite::Strict) - .max_age(-1) // 1 week aligns with db set - .finish()) - .json(JsonError { err: self.to_string() }), + .max_age(Duration::seconds(-1)) // 1 week aligns with db set + .finish(); + err.response.headers.set(SetCookie(vec![v.to_string()])); + }, + _ => (), + }; - MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), + warn!("{:?}", err); - MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), + return Err(err); + } +} - MnmlHttpError::InvalidCode => HttpResponse::BadRequest() - .json(JsonError { err: self.to_string() }), +// fn logout(r: HttpRequest, state: web::Data) -> Result { +// match r.cookie("x-auth-token") { +// Some(t) => { +// let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; +// let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; +// match account::from_token(&mut tx, t.value().to_string()) { +// Ok(a) => { +// account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?; +// tx.commit().or(Err(MnmlHttpError::ServerError))?; +// return Ok(logout_res()); +// }, +// Err(_) => Err(MnmlHttpError::Unauthorized), +// } +// }, +// None => Err(MnmlHttpError::Unauthorized), +// } +// } + + +#[derive(Debug,Clone,Deserialize)] +struct RegisterBody { + name: String, + password: String, + code: String, +} + +fn register(req: &mut Request) -> IronResult { + let state = req.get::>().unwrap(); + let params = match req.get::>() { + Ok(Some(b)) => b, + _ => return Err(IronError::from(MnmlHttpError::BadRequest)), + }; + + let db = state.pool.get().or(Err(MnmlHttpError::DbError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::DbError))?; + + match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { + Ok(token) => { + tx.commit().or(Err(MnmlHttpError::ServerError))?; + Ok(login_res(token)) + }, + Err(e) => { + warn!("{:?}", e); + Err(IronError::from(MnmlHttpError::BadRequest)) } } } -fn login_res(token: String) -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", token) - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set - .finish()) - .finish() -} - -fn logout_res() -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) - .finish()) - .finish() -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountLoginParams { +#[derive(Debug,Clone,Deserialize)] +struct LoginBody { name: String, password: String, } -fn login(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; +fn login_res(token: String) -> Response { + let v = Cookie::build(TOKEN_HEADER, token) + .http_only(true) + .same_site(SameSite::Strict) + .max_age(Duration::seconds(-1)) // 1 week aligns with db set + .finish(); + + let mut res = Response::with(status::Ok); + res.headers.set(SetCookie(vec![v.to_string()])); + + return res; +} + +fn login(req: &mut Request) -> IronResult { + let state = req.get::>().unwrap(); + let params = match req.get::>() { + Ok(Some(b)) => b, + _ => return Err(IronError::from(MnmlHttpError::BadRequest)), + }; + + let db = state.pool.get().or(Err(MnmlHttpError::DbError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::DbError))?; match account::login(&mut tx, ¶ms.name, ¶ms.password) { Ok(a) => { @@ -114,108 +163,28 @@ fn login(state: web::Data, params: web::Json::) -> Re Ok(login_res(token)) }, Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::Unauthorized) + warn!("{:?}", e); + Err(IronError::from(MnmlHttpError::Unauthorized)) } } } -fn logout(r: HttpRequest, state: web::Data) -> Result { - match r.cookie("x-auth-token") { - Some(t) => { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - match account::from_token(&mut tx, t.value().to_string()) { - Ok(a) => { - account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?; - tx.commit().or(Err(MnmlHttpError::ServerError))?; - return Ok(logout_res()); - }, - Err(_) => Err(MnmlHttpError::Unauthorized), - } - }, - None => Err(MnmlHttpError::Unauthorized), - } -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct AccountRegisterParams { - name: String, - password: String, - code: String, -} - -fn register(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - - match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { - Ok(token) => { - tx.commit().or(Err(MnmlHttpError::ServerError))?; - Ok(login_res(token)) - }, - Err(e) => { - info!("{:?}", e); - Err(MnmlHttpError::BadRequest) - } - } -} - -fn create_pool(url: String) -> Pool { - let manager = PostgresConnectionManager::new(url, TlsMode::None) - .expect("could not instantiate pg manager"); - - Pool::builder() - .max_size(DB_POOL_SIZE) - .build(manager) - .expect("Failed to create pool.") -} +const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10; pub struct State { pub pool: PgPool, // pub pubsub: PubSub, } -pub fn start() { - let database_url = env::var("DATABASE_URL") - .expect("DATABASE_URL must be set"); - let pool = create_pool(database_url); +impl Key for State { type Value = State; } - let warden_pool = pool.clone(); - spawn(move || { - loop { - let db_connection = warden_pool.get().expect("unable to get db connection"); - if let Err(e) = warden(db_connection) { - info!("{:?}", e); - } - sleep(Duration::new(1, 0)); - } - }); +pub fn start(pool: PgPool) { + let mut router = Router::new(); + router.post("/api/login", login, "login"); - let pubsub_pool = pool.clone(); - spawn(move || loop { - let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn) { - Ok(_) => warn!("pg listen closed"), - Err(e) => warn!("pg_listen error {:?}", e), - } - }); - - HttpServer::new(move || App::new() - .data(State { pool: pool.clone() }) - .wrap(middleware::Logger::default()) - .wrap(Cors::new().supports_credentials()) - .service(web::resource("/api/login").route(web::post().to(login))) - .service(web::resource("/api/logout").route(web::post().to(logout))) - .service(web::resource("/api/register").route(web::post().to(register))) - - .service(web::resource("/api/payments/stripe") - .route(web::post().to(post_stripe_event))) - - // .service(web::resource("/api/payments/crypto") - // .route(web::post().to(post_stripe_payment))) - - .service(web::resource("/api/ws").route(web::get().to(connect)))) - .bind("127.0.0.1:40000").expect("could not bind to port") - .run().expect("could not start http server"); + let mut chain = Chain::new(router); + chain.link(Read::::both(State { pool })); + chain.link_before(Read::::one(MAX_BODY_LENGTH)); + chain.link_after(ErrorHandler); + Iron::new(chain).http("127.0.0.1:40000").unwrap(); } diff --git a/server/src/payments.rs b/server/src/payments.rs index fe8066e1..78ed1f7f 100644 --- a/server/src/payments.rs +++ b/server/src/payments.rs @@ -1,5 +1,4 @@ use uuid::Uuid; -use actix_web::{web, HttpResponse}; use postgres::transaction::Transaction; use failure::Error; diff --git a/server/src/pg.rs b/server/src/pg.rs index fc416eb9..0b0fe2db 100644 --- a/server/src/pg.rs +++ b/server/src/pg.rs @@ -1,12 +1,26 @@ +use std::env; + use r2d2::{Pool}; use r2d2::{PooledConnection}; -use r2d2_postgres::{PostgresConnectionManager}; +use r2d2_postgres::{TlsMode, PostgresConnectionManager}; + +// use postgres::transaction::Transaction; pub type Db = PooledConnection; pub type PgPool = Pool; -use postgres::transaction::Transaction; +const DB_POOL_SIZE: u32 = 20; -pub trait Pg { - fn persist(self, &mut Transaction) -> Self; + +pub fn create_pool() -> Pool { + let url = env::var("DATABASE_URL") + .expect("DATABASE_URL must be set"); + + let manager = PostgresConnectionManager::new(url, TlsMode::None) + .expect("could not instantiate pg manager"); + + Pool::builder() + .max_size(DB_POOL_SIZE) + .build(manager) + .expect("Failed to create pool.") } diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index afe6fe86..5dd31d16 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -2,7 +2,7 @@ use fallible_iterator::{FallibleIterator}; use postgres::error::Error; -use net::{Db}; +use pg::{Db}; pub fn pg_listen(connection: Db) -> Result<(), Error> { connection.execute("LISTEN events;", &[])?; diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 2803109c..e7fee12f 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -1,14 +1,14 @@ +use std::net::TcpStream; use std::time::{Instant}; -use actix_web_actors::ws; +use tungstenite::protocol::WebSocket; use serde_cbor::{from_slice}; use uuid::Uuid; use failure::Error; use failure::err_msg; -use net::{Db}; -use ws::{MnmlSocket}; +use pg::{Db}; use construct::{Construct}; use game::{Game, game_state, game_skill, game_ready}; use account::{Account, account_constructs, account_instances}; @@ -19,8 +19,6 @@ use item::{Item, ItemInfoCtr, item_info}; use mtx; -type MnmlWs = ws::WebsocketContext; - #[derive(Debug,Clone,Serialize,Deserialize)] pub enum RpcResult { AccountState(Account), @@ -68,7 +66,7 @@ enum RpcRequest { VboxReclaim { instance_id: Uuid, index: usize }, } -pub fn receive(data: Vec, db: &Db, _client: &mut MnmlWs, begin: Instant, account: Option<&Account>) -> Result { +pub fn receive(data: Vec, db: &Db, _client: &mut WebSocket, begin: Instant, account: Option<&Account>) -> Result { // cast the msg to this type to receive method name match from_slice::(&data) { Ok(v) => { @@ -157,7 +155,6 @@ pub fn receive(data: Vec, db: &Db, _client: &mut MnmlWs, begin: Instant, acc return response; }, Err(e) => { - info!("{:?}", e); Err(format_err!("invalid message data={:?}", data)) }, } diff --git a/server/src/warden.rs b/server/src/warden.rs index eb07ebe1..658ce1e6 100644 --- a/server/src/warden.rs +++ b/server/src/warden.rs @@ -4,7 +4,7 @@ use failure::Error; use game::{games_need_upkeep, game_update, game_write, game_delete}; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; -use net::{Db}; +use pg::{Db}; fn fetch_games(mut tx: Transaction) -> Result { let games = games_need_upkeep(&mut tx)?; diff --git a/server/src/ws.rs b/server/src/ws.rs index c3cc58c6..2b01c209 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -1,133 +1,75 @@ -use std::time::{Instant, Duration}; +use std::time::{Instant}; +use std::net::{TcpListener}; +use std::thread::{spawn}; -use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; -use actix_web_actors::ws; -use actix::prelude::*; +use tungstenite::server::accept_hdr; +use tungstenite::Message::Binary; +use tungstenite::handshake::server::Request; + +use failure::Error; +use failure::err_msg; -use account::{Account}; use serde_cbor::{to_vec}; -use net::{PgPool, State, MnmlHttpError, JsonError}; -use rpc::{receive, RpcResult}; -use account; +use net::TOKEN_HEADER; +use rpc::{receive}; +use pg::PgPool; -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); -const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); - -pub struct MnmlSocket { - hb: Instant, - pool: PgPool, - account: Option, +#[derive(Debug,Clone,Serialize)] +struct RpcError { + err: String, } -impl Actor for MnmlSocket { - type Context = ws::WebsocketContext; +pub fn start(pool: PgPool) { + let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); + for stream in ws_server.incoming() { + let ws_pool = pool.clone(); + spawn(move || { + let cb = |req: &Request| { + let token = req.headers.find_first(TOKEN_HEADER); + println!("{:?}", token); + Ok(None) + }; + let mut websocket = accept_hdr(stream.unwrap(), cb).unwrap(); + loop { + match websocket.read_message() { + Ok(msg) => { + let begin = Instant::now(); + let db_connection = ws_pool.get() + .expect("unable to get db connection"); - // once the actor has been started this fn runs - // it starts the heartbeat interval and keepalive - fn started(&mut self, ctx: &mut Self::Context) { - self.hb(ctx); - } -} + let data = msg.into_data(); + match receive(data, &db_connection, &mut websocket, begin, None) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); -/// Handler for `ws::Message` -impl StreamHandler for MnmlSocket { - fn started(&mut self, ctx: &mut Self::Context) { - match self.account.as_ref() { - Some(a) => { - info!("user connected {:?}", a); - let account_state = to_vec(&RpcResult::AccountState(a.clone())) - .expect("could not serialize account state"); - ctx.binary(account_state) - }, - None => info!("new connection"), - } - } + if let Err(e) = websocket.write_message(Binary(response)) { + // connection closed + debug!("{:?}", e); + return; + }; + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); - fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) { - // process websocket messages - let begin = Instant::now(); - debug!("msg: {:?}", msg); - match msg { - ws::Message::Ping(msg) => { - self.hb = Instant::now(); - ctx.pong(&msg); - } - ws::Message::Pong(_) => { - self.hb = Instant::now(); - } - ws::Message::Text(_text) => (), - ws::Message::Close(_) => { - match self.account.as_ref() { - Some(a) => info!("disconnected {:?}", a), - None => info!("disconnected"), - } - ctx.stop(); - } - ws::Message::Nop => (), - ws::Message::Binary(bin) => { - let db_connection = self.pool.get().expect("unable to get db connection"); - match receive(bin.to_vec(), &db_connection, ctx, begin, self.account.as_ref()) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - ctx.binary(response); + if let Err(e) = websocket.write_message(Binary(response)) { + // connection closed + debug!("{:?}", e); + return; + }; + } + } }, + // connection is closed Err(e) => { - let response = to_vec(&JsonError { err: e.to_string() }) - .expect("failed to serialize error response"); - ctx.binary(response); + debug!("{:?}", e); + return; } - } + }; } - } - } -} - -impl MnmlSocket { - fn new(state: web::Data, account: Option) -> MnmlSocket { - // idk why this has to be cloned again - // i guess because each socket is added as a new thread? - MnmlSocket { hb: Instant::now(), pool: state.pool.clone(), account } - } - - // starts the keepalive interval once actor started - fn hb(&self, ctx: &mut ::Context) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - info!("idle connection terminated"); - - // stop actor - ctx.stop(); - - // don't try to send a ping - return; - } - - ctx.ping(""); }); } } - -// idk how this stuff works -// but the args extract what you need from the incoming requests -// this grabs -// the req obj itself which we need for cookies -// the application state -// and the websocket stream -pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> Result { - let account: Option = match r.cookie("x-auth-token") { - Some(t) => { - let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; - match account::from_token(&mut tx, t.value().to_string()) { - Ok(a) => Some(a), - Err(_) => None, - } - }, - None => None, - }; - - // state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlHttpError::ServerError))?; - ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlHttpError::ServerError)) -}