摘要:

  Sprak Streaming属于Saprk API的扩展,支持实时数据流(live data streams)的可扩展,高吞吐(hight-throughput) 容错(fault-tolerant)的流处理。可以接受来自KafKa,Flume,ZeroMQ KinesisTwitter或TCP套接字的数据源,处理的结果数据可以存储到文件系统 数据库 现场dashboards等。

  DStream编程模型

  Dstream是Spark streaming中的高级抽象连续数据流,这个数据源可以从外部获得(如KafKa Flume等),也可以通过输入流获得,还可以通过在其他DStream上进行高级操作创建,DStream是通过一组时间序列上连续的RDD表示的,所以一个DStream可以看作是一个RDDs的序列。

  DStream操作

  1.套接字流:通过监听Socket端口来接收数据。

  通过Scala编写程序来产生一系列的字符作为输入流:

  GenerateChar:

  object GenerateChar {

  def generateContext(index : Int) : String = {

  import scala.collection.mutable.ListBuffer

  val charList = ListBuffer[Char]()

  for(i - 65 to 90)

  charList += i.toChar

  val charArray = charList.toArray

  charArray(index).toString

  }

  def index = {

  import java.util.Random

  val rdm = new Random

  rdm.nextInt(7)

  }

  def main(args: Array[String]) {

  val listener = new ServerSocket(9998)

  while(true){

  val socket = listener.accept()

  new Thread(){

  override def run() = {

  println(Got client connected from :+ socket.getInetAddress)

  val out = new PrintWriter(socket.getOutputStream,true)

  while(true){

  Thread.sleep(500)

  val context = generateContext(index) //产生的字符是字母表的前七个随机字母

  println(context)

  out.write(context + '\n')

  out.flush()

  }

  socket.close()

  }

  }.start()

  }

  }

  }

  ScoketStreaming:

  object ScoketStreaming {

  def main(args: Array[String]) {

  //创建一个本地的StreamingContext,含2个工作线程

  val conf = new SparkConf().setMaster(local[2]).setAppName(ScoketStreaming)

  val sc = new StreamingContext(conf,Seconds(10)) //每隔10秒统计一次字符总数

  //创建珍一个DStream,连接master:9998

  val lines = sc.socketTextStream(master,9998)

  val words = lines.flatMap(_.split( ))

  val wordCounts = words.map(x = (x , 1)).reduceByKey(_ + _)

  wordCounts.print()

  sc.start() //开始计算

  sc.awaitTermination() //通过手动终止计算,否则一直运行下去

  }

  }

  运行结果:

  GenerateChar产生的数据如下:

  Got client connected from :/192.168.31.128

  ScoketStreaming运行结果:

  -------------------------------------------

  Time: 1459426750000 ms

  -------------------------------------------

  (B,1)

  (G,1)

  (C,1)

  -------------------------------------------

  Time: 1459426760000 ms

  -------------------------------------------

  (B,5)

  (F,3)

  (D,4)

  (G,3)

  (C,3)

  (E,1)

  注意:如果是在本地运行的,setMaster的参数必须为local[n],n 1,官网解释:

  When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either ofthese means that only one thread

  will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single

  thread will be used to run the receiver,leaving no thread for processing the received data.

  当在本地运行Spark Streaming程序时,Master的URL不能设置为local或local[1],这两种设置都意味着你将会只有一个线程来运行作业,如果你的Input DStream基于一个接收器

  (如Kafka,Flume等),那么只有一个线程来接收数据,而没有多余的线程来处理接收到的数据。

  如果是在集群上运行,为Spark streaming应分配的核数应该在大于接收器的数据,否则同样只接收了数据而没有能力处理。

  2.文件流:Spark Streaming通过监控文件系统的变化,若有新文件添加,则将它读入并作为数据流

  需要注意的是:

  1.这些文件具有相同的格式

  2.这些文件通过原子移动或重命名文件的方式在dataDirectory创建

  3.一旦移动这些文件,就不能再进行修改,如果在文件中追加内容,这些追加的新数据也不会被读取。

  FileStreaming:

  object FileStreaming {

  def main(args: Array[String]) {

  val conf = new SparkConf().setMaster(local).setAppName(FileStreaming)

  val sc = new StreamingContext(conf,Seconds(5))

  val lines = sc.textFileStream(/home/hadoop/wordCount)

  val words = lines.flatMap(_.split( ))

  val wordCounts = words.map(x = (x , 1)).reduceByKey(_ + _)

  sc.start()

  sc.awaitTermination()

  }

  }

  当你在文件目录里添加文件时,SparkStreaming就会自动帮你读入并计算 ,可以读取本地目录 HDFS和其他文件系统。

  注意:文件流不需要运行接收器,所以不需要分配核数

  3.RDD队列流:使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream,用于调试Spark Streaming应用程序。

  QueueStream:程序每隔1秒就创建一个RDD,Streaming每隔1秒就就对数据进行处理

  object QueueStream {

  def main(args: Array[String]) {

  val conf = new SparkConf().setMaster(local[2]).setAppName(queueStream)

  //每1秒对数据进行处理

  val ssc = new StreamingContext(conf,Seconds(1))

  //创建一个能够push到QueueInputDStream的RDDs队列

  val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

  //基于一个RDD队列创建一个输入源

  val inputStream = ssc.queueStream(rddQueue)

  val mappedStream = inputStream.map(x = (x % 10,1))

  val reduceStream = mappedStream.reduceByKey(_ + _)

  reduceStream.print

  ssc.start()

  for(i - 1 to 30){

  rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2) //创建RDD,并分配两个核数

  Thread.sleep(1000)

  }

  ssc.stop()

  }

  }

  

转载于:https://juejin.im/post/5c35b1086fb9a049e3084a21

Spark Streaming--实战篇相关推荐

  1. Spark Streaming 实战案例(一)

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-opera ...

  2. Spark Streaming 实战案例(五) Spark Streaming与Kafka

    主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...

  3. Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制

    主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...

  4. Spark Streaming 实战案例(二) Transformation操作

    本节主要内容 本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Tra ...

  5. 倾情大奉送--Spark入门实战系列

    这一两年Spark技术很火,自己也凑热闹,反复的试验.研究,有痛苦万分也有欣喜若狂,抽空把这些整理成文章共享给大家.这个系列基本上围绕了Spark生态圈进行介绍,从Spark的简介.编译.部署,再到编 ...

  6. Spark项目实战:大数据实时流处理日志(非常详细)

    实战概览 一.实战内容 二.大数据实时流处理分析系统简介 1.需求 2.背景及架构 三.实战所用到的架构和涉及的知识 1.后端架构 2.前端框架 四.项目实战 1.后端开发实战 1.构建项目 2.引入 ...

  7. Spark Streaming 流式计算实战

    这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...

  8. Spark Streaming揭秘 Day13 数据安全容错(Driver篇)

    Spark Streaming揭秘 Day13 数据安全容错(Driver篇) 书接上回,首先我们要考虑的是在Driver层面,有哪些东西需要维持状态,只有在需要维持状态的情况下才需要容错,总的来说, ...

  9. 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)

    Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...

  10. Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍

    [注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Sp ...

最新文章

  1. 【论文理解】ArcFace: Additive Angular Margin Loss for Deep Face Recognition(InsightFace)
  2. 编程语言python入门要电脑什么配置能带动-对于几乎是零基础的人,直接学 Python 编程合适吗?...
  3. 宜昌远安谋定功能性-农业大健康·万祥军:绿色和谐新路
  4. VTK:Math之LeastSquares
  5. Flink countWindow窗口
  6. JBoss 7.1.1启动时遇到Address already in use: bind /127.0.0.1:9990的处理办法
  7. 亚洲新首富出炉!富豪榜单大洗牌,马云3年来首次跌出中国前三
  8. vs2008调试c语言,VS2008调试Release程序-Dump文件方式_C/C++技术分享_看流星社区 www.kanliuxing.com...
  9. 【招聘内推】猎聘网招聘推荐算法工程师
  10. ROM PROM EPROM EEPROM FLASH(NAND、NOR)
  11. Red Hat Linux 启动流程图
  12. 使用自己的服务器中转远程桌面
  13. 代码检查工具系列——CheckStyle
  14. 基于Pipeline的CI/CD在趣头条的应用实践
  15. 微分几何 Class 3 曲线,曲率与挠率
  16. win7系统调整屏幕刷新率方法
  17. 计算机图形直线分析,计算机图形学 直线反走样Wu算法(4)
  18. 鹏保宝 v7.1.0 官方版
  19. 今日头条用户搜索“室内设计”显示的自媒体粉丝数量及分布情况统计(2020.1.8)
  20. Visual Studio 匹配花括号的背景颜色

热门文章

  1. 快捷简易统计图表模型设计与实现
  2. studio添加依赖工程方法
  3. (0079)iOS开发之安全策略之HTTPS(1)
  4. [ Luogu 3924 ] 康纳的线段树
  5. django 整理一
  6. JavaSE_坚持读源码_ClassLoader对象_Java1.7
  7. 还是来说class,什么鬼,类会生宝宝
  8. Java学习笔记之[ 利用扫描仪Scanner进行数据输入 ]
  9. HDu 3449 (有依赖的01背包) Consumer
  10. ORACLE 如何查询被锁定表及释放session