Merge branch 'develop' into buy-constructs
This commit is contained in:
commit
b72c819c63
@ -3,13 +3,6 @@ name = "mnml"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
authors = ["ntr <ntr@smokestack.io>"]
|
authors = ["ntr <ntr@smokestack.io>"]
|
||||||
|
|
||||||
# makes sure to include openssl links in runtime path
|
|
||||||
# [profile.release]
|
|
||||||
# rpath = true
|
|
||||||
|
|
||||||
# [profile.dev]
|
|
||||||
# rpath = true
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = "1"
|
serde = "1"
|
||||||
serde_derive = "1"
|
serde_derive = "1"
|
||||||
@ -37,11 +30,7 @@ bodyparser = "0.8"
|
|||||||
persistent = "0.4"
|
persistent = "0.4"
|
||||||
router = "0.6"
|
router = "0.6"
|
||||||
cookie = "0.12"
|
cookie = "0.12"
|
||||||
tungstenite = "0.8"
|
|
||||||
crossbeam-channel = "0.3"
|
crossbeam-channel = "0.3"
|
||||||
|
ws = "0.8"
|
||||||
|
|
||||||
stripe-rust = { version = "0.10.4", features = ["webhooks"] }
|
stripe-rust = { version = "0.10.4", features = ["webhooks"] }
|
||||||
|
|
||||||
# [patch.crates-io]
|
|
||||||
# stripe-rust = { git = "https://github.com/margh/stripe-rs.git" }
|
|
||||||
|
|
||||||
|
|||||||
@ -1,221 +0,0 @@
|
|||||||
use std::env;
|
|
||||||
use std::thread::{spawn, sleep};
|
|
||||||
use std::time::{Duration};
|
|
||||||
|
|
||||||
use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer};
|
|
||||||
use actix_web::error::ResponseError;
|
|
||||||
use actix_web::http::{Cookie};
|
|
||||||
use actix_web::cookie::{SameSite};
|
|
||||||
use actix_cors::Cors;
|
|
||||||
|
|
||||||
use r2d2::{Pool};
|
|
||||||
use r2d2::{PooledConnection};
|
|
||||||
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
|
|
||||||
|
|
||||||
use warden::{warden};
|
|
||||||
use pubsub::{pg_listen};
|
|
||||||
use ws::{connect};
|
|
||||||
use account;
|
|
||||||
use payments::{post_stripe_event};
|
|
||||||
|
|
||||||
pub type Db = PooledConnection<PostgresConnectionManager>;
|
|
||||||
pub type PgPool = Pool<PostgresConnectionManager>;
|
|
||||||
|
|
||||||
const DB_POOL_SIZE: u32 = 20;
|
|
||||||
|
|
||||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
|
||||||
pub struct JsonError {
|
|
||||||
pub err: String
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Fail, Debug, Serialize, Deserialize)]
|
|
||||||
pub enum MnmlHttpError {
|
|
||||||
// User Facing Errors
|
|
||||||
#[fail(display="internal server error")]
|
|
||||||
ServerError,
|
|
||||||
#[fail(display="unauthorized")]
|
|
||||||
Unauthorized,
|
|
||||||
#[fail(display="bad request")]
|
|
||||||
BadRequest,
|
|
||||||
#[fail(display="account name taken or invalid")]
|
|
||||||
AccountNameTaken,
|
|
||||||
#[fail(display="password unacceptable. must be > 11 characters")]
|
|
||||||
PasswordUnacceptable,
|
|
||||||
#[fail(display="invalid code. https://discord.gg/YJJgurM")]
|
|
||||||
InvalidCode,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ResponseError for MnmlHttpError {
|
|
||||||
fn error_response(&self) -> HttpResponse {
|
|
||||||
match *self {
|
|
||||||
MnmlHttpError::ServerError => HttpResponse::InternalServerError()
|
|
||||||
.json(JsonError { err: self.to_string() }),
|
|
||||||
|
|
||||||
MnmlHttpError::BadRequest => HttpResponse::BadRequest()
|
|
||||||
.json(JsonError { err: self.to_string() }),
|
|
||||||
|
|
||||||
MnmlHttpError::Unauthorized => HttpResponse::Unauthorized()
|
|
||||||
.cookie(Cookie::build("x-auth-token", "")
|
|
||||||
// .secure(secure)
|
|
||||||
.http_only(true)
|
|
||||||
.same_site(SameSite::Strict)
|
|
||||||
.max_age(-1) // 1 week aligns with db set
|
|
||||||
.finish())
|
|
||||||
.json(JsonError { err: self.to_string() }),
|
|
||||||
|
|
||||||
MnmlHttpError::AccountNameTaken => HttpResponse::BadRequest()
|
|
||||||
.json(JsonError { err: self.to_string() }),
|
|
||||||
|
|
||||||
MnmlHttpError::PasswordUnacceptable => HttpResponse::BadRequest()
|
|
||||||
.json(JsonError { err: self.to_string() }),
|
|
||||||
|
|
||||||
MnmlHttpError::InvalidCode => HttpResponse::BadRequest()
|
|
||||||
.json(JsonError { err: self.to_string() }),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn login_res(token: String) -> HttpResponse {
|
|
||||||
HttpResponse::Ok()
|
|
||||||
.cookie(Cookie::build("x-auth-token", token)
|
|
||||||
// .secure(secure)
|
|
||||||
.http_only(true)
|
|
||||||
.same_site(SameSite::Strict)
|
|
||||||
.max_age(60 * 60 * 24 * 7) // 1 week aligns with db set
|
|
||||||
.finish())
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn logout_res() -> HttpResponse {
|
|
||||||
HttpResponse::Ok()
|
|
||||||
.cookie(Cookie::build("x-auth-token", "")
|
|
||||||
// .secure(secure)
|
|
||||||
.http_only(true)
|
|
||||||
.same_site(SameSite::Strict)
|
|
||||||
.max_age(-1)
|
|
||||||
.finish())
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
|
||||||
struct AccountLoginParams {
|
|
||||||
name: String,
|
|
||||||
password: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Result<HttpResponse, MnmlHttpError> {
|
|
||||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
|
|
||||||
match account::login(&mut tx, ¶ms.name, ¶ms.password) {
|
|
||||||
Ok(a) => {
|
|
||||||
let token = account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
tx.commit().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
Ok(login_res(token))
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
info!("{:?}", e);
|
|
||||||
Err(MnmlHttpError::Unauthorized)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlHttpError> {
|
|
||||||
match r.cookie("x-auth-token") {
|
|
||||||
Some(t) => {
|
|
||||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
match account::from_token(&mut tx, t.value().to_string()) {
|
|
||||||
Ok(a) => {
|
|
||||||
account::new_token(&mut tx, a.id).or(Err(MnmlHttpError::Unauthorized))?;
|
|
||||||
tx.commit().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
return Ok(logout_res());
|
|
||||||
},
|
|
||||||
Err(_) => Err(MnmlHttpError::Unauthorized),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => Err(MnmlHttpError::Unauthorized),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
|
||||||
struct AccountRegisterParams {
|
|
||||||
name: String,
|
|
||||||
password: String,
|
|
||||||
code: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn register(state: web::Data<State>, params: web::Json::<AccountRegisterParams>) -> Result<HttpResponse, MnmlHttpError> {
|
|
||||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
|
|
||||||
match account::create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) {
|
|
||||||
Ok(token) => {
|
|
||||||
tx.commit().or(Err(MnmlHttpError::ServerError))?;
|
|
||||||
Ok(login_res(token))
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
info!("{:?}", e);
|
|
||||||
Err(MnmlHttpError::BadRequest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_pool(url: String) -> Pool<PostgresConnectionManager> {
|
|
||||||
let manager = PostgresConnectionManager::new(url, TlsMode::None)
|
|
||||||
.expect("could not instantiate pg manager");
|
|
||||||
|
|
||||||
Pool::builder()
|
|
||||||
.max_size(DB_POOL_SIZE)
|
|
||||||
.build(manager)
|
|
||||||
.expect("Failed to create pool.")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct State {
|
|
||||||
pub pool: PgPool,
|
|
||||||
// pub pubsub: PubSub,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start() {
|
|
||||||
let database_url = env::var("DATABASE_URL")
|
|
||||||
.expect("DATABASE_URL must be set");
|
|
||||||
let pool = create_pool(database_url);
|
|
||||||
|
|
||||||
let warden_pool = pool.clone();
|
|
||||||
spawn(move || {
|
|
||||||
loop {
|
|
||||||
let db_connection = warden_pool.get().expect("unable to get db connection");
|
|
||||||
if let Err(e) = warden(db_connection) {
|
|
||||||
info!("{:?}", e);
|
|
||||||
}
|
|
||||||
sleep(Duration::new(1, 0));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let pubsub_pool = pool.clone();
|
|
||||||
spawn(move || loop {
|
|
||||||
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
|
|
||||||
match pg_listen(pubsub_conn) {
|
|
||||||
Ok(_) => warn!("pg listen closed"),
|
|
||||||
Err(e) => warn!("pg_listen error {:?}", e),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
HttpServer::new(move || App::new()
|
|
||||||
.data(State { pool: pool.clone() })
|
|
||||||
.wrap(middleware::Logger::default())
|
|
||||||
.wrap(Cors::new().supports_credentials())
|
|
||||||
.service(web::resource("/api/login").route(web::post().to(login)))
|
|
||||||
.service(web::resource("/api/logout").route(web::post().to(logout)))
|
|
||||||
.service(web::resource("/api/register").route(web::post().to(register)))
|
|
||||||
|
|
||||||
.service(web::resource("/api/payments/stripe")
|
|
||||||
.route(web::post().to(post_stripe_event)))
|
|
||||||
|
|
||||||
// .service(web::resource("/api/payments/crypto")
|
|
||||||
// .route(web::post().to(post_stripe_payment)))
|
|
||||||
|
|
||||||
.service(web::resource("/api/ws").route(web::get().to(connect))))
|
|
||||||
.bind("127.0.0.1:40000").expect("could not bind to port")
|
|
||||||
.run().expect("could not start http server");
|
|
||||||
}
|
|
||||||
239
server/src/events.rs
Normal file
239
server/src/events.rs
Normal file
@ -0,0 +1,239 @@
|
|||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
|
// Db Commons
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use failure::Error;
|
||||||
|
|
||||||
|
use crossbeam_channel::{unbounded, Sender, Receiver};
|
||||||
|
|
||||||
|
use account;
|
||||||
|
use account::Account;
|
||||||
|
use game;
|
||||||
|
use instance;
|
||||||
|
use pg::{Db, PgPool};
|
||||||
|
use rpc::RpcMessage;
|
||||||
|
|
||||||
|
type Id = usize;
|
||||||
|
|
||||||
|
pub struct Events {
|
||||||
|
pub tx: Sender<Event>,
|
||||||
|
rx: Receiver<Event>,
|
||||||
|
pool: PgPool,
|
||||||
|
|
||||||
|
clients: HashMap<Id, WsClient>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug,Clone)]
|
||||||
|
pub enum Event {
|
||||||
|
Connect(Id, Option<Account>, Sender<RpcMessage>),
|
||||||
|
Disconnect(Id),
|
||||||
|
Subscribe(Id, Uuid),
|
||||||
|
Unsubscribe(Id, Uuid),
|
||||||
|
|
||||||
|
Push(Uuid, RpcMessage),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct WsClient {
|
||||||
|
id: Id,
|
||||||
|
tx: Sender<RpcMessage>,
|
||||||
|
subs: HashSet<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Events {
|
||||||
|
pub fn new(pool: PgPool) -> Events {
|
||||||
|
let (tx, rx) = unbounded();
|
||||||
|
Events {
|
||||||
|
tx,
|
||||||
|
rx,
|
||||||
|
pool,
|
||||||
|
clients: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn listen(&mut self) -> Result<(), Error> {
|
||||||
|
loop {
|
||||||
|
match self.rx.recv() {
|
||||||
|
Ok(m) => {
|
||||||
|
self.on_event(m)?;
|
||||||
|
},
|
||||||
|
|
||||||
|
// idk if this is a good idea
|
||||||
|
// possibly just log errors and continue...
|
||||||
|
Err(e) => {
|
||||||
|
return Err(format_err!("events error err={:?}", e));
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_client(&mut self, id: Id) {
|
||||||
|
self.clients.remove(&id);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_event(&mut self, msg: Event) -> Result<(), Error> {
|
||||||
|
match msg {
|
||||||
|
Event::Connect(id, account, tx) => {
|
||||||
|
info!("client connected to events id={:?} account={:?}", id, account);
|
||||||
|
|
||||||
|
let client = WsClient { id, tx, subs: HashSet::new() };
|
||||||
|
self.clients.insert(id, client);
|
||||||
|
|
||||||
|
info!("events clients={:?}", self.clients.len());
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
Event::Disconnect(id) => {
|
||||||
|
info!("client disconnected from events id={:?}", id);
|
||||||
|
|
||||||
|
self.clients.remove(&id);
|
||||||
|
|
||||||
|
info!("events clients={:?}", self.clients.len());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Event::Subscribe(id, obj) => {
|
||||||
|
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
|
||||||
|
|
||||||
|
match self.clients.get_mut(&id) {
|
||||||
|
Some(client) => {
|
||||||
|
client.subs.insert(obj);
|
||||||
|
info!("client subscriptions={:?}", client.subs.len());
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
None => return Err(format_err!("unknown client {:?}", id))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Event::Unsubscribe(id, obj) => {
|
||||||
|
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
|
||||||
|
|
||||||
|
match self.clients.get_mut(&id) {
|
||||||
|
Some(mut client) => {
|
||||||
|
client.subs.remove(&obj);
|
||||||
|
info!("client subscriptions={:?}", client.subs.len());
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
None => return Err(format_err!("unknown client {:?}", id))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
Event::Push(id, msg) => {
|
||||||
|
info!("events received push notification id={:?} msg={:?}", id, msg);
|
||||||
|
|
||||||
|
let mut subs = 0;
|
||||||
|
for (_client_id, client) in self.clients.iter() {
|
||||||
|
if client.subs.contains(&id) {
|
||||||
|
subs += 1;
|
||||||
|
match client.tx.send(msg.clone()) {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("unable to send msg to client err={:?}", e);
|
||||||
|
// self.remove_client(*client_id);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("notification subscribers={:?}", subs);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// #[derive(Debug)]
|
||||||
|
// struct Subscriptions {
|
||||||
|
// account: Option<Uuid>,
|
||||||
|
// game: Option<Uuid>,
|
||||||
|
// instance: Option<Uuid>,
|
||||||
|
// // account_instances: Vec<Uuid>,
|
||||||
|
// }
|
||||||
|
|
||||||
|
// impl Subscriptions {
|
||||||
|
// fn new(ws_pool: &PgPool, account: &Option<Account>, ws: &mut Ws) -> Result<Subscriptions, Error> {
|
||||||
|
// if let Some(a) = account {
|
||||||
|
// let db = ws_pool.get()?;
|
||||||
|
// let mut tx = db.transaction()?;
|
||||||
|
|
||||||
|
// // send account constructs
|
||||||
|
// let account_constructs = account::account_constructs(&mut tx, a)?;
|
||||||
|
// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?;
|
||||||
|
|
||||||
|
// // get account instances
|
||||||
|
// // and send them to the client
|
||||||
|
// let account_instances = account::account_instances(&mut tx, a)?;
|
||||||
|
// // let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>();
|
||||||
|
// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?;
|
||||||
|
|
||||||
|
// // get players
|
||||||
|
// // add to games
|
||||||
|
// tx.commit()?;
|
||||||
|
|
||||||
|
// return Ok(Subscriptions {
|
||||||
|
// account: Some(a.id),
|
||||||
|
// game: None,
|
||||||
|
// instance: None,
|
||||||
|
// })
|
||||||
|
// }
|
||||||
|
|
||||||
|
// Ok(Subscriptions {
|
||||||
|
// account: None,
|
||||||
|
// game: None,
|
||||||
|
// instance: None
|
||||||
|
// })
|
||||||
|
// }
|
||||||
|
|
||||||
|
// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> {
|
||||||
|
// match msg {
|
||||||
|
// RpcMessage::AccountState(a) => self.account = Some(a.id),
|
||||||
|
// RpcMessage::InstanceState(i) => self.instance = Some(i.id),
|
||||||
|
// RpcMessage::GameState(g) => self.game = Some(g.id),
|
||||||
|
// _ => (),
|
||||||
|
// };
|
||||||
|
|
||||||
|
// // info!("subscriptions updated {:?}", self);
|
||||||
|
|
||||||
|
// Ok(self)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) {
|
||||||
|
// if let Some(msg) = match m {
|
||||||
|
// Message::Account(a) => {
|
||||||
|
// match subs.account {
|
||||||
|
// Some(wsa) => match wsa == a.id {
|
||||||
|
// true => Some(rpc::RpcMessage::AccountState(a)),
|
||||||
|
// false => None,
|
||||||
|
// },
|
||||||
|
// None => None,
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// Message::Instance(i) => {
|
||||||
|
// match subs.instance {
|
||||||
|
// Some(ci) => match ci == i.id {
|
||||||
|
// true => Some(rpc::RpcMessage::InstanceState(i)),
|
||||||
|
// false => None,
|
||||||
|
// },
|
||||||
|
// None => None,
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// Message::Game(g) => {
|
||||||
|
// match subs.game {
|
||||||
|
// Some(cg) => match cg == g.id {
|
||||||
|
// true => Some(rpc::RpcMessage::GameState(g)),
|
||||||
|
// false => None,
|
||||||
|
// },
|
||||||
|
// None => None,
|
||||||
|
// }
|
||||||
|
// },
|
||||||
|
// Message::Connect(tx) => {
|
||||||
|
// info!("client connected {:?}", tx);
|
||||||
|
// None
|
||||||
|
// },
|
||||||
|
// // _ => None,
|
||||||
|
// } {
|
||||||
|
// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
@ -24,7 +24,8 @@ extern crate bodyparser;
|
|||||||
extern crate persistent;
|
extern crate persistent;
|
||||||
extern crate router;
|
extern crate router;
|
||||||
extern crate cookie;
|
extern crate cookie;
|
||||||
extern crate tungstenite;
|
|
||||||
|
extern crate ws;
|
||||||
extern crate crossbeam_channel;
|
extern crate crossbeam_channel;
|
||||||
|
|
||||||
mod account;
|
mod account;
|
||||||
@ -41,22 +42,20 @@ mod net;
|
|||||||
mod payments;
|
mod payments;
|
||||||
mod pg;
|
mod pg;
|
||||||
mod player;
|
mod player;
|
||||||
mod pubsub;
|
mod events;
|
||||||
mod rpc;
|
mod rpc;
|
||||||
mod skill;
|
mod skill;
|
||||||
mod spec;
|
mod spec;
|
||||||
mod util;
|
mod util;
|
||||||
mod vbox;
|
mod vbox;
|
||||||
mod warden;
|
mod warden;
|
||||||
mod ws;
|
mod websocket;
|
||||||
|
|
||||||
use std::thread::{sleep, spawn};
|
use std::thread::{sleep, spawn};
|
||||||
use std::time::{Duration};
|
use std::time::{Duration};
|
||||||
use std::path::{Path};
|
use std::path::{Path};
|
||||||
|
|
||||||
use crossbeam_channel::{unbounded};
|
use events::Events;
|
||||||
|
|
||||||
use pubsub::pg_listen;
|
|
||||||
use warden::warden;
|
use warden::warden;
|
||||||
|
|
||||||
fn setup_logger() -> Result<(), fern::InitError> {
|
fn setup_logger() -> Result<(), fern::InitError> {
|
||||||
@ -85,13 +84,7 @@ fn main() {
|
|||||||
|
|
||||||
let pool = pg::create_pool();
|
let pool = pg::create_pool();
|
||||||
|
|
||||||
let ws_pool = pool.clone();
|
|
||||||
let http_pool = pool.clone();
|
|
||||||
let warden_pool = pool.clone();
|
let warden_pool = pool.clone();
|
||||||
let pubsub_pool = pool.clone();
|
|
||||||
|
|
||||||
let (pss, psr) = unbounded();
|
|
||||||
|
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
loop {
|
loop {
|
||||||
let db_connection = warden_pool.get().expect("unable to get db connection");
|
let db_connection = warden_pool.get().expect("unable to get db connection");
|
||||||
@ -102,15 +95,21 @@ fn main() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
spawn(move || loop {
|
let http_pool = pool.clone();
|
||||||
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
|
|
||||||
match pg_listen(pubsub_conn, pss.clone()) {
|
|
||||||
Ok(_) => warn!("pg listen closed"),
|
|
||||||
Err(e) => warn!("pg_listen error {:?}", e),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
spawn(move || net::start(http_pool));
|
spawn(move || net::start(http_pool));
|
||||||
ws::start(ws_pool, psr);
|
|
||||||
info!("server started");
|
// create a clone of the tx so ws handler can tell events
|
||||||
|
// about connection status
|
||||||
|
// TODO store as an Arc<Mutex<Events>> and make cpuN threads
|
||||||
|
let mut events = events::Events::new(pool.clone());
|
||||||
|
let ws_events_tx = events.tx.clone();
|
||||||
|
|
||||||
|
let pg_pool = pool.clone();
|
||||||
|
let pg_events = events.tx.clone();
|
||||||
|
spawn(move || pg::listen(pg_pool, pg_events));
|
||||||
|
spawn(move || events.listen());
|
||||||
|
|
||||||
|
// the main thread becomes this ws listener
|
||||||
|
let ws_pool = pool.clone();
|
||||||
|
websocket::start(ws_pool, ws_events_tx);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@ use router::Router;
|
|||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
// use warden::{warden};
|
// use warden::{warden};
|
||||||
// use pubsub::{pg_listen};
|
// use events::{pg_listen};
|
||||||
// use ws::{connect};
|
// use ws::{connect};
|
||||||
use account;
|
use account;
|
||||||
use pg::PgPool;
|
use pg::PgPool;
|
||||||
@ -260,7 +260,7 @@ const MAX_BODY_LENGTH: usize = 1024 * 1024 * 10;
|
|||||||
|
|
||||||
pub struct State {
|
pub struct State {
|
||||||
pub pool: PgPool,
|
pub pool: PgPool,
|
||||||
// pub pubsub: PubSub,
|
// pub events: Events,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Key for State { type Value = State; }
|
impl Key for State { type Value = State; }
|
||||||
|
|||||||
106
server/src/pg.rs
106
server/src/pg.rs
@ -1,16 +1,53 @@
|
|||||||
use std::env;
|
use std::env;
|
||||||
|
use std::thread::spawn;
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use failure::Error;
|
||||||
|
|
||||||
use r2d2::{Pool};
|
use r2d2::{Pool};
|
||||||
use r2d2::{PooledConnection};
|
use r2d2::{PooledConnection};
|
||||||
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
|
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
|
||||||
|
use fallible_iterator::{FallibleIterator};
|
||||||
|
|
||||||
// use postgres::transaction::Transaction;
|
use crossbeam_channel::{Sender};
|
||||||
|
|
||||||
|
use events::{Event};
|
||||||
|
use account;
|
||||||
|
use game;
|
||||||
|
use instance;
|
||||||
|
use rpc::RpcMessage;
|
||||||
|
|
||||||
pub type Db = PooledConnection<PostgresConnectionManager>;
|
pub type Db = PooledConnection<PostgresConnectionManager>;
|
||||||
pub type PgPool = Pool<PostgresConnectionManager>;
|
pub type PgPool = Pool<PostgresConnectionManager>;
|
||||||
|
|
||||||
const DB_POOL_SIZE: u32 = 20;
|
const DB_POOL_SIZE: u32 = 20;
|
||||||
|
|
||||||
|
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
|
||||||
|
#[serde(rename_all(deserialize = "lowercase"))]
|
||||||
|
enum Table {
|
||||||
|
Accounts,
|
||||||
|
Constructs,
|
||||||
|
Instances,
|
||||||
|
Mtx,
|
||||||
|
Players,
|
||||||
|
Games,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
|
||||||
|
#[serde(rename_all(deserialize = "UPPERCASE"))]
|
||||||
|
enum Action {
|
||||||
|
Insert,
|
||||||
|
Update,
|
||||||
|
Delete,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug,Clone,Serialize,Deserialize)]
|
||||||
|
struct Notification {
|
||||||
|
table: Table,
|
||||||
|
action: Action,
|
||||||
|
id: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create_pool() -> Pool<PostgresConnectionManager> {
|
pub fn create_pool() -> Pool<PostgresConnectionManager> {
|
||||||
let url = env::var("DATABASE_URL")
|
let url = env::var("DATABASE_URL")
|
||||||
@ -24,3 +61,70 @@ pub fn create_pool() -> Pool<PostgresConnectionManager> {
|
|||||||
.build(manager)
|
.build(manager)
|
||||||
.expect("Failed to create pool.")
|
.expect("Failed to create pool.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn handle_notification(n: Notification, pool: &PgPool, events: &Sender<Event>) {
|
||||||
|
info!("pg received notification={:?}", n);
|
||||||
|
|
||||||
|
// bang out a thread to do the slow work of fetching the state from db
|
||||||
|
// the thread will notify events
|
||||||
|
|
||||||
|
let pool = pool.clone();
|
||||||
|
let events = events.clone();
|
||||||
|
spawn(move || {
|
||||||
|
// maybe we need it
|
||||||
|
let db = pool.get().unwrap();
|
||||||
|
let mut tx = db.transaction().unwrap();
|
||||||
|
|
||||||
|
let msg = match n.action {
|
||||||
|
Action::Delete => {
|
||||||
|
warn!("unimplemented delete notification {:?}", n);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Action::Insert => {
|
||||||
|
warn!("unimplemented insert notification {:?}", n);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Action::Update => match n.table {
|
||||||
|
Table::Accounts =>
|
||||||
|
Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))),
|
||||||
|
Table::Instances =>
|
||||||
|
Some(Event::Push(n.id, RpcMessage::InstanceState(instance::instance_get(&mut tx, n.id).unwrap()))),
|
||||||
|
Table::Games =>
|
||||||
|
Some(Event::Push(n.id, RpcMessage::GameState(game::game_get(&mut tx, n.id).unwrap()))),
|
||||||
|
_ => {
|
||||||
|
warn!("unimplemented update notification {:?}", n);
|
||||||
|
None
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
tx.commit().unwrap();
|
||||||
|
|
||||||
|
if let Some(msg) = msg {
|
||||||
|
events.send(msg).unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// this function gets a dedicated connection
|
||||||
|
// because it has to subscribe and listen for notifications
|
||||||
|
pub fn listen(pool: PgPool, events: Sender<Event>) -> Result<(), Error> {
|
||||||
|
let db = pool.get()?;
|
||||||
|
db.execute("LISTEN events;", &[])?;
|
||||||
|
info!("pg listening");
|
||||||
|
|
||||||
|
let notifications = db.notifications();
|
||||||
|
let mut n_iter = notifications.blocking_iter();
|
||||||
|
|
||||||
|
// main event loop, checks pg and checks messages
|
||||||
|
loop {
|
||||||
|
// check notifications
|
||||||
|
let n = n_iter.next()?;
|
||||||
|
if let Some(n) = n {
|
||||||
|
match serde_json::from_str::<Notification>(&n.payload) {
|
||||||
|
Ok(notification) => handle_notification(notification, &pool, &events),
|
||||||
|
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1,92 +0,0 @@
|
|||||||
// Db Commons
|
|
||||||
use uuid::Uuid;
|
|
||||||
use fallible_iterator::{FallibleIterator};
|
|
||||||
|
|
||||||
use failure::Error;
|
|
||||||
use failure::err_msg;
|
|
||||||
|
|
||||||
use crossbeam_channel::{Sender};
|
|
||||||
|
|
||||||
use pg::{Db};
|
|
||||||
use account;
|
|
||||||
use game;
|
|
||||||
use instance;
|
|
||||||
|
|
||||||
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
|
|
||||||
#[serde(rename_all(deserialize = "lowercase"))]
|
|
||||||
enum Table {
|
|
||||||
Accounts,
|
|
||||||
Constructs,
|
|
||||||
Instances,
|
|
||||||
Mtx,
|
|
||||||
Players,
|
|
||||||
Games,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
|
|
||||||
#[serde(rename_all(deserialize = "UPPERCASE"))]
|
|
||||||
enum Action {
|
|
||||||
Insert,
|
|
||||||
Update,
|
|
||||||
Delete,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
|
||||||
struct Notification {
|
|
||||||
table: Table,
|
|
||||||
action: Action,
|
|
||||||
id: Uuid,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug,Clone)]
|
|
||||||
pub enum Message {
|
|
||||||
Account(account::Account),
|
|
||||||
Game(game::Game),
|
|
||||||
Instance(instance::Instance),
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_notification(n: Notification, db: &Db, pss: &Sender<Message>) -> Result<(), Error> {
|
|
||||||
info!("pubsub received notification notification={:?}", n);
|
|
||||||
|
|
||||||
// maybe we need it
|
|
||||||
let mut tx = db.transaction()?;
|
|
||||||
|
|
||||||
let msg = match n.action {
|
|
||||||
Action::Delete => return Err(format_err!("unimplemented delete notification {:?}", n)),
|
|
||||||
Action::Insert => return Err(format_err!("unimplemented insert notification {:?}", n)),
|
|
||||||
Action::Update => match n.table {
|
|
||||||
Table::Accounts => Message::Account(account::select(db, n.id)?),
|
|
||||||
Table::Instances => Message::Instance(instance::instance_get(&mut tx, n.id)?),
|
|
||||||
Table::Games => Message::Game(game::game_get(&mut tx, n.id)?),
|
|
||||||
_ => return Err(format_err!("unimplemented update notification {:?}", n)),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
tx.commit()?;
|
|
||||||
|
|
||||||
match pss.try_send(msg.clone()) {
|
|
||||||
Ok(()) => info!("pubsub message sent message={:?}", msg),
|
|
||||||
Err(e) => warn!("pubsub delivery failure err={:?}", e),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pg_listen(db: Db, pss: Sender<Message>) -> Result<(), Error> {
|
|
||||||
db.execute("LISTEN events;", &[])?;
|
|
||||||
info!("pubsub listening");
|
|
||||||
let notifications = db.notifications();
|
|
||||||
let mut n_iter = notifications.blocking_iter();
|
|
||||||
loop {
|
|
||||||
let n = n_iter.next().unwrap();
|
|
||||||
if let Some(n) = n {
|
|
||||||
match serde_json::from_str::<Notification>(&n.payload) {
|
|
||||||
Ok(notification) => match handle_notification(notification, &db, &pss) {
|
|
||||||
Ok(()) => (),
|
|
||||||
Err(e) => warn!("{:?}", e),
|
|
||||||
}
|
|
||||||
Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", n.payload, e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -7,7 +7,6 @@ use uuid::Uuid;
|
|||||||
use failure::Error;
|
use failure::Error;
|
||||||
use failure::err_msg;
|
use failure::err_msg;
|
||||||
|
|
||||||
use ws::{Ws};
|
|
||||||
use pg::{Db};
|
use pg::{Db};
|
||||||
use construct::{Construct};
|
use construct::{Construct};
|
||||||
use game::{Game, game_state, game_skill, game_ready};
|
use game::{Game, game_state, game_skill, game_ready};
|
||||||
@ -34,6 +33,8 @@ pub enum RpcMessage {
|
|||||||
Pong(()),
|
Pong(()),
|
||||||
|
|
||||||
DevResolutions(Resolutions),
|
DevResolutions(Resolutions),
|
||||||
|
|
||||||
|
Error(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
#[derive(Debug,Clone,Serialize,Deserialize)]
|
||||||
@ -68,7 +69,7 @@ enum RpcRequest {
|
|||||||
VboxReclaim { instance_id: Uuid, index: usize },
|
VboxReclaim { instance_id: Uuid, index: usize },
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn receive(data: Vec<u8>, db: &Db, _client: &mut Ws, begin: Instant, account: &Option<Account>) -> Result<RpcMessage, Error> {
|
pub fn receive(data: Vec<u8>, db: &Db, begin: Instant, account: &Option<Account>) -> Result<RpcMessage, Error> {
|
||||||
// cast the msg to this type to receive method name
|
// cast the msg to this type to receive method name
|
||||||
match from_slice::<RpcRequest>(&data) {
|
match from_slice::<RpcRequest>(&data) {
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
|
|||||||
175
server/src/websocket.rs
Normal file
175
server/src/websocket.rs
Normal file
@ -0,0 +1,175 @@
|
|||||||
|
use std::time::{Instant};
|
||||||
|
use std::thread::spawn;
|
||||||
|
use std::str;
|
||||||
|
|
||||||
|
use rand::prelude::*;
|
||||||
|
|
||||||
|
use serde_cbor::to_vec;
|
||||||
|
|
||||||
|
use cookie::Cookie;
|
||||||
|
|
||||||
|
use crossbeam_channel::{unbounded, Sender as CbSender};
|
||||||
|
use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response};
|
||||||
|
|
||||||
|
use account;
|
||||||
|
use account::{Account};
|
||||||
|
use pg::{PgPool};
|
||||||
|
use events::Event;
|
||||||
|
|
||||||
|
use rpc::{RpcMessage};
|
||||||
|
use rpc;
|
||||||
|
use mtx;
|
||||||
|
use net::TOKEN_HEADER;
|
||||||
|
|
||||||
|
struct Connection {
|
||||||
|
pub id: usize,
|
||||||
|
pub ws: CbSender<RpcMessage>,
|
||||||
|
pool: PgPool,
|
||||||
|
account: Option<Account>,
|
||||||
|
events: CbSender<Event>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// we unwrap everything in here cause really
|
||||||
|
// we don't care if this panics
|
||||||
|
// it's run in a thread so it's supposed to bail
|
||||||
|
// when it encounters errors
|
||||||
|
impl Handler for Connection {
|
||||||
|
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
|
||||||
|
info!("websocket connected account={:?}", self.account);
|
||||||
|
|
||||||
|
// tell events we have connected
|
||||||
|
self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
|
||||||
|
|
||||||
|
// if user logged in do some prep work
|
||||||
|
if let Some(ref a) = self.account {
|
||||||
|
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
|
||||||
|
self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
|
||||||
|
|
||||||
|
let db = self.pool.get().unwrap();
|
||||||
|
let mut tx = db.transaction().unwrap();
|
||||||
|
|
||||||
|
// send account constructs
|
||||||
|
let account_constructs = account::account_constructs(&mut tx, a).unwrap();
|
||||||
|
self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap();
|
||||||
|
|
||||||
|
// get account instances
|
||||||
|
// and send them to the client
|
||||||
|
let account_instances = account::account_instances(&mut tx, a).unwrap();
|
||||||
|
self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap();
|
||||||
|
|
||||||
|
let shop = mtx::account_shop(&mut tx, &a).unwrap();
|
||||||
|
self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap();
|
||||||
|
|
||||||
|
// tx should do nothing
|
||||||
|
tx.commit().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_message(&mut self, msg: Message) -> Result<()> {
|
||||||
|
match msg {
|
||||||
|
Message::Binary(msg) => {
|
||||||
|
let begin = Instant::now();
|
||||||
|
let db_connection = self.pool.get().unwrap();
|
||||||
|
|
||||||
|
match rpc::receive(msg, &db_connection, begin, &self.account) {
|
||||||
|
Ok(reply) => {
|
||||||
|
// if the user queries the state of something
|
||||||
|
// we tell events to push updates to them
|
||||||
|
match reply {
|
||||||
|
RpcMessage::AccountState(ref v) =>
|
||||||
|
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
|
||||||
|
RpcMessage::GameState(ref v) =>
|
||||||
|
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
|
||||||
|
RpcMessage::InstanceState(ref v) =>
|
||||||
|
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
|
||||||
|
_ => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.ws.send(reply).unwrap();
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
warn!("{:?}", e);
|
||||||
|
self.ws.send(RpcMessage::Error(e.to_string())).unwrap();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
},
|
||||||
|
_ => (),
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_close(&mut self, _: CloseCode, _: &str) {
|
||||||
|
info!("websocket disconnected account={:?}", self.account);
|
||||||
|
self.events.send(Event::Disconnect(self.id)).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_request(&mut self, req: &Request) -> Result<Response> {
|
||||||
|
let res = Response::from_request(req)?;
|
||||||
|
|
||||||
|
if let Some(cl) = req.header("Cookie") {
|
||||||
|
let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()));
|
||||||
|
let cookie_list = match str::from_utf8(cl) {
|
||||||
|
Ok(cl) => cl,
|
||||||
|
Err(_) => return unauth(),
|
||||||
|
};
|
||||||
|
|
||||||
|
for s in cookie_list.split(";").map(|s| s.trim()) {
|
||||||
|
let cookie = match Cookie::parse(s) {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(_) => return unauth(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// got auth token
|
||||||
|
if cookie.name() == TOKEN_HEADER {
|
||||||
|
let db = self.pool.get().unwrap();
|
||||||
|
match account::from_token(&db, cookie.value().to_string()) {
|
||||||
|
Ok(a) => self.account = Some(a),
|
||||||
|
Err(_) => return unauth(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn start(pool: PgPool, events_tx: CbSender<Event>) {
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
listen("127.0.0.1:40055", move |out| {
|
||||||
|
|
||||||
|
// we give the tx half to the connection object
|
||||||
|
// which in turn passes a clone to the events system
|
||||||
|
// the rx half goes into a thread where it waits for messages
|
||||||
|
// that need to be delivered to the client
|
||||||
|
// both the ws message handler and the events thread must use
|
||||||
|
// this channel to send messages
|
||||||
|
let (tx, rx) = unbounded::<RpcMessage>();
|
||||||
|
|
||||||
|
spawn(move || {
|
||||||
|
loop {
|
||||||
|
match rx.recv() {
|
||||||
|
Ok(n) => {
|
||||||
|
let response = to_vec(&n).unwrap();
|
||||||
|
out.send(Message::Binary(response)).unwrap();
|
||||||
|
}
|
||||||
|
// we done
|
||||||
|
Err(_e) => {
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Connection {
|
||||||
|
id: rng.gen::<usize>(),
|
||||||
|
account: None,
|
||||||
|
ws: tx,
|
||||||
|
pool: pool.clone(),
|
||||||
|
events: events_tx.clone(),
|
||||||
|
}
|
||||||
|
}).unwrap();
|
||||||
|
}
|
||||||
266
server/src/ws.rs
266
server/src/ws.rs
@ -1,266 +0,0 @@
|
|||||||
use std::time::{Instant};
|
|
||||||
use std::net::{TcpStream, TcpListener};
|
|
||||||
use std::thread::{spawn};
|
|
||||||
use std::str;
|
|
||||||
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use cookie::Cookie;
|
|
||||||
|
|
||||||
use tungstenite::Message::Binary;
|
|
||||||
use tungstenite::handshake::server::{Request, ErrorResponse};
|
|
||||||
use tungstenite::handshake::HandshakeRole;
|
|
||||||
use tungstenite::http::StatusCode;
|
|
||||||
use tungstenite::protocol::WebSocket;
|
|
||||||
use tungstenite::util::NonBlockingResult;
|
|
||||||
use tungstenite::{accept_hdr};
|
|
||||||
|
|
||||||
use crossbeam_channel::{unbounded, Receiver};
|
|
||||||
|
|
||||||
use serde_cbor::{to_vec};
|
|
||||||
|
|
||||||
use failure::Error;
|
|
||||||
use failure::{err_msg, format_err};
|
|
||||||
|
|
||||||
use net::TOKEN_HEADER;
|
|
||||||
use rpc;
|
|
||||||
use rpc::{RpcMessage};
|
|
||||||
|
|
||||||
use mtx;
|
|
||||||
use pg::PgPool;
|
|
||||||
use account;
|
|
||||||
use account::Account;
|
|
||||||
use pubsub::Message;
|
|
||||||
|
|
||||||
pub type Ws = WebSocket<TcpStream>;
|
|
||||||
|
|
||||||
#[derive(Debug,Clone,Serialize)]
|
|
||||||
struct RpcError {
|
|
||||||
err: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Subscriptions {
|
|
||||||
account: Option<Uuid>,
|
|
||||||
game: Option<Uuid>,
|
|
||||||
instance: Option<Uuid>,
|
|
||||||
// account_instances: Vec<Uuid>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Subscriptions {
|
|
||||||
fn new(ws_pool: &PgPool, account: &Option<Account>, ws: &mut Ws) -> Result<Subscriptions, Error> {
|
|
||||||
if let Some(a) = account {
|
|
||||||
let db = ws_pool.get()?;
|
|
||||||
let mut tx = db.transaction()?;
|
|
||||||
|
|
||||||
// send account constructs
|
|
||||||
let account_constructs = account::account_constructs(&mut tx, a)?;
|
|
||||||
ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?;
|
|
||||||
|
|
||||||
// get account instances
|
|
||||||
// and send them to the client
|
|
||||||
let account_instances = account::account_instances(&mut tx, a)?;
|
|
||||||
// let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>();
|
|
||||||
ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?;
|
|
||||||
|
|
||||||
// get players
|
|
||||||
// add to games
|
|
||||||
|
|
||||||
tx.commit()?;
|
|
||||||
|
|
||||||
return Ok(Subscriptions {
|
|
||||||
account: Some(a.id),
|
|
||||||
game: None,
|
|
||||||
instance: None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Subscriptions {
|
|
||||||
account: None,
|
|
||||||
game: None,
|
|
||||||
instance: None
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> {
|
|
||||||
match msg {
|
|
||||||
RpcMessage::AccountState(a) => self.account = Some(a.id),
|
|
||||||
RpcMessage::InstanceState(i) => self.instance = Some(i.id),
|
|
||||||
RpcMessage::GameState(g) => self.game = Some(g.id),
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
// info!("subscriptions updated {:?}", self);
|
|
||||||
|
|
||||||
Ok(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) {
|
|
||||||
if let Some(msg) = match m {
|
|
||||||
Message::Account(a) => {
|
|
||||||
match subs.account {
|
|
||||||
Some(wsa) => match wsa == a.id {
|
|
||||||
true => Some(rpc::RpcMessage::AccountState(a)),
|
|
||||||
false => None,
|
|
||||||
},
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Message::Instance(i) => {
|
|
||||||
match subs.instance {
|
|
||||||
Some(ci) => match ci == i.id {
|
|
||||||
true => Some(rpc::RpcMessage::InstanceState(i)),
|
|
||||||
false => None,
|
|
||||||
},
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Message::Game(g) => {
|
|
||||||
match subs.game {
|
|
||||||
Some(cg) => match cg == g.id {
|
|
||||||
true => Some(rpc::RpcMessage::GameState(g)),
|
|
||||||
false => None,
|
|
||||||
},
|
|
||||||
None => None,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// _ => None,
|
|
||||||
} {
|
|
||||||
ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start(pool: PgPool, psr: Receiver<Message>) {
|
|
||||||
let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
|
|
||||||
for stream in ws_server.incoming() {
|
|
||||||
let ws_pool = pool.clone();
|
|
||||||
let ws_psr = psr.clone();
|
|
||||||
spawn(move || {
|
|
||||||
let (acc_s, acc_r) = unbounded();
|
|
||||||
|
|
||||||
let nb_stream = stream.unwrap();
|
|
||||||
nb_stream.set_nonblocking(true).unwrap();
|
|
||||||
|
|
||||||
// search through the ws request for the auth cookie
|
|
||||||
let cb = |req: &Request| {
|
|
||||||
let err = || ErrorResponse {
|
|
||||||
error_code: StatusCode::FORBIDDEN,
|
|
||||||
headers: None,
|
|
||||||
body: Some("Unauthorized".into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(cl) = req.headers.find_first("Cookie") {
|
|
||||||
let cookie_list = str::from_utf8(cl).or(Err(err()))?;
|
|
||||||
|
|
||||||
for s in cookie_list.split(";").map(|s| s.trim()) {
|
|
||||||
let cookie = Cookie::parse(s).or(Err(err()))?;
|
|
||||||
|
|
||||||
// got auth token
|
|
||||||
if cookie.name() == TOKEN_HEADER {
|
|
||||||
acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
};
|
|
||||||
acc_s.send(None).unwrap();
|
|
||||||
Ok(None)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut websocket = accept_hdr(nb_stream, cb).unwrap();
|
|
||||||
|
|
||||||
// get a copy of the account
|
|
||||||
let account = match acc_r.recv().unwrap() {
|
|
||||||
Some(t) => {
|
|
||||||
let db = ws_pool.get()
|
|
||||||
.expect("unable to get db connection");
|
|
||||||
|
|
||||||
|
|
||||||
match account::from_token(&db, t) {
|
|
||||||
Ok(a) => {
|
|
||||||
let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap();
|
|
||||||
websocket.write_message(Binary(state)).unwrap();
|
|
||||||
|
|
||||||
let mut tx = db.transaction().unwrap();
|
|
||||||
let shop = mtx::account_shop(&mut tx, &a).unwrap();
|
|
||||||
let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap();
|
|
||||||
|
|
||||||
websocket.write_message(Binary(shop)).unwrap();
|
|
||||||
|
|
||||||
// tx doesn't change anything
|
|
||||||
tx.commit().unwrap();
|
|
||||||
|
|
||||||
Some(a)
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
warn!("{:?}", e);
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut subs = match Subscriptions::new(&ws_pool, &account, &mut websocket) {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("subscriptions error err={:?}", e);
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match websocket.read_message().no_block() {
|
|
||||||
Ok(msg) => {
|
|
||||||
if let Some(msg) = msg {
|
|
||||||
match msg {
|
|
||||||
Binary(data) => {
|
|
||||||
let begin = Instant::now();
|
|
||||||
let db_connection = ws_pool.get()
|
|
||||||
.expect("unable to get db connection");
|
|
||||||
|
|
||||||
match rpc::receive(data, &db_connection, &mut websocket, begin, &account) {
|
|
||||||
Ok(reply) => {
|
|
||||||
let response = to_vec(&reply)
|
|
||||||
.expect("failed to serialize response");
|
|
||||||
|
|
||||||
if let Err(e) = websocket.write_message(Binary(response)) {
|
|
||||||
// connection closed
|
|
||||||
warn!("{:?}", e);
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
subs.update(&reply).unwrap();
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
warn!("{:?}", e);
|
|
||||||
let response = to_vec(&RpcError { err: e.to_string() })
|
|
||||||
.expect("failed to serialize error response");
|
|
||||||
|
|
||||||
if let Err(e) = websocket.write_message(Binary(response)) {
|
|
||||||
// connection closed
|
|
||||||
warn!("{:?}", e);
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match ws_psr.try_recv() {
|
|
||||||
Ok(n) => handle_message(&subs, n, &mut websocket),
|
|
||||||
Err(_) => (),
|
|
||||||
};
|
|
||||||
},
|
|
||||||
// connection is closed
|
|
||||||
Err(e) => {
|
|
||||||
warn!("{:?}", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user