在进行复杂数据传输时,特别是异构数据库的多表数据传输,我们经常会用到ETL工具来完成。Kettle是一个典型的ETL工具且使用广泛。由于Kettle功能强大且复杂,对于java开发人员来说无疑增加了项目运维的难度和复杂度。因此将Kettle的集成到Java项目中可以大大降低项目的开发难度和开发效率,同时也降低了运维复杂度。网上大多关于Kettle集成的中文资料都是基于Kettle4.0之前的。以下是根据6.0官方文档及网上相关资料开发的几个Demo如有错误还望及时指出!

1、jar包引用

文件安装目录的data-integration\lib文件夹下有很多jar包,可以根据实际需要进行添加。经过个人测试以下几个是必要的:

avalon-framework-4.1.5.jar;

commons-codec-1.9.jar;

commons-collections-3.2.1.jar;

commons-io-2.1.jar;

commons-lang-2.5.jar;

commons-logging-1.1.3.jar;

commons-vfs2-2.1-20150824.jar;

guava-17.0.jar;

jug-lgpl-2.0.0.jar;

kettle-core-6.0.1.0-386.jar;

kettle-dbdialog-6.0.1.0-386.jar;

kettle-engine-6.0.1.0-386.jar;

kettle-ui-swt-6.0.1.0-386.jar;

metastore-6.0.1.0-386.jar;

ognl-2.6.9.jar;

scannotation-1.0.2.jar

maven:

pentaho-releases

http://repository.pentaho.org/artifactory/repo/

pentaho-kettle

kettle-core

6.1.0.4-225

com.verhas

license3j

1.0.7

pentaho-kettle

kettle-dbdialog

6.1.0.4-225

pentaho-kettle

kettle-engine

6.1.0.4-225

pentaho

metastore

6.1.0.4-225

org.safehaus.jug

jug

2.0.0

lgpl

2、Java创建transaction

/**

* Creates a new Transformation using input parameters such as the tablename to read from.

* @param transformationName transformation的名称

* @param sourceDatabaseName 输入的 database 名称

* @param sourceTableName 要读取的表名

* @param sourceFields 要读取的列名

* @param targetDatabaseName 目标database名

* @param targetTableName要写入的表名

* @param targetFields要写入的列名(要跟读取的列长度相同)

* @return A new transformation metadata object

* @throws KettleException In the rare case something goes wrong

*/

public static final TransMeta buildCopyTable(String transformationName,

String sourceDatabaseName, String sourceTableName,

String[] sourceFields, String targetDatabaseName,

String targetTableName, String[] targetFields,

DatabaseMeta[] databases)

throws KettleException {

EnvUtil.environmentInit();

try

{

// Create a new transformation...

TransMeta transMeta = new TransMeta();

transMeta.setName(transformationName);

// 添加数据库连接

for (int i = 0; i < databases.length; i++) {

DatabaseMeta databaseMeta = databases[i];

transMeta.addDatabase(databaseMeta);

}

DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);

DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

//添加注释

String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;

note += "After that, it writes the information to table ["+ targetTableName + "] on database [" + targetDBInfo + "]";

NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);

transMeta.addNote(ni);

// 创建读取数据源的 step...

String fromstepname = "read from [" + sourceTableName + "]";

TableInputMeta tii = new TableInputMeta();

tii.setDatabaseMeta(sourceDBInfo);

String selectSQL = "SELECT " + Const.CR;

for (int i = 0; i < sourceFields.length; i++) {

if (i > 0) selectSQL += ", "; else selectSQL += " ";

selectSQL += sourceFields[i] + Const.CR;

}

selectSQL += "FROM " + sourceTableName;

tii.setSQL(selectSQL);

PluginRegistry registry = PluginRegistry.getInstance();

String fromstepid = registry.getPluginId(tii);

StepMeta fromstep = new StepMeta(fromstepid, fromstepname,(StepMetaInterface) tii);

fromstep.setLocation(150, 100);

fromstep.setDraw(true);

fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");

transMeta.addStep(fromstep);

// 添加 重命名 fields的逻辑

// Use metadata logic in SelectValues, use SelectValueInfo...

SelectValuesMeta svi = new SelectValuesMeta();

svi.allocate(0, 0, sourceFields.length);

for (int i = 0; i < sourceFields.length; i++) {

svi.getSelectName()[i] = sourceFields[i];

svi.getSelectRename()[i] = targetFields[i];

}

String selstepname = "Rename field names";

String selstepid = registry.getPluginId(svi);

StepMeta selstep = new StepMeta(selstepid, selstepname, (StepMetaInterface) svi);

selstep.setLocation(350, 100);

selstep.setDraw(true);

selstep.setDescription("Rename field names");

transMeta.addStep(selstep);

TransHopMeta shi = new TransHopMeta(fromstep, selstep);

transMeta.addTransHop(shi);

fromstep = selstep;

// 创建 写数据的 step...

// 添加 输出表 step...

String tostepname = "write to [" + targetTableName + "]";

TableOutputMeta toi = new TableOutputMeta();

toi.setDatabaseMeta(targetDBInfo);

toi.setTablename(targetTableName);

toi.setCommitSize(200);

toi.setTruncateTable(true);

String tostepid = registry.getPluginId(toi);

StepMeta tostep = new StepMeta(tostepid, tostepname, (StepMetaInterface) toi);

tostep.setLocation(550, 100);

tostep.setDraw(true);

tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");

transMeta.addStep(tostep);

// 添加连线...

TransHopMeta hi = new TransHopMeta(fromstep, tostep);

transMeta.addTransHop(hi);

// The transformation is complete, return it...

return transMeta;

} catch (Exception e) {

throw new KettleException("An unexpected error occurred creating the new transformation", e);

}

}

3、Java运行Kettle的transaction:

/**

* 运行转换文件方法

* @param params 多个参数变量值

* @param ktrPath 转换文件的路径,后缀ktr

*/

public static void runTransfer(String[] params, String ktrPath) {

Trans trans = null;

try {

// 初始化

// 转换元对象

KettleEnvironment.init();

EnvUtil.environmentInit();

TransMeta transMeta = new TransMeta(ktrPath);

// 转换

trans = new Trans(transMeta);

// 执行转换

trans.execute(params);

// 等待转换执行结束

trans.waitUntilFinished();

// 抛出异常

if (trans.getErrors() > 0) {

throw new Exception(

"There are errors during transformation exception!(传输过程中发生异常)");

}

} catch (Exception e) {

e.printStackTrace();

}

}

4、Java运行Kettle的Job:

/**

* java 调用 kettle 的job

*

* @paramjobPath

*

*/

public static void runJob(String[] params, String jobPath) {

try {

KettleEnvironment.init();

//jobPath是Job脚本的路径及名称

JobMeta jobMeta = new JobMeta(jobPath, null);

Job job = new Job(null, jobMeta);

// 向Job 脚本传递参数,脚本中获取参数值:${参数名}

// job.setVariable(paraname, paravalue);

job.setVariable("id", params[0]);

job.setVariable("dt", params[1]);

job.start();

job.waitUntilFinished();

if (job.getErrors() > 0) {

throw new Exception(

"There are errors during job exception!(执行job发生异常)");

}

} catch (Exception e) {

e.printStackTrace();

}

}

注:

1、在Kettle连接SqlServer数据库时建议使用开源的jtds数据库jar包,微软官方jar包不受支持。

2、个人建议使用项目中的调度框架(如quartz、Spring的schedule等)调用transaction来实现定时执行,可以更灵活的控制我们的Job。

3、Kettle有强大的图形化设计器,transaction的创建建议在Kettle中进行。

顺便附上实现后的系统界面样例

clipboard.png

java kettle6_Java调用Kettle6的transaction和job相关推荐

  1. java kettle6_java 调用kettle 6.1 转换

    有人用过java 调用kettle6.1 转换脚本吗?       我用同样的代码,可以执行kettle 4.1的脚本,可6.1始终不行? 是6.1 不支持了?还是写法变了? 如下这段代码4.1可以, ...

  2. java中调用python

    在Java中调用Python </h1><div class="clear"></div><div class="postBod ...

  3. Atitit.java jna  调用c  c++ dll的原理与实践  总结  v2  q27

    Atitit.java jna  调用c  c++ dll的原理与实践  总结  v2  q27 1. Jna简单介绍1 2. Jna范例halo owrld1 3. Jna概念2 3.1. (1)需 ...

  4. 在Java中调用Python

    写在前面 参考:https://www.cnblogs.com/nuccch/p/8435693.html 在微服务架构大行其道的今天,对于将程序进行嵌套调用的做法其实并不可取,甚至显得有些愚蠢.当然 ...

  5. 如何在java中调用js方法

    [java] view plain copy/* * 加载脚本引擎,并在java中调用js方法 */ public void test2() { ScriptEngineManager manager ...

  6. java 中调用 Matlab 的函数

    一.matlab版本必须支持java 在command 模式下面运行deploytool,如果支持该命令即可使用 二.matlab中function的书写 %定义一个函数operation(a,b), ...

  7. java怎么调用存储函数_java中调用存储过程或存储函数的方法

    java中调用存储过程或存储函数的方法 1.调用存储过程:CallableStatement clstmt = null;try {clstmt = conn.prepareCall("{c ...

  8. java 采集rtsp_通过Java程序调用RTSP拉流协议视频平台EasyNVR程序接口步骤概览

    RTSP协议视频平台EasyNVR经过多年的积累,已经是一套成熟且完善的视频平台了,用户可以通过网页直接访问视频监控,也可以通过调用二次开发接口进行二次开发或者集成.下面介绍下Java程序调用Easy ...

  9. JAVA方法调用中的解析与分派

    JAVA方法调用中的解析与分派 本文算是<深入理解JVM>的读书笔记,参考书中的相关代码示例,从字节码指令角度看看解析与分派的区别. 方法调用,其实就是要回答一个问题:JVM在执行一个方法 ...

最新文章

  1. 利用查找替换批处理(附完整源码),进行高效重构
  2. 如何修改html页眉页脚,如何使用标准页眉和页脚修改/更新一组html文件
  3. Hadoop API编程——FileSystem操作
  4. 零基础如何学好Python?Python有哪些必须学的知识?
  5. 万万没想到,刷1000道题目,还不如搞懂这几个机械动图!
  6. matlab imwrite将图像保存到其他目录
  7. Xshell如何进行文件上传?
  8. 基础总结篇之六:ContentProvider之读写联系人
  9. 中如何设置电气栅格_游戏中的设置界面如何设计?
  10. CCF201612试题
  11. Axure 元件 模板 MES系统 全套(带下载地址)
  12. linux 终端 backspace不能删除字符 出现^H
  13. 英特尔老款CPU支持虚拟化对照表(转)
  14. Web前端工程师推荐的书籍
  15. 用iTunes制作苹果手机铃声
  16. LaTex Verbatim 环境下使用数学符号
  17. 安卓APP注册登录+Tomcat服务器搭建+MySQL数据库建立+加密传输+servlet后端内容编写及部署到Tomcat服务器
  18. 2022 年终奖个税计算方法,看看你被多收割了多少
  19. tensorflow.keras入门1
  20. PAT L3-011 直捣黄龙

热门文章

  1. android碎片化的解决方法,解决 Android 设备碎片化--屏幕适配
  2. python垃圾分类图像识别算法_Python 实现一个简单的垃圾分类小游戏(已获校级二等奖)...
  3. 算法练习day2——190319(大顶堆、冒泡、选择、插入)
  4. linux mysql 5.7 双机热备_2017年5月5日 星红桉liunx动手实践mysql 主主双机热备
  5. Cpp 对象模型探索 / 类普通成员函数的调用方式
  6. mysql scope runtime_Maven依赖scope属性详解-一个报错引发的问题 - 老郭种树
  7. python环境变量配置_Python环境变量的配置
  8. Rtx userlist.php,【图片】【C语言】【控制台】提取腾讯通用户信息(id,用户名,手机)【erbi_lucifer吧】_百度贴吧...
  9. 每日软件进度报告—12月6日
  10. bzoj 1028: [JSOI2007]麻将