Spark踩坑记——数据库(Hbase+Mysql)转
转自:http://www.cnblogs.com/xlturing/p/spark.html
前言
在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值。最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己踩到的一些坑进行记录。
Spark Streaming持久化设计模式
DStreams输出操作
- print:打印driver结点上每个Dstream中的前10个batch元素,常用于开发和调试
- saveAsTextFiles(prefix, [suffix]):将当前Dstream保存为文件,每个interval batch的文件名命名规则基于prefix和suffix:"prefix-TIME_IN_MS[.suffix]".
- saveAsObjectFiles(prefix, [suffix]):将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
- saveAsHadoopFiles(prefix, [suffix]):将Dstream以hadoop文件的形式进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
- foreachRDD(func):最通用的输出操作,可以对从数据流中产生的每一个RDD应用函数_fun_。通常_fun_会将每个RDD中的数据保存到外部系统,如:将RDD保存到文件,或者通过网络连接保存到数据库。值得注意的是:_fun_执行在跑应用的driver进程中,并且通常会包含RDD action以促使数据流RDD开始计算。
使用foreachRDD的设计模式
dstream.foreachRDD对于开发而言提供了很大的灵活性,但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是:建立远程连接->通过连接传输数据到远程系统->关闭连接。针对这个流程我们很直接的想到了下面的程序代码:
dstream.foreachRDD { rdd =>val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }
在spark踩坑记——初试中,对spark的worker和driver进行了整理,我们知道在集群模式下,上述代码中的connection需要通过序列化对象的形式从driver发送到worker,但是connection是无法在机器之间传递的,即connection是无法序列化的,这样可能会引起_serialization errors (connection object not serializable)_的错误。为了避免这种错误,我们将conenction在worker当中建立,代码如下:
dstream.foreachRDD { rdd =>rdd.foreach { record =>val connection = createNewConnection()connection.send(record)connection.close() } }
似乎这样问题解决了?但是细想下,我们在每个rdd的每条记录当中都进行了connection的建立和关闭,这会导致不必要的高负荷并且降低整个系统的吞吐量。所以一个更好的方式是使用_rdd.foreachPartition_即对于每一个rdd的partition建立唯一的连接(注:每个partition是内的rdd是运行在同一worker之上的),代码如下:
dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()}
}
这样我们降低了频繁建立连接的负载,通常我们在连接数据库时会使用连接池,把连接池的概念引入,代码优化如下:
dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>// ConnectionPool is a static, lazily initialized pool of connectionsval connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => connection.send(record))ConnectionPool.returnConnection(connection) // return to the pool for future reuse}
}
通过持有一个静态连接池对象,我们可以重复利用connection而进一步优化了连接建立的开销,从而降低了负载。另外值得注意的是,同数据库的连接池类似,我们这里所说的连接池同样应该是lazy的按需建立连接,并且及时的收回超时的连接。
另外值得注意的是:
- 如果在spark streaming中使用了多次foreachRDD,它们之间是按照程序顺序向下执行的
- Dstream对于输出操作的执行策略是lazy的,所以如果我们在foreachRDD中不添加任何RDD action,那么系统仅仅会接收数据然后将数据丢弃。
Spark访问Hbase
上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式,这里我们阐述如何将Dstream输出到Hbase集群。
Hbase通用连接类
Scala连接Hbase是通过zookeeper获取信息,所以在配置时需要提供zookeeper的相关信息,如下:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.client.ConnectionFactory object HbaseUtil extends Serializable { private val conf = HBaseConfiguration.create() private val para = Conf.hbaseConfig // Conf为配置类,获取hbase的配置 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181")) conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts private val connection = ConnectionFactory.createConnection(conf) def getHbaseConn: Connection = connection }
根据网上资料,Hbase的连接的特殊性我们并没有使用连接池
Hbase输出操作
我们以put操作为例,演示将上述设计模式应用到Hbase输出操作当中:
dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {val connection = HbaseUtil.getHbaseConn // 获取Hbase连接partitionRecords.foreach(data => {val tableName = TableName.valueOf("tableName")val t = connection.getTable(tableName)try { val put = new Put(Bytes.toBytes(_rowKey_)) // row key // column, qualifier, value put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes) Try(t.put(put)).getOrElse(t.close()) // do some log(显示在worker上) } catch { case e: Exception => // log error e.printStackTrace() } finally { t.close() } }) }) // do some log(显示在driver上) } })
关于Hbase的其他操作可以参考Spark 下操作 HBase(1.0.0 新 API)
填坑记录
重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题:
由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1(任意),我们在hosts中需要配置
127-0-0-1 127.0.0.1
在单机情况下,我们只需要配置一台zookeeper所在Hbase的hosts即可,但是当切换到Hbase集群是遇到一个诡异的bug
问题描述:在foreachRDD中将Dstream保存到Hbase时会卡住,并且没有任何错误信息爆出(没错!它就是卡住,没反应)
问题分析:由于Hbase集群有多台机器,而我们只配置了一台Hbase机器的hosts,这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里
解决方式:对每个worker上的hosts配置了所有hbase的节点ip,问题解决
Spark访问Mysql
同访问Hbase类似,我们也需要有一个可序列化的类来建立Mysql连接,这里我们利用了Mysql的C3P0连接池
MySQL通用连接类
import java.sql.Connection
import java.util.Propertiesimport com.mchange.v2.c3p0.ComboPooledDataSource class MysqlPool extends Serializable { private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true) private val conf = Conf.mysqlConfig try { cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8")); cpds.setDriverClass("com.mysql.jdbc.Driver"); cpds.setUser(conf.get("username").getOrElse("root")); cpds.setPassword(conf.get("password").getOrElse("")) cpds.setMaxPoolSize(200) cpds.setMinPoolSize(20) cpds.setAcquireIncrement(5) cpds.setMaxStatements(180) } catch { case e: Exception => e.printStackTrace() } def getConnection: Connection = { try { return cpds.getConnection(); } catch { case ex: Exception => ex.printStackTrace() null } } } object MysqlManager { var mysqlManager: MysqlPool = _ def getMysqlManager: MysqlPool = { synchronized { if (mysqlManager == null) { mysqlManager = new MysqlPool } } mysqlManager } }
我们利用c3p0建立Mysql连接池,然后访问的时候每次从连接池中取出连接用于数据传输。
Mysql输出操作
同样利用之前的foreachRDD设计模式,将Dstream输出到mysql的代码如下:
dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {//从连接池中获取一个连接val conn = MysqlManager.getMysqlManager.getConnectionval statement = conn.createStatementtry {conn.setAutoCommit(false)partitionRecords.foreach(record => { val sql = "insert into table..." // 需要执行的sql操作 statement.addBatch(sql) }) statement.executeBatch conn.commit } catch { case e: Exception => // do some log } finally { statement.close() conn.close() } }) } })
值得注意的是:
- 我们在提交Mysql的操作的时候,并不是每条记录提交一次,而是采用了批量提交的形式,所以需要将conn.setAutoCommit(false),这样可以进一步提高mysql的效率。
- 如果我们更新Mysql中带索引的字段时,会导致更新速度较慢,这种情况应想办法避免,如果不可避免,那就硬上吧(T^T)
部署
提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置:
<dependency><!-- Hbase --><groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.0</version> </dependency> <dependency><!-- Mysql --> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.31</version> </dependency> <dependency> <groupId>c3p0</groupId> <artifactId>c3p0</artifactId> <version>0.9.1.2</version> </dependency>
参考文献:
- Spark Streaming Programming Guide
- HBase介绍
- Spark 下操作 HBase(1.0.0 新 API)
- Spark开发快速入门
- kafka->spark->streaming->mysql(scala)实时数据处理示例
- Spark Streaming 中使用c3p0连接池操作mysql数据库
Spark踩坑记——数据库(Hbase+Mysql)转相关推荐
- Spark一路火花带闪电——Spark踩坑记
文章目录 坑有哪些? 踩坑记:版本号一定要正确!!!JDK一定要匹配. 使用idea打包maven项目 坑有哪些? ClassNotFoundException:表明maven依赖和服务器中的版本不匹 ...
- python从入门到实践django看不懂_Python编程:从入门到实践踩坑记 Django
<>踩坑记 Django Django Python 19.1.1.5 模板new_topic 做完书上的步骤后,对主题添加页面经行测试,但是浏览器显示 服务器异常. 个人采用的开发环境是 ...
- Spring @Transactional踩坑记
@Transactional踩坑记 总述 Spring在1.2引入@Transactional注解, 该注解的引入使得我们可以简单地通过在方法或者类上添加@Transactional注解,实现事务 ...
- 服务器重新部署踩坑记
服务器重新部署踩坑记 Intro 之前的服务器是 Ubuntu 18.04 ,上周周末想升级一下服务器系统,从 18.04 升级到 20.04,结果升级升挂了... 后来 SSH 始终连不上,索性删除 ...
- Vue + TypeScript + Element 搭建简洁时尚的博客网站及踩坑记
前言 本文讲解如何在 Vue 项目中使用 TypeScript 来搭建并开发项目,并在此过程中踩过的坑 . TypeScript 具有类型系统,且是 JavaScript 的超集,TypeScript ...
- 安装sql server踩坑记【sql2000程序安装配置服务器失败】
安装sql server踩坑记 安装程序配置服务器失败.参考服务器错误日志和 C:\WINDOWS\sqlstp.log 了解更多信息. 在C:\Program Files\Microsoft SQL ...
- Spark踩坑填坑-聚合函数-序列化异常
Spark踩坑填坑-聚合函数-序列化异常 一.Spark聚合函数特殊场景 二.spark sql group by 三.Spark Caused by: java.io.NotSerializable ...
- 东八区转为0时区_踩坑记 | Flink 天级别窗口中存在的时区问题
❝ 本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题.本文介绍 Flink 时间以及时区问题,分析了在天级别的窗口时会遇到的 ...
- IdentityServer 部署踩坑记
IdentityServer 部署踩坑记 Intro 周末终于部署了 IdentityServer 以及 IdentityServerAdmin 项目,踩了几个坑,在此记录分享一下. 部署架构 项目是 ...
最新文章
- drugbank下载XML文件解析
- 大牛书单 | 读书日,他们最近看了这些书
- 第一篇JavaScript基础
- java mp3 to wav_java实现wavToMP3格式转换详解
- 使用RestTemplate时报错java.lang.IllegalStateException: No instances available for 127.0.0.1
- 程序员又迎一利器,联想 LeapIOT 工业互联网平台大曝光
- RF工具ride使用
- 反编译与计算机软件的知识产权保护
- 阿里巴巴前端实习面经总结(可内推)
- 高通modem启动过程_「msm8953」高通8953启动流程 - seo实验室
- Linux常用命令--软件包管理之(服务管理)
- 摆脱客户端?网页发起直播势在必行!
- 研究生语音识别课程作业记录(一) 非特定人孤立词识别
- 论文阅读:Attention-based Dropout Layer for Weakly Supervised Object Localization
- 判断一个字符串是否为另外一个字符串旋转之后的字符串。 例如:给定s1 =AABCD和s2 = BCDAA,返回1,给定s1=abcd和s2=ACBD,返回0.
- 三学生上课玩手机遭批 喊十几人围砍老师
- justify-content具体属性值
- Eclipse使用SVN进行代码提交的步骤
- 做oms系统时候知道的
- 如果有不限额度,稳定的Facebook广告账号,你也能爆单?!
热门文章
- java const string_深入研究Java String
- 串口通信的基本原理----STM32
- c语言1e3和1e3,自考“高级语言程序设计”习题答案详解(33)
- uva 1252——Twenty Questions
- c++中的文件读写的操作
- c++中的继承--2(继承中的析构函数和构造函数,继承中同名成员,继承中静态成员)
- dup/dup2函数的用法
- 交叉编译openssl不修改Makefile的方法
- STM32时钟树解析
- 【Leetcode | 9】217. 存在重复元素