spark streaming中维护kafka偏移量到外部介质

以kafka偏移量维护到redis为例。

redis存储格式

使用的数据结构为string,其中key为topic:partition,value为offset。

例如bobo这个topic下有3个分区,则key-value结构如下:

bobo:0的偏移量为x

bobo:1的偏移量为y

bobo:2的偏移量为z

消费时指定offset

主要是如下两个方法:

createKafkaStream()创建kakfa流

getOffsets()从redis中获取offsets

/**

* kakfa参数

*/

private val kafkaParams = Map[String, Object](

"bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",

"key.deserializer" -> classOf[StringDeserializer],

"value.deserializer" -> classOf[StringDeserializer],

"group.id" -> "use_a_separate_group_id_for_each_stream",

// 注意这里是none。

"auto.offset.reset" -> "none",

"enable.auto.commit" -> (false: java.lang.Boolean)

)

// `bobo`topic下有3个分区

private val topicPartitions = Map[String, Int]("bobo" -> 3)

// 从redis中获取offsets

def getOffsets: Map[TopicPartition, Long] = {

val jedis = InternalRedisClient.getResource

// 设置每个分区起始的offset

val offsets = mutable.Map[TopicPartition, Long]()

topicPartitions.foreach { it =>

val topic = it._1

val partitions = it._2

// 遍历分区,设置每个topic下对应partition的offset

for (partition

val topicPartitionKey = topic + ":" + partition

var lastOffset = 0L

val lastSavedOffset = jedis.get(topicPartitionKey)

if (null != lastSavedOffset) {

try {

lastOffset = lastSavedOffset.toLong

} catch {

case e: Exception =>

log.error("get lastSavedOffset error", e)

System.exit(1)

}

}

log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)

// 添加

offsets += (new TopicPartition(topic, partition) -> lastOffset)

}

}

InternalRedisClient.returnResource(jedis)

offsets.toMap

}

/**

* 创建kakfa流

*

* @param ssc StreamingContext

* @return InputDStream

*/

def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {

val offsets = getOffsets

// 创建kafka stream

val stream = KafkaUtils.createDirectStream[String, String](

ssc,

LocationStrategies.PreferConsistent,

ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)

)

stream

}

其中:核心是通过ConsumerStrategies.Assign方法来指定topic下对应partition的offset信息。

更新offset到redis

最后将offset信息维护到redis即可。

/**

* 消费

*

* @param stream InputDStream

*/

def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {

stream.foreachRDD { rdd =>

// 获取offset信息

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// 计算相关指标,这里就统计下条数了

val total = rdd.count()

val jedis = InternalRedisClient.getResource

val pipeline = jedis.pipelined()

// 会阻塞redis

pipeline.multi()

// 更新相关指标

pipeline.incrBy("totalRecords", total)

// 更新offset

offsetRanges.foreach { offsetRange =>

log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)

val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition

pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")

}

// 执行,释放

pipeline.exec()

pipeline.sync()

pipeline.close()

InternalRedisClient.returnResource(jedis)

}

}

参考

spark代码

顺便贴一下自己整理的spark相关的代码。

主要包括:

RDD的基本使用

SQL

jdbc(读、写)

hive(读、写、动态分区)

Streaming

消费kafka(手动提交、手动维护offset)

写入HBase

写入Hive

kafka spark java_spark streaming中维护kafka偏移量到外部介质相关推荐

  1. kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V

    简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...

  2. Java 版spark Streaming 维护kafka 的偏移量

    基于Direct  API  手动维护kafka 的偏移量,  将偏移量同步导了 redis 中, 我将对比较重要的代码拿出来说明, 完整代码在下方: 首先是通过Direct AIP 获取 JavaI ...

  3. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  4. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  5. 大数据Spark Structured Streaming集成 Kafka

    目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...

  6. Kafka+Spark Streaming如何保证exactly once语义

    大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...

  7. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

  8. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...

    Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...

  9. Flume+Kafka+Spark Streaming实现大数据实时流式数据采集

    近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...

最新文章

  1. Java项目 常用包的命名及理解【dao包、domain包、service包、utils包、web包、impl包】
  2. IMP出现的ORA-01401错误可能和字符集有关(转载)
  3. Oracle11完全卸载方法
  4. java锁包读写锁_java8读写锁如何使用
  5. 为何谷歌放弃以甜品命名android,甜点不见了 谷歌变更Android命名方式
  6. Vue Router 4 快速入门
  7. 算法笔记_面试题_13.二叉树的最近公共祖先
  8. Qt配置OpenCV教程,无需复杂的编译过程,(详细版)
  9. 网站优化之robots文件详解
  10. 瓜娃系列 (6) - ComparisonChain和primitives包
  11. 极米投影、坚果投影、当贝投影,三大品牌对比来了
  12. 研究表明:漂亮的配图会让数据/结果看起更可靠
  13. 在 Lenovo G360 笔记本上安装 Debian Squeeze AMD64
  14. csvn-httpd启动报AH00094错解决办法
  15. 指针的类型和指针所指向的类型
  16. 物联网创业项目(物联网创业点子大全500个)
  17. python 微信公众号网页接口调用_Python调用微信公众平台接口操作示例
  18. OS_PV操作_5.行人与机动车过路口问题
  19. 读书笔记(四)--PBFT
  20. 【转】码斗士的修炼之路 -- 如何保持并提升战斗力

热门文章

  1. bzoj2285 [SDOI2011]保密 分数规划spfa+最小割
  2. strace 简单用法
  3. Linux多线程同步
  4. 《程序员面试宝典》精华 面向对象部分
  5. python定时发送微信消息_Python3 itchat实现微信定时发送群消息的实例代码
  6. IOS 内存优化和调试技巧
  7. Unreal、CryEngine、Gamebryo引擎介绍
  8. watchOS更新后 Apple Watch 4心电图功能已开始支持欧洲用户
  9. 25.4. Phing
  10. 过程 线 多线程 并发 同步异步