Spark, Part 10: Integrating Spark with Kafka
Integrating Spark with Kafka requires the spark-streaming-kafka jar, which comes in two branches depending on the Kafka version: spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10.
Branch selection rule: for Kafka versions >= 0.8.2.1 and < 0.10.0, use spark-streaming-kafka-0-8; for Kafka versions >= 0.10.0, use spark-streaming-kafka-0-10.
Kafka releases from 0.8.2.1 onward: 0.8.2.1 (released March 11, 2015), 0.8.2.2 (October 2, 2015), 0.9.x, 0.10.x (0.10.0.0 released May 22, 2016), 0.11.x, 1.0.x (1.0.0 released November 1, 2017), 1.1.x, and 2.0.x (2.0.0 released July 30, 2018).
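The branch-selection rule above can be expressed as a tiny helper. This is purely illustrative (the class and method names are made up for this sketch); it compares version strings segment by segment and picks the artifact branch:

```java
// Illustrative only: picks the spark-streaming-kafka artifact branch
// for a given Kafka version string, per the rule described above.
public class KafkaJarSelector {

    static String artifactFor(String kafkaVersion) {
        // 0.8.2.1 <= version < 0.10.0 -> the 0-8 branch; >= 0.10.0 -> the 0-10 branch
        return compare(kafkaVersion, "0.10.0") >= 0
                ? "spark-streaming-kafka-0-10"
                : "spark-streaming-kafka-0-8";
    }

    // Numeric, segment-by-segment version comparison (missing segments count as 0)
    static int compare(String a, String b) {
        String[] xs = a.split("\\."), ys = b.split("\\.");
        for (int i = 0; i < Math.max(xs.length, ys.length); i++) {
            int x = i < xs.length ? Integer.parseInt(xs[i]) : 0;
            int y = i < ys.length ? Integer.parseInt(ys[i]) : 0;
            if (x != y) return Integer.compare(x, y);
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(artifactFor("0.8.2.1")); // spark-streaming-kafka-0-8
        System.out.println(artifactFor("1.0.0"));   // spark-streaming-kafka-0-10
    }
}
```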
This walkthrough uses Kafka 1.0.0, so the spark-streaming-kafka-0-10 jar is needed:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>
PS: as the groupId shows, this jar is developed by the Spark project, not by Kafka.
Example 1: verified on Spark 2.4.0 (Scala 2.12) and Kafka 2.2.0 (Scala 2.12).
import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import com.alibaba.fastjson.JSON;

import java.util.*;

public class SparkConsumerTest {

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C:/Users/lenovo/Downloads/winutils-master/winutils-master/hadoop-2.7.1");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.56.100:9092");
        props.setProperty("group.id", "my-test-consumer-group");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Map kafkaParams = new HashMap(8);
        kafkaParams.putAll(props);

        JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(Arrays.asList("test"), kafkaParams));
        javaInputDStream.persist(StorageLevel.MEMORY_AND_DISK_SER());

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        javaInputDStream.foreachRDD(rdd -> {
            Dataset<Row> df = spark.createDataFrame(rdd.map(consumerRecord -> {
                Map testMap = JSON.parseObject(consumerRecord.value(), Map.class);
                return new DemoBean(MapUtils.getString(testMap, "id"),
                        MapUtils.getString(testMap, "name"),
                        MapUtils.getIntValue(testMap, "age"));
            }), DemoBean.class);
            DataFrameWriter writer = df.write();
            String url = "jdbc:postgresql://192.168.56.100/postgres";
            String table = "test";
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "postgres");
            connectionProperties.put("password", "abc123");
            connectionProperties.put("driver", "org.postgresql.Driver");
            connectionProperties.put("batchsize", "3000");
            writer.mode(SaveMode.Append).jdbc(url, table, connectionProperties);
        });

        jssc.start();
        jssc.awaitTermination();
    }
}
DemoBean is a separate entity class.
The corresponding pom.xml:
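The source only says DemoBean is a separate entity class; a plausible minimal sketch (field names inferred from the id/name/age keys parsed above, everything else an assumption) could look like this. Spark's bean encoder relies on the public no-arg constructor and getters:

```java
import java.io.Serializable;

// Hypothetical sketch of DemoBean; the original class is not shown in the post.
public class DemoBean implements Serializable {
    private String id;
    private String name;
    private int age;

    // No-arg constructor required by Spark's bean-based DataFrame creation
    public DemoBean() { }

    public DemoBean(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
```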
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>2.4.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.thoughtworks.paranamer/paranamer -->
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.5</version>
    </dependency>
</dependencies>
Example 2: verified on Spark 1.6.0 (Scala 2.11) and Kafka 0.10.2.0 (Scala 2.11).
import com.alibaba.fastjson.JSON;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.collection.JavaConversions;
import scala.collection.Map$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.util.Either;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class SparkConsumerTest {

    public static CuratorFramework curatorFramework;

    static {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.56.103:2181")
                .connectionTimeoutMs(30000)
                .sessionTimeoutMs(30000)
                .retryPolicy(new RetryUntilElapsed(1000, 1000))
                .build();
        curatorFramework.start();
    }

    public static void main(String[] args) throws Exception {
        String topic = "test";
        String groupId = "spark-test-consumer-group";
        System.setProperty("hadoop.home.dir", "C:/Users/lenovo/Downloads/winutils-master/winutils-master/hadoop-2.7.1");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // batch interval: 30 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        Map<String, String> kafkaParams = new HashMap(4);
        kafkaParams.put("bootstrap.servers", "192.168.56.103:9092");
        // build fromOffsets, required by KafkaUtils.createDirectStream
        Map<TopicAndPartition, Long> fromOffsets = getFromOffsets(kafkaParams, topic, groupId);
        SQLContext sqlContext = new SQLContext(jssc.sparkContext());
        // Function is Spark's class, not the JDK's
        Function<MessageAndMetadata<String, String>, String> function = MessageAndMetadata::message;
        kafkaParams.put("group.id", groupId);
        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffsets,
                function);
        messages.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                String firstValue = rdd.first();
                System.out.println("message:" + firstValue);
                JavaRDD<Person> personJavaRDD = rdd.mapPartitions(it -> {
                    List<String> list = new ArrayList();
                    while (it.hasNext()) {
                        list.add(it.next());
                    }
                    return list;
                }).map(p -> {
                    try {
                        Person person = JSON.parseObject(StringUtils.deleteWhitespace(p), Person.class);
                        return person;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return new Person();
                }).filter(p -> StringUtils.isNotBlank(p.getId())
                        || StringUtils.isNotBlank(p.getName())
                        || p.getAge() != 0);
                if (!personJavaRDD.isEmpty()) {
                    DataFrame df = sqlContext.createDataFrame(personJavaRDD, Person.class).select("id", "name", "age");
                    df.show(5);
                }
                // write the consumed offsets back to ZooKeeper
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                Arrays.asList(offsetRanges).forEach(offsetRange -> {
                    String consumerOffsetDir = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir()
                            + "/" + offsetRange.partition();
                    try {
                        curatorFramework.setData().forPath(consumerOffsetDir,
                                String.valueOf(offsetRange.untilOffset()).getBytes(StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }

    public static scala.collection.immutable.Map jMap2sMap(Map<String, String> map) {
        scala.collection.mutable.Map mapTest = JavaConversions.mapAsScalaMap(map);
        Object objTest = Map$.MODULE$.newBuilder().$plus$plus$eq(mapTest.toSeq());
        Object resultTest = ((scala.collection.mutable.Builder) objTest).result();
        scala.collection.immutable.Map resultTest2 = (scala.collection.immutable.Map) resultTest;
        return resultTest2;
    }

    public static Map<TopicAndPartition, Long> getFromOffsets(Map kafkaParams, String topic, String groupId) throws Exception {
        // kafkaParams holds only bootstrap.servers -> broker list here
        KafkaCluster kc = new KafkaCluster(jMap2sMap(kafkaParams));
        ArrayBuffer<String> arrayBuffer = new ArrayBuffer();
        arrayBuffer.$plus$eq(topic);
        Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either = kc.getPartitions(arrayBuffer.toSet());
        if (either.isLeft()) {
            throw new SparkException("get partitions failed", either.left().toOption().get().last());
        }
        scala.collection.immutable.Set<TopicAndPartition> topicAndPartitions = either.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either2 =
                kc.getEarliestLeaderOffsets(topicAndPartitions);
        if (either2.isLeft()) {
            throw new SparkException("get earliestLeaderOffsets failed", either2.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> earliestLeaderOffsets = either2.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either3 =
                kc.getLatestLeaderOffsets(topicAndPartitions);
        if (either3.isLeft()) {
            throw new SparkException("get latestLeaderOffsets failed", either3.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> latestLeaderOffsets = either3.right().get();
        Map<TopicAndPartition, Long> fromOffsets = new HashMap();
        ZKGroupTopicDirs zKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
        // partitions are numbered from 0
        for (int i = 0; i < topicAndPartitions.size(); i++) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
            // path is /consumers/$group/offsets/$topic
            String consumerOffsetDir = zKGroupTopicDirs.consumerOffsetDir() + "/" + i;
            long zookeeperConsumerOffset = 0;
            // no offset node for this consumer group yet: consumption has not started
            if (curatorFramework.checkExists().forPath(consumerOffsetDir) == null) {
                System.out.println(consumerOffsetDir + " does not exist");
                // create the node and initialize it to 0
                curatorFramework.create().creatingParentsIfNeeded()
                        .forPath(consumerOffsetDir, "0".getBytes(StandardCharsets.UTF_8));
            } else {
                // read the value stored at the ZooKeeper node
                byte[] zookeeperConsumerOffsetBytes = curatorFramework.getData().forPath(consumerOffsetDir);
                if (zookeeperConsumerOffsetBytes != null) {
                    zookeeperConsumerOffset = Long.parseLong(new String(zookeeperConsumerOffsetBytes, StandardCharsets.UTF_8));
                }
            }
            long earliestLeaderOffset = earliestLeaderOffsets.get(topicAndPartition).get().offset();
            long latestLeaderOffset = latestLeaderOffsets.get(topicAndPartition).get().offset();
            long fromOffset;
            if (zookeeperConsumerOffset < earliestLeaderOffset) {
                fromOffset = earliestLeaderOffset;
            } else if (zookeeperConsumerOffset > latestLeaderOffset) {
                fromOffset = latestLeaderOffset;
            } else {
                fromOffset = zookeeperConsumerOffset;
            }
            fromOffsets.put(topicAndPartition, fromOffset);
        }
        return fromOffsets;
    }
}
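The offset-bounding branch at the end of getFromOffsets (the ZooKeeper offset is clamped into the partition's valid [earliest, latest] range) boils down to the following standalone sketch; the class and method names are made up for illustration:

```java
// Clamp a consumer offset read from ZooKeeper into the valid range
// [earliest, latest] reported by the partition leader, mirroring the
// if/else-if/else branch in getFromOffsets above.
public class OffsetClamp {

    static long clamp(long zkOffset, long earliest, long latest) {
        if (zkOffset < earliest) return earliest; // data expired/deleted: fast-forward
        if (zkOffset > latest) return latest;     // stale or invalid: fall back to latest
        return zkOffset;                          // in range: resume where we left off
    }

    public static void main(String[] args) {
        System.out.println(clamp(5, 10, 100));   // 10
        System.out.println(clamp(50, 10, 100));  // 50
        System.out.println(clamp(200, 10, 100)); // 100
    }
}
```

Without this clamp, starting from an out-of-range offset would make the direct stream fail at runtime with an OffsetOutOfRange-style error.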
pom.xml:
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
(Note: the code also uses Apache Curator and commons-lang3, which are not listed here; they presumably come in transitively or from the runtime classpath.)
This uses spark-streaming-kafka_2.11-1.6.0.jar rather than spark-streaming-kafka-assembly_2.11-1.6.0.jar. The two jars expose the same classes, but no source jar could be found for the assembly artifact. Note that although the Kafka broker here is 0.10.2.0, the project does not depend on kafka_2.11-0.10.2.0.jar, because in testing that combination throws java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. The root cause: in kafka_2.11-0.8.2.1.jar, kafka.api.PartitionMetadata is defined as case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty, errorCode: Short = ErrorMapping.NoError), but starting with kafka_2.11-0.10.0.0.jar it became case class PartitionMetadata(partitionId: Int, leader: Option[BrokerEndPoint], replicas: Seq[BrokerEndPoint], isr: Seq[BrokerEndPoint] = Seq.empty, errorCode: Short = Errors.NONE.code); the member types changed. Since spark-streaming-kafka_2.11-1.6.0.jar is built against kafka_2.11-0.8.2.1.jar, pairing it with kafka_2.11-0.10.2.0.jar produces the class-cast error.
One special caveat: in spark-streaming-kafka_2.11-1.6.0.jar, KafkaCluster's inner classes are private, so referencing KafkaCluster.LeaderOffset fails to compile. After much searching it turns out the inner classes only became public starting with spark-streaming-kafka 2.0. The workaround is to create a package named org.apache.spark.streaming.kafka in your own project, copy the KafkaCluster class from spark-streaming-kafka_2.11-1.6.0.jar into it, and remove the private[spark] modifier from LeaderOffset. The KafkaCluster you reference is then your own copy rather than the one in the jar, and KafkaCluster.LeaderOffset becomes usable.
In all of the scenarios above, one Kafka message maps to one database record. What if a single Kafka message maps to multiple records?
Example 3: same environment and pom as example 2.
import com.alibaba.fastjson.JSON;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.collection.JavaConversions;
import scala.collection.Map$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.util.Either;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

public class SparkConsumerTest2 {

    public static CuratorFramework curatorFramework;

    static {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.56.103:2181")
                .connectionTimeoutMs(30000)
                .sessionTimeoutMs(30000)
                .retryPolicy(new RetryUntilElapsed(1000, 1000))
                .build();
        curatorFramework.start();
    }

    public static void main(String[] args) throws Exception {
        String topic = "test";
        String groupId = "spark-test-consumer-group";
        System.setProperty("hadoop.home.dir", "C:/Users/lenovo/Downloads/winutils-master/winutils-master/hadoop-2.7.1");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // batch interval: 30 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        Map<String, String> kafkaParams = new HashMap(4);
        kafkaParams.put("bootstrap.servers", "192.168.56.103:9092");
        // build fromOffsets, required by KafkaUtils.createDirectStream
        Map<TopicAndPartition, Long> fromOffsets = getFromOffsets(kafkaParams, topic, groupId);
        SQLContext sqlContext = new SQLContext(jssc.sparkContext());
        // Function is Spark's class, not the JDK's
        Function<MessageAndMetadata<String, String>, String> messageHandler = MessageAndMetadata::message;
        kafkaParams.put("group.id", groupId);
        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffsets,
                messageHandler);
        messages.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                String firstValue = rdd.first();
                System.out.println("message:" + firstValue);
                JavaRDD<List<Person>> personListJavaRDD = rdd.mapPartitions(it -> {
                    List<String> list = new ArrayList();
                    while (it.hasNext()) {
                        list.add(it.next());
                    }
                    return list;
                }).map(p -> {
                    try {
                        List<Person> personList = JSON.parseArray(StringUtils.deleteWhitespace(p), Person.class);
                        return personList;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return Arrays.asList(new Person());
                });
                List<List<Person>> list = personListJavaRDD.collect();
                List<Person> personList = new ArrayList();
                list.stream().forEach(pl -> personList.addAll(pl));
                List filteredPersonList = personList.stream()
                        .filter(p -> StringUtils.isNotBlank(p.getId())
                                || StringUtils.isNotBlank(p.getName())
                                || p.getAge() != 0)
                        .collect(Collectors.toList());
                if (!filteredPersonList.isEmpty()) {
                    DataFrame df = sqlContext.createDataFrame(filteredPersonList, Person.class).select("id", "name", "age");
                    df.show(5);
                }
                // write the consumed offsets back to ZooKeeper
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                Arrays.asList(offsetRanges).forEach(offsetRange -> {
                    String consumerOffsetDir = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir()
                            + "/" + offsetRange.partition();
                    try {
                        curatorFramework.setData().forPath(consumerOffsetDir,
                                String.valueOf(offsetRange.untilOffset()).getBytes(StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }

    public static scala.collection.immutable.Map jMap2sMap(Map<String, String> map) {
        scala.collection.mutable.Map mapTest = JavaConversions.mapAsScalaMap(map);
        Object objTest = Map$.MODULE$.newBuilder().$plus$plus$eq(mapTest.toSeq());
        Object resultTest = ((scala.collection.mutable.Builder) objTest).result();
        scala.collection.immutable.Map resultTest2 = (scala.collection.immutable.Map) resultTest;
        return resultTest2;
    }

    public static Map<TopicAndPartition, Long> getFromOffsets(Map kafkaParams, String topic, String groupId) throws Exception {
        // kafkaParams holds only bootstrap.servers -> broker list here
        KafkaCluster kc = new KafkaCluster(jMap2sMap(kafkaParams));
        ArrayBuffer<String> arrayBuffer = new ArrayBuffer();
        arrayBuffer.$plus$eq(topic);
        Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either = kc.getPartitions(arrayBuffer.toSet());
        if (either.isLeft()) {
            throw new SparkException("get partitions failed", either.left().toOption().get().last());
        }
        scala.collection.immutable.Set<TopicAndPartition> topicAndPartitions = either.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either2 =
                kc.getEarliestLeaderOffsets(topicAndPartitions);
        if (either2.isLeft()) {
            throw new SparkException("get earliestLeaderOffsets failed", either2.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> earliestLeaderOffsets = either2.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either3 =
                kc.getLatestLeaderOffsets(topicAndPartitions);
        if (either3.isLeft()) {
            throw new SparkException("get latestLeaderOffsets failed", either3.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> latestLeaderOffsets = either3.right().get();
        Map<TopicAndPartition, Long> fromOffsets = new HashMap();
        ZKGroupTopicDirs zKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
        // partitions are numbered from 0
        for (int i = 0; i < topicAndPartitions.size(); i++) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
            // path is /consumers/$group/offsets/$topic
            String consumerOffsetDir = zKGroupTopicDirs.consumerOffsetDir() + "/" + i;
            long zookeeperConsumerOffset = 0;
            // no offset node for this consumer group yet: consumption has not started
            if (curatorFramework.checkExists().forPath(consumerOffsetDir) == null) {
                System.out.println(consumerOffsetDir + " does not exist");
                // create the node and initialize it to 0
                curatorFramework.create().creatingParentsIfNeeded()
                        .forPath(consumerOffsetDir, "0".getBytes(StandardCharsets.UTF_8));
            } else {
                // read the value stored at the ZooKeeper node
                byte[] zookeeperConsumerOffsetBytes = curatorFramework.getData().forPath(consumerOffsetDir);
                if (zookeeperConsumerOffsetBytes != null) {
                    zookeeperConsumerOffset = Long.parseLong(new String(zookeeperConsumerOffsetBytes, StandardCharsets.UTF_8));
                }
            }
            long earliestLeaderOffset = earliestLeaderOffsets.get(topicAndPartition).get().offset();
            long latestLeaderOffset = latestLeaderOffsets.get(topicAndPartition).get().offset();
            long fromOffset;
            if (zookeeperConsumerOffset < earliestLeaderOffset) {
                fromOffset = earliestLeaderOffset;
            } else if (zookeeperConsumerOffset > latestLeaderOffset) {
                fromOffset = latestLeaderOffset;
            } else {
                fromOffset = zookeeperConsumerOffset;
            }
            fromOffsets.put(topicAndPartition, fromOffset);
        }
        return fromOffsets;
    }
}
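Example 3 flattens the List<List<Person>> by collecting every batch to the driver and looping with addAll. The flatten-and-filter step itself can be written as one stream pipeline; here is a standalone sketch with a minimal stand-in Person class (the real Person bean is not shown in the post):

```java
import java.util.*;
import java.util.stream.*;

public class FlattenDemo {

    // Minimal stand-in for the Person bean used in the examples above.
    static class Person {
        String id; String name; int age;
        Person(String id, String name, int age) { this.id = id; this.name = name; this.age = age; }
    }

    // Flatten List<List<Person>> and drop "empty" records in one pipeline,
    // equivalent to the collect() + addAll() + filter steps in example 3.
    static List<Person> flattenAndFilter(List<List<Person>> nested) {
        return nested.stream()
                .flatMap(List::stream)
                .filter(p -> (p.id != null && !p.id.isEmpty())
                        || (p.name != null && !p.name.isEmpty())
                        || p.age != 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Person>> nested = Arrays.asList(
                Arrays.asList(new Person("1", "a", 20), new Person(null, null, 0)),
                Arrays.asList(new Person("2", "b", 30)));
        System.out.println(flattenAndFilter(nested).size()); // 2
    }
}
```

Note that collecting to the driver, as example 3 does, only works for small batches; applying flatMap on the RDD itself would keep the flattening distributed.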
Reposted from: https://www.cnblogs.com/koushr/p/5873442.html