第一次接触Talend,做一个Demo,目的是实现定时同步!经过一番折腾,终于实现了,在此记录一下,以慰折腾之心!哈哈!

目的:每月定时触发两个数据库之间当月数据的同步以及可以人工通过发送JMS来触发指定月份的数据同步,在同步过程中出错的话自动转发送JMS继续同步!

过程:

1、实现基本业务流程

话不多说,直接上图!呵呵!

上图中有几点备注一下:

1、tLibraryLoad是因为我用TIBCO EMS,所以需要先把相关jar添加进来

2、tSetGlobalVar是为了我判断是否是第一次发送,如果是第一次发送我需要删除目标库相同月份的数据后再同步

3、tLogCatcher是当数据库操作出错时,将同步到哪个月份的相关参数发JMS,以便继续触发同步

4、tJavaFlex是为了处理多个年月的循环用,tJavaFlex代码如下:

start code:

//start part of your Java code

System.out.println("tJavaFlex_1: Start code");int beginYear =context.beginYear;int beginMonth =context.beginMonth;int endYear =context.endYear;int endMonth =context.endMonth;int currYear =beginYear;int currMonth =beginMonth;

String currYearMonth= "";

row1.beginYear=beginYear;

row1.beginMonth=beginMonth;

row1.endYear=endYear;

row1.endMonth=endMonth;while((currYear*100+currMonth) <= (endYear*100+endMonth))

{

main code:

//here is the main part of the component,//a piece of code executed in the row//loop

if(currMonth >= 10){

currYearMonth= "" + currYear +currMonth;

}else{

currYearMonth= "" + currYear + "0" +currMonth;

}

row1.currYear=currYear;

row1.currMonth=currMonth;

row1.currYearMonth=currYearMonth;

currMonth= currMonth + 1;if(currMonth > 12){

currYear= currYear + 1;

currMonth= 1;

}

end code:

//end of the component, outside/closing the loop

}

System.out.println("tJavaFlex_1: End code");

2、实现JMS监听调用

这个主要接受JMS消息后继续调用基本流程。

3、定时调用业务流程

因为在组件里面没有找到定时的组件,故自己开发了一个基于Quartz的timer组件,可能不是很好,但凑合着能用,先记录一下,以后有新的想法后再优化!呵呵!

在做定时组件时,开始打算把代码都写到模板中去,但是总是有问题,后来就把一部分代码写在外面用jar的形式引入进来!

A、用eclipse或其他工具创建一个java工程,因为是基于Quartz的,所以需要引进相关jar:quartz-2.2.3.jar,slf4j-api-1.7.22.jar,slf4j-log4j12-1.7.22.jar,log4j-1.2.17.jar,c3p0-0.9.2.jar

B、创建3个java文件(当然也可以合并,根据自己的喜好):EsquelTimerJobStatus.java,EsquelTimerJob.java,EsquelTimerJobMonitor.java

直接上代码:

EsquelTimerJobStatus.java

packagecom.esquel.talend.quartz;importjava.util.HashMap;importjava.util.Map;public classEsquelTimerJobStatus {private static Map timerInfo = new HashMap();public static Map operateTimerInfo(String cid, Map timerMap, booleanisRemove) {synchronized(timerInfo) {if (timerMap != null) {

timerInfo.putAll(timerMap);return null;

}else{if (cid == null) {

cid= "1";

}if(isRemove) {

timerInfo.remove("currentYear_" +cid);

timerInfo.remove("currentMonth_" +cid);

timerInfo.remove("currentDay_" +cid);

timerInfo.remove("currentHour_" +cid);

timerInfo.remove("currentMinute_" +cid);

timerInfo.remove("currentSecond_" +cid);

timerInfo.remove("currentDate_" +cid);

timerInfo.remove("currentDateTime_" +cid);

timerInfo.remove("isRun_" +cid);return null;

}else{if (timerInfo.get("isRun_" + cid) != null && (Boolean) timerInfo.get("isRun_" +cid)) {

Map returnMap = new HashMap();

returnMap.put("isRun_" + cid, true);

returnMap.put("currentYear_" + cid, timerInfo.get("currentYear_" +cid));

returnMap.put("currentMonth_" + cid, timerInfo.get("currentMonth_" +cid));

returnMap.put("currentDay_" + cid, timerInfo.get("currentDay_" +cid));

returnMap.put("currentHour_" + cid, timerInfo.get("currentHour_" +cid));

returnMap.put("currentMinute_" + cid, timerInfo.get("currentMinute_" +cid));

returnMap.put("currentSecond_" + cid, timerInfo.get("currentSecond_" +cid));

returnMap.put("currentDate_" + cid, timerInfo.get("currentDate_" +cid));

returnMap.put("currentDateTime_" + cid, timerInfo.get("currentDateTime_" +cid));

timerInfo.put("isRun_" + cid, false);returnreturnMap;

}else{return null;

}

}

}

}

}

}

EsquelTimerJob.java

packagecom.esquel.talend.quartz;importjava.util.Calendar;importjava.util.HashMap;importjava.util.Map;importorg.quartz.Job;importorg.quartz.JobDataMap;importorg.quartz.JobExecutionContext;importorg.quartz.JobExecutionException;public final class EsquelTimerJob implementsJob {public void execute(JobExecutionContext context) throwsJobExecutionException {

java.util.Calendar currentDate=java.util.Calendar.getInstance();

JobDataMap dataMap=context.getJobDetail().getJobDataMap();

String cid= dataMap.getString("cid");if (cid == null) {

cid= "1";

}

java.text.SimpleDateFormat dateFormat= new java.text.SimpleDateFormat("yyyy-MM-dd");

String datestr=dateFormat.format(currentDate.getTime());

java.text.SimpleDateFormat datetimeFormat= new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

String datetimestr=datetimeFormat.format(currentDate.getTime());

Map timerMap = new HashMap();

timerMap.put("currentYear_" +cid, currentDate.get(Calendar.YEAR));

timerMap.put("currentMonth_" + cid, currentDate.get(Calendar.MONTH) + 1);

timerMap.put("currentDay_" +cid, currentDate.get(Calendar.DAY_OF_MONTH));

timerMap.put("currentHour_" +cid, currentDate.get(Calendar.HOUR_OF_DAY));

timerMap.put("currentMinute_" +cid, currentDate.get(Calendar.MINUTE));

timerMap.put("currentSecond_" +cid, currentDate.get(Calendar.SECOND));

timerMap.put("currentDate_" +cid, datestr);

timerMap.put("currentDateTime_" +cid, datetimestr);

timerMap.put("isRun_" + cid, true);

EsquelTimerJobStatus.operateTimerInfo(cid, timerMap,false);//System.out.println(cid+ ":" + datetimestr);

}

}

EsquelTimerJobMonitor.java

packagecom.esquel.talend.quartz;importjava.util.Map;public classEsquelTimerJobMonitor {publicEsquelTimerJobMonitor() {

}public MapgetTimerInfo(String cid) {while (true) {

Map returnMap = EsquelTimerJobStatus.operateTimerInfo(cid, null, false);if (returnMap != null) {returnreturnMap;

}

}

}

}

C、加入一个log4j.properties文件,这个自己定义,因为不加入的话,好像会报log引入错误

D、将上面代码和log4j.properties打包成一个jar:EsqueTimerJob.jar

E、接下来是对Talend进行组件开发,创建一个组件tEsquelTimer,引入相关jar,勾选begin和end,如下图:

注:自定义图标下面的esquelTimer.png的命名不对,要用tEsquelTimer_icon32.png

话不多说,直接上相关文件代码!

tEsquelTimer_java.xml:

Esquel

"0/10 * * * * ? *"

tEsquelTimer_begin.javajet:

imports="org.talend.core.model.process.INode

org.talend.core.model.process.ElementParameterParser

org.talend.core.model.metadata.IMetadataTable

org.talend.core.model.metadata.IMetadataColumn

org.talend.core.model.process.IConnection

org.talend.core.model.process.IConnectionCategory

org.talend.designer.codegen.config.CodeGeneratorArgument

org.talend.core.model.metadata.types.JavaTypesManager

org.talend.core.model.metadata.types.JavaType

java.util.List

java.util.Map"

%>

INode node=(INode)codeGenArgument.getArgument();

String cid=node.getUniqueName();

log= newLogUtil(node);

String cronExpression= ElementParameterParser.getValue(node, "__TIMER_CORN__");

IMetadataTable metadata=null;

List metadatas =node.getMetadataList();if ((metadatas!=null)&&(metadatas.size()>0)) {

metadata= metadatas.get(0);

}%>

int nbline_ = 0;

com.esquel.talend.quartz.EsquelTimerJobMonitor etm= newcom.esquel.talend.quartz.EsquelTimerJobMonitor();try{

org.quartz.impl.StdSchedulerFactory sf_ = neworg.quartz.impl.StdSchedulerFactory();

org.quartz.Scheduler sched_ = sf_.getScheduler();

org.quartz.impl.JobDetailImpl jobDetail_ = neworg.quartz.impl.JobDetailImpl();

jobDetail_.setName("EsquelTimerJob_");

jobDetail_.setGroup("EsquelTimerJobGroup_");

jobDetail_.setJobClass(com.esquel.talend.quartz.EsquelTimerJob.class);

org.quartz.JobDataMap jobDataMap_ = neworg.quartz.JobDataMap();

jobDataMap_.put("cid", "");

jobDetail_.setJobDataMap(jobDataMap_);

org.quartz.impl.triggers.CronTriggerImpl cornTrigger_ = neworg.quartz.impl.triggers.CronTriggerImpl();

cornTrigger_.setName("EsquelTimerTrigger_");

cornTrigger_.setGroup("EsquelTimerTriggerGroup_");

cornTrigger_.setCronExpression();

sched_.scheduleJob(jobDetail_, cornTrigger_);

sched_.start();

System.out.println("Ready to schedule");

System.out.println("Waiting...");

java.util.Map currMap = null;while((currMap=etm.getTimerInfo("")) != null){

conns =node.getOutgoingSortedConnections();

List columnLists =metadata.getListColumns();for(IConnection conn:conns){if(conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {

String firstConnName=conn.getName();%>

.currentYear=(Integer)currMap.get("currentYear_");.currentMonth=(Integer)currMap.get("currentMonth_");.currentDay=(Integer)currMap.get("currentDay_");.currentHour=(Integer)currMap.get("currentHour_");.currentMinute=(Integer)currMap.get("currentMinute_");.currentSecond=(Integer)currMap.get("currentSecond_");.currentDate=(String)currMap.get("currentDate_");.currentDateTime=(String)currMap.get("currentDateTime_");

}%>

tEsquelTimer_end.javajet:

imports="org.talend.core.model.process.INode

org.talend.core.model.process.ElementParameterParser

org.talend.core.model.metadata.IMetadataTable

org.talend.core.model.metadata.IMetadataColumn

org.talend.core.model.process.IConnection

org.talend.core.model.process.IConnectionCategory

org.talend.designer.codegen.config.CodeGeneratorArgument

org.talend.core.model.metadata.types.JavaTypesManager

org.talend.core.model.metadata.types.JavaType

java.util.List

java.util.Map"

%>

INode node=(INode)codeGenArgument.getArgument();

String cid=node.getUniqueName();

log= newLogUtil(node);%>nbline_++;

}

}catch(Exception e) {

e.printStackTrace();

}globalMap.put("_NB_LINE", nbline_);

tEsquelTimer_messages.properties:

#

#Tue Feb14 13:20:59 CST 2017TIMER_CORN.NAME=CronExpression

NB_LINE.NAME=NB_LINE

HELP=org.talend.help.tEsquelTimer

LONG_NAME=tEsquelTimer using quartz

F、发布组件,并开发Job

好了!在此大功告成!哈哈!

talend同步mysql_Talend初试,实现定时同步相关推荐

  1. Linux平台上文件同步——rsync+inotify之定时同步

    1 前言 1.1 概述 本文介绍使用rsync和 inotify-tools,实现linux 上的本地定时同步和远程定时同步的方法. 1.2 实验环境 服务器两台 操作系统: CentOS-7.4 软 ...

  2. 数据库同步复制|sqlserver同步工具|sqlserver数据库定时同步

    SyncNavigator v8.6.2 SyncNavigator是一款功能强大的数据库同步软件,适用于SQL SERVER, MySQL,具有自动/定时同步数据.无人值守.故障自动恢复.同构/异构 ...

  3. 【定时同步系列1】定时同步之MARTIN OERDER算法原理与公式推导

    关注公号[逆向通信猿],口令:OM算法 信号模型 接收到的信号(PAM)或等效的低通信号(QAM,PSK)可以写为 r ( t ) = ∑ n = − ∞ ∞ a n g T (

  4. 【定时同步系列2】16QAM调制+OM定时+信号分段处理+误码率曲线之MATLAB仿真(复信号模型)

    关注公号[逆向通信猿]内容!!! 算法回顾 关于O&M算法的原理.公式推导与详解,请参考博客: [定时同步系列1]定时同步之MARTIN OERDER算法原理与公式推导 鉴于前期有很多读者私信 ...

  5. 5GNR漫谈16:OFDM的符号定时同步与偏差(STO)

    任何一个无线收发通信系统,不管是4G,5G,蓝牙,wifi,都要考虑两个同步,一个是定时同步,即接收端要找到接收信号的起始位置:另一个是载波同步,即发送端采用的晶振频率和接收端采用的晶振频率之间会有物 ...

  6. solr mysql 分词_solr 7+tomcat 8 + mysql实现solr 7基本使用(安装、集成中文分词器、定时同步数据库数据以及项目集成)...

    基本说明 Solr是一个开源项目,基于Lucene的搜索服务器,一般用于高级的搜索功能: solr还支持各种插件(如中文分词器等),便于做多样化功能的集成: 提供页面操作,查看日志和配置信息,功能全面 ...

  7. sql 定时同步两个数据库

    --定时同步服务器上的数据 --例子: --测试环境,SQL Server2000,远程服务器名:xz,用户名为:sa,无密码,测试数据库:test --服务器上的表(查询分析器连接到服务器上创建) ...

  8. java控制一次传10条数据_java 定时同步数据的任务优化

    前言 定时任务在系统中并不少见,主要目的是用于需要定时处理数据或者执行某个操作的情况下,如定时关闭订单,或者定时备份.而常见的定时任务分为2种,第一种:固定时间执行,如:每分钟执行一次,每天执行一次. ...

  9. mysql备份至cos_宝塔面板网站文件/数据库定时同步备份至腾讯云COS设置

    本来老蒋这篇文章是要分享张戈同学关于利用腾讯云COS备份网站和数据库脚本工具的整理的,但是翻看之前的博文发现我们能用到的面板和工具包大部分都自带第三方云存储接口快速备份的.所以这篇文章延期到后面再去分 ...

最新文章

  1. 【C】数组数组初始化总结
  2. 新站SEO优化如何吸引搜索引擎蜘蛛的爬行?
  3. 美国数学三大分支专业就业前景解析 你选对了吗?
  4. Caffe官方教程翻译(7):Fine-tuning for Style Recognition
  5. HDU 5612 Baby Ming and Matrix games
  6. you know what I mean
  7. Linux移植随笔:终于解决Tslib的问题了【转】
  8. C#中的变量、常量、数据类型
  9. python 字符串 4位一组_Python基础4- 字符串
  10. C# 输出目录结构树到Console或文本文件
  11. 摘花生(信息学奥赛一本通-T1284)
  12. Spark基础学习笔记12:Scala内建控制结构
  13. python调用java方法_python调用Java方法传入HashMap ArrayList
  14. LeetCode 5357. 设计一个支持增量操作的栈
  15. [2018.07.10 T3]数论题
  16. Netty工作原理最详细分析
  17. 什么是GMSK调制-高斯最小移位键控
  18. 最全名企笔试题+算法题
  19. RUBi: Reducing Unimodal Biases in Visual Question Answering
  20. android 游戏 平板电脑,打游戏的安卓平板电脑推荐-打游戏最适合的平板电脑排行榜...

热门文章

  1. 假设检验、Z检验与T检验
  2. r语言rank降序_R语言速成之第一章 向量(编辑,排序,10个基本函数)
  3. 大数据工具:IKAnalyzer分词工具介绍与使用
  4. charles 如何获取电脑端微信小程序接口
  5. NPDP认证|制造业产品经理日常工作必备技能,快来学习提升吧!
  6. 计算机工资真的很高吗,有没有夸大?
  7. CteX的安装和使用
  8. 添加注册表右键以管理员身份在当前文件夹打开CMD窗口
  9. 深入.NET平台和C#编程_深入.NET框架
  10. nginx搭建私有云盘云桌面——“可道云”安装搭建