在Spark中提供了一个JdbcRDD类,该RDD就是读取JDBC中的数据并转换成RDD,之后我们就可以对该RDD进行各种的操作。我们先看看该类的构造函数:

JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)

  这个类带了很多参数,关于这个函数的各个参数的含义,我觉得直接看英文就可以很好的理解,如下:

@param getConnection a function that returns an open Connection.
The RDD takes care of closing the connection.
@param sql the text of the query.
The query must contain two ? placeholders for parameters used to partition the results.
E.g. "select title, author from books where ? < = id and id <= ?"
@param lowerBound the minimum value of the first placeholder
@param upperBound the maximum value of the second placeholder
The lower and upper bounds are inclusive.
@param numPartitions the number of partitions.
Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
the query would be executed twice, once with (1, 10) and once with (11, 20)
@param mapRow a function from a ResultSet to a single row of the desired result type(s).
This should only call getInt, getString, etc; the RDD takes care of calling next.
The default maps a ResultSet to an array of Object.

  上面英文看不懂??那好吧,我给你翻译一下。
  1、getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
  2、sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:"select title, author from books where ? < = id and id <= ?"
  3、lowerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
  4、mapRow 是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的case class。默认的是将ResultSet 转换成一个Object数组。

下面我们说明如何使用该类。

package scala
 
import java.sql.DriverManager
 
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
 
object SparkToJDBC {
 
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "mysql")
    val rdd = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
      },
      "SELECT content FROM mysqltest WHERE ID >= ? AND ID <= ?",
      1, 100, 3,
      r => r.getString(1)).cache()
 
    print(rdd.filter(_.contains("success")).count())
    sc.stop()
  }
}

代码比较简短,主要是读mysqltest 表中的数据,并统计ID >=1 && ID < = 100 && content.contains("success")的记录条数。我们从代码中可以看出JdbcRDD的sql参数要带有两个?的占位符,而这两个占位符是给参数lowerBound和参数upperBound定义where语句的上下边界的。从JdbcRDD类的构造函数可以知道,参数lowerBound和参数upperBound都只能是Long类型的,并不支持其他类型的比较,这个使得JdbcRDD使用场景比较有限。而且在使用过程中sql参数必须有类似 ID >= ? AND ID <= ?这样的where语句,如果你写成下面的形式:

val rdd = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
  },
  "SELECT content FROM mysqltest",
  1, 100, 3,
  r => r.getString(1)).cache()

那不好意思,运行的时候会出现以下的错误:

2014-09-10 15:47:45,621 (Executor task launch worker-0) [ERROR -
org.apache.spark.Logging$class.logError(Logging.scala:95)] Exception in task ID 1
java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1074)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:988)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:974)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
  at com.mysql.jdbc.PreparedStatement.checkBounds(PreparedStatement.java:3813)
  at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3795)
  at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3840)
  at com.mysql.jdbc.PreparedStatement.setLong(PreparedStatement.java:3857)
  at org.apache.spark.rdd.JdbcRDD$$anon$1.<init>(JdbcRDD.scala:84)
  at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:70)
  at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:50)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  at java.lang.Thread.run(Thread.java:619)

看下JdbcRDD类的compute函数实现就知道了:

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
    context.addOnCompleteCallback{ () => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                                ResultSet.CONCUR_READ_ONLY)
 
    if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
      stmt.setFetchSize(Integer.MIN_VALUE)
      logInfo("statement fetch size set to: " + stmt.getFetchSize +
                                " to force mySQL streaming ")
    }
 
    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
 
...........................

  不过值得高兴的是,我们可以自定义一个JdbcRDD,修改上面的计算思路,这样就可以得到符合我们自己要求的JdbcRDD。
  PS: 在写本文的时候,本来我想提供一个JAVA例子,但是JdbcRDD类的最后一个参数很不好传,网上有个哥们是这么说的:

I don't think there is a completely Java-friendly version of this class. However you should be able to get away with passing something generic like "ClassTag.MODULE.<k>apply(Object.class)"  There's probably something even simpler.

下面是我发邮件到Spark开发组询问如何在Java中使用JdbcRDD,开发人员给我的回复信息如下:

The two parameters which might cause you some difficulty in Java are

getConnection, which is a Function0[Connection], i.e. a 0 argument function that returns a jdbc connection
and
mapRow, which is a Function1[ResultSet, T], i.e. a 1 argument function that takes a result set and returns whatever type you want to convert it to.

You could try and include the scala library as a compile-time dependency in your project, and make your own instances of classes that implement the Function0 and Function1 interfaces. I've never tried that, so your mileage may vary. The mapRow parameter might be skippable - it has as a default a function that just returns an array of object, which you could then cast.

It would probably be easy to make the JdbcRDD interface more usable from Java8, but I don't know how much demand there is for that.

Spark与Mysql(JdbcRDD)整合开发相关推荐

  1. spark 写mysql 设置主键_Spark Sql 连接mysql

    1.基本概念和用法(摘自spark官方文档中文版) Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源.当使用 JDBC 访问其它数据库时,应该首选 JdbcRDD.这是因为 ...

  2. 使用Maven搭建Struts2+Spring3+Hibernate4的整合开发环境

    做了三年多的JavaEE开发了,在平时的JavaEE开发中,为了能够用最快的速度开发项目,一般都会选择使用Struts2,SpringMVC,Spring,Hibernate,MyBatis这些开源框 ...

  3. springmvc教程--整合mybatis开发(spring+springMVC+mybatis整合开发)

    一.整合mybatis 为了更好的学习 springmvc和mybatis整合开发的方法,需要将springmvc和mybatis进行整合. 整合目标:控制层采用springmvc.持久层使用myba ...

  4. 阿里开发规范文档_华为阿里等技术专家15年开发经验总结:SSM整合开发实战文档...

    前言 Spring自2002年诞生至今,已有近20年的历史,虽然几经变迁,但始终在继续发展和精进.Spring目前由Pivotal维护和开发. Pivotal是PaaS(平台即服务)的领导者,也是消息 ...

  5. 记录一次spark连接mysql遇到的问题

    在使用spark连接mysql的过程中报错了,错误如下 08:51:32.495 [main] ERROR - Error loading factory org.apache.calcite.jdb ...

  6. [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子:

    [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子: mydf001=sqlContext.read.format("jdbc").o ...

  7. (转)Spring4.2.5+Hibernate4.3.11+Struts2.3.24整合开发

    http://blog.csdn.net/yerenyuan_pku/article/details/52902851 前面我们已经学会了Spring4.2.5+Hibernate4.3.11+Str ...

  8. spark to mysql date_[Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子:

    [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子: mydf001=sqlContext.read.format("jdbc").o ...

  9. JAVA WEB整合开发王者归来 -- 读书笔记 by CZF 完整版

    JAVA WEB整合开发王者归来 -- 读书笔记  目录 第1章 概述. 1 第2章 搭建web开发环境. 1 第3章 Servlet技术. 1 第4章 深入JSP技术. 7 第5章 会话跟踪. 12 ...

最新文章

  1. Android期末项目-校园论坛
  2. IntelliLock托管代码保护和许可授权管理系统软件详细介绍及下载
  3. PHP高并发的解决方案
  4. 本来调试无误的程序在真机运行时报标题错误解决方案
  5. Eclipse常用快捷键、常用设置、常见问题等
  6. Redis的数据类型详解
  7. 操作系统(四)文件管理
  8. 温故而知新:柯里化 与 bind() 的认知
  9. ftp服务器在线浏览,ftp服务器PDF文件在线查看的实现方法
  10. python实战===用python识别图片中的中文
  11. EBMIDE——打印格式管理
  12. eclipse 配置多个tomcat
  13. 知识竞赛现场管理系统安装配置及使用疑难问题汇编
  14. plecs使用C-Script模块建立PI传递函数模型
  15. virtualBox经常报错“内存不能为written”解决方法
  16. [UOJ#132][BZOJ4200][luogu_P2304][NOI2015]小园丁与老司机
  17. 支付宝-生成二维码实现url,实现支付
  18. LVDS、FPD-Link/GMSL、MIPI的区别
  19. 有道再出发:真正的教育事业没有终点
  20. 关于微功率短距离无线电发射设备,无需做SRRC认证

热门文章

  1. Linux简介之——目录与文件管理
  2. 工程之星android版使用,安卓版工程之星软件网络1+1模式及网络cors连接操作详解...
  3. arm linux读cpu id,基于ARM架构的芯片获取CPU信息(cpuID)的多种方法
  4. oracle学习数据,oracle学习中的一些心得
  5. wpspbc按钮是什么意思_抖音私密账号什么意思 抖音热评私密账号什么梗怎么设置?...
  6. build文件_把编译时间加入到目标文件
  7. 爬虫模拟登陆手机验证码_Python+scrapy爬虫之模拟登陆
  8. checksum命令 linux_关于Linux操作系统的一些命令是什么?
  9. img设置宽高不生效_便宜 好用 不掉盘 保姆级粒子云刷机攻略
  10. mysql 代码怎么优化_MySQL 性能优化的简略办法