sparkStreaming 是一种流处理框架,支持多种数据源和多种输出,是一中微批处理,
主要的数据结构是:DStream 离散数据流,由多个RDD组成,每一个微批都是一个RDD。
Spark Streaming 的入口需要单独创立,因为sparkSession中灭有整合:
创建如下:
val conf=new SparkConf().setMaster(“local[*]”).setAppName(“kgc streaming demo”)
val ssc=new StreamingContext(conf,Seconds(5))
注意:一个jvm中只有一个StreamingContext启动
StreamingContext停止后,不能在启动
使用scala 编写sparkStreaming程序:
Scoket数据源:

//local[n]  其中n要大于接受器的个数
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
//创建一个接收器
val lines = ssc.socketTextStream("localhost", 9999)//指定数据源
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
//开始
ssc.start()
//等待终止信号
ssc.awaitTermination()

sparkStreaming内建的流式数据:文件系统(不与接收器相关联)、Scoket、kafka、Flume等
文件系统数据源:

val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))// 创建FileInputDStream去读取文件系统上的数据
val lines = ssc.textFileStream("hdfs://hadoop131:9000/data")
//使用空格进行分割每行记录的字符串
val words = lines.flatMap(_.split(" "))
//类似于RDD的编程,将每个单词赋值为1,并进行合并计算
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

flume数据源
1、push的方式读取数据

val conf: SparkConf = new SparkConf().setAppName("flumedemo").setMaster("local[3]")val ssc = new StreamingContext(conf,Seconds(5))//push 方式  由主机推送数据给sparkStreaming   需要先启动sparkStreamingval flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"hadoop131",5678)//flume 作为sparking streaming 的实时数据流  每一条数据是一个event 故此时形成的dStream中的数据是一个一个的event//event 有body  和headerflumeStream.map(x=>new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()

2、poll的方式获取数据

val conf: SparkConf = new SparkConf().setAppName("flumedemo").setMaster("local[3]")val ssc = new StreamingContext(conf,Seconds(5))//poll方式  主动拉取数据,需要先启动flumeval flumeStream=FlumeUtils.createPollingStream(ssc,"hadoop131",5678)flumeStream.map(x=>new String(x.event.getBody.array()).trim).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()

kafka数据源

  //设置主函数的参数  第一个是brokers  第二个是topics  可以使用逗号隔开 传入多个topics//sparkStreaming  可以一次性读取 kafka中的多个topic中的数据val Array(brokers, topics) = argsval sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")val ssc = new StreamingContext(sparkConf, Seconds(2))val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topicsSet,kafkaParams))messages.map(_.value())       // 取出 value.flatMap(_.split(" "))   // 将字符串使用空格分隔.map(word => (word, 1))      // 每个单词映射成一个 pair.reduceByKey(_+_)    // 根据每个 key 进行累加.print()     // 打印前 10 个数据ssc.start()ssc.awaitTermination()

sparkStreaming基础知识整理相关推荐

  1. python常用变量名_python基础知识整理

    Python Python开发 Python语言 python基础知识整理 序言:本文简单介绍python基础知识的一些重要知识点,用于总结复习,每个知识点的具体用法会在后面的博客中一一补充程序: 一 ...

  2. 计算机二级c语基础知识,计算机二级C语基础知识整理.doc

    计算机二级C语基础知识整理 1.1 算法 算法:是一组有穷指令集,是解题方案的准确而完整的描述.通俗地说,算法就是计算机解题的过程.算法不等于程序,也不等于计算方法,程序的编制不可能优于算法的设计. ...

  3. 使用Aspose.Cells的基础知识整理

    使用Aspose.Cells的基础知识整理 转自 http://www.cnblogs.com/kenblove/archive/2009/01/07/1371104.html 这两天用Aspose. ...

  4. 前端基础知识整理汇总(中)

    前端基础知识整理汇总(中) Call, bind, apply实现 // call Function.prototype.myCall = function (context) {context = ...

  5. 前端基础知识整理汇总(上)

    前端基础知识整理汇总(上) HTML页面的生命周期 HTML页面的生命周期有以下三个重要事件: 1.DOMContentLoaded -- 浏览器已经完全加载了 HTML,DOM 树已经构建完毕,但是 ...

  6. centos7创建asm磁盘_Oracle ASM 磁盘组基础知识整理(收藏版)

    为什么要写这么一篇基础知识呢?还是有那么一点点原因的,不是胡编乱造还真是有真实存在的事件的,前两周里因一套生产环境数据库磁盘不足无法对其进行表空间扩容,需要向存储岗申请存储资源,当存储岗划好资源加完存 ...

  7. Web前端基础知识整理

    1. 前端基础知识 文件分类 XML(扩展标记语言) 装载有格式的数据信息,用于各个框架和技术的配置文件描述 特点: 扩展名为.xml 内容区分大小写 标签要成对出现,形成容器,只能有一个 标签按正确 ...

  8. Kali Linux渗透基础知识整理(四):维持访问

    Kali Linux渗透基础知识整理系列文章回顾 维持访问 在获得了目标系统的访问权之后,攻击者需要进一步维持这一访问权限.使用木马程序.后门程序和rootkit来达到这一目的.维持访问是一种艺术形式 ...

  9. 矩阵论(零):线性代数基础知识整理(1)——逆矩阵、(广义)初等变换、满秩分解

    矩阵论专栏:专栏(文章按照顺序排序) 线性代数是矩阵论的先修课程,本篇博客整理线性代数的基础理论知识,为矩阵论的学习做准备.限于篇幅,梳理的重点将在定理和结论上(只给出部分必要的定义),对最基础的概念 ...

  10. CSP-S初赛基础知识整理

    文章目录 CSP-S初赛基础知识整理 RT [1]计算机基础知识 计算机系统的组成 计算机硬件的五大组成 [1-2]进制及其转化和运算 [1-2]二进制 [1]基本定义及应用 [1]基本运算 [2]位 ...

最新文章

  1. Android备份和添加短信
  2. 使sqoop能够启用压缩的一些配置
  3. Java中String类的方法及说明
  4. 免费开放阅读 | 数据库管理系统的事务原理(上)
  5. python3 协程 写法_理解Python的协程(Coroutine)
  6. python绘制图像直方图_Python – 计算图像的直方图
  7. 机器学习与计算机视觉(数据集的选择)
  8. github不支持html,为什么Github页面不允许我有效的HTML?
  9. wls12C启用Gzip
  10. 向日葵Gantt支持的XML 数据结构
  11. 介绍几种jquery ui使用方法
  12. 一位清华在校生的报告
  13. linux通过端口测试网速,【转】Linux下测试网速的工具
  14. Kali中MSF利用永恒之蓝(复现、演示)
  15. python入门教程 傻瓜_python傻瓜教程
  16. Android 4.0.4-在build.prop中添加属性
  17. 骁龙660是32位还是64位_高通骁龙手机cpu64位比32位有什么优势?
  18. STM32 F7的MAC层过滤使用+实例代码
  19. 【CE入门教程】使用Cheat Engine(CE)查找“扫雷”中“雷数”、“旗子”、“笑脸”和“计时器”的内存地址以及“初级”、“中级”和“高级”的棋盘内存地址范围
  20. 专利的保护期限是多久?过期了应该如何处理?

热门文章

  1. 计算机云班课王清答案,基于蓝墨云班课的移动学习实践
  2. 基于HI600R的差分GPS设搭建过程介绍
  3. 首都师范 博弈论 6 3 2子博弈完美均衡 蜈蚣博弈
  4. t检验和wilcoxon秩和检验 判断两组数据间的显著性差异
  5. docker(9):高级网络配置
  6. 当心Excel中的嵌套IF公式
  7. Unity ipad UI适配
  8. myeclipse创建web-project没有WebRoot文件夹
  9. chrome打不开axure的html文件解决方法
  10. 如何讲好一个故事(6个要素)