前言

我们开源了一个订阅分发mysql的binlog的项目,一直用的非常好,忽然有天开发说能不能支持MongoDB的数据订阅呢,MongoDB的使用度也挺广泛的。安排。经过简单的了解后发现MongoDB也有类似binlog的机制,最终花了两天时间把功能完成,并统一抽象集成到binlog开源项目中,使用和binlog同一套订阅分发模型管理MongoDB数据源。整个过程非常顺利,比整mysql的binlog要简单的多了。

binlog数据订阅分发项目:https://gitee.com/kekingcn/ke...

oplog简介

先来聊聊MongoDB的主备机制,和mysql的binlog类似,在MongoDB中,有一个系统库“”Local”,库里有一个集合“oplog.rs”,这个集合类似于binlog文件,里面记录了MongoDB的所有操作。从节点通过读取oplog.rs里的数据做到数据同步。

解析oplog

和订阅mysql的binlog一样(模拟一个从节点mysql)。我们的订阅服务要像从节点那样读取解析oplog.rs里的数据。解析前先看下oplog.rs的Document的数据结构

上图是一个插入的数据的日志,可见oplog的doc中共有七个字段,含义分别如下:

ts:操作的时间戳(非常重要)

t:term最初在主数据库上生成操作的。(含义不明)

h:本次操作的唯一hashID

v: 版本号

op:操作类型,有六种类型,我们只需要关注其中的i(插入)、u(更新)、d(删除)即可

ns:库名和集合名称,中间使用“.”连接

o:本次操作的document内容

o2:只有op操作类型时u更新时,才会有这个字段,代表更新的条件语句

$set:o2获取后的文档里的属性,代表更新的字段

如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以类比为binlog中的Position。同步mysql的数据时,通过记录消费binlog的位置,也就是Position,可以有效避免订阅服务停机后,消费记录丢失的问题。同步MongoDB时,通过记录ts的值,来记录消费的位置,可以到达和订阅binlog一样的效果。和mysql订阅不同的是,MongoDB的同步需要同步服务自己查询,而且oplog在MongoDB4.0之前的版本有大小限制,超过设置的容量后,老的数据就会被丢失,在4.0之后的版本已经解除了这个限制。

直接上代码

上面已经分析了oplog的结构以及订阅步骤,下面我们直接构建查询即可,需要注意,每次获取到的ts值,需要存储记录下来,已便重新订阅时,从上次断开的记录重新开始。下面直接看代码,重点逻辑都以注释详尽

private BsonTimestamp queryTs;

@Test

public void OpLogTest() {

MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://admin:admin@127.0.0.1:3717"));

MongoCollection collection = mongoClient.getDatabase("local")

.getCollection("oplog.rs");

//如果是首次订阅,需要使用自然排序查询,获取第最后一次操作的操作时间戳。如果是续订阅直接读取记录的值赋值给queryTs即可

FindIterable tsCursor = collection.find().sort(new BasicDBObject("$natural", -1))

.limit(1);

Document tsDoc = tsCursor.first();

queryTs = (BsonTimestamp) tsDoc.get("ts");

while (true) try {

//构建查询语句,查询大于当前查询时间戳queryTs的记录

BasicDBObject query = new BasicDBObject("ts", new BasicDBObject("$gt", queryTs));

MongoCursor docCursor = collection.find(query)

.cursorType(CursorType.TailableAwait) //没有数据时阻塞休眠

.noCursorTimeout(true) //防止服务器在不活动时间(10分钟)后使空闲的游标超时。

.oplogReplay(true) //结合query条件,获取增量数据,这个参数比较难懂,见:https://docs.mongodb.com/manual/reference/command/find/index.html

.maxAwaitTime(1, TimeUnit.SECONDS) //设置此操作在服务器上的最大等待执行时间

.iterator();

while (docCursor.hasNext()) {

Document document = docCursor.next();

//更新查询时间戳

queryTs = (BsonTimestamp) document.get("ts");

//TODO 在这里接收到数据后通过订阅数据路由分发

String op = document.getString("op");

String database = document.getString("ns");

Document context = (Document) document.get("o");

Document where = null;

if (op.equals("u")) {

where = (Document) document.get("o2");

if (context != null) {

context = (Document) context.get("$set");

}

}

System.err.println("操作时间戳:" + queryTs.getTime());

System.err.println("操作类 型:" + op);

System.err.println("数据库.集合:" + database);

System.err.println("更新条件:" + JSON.toJSONString(where));

System.err.println("文档内容:" + JSON.toJSONString(context));

}

} catch (Exception e) { e.printStackTrace(); }

}

结语

上面代码只是一个简单的测试用例,完整的应用还需要考虑ts的记录更新,事件的抽象,数据的分发等。我们已经开源的binlog订阅分发项目目前支持数据源在线管理,订阅数据(库、表)在线管理,如果能够使用同一套管理后台管理binlog和oplog的订阅在好不过。要实现和binlog统一管理模型,配置和分发方面基本不需要改动,然后从顶层数据源方面做区分实现即可。目前我们整合管理的功能都已经开发好了,关于oplog部分的代码还没提交到github上,后面会和大家相见。

mysql mongodb binlog_订阅MongoDB的数据变更比解析mysql的binlog更简单相关推荐

  1. sqoop同步hdfs与mysql端口_使用Sqoop将数据在HDFS与MySQL互导

    1.去官网下载sqoop,直接百度即可 2.解压后进入conf目录 guo@drguo1:/opt/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/conf$ cp sqoop ...

  2. 3款大数据bi工具,让企业数据分析更简单

    ​企业数据可视化的髙速发展趋势让互联网时代的数据分析及可视化拥有全新的面貌.企业针对信息内容的数据分析及可视化,的要求在日益严格,那么有哪些在企业数据分析方面做得好的大数据bi工具呢? 一.大数据bi ...

  3. 获取mongodb数据变更_支持mysql、MongoDB数据变更订阅/监听分发

    1 概述 mysql.MongoDB数据变动监听分发 本项目意在简化监听mysql.MongoDB数据库的不同表的各种数据变动 项目依赖redis,mysql 使用场景:刷新缓存.异构系统... 2 ...

  4. mysql在视图中增加新数据_怎么向Mysql视图中增加新数据

    本篇文章主要给大家介绍mysql数据表中视图是怎么新增数据的. mysql视图的相关知识在我们之前的文章中,都已经给大家详细介绍过了,相信大家对视图肯定有了更深一步的了解. 从前面文章的介绍中,大家应 ...

  5. MySQL同步到hadoop工具_数据同步工具Applier:MySQL数据库实时同步数据到Hadoop

    from: http://ourmysql.com/archives/1226 通过Map/Reduce进行批处理递送到Apache Hadoop仍然是中枢环节.,但随着要从"超思维速度&q ...

  6. java mysql防重复提交_防止数据重复提交的6种方法(超简单)!

    有位朋友,某天突然问磊哥:在 Java 中,防止重复提交最简单的方案是什么? 这句话中包含了两个关键信息,第一:防止重复提交:第二:最简单. 于是磊哥问他,是单机环境还是分布式环境? 得到的反馈是单机 ...

  7. mysql 重建分区_详解mysql重建表分区并保留数据的相关方法

    本文介绍mysql重建表分区并保留数据的方法,mysql的表分区(partition)可以把一个表的记录分开多个区去存储,查询时可根据查询的条件在对应的分区搜寻,而不需要整表查询,提高查询效率. 有分 ...

  8. Mysql学习笔记(二)——表格及数据的插入

    Mysql学习笔记(二)--表格及数据的插入 文章目录 Mysql学习笔记(二)--表格及数据的插入 1.Mysql常用指令 2.创建表格 A.数据类型 B.完整性约束条件 3.查看表格 4.修改表格 ...

  9. mysql重装系统后以前的数据_系统重装后,Mysql数据库重装加载原来数据库

    mysql 5.6 后热数据的加载 mysql 5.6 后热数据的加载 转自:http://blog.itpub.net/20892230/viewspace-2127469/ 故障现象:在数据库重启 ...

最新文章

  1. 对现有代码的分析方法随想
  2. linux/unix系统编程手册11-15
  3. 在Visual Studio 2019中修改项目名
  4. 大话云计算——认识云——“瑞友杯”虚拟化征文
  5. kafka基本管理操作命令
  6. 牛客题霸 [判断一个链表是否为回文结构] C++题解/答案
  7. 预防xml注入漏洞攻击_预防性编程-漏洞发生前如何修复
  8. bzoj15523506 robotic sort
  9. 嵌入式系统——流水线处理机执行时间计算
  10. 阿里算法工程师模拟题2018/5/7
  11. maven 构建spring boot + mysql 的基础项目
  12. Kafka从上手到实践 - 庖丁解牛:Topic Broker | 凌云时刻
  13. CAN协议学习(一)
  14. chromium的下载和编译(流程详解)
  15. #LeetCode15. 三数之和 @FDDLC
  16. 2022年PC推荐-组装机及品牌机 2022年8月16日(持续更新)
  17. iOS 导入自定义字体不生效
  18. 教你在MathType中输入空心字和花体字的重要方法
  19. 什么是 Pandas?
  20. 王者荣耀扫码登录源码,游戏扫码登录源码

热门文章

  1. 少年不懂鲁迅,如今才知道,我们都是闰土
  2. 打游戏学习人工智能!不写代码|湾区人工智能
  3. 在51系列中data,idata,xdata,pdata的区别
  4. iPhone 14 Pro影像规格曝光:升级48MP主摄 镜头模组也大了
  5. 钉钉新增“下班勿扰”功能:次日上班前不接受任何消息!
  6. 智商税?苹果1.8米连接线卖949元、3米的1169元被吐槽:高攀不起
  7. 上汽通用五菱与菜鸟合作数字供应链 一期整车数字物流系统正式启用
  8. 加投35000瓶原价茅台 天猫双11今晚8点开售
  9. 马斯克:如果我不担任CEO 特斯拉就会完蛋
  10. 阿里云盘今日公测:无论用户是否付费,未来都不会限速