This commit is contained in:
ntr 2019-07-25 14:33:03 +10:00
parent 5851f20211
commit a77bec2c9f
6 changed files with 217 additions and 418 deletions

View File

@ -1,221 +0,0 @@
use std::env;
use std::thread::{spawn, sleep};
use std::time::{Duration};
use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer};
use actix_web::error::ResponseError;
use actix_web::http::{Cookie};
use actix_web::cookie::{SameSite};
use actix_cors::Cors;
use r2d2::{Pool};
use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use warden::{warden};
use pubsub::{pg_listen};
use ws::{connect};
use account;
use payments::{post_stripe_event};
pub type Db = PooledConnection<PostgresConnectionManager>;
pub type PgPool = Pool<PostgresConnectionManager>;
const DB_POOL_SIZE: u32 = 20;
#[derive(Debug,Clone,Serialize,Deserialize)]
pub struct JsonError {
pub err: String
}
#[derive(Fail, Debug, Serialize, Deserialize)]
pub enum MnmlHttpError {
// User Facing Errors
#[fail(display="internal server error")]
ServerError,
#[fail(display="unauthorized")]
Unauthorized,
#[fail(display="bad request")]
BadRequest,
#[fail(display="account name taken or invalid")]
AccountNameTaken,
#[fail(display="password unacceptable. must be > 11 characters")]
PasswordUnacceptable,
#[fail(display="invalid code. https://discord.gg/YJJgurM")]
InvalidCode,
}
impl ResponseError for MnmlHttpError {
fn error_response(&self) -> HttpResponse {
match *self {
MnmlHttpError::ServerError => HttpResponse::InternalServerError()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::BadRequest => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::Unauthorized => HttpResponse::Unauthorized()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(-1) // 1 week aligns with db set
.finish())
.json(JsonError { err: self.to_string() }),
MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
MnmlHttpError::InvalidCode => HttpResponse::BadRequest()
.json(JsonError { err: self.to_string() }),
}
}
}
fn login_res(token: String) -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", token)
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(60 * 60 * 24 * 7) // 1 week aligns with db set
.finish())
.finish()
}
fn logout_res() -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(-1)
.finish())
.finish()
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct AccountLoginParams {
name: String,
password: String,
}
fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Result<HttpResponse, MnmlHttpError> {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::login(&mut tx, &params.name, &params.password) {
Ok(a) => {
let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?;
tx.commit().or(Err(MnmlHttpError::ServerError))?;
Ok(login_res(token))
},
Err(e) => {
info!("{:?}", e);
Err(MnmlHttpError::Unauthorized)
}
}
}
fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlHttpError> {
match r.cookie("x-auth-token") {
Some(t) => {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::from_token(&mut tx, t.value().to_string()) {
Ok(a) => {
account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?;
tx.commit().or(Err(MnmlHttpError::ServerError))?;
return Ok(logout_res());
},
Err(_) => Err(MnmlHttpError::Unauthorized),
}
},
None => Err(MnmlHttpError::Unauthorized),
}
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct AccountRegisterParams {
name: String,
password: String,
code: String,
}
fn register(state: web::Data<State>, params: web::Json::<AccountRegisterParams>) -> Result<HttpResponse, MnmlHttpError> {
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
match account::create(&params.name, &params.password, &params.code, &mut tx) {
Ok(token) => {
tx.commit().or(Err(MnmlHttpError::ServerError))?;
Ok(login_res(token))
},
Err(e) => {
info!("{:?}", e);
Err(MnmlHttpError::BadRequest)
}
}
}
fn create_pool(url: String) -> Pool<PostgresConnectionManager> {
let manager = PostgresConnectionManager::new(url, TlsMode::None)
.expect("could not instantiate pg manager");
Pool::builder()
.max_size(DB_POOL_SIZE)
.build(manager)
.expect("Failed to create pool.")
}
pub struct State {
pub pool: PgPool,
// pub pubsub: PubSub,
}
pub fn start() {
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let pool = create_pool(database_url);
let warden_pool = pool.clone();
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
if let Err(e) = warden(db_connection) {
info!("{:?}", e);
}
sleep(Duration::new(1, 0));
}
});
let pubsub_pool = pool.clone();
spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn) {
Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e),
}
});
HttpServer::new(move || App::new()
.data(State { pool: pool.clone() })
.wrap(middleware::Logger::default())
.wrap(Cors::new().supports_credentials())
.service(web::resource("/api/login").route(web::post().to(login)))
.service(web::resource("/api/logout").route(web::post().to(logout)))
.service(web::resource("/api/register").route(web::post().to(register)))
.service(web::resource("/api/payments/stripe")
.route(web::post().to(post_stripe_event)))
// .service(web::resource("/api/payments/crypto")
// .route(web::post().to(post_stripe_payment)))
.service(web::resource("/api/ws").route(web::get().to(connect))))
.bind("127.0.0.1:40000").expect("could not bind to port")
.run().expect("could not start http server");
}

View File

@ -3,28 +3,14 @@ use uuid::Uuid;
use fallible_iterator::{FallibleIterator}; use fallible_iterator::{FallibleIterator};
use failure::Error; use failure::Error;
use failure::err_msg;
use crossbeam_channel::{unbounded, Sender, Receiver}; use crossbeam_channel::{unbounded, Sender, Receiver};
use pg::{Db}; use pg::{Db, PgPool};
use account; use account;
use game; use game;
use instance; use instance;
#[derive(Clone)]
pub struct PubSub {
pub s: Sender<Message>,
r: Receiver<Message>,
}
impl PubSub {
pub fn new() -> PubSub {
let (s, r) = unbounded();
return PubSub { s, r };
}
}
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "lowercase"))] #[serde(rename_all(deserialize = "lowercase"))]
enum Table { enum Table {
@ -53,13 +39,50 @@ struct Notification {
#[derive(Debug,Clone)] #[derive(Debug,Clone)]
pub enum Message { pub enum Message {
// outgoing
Account(account::Account), Account(account::Account),
Game(game::Game), Game(game::Game),
Instance(instance::Instance), Instance(instance::Instance),
// incoming
Connect(Sender<Message>),
Disconnect,
} }
fn handle_notification(n: Notification, db: &Db, pss: &Sender<Message>) -> Result<(), Error> { #[derive(Clone)]
info!("pubsub received notification notification={:?}", n); pub struct Events {
pub tx: Sender<Message>,
rx: Receiver<Message>,
}
impl Events {
pub fn new() -> Events {
let (tx, rx) = unbounded();
return Events { tx, rx };
}
fn receive(&mut self) {
match self.rx.try_recv() {
Ok(m) => self.on_message(m),
Err(_) => (),
}
}
fn on_message(&mut self, msg: Message) {
match msg {
Message::Connect(tx) => {
info!("client connected {:?}", tx);
},
Message::Disconnect => {
info!("client disconnected");
}
_ => panic!("events received unhandled msg={:?}", msg),
}
}
}
fn pg_notification(n: Notification, db: &Db, events: &Events) -> Result<(), Error> {
info!("events received notification notification={:?}", n);
// maybe we need it // maybe we need it
let mut tx = db.transaction()?; let mut tx = db.transaction()?;
@ -77,29 +100,47 @@ fn handle_notification(n: Notification, db: &Db, pss: &Sender<Message>) -> Resul
tx.commit()?; tx.commit()?;
match pss.try_send(msg.clone()) { info!("got a msg to send to whoever cares {:?}", msg);
Ok(()) => info!("pubsub message sent message={:?}", msg),
Err(e) => warn!("pubsub delivery failure err={:?}", e), // match events.try_send(msg.clone()) {
}; // Ok(()) => info!("events message sent message={:?}", msg),
// Err(e) => warn!("events delivery failure err={:?}", e),
// };
Ok(()) Ok(())
} }
pub fn pg_listen(db: Db, ps: &PubSub) -> Result<(), Error> { pub fn listen(pool: &PgPool, events: &mut Events) -> Result<(), Error> {
let db = pool.get()?;
db.execute("LISTEN events;", &[])?; db.execute("LISTEN events;", &[])?;
info!("pubsub listening"); info!("events listening");
let notifications = db.notifications(); let notifications = db.notifications();
let mut n_iter = notifications.blocking_iter(); let mut n_iter = notifications.blocking_iter();
// main event loop, checks pg and checks messages
loop { loop {
let n = n_iter.next().unwrap(); // check notifications
let n = n_iter.next()?;
if let Some(n) = n { if let Some(n) = n {
match serde_json::from_str::<Notification>(&n.payload) { match serde_json::from_str::<Notification>(&n.payload) {
Ok(notification) => match handle_notification(notification, &db, &ps.s) { Ok(notification) => match pg_notification(notification, &db, &events) {
Ok(()) => (), Ok(()) => (),
Err(e) => warn!("{:?}", e), Err(e) => warn!("{:?}", e),
} }
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e), Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
}; };
} }
events.receive();
}
}
pub fn start(pool: PgPool, mut events: Events) {
loop {
match listen(&pool, &mut events) {
Ok(()) => panic!("events listen returned"),
Err(e) => warn!("events listener error err={:?}", e),
}
} }
} }

View File

@ -41,7 +41,7 @@ mod net;
mod payments; mod payments;
mod pg; mod pg;
mod player; mod player;
mod pubsub; mod events;
mod rpc; mod rpc;
mod skill; mod skill;
mod spec; mod spec;
@ -54,9 +54,7 @@ use std::thread::{sleep, spawn};
use std::time::{Duration}; use std::time::{Duration};
use std::path::{Path}; use std::path::{Path};
use crossbeam_channel::{unbounded}; use events::{start as events_start};
use pubsub::pg_listen;
use warden::warden; use warden::warden;
fn setup_logger() -> Result<(), fern::InitError> { fn setup_logger() -> Result<(), fern::InitError> {
@ -85,13 +83,7 @@ fn main() {
let pool = pg::create_pool(); let pool = pg::create_pool();
let ws_pool = pool.clone();
let http_pool = pool.clone();
let warden_pool = pool.clone(); let warden_pool = pool.clone();
let pubsub_pool = pool.clone();
let pubsub = pubsub::PubSub::new();
spawn(move || { spawn(move || {
loop { loop {
let db_connection = warden_pool.get().expect("unable to get db connection"); let db_connection = warden_pool.get().expect("unable to get db connection");
@ -102,16 +94,16 @@ fn main() {
} }
}); });
let pg_listener = pubsub.clone(); let events = events::Events::new();
spawn(move || loop { let event_listener = events.clone();
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); let events_pool = pool.clone();
match pg_listen(pubsub_conn, &pg_listener) { // spawn(move || events_start(events_pool, event_listener));
Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e),
}
});
let http_pool = pool.clone();
spawn(move || net::start(http_pool)); spawn(move || net::start(http_pool));
ws::start(ws_pool, pubsub);
// this should go on a thread too?
let ws_pool = pool.clone();
ws::start(ws_pool, events);
info!("server started"); info!("server started");
} }

View File

@ -12,7 +12,7 @@ use router::Router;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
// use warden::{warden}; // use warden::{warden};
// use pubsub::{pg_listen}; // use events::{pg_listen};
// use ws::{connect}; // use ws::{connect};
use account; use account;
use pg::PgPool; use pg::PgPool;
@ -103,7 +103,7 @@ impl From<MnmlHttpError> for IronError {
MnmlHttpError::AccountNotFound | MnmlHttpError::AccountNotFound |
MnmlHttpError::BadRequest | MnmlHttpError::BadRequest |
MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest), MnmlHttpError::PasswordUnacceptable => (m_err.compat(), status::BadRequest),
MnmlHttpError::PasswordNotMatch | MnmlHttpError::PasswordNotMatch |
MnmlHttpError::InvalidCode | MnmlHttpError::InvalidCode |
MnmlHttpError::TokenDoesNotMatch | MnmlHttpError::TokenDoesNotMatch |
@ -260,7 +260,7 @@ const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10;
pub struct State { pub struct State {
pub pool: PgPool, pub pool: PgPool,
// pub pubsub: PubSub, // pub events: Events,
} }
impl Key for State { type Value = State; } impl Key for State { type Value = State; }

View File

@ -7,7 +7,6 @@ use uuid::Uuid;
use failure::Error; use failure::Error;
use failure::err_msg; use failure::err_msg;
use ws::{Ws};
use pg::{Db}; use pg::{Db};
use construct::{Construct}; use construct::{Construct};
use game::{Game, game_state, game_skill, game_ready}; use game::{Game, game_state, game_skill, game_ready};

View File

@ -24,85 +24,69 @@ use failure::{err_msg, format_err};
use net::TOKEN_HEADER; use net::TOKEN_HEADER;
use rpc; use rpc;
use rpc::{RpcMessage};
use mtx; use mtx;
use pg::PgPool; use pg::PgPool;
use account; use account;
use account::Account; use account::Account;
use pubsub::{Message, PubSub}; use events::{Message, Events};
pub struct Ws { pub fn ws(mut client: WebSocket<TcpStream>, pool: PgPool, account: Option<Account>, events: Sender<Message>) {
client: WebSocket<TcpStream>, let (tx, rx) = unbounded();
sender: Sender<Message>, events.try_send(Message::Connect(tx)).unwrap();
receiver: Receiver<Message>,
pool: PgPool,
account: Option<Account>,
}
impl Ws { loop {
pub fn new(client: WebSocket<TcpStream>, pool: PgPool, account: Option<Account>) -> Ws { match client.read_message().no_block() {
let (sender, receiver) = unbounded(); Ok(msg) => {
return Ws { sender, receiver, client, pool, account }; if let Some(msg) = msg {
} match msg {
Binary(data) => {
let begin = Instant::now();
let db_connection = pool.get()
.expect("unable to get db connection");
fn start(&mut self) { match rpc::receive(data, &db_connection, begin, &account) {
loop { Ok(reply) => {
match self.client.read_message().no_block() { let response = to_vec(&reply)
Ok(msg) => { .expect("failed to serialize response");
if let Some(msg) = msg {
match msg {
Binary(data) => {
let begin = Instant::now();
let db_connection = self.pool.get()
.expect("unable to get db connection");
match rpc::receive(data, &db_connection, begin, &self.account) { if let Err(e) = client.write_message(Binary(response)) {
Ok(reply) => { // connection closed
let response = to_vec(&reply)
.expect("failed to serialize response");
if let Err(e) = self.client.write_message(Binary(response)) {
// connection closed
warn!("{:?}", e);
return;
};
// self.subscriptions.update(&reply).unwrap();
},
Err(e) => {
warn!("{:?}", e); warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() }) return;
.expect("failed to serialize error response"); };
if let Err(e) = self.client.write_message(Binary(response)) { // subscriptions.update(&reply).unwrap();
// connection closed },
warn!("{:?}", e); Err(e) => {
return; warn!("{:?}", e);
}; let response = to_vec(&RpcError { err: e.to_string() })
} .expect("failed to serialize error response");
if let Err(e) = client.write_message(Binary(response)) {
// connection closed
warn!("{:?}", e);
return;
};
} }
}, }
_ => (), },
} _ => (),
}; }
};
self.receive(); // match receiver.try_recv() {
}, // Ok(n) => handle_message(&subs, n, &mut websocket),
// connection is closed // Err(_) => (),
Err(e) => { // };
warn!("{:?}", e);
return;
}
};
}
}
fn receive(&mut self) { },
// match self.receiver.try_recv() { // connection is closed
// Ok(n) => handle_message(&subs, n, &mut websocket), Err(e) => {
// Err(_) => (), warn!("{:?}", e);
// }; return;
}
};
} }
} }
@ -111,103 +95,108 @@ struct RpcError {
err: String, err: String,
} }
#[derive(Debug)] // #[derive(Debug)]
struct Subscriptions { // struct Subscriptions {
account: Option<Uuid>, // account: Option<Uuid>,
game: Option<Uuid>, // game: Option<Uuid>,
instance: Option<Uuid>, // instance: Option<Uuid>,
// account_instances: Vec<Uuid>, // // account_instances: Vec<Uuid>,
} // }
impl Subscriptions { // impl Subscriptions {
fn new(ws_pool: &PgPool, account: &Option<Account>, ws: &mut Ws) -> Result<Subscriptions, Error> { // fn new(ws_pool: &PgPool, account: &Option<Account>, ws: &mut Ws) -> Result<Subscriptions, Error> {
if let Some(a) = account { // if let Some(a) = account {
let db = ws_pool.get()?; // let db = ws_pool.get()?;
let mut tx = db.transaction()?; // let mut tx = db.transaction()?;
// send account constructs // // send account constructs
let account_constructs = account::account_constructs(&mut tx, a)?; // let account_constructs = account::account_constructs(&mut tx, a)?;
ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; // ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?;
// get account instances // // get account instances
// and send them to the client // // and send them to the client
let account_instances = account::account_instances(&mut tx, a)?; // let account_instances = account::account_instances(&mut tx, a)?;
// let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>(); // // let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>();
ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; // ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?;
// get players // // get players
// add to games // // add to games
// tx.commit()?;
tx.commit()?; // return Ok(Subscriptions {
// account: Some(a.id),
// game: None,
// instance: None,
// })
// }
return Ok(Subscriptions { // Ok(Subscriptions {
account: Some(a.id), // account: None,
game: None, // game: None,
instance: None, // instance: None
}) // })
} // }
Ok(Subscriptions { // fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> {
account: None, // match msg {
game: None, // RpcMessage::AccountState(a) => self.account = Some(a.id),
instance: None // RpcMessage::InstanceState(i) => self.instance = Some(i.id),
}) // RpcMessage::GameState(g) => self.game = Some(g.id),
} // _ => (),
// };
fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> { // // info!("subscriptions updated {:?}", self);
match msg {
RpcMessage::AccountState(a) => self.account = Some(a.id),
RpcMessage::InstanceState(i) => self.instance = Some(i.id),
RpcMessage::GameState(g) => self.game = Some(g.id),
_ => (),
};
// info!("subscriptions updated {:?}", self); // Ok(self)
// }
Ok(self) // }
}
}
fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { // fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) {
if let Some(msg) = match m { // if let Some(msg) = match m {
Message::Account(a) => { // Message::Account(a) => {
match subs.account { // match subs.account {
Some(wsa) => match wsa == a.id { // Some(wsa) => match wsa == a.id {
true => Some(rpc::RpcMessage::AccountState(a)), // true => Some(rpc::RpcMessage::AccountState(a)),
false => None, // false => None,
}, // },
None => None, // None => None,
} // }
}, // },
Message::Instance(i) => { // Message::Instance(i) => {
match subs.instance { // match subs.instance {
Some(ci) => match ci == i.id { // Some(ci) => match ci == i.id {
true => Some(rpc::RpcMessage::InstanceState(i)), // true => Some(rpc::RpcMessage::InstanceState(i)),
false => None, // false => None,
}, // },
None => None, // None => None,
} // }
}, // },
Message::Game(g) => { // Message::Game(g) => {
match subs.game { // match subs.game {
Some(cg) => match cg == g.id { // Some(cg) => match cg == g.id {
true => Some(rpc::RpcMessage::GameState(g)), // true => Some(rpc::RpcMessage::GameState(g)),
false => None, // false => None,
}, // },
None => None, // None => None,
} // }
}, // },
// _ => None, // Message::Connect(tx) => {
} { // info!("client connected {:?}", tx);
ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); // None
} // },
} // // _ => None,
// } {
// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
// }
// }
pub fn start(pool: PgPool, ps: PubSub) { pub fn start(pool: PgPool, events: Events) {
let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
for stream in ws_server.incoming() { for stream in ws_server.incoming() {
let ws_pool = pool.clone(); let ws_pool = pool.clone();
let events_tx = events.tx.clone();
spawn(move || { spawn(move || {
let (acc_s, acc_r) = unbounded(); let (acc_s, acc_r) = unbounded();
@ -272,8 +261,7 @@ pub fn start(pool: PgPool, ps: PubSub) {
None => None, None => None,
}; };
Ws::new(client, ws_pool, account) ws(client, ws_pool, account, events_tx)
.start()
}); });
} }
} }