文章目录

  • 一、Spark SQL
  • 二、Spark on Hive
  • 三、Hive on Spark
  • 四、Spark读取Parquet文件
  • 五、Spark连接HBase
    • 1.Maven工程添加依赖
    • 2.代码实现

一、Spark SQL

本质上是Spark SQL引擎+Spark RDD引擎。

  • RDD(Resilient Distribute Dataset),弹性分布式数据集。

    Resilient:RDD默认是存放于内存中,当内存不足时会自动写入磁盘。

    Distributed:RDD是将数据拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中。

    Dataset:RDD只是用于做数据转换的接口,并不真正存储数据,指向的是对数据和操作的描述和记录。

    Lineage:RDD可以根据相互之间的血缘关系(DAG有向无环图)从失败节点中恢复分区数据。

  • 整个Spark生态群的底层计算引擎是基于RDD的。而通常狭义理解的Spark RDD计算引擎,是指RDD底层生成DAG执行计划,基于DAG生成详细的Executor和更细粒度的多线程池模型来减少task启动开销。

二、Spark on Hive

Hive只作为存储角色,Spark负责sql解析优化、执行。
本质上是Hive SQL+Spark RDD。具体步骤如下:
1. 通过SparkSQL加载Hive的配置文件,获取到Hive的元数据信息;
2. 获取到Hive的元数据信息之后可以拿到Hive表的数据;
3. 通过SparkSQL来操作Hive表中的数据。

Spark集成Hive

  • 首先将Hive的配置文件hive-site.xml拷贝到Spark的配置文件目录下。

      [root@single ~]# cp /opt/software/hadoop/hive110/conf/hive-site.xml  /opt/software/hadoop/spark244/conf
    
  • 将MySQL驱动包拷贝到Spark的jar包目录下。因为Hive的元数据存储在MySQL中,Spark在读取Hive的元数据时需要访问MySQL表,所以需要MySQL驱动。

      [root@single ~]# cp mysql-connector-java-5.1.32.jar  /opt/software/hadoop/spark244/jars/
    
  • 启动Spark,注意这里需要通过–jars指定MySQL驱动文件。

      [root@single ~]# spark-shell --jars /opt/software/hadoop/spark244/jars/mysql-connector-java-5.1.32.jar
    
  • 访问Hive,注意这里需要先启动Hive服务。

      scala>spark.sql("show databases").show()
    

Spark连接Hive

  • Maven工程添加依赖
<!-- spark-hive -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.4.4</version>
</dependency><!-- mysql-connector-java -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version>
</dependency>
  • 将hive-site.xml文件复制到rescourse资源文件夹内并将元数据地址补全。

  • 创建工程连接Hive
object ConnectHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]").appName(this.getClass.getName).enableHiveSupport()//开启Hive支持.getOrCreate()//读取Hive中数据表,如报异常则在表名前面加上数据库名val df = spark.sql("select * from demo.shopping")df.show()}
}

Spark连接Mysql

object ConnectMysql {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]").appName(this.getClass.getName).getOrCreate()//设置要访问的mysql的url,表名val url = "jdbc:mysql://192.168.72.130:3306/mysqltest"val tablename ="student"val props = new Properties()//设置要访问的mysql的用户名,密码,数据库驱动类props.setProperty("user","root")props.setProperty("password","kb10")props.setProperty("driver","com.mysql.jdbc.Driver")//通过spark.read.jdbc方法读取mysql中数据val df = spark.read.jdbc(url,tablename,props)df.show()}
}

三、Hive on Spark

Hive既作为存储又负责sql的解析优化,Spark负责执行。
Hive的计算引擎是MR,因为shuffle过程中产生大量的小文件,从而导致频繁的磁盘I/O,效率低而且占用大量的磁盘资源。Hive on Spark将MR引擎转换成Spark RDD引擎,即将Hive SQL从MapReduce操作翻译成Spark RDD操作,并运行在Spark集群上。本质上还是Hive SQL+SparkRDD,但相较于Spark on Hive,这个实现较为麻烦,必须要重新编译Spark并导入相关jar包。目前大部分使用Spark on Hive。
Hive的元数据(metadata)建立了一种映射关系。执行HQL时,先到MySQL元数据库中查找描述信息,然后根据描述信息生成任务,并将任务下发到Spark集群中执行。Hive on Spark用的仅仅是Hive的标准和规范,没有Hive数据库一样可以使用。要使用Hive的标准需要将Hive的配置文件拷贝到Spark的conf目录下。没有安装Hive组件也没有影响。

四、Spark读取Parquet文件

Parquet文件是一种流行的列式存储格式,以二进制存储,文件中包含数据与元数据。

import org.apache.spark.sql.{Row, SparkSession}object readParquet {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]").appName(this.getClass.getName).getOrCreate()//通过SparkSession创建SparkContextval sc = spark.sparkContextimport org.apache.spark.sql.types.{StructType, StructField, StringType,ArrayType,IntegerType}val schema=StructType(Array(StructField("name",StringType),StructField("favorite_color",StringType),StructField("favorite_numbers",ArrayType(IntegerType))))val rdd=sc.parallelize(List(("Alyssa",null,Array(3,9,15,20)),("Ben","red",null)))val row=rdd.map(p=>Row(p._1,p._2,p._3))//转换为RDD[Row]val df=spark.createDataFrame(row,schema)//转换为DataFramedf.write.parquet("/data/users")  //在该目录下生成parquet文件//Spark SQL读parquet文件val df1=spark.read.parquet("/data/users") //读取该目录下的parquet文件df1.showdf1.printSchema}
}
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|            null|
+------+--------------+----------------+root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)

五、Spark连接HBase

1.Maven工程添加依赖

<!-- spark-hbase -->
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version>
</dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version>
</dependency>

2.代码实现

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSessionobject Scala_Hbase {def main(args: Array[String]): Unit = {val conf = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum","192.168.182.131")conf.set("hbase.zookeeper.property.clientPort","2181")conf.set(TableInputFormat.INPUT_TABLE,"customer")val spark = SparkSession.builder().appName("HBaseTest").master("local[2]").getOrCreate()val sc= spark.sparkContextval rdd1= sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])println("count="+rdd1.count())import spark.implicits._//遍历输出rdd1.foreach({case (_,result) =>//通过result.getRow来获取行键val key = Bytes.toString(result.getRow)//通过result.getValue("列簇","列名")来获取值//需要使用getBytes将字符流转化为字节流val city = Bytes.toString(result.getValue("addr".getBytes,"city".getBytes))val country = Bytes.toString(result.getValue("addr".getBytes,"country".getBytes))val age = Bytes.toString(result.getValue("order".getBytes,"age".getBytes))println("Row key:"+key+" city:"+city+" country:"+country+" age:"+age)})}
}

Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)相关推荐

  1. Spark SQL操作外部数据源

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  2. 2021年大数据Spark(二十):Spark Core外部数据源引入

    目录 外部数据源 MySQL 数据源 演示代码 HBase 数据源 HBase Sink ​​​​​​​HBase Source 外部数据源 Spark可以从外部存储系统读取数据,比如RDBMs表中或 ...

  3. Spark SQL 外部数据源

    一.简介 1.1 多数据源支持 Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景. CSV JSON Parquet ORC JD ...

  4. Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)

    目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...

  5. Spark SQL External DataSource外部数据源

    一:介绍 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html 随着Spark1.2的发布 ...

  6. Spark SQL连接外部数据源

    一.Spark SQL支持的外部数据源 Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以 加载任何地方的数据,例如mysql,hive,hdfs,hbase ...

  7. Spark SQL之External DataSource外部数据源(二)源代码分析

    上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...

  8. hive load data外部表报错_生产SparkSQL如何读写本地外部数据源及排错

    https://spark-packages.org/里有很多third-party数据源的package,spark把包加载进来就可以使用了 csv格式在spark2.0版本之后是内置的,2.0之前 ...

  9. Spark SQL External DataSource外部数据源操作流程

    一:获取文件 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html spark本身 有测试 ...

最新文章

  1. 运用jieba库分词
  2. codevs——1220 数字三角形(棋盘DP)
  3. docker安装_以简便的方式监控Docker容器中的ADF应用程序
  4. 2018.3.24 struct
  5. 服务器中有两个R文件夹,一台服务器中配置多个git sshkey
  6. linux路由器实际配置案例
  7. 【论文解读】使用Lattice LSTM的中文NER
  8. 手机连接USB通过宽带免费上网
  9. 有理数python_1034 有理数四则运算 (20分)(Python)
  10. java中cleanup的使用_【Lombok注解】@Cleanup 自动资源管理:安全无困扰地调用close方法...
  11. 瑞利商(Rayleigh Quotient)及瑞利定理(Rayleigh-Ritz theorem)的证明
  12. 大榕树BASIS QQ群
  13. java.net.SocketException和错误:org.apache.ftpserver.FtpServerConfigurationException
  14. 浪潮刀片服务器型号,浪潮刀片服务器 NF600 Center
  15. 分享毕业后在北京租房的经验
  16. 无需安装的Linux Live CD--第二篇:实战Knoppix
  17. 名校硕博生已经卷到小县城了?浙江山区基层新招岗位95%是硕博,来自上交复旦国科大等...
  18. FPGA和硬件描述语言HDL(如Verilog)简介
  19. 40 个信息丰富且有趣的 CSS 404 错误页面示例
  20. 数据结构与算法 - 链表(java)

热门文章

  1. 高级程序员装逼指南,是高级哦!(转)
  2. matlab识别黄色车牌,在网上下载了一个matlab的车牌识别,怎么将识别黄色车牌改成识别蓝色车牌,大神求解...
  3. Winform记住密码功能
  4. 听了一天的队歌,Liverpool F.C.的最棒
  5. C6678 SRIO
  6. SQL时间函数应用(时间、季度、旬、月、星期)
  7. canvas绘制圆形马赛克方法二
  8. can‘t convert np.ndarray of type numpy.object_
  9. [2201]:熊猫阿波的故事
  10. 在线答题APP动工前的小总结