pubsub fix

This commit is contained in:
ntr 2019-06-23 13:23:59 +10:00
parent f832113966
commit 3c085aa52b
4 changed files with 54 additions and 48 deletions

View File

@ -34,4 +34,4 @@ actix-cors = "0.1.0"
stripe-rust = { version = "0.10", features = ["webhooks"] } stripe-rust = { version = "0.10", features = ["webhooks"] }
[patch.crates-io] [patch.crates-io]
stripe-rust = { path = "/home/ntr/code/stripe-rs" } stripe-rust = { git = "https://github.com/margh/stripe-rs.git" }

View File

@ -6,6 +6,40 @@ use stripe::{Event, EventObject, CheckoutSession};
use net::{State, PgPool, MnmlError}; use net::{State, PgPool, MnmlError};
#[derive(Debug,Clone,Copy,Serialize,Deserialize)]
enum Mtx {
Subscription,
Currency,
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Order {
account: Uuid,
customer: String,
}
fn process_stripe_checkout(session: CheckoutSession) -> Result<Vec<Mtx>, MnmlError> {
let account = match session.client_reference_id {
Some(a) => Uuid::parse_str(&a).or(Err(MnmlError::NoUser))?,
None => {
warn!("unknown user checkout {:?}", session);
return Err(MnmlError::NoUser)
},
};
// checkout.session.completed
// assign stripe customer_id to account
// if subscription
// go get it
// set account sub to active and end date
// if just bits purchase
Ok(vec![])
}
pub struct PaymentProcessor { pub struct PaymentProcessor {
pool: PgPool, pool: PgPool,
} }
@ -54,41 +88,6 @@ impl Handler<StripeEvent> for PaymentProcessor {
} }
} }
fn process_stripe_checkout(session: CheckoutSession) -> Result<Vec<Mtx>, MnmlError> {
let account = match session.client_reference_id {
Some(a) => Uuid::parse_str(&a).or(Err(MnmlError::UnknownUser))?,
None => {
warn!("unknown user checkout {:?}", session);
return Err(MnmlError::UnknownUser)
},
};
// checkout.session.completed
// assign stripe customer_id to account
// if subscription
// go get it
// set account sub to active and end date
// if just bits purchase
Ok(vec![])
}
#[derive(Debug,Clone,Copy,Serialize,Deserialize)]
enum Mtx {
Subscription,
Currency,
}
#[derive(Debug,Clone,Serialize,Deserialize)]
struct Order {
account: Uuid,
customer: String,
}
pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlError> { pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlError> {
let event: Event = body.into_inner(); let event: Event = body.into_inner();
info!("stripe event {:?}", event); info!("stripe event {:?}", event);

View File

@ -34,6 +34,8 @@ pub enum MnmlError {
#[fail(display="bad request")] #[fail(display="bad request")]
BadRequest, BadRequest,
#[fail(display="missing user id")]
NoUser,
#[fail(display="unknown user")] #[fail(display="unknown user")]
UnknownUser, UnknownUser,
} }
@ -58,6 +60,9 @@ impl ResponseError for MnmlError {
MnmlError::UnknownUser => HttpResponse::BadRequest() MnmlError::UnknownUser => HttpResponse::BadRequest()
.json(RpcErrorResponse { err: "unknown user".to_string() }), .json(RpcErrorResponse { err: "unknown user".to_string() }),
MnmlError::NoUser => HttpResponse::BadRequest()
.json(RpcErrorResponse { err: "missing user id".to_string() }),
} }
} }
} }
@ -161,9 +166,15 @@ pub fn start() {
let payments = PaymentProcessor::new(pool.clone()).start(); let payments = PaymentProcessor::new(pool.clone()).start();
let pubsub_conn = pool.get().expect("could not get pubsub pg connection"); let pubsub_pool = pool.clone();
spawn(move || pg_listen(pubsub_conn));
let pubsub = Supervisor::start(move |_| PubSub::new()); let pubsub = Supervisor::start(move |_| PubSub::new());
spawn(move || loop {
let pubsub_conn = pubsub_pool.get().expect("could not get pubsub pg connection");
match pg_listen(pubsub_conn) {
Ok(_) => warn!("pg listen closed"),
Err(e) => warn!("pg_listen error {:?}", e),
}
});
HttpServer::new(move || App::new() HttpServer::new(move || App::new()
.data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() }) .data(State { pool: pool.clone(), secure: false, payments: payments.clone(), pubsub: pubsub.clone() })

View File

@ -1,6 +1,6 @@
// Db Commons // Db Commons
use fallible_iterator::{FallibleIterator}; use fallible_iterator::{FallibleIterator};
use postgres::error::Error;
use actix::prelude::*; use actix::prelude::*;
use net::{Db}; use net::{Db};
@ -41,19 +41,15 @@ impl Handler<WsEvent> for PubSub {
} }
} }
pub fn pg_listen(connection: Db) { pub fn pg_listen(connection: Db) -> Result<(), Error> {
connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events");
info!("pubsub listening"); info!("pubsub listening");
let notifications = connection.notifications(); let notifications = connection.notifications();
let mut n_iter = notifications.iter(); let mut n_iter = notifications.blocking_iter();
loop { loop {
match n_iter.next() { let n = n_iter.next()?;
Ok(r) => { if let Some(n) = n {
if let Some(n) = r {
info!("{:?}", n); info!("{:?}", n);
} }
} }
Err(e) => warn!("{:?}", e),
}
}
} }