apache spark

如今,流数据是一个热门话题,而Apache Spark是出色的流框架。 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中。

Spark Streaming使我们能够从各种来源流式传输,同时使用相同的简洁API访问数据流,执行SQL查询或创建机器学习算法。 这些功能使Spark成为流(或任何类型的工作流)应用程序的首选框架,因为我们可以使用框架的所有方面。

面临的挑战是弄清楚如何将自定义数据源集成到Spark中,以便我们可以利用其功能而无需更改为更多标准源。 更改似乎是合乎逻辑的,但是在某些情况下,这样做是不可能或不方便的。

流式自定义接收器

Spark提供了不同的扩展点,正如我们在此处扩展Data Source API以便将自定义数据存储集成到Spark SQL中所看到的那样。

在此示例中,我们将做同样的事情,但是我们还将扩展流API,以便我们可以从任何地方流。

为了实现我们的自定义接收器,我们需要扩展Receiver [A]类。 请注意,它具有类型注释,因此我们可以从流客户端的角度对DStream实施类型安全。

我们将使用此自定义接收器来流式传输我们的应用程序之一通过套接字发送的订单。

通过网络传输的数据的结构如下所示:

1 5
1 1 2
2 1 1
2 1 1
4 1 1
2 2
1 2 2

我们首先接收订单ID和订单总额,然后接收订单的订单项。 第一个值是商品ID,第二个是订单ID(与订单ID值匹配),然后是商品成本。 在此示例中,我们有两个订单。 第一个只有四个项目,第二个只有一个项目。

这个想法是对我们的Spark应用程序隐藏所有这些内容,因此它在DStream上收到的是在流上定义的完整顺序,如下所示:

val orderStream: DStream[Order] = .....
val orderStream: DStream[Order] = .....

同时,我们还使用接收器来流式传输我们的自定义流式源。 即使它通过套接字发送数据,使用来自Spark的标准套接字流也将是相当复杂的,因为我们将无法控制数据的输入方式,并且会遇到在应用程序上遵守订单的问题。本身。 这可能非常复杂,因为一旦进入应用程序空间,我们便会并行运行,并且很难同步所有这些传入数据。 但是,在接收器空间中,很容易根据原始输入文本创建订单。

让我们看看我们的初始实现是什么样的。

case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = ....
}
case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = ....
}

我们的OrderReceiver扩展了Receiver [Order],它使我们可以在Spark内部存储Order(带注释的类型)。 我们还需要实现onStart()和onStop()方法。 请注意,onStart()创建一个线程,因此它是非阻塞的,这对于正确的行为非常重要。

现在,让我们看一下真正发生魔术的接收方法。

def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}
def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}

在这里,我们创建一个套接字并将其指向源,然后我们就可以简单地开始读取它,直到调度了stop命令,或者套接字上没有更多数据为止。 请注意,我们正在读取与之前定义的结构相同的结构(如何发送数据)。 完全阅读订单后,我们将调用store(…),以便将其保存到Spark中。

除了在我们的应用程序中使用接收器外,这里别无所要做:

val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))
val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))

请注意,我们是如何使用自定义OrderReceiver创建流的(仅为了清楚起见,对val流进行了注释,但这不是必需的)。 从现在开始,我们将流(DString [Order])用作我们在任何其他应用程序中使用的任何其他流。

stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id))              order.items.foreach(println)}}
stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id))              order.items.foreach(println)}}

摘要

在处理生成无尽数据的源时,Spark Streaming非常方便。 您可以使用与Spark SQL和系统中其他组件相同的API,但它也足够灵活,可以扩展以满足您的特定需求。

翻译自: https://www.javacodegeeks.com/2016/05/integrate-custom-data-sources-apache-spark.html

apache spark

apache spark_如何将自定义数据源集成到Apache Spark中相关推荐

  1. 如何将自定义数据源集成到Apache Spark中

    如今,流数据是一个热门话题,而Apache Spark是出色的流框架. 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中. Spark Streaming使我们能够从各种来源进行流传输 ...

  2. Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源

    Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源 ​ 上文引出了Flink程序自定义数据源的方法,我们来再次回顾下. ​ Flink还提供了数据源接口(抽象类),我们实现该接口 ...

  3. Flink程序加载数据源(3)自定义数据源(1)

    文章目录 代码实现 ① 准备环境 ② 获取数据源 ③ 从Mysql中获取数据源示例 ​ flink 可以从我们常用的各种DB.文件(HDFS/LOCAL).SCOKET.MQ等等-中加载数据,Flin ...

  4. 集成spark_全面对比,深度解析 Ignite 与 Spark

    经常有人拿 Ignite 和 Spark 进行比较,然后搞不清两者的区别和联系.Ignite 和 Spark,如果笼统归类,都可以归于内存计算平台,然而两者功能上虽然有交集,并且 Ignite 也会对 ...

  5. Ambari 自定义服务集成 | quicklinks 快速链接不显示的排查方案

    点击卡片"大数据实战演练",选择"设为星标"或"置顶" 回复"Ambari知识库"可领取独家整理的Ambari学习资料! ...

  6. 起源于 Kettle 的新一代数据集成平台 Apache Hop 成为 Apache 顶级项目

    Apache Hop(Hop Orchestration Platform 的首字母缩写)是一种数据编排(data orchestration )和数据工程平台(data engineering pl ...

  7. Spring自定义数据源配置不当引起的Mybatis拦截器Interceptors 失效/不生效

    目录 内容 Interceptor接口与@Intercepts注解 PageHelper实现拦截器 默认数据源与拦截器 自定义数据源与拦截器的问题 自定义数据源注入拦截器 内容 Interceptor ...

  8. Flink自带的Source源算子以及自定义数据源Source

    文章目录 Flink的DataStream API(基础篇) Source源算子 从集合中读取数据 从文件中读取数据 从Scoket中读取数据 从Kafka中读取数据 自定义Source Flink的 ...

  9. Win7下安装配置PHP+Apache+Mysql+PHPMyAdmin环境教程(非集成)

    =====================================Apache 服务安装 ========================================== Apache 服 ...

最新文章

  1. json key 命名规范_jsonapi
  2. 漫谈C++重载运算符
  3. 自己调试接口遇到的错误记录
  4. python 商城api编写_Python实现简单的API接口
  5. 笔记本电脑网络连接显示红叉_物联网设备的网络连接---上篇
  6. 机械+固态双硬盘时机械硬盘卡顿问题解决
  7. 洛阳地铁一号线无人驾驶_洛阳地铁第一个过街通道建成,地铁时代离洛阳人还远吗?...
  8. C++ 小游戏 视频及资料集(9)
  9. deepin更新启动项_Deepin修复启动项菜单---grub2启动修复
  10. FFS(快速文件系统)–Unix文件系统
  11. vmware设置虚拟机静态ip
  12. win10高危服务_win10系统禁用Update Orchestrator Service服务的操作方法
  13. 王兴:8年时间,我对商业的思考
  14. 网络web渗透工程师-教你怎么喝着茶,把甲方爸爸的活干了。
  15. Kindling the Darkness: A Practical Low-light Image Enhancer
  16. 2019伯克利中美峰会 | 2019峰会揭秘 峰会历程回顾 售票通道
  17. 数据库 - 逻辑结构设计
  18. python中while True的用处
  19. 正则匹配之正则匹配全部汇总:
  20. MongoDB数据库性能监控详解

热门文章

  1. YBTOJ洛谷P3209:平面图判定(2-SAT)
  2. YbtOJ#20078-[NOIP2020模拟赛B组Day7]路径之和【分治,Flody】
  3. P3899-[湖南集训]谈笑风生【主席树】
  4. ssl1335-最佳派对【二分图,最大匹配,图论】
  5. HDU 5008 Boring String Problem ( 后缀数组求本质不同第k大子串)
  6. 【期望DP】概率充电器(luogu 4284)
  7. codeforces 884E Binary Matrix 并查集,滚动数组
  8. Nacos(十)之Kubernetes Nacos
  9. Java自动化邮件中发送图表(三)之Highchart
  10. 扫盲,为什么分布式一定要有Redis?