大家都知道在spark1.3版本后,kafkautil里面提供了两个创建dstream的方法,一个是老版本中有的createStream方法,还有一个是后面新加的createDirectStream方法。关于这两个方法的优缺点,官方已经说的很详细(http://spark.apache.org/docs/latest/streaming-kafka-integration.html),总之就是createDirectStream性能会更好一点,通过新方法创建出来的dstream的rdd partition和kafka的topic的partition是一一对应的,通过低阶API直接从kafka的topic消费消息,但是它不再往zookeeper中更新consumer offsets,使得基于zk的consumer offsets的监控工具都会失效。

官方只是蜻蜓点水般的说了一下可以在foreachRDD中更新zookeeper上的offsets:

[plain] view plaincopy
  1. directKafkaStream.foreachRDD { rdd =>
  2. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
  3. // offsetRanges.length = # of Kafka partitions being consumed
  4. ...
  5. }

对应 Exactly-once semantics要自己去实现了,大致的实现思路就是在driver启动的时候先从zk上获得consumer offsets信息,createDirectStream有两个重载方法,其中一个可以设置从任意offsets位置开始消费,部分代码如下:

[plain] view plaincopy
  1. def createDirectStream(implicit streamingConfig: StreamingConfig, kc: KafkaCluster) = {
  2. val extractors = streamingConfig.getExtractors()
  3. //从zookeeper上读取offset开始消费message
  4. val messages = {
  5. val kafkaPartitionsE = kc.getPartitions(streamingConfig.topicSet)
  6. if (kafkaPartitionsE.isLeft) throw new SparkException("get kafka partition failed:")
  7. val kafkaPartitions = kafkaPartitionsE.right.get
  8. val consumerOffsetsE = kc.getConsumerOffsets(streamingConfig.group, kafkaPartitions)
  9. if (consumerOffsetsE.isLeft) throw new SparkException("get kafka consumer offsets failed:")
  10. val consumerOffsets = consumerOffsetsE.right.get
  11. consumerOffsets.foreach {
  12. case (tp, n) => println("===================================" + tp.topic + "," + tp.partition + "," + n)
  13. }
  14. KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  15. ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
  16. }
  17. messages
  18. }

这里会有几个问题,就是在一个group是新的consumer group时,即首次消费,zk上海没有相应的group offsets目录,这时要先初始化一下zk上的offsets目录,或者是zk上记录的offsets已经过时,由于kafka有定时清理策略,直接从zk上的offsets开始消费会报ArrayOutofRange异常,即找不到offsets所属的index文件了,针对这两种情况,做了以下处理:

[plain] view plaincopy
  1. def setOrUpdateOffsets(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
  2. streamingConfig.topicSet.foreach(topic => {
  3. println("current topic:" + topic)
  4. var hasConsumed = true
  5. val kafkaPartitionsE = kc.getPartitions(Set(topic))
  6. if (kafkaPartitionsE.isLeft) throw new SparkException("get kafka partition failed:")
  7. val kafkaPartitions = kafkaPartitionsE.right.get
  8. val consumerOffsetsE = kc.getConsumerOffsets(streamingConfig.group, kafkaPartitions)
  9. if (consumerOffsetsE.isLeft) hasConsumed = false
  10. if (hasConsumed) {
  11. //如果有消费过,有两种可能,如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
  12. //针对这种情况,只要判断一下zk上的consumerOffsets和leaderEarliestOffsets的大小,如果consumerOffsets比leaderEarliestOffsets还小的话,说明是过时的offsets,这时把leaderEarliestOffsets更新为consumerOffsets
  13. val leaderEarliestOffsets = kc.getEarliestLeaderOffsets(kafkaPartitions).right.get
  14. println(leaderEarliestOffsets)
  15. val consumerOffsets = consumerOffsetsE.right.get
  16. val flag = consumerOffsets.forall {
  17. case (tp, n) => n < leaderEarliestOffsets(tp).offset
  18. }
  19. if (flag) {
  20. println("consumer group:" + streamingConfig.group + " offsets已经过时,更新为leaderEarliestOffsets")
  21. val offsets = leaderEarliestOffsets.map {
  22. case (tp, offset) => (tp, offset.offset)
  23. }
  24. kc.setConsumerOffsets(streamingConfig.group, offsets)
  25. }
  26. else {
  27. println("consumer group:" + streamingConfig.group + " offsets正常,无需更新")
  28. }
  29. }
  30. else {
  31. //如果没有被消费过,则从最新的offset开始消费。
  32. val leaderLatestOffsets = kc.getLatestLeaderOffsets(kafkaPartitions).right.get
  33. println(leaderLatestOffsets)
  34. println("consumer group:" + streamingConfig.group + " 还未消费过,更新为leaderLatestOffsets")
  35. val offsets = leaderLatestOffsets.map {
  36. case (tp, offset) => (tp, offset.offset)
  37. }
  38. kc.setConsumerOffsets(streamingConfig.group, offsets)
  39. }
  40. })
  41. }

这里又碰到了一个问题,从consumer offsets到leader latest offsets中间延迟了很多消息,在下一次启动的时候,首个batch要处理大量的消息,会导致spark-submit设置的资源无法满足大量消息的处理而导致崩溃。因此在spark-submit启动的时候多加了一个配置:--conf spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数,这样就把首个batch的大量的消息拆分到多个batch中去了,为了更快的消化掉delay的消息,可以调大计算资源和把这个参数调大。

OK,driver启动的问题解决了,那么接下来处理处理完消息后更新zk offsets的工作,这里要注意是在处理完之后再更新,想想如果你消费了消息先更新zk offset在去处理消息将处理好的消息保存到其他地方去,如果后一步由于处理消息的代码有BUG失败了,前一步已经更新了zk了,会导致这部分消息虽然被消费了但是没被处理,等你把处理消息的BUG修复再重新提交后,这部分消息在下次启动的时候不会再被消费了,因为你已经更新了ZK OFFSETS,针对这些因素考虑,部分代码实现如下:

[plain] view plaincopy
  1. def updateZKOffsets(rdd: RDD[(String, String)])(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
  2. println("rdd not empty,update zk offset")
  3. val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  4. for (offsets <- offsetsList) {
  5. val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
  6. val o = kc.setConsumerOffsets(streamingConfig.group, Map((topicAndPartition, offsets.untilOffset)))
  7. if (o.isLeft) {
  8. println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
  9. }
  10. }
  11. }
  12. def processData(messages: InputDStream[(String, String)])(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
  13. messages.foreachRDD(rdd => {
  14. if (!rdd.isEmpty()) {
  15. val datamodelRDD = streamingConfig.relation match {
  16. case "1" =>
  17. val (topic, _) = streamingConfig.topic_table_mapping
  18. val extractor = streamingConfig.getExtractor(topic)
  19. // Create direct kafka stream with brokers and topics
  20. val topicsSet = Set(topic)
  21. val datamodel = rdd.filter(msg => {
  22. extractor.filter(msg)
  23. }).map(msg => extractor.msgToRow(msg))
  24. datamodel
  25. case "2" =>
  26. val (topics, _) = streamingConfig.topic_table_mapping
  27. val extractors = streamingConfig.getExtractors(topics)
  28. val topicsSet = topics.split(",").toSet
  29. //kafka msg为key-value形式,key用来对msg进行分区用的,为了散列存储消息,采集器那边key采用的是:topic|加一个随机数的形式,例如:rd_e_pal|20,split by |取0可以拿到对应的topic名字,这样union在一起的消息可以区分出来自哪一个topic
  30. val datamodel = rdd.filter(msg => {
  31. //kafka msg为key-value形式,key用来对msg进行分区用的,为了散列存储消息,采集器那边key采用的是:topic|加一个随机数的形式,例如:rd_e_pal|20,split by |取0可以拿到对应的topic名字,这样union在一起的消息可以区分出来自哪一个topic
  32. val keyValid = msg != null && msg._1 != null && msg._1.split("\\|").length == 2
  33. if (keyValid) {
  34. val topic = msg._1.split("\\|")(0)
  35. val (_, extractor) = extractors.find(p => {
  36. p._1.equalsIgnoreCase(topic)
  37. }).getOrElse(throw new RuntimeException("配置文件中没有找到topic:" + topic + " 对应的extractor"))
  38. //trim去掉末尾的换行符,否则取最后一个字段时会有一个\n
  39. extractor.filter(msg._2.trim)
  40. }
  41. else {
  42. false
  43. }
  44. }).map {
  45. case (key, msgContent) =>
  46. val topic = key.split("\\|")(0)
  47. val (_, extractor) = extractors.find(p => {
  48. p._1.equalsIgnoreCase(topic)
  49. }).getOrElse(throw new RuntimeException("配置文件中没有找到topic:" + topic + " 对应的extractor"))
  50. extractor.msgToRow((key, msgContent))
  51. }
  52. datamodel
  53. }
  54. //先处理消息
  55. processRDD(datamodelRDD)
  56. //再更新offsets
  57. updateZKOffsets(rdd)
  58. }
  59. })
  60. }
  61. def processRDD(rdd: RDD[Row])(implicit streamingConfig: StreamingConfig) = {
  62. if (streamingConfig.targetType == "mongo") {
  63. val target = streamingConfig.getTarget().asInstanceOf[MongoTarget]
  64. if (!MongoDBClient.db.collectionExists(target.collection)) {
  65. println("create collection:" + target.collection)
  66. MongoDBClient.db.createCollection(target.collection, MongoDBObject("storageEngine" -> MongoDBObject("wiredTiger" -> MongoDBObject())))
  67. val coll = MongoDBClient.db(target.collection)
  68. //创建ttl index
  69. if (target.ttlIndex) {
  70. val indexs = coll.getIndexInfo
  71. if (indexs.find(p => p.get("name") == "ttlIndex") == None) {
  72. coll.createIndex(MongoDBObject(target.ttlColumn -> 1), MongoDBObject("expireAfterSeconds" -> target.ttlExpire, "name" -> "ttlIndex"))
  73. }
  74. }
  75. }
  76. }
  77. val (_, table) = streamingConfig.topic_table_mapping
  78. val schema = streamingConfig.getTableSchema(table)
  79. // Get the singleton instance of SQLContext
  80. val sqlContext = HIVEContextSingleton.getInstance(rdd.sparkContext)
  81. // Convert RDD[String] to RDD[case class] to DataFrame
  82. val dataFrame = sqlContext.createDataFrame(rdd, schema)
  83. // Register as table
  84. dataFrame.registerTempTable(table)
  85. // Do word count on table using SQL and print it
  86. val results = sqlContext.sql(streamingConfig.sql)
  87. //select dt,hh(vtm) as hr,app_key, collect_set(device_id) as deviceids  from rd_e_app_header where dt=20150401 and hh(vtm)='01' group by dt,hh(vtm),app_key limit 100 ;
  88. //          results.show()
  89. streamingConfig.targetType match {
  90. case "mongo" => saveToMongo(results)
  91. case "show" => results.show()
  92. }
  93. }
  94. def saveToMongo(df: DataFrame)(implicit streamingConfig: StreamingConfig) = {
  95. val target = streamingConfig.getTarget().asInstanceOf[MongoTarget]
  96. val coll = MongoDBClient.db(target.collection)
  97. val result = df.collect()
  98. if (result.size > 0) {
  99. val bulkWrite = coll.initializeUnorderedBulkOperation
  100. result.foreach(row => {
  101. val id = row(target.pkIndex)
  102. val setFields = target.columns.filter(p => p.op == "set").map(f => (f.name, row(f.index))).toArray
  103. val incFields = target.columns.filter(p => p.op == "inc").map(f => {
  104. (f.name, row(f.index).asInstanceOf[Long])
  105. }).toArray
  106. //        obj=obj.++($addToSet(MongoDBObject("test"->MongoDBObject("$each"->Array(3,4)),"test1"->MongoDBObject("$each"->Array(1,2)))))
  107. var obj = MongoDBObject()
  108. var addToSetObj = MongoDBObject()
  109. target.columns.filter(p => p.op == "addToSet").foreach(col => {
  110. col.mType match {
  111. case "Int" =>
  112. addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[Int]]))
  113. case "Long" =>
  114. addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[Long]]))
  115. case "String" =>
  116. addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[String]]))
  117. }
  118. })
  119. if (addToSetObj.size > 0) obj = obj.++($addToSet(addToSetObj))
  120. if (incFields.size > 0) obj = obj.++($inc(incFields: _*))
  121. if (setFields.size > 0) obj = obj.++($set(setFields: _*))
  122. bulkWrite.find(MongoDBObject("_id" -> id)).upsert().updateOne(obj)
  123. })
  124. bulkWrite.execute()
  125. }
  126. }

仔细想一想,还是没有实现精确一次的语义,写入mongo和更新ZK由于不是一个事务的,如果更新mongo成功,然后更新ZK失败,则下次启动的时候这个批次的数据就被重复计算,对于UV由于是addToSet去重操作,没什么影响,但是PV是inc操作就会多算这一个批次的的数据,其实如果batch time比较短的话,其实都还是可以接受的。

spark的kafka的低阶API createDirectStream的一些总结。相关推荐

  1. 这几天折腾spark的kafka的低阶API createDirectStream的一些总结。

    大家都知道在spark1.3版本后,kafkautil里面提供了两个创建dstream的方法,一个是老版本中有的createStream方法,还有一个是后面新加的createDirectStream方 ...

  2. 基于Tensorflow2.x低阶API搭建神经网络模型并训练及解决梯度爆炸与消失方法实践

    1. 低阶API神经网络模型 1.1. 关于tf.Module 关于Tensorflow 2.x,最令我觉得有意思的功能就是tf.function和AutoGraph了.他们可以把Python风格的代 ...

  3. 【进阶篇】全流程学习《20天掌握Pytorch实战》纪实 | Day08 | 低阶API示范

  4. spark消费kafka产生数据堆积怎么处理_SparkStreaming读取Kafka的两种方式

    本文主要从以下几个方面介绍SparkStreaming读取Kafka的两种方式: 一.SparkStreaming简介 二.Kafka简介 三.Redis简介(可用于保存历史数据或偏移量数据) 四.S ...

  5. Spark对Kafka两种连接方式的对比——Receiver和Direct

    在知乎 Flink 取代 Spark Streaming 的实战之路中,提到 因此下面对两种方式进行详细说明一下. Receiver方式 Receiver:接收器模式是使用Kafka高级Consume ...

  6. pythonspark实践_基于Python的Spark Streaming Kafka编程实践

    版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...

  7. Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费

    一.Spark Streaming Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理.数据可以从许多来源(如Kafka,Flume,Kine ...

  8. spark第十篇:Spark与Kafka整合

    spark与kafka整合需要引入spark-streaming-kafka.jar,该jar根据kafka版本有2个分支,分别是spark-streaming-kafka-0-8和spark-str ...

  9. spark streamming + kafka + Redis 实践

    java操作Redis:http://blog.csdn.net/xyang81/article/details/51918129 数据order.txt A 202.106.196.115 手机 i ...

最新文章

  1. PHP jQuery Ajax 无刷新表单提交实例
  2. mpython_mPython下载 mPython(图形化编程软件) v0.5.0 官方安装版 下载-脚本之家
  3. 神经网络中,正则化L1与L2的区别、如何选择以及代码验证
  4. faster-rcnn tensorflow windows demo运行
  5. c语言中变量后减号大于号,大于等于运算符.ppt
  6. Filter过滤器概念及生命周期
  7. 陈纪修老师《数学分析》 第08章:反常积分 笔记
  8. 数学建模论文题目优选专业题目128个
  9. CVE-2013-3893 IE浏览器UAF漏洞分析
  10. 空城旧梦,相逢不必邂逅
  11. 无法复制:数据错误(循环冗余检查)的解决方法
  12. 知道今天是星期几java_java如何判断今天是星期几
  13. 【手写汉字识别】基于深度学习的脱机手写汉字识别技术研究
  14. wps excel查找不定位_【WPS神技能】如何在WPSExcel表格中批量查找两列数据的不同?...
  15. 各大主流BBS论坛程序简介
  16. 《重学设计模式》PDF 出炉了 - 小傅哥,肝了50天写出18万字271页的实战编程资料...
  17. Express 框架
  18. 江南大学人工智能与计算机学院拟录取名单,江南大学人工智能与计算机学院2021考研预调剂信息...
  19. EPS DATA数据库:各省市上市公司统计
  20. 关于QQHelper.mo删除的方法

热门文章

  1. SpringSecurity - WebSecurityConfigurerAdapter 过时问题
  2. JS与jQuery小结
  3. html收集用户信息的标签,Python实现抖音网页端用户页面信息爬取
  4. 新个人所得税EXCEL计算公式以及税后工资反算税前工资公式详解
  5. RFIC中的电流回路问题
  6. 文章瞎读 SigVox – A 3D feature matching algorithm for automatic street object recognition in mobile lase
  7. 大型语言模型中的隐私考量
  8. 循序渐进之单点登录(4)--分布式系统认证(OAuth2,JWT)
  9. python attrs_在python中dict和attrs是什么关系?
  10. Latex插图索引太长了如何解决?