目的

将kafka的offset保存到外部的redis数据库中,再次读取的时候也从外部的redis数据库读取
主要步骤

1 从kafka获取要读取的消息的开始offset
2 通过offset读取数据,进行处理
3将读取到的最新的offset更新到redis

演示案例

首先启动生产者

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

下述为consumer.properties的内容,消费者策略从这里提取

bootstrap.servers=mypc01:9092,mypc02:9092,mypc03:9092
group.id=test1
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.servers=mypc01:2181,mypc02:2181,mypc03:2181

实例代码

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, JedisPool}import java.util
import java.util.Properties/*
使用redis kv数据库维护kafka主题分区的offset
1 从kafka获取要读取的消息的开始offset
2 通过offset读取数据,进行处理
3将读取到的最新的offset更新到redisredis 存储offset数据的设计思路:使用hash类型比较ok
test1     pet0  12pet1  10pet2  12*/
object RedisOffsetDemo extends App {private val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")private val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))private val properties = new Properties()//加载消费者的配置文件properties.load(RedisOffsetDemo.getClass.getClassLoader.getResourceAsStream("consumer.properties"))//将消费者的配置参数转为Map类型private val paras: Map[String, String] = Map[String, String]("bootstrap.servers" -> properties.getProperty("bootstrap.servers"),"group.id" -> properties.getProperty("group.id"),"enable.auto.commit" -> properties.getProperty("enable.auto.commit"),"key.deserializer" -> properties.getProperty("key.deserializer"),"value.deserializer" -> properties.getProperty("value.deserializer"))//定义topic数组val topics = Array("pet")private val RedisUtils = new RedisUtils()//获取jedis对象private val jedis: Jedis = RedisUtils.getJedis//获取offsets对象,类型是一个Mapprivate val offsets: Map[TopicPartition, Long] = RedisUtils.getOffset(jedis, properties)var dstream: InputDStream[ConsumerRecord[String, String]] = _//如果offsets不为空,就从offsets处开始消费if (offsets.nonEmpty) {//从kafka消费数据,消费的数据构成一个DStream,之后就可以应用各种算子进行处理了//createDirectStream的第三个参数是个方法,且该方法可以传入一个offsets//如果需要手动提交,我们需要传入这个offsetsdstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("pet"), paras, offsets))} else {//如果offsets为空,就从头开始消费dstream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array("pet"), paras))}//处理消费者获取的数据//从kafka获取的Dstream,每一条都是一个ConsumerRecord,就是一条消息//从消息上可以解析出各种信息dstream.foreachRDD((rdd: RDD[ConsumerRecord[String, String]]) => {rdd.foreach((x: ConsumerRecord[String, String]) => {println(s"partition: ${x.partition()} offset: ${x.offset()}  value: ${x.value()}")//获取offset的最新值val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesRedisUtils.updateOffsets(properties.getProperty("group.id"), ranges, jedis)})})ssc.start()ssc.awaitTermination()
}class RedisUtils {//定义一个获取jedis的工具方法def getJedis: Jedis = {val config = new GenericObjectPoolConfig()config.setMaxTotal(15)//最大空闲连接数config.setMaxIdle(10)//最小空闲连接数config.setMinIdle(5)//创建线程池val pool = new JedisPool(config, "mypc01", 6379)//获取连接对象val jedis: Jedis = pool.getResourcejedis}//定义一个获取offsets对象的工具方法def getOffset(jedis: Jedis, prop: Properties): Map[TopicPartition, Long] = {//定义一个空的offsets对象var offsets: Map[TopicPartition, Long] = Map()//通过组名作为key从redis获取对应的field和value//本例中就是获取key=test1的fied以及value,返回的是一个map//就是利用hash类型的key获取hash类型的值//此处field代表 主题,value代表offset的那个数字//Map((pet0,11),(pet0,18))var kvs: util.Map[String, String] = jedis.hgetAll(prop.getProperty("group.id"))import scala.collection.JavaConversions._for (kv <- kvs) {val arr: Array[String] = kv._1.split("#")//从field解析出topicval topic: String = arr(0)从field解析出partitionval partition: Int = arr(1).toInt//offsets是个mapoffsets += (new TopicPartition(topic, partition) -> kv._2.toLong)}offsets}def updateOffsets(groupName: String, range: Array[OffsetRange], jedis: Jedis): Unit = {for (x <- range) {jedis.hset(groupName, x.topic + "#" + x.partition, x.untilOffset.toString)}}
}

解析

def createDirectStream[K, V](ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]]

DStream的Scala构造函数,其中每个给定的Kafka主题/分区都对应于RDD分区。 spark配置spark.streaming.kafka.maxRatePerPartition给出每个分区每秒接受的最大消息数。

org.apache.spark.streaming.kafka010
trait HasOffsetRanges

表示任何具有OffsetRanges集合的对象。 这可用于访问由直接Kafka DStream生成的RDD中的偏移范围(请参阅KafkaUtils.createDirectStream)。

  KafkaUtils.createDirectStream(...)。foreachRDD {rdd =>val offsetRanges = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges...}

使用redis kv数据库维护kafka主题分区的offset相关推荐

  1. 【Kafka】Kafka 1.1.0以后版本获取Kafka每个分区最新Offset的几种方法

    1.概述 脚本方法 [lcc@lcc ~/soft/kafka/kafka_2.11-1.1.0]$ ./bin/kafka-run-

  2. 【Kafka】Kafka 0.10.0版本获取Kafka每个分区最新Offset的几种方法

    1.概述 脚本方法 [root@1 kafka]# ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxx:9092

  3. 使用Redis和Apache Kafka处理时间序列数据

    目录 场景:设备监控 解决方案架构 先决条件: 设置基础设施组件 设置本地服务 MQTT代理 Grafana Kafka 连接 创建MQTT源连接器实例 部署设备数据处理器应用程序 启动模拟设备数据生 ...

  4. Kafka主题体系架构-复制、故障转移和并行处理

    本文讨论了Kafka主题的体系架构,讨论了如何将分区用于故障转移和并行处理. Kafka主题,日志和分区 Kafka将主题存储在日志中.主题日志分为多个分区.Kafka将日志的分区分布在多个服务器或磁 ...

  5. Kafka系列之:深入理解Kafka 主题、分区、副本、LEO、ISR、HW、Kafka的主写主读和分区leader选举

    Kafka系列之:深入理解Kafka 主题.分区.副本.LEO.ISR.HW.Kafka的主写主读和分区leader选举 一.Kafka重要知识点提炼 二.详细介绍Kafka 主题.分区.副本.LEO ...

  6. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  7. kafka的分区策略(partition assignment strategy)

    概述 kafka的分区策略指的是producer端的 各个partition中的数据如何安排给consumer消费. Range(按范围) ange策略是对每个主题而言的,首先对同一个主题里面的分区按 ...

  8. java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...

    我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为"最早",以便从 ...

  9. 【kafka】kafka 指定分区消费 不会触发 reblance

    文章目录 1.概述 2.验证 2.1 2个都是subscribeTopic 2.2 指定消费与全部消费 2.3 两个指定消费 2.4 2个都消费同样的分区呢? 1.概述 今天在博客:Kafka-消费, ...

最新文章

  1. Mopaas 初体验
  2. 最小栈—leetcode155
  3. java树算法_Java数据结构算法(三)树
  4. 用一个比特币买一辆Model3?马斯克血洗空头后,苹果也要跟?
  5. ubuntu下的tomcat监控脚本
  6. PJAX,站点加速之翼
  7. android内存泄露_Java应用程序中的内存泄漏及内存管理
  8. 自己动手简单实现vbb的URL静态化
  9. Java 批量下载图片并压缩为Zip
  10. 域名投毒,DNS污染,域名欺骗,其实就是域名污染。
  11. 今日头条视频采集方法
  12. html easyui怎么实现折叠面板,Easyui 创建折叠面板_EasyUI 教程
  13. IPv6地址基础理论讲解
  14. [NWERC 2019] E. Expeditious Cubing 浮点数精度判断
  15. 关于vc 2008 runtime
  16. Android国际化多语言切换
  17. excel拆分单元格,然后每个拆分出的单元格沿用原未拆分单元格内容
  18. 低代码,虽然有点毒瘤,但管用就好
  19. No.2第一章 启航 | Flink 知其然,知其所以然
  20. GLES2.0中文API-glFramebufferTexture2D

热门文章

  1. Python numpy 多维数组切片
  2. 如何用阿里云服务器建立一个wordpress网站?
  3. python输出杨辉三角啊二维数组_用Python输出一个杨辉三角的例子
  4. mllib调参 spark_《Spark 官方文档》机器学习库(MLlib)指南
  5. mac git配置 idea
  6. rosdep init 和rosdep update的解决方法,亲测有效
  7. cesium 球体半倾斜角度
  8. Apache配置文件httpd.conf详解
  9. 普通办公用计算机,工业计算机与普通办公用的电脑有什么区别?
  10. linux 环境下安装 docker 精简步骤