目录

实时ETL模块开发准备

一、编写配置文件

二、创建包结构

三、编写工具类加载配置文件


实时ETL模块开发准备

一、编写配置文件

  • 公共模块resources目录创建配置文件:config.properties
# CDH-6.2.1
bigdata.host=node2
# HDFS
dfs.uri=hdfs://node2:8020
# Local FS
local.fs.uri=file://
# Kafka
kafka.broker.host=node2
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm
# ZooKeeper
zookeeper.host=node2.
zookeeper.port=2181
# Kudu
kudu.rpc.host=node2
kudu.rpc.port=7051
kudu.http.host=node2
kudu.http.port=8051
# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2:8123/logistics?characterEncoding=utf-8&useSSL=false
clickhouse.user=root
clickhouse.password=123456
# ElasticSearch
elasticsearch.host=node2
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200
# Azkaban
app.first.runnable=true
# Oracle JDBC
db.oracle.url="jdbc:oracle:thin:@//192.168.88.10:1521/ORCL"
db.oracle.user=root
db.oracle.password=123456
# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456
## Data path of ETL program output ##
# Run in the yarn mode in Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars
# Run in the local mode in Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars
# Running in the local Mode in Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars

二、​​​​​​​创建包结构

本次项目采用scala编程语言,因此创建scala目录

包名

说明

cn.it.logistics.etl.realtime

实时ETL程序所在包

cn.it.logistics.etl.parser

Canal和Ogg数据解析类所在包

三、编写工具类加载配置文件

实现步骤:

  • 公共模块scala目录下common包下创建 Configure  单例对象
  • 编写代码
    • 使用 ResourceBundle.getBundle 获取配置对象
    • 调用config.getString方法加载 config.properties 配置
    • 添加一个 main 方法测试,工具类是否能够正确读取出配置项

参考代码:

package cn.it.logistics.commonimport java.util.{Locale, ResourceBundle}/*** 读取配置文件的工具类*/
class Configuration {/*** 定义配置文件操作的对象*/private val resourceBundle: ResourceBundle = ResourceBundle.getBundle("config", new Locale("zh", "CN"))private val sep = ":"// CDH-6.2.1val bigdataHost: String = resourceBundle.getString("bigdata.host")// HDFSval dfsUri: String = resourceBundle.getString("dfs.uri")// Local FSval localFsUri: String = resourceBundle.getString("local.fs.uri")// Kafkaval kafkaBrokerHost: String = resourceBundle.getString("kafka.broker.host")val kafkaBrokerPort: Int = Integer.valueOf(resourceBundle.getString("kafka.broker.port"))val kafkaInitTopic: String = resourceBundle.getString("kafka.init.topic")val kafkaLogisticsTopic: String = resourceBundle.getString("kafka.logistics.topic")val kafkaCrmTopic: String = resourceBundle.getString("kafka.crm.topic")val kafkaAddress = kafkaBrokerHost+sep+kafkaBrokerPort// Sparkval LOG_OFF = "OFF"val LOG_DEBUG = "DEBUG"val LOG_INFO = "INFO"val LOCAL_HADOOP_HOME = "E:\\softs\\hadoop-3.0.0"val SPARK_KAFKA_FORMAT = "kafka"val SPARK_KUDU_FORMAT = "kudu"val SPARK_ES_FORMAT = "es"val SPARK_CLICKHOUSE_FORMAT = "clickhouse"// ZooKeeperval zookeeperHost: String = resourceBundle.getString("zookeeper.host")val zookeeperPort: Int = Integer.valueOf(resourceBundle.getString("zookeeper.port"))// Kuduval kuduRpcHost: String = resourceBundle.getString("kudu.rpc.host")val kuduRpcPort: Int = Integer.valueOf(resourceBundle.getString("kudu.rpc.port"))val kuduHttpHost: String = resourceBundle.getString("kudu.http.host")val kuduHttpPort: Int = Integer.valueOf(resourceBundle.getString("kudu.http.port"))val kuduRpcAddress = kuduRpcHost+sep+kuduRpcPort// ClickHouseval clickhouseDriver: String = resourceBundle.getString("clickhouse.driver")val clickhouseUrl: String = resourceBundle.getString("clickhouse.url")val clickhouseUser: String = resourceBundle.getString("clickhouse.user")val clickhousePassword: String = resourceBundle.getString("clickhouse.password")// ElasticSearchval elasticsearchHost: String = resourceBundle.getString("elasticsearch.host")val elasticsearchRpcPort: Int = Integer.valueOf(resourceBundle.getString("elasticsearch.rpc.port"))val elasticsearchHttpPort: Int = Integer.valueOf(resourceBundle.getString("elasticsearch.http.port"))val elasticsearchAddress = elasticsearchHost+sep+elasticsearchHttpPort// Azkabanval isFirstRunnable = java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))// ## Data path of ETL program output ##// # Run in the yarn mode in Linuxval sparkAppDfsCheckpointDir = resourceBundle.getString("spark.app.dfs.checkpoint.dir")// /apps/logistics/dat-hdfs/spark-checkpointval sparkAppDfsDataDir = resourceBundle.getString("spark.app.dfs.data.dir")// /apps/logistics/dat-hdfs/warehouseval sparkAppDfsJarsDir = resourceBundle.getString("spark.app.dfs.jars.dir")// /apps/logistics/jars// # Run in the local mode in Linuxval sparkAppLocalCheckpointDir = resourceBundle.getString("spark.app.local.checkpoint.dir")// /apps/logistics/dat-local/spark-checkpointval sparkAppLocalDataDir = resourceBundle.getString("spark.app.local.data.dir")// /apps/logistics/dat-local/warehouseval sparkAppLocalJarsDir = resourceBundle.getString("spark.app.local.jars.dir")// /apps/logistics/jars// # Running in the local Mode in Windowsval sparkAppWinCheckpointDir = resourceBundle.getString("spark.app.win.checkpoint.dir")// D://apps/logistics/dat-local/spark-checkpointval sparkAppWinDataDir = resourceBundle.getString("spark.app.win.data.dir")// D://apps/logistics/dat-local/warehouseval sparkAppWinJarsDir = resourceBundle.getString("spark.app.win.jars.dir")// D://apps/logistics/jarsval dbOracleUrl = resourceBundle.getString("db.oracle.url")val dbOracleUser = resourceBundle.getString("db.oracle.user")val dbOraclePassword = resourceBundle.getString("db.oracle.password")val dbMySQLDriver = resourceBundle.getString("db.mysql.driver")val dbMySQLUrl = resourceBundle.getString("db.mysql.url")val dbMySQLUser = resourceBundle.getString("db.mysql.user")val dbMySQLPassword = resourceBundle.getString("db.mysql.password")
}
object Configuration extends Configuration {def main(args: Array[String]): Unit = {println(Configuration.dbOracleUrl)println(Configuration.dbMySQLDriver)println(Configuration.dbMySQLUrl)println(Configuration.dbMySQLPassword)}
}