remove actors from everything except ws

This commit is contained in:
ntr 2019-06-24 22:31:52 +10:00
parent 440554dc35
commit bcaee128ac
5 changed files with 53 additions and 173 deletions

View File

@ -1,5 +1,6 @@
use std::env;
use std::thread::spawn;
use std::thread::{spawn, sleep};
use std::time::{Duration};
use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer};
use actix_web::error::ResponseError;
@ -7,18 +8,16 @@ use actix_web::http::{Cookie};
use actix_web::cookie::{SameSite};
use actix_cors::Cors;
use actix::prelude::*;
use r2d2::{Pool};
use r2d2::{PooledConnection};
use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams};
use warden::{Warden};
use pubsub::{PubSub, pg_listen};
use warden::{warden};
use pubsub::{pg_listen};
use ws::{connect};
use account::{account_login, account_create, account_from_token, account_set_token};
use payments::{PaymentProcessor, post_stripe_event};
use payments::{post_stripe_event};
pub type Db = PooledConnection<PostgresConnectionManager>;
pub type PgPool = Pool<PostgresConnectionManager>;
@ -141,8 +140,7 @@ fn create_pool(url: String) -> Pool<PostgresConnectionManager> {
pub struct State {
pub pool: PgPool,
pub payments: Addr<PaymentProcessor>,
pub pubsub: Addr<PubSub>,
// pub pubsub: PubSub,
secure: bool,
}
@ -151,13 +149,18 @@ pub fn start() {
.expect("DATABASE_URL must be set");
let pool = create_pool(database_url);
let _system = System::new("mnml");
let _warden = Warden::new(pool.clone()).start();
let payments = PaymentProcessor::new(pool.clone()).start();
let warden_pool = pool.clone();
spawn(move || {
loop {
let db_connection = warden_pool.get().expect("unable to get db connection");
if let Err(e) = warden(db_connection) {
info!("{:?}", e);
}
sleep(Duration::new(1, 0));
}
});
let pubsub_pool = pool.clone();
let pubsub = Supervisor::start(move |_| PubSub::new());
spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn) {
@ -167,7 +170,7 @@ pub fn start() {
});
HttpServer::new(move || App::new()
.data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() })
.data(State { pool: pool.clone(), secure: false })
.wrap(middleware::Logger::default())
.wrap(Cors::new().supports_credentials())
.service(web::resource("/api/login").route(web::post().to(login)))

View File

@ -1,6 +1,5 @@
use uuid::Uuid;
use actix_web::{web, HttpResponse};
use actix::prelude::*;
use postgres::transaction::Transaction;
use failure::Error;
@ -73,7 +72,7 @@ impl StripeData {
}
fn stripe_checkout_mtx(session: CheckoutSession) -> Result<Vec<StripeData>, Error> {
fn stripe_checkout_data(session: CheckoutSession) -> Result<Vec<StripeData>, Error> {
let account = match session.client_reference_id {
Some(ref a) => Uuid::parse_str(a)?,
None => {
@ -105,88 +104,39 @@ fn stripe_checkout_mtx(session: CheckoutSession) -> Result<Vec<StripeData>, Erro
return Ok(items);
}
pub struct PaymentProcessor {
pool: PgPool,
}
impl Actor for PaymentProcessor {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
info!("PaymentProcessor started");
}
fn stopped(&mut self, _ctx: &mut Context<Self>) {
println!("PaymentProcessor is stopped");
}
}
impl Supervised for PaymentProcessor {
fn restarting(&mut self, _ctx: &mut Context<PaymentProcessor>) {
warn!("PaymentProcessor restarting");
}
}
impl PaymentProcessor {
pub fn new(pool: PgPool) -> PaymentProcessor {
PaymentProcessor { pool }
}
fn process_stripe(&mut self, msg: StripeEvent) -> Result<Vec<StripeData>, Error> {
let event = msg.0;
match event.data.object {
EventObject::CheckoutSession(s) => {
let data = match stripe_checkout_mtx(s) {
Ok(data) => data,
Err(e) => {
error!("{:?}", e);
return Err(e);
}
};
let connection = self.pool.get()?;
let mut tx = connection.transaction()?;
for item in data.iter() {
item.side_effects(&mut tx)?;
item.persist(&mut tx)?;
fn process_stripe(event: Event, pool: &PgPool) -> Result<Vec<StripeData>, Error> {
info!("stripe event {:?}", event);
match event.data.object {
EventObject::CheckoutSession(s) => {
let data = match stripe_checkout_data(s) {
Ok(data) => data,
Err(e) => {
error!("{:?}", e);
return Err(e);
}
};
tx.commit()?;
Ok(data)
},
_ => {
error!("unhandled stripe event {:?}", event);
Err(err_msg("UnhanldedEvent"))
},
}
}
let connection = pool.get()?;
let mut tx = connection.transaction()?;
}
struct StripeEvent(Event);
impl Message for StripeEvent {
type Result = Result<Vec<StripeData>, Error>;
}
impl Handler<StripeEvent> for PaymentProcessor {
type Result = Result<Vec<StripeData>, Error>;
fn handle(&mut self, msg: StripeEvent, _ctx: &mut Context<Self>) -> Self::Result {
match self.process_stripe(msg) {
Ok(data) => Ok(data),
Err(e) => {
error!("{:?}", e);
Err(e)
for item in data.iter() {
item.side_effects(&mut tx)?;
item.persist(&mut tx)?;
}
}
tx.commit()?;
Ok(data)
},
_ => {
error!("unhandled stripe event {:?}", event);
Err(err_msg("UnhanldedEvent"))
},
}
}
pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlHttpError> {
let event: Event = body.into_inner();
info!("stripe event {:?}", event);
state.payments.try_send(StripeEvent(event)).or(Err(MnmlHttpError::ServerError))?;
process_stripe(event, &state.pool).or(Err(MnmlHttpError::ServerError))?;
Ok(HttpResponse::Accepted().finish())
}
@ -207,7 +157,7 @@ mod tests {
.expect("could not deserialize");
let data = match event.data.object {
EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("purchase error"),
EventObject::CheckoutSession(s) => stripe_checkout_data(s).expect("purchase error"),
_ => panic!("unknown event obj"),
};
@ -233,7 +183,7 @@ mod tests {
.expect("could not deserialize");
let data = match event.data.object {
EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("subscription error"),
EventObject::CheckoutSession(s) => stripe_checkout_data(s).expect("subscription error"),
_ => panic!("unknown event obj"),
};

View File

@ -1,45 +1,8 @@
// Db Commons
use fallible_iterator::{FallibleIterator};
use postgres::error::Error;
use actix::prelude::*;
use net::{Db};
use ws::{WsEvent};
pub struct PubSub;
impl Actor for PubSub {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("PubSub started");
}
fn stopped(&mut self, _ctx: &mut Context<Self>) {
println!("PubSub is stopped");
}
}
impl Supervised for PubSub {
fn restarting(&mut self, _ctx: &mut Context<PubSub>) {
warn!("pubsub restarting");
}
}
impl PubSub {
pub fn new() -> PubSub {
PubSub
}
}
impl Handler<WsEvent> for PubSub {
type Result = ();
fn handle(&mut self, _msg: WsEvent, _: &mut Context<Self>) -> Self::Result {
info!("received ws event");
println!("received ws event");
}
}
pub fn pg_listen(connection: Db) -> Result<(), Error> {
connection.execute("LISTEN events;", &[])?;

View File

@ -1,14 +1,10 @@
use std::time::{Instant, Duration};
// Db Commons
use postgres::transaction::Transaction;
use failure::Error;
use actix::prelude::*;
use game::{games_need_upkeep, game_update, game_write, game_delete};
use instance::{instances_need_upkeep, instances_idle, instance_update, instance_delete};
use net::{Db, PgPool};
use net::{Db};
fn fetch_games(mut tx: Transaction) -> Result<Transaction, Error> {
let games = games_need_upkeep(&mut tx)?;
@ -43,39 +39,12 @@ fn fetch_instances(mut tx: Transaction) -> Result<Transaction, Error> {
Ok(tx)
}
pub struct Warden {
pool: PgPool,
}
impl Actor for Warden {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.upkeep(ctx);
}
}
const UPKEEP_INTERVAL: Duration = Duration::from_secs(1);
impl Warden {
pub fn new(pool: PgPool) -> Warden {
Warden { pool }
}
// once the actor has been started this fn runs
// it starts the heartbeat interval and keepalive
fn upkeep(&mut self, ctx: &mut <Warden as Actor>::Context) {
ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| {
debug!("upkeep starting");
let db = act.pool.get().expect("warden could not get db connection");
fetch_games(db.transaction().expect("warden tx failure"))
.expect("upkeep games failure")
.commit().expect("warden games commit failure");
fetch_instances(db.transaction().expect("warden tx failure"))
.expect("instances games failure")
.commit().expect("warden instances commit failure");
});
}
pub fn warden(db: Db) -> Result<(), Error> {
fetch_games(db.transaction()?)?
.commit()?;
fetch_instances(db.transaction()?)?
.commit()?;
Ok(())
}

View File

@ -1,6 +1,6 @@
use std::time::{Instant, Duration};
use actix_web::{web, Error, HttpMessage, HttpRequest, HttpResponse};
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use actix::prelude::*;
@ -108,11 +108,6 @@ impl MnmlSocket {
}
}
pub struct WsEvent(Option<Account>);
impl Message for WsEvent {
type Result = ();
}
// idk how this stuff works
// but the args extract what you need from the incoming requests
// this grabs