pubsub
This commit is contained in:
parent
5c081f8d9f
commit
5048742236
@ -32,6 +32,7 @@ mod names;
|
|||||||
mod net;
|
mod net;
|
||||||
mod player;
|
mod player;
|
||||||
mod rpc;
|
mod rpc;
|
||||||
|
mod pubsub;
|
||||||
mod skill;
|
mod skill;
|
||||||
mod effect;
|
mod effect;
|
||||||
mod spec;
|
mod spec;
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::thread::spawn;
|
|
||||||
|
|
||||||
use serde_cbor::{to_vec};
|
use serde_cbor::{to_vec};
|
||||||
|
|
||||||
@ -18,6 +17,7 @@ use r2d2_postgres::{TlsMode, PostgresConnectionManager};
|
|||||||
|
|
||||||
use rpc::{receive, RpcResult, RpcErrorResponse, AccountLoginParams, AccountCreateParams};
|
use rpc::{receive, RpcResult, RpcErrorResponse, AccountLoginParams, AccountCreateParams};
|
||||||
use warden::{Warden};
|
use warden::{Warden};
|
||||||
|
use pubsub::PubSub;
|
||||||
use account::{Account, account_login, account_create, account_from_token, account_set_token};
|
use account::{Account, account_login, account_create, account_from_token, account_set_token};
|
||||||
|
|
||||||
pub type Db = PooledConnection<PostgresConnectionManager>;
|
pub type Db = PooledConnection<PostgresConnectionManager>;
|
||||||
@ -267,8 +267,12 @@ pub fn start() {
|
|||||||
.expect("DATABASE_URL must be set");
|
.expect("DATABASE_URL must be set");
|
||||||
let pool = create_pool(database_url);
|
let pool = create_pool(database_url);
|
||||||
|
|
||||||
let sys = System::new("warden");
|
let sys = System::new("mnml");
|
||||||
let warden_addr = Warden::new(pool.clone()).start();
|
|
||||||
|
Warden::new(pool.clone()).start();
|
||||||
|
|
||||||
|
let pubsub_conn = pool.get().expect("could not get pubsub pg connection");
|
||||||
|
let pubsub_addr = Supervisor::start(move |_| PubSub::new(pubsub_conn));
|
||||||
|
|
||||||
match env::var("DEV_CORS") {
|
match env::var("DEV_CORS") {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
@ -294,5 +298,5 @@ pub fn start() {
|
|||||||
.service(web::resource("/ws/").route(web::get().to(connect))))
|
.service(web::resource("/ws/").route(web::get().to(connect))))
|
||||||
.bind("127.0.0.1:40000").expect("could not bind to port")
|
.bind("127.0.0.1:40000").expect("could not bind to port")
|
||||||
.run().expect("could not start http server"),
|
.run().expect("could not start http server"),
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,52 @@
|
|||||||
|
// Db Commons
|
||||||
|
use fallible_iterator::{FallibleIterator};
|
||||||
|
|
||||||
|
use actix::prelude::*;
|
||||||
|
|
||||||
|
use net::{Db};
|
||||||
|
|
||||||
|
pub struct PubSub {
|
||||||
|
connection: Db,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Actor for PubSub {
|
||||||
|
type Context = Context<Self>;
|
||||||
|
|
||||||
|
fn started(&mut self, ctx: &mut Self::Context) {
|
||||||
|
self.pg_listen(ctx);
|
||||||
|
self.check_events(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Supervised for PubSub {
|
||||||
|
fn restarting(&mut self, ctx: &mut Context<PubSub>) {
|
||||||
|
warn!("pubsub restarting");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PubSub {
|
||||||
|
pub fn new(connection: Db) -> PubSub {
|
||||||
|
PubSub { connection }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pg_listen(&mut self, ctx: &mut <PubSub as Actor>::Context) {
|
||||||
|
self.connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events");
|
||||||
|
info!("pubsub listening");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_events(&mut self, ctx: &mut <PubSub as Actor>::Context) {
|
||||||
|
let notifications = self.connection.notifications();
|
||||||
|
let mut n_iter = notifications.iter();
|
||||||
|
loop {
|
||||||
|
match n_iter.next() {
|
||||||
|
Ok(r) => {
|
||||||
|
if let Some(n) = r {
|
||||||
|
info!("{:?}", n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => warn!("{:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -1,7 +1,6 @@
|
|||||||
use std::time::{Instant, Duration};
|
use std::time::{Instant, Duration};
|
||||||
|
|
||||||
// Db Commons
|
// Db Commons
|
||||||
use fallible_iterator::{FallibleIterator, IntoFallibleIterator};
|
|
||||||
use postgres::transaction::Transaction;
|
use postgres::transaction::Transaction;
|
||||||
use failure::Error;
|
use failure::Error;
|
||||||
|
|
||||||
@ -53,7 +52,6 @@ impl Actor for Warden {
|
|||||||
|
|
||||||
fn started(&mut self, ctx: &mut Self::Context) {
|
fn started(&mut self, ctx: &mut Self::Context) {
|
||||||
self.upkeep(ctx);
|
self.upkeep(ctx);
|
||||||
self.subscribe(ctx);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,6 +66,7 @@ impl Warden {
|
|||||||
// it starts the heartbeat interval and keepalive
|
// it starts the heartbeat interval and keepalive
|
||||||
fn upkeep(&mut self, ctx: &mut <Warden as Actor>::Context) {
|
fn upkeep(&mut self, ctx: &mut <Warden as Actor>::Context) {
|
||||||
ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| {
|
ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| {
|
||||||
|
debug!("upkeep starting");
|
||||||
let db = act.pool.get().expect("warden could not get db connection");
|
let db = act.pool.get().expect("warden could not get db connection");
|
||||||
|
|
||||||
fetch_games(db.transaction().expect("warden tx failure"))
|
fetch_games(db.transaction().expect("warden tx failure"))
|
||||||
@ -79,23 +78,4 @@ impl Warden {
|
|||||||
.commit().expect("warden instances commit failure");
|
.commit().expect("warden instances commit failure");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe(&mut self, ctx: &mut <Warden as Actor>::Context) {
|
|
||||||
let db = self.pool.get().expect("warden could not get db connection");
|
|
||||||
|
|
||||||
db.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events");
|
|
||||||
let notifications = db.notifications();
|
|
||||||
let mut n_iter = notifications.iter();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match n_iter.next() {
|
|
||||||
Ok(r) => {
|
|
||||||
if let Some(n) = r {
|
|
||||||
info!("{:?}", n);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => warn!("{:?}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user