Integrating Spark with Kafka requires the spark-streaming-kafka jar, which comes in two branches depending on the Kafka version: spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10.

Branch selection rule: if 0.8.2.1 <= Kafka version < 0.10.0, use spark-streaming-kafka-0-8; if Kafka version >= 0.10.0, use spark-streaming-kafka-0-10.
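For comparison, the 0-8 branch is declared the same way. The coordinates below are only an illustration (the Scala suffix and version shown here are assumptions and must match your own Spark build):

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.1</version>
</dependency>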

Kafka releases from 0.8.2.1 onward, in order: 0.8.2.1 (released March 11, 2015), 0.8.2.2 (October 2, 2015), 0.9.x, 0.10.x (0.10.0.0 released May 22, 2016), 0.11.x, 1.0.x (1.0.0 released November 1, 2017), 1.1.x, and 2.0.x (2.0.0 released July 30, 2018).

This walkthrough uses Kafka 1.0.0, so the spark-streaming-kafka-0-10 jar is required, as follows:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

PS: as the groupId shows, this jar is developed by the Spark project rather than by Kafka.

Simple example 1: tested on Spark 2.4.0 (Scala 2.12) and Kafka 2.2.0 (Scala 2.12).

import org.apache.commons.collections.MapUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import com.alibaba.fastjson.JSON;

import java.util.*;

public class SparkConsumerTest {

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "C:/Users/lenovo/Downloads/winutils-master/winutils-master/hadoop-2.7.1");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // one micro-batch every 10 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // consumer configuration handed to the Kafka direct stream
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.56.100:9092");
        props.setProperty("group.id", "my-test-consumer-group");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Map kafkaParams = new HashMap(8);
        kafkaParams.putAll(props);
        JavaInputDStream<ConsumerRecord<String, String>> javaInputDStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(Arrays.asList("test"), kafkaParams));
        javaInputDStream.persist(StorageLevel.MEMORY_AND_DISK_SER());
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        javaInputDStream.foreachRDD(rdd -> {
            // parse each JSON message into a DemoBean and append the micro-batch to PostgreSQL via JDBC
            Dataset<Row> df = spark.createDataFrame(rdd.map(consumerRecord -> {
                Map testMap = JSON.parseObject(consumerRecord.value(), Map.class);
                return new DemoBean(MapUtils.getString(testMap, "id"),
                        MapUtils.getString(testMap, "name"),
                        MapUtils.getIntValue(testMap, "age"));
            }), DemoBean.class);
            DataFrameWriter writer = df.write();
            String url = "jdbc:postgresql://192.168.56.100/postgres";
            String table = "test";
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "postgres");
            connectionProperties.put("password", "abc123");
            connectionProperties.put("driver", "org.postgresql.Driver");
            connectionProperties.put("batchsize", "3000");
            writer.mode(SaveMode.Append).jdbc(url, table, connectionProperties);
        });
        jssc.start();
        jssc.awaitTermination();
    }
}

DemoBean is a separate entity class.
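The original post does not show DemoBean. A minimal sketch that matches how it is used above could look like the following; the field names and types are assumptions inferred from the constructor call, and the no-arg constructor plus getters/setters are what Spark's bean-based createDataFrame expects:

public class DemoBean implements java.io.Serializable {

    private String id;
    private String name;
    private int age;

    public DemoBean() {
    }

    public DemoBean(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}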

The corresponding pom.xml:

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>2.4.0</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.thoughtworks.paranamer/paranamer -->
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.5</version>
    </dependency>
</dependencies>

Simple example 2: tested on Spark 1.6.0 (Scala 2.11) and Kafka 0.10.2.0 (Scala 2.11).

import com.alibaba.fastjson.JSON;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.collection.JavaConversions;
import scala.collection.Map$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.util.Either;

public class SparkConsumerTest {

    public static CuratorFramework curatorFramework;

    static {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.56.103:2181")
                .connectionTimeoutMs(30000)
                .sessionTimeoutMs(30000)
                .retryPolicy(new RetryUntilElapsed(1000, 1000))
                .build();
        curatorFramework.start();
    }

    public static void main(String[] args) throws Exception {
        String topic = "test";
        String groupId = "spark-test-consumer-group";
        System.setProperty("hadoop.home.dir", "C:/Users/lenovo/Downloads/winutils-master/winutils-master/hadoop-2.7.1");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // one micro-batch every 30 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        Map<String, String> kafkaParams = new HashMap(4);
        kafkaParams.put("bootstrap.servers", "192.168.56.103:9092");
        // build fromOffsets, which KafkaUtils.createDirectStream requires
        Map<TopicAndPartition, Long> fromOffsets = getFromOffsets(kafkaParams, topic, groupId);
        SQLContext sqlContext = new SQLContext(jssc.sparkContext());
        // Function here is Spark's interface, not the JDK's
        Function<MessageAndMetadata<String, String>, String> function = MessageAndMetadata::message;
        kafkaParams.put("group.id", groupId);
        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffsets,
                function);
        messages.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                String firstValue = rdd.first();
                System.out.println("message:" + firstValue);
                JavaRDD<Person> personJavaRDD = rdd.mapPartitions(it -> {
                    List<String> list = new ArrayList();
                    while (it.hasNext()) {
                        list.add(it.next());
                    }
                    return list;
                }).map(p -> {
                    try {
                        Person person = JSON.parseObject(StringUtils.deleteWhitespace(p), Person.class);
                        return person;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return new Person();
                }).filter(p -> StringUtils.isNotBlank(p.getId())
                        || StringUtils.isNotBlank(p.getName())
                        || p.getAge() != 0);
                if (!personJavaRDD.isEmpty()) {
                    DataFrame df = sqlContext.createDataFrame(personJavaRDD, Person.class).select("id", "name", "age");
                    df.show(5);
                }
                // write the consumed offsets back to ZooKeeper
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                Arrays.asList(offsetRanges).forEach(offsetRange -> {
                    String consumerOffsetDir = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir()
                            + "/" + offsetRange.partition();
                    try {
                        curatorFramework.setData().forPath(consumerOffsetDir,
                                String.valueOf(offsetRange.untilOffset()).getBytes(StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }

    public static scala.collection.immutable.Map jMap2sMap(Map<String, String> map) {
        scala.collection.mutable.Map mapTest = JavaConversions.mapAsScalaMap(map);
        Object objTest = Map$.MODULE$.newBuilder().$plus$plus$eq(mapTest.toSeq());
        Object resultTest = ((scala.collection.mutable.Builder) objTest).result();
        scala.collection.immutable.Map resultTest2 = (scala.collection.immutable.Map) resultTest;
        return resultTest2;
    }

    public static Map<TopicAndPartition, Long> getFromOffsets(Map kafkaParams, String topic, String groupId) throws Exception {
        // kafkaParams only contains bootstrap.servers -> broker list
        KafkaCluster kc = new KafkaCluster(jMap2sMap(kafkaParams));
        ArrayBuffer<String> arrayBuffer = new ArrayBuffer();
        arrayBuffer.$plus$eq(topic);
        Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either = kc.getPartitions(arrayBuffer.toSet());
        if (either.isLeft()) {
            throw new SparkException("get partitions failed", either.left().toOption().get().last());
        }
        scala.collection.immutable.Set<TopicAndPartition> topicAndPartitions = either.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either2 =
                kc.getEarliestLeaderOffsets(topicAndPartitions);
        if (either2.isLeft()) {
            throw new SparkException("get earliestLeaderOffsets failed", either2.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> earliestLeaderOffsets = either2.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either3 =
                kc.getLatestLeaderOffsets(topicAndPartitions);
        if (either3.isLeft()) {
            throw new SparkException("get latestLeaderOffsets failed", either3.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> latestLeaderOffsets = either3.right().get();
        Map<TopicAndPartition, Long> fromOffsets = new HashMap();
        ZKGroupTopicDirs zKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
        // partitions are numbered starting from 0
        for (int i = 0; i < topicAndPartitions.size(); i++) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
            // path is /consumers/$group/offsets/$topic
            String consumerOffsetDir = zKGroupTopicDirs.consumerOffsetDir() + "/" + i;
            long zookeeperConsumerOffset = 0;
            // no consumer-group offset node yet means consumption has not started
            if (curatorFramework.checkExists().forPath(consumerOffsetDir) == null) {
                System.out.println(consumerOffsetDir + " does not exist");
                // create the node and initialize it to 0
                curatorFramework.create().creatingParentsIfNeeded()
                        .forPath(consumerOffsetDir, "0".getBytes(StandardCharsets.UTF_8));
            } else {
                // read the value stored in the ZooKeeper node
                byte[] zookeeperConsumerOffsetBytes = curatorFramework.getData().forPath(consumerOffsetDir);
                if (zookeeperConsumerOffsetBytes != null) {
                    zookeeperConsumerOffset = Long.parseLong(new String(zookeeperConsumerOffsetBytes, StandardCharsets.UTF_8));
                }
            }
            long earliestLeaderOffset = earliestLeaderOffsets.get(topicAndPartition).get().offset();
            long latestLeaderOffset = latestLeaderOffsets.get(topicAndPartition).get().offset();
            long fromOffset;
            if (zookeeperConsumerOffset < earliestLeaderOffset) {
                fromOffset = earliestLeaderOffset;
            } else if (zookeeperConsumerOffset > latestLeaderOffset) {
                fromOffset = latestLeaderOffset;
            } else {
                fromOffset = zookeeperConsumerOffset;
            }
            fromOffsets.put(topicAndPartition, fromOffset);
        }
        return fromOffsets;
    }
}
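Person, used in examples 2 and 3, is also not shown in the original post. A minimal sketch under the same assumptions as DemoBean above (a serializable bean with id, name, and age fields, a no-arg constructor, and getters/setters for Spark's bean encoder) might be:

public class Person implements java.io.Serializable {

    private String id;
    private String name;
    private int age;

    public Person() {
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}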

pom.xml:

<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Here spark-streaming-kafka_2.11-1.6.0.jar is used rather than spark-streaming-kafka-assembly_2.11-1.6.0.jar. The two ship the same integration classes (the assembly variant additionally bundles dependencies), but no source could be found for the assembly jar. Note that even though the Kafka broker is 0.10.2.0, kafka_2.11-0.10.2.0.jar is not added as a dependency, because in practice it fails with java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker. The root cause: in kafka_2.11-0.8.2.1.jar, kafka.api.PartitionMetadata is defined as case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty, errorCode: Short = ErrorMapping.NoError), but from kafka_2.11-0.10.0.0.jar onward it became case class PartitionMetadata(partitionId: Int, leader: Option[BrokerEndPoint], replicas: Seq[BrokerEndPoint], isr: Seq[BrokerEndPoint] = Seq.empty, errorCode: Short = Errors.NONE.code) — the types of the members changed. spark-streaming-kafka_2.11-1.6.0.jar is built against kafka_2.11-0.8.2.1.jar, so pairing it with kafka_2.11-0.10.2.0.jar on the classpath triggers the cast error.

A special caveat: in spark-streaming-kafka_2.11-1.6.0.jar, the inner classes of KafkaCluster are private, so referencing KafkaCluster.LeaderOffset does not compile. After a lot of searching it turned out that the inner classes only stopped being private starting with the 2.0 line of the Spark-Kafka integration jar. The workaround is to create a package named org.apache.spark.streaming.kafka in your own project, copy the KafkaCluster class from spark-streaming-kafka_2.11-1.6.0.jar into that package, and edit the source to remove the private[spark] modifier from LeaderOffset. The KafkaCluster being referenced is then the local copy rather than the one inside spark-streaming-kafka_2.11-1.6.0.jar, and KafkaCluster.LeaderOffset can be used.

In all of the scenarios above, one Kafka message maps to one database record. What if a single Kafka message maps to multiple database records?

Simple example 3: same environment as example 2, and the same pom.

import com.alibaba.fastjson.JSON;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.collection.JavaConversions;
import scala.collection.Map$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.util.Either;

public class SparkConsumerTest2 {

    public static CuratorFramework curatorFramework;

    static {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.56.103:2181")
                .connectionTimeoutMs(30000)
                .sessionTimeoutMs(30000)
                .retryPolicy(new RetryUntilElapsed(1000, 1000))
                .build();
        curatorFramework.start();
    }

    public static void main(String[] args) throws Exception {
        String topic = "test";
        String groupId = "spark-test-consumer-group";
        System.setProperty("hadoop.home.dir", "C:/Users/lenovo/Downloads/winutils-master/winutils-master/hadoop-2.7.1");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // one micro-batch every 30 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        Map<String, String> kafkaParams = new HashMap(4);
        kafkaParams.put("bootstrap.servers", "192.168.56.103:9092");
        // build fromOffsets, which KafkaUtils.createDirectStream requires
        Map<TopicAndPartition, Long> fromOffsets = getFromOffsets(kafkaParams, topic, groupId);
        SQLContext sqlContext = new SQLContext(jssc.sparkContext());
        // Function here is Spark's interface, not the JDK's
        Function<MessageAndMetadata<String, String>, String> messageHandler = MessageAndMetadata::message;
        kafkaParams.put("group.id", groupId);
        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffsets,
                messageHandler);
        messages.foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                String firstValue = rdd.first();
                System.out.println("message:" + firstValue);
                // each message is a JSON array, so one message yields a List<Person>
                JavaRDD<List<Person>> personListJavaRDD = rdd.mapPartitions(it -> {
                    List<String> list = new ArrayList();
                    while (it.hasNext()) {
                        list.add(it.next());
                    }
                    return list;
                }).map(p -> {
                    try {
                        List<Person> personList = JSON.parseArray(StringUtils.deleteWhitespace(p), Person.class);
                        return personList;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return Arrays.asList(new Person());
                });
                List<List<Person>> list = personListJavaRDD.collect();
                List<Person> personList = new ArrayList();
                list.stream().forEach(pl -> personList.addAll(pl));
                List filteredPersonList = personList.stream()
                        .filter(p -> StringUtils.isNotBlank(p.getId())
                                || StringUtils.isNotBlank(p.getName())
                                || p.getAge() != 0)
                        .collect(Collectors.toList());
                if (!filteredPersonList.isEmpty()) {
                    DataFrame df = sqlContext.createDataFrame(filteredPersonList, Person.class).select("id", "name", "age");
                    df.show(5);
                }
                // write the consumed offsets back to ZooKeeper
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                Arrays.asList(offsetRanges).forEach(offsetRange -> {
                    String consumerOffsetDir = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir()
                            + "/" + offsetRange.partition();
                    try {
                        curatorFramework.setData().forPath(consumerOffsetDir,
                                String.valueOf(offsetRange.untilOffset()).getBytes(StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }

    public static scala.collection.immutable.Map jMap2sMap(Map<String, String> map) {
        scala.collection.mutable.Map mapTest = JavaConversions.mapAsScalaMap(map);
        Object objTest = Map$.MODULE$.newBuilder().$plus$plus$eq(mapTest.toSeq());
        Object resultTest = ((scala.collection.mutable.Builder) objTest).result();
        scala.collection.immutable.Map resultTest2 = (scala.collection.immutable.Map) resultTest;
        return resultTest2;
    }

    public static Map<TopicAndPartition, Long> getFromOffsets(Map kafkaParams, String topic, String groupId) throws Exception {
        // kafkaParams only contains bootstrap.servers -> broker list
        KafkaCluster kc = new KafkaCluster(jMap2sMap(kafkaParams));
        ArrayBuffer<String> arrayBuffer = new ArrayBuffer();
        arrayBuffer.$plus$eq(topic);
        Either<ArrayBuffer<Throwable>, Set<TopicAndPartition>> either = kc.getPartitions(arrayBuffer.toSet());
        if (either.isLeft()) {
            throw new SparkException("get partitions failed", either.left().toOption().get().last());
        }
        scala.collection.immutable.Set<TopicAndPartition> topicAndPartitions = either.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either2 =
                kc.getEarliestLeaderOffsets(topicAndPartitions);
        if (either2.isLeft()) {
            throw new SparkException("get earliestLeaderOffsets failed", either2.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> earliestLeaderOffsets = either2.right().get();
        Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset>> either3 =
                kc.getLatestLeaderOffsets(topicAndPartitions);
        if (either3.isLeft()) {
            throw new SparkException("get latestLeaderOffsets failed", either3.left().toOption().get().last());
        }
        scala.collection.immutable.Map<TopicAndPartition, KafkaCluster.LeaderOffset> latestLeaderOffsets = either3.right().get();
        Map<TopicAndPartition, Long> fromOffsets = new HashMap();
        ZKGroupTopicDirs zKGroupTopicDirs = new ZKGroupTopicDirs(groupId, topic);
        // partitions are numbered starting from 0
        for (int i = 0; i < topicAndPartitions.size(); i++) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, i);
            // path is /consumers/$group/offsets/$topic
            String consumerOffsetDir = zKGroupTopicDirs.consumerOffsetDir() + "/" + i;
            long zookeeperConsumerOffset = 0;
            // no consumer-group offset node yet means consumption has not started
            if (curatorFramework.checkExists().forPath(consumerOffsetDir) == null) {
                System.out.println(consumerOffsetDir + " does not exist");
                // create the node and initialize it to 0
                curatorFramework.create().creatingParentsIfNeeded()
                        .forPath(consumerOffsetDir, "0".getBytes(StandardCharsets.UTF_8));
            } else {
                // read the value stored in the ZooKeeper node
                byte[] zookeeperConsumerOffsetBytes = curatorFramework.getData().forPath(consumerOffsetDir);
                if (zookeeperConsumerOffsetBytes != null) {
                    zookeeperConsumerOffset = Long.parseLong(new String(zookeeperConsumerOffsetBytes, StandardCharsets.UTF_8));
                }
            }
            long earliestLeaderOffset = earliestLeaderOffsets.get(topicAndPartition).get().offset();
            long latestLeaderOffset = latestLeaderOffsets.get(topicAndPartition).get().offset();
            long fromOffset;
            if (zookeeperConsumerOffset < earliestLeaderOffset) {
                fromOffset = earliestLeaderOffset;
            } else if (zookeeperConsumerOffset > latestLeaderOffset) {
                fromOffset = latestLeaderOffset;
            } else {
                fromOffset = zookeeperConsumerOffset;
            }
            fromOffsets.put(topicAndPartition, fromOffset);
        }
        return fromOffsets;
    }
}

Reposted from: https://www.cnblogs.com/koushr/p/5873442.html
