1. 异步编程简介

通常我们将消息通信分成同步和异步两种:

  • 同步就是消息的发送方要等待消息返回才能继续处理其它事情
  • 异步就是消息的发送方不需要等待消息返回就可以处理其它事情

很显然异步允许我们同时做更多事情,往往也能获得更高的性能。异步编程,是一种被越来越多编程语言支持的并发编程模型。

1.1 常见的并发编程模型

并发编程相对于常规、顺序式编程不够成熟或“标准化”。结果是,我们表达并发的方式不一样,取决于语言支持哪种并发模型。

常见的并发模型有:

  • OS 线程
    不需要编程模型作任何改动,这使得表达并发很容易。然而,线程间同步可能会很困难,并且性能开销很大。线程池可以较少一部分开销,但是不足够支持超大量 IO 密集负载。
  • 事件驱动编程
    可以变得高性能,但倾向于导致冗长,“非线性”的控制流。数据流和错误传播通常就变得很难跟进了。
  • 协程
    就像线程,但不需要改变编程模型,于是他们变得便于使用。像异步,他们可以支持大量的任务。然而,他们抽象了对于系统编程和自定义运行时实现非常重要的底层细节。

1.2 异步 vs 回调

异步编程的核心问题是如何处理通信:要么有办法知道通信有没有完成,要么能保证在通信完成后执行一段特定的逻辑。前者就是通知机制,比如信号量、条件变量等;后者就是callback,即回调。

当一项任务需要分成多个异步阶段完成时,就需要在每个阶段的回调函数中加入下阶段回调的代码,最终产生下面这样金字塔形状的代码:

getData = function(param, callback){$.get('http://example.com/get/'+param,function(responseText){callback(responseText);});
}getData(0, function(a){getData(a, function(b){getData(b, function(c){getData(c, function(d){getData(d, function(e){// ...});});});});
});

可以想象当回调层次继续增加时,代码有多恐怖。这就是回调噩梦。

1.3 异步 vs 多线程

  • OS 线程
    适合少量任务,因为线程会有 CPU 和内存开销。生成和切换线程是代价相当昂贵,甚至闲置的线程也会消耗系统资源。一个线程池库可以减轻这些开销,但并不能全部健康。然而,线程能让你重新利用存在的同步代码,而不需要大改源代码——不需要特别的编程模型。

  • 异步
    极大地降低了 CPU 和内存开销,尤其是再负载大量越过IO 边界的任务,例如服务器和数据库。同样,你可以处理比 OS 线程更高数量级的任务,因为异步运行时使用少量(昂贵的)线程来处理大量任务

这个例子的目标,是并发地下载两个网页。在典型的线程化(threaded)应用中,我们需要生成线程来达到并发:

fn get_two_sites() {// 生成两个线程来下载网页.let thread_one = thread::spawn(|| download("https:://www.foo.com"));let thread_two = thread::spawn(|| download("https:://www.bar.com"));// 等待两个线程运行下载完成.thread_one.join().expect("thread one panicked");thread_two.join().expect("thread two panicked");
}

然而,下载网页是小任务,为了这么少量工作创建线程相当浪费。对更大的应用来说,这很容易就会变成瓶颈。在异步 Rust,我们能够并发地运行这些任务而不需要额外的线程:

async fn get_two_sites_async() {// 创建两个不同的 "futures", 当创建完成之后将异步下载网页.let future_one = download_async("https:://www.foo.com");let future_two = download_async("https:://www.bar.com");// 同时运行两个 "futures" 直到完成.join!(future_one, future_two);
}

这里没有创建额外的线程。此外,所有函数调用都是静态分发的,也没有堆分配!然而,我们需要先编写能够异步执行的代码。

1.4 Future和Promise

Future和Promise来源于函数式语言,其目的是分离一个值和产生值的方法,从而简化异步代码的处理。

Future指一个只读的值的容器,这个值可能立即可用,也可能在未来某个时间可用。而Promise则是一个只能写入一次的对象。每个Promise关联一个Future,对Promise的写入会令Future的值可用。

Future与Promise配合起来可以实现一种可靠的通知机制,即我们可以异步执行一个方法,通过返回的Future来知道异步方法何时结束、是否成功、返回值是什么。

// 调用方
void SyncOperation() {Promise<int> promise;RunAsync(std::bind(AsyncFunc, promise));Future<int> future = promise.GetFuture();int result = future.Get(); // wait until future is done
}
// 接收方
void AsyncFunc(Promise<int> promise) {// do somethingpromise.Done(result);
}

Promise的一个重要特性就是它支持then,可以将金字塔式的回调组织为链式,极大地降低了理解和维护的难度:

getData = function(param, callback){return new Promise(function(resolve, reject) {$.get('http://example.com/get/'+param,function(responseText){resolve(responseText);});});
}getData(0).then(getData).then(getData).then(getData).then(getData);

1.5 async/await

async/.await在promise链式代码的基础上,更进一步,让异步函数编写得像同步代码。

getData = async function(param, callback){return new Promise(function(resolve, reject) {$.get('http://example.com/get/'+param,function(responseText){resolve(responseText);});});
}var data = await getData(0);
var data1 = await getData(data);
var data2 = await getData(data1);
var data3 = await getData(data2);
var data4 = await getData(data3);

这种写法要比Promise链更接近同步,也更易懂,但其底层依然是Promise。这种写法很接近于协程:用Promise来实现yield和resume,它就是一种协程。

async在运行之前什么都不做。运行async函数的最常见方式是 await它。当在async函数上调用 await时,它将尝试运行以完成它。如果函数被阻止,它将让出当前线程。当可以取得更多进展时,执行者将继续运行,以便 await 解决。

2. rust并发编程

2.1 rust多线程

下面是一个简单的程序,它可以显示10次Sleepus消息,每次间隔 0.5秒;同时显示5次Interruptus消息,每次间隔1秒。

use std::thread::{sleep, spawn};
use std::time::Duration;fn sleepus() {for i in 1..=10 {println!("Sleepus {}", i);sleep(Duration::from_millis(500));}
}fn interruptus() {for i in 1..=5 {println!("Interruptus {}", i);sleep(Duration::from_millis(1000));}
}fn main() {let sleepus = spawn(sleepus);let interruptus = spawn(interruptus);sleepus.join().unwrap();interruptus.join().unwrap();
}

可以看到,和其他语言的多线程编程写法基本类似。不需要对同步函数代码做太大修改。

2.2 基于async/await的rust异步编程

我们对上面的例子,进行异步改造,实现在单一线程内让两个任务 协作执行。

use async_std::task::{sleep, spawn};
use std::time::Duration;async fn sleepus() {for i in 1..=10 {println!("Sleepus {}", i);sleep(Duration::from_millis(500)).await;}
}async fn interruptus() {for i in 1..=5 {println!("Interruptus {}", i);sleep(Duration::from_millis(1000)).await;}
}#[async_std::main]
async fn main() {let sleepus = spawn(sleepus());interruptus().await;sleepus.await;
}

看起来有很多修改,不过实际上,我们的代码结构和之前的版本基本是一致的。

异步函数能够与普通的 Rust 函数一样使用。但是,调用这些函数不意味着执行这些函数,调用 async fn 类型的函数返回的是一个代表该操作的标识。在概念上他跟一个无参的闭包函数类型。为了能够真正的执行它,你需要在函数返回的标识上使用 .await 操作。

比如:

async fn say_world() {println!("world");
}#[tokio::main]
async fn main() {// Calling `say_world()` does not execute the body of `say_world()`let op = say_hello();// This println! comes firstprintln!("hello");// Calling `.await` on `op` starts executing `say_world`.op.await;
}

输出

hello
world

3. rust Tokio库

Tokio 是 Rust 的异步 runtime,可用于编写快速、可靠的网络应用。Tokio 还提供用于 TCP、UDP、计时器、多线程、工作窃取算法(work-stealing)调度等的 API。

3.1 Tokio 入门

我们从写一个最基础的的 Tokio 程序开始,这个程序会连接到 MiniRedis 的服务端,然后设置一个 key 为 hello,value 为 world 的键值对,然后再把这个键值对读取回来。

打开 Cargo.toml ,并在 [dependencies] 后添加下面的代码

tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"

代码如下

use mini_redis::{client, Result};#[tokil::main]
pub async fn main() -> Result<()> {// Open a connection to the mini-redis address.let mut client = client::connect("127.0.0.1:6379").await?;// Set the key "hello" with value "world"client.set("hello", "world".into()).await?;// Get key "hello"let result = client.get("hello").await?;println!("got value from the server; result={:?}", result);Ok(())
}

接下来花点时间梳理下我们刚才做的事情。代码并不多,但其中却触发了许多的事情。

let mut client = client::connect("127.0.0.1:6379").await?;

函数 client::connect 是 mini-redis 这个包所提供的,他会使用指定的地址来异步的创建一个 TCP 连接,当这个连接建立成功时, client 则保存了该函数返回的结果。尽管这个操作是异步发生的,但代码 看起来 却是同步的。其中唯一指示了该操作为异步的只有 .await 操作符。

用来启动程序的 main 函数其他普通的 Rust 程序的有所不同:

  • 被定义为 async fn
  • 添加了 #[tokio::main] 宏

async fn 函数在我们需要执行异步操作的上下文中被使用。然而,异步函数需要通过 runtime 来运行,runtime 中包含异步任务的调度器,他提供了事件驱动的 I/O、定时器等。runtime 并不会自动的运行,所以需要在主函数中运行它。

我们在 async fn main() 函数中添加的 #[tokio::main] 宏会将其转换为同步的 fn main() 函数,该函数会初始化 runtime 并执行我们定义的异步的 main 函数。

比如

#[tokio::main]
async fn main() {println!("hello");
}

会被转换为

fn main() {let mut rt = tokio::runtime::Runtime::new().unwrap();rt.block_on(async {println!("hello");})
}

3.2 Spawning(并发)

接下来,我们写一个Redis服务端

use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};#[tokio::main]
async fn main() {let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();loop {// The second item contains the IP and Port or the new connectionlet (socket, _) = listener.accept().await.unwrap();process(socket).await;}
}async fn process(socket: TcpStream) {// The `Connection` lets us read/write redis **frame** instead of// byte streams. The `Connection` type is defined by mini-redislet mut connection = Connection::new(socket);if let Some(frame) = connection.read_frame().await.unwrap() {println!("GOT: {:?}", frame);// Response with an errorlet response = Frame::Error("unimplemented".to_string());connection.write_frame(&response);}
}

每次只能处理一个请求。当接收了一个连接后,服务端会在当前循环中一直堵塞直到返回信息完全写到套接字中。

我们希望 Redis 服务能够同时处理多个请求,所以我们需要让他并发 (Concurrenty) 起来。

use tokio::net::TcpListener;#[tokio::main]
async fn main() {let listener = TcpListener::bind("127.0.0.1:6379")loop {let (socket, _) = listerner.accept().await.unwrap();// A new task is spqwned for each inbound socket. the socket is// moved to the new task and processed there.tokio::spawn(async move {process(socket).await;});}
}

Tokio 的任务是异步的绿色线程,他通过传递给 tokio::spawn 的 async 语句块创建,这个函数接收 async 语句块后返回一个 JoinHandle,调用者则通过 JoinHandle 与创建的任务交互。有些传递的 async 语句块是具有返回值的,调用者通过 JoinHandle 的 .await 来获取其返回值,

#[tokio::main]
async fn main() {let handle = tokio::spawn(async {"return value"});// Do some other worklet out = handle.await.unwrap();println!("GOT {}", out);
}

任务在 Tokio 中是非常轻量的,实际上他只需要申请一次 64 个字节的内存。所以程序可以轻松的产生成千上万的任务。

接下来继续实现 process 函数来处理接收的命令。我们将使用 HashMap 来存储收到的值,SET 操作会插入一条新的记录到 HashMap 中,而 GET 操作则从中读取。并且,我们还会使用一个循环来处理来自同个连接的多个命令。

async fn process(socket: TcpStream) {// The `Connection` lets us read/write redis **frame** instead of// byte streams. The `Connection` type is defined by mini-redisuse mini_redis::Command::{self, Get, Set};use std::collections::HashMap;// A hashmap is used to store datalet mut db = HashMap::new();// Connection, provided by `mini-redis`, handles parsing frames from// the socketlet mut connection = Connection::new(socket);while let Some(frame) = connection.read_frame().await.unwrap() {let response = match Command::from_frame(frame).unwrap() {Set(cmd) => {db.insert(cmd.key().to_string(), cmd.value().to_vec());Frame::Simple("OK".to_string())}Get(cmd) => {if let Some(value) = db.get(cmd.key()) {Frame::Bulk(value.clone().into())} else {Frame::Null}}cmd => panic!("unimplemented {:?}", cmd),};connection.write_frame(&response).await.unwrap();}

现在我们能获取跟设置信息了,但还存在一个问题。设置的信息还没办法在不同的连接中共享,如果其他的套接字连接尝试使用 GET 命令获取 hello 的值,他将找不到任何东西。

3.3 Channel(消息队列)

Channel大部分用在消息传递的场景中。
Tokio 提供了数种用于处理不同场景的 Channel

  • mpsc: 多生产者、单消费者的 Channel,能够发送多个信息
  • oneshot 单生产者、单消费者的 Channel,只能发送一个信息
  • broadcast 多生产者、多消费者,能够发送多个信息,每个消费者都能收到所有信息
  • watch 单生产者、多消费者,能够发送多个信息,但不会保存历史信息,消费者只能收到最新的信息

我们创建一个 mppsc 类型的 Channel

use tokio::sync::mpsc;#[tokio::main]
async fn main() {// Create a new channel with a capacity of at most 32let (tx, mut rx) = mpsc::channel(32);
}

mpsc 的 Channel 将用来发送命令给管理 Redis 连接的任务,其多生产者的模式允许多个任务通过他来发送消息。创建 Channel 的函数返回了两个值,一个发送者跟一个接收者,这两个句柄通常是分开使用的,他们会被移到到不同的任务中。

创建 Channel 时设置了容量为 32,如果消息发送的速度超过了接收的速度,这个 Channel 只会最多保存 32 个消息,当其中保存的消息超过了 32 时,继续调用 send(…).await 会让发送的任务进入睡眠,直到接收者又从 Channel 中消费了消息。

在使用中会通过 clone 发送者的方式,来让多个任务同时发送消息,如下例

use tokio::sync::mpsc;#[tokio::main]
async fn main() -> Result<()> {let (tx, mut rx) = mpsc::channel(32);let tx2 = tx.clone();tokio::spawn(async move {tx.send("sending from first handle").await;});tokio::spawn(async move {tx2.send("sending from second handle").await;});while let Some(message) = rx.recv().await {println!("GOT = {}", message);}Ok(())
}

每个消息最后都会发送给唯一的接收者,因为通过 mpsc 创建的接收者是不能 clone 的。

当所有发送者出了自身的作用域或被 drop 后就不再允许发送消息了,在这个时候接收者会返回 None,意味着所有的发送者已经被销毁,所以 Channel 也已经被关闭了。

3.4 Select(等待多个异步任务)

需要可以并发运行程序时,可以通过 spawn 创建一个新的任务。

tokio::select! 宏允许我们等待多个异步的任务,并且在其中一个完成时返回。

use tokio::sync::oneshot;#[tokio::main]
async fn main() {let (tx1, rx1) = oneshot::channel();let (tx2, rx2) = oneshot::channel();tokio::spawn(async {let _ = tx1.send("one");});tokio::spawn(async {let _ = tx2.send("two");});tokio::select! {val = rx1 => {println!("rx1 completed first with {:?}", val);}val = rx2 => {println!("rx2 completed first with {:?}", val);}}
}

这里我们使用了两个 OneShot Channel, 每个 Channel 都可能会先完成,select! 语句同时等待这两个 Channel,并在操作完成时将其返回值绑定到语句块的 val 变量,然后执行对应的完成语句。

要注意的是,另一个未完成的操作将会被丢弃,在这个示例中,对应的操作是等待每一个 oneshot::Receiver 的结果,最后未完成的那个 Channel 将会被丢弃。

3.5 Streams(异步迭代)

Stream 表示一个异步的数据序列,我们用 Stream Trait 来表示跟标准库的 std::iter::Iterator 类似的概念。

Tokio 提供的 Stream 支持是通过一个独立的包来实现的,他就是 tokio-stream

到目前为止,Rust 这门编程语言尚未支持异步的循环,因此要对 Stream 进行迭代我们需要用到 while let 循环及 StreamExt::next().

use tokio_stream::StreamExt;#[tokio::main]
async fn main() {let mut stream = tokio_stream::iter(&[1, 2, 3]);while let Some(v) = stream.next().await {println!("GOT = {:?}", v);}
}

现在来看一个略微复杂的 Mini-Redis 客户端的例子。

use tokio_stream::StreamExt;
use mini_redis::client;async fn publish() -> mini_redis::Result<()> {let mut client = client::connect("127.0.0.1:6379").await?;// Publish some dataclient.publish("numbers", "1".into()).await?lclient.publish("numbers", "two".into()).await?lclient.publish("numbers", "3".into()).await?lclient.publish("numbers", "four".into()).await?lclient.publish("numbers", "five".into()).await?lclient.publish("numbers", "6".into()).await?l
}async fn subscribe() -> mini_redis::Result<()> {let client = client::connect("127.0.0.1:6379").await?;let subscriber = client::subscribe(vec!["numbers".to_string()]).await?;let messages = subscriber.into_stream();tokio::pin!(messages);while let Some(msg) = messages.next().await {println!("Got = {:?}", msg);}Ok(())
}#[tokio::main]
async fn main() -> mini_redis::Result<()> {tokio::spawn(async {publish().await});subscribe().await?;println!("DONE");Ok(())
}

在上面的代码中我们创建了一个用 Mini-Redis 在 numbers频道中发布消息的任务,而在主任务中,我们订阅了 number 频道,并且每次在收到该频道的消息时将它打印了出来。

在订阅之后,我们在订阅者上面调用了 into_stream() 函数,这个函数消费了 Subscriber 然后返回了一个 在接收到消息时迭代数据的 Stream。

参考

https://fuzhe1989.github.io/2018/01/30/future-promise/
https://www.oschina.net/news/124525/tokio-1-0-released
https://www.zhihu.com/people/sinsay-chen/posts
https://learnku.com/docs/async-book/2018

进一步学习Rust

欢迎加入我的知识星球,我会提供一些学习资料(书籍/视频)以及解答一些问题。
大家一起学习Rust

rust异步编程--理解并发/多线程/回调/异步/future/promise/async/await/tokio相关推荐

  1. JavaScript 如何工作的: 事件循环和异步编程的崛起 + 5 个关于如何使用 async/await 编写更好的技巧...

    原文地址:How JavaScript works: Event loop and the rise of Async programming + 5 ways to better coding wi ...

  2. [译] JavaScript 如何工作的: 事件循环和异步编程的崛起 + 5 个关于如何使用 async/await 编写更好的技巧...

    原文地址:How JavaScript works: Event loop and the rise of Async programming + 5 ways to better coding wi ...

  3. 【转】1.7异步编程:基于事件的异步编程模式(EAP)

    传送门:异步编程系列目录-- 上一篇,我给大家介绍了".NET1.0 IAsyncResult异步编程模型(APM)",通过Begin*** 开启操作并返回IAsyncResult ...

  4. 避免回调地狱的解决方案 async/await:用同步的方式去写异步代码

    文章目录 前言 一.引入异步编程 二.常见处理异步编程的几种方式 1.Generator函数 2.Promise函数 3.async/await 总结 前言 这篇文章主要给大家分享一下,自己关于异步编 ...

  5. Promise async/await的理解和用法

    Promise && async/await的理解和用法 为什么需要promise(承诺)这个东西 在之前我们处理异步函数都是用回调这个方法,回调嵌套的时候会发现 阅读性 和 调试 的 ...

  6. JS 异步编程的解决方案,以及回调地狱的解决方案

    1.回调函数 回调函数是异步编程最基本的方法. 所谓回调函数,就是把任务的第二段单独写在一个函数里面,等到重新执行这个任务的时候,再调用这个函数. fs.readFile('/etc/fstab', ...

  7. python3异步编程_协程 Python异步编程(asyncio)

    协程(Coroutine) 也可以被称为微线程,是一种用户态内的上下文切换技术.简而言之,其实就是通过一个线程实现代码块相互切换执行. 直接上代码,例如: 同步编程 import time def f ...

  8. JavaScript异步编程(1)- ECMAScript 6的Promise对象

    JavaScript的Callback机制深入人心.而ECMAScript的世界同样充斥的各种异步操作(异步IO.setTimeout等).异步和Callback的搭载很容易就衍生"回调金字 ...

  9. Rust FFI 编程--理解不同语言的数据类型转换

    1. 简介 "FFI"是" Foreign Function Interface"的缩写,大意为不同编程语言所写程序间的相互调用.鉴于C语言事实上是编程语言界的 ...

最新文章

  1. python自学教材-python零基础自学教材
  2. ATF RT-SVC的介绍
  3. SpringBoot v2.2.6版本遇到的坑------Thymeleaf的sec:authorize标签无效
  4. Feedback about (Blockchain OR ML) AND (logistics)
  5. visual basic.net 2019-当前内存状态、字符串内插、操作系统系统信息
  6. jax-rs jax-ws_通过JAX-WS Provider在Web服务中利用MOXy
  7. 武魂觉醒s系列服务器,[多线]星河斗罗——新服开荒丨高程度剧情还原丨3D坐骑丨魂环丨武魂觉醒[1.12.2]...
  8. python医学图像分割_基于cv2的医学图像分割
  9. [洪流学堂]Hololens开发入门篇3:使用基本功能开发一个小应用
  10. 【SQL Server备份恢复】维护计划实现备份:每周数据库完整备份、每天差异备份、每小时日志备份...
  11. 第六章 使用ADO.NET查询和操作数据
  12. iOS FMDB有返回结果集和无返回结果集
  13. signature=32c56289e10e63e51063305adfc34ef0,Deconfinement transition and Black Holes
  14. 不装插件,查看.rp文件
  15. 基于ABBYY SDK 实现java版本 Hello 功能!
  16. 四、守护线程 deamon
  17. 赛马问题(30匹马,5个跑道,比赛多少次可以分出前三名)
  18. 一款性能足够的4.5寸以下的手机
  19. RoboMaster视觉教程(5)目标位置解算(通过像素点获取转角)
  20. FFmpeg:常用命令小笔记

热门文章

  1. ims系统 呈现服务器,基于IMS的呈现服务器的设计与实现
  2. 请教如何保存matlab仿真出来的图,如何保存Matlab绘制出来的图像
  3. Linux运行级别介绍和root忘记密码找回方法
  4. Spring高级应用之bean的生命周期
  5. 3-36Pytorch与tensorboardX
  6. ps cs3怎样能保存html,ps cs3用消失点清理杂物方法介绍
  7. 学习easyui疑问(二)
  8. python //运算符
  9. 解决pytorch RuntimeError: expected scalar type XXXX but found XXXX
  10. docker 删除镜像时报错Error response from daemon: conflict: unable to delete xxx (must be forced) -