1、基于Receiver方式

  • 这种方式构建出的DStream有一个接收者Receiver,通过这个接收者将数据保存在Executor中。
    这种方式是需要独享CPU的core,也就是说需要独立占用若干个线程。所以如果在本地模式下,local[N]中的N指定为1的话,就只有一个线程来运行SparkStreaming程序,这一个线程只能用来接收数据,没有额外的线程去计算,所以会看到数据不被处理的现象。
  • 这种方式数据可能在计算失败的情况下丢失。为了防止数据零丢失,我们需要开启SparkStreaming的预写日志机制(write ahead log,WAL),该机制会同步的将接收到的Kafka数据写入分布式文件系统,比如HDFS中。所及,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
  • 开启方式:spark.streaming.receiver.writeAheadLog.enable=true,默认是false。
  • 需要注意的方面:
    1、Kafka的topic分区和SparkStreaming中生成的RDD分区没有关系。在KafkaUtils.createStream中增加分区数量只会增加单个receiver的线程数,不会增加spark的并行度。
    2、可以创建多个的Kafka输入DSteam,使用不同的group和topic,使用多个Receiver并行接收数据。
    3、如果启用了HDFS等有容错的存储系统, 并且启用了写入日志,则接收到的数据已经被复制到日志中。 因此,输入流的存储级别设置StorageLevel.MEMORY_AND_DISK_SER(即使用KafkaUtils.createStream(…,StorageLevel.MEMORY_AND_DISK_SER))的存储级别。
  • 代码案例
object _01SparkStreamingWithKafkaReceiverOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark-project").setLevel(Level.WARN)if(args == null || args.length < 3) {println("""Parameter Errors ! Usage: <batchInterval> <zk> <groupId>|batchInterval    :  作业提交的间隔时间|zk               :  zk的元数据地址|groupId          :  分组id""".stripMargin)System.exit(-1)}val Array(batchInterval, zk, groupId) = argsval conf = new SparkConf().setAppName("SparkStreamingWithKafkaReceiver").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))val topics = Map[String, Int]("t-1810-1" -> 3)/*** 这里面的Key--->kafka中message的Key,如果没有指定key,key就是null* value--->kafka中message的value*/val kafkaStream:InputDStream[(String, String)] = KafkaUtils.createStream(ssc, zk, groupId, topics)
//        kafkaStream.print()val retDStream:DStream[(String, Int)] = kafkaStream.map{case (key, msg) => msg}.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)retDStream.print()ssc.start()ssc.awaitTermination()}
}
  • 读取流程分析

2、基于Direct

  • 这种方式区别于Receiver的方式,没有接收者专门消耗CPU core或者线程去接收数据,是通过kafka底层的api直接从kafka中读取数据。每次读取的是偏移量的范围代表数据[fromOffset,untilOffset],而这个有范围的偏移量数据就构成了我们进行处理的DSteam或者RDD,也就是说RDD是有范围的。
  • 需要注意的地方
    1、简化的并行性:不需要创建多个输入kafka流并将其合并。使用DirectSteam,SparkSteaming将创建与kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。所以在Kafka和RDD分区之间有一对一关系。
    2、效率:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
    3、Exactly-once语义:第一种方法使用Kafka的高阶API来在Zookeeper中存储消耗的偏移量。虽然这种方法结合WAL机制可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会被消费两次。发生这种情况是因为SparkStreaming接收到数据与Zookeeper跟踪的偏移量之间不一致。因此,在第二种方法中,我们不使用Zookeeper的简单KafkaAPI,在其检查点内,SparkStreaming跟踪偏移量。这消除了SparkSteaming和Zookeeper/Kafka之间的不一致。因此SparkStreaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存在外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。
  • 代码案例:
object _01SparkStreamingWithDirectKafkaOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark-project").setLevel(Level.WARN)if(args == null || args.length < 3) {println("""Parameter Errors ! Usage: <batchInterval> <groupId> <topicList>|batchInterval    :  作业提交的间隔时间|groupId          :  分组id|topicList        :  要消费的topic列表""".stripMargin)System.exit(-1)}val Array(batchInterval, groupId, topicList) = argsval conf = new SparkConf().setAppName("SparkStreamingWithDirectKafkaOps").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))val kafkaParams = Map[String, String]("bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092","group.id" -> groupId,//largest从偏移量最新的位置开始读取数据//smallest从偏移量最早的位置开始读取"auto.offset.reset" -> "smallest")val topics = topicList.split(",").toSet//基于Direct的方式读取数据val kafkaDStream:InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)kafkaDStream.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {println("-------------------------------------------")println(s"Time: $bTime")println("-------------------------------------------")rdd.foreach{case (key, value) => {println(value)}}//查看rdd的范围println("偏移量范围:")val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (offsetRange <- offsetRanges) {val topic = offsetRange.topicval parition = offsetRange.partitionval fromOffset = offsetRange.fromOffsetval untilOffset = offsetRange.untilOffsetval count = offsetRange.count()println(s"topic:${topic}, partition:${parition}, " +s"fromOffset:${fromOffset}, untilOffset:${untilOffset}, count:${count}")}}})ssc.start()ssc.awaitTermination()}
}
  • 读取流程分析

3、基于Direct的问题

  • 如果我们设置auto.offset.reset=largest的话,如果程序挂掉,然后重启这个过程当中,kafka中可能有没有被消费;如果设置为smallest的话,每一次重启之后,程序都会从头开始读取数据,造成重复消费,这都不是我们愿意看到的。
  • 归根到的产生问题的原因就是偏移量默认情况下,不可控,有SparkStreaming来控制,所以要想解决这个问题,就必须自己来管理偏移量。管理偏移量的方式有很多,在企业中常用的方式,有基于ZooKeeper,HBase,ES等等,我们这里给大家通过使用ZK来管理offset。
  • 基于ZK的解决方式
    1、步骤:
    2、代码:
object _02SparkStreamingWithDirectKafkaOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark-project").setLevel(Level.WARN)if(args == null || args.length < 3) {println("""Parameter Errors ! Usage: <batchInterval> <groupId> <topicList>|batchInterval    :  作业提交的间隔时间|groupId          :  分组id|topicList        :  要消费的topic列表""".stripMargin)System.exit(-1)}val Array(batchInterval, groupId, topicList) = argsval conf = new SparkConf().setAppName("SparkStreamingWithDirectKafkaOps").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))val kafkaParams = Map[String, String]("bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092","group.id" -> groupId,//largest从偏移量最新的位置开始读取数据//smallest从偏移量最早的位置开始读取"auto.offset.reset" -> "smallest")val topics = topicList.split(",").toSetval messages:InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics, groupId)//step 3、业务处理messages.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {println("-------------------------------------------")println(s"Time: $bTime")println("-------------------------------------------")println("########rdd'count: " + rdd.count())//step 4、更新偏移量store(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)}})ssc.start()ssc.awaitTermination()}def createMsg(ssc:StreamingContext, kafkaParams:Map[String, String],topics:Set[String], group:String):InputDStream[(String, String)] = {//step 1、从zk中获取偏移量val offsets: Map[TopicAndPartition, Long] = getOffsets(topics, group)var messages:InputDStream[(String, String)] = null//step 2、基于偏移量创建messageif(!offsets.isEmpty) {//读取到了对应的偏移量val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)messages = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder,(String, String)](ssc,kafkaParams, offsets, messageHandler)} else {//无偏移量messages = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder](ssc, kafkaParams, topics)}messages}/*** step 1、获取对应topic的partition在zk中的偏移量信息* 框架默认将数据保存的路径:/kafka/consumers/${groupId}/offsets/${topic}/${partition}*                                                                          数据是offset* 自己模拟一个路径:*     /kafka/mykafka/offsets/${topic}/${groupId}/${partition}*                                                     数据是offset*/def getOffsets(topics:Set[String], group:String): Map[TopicAndPartition, Long] = {val offsets = mutable.Map[TopicAndPartition, Long]()for (topic <- topics) {val path = s"${topic}/${group}"checkExists(path)for(partition <- JavaConversions.asScalaBuffer(curator.getChildren.forPath(path))) {val fullPath = s"${path}/${partition}"//获取指定分区的偏移量val offset = new String(curator.getData.forPath(fullPath)).toLongoffsets.put(TopicAndPartition(topic, partition.toInt), offset)}}offsets.toMap}//step 4、更新偏移量def store(offsetRanges: Array[OffsetRange], group:String) {for(offsetRange <- offsetRanges) {val topic = offsetRange.topicval partition = offsetRange.partitionval offset = offsetRange.untilOffsetval fullPath = s"${topic}/${group}/${partition}"checkExists(fullPath)curator.setData().forPath(fullPath, (offset + "").getBytes())}}def checkExists(path:String): Unit = {if(curator.checkExists().forPath(path) == null) {curator.create().creatingParentsIfNeeded().forPath(path)//路径一定存在}}val curator = {val zk = "bigdata01:2181,bigdata02:2181,bigdata03:2181"val curator:CuratorFramework = CuratorFrameworkFactory.builder().connectString(zk).namespace("kafka/mykafka/offsets").retryPolicy(new ExponentialBackoffRetry(1000, 3)).build()curator.start()curator}
}

SparkSteaming整合Kafka的方式相关推荐

  1. Springboot2整合kafka的两种使用方式

    Springboot2整合kafka kafka docker上安装环境 Springboot2引入kafka 基于注解 基于客户端 kafka是一个分布式消息队列.在项目中应用十分广泛,具有高性能. ...

  2. 最简单的kafka接入方式(kafka配置),kafka整合Spring

    文章目录 一.前言. 二.主要流程. 三.各个细节,步骤 一.前言. 本文主要介绍了Springboot项目整合kafka的最简单的方式. 二.主要流程. 1.引入Maven 2.增加消费者和生产者配 ...

  3. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  4. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

  5. SpringBoot实战(十四)之整合KafKa

    本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题. 于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章.希望能够给大家帮助,少走一些弯路. ...

  6. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  7. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  8. 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...

  9. kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

    文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...

最新文章

  1. 数据加密之MD5加密
  2. mysql 忘记root密码 进行重置
  3. Maven最全教程,还有哪里对maven不解的地方看过来!
  4. apache spark_Apache Spark Job的剖析
  5. lr LoadRunner Internal Architecture loadrunner运行原理图解
  6. c语言socket段错误,(Qtcpsocket)退出程序时提示段错误的解决
  7. 封装数据库增删该通用方法
  8. 【路径规划】基于matlab A_star算法智能仓储机器人移动避碰路径规划【含Matlab源码 1180期】
  9. 万字长文:AWS如何跨越“鸿沟”
  10. 芒果 mysql插件,NoSQL代表:MongoDB(芒果数据库)
  11. Docker理论与实践(四)
  12. oracle导出dmp文件合集
  13. python常见算法实现_几种常见算法的Python实现
  14. 厦门信托•震雷先行者集合资金信托计划
  15. 如何运行linux中的vi,如何在linux中vi使用方法
  16. objectArx --- 实体类AcDbEntity方法
  17. GPL LGPL M.
  18. flask后端接受图片文件数据
  19. avplayer播放结束监听
  20. numpy--np.ix_()用法

热门文章

  1. eclipse32位安装教程_Maya2015 (64位) 软件安装教程
  2. 517codingP770
  3. 如何html5将文字插入图片,如何在文字中加入图片?
  4. UVA 488 Triangle Wave
  5. 公寓上网新认证方式破解研究
  6. 2021 最新 Win10 MySQL 安装教程
  7. 微信小程序之获取当前位置附近的美食、酒店、娱乐、超市等,并显示标记
  8. 计算机在医学影像学的应用,计算机图像数字化与医学影像学之应用探析
  9. Windows 10 Conda 更换清华大学的镜像源
  10. echarts配置项图文介绍——xAxis