mnml/server/src/pg.rs

877 lines
23 KiB
Rust

use std::fs::File;
use std::env;
use std::thread::spawn;
use uuid::Uuid;
use failure::err_msg;
use failure::Error;
use r2d2::{Pool, PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use fallible_iterator::{FallibleIterator};
use postgres::transaction::Transaction;
use crossbeam_channel::{Sender};
use serde_cbor::{from_slice, to_vec};
use mnml_core::construct::{Construct, ConstructSkeleton};
use mnml_core::game::{Game, Phase};
use mnml_core::player::Player;
use mnml_core::mob::instance_mobs;
use mnml_core::vbox::{ItemType, VboxIndices};
use mnml_core::item::Item;
use mnml_core::skill::Skill;
use mnml_core::mob::{bot_player, anon_player};
use mnml_core::instance::{Instance, TimeControl};
use events::{Event};
use rpc::RpcMessage;
use account;
use account::{Account};
use img;
/// A single Postgres connection checked out of the r2d2 pool.
pub type Db = PooledConnection<PostgresConnectionManager>;
/// An r2d2 pool of Postgres connections.
pub type PgPool = Pool<PostgresConnectionManager>;
/// Maximum pool size passed to the pool builder in `create_pool`.
const DB_POOL_SIZE: u32 = 20;
/// Table named in the `table` field of a notification payload.
///
/// Variant names are matched in lowercase when deserializing.
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "lowercase"))]
enum Table {
Accounts,
Constructs,
Instances,
Mtx,
Players,
Games,
}
/// Row operation named in the `action` field of a notification payload.
///
/// Variant names are matched in UPPERCASE when deserializing.
#[derive(Debug,Copy,Clone,PartialEq,Serialize,Deserialize)]
#[serde(rename_all(deserialize = "UPPERCASE"))]
enum Action {
Insert,
Update,
Delete,
}
/// A database change notification, parsed from the JSON payload received in `listen`.
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Notification {
// table the change happened on
table: Table,
// kind of change
action: Action,
// id carried by the payload; used as the lookup key and the push target in `handle_notification`
id: Uuid,
}
/// Builds the connection pool for the database named by `DATABASE_URL`.
///
/// TLS is disabled and the pool holds at most `DB_POOL_SIZE` connections.
///
/// # Panics
///
/// Panics when `DATABASE_URL` is not set, when the connection manager
/// cannot be instantiated, or when the pool cannot be built.
pub fn create_pool() -> Pool<PostgresConnectionManager> {
    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");

    let pg_manager = PostgresConnectionManager::new(database_url, TlsMode::None)
        .expect("could not instantiate pg manager");

    let builder = Pool::builder().max_size(DB_POOL_SIZE);
    builder.build(pg_manager).expect("Failed to create pool.")
}
fn handle_notification(n: Notification, pool: &PgPool, events: &Sender<Event>) {
info!("pg received notification={:?}", n);
// bang out a thread to do the slow work of fetching the state from db
// the thread will notify events
let pool = pool.clone();
let events = events.clone();
spawn(move || {
// maybe we need it
let db = pool.get().unwrap();
let mut tx = db.transaction().unwrap();
let msg = match n.action {
Action::Delete => {
// warn!("unimplemented delete notification {:?}", n);
None
},
Action::Insert => {
// warn!("unimplemented insert notification {:?}", n);
None
},
Action::Update => match n.table {
Table::Accounts =>
Some(Event::Push(n.id, RpcMessage::AccountState(account::select(&db, n.id).unwrap()))),
Table::Instances =>
Some(Event::Push(n.id, instance_state(&mut tx, n.id).unwrap())),
Table::Games =>
Some(Event::Push(n.id, RpcMessage::GameState(game_get(&mut tx, n.id).unwrap()))),
_ => {
// warn!("unimplemented update notification {:?}", n);
None
},
},
};
tx.commit().unwrap();
if let Some(msg) = msg {
events.send(msg).unwrap();
}
});
}
/// Subscribes to the `events` channel and dispatches notifications forever.
///
/// Holds one pooled connection for its whole lifetime, because that
/// connection issues `LISTEN` and then blocks waiting for notifications.
/// Payloads that do not deserialize are logged and skipped.
///
/// # Errors
///
/// Returns an error when the connection cannot be obtained, when `LISTEN`
/// fails, or when reading the next notification fails.
pub fn listen(pool: PgPool, events: Sender<Event>) -> Result<(), Error> {
    let conn = pool.get()?;
    conn.execute("LISTEN events;", &[])?;
    info!("pg listening");

    let pending = conn.notifications();
    let mut incoming = pending.blocking_iter();

    loop {
        let raw = match incoming.next()? {
            Some(raw) => raw,
            None => continue,
        };

        match serde_json::from_str::<Notification>(&raw.payload) {
            Err(e) => warn!("could not deserialize notification payload={:?} err={:?}", raw.payload, e),
            Ok(parsed) => handle_notification(parsed, &pool, &events),
        };
    }
}
/// Deletes the construct `id` owned by `account_id`.
///
/// # Errors
///
/// Fails when the statement errors or when it does not delete exactly one row.
pub fn _construct_delete(tx: &mut Transaction, id: Uuid, account_id: Uuid) -> Result<(), Error> {
    let sql = "
DELETE
FROM constructs
WHERE id = $1
and account = $2;
";

    let deleted = tx.execute(sql, &[&id, &account_id])?;

    if deleted == 1 {
        info!("construct deleted {:?}", id);
        Ok(())
    } else {
        Err(format_err!("unable to delete construct {:?}", id))
    }
}
/// Loads the construct `id` owned by `account_id` without locking the row.
///
/// The stored bytes are decoded as a `ConstructSkeleton` and expanded into a
/// `Construct`.
///
/// # Errors
///
/// Fails when the query errors, no row matches, or the bytes do not decode.
pub fn _construct_get(tx: &mut Transaction, id: Uuid, account_id: Uuid) -> Result<Construct, Error> {
    let sql = "
SELECT data
FROM constructs
WHERE id = $1
AND account = $2;
";

    let rows = tx.query(sql, &[&id, &account_id])?;
    let row = match rows.iter().next() {
        Some(row) => row,
        None => return Err(format_err!("construct {:} not found", id)),
    };

    let bytes: Vec<u8> = row.get(0);
    let skeleton: ConstructSkeleton = from_slice(&bytes)?;

    Ok(Construct::from_skeleton(&skeleton))
}
/// Loads the construct `id` owned by `account_id`, selecting the row `FOR UPDATE`.
///
/// The stored bytes are decoded as a `ConstructSkeleton` and expanded into a
/// `Construct`.
///
/// # Errors
///
/// Fails when the query errors, no row matches, or the bytes do not decode.
pub fn construct_select(tx: &mut Transaction, id: Uuid, account_id: Uuid) -> Result<Construct, Error> {
    let sql = "
SELECT data
FROM constructs
WHERE id = $1
AND account = $2
FOR UPDATE;
";

    let rows = tx.query(sql, &[&id, &account_id])?;
    let row = match rows.iter().next() {
        Some(row) => row,
        None => return Err(format_err!("construct {:} not found", id)),
    };

    let bytes: Vec<u8> = row.get(0);
    let skeleton: ConstructSkeleton = from_slice(&bytes)?;

    Ok(Construct::from_skeleton(&skeleton))
}
/// Creates a new construct named `name` for `account` and inserts it.
///
/// After the insert the construct's image is written via `img::shapes_write`.
///
/// # Errors
///
/// Fails when serialization, the insert, or the image write fails, or when
/// the insert returns no row.
pub fn construct_spawn(tx: &mut Transaction, account: Uuid, name: String, team: bool) -> Result<Construct, Error> {
    let construct = Construct::new()
        .named(&name)
        .set_account(account);

    // store the skeleton form: it is what construct_write stores and what
    // every reader in this file decodes
    let construct_bytes = to_vec(&construct.to_skeleton())?;

    let query = "
INSERT INTO constructs (id, account, data, team)
VALUES ($1, $2, $3, $4)
RETURNING id, account;
";

    let result = tx
        .query(query, &[&construct.id, &account, &construct_bytes, &team])?;

    let _returned = result.iter().next().ok_or_else(|| err_msg("no row returned"))?;

    img::shapes_write(construct.img)?;

    info!("spawned construct account={:} name={:?}", account, construct.name);

    Ok(construct)
}
pub fn construct_write(tx: &mut Transaction, construct: Construct) -> Result<Construct, Error> {
let construct_bytes = to_vec(&construct.to_skeleton())?;
let query = "
UPDATE constructs
SET data = $1, updated_at = now()
WHERE id = $2
RETURNING id, account, data;
";
let result = tx
.query(query, &[&construct_bytes, &construct.id])?;
let _returned = result.iter().next().expect("no row returned");
// info!("{:?} wrote construct", construct.id);
return Ok(construct);
}
/// Inserts a new row for `game`, using its `phase_end` as the upkeep time.
///
/// # Errors
///
/// Fails when serialization or the insert fails, or when no row is returned.
pub fn game_write(tx: &mut Transaction, game: &Game) -> Result<(), Error> {
    let encoded = to_vec(game)?;

    let sql = "
INSERT INTO games (id, data, upkeep)
VALUES ($1, $2, $3)
RETURNING id;
";

    // no games should be sent to db that are not in progress
    let rows = tx.query(sql, &[&game.id, &encoded, &game.phase_end])?;

    if rows.iter().next().is_none() {
        return Err(format_err!("no game written"));
    }

    Ok(())
}
/// Loads game `id` and redacts it for `account` before returning it.
///
/// # Errors
///
/// Propagates any failure from `game_get`.
pub fn game_state(tx: &mut Transaction, account: &Account, id: Uuid) -> Result<Game, Error> {
    let game = game_get(tx, id)?;
    Ok(game.redact(account.id))
}
/// Loads game `id`, selecting the row `FOR UPDATE`.
///
/// # Errors
///
/// Fails when the query errors, no row matches, or the stored bytes do not
/// decode into a `Game`.
pub fn game_get(tx: &mut Transaction, id: Uuid) -> Result<Game, Error> {
    let sql = "
SELECT *
FROM games
WHERE id = $1
FOR UPDATE;
";

    let rows = tx.query(sql, &[&id])?;
    let row = rows.iter().next().ok_or_else(|| err_msg("game not found"))?;

    // the annotation tells from_slice which type to decode
    let encoded: Vec<u8> = row.get("data");
    let decoded: Game = from_slice(&encoded)?;

    Ok(decoded)
}
/// Loads game `id` straight from a connection, without locking the row.
///
/// # Errors
///
/// Fails when the query errors, no row matches, or the stored bytes do not
/// decode into a `Game`.
pub fn _game_select(db: &Db, id: Uuid) -> Result<Game, Error> {
    let sql = "
SELECT *
FROM games
WHERE id = $1;
";

    let rows = db.query(sql, &[&id])?;
    let row = rows.iter().next().ok_or_else(|| err_msg("game not found"))?;

    // the annotation tells from_slice which type to decode
    let encoded: Vec<u8> = row.get("data");
    let decoded: Game = from_slice(&encoded)?;

    Ok(decoded)
}
/// Returns up to `number` games ordered by creation time.
///
/// Rows whose bytes do not decode into a `Game` are logged and skipped.
///
/// # Errors
///
/// Fails when the query errors.
pub fn _game_list(db: &Db, number: u32) -> Result<Vec<Game>, Error> {
    let query = "
SELECT data
FROM games
ORDER BY created_at
LIMIT $1;
";

    // LIMIT takes a bigint; a u32 is sent as an OID by the postgres driver
    // and is rejected, so widen it first
    let limit = i64::from(number);

    let result = db
        .query(query, &[&limit])?;

    let mut list = vec![];

    for row in result.iter() {
        let bytes: Vec<u8> = row.get(0);

        match from_slice::<Game>(&bytes) {
            Ok(i) => list.push(i),
            Err(e) => {
                warn!("{:?}", e);
            }
        };
    }

    Ok(list)
}
/// Returns unfinished games whose upkeep time has passed, selecting the rows `FOR UPDATE`.
///
/// A row whose bytes do not decode into a `Game` is deleted instead of returned.
///
/// # Errors
///
/// Fails when the query errors or when deleting an undecodable row fails.
pub fn games_need_upkeep(tx: &mut Transaction) -> Result<Vec<Game>, Error> {
    let sql = "
SELECT data, id
FROM games
WHERE finished = false
AND upkeep < now()
FOR UPDATE;
";

    let rows = tx.query(sql, &[])?;
    let mut due = Vec::new();

    for row in rows.iter() {
        let encoded: Vec<u8> = row.get(0);
        let id = row.get(1);

        if let Ok(game) = from_slice::<Game>(&encoded) {
            due.push(game);
        } else {
            game_delete(tx, id)?;
        }
    }

    Ok(due)
}
/// Deletes game `id`.
///
/// # Errors
///
/// Fails when the statement errors or when it does not delete exactly one row.
pub fn game_delete(tx: &mut Transaction, id: Uuid) -> Result<(), Error> {
    let query = "
DELETE
FROM games
WHERE id = $1;
";

    let result = tx
        .execute(query, &[&id])?;

    if result != 1 {
        // this message used to say "player"
        return Err(format_err!("unable to delete game {:?}", id));
    }

    info!("game deleted {:?}", id);

    Ok(())
}
/// Writes `game` over its existing row, refreshing `finished`, `upkeep` and `updated_at`.
///
/// When the game is finished it is also dumped to a JSON file (a failure
/// there is only logged) and, if it belongs to an instance, that instance is
/// told about the result.
///
/// # Errors
///
/// Fails when serialization or the update fails, when no row with the game's
/// id exists, or when updating the owning instance fails.
pub fn game_update(tx: &mut Transaction, game: &Game) -> Result<(), Error> {
    let game_bytes = to_vec(game)?;

    let query = "
UPDATE games
SET data = $1, finished = $2, upkeep = $3, updated_at = now()
WHERE id = $4
RETURNING id, data;
";

    let result = tx
        .query(query, &[&game_bytes, &game.finished(), &game.phase_end, &game.id])?;

    // build the error lazily: Debug-formatting the whole game on every
    // successful write is wasted work
    result.iter().next().ok_or_else(|| format_err!("game {:?} could not be written", game))?;

    if game.finished() {
        info!("finished id={:?}", game.id);

        match game_json_file_write(game) {
            Ok(dest) => info!("wrote dest={:?}", dest),
            Err(e) => error!("json write error={:?}", e),
        };

        if let Some(i) = game.instance {
            instance_game_finished(tx, game, i)?;
        }
    }

    Ok(())
}
/// Dumps `g` as JSON under `/var/lib/mnml/data/games/` and returns the file path.
///
/// # Errors
///
/// Fails when the file cannot be created, serialization fails, or the
/// buffered output cannot be flushed.
fn game_json_file_write(g: &Game) -> Result<String, Error> {
    let dest = format!("/var/lib/mnml/data/games/{}.mnml.game.json", g.id);

    // serde_json emits many small writes; buffer them instead of issuing one
    // write per fragment against the raw file
    let mut writer = ::std::io::BufWriter::new(File::create(&dest)?);
    serde_json::to_writer(&mut writer, g)?;

    // flush explicitly: errors during the flush on drop are swallowed
    ::std::io::Write::flush(&mut writer)?;

    Ok(dest)
}
/// Queues `skill` from `construct_id` onto `target_construct_id` for `account` in game `game_id`.
///
/// When that completes the skill phase, the resolve phase is started before
/// the game is written back.
///
/// # Errors
///
/// Propagates failures from loading the game, adding the skill, or saving it.
pub fn game_skill(tx: &mut Transaction, account: &Account, game_id: Uuid, construct_id: Uuid, target_construct_id: Uuid, skill: Skill) -> Result<Game, Error> {
    let mut loaded = game_get(tx, game_id)?;
    loaded.add_skill(account.id, construct_id, target_construct_id, skill)?;

    let game = if loaded.skill_phase_finished() {
        loaded.resolve_phase_start()
    } else {
        loaded
    };

    game_update(tx, &game)?;
    Ok(game)
}
/// Records a draw offer from `account` in game `game_id` and saves the game.
///
/// # Errors
///
/// Propagates failures from loading the game, offering the draw, or saving it.
pub fn game_offer_draw(tx: &mut Transaction, account: &Account, game_id: Uuid) -> Result<Game, Error> {
    let current = game_get(tx, game_id)?;
    let updated = current.offer_draw(account.id)?;

    game_update(tx, &updated)?;
    Ok(updated)
}
/// Concedes game `game_id` on behalf of `account` and saves the game.
///
/// # Errors
///
/// Propagates failures from loading the game, conceding, or saving it.
pub fn game_concede(tx: &mut Transaction, account: &Account, game_id: Uuid) -> Result<Game, Error> {
    let current = game_get(tx, game_id)?;
    let updated = current.concede(account.id)?;

    game_update(tx, &updated)?;
    Ok(updated)
}
/// Clears the skills `account` has queued in game `game_id` and saves the game.
///
/// # Errors
///
/// Propagates failures from loading the game, clearing the skills, or saving it.
pub fn game_skill_clear(tx: &mut Transaction, account: &Account, game_id: Uuid) -> Result<Game, Error> {
    let mut loaded = game_get(tx, game_id)?;

    loaded.clear_skill(account.id)?;
    game_update(tx, &loaded)?;

    Ok(loaded)
}
/// Marks `account` as ready in game `id`.
///
/// When that completes the skill phase, the resolve phase is started before
/// the game is written back.
///
/// # Errors
///
/// Propagates failures from loading the game, readying the player, or saving it.
pub fn game_ready(tx: &mut Transaction, account: &Account, id: Uuid) -> Result<Game, Error> {
    let mut loaded = game_get(tx, id)?;
    loaded.player_ready(account.id)?;

    let game = if loaded.skill_phase_finished() {
        loaded.resolve_phase_start()
    } else {
        loaded
    };

    game_update(tx, &game)?;
    Ok(game)
}
/// Inserts a new row for `instance`, using its `phase_end` as the upkeep time,
/// and hands the instance back.
///
/// # Errors
///
/// Fails when serialization or the insert fails, or when no row is returned.
pub fn instance_create(tx: &mut Transaction, instance: Instance) -> Result<Instance, Error> {
    let encoded = to_vec(&instance)?;

    let sql = "
INSERT INTO instances (id, data, upkeep)
VALUES ($1, $2, $3)
RETURNING id;
";

    let rows = tx.query(sql, &[&instance.id, &encoded, &instance.phase_end])?;

    if rows.iter().next().is_none() {
        return Err(format_err!("no instances written"));
    }

    Ok(instance)
}
/// Writes `instance` over its existing row, refreshing `finished`, `upkeep`
/// and `updated_at`, and hands the instance back.
///
/// A finished instance is also dumped to a JSON file; a failure there is
/// only logged.
///
/// # Errors
///
/// Fails when serialization or the update fails, or when no row is returned.
pub fn instance_update(tx: &mut Transaction, instance: Instance) -> Result<Instance, Error> {
    let encoded = to_vec(&instance)?;

    let sql = "
UPDATE instances
SET data = $1, finished = $2, upkeep = $3, updated_at = now()
WHERE id = $4
RETURNING id, data;
";

    let rows = tx.query(sql, &[&encoded, &instance.finished(), &instance.phase_end, &instance.id])?;

    if rows.iter().next().is_none() {
        return Err(err_msg("no instance row returned"));
    }

    trace!("{:?} wrote instance", instance.id);

    if instance.finished() {
        info!("finished id={:?}", instance.id);

        match instance_json_file_write(&instance) {
            Err(e) => error!("json write error={:?}", e),
            Ok(dest) => info!("wrote dest={:?}", dest),
        };
    }

    Ok(instance)
}
/// Dumps `g` as JSON under `/var/lib/mnml/data/instances/` and returns the file path.
///
/// # Errors
///
/// Fails when the file cannot be created, serialization fails, or the
/// buffered output cannot be flushed.
fn instance_json_file_write(g: &Instance) -> Result<String, Error> {
    let dest = format!("/var/lib/mnml/data/instances/{}.mnml.instance.json", g.id);

    // serde_json emits many small writes; buffer them instead of issuing one
    // write per fragment against the raw file
    let mut writer = ::std::io::BufWriter::new(File::create(&dest)?);
    serde_json::to_writer(&mut writer, g)?;

    // flush explicitly: errors during the flush on drop are swallowed
    ::std::io::Write::flush(&mut writer)?;

    Ok(dest)
}
/// Loads instance `instance_id`, selecting the row `FOR UPDATE`.
///
/// # Errors
///
/// Fails when the query errors, no row matches, or the stored bytes do not
/// decode into an `Instance`.
pub fn instance_get(tx: &mut Transaction, instance_id: Uuid) -> Result<Instance, Error> {
    let sql = "
SELECT *
FROM instances
WHERE id = $1
FOR UPDATE;
";

    let rows = tx.query(sql, &[&instance_id])?;
    let row = rows.iter().next().ok_or_else(|| err_msg("instance not found"))?;

    let encoded: Vec<u8> = row.get("data");
    let decoded: Instance = from_slice(&encoded)?;

    Ok(decoded)
}
/// Deletes instance `id`.
///
/// # Errors
///
/// Fails when the statement errors or when it does not delete exactly one row.
pub fn instance_delete(tx: &mut Transaction, id: Uuid) -> Result<(), Error> {
    let sql = "
DELETE
FROM instances
WHERE id = $1;
";

    let deleted = tx.execute(sql, &[&id])?;

    if deleted == 1 {
        info!("instance deleted {:?}", id);
        Ok(())
    } else {
        Err(format_err!("unable to delete instance {:?}", id))
    }
}
/// Returns every unfinished instance.
///
/// A row whose bytes do not decode into an `Instance` is deleted instead of returned.
///
/// # Errors
///
/// Fails when the query errors or when deleting an undecodable row fails.
pub fn _instance_list(tx: &mut Transaction) -> Result<Vec<Instance>, Error> {
    // the filter needs WHERE; the previous `FROM instances AND ...` was a syntax error
    let query = "
SELECT data, id
FROM instances
WHERE finished = false;
";

    let result = tx
        .query(query, &[])?;

    let mut list = vec![];

    for row in result.iter() {
        let bytes: Vec<u8> = row.get(0);
        let id = row.get(1);

        match from_slice::<Instance>(&bytes) {
            Ok(i) => list.push(i),
            Err(_e) => {
                instance_delete(tx, id)?;
            }
        };
    }

    Ok(list)
}
/// Returns unfinished instances whose upkeep time has passed, selecting the rows `FOR UPDATE`.
///
/// A row whose bytes do not decode into an `Instance` is deleted instead of returned.
///
/// # Errors
///
/// Fails when the query errors or when deleting an undecodable row fails.
pub fn instances_need_upkeep(tx: &mut Transaction) -> Result<Vec<Instance>, Error> {
    let sql = "
SELECT data, id
FROM instances
WHERE finished = false
AND upkeep < now()
FOR UPDATE;
";

    let rows = tx.query(sql, &[])?;
    let mut due = Vec::new();

    for row in rows.iter() {
        let encoded: Vec<u8> = row.get(0);
        let id = row.get(1);

        if let Ok(instance) = from_slice::<Instance>(&encoded) {
            due.push(instance);
        } else {
            instance_delete(tx, id)?;
        }
    }

    Ok(due)
}
/// Returns unfinished instances not updated for more than an hour, selecting the rows `FOR UPDATE`.
///
/// A row whose bytes do not decode into an `Instance` is deleted instead of returned.
///
/// # Errors
///
/// Fails when the query errors or when deleting an undecodable row fails.
// NOTE(review): intended for timed out instances with no time control, but
// the query itself does not filter on time control — confirm with callers
pub fn instances_idle(tx: &mut Transaction) -> Result<Vec<Instance>, Error> {
    let sql = "
SELECT data, id
FROM instances
WHERE finished = false
AND updated_at < now() - interval '1 hour'
FOR UPDATE;
";

    let rows = tx.query(sql, &[])?;
    let mut idle = Vec::new();

    for row in rows.iter() {
        let encoded: Vec<u8> = row.get(0);
        let id = row.get(1);

        if let Ok(instance) = from_slice::<Instance>(&encoded) {
            idle.push(instance);
        } else {
            instance_delete(tx, id)?;
        }
    }

    Ok(idle)
}
/// Creates and stores a practice instance pitting `account` against a bot.
///
/// Both players are readied immediately, and a player row is written for
/// the account.
///
/// # Errors
///
/// Propagates failures from writing bot images, building the instance,
/// converting the account to a player, or the database writes.
pub fn instance_practice(tx: &mut Transaction, account: &Account) -> Result<Instance, Error> {
    let bot = bot_player();
    let bot_id = bot.id;

    // generate bot imgs for the client to see
    for construct in bot.constructs.iter() {
        img::shapes_write(construct.img)?;
    }

    let mut practice = Instance::new()
        .set_time_control(TimeControl::Practice)
        .set_name(bot.name.clone())?;

    let player = account.to_player(tx)?;
    let player_id = player.id;

    practice.add_player(player.clone())?;
    practice.add_player(bot)?;

    practice.player_ready(bot_id)?;
    // skip faceoff
    practice.player_ready(player_id)?;

    let stored = instance_create(tx, practice)?;
    player_create(tx, player, stored.id, account)?;

    Ok(stored)
}
/// Builds an in-memory demo instance for `account` against a weakened bot.
///
/// Nothing is written to the database. Images are written for both the
/// bot's and the anonymous player's constructs, and both players are
/// readied immediately.
///
/// # Errors
///
/// Propagates failures from writing images or building the instance.
pub fn instance_demo(account: &Account) -> Result<Instance, Error> {
    let mut bot = bot_player();
    let bot_id = bot.id;

    // weaken the bot's constructs and generate imgs for the client to see
    for construct in bot.constructs.iter_mut() {
        construct.green_life.force(64);
        construct.red_life.force(0);
        construct.blue_life.force(0);
        img::shapes_write(construct.img)?;
    }

    let mut demo = Instance::new()
        .set_time_control(TimeControl::Practice)
        .set_name(bot.name.clone())?;

    let player = anon_player(account.id);
    let player_id = player.id;

    for construct in player.constructs.iter() {
        img::shapes_write(construct.img)?;
    }

    demo.add_player(player)?;
    demo.add_player(bot)?;

    demo.player_ready(bot_id)?;
    // skip faceoff
    demo.player_ready(player_id)?;

    Ok(demo)
}
/// Creates and stores an instance named "PVP" with players for accounts `a` and `b`.
///
/// The instance row is inserted first, then a player row is written for
/// each account, and finally the instance is saved with both players added.
///
/// # Errors
///
/// Propagates failures from building the instance, converting accounts to
/// players, or the database writes.
pub fn pvp(tx: &mut Transaction, a: &Account, b: &Account) -> Result<Instance, Error> {
    // TODO generate nice game names
    let fresh = Instance::new().set_name("PVP".to_string())?;
    let mut instance = instance_create(tx, fresh)?;

    for &account in [a, b].iter() {
        let joining = account.to_player(tx)?;
        let player = player_create(tx, joining, instance.id, account)?;
        instance.add_player(player)?;
    }

    instance_update(tx, instance)
}
pub fn player_create(tx: &mut Transaction, player: Player, instance: Uuid, account: &Account) -> Result<Player, Error> {
let query = "
INSERT INTO players (id, instance, account)
VALUES ($1, $2, $3)
RETURNING id, account;
";
let result = tx
.query(query, &[&Uuid::new_v4(), &instance, &account.id])?;
let _returned = result.iter().next().expect("no row written");
info!("wrote player {:} joined instance: {:}", account.name, instance);
return Ok(player);
}
/// Makes `account` abandon instance `instance_id`.
///
/// If the instance has a current game, the account's player forfeits it and
/// the game is saved. The account is then marked as the loser and its
/// opponent as the winner, the instance moves to the next round, and the
/// saved instance is returned as an `InstanceState` message.
///
/// # Errors
///
/// Propagates failures from loading or saving the instance or game, or from
/// looking up the players.
pub fn instance_abandon(tx: &mut Transaction, account: &Account, instance_id: Uuid) -> Result<RpcMessage, Error> {
    let mut instance = instance_get(tx, instance_id)?;

    match instance.current_game_id() {
        Some(game_id) => {
            let mut game = game_get(tx, game_id)?;
            game.player_by_id(account.id)?.forfeit();
            // actually finishes it...
            let game = game.start();
            game_update(tx, &game)?;
        },
        None => (),
    }

    instance.account_player(account.id)?.set_lose();
    instance.account_opponent(account.id)?.set_win();
    instance.next_round();

    let saved = instance_update(tx, instance)?;
    Ok(RpcMessage::InstanceState(saved))
}
/// Marks `account`'s player as ready in instance `instance_id`.
///
/// If readying produces a game, the game is inserted and then updated, the
/// instance is saved, and a `GameState` message is returned. Otherwise the
/// saved instance is returned as an `InstanceState` message.
///
/// # Errors
///
/// Propagates failures from loading or saving the instance or game, or from
/// readying the player.
pub fn instance_ready(tx: &mut Transaction, account: &Account, instance_id: Uuid) -> Result<RpcMessage, Error> {
    let mut instance = instance_get(tx, instance_id)?;
    let player_id = instance.account_player(account.id)?.id;

    let started = instance.player_ready(player_id)?;

    match started {
        Some(game) => {
            game_write(tx, &game)?;
            // ensures cleanup for warden etc is done
            game_update(tx, &game)?;
            instance_update(tx, instance)?;
            Ok(RpcMessage::GameState(game))
        },
        None => {
            let saved = instance_update(tx, instance)?;
            Ok(RpcMessage::InstanceState(saved))
        },
    }
}
/// Returns the state to show for instance `instance_id`.
///
/// While the instance has a current game that is not finished, that game is
/// returned as a `GameState` message; otherwise the instance itself is
/// returned as an `InstanceState` message.
///
/// # Errors
///
/// Propagates failures from loading the instance or its current game.
pub fn instance_state(tx: &mut Transaction, instance_id: Uuid) -> Result<RpcMessage, Error> {
    let instance = instance_get(tx, instance_id)?;

    let game_id = match instance.current_game_id() {
        Some(game_id) => game_id,
        None => return Ok(RpcMessage::InstanceState(instance)),
    };

    let game = game_get(tx, game_id)?;

    // return the game until it's finished
    if game.phase == Phase::Finished {
        Ok(RpcMessage::InstanceState(instance))
    } else {
        Ok(RpcMessage::GameState(game))
    }
}
/// Tells instance `instance_id` that `game` has finished and saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, applying the result, or saving it.
pub fn instance_game_finished(tx: &mut Transaction, game: &Game, instance_id: Uuid) -> Result<(), Error> {
    let mut owner = instance_get(tx, instance_id)?;
    owner.game_finished(game)?;

    instance_update(tx, owner).map(|_| ())
}
/// Builds an in-memory instance containing two bot players, both readied.
///
/// # Panics
///
/// Panics when a player cannot be added or readied.
pub fn _bot_instance() -> Instance {
    let mut instance = Instance::new();

    let bot = bot_player();
    let bot_id = bot.id;
    instance.add_player(bot).unwrap();

    let player_account = Uuid::new_v4();
    let constructs = instance_mobs(player_account);
    let second = Player::new(player_account, None, &"test".to_string(), constructs)
        .set_bot(true);
    instance.add_player(second).expect("could not add player");

    instance.player_ready(player_account).unwrap();
    instance.player_ready(bot_id).unwrap();

    instance
}
/// Refills `account`'s vbox in instance `instance_id` and saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, the refill, or saving it.
pub fn vbox_refill(tx: &mut Transaction, account: &Account, instance_id: Uuid) -> Result<Instance, Error> {
    let current = instance_get(tx, instance_id)?;
    let changed = current.vbox_refill(account.id)?;

    instance_update(tx, changed)
}
/// Buys the vbox item at `group`/`index` for `account` in instance `instance_id`,
/// passing along the optional `construct_id`, and saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, the purchase, or saving it.
pub fn vbox_buy(tx: &mut Transaction, account: &Account, instance_id: Uuid, group: ItemType, index: String, construct_id: Option<Uuid>) -> Result<Instance, Error> {
    let current = instance_get(tx, instance_id)?;
    let changed = current.vbox_buy(account.id, group, index, construct_id)?;

    instance_update(tx, changed)
}
/// Combines the given stash and vbox items for `account` in instance
/// `instance_id` and saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, the combine, or saving it.
pub fn vbox_combine(tx: &mut Transaction, account: &Account, instance_id: Uuid, stash_indices: Vec<String>, vbox_indices: VboxIndices) -> Result<Instance, Error> {
    let current = instance_get(tx, instance_id)?;
    let changed = current.vbox_combine(account.id, stash_indices, vbox_indices)?;

    instance_update(tx, changed)
}
/// Refunds the item at `index` for `account` in instance `instance_id` and
/// saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, the refund, or saving it.
pub fn vbox_refund(tx: &mut Transaction, account: &Account, instance_id: Uuid, index: String) -> Result<Instance, Error> {
    let current = instance_get(tx, instance_id)?;
    let changed = current.vbox_refund(account.id, index)?;

    instance_update(tx, changed)
}
/// Applies the item at `index` to construct `construct_id` for `account` in
/// instance `instance_id` and saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, applying the item, or saving it.
pub fn vbox_apply(tx: &mut Transaction, account: &Account, instance_id: Uuid, construct_id: Uuid, index: String) -> Result<Instance, Error> {
    let current = instance_get(tx, instance_id)?;
    let changed = current.vbox_apply(account.id, index, construct_id)?;

    instance_update(tx, changed)
}
/// Unequips `target` from construct `construct_id` for `account` in instance
/// `instance_id`, passing along the optional `target_construct_id`, and
/// saves the instance.
///
/// # Errors
///
/// Propagates failures from loading the instance, the unequip, or saving it.
pub fn vbox_unequip(tx: &mut Transaction, account: &Account, instance_id: Uuid, construct_id: Uuid, target: Item, target_construct_id: Option<Uuid>) -> Result<Instance, Error> {
    let current = instance_get(tx, instance_id)?;
    let changed = current.vbox_unequip(account.id, target, construct_id, target_construct_id)?;

    instance_update(tx, changed)
}