创建异步运行时环境
1 2
| let runtime = tokio::runtime::Runtime::new().unwarp();
|
1 2 3 4 5 6 7 8 9 10 11 12
| let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .enable_io() .enable_time() .build() .unwrap();
let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap();
|
- 使用注解 annotation 启用异步运行时环境(这个跟上面使用Builder是一样的效果,语法糖而已):
1 2 3 4 5 6 7
| #[tokio::main] async fn main() {}
#[tokio::main(flavor = "multi_thread"] #[tokio::main(flavor = "multi_thread", worker_threads = 10))] #[tokio::main(worker_threads = 10))]
|
执行异步任务
- 使用 runtime::block_on 函数(block_on 函数也可以有返回值)
1 2 3 4 5 6
| fn main() { let rt = Runtime::new().unwrap(); rt.block_on(async { }); }
|
- 也可以使用注解main函数,然后直接在main函数内使用异步代码
1 2 3 4
| #[tokio::main] async fn main() { }
|
- 实用Rutime::enter() 函数进入异步运行时上下文——以上方法会阻塞使用block_on()的线程,而使用enter()则不会阻塞当前线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| let rt = Runtime::new().unwrap();
let guard1 = rt.enter();
tokio::spawn(async { time::sleep(time::Duration::from_secs(5)).await; println!("task1 sleep over: {}", now()); });
drop(guard1);
let guard2 = rt.enter(); tokio::spawn(async { time::sleep(time::Duration::from_secs(4)).await; println!("task2 sleep over: {}", now()); });
drop(guard2);
|
添加阻塞任务
Tokio提供阻塞线程,blocking thread,它默认是不存在的。blocking thread不用于执行异步任务,因此runtime不会去调度管理这类线程,它们在本质上相当于一个独立的thread::spawn()创建的线程,它也不会像block_on()一样会阻塞当前线程。它和独立线程的唯一区别,是blocking thread是在runtime内的,可以在runtime内对它们使用一些异步操作,例如await。
blocking thread执行完对应任务后,并不会立即释放,而是继续保持活动状态一段时间,此时它们的状态是空闲状态。当空闲时长超出一定时间后(可在runtime build时通过thread_keep_alive()配置空闲的超时时长),该空闲线程将被释放。
- 可以通过Runtime::spawn_blocking()开启一个阻塞线程,并执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| let rt1 = Runtime::new().unwrap();
let task = rt1.spawn_blocking(|| { println!("in task: {}", now()); thread::sleep(std::time::Duration::from_secs(10)) });
std::thread::sleep(std::time::Duration::from_millis(1)); println!("not blocking: {}", now());
rt1.block_on(async { task.await.unwrap(); println!("after blocking task: {}", now()); });
|
- 在当前Worker线程中通过 block_in_place() 添加阻塞任务。但是它会先将该worker thread中已经存在的异步任务转移到其它worker thread,使得这些异步任务不会被饥饿。
1 2 3
| task::block_in_place(move || { });
|
在block_in_place内部,可以使用block_on()或enter()重新进入runtime环境。
1 2 3 4 5 6 7 8 9 10 11 12
| use tokio::task;
async { task::spawn(async { println!("spawned task done!") });
task::yield_now().await; println!("main task done!"); }
|
添加异步任务
1 2 3 4 5 6 7 8 9 10 11 12
| tokio::spawn(async { time::sleep(time::Duration::from_secs(10)).await; println!("async task over: {}", now()); });
fn async_task(rt: &Runtime) { rt.spawn(async { time::sleep(time::Duration::from_secs(10)).await; }); }
|
关闭Runtime
- 直接drop(runtime):完整的关闭过程如下:
- 1.先移除整个任务队列,保证不再产生也不再调度新任务
- 2.移除当前正在执行但尚未完成的异步任务,即终止所有的worker thread
- 3.移除Reactor,禁止接收事件通知
注意⚠️,这种删除runtime句柄的方式只会立即关闭未被阻塞的worker thread,那些已经运行起来的blocking thread以及已经阻塞整个线程的worker thread仍然会执行。但是,删除runtime又要等待runtime中的所有异步和非异步任务(会阻塞线程的任务)都完成,因此删除操作会阻塞当前线程。
1 2 3
| let rt = Runtime::new().unwrap(); ... drop(rt);
|
- tokio提供了另外两个关闭runtime的方式:
<font style="color:rgb(38, 38, 37);background-color:#FFFFFF;">shutdown_timeout()</font>等待指定的时间,如果正在超时时间内还未完成关闭,将强行终止runtime中的所有线程。
<font style="background-color:#FFFFFF;">shutdown_background()</font>立即强行关闭runtime。