use std::time::{Instant}; use std::thread::{spawn, sleep}; use std::time; use std::str; use uuid::Uuid; use rand::prelude::*; use failure::Error; use failure::err_msg; use serde_cbor::{from_slice, to_vec}; use cookie::Cookie; use stripe::{Client as StripeClient, Subscription}; use crossbeam_channel::{unbounded, Sender as CbSender}; use ws::{Builder, CloseCode, Message, Handler, Request, Response, Settings, Sender as WsSender}; use account::{Account}; use account; use construct::{Construct}; use events::{Event}; use game::{Game, game_state, game_skill, game_skill_clear, game_ready}; use instance::{Instance, ChatState, instance_state, instance_practice, instance_ready, instance_abandon, demo}; use item::{Item, ItemInfoCtr, item_info}; use mtx; use mail; use player::{Player}; use payments; use mail::Email; use pg::{Db}; use pg::{PgPool}; use skill::{Skill, dev_resolve, Resolutions}; use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip}; use http::{AUTH_CLEAR, TOKEN_HEADER}; #[derive(Debug,Clone,Serialize)] pub enum RpcMessage { AccountState(Account), AccountConstructs(Vec), AccountTeam(Vec), AccountInstances(Vec), AccountShop(mtx::Shop), Demo(Vec), ConstructSpawn(Construct), GameState(Game), ItemInfo(ItemInfoCtr), InstanceState(Instance), InstanceChat(ChatState), ChatWheel(Vec), EmailState(Option), SubscriptionState(Option), Pong(()), DevResolutions(Resolutions), QueueRequested(()), QueueJoined(()), QueueCancelled(()), InviteRequested(()), Invite(String), Joining(()), Processing(()), Error(String), } #[derive(Debug,Clone,Serialize,Deserialize)] pub enum RpcRequest { Ping {}, ItemInfo {}, DevResolve { a: Uuid, b: Uuid, skill: Skill }, MtxConstructApply { mtx: mtx::MtxVariant, construct_id: Uuid, name: String }, MtxConstructSpawn { }, MtxAccountApply { mtx: mtx::MtxVariant }, MtxBuy { mtx: mtx::MtxVariant }, GameState { id: Uuid }, GameReady { id: Uuid }, GameSkill { game_id: Uuid, construct_id: Uuid, target_construct_id: Uuid, skill: Skill }, GameSkillClear { game_id: Uuid }, AccountState {}, AccountShop {}, AccountInstances {}, AccountConstructs {}, AccountSetTeam { ids: Vec }, SubscriptionEnding { ending: bool }, SubscriptionState {}, EmailState {}, InstanceInvite {}, InstanceJoin { code: String }, InstanceQueue {}, InstancePractice {}, InstanceAbandon { instance_id: Uuid }, InstanceReady { instance_id: Uuid }, InstanceState { instance_id: Uuid }, InstanceChat { instance_id: Uuid, index: usize }, VboxAccept { instance_id: Uuid, group: usize, index: usize }, VboxDiscard { instance_id: Uuid }, VboxCombine { instance_id: Uuid, indices: Vec }, VboxApply { instance_id: Uuid, construct_id: Uuid, index: usize }, VboxUnequip { instance_id: Uuid, construct_id: Uuid, target: Item }, VboxReclaim { instance_id: Uuid, index: usize }, } struct Connection { pub id: usize, pub ws: CbSender, pool: PgPool, stripe: StripeClient, account: Option, events: CbSender, } impl Connection { fn receive(&self, data: Vec, db: &Db, begin: Instant) -> Result { // cast the msg to this type to receive method name match from_slice::(&data) { Ok(v) => { // non authenticated // non transactional reqs match v { RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())), RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())), RpcRequest::DevResolve {a, b, skill } => return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))), _ => (), }; // check for authorization now let account = match self.account { Some(ref account) => account, None => return Err(err_msg("auth required")), }; let request = v.clone(); let response = match v { // evented but authorization required RpcRequest::InstanceQueue {} => { self.events.send(Event::Queue(self.id))?; Ok(RpcMessage::QueueRequested(())) }, RpcRequest::InstanceInvite {} => { self.events.send(Event::Invite(self.id))?; Ok(RpcMessage::InviteRequested(())) }, RpcRequest::InstanceJoin { code } => { self.events.send(Event::Join(self.id, code))?; Ok(RpcMessage::Joining(())) }, RpcRequest::InstanceChat { instance_id, index } => { if !account.subscribed { return Err(err_msg("subscribe to unlock chat")) } let wheel = account::chat_wheel(&db, account.id)?; if let Some(c) = wheel.get(index) { self.events.send(Event::Chat(self.id, instance_id, c.to_string()))?; } else { return Err(err_msg("invalid chat index")); } let events_tx = self.events.clone(); let id = self.id; spawn(move || { sleep(time::Duration::from_secs(3)); events_tx.send(Event::ChatClear(id, instance_id)).unwrap(); }); Ok(RpcMessage::Processing(())) }, _ => { // all good, let's make a tx and process let mut tx = db.transaction()?; let res = match v { RpcRequest::AccountState {} => Ok(RpcMessage::AccountState(account.clone())), RpcRequest::AccountConstructs {} => Ok(RpcMessage::AccountConstructs(account::constructs(&mut tx, &account)?)), RpcRequest::AccountInstances {} => Ok(RpcMessage::AccountInstances(account::account_instances(&mut tx, account)?)), RpcRequest::AccountSetTeam { ids } => Ok(RpcMessage::AccountTeam(account::set_team(&mut tx, &account, ids)?)), RpcRequest::EmailState {} => Ok(RpcMessage::EmailState(mail::select_account(&db, account.id)?)), RpcRequest::SubscriptionState {} => Ok(RpcMessage::SubscriptionState(payments::account_subscription(&db, &self.stripe, &account)?)), // RpcRequest::AccountShop {} => // Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)), // RpcRequest::ConstructDelete" => handle_construct_delete(data, &mut tx, account), RpcRequest::GameState { id } => Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)), RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } => Ok(RpcMessage::GameState(game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)), RpcRequest::GameSkillClear { game_id } => Ok(RpcMessage::GameState(game_skill_clear(&mut tx, account, game_id)?)), RpcRequest::GameReady { id } => Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)), RpcRequest::InstancePractice {} => Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)), // these two can return GameState or InstanceState RpcRequest::InstanceReady { instance_id } => Ok(instance_ready(&mut tx, account, instance_id)?), RpcRequest::InstanceState { instance_id } => Ok(instance_state(&mut tx, instance_id)?), RpcRequest::InstanceAbandon { instance_id } => Ok(instance_abandon(&mut tx, account, instance_id)?), RpcRequest::VboxAccept { instance_id, group, index } => Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index)?)), RpcRequest::VboxApply { instance_id, construct_id, index } => Ok(RpcMessage::InstanceState(vbox_apply(&mut tx, account, instance_id, construct_id, index)?)), RpcRequest::VboxCombine { instance_id, indices } => Ok(RpcMessage::InstanceState(vbox_combine(&mut tx, account, instance_id, indices)?)), RpcRequest::VboxDiscard { instance_id } => Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)), RpcRequest::VboxReclaim { instance_id, index } => Ok(RpcMessage::InstanceState(vbox_reclaim(&mut tx, account, instance_id, index)?)), RpcRequest::VboxUnequip { instance_id, construct_id, target } => Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target)?)), RpcRequest::MtxConstructSpawn {} => Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)), RpcRequest::MtxConstructApply { mtx, construct_id, name } => Ok(RpcMessage::AccountTeam(mtx::apply(&mut tx, account, mtx, construct_id, name)?)), RpcRequest::MtxAccountApply { mtx } => Ok(RpcMessage::AccountState(mtx::account_apply(&mut tx, account, mtx)?)), RpcRequest::MtxBuy { mtx } => Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)), RpcRequest::SubscriptionEnding { ending } => Ok(RpcMessage::SubscriptionState(payments::subscription_ending(&mut tx, &self.stripe, account, ending)?)), _ => Err(format_err!("unknown request request={:?}", request)), }; tx.commit()?; res } }; info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed()); return response; }, Err(e) => { warn!("{:?}", e); Err(err_msg("invalid message")) }, } } // this is where last minute processing happens // use it to modify outgoing messages, update subs, serialize in some way... fn send(&self, msg: RpcMessage) -> Result<(), Error> { let msg = match self.account { Some(ref a) => match msg { RpcMessage::InstanceState(v) => RpcMessage::InstanceState(v.redact(a.id)), RpcMessage::AccountInstances(v) => RpcMessage::AccountInstances(v.into_iter().map(|i| i.redact(a.id)).collect()), RpcMessage::GameState(v) => RpcMessage::GameState(v.redact(a.id)), _ => msg, }, None => msg, }; self.ws.send(msg).unwrap(); Ok(()) } } // we unwrap everything in here cause really // we don't care if this panics // it's run in a thread so it's supposed to bail // when it encounters errors impl Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { info!("websocket connected account={:?}", self.account); // tell events we have connected self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); // if user logged in do some prep work if let Some(ref a) = self.account { self.send(RpcMessage::AccountState(a.clone())).unwrap(); self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); // check if they have an image that needs to be generated account::img_check(&a).unwrap(); let db = self.pool.get().unwrap(); let mut tx = db.transaction().unwrap(); // send account constructs let account_constructs = account::constructs(&mut tx, a).unwrap(); self.send(RpcMessage::AccountConstructs(account_constructs)).unwrap(); // get account instances // and send them to the client let account_instances = account::account_instances(&mut tx, a).unwrap(); self.send(RpcMessage::AccountInstances(account_instances)).unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); self.send(RpcMessage::AccountShop(shop)).unwrap(); let team = account::team(&mut tx, &a).unwrap(); self.send(RpcMessage::AccountTeam(team)).unwrap(); let wheel = account::chat_wheel(&db, a.id).unwrap(); self.send(RpcMessage::ChatWheel(wheel)).unwrap(); // tx should do nothing tx.commit().unwrap(); } else { self.send(RpcMessage::Demo(demo().unwrap())).unwrap(); } Ok(()) } fn on_message(&mut self, msg: Message) -> ws::Result<()> { match msg { Message::Binary(msg) => { let begin = Instant::now(); let db_connection = self.pool.get().unwrap(); match self.receive(msg, &db_connection, begin) { Ok(reply) => { // if the user queries the state of something // we tell events to push updates to them match reply { RpcMessage::AccountState(ref v) => { self.account = Some(v.clone()); self.events.send(Event::Subscribe(self.id, v.id)).unwrap() }, RpcMessage::GameState(ref v) => self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::InstanceState(ref v) => self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), _ => (), }; self.send(reply).unwrap(); }, Err(e) => { warn!("{:?}", e); self.send(RpcMessage::Error(e.to_string())).unwrap(); }, }; }, _ => (), }; Ok(()) } fn on_close(&mut self, _: CloseCode, _: &str) { info!("websocket disconnected account={:?}", self.account); self.events.send(Event::Disconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> ws::Result { let res = Response::from_request(req)?; if let Some(cl) = req.header("Cookie") { let unauth = || { let mut res = Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()); res.headers_mut().push(("Set-Cookie".into(), AUTH_CLEAR.into())); Ok(res) }; let cookie_list = match str::from_utf8(cl) { Ok(cl) => cl, Err(_) => return unauth(), }; for s in cookie_list.split(";").map(|s| s.trim()) { let cookie = match Cookie::parse(s) { Ok(c) => c, Err(_) => return unauth(), }; // got auth token if cookie.name() == TOKEN_HEADER { let db = self.pool.get().unwrap(); match account::from_token(&db, &cookie.value().to_string()) { Ok(a) => self.account = Some(a), Err(_) => return unauth(), } } }; }; Ok(res) } } pub fn start(pool: PgPool, events_tx: CbSender, stripe: StripeClient) { let mut rng = thread_rng(); Builder::new() .with_settings(Settings { max_connections: 10_000, ..Settings::default() }) .build(move |out: WsSender| { // we give the tx half to the connection object // which in turn passes a clone to the events system // the rx half goes into a thread where it waits for messages // that need to be delivered to the client // both the ws message handler and the events thread must use // this channel to send messages let (tx, rx) = unbounded::(); spawn(move || { loop { match rx.recv() { Ok(n) => { let response = to_vec(&n).unwrap(); out.send(Message::Binary(response)).unwrap(); } // we done Err(_e) => { break; }, }; } }); Connection { id: rng.gen::(), account: None, ws: tx, pool: pool.clone(), stripe: stripe.clone(), events: events_tx.clone(), } }) .unwrap() .listen("127.0.0.1:40055") .unwrap(); }