select

Tokio教程之select

https://tokio.rs/tokio/tutorial/select

到目前为止,当我们想给系统添加并发性时,我们会生成一个新的任务。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。

tokio::select!

tokio::select! 宏允许在多个异步计算中等待,并在单个计算完成后返回。

比如说:

use tokio::sync::oneshot;#[tokio::main]
async fn main() {let (tx1, rx1) = oneshot::channel();let (tx2, rx2) = oneshot::channel();tokio::spawn(async {let _ = tx1.send("one");});tokio::spawn(async {let _ = tx2.send("two");});tokio::select! {val = rx1 => {println!("rx1 completed first with {:?}", val);}val = rx2 => {println!("rx2 completed first with {:?}", val);}}
}

Copy

使用了两个 oneshot 通道。任何一个通道都可以先完成。select! 语句在两个通道上等待,并将 val 与任务返回的值绑定。当 tx1 或 tx2 完成时,相关的块被执行。

没有完成的分支被放弃。在这个例子中,计算正在等待每个通道的 oneshot::Receiver。尚未完成的通道的 oneshot::Receiver 被放弃。

取消

在异步Rust中,取消操作是通过丢弃一个 future 来实现的。回顾 “Async in depth”,异步Rust操作是使用 futures 实现的,而 futures 是 lazy 的。只有当期货被 poll 时,操作才会继续进行。如果future被丢弃,操作就不能进行,因为所有相关的状态都被丢弃了。

也就是说,有时一个异步操作会催生后台任务或启动其他在后台运行的操作。例如,在上面的例子中,一个任务被催生出来,以发送一个消息回来。通常情况下,该任务会进行一些计算来生成数值。

Futures或其他类型可以实现 Drop 来清理后台资源。Tokio 的 oneshot::Receiver 通过向 Sender half 发送一个关闭的通知来实现 Drop。sender 部分可以收到这个通知,并通过丢弃来中止正在进行的操作。

use tokio::sync::oneshot;async fn some_operation() -> String {// Compute value here
}#[tokio::main]
async fn main() {let (mut tx1, rx1) = oneshot::channel();let (tx2, rx2) = oneshot::channel();tokio::spawn(async {// Select on the operation and the oneshot's// `closed()` notification.tokio::select! {val = some_operation() => {let _ = tx1.send(val);}_ = tx1.closed() => {// `some_operation()` is canceled, the// task completes and `tx1` is dropped.}}});tokio::spawn(async {let _ = tx2.send("two");});tokio::select! {val = rx1 => {println!("rx1 completed first with {:?}", val);}val = rx2 => {println!("rx2 completed first with {:?}", val);}}
}

Copy

Future 实现

为了帮助更好地理解 select! 的工作原理,让我们看看一个假想的Future实现是什么样子的。这是一个简化版本。在实践中,select! 包括额外的功能,如随机选择要先 poll 的分支。

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};struct MySelect {rx1: oneshot::Receiver<&'static str>,rx2: oneshot::Receiver<&'static str>,
}impl Future for MySelect {type Output = ();fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {println!("rx1 completed first with {:?}", val);return Poll::Ready(());}if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {println!("rx2 completed first with {:?}", val);return Poll::Ready(());}Poll::Pending}
}#[tokio::main]
async fn main() {let (tx1, rx1) = oneshot::channel();let (tx2, rx2) = oneshot::channel();// use tx1 and tx2MySelect {rx1,rx2,}.await;
}

Copy

MySelect future 包含每个分支的future。当MySelect被 poll 时,第一个分支被 poll。如果它准备好了,该值被使用,MySelect完成。在 .await 收到一个future的输出后,该future被放弃。这导致两个分支的futures都被丢弃。由于有一个分支没有完成,所以该操作实际上被取消了。

请记住上一节的内容:

当一个 future 返回 Poll::Pending 时,它必须确保在未来的某个时间点上对 waker 发出信号。如果忘记这样做,任务就会被无限期地挂起。

在 MySelect 的实现中,没有明确使用 Context 参数。相应的是,waker的要求是通过传递 cx 给内部 future 来满足的。由于内部 future 也必须满足waker的要求,通过只在收到内部 future 的 Poll::Pending 时返回 Poll::PendingMySelect 也满足 waker 的要求。

语法

选择 select! 宏可以处理两个以上的分支。目前的限制是64个分支。每个分支的结构为:

<pattern> = <async expression> => <handler>,

当 select 宏被评估时,所有的 <async expression> 被聚集起来并同时执行。当一个表达式完成时,其结果与 <pattern> 匹配。如果结果与模式匹配,那么所有剩余的异步表达式被放弃,<handler> 被执行。<handler> 表达式可以访问由 <pattern> 建立的任何绑定关系。

基本情况是 <pattern> 是一个变量名,异步表达式的结果被绑定到该变量名,<handler> 可以访问该变量。这就是为什么在最初的例子中,val 被用于<pattern>,而 <handler> 能够访问 val

如果 <pattern> 与异步计算的结果不匹配,那么剩下的异步表达式继续并发地执行,直到下一个表达式完成。这时,同样的逻辑被应用于该结果。

因为 select! 可以接受任何异步表达式,所以可以定义更复杂的计算来进行选择。

在这里,我们在一个 oneshot 通道和一个TCP连接的输出上进行选择。

use tokio::net::TcpStream;
use tokio::sync::oneshot;#[tokio::main]
async fn main() {let (tx, rx) = oneshot::channel();// Spawn a task that sends a message over the oneshottokio::spawn(async move {tx.send("done").unwrap();});tokio::select! {socket = TcpStream::connect("localhost:3465") => {println!("Socket connected {:?}", socket);}msg = rx => {println!("received message first {:?}", msg);}}
}

Copy

在这里,我们选择了一个 onehot 并接受来自 TcpListener 的套接字。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;#[tokio::main]
async fn main() -> io::Result<()> {let (tx, rx) = oneshot::channel();tokio::spawn(async move {tx.send(()).unwrap();});let mut listener = TcpListener::bind("localhost:3465").await?;tokio::select! {_ = async {loop {let (socket, _) = listener.accept().await?;tokio::spawn(async move { process(socket) });}// Help the rust type inferencer outOk::<_, io::Error>(())} => {}_ = rx => {println!("terminating accept loop");}}Ok(())
}

Copy

accept 循环一直运行到遇到错误或 rx 收到一个值。_模式表示我们对异步计算的返回值不感兴趣。

返回值

tokio::select! 宏返回被评估的 <handler> 表达式的结果。

async fn computation1() -> String {// .. computation
}async fn computation2() -> String {// .. computation
}#[tokio::main]
async fn main() {let out = tokio::select! {res1 = computation1() => res1,res2 = computation2() => res2,};println!("Got = {}", out);
}

Copy

正因为如此,要求每个分支的 <handler> 表达式求值为同一类型。如果不需要 select! 表达式的输出,让表达式求值为()是很好的做法。

错误

使用?操作符会从表达式中传播错误。这取决于是在异步表达式中还是在处理程序中使用? 在一个异步表达式中使用?操作符会将错误从异步表达式中传播出去。这使得异步表达式的输出成为一个结果。在 handler 中使用?会立即将错误从 select!表达式中传播出去。让我们再看一下接受循环的例子。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;#[tokio::main]
async fn main() -> io::Result<()> {// [setup `rx` oneshot channel]let listener = TcpListener::bind("localhost:3465").await?;tokio::select! {res = async {loop {let (socket, _) = listener.accept().await?;tokio::spawn(async move { process(socket) });}// Help the rust type inferencer outOk::<_, io::Error>(())} => {res?;}_ = rx => {println!("terminating accept loop");}}Ok(())
}

Copy

注意 listener.accept().await? 操作符将错误从该表达式中传播出来,并传播到 res 绑定中。在发生错误时, res 将被设置为 Err(_)。然后,在处理程序中,再次使用?操作符。res? 语句将把一个错误从主函数中传播出去。

模式匹配

回顾一下,select! 宏分支语法被定义为:

<pattern> = <async expression> => <handler>,

到目前为止,我们只使用了 <pattern> 的变量绑定。然而,任何Rust模式都可以被使用。例如,假设我们从多个MPSC通道接收信息,我们可以这样做。

use tokio::sync::mpsc;#[tokio::main]
async fn main() {let (mut tx1, mut rx1) = mpsc::channel(128);let (mut tx2, mut rx2) = mpsc::channel(128);tokio::spawn(async move {// Do something w/ `tx1` and `tx2`});tokio::select! {Some(v) = rx1.recv() => {println!("Got {:?} from rx1", v);}Some(v) = rx2.recv() => {println!("Got {:?} from rx2", v);}else => {println!("Both channels closed");}}
}

Copy

借用

当催生任务时,被催生的异步表达式必须拥有其所有的数据。select! 宏没有这个限制。每个分支的异步表达式都可以借用数据并同时操作。按照Rust的借用规则,多个异步表达式可以不变地借用一个数据,或者一个异步表达式可以可变地借用一个数据。

我们来看看一些例子。在这里,我们同时向两个不同的TCP目的地发送相同的数据。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;async fn race(data: &[u8],addr1: SocketAddr,addr2: SocketAddr
) -> io::Result<()> {tokio::select! {Ok(_) = async {let mut socket = TcpStream::connect(addr1).await?;socket.write_all(data).await?;Ok::<_, io::Error>(())} => {}Ok(_) = async {let mut socket = TcpStream::connect(addr2).await?;socket.write_all(data).await?;Ok::<_, io::Error>(())} => {}else => {}};Ok(())
}

Copy

data 变量被从两个异步表达式中不可变地借用。当其中一个操作成功完成时,另一个就会被放弃。因为我们在 Ok(_) 上进行模式匹配,如果一个表达式失败,另一个表达式继续执行。

当涉及到每个分支的 <handler> 时,select! 保证只运行一个 <handler>。正因为如此,每个<handler>都可以相互借用相同的数据。

例如,这在两个处理程序中都修改了out:

use tokio::sync::oneshot;#[tokio::main]
async fn main() {let (tx1, rx1) = oneshot::channel();let (tx2, rx2) = oneshot::channel();let mut out = String::new();tokio::spawn(async move {// Send values on `tx1` and `tx2`.});tokio::select! {_ = rx1 => {out.push_str("rx1 completed");}_ = rx2 => {out.push_str("rx2 completed");}}println!("{}", out);
}

Copy

循环

select! 宏经常在循环中使用。本节将通过一些例子来说明在循环中使用 select! 宏的常见方法。我们首先在多个通道上进行选择。

use tokio::sync::mpsc;#[tokio::main]
async fn main() {let (tx1, mut rx1) = mpsc::channel(128);let (tx2, mut rx2) = mpsc::channel(128);let (tx3, mut rx3) = mpsc::channel(128);loop {let msg = tokio::select! {Some(msg) = rx1.recv() => msg,Some(msg) = rx2.recv() => msg,Some(msg) = rx3.recv() => msg,else => { break }};println!("Got {}", msg);}println!("All channels have been closed.");
}

Copy

这个例子在三个通道的接收器上进行 select。当在任何通道上收到消息时,它被写入STDOUT。当一个通道被关闭时,recv() 以None返回。通过使用模式匹配,select! 宏继续在其余通道上等待。当所有的通道都关闭时,else分支被评估,循环被终止。

select! 宏随机挑选分支,首先检查是否准备就绪。当多个通道有等待值时,将随机挑选一个通道来接收。这是为了处理这样的情况:接收循环处理消息的速度比推入通道的速度慢,也就是说,通道开始被填满。如果 select! 不随机挑选一个分支先检查,在循环的每个迭代中,rx1将被首先检查。如果rx1总是包含一个新的消息,其余的通道将永远不会被检查。

如果当select!被评估时,多个通道有待处理的消息,只有一个通道有一个值被弹出。所有其他的通道保持不动,它们的消息保持在这些通道中,直到下一个循环迭代。没有消息丢失。

恢复异步操作

现在我们将展示如何在多次调用 select! 时运行一个异步操作。在这个例子中,我们有一个MPSC通道,类型为i32,还有一个异步函数。我们想运行异步函数,直到它完成或在通道上收到一个偶数整数。

async fn action() {// Some asynchronous logic
}#[tokio::main]
async fn main() {let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    let operation = action();tokio::pin!(operation);loop {tokio::select! {_ = &mut operation => break,Some(v) = rx.recv() => {if v % 2 == 0 {break;}}}}
}

Copy

请注意,不是在 select! 宏中调用 action() ,而是在循环之外调用它。action() 的返回被分配给 operation,而不调用 .await。然后我们在 operation 上调用 tokio::pin!

在 select! 循环中,我们没有传入 operation,而是传入 &mut operation 。operation 变量正在跟踪飞行中的异步操作。循环的每个迭代都使用相同的 operation,而不是对 action() 发出一个新的调用。

另一个 select! 分支从通道中接收消息。如果该消息是偶数,我们就完成了循环。否则,再次启动 select! 。

这是我们第一次使用 tokio::pin! 我们现在还不打算讨论 pining 的细节。需要注意的是,为了 .await 一个引用,被引用的值必须被 pin 或者实现 Unpin

如果我们删除 tokio::pin! 这一行,并尝试编译,我们会得到以下错误:

error[E0599]: no method named `poll` found for struct`std::pin::Pin<&mut &mut impl std::future::Future>`in the current scope--> src/main.rs:16:9|
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }| |_________^ method not found in|             `std::pin::Pin<&mut &mut impl std::future::Future>`|= note: the method `poll` exists but the following trait boundswere not satisfied:`impl std::future::Future: std::marker::Unpin`which is required by`&mut impl std::future::Future: std::future::Future`

Copy

虽然我们在上一章中介绍了 Future,但这个错误仍然不是很清楚。如果你在试图对一个引用调用 .await 时遇到这样一个关于 Future 没有被实现的错误,那么这个Future可能需要被 pin

关于标准库中的Pin。

修改分支

让我们来看看一个稍微复杂的循环。我们有:

  1. 一个 i32 值的通道。
  2. 一个在 i32 值上执行的异步操作。

我们要实现的逻辑是:

  1. 在通道上等待一个偶数。
  2. 使用偶数作为输入启动异步操作。
  3. 等待操作,但同时在通道上监听更多的偶数。
  4. 如果在现有的操作完成之前收到一个新的偶数,则中止现有的操作,用新的偶数重新开始操作。
async fn action(input: Option<i32>) -> Option<String> {// If the input is `None`, return `None`.// This could also be written as `let i = input?;`let i = match input {Some(input) => input,None => return None,};// async logic here
}#[tokio::main]
async fn main() {let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);let mut done = false;let operation = action(None);tokio::pin!(operation);tokio::spawn(async move {let _ = tx.send(1).await;let _ = tx.send(3).await;let _ = tx.send(2).await;});loop {tokio::select! {res = &mut operation, if !done => {done = true;if let Some(v) = res {println!("GOT = {}", v);return;}}Some(v) = rx.recv() => {if v % 2 == 0 {// `.set` is a method on `Pin`.operation.set(action(Some(v)));done = false;}}}}
}

Copy

我们使用的策略与前面的例子类似。async fn 在循环之外被调用,并被分配给 operation。operation 变量被 pin 住。循环在 operation 和通道接收器上都进行select。

注意 action 是如何将 Option<i32> 作为参数的。在我们接收第一个偶数之前,我们需要将 operation 实例化为某种东西。我们让 action 接受 Option 并返回Option。如果传入的是 None,就会返回 None。在第一个循环迭代中,operation 立即以 None 完成。

这个例子使用了一些新的语法。第一个分支包括 , if !done。这是一个分支的前提条件。在解释它是如何工作的之前,让我们看一下如果省略了这个前提条件会发生什么。省略 , if !done 并运行这个例子的结果是如下输出。

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Copy

这个错误发生在试图使用已经完成的 operation 时。通常情况下,当使用 .await 时,被等待的值会被消耗。在这个例子中,我们对一个引用进行 await。这意味着 operation 在完成后仍然存在。

为了避免这种 panic,我们必须注意在 operation 完成后禁用第一个分支。done 变量用于跟踪 operation 是否完成。一个 select! 分支可能包括一个 precondition。这个前提条件在 select! 分支等待之前被检查。如果该条件被评估为false,那么该分支将被禁用。done变量被初始化为false。当 operation 完成后,done被设置为true。下一个循环迭代将禁用该操作分支。当从通道收到一个偶数信息时,operation 被重置,done被设置为false。

每任务的并发性

tokio::spoon 和 select! 都可以运行并发的异步操作。然而,用于运行并发操作的策略是不同的。tokio::spoon 函数接收一个异步操作并生成一个新的任务来运行它。任务是 Tokio 运行时安排的对象。两个不同的任务是由 Tokio 独立调度的。它们可能同时运行在不同的操作系统线程上。正因为如此,一个催生的任务和一个催生的线程有同样的限制:不能借用。

select! 宏在同一个任务上同时运行所有分支。因为 select! 宏的所有分支都在同一个任务上执行,所以它们永远不会同时运行。select! 宏在一个任务上复用异步操作。

Tokio教程之select相关推荐

  1. iBATIS教程之like语句的写法浅析

    iBATIS教程之like语句的使用我们可以先看看网上搜了一下iBATIS的关于like的使用 select * from USERS where USER_NAME like '%wang%'; 这 ...

  2. pgsql数据库默认配置事务类型_PostgreSQL基础教程之:初始化配置

    PostgreSQL基础教程之:初始化配置 时间:2020-04-27 来源: PostgreSQL基础教程之:初始化配置 一.配置pg_hba.conf 先说明客户端认证配置文件pg_hba.con ...

  3. 易语言逐条读access数据_易语言操作数据教程之ACCESS实战视频教程

    易语言操作数据教程之ACCESS实战教程第一讲 第一讲: 这一节我们讲ACCESS实战教程 实战教程: 分类的账号管理器 --------------------------------------- ...

  4. java jdbc 教程_java JDBC系列教程之JDBC类的简析与JDBC的基础操作

    什么是JDBC? 概念:JAVA Database Connectivity Javas数据库连接,Java语言操作数据库接口,然后由各个数据库厂商去实现这个接口,提供数据库驱动java包,我们可以使 ...

  5. java+mysql性能优化_Java培训实战教程之mysql优化

    Java培训实战教程之mysql优化 更新时间:2015年12月29日13时30分 来源:传智播客Java培训学院 浏览次数: 1.   mysql引擎 1.1.  引擎类型 MySQL常用的存储引擎 ...

  6. Docker最全教程之Python爬网实战(二十二)

    Python目前是流行度增长最快的主流编程语言,也是第二大最受开发者喜爱的语言(参考Stack Overflow 2019开发者调查报告发布).笔者建议.NET.Java开发人员可以将Python发展 ...

  7. Oracle(11g)数据库教程之十:Oracle操作题 (复习课)

    Oracle(11g)数据库教程之十:Oracle操作题 (复习课) 操作题 Sutdent表的定义 字段名 字段描述 数据类型 主键 非空 Id 学号 INT(10) 是 是 Name 姓名 VAR ...

  8. python pymysql cursors_老雷python基础教程之pymysql学习及DB类的实现

    老雷python教程之pymysql学习及DB类的实现 CREATE TABLE `sky_guest` ( `id` int(11) NOT NULL AUTO_INCREMENT, `title` ...

  9. 叩丁狼—Java培训实战教程之mysql优化

    Java培训实战教程之mysql优化 Java培训过程中精点.难点知识解析 1. mysql引擎1.1. 引擎类型MySQL常用的存储引擎为MyISAM.InnoDB.MEMORY.MERGE,其中I ...

最新文章

  1. 广州图书馆借阅抓取——httpClient的使用
  2. RSA非对称加密简析-java
  3. mybatis 获得一个map的返回集合
  4. 今日arXiv精选 | Survey/ICCV/ACM MM/ICML/CIKM/SIGIR/RecSys/IROS
  5. 算法函数_关于损失函数和优化算法,看这一篇就够了
  6. java关键字和标识符_Java数据类型和标识符
  7. Python基础——模块的安装
  8. ffmpeg 源代码简单分析 : av_register_all()
  9. Spring+SpringMVC+MyBatis+easyUI整合基础篇(五)讲一下maven
  10. 【luogu2272】 [ZJOI2007]最大半连通子图 [tarjan 缩点][拓扑排序]
  11. word在试图打开文件时遇到错误,检查稳定或驱动器文件权限
  12. oracle32 plsql,32位plsql developer连接64位oracle
  13. Godaddy、Lunarpages、IXwebhosting国外三大主机点评
  14. index.php g wap,代码阅读--wap端入口文件index.php
  15. 互联网年底裁员,离职倒计时!!!
  16. 苹果app商品定价_苹果将调整应用商店定价:中国区应用最低价涨至8元
  17. HDU - 2520 我是菜鸟,我怕谁
  18. RadiAnt DICOM Viewer 2021.1中文版
  19. g33k 专用:使用 Mutt Email 客户端管理你的 Gmail
  20. phpnow安装,phpnow卸载,phpnow教程,phpnow安装教程

热门文章

  1. 论文笔记(二十二):Soft Tracking Using Contacts for Cluttered Objects to Perform Blind Object Retrieval
  2. xp系统怎么创建新宽带连接服务器地址,XP宽带连接怎么创建?
  3. 旋转矩阵中6保6_双色球旋转矩阵(9-12)个号中6保5公式
  4. citra 图形设置_bios怎么设置硬盘启动顺序 bios设置硬盘启动方法
  5. Lottie 免费动画、在线预览
  6. 虚拟机软件有哪些?分别有哪些作用?
  7. 景元利老师--沪师经纪
  8. c盘太小想扩容,合并硬盘分区的方法,硬盘合并分区的步骤
  9. 爱存在用计算机怎么弹,qiukepingvf
  10. 关于测试思维的个人思考框架