MongoDB是一个非常出色的“ NoSQL”数据库,具有广泛的应用程序。 在SoftwareMill开发的一个项目中,我们将其用作复制的事件存储,然后将事件从事件流传输到其他组件。

介绍

基本思想非常简单(另请参阅Martin Fowler关于Event Sourcing的文章)。 我们的系统生成一系列事件。 这些事件将保留在事件存储中。 系统中的其他组件遵循事件流并对其进行“处理”。 例如,可以将它们汇总并写入报告数据库(另一方面,它类似于CQRS )。 这种方法有很多优点:

  • 事件的读取和写入是解耦的(异步的)
  • 鉴于它没有死得太久,任何后续组件都可能死亡,然后“追赶”
  • 可能有多个关注者。 跟随者可以从从属副本读取数据,以获得更好的可伸缩性
  • 事件活动的爆发对事件接收器的影响减少; 最坏的情况下,报告生成速度会变慢

这里的关键组件当然是快速可靠的事件存储。 我们用来实现一个的MongoDB的三个关键功能是:

  • 上限集合和尾部游标
  • 快速收集附件
  • 复制集

采集

作为基础,我们使用有上限的集合 ,根据定义,该集合受大小限制。 如果编写新事件将导致集合超出大小限制,则最早的事件将被覆盖。 这给了我们类似于事件的循环缓冲区的功能。 (此外,我们也很安全地避免了磁盘空间不足错误。)

在2.2版之前,默认情况下,上限集合没有_id字段(因此没有索引)。 但是,由于我们希望事件能够在整个副本集上可靠地写入,因此_id字段及其上的索引都是必需的。

写作活动

编写事件是一个简单的Mongo插入操作; 插入也可以分批完成。 根据我们对事件丢失的容忍度,我们可能会使用各种Mongo 写入问题 (例如,等待来自单节点或多个节点的写入确认)。

所有事件都是不可变的。 除了更好的,线程安全的Java代码外,这是事件流的必要条件。 如果事件是可变的,事件接收器将如何知道更新的内容? 而且,这对Mongo的性能有很好的影响。 由于永远不会更改数据,因此写入磁盘的文档永远不会缩小或扩展,因此无需在磁盘上移动块。 实际上,在具有上限的集合中,Mongo不允许增长曾经编写的文档。

阅读活动

读取事件流要复杂一些。 首先,可能有多个阅读器,每个阅读器在流中具有不同的进度。 其次,如果流中没有事件,我们希望读者等待一些事件可用,并避免主动轮询。 最后,我们想分批处理事件,以提高性能。

有尾游标可以解决这些问题。 要创建这样的游标,我们必须提供一个起点–事件的ID,我们将从该事件开始读取; 如果未提供ID,则光标将返回最早的可用事件。 因此,每个读取器必须存储它已读取和处理的最后一个事件。

更重要的是,如果没有新数据可用,可尾光标可以有选择地阻塞一段时间,从而解决了主动轮询问题。

(顺便说一下,mongo用于在副本集之间复制数据的oplog集合也是一个有上限的集合。从属Mongo实例在该集合后面尾随,流式传输“事件”(即数据库操作),并按顺序在本地应用它们。 )

读取Java中的事件

使用Mongo Java驱动程序时 ,有一些“问题”。 首先,您需要初始化游标。 为此,我们需要提供(1)最后一个事件ID(如果存在); (2)我们要读取事件的顺序(此处为自然顺序,即插入顺序); (3)两个关键的游标选项,我们希望游标是可拖尾的,并且如果没有新数据,我们希望将其阻止:

DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);

您可能想知道为什么我们使用>= last_id而不是> 。 由于生成Mongo ObjectId的方式在这里需要。 如果使用一个简单的> last_id我们可能会错过一些与last_id事件在同一秒之后但之后发生的事件。 这也意味着我们的Java代码必须处理这一事实,并丢弃收到的第一个事件。

游标的类扩展了基本的Java Iterator接口,因此非常易于使用。 因此,现在我们可以进行批处理了。 在游标上进行迭代时,驱动程序将批量从Mongo服务器接收数据; 因此我们可以像调用其他迭代器一样简单地调用hasNext()next()来接收后续元素,并且只有某些调用会真正导致与服务器的网络通信。

在Mongo Java驱动程序中,实际上可能阻塞的hasNext()hasNext() 。 如果我们要分批处理事件,我们需要(a)只要有可用的元素就读取它们,并且(b)在被阻止没有更多事件之前有某种了解的方式,并且我们可以处理事件已经批处理。 由于hasNext()可以阻止,因此我们无法直接执行此操作。

这就是为什么我们引入了中间队列( LinkedBlockingQueue )的原因。 在单独的线程中,从游标读取的事件在到达时即被放入队列中。 如果没有事件,则线程将在cursor.hasNext()cursor.hasNext() 。 阻塞队列有一个可选的大小限制,因此,如果队列已满,则放置一个元素也将阻塞,直到有可用空间为止。 在事件消费者线程中,我们首先尝试以阻塞方式(使用.poll从队列中读取单个元素,因此我们在这里等待所有事件可用。 然后,我们尝试将队列的全部内容消耗到一个临时集合中(使用.drainTo ,构建批处理,并可能获取0个元素,但我们始终拥有第一个)。

值得一提的是,如果集合为空,则Mongo不会阻止,因此我们必须回到主动轮询。 我们还必须考虑到游标可能会在等待期间死亡的事实。 要对此进行检查,我们应该验证cursor.getCursorId() != 0 ,其中0是“死光标”的ID。 在这种情况下,我们只需要重新实例化游标即可。

加起来

综上所述,我们得到了一个非常快速的事件源/流解决方案。 从某种意义上说,这是“自我调节”,即如果事件活动达到高峰,则事件接收器将大批量读取这些事件。 如果事件活动少,则将分批快速处理它们。

我们还将同一个Mongo实例用于其他目的。 从操作角度来看,拥有一个数据库系统来聚簇和维护常规数据和事件肯定是一件好事。

参考: Adam Warski博客的Blog中来自我们的JCG合作伙伴 Adam Warski的MongoDB事件流 。

翻译自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html

使用MongoDB进行事件流相关推荐

  1. 事件流--事件冒泡现象及阻止

    事件冒泡现象 <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF ...

  2. QWidget一生,从创建到销毁事件流

    版权声明:若无来源注明,Techie亮博客文章均为原创. 转载请以链接形式标明本文标题和地址: 本文标题:QWidget一生,从创建到销毁事件流     本文地址:http://techieliang ...

  3. 后处理程序文件大小的变量_【每日一题】(17题)面试官问:JS中事件流,事件处理程序,事件对象的理解?...

    关注「松宝写代码」,精选好文,每日一题 作者:saucxs | songEagle 2020,实「鼠」不易 2021,「牛」转乾坤 风劲潮涌当扬帆,任重道远须奋蹄! 一.前言 2020.12.23 立 ...

  4. html流动模型,javascript的事件流模型都有什么?

    事件流:当你在页面触发一个点击事件后,页面上不仅仅有一个元素响应该事件而是多个元素响应同一个事件,因为元素是在容器中的.事件发生的顺序就是事件流,不同的浏览器对事件流的处理不同. JavaScript ...

  5. 功能整合(二):轮播图(可控)、事件流

    1.轮播图 可实现: 自动轮播(加载完成后调用定时器).鼠标移入停止(停止定时器).鼠标移出开始(开始定时器),点击切换(点击事件) 1 window.onload = function(){ 2 / ...

  6. JavaScriptjQuery.事件流

    事件流 事件冒泡:事件从最具体的节点开始向外传播到最宽泛的节点.这是事件流默认类型,被绝代多数浏览器支持. element.addEventListener('event',function,trun ...

  7. IE和DOM事件流、普通事件和绑定事件的区别

    IE和DOM事件流的区别 IE采用冒泡型事件 Netscape(网络信息浏览器)使用捕获型事件 DOM使用先捕获后冒泡型事件 示例: <body> <div> <butt ...

  8. JS事件流和事件委托

    在上一篇<JS知识点大杂烩>中说到了事件流但没有详细的介绍,这篇文章就来介绍一下事件流. 事件流一共由三个阶段分别是: 1.捕获阶段 2.目标阶段 3.冒泡阶段 复制代码 事件绑定大家都知 ...

  9. 由event target引发的关于事件流的一连串思考(二)

    阻止事件冒泡 W3C的方法是ev.stopPropagation(),IE则是使用ev.cancelBubble = true. 先不谈IE的私有方法,首先讨论一个问题:ev.stopPropagat ...

最新文章

  1. linux shell命令设置内存大小运行jar文件
  2. python读取csv文件的方法-python读写csv文件的方法
  3. C++ Primer 5th笔记(9)chapter9 顺序容器 vector 容器的自增长 容器适配器
  4. 用matlab做一个有刻度的网格,已知45个点X Y Z的坐标值已知,如何用matlab画出网格图,另外每个小方格里带颜色 - 程序语言 - 小木虫 - 学术 科研 互动社区...
  5. d3.js 入门指南 - 仪表盘
  6. eclipse MyEclipse中安装 spket插件 js文件内容字体变小解决方案
  7. 安卓非常实用的自动化测试工具 -- Monkey详细的说明
  8. Java 和C# 最大的不同是对底层的控制能力不同
  9. coredata 自动化刷新uitableview数据
  10. 树莓派的命令和linux一样吗,常用的linux命令
  11. 为什么三表联查查出的数据每条出现好多次_独家解读!京东高可用分布式流数据存储的架构设计...
  12. Linux登陆密码策略
  13. 今日尾号限行数据接口代码实现
  14. AMD64(x86_64)架构abi文档:中
  15. 清北毕业生5年来去向大数据:北大偏爱银行,清华更倾向国网,华为堪称最大黑洞-1
  16. 各种进制换算成十进制
  17. [转载]只需要读内存实现的Dota全图
  18. 本地计算机上的windows installer,一个烦人的Windows Installer问题
  19. 2020央视元宵晚会 | 《你的样子》朗诵词
  20. 一个清华大学生几天猎头生活的感想---很有感触的一篇文章(zz)

热门文章

  1. 基于Apache POI 从xlsx读出数据
  2. lombok var_使用var,Lombok和Fluxtion轻松处理事件
  3. 大数据 java 代码示例_功能Java示例 第7部分–将失败也视为数据
  4. lineseparator_首选System.lineSeparator()以用Java编写系统相关的行分隔符字符串
  5. sts集成jboss_与JBoss BPM Travel Agency更新了Modern BPM数据集成
  6. tomcat默认连接数_Tomcat的默认连接器
  7. osgi:install_OSGi服务测试助手:ServiceCollector
  8. lambda 延迟执行_Java Lambdas和低延迟
  9. mockito 单元测试_使用FizzBu​​zz和Mockito进行单元测试
  10. JDK 14 Rampdown:内部版本27