sheeeeeeeeeeeeeeeeeeeeeeeeeeit

This commit is contained in:
ntr 2019-07-30 00:48:40 +10:00
parent fd518ba4e2
commit c94b6f134c
12 changed files with 357 additions and 324 deletions

View File

@ -64,7 +64,6 @@ const addState = connect(
function List(args) { function List(args) {
const { const {
team, team,
constructs,
constructRename, constructRename,
clearMtxRename, clearMtxRename,
setConstructRename, setConstructRename,
@ -73,9 +72,7 @@ function List(args) {
sendConstructAvatarReroll, sendConstructAvatarReroll,
} = args; } = args;
const constructPanels = constructs const constructPanels = team
.filter(c => team.includes(c.id))
.sort(idSort)
.map(construct => { .map(construct => {
const constructName = constructRename === construct.id const constructName = constructRename === construct.id
? <input id='renameInput' type="text" style="text-align: center" placeholder="enter a new name"></input> ? <input id='renameInput' type="text" style="text-align: center" placeholder="enter a new name"></input>

View File

@ -10,7 +10,6 @@ function registerEvents(store) {
} }
function setTeam(team) { function setTeam(team) {
localStorage.setItem('team', JSON.stringify(team));
store.dispatch(actions.setTeam(team)); store.dispatch(actions.setTeam(team));
} }
@ -179,11 +178,6 @@ function registerEvents(store) {
return store.dispatch(actions.setItemInfo(v)); return store.dispatch(actions.setItemInfo(v));
} }
const team = JSON.parse(localStorage.getItem('team'));
if (team && team.every(t => t)) {
store.dispatch(actions.setTeam(team));
}
// events.on('SET_PLAYER', setInstance); // events.on('SET_PLAYER', setInstance);
// events.on('SEND_SKILL', function skillActive(gameId, constructId, targetConstructId, skill) { // events.on('SEND_SKILL', function skillActive(gameId, constructId, targetConstructId, skill) {
@ -234,6 +228,7 @@ function registerEvents(store) {
setItemInfo, setItemInfo,
setPing, setPing,
setShop, setShop,
setTeam,
setWs, setWs,
}; };
} }

View File

@ -113,10 +113,9 @@ function createSocket(events) {
} }
function sendInstanceQueue() { function sendInstanceQueue() {
send(['InstancePractice', {}]); send(['InstanceQueue', {}]);
} }
function sendInstanceReady(instanceId) { function sendInstanceReady(instanceId) {
send(['InstanceReady', { instance_id: instanceId }]); send(['InstanceReady', { instance_id: instanceId }]);
} }
@ -155,6 +154,10 @@ function createSocket(events) {
events.setConstructList(constructs); events.setConstructList(constructs);
} }
function onAccountTeam(constructs) {
events.setTeam(constructs);
}
function onConstructSpawn(construct) { function onConstructSpawn(construct) {
events.setNewConstruct(construct); events.setNewConstruct(construct);
} }
@ -186,6 +189,7 @@ function createSocket(events) {
const handlers = { const handlers = {
AccountState: onAccount, AccountState: onAccount,
AccountConstructs: onAccountConstructs, AccountConstructs: onAccountConstructs,
AccountTeam: onAccountTeam,
AccountInstances: onAccountInstances, AccountInstances: onAccountInstances,
AccountShop: onAccountShop, AccountShop: onAccountShop,
ConstructSpawn: onConstructSpawn, ConstructSpawn: onConstructSpawn,
@ -193,6 +197,10 @@ function createSocket(events) {
InstanceState: onInstanceState, InstanceState: onInstanceState,
ItemInfo: onItemInfo, ItemInfo: onItemInfo,
Pong: onPong, Pong: onPong,
QueueRequested: () => console.log('pvp queue request received'),
QueueJoined: () => console.log('you have joined the pvp queue'),
Error: errHandler, Error: errHandler,
}; };

View File

@ -20,6 +20,9 @@ server {
root /var/lib/mnml/public/current; root /var/lib/mnml/public/current;
index index.html; index index.html;
try_files $uri $uri/ index.html; try_files $uri $uri/ index.html;
add_header 'Cache-Control' 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0';
expires off;
} }
location /imgs/ { location /imgs/ {

View File

@ -265,7 +265,7 @@ pub fn create(name: &String, password: &String, code: &String, tx: &mut Transact
Ok(token) Ok(token)
} }
pub fn account_constructs(tx: &mut Transaction, account: &Account) -> Result<Vec<Construct>, Error> { pub fn constructs(tx: &mut Transaction, account: &Account) -> Result<Vec<Construct>, Error> {
let query = " let query = "
SELECT data SELECT data
FROM constructs FROM constructs

View File

@ -5,7 +5,7 @@ use uuid::Uuid;
use failure::Error; use failure::Error;
use crossbeam_channel::{unbounded, Sender, Receiver}; use crossbeam_channel::{Sender, Receiver};
use account; use account;
use account::Account; use account::Account;
@ -19,12 +19,23 @@ use warden::{GameEvent};
pub type EventsTx = Sender<Event>; pub type EventsTx = Sender<Event>;
type Id = usize; type Id = usize;
// this is pretty heavyweight
// but it makes the ergonomics easy
// and prevents message mania
// good candidate to trim if the mm is slow
#[derive(Debug, Clone)]
pub struct PvpRequest {
pub id: Id,
pub tx: Sender<RpcMessage>,
pub account: Account,
}
pub struct Events { pub struct Events {
pub tx: Sender<Event>, pub tx: Sender<Event>,
rx: Receiver<Event>, rx: Receiver<Event>,
warden: Sender<GameEvent>, warden: Sender<GameEvent>,
queue: Option<Uuid>, queue: Option<PvpRequest>,
clients: HashMap<Id, WsClient>, clients: HashMap<Id, WsClient>,
} }
@ -41,7 +52,7 @@ pub enum Event {
Push(Uuid, RpcMessage), Push(Uuid, RpcMessage),
// client events // client events
Queue(Id, Uuid), Queue(PvpRequest),
} }
struct WsClient { struct WsClient {
@ -51,8 +62,7 @@ struct WsClient {
} }
impl Events { impl Events {
pub fn new(warden: Sender<GameEvent>) -> Events { pub fn new(tx: Sender<Event>, rx: Receiver<Event>, warden: Sender<GameEvent>) -> Events {
let (tx, rx) = unbounded();
Events { Events {
tx, tx,
rx, rx,
@ -66,11 +76,14 @@ impl Events {
loop { loop {
match self.rx.recv() { match self.rx.recv() {
Ok(m) => { Ok(m) => {
self.on_event(m)?; match self.event(m) {
Ok(()) => (), // :)
Err(e) => {
warn!("err={:?}", e);
}
}
}, },
// idk if this is a good idea
// possibly just log errors and continue...
Err(e) => { Err(e) => {
return Err(format_err!("events error err={:?}", e)); return Err(format_err!("events error err={:?}", e));
}, },
@ -89,7 +102,7 @@ impl Events {
self.clients.remove(&id); self.clients.remove(&id);
} }
fn on_event(&mut self, msg: Event) -> Result<(), Error> { fn event(&mut self, msg: Event) -> Result<(), Error> {
match msg { match msg {
Event::Connect(id, account, tx) => { Event::Connect(id, account, tx) => {
info!("connect id={:?} account={:?}", id, account); info!("connect id={:?} account={:?}", id, account);
@ -100,6 +113,7 @@ impl Events {
info!("clients={:?}", self.clients.len()); info!("clients={:?}", self.clients.len());
Ok(()) Ok(())
}, },
Event::Disconnect(id) => { Event::Disconnect(id) => {
info!("disconnect id={:?}", id); info!("disconnect id={:?}", id);
@ -107,7 +121,8 @@ impl Events {
info!("clients={:?}", self.clients.len()); info!("clients={:?}", self.clients.len());
Ok(()) Ok(())
} },
Event::Subscribe(id, obj) => { Event::Subscribe(id, obj) => {
info!("subscribe id={:?} object={:?}", id, obj); info!("subscribe id={:?} object={:?}", id, obj);
@ -120,6 +135,7 @@ impl Events {
None => return Err(format_err!("unknown client {:?}", id)) None => return Err(format_err!("unknown client {:?}", id))
} }
}, },
Event::Unsubscribe(id, obj) => { Event::Unsubscribe(id, obj) => {
info!("unsubscribe id={:?} object={:?}", id, obj); info!("unsubscribe id={:?} object={:?}", id, obj);
@ -162,17 +178,18 @@ impl Events {
Ok(()) Ok(())
}, },
Event::Queue(id, account) => { Event::Queue(req) => {
info!("queue id={:?} account={:?}", id, account); info!("queue id={:?} account={:?}", req.id, req.account);
self.queue = match self.queue { self.queue = match self.queue {
Some(id) => { Some(ref q_req) => {
info!("game queue pair a={:?} b={:?}", account, id); info!("game queue pair found a={:?} b={:?}", req.account, q_req.account);
self.warden.send(GameEvent::Match((req, q_req.clone())))?;
None None
}, },
None => { None => {
info!("joined game queue id={:?} account={:?}", id, account); info!("joined game queue id={:?} account={:?}", req.id, req.account);
Some(account) Some(req)
}, },
}; };

View File

@ -13,7 +13,7 @@ use chrono::Duration;
use account::Account; use account::Account;
use account; use account;
use events::EventsTx; use events::{EventsTx, Event};
use player::{Player, player_create}; use player::{Player, player_create};
use construct::{Construct, construct_get}; use construct::{Construct, construct_get};
use mob::{bot_player, instance_mobs}; use mob::{bot_player, instance_mobs};
@ -692,7 +692,7 @@ pub fn instance_practice(tx: &mut Transaction, account: &Account) -> Result<Inst
// pub fn instance_queue(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> { // pub fn instance_queue(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> {
// } // }
pub fn instance_pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> { pub fn pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> {
let mut instance = Instance::new() let mut instance = Instance::new()
// TODO generate nice game names // TODO generate nice game names
.set_name("PVP".to_string())?; .set_name("PVP".to_string())?;
@ -747,10 +747,6 @@ pub fn instance_game_finished(tx: &mut Transaction, game: &Game, instance_id: Uu
Ok(()) Ok(())
} }
pub fn instance_queue(events: &EventsTx, account: &Account) -> Result<RpcMessage, Error> {
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -49,11 +49,12 @@ mod spec;
mod util; mod util;
mod vbox; mod vbox;
mod warden; mod warden;
mod websocket;
use std::thread::{spawn}; use std::thread::{spawn};
use std::path::{Path}; use std::path::{Path};
use crossbeam_channel::{unbounded};
fn setup_logger() -> Result<(), fern::InitError> { fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new() fern::Dispatch::new()
.format(|out, message, record| { .format(|out, message, record| {
@ -80,20 +81,22 @@ fn main() {
setup_logger().unwrap(); setup_logger().unwrap();
let pool = pg::create_pool(); let pool = pg::create_pool();
let warden = warden::Warden::new(pool.clone());
let warden_tx = warden.tx.clone();
let warden_tick_tx = warden.tx.clone();
let http_pool = pool.clone(); let http_pool = pool.clone();
let (events_tx, events_rx) = unbounded();
let pg_events_tx = events_tx.clone();
let rpc_events_tx = events_tx.clone();
let (warden_tx, warden_rx) = unbounded();
let events_warden_tx = warden_tx.clone();
let warden_tick_tx = warden_tx.clone();
// create a clone of the tx so ws handler can tell events // create a clone of the tx so ws handler can tell events
// about connection status // about connection status
let events = events::Events::new(warden_tx); let events = events::Events::new(events_tx, events_rx, events_warden_tx);
let ws_events_tx = events.tx.clone(); let warden = warden::Warden::new(warden_tx, warden_rx, events.tx.clone(), pool.clone());
let pg_pool = pool.clone(); let pg_pool = pool.clone();
let pg_events_tx = events.tx.clone();
spawn(move || net::start(http_pool)); spawn(move || net::start(http_pool));
spawn(move || warden.listen()); spawn(move || warden.listen());
@ -102,6 +105,6 @@ fn main() {
spawn(move || events.listen()); spawn(move || events.listen());
// the main thread becomes this ws listener // the main thread becomes this ws listener
let ws_pool = pool.clone(); let rpc_pool = pool.clone();
websocket::start(ws_pool, ws_events_tx); rpc::start(rpc_pool, rpc_events_tx);
} }

View File

@ -159,7 +159,7 @@ pub fn apply(tx: &mut Transaction, account: &Account, variant: MtxVariant, const
}; };
construct_write(tx, construct)?; construct_write(tx, construct)?;
account::account_constructs(tx, account) account::constructs(tx, account)
} }
pub fn select(tx: &mut Transaction, variant: MtxVariant, account: Uuid) -> Result<Mtx, Error> { pub fn select(tx: &mut Transaction, variant: MtxVariant, account: Uuid) -> Result<Mtx, Error> {

View File

@ -1,31 +1,38 @@
use std::time::{Instant}; use std::time::{Instant};
use std::thread::spawn;
use std::str;
use serde_cbor::{from_slice};
use uuid::Uuid; use uuid::Uuid;
use rand::prelude::*;
use failure::Error; use failure::Error;
use failure::err_msg; use failure::err_msg;
use crossbeam_channel::{Sender}; use serde_cbor::{from_slice, to_vec};
use cookie::Cookie;
use crossbeam_channel::{unbounded, Sender as CbSender};
use ws::{listen, CloseCode, Message, Handler, Request, Response};
use account::{Account};
use account; use account;
use pg::{Db};
use events::{Event};
use construct::{Construct}; use construct::{Construct};
use events::{Event, PvpRequest};
use game::{Game, game_state, game_skill, game_ready}; use game::{Game, game_state, game_skill, game_ready};
use account::{Account, account_constructs}; use instance::{Instance, instance_state, instance_practice, instance_ready};
use skill::{Skill, dev_resolve, Resolutions};
use instance::{Instance, instance_state, instance_practice, instance_ready, instance_queue};
use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
use item::{Item, ItemInfoCtr, item_info}; use item::{Item, ItemInfoCtr, item_info};
use websocket::{Connection};
use mtx; use mtx;
use pg::{Db};
use pg::{PgPool};
use skill::{Skill, dev_resolve, Resolutions};
use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
use net::TOKEN_HEADER;
#[derive(Debug,Clone,Serialize,Deserialize)] #[derive(Debug,Clone,Serialize,Deserialize)]
pub enum RpcMessage { pub enum RpcMessage {
AccountState(Account), AccountState(Account),
AccountConstructs(Vec<Construct>), AccountConstructs(Vec<Construct>),
AccountTeam(Vec<Construct>),
AccountInstances(Vec<Instance>), AccountInstances(Vec<Instance>),
AccountShop(mtx::Shop), AccountShop(mtx::Shop),
ConstructSpawn(Construct), ConstructSpawn(Construct),
@ -38,9 +45,9 @@ pub enum RpcMessage {
DevResolutions(Resolutions), DevResolutions(Resolutions),
QueueRequested, QueueRequested(()),
QueueJoined, QueueJoined(()),
QueueCancelled, QueueCancelled(()),
Error(String), Error(String),
} }
@ -78,7 +85,16 @@ enum RpcRequest {
VboxReclaim { instance_id: Uuid, index: usize }, VboxReclaim { instance_id: Uuid, index: usize },
} }
pub fn receive(data: Vec<u8>, db: &Db, connection: &Connection, begin: Instant, account: &Option<Account>) -> Result<RpcMessage, Error> { struct Connection {
pub id: usize,
pub ws: CbSender<RpcMessage>,
pool: PgPool,
account: Option<Account>,
events: CbSender<Event>,
}
impl Connection {
fn receive(&self, data: Vec<u8>, db: &Db, begin: Instant) -> Result<RpcMessage, Error> {
// cast the msg to this type to receive method name // cast the msg to this type to receive method name
match from_slice::<RpcRequest>(&data) { match from_slice::<RpcRequest>(&data) {
Ok(v) => { Ok(v) => {
@ -94,11 +110,21 @@ pub fn receive(data: Vec<u8>, db: &Db, connection: &Connection, begin: Instant,
}; };
// check for authorization now // check for authorization now
let account = match account { let account = match self.account {
Some(account) => account, Some(ref account) => account,
None => return Err(err_msg("auth required")), None => return Err(err_msg("auth required")),
}; };
// evented but authorization required
match v {
RpcRequest::InstanceQueue {} => {
let pvp = PvpRequest { id: self.id, account: account.clone(), tx: self.ws.clone() };
self.events.send(Event::Queue(pvp))?;
return Ok(RpcMessage::QueueRequested(()));
},
_ => (),
};
// all good, let's make a tx and process // all good, let's make a tx and process
let mut tx = db.transaction()?; let mut tx = db.transaction()?;
@ -108,10 +134,10 @@ pub fn receive(data: Vec<u8>, db: &Db, connection: &Connection, begin: Instant,
RpcRequest::AccountState {} => RpcRequest::AccountState {} =>
return Ok(RpcMessage::AccountState(account.clone())), return Ok(RpcMessage::AccountState(account.clone())),
RpcRequest::AccountConstructs {} => RpcRequest::AccountConstructs {} =>
Ok(RpcMessage::AccountConstructs(account_constructs(&mut tx, &account)?)), Ok(RpcMessage::AccountConstructs(account::constructs(&mut tx, &account)?)),
RpcRequest::AccountSetTeam { ids } => RpcRequest::AccountSetTeam { ids } =>
Ok(RpcMessage::AccountConstructs(account::set_team(&mut tx, &account, ids)?)), Ok(RpcMessage::AccountTeam(account::set_team(&mut tx, &account, ids)?)),
// RpcRequest::AccountShop {} => // RpcRequest::AccountShop {} =>
// Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)), // Ok(RpcMessage::AccountShop(mtx::account_shop(&mut tx, &account)?)),
@ -127,8 +153,6 @@ pub fn receive(data: Vec<u8>, db: &Db, connection: &Connection, begin: Instant,
RpcRequest::GameReady { id } => RpcRequest::GameReady { id } =>
Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)), Ok(RpcMessage::GameState(game_ready(&mut tx, account, id)?)),
RpcRequest::InstanceQueue {} =>
Ok(RpcMessage::QueueRequested),
RpcRequest::InstancePractice {} => RpcRequest::InstancePractice {} =>
Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)), Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)),
@ -180,3 +204,152 @@ pub fn receive(data: Vec<u8>, db: &Db, connection: &Connection, begin: Instant,
}, },
} }
} }
}
// we unwrap everything in here cause really
// we don't care if this panics
// it's run in a thread so it's supposed to bail
// when it encounters errors
impl Handler for Connection {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
info!("websocket connected account={:?}", self.account);
// tell events we have connected
self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
// if user logged in do some prep work
if let Some(ref a) = self.account {
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
let db = self.pool.get().unwrap();
let mut tx = db.transaction().unwrap();
// send account constructs
let account_constructs = account::constructs(&mut tx, a).unwrap();
self.ws.send(RpcMessage::AccountConstructs(account_constructs)).unwrap();
// get account instances
// and send them to the client
let account_instances = account::account_instances(&mut tx, a).unwrap();
self.ws.send(RpcMessage::AccountInstances(account_instances)).unwrap();
let shop = mtx::account_shop(&mut tx, &a).unwrap();
self.ws.send(RpcMessage::AccountShop(shop)).unwrap();
let team = account::account_team(&mut tx, &a).unwrap();
self.ws.send(RpcMessage::AccountTeam(team)).unwrap();
// tx should do nothing
tx.commit().unwrap();
}
Ok(())
}
fn on_message(&mut self, msg: Message) -> ws::Result<()> {
match msg {
Message::Binary(msg) => {
let begin = Instant::now();
let db_connection = self.pool.get().unwrap();
match self.receive(msg, &db_connection, begin) {
Ok(reply) => {
// if the user queries the state of something
// we tell events to push updates to them
match reply {
RpcMessage::AccountState(ref v) =>
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
RpcMessage::GameState(ref v) =>
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
RpcMessage::InstanceState(ref v) =>
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
_ => (),
};
self.ws.send(reply).unwrap();
},
Err(e) => {
warn!("{:?}", e);
self.ws.send(RpcMessage::Error(e.to_string())).unwrap();
},
};
},
_ => (),
};
Ok(())
}
fn on_close(&mut self, _: CloseCode, _: &str) {
info!("websocket disconnected account={:?}", self.account);
self.events.send(Event::Disconnect(self.id)).unwrap();
}
fn on_request(&mut self, req: &Request) -> ws::Result<Response> {
let res = Response::from_request(req)?;
if let Some(cl) = req.header("Cookie") {
let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()));
let cookie_list = match str::from_utf8(cl) {
Ok(cl) => cl,
Err(_) => return unauth(),
};
for s in cookie_list.split(";").map(|s| s.trim()) {
let cookie = match Cookie::parse(s) {
Ok(c) => c,
Err(_) => return unauth(),
};
// got auth token
if cookie.name() == TOKEN_HEADER {
let db = self.pool.get().unwrap();
match account::from_token(&db, cookie.value().to_string()) {
Ok(a) => self.account = Some(a),
Err(_) => return unauth(),
}
}
};
};
Ok(res)
}
}
pub fn start(pool: PgPool, events_tx: CbSender<Event>) {
let mut rng = thread_rng();
listen("127.0.0.1:40055", move |out| {
// we give the tx half to the connection object
// which in turn passes a clone to the events system
// the rx half goes into a thread where it waits for messages
// that need to be delivered to the client
// both the ws message handler and the events thread must use
// this channel to send messages
let (tx, rx) = unbounded::<RpcMessage>();
spawn(move || {
loop {
match rx.recv() {
Ok(n) => {
let response = to_vec(&n).unwrap();
out.send(Message::Binary(response)).unwrap();
}
// we done
Err(_e) => {
break;
},
};
}
});
Connection {
id: rng.gen::<usize>(),
account: None,
ws: tx,
pool: pool.clone(),
events: events_tx.clone(),
}
}).unwrap();
}

View File

@ -2,18 +2,21 @@ use std::time::{Duration};
use uuid::Uuid; use uuid::Uuid;
use crossbeam_channel::{unbounded, tick, Sender, Receiver}; use crossbeam_channel::{tick, Sender, Receiver};
// Db Commons // Db Commons
use postgres::transaction::Transaction; use postgres::transaction::Transaction;
use failure::Error; use failure::Error;
use game::{games_need_upkeep, game_update, game_write, game_delete}; use game::{games_need_upkeep, game_update, game_write, game_delete};
use instance;
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use pg::{Db, PgPool}; use pg::{Db, PgPool};
use events::{Event, EventsTx, PvpRequest};
use rpc::{RpcMessage};
type Id = usize; type Id = usize;
type Pair = ((Id, Uuid), (Id, Uuid)); type Pair = (PvpRequest, PvpRequest);
pub enum GameEvent { pub enum GameEvent {
Upkeep, Upkeep,
@ -27,15 +30,16 @@ pub struct Warden {
pub tx: Sender<GameEvent>, pub tx: Sender<GameEvent>,
rx: Receiver<GameEvent>, rx: Receiver<GameEvent>,
events: EventsTx,
pool: PgPool, pool: PgPool,
} }
impl Warden { impl Warden {
pub fn new(pool: PgPool) -> Warden { pub fn new(tx: Sender<GameEvent>, rx: Receiver<GameEvent>, events: EventsTx, pool: PgPool) -> Warden {
let (tx, rx) = unbounded();
Warden { Warden {
tx, tx,
rx, rx,
events,
pool, pool,
} }
} }
@ -44,13 +48,16 @@ impl Warden {
loop { loop {
match self.rx.recv() { match self.rx.recv() {
Ok(m) => { Ok(m) => {
self.event(m)?; match self.event(m) {
Ok(()) => (), // :)
Err(e) => {
warn!("err={:?}", e);
}
}
}, },
// idk if this is a good idea
// possibly just log errors and continue...
Err(e) => { Err(e) => {
return Err(format_err!("events error err={:?}", e)); return Err(format_err!("err={:?}", e));
}, },
}; };
} }
@ -80,15 +87,24 @@ impl Warden {
} }
fn on_match(&mut self, pair: Pair) -> Result<(), Error> { fn on_match(&mut self, pair: Pair) -> Result<(), Error> {
let db = self.pool.get()?;
let tx = db.transaction()?;
info!("received pair={:?}", pair); info!("received pair={:?}", pair);
let db = self.pool.get()?;
let mut tx = db.transaction()?;
let instance = instance::pvp(&mut tx, &pair.0.account, &pair.1.account)?;
tx.commit()?;
// subscribe users to instance events
self.events.send(Event::Subscribe(pair.0.id, instance.id))?;
self.events.send(Event::Subscribe(pair.1.id, instance.id))?;
// send them the new instance state
let msg = RpcMessage::InstanceState(instance);
pair.0.tx.send(msg.clone())?;
pair.1.tx.send(msg)?;
Ok(()) Ok(())
} }
} }
fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> { fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> {

View File

@ -1,175 +0,0 @@
use std::time::{Instant};
use std::thread::spawn;
use std::str;
use rand::prelude::*;
use serde_cbor::to_vec;
use cookie::Cookie;
use crossbeam_channel::{unbounded, Sender as CbSender};
use ws::{ listen, CloseCode, Message, Handler, Result, Request, Response};
use account;
use account::{Account};
use pg::{PgPool};
use events::Event;
use rpc::{RpcMessage};
use rpc;
use mtx;
use net::TOKEN_HEADER;
struct Connection {
pub id: usize,
pub ws: CbSender<RpcMessage>,
pool: PgPool,
account: Option<Account>,
events: CbSender<Event>,
}
// we unwrap everything in here cause really
// we don't care if this panics
// it's run in a thread so it's supposed to bail
// when it encounters errors
impl Handler for Connection {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
info!("websocket connected account={:?}", self.account);
// tell events we have connected
self.events.send(Event::Connect(self.id, self.account.clone(), self.ws.clone())).unwrap();
// if user logged in do some prep work
if let Some(ref a) = self.account {
self.ws.send(RpcMessage::AccountState(a.clone())).unwrap();
self.events.send(Event::Subscribe(self.id, a.id)).unwrap();
let db = self.pool.get().unwrap();
let mut tx = db.transaction().unwrap();
// send account constructs
let account_constructs = account::account_constructs(&mut tx, a).unwrap();
self.ws.send(rpc::RpcMessage::AccountConstructs(account_constructs)).unwrap();
// get account instances
// and send them to the client
let account_instances = account::account_instances(&mut tx, a).unwrap();
self.ws.send(rpc::RpcMessage::AccountInstances(account_instances)).unwrap();
let shop = mtx::account_shop(&mut tx, &a).unwrap();
self.ws.send(rpc::RpcMessage::AccountShop(shop)).unwrap();
// tx should do nothing
tx.commit().unwrap();
}
Ok(())
}
fn on_message(&mut self, msg: Message) -> Result<()> {
match msg {
Message::Binary(msg) => {
let begin = Instant::now();
let db_connection = self.pool.get().unwrap();
match rpc::receive(msg, &db_connection, &self, begin, &self.account) {
Ok(reply) => {
// if the user queries the state of something
// we tell events to push updates to them
match reply {
RpcMessage::AccountState(ref v) =>
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
RpcMessage::GameState(ref v) =>
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
RpcMessage::InstanceState(ref v) =>
self.events.send(Event::Subscribe(self.id, v.id)).unwrap(),
_ => (),
};
self.ws.send(reply).unwrap();
},
Err(e) => {
warn!("{:?}", e);
self.ws.send(RpcMessage::Error(e.to_string())).unwrap();
},
};
},
_ => (),
};
Ok(())
}
fn on_close(&mut self, _: CloseCode, _: &str) {
info!("websocket disconnected account={:?}", self.account);
self.events.send(Event::Disconnect(self.id)).unwrap();
}
fn on_request(&mut self, req: &Request) -> Result<Response> {
let res = Response::from_request(req)?;
if let Some(cl) = req.header("Cookie") {
let unauth = || Ok(Response::new(401, "Unauthorized", b"401 - Unauthorized".to_vec()));
let cookie_list = match str::from_utf8(cl) {
Ok(cl) => cl,
Err(_) => return unauth(),
};
for s in cookie_list.split(";").map(|s| s.trim()) {
let cookie = match Cookie::parse(s) {
Ok(c) => c,
Err(_) => return unauth(),
};
// got auth token
if cookie.name() == TOKEN_HEADER {
let db = self.pool.get().unwrap();
match account::from_token(&db, cookie.value().to_string()) {
Ok(a) => self.account = Some(a),
Err(_) => return unauth(),
}
}
};
};
Ok(res)
}
}
pub fn start(pool: PgPool, events_tx: CbSender<Event>) {
let mut rng = thread_rng();
listen("127.0.0.1:40055", move |out| {
// we give the tx half to the connection object
// which in turn passes a clone to the events system
// the rx half goes into a thread where it waits for messages
// that need to be delivered to the client
// both the ws message handler and the events thread must use
// this channel to send messages
let (tx, rx) = unbounded::<RpcMessage>();
spawn(move || {
loop {
match rx.recv() {
Ok(n) => {
let response = to_vec(&n).unwrap();
out.send(Message::Binary(response)).unwrap();
}
// we done
Err(_e) => {
break;
},
};
}
});
Connection {
id: rng.gen::<usize>(),
account: None,
ws: tx,
pool: pool.clone(),
events: events_tx.clone(),
}
}).unwrap();
}