原文地址:http://www.infoq.com/cn/articles/spark-sreaming-practice

本篇文章用Spark Streaming +Hbase为列,Spark Streaming专为流式数据处理,对Spark核心API进行了相应的扩展。

什么是Spark Streaming?

首先,什么是流式处理呢?数据流是一个数据持续不断到达的无边界序列集。流式处理是把连续不断的数据输入分割成单元数据块来处理。流式处理是一个低延迟的处理和流式数据分析。Spark Streaming对Spark核心API进行了相应的扩展,支持高吞吐、低延迟、可扩展的流式数据处理。实时数据处理应用的场景有下面几个:

  • 网站监控和网络监控;
  • 异常监测;
  • 网页点击;
  • 广告数据;

物联网(IOT)

图1

Spark Streaming支持的数据源包括HDFS文件,TCP socket,Kafka,Flume,Twitter等,数据流可以通过Spark核心API、DataFrame SQL或者机器学习API处理,并可以持久化到本地文件、HDFS、数据库或者其它任意支持Hadoop输出格式的形式。

Spark Streaming如何工作?

Spark Streaming以X秒(batch size)为时间间隔把数据流分割成Dstream,组成一个RDD序列。你的Spark应用处理RDD,并把处理的结果批量返回。

图2

Spark Streaming例子的架构图


图3

Spark Streaming例子代码分下面几部分:
- 读取流式数据;
- 处理流式数据;
- 写处理结果倒Hbase表。

Spark处理部分的代码涉及到如下内容:

  • 读取Hbase表的数据;
  • 按天计算数据统计;
  • 写统计结果到Hbase表,列簇:stats。

数据集

数据集来自油泵信号数据,以CSV格式存储在指定目录下。Spark Streaming监控此目录,CSV文件的格式如图3。

图4

采用Scala的case class来定义数据表结构,parseSensor函数解析逗号分隔的数据。

Hbase表结构

流式处理的Hbase表结构如下:

  • 油泵名字 + 日期 + 时间戳 组合成row key;
  • 列簇是由输入数据列、报警数据列等组成,并设置过期时间。
  • 每天等统计数据表结构如下:
  • 油泵名和日期组成row key;

列簇为stats,包含列有最大值、最小值和平均值;

图5

配置写入Hbase表

Spark直接用TableOutputFormat类写数据到Hbase里,跟在MapReduce中写数据到Hbase表一样,下面就直接用TableOutputFormat类了。

Spark Streaming代码

Spark Streaming的基本步骤:

  • 初始化Spark StreamingContext对象;
  • 在DStream上进行transformation操作和输出操作;
  • 开始接收数据并用streamingContext.start();
  • 等待处理停止,streamingContext.awaitTermination()。

初始化Spark StreamingContext对象

创建 StreamingContext对象,StreamingContext是Spark Streaming处理的入口,这里设置2秒的时间间隔。

val sparkConf = new SparkConf().setAppName("HBaseStream")
// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

接下来用StreamingContext的textFileStream(directory)创建输入流跟踪Hadoop文件系统的新文件,并处理此目录下的所有文件,这里directory指文件目录。

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

linesDStream是数据流,每条记录是按行记录的text格式。

图6

对DStream进行transformation操作和输出操作

接下来进行解析,对linesDStream进行map操作,map操作是对RDD应用Sensor.parseSensor函数,返回Sensor的RDD。

// parse each line of data in linesDStream into sensor objects
val sensorDStream = linesDStream.map(Sensor.parseSensor)


图7

对DStream的每个RDD执行foreachRDD 方法,使用filter过滤Sensor中低psi值来创建报警,使用Hbase的Put对象转换sensor和alter数据以便能写入到Hbase。然后使用PairRDDFunctions的saveAsHadoopDataset方法将最终结果写入到任何Hadoop兼容到存储系统。

// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
// filter sensor data for low psi val alertRDD = rdd.filter(sensor => sensor.psi < 5.0) // convert sensor data to put object and write to HBase Table CF data rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig) // convert alert to put object write to HBase Table CF alerts rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig) }

sensorRDD经过Put对象转换,然后写入到Hbase。

图8

开始接收数据

通过streamingContext.start()显式的启动数据接收,然后调用streamingContext.awaitTermination()来等待计算完成。

// Start the computationssc.start()// Wait for the computation to terminate ssc.awaitTermination()

Spark读写Hbase

现在开始读取Hbase的sensor表,计算每条的统计指标并把对应的数据写入stats列簇。

图9

下面的代码读取Hbase的sensor表psi列数据,用StatCounter计算统计数据,然后写入stats列簇。

// configure HBase for reading val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName) // scan data column family psi column conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") // Load an RDD of (row key, row Result) tuples from the table val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) // transform (row key, row Result) tuples into an RDD of Results val resultRDD = hBaseRDD.map(tuple => tuple._2) // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key val keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value))) // group by rowkey , get statistics for column value val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list)) // convert rowkey, stats to put and write to hbase table stats column family keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

下面的流程图显示newAPIHadoopRDD输出,(row key,result)的键值对。PairRDDFunctions 的saveAsHadoopDataset方法把Put对象存入到Hbase。

图10

运行Spark Streaming应用

运行Spark Streaming应用跟运行Spark应用类似,比较简单,此处不赘述,参见Spark Streaming官方文档。

转载于:https://www.cnblogs.com/davidwang456/p/5488195.html

用实例讲解Spark Sreaming--转相关推荐

  1. 实例讲解spark在京东智能供应链预测系统的应用

    问题导读: 1. 京东的供应链是什么样的呢? 2. 预测技术在京东的供应链起着什么样的作用呢? 3. 京东整个预测系统的架构是什么样的呢? 4. 预测系统不同层面的技术选型分别为什么? 5. 预测系统 ...

  2. scala写入mysql_spark rdd转dataframe 写入mysql的实例讲解

    dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者 ...

  3. java实现页面高效刷新_selenium高效应对Web页面元素刷新的实例讲解

    当我们在页面上进行selenium.type()或者selenium.click()操作的时候,往往需要需要等待一个元素的出现,对于一般的网页,当我们进入一个新页面的时候,往往会使用selenium. ...

  4. php 返回一个json对象,PHP给前端返回一个JSON对象的实例讲解

    解决问题:用php做后台时,如何给前端发起的AJAX请求返回一个JSON格式的"对象": 说明:我本身是一个前端,工作久了之后发现要是不掌握一门后端开发语言的话,总感觉有点无力.最 ...

  5. python简易版实例_Python3之简单搭建自带服务器的实例讲解

    WEB开发,我们先从搭建一个简单的服务器开始,Python自带服务模块,且python3相比于python2有很大不同, 在Python2.6版本里,/usr/bin/lib/python2.6/ 目 ...

  6. 手摸手教你数据可视化!(附实例讲解)

    ↑↑↑关注后"星标"Datawhale 每日干货 & 每月组队学习,不错过 Datawhale干货 作者:CrescentAI,华南理工大学,Datawhale优秀学习者 ...

  7. 【Python基础】手把手教你数据可视化!(附实例讲解)

    点击上方"小白学视觉",选择加"星标"或"置顶" 重磅干货,第一时间送达 作者:CrescentAI,华南理工大学,Datawhale优秀学 ...

  8. php脚本函数,PHP执行系统命令函数实例讲解

    命令注入 命令注入(Command Injection),对一些函数的参数没有做过滤或过滤不严导致的,可以执行系统或者应用指令(CMD命令或者 bash 命令)的一种注入攻击手段. 常见的执行系统命令 ...

  9. 实例讲解之校园网病毒该如何铲除

    实例讲解之校园网病毒该如何铲除 转载于:https://blog.51cto.com/wilsondy/163994

最新文章

  1. Python 为什么不支持 i++ 自增语法,不提供 ++ 操作符?
  2. Windows,远程计算机:X.X.X.X,这可能是由于CredSSP加密Oracle修正
  3. Failed to install VS Code update.
  4. java 算法优化向导
  5. Mybatis处理表关联(懒加载)
  6. 信息学奥赛一本通(1251:仙岛求药)
  7. OpenVINO主要工作流程
  8. 罗永浩带货520鲜花礼盒再翻车:自掏腰包100多万,双倍赔偿
  9. linux查看进程命令,linux查询指定进程命令
  10. web.xml/servlet过滤器之压缩UrlRewriteFilter
  11. 第十章 Scala 容器基础(二十二):合并有序集合
  12. 触摸屏与单片机通讯C语言程序,讲述如何实现单片机与触摸屏的通信
  13. 用caffe框架做号牌识别笔记
  14. 计算机关机快捷键是什么,win7关机快捷键是什么
  15. 用Excel和OutLook实现自动批量发邮件
  16. 非Build Rebuild--Compilation of Maven projects is supported only if external build is started from an
  17. 超快速!10分钟入门Keras指南
  18. 音视频系列1:流媒体
  19. jq 之 download下载图片或文件功能,以及一个神奇的download属性!
  20. 第十一章无线渗透 理论篇

热门文章

  1. php+方法返回多个参数,PHP中调用外部程序,及其参数与返回值
  2. mysql分析表锁,MySQL锁分析和监控
  3. oracle mysql 付费_oracle数据库要钱的吗?
  4. 文本编辑器中实现设置工具栏和状态栏可见性的功能
  5. 计算机组成知识试题及答案,《计算机组成与结构复习题及答案.doc
  6. 查看mysql view作用_Mysql中View视图的作用
  7. linux resin 自动启动不了,Resin 安装-配置-自启动-Linux
  8. linux find -size参数,Linux find 常用命令
  9. RecyclerView控件实现横向滚动和瀑布流布局,以及RecyclerView的点击监听(项目已上传GitHub)
  10. 启动php服务命令,启动|停止服务