it works, time to break it
This commit is contained in:
parent
3c085aa52b
commit
440554dc35
73
ops/migrations/20190624170147_stripe.js
Normal file
73
ops/migrations/20190624170147_stripe.js
Normal file
@ -0,0 +1,73 @@
|
||||
// INSERT into stripe_customers (account, customer, checkout)
|
||||
// INSERT into stripe_subscriptions (account, customer, checkout, subscription)
|
||||
// INSERT into stripe_purchases (account, customer, checkout, amount)
|
||||
|
||||
exports.up = async knex => {
|
||||
await knex.schema.createTable('stripe_customers', table => {
|
||||
table.string('customer', 64)
|
||||
.primary();
|
||||
|
||||
table.uuid('account')
|
||||
.notNullable()
|
||||
.index();
|
||||
|
||||
table.foreign('account')
|
||||
.references('id')
|
||||
.inTable('accounts')
|
||||
.onDelete('RESTRICT');
|
||||
|
||||
table.string('checkout', 64)
|
||||
.notNullable()
|
||||
.unique();
|
||||
|
||||
table.timestamps(true, true);
|
||||
});
|
||||
|
||||
await knex.schema.createTable('stripe_subscriptions', table => {
|
||||
table.string('subscription', 64)
|
||||
.primary();
|
||||
|
||||
table.uuid('account')
|
||||
.notNullable()
|
||||
.index();
|
||||
|
||||
table.foreign('account')
|
||||
.references('id')
|
||||
.inTable('accounts')
|
||||
.onDelete('RESTRICT');
|
||||
|
||||
table.string('customer', 64)
|
||||
.notNullable();
|
||||
|
||||
table.timestamps(true, true);
|
||||
});
|
||||
|
||||
await knex.schema.createTable('stripe_purchases', table => {
|
||||
table.string('checkout', 64)
|
||||
.primary();
|
||||
|
||||
table.uuid('account')
|
||||
.notNullable()
|
||||
.index();
|
||||
|
||||
table.foreign('account')
|
||||
.references('id')
|
||||
.inTable('accounts')
|
||||
.onDelete('RESTRICT');
|
||||
|
||||
table.string('customer', 64)
|
||||
.notNullable();
|
||||
|
||||
table.bigInteger('amount')
|
||||
.notNullable();
|
||||
|
||||
table.timestamps(true, true);
|
||||
});
|
||||
|
||||
await knex.schema.raw(`
|
||||
ALTER TABLE stripe_purchases
|
||||
ADD CHECK (amount > 0);
|
||||
`);
|
||||
};
|
||||
|
||||
exports.down = async () => {};
|
||||
@ -32,6 +32,3 @@ actix-web-actors = "1.0.0"
|
||||
actix-cors = "0.1.0"
|
||||
|
||||
stripe-rust = { version = "0.10", features = ["webhooks"] }
|
||||
|
||||
[patch.crates-io]
|
||||
stripe-rust = { git = "https://github.com/margh/stripe-rs.git" }
|
||||
|
||||
@ -31,7 +31,7 @@ mod game;
|
||||
mod instance;
|
||||
mod item;
|
||||
mod mob;
|
||||
mod mtx;
|
||||
mod payments;
|
||||
mod names;
|
||||
mod net;
|
||||
mod player;
|
||||
|
||||
@ -1,121 +0,0 @@
|
||||
use uuid::Uuid;
|
||||
use actix_web::{web, HttpResponse};
|
||||
use actix::prelude::*;
|
||||
|
||||
use stripe::{Event, EventObject, CheckoutSession};
|
||||
|
||||
use net::{State, PgPool, MnmlError};
|
||||
|
||||
#[derive(Debug,Clone,Copy,Serialize,Deserialize)]
|
||||
enum Mtx {
|
||||
Subscription,
|
||||
Currency,
|
||||
}
|
||||
|
||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
||||
struct Order {
|
||||
account: Uuid,
|
||||
customer: String,
|
||||
}
|
||||
|
||||
fn process_stripe_checkout(session: CheckoutSession) -> Result<Vec<Mtx>, MnmlError> {
|
||||
let account = match session.client_reference_id {
|
||||
Some(a) => Uuid::parse_str(&a).or(Err(MnmlError::NoUser))?,
|
||||
None => {
|
||||
warn!("unknown user checkout {:?}", session);
|
||||
return Err(MnmlError::NoUser)
|
||||
},
|
||||
};
|
||||
|
||||
// checkout.session.completed
|
||||
// assign stripe customer_id to account
|
||||
|
||||
// if subscription
|
||||
// go get it
|
||||
// set account sub to active and end date
|
||||
|
||||
// if just bits purchase
|
||||
|
||||
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
pub struct PaymentProcessor {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl Actor for PaymentProcessor {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, _ctx: &mut Self::Context) {
|
||||
info!("PaymentProcessor started");
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _ctx: &mut Context<Self>) {
|
||||
println!("PaymentProcessor is stopped");
|
||||
}
|
||||
}
|
||||
|
||||
impl Supervised for PaymentProcessor {
|
||||
fn restarting(&mut self, _ctx: &mut Context<PaymentProcessor>) {
|
||||
warn!("PaymentProcessor restarting");
|
||||
}
|
||||
}
|
||||
|
||||
impl PaymentProcessor {
|
||||
pub fn new(pool: PgPool) -> PaymentProcessor {
|
||||
PaymentProcessor { pool }
|
||||
}
|
||||
}
|
||||
|
||||
struct StripeEvent(Event);
|
||||
impl Message for StripeEvent {
|
||||
type Result = Result<Vec<Mtx>, MnmlError>;
|
||||
}
|
||||
|
||||
impl Handler<StripeEvent> for PaymentProcessor {
|
||||
type Result = Result<Vec<Mtx>, MnmlError>;
|
||||
|
||||
fn handle(&mut self, msg: StripeEvent, _: &mut Context<Self>) -> Self::Result {
|
||||
let event = msg.0;
|
||||
match event.data.object {
|
||||
EventObject::CheckoutSession(s) => process_stripe_checkout(s),
|
||||
_ => {
|
||||
warn!("unhandled stripe event {:?}", event);
|
||||
Err(MnmlError::ServerError)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlError> {
|
||||
let event: Event = body.into_inner();
|
||||
info!("stripe event {:?}", event);
|
||||
state.payments.try_send(StripeEvent(event)).or(Err(MnmlError::ServerError))?;
|
||||
Ok(HttpResponse::Accepted().finish())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::prelude::*;
|
||||
use std::fs::File;
|
||||
|
||||
#[test]
|
||||
fn test_stripe_checkout() {
|
||||
let mut f = File::open("./test/checkout.session.completed.purchase.json").expect("couldn't open file");
|
||||
let mut checkout_str = String::new();
|
||||
f.read_to_string(&mut checkout_str)
|
||||
.expect("unable to read file");
|
||||
|
||||
let event: Event = serde_json::from_str(&checkout_str)
|
||||
.expect("could not deserialize");
|
||||
|
||||
let mtx = match event.data.object {
|
||||
EventObject::CheckoutSession(s) => process_stripe_checkout(s),
|
||||
_ => panic!("unknown event obj"),
|
||||
};
|
||||
|
||||
println!("got some fuckin bling {:?}", mtx);
|
||||
}
|
||||
}
|
||||
@ -16,9 +16,9 @@ use r2d2_postgres::{TlsMode, PostgresConnectionManager};
|
||||
use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams};
|
||||
use warden::{Warden};
|
||||
use pubsub::{PubSub, pg_listen};
|
||||
use ws::{WsEvent, connect};
|
||||
use ws::{connect};
|
||||
use account::{account_login, account_create, account_from_token, account_set_token};
|
||||
use mtx::{PaymentProcessor, post_stripe_event};
|
||||
use payments::{PaymentProcessor, post_stripe_event};
|
||||
|
||||
pub type Db = PooledConnection<PostgresConnectionManager>;
|
||||
pub type PgPool = Pool<PostgresConnectionManager>;
|
||||
@ -26,30 +26,26 @@ pub type PgPool = Pool<PostgresConnectionManager>;
|
||||
const DB_POOL_SIZE: u32 = 20;
|
||||
|
||||
#[derive(Fail, Debug)]
|
||||
pub enum MnmlError {
|
||||
pub enum MnmlHttpError {
|
||||
// User Facing Errors
|
||||
#[fail(display="internal server error")]
|
||||
ServerError,
|
||||
#[fail(display="unauthorized")]
|
||||
Unauthorized,
|
||||
#[fail(display="bad request")]
|
||||
BadRequest,
|
||||
|
||||
#[fail(display="missing user id")]
|
||||
NoUser,
|
||||
#[fail(display="unknown user")]
|
||||
UnknownUser,
|
||||
}
|
||||
|
||||
impl ResponseError for MnmlError {
|
||||
impl ResponseError for MnmlHttpError {
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
match *self {
|
||||
MnmlError::ServerError => HttpResponse::InternalServerError()
|
||||
MnmlHttpError::ServerError => HttpResponse::InternalServerError()
|
||||
.json(RpcErrorResponse { err: "server error".to_string() }),
|
||||
|
||||
MnmlError::BadRequest => HttpResponse::BadRequest()
|
||||
MnmlHttpError::BadRequest => HttpResponse::BadRequest()
|
||||
.json(RpcErrorResponse { err: "bad request ".to_string() }),
|
||||
|
||||
MnmlError::Unauthorized => HttpResponse::Unauthorized()
|
||||
MnmlHttpError::Unauthorized => HttpResponse::Unauthorized()
|
||||
.cookie(Cookie::build("x-auth-token", "")
|
||||
// .secure(secure)
|
||||
.http_only(true)
|
||||
@ -57,12 +53,6 @@ impl ResponseError for MnmlError {
|
||||
.max_age(-1) // 1 week aligns with db set
|
||||
.finish())
|
||||
.json(RpcErrorResponse { err: "unauthorized ".to_string() }),
|
||||
|
||||
MnmlError::UnknownUser => HttpResponse::BadRequest()
|
||||
.json(RpcErrorResponse { err: "unknown user".to_string() }),
|
||||
|
||||
MnmlError::NoUser => HttpResponse::BadRequest()
|
||||
.json(RpcErrorResponse { err: "missing user id".to_string() }),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -89,52 +79,52 @@ fn logout_res() -> HttpResponse {
|
||||
.finish()
|
||||
}
|
||||
|
||||
fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Result<HttpResponse, MnmlError> {
|
||||
let db = state.pool.get().or(Err(MnmlError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlError::ServerError))?;
|
||||
fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Result<HttpResponse, MnmlHttpError> {
|
||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
||||
|
||||
match account_login(¶ms.name, ¶ms.password, &mut tx) {
|
||||
Ok(token) => {
|
||||
tx.commit().or(Err(MnmlError::ServerError))?;
|
||||
tx.commit().or(Err(MnmlHttpError::ServerError))?;
|
||||
Ok(login_res(token, state.secure))
|
||||
},
|
||||
Err(e) => {
|
||||
info!("{:?}", e);
|
||||
Err(MnmlError::Unauthorized)
|
||||
Err(MnmlHttpError::Unauthorized)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlError> {
|
||||
fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlHttpError> {
|
||||
match r.cookie("x-auth-token") {
|
||||
Some(t) => {
|
||||
let db = state.pool.get().or(Err(MnmlError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlError::ServerError))?;
|
||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
||||
match account_from_token(t.value().to_string(), &mut tx) {
|
||||
Ok(a) => {
|
||||
account_set_token(&mut tx, &a).or(Err(MnmlError::Unauthorized))?;
|
||||
tx.commit().or(Err(MnmlError::ServerError))?;
|
||||
account_set_token(&mut tx, &a).or(Err(MnmlHttpError::Unauthorized))?;
|
||||
tx.commit().or(Err(MnmlHttpError::ServerError))?;
|
||||
return Ok(logout_res());
|
||||
},
|
||||
Err(_) => Err(MnmlError::Unauthorized),
|
||||
Err(_) => Err(MnmlHttpError::Unauthorized),
|
||||
}
|
||||
},
|
||||
None => Err(MnmlError::Unauthorized),
|
||||
None => Err(MnmlHttpError::Unauthorized),
|
||||
}
|
||||
}
|
||||
|
||||
fn register(state: web::Data<State>, params: web::Json::<AccountCreateParams>) -> Result<HttpResponse, MnmlError> {
|
||||
let db = state.pool.get().or(Err(MnmlError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlError::ServerError))?;
|
||||
fn register(state: web::Data<State>, params: web::Json::<AccountCreateParams>) -> Result<HttpResponse, MnmlHttpError> {
|
||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
||||
|
||||
match account_create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) {
|
||||
Ok(token) => {
|
||||
tx.commit().or(Err(MnmlError::ServerError))?;
|
||||
tx.commit().or(Err(MnmlHttpError::ServerError))?;
|
||||
Ok(login_res(token, state.secure))
|
||||
},
|
||||
Err(e) => {
|
||||
info!("{:?}", e);
|
||||
Err(MnmlError::BadRequest)
|
||||
Err(MnmlHttpError::BadRequest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
255
server/src/payments.rs
Normal file
255
server/src/payments.rs
Normal file
@ -0,0 +1,255 @@
|
||||
use uuid::Uuid;
|
||||
use actix_web::{web, HttpResponse};
|
||||
use actix::prelude::*;
|
||||
use postgres::transaction::Transaction;
|
||||
|
||||
use failure::Error;
|
||||
use failure::err_msg;
|
||||
|
||||
use stripe::{Event, EventObject, CheckoutSession};
|
||||
|
||||
use net::{State, PgPool, MnmlHttpError};
|
||||
|
||||
// Because the client_reference_id (account.id) is only included
|
||||
// in the stripe CheckoutSession object
|
||||
// we ensure that we store each object in pg with a link to the object
|
||||
// and to the account id in case of refunds
|
||||
#[derive(Debug,Clone,Serialize,Deserialize)]
|
||||
enum StripeData {
|
||||
Customer { account: Uuid, customer: String, checkout: String },
|
||||
Subscription { account: Uuid, customer: String, checkout: String, subscription: String, },
|
||||
|
||||
// i64 used because it converts to psql BIGINT
|
||||
// expecting a similar system to be used for eth amounts
|
||||
Purchase { account: Uuid, amount: i64, customer: String, checkout: String },
|
||||
}
|
||||
|
||||
impl StripeData {
|
||||
fn persist(&self, tx: &mut Transaction) -> Result<&StripeData, Error> {
|
||||
match self {
|
||||
StripeData::Customer { account, customer, checkout } => {
|
||||
tx.execute("
|
||||
INSERT into stripe_customers (account, customer, checkout)
|
||||
VALUES ($1, $2, $3);
|
||||
", &[&account, &customer, &checkout])?;
|
||||
info!("new stripe customer {:?}", self);
|
||||
Ok(self)
|
||||
},
|
||||
|
||||
StripeData::Subscription { account, customer, checkout, subscription } => {
|
||||
tx.execute("
|
||||
INSERT into stripe_subscriptions (account, customer, checkout, subscription)
|
||||
VALUES ($1, $2, $3, $4);
|
||||
", &[&account, &customer, &checkout, &subscription])?;
|
||||
info!("new stripe subscription {:?}", self);
|
||||
Ok(self)
|
||||
},
|
||||
|
||||
StripeData::Purchase { account, amount, customer, checkout } => {
|
||||
tx.execute("
|
||||
INSERT into stripe_purchases (account, customer, checkout, amount)
|
||||
VALUES ($1, $2, $3, $4);
|
||||
", &[&account, &customer, &checkout, amount])?;
|
||||
info!("new stripe purchase {:?}", self);
|
||||
Ok(self)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn side_effects(&self, tx: &mut Transaction) -> Result<&StripeData, Error> {
|
||||
match self {
|
||||
StripeData::Subscription { subscription, account, customer, checkout } => {
|
||||
// go get it
|
||||
// set account sub to active and end date
|
||||
Ok(self)
|
||||
},
|
||||
StripeData::Purchase { account, customer, amount, checkout } => {
|
||||
// create currency mtx and store
|
||||
Ok(self)
|
||||
},
|
||||
_ => Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fn stripe_checkout_mtx(session: CheckoutSession) -> Result<Vec<StripeData>, Error> {
|
||||
let account = match session.client_reference_id {
|
||||
Some(ref a) => Uuid::parse_str(a)?,
|
||||
None => {
|
||||
error!("unknown user checkout {:?}", session);
|
||||
return Err(err_msg("NoUser"))
|
||||
},
|
||||
};
|
||||
|
||||
let mut items = vec![];
|
||||
let customer = session.customer.ok_or(err_msg("UnknownCustomer"))?;
|
||||
let checkout = session.id;
|
||||
|
||||
items.push(StripeData::Customer { account, customer: customer.id().to_string(), checkout: checkout.to_string() });
|
||||
|
||||
if let Some(sub) = session.subscription {
|
||||
items.push(StripeData::Subscription {
|
||||
account,
|
||||
customer: customer.id().to_string(),
|
||||
checkout: checkout.to_string(),
|
||||
subscription: sub.id().to_string()
|
||||
});
|
||||
}
|
||||
|
||||
for item in session.display_items.into_iter() {
|
||||
let amount = item.amount.ok_or(err_msg("NoPricePurchase"))? as i64;
|
||||
items.push(StripeData::Purchase { account, amount, customer: customer.id().to_string(), checkout: checkout.to_string() });
|
||||
};
|
||||
|
||||
return Ok(items);
|
||||
}
|
||||
|
||||
pub struct PaymentProcessor {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl Actor for PaymentProcessor {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, _ctx: &mut Self::Context) {
|
||||
info!("PaymentProcessor started");
|
||||
}
|
||||
|
||||
fn stopped(&mut self, _ctx: &mut Context<Self>) {
|
||||
println!("PaymentProcessor is stopped");
|
||||
}
|
||||
}
|
||||
|
||||
impl Supervised for PaymentProcessor {
|
||||
fn restarting(&mut self, _ctx: &mut Context<PaymentProcessor>) {
|
||||
warn!("PaymentProcessor restarting");
|
||||
}
|
||||
}
|
||||
|
||||
impl PaymentProcessor {
|
||||
pub fn new(pool: PgPool) -> PaymentProcessor {
|
||||
PaymentProcessor { pool }
|
||||
}
|
||||
|
||||
fn process_stripe(&mut self, msg: StripeEvent) -> Result<Vec<StripeData>, Error> {
|
||||
let event = msg.0;
|
||||
match event.data.object {
|
||||
EventObject::CheckoutSession(s) => {
|
||||
let data = match stripe_checkout_mtx(s) {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let connection = self.pool.get()?;
|
||||
let mut tx = connection.transaction()?;
|
||||
|
||||
for item in data.iter() {
|
||||
item.side_effects(&mut tx)?;
|
||||
item.persist(&mut tx)?;
|
||||
}
|
||||
|
||||
tx.commit()?;
|
||||
Ok(data)
|
||||
},
|
||||
_ => {
|
||||
error!("unhandled stripe event {:?}", event);
|
||||
Err(err_msg("UnhanldedEvent"))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
struct StripeEvent(Event);
|
||||
impl Message for StripeEvent {
|
||||
type Result = Result<Vec<StripeData>, Error>;
|
||||
}
|
||||
|
||||
impl Handler<StripeEvent> for PaymentProcessor {
|
||||
type Result = Result<Vec<StripeData>, Error>;
|
||||
|
||||
fn handle(&mut self, msg: StripeEvent, _ctx: &mut Context<Self>) -> Self::Result {
|
||||
match self.process_stripe(msg) {
|
||||
Ok(data) => Ok(data),
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn post_stripe_event(state: web::Data<State>, body: web::Json::<Event>) -> Result<HttpResponse, MnmlHttpError> {
|
||||
let event: Event = body.into_inner();
|
||||
info!("stripe event {:?}", event);
|
||||
state.payments.try_send(StripeEvent(event)).or(Err(MnmlHttpError::ServerError))?;
|
||||
Ok(HttpResponse::Accepted().finish())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::prelude::*;
|
||||
use std::fs::File;
|
||||
|
||||
#[test]
|
||||
fn test_stripe_checkout_sku() {
|
||||
let mut f = File::open("./test/checkout.session.completed.purchase.json").expect("couldn't open file");
|
||||
let mut checkout_str = String::new();
|
||||
f.read_to_string(&mut checkout_str)
|
||||
.expect("unable to read file");
|
||||
|
||||
let event: Event = serde_json::from_str(&checkout_str)
|
||||
.expect("could not deserialize");
|
||||
|
||||
let data = match event.data.object {
|
||||
EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("purchase error"),
|
||||
_ => panic!("unknown event obj"),
|
||||
};
|
||||
|
||||
assert!(data.iter().any(|d| match d {
|
||||
StripeData::Customer { account: _, customer: _, checkout: _ } => true,
|
||||
_ => false,
|
||||
}));
|
||||
|
||||
assert!(data.iter().any(|d| match d {
|
||||
StripeData::Purchase { account: _, amount: _, customer: _, checkout: _ } => true,
|
||||
_ => false,
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stripe_checkout_subscription() {
|
||||
let mut f = File::open("./test/checkout.session.completed.subscription.json").expect("couldn't open file");
|
||||
let mut checkout_str = String::new();
|
||||
f.read_to_string(&mut checkout_str)
|
||||
.expect("unable to read file");
|
||||
|
||||
let event: Event = serde_json::from_str(&checkout_str)
|
||||
.expect("could not deserialize");
|
||||
|
||||
let data = match event.data.object {
|
||||
EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("subscription error"),
|
||||
_ => panic!("unknown event obj"),
|
||||
};
|
||||
|
||||
assert!(data.iter().any(|d| match d {
|
||||
StripeData::Customer { account: _, customer: _, checkout: _ } => true,
|
||||
_ => false,
|
||||
}));
|
||||
|
||||
assert!(data.iter().any(|d| match d {
|
||||
StripeData::Purchase { account: _, amount: _, customer: _, checkout: _ } => true,
|
||||
_ => false,
|
||||
}));
|
||||
|
||||
assert!(data.iter().any(|d| match d {
|
||||
StripeData::Subscription { account: _, customer: _, checkout: _, subscription: _, } => true,
|
||||
_ => false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
12
server/src/pg.rs
Normal file
12
server/src/pg.rs
Normal file
@ -0,0 +1,12 @@
|
||||
use r2d2::{Pool};
|
||||
use r2d2::{PooledConnection};
|
||||
use r2d2_postgres::{PostgresConnectionManager};
|
||||
|
||||
pub type Db = PooledConnection<PostgresConnectionManager>;
|
||||
pub type PgPool = Pool<PostgresConnectionManager>;
|
||||
|
||||
use postgres::transaction::Transaction;
|
||||
|
||||
pub trait Pg {
|
||||
fn persist(self, &mut Transaction) -> Self;
|
||||
}
|
||||
@ -42,7 +42,7 @@ impl Handler<WsEvent> for PubSub {
|
||||
}
|
||||
|
||||
pub fn pg_listen(connection: Db) -> Result<(), Error> {
|
||||
connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events");
|
||||
connection.execute("LISTEN events;", &[])?;
|
||||
info!("pubsub listening");
|
||||
let notifications = connection.notifications();
|
||||
let mut n_iter = notifications.blocking_iter();
|
||||
|
||||
@ -6,7 +6,7 @@ use actix::prelude::*;
|
||||
|
||||
use account::{Account, account_from_token};
|
||||
use serde_cbor::{to_vec};
|
||||
use net::{PgPool, MnmlError, State};
|
||||
use net::{PgPool, State, MnmlHttpError};
|
||||
|
||||
use rpc::{receive, RpcResult, RpcErrorResponse};
|
||||
|
||||
@ -119,11 +119,11 @@ impl Message for WsEvent {
|
||||
// the req obj itself which we need for cookies
|
||||
// the application state
|
||||
// and the websocket stream
|
||||
pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) -> Result<HttpResponse, MnmlError> {
|
||||
pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) -> Result<HttpResponse, MnmlHttpError> {
|
||||
let account: Option<Account> = match r.cookie("x-auth-token") {
|
||||
Some(t) => {
|
||||
let db = state.pool.get().or(Err(MnmlError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlError::ServerError))?;
|
||||
let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?;
|
||||
let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?;
|
||||
match account_from_token(t.value().to_string(), &mut tx) {
|
||||
Ok(a) => Some(a),
|
||||
Err(_) => None,
|
||||
@ -132,6 +132,6 @@ pub fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) ->
|
||||
None => None,
|
||||
};
|
||||
|
||||
// state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlError::ServerError))?;
|
||||
ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlError::ServerError))
|
||||
// state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlHttpError::ServerError))?;
|
||||
ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlHttpError::ServerError))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user