通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。

jdbc.scala重要API介绍:

/*** Save this RDD to a JDBC database at `url` under the table name `table`.* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.* If you pass `true` for `allowExisting`, it will drop any table with the* given name; if you pass `false`, it will throw if the table already* exists.*/
def createJDBCTable(url: String, table: String, allowExisting: Boolean) /*** Save this RDD to a JDBC database at `url` under the table name `table`.* Assumes the table already exists and has a compatible schema.  If you* pass `true` for `overwrite`, it will `TRUNCATE` the table before* performing the `INSERT`s.** The table must already exist on the database.  It must have a schema* that is compatible with the schema of this RDD; inserting the rows of* the RDD in order via the simple statement* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.*/
def insertIntoJDBC(url: String, table: String, overwrite: Boolean) 

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._val sqlContext  = new SQLContext(sc)
import sqlContext._#数据准备
val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
val arr1x2 = Array[Row](Row.apply("fred", 3))
val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil) import org.apache.spark.sql.jdbc._================================CREATE======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)srdd.createJDBCTable(url, "person", false)
sqlContext.jdbcRDD(url, "person").collect.foreach(println)
[dave,42]
[mary,222]==============================CREATE with overwrite========================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person2", false)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
[mary,222,2]
[dave,42,1]val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd2.createJDBCTable(url, "person2", true)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
[fred,3]================================CREATE then INSERT to append======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
[mary,222]
[dave,42]srdd2.insertIntoJDBC(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
[mary,222]
[dave,42]
[fred,3]================================CREATE then INSERT to truncate======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)srdd.createJDBCTable(url, "person4", false)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
[dave,42]
[mary,222]srdd2.insertIntoJDBC(url, "person4", true)
[fred,3]================================Incompatible INSERT to append======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person5", false)
srdd2.insertIntoJDBC(url, "person5", true)java.sql.SQLException: Column count doesn't match value count at row 1

Spark SQL External Data Sources JDBC官方实现写测试相关推荐

  1. Spark SQL Guide——Data Sources

    文章目录 Parquet Files Partition Discovery(解析分区信息) Schema Merging(模式归并) Hive metastore Parquet table con ...

  2. Spark SQL: Relational Data Processing in Spark

    Spark SQL: Relational Data Processing in Spark Spark SQL : Spark中关系型处理模块 说明: 类似这样的说明并非是原作者的内容翻译,而是本篇 ...

  3. Spark SQL External DataSource外部数据源操作流程

    一:获取文件 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html spark本身 有测试 ...

  4. spark SQL Running the Thrift JDBC/ODBC server

    Running the Thrift JDBC/ODBC server 1:运行 ./sbin/start-thriftserver.sh  --hiveconf hive.server2.thrif ...

  5. Spark SQL External DataSource外部数据源

    一:介绍 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html 随着Spark1.2的发布 ...

  6. Spark SQL玩起来

    标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...

  7. Spark SQL 快速入门系列(五)SparkSQL 访问 Hive

    文章目录 访问 Hive SparkSQL 整合 Hive 访问 Hive 表 idea实现SparkSQL连接hive 访问 Hive 导读 1,整合 SparkSQL 和 Hive, 使用 Hiv ...

  8. Spark SQL 核心编程

    文章目录 Spark SQL 核心编程 1.新的起点 2.SQL 语法 1) 读取 json 文件创建 DataFrame 2) 对 DataFrame 创建一个临时表 3) 通过SQL语句实现查询全 ...

  9. Spark SQL操作多数据源

    Spark SQL支持通过DataFrame接口操作的多种不同的数据源.DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构化数据,Parquet文件,JSON文件,Hive表 , ...

  10. 大数据之Spark(四):Spark SQL

    一.SparkSQL的发展 1.1 概述 SparkSQL是Spark⽣态体系中的构建在SparkCore基础之上的⼀个基于SQL的计算模块. SparkSQL的前身不叫SparkSQL,⽽叫Shar ...

最新文章

  1. sqlserver mysql 乱码_SQLServer数据库如何解决中文乱码问题?方法有哪些?
  2. mysql php ajax_PHP 和 AJAX MySQL 数据库实例
  3. android百度定位没反应,百度定位回调无反应,第一次能回调到数据,第二次无反应...
  4. 程序员须掌握的大数据分析核心技术
  5. getchar()不停止原因
  6. 2.支付平台架构:业务、规划、设计与实现 --- 收银台系统
  7. 【优化算法】量子遗传优化算法(QGA)【含Matlab源码 1123期】
  8. hive 指定字段插入数据_Hive插入数据的几种常用方法
  9. yoga710怎么进入bios_联想笔记本怎么进入BIOS联想手提电脑进BIOS方法汇总
  10. js 浏览器语音播报
  11. 【风险管理】(第二篇)风险管理分析及预测方法
  12. java jmf播放mp3_关于jmf不能播放mp3的问题解决
  13. 【Word】Word更改默认模板样式——使用自定义模板【以Windows10+Word2019为例】
  14. java drawstring 模糊_Java绘制文字质量太低的解决方案?
  15. 使用非对称加密匿名加好友
  16. linux下paste、diff、meld的使用
  17. 22.8.29 C语言作业5道
  18. UOS利用系统安装光盘做本地apt源安装软件包
  19. 决策树实例之预测隐形眼镜类型
  20. 《青春》--塞缪尔·厄尔曼

热门文章

  1. iOS语言中的KVO机制
  2. git学习 远程仓库02
  3. oracle桌面工具plsql连接本地远程数据库
  4. ubuntu 开发环境的配置 (转)
  5. 小巧实用的节拍器软件FineMetronome介绍 原创
  6. SpringBoot解决驼峰命名 ---返回Json实体类属性大小写问题
  7. 尚学堂--面向对象2
  8. QT分析之QApplication的初始化
  9. 黑马day11 脏读数据amp;解
  10. Address already in use: JVM_Bindnull:8080