practice rpc

This commit is contained in:
ntr 2019-07-27 19:24:52 +10:00
parent fb84e1becf
commit 66223033b0
14 changed files with 286 additions and 309 deletions

View File

@ -53,7 +53,6 @@ if you click a vbox item and click inventory
it buys it
*SERVER*
* test rust-ws w/ hyper upgrade
* tx middleware
remove tx from methods that don't need it
@ -70,11 +69,7 @@ it buys it
* elo
* password on MM game to prevent direct joins
join queue ->
pubsub gets notified
sends a message to all ws
* convert game vecs to hashmap
* iconography
* icons change with %

View File

@ -26,6 +26,7 @@ html, body, #mnml {
/* stops inspector going skitz*/
overflow-x: hidden;
overflow-y: hidden;
}
@media (min-width: 1921px) {

View File

@ -15,7 +15,7 @@ const addState = connect(
const { ws, constructs, team } = state;
function sendConstructSpawn(name) {
return ws.sendMtxConstructBuy(name);
return ws.sendMtxConstructSpawn(name);
}
return {

View File

@ -131,8 +131,8 @@ function createSocket(events) {
send(['MtxBuy', { mtx }]);
}
function sendMtxConstructBuy() {
send(['MtxConstructBuy', {}]);
function sendMtxConstructSpawn() {
send(['MtxConstructSpawn', {}]);
}
// -------------
// Incoming
@ -298,7 +298,7 @@ function createSocket(events) {
sendItemInfo,
sendMtxApply,
sendMtxBuy,
sendMtxConstructBuy,
sendMtxConstructSpawn,
connect,
};
}

View File

@ -8,7 +8,10 @@ exports.up = async knex => {
.inTable('accounts')
.onDelete('CASCADE');
table.binary('data').notNullable();
table.index('id');
table.boolean('team')
.notNullable()
.defaultTo(false);
});
};

View File

@ -17,10 +17,6 @@ exports.up = async knex => {
table.timestamps(true, true);
table.binary('data').notNullable();
table.boolean('open')
.defaultTo(false)
.notNullable()
.index();
table.boolean('finished')
.defaultTo(false)

View File

@ -251,8 +251,8 @@ pub fn create(name: &String, password: &String, code: &String, tx: &mut Transact
};
// 3 constructs for a team and 1 to swap
for _i in 0..4 {
construct_spawn(tx, id, generate_name())?;
for i in 0..4 {
construct_spawn(tx, id, generate_name(), i < 3)?;
}
for mtx in FREE_MTX.iter() {
@ -296,6 +296,38 @@ pub fn account_constructs(tx: &mut Transaction, account: &Account) -> Result<Vec
return Ok(constructs);
}
pub fn account_team(tx: &mut Transaction, account: &Account) -> Result<Vec<Construct>, Error> {
let query = "
SELECT data
FROM constructs
WHERE account = $1
AND team = true;
";
let result = tx
.query(query, &[&account.id])?;
let constructs: Result<Vec<Construct>, _> = result.iter()
.map(|row| {
let construct_bytes: Vec<u8> = row.get(0);
match from_slice::<Construct>(&construct_bytes) {
Ok(c) => Ok(c),
Err(_e) => construct_recover(construct_bytes, tx),
}
})
.collect();
// catch any errors
if constructs.is_err() {
warn!("{:?}", constructs);
return Err(err_msg("could not deserialise a construct"));
}
let mut constructs = constructs.unwrap();
constructs.sort_by_key(|c| c.id);
return Ok(constructs);
}
pub fn account_instances(tx: &mut Transaction, account: &Account) -> Result<Vec<Instance>, Error> {
let query = "
SELECT data, id

View File

@ -869,7 +869,7 @@ pub fn construct_select(tx: &mut Transaction, id: Uuid, account_id: Uuid) -> Res
return Ok(construct);
}
pub fn construct_spawn(tx: &mut Transaction, account: Uuid, name: String) -> Result<Construct, Error> {
pub fn construct_spawn(tx: &mut Transaction, account: Uuid, name: String, team: bool) -> Result<Construct, Error> {
let construct = Construct::new()
.named(&name)
.set_account(account);
@ -877,13 +877,13 @@ pub fn construct_spawn(tx: &mut Transaction, account: Uuid, name: String) -> Res
let construct_bytes = to_vec(&construct)?;
let query = "
INSERT INTO constructs (id, account, data)
VALUES ($1, $2, $3)
INSERT INTO constructs (id, account, data, team)
VALUES ($1, $2, $3, $4)
RETURNING id, account;
";
let result = tx
.query(query, &[&construct.id, &account, &construct_bytes])?;
.query(query, &[&construct.id, &account, &construct_bytes, &team])?;
let _returned = result.iter().next().ok_or(err_msg("no row returned"))?;

View File

@ -11,27 +11,35 @@ use account;
use account::Account;
use game;
use instance;
use pg::{Db, PgPool};
use rpc::RpcMessage;
use warden::{GameEvent};
type Id = usize;
pub struct Events {
pub tx: Sender<Event>,
rx: Receiver<Event>,
pool: PgPool,
warden: Sender<GameEvent>,
clients: HashMap<Id, WsClient>,
}
#[derive(Debug,Clone)]
pub enum Event {
// ws lifecycle
Connect(Id, Option<Account>, Sender<RpcMessage>),
Disconnect(Id),
Subscribe(Id, Uuid),
Unsubscribe(Id, Uuid),
// notifications
Push(Uuid, RpcMessage),
// client events
Queue(Id)
}
struct WsClient {
@ -41,17 +49,17 @@ struct WsClient {
}
impl Events {
pub fn new(pool: PgPool) -> Events {
pub fn new(warden: Sender<GameEvent>) -> Events {
let (tx, rx) = unbounded();
Events {
tx,
rx,
pool,
warden,
clients: HashMap::new(),
}
}
pub fn listen(&mut self) -> Result<(), Error> {
pub fn listen(mut self) -> Result<(), Error> {
loop {
match self.rx.recv() {
Ok(m) => {
@ -74,24 +82,24 @@ impl Events {
fn on_event(&mut self, msg: Event) -> Result<(), Error> {
match msg {
Event::Connect(id, account, tx) => {
info!("client connected to events id={:?} account={:?}", id, account);
info!("connect id={:?} account={:?}", id, account);
let client = WsClient { id, tx, subs: HashSet::new() };
self.clients.insert(id, client);
info!("events clients={:?}", self.clients.len());
info!("clients={:?}", self.clients.len());
Ok(())
},
Event::Disconnect(id) => {
info!("client disconnected from events id={:?}", id);
info!("disconnect id={:?}", id);
self.clients.remove(&id);
info!("events clients={:?}", self.clients.len());
info!("clients={:?}", self.clients.len());
Ok(())
}
Event::Subscribe(id, obj) => {
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
info!("subscribe id={:?} object={:?}", id, obj);
match self.clients.get_mut(&id) {
Some(client) => {
@ -103,12 +111,12 @@ impl Events {
}
},
Event::Unsubscribe(id, obj) => {
info!("client subscribed to updates from object id={:?} object={:?}", id, obj);
info!("unsubscribe id={:?} object={:?}", id, obj);
match self.clients.get_mut(&id) {
Some(mut client) => {
client.subs.remove(&obj);
info!("client subscriptions={:?}", client.subs.len());
info!("unsubscribe subscriptions removed={:?}", client.subs.len());
Ok(())
},
None => return Err(format_err!("unknown client {:?}", id))
@ -116,124 +124,40 @@ impl Events {
},
Event::Push(id, msg) => {
info!("events received push notification id={:?} msg={:?}", id, msg);
info!("push id={:?} msg={:?}", id, msg);
let mut subs = 0;
for (_client_id, client) in self.clients.iter() {
let mut dead = vec![];
for (client_id, client) in self.clients.iter() {
if client.subs.contains(&id) {
subs += 1;
match client.tx.send(msg.clone()) {
Ok(_) => (),
Err(e) => {
warn!("unable to send msg to client err={:?}", e);
// self.remove_client(*client_id);
dead.push(*client_id);
},
};
}
}
info!("notification subscribers={:?}", subs);
if !dead.is_empty() {
info!("dead connections={:?}", dead.len());
dead.iter().for_each(|id| self.remove_client(*id));
}
info!("push subscribers={:?}", subs);
Ok(())
},
Event::Queue(id) => {
info!("queue id={:?}", id);
Ok(())
},
}
}
}
// #[derive(Debug)]
// struct Subscriptions {
// account: Option<Uuid>,
// game: Option<Uuid>,
// instance: Option<Uuid>,
// // account_instances: Vec<Uuid>,
// }
// impl Subscriptions {
// fn new(ws_pool: &PgPool, account: &Option<Account>, ws: &mut Ws) -> Result<Subscriptions, Error> {
// if let Some(a) = account {
// let db = ws_pool.get()?;
// let mut tx = db.transaction()?;
// // send account constructs
// let account_constructs = account::account_constructs(&mut tx, a)?;
// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountConstructs(account_constructs))?))?;
// // get account instances
// // and send them to the client
// let account_instances = account::account_instances(&mut tx, a)?;
// // let instances = account_instances.iter().map(|i| i.id).collect::<Vec<Uuid>>();
// ws.client.write_message(Binary(to_vec(&rpc::RpcMessage::AccountInstances(account_instances))?))?;
// // get players
// // add to games
// tx.commit()?;
// return Ok(Subscriptions {
// account: Some(a.id),
// game: None,
// instance: None,
// })
// }
// Ok(Subscriptions {
// account: None,
// game: None,
// instance: None
// })
// }
// fn update(&mut self, msg: &RpcMessage) -> Result<&mut Subscriptions, Error> {
// match msg {
// RpcMessage::AccountState(a) => self.account = Some(a.id),
// RpcMessage::InstanceState(i) => self.instance = Some(i.id),
// RpcMessage::GameState(g) => self.game = Some(g.id),
// _ => (),
// };
// // info!("subscriptions updated {:?}", self);
// Ok(self)
// }
// }
// fn handle_message(subs: &Subscriptions, m: Message, ws: &mut Ws) {
// if let Some(msg) = match m {
// Message::Account(a) => {
// match subs.account {
// Some(wsa) => match wsa == a.id {
// true => Some(rpc::RpcMessage::AccountState(a)),
// false => None,
// },
// None => None,
// }
// },
// Message::Instance(i) => {
// match subs.instance {
// Some(ci) => match ci == i.id {
// true => Some(rpc::RpcMessage::InstanceState(i)),
// false => None,
// },
// None => None,
// }
// },
// Message::Game(g) => {
// match subs.game {
// Some(cg) => match cg == g.id {
// true => Some(rpc::RpcMessage::GameState(g)),
// false => None,
// },
// None => None,
// }
// },
// Message::Connect(tx) => {
// info!("client connected {:?}", tx);
// None
// },
// // _ => None,
// } {
// ws.client.write_message(Binary(to_vec(&msg).unwrap())).unwrap();
// }
// }

View File

@ -12,6 +12,7 @@ use chrono::prelude::*;
use chrono::Duration;
use account::Account;
use account;
use player::{Player, player_create};
use construct::{Construct, construct_get};
use mob::{bot_player, instance_mobs};
@ -96,10 +97,7 @@ pub struct Instance {
players: Vec<Player>,
rounds: Vec<Round>,
open: bool,
max_players: usize,
max_rounds: usize,
password: Option<String>,
time_control: TimeControl,
phase: InstancePhase,
@ -116,30 +114,9 @@ impl Instance {
players: vec![],
rounds: vec![],
phase: InstancePhase::Lobby,
open: true,
max_players: 2,
max_rounds: 5,
name: String::new(),
time_control: TimeControl::Standard,
password: None,
phase_start: Utc::now(),
phase_end: Some(TimeControl::Standard.lobby_timeout()),
winner: None,
}
}
pub fn global(player: Player) -> Instance {
Instance {
id: Uuid::nil(),
players: vec![player],
rounds: vec![],
phase: InstancePhase::InProgress,
open: false,
max_players: 0,
max_rounds: 5,
time_control: TimeControl::Standard,
name: "Global Matchmaking".to_string(),
password: None,
phase_start: Utc::now(),
phase_end: Some(TimeControl::Standard.lobby_timeout()),
winner: None,
@ -199,11 +176,6 @@ impl Instance {
self
}
fn set_max_rounds(mut self, rounds: usize) -> Instance {
self.max_rounds = rounds;
self
}
fn add_player(&mut self, player: Player) -> Result<&mut Instance, Error> {
if self.players.len() >= self.max_players {
return Err(err_msg("game full"))
@ -314,7 +286,6 @@ impl Instance {
fn start(&mut self) -> &mut Instance {
// self.players.sort_unstable_by_key(|p| p.id);
self.open = false;
self.next_round()
}
@ -358,11 +329,6 @@ impl Instance {
}
return false;
// boN
// self.players.iter()
// .any(|p| p.wins as usize >= self.max_rounds / 2 + 1)
// || self.rounds.len() == self.max_rounds
}
pub fn finish(&mut self) -> &mut Instance {
@ -531,13 +497,13 @@ pub fn instance_create(tx: &mut Transaction, instance: Instance) -> Result<Insta
let instance_bytes = to_vec(&instance)?;
let query = "
INSERT INTO instances (id, data, open)
INSERT INTO instances (id, data)
VALUES ($1, $2, $3)
RETURNING id;
";
let result = tx
.query(query, &[&instance.id, &instance_bytes, &instance.open])?;
.query(query, &[&instance.id, &instance_bytes])?;
result.iter().next().ok_or(format_err!("no instances written"))?;
@ -549,13 +515,13 @@ pub fn instance_update(tx: &mut Transaction, instance: Instance) -> Result<Insta
let query = "
UPDATE instances
SET data = $1, open = $2, finished = $3, upkeep = $4, updated_at = now()
WHERE id = $5
SET data = $1, finished = $2, upkeep = $3, updated_at = now()
WHERE id = $4
RETURNING id, data;
";
let result = tx
.query(query, &[&instance_bytes, &instance.open, &instance.finished(), &instance.phase_end, &instance.id])?;
.query(query, &[&instance_bytes, &instance.finished(), &instance.phase_end, &instance.id])?;
result.iter().next().ok_or(err_msg("no instance row returned"))?;
@ -697,60 +663,47 @@ pub fn instances_idle(tx: &mut Transaction) -> Result<Vec<Instance>, Error> {
}
pub fn instance_new(tx: &mut Transaction, account: &Account, construct_ids: Vec<Uuid>, name: String, pve: bool, _password: Option<String>) -> Result<Instance, Error> {
let mut instance = match pve {
true => {
let bot = bot_player();
let bot_id = bot.id;
pub fn instance_practice(tx: &mut Transaction, account: &Account) -> Result<Instance, Error> {
let bot = bot_player();
let bot_id = bot.id;
// generate bot imgs only in the real world
for c in bot.constructs.iter() {
img::molecular_write(c.img)?;
}
let mut instance = Instance::new()
.set_time_control(TimeControl::Practice)
.set_max_rounds(10)
.set_name(name)?;
instance.add_player(bot)?;
instance.player_ready(bot_id)?;
instance.open = false;
instance
},
false => Instance::new()
.set_name(name)?
};
instance = instance_create(tx, instance)?;
instance_join(tx, account, instance.id, construct_ids)
}
pub fn instance_join(tx: &mut Transaction, account: &Account, instance_id: Uuid, construct_ids: Vec<Uuid>) -> Result<Instance, Error> {
let mut instance = instance_get(tx, instance_id)?;
let constructs = construct_ids
.iter()
.map(|id| construct_get(tx, *id, account.id))
.collect::<Result<Vec<Construct>, Error>>()?;
if constructs.len() != 3 {
return Err(format_err!("incorrect player size. ({:})", 3));
// generate bot imgs for the client to see
for c in bot.constructs.iter() {
img::molecular_write(c.img)?;
}
if instance.players.len() >= instance.max_players {
return Err(err_msg("game is full"));
}
let mut instance = Instance::new()
.set_time_control(TimeControl::Practice)
.set_name(bot.name.clone())?;
let constructs = account::account_team(tx, account)?;
let player = player_create(tx, Player::new(account.id, &account.name, constructs), instance.id, account)?;
instance.add_player(player)?;
instance.add_player(bot)?;
instance.player_ready(bot_id)?;
instance_update(tx, instance)
instance_create(tx, instance)
}
// pub fn instance_queue(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> {
// }
pub fn instance_pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> {
let mut instance = Instance::new()
// TODO generate nice game names
.set_name("PVP".to_string())?;
for account in [a, b].iter() {
let constructs = account::account_team(tx, account)?;
let player = player_create(tx, Player::new(account.id, &account.name, constructs), instance.id, account)?;
instance.add_player(player)?;
}
instance_create(tx, instance)
}
pub fn instance_ready(tx: &mut Transaction, account: &Account, instance_id: Uuid) -> Result<RpcMessage, Error> {
let mut instance = instance_get(tx, instance_id)?;
let player_id = instance.account_player(account.id)?.id;
@ -812,7 +765,6 @@ mod tests {
instance.player_ready(bot).unwrap();
assert_eq!(instance.phase, InstancePhase::Finished);
// assert!(instance.players.iter().any(|p| p.wins as usize == instance.max_rounds / 2 + 1));
}
#[test]

View File

@ -51,13 +51,9 @@ mod vbox;
mod warden;
mod websocket;
use std::thread::{sleep, spawn};
use std::time::{Duration};
use std::thread::{spawn};
use std::path::{Path};
use events::Events;
use warden::warden;
fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {
@ -70,6 +66,7 @@ fn setup_logger() -> Result<(), fern::InitError> {
))
})
.level_for("postgres", log::LevelFilter::Info)
.level_for("ws", log::LevelFilter::Warn)
.level_for("iron", log::LevelFilter::Info)
.level(log::LevelFilter::Info)
.chain(std::io::stdout())
@ -84,29 +81,24 @@ fn main() {
let pool = pg::create_pool();
let warden_pool = pool.clone();
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
if let Err(e) = warden(db_connection) {
info!("{:?}", e);
}
sleep(Duration::new(1, 0));
}
});
let warden = warden::Warden::new(pool.clone());
let warden_tx = warden.tx.clone();
let warden_tick_tx = warden.tx.clone();
let http_pool = pool.clone();
spawn(move || net::start(http_pool));
// create a clone of the tx so ws handler can tell events
// about connection status
// TODO store as an Arc<Mutex<Events>> and make cpuN threads
let mut events = events::Events::new(pool.clone());
let events = events::Events::new(warden_tx);
let ws_events_tx = events.tx.clone();
let pg_pool = pool.clone();
let pg_events = events.tx.clone();
spawn(move || pg::listen(pg_pool, pg_events));
let pg_events_tx = events.tx.clone();
spawn(move || net::start(http_pool));
spawn(move || warden.listen());
spawn(move || warden::upkeep_tick(warden_tick_tx));
spawn(move || pg::listen(pg_pool, pg_events_tx));
spawn(move || events.listen());
// the main thread becomes this ws listener

View File

@ -73,59 +73,6 @@ pub struct Mtx {
variant: MtxVariant,
}
pub fn apply(tx: &mut Transaction, account: &Account, variant: MtxVariant, construct_id: Uuid, name: String) -> Result<Vec<Construct>, Error> {
let mtx = select(tx, variant, account.id)?;
let mut construct = construct_select(tx, construct_id, account.id)?;
let cost = match mtx.variant {
MtxVariant::Rename => NEW_NAME_COST,
_ => NEW_IMAGE_COST,
};
account::debit(tx, account.id, cost)?;
construct = match mtx.variant {
MtxVariant::Rename => construct.new_name(name),
_ => construct.new_img(),
};
match mtx.variant {
MtxVariant::Invader => img::invader_write(construct.img)?,
MtxVariant::Molecular => img::molecular_write(construct.img)?,
_ => construct.img,
};
construct_write(tx, construct)?;
account::account_constructs(tx, account)
}
pub fn select(tx: &mut Transaction, variant: MtxVariant, account: Uuid) -> Result<Mtx, Error> {
let query = "
SELECT id, account, variant
FROM mtx
WHERE account = $1
AND variant = $2
FOR UPDATE;
";
let result = tx
.query(query, &[&account, &variant.to_sql()])?;
if let Some(row) = result.iter().next() {
let id: Uuid = row.get(0);
let account: Uuid = row.get(1);
let v_str: String = row.get(2);
let variant = MtxVariant::try_from(v_str)?;
Ok(Mtx { id, account, variant })
} else {
Err(format_err!("mtx not found account={:?} variant={:?}", account, variant))
}
}
pub fn new_construct(tx: &mut Transaction, account: &Account) -> Result<Construct, Error> {
account::debit(tx, account.id, NEW_CONSTRUCT_COST)?;
let construct = construct_spawn(tx, account.id, generate_name())?;
Ok(construct)
}
impl Mtx {
pub fn new(variant: MtxVariant, account: Uuid) -> Mtx {
match variant {
@ -192,6 +139,59 @@ impl Mtx {
// }
}
pub fn apply(tx: &mut Transaction, account: &Account, variant: MtxVariant, construct_id: Uuid, name: String) -> Result<Vec<Construct>, Error> {
let mtx = select(tx, variant, account.id)?;
let mut construct = construct_select(tx, construct_id, account.id)?;
let cost = match mtx.variant {
MtxVariant::Rename => NEW_NAME_COST,
_ => NEW_IMAGE_COST,
};
account::debit(tx, account.id, cost)?;
construct = match mtx.variant {
MtxVariant::Rename => construct.new_name(name),
_ => construct.new_img(),
};
match mtx.variant {
MtxVariant::Invader => img::invader_write(construct.img)?,
MtxVariant::Molecular => img::molecular_write(construct.img)?,
_ => construct.img,
};
construct_write(tx, construct)?;
account::account_constructs(tx, account)
}
pub fn select(tx: &mut Transaction, variant: MtxVariant, account: Uuid) -> Result<Mtx, Error> {
let query = "
SELECT id, account, variant
FROM mtx
WHERE account = $1
AND variant = $2
FOR UPDATE;
";
let result = tx
.query(query, &[&account, &variant.to_sql()])?;
if let Some(row) = result.iter().next() {
let id: Uuid = row.get(0);
let account: Uuid = row.get(1);
let v_str: String = row.get(2);
let variant = MtxVariant::try_from(v_str)?;
Ok(Mtx { id, account, variant })
} else {
Err(format_err!("mtx not found account={:?} variant={:?}", account, variant))
}
}
pub fn new_construct(tx: &mut Transaction, account: &Account) -> Result<Construct, Error> {
account::debit(tx, account.id, NEW_CONSTRUCT_COST)?;
let construct = construct_spawn(tx, account.id, generate_name(), false)?;
Ok(construct)
}
pub fn account_shop(tx: &mut Transaction, account: &Account) -> Result<Shop, Error> {
let query = "
SELECT id, variant

View File

@ -1,7 +1,6 @@
use std::net::TcpStream;
use std::time::{Instant};
use serde_cbor::{from_slice};
use uuid::Uuid;
use failure::Error;
@ -12,7 +11,7 @@ use construct::{Construct};
use game::{Game, game_state, game_skill, game_ready};
use account::{Account, account_constructs};
use skill::{Skill, dev_resolve, Resolutions};
use instance::{Instance, instance_state, instance_list, instance_new, instance_ready, instance_join};
use instance::{Instance, instance_state, instance_list, instance_practice, instance_ready, instance_pvp};
use vbox::{vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip};
use item::{Item, ItemInfoCtr, item_info};
@ -45,7 +44,7 @@ enum RpcRequest {
DevResolve { a: Uuid, b: Uuid, skill: Skill },
MtxConstructApply { mtx: mtx::MtxVariant, construct_id: Uuid, name: String },
MtxConstructBuy { },
MtxConstructSpawn { },
MtxAccountApply { mtx: mtx::MtxVariant },
MtxBuy { mtx: mtx::MtxVariant },
@ -59,7 +58,8 @@ enum RpcRequest {
InstanceList {},
InstanceLobby { construct_ids: Vec<Uuid>, name: String, pve: bool, password: Option<String> },
InstanceJoin { instance_id: Uuid, construct_ids: Vec<Uuid> },
InstancePvp {},
InstancePractice {},
InstanceReady { instance_id: Uuid },
InstanceState { instance_id: Uuid },
@ -121,10 +121,10 @@ pub fn receive(data: Vec<u8>, db: &Db, begin: Instant, account: &Option<Account>
RpcRequest::InstanceList {} =>
Ok(RpcMessage::OpenInstances(instance_list(&mut tx)?)),
RpcRequest::InstanceLobby { construct_ids, name, pve, password } =>
Ok(RpcMessage::InstanceState(instance_new(&mut tx, account, construct_ids, name, pve, password)?)),
RpcRequest::InstanceJoin { instance_id, construct_ids } =>
Ok(RpcMessage::InstanceState(instance_join(&mut tx, account, instance_id, construct_ids)?)),
// RpcRequest::InstanceQueue {} =>
// Ok(RpcMessage::QueueState(instance_queue(&mut tx, account)?)),
RpcRequest::InstancePractice {} =>
Ok(RpcMessage::InstanceState(instance_practice(&mut tx, account)?)),
// these two can return GameState or InstanceState
RpcRequest::InstanceReady { instance_id } =>
@ -150,7 +150,7 @@ pub fn receive(data: Vec<u8>, db: &Db, begin: Instant, account: &Option<Account>
RpcRequest::VboxUnequip { instance_id, construct_id, target } =>
Ok(RpcMessage::InstanceState(vbox_unequip(&mut tx, account, instance_id, construct_id, target)?)),
RpcRequest::MtxConstructBuy {} =>
RpcRequest::MtxConstructSpawn {} =>
Ok(RpcMessage::ConstructSpawn(mtx::new_construct(&mut tx, account)?)),
RpcRequest::MtxConstructApply { mtx, construct_id, name } =>

View File

@ -1,10 +1,93 @@
use std::time::{Duration};
use uuid::Uuid;
use crossbeam_channel::{unbounded, tick, Sender, Receiver};
// Db Commons
use postgres::transaction::Transaction;
use failure::Error;
use game::{games_need_upkeep, game_update, game_write, game_delete};
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use pg::{Db};
use pg::{Db, PgPool};
type Id = usize;
type Pair = ((Id, Uuid), (Id, Uuid));
pub enum GameEvent {
Upkeep,
Finish(Uuid),
Match(Pair),
}
pub struct Warden {
pub tx: Sender<GameEvent>,
rx: Receiver<GameEvent>,
pool: PgPool,
}
impl Warden {
pub fn new(pool: PgPool) -> Warden {
let (tx, rx) = unbounded();
Warden {
tx,
rx,
pool,
}
}
pub fn listen(mut self) -> Result<(), Error> {
loop {
match self.rx.recv() {
Ok(m) => {
self.event(m)?;
},
// idk if this is a good idea
// possibly just log errors and continue...
Err(e) => {
return Err(format_err!("events error err={:?}", e));
},
};
}
}
fn event(&mut self, msg: GameEvent) -> Result<(), Error> {
match msg {
GameEvent::Upkeep => self.on_upkeep(),
GameEvent::Match(pair) => self.on_match(pair),
GameEvent::Finish(id) => {
info!("game finished id={:?}", id);
Ok(())
},
}
}
fn on_upkeep(&mut self) -> Result<(), Error> {
let db = self.pool.get()?;
fetch_games(db.transaction()?)?
.commit()?;
fetch_instances(db.transaction()?)?
.commit()?;
Ok(())
}
fn on_match(&mut self, pair: Pair) -> Result<(), Error> {
let db = self.pool.get()?;
let tx = db.transaction()?;
Ok(())
}
}
fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> {
let games = games_need_upkeep(&mut tx)?;
@ -39,12 +122,11 @@ fn fetch_instances(mut tx: Transaction) -> Result<Transaction, Error> {
Ok(tx)
}
pub fn warden(db: Db) -> Result<(), Error> {
fetch_games(db.transaction()?)?
.commit()?;
pub fn upkeep_tick(warden: Sender<GameEvent>) {
let ticker = tick(Duration::from_millis(1000));
fetch_instances(db.transaction()?)?
.commit()?;
Ok(())
loop {
ticker.recv().unwrap();
warden.send(GameEvent::Upkeep).unwrap();
}
}