Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)
文章目录
- 一、Spark SQL
- 二、Spark on Hive
- 三、Hive on Spark
- 四、Spark读取Parquet文件
- 五、Spark连接HBase
- 1.Maven工程添加依赖
- 2.代码实现
一、Spark SQL
本质上是Spark SQL引擎+Spark RDD引擎。
RDD(Resilient Distribute Dataset),弹性分布式数据集。
Resilient:RDD默认是存放于内存中,当内存不足时会自动写入磁盘。
Distributed:RDD是将数据拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中。
Dataset:RDD只是用于做数据转换的接口,并不真正存储数据,指向的是对数据和操作的描述和记录。
Lineage:RDD可以根据相互之间的血缘关系(DAG有向无环图)从失败节点中恢复分区数据。
整个Spark生态群的底层计算引擎是基于RDD的。而通常狭义理解的Spark RDD计算引擎,是指RDD底层生成DAG执行计划,基于DAG生成详细的Executor和更细粒度的多线程池模型来减少task启动开销。
二、Spark on Hive
Hive只作为存储角色,Spark负责sql解析优化、执行。
本质上是Hive SQL+Spark RDD。具体步骤如下:
1. 通过SparkSQL加载Hive的配置文件,获取到Hive的元数据信息;
2. 获取到Hive的元数据信息之后可以拿到Hive表的数据;
3. 通过SparkSQL来操作Hive表中的数据。
Spark集成Hive
首先将Hive的配置文件hive-site.xml拷贝到Spark的配置文件目录下。
[root@single ~]# cp /opt/software/hadoop/hive110/conf/hive-site.xml /opt/software/hadoop/spark244/conf
将MySQL驱动包拷贝到Spark的jar包目录下。因为Hive的元数据存储在MySQL中,Spark在读取Hive的元数据时需要访问MySQL表,所以需要MySQL驱动。
[root@single ~]# cp mysql-connector-java-5.1.32.jar /opt/software/hadoop/spark244/jars/
启动Spark,注意这里需要通过–jars指定MySQL驱动文件。
[root@single ~]# spark-shell --jars /opt/software/hadoop/spark244/jars/mysql-connector-java-5.1.32.jar
访问Hive,注意这里需要先启动Hive服务。
scala>spark.sql("show databases").show()
Spark连接Hive
- Maven工程添加依赖
<!-- spark-hive -->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.4.4</version>
</dependency><!-- mysql-connector-java -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version>
</dependency>
- 将hive-site.xml文件复制到rescourse资源文件夹内并将元数据地址补全。
- 创建工程连接Hive
object ConnectHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]").appName(this.getClass.getName).enableHiveSupport()//开启Hive支持.getOrCreate()//读取Hive中数据表,如报异常则在表名前面加上数据库名val df = spark.sql("select * from demo.shopping")df.show()}
}
Spark连接Mysql
object ConnectMysql {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[4]").appName(this.getClass.getName).getOrCreate()//设置要访问的mysql的url,表名val url = "jdbc:mysql://192.168.72.130:3306/mysqltest"val tablename ="student"val props = new Properties()//设置要访问的mysql的用户名,密码,数据库驱动类props.setProperty("user","root")props.setProperty("password","kb10")props.setProperty("driver","com.mysql.jdbc.Driver")//通过spark.read.jdbc方法读取mysql中数据val df = spark.read.jdbc(url,tablename,props)df.show()}
}
三、Hive on Spark
Hive既作为存储又负责sql的解析优化,Spark负责执行。
Hive的计算引擎是MR,因为shuffle过程中产生大量的小文件,从而导致频繁的磁盘I/O,效率低而且占用大量的磁盘资源。Hive on Spark将MR引擎转换成Spark RDD引擎,即将Hive SQL从MapReduce操作翻译成Spark RDD操作,并运行在Spark集群上。本质上还是Hive SQL+SparkRDD,但相较于Spark on Hive,这个实现较为麻烦,必须要重新编译Spark并导入相关jar包。目前大部分使用Spark on Hive。
Hive的元数据(metadata)建立了一种映射关系。执行HQL时,先到MySQL元数据库中查找描述信息,然后根据描述信息生成任务,并将任务下发到Spark集群中执行。Hive on Spark用的仅仅是Hive的标准和规范,没有Hive数据库一样可以使用。要使用Hive的标准需要将Hive的配置文件拷贝到Spark的conf目录下。没有安装Hive组件也没有影响。
四、Spark读取Parquet文件
Parquet文件是一种流行的列式存储格式,以二进制存储,文件中包含数据与元数据。
import org.apache.spark.sql.{Row, SparkSession}object readParquet {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[4]").appName(this.getClass.getName).getOrCreate()//通过SparkSession创建SparkContextval sc = spark.sparkContextimport org.apache.spark.sql.types.{StructType, StructField, StringType,ArrayType,IntegerType}val schema=StructType(Array(StructField("name",StringType),StructField("favorite_color",StringType),StructField("favorite_numbers",ArrayType(IntegerType))))val rdd=sc.parallelize(List(("Alyssa",null,Array(3,9,15,20)),("Ben","red",null)))val row=rdd.map(p=>Row(p._1,p._2,p._3))//转换为RDD[Row]val df=spark.createDataFrame(row,schema)//转换为DataFramedf.write.parquet("/data/users") //在该目录下生成parquet文件//Spark SQL读parquet文件val df1=spark.read.parquet("/data/users") //读取该目录下的parquet文件df1.showdf1.printSchema}
}
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| null|
+------+--------------+----------------+root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)| |-- element: integer (containsNull = true)
五、Spark连接HBase
1.Maven工程添加依赖
<!-- spark-hbase -->
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version>
</dependency>
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version>
</dependency>
2.代码实现
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSessionobject Scala_Hbase {def main(args: Array[String]): Unit = {val conf = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum","192.168.182.131")conf.set("hbase.zookeeper.property.clientPort","2181")conf.set(TableInputFormat.INPUT_TABLE,"customer")val spark = SparkSession.builder().appName("HBaseTest").master("local[2]").getOrCreate()val sc= spark.sparkContextval rdd1= sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])println("count="+rdd1.count())import spark.implicits._//遍历输出rdd1.foreach({case (_,result) =>//通过result.getRow来获取行键val key = Bytes.toString(result.getRow)//通过result.getValue("列簇","列名")来获取值//需要使用getBytes将字符流转化为字节流val city = Bytes.toString(result.getValue("addr".getBytes,"city".getBytes))val country = Bytes.toString(result.getValue("addr".getBytes,"country".getBytes))val age = Bytes.toString(result.getValue("order".getBytes,"age".getBytes))println("Row key:"+key+" city:"+city+" country:"+country+" age:"+age)})}
}
Spark操作外部数据源(RDBMS,Hive,HBase,Parquet)相关推荐
- Spark SQL操作外部数据源
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
- 2021年大数据Spark(二十):Spark Core外部数据源引入
目录 外部数据源 MySQL 数据源 演示代码 HBase 数据源 HBase Sink HBase Source 外部数据源 Spark可以从外部存储系统读取数据,比如RDBMs表中或 ...
- Spark SQL 外部数据源
一.简介 1.1 多数据源支持 Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景. CSV JSON Parquet ORC JD ...
- Spark SQL与外部数据源的操作(Spark SQL ——> CSV/JSON/Parquet/hive/mysql)
目录 一.Spark SQL支持的外部数据源 二.Spark SQL -> CSV 2.1 读CSV文件 a.有列名 b.无列名 2.2 写CSV文件 三.Spark SQL -> JSO ...
- Spark SQL External DataSource外部数据源
一:介绍 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html 随着Spark1.2的发布 ...
- Spark SQL连接外部数据源
一.Spark SQL支持的外部数据源 Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现Spark SQL可以 加载任何地方的数据,例如mysql,hive,hdfs,hbase ...
- Spark SQL之External DataSource外部数据源(二)源代码分析
上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...
- hive load data外部表报错_生产SparkSQL如何读写本地外部数据源及排错
https://spark-packages.org/里有很多third-party数据源的package,spark把包加载进来就可以使用了 csv格式在spark2.0版本之后是内置的,2.0之前 ...
- Spark SQL External DataSource外部数据源操作流程
一:获取文件 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html spark本身 有测试 ...
最新文章
- 运用jieba库分词
- codevs——1220 数字三角形(棋盘DP)
- docker安装_以简便的方式监控Docker容器中的ADF应用程序
- 2018.3.24 struct
- 服务器中有两个R文件夹,一台服务器中配置多个git sshkey
- linux路由器实际配置案例
- 【论文解读】使用Lattice LSTM的中文NER
- 手机连接USB通过宽带免费上网
- 有理数python_1034 有理数四则运算 (20分)(Python)
- java中cleanup的使用_【Lombok注解】@Cleanup 自动资源管理:安全无困扰地调用close方法...
- 瑞利商(Rayleigh Quotient)及瑞利定理(Rayleigh-Ritz theorem)的证明
- 大榕树BASIS QQ群
- java.net.SocketException和错误:org.apache.ftpserver.FtpServerConfigurationException
- 浪潮刀片服务器型号,浪潮刀片服务器 NF600 Center
- 分享毕业后在北京租房的经验
- 无需安装的Linux Live CD--第二篇:实战Knoppix
- 名校硕博生已经卷到小县城了?浙江山区基层新招岗位95%是硕博,来自上交复旦国科大等...
- FPGA和硬件描述语言HDL(如Verilog)简介
- 40 个信息丰富且有趣的 CSS 404 错误页面示例
- 数据结构与算法 - 链表(java)
热门文章
- 高级程序员装逼指南,是高级哦!(转)
- matlab识别黄色车牌,在网上下载了一个matlab的车牌识别,怎么将识别黄色车牌改成识别蓝色车牌,大神求解...
- Winform记住密码功能
- 听了一天的队歌,Liverpool F.C.的最棒
- C6678 SRIO
- SQL时间函数应用(时间、季度、旬、月、星期)
- canvas绘制圆形马赛克方法二
- can‘t convert np.ndarray of type numpy.object_
- [2201]:熊猫阿波的故事
- 在线答题APP动工前的小总结