Production SparkSQL: Reading and Writing Local External Data Sources, and Troubleshooting Errors
https://spark-packages.org/ hosts many third-party data source packages; once the package is loaded into Spark, the data source can be used directly.
CSV support is built into Spark 2.0 and later; before 2.0 it was a third-party data source.
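As an illustration of pulling in a third-party source (my sketch, not part of the original post), a pre-2.0 spark-shell could load the well-known Databricks spark-csv package at launch; the coordinates and version below are examples and should be checked against spark-packages.org:

# Launch spark-shell with the third-party spark-csv package (version is an example)
$ ./spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

// In Spark 1.x the entry point is sqlContext, and format takes the package's full source name
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///tmp/people.csv")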
I. Reading local external data sources
1. Reading a JSON file directly
[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar

scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show
This fails at runtime:
Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10]
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)
  ... 32 more
Looking at the source of the load method:
/**
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
def load(path: String): DataFrame = {
  option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}

---------------------------------------------------------

/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)

      case (ds: ReadSupport, None) =>
        ds.createReader(options)

      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")

      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case _ => null // fall back to v1
    }

    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    loadV1Source(paths: _*)
  }
}

private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}

------------------------------------------------------

private var source: String = sparkSession.sessionState.conf.defaultDataSourceName

-------------------------------------------------------

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

--------------------------------------------------------

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")
As the source shows, when no format is specified, load reads Parquet files by default.
scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

scala> users.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+
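As a side note (my own sketch, not from the original post), the default can also be changed through the same spark.sql.sources.default setting that the source code above reads, after which an unqualified load() expects the new format:

// Hypothetical illustration: make JSON the session's default data source
scala> spark.conf.set("spark.sql.sources.default", "json")
scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show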
Reading a file in any other format requires specifying it with format, as follows:
// In a Windows IDEA environment
val df1 = spark.read.format("json")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .load("hdfs://192.168.137.141:9000/data/people.json")
df1.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")必须带上,不然报错
Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
2. Reading a CSV file
// The source file:
[hadoop@hadoop001 ~]$ hadoop fs -text /data/people.csv
name;age;job
Jorge;30;Developer
Bob;32;Developer
// In a Windows IDEA environment
val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true") // use the first line of each file as the header
  .option("inferSchema", "true")
  .load("hdfs://192.168.137.141:9000/data/people.csv")
df2.show()
df2.printSchema()

// Output:
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)

// Without option("sep", ";"):
+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

// Without option("header", "true"):
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
You can also supply a custom schema when reading a CSV file:
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val peopleschema = StructType(Array(
  StructField("hlwname", StringType, true),
  StructField("hlwage", IntegerType, true),
  StructField("hlwjob", StringType, true)))

val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")
  .schema(peopleschema)
  .load("hdfs://192.168.137.141:9000/data/people.csv")

// Print to verify
df2.show()
df2.printSchema()

Output:
+-------+------+---------+
|hlwname|hlwage|   hlwjob|
+-------+------+---------+
|  Jorge|    30|Developer|
|    Bob|    32|Developer|
+-------+------+---------+

root
 |-- hlwname: string (nullable = true)
 |-- hlwage: integer (nullable = true)
 |-- hlwjob: string (nullable = true)
II. Writing the data out in a different format
// Write the users.parquet data read above back out as JSON
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")

[hadoop@hadoop000 ~]$ cd /home/hadoop/tmp/parquet2json
[hadoop@hadoop000 parquet2json]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 56 Sep 24 10:15 part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
-rw-r--r--. 1 hadoop hadoop  0 Sep 24 10:15 _SUCCESS
[hadoop@hadoop000 parquet2json]$ cat part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
// Write the people.json data read above back out as CSV
df1.write.format("csv")
  .mode("overwrite")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .save("hdfs://192.168.137.141:9000/data/formatconverttest/")

[hadoop@hadoop001 ~]$ hadoop fs -text /data/formatconverttest/part-00000-6fd65eff-d0d3-43e5-9549-2b11bc3ca9de-c000.csv
,Michael
30,Andy
19,Justin

// Note: without .option("header","true") the written CSV loses the age,name header row
// Without .option("sep",";") the separator defaults to a comma
The point of this exercise is learning format conversion. In production, incoming data mostly arrives as row-oriented files such as text or JSON, and it is generally converted to columnar formats such as ORC or Parquet. With compression added, file sizes can shrink to roughly 10% of the original, which greatly reduces IO and the volume of data to process and improves performance.
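A minimal sketch of that conversion (my example, not from the original post); the paths are hypothetical, and the compression codecs shown are standard options in Spark 2.x:

// Convert row-oriented JSON to compressed columnar files (paths are hypothetical)
val raw = spark.read.format("json").load("hdfs://192.168.137.141:9000/data/people.json")

// Parquet with snappy compression (snappy is Spark's default Parquet codec)
raw.write.format("parquet").option("compression", "snappy").save("hdfs://192.168.137.141:9000/data/people_parquet/")

// Or ORC with zlib compression
raw.write.format("orc").option("compression", "zlib").save("hdfs://192.168.137.141:9000/data/people_orc/")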
If you now run the same save again with the same path, it fails:
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")org.apache.spark.sql.AnalysisException: path file:/home/hadoop/tmp/parquet2json already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:109) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104).........................................................
You can avoid this by setting a save mode; the default is errorIfExists.
scala> users.select("name","favorite_color").write.format("json").mode("overwrite").save("file:///home/hadoop/tmp/parquet2json/")
Author: 若泽数据—白面葫芦娃92
Original post: https://www.jianshu.com/p/6fde69ea56bc