From 86d77d23bb29872732e0bdef0533b32a2aa2690b Mon Sep 17 00:00:00 2001 From: ntr Date: Thu, 25 Jul 2019 23:59:51 +1000 Subject: [PATCH] carnnnn cunt --- server/src/websocket.rs | 120 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 server/src/websocket.rs diff --git a/server/src/websocket.rs b/server/src/websocket.rs new file mode 100644 index 00000000..38569a55 --- /dev/null +++ b/server/src/websocket.rs @@ -0,0 +1,120 @@ +use std::time::{Instant}; +use std::thread::spawn; +use std::str; + +use serde_cbor::to_vec; + +use cookie::Cookie; + +use crossbeam_channel::{unbounded, Sender as CbSender, Receiver as CbReceiver}; +use ws::{ listen, CloseCode, Message, Sender, Handler, Handshake, Result, Request, Response}; + +use account; +use account::{Account}; +use pg::{PgPool}; +use events::{Events}; +use rpc::{RpcMessage}; +use rpc; +use net::TOKEN_HEADER; + +#[derive(Debug,Clone,Serialize)] +struct WsError { + err: String, +} + +struct Connection { + ws: CbSender, + pool: PgPool, + events: Events, + account: Option, +} + +impl Handler for Connection { + fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { + info!("connected account={:?}", self.account); + Ok(()) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + match msg { + Message::Binary(msg) => { + let begin = Instant::now(); + let db_connection = self.pool.get().unwrap(); + + match rpc::receive(msg, &db_connection, begin, &self.account) { + Ok(reply) => { + self.ws.send(reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + self.ws.send(RpcMessage::Error(e.to_string())).unwrap(); + }, + }; + }, + _ => (), + }; + Ok(()) + } + + fn on_close(&mut self, _: CloseCode, _: &str) { + info!("socket disconnected account={:?}", self.account); + // self.ws.shutdown().unwrap() + } + + fn on_request(&mut self, req: &Request) -> Result { + let res = Response::from_request(req)?; + + if let Some(cl) = req.header("Cookie") { + let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec())); + let cookie_list = match str::from_utf8(cl) { + Ok(cl) => cl, + Err(_) => return unauth(), + }; + + for s in cookie_list.split(";").map(|s| s.trim()) { + let cookie = match Cookie::parse(s) { + Ok(c) => c, + Err(_) => return unauth(), + }; + + // got auth token + if cookie.name() == TOKEN_HEADER { + let db = self.pool.get().unwrap(); + match account::from_token(&db, cookie.value().to_string()) { + Ok(a) => self.account = Some(a), + Err(_) => return unauth(), + } + } + }; + }; + + Ok(res) + } +} + + +pub fn start(pool: PgPool, events: Events) { + listen("127.0.0.1:40055", move |out| { + let (tx, rx) = unbounded::(); + + spawn(move || { + loop { + match rx.recv() { + Ok(n) => { + let response = to_vec(&n).unwrap(); + out.send(Message::Binary(response)).unwrap(); + } + Err(_) => (), + }; + } + }); + + Connection { + account: None, + ws: tx, + pool: pool.clone(), + events: events.clone(), + } + + }).unwrap(); +}