创建异步运行时环境

  • 使用默认运行时环境:
1
2
// 创建默认多线程异步运行环境
let runtime = tokio::runtime::Runtime::new().unwarp();
  • 使用Builder,提供更多可配置选项:
1
2
3
4
5
6
7
8
9
10
11
12
// 创建带有线程池的runtime
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(8) // 8个工作线程
.enable_io() // 可在runtime中使用异步IO
.enable_time() // 可在runtime中使用异步计时器(timer)
.build() // 创建runtime
.unwrap();

// 或者创建单线程的runtime
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
  • 使用注解 annotation 启用异步运行时环境(这个跟上面使用Builder是一样的效果,语法糖而已):
1
2
3
4
5
6
7
#[tokio::main]
async fn main() {}

// 以下这几种都行
#[tokio::main(flavor = "multi_thread"] // 等价于#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 10))]
#[tokio::main(worker_threads = 10))]

执行异步任务

  • 使用 runtime::block_on 函数(block_on 函数也可以有返回值)
1
2
3
4
5
6
fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
// ... 这里编写你的异步代码
});
}
  • 也可以使用注解main函数,然后直接在main函数内使用异步代码
1
2
3
4
#[tokio::main]
async fn main() {
// ... 这里编写你的异步代码
}
  • 实用Rutime::enter() 函数进入异步运行时上下文——以上方法会阻塞使用block_on()的线程,而使用enter()则不会阻塞当前线程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
let rt = Runtime::new().unwrap();

// 进入runtime,但不阻塞当前线程
let guard1 = rt.enter();

// 生成的异步任务将放入当前的runtime上下文中执行
tokio::spawn(async {
time::sleep(time::Duration::from_secs(5)).await;
println!("task1 sleep over: {}", now());
});

// 释放runtime上下文,这并不会删除runtime
drop(guard1);

// 可以再次进入runtime
let guard2 = rt.enter();
tokio::spawn(async {
time::sleep(time::Duration::from_secs(4)).await;
println!("task2 sleep over: {}", now());
});

drop(guard2);

添加阻塞任务

Tokio提供阻塞线程,blocking thread,它默认是不存在的。blocking thread不用于执行异步任务,因此runtime不会去调度管理这类线程,它们在本质上相当于一个独立的thread::spawn()创建的线程,它也不会像block_on()一样会阻塞当前线程。它和独立线程的唯一区别,是blocking thread是在runtime内的,可以在runtime内对它们使用一些异步操作,例如await。

blocking thread执行完对应任务后,并不会立即释放,而是继续保持活动状态一段时间,此时它们的状态是空闲状态。当空闲时长超出一定时间后(可在runtime build时通过thread_keep_alive()配置空闲的超时时长),该空闲线程将被释放。

  • 可以通过Runtime::spawn_blocking()开启一个阻塞线程,并执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
let rt1 = Runtime::new().unwrap();
// 创建一个blocking thread,可立即执行(由操作系统调度系统决定何时执行)
// 注意,不阻塞当前线程
let task = rt1.spawn_blocking(|| {
println!("in task: {}", now());
// 注意,是线程的睡眠,不是tokio的睡眠,因此会阻塞整个线程
thread::sleep(std::time::Duration::from_secs(10))
});

// 小睡1毫秒,让上面的blocking thread先运行起来
std::thread::sleep(std::time::Duration::from_millis(1));
println!("not blocking: {}", now());

// 可在runtime内等待blocking_thread的完成
rt1.block_on(async {
task.await.unwrap();
println!("after blocking task: {}", now());
});
  • 在当前Worker线程中通过 block_in_place() 添加阻塞任务。但是它会先将该worker thread中已经存在的异步任务转移到其它worker thread,使得这些异步任务不会被饥饿。
1
2
3
task::block_in_place(move || {
// do some compute-heavy work or call synchronous code
});

在block_in_place内部,可以使用block_on()或enter()重新进入runtime环境。

1
2
3
4
5
6
7
8
9
10
11
12
use tokio::task;

async {
task::spawn(async {
// ...
println!("spawned task done!")
});

// Yield, allowing the newly-spawned task to execute first.
task::yield_now().await;
println!("main task done!");
}

添加异步任务

  • 异步执行环境外添加异步任务,并且立即执行
1
2
3
4
5
6
7
8
9
10
11
12
// 使用tokio::spawn函数添加异步任务
tokio::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
println!("async task over: {}", now());
});

// 或者可以使用runtime实例的spawn方法添加异步任务
fn async_task(rt: &Runtime) {
rt.spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
});
}

关闭Runtime

  • 直接drop(runtime):完整的关闭过程如下:
    • 1.先移除整个任务队列,保证不再产生也不再调度新任务
    • 2.移除当前正在执行但尚未完成的异步任务,即终止所有的worker thread
    • 3.移除Reactor,禁止接收事件通知

注意⚠️,这种删除runtime句柄的方式只会立即关闭未被阻塞的worker thread,那些已经运行起来的blocking thread以及已经阻塞整个线程的worker thread仍然会执行。但是,删除runtime又要等待runtime中的所有异步和非异步任务(会阻塞线程的任务)都完成,因此删除操作会阻塞当前线程

1
2
3
let rt = Runtime::new().unwrap();
...
drop(rt);
  • tokio提供了另外两个关闭runtime的方式:
    • <font style="color:rgb(38, 38, 37);background-color:#FFFFFF;">shutdown_timeout()</font>等待指定的时间,如果正在超时时间内还未完成关闭,将强行终止runtime中的所有线程。
    • <font style="background-color:#FFFFFF;">shutdown_background()</font>立即强行关闭runtime。