spark mysql 写_Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)...
1. JDBC
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
1.1. 从MySQL中加载数据(Spark Shell方式)
1.启动Spark Shell,必须指定mysql连接驱动jar包
[[email protected] spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar
2.从mysql中加载数据
进入bigdata中创建person表:
CREATE DATABASE bigdata CHARACTER SET utf8;
USE bigdata;
CREATE TABLE person ( id INT(10) AUTO_INCREMENT PRIMARY KEY, name varchar(100), age INT(3) ) ENGINE=INNODB DEFAULT CHARSET=utf8;
并初始化数据:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://hadoop10:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "123456")).load()
3.执行查询
scala> jdbcDF.show +---+--------+---+
| id| name|age| +---+--------+---+
| 1|zhangsan| 19|
| 2| lisi| 20|
| 3| wangwu| 28|
| 4| zhaoliu| 26|
| 5| tianqi| 55| +---+--------+---+
1.2. 将数据写入到MySQL中(打jar包方式)
1.2.1编写Spark SQL程序
package cn.toto.spark
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/** * Created by toto on 2017/7/11. */
object JdbcRDDDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
val connection = () => {
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://hadoop10:3306/bigdata","root","123456")
}
//这个地方没有读取数据(数据库表也用的是person)
val jdbcRDD = new JdbcRDD(
sc,
connection,
"SELECT * FROM person where id >= ? AND id <= ?",
//这里表示从取数据库中的第1、2、3、4条数据,然后分两个区
1, 4, 2,
r => {
val id = r.getInt(1)
val code = r.getString(2)
(id, code)
}
)
//这里相当于是action获取到数据
val jrdd = jdbcRDD.collect()
println(jrdd.toBuffer)
sc.stop()
}
}
注意在运行的时候使用的还是person这个表,表中的数据如下:
如果是在IDEA中运行程序,程序结果如下:
1.2.2用maven将程序打包
1.2.3.将Jar包提交到spark集群
将bigdata-1.0-SNAPSHOT.jar放到:/home/tuzq/software/sparkdata,如下:
注意在运行执行,要将mysql-connector-java-5.1.38.jar 放到:/home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/下
bin/spark-submit --class cn.toto.spark.JdbcRDDDemo --master spark://hadoop1:7077 --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar
运行结果:
2、通过Spark-sql将数据存储到数据库中
2.2.1.代码如下:
package cn.toto.spark
import java.util.Properties
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
/** * Created by toto on 2017/7/11. */
object JdbcRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MySQL-Demo").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//通过并行化创建RDD
val personRDD = sc.parallelize(Array("14 tom 5", "15 jerry 3", "16 kitty 6")).map(_.split(" "))
//通过StrutType直接指定每个字段的schema
val schema = StructType(
List(
StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)
)
)
//将RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
//将schema信息应用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD,schema)
//创建Properties存储数据库相关属性
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "123456")
//将数据追加到数据库
personDataFrame.write.mode("append").jdbc("jdbc:mysql://hadoop10:3306/bigdata",
"bigdata.person",prop)
//停止SparkContext
sc.stop()
}
}
运行结果:
2.2.2、用maven将程序打包
2.2.3、将Jar包提交到spark集群
bin/spark-submit --class cn.toto.spark.JdbcRDD --master spark://hadoop1:7077 --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar
spark mysql 写_Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)...相关推荐
- python没有错误但是不显示结果_为什么我在Python3中加载模块时遇到问题但在Python2中没有加载?...
根据我使用的 Python安装,我在加载模块时遇到了一些问题.当我输入 from quantecon import approx_markov 在使用Python 3.4.0的终端中,返回以下错误消息 ...
- 如何在微信小程序中加载自己的地图数据
由于微信小程序无法进行DOM操作,导致像openlayers.leaflet这种常用的js库无法在微信小程序内使用,导致加载高德.百度.mapbox还有自定义的瓦片地图数据变得很困难. 目前,大多数情 ...
- 从内存中加载并启动一个exe
从内存中加载并启动一个exe 文章作者:Idle_ (阿呆) 信息来源:[url]http://cnxhacker.net/article/show/2821.html[/url] windows似乎 ...
- 四个步骤实现在ESRI ArcMap中加载17.6G离线卫星地图的方法
四个步骤实现在ESRI ArcMap中加载17.6G离线卫星地图的方法 ArcMap是GIS行业的从业人员再熟悉不过的一款功能非常强大的软件,尤其是对从事地质方面工作的外业人员来讲,更是一款不可或缺的 ...
- web高德地图怎么加载离线地图_基于 QGIS 在内网中离线加载卫星地图的方法
1. 概述 我们之前为大家分享过在三维地球开源平台离线加载卫星影像的方法,主要包括基于桌面端的OsgEarth开源三维地球和基于Web端的Cesium开源三维地球等平台的局域网离线影像加载. 另外,也 ...
- 从内存中加载并运行exe
{配合anskya的AnyWhereFileToPas效果不错} { ******************************************************* } { * ...
- 【gazebo中加载DEM高程图】
gazebo中加载DEM高程图 今天在ubuntu18.04中使用gazebo加载DEM高程图,发现始终无法显示高程图,最后在多方查找下找到了解决方案. 1.官方链接 gazebo中加载DEM链接 h ...
- Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)
1. JDBC Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中. 1.1. 从MySQ ...
- beeline执行sql文件_【SparkSQL】介绍、与Hive整合、Spark的th/beeline/jdbc/thriftserve2、shell方式使用SQL...
目录 一.Spark SQL介绍 SQL on Hadoop框架: 1)Spark SQL 2)Hive 3)Impala 4)Phoenix Spark SQL是用来处理离线数据的,他的编程模型是D ...
最新文章
- POJ1088(滑雪)
- mysql clob blob,如何在MySQL中插入BLOB和CLOB文件?
- Latex中的一些表格用法总结(二)——行列式的表格,表格的切分和合并
- 数据库内核月报 - 2015 / 11-MySQL · 社区见闻 · OOW 2015 总结 MySQL 篇
- Android studio 设置主题
- php iis redis,iis windows phpstudy安装redis扩展
- mac 系统安装 eclipse 方法
- HttpClient 模拟登录网易微博
- 洛谷——P1317 低洼地
- 绝对定位元素、浮动元素会生成一个块级框
- SpringBoot在前端发送url时,不能识别特殊字符的问题
- MDT2010学习(八),MDT结合WDS部署Win7 x86企业版
- 自学web前端课程大纲分享,适合所有人学习
- sgx使用记录(Windows开发环境搭建以及sgx的简单介绍)1
- HDLC概述-iealb
- 华为S/CE系列交换机stelnet示例(带外管理地址绑定vpn实现业务和管理平面的隔离)
- 机器学习(七) 自编码器
- 计算机视觉:2.3.1、梯度下降法优化权重矩阵
- 实现某位置附近距离【Redis的GEO】
- 为期五天的实训总结--Java Web
热门文章
- 蓝牙怎么区分单模和双模_小院闲聊#01#——蓝牙的发展和不同蓝牙之间的关系...
- k折交叉验证法的额外步骤_教你几招蝴蝶结系法步骤,OMG!这怎么配都美
- mysql中sysdate(),curdate(),curtime(),now()
- Spring Cloud 相关配置信息说明
- html div英文自动换行,div 实现长英文字母自动换行CSS
- java.library.path在哪?
- MySQL中varchar类型字段隐式转换造成多删除数据
- ftp ---- 配置文件(默认配置文件解读)
- oracle表结构迁移麻不麻烦_干货分享 | 手把手教你get数据库迁移的正确姿势
- 博客园首页新随笔联系管理订阅订阅随笔- 610 文章- 0 评论- 83 阅读- 144万 Calendar时间获取天,周,月,季度,年度时间段