  • Tokio,Rust异步编程实践之路


    在许多编程语言里,我们都非常乐于去研究在这个语言中所使用的异步网络编程的框架,比如说Python的 Gevent、asyncio,Nginx 和 OpenResty,Go 等,今年年初我开始接触 Rust,并被其无 GC、内存安全、极小的运行时等特性所吸引,经过一段时间的学习,开始寻找构建实际项目的解决方案,很快 mio、tokio 等框架进入了我的视野,于是开始从更加底层的 mio 出发实验。


    可以看到 mio 是一个非常底层的异步编程的框架,这意味着如果我们要在实际的项目开发中使用它时,就不得不从 event loop 开始编写我们的软件,这并不是我们所期望的,于是我们需要一个更高层次抽象的框架,这便是本文要为大家讲述的 tokio。


    tokio 是 Rust 中的异步编程框架,它将复杂的异步编程抽象为 Futures、Tasks 和 Executor,并提供了 Timers 等基础设施,下文中我们将一一展开。


    tokio 是一个基于轮训的模型。比如我们要在 tokio 上调度我们的 task,我们需要为其实现 Future trait。比如下面的例子中,我们想要得到一个 widget,但它有可能还没有准备好,这时候我们调用 poll 的结果就是 Ok(Async::NotReady),Executor 会负责重复的调用 poll,直到 widget 准备好,返回Ok(Async::Ready(()))

    /// A task that polls a single widget and writes it to STDOUT.
    pub struct MyTask;
    impl Future for MyTask {
        type Item = ();
        type Error = ();
        fn poll(&mut self) -> Result<Async<()>, ()> {
            match poll_widget() {
                Async::Ready(widget) => {
                    println!("widget={:?}", widget);
                Async::NotReady => {
                    return Ok(Async::NotReady);

    在最简单的情况下,Executor 可能会长这样。(注:这不是真实的实现,只是用来说明概念)

    pub struct SpinExecutor {
        tasks: VecDeque<Box<Future<Item = (), Error = ()>>>,
    impl SpinExecutor {
        pub fn spawn<T>(&mut self, task: T)
        where T: Future<Item = (), Error = ()> + 'static
        pub fn run(&mut self) {
            while let Some(mut task) = self.tasks.pop_front() {
                match task.poll().unwrap() {
                    Async::Ready(_) => {}
                    Async::NotReady => {

    Executor 频繁地轮询所有 task,即使某些 task 仍然会以 NotReady 返回。
    理想情况下,Executor 应该可以通过某种方式知道哪些 task 恰好转变为 “就绪” 状态。这正是 futures 任务模型的核心。


    future 是对一个未来事件的抽象。比如你可以将各种事件抽象为 future:

    • 在线程池中执行的数据库查询。当数据库查询完成时,future 完成,其值是查询的结果。
    • 对服务器的 RPC 调用。当服务器回复时,future 完成,其值是服务器的响应。
    • 超时事件。当时间到了,future 就完成了,它的值是 ()
    • 在线程池上运行的长时间运行的 CPU 密集型任务。任务完成后,future 完成,其值为任务的返回值。


    extern crate futures;
    extern crate tokio;
    extern crate tokio_core;
    use std::error::Error;
    use futures::Future;
    use futures::future::{ok, done};
    use tokio_core::reactor::Core;
    fn my_fn_squared(i: u32) -> Result<u32, Box<Error>> {
        Ok(i * i)
    fn my_fut_squared(i: u32) -> impl Future<Item = u32, Error = Box<Error + 'static>> {
        ok(i * i)
    fn my_fut() -> impl Future<Item = u32, Error = Box<Error + 'static>> {
    fn main() {
        let mut reactor = Core::new().unwrap();
        let chained_future = my_fut().and_then(|retval| {
            done(my_fn_squared(retval)).and_then(|retval2| my_fut_squared(retval2))
        let retval3 = reactor.run(chained_future).unwrap();
        println!("{:?}", retval3);

    这里,我们的 my_fut 的返回值实现了 Future,我们知道它被 Executor 执行完成后,会返回一个 u32 或者 一个 Box<Error + 'static>,而现在我们就可以通过 .and_then 来处理这个 u32 的值,而最终我们将我们的 future 链接了起来,交给 Executor 执行。


    Tasks 是应用程序的 “逻辑单元”。他们以 Future trait 来表示。一旦 task 完成处理,task 的 future 实现将以值 () 返回。

    Tasks 被传递给 Executor,Executor 处理 task 的调度。Executor 通常在一组或一组线程中调度许多 task。task 不得执行计算繁重的逻辑,否则将阻止其他 task 执行。

    Tasks 既可以通过实现 Future trait 来实现,也可以通过使用 futurestokio crates 中的各种组合器函数来构建 future 来实现。


    tokio crate 也提供了 TCP、UDP 的支持,不像 std 中的实现,tokio 的网络类型是基于 poll 模型的,并且当他们的 “就绪” 状态改变时会通知 task executors。在 tokio::net 模块中你将会找到像 TcpListener、TcpStream、UdpSocket 这些类型。

    所有这些类型都提供了 future 的 API 以及 poll API。

    Tokio 网络类型被一个基于 mio 的 reactor 所驱动,默认情况下,它在后台线程上启动。

    使用 future API

    一些帮助使用 future API 的函数包括:

    • incoming:入站 TCP 连接的 Stream。
    • read_exact:将n字节准确读入缓冲区。
    • read_to_end:将所有字节读入缓冲区。
    • write_all:写缓冲区的全部内容。
    • copy:将字节从一个 I/O 句柄复制到另一个。

    这些函数中的许多都是源于 AsyncReadAsyncWrite trait 的。这些 trait 类似于 std 中的 ReadWrite,但仅仅用于具有 future aware 的类型,例如符合下面的特征:

    • 调用 readwrite 是非阻塞的,他们从不阻塞调用线程。
    • 如果一个调用会以其他方式阻塞,那么会返回一个错误 WouldBlock。如果发生这种情况,则当前 future 的task 将在 I/O 再次准备就绪时被调度。

    注意 AsyncReadAsyncWrite 类型的用户应该使用 poll_readpoll_write 代替直接调用 readwrite

    例如,以下是如何接受连接,从中读取5个字节,然后将5个字节写回 socket 的例子:

    let server = listener.incoming().for_each(|socket| {
        println!("accepted socket; addr={:?}", socket.peer_addr().unwrap());
        let buf = vec![0; 5];
        let connection = io::read_exact(socket, buf)
            .and_then(|(socket, buf)| {
                io::write_all(socket, buf)
            .then(|_| Ok(())); // Just discard the socket and buffer
        // Spawn a new task that processes the socket:

    使用 Poll API

    当手动实现 Future 时,需要使用基于 Poll 的 API,并且你需要返回 Async。当您需要实现自己的处理自定义逻辑的组合器时,这非常有用。

    例如,这就是如何为 TcpStream 实现 read_exact future 的例子。

    pub struct ReadExact {
        state: State,
    enum State {
        Reading {
            stream: TcpStream,
            buf: Vec<u8>,
            pos: usize,
    impl Future for ReadExact {
        type Item = (TcpStream, Vec<u8>);
        type Error = io::Error;
        fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> {
            match self.state {
                State::Reading {
                    ref mut stream,
                    ref mut buf,
                    ref mut pos
                } => {
                    while *pos < buf.len() {
                        let n = try_ready!({
                            stream.poll_read(&mut buf[*pos..])
                        *pos += n;
                        if n == 0 {
                            let err = io::Error::new(
                                "early eof");
                            return Err(err)
                State::Empty => panic!("poll a ReadExact after it's done"),
            match mem::replace(&mut self.state, State::Empty) {
                State::Reading { stream, buf, .. } => {
                    Ok(Async::Ready((stream, buf)))
                State::Empty => panic!(),


    UdpSocket 类型提供了许多方便的方法:

    • send_dgram 允许您将发送数据报作为 future,如果无法立即发送整个数据报,则返回错误。
    • recv_dgram 表示将数据报读入缓冲区。


    extern crate log;
    extern crate futures;
    extern crate pretty_env_logger;
    extern crate tokio;
    use futures::future::{done, ok};
    use futures::{Future, Stream};
    use tokio::io::{self as tio, AsyncRead};
    use tokio::net::{TcpListener, TcpStream};
    use std::error;
    use std::fmt;
    use std::io;
    fn client_fut(socket: TcpStream) -> impl Future<Item = (), Error = ()> + 'static + Send {
        futures::lazy(move || match socket.peer_addr() {
            Ok(peer) => {
                info!("Tcp connection [{:?}] connected to server", peer);
                Ok((socket, peer))
            Err(err) => {
                error!("Fetch peer address failed: {:?}", err);
        }).and_then(move |(socket, peer)| {
                let buf = vec![0; 5];
                let svc_fut = tio::read_exact(socket, buf)
                    .and_then(|(socket, buf)| {
                        tio::write_all(socket, buf)
                    .then(|_| Ok(()));
    fn server_fut(listener: TcpListener) -> impl Future<Item = (), Error = ()> + 'static + Send {
            .for_each(|socket| {
            .map_err(|err| {
                error!("Accept connection failed: {:?}", err);
    fn run() -> Result<(), io::Error> {
        let addr = "".parse().unwrap();
        info!("Listening on {:?}", addr);
        let listener = TcpListener::bind(&addr)?;
        let server_fut = server_fut(listener);
    fn print<T: fmt::Debug, E: error::Error>(result: Result<T, E>) {
        match result {
            Ok(any) => info!("Result: {:?}", any),
            Err(err) => error!("Error: {:?}", err),
    fn init() {
    fn main() {



    • 在一段时间后运行一些代码。
    • 取消运行时间过长的运行操作。
    • 以一定间隔重复执行操作。

    这些用例通过使用 timer 模块中提供的各种计时器 API 来处理。


    在这个例子中,我们希望在一段时间后执行任务。为此,我们使用 Delay API。我们要做的只是将 "Hello world!" 写到终端。

    use tokio::prelude::*;
    use tokio::timer::Delay;
    use std::time::{Duration, Instant};
    fn main() {
        let when = Instant::now() + Duration::from_millis(100);
        let task = Delay::new(when)
            .and_then(|_| {
                println!("Hello world!");
            .map_err(|e| panic!("delay errored; err={:?}", e));

    为长时间运行的操作设置 Timeout


    Deadline 类型确保操作在固定的时间内完成。

    use tokio::io;
    use tokio::net::TcpStream;
    use tokio::prelude::*;
    use std::time::{Duration, Instant};
    fn read_four_bytes(socket: TcpStream)
        -> Box<Future<Item = (TcpStream, Vec<u8>), Error = ()>>
        // The instant at which the read will be aborted if
        // it has not yet completed.
        let when = Instant::now() + Duration::from_secs(5);
        let buf = vec![0; 4];
        let fut = io::read_exact(socket, buf)
            .map_err(|_| println!("failed to read 4 bytes by deadline"));


    在一个时间间隔内重复运行代码对于在套接字上发送 PING 消息,或经常检查配置文件等情况很有用。

    Interval 类型实现了 Stream,并以指定的速率挂起。

    use tokio::prelude::*;
    use tokio::timer::Interval;
    use std::time::{Duration, Instant};
    fn main() {
        let task = Interval::new(Instant::now(), Duration::from_millis(100))
            .for_each(|instant| {
                println!("fire; instant={:?}", instant);
            .map_err(|e| panic!("interval errored; err={:?}", e));


    Tokio 计时器的粒度为 1 毫秒。任何更小的间隔都会向上舍入到最接近的毫秒。定时器在用户域中实现(即不使用操作系统定时器,像 linux 上的 timerfd)。它使用分层散列计时器轮实现,在创建,取消和触发超时时提供有效的恒定时间复杂度。

    Tokio 运行时包括每个工作线程一个计时器实例。这意味着,如果运行时启动4个工作线程,则将有4个计时器实例。这在大多数情况下避免了同步,因为当使用计时器时,任务将在位于当前线程上的状态下操作。



    下面是关于 Future 的图表,来自于 Cheatsheet for Futures

    // Constructing leaf futures
    fn empty ()             -> Future<T, E>
    fn ok    (T)            -> Future<T, E>
    fn err   (E)            -> Future<T, E>
    fn result(Result<T, E>) -> Future<T, E>
    // General future constructor
    fn poll_fn(FnMut(thread_local!(Task)) -> Poll<T, E>) -> Future<T, E>
    // Mapping futures
    fn Future::map     (Future<T, E>, FnOnce(T) -> U) -> Future<U, E>
    fn Future::map_err (Future<T, E>, FnOnce(E) -> F) -> Future<T, F>
    fn Future::from_err(Future<T, Into<E>>)           -> Future<T, E>
    // Chaining (sequencing) futures
    fn Future::then    (Future<T, E>, FnOnce(Result<T, E>) -> IntoFuture<U, F>) -> Future<U, F>
    fn Future::and_then(Future<T, E>, FnOnce(T)            -> IntoFuture<U, E>) -> Future<U, E>
    fn Future::or_else (Future<T, E>, FnOnce(E)            -> IntoFuture<T, F>) -> Future<T, F>
    fn Future::flatten (Future<Future<T, E>, Into<E>>)                          -> Future<T, E>
    // Joining (waiting) futures
    fn Future::join (Future<T, E>, IntoFuture<U, E>)                                                       -> Future<(T, U),          E>
    fn Future::join3(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>)                                     -> Future<(T, U, V),       E>
    fn Future::join4(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>)                   -> Future<(T, U, V, W),    E>
    fn Future::join5(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>, IntoFuture<X, E>) -> Future<(T, U, V, W, X), E>
    fn join_all     (IntoIterator<IntoFuture<T, E>>)                                                       -> Future<Vec<T>,          E>
    // Selecting (racing) futures
    fn Future::select (Future<T, E>, IntoFuture<T, E>) -> Future<(T, Future<T, E>), (E, Future<T, E>)>
    fn Future::select2(Future<T, E>, IntoFuture<U, F>) -> Future<Either<(T, Future<U, F>), (U, Future<T, E>)>, Either<(E, Future<U, F>), (F, Future<T, E>)>>
    fn select_all     (IntoIterator<IntoFuture<T, E>>) -> Future<(T, usize, Vec<Future<T, E>>), (E, usize, Vec<Future<T, E>>)>
    fn select_ok      (IntoIterator<IntoFuture<T, E>>) -> Future<(T, Vec<Future<T, E>>), E>
    // Utility
    fn lazy         (FnOnce() -> IntoFuture<T, E>)             -> Future<T, E>
    fn loop_fn      (S, FnMut(S) -> IntoFuture<Loop<T, S>, E>) -> Future<T, E>
    // Miscellaneous
    fn Future::into_stream   (Future<T, E>)            -> Stream<T, E>
    fn Future::flatten_stream(Future<Stream<T, E>, E>) -> Stream<T, E>
    fn Future::fuse          (Future<T, E>)            -> Future<T, E>
    fn Future::catch_unwind  (Future<T, E>+UnwindSafe) -> Future<Result<T, E>, Any+Send>
    fn Future::shared        (Future<T, E>)            -> Future<SharedItem<T>, SharedError<E>>+Clone
    fn Future::wait          (Future<T, E>)            -> Result<T, E>



    返回 futures

    在使用 futures 时,您可能需要做的第一件事就是返回一个 Future。这有几种选择,从最符合人体工程学到最不符合。

    • Trait 对象
    • impl Trait

    Trait 对象

    首先,您始终可以选择返回一个 boxed trait 对象

    fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
        // ...


    这种方法的缺点是,在构建 future 时需要运行时分配,在使用该 future 时需要动态分派。Box 需要在堆上分配而 future 会被置入其中。

    通常可以通过仅在您想要返回的 future 链的最后来 Boxing 来减少分配。

    impl Trait

    在 Rust 1.26 版本之后(2018年5月7日发布),我们可以使用叫做 impl Trait 的新的语言特性。

    fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
        where F: Future<Item = i32>,
        f.map(|i| i + 10)

    这种方法的好处在于它是零开销的,不再需要 Box

    使用 framed streams

    Tokio 有帮助函数将字节流转换为帧流。字节流的例子包括 TCP 连接,管道,文件对象以及标准输入和输出。在Rust中,streams 很容易识别,因为它们实现了 ReadWrite trait。

    最简单的帧化的消息形式之一是行分隔消息。每条消息都以一个 字符结尾。让我们看一下如何使用 tokio 实现行分隔消息流。


    编解码器实现 tokio_codec::Decodertokio_codec::Encoder trait。他的工作就是将字节转为帧以及相反。这些 traittokio_codec::Framed struct一起使用,以提供字节流的缓冲,解码和编码。

    让我们看一下LinesCodec struct 的简化版本,它实现了行分隔消息的解码和编码。

    pub struct LinesCodec {
        // Stored index of the next index to examine for a `
    ` character.
        // This is used to optimize searching.
        // For example, if `decode` was called with `abc`, it would hold `3`,
        // because that is the next index to examine.
        // The next time `decode` is called with `abcde
    `, the method will
        // only look at `de
    ` before returning.
        next_index: usize,

    这里的注释解释了,由于字节被缓存直到找到一行,因此每次接收数据时从缓冲区的开头搜索 是浪费的。保存缓冲区的最后长度并在收到新数据时从那里开始搜索将更有效。

    Decoder::decode 是在底层流上接收到数据时调用的方法。该方法可以生成帧或返回 Ok(None) 以表示它需要更多数据来生成帧。该 decode 方法负责通过使用 BytesMut 的方法将不再需要缓冲的数据删除。如果数据未删除,缓冲区将持续增长。

    让我们来看看如何为 LinesCodec 实现 Decoder::decode

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
        // Look for a byte with the value '
    ' in buf. Start searching from the search start index.
        if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'
            // Found a '
    ' in the string.
            // The index of the '
    ' is at the sum of the start position + the offset found.
            let newline_index = newline_offset + self.next_index;
            // Split the buffer at the index of the '
    ' + 1 to include the '
            // `split_to` returns a new buffer with the contents up to the index.
            // The buffer on which `split_to` is called will now start at this index.
            let line = buf.split_to(newline_index + 1);
            // Trim the `
    ` from the buffer because it's part of the protocol,
            // not the data.
            let line = &line[..line.len() - 1];
            // Convert the bytes to a string and panic if the bytes are not valid utf-8.
            let line = str::from_utf8(&line).expect("invalid utf8 data");
            // Set the search start index back to 0.
            self.next_index = 0;
            // Return Ok(Some(...)) to signal that a full frame has been produced.
        } else {
            // '
    ' not found in the string.
            // Tell the next call to start searching after the current length of the buffer
            // since all of it was scanned and no '
    ' was found.
            self.next_index = buf.len();
            // Ok(None) signifies that more data is needed to produce a full frame.

    当需要将帧写入下层流时,Encoder::encode 方法被调用。帧必须写入缓冲区并作为一个参数。写入缓冲区的数据将在准备好发送数据时写入流。

    现在让我们来看看如何为 LinesCodec 实现 Encoder::encode

    fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
        // It's important to reserve the amount of space needed. The `bytes` API
        // does not grow the buffers implicitly.
        // Reserve the length of the string + 1 for the '
        buf.reserve(line.len() + 1);
        // String implements IntoBuf, a trait used by the `bytes` API to work with
        // types that can be expressed as a sequence of bytes.
        // Put the '
    ' in the buffer.
        // Return ok to signal that no error occured.



    使用编解码器的最简单方法是使用 Framed 结构体。它是实现自动缓冲的编解码器的包装器。该 Framed 结构体既是 Stream 也是 Sink。因此,您可以从中接收帧并向其发送帧。

    您可以使用任何实现了 AsyncReadAsyncWrite trait 的类型,使用 AsyncRead::framed 方法创建一个 Framed 结构体。

    TcpStream::connect(&addr).and_then(|sock| {
        let framed_sock = sock.framed(LinesCodec::new());
        framed_sock.for_each(|line| {
            println!("Received line {}", line);


    本博客原创作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
  • 原文地址:https://www.cnblogs.com/hymenz/p/9334297.html
