最近公司开始做大数据项目,让我使用sqoop(1.6.4版本)导数据进行数据分析计算,然而当我们将所有的工作流都放到azkaban上时整个流程跑完需要花费13分钟,而其中导数据(增量)就占了4分钟左右,老板给我提供了使用 spark 导数据的思路,学习整理了一个多星期,终于实现了sqoop的主要功能。

这里我使用的是pyspark完成的所有操作。

条件:hdfs平台,pyspark,ubuntu系统

运行:我这里是在 /usr/bin 目录下(或者指定在此目录下 )运行的python文件,也可以使用系统自带的pyspark

1 ./spark-submit --jars "/home/engyne/spark/ojdbc7.jar" --master local /home/engyne/spark/SparkDataBase.py

其中--jars 是指定连接oracle的驱动,ojdbc7.jar对应的是oracle12版本,--master local /...指定的是运行的python文件

注意:我的代码没有解决中文问题,所以不管是注释还是代码中都不能出现中文,记得删除!!!

1、pyspark连接oracle,导数据到hive(后面的代码需要在此篇代码基础上进行,重复代码不再copy了)

1 importsys2 from pyspark.sql importHiveContext3 from pyspark importSparkConf, SparkContext, SQLContext4

5 conf = SparkConf().setAppName('inc_dd_openings')6 sc = SparkContext(conf=conf)7 sqlContext =HiveContext(sc)8

9 #以下是为了在console中打印出表内容

10 reload(sys)11 sys.setdefaultencoding("utf-8")12

13 get_df_url = "jdbc:oracle:thin:@//192.168.1.1:1521/ORCLPDB"

14 get_df_driver = "oracle.jdbc.driver.OracleDriver"

15 get_df_user = "xxx"

16 get_df_password = "xxx"

17

18 df = sqlContext.read.format("jdbc") \19 .option("url", get_df_url) \20 .option("driver", get_df_driver) \21 .option("dbtable", "tableName") \22 .option("user", get_df_user).option("password", get_df_password) \23 .load()24 #df.show() #可以查看到获取的表的内容,默认显示20行

25 sqlContext.sql("use databaseName") #databaseName指定使用hive中的数据库

26 #创建临时表

27 df.registerTempTable("tempTable")28 #创建表并写入数据

29 sqlContext.sql("create table %s as select * from tempTable")

2、pyspark在hive中创建动态分区表

1 #修改一下hive的默认设置以支持动态分区

2 sqlContext.sql("set hive.exec.dynamic.partition=true")3 sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")4 #设置hive支持创建分区文件的最大值

5 sqlContext.sql("SET hive.exec.max.dynamic.partitions=100000")6 sqlContext.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")

这里需要先手动创建分区表,我使用dataframe的dtypes属性获取到表结构,然后循环拼接表的每个字段在hive中所对应的类型

最后写入表数据的代码是:

1 sqlContext.sql("insert overwrite table STUDENT partition(AGE) SELECT ID,NAME,UPDATETIME,AGE FROM tempTable")

3、实现增量导入数据

我这里使用了MySql数据库,用来存储增量导入的信息,创建表(job)

DROP TABLE IF EXISTS`job`;CREATE TABLE`job` (

`id`int(10) NOT NULLAUTO_INCREMENT,

`database_name`varchar(50) DEFAULT NULL, --数据库名称

`table_name` varchar(100) DEFAULT NULL, --需要增量导入的表名

`partition_column_name` varchar(100) DEFAULT NULL, --分区的字段名(这里只考虑对一个字段分区,如果多个字段这里应该使用一对多表结构吧)

`partition_column_desc` varchar(50) DEFAULT NULL, --分区字段类型

`check_column` varchar(50) DEFAULT NULL, --根据(table_name中)此字段进行增量导入校验(我这里例子使用的是updatetime)

`last_value` varchar(255) DEFAULT NULL, --校验值

`status` int(1) NOT NULL, --是否使用(1表示此job激活)

PRIMARY KEY(`id`)

) INCREMENTAL=InnoDB AUTO_INCREMENT=81 DEFAULT CHARSET=utf8;

存储STUDENT表增量导入信息(这里是为了演示)

insert into `job`(`id`,`database_name`,`table_name`,`partition_column_name`,`partition_column_desc`,`check_column`,`last_value`,`status`)values (1,'test_datebase','STUDENT','AGE','string','UPDATETIME','2018-07-30',1)

python 连接MySql的方法我这里就直接怼代码了,具体详解大家就看菜鸟教程

Ubuntu需要安装MySQLdb(   sudo apt-get install python-mysqldb   )

importMySQLdb#insert update delete

defconMysqlDB_exec(sqlStr):

db= MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8')

cursor=db.cursor()try:

cursor.execute(sqlStr)

db.commit()

result=Trueexcept:print("---->MySqlError: execute error")

result=False

db.rollback()

db.closereturnresult#select

defconMysqlDB_fetchall(sqlStr):

db= MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8')

cursor=db.cursor()

results=[]try:

cursor.execute(sqlStr)

results=cursor.fetchall()except:print("---->MySqlError: unable to fecth data")

db.closereturn results

查询增量信息,使用spark进行导入

findJobSql = "SELECT * FROM job where status=1"

result=conMysqlDB_fetchall(findJobSql)

databaseName= val[1]

tableName= val[2]

partitionColumnName= val[3]

partitionColumnDesc= val[4]

checkColumn= val[5]

lastValue= val[6]

sqlContext.sql("use database")

df= sqlContext.read.format("jdbc") \

.option("url", "jdbc:oracle:thin:@//192.168.xxx.xxx:1521/ORCLPDB") \

.option("driver", "oracle.jdbc.driver.OracleDriver") \

.option("dbtable", "(select * from %s where to_char(%s, 'yyyy-MM-dd')>'%s')" % (tableName, checkColumn, lastValue)) \ #这里是关键,直接查询出新增的数据,这样后面的速度才能提升,否则要对整个表的dataframe进行操作,慢死了,千万不要相信dataframe的filter,where这些东西,4万多条数据要查3分钟!!!

.option("user", "xxx").option("password", "xxx") \

.load()

def  max(a, b):

if a>b:

return a

else:

return btry:#获取到新增字段的最大值!!!(这块也困了我好久)这里使用的是python的reduce函数,调用的max方法

nowLastValue =df.rdd.reduce(max)[checkColumn]

df.registerTempTable("temp")#写入内容

saveSql = "insert into table student select * from temp"sqlContext.sql(saveSql)#更新mysql表,使lastValue是表最新值

updataJobSql = "UPDATE job SET last_value='%s' WHERE table_name='%s'" %(nowLastValue, tableName)ifconMysqlDB_exec(updataJobSql):print("---->SUCCESS: incremental import success")exceptValueError:print("---->INFO: No new data added!")except:print("---->ERROR: other error")

4、解决导入数据换行符问题

有时候oracle中的数据中会存在换行符(" \n ")然而hive1.1.0中数据换行默认识别的也是\n,最坑的是还不能对它进行修改(目前我没有查出修改的方法,大家要是有办法欢迎在评论区讨论)

那我只能对数据进行处理了,以前使用sqoop的时候也有这个问题,所幸sqoop有解决换行符的语句,,,,巴拉巴拉,,,扯远了

解决换行符需要dataframe的map方法,然后使用lambda表达式进行replace,总结好就是下面的代码(第3行)

解释:这是个for循环里面加if else 判断,整个需要用  [ ]  包起来,没错这是个list ,如果不包就报错,lambda x 获取到的是表中一行行的数据,for循环对每一行进行遍历,然后对一行中每个字段

进行判断,是否是unicode或者str类型,(一般只有是这两个类型才存在换行符)如果是则进行replace处理,否则不做处理。

转化好之后这是个rdd类型的数据,需要转化为dataframe类型才能写入hive

1 #df自带获取schema的方法,不要学我去拼凑出来(

分类:

技术点:

spark

By © 2017 likecs 版权所有.

粤ICP备12038626号-2

Powered By WordPress . Theme by Luju

sqoop增量导入hive_使用pyspark模仿sqoop从oracle导数据到hive的主要功能(自动建表,分区导入,增量,解决数据换行符问题)...相关推荐

  1. sqoop动态分区导入mysql,sqoop 导入数据到hive分区表(外表,内表) 指定分区 指定数据库 指定表...

    sqoop 导入数据到hive 1.1.导入数据到hive分区表(内表),指定分区 创建hive分区表 –hive-database 指定数据库 –table 指定表 –hive-overwrite ...

  2. Hive建表以及导入数据

    目录 一:内部表和外部表 1:外部表 2:外部表 3:外部表和内部表区别 二:上传数据方式 一:内部表和外部表 1:外部表 内部表基础建表语句一:(默认指定文件类型为TextFile,HDFS路径为/ ...

  3. hive:建库建表、表分区、内部表外部表、数据导入导出

    hive建库建表与数据导入 建库 hive中有一个默认的库: 库名: default 库目录:hdfs://hdp20-01:9000/user/hive/warehouse 新建库: create  ...

  4. 12.1.2、Doris__基本使用、doris的基本命令、建表概念、语句、建表语法、建表方式(引擎存储规则)、导入数据的方式、支持的数据类型、rollup索引

    1.Doris使用(类似mysql的操作命令) 1)自带root用户 进入Doris集群:mysql -uroot -h node1 -P 9030 -p (回车输入密码) 创建用户(普通用户):cr ...

  5. Shell脚本导出导入MySQL建表语句

    Shell脚本导出导入MySQL建表语句 一.导出sql语句 1.需求: 2.shell脚本如下: 二.导入sql语句 1.需求 2.shell实现1 3.shell实现2 一.导出sql语句 1.需 ...

  6. hive向mysql导数据_Mysql Hive 导入导出数据

    ---王燕行转列sql select split(concat_ws(',',collect_set(cast(smzq as string))),',')[1] ,split(concat_ws(' ...

  7. 5.hive建库建表与数据导入

    建库 hive中有一个默认的库: 库名: default 库目录:hdfs://hdp20-01:9000/user/hive/warehouse 新建库: create database db_or ...

  8. sqoop增量导入hive_Sqoop 增量导MySQL数据 至Hive

    通过Sqoop 增量导数据到Hive, 命令如下: ./sqoop-job --meta-connect jdbc:hsqldb:hsql://127.0.0.1:16000/sqoop --crea ...

  9. Ambari2.7.4+HDP3.1.4下sqoop增量导入只支持append模式,mysql直接进入hive的lastmodified的不支持。下面是增量的命令。

    1. 创建mysql表,并创建初始化数据 grant all privileges on *.* to 'root'@'%' identified by 'xxxxxxxxx' with grant ...

最新文章

  1. Halcon算子翻译——dev_set_line_width
  2. TIOBE 9 月榜单:C#上涨1.18,Java 同比下滑3.18
  3. Pytorch环境安装【Python3.7+Anaconda3+CUDA10.1】
  4. AndroidStudio_安卓原生开发_自定义服务器Token验证_MD5加密方法---Android原生开发工作笔记156
  5. 将Visual Studio打造成为Node.js IDE
  6. 职高计算机基础知识大全,职高计算机基本教学大纲.doc
  7. 第26次ccf认证第二题:寻宝!大冒险!
  8. Sketch2AE插件(Sketch文件导入AE)最新破解版
  9. 启动器Android标准,【转】各款安卓启动器评测(之我见)
  10. 黑群晖二合一已损毁_手动修复黑群晖已损毁磁盘空间
  11. 如何制作U盘启动盘并且安装系统(保姆级教学)
  12. 爬虫案例——模拟登录QQ空间
  13. 中山大学新华学院计算机,中山大学新华学院信息科学学院电子信息科学与技术、计算机科学与技术、软件...
  14. 解决Vmware 16安装Windows7后安装VMware tools选项为灰色及无法成功安装问题
  15. 手动清除2345流氓主页小记录以及对过去的一些回忆
  16. 计算机无线网卡,电脑如何无线上网 电脑无线网卡买什么好
  17. Python日记——柿子要捡软的捏,记第一只小爬虫
  18. 利用matlab从TXT中读数据1
  19. Python量化学习笔记02——量化投资——以Python为工具 Part01-C02
  20. 校招面试真题 | 你的期望薪资是多少?为什么

热门文章

  1. Stave——让Fiddler拥有路径映射功能。
  2. 关于华为蓝牙耳机连不上计算机的问题
  3. cad怎么转换成pdf格式?分享三种途径
  4. DFRobotGravity: 模拟甲烷气体传感器 (MQ4)工作原理
  5. 传智播客JAVA培训20100524SPRING SECURITY
  6. 这家公司的市值被严重低估,不信半年之后我们再看
  7. 【Linux逻辑卷管理】之pvcreate、pvdisplay和pvremove
  8. 快递小哥5年赚260万:哪有什么天生​牛逼,不过是笨笨地熬
  9. 系统IIS安装卡住不动了卡死解决方法
  10. Idea一键清除所有断点