在Spark实时或离线计算的应用程序中,有时候需要将计算结果保存到数据库中,为了高效这里使用批量插入,结合c3po连接池,说明一下使用方法。

数据计算完成后,在foreachRDD中批量插入数据,因为是针对每一个partiton的数据操作,所以使用 rdd.foreachPartition,这里是一个批量插入页面PV和UV的操作,代码如下:

//RDD[(String,Int,Int)] 的意思是RDD[(页面名称,UV,PV)]
data.foreachRDD((rdd:RDD[(String,Int,Int)],time:Time)=>{rdd.foreachPartition(data=>{//从连接池中获取一个连接val conn = MDBManager.getMDBManager(isLocal).getConnectionconn.setAutoCommit(false)val sql = "insert into tableName set pageName=?,uvNum=?,pvNum=?"val preparedStatement = conn.prepareStatement(sql)data.foreach(r => {preparedStatement.setObject(1, r._1)preparedStatement.setObject(2, r._2)preparedStatement.setObject(3, r._3)preparedStatement.addBatch()})//批量提交,如果数据量大,这里可以分批提交preparedStatement.executeBatch()conn.commit()conn.close()
})

这里创建一个单例的MDBManager,并使用c3p0获取连接,代码如下:

class MDBManager(isLocal:Boolean) extends Serializable{            private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true);private val prop = new Properties()private var in:InputStream = _    isLocal match{case true  => in = getClass().getResourceAsStream("/c3p0.properties");case false => in = new FileInputStream(new File(SparkFiles.get("c3p0.properties")))}       try {prop.load(in);cpds.setJdbcUrl(prop.getProperty("jdbcUrl").toString());cpds.setDriverClass(prop.getProperty("driverClass").toString());cpds.setUser(prop.getProperty("user").toString());cpds.setPassword(prop.getProperty("password").toString());      cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize").toString()));      cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize").toString()));      cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement").toString()));      cpds.setInitialPoolSize(Integer.valueOf(prop.getProperty("initialPoolSize").toString()));      cpds.setMaxIdleTime(Integer.valueOf(prop.getProperty("maxIdleTime").toString()));} catch {case ex: Exception => ex.printStackTrace()}def getConnection:Connection={  try {  return cpds.getConnection();  } catch { case ex:Exception => ex.printStackTrace()null}  }
}
object MDBManager{  var mdbManager:MDBManager=_def getMDBManager(isLocal:Boolean):MDBManager={synchronized{if(mdbManager==null){mdbManager = new MDBManager(isLocal)}}mdbManager}
}

因为本地模式和集群模式的不同获取c3p0.properties配置文件也不一样,代码中分别提供了两种获取配件文件的方式,通过参数isLocal来确定使用哪种方式。

由于使用的是mysql数据库和c3p0连接池,所以提交应用时需要添加mysql连接的jar包和c3p0的jar包,在Spark-submit中添加参数

--jars /usr/local/spark1.6.1/lib/mysql-connector-java-5.1.38-bin.jar,/usr/local/spark1.6.1/lib/c3p0-0.9.1.2.jar 

提交应用时添加c3p0的配置文件,在Spark-submit中添加参数

--files /usr/local/spark1.3/conf/c3p0.properties

Spark中使用c3p0连接池相关推荐

  1. maven的pom.xml文件中导入c3p0连接池,在运行时报错

    maven的pom.xml文件中导入c3p0连接池运行时报错 错误内容如下: java.lang.AbstractMethodError: Method com/mchange/v2/c3p0/imp ...

  2. Hibernate中配置C3P0连接池

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 转自:h ...

  3. MyEclipse中Hibernate+C3P0连接池配置

    首先,我们还是老套的讲讲连接池的基本概念,概念理解清楚了,我们也知道后面是怎么回事了.     以前我们程序连接数据库的时候,每一次连接数据库都要一个连接,用完后再释放.如果频繁的数据库操作,就会导致 ...

  4. 在maven中使用c3p0连接池的c3p0-config.xml文件出现named-config with name ‘xxxxx‘ does not exist. Using default-con

    本人之前都用的java下的web application创建项目,第一次用maven创建web项目, 在我导c3p0配置文件的时候出现了 named-config with name 'xxxx' d ...

  5. spring连接jdbc_在Spring JDBC中添加C3PO连接池

    spring连接jdbc 连接池是一种操作,其中系统会预先初始化将来要使用的连接. 这样做是因为在使用时创建连接是一项昂贵的操作. 在这篇文章中,我们将学习如何在Spring JDBC中创建C3P0连 ...

  6. 在Spring JDBC中添加C3PO连接池

    连接池是一种操作,其中系统会预先初始化将来要使用的连接. 这样做是因为在使用时创建连接是一项昂贵的操作. 在本文中,我们将学习如何在Spring JDBC中创建C3P0连接池(某人未使用休眠). Po ...

  7. hibernate4配置c3p0连接池报错

    在hibernate的xml文件中配置c3p0连接池时,运行报错: java.lang.ClassNotFoundException: Could not load requested class : ...

  8. C3P0 连接池时报 TimeoutException 的解决方法

    最近在项目中遇到 C3P0 连接池时 IDEA 报 TimeoutException,记一次排查过程,希望对大家有帮助,过程如下: 首先,看 mysql 服务是否正常.先重启 mysql 服务,发现项 ...

  9. 配置springboot使用c3p0连接池

    为什么要使用c3p0? 使用springboot默认的数据源配置方式: spring:datasource:driver-class-name: com.mysql.jdbc.Driverurl: j ...

最新文章

  1. 30道Web前端面试题,你能答出多少道?
  2. 机器学习博主推荐、博文推荐
  3. 类型,对象,线程栈和托管堆在运行时的相互关系(一)。
  4. 计算机网络技术三级做题技巧,三级网络技术——我的经历,我的技巧
  5. 电磁感应理论之父,法拉第传奇
  6. ajax没效果,ajax没有效果
  7. 鸟书shell 学习笔记(一) shell专注于概念和命令
  8. 面试之函数节流和函数防抖
  9. 2020五一数学建模比赛总结
  10. 【openeuler 21.3】Linux硬盘分区、更改/home目录挂载空间及root目录扩容
  11. phpcms整站代码分析
  12. 将dBm转换为W的方法
  13. Sass!默认和主题化的设计系统
  14. Either类java_在Java 8中有相当于Scala的Either吗?
  15. java 登陆短信验证码_JAVA短信验证登录
  16. Yearning做SQL审核
  17. React Fullpage
  18. Win10_ltsc_2019_x64集成DPO基于人工智能优化可动态提高应用程序性能提升高达394%
  19. golang 原生支持 apple m1 cpu
  20. 使用Xshell远程连接CentOS7全过程,包括遇到的各种问题集合及解决方案

热门文章

  1. c语言中什么运算符可以求出数据字节数,C语言的数据类型及其运算符
  2. 计算机vb考试能插u盘么,08年计算机二级VisualBasic辅导:用VB打开任意盘(硬盘/U盘/光盘)的文件...
  3. OncoKB:肿瘤药物靶点相关基因组变异数据库
  4. MySQL安装版步骤
  5. Vegas Moive Studio 18 视频剪辑软件
  6. Tf铁蛋白颗粒包载顺铂/奥沙利铂/阿霉素/甲氨蝶呤MTX/紫杉醇PTX等药物
  7. 有没有能免费下载CSDN资源的方法呢
  8. Java实验报告经验总结
  9. 现在不起眼的小IT公司值得去吗?我分享自己的亲身经历
  10. 大佬们,救救可怜的孩子吧