文 Akisann@CNblogs / zhaihj@Github
本篇文章同时发布在Github上:https://zhaihj.github.io/writing-a-threadpool-in-rust.html

多线程一直是我相当不想碰的东西,总觉得看起来很棒,用起来却一点都不放心——尤其是过去用Delphi体验了多线程之后。实际上到了多线程里根本就没法定位那里出了错误,因此大部分时间压根不是在“调试”,而是告诉用户怎么用才能避免这个错误。在给OOC写MultiTheard Generating的时候我也紧紧用了最简单的ThreadPool,并且因为互斥锁多线程还没有单线程快。总之,这么多年我一直有意的躲避多线程的问题。

而Rust一直在宣传它在多线程上的优势——所有权如何的好,在Servo里面他们如何用Rust写了个漂亮的Parallel Parser。加上最近一年一直再用Rust写游戏的服务器,于是我决定回头看看Rust下的多线程体验到底如何。

ThreadPool

相比简单的用多线程算个加法,或者写个The Computer Language Benchmarks Game的测试程序来说,ThreadPool可能更适合练手。在这里,我会实现下面这个结构的ThreadPool:

简单来说,主线程通过Channel向ThreadPool发送Job,然后Threadpool里的每一个子线程在空闲时都会尝从Channel里获取Job,然后执行它。执行完毕之后,Job的返回结果会通过另一个Channel传送给主线程。实际上,在C或Delphi里,实现这么一个东西似乎并不是很困难,然而,Rust因为独特的所有权制度,代码比思路要复杂一些,下面,让我们来看看怎么实现这个ThreadPool。

Jobs for the Job

首先,让我们来设计Job,在这篇文章里,Job总是一个没有参数的函数,用代码来说,是这样:

type Job<T> = Box<Fn() -> T + Send + 'static>

由于我们需要把返回值传回给主线程,因此Job的返回值需要Send属性。这里,没有用FnOnce而是Fn仅仅是因为目前Rust的Box并不支持Box(),并且我并不打算用BoxFn来增加文章的复杂度。
对于每一个线程,Job大概是这样使用的:

loop {if let Ok(f) = get_a_job() {send_to_main_thread(f());}
}

我们每个线程都会不停的尝试获取job,执行它,然后获取下一个。相信到这里已经有不少人发现了问题:这个子线程似乎永远都无法结束。因为get_a_job显然只能返回Ok或阻塞——如果当Channel为空它就出错的话,一旦没有新任务,所有的子线程都会退出,之后这个ThreadPool就再也没法用了。于是,为了让这个子线程能退出,我们给Job添加一个用来表示结束的状态:

enum Message<T> {Work(Box<Fn() -> T + Send + 'static),Terminate,
}type Job<T> = Message<T>;

这样,一旦Job是Terminate,线程就知道ThreadPool已经准备退出了。于是,我们可以把子线程写成这样:

loop {if let Ok(f) = get_a_job() {match f {Message::Work(f) => f(),Message::Terminate => break,}}
}

有了这个定义之后,让我们来看看ThreadPool该怎么设计。

从前一节的图片里,我们知道这个ThrealPolo需要两个Channel,一个用来接受新Job,另一个用来发送Job的返回结果,因此,ThrealPool可以写成这样:

struct ThreadPool <T>{sender: mpsc::Sender<Job<T>>,pub result: mpsc::Receiver<T>,threads: Vec<Option<thread::JoinHandle<()>>>,
}

这里,我们用了std::sync::mpsc,而threads则是用来储存所有子线程的,毕竟在结束的时候我们还要关闭他们的句柄。下面,我们尝试生成这个ThreadPool。由于子线程数是静态的,几乎所有的工作都可以在new里面完成,我们的大致思路是这样:

  • 生成两个Channel, A和B,A用来接受Job,B用来发送结果
  • 生成n个线程,每一个线程里:
  • 保留A的Receiver和B的Sender
  • 通过A的Receiver接受Job,执行,通过Sender发送

看起来并不是很难,让我们把它变成代码:

impl<T> ThreadPool<T> where T: Send + 'static { ....// n是线程数,s是每一个线程获取新Job时的等待时间fn new(n: usize, s: u64) -> Self {// 用来接收Job的channellet (tx, rx) = mpsc::channel();// 用来发送结果的Channellet (tx1, rx1) = mpsc::channel();let rx : Arc<Mutex<mpsc::Receiver<Job<T>>>> = Arc::new(Mutex::new(rx));

首先我们定义了必要的channel,需要注意的是,对于channel,通常我们认为Sender可以有多个,但Receiver只有一个,因此在设计时Sender可以自然应对多线程,但Receiver则不。对于我们这里的情况,可以手动对Receiver加一个互斥锁,在Rust里可以使用std::sync::Mutex

下面就是生成n个线程了:

let v = (0 .. n).map(| _ | {let rx = rx.clone();let tx1 : mpsc::Sender<T> = tx1.clone();Some(thread::spawn(move || loop {if let Ok(f) = rx.lock() {if let Ok(f) = f.recv() {match f {Message::Work(f) => {let r : T = f();tx1.send(r);},Message::Terminate => break,}}}}))
}).collect::<Vec<_>>();

这里,thread::spawn用来生成新的线程,每个线程所作的事情就如之前描述的一样。最后,把这一切合起来,就可以得到一个ThreadPool了:

ThreadPool {sender: tx,result: rx1,threads: v,
}

有了这个ThreadPool之后,我们还需要一个方法来随时向里添加新Job,不过这件事情非常简单——只需要通过sender发送就够了:

fn add(&self, f: Job<T>) {self.sender.send(f).unwrap();
}

为了简单,我们并没有处理send返回的错误,在实际应用里,add可以返回Result

Finishing and Drop

实际上,到此为止,大部分工作已经结束了,最后还有一个小事情:ThreadPool没法自己结束,因此我们需要手动实现这一部分。在其他语言里析构可能是理所当然的,不过在Rust里,除了C/C++的Wrapper之外,这种事情并不常见。Rust提供了Drop Trait来实现析构,Drop里的drop函数会在内存释放前自动执行。因此,我们只需要这么做:

impl<T> Drop for ThreadPool<T> {fn drop(&mut self) {for _ in &self.threads{self.sender.send(Message::Terminate);}for t in &mut self.threads {if let Some(t) = t.take() {t.join().unwrap();}}}
}

在这里,我们先对每一个线程发送结束命令,然后等待他们结束(join)。在实际应用里,线程可能会因为Job比较耗时而无法处理Terminate命令,Drop也会卡在Join的部分,不过可惜的是Rust的Thread并没有提供non-blocking的方法,因此你可能需要Future-rs来实现non-blocking的join。

Try It

到这里,主要部分已经完成了,下面我们来测试一下这个ThreadPool的效果如何:

fn main() {let tp = ThreadPool::new(10, 100);for i in 0 .. 100 {tp.add(Message::Work(Box::new(move || { println!("Thread: {}", i); i*100 })));}let mut c = 0;loop {if let Ok(s) = tp.result.recv() {println!("Result: {}", s);c += 1;}if c == 100 {break;}}
}

线程池有10个子线程,每个Job会输出一行文字,并返回一个数字,编译执行后,结果看起来像这样:

Thread: 0
Thread: 1
Result: 0
Result: 100
Thread: 2
Thread: 3
Result: 200
Result: 300
Thread: 4
Thread: 5
Thread: 6
Result: 400
Result: 500
Result: 600
......

Conclusion

在文章里,我尽量平白的描述这个过程,不过如果在没有基础的情况下自己去写的话,可能会遇到很多有趣的问题,比如:实际上thread.join会消耗掉JoinHandle的,因此,如果用Vec<JoinHandle>来储存线程句柄的话,会无法编译通过——因为drop是by ref的。在这篇文章里,我用了跟The Book同样的方法:添加一个Option来解决这个问题。不过,The Boox并不代表着最好的解决办法,比如我们还可以:

while let Some(e) = self.threads.pop() {e.join().unwrap();
}

对我来说,这个办法看起来更加简洁和优雅——但不知为什么The Book没有这么做。

本文的源代码可以在我的Github里找到。

转载于:https://www.cnblogs.com/akisan/p/7435722.html

Writing A Threadpool in Rust相关推荐

  1. rust(54)-字符串

    Rust有两种类型的字符串:String 和&str. String 存储为字节向量(Vec),但保证始终是有效的UTF-8序列.字符串是堆分配的,可增长的,不以空null 结束. & ...

  2. gRPC-rs:从 C 到 Rust

    介绍 在上篇文章中,我们讲到 TiKV 为了支持 gRPC,我们造了个轮子 gRPC-rs,这篇文章简要地介绍一下这个库.首先我们来聊聊什么是 gRPC.gRPC 是 Google 推出的基于 HTT ...

  3. gRPC-rs:从 C 到 Rust 1

    介绍 在上篇文章中,我们讲到 TiKV 为了支持 [gRPC],我们造了个轮子 [gRPC-rs],这篇文章简要地介绍一下这个库.首先我们来聊聊什么是 gRPC.gRPC 是 Google 推出的基于 ...

  4. new file会创建文件吗_Rust 文件系统处理之文件读写 Rust 实践指南

    Rust 中,文件读写处理简单而高效.代码也很紧凑,容易阅读.我们从读取文件的字符串行.避免读取写入同一文件.使用内存映射随机访问文件这三个文件处理中的典型案例来了解一下. 文件处理场景大家都很熟悉, ...

  5. GO、Rust 这些新一代高并发编程语言为何都极其讨厌共享内存?

    作者 | 马超   责编 | 王晓曼 出品 | CSDN博客 今天我想再来讨论一下高并发的问题,我们看到最近以Rust.Go为代表的云原生.Serverless时代的语言,在设计高并发编程模式时往往都 ...

  6. c语言 sprintf_s 参数 通配符,Rust教程(翻译).doc

    Rust教程(翻译) The Rust Language Tutorial 目录 1.Introduction(介绍) 2.Getting started(开始) 3.Syntax basics(基础 ...

  7. rust 入门笔记: rustlings(推荐一些学习rust语法的一些非常好的小练习)

    rustlings 推荐一个学习rust非常好的repo: Small exercises to get you used to reading and writing Rust code! - 学习 ...

  8. 初学rust——Tests

    今天是学习rust的第四天,学习材料为官网的<Rust Programming Language>,本笔记的主要内容为第11章:Writing Automated Tests. 今日学习的 ...

  9. Programming Rust Fast, Safe Systems Development(译) 错误处理(第七章)

    I knew if I stayed around long enough, something like this would happen. -George Bernard Shaw on dyi ...

  10. Rust模板引擎Tera中文英文对照官方文档

    来嘞早不如来的巧,刚翻译好,你就来啦!翻译完成 要开发CMS(内容管理系统)得有个模板引擎,Tera是使用Rust编写的模板引擎,语法跟JinJa2很像. 引入Tera 要使用Tera只需要在 Car ...

最新文章

  1. uploadhandler.php,WordPress Kernel Theme ‘upload-handler.php’任意文件上传漏洞
  2. 021Python路--单例设计模式
  3. Ogre 3d 工具集
  4. Java 中的线程管理概念梳理
  5. 终极算法【5】——进化学派
  6. 主成分分析法步骤matlab,主成分分析法matlab实现程序
  7. c++编译报错:ld returned 1 exit status
  8. 随想录一期 day2 [977.有序数组的平方|209. 长度最小的子数组|59.螺旋矩阵II(剥洋葱)]
  9. Android 11 Audio框架探索之AudioTracK(二)
  10. 超详细的ArcGIS生成格网知识汇总
  11. 干货分享丨精心整理了份Python知识点高清速查表!太受用了!
  12. PN532半加密、无漏洞卡解密
  13. 计算机视觉 专业术语,计算机视觉中常用的术语.doc
  14. vmstat 命令的用法说明
  15. 尚硅谷Vue3(天禹老师主讲)的笔记
  16. wps android 版 参数控制介绍,最强手机办公软件 Android版金山WPS首评测
  17. C++ Standard Library
  18. NSURLConnection的使用
  19. 做网络推广怎么换IP地址?
  20. 解决安卓手机USB接口被外设占用导致无法调试的问题

热门文章

  1. DirectX 9 学习笔记
  2. 在 Windows下使用 fastText
  3. python max int
  4. python 爬虫爬取内容时, \xa0 、 \u3000 的含义与处理方法
  5. 每日算法系列【LeetCode 329】矩阵中的最长递增路径
  6. 数据预处理—2.为什么Lasso回归可以做特征选择(变量挑选)而岭回归做不到呢?
  7. python如何识别文件中的空行?
  8. Python Cheat Sheet 中文版
  9. 提高应用程序可用性的五个要点
  10. 博文视点大讲堂第20期——Windows 7来了