美文网首页
Rust修行之Future篇-part4

Rust修行之Future篇-part4

作者: 黑天鹅学院 | 来源:发表于2020-03-01 22:32 被阅读0次

本文翻译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 4 - A "real" future from scratch

介绍

在前面的文章中,我们涵盖了Future的基本操作,包括集联,执行,以及创建。但是到目前为止,我们对Future的了解并不能帮助我们解决一些实际的问题。在part3中,我们使用简单粗暴的方法实现了Future的parking以及即时unparking,这个小技巧可以帮助我们利用Future处理一些流程处理的事情,但是仍然离真实的场景相距甚远。在本文中,我们将实现一个更加贴合实际的例子。

A timer future

在part3中,我们创建了一个最简单的Timer Future,我们依然从这个例子着手,但是这一次我们不会立即unparking这个Future的task,而是持续保持Parked状态,直到其达到了完成状态。实现这个功能最简单的做法是引入另一个线程,这个线程将持续等待,直到一段时间之后unpark我们之前设置的任务。

这种机制类似于模拟了一个异步IO的过程,在异步事情完成时,会收到相应的通知。为了简单起见,我们仅创建一个单线程的reactor,在等待期间我们可以执行其他的事情。

Timer revised

我们定义一个简单的结构体,仅包含超时时间以及当前的运行状态。


pub struct WaitInAnotherThread {

    end_time: DateTime<Utc>,

    running: bool,

}

impl WaitInAnotherThread {

    pub fn new(how_long: Duration) -> WaitInAnotherThread {

        WaitInAnotherThread {

            end_time: Utc::now() + how_long,

            running: false,

        }

    }

}

在上面的定义中,DateTime与Duration这两个类型定义来自于chronos crate。

Spin wait

我们实现的等待函数如下:


pub fn wait_spin(&self) {

    while Utc::now() < self.end_time {}

    println!("the time has come == {:?}!", self.end_time);

}

这个函数仅仅简单的比较当前时间与超时时间,这种方法有效,并且检测方式非常精确,缺点是过度消耗cpu资源。


fn main() {

    let wiat = WaitInAnotherThread::new(Duration::seconds(30));

    println!("wait spin started");

    wiat.wait_spin();

    println!("wait spin completed");

}

fcpu

从图中可以看出core 8的利用率是100%,这与part3中的例子是一致的。

spin-wait这种方式非常精确但是极度浪费,应该仅用在等待时间非常短,或者是没有选择的情况下使用。

Sleep wait

通常OS提供sleep函数来park用户线程,睡眠X秒的意思实际上是告诉cpu在这段时间内,不要调度当前的线程。在sleep期间,如果cpu还有足够的资源,可以执行其他的任务。在Rust中,通过std::thread::sleep()来实现线程睡眠。

我们基于sleep改造前面的等待函数:


pub fn wait_blocking(&self) {

    while Utc::now() < self.end_time {

        let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();

        if delta_sec > 0 {

            thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));

        }

    }

    println!("the time has come == {:?}!", self.end_time);

}

在上面的函数中,我们通过减去已经睡眠的时间来得到尚需继续睡眠的时间,由于timestamp函数精度不高,整体准确度将低于spin-wait方法。调用方法如下:


let wiat = WaitInAnotherThread::new(Duration::seconds(30));

println!("wait blocking started");

wiat.wait_blocking();

println!("wait blocking completed");

修改之后的性能取得了极大的改善,cpu消耗如下:


fcpu1

改善了性能,但是我们怎样在Future中使用呢。

Future

我们先实现一个简单的Future:


impl Future for WaitInAnotherThread {

    type Item = ();

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        while Utc::now() < self.end_time {

            let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();

            if delta_sec > 0 {

                thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));

            }

        }

        println!("the time has come == {:?}!", self.end_time);

        Ok(Async::Ready(())

}

在poll函数中,通过sleep来睡眠等待,并不消耗cpu,但是这种方法将会阻塞reactor,将会影响到其他的Future。这种写法是不提倡的。

在Rust中,Future应尽量避免使用阻塞函数。

一个好的reactor使用习惯应该遵循以下原则:

1 当需要等待超时事件时,应该停止当前任务

2 不要阻塞当前线程

3 异步事件完成时通知reactor。

我们将在另一个线程中引入sleep函数来满足以上原则。这个睡眠函数不消耗资源,由于运行在看另一个线程中,所以不受当前线程的影响。当sleep线程完成时,将会唤醒当前任务,同时通知reactor。

我们一步步来实现上述想法。


impl Future for WaitInAnotherThread {

    type Item = ();

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        if Utc::now() < self.end_time {

            println!("not ready yet! parking the task.");

            if !self.running {

                println!("side thread not running! starting now!");

                self.run(task::current());

                self.running = true;

            }

            Ok(Async::NotReady)

        } else {

            println!("ready! the task will complete.");

            Ok(Async::Ready(()))

        }

    }

}

我们仅需要启动一次睡眠线程,所以使用一个running变量来标记。有一点需要切记,在Future的poll函数被调用之前,我们的任务并不会被执行,这一点与我们的需求是一致的。此外,在启动睡眠线程之前,我们还检查了超时时间是否已过,如果已经超时,将不会创建睡眠线程。

如果未超时,且睡眠线程创建完成后,我们向reactor申请park我们的任务,通过返回Ok(Async::NotReady)完成。与part3相反,在这里我们并不会立即unpark,这个工作将由睡眠线程来完成。在其他的实现中,比如IO,这样的park行为可能由OS来完成。

睡眠线程的实现如下:


fn run(&mut self, task: task::Task) {

    let lend = self.end_time;

    thread::spawn(move || {

        while Utc::now() < lend {

            let delta_sec = lend.timestamp() - Utc::now().timestamp();

            if delta_sec > 0 {

                thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));

            }

            task.notify();

        }

        println!("the time has come == {:?}!", lend);

    });

}

有两点值得注意。首先,我们将task传递给了睡眠线程,原因是我们我们不能在另外的线程中访问到Task::current()。其次,我们不能将self移动到闭包中,所以我们需要转移所有权至lend变量。为啥这样做呢?原因是Rust中的线程需要实现具有'static生命周期的Send Trait。task自身的实现同时满足申明周期的要求,所以我们可以将task传递至闭包中,但是我们实现的结构体并不满足条件,所以我们使用了end_time的一个拷贝。

这种方法意味这在线程启动后,将不能修改超时时间。

调用方式如下:


fn main() {

    let mut reactor = Core::new().unwrap();

    let wiat = WaitInAnotherThread::new(Duration::seconds(3));

    println!("wait future started");

    let ret = reactor.run(wiat).unwrap();

    println!("wait future completed. ret == {:?}", ret);

}

输出如下:


Finished dev [unoptimized + debuginfo] target(s) in 0.96 secs

    Running `target/debug/tst_fut_complete`

wait future started

not ready yet! parking the task.

side thread not running! starting now!

the time has come == 2017-11-21T12:55:23.397862771Z!

ready! the task will complete.

wait future completed. ret == ()

我们来回顾下几个关键的流程:

1 请求reactor启动我们的Future

2 我们的Future发现当前未超时,于是:

2.1 par task

2.2 启动辅助线程

3 辅助线程在一段时间后被唤醒,并且执行:

3.1 通知reactor某一个task能够被unpark

3.2 退出自身线程

4 reactor唤醒被park的线程

5 task完成自身执行,并且:

5.1 告诉reactor当前task执行完成

5.2 返回执行结果

6 reactor将task的返回值返回给run函数的调用者

总结来看,整个过程是很有逻辑条理的。

Conclusion

本文的例子是一个几乎接近于真实情况的Future实现。由于没有引入阻塞过程,所以能够满足reactor的要求,也不消耗额外的资源。这个例子并没有实现具体的任务,尽管Rust已经实现了内建的timeout机制,但是,了解内部的原理对提升我们自身有着很大的帮助。

实际上,通常情况下,大多数程序员并不会手写Future,而是使用包含了相应功能的库,但是了解Future的内部原理仍然很重要。

下一个主题是Streams,我们将尝试创建不会阻塞reactor的Iterators。

appendix

完整代码如下:


extern crate chrono;

extern crate futures;

extern crate tokio_core;

use chrono::prelude::*;

use chrono::*;

use futures::prelude::*;

use futures::*;

use std::error::Error;

use std::thread::{sleep, spawn};

use tokio_core::reactor::Core;

pub struct WaitInAnotherThread {

    end_time: DateTime<Utc>,

    running: bool,

}

impl WaitInAnotherThread {

    pub fn new(how_long: Duration) -> WaitInAnotherThread {

        WaitInAnotherThread {

            end_time: Utc::now() + how_long,

            running: false,

        }

    }

    fn run(&mut self, task: task::Task) {

      let lend = self.end_time;

        spawn(move || {

            while Utc::now() < lend {

                let delta_sec = lend.timestamp() - Utc::now().timestamp();

                if delta_sec > 0 {

                    sleep(::std::time::Duration::from_secs(delta_sec as u64));

                }

                task.notify();

            }

            println!("the time has come == {:?}!", lend);

        });

    }

}

impl Future for WaitInAnotherThread {

    type Item = ();

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        if Utc::now() < self.end_time {

            println!("not ready yet! parking the task.");

            if !self.running {

                println!("side thread not running! starting now!");

                self.run(task::current());

                self.running = true;

            }

            Ok(Async::NotReady)

        } else {

            println!("ready! the task will complete.");

            Ok(Async::Ready(()))

        }

    }

}

fn main() {

    let mut reactor = Core::new().unwrap();

    let wiat = WaitInAnotherThread::new(Duration::seconds(3));

    println!("wait future started");

    let ret = reactor.run(wiat).unwrap();

    println!("wait future completed. ret == {:?}", ret);

}

相关文章

网友评论

      本文标题:Rust修行之Future篇-part4

      本文链接:https://www.haomeiwen.com/subject/rzirkhtx.html