notifications init

This commit is contained in:
ntr 2019-06-16 17:47:45 +10:00
parent 102a8e9817
commit 7dcbe06766
6 changed files with 119 additions and 52 deletions

View File

@ -197,9 +197,10 @@ function createSocket(events) {
events.setItemInfo(info);
}
let pongTimeout;
function onPong() {
events.setPing(Date.now() - ping);
setTimeout(sendPing, 1000);
pongTimeout = setTimeout(sendPing, 1000);
}
// -------------
@ -253,41 +254,52 @@ function createSocket(events) {
return handlers[msgType](params);
}
// Connection opened
function onOpen() {
toast.info({
message: 'connected',
position: 'topRight',
});
sendPing();
sendItemInfo();
return true;
}
function onError(event) {
console.error('WebSocket error', event);
}
function onClose(event) {
console.error('WebSocket closed', event);
toast.warning({
message: 'disconnected',
position: 'topRight',
});
return setTimeout(connect, 5000);
}
function connect() {
if (ws) {
clearGameStateTimeout();
clearInstanceStateTimeout();
clearTimeout(pongTimeout);
ws.removeEventListener('open', onOpen);
ws.removeEventListener('message', onMessage);
ws.removeEventListener('error', onError);
ws.removeEventListener('close', onClose);
ws = null;
}
ws = new WebSocket(SOCKET_URL);
ws.binaryType = 'arraybuffer';
// Connection opened
ws.addEventListener('open', () => {
toast.info({
message: 'connected',
position: 'topRight',
});
sendPing();
sendItemInfo();
return true;
});
// Listen for messages
ws.addEventListener('open', onOpen);
ws.addEventListener('message', onMessage);
ws.addEventListener('error', (event) => {
console.error('WebSocket error', event);
// account = null;
// return setTimeout(connect, 5000);
});
ws.addEventListener('close', (event) => {
console.error('WebSocket closed', event);
toast.warning({
message: 'disconnected',
position: 'topRight',
});
return setTimeout(connect, 5000);
});
ws.addEventListener('error', onError);
ws.addEventListener('close', onClose);
return ws;
}

View File

@ -5,7 +5,7 @@ exports.up = async knex => {
table.string('name', 42).notNullable().unique();
table.string('password').notNullable();
table.string('token', 64).notNullable();
table.timestamp('token_expiry');
table.timestamp('token_expiry').notNullable();
table.index('name');
table.index('id');

View File

@ -0,0 +1,41 @@
const notify = `
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
DECLARE
record RECORD;
id UUID;
payload JSON;
BEGIN
IF (TG_OP = 'DELETE') THEN
id = OLD.id;
ELSE
id = NEW.id;
END IF;
payload = json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'id', id
);
PERFORM pg_notify('events', payload::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`;
const trigger = table => `
CREATE TRIGGER notify_${table}_event
AFTER INSERT OR UPDATE OR DELETE ON ${table}
FOR EACH ROW EXECUTE PROCEDURE notify_event();
`;
exports.up = async knex => {
await knex.raw(notify);
await knex.raw(trigger('accounts'));
await knex.raw(trigger('games'));
await knex.raw(trigger('instances'));
};
exports.down = async () => {};

View File

@ -70,28 +70,29 @@ pub fn account_create(name: &String, password: &String, code: &String, tx: &mut
let rounds = 8;
let password = hash(&password, rounds)?;
let mut rng = thread_rng();
let token: String = iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.take(64)
.collect();
let query = "
INSERT INTO accounts (id, name, password)
VALUES ($1, $2, $3)
INSERT INTO accounts (id, name, password, token, token_expiry)
VALUES ($1, $2, $3, $4, now() + interval '1 week')
RETURNING id, name;
";
let result = tx
.query(query, &[&id, &name, &password])?;
.query(query, &[&id, &name, &password, &token])?;
let returned = match result.iter().next() {
match result.iter().next() {
Some(row) => row,
None => return Err(err_msg("account not created")),
};
info!("registration account={:?}", name);
let account = Account {
id: returned.get(0),
name: returned.get(1),
};
account_set_token(tx, &account)
Ok(token)
}
pub fn account_login(name: &String, password: &String, tx: &mut Transaction) -> Result<String, Error> {

View File

@ -3,13 +3,15 @@ use std::env;
use serde_cbor::{to_vec};
use actix::prelude::*;
use actix_web::{middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer};
use actix_web::{middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web::middleware::cors::Cors;
use actix_web::error::ResponseError;
use actix_web::http::{StatusCode, Cookie};
use actix_web::cookie::{SameSite};
use actix_web_actors::ws;
use actix::prelude::*;
use actix::fut::ok;
use actix::fut::FutureResult;
use r2d2::{Pool};
use r2d2::{PooledConnection};
@ -107,7 +109,7 @@ impl MnmlSocket {
fn hb(&self, ctx: &mut <MnmlSocket as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
info!("Websocket Client heartbeat failed, disconnecting!");
info!("idle connection terminated");
// stop actor
ctx.stop();
@ -134,9 +136,20 @@ enum MnmlError {
impl ResponseError for MnmlError {
fn error_response(&self) -> HttpResponse {
match *self {
MnmlError::ServerError => HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR),
MnmlError::Unauthorized => HttpResponse::new(StatusCode::UNAUTHORIZED),
MnmlError::BadRequest => HttpResponse::new(StatusCode::BAD_REQUEST),
MnmlError::ServerError => HttpResponse::InternalServerError()
.json(RpcErrorResponse { err: "server error".to_string() }),
MnmlError::BadRequest => HttpResponse::BadRequest()
.json(RpcErrorResponse { err: "bad request ".to_string() }),
MnmlError::Unauthorized => HttpResponse::Unauthorized()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
.http_only(true)
.same_site(SameSite::Strict)
.max_age(-1) // 1 week aligns with db set
.finish())
.json(RpcErrorResponse { err: "unauthorized ".to_string() }),
}
}
}
@ -163,7 +176,7 @@ fn connect(r: HttpRequest, state: web::Data<State>, stream: web::Payload) -> Res
ws::start(MnmlSocket::new(state, account), &r, stream)
}
fn token_res(token: String, secure: bool) -> HttpResponse {
fn login_res(token: String, secure: bool) -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", token)
.secure(secure)
@ -174,7 +187,7 @@ fn token_res(token: String, secure: bool) -> HttpResponse {
.finish()
}
fn token_clear() -> HttpResponse {
fn logout_res() -> HttpResponse {
HttpResponse::Ok()
.cookie(Cookie::build("x-auth-token", "")
// .secure(secure)
@ -192,7 +205,7 @@ fn login(state: web::Data<State>, params: web::Json::<AccountLoginParams>) -> Re
match account_login(&params.name, &params.password, &mut tx) {
Ok(token) => {
tx.commit().or(Err(MnmlError::ServerError))?;
Ok(token_res(token, state.secure))
Ok(login_res(token, state.secure))
},
Err(e) => {
info!("{:?}", e);
@ -210,7 +223,7 @@ fn logout(r: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, MnmlE
Ok(a) => {
account_set_token(&mut tx, &a).or(Err(MnmlError::Unauthorized))?;
tx.commit().or(Err(MnmlError::ServerError))?;
return Ok(token_clear());
return Ok(logout_res());
},
Err(_) => Err(MnmlError::Unauthorized),
}
@ -226,7 +239,7 @@ fn register(state: web::Data<State>, params: web::Json::<AccountCreateParams>) -
match account_create(&params.name, &params.password, &params.code, &mut tx) {
Ok(token) => {
tx.commit().or(Err(MnmlError::ServerError))?;
Ok(token_res(token, state.secure))
Ok(login_res(token, state.secure))
},
Err(e) => {
info!("{:?}", e);

0
server/src/pubsub.rs Normal file
View File