diff --git a/server/src/net.rs b/server/src/net.rs index 6516a0b8..3e8df851 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -1,5 +1,6 @@ use std::env; -use std::thread::spawn; +use std::thread::{spawn, sleep}; +use std::time::{Duration}; use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; use actix_web::error::ResponseError; @@ -7,18 +8,16 @@ use actix_web::http::{Cookie}; use actix_web::cookie::{SameSite}; use actix_cors::Cors; -use actix::prelude::*; - use r2d2::{Pool}; use r2d2::{PooledConnection}; use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams}; -use warden::{Warden}; -use pubsub::{PubSub, pg_listen}; +use warden::{warden}; +use pubsub::{pg_listen}; use ws::{connect}; use account::{account_login, account_create, account_from_token, account_set_token}; -use payments::{PaymentProcessor, post_stripe_event}; +use payments::{post_stripe_event}; pub type Db = PooledConnection; pub type PgPool = Pool; @@ -141,8 +140,7 @@ fn create_pool(url: String) -> Pool { pub struct State { pub pool: PgPool, - pub payments: Addr, - pub pubsub: Addr, + // pub pubsub: PubSub, secure: bool, } @@ -151,13 +149,18 @@ pub fn start() { .expect("DATABASE_URL must be set"); let pool = create_pool(database_url); - let _system = System::new("mnml"); - let _warden = Warden::new(pool.clone()).start(); - - let payments = PaymentProcessor::new(pool.clone()).start(); + let warden_pool = pool.clone(); + spawn(move || { + loop { + let db_connection = warden_pool.get().expect("unable to get db connection"); + if let Err(e) = warden(db_connection) { + info!("{:?}", e); + } + sleep(Duration::new(1, 0)); + } + }); let pubsub_pool = pool.clone(); - let pubsub = Supervisor::start(move |_| PubSub::new()); spawn(move || loop { let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); match pg_listen(pubsub_conn) { @@ -167,7 +170,7 @@ pub fn start() { }); HttpServer::new(move || App::new() - .data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() }) + .data(State { pool: pool.clone(), secure: false }) .wrap(middleware::Logger::default()) .wrap(Cors::new().supports_credentials()) .service(web::resource("/api/login").route(web::post().to(login))) diff --git a/server/src/payments.rs b/server/src/payments.rs index 020b6347..17a1c371 100644 --- a/server/src/payments.rs +++ b/server/src/payments.rs @@ -1,6 +1,5 @@ use uuid::Uuid; use actix_web::{web, HttpResponse}; -use actix::prelude::*; use postgres::transaction::Transaction; use failure::Error; @@ -73,7 +72,7 @@ impl StripeData { } -fn stripe_checkout_mtx(session: CheckoutSession) -> Result, Error> { +fn stripe_checkout_data(session: CheckoutSession) -> Result, Error> { let account = match session.client_reference_id { Some(ref a) => Uuid::parse_str(a)?, None => { @@ -105,88 +104,39 @@ fn stripe_checkout_mtx(session: CheckoutSession) -> Result, Erro return Ok(items); } -pub struct PaymentProcessor { - pool: PgPool, -} - -impl Actor for PaymentProcessor { - type Context = Context; - - fn started(&mut self, _ctx: &mut Self::Context) { - info!("PaymentProcessor started"); - } - - fn stopped(&mut self, _ctx: &mut Context) { - println!("PaymentProcessor is stopped"); - } -} - -impl Supervised for PaymentProcessor { - fn restarting(&mut self, _ctx: &mut Context) { - warn!("PaymentProcessor restarting"); - } -} - -impl PaymentProcessor { - pub fn new(pool: PgPool) -> PaymentProcessor { - PaymentProcessor { pool } - } - - fn process_stripe(&mut self, msg: StripeEvent) -> Result, Error> { - let event = msg.0; - match event.data.object { - EventObject::CheckoutSession(s) => { - let data = match stripe_checkout_mtx(s) { - Ok(data) => data, - Err(e) => { - error!("{:?}", e); - return Err(e); - } - }; - - let connection = self.pool.get()?; - let mut tx = connection.transaction()?; - - for item in data.iter() { - item.side_effects(&mut tx)?; - item.persist(&mut tx)?; +fn process_stripe(event: Event, pool: &PgPool) -> Result, Error> { + info!("stripe event {:?}", event); + match event.data.object { + EventObject::CheckoutSession(s) => { + let data = match stripe_checkout_data(s) { + Ok(data) => data, + Err(e) => { + error!("{:?}", e); + return Err(e); } + }; - tx.commit()?; - Ok(data) - }, - _ => { - error!("unhandled stripe event {:?}", event); - Err(err_msg("UnhanldedEvent")) - }, - } - } + let connection = pool.get()?; + let mut tx = connection.transaction()?; -} - -struct StripeEvent(Event); -impl Message for StripeEvent { - type Result = Result, Error>; -} - -impl Handler for PaymentProcessor { - type Result = Result, Error>; - - fn handle(&mut self, msg: StripeEvent, _ctx: &mut Context) -> Self::Result { - match self.process_stripe(msg) { - Ok(data) => Ok(data), - Err(e) => { - error!("{:?}", e); - Err(e) + for item in data.iter() { + item.side_effects(&mut tx)?; + item.persist(&mut tx)?; } - } + + tx.commit()?; + Ok(data) + }, + _ => { + error!("unhandled stripe event {:?}", event); + Err(err_msg("UnhanldedEvent")) + }, } } pub fn post_stripe_event(state: web::Data, body: web::Json::) -> Result { let event: Event = body.into_inner(); - info!("stripe event {:?}", event); - state.payments.try_send(StripeEvent(event)).or(Err(MnmlHttpError::ServerError))?; + process_stripe(event, &state.pool).or(Err(MnmlHttpError::ServerError))?; Ok(HttpResponse::Accepted().finish()) } @@ -207,7 +157,7 @@ mod tests { .expect("could not deserialize"); let data = match event.data.object { - EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("purchase error"), + EventObject::CheckoutSession(s) => stripe_checkout_data(s).expect("purchase error"), _ => panic!("unknown event obj"), }; @@ -233,7 +183,7 @@ mod tests { .expect("could not deserialize"); let data = match event.data.object { - EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("subscription error"), + EventObject::CheckoutSession(s) => stripe_checkout_data(s).expect("subscription error"), _ => panic!("unknown event obj"), }; diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index efa009f6..afe6fe86 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -1,45 +1,8 @@ // Db Commons use fallible_iterator::{FallibleIterator}; use postgres::error::Error; -use actix::prelude::*; use net::{Db}; -use ws::{WsEvent}; - -pub struct PubSub; - -impl Actor for PubSub { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - info!("PubSub started"); - } - - fn stopped(&mut self, _ctx: &mut Context) { - println!("PubSub is stopped"); - } -} - -impl Supervised for PubSub { - fn restarting(&mut self, _ctx: &mut Context) { - warn!("pubsub restarting"); - } -} - -impl PubSub { - pub fn new() -> PubSub { - PubSub - } -} - -impl Handler for PubSub { - type Result = (); - - fn handle(&mut self, _msg: WsEvent, _: &mut Context) -> Self::Result { - info!("received ws event"); - println!("received ws event"); - } -} pub fn pg_listen(connection: Db) -> Result<(), Error> { connection.execute("LISTEN events;", &[])?; diff --git a/server/src/warden.rs b/server/src/warden.rs index 8fa4a886..eb07ebe1 100644 --- a/server/src/warden.rs +++ b/server/src/warden.rs @@ -1,14 +1,10 @@ -use std::time::{Instant, Duration}; - // Db Commons use postgres::transaction::Transaction; use failure::Error; -use actix::prelude::*; - use game::{games_need_upkeep, game_update, game_write, game_delete}; use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete}; -use net::{Db, PgPool}; +use net::{Db}; fn fetch_games(mut tx: Transaction) -> Result { let games = games_need_upkeep(&mut tx)?; @@ -43,39 +39,12 @@ fn fetch_instances(mut tx: Transaction) -> Result { Ok(tx) } -pub struct Warden { - pool: PgPool, -} +pub fn warden(db: Db) -> Result<(), Error> { + fetch_games(db.transaction()?)? + .commit()?; -impl Actor for Warden { - type Context = Context; + fetch_instances(db.transaction()?)? + .commit()?; - fn started(&mut self, ctx: &mut Self::Context) { - self.upkeep(ctx); - } -} - -const UPKEEP_INTERVAL: Duration = Duration::from_secs(1); - -impl Warden { - pub fn new(pool: PgPool) -> Warden { - Warden { pool } - } - - // once the actor has been started this fn runs - // it starts the heartbeat interval and keepalive - fn upkeep(&mut self, ctx: &mut ::Context) { - ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| { - debug!("upkeep starting"); - let db = act.pool.get().expect("warden could not get db connection"); - - fetch_games(db.transaction().expect("warden tx failure")) - .expect("upkeep games failure") - .commit().expect("warden games commit failure"); - - fetch_instances(db.transaction().expect("warden tx failure")) - .expect("instances games failure") - .commit().expect("warden instances commit failure"); - }); - } -} + Ok(()) +} \ No newline at end of file diff --git a/server/src/ws.rs b/server/src/ws.rs index 844c2b7d..a615b5b4 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -1,6 +1,6 @@ use std::time::{Instant, Duration}; -use actix_web::{web, Error, HttpMessage, HttpRequest, HttpResponse}; +use actix_web::{web, HttpMessage, HttpRequest, HttpResponse}; use actix_web_actors::ws; use actix::prelude::*; @@ -108,11 +108,6 @@ impl MnmlSocket { } } -pub struct WsEvent(Option); -impl Message for WsEvent { - type Result = (); -} - // idk how this stuff works // but the args extract what you need from the incoming requests // this grabs