// mnml/server/src/payments.rs
//
// 375 lines
// 12 KiB
// Rust

use std::env;
use std::io::Read;
use http::State;
use iron::prelude::*;
use iron::status;
use persistent::Read as Readable;
use uuid::Uuid;
use postgres::transaction::Transaction;
use failure::Error;
use failure::err_msg;
use stripe::{Client, Event, EventObject, CheckoutSession, SubscriptionStatus, Subscription};
use http::{MnmlHttpError};
use pg::{Db, PgPool};
use account;
use account::Account;
/// Finds the account that owns the Stripe subscription `sub`.
///
/// Queries `stripe_subscriptions` by subscription id and returns the first
/// column (`account`) of the first matching row.
///
/// # Errors
///
/// Fails when the query itself fails, or with "user not subscribed" when no
/// row matches the subscription id.
pub fn subscription_account(tx: &mut Transaction, sub: String) -> Result<Uuid, Error> {
    // The statement text is kept flush-left so the SQL sent to postgres is
    // unchanged.
    let lookup = "
SELECT account, customer, checkout, subscription
FROM stripe_subscriptions
WHERE subscription = $1;
";
    let rows = tx.query(lookup, &[&sub])?;
    let first = rows.iter().next();

    match first {
        Some(found) => Ok(found.get(0)),
        None => Err(err_msg("user not subscribed")),
    }
}
pub fn subscription_ending(tx: &mut Transaction, client: &Client, account: &Account, ending: bool) -> Result<Option<Subscription>, Error> {
let query = "
SELECT account, customer, checkout, subscription
FROM stripe_subscriptions
WHERE account = $1;
";
let result = tx
.query(query, &[&account.id])?;
let row = result.iter().next()
.ok_or(err_msg("user not subscribed"))?;
let _customer: String = row.get(1);
let _checkout: String = row.get(2);
let subscription: String = row.get(3);
let id = subscription.parse()?;
let mut params = stripe::UpdateSubscription::new();
params.cancel_at_period_end = Some(ending);
match stripe::Subscription::update(client, &id, params) {
Ok(s) => {
info!("subscription changed ending={:?} account={:?} subscription={:?}", ending, account, s);
Ok(Some(s))
},
Err(e) => {
warn!("{:?}", e);
Err(err_msg("unable to update subscription"))
}
}
}
pub fn account_subscription(db: &Db, client: &Client, account: &Account) -> Result<Option<Subscription>, Error> {
let query = "
SELECT account, customer, checkout, subscription
FROM stripe_subscriptions
WHERE account = $1;
";
let result = db
.query(query, &[&account.id])?;
let row = match result.iter().next() {
Some(r) => r,
None => return Ok(None),
};
let _customer: String = row.get(1);
let _checkout: String = row.get(2);
let subscription: String = row.get(3);
let id = subscription.parse()?;
match stripe::Subscription::retrieve(client, &id, &[]) {
Ok(s) => Ok(Some(s)),
Err(e) => {
warn!("{:?}", e);
Err(err_msg("unable to retrieve subscription"))
}
}
}
// we use i64 because it is converted to BIGINT for pg
// and we can losslessly pull it into u32 which is big
// enough for the ballers
//
// Divisor applied to a purchase amount to get the number of credits when the
// amount is not one of the fixed bundles in `StripeData::side_effects`
// (presumably the price of one credit in cents, going by the name).
const CREDITS_COST_CENTS: i64 = 10;
// Credits granted by `StripeData::side_effects` when a new subscription
// record is processed.
const CREDITS_SUB_BONUS: i64 = 40;
/// Records extracted from a Stripe checkout session.
///
/// Because the client_reference_id (account.id) is only included in the
/// stripe CheckoutSession object, we ensure that we store each object in pg
/// with a link to the object and to the account id in case of refunds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StripeData {
    /// Ties an account to a Stripe customer id and the checkout session id.
    Customer {
        account: Uuid,
        customer: String,
        checkout: String,
    },
    /// Ties an account to a Stripe subscription id, together with the
    /// customer and checkout session ids it came from.
    Subscription {
        account: Uuid,
        customer: String,
        checkout: String,
        subscription: String,
    },
    /// A purchased amount for an account.
    ///
    /// i64 used because it converts to psql BIGINT;
    /// expecting a similar system to be used for eth amounts.
    Purchase {
        account: Uuid,
        amount: i64,
        customer: String,
        checkout: String,
    },
}
impl StripeData {
    /// Writes this record to the table matching its variant
    /// (`stripe_customers`, `stripe_subscriptions` or `stripe_purchases`)
    /// and logs the new record.
    ///
    /// Returns `self` so the call can be chained with `side_effects`.
    ///
    /// # Errors
    ///
    /// Fails when the INSERT statement fails.
    fn insert(&self, tx: &mut Transaction) -> Result<&StripeData, Error> {
        // The statement texts are kept flush-left so the SQL sent to
        // postgres is unchanged.
        match self {
            StripeData::Customer { account, customer, checkout } => {
                let statement = "
INSERT into stripe_customers (account, customer, checkout)
VALUES ($1, $2, $3);
";
                tx.execute(statement, &[account, customer, checkout])?;
                info!("new stripe customer {:?}", self);
            }
            StripeData::Subscription { account, customer, checkout, subscription } => {
                let statement = "
INSERT into stripe_subscriptions (account, customer, checkout, subscription)
VALUES ($1, $2, $3, $4);
";
                tx.execute(statement, &[account, customer, checkout, subscription])?;
                info!("new stripe subscription {:?}", self);
            }
            StripeData::Purchase { account, amount, customer, checkout } => {
                let statement = "
INSERT into stripe_purchases (account, customer, checkout, amount)
VALUES ($1, $2, $3, $4);
";
                tx.execute(statement, &[account, customer, checkout, amount])?;
                info!("new stripe purchase {:?}", self);
            }
        }

        Ok(self)
    }

    /// Applies the account changes that follow from this record.
    ///
    /// * `Subscription` - credits `CREDITS_SUB_BONUS` and marks the account
    ///   as subscribed.
    /// * `Purchase` - credits the account according to the purchased amount.
    /// * `Customer` - nothing to do.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the `account` module calls.
    fn side_effects(&self, tx: &mut Transaction) -> Result<&StripeData, Error> {
        match self {
            // when we get a subscription we just immediately set the user to be subbed
            // so we don't have to deal with going to fetch all the details from
            // stripe just to double check
            // update webhooks will tell us when the subscription changes
            // see the EventObject::Subscription handling in process_stripe_event
            StripeData::Subscription { account, .. } => {
                account::credit(tx, *account, CREDITS_SUB_BONUS)?;
                account::set_subscribed(tx, *account, true)?;
            }
            StripeData::Purchase { account, amount, .. } => {
                account::credit(tx, *account, StripeData::credits_for(*amount))?;
            }
            StripeData::Customer { .. } => {}
        }

        Ok(self)
    }

    /// Converts a purchased amount into credits.
    ///
    /// The four listed amounts map to fixed credit bundles; any other amount
    /// is divided by `CREDITS_COST_CENTS`.
    fn credits_for(amount: i64) -> i64 {
        match amount {
            500 => 50,
            1000 => 110,
            2000 => 250,
            5000 => 660,
            other => other
                .checked_div(CREDITS_COST_CENTS)
                .expect("credits cost 0"),
        }
    }
}
/// Breaks a Stripe checkout session into the records we persist.
///
/// The result always starts with a `StripeData::Customer`, followed by a
/// `StripeData::Subscription` when the session carries a subscription, and
/// one `StripeData::Purchase` per display item.
///
/// # Errors
///
/// * "NoUser" when the session has no `client_reference_id` (the session is
///   logged with `error!`);
/// * a parse error when `client_reference_id` is not a valid UUID;
/// * "UnknownCustomer" when the session has no customer;
/// * "NoPricePurchase" when a display item has no amount.
fn stripe_checkout_data(session: CheckoutSession) -> Result<Vec<StripeData>, Error> {
    let account = if let Some(ref reference) = session.client_reference_id {
        Uuid::parse_str(reference)?
    } else {
        error!("unknown user checkout {:?}", session);
        return Err(err_msg("NoUser"));
    };

    let customer = session.customer.ok_or_else(|| err_msg("UnknownCustomer"))?;
    let customer_id = customer.id().to_string();
    let checkout_id = session.id.to_string();

    let mut items = vec![StripeData::Customer {
        account,
        customer: customer_id.clone(),
        checkout: checkout_id.clone(),
    }];

    if let Some(sub) = session.subscription {
        items.push(StripeData::Subscription {
            account,
            customer: customer_id.clone(),
            checkout: checkout_id.clone(),
            subscription: sub.id().to_string(),
        });
    }

    for item in session.display_items {
        let amount = item.amount.ok_or_else(|| err_msg("NoPricePurchase"))? as i64;
        items.push(StripeData::Purchase {
            account,
            amount,
            customer: customer_id.clone(),
            checkout: checkout_id.clone(),
        });
    }

    Ok(items)
}
/// Applies one Stripe webhook event inside a single database transaction.
///
/// * `CheckoutSession` - the session is split into `StripeData` records;
///   each record is inserted and then has its side effects applied.
/// * `Subscription` - the owning account is looked up and its subscribed
///   flag is set to whether the subscription status is `Active`.
/// * anything else is logged and rejected.
///
/// The transaction is committed only when every step succeeded; every early
/// return happens before `commit` is reached.
///
/// Returns the id of the processed event.
///
/// # Errors
///
/// Fails when a connection or transaction cannot be obtained, when any of the
/// steps above fails, or with "UnhandledEvent" for unsupported event objects.
fn process_stripe_event(event: Event, pool: &PgPool) -> Result<String, Error> {
    info!("stripe event {:?}", event);

    let connection = pool.get()?;
    let mut tx = connection.transaction()?;

    match event.data.object {
        EventObject::CheckoutSession(session) => {
            // log the conversion failure here, then hand the same error back
            let records = stripe_checkout_data(session).map_err(|e| {
                error!("{:?}", e);
                e
            })?;

            for record in &records {
                record.insert(&mut tx)?.side_effects(&mut tx)?;
            }
        }
        // we only receive the cancelled and updated events
        // because the checkout object is needed to link
        // a sub to an account initially and
        // stripe doesn't guarantee the order
        // so this just checks if the sub is still active
        EventObject::Subscription(sub) => {
            let account = subscription_account(&mut tx, sub.id.to_string())?;
            let subbed = match sub.status {
                SubscriptionStatus::Active => true,
                _ => false,
            };
            account::set_subscribed(&mut tx, account, subbed)?;
        }
        _ => {
            error!("unhandled stripe event {:?}", event);
            return Err(err_msg("UnhandledEvent"));
        }
    }

    tx.commit()?;
    Ok(event.id.to_string())
}
/// Iron handler for the Stripe webhook endpoint.
///
/// When `STRIPE_WH_SECRET` is set, the raw request body and the
/// `Stripe-Signature` header are passed to `stripe::Webhook::construct_event`
/// together with the secret. When it is not set (dev server), the body is
/// parsed straight into an `Event` with no signature involved.
///
/// The event is then handed to `process_stripe_event`.
///
/// # Errors
///
/// * `MnmlHttpError::BadRequest` when the body cannot be read, the signature
///   header is missing, has no value or is not UTF-8, `construct_event`
///   fails, or (dev mode) the body does not parse into an `Event`;
/// * `MnmlHttpError::ServerError` when processing the event fails; the
///   underlying error is logged with `error!`.
pub fn stripe(req: &mut Request) -> IronResult<Response> {
    // Built on demand so the error value is only constructed on failure paths.
    let bad_request = || IronError::from(MnmlHttpError::BadRequest);

    let webhook_secret = env::var("STRIPE_WH_SECRET").ok();
    // NOTE(review): assumes `State` is always registered on the chain, so a
    // missing entry is treated as a wiring bug and panics - confirm.
    let state = req.get::<Readable<State>>().unwrap();

    let event = match webhook_secret {
        // prod mode
        Some(secret) => {
            let mut payload = String::new();
            req.body
                .read_to_string(&mut payload)
                .map_err(|_| bad_request())?;

            // `first()` instead of indexing: a header entry without any raw
            // value is rejected as a bad request rather than panicking.
            let raw_sig = req
                .headers
                .get_raw("Stripe-Signature")
                .and_then(|values| values.first())
                .cloned()
                .ok_or_else(|| bad_request())?;
            let sig = String::from_utf8(raw_sig).map_err(|_| bad_request())?;

            stripe::Webhook::construct_event(payload, sig, secret)
                .map_err(|_| bad_request())?
        }
        // dev server
        None => match req.get::<bodyparser::Struct<Event>>() {
            Ok(Some(parsed)) => parsed,
            _ => return Err(bad_request()),
        },
    };

    match process_stripe_event(event, &state.pool) {
        Ok(id) => {
            info!("event processed successfully {:?}", id);
            Ok(Response::with(status::Ok))
        }
        Err(e) => {
            error!("{:?}", e);
            Err(IronError::from(MnmlHttpError::ServerError))
        }
    }
}
pub fn stripe_client() -> Client {
let secret = env::var("STRIPE_SECRET")
.expect("STRIPE_SECRET must be set");
stripe::Client::new(secret)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;

    /// Reads a fixture, deserializes it as a Stripe `Event` and converts its
    /// checkout session into `StripeData` records.
    ///
    /// `failure` is the panic message used when the conversion itself fails.
    fn checkout_items(fixture: &str, failure: &str) -> Vec<StripeData> {
        let mut raw = String::new();
        File::open(fixture)
            .expect("couldn't open file")
            .read_to_string(&mut raw)
            .expect("unable to read file");

        let event: Event = serde_json::from_str(&raw)
            .expect("could not deserialize");

        match event.data.object {
            EventObject::CheckoutSession(session) => stripe_checkout_data(session).expect(failure),
            _ => panic!("unknown event obj"),
        }
    }

    /// True when at least one record is a `StripeData::Customer`.
    fn has_customer(items: &[StripeData]) -> bool {
        items.iter().any(|d| match d {
            StripeData::Customer { .. } => true,
            _ => false,
        })
    }

    /// True when at least one record is a `StripeData::Purchase`.
    fn has_purchase(items: &[StripeData]) -> bool {
        items.iter().any(|d| match d {
            StripeData::Purchase { .. } => true,
            _ => false,
        })
    }

    /// True when at least one record is a `StripeData::Subscription`.
    fn has_subscription(items: &[StripeData]) -> bool {
        items.iter().any(|d| match d {
            StripeData::Subscription { .. } => true,
            _ => false,
        })
    }

    #[test]
    fn test_stripe_checkout_sku() {
        let data = checkout_items(
            "./test/checkout.session.completed.purchase.json",
            "purchase error",
        );

        assert!(has_customer(&data));
        assert!(has_purchase(&data));
    }

    #[test]
    fn test_stripe_checkout_subscription() {
        let data = checkout_items(
            "./test/checkout.session.completed.subscription.json",
            "subscription error",
        );

        assert!(has_customer(&data));
        assert!(has_purchase(&data));
        assert!(has_subscription(&data));
    }
}