use mnml_core::item::ItemInfoCtr; use mnml_core::instance::ChatState; use std::collections::HashMap; use std::thread::{spawn}; use std::str; use uuid::Uuid; use failure::Error; use serde_cbor::{to_vec}; use cookie::Cookie; use stripe::{Client as StripeClient, Subscription}; use crossbeam_channel::{unbounded, Sender as CbSender}; use ws::{Builder, CloseCode, Message, Handler, Request, Response, Settings, Sender as WsSender}; use ws::deflate::DeflateHandler; use account::{Account}; use account; use events::{Event}; use user_anonymous::{Anonymous}; use user_authenticated::{Authenticated}; use mnml_core::construct::{Construct}; use mnml_core::game::{Game}; use mnml_core::vbox::{ItemType}; use mnml_core::item::Item; use mnml_core::skill::Skill; use mnml_core::instance::{Instance}; use mtx; use mail::Email; use pg::{PgPool}; use http::{AUTH_CLEAR, TOKEN_HEADER}; #[derive(Debug,Clone,Serialize)] pub enum RpcMessage { AccountState(Account), AccountAuthenticated(Account), AccountConstructs(Vec), AccountTeam(Vec), AccountInstances(Vec), AccountShop(mtx::Shop), ConstructSpawn(Construct), GameState(Game), ItemInfo(ItemInfoCtr), InstanceState(Instance), InstanceChat(ChatState), ChatWheel(Vec), EmailState(Option), SubscriptionState(Option), Pong(()), StartTutorial(()), PromptRegister(()), QueueRequested(()), QueueJoined(()), QueueLeft(()), QueueFound(()), InviteRequested(()), Invite(String), Joining(()), Processing(()), Error(String), } #[derive(Debug,Clone,Serialize,Deserialize)] pub enum RpcRequest { Ping {}, ItemInfo {}, DevResolve { skill: Skill }, MtxConstructApply { mtx: mtx::MtxVariant, construct_id: Uuid, name: String }, MtxConstructSpawn { }, MtxAccountApply { mtx: mtx::MtxVariant }, MtxBuy { mtx: mtx::MtxVariant }, GameState { id: Uuid }, GameReady { id: Uuid }, GameSkill { game_id: Uuid, construct_id: Uuid, target_construct_id: Uuid, skill: Skill }, GameSkillClear { game_id: Uuid }, GameOfferDraw { game_id: Uuid }, GameConcede { game_id: Uuid }, AccountState {}, AccountShop {}, AccountInstances {}, AccountConstructs {}, AccountSetTeam { ids: Vec }, SubscriptionEnding { ending: bool }, SubscriptionState {}, EmailState {}, InstanceInvite {}, InstanceJoin { code: String }, InstanceQueue {}, InstanceLeave {}, InstancePractice {}, InstanceAbandon { instance_id: Uuid }, InstanceReady { instance_id: Uuid }, InstanceState { instance_id: Uuid }, InstanceChat { instance_id: Uuid, index: usize }, VboxBuy { instance_id: Uuid, group: ItemType, index: String, construct_id: Option }, VboxRefill { instance_id: Uuid }, VboxCombine { instance_id: Uuid, inv_indices: Vec, vbox_indices: Option>> }, VboxApply { instance_id: Uuid, construct_id: Uuid, index: String }, VboxUnequip { instance_id: Uuid, construct_id: Uuid, target: Item }, VboxUnequipApply { instance_id: Uuid, construct_id: Uuid, target: Item, target_construct_id: Uuid }, VboxRefund { instance_id: Uuid, index: String }, } pub trait User { fn receive(&mut self, data: Vec, stripe: &StripeClient) -> Result; fn connected(&mut self) -> Result<(), Error>; fn send(&mut self, msg: RpcMessage) -> Result<(), Error>; } struct Connection { pub id: Uuid, pub ws: CbSender, pool: PgPool, stripe: StripeClient, // account: Option, user: Box, events: CbSender, } // we unwrap everything in here cause really // we don't care if this panics // it's run in a thread so it's supposed to bail // when it encounters errors impl Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { self.user.connected().unwrap(); Ok(()) } fn on_message(&mut self, msg: Message) -> ws::Result<()> { match msg { Message::Binary(msg) => { match self.user.receive(msg, &self.stripe) { Ok(msg) => { self.user.send(msg).unwrap(); }, Err(e) => { warn!("{:?}", e); self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); }, }; }, _ => (), }; Ok(()) } fn on_close(&mut self, _: CloseCode, _: &str) { info!("websocket disconnected id={:?}", self.id); self.events.send(Event::Disconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> ws::Result { let res = Response::from_request(req)?; if let Some(cl) = req.header("Cookie") { let unauth = || { let mut res = Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()); res.headers_mut().push(("Set-Cookie".into(), AUTH_CLEAR.into())); Ok(res) }; let cookie_list = match str::from_utf8(cl) { Ok(cl) => cl, Err(_) => return unauth(), }; for s in cookie_list.split(";").map(|s| s.trim()) { let cookie = match Cookie::parse(s) { Ok(c) => c, Err(_) => return unauth(), }; // got auth token if cookie.name() == TOKEN_HEADER { let db = self.pool.get().unwrap(); match account::from_token(&db, &cookie.value().to_string()) { Ok(a) => self.user = Box::new(Authenticated::new(a, self.ws.clone(), self.events.clone(), self.pool.clone())), Err(_) => return unauth(), } } }; } Ok(res) } } pub fn start(pool: PgPool, events_tx: CbSender, stripe: StripeClient) { let _ws = Builder::new() .with_settings(Settings { max_connections: 10_000, ..Settings::default() }) .build(move |out: WsSender| { // we give the tx half to the connection object // which in turn passes a clone to the events system // the rx half goes into a thread where it waits for messages // that need to be delivered to the client // both the ws message handler and the events thread must use // this channel to send messages let (tx, rx) = unbounded::(); spawn(move || { loop { match rx.recv() { Ok(n) => { let response = to_vec(&n).unwrap(); out.send(Message::Binary(response)).unwrap(); } // we done Err(_e) => { // info!("{:?}", e); break; }, }; } }); let anon_account = Account::anonymous(); let id = anon_account.id; DeflateHandler::new( Connection { id, ws: tx.clone(), pool: pool.clone(), stripe: stripe.clone(), events: events_tx.clone(), user: Box::new(Anonymous::new(anon_account, tx)) } ) }) .unwrap() .listen("127.0.0.1:40055") .unwrap(); }