mnml/server/src/rpc.rs
2019-11-12 16:32:19 +11:00

495 lines
19 KiB
Rust

use std::time::{Instant};
use std::thread::{spawn};
use std::str;
use uuid::Uuid;
use rand::prelude::*;
use failure::Error;
use failure::err_msg;
use serde_cbor::{from_slice, to_vec};
use cookie::Cookie;
use stripe::{Client as StripeClient, Subscription};
use crossbeam_channel::{unbounded, Sender as CbSender};
use ws::{Builder, CloseCode, Message, Handler, Request, Response, Settings, Sender as WsSender};
use account::{Account};
use account;
use construct::{Construct};
use events::{Event};
use game::{Game, game_state, game_skill, game_skill_clear, game_ready, game_offer_draw};
use instance::{Instance, ChatState, instance_state, instance_practice, instance_ready, instance_abandon, demo};
use item::{Item, ItemInfoCtr, item_info};
use mtx;
use mail;
use player::{Player};
use payments;
use mail::Email;
use pg::{Db};
use pg::{PgPool};
use skill::{Skill, dev_resolve, Resolutions};
use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
use http::{AUTH_CLEAR, TOKEN_HEADER};
/// Messages sent from the server to a websocket client.
///
/// Values are queued on `Connection::ws`, encoded with `serde_cbor::to_vec`
/// and written out as binary frames by the writer thread spawned in `start`.
/// Variants described as "not constructed in this file" are presumably
/// produced by the events system, which holds a clone of the same channel.
#[derive(Debug,Clone,Serialize)]
pub enum RpcMessage {
/// Account details. Sent on connect for logged-in users and as the reply to
/// `AccountState` and `MtxAccountApply`.
AccountState(Account),
/// Constructs owned by the account. Sent on connect and as the reply to
/// `AccountConstructs`.
AccountConstructs(Vec<Construct>),
/// The account's team. Sent on connect and as the reply to `AccountSetTeam`
/// and `MtxConstructApply`.
AccountTeam(Vec<Construct>),
/// Instances of the account. Each instance is redacted for the receiving
/// account in `Connection::send`.
AccountInstances(Vec<Instance>),
/// Shop state. Sent on connect and as the reply to `MtxBuy`.
AccountShop(mtx::Shop),
/// Demo players, sent on connect when the connection has no account.
Demo(Vec<Player>),
/// Reply to `MtxConstructSpawn`.
ConstructSpawn(Construct),
/// Game state. Redacted for the receiving account in `Connection::send`.
GameState(Game),
/// Reply to `ItemInfo`.
ItemInfo(ItemInfoCtr),
/// Instance state. Redacted for the receiving account in `Connection::send`.
InstanceState(Instance),
/// Not constructed in this file.
InstanceChat(ChatState),
/// The account's chat wheel, sent on connect for logged-in users.
ChatWheel(Vec<String>),
/// Reply to `EmailState`.
EmailState(Option<Email>),
/// Reply to `SubscriptionState` and `SubscriptionEnding`.
SubscriptionState(Option<Subscription>),
/// Reply to `Ping`.
Pong(()),
/// Reply to `DevResolve`.
DevResolutions(Resolutions),
/// Acknowledges `InstanceQueue` after it was forwarded to the events system.
QueueRequested(()),
/// Not constructed in this file.
QueueJoined(()),
/// Not constructed in this file.
QueueLeft(()),
/// Not constructed in this file.
QueueFound(()),
/// Acknowledges `InstanceInvite` after it was forwarded to the events system.
InviteRequested(()),
/// Not constructed in this file; presumably carries the invite code that
/// `InstanceJoin` accepts (TODO confirm against the events system).
Invite(String),
/// Acknowledges `InstanceJoin` after it was forwarded to the events system.
Joining(()),
/// Acknowledges `InstanceLeave` and `InstanceChat`.
Processing(()),
/// Text of an error raised while handling a request.
Error(String),
}
/// Requests sent from a websocket client to the server.
///
/// Each binary frame is decoded into one of these with
/// `serde_cbor::from_slice` in `Connection::receive`, which also decides how
/// the request is dispatched (see the group comments below).
#[derive(Debug,Clone,Serialize,Deserialize)]
pub enum RpcRequest {
// Answered without an account and without touching the database.
Ping {},
ItemInfo {},
DevResolve { a: Uuid, b: Uuid, skill: Skill },
// Everything below requires an authenticated account.
// Unless noted otherwise the request runs inside a database transaction.
MtxConstructApply { mtx: mtx::MtxVariant, construct_id: Uuid, name: String },
MtxConstructSpawn { },
MtxAccountApply { mtx: mtx::MtxVariant },
MtxBuy { mtx: mtx::MtxVariant },
GameState { id: Uuid },
GameReady { id: Uuid },
GameSkill { game_id: Uuid, construct_id: Uuid, target_construct_id: Uuid, skill: Skill },
GameSkillClear { game_id: Uuid },
GameOfferDraw { game_id: Uuid },
AccountState {},
// NOTE: the handler for this variant is commented out in
// `Connection::receive`, so it currently yields an "unknown request" error.
AccountShop {},
AccountInstances {},
AccountConstructs {},
AccountSetTeam { ids: Vec<Uuid> },
SubscriptionEnding { ending: bool },
SubscriptionState {},
EmailState {},
// Forwarded to the events channel and acknowledged immediately
// (no transaction): invite, join, queue, leave.
InstanceInvite {},
InstanceJoin { code: String },
InstanceQueue {},
InstanceLeave {},
InstancePractice {},
InstanceAbandon { instance_id: Uuid },
InstanceReady { instance_id: Uuid },
InstanceState { instance_id: Uuid },
// Also forwarded to the events channel; requires `account.subscribed` and
// `index` must be a valid position in the account's chat wheel.
InstanceChat { instance_id: Uuid, index: usize },
VboxAccept { instance_id: Uuid, group: usize, index: usize },
VboxAcceptEquip { instance_id: Uuid, group: usize, index: usize, construct_id: Uuid },
VboxDiscard { instance_id: Uuid },
VboxCombine { instance_id: Uuid, indices: Vec<usize> },
VboxApply { instance_id: Uuid, construct_id: Uuid, index: usize },
VboxUnequip { instance_id: Uuid, construct_id: Uuid, target: Item },
VboxUnequipApply { instance_id: Uuid, construct_id: Uuid, target: Item, target_construct_id: Uuid },
VboxReclaim { instance_id: Uuid, index: usize },
}
/// State kept for one websocket client; implements `ws::Handler`.
struct Connection {
/// Connection identifier, drawn at random in `start` and attached to every
/// event sent on `events`.
pub id: usize,
/// Outgoing message queue. The receiving half is drained by the writer
/// thread spawned in `start`; a clone is handed to the events system on open.
pub ws: CbSender<RpcMessage>,
/// Database pool; a connection is checked out per incoming message.
pool: PgPool,
/// Stripe client used by the subscription requests.
stripe: StripeClient,
/// Authenticated account, if any. Set from the token cookie in `on_request`
/// and replaced whenever an `AccountState` reply is produced in `on_message`.
account: Option<Account>,
/// Channel to the events system.
events: CbSender<Event>,
}
impl Connection {
    /// Decodes a CBOR-encoded `RpcRequest` from `data` and computes its reply.
    ///
    /// `Ping`, `ItemInfo` and `DevResolve` are answered without an account.
    /// Every other request requires `self.account` to be set. Queue, invite,
    /// join, leave and chat requests are forwarded to the events channel and
    /// acknowledged immediately; the remaining requests run inside a
    /// transaction on `db` that is committed once the handler has produced
    /// its result.
    ///
    /// `begin` is the instant the message was received; it is only used to
    /// log how long the request took.
    ///
    /// # Errors
    ///
    /// * `"invalid message"` when `data` does not decode to an `RpcRequest`.
    /// * `"auth required"` when an account is needed but missing.
    /// * Any error raised by a handler, the events channel or the database.
    ///   A handler that fails returns before `commit` is reached.
    fn receive(&self, data: Vec<u8>, db: &Db, begin: Instant) -> Result<RpcMessage, Error> {
        // cast the msg to this type to receive method name
        let v = match from_slice::<RpcRequest>(&data) {
            Ok(v) => v,
            Err(e) => {
                // the decode error is logged but not leaked to the client
                warn!("{:?}", e);
                return Err(err_msg("invalid message"));
            },
        };

        // non authenticated
        // non transactional reqs
        match v {
            RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())),
            RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())),
            RpcRequest::DevResolve { a, b, skill } =>
                return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))),
            _ => (),
        };

        // check for authorization now
        let account = self.account
            .as_ref()
            .ok_or_else(|| err_msg("auth required"))?;

        // `v` is consumed by the dispatch below; keep a copy for logging
        let request = v.clone();

        let response = match v {
            // evented but authorization required
            RpcRequest::InstanceQueue {} => {
                self.events.send(Event::Queue(self.id))?;
                Ok(RpcMessage::QueueRequested(()))
            },
            RpcRequest::InstanceInvite {} => {
                self.events.send(Event::Invite(self.id))?;
                Ok(RpcMessage::InviteRequested(()))
            },
            RpcRequest::InstanceJoin { code } => {
                self.events.send(Event::Join(self.id, code))?;
                Ok(RpcMessage::Joining(()))
            },
            RpcRequest::InstanceLeave {} => {
                self.events.send(Event::Leave(self.id))?;
                Ok(RpcMessage::Processing(()))
            },
            RpcRequest::InstanceChat { instance_id, index } => {
                if !account.subscribed {
                    return Err(err_msg("subscribe to unlock chat"))
                }

                // clients send an index into their chat wheel, never free text
                let wheel = account::chat_wheel(&db, account.id)?;
                if let Some(c) = wheel.get(index) {
                    self.events.send(Event::Chat(self.id, instance_id, c.to_string()))?;
                } else {
                    return Err(err_msg("invalid chat index"));
                }

                Ok(RpcMessage::Processing(()))
            },
            _ => {
                // all good, let's make a tx and process
                let mut tx = db.transaction()?;

                let res = match v {
                    RpcRequest::AccountState {} =>
                        Ok(RpcMessage::AccountState(account.clone())),
                    RpcRequest::AccountConstructs {} =>
                        Ok(RpcMessage::AccountConstructs(account::constructs(&mut tx, &account)?)),
                    RpcRequest::AccountInstances {} =>
                        Ok(RpcMessage::AccountInstances(account::account_instances(&mut tx, account)?)),
                    RpcRequest::AccountSetTeam { ids } =>
                        Ok(RpcMessage::AccountTeam(account::set_team(&mut tx, &account, ids)?)),

                    RpcRequest::EmailState {} =>
                        Ok(RpcMessage::EmailState(mail::select_account(&db, account.id)?)),
                    RpcRequest::SubscriptionState {} =>
                        Ok(RpcMessage::SubscriptionState(
                            payments::account_subscription(&db, &self.stripe, &account)?)),

                    // NOTE: RpcRequest::AccountShop has no handler here and
                    // therefore ends up in the "unknown request" arm below.

                    RpcRequest::GameState { id } =>
                        Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)),
                    RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } =>
                        Ok(RpcMessage::GameState(
                            game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)),
                    RpcRequest::GameSkillClear { game_id } =>
                        Ok(RpcMessage::GameState(game_skill_clear(&mut tx, account, game_id)?)),
                    RpcRequest::GameReady { id } =>
                        Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)),
                    RpcRequest::GameOfferDraw { game_id } =>
                        Ok(RpcMessage::GameState(game_offer_draw(&mut tx, account, game_id)?)),

                    RpcRequest::InstancePractice {} =>
                        Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)),

                    // these can return GameState or InstanceState
                    RpcRequest::InstanceReady { instance_id } =>
                        Ok(instance_ready(&mut tx, account, instance_id)?),
                    RpcRequest::InstanceState { instance_id } =>
                        Ok(instance_state(&mut tx, instance_id)?),
                    RpcRequest::InstanceAbandon { instance_id } =>
                        Ok(instance_abandon(&mut tx, account, instance_id)?),

                    RpcRequest::VboxAccept { instance_id, group, index } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_accept(&mut tx, account, instance_id, group, index, None)?)),
                    RpcRequest::VboxAcceptEquip { instance_id, group, index, construct_id } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_accept(&mut tx, account, instance_id, group, index, Some(construct_id))?)),
                    RpcRequest::VboxApply { instance_id, construct_id, index } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_apply(&mut tx, account, instance_id, construct_id, index)?)),
                    RpcRequest::VboxCombine { instance_id, indices } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_combine(&mut tx, account, instance_id, indices)?)),
                    RpcRequest::VboxDiscard { instance_id } =>
                        Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)),
                    RpcRequest::VboxReclaim { instance_id, index } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_reclaim(&mut tx, account, instance_id, index)?)),
                    RpcRequest::VboxUnequip { instance_id, construct_id, target } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_unequip(&mut tx, account, instance_id, construct_id, target, None)?)),
                    RpcRequest::VboxUnequipApply { instance_id, construct_id, target, target_construct_id } =>
                        Ok(RpcMessage::InstanceState(
                            vbox_unequip(&mut tx, account, instance_id, construct_id, target, Some(target_construct_id))?)),

                    RpcRequest::MtxConstructSpawn {} =>
                        Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)),
                    RpcRequest::MtxConstructApply { mtx, construct_id, name } =>
                        Ok(RpcMessage::AccountTeam(mtx::apply(&mut tx, account, mtx, construct_id, name)?)),
                    RpcRequest::MtxAccountApply { mtx } =>
                        Ok(RpcMessage::AccountState(mtx::account_apply(&mut tx, account, mtx)?)),
                    RpcRequest::MtxBuy { mtx } =>
                        Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)),

                    RpcRequest::SubscriptionEnding { ending } =>
                        Ok(RpcMessage::SubscriptionState(
                            payments::subscription_ending(&mut tx, &self.stripe, account, ending)?)),

                    _ => Err(format_err!("unknown request request={:?}", request)),
                };

                tx.commit()?;
                res
            }
        };

        // only reached when no handler bailed out early with `?`
        info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed());
        response
    }

    /// Queues `msg` for delivery to this client.
    ///
    /// This is where last minute processing happens: when the connection is
    /// authenticated, instance and game payloads are redacted for the
    /// receiving account before they are queued.
    ///
    /// # Errors
    ///
    /// Returns an error when the receiving half of the outgoing channel has
    /// been dropped (i.e. the writer thread is gone). Previously this case
    /// panicked inside `send` even though the signature promises a `Result`.
    fn send(&self, msg: RpcMessage) -> Result<(), Error> {
        let msg = match self.account {
            Some(ref a) => match msg {
                RpcMessage::InstanceState(v) => RpcMessage::InstanceState(v.redact(a.id)),
                RpcMessage::AccountInstances(v) =>
                    RpcMessage::AccountInstances(v.into_iter().map(|i| i.redact(a.id)).collect()),
                RpcMessage::GameState(v) => RpcMessage::GameState(v.redact(a.id)),
                _ => msg,
            },
            None => msg,
        };

        // an unbounded crossbeam channel only fails to send once every
        // receiver is gone; surface that to the caller instead of panicking
        self.ws
            .send(msg)
            .map_err(|_| err_msg("client channel disconnected"))
    }
}
// Fallible calls in this impl are unwrapped on purpose: per the original
// authors the handler runs in a thread and is supposed to bail out (panic)
// as soon as it encounters an error instead of trying to recover.
impl Handler for Connection {
    /// Registers the new connection with the events system and pushes the
    /// initial state to the client: account data for logged-in users, the
    /// demo players otherwise.
    fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
        info!("websocket connected account={:?}", self.account);

        // tell events we have connected
        let connected = Event::Connect(self.id, self.account.clone(), self.ws.clone());
        self.events.send(connected).unwrap();

        // anonymous visitors only get the demo and we are done
        let acct = match self.account {
            Some(ref acct) => acct,
            None => {
                self.send(RpcMessage::Demo(demo().unwrap())).unwrap();
                return Ok(());
            },
        };

        // user is logged in, do some prep work
        self.send(RpcMessage::AccountState(acct.clone())).unwrap();
        self.events.send(Event::Subscribe(self.id, acct.id)).unwrap();

        // check if they have an image that needs to be generated
        account::img_check(&acct).unwrap();

        let db = self.pool.get().unwrap();
        let mut tx = db.transaction().unwrap();

        // constructs owned by the account
        let constructs = account::constructs(&mut tx, acct).unwrap();
        self.send(RpcMessage::AccountConstructs(constructs)).unwrap();

        // instances the account is part of
        let instances = account::account_instances(&mut tx, acct).unwrap();
        self.send(RpcMessage::AccountInstances(instances)).unwrap();

        let shop = mtx::account_shop(&mut tx, &acct).unwrap();
        self.send(RpcMessage::AccountShop(shop)).unwrap();

        let team = account::team(&mut tx, &acct).unwrap();
        self.send(RpcMessage::AccountTeam(team)).unwrap();

        let wheel = account::chat_wheel(&db, acct.id).unwrap();
        self.send(RpcMessage::ChatWheel(wheel)).unwrap();

        if let Some(instance) = account::tutorial(&mut tx, &acct).unwrap() {
            self.send(RpcMessage::InstanceState(instance)).unwrap();
        }

        // tx should do nothing
        tx.commit().unwrap();

        Ok(())
    }

    /// Handles one incoming frame. Binary frames are passed to
    /// `Connection::receive`; the reply (or the error text) is sent back to
    /// the client. All other frame types are ignored.
    fn on_message(&mut self, msg: Message) -> ws::Result<()> {
        let payload = match msg {
            Message::Binary(bytes) => bytes,
            _ => return Ok(()),
        };

        let begin = Instant::now();
        let db_connection = self.pool.get().unwrap();

        let reply = match self.receive(payload, &db_connection, begin) {
            Ok(reply) => reply,
            Err(e) => {
                warn!("{:?}", e);
                self.send(RpcMessage::Error(e.to_string())).unwrap();
                return Ok(());
            },
        };

        // if the user queries the state of something
        // we tell events to push updates to them
        let subscribe_to = match reply {
            RpcMessage::AccountState(ref v) => {
                // keep the cached account in step with what the client sees
                self.account = Some(v.clone());
                Some(v.id)
            },
            RpcMessage::GameState(ref v) => Some(v.id),
            RpcMessage::InstanceState(ref v) => Some(v.id),
            _ => None,
        };

        if let Some(id) = subscribe_to {
            self.events.send(Event::Subscribe(self.id, id)).unwrap();
        }

        self.send(reply).unwrap();
        Ok(())
    }

    /// Tells the events system that this connection is gone.
    fn on_close(&mut self, _: CloseCode, _: &str) {
        info!("websocket disconnected account={:?}", self.account);
        let gone = Event::Disconnect(self.id);
        self.events.send(gone).unwrap();
    }

    /// Builds the handshake response and authenticates the connection from
    /// the token cookie, if one is present.
    ///
    /// A request without a `Cookie` header, or without the token cookie, is
    /// accepted unauthenticated. A cookie header that is not UTF-8, contains
    /// an unparsable cookie, or carries a token that `account::from_token`
    /// rejects is answered with a 401.
    fn on_request(&mut self, req: &Request) -> ws::Result<Response> {
        // 401 response that also sets the `AUTH_CLEAR` cookie header
        fn unauthorized() -> ws::Result<Response> {
            let mut denied = Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec());
            denied.headers_mut().push(("Set-Cookie".into(), AUTH_CLEAR.into()));
            Ok(denied)
        }

        let res = Response::from_request(req)?;

        let raw_cookies = match req.header("Cookie") {
            Some(raw) => raw,
            None => return Ok(res),
        };

        let cookie_list = match str::from_utf8(raw_cookies) {
            Ok(list) => list,
            Err(_) => return unauthorized(),
        };

        for entry in cookie_list.split(";").map(|s| s.trim()) {
            let cookie = match Cookie::parse(entry) {
                Ok(c) => c,
                Err(_) => return unauthorized(),
            };

            if cookie.name() != TOKEN_HEADER {
                continue;
            }

            // got auth token
            let db = self.pool.get().unwrap();
            match account::from_token(&db, &cookie.value().to_string()) {
                Ok(a) => self.account = Some(a),
                Err(_) => return unauthorized(),
            }
        }

        Ok(res)
    }
}
/// Builds the websocket server and listens on `127.0.0.1:40055`.
///
/// For every accepted socket a `Connection` is created together with an
/// unbounded channel and a dedicated writer thread. `pool`, `events_tx` and
/// `stripe` are cloned into each connection.
///
/// # Panics
///
/// Panics when the server cannot be built or `listen` returns an error.
pub fn start(pool: PgPool, events_tx: CbSender<Event>, stripe: StripeClient) {
    let mut rng = thread_rng();

    let settings = Settings {
        max_connections: 10_000,
        ..Settings::default()
    };

    Builder::new()
        .with_settings(settings)
        .build(move |out: WsSender| {
            // The sending half lives in the connection object, which also
            // hands a clone to the events system. Both the message handler
            // and the events thread must go through this channel to reach
            // the client; the receiving half is owned by the thread below.
            let (tx, rx) = unbounded::<RpcMessage>();

            spawn(move || {
                // runs until every sender has been dropped, then we are done
                while let Ok(n) = rx.recv() {
                    let response = to_vec(&n).unwrap();
                    out.send(Message::Binary(response)).unwrap();
                }
            });

            Connection {
                id: rng.gen::<usize>(),
                account: None,
                ws: tx,
                pool: pool.clone(),
                stripe: stripe.clone(),
                events: events_tx.clone(),
            }
        })
        .unwrap()
        .listen("127.0.0.1:40055")
        .unwrap();
}