This commit is contained in:
ntr 2018-09-03 01:14:05 +10:00
parent 0012b1b585
commit 9212cff67a
6 changed files with 87 additions and 546 deletions

View File

@ -4,14 +4,10 @@ version = "0.1.0"
authors = ["ntr <ntr@smokestack.io>"]
[dependencies]
rand = "0.5.4"
rand = "0.5"
uuid = { version = "0.5", features = ["serde", "v4"] }
tokio = "0.1"
tokio-core = "0.1"
futures = "0.1"
bytes = "0.4"
serde = "1"
serde_derive = "1"
serde_cbor = "0.9"
tarpc = "0.12.0"
tarpc-plugins = "0.4.0"
ws = "*"
env_logger = "*"

11
DIARY.md Executable file
View File

@ -0,0 +1,11 @@
## 02-09-2018
* went full circle through the last 20 years of the web's problems
* debated using vanilla tcp sockets but realised would be very time consuming
* Struggled a lot with google/tarpc
found the documentation absolutely mad, macros perform most of the functionality
couldn't find any way to keep server running, client stub appears to direcly rely on the server structs
needed a specific version of the rust nightly from several months ago to compile
* found wa-rs, hope was restored, had a websocket server up and running in seconds
* lost hope again when its client doesn't compile into wasm due to unix dependencies in mio
this also prevents any tokio based futures client from working
* realised i'd been reading very out of date documentation and there was plenty of work happening on `stdweb` and `cargo web`

View File

@ -1,474 +0,0 @@
//! A chat server that broadcasts a message to all connections.
//!
//! This example is explicitly more verbose than it has to be. This is to
//! illustrate more concepts.
//!
//! A chat server for telnet clients. After a telnet client connects, the first
//! line should contain the client's name. After that, all lines sent by a
//! client are broadcasted to all other connected clients.
//!
//! Because the client is telnet, lines are delimited by "\r\n".
//!
//! You can test this out by running:
//!
//! cargo run --example chat
//!
//! And then in another terminal run:
//!
//! telnet localhost 6142
//!
//! You can run the `telnet` command in any number of additional windows.
//!
//! You can run the second command in multiple windows and then chat between the
//! two, seeing the messages from the other client as they're received. For all
//! connected clients they'll all join the same room and see everyone else's
//! messages.
#![deny(warnings)]
extern crate tokio;
#[macro_use]
extern crate futures;
extern crate bytes;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use futures::sync::mpsc;
use futures::future::{self, Either};
use bytes::{BytesMut, Bytes, BufMut};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<Bytes>;
/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<Bytes>;
/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients. Whenever a
/// message is received from a client, it is broadcasted to all peers by
/// iterating over the `peers` entries and sending a copy of the message on each
/// `Tx`.
struct Shared {
peers: HashMap<SocketAddr, Tx>,
}
/// The state for each connected client.
struct Peer {
/// Name of the peer.
///
/// When a client connects, the first line sent is treated as the client's
/// name (like alice or bob). The name is used to preface all messages that
/// arrive from the client so that we can simulate a real chat server:
///
/// ```text
/// alice: Hello everyone.
/// bob: Welcome to telnet chat!
/// ```
name: BytesMut,
/// The TCP socket wrapped with the `Lines` codec, defined below.
///
/// This handles sending and receiving data on the socket. When using
/// `Lines`, we can work at the line level instead of having to manage the
/// raw byte operations.
lines: Lines,
/// Handle to the shared chat state.
///
/// This is used to broadcast messages read off the socket to all connected
/// peers.
state: Arc<Mutex<Shared>>,
/// Receive half of the message channel.
///
/// This is used to receive messages from peers. When a message is received
/// off of this `Rx`, it will be written to the socket.
rx: Rx,
/// Client socket address.
///
/// The socket address is used as the key in the `peers` HashMap. The
/// address is saved so that the `Peer` drop implementation can clean up its
/// entry.
addr: SocketAddr,
}
/// Line based codec
///
/// This decorates a socket and presents a line based read / write interface.
///
/// As a user of `Lines`, we can focus on working at the line level. So, we send
/// and receive values that represent entire lines. The `Lines` codec will
/// handle the encoding and decoding as well as reading from and writing to the
/// socket.
#[derive(Debug)]
struct Lines {
/// The TCP socket.
socket: TcpStream,
/// Buffer used when reading from the socket. Data is not returned from this
/// buffer until an entire line has been read.
rd: BytesMut,
/// Buffer used to stage data before writing it to the socket.
wr: BytesMut,
}
impl Shared {
/// Create a new, empty, instance of `Shared`.
fn new() -> Self {
Shared {
peers: HashMap::new(),
}
}
}
impl Peer {
/// Create a new instance of `Peer`.
fn new(name: BytesMut,
state: Arc<Mutex<Shared>>,
lines: Lines) -> Peer
{
// Get the client socket address
let addr = lines.socket.peer_addr().unwrap();
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
state.lock().unwrap()
.peers.insert(addr, tx);
Peer {
name,
lines,
state,
rx,
addr,
}
}
}
/// This is where a connected client is managed.
///
/// A `Peer` is also a future representing completely processing the client.
///
/// When a `Peer` is created, the first line (representing the client's name)
/// has already been read. When the socket closes, the `Peer` future completes.
///
/// While processing, the peer future implementation will:
///
/// 1) Receive messages on its message channel and write them to the socket.
/// 2) Receive messages from the socket and broadcast them to all peers.
///
impl Future for Peer {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
// Tokio (and futures) use cooperative scheduling without any
// preemption. If a task never yields execution back to the executor,
// then other tasks may be starved.
//
// To deal with this, robust applications should not have any unbounded
// loops. In this example, we will read at most `LINES_PER_TICK` lines
// from the client on each tick.
//
// If the limit is hit, the current task is notified, informing the
// executor to schedule the task again asap.
const LINES_PER_TICK: usize = 10;
// Receive all messages from peers.
for i in 0..LINES_PER_TICK {
// Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
// safe.
match self.rx.poll().unwrap() {
Async::Ready(Some(v)) => {
// Buffer the line. Once all lines are buffered, they will
// be flushed to the socket (right below).
self.lines.buffer(&v);
// If this is the last iteration, the loop will break even
// though there could still be lines to read. Because we did
// not reach `Async::NotReady`, we have to notify ourselves
// in order to tell the executor to schedule the task again.
if i+1 == LINES_PER_TICK {
task::current().notify();
}
}
_ => break,
}
}
// Flush the write buffer to the socket
let _ = self.lines.poll_flush()?;
// Read new lines from the socket
while let Async::Ready(line) = self.lines.poll()? {
println!("Received line ({:?}) : {:?}", self.name, line);
if let Some(message) = line {
// Append the peer's name to the front of the line:
let mut line = self.name.clone();
line.extend_from_slice(b": ");
line.extend_from_slice(&message);
line.extend_from_slice(b"\r\n");
// We're using `Bytes`, which allows zero-copy clones (by
// storing the data in an Arc internally).
//
// However, before cloning, we must freeze the data. This
// converts it from mutable -> immutable, allowing zero copy
// cloning.
let line = line.freeze();
// Now, send the line to all other peers
for (addr, tx) in &self.state.lock().unwrap().peers {
// Don't send the message to ourselves
if *addr != self.addr {
// The send only fails if the rx half has been dropped,
// however this is impossible as the `tx` half will be
// removed from the map before the `rx` is dropped.
tx.unbounded_send(line.clone()).unwrap();
}
}
} else {
// EOF was reached. The remote client has disconnected. There is
// nothing more to do.
return Ok(Async::Ready(()));
}
}
// As always, it is important to not just return `NotReady` without
// ensuring an inner future also returned `NotReady`.
//
// We know we got a `NotReady` from either `self.rx` or `self.lines`, so
// the contract is respected.
Ok(Async::NotReady)
}
}
impl Drop for Peer {
fn drop(&mut self) {
self.state.lock().unwrap().peers
.remove(&self.addr);
}
}
impl Lines {
/// Create a new `Lines` codec backed by the socket
fn new(socket: TcpStream) -> Self {
Lines {
socket,
rd: BytesMut::new(),
wr: BytesMut::new(),
}
}
/// Buffer a line.
///
/// This writes the line to an internal buffer. Calls to `poll_flush` will
/// attempt to flush this buffer to the socket.
fn buffer(&mut self, line: &[u8]) {
// Ensure the buffer has capacity. Ideally this would not be unbounded,
// but to keep the example simple, we will not limit this.
self.wr.reserve(line.len());
// Push the line onto the end of the write buffer.
//
// The `put` function is from the `BufMut` trait.
self.wr.put(line);
}
/// Flush the write buffer to the socket
fn poll_flush(&mut self) -> Poll<(), io::Error> {
// As long as there is buffered data to write, try to write it.
while !self.wr.is_empty() {
// Try to write some bytes to the socket
let n = try_ready!(self.socket.poll_write(&self.wr));
// As long as the wr is not empty, a successful write should
// never write 0 bytes.
assert!(n > 0);
// This discards the first `n` bytes of the buffer.
let _ = self.wr.split_to(n);
}
Ok(Async::Ready(()))
}
/// Read data from the socket.
///
/// This only returns `Ready` when the socket has closed.
fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
loop {
// Ensure the read buffer has capacity.
//
// This might result in an internal allocation.
self.rd.reserve(1024);
// Read data into the buffer.
let n = try_ready!(self.socket.read_buf(&mut self.rd));
if n == 0 {
return Ok(Async::Ready(()));
}
}
}
}
impl Stream for Lines {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// First, read any new data that might have been received off the socket
let sock_closed = self.fill_read_buf()?.is_ready();
// Now, try finding lines
let pos = self.rd.windows(2).enumerate()
.find(|&(_, bytes)| bytes == b"\r\n")
.map(|(i, _)| i);
if let Some(pos) = pos {
// Remove the line from the read buffer and set it to `line`.
let mut line = self.rd.split_to(pos + 2);
// Drop the trailing \r\n
line.split_off(pos);
// Return the line
return Ok(Async::Ready(Some(line)));
}
if sock_closed {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
}
}
}
/// Spawn a task to manage the socket.
///
/// This will read the first line from the socket to identify the client, then
/// add the client to the set of connected peers in the chat service.
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// Wrap the socket with the `Lines` codec that we wrote above.
//
// By doing this, we can operate at the line level instead of doing raw byte
// manipulation.
let lines = Lines::new(socket);
// The first line is treated as the client's name. The client is not added
// to the set of connected peers until this line is received.
//
// We use the `into_future` combinator to extract the first item from the
// lines stream. `into_future` takes a `Stream` and converts it to a future
// of `(first, rest)` where `rest` is the original stream instance.
let connection = lines.into_future()
// `into_future` doesn't have the right error type, so map the error to
// make it work.
.map_err(|(e, _)| e)
// Process the first received line as the client's name.
.and_then(|(name, lines)| {
// If `name` is `None`, then the client disconnected without
// actually sending a line of data.
//
// Since the connection is closed, there is no further work that we
// need to do. So, we just terminate processing by returning
// `future::ok()`.
//
// The problem is that only a single future type can be returned
// from a combinator closure, but we want to return both
// `future::ok()` and `Peer` (below).
//
// This is a common problem, so the `futures` crate solves this by
// providing the `Either` helper enum that allows creating a single
// return type that covers two concrete future types.
let name = match name {
Some(name) => name,
None => {
// The remote client closed the connection without sending
// any data.
return Either::A(future::ok(()));
}
};
println!("`{:?}` is joining the chat", name);
// Create the peer.
//
// This is also a future that processes the connection, only
// completing when the socket closes.
let peer = Peer::new(
name,
state,
lines);
// Wrap `peer` with `Either::B` to make the return type fit.
Either::B(peer)
})
// Task futures have an error of type `()`, this ensures we handle the
// error. We do this by printing the error to STDOUT.
.map_err(|e| {
println!("connection error = {:?}", e);
});
// Spawn the task. Internally, this submits the task to a thread pool.
tokio::spawn(connection);
}
pub fn main() {
// Create the shared state. This is how all the peers communicate.
//
// The server task will hold a handle to this. For every new client, the
// `state` handle is cloned and passed into the task that processes the
// client connection.
let state = Arc::new(Mutex::new(Shared::new()));
let addr = "127.0.0.1:6142".parse().unwrap();
// Bind a TCP listener to the socket address.
//
// Note that this is the Tokio TcpListener, which is fully async.
let listener = TcpListener::bind(&addr).unwrap();
// The server task asynchronously iterates over and processes each
// incoming connection.
let server = listener.incoming().for_each(move |socket| {
// Spawn a task to process the connection
process(socket, state.clone());
Ok(())
})
.map_err(|err| {
// All tasks must have an `Error` type of `()`. This forces error
// handling and helps avoid silencing failures.
//
// In our example, we are only going to log the error to STDOUT.
println!("accept error = {:?}", err);
});
println!("server running on localhost:6142");
// Start the Tokio runtime.
//
// The Tokio is a pre-configured "out of the box" runtime for building
// asynchronous applications. It includes both a reactor and a task
// scheduler. This means applications are multithreaded by default.
//
// This function blocks until the runtime reaches an idle state. Idle is
// defined as all spawned tasks have completed and all I/O resources (TCP
// sockets in our case) have been dropped.
//
// In our example, we have not defined a shutdown strategy, so this will
// block until `ctrl-c` is pressed at the terminal.
tokio::run(server);
}

View File

@ -1,26 +1,72 @@
// needed by tarpc until it becomes stable
#![feature(plugin, use_extern_macros, proc_macro_path_invoc)]
#![plugin(tarpc_plugins)]
extern crate tokio_core;
#[macro_use] extern crate futures;
#[macro_use] extern crate tarpc;
extern crate rand;
extern crate uuid;
extern crate ws;
extern crate env_logger;
extern crate serde;
extern crate serde_cbor;
#[macro_use] extern crate serde_derive;
extern crate rand;
extern crate uuid;
#[macro_use]
extern crate serde_derive;
mod cryp;
mod combat;
mod battle;
mod skill;
mod net;
mod combat;
mod skill;
use net::run_server;
use std::rc::Rc;
use std::cell::Cell;
use ws::{listen, Handler, Sender, Result, Message, Handshake, CloseCode, Error};
use net::{generate};
struct Server {
out: Sender,
count: Rc<Cell<u32>>,
}
impl Handler for Server {
fn on_open(&mut self, _: Handshake) -> Result<()> {
// We have a new connection, so we increment the connection counter
println!("somebody joined");
Ok(self.count.set(self.count.get() + 1))
}
fn on_message(&mut self, msg: Message) -> Result<()> {
// Tell the user the current count
println!("The number of live connections is {}", self.count.get());
let reply = Message::binary(generate());
// Echo the message back
self.out.send(reply)
}
fn on_close(&mut self, code: CloseCode, reason: &str) {
match code {
CloseCode::Normal => println!("The client is done with the connection."),
CloseCode::Away => println!("The client is leaving the site."),
CloseCode::Abnormal => println!(
"Closing handshake failed! Unable to obtain closing status from client."),
_ => println!("The client encountered an error: {}", reason),
}
// The connection is going down, so we need to decrement the count
self.count.set(self.count.get() - 1)
}
fn on_error(&mut self, err: Error) {
println!("The server encountered an error: {:?}", err);
}
}
fn main() {
run_server()
// Cell gives us interior mutability so we can increment
// or decrement the count between handlers.
// Rc is a reference-counted box for sharing the count between handlers
// since each handler needs to own its contents.
let count = Rc::new(Cell::new(0));
listen("127.0.0.1:40000", |out| { Server { out: out, count: count.clone() } }).unwrap();
}

View File

@ -1,51 +1,16 @@
use futures::Future;
use tarpc::future::{client, server};
use tarpc::future::client::ClientExt;
use tarpc::util::{FirstSocketAddr, Never};
use tokio_core::reactor;
use cryp::{Cryp};
use skill::{Skill};
use serde_cbor::{to_vec};
service! {
rpc hello(name: String) -> String;
}
pub fn generate() -> Vec<u8> {
let level_two = Cryp::new()
.named("hatchling".to_string())
.level(2)
.learn(Skill::Stoney)
.create();
#[derive(Clone)]
struct HelloServer;
impl FutureService for HelloServer {
type HelloFut = Result<String, Never>;
fn hello(&self, name: String) -> Self::HelloFut {
Ok(format!("Hello, {}!", name))
}
}
pub fn run_server() {
let mut reactor = reactor::Core::new().unwrap();
let (mut handle, server) = HelloServer
.listen(
"localhost:10000".first_socket_addr(),
&reactor.handle(),
server::Options::default(),
)
.unwrap();
reactor.handle().spawn(server);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn connect() {
let mut reactor = reactor::Core::new().unwrap();
let options = client::Options::default().handle(reactor.handle());
reactor
.run(
FutureClient::connect("localhost:10000".first_socket_addr(), options)
.map_err(|e| panic!("{:?}", e))
.and_then(|client| client.hello("Mom".to_string()))
.map(|resp| println!("{}", resp)),
)
.unwrap();
match to_vec(&level_two) {
Ok(v) => v,
Err(e) => panic!("couldn't serialize cryp"),
}
}

View File

@ -1,3 +0,0 @@
pub fn start_server() {
}