Spark已更新至2.x,DataFrame归DataSet管了,因此API也相应统一。本文不再适用2.0.0及以上版本。

DataFrame原生支持直接输出到JDBC,但如果目标表有自增字段(比如id),那么DataFrame就不能直接进行写入了。因为DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须完全一致(甚至字段顺序都要一致),否则会抛异常,当然,如果你SaveMode选择了Overwrite,那么Spark删除你原有的表,然后根据DataFrame的Schema生成一个。。。。字段类型会非常非常奇葩。。。。

于是我们只能通过DataFrame.collect(),把整个DataFrame转成List到Driver上,然后通过原生的JDBC方法进行写入。但是如果DataFrame体积过于庞大,很容易导致Driver OOM(特别是我们一般不会给Driver配置过高的内存)。这个问题真的很让人纠结。

翻看Spark的JDBC源码,发现实际上是通过foreachPartition方法,在DataFrame每一个分区中,对每个Row的数据进行JDBC插入,那么为什么我们就不能直接用呢?

Spark JdbcUtils.scala部分源码:

def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) {

val dialect = JdbcDialects.get(url)

val nullTypes: Array[Int] = df.schema.fields.map { field =>

dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(

field.dataType match {

case IntegerType => java.sql.Types.INTEGER

case LongType => java.sql.Types.BIGINT

case DoubleType => java.sql.Types.DOUBLE

case FloatType => java.sql.Types.REAL

case ShortType => java.sql.Types.INTEGER

case ByteType => java.sql.Types.INTEGER

case BooleanType => java.sql.Types.BIT

case StringType => java.sql.Types.CLOB

case BinaryType => java.sql.Types.BLOB

case TimestampType => java.sql.Types.TIMESTAMP

case DateType => java.sql.Types.DATE

case t: DecimalType => java.sql.Types.DECIMAL

case _ => throw new IllegalArgumentException(

s"Can't translate null value for field $field")

})

}

val rddSchema = df.schema

val driver: String = DriverRegistry.getDriverClassName(url)

val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)

// ****************** here ******************

df.foreachPartition { iterator =>

savePartition(getConnection, table, iterator, rddSchema, nullTypes)

}

}

嗯。。。既然Scala能实现,那么作为他的爸爸,Java也应该能玩!

我们看看foreachPartition的方法原型:

def foreachPartition(f: Iterator[Row] => Unit)

又是函数式语言最爱的匿名函数。。。非常讨厌写lambda,所以我们还是实现个匿名类吧。要实现的抽象类为:

scala.runtime.AbstractFunction1,BoxedUnit> 两个模板参数,第一个很直观,就是Row的迭代器,作为函数的参数。第二个BoxedUnit,是函数的返回值。不熟悉Scala的可能会很困惑,其实这就是Scala的void。由于Scala函数式编程的特性,代码块的末尾必须返回点什么,于是他们就搞出了个unit来代替本应什么都没有的void(解释得可能不是很准确,我是这么理解的)。对于Java而言,我们可以直接使用BoxedUnit.UNIT,来得到这个“什么都没有”的东西。

来玩耍一下吧!

df.foreachPartition(new AbstractFunction1, BoxedUnit>() {

@Override

public BoxedUnit apply(Iterator it) {

while (it.hasNext()){

System.out.println(it.next().toString());

}

return BoxedUnit.UNIT;

}

});

嗯,maven complete一下,spark-submit看看~

好勒~抛异常了

org.apache.spark.SparkException: Task not serializable

Task不能被序列化

嗯哼,想想之前实现UDF的时候,UDF1/2/3/4...各接口,都extends Serializable,也就是说,在Spark运行期间,Driver会把UDF接口实现类序列化,并在Executor中反序列化,执行call方法。。。这就不难理解了,我们foreachPartition丢进去的类,也应该implements Serializable。这样,我们就得自己搞一个继承AbstractFunction1, BoxedUnit>,又实现Serializable的抽象类,给我们这些匿名类去实现!

import org.apache.spark.sql.Row;

import scala.runtime.AbstractFunction1;

import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {

}

可是每次都要return BoxedUnit.UNIT 搞得太别扭了,没一点Java的风格。

import org.apache.spark.sql.Row;

import scala.collection.Iterator;

import scala.runtime.AbstractFunction1;

import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {

@Override

public BoxedUnit apply(Iterator it) {

call(it);

return BoxedUnit.UNIT;

}

public abstract void call(Iterator it);

}

于是我们可以直接Override call方法,就可以用满满Java Style的代码去玩耍了!

df.foreachPartition(new JavaForeachPartitionFunc() {

@Override

public void call(Iterator it) {

while (it.hasNext()){

System.out.println(it.next().toString());

}

}

});

注意!我们实现的匿名类的方法,实际上是在executor上执行的,所以println是输出到executor机器的stdout上。这个我们可以通过Spark的web ui,点击具体Application的Executor页面去查看(调试用的虚拟机集群,手扶拖拉机一样的配置,别吐槽了~)

至于foreach方法同理。只不过把Iterator 换成 Row。具体怎么搞,慢慢玩吧~~~

have fun~

spark java foreach_Spark Java使用DataFrame的foreach/foreachPartition相关推荐

  1. Spark算子实战Java版,学到了

    (一)概述 算子从功能上可以分为Transformations转换算子和Action行动算子.转换算子用来做数据的转换操作,比如map.flatMap.reduceByKey等都是转换算子,这类算子通 ...

  2. shell调用spark不执行JAVA,当代码在Spark shell中工作时,spark-submit不能引用“--jars”指定的jar?...

    我使用intelliJ创建了一个sbt项目 . 我在项目的 lib 文件夹中复制了所需的jdbc jar sqljdbc42.jar . sbt package 圆满结束 . 我在 Windows 的 ...

  3. 一点一点看JDK源码(五)java.util.ArrayList 后篇之forEach

    一点一点看JDK源码(五)java.util.ArrayList 后篇之forEach liuyuhang原创,未经允许禁止转载 本文举例使用的是JDK8的API 目录:一点一点看JDK源码(〇) 代 ...

  4. java for数组遍历数组_Java foreach操作(遍历)数组

    语法: 我们分别使用 for 和 foreach 语句来遍历数组 运行结果: 练习: import java.util.Arrays; public class HelloWorld { public ...

  5. Spark SQL 1.3.0 DataFrame介绍、使用及提供了些完整的数据写入

     问题导读 1.DataFrame是什么? 2.如何创建DataFrame? 3.如何将普通RDD转变为DataFrame? 4.如何使用DataFrame? 5.在1.3.0中,提供了哪些完整的 ...

  6. 做了6年的Java,java视频教程传智播客

    JAVA基础 JAVA异常分类及处理 异常分类 异常的处理方式 Throw和throws的区别 JAVA反射 动态语言 反射机制概念 (运行状态中知道类所有的属性和方法) Java反射API 反射使用 ...

  7. java 配置信息_[Java教程]java 配置信息类 Properties 的简单使用

    [Java教程]java 配置信息类 Properties 的简单使用 0 2016-12-08 09:00:09 Properties :(配置信息类) 是一个表示持久性的集合 ,继承 Hashta ...

  8. java与java ee_Java EE MVC:处理表单验证

    java与java ee 在本文中,我们将介绍Java EE MVC中的表单验证. Java EE MVC与Java Bean验证API( JSR 303 )集成在一起,这使得添加验证约束变得非常容易 ...

  9. Java LinkedList – Java中的LinkedList

    Java LinkedList is an implementation of the List and Deque interfaces. It is one of the frequently u ...

最新文章

  1. java在jsp中判断td的值是1还是2_Snap7,Eel与S71200简单组态1
  2. Openwrt MiniDLNA 安装方法及 其需要的依赖关系
  3. php中使用httpclient
  4. 【牛客 - 318M】被打脸的潇洒哥(几何问题,水题,结论,知识点)
  5. dynamic_debug动态打印kernel日志
  6. 谈谈数次生信线下活动的收获和体会
  7. java 类继承命名_Java 语言的类间的继承关系是( )。_学小易找答案
  8. 这个 bug 可劫持同一 WiFi 网络上所有的安卓版火狐移动浏览器
  9. 从零开始入门芯片行业
  10. 高斯求积分公式matlab,matlab高斯积分公式
  11. can卡通用测试软件LCANTest详细介绍
  12. c语言语法 英语,英语干货:英语语法基础知识大全
  13. R数据分析:论文中的轨迹的做法,潜增长模型和增长混合模型
  14. 资源管理器关闭了怎么打开
  15. MMO与弱交互游戏的服务端技术区别
  16. Django Web 官方 中文文档 开发手册
  17. 退出mysql控制台与退出mysql
  18. flash---星星闪
  19. 乔布斯一生都在模仿的偶像:两度从哈佛大学退学的发明家Edwin H. Land
  20. json对象遍历输出key和value

热门文章

  1. 详解Python线程对象daemon属性对线程退出的影响
  2. Python把汉字转换成拼音
  3. 怎么设置linux端口权限,Linux下设置端口权限的系统调用—ioperm和iopl
  4. python error loading package_Pycharm Available Package无法显示/安装包的问题Error Loading Package List解决...
  5. 两种方式实现节流函数
  6. mysql报错代码10051_zabbix_server 不能监听端口tcp 10051(示例代码)
  7. VScode 知识点查阅
  8. 青岛大学计算机科学技术学院官网,田呈亮 - 青岛大学 - 计算机科学技术学院
  9. java 简单的webshell_Java Web使用JSPX白名单绕过上传WebShell | kTWO-个人博客
  10. php栏目树,php生成无限栏目树的代码实例分享