DataX Transformer: Source Analysis from Entry Point to Loading, plus UDF Extension and Usage
- DataX GitHub
- DataX Transformer
Contents
- 1 Preface
- 2 Requirements
- 3 Solution Analysis
- 4 Decryption Algorithm
- 5 Hive UDF
- 5.1 Test Data
- 5.2 Create a Maven Project
- 5.3 POM
- 5.4 UDF
- 5.5 Test Code
- 5.6 Build and Package
- 5.7 Usage
- 6 DataX
- 6.1 DataX Transformer
- 6.2 Transformer Configuration Sample
- 6.3 Test Data
- 6.4 Approach 1: Develop a Custom Decryption Transformer
- 6.5 Approach 2: Use dx_groovy
- 6.6 DataX Transformer Internals
- 6.6.1 From Entry Point to Execution
- 6.6.2 Transformer Parameters
- 6.6.3 Transformer Loading
- 6.6.4 A More Elegant Way to Load Third-Party Transformer UDFs
- 7 Summary
1 Preface
2 Requirements
The following uses decryption of a specified field (any other per-field processing works the same way) in a pipeline of MySQL -> DataX -> Hive -> scheduled SQL scripts as an example, and presents a generic, efficient, and elegant solution.
3 Solution Analysis
Considering the whole pipeline and its performance, there are broadly two schemes and three concrete implementations.
4 Decryption Algorithm
package yore.utils;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
import java.security.NoSuchAlgorithmException;

/**
 * Created by yore on 2020/5/6 09:57
 */
public class AesUtil {
    private static Logger LOG = LoggerFactory.getLogger(AesUtil.class);
    private static final String CHARSET_NAME = "UTF-8";
    private static final String C_Key = "Mercury";
    private static SecretKeySpec skeySpec;
    private static Cipher cipher;

    static {
        init(C_Key);
    }

    /**
     * Initialize the encryption utility.
     *
     * @param key the key to use (space-padded or truncated to 16 bytes)
     */
    public static void init(String key) {
        if (StringUtils.isEmpty(key)) {
            key = C_Key;
        } else if (key.length() < 16) {
            key = String.format("%-16s", key);
        } else if (key.length() > 16) {
            key = key.substring(0, 16);
        }
        LOG.debug("key = {}", key);
        try {
            byte[] raw = key.getBytes("ASCII");
            skeySpec = new SecretKeySpec(raw, "AES");
            cipher = Cipher.getInstance("AES");
        } catch (UnsupportedEncodingException e) {
            LOG.error(e.getMessage());
        } catch (NoSuchPaddingException e) {
            LOG.error(e.getMessage());
        } catch (NoSuchAlgorithmException e) {
            LOG.error(e.getMessage());
        }
    }

    /**
     * Decrypt a string (using the default cKey).
     *
     * @param content the string to decrypt
     * @return the decrypted string; null for null input, "" for empty input
     */
    public static String decrypt(String content) throws Exception {
        if (null == content) return null;
        if ("".equals(content)) return "";
        try {
            cipher.init(Cipher.DECRYPT_MODE, skeySpec);
            return new String(cipher.doFinal(toByte(new String(content.getBytes(), CHARSET_NAME))), CHARSET_NAME);
        } catch (Exception e) {
            LOG.error("AES decryption failed: " + e.getMessage());
            return null;
        }
    }

    /**
     * Encrypt a string (using the default cKey).
     *
     * @param content the string to encrypt
     * @return the encrypted string
     */
    public static String encrypt(String content) throws Exception {
        try {
            cipher.init(Cipher.ENCRYPT_MODE, skeySpec);
            return toHexStr(cipher.doFinal(content.getBytes("utf-8")));
        } catch (Exception e) {
            LOG.error("AES encryption failed: " + e.getMessage());
            return null;
        }
    }

    public static byte[] toByte(String hexStr) {
        if (hexStr.length() < 1) {
            return null;
        }
        byte[] result = new byte[hexStr.length() / 2];
        for (int i = 0; i < hexStr.length() / 2; ++i) {
            int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16);
            int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16);
            result[i] = (byte) (high * 16 + low);
        }
        return result;
    }

    public static String toHexStr(byte[] buf) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < buf.length; ++i) {
            String hex = Integer.toHexString(buf[i] & 255);
            if (hex.length() == 1) {
                hex = '0' + hex;
            }
            sb.append(hex.toUpperCase());
        }
        return sb.toString();
    }
}
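As a sanity check of the scheme above, here is a minimal, self-contained round-trip sketch using only JDK classes. It mirrors AesUtil's choices (a space-padded 16-byte key, the default "AES" transformation, and uppercase-hex ciphertext) but is an illustrative standalone class, not part of the project:

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;

public class AesRoundTrip {

    // Encrypt then decrypt, mirroring AesUtil: space-padded 16-byte key,
    // default "AES" transformation (ECB/PKCS5Padding), uppercase-hex ciphertext.
    static String roundTrip(String plain) throws Exception {
        String key = String.format("%-16s", "Mercury");
        SecretKeySpec spec = new SecretKeySpec(key.getBytes(StandardCharsets.US_ASCII), "AES");

        Cipher enc = Cipher.getInstance("AES");
        enc.init(Cipher.ENCRYPT_MODE, spec);
        StringBuilder hex = new StringBuilder();
        for (byte b : enc.doFinal(plain.getBytes(StandardCharsets.UTF_8))) {
            hex.append(String.format("%02X", b));   // hex-encode, like AesUtil.toHexStr
        }

        // Hex-decode, like AesUtil.toByte, then decrypt
        byte[] raw = new byte[hex.length() / 2];
        for (int i = 0; i < raw.length; i++) {
            raw[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
        }
        Cipher dec = Cipher.getInstance("AES");
        dec.init(Cipher.DECRYPT_MODE, spec);
        return new String(dec.doFinal(raw), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("Yore"));  // prints "Yore" if the round trip is lossless
    }
}
```

A lossless round trip confirms the key padding and hex encoding are symmetric, which is exactly what the Hive UDF and DataX Transformer below rely on.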
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>yore</groupId>
    <artifactId>common-utils</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>yore utils</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>yore-utils-${project.version}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
5 Hive UDF
5.1 Test Data
Suppose we have the following test data, in which the utterance field holds encrypted content. Below, a UDF is used to decrypt this field.
-- Create the table
CREATE TABLE `flower_name` (
  `id` int COMMENT 'primary key',
  `date` varchar(110) COMMENT 'date',
  `en_name` varchar(128) COMMENT 'English name',
  `name` varchar(128) COMMENT 'flower name',
  `utterance` varchar(255) COMMENT 'meaning'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
STORED AS TEXTFILE;

-- Data file: text delimited by |
1|1月1日|雪莲花|Snow Drop|81B60DA28D5DEF0B8F735D1CBA56A4E5
2|1月2日|黄水仙|Narcisus Jonquilla|BAEE91C1CAFB8D7060F606843298C424
3|1月3日|藏红花|Spring Crocus|1ED48705D0020CE92130DFE3BBE37852
4|1月4日|风信子|Hyacinth|7916CCFA2E24A9AB08C620F5ABBEEFD2
5|1月5日|雪割草|hepatica|EE4DDBC6D4CE93499A37F4A93EBD1DDB
6|1月6日|白色紫罗兰|Violet|DD7F5675C6C2D9A1B5B9F7A7764A303EB43854DB83A3EE5FEA49ADB46E2F7F6B
7|1月7日|白色郁金香|Tulip|A9E978CEAF80344304487C4384609C53
8|1月8日|紫色紫罗兰|Violet|8D4C956F2E4C8A9A5B286221DFD0ECF6
9|1月9日|黄色紫罗兰|Voilet|D7B297C136E2CEA3457C6E7C72993210
10|1月10日|黄杨|Box-Tree|4134826A9117E0169E1F58607D0FB967
11|1月11日|匀桧叶|Arbor-Vitae|78C8A5BD91B0B46944DF8F5B817718D8
12|1月12日|庭荠|Sweet Alyssum|344FF3CBB0C99E82B04BA2BDC782814B
13|1月13日|水仙|Narcissus|F0255F21D8DDF2DD1B0F310297AF4E6B
14|1月14日|报春花|Cyclamen|FBA715DBED589345FAC5D798B1AE0D7E
15|1月15日|剌竹|Thorn|00D5A273C1A47851064DB3600E7F4D43
16|1月16日|黄色风信子|Hyacinth|3DB90590EEEFA2F16AA6A83975F24E77
17|1月17日|酸模|Sorrel|AEF74CA958A3BED52E12394C367B329A
18|1月18日|印度锦葵|Indian Mallow|BAFA1A30FC4F494E32A8134284AAB2F4
19|1月19日|松|Pine|D4591D56BAD2DD460A9D94C859576A4A
20|1月20日|金凤花|Butter Cup|1F63DD3F25E38A6492F955758FA4761B
21|1月21日|常春藤|Ivy|E4E7CEA6348A346201D04BD0A922A287
22|1月22日|苔藓|Moss|F372A370BB2204A1C8B8B56A2AB076C5
23|1月23日|芦荟|Bulrush|259F62C87A8B05B2CE55F37CE2919F46
24|1月24日|番红花|Saffron Crocus|61E2F3C526C33134FFC1DF375789AC8D
25|1月25日|鼠耳草|Cerasrium|444DBD65A8266BBB71AFEB0AF3E44C3F
26|1月26日|含羞草|Humble Plant|30AB4150B2063F4E6C6BF27371191860
27|1月27日|七度灶|Mountain Ash|00DBCE714104EBDF455BD426B2F811B1
28|1月28日|黑色白杨木|Black Piolar|8BA91B432FBAD26736B2372C65E13FB5
29|1月29日|苔藓|Moss|DD64FE4645DFD4A8A5A788E94B67428F
30|1月30日|金盏花|Marsh Marigold|73A1ABB30A8D7E11E2A7BE8600273F2D
31|1月31日|黄色藏红花|Spring Crocus|129E46A29C3F1E9E9F689B9EE29798D5

-- Load the data into Hive from the local file system into the table
LOAD DATA LOCAL INPATH '/home/yore/data/flower_name.txt' OVERWRITE INTO TABLE flower_name;
5.2 Create a Maven Project
For example, create a Maven project in IntelliJ IDEA with GAV coordinates similar to:

<groupId>yore</groupId>
<artifactId>udf_decrypt</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
5.3 POM
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <provided.scope>provided</provided.scope>
    <hadoop.version>3.1.2</hadoop.version>
    <hive.version>3.1.1</hive.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${provided.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${provided.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-cli</artifactId>
        <version>${hive.version}</version>
        <scope>${provided.scope}</scope>
    </dependency>
    <!-- Our own utility module, containing the decryption method -->
    <dependency>
        <groupId>yore</groupId>
        <artifactId>common-utils</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <finalName>udf_decrypt</finalName>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.20.1</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-source-plugin</artifactId>
            <version>2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>jar-no-fork</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
5.4 UDF
package yore;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.log4j.Logger;
import yore.utils.AesUtil;

/**
 * <pre>
 * hadoop fs -put zodiac.jar /app/udf-lib/
 *
 * CREATE FUNCTION default.udf_decrypt AS 'yore.DecryptUDF' USING jar 'hdfs:/app/udf-lib/zodiac.jar';
 * CREATE TEMPORARY FUNCTION default.udf_decrypt2 AS 'yore.DecryptUDF' USING jar 'hdfs:/app/udf-lib/zodiac.jar';
 *
 * SELECT udf_decrypt("F9953708E0E479101887BF30ECF4606A");
 * SELECT udf_decrypt("F9953708E0E479101887BF30ECF4606A") FROM movie LIMIT 20;
 * DROP FUNCTION udf_decrypt;
 * </pre>
 *
 * Created by yore on 2020/5/6 09:57
 */
@Description(name = "udf_decrypt",
        value = "_FUNC_(value) - Returns the decrypted value of the encrypted string",
        extended = "Example:\n"
                + " > SELECT _FUNC_('4A6DE4A853E3C69B5FA63D70C7B1F350') AS a1 FROM pdata_chq.t_test_jm limit 1; \n")
public class DecryptUDF extends GenericUDF {
    private static Logger logger = Logger.getLogger(DecryptUDF.class);

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentException("The operator 'udf_decrypt' accepts 1 argument.");
        }
        return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                PrimitiveObjectInspector.PrimitiveCategory.STRING);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (arguments[0].get() == null) return null;
        try {
            return AesUtil.decrypt(String.valueOf(arguments[0].get()));
        } catch (Exception e) {
            logger.error(e.getMessage());
            return null;
        }
    }

    @Override
    public String getDisplayString(String[] children) {
        return "Returns the decrypted value of " + children[0] + "\n"
                + "Usage: udf_decrypt(column_name)\n"
                + "Returns null if decryption throws an exception";
    }
}
5.5 Test Code
The UDF's logic can be verified without deploying it, using JUnit. (This does not guarantee correctness on the cluster, since the test skips the metadata-fetch step, but it does check the computation logic.)
package yore;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;

/**
 * Created by yore on 2020/4/30 10:12
 */
public class DecryptUDFTest {

    @Test
    public void decryptUDFTest() throws HiveException {
        DecryptUDF udf = new DecryptUDF();
        ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.writableVoidObjectInspector;
        ObjectInspector[] arguments = {valueOI1};
        udf.initialize(arguments);

        DeferredObject[] args = new DeferredObject[1];
        String columnVal = "F9953708E0E479101887BF30ECF4606A";
        args[0] = new DeferredJavaObject(columnVal == null ? null : new Text(columnVal));
        Object result = udf.evaluate(args);
        System.out.println(result);
        Assert.assertEquals("Yore", result);

        columnVal = null;
        args[0] = new DeferredJavaObject(columnVal == null ? null : new Text(columnVal));
        result = udf.evaluate(args);
        Assert.assertNull(result);

        columnVal = "DD64FE4645DFD4A8A5A788E94B67428F";
        args[0] = new DeferredJavaObject(columnVal == null ? null : new Text(columnVal));
        result = udf.evaluate(args);
        System.out.println(result);
        Assert.assertEquals("呵护", result);
    }
}
5.6 Build and Package
5.7 Usage
Package the code above with Maven, upload the jar to the target HDFS path (e.g. /app/udf-lib/), then run the following SQL.
-- Create the function.
-- This creates a permanent function; for a temporary one, add the TEMPORARY keyword after CREATE
CREATE FUNCTION udf_decrypt AS 'yore.DecryptUDF' USING jar 'hdfs:/app/udf-lib/udf_decrypt-jar-with-dependencies.jar';

-- Show the function description
DESCRIBE FUNCTION EXTENDED udf_decrypt;

-- Query the data, decrypting the utterance field
SELECT id, `date`, en_name, name, udf_decrypt(utterance) FROM flower_name;

-- Drop the custom function
DROP FUNCTION udf_decrypt;
6 DataX
6.1 DataX Transformer
6.2 Transformer Configuration Sample
{
  "job": {
    "setting": {
      ……
    },
    "content": [{
      "reader": {
        ……
      },
      "writer": {
        ……
      },
      "transformer": [{
        "name": "dx_substr",
        "parameter": {
          "columnIndex": 5,
          "paras": ["1", "3"]
        }
      }]
    }]
  }
}
6.3 Test Data
-- Create the table (MySQL)
CREATE TABLE `flower_name` (
  `id` int(6) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `date` varchar(110) COLLATE utf8_bin DEFAULT NULL COMMENT 'date',
  `en_name` varchar(128) COLLATE utf8_bin NOT NULL COMMENT 'English name',
  `name` varchar(128) COLLATE utf8_bin NOT NULL COMMENT 'flower name',
  `utterance` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT 'meaning',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=367 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Insert test data
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(1, '1月1日', '雪莲花', 'Snow Drop', '81B60DA28D5DEF0B8F735D1CBA56A4E5');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(2, '1月2日', '黄水仙', 'Narcisus Jonquilla', 'BAEE91C1CAFB8D7060F606843298C424');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(3, '1月3日', '藏红花', 'Spring Crocus', '1ED48705D0020CE92130DFE3BBE37852');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(4, '1月4日', '风信子', 'Hyacinth', '7916CCFA2E24A9AB08C620F5ABBEEFD2');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(5, '1月5日', '雪割草', 'hepatica', 'EE4DDBC6D4CE93499A37F4A93EBD1DDB');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(6, '1月6日', '白色紫罗兰', 'Violet', 'DD7F5675C6C2D9A1B5B9F7A7764A303EB43854DB83A3EE5FEA49ADB46E2F7F6B');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(7, '1月7日', '白色郁金香', 'Tulip', 'A9E978CEAF80344304487C4384609C53');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(8, '1月8日', '紫色紫罗兰', 'Violet', '8D4C956F2E4C8A9A5B286221DFD0ECF6');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(9, '1月9日', '黄色紫罗兰', 'Voilet', 'D7B297C136E2CEA3457C6E7C72993210');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(10, '1月10日', '黄杨', 'Box-Tree', '4134826A9117E0169E1F58607D0FB967');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(11, '1月11日', '匀桧叶', 'Arbor-Vitae', '78C8A5BD91B0B46944DF8F5B817718D8');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(12, '1月12日', '庭荠', 'Sweet Alyssum', '344FF3CBB0C99E82B04BA2BDC782814B');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(13, '1月13日', '水仙', 'Narcissus', 'F0255F21D8DDF2DD1B0F310297AF4E6B');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(14, '1月14日', '报春花', 'Cyclamen', 'FBA715DBED589345FAC5D798B1AE0D7E');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(15, '1月15日', '剌竹', 'Thorn', '00D5A273C1A47851064DB3600E7F4D43');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(16, '1月16日', '黄色风信子', 'Hyacinth', '3DB90590EEEFA2F16AA6A83975F24E77');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(17, '1月17日', '酸模', 'Sorrel', 'AEF74CA958A3BED52E12394C367B329A');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(18, '1月18日', '印度锦葵', 'Indian Mallow', 'BAFA1A30FC4F494E32A8134284AAB2F4');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(19, '1月19日', '松', 'Pine', 'D4591D56BAD2DD460A9D94C859576A4A');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(20, '1月20日', '金凤花', 'Butter Cup', '1F63DD3F25E38A6492F955758FA4761B');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(21, '1月21日', '常春藤', 'Ivy', 'E4E7CEA6348A346201D04BD0A922A287');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(22, '1月22日', '苔藓', 'Moss', 'F372A370BB2204A1C8B8B56A2AB076C5');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(23, '1月23日', '芦荟', 'Bulrush', '259F62C87A8B05B2CE55F37CE2919F46');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(24, '1月24日', '番红花', 'Saffron Crocus', '61E2F3C526C33134FFC1DF375789AC8D');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(25, '1月25日', '鼠耳草', 'Cerasrium', '444DBD65A8266BBB71AFEB0AF3E44C3F');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(26, '1月26日', '含羞草', 'Humble Plant', '30AB4150B2063F4E6C6BF27371191860');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(27, '1月27日', '七度灶', 'Mountain Ash', '00DBCE714104EBDF455BD426B2F811B1');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(28, '1月28日', '黑色白杨木', 'Black Piolar', '8BA91B432FBAD26736B2372C65E13FB5');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(29, '1月29日', '苔藓', 'Moss', 'DD64FE4645DFD4A8A5A788E94B67428F');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(30, '1月30日', '金盏花', 'Marsh Marigold', '73A1ABB30A8D7E11E2A7BE8600273F2D');
INSERT INTO flower_name(id, date, name, en_name, utterance) VALUES(31, '1月31日', '黄色藏红花', 'Spring Crocus', '129E46A29C3F1E9E9F689B9EE29798D5');
6.4 Approach 1: Develop a Custom Decryption Transformer
<groupId>com.yore</groupId>
<artifactId>datax-decrypt</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <datax.version>0.0.1-SNAPSHOT</datax.version>
</properties>

<dependencies>
    <!-- Optional; used here mainly to inspect the decryption method and for testing -->
    <!--<dependency>
        <groupId>yore.utils</groupId>
        <artifactId>jiemi</artifactId>
        <version>1.0</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/jiemi.jar</systemPath>
    </dependency>-->
    <!-- Our own utility module, containing the decryption method -->
    <dependency>
        <groupId>yore</groupId>
        <artifactId>common-utils</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <!-- DataX Common -->
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-common</artifactId>
        <version>${datax.version}</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/datax-common-0.0.1-SNAPSHOT.jar</systemPath>
    </dependency>
    <!-- DataX Transformer -->
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-transformer</artifactId>
        <version>${datax.version}</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/datax-transformer-0.0.1-SNAPSHOT.jar</systemPath>
    </dependency>
    <!-- DataX Core -->
    <dependency>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-core</artifactId>
        <version>${datax.version}</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/datax-core-0.0.1-SNAPSHOT.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
package com.alibaba.datax.core.transport.transformer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import yore.utils.AesUtil;

import java.util.Arrays;

/**
 * A custom DataX Transformer for decrypting fields.
 *
 * <pre>
 * Build the project (when cloning the full DataX project, be sure to build it this way):
 * mvn clean compile -Dmaven.test.skip=true
 * </pre>
 *
 * Created by yore on 2020/5/7 10:02
 */
public class YoreDecryptTransformer extends Transformer {
    private static final Logger LOG = LoggerFactory.getLogger(YoreDecryptTransformer.class);

    public YoreDecryptTransformer() {
        setTransformerName("dx_yore_decrypt");
    }

    /**
     * Decrypt the specified field.
     *
     * @author Yore
     * @param record the record
     * @param paras  the parameters passed to the transformer
     * @return the record
     */
    @Override
    public Record evaluate(Record record, Object... paras) {
        int columnIndex;
        String aesKey = null;
        try {
            if (paras.length == 1) {
                columnIndex = (Integer) paras[0];
            } else if (paras.length == 2) {
                columnIndex = (Integer) paras[0];
                aesKey = (String) paras[1];
            } else {
                throw new RuntimeException(getTransformerName() + " accepts at most 2 paras");
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
                    "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
        }

        Column column = record.getColumn(columnIndex);
        try {
            String oriValue = column.asString();
            // If the field is null, skip it and do not decrypt
            if (oriValue == null) {
                return record;
            }
            String newValue;
            if (aesKey == null) {
                newValue = AesUtil.decrypt(oriValue);
            } else if (aesKey.trim().length() < 1) {
                newValue = AesUtil.decrypt(oriValue);
                LOG.warn("The given key={} is invalid; falling back to the default key", aesKey);
            } else {
                AesUtil.init(aesKey);
                newValue = AesUtil.decrypt(oriValue);
            }
            record.setColumn(columnIndex, new StringColumn(newValue));
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }
        return record;
    }
}
The last and most important step is to register the Transformer above with DataX. Open the TransformerRegistry class in the com.alibaba.datax.core.transport.transformer package and add the following code:
package com.alibaba.datax.core.transport.transformer;

// ……

/**
 * no comments.
 * Created by liqiang on 16/3/3.
 */
public class TransformerRegistry {
    // ……
    static {
        /**
         * add native transformer
         * local storage and from server will be delay load.
         */
        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        // our custom transformer
        registTransformer(new YoreDecryptTransformer());
    }
    // ……
}
{
  "job": {
    "setting": {
      "speed": { "channel": 1 },
      "errorLimit": { "record": 0 }
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root",
          "password": "123456",
          "column": ["id", "date", "en_name", "name", "utterance"],
          "splitPk": "id",
          "connection": [{
            "table": ["flower_name"],
            "jdbcUrl": ["jdbc:mysql://cdh1:3306/flink_test"]
          }]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "defaultFS": "hdfs://cdh1:8020",
          "column": [
            { "name": "id", "type": "BIGINT" },
            { "name": "date", "type": "STRING" },
            { "name": "en_name", "type": "STRING" },
            { "name": "name", "type": "STRING" },
            { "name": "utterance", "type": "STRING" }
          ],
          "fieldDelimiter": "|",
          "fileName": "flower_name",
          "fileType": "text",
          "path": "/yore",
          "writeMode": "append"
        }
      },
      "transformer": [{
        "name": "dx_yore_decrypt",
        "parameter": {
          "columnIndex": 4,
          "paras": []
        }
      }]
    }]
  }
}
# List the files in the target HDFS directory
hadoop fs -ls /yore
# View the data
hadoop fs -cat /yore/flower_name__b00a2792_1c23_4074_ac94_7d40f5409b16
6.5 Approach 2: Use dx_groovy
// getColumn(4) gets the 5th field object from the columns configured in the reader.
Column column = record.getColumn(4);
// .asString() converts the 5th field object to its string value.
// Also supported: asLong, asDouble, asDate, asBytes, asBoolean, asBigDecimal, asBigInteger
String oriValue = column.asString();
// Call the decrypt method of the utility class in our own jar
String newValue = AesUtil.decrypt(oriValue);
// Set the 5th field to the decrypted value.
// Other supported column types: LongColumn, DoubleColumn, DateColumn, BytesColumn, BoolColumn
record.setColumn(4, new StringColumn(newValue));
// Return the updated record
return record;
The code above can also be collapsed into single lines. To decrypt multiple fields, for example the 5th and 7th fields at the same time:
record.setColumn(4, new StringColumn(AesUtil.decrypt(record.getColumn(4).asString())));
record.setColumn(6, new StringColumn(AesUtil.decrypt(record.getColumn(6).asString())));
return record;
{
  "job": {
    "setting": {
      "speed": { "channel": 1 },
      "errorLimit": { "record": 0 }
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root",
          "password": "123456",
          "column": ["id", "date", "en_name", "name", "utterance"],
          "splitPk": "id",
          "connection": [{
            "table": ["flower_name"],
            "jdbcUrl": ["jdbc:mysql://cdh1:3306/flink_test"]
          }]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "parameter": {
          "defaultFS": "hdfs://cdh1:8020",
          "column": [
            { "name": "id", "type": "BIGINT" },
            { "name": "date", "type": "STRING" },
            { "name": "en_name", "type": "STRING" },
            { "name": "name", "type": "STRING" },
            { "name": "utterance", "type": "STRING" }
          ],
          "fieldDelimiter": "|",
          "fileName": "flower_name",
          "fileType": "text",
          "path": "/yore",
          "writeMode": "append"
        }
      },
      "transformer": [{
        "name": "dx_groovy",
        "parameter": {
          "code": "record.setColumn(4, new StringColumn(AesUtil.decrypt(record.getColumn(4).asString())));\nreturn record;",
          "extraPackage": ["import yore.utils.AesUtil;"]
        }
      }]
    }]
  }
}
6.6 DataX Transformer Internals
6.6.1 From Entry Point to Execution
The datax.py launch script assembles the JVM command that starts com.alibaba.datax.core.Engine:
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
/**
 * Get the transformer parameters
 */
List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);

/**
 * Create the writer thread
 */
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
        String.format("%d-%d-%d-writer", jobId, taskGroupId, this.taskId));
// Setting the thread's contextClassLoader gives it a class loader separate from the main program's
this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
        PluginType.WRITER, this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME)));

/**
 * Create the reader thread
 */
readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
        String.format("%d-%d-%d-reader", jobId, taskGroupId, this.taskId));
/**
 * Setting the thread's contextClassLoader gives it a class loader separate from the main program's
 */
this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
        PluginType.READER, this.taskConfig.getString(CoreConstant.JOB_READER_NAME)));
6.6.2 Transformer Parameters
Before looking at TransformerExecution, let's first look at TransformerExecutionParas, the class that defines a Transformer's parameter format:

public class TransformerExecutionParas {
    /**
     * function parameters
     */
    private Integer columnIndex;
    private String[] paras;
    private Map<String, Object> tContext;
    private String code;
    private List<String> extraPackage;

    // Getters and setters omitted
}
- columnIndex: the index of the field in the record, starting from 0.
- paras: the list of parameters passed to the function.
- tContext: a map-typed context, corresponding to parameter.context under the transformer node in the job JSON; TransformerUtil copies parameter.context into this field.
- code: the code, currently used by dx_groovy to hold the Groovy snippet.
- extraPackage: currently used for the packages imported by the dx_groovy code snippet.
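Putting these fields together, a transformer node in the job JSON maps onto TransformerExecutionParas roughly as follows. This is an illustrative sketch: the dx_substr part matches the sample in section 6.2, while the context entry (someKey/someValue) is a hypothetical placeholder showing where parameter.context would go:

```json
"transformer": [{
  "name": "dx_substr",
  "parameter": {
    "columnIndex": 5,
    "paras": ["1", "3"],
    "context": { "someKey": "someValue" }
  }
}]
```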
Now let's look at TransformerExecution, focusing on the genFinalParas method:

public class TransformerExecution {
    // …
    private Object[] finalParas;

    public void genFinalParas() {
        /**
         * groovy does not take positional parameters
         */
        if (transformerInfo.getTransformer().getTransformerName().equals("dx_groovy")) {
            finalParas = new Object[2];
            finalParas[0] = transformerExecutionParas.getCode();
            finalParas[1] = transformerExecutionParas.getExtraPackage();
            return;
        }
        /**
         * For other functions, the order is columnIndex then paras;
         * if columnIndex is null, skip it
         */
        if (transformerExecutionParas.getColumnIndex() != null) {
            if (transformerExecutionParas.getParas() != null) {
                finalParas = new Object[transformerExecutionParas.getParas().length + 1];
                System.arraycopy(transformerExecutionParas.getParas(), 0, finalParas, 1,
                        transformerExecutionParas.getParas().length);
            } else {
                finalParas = new Object[1];
            }
            finalParas[0] = transformerExecutionParas.getColumnIndex();
        } else {
            if (transformerExecutionParas.getParas() != null) {
                finalParas = transformerExecutionParas.getParas();
            } else {
                finalParas = null;
            }
        }
    }
    // ……
}
From the source, we can see how DataX assembles the Transformer parameters:
- When the TransformerName is dx_groovy, exactly two parameters are allowed, and they must be code and extraPackage.
- When it is not dx_groovy:
  - If parameter.columnIndex is specified, parameter.paras is read next. If paras is present, an object array of length paras.length + 1 is created and paras is copied into it starting at index 1; otherwise an array of length 1 is created. Finally, columnIndex is placed at index 0.
  - If parameter.columnIndex is not specified, the array is built from parameter.paras alone; if paras is also absent, finalParas is null.

So when implementing your own Transformer UDF, the parameters you define and pass in must follow these rules.
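The rules above can be condensed into a small standalone sketch. Here buildFinalParas is a hypothetical helper that mirrors the non-groovy branch of genFinalParas, not DataX code itself:

```java
import java.util.Arrays;

public class FinalParasDemo {

    // Mirrors the non-groovy branch of TransformerExecution.genFinalParas:
    // finalParas[0] = columnIndex, remaining slots = paras in order.
    static Object[] buildFinalParas(Integer columnIndex, String[] paras) {
        Object[] finalParas;
        if (columnIndex != null) {
            if (paras != null) {
                finalParas = new Object[paras.length + 1];
                System.arraycopy(paras, 0, finalParas, 1, paras.length);
            } else {
                finalParas = new Object[1];
            }
            finalParas[0] = columnIndex;
        } else {
            finalParas = paras;   // may be null when paras is absent too
        }
        return finalParas;
    }

    public static void main(String[] args) {
        // Matches the dx_substr sample: columnIndex 5, paras ["1", "3"]
        System.out.println(Arrays.toString(buildFinalParas(5, new String[]{"1", "3"})));
        // prints [5, 1, 3], which is the argument order evaluate(record, paras...) receives
    }
}
```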
6.6.3 Transformer Loading
Next, TransformerRegistry.loadTransformerFromLocalStorage is called to lazily load, on demand, the third-party Transformers that are actually used.

/**
 * Lazily load third-party transformer functions, on demand
 */
TransformerRegistry.loadTransformerFromLocalStorage(functionNames);
public static void loadTransformerFromLocalStorage(List<String> transformers) {
    String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
    if (null == paths) {
        return;
    }
    for (final String each : paths) {
        try {
            if (transformers == null || transformers.contains(each)) {
                // `each` is the name of the directory we created, named after the UDF
                loadTransformer(each);
            }
        } catch (Exception e) {
            LOG.error(String.format("skip transformer(%s) loadTransformer has Exception(%s)", each, e.getMessage()), e);
        }
    }
}

public static void loadTransformer(String each) {
    String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
    Configuration transformerConfiguration;
    try {
        transformerConfiguration = loadTransFormerConfig(transformerPath);
    } catch (Exception e) {
        LOG.error(String.format("skip transformer(%s),load transformer.json error, path = %s, ", each, transformerPath), e);
        return;
    }

    String className = transformerConfiguration.getString("class");
    if (StringUtils.isEmpty(className)) {
        LOG.error(String.format("skip transformer(%s),class not config, path = %s, config = %s", each, transformerPath, transformerConfiguration.beautify()));
        return;
    }

    String funName = transformerConfiguration.getString("name");
    if (!each.equals(funName)) {
        LOG.warn(String.format("transformer(%s) name not match transformer.json config name[%s], will ignore json's name, path = %s, config = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
    }

    JarLoader jarLoader = new JarLoader(new String[]{transformerPath});
    try {
        Class<?> transformerClass = jarLoader.loadClass(className);
        Object transformer = transformerClass.newInstance();
        if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
            ((ComplexTransformer) transformer).setTransformerName(each);
            registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
        } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
            ((Transformer) transformer).setTransformerName(each);
            registTransformer((Transformer) transformer, jarLoader, false);
        } else {
            LOG.error(String.format("load Transformer class(%s) error, path = %s", className, transformerPath));
        }
    } catch (Exception e) {
        // skip broken functions
        LOG.error(String.format("skip transformer(%s),load Transformer class error, path = %s ", each, transformerPath), e);
    }
}

private static Configuration loadTransFormerConfig(String transformerPath) {
    // This tells us the transformer config file is local_storage/transformer/UDF_NAME/transformer.json
    return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
}
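The core of loadTransformer, namely instantiating the configured class by reflection and then dispatching on the interface it implements, can be sketched in isolation. The Transformer interface and UpperTransformer class below are toy stand-ins for illustration, not the DataX types, and Class.forName stands in for JarLoader.loadClass:

```java
public class ReflectiveRegistryDemo {

    // Toy stand-in for DataX's Transformer interface
    interface Transformer {
        String name();
    }

    // Toy stand-in for a third-party UDF class named in transformer.json's "class" field
    public static class UpperTransformer implements Transformer {
        public String name() { return "dx_upper"; }
    }

    // Mirrors loadTransformer: resolve the class name, instantiate it reflectively,
    // then check with isAssignableFrom before casting and registering.
    static Transformer load(String className) throws Exception {
        Class<?> clazz = Class.forName(className);
        Object instance = clazz.getDeclaredConstructor().newInstance();
        if (Transformer.class.isAssignableFrom(clazz)) {
            return (Transformer) instance;
        }
        throw new IllegalArgumentException(className + " is not a Transformer");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(load("ReflectiveRegistryDemo$UpperTransformer").name()); // prints dx_upper
    }
}
```

The real implementation additionally keeps the JarLoader alongside the instance so the UDF's classes stay resolvable at execution time.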
6.6.4 A More Elegant Way to Load Third-Party Transformer UDFs
{
  "class": "fully qualified Transformer class name",
  "name": "name of the UDF"
}
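For example, for the dx_yore_decrypt UDF from section 6.4, the layout and descriptor would plausibly look like the following. The directory and jar names are illustrative; what matters is that the directory name matches the UDF name, otherwise loadTransformer logs a warning and ignores the name in the json:

```
local_storage/transformer/dx_yore_decrypt/
├── transformer.json
└── datax-decrypt-1.0.0-SNAPSHOT.jar
```

```json
{
  "class": "com.alibaba.datax.core.transport.transformer.YoreDecryptTransformer",
  "name": "dx_yore_decrypt"
}
```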
7 Summary