SparkSteaming整合Kafka的方式
1、基于Receiver方式
- 这种方式构建出的DStream有一个接收者Receiver,通过这个接收者将数据保存在Executor中。
这种方式是需要独享CPU的core,也就是说需要独立占用若干个线程。所以如果在本地模式下,local[N]中的N指定为1的话,就只有一个线程来运行SparkStreaming程序,这一个线程只能用来接收数据,没有额外的线程去计算,所以会看到数据不被处理的现象。 - 这种方式数据可能在计算失败的情况下丢失。为了防止数据零丢失,我们需要开启SparkStreaming的预写日志机制(write ahead log,WAL),该机制会同步的将接收到的Kafka数据写入分布式文件系统,比如HDFS中。所及,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
- 开启方式:spark.streaming.receiver.writeAheadLog.enable=true,默认是false。
- 需要注意的方面:
1、Kafka的topic分区和SparkStreaming中生成的RDD分区没有关系。在KafkaUtils.createStream中增加分区数量只会增加单个receiver的线程数,不会增加spark的并行度。
2、可以创建多个的Kafka输入DSteam,使用不同的group和topic,使用多个Receiver并行接收数据。
3、如果启用了HDFS等有容错的存储系统, 并且启用了写入日志,则接收到的数据已经被复制到日志中。 因此,输入流的存储级别设置StorageLevel.MEMORY_AND_DISK_SER(即使用KafkaUtils.createStream(…,StorageLevel.MEMORY_AND_DISK_SER))的存储级别。 - 代码案例
object _01SparkStreamingWithKafkaReceiverOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark-project").setLevel(Level.WARN)if(args == null || args.length < 3) {println("""Parameter Errors ! Usage: <batchInterval> <zk> <groupId>|batchInterval : 作业提交的间隔时间|zk : zk的元数据地址|groupId : 分组id""".stripMargin)System.exit(-1)}val Array(batchInterval, zk, groupId) = argsval conf = new SparkConf().setAppName("SparkStreamingWithKafkaReceiver").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))val topics = Map[String, Int]("t-1810-1" -> 3)/*** 这里面的Key--->kafka中message的Key,如果没有指定key,key就是null* value--->kafka中message的value*/val kafkaStream:InputDStream[(String, String)] = KafkaUtils.createStream(ssc, zk, groupId, topics)
// kafkaStream.print()val retDStream:DStream[(String, Int)] = kafkaStream.map{case (key, msg) => msg}.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_+_)retDStream.print()ssc.start()ssc.awaitTermination()}
}
- 读取流程分析
2、基于Direct
- 这种方式区别于Receiver的方式,没有接收者专门消耗CPU core或者线程去接收数据,是通过kafka底层的api直接从kafka中读取数据。每次读取的是偏移量的范围代表数据[fromOffset,untilOffset],而这个有范围的偏移量数据就构成了我们进行处理的DSteam或者RDD,也就是说RDD是有范围的。
- 需要注意的地方
1、简化的并行性:不需要创建多个输入kafka流并将其合并。使用DirectSteam,SparkSteaming将创建与kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。所以在Kafka和RDD分区之间有一对一关系。
2、效率:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
3、Exactly-once语义:第一种方法使用Kafka的高阶API来在Zookeeper中存储消耗的偏移量。虽然这种方法结合WAL机制可以确保零数据丢失(即至少一次语义),但是在某些失败情况下,有一些记录可能会被消费两次。发生这种情况是因为SparkStreaming接收到数据与Zookeeper跟踪的偏移量之间不一致。因此,在第二种方法中,我们不使用Zookeeper的简单KafkaAPI,在其检查点内,SparkStreaming跟踪偏移量。这消除了SparkSteaming和Zookeeper/Kafka之间的不一致。因此SparkStreaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存在外部数据存储区的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。 - 代码案例:
object _01SparkStreamingWithDirectKafkaOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark-project").setLevel(Level.WARN)if(args == null || args.length < 3) {println("""Parameter Errors ! Usage: <batchInterval> <groupId> <topicList>|batchInterval : 作业提交的间隔时间|groupId : 分组id|topicList : 要消费的topic列表""".stripMargin)System.exit(-1)}val Array(batchInterval, groupId, topicList) = argsval conf = new SparkConf().setAppName("SparkStreamingWithDirectKafkaOps").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))val kafkaParams = Map[String, String]("bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092","group.id" -> groupId,//largest从偏移量最新的位置开始读取数据//smallest从偏移量最早的位置开始读取"auto.offset.reset" -> "smallest")val topics = topicList.split(",").toSet//基于Direct的方式读取数据val kafkaDStream:InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)kafkaDStream.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {println("-------------------------------------------")println(s"Time: $bTime")println("-------------------------------------------")rdd.foreach{case (key, value) => {println(value)}}//查看rdd的范围println("偏移量范围:")val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (offsetRange <- offsetRanges) {val topic = offsetRange.topicval parition = offsetRange.partitionval fromOffset = offsetRange.fromOffsetval untilOffset = offsetRange.untilOffsetval count = offsetRange.count()println(s"topic:${topic}, partition:${parition}, " +s"fromOffset:${fromOffset}, untilOffset:${untilOffset}, count:${count}")}}})ssc.start()ssc.awaitTermination()}
}
- 读取流程分析
3、基于Direct的问题
- 如果我们设置auto.offset.reset=largest的话,如果程序挂掉,然后重启这个过程当中,kafka中可能有没有被消费;如果设置为smallest的话,每一次重启之后,程序都会从头开始读取数据,造成重复消费,这都不是我们愿意看到的。
- 归根到的产生问题的原因就是偏移量默认情况下,不可控,有SparkStreaming来控制,所以要想解决这个问题,就必须自己来管理偏移量。管理偏移量的方式有很多,在企业中常用的方式,有基于ZooKeeper,HBase,ES等等,我们这里给大家通过使用ZK来管理offset。
- 基于ZK的解决方式
1、步骤:
2、代码:
object _02SparkStreamingWithDirectKafkaOps {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)Logger.getLogger("org.spark-project").setLevel(Level.WARN)if(args == null || args.length < 3) {println("""Parameter Errors ! Usage: <batchInterval> <groupId> <topicList>|batchInterval : 作业提交的间隔时间|groupId : 分组id|topicList : 要消费的topic列表""".stripMargin)System.exit(-1)}val Array(batchInterval, groupId, topicList) = argsval conf = new SparkConf().setAppName("SparkStreamingWithDirectKafkaOps").setMaster("local[*]")val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))val kafkaParams = Map[String, String]("bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092","group.id" -> groupId,//largest从偏移量最新的位置开始读取数据//smallest从偏移量最早的位置开始读取"auto.offset.reset" -> "smallest")val topics = topicList.split(",").toSetval messages:InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics, groupId)//step 3、业务处理messages.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {println("-------------------------------------------")println(s"Time: $bTime")println("-------------------------------------------")println("########rdd'count: " + rdd.count())//step 4、更新偏移量store(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)}})ssc.start()ssc.awaitTermination()}def createMsg(ssc:StreamingContext, kafkaParams:Map[String, String],topics:Set[String], group:String):InputDStream[(String, String)] = {//step 1、从zk中获取偏移量val offsets: Map[TopicAndPartition, Long] = getOffsets(topics, group)var messages:InputDStream[(String, String)] = null//step 2、基于偏移量创建messageif(!offsets.isEmpty) {//读取到了对应的偏移量val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)messages = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder,(String, String)](ssc,kafkaParams, offsets, messageHandler)} else {//无偏移量messages = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder](ssc, kafkaParams, topics)}messages}/*** step 1、获取对应topic的partition在zk中的偏移量信息* 框架默认将数据保存的路径:/kafka/consumers/${groupId}/offsets/${topic}/${partition}* 数据是offset* 自己模拟一个路径:* /kafka/mykafka/offsets/${topic}/${groupId}/${partition}* 数据是offset*/def getOffsets(topics:Set[String], group:String): Map[TopicAndPartition, Long] = {val offsets = mutable.Map[TopicAndPartition, Long]()for (topic <- topics) {val path = s"${topic}/${group}"checkExists(path)for(partition <- JavaConversions.asScalaBuffer(curator.getChildren.forPath(path))) {val fullPath = s"${path}/${partition}"//获取指定分区的偏移量val offset = new String(curator.getData.forPath(fullPath)).toLongoffsets.put(TopicAndPartition(topic, partition.toInt), offset)}}offsets.toMap}//step 4、更新偏移量def store(offsetRanges: Array[OffsetRange], group:String) {for(offsetRange <- offsetRanges) {val topic = offsetRange.topicval partition = offsetRange.partitionval offset = offsetRange.untilOffsetval fullPath = s"${topic}/${group}/${partition}"checkExists(fullPath)curator.setData().forPath(fullPath, (offset + "").getBytes())}}def checkExists(path:String): Unit = {if(curator.checkExists().forPath(path) == null) {curator.create().creatingParentsIfNeeded().forPath(path)//路径一定存在}}val curator = {val zk = "bigdata01:2181,bigdata02:2181,bigdata03:2181"val curator:CuratorFramework = CuratorFrameworkFactory.builder().connectString(zk).namespace("kafka/mykafka/offsets").retryPolicy(new ExponentialBackoffRetry(1000, 3)).build()curator.start()curator}
}
SparkSteaming整合Kafka的方式相关推荐
- Springboot2整合kafka的两种使用方式
Springboot2整合kafka kafka docker上安装环境 Springboot2引入kafka 基于注解 基于客户端 kafka是一个分布式消息队列.在项目中应用十分广泛,具有高性能. ...
- 最简单的kafka接入方式(kafka配置),kafka整合Spring
文章目录 一.前言. 二.主要流程. 三.各个细节,步骤 一.前言. 本文主要介绍了Springboot项目整合kafka的最简单的方式. 二.主要流程. 1.引入Maven 2.增加消费者和生产者配 ...
- 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
目录 整合 Kafka 说明 Kafka特定配置 KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 Kafka ...
- 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 API 注意 代码实现-自动提交偏移量到默认主题 代码实现- ...
- SpringBoot实战(十四)之整合KafKa
本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题. 于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章.希望能够给大家帮助,少走一些弯路. ...
- SpringBoot整合kafka(实现producer和consumer)
转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...
- springboot 整合 kafka demo 顺便看一下源码
大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1. 环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...
- 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践
来源 | Alice菌 责编 | Carol 封图 | CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...
- kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka
文章目录 1. kafka单机安装 1.1 server.properties核心配置 2. kafka集群安装 2.1 kafka集群可视化界面 3. springboot如何整合kafka 4. ...
最新文章
- 数据加密之MD5加密
- mysql 忘记root密码 进行重置
- Maven最全教程,还有哪里对maven不解的地方看过来!
- apache spark_Apache Spark Job的剖析
- lr LoadRunner Internal Architecture loadrunner运行原理图解
- c语言socket段错误,(Qtcpsocket)退出程序时提示段错误的解决
- 封装数据库增删该通用方法
- 【路径规划】基于matlab A_star算法智能仓储机器人移动避碰路径规划【含Matlab源码 1180期】
- 万字长文:AWS如何跨越“鸿沟”
- 芒果 mysql插件,NoSQL代表:MongoDB(芒果数据库)
- Docker理论与实践(四)
- oracle导出dmp文件合集
- python常见算法实现_几种常见算法的Python实现
- 厦门信托•震雷先行者集合资金信托计划
- 如何运行linux中的vi,如何在linux中vi使用方法
- objectArx --- 实体类AcDbEntity方法
- GPL LGPL M.
- flask后端接受图片文件数据
- avplayer播放结束监听
- numpy--np.ix_()用法