From 3c085aa52b8fd3a2cb6e1761a79b23d33ec04d38 Mon Sep 17 00:00:00 2001 From: ntr Date: Sun, 23 Jun 2019 13:23:59 +1000 Subject: [PATCH] pubsub fix --- server/Cargo.toml | 2 +- server/src/mtx.rs | 69 ++++++++++++++++++++++---------------------- server/src/net.rs | 15 ++++++++-- server/src/pubsub.rs | 16 ++++------ 4 files changed, 54 insertions(+), 48 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 6610612d..eaf3216f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -34,4 +34,4 @@ actix-cors = "0.1.0" stripe-rust = { version = "0.10", features = ["webhooks"] } [patch.crates-io] -stripe-rust = { path = "/home/ntr/code/stripe-rs" } +stripe-rust = { git = "https://github.com/margh/stripe-rs.git" } diff --git a/server/src/mtx.rs b/server/src/mtx.rs index a2e4a398..8d3d1c21 100644 --- a/server/src/mtx.rs +++ b/server/src/mtx.rs @@ -6,6 +6,40 @@ use stripe::{Event, EventObject, CheckoutSession}; use net::{State, PgPool, MnmlError}; +#[derive(Debug,Clone,Copy,Serialize,Deserialize)] +enum Mtx { + Subscription, + Currency, +} + +#[derive(Debug,Clone,Serialize,Deserialize)] +struct Order { + account: Uuid, + customer: String, +} + +fn process_stripe_checkout(session: CheckoutSession) -> Result, MnmlError> { + let account = match session.client_reference_id { + Some(a) => Uuid::parse_str(&a).or(Err(MnmlError::NoUser))?, + None => { + warn!("unknown user checkout {:?}", session); + return Err(MnmlError::NoUser) + }, + }; + + // checkout.session.completed + // assign stripe customer_id to account + + // if subscription + // go get it + // set account sub to active and end date + + // if just bits purchase + + + Ok(vec![]) +} + pub struct PaymentProcessor { pool: PgPool, } @@ -54,41 +88,6 @@ impl Handler for PaymentProcessor { } } -fn process_stripe_checkout(session: CheckoutSession) -> Result, MnmlError> { - let account = match session.client_reference_id { - Some(a) => Uuid::parse_str(&a).or(Err(MnmlError::UnknownUser))?, - None => { - warn!("unknown user checkout {:?}", session); - return Err(MnmlError::UnknownUser) - }, - }; - - // checkout.session.completed - // assign stripe customer_id to account - - // if subscription - // go get it - // set account sub to active and end date - - // if just bits purchase - - - Ok(vec![]) -} - - -#[derive(Debug,Clone,Copy,Serialize,Deserialize)] -enum Mtx { - Subscription, - Currency, -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct Order { - account: Uuid, - customer: String, -} - pub fn post_stripe_event(state: web::Data, body: web::Json::) -> Result { let event: Event = body.into_inner(); info!("stripe event {:?}", event); diff --git a/server/src/net.rs b/server/src/net.rs index d5d41619..e8b64c17 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -34,6 +34,8 @@ pub enum MnmlError { #[fail(display="bad request")] BadRequest, + #[fail(display="missing user id")] + NoUser, #[fail(display="unknown user")] UnknownUser, } @@ -58,6 +60,9 @@ impl ResponseError for MnmlError { MnmlError::UnknownUser => HttpResponse::BadRequest() .json(RpcErrorResponse { err: "unknown user".to_string() }), + + MnmlError::NoUser => HttpResponse::BadRequest() + .json(RpcErrorResponse { err: "missing user id".to_string() }), } } } @@ -161,9 +166,15 @@ pub fn start() { let payments = PaymentProcessor::new(pool.clone()).start(); - let pubsub_conn = pool.get().expect("could not get pubsub pg connection"); - spawn(move || pg_listen(pubsub_conn)); + let pubsub_pool = pool.clone(); let pubsub = Supervisor::start(move |_| PubSub::new()); + spawn(move || loop { + let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection"); + match pg_listen(pubsub_conn) { + Ok(_) => warn!("pg listen closed"), + Err(e) => warn!("pg_listen error {:?}", e), + } + }); HttpServer::new(move || App::new() .data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() }) diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index bdb9066e..ecf8e063 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -1,6 +1,6 @@ // Db Commons use fallible_iterator::{FallibleIterator}; - +use postgres::error::Error; use actix::prelude::*; use net::{Db}; @@ -41,19 +41,15 @@ impl Handler for PubSub { } } -pub fn pg_listen(connection: Db) { +pub fn pg_listen(connection: Db) -> Result<(), Error> { connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); info!("pubsub listening"); let notifications = connection.notifications(); - let mut n_iter = notifications.iter(); + let mut n_iter = notifications.blocking_iter(); loop { - match n_iter.next() { - Ok(r) => { - if let Some(n) = r { - info!("{:?}", n); - } - } - Err(e) => warn!("{:?}", e), + let n = n_iter.next()?; + if let Some(n) = n { + info!("{:?}", n); } } }