use std::time::{Instant}; use std::thread::spawn; use std::str; use rand::prelude::*; use serde_cbor::to_vec; use cookie::Cookie; use crossbeam_channel::{unbounded, Sender as CbSender}; use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response}; use account; use account::{Account}; use pg::{PgPool}; use events::Event; use rpc::{RpcMessage}; use rpc; use mtx; use net::TOKEN_HEADER; struct Connection { pub id: usize, pub ws: CbSender, pool: PgPool, account: Option, events: CbSender, } // we unwrap everything in here cause really // we don't care if this panics // it's run in a thread so it's supposed to bail // when it encounters errors impl Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { info!("websocket connected account={:?}", self.account); // tell events we have connected self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap(); // if user logged in do some prep work if let Some(ref a) = self.account { self.ws.send(RpcMessage::AccountState(a.clone())).unwrap(); self.events.send(Event::Subscribe(self.id, a.id)).unwrap(); let db = self.pool.get().unwrap(); let mut tx = db.transaction().unwrap(); // send account constructs let account_constructs = account::account_constructs(&mut tx, a).unwrap(); self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap(); // get account instances // and send them to the client let account_instances = account::account_instances(&mut tx, a).unwrap(); self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap(); // tx should do nothing tx.commit().unwrap(); } Ok(()) } fn on_message(&mut self, msg: Message) -> Result<()> { match msg { Message::Binary(msg) => { let begin = Instant::now(); let db_connection = self.pool.get().unwrap(); match rpc::receive(msg, &db_connection, begin, &self.account) { Ok(reply) => { // if the user queries the state of something // we tell events to push updates to them match reply { RpcMessage::AccountState(ref v) => self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::GameState(ref v) => self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), RpcMessage::InstanceState(ref v) => self.events.send(Event::Subscribe(self.id, v.id)).unwrap(), _ => (), }; self.ws.send(reply).unwrap(); }, Err(e) => { warn!("{:?}", e); self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); }, }; }, _ => (), }; Ok(()) } fn on_close(&mut self, _: CloseCode, _: &str) { info!("websocket disconnected account={:?}", self.account); self.events.send(Event::Disconnect(self.id)).unwrap(); } fn on_request(&mut self, req: &Request) -> Result { let res = Response::from_request(req)?; if let Some(cl) = req.header("Cookie") { let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec())); let cookie_list = match str::from_utf8(cl) { Ok(cl) => cl, Err(_) => return unauth(), }; for s in cookie_list.split(";").map(|s| s.trim()) { let cookie = match Cookie::parse(s) { Ok(c) => c, Err(_) => return unauth(), }; // got auth token if cookie.name() == TOKEN_HEADER { let db = self.pool.get().unwrap(); match account::from_token(&db, cookie.value().to_string()) { Ok(a) => self.account = Some(a), Err(_) => return unauth(), } } }; }; Ok(res) } } pub fn start(pool: PgPool, events_tx: CbSender) { let mut rng = thread_rng(); listen("127.0.0.1:40055", move |out| { // we give the tx half to the connection object // which in turn passes a clone to the events system // the rx half goes into a thread where it waits for messages // that need to be delivered to the client // both the ws message handler and the events thread must use // this channel to send messages let (tx, rx) = unbounded::(); spawn(move || { loop { match rx.recv() { Ok(n) => { let response = to_vec(&n).unwrap(); out.send(Message::Binary(response)).unwrap(); } // we done Err(_e) => { break; }, }; } }); Connection { id: rng.gen::(), account: None, ws: tx, pool: pool.clone(), events: events_tx.clone(), } }).unwrap(); }