diff --git a/ops/migrations/20190624170147_stripe.js b/ops/migrations/20190624170147_stripe.js new file mode 100644 index 00000000..fc53b0a3 --- /dev/null +++ b/ops/migrations/20190624170147_stripe.js @@ -0,0 +1,73 @@ +// INSERT into stripe_customers (account, customer, checkout) +// INSERT into stripe_subscriptions (account, customer, checkout, subscription) +// INSERT into stripe_purchases (account, customer, checkout, amount) + +exports.up = async knex => { + await knex.schema.createTable('stripe_customers', table => { + table.string('customer', 64) + .primary(); + + table.uuid('account') + .notNullable() + .index(); + + table.foreign('account') + .references('id') + .inTable('accounts') + .onDelete('RESTRICT'); + + table.string('checkout', 64) + .notNullable() + .unique(); + + table.timestamps(true, true); + }); + + await knex.schema.createTable('stripe_subscriptions', table => { + table.string('subscription', 64) + .primary(); + + table.uuid('account') + .notNullable() + .index(); + + table.foreign('account') + .references('id') + .inTable('accounts') + .onDelete('RESTRICT'); + + table.string('customer', 64) + .notNullable(); + + table.timestamps(true, true); + }); + + await knex.schema.createTable('stripe_purchases', table => { + table.string('checkout', 64) + .primary(); + + table.uuid('account') + .notNullable() + .index(); + + table.foreign('account') + .references('id') + .inTable('accounts') + .onDelete('RESTRICT'); + + table.string('customer', 64) + .notNullable(); + + table.bigInteger('amount') + .notNullable(); + + table.timestamps(true, true); + }); + + await knex.schema.raw(` + ALTER TABLE stripe_purchases + ADD CHECK (amount > 0); + `); +}; + +exports.down = async () => {}; \ No newline at end of file diff --git a/server/Cargo.toml b/server/Cargo.toml index eaf3216f..6c9626eb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -32,6 +32,3 @@ actix-web-actors = "1.0.0" actix-cors = "0.1.0" stripe-rust = { version = "0.10", features = ["webhooks"] } - -[patch.crates-io] -stripe-rust = { git = "https://github.com/margh/stripe-rs.git" } diff --git a/server/src/main.rs b/server/src/main.rs index ff03e820..ee55392a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -31,7 +31,7 @@ mod game; mod instance; mod item; mod mob; -mod mtx; +mod payments; mod names; mod net; mod player; diff --git a/server/src/mtx.rs b/server/src/mtx.rs deleted file mode 100644 index 8d3d1c21..00000000 --- a/server/src/mtx.rs +++ /dev/null @@ -1,121 +0,0 @@ -use uuid::Uuid; -use actix_web::{web, HttpResponse}; -use actix::prelude::*; - -use stripe::{Event, EventObject, CheckoutSession}; - -use net::{State, PgPool, MnmlError}; - -#[derive(Debug,Clone,Copy,Serialize,Deserialize)] -enum Mtx { - Subscription, - Currency, -} - -#[derive(Debug,Clone,Serialize,Deserialize)] -struct Order { - account: Uuid, - customer: String, -} - -fn process_stripe_checkout(session: CheckoutSession) -> Result, MnmlError> { - let account = match session.client_reference_id { - Some(a) => Uuid::parse_str(&a).or(Err(MnmlError::NoUser))?, - None => { - warn!("unknown user checkout {:?}", session); - return Err(MnmlError::NoUser) - }, - }; - - // checkout.session.completed - // assign stripe customer_id to account - - // if subscription - // go get it - // set account sub to active and end date - - // if just bits purchase - - - Ok(vec![]) -} - -pub struct PaymentProcessor { - pool: PgPool, -} - -impl Actor for PaymentProcessor { - type Context = Context; - - fn started(&mut self, _ctx: &mut Self::Context) { - info!("PaymentProcessor started"); - } - - fn stopped(&mut self, _ctx: &mut Context) { - println!("PaymentProcessor is stopped"); - } -} - -impl Supervised for PaymentProcessor { - fn restarting(&mut self, _ctx: &mut Context) { - warn!("PaymentProcessor restarting"); - } -} - -impl PaymentProcessor { - pub fn new(pool: PgPool) -> PaymentProcessor { - PaymentProcessor { pool } - } -} - -struct StripeEvent(Event); -impl Message for StripeEvent { - type Result = Result, MnmlError>; -} - -impl Handler for PaymentProcessor { - type Result = Result, MnmlError>; - - fn handle(&mut self, msg: StripeEvent, _: &mut Context) -> Self::Result { - let event = msg.0; - match event.data.object { - EventObject::CheckoutSession(s) => process_stripe_checkout(s), - _ => { - warn!("unhandled stripe event {:?}", event); - Err(MnmlError::ServerError) - }, - } - } -} - -pub fn post_stripe_event(state: web::Data, body: web::Json::) -> Result { - let event: Event = body.into_inner(); - info!("stripe event {:?}", event); - state.payments.try_send(StripeEvent(event)).or(Err(MnmlError::ServerError))?; - Ok(HttpResponse::Accepted().finish()) -} - -#[cfg(test)] -mod tests { - use super::*; - use std::io::prelude::*; - use std::fs::File; - - #[test] - fn test_stripe_checkout() { - let mut f = File::open("./test/checkout.session.completed.purchase.json").expect("couldn't open file"); - let mut checkout_str = String::new(); - f.read_to_string(&mut checkout_str) - .expect("unable to read file"); - - let event: Event = serde_json::from_str(&checkout_str) - .expect("could not deserialize"); - - let mtx = match event.data.object { - EventObject::CheckoutSession(s) => process_stripe_checkout(s), - _ => panic!("unknown event obj"), - }; - - println!("got some fuckin bling {:?}", mtx); - } -} \ No newline at end of file diff --git a/server/src/net.rs b/server/src/net.rs index e8b64c17..6516a0b8 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -16,9 +16,9 @@ use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use rpc::{RpcErrorResponse, AccountLoginParams, AccountCreateParams}; use warden::{Warden}; use pubsub::{PubSub, pg_listen}; -use ws::{WsEvent, connect}; +use ws::{connect}; use account::{account_login, account_create, account_from_token, account_set_token}; -use mtx::{PaymentProcessor, post_stripe_event}; +use payments::{PaymentProcessor, post_stripe_event}; pub type Db = PooledConnection; pub type PgPool = Pool; @@ -26,30 +26,26 @@ pub type PgPool = Pool; const DB_POOL_SIZE: u32 = 20; #[derive(Fail, Debug)] -pub enum MnmlError { +pub enum MnmlHttpError { + // User Facing Errors #[fail(display="internal server error")] ServerError, #[fail(display="unauthorized")] Unauthorized, #[fail(display="bad request")] BadRequest, - - #[fail(display="missing user id")] - NoUser, - #[fail(display="unknown user")] - UnknownUser, } -impl ResponseError for MnmlError { +impl ResponseError for MnmlHttpError { fn error_response(&self) -> HttpResponse { match *self { - MnmlError::ServerError => HttpResponse::InternalServerError() + MnmlHttpError::ServerError => HttpResponse::InternalServerError() .json(RpcErrorResponse { err: "server error".to_string() }), - MnmlError::BadRequest => HttpResponse::BadRequest() + MnmlHttpError::BadRequest => HttpResponse::BadRequest() .json(RpcErrorResponse { err: "bad request ".to_string() }), - MnmlError::Unauthorized => HttpResponse::Unauthorized() + MnmlHttpError::Unauthorized => HttpResponse::Unauthorized() .cookie(Cookie::build("x-auth-token", "") // .secure(secure) .http_only(true) @@ -57,12 +53,6 @@ impl ResponseError for MnmlError { .max_age(-1) // 1 week aligns with db set .finish()) .json(RpcErrorResponse { err: "unauthorized ".to_string() }), - - MnmlError::UnknownUser => HttpResponse::BadRequest() - .json(RpcErrorResponse { err: "unknown user".to_string() }), - - MnmlError::NoUser => HttpResponse::BadRequest() - .json(RpcErrorResponse { err: "missing user id".to_string() }), } } } @@ -89,52 +79,52 @@ fn logout_res() -> HttpResponse { .finish() } -fn login(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlError::ServerError))?; +fn login(state: web::Data, params: web::Json::) -> Result { + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; match account_login(¶ms.name, ¶ms.password, &mut tx) { Ok(token) => { - tx.commit().or(Err(MnmlError::ServerError))?; + tx.commit().or(Err(MnmlHttpError::ServerError))?; Ok(login_res(token, state.secure)) }, Err(e) => { info!("{:?}", e); - Err(MnmlError::Unauthorized) + Err(MnmlHttpError::Unauthorized) } } } -fn logout(r: HttpRequest, state: web::Data) -> Result { +fn logout(r: HttpRequest, state: web::Data) -> Result { match r.cookie("x-auth-token") { Some(t) => { - let db = state.pool.get().or(Err(MnmlError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlError::ServerError))?; + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; match account_from_token(t.value().to_string(), &mut tx) { Ok(a) => { - account_set_token(&mut tx, &a).or(Err(MnmlError::Unauthorized))?; - tx.commit().or(Err(MnmlError::ServerError))?; + account_set_token(&mut tx, &a).or(Err(MnmlHttpError::Unauthorized))?; + tx.commit().or(Err(MnmlHttpError::ServerError))?; return Ok(logout_res()); }, - Err(_) => Err(MnmlError::Unauthorized), + Err(_) => Err(MnmlHttpError::Unauthorized), } }, - None => Err(MnmlError::Unauthorized), + None => Err(MnmlHttpError::Unauthorized), } } -fn register(state: web::Data, params: web::Json::) -> Result { - let db = state.pool.get().or(Err(MnmlError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlError::ServerError))?; +fn register(state: web::Data, params: web::Json::) -> Result { + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; match account_create(¶ms.name, ¶ms.password, ¶ms.code, &mut tx) { Ok(token) => { - tx.commit().or(Err(MnmlError::ServerError))?; + tx.commit().or(Err(MnmlHttpError::ServerError))?; Ok(login_res(token, state.secure)) }, Err(e) => { info!("{:?}", e); - Err(MnmlError::BadRequest) + Err(MnmlHttpError::BadRequest) } } } diff --git a/server/src/payments.rs b/server/src/payments.rs new file mode 100644 index 00000000..020b6347 --- /dev/null +++ b/server/src/payments.rs @@ -0,0 +1,255 @@ +use uuid::Uuid; +use actix_web::{web, HttpResponse}; +use actix::prelude::*; +use postgres::transaction::Transaction; + +use failure::Error; +use failure::err_msg; + +use stripe::{Event, EventObject, CheckoutSession}; + +use net::{State, PgPool, MnmlHttpError}; + +// Because the client_reference_id (account.id) is only included +// in the stripe CheckoutSession object +// we ensure that we store each object in pg with a link to the object +// and to the account id in case of refunds +#[derive(Debug,Clone,Serialize,Deserialize)] +enum StripeData { + Customer { account: Uuid, customer: String, checkout: String }, + Subscription { account: Uuid, customer: String, checkout: String, subscription: String, }, + + // i64 used because it converts to psql BIGINT + // expecting a similar system to be used for eth amounts + Purchase { account: Uuid, amount: i64, customer: String, checkout: String }, +} + +impl StripeData { + fn persist(&self, tx: &mut Transaction) -> Result<&StripeData, Error> { + match self { + StripeData::Customer { account, customer, checkout } => { + tx.execute(" + INSERT into stripe_customers (account, customer, checkout) + VALUES ($1, $2, $3); + ", &[&account, &customer, &checkout])?; + info!("new stripe customer {:?}", self); + Ok(self) + }, + + StripeData::Subscription { account, customer, checkout, subscription } => { + tx.execute(" + INSERT into stripe_subscriptions (account, customer, checkout, subscription) + VALUES ($1, $2, $3, $4); + ", &[&account, &customer, &checkout, &subscription])?; + info!("new stripe subscription {:?}", self); + Ok(self) + }, + + StripeData::Purchase { account, amount, customer, checkout } => { + tx.execute(" + INSERT into stripe_purchases (account, customer, checkout, amount) + VALUES ($1, $2, $3, $4); + ", &[&account, &customer, &checkout, amount])?; + info!("new stripe purchase {:?}", self); + Ok(self) + }, + } + } + + fn side_effects(&self, tx: &mut Transaction) -> Result<&StripeData, Error> { + match self { + StripeData::Subscription { subscription, account, customer, checkout } => { + // go get it + // set account sub to active and end date + Ok(self) + }, + StripeData::Purchase { account, customer, amount, checkout } => { + // create currency mtx and store + Ok(self) + }, + _ => Ok(self) + } + } + +} + +fn stripe_checkout_mtx(session: CheckoutSession) -> Result, Error> { + let account = match session.client_reference_id { + Some(ref a) => Uuid::parse_str(a)?, + None => { + error!("unknown user checkout {:?}", session); + return Err(err_msg("NoUser")) + }, + }; + + let mut items = vec![]; + let customer = session.customer.ok_or(err_msg("UnknownCustomer"))?; + let checkout = session.id; + + items.push(StripeData::Customer { account, customer: customer.id().to_string(), checkout: checkout.to_string() }); + + if let Some(sub) = session.subscription { + items.push(StripeData::Subscription { + account, + customer: customer.id().to_string(), + checkout: checkout.to_string(), + subscription: sub.id().to_string() + }); + } + + for item in session.display_items.into_iter() { + let amount = item.amount.ok_or(err_msg("NoPricePurchase"))? as i64; + items.push(StripeData::Purchase { account, amount, customer: customer.id().to_string(), checkout: checkout.to_string() }); + }; + + return Ok(items); +} + +pub struct PaymentProcessor { + pool: PgPool, +} + +impl Actor for PaymentProcessor { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + info!("PaymentProcessor started"); + } + + fn stopped(&mut self, _ctx: &mut Context) { + println!("PaymentProcessor is stopped"); + } +} + +impl Supervised for PaymentProcessor { + fn restarting(&mut self, _ctx: &mut Context) { + warn!("PaymentProcessor restarting"); + } +} + +impl PaymentProcessor { + pub fn new(pool: PgPool) -> PaymentProcessor { + PaymentProcessor { pool } + } + + fn process_stripe(&mut self, msg: StripeEvent) -> Result, Error> { + let event = msg.0; + match event.data.object { + EventObject::CheckoutSession(s) => { + let data = match stripe_checkout_mtx(s) { + Ok(data) => data, + Err(e) => { + error!("{:?}", e); + return Err(e); + } + }; + + let connection = self.pool.get()?; + let mut tx = connection.transaction()?; + + for item in data.iter() { + item.side_effects(&mut tx)?; + item.persist(&mut tx)?; + } + + tx.commit()?; + Ok(data) + }, + _ => { + error!("unhandled stripe event {:?}", event); + Err(err_msg("UnhanldedEvent")) + }, + } + } + +} + +struct StripeEvent(Event); +impl Message for StripeEvent { + type Result = Result, Error>; +} + +impl Handler for PaymentProcessor { + type Result = Result, Error>; + + fn handle(&mut self, msg: StripeEvent, _ctx: &mut Context) -> Self::Result { + match self.process_stripe(msg) { + Ok(data) => Ok(data), + Err(e) => { + error!("{:?}", e); + Err(e) + } + } + } +} + +pub fn post_stripe_event(state: web::Data, body: web::Json::) -> Result { + let event: Event = body.into_inner(); + info!("stripe event {:?}", event); + state.payments.try_send(StripeEvent(event)).or(Err(MnmlHttpError::ServerError))?; + Ok(HttpResponse::Accepted().finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::prelude::*; + use std::fs::File; + + #[test] + fn test_stripe_checkout_sku() { + let mut f = File::open("./test/checkout.session.completed.purchase.json").expect("couldn't open file"); + let mut checkout_str = String::new(); + f.read_to_string(&mut checkout_str) + .expect("unable to read file"); + + let event: Event = serde_json::from_str(&checkout_str) + .expect("could not deserialize"); + + let data = match event.data.object { + EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("purchase error"), + _ => panic!("unknown event obj"), + }; + + assert!(data.iter().any(|d| match d { + StripeData::Customer { account: _, customer: _, checkout: _ } => true, + _ => false, + })); + + assert!(data.iter().any(|d| match d { + StripeData::Purchase { account: _, amount: _, customer: _, checkout: _ } => true, + _ => false, + })); + } + + #[test] + fn test_stripe_checkout_subscription() { + let mut f = File::open("./test/checkout.session.completed.subscription.json").expect("couldn't open file"); + let mut checkout_str = String::new(); + f.read_to_string(&mut checkout_str) + .expect("unable to read file"); + + let event: Event = serde_json::from_str(&checkout_str) + .expect("could not deserialize"); + + let data = match event.data.object { + EventObject::CheckoutSession(s) => stripe_checkout_mtx(s).expect("subscription error"), + _ => panic!("unknown event obj"), + }; + + assert!(data.iter().any(|d| match d { + StripeData::Customer { account: _, customer: _, checkout: _ } => true, + _ => false, + })); + + assert!(data.iter().any(|d| match d { + StripeData::Purchase { account: _, amount: _, customer: _, checkout: _ } => true, + _ => false, + })); + + assert!(data.iter().any(|d| match d { + StripeData::Subscription { account: _, customer: _, checkout: _, subscription: _, } => true, + _ => false, + })); + } +} \ No newline at end of file diff --git a/server/src/pg.rs b/server/src/pg.rs new file mode 100644 index 00000000..fc416eb9 --- /dev/null +++ b/server/src/pg.rs @@ -0,0 +1,12 @@ +use r2d2::{Pool}; +use r2d2::{PooledConnection}; +use r2d2_postgres::{PostgresConnectionManager}; + +pub type Db = PooledConnection; +pub type PgPool = Pool; + +use postgres::transaction::Transaction; + +pub trait Pg { + fn persist(self, &mut Transaction) -> Self; +} diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index ecf8e063..efa009f6 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -42,7 +42,7 @@ impl Handler for PubSub { } pub fn pg_listen(connection: Db) -> Result<(), Error> { - connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); + connection.execute("LISTEN events;", &[])?; info!("pubsub listening"); let notifications = connection.notifications(); let mut n_iter = notifications.blocking_iter(); diff --git a/server/src/ws.rs b/server/src/ws.rs index 6cf71cba..844c2b7d 100644 --- a/server/src/ws.rs +++ b/server/src/ws.rs @@ -6,7 +6,7 @@ use actix::prelude::*; use account::{Account, account_from_token}; use serde_cbor::{to_vec}; -use net::{PgPool, MnmlError, State}; +use net::{PgPool, State, MnmlHttpError}; use rpc::{receive, RpcResult, RpcErrorResponse}; @@ -119,11 +119,11 @@ impl Message for WsEvent { // the req obj itself which we need for cookies // the application state // and the websocket stream -pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> Result { +pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> Result { let account: Option = match r.cookie("x-auth-token") { Some(t) => { - let db = state.pool.get().or(Err(MnmlError::ServerError))?; - let mut tx = db.transaction().or(Err(MnmlError::ServerError))?; + let db = state.pool.get().or(Err(MnmlHttpError::ServerError))?; + let mut tx = db.transaction().or(Err(MnmlHttpError::ServerError))?; match account_from_token(t.value().to_string(), &mut tx) { Ok(a) => Some(a), Err(_) => None, @@ -132,6 +132,6 @@ pub fn connect(r: HttpRequest, state: web::Data, stream: web::Payload) -> None => None, }; - // state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlError::ServerError))?; - ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlError::ServerError)) + // state.pubsub.try_send(WsEvent(account.clone())).or(Err(MnmlHttpError::ServerError))?; + ws::start(MnmlSocket::new(state, account), &r, stream).or(Err(MnmlHttpError::ServerError)) }