Spark SQL External Data Sources JDBC官方实现写测试
通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。
jdbc.scala重要API介绍:
/*** Save this RDD to a JDBC database at `url` under the table name `table`.* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.* If you pass `true` for `allowExisting`, it will drop any table with the* given name; if you pass `false`, it will throw if the table already* exists.*/ def createJDBCTable(url: String, table: String, allowExisting: Boolean) /*** Save this RDD to a JDBC database at `url` under the table name `table`.* Assumes the table already exists and has a compatible schema. If you* pass `true` for `overwrite`, it will `TRUNCATE` the table before* performing the `INSERT`s.** The table must already exist on the database. It must have a schema* that is compatible with the schema of this RDD; inserting the rows of* the RDD in order via the simple statement* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.*/ def insertIntoJDBC(url: String, table: String, overwrite: Boolean)
import org.apache.spark.sql.SQLContext import org.apache.spark.sql.Row import org.apache.spark.sql.types._val sqlContext = new SQLContext(sc) import sqlContext._#数据准备 val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222)) val arr1x2 = Array[Row](Row.apply("fred", 3)) val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2)) val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil) import org.apache.spark.sql.jdbc._================================CREATE====================================== val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)srdd.createJDBCTable(url, "person", false) sqlContext.jdbcRDD(url, "person").collect.foreach(println) [dave,42] [mary,222]==============================CREATE with overwrite======================================== val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3) srdd.createJDBCTable(url, "person2", false) sqlContext.jdbcRDD(url, "person2").collect.foreach(println) [mary,222,2] [dave,42,1]val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2) srdd2.createJDBCTable(url, "person2", true) sqlContext.jdbcRDD(url, "person2").collect.foreach(println) [fred,3]================================CREATE then INSERT to append====================================== val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2) val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2) srdd.createJDBCTable(url, "person3", false) sqlContext.jdbcRDD(url, "person3").collect.foreach(println) [mary,222] [dave,42]srdd2.insertIntoJDBC(url, "person3", false) sqlContext.jdbcRDD(url, "person3").collect.foreach(println) [mary,222] [dave,42] [fred,3]================================CREATE then INSERT to truncate====================================== val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2) val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)srdd.createJDBCTable(url, "person4", false) sqlContext.jdbcRDD(url, "person4").collect.foreach(println) [dave,42] [mary,222]srdd2.insertIntoJDBC(url, "person4", true) [fred,3]================================Incompatible INSERT to append====================================== val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2) val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3) srdd.createJDBCTable(url, "person5", false) srdd2.insertIntoJDBC(url, "person5", true)java.sql.SQLException: Column count doesn't match value count at row 1
Spark SQL External Data Sources JDBC官方实现写测试相关推荐
- Spark SQL Guide——Data Sources
文章目录 Parquet Files Partition Discovery(解析分区信息) Schema Merging(模式归并) Hive metastore Parquet table con ...
- Spark SQL: Relational Data Processing in Spark
Spark SQL: Relational Data Processing in Spark Spark SQL : Spark中关系型处理模块 说明: 类似这样的说明并非是原作者的内容翻译,而是本篇 ...
- Spark SQL External DataSource外部数据源操作流程
一:获取文件 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html spark本身 有测试 ...
- spark SQL Running the Thrift JDBC/ODBC server
Running the Thrift JDBC/ODBC server 1:运行 ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrif ...
- Spark SQL External DataSource外部数据源
一:介绍 官网:https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html 随着Spark1.2的发布 ...
- Spark SQL玩起来
标签(空格分隔): Spark [toc] 前言 Spark SQL的介绍只包含官方文档的Getting Started.DataSource.Performance Tuning和Distribut ...
- Spark SQL 快速入门系列(五)SparkSQL 访问 Hive
文章目录 访问 Hive SparkSQL 整合 Hive 访问 Hive 表 idea实现SparkSQL连接hive 访问 Hive 导读 1,整合 SparkSQL 和 Hive, 使用 Hiv ...
- Spark SQL 核心编程
文章目录 Spark SQL 核心编程 1.新的起点 2.SQL 语法 1) 读取 json 文件创建 DataFrame 2) 对 DataFrame 创建一个临时表 3) 通过SQL语句实现查询全 ...
- Spark SQL操作多数据源
Spark SQL支持通过DataFrame接口操作的多种不同的数据源.DataFrame提供支持统一的接口加载和保存数据源中的数据,包括:结构化数据,Parquet文件,JSON文件,Hive表 , ...
- 大数据之Spark(四):Spark SQL
一.SparkSQL的发展 1.1 概述 SparkSQL是Spark⽣态体系中的构建在SparkCore基础之上的⼀个基于SQL的计算模块. SparkSQL的前身不叫SparkSQL,⽽叫Shar ...
最新文章
- sqlserver mysql 乱码_SQLServer数据库如何解决中文乱码问题?方法有哪些?
- mysql php ajax_PHP 和 AJAX MySQL 数据库实例
- android百度定位没反应,百度定位回调无反应,第一次能回调到数据,第二次无反应...
- 程序员须掌握的大数据分析核心技术
- getchar()不停止原因
- 2.支付平台架构:业务、规划、设计与实现 --- 收银台系统
- 【优化算法】量子遗传优化算法(QGA)【含Matlab源码 1123期】
- hive 指定字段插入数据_Hive插入数据的几种常用方法
- yoga710怎么进入bios_联想笔记本怎么进入BIOS联想手提电脑进BIOS方法汇总
- js 浏览器语音播报
- 【风险管理】(第二篇)风险管理分析及预测方法
- java jmf播放mp3_关于jmf不能播放mp3的问题解决
- 【Word】Word更改默认模板样式——使用自定义模板【以Windows10+Word2019为例】
- java drawstring 模糊_Java绘制文字质量太低的解决方案?
- 使用非对称加密匿名加好友
- linux下paste、diff、meld的使用
- 22.8.29 C语言作业5道
- UOS利用系统安装光盘做本地apt源安装软件包
- 决策树实例之预测隐形眼镜类型
- 《青春》--塞缪尔·厄尔曼