一、前言:

使用Apache Kafka消费者组时,有一个为消费者分配对应分区partition的过程,我们可以使用“自动”subscribe和“手动”assign的方式。

  • KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。
  • KafkaConsumer.assign():为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制。

注意:consumer.assign()可以不被消费者的组管理功能管理,他相对于是一个临时的,不提交就不会改变当前group.id的offset(如果consumer.commitSync()或者consumer.commitASync()或者自动提交 还是会影响offset),

比如:在使用consumer.subscribe(Arrays.asList(topicName));时offset为20:

(1) 如果再通过assign方式已经获取了消息后,不提交offset,在下次通过consumer.subscribe(Arrays.asList(topicName));来获取消息时offset还是20,还是会获取20以后的消息。

(2) 如果再通过assign方式已经获取了消息后,提交了offset,,在下次通过consumer.subscribe(Arrays.asList(topicName));来获取消息时offset就变了

二、如果两种模式都用的话会报错

报错信息:

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusiveat org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:111) ~[kafka-clients-0.11.0.2.jar!/:na]at org.apache.kafka.clients.consumer.internals.SubscriptionState.subscribe(SubscriptionState.java:118) ~[kafka-clients-0.11.0.2.jar!/:na]at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:873) ~[kafka-clients-0.11.0.2.jar!/:na]at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:901) ~[kafka-clients-0.11.0.2.jar!/:na]at com.guoxin.sydjtxry.SydjtxryConsumer.doWork(SydjtxryConsumer.java:77) ~[classes!/:1.0-SNAPSHOT]at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) ~[kafka_2.11-0.11.0.2.jar!/:na]2020-04-12 09:46:38.705  INFO 43884 --- [ConsumerExample] com.guoxin.sydjtxry.SydjtxryConsumer     : [KafkaConsumerExample]: Stopped

错误代码:

consumer.subscribe(Collections.singletonList(this.topic));TopicPartition partition = new TopicPartition(this.topic, 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, seekOffset);

三、从指定的offset进行消费

场景:kafka_2.11-0.11.0.2版本中创建的topic只有一个分区。如果是多分区的话可以参考下这篇文章https://www.cnblogs.com/dongxishaonian/p/12038500.html
代码:

package com.guoxin.sydjtxry;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;/*** Created by HuiQ on 2019-10-30.*/
@Component
public class KafkaConsumerTask implements CommandLineRunner {private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTask.class);@Overridepublic void run(String... args) {// 全量消费SydjtxryConsumer.consumer();}
}
package com.guoxin.sydjtxry;import kafka.utils.ShutdownableThread;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;public class SydjtxryConsumer extends ShutdownableThread
{private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);private final KafkaConsumer<String, String> consumer;private final String topic;private Long rowcount = 0L;// 一次请求的最大等待时间private final int waitTime = 10000;// consumer从指定的offset处理private Long seekOffset = 1936170L;// Broker连接地址private final String bootstrapServers = "bootstrap.servers";/*** NewConsumer构造函数* @param topic 订阅的Topic名称*/public SydjtxryConsumer(String topic){super("KafkaConsumerExample", false);Properties props = new Properties();KafkaProperties kafkaProc = KafkaProperties.getInstance();// Broker连接地址props.put(bootstrapServers,kafkaProc.getValues(bootstrapServers, "192.110.110.33:9092"));props.put("enable.auto.commit", "true"); // 自动提交props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");consumer = new KafkaConsumer<String, String>(props);this.topic = topic;}/*** 订阅Topic的消息处理函数*/public void doWork(){// 订阅TopicPartition partition = new TopicPartition(this.topic, 0);consumer.assign(Arrays.asList(partition));consumer.seek(partition, seekOffset);compareOffset = seekOffset;// 消息消费请求ConsumerRecords<String, String> records = consumer.poll(waitTime);if (records.isEmpty()) {System.out.println("消费者没有消费到数据------->");} else {// 消息处理for (ConsumerRecord<String, String> record : records) {try {JSONObject jsondata = JSONObject.fromObject(record.value().toString());String table = jsondata.getString("table"); // 库名.表名if (table.equals("BJSX_OGG.GR_XX")) {rowcount++;// 业务逻辑}LOG.info("数据偏移量为-->"  + record.offset());} catch (Exception e) {e.printStackTrace();LOG.warn("偏移量为" + record.offset() + "的数据处理有问题,请排查-->" + record.value().toString());}seekOffset = record.offset();if (seekOffset % 10000 == 0) {LOG.info("offset-->" + seekOffset);}}}}public static void consumer(){SydjtxryConsumer consumerThread = new SydjtxryConsumer("heheda");consumerThread.start();}
}

遇到的问题:当消费到该topic最后一条数据后,以后的消费会循环消费该数据。改进:当消费完最后一条数据,以后的订阅模式都由assign改为subscribe。

package com.guoxin.sydjtxry;import kafka.utils.ShutdownableThread;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;public class SydjtxryConsumer extends ShutdownableThread
{private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);private final KafkaConsumer<String, String> consumer;private final String topic;private Long rowcount = 0L;// 一次请求的最大等待时间private final int waitTime = 10000;// consumer从指定的offset处理private Long seekOffset = 1936170L;private Long compareOffset = 0L;private boolean flag = false;// Broker连接地址private final String bootstrapServers = "bootstrap.servers";// Group idprivate final String groupId = "group.id";private StringBuilder grxxdata = new StringBuilder(); // 要批量插入gauss表中的数据private StringBuilder grylzfdata = new StringBuilder(); // 要批量插入gauss表中的数据/*** NewConsumer构造函数* @param topic 订阅的Topic名称*/public SydjtxryConsumer(String topic){super("KafkaConsumerExample", false);Properties props = new Properties();KafkaProperties kafkaProc = KafkaProperties.getInstance();// Broker连接地址props.put(bootstrapServers,kafkaProc.getValues(bootstrapServers, "192.110.110.33:9092"));props.put("enable.auto.commit", "true"); // 自动提交props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");// Group idprops.put(groupId, UUID.randomUUID().toString());consumer = new KafkaConsumer<String, String>(props);this.topic = topic;}/*** 订阅Topic的消息处理函数*/public void doWork(){// 订阅if (compareOffset.equals(seekOffset) && flag == false) {// 暂停kafka的消费 暂停分区的分配consumer.unsubscribe(); // 此处不取消订阅暂停太久会出现订阅超时的错误consumer.pause(consumer.assignment());consumer.subscribe(Collections.singletonList(this.topic));flag = true;} else if (flag == true) {consumer.subscribe(Collections.singletonList(this.topic));} else {TopicPartition partition = new TopicPartition(this.topic, 0);consumer.assign(Arrays.asList(partition));consumer.seek(partition, seekOffset);compareOffset = seekOffset;}// 消息消费请求ConsumerRecords<String, String> records = consumer.poll(waitTime);if (compareOffset.equals(seekOffset + records.count() - 1) && flag == false) {System.out.println("指定offset消费已结束,此条为末尾的重复消费数据,跳过业务处理,此后由assign改为subscribe订阅模式-->");} else {if (records.isEmpty()) {System.out.println("消费者没有消费到数据------->");} else {// 消息处理for (ConsumerRecord<String, String> record : records) {try {JSONObject jsondata = JSONObject.fromObject(record.value().toString());String table = jsondata.getString("table"); // 库名.表名if (table.equals("BJSX_OGG.GR_XX")) {// 业务逻辑}LOG.info("数据偏移量为-->"  + record.offset());} catch (Exception e) {e.printStackTrace();LOG.warn("偏移量为" + record.offset() + "的数据处理有问题,请排查-->" + record.value().toString());}seekOffset = record.offset();if (seekOffset % 10000 == 0) {LOG.info("offset-->" + seekOffset);}}}}}public static void consumer(){SydjtxryConsumer consumerThread = new SydjtxryConsumer("heheda");consumerThread.start();}
}

消费速度控制:
提供pause(Collection<TopicPartition> partitions)resume(Collection<TopicPartition> partitions)方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制,结合业务使用。

四、怎么获得最后一条消息的offset

我觉得最后这种方法获取到的值减1才是最后一条消息的offset:

package com.guoxin.sydjtxry;import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ShutdownableThread;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.*;public class SydjtxryConsumer extends ShutdownableThread
{private static final Logger LOG = LoggerFactory.getLogger(SydjtxryConsumer.class);private final KafkaConsumer<String, String> consumer;private final SimpleConsumer simpleConsumer;private final String topic;private Long rowcount = 0L;// 一次请求的最大等待时间private final int waitTime = 10000;final int TIMEOUT = 100000;final int BUFFERSIZE = 64 * 1024;// Broker连接地址private final String bootstrapServers = "bootstrap.servers";private final String groupId = UUID.randomUUID().toString();/*** NewConsumer构造函数* @param topic 订阅的Topic名称*/public SydjtxryConsumer(String topic){super("KafkaConsumerExample", false);Properties props = new Properties();KafkaProperties kafkaProc = KafkaProperties.getInstance();// Broker连接地址props.put(bootstrapServers,kafkaProc.getValues(bootstrapServers, "192.110.110.33:9092"));props.put("enable.auto.commit", "true"); // 自动提交props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");props.put("group.id", groupId);consumer = new KafkaConsumer<String, String>(props);this.topic = topic;// 获取最后一条消息的offsetsimpleConsumer = new SimpleConsumer("192.110.110.33", 9092, TIMEOUT, BUFFERSIZE, groupid);long lastOffset = getLastOffset(simpleConsumer, this.topic, 0, groupid);System.out.println("最后一条消息的offset是--->" + lastOffset);}/*** 订阅Topic的消息处理函数*/public void doWork(){// 订阅consumer.subscribe(Collections.singletonList(this.topic));// 消息消费请求ConsumerRecords<String, String> records = consumer.poll(waitTime);if (records.isEmpty()) {System.out.println("消费者没有消费到数据------->");} else {// 消息处理for (ConsumerRecord<String, String> record : records) {try {JSONObject jsondata = JSONObject.fromObject(record.value().toString());String table = jsondata.getString("table"); // 库名.表名if (table.equals("BJSX_OGG.GR_XX")) {rowcount++;// 业务逻辑}LOG.info("数据偏移量为-->"  + record.offset());} catch (Exception e) {e.printStackTrace();LOG.warn("偏移量为" + record.offset() + "的数据处理有问题,请排查-->" + record.value().toString());}}}}public static void consumer(){SydjtxryConsumer consumerThread = new SydjtxryConsumer("heheda");consumerThread.start();}// 获取最后一条消息的offset方法public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String groupId) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),groupId);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0];}
}

五、以时间戳查询消息

Kafka 在0.10.1.1 版本增加了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是,若待查询的分区不存在,则该方法会被一直阻塞。

假设我们希望从某个时间段开始消费,那们就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,在查到偏移量之后调用seek(TopicPartition partition, long offset)方法将消费偏移量重置到所查询的偏移量位置,然后调用poll()方法长轮询拉取消息。例如,我们希望从主题“stock-quotation”第0 分区距离当前时间相差12 小时之前的位置开始拉取消息

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
try {  Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();  // 构造待查询的分区  TopicPartition partition = new TopicPartition("stock-quotation", 0);  // 设置查询12 小时之前消息的偏移量  timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));  // 会返回时间大于等于查找时间的第一个偏移量  Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);  OffsetAndTimestamp offsetTimestamp = null;  // 这里依然用for 轮询,当然由于本例是查询的一个分区,因此也可以用if 处理  for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {  // 若查询时间大于时间戳索引文件中最大记录索引时间,  // 此时value 为空,即待查询时间点之后没有新消息生成  offsetTimestamp = entry.getValue();  if (null != offsetTimestamp) {  // 重置消费起始偏移量  consumer.seek(partition, entry.getValue().offset());  }  }  while (true) {  // 等待拉取消息  ConsumerRecords<String, String> records = consumer.poll(1000);  for (ConsumerRecord<String, String> record : records){  // 简单打印出消息内容  System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());  }  }
} catch (Exception e) {  e.printStackTrace();
} finally {  consumer.close();
}

六、消费者手动提交

场景:offset下标自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。

实施测试:
将 enable.auto.commit 改成 false 进行手动提交,并且设置每次拉取最大10条

props.put("enable.auto.commit", "false");
props.put("max.poll.records", 10);

将提交方式改成false之后,需要手动提交只需加上这段代码

  • 同步提交:consumer.commitSync();
  • 异步提交:consumer.commitAsync()

注:在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。
它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息。

while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("topic = %s, partition = %s,offset = %d, customer = %s, country = %s\n",record.topic(), record.partition(), record.offset(),record.key(), record.value());}consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if (e != null)log.error("Commit failed for offsets {}", map, e);}});
}

说明:
手动提交的offset不能再次消费,未提交的可以再次进行消费。
这种做法一般也可以满足大部分需求。例如从kafka获取数据入库,如果一批数据入库成功,就提交offset,否则不提交,然后再次拉取。但是这种做法并不能最大的保证数据的完整性。
比如在运行的时候,程序挂了之类的。所以还有一种方法是手动的指定offset下标进行获取数据,直到kafka的数据处理成功之后,将offset记录下来,比如写在数据库中。

七、同步和异步混合提交:

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。
  
但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。因此在这种情况下,我们应该考虑使用混合提交的方法:

try {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println("topic = %s, partition = %s, offset = %d,customer = %s, country = %s\n",record.topic(), record.partition(),record.offset(), record.key(), record.value());}consumer.commitAsync(); }
} catch (Exception e) {log.error("Unexpected error", e);
} finally {try {consumer.commitSync(); } finally {consumer.close();}
}
  1. 在程序正常运行过程中,我们使用 commitAsync 方法来进行提交,这样的运行速度更快,而且就算当前提交失败,下次提交成功也可以。
  2. 如果直接关闭消费者,就没有所谓的“下一次提交”了,因为不会再调用poll()方法。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。

八、提交特定的偏移量:

如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

这时候需要使用一下的两个方法:

/*** Commit the specified offsets for the specified list of topics and partitions.*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)/*** Commit the specified offsets for the specified list of topics and partitions to Kafka.*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。
假设处理了半个批次的消息,最后一个来自主题“customers”分区 3 的消息的偏移量是 5000,你可以调用 commitSync() 方法来提交它。不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

代码如下:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =new HashMap<>();
int count = 0;
。。。while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records){System.out.printf("topic = %s, partition = %s, offset = %d,customer = %s, country = %s\n",record.topic(), record.partition(), record.offset(),record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(),record.partition()), newOffsetAndMetadata(record.offset()+1, "no metadata")); if (count % 1000 == 0) consumer.commitAsync(currentOffsets,null); count++;}
}

这里调用的是 commitAsync(),不过调用commitSync()也是完全可以的。在提交特定偏移量时,仍然要处理可能发生的错误。

九、监听再均衡:

如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。

  • public void onPartitionsRevoked(Collection partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
  • public void onPartitionsAssigned(Collection partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。

下面的例子将演示如何在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets=new HashMap<>();private class HandleRebalance implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection<TopicPartition>partitions) { }public void onPartitionsRevoked(Collection<TopicPartition>partitions) {System.out.println("Lost partitions in rebalance.Committing currentoffsets:" + currentOffsets);consumer.commitSync(currentOffsets); }
}try {consumer.subscribe(topics, new HandleRebalance()); while (true) {ConsumerRecords<String, String> records =consumer.poll(100);for (ConsumerRecord<String, String> record : records){System.out.println("topic = %s, partition = %s, offset = %d,customer = %s, country = %s\n",record.topic(), record.partition(), record.offset(),record.key(), record.value());currentOffsets.put(new TopicPartition(record.topic(),record.partition()), newOffsetAndMetadata(record.offset()+1, "no metadata"));}consumer.commitAsync(currentOffsets, null);}
} catch (WakeupException e) {// 忽略异常,正在关闭消费者
} catch (Exception e) {log.error("Unexpected error", e);
} finally {try {consumer.commitSync(currentOffsets);} finally {consumer.close();System.out.println("Closed consumer and we are done");}
}

如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。

Kafka的assign和subscribe订阅模式相关推荐

  1. Redis 之 subscribe 订阅模式封装

    subscribe 订阅模式封装 Redis 里的订阅/发布命令 命令 用例和描述 subscribe subscribe channel [channel -] 订阅一个或多个频道 unsubscr ...

  2. kafka consumer assign 和 subscribe模式差异分析

    转载请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7200971.html 最近需要研究flink-connector-kafka的消费行为,发现fli ...

  3. RabbitMQ-Java实现Publish/Subscribe订阅模式

    订阅模式 一个生成者,多个消费者,每个消费者有自己的队列,生产者没有直接把消息发到队列,而是发给了交换机exchange 适合场景举例:对于同一个消息,要发邮件,也要发短信,因此拆分成两个队列 新建连 ...

  4. Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)

    目录 基本概念 代码与实例 基本概念 模型如上: 1. 一个生产者,多个消费者: 2. 每个消费者都有自己的队列: 3. 生产者没有直接把消息发送到队列,而是发送到交换机,通过交换机转发到队列: 4. ...

  5. kafka 发布-订阅模式_使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...

    kafka 发布-订阅模式 发布-订阅消息系统在任何企业体系结构中都起着重要作用,因为它可以实现可靠的集成而无需紧密耦合应用程序. 在解耦的系统之间共享数据的能力并不是一个容易解决的问题. 考虑一个企 ...

  6. RabbitMQ发布/订阅模式(Publish/Subscribe)

    工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...

  7. Kafka的点对点模式、发布订阅模式、基础架构

    一.定义 Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于 大数据实时处理领域. 二.消息队列 使用消息队列的好处 1)解耦 允许你独立的扩展或修改两边 ...

  8. 【RabbitMQ】基础三:发布与订阅模式(Publish/Subscribe)

    [RabbitMQ]基础三:发布与订阅模式(Publish/Subscribe) 1. 订阅模式 2. 发布与订阅模式说明 3. 代码示例 3.1 生产者 3.2 消费者 3.3 测试 4. 总结 1 ...

  9. RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

    在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者.本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式. 为了阐述这个 ...

最新文章

  1. GDAL库简介以及在Windows下编译过程
  2. ThinkPHP 3.1.2 视图 1
  3. Keras + Windows +Anaconda2-4.2.0 深度学习框架快速搭建
  4. 定义一个计算字符串有效长度的_一个正方形的小抽屉柜,根据设计草图计算出所需四片木板的长度...
  5. kvmweb管理工具_KVM web管理工具——WebVirtMgr
  6. 文献学习(part44)--Aberrance suppresse dspatio-temporal correlation filters for visual object tracking
  7. Visual Studio 2017 ASP.NET Core开发
  8. JAVA JFrame编程
  9. java程序(1016)
  10. MTK平台上电话黑名单功能总结
  11. iOS开发之 Autolayout 详解
  12. 关于Autorelease和RunLoop
  13. php随机产生六位数密码
  14. mongoDB--初识mongoDB安装过程
  15. 解析G652,G657A,G655和G654光缆之间的区别
  16. 【论文翻译】SETR:Rethinking Semantic Segmentation from a Sequence-to-Sequence Perspective with Transformer
  17. 关于调整互联网、电话订票起售时间的公告
  18. 加密流量分析-4.加密协议分析
  19. 144显示器只有60_DIY老司机:吃鸡显示器非得用144Hz,60Hz就不行?
  20. 51单片机系列--LCD1602A

热门文章

  1. win10系统如何打开.swf视频文件,flash palyer无法使用
  2. 认识并理顺元宇宙与产业互联网之间的关系,可以打开产业互联网的发展新症结
  3. 跨职能流程图_大数据优化:跨职能集成是否关键?
  4. mac彻底卸载idea
  5. 网站劫持 网站(域名)被劫持怎么检测 遇到网站恶意跳转不要慌(干货)
  6. 吴恩达机器学习(十五)—— 应用实例:图片文字识别
  7. 赚钱的方法分享--首先你要有赚钱的思维和方向计划
  8. QT生成动态链接库及调用详细步骤
  9. 希望计算机专业同学都知道这些宝藏博主
  10. 分布式 - ElasticSearch解决大数据量检索难题