SparkStreaming读取Socket数据
SparkStreaming是7*24h不间断运行,底层操作的是DStream。
读取Socket数据过程
1.SparkStreaming启动后,首先启动一个job,这个job有一个task一直接收数据。
2.task每隔一段时间(batchInterval)就把接收来的数据封装到batch中。生成的每个batch又被封装到一个RDD中,这个RDD又被封装到DStream中。SparkStreaming底层操作的就是DStream
3.DStream有自己的Transformation算子,懒执行需要DStream的outputOperator类算子来触发执行。
4.生成DStream之后,还需启动Job处理DStream
注意:
- 1.batchInterval表示接收数据的延迟度,可以设置。
- 2.(集群接收一批次数据的时间 > 处理时间)假如batchInterval=5s,生成一个DStream的时间5s,集群处理一批数据的时间=3s,0-5s集群一直接收数据,5-8s一边接受数据、一边处理数据,8~10s只接受数据…这样每一批次都会造成集群“休息”2s,集群资源不能充分利用。
- 3.(集群接收一批次数据的时间 < 处理时间)如果集群处理一批次的时间是8s,0-5s一边接受一边处理,10-13s一边接受、一边处理…这样批次数据会越堆越多。如果接收来的数据放内存,会OOM。如果内存不足放Disk,也会加大数据处理的延迟度。
- 4.最好状态:batchInterval=5s,集群处理一批数据=5s。
读取Socket数据的代码
1.需要设置local[2],因为一个线程是读取数据,一个线程是处理数据
2.创建StreamingContext有两种方式:val ssc = new StreamingContext(SparkConf ,Durations.seconds(5)) / val ssc = new StreamingContext(SparkContext,Durations.seconds(5))
3.Durations 批次间隔时间的设置需要根据集群的资源情况以及监控每一个job的执行时间来调节出最佳时间
4.SparkStreaming所有业务处理完成之后需要有一个output operato操作
5.StreamingContext.start()straming框架启动之后是不能再次添加业务逻辑
6.StreamingContext.stop()无参的stop方法会将sparkContext一同关闭,stop(false) ,默认为true,会一同关闭
7.StreamingContext.stop()停止之后是不能在调用start
val conf = new SparkConf()
conf.setMaster("local[2]") //1个task接收,1个task处理数据
conf.setAppName("test")
val ssc = new StreamingContext(conf,Durations.seconds(5))val lines = ssc.socketTextStream("node4",9999)
val words = lines.flatMap(one => {one.split(" ")
})
val pairWords = words.map(one=>{(one,1)})
val result = pairWords.reduceByKey(_+_)
result.print(100)ssc.start()
ssc.awaitTermination()ssc.stop()
SparkStreaming读取Socket数据相关推荐
- DStream操作实战:1.SparkStreaming接受socket数据,实现单词计数WordCount
package cn.testdemo.dstream.socket import org.apache.spark.{SparkConf, SparkContext} import ...
- SparkStreaming读取Kakfa数据时发生OffsetOutOfRangeException异常
参考文章:http://www.jianshu.com/p/791137760c14 运行SparkStreming程序一段时间后,发现产生了异常: ERROR JobScheduler: Error ...
- sparkstreaming 读取mysql_SparkStreaming读取Kafka的两种方式
本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式: 一.SparkStreaming简介 二.Kafka简介 三.Redis简介(可用于保存历史数据或偏移量数据) 四.S ...
- sparkStreaming流式处理,接受socket数据,实现单词统计并且每个批次数据结果累加
package com.spark.streamingimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} ...
- SparkStreaming读取Kafka的Json数据然后保存到MySQL
一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...
- windows nginx c++读取请求数据_震撼!全网第一张源码分析全景图揭秘Nginx
不管是C/C++技术栈,还是PHP,Java技术栈,从事后端开发的朋友对nginx一定不会陌生. 想要深入学习nginx,阅读源码一定是非常重要的一环,但nginx源码量毕竟还是不算少,一不小心就容易 ...
- C# 三种方式实现Socket数据接收(经典)
Stream.Read 方法 当在派生类中重写时,从当前流读取字节序列,并将此流中的位置提升读取的字节数. 语法: public abstract int Read(byte[] buffer, in ...
- python读取二进制数据转整形,在python中读取二进制数据(替换C代码)
我正在写一个python程序来代替C程序,它从微控制器接收数据.这是在C语言中使用一个简单的socket和read函数完成的.在我的python程序中,我可以从微控制器读取一系列数据,但我似乎无法将其 ...
- java实现socket连接,向指定主机指定端口发送socket数据,并获取响应数据
全栈工程师开发手册 (作者:栾鹏) java教程全解 java实现socket连接,向指定主机指定端口发送socket数据,并获取响应数据 测试代码 public static void main(S ...
最新文章
- C++类的案例(一)
- OpenCV中的TermCriteria模板类
- MySQL集群节点宕机,数据库脑裂!如何排障?
- JS创建对象学习笔记
- vscode里好用的html插件_知乎口碑最好的六大PPT插件分享!用好插件,效率倍增...
- sql 中实现打乱数据的排序
- 蒙特卡洛法求圆周率100亿数据
- MongoDB的基本shell操作(三)
- linux下使用dd命令制作ubuntu的u盘启动,Ubuntu使用dd命令制作U盘系统启动盘
- cron表达式 每天0点10分和30分_揭开考研阅卷的内幕,注意这些多得20分!
- 解决VsCode中C程序无法键盘输入的问题
- spring cron表达式(定时器)
- shell foreach 拼接字符串_FIND_IN_SET 及IN 处理逗号间隔的字符串参数
- java中常量有初始化值吗,Java基础_变量、常量
- 完整的支付系统整体架构
- Linux怎么有两个vmdk文件,「Linux」- 挂载 VMDK 文件
- R语言学习之深圳市空气质量分析
- dp光纤线传输距离既然超过百米之长?
- 玩乐购与京东天猫深度合作 打造云购全网最低价
- Form表单之get提交与post提交