深入浅出系列之 -- kafka消费者的三种语义模型
本文主要详解kafka client的使用,包括kafka消费者的三种消费语义at-most-once,at-least-once,和exact-once message,生产者的使用等。
创建主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic normal --partitions 2 --rerelication-factor 1
生产者
public class ProducerExample {public static void main(String[] str) throws InterruptedException, IOException {System.out.println("Starting ProducerExample ...");sendMessages();}private static void sendMessages() throws InterruptedException, IOException {Producer<String, String> producer = createProducer();sendMessages(producer);// Allow the producer to complete sending of the messages before program exit.Thread.sleep(20);}private static Producer<String, String> createProducer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);// Controls how much bytes sender would wait to batch up before publishing to Kafka.props.put("batch.size", 10);props.put("linger.ms", 1);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer(props);}private static void sendMessages(Producer<String, String> producer) {String topic = "normal-topic";int partition = 0;long record = 1;for (int i = 1; i <= 10; i++) {producer.send(new ProducerRecord<String, String>(topic, partition, Long.toString(record),Long.toString(record++)));}}
}
消费者
消费者注册到卡夫卡有多种方式:
订阅:这种方式在新增的话题或者分区或者消费者增加或者消费者减少的时候,会进行消费者组内消费者的再平衡。
分配:这种方式注册的消费者不会进行重新平衡。
上面两种方式都是可以实现,三种消费语义的。具体API的使用请看下文。
1.最多一次kafka消费者
最多一次消费语义是kafka消费者的默认实现。配置这种消费者最简单的方式是
1)enable.auto.commit设置为真。
2)auto.commit.interval.ms设置为一个较低的时间范围。
3)consumer.commitSync()不要调用该方法。
由于上面的配置,就可以使得kafka有线程负责按照指定间隔提交偏移。但是这种方式会使得kafka消费者有两种消费语义:
消费语义最多一次 :
消费者的偏移已经提交,但是消息还在处理,这个时候挂了,再重启的时候会从上次提交的偏移处消费,导致上次在处理的消息部分丢失。
消费语义最少一次:
消费者已经处理完了,但是偏移还没提交,那么这个时候消费者挂了,就会导致消费者重复消费消息处理。但是由于auto.commit.interval.ms设置为一个较低的时间范围,会降低这种情况出现的概率。
代码如下:
public class AtMostOnceConsumer {public static void main(String[] str) throws InterruptedException {System.out.println("Starting AtMostOnceConsumer ...");execute();}private static void execute() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Subscribe to all partition in that topic. 'assign' could be used here// instead of 'subscribe' to subscribe to specific partition.consumer.subscribe(Arrays.asList("normal-topic"));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg1";props.put("group.id", consumeGroup);// Set this property, if auto commit should happen.props.put("enable.auto.commit", "true");// Auto commit interval, kafka would commit offset at this interval.props.put("auto.commit.interval.ms", "101");// This is how to control number of records being read in each pollprops.put("max.partition.fetch.bytes", "135");// Set this if you want to always read from beginning.// props.put("auto.offset.reset", "earliest");props.put("heartbeat.interval.ms", "3000");props.put("session.timeout.ms", "6001");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);long lastOffset = 0;for (ConsumerRecord<String, String> record : records) {System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());lastOffset = record.offset();}System.out.println("lastOffset read: " + lastOffset);process();}}private static void process() throws InterruptedException {// create some delay to simulate processing of the message.Thread.sleep(20);}
}
2.至少一次kafka消费者
实现最少一次消费语义的消费者也很简单。
1)设置enable.auto.commit为假
2)消息处理完之后手动调用consumer.commitSync()
这种方式就是要手动在处理完该次轮询得到消息之后,调用偏移异步提交函数consumer.commitSync()。建议是消费者内部实现密等,来避免消费者重复处理消息进而得到重复结果。最多一次发生的场景是消费者的消息处理完并输出到结果库(也可能是部分处理完),但是偏移还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息。
代码如下:
public class AtLeastOnceConsumer {public static void main(String[] str) throws InterruptedException {System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");execute();}private static void execute() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Subscribe to all partition in that topic. 'assign' could be used here// instead of 'subscribe' to subscribe to specific partition.consumer.subscribe(Arrays.asList("normal-topic"));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg1";props.put("group.id", consumeGroup);// Set this property, if auto commit should happen.props.put("enable.auto.commit", "true");// Make Auto commit interval to a big number so that auto commit does not happen,// we are going to control the offset commit via consumer.commitSync(); after processing // message.props.put("auto.commit.interval.ms", "999999999999");// This is how to control number of messages being read in each pollprops.put("max.partition.fetch.bytes", "135");props.put("heartbeat.interval.ms", "3000");props.put("session.timeout.ms", "6001");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) throws {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);long lastOffset = 0;for (ConsumerRecord<String, String> record : records) {System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());lastOffset = record.offset();}System.out.println("lastOffset read: " + lastOffset);process();// Below call is important to control the offset commit. Do this call after you// finish processing the business process.consumer.commitSync();}}private static void process() throws InterruptedException {// create some delay to simulate processing of the record.Thread.sleep(20);}
}
3.使用subscribe实现Exactly-once
使用subscribe实现Exactly-once很简单,具体思路如下:
1)将enable.auto.commit设置为假。
2)不调用consumer.commitSync()。
3)使用SUBCRIBE定于话题。
4)实现一个ConsumerRebalanceListener,在该监听器内部执行consumer.seek(topicPartition,偏移),从指定的主题/分区的偏移处启动。
5)在处理消息的时候,要同时控制保存住每个消息的偏移量。以原子事务的方式保存偏移和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如HDFS或者nosql的,为了实现这个目标,只能将偏移与消息保存在同一行。
6)实现密等,作为保护层。
代码如下:
public class ExactlyOnceDynamicConsumer {private static OffsetManager offsetManager = new OffsetManager("storage2");public static void main(String[] str) throws InterruptedException {System.out.println("Starting ExactlyOnceDynamicConsumer ...");readMessages();}private static void readMessages() throws InterruptedException {KafkaConsumer<String, String> consumer = createConsumer();// Manually controlling offset but register consumer to topics to get dynamically// assigned partitions. Inside MyConsumerRebalancerListener use// consumer.seek(topicPartition,offset) to control offset which messages to be read.consumer.subscribe(Arrays.asList("normal-topic"),new MyConsumerRebalancerListener(consumer));processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg3";props.put("group.id", consumeGroup);// Below is a key setting to turn off the auto commit.props.put("enable.auto.commit", "false");props.put("heartbeat.interval.ms", "2000");props.put("session.timeout.ms", "6001");// Control maximum data on each poll, make sure this value is bigger than the maximum // single message sizeprops.put("max.partition.fetch.bytes", "140");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}private static void processRecords(KafkaConsumer<String, String> consumer) {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());// Save processed offset in external storage.offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());}}}
}
public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {private OffsetManager offsetManager = new OffsetManager("storage2");private Consumer<String, String> consumer;public MyConsumerRebalancerListener(Consumer<String, String> consumer) {this.consumer = consumer;}public void onPartitionsRevoked(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));}}public void onPartitionsAssigned(Collection<TopicPartition> partitions) {for (TopicPartition partition : partitions) {consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));}}
}
/**
* The partition offset are stored in an external storage. In this case in a local file system where
* program runs.
*/
public class OffsetManager {private String storagePrefix;public OffsetManager(String storagePrefix) {this.storagePrefix = storagePrefix;}/*** Overwrite the offset for the topic in an external storage.** @param topic - Topic name.* @param partition - Partition of the topic.* @param offset - offset to be stored.*/void saveOffsetInExternalStore(String topic, int partition, long offset) {try {FileWriter writer = new FileWriter(storageName(topic, partition), false);BufferedWriter bufferedWriter = new BufferedWriter(writer);bufferedWriter.write(offset + "");bufferedWriter.flush();bufferedWriter.close();} catch (Exception e) {e.printStackTrace();throw new RuntimeException(e);}}/*** @return he last offset + 1 for the provided topic and partition.*/long readOffsetFromExternalStore(String topic, int partition) {try {Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;} catch (Exception e) {e.printStackTrace();}return 0;}private String storageName(String topic, int partition) {return storagePrefix + "-" + topic + "-" + partition;}
}
4.使用指定实现完全一次
使用assign实现Exactly-once也很简单,具体思路如下:
1)将enable.auto.commit设置为假。
2)不调用consumer.commitSync()。
3)调用指定注册卡夫卡消费者到卡夫卡
4)初次启动的时候,调用consumer.seek(topicPartition,偏移)来指定偏移量。
5)在处理消息的时候,要同时控制保存住每个消息的偏移量。以原子事务的方式保存偏移和处理的消息结果。传统数据库实现原子事务比较简单。但对于非传统数据库,比如HDFS或者nosql的,为了实现这个目标,只能将偏移与消息保存在同一行。
6)实现密等,作为保护层。
代码如下:
public class ExactlyOnceStaticConsumer {private static OffsetManager offsetManager = new OffsetManager("storage1");public static void main(String[] str) throws InterruptedException, IOException {System.out.println("Starting ExactlyOnceStaticConsumer ...");readMessages();}private static void readMessages() throws InterruptedException, IOException {KafkaConsumer<String, String> consumer = createConsumer();String topic = "normal-topic";int partition = 1;TopicPartition topicPartition =registerConsumerToSpecificPartition(consumer, topic, partition);// Read the offset for the topic and partition from external storage.long offset = offsetManager.readOffsetFromExternalStore(topic, partition);// Use seek and go to exact offset for that topic and partition.consumer.seek(topicPartition, offset);processRecords(consumer);}private static KafkaConsumer<String, String> createConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");String consumeGroup = "cg2";props.put("group.id", consumeGroup);// Below is a key setting to turn off the auto commit.props.put("enable.auto.commit", "false");props.put("heartbeat.interval.ms", "2000");props.put("session.timeout.ms", "6001");// control maximum data on each poll, make sure this value is bigger than the maximum // single message sizeprops.put("max.partition.fetch.bytes", "140");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(props);}/*** Manually listens for specific topic partition. But, if you are looking for example of how to * dynamically listens to partition and want to manually control offset then see* ExactlyOnceDynamicConsumer.java*/private static TopicPartition registerConsumerToSpecificPartition(KafkaConsumer<String, String> consumer, String topic, int partition) {TopicPartition topicPartition = new TopicPartition(topic, partition);List<TopicPartition> partitions = Arrays.asList(topicPartition);consumer.assign(partitions);return topicPartition;}/*** Process data and store offset in external store. Best practice is to do these operations* atomically. */private static void processRecords(KafkaConsumer<String, String> consumer) throws {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());}}}
}
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
考虑一千次,不如去做一次;犹豫一万次,不如实践一次;华丽的跌倒,胜过无谓的彷徨,将来的你,一定会感谢现在奋斗的你。欢迎大家加入大数据交流群:725967421 一起交流,一起进步!!
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
深入浅出系列之 -- kafka消费者的三种语义模型相关推荐
- 【Kafka】Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor
文章目录 1. 分配策略 1.1 Range(默认策略) 1.2 RoundRobin RoundRobin的两种情况 1.3 StickyAssignor 2. Range策略演示 参考 相关文章 ...
- Kafka消费者组三种分区分配策略roundrobin,range,StickyAssignor
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消 ...
- 深入理解C语言-二级指针三种内存模型
二级指针相对于一级指针,显得更难,难在于指针和数组的混合,定义不同类型的二级指针,在使用的时候有着很大的区别 第一种内存模型char *arr[] 若有如下定义 char *arr[] = {&quo ...
- reactor线程模型_简单了解Java Netty Reactor三种线程模型
1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接 ...
- Reactor三种线程模型与Netty线程模型
一.Reactor三种线程模型 1.1.单线程模型 单个线程以非阻塞IO或事件IO处理所有IO事件,包括连接.读.写.异常.关闭等等.单线程Reactor模型基于同步事件分离器来分发事件,这个同步事件 ...
- 浅析常用软件架构中的一定要理解的三种架构模型
2019独角兽企业重金招聘Python工程师标准>>> 常用的软件架构模型可以归类为三种架构模型:3/N层架构."框架+插件"架构.地域分布式架构. 一.三种架构 ...
- 简单说一linux内核的内存模型(平坦,不连续,稀疏等三种内存模型)
目录 Linux内核支持的三种内存模型 CONFIG_FLATMEM(平坦内存模型) 基本概念(以第一个为例,不在复述) 所以说什么是平坦模型 所以说什么是不连续模型 所以说什么是稀疏模型 总结 鸣谢 ...
- word2vec三种保存模型方式
大家好,我是爱编程的喵喵.双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中.从事机器学习以及相关的前后端开发工作.曾在阿里云.科大讯飞.CCF等比赛获得多次Top名次.现 ...
- 三种Cross-lingual模型 (XLM, XLM-R, mBART)详解
本文将详述三种Cross-lingual模型,按照其在Arxiv上发表论文的时间,分别是XLM(2019/1/22).XLM-R(2019/11/5).mBART(2020/1/22),有意思的是这三 ...
最新文章
- 报名 | 清华大数据论坛之深度学习技术与应用
- libtool: line 454 CDPATH libtool: line 1132: func_opt_split: : command not found
- [bzoj4131]并行博弈_博弈论
- 介绍 Echoo: go 语言编写的 echo 服务器
- php怎么把字符转成大写,php怎么把字符串转换为大写
- 小甲鱼 OllyDbg 教程系列 (十) : Windows 逆向常用 api 以及 XOFTSPY 逆向
- WdatePicker日历控件使用方法
- 翻译pdf中的英文 python_浅谈python实现Google翻译PDF,解决换行的问题
- 如何获取集合里面的下标_怎样获取list集合中的最后一个对象中的值
- 安卓手机运行ios教程_安卓手机充电提示音教程
- 笨猪猪:“暹粒游记”(下)
- 福建省漳州市谷歌卫星地图下载
- dellg3计算机rom,戴尔G3 U盘装系统win7教程
- BAT前端老鸟总结:未来几年web前端发展四大趋势前瞻
- 不同计算机用户的区别是什么意思,电脑操作系统的32位和64位分别是什么意思?有什么区别?...
- LCD制作工艺及相关设备资料配方大全(转)
- 使用正则表达式验证银行帐号
- VBA调用Shell
- wegame每次登陆都要滑动验证_WeGame版《怪物猎人世界》的猎人们,你的权限验证正常吗?...
- Bundle Adjustment (BA) in vSLAM or SFM