This commit is contained in:
ntr 2019-06-16 19:47:16 +10:00
parent 7dcbe06766
commit 5c081f8d9f
4 changed files with 84 additions and 31 deletions

View File

@ -19,6 +19,7 @@ dotenv = "0.9.0"
postgres = { version = "0.15", features = ["with-uuid", "with-chrono"] } postgres = { version = "0.15", features = ["with-uuid", "with-chrono"] }
r2d2 = "*" r2d2 = "*"
r2d2_postgres = "*" r2d2_postgres = "*"
fallible-iterator = "0.1"
failure = "0.1" failure = "0.1"

View File

@ -8,6 +8,7 @@ extern crate dotenv;
extern crate postgres; extern crate postgres;
extern crate r2d2; extern crate r2d2;
extern crate r2d2_postgres; extern crate r2d2_postgres;
extern crate fallible_iterator;
extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;

View File

@ -1,28 +1,27 @@
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use std::env; use std::env;
use std::thread::spawn;
use serde_cbor::{to_vec}; use serde_cbor::{to_vec};
use actix_web::{middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer, Responder}; use actix_web::{middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer};
use actix_web::middleware::cors::Cors; use actix_web::middleware::cors::Cors;
use actix_web::error::ResponseError; use actix_web::error::ResponseError;
use actix_web::http::{StatusCode, Cookie}; use actix_web::http::{Cookie};
use actix_web::cookie::{SameSite}; use actix_web::cookie::{SameSite};
use actix_web_actors::ws; use actix_web_actors::ws;
use actix::prelude::*; use actix::prelude::*;
use actix::fut::ok;
use actix::fut::FutureResult;
use r2d2::{Pool}; use r2d2::{Pool};
use r2d2::{PooledConnection}; use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use rpc::{receive, RpcResult, RpcErrorResponse, AccountLoginParams, AccountCreateParams}; use rpc::{receive, RpcResult, RpcErrorResponse, AccountLoginParams, AccountCreateParams};
use warden::{warden}; use warden::{Warden};
use account::{Account, account_login, account_create, account_from_token, account_set_token}; use account::{Account, account_login, account_create, account_from_token, account_set_token};
pub type Db = PooledConnection<PostgresConnectionManager>; pub type Db = PooledConnection<PostgresConnectionManager>;
type PgPool = Pool<PostgresConnectionManager>; pub type PgPool = Pool<PostgresConnectionManager>;
const DB_POOL_SIZE: u32 = 20; const DB_POOL_SIZE: u32 = 20;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
@ -266,9 +265,11 @@ struct State {
pub fn start() { pub fn start() {
let database_url = env::var("DATABASE_URL") let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set"); .expect("DATABASE_URL must be set");
let pool = create_pool(database_url); let pool = create_pool(database_url);
let sys = System::new("warden");
let warden_addr = Warden::new(pool.clone()).start();
match env::var("DEV_CORS") { match env::var("DEV_CORS") {
Ok(_) => { Ok(_) => {
warn!("enabling dev CORS middleware"); warn!("enabling dev CORS middleware");

View File

@ -1,11 +1,15 @@
use std::time::{Instant, Duration};
// Db Commons // Db Commons
use fallible_iterator::{FallibleIterator, IntoFallibleIterator};
use postgres::transaction::Transaction; use postgres::transaction::Transaction;
use failure::Error; use failure::Error;
use actix::prelude::*;
use game::{games_need_upkeep, game_update, game_write, game_delete}; use game::{games_need_upkeep, game_update, game_write, game_delete};
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use net::{Db}; use net::{Db, PgPool};
fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> { fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> {
let games = games_need_upkeep(&mut tx)?; let games = games_need_upkeep(&mut tx)?;
@ -40,12 +44,58 @@ fn fetch_instances(mut tx: Transaction) -> Result<Transaction, Error> {
Ok(tx) Ok(tx)
} }
pub fn warden(db: Db) -> Result<(), Error> { pub struct Warden {
fetch_games(db.transaction()?)? pool: PgPool,
.commit()?; }
fetch_instances(db.transaction()?)? impl Actor for Warden {
.commit()?; type Context = Context<Self>;
Ok(()) fn started(&mut self, ctx: &mut Self::Context) {
self.upkeep(ctx);
self.subscribe(ctx);
}
}
const UPKEEP_INTERVAL: Duration = Duration::from_secs(1);
impl Warden {
pub fn new(pool: PgPool) -> Warden {
Warden { pool }
}
// once the actor has been started this fn runs
// it starts the heartbeat interval and keepalive
fn upkeep(&mut self, ctx: &mut <Warden as Actor>::Context) {
ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| {
let db = act.pool.get().expect("warden could not get db connection");
fetch_games(db.transaction().expect("warden tx failure"))
.expect("upkeep games failure")
.commit().expect("warden games commit failure");
fetch_instances(db.transaction().expect("warden tx failure"))
.expect("instances games failure")
.commit().expect("warden instances commit failure");
});
}
fn subscribe(&mut self, ctx: &mut <Warden as Actor>::Context) {
let db = self.pool.get().expect("warden could not get db connection");
db.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events");
let notifications = db.notifications();
let mut n_iter = notifications.iter();
loop {
match n_iter.next() {
Ok(r) => {
if let Some(n) = r {
info!("{:?}", n);
}
}
Err(e) => warn!("{:?}", e),
}
}
}
} }