Introduction to Reactive Programming (Rx) 响应式编程入门(Rx)

Reactive programming has a higher learning curve than other forms of concurrency, and the code can be harder to maintain unless you keep up with your reactive skills. If you’re willing to learn it, though, reactive programming is extremely powerful. Reactive programming enables you to treat a stream of events like a stream of data. As a rule of thumb, if you use any of the event arguments passed to an event, then your code would benefit from using System.Reactive instead of a regular event handler.

响应式编程比其他形式的并发具有更高的学习曲线,并且代码可能更难维护,除非您跟上了响应式编程的技能。但是,如果您愿意学习它,响应式编程是非常强大的。响应式编程使您能够像处理数据流一样处理事件流。根据经验,如果您使用传递给事件的任何事件参数,那么您的代码将受益于使用System.Reactive,而不是常规的事件处理程序。

TIP:System.Reactive used to be called Reactive Extensions, which was often shortened to “Rx.” All three of these terms refer to the same technology.

System.Reactive过去被称为响应式扩展,通常缩写为“Rx”。这三个术语都指的是同一种技术。

Reactive programming is based on the notion of observable streams. When you subscribe to an observable stream, you’ll receive any number of data items (OnNext), and then the stream may end with a single error (OnError) or “end of stream” notification (OnCompleted). Some observable streams never end. The actual interfaces look like the following:

响应式编程基于可观察流的概念。当你订阅一个可观察对象流时,你会接收到任意数量的数据项(OnNext),然后流可能会以一个错误(OnError)或“流结束”通知(OnCompleted)结束。一些可观察到的流永远不会结束。实际的接口如下所示:

interface IObserver<in T>
{void OnNext(T item);void OnCompleted();void OnError(Exception error);
}interface IObservable<out T>
{IDisposable Subscribe(IObserver<TResult> observer);
}

However, you should never implement these interfaces. The System.Reactive (Rx) library by Microsoft has all the implementations you should ever need.Reactive code ends up looking very much like LINQ; you can think of it as “LINQ to Events.” System.Reactive has everything that LINQ does and adds in a large number of its own operators, particularly ones that deal with time. The following code starts with some unfamiliar operators (Interval and Timestamp) and ends with a Subscribe, but in the middle are some Where and Select operators that should be familiar from LINQ:

但是,您永远不应该实现这些接口。这个系统。微软的响应式(Rx)库拥有所有你需要的实现。响应式代码最终看起来非常像LINQ;你可以把它看作是“事件的LINQ”。 System.Reactive拥有LINQ所做的一切,并添加了大量自己的操作符,尤其是处理时间的操作符。下面的代码以一些不熟悉的操作符(时间间隔和时间戳)开始,以订阅结束,但中间是一些在LINQ中应该熟悉的Where和Select操作符:

Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().Where(x => x.Value % 2 == 0).Select(x => x.Timestamp).Subscribe(x => Trace.WriteLine(x));

The example code starts with a counter running off a periodic timer (Interval) and adds a timestamp to each event (Timestamp). It then filters the events to only include even counter values (Where), selects the timestamp values (Timestamp), and then as each resulting timestamp value arrives, writes it to the debugger (Subscribe). Don’t worry if you don’t understand the new operators, such as Interval: these are covered later in this book. For now, just keep in mind that this is a LINQ query very similar to the ones you’re already familiar with. The main difference is that LINQ to Objects and LINQ to Entities use a “pull” model, where the enumeration of a LINQ query pulls the data through the query, while LINQ to Events (System.Reactive) uses a “push” model, where the events arrive and travel through the query by themselves. The definition of an observable stream is independent from its subscriptions. The last example is the same as the following code:

示例代码从一个运行定时计时器(间隔)的计数器开始,并向每个事件(时间戳)添加一个时间戳。然后,它过滤事件,使其仅包含计数器值(Where),选择时间戳值(timestamp),然后当每个产生的时间戳值到达时,将其写入调试器(订阅)。如果您不理解新的运算符,如Interval,请不要担心:这些将在本书后面介绍。现在,请记住,这是一个LINQ查询,非常类似于您已经熟悉的那些查询。主要的区别在于,LINQ和LINQ to实体对象使用一个“pull”模式,在LINQ查询的枚举通过查询获取数据,同时LINQ事件(System.Reactive)使用“push”模型,事件到达,通过查询本身。可观察流的定义独立于它的订阅。最后一个示例与下面的代码相同:

IObservable<DateTimeOffset> timestamps =Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().Where(x => x.Value % 2 == 0).Select(x => x.Timestamp);timestamps.Subscribe(x => Trace.WriteLine(x));

It is normal for a type to define the observable streams and make them available as an IObservable resource. Other types can then subscribe to those streams or combine them with other operators to create another observable stream.

对于一个类型来说,定义可观察流并让它们作为IObservable资源可用是很正常的。其他类型可以订阅这些流,或者将它们与其他操作符组合起来创建另一个可观察流。

A System.Reactive subscription is also a resource. The Subscribe operators return an IDisposable that represents the subscription. When your code is done listening to an observable stream, it should dispose its subscription.

一个System.Reactive订阅也是一种资源。订阅操作符返回一个表示订阅的IDisposable。当你的代码监听完一个可观察流后,它应该释放它的订阅。

Subscriptions behave differently with hot and cold observables. A hot observable is a stream of events that is always going on, and if there are no subscribers when the events come in, they are lost. For example, mouse movement is a hot observable. A cold observable is an observable that doesn’t have incoming events all the time. A cold observable will react to a subscription by starting the sequence of events. For example, an HTTP download is a cold observable; the subscription causes the HTTP request to be sent.

对于热观察对象和冷观察对象,订阅的行为是不同的。一个热门的可观察对象是一个始终在进行的事件流,如果事件传入时没有订阅者,那么它们就丢失了。例如,鼠标移动是一个热门的可观察对象。一个冷的可观察对象是一个不总是有传入事件的可观察对象。冷观察对象将通过启动事件序列来响应订阅。例如,HTTP下载是一个cold observable;订阅导致发送HTTP请求。

The Subscribe operator should always take an error handling parameter as well. The preceding examples do not; the following is a better example that will respond appropriately if the observable stream ends in an error:

订阅操作符还应该始终接受错误处理参数。前面的例子没有;下面是一个更好的例子,它会在可观察流以错误结束时做出适当的响应:

Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp().Where(x => x.Value % 2 == 0).Select(x => x.Timestamp).Subscribe(x => Trace.WriteLine(x),ex => Trace.WriteLine(ex));

Subject is one type that is useful when experimenting with System.Reactive. This “subject” is like a manual implementation of an observable stream. Your code can call OnNext, OnError, and OnCompleted, and the subject will forward those calls to its subscribers. Subject is great for experimenting, but in production code, you should strive to use operators like those covered in Chapter 6.

Subject是一种有用的类型,当试验System.Reactive。这个“subject”就像一个可观察流的手动实现。您的代码可以调用OnNext、OnError和OnCompleted,Subject将把这些调用转发给它的订阅者。Subject非常适合实验,但在产品代码中,您应该努力使用类似于第6章中所述的操作符。

There are tons of useful System.Reactive operators, and I only cover a few selected ones in this book. For more information on System.Reactive, I recommend the excellent online book Introduction to Rx.

有很多有用的System.Reactive算子,在本书中我只介绍几个选定的算子。有关System.Reactive的更多信息。我推荐优秀的在线书籍介绍Rx。

Introduction to Dataflows:介绍数据流

TPL Dataflow is an interesting mix of asynchronous and parallel technologies.It’s useful when you have a sequence of processes that need to be applied to your data. For example, you may need to download data from a URL, parse it, and then process it in parallel with other data.

TPL数据流是异步和并行技术的有趣组合。当您有一系列需要应用于数据的流程时,它很有用。例如,您可能需要从一个URL下载数据,解析它,然后与其他数据并行处理它。

TPL Dataflow is commonly used as a simple pipeline, where data enters one end and travels until it comes out the other. However, TPL Dataflow is far more powerful than this; it’s capable of handling any kind of mesh. You can define forks, joins, and loops in a mesh, and TPL Dataflow will handle them appropriately. Most of the time, though, TPL Dataflow meshes are used as a pipeline.

TPL数据流通常用作简单的管道,其中数据从一端进入,一直传输到另一端出来。然而,TPL数据流远比这强大;它能够处理任何类型的网格。您可以在网格中定义分叉、连接和循环,TPL数据流将适当地处理它们。但是,大多数时候,TPL数据流网格被用作管道。

The basic building unit of a dataflow mesh is a dataflow block. A block can either be a target block (receiving data), a source block (producing data), or both. Source blocks can be linked to target blocks to create the mesh; linking is covered in Recipe 5.1. Blocks are semi-independent; they will attempt to process data as it arrives and push the results downstream. The usual way of using TPL Dataflow is to create all the blocks, link them together, and then start putting data in at one end. The data then comes out of the other end by itself. Again, Dataflow is more powerful than this; it’s possible to break links and create new blocks and add them to the mesh while there is data flowing through it, but that is a very advanced scenario.

数据流网格的基本构建单元是数据流块。一个块可以是一个目标块(接收数据),一个源块(产生数据),或者同时是两个。源块可以链接到目标块来创建网格;链接在本书5.1中有介绍。块半独立;他们将尝试在数据到达时处理数据,并将结果推到下游。使用TPL数据流的通常方法是创建所有块,将它们链接在一起,然后开始在一端插入数据。然后数据就会自己从另一端出来。再说一次,数据流比这更强大;它可以打破链接,创建新的块,并将它们添加到网格中,而有数据流过它,但这是一个非常高级的场景。

Target blocks have buffers for the data they receive. Having buffers enables them to accept new data items even if they aren’t ready to process them yet; this keeps data flowing through the mesh. This buffering can cause problems in fork scenarios, where one source block is linked to two target blocks. When the source block has data to send downstream, it starts offering it to its linked blocks one at a time. By default, the first target block would just take the data and buffer it, and the second target block would never get any. The fix for this situation is to limit the target block buffers by making them nongreedy; Recipe 5.4 covers this.

目标块有用于接收数据的缓冲区。拥有缓冲区使它们能够接受新的数据项,即使它们还没有准备好处理它们;这使得数据在网格中流动。在fork场景中,一个源块链接到两个目标块,这种缓冲可能会导致问题。当源块有数据要发送到下游时,它开始一次向其链接的块提供一个数据。默认情况下,第一个目标块只会获取数据并对其进行缓冲,而第二个目标块不会获取任何数据。解决这种情况的方法是限制目标块缓冲区,使它们不贪婪;本书5.4涵盖了这一点。

A block will fault when something goes wrong, for example, if the processing delegate throws an exception when processing a data item. When a block faults, it will stop receiving data. By default, it won’t take down the whole mesh; this enables you to rebuild that part of the mesh or redirect the data. However, this is an advanced scenario; most times, you want the faults to propagate along the links to the target blocks. Dataflow supports this option as well; the only tricky part is that when an exception is propagated along a link, it is wrapped in an AggregateException. So, if you have a long pipeline, you could end up with a deeply nested exception; the method AggregateException.Flatten can be used to work around this:

当出现故障时,块就会出现故障,例如,如果处理委托在处理数据项时抛出异常。当一个块发生故障时,将停止接收数据。默认情况下,它不会删除整个网格;这使您可以重新生成网格的那一部分或重定向数据。然而,这是一个高级的场景;大多数情况下,您希望故障沿着链接传播到目标块。Dataflow也支持这个选项;唯一需要技巧的部分是,当异常沿着链接传播时,它被包装在AggregateException中。因此,如果你有一个很长的管道,你可能会得到一个深度嵌套的异常;AggregateException方法。Flatten可以用来解决这个问题:

try
{var multiplyBlock = new TransformBlock<int, int>(item =>{if (item == 1)throw new InvalidOperationException("Blech.");return item * 2;});var subtractBlock = new TransformBlock<int, int>(item => item - 2);multiplyBlock.LinkTo(subtractBlock,new DataflowLinkOptions { PropagateCompletion = true });multiplyBlock.Post(1);subtractBlock.Completion.Wait();
}
catch (AggregateException exception)
{AggregateException ex = exception.Flatten();Trace.WriteLine(ex.InnerException);
}

Recipe 5.2 covers dataflow error handling in more detail.

章节 5.2更详细地涵盖了数据流错误处理。

At first glance, dataflow meshes sound very much like observable streams, and they do have much in common. Both meshes and streams have the concept of data items passing through them. Also, both meshes and streams have the notion of a normal completion (a notification that no more data is coming), as well as a faulting completion (a notification that some error occurred during data processing).

乍一看,数据流网格听起来非常像可观察流,而且它们确实有很多共同点。网格和流都有通过它们的数据项的概念。同样,网格和流都有正常完成(通知不再有数据到来)和错误完成(通知在数据处理过程中发生了一些错误)的概念。

But System.Reactive (Rx) and TPL Dataflow do not have the same capabilities. Rx observables are generally better than dataflow blocks when doing anything related to timing. Dataflow blocks are generally better than Rx observables when doing parallel processing. Conceptually, Rx works more like setting up callbacks: each step in the observable directly calls the next step.

但System.Reactive (Rx)和TPL数据流没有相同的功能。在做任何与时间相关的事情时,Rx observable通常比数据流块要好。在进行并行处理时,数据流块通常比Rx observable要好。从概念上讲,Rx的工作方式更像是设置回调:可观察对象中的每一步都直接调用下一步。

In contrast, each block in a dataflow mesh is very independent from all the other blocks. Both Rx and TPL Dataflow have their own uses, with some amount of overlap. They also work quite well together; Recipe 8.8 covers interoperability between Rx and TPL Dataflow.

相比之下,数据流网格中的每个块都非常独立于所有其他块。Rx和TPL数据流都有各自的用途,但有一些重叠。他们也能很好地合作;配方8.8涵盖了Rx和TPL数据流之间的互操作性。

If you’re familiar with actor frameworks, TPL Dataflow will seem to share similarities with them. Each dataflow block is independent, in the sense that it will spin up tasks to do work as needed, like executing a transformation delegate or pushing output to the next block. You can also set up each block to run in parallel, so that it’ll spin up multiple tasks to deal with additional input. Due to this behavior, each block does have a certain similarity to an actor in an actor framework. However, TPL Dataflow is not a full actor framework; in particular, there’s no built-in support for clean error recovery or retries of any kind. TPL Dataflow is a library with an actor-like feel, but it isn’t a fullfeatured actor framework.

如果您熟悉actor框架,那么TPL数据流似乎与它们有一些相似之处。每个数据流块都是独立的,因为它将根据需要启动任务来完成工作,比如执行转换委托或将输出推送到下一个块。您还可以设置每个块并行运行,这样它就会启动多个任务来处理额外的输入。由于这种行为,每个块都与参与者框架中的参与者有一定的相似性。然而,TPL数据流并不是一个完整的参与者框架;特别是,没有内置的对干净的错误恢复或任何类型的重试的支持。TPL Dataflow是一个具有角色感觉的库,但它不是一个功能齐全的角色框架。

The most common TPL Dataflow block types are TransformBlock<TInput,TOutput> (similar to LINQ’s Select), TransformManyBlock<TInput,TOutput> (similar to LINQ’s SelectMany), and ActionBlock,which executes a delegate for each data item. For more information on TPL Dataflow, I recommend the MSDN documentation and the “Guide to Implementing Custom TPL Dataflow Blocks”.

最常见的TPL数据流块类型是TransformBlock<TInput,TOutput>(类似于LINQ的Select), TransformManyBlock<TInput,TOutput>(类似于LINQ的SelectMany),和ActionBlock,它为每个数据项执行一个委托。关于TPL数据流的更多信息,我推荐MSDN文档和“实现自定义TPL数据流块指南”。

Concurrency in C# Cookbook中文翻译 :1.3并发性概述:响应式编程入门(Rx)相关推荐

  1. 【Ada语言学习笔记】参考手册中文翻译及注记——语言概述

    我们在Concurrent & Distributed Systems课的实验中需要用到Ada语言. 通俗而笼统地讲,Ada语言是一种描述特别详尽的语言(highly specific),因此 ...

  2. Python wmi Cookbook 中文翻译

    简介:   本文所有的例均是假设你在使用来自 http://timgolden.me.uk/python/wmi/cookbook.html 的 WMI 模块.使用此模块,你可以在 Windows 系 ...

  3. 【翻译】使用Ext JS设计响应式应用程序

    原文:Designing Responsive Applications with Ext JS 在当今这个时代,用户都希望Web应用程序无论在形状还是大小上,既能在桌面电脑,也能在移动设备上使用.使 ...

  4. 【转】关于HTTP中文翻译的讨论

    http://www.ituring.com.cn/article/1817 讨论参与者共16位: 图灵谢工 杨博 陈睿杰 贾洪峰 李锟 丁雪丰 郭义 梁涛 吴玺喆 邓聪 胡金埔 臧秀涛 张伸 图钉派 ...

  5. 翻译是一份严谨的工作——关于HTTP中文翻译的讨论

    讨论参与者共16位: 图灵谢工 杨博 陈睿杰 贾洪峰 李锟 丁雪丰 郭义 梁涛 吴玺喆 邓聪 胡金埔 臧秀涛 张伸 图钉派007LL 图钉派111DP 图钉派-34徐浩然 辩论主题:HTTP中的&qu ...

  6. YOLOv4全文阅读(全文中文翻译)

    YOLOv4全文阅读(全文中文翻译) YOLOv4: Optimal Speed and Accuracy of Object Detection 论文链接: https://arxiv.org/pd ...

  7. ctypealpha php_php ctype函数中文翻译和示例

    PHP Ctype扩展是PHP4.2开始就内建的扩展,注意,Ctype系列函数都只有一个字符串类型参数,它们返回布尔值. $str = "0.1123"; //检查字符串所有字符是 ...

  8. sound.js # pixi辅助插件 — 中文翻译教程

    本篇博客为中文翻译博客,转载请注明出处 sound.js-pixi的交互性插件[版本3.0.11] 安装配置 加载声音文件 初始化加载的声音 播放和控制加载的声音 更改回放速率 添加回声 添加混响 产 ...

  9. jBPM3.12用户指南中文翻译----第一章 绪论

    这是Jboss 的jBPM3.12框架的用户指南的中文翻译.其中第一章的译文,是我在网上找到的.其他几章都是我自己做的翻译.我的翻译是中英文对照,只翻译部分我认为重要的,不翻译简单的英文,以免浪费你我 ...

最新文章

  1. [Tarjan][割点] 洛谷 P3469 BLO-Blockade
  2. 在测试时用到的一些mysql的小技巧(持续更新)
  3. 如何使用vscode安装和调试Java程序
  4. scrapy-1.2.1安装失败之解决方法
  5. UVA 10976 Fractions Again?!【暴力枚举/注意推导下/分子分母分开保存】
  6. python之迭代器,生成器
  7. node --- 使用node连接mysql
  8. python3.8安装xlwings出错_Python xlwings模块简单使用
  9. PostgreSQL 9.6 keepalived主从部署
  10. Qt 设置当前窗口出现在左右窗口的最前面
  11. [转载] Java面试题大全(2020版)
  12. linux下c语言俄罗斯方块,c语言做俄罗斯方块
  13. RTX移植到STM32F103
  14. sql卡住php-fpm会cpu大涨,排查PHP-FPM占用CPU过高
  15. 马化腾:一推就倒!中国技术实力只是表面辉煌罢了
  16. 开源经济模型 MAKRO
  17. php查询过滤字段,php 字符过滤类,用于过滤各类用户输入的数据
  18. idm6.40最新版exe下载器介绍
  19. matlab差值报错,matlab插值介绍
  20. Android DES加密解密

热门文章

  1. 部署Tectonic服务到kubernetes集群
  2. 大鹏半岛海边团建烧烤野炊上岛赶海一日游全攻略
  3. Python解压rar 文件
  4. 怎么分辨GOIP网关型号鼎信通达
  5. 中国AI研究超美国?专家:比如深度学习已发文章数
  6. 在Eclipse环境下Tomcat的配置
  7. 【工作感悟】 工作感悟-时间管理
  8. 日本语外来语(片假名)与英语发音对照
  9. 开启关闭外网数据库的方法
  10. (车内网IDS-CAN)读书笔记——Scission