From 9212cff67a33e314f86a6c9e00269a75f77513cd Mon Sep 17 00:00:00 2001 From: ntr Date: Mon, 3 Sep 2018 01:14:05 +1000 Subject: [PATCH] bleh --- Cargo.toml | 10 +- DIARY.md | 11 ++ src/chat.rs | 474 -------------------------------------------------- src/main.rs | 76 ++++++-- src/net.rs | 59 ++----- src/server.rs | 3 - 6 files changed, 87 insertions(+), 546 deletions(-) create mode 100755 DIARY.md delete mode 100644 src/chat.rs delete mode 100755 src/server.rs diff --git a/Cargo.toml b/Cargo.toml index 24149af3..edbb45ab 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,14 +4,10 @@ version = "0.1.0" authors = ["ntr "] [dependencies] -rand = "0.5.4" +rand = "0.5" uuid = { version = "0.5", features = ["serde", "v4"] } -tokio = "0.1" -tokio-core = "0.1" -futures = "0.1" -bytes = "0.4" serde = "1" serde_derive = "1" serde_cbor = "0.9" -tarpc = "0.12.0" -tarpc-plugins = "0.4.0" +ws = "*" +env_logger = "*" diff --git a/DIARY.md b/DIARY.md new file mode 100755 index 00000000..d32aa776 --- /dev/null +++ b/DIARY.md @@ -0,0 +1,11 @@ +## 02-09-2018 +* went full circle through the last 20 years of the web's problems +* debated using vanilla tcp sockets but realised would be very time consuming +* Struggled a lot with google/tarpc + found the documentation absolutely mad, macros perform most of the functionality + couldn't find any way to keep server running, client stub appears to direcly rely on the server structs + needed a specific version of the rust nightly from several months ago to compile +* found wa-rs, hope was restored, had a websocket server up and running in seconds +* lost hope again when its client doesn't compile into wasm due to unix dependencies in mio + this also prevents any tokio based futures client from working +* realised i'd been reading very out of date documentation and there was plenty of work happening on `stdweb` and `cargo web` diff --git a/src/chat.rs b/src/chat.rs deleted file mode 100644 index cb28ce72..00000000 --- a/src/chat.rs +++ /dev/null @@ -1,474 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This example is explicitly more verbose than it has to be. This is to -//! illustrate more concepts. -//! -//! A chat server for telnet clients. After a telnet client connects, the first -//! line should contain the client's name. After that, all lines sent by a -//! client are broadcasted to all other connected clients. -//! -//! Because the client is telnet, lines are delimited by "\r\n". -//! -//! You can test this out by running: -//! -//! cargo run --example chat -//! -//! And then in another terminal run: -//! -//! telnet localhost 6142 -//! -//! You can run the `telnet` command in any number of additional windows. -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -#![deny(warnings)] - -extern crate tokio; -#[macro_use] -extern crate futures; -extern crate bytes; - -use tokio::io; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; -use futures::sync::mpsc; -use futures::future::{self, Either}; -use bytes::{BytesMut, Bytes, BufMut}; - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; - -/// Shorthand for the transmit half of the message channel. -type Tx = mpsc::UnboundedSender; - -/// Shorthand for the receive half of the message channel. -type Rx = mpsc::UnboundedReceiver; - -/// Data that is shared between all peers in the chat server. -/// -/// This is the set of `Tx` handles for all connected clients. Whenever a -/// message is received from a client, it is broadcasted to all peers by -/// iterating over the `peers` entries and sending a copy of the message on each -/// `Tx`. -struct Shared { - peers: HashMap, -} - -/// The state for each connected client. -struct Peer { - /// Name of the peer. - /// - /// When a client connects, the first line sent is treated as the client's - /// name (like alice or bob). The name is used to preface all messages that - /// arrive from the client so that we can simulate a real chat server: - /// - /// ```text - /// alice: Hello everyone. - /// bob: Welcome to telnet chat! - /// ``` - name: BytesMut, - - /// The TCP socket wrapped with the `Lines` codec, defined below. - /// - /// This handles sending and receiving data on the socket. When using - /// `Lines`, we can work at the line level instead of having to manage the - /// raw byte operations. - lines: Lines, - - /// Handle to the shared chat state. - /// - /// This is used to broadcast messages read off the socket to all connected - /// peers. - state: Arc>, - - /// Receive half of the message channel. - /// - /// This is used to receive messages from peers. When a message is received - /// off of this `Rx`, it will be written to the socket. - rx: Rx, - - /// Client socket address. - /// - /// The socket address is used as the key in the `peers` HashMap. The - /// address is saved so that the `Peer` drop implementation can clean up its - /// entry. - addr: SocketAddr, -} - -/// Line based codec -/// -/// This decorates a socket and presents a line based read / write interface. -/// -/// As a user of `Lines`, we can focus on working at the line level. So, we send -/// and receive values that represent entire lines. The `Lines` codec will -/// handle the encoding and decoding as well as reading from and writing to the -/// socket. -#[derive(Debug)] -struct Lines { - /// The TCP socket. - socket: TcpStream, - - /// Buffer used when reading from the socket. Data is not returned from this - /// buffer until an entire line has been read. - rd: BytesMut, - - /// Buffer used to stage data before writing it to the socket. - wr: BytesMut, -} - -impl Shared { - /// Create a new, empty, instance of `Shared`. - fn new() -> Self { - Shared { - peers: HashMap::new(), - } - } -} - -impl Peer { - /// Create a new instance of `Peer`. - fn new(name: BytesMut, - state: Arc>, - lines: Lines) -> Peer - { - // Get the client socket address - let addr = lines.socket.peer_addr().unwrap(); - - // Create a channel for this peer - let (tx, rx) = mpsc::unbounded(); - - // Add an entry for this `Peer` in the shared state map. - state.lock().unwrap() - .peers.insert(addr, tx); - - Peer { - name, - lines, - state, - rx, - addr, - } - } -} - -/// This is where a connected client is managed. -/// -/// A `Peer` is also a future representing completely processing the client. -/// -/// When a `Peer` is created, the first line (representing the client's name) -/// has already been read. When the socket closes, the `Peer` future completes. -/// -/// While processing, the peer future implementation will: -/// -/// 1) Receive messages on its message channel and write them to the socket. -/// 2) Receive messages from the socket and broadcast them to all peers. -/// -impl Future for Peer { - type Item = (); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(), io::Error> { - // Tokio (and futures) use cooperative scheduling without any - // preemption. If a task never yields execution back to the executor, - // then other tasks may be starved. - // - // To deal with this, robust applications should not have any unbounded - // loops. In this example, we will read at most `LINES_PER_TICK` lines - // from the client on each tick. - // - // If the limit is hit, the current task is notified, informing the - // executor to schedule the task again asap. - const LINES_PER_TICK: usize = 10; - - // Receive all messages from peers. - for i in 0..LINES_PER_TICK { - // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is - // safe. - match self.rx.poll().unwrap() { - Async::Ready(Some(v)) => { - // Buffer the line. Once all lines are buffered, they will - // be flushed to the socket (right below). - self.lines.buffer(&v); - - // If this is the last iteration, the loop will break even - // though there could still be lines to read. Because we did - // not reach `Async::NotReady`, we have to notify ourselves - // in order to tell the executor to schedule the task again. - if i+1 == LINES_PER_TICK { - task::current().notify(); - } - } - _ => break, - } - } - - // Flush the write buffer to the socket - let _ = self.lines.poll_flush()?; - - // Read new lines from the socket - while let Async::Ready(line) = self.lines.poll()? { - println!("Received line ({:?}) : {:?}", self.name, line); - - if let Some(message) = line { - // Append the peer's name to the front of the line: - let mut line = self.name.clone(); - line.extend_from_slice(b": "); - line.extend_from_slice(&message); - line.extend_from_slice(b"\r\n"); - - // We're using `Bytes`, which allows zero-copy clones (by - // storing the data in an Arc internally). - // - // However, before cloning, we must freeze the data. This - // converts it from mutable -> immutable, allowing zero copy - // cloning. - let line = line.freeze(); - - // Now, send the line to all other peers - for (addr, tx) in &self.state.lock().unwrap().peers { - // Don't send the message to ourselves - if *addr != self.addr { - // The send only fails if the rx half has been dropped, - // however this is impossible as the `tx` half will be - // removed from the map before the `rx` is dropped. - tx.unbounded_send(line.clone()).unwrap(); - } - } - } else { - // EOF was reached. The remote client has disconnected. There is - // nothing more to do. - return Ok(Async::Ready(())); - } - } - - // As always, it is important to not just return `NotReady` without - // ensuring an inner future also returned `NotReady`. - // - // We know we got a `NotReady` from either `self.rx` or `self.lines`, so - // the contract is respected. - Ok(Async::NotReady) - } -} - -impl Drop for Peer { - fn drop(&mut self) { - self.state.lock().unwrap().peers - .remove(&self.addr); - } -} - -impl Lines { - /// Create a new `Lines` codec backed by the socket - fn new(socket: TcpStream) -> Self { - Lines { - socket, - rd: BytesMut::new(), - wr: BytesMut::new(), - } - } - - /// Buffer a line. - /// - /// This writes the line to an internal buffer. Calls to `poll_flush` will - /// attempt to flush this buffer to the socket. - fn buffer(&mut self, line: &[u8]) { - // Ensure the buffer has capacity. Ideally this would not be unbounded, - // but to keep the example simple, we will not limit this. - self.wr.reserve(line.len()); - - // Push the line onto the end of the write buffer. - // - // The `put` function is from the `BufMut` trait. - self.wr.put(line); - } - - /// Flush the write buffer to the socket - fn poll_flush(&mut self) -> Poll<(), io::Error> { - // As long as there is buffered data to write, try to write it. - while !self.wr.is_empty() { - // Try to write some bytes to the socket - let n = try_ready!(self.socket.poll_write(&self.wr)); - - // As long as the wr is not empty, a successful write should - // never write 0 bytes. - assert!(n > 0); - - // This discards the first `n` bytes of the buffer. - let _ = self.wr.split_to(n); - } - - Ok(Async::Ready(())) - } - - /// Read data from the socket. - /// - /// This only returns `Ready` when the socket has closed. - fn fill_read_buf(&mut self) -> Poll<(), io::Error> { - loop { - // Ensure the read buffer has capacity. - // - // This might result in an internal allocation. - self.rd.reserve(1024); - - // Read data into the buffer. - let n = try_ready!(self.socket.read_buf(&mut self.rd)); - - if n == 0 { - return Ok(Async::Ready(())); - } - } - } -} - -impl Stream for Lines { - type Item = BytesMut; - type Error = io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - // First, read any new data that might have been received off the socket - let sock_closed = self.fill_read_buf()?.is_ready(); - - // Now, try finding lines - let pos = self.rd.windows(2).enumerate() - .find(|&(_, bytes)| bytes == b"\r\n") - .map(|(i, _)| i); - - if let Some(pos) = pos { - // Remove the line from the read buffer and set it to `line`. - let mut line = self.rd.split_to(pos + 2); - - // Drop the trailing \r\n - line.split_off(pos); - - // Return the line - return Ok(Async::Ready(Some(line))); - } - - if sock_closed { - Ok(Async::Ready(None)) - } else { - Ok(Async::NotReady) - } - } -} - -/// Spawn a task to manage the socket. -/// -/// This will read the first line from the socket to identify the client, then -/// add the client to the set of connected peers in the chat service. -fn process(socket: TcpStream, state: Arc>) { - // Wrap the socket with the `Lines` codec that we wrote above. - // - // By doing this, we can operate at the line level instead of doing raw byte - // manipulation. - let lines = Lines::new(socket); - - // The first line is treated as the client's name. The client is not added - // to the set of connected peers until this line is received. - // - // We use the `into_future` combinator to extract the first item from the - // lines stream. `into_future` takes a `Stream` and converts it to a future - // of `(first, rest)` where `rest` is the original stream instance. - let connection = lines.into_future() - // `into_future` doesn't have the right error type, so map the error to - // make it work. - .map_err(|(e, _)| e) - // Process the first received line as the client's name. - .and_then(|(name, lines)| { - // If `name` is `None`, then the client disconnected without - // actually sending a line of data. - // - // Since the connection is closed, there is no further work that we - // need to do. So, we just terminate processing by returning - // `future::ok()`. - // - // The problem is that only a single future type can be returned - // from a combinator closure, but we want to return both - // `future::ok()` and `Peer` (below). - // - // This is a common problem, so the `futures` crate solves this by - // providing the `Either` helper enum that allows creating a single - // return type that covers two concrete future types. - let name = match name { - Some(name) => name, - None => { - // The remote client closed the connection without sending - // any data. - return Either::A(future::ok(())); - } - }; - - println!("`{:?}` is joining the chat", name); - - // Create the peer. - // - // This is also a future that processes the connection, only - // completing when the socket closes. - let peer = Peer::new( - name, - state, - lines); - - // Wrap `peer` with `Either::B` to make the return type fit. - Either::B(peer) - }) - // Task futures have an error of type `()`, this ensures we handle the - // error. We do this by printing the error to STDOUT. - .map_err(|e| { - println!("connection error = {:?}", e); - }); - - // Spawn the task. Internally, this submits the task to a thread pool. - tokio::spawn(connection); -} - -pub fn main() { - // Create the shared state. This is how all the peers communicate. - // - // The server task will hold a handle to this. For every new client, the - // `state` handle is cloned and passed into the task that processes the - // client connection. - let state = Arc::new(Mutex::new(Shared::new())); - - let addr = "127.0.0.1:6142".parse().unwrap(); - - // Bind a TCP listener to the socket address. - // - // Note that this is the Tokio TcpListener, which is fully async. - let listener = TcpListener::bind(&addr).unwrap(); - - // The server task asynchronously iterates over and processes each - // incoming connection. - let server = listener.incoming().for_each(move |socket| { - // Spawn a task to process the connection - process(socket, state.clone()); - Ok(()) - }) - .map_err(|err| { - // All tasks must have an `Error` type of `()`. This forces error - // handling and helps avoid silencing failures. - // - // In our example, we are only going to log the error to STDOUT. - println!("accept error = {:?}", err); - }); - - println!("server running on localhost:6142"); - - // Start the Tokio runtime. - // - // The Tokio is a pre-configured "out of the box" runtime for building - // asynchronous applications. It includes both a reactor and a task - // scheduler. This means applications are multithreaded by default. - // - // This function blocks until the runtime reaches an idle state. Idle is - // defined as all spawned tasks have completed and all I/O resources (TCP - // sockets in our case) have been dropped. - // - // In our example, we have not defined a shutdown strategy, so this will - // block until `ctrl-c` is pressed at the terminal. - tokio::run(server); -} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 144556fe..34e434f0 100755 --- a/src/main.rs +++ b/src/main.rs @@ -1,26 +1,72 @@ -// needed by tarpc until it becomes stable -#![feature(plugin, use_extern_macros, proc_macro_path_invoc)] -#![plugin(tarpc_plugins)] - -extern crate tokio_core; -#[macro_use] extern crate futures; -#[macro_use] extern crate tarpc; +extern crate rand; +extern crate uuid; +extern crate ws; +extern crate env_logger; extern crate serde; extern crate serde_cbor; -#[macro_use] extern crate serde_derive; - -extern crate rand; -extern crate uuid; +#[macro_use] +extern crate serde_derive; mod cryp; -mod combat; mod battle; -mod skill; mod net; +mod combat; +mod skill; -use net::run_server; +use std::rc::Rc; +use std::cell::Cell; +use ws::{listen, Handler, Sender, Result, Message, Handshake, CloseCode, Error}; + +use net::{generate}; + +struct Server { + out: Sender, + count: Rc>, +} + +impl Handler for Server { + + fn on_open(&mut self, _: Handshake) -> Result<()> { + // We have a new connection, so we increment the connection counter + println!("somebody joined"); + Ok(self.count.set(self.count.get() + 1)) + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + // Tell the user the current count + println!("The number of live connections is {}", self.count.get()); + + let reply = Message::binary(generate()); + + // Echo the message back + self.out.send(reply) + } + + fn on_close(&mut self, code: CloseCode, reason: &str) { + match code { + CloseCode::Normal => println!("The client is done with the connection."), + CloseCode::Away => println!("The client is leaving the site."), + CloseCode::Abnormal => println!( + "Closing handshake failed! Unable to obtain closing status from client."), + _ => println!("The client encountered an error: {}", reason), + } + + // The connection is going down, so we need to decrement the count + self.count.set(self.count.get() - 1) + } + + fn on_error(&mut self, err: Error) { + println!("The server encountered an error: {:?}", err); + } + +} fn main() { - run_server() + // Cell gives us interior mutability so we can increment + // or decrement the count between handlers. + // Rc is a reference-counted box for sharing the count between handlers + // since each handler needs to own its contents. + let count = Rc::new(Cell::new(0)); + listen("127.0.0.1:40000", |out| { Server { out: out, count: count.clone() } }).unwrap(); } \ No newline at end of file diff --git a/src/net.rs b/src/net.rs index 3d831cd9..7f422e3d 100755 --- a/src/net.rs +++ b/src/net.rs @@ -1,51 +1,16 @@ -use futures::Future; -use tarpc::future::{client, server}; -use tarpc::future::client::ClientExt; -use tarpc::util::{FirstSocketAddr, Never}; -use tokio_core::reactor; +use cryp::{Cryp}; +use skill::{Skill}; +use serde_cbor::{to_vec}; -service! { - rpc hello(name: String) -> String; -} +pub fn generate() -> Vec { + let level_two = Cryp::new() + .named("hatchling".to_string()) + .level(2) + .learn(Skill::Stoney) + .create(); -#[derive(Clone)] -struct HelloServer; - -impl FutureService for HelloServer { - type HelloFut = Result; - - fn hello(&self, name: String) -> Self::HelloFut { - Ok(format!("Hello, {}!", name)) - } -} - -pub fn run_server() { - let mut reactor = reactor::Core::new().unwrap(); - let (mut handle, server) = HelloServer - .listen( - "localhost:10000".first_socket_addr(), - &reactor.handle(), - server::Options::default(), - ) - .unwrap(); - reactor.handle().spawn(server); -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn connect() { - let mut reactor = reactor::Core::new().unwrap(); - let options = client::Options::default().handle(reactor.handle()); - reactor - .run( - FutureClient::connect("localhost:10000".first_socket_addr(), options) - .map_err(|e| panic!("{:?}", e)) - .and_then(|client| client.hello("Mom".to_string())) - .map(|resp| println!("{}", resp)), - ) - .unwrap(); + match to_vec(&level_two) { + Ok(v) => v, + Err(e) => panic!("couldn't serialize cryp"), } } \ No newline at end of file diff --git a/src/server.rs b/src/server.rs deleted file mode 100755 index d4a32946..00000000 --- a/src/server.rs +++ /dev/null @@ -1,3 +0,0 @@ - -pub fn start_server() { -} \ No newline at end of file