This commit is contained in:
ntr 2019-07-11 20:00:39 +10:00
parent e06f97252b
commit 7b50bfaffb
5 changed files with 87 additions and 32 deletions

View File

@ -12,6 +12,7 @@ use names::{name as generate_name};
use construct::{Construct, construct_recover, construct_spawn}; use construct::{Construct, construct_recover, construct_spawn};
use instance::{Instance, instance_delete}; use instance::{Instance, instance_delete};
use mtx::{Mtx, FREE_MTX}; use mtx::{Mtx, FREE_MTX};
use pg::Db;
use failure::Error; use failure::Error;
use failure::{err_msg, format_err}; use failure::{err_msg, format_err};
@ -48,7 +49,7 @@ pub fn select(tx: &mut Transaction, id: Uuid) -> Result<Account, Error> {
Ok(Account { id, name: row.get(1), credits, subscribed }) Ok(Account { id, name: row.get(1), credits, subscribed })
} }
pub fn from_token(tx: &mut Transaction, token: String) -> Result<Account, Error> { pub fn from_token(db: &Db, token: String) -> Result<Account, Error> {
let query = " let query = "
SELECT id, name, subscribed, credits SELECT id, name, subscribed, credits
FROM accounts FROM accounts
@ -56,7 +57,7 @@ pub fn from_token(tx: &mut Transaction, token: String) -> Result<Account, Error>
AND token_expiry > now(); AND token_expiry > now();
"; ";
let result = tx let result = db
.query(query, &[&token])?; .query(query, &[&token])?;
let row = result.iter().next() let row = result.iter().next()

View File

@ -25,6 +25,7 @@ extern crate persistent;
extern crate router; extern crate router;
extern crate cookie; extern crate cookie;
extern crate tungstenite; extern crate tungstenite;
extern crate crossbeam_channel;
mod account; mod account;
mod construct; mod construct;

View File

@ -137,7 +137,7 @@ fn login_res(token: String) -> Response {
let v = Cookie::build(TOKEN_HEADER, token) let v = Cookie::build(TOKEN_HEADER, token)
.http_only(true) .http_only(true)
.same_site(SameSite::Strict) .same_site(SameSite::Strict)
.max_age(Duration::seconds(-1)) // 1 week aligns with db set .max_age(Duration::weeks(1)) // 1 week aligns with db set
.finish(); .finish();
let mut res = Response::with(status::Ok); let mut res = Response::with(status::Ok);

View File

@ -66,7 +66,7 @@ enum RpcRequest {
VboxReclaim { instance_id: Uuid, index: usize }, VboxReclaim { instance_id: Uuid, index: usize },
} }
pub fn receive(data: Vec<u8>, db: &Db, _client: &mut WebSocket<TcpStream>, begin: Instant, account: Option<&Account>) -> Result<RpcResult, Error> { pub fn receive(data: Vec<u8>, db: &Db, _client: &mut WebSocket<TcpStream>, begin: Instant, account: &Option<Account>) -> Result<RpcResult, Error> {
// cast the msg to this type to receive method name // cast the msg to this type to receive method name
match from_slice::<RpcRequest>(&data) { match from_slice::<RpcRequest>(&data) {
Ok(v) => { Ok(v) => {

View File

@ -1,10 +1,16 @@
use std::time::{Instant}; use std::time::{Instant};
use std::net::{TcpListener}; use std::net::{TcpListener};
use std::thread::{spawn}; use std::thread::{spawn};
use std::str;
use cookie::Cookie;
use tungstenite::server::accept_hdr; use tungstenite::server::accept_hdr;
use tungstenite::Message::Binary; use tungstenite::Message::Binary;
use tungstenite::handshake::server::Request; use tungstenite::handshake::server::{Request, ErrorResponse};
use tungstenite::http::StatusCode;
use crossbeam_channel::{unbounded, Sender};
use failure::Error; use failure::Error;
use failure::err_msg; use failure::err_msg;
@ -12,8 +18,9 @@ use failure::err_msg;
use serde_cbor::{to_vec}; use serde_cbor::{to_vec};
use net::TOKEN_HEADER; use net::TOKEN_HEADER;
use rpc::{receive}; use rpc;
use pg::PgPool; use pg::PgPool;
use account;
#[derive(Debug,Clone,Serialize)] #[derive(Debug,Clone,Serialize)]
struct RpcError { struct RpcError {
@ -25,42 +32,88 @@ pub fn start(pool: PgPool) {
for stream in ws_server.incoming() { for stream in ws_server.incoming() {
let ws_pool = pool.clone(); let ws_pool = pool.clone();
spawn(move || { spawn(move || {
let (acc_s, acc_r) = unbounded();
let cb = |req: &Request| { let cb = |req: &Request| {
let token = req.headers.find_first(TOKEN_HEADER); let err = || ErrorResponse {
println!("{:?}", token); error_code: StatusCode::FORBIDDEN,
headers: None,
body: Some("Unauthorized".into()),
};
if let Some(cl) = req.headers.find_first("Cookie") {
let cookie_list = str::from_utf8(cl).or(Err(err()))?;
for s in cookie_list.split(";").map(|s| s.trim()) {
let cookie = Cookie::parse(s).or(Err(err()))?;
// got auth token
if cookie.name() == TOKEN_HEADER {
info!("{:?}", cookie.value().to_string());
acc_s.send(Some(cookie.value().to_string())).or(Err(err()))?;
}
};
};
acc_s.send(None).unwrap();
Ok(None) Ok(None)
}; };
let mut websocket = accept_hdr(stream.unwrap(), cb).unwrap(); let mut websocket = accept_hdr(stream.unwrap(), cb).unwrap();
let account = match acc_r.recv().unwrap() {
Some(t) => {
let db = ws_pool.get()
.expect("unable to get db connection");
match account::from_token(&db, t) {
Ok(a) => {
let state = to_vec(&rpc::RpcResult::AccountState(a.clone())).unwrap();
websocket.write_message(Binary(state)).unwrap();
Some(a)
},
Err(e) => {
warn!("{:?}", e);
return;
},
}
},
None => None,
};
loop { loop {
match websocket.read_message() { match websocket.read_message() {
Ok(msg) => { Ok(msg) => {
let begin = Instant::now(); match msg {
let db_connection = ws_pool.get() Binary(data) => {
.expect("unable to get db connection"); let begin = Instant::now();
let db_connection = ws_pool.get()
.expect("unable to get db connection");
let data = msg.into_data(); match rpc::receive(data, &db_connection, &mut websocket, begin, &account) {
match receive(data, &db_connection, &mut websocket, begin, None) { Ok(reply) => {
Ok(reply) => { let response = to_vec(&reply)
let response = to_vec(&reply) .expect("failed to serialize response");
.expect("failed to serialize response");
if let Err(e) = websocket.write_message(Binary(response)) { if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed // connection closed
debug!("{:?}", e); debug!("{:?}", e);
return; return;
}; };
},
Err(e) => {
warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response");
if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed
debug!("{:?}", e);
return;
};
}
}
}, },
Err(e) => { _ => (),
warn!("{:?}", e);
let response = to_vec(&RpcError { err: e.to_string() })
.expect("failed to serialize error response");
if let Err(e) = websocket.write_message(Binary(response)) {
// connection closed
debug!("{:?}", e);
return;
};
}
} }
}, },
// connection is closed // connection is closed