use std::time::{Duration}; use uuid::Uuid; use crossbeam_channel::{tick, Sender, Receiver}; // Db Commons use postgres::transaction::Transaction; use failure::Error; use account; use game::{games_need_upkeep, game_update, game_write, game_delete}; use instance; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; use pg::{Db, PgPool}; use events::{Event, EventsTx, PvpRequest}; use rpc::{RpcMessage}; type Id = usize; type Pair = (PvpRequest, PvpRequest); pub enum GameEvent { Upkeep, Finish(Uuid), Match(Pair), } pub struct Warden { pub tx: Sender, rx: Receiver, events: EventsTx, pool: PgPool, } impl Warden { pub fn new(tx: Sender, rx: Receiver, events: EventsTx, pool: PgPool) -> Warden { Warden { tx, rx, events, pool, } } pub fn listen(mut self) -> Result<(), Error> { loop { match self.rx.recv() { Ok(m) => { match self.event(m) { Ok(()) => (), // :) Err(e) => { warn!("err={:?}", e); } } }, Err(e) => { warn!("err={:?}", e); }, }; } } fn event(&mut self, msg: GameEvent) -> Result<(), Error> { match msg { GameEvent::Upkeep => self.on_upkeep(), GameEvent::Match(pair) => self.on_match(pair), GameEvent::Finish(id) => { info!("game finished id={:?}", id); Ok(()) }, } } fn on_upkeep(&mut self) -> Result<(), Error> { let db = self.pool.get()?; fetch_games(db.transaction()?)? .commit()?; fetch_instances(db.transaction()?)? .commit()?; Ok(()) } fn on_match(&mut self, pair: Pair) -> Result<(), Error> { info!("received pair={:?}", pair); let db = self.pool.get()?; let mut tx = db.transaction()?; let a = account::select(&db, pair.0.account)?; let b = account::select(&db, pair.1.account)?; let instance = instance::pvp(&mut tx, &a, &b)?; tx.commit()?; // subscribe users to instance events self.events.send(Event::Subscribe(pair.0.id, instance.id))?; self.events.send(Event::Subscribe(pair.1.id, instance.id))?; // send them the new instance state let msg = RpcMessage::InstanceState(instance); pair.0.tx.send(msg.clone())?; pair.1.tx.send(msg)?; Ok(()) } } fn fetch_games(mut tx: Transaction) -> Result { let games = games_need_upkeep(&mut tx)?; for mut game in games { let game = game.upkeep(); match game_update(&mut tx, &game) { Ok(_) => (), Err(e) => { info!("{:?}", e); game_delete(&mut tx, game.id)?; } } } Ok(tx) } fn fetch_instances(mut tx: Transaction) -> Result { for mut instance in instances_need_upkeep(&mut tx)? { let (instance, new_game) = instance.upkeep(); if let Some(game) = new_game { game_write(&mut tx, &game)?; instance_update(&mut tx, instance)?; // ensures cleanup for forfeits etc is done game_update(&mut tx, &game)?; } else { instance_update(&mut tx, instance)?; } } for mut instance in instances_idle(&mut tx)? { instance_delete(&mut tx, instance.id)?; } Ok(tx) } pub fn upkeep_tick(warden: Sender) { let ticker = tick(Duration::from_millis(1000)); loop { ticker.recv().unwrap(); warden.send(GameEvent::Upkeep).unwrap(); } }