SparkStreaming是7*24h不间断运行,底层操作的是DStream。

读取Socket数据过程

1.SparkStreaming启动后,首先启动一个job,这个job有一个task一直接收数据。
2.task每隔一段时间(batchInterval)就把接收来的数据封装到batch中。生成的每个batch又被封装到一个RDD中,这个RDD又被封装到DStream中。SparkStreaming底层操作的就是DStream
3.DStream有自己的Transformation算子,懒执行需要DStream的outputOperator类算子来触发执行。
4.生成DStream之后,还需启动Job处理DStream

注意:

  • 1.batchInterval表示接收数据的延迟度,可以设置。
  • 2.(集群接收一批次数据的时间 > 处理时间)假如batchInterval=5s,生成一个DStream的时间5s,集群处理一批数据的时间=3s,0-5s集群一直接收数据,5-8s一边接受数据、一边处理数据,8~10s只接受数据…这样每一批次都会造成集群“休息”2s,集群资源不能充分利用。
  • 3.(集群接收一批次数据的时间 < 处理时间)如果集群处理一批次的时间是8s,0-5s一边接受一边处理,10-13s一边接受、一边处理…这样批次数据会越堆越多。如果接收来的数据放内存,会OOM。如果内存不足放Disk,也会加大数据处理的延迟度。
  • 4.最好状态:batchInterval=5s,集群处理一批数据=5s。

读取Socket数据的代码

1.需要设置local[2],因为一个线程是读取数据,一个线程是处理数据
2.创建StreamingContext有两种方式:val ssc = new StreamingContext(SparkConf ,Durations.seconds(5)) / val ssc = new StreamingContext(SparkContext,Durations.seconds(5))
3.Durations 批次间隔时间的设置需要根据集群的资源情况以及监控每一个job的执行时间来调节出最佳时间
4.SparkStreaming所有业务处理完成之后需要有一个output operato操作
5.StreamingContext.start()straming框架启动之后是不能再次添加业务逻辑
6.StreamingContext.stop()无参的stop方法会将sparkContext一同关闭,stop(false) ,默认为true,会一同关闭
7.StreamingContext.stop()停止之后是不能在调用start

val conf = new SparkConf()
conf.setMaster("local[2]") //1个task接收,1个task处理数据
conf.setAppName("test")
val ssc = new StreamingContext(conf,Durations.seconds(5))val lines = ssc.socketTextStream("node4",9999)
val words = lines.flatMap(one => {one.split(" ")
})
val pairWords = words.map(one=>{(one,1)})
val result = pairWords.reduceByKey(_+_)
result.print(100)ssc.start()
ssc.awaitTermination()ssc.stop()

SparkStreaming读取Socket数据相关推荐

  1. DStream操作实战:1.SparkStreaming接受socket数据,实现单词计数WordCount

    package cn.testdemo.dstream.socket     import org.apache.spark.{SparkConf, SparkContext}     import ...

  2. SparkStreaming读取Kakfa数据时发生OffsetOutOfRangeException异常

    参考文章:http://www.jianshu.com/p/791137760c14 运行SparkStreming程序一段时间后,发现产生了异常: ERROR JobScheduler: Error ...

  3. sparkstreaming 读取mysql_SparkStreaming读取Kafka的两种方式

    本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式: 一.SparkStreaming简介 二.Kafka简介 三.Redis简介(可用于保存历史数据或偏移量数据) 四.S ...

  4. sparkStreaming流式处理,接受socket数据,实现单词统计并且每个批次数据结果累加

    package com.spark.streamingimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} ...

  5. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  6. windows nginx c++读取请求数据_震撼!全网第一张源码分析全景图揭秘Nginx

    不管是C/C++技术栈,还是PHP,Java技术栈,从事后端开发的朋友对nginx一定不会陌生. 想要深入学习nginx,阅读源码一定是非常重要的一环,但nginx源码量毕竟还是不算少,一不小心就容易 ...

  7. C# 三种方式实现Socket数据接收(经典)

    Stream.Read 方法 当在派生类中重写时,从当前流读取字节序列,并将此流中的位置提升读取的字节数. 语法: public abstract int Read(byte[] buffer, in ...

  8. python读取二进制数据转整形,在python中读取二进制数据(替换C代码)

    我正在写一个python程序来代替C程序,它从微控制器接收数据.这是在C语言中使用一个简单的socket和read函数完成的.在我的python程序中,我可以从微控制器读取一系列数据,但我似乎无法将其 ...

  9. java实现socket连接,向指定主机指定端口发送socket数据,并获取响应数据

    全栈工程师开发手册 (作者:栾鹏) java教程全解 java实现socket连接,向指定主机指定端口发送socket数据,并获取响应数据 测试代码 public static void main(S ...

最新文章

  1. C++类的案例(一)
  2. OpenCV中的TermCriteria模板类
  3. MySQL集群节点宕机,数据库脑裂!如何排障?
  4. JS创建对象学习笔记
  5. vscode里好用的html插件_知乎口碑最好的六大PPT插件分享!用好插件,效率倍增...
  6. sql 中实现打乱数据的排序
  7. 蒙特卡洛法求圆周率100亿数据
  8. MongoDB的基本shell操作(三)
  9. linux下使用dd命令制作ubuntu的u盘启动,Ubuntu使用dd命令制作U盘系统启动盘
  10. cron表达式 每天0点10分和30分_揭开考研阅卷的内幕,注意这些多得20分!
  11. 解决VsCode中C程序无法键盘输入的问题
  12. spring cron表达式(定时器)
  13. shell foreach 拼接字符串_FIND_IN_SET 及IN 处理逗号间隔的字符串参数
  14. java中常量有初始化值吗,Java基础_变量、常量
  15. 完整的支付系统整体架构
  16. Linux怎么有两个vmdk文件,「Linux」- 挂载 VMDK 文件
  17. R语言学习之深圳市空气质量分析
  18. dp光纤线传输距离既然超过百米之长?
  19. 玩乐购与京东天猫深度合作 打造云购全网最低价
  20. Form表单之get提交与post提交

热门文章

  1. 软件体系架构阅读笔记一
  2. python数据库-mysql
  3. 东芝发布15nm SG5固态硬盘 容量高达1TB
  4. vmware克隆server2008R2造成SID冲突
  5. 《响应式Web设计全流程解析》一1.2 静态设计稿舒适区
  6. 每天首次登陆记录设备信息
  7. JS中定义式函数与变量时函数的差别
  8. 【HDOJ】1058 Humble Numbers
  9. war包部署到tomcat的疑问
  10. HK Openstack Summit 归来有感