diff --git a/client/src/components/list.jsx b/client/src/components/list.jsx
index 1d7a1c0c..26cd07a0 100644
--- a/client/src/components/list.jsx
+++ b/client/src/components/list.jsx
@@ -64,7 +64,6 @@ const addState = connect(
function List(args) {
const {
team,
- constructs,
constructRename,
clearMtxRename,
setConstructRename,
@@ -73,9 +72,7 @@ function List(args) {
sendConstructAvatarReroll,
} = args;
- const constructPanels = constructs
- .filter(c => team.includes(c.id))
- .sort(idSort)
+ const constructPanels = team
.map(construct => {
const constructName = constructRename === construct.id
?
diff --git a/client/src/events.jsx b/client/src/events.jsx
index a32251b0..abe1b065 100644
--- a/client/src/events.jsx
+++ b/client/src/events.jsx
@@ -10,7 +10,6 @@ function registerEvents(store) {
}
function setTeam(team) {
- localStorage.setItem('team', JSON.stringify(team));
store.dispatch(actions.setTeam(team));
}
@@ -179,11 +178,6 @@ function registerEvents(store) {
return store.dispatch(actions.setItemInfo(v));
}
- const team = JSON.parse(localStorage.getItem('team'));
- if (team && team.every(t => t)) {
- store.dispatch(actions.setTeam(team));
- }
-
// events.on('SET_PLAYER', setInstance);
// events.on('SEND_SKILL', function skillActive(gameId, constructId, targetConstructId, skill) {
@@ -234,6 +228,7 @@ function registerEvents(store) {
setItemInfo,
setPing,
setShop,
+ setTeam,
setWs,
};
}
diff --git a/client/src/socket.jsx b/client/src/socket.jsx
index eed3385d..78ebd771 100644
--- a/client/src/socket.jsx
+++ b/client/src/socket.jsx
@@ -113,10 +113,9 @@ function createSocket(events) {
}
function sendInstanceQueue() {
- send(['InstancePractice', {}]);
+ send(['InstanceQueue', {}]);
}
-
function sendInstanceReady(instanceId) {
send(['InstanceReady', { instance_id: instanceId }]);
}
@@ -155,6 +154,10 @@ function createSocket(events) {
events.setConstructList(constructs);
}
+ function onAccountTeam(constructs) {
+ events.setTeam(constructs);
+ }
+
function onConstructSpawn(construct) {
events.setNewConstruct(construct);
}
@@ -186,6 +189,7 @@ function createSocket(events) {
const handlers = {
AccountState: onAccount,
AccountConstructs: onAccountConstructs,
+ AccountTeam: onAccountTeam,
AccountInstances: onAccountInstances,
AccountShop: onAccountShop,
ConstructSpawn: onConstructSpawn,
@@ -193,6 +197,10 @@ function createSocket(events) {
InstanceState: onInstanceState,
ItemInfo: onItemInfo,
Pong: onPong,
+
+ QueueRequested: () => console.log('pvp queue request received'),
+ QueueJoined: () => console.log('you have joined the pvp queue'),
+
Error: errHandler,
};
diff --git a/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf b/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf
index dc7c219d..9b6b8c97 100644
--- a/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf
+++ b/etc/nginx/sites-available/mnml.gg.DEV.nginx.conf
@@ -20,6 +20,9 @@ server {
root /var/lib/mnml/public/current;
index index.html;
try_files $uri $uri/ index.html;
+
+ add_header 'Cache-Control' 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0';
+ expires off;
}
location /imgs/ {
diff --git a/server/src/account.rs b/server/src/account.rs
index cb1f74b9..009ba195 100644
--- a/server/src/account.rs
+++ b/server/src/account.rs
@@ -265,7 +265,7 @@ pub fn create(name: &String, password: &String, code: &String, tx: &mut Transact
Ok(token)
}
-pub fn account_constructs(tx: &mut Transaction, account: &Account) -> Result, Error> {
+pub fn constructs(tx: &mut Transaction, account: &Account) -> Result, Error> {
let query = "
SELECT data
FROM constructs
diff --git a/server/src/events.rs b/server/src/events.rs
index 51c1e456..95cf18e6 100644
--- a/server/src/events.rs
+++ b/server/src/events.rs
@@ -5,7 +5,7 @@ use uuid::Uuid;
use failure::Error;
-use crossbeam_channel::{unbounded, Sender, Receiver};
+use crossbeam_channel::{Sender, Receiver};
use account;
use account::Account;
@@ -19,12 +19,23 @@ use warden::{GameEvent};
pub type EventsTx = Sender;
type Id = usize;
+// this is pretty heavyweight
+// but it makes the ergonomics easy
+// and prevents message mania
+// good candidate to trim if the mm is slow
+#[derive(Debug, Clone)]
+pub struct PvpRequest {
+ pub id: Id,
+ pub tx: Sender,
+ pub account: Account,
+}
+
pub struct Events {
pub tx: Sender,
rx: Receiver,
warden: Sender,
- queue: Option,
+ queue: Option,
clients: HashMap,
}
@@ -41,7 +52,7 @@ pub enum Event {
Push(Uuid, RpcMessage),
// client events
- Queue(Id, Uuid),
+ Queue(PvpRequest),
}
struct WsClient {
@@ -51,8 +62,7 @@ struct WsClient {
}
impl Events {
- pub fn new(warden: Sender) -> Events {
- let (tx, rx) = unbounded();
+ pub fn new(tx: Sender, rx: Receiver, warden: Sender) -> Events {
Events {
tx,
rx,
@@ -66,11 +76,14 @@ impl Events {
loop {
match self.rx.recv() {
Ok(m) => {
- self.on_event(m)?;
+ match self.event(m) {
+ Ok(()) => (), // :)
+ Err(e) => {
+ warn!("err={:?}", e);
+ }
+ }
},
- // idk if this is a good idea
- // possibly just log errors and continue...
Err(e) => {
return Err(format_err!("events error err={:?}", e));
},
@@ -89,7 +102,7 @@ impl Events {
self.clients.remove(&id);
}
- fn on_event(&mut self, msg: Event) -> Result<(), Error> {
+ fn event(&mut self, msg: Event) -> Result<(), Error> {
match msg {
Event::Connect(id, account, tx) => {
info!("connect id={:?} account={:?}", id, account);
@@ -100,6 +113,7 @@ impl Events {
info!("clients={:?}", self.clients.len());
Ok(())
},
+
Event::Disconnect(id) => {
info!("disconnect id={:?}", id);
@@ -107,7 +121,8 @@ impl Events {
info!("clients={:?}", self.clients.len());
Ok(())
- }
+ },
+
Event::Subscribe(id, obj) => {
info!("subscribe id={:?} object={:?}", id, obj);
@@ -120,6 +135,7 @@ impl Events {
None => return Err(format_err!("unknown client {:?}", id))
}
},
+
Event::Unsubscribe(id, obj) => {
info!("unsubscribe id={:?} object={:?}", id, obj);
@@ -162,17 +178,18 @@ impl Events {
Ok(())
},
- Event::Queue(id, account) => {
- info!("queue id={:?} account={:?}", id, account);
+ Event::Queue(req) => {
+ info!("queue id={:?} account={:?}", req.id, req.account);
self.queue = match self.queue {
- Some(id) => {
- info!("game queue pair a={:?} b={:?}", account, id);
+ Some(ref q_req) => {
+ info!("game queue pair found a={:?} b={:?}", req.account, q_req.account);
+ self.warden.send(GameEvent::Match((req, q_req.clone())))?;
None
},
None => {
- info!("joined game queue id={:?} account={:?}", id, account);
- Some(account)
+ info!("joined game queue id={:?} account={:?}", req.id, req.account);
+ Some(req)
},
};
diff --git a/server/src/instance.rs b/server/src/instance.rs
index 6e8f760e..e6c7ad11 100644
--- a/server/src/instance.rs
+++ b/server/src/instance.rs
@@ -13,7 +13,7 @@ use chrono::Duration;
use account::Account;
use account;
-use events::EventsTx;
+use events::{EventsTx, Event};
use player::{Player, player_create};
use construct::{Construct, construct_get};
use mob::{bot_player, instance_mobs};
@@ -692,7 +692,7 @@ pub fn instance_practice(tx: &mut Transaction, account: &Account) -> Result Result {
// }
-pub fn instance_pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result {
+pub fn pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result {
let mut instance = Instance::new()
// TODO generate nice game names
.set_name("PVP".to_string())?;
@@ -747,10 +747,6 @@ pub fn instance_game_finished(tx: &mut Transaction, game: &Game, instance_id: Uu
Ok(())
}
-pub fn instance_queue(events: &EventsTx, account: &Account) -> Result {
-
-}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/server/src/main.rs b/server/src/main.rs
index 044df941..77682917 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -49,11 +49,12 @@ mod spec;
mod util;
mod vbox;
mod warden;
-mod websocket;
use std::thread::{spawn};
use std::path::{Path};
+use crossbeam_channel::{unbounded};
+
fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {
@@ -80,20 +81,22 @@ fn main() {
setup_logger().unwrap();
let pool = pg::create_pool();
-
- let warden = warden::Warden::new(pool.clone());
- let warden_tx = warden.tx.clone();
- let warden_tick_tx = warden.tx.clone();
-
let http_pool = pool.clone();
+ let (events_tx, events_rx) = unbounded();
+ let pg_events_tx = events_tx.clone();
+ let rpc_events_tx = events_tx.clone();
+
+ let (warden_tx, warden_rx) = unbounded();
+ let events_warden_tx = warden_tx.clone();
+ let warden_tick_tx = warden_tx.clone();
+
// create a clone of the tx so ws handler can tell events
// about connection status
- let events = events::Events::new(warden_tx);
- let ws_events_tx = events.tx.clone();
+ let events = events::Events::new(events_tx, events_rx, events_warden_tx);
+ let warden = warden::Warden::new(warden_tx, warden_rx, events.tx.clone(), pool.clone());
let pg_pool = pool.clone();
- let pg_events_tx = events.tx.clone();
spawn(move || net::start(http_pool));
spawn(move || warden.listen());
@@ -102,6 +105,6 @@ fn main() {
spawn(move || events.listen());
// the main thread becomes this ws listener
- let ws_pool = pool.clone();
- websocket::start(ws_pool, ws_events_tx);
+ let rpc_pool = pool.clone();
+ rpc::start(rpc_pool, rpc_events_tx);
}
diff --git a/server/src/mtx.rs b/server/src/mtx.rs
index d20e97fc..41f95601 100644
--- a/server/src/mtx.rs
+++ b/server/src/mtx.rs
@@ -159,7 +159,7 @@ pub fn apply(tx: &mut Transaction, account: &Account, variant: MtxVariant, const
};
construct_write(tx, construct)?;
- account::account_constructs(tx, account)
+ account::constructs(tx, account)
}
pub fn select(tx: &mut Transaction, variant: MtxVariant, account: Uuid) -> Result {
diff --git a/server/src/rpc.rs b/server/src/rpc.rs
index 726622b0..1316d78f 100644
--- a/server/src/rpc.rs
+++ b/server/src/rpc.rs
@@ -1,31 +1,38 @@
-
use std::time::{Instant};
+use std::thread::spawn;
+use std::str;
-use serde_cbor::{from_slice};
use uuid::Uuid;
+use rand::prelude::*;
+
use failure::Error;
use failure::err_msg;
-use crossbeam_channel::{Sender};
+use serde_cbor::{from_slice, to_vec};
+use cookie::Cookie;
+use crossbeam_channel::{unbounded, Sender as CbSender};
+use ws::{listen, CloseCode, Message, Handler, Request, Response};
+
+use account::{Account};
use account;
-use pg::{Db};
-use events::{Event};
use construct::{Construct};
+use events::{Event, PvpRequest};
use game::{Game, game_state, game_skill, game_ready};
-use account::{Account, account_constructs};
-use skill::{Skill, dev_resolve, Resolutions};
-use instance::{Instance, instance_state, instance_practice, instance_ready, instance_queue};
-use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
+use instance::{Instance, instance_state, instance_practice, instance_ready};
use item::{Item, ItemInfoCtr, item_info};
-use websocket::{Connection};
-
use mtx;
+use pg::{Db};
+use pg::{PgPool};
+use skill::{Skill, dev_resolve, Resolutions};
+use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
+use net::TOKEN_HEADER;
#[derive(Debug,Clone,Serialize,Deserialize)]
pub enum RpcMessage {
AccountState(Account),
AccountConstructs(Vec),
+ AccountTeam(Vec),
AccountInstances(Vec),
AccountShop(mtx::Shop),
ConstructSpawn(Construct),
@@ -38,9 +45,9 @@ pub enum RpcMessage {
DevResolutions(Resolutions),
- QueueRequested,
- QueueJoined,
- QueueCancelled,
+ QueueRequested(()),
+ QueueJoined(()),
+ QueueCancelled(()),
Error(String),
}
@@ -78,105 +85,271 @@ enum RpcRequest {
VboxReclaim { instance_id: Uuid, index: usize },
}
-pub fn receive(data: Vec, db: &Db, connection: &Connection, begin: Instant, account: &Option) -> Result {
- // cast the msg to this type to receive method name
- match from_slice::(&data) {
- Ok(v) => {
+struct Connection {
+ pub id: usize,
+ pub ws: CbSender,
+ pool: PgPool,
+ account: Option,
+ events: CbSender,
+}
- // non authenticated
- // non transactional reqs
- match v {
- RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())),
- RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())),
- RpcRequest::DevResolve {a, b, skill } =>
- return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))),
- _ => (),
- };
+impl Connection {
+ fn receive(&self, data: Vec, db: &Db, begin: Instant) -> Result {
+ // cast the msg to this type to receive method name
+ match from_slice::(&data) {
+ Ok(v) => {
- // check for authorization now
- let account = match account {
- Some(account) => account,
- None => return Err(err_msg("auth required")),
- };
+ // non authenticated
+ // non transactional reqs
+ match v {
+ RpcRequest::Ping {} => return Ok(RpcMessage::Pong(())),
+ RpcRequest::ItemInfo {} => return Ok(RpcMessage::ItemInfo(item_info())),
+ RpcRequest::DevResolve {a, b, skill } =>
+ return Ok(RpcMessage::DevResolutions(dev_resolve(a, b, skill))),
+ _ => (),
+ };
- // all good, let's make a tx and process
- let mut tx = db.transaction()?;
+ // check for authorization now
+ let account = match self.account {
+ Some(ref account) => account,
+ None => return Err(err_msg("auth required")),
+ };
- let request = v.clone();
+ // evented but authorization required
+ match v {
+ RpcRequest::InstanceQueue {} => {
+ let pvp = PvpRequest { id: self.id, account: account.clone(), tx: self.ws.clone() };
+ self.events.send(Event::Queue(pvp))?;
+ return Ok(RpcMessage::QueueRequested(()));
+ },
+ _ => (),
+ };
- let response = match v {
- RpcRequest::AccountState {} =>
- return Ok(RpcMessage::AccountState(account.clone())),
- RpcRequest::AccountConstructs {} =>
- Ok(RpcMessage::AccountConstructs(account_constructs(&mut tx, &account)?)),
+ // all good, let's make a tx and process
+ let mut tx = db.transaction()?;
- RpcRequest::AccountSetTeam { ids } =>
- Ok(RpcMessage::AccountConstructs(account::set_team(&mut tx, &account, ids)?)),
+ let request = v.clone();
- // RpcRequest::AccountShop {} =>
- // Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)),
+ let response = match v {
+ RpcRequest::AccountState {} =>
+ return Ok(RpcMessage::AccountState(account.clone())),
+ RpcRequest::AccountConstructs {} =>
+ Ok(RpcMessage::AccountConstructs(account::constructs(&mut tx, &account)?)),
- // RpcRequest::ConstructDelete" => handle_construct_delete(data, &mut tx, account),
+ RpcRequest::AccountSetTeam { ids } =>
+ Ok(RpcMessage::AccountTeam(account::set_team(&mut tx, &account, ids)?)),
- RpcRequest::GameState { id } =>
- Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)),
+ // RpcRequest::AccountShop {} =>
+ // Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)),
- RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } =>
- Ok(RpcMessage::GameState(game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)),
+ // RpcRequest::ConstructDelete" => handle_construct_delete(data, &mut tx, account),
- RpcRequest::GameReady { id } =>
- Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)),
+ RpcRequest::GameState { id } =>
+ Ok(RpcMessage::GameState(game_state(&mut tx, account, id)?)),
- RpcRequest::InstanceQueue {} =>
- Ok(RpcMessage::QueueRequested),
- RpcRequest::InstancePractice {} =>
- Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)),
+ RpcRequest::GameSkill { game_id, construct_id, target_construct_id, skill } =>
+ Ok(RpcMessage::GameState(game_skill(&mut tx, account, game_id, construct_id, target_construct_id, skill)?)),
- // these two can return GameState or InstanceState
- RpcRequest::InstanceReady { instance_id } =>
- Ok(instance_ready(&mut tx, account, instance_id)?),
- RpcRequest::InstanceState { instance_id } =>
- Ok(instance_state(&mut tx, account, instance_id)?),
+ RpcRequest::GameReady { id } =>
+ Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)),
- RpcRequest::VboxAccept { instance_id, group, index } =>
- Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index)?)),
+ RpcRequest::InstancePractice {} =>
+ Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)),
- RpcRequest::VboxApply { instance_id, construct_id, index } =>
- Ok(RpcMessage::InstanceState(vbox_apply(&mut tx, account, instance_id, construct_id, index)?)),
+ // these two can return GameState or InstanceState
+ RpcRequest::InstanceReady { instance_id } =>
+ Ok(instance_ready(&mut tx, account, instance_id)?),
+ RpcRequest::InstanceState { instance_id } =>
+ Ok(instance_state(&mut tx, account, instance_id)?),
- RpcRequest::VboxCombine { instance_id, indices } =>
- Ok(RpcMessage::InstanceState(vbox_combine(&mut tx, account, instance_id, indices)?)),
+ RpcRequest::VboxAccept { instance_id, group, index } =>
+ Ok(RpcMessage::InstanceState(vbox_accept(&mut tx, account, instance_id, group, index)?)),
- RpcRequest::VboxDiscard { instance_id } =>
- Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)),
+ RpcRequest::VboxApply { instance_id, construct_id, index } =>
+ Ok(RpcMessage::InstanceState(vbox_apply(&mut tx, account, instance_id, construct_id, index)?)),
- RpcRequest::VboxReclaim { instance_id, index } =>
- Ok(RpcMessage::InstanceState(vbox_reclaim(&mut tx, account, instance_id, index)?)),
+ RpcRequest::VboxCombine { instance_id, indices } =>
+ Ok(RpcMessage::InstanceState(vbox_combine(&mut tx, account, instance_id, indices)?)),
- RpcRequest::VboxUnequip { instance_id, construct_id, target } =>
- Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target)?)),
+ RpcRequest::VboxDiscard { instance_id } =>
+ Ok(RpcMessage::InstanceState(vbox_discard(&mut tx, account, instance_id)?)),
- RpcRequest::MtxConstructSpawn {} =>
- Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)),
+ RpcRequest::VboxReclaim { instance_id, index } =>
+ Ok(RpcMessage::InstanceState(vbox_reclaim(&mut tx, account, instance_id, index)?)),
- RpcRequest::MtxConstructApply { mtx, construct_id, name } =>
- Ok(RpcMessage::AccountConstructs(mtx::apply(&mut tx, account, mtx, construct_id, name)?)),
+ RpcRequest::VboxUnequip { instance_id, construct_id, target } =>
+ Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target)?)),
- RpcRequest::MtxBuy { mtx } =>
- Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)),
+ RpcRequest::MtxConstructSpawn {} =>
+ Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)),
- _ => Err(format_err!("unknown request request={:?}", request)),
- };
+ RpcRequest::MtxConstructApply { mtx, construct_id, name } =>
+ Ok(RpcMessage::AccountConstructs(mtx::apply(&mut tx, account, mtx, construct_id, name)?)),
- tx.commit()?;
+ RpcRequest::MtxBuy { mtx } =>
+ Ok(RpcMessage::AccountShop(mtx::buy(&mut tx, account, mtx)?)),
- info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed());
+ _ => Err(format_err!("unknown request request={:?}", request)),
+ };
- return response;
- },
- Err(e) => {
- warn!("{:?}", e);
- Err(err_msg("invalid message"))
- },
+ tx.commit()?;
+
+ info!("request={:?} account={:?} duration={:?}", request, account.name, begin.elapsed());
+
+ return response;
+ },
+ Err(e) => {
+ warn!("{:?}", e);
+ Err(err_msg("invalid message"))
+ },
+ }
}
}
+
+// we unwrap everything in here cause really
+// we don't care if this panics
+// it's run in a thread so it's supposed to bail
+// when it encounters errors
+impl Handler for Connection {
+ fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
+ info!("websocket connected account={:?}", self.account);
+
+ // tell events we have connected
+ self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
+
+ // if user logged in do some prep work
+ if let Some(ref a) = self.account {
+ self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
+ self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
+
+ let db = self.pool.get().unwrap();
+ let mut tx = db.transaction().unwrap();
+
+ // send account constructs
+ let account_constructs = account::constructs(&mut tx, a).unwrap();
+ self.ws.send(RpcMessage::AccountConstructs(account_constructs)).unwrap();
+
+ // get account instances
+ // and send them to the client
+ let account_instances = account::account_instances(&mut tx, a).unwrap();
+ self.ws.send(RpcMessage::AccountInstances(account_instances)).unwrap();
+
+ let shop = mtx::account_shop(&mut tx, &a).unwrap();
+ self.ws.send(RpcMessage::AccountShop(shop)).unwrap();
+
+ let team = account::account_team(&mut tx, &a).unwrap();
+ self.ws.send(RpcMessage::AccountTeam(team)).unwrap();
+
+ // tx should do nothing
+ tx.commit().unwrap();
+ }
+
+ Ok(())
+ }
+
+ fn on_message(&mut self, msg: Message) -> ws::Result<()> {
+ match msg {
+ Message::Binary(msg) => {
+ let begin = Instant::now();
+ let db_connection = self.pool.get().unwrap();
+
+ match self.receive(msg, &db_connection, begin) {
+ Ok(reply) => {
+ // if the user queries the state of something
+ // we tell events to push updates to them
+ match reply {
+ RpcMessage::AccountState(ref v) =>
+ self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
+ RpcMessage::GameState(ref v) =>
+ self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
+ RpcMessage::InstanceState(ref v) =>
+ self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
+ _ => (),
+ };
+
+ self.ws.send(reply).unwrap();
+ },
+ Err(e) => {
+ warn!("{:?}", e);
+ self.ws.send(RpcMessage::Error(e.to_string())).unwrap();
+ },
+ };
+ },
+ _ => (),
+ };
+ Ok(())
+ }
+
+ fn on_close(&mut self, _: CloseCode, _: &str) {
+ info!("websocket disconnected account={:?}", self.account);
+ self.events.send(Event::Disconnect(self.id)).unwrap();
+ }
+
+ fn on_request(&mut self, req: &Request) -> ws::Result {
+ let res = Response::from_request(req)?;
+
+ if let Some(cl) = req.header("Cookie") {
+ let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()));
+ let cookie_list = match str::from_utf8(cl) {
+ Ok(cl) => cl,
+ Err(_) => return unauth(),
+ };
+
+ for s in cookie_list.split(";").map(|s| s.trim()) {
+ let cookie = match Cookie::parse(s) {
+ Ok(c) => c,
+ Err(_) => return unauth(),
+ };
+
+ // got auth token
+ if cookie.name() == TOKEN_HEADER {
+ let db = self.pool.get().unwrap();
+ match account::from_token(&db, cookie.value().to_string()) {
+ Ok(a) => self.account = Some(a),
+ Err(_) => return unauth(),
+ }
+ }
+ };
+ };
+
+ Ok(res)
+ }
+}
+
+pub fn start(pool: PgPool, events_tx: CbSender) {
+ let mut rng = thread_rng();
+ listen("127.0.0.1:40055", move |out| {
+
+ // we give the tx half to the connection object
+ // which in turn passes a clone to the events system
+ // the rx half goes into a thread where it waits for messages
+ // that need to be delivered to the client
+ // both the ws message handler and the events thread must use
+ // this channel to send messages
+ let (tx, rx) = unbounded::();
+
+ spawn(move || {
+ loop {
+ match rx.recv() {
+ Ok(n) => {
+ let response = to_vec(&n).unwrap();
+ out.send(Message::Binary(response)).unwrap();
+ }
+ // we done
+ Err(_e) => {
+ break;
+ },
+ };
+ }
+ });
+
+ Connection {
+ id: rng.gen::(),
+ account: None,
+ ws: tx,
+ pool: pool.clone(),
+ events: events_tx.clone(),
+ }
+ }).unwrap();
+}
+
diff --git a/server/src/warden.rs b/server/src/warden.rs
index c37cf024..f1ebd731 100644
--- a/server/src/warden.rs
+++ b/server/src/warden.rs
@@ -2,18 +2,21 @@ use std::time::{Duration};
use uuid::Uuid;
-use crossbeam_channel::{unbounded, tick, Sender, Receiver};
+use crossbeam_channel::{tick, Sender, Receiver};
// Db Commons
use postgres::transaction::Transaction;
use failure::Error;
use game::{games_need_upkeep, game_update, game_write, game_delete};
+use instance;
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use pg::{Db, PgPool};
+use events::{Event, EventsTx, PvpRequest};
+use rpc::{RpcMessage};
type Id = usize;
-type Pair = ((Id, Uuid), (Id, Uuid));
+type Pair = (PvpRequest, PvpRequest);
pub enum GameEvent {
Upkeep,
@@ -27,15 +30,16 @@ pub struct Warden {
pub tx: Sender,
rx: Receiver,
+ events: EventsTx,
pool: PgPool,
}
impl Warden {
- pub fn new(pool: PgPool) -> Warden {
- let (tx, rx) = unbounded();
+ pub fn new(tx: Sender, rx: Receiver, events: EventsTx, pool: PgPool) -> Warden {
Warden {
tx,
rx,
+ events,
pool,
}
}
@@ -44,13 +48,16 @@ impl Warden {
loop {
match self.rx.recv() {
Ok(m) => {
- self.event(m)?;
+ match self.event(m) {
+ Ok(()) => (), // :)
+ Err(e) => {
+ warn!("err={:?}", e);
+ }
+ }
},
- // idk if this is a good idea
- // possibly just log errors and continue...
Err(e) => {
- return Err(format_err!("events error err={:?}", e));
+ return Err(format_err!("err={:?}", e));
},
};
}
@@ -80,15 +87,24 @@ impl Warden {
}
fn on_match(&mut self, pair: Pair) -> Result<(), Error> {
- let db = self.pool.get()?;
- let tx = db.transaction()?;
-
info!("received pair={:?}", pair);
+ let db = self.pool.get()?;
+ let mut tx = db.transaction()?;
+ let instance = instance::pvp(&mut tx, &pair.0.account, &pair.1.account)?;
+ tx.commit()?;
+
+ // subscribe users to instance events
+ self.events.send(Event::Subscribe(pair.0.id, instance.id))?;
+ self.events.send(Event::Subscribe(pair.1.id, instance.id))?;
+
+ // send them the new instance state
+ let msg = RpcMessage::InstanceState(instance);
+ pair.0.tx.send(msg.clone())?;
+ pair.1.tx.send(msg)?;
+
Ok(())
}
-
-
}
fn fetch_games(mut tx: Transaction) -> Result {
@@ -131,4 +147,4 @@ pub fn upkeep_tick(warden: Sender) {
ticker.recv().unwrap();
warden.send(GameEvent::Upkeep).unwrap();
}
-}
\ No newline at end of file
+}
diff --git a/server/src/websocket.rs b/server/src/websocket.rs
deleted file mode 100644
index 016a2ae9..00000000
--- a/server/src/websocket.rs
+++ /dev/null
@@ -1,175 +0,0 @@
-use std::time::{Instant};
-use std::thread::spawn;
-use std::str;
-
-use rand::prelude::*;
-
-use serde_cbor::to_vec;
-
-use cookie::Cookie;
-
-use crossbeam_channel::{unbounded, Sender as CbSender};
-use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response};
-
-use account;
-use account::{Account};
-use pg::{PgPool};
-use events::Event;
-
-use rpc::{RpcMessage};
-use rpc;
-use mtx;
-use net::TOKEN_HEADER;
-
-struct Connection {
- pub id: usize,
- pub ws: CbSender,
- pool: PgPool,
- account: Option,
- events: CbSender,
-}
-
-// we unwrap everything in here cause really
-// we don't care if this panics
-// it's run in a thread so it's supposed to bail
-// when it encounters errors
-impl Handler for Connection {
- fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
- info!("websocket connected account={:?}", self.account);
-
- // tell events we have connected
- self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
-
- // if user logged in do some prep work
- if let Some(ref a) = self.account {
- self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
- self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
-
- let db = self.pool.get().unwrap();
- let mut tx = db.transaction().unwrap();
-
- // send account constructs
- let account_constructs = account::account_constructs(&mut tx, a).unwrap();
- self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap();
-
- // get account instances
- // and send them to the client
- let account_instances = account::account_instances(&mut tx, a).unwrap();
- self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap();
-
- let shop = mtx::account_shop(&mut tx, &a).unwrap();
- self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap();
-
- // tx should do nothing
- tx.commit().unwrap();
- }
-
- Ok(())
- }
-
- fn on_message(&mut self, msg: Message) -> Result<()> {
- match msg {
- Message::Binary(msg) => {
- let begin = Instant::now();
- let db_connection = self.pool.get().unwrap();
-
- match rpc::receive(msg, &db_connection, &self, begin, &self.account) {
- Ok(reply) => {
- // if the user queries the state of something
- // we tell events to push updates to them
- match reply {
- RpcMessage::AccountState(ref v) =>
- self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
- RpcMessage::GameState(ref v) =>
- self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
- RpcMessage::InstanceState(ref v) =>
- self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
- _ => (),
- };
-
- self.ws.send(reply).unwrap();
- },
- Err(e) => {
- warn!("{:?}", e);
- self.ws.send(RpcMessage::Error(e.to_string())).unwrap();
- },
- };
- },
- _ => (),
- };
- Ok(())
- }
-
- fn on_close(&mut self, _: CloseCode, _: &str) {
- info!("websocket disconnected account={:?}", self.account);
- self.events.send(Event::Disconnect(self.id)).unwrap();
- }
-
- fn on_request(&mut self, req: &Request) -> Result {
- let res = Response::from_request(req)?;
-
- if let Some(cl) = req.header("Cookie") {
- let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()));
- let cookie_list = match str::from_utf8(cl) {
- Ok(cl) => cl,
- Err(_) => return unauth(),
- };
-
- for s in cookie_list.split(";").map(|s| s.trim()) {
- let cookie = match Cookie::parse(s) {
- Ok(c) => c,
- Err(_) => return unauth(),
- };
-
- // got auth token
- if cookie.name() == TOKEN_HEADER {
- let db = self.pool.get().unwrap();
- match account::from_token(&db, cookie.value().to_string()) {
- Ok(a) => self.account = Some(a),
- Err(_) => return unauth(),
- }
- }
- };
- };
-
- Ok(res)
- }
-}
-
-
-pub fn start(pool: PgPool, events_tx: CbSender) {
- let mut rng = thread_rng();
- listen("127.0.0.1:40055", move |out| {
-
- // we give the tx half to the connection object
- // which in turn passes a clone to the events system
- // the rx half goes into a thread where it waits for messages
- // that need to be delivered to the client
- // both the ws message handler and the events thread must use
- // this channel to send messages
- let (tx, rx) = unbounded::();
-
- spawn(move || {
- loop {
- match rx.recv() {
- Ok(n) => {
- let response = to_vec(&n).unwrap();
- out.send(Message::Binary(response)).unwrap();
- }
- // we done
- Err(_e) => {
- break;
- },
- };
- }
- });
-
- Connection {
- id: rng.gen::(),
- account: None,
- ws: tx,
- pool: pool.clone(),
- events: events_tx.clone(),
- }
- }).unwrap();
-}