This commit is contained in:
ntr 2019-07-25 21:45:27 +10:00
parent be7a388812
commit 066c61c227
2 changed files with 59 additions and 81 deletions

View File

@ -112,9 +112,9 @@ fn main() {
// this should go on a thread too?
let ws_pool = pool.clone();
let wsf = ws::start(ws_pool, events);
let wss = ws::server(ws_pool, events);
tokio::runtime::run(wsf.map_err(|_e| ()));
tokio::runtime::run(wss.map_err(|_e| ()));
info!("server started");
}

View File

@ -8,99 +8,41 @@ use cookie::Cookie;
use futures::stream::Stream;
use futures::Future;
use tokio::net::TcpListener;
use tungstenite::Message::Binary;
use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::handshake::HandshakeRole;
use tungstenite::http::StatusCode;
use tungstenite::protocol::WebSocket;
use tungstenite::util::NonBlockingResult;
use tungstenite::protocol::Message;
use tokio_tungstenite::accept_hdr_async;
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{unbounded};
use serde_cbor::{to_vec};
use std::io::{Error, ErrorKind};
use futures::Sink;
use net::TOKEN_HEADER;
use rpc;
use rpc::{RpcMessage};
use mtx;
use pg::PgPool;
use account;
use account::Account;
use events::{Message as WsMessage, Events};
// pub fn ws(mut client: WebSocket<Stream>, pool: PgPool, account: Option<Account>, events: Sender<Message>) {
// let (tx, rx) = unbounded();
// events.try_send(Message::Connect(tx)).unwrap();
// loop {
// match client.read_message().no_block() {
// Ok(msg) => {
// if let Some(msg) = msg {
// match msg {
// Binary(data) => {
// let begin = Instant::now();
// let db_connection = pool.get()
// .expect("unable to get db connection");
// match rpc::receive(data, &db_connection, begin, &account) {
// Ok(reply) => {
// let response = to_vec(&reply)
// .expect("failed to serialize response");
// if let Err(e) = client.write_message(Binary(response)) {
// // connection closed
// warn!("{:?}", e);
// return;
// };
// // subscriptions.update(&reply).unwrap();
// },
// Err(e) => {
// warn!("{:?}", e);
// let response = to_vec(&RpcError { err: e.to_string() })
// .expect("failed to serialize error response");
// if let Err(e) = client.write_message(Binary(response)) {
// // connection closed
// warn!("{:?}", e);
// return;
// };
// }
// }
// },
// _ => (),
// }
// };
// // match receiver.try_recv() {
// // Ok(n) => handle_message(&subs, n, &mut websocket),
// // Err(_) => (),
// // };
// },
// // connection is closed
// Err(e) => {
// warn!("{:?}", e);
// return;
// }
// };
// }
// }
use events::{Events};
#[derive(Debug,Clone,Serialize)]
struct RpcError {
err: String,
}
pub fn start(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Error> {
pub fn server(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Error> {
let addr = "127.0.0.1:40055".parse().unwrap();
let ws_server = TcpListener::bind(&addr).unwrap();
let wsf = ws_server.incoming().for_each(move |stream| {
let wss = ws_server.incoming().for_each(move |stream| {
let ws_pool = pool.clone();
let events_tx = events.tx.clone();
@ -133,7 +75,7 @@ pub fn start(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Err
accept_hdr_async(stream, cb).and_then(move |ws_stream| {
info!("new connection");
let (sink, stream) = ws_stream.split();
let (mut sink, stream) = ws_stream.split();
// get a copy of the account
let account = match acc_r.recv().unwrap() {
@ -144,13 +86,13 @@ pub fn start(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Err
match account::from_token(&db, t) {
Ok(a) => {
let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap();
// client.write_message(Binary(state)).unwrap();
sink.start_send(Message::Binary(state)).unwrap();
let mut tx = db.transaction().unwrap();
let shop = mtx::account_shop(&mut tx, &a).unwrap();
let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap();
// client.write_message(Binary(shop)).unwrap();
sink.start_send(Message::Binary(shop)).unwrap();
// tx doesn't change anything
tx.commit().unwrap();
@ -166,21 +108,57 @@ pub fn start(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Err
None => None,
};
let ws_reader = stream.for_each(move |message: Message| {
Ok(())
let (tx, rx) = unbounded();
events_tx.try_send(WsMessage::Connect(tx)).unwrap();
let message_reader = stream
.filter(|msg| msg.is_binary())
.and_then(|message: Message| {
let begin = Instant::now();
let db_connection = ws_pool.get()
.expect("unable to get db connection");
match rpc::receive(message.into_data(), &db_connection, begin, &account) {
Ok(reply) => {
let response = to_vec(&reply)
.expect("failed to serialize response");
Ok(response);
},
Err(e) => {
warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response");
Ok(response);
}
};
Ok(sink)
});
let events_reader = rx.map(|message: RpcMessage| {
let response = to_vec(&message)
.expect("failed to serialize event");
Ok(response);
});
let combined = message_reader.select(events_reader);
tokio::spawn(ws_reader.then(move |_| {
println!("Connection closed.");
let outgoing = combined.fold(sink, |mut sink, message| {
sink.start_send(Message::Binary(message)).unwrap();
Ok(sink)
});
// if let Err(e) = sink.start_send(Message::Binary(response)) {
// // connection closed
// warn!("{:?}", e);
// return Err(e);
// };
tokio::spawn(outgoing.then(move |_| {
info!("connection closed");
Ok(())
}));
info!("{:?}", account);
Ok(())
// ws(client, ws_pool, account, events_tx)
})
.map_err(|e| {
warn!("Error during the websocket handshake occurred: {}", e);
@ -188,5 +166,5 @@ pub fn start(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Err
})
});
return wsf;
return wss;
}