use std::time::{Instant}; use std::str; use uuid::Uuid; use cookie::Cookie; use futures::stream::Stream; use futures::Future; use tokio::net::TcpListener; use tungstenite::Message::Binary; use tungstenite::handshake::server::{Request, ErrorResponse}; use tungstenite::handshake::HandshakeRole; use tungstenite::http::StatusCode; use tungstenite::protocol::WebSocket; use tungstenite::util::NonBlockingResult; use tungstenite::protocol::Message; use tokio_tungstenite::accept_hdr_async; use crossbeam_channel::{unbounded, Receiver, Sender}; use serde_cbor::{to_vec}; use std::io::{Error, ErrorKind}; use net::TOKEN_HEADER; use rpc; use mtx; use pg::PgPool; use account; use account::Account; use events::{Message as WsMessage, Events}; // pub fn ws(mut client: WebSocket, pool: PgPool, account: Option, events: Sender) { // let (tx, rx) = unbounded(); // events.try_send(Message::Connect(tx)).unwrap(); // loop { // match client.read_message().no_block() { // Ok(msg) => { // if let Some(msg) = msg { // match msg { // Binary(data) => { // let begin = Instant::now(); // let db_connection = pool.get() // .expect("unable to get db connection"); // match rpc::receive(data, &db_connection, begin, &account) { // Ok(reply) => { // let response = to_vec(&reply) // .expect("failed to serialize response"); // if let Err(e) = client.write_message(Binary(response)) { // // connection closed // warn!("{:?}", e); // return; // }; // // subscriptions.update(&reply).unwrap(); // }, // Err(e) => { // warn!("{:?}", e); // let response = to_vec(&RpcError { err: e.to_string() }) // .expect("failed to serialize error response"); // if let Err(e) = client.write_message(Binary(response)) { // // connection closed // warn!("{:?}", e); // return; // }; // } // } // }, // _ => (), // } // }; // // match receiver.try_recv() { // // Ok(n) => handle_message(&subs, n, &mut websocket), // // Err(_) => (), // // }; // }, // // connection is closed // Err(e) => { // warn!("{:?}", e); // return; // } // }; // } // } #[derive(Debug,Clone,Serialize)] struct RpcError { err: String, } pub fn start(pool: PgPool, events: Events) -> impl Future { let addr = "127.0.0.1:40055".parse().unwrap(); let ws_server = TcpListener::bind(&addr).unwrap(); let wsf = ws_server.incoming().for_each(move |stream| { let ws_pool = pool.clone(); let events_tx = events.tx.clone(); let (acc_s, acc_r) = unbounded(); // search through the ws request for the auth cookie let cb = move |req: &Request| { let err = || ErrorResponse { error_code: StatusCode::FORBIDDEN, headers: None, body: Some("Unauthorized".into()), }; if let Some(cl) = req.headers.find_first("Cookie") { let cookie_list = str::from_utf8(cl).or(Err(err()))?; for s in cookie_list.split(";").map(|s| s.trim()) { let cookie = Cookie::parse(s).or(Err(err()))?; // got auth token if cookie.name() == TOKEN_HEADER { acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?; } }; }; acc_s.send(None).unwrap(); Ok(None) }; accept_hdr_async(stream, cb).and_then(move |ws_stream| { info!("new connection"); let (sink, stream) = ws_stream.split(); // get a copy of the account let account = match acc_r.recv().unwrap() { Some(t) => { let db = ws_pool.get() .expect("unable to get db connection"); match account::from_token(&db, t) { Ok(a) => { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); // client.write_message(Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); // client.write_message(Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); Some(a) }, Err(e) => { warn!("{:?}", e); None }, } }, None => None, }; let ws_reader = stream.for_each(move |message: Message| { Ok(()) }); tokio::spawn(ws_reader.then(move |_| { println!("Connection closed."); Ok(()) })); info!("{:?}", account); Ok(()) // ws(client, ws_pool, account, events_tx) }) .map_err(|e| { warn!("Error during the websocket handshake occurred: {}", e); Error::new(ErrorKind::Other, e) }) }); return wsf; }