fix blocking event loop

This commit is contained in:
ntr 2019-06-20 17:17:47 +10:00
parent a43f0d309e
commit f832113966
5 changed files with 61 additions and 39 deletions

View File

@ -59,7 +59,8 @@ fn setup_logger() -> Result<(), fern::InitError> {
)) ))
}) })
.level_for("postgres", log::LevelFilter::Info) .level_for("postgres", log::LevelFilter::Info)
.level(log::LevelFilter::Debug) .level_for("actix_web", log::LevelFilter::Info)
.level(log::LevelFilter::Info)
.chain(std::io::stdout()) .chain(std::io::stdout())
.chain(fern::log_file("log/mnml.log")?) .chain(fern::log_file("log/mnml.log")?)
.apply()?; .apply()?;

View File

@ -14,7 +14,11 @@ impl Actor for PaymentProcessor {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) { fn started(&mut self, _ctx: &mut Self::Context) {
info!("listening for PaymentProcessor"); info!("PaymentProcessor started");
}
fn stopped(&mut self, _ctx: &mut Context<Self>) {
println!("PaymentProcessor is stopped");
} }
} }
@ -30,6 +34,11 @@ impl PaymentProcessor {
} }
} }
struct StripeEvent(Event);
impl Message for StripeEvent {
type Result = Result<Vec<Mtx>, MnmlError>;
}
impl Handler<StripeEvent> for PaymentProcessor { impl Handler<StripeEvent> for PaymentProcessor {
type Result = Result<Vec<Mtx>, MnmlError>; type Result = Result<Vec<Mtx>, MnmlError>;
@ -37,7 +46,10 @@ impl Handler<StripeEvent> for PaymentProcessor {
let event = msg.0; let event = msg.0;
match event.data.object { match event.data.object {
EventObject::CheckoutSession(s) => process_stripe_checkout(s), EventObject::CheckoutSession(s) => process_stripe_checkout(s),
_ => Err(MnmlError::ServerError), _ => {
warn!("unhandled stripe event {:?}", event);
Err(MnmlError::ServerError)
},
} }
} }
} }
@ -77,16 +89,11 @@ struct Order {
customer: String, customer: String,
} }
struct StripeEvent(Event);
impl Message for StripeEvent {
type Result = Result<Vec<Mtx>, MnmlError>;
}
pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlError> { pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlError> {
let event: Event = body.into_inner(); let event: Event = body.into_inner();
info!("stripe event {:?}", event); info!("stripe event {:?}", event);
state.payments.do_send(StripeEvent(event)); state.payments.try_send(StripeEvent(event)).or(Err(MnmlError::ServerError))?;
Ok(HttpResponse::Ok().finish()) Ok(HttpResponse::Accepted().finish())
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,4 +1,5 @@
use std::env; use std::env;
use std::thread::spawn;
use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer}; use actix_web::{middleware, web, App, HttpMessage, HttpRequest, HttpResponse, HttpServer};
use actix_web::error::ResponseError; use actix_web::error::ResponseError;
@ -14,8 +15,8 @@ use r2d2_postgres::{TlsMode, PostgresConnectionManager};
use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams}; use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams};
use warden::{Warden}; use warden::{Warden};
use pubsub::PubSub; use pubsub::{PubSub, pg_listen};
use ws::{connect}; use ws::{WsEvent, connect};
use account::{account_login, account_create, account_from_token, account_set_token}; use account::{account_login, account_create, account_from_token, account_set_token};
use mtx::{PaymentProcessor, post_stripe_event}; use mtx::{PaymentProcessor, post_stripe_event};
@ -155,14 +156,14 @@ pub fn start() {
.expect("DATABASE_URL must be set"); .expect("DATABASE_URL must be set");
let pool = create_pool(database_url); let pool = create_pool(database_url);
let _sys = System::new("mnml"); let _system = System::new("mnml");
let _warden = Warden::new(pool.clone()).start(); let _warden = Warden::new(pool.clone()).start();
let payments = PaymentProcessor::new(pool.clone()).start(); let payments = PaymentProcessor::new(pool.clone()).start();
let pubsub_conn = pool.get().expect("could not get pubsub pg connection"); let pubsub_conn = pool.get().expect("could not get pubsub pg connection");
let pubsub = Supervisor::start(move |_| PubSub::new(pubsub_conn)); spawn(move || pg_listen(pubsub_conn));
let pubsub = Supervisor::start(move |_| PubSub::new());
HttpServer::new(move || App::new() HttpServer::new(move || App::new()
.data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() }) .data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() })

View File

@ -4,49 +4,56 @@ use fallible_iterator::{FallibleIterator};
use actix::prelude::*; use actix::prelude::*;
use net::{Db}; use net::{Db};
use ws::{WsEvent};
pub struct PubSub { pub struct PubSub;
connection: Db,
}
impl Actor for PubSub { impl Actor for PubSub {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
self.pg_listen(ctx); info!("PubSub started");
self.check_events(ctx); }
fn stopped(&mut self, _ctx: &mut Context<Self>) {
println!("PubSub is stopped");
} }
} }
impl Supervised for PubSub { impl Supervised for PubSub {
fn restarting(&mut self, ctx: &mut Context<PubSub>) { fn restarting(&mut self, _ctx: &mut Context<PubSub>) {
warn!("pubsub restarting"); warn!("pubsub restarting");
} }
} }
impl PubSub { impl PubSub {
pub fn new(connection: Db) -> PubSub { pub fn new() -> PubSub {
PubSub { connection } PubSub
} }
}
fn pg_listen(&mut self, ctx: &mut <PubSub as Actor>::Context) { impl Handler<WsEvent> for PubSub {
self.connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); type Result = ();
info!("pubsub listening");
fn handle(&mut self, _msg: WsEvent, _: &mut Context<Self>) -> Self::Result {
info!("received ws event");
println!("received ws event");
} }
}
fn check_events(&mut self, ctx: &mut <PubSub as Actor>::Context) { pub fn pg_listen(connection: Db) {
let notifications = self.connection.notifications(); connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events");
let mut n_iter = notifications.iter(); info!("pubsub listening");
loop { let notifications = connection.notifications();
match n_iter.next() { let mut n_iter = notifications.iter();
Ok(r) => { loop {
if let Some(n) = r { match n_iter.next() {
info!("{:?}", n); Ok(r) => {
} if let Some(n) = r {
info!("{:?}", n);
} }
Err(e) => warn!("{:?}", e),
} }
Err(e) => warn!("{:?}", e),
} }
} }
} }

View File

@ -108,13 +108,18 @@ impl MnmlSocket {
} }
} }
pub struct WsEvent(Option<Account>);
impl Message for WsEvent {
type Result = ();
}
// idk how this stuff works // idk how this stuff works
// but the args extract what you need from the incoming requests // but the args extract what you need from the incoming requests
// this grabs // this grabs
// the req obj itself which we need for cookies // the req obj itself which we need for cookies
// the application state // the application state
// and the websocket stream // and the websocket stream
pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) -> Result<HttpResponse, Error> { pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) -> Result<HttpResponse, MnmlError> {
let account: Option<Account> = match r.cookie("x-auth-token") { let account: Option<Account> = match r.cookie("x-auth-token") {
Some(t) => { Some(t) => {
let db = state.pool.get().or(Err(MnmlError::ServerError))?; let db = state.pool.get().or(Err(MnmlError::ServerError))?;
@ -127,5 +132,6 @@ pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) ->
None => None, None => None,
}; };
ws::start(MnmlSocket::new(state, account), &r, stream) // state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlError::ServerError))?;
ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlError::ServerError))
} }