使用redis kv数据库维护kafka主题分区的offset
目的
将kafka的offset保存到外部的redis数据库中,再次读取的时候也从外部的redis数据库读取
主要步骤
1 从kafka获取要读取的消息的开始offset
2 通过offset读取数据,进行处理
3将读取到的最新的offset更新到redis
演示案例
首先启动生产者
kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
下述为consumer.properties的内容,消费者策略从这里提取
bootstrap.servers=mypc01:9092,mypc02:9092,mypc03:9092
group.id=test1
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.servers=mypc01:2181,mypc02:2181,mypc03:2181
实例代码
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, JedisPool}import java.util
import java.util.Properties/*
使用redis kv数据库维护kafka主题分区的offset
1 从kafka获取要读取的消息的开始offset
2 通过offset读取数据,进行处理
3将读取到的最新的offset更新到redisredis 存储offset数据的设计思路:使用hash类型比较ok
test1 pet0 12pet1 10pet2 12*/
object RedisOffsetDemo extends App {private val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")private val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))private val properties = new Properties()//加载消费者的配置文件properties.load(RedisOffsetDemo.getClass.getClassLoader.getResourceAsStream("consumer.properties"))//将消费者的配置参数转为Map类型private val paras: Map[String, String] = Map[String, String]("bootstrap.servers" -> properties.getProperty("bootstrap.servers"),"group.id" -> properties.getProperty("group.id"),"enable.auto.commit" -> properties.getProperty("enable.auto.commit"),"key.deserializer" -> properties.getProperty("key.deserializer"),"value.deserializer" -> properties.getProperty("value.deserializer"))//定义topic数组val topics = Array("pet")private val RedisUtils = new RedisUtils()//获取jedis对象private val jedis: Jedis = RedisUtils.getJedis//获取offsets对象,类型是一个Mapprivate val offsets: Map[TopicPartition, Long] = RedisUtils.getOffset(jedis, properties)var dstream: InputDStream[ConsumerRecord[String, String]] = _//如果offsets不为空,就从offsets处开始消费if (offsets.nonEmpty) {//从kafka消费数据,消费的数据构成一个DStream,之后就可以应用各种算子进行处理了//createDirectStream的第三个参数是个方法,且该方法可以传入一个offsets//如果需要手动提交,我们需要传入这个offsetsdstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("pet"), paras, offsets))} else {//如果offsets为空,就从头开始消费dstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("pet"), paras))}//处理消费者获取的数据//从kafka获取的Dstream,每一条都是一个ConsumerRecord,就是一条消息//从消息上可以解析出各种信息dstream.foreachRDD((rdd: RDD[ConsumerRecord[String, String]]) => {rdd.foreach((x: ConsumerRecord[String, String]) => {println(s"partition: ${x.partition()} offset: ${x.offset()} value: ${x.value()}")//获取offset的最新值val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesRedisUtils.updateOffsets(properties.getProperty("group.id"), ranges, jedis)})})ssc.start()ssc.awaitTermination()
}class RedisUtils {//定义一个获取jedis的工具方法def getJedis: Jedis = {val config = new GenericObjectPoolConfig()config.setMaxTotal(15)//最大空闲连接数config.setMaxIdle(10)//最小空闲连接数config.setMinIdle(5)//创建线程池val pool = new JedisPool(config, "mypc01", 6379)//获取连接对象val jedis: Jedis = pool.getResourcejedis}//定义一个获取offsets对象的工具方法def getOffset(jedis: Jedis, prop: Properties): Map[TopicPartition, Long] = {//定义一个空的offsets对象var offsets: Map[TopicPartition, Long] = Map()//通过组名作为key从redis获取对应的field和value//本例中就是获取key=test1的fied以及value,返回的是一个map//就是利用hash类型的key获取hash类型的值//此处field代表 主题,value代表offset的那个数字//Map((pet0,11),(pet0,18))var kvs: util.Map[String, String] = jedis.hgetAll(prop.getProperty("group.id"))import scala.collection.JavaConversions._for (kv <- kvs) {val arr: Array[String] = kv._1.split("#")//从field解析出topicval topic: String = arr(0)从field解析出partitionval partition: Int = arr(1).toInt//offsets是个mapoffsets += (new TopicPartition(topic, partition) -> kv._2.toLong)}offsets}def updateOffsets(groupName: String, range: Array[OffsetRange], jedis: Jedis): Unit = {for (x <- range) {jedis.hset(groupName, x.topic + "#" + x.partition, x.untilOffset.toString)}}
}
解析
def createDirectStream[K, V](ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]]
DStream的Scala构造函数,其中每个给定的Kafka主题/分区都对应于RDD分区。 spark配置spark.streaming.kafka.maxRatePerPartition
给出每个分区每秒接受的最大消息数。
org.apache.spark.streaming.kafka010
trait HasOffsetRanges
表示任何具有OffsetRanges集合的对象。 这可用于访问由直接Kafka DStream生成的RDD中的偏移范围(请参阅KafkaUtils.createDirectStream)。
KafkaUtils.createDirectStream(...)。foreachRDD {rdd =>val offsetRanges = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges...}
使用redis kv数据库维护kafka主题分区的offset相关推荐
- 【Kafka】Kafka 1.1.0以后版本获取Kafka每个分区最新Offset的几种方法
1.概述 脚本方法 [lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ ./bin/kafka-run-
- 【Kafka】Kafka 0.10.0版本获取Kafka每个分区最新Offset的几种方法
1.概述 脚本方法 [root@1 kafka]# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxx:9092
- 使用Redis和Apache Kafka处理时间序列数据
目录 场景:设备监控 解决方案架构 先决条件: 设置基础设施组件 设置本地服务 MQTT代理 Grafana Kafka 连接 创建MQTT源连接器实例 部署设备数据处理器应用程序 启动模拟设备数据生 ...
- Kafka主题体系架构-复制、故障转移和并行处理
本文讨论了Kafka主题的体系架构,讨论了如何将分区用于故障转移和并行处理. Kafka主题,日志和分区 Kafka将主题存储在日志中.主题日志分为多个分区.Kafka将日志的分区分布在多个服务器或磁 ...
- Kafka系列之:深入理解Kafka 主题、分区、副本、LEO、ISR、HW、Kafka的主写主读和分区leader选举
Kafka系列之:深入理解Kafka 主题.分区.副本.LEO.ISR.HW.Kafka的主写主读和分区leader选举 一.Kafka重要知识点提炼 二.详细介绍Kafka 主题.分区.副本.LEO ...
- 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...
原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...
- kafka的分区策略(partition assignment strategy)
概述 kafka的分区策略指的是producer端的 各个partition中的数据如何安排给consumer消费. Range(按范围) ange策略是对每个主题而言的,首先对同一个主题里面的分区按 ...
- java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...
我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为"最早",以便从 ...
- 【kafka】kafka 指定分区消费 不会触发 reblance
文章目录 1.概述 2.验证 2.1 2个都是subscribeTopic 2.2 指定消费与全部消费 2.3 两个指定消费 2.4 2个都消费同样的分区呢? 1.概述 今天在博客:Kafka-消费, ...
最新文章
- Mopaas 初体验
- 最小栈—leetcode155
- java树算法_Java数据结构算法(三)树
- 用一个比特币买一辆Model3?马斯克血洗空头后,苹果也要跟?
- ubuntu下的tomcat监控脚本
- PJAX,站点加速之翼
- android内存泄露_Java应用程序中的内存泄漏及内存管理
- 自己动手简单实现vbb的URL静态化
- Java 批量下载图片并压缩为Zip
- 域名投毒,DNS污染,域名欺骗,其实就是域名污染。
- 今日头条视频采集方法
- html easyui怎么实现折叠面板,Easyui 创建折叠面板_EasyUI 教程
- IPv6地址基础理论讲解
- [NWERC 2019] E. Expeditious Cubing 浮点数精度判断
- 关于vc 2008 runtime
- Android国际化多语言切换
- excel拆分单元格,然后每个拆分出的单元格沿用原未拆分单元格内容
- 低代码,虽然有点毒瘤,但管用就好
- No.2第一章 启航 | Flink 知其然,知其所以然
- GLES2.0中文API-glFramebufferTexture2D