carnnnn cunt

This commit is contained in:
ntr 2019-07-25 23:59:46 +10:00
parent 066c61c227
commit 2aafa76f66
4 changed files with 6 additions and 187 deletions

View File

@ -30,10 +30,7 @@ bodyparser = "0.8"
persistent = "0.4" persistent = "0.4"
router = "0.6" router = "0.6"
cookie = "0.12" cookie = "0.12"
tungstenite = "0.8"
tokio-tungstenite = "0.8"
futures = "0.1"
tokio = "0.1"
crossbeam-channel = "0.3" crossbeam-channel = "0.3"
ws = "0.8"
stripe-rust = { version = "0.10.4", features = ["webhooks"] } stripe-rust = { version = "0.10.4", features = ["webhooks"] }

View File

@ -25,12 +25,7 @@ extern crate persistent;
extern crate router; extern crate router;
extern crate cookie; extern crate cookie;
extern crate futures; extern crate ws;
extern crate tokio;
extern crate tokio_tungstenite;
extern crate tungstenite;
extern crate crossbeam_channel; extern crate crossbeam_channel;
mod account; mod account;
@ -54,14 +49,12 @@ mod spec;
mod util; mod util;
mod vbox; mod vbox;
mod warden; mod warden;
mod ws; mod websocket;
use std::thread::{sleep, spawn}; use std::thread::{sleep, spawn};
use std::time::{Duration}; use std::time::{Duration};
use std::path::{Path}; use std::path::{Path};
use futures::Future;
use events::{start as events_start}; use events::{start as events_start};
use warden::warden; use warden::warden;
@ -110,11 +103,8 @@ fn main() {
let http_pool = pool.clone(); let http_pool = pool.clone();
spawn(move || net::start(http_pool)); spawn(move || net::start(http_pool));
// this should go on a thread too?
let ws_pool = pool.clone(); let ws_pool = pool.clone();
let wss = ws::server(ws_pool, events); websocket::start(ws_pool, events);
tokio::runtime::run(wss.map_err(|_e| ()));
info!("server started"); info!("server started");
} }

View File

@ -33,6 +33,8 @@ pub enum RpcMessage {
Pong(()), Pong(()),
DevResolutions(Resolutions), DevResolutions(Resolutions),
Error(String),
} }
#[derive(Debug,Clone,Serialize,Deserialize)] #[derive(Debug,Clone,Serialize,Deserialize)]

View File

@ -1,170 +0,0 @@
use std::time::{Instant};
use std::str;
use uuid::Uuid;
use cookie::Cookie;
use futures::stream::Stream;
use futures::Future;
use tokio::net::TcpListener;
use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::http::StatusCode;
use tungstenite::protocol::Message;
use tokio_tungstenite::accept_hdr_async;
use crossbeam_channel::{unbounded};
use serde_cbor::{to_vec};
use std::io::{Error, ErrorKind};
use futures::Sink;
use net::TOKEN_HEADER;
use rpc;
use rpc::{RpcMessage};
use mtx;
use pg::PgPool;
use account;
use events::{Events};
#[derive(Debug,Clone,Serialize)]
struct RpcError {
err: String,
}
pub fn server(pool: PgPool, events: Events) -> impl Future<Item = (), Error = Error> {
let addr = "127.0.0.1:40055".parse().unwrap();
let ws_server = TcpListener::bind(&addr).unwrap();
let wss = ws_server.incoming().for_each(move |stream| {
let ws_pool = pool.clone();
let events_tx = events.tx.clone();
let (acc_s, acc_r) = unbounded();
// search through the ws request for the auth cookie
let cb = move |req: &Request| {
let err = || ErrorResponse {
error_code: StatusCode::FORBIDDEN,
headers: None,
body: Some("Unauthorized".into()),
};
if let Some(cl) = req.headers.find_first("Cookie") {
let cookie_list = str::from_utf8(cl).or(Err(err()))?;
for s in cookie_list.split(";").map(|s| s.trim()) {
let cookie = Cookie::parse(s).or(Err(err()))?;
// got auth token
if cookie.name() == TOKEN_HEADER {
acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?;
}
};
};
acc_s.send(None).unwrap();
Ok(None)
};
accept_hdr_async(stream, cb).and_then(move |ws_stream| {
info!("new connection");
let (mut sink, stream) = ws_stream.split();
// get a copy of the account
let account = match acc_r.recv().unwrap() {
Some(t) => {
let db = ws_pool.get()
.expect("unable to get db connection");
match account::from_token(&db, t) {
Ok(a) => {
let state = to_vec(&rpc::RpcMessage::AccountState(a.clone())).unwrap();
sink.start_send(Message::Binary(state)).unwrap();
let mut tx = db.transaction().unwrap();
let shop = mtx::account_shop(&mut tx, &a).unwrap();
let shop = to_vec(&rpc::RpcMessage::AccountShop(shop)).unwrap();
sink.start_send(Message::Binary(shop)).unwrap();
// tx doesn't change anything
tx.commit().unwrap();
Some(a)
},
Err(e) => {
warn!("{:?}", e);
None
},
}
},
None => None,
};
let (tx, rx) = unbounded();
events_tx.try_send(WsMessage::Connect(tx)).unwrap();
let message_reader = stream
.filter(|msg| msg.is_binary())
.and_then(|message: Message| {
let begin = Instant::now();
let db_connection = ws_pool.get()
.expect("unable to get db connection");
match rpc::receive(message.into_data(), &db_connection, begin, &account) {
Ok(reply) => {
let response = to_vec(&reply)
.expect("failed to serialize response");
Ok(response);
},
Err(e) => {
warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response");
Ok(response);
}
};
Ok(sink)
});
let events_reader = rx.map(|message: RpcMessage| {
let response = to_vec(&message)
.expect("failed to serialize event");
Ok(response);
});
let combined = message_reader.select(events_reader);
let outgoing = combined.fold(sink, |mut sink, message| {
sink.start_send(Message::Binary(message)).unwrap();
Ok(sink)
});
// if let Err(e) = sink.start_send(Message::Binary(response)) {
// // connection closed
// warn!("{:?}", e);
// return Err(e);
// };
tokio::spawn(outgoing.then(move |_| {
info!("connection closed");
Ok(())
}));
Ok(())
})
.map_err(|e| {
warn!("Error during the websocket handshake occurred: {}", e);
Error::new(ErrorKind::Other, e)
})
});
return wss;
}