From 95c463757df398b9895c9869ee3ed9c332a47b69 Mon Sep 17 00:00:00 2001 From: ntr Date: Sat, 27 Apr 2019 22:54:16 +1000 Subject: [PATCH] warden wip --- server/WORKLOG.md | 25 +++++++++++++----- server/src/game.rs | 47 ++++++++++++++++++++++++++++++++++ server/src/instance.rs | 18 ++----------- server/src/main.rs | 1 + server/src/net.rs | 7 +++++- server/src/player.rs | 3 ++- server/src/rpc.rs | 14 +---------- server/src/warden.rs | 57 ++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 135 insertions(+), 37 deletions(-) create mode 100644 server/src/warden.rs diff --git a/server/WORKLOG.md b/server/WORKLOG.md index 22cece8f..8cda1ec5 100644 --- a/server/WORKLOG.md +++ b/server/WORKLOG.md @@ -16,14 +16,22 @@ # WORK WORK ## NOW -*INSTANCES* +*WARDEN* + +* games +check updated timestamps + once a second? +add a timestamp to each team + after 30s issue warning (client) + after 1m automove + increment warnings + after 3 warnings forfeit + +* instances + add timestamp to each player + after 60s force ready -lobby opens - add player - add player - players ready - on start -> vbox *CLIENT* * general @@ -76,6 +84,11 @@ make strike *really* hit first / resolve at same time? ## LATER * redis for game events + +* store instances / games in redis? + * not sure hwo to get sets of player games + * set joined_games_$account [game_id] + * chat * notifications * elo + leaderboards diff --git a/server/src/game.rs b/server/src/game.rs index 3bfca93b..90e43348 100644 --- a/server/src/game.rs +++ b/server/src/game.rs @@ -681,6 +681,7 @@ pub fn game_get(tx: &mut Transaction, id: Uuid) -> Result { SELECT * FROM games WHERE id = $1 + FOR UPDATE; "; let result = tx @@ -698,6 +699,52 @@ pub fn game_get(tx: &mut Transaction, id: Uuid) -> Result { return Ok(game); } +pub fn games_afk(tx: &mut Transaction) -> Result, Error> { + let query = " + SELECT data, id + FROM games + WHERE updated_at < now() - interval '5 seconds'; + "; + + let result = tx + .query(query, &[])?; + + let mut list = vec![]; + + for row in result.into_iter() { + let bytes: Vec = row.get(0); + let id = row.get(1); + + match from_slice::(&bytes) { + Ok(i) => list.push(i), + Err(_e) => { + game_delete(tx, id)?; + } + }; + } + + return Ok(list); +} + +pub fn game_delete(tx: &mut Transaction, id: Uuid) -> Result<(), Error> { + let query = " + DELETE + FROM games + WHERE id = $1; + "; + + let result = tx + .execute(query, &[&id])?; + + if result != 1 { + return Err(format_err!("unable to delete player {:?}", id)); + } + + println!("game deleted {:?}", id); + + return Ok(()); +} + pub fn game_global_startup(tx: &mut Transaction) -> Result<(), Error> { if game_global_get(tx).is_ok() { println!("global mm game exists"); diff --git a/server/src/instance.rs b/server/src/instance.rs index 8c97d6a7..31189ceb 100644 --- a/server/src/instance.rs +++ b/server/src/instance.rs @@ -389,16 +389,6 @@ impl Instance { } } - fn scores(&self) -> Vec<(String, Score)> { - let mut scores = self.players.iter() - .map(|p| (p.name.clone(), p.score)) - .collect::>(); - scores.sort_unstable_by_key(|s| s.1.wins); - scores.reverse(); - - scores - } - // PLAYER ACTIONS fn account_player(&mut self, account: Uuid) -> Result<&mut Player, Error> { self.players @@ -485,7 +475,8 @@ pub fn instance_get(tx: &mut Transaction, instance_id: Uuid) -> Result Result, Error> { - let scores = instance_get(tx, params.instance_id)?.scores(); - Ok(scores) -} - pub fn instance_ready(params: InstanceReadyParams, tx: &mut Transaction, account: &Account) -> Result { let mut instance = instance_get(tx, params.instance_id)?; let player_id = instance.account_player(account.id)?.id; diff --git a/server/src/main.rs b/server/src/main.rs index 8fa4a8b1..ba798350 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,6 +33,7 @@ mod player; mod mob; mod util; mod vbox; +mod warden; use dotenv::dotenv; use net::{start}; diff --git a/server/src/net.rs b/server/src/net.rs index 862ab83d..7ca1cc94 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -19,6 +19,7 @@ pub type Db = PooledConnection; use rpc::{Rpc}; use util::{startup}; +use warden::{warden}; // struct Server { // client: WebSocket, @@ -69,9 +70,13 @@ pub fn start() { } let server = TcpListener::bind("0.0.0.0:40000").unwrap(); + + let warden_pool = pool.clone(); + spawn(move || warden(warden_pool)); + for stream in server.incoming() { let db = pool.clone(); - spawn (move || { + spawn(move || { let mut websocket = accept(stream.unwrap()).unwrap(); let rpc = Rpc {}; diff --git a/server/src/player.rs b/server/src/player.rs index c5edeefd..4e278e0a 100644 --- a/server/src/player.rs +++ b/server/src/player.rs @@ -266,7 +266,8 @@ pub fn player_get(tx: &mut Transaction, account_id: Uuid, instance_id: Uuid) -> SELECT * FROM players WHERE account = $1 - AND instance = $2; + AND instance = $2 + FOR UPDATE; "; let result = tx diff --git a/server/src/rpc.rs b/server/src/rpc.rs index 357a1cf3..d802c936 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -22,7 +22,7 @@ use skill::{Skill}; // use zone::{Zone, zone_create, zone_join, zone_close}; use spec::{Spec}; use player::{Score, player_mm_cryps_set, Player}; -use instance::{Instance, instance_state, instance_new, instance_ready, instance_join, instance_scores}; +use instance::{Instance, instance_state, instance_new, instance_ready, instance_join}; use vbox::{Var, vbox_accept, vbox_apply, vbox_discard, vbox_combine, vbox_reclaim, vbox_unequip}; pub struct Rpc; @@ -75,7 +75,6 @@ impl Rpc { "instance_join" => Rpc::instance_join(data, &mut tx, account.unwrap(), client), "instance_ready" => Rpc::instance_ready(data, &mut tx, account.unwrap(), client), "instance_new" => Rpc::instance_new(data, &mut tx, account.unwrap(), client), - "instance_scores" => Rpc::instance_scores(data, &mut tx, account.unwrap(), client), "instance_state" => Rpc::instance_state(data, &mut tx, account.unwrap(), client), "player_mm_cryps_set" => Rpc::player_mm_cryps_set(data, &mut tx, account.unwrap(), client), @@ -261,17 +260,6 @@ impl Rpc { // return Ok(response); // } - fn instance_scores(data: Vec, tx: &mut Transaction, account: Account, _client: &mut WebSocket) -> Result { - let msg = from_slice::(&data).or(Err(err_msg("invalid params")))?; - - let response = RpcResponse { - method: "instance_scores".to_string(), - params: RpcResult::InstanceScores(instance_scores(msg.params, tx, &account)?) - }; - - return Ok(response); - } - fn instance_state(data: Vec, tx: &mut Transaction, account: Account, _client: &mut WebSocket) -> Result { let msg = from_slice::(&data).or(Err(err_msg("invalid params")))?; match instance_state(msg.params, tx, &account)? { diff --git a/server/src/warden.rs b/server/src/warden.rs new file mode 100644 index 00000000..a77f8263 --- /dev/null +++ b/server/src/warden.rs @@ -0,0 +1,57 @@ +use std::time::{Duration}; +use std::thread::sleep; + +// Db Commons +use postgres::transaction::Transaction; +use failure::Error; +use failure::err_msg; + +use r2d2::{Pool}; +use r2d2_postgres::{PostgresConnectionManager}; + +use game::{Game, games_afk, game_write}; +use instance::{Instance, instances_afk, instance_write}; + +fn handle_afk_game(game: Game) -> Game { + game +} + +fn fetch_games(mut tx: Transaction) -> Result { + let games = games_afk(&mut tx)?; + + for mut game in games { + game = handle_afk_game(game); + game_write(&game, &mut tx)?; + } + + Ok(tx) +} + +fn handle_afk_instance(instance: Instance) -> Instance { + instance +} + +fn fetch_instances(mut tx: Transaction) -> Result { + let instances = instances_afk(&mut tx)?; + + for mut instance in instances { + instance = handle_afk_instance(instance); + instance_write(&instance, &mut tx)?; + } + + Ok(tx) +} + +pub fn warden(pool: Pool) -> Result<(), Error> { + loop { + let db_connection = pool.get().expect("unable to get db connection"); + + fetch_games(db_connection.transaction()?)? + .commit()?; + + fetch_instances(db_connection.transaction()?) + .commit()?; + + sleep(Duration::new(30, 0)); + } +} \ No newline at end of file