Spark Steaming

Spark Streaming 简介

什么是Spark Streaming

Spark Streaming使用Spark Core的快速调度功能来执行流分析。它以小批量方式提取数据,并对这些小批量数据执行RDD转换。此设计使得为批量分析编写的同一组应用程序代码可用于流分析,从而有助于轻松实现lambda体系结构。 然而,这种便利性带来的等待时间等于小批量持续时间。其他按事件而不是小批量处理事件的流数据引擎包括Storm和Flink的流媒体组件。 Spark Streaming内置支持消费Kafka,Flume,Twitter,ZeroMQ,Kinesis和TCP / IP套接字。

在Spark 2.x中,还提供了一种基于数据集的独立技术,称为结构化流,具有更高级别的接口,以支持流式传输。

spark streaming实时接收输入数据流,并根据时间将数据流分成连续的多个batch,然后由Spark引擎一次处理一批数据,以批量生成最终结果流。

spark streaming的核心参数,设置流数据被分成多个batch的时间间隔,每个spark引擎处理的就是这个时间间隔内的数据。在Spark Streaming中,Job之间有可能存在依赖关系,所以后面的作业必须确保前面的作业执行完后才会被调度执行。如果批处理时间超过了batch duration,意味着数据处理速率跟不上数据接收速率,那么会导致后面正常的batch提交的作业无法按时执行,随着时间的推移,越来越多的作业被延迟执行,最后导致整个Streaming作业被阻塞,所以需要设置一个合理的批处理间隔以确保作业能够在这个批处理间隔内执行完成。

Spark Streaming 特点

1、Spark Streaming用于处理流式计算问题。能够和Spark的其他模块无缝集成。

2、Spark Streaming是一个粗粒度的框架【也就是只能对一批数据指定处理方法】,核心是采用微批次架构。和Storm采用的以条处理的不同。

3、Spark Streaming会运行接收器来不断的接收输入的数据流,然后根据程序配置的时间,将时间范围内的所有数据打成一个RDD,发送给Spark Core去进行处理。依次来打成对数据流的计算。

4、Spark Streaming有它自己的抽象,叫DStream Discretized Stream离散化流

5、如果入水口的速度大于出水口的速度,那么势必导致水管爆裂,Spark Streaming也存在这个问题,内部采用背压机制来进行处理,会通过ReceiverRateController来不断计算RDD的处理速度和RDD的生成速度,来通过令牌桶机制进行速度控制。只要是控制令牌的生成周期。

术语定义

  • 离散流(discretized stream)或DStream:这是Spark Streaming对内部持续的实时数据流的抽象描述,即我们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。

  • 批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。

  • 时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。

  • 窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,

  • 滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数

  • Input DStream :一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据。

Storm与Spark Streming比较

  • 处理模型以及延迟

虽然两框架都提供了可扩展性(scalability)和可容错性(fault tolerance),但是它们的处理模型从根本上说是不一样的。Storm可以实现亚秒级时延的处理,而每次只处理一条event,而Spark Streaming可以在一个短暂的时间窗口里面处理多条(batches)Event。所以说Storm可以实现亚秒级时延的处理,而Spark Streaming则有一定的时延。

  • 容错和数据保证

然而两者的代价都是容错时候的数据保证,Spark Streaming的容错为有状态的计算提供了更好的支持。在Storm中,每条记录在系统的移动过程中都需要被标记跟踪,所以Storm只能保证每条记录最少被处理一次,但是允许从错误状态恢复时被处理多次。这就意味着可变更的状态可能被更新两次从而导致结果不正确。

任一方面,Spark Streaming仅仅需要在批处理级别对记录进行追踪,所以他能保证每个批处理记录仅仅被处理一次,即使是node节点挂掉。虽然说Storm的 Trident library可以保证一条记录被处理一次,但是它依赖于事务更新状态,而这个过程是很慢的,并且需要由用户去实现。

  • 实现和编程API

Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。如果你想看看这两个框架是如何实现的或者你想自定义一些东西你就得记住这一点。Storm是由BackType和 Twitter开发,而Spark Streaming是在UC Berkeley开发的。

Storm提供了Java API,同时也支持其他语言的API。 Spark Streaming支持Scala和Java语言(其实也支持Python)。

  • 批处理框架集成

Spark Streaming的一个很棒的特性就是它是在Spark框架上运行的。这样你就可以想使用其他批处理代码一样来写Spark Streaming程序,或者是在Spark中交互查询。这就减少了单独编写流批量处理程序和历史数据处理程序。

  • 生产支持

Storm已经出现好多年了,而且自从2011年开始就在Twitter内部生产环境中使用,还有其他一些公司。而Spark Streaming是一个新的项目,并且在2013年仅仅被Sharethrough使用(据作者了解)。

Storm是 Hortonworks Hadoop数据平台中流处理的解决方案,而Spark Streaming出现在 MapR的分布式平台和Cloudera的企业数据平台中。除此之外,Databricks是为Spark提供技术支持的公司,包括了Spark Streaming。

虽然说两者都可以在各自的集群框架中运行,但是Storm可以在Mesos上运行, 而Spark Streaming可以在YARN和Mesos上运行。

DStream

表示一系列时间序列上连续的RDDs,每一个RDDs代表一定时间间隔内到达的数据,这样就把连续的数据流拆成很多小的RDDs数据块(RDDs数据块内的数据是连续的数据)。可以通过实时数据创建DStream,也可以对现有的DStream进行transformation操作生成,例如map、window、reduceByKeyAndWindow等转换操作。 在spark streaming运行期间,每个DStream都会定期生成一个RDDs,具体的是compute(time) 方法,生成的RDDs代表一个批次内的数据,作为提交job的输入元数据

Spark Streaming 实战

spark nc word count scala


import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** <p>** @author leone* @since 2018-12-24**/
object NcWordCount {def main(args: Array[String]): Unit = {// 创建sparkContextval sc = new SparkContext(new SparkConf().setAppName("nc-wc-steaming").setMaster("local[2]"))// 设置批次产生的时间间隔val ssc = new StreamingContext(sc, Seconds(5000))// 从一个socket端口读取数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node-1", 8888)// 对DStream进行操作val words: DStream[String] = lines.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = words.map((_, 1))val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 打印结果reduced.print()// 启动spark程序ssc.start()ssc.awaitTermination()}}

Spark Streaming kafka scala

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** <p>** @author leone* @since 2018-12-24**/
object KafkaWordCount {def main(args: Array[String]): Unit = {// offset保存路径val checkpointPath = "file:///e:/tmp/spark/streaming/checkpoint/kafka-direct"val conf = new SparkConf().setAppName("ScalaKafkaStream").setMaster("local[2]")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint(checkpointPath)val bootstrapServers = "node-2:9092,node-3:9092,node-4:9092"val groupId = "streaming-group"val topicName = "streaming-topic"val maxPoll = 20000val kafkaParams = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,ConsumerConfig.GROUP_ID_CONFIG -> groupId,ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])val DStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))DStream.map(_.value).flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _).transform(data => {data.sortBy(_._2, false)}).print()ssc.start()ssc.awaitTermination()}}

Spark Streaming nc word count java


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;import java.util.Arrays;/*** <p>** @author leone* @since 2018-12-25**/
public class JavaStreamingTest {public static void main(String[] args) throws InterruptedException {SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("streaming");JavaStreamingContext jsc = new JavaStreamingContext(conf, Seconds.apply(5));JavaReceiverInputDStream<String> stream = jsc.socketTextStream("node-1", 8888);// 拆分每一行数据JavaDStream<String> words = stream.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());// 单词映射成 (word, 1) 的形式JavaPairDStream<Object, Integer> pairWords = words.mapToPair((PairFunction<String, Object, Integer>) s -> new Tuple2<>(s, 1));// 进行reduce聚合操作JavaPairDStream<Object, Integer> result = pairWords.reduceByKey((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);// 打印输出结构result.print();jsc.start();jsc.awaitTermination();jsc.close();}}

Spark Streaming kafka word count java


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;import java.util.*;/*** <p>基于Kafka Direct方式实时 word count 程序** @author leone* @since 2018-12-25**/
public class JavaStreamingKafkaTest {public static void main(String[] args) throws InterruptedException {// 创建SparkConf对象SparkConf conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[*]");// 创建JavaStreamingContext对象JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));// kafka的brokersString brokers = "node-2:9092,node-3:9092,node-4:9092";// 创建Kafka参数MapMap<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("bootstrap.servers", brokers);kafkaParams.put("group.id", "g1");kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");kafkaParams.put("auto.offset.reset", "latest");// 创建Kafka的topics ,里面可以填多个topicCollection<String> topics = Collections.singletonList("streaming-topic");// 创建DStreamJavaInputDStream<ConsumerRecord<Object, Object>> lines = KafkaUtils.createDirectStream(jsc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams));// 拆分Kafka topic里面的数据JavaDStream<String> linesSplit = lines.flatMap((FlatMapFunction<ConsumerRecord<Object, Object>, String>) line -> Arrays.asList(line.value().toString().split(" ")).iterator());// 单词映射成(word,1)的形式JavaPairDStream<String, Integer> word = linesSplit.mapToPair((PairFunction<String, String, Integer>) everyWord -> new Tuple2<>(everyWord, 1));// 进行reduce聚合操作JavaPairDStream<String, Integer> wordsCount = word.reduceByKey((Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);// 打印输出结构wordsCount.print();jsc.start();jsc.awaitTermination();jsc.close();}}

Spark Steaming快速入门相关推荐

  1. spark SQL快速入门 1-9 慕课网

    1.hadoop安装 1.修改hadoop配置文件hadoop-env.shexport JAVA_HOME=/home/hadoop/app/jdk1.8.0_91core-site.xml< ...

  2. Spark GraphX 快速入门

    教程目录 0x00 教程内容 0x01 Spark GraphX 介绍 1. GraphX 介绍 2. GraphX 的使用场景 0x02 GraphX 重要概念与实操 1. 属性图 2. 多种方式理 ...

  3. spark教程python案例_Spark实战(四)spark+python快速入门实战小例子(PySpark)

    由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现 1.集群测试实例 代码如下: from pyspark. ...

  4. Spark SQL 快速入门系列(五)SparkSQL 访问 Hive

    文章目录 访问 Hive SparkSQL 整合 Hive 访问 Hive 表 idea实现SparkSQL连接hive 访问 Hive 导读 1,整合 SparkSQL 和 Hive, 使用 Hiv ...

  5. Spark Core快速入门系列(5) | RDD 中函数的传递

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  6. Apache Spark 2.2.0 中文文档 - 快速入门 | ApacheCN

    快速入门 使用 Spark Shell 进行交互式分析 基础 Dataset 上的更多操作 缓存 独立的应用 快速跳转 本教程提供了如何使用 Spark 的快速入门介绍.首先通过运行 Spark 交互 ...

  7. spark之1:快速入门

    spark之1:快速入门 @(SPARK)[spark, 大数据] spark可以通过交互式命令行及编程两种方式来进行调用: 前者支持scala与python 后者支持scala.python与jav ...

  8. Spark快速入门指南 – Spark安装与基础使用

    本文转载自Spark快速入门指南 – Spark安装与基础使用 Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象.Spark 正如其名,最大的特点就是快(Lightni ...

  9. [学习笔记]黑马程序员Spark全套视频教程,4天spark3.2快速入门到精通,基于Python语言的spark教程

    文章目录 视频资料: 思维导图 一.Spark基础入门(环境搭建.入门概念) 第二章:Spark环境搭建-Local 2.1 课程服务器环境 2.2 Local模式基本原理 2.3 安装包下载 2.4 ...

最新文章

  1. Dremel - Interactive Analysis of WebScale Datasets
  2. 第十五届全国大学智能汽车提问回答问题 2020-8-9
  3. 利用ConfigParser读取配置文件
  4. 11.2 滑动窗口-机器学习笔记-斯坦福吴恩达教授
  5. 代理加盟哪家小程序开发公司好
  6. OpenCASCADE:Foundation Classes之数学原语和算法
  7. VTK:简单操作之PerspectiveTransform
  8. 内存泄露、内存溢出以及解决方法
  9. SQL点滴19—T-SQL中的透视和逆透视
  10. HTML + CSS 为何得不到编程界的认可?
  11. VMware卸载后再安装提示无法打开注册表项 UNKNOWN\Components\…解决办法
  12. 快速下载各类网页视频插件~COCOCUT
  13. 怎样注册一个codepen账号
  14. keras有cpu和gpu版本的区别
  15. SQLOS任务调度算法
  16. 2022年操作系统行业研究报告
  17. PHP版本更新功能实现,技术分享:最低PHP版本更新操作 | Wopus
  18. mysql 分离 实时读_MySQL深入利用Ameoba实现读写分离
  19. java 集合封装树形结构
  20. Unity 中摄像机跟踪的两种实现

热门文章

  1. 成功的10000小时定律
  2. 根据特征图画热图_heatmap
  3. uva11401:Triangle Counting 递推 数学
  4. css:字母hover文字加粗,盒子变宽,导致文字列表抖动
  5. 操作系统硬件介绍-处理器(CPU)
  6. 直达号PK公众号的背后还有哪些市场空间?
  7. echarts的xAxis的axisLabel的使用
  8. 通过 iptables 禁止 ping
  9. ccid linux,在Linux环境下搭建CCID测试环境
  10. Oracle EBS fnd_flex_ext.get_ccid返回CodeCombinationId为0