学习Elixir的GenStage

你好! Leraning Elixir的读者!从我的上一篇文章发表已经过去了很长时间。我一直专注于其他技术,并获得了新的工作。我要感谢我的朋友Doug Goldie,让我在那段时间里仍然关注Elixir。我要感谢我的妻子鼓励我回到这个博客上。

从我最后一次用 Elixir 做严肃的工作以来,已经发生了很多事情。 Elixir 1.3发布,2016 Elixir世界大会,以及更多。引起我注意的事情之一是GenStage。去年,GenRouter计划开始,看起来像现在演变成了GenStage。

GenStage有很多要学习。为了接近这个主题,让我们从一些问题开始:

什么是GenStage(以及 Flow)?
    在哪里可以找到更多信息?
    什么是一个值得一试的好的项目?

对于问题#1,我开始谷歌搜索,它出现了正式的GenStage公告。哇,它是7月发布的,我已经走了很长时间。无论如何,它给出以下描述:

GenStage是一种新的Elixir行为,用于在Elixir进程之间使用背压来交换事件。

但这是什么意思?让我们来看看这个定义。首先,GenStage用于Elixir。这很简单。

GenStage是一个“行为”,这是接口的OTP术语。具体来说,GenStage定义了一组必须由接受行为的进程实现的函数或回调。 GenStage还可以提供这些函数的默认实现。

GenStage是“用于在Elixir 进程之间交换事件…”。事件?我记得这是从GenRouter项目的意图改变而来的。如果我记得正确,GenRouter用于通过多进程stream传输数据。我认为这种向事件的转变可能是来自GenRouter的泛化。

GenStage通过“背压”实现这一切。背压是用于控制生产者 - 消费者的速率的机制。如果生产者和消费者异步运行,那么很可能一个人能够领先另一个。如果消费者落后,则背压可以用于防止生产者过度生产。

背景

公告里给出了这个例子

File.stream!("path/to/some/file")
|> Stream.flat_map(fn line ->String.split(line, " ")end)
|> Enum.reduce(%{}, fn word, acc ->Map.update(acc, word, 1, & &1 + 1)end)
|> Enum.to_list()

来描述GenStage的动机。这是Elixir中常见的惰性数据转换流水线的示例。但是,这个解决方案没有利用BEAM和现代CPU提供的并发性。

GenStage的目标是允许并发处理大型数据集,同时仍保留Elixir易于理解的数据转换管道样式。

GenStage

该公告接着展示了GenStage的例子。首先有一个称为A的计数器

alias Experimental.GenStagedefmodule A douse GenStagedef init(counter) do{:producer, counter}enddef handle_demand(demand, counter) when demand > 0 do# If the counter is 3 and we ask for 2 items, we will# emit the items 3 and 4, and set the state to 5.events = Enum.to_list(counter..counter+demand-1)# The events to emit is the second element of the tuple,# the third being the state.{:noreply, events, counter + demand}end
end

这是一个生产者。有一件事,我读完公告之后想知道的是::producerinit/1函数中的意思。:producer是由GenStage识别的特殊值还是只是A的名称?看看我发现的文档:

在成功启动的情况下,此回调必须返回一个元组,其中第一个元素是stage类型, 它是:producer:consumer:producer_consumer

所以事实上::producer是GenStage里的特殊值。

A的其余部分是handle_demand/2函数。这是一个回调,GenStage将使用它从生产者阶段请求更多的东西。 demand参数是请求的事件数。计数器参数是进程A的当前状态。因为A是一个计数器,它将当前计数保持为其状态。对于每个handle_demand/2调用,返回足够的值以满足需求,并且计数器按需增加。这样,A可以在下一次调用时返回后续的值。

该声明继续构建了一个:consumer阶段,它有一个handle_events/3回调。此函数应处理或存储传递的事件并更新GenStage的状态。

声明里也建立了一个:producer_consumer类型阶段。此类型必须定义handle_demand/2handle_events/3回调。

连接stage

下一步是启动stage,然后使用sync_subscribe/3连接它们。这个步骤似乎有点手动,但它听起来像在简单的情形下Flow将授权一个更容易的方法来组装这些stage。

我发现最有趣的部分是,多个用户可以连接,以创建更多的并发。当我开始阅读声明时,我担心GenStage将只允许创建管道并发,这不是并发的最有效的形式。这是因为5个 stage 的管道只允许5个并发活动。但GenStage似乎更加灵活。

GenStage用例

声明描述了GenStage的一些用例。

GenStage 的数据摄取

GenStage的一个用例是使用来自第三方系统的数据。

这听起来很像我的Domain Scraper实验。使用GenStage重写Domain Scraper将是一个有趣的练习。

GenStage用于事件分派

另一个用例:

GenStage今天可以使用的另一种情况是替换开发人员过去使用GenEvent的情况。

该宣言继续描述GenStage对GenEvent的优势:

然而,GenEvent有一个很大的缺陷:事件管理器和所有事件处理程序在同一个进程中运行。

那很有意思。我以前没有意识到GenEvent的这个限制。 GenStage在这种情况下似乎有了很大的改进。当使用GenEvent作为观察者模式的通知系统时,可能有许多观察者。由于所有处理程序在同一进程中运行,所有处理程序必须连续运行,牺牲BEAM为我们提供的并发性。

接着号召有GenEvent用户的行动起来:

首先,现在是社区介入并尝试GenStage的时刻。如果你以前使用过GenEvent,可以用GenStage代替吗?同样,如果您计划实现事件处理系统,请尝试GenStage。

我有一个使用GenEvent的项目。回到二月,我写了关于使用GenEvent来通知Elixir中的更新频道。这可能是一个很好的开始试验GenStage的地方。

看看Flow

声明的结尾给出了一个Flow的简介,这允许在开始的例子被重写成:

alias Experimental.GenStage.Flow# Let's compile common patterns for performance
empty_space = :binary.compile_pattern(" ") # NEW!File.stream!("path/to/some/file", read_ahead: 100_000) # NEW!
|> Flow.from_enumerable()
|> Flow.flat_map(fn line ->for word <- String.split(empty_space), do: {word, 1}end)
|> Flow.partition_with(storage: :ets) # NEW!
|> Flow.reduce_by_key(& &1 + &2)
|> Enum.to_list()

这允许在数据变换流水线中的同步处理。看看文档,看起来Flow是可用的,我想在未来的一篇文章中尝试。

[elixir! #0015][译] 学习 GenStage by Joseph Kain相关推荐

  1. [译] 学习 JavaScript:9 个常见错误阻碍你进步

    本文讲的是[译] 学习 JavaScript:9 个常见错误阻碍你进步, 原文地址:Learning JavaScript: 9 Common Mistakes That Are Holding Yo ...

  2. [elixir! #0007] [译] 理解Elixir中的宏——part.5 重塑AST by Saša Jurić

    上一章我们提出了一个基本版的deftraceable宏,能让我们编写可跟踪的函数.宏的最终版本有一些剩余的问题,今天我们将解决其中的一个--参数模式匹配. 今天的练习表明我们必须仔细考虑宏可能接收到的 ...

  3. [译]学习IPython进行交互式计算和数据可视化(四)

    第三章 使用Python进行数字计算 尽管IPython强大的shell和扩展后的控制台能被任何Python程序员使用,但是这个工具最初是科学奖为科学家设计的.它的主要设计目标就是为使用Python进 ...

  4. Distribute Strategy--翻译学习

    Overview tf.distribute.Strategy,是一个用于多GPU.多服务器或多TPU进行分布式模型训练的API接口.用户通过改动较少的代码就可以使用该接口进行分布式训练.该API除了 ...

  5. python人机交互界面设计_[译]学习IPython进行交互式计算和数据可视化(五)

    第四章:交互式绘图接口 本章我们将展示Python的绘图功能以及如何在IPython中交互式地使用它们. NumPy为处理大量的多维数组结构的数据提供了高效的方法.但是看行行列列的数字总不如直接看曲线 ...

  6. SegmentFault 技术周刊 Vol.30 - 学习 Python 来做一些神奇好玩的事情吧

    前言 开始之前,我们先来看这样一个提问: python初学者,请教python学习路径 相信看完 @X_AirDu 的回答我们已经对 Python 有了一个大概的了解.那接下来就让我们更深入的了解 P ...

  7. Elixir - comprehensions

    Elixir - comprehensions 第一次学习Elixir,作为一个笔记记录一下学习过程,内容中均为自己的理解,存在理解有误的地方,请指出,谢谢! comprehension基础使用 功能 ...

  8. 技术周刊 Vol.40 - 来学习一门新的编程语言吧!

    新的一年不知不觉已经到来了,总结过去的 2017,相信小伙们一定有很多收获和感悟,那对于刚刚到来的 2018,有什么期待和规划呢?不防就来学习一门新的语言吧,进一步提高编程技能,扩宽自己的知识领域.希 ...

  9. 108本开源的技术学习资源!登上GitHub趋势镑

    ​ 同学们,不知道你们每天做什么事情时最开心那? (当然是喝肥宅快乐水了呀) 反正小七最快乐的就是打开电脑,学习最新的知识 近期一个整合了108本免费开源的技术书籍/项目/手册的项目火了,登上了Git ...

最新文章

  1. 高等数学:第三章 微分中值定理与导数的应用(3)曲线的凹凸 拐点 曲率
  2. web服务器采用的是什么协议,webservice中采用协议Http,它是指什么意思
  3. Spring编程模型:Spring实现了哪些编程模型?
  4. php aes 128位加密,php实现AES 128位加密的相关操作技巧分享
  5. SAS宏技术中,%let和call symput有什么区别?
  6. matlab里数组的赋值,arrays – MATLAB结构赋值数组
  7. java 文件下载方法_【工具类】Java后台上传下载文件的几种方式
  8. php 2个经纬度之间的距离,php计算两个经纬度之间的距离
  9. 属性getter和setter
  10. 注释为基础的SpringMVC
  11. 物联网大数据如何改善农业运营
  12. turbo c语言教程,C语言基础教程(六)Turbo C 程序设计初步(1)
  13. 推荐7 款实用好用的电脑软件
  14. 万年历java循环,万年历代码 java万年历源代码是多少?
  15. ffplay 分析概述
  16. 解决电脑因System进程而变得很卡
  17. vmware上用kali破解wifi
  18. 《东周列国志》第六十回 智武子分军肆敌 偪阳城三将斗力
  19. MySQL 百万级/千万级表 全量更新
  20. oralce_函数使用

热门文章

  1. 计算机视觉论文-2021-03-01
  2. Linux 查看MAC地址
  3. 互联网券商证券公司网上开户系统设计
  4. SpringBoot 雪花算法生成商品订单号【SpringBoot系列13】
  5. 2D/3D加速器概念
  6. ZZULIOJ:1008美元和人民币
  7. 3dsmax2020安装报1603错误的解决方法
  8. 获取IFeatureWorkspace所有要素类、表
  9. 懂球帝Android客户端WebView优化之路
  10. python数据分析与可视化