diff --git a/server/src/main.rs b/server/src/main.rs index 7434ad6f..531c2729 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -90,7 +90,7 @@ fn main() { let warden_pool = pool.clone(); let pubsub_pool = pool.clone(); - let (pss, psr) = unbounded(); + let pubsub = pubsub::PubSub::new(); spawn(move || { loop { @@ -102,15 +102,16 @@ fn main() { } }); + let pg_listener = pubsub.clone(); spawn(move || loop { let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); - match pg_listen(pubsub_conn, pss.clone()) { + match pg_listen(pubsub_conn, &pg_listener) { Ok(_) => warn!("pg listen closed"), Err(e) => warn!("pg_listen error {:?}", e), } }); spawn(move || net::start(http_pool)); - ws::start(ws_pool, psr); + ws::start(ws_pool, pubsub); info!("server started"); } diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index 6a4cd90b..38e1d301 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -5,13 +5,26 @@ use fallible_iterator::{FallibleIterator}; use failure::Error; use failure::err_msg; -use crossbeam_channel::{Sender}; +use crossbeam_channel::{unbounded, Sender, Receiver}; use pg::{Db}; use account; use game; use instance; +#[derive(Clone)] +pub struct PubSub { + pub s: Sender, + r: Receiver, +} + +impl PubSub { + pub fn new() -> PubSub { + let (s, r) = unbounded(); + return PubSub { s, r }; + } +} + #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[serde(rename_all(deserialize = "lowercase"))] enum Table { @@ -72,7 +85,7 @@ fn handle_notification(n: Notification, db: &Db, pss: &Sender) -> Resul Ok(()) } -pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> { +pub fn pg_listen(db: Db, ps: &PubSub) -> Result<(), Error> { db.execute("LISTEN events;", &[])?; info!("pubsub listening"); let notifications = db.notifications(); @@ -81,7 +94,7 @@ pub fn pg_listen(db: Db, pss: Sender) -> Result<(), Error> { let n = n_iter.next().unwrap(); if let Some(n) = n { match serde_json::from_str::(&n.payload) { - Ok(notification) => match handle_notification(notification, &db, &pss) { + Ok(notification) => match handle_notification(notification, &db, &ps.s) { Ok(()) => (), Err(e) => warn!("{:?}", e), } diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 897f459d..bf4ce82b 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -68,7 +68,7 @@ enum RpcRequest { VboxReclaim { instance_id: Uuid, index: usize }, } -pub fn receive(data: Vec, db: &Db, _client: &mut Ws, begin: Instant, account: &Option) -> Result { +pub fn receive(data: Vec, db: &Db, begin: Instant, account: &Option) -> Result { // cast the msg to this type to receive method name match from_slice::(&data) { Ok(v) => { diff --git a/server/src/ws.rs b/server/src/ws.rs index f5ab2412..d0c43bb9 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -15,7 +15,7 @@ use tungstenite::protocol::WebSocket; use tungstenite::util::NonBlockingResult; use tungstenite::{accept_hdr}; -use crossbeam_channel::{unbounded, Receiver}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use serde_cbor::{to_vec}; @@ -30,9 +30,81 @@ use mtx; use pg::PgPool; use account; use account::Account; -use pubsub::Message; +use pubsub::{Message, PubSub}; -pub type Ws = WebSocket; +pub struct Ws { + client: WebSocket, + sender: Sender, + receiver: Receiver, + pool: PgPool, + account: Option, +} + +impl Ws { + pub fn new(client: WebSocket, pool: PgPool, account: Option) -> Ws { + let (sender, receiver) = unbounded(); + return Ws { sender, receiver, client, pool, account }; + } + + fn start(&mut self) { + loop { + match self.client.read_message().no_block() { + Ok(msg) => { + if let Some(msg) = msg { + match msg { + Binary(data) => { + let begin = Instant::now(); + let db_connection = self.pool.get() + .expect("unable to get db connection"); + + match rpc::receive(data, &db_connection, begin, &self.account) { + Ok(reply) => { + let response = to_vec(&reply) + .expect("failed to serialize response"); + + if let Err(e) = self.client.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; + + // self.subscriptions.update(&reply).unwrap(); + }, + Err(e) => { + warn!("{:?}", e); + let response = to_vec(&RpcError { err: e.to_string() }) + .expect("failed to serialize error response"); + + if let Err(e) = self.client.write_message(Binary(response)) { + // connection closed + warn!("{:?}", e); + return; + }; + } + } + }, + _ => (), + } + }; + + self.receive(); + }, + // connection is closed + Err(e) => { + warn!("{:?}", e); + return; + } + }; + } + } + + fn receive(&mut self) { + // match self.receiver.try_recv() { + // Ok(n) => handle_message(&subs, n, &mut websocket), + // Err(_) => (), + // }; + } +} #[derive(Debug,Clone,Serialize)] struct RpcError { @@ -55,13 +127,13 @@ impl Subscriptions { // send account constructs let account_constructs = account::account_constructs(&mut tx, a)?; - ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; + ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; // get account instances // and send them to the client let account_instances = account::account_instances(&mut tx, a)?; // let instances = account_instances.iter().map(|i| i.id).collect::>(); - ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; + ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; // get players // add to games @@ -128,15 +200,14 @@ fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) { }, // _ => None, } { - ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); + ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); } } -pub fn start(pool: PgPool, psr: Receiver) { +pub fn start(pool: PgPool, ps: PubSub) { let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); for stream in ws_server.incoming() { let ws_pool = pool.clone(); - let ws_psr = psr.clone(); spawn(move || { let (acc_s, acc_r) = unbounded(); @@ -167,7 +238,7 @@ pub fn start(pool: PgPool, psr: Receiver) { Ok(None) }; - let mut websocket = accept_hdr(nb_stream, cb).unwrap(); + let mut client = accept_hdr(nb_stream, cb).unwrap(); // get a copy of the account let account = match acc_r.recv().unwrap() { @@ -179,13 +250,13 @@ pub fn start(pool: PgPool, psr: Receiver) { match account::from_token(&db, t) { Ok(a) => { let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); - websocket.write_message(Binary(state)).unwrap(); + client.write_message(Binary(state)).unwrap(); let mut tx = db.transaction().unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); - websocket.write_message(Binary(shop)).unwrap(); + client.write_message(Binary(shop)).unwrap(); // tx doesn't change anything tx.commit().unwrap(); @@ -201,66 +272,8 @@ pub fn start(pool: PgPool, psr: Receiver) { None => None, }; - let mut subs = match Subscriptions::new(&ws_pool, &account, &mut websocket) { - Ok(s) => s, - Err(e) => { - warn!("subscriptions error err={:?}", e); - return; - }, - }; - - loop { - match websocket.read_message().no_block() { - Ok(msg) => { - if let Some(msg) = msg { - match msg { - Binary(data) => { - let begin = Instant::now(); - let db_connection = ws_pool.get() - .expect("unable to get db connection"); - - match rpc::receive(data, &db_connection, &mut websocket, begin, &account) { - Ok(reply) => { - let response = to_vec(&reply) - .expect("failed to serialize response"); - - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - - subs.update(&reply).unwrap(); - }, - Err(e) => { - warn!("{:?}", e); - let response = to_vec(&RpcError { err: e.to_string() }) - .expect("failed to serialize error response"); - - if let Err(e) = websocket.write_message(Binary(response)) { - // connection closed - warn!("{:?}", e); - return; - }; - } - } - }, - _ => (), - } - }; - - match ws_psr.try_recv() { - Ok(n) => handle_message(&subs, n, &mut websocket), - Err(_) => (), - }; - }, - // connection is closed - Err(e) => { - warn!("{:?}", e); - return; - } - }; - } + Ws::new(client, ws_pool, account) + .start() }); } }