spark java foreach_Spark Java使用DataFrame的foreach/foreachPartition
Spark已更新至2.x,DataFrame归DataSet管了,因此API也相应统一。本文不再适用2.0.0及以上版本。
DataFrame原生支持直接输出到JDBC,但如果目标表有自增字段(比如id),那么DataFrame就不能直接进行写入了。因为DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须完全一致(甚至字段顺序都要一致),否则会抛异常,当然,如果你SaveMode选择了Overwrite,那么Spark删除你原有的表,然后根据DataFrame的Schema生成一个。。。。字段类型会非常非常奇葩。。。。
于是我们只能通过DataFrame.collect(),把整个DataFrame转成List到Driver上,然后通过原生的JDBC方法进行写入。但是如果DataFrame体积过于庞大,很容易导致Driver OOM(特别是我们一般不会给Driver配置过高的内存)。这个问题真的很让人纠结。
翻看Spark的JDBC源码,发现实际上是通过foreachPartition方法,在DataFrame每一个分区中,对每个Row的数据进行JDBC插入,那么为什么我们就不能直接用呢?
Spark JdbcUtils.scala部分源码:
def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) {
val dialect = JdbcDialects.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
case DoubleType => java.sql.Types.DOUBLE
case FloatType => java.sql.Types.REAL
case ShortType => java.sql.Types.INTEGER
case ByteType => java.sql.Types.INTEGER
case BooleanType => java.sql.Types.BIT
case StringType => java.sql.Types.CLOB
case BinaryType => java.sql.Types.BLOB
case TimestampType => java.sql.Types.TIMESTAMP
case DateType => java.sql.Types.DATE
case t: DecimalType => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
s"Can't translate null value for field $field")
})
}
val rddSchema = df.schema
val driver: String = DriverRegistry.getDriverClassName(url)
val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
// ****************** here ******************
df.foreachPartition { iterator =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes)
}
}
嗯。。。既然Scala能实现,那么作为他的爸爸,Java也应该能玩!
我们看看foreachPartition的方法原型:
def foreachPartition(f: Iterator[Row] => Unit)
又是函数式语言最爱的匿名函数。。。非常讨厌写lambda,所以我们还是实现个匿名类吧。要实现的抽象类为:
scala.runtime.AbstractFunction1,BoxedUnit> 两个模板参数,第一个很直观,就是Row的迭代器,作为函数的参数。第二个BoxedUnit,是函数的返回值。不熟悉Scala的可能会很困惑,其实这就是Scala的void。由于Scala函数式编程的特性,代码块的末尾必须返回点什么,于是他们就搞出了个unit来代替本应什么都没有的void(解释得可能不是很准确,我是这么理解的)。对于Java而言,我们可以直接使用BoxedUnit.UNIT,来得到这个“什么都没有”的东西。
来玩耍一下吧!
df.foreachPartition(new AbstractFunction1, BoxedUnit>() {
@Override
public BoxedUnit apply(Iterator it) {
while (it.hasNext()){
System.out.println(it.next().toString());
}
return BoxedUnit.UNIT;
}
});
嗯,maven complete一下,spark-submit看看~
好勒~抛异常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前实现UDF的时候,UDF1/2/3/4...各接口,都extends Serializable,也就是说,在Spark运行期间,Driver会把UDF接口实现类序列化,并在Executor中反序列化,执行call方法。。。这就不难理解了,我们foreachPartition丢进去的类,也应该implements Serializable。这样,我们就得自己搞一个继承AbstractFunction1, BoxedUnit>,又实现Serializable的抽象类,给我们这些匿名类去实现!
import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {
}
可是每次都要return BoxedUnit.UNIT 搞得太别扭了,没一点Java的风格。
import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {
@Override
public BoxedUnit apply(Iterator it) {
call(it);
return BoxedUnit.UNIT;
}
public abstract void call(Iterator it);
}
于是我们可以直接Override call方法,就可以用满满Java Style的代码去玩耍了!
df.foreachPartition(new JavaForeachPartitionFunc() {
@Override
public void call(Iterator it) {
while (it.hasNext()){
System.out.println(it.next().toString());
}
}
});
注意!我们实现的匿名类的方法,实际上是在executor上执行的,所以println是输出到executor机器的stdout上。这个我们可以通过Spark的web ui,点击具体Application的Executor页面去查看(调试用的虚拟机集群,手扶拖拉机一样的配置,别吐槽了~)
至于foreach方法同理。只不过把Iterator 换成 Row。具体怎么搞,慢慢玩吧~~~
have fun~
spark java foreach_Spark Java使用DataFrame的foreach/foreachPartition相关推荐
- Spark算子实战Java版,学到了
(一)概述 算子从功能上可以分为Transformations转换算子和Action行动算子.转换算子用来做数据的转换操作,比如map.flatMap.reduceByKey等都是转换算子,这类算子通 ...
- shell调用spark不执行JAVA,当代码在Spark shell中工作时,spark-submit不能引用“--jars”指定的jar?...
我使用intelliJ创建了一个sbt项目 . 我在项目的 lib 文件夹中复制了所需的jdbc jar sqljdbc42.jar . sbt package 圆满结束 . 我在 Windows 的 ...
- 一点一点看JDK源码(五)java.util.ArrayList 后篇之forEach
一点一点看JDK源码(五)java.util.ArrayList 后篇之forEach liuyuhang原创,未经允许禁止转载 本文举例使用的是JDK8的API 目录:一点一点看JDK源码(〇) 代 ...
- java for数组遍历数组_Java foreach操作(遍历)数组
语法: 我们分别使用 for 和 foreach 语句来遍历数组 运行结果: 练习: import java.util.Arrays; public class HelloWorld { public ...
- Spark SQL 1.3.0 DataFrame介绍、使用及提供了些完整的数据写入
问题导读 1.DataFrame是什么? 2.如何创建DataFrame? 3.如何将普通RDD转变为DataFrame? 4.如何使用DataFrame? 5.在1.3.0中,提供了哪些完整的 ...
- 做了6年的Java,java视频教程传智播客
JAVA基础 JAVA异常分类及处理 异常分类 异常的处理方式 Throw和throws的区别 JAVA反射 动态语言 反射机制概念 (运行状态中知道类所有的属性和方法) Java反射API 反射使用 ...
- java 配置信息_[Java教程]java 配置信息类 Properties 的简单使用
[Java教程]java 配置信息类 Properties 的简单使用 0 2016-12-08 09:00:09 Properties :(配置信息类) 是一个表示持久性的集合 ,继承 Hashta ...
- java与java ee_Java EE MVC:处理表单验证
java与java ee 在本文中,我们将介绍Java EE MVC中的表单验证. Java EE MVC与Java Bean验证API( JSR 303 )集成在一起,这使得添加验证约束变得非常容易 ...
- Java LinkedList – Java中的LinkedList
Java LinkedList is an implementation of the List and Deque interfaces. It is one of the frequently u ...
最新文章
- java在jsp中判断td的值是1还是2_Snap7,Eel与S71200简单组态1
- Openwrt MiniDLNA 安装方法及 其需要的依赖关系
- php中使用httpclient
- 【牛客 - 318M】被打脸的潇洒哥(几何问题,水题,结论,知识点)
- dynamic_debug动态打印kernel日志
- 谈谈数次生信线下活动的收获和体会
- java 类继承命名_Java 语言的类间的继承关系是( )。_学小易找答案
- 这个 bug 可劫持同一 WiFi 网络上所有的安卓版火狐移动浏览器
- 从零开始入门芯片行业
- 高斯求积分公式matlab,matlab高斯积分公式
- can卡通用测试软件LCANTest详细介绍
- c语言语法 英语,英语干货:英语语法基础知识大全
- R数据分析:论文中的轨迹的做法,潜增长模型和增长混合模型
- 资源管理器关闭了怎么打开
- MMO与弱交互游戏的服务端技术区别
- Django Web 官方 中文文档 开发手册
- 退出mysql控制台与退出mysql
- flash---星星闪
- 乔布斯一生都在模仿的偶像:两度从哈佛大学退学的发明家Edwin H. Land
- json对象遍历输出key和value
热门文章
- 详解Python线程对象daemon属性对线程退出的影响
- Python把汉字转换成拼音
- 怎么设置linux端口权限,Linux下设置端口权限的系统调用—ioperm和iopl
- python error loading package_Pycharm Available Package无法显示/安装包的问题Error Loading Package List解决...
- 两种方式实现节流函数
- mysql报错代码10051_zabbix_server 不能监听端口tcp 10051(示例代码)
- VScode 知识点查阅
- 青岛大学计算机科学技术学院官网,田呈亮 - 青岛大学 - 计算机科学技术学院
- java 简单的webshell_Java Web使用JSPX白名单绕过上传WebShell | kTWO-个人博客
- php栏目树,php生成无限栏目树的代码实例分享