multithreaded connections

This commit is contained in:
ntr 2018-10-12 13:21:22 +11:00
parent 67f62c659a
commit 6bee3f1fe9
4 changed files with 56 additions and 55 deletions

View File

@ -10,7 +10,7 @@ serde = "1"
serde_derive = "1" serde_derive = "1"
serde_cbor = "0.9" serde_cbor = "0.9"
ws = "*" tungstenite = "0.6"
bcrypt = "0.2" bcrypt = "0.2"
dotenv = "0.9.0" dotenv = "0.9.0"

View File

@ -1,6 +1,6 @@
extern crate rand; extern crate rand;
extern crate uuid; extern crate uuid;
extern crate ws; extern crate tungstenite;
extern crate env_logger; extern crate env_logger;
extern crate bcrypt; extern crate bcrypt;

View File

@ -1,4 +1,9 @@
use ws::{listen, Handler, Sender, Result, Message, Handshake, CloseCode, Error}; use tungstenite::Message;
use tungstenite::protocol::WebSocket;
use tungstenite::server::accept;
use tungstenite::error::Error;
use tungstenite::Message::Binary;
use std::net::{TcpListener, TcpStream};
use serde_cbor::{to_vec}; use serde_cbor::{to_vec};
use std::env; use std::env;
@ -14,53 +19,31 @@ pub type Db = PooledConnection<PostgresConnectionManager>;
use rpc::{Rpc}; use rpc::{Rpc};
struct Server { // struct Server {
out: Sender, // client: WebSocket<TcpStream>,
rpc: Rpc, // rpc: Rpc,
db: Pool<PostgresConnectionManager>, // db: Pool<PostgresConnectionManager>,
} // }
#[derive(Debug,Clone,Serialize,Deserialize)] #[derive(Debug,Clone,Serialize,Deserialize)]
struct RpcErrorResponse { struct RpcErrorResponse {
err: String err: String
} }
impl Handler for Server { fn receive(db: Db, rpc: &Rpc, msg: Message, client: &mut WebSocket<TcpStream>) -> Result<(), Error> {
fn on_open(&mut self, _: Handshake) -> Result<()> { match rpc.receive(msg, &db, client) {
println!("somebody joined");
Ok(())
}
fn on_message(&mut self, msg: Message) -> Result<()> {
let db = self.db.get().expect("unable to get db connection");
match self.rpc.receive(msg, &db, &self.out) {
Ok(reply) => { Ok(reply) => {
let response = to_vec(&reply) let response = to_vec(&reply)
.expect("failed to serialize response"); .expect("failed to serialize response");
self.out.send(response) client.write_message(Binary(response))
}, },
Err(e) => { Err(e) => {
println!("{:?}", e); println!("{:?}", e);
let response = to_vec(&RpcErrorResponse { err: e.to_string() }) let response = to_vec(&RpcErrorResponse { err: e.to_string() })
.expect("failed to serialize error response"); .expect("failed to serialize error response");
self.out.send(response) client.write_message(Binary(response))
} }
} }
}
fn on_close(&mut self, code: CloseCode, reason: &str) {
match code {
CloseCode::Normal => println!("The client is done with the connection."),
CloseCode::Away => println!("The client is leaving the site."),
CloseCode::Abnormal => println!(
"Closing handshake failed! Unable to obtain closing status from client."),
_ => println!("The client encountered an error: {}", reason),
}
}
fn on_error(&mut self, err: Error) {
println!("The server encountered an error: {:?}", err);
}
} }
pub fn db_connection(url: String) -> Pool<PostgresConnectionManager> { pub fn db_connection(url: String) -> Pool<PostgresConnectionManager> {
@ -79,12 +62,26 @@ pub fn start() {
let pool = db_connection(database_url); let pool = db_connection(database_url);
listen("127.0.0.1:40000", |out| { // listen("127.0.0.1:40000", |out| {
// let db = pool.clone();
// let handler = spawn(move || {
// });
// let result = handler.join().unwrap();
// return result;
// }).unwrap();
let server = TcpListener::bind("0.0.0.0:40000").unwrap();
for stream in server.incoming() {
let db = pool.clone(); let db = pool.clone();
let handler = spawn(move || { spawn (move || {
Server { out, rpc: Rpc {}, db } let mut websocket = accept(stream.unwrap()).unwrap();
let rpc = Rpc {};
loop {
let msg = websocket.read_message().unwrap();
let db_connection = db.get().expect("unable to get db connection");
receive(db_connection, &rpc, msg, &mut websocket);
}
}); });
let result = handler.join().unwrap(); }
return result;
}).unwrap();
} }

View File

@ -1,4 +1,8 @@
use ws::{Message, Sender}; use tungstenite::Message;
use tungstenite::protocol::WebSocket;
use tungstenite::Message::Binary;
use std::net::{TcpStream};
use serde_cbor::{from_slice, to_vec}; use serde_cbor::{from_slice, to_vec};
use uuid::Uuid; use uuid::Uuid;
use failure::Error; use failure::Error;
@ -13,7 +17,7 @@ use account::{Account, create, login, from_token, fetch_cryps};
pub struct Rpc; pub struct Rpc;
impl Rpc { impl Rpc {
pub fn receive(&self, msg: Message, db: &Db, socket: &Sender) -> Result<RpcResponse, Error> { pub fn receive(&self, msg: Message, db: &Db, client: &mut WebSocket<TcpStream>) -> Result<RpcResponse, Error> {
// consume the ws data into bytes // consume the ws data into bytes
let data = msg.into_data(); let data = msg.into_data();
@ -32,7 +36,7 @@ impl Rpc {
// match on that to determine what fn to call // match on that to determine what fn to call
match v.method.as_ref() { match v.method.as_ref() {
"cryp_spawn" => Rpc::cryp_spawn(data, db, account), "cryp_spawn" => Rpc::cryp_spawn(data, db, account),
"combat_pve" => Rpc::combat_pve(data, db, account, socket), "combat_pve" => Rpc::combat_pve(data, db, account, client),
"account_create" => Rpc::account_create(data, db), "account_create" => Rpc::account_create(data, db),
"account_login" => Rpc::account_login(data, db), "account_login" => Rpc::account_login(data, db),
"account_cryps" => Rpc::account_cryps(db, account), "account_cryps" => Rpc::account_cryps(db, account),
@ -44,15 +48,15 @@ impl Rpc {
} }
} }
fn send_msg(socket: &Sender, msg: RpcResponse) -> Result<(), Error> { fn send_msg(client: &mut WebSocket<TcpStream>, msg: RpcResponse) -> Result<(), Error> {
let bytes = to_vec(&msg)?; let bytes = to_vec(&msg)?;
match socket.send(bytes) { match client.write_message(Binary(bytes)) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(e) => Err(err_msg(e)) Err(e) => Err(err_msg(e))
} }
} }
fn combat_pve(data: Vec<u8>, db: &Db, account: Option<Account>, socket: &Sender) -> Result<RpcResponse, Error> { fn combat_pve(data: Vec<u8>, db: &Db, account: Option<Account>, client: &mut WebSocket<TcpStream>) -> Result<RpcResponse, Error> {
let u = match account { let u = match account {
Some(u) => u, Some(u) => u,
None => return Err(err_msg("auth required")), None => return Err(err_msg("auth required")),
@ -65,7 +69,7 @@ impl Rpc {
params: RpcResult::Pve(pve(msg.params, db, &u)?) params: RpcResult::Pve(pve(msg.params, db, &u)?)
}; };
Rpc::send_msg(socket, RpcResponse { Rpc::send_msg(client, RpcResponse {
method: "account_cryps".to_string(), method: "account_cryps".to_string(),
params: RpcResult::CrypList(fetch_cryps(db, &u)?) params: RpcResult::CrypList(fetch_cryps(db, &u)?)
})?; })?;