python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...
说明:spark版本:2.2.0
hive版本:1.2.1
需求: 有本地csv格式的一个文件,格式为${当天日期}visit.txt,例如20180707visit.txt,现在需要将其通过spark-sql程序实现将该文件读取并以parquet的格式通过外部表的形式保存到hive中,最终要实现通过传参的形式,将该日期区间内的csv文件批量加载进去,方式有两种:
1、之传入一个参数,说明只加载一天的数据进去
2、传入两个参数,批量加载这两个日期区间的每一天的数据
最终打成jar包,进行运行
步骤如下:
1、初始化配置,先创建sparkSession(spark2.0版本开始将sqlContext、hiveContext同意整合为sparkSession)
//初始化配置
val spark = new sql.SparkSession
.Builder()
.enableHiveSupport() //操作hive这一步千万不能少
.appName("project_1")
.master("local[2]")
.getOrCreate()
2、先将文件读进来,并转换为DF
val data = spark.read.option("inferSchema", "true").option("header", "false") //这里设置是否处理头信息,false代表不处理,也就是说文件的第一行也会被加载进来,如果设置为true,那么加载进来的数据中不包含第一行,第一行被当作了头信息,也就是表中的字段名处理了
.csv(s"file:///home/spark/file/project/${i}visit.txt") //这里设置读取的文件,${i}是我引用的一个变量,如果要在双引号之间引用变量的话,括号前面的那个s不能少
.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time") //将读进来的数据转换为DF,并为每个字段设置字段名
3、将转换后的DF注册为一张临时表
data.createTempView(s"table_${i}")
4、通过spark-sql创建hive外部表,这里有坑
spark.sql(
s"""
|create external table if not exists ${i}visit
|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,
|region string, screen string, stay_time int) stored as parquet
|location 'hdfs://master:9000/project_dest/${i}'
""".stripMargin)
这里的见表语句需要特别注意,如果写成如下的方式是错误的:
spark.sql(
s"""
|create external table if not exists ${i}visit
|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,
|region string, screen string, stay_time int) row format delimited fields terminated by '\t' stored as parquet
|location /project_dest/${i}'
""".stripMargin)
(1)对于row format delimited fields terminated by '\t'这语句只支持存储文件格式为textFile,对于parquet文件格式不支持
(2)对于location这里,一定要写hdfs的全路径,如果向上面这样写,系统不认识,切记
5、通过spark-sql执行insert语句,将数据插入到hive表中
spark.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)
至此,即完成了将本地数据以parquet的形式加载至hive表中了,接下来既可以到hive表中进行查看数据是否成功载入
贴一下完整代码:
package _sql.project_1
import org.apache.spark.sql
/**
* Author Mr. Guo
* Create 2018/9/4 - 9:04
* ┌───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┐
* │Esc│ │ F1│ F2│ F3│ F4│ │ F5│ F6│ F7│ F8│ │ F9│F10│F11│F12│ │P/S│S L│P/B│ ┌┐ ┌┐ ┌┐
* └───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┘ └┘ └┘ └┘
* ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───────┐ ┌───┬───┬───┐ ┌───┬───┬───┬───┐
* │~ `│! 1│@ 2│# 3│$ 4│% 5│^ 6│& 7│* 8│( 9│) 0│_ -│+ =│ BacSp │ │Ins│Hom│PUp│ │N L│ / │ * │ - │
* ├───┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─────┤ ├───┼───┼───┤ ├───┼───┼───┼───┤
* │ Tab │ Q │ W │ E │ R │ T │ Y │ U │ I │ O │ P │{ [│} ]│ | \ │ │Del│End│PDn│ │ 7 │ 8 │ 9 │ │
* ├─────┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴─────┤ └───┴───┴───┘ ├───┼───┼───┤ + │
* │ Caps │ A │ S │ D │ F │ G │ H │ J │ K │ L │: ;│" '│ Enter │ │ 4 │ 5 │ 6 │ │
* ├──────┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴────────┤ ┌───┐ ├───┼───┼───┼───┤
* │ Shift │ Z │ X │ C │ V │ B │ N │ M │< ,│> .│? /│ Shift │ │ ↑ │ │ 1 │ 2 │ 3 │ │
* ├─────┬──┴─┬─┴──┬┴───┴───┴───┴───┴───┴──┬┴───┼───┴┬────┬────┤ ┌───┼───┼───┐ ├───┴───┼───┤ E││
* │ Ctrl│ │Alt │ Space │ Alt│ │ │Ctrl│ │ ← │ ↓ │ → │ │ 0 │ . │←─┘│
* └─────┴────┴────┴───────────────────────┴────┴────┴────┴────┘ └───┴───┴───┘ └───────┴───┴───┘
**/
object Spark_Sql_Load_Data_To_Hive {
//初始化配置
val spark = new sql.SparkSession
.Builder()
.enableHiveSupport()
.appName("project_1")
.master("local[2]")
.getOrCreate()
//设置日志的级别
spark.sparkContext.setLogLevel("WARN")
def main(args: Array[String]): Unit = {
try {
if (args.length != 1) {
data_load(args(0).toInt)
} else if (args.length != 2) {
for (i
data_load(i)
}
} else {
System.err.println("Usage: or ")
System.exit(1)
}
}catch {
case ex:Exception => println("Exception")
}finally{
spark.stop()
}
}
def data_load(i:Int): Unit = {
println(s"*******data_${i}********")
val data = spark.read.option("inferSchema", "true").option("header", "false")
.csv(s"file:///home/spark/file/project/${i}visit.txt")
.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time")
data.createTempView(s"table_${i}")
spark.sql("use project_1".stripMargin)
spark.sql(
s"""
|create external table if not exists ${i}visit
|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,
|region string, screen string, stay_time int) stored as parquet
|location 'hdfs://master:9000/project_dest/${i}'
""".stripMargin)
spark
.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)
}
}
6、打成jar包(我的IDEA版本是2017.3版本)
如果没有上面这一栏,点击View,然后勾选Toolbar即可
点击ok
此时这里会成成这么一个文件,是编译之后的class文件
到这个目录下会找到这么一个jar包
找到该文件夹,上传到服务器,cd到该目录下运行命令:
spark-submit --class spark._sql.project_1.Conn_hive --master spark://master:7077 --executor-memory 2g --num-executors 3 /spark_maven_project.jar 20180901 20180910
python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...相关推荐
- python读取oracle数据到hvie parquet_创建Hive表来从parquet / avro模式读取parquet文件
We are looking for a solution in order to create an external hive table to read data from parquet fi ...
- oracle数据导入到python,Python导入oracle数据的方法 -电脑资料
作者:Sephiroth 字体:[增加 减小] 类型:转载 这篇文章主要介绍了Python导入oracle数据的方法,涉及Python读取csv文件信息再插入到Oracle数据库的相关技巧,具有一定参 ...
- oracle数据导入到python,Python导入oracle数据的方法
Python导入oracle数据的方法 本文实例讲述了Python导入oracle数据的方法.分享给大家供大家参考.具体如下: import cx_Oracle dns_tns=cx_Oracle.m ...
- JS读取本地CSV文件数据
JS读取本地CSV文件数据 文件中的部分数据如图 需求是需要提取出文件的数据 使用到的模块是 Papa Parse 1. 依赖安装 yarn add papaparse papaparse的基本使用可 ...
- python中csv文件通过什么表示字符_python_写入csv文件时候无法进行原样写入(写入字符串中出现逗号,时候,csv文件自动分成两个单元格)...
问题描述: 写入csv文件时候无法进行原样写入(写入字符串中出现逗号","时候,csv文件自动分成两个单元格) with open("test.csv",&qu ...
- Python读取Oracle数据乱码问题解决
[问题描述] 在使用Python读取Oracle中数据的时候,遇到了中文乱码的问题,简单举例如下: import cx_Oracle import pandas as pdconn = cx_Orac ...
- python读取数据库数据、并保存为docx_Python从数据库读取大量数据批量写入文件的方法...
Python从数据库读取大量数据批量写入文件的方法 使用机器学习训练数据时,如果数据量较大可能我们不能够一次性将数据加载进内存,这时我们需要将数据进行预处理,分批次加载进内存. 下面是代码作用是将数据 ...
- flex java oracle_Flex使用Blazeds与Java交互及自定义对象转换详解-DATAGRID读取ORACLE数据...
一.建立Flex与Java交互的工程. 本文中讲到的交互是利用Blazeds的,因为这个是免费的,呵呵,我是穷人. 首先就是去下载Blazeds的压缩包,这个可以从官网或者CSDN.JavaEye上下 ...
- java读取字节流设置字节数组长度_java读取流数据时,字节缓存数组,第一次读取时,是否读满,才进行下次读取??...
使用缓存字节数组读取java字节流时,第一次读取是,读满缓存字节数组大小,才进行下次读取,还是随机读一个小于数组大小的值,再进行下次读取??? 读取本地文件时,首次读取读满整个字节数组,在进行下次读取 ...
最新文章
- 10行Python代码实现Web自动化管控
- AI总监王长虎被曝离职,字节跳动AI Lab 再失一将!
- Visual Studio 2017为Android APK包签名
- win10,pip更新后,Spyder打不开
- [鸟哥linux视频教程整理]04_02_Linux 权限及权限管理
- scala case class 继承_数字硬件系统设计之一:Scala快速入门(2)
- C++递归斐波那契数列
- 中国多媒体大会(ChinaMM 2020) 征文通知
- OpenCV vs Dlib 人脸检测比较分析
- 鸟哥的私房菜-基础篇学习-文件与目录管理-2-1
- mysql的导入导出命令_mysql导入导出命令
- Qualomm openwrt SDK编译
- 哈希函数-SHA1和SHA256算法
- linux编译安卓源码,Ubuntu下编译Android源码
- 计算机三级要英语词汇,大学英语三级常考词汇
- 区块链技术指2.3 基于区块链的电子货币
- java计算水仙花数_Java 求水仙花数
- 计算机桌面黑屏有鼠标,win7系统启动黑屏只有显示鼠标指针怎么办(图文)
- iOS 设备的屏幕尺寸、分辨率及其屏幕边长比例详细情况
- MapReduce论文阅读记录
热门文章
- Windows Server Core管理之WinRM
- ASP调用web services
- mysql中blog数据_zp blog
- java 如何知道对象是否被修改过_Java 并发编程:AQS 的原子性如何保证
- 时间排序_你懂使用C ++ STL在线性时间内查找未排序数组的中位数吗
- 高职院校计算机基础课程要求,浅谈高职院校计算机的应用基础课程的改革.doc...
- mysql搜索_mysql 几种搜索引擎的比较
- 哈弗f5i潮配置参数_10万元起售的潮人新玩法,哈弗F5国潮版购车手册
- c语言在dos下执行bat文件,应用dos批处理文件经常用到的DOS常用命令
- 链接ftp命令行_windows下最轻便的FTP/SCP文件管理器