From 50487422367cc61def89d952887f9ada61312afd Mon Sep 17 00:00:00 2001 From: ntr Date: Sun, 16 Jun 2019 22:40:15 +1000 Subject: [PATCH] pubsub --- server/src/main.rs | 1 + server/src/net.rs | 12 ++++++---- server/src/pubsub.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++ server/src/warden.rs | 22 +------------------ 4 files changed, 62 insertions(+), 25 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 5437670e..e35a6777 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -32,6 +32,7 @@ mod names; mod net; mod player; mod rpc; +mod pubsub; mod skill; mod effect; mod spec; diff --git a/server/src/net.rs b/server/src/net.rs index 1afee3ac..a24ff8be 100644 --- a/server/src/net.rs +++ b/server/src/net.rs @@ -1,6 +1,5 @@ use std::time::{Instant, Duration}; use std::env; -use std::thread::spawn; use serde_cbor::{to_vec}; @@ -18,6 +17,7 @@ use r2d2_postgres::{TlsMode, PostgresConnectionManager}; use rpc::{receive, RpcResult, RpcErrorResponse, AccountLoginParams, AccountCreateParams}; use warden::{Warden}; +use pubsub::PubSub; use account::{Account, account_login, account_create, account_from_token, account_set_token}; pub type Db = PooledConnection; @@ -267,8 +267,12 @@ pub fn start() { .expect("DATABASE_URL must be set"); let pool = create_pool(database_url); - let sys = System::new("warden"); - let warden_addr = Warden::new(pool.clone()).start(); + let sys = System::new("mnml"); + + Warden::new(pool.clone()).start(); + + let pubsub_conn = pool.get().expect("could not get pubsub pg connection"); + let pubsub_addr = Supervisor::start(move |_| PubSub::new(pubsub_conn)); match env::var("DEV_CORS") { Ok(_) => { @@ -294,5 +298,5 @@ pub fn start() { .service(web::resource("/ws/").route(web::get().to(connect)))) .bind("127.0.0.1:40000").expect("could not bind to port") .run().expect("could not start http server"), - } + }; } diff --git a/server/src/pubsub.rs b/server/src/pubsub.rs index e69de29b..d812fc4a 100644 --- a/server/src/pubsub.rs +++ b/server/src/pubsub.rs @@ -0,0 +1,52 @@ +// Db Commons +use fallible_iterator::{FallibleIterator}; + +use actix::prelude::*; + +use net::{Db}; + +pub struct PubSub { + connection: Db, +} + +impl Actor for PubSub { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.pg_listen(ctx); + self.check_events(ctx); + } +} + +impl Supervised for PubSub { + fn restarting(&mut self, ctx: &mut Context) { + warn!("pubsub restarting"); + } +} + +impl PubSub { + pub fn new(connection: Db) -> PubSub { + PubSub { connection } + } + + fn pg_listen(&mut self, ctx: &mut ::Context) { + self.connection.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); + info!("pubsub listening"); + } + + fn check_events(&mut self, ctx: &mut ::Context) { + let notifications = self.connection.notifications(); + let mut n_iter = notifications.iter(); + loop { + match n_iter.next() { + Ok(r) => { + if let Some(n) = r { + info!("{:?}", n); + } + } + Err(e) => warn!("{:?}", e), + } + } + } + +} diff --git a/server/src/warden.rs b/server/src/warden.rs index 07b179fd..8fa4a886 100644 --- a/server/src/warden.rs +++ b/server/src/warden.rs @@ -1,7 +1,6 @@ use std::time::{Instant, Duration}; // Db Commons -use fallible_iterator::{FallibleIterator, IntoFallibleIterator}; use postgres::transaction::Transaction; use failure::Error; @@ -53,7 +52,6 @@ impl Actor for Warden { fn started(&mut self, ctx: &mut Self::Context) { self.upkeep(ctx); - self.subscribe(ctx); } } @@ -68,6 +66,7 @@ impl Warden { // it starts the heartbeat interval and keepalive fn upkeep(&mut self, ctx: &mut ::Context) { ctx.run_interval(UPKEEP_INTERVAL, |act, _ctx| { + debug!("upkeep starting"); let db = act.pool.get().expect("warden could not get db connection"); fetch_games(db.transaction().expect("warden tx failure")) @@ -79,23 +78,4 @@ impl Warden { .commit().expect("warden instances commit failure"); }); } - - fn subscribe(&mut self, ctx: &mut ::Context) { - let db = self.pool.get().expect("warden could not get db connection"); - - db.execute("LISTEN events;", &[]).expect("unable to subscribe to pg events"); - let notifications = db.notifications(); - let mut n_iter = notifications.iter(); - - loop { - match n_iter.next() { - Ok(r) => { - if let Some(n) = r { - info!("{:?}", n); - } - } - Err(e) => warn!("{:?}", e), - } - } - } }