diff --git a/server/Cargo.toml b/server/Cargo.toml index 4cfd6849..cabe80d7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,6 +19,7 @@ dotenv = "0.9.0" postgres = { version = "0.15", features = ["with-uuid", "with-chrono"] } r2d2 = "*" r2d2_postgres = "*" +fallible-iterator = "0.1" failure = "0.1" diff --git a/server/src/main.rs b/server/src/main.rs index eaffe954..5437670e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,6 +8,7 @@ extern crate dotenv; extern crate postgres; extern crate r2d2; extern crate r2d2_postgres; +extern crate fallible_iterator; extern crate actix; extern crate actix_web; diff --git a/server/src/net.rs b/server/src/net.rs index 2f53251f..1afee3ac 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -1,28 +1,27 @@ use std::time::{Instant, Duration}; use std::env; +use std::thread::spawn; use serde_cbor::{to_vec}; -use actix_web::{middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer, Responder}; +use actix_web::{middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer}; use actix_web::middleware::cors::Cors; use actix_web::error::ResponseError; -use actix_web::http::{StatusCode, Cookie}; +use actix_web::http::{Cookie}; use actix_web::cookie::{SameSite}; use actix_web_actors::ws; use actix::prelude::*; -use actix::fut::ok; -use actix::fut::FutureResult; use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use rpc::{receive, RpcResult, RpcErrorResponse, AccountLoginParams, AccountCreateParams}; -use warden::{warden}; +use warden::{Warden}; use account::{Account, account_login, account_create, account_from_token, account_set_token}; pub type Db = PooledConnection; -type PgPool = Pool; +pub type PgPool = Pool; const DB_POOL_SIZE: u32 = 20; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); @@ -177,25 +176,25 @@ fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> Res } fn login_res(token: String, secure: bool) -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", token) - .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set - .finish()) - .finish() + HttpResponse::Ok() + .cookie(Cookie::build("x-auth-token", token) + .secure(secure) + .http_only(true) + .same_site(SameSite::Strict) + .max_age(60 * 60 * 24 * 7) // 1 week aligns with db set + .finish()) + .finish() } fn logout_res() -> HttpResponse { - HttpResponse::Ok() - .cookie(Cookie::build("x-auth-token", "") - // .secure(secure) - .http_only(true) - .same_site(SameSite::Strict) - .max_age(-1) // 1 week aligns with db set - .finish()) - .finish() + HttpResponse::Ok() + .cookie(Cookie::build("x-auth-token", "") + // .secure(secure) + .http_only(true) + .same_site(SameSite::Strict) + .max_age(-1) // 1 week aligns with db set + .finish()) + .finish() } fn login(state: web::Data, params: web::Json::) -> Result { @@ -266,9 +265,11 @@ struct State { pub fn start() { let database_url = env::var("DATABASE_URL") .expect("DATABASE_URL must be set"); - let pool = create_pool(database_url); + let sys = System::new("warden"); + let warden_addr = Warden::new(pool.clone()).start(); + match env::var("DEV_CORS") { Ok(_) => { warn!("enabling dev CORS middleware"); diff --git a/server/src/warden.rs b/server/src/warden.rs index 1b361b37..07b179fd 100644 --- a/server/src/warden.rs +++ b/server/src/warden.rs @@ -1,11 +1,15 @@ +use std::time::{Instant, Duration}; // Db Commons +use fallible_iterator::{FallibleIterator, IntoFallibleIterator}; use postgres::transaction::Transaction; use failure::Error; +use actix::prelude::*; + use game::{games_need_upkeep, game_update, game_write, game_delete}; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; -use net::{Db}; +use net::{Db, PgPool}; fn fetch_games(mut tx: Transaction) -> Result { let games = games_need_upkeep(&mut tx)?; @@ -40,12 +44,58 @@ fn fetch_instances(mut tx: Transaction) -> Result { Ok(tx) } -pub fn warden(db: Db) -> Result<(), Error> { - fetch_games(db.transaction()?)? - .commit()?; +pub struct Warden { + pool: PgPool, +} - fetch_instances(db.transaction()?)? - .commit()?; +impl Actor for Warden { + type Context = Context; - Ok(()) -} \ No newline at end of file + fn started(&mut self, ctx: &mut Self::Context) { + self.upkeep(ctx); + self.subscribe(ctx); + } +} + +const UPKEEP_INTERVAL: Duration = Duration::from_secs(1); + +impl Warden { + pub fn new(pool: PgPool) -> Warden { + Warden { pool } + } + + // once the actor has been started this fn runs + // it starts the heartbeat interval and keepalive + fn upkeep(&mut self, ctx: &mut ::Context) { + ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| { + let db = act.pool.get().expect("warden could not get db connection"); + + fetch_games(db.transaction().expect("warden tx failure")) + .expect("upkeep games failure") + .commit().expect("warden games commit failure"); + + fetch_instances(db.transaction().expect("warden tx failure")) + .expect("instances games failure") + .commit().expect("warden instances commit failure"); + }); + } + + fn subscribe(&mut self, ctx: &mut ::Context) { + let db = self.pool.get().expect("warden could not get db connection"); + + db.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); + let notifications = db.notifications(); + let mut n_iter = notifications.iter(); + + loop { + match n_iter.next() { + Ok(r) => { + if let Some(n) = r { + info!("{:?}", n); + } + } + Err(e) => warn!("{:?}", e), + } + } + } +}