如今,流数据是一个热门话题,而Apache Spark是出色的流框架。 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中。

Spark Streaming使我们能够从各种来源进行流传输,同时使用相同的简洁API访问数据流,执行SQL查询或创建机器学习算法。 这些功能使Spark成为流式(或任何类型的工作流)应用程序的首选框架,因为我们可以使用框架的所有方面。

面临的挑战是弄清楚如何将自定义数据源集成到Spark中,以便我们能够利用其强大功能而无需更改为更多标准源。 更改似乎是合乎逻辑的,但是在某些情况下,这样做是不可能或不方便的。

流式自定义接收器

Spark提供了不同的扩展点,正如我们在此处扩展Data Source API以便将自定义数据存储集成到Spark SQL中所看到的那样。

在此示例中,我们将做同样的事情,但是我们还将扩展流API,以便我们可以从任何地方流。

为了实现我们的自定义接收器,我们需要扩展Receiver [A]类。 请注意,它具有类型注释,因此我们可以从流客户端的角度对DStream实施类型安全。

我们将使用此自定义接收器来流式传输我们的应用程序之一通过套接字发送的订单。

通过网络传输的数据的结构如下所示:

1 5
1 1 2
2 1 1
2 1 1
4 1 1
2 2
1 2 2

我们首先接收订单ID和订单总金额,然后接收订单的行项目。 第一个值是商品ID,第二个是订单ID(与订单ID值匹配),然后是商品成本。 在此示例中,我们有两个订单。 第一个只有四个项目,第二个只有一个项目。

这个想法是将所有这些隐藏在我们的Spark应用程序中,因此它在DStream上收到的是在流上定义的完整顺序,如下所示:

val orderStream: DStream[Order] = .....
val orderStream: DStream[Order] = .....

同时,我们还使用接收器来流式传输我们的自定义流式源。 即使它通过套接字发送数据,使用来自Spark的标准套接字流也将非常复杂,因为我们将无法控制数据的输入方式,并且会遇到在应用程序上遵循顺序的问题本身。 这可能非常复杂,因为一旦进入应用程序空间,我们便会并行运行,并且很难同步所有这些传入数据。 但是,在接收方空间中,很容易从原始输入文本创建订单。

让我们看看我们的初始实现是什么样的。

case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = ....
}
case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = ....
}

我们的OrderReceiver扩展了Receiver [Order],它使我们可以在Spark内部存储Order(带注释的类型)。 我们还需要实现onStart()和onStop()方法。 请注意,onStart()创建一个线程,因此它是非阻塞的,这对于正确的行为非常重要。

现在,让我们看一下接收方法,真正发生魔术的地方。

def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}
def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}

在这里,我们创建一个套接字并将其指向源,然后我们就可以简单地开始读取它,直到调度了stop命令,或者套接字上没有更多数据为止。 请注意,我们正在读取与之前定义的结构相同的结构(如何发送数据)。 完全阅读订单后,我们将调用store(…),以便将其保存到Spark中。

除了在我们的应用程序中使用我们的接收器外,这里别无所要做:

val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))
val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))

请注意我们是如何使用自定义OrderReceiver创建流的(仅为了清楚起见,对val流进行了注释,但这不是必需的)。 从现在开始,我们将流(DString [Order])用作我们在任何其他应用程序中使用的任何其他流。

stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id))              order.items.foreach(println)}}
stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id))              order.items.foreach(println)}}

摘要

当处理生成无尽数据的源时,Spark Streaming非常方便。 您可以使用与Spark SQL和系统中其他组件相同的API,但它也足够灵活,可以扩展以满足您的特定需求。

翻译自: https://www.javacodegeeks.com/2016/05/integrate-custom-data-sources-apache-spark.html

如何将自定义数据源集成到Apache Spark中相关推荐

  1. apache spark_如何将自定义数据源集成到Apache Spark中

    apache spark 如今,流数据是一个热门话题,而Apache Spark是出色的流框架. 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中. Spark Streaming使我 ...

  2. spark java udf_在 Apache Spark 中使用 UDF

    用户定义函数(User-defined functions, UDFs)是大多数 SQL 环境的关键特性,用于扩展系统的内置功能. UDF允许开发人员通过抽象其低级语言实现来在更高级语言(如SQL)中 ...

  3. 在 Apache Spark 中利用 HyperLogLog 函数实现高级分析

    在 Apache Spark 中利用 HyperLogLog 函数实现高级分析 预聚合是高性能分析中的常用技术,例如,每小时100亿条的网站访问数据可以通过对常用的查询纬度进行聚合,被降低到1000万 ...

  4. Apache Spark中的有向无环图DAG

    Apache Spark中的有向无环图DAG 由DATAFLAIR TEAM ·更新· 2018年11月21日 1.目的 在本Apache Spark教程中,我们将了解Apache Spark中的DA ...

  5. Apache Spark中的自定义日志

    您是否曾经对运行了几个小时的Spark作业感到沮丧,但由于基础设施问题而失败了. 您会很晚才知道此故障,并浪费了数小时的时间,当Spark UI日志也无法用于事后检查时,它会更加痛苦. 你不是一个人! ...

  6. spark匹配html字段,Apache Spark中的高效字符串匹配

    我不会首先使用Spark,但如果你真的承诺特定的堆栈,你可以结合一堆ml变压器来获得最佳匹配.你需要Tokenizer(或split): import org.apache.spark.ml.feat ...

  7. Apache Spark中实现的MapReduce设计模式

    该博客是该系列文章的第一篇,讨论了MapReduce设计模式一书中的一些设计模式,并展示了如何在Apache Spark(R)中实现这些模式. 在编写MapReduce或Spark程序时,考虑执行作业 ...

  8. spark 读取ftp_scala – 使用ftp在Apache Spark中的远程计算机上读取文件

    我正在尝试使用ftp在Apache Spark( Scala版本)中的远程计算机上读取文件.目前,我在 GitHub上关注Databricks的Learning Spark回购中的一个例子.使用cur ...

  9. Apache Spark 3.0 结构化Streaming流编程指南

    目录 总览 快速范例 Scala语言 Java语言 Python语言 R语言 程式设计模型 基本概念 处理事件时间和延迟数据 容错语义 使用数据集和数据帧的API 创建流数据框架和流数据集 流数据帧/ ...

最新文章

  1. .net Core+Dapper MySQL增删改查
  2. 时间序列与R语言应用(part5)--移动平均MA模型及其可逆性
  3. 中国电网招聘 计算机岗位
  4. springboot集成swagger2,构建优雅的Restful API
  5. dns、网关、IP地址,主要是配置resolv.conf\network\ifcfg-eth0
  6. 传输线模型(分布参数模型)
  7. c#餐饮系统打印机_C# 实现打印机功能
  8. python文档自动翻译
  9. 天涯明月刀7月5号服务器维护,天涯明月刀7月5日更新_天刀7月5日版本改动_3DM网游...
  10. ArcGIS拓扑功能的应用:将点的数据落入面内
  11. php word 生成图片,PHP导出成word,带图片样式
  12. 蓝牙、wifi、3G/4G、lora技术的对比
  13. coms 传输门棍棒图_棍棒和石头可能会伤到我的骨头,但反馈绝对不会伤害我
  14. flask返回本地文件到服务器,Flask返回静态文件
  15. 某Q音乐最新歌曲查询API 可用!
  16. 虚拟化——初始化系统配置
  17. 企业破产重整网_最高法开通全国企业破产重整案件信息网
  18. 在Windows中安装Anaconda、NumPy和Matplotlib软件包
  19. CS231n - Assignment2 Tensorflow
  20. 计算机关系差运算与交运算的区别,计算机二级 公基础——关系运算.ppt

热门文章

  1. 用命令行执行java代码
  2. xml vs db.properties
  3. 同步和异步有何异同,什么场景使用
  4. bmp180气压传感器工作原理_陕西压力传感器的工作原理信息推荐
  5. (转)公钥,私钥和数字签名这样最好理解
  6. java反射的工具类的函数集合
  7. NanoHTTPD web server的一个简单荔枝
  8. soapui 测试soap_使用SoapUI调用不同的安全WCF SOAP服务-基本身份验证,第二部分
  9. apache.camel_Apache Camel 3.1 –更多骆驼核心优化(第3部分)
  10. java 堆转储快照_捕获Java堆转储的7个选项