This commit is contained in:
ntr 2019-07-24 16:35:33 +10:00
parent 384cdb2de5
commit 5851f20211
4 changed files with 105 additions and 78 deletions

View File

@ -90,7 +90,7 @@ fn main() {
let warden_pool = pool.clone(); let warden_pool = pool.clone();
let pubsub_pool = pool.clone(); let pubsub_pool = pool.clone();
let (pss, psr) = unbounded(); let pubsub = pubsub::PubSub::new();
spawn(move || { spawn(move || {
loop { loop {
@ -102,15 +102,16 @@ fn main() {
} }
}); });
let pg_listener = pubsub.clone();
spawn(move || loop { spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn, pss.clone()) { match pg_listen(pubsub_conn, &pg_listener) {
Ok(_) => warn!("pg listen closed"), Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e), Err(e) => warn!("pg_listen error {:?}", e),
} }
}); });
spawn(move || net::start(http_pool)); spawn(move || net::start(http_pool));
ws::start(ws_pool, psr); ws::start(ws_pool, pubsub);
info!("server started"); info!("server started");
} }

View File

@ -5,13 +5,26 @@ use fallible_iterator::{FallibleIterator};
use failure::Error; use failure::Error;
use failure::err_msg; use failure::err_msg;
use crossbeam_channel::{Sender}; use crossbeam_channel::{unbounded, Sender, Receiver};
use pg::{Db}; use pg::{Db};
use account; use account;
use game; use game;
use instance; use instance;
#[derive(Clone)]
pub struct PubSub {
pub s: Sender<Message>,
r: Receiver<Message>,
}
impl PubSub {
pub fn new() -> PubSub {
let (s, r) = unbounded();
return PubSub { s, r };
}
}
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)] #[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "lowercase"))] #[serde(rename_all(deserialize = "lowercase"))]
enum Table { enum Table {
@ -72,7 +85,7 @@ fn handle_notification(n: Notification, db: &Db, pss: &Sender<Message>) -> Resul
Ok(()) Ok(())
} }
pub fn pg_listen(db: Db, pss: Sender<Message>) -> Result<(), Error> { pub fn pg_listen(db: Db, ps: &PubSub) -> Result<(), Error> {
db.execute("LISTEN events;", &[])?; db.execute("LISTEN events;", &[])?;
info!("pubsub listening"); info!("pubsub listening");
let notifications = db.notifications(); let notifications = db.notifications();
@ -81,7 +94,7 @@ pub fn pg_listen(db: Db, pss: Sender<Message>) -> Result<(), Error> {
let n = n_iter.next().unwrap(); let n = n_iter.next().unwrap();
if let Some(n) = n { if let Some(n) = n {
match serde_json::from_str::<Notification>(&n.payload) { match serde_json::from_str::<Notification>(&n.payload) {
Ok(notification) => match handle_notification(notification, &db, &pss) { Ok(notification) => match handle_notification(notification, &db, &ps.s) {
Ok(()) => (), Ok(()) => (),
Err(e) => warn!("{:?}", e), Err(e) => warn!("{:?}", e),
} }

View File

@ -68,7 +68,7 @@ enum RpcRequest {
VboxReclaim { instance_id: Uuid, index: usize }, VboxReclaim { instance_id: Uuid, index: usize },
} }
pub fn receive(data: Vec<u8>, db: &Db, _client: &mut Ws, begin: Instant, account: &Option<Account>) -> Result<RpcMessage, Error> { pub fn receive(data: Vec<u8>, db: &Db, begin: Instant, account: &Option<Account>) -> Result<RpcMessage, Error> {
// cast the msg to this type to receive method name // cast the msg to this type to receive method name
match from_slice::<RpcRequest>(&data) { match from_slice::<RpcRequest>(&data) {
Ok(v) => { Ok(v) => {

View File

@ -15,7 +15,7 @@ use tungstenite::protocol::WebSocket;
use tungstenite::util::NonBlockingResult; use tungstenite::util::NonBlockingResult;
use tungstenite::{accept_hdr}; use tungstenite::{accept_hdr};
use crossbeam_channel::{unbounded, Receiver}; use crossbeam_channel::{unbounded, Receiver, Sender};
use serde_cbor::{to_vec}; use serde_cbor::{to_vec};
@ -30,9 +30,81 @@ use mtx;
use pg::PgPool; use pg::PgPool;
use account; use account;
use account::Account; use account::Account;
use pubsub::Message; use pubsub::{Message, PubSub};
pub type Ws = WebSocket<TcpStream>; pub struct Ws {
client: WebSocket<TcpStream>,
sender: Sender<Message>,
receiver: Receiver<Message>,
pool: PgPool,
account: Option<Account>,
}
impl Ws {
pub fn new(client: WebSocket<TcpStream>, pool: PgPool, account: Option<Account>) -> Ws {
let (sender, receiver) = unbounded();
return Ws { sender, receiver, client, pool, account };
}
fn start(&mut self) {
loop {
match self.client.read_message().no_block() {
Ok(msg) => {
if let Some(msg) = msg {
match msg {
Binary(data) => {
let begin = Instant::now();
let db_connection = self.pool.get()
.expect("unable to get db connection");
match rpc::receive(data, &db_connection, begin, &self.account) {
Ok(reply) => {
let response = to_vec(&reply)
.expect("failed to serialize response");
if let Err(e) = self.client.write_message(Binary(response)) {
// connection closed
warn!("{:?}", e);
return;
};
// self.subscriptions.update(&reply).unwrap();
},
Err(e) => {
warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response");
if let Err(e) = self.client.write_message(Binary(response)) {
// connection closed
warn!("{:?}", e);
return;
};
}
}
},
_ => (),
}
};
self.receive();
},
// connection is closed
Err(e) => {
warn!("{:?}", e);
return;
}
};
}
}
fn receive(&mut self) {
// match self.receiver.try_recv() {
// Ok(n) => handle_message(&subs, n, &mut websocket),
// Err(_) => (),
// };
}
}
#[derive(Debug,Clone,Serialize)] #[derive(Debug,Clone,Serialize)]
struct RpcError { struct RpcError {
@ -55,13 +127,13 @@ impl Subscriptions {
// send account constructs // send account constructs
let account_constructs = account::account_constructs(&mut tx, a)?; let account_constructs = account::account_constructs(&mut tx, a)?;
ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?; ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?;
// get account instances // get account instances
// and send them to the client // and send them to the client
let account_instances = account::account_instances(&mut tx, a)?; let account_instances = account::account_instances(&mut tx, a)?;
// let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>(); // let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>();
ws.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?; ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?;
// get players // get players
// add to games // add to games
@ -128,15 +200,14 @@ fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) {
}, },
// _ => None, // _ => None,
} { } {
ws.write_message(Binary(to_vec(&msg).unwrap())).unwrap(); ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
} }
} }
pub fn start(pool: PgPool, psr: Receiver<Message>) { pub fn start(pool: PgPool, ps: PubSub) {
let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap(); let ws_server = TcpListener::bind("127.0.0.1:40055").unwrap();
for stream in ws_server.incoming() { for stream in ws_server.incoming() {
let ws_pool = pool.clone(); let ws_pool = pool.clone();
let ws_psr = psr.clone();
spawn(move || { spawn(move || {
let (acc_s, acc_r) = unbounded(); let (acc_s, acc_r) = unbounded();
@ -167,7 +238,7 @@ pub fn start(pool: PgPool, psr: Receiver<Message>) {
Ok(None) Ok(None)
}; };
let mut websocket = accept_hdr(nb_stream, cb).unwrap(); let mut client = accept_hdr(nb_stream, cb).unwrap();
// get a copy of the account // get a copy of the account
let account = match acc_r.recv().unwrap() { let account = match acc_r.recv().unwrap() {
@ -179,13 +250,13 @@ pub fn start(pool: PgPool, psr: Receiver<Message>) {
match account::from_token(&db, t) { match account::from_token(&db, t) {
Ok(a) => { Ok(a) => {
let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap(); let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap();
websocket.write_message(Binary(state)).unwrap(); client.write_message(Binary(state)).unwrap();
let mut tx = db.transaction().unwrap(); let mut tx = db.transaction().unwrap();
let shop = mtx::account_shop(&mut tx, &a).unwrap(); let shop = mtx::account_shop(&mut tx, &a).unwrap();
let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap(); let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap();
websocket.write_message(Binary(shop)).unwrap(); client.write_message(Binary(shop)).unwrap();
// tx doesn't change anything // tx doesn't change anything
tx.commit().unwrap(); tx.commit().unwrap();
@ -201,66 +272,8 @@ pub fn start(pool: PgPool, psr: Receiver<Message>) {
None => None, None => None,
}; };
let mut subs = match Subscriptions::new(&ws_pool, &account, &mut websocket) { Ws::new(client, ws_pool, account)
Ok(s) => s, .start()
Err(e) => {
warn!("subscriptions error err={:?}", e);
return;
},
};
loop {
match websocket.read_message().no_block() {
Ok(msg) => {
if let Some(msg) = msg {
match msg {
Binary(data) => {
let begin = Instant::now();
let db_connection = ws_pool.get()
.expect("unable to get db connection");
match rpc::receive(data, &db_connection, &mut websocket, begin, &account) {
Ok(reply) => {
let response = to_vec(&reply)
.expect("failed to serialize response");
if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed
warn!("{:?}", e);
return;
};
subs.update(&reply).unwrap();
},
Err(e) => {
warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response");
if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed
warn!("{:?}", e);
return;
};
}
}
},
_ => (),
}
};
match ws_psr.try_recv() {
Ok(n) => handle_message(&subs, n, &mut websocket),
Err(_) => (),
};
},
// connection is closed
Err(e) => {
warn!("{:?}", e);
return;
}
};
}
}); });
} }
} }