抽离Common信息

SparkHelper用于获取SparkSession和SparkContext。

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.SparkSession;

/**

* @Auther: majx2

* @Date: 2019-8-2 09:41

* @Description:

*/

public class SparkHelper {

private static SparkSession session = SparkSession.builder().config(getConf()).getOrCreate();

// private static JavaSparkContext context = new JavaSparkContext(getConf());

public static JavaSparkContext getContext(){

return JavaSparkContext.fromSparkContext(session.sparkContext());

// return context;

}

public static SparkSession getSession() {

return session;

}

private static SparkConf getConf(){

final SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local[4]");

// other config

return conf;

}

}

使用SparkSQL操作Mysql数据库

Spark SQL也包括一个可以使用JDBC从其它数据库读取数据的数据源。该功能应该优于使用JdbcRDD,因为它的返回结果是一个DataFrame,而在Spark SQL中DataFrame处理简单,且和其它数据源进行关联操作。

1、连接MySQL读取数据

连接Mysql数据库,全量读取表数据。

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**

* @Auther: majx2

* @Date: 2019-8-2 10:10

* @Description:

*/

public class SparkMyTest {

private final static String jdbcUrl = "jdbc:mysql://localhost:3306/test?autoReconnect=true&characterEncoding=UTF8&tinyInt1isBit=false&allowMultiQueries=true";

private final static String tableName = "user"; // 表名

private final static String targetTable = "user_target"; // 目标表

public static void main(String[] args) {

final SparkSession session = SparkHelper.getSession();

Dataset dataset = session.read().jdbc(jdbcUrl,tableName,getProperties());

dataset.show();

}

private static Properties getProperties() {

Properties connectionProperties = new Properties();

connectionProperties.put("user", "root"); // 用户名

connectionProperties.put("password", "123456"); // 密码

connectionProperties.put("driver", "com.mysql.jdbc.Driver"); // 数据库驱动

return connectionProperties;

}

}

2、利用lowerBound,upperBound,numPartitions分区读取数据

简明解析:

partitionColumn:分片字段;

lowerBound:下界;

upperBound:上界;

numpartition:分区数

下面例子Spark会转换为:

第一个分区:select * from tablename where test_id <122901;

第二个分区:select * from tablename where test_id >=122901 and id <245802;

第三个分区:select * from tablename where test_id >=245802 and id <368705;

第四个分区:select * from tablename where test_id >= 368705;

Dataset dataset = session.read().jdbc(jdbcUrl, tableName, "test_id", 2, 491606, 4, getProperties());

返回结果:

+-------+---------+------+---+---------+-------------------+----+-----+

|test_id|tenant_id| name|age|test_type| test_date|role|phone|

+-------+---------+------+---+---------+-------------------+----+-----+

| 2| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|

| 3| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 4| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 5| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|

| 6| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|

| 100| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|

| 107| 1| 三毛| 2| 1|2017-02-02 01:01:01| 1|10086|

| 108| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|

| 109| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 110| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 111| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|

| 112| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|

| 113| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|

| 114| 1| 三毛| 2| 1|2017-02-02 01:01:01| 1|10086|

| 115| 1| 小马| 1| 1|2017-03-03 01:01:01| 1|10000|

| 116| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 117| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 118| 1| 王五| 2| 1|2017-03-03 01:01:01| 1|10010|

| 119| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|

| 120| 1| 雷锋| 1| 1|2017-01-01 01:01:01| 1|10010|

+-------+---------+------+---+---------+-------------------+----+-----+

only showing top 20 rows

4、使用条件分区读取数据

修改session.read().jdbc入参条件。每一个条件表示一个分区,利用分布式特性加速数据导入。

Dataset dataset = session.read().jdbc(jdbcUrl, tableName, new String[]{

"tenant_id=2 AND test_id <122901 order by test_id ", // 每个条件表示一个分区

"tenant_id=2 AND test_id >=122901 and test_id <245802 order by test_id",

"tenant_id=2 AND test_id >=245802 and test_id <368705 order by test_id",

"tenant_id=2 AND test_id >= 368705 order by test_id"

}, getProperties());

返回结果:

+-------+---------+------+---+---------+-------------------+----+-----+

|test_id|tenant_id| name|age|test_type| test_date|role|phone|

+-------+---------+------+---+---------+-------------------+----+-----+

| 3| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 4| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 109| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 110| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 116| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 117| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 123| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 124| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 131| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 132| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 138| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 139| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 145| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 146| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 152| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 153| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 162| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 163| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

| 169| 2|麻花藤| 1| 1|2017-03-03 01:01:01| 1|10000|

| 170| 2| 东狗| 2| 1|2017-03-03 01:01:01| 1|10086|

+-------+---------+------+---+---------+-------------------+----+-----+

only showing top 20 rows

5、做个统计

根据name字段统计出相同姓名的总数

// dataset.groupBy("name").count().show();

dataset.createOrReplaceTempView(tableName);

Dataset result = dataset.sqlContext().sql("SELECT name, COUNT(name) count FROM " + tableName + " GROUP BY name");

result.show();

返回结果:

+------+------+

| name| count|

+------+------+

| 王五| 65537|

| 小马| 65537|

| 雷锋|131072|

| 东狗| 65537|

| 三毛| 65535|

|麻花藤| 65536|

+------+------+

6、将结果入库

result.repartition(4) // 访问mysql的并发数/分区数

// SaveMode.Append:在数据源后添加;

// SaveMode.Overwrite:如果如果数据源已经存在记录,则覆盖;

// SaveMode.ErrorIfExists:如果如果数据源已经存在记录,则包异常;

// SaveMode.Ignore:如果如果数据源已经存在记录,则忽略;

.write().mode(SaveMode.Overwrite)

// 类型参考:org.apache.spark.sql.catalyst.parser.AstBuilder#visitPrimitiveDataType

.option("createTableColumnTypes", "name VARCHAR(100),count int") // 设置创建类型,多个以逗号间隔。

.option("createTableOptions", "ENGINE=INNODB DEFAULT CHARSET=utf8")

.option("truncate", "true") // 当savemode是 overwrite时,若dataframe 与原mysql 结构相同,则只truncate mysql,不会重新建表

.jdbc(jdbcUrl,targetTable,getProperties());

新建目标表

查看目标结果

转换Dataset

想要得到一个不一样的DataSet结构怎么办?

private static void testMy() {

final SparkSession session = SparkHelper.getSession();

Dataset report = session.read().jdbc(SparkHelper.jdbcUrl, "f_order_report_701", SparkHelper.getProperties());

Dataset relation = session.read().jdbc(SparkHelper.jdbcUrl, "f_sales_order_relation", SparkHelper.getProperties());

relation = relation.select("settle_date", "sales_order_id", "ou_id").distinct();

Dataset result = report.join(relation,

relation.col("sales_order_id").equalTo(report.col("sales_order_id")).and(

relation.col("ou_id").equalTo(report.col("ou_id")))

, "left").drop(relation.col("ou_id")).drop(relation.col("sales_order_id"))

.selectExpr(ArrayUtil.append(FIELDS,"if(write_off_amount = settlement_amount,1,0) as write_off_status"));

JavaRDD rdd = result.javaRDD().map(p -> {

Map map = Maps.newHashMap();

Lists.newArrayList(ArrayUtil.append(FIELDS, "write_off_status")).forEach(f -> map.put(StrUtil.toCamelCase(f), p.getAs(f)));

Map newMap = MapUtil.sort(map);

return RowFactory.create(p.getAs("id").toString(), JSON.toJSONString(newMap));

});

Dataset dataset = SparkHelper.getSession().createDataFrame(rdd, SparkHelper.getStructType());

dataset.show(3);

log.info(dataset.head().toString());

}

返回结果:

+---+--------------------+

| id| entity|

+---+--------------------+

| 69|{"accountTime":15...|

| 54|{"applyType":3,"c...|

| 68|{"applyType":1,"c...|

+---+--------------------+

only showing top 3 rows

2019-08-06 15:40:46.367 [main] [INFO ] [c.m.e.f.d.common.spark.SparkMyTest] - [69,{"accountTime":1563379200000,"applyType":5,"cateId":"1","createTime":1563420070000,"customerCode":"C0033688","franchiser":"0","id":69,"isAdjust":0,"itemId":"31031050000042","orderAttribute":2,"ouId":701,"outerOrderId":"920190718004","outerStoreCode":"SHC44F4140","salesCenterCode":"3608380682-1","salesChannel":2,"salesOrderId":"OM19071800000003","settlementAmount":699.00,"shipedReturnTime":1563420061000,"shopId":2,"status":5,"writeOffAmount":0.00,"writeOffStatus":0}]

The End !

sparksql mysql_使用SparkSQL操作MySQL - Spark入门教程相关推荐

  1. pythonmysql数据分析_Python操作Mysql数据库入门——数据导入pandas(数据分析准备)...

    原标题:Python操作Mysql数据库入门--数据导入pandas(数据分析准备) 欢迎关注天善智能 hellobi.com,我们是专注于商业智能BI,大数据,数据分析领域的垂直社区,学习.问答.求 ...

  2. mysql数据库入门教程(5):多表操作(连接查询,子查询,分页查询,联合查询)

    前文介绍了单表查询:mysql数据库入门教程(4):查询讲解大全 今天介绍下多表查询 一.连接查询 含义:又称多表查询,当查询的字段来自于多个表时,就会用到连接查询 先送上下面所讲用到的sql脚本 h ...

  3. MySQL 快速入门教程

    转:MySQL快速 入门教程 目录 一.MySQL的相关概念介绍 二.Windows下MySQL的配置 配置步骤 MySQL服务的启动.停止与卸载 三.MySQL脚本的基本组成 四.MySQL中的数据 ...

  4. mysql数据库入门教程(11):视图讲解大全

    一.视图的介绍 含义:虚拟表,和普通表一样使用 mysql5.1版本出现的新特性,是通过表动态生成的数据 举例说明什么是视图:假设一个年级有10个班,上面有领导来啦,说要检查舞蹈功底,学校为了应付检查 ...

  5. mysql数据库入门教程(6):数据的增删改

    前面两篇博文介绍了数据库的查询 mysql数据库入门教程(4):查询讲解大全 mysql数据库入门教程(5):多表操作(连接查询,子查询,分页查询,联合查询) 今天介绍下数据库的增删改. 数据库基本操 ...

  6. mysql 经典入门教程_MySQL 经典入门教程

    MySQL 经典入门教程 1 定义 数据库中的表:一行叫一条记录.每一列叫一个属性,或一个字段. 主键:表中的某个特殊字段,具有唯一的确定的值,可以根据该字段唯一的确定一条记录 外键:表中的某个字段的 ...

  7. MySQL简单入门教程

    http://www.ahnu.edu.cn/homepage/info_read.php?id=66183&cata_0=%B0%B2%BB%D5%CA%A6%B7%B6%B4%F3%D1% ...

  8. python docx 字体大小_Python操作Word的入门教程

    Python操作Word的入门教程 前言 今天来介绍下,如何用 Python 来操作 Word. 再来介绍操作 Word 之前,先来说一个最近看书学到的法则,即 3W 法则. 3W:3W分别指 Wha ...

  9. python操作word详细操作_Python操作Word的入门教程

    Python操作Word的入门教程 前言 今天来介绍下,如何用 Python 来操作 Word. 再来介绍操作 Word 之前,先来说一个最近看书学到的法则,即 3W 法则. 3W:3W分别指 Wha ...

  10. 比读文档更易上手的Spark入门教程来啦!

    Spark 开创至今,已经走过了近 12 年.12 年间,时代的脚步不断前进,我们看到互联网不断发展,各种初创公司崭露头角,在公司日常业务中需要处理的数据量也飞速增长.数据中心也从云下逐渐迁往云上,从 ...

最新文章

  1. python下载过程中最后一步执行opencv出错怎么回事_如何修复python中opencv中的错误“QObject::moveToThread:”?...
  2. Luogu_2774 方格取数问题
  3. Linux内核线程kernel thread详解--Linux进程的管理与调度(十)【转】
  4. CodeForces 1058C C. Vasya and Golden Ticket
  5. 解决IDEA中的Terminal工具无法识别git或者npm等命令
  6. 如何使用CSS将文本垂直居中?
  7. MyBatis中字符串拼接实现模糊查询的sql
  8. 开课吧课堂之如何使用多重catch语句
  9. crypto 乱码_base64编码加密解密程序,输出有乱码,为什么?
  10. 2019 related conferences 相关会议 ISMAR, VRST, UIST
  11. kodexplor类似php,Windows 下搭建 PHP + Nginx + KODExplorer
  12. xp系统网上邻居看不到局域网电脑_win7系统网上邻居看不到局域网中其他电脑的解决方法...
  13. swift WkWebView的返回,goback,跳过同级
  14. 论文阅读——A Comprehensive Study on Deep Learning-Based 3D Hand Pose Estimation Methods综述阅读2
  15. ME54N 采购申请审批
  16. 附近的人打招呼V1.0
  17. 四级地址库 国家标准的行政区划代码 省市区街道
  18. Python写入Excel格式和颜色
  19. 怎么把mp4转换成gif格式?在线转换方法
  20. pip如何适应多版本的python

热门文章

  1. codewars练习js2021/5/67891026
  2. 2048C语言源码linux
  3. 导入Android 项目,按钮点击事件,添加okhttp三合一,android动态修改标题
  4. D3D9学习笔记之基础几何体的深入应用(一)
  5. 装黑苹果读条到一半重启_安装黑苹果完成,重启卡进度条
  6. PAT 1072 开学寄语
  7. vmware启动sda assuming drive cache黑屏
  8. ssdt函数索引号_【转】SSDT索引号的获取
  9. chrome的APP模式与全屏模式 --app --kiosk
  10. 如何让学习像打游戏一样具有成瘾性