spark的kafka的低阶API createDirectStream的一些总结。
大家都知道在spark1.3版本后,kafkautil里面提供了两个创建dstream的方法,一个是老版本中有的createStream方法,还有一个是后面新加的createDirectStream方法。关于这两个方法的优缺点,官方已经说的很详细(http://spark.apache.org/docs/latest/streaming-kafka-integration.html),总之就是createDirectStream性能会更好一点,通过新方法创建出来的dstream的rdd partition和kafka的topic的partition是一一对应的,通过低阶API直接从kafka的topic消费消息,但是它不再往zookeeper中更新consumer offsets,使得基于zk的consumer offsets的监控工具都会失效。
官方只是蜻蜓点水般的说了一下可以在foreachRDD中更新zookeeper上的offsets:
- directKafkaStream.foreachRDD { rdd =>
- val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
- // offsetRanges.length = # of Kafka partitions being consumed
- ...
- }
对应 Exactly-once semantics要自己去实现了,大致的实现思路就是在driver启动的时候先从zk上获得consumer offsets信息,createDirectStream有两个重载方法,其中一个可以设置从任意offsets位置开始消费,部分代码如下:
- def createDirectStream(implicit streamingConfig: StreamingConfig, kc: KafkaCluster) = {
- val extractors = streamingConfig.getExtractors()
- //从zookeeper上读取offset开始消费message
- val messages = {
- val kafkaPartitionsE = kc.getPartitions(streamingConfig.topicSet)
- if (kafkaPartitionsE.isLeft) throw new SparkException("get kafka partition failed:")
- val kafkaPartitions = kafkaPartitionsE.right.get
- val consumerOffsetsE = kc.getConsumerOffsets(streamingConfig.group, kafkaPartitions)
- if (consumerOffsetsE.isLeft) throw new SparkException("get kafka consumer offsets failed:")
- val consumerOffsets = consumerOffsetsE.right.get
- consumerOffsets.foreach {
- case (tp, n) => println("===================================" + tp.topic + "," + tp.partition + "," + n)
- }
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
- ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
- }
- messages
- }
这里会有几个问题,就是在一个group是新的consumer group时,即首次消费,zk上海没有相应的group offsets目录,这时要先初始化一下zk上的offsets目录,或者是zk上记录的offsets已经过时,由于kafka有定时清理策略,直接从zk上的offsets开始消费会报ArrayOutofRange异常,即找不到offsets所属的index文件了,针对这两种情况,做了以下处理:
- def setOrUpdateOffsets(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
- streamingConfig.topicSet.foreach(topic => {
- println("current topic:" + topic)
- var hasConsumed = true
- val kafkaPartitionsE = kc.getPartitions(Set(topic))
- if (kafkaPartitionsE.isLeft) throw new SparkException("get kafka partition failed:")
- val kafkaPartitions = kafkaPartitionsE.right.get
- val consumerOffsetsE = kc.getConsumerOffsets(streamingConfig.group, kafkaPartitions)
- if (consumerOffsetsE.isLeft) hasConsumed = false
- if (hasConsumed) {
- //如果有消费过,有两种可能,如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
- //针对这种情况,只要判断一下zk上的consumerOffsets和leaderEarliestOffsets的大小,如果consumerOffsets比leaderEarliestOffsets还小的话,说明是过时的offsets,这时把leaderEarliestOffsets更新为consumerOffsets
- val leaderEarliestOffsets = kc.getEarliestLeaderOffsets(kafkaPartitions).right.get
- println(leaderEarliestOffsets)
- val consumerOffsets = consumerOffsetsE.right.get
- val flag = consumerOffsets.forall {
- case (tp, n) => n < leaderEarliestOffsets(tp).offset
- }
- if (flag) {
- println("consumer group:" + streamingConfig.group + " offsets已经过时,更新为leaderEarliestOffsets")
- val offsets = leaderEarliestOffsets.map {
- case (tp, offset) => (tp, offset.offset)
- }
- kc.setConsumerOffsets(streamingConfig.group, offsets)
- }
- else {
- println("consumer group:" + streamingConfig.group + " offsets正常,无需更新")
- }
- }
- else {
- //如果没有被消费过,则从最新的offset开始消费。
- val leaderLatestOffsets = kc.getLatestLeaderOffsets(kafkaPartitions).right.get
- println(leaderLatestOffsets)
- println("consumer group:" + streamingConfig.group + " 还未消费过,更新为leaderLatestOffsets")
- val offsets = leaderLatestOffsets.map {
- case (tp, offset) => (tp, offset.offset)
- }
- kc.setConsumerOffsets(streamingConfig.group, offsets)
- }
- })
- }
这里又碰到了一个问题,从consumer offsets到leader latest offsets中间延迟了很多消息,在下一次启动的时候,首个batch要处理大量的消息,会导致spark-submit设置的资源无法满足大量消息的处理而导致崩溃。因此在spark-submit启动的时候多加了一个配置:--conf spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数,这样就把首个batch的大量的消息拆分到多个batch中去了,为了更快的消化掉delay的消息,可以调大计算资源和把这个参数调大。
OK,driver启动的问题解决了,那么接下来处理处理完消息后更新zk offsets的工作,这里要注意是在处理完之后再更新,想想如果你消费了消息先更新zk offset在去处理消息将处理好的消息保存到其他地方去,如果后一步由于处理消息的代码有BUG失败了,前一步已经更新了zk了,会导致这部分消息虽然被消费了但是没被处理,等你把处理消息的BUG修复再重新提交后,这部分消息在下次启动的时候不会再被消费了,因为你已经更新了ZK OFFSETS,针对这些因素考虑,部分代码实现如下:
- def updateZKOffsets(rdd: RDD[(String, String)])(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
- println("rdd not empty,update zk offset")
- val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- for (offsets <- offsetsList) {
- val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
- val o = kc.setConsumerOffsets(streamingConfig.group, Map((topicAndPartition, offsets.untilOffset)))
- if (o.isLeft) {
- println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
- }
- }
- }
- def processData(messages: InputDStream[(String, String)])(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
- messages.foreachRDD(rdd => {
- if (!rdd.isEmpty()) {
- val datamodelRDD = streamingConfig.relation match {
- case "1" =>
- val (topic, _) = streamingConfig.topic_table_mapping
- val extractor = streamingConfig.getExtractor(topic)
- // Create direct kafka stream with brokers and topics
- val topicsSet = Set(topic)
- val datamodel = rdd.filter(msg => {
- extractor.filter(msg)
- }).map(msg => extractor.msgToRow(msg))
- datamodel
- case "2" =>
- val (topics, _) = streamingConfig.topic_table_mapping
- val extractors = streamingConfig.getExtractors(topics)
- val topicsSet = topics.split(",").toSet
- //kafka msg为key-value形式,key用来对msg进行分区用的,为了散列存储消息,采集器那边key采用的是:topic|加一个随机数的形式,例如:rd_e_pal|20,split by |取0可以拿到对应的topic名字,这样union在一起的消息可以区分出来自哪一个topic
- val datamodel = rdd.filter(msg => {
- //kafka msg为key-value形式,key用来对msg进行分区用的,为了散列存储消息,采集器那边key采用的是:topic|加一个随机数的形式,例如:rd_e_pal|20,split by |取0可以拿到对应的topic名字,这样union在一起的消息可以区分出来自哪一个topic
- val keyValid = msg != null && msg._1 != null && msg._1.split("\\|").length == 2
- if (keyValid) {
- val topic = msg._1.split("\\|")(0)
- val (_, extractor) = extractors.find(p => {
- p._1.equalsIgnoreCase(topic)
- }).getOrElse(throw new RuntimeException("配置文件中没有找到topic:" + topic + " 对应的extractor"))
- //trim去掉末尾的换行符,否则取最后一个字段时会有一个\n
- extractor.filter(msg._2.trim)
- }
- else {
- false
- }
- }).map {
- case (key, msgContent) =>
- val topic = key.split("\\|")(0)
- val (_, extractor) = extractors.find(p => {
- p._1.equalsIgnoreCase(topic)
- }).getOrElse(throw new RuntimeException("配置文件中没有找到topic:" + topic + " 对应的extractor"))
- extractor.msgToRow((key, msgContent))
- }
- datamodel
- }
- //先处理消息
- processRDD(datamodelRDD)
- //再更新offsets
- updateZKOffsets(rdd)
- }
- })
- }
- def processRDD(rdd: RDD[Row])(implicit streamingConfig: StreamingConfig) = {
- if (streamingConfig.targetType == "mongo") {
- val target = streamingConfig.getTarget().asInstanceOf[MongoTarget]
- if (!MongoDBClient.db.collectionExists(target.collection)) {
- println("create collection:" + target.collection)
- MongoDBClient.db.createCollection(target.collection, MongoDBObject("storageEngine" -> MongoDBObject("wiredTiger" -> MongoDBObject())))
- val coll = MongoDBClient.db(target.collection)
- //创建ttl index
- if (target.ttlIndex) {
- val indexs = coll.getIndexInfo
- if (indexs.find(p => p.get("name") == "ttlIndex") == None) {
- coll.createIndex(MongoDBObject(target.ttlColumn -> 1), MongoDBObject("expireAfterSeconds" -> target.ttlExpire, "name" -> "ttlIndex"))
- }
- }
- }
- }
- val (_, table) = streamingConfig.topic_table_mapping
- val schema = streamingConfig.getTableSchema(table)
- // Get the singleton instance of SQLContext
- val sqlContext = HIVEContextSingleton.getInstance(rdd.sparkContext)
- // Convert RDD[String] to RDD[case class] to DataFrame
- val dataFrame = sqlContext.createDataFrame(rdd, schema)
- // Register as table
- dataFrame.registerTempTable(table)
- // Do word count on table using SQL and print it
- val results = sqlContext.sql(streamingConfig.sql)
- //select dt,hh(vtm) as hr,app_key, collect_set(device_id) as deviceids from rd_e_app_header where dt=20150401 and hh(vtm)='01' group by dt,hh(vtm),app_key limit 100 ;
- // results.show()
- streamingConfig.targetType match {
- case "mongo" => saveToMongo(results)
- case "show" => results.show()
- }
- }
- def saveToMongo(df: DataFrame)(implicit streamingConfig: StreamingConfig) = {
- val target = streamingConfig.getTarget().asInstanceOf[MongoTarget]
- val coll = MongoDBClient.db(target.collection)
- val result = df.collect()
- if (result.size > 0) {
- val bulkWrite = coll.initializeUnorderedBulkOperation
- result.foreach(row => {
- val id = row(target.pkIndex)
- val setFields = target.columns.filter(p => p.op == "set").map(f => (f.name, row(f.index))).toArray
- val incFields = target.columns.filter(p => p.op == "inc").map(f => {
- (f.name, row(f.index).asInstanceOf[Long])
- }).toArray
- // obj=obj.++($addToSet(MongoDBObject("test"->MongoDBObject("$each"->Array(3,4)),"test1"->MongoDBObject("$each"->Array(1,2)))))
- var obj = MongoDBObject()
- var addToSetObj = MongoDBObject()
- target.columns.filter(p => p.op == "addToSet").foreach(col => {
- col.mType match {
- case "Int" =>
- addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[Int]]))
- case "Long" =>
- addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[Long]]))
- case "String" =>
- addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[String]]))
- }
- })
- if (addToSetObj.size > 0) obj = obj.++($addToSet(addToSetObj))
- if (incFields.size > 0) obj = obj.++($inc(incFields: _*))
- if (setFields.size > 0) obj = obj.++($set(setFields: _*))
- bulkWrite.find(MongoDBObject("_id" -> id)).upsert().updateOne(obj)
- })
- bulkWrite.execute()
- }
- }
仔细想一想,还是没有实现精确一次的语义,写入mongo和更新ZK由于不是一个事务的,如果更新mongo成功,然后更新ZK失败,则下次启动的时候这个批次的数据就被重复计算,对于UV由于是addToSet去重操作,没什么影响,但是PV是inc操作就会多算这一个批次的的数据,其实如果batch time比较短的话,其实都还是可以接受的。
spark的kafka的低阶API createDirectStream的一些总结。相关推荐
- 这几天折腾spark的kafka的低阶API createDirectStream的一些总结。
大家都知道在spark1.3版本后,kafkautil里面提供了两个创建dstream的方法,一个是老版本中有的createStream方法,还有一个是后面新加的createDirectStream方 ...
- 基于Tensorflow2.x低阶API搭建神经网络模型并训练及解决梯度爆炸与消失方法实践
1. 低阶API神经网络模型 1.1. 关于tf.Module 关于Tensorflow 2.x,最令我觉得有意思的功能就是tf.function和AutoGraph了.他们可以把Python风格的代 ...
- 【进阶篇】全流程学习《20天掌握Pytorch实战》纪实 | Day08 | 低阶API示范
- spark消费kafka产生数据堆积怎么处理_SparkStreaming读取Kafka的两种方式
本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式: 一.SparkStreaming简介 二.Kafka简介 三.Redis简介(可用于保存历史数据或偏移量数据) 四.S ...
- Spark对Kafka两种连接方式的对比——Receiver和Direct
在知乎 Flink 取代 Spark Streaming 的实战之路中,提到 因此下面对两种方式进行详细说明一下. Receiver方式 Receiver:接收器模式是使用Kafka高级Consume ...
- pythonspark实践_基于Python的Spark Streaming Kafka编程实践
版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...
- Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费
一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...
- spark第十篇:Spark与Kafka整合
spark与kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8和spark-str ...
- spark streamming + kafka + Redis 实践
java操作Redis:http://blog.csdn.net/xyang81/article/details/51918129 数据order.txt A 202.106.196.115 手机 i ...
最新文章
- PHP jQuery Ajax 无刷新表单提交实例
- mpython_mPython下载 mPython(图形化编程软件) v0.5.0 官方安装版 下载-脚本之家
- 神经网络中,正则化L1与L2的区别、如何选择以及代码验证
- faster-rcnn tensorflow windows demo运行
- c语言中变量后减号大于号,大于等于运算符.ppt
- Filter过滤器概念及生命周期
- 陈纪修老师《数学分析》 第08章:反常积分 笔记
- 数学建模论文题目优选专业题目128个
- CVE-2013-3893 IE浏览器UAF漏洞分析
- 空城旧梦,相逢不必邂逅
- 无法复制:数据错误(循环冗余检查)的解决方法
- 知道今天是星期几java_java如何判断今天是星期几
- 【手写汉字识别】基于深度学习的脱机手写汉字识别技术研究
- wps excel查找不定位_【WPS神技能】如何在WPSExcel表格中批量查找两列数据的不同?...
- 各大主流BBS论坛程序简介
- 《重学设计模式》PDF 出炉了 - 小傅哥,肝了50天写出18万字271页的实战编程资料...
- Express 框架
- 江南大学人工智能与计算机学院拟录取名单,江南大学人工智能与计算机学院2021考研预调剂信息...
- EPS DATA数据库:各省市上市公司统计
- 关于QQHelper.mo删除的方法
热门文章
- SpringSecurity - WebSecurityConfigurerAdapter 过时问题
- JS与jQuery小结
- html收集用户信息的标签,Python实现抖音网页端用户页面信息爬取
- 新个人所得税EXCEL计算公式以及税后工资反算税前工资公式详解
- RFIC中的电流回路问题
- 文章瞎读 SigVox – A 3D feature matching algorithm for automatic street object recognition in mobile lase
- 大型语言模型中的隐私考量
- 循序渐进之单点登录(4)--分布式系统认证(OAuth2,JWT)
- python attrs_在python中dict和attrs是什么关系?
- Latex插图索引太长了如何解决?