摘要:实践解析如何利用SarkSQL高并发进行读取数据库和存储数据到数据库。

本文分享自华为云社区《SarkSQL高并发读取数据库和存储数据到数据库》,作者:Copy工程师 。

1. SparkSql 高并发读取数据库

SparkSql连接数据库读取数据给了三个API:

//Construct a DataFrame representing the database table accessible via JDBC URL url named table and connection properties.
Dataset<Row>  jdbc(String url, String table, java.util.Properties properties)
//Construct a DataFrame representing the database table accessible via JDBC URL url named table using connection properties.
Dataset<Row>  jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
//Construct a DataFrame representing the database table accessible via JDBC URL url named table.
Dataset<Row>  jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)

三个API介绍:

  1. 单个分区,单个task执行,无并发

遇到数据量很大的表,抽取速度慢。

实例:

SparkSession sparkSession = SparkSession.builder().appName("SPARK_FENGDING_TASK1").master("local").config("spark.testing.memory", 471859200).getOrCreate();
// 配置连接属性
Properties dbProps = new Properties();
dbProps.put("user","user");
dbProps.put("password","pwd");
dbProps.put("driver","oracle.jdbc.driver.OracleDriver");
// 连接数据库 获取数据 要使用自己的数据库连接串
Dataset<Row> tableDf = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", dbProps);
// 返回1
tableDf.rdd().getPartitions();

该API的并发数为1,单分区,不管你留给该任务节点多少资源,都只有一个task执行任务

2. 任意字段分区

该API是第二个API,根据设置的分层条件设置并发度:

def jdbc(url: String,table: String,predicates: Array[String], #这个是分层的条件,一个数组connectionProperties: Properties): DataFrame = {val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>JDBCPartition(part, i) : Partition}jdbc(url, table, parts, connectionProperties)
}

实例:

// 设置分区条件 通过入库时间 把 10月和11月 的数据 分两个分区
String[] patitions = {"rksj >= '1569859200' and rksj < '1572537600'","rksj >= '1572537600' and rksj < '1575129600'"};
// 根据StudentId 分15个分区,就会有15个task抽取数据
Dataset<Row> tableDf3 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO",patitions,dbProps);
// 返回2
tableDf3.rdd().getPartitions();

该API操作相对自由,就是设置分区条件麻烦一点。

3. 根据Long类型字段分区

该API是第三个API,根据设置的分区数并发抽取数据:

def jdbc(url: String,table: String,columnName: String,    # 根据该字段分区,需要为整形,比如id等lowerBound: Long,      # 分区的下界upperBound: Long,      # 分区的上界numPartitions: Int,    # 分区的个数connectionProperties: Properties): DataFrame = {val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)val parts = JDBCRelation.columnPartition(partitioning)jdbc(url, table, parts, connectionProperties)
}

实例:

// 根据StudentId 分15个分区,就会有15个task抽取数据
Dataset<Row> tableDf2 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", "studentId",0,1500,15,dbProps);
// 返回10
tableDf2.rdd().getPartitions();

该操作根据分区数设置并发度,缺点是只能用于Long类型字段。

2. 存储数据到数据库

存储数据库API给了Class DataFrameWriter<T>类,该类有存储到文本,Hive,数据库的API。这里只说数据库的API,提一句,如果保存到Text格式,只支持保存一列。。。就很难受。

实例:

有三种写法

// 第一张写法,指定format类型,使用save方法存储数据库
jdbcDF.write().format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").save();
// 第二种写法 使用jdbc写入数据库
jdbcDF2.write().jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);// 第三种写法,也是使用jdbc,只不过添加createTableColumnTypes,创建表的时候使用该属性字段创建表字段
jdbcDF.write().option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

当我们的表已经存在的时候,使用上面的语句就会报错表已存在,这是因为我们没有指定存储模式,默认是ErrorIfExists

保存模式:

所以一般都是这样用:

tableDf3.write().mode(SaveMode.Append).jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", connectionProperties);

对于connectionProperties还有很多其他选项:

这里面的truncate就是说当使用SaveMode.Overwrite的时候,设置truncate为true,就会对表进行truncate语句清理表,不再是删除表在重建表的操作。

点击关注,第一时间了解华为云新鲜技术~

SparkSQL高并发:读取存储数据库相关推荐

  1. 高并发场景下数据库的常见问题及解决方案

    一.分库分表 (1)为什么要分库分表 随着系统访问量的增加,QPS越来越高,数据库磁盘容量不断增加,一般数据库服务器的QPS在800-1200的时候性能最佳,当超过2000的时候sql就会变得很慢并且 ...

  2. 高并发系列:存储优化之也许可能是史上最详尽的分库分表文章之一

    趣味性不强,但知识性很强,建议耐心看或者先收藏 本文内容预览: 库表会在哪天到达瓶颈? 1.1 苏宁拼购百万级库表拆分之前 1.2 京东配运平台库表拆分之前 1.3 大众点评订单库拆分之前 1.4 小 ...

  3. 大数据量高并发访问的数据库优化方法

    一.数据库结构的设计 如果不能设计一个合理的数据库模型,不仅会增加客户端和服务器段程序的编程和维护的难度,而且将会影响系统实际运行的性能.所以,在一个系统开始实施之前,完备的数据库模型的设计是必须的. ...

  4. 多线程高并发编程MySQL数据库处理

    本文作者:陈进坚 个人博客:https://jian1098.github.io CSDN博客:https://blog.csdn.net/c_jian 简书:https://www.jianshu. ...

  5. 面对高并发,大数据 数据库分区、分表、分库,读写分离

    https://blog.csdn.net/liangz/article/details/79352870 https://www.cnblogs.com/sunny3096/p/8595058.ht ...

  6. 快手春节红包背后,高并发存储架构设计

    导语 | 2020年春节,腾讯云文件存储(CFS)在通过了预演层层压测的选拔后成为快手广告推荐业务的护旗手,以100%的可用性护航了快手春节红包活动.本文是腾讯云高级工程师陈宏亮在「云加社区沙龙onl ...

  7. 如何设计一个高并发的存储系统

    1. 如何设计一个高并发的系统 ① 数据库的优化,包括合理的事务隔离级别.SQL语句优化.索引的优化 ② 使用缓存,尽量减少数据库 IO ③ 分布式数据库.分布式缓存 ④ 服务器的负载均衡 2. 锁的 ...

  8. PHP高并发商品秒杀问题的解决方案

    前言 秒杀会产生一个瞬间的高并发,使用数据库会增加数据库的访问压力,也会降低访问速度,所以我们应该使用缓存,来降低数据库的访问压力: 可以看出这里的操作和原来的下单是不一样的:产生的秒杀预订单不会马上 ...

  9. 生产者和消费者代码———操作系统_kafka如何保证高并发(从生产者、消费者角度)...

    内容梗概: 系统缓存+顺序写+批处理+mmap(生产者角度-高并发写入) 零拷贝技术(消费者角度-高并发读取) Kafka在生产者写入消息的时候会将数据最终写入磁盘,既然它是基于磁盘读写,那么频繁的I ...

最新文章

  1. ----移动端移动端调试神器vConsole----
  2. 苹果芯片工程师又被挖!这次是微软,要自研Azure服务器芯片
  3. 温州大学《机器学习》课程课件(九、支持向量机)
  4. 美国伊利诺伊大学香槟分校计算机专业,伊利诺伊大学香槟分校计算机科学排名第7(2020年TFE美国排名)...
  5. vue-cli3中的vue.config.js配置
  6. YUV422格式信号格式(以备学习之用)
  7. vaex 处理海量数据_Vaex真香!几秒钟就能处理数十亿行数据,比Pandas、Dask更好用...
  8. ip地址和MAC地址的捆绑
  9. Bailian2807 两倍【序列】
  10. thinkphp 表单令牌
  11. 《腾讯Android自动化测试实战》— Android 书籍
  12. 12.Linux 高性能服务器编程 --- 高性能 IO 框架库 Libevent
  13. K - 迷宫问题 POJ - 3984(广度搜索)
  14. 松下FP-XH系列PLC 断电保持寄存器使用注意事项
  15. 美图秀秀 web开发图片编辑器
  16. 电脑文件夹加密怎么做?6步教你设置文件夹密码
  17. TscanCode代码扫描工具
  18. 回文素数(10亿)—— unfinished
  19. mt6799芯片资料mt6799参考设计资料
  20. Android 上传头像(文件)到服务器

热门文章

  1. apache web_Web发明家预测文化将发生变化,Apache推动一半的互联网发展,等等
  2. (46)HTML网页开发流程
  3. 枚举类型 实现一个enumeration对于类的加工的函数
  4. Bootstrap两端对齐的按钮组
  5. ES6规格之数组的空位
  6. mysql5.7.17主从_mysql5.7.17主从同步配置
  7. android 休眠唤醒驱动流程分析,Android4.0.4休眠唤醒机制分析(基于MSM8260)
  8. 为什么hbase里没有表会显示表已经存在_0712-6.2.0-HBase快照异常
  9. 数据库自增主键用完了怎么办
  10. python--编码问题