文章目录

  • 一、背景
  • 二、基于java的本地测试datax
    • 2.1 github上下载datax的源代码
    • 2.2 datax代码导入idea
  • 三、docker安装南大通用数据库GBase和GBase 8a
    • 3.1 docker安装Gbase 8a
    • 3.2 docker安装Gbase 8s
  • 四、南大通用数据库GBase 8s To GBase 8a
    • 4.1 GBase 8s的reader读插件开发(writer同理)
  • 五、南大通用gbase可视化工具
    • 5.1 南大通用GBase 8a的连接配置
    • 5.2 南大通用GBase 8s的连接配置

一、背景

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。但是随着业务需求的增加,Datax自带的插件逐渐不满足,因此需要进行二次开发新的插件。基于此,本文对datax插件的二次开发步骤进行详细的说明,希望能帮助到大家。

二、基于java的本地测试datax

2.1 github上下载datax的源代码

网址: https://github.com/alibaba/DataX

2.2 datax代码导入idea

1.在此,把从Git上下载的datax代码,本地解压导入idea,如下图所示:

2. 基于java的本地测试

利用本地的maven进行编译datax源代码,打包命令如下:

 mvn -U clean package assembly:assembly -Dmaven.test.skip=true

等待打包完成…即生成target目录:

3. 利用核心类,Core模块下的Engine进行本地测试:

4 修改Engine.java代码,因为这里是datax的入口。
首先设置系统变量,也就是datax的目录路径:

System.setProperty("datax.home","F:\\gx\\DataX\\target\\datax\\datax");

其次设置job路径和一些参数。

String[] datxArgs = {"-job", "F:\\gx\\DataX\\target\\datax\\datax\\job\\job.json", "-mode", "standalone", "-jobid", "-1"};

(datax目录路径和job路径用的是打包生成的target目录里的datax) 进行本地测试的时候,修改job.json即可。

测试: 直接点击main方法运行。

三、docker安装南大通用数据库GBase和GBase 8a

3.1 docker安装Gbase 8a

1 第一步 环境准备

确认机器上已经安装了Docker环境

2 第二步 在安装好docker的服务器上进行下载GBase 8a镜像

$ docker pull shihd/gbase8a:1.0 

3 第三步 创建并启动容器

$ docker run --name 容器名 -itd -p5258:5258 shihd/gbase8a:1.0

GBase8a数据库信息

DB: gbase
User: root
Password: root
Port: 5258

3.2 docker安装Gbase 8s

1.查找GBase 8s 镜像版本

[root@localhost ~]# docker search gbase8s


2. 拉取GBase 8s镜像

[root@localhost ~]# docker pull liaosnet/gbase8s:3.3.0_2_amd64

3 运行容器

docker run --name 容器名 -p 19088:9088 -itd liaosnet/gbase8s:3.3.0_2_amd64

4 安装成功
通过docker ps 命令查看安装是否成功。并且进入容器:docker exec -i -t 容器名 /bin/bash
进入容器,查看数据库状态:


在则查看gbase 8s的实例服务:

四、南大通用数据库GBase 8s To GBase 8a

4.1 GBase 8s的reader读插件开发(writer同理)

1 新建gbase8sreader模块

2 复制mysqlreader的assembly包和resource包下的plugin.json、plugin_job_template.json
把复制的放入gbase8sreader模块相应的位置。

3. 修改复制过来的文件。

package.xml

<assemblyxmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"><id></id><formats><format>dir</format></formats><includeBaseDirectory>false</includeBaseDirectory><fileSets><fileSet><directory>src/main/resources</directory><includes><include>plugin.json</include><include>plugin_job_template.json</include></includes><outputDirectory>plugin/reader/gbase8sreader</outputDirectory></fileSet><fileSet><directory>target/</directory><includes><include>gbase8sreader-0.0.1-SNAPSHOT.jar</include></includes><outputDirectory>plugin/reader/gbase8sreader</outputDirectory></fileSet></fileSets><dependencySets><dependencySet><useProjectArtifact>false</useProjectArtifact><outputDirectory>plugin/reader/gbase8sreader/libs</outputDirectory><scope>runtime</scope></dependencySet></dependencySets>
</assembly>

plugin.json

{"name": "gbase8sreader","class": "com.cetc.datax.plugin.reader.gbase8sreader.Gbase8sReader","description": "useScene: test. mechanism: use datax framework to transport data from txt file. warn: The more you know about the data, the less problems you encounter.","developer": "cetc"
}

注意:“class” 为我们后面需要编写的代码类名
plugin_job_template.json

{"name": "gbase8sreader","parameter": {"username": "","password": "","column": [],"connection": [{"jdbcUrl": [],"table": []}],"where": ""}
}

4 新建插件代码包

Gbase8sReader代码如下所示:

package com.cetc.datax.plugin.reader.gbase8sreader;import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import java.util.List;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;/*** @author : shujuelin* @date : 16:08 2021/10/11*/
public class Gbase8sReader extends Reader {private static final DataBaseType DATABASE_TYPE = DataBaseType.Gbase8s;public static class Job extends Reader.Job {private static final Logger LOG = LoggerFactory.getLogger(Job.class);private Configuration originalConfig = null;private CommonRdbmsReader.Job commonRdbmsReaderJob;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);if (userConfigedFetchSize != null) {LOG.warn("对 gbasereader 不需要配置 fetchSize, gbasereader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");}this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);this.commonRdbmsReaderJob.init(this.originalConfig);}@Overridepublic void preCheck(){init();this.commonRdbmsReaderJob.preCheck(this.originalConfig,DATABASE_TYPE);}@Overridepublic List<Configuration> split(int adviceNumber) {return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);}@Overridepublic void post() {this.commonRdbmsReaderJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderJob.destroy(this.originalConfig);}}public static class Task extends Reader.Task {private Configuration readerSliceConfig;private CommonRdbmsReader.Task commonRdbmsReaderTask;@Overridepublic void init() {this.readerSliceConfig = super.getPluginJobConf();this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId());this.commonRdbmsReaderTask.init(this.readerSliceConfig);}@Overridepublic void startRead(RecordSender recordSender) {int fetchSize = 1000; //this.readerSliceConfig.getInt(Constant.FETCH_SIZE);this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,super.getTaskPluginCollector(), fetchSize);}@Overridepublic void post() {this.commonRdbmsReaderTask.post(this.readerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);}}
}

注意

代码里的:private static final DataBaseType DATABASE_TYPE = DataBaseType.Gbase8s;
需要在

进行修改,增加自己的数据源类型。

  Gbase("gbase","com.gbase.jdbc.Driver"),Gbase8s("gbase8s","com.gbasedbt.jdbc.Driver");

在DataBaseType类中针对reader的数据源,要修改appendJDBCSuffixForReader方法:

同理: 编写writer插件时候也需要在相应的方法里appendJDBCSuffixForWriter进行修改。

5 打包运行
将其他模块注释只留下公共模块和自己的项目模块。最外层的pom

在最外层的package.xml加上下面这个

<fileSet><directory>gbase8sreader/target/datax/</directory><includes><include>**/*.*</include></includes><outputDirectory>datax</outputDirectory></fileSet>

五、南大通用gbase可视化工具

本文使用的客户端连接工具为:

5.1 南大通用GBase 8a的连接配置

5.2 南大通用GBase 8s的连接配置

大数据技术之DataX (一)DataX插件开发相关推荐

  1. 百分点大数据技术团队:乘风破浪 海外数据中台项目实践

    编者按 踏上一带一路的新丝路,北京百分点信息科技有限公司从2016年开拓海外业务,以大数据技术为基础,结合中国先进的数据治国理念,用数据智能推动社会进步.三年时间,百分点海外团队在非洲某国实施大数据项 ...

  2. 百分点大数据技术团队:数据治理“PAI”实施方法论

    数据作为第五大生产要素,已逐渐成为政府和企业决策的重要手段与依据.面对数据多样化.数据需求个性化.数据应用智能化的需求,以及在2B和2G行业中数据质量参差不齐.数据应用难以发挥价值.数据资产难以沉淀等 ...

  3. 360互联网训练营第十四期——大数据技术开放日

    一 月 一年之计在于春 360互联网技术训练营陪你开启2019,为新一年加满能量! 在大数据时代的潮流中,数据存储与分析在价值创造过程起着越来越重要的作用.本次大数据技术开放日邀请360.快手.人人车 ...

  4. 大数据技术系列(1)

    大数据技术概述   转载于:https://younger.blog.csdn.net/article/details/127632128?spm=1001.2014.3001.5502 仅仅作为个人 ...

  5. StoneDT开源舆情系统大数据技术栈介绍

    我们目前开源的 舆情系统 分为3个部分,整个系统使用了多种开源技术组件和开源框架,涵盖涉及技术领域广泛,例如:分布式计算.大数据.人工智能.数据中台.数据挖掘.深度学习.java和python的大量实 ...

  6. 一文看懂大数据生态圈完整知识体系【大数据技术及架构图解实战派】

    一文看懂大数据生态圈完整知识体系 徐葳 随着大数据行业的发展,大数据生态圈中相关的技术也在一直迭代进步,作者有幸亲身经历了国内大数据行业从零到一的发展历程,通过本文希望能够帮助大家快速构建大数据生态圈 ...

  7. SDCC 2017优秀专题线上展第一站:大数据技术实战峰会首解密

    SDCC始创于2007年,十年长空,历久弥新--2016年,SDCC已分别在上海.深圳.成都.杭州.北京五地举办系列峰会,广受当地开发者欢迎.其中,北京作为年度收官之作,聚焦最前沿技术成果,汇聚年度最 ...

  8. FinTech研发报告-之大数据技术

    前序: 手记血泪史:2016年底~2017年是自己技术生涯的元年,所以逼着自己写一般书籍(原打算,后来发现自律性和俗事太多最后没有实现),当时一直关注FinTech个方面的内容,所以决定先写大数据方面 ...

  9. 关于大数据技术的演讲_大数据以及大数据技术都包括哪些内容

    大数据经过多年的发展,目前在概念上已经有了更多的含义,从不同的角度来看待大数据也会有不同的定义,但是总的来说,大数据可以用三个方面来进行概括,其一是"新的价值领域";其二是&quo ...

  10. 倒计时1天 | 2019 中国大数据技术大会(BDTC)报名通道即将关闭(附参会提醒)...

    2019年12月5-7日,由中国计算机学会主办,CCF 大数据专家委员会承办,CSDN.中科天玑数据科技股份有限公司协办的中国大数据技术大会(BDTC 2019)将于北京长城饭店隆重举行.届时,超过百 ...

最新文章

  1. php调用python pkl_Python Pickle的任意代码执行漏洞实践和Payload构造
  2. ospf hello时间和dead_图文并茂解释OSPF邻居关系建立失败的几种常见情况(太实用了!)...
  3. python打印99乘法表_Python 实例:九九乘法表
  4. java jni日志输出_java打印Jni层log
  5. my batis plus 小数没有0_大黄蜂3号Plus,妈咪保贝的强劲对手!
  6. TortoiseSVN 命令 (命令行执行工具)
  7. php读取mssql中文乱码,PHP读取mssql json数据中文乱码的解决办法
  8. SPOJ Pouring Water
  9. 20k超声波电路原理图讲解_超声波液位开关和液位开关的区别,它们的工作原理分别是什么?...
  10. java web 和js区别_jsp和javascript之间有什么区别?
  11. 什么是DHCP(中继模式)
  12. python基础——闭包函数和生成器
  13. MySQL查询指令示例---初学者必看
  14. C++20新特性全在这一张图里了
  15. JavaScript 验证码制作
  16. redis分布式锁的安全性及与zookeeper的对比
  17. python os.path.splitext()的用法_Python中os.path用法分析
  18. 记2019北航计算机夏令营体验~
  19. flash 调用 脚本_Flash脚本-DNA双螺旋
  20. 打开IDE里XXX.rc文件夹的子项,显示“!加载失败”

热门文章

  1. 芝加哥计算机社会科学硕士,芝加哥大学统计学研究生学费费用解读
  2. java 计算圆的面积
  3. Python--循环语句练习
  4. 戴尔服务器前置信息屏报错CPU1 mem vtt pg voltage is outside of range
  5. 关于角色卡入土里与水平自动移动的问题
  6. SMB交换机、接入交换机、汇聚交换机、核心交换机
  7. JAVA网络传输乱码问题
  8. python3-opencv库(4)--图片像素运算,利用加权叠加调节图像对比度和亮度
  9. LeetCode 会议室 II
  10. 网络攻防之dns劫持与网页挂马(实测)