kafka诞生之初,它自带一个基于scala的生产者和消费者客户端。但是慢慢的我们认识到这些API有很多限制。比如,消费者有一个“高级”API支持分组和异常控制,但是不支持很多更复杂的应用场景;它也有一个“低级”API,支持对细节的完全控制,但是要求码农自己控制失败和异常。所以重新设计了它们。

这个过程的第一阶段就是在0.8.1版本的时候重写了生产者API。在最近的0.9版本中完成了第二阶段,提供了消费者的新API。建立在新的分组协议只是,新的消费者带来以下好处:

  • API更加简洁:新的消费者API综合了老版本的“高级”和“低级”API的功能,同时提供了分组机制和lower level access来实现自己的消费策略;
  • 减少了依赖:新的消费者API是用纯java写的。没有了scala和zk的依赖,让代码工程更轻量级;
  • 更安全:新的消费者API支持kafka0.9版本的安全机制;
  • 新的消费者也增加了一系列的机制来控制组消费时的容错。老的API使用大量的java代码实现的(与ZK交互过多),复杂的逻辑很难让其他语言的消费者实现。新的API使这变得更简单。现在已经有C版本的客户端了。

虽然新的消费者是被重新设计过的和新的交互机制,但很多感念没有本质区别,所以熟悉老API的码农也不会觉得新API生硬。但是,也有一些特别细微的细节相对于组管理和线程模型需要在码代码的时候注意。

还有一个注意点:新的消费者API还是测试版本。(不稳定哦,随时会有BUG冒出来,伟大的踩坑者)

Getting Started

略过旧API中的分组消费介绍。。。

旧的API强依赖ZK做分组管理,新的API使用kafka自己的分组协调机制。针对每个消费组,会从所有的broker中挑选出一个出来充当这个组的“协调员”。协调员负责管理该组的状态。它的主要任务是,当新的组成员进入、老的组成员离开和元数据改变时进行分区的协调分配。这种重新分配分区的行为称之为“重新平衡组”。

当一个组首次被初始化,每个分区的消费者一般会从最早或最近的数据开始读。然后在每个分区的消息被依次读出。在消费过程中,消费者会提交已经成功处理了的消息的偏移量。例如,在下图中,消费者正在读的消息的偏移量是6,而它最近一次提交的偏移量是1:

当一个分区被重新分配给组中的另一个消费者时,这个消费者会从上一个消费者最后一次提交的偏移量处开始读。如果上面例子中的消费者突然崩溃了,其他组成员读的时候会从1开始读。这种情况下,它会从1到6重新消费一遍。

上图中还标注了其他两个位置。Log End Offset标记了最后一条消息写入后的偏移量。High Watermark标记了最后被其他replicas同步成功了的偏移量。对于消费者来说,只能读到High Watermark处,这样为了防止未同步的消息被读了以后丢失掉。

配置和初始化

在开始使用新的消费者API之前,先把 kafka-clients 这个依赖加到工程中。

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.9.0.1</version>
</dependency>

View Code

消费者通过Properties文件来配置消费属性,下面是一个最小配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

View Code

与旧的消费者和生产者一样,我们需要配置broker连接参数。我们不需要提供集群中所有服务器的连接参数,客户端会根据给定的连接参数集合得到所有的存活broker。客户端还需要配置key和value的初始化类。最后配置group.id。

订阅TOPIC

在开始消费之前,必须先订阅一些需要读取消息的topic。下面的例子中,同时订阅了foo和bar两个topic:

consumer.subscribe(Arrays.asList("foo", "bar"));

View Code

订阅后,消费者会与组内其他消费者协调分区的分配。在开始消费消息的时候这些事自动完成的。稍后会展示如何使用分配API手动指定分区 。但是不能手动和自动一起用。

订阅topic的方法不能增量订阅:每次订阅必须包含要订阅的所有topic。可以随时改变订阅,新的订阅会替换旧的订阅。

基本的POLL循环

消费者需要并行化地读取数据,可能从分布在不同broker的不同topic的不同分区。为了做到这一点,新的API用了近似unix得pool或者select调用:一旦订阅了一些topic,所有未来的协调、重新平衡和数据获取都被一个调用事件所驱动。这需要单个线程掌控所有IO的一个简单而有效的实现。

订阅一个主题后,需要一个事件循环来接受分区的分配和数据的获取。听起来复杂,其实只需要在循环调用poll方法,然后消费者客户端就会处理剩下的事情。每次调用poll方法,都会收到(可能为空)被分配的分区里面的一系列数据。下面是基本例子:

try {while (running) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records)System.out.println(record.offset() + ": " + record.value());}
} finally {consumer.close();
}

View Code

传进poll方法里面的参数是一个Long类型的,表示等待消息的时间:如果队列里面有消息,会立马返回,如果没有,会等待指定的时间然后返回。

消费者被设计成在自己的线程里面运行。没有外部同步的多线程是不安全的,也是不建议这样做的。

当消费完成后一定记得关闭它,这样会保证组内协调分配分区不会混乱(因为一个分区只能被组内的一个消费者消费)。

上例中使用了一个较小的超时时间为了保证不会有太多延时去关闭消费者。下面这个例子中使用了很长的超时时间和用wakeup API来跳出循环:

try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (ConsumerRecord<String, String> record : records)System.out.println(record.offset() + “: ” + record.value());}
} catch (WakeupException e) {// ignore for shutdown
} finally {consumer.close();
}

View Code

wakeup操作是线程安全的:

/*** Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.* The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.*/
@Override
public void wakeup() {this.client.wakeup();
}

View Code

整合到一起:

public class ConsumerLoop implements Runnable {private final KafkaConsumer<String, String> consumer;private final List<String> topics;private final int id;public ConsumerLoop(int id,String groupId, List<String> topics) {this.id = id;this.topics = topics;Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put(“group.id”, groupId);props.put(“key.deserializer”, StringDeserializer.class.getName());props.put(“value.deserializer”, StringDeserializer.class.getName());this.consumer = new KafkaConsumer<>(props);}@Overridepublic void run() {try {consumer.subscribe(topics);while (true) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (ConsumerRecord<String, String> record : records) {Map<String, Object> data = new HashMap<>();data.put("partition", record.partition());data.put("offset", record.offset());data.put("value", record.value());System.out.println(this.id + ": " + data);}}} catch (WakeupException e) {// ignore for shutdown } finally {consumer.close();}}public void shutdown() {consumer.wakeup();}
}

View Code

测试这里例子的话需要造一些数据。最简单的方式是使用kafka-verifiable-producer.sh这个脚本。

# bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181# bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092

View Code

然后是驱动类:

public static void main(String[] args) { int numConsumers = 3;String groupId = "consumer-tutorial-group"List<String> topics = Arrays.asList("consumer-tutorial");ExecutorService executor = Executors.newFixedThreadPool(numConsumers);final List<ConsumerLoop> consumers = new ArrayList<>();for (int i = 0; i < numConsumers; i++) {ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);consumers.add(consumer);executor.submit(consumer);}Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {for (ConsumerLoop consumer : consumers) {consumer.shutdown();} executor.shutdown();try {executor.awaitTermination(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {e.printStackTrace;}}});
}

View Code

例子中启动了三个线程来消费消息,每个线程给一个单独的ID,这样就能清楚的看到哪个线程消费到了哪些信息。shutdown hook会调用线程的wakeup方法来结束消费。在IDE里面可以点击关闭或者在命令行里面使用Ctrl-C。输出结果:

2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}

View Code

Consumer Liveness

当组内的一个消费者消费某个分区的时候,这些分区上会有一个基于组的锁,即一个组里面一个消费者正在消费某个分区,组内的其他消费者就不能消费这个分区,如果这个消费者一直健康的运行当然最好,如果因为某些原因死掉,你需要把这个锁解掉,然后把分区分给其他消费者。

kafka的组协调机制使用了心跳机制来解决这个问题。每次重新平衡分区分配后,组内消费者开始向组协调员(某个broker)发送心跳。组协调员持续收到某个消费者的心跳,它就认为这个消费者是健康的。协调员每次收到心跳,都会启动一个计时器。当计时器到时间后还没有收到后面的心跳,就认为这个消费者已经挂掉了,就会把这个分区分配给其他合适的消费者。计时器的持续时间是被称为会话超时,由客户端的session.timeout.ms配置。

props.put("session.timeout.ms", "60000");

会话超时机制能保证当消费者挂掉或者网络故障的时候,分区的锁会被释放,并分配给其他消费。老的消费者再发送心跳也不认为它是健康的。

心跳发送线程和poll线程是一起的,正常poll数据的时候才会发送心跳,否则不会发。

会话超时时间默认是30秒,在网络延时大的集群中可以适当调大这个参数,避免非异常情况下的重新分配分区。

Delivery Semantics

当一个组刚创建时,它的初始化offset是根据 auto.offset.reset 这个配置属性来获取的(在0.8中就加入了这个配置项)。一旦消费者开始消费,它根据应用的需求来提交offset。每次组内重新平衡partition以后,读offset的位置就是上一次最后提交的offset。如果一个应用成功处理了某条消息,但是在成功提交offset之前就崩溃掉了,那么下一个消费者将重新读这条消息,造成重复读。当然,offset的提交频率越快,这种损失就越小。

当我们将 enable.auto.commit 属性设置为true时(默认为true),消费者会在配置属性 auto.commit.interval.ms 的时间间隔后自动提交offset。时间间隔越小,崩溃造成的损失越小,随之影响性能。

如果要自己手动控制offset的提交,则必须将 auto.offset.reset 设置为false。

手动提交offset的API现在还是测试版,但是重要的是如果将它集成到poll循环中。下面代码是一个例子:

try {while (running) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records)System.out.println(record.offset() + ": " + record.value());try {consumer.commitSync();} catch (CommitFailedException e) {// application specific failure handling
    }}
} finally {consumer.close();
}

View Code

上例中使用了commitSync API来提交,它会在成功返回或者遇到错误之前阻塞。你需要关心的主要错误就是消息处理的时间超过session的时间造成超时。当这种事情真正发生的时候,这个消费者会被踢出去,然后造成CommitFailedException异常。应用应该处理这种异常,在上次成功提交offset之后和失败提交offset之后的消息造成的改变进行回滚。

另外你应该保证必须在消息成功处理后再提交offset。

译自:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

转载于:https://www.cnblogs.com/admln/p/5446361.html

Kafka 0.9 新消费者API相关推荐

  1. kafka0.9 java commit_Kafka 0.9 新消费者API

    kafka诞生之初,它自带一个基于scala的生产者和消费者客户端.但是慢慢的我们认识到这些API有很多限制.比如,消费者有一个"高级"API支持分组和异常控制,但是不支持很多更复 ...

  2. java kafka 消费_java编程之Kafka_消费者API详解

    1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...

  3. Kafka消费者APi

    Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区.也和服务器交互,平衡均衡消费者. public class KafkaConsumer ...

  4. kafka偏移量保存到mysql里_Kafka 新版消费者 API(二):提交偏移量

    1. 自动提交 最简单的提交方式是让消费者自动提交偏移量.如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去. ...

  5. kafka 主动消费_Kafka核心API——Consumer消费者

    Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...

  6. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  7. kafka系列九、kafka事务原理、事务API和使用场景

    一.事务场景 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 . producer可能会给多个topic,多个partition发消息,这些 ...

  8. 海边的卡夫卡之 - kafka的基本概念以及Api使用

    海边的卡夫卡之 - kafka的基本概念以及Api使用 kafka的应用以及与其他MQ的对比 关于kafka的介绍,也许没有人能比官网更具有话语权,所以这里可以参考官网了解一下kafka:Kafka介 ...

  9. Kafka知识总结之消费者简单使用

    本文简述 这篇文件主要是讲kafka消费者相关使用,诸如,offset的使用,消费者的相关配置,多线程消费模式和springboot整合.至于这些里面涉及到原理等相关深入的知识会放到下一篇文件kafk ...

最新文章

  1. FileBuffer 与 ImageBuffer 互相转换(滴水PE作业)
  2. 对linux内核学习的一点感受,对linux内核学习的一点感受
  3. linux -对称加密、 非对称加密
  4. 计算机应用与技术大赛,关于举办2017年燕山大学第一届计算机应用技术与程序设计大赛的通知...
  5. 智能会议系统(23)---移动端视频通话开源软件比较
  6. C/S架构和B/S架构
  7. Linux常用基本命令:三剑客命令之-awk内置函数用法
  8. 简单的爬虫爬取教务网获取成绩
  9. centos部署k8s集群(kubeadm方式)
  10. JAVA项目实战开发电商项目案例(一)java技术演进与更新
  11. Adam优化算法详细解析
  12. 美面魔心伊莉丝:会有蜘蛛之神制裁你
  13. pythonspiit函数_python专题高阶函数
  14. 计算机老是跳出usb设备无法识别,如何解决电脑一直弹出USB设备无法识别的问题?...
  15. 【本人秃顶程序员】高级 Java 必须突破的 10 个知识点!
  16. 淘淘商城(前台系统,展示商城首页,商品分类展示)
  17. 关于使用pop()的用法
  18. 读书笔记第五讲:《卓有成效的工程师》
  19. 144显示器只有60_Win10系统下144hz显示器刷新率只显示60hz如何解决
  20. 有qq的linux系统软件,除了腾讯的QQ在Linux下还能用哪个QQ软件

热门文章

  1. deeplearning4j – 分布式DL开源项目
  2. -1.#IND000 图像类型转换
  3. BigDataMini导论
  4. 《Flask 入门教程》第 6 章:模板优化
  5. Netflix:我们为什么要将GraphQL引入前端架构?\n
  6. 对Unity的Resources目录进行改名
  7. 单机PC手动更改windows update 地址
  8. 百度地图标注图标太小
  9. 轻松Ghost XP系统!
  10. 玻璃体浑浊的分子原理