kafka spark java_spark streaming中维护kafka偏移量到外部介质
spark streaming中维护kafka偏移量到外部介质
以kafka偏移量维护到redis为例。
redis存储格式
使用的数据结构为string,其中key为topic:partition,value为offset。
例如bobo这个topic下有3个分区,则key-value结构如下:
bobo:0的偏移量为x
bobo:1的偏移量为y
bobo:2的偏移量为z
消费时指定offset
主要是如下两个方法:
createKafkaStream()创建kakfa流
getOffsets()从redis中获取offsets
/**
* kakfa参数
*/
private val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
// 注意这里是none。
"auto.offset.reset" -> "none",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// `bobo`topic下有3个分区
private val topicPartitions = Map[String, Int]("bobo" -> 3)
// 从redis中获取offsets
def getOffsets: Map[TopicPartition, Long] = {
val jedis = InternalRedisClient.getResource
// 设置每个分区起始的offset
val offsets = mutable.Map[TopicPartition, Long]()
topicPartitions.foreach { it =>
val topic = it._1
val partitions = it._2
// 遍历分区,设置每个topic下对应partition的offset
for (partition
val topicPartitionKey = topic + ":" + partition
var lastOffset = 0L
val lastSavedOffset = jedis.get(topicPartitionKey)
if (null != lastSavedOffset) {
try {
lastOffset = lastSavedOffset.toLong
} catch {
case e: Exception =>
log.error("get lastSavedOffset error", e)
System.exit(1)
}
}
log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)
// 添加
offsets += (new TopicPartition(topic, partition) -> lastOffset)
}
}
InternalRedisClient.returnResource(jedis)
offsets.toMap
}
/**
* 创建kakfa流
*
* @param ssc StreamingContext
* @return InputDStream
*/
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
val offsets = getOffsets
// 创建kafka stream
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
)
stream
}
其中:核心是通过ConsumerStrategies.Assign方法来指定topic下对应partition的offset信息。
更新offset到redis
最后将offset信息维护到redis即可。
/**
* 消费
*
* @param stream InputDStream
*/
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
stream.foreachRDD { rdd =>
// 获取offset信息
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 计算相关指标,这里就统计下条数了
val total = rdd.count()
val jedis = InternalRedisClient.getResource
val pipeline = jedis.pipelined()
// 会阻塞redis
pipeline.multi()
// 更新相关指标
pipeline.incrBy("totalRecords", total)
// 更新offset
offsetRanges.foreach { offsetRange =>
log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
}
// 执行,释放
pipeline.exec()
pipeline.sync()
pipeline.close()
InternalRedisClient.returnResource(jedis)
}
}
参考
spark代码
顺便贴一下自己整理的spark相关的代码。
主要包括:
RDD的基本使用
SQL
jdbc(读、写)
hive(读、写、动态分区)
Streaming
消费kafka(手动提交、手动维护offset)
写入HBase
写入Hive
kafka spark java_spark streaming中维护kafka偏移量到外部介质相关推荐
- kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V
简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...
- Java 版spark Streaming 维护kafka 的偏移量
基于Direct API 手动维护kafka 的偏移量, 将偏移量同步导了 redis 中, 我将对比较重要的代码拿出来说明, 完整代码在下方: 首先是通过Direct AIP 获取 JavaI ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0
如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- 大数据Spark Structured Streaming集成 Kafka
目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...
- Kafka+Spark Streaming如何保证exactly once语义
大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...
Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...
- Flume+Kafka+Spark Streaming实现大数据实时流式数据采集
近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...
最新文章
- Java项目 常用包的命名及理解【dao包、domain包、service包、utils包、web包、impl包】
- IMP出现的ORA-01401错误可能和字符集有关(转载)
- Oracle11完全卸载方法
- java锁包读写锁_java8读写锁如何使用
- 为何谷歌放弃以甜品命名android,甜点不见了 谷歌变更Android命名方式
- Vue Router 4 快速入门
- 算法笔记_面试题_13.二叉树的最近公共祖先
- Qt配置OpenCV教程,无需复杂的编译过程,(详细版)
- 网站优化之robots文件详解
- 瓜娃系列 (6) - ComparisonChain和primitives包
- 极米投影、坚果投影、当贝投影,三大品牌对比来了
- 研究表明:漂亮的配图会让数据/结果看起更可靠
- 在 Lenovo G360 笔记本上安装 Debian Squeeze AMD64
- csvn-httpd启动报AH00094错解决办法
- 指针的类型和指针所指向的类型
- 物联网创业项目(物联网创业点子大全500个)
- python 微信公众号网页接口调用_Python调用微信公众平台接口操作示例
- OS_PV操作_5.行人与机动车过路口问题
- 读书笔记(四)--PBFT
- 【转】码斗士的修炼之路 -- 如何保持并提升战斗力