在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态。

Storm是基于事件的数据处理引擎。 它的模型依赖于基本原语,例如事件转换,过滤,聚合……,我们将它们组合成拓扑 。 拓扑的执行通常分布在多个节点上,并且风暴群集还可以并行执行给定拓扑的多个实例。 因此,在设计时,务必牢记哪些Storm原语在分区范围内执行,即在一个群集节点的级别上执行,以及哪些在群集范围内执行(又称为重新分区操作) ,因为它们涉及将事件从分区到分区)。 Storm Trident API文档明确提到了哪些功能做什么,作用范围如何。 Storm的分区概念与Kafka队列的分区概念保持一致, Kafka队列是入站事件的常见来源。

拓扑通常需要维护一些执行的持续状态。 例如,这可以是一些传感器值的滑动窗口平均值,从推文中提取的近期情绪,在不同位置出现的人数。……由于某些状态更新操作具有分区范围(例如partitionAggregate ),因此可伸缩性模型在这里尤为重要。其他则具有集群范围(例如groupby + perstitentAggregate的组合)。 这篇文章中说明了这一点。

示例代码可在githup上获得 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 请注意,此示例未包含适当的错误处理:喷口或螺栓均不支持重试失败的元组,我将在以后的文章中解决。 另外,我使用Java序列化将数据存储在元组中,因此,即使Storm支持多种语言,我的示例也是特定于Java的。

实际示例:出席事件

我的示例是模拟一个跟踪人们在建筑物内位置的系统。 每当用户进入或离开房间时,每个房间入口处的传感器都会发出如下事件:

{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"}
{"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"}
{"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"}
{"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"}
{"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"}
{"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}

对应于一个房间内一个用户的一个使用周期的(“ ENTER”和“ LEAVE”)对中的每个事件具有相同的相关性ID。 这可能对传感器提出了很多要求,但是出于本示例的目的,这使我的生活更加轻松

为了使事情变得有趣,让我们想象一下,不能保证到达我们服务器的事件遵守时间顺序(请参阅生成事件的python脚本中的shuffle()调用)。

我们将构建一个Storm拓扑,该拓扑将构建每个房间的每分钟每分钟的占用时间线,如本文结尾处的时间图所示。 在数据库中,房间时间线被切成一个小时的时间段,这些时间段被独立存储和更新。 这是Cafetaria占用1小时的示例:

{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25,
22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}

为了实现这一点,我们的拓扑需要:

  • 根据correlationIDID重新组合“ ENTER”和“ LEAVE”事件,并为此用户在此房间中产生相应的存在时间
  • 将每个在场期间的影响应用于房间入住时间表

顺便说一句,Cassandra提供了Counter列 ,尽管我可以很好地替代它们,但我在这里不使用它们。 但是,我的目的是说明Storm功能,即使它会使方法有些虚构。

分组依据/ persistentAggregate / iBackingMap说明

在查看示例代码之前,让我们澄清一下这些“三叉戟风暴”原语如何协同工作。

想象一下,我们从上午9:47到上午10:34收到了两个描述用户在roomA中存在​​的事件。 更新会议室的时间表需要:

  • 从数据库加载两个受影响的时间轴切片:[9.00am,10:00 am]和[10.00am,11:00 am]
  • 在这两个时间轴切片中添加此用户的状态
  • 将它们保存到数据库

但是,像这样天真地实现这一目标远非最优,首先是因为它每个事件使用两个DB请求,其次是因为这种“读取-更新-写入”序列通常需要一种锁定机制,该锁定机制通常无法很好地扩展。

为了解决第一点,我们想对几个事件重新组合数据库操作。 在Storm中,事件(或元组 )被成批处理。 IBackingMap是一个我们可以实现的原语,它使我们可以立即查看整批元组。 我们将使用它在批处理的开始(multiget)和结束时的所有DB-write操作(multiput)重新分组。 但是,multiget不允许我们查看元组本身,而只能查看“查询键”,这是从元组内容中计算出来的,如下所述。

原因在于上面提到的关于天真的实现的第二点:我们想并行执行几个[multiget +更新逻辑+ multiput]流,而不依赖锁。 这是通过确保那些并行子进程更新不相交的数据集来实现的。 这就要求定义拆分为并行流的拓扑元素还控制在每个流内的DB中加载和更新哪些数据。 该元素是Storm groupBy原语:它通过按字段值对元组进行分组来定义拆分,并且它通过将“ groupedBy”值作为对multiget的查询关键字来控制每个并行流更新的数据。

下图在房间占用示例中对此进行了说明(简化为每个房间仅存储一个时间轴,而不是每个一小时片段存储一个时间轴):

但是,并行性并没有完全发生(例如,当前的Storm实现在分组流中依次调用每个reducer / combiner),但这是设计拓扑时要牢记的一个好模型。

有趣的是,在groupBy和multiget之间发生了一些Storm魔术。 回想一下,Storm旨在进行大规模分布,这意味着每个流在多个节点上并行执行,并从诸如Hadoop HDFS或分布式Kafka队列之类的分布式数据源获取输入数据。 这意味着groupBy()同时在几个节点上执行,所有可能处理的事件都需要组合在一起。 groupBy是一种重新分区操作 ,可确保将所有需要分组的事件都发送到同一节点,并由IBackingMap +组合器或化简器的同一实例处理,因此不会发生争用情况。

同样,Storm要求我们将IBackingMap包装到可用的Storm MapState原语(或我们自己的原语)之一中,通常用于处理失败/重播的元组。 如上所述,我不在本文中讨论这一方面。

使用这种方法,我们必须实现IBackingMap,以便它尊重以下属性:

  • 对于不同的键值,由multiget读取和由IBackingMap的multiput操作写入的数据库行必须是不同的。

我想这就是他们将这些值称为“关键”的原因 (尽管任何尊重此属性的方法都可以)。

回到例子

让我们看看这在实践中是如何工作的。 该示例的主要拓扑在此处可用:

// reading events
.newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent"))
.each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))

第一部分只是读取JSON格式的输入事件(我使用的是简单的文件输出),对它们进行反序列化,然后使用Java序列化将它们放入称为“ occupancyEvent”的元组字段中。 这些元组中的每一个都描述了用户在房间内或房间外的“ ENTER”或“ LEAVE”事件。

// gathering "enter" and "leave" events into "presence periods"
.each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId"))
.groupBy(new Fields("correlationId"))
.persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod"))
.newValuesStream()

当我们遇到correlationId的不同值时,groupBy原语会创建尽可能多的元组组(这可能意味着很多,因为通常最多两个事件具有相同的correlationId)。 当前批处理中具有相同相关ID的所有元组将重新组合在一起,并且一组或几组元组将一起显示给persistentAggregate中定义的元素。 PeriodBackingMap是IBackingMap的实现,其中实现了multiget方法,该方法将接收将在后续步骤中处理的元组组的所有相关ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上图所示。

public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys));
}

该代码只需要从数据库中检索每个相关ID的潜在存在期间即可。 因为我们在一个元组字段上执行了groupBy,所以每个List在这里都包含一个字符串:correlationId。 请注意,我们返回的列表必须与键列表的大小完全相同,以便Storm知道哪个周期对应于哪个键。 因此,对于数据库中不存在的任何键,我们只需在结果列表中放置一个空值即可。

一旦加载,Storm就会将一个具有相同相关性ID的元组一个一个地呈现给我们的化简器PeriodBuilder 。 在我们的例子中,我们知道在此批处理中每个唯一的RelationshipId最多被调用两次,但是一般来说可能会更多,或者如果当前批处理中不存在其他ENTER / LEAVE事件,则只能调用一次。 在对muliget()/ multiput()的调用与我们的reducer之间,借助我们选择的MapState实现,Storm让我们可以插入适当的逻辑来重放以前失败的元组。 在以后的文章中有更多关于…

一旦我们减少了每个元组序列,Storm就会将结果传递给IBackingMap的mulitput(),在这里我们将所有内容“追加”到数据库:

public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods);
}

Storm persistenceAggregate会使用我们的reducer提供给multitput()的值自动向拓扑元组的后续部分发出。 这意味着我们刚刚建立的在线状态很容易作为元组字段使用,我们可以使用它们直接更新会议室时间线:

// building room timeline
.each(new Fields("presencePeriod"), new IsPeriodComplete())
.each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime"))
.groupBy(new Fields("roomId", "roundStartTime"))
.persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))

第一行只是过滤掉尚未包含“ ENTER”和“ LEAVE”事件的任何期间。

然后, BuildHourlyUpdateInfo实现一对多的元组发射逻辑:对于每个占用期,它仅在“开始时间”内发射一个元组。 例如,从9:47 am到10:34 am在roomA中的占用将在此处触发针对roomA的9.00am时间轴切片的元组,以及另一个针对10.00am的元组的发射。

下一部分实现与以前相同的groupBy / IBackingMap方法,只是这次使用两个分组键而不是一个(因此,mulitget中的List <Object>将包含两个值:一个String和一个Long)。 由于我们存储一个小时的时间轴块,因此上面提到的IBackingMap的必要属性受到尊重。 多重获取为每个(“ roomId”,“开始时间”)对检索时间线块,然后TimelineUpdater (再次是reducer)用与当前批次中找到的该时间线片相对应的每个存在时间更新时间线片(这就是BuildHourlyUpdateInfo的一对多元组发射逻辑)和multiput()仅保存结果。

导致咖啡厅占用

当我们看着它时,一切总是更加美丽,因此让我们来绘制房间的占用情况 。 稍后,用一些R代码 ,我们可以看到每分钟的房间占用情况(由于所有数据都是随机的,所以意义不大,但是……):

结论

希望本文提供了一种维护Storm拓扑状态的有用方法。 我还尝试说明了将处理逻辑实现为小型拓扑元素的实现,这些拓扑元素彼此插入,而不是将一些“冗长的”螺栓捆绑在冗长而复杂的逻辑部分上。

Storm的一个重要方面是它的可扩展性,很可能会在任何地方插入该子类或子类的子类来调整其行为。 春天十年前有这种聪明有趣的感觉(哦,该死,我现在有点老了……^ __ ^)

参考:来自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm进行的可伸缩实时状态更新 。

翻译自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html

使用Storm进行可扩展的实时状态更新相关推荐

  1. 使用storm 实时计算_使用Storm进行可扩展的实时状态更新

    使用storm 实时计算 在本文中,我将说明如何借助Storm框架以可扩展且无锁定的方式在数据库中维护实时事件驱动流程的当前状态. Storm是基于事件的数据处理引擎. 它的模型依赖于基本原语,例如事 ...

  2. Spotify如何对Apache Storm进行规模扩展

    [编者的话]Spotify是一家音乐流媒体服务商,最新的数据显示他们已经有6000万用户.Spotify内部使用Apache Storm来构建实时类系统,包括广告定位.音乐推荐以及数据可视化等.本文来 ...

  3. vspy如何在图形面板显示报文_设备实时状态监控:如何进行工业生产设备数据采集?...

    设备实时状态监控:如何进行工业生产设备数据采集?数据采集(DAQ),是指从传感器和其它待测设备等模拟和数字被测单元中自动采集非电量或者电量信号,送到上位机中进行分析,处理. 慧都设备数据采集系统解决方 ...

  4. XAML实时显示更新插件LiveXAML

    2019独角兽企业重金招聘Python工程师标准>>>  XAML实时显示更新插件LiveXAML LiveXAML是Visual Studio的第三方扩展插件.该插件可以从Visu ...

  5. 学习MSCKF笔记——后端、状态预测、状态扩增、状态更新

    学习MSCKF笔记--后端.状态预测.状态扩增.状态更新 学习MSCKF笔记--后端.状态预测.状态扩增.状态更新 1. 状态预测 2. 状态扩增 3. 状态更新 学习MSCKF笔记--后端.状态预测 ...

  6. java 更新订单状态_Java 8状态更新

    java 更新订单状态 即将到来的Java SE 8发行版的两大新语言功能是Lambda Expressions和Modularity. 这两天的状态更新都已经发布. 我会与您分享链接,因此您可能会在 ...

  7. Java 8状态更新

    即将到来的Java SE 8发行版的两大新语言功能是Lambda Expressions和Modularity. 对于这两者,这些天的状态更新已经发布. 我会与您共享链接,因此您可能会在假期中通读它们 ...

  8. 【077】Flight Aware-实时航班和机场状态更新

    飞行出行已经成为现代人出行的常用方式,每天也有数万个航班在天空飞行.今天分享一个实时更新全球民用航班和民用机场状态的网站. 地址:参见文末图 Fight Aware提供全球实时航班交通情况,首页的世界 ...

  9. MYSQL数据库跨服务器实时同步更新实践----文献阅读(污水管网水质预测)

    文章目录 摘要 一. MYSQL 数据库主从复制,实时同步的再现 1. 1 先下载虚拟机(硬件)与lunxi系统(centos7) 1.2 在lunxi 系统安装rpm 版mysql 5.5.55 1 ...

最新文章

  1. 第一个jfinal的样例
  2. vba搜索java里面的sql_在EXCEL中使用SQL语句查询集锦-持续更新中,敬请关注
  3. 用CSS美化被鼠标选中的文字的样式
  4. ajax发送请求-同步和异步
  5. Objective-C征途:Hello Objective-C
  6. 浅谈Vue.js的优势
  7. 在Visual Studio上开发Node.js程序(2)——远程调试及发布到Azure
  8. GWT(Google Web Tookit) Eclipse Plugin的zip下载地址(同时提供GWT Designer下载地址)
  9. mysql客户端报错1366_mysql一些异常
  10. oracle 日期 加一秒,Leap Second (闰秒) 在ORACLE环境的影响
  11. uniapp 模糊搜索文字添加颜色
  12. graphpad做单因素方差分析_手把手教你用Graphpad做单因素方差分析
  13. 服务器设置来电自动重启,电脑来电自动重启怎么样设置
  14. h5活动是什么意思_H5是什么,怎么用H5做运营活动?
  15. 山水印|竹林野茶:走向世界的中国茶文化
  16. Windows7/10耳机插入前面板没反应的解决方案
  17. 如何在element-plus配套vue3中使用日期时间选择器默认英文修改为中文
  18. 投影坐标系的shp数据,如何获取到它地理坐标系下的经纬度坐标
  19. pageInfo的转化,do转vo
  20. 图像金字塔(Python实现)

热门文章

  1. java常用代码_Java 中常用代码 (欢迎补充)
  2. vue插槽面试题_VUE面试题解析,半年出一篇,建议收藏!
  3. 服务器复制不了文档,服务器复制粘贴不了
  4. ubuntu安装python3.8_Ubuntu 16.04 安装 python3.8
  5. mysql批量插入数据的函数和存储过程
  6. java流与文件——操作文件
  7. Spring boot(3):Spring boot中Redis 的使用
  8. io.realm:rea_使Java具有响应性的框架和工具包:RxJava,Spring Reactor,Akka和Vert.x概述...
  9. 认识JSON补丁:JSON-P 1.1概述系列
  10. mongodb json_在MongoDB和Spring Batch中将XML转换为JSON和原始使用