This commit is contained in:
ntr 2019-07-11 17:25:11 +10:00
parent 031c0f86f7
commit e06f97252b
11 changed files with 565 additions and 304 deletions

View File

@ -26,10 +26,13 @@ failure = "0.1"
log = "0.4" log = "0.4"
fern = "0.5" fern = "0.5"
actix = "0.8.2" iron = "0.6"
actix-web = "1.0.0" bodyparser = "0.8"
actix-web-actors = "1.0.0" persistent = "0.4"
actix-cors = "0.1.0" router = "0.6"
cookie = "0.12"
tungstenite = "0.8"
crossbeam-channel = "0.3"
stripe-rust = { version = "0.10.4", features = ["webhooks"] } stripe-rust = { version = "0.10.4", features = ["webhooks"] }

221
server/src/actixnet.rs Normal file
View File

@ -0,0 +1,221 @@
use std::env;
use std::thread::{spawn, sleep};
use std::time::{Duration};
use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer};
use actix_web::error::ResponseError;
use actix_web::http::{Cookie};
use actix_web::cookie::{SameSite};
use actix_cors::Cors;
use r2d2::{Pool};
use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use warden::{warden};
use pubsub::{pg_listen};
use ws::{connect};
use account;
use payments::{post_stripe_event};
pub type Db = PooledConnection<PostgresConnectionManager>;
pub type PgPool = Pool<PostgresConnectionManager>;
const DB_POOL_SIZE: u32 = 20;
#[derive(Debug,Clone,Serialize,Deserialize)]
pub struct JsonError {
pub err: String
}
#[derive(Fail, Debug, Serialize, Deserialize)]
pub enum MnmlHttpError {
// User Facing Errors
#[fail(display="internal server error")]
ServerError,
#[fail(display="unauthorized")]
Unauthorized,
#[fail(display="bad request")]
BadRequest,
#[fail(display="account name taken or invalid")]
AccountNameTaken,
#[fail(display="password unacceptable. must be > 11 characters")]
PasswordUnacceptable,
#[fail(display="invalid code. https://discord.gg/YJJgurM")]
InvalidCode,
}
impl ResponseError for MnmlHttpError {
fn error_response(&self) -> HttpResponse {
match *self {
MnmlHttpError::ServerError => HttpResponse::InternalServerError()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::BadRequest => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::Unauthorized => HttpResponse::Unauthorized()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(-1) // 1 week aligns with db set
.finish())
.json(JsonError { err: self.to_string() }),
MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::InvalidCode => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
}
}
}
fn login_res(token: String) -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", token)
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(60 * 60 * 24 * 7) // 1 week aligns with db set
.finish())
.finish()
}
fn logout_res() -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(-1)
.finish())
.finish()
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct AccountLoginParams {
name: String,
password: String,
}
fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Result<HttpResponse, MnmlHttpError> {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::login(&mut tx, &params.name, &params.password) {
Ok(a) => {
let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?;
tx.commit().or(Err(MnmlHttpError::ServerError))?;
Ok(login_res(token))
},
Err(e) => {
info!("{:?}", e);
Err(MnmlHttpError::Unauthorized)
}
}
}
fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlHttpError> {
match r.cookie("x-auth-token") {
Some(t) => {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::from_token(&mut tx, t.value().to_string()) {
Ok(a) => {
account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?;
tx.commit().or(Err(MnmlHttpError::ServerError))?;
return Ok(logout_res());
},
Err(_) => Err(MnmlHttpError::Unauthorized),
}
},
None => Err(MnmlHttpError::Unauthorized),
}
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct AccountRegisterParams {
name: String,
password: String,
code: String,
}
fn register(state: web::Data<State>, params: web::Json::<AccountRegisterParams>) -> Result<HttpResponse, MnmlHttpError> {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::create(&params.name, &params.password, &params.code, &mut tx) {
Ok(token) => {
tx.commit().or(Err(MnmlHttpError::ServerError))?;
Ok(login_res(token))
},
Err(e) => {
info!("{:?}", e);
Err(MnmlHttpError::BadRequest)
}
}
}
fn create_pool(url: String) -> Pool<PostgresConnectionManager> {
let manager = PostgresConnectionManager::new(url, TlsMode::None)
.expect("could not instantiate pg manager");
Pool::builder()
.max_size(DB_POOL_SIZE)
.build(manager)
.expect("Failed to create pool.")
}
pub struct State {
pub pool: PgPool,
// pub pubsub: PubSub,
}
pub fn start() {
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let pool = create_pool(database_url);
let warden_pool = pool.clone();
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
if let Err(e) = warden(db_connection) {
info!("{:?}", e);
}
sleep(Duration::new(1, 0));
}
});
let pubsub_pool = pool.clone();
spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn) {
Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e),
}
});
HttpServer::new(move || App::new()
.data(State { pool: pool.clone() })
.wrap(middleware::Logger::default())
.wrap(Cors::new().supports_credentials())
.service(web::resource("/api/login").route(web::post().to(login)))
.service(web::resource("/api/logout").route(web::post().to(logout)))
.service(web::resource("/api/register").route(web::post().to(register)))
.service(web::resource("/api/payments/stripe")
.route(web::post().to(post_stripe_event)))
// .service(web::resource("/api/payments/crypto")
// .route(web::post().to(post_stripe_payment)))
.service(web::resource("/api/ws").route(web::get().to(connect))))
.bind("127.0.0.1:40000").expect("could not bind to port")
.run().expect("could not start http server");
}

107
server/src/error.rs Normal file
View File

@ -0,0 +1,107 @@
// This example illustrates the error flow of a Request in the middleware Chain.
// Here is the chain used and the path of the request through the middleware pieces:
//
// Normal Flow : __[ErrorProducer::before]__ [ErrorRecover::before] __[handle::HelloWorldHandler]__[ErrorProducer::after]__ [ErrorRecover::after] __ ...
// Error Flow : [ErrorProducer::catch ] |__[ErrorRecover::catch ]__| [ErrorProducer::catch] |__[ErrorRecover::catch]__|
//
// --------------- BEFORE MIDDLEWARE ----------------- || --------- HANDLER -------- || ---------------- AFTER MIDDLEWARE --------------
extern crate iron;
use iron::prelude::*;
use iron::StatusCode;
use iron::{AfterMiddleware, BeforeMiddleware, Handler};
use std::error::Error;
use std::fmt::{self, Debug};
struct HelloWorldHandler;
struct ErrorProducer;
struct ErrorRecover;
#[derive(Debug)]
struct StringError(String);
impl fmt::Display for StringError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Debug::fmt(self, f)
}
}
impl Error for StringError {
fn description(&self) -> &str {
&*self.0
}
}
impl Handler for HelloWorldHandler {
fn handle(&self, _: &mut Request) -> IronResult<Response> {
// This will be called since we are in the normal flow before reaching the Handler.
// However, the AfterMiddleware chain will override the Response.
println!("The HelloWorldHandler has been called !");
Ok(Response::with((StatusCode::OK, "Hello world !")))
}
}
impl BeforeMiddleware for ErrorProducer {
fn before(&self, _: &mut Request) -> IronResult<()> {
// The error produced here switches to the error flow.
// The catch method of following middleware pieces will be called.
// The Handler will be skipped unless the error is handled by another middleware piece.
// IronError::error tells the next middleware what went wrong.
// IronError::response is the Response that will be sent back to the client if this error is not handled.
// Here status::BadRequest acts as modifier, thus we can put more there than just a status.
Err(IronError::new(
StringError("Error in ErrorProducer BeforeMiddleware".to_string()),
StatusCode::BAD_REQUEST,
))
}
}
impl AfterMiddleware for ErrorProducer {
fn after(&self, _: &mut Request, _: Response) -> IronResult<Response> {
// The behavior here is the same as in ErrorProducer::before.
// The previous response (from the Handler) is discarded and replaced with a new response (created from the modifier).
Err(IronError::new(
StringError("Error in ErrorProducer AfterMiddleware".to_string()),
(StatusCode::BAD_REQUEST, "Response created in ErrorProducer"),
))
}
}
impl BeforeMiddleware for ErrorRecover {
fn catch(&self, _: &mut Request, err: IronError) -> IronResult<()> {
// We can use the IronError from previous middleware to decide what to do.
// Returning Ok() from a catch method resumes the normal flow and
// passes the Request forward to the next middleware piece in the chain (here the HelloWorldHandler).
println!("{} caught in ErrorRecover BeforeMiddleware.", err.error);
match err.response.status {
Some(StatusCode::BAD_REQUEST) => Ok(()),
_ => Err(err),
}
}
}
impl AfterMiddleware for ErrorRecover {
fn catch(&self, _: &mut Request, err: IronError) -> IronResult<Response> {
// Just like in the BeforeMiddleware, we can return Ok(Response) here to return to the normal flow.
// In this case, ErrorRecover is the last middleware in the chain
// and the Response created in the ErrorProducer is modified and sent back to the client.
println!("{} caught in ErrorRecover AfterMiddleware.", err.error);
match err.response.status {
Some(StatusCode::BAD_REQUEST) => Ok(err.response.set(StatusCode::OK)),
_ => Err(err),
}
}
}
fn main() {
let mut chain = Chain::new(HelloWorldHandler);
chain.link_before(ErrorProducer);
chain.link_before(ErrorRecover);
chain.link_after(ErrorProducer);
chain.link_after(ErrorRecover);
Iron::new(chain).http("localhost:3000");
}

View File

@ -9,11 +9,6 @@ extern crate r2d2;
extern crate r2d2_postgres; extern crate r2d2_postgres;
extern crate fallible_iterator; extern crate fallible_iterator;
extern crate actix;
extern crate actix_cors;
extern crate actix_web;
extern crate actix_web_actors;
extern crate serde; extern crate serde;
extern crate serde_cbor; extern crate serde_cbor;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;
@ -24,6 +19,13 @@ extern crate fern;
extern crate stripe; extern crate stripe;
extern crate iron;
extern crate bodyparser;
extern crate persistent;
extern crate router;
extern crate cookie;
extern crate tungstenite;
mod account; mod account;
mod construct; mod construct;
mod effect; mod effect;
@ -35,7 +37,8 @@ mod mob;
mod mtx; mod mtx;
mod names; mod names;
mod net; mod net;
mod payments; // mod payments;
mod pg;
mod player; mod player;
mod pubsub; mod pubsub;
mod rpc; mod rpc;
@ -46,8 +49,8 @@ mod vbox;
mod warden; mod warden;
mod ws; mod ws;
use std::thread::spawn;
use dotenv::dotenv; use dotenv::dotenv;
use net::{start};
fn setup_logger() -> Result<(), fern::InitError> { fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new() fern::Dispatch::new()
@ -73,6 +76,12 @@ fn main() {
dotenv().ok(); dotenv().ok();
setup_logger().unwrap(); setup_logger().unwrap();
let pool = pg::create_pool();
let ws_pool = pool.clone();
let http_pool = pool.clone();
spawn(move || net::start(http_pool));
ws::start(ws_pool);
info!("server started"); info!("server started");
start()
} }

View File

@ -1,38 +1,33 @@
use std::env;
use std::thread::{spawn, sleep};
use std::time::{Duration};
use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; use iron::status;
use actix_web::error::ResponseError; use iron::headers::{SetCookie};
use actix_web::http::{Cookie}; use iron::prelude::*;
use actix_web::cookie::{SameSite}; use persistent::Read;
use actix_cors::Cors; use iron::typemap::Key;
use router::Router;
use iron::{AfterMiddleware};
use cookie::{Cookie, SameSite};
use r2d2::{Pool}; use chrono::Duration;
use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use warden::{warden}; use failure::Fail;
use pubsub::{pg_listen};
use ws::{connect}; // use warden::{warden};
// use pubsub::{pg_listen};
// use ws::{connect};
use account; use account;
use payments::{post_stripe_event}; use pg::PgPool;
// use payments::{post_stripe_event};
pub type Db = PooledConnection<PostgresConnectionManager>; pub const TOKEN_HEADER: &str = "x-auth-token";
pub type PgPool = Pool<PostgresConnectionManager>;
const DB_POOL_SIZE: u32 = 20;
#[derive(Debug,Clone,Serialize,Deserialize)]
pub struct JsonError {
pub err: String
}
#[derive(Fail, Debug, Serialize, Deserialize)] #[derive(Fail, Debug, Serialize, Deserialize)]
pub enum MnmlHttpError { pub enum MnmlHttpError {
// User Facing Errors // User Facing Errors
#[fail(display="internal server error")] #[fail(display="internal server error")]
ServerError, ServerError,
#[fail(display="internal server error")]
DbError,
#[fail(display="unauthorized")] #[fail(display="unauthorized")]
Unauthorized, Unauthorized,
#[fail(display="bad request")] #[fail(display="bad request")]
@ -45,67 +40,121 @@ pub enum MnmlHttpError {
InvalidCode, InvalidCode,
} }
impl ResponseError for MnmlHttpError { impl From<MnmlHttpError> for IronError {
fn error_response(&self) -> HttpResponse { fn from(m_err: MnmlHttpError) -> Self {
match *self { let (err, res) = match m_err {
MnmlHttpError::ServerError => HttpResponse::InternalServerError() MnmlHttpError::ServerError => (m_err.compat(), status::InternalServerError),
.json(JsonError { err: self.to_string() }), MnmlHttpError::DbError => (m_err.compat(), status::InternalServerError),
MnmlHttpError::Unauthorized => (m_err.compat(), status::Unauthorized),
MnmlHttpError::BadRequest => (m_err.compat(), status::BadRequest),
MnmlHttpError::AccountNameTaken => (m_err.compat(), status::BadRequest),
MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest),
MnmlHttpError::InvalidCode => (m_err.compat(), status::Unauthorized),
};
MnmlHttpError::BadRequest => HttpResponse::BadRequest() IronError::new(err, res)
.json(JsonError { err: self.to_string() }), }
}
MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() struct ErrorHandler;
.cookie(Cookie::build("x-auth-token", "") impl AfterMiddleware for ErrorHandler {
// .secure(secure) fn catch(&self, _: &mut Request, mut err: IronError) -> IronResult<Response> {
// on unauthorized we clear the auth token
match err.response.status {
Some(status::Unauthorized) => {
let v = Cookie::build(TOKEN_HEADER, "")
.http_only(true) .http_only(true)
.same_site(SameSite::Strict) .same_site(SameSite::Strict)
.max_age(-1) // 1 week aligns with db set .max_age(Duration::seconds(-1)) // 1 week aligns with db set
.finish()) .finish();
.json(JsonError { err: self.to_string() }), err.response.headers.set(SetCookie(vec![v.to_string()]));
},
_ => (),
};
MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest() warn!("{:?}", err);
.json(JsonError { err: self.to_string() }),
MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest() return Err(err);
.json(JsonError { err: self.to_string() }), }
}
MnmlHttpError::InvalidCode => HttpResponse::BadRequest() // fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlHttpError> {
.json(JsonError { err: self.to_string() }), // match r.cookie("x-auth-token") {
// Some(t) => {
// let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
// let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
// match account::from_token(&mut tx, t.value().to_string()) {
// Ok(a) => {
// account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?;
// tx.commit().or(Err(MnmlHttpError::ServerError))?;
// return Ok(logout_res());
// },
// Err(_) => Err(MnmlHttpError::Unauthorized),
// }
// },
// None => Err(MnmlHttpError::Unauthorized),
// }
// }
#[derive(Debug,Clone,Deserialize)]
struct RegisterBody {
name: String,
password: String,
code: String,
}
fn register(req: &mut Request) -> IronResult<Response> {
let state = req.get::<Read<State>>().unwrap();
let params = match req.get::<bodyparser::Struct<RegisterBody>>() {
Ok(Some(b)) => b,
_ => return Err(IronError::from(MnmlHttpError::BadRequest)),
};
let db = state.pool.get().or(Err(MnmlHttpError::DbError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::DbError))?;
match account::create(&params.name, &params.password, &params.code, &mut tx) {
Ok(token) => {
tx.commit().or(Err(MnmlHttpError::ServerError))?;
Ok(login_res(token))
},
Err(e) => {
warn!("{:?}", e);
Err(IronError::from(MnmlHttpError::BadRequest))
} }
} }
} }
fn login_res(token: String) -> HttpResponse { #[derive(Debug,Clone,Deserialize)]
HttpResponse::Ok() struct LoginBody {
.cookie(Cookie::build("x-auth-token", token)
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(60 * 60 * 24 * 7) // 1 week aligns with db set
.finish())
.finish()
}
fn logout_res() -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(-1)
.finish())
.finish()
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct AccountLoginParams {
name: String, name: String,
password: String, password: String,
} }
fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Result<HttpResponse, MnmlHttpError> { fn login_res(token: String) -> Response {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; let v = Cookie::build(TOKEN_HEADER, token)
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; .http_only(true)
.same_site(SameSite::Strict)
.max_age(Duration::seconds(-1)) // 1 week aligns with db set
.finish();
let mut res = Response::with(status::Ok);
res.headers.set(SetCookie(vec![v.to_string()]));
return res;
}
fn login(req: &mut Request) -> IronResult<Response> {
let state = req.get::<Read<State>>().unwrap();
let params = match req.get::<bodyparser::Struct<LoginBody>>() {
Ok(Some(b)) => b,
_ => return Err(IronError::from(MnmlHttpError::BadRequest)),
};
let db = state.pool.get().or(Err(MnmlHttpError::DbError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::DbError))?;
match account::login(&mut tx, &params.name, &params.password) { match account::login(&mut tx, &params.name, &params.password) {
Ok(a) => { Ok(a) => {
@ -114,108 +163,28 @@ fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Re
Ok(login_res(token)) Ok(login_res(token))
}, },
Err(e) => { Err(e) => {
info!("{:?}", e); warn!("{:?}", e);
Err(MnmlHttpError::Unauthorized) Err(IronError::from(MnmlHttpError::Unauthorized))
} }
} }
} }
fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlHttpError> { const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10;
match r.cookie("x-auth-token") {
Some(t) => {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::from_token(&mut tx, t.value().to_string()) {
Ok(a) => {
account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?;
tx.commit().or(Err(MnmlHttpError::ServerError))?;
return Ok(logout_res());
},
Err(_) => Err(MnmlHttpError::Unauthorized),
}
},
None => Err(MnmlHttpError::Unauthorized),
}
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct AccountRegisterParams {
name: String,
password: String,
code: String,
}
fn register(state: web::Data<State>, params: web::Json::<AccountRegisterParams>) -> Result<HttpResponse, MnmlHttpError> {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::create(&params.name, &params.password, &params.code, &mut tx) {
Ok(token) => {
tx.commit().or(Err(MnmlHttpError::ServerError))?;
Ok(login_res(token))
},
Err(e) => {
info!("{:?}", e);
Err(MnmlHttpError::BadRequest)
}
}
}
fn create_pool(url: String) -> Pool<PostgresConnectionManager> {
let manager = PostgresConnectionManager::new(url, TlsMode::None)
.expect("could not instantiate pg manager");
Pool::builder()
.max_size(DB_POOL_SIZE)
.build(manager)
.expect("Failed to create pool.")
}
pub struct State { pub struct State {
pub pool: PgPool, pub pool: PgPool,
// pub pubsub: PubSub, // pub pubsub: PubSub,
} }
pub fn start() { impl Key for State { type Value = State; }
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let pool = create_pool(database_url);
let warden_pool = pool.clone(); pub fn start(pool: PgPool) {
spawn(move || { let mut router = Router::new();
loop { router.post("/api/login", login, "login");
let db_connection = warden_pool.get().expect("unable to get db connection");
if let Err(e) = warden(db_connection) {
info!("{:?}", e);
}
sleep(Duration::new(1, 0));
}
});
let pubsub_pool = pool.clone(); let mut chain = Chain::new(router);
spawn(move || loop { chain.link(Read::<State>::both(State { pool }));
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); chain.link_before(Read::<bodyparser::MaxBodyLength>::one(MAX_BODY_LENGTH));
match pg_listen(pubsub_conn) { chain.link_after(ErrorHandler);
Ok(_) => warn!("pg listen closed"), Iron::new(chain).http("127.0.0.1:40000").unwrap();
Err(e) => warn!("pg_listen error {:?}", e),
}
});
HttpServer::new(move || App::new()
.data(State { pool: pool.clone() })
.wrap(middleware::Logger::default())
.wrap(Cors::new().supports_credentials())
.service(web::resource("/api/login").route(web::post().to(login)))
.service(web::resource("/api/logout").route(web::post().to(logout)))
.service(web::resource("/api/register").route(web::post().to(register)))
.service(web::resource("/api/payments/stripe")
.route(web::post().to(post_stripe_event)))
// .service(web::resource("/api/payments/crypto")
// .route(web::post().to(post_stripe_payment)))
.service(web::resource("/api/ws").route(web::get().to(connect))))
.bind("127.0.0.1:40000").expect("could not bind to port")
.run().expect("could not start http server");
} }

View File

@ -1,5 +1,4 @@
use uuid::Uuid; use uuid::Uuid;
use actix_web::{web, HttpResponse};
use postgres::transaction::Transaction; use postgres::transaction::Transaction;
use failure::Error; use failure::Error;

View File

@ -1,12 +1,26 @@
use std::env;
use r2d2::{Pool}; use r2d2::{Pool};
use r2d2::{PooledConnection}; use r2d2::{PooledConnection};
use r2d2_postgres::{PostgresConnectionManager}; use r2d2_postgres::{TlsMode, PostgresConnectionManager};
// use postgres::transaction::Transaction;
pub type Db = PooledConnection<PostgresConnectionManager>; pub type Db = PooledConnection<PostgresConnectionManager>;
pub type PgPool = Pool<PostgresConnectionManager>; pub type PgPool = Pool<PostgresConnectionManager>;
use postgres::transaction::Transaction; const DB_POOL_SIZE: u32 = 20;
pub trait Pg {
fn persist(self, &mut Transaction) -> Self; pub fn create_pool() -> Pool<PostgresConnectionManager> {
let url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let manager = PostgresConnectionManager::new(url, TlsMode::None)
.expect("could not instantiate pg manager");
Pool::builder()
.max_size(DB_POOL_SIZE)
.build(manager)
.expect("Failed to create pool.")
} }

View File

@ -2,7 +2,7 @@
use fallible_iterator::{FallibleIterator}; use fallible_iterator::{FallibleIterator};
use postgres::error::Error; use postgres::error::Error;
use net::{Db}; use pg::{Db};
pub fn pg_listen(connection: Db) -> Result<(), Error> { pub fn pg_listen(connection: Db) -> Result<(), Error> {
connection.execute("LISTEN events;", &[])?; connection.execute("LISTEN events;", &[])?;

View File

@ -1,14 +1,14 @@
use std::net::TcpStream;
use std::time::{Instant}; use std::time::{Instant};
use actix_web_actors::ws; use tungstenite::protocol::WebSocket;
use serde_cbor::{from_slice}; use serde_cbor::{from_slice};
use uuid::Uuid; use uuid::Uuid;
use failure::Error; use failure::Error;
use failure::err_msg; use failure::err_msg;
use net::{Db}; use pg::{Db};
use ws::{MnmlSocket};
use construct::{Construct}; use construct::{Construct};
use game::{Game, game_state, game_skill, game_ready}; use game::{Game, game_state, game_skill, game_ready};
use account::{Account, account_constructs, account_instances}; use account::{Account, account_constructs, account_instances};
@ -19,8 +19,6 @@ use item::{Item, ItemInfoCtr, item_info};
use mtx; use mtx;
type MnmlWs = ws::WebsocketContext<MnmlSocket>;
#[derive(Debug,Clone,Serialize,Deserialize)] #[derive(Debug,Clone,Serialize,Deserialize)]
pub enum RpcResult { pub enum RpcResult {
AccountState(Account), AccountState(Account),
@ -68,7 +66,7 @@ enum RpcRequest {
VboxReclaim { instance_id: Uuid, index: usize }, VboxReclaim { instance_id: Uuid, index: usize },
} }
pub fn receive(data: Vec<u8>, db: &Db, _client: &mut MnmlWs, begin: Instant, account: Option<&Account>) -> Result<RpcResult, Error> { pub fn receive(data: Vec<u8>, db: &Db, _client: &mut WebSocket<TcpStream>, begin: Instant, account: Option<&Account>) -> Result<RpcResult, Error> {
// cast the msg to this type to receive method name // cast the msg to this type to receive method name
match from_slice::<RpcRequest>(&data) { match from_slice::<RpcRequest>(&data) {
Ok(v) => { Ok(v) => {
@ -157,7 +155,6 @@ pub fn receive(data: Vec<u8>, db: &Db, _client: &mut MnmlWs, begin: Instant, acc
return response; return response;
}, },
Err(e) => { Err(e) => {
info!("{:?}", e);
Err(format_err!("invalid message data={:?}", data)) Err(format_err!("invalid message data={:?}", data))
}, },
} }

View File

@ -4,7 +4,7 @@ use failure::Error;
use game::{games_need_upkeep, game_update, game_write, game_delete}; use game::{games_need_upkeep, game_update, game_write, game_delete};
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use net::{Db}; use pg::{Db};
fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> { fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> {
let games = games_need_upkeep(&mut tx)?; let games = games_need_upkeep(&mut tx)?;

View File

@ -1,133 +1,75 @@
use std::time::{Instant, Duration}; use std::time::{Instant};
use std::net::{TcpListener};
use std::thread::{spawn};
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; use tungstenite::server::accept_hdr;
use actix_web_actors::ws; use tungstenite::Message::Binary;
use actix::prelude::*; use tungstenite::handshake::server::Request;
use failure::Error;
use failure::err_msg;
use account::{Account};
use serde_cbor::{to_vec}; use serde_cbor::{to_vec};
use net::{PgPool, State, MnmlHttpError, JsonError};
use rpc::{receive, RpcResult}; use net::TOKEN_HEADER;
use account; use rpc::{receive};
use pg::PgPool;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); #[derive(Debug,Clone,Serialize)]
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); struct RpcError {
err: String,
pub struct MnmlSocket {
hb: Instant,
pool: PgPool,
account: Option<Account>,
} }
impl Actor for MnmlSocket { pub fn start(pool: PgPool) {
type Context = ws::WebsocketContext<Self>; let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
for stream in ws_server.incoming() {
// once the actor has been started this fn runs let ws_pool = pool.clone();
// it starts the heartbeat interval and keepalive spawn(move || {
fn started(&mut self, ctx: &mut Self::Context) { let cb = |req: &Request| {
self.hb(ctx); let token = req.headers.find_first(TOKEN_HEADER);
} println!("{:?}", token);
} Ok(None)
};
/// Handler for `ws::Message` let mut websocket = accept_hdr(stream.unwrap(), cb).unwrap();
impl StreamHandler<ws::Message, ws::ProtocolError> for MnmlSocket { loop {
fn started(&mut self, ctx: &mut Self::Context) { match websocket.read_message() {
match self.account.as_ref() { Ok(msg) => {
Some(a) => {
info!("user connected {:?}", a);
let account_state = to_vec(&RpcResult::AccountState(a.clone()))
.expect("could not serialize account state");
ctx.binary(account_state)
},
None => info!("new connection"),
}
}
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
// process websocket messages
let begin = Instant::now(); let begin = Instant::now();
debug!("msg: {:?}", msg); let db_connection = ws_pool.get()
match msg { .expect("unable to get db connection");
ws::Message::Ping(msg) => {
self.hb = Instant::now(); let data = msg.into_data();
ctx.pong(&msg); match receive(data, &db_connection, &mut websocket, begin, None) {
}
ws::Message::Pong(_) => {
self.hb = Instant::now();
}
ws::Message::Text(_text) => (),
ws::Message::Close(_) => {
match self.account.as_ref() {
Some(a) => info!("disconnected {:?}", a),
None => info!("disconnected"),
}
ctx.stop();
}
ws::Message::Nop => (),
ws::Message::Binary(bin) => {
let db_connection = self.pool.get().expect("unable to get db connection");
match receive(bin.to_vec(), &db_connection, ctx, begin, self.account.as_ref()) {
Ok(reply) => { Ok(reply) => {
let response = to_vec(&reply) let response = to_vec(&reply)
.expect("failed to serialize response"); .expect("failed to serialize response");
ctx.binary(response);
if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed
debug!("{:?}", e);
return;
};
}, },
Err(e) => { Err(e) => {
let response = to_vec(&JsonError { err: e.to_string() }) warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response"); .expect("failed to serialize error response");
ctx.binary(response);
}
}
}
}
}
}
impl MnmlSocket { if let Err(e) = websocket.write_message(Binary(response)) {
fn new(state: web::Data<State>, account: Option<Account>) -> MnmlSocket { // connection closed
// idk why this has to be cloned again debug!("{:?}", e);
// i guess because each socket is added as a new thread? return;
MnmlSocket { hb: Instant::now(), pool: state.pool.clone(), account } };
} }
}
// starts the keepalive interval once actor started },
fn hb(&self, ctx: &mut <MnmlSocket as Actor>::Context) { // connection is closed
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { Err(e) => {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { debug!("{:?}", e);
info!("idle connection terminated");
// stop actor
ctx.stop();
// don't try to send a ping
return; return;
} }
};
ctx.ping(""); }
}); });
} }
} }
// idk how this stuff works
// but the args extract what you need from the incoming requests
// this grabs
// the req obj itself which we need for cookies
// the application state
// and the websocket stream
pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) -> Result<HttpResponse, MnmlHttpError> {
let account: Option<Account> = match r.cookie("x-auth-token") {
Some(t) => {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::from_token(&mut tx, t.value().to_string()) {
Ok(a) => Some(a),
Err(_) => None,
}
},
None => None,
};
// state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlHttpError::ServerError))?;
ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlHttpError::ServerError))
}