//! WebSocket RPC layer: CBOR-encoded request and response message types, the
//! per-connection handler, and the server entry point.
use std::time::{Instant};
|
|
use std::thread::{spawn};
|
|
|
|
use std::str;
|
|
|
|
use uuid::Uuid;
|
|
use rand::prelude::*;
|
|
|
|
use failure::Error;
|
|
use failure::err_msg;
|
|
|
|
use serde_cbor::{from_slice, to_vec};
|
|
use cookie::Cookie;
|
|
|
|
use stripe::{Client as StripeClient, Subscription};
|
|
|
|
use crossbeam_channel::{unbounded, Sender as CbSender};
|
|
use ws::{Builder, CloseCode, Message, Handler, Request, Response, Settings, Sender as WsSender};
|
|
use ws::deflate::DeflateHandler;
|
|
|
|
use account::{Account};
|
|
use account;
|
|
use construct::{Construct};
|
|
use events::{Event};
|
|
use game::{Game, game_state, game_skill, game_skill_clear, game_ready, game_offer_draw, game_concede};
|
|
use instance::{Instance, ChatState, instance_state, instance_practice, instance_ready, instance_abandon, demo};
|
|
use item::{Item, ItemInfoCtr, item_info};
|
|
use mtx;
|
|
use mail;
|
|
|
|
use player::{Player};
|
|
use payments;
|
|
use mail::Email;
|
|
use pg::{Db};
|
|
use pg::{PgPool};
|
|
use skill::{Skill, dev_resolve, Resolutions};
|
|
use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
|
|
use http::{AUTH_CLEAR, TOKEN_HEADER};
|
|
|
|
#[derive(Debug,Clone,Serialize)]
|
|
pub enum RpcMessage {
|
|
AccountState(Account),
|
|
AccountConstructs(Vec<Construct>),
|
|
AccountTeam(Vec<Construct>),
|
|
AccountInstances(Vec<Instance>),
|
|
AccountShop(mtx::Shop),
|
|
|
|
Demo(Vec<Player>),
|
|
|
|
ConstructSpawn(Construct),
|
|
GameState(Game),
|
|
ItemInfo(ItemInfoCtr),
|
|
|
|
InstanceState(Instance),
|
|
InstanceChat(ChatState),
|
|
ChatWheel(Vec<String>),
|
|
|
|
EmailState(Option<Email>),
|
|
SubscriptionState(Option<Subscription>),
|
|
|
|
Pong(()),
|
|
|
|
DevResolutions(Resolutions),
|
|
|
|
QueueRequested(()),
|
|
QueueJoined(()),
|
|
QueueLeft(()),
|
|
QueueFound(()),
|
|
|
|
InviteRequested(()),
|
|
Invite(String),
|
|
Joining(()),
|
|
|
|
Processing(()),
|
|
|
|
Error(String),
|
|
}
|
|
|
|
#[derive(Debug,Clone,Serialize,Deserialize)]
|
|
pub enum RpcRequest {
|
|
Ping {},
|
|
ItemInfo {},
|
|
DevResolve { a: Uuid, b: Uuid, skill: Skill },
|
|
|
|
MtxConstructApply { mtx: mtx::MtxVariant, construct_id: Uuid, name: String },
|
|
MtxConstructSpawn { },
|
|
MtxAccountApply { mtx: mtx::MtxVariant },
|
|
MtxBuy { mtx: mtx::MtxVariant },
|
|
|
|
GameState { id: Uuid },
|
|
GameReady { id: Uuid },
|
|
GameSkill { game_id: Uuid, construct_id: Uuid, target_construct_id: Uuid, skill: Skill },
|
|
GameSkillClear { game_id: Uuid },
|
|
GameOfferDraw { game_id: Uuid },
|
|
GameConcede { game_id: Uuid },
|
|
|
|
AccountState {},
|
|
AccountShop {},
|
|
AccountInstances {},
|
|
AccountConstructs {},
|
|
AccountSetTeam { ids: Vec<Uuid> },
|
|
|
|
SubscriptionEnding { ending: bool },
|
|
SubscriptionState {},
|
|
EmailState {},
|
|
|
|
InstanceInvite {},
|
|
InstanceJoin { code: String },
|
|
InstanceQueue {},
|
|
InstanceLeave {},
|
|
InstancePractice {},
|
|
InstanceAbandon { instance_id: Uuid },
|
|
InstanceReady { instance_id: Uuid },
|
|
InstanceState { instance_id: Uuid },
|
|
InstanceChat { instance_id: Uuid, index: usize },
|
|
|
|
VboxAccept { instance_id: Uuid, group: usize, index: usize },
|
|
VboxAcceptEquip { instance_id: Uuid, group: usize, index: usize, construct_id: Uuid },
|
|
VboxDiscard { instance_id: Uuid },
|
|
VboxCombine { instance_id: Uuid, inv_indices: Vec<usize>, vbox_indices: Vec<Vec<usize>> },
|
|
VboxApply { instance_id: Uuid, construct_id: Uuid, index: usize },
|
|
VboxUnequip { instance_id: Uuid, construct_id: Uuid, target: Item },
|
|
VboxUnequipApply { instance_id: Uuid, construct_id: Uuid, target: Item, target_construct_id: Uuid },
|
|
VboxReclaim { instance_id: Uuid, index: usize },
|
|
}
|
|
|
|
struct Connection {
|
|
pub id: usize,
|
|
pub ws: CbSender<RpcMessage>,
|
|
pool: PgPool,
|
|
stripe: StripeClient,
|
|
account: Option<Account>,
|
|
events: CbSender<Event>,
|
|
}
|
|
|
|
impl Connection {
|
|
fn receive(&self, data: Vec<u8>, db: &Db, begin: Instant) -> Result<RpcMessage, Error> {
|
|
// cast the msg to this type to receive method name
|
|
match from_slice::<RpcRequest>(&data) {
|
|
Ok(v) => {
|
|
|
|
// non authenticated
|
|
// non transactional reqs
|
|
match v {
|
|
RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())),
|
|
RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())),
|
|
RpcRequest::DevResolve {a, b, skill } =>
|
|
return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))),
|
|
_ => (),
|
|
};
|
|
|
|
// check for authorization now
|
|
let account = match self.account {
|
|
Some(ref account) => account,
|
|
None => return Err(err_msg("auth required")),
|
|
};
|
|
|
|
let request = v.clone();
|
|
|
|
let response = match v {
|
|
// evented but authorization required
|
|
RpcRequest::InstanceQueue {} => {
|
|
self.events.send(Event::Queue(self.id))?;
|
|
Ok(RpcMessage::QueueRequested(()))
|
|
},
|
|
RpcRequest::InstanceInvite {} => {
|
|
self.events.send(Event::Invite(self.id))?;
|
|
Ok(RpcMessage::InviteRequested(()))
|
|
},
|
|
RpcRequest::InstanceJoin { code } => {
|
|
self.events.send(Event::Join(self.id, code))?;
|
|
Ok(RpcMessage::Joining(()))
|
|
},
|
|
RpcRequest::InstanceLeave {} => {
|
|
self.events.send(Event::Leave(self.id))?;
|
|
Ok(RpcMessage::Processing(()))
|
|
},
|
|
|
|
RpcRequest::InstanceChat { instance_id, index } => {
|
|
if !account.subscribed {
|
|
return Err(err_msg("subscribe to unlock chat"))
|
|
}
|
|
|
|
let wheel = account::chat_wheel(&db, account.id)?;
|
|
|
|
if let Some(c) = wheel.get(index) {
|
|
self.events.send(Event::Chat(self.id, instance_id, c.to_string()))?;
|
|
} else {
|
|
return Err(err_msg("invalid chat index"));
|
|
}
|
|
|
|
Ok(RpcMessage::Processing(()))
|
|
},
|
|
_ => {
|
|
// all good, let's make a tx and process
|
|
let mut tx = db.transaction()?;
|
|
|
|
let res = match v {
|
|
RpcRequest::AccountState {} =>
|
|
Ok(RpcMessage::AccountState(account.clone())),
|
|
RpcRequest::AccountConstructs {} =>
|
|
Ok(RpcMessage::AccountConstructs(account::constructs(&mut tx, &account)?)),
|
|
RpcRequest::AccountInstances {} =>
|
|
Ok(RpcMessage::AccountInstances(account::account_instances(&mut tx, account)?)),
|
|
RpcRequest::AccountSetTeam { ids } =>
|
|
Ok(RpcMessage::AccountTeam(account::set_team(&mut tx, &account, ids)?)),
|
|
|
|
RpcRequest::EmailState {} =>
|
|
Ok(RpcMessage::EmailState(mail::select_account(&db, account.id)?)),
|
|
|
|
RpcRequest::SubscriptionState {} =>
|
|
Ok(RpcMessage::SubscriptionState(payments::account_subscription(&db, &self.stripe, &account)?)),
|
|
|
|
// RpcRequest::AccountShop {} =>
|
|
// Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)),
|
|
|
|
// RpcRequest::ConstructDelete" => handle_construct_delete(data, &mut tx, account),
|
|
|
|
RpcRequest::GameState { id } =>
|
|
Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)),
|
|
|
|
RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } =>
|
|
Ok(RpcMessage::GameState(game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)),
|
|
|
|
RpcRequest::GameSkillClear { game_id } =>
|
|
Ok(RpcMessage::GameState(game_skill_clear(&mut tx, account, game_id)?)),
|
|
|
|
RpcRequest::GameReady { id } =>
|
|
Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)),
|
|
|
|
RpcRequest::GameConcede { game_id } =>
|
|
Ok(RpcMessage::GameState(game_concede(&mut tx, account, game_id)?)),
|
|
|
|
RpcRequest::GameOfferDraw { game_id } =>
|
|
Ok(RpcMessage::GameState(game_offer_draw(&mut tx, account, game_id)?)),
|
|
|
|
RpcRequest::InstancePractice {} =>
|
|
Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)),
|
|
|
|
// these two can return GameState or InstanceState
|
|
RpcRequest::InstanceReady { instance_id } =>
|
|
Ok(instance_ready(&mut tx, account, instance_id)?),
|
|
RpcRequest::InstanceState { instance_id } =>
|
|
Ok(instance_state(&mut tx, instance_id)?),
|
|
RpcRequest::InstanceAbandon { instance_id } =>
|
|
Ok(instance_abandon(&mut tx, account, instance_id)?),
|
|
|
|
RpcRequest::VboxAccept { instance_id, group, index } =>
|
|
Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index, None)?)),
|
|
|
|
RpcRequest::VboxAcceptEquip { instance_id, group, index, construct_id } =>
|
|
Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index, Some(construct_id))?)),
|
|
|
|
RpcRequest::VboxApply { instance_id, construct_id, index } =>
|
|
Ok(RpcMessage::InstanceState(vbox_apply(&mut tx, account, instance_id, construct_id, index)?)),
|
|
|
|
RpcRequest::VboxCombine { instance_id, inv_indices, vbox_indices } =>
|
|
Ok(RpcMessage::InstanceState(vbox_combine(&mut tx, account, instance_id, inv_indices, vbox_indices)?)),
|
|
|
|
RpcRequest::VboxDiscard { instance_id } =>
|
|
Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)),
|
|
|
|
RpcRequest::VboxReclaim { instance_id, index } =>
|
|
Ok(RpcMessage::InstanceState(vbox_reclaim(&mut tx, account, instance_id, index)?)),
|
|
|
|
RpcRequest::VboxUnequip { instance_id, construct_id, target } =>
|
|
Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target, None)?)),
|
|
|
|
RpcRequest::VboxUnequipApply { instance_id, construct_id, target, target_construct_id } =>
|
|
Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target, Some(target_construct_id))?)),
|
|
|
|
RpcRequest::MtxConstructSpawn {} =>
|
|
Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)),
|
|
|
|
RpcRequest::MtxConstructApply { mtx, construct_id, name } =>
|
|
Ok(RpcMessage::AccountTeam(mtx::apply(&mut tx, account, mtx, construct_id, name)?)),
|
|
|
|
RpcRequest::MtxAccountApply { mtx } =>
|
|
Ok(RpcMessage::AccountState(mtx::account_apply(&mut tx, account, mtx)?)),
|
|
|
|
RpcRequest::MtxBuy { mtx } =>
|
|
Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)),
|
|
|
|
RpcRequest::SubscriptionEnding { ending } =>
|
|
Ok(RpcMessage::SubscriptionState(payments::subscription_ending(&mut tx, &self.stripe, account, ending)?)),
|
|
|
|
_ => Err(format_err!("unknown request request={:?}", request)),
|
|
};
|
|
|
|
tx.commit()?;
|
|
res
|
|
}
|
|
};
|
|
|
|
info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed());
|
|
|
|
return response;
|
|
},
|
|
Err(e) => {
|
|
warn!("{:?}", e);
|
|
Err(err_msg("invalid message"))
|
|
},
|
|
}
|
|
}
|
|
|
|
// this is where last minute processing happens
|
|
// use it to modify outgoing messages, update subs, serialize in some way...
|
|
fn send(&self, msg: RpcMessage) -> Result<(), Error> {
|
|
let msg = match self.account {
|
|
Some(ref a) => match msg {
|
|
RpcMessage::InstanceState(v) => RpcMessage::InstanceState(v.redact(a.id)),
|
|
RpcMessage::AccountInstances(v) =>
|
|
RpcMessage::AccountInstances(v.into_iter().map(|i| i.redact(a.id)).collect()),
|
|
RpcMessage::GameState(v) => RpcMessage::GameState(v.redact(a.id)),
|
|
_ => msg,
|
|
},
|
|
None => msg,
|
|
};
|
|
|
|
self.ws.send(msg).unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// We unwrap everything in here because we don't care if this panics:
// it's run in a thread, so it's supposed to bail when it encounters errors.
|
|
impl Handler for Connection {
|
|
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
|
|
info!("websocket connected account={:?}", self.account);
|
|
|
|
// tell events we have connected
|
|
self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
|
|
|
|
// if user logged in do some prep work
|
|
if let Some(ref a) = self.account {
|
|
self.send(RpcMessage::AccountState(a.clone())).unwrap();
|
|
self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
|
|
|
|
// check if they have an image that needs to be generated
|
|
account::img_check(&a).unwrap();
|
|
|
|
let db = self.pool.get().unwrap();
|
|
let mut tx = db.transaction().unwrap();
|
|
|
|
// send account constructs
|
|
let account_constructs = account::constructs(&mut tx, a).unwrap();
|
|
self.send(RpcMessage::AccountConstructs(account_constructs)).unwrap();
|
|
|
|
// get account instances
|
|
// and send them to the client
|
|
let account_instances = account::account_instances(&mut tx, a).unwrap();
|
|
self.send(RpcMessage::AccountInstances(account_instances)).unwrap();
|
|
|
|
let shop = mtx::account_shop(&mut tx, &a).unwrap();
|
|
self.send(RpcMessage::AccountShop(shop)).unwrap();
|
|
|
|
let team = account::team(&mut tx, &a).unwrap();
|
|
self.send(RpcMessage::AccountTeam(team)).unwrap();
|
|
|
|
let wheel = account::chat_wheel(&db, a.id).unwrap();
|
|
self.send(RpcMessage::ChatWheel(wheel)).unwrap();
|
|
|
|
if let Some(instance) = account::tutorial(&mut tx, &a).unwrap() {
|
|
self.send(RpcMessage::InstanceState(instance)).unwrap();
|
|
}
|
|
|
|
// tx should do nothing
|
|
tx.commit().unwrap();
|
|
} else {
|
|
self.send(RpcMessage::Demo(demo().unwrap())).unwrap();
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn on_message(&mut self, msg: Message) -> ws::Result<()> {
|
|
match msg {
|
|
Message::Binary(msg) => {
|
|
let begin = Instant::now();
|
|
let db_connection = self.pool.get().unwrap();
|
|
|
|
match self.receive(msg, &db_connection, begin) {
|
|
Ok(reply) => {
|
|
// if the user queries the state of something
|
|
// we tell events to push updates to them
|
|
match reply {
|
|
RpcMessage::AccountState(ref v) => {
|
|
self.account = Some(v.clone());
|
|
self.events.send(Event::Subscribe(self.id, v.id)).unwrap()
|
|
},
|
|
RpcMessage::GameState(ref v) =>
|
|
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
|
|
RpcMessage::InstanceState(ref v) =>
|
|
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
|
|
_ => (),
|
|
};
|
|
|
|
self.send(reply).unwrap();
|
|
},
|
|
Err(e) => {
|
|
warn!("{:?}", e);
|
|
self.send(RpcMessage::Error(e.to_string())).unwrap();
|
|
},
|
|
};
|
|
},
|
|
_ => (),
|
|
};
|
|
Ok(())
|
|
}
|
|
|
|
fn on_close(&mut self, _: CloseCode, _: &str) {
|
|
info!("websocket disconnected account={:?}", self.account);
|
|
self.events.send(Event::Disconnect(self.id)).unwrap();
|
|
}
|
|
|
|
fn on_request(&mut self, req: &Request) -> ws::Result<Response> {
|
|
let res = Response::from_request(req)?;
|
|
|
|
if let Some(cl) = req.header("Cookie") {
|
|
let unauth = || {
|
|
let mut res = Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec());
|
|
res.headers_mut().push(("Set-Cookie".into(), AUTH_CLEAR.into()));
|
|
Ok(res)
|
|
};
|
|
let cookie_list = match str::from_utf8(cl) {
|
|
Ok(cl) => cl,
|
|
Err(_) => return unauth(),
|
|
};
|
|
|
|
for s in cookie_list.split(";").map(|s| s.trim()) {
|
|
let cookie = match Cookie::parse(s) {
|
|
Ok(c) => c,
|
|
Err(_) => return unauth(),
|
|
};
|
|
|
|
// got auth token
|
|
if cookie.name() == TOKEN_HEADER {
|
|
let db = self.pool.get().unwrap();
|
|
match account::from_token(&db, &cookie.value().to_string()) {
|
|
Ok(a) => self.account = Some(a),
|
|
Err(_) => return unauth(),
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
Ok(res)
|
|
}
|
|
}
|
|
|
|
pub fn start(pool: PgPool, events_tx: CbSender<Event>, stripe: StripeClient) {
|
|
let mut rng = thread_rng();
|
|
|
|
let ws = Builder::new()
|
|
.with_settings(Settings {
|
|
max_connections: 10_000,
|
|
..Settings::default()
|
|
})
|
|
.build(move |out: WsSender| {
|
|
// we give the tx half to the connection object
|
|
// which in turn passes a clone to the events system
|
|
// the rx half goes into a thread where it waits for messages
|
|
// that need to be delivered to the client
|
|
// both the ws message handler and the events thread must use
|
|
// this channel to send messages
|
|
let (tx, rx) = unbounded::<RpcMessage>();
|
|
|
|
spawn(move || {
|
|
loop {
|
|
match rx.recv() {
|
|
Ok(n) => {
|
|
let response = to_vec(&n).unwrap();
|
|
out.send(Message::Binary(response)).unwrap();
|
|
}
|
|
// we done
|
|
Err(_e) => {
|
|
// info!("{:?}", e);
|
|
break;
|
|
},
|
|
};
|
|
}
|
|
});
|
|
|
|
DeflateHandler::new(
|
|
Connection {
|
|
id: rng.gen::<usize>(),
|
|
account: None,
|
|
ws: tx,
|
|
pool: pool.clone(),
|
|
stripe: stripe.clone(),
|
|
events: events_tx.clone(),
|
|
}
|
|
)
|
|
})
|
|
.unwrap()
|
|
.listen("127.0.0.1:40055")
|
|
.unwrap();
|
|
}
|
|
|