使用MongoDB进行事件流
MongoDB是一个非常出色的“ NoSQL”数据库,具有广泛的应用程序。 在SoftwareMill开发的一个项目中,我们将其用作复制的事件存储,然后将事件从事件流传输到其他组件。
介绍
基本思想非常简单(另请参阅Martin Fowler关于Event Sourcing的文章)。 我们的系统生成一系列事件。 这些事件将保留在事件存储中。 系统中的其他组件遵循事件流并对其进行“处理”。 例如,可以将它们汇总并写入报告数据库(另一方面,它类似于CQRS )。 这种方法有很多优点:
- 事件的读取和写入是解耦的(异步的)
- 鉴于它没有死得太久,任何后续组件都可能死亡,然后“追赶”
- 可能有多个关注者。 跟随者可以从从属副本读取数据,以获得更好的可伸缩性
- 事件活动的爆发对事件接收器的影响减少; 最坏的情况下,报告生成速度会变慢
这里的关键组件当然是快速可靠的事件存储。 我们用来实现一个的MongoDB的三个关键功能是:
- 上限集合和尾部游标
- 快速收集附件
- 复制集
采集
作为基础,我们使用有上限的集合 ,根据定义,该集合受大小限制。 如果编写新事件将导致集合超出大小限制,则最早的事件将被覆盖。 这给了我们类似于事件的循环缓冲区的功能。 (此外,我们也很安全地避免了磁盘空间不足错误。)
在2.2版之前,默认情况下,上限集合没有_id字段(因此没有索引)。 但是,由于我们希望事件能够在整个副本集上可靠地写入,因此_id字段及其上的索引都是必需的。
写作活动
编写事件是一个简单的Mongo插入操作; 插入也可以分批完成。 根据我们对事件丢失的容忍度,我们可能会使用各种Mongo 写入问题 (例如,等待来自单节点或多个节点的写入确认)。
所有事件都是不可变的。 除了更好的,线程安全的Java代码外,这是事件流的必要条件。 如果事件是可变的,事件接收器将如何知道更新的内容? 而且,这对Mongo的性能有很好的影响。 由于永远不会更改数据,因此写入磁盘的文档永远不会缩小或扩展,因此无需在磁盘上移动块。 实际上,在具有上限的集合中,Mongo不允许增长曾经编写的文档。
阅读活动
读取事件流要复杂一些。 首先,可能有多个阅读器,每个阅读器在流中具有不同的进度。 其次,如果流中没有事件,我们希望读者等待一些事件可用,并避免主动轮询。 最后,我们想分批处理事件,以提高性能。
有尾游标可以解决这些问题。 要创建这样的游标,我们必须提供一个起点–事件的ID,我们将从该事件开始读取; 如果未提供ID,则光标将返回最早的可用事件。 因此,每个读取器必须存储它已读取和处理的最后一个事件。
更重要的是,如果没有新数据可用,可尾光标可以有选择地阻塞一段时间,从而解决了主动轮询问题。
(顺便说一下,mongo用于在副本集之间复制数据的oplog集合也是一个有上限的集合。从属Mongo实例在该集合后面尾随,流式传输“事件”(即数据库操作),并按顺序在本地应用它们。 )
读取Java中的事件
使用Mongo Java驱动程序时 ,有一些“问题”。 首先,您需要初始化游标。 为此,我们需要提供(1)最后一个事件ID(如果存在); (2)我们要读取事件的顺序(此处为自然顺序,即插入顺序); (3)两个关键的游标选项,我们希望游标是可拖尾的,并且如果没有新数据,我们希望将其阻止:
DBObject query = lastReceivedEventId.isPresent()? BasicDBObjectBuilder.start('_id', BasicDBObjectBuilder.start('$gte', lastReceivedEventId.get()).get()).get(): null;DBObject sortBy = BasicDBObjectBuilder.start('$natural', 1).get();DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection.find(query).sort(sortBy).addOption(Bytes.QUERYOPTION_TAILABLE).addOption(Bytes.QUERYOPTION_AWAITDATA);
您可能想知道为什么我们使用>= last_id
而不是>
。 由于生成Mongo ObjectId的方式在这里需要。 如果使用一个简单的> last_id
我们可能会错过一些与last_id
事件在同一秒之后但之后发生的事件。 这也意味着我们的Java代码必须处理这一事实,并丢弃收到的第一个事件。
游标的类扩展了基本的Java Iterator
接口,因此非常易于使用。 因此,现在我们可以进行批处理了。 在游标上进行迭代时,驱动程序将批量从Mongo服务器接收数据; 因此我们可以像调用其他迭代器一样简单地调用hasNext()
和next()
来接收后续元素,并且只有某些调用会真正导致与服务器的网络通信。
在Mongo Java驱动程序中,实际上可能阻塞的hasNext()
是hasNext()
。 如果我们要分批处理事件,我们需要(a)只要有可用的元素就读取它们,并且(b)在被阻止没有更多事件之前有某种了解的方式,并且我们可以处理事件已经批处理。 由于hasNext()
可以阻止,因此我们无法直接执行此操作。
这就是为什么我们引入了中间队列( LinkedBlockingQueue
)的原因。 在单独的线程中,从游标读取的事件在到达时即被放入队列中。 如果没有事件,则线程将在cursor.hasNext()
上cursor.hasNext()
。 阻塞队列有一个可选的大小限制,因此,如果队列已满,则放置一个元素也将阻塞,直到有可用空间为止。 在事件消费者线程中,我们首先尝试以阻塞方式(使用.poll
从队列中读取单个元素,因此我们在这里等待所有事件可用。 然后,我们尝试将队列的全部内容消耗到一个临时集合中(使用.drainTo
,构建批处理,并可能获取0个元素,但我们始终拥有第一个)。
值得一提的是,如果集合为空,则Mongo不会阻止,因此我们必须回到主动轮询。 我们还必须考虑到游标可能会在等待期间死亡的事实。 要对此进行检查,我们应该验证cursor.getCursorId() != 0
,其中0是“死光标”的ID。 在这种情况下,我们只需要重新实例化游标即可。
加起来
综上所述,我们得到了一个非常快速的事件源/流解决方案。 从某种意义上说,这是“自我调节”,即如果事件活动达到高峰,则事件接收器将大批量读取这些事件。 如果事件活动少,则将分批快速处理它们。
我们还将同一个Mongo实例用于其他目的。 从操作角度来看,拥有一个数据库系统来聚簇和维护常规数据和事件肯定是一件好事。
参考: Adam Warski博客的Blog中来自我们的JCG合作伙伴 Adam Warski的MongoDB事件流 。
翻译自: https://www.javacodegeeks.com/2012/11/event-streaming-with-mongodb.html
使用MongoDB进行事件流相关推荐
- 事件流--事件冒泡现象及阻止
事件冒泡现象 <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF ...
- QWidget一生,从创建到销毁事件流
版权声明:若无来源注明,Techie亮博客文章均为原创. 转载请以链接形式标明本文标题和地址: 本文标题:QWidget一生,从创建到销毁事件流 本文地址:http://techieliang ...
- 后处理程序文件大小的变量_【每日一题】(17题)面试官问:JS中事件流,事件处理程序,事件对象的理解?...
关注「松宝写代码」,精选好文,每日一题 作者:saucxs | songEagle 2020,实「鼠」不易 2021,「牛」转乾坤 风劲潮涌当扬帆,任重道远须奋蹄! 一.前言 2020.12.23 立 ...
- html流动模型,javascript的事件流模型都有什么?
事件流:当你在页面触发一个点击事件后,页面上不仅仅有一个元素响应该事件而是多个元素响应同一个事件,因为元素是在容器中的.事件发生的顺序就是事件流,不同的浏览器对事件流的处理不同. JavaScript ...
- 功能整合(二):轮播图(可控)、事件流
1.轮播图 可实现: 自动轮播(加载完成后调用定时器).鼠标移入停止(停止定时器).鼠标移出开始(开始定时器),点击切换(点击事件) 1 window.onload = function(){ 2 / ...
- JavaScriptjQuery.事件流
事件流 事件冒泡:事件从最具体的节点开始向外传播到最宽泛的节点.这是事件流默认类型,被绝代多数浏览器支持. element.addEventListener('event',function,trun ...
- IE和DOM事件流、普通事件和绑定事件的区别
IE和DOM事件流的区别 IE采用冒泡型事件 Netscape(网络信息浏览器)使用捕获型事件 DOM使用先捕获后冒泡型事件 示例: <body> <div> <butt ...
- JS事件流和事件委托
在上一篇<JS知识点大杂烩>中说到了事件流但没有详细的介绍,这篇文章就来介绍一下事件流. 事件流一共由三个阶段分别是: 1.捕获阶段 2.目标阶段 3.冒泡阶段 复制代码 事件绑定大家都知 ...
- 由event target引发的关于事件流的一连串思考(二)
阻止事件冒泡 W3C的方法是ev.stopPropagation(),IE则是使用ev.cancelBubble = true. 先不谈IE的私有方法,首先讨论一个问题:ev.stopPropagat ...
最新文章
- linux shell命令设置内存大小运行jar文件
- python读取csv文件的方法-python读写csv文件的方法
- C++ Primer 5th笔记(9)chapter9 顺序容器 vector 容器的自增长 容器适配器
- 用matlab做一个有刻度的网格,已知45个点X Y Z的坐标值已知,如何用matlab画出网格图,另外每个小方格里带颜色 - 程序语言 - 小木虫 - 学术 科研 互动社区...
- d3.js 入门指南 - 仪表盘
- eclipse MyEclipse中安装 spket插件 js文件内容字体变小解决方案
- 安卓非常实用的自动化测试工具 -- Monkey详细的说明
- Java 和C# 最大的不同是对底层的控制能力不同
- coredata 自动化刷新uitableview数据
- 树莓派的命令和linux一样吗,常用的linux命令
- 为什么三表联查查出的数据每条出现好多次_独家解读!京东高可用分布式流数据存储的架构设计...
- Linux登陆密码策略
- 今日尾号限行数据接口代码实现
- AMD64(x86_64)架构abi文档:中
- 清北毕业生5年来去向大数据:北大偏爱银行,清华更倾向国网,华为堪称最大黑洞-1
- 各种进制换算成十进制
- [转载]只需要读内存实现的Dota全图
- 本地计算机上的windows installer,一个烦人的Windows Installer问题
- 2020央视元宵晚会 | 《你的样子》朗诵词
- 一个清华大学生几天猎头生活的感想---很有感触的一篇文章(zz)
热门文章
- 基于Apache POI 从xlsx读出数据
- lombok var_使用var,Lombok和Fluxtion轻松处理事件
- 大数据 java 代码示例_功能Java示例 第7部分–将失败也视为数据
- lineseparator_首选System.lineSeparator()以用Java编写系统相关的行分隔符字符串
- sts集成jboss_与JBoss BPM Travel Agency更新了Modern BPM数据集成
- tomcat默认连接数_Tomcat的默认连接器
- osgi:install_OSGi服务测试助手:ServiceCollector
- lambda 延迟执行_Java Lambdas和低延迟
- mockito 单元测试_使用FizzBu​​zz和Mockito进行单元测试
- JDK 14 Rampdown:内部版本27