mnml/server/src/warden.rs
2019-09-04 16:38:03 +10:00

156 lines
3.7 KiB
Rust

use std::time::{Duration};
use uuid::Uuid;
use crossbeam_channel::{tick, Sender, Receiver};
// Db Commons
use postgres::transaction::Transaction;
use failure::Error;
use account;
use game::{games_need_upkeep, game_update, game_write, game_delete};
use instance;
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use pg::{Db, PgPool};
use events::{Event, EventsTx, PvpRequest};
use rpc::{RpcMessage};
// Plain integer identifier alias. Not referenced anywhere in the code visible
// in this module.
type Id = usize;
// Two PvP requests travelling together as a unit; carried by
// `GameEvent::Match` and consumed by `Warden::on_match`.
type Pair = (PvpRequest, PvpRequest);
/// Messages consumed by the warden's event loop.
///
/// `Debug` is derived so events can be logged with `{:?}`, matching how the
/// rest of this module logs ids, pairs and errors.
#[derive(Debug)]
pub enum GameEvent {
    /// Request to run one upkeep pass over games and instances.
    Upkeep,
    /// A game with the given id has finished.
    Finish(Uuid),
    /// Two PvP requests that should be placed into a shared instance.
    Match(Pair),
}
/// Owns the game event queue together with the resources needed to act on
/// each event: the events channel and the database pool.
pub struct Warden {
/// Sender for `GameEvent`s, exposed publicly so other components can be
/// handed a way to submit events.
pub tx: Sender<GameEvent>,
/// Receiving end drained by `listen`.
rx: Receiver<GameEvent>,
/// Channel used to send `Event`s (e.g. `Event::Subscribe`).
events: EventsTx,
/// Database connection pool; a connection is fetched per handled event.
pool: PgPool,
}
impl Warden {
pub fn new(tx: Sender<GameEvent>, rx: Receiver<GameEvent>, events: EventsTx, pool: PgPool) -> Warden {
Warden {
tx,
rx,
events,
pool,
}
}
pub fn listen(mut self) -> Result<(), Error> {
loop {
match self.rx.recv() {
Ok(m) => {
match self.event(m) {
Ok(()) => (), // :)
Err(e) => {
warn!("err={:?}", e);
}
}
},
Err(e) => {
warn!("err={:?}", e);
},
};
}
}
fn event(&mut self, msg: GameEvent) -> Result<(), Error> {
match msg {
GameEvent::Upkeep => self.on_upkeep(),
GameEvent::Match(pair) => self.on_match(pair),
GameEvent::Finish(id) => {
info!("game finished id={:?}", id);
Ok(())
},
}
}
fn on_upkeep(&mut self) -> Result<(), Error> {
let db = self.pool.get()?;
fetch_games(db.transaction()?)?
.commit()?;
fetch_instances(db.transaction()?)?
.commit()?;
Ok(())
}
fn on_match(&mut self, pair: Pair) -> Result<(), Error> {
info!("received pair={:?}", pair);
let db = self.pool.get()?;
let mut tx = db.transaction()?;
let a = account::select(&db, pair.0.account)?;
let b = account::select(&db, pair.1.account)?;
let instance = instance::pvp(&mut tx, &a, &b)?;
tx.commit()?;
// subscribe users to instance events
self.events.send(Event::Subscribe(pair.0.id, instance.id))?;
self.events.send(Event::Subscribe(pair.1.id, instance.id))?;
// send them the new instance state
let msg = RpcMessage::InstanceState(instance);
pair.0.tx.send(msg.clone())?;
pair.1.tx.send(msg)?;
Ok(())
}
}
/// Advances every game reported by `games_need_upkeep` and returns the
/// transaction so the caller decides when to commit.
///
/// Each game is stepped with `upkeep()` and saved with `game_update`. When the
/// save fails, the error is logged and the game is deleted instead; if that
/// delete fails too, the error is returned and the pass stops.
fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> {
    for mut pending in games_need_upkeep(&mut tx)? {
        let game = pending.upkeep();

        // NOTE(review): the delete below runs in the same transaction as the
        // failed update — confirm the transaction is still usable at that
        // point for every error `game_update` can return.
        if let Err(e) = game_update(&mut tx, &game) {
            info!("{:?}", e);
            game_delete(&mut tx, game.id)?;
        }
    }

    Ok(tx)
}
/// Runs instance upkeep inside the given transaction and returns it for the
/// caller to commit.
///
/// Two phases, in order:
/// 1. every instance from `instances_need_upkeep` is stepped with `upkeep()`;
///    the games it yields are written with `game_write` before the instance
///    itself is saved with `instance_update`;
/// 2. every instance from `instances_idle` is deleted.
///
/// The first failing database call ends the pass and its error is returned.
fn fetch_instances(mut tx: Transaction) -> Result<Transaction, Error> {
    let due = instances_need_upkeep(&mut tx)?;
    for mut current in due {
        let (instance, spawned) = current.upkeep();

        for game in spawned {
            game_write(&mut tx, &game)?;
        }

        instance_update(&mut tx, instance)?;
    }

    // The idle query runs only after the upkeep phase has been applied.
    let idle = instances_idle(&mut tx)?;
    for instance in idle {
        instance_delete(&mut tx, instance.id)?;
    }

    Ok(tx)
}
/// Sends `GameEvent::Upkeep` to the warden once per second, forever.
///
/// This function never returns normally, so it is meant to occupy its own
/// thread.
///
/// # Panics
///
/// Panics if the ticker cannot be received from or if sending to `warden`
/// fails.
pub fn upkeep_tick(warden: Sender<GameEvent>) {
    let heartbeat = tick(Duration::from_secs(1));

    loop {
        // Block until the next tick fires, then request an upkeep pass.
        heartbeat.recv().unwrap();
        warden.send(GameEvent::Upkeep).unwrap();
    }
}