diff --git a/server/src/main.rs b/server/src/main.rs index 42cbe65a..ff03e820 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -59,7 +59,8 @@ fn setup_logger() -> Result<(), fern::InitError> { )) }) .level_for("postgres", log::LevelFilter::Info) - .level(log::LevelFilter::Debug) + .level_for("actix_web", log::LevelFilter::Info) + .level(log::LevelFilter::Info) .chain(std::io::stdout()) .chain(fern::log_file("log/mnml.log")?) .apply()?; diff --git a/server/src/mtx.rs b/server/src/mtx.rs index bf84c101..a2e4a398 100644 --- a/server/src/mtx.rs +++ b/server/src/mtx.rs @@ -14,7 +14,11 @@ impl Actor for PaymentProcessor { type Context = Context; fn started(&mut self, _ctx: &mut Self::Context) { - info!("listening for PaymentProcessor"); + info!("PaymentProcessor started"); + } + + fn stopped(&mut self, _ctx: &mut Context) { + println!("PaymentProcessor is stopped"); } } @@ -30,6 +34,11 @@ impl PaymentProcessor { } } +struct StripeEvent(Event); +impl Message for StripeEvent { + type Result = Result, MnmlError>; +} + impl Handler for PaymentProcessor { type Result = Result, MnmlError>; @@ -37,7 +46,10 @@ impl Handler for PaymentProcessor { let event = msg.0; match event.data.object { EventObject::CheckoutSession(s) => process_stripe_checkout(s), - _ => Err(MnmlError::ServerError), + _ => { + warn!("unhandled stripe event {:?}", event); + Err(MnmlError::ServerError) + }, } } } @@ -77,16 +89,11 @@ struct Order { customer: String, } -struct StripeEvent(Event); -impl Message for StripeEvent { - type Result = Result, MnmlError>; -} - pub fn post_stripe_event(state: web::Data, body: web::Json::) -> Result { let event: Event = body.into_inner(); info!("stripe event {:?}", event); - state.payments.do_send(StripeEvent(event)); - Ok(HttpResponse::Ok().finish()) + state.payments.try_send(StripeEvent(event)).or(Err(MnmlError::ServerError))?; + Ok(HttpResponse::Accepted().finish()) } #[cfg(test)] diff --git a/server/src/net.rs b/server/src/net.rs index deb2775f..d5d41619 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -1,4 +1,5 @@ use std::env; +use std::thread::spawn; use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; use actix_web::error::ResponseError; @@ -14,8 +15,8 @@ use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams}; use warden::{Warden}; -use pubsub::PubSub; -use ws::{connect}; +use pubsub::{PubSub, pg_listen}; +use ws::{WsEvent, connect}; use account::{account_login, account_create, account_from_token, account_set_token}; use mtx::{PaymentProcessor, post_stripe_event}; @@ -155,14 +156,14 @@ pub fn start() { .expect("DATABASE_URL must be set"); let pool = create_pool(database_url); - let _sys = System::new("mnml"); - + let _system = System::new("mnml"); let _warden = Warden::new(pool.clone()).start(); let payments = PaymentProcessor::new(pool.clone()).start(); let pubsub_conn = pool.get().expect("could not get pubsub pg connection"); - let pubsub = Supervisor::start(move |_| PubSub::new(pubsub_conn)); + spawn(move || pg_listen(pubsub_conn)); + let pubsub = Supervisor::start(move |_| PubSub::new()); HttpServer::new(move || App::new() .data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() }) diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index d812fc4a..bdb9066e 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -4,49 +4,56 @@ use fallible_iterator::{FallibleIterator}; use actix::prelude::*; use net::{Db}; +use ws::{WsEvent}; -pub struct PubSub { - connection: Db, -} +pub struct PubSub; impl Actor for PubSub { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { - self.pg_listen(ctx); - self.check_events(ctx); + info!("PubSub started"); + } + + fn stopped(&mut self, _ctx: &mut Context) { + println!("PubSub is stopped"); } } impl Supervised for PubSub { - fn restarting(&mut self, ctx: &mut Context) { + fn restarting(&mut self, _ctx: &mut Context) { warn!("pubsub restarting"); } } impl PubSub { - pub fn new(connection: Db) -> PubSub { - PubSub { connection } + pub fn new() -> PubSub { + PubSub } +} - fn pg_listen(&mut self, ctx: &mut ::Context) { - self.connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); - info!("pubsub listening"); +impl Handler for PubSub { + type Result = (); + + fn handle(&mut self, _msg: WsEvent, _: &mut Context) -> Self::Result { + info!("received ws event"); + println!("received ws event"); } +} - fn check_events(&mut self, ctx: &mut ::Context) { - let notifications = self.connection.notifications(); - let mut n_iter = notifications.iter(); - loop { - match n_iter.next() { - Ok(r) => { - if let Some(n) = r { - info!("{:?}", n); - } +pub fn pg_listen(connection: Db) { + connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); + info!("pubsub listening"); + let notifications = connection.notifications(); + let mut n_iter = notifications.iter(); + loop { + match n_iter.next() { + Ok(r) => { + if let Some(n) = r { + info!("{:?}", n); } - Err(e) => warn!("{:?}", e), } + Err(e) => warn!("{:?}", e), } } - } diff --git a/server/src/ws.rs b/server/src/ws.rs index 22ab9389..6cf71cba 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -108,13 +108,18 @@ impl MnmlSocket { } } +pub struct WsEvent(Option); +impl Message for WsEvent { + type Result = (); +} + // idk how this stuff works // but the args extract what you need from the incoming requests // this grabs // the req obj itself which we need for cookies // the application state // and the websocket stream -pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> Result { +pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> Result { let account: Option = match r.cookie("x-auth-token") { Some(t) => { let db = state.pool.get().or(Err(MnmlError::ServerError))?; @@ -127,5 +132,6 @@ pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> None => None, }; - ws::start(MnmlSocket::new(state, account), &r, stream) + // state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlError::ServerError))?; + ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlError::ServerError)) }