sparksql mysql_使用SparkSQL操作MySQL - Spark入门教程
抽离Common信息
SparkHelper用于获取SparkSession和SparkContext。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* @Auther: majx2
* @Date: 2019-8-2 09:41
* @Description:
*/
public class SparkHelper {
private static SparkSession session = SparkSession.builder().config(getConf()).getOrCreate();
// private static JavaSparkContext context = new JavaSparkContext(getConf());
public static JavaSparkContext getContext(){
return JavaSparkContext.fromSparkContext(session.sparkContext());
// return context;
}
public static SparkSession getSession() {
return session;
}
private static SparkConf getConf(){
final SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local[4]");
// other config
return conf;
}
}
使用SparkSQL操作Mysql数据库
Spark SQL也包括一个可以使用JDBC从其它数据库读取数据的数据源。该功能应该优于使用JdbcRDD,因为它的返回结果是一个DataFrame,而在Spark SQL中DataFrame处理简单,且和其它数据源进行关联操作。
1、连接MySQL读取数据
连接Mysql数据库,全量读取表数据。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
/**
* @Auther: majx2
* @Date: 2019-8-2 10:10
* @Description:
*/
public class SparkMyTest {
private final static String jdbcUrl = "jdbc:mysql://localhost:3306/test?autoReconnect=true&characterEncoding=UTF8&tinyInt1isBit=false&allowMultiQueries=true";
private final static String tableName = "user"; // 表名
private final static String targetTable = "user_target"; // 目标表
public static void main(String[] args) {
final SparkSession session = SparkHelper.getSession();
Dataset dataset = session.read().jdbc(jdbcUrl,tableName,getProperties());
dataset.show();
}
private static Properties getProperties() {
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root"); // 用户名
connectionProperties.put("password", "123456"); // 密码
connectionProperties.put("driver", "com.mysql.jdbc.Driver"); // 数据库驱动
return connectionProperties;
}
}
2、利用lowerBound,upperBound,numPartitions分区读取数据
简明解析:
partitionColumn:分片字段;
lowerBound:下界;
upperBound:上界;
numpartition:分区数
下面例子Spark会转换为:
第一个分区:select * from tablename where test_id <122901;
第二个分区:select * from tablename where test_id >=122901 and id <245802;
第三个分区:select * from tablename where test_id >=245802 and id <368705;
第四个分区:select * from tablename where test_id >= 368705;
Dataset dataset = session.read().jdbc(jdbcUrl, tableName, "test_id", 2, 491606, 4, getProperties());
返回结果:
+-------+---------+------+---+---------+-------------------+----+-----+
|test_id|tenant_id| name|age|test_type| test_date|role|phone|
+-------+---------+------+---+---------+-------------------+----+-----+
| 2| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|
| 3| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 4| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 5| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|
| 6| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 100| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 107| 1| 三毛| 2| 1|2017-02-02 01:01:01| 1|10086|
| 108| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|
| 109| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 110| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 111| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|
| 112| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 113| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 114| 1| 三毛| 2| 1|2017-02-02 01:01:01| 1|10086|
| 115| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|
| 116| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 117| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 118| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|
| 119| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
| 120| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|
+-------+---------+------+---+---------+-------------------+----+-----+
only showing top 20 rows
4、使用条件分区读取数据
修改session.read().jdbc入参条件。每一个条件表示一个分区,利用分布式特性加速数据导入。
Dataset dataset = session.read().jdbc(jdbcUrl, tableName, new String[]{
"tenant_id=2 AND test_id <122901 order by test_id ", // 每个条件表示一个分区
"tenant_id=2 AND test_id >=122901 and test_id <245802 order by test_id",
"tenant_id=2 AND test_id >=245802 and test_id <368705 order by test_id",
"tenant_id=2 AND test_id >= 368705 order by test_id"
}, getProperties());
返回结果:
+-------+---------+------+---+---------+-------------------+----+-----+
|test_id|tenant_id| name|age|test_type| test_date|role|phone|
+-------+---------+------+---+---------+-------------------+----+-----+
| 3| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 4| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 109| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 110| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 116| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 117| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 123| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 124| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 131| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 132| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 138| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 139| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 145| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 146| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 152| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 153| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 162| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 163| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
| 169| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|
| 170| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|
+-------+---------+------+---+---------+-------------------+----+-----+
only showing top 20 rows
5、做个统计
根据name字段统计出相同姓名的总数
// dataset.groupBy("name").count().show();
dataset.createOrReplaceTempView(tableName);
Dataset result = dataset.sqlContext().sql("SELECT name, COUNT(name) count FROM " + tableName + " GROUP BY name");
result.show();
返回结果:
+------+------+
| name| count|
+------+------+
| 王五| 65537|
| 小马| 65537|
| 雷锋|131072|
| 东狗| 65537|
| 三毛| 65535|
|麻花藤| 65536|
+------+------+
6、将结果入库
result.repartition(4) // 访问mysql的并发数/分区数
// SaveMode.Append:在数据源后添加;
// SaveMode.Overwrite:如果如果数据源已经存在记录,则覆盖;
// SaveMode.ErrorIfExists:如果如果数据源已经存在记录,则包异常;
// SaveMode.Ignore:如果如果数据源已经存在记录,则忽略;
.write().mode(SaveMode.Overwrite)
// 类型参考:org.apache.spark.sql.catalyst.parser.AstBuilder#visitPrimitiveDataType
.option("createTableColumnTypes", "name VARCHAR(100),count int") // 设置创建类型,多个以逗号间隔。
.option("createTableOptions", "ENGINE=INNODB DEFAULT CHARSET=utf8")
.option("truncate", "true") // 当savemode是 overwrite时,若dataframe 与原mysql 结构相同,则只truncate mysql,不会重新建表
.jdbc(jdbcUrl,targetTable,getProperties());
新建目标表
查看目标结果
转换Dataset
想要得到一个不一样的DataSet结构怎么办?
private static void testMy() {
final SparkSession session = SparkHelper.getSession();
Dataset report = session.read().jdbc(SparkHelper.jdbcUrl, "f_order_report_701", SparkHelper.getProperties());
Dataset relation = session.read().jdbc(SparkHelper.jdbcUrl, "f_sales_order_relation", SparkHelper.getProperties());
relation = relation.select("settle_date", "sales_order_id", "ou_id").distinct();
Dataset result = report.join(relation,
relation.col("sales_order_id").equalTo(report.col("sales_order_id")).and(
relation.col("ou_id").equalTo(report.col("ou_id")))
, "left").drop(relation.col("ou_id")).drop(relation.col("sales_order_id"))
.selectExpr(ArrayUtil.append(FIELDS,"if(write_off_amount = settlement_amount,1,0) as write_off_status"));
JavaRDD rdd = result.javaRDD().map(p -> {
Map map = Maps.newHashMap();
Lists.newArrayList(ArrayUtil.append(FIELDS, "write_off_status")).forEach(f -> map.put(StrUtil.toCamelCase(f), p.getAs(f)));
Map newMap = MapUtil.sort(map);
return RowFactory.create(p.getAs("id").toString(), JSON.toJSONString(newMap));
});
Dataset dataset = SparkHelper.getSession().createDataFrame(rdd, SparkHelper.getStructType());
dataset.show(3);
log.info(dataset.head().toString());
}
返回结果:
+---+--------------------+
| id| entity|
+---+--------------------+
| 69|{"accountTime":15...|
| 54|{"applyType":3,"c...|
| 68|{"applyType":1,"c...|
+---+--------------------+
only showing top 3 rows
2019-08-06 15:40:46.367 [main] [INFO ] [c.m.e.f.d.common.spark.SparkMyTest] - [69,{"accountTime":1563379200000,"applyType":5,"cateId":"1","createTime":1563420070000,"customerCode":"C0033688","franchiser":"0","id":69,"isAdjust":0,"itemId":"31031050000042","orderAttribute":2,"ouId":701,"outerOrderId":"920190718004","outerStoreCode":"SHC44F4140","salesCenterCode":"3608380682-1","salesChannel":2,"salesOrderId":"OM19071800000003","settlementAmount":699.00,"shipedReturnTime":1563420061000,"shopId":2,"status":5,"writeOffAmount":0.00,"writeOffStatus":0}]
The End !
sparksql mysql_使用SparkSQL操作MySQL - Spark入门教程相关推荐
- pythonmysql数据分析_Python操作Mysql数据库入门——数据导入pandas(数据分析准备)...
原标题:Python操作Mysql数据库入门--数据导入pandas(数据分析准备) 欢迎关注天善智能 hellobi.com,我们是专注于商业智能BI,大数据,数据分析领域的垂直社区,学习.问答.求 ...
- mysql数据库入门教程(5):多表操作(连接查询,子查询,分页查询,联合查询)
前文介绍了单表查询:mysql数据库入门教程(4):查询讲解大全 今天介绍下多表查询 一.连接查询 含义:又称多表查询,当查询的字段来自于多个表时,就会用到连接查询 先送上下面所讲用到的sql脚本 h ...
- MySQL 快速入门教程
转:MySQL快速 入门教程 目录 一.MySQL的相关概念介绍 二.Windows下MySQL的配置 配置步骤 MySQL服务的启动.停止与卸载 三.MySQL脚本的基本组成 四.MySQL中的数据 ...
- mysql数据库入门教程(11):视图讲解大全
一.视图的介绍 含义:虚拟表,和普通表一样使用 mysql5.1版本出现的新特性,是通过表动态生成的数据 举例说明什么是视图:假设一个年级有10个班,上面有领导来啦,说要检查舞蹈功底,学校为了应付检查 ...
- mysql数据库入门教程(6):数据的增删改
前面两篇博文介绍了数据库的查询 mysql数据库入门教程(4):查询讲解大全 mysql数据库入门教程(5):多表操作(连接查询,子查询,分页查询,联合查询) 今天介绍下数据库的增删改. 数据库基本操 ...
- mysql 经典入门教程_MySQL 经典入门教程
MySQL 经典入门教程 1 定义 数据库中的表:一行叫一条记录.每一列叫一个属性,或一个字段. 主键:表中的某个特殊字段,具有唯一的确定的值,可以根据该字段唯一的确定一条记录 外键:表中的某个字段的 ...
- MySQL简单入门教程
http://www.ahnu.edu.cn/homepage/info_read.php?id=66183&cata_0=%B0%B2%BB%D5%CA%A6%B7%B6%B4%F3%D1% ...
- python docx 字体大小_Python操作Word的入门教程
Python操作Word的入门教程 前言 今天来介绍下,如何用 Python 来操作 Word. 再来介绍操作 Word 之前,先来说一个最近看书学到的法则,即 3W 法则. 3W:3W分别指 Wha ...
- python操作word详细操作_Python操作Word的入门教程
Python操作Word的入门教程 前言 今天来介绍下,如何用 Python 来操作 Word. 再来介绍操作 Word 之前,先来说一个最近看书学到的法则,即 3W 法则. 3W:3W分别指 Wha ...
- 比读文档更易上手的Spark入门教程来啦!
Spark 开创至今,已经走过了近 12 年.12 年间,时代的脚步不断前进,我们看到互联网不断发展,各种初创公司崭露头角,在公司日常业务中需要处理的数据量也飞速增长.数据中心也从云下逐渐迁往云上,从 ...
最新文章
- python下载过程中最后一步执行opencv出错怎么回事_如何修复python中opencv中的错误“QObject::moveToThread:”?...
- Luogu_2774 方格取数问题
- Linux内核线程kernel thread详解--Linux进程的管理与调度(十)【转】
- CodeForces 1058C C. Vasya and Golden Ticket
- 解决IDEA中的Terminal工具无法识别git或者npm等命令
- 如何使用CSS将文本垂直居中?
- MyBatis中字符串拼接实现模糊查询的sql
- 开课吧课堂之如何使用多重catch语句
- crypto 乱码_base64编码加密解密程序,输出有乱码,为什么?
- 2019 related conferences 相关会议 ISMAR, VRST, UIST
- kodexplor类似php,Windows 下搭建 PHP + Nginx + KODExplorer
- xp系统网上邻居看不到局域网电脑_win7系统网上邻居看不到局域网中其他电脑的解决方法...
- swift WkWebView的返回,goback,跳过同级
- 论文阅读——A Comprehensive Study on Deep Learning-Based 3D Hand Pose Estimation Methods综述阅读2
- ME54N 采购申请审批
- 附近的人打招呼V1.0
- 四级地址库 国家标准的行政区划代码 省市区街道
- Python写入Excel格式和颜色
- 怎么把mp4转换成gif格式?在线转换方法
- pip如何适应多版本的python