Spark SQL 快速入门系列(六)Spark SQL 访问 JDBC
这里写目录标题
- JDBC
- 准备 MySQL 环境
- 使用 SparkSQL 向 MySQL 中写入数据
- 从 MySQL 中读取数据
JDBC
导读
1,通过 SQL 操作 MySQL 的表
2,将数据写入 MySQL 的表中
准备 MySQL 环境
在使用 SparkSQL 访问 MySQL 之前, 要对 MySQL 进行一些操作, 例如说创建用户, 表和库等
Step 1: 连接 MySQL 数据库
在 MySQL 所在的主机上执行如下命令
mysql -uroot -p
Step 2: 创建 Spark 使用的用户
登进 MySQL 后, 需要先创建用户
CREATE USER 'spark'@'%' IDENTIFIED BY 'Spark123!';
GRANT ALL ON spark_test.* TO 'spark'@'%';
Step 3: 创建库和表
CREATE DATABASE spark_test;USE spark_test;CREATE TABLE IF NOT EXISTS `student`(
`id` INT AUTO_INCREMENT,
`name` VARCHAR(100) NOT NULL,
`age` INT NOT NULL,
`gpa` FLOAT,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
使用 SparkSQL 向 MySQL 中写入数据
其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问
在使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark 访问 JDBC, 有如下几个配置可用
属性 | 含义 |
---|---|
url | 要连接的 JDBC URL |
dbtable | 要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法 |
fetchsize | 数据抓取的大小(单位行), 适用于读的情况 |
batchsize | 数据传输的大小(单位行), 适用于写的情况 |
isolationLevel | 事务隔离级别, 是一个枚举, 取值 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 默认为 READ_UNCOMMITTED |
创建个文件名:(studenttab10k)
放入如下数据(由于笔者数据有1W个,但无法全部写入,这里只放200个)
ulysses thompson 64 1.90
katie carson 25 3.65
luke king 65 0.73
holly davidson 57 2.43
fred miller 55 3.77
holly white 43 0.24
luke steinbeck 51 1.14
nick underhill 31 2.46
holly davidson 59 1.26
calvin brown 56 0.72
rachel robinson 62 2.25
tom carson 35 0.56
tom johnson 72 0.99
irene garcia 54 1.06
oscar nixon 39 3.60
holly allen 32 2.58
oscar hernandez 19 0.05
alice ichabod 65 2.25
wendy thompson 30 2.39
priscilla hernandez 73 0.23
gabriella van buren 68 1.32
yuri thompson 42 3.65
yuri laertes 60 1.16
sarah young 23 2.76
zach white 32 0.20
nick van buren 68 1.75
xavier underhill 41 1.51
bob ichabod 56 2.81
zach steinbeck 61 2.22
alice garcia 42 2.03
jessica king 29 3.61
calvin nixon 37 0.30
fred polk 66 3.69
bob zipper 40 0.28
alice young 75 0.31
nick underhill 37 1.65
mike white 57 0.69
calvin ovid 41 3.02
fred steinbeck 47 3.57
sarah ovid 65 0.00
wendy nixon 63 0.62
gabriella zipper 77 1.51
david king 40 1.99
jessica white 30 3.82
alice robinson 37 3.69
zach nixon 74 2.75
irene davidson 27 1.22
priscilla xylophone 43 1.60
oscar zipper 25 2.43
fred falkner 38 2.23
ulysses polk 58 0.01
katie hernandez 47 3.80
zach steinbeck 55 0.68
fred laertes 69 3.62
quinn laertes 70 3.66
nick garcia 50 0.12
oscar young 55 2.22
bob underhill 47 0.24
calvin young 77 1.60
mike allen 65 2.95
david young 77 0.26
oscar garcia 69 1.59
ulysses ichabod 26 0.95
wendy laertes 76 1.13
sarah laertes 20 0.24
zach ichabod 60 1.60
tom robinson 62 0.78
zach steinbeck 69 1.01
quinn garcia 57 0.98
yuri van buren 32 1.97
luke carson 39 0.76
calvin ovid 73 0.82
luke ellison 27 0.56
oscar zipper 50 1.31
fred steinbeck 52 3.14
katie xylophone 76 1.38
luke king 54 2.30
ethan white 72 1.43
yuri ovid 37 3.64
jessica garcia 54 1.08
luke young 29 0.80
mike miller 39 3.35
fred hernandez 63 0.17
priscilla hernandez 52 0.35
ethan garcia 43 1.70
quinn hernandez 25 2.58
calvin nixon 33 1.01
yuri xylophone 47 1.36
ulysses steinbeck 63 1.05
jessica nixon 25 2.13
bob johnson 53 3.31
jessica ichabod 56 2.21
zach miller 63 3.87
priscilla white 66 2.82
ulysses allen 21 1.68
katie falkner 47 1.49
tom king 51 1.91
bob laertes 60 3.33
luke nixon 27 3.54
quinn johnson 42 2.24
wendy quirinius 71 0.10
victor polk 55 3.63
rachel robinson 32 1.11
sarah king 57 1.37
victor young 38 1.72
priscilla steinbeck 38 2.11
fred brown 19 2.72
xavier underhill 55 3.56
irene ovid 67 3.80
calvin brown 37 2.22
katie thompson 20 3.27
katie carson 66 3.55
tom miller 57 2.83
rachel brown 56 0.74
holly johnson 38 2.51
irene steinbeck 29 1.97
wendy falkner 37 0.14
ethan white 29 3.62
bob underhill 26 1.10
jessica king 64 0.69
luke steinbeck 19 1.16
luke laertes 70 3.58
rachel polk 74 0.92
calvin xylophone 52 0.58
luke white 57 3.86
calvin van buren 52 3.13
holly quirinius 59 1.70
mike brown 44 1.93
yuri ichabod 61 0.70
ulysses miller 56 3.53
victor hernandez 64 2.52
oscar young 34 0.34
luke ovid 36 3.17
quinn ellison 50 1.13
quinn xylophone 72 2.07
nick underhill 48 0.15
rachel miller 23 3.38
mike van buren 68 1.74
zach van buren 38 0.34
irene zipper 32 0.54
sarah garcia 31 3.87
rachel van buren 56 0.35
fred davidson 69 1.57
nick hernandez 19 2.11
irene polk 40 3.89
katie young 26 2.88
priscilla ovid 49 3.28
jessica hernandez 39 3.13
yuri allen 29 3.51
victor garcia 66 3.45
zach johnson 77 0.95
yuri zipper 48 3.44
alice falkner 28 3.72
gabriella allen 58 3.61
bob nixon 34 3.34
bob white 67 2.93
holly steinbeck 57 1.81
wendy van buren 40 1.09
calvin brown 61 2.08
irene young 25 2.66
holly van buren 40 2.37
katie underhill 30 0.63
quinn hernandez 73 0.31
fred nixon 53 1.76
luke ellison 59 1.10
quinn nixon 24 0.30
ethan underhill 68 2.25
fred underhill 28 3.88
jessica brown 59 3.66
katie falkner 49 3.96
calvin ellison 27 2.23
zach carson 59 0.46
priscilla polk 47 2.99
rachel zipper 49 3.26
holly king 73 1.23
zach carson 64 2.60
fred xylophone 61 3.15
gabriella miller 43 1.73
david laertes 56 3.43
tom garcia 63 2.78
ethan king 66 3.13
david hernandez 26 2.52
wendy ichabod 57 2.81
alice young 69 0.25
tom xylophone 50 2.78
ulysses carson 62 2.26
nick garcia 43 2.23
gabriella ellison 33 1.18
ethan miller 28 2.15
tom zipper 19 2.56
wendy white 19 1.12
luke ovid 31 1.68
wendy xylophone 75 2.58
quinn garcia 22 3.65
holly ellison 68 0.26
yuri hernandez 75 2.50
tom underhill 71 2.68
ulysses king 31 1.76
fred thompson 46 1.55
gabriella ichabod 26 1.59
读取数据集, 处理过后存往 MySQL 中的代码如下
package com.spark.jdbcimport org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}object MySQLWrite {/*** MySQL 的访问方式有两种 : 使用本地运行 , 提交到集群运行* 在写入MySQL数据时 , 使用本地运行 , 读取的时候使用集群运行*/def main(args: Array[String]): Unit = {//1.创建SparkSession 对象val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[6]").getOrCreate()//2.读取数据创建DataFrame// 1.拷贝文件// 2.读取val schema = StructType(List(StructField("name",StringType),StructField("age",IntegerType),StructField("gpa",FloatType)))val studentDF = spark.read.schema(schema).option("delimiter", "\t").csv("E:\\Project\\Spark\\spark-sql\\input\\studenttab10k")//3.处理数据 (不处理了,直接写)//4.落地studentDF.write.format("jdbc").option("url","jdbc:mysql://Bigdata01:3306/spark_test").option("dbtable","student").option("user","spark").option("password","Spark123!").mode(SaveMode.Overwrite).save()}
}
运行程序
如果是在本地运行, 需要导入 Maven 依赖
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version>
</dependency>
如果使用 Spark submit 或者 Spark shell 来运行任务, 需要通过 --jars 参数提交 MySQL 的 Jar 包, 或者指定 --packages 从 Maven 库中读取
bin/spark-shell \
--packages mysql:mysql-connector-java:5.1.47 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/
运行后查询结果如下:
mysql> select * from student limit 29;
+---------------------+------+------+
| name | age | gpa |
+---------------------+------+------+
| katie carson | 25 | 3.65 |
| luke king | 65 | 0.73 |
| holly davidson | 57 | 2.43 |
| fred miller | 55 | 3.77 |
| holly white | 43 | 0.24 |
| luke steinbeck | 51 | 1.14 |
| nick underhill | 31 | 2.46 |
| holly davidson | 59 | 1.26 |
| calvin brown | 56 | 0.72 |
| rachel robinson | 62 | 2.25 |
| tom carson | 35 | 0.56 |
| tom johnson | 72 | 0.99 |
| irene garcia | 54 | 1.06 |
| oscar nixon | 39 | 3.6 |
| holly allen | 32 | 2.58 |
| oscar hernandez | 19 | 0.05 |
| alice ichabod | 65 | 2.25 |
| wendy thompson | 30 | 2.39 |
| priscilla hernandez | 73 | 0.23 |
| gabriella van buren | 68 | 1.32 |
| yuri thompson | 42 | 3.65 |
| yuri laertes | 60 | 1.16 |
| sarah young | 23 | 2.76 |
| zach white | 32 | 0.2 |
| nick van buren | 68 | 1.75 |
| xavier underhill | 41 | 1.51 |
| bob ichabod | 56 | 2.81 |
| zach steinbeck | 61 | 2.22 |
| alice garcia | 42 | 2.03 |
+---------------------+------+------+
29 rows in set (0.00 sec)
从 MySQL 中读取数据
读取 MySQL 的方式也非常的简单, 只是使用 SparkSQL 的 DataFrameReader 加上参数配置即可访问
package com.spark.jdbcimport org.apache.spark.sql.SparkSessionobject MySQLRead {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[6]").getOrCreate()spark.read.format("jdbc").option("url","jdbc:mysql://Bigdata01:3306/spark_test").option("dbtable","student").option("user","spark").option("password","Spark123!").load().show()}
}
默认情况下读取 MySQL 表时, 从 MySQL 表中读取的数据放入了一个分区, 拉取后可以使用 DataFrame 重分区来保证并行计算和内存占用不会太高, 但是如果感觉 MySQL 中数据过多的时候, 读取时可能就会产生 OOM, 所以在数据量比较大的场景, 就需要在读取的时候就将其分发到不同的 RDD 分区
属性 | 含义 |
---|---|
partitionColumn | 指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID |
lowerBound, upperBound | 确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区 |
numPartitions | 分区数量 |
spark.read.format("jdbc").option("url", "jdbc:mysql://node01:3306/spark_test").option("dbtable", "student").option("user", "spark").option("password", "Spark123!").option("partitionColumn", "age").option("lowerBound", 1).option("upperBound", 60).option("numPartitions", 10).load().show()
有时候可能要使用非数字列来作为分区依据, Spark 也提供了针对任意类型的列作为分区依据的方法
val predicates = Array("age < 20","age >= 20, age < 30","age >= 30"
)val connectionProperties = new Properties()
connectionProperties.setProperty("user", "spark")
connectionProperties.setProperty("password", "Spark123!")spark.read.jdbc(url = "jdbc:mysql://node01:3306/spark_test",table = "student",predicates = predicates,connectionProperties = connectionProperties).show()
SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到
spark.read.format("jdbc").option("url", "jdbc:mysql://node01:3306/spark_test").option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu").option("user", "spark").option("password", "Spark123!").option("partitionColumn", "age").option("lowerBound", 1).option("upperBound", 60).option("numPartitions", 10).load().show()
Spark SQL 快速入门系列(六)Spark SQL 访问 JDBC相关推荐
- Spark SQL 快速入门系列(五)SparkSQL 访问 Hive
文章目录 访问 Hive SparkSQL 整合 Hive 访问 Hive 表 idea实现SparkSQL连接hive 访问 Hive 导读 1,整合 SparkSQL 和 Hive, 使用 Hiv ...
- Spark Core快速入门系列(5) | RDD 中函数的传递
大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...
- VPLC系列机器视觉运动控制一体机快速入门(六)
于形状匹配的视觉定位.BLOB有无检测以及测量尺寸. 本期课程,正运动技术和大家一起分享和标定有关的详细知识内容. 视频教程:<VPLC系列机器视觉运动控制一体机快速入门(六)> 机器视觉 ...
- c# wpf listbox 高度_WPF快速入门系列(1)——WPF布局概览
一.引言 关于WPF早在一年前就已经看过<深入浅出WPF>这本书,当时看完之后由于没有做笔记,以至于我现在又重新捡起来并记录下学习的过程,本系列将是一个WPF快速入门系列,主要介绍WPF中 ...
- python r转义_Python快速入门系列之二:还学不会我直播跪搓衣板
Python作为一个,目前最火的编程语言之一,已经渗透到了各行各业.它易学好懂,拥有着丰富的库,功能齐全.人生苦短,就用Python. 这个快速入门系列分为六篇,包含了Python大部分基础知识,每篇 ...
- 2021-08-26 转载 Scala快速入门系列博客文章
作者:Huidoo_Yang 出处:http://www.cnblogs.com/yangp/ 本文版权归作者Huidoo_Yang和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面 ...
- 视频教程-Unity快速入门系列课程(第1部)-Unity3D
Unity快速入门系列课程(第1部) 二十多年的软件开发与教学经验IT技术布道者,资深软件工程师.具备深厚编程语言经验,在国内上市企业做项目经理.研发经理,熟悉企业大型软件运作管理过程.软件架构设计理 ...
- 视频教程-Unity快速入门系列课程(第2部)-Unity3D
Unity快速入门系列课程(第2部) 二十多年的软件开发与教学经验IT技术布道者,资深软件工程师.具备深厚编程语言经验,在国内上市企业做项目经理.研发经理,熟悉企业大型软件运作管理过程.软件架构设计理 ...
- WPF快速入门系列(6)——WPF资源和样式
WPF快速入门系列(6)--WPF资源和样式 一.引言 WPF资源系统可以用来保存一些公有对象和样式,从而实现重用这些对象和样式的作用.而WPF样式是重用元素的格式的重要手段,可以理解样式就如CSS一 ...
- 树莓派从零开始快速入门系列汇总
树莓派从零开始快速入门系列汇总 树莓派从零开始快速入门第0讲--环境安装 树莓派从零开始快速入门第1讲--命令行 树莓派从零开始快速入门第2讲--更换国内源 树莓派从零开始快速入门第3讲--文件编辑 ...
最新文章
- dataframe for 循环 数据格式 python_Python中的for循环
- WCF一个运行环境,一个服务逻辑人,一个客户
- struts2 mysql 乱码_struts2项目插入中文到mysql数据库乱码的解决方法
- macos远程桌面连接_如何在macOS中使用Microsoft远程桌面连接Amazon EC2
- 2017.3.6~2017.3.7 Harry And Magic Box 思考记录(特别不容易)
- 文献管理与信息分析_全球酒店PMS行业市场现状分析,酒店信息管理全链条的灵魂...
- SQLServer数据库写入操作时报错:not all arguments converted during string formatting 问题解决
- 四、Oracle学习笔记:DML数据操作语句
- Perl语言入门(第六版)pdf
- arduino win8 驱动安装
- 谷歌这是要全面退出中国!
- 六级考研单词之路-十五
- dell服务器安装系统加载驱动,DELL服务器使用U盘加载驱动安装Win2003.doc
- JavaScript 常用数组方法及使用技巧「数组的力量隐藏在数组方法中,必收藏」
- 动态网站与数据库的连接
- 全球及中国BTK抑制剂市场发展状况与投资前景建议报告2022-2028年
- 黑马视频~rocketMq
- Zemax 2023安装教程
- 音乐,新闻等开放api接口
- 高精度乘法(c++实现)