Problems in FlinkX's MongoDB Plugin and Source-Code Optimizations

I. Current problems with FlinkX's MongoDB plugin

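1. When mongodbreader queries [*], it does not really cope with unstructured NoSQL data: documents whose fields differ in order or in number are exported into misaligned columns.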

2. When MongoDB's ObjectId is queried, its data type is not converted, so inserting it into MySQL fails.

3. When Array and Object fields are queried, the resulting data does not have the correct structure.

II. mongodbreader querying [*]

1) Test data in MongoDB

{ "_id" : ObjectId("5f38ba948592e21c30acf06e"), "username" : "jack", "age" : "30" }

{ "_id" : ObjectId("5f38ba948592e21c30acf06f"), "age" : "30", "username" : "jack" }

{ "_id" : ObjectId("5f38bd9aa2f2b04d9f9125a7"), "username" : "jack", "age" : "30", "installDate" : ISODate("2020-08-16T05:01:14.098Z") }

{ "_id" : ObjectId("5f38bd9aa2f2b04d9f9125a8"), "age" : "30", "username" : "jack", "installDate" : ISODate("2020-08-16T05:01:14.098Z") }

{ "_id" : ObjectId("5f38bdd5b356190f2752cdb3") }

{ "_id" : ObjectId("5f38bde622d2ab66755ea950"), "username" : "jack", "list" : [ { "username" : "jack", "age" : "30", "installDate" : ISODate("2020-08-16T05:02:30.323Z") }, { "age" : "30", "username" : "jack", "installDate" : ISODate("2020-08-16T05:02:30.323Z") } ] }

{ "_id" : ObjectId("5f38be06635b721e5ff16183"), "username" : "jack", "doc" : { "age" : "30", "username" : "jack", "installDate" : ISODate("2020-08-16T05:03:02.212Z") } }

2) Job configuration for the query

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "url": "mongodb://username:password@localhost:27017/test",
            "fetchSize": 100,
            "collectionName": "mycol2",
            "filter": "{}",
            "column": ["*"]
          },
          "name": "mongodbreader"
        },
        "writer": {
          "parameter": {
            "path": "/Users/jack/Documents/jack-project/flinkx/data",
            "protocol": "sftp",
            "port": 22,
            "writeMode": "overwrite",
            "host": "localhost",
            "column": [
              {"name": "_id", "type": "string"},
              {"name": "username", "type": "string"},
              {"name": "age", "type": "string"},
              {"name": "installDate", "type": "datetime"},
              {"name": "list", "type": "string"},
              {"name": "doc", "type": "string"}
            ],
            "password": "",
            "fieldDelimiter": ",",
            "encoding": "utf-8",
            "username": "jack"
          },
          "name": "ftpwriter"
        }
      }
    ],
    "setting": {
      "speed": {"channel": 1, "bytes": 0},
      "errorLimit": {"record": 100},
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {"isLogger": true, "level": "info", "path": "", "pattern": ""}
    }
  }
}

3) Query result

5f38ba948592e21c30acf06e,jack,30

5f38ba948592e21c30acf06f,30,jack

5f38bd9aa2f2b04d9f9125a7,jack,30,2020-08-16 13:01:14

5f38bd9aa2f2b04d9f9125a8,30,jack,2020-08-16 13:01:14

5f38bdd5b356190f2752cdb3

5f38bde622d2ab66755ea950,jack,[Document{{username=jack, age=30, installDate=Sun Aug 16 13:02:30 CST 2020}}, Document{{age=30, username=jack, installDate=Sun Aug 16 13:02:30 CST 2020}}]

5f38be06635b721e5ff16183,jack,Document{{age=30, username=jack, installDate=Sun Aug 16 13:03:02 CST 2020}}

4) Analysis of the problem

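In the first two rows the username and age values have swapped columns. In addition, the row width equals the number of fields in each document, so a document with fewer fields (such as the one containing only _id) yields a shorter row. The code shows why: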

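if (metaColumns.size() == 1 && ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName())) {
    row = new Row(doc.size());
    // Walk the document's keys: the column a value lands in depends on this document's own
    // field order (and the row width on its field count), which differs from document to
    // document, so the same field can end up in different columns
    String[] names = doc.keySet().toArray(new String[0]);
    for (int i = 0; i < names.length; i++) {
        row.setField(i, doc.get(names[i]));
    }
}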
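To see why the positional approach misaligns columns, here is a minimal stdlib-only sketch. It models a MongoDB document as a `LinkedHashMap` (which, like `org.bson.Document`, keeps fields in their stored order) and builds a row by walking the keys, as the loop above does. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the "*" branch: values are placed by key position,
// so the column a value lands in depends on each document's own field order.
class StarRowDemo {

    // Mirrors the original loop: field i of the row = value of the i-th key.
    static List<Object> rowFromKeys(Map<String, Object> doc) {
        List<Object> row = new ArrayList<>();
        for (String name : doc.keySet()) {
            row.add(doc.get(name));
        }
        return row;
    }

    // Builds a document whose fields keep the order in which they are given.
    static Map<String, Object> doc(String... keyValues) {
        Map<String, Object> d = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            d.put(keyValues[i], keyValues[i + 1]);
        }
        return d;
    }

    public static void main(String[] args) {
        // Same fields, different stored order (like the first two test documents).
        Map<String, Object> d1 = doc("_id", "5f38ba948592e21c30acf06e", "username", "jack", "age", "30");
        Map<String, Object> d2 = doc("_id", "5f38ba948592e21c30acf06f", "age", "30", "username", "jack");
        System.out.println(rowFromKeys(d1)); // [5f38ba948592e21c30acf06e, jack, 30]
        System.out.println(rowFromKeys(d2)); // [5f38ba948592e21c30acf06f, 30, jack]
    }
}
```

With a fixed writer schema (`_id, username, age, ...`), the second row puts `30` under `username`, which is the misalignment visible in the query result.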

5) Improvement idea

When querying [*], store the whole queried Document as JSON in a single field instead of iterating over all of its fields.

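if (metaColumns.size() == 1 && ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName())) {
    // "*": the row always has exactly one field, whatever the document looks like
    row = new Row(1);
    // Round-trip through JSON using the reader's JsonWriterSettings (settings)
    doc = Document.parse(doc.toJson(settings));
    // The whole document, serialized as JSON, goes into that single field
    row.setField(0, doc.toJson());
}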
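The idea can be sketched without the MongoDB driver: serialize the whole document into one JSON string and emit a row of width 1, so the column layout no longer depends on which fields a document has. This is a minimal hand-rolled serializer for `Map`/`List`/`String`/`Number`/`Boolean`/`null`, assuming the document has already been turned into plain Java types; it is not the driver's `Document.toJson()`, although it uses the same `", "` and `": "` separators that appear in the exported output.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "*" idea: the whole document becomes one JSON string,
// so every row has exactly one field regardless of the document's shape.
class WholeDocRow {

    // A row of width 1 holding the document as JSON.
    static Object[] toRow(Map<String, Object> doc) {
        return new Object[] { toJson(doc) };
    }

    static String toJson(Object v) {
        StringBuilder sb = new StringBuilder();
        write(v, sb);
        return sb.toString();
    }

    // Recursively writes Map (object), List (array), String, Number, Boolean and null.
    private static void write(Object v, StringBuilder sb) {
        if (v == null) {
            sb.append("null");
        } else if (v instanceof Number || v instanceof Boolean) {
            sb.append(v);
        } else if (v instanceof Map) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
                if (!first) sb.append(", ");
                first = false;
                writeString(String.valueOf(e.getKey()), sb);
                sb.append(": ");
                write(e.getValue(), sb);
            }
            sb.append('}');
        } else if (v instanceof List) {
            sb.append('[');
            boolean first = true;
            for (Object item : (List<?>) v) {
                if (!first) sb.append(", ");
                first = false;
                write(item, sb);
            }
            sb.append(']');
        } else {
            writeString(v.toString(), sb); // strings and any other type
        }
    }

    // Quotes a string, escaping quotes, backslashes and control characters.
    private static void writeString(String s, StringBuilder sb) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        sb.append('"');
    }

    public static void main(String[] args) {
        Map<String, Object> item = new LinkedHashMap<>();
        item.put("age", "30");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "5f38bde622d2ab66755ea950");
        doc.put("username", "jack");
        doc.put("list", Arrays.asList(item, item));
        Object[] row = toRow(doc);
        System.out.println(row.length); // 1
        System.out.println(row[0]);
        // {"_id": "5f38bde622d2ab66755ea950", "username": "jack", "list": [{"age": "30"}, {"age": "30"}]}
    }
}
```

Because every row has exactly one field, the writer only needs a single column entry, whatever fields the collection contains.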

6) Test job after the improvement

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "url": "mongodb://username:password@localhost:27017/test",
            "fetchSize": 100,
            "collectionName": "mycol2",
            "filter": "{}",
            "column": ["*"]
          },
          "name": "mongodbreader"
        },
        "writer": {
          "parameter": {
            "path": "/Users/jack/Documents/jack-project/flinkx/data",
            "protocol": "sftp",
            "port": 22,
            "writeMode": "overwrite",
            "host": "localhost",
            "column": [
              {"name": "_id", "type": "string"}
            ],
            "password": "xxxx",
            "fieldDelimiter": ",",
            "encoding": "utf-8",
            "username": "jack"
          },
          "name": "ftpwriter"
        }
      }
    ],
    "setting": {
      "speed": {"channel": 1, "bytes": 0},
      "errorLimit": {"record": 100},
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {"isLogger": true, "level": "info", "path": "", "pattern": ""}
    }
  }
}

7) Result data exported locally

{"_id": "5f38ba948592e21c30acf06e", "username": "jack", "age": "30"}

{"_id": "5f38ba948592e21c30acf06f", "age": "30", "username": "jack"}

{"_id": "5f38bd9aa2f2b04d9f9125a7", "username": "jack", "age": "30", "installDate": "1597554074098"}

{"_id": "5f38bd9aa2f2b04d9f9125a8", "age": "30", "username": "jack", "installDate": "1597554074098"}

{"_id": "5f38bdd5b356190f2752cdb3"}

{"_id": "5f38bde622d2ab66755ea950", "username": "jack", "list": [{"username": "jack", "age": "30", "installDate": "1597554150323"}, {"age": "30", "username": "jack", "installDate": "1597554150323"}]}

{"_id": "5f38be06635b721e5ff16183", "username": "jack", "doc": {"age": "30", "username": "jack", "installDate": "1597554182212"}}
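The `installDate` values in this export are epoch-millisecond strings such as `1597554074098`, whereas the first export showed `2020-08-16 13:01:14`. Judging from the output, the reader's JSON settings write BSON dates as milliseconds since the epoch. Both forms denote the ISODate `2020-08-16T05:01:14.098Z` stored in MongoDB; the earlier value is simply rendered in CST (UTC+8). A quick stdlib check with `java.time`, assuming `Asia/Shanghai` for CST:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Converts between the ISODate stored in MongoDB and the two forms seen in the exports.
class InstallDateCheck {

    // ISO-8601 UTC timestamp -> epoch milliseconds (the form in the JSON export).
    static long toEpochMillis(String isoUtc) {
        return Instant.parse(isoUtc).toEpochMilli();
    }

    // Epoch milliseconds -> local wall-clock time (the form in the first export).
    static String toLocal(long epochMillis, String zone) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                .withZone(ZoneId.of(zone))
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ms = toEpochMillis("2020-08-16T05:01:14.098Z");
        System.out.println(ms);                           // 1597554074098
        System.out.println(toLocal(ms, "Asia/Shanghai")); // 2020-08-16 13:01:14
    }
}
```

A writer column of type `datetime` would therefore need to parse these strings as epoch milliseconds rather than as a formatted date.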

8) Conclusion

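All fields are exported and packed into a single field, so users no longer need to list specific field names in the writer configuration. Every field in the collection is exported dynamically, regardless of field order or of which fields a document contains, which is what exporting unstructured NoSQL data requires. Note also that ObjectId values now come out as plain hex strings and dates as epoch-millisecond strings.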

III. The ObjectId type is not converted when MongoDB is queried, so inserting into MySQL fails

1) Job configuration file

Query _id and name and insert them into MySQL. Before running, set the log level to all, or change the logging level as in this PR: [https://github.com/DTStack/flinkx/pull/260]

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "url": "mongodb://username:password@localhost:27017/test",
            "fetchSize": 100,
            "collectionName": "mycol2",
            "filter": "{}",
            "column": ["_id", "name"]
          },
          "name": "mongodbreader"
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "password",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://localhost:3306/test2?useSSL=false&characterEncoding=utf-8",
                "table": ["bb"]
              }
            ],
            "writeMode": "insert",
            "column": ["aa", "bb"],
            "batchSize": 1024
          }
        }
      }
    ],
    "setting": {
      "speed": {"channel": 1, "bytes": 0},
      "errorLimit": {"record": 100},
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {"isLogger": true, "level": "info", "path": "", "pattern": ""}
    }
  }
}

2) Running the job produces an insert error

16:13:42.124 [Source: mongodbreader -> Sink: mysqlwriter (1/1)] ERROR com.dtstack.flinkx.mysql.format.MysqlOutputFormat - write error row, row = 5f38be06635b721e5ff16183,null, e = com.dtstack.flinkx.exception.WriteRecordException: Incorrect string value: '\xAC\xED\x00\x05sr...' for column 'aa' at row 1
java.sql.SQLException: Incorrect string value: '\xAC\xED\x00\x05sr...' for column 'aa' at row 1
    at com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat.processWriteException(JdbcOutputFormat.java:224)
    at com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:210)
    at com.dtstack.flinkx.outputformat.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:337)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at com.dtstack.flinkx.outputformat.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:426)
    at com.dtstack.flinkx.outputformat.BaseRichOutputFormat.close(BaseRichOutputFormat.java:464)
    at com.dtstack.flinkx.streaming.api.functions.sink.DtOutputFormatSinkFunction.close(DtOutputFormatSinkFunction.java:120)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:635)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:515)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:513)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:478)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Incorrect string value: '\xAC\xED\x00\x05sr...' for column 'aa' at row 1
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
    at com.mysql.jdbc.ServerPreparedStatement.serverExecute(ServerPreparedStatement.java:1283)
    at com.mysql.jdbc.ServerPreparedStatement.executeInternal(ServerPreparedStatement.java:783)
    at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1197)
    at com.dtstack.flinkx.rdb.outputformat.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:208)
    ... 15 more

3) Root-cause analysis

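Set a breakpoint in JdbcOutputFormat's writeSingleRecordInternal method and step through it in the debugger. For the _id field, getField() returns a value of type ObjectId.

The MySQL JDBC driver has no mapping for the ObjectId type, so it cannot bind the value as a string, and the insert fails.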

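[Figure: debugger view of JdbcOutputFormat.writeSingleRecordInternal, with getField() returning an ObjectId for _id]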
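The bytes in the error message, `\xAC\xED\x00\x05`, are the magic number and version that begin every Java object-serialization stream, and `sr` are the markers for a new object and its class descriptor. That suggests the driver fell back to Java-serializing the `ObjectId` (which is `Serializable`), producing bytes that are not valid UTF-8 text for column `aa`. This stdlib sketch reproduces that prefix; `FakeObjectId` is a hypothetical stand-in for `org.bson.types.ObjectId`, and `render` only imitates the style of MySQL's error message.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationPrefix {

    // Hypothetical stand-in for org.bson.types.ObjectId, which is also Serializable.
    static final class FakeObjectId implements Serializable {
        private static final long serialVersionUID = 1L;
        final String hex;

        FakeObjectId(String hex) {
            this.hex = hex;
        }
    }

    // Plain Java serialization of an object into a byte array.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Renders the first n bytes: printable ASCII as-is, everything else as \xNN.
    static String render(byte[] data, int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n && i < data.length; i++) {
            int b = data[i] & 0xFF;
            if (b >= 0x20 && b < 0x7F) {
                sb.append((char) b);
            } else {
                sb.append(String.format("\\x%02X", b));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] data = serialize(new FakeObjectId("5f38be06635b721e5ff16183"));
        System.out.println(render(data, 6)); // \xAC\xED\x00\x05sr
    }
}
```

Converting the `ObjectId` to its hex string before the row reaches the JDBC writer avoids this fallback altogether.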

4) Optimizing how the MongoDB _id is read

public Row nextRecordInternal(Row row) throws IOException {
    Document doc = cursor.next();
    // Round-trip through JSON with the reader's JsonWriterSettings (settings); with settings
    // that write ObjectId as a string, _id comes back as a plain String
    doc = Document.parse(doc.toJson(settings));

    if (metaColumns.size() == 1 && ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName())) {
        // "*": the whole document goes into a single field as JSON
        row = new Row(1);
        row.setField(0, doc.toJson());
    } else {
        row = new Row(metaColumns.size());
        for (int i = 0; i < metaColumns.size(); i++) {
            MetaColumn metaColumn = metaColumns.get(i);
            Object value = null;
            if (metaColumn.getName() != null) {
                value = doc.get(metaColumn.getName());
                // Fall back to the configured constant when the field is missing
                if (value == null && metaColumn.getValue() != null) {
                    value = metaColumn.getValue();
                }
            } else if (metaColumn.getValue() != null) {
                value = metaColumn.getValue();
            }
            // Convert string values to the column's configured type
            if (value instanceof String) {
                value = StringUtil.string2col(String.valueOf(value), metaColumn.getType(), metaColumn.getTimeFormat());
            }
            row.setField(i, value);
        }
    }
    return row;
}
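The non-`*` branch looks up each configured column by name and falls back to the column's constant `value` when the field is missing. A stdlib-only sketch of that lookup, with a hypothetical `Column` class in place of FlinkX's `MetaColumn` and the type conversion (`StringUtil.string2col`) left out:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ColumnProjection {

    // Hypothetical stand-in for MetaColumn: a field name and/or a constant value.
    static final class Column {
        final String name;
        final Object value;

        Column(String name, Object value) {
            this.name = name;
            this.value = value;
        }
    }

    // One row slot per configured column, in configuration order.
    static Object[] project(Map<String, Object> doc, List<Column> columns) {
        Object[] row = new Object[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            Column c = columns.get(i);
            Object v = null;
            if (c.name != null) {
                v = doc.get(c.name);
                if (v == null && c.value != null) {
                    v = c.value; // field absent: use the configured constant
                }
            } else if (c.value != null) {
                v = c.value; // pure constant column
            }
            row[i] = v;
        }
        return row;
    }

    public static void main(String[] args) {
        // After the JSON round-trip, _id is assumed to be a plain hex string.
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "5f38be06635b721e5ff16183");
        doc.put("username", "jack");
        List<Column> cols = Arrays.asList(new Column("_id", null), new Column("name", null));
        System.out.println(Arrays.toString(project(doc, cols))); // [5f38be06635b721e5ff16183, null]
    }
}
```

Because the row layout follows the configuration rather than the document, a missing field such as `name` simply yields `null` in its slot instead of shifting later columns.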

5) Run the job and check the result: _id is now inserted into MySQL successfully

flinkx -mode local -job /Users/jack/Documents/jack-project/flinkx/flinkconf/mongodb2mysql.json -pluginRoot /Users/jack/Documents/jack-project/flinkx/syncplugins -confProp "{\"flink.checkpoint.interval\":60000}"

---------------------------------
numWrite | 7
last_write_num_0 | 0
conversionErrors | 0
writeDuration | 20631
duplicateErrors | 0
numRead | 7
snapshotWrite | 0
otherErrors | 0
readDuration | 310
byteRead | 217
last_write_location_0 | 0
byteWrite | 217
nullErrors | 0
nErrors | 0

[Figure: the imported rows in MySQL]

IV. Array and Object values from MongoDB have an incorrect structure

The exported results do not conform to the JSON object or JSON array format.

1) Job configuration file

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "url": "mongodb://username:password@localhost:27017/test",
            "fetchSize": 100,
            "collectionName": "mycol2",
            "filter": "{}",
            "column": ["list", "doc"]
          },
          "name": "mongodbreader"
        },
        "writer": {
          "parameter": {
            "path": "/Users/jack/Documents/jack-project/flinkx/data",
            "protocol": "sftp",
            "port": 22,
            "writeMode": "overwrite",
            "host": "localhost",
            "column": [
              {"name": "_id", "type": "string"}
            ],
            "password": "xxxx",
            "fieldDelimiter": ",",
            "encoding": "utf-8",
            "username": "jack"
          },
          "name": "ftpwriter"
        }
      }
    ],
    "setting": {
      "speed": {"channel": 1, "bytes": 0},
      "errorLimit": {"record": 100},
      "restore": {
        "maxRowNumForCheckpoint": 0,
        "isRestore": false,
        "restoreColumnName": "",
        "restoreColumnIndex": 0
      },
      "log": {"isLogger": true, "level": "info", "path": "", "pattern": ""}
    }
  }
}

2) Check the exported result: the doc field was not exported, and the list value is not a valid JSON array

[Document{{username=jack, age=30, installDate=Sun Aug 16 13:02:30 CST 2020}}, Document{{age=30, username=jack, installDate=Sun Aug 16 13:02:30 CST 2020}}],

3) Optimized code

public Row nextRecordInternal(Row row) throws IOException {
    Document doc = cursor.next();
    // Round-trip through JSON with the reader's JsonWriterSettings (settings); with settings
    // that write ObjectId and dates as strings, those values come back as plain Strings
    doc = Document.parse(doc.toJson(settings));

    if (metaColumns.size() == 1 && ConstantValue.STAR_SYMBOL.equals(metaColumns.get(0).getName())) {
        // "*": the whole document goes into a single field as JSON
        row = new Row(1);
        row.setField(0, doc.toJson());
    } else {
        row = new Row(metaColumns.size());
        for (int i = 0; i < metaColumns.size(); i++) {
            MetaColumn metaColumn = metaColumns.get(i);
            Object value = null;
            if (metaColumn.getName() != null) {
                value = doc.get(metaColumn.getName());
                // Fall back to the configured constant when the field is missing
                if (value == null && metaColumn.getValue() != null) {
                    value = metaColumn.getValue();
                }
            } else if (metaColumn.getValue() != null) {
                value = metaColumn.getValue();
            }
            // Convert string values to the column's configured type
            if (value instanceof String) {
                value = StringUtil.string2col(String.valueOf(value), metaColumn.getType(), metaColumn.getTimeFormat());
            }
            row.setField(i, value);
        }
    }
    return row;
}
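For `list` and `doc` to reach the writer as JSON, nested values have to be serialized explicitly: as the earlier output shows, a nested document's `toString()` is `Document{{...}}` and a `List`'s is `[...]` built from its elements' `toString()`. A minimal sketch of such a per-column step, assuming nested documents arrive as `Map` and arrays as `List`; the helper name `columnValue` is hypothetical, and with the real driver one would presumably call `Document.toJson()` on a nested document instead.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NestedColumnValue {

    // Leaves scalars alone; turns nested documents and arrays into JSON text.
    static Object columnValue(Object v) {
        return (v instanceof Map || v instanceof List) ? json(v) : v;
    }

    static String json(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        StringBuilder sb = new StringBuilder();
        if (v instanceof Map) {
            sb.append('{');
            String sep = "";
            for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
                sb.append(sep).append(quote(String.valueOf(e.getKey()))).append(": ").append(json(e.getValue()));
                sep = ", ";
            }
            return sb.append('}').toString();
        }
        if (v instanceof List) {
            sb.append('[');
            String sep = "";
            for (Object item : (List<?>) v) {
                sb.append(sep).append(json(item));
                sep = ", ";
            }
            return sb.append(']').toString();
        }
        return quote(v.toString()); // strings and any other type
    }

    // Minimal escaping: quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') sb.append('\\').append(c);
            else if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
            else sb.append(c);
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("age", "30");
        nested.put("username", "jack");
        System.out.println(nested);                              // {age=30, username=jack} (toString, not JSON)
        System.out.println(columnValue(nested));                 // {"age": "30", "username": "jack"}
        System.out.println(columnValue(Arrays.asList(nested)));  // [{"age": "30", "username": "jack"}]
    }
}
```

Applied in the per-column loop just before `row.setField(i, value)`, a step like this would hand the writer JSON strings for `list` and `doc` instead of Java `toString()` output.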

4) Check the result: MongoDB Object and Array values are now valid JSON

---------------------------------
numWrite | 7
last_write_num_0 | 0
conversionErrors | 0
writeDuration | 20736
duplicateErrors | 0
numRead | 7
snapshotWrite | 0
otherErrors | 0
readDuration | 429
byteRead | 607
last_write_location_0 | 0
byteWrite | 607
nullErrors | 0
nErrors | 0
---------------------------------

mysql> select * from bb;
| aa | bb |
| NULL | NULL |
| NULL | NULL |
| NULL | NULL |
| NULL | NULL |
| NULL | NULL |
| NULL | ["{\"_id\": \"5f38bde622d2ab66755ea950\", \"username\": \"jack\", \"list\": [{\"username\": \"jack\", \"age\": \"30\", \"installDate\": \"1597554150323\"}, {\"age\": \"30\", \"username\": \"jack\", \"installDate\": \"1597554150323\"}]}","{\"_id\": \"5f38bde622d2ab66755ea950\", \"username\": \"jack\", \"list\": [{\"username\": \"jack\", \"age\": \"30\", \"installDate\": \"1597554150323\"}, {\"age\": \"30\", \"username\": \"jack\", \"installDate\": \"1597554150323\"}]}"] |
| {"age": "30", "username": "jack", "installDate": "1597554182212"} | NULL |
