云计算设计模式(十五)——管道和过滤器模式

分解,执行复杂处理成一系列可重复使用分立元件的一个任务。这种模式可以允许执行的处理进行部署和独立缩放任务元素提高性能,可扩展性和可重用性。

背景和问题

一个应用程序可能需要执行各种关于它处理的信息不同复杂的任务。一个简单,但不灵活的方式来实施这个应用程序可以执行此处理为单一模块。然而,这种方法有可能减少用于重构代码,对其进行优化,或者重新使用它,如果是在应用程序中其他地方所需要的相同的处理的部件的机会。

图1通过使用单片式的方式示出了与处理数据的问题。一个应用程序接收并处理来自两个来源的数据进行处理。从每个源数据是由执行一系列任务来转换该数据,并传递结果给应用程序的业务逻辑之前的独立模块进行处理。

图1  - 使用单一模块实现的解决方案

部分的单片模块执行的任务在功能上是非常相似的,但在模块已被分开设计的。实现该任务的代码被紧密模块内耦合,并且此代码已开发具有很少或没有给定重新使用或可伸缩性的思想。

然而,由每个模块或每个任务的部署要求执行的处理任务,可能会改变,因为业务需求进行修改。有些任务可能是计算密集型的,并可能受益于强大的硬件上运行,而其他人可能并不需要如此昂贵的资源。此外,额外的处理可能需要在将来,或顺序,其中由所述处理执行的任务可能会改变。一个解决方案是必需的,解决了这些问题,并且增加的可能性代码重用。

解决方案

分解需要为每个数据流转换为一组离散的元件(或过滤器)的处理,其中每一个执行单任务。通过标准化每个组件接收和发射的数据的格式,这些过滤器可以组合在一起成为一个管道。这有助于避免重复代码,并且可以很容易地移除,替换或集成额外的组件,如果处理要求改变。图2显示了这种结构的一个例子。

图2 - 通过使用管道和过滤器实现的解决方案

处理一个请求所花费的时间取决于最慢的过滤器管道中的速度。这可能是一个或多个滤波器可能被证明是一个瓶颈,尤其是如果出现在从一个特定的数据源的数据流的大量请求。流水线结构的一个关键优点是它提供了机会,运行速度慢的过滤器的并联情况下,使系统能够分散负载并提高吞吐量。

可以独立缩放组成一个管道可以在不同的机器上运行过滤器,使他们和可以利用的弹性,许多云计算环境提供的优势。过滤器是计算密集型可以在高性能的硬件上运行,而其他要求不高的过滤器可以对商品(便宜)的硬件来承载。过滤器甚至不需要是在同一数据中心或地理位置,它允许在一个管道中的每个元素的环境下接近它需要的资源来运行。

图3示出了从源1施加到管道中的数据的一个例子。

图3  - 在一个管道负载平衡组件

如果一个滤波器的输入和输出被构造为一个流,它可能是能够进行的处理并行的每个过滤器。在流水线的第一个过滤器可以开始工作,并开始发射其结果,它们会直接传递到序列中的下一个过滤器之前的第一过滤器已经完成它的工作。

另一个好处是灵活性,这种模式可以提供。如果一个过滤器发生故障或者其上运行的机器不再可用时,管道可能能够重新安排滤波器所执行的工作,并指示此工作到组件的另一个实例。单个过滤器的故障不会必然导致整个管道的故障。

使用管道和过滤器与补偿交易模式相结合的模式可以提供一种替代的方法来实现分布式事务。分布式事务可以被分解成单独的赔的任务,每个都可以通过使用一个过滤器,也实现了补偿事务图案来实现。在一个管道中的过滤器可以在运行接近它们保持数据被实现为单独的托管工作。

问题和注意事项

在决定如何实现这个模式时,您应考虑以下几点:
•复杂性。增加的灵活性,这种模式提供了还可以引入复杂性,特别是如果被分布在不同的服务器上在管道的过滤器。
•可靠性。使用一个基础结构,可以确保在管道中的过滤器之间流动的数据也不会丢失。
•幂等性。如果在管道中的过滤失败接收到消息后,任务被重新调度到过滤器的另一个实例,所述部分工作可能已经完成。如果这个工作更新的全局状态的某些方面(如存储在数据库中的信息),同样更新可以重复。如果公布的结果,在管道中的下一个过滤器后,过滤器出现故障,但在此之前表示,该公司已经成功地完成了它的工作可能会出现类似的问题。在这些情况下,相同的工作可以由过滤器的另一个实例被重复,导致相同的结果要贴两次。这可能导致在管道随后过滤两次处理相同的数据。因此,在一个管道的过滤器应该被设计为幂等。欲了解更多信息,请参见乔纳森·奥利弗的博客幂等模式。
•重复的消息。如果在管道中的过滤器可以发布一个消息给流水线的下一个阶段之后发生故障时,过滤器的另一个实例,可以执行(由幂等考虑以上所描述的),并且将发布相同消息的拷贝到流水线。这可能导致同样的信息的两个实例被传递到下一个过滤器。为了避免这种情况,该管道应检测并消除重复的消息。

注意:

如果要实现管道使用消息队列(如微软的Azure服务总线队列),消息队列基础设施可以提供自动重复消息检测和清除。

•上下文和状态。在管道中,每个过滤器主要运行在孤立和不应该做这件事是如何被调用的任何假设。这意味着,每一个过滤器必须具有足够的上下文与它能够执行它的工作提供。这种情况下可包含相当数量的状态信息。

何时使用这个模式

使用这种模式时:
•由一个应用程序所需的处理可以很容易地被分解成一组离散的,独立的步骤。
•由应用程序执行的处理步骤具有不同的可扩展性要求。

注意:

它可能会向组过滤器应扩展一起在相同的过程。欲了解更多信息,请参阅计算资源整合模式。

•灵活性是必需的,以允许通过一个应用程序,或能力进行添加和删除步骤中的处理步骤重新排序。
•该系统可以受益于分配处理跨不同服务器的步骤。
•一个可靠的解决方案是必需的,当数据正在被处理的最小化在一个步骤失败的影响。

这种模式可能不适合时:
•通过应用程序执行的处理步骤并不是独立的,或者他们必须共同作为同一事务的一部分来执行。
•在一个步骤所需的上下文或状态的信息量使得这种方法效率很低。它可能会持续状态信息到数据库代替,但不要使用此策略,如果在数据库上的额外负载会导致过度竞争。

例子

可以使用消息队列的一个序列,以提供执行流水线所需的基础设施。最初的消息队列接收未处理的消息。实现为过滤器的任务侦听此队列的消息的组件,它执行其工作,然后投递转化的消息序列中的下一个队列。另一个过滤器的任务可以侦听在这个队列中的消息,对其进行处理,后的结果到另一个队列,依此类推,直到完全转化的数据出现在队列中的最后一个消息。

图4 - 通过使用消息队列实现管道

如果你正在构建一个解决方案,在Azure上,你可以使用服务总线队列提供了可靠的,可扩展的排队机制。下面所示的ServiceBusPipeFilter类提供了一个例子。它演示了如何实现接收从队列中输入消息,处理这些邮件的过滤器,并张贴结果到另一个队列。

注意:

该ServiceBusPipeFilter类在PipesAndFilters解决方案PipesAndFilters.Shared项目定义。此示例代码都可以可以下载本指导意见。

public class ServiceBusPipeFilter
{...private readonly string inQueuePath;private readonly string outQueuePath;...private QueueClient inQueue;private QueueClient outQueue;...public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null){...this.inQueuePath = inQueuePath;this.outQueuePath = outQueuePath;}public void Start(){...// Create the outbound filter queue if it does not exist....this.outQueue = QueueClient.CreateFromConnectionString(...);...// Create the inbound and outbound queue clients.this.inQueue = QueueClient.CreateFromConnectionString(...);}public void OnPipeFilterMessageAsync(Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...) {...this.inQueue.OnMessageAsync(async (msg) =>{...// Process the filter and send the output to the // next queue in the pipeline.var outMessage = await asyncFilterTask(msg);// Send the message from the filter processor // to the next queue in the pipeline.if (outQueue != null){await outQueue.SendAsync(outMessage);}// Note: There is a chance that the same message could be sent twice // or that a message may be processed by an upstream or downstream // filter at the same time.// This would happen in a situation where processing of a message was// completed, it was sent to the next pipe/queue, and then failed // to complete when using the PeekLock method.// Idempotent message processing and concurrency should be considered // in a real-world implementation.},options);}public async Task Close(TimeSpan timespan){// Pause the processing threads.this.pauseProcessingEvent.Reset();// There is no clean approach for waiting for the threads to complete// the processing. This example simply stops any new processing, waits// for the existing thread to complete, then closes the message pump // and finally returns.Thread.Sleep(timespan);this.inQueue.Close();...}...
}

在ServiceBusPipeFilter类Start方法连接到一对输入和输出队列,以及关闭方法从输入队列断开。该OnPipeFilterMessageAsync方法执行消息的实际处理;该asyncFilterTask参数这种方法指定要执行的处理。该OnPipeFilterMessageAsync方法等待输入队列中收到的消息,因为它到达,并张贴结果到输出队列通过运行在每个邮件的asyncFilterTask参数指定的代码。队列本身的构造函数中指定。

样品溶液的过滤器实现了在一组工作角色。每个工人的作用可独立进行调整,这取决于它执行的业务处理的复杂性,或者它需要执行此处理的资源。此外,各辅助角色的多个实例可以并行地运行,以提高吞吐量。

下面的代码显示了一个名为PipeFilterARoleEntry的Azure工作者角色,这是在样品溶液中PipeFilterA项目定义。

public class PipeFilterARoleEntry : RoleEntryPoint
{...private ServiceBusPipeFilter pipeFilterA;public override bool OnStart(){...this.pipeFilterA = new ServiceBusPipeFilter(...,Constants.QueueAPath,Constants.QueueBPath);this.pipeFilterA.Start();...}public override void Run(){this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>{// Clone the message and update it.// Properties set by the broker (Deliver count, enqueue time, ...) // are not cloned and must be copied over if required.var newMsg = msg.Clone();await Task.Delay(500); // DOING WORKTrace.TraceInformation("Filter A processed message:{0} at {1}", msg.MessageId, DateTime.UtcNow);newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");return newMsg;});...}...
}

这个角色包含ServiceBusPipeFilter对象。在角色OnStart方法连接到队列接收输入的信息并张贴输出消息(队列的名称在常量类中定义)。 Run方法调用OnPipeFilterMessagesAsync方法来对接收到的(在本例中,该处理通过等待较短的时间段模拟的)的每个消息执行某些处理。何时处理完成时,一个新的消息被构造包含结果(在这种情况下,输入消息被简单地增加了一个自定义属性),并将该消息发送到输出队列。

示例代码中包含一个名为PipeFilterBRoleEntry在PipeFilterB项目的另一名工人的作用。这个角色类似于PipeFilterARoleEntry不同之处在于它的Run方法进行不同的处理。在本例中的解决方案,这两种作用结合起来,构建一个管道;为PipeFilterARoleEntry角色输出队列是用于PipeFilterBRoleEntry角色的输入队列。

样品溶液还提供了两个名为InitialSenderRoleEntry(在InitialSender项目)和FinalReceiverRoleEntry(在FinalReceiver项目),进一步的角色。该InitialSenderRoleEntry作用提供了在管道中的初始消息。 OnStart方法连接到单个队列和运行方法的帖子的方法来此队列。这个队列是所使用的PipeFilterARoleEntry作用,所以发布一条消息到这个队列的输入队列导致由PipeFilterARoleEntry作用来接收和处理消息。经处理的信息,然后通过PipeFilterBRoleEntry作用传递。

为FinalReceiveRoleEntry角色输入队列是用于PipeFilterBRoleEntry角色的输出队列。 Run方法在FinalReceiveRoleEntry作用,如下图所示,接收到该消息,并且执行一些最后的处理。然后将其写入了过滤器的管道跟踪输出添加自定义属性的值。

public class FinalReceiverRoleEntry : RoleEntryPoint
{...// Final queue/pipe in the pipeline from which to process data.private ServiceBusPipeFilter queueFinal;public override bool OnStart(){...// Set up the queue.this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);this.queueFinal.Start();...}public override void Run(){this.queueFinal.OnPipeFilterMessageAsync(async (msg) =>{await Task.Delay(500); // DOING WORK// The pipeline message was received.Trace.TraceInformation("Pipeline Message Complete - FilterA:{0} FilterB:{1}",msg.Properties[Constants.FilterAMessageKey],msg.Properties[Constants.FilterBMessageKey]);return null;});...}...
}

本文翻译自MSDN: http://msdn.microsoft.com/en-us/library/dn568100.aspx

云计算设计模式(十五)——管道和过滤器模式相关推荐

  1. 管道和过滤器模式----POSA模式助记格式

    名称:管道和过滤器(Pipes and Filters) 问题: 处理数据流 1.步骤可替换 2.步骤比组件更易于重用 3.不相连的步骤不共享信息 4.存在不同的输入数据源 5.可以以多种方式给出或存 ...

  2. 云计算设计模式(五)——计算资源整合模式

    合并多个任务或操作成一个单一的计算单元.这种模式可以提高计算资源的利用率,并降低与云托管的应用程序进行计算处理相关的成本和管理开销. 背景和问题 云应用程序频繁执行各种操作.在某些解决方案也可能是有意 ...

  3. 【每天一个java设计模式(十五)】 - 命令模式

    命令模式是一种数据驱动的设计模式,它属于行为型模式.请求以命令的形式包裹在对象中,并传给调用对象.调用对象寻找可以处理该命令的合适的对象,并把该命令传给相应的对象,该对象执行命令. 命令模式也就是一个 ...

  4. Linux中的管道和过滤器

    管道和过滤器 管道 Shell可以将两个或多个程序连接到一起,以使一个程序的输出变成另一个程序的输入,这种方式连接的两个或者多个程序就形成了管道.管道通常用于执行一些复杂的数据处理操作.这些命令之间使 ...

  5. 云计算设计模式(三)——补偿交易模式

    撤消由一系列步骤,它们共同限定了最终一致性操作中,如果一个或多个步骤失败执行的工作.按照最终一致性模型,业务实现复杂的业务流程和工作流的云托管的应用程序中很常见. 背景和问题 在云中运行的应用程序频繁 ...

  6. 45.深度解密四十五:网站盈利模式和私域流量的变现方式细致讲解

    网络营销推广技术.技巧深度解密(四十五)指南: 1.本文档适合零基础以及互联网营销推广工作者,主要讲解网站的盈利模式和私域流量变现的方式问题. 2.原创版权文档,任何抄袭或者全部.部分模仿都是侵权行为 ...

  7. 设计模式(五) —— 行为型模式(中)

    本篇我们将介绍四种行为型模式,分别是 解释器模式 迭代器模式 中介者模式 备忘录模式 解释器模式 我国 IT 界历来有一个汉语编程梦,虽然各方对于汉语编程争论不休,甚至上升到民族大义的高度,本文不讨论 ...

  8. 云计算设计模式(一)缓存预留模式

    云带来的改变是显而易见的,云计算是一种按使用量付费的模式,这种模式提供可用的.便捷的.按需的网络访问, 进入可配置的计算资源共享池(资源包括网络,服务器,存储,应用软件,服务),这些资源能够被快速提供 ...

  9. 设计模式(十五)享元模式(结构型)

    概述 当一个软件系统在运行时产生的对象数量太多,将导致运行代价过高,带来系统性能下降等问题.例如在一个文本字符串中存在很多重复的字符,如果每一个字符都用一个单独的对象来表示,将会占用较多的内存空间,那 ...

最新文章

  1. 自定义对话框控件bate2----20050516
  2. python - paramiko模块 修改logging提示为静默
  3. 你的网站添加X-UA-Compatible meta标签了吗?
  4. android控件---自定义带文本的ImageButton
  5. mysql录入foreigen错误_Python MySQLdb 使用utf-8 编码插入中文数据问题
  6. Android中activity传值的两种方式
  7. Java中重写equals()方法时注意点
  8. android studio设置生成的release版本apk的名称
  9. 我的世界服务器显示玩家坐标,我的世界手游版怎么显示玩家坐标
  10. Storyboard和Xib混用黑魔法-swift版
  11. 转:性能测试常见术语
  12. oracle卸载和服务问题
  13. 现实版心灵捕手,MIT研发新型耳机可读取大脑想法
  14. html help文档制作,HTML Help Workshop(文件制作工具)
  15. MySQL中针对SQL语句优化
  16. 墨染の軍師(シオン) / 墨染(异时层火刀)
  17. 2020 中国软件 100 强,腾讯第二、阿里第三,第一是谁呢?你们公司上榜了吗?
  18. OpenCV 4.5.1 - 新版本 中 BEBLID 描述符尝鲜 (基于 ubuntu / c++)
  19. tp5出现“continue“ targeting switch is equivalent to “break“. Did you mean to use “continue 2“?错误解决方法
  20. 【kernel学习】内核下载与编译

热门文章

  1. windows10系统纯净版下载地址
  2. 复习SSM day01 SSM Maven工程的搭建及配置文件
  3. 银行间市场评论员文章词云绘制效果
  4. 2018世界杯赛程表
  5. termux 安装mysql 并运行b站爬虫入库 安卓11(手机上跑定时python爬虫)
  6. ROS机器人021-机器人命令行发送cmd_vel话题及/cmd_vel geometry_msgs/Twist示例
  7. 求一个好用的简洁设置的PPT倒计时器
  8. 商业智能BI与业务管理决策思维之二:业务结构分析
  9. 谷歌ai人工智能叫什么_谷歌正在通过AI策展和内置订阅全面革新Google新闻
  10. Glance 镜像服务