localhost

sorting the wheat from the chaff

Hyper, the Rust async microframework

§ Introducing Hyper 0.12

Having investigated how Futures work in Rust, my next step was to find a lightweight Rust web framework for building a microservice, and I ended up trying Hyper. This is the first in a series of tutorials written for myself.

Reading the "label on the tin", Hyper is a high-performance asynchronous HTTP server and client that sits on top of the Tokio runtime and the futures crate.

As for what a Future is, I've put some notes here.

What the Tokio runtime is ... I don't care! Let's say it's just an engine that acts like a juggler and handles stuff very quickly.

The guides for Hyper are clear enough to get started.

Oh, one big disclaimer before we start. Be sure to look at the latest Hyper 0.12.x branch. Version 0.13.x is in the works and changes a number of things (e.g. it uses the async/await syntax). Slightly older versions (like 0.12.15) have a different API. Hyper is heavily in flux and breaking changes are not properly advertised. You'll find yourself looking at code written for 0.11 that doesn't compile anymore.

But let's see how it works. As far as I could understand, the basic principle of Hyper is this: it creates a Service struct that handles all requests. I don't care for now how all that works, just let me get started.

§ Fire up a minimal server

Here we have a server listening for incoming connections and returning a string to each of them as response:

// Notice we are using the Future trait re-exported from hyper
// use futures::future;
use hyper::rt::Future;

// this is a "convenience" function to transform any function into a Service
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};

fn main() {
    println!("Start");

    let addr = ([127, 0, 0, 1], 3000).into();
    let server = Server::bind(&addr)
        .serve(|| {
            // This is the `Service` that will handle the connection.
            // `service_fn_ok` is a helper to convert a function that
            // returns a Response into a `Service`.
            service_fn_ok(move |_: Request<Body>| Response::new(Body::from("Hello World!\n")))
        })
        .map_err(|e| eprintln!("server error: {}", e));

    // runs on tokio runtime
    println!("Listening on http://{}", addr);
    hyper::rt::run(server);

    println!("Exiting");
}
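
To make the "function into a Service" idea a bit less magical, here is a minimal sketch in plain Rust with no Hyper involved. The `Service` trait, the `ServiceFn` struct and this `service_fn_ok` are simplified stand-ins invented for illustration (requests and responses are just strings); Hyper's real trait is asynchronous and more involved.

```rust
// Simplified stand-in for a "Service": something that turns a request
// into a response. NOT Hyper's real trait, just the shape of the idea.
trait Service {
    fn call(&self, req: &str) -> String;
}

// Wrapper that lets any suitable closure or function act as a Service.
struct ServiceFn<F> {
    f: F,
}

impl<F> Service for ServiceFn<F>
where
    F: Fn(&str) -> String,
{
    fn call(&self, req: &str) -> String {
        (self.f)(req)
    }
}

// Hypothetical helper playing the role of Hyper's `service_fn_ok`.
fn service_fn_ok<F>(f: F) -> ServiceFn<F>
where
    F: Fn(&str) -> String,
{
    ServiceFn { f }
}

fn main() {
    let svc = service_fn_ok(|_req| "Hello World!\n".to_string());
    // The "server" only ever talks to the trait, never to the closure.
    print!("{}", svc.call("GET /"));
}
```

The point is only that the server code depends on the trait, so any function with the right shape can be plugged in.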

§ Hyper spawns a Future

The server listens for incoming connections and, for each request, spawns a Future that sleeps for 2 seconds.

Notice how the client connection is closed immediately and the Future is resolved at a later stage.

Do not use std::thread::sleep to add a delay: you'll end up blocking the whole Tokio runtime thread!

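use std::time::{Duration, Instant};

use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};
// non-blocking timer from the tokio 0.1 crate
use tokio::timer::Delay;

fn svc_wait(t: u64) -> impl Future<Item = (), Error = ()> {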
    println!("[start] waiting...");
    let when = Instant::now() + Duration::from_millis(t);
    Delay::new(when)
        .map_err(|e| panic!("timer failed; err={:?}", e))
        .and_then(|_| {
            println!("[end] waiting");
            Ok(())
        })
}

fn main() {
    let addr = ([127, 0, 0, 1], 3000).into();
    let server = Server::bind(&addr)
        .serve(|| {
            service_fn_ok(|req: Request<Body>| {
                // received the client connection
                eprintln!("Received client: {:?}", req.headers());
                // creating the future
                let f = svc_wait(2000);
                // the future is run NOW
                hyper::rt::spawn(f);
                // the client receives immediately a reply
                eprintln!("Sending back NOW a response to the client");
                Response::new(Body::from("Future triggered"))
            })
        })
        .map_err(|e| eprintln!("server error: {}", e));

    // runs on tokio runtime
    println!("Listening on http://{}", addr);
    hyper::rt::run(server);
}

Observe the server logging:

# The request is served immediately, then the connection is closed
$ curl localhost:3000
Future triggered

# this is the output you'll see on the server
$ cargo run
Listening on http://127.0.0.1:3000
Received client: {"host": "127.0.0.1:3000", "user-agent": "curl/7.64.0", "accept": "*/*"}
[start] waiting...
Sending back NOW a response to the client
... waiting ...
[end] waiting

The reply is sent to the client immediately. The Future is triggered, starts doing its "work" and finishes well after the reply has been sent.
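
A toy model may help to see why the reply goes out first, and why a blocking sleep is a bad idea. The sketch below is not Tokio: `run_all` and the closure-based tasks are invented for illustration. It is a single-threaded "juggler" that keeps polling its tasks in turn, and each task returns `true` once it is finished. The delay task only checks the clock and returns at once, so the other task is never held up.

```rust
use std::time::{Duration, Instant};

// A task is polled repeatedly and returns `true` once it has finished.
type Task = Box<dyn FnMut(&mut Vec<String>) -> bool>;

// Toy single-threaded "juggler": polls every pending task in turn until
// all of them have finished. A real runtime parks the thread and gets
// woken up instead of spinning like this.
fn run_all(mut tasks: Vec<Task>) -> Vec<String> {
    let mut log = Vec::new();
    while !tasks.is_empty() {
        // keep only the tasks that are NOT finished yet
        tasks.retain_mut(|task| !task(&mut log));
        std::thread::yield_now();
    }
    log
}

// Non-blocking delay: each poll just compares the clock with the deadline.
fn delay(ms: u64) -> Task {
    let when = Instant::now() + Duration::from_millis(ms);
    Box::new(move |log: &mut Vec<String>| -> bool {
        if Instant::now() < when {
            return false; // not ready yet, let the juggler poll the others
        }
        log.push("[end] waiting".to_string());
        true
    })
}

// Stand-in for "send the response": finishes on its first poll.
fn reply() -> Task {
    Box::new(|log: &mut Vec<String>| -> bool {
        log.push("response sent".to_string());
        true
    })
}

fn main() {
    // the delay is polled first, yet the reply still completes first
    let log = run_all(vec![delay(100), reply()]);
    println!("{:?}", log);
}
```

If `delay` called `std::thread::sleep` instead of returning `false`, the loop could not poll `reply` until the sleep was over. That is the single-threaded version of blocking the Tokio runtime thread.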

§ A simple endpoint router, spawns different Futures

We mentioned earlier that the Hyper server runs a Service that handles all the requests. This service can implement a router that dispatches on a tuple (http_method, path), where http_method can be GET, POST, etc. and path can be /, /user, /user/:id and so on.
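
Dispatching on such a tuple maps nicely onto a Rust `match`. Here is a minimal sketch in plain Rust, where methods and paths are plain strings and the `route` function and its replies are made up for illustration; with Hyper you would match on `(req.method(), req.uri().path())` instead.

```rust
// Hypothetical router: maps (http_method, path) to a reply string.
fn route(method: &str, path: &str) -> String {
    match (method, path) {
        ("GET", "/") => "home".to_string(),
        ("GET", "/user") => "list users".to_string(),
        ("POST", "/user") => "create user".to_string(),
        // `/user/:id`: a guard checks the prefix and that an id follows it
        ("GET", p) if p.starts_with("/user/") && p.len() > "/user/".len() => {
            format!("show user {}", &p["/user/".len()..])
        }
        // anything else falls through to a 404
        _ => "404 not found".to_string(),
    }
}

fn main() {
    println!("{}", route("GET", "/user/42")); // show user 42
    println!("{}", route("DELETE", "/user/42")); // 404 not found
}
```

The catch-all arm matters: a `match` has to cover every possible (method, path) pair, so unknown routes need an explicit fallback.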

In the following example, the router handles two endpoints:

  • GET /wait: triggers the waiting Future seen before
  • GET /fetch: triggers a request on a remote server

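Basically the same as before but with a twist: the request router now returns a Future (boxed, see BoxFut) instead of a plain Response. In this example that Future resolves immediately, while the spawned Futures finish later on their own.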

Full code of this example is here. Here we have the interesting bits:

// `RespStruct` (see the full code) must implement serde's `Deserialize`
fn fetch_data() -> impl Future<Item = RespStruct, Error = ()> {
    let uri: Uri = "http://httpbin.org/get".parse().expect("Cannot parse URL");
    Client::new()
        .get(uri)
        // nothing runs yet: we are only describing the chain of steps,
        // the Future does its work once it is spawned on the runtime
        .and_then(|res| {
            // extract the whole body from the Response
            res.into_body().concat2()
        })
        .map_err(|err| println!("error: {}", err))
        .map(|body| {
            // parse the body bytes and deserialize them into a validated struct;
            // note that `expect` panics if the JSON doesn't match
            serde_json::from_slice::<RespStruct>(&body).expect("Couldn't deserialize")
        })
}

fn svc_wait(t: u64) -> impl Future<Item = (), Error = ()> {
    // code omitted for brevity
}

// Just an alias to make it more readable
type BoxFut = Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send>;

// This is our request router
fn service_router(req: Request<Body>) -> BoxFut {
    let mut response = Response::new(Body::empty());

    // routes the request to the appropriate worker
    match (req.method(), req.uri().path()) {
        // GET /wait
        (&Method::GET, "/wait") => {
            let r = svc_wait(1500);
            hyper::rt::spawn(r);
            *response.body_mut() = Body::from(format!("Triggered waiting {}ms", 1500));
        }

        // GET /fetch
        (&Method::GET, "/fetch") => {
            let r = fetch_data().map(|x| {
                println!("got data: {:?}", x);
            });
            hyper::rt::spawn(r);
            *response.body_mut() = Body::from("Sent request to external webservice");
        }

        // ... more routes

        // anything else gets a 404 (a `match` must cover every case)
        _ => {
            *response.status_mut() = hyper::StatusCode::NOT_FOUND;
        }
    }
    eprintln!("Returning a response");
    Box::new(future::ok(response))
}

fn main() {
    let addr = ([127, 0, 0, 1], 3000).into();
    let server = Server::bind(&addr)
        .serve(|| {
            // `service_fn` turns our request router (a function
            // returning a Future) into a `Service`
            service_fn(service_router)
        })
        .map_err(|e| eprintln!("server error: {}", e));

    println!("Listening on http://{}", addr);
    hyper::rt::run(server);
}

Now you can call the server on two endpoints:

$ curl localhost:3000/wait
Triggered waiting 1500ms

$ curl localhost:3000/fetch
Sent request to external webservice
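
One detail worth a sketch is the `BoxFut` alias: boxing the returned Future as a trait object lets each route hand back a different concrete Future type behind one signature. The toy below uses neither Hyper nor the futures crate; the `MiniFuture` trait, `Ready`, `Countdown`, `router` and `run` are all invented for illustration and are far simpler than the real `Future` trait (no errors, no wake-ups).

```rust
// Heavily simplified stand-in for a Future: poll it until it yields a value.
trait MiniFuture {
    type Item;
    fn poll(&mut self) -> Option<Self::Item>; // None = not ready yet
}

// Resolves on the first poll (same role as `future::ok`).
struct Ready(Option<String>);

impl MiniFuture for Ready {
    type Item = String;
    fn poll(&mut self) -> Option<String> {
        self.0.take()
    }
}

// Needs a few polls before it resolves.
struct Countdown {
    left: u32,
    msg: String,
}

impl MiniFuture for Countdown {
    type Item = String;
    fn poll(&mut self) -> Option<String> {
        if self.left == 0 {
            Some(self.msg.clone())
        } else {
            self.left -= 1;
            None
        }
    }
}

// One boxed trait-object type covers every concrete future.
type BoxFut = Box<dyn MiniFuture<Item = String>>;

// Each arm builds a different type, yet the signature stays the same.
fn router(path: &str) -> BoxFut {
    match path {
        "/slow" => Box::new(Countdown { left: 3, msg: "slow reply".to_string() }),
        _ => Box::new(Ready(Some("quick reply".to_string()))),
    }
}

// Toy driver: keeps polling until the future resolves, counting the polls.
fn run(mut fut: BoxFut) -> (String, u32) {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Some(value) = fut.poll() {
            return (value, polls);
        }
    }
}

fn main() {
    println!("{:?}", run(router("/slow"))); // ("slow reply", 4)
    println!("{:?}", run(router("/"))); // ("quick reply", 1)
}
```

The price of the box is a heap allocation and dynamic dispatch per request, which is usually a fair trade for a router whose arms would otherwise have incompatible return types.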

More examples can be found on the Hyper GitHub repo, but without proper context they didn't make much sense to me (your mileage may vary). A bit more explanation of what they actually do would help.

