https://spark-packages.org/ hosts many third-party data source packages; once Spark loads such a package, the data source can be used directly.
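For example, before Avro support was built into Spark (2.4+), the Databricks spark-avro package could be pulled in with --packages. A minimal sketch under that assumption; the Avro file path here is made up for illustration:

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --packages com.databricks:spark-avro_2.11:4.0.0

scala> spark.read.format("com.databricks.spark.avro").load("file:///home/hadoop/data/users.avro").show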

The CSV format has been built in since Spark 2.0; before 2.0 it was a third-party data source.
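On Spark 1.x that meant pulling in the spark-csv package and reading through sqlContext. A rough sketch of the old usage (package coordinates and path shown only as an example):

// Spark 1.x: ./spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://192.168.137.141:9000/data/people.csv")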

I. Reading local external data sources

1. Reading a JSON file directly

[hadoop@hadoop000 bin]$ ./spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27.jar

scala> spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show

This fails at runtime with:

Caused by: java.lang.RuntimeException: file:/home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 57, 125, 10]
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:445)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:519)
  ... 32 more

Looking at the source of the load method:

/**
 * Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
def load(path: String): DataFrame = {
  option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}
---------------------------------------------------------
/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)

      case (ds: ReadSupport, None) =>
        ds.createReader(options)

      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")

      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader

      case _ => null // fall back to v1
    }
    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    loadV1Source(paths: _*)
  }
}

private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
------------------------------------------------------
private var source: String = sparkSession.sessionState.conf.defaultDataSourceName
-------------------------------------------------------
def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
--------------------------------------------------------
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")

The source shows that if no format is specified, load reads Parquet files by default.
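Since that default comes from spark.sql.sources.default, it can also be changed at runtime instead of calling format() on every read. A minimal sketch (not used in the rest of this walkthrough, which keeps the parquet default):

// switch the session's default data source from parquet to json
spark.conf.set("spark.sql.sources.default", "json")

// now load() without format() is interpreted as a JSON read
spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json").show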

scala> val users = spark.read.load("file:///home/hadoop/app/spark-2.3.1-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")

scala> users.show()
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

To read files in any other format, the format must be specified via format(), as shown below:

// in the Windows IDEA environment
val df1 = spark.read.format("json")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .load("hdfs://192.168.137.141:9000/data/people.json")
df1.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")必须带上,不然报错

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
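As a side note, spark.read also provides a json() shorthand that is equivalent to format("json").load(). A minimal sketch, assuming the same HDFS path as above:

val df1 = spark.read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .json("hdfs://192.168.137.141:9000/data/people.json")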

2. Reading a CSV file

// source file content:
[hadoop@hadoop001 ~]$ hadoop fs -text /data/people.csv
name;age;job
Jorge;30;Developer
Bob;32;Developer

// in the Windows IDEA environment
val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")       // use first line of all files as header
  .option("inferSchema", "true")
  .load("hdfs://192.168.137.141:9000/data/people.csv")
df2.show()
df2.printSchema()

// output:
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
-----------------------------------------------------------
// without option("sep", ";"):
+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

// without option("header", "true"):
+-----+---+---------+
|  _c0|_c1|      _c2|
+-----+---+---------+
| name|age|      job|
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+
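A few other CSV reader options that are often useful in practice. The option names are standard Spark CSV options; the values and the df3 name are just for illustration:

val df3 = spark.read.format("csv")
  .option("sep", ";")
  .option("header", "true")
  .option("nullValue", "NA")         // treat the literal string "NA" as null
  .option("mode", "DROPMALFORMED")   // silently drop rows that do not fit the schema
  .load("hdfs://192.168.137.141:9000/data/people.csv")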

When reading a CSV file you can also supply a custom schema:

import org.apache.spark.sql.types._   // StructType, StructField, StringType, IntegerType

val peopleschema = StructType(Array(
  StructField("hlwname", StringType, true),
  StructField("hlwage", IntegerType, true),
  StructField("hlwjob", StringType, true)))

val df2 = spark.read.format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("sep", ";")
  .option("header", "true")
  .schema(peopleschema)
  .load("hdfs://192.168.137.141:9000/data/people.csv")

// print to verify
df2.show()
df2.printSchema()

// output:
+-------+------+---------+
|hlwname|hlwage|   hlwjob|
+-------+------+---------+
|  Jorge|    30|Developer|
|    Bob|    32|Developer|
+-------+------+---------+

root
 |-- hlwname: string (nullable = true)
 |-- hlwage: integer (nullable = true)
 |-- hlwjob: string (nullable = true)
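Since Spark 2.3 the schema can also be given as a DDL string, which avoids building the StructType by hand. A minimal sketch with the same column names:

val df2 = spark.read.format("csv")
  .option("sep", ";")
  .option("header", "true")
  .schema("hlwname STRING, hlwage INT, hlwjob STRING")   // DDL-style schema string
  .load("hdfs://192.168.137.141:9000/data/people.csv")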

II. Writing the loaded data out in another format

// write the users.parquet loaded above back out as JSON
scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")

[hadoop@hadoop000 ~]$ cd /home/hadoop/tmp/parquet2json
[hadoop@hadoop000 parquet2json]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 56 Sep 24 10:15 part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
-rw-r--r--. 1 hadoop hadoop  0 Sep 24 10:15 _SUCCESS
[hadoop@hadoop000 parquet2json]$ cat part-00000-dfbd9ba5-598f-4e0c-8e81-df85120333db-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
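If a single output file is preferred (the save above writes one part file per partition), the DataFrame can be coalesced to one partition before writing. A minimal sketch; the parquet2json_single path is hypothetical:

users.select("name", "favorite_color")
  .coalesce(1)                  // one partition => one part file
  .write.format("json")
  .save("file:///home/hadoop/tmp/parquet2json_single/")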

// write the people.json loaded above back out as CSV
df1.write.format("csv")
  .mode("overwrite")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .save("hdfs://192.168.137.141:9000/data/formatconverttest/")
------------------------------------------
[hadoop@hadoop001 ~]$ hadoop fs -text /data/formatconverttest/part-00000-6fd65eff-d0d3-43e5-9549-2b11bc3ca9de-c000.csv
,Michael
30,Andy
19,Justin

// without .option("header","true"), the written CSV loses the header row with the age/name column names
// without .option("sep",";"), the default separator is a comma
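To keep the header and use a semicolon separator on output, the same options can be passed on the write side as well. A minimal sketch reusing the path above:

df1.write.format("csv")
  .mode("overwrite")
  .option("header", "true")   // write the column names as the first line
  .option("sep", ";")         // use ';' instead of the default ','
  .save("hdfs://192.168.137.141:9000/data/formatconverttest/")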

The point of this exercise is to learn format conversion. In production, incoming data is mostly row-oriented files such as text or JSON, and it is usually converted to columnar formats such as ORC or Parquet. Combined with compression, this can shrink files to roughly 10% of their original size, greatly reducing I/O and the volume of data processed, and improving performance.
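As an illustration of that conversion, a hedged sketch of writing the same DataFrame out as compressed Parquet and ORC (the output paths are made up for this example):

// Parquet with Snappy compression (Snappy is Spark's default codec for Parquet)
df1.write.format("parquet")
  .option("compression", "snappy")
  .mode("overwrite")
  .save("hdfs://192.168.137.141:9000/data/people_parquet/")

// ORC with zlib compression
df1.write.format("orc")
  .option("compression", "zlib")
  .mode("overwrite")
  .save("hdfs://192.168.137.141:9000/data/people_orc/")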
Going back to the users example above: if save is run again with the same path, it fails:

scala> users.select("name","favorite_color").write.format("json").save("file:///home/hadoop/tmp/parquet2json/")org.apache.spark.sql.AnalysisException: path file:/home/hadoop/tmp/parquet2json already exists.;  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:109)  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104).........................................................

This can be solved by setting a save mode.

The default is errorIfExists (the full set of modes is sketched after the next example).

scala> users.select("name","favorite_color").write.format("json").mode("overwrite").save("file:///home/hadoop/tmp/parquet2json/")
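Besides overwrite, DataFrameWriter accepts the following modes. A quick sketch; mode() takes either the SaveMode enum or the string name shown in the comments:

import org.apache.spark.sql.SaveMode

users.select("name", "favorite_color").write.format("json")
  .mode(SaveMode.Overwrite)      // "overwrite": replace whatever is at the target path
  .save("file:///home/hadoop/tmp/parquet2json/")

// SaveMode.Append        ("append")        : add the new files alongside the existing ones
// SaveMode.Ignore        ("ignore")        : silently do nothing if the path already exists
// SaveMode.ErrorIfExists ("errorifexists") : the default, throws the AnalysisException shown above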

Author: 若泽数据—白面葫芦娃92

Original (Chinese): https://www.jianshu.com/p/6fde69ea56bc


