一、入门程序

先上代码,从代码入手,讲解kafka消费者客户端的细节。

public class HelloKafkaConsumer {public static void main(String[] args) {//设置消费者属性Properties properties = new Properties();properties.put("bootstrap.servers","127.0.0.1:9092");//反序列化器,与生产者的序列化器相对应properties.put("key.deserializer", StringDeserializer.class);properties.put("value.deserializer", StringDeserializer.class);//设置消费者的消费者群组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);try {//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)consumer.subscribe(Collections.singletonList("Hello World"));//kafka消费者是通过拉取的方式获得服务端消息while(true){//循环调用poll方法,获取数据。ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}}} finally {consumer.close();}}

二、消费者属性

消费者有以下属性可以进行设置,在ConsumerConfig类里定义了这些属性:

  • auto.offset.reset
    如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办。值有以下几种:
    earliest:自动将偏移量重置为最早偏移量
    latest:自动将偏移量重置为最新偏移量
    none:抛出异常
  • enable.auto.commit
    表明消费者是否自动提交偏移 默认值true。这个的作用是消费者获取到数据后,进行业务处理,当业务处理操作失败时,我们可以选择不提交偏移量,这样的话下次还可以读取这个数据。
  • max.poll.records
    控制每次poll方法返回的的记录数量 默认值500
  • partition.assignment.strategy
    分区分配给消费者的策略。系统提供两种策略。默认为Range, 把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)。RoundRobin是把主题的分区循环分配给消费者。我们也可以继承PartitionAssignor类自定义分区策略。

三、消费者群组

共同订阅一个主题的消费者们,构成一个消费者群组。在一个主题下,一个分区只能被消费者群组里的一个消费者所消费,而不能被多个消费,但是可以被不同群组的消费者所消费。一个消费者,可以消费一个主题下的多个分区,也可以消费不同主题下的分区。




四、偏移量和提交

消费者读取到分区数据到哪里了,通过偏移量来记录。将读取到信息的偏移量上传给kafka服务器,称之为提交。kafka服务中_consumer_offset 的特殊主题记录了消费者读取的偏移量。需要注意的是,偏移量的提交,提交的是最后一刻的偏移量,消费者一下读取多条数据,偏移量只是提交最后的那个偏移量,而不是每条数据的偏移量都读取。
消费者提交偏移量的方式:
自动提交:自动提交即enable.auto.comnit为true。自动提交我们无需关注,kafka每隔几秒的时候提交一次偏移量。
自动提交的弊端:1.当一个消费者挂了时,如果该消费者的偏移量已经提交了,但是消息的业务处理还没处理完,那么,这种情况下我们会认为是消息丢失了。
2.我们处理完业务了,偏移量还没提交,此时消费者挂掉了,那么会造成数据的重复消费问题。

手动提交:手动提交可以在我们处理完业务后,进行偏移量的提交。如果业务处理失败,我们可以选择不提交,重复读取业务处理失败的消息。手动提交分为同步提交和异步提交。同步提交,线程会等待偏移量提交成功。异步提交不会阻塞线程,可以在回调函数里判断成功和失败。代码如下:

public class SyncAndAsync {public static void main(String[] args) {/*消息消费者*/Properties properties = KafkaConst.consumerConfig("SyncAndAsync",StringDeserializer.class,StringDeserializer.class);/*取消自动提交*/properties.put("enable.auto.commit",false);KafkaConsumer<String,String> consumer= new KafkaConsumer<String, String>(properties);try {consumer.subscribe(Collections.singletonList("Hello World"));while(true){ConsumerRecords<String, String> records= consumer.poll(500);for(ConsumerRecord<String, String> record:records){System.out.println(String.format("主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));//do our work}//业务处理的时候,异步提交。//consumer.commitAsync();/*允许执行回调*/consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if(exception!=null){System.out.print("Commmit failed for offsets ");System.out.println(offsets);exception.printStackTrace();}}});}} catch (CommitFailedException e) {System.out.println("Commit failed:");e.printStackTrace();} finally {try {//最后,再同步提交一次,保证都提交了。consumer.commitSync();} finally {consumer.close();}}}
}

五、多线程安全问题

消费者在多线程环境下是不安全的,即多个线程公用一个消费者的话,消息是不安全的。所以我们不要将一个消费者,让多个线程使用。做到一个线程使用一个消费者。

六、群组协调

消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个 消费者分配分区。分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者,每个消费者只能看到自己的分配信息, 只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

七、分区再均衡

当有新的消费者加入群组或者某个消费者挂掉时,或者主题的分区数量发生变化时,都会触发分区再均衡。分区再均衡就是分区与消费者的映射关系重新洗牌。在再均衡期间,消费者无法读取数据,会造成一段时间的停顿。发生分区再均衡后,消费者会先读取_consumer_offset主题中读取分区的最后偏移量,然后再往后进行读取。
需要注意的是,在发生分区再均衡前,消费者会进行一次清理工作:提交偏移量,避免发生消息丢失等问题。
理解: 消费者群组是针对主题而言的。虽然消费者都是和分区进行对接,而且提交的偏移量都是分区的偏移量。但是这个偏移量是这个消费者群组整体的读取偏移量。所以,当发生分区再均衡,一个分区换成这个消费者群组其他的消费者消费时,也是按该分区在该群组的偏移量去继续进行的。

八、再均衡监听器

ConsumerRebalancelistener是kafka为我们提供的再均衡监听器接口,在分区再均衡发生之前和发生之后,我们可以做一些我们自己的业务,ConsumerRebalancelistener源码如下:

public interface ConsumerRebalanceListener {void onPartitionsRevoked(Collection<TopicPartition> var1);void onPartitionsAssigned(Collection<TopicPartition> var1);
}

其中,onPartitionsRevoked是再均衡之前,执行的方法。onPartitionsAssigned是分区再均衡完成之后,执行的方法。从上面的方法定义中我们可以看到,参数是TopicPartition的集合。它是消费者所订阅的分区。我们可以遍历分区,将偏移量进行提交,以确保再均衡前,把所有的偏移量都提交了,同时,可以把偏移量存在数据库中,确保偏移量的正确性。
同理,分区再均衡之后,遍历消费者订阅的分区,调用seek方法,指定分区再均衡之前的偏移量(从数据库中获取),进行继续消费。

九、处理失败数据再处理思路

当消费者获取到数据并进行业务处理时,如果业务处理失败了,我们不要提交它的偏移量。但是kafka不止返回一条数据,会返回多条记录。消费者在循环遍历数据的时候,如果前一个偏移量数据处理失败了,但是后一个偏移量的数据处理成功了。那么kafka会提交后一个数据的偏移量。这样的话,前一个失败的数据,也就无法读取了。针对这种情况,我们可以改变思路。我们将处理失败的数据的分区,偏移量等信息保存到数据库中,另外单独开启一个消费者,专门去处理这些失败的数据。
在kafka中,我们一定要有结合关系型数据库的思路,来配合kafka的偏移量来解决我们的实际业务问题。

十、spring整合kafka消费者客户端

与spring整合生产者类似,只不过spring配置文件里的配置需要配消费者的。这里我们需要注意一点,在配置生产者时,我们最终得到productTemplate类,通过这个类去调用生产者的方法,发送消息,我们无需自己定义生产者类。而在消费者中,spring整合kafka后,是以监听的形式在接收消息,我们需要手动实现消费者类监听消息后对消息的业务处理,并实现MessageListener接口。接口的onMessage方法就是监听到消费者接收消息触发的方法,在该方法里,就是我们拿接收到的数据做业务处理的方法。
spring配置文件配置具体如下:

<!-- 1.定义consumer的参数 --><bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${bootstrap.servers}" /><entry key="group.id" value="spring-kafka-group" /><entry key="key.deserializer"value="org.apache.kafka.common.serialization.StringDeserializer" /><entry key="value.deserializer"value="org.apache.kafka.common.serialization.StringDeserializer" /></map></constructor-arg></bean><!-- 2.创建consumerFactory bean --><bean id="consumerFactory"class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" ><constructor-arg><ref bean="consumerProperties" /></constructor-arg></bean><!-- 消费者自行确认-3.定义消费实现类,实现MessageListener接口,定义onMessage方法,监听消息并对消息进行业务处理 --><bean id="kafkaConsumerServiceAck" class="xxx.xxx.KafkaConsumer" /><!-- 4.消费者容器配置信息,监听哪些主题,相当于原生API中的subscribe方法 --><bean id="containerProperties"class="org.springframework.kafka.listener.ContainerProperties"><constructor-arg name="topics"><list><value>kafka-spring-topic</value></list></constructor-arg><property name="messageListener" ref="kafkaConsumerService"></property></bean><!-- 5.消费者并发消息监听容器,执行doStart()方法 --><bean id="messageListenerContainer"class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"init-method="doStart" ><constructor-arg ref="consumerFactory" /><constructor-arg ref="containerProperties" /><!--  并发几个消费者 --><property name="concurrency" value="3" /></bean><!-- 上面我们配置了一个消费者群组,我们还可以配置多个消费者群组,下面我们配置手动提交偏移量的消费者群组 --><!-- 消费者自行确认-1.定义consumer的参数 --><bean id="consumerPropertiesAck" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="${bootstrap.servers}" /><entry key="group.id" value="spring-kafka-group-ack" /><entry key="key.deserializer"value="org.apache.kafka.common.serialization.StringDeserializer" /><entry key="value.deserializer"value="org.apache.kafka.common.serialization.StringDeserializer" /><entry key="enable.auto.commit" value="false"/></map></constructor-arg></bean><!-- 消费者自行确认-2.创建consumerFactory bean --><bean id="consumerFactoryAck"class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" ><constructor-arg><ref bean="consumerPropertiesAck" /></constructor-arg></bean><!-- 消费者自行确认-3.定义消费实现类 --><bean id="kafkaConsumerServiceAck" class="xxx.xxx.KafkaConsumerAck" /><!-- 消费者自行确认-4.消费者容器配置信息 --><bean id="containerPropertiesAck"class="org.springframework.kafka.listener.ContainerProperties"><!-- topic --><constructor-arg name="topics"><list><value>kafka-spring-topic-b</value></list></constructor-arg><property name="messageListener" ref="kafkaConsumerServiceAck" /><!-- 消费者自行确认模式 --><property name="ackMode" value="MANUAL_IMMEDIATE"></property></bean>
<!-- 消费者自行确认-5.消费者并发消息监听容器,执行doStart()方法 --><bean id="messageListenerContainerAck"class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"init-method="doStart" ><constructor-arg ref="consumerFactoryAck" /><constructor-arg ref="containerPropertiesAck" /><property name="concurrency" value="3" /></bean>

下面我们看需要我们手动定义的处理接收消息的MessageListener实现类:

//普通消费者,自动提交偏移量
public class KafkaConsumer  implements MessageListener<String,String> {public void onMessage(ConsumerRecord<String, String> data) {String name = Thread.currentThread().getName();System.out.println(name+"|"+String.format("主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",data.topic(),data.partition(),data.offset(),data.key(),data.value()));}
}
//手动提交偏移量
public class KafkaConsumerAck implements AcknowledgingMessageListener<String,String> {public void onMessage(ConsumerRecord<String, String> data,Acknowledgment acknowledgment) {String name = Thread.currentThread().getName();System.out.println(name+"|"+String.format("主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",data.topic(),data.partition(),data.offset(),data.key(),data.value()));//手动确认acknowledgment.acknowledge();}
}

这样,我们就配置好了spring环境下的kafka消费者。由此可以看到,在消费者中,我们只需手动实现处理消息的业务逻辑,其他的都通过配置即可。

十一、springboot整合kafka消费者

首先在application.properties配置文件中配置kafka的属性信息:

#============== kafka ===================
kafka.consumer.zookeeper.connect=localhost:2181/kafka-one
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10kafka.producer.servers=localhost:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

然后,定义配置类:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${kafka.consumer.servers}")private String servers;@Value("${kafka.consumer.enable.auto.commit}")private boolean enableAutoCommit;@Value("${kafka.consumer.session.timeout}")private String sessionTimeout;@Value("${kafka.consumer.auto.commit.interval}")private String autoCommitInterval;@Value("${kafka.consumer.group.id}")private String groupId;@Value("${kafka.consumer.auto.offset.reset}")private String autoOffsetReset;@Value("${kafka.consumer.concurrency}")private int concurrency;public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return propsMap;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic MyListener listener() {return new MyListener();}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory= new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(concurrency);factory.getContainerProperties().setPollTimeout(1500);return factory;}public Map<String, Object> consumerConfigsAck() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);return propsMap;}public ConsumerFactory<String, String> consumerFactoryAck() {return new DefaultKafkaConsumerFactory<>(consumerConfigsAck());}@Bean("listenerAck")public MyListenerAck listenerAck() {return new MyListenerAck();}@Bean("factoryAck")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>kafkaListenerContainerFactoryAck() {ConcurrentKafkaListenerContainerFactory<String, String> factory= new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryAck());factory.setConcurrency(concurrency);factory.getContainerProperties().setPollTimeout(1500);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

在springboot中,我们使用@KafkaListener注解,来监听收到的消息,并进行业务处理,需要我们自己定义实现类,具体代码如下:

public class MyListener {protected final Logger logger = LoggerFactory.getLogger(this.getClass());@KafkaListener(topics = {"test"})public void listen(ConsumerRecord<?, ?> record) {logger.info("收到消息的key: " + record.key());logger.info("收到消息的value: " + record.value().toString());}
}
public class MyListenerAck {protected final Logger logger = LoggerFactory.getLogger(this.getClass());@KafkaListener(topics = {"testAck"},containerFactory = "factoryAck")public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {try {logger.info("自行确认方式收到消息的key: " + record.key());logger.info("自行确认方式收到消息的value: " + record.value().toString());} finally {logger.info("消息确认!");ack.acknowledge();}}
}

这样,springboot整合kafka客户端就完成了。

kafka基础篇(四)——kafka消费者客户端相关推荐

  1. kafka 基础知识梳理-kafka是一种高吞吐量的分布式发布订阅消息系统

    一.kafka 简介 今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 ...

  2. 前端开发之JavaScript基础篇四

    主要内容: 1.定时器 2.正则表达式入门 3.元字符 4.正则表达式实战运用 一.定时器 javaScript里主要使用两种定时器,分别是:setInterval()和setTimeout(). 1 ...

  3. 转载:谢谢原作者:块设备驱动实战基础篇四 (逐渐成型,加入ioctl通信机制)

    1.6介绍一种内核与用户空间通信的方法-misc设备ioctl机制 块设备驱动开发中往往需要配合用户态的管理程序工具,管理我们的块设备,此时我们需要涉及用户空间程序与块设备通信的方法,ioctl机制就 ...

  4. 测试需要了解的技术之基础篇四__UI自动化测试体系

    UI自动化测试体系 1.Andriod 自动化测试:Appium 环境安装与架构介绍.Appium Desktop用例录制.Appium测试用例流程.元素定位方法 IA/AID/XPATH/UISel ...

  5. java程序试岗内容_java程序员修炼之路基础篇四:继承

    上一篇文章我跟大家聊了一下"封装",今天我们聊一下同样作为java语言三大特征之一的"继承". 简单说"继承"就是从一个已知类派生出新类的过 ...

  6. c if语句多个条件判断顺序_Java中的流程控制语句 (基础篇四)

    流程控制就是对事物次序的布置和安排,在程序中就是对代码执行次序的安排和控制 程序中的流程控制主要有三种:顺序流程.选择流程.循环流程. 顺序流程:比如打印输出的代码按照指定的顺序结构依次排序,打印的结 ...

  7. java中的四个跳转语句_Java中的流程控制语句 (基础篇四)

    流程控制就是对事物次序的布置和安排,在程序中就是对代码执行次序的安排和控制 程序中的流程控制主要有三种:顺序流程.选择流程.循环流程. 顺序流程:比如打印输出的代码按照指定的顺序结构依次排序,打印的结 ...

  8. oracle职工工资数据表四表联动,oracle 学习之基础篇(四):多表查询

    员工表emp和部门表dept的笛卡尔集(笛卡尔集表=列数之和,行数之积,笛卡尔集表内中有些数据是不符合要求的)select emp.ename,dept.dname from emp,dept; 使用 ...

  9. Android基础篇(四)

    AdapterView 指使用适配器来配置多个内容显示的视图,代表的有ListView(列表).GridView(九宫格).ExpanableListView(分组列表) ListView <L ...

最新文章

  1. nginx Win下实现简单的负载均衡(2)站点共享Session
  2. CI 如何获取get请求过来的数据
  3. 安装搜狗输入法之后 Linux Mint 19.1 字体发虚解决方案
  4. 让mysql返回的结果按照传入的id的顺序排序
  5. CentOS上安装Python3.7.4
  6. 使用Area(区域)会遇到的问题
  7. 02.C(数据类型与运算符)
  8. Rocketmq技术分享
  9. 03-JVM内存分配机制详解
  10. Windows server2016 计算机管理中找不到用户和组
  11. 第四章:迭代器与生成器
  12. php 5.2 apc,将APC(替代PHP缓存)集成到PHP5(Debian Etch&Apache2)
  13. 普通程序员如何转向人工智能方向?
  14. 【c语言】两个队列实现一个栈
  15. 路由器和交换机用什么线连接?
  16. Sapphire应用场景剖析 | 基于行业首个隐私EVM构建DApp
  17. Dell Inspiron 15R - QQ语音时麦克风没有声音的设置办法
  18. Delphi 10.3.1 Memo打开/保存utf-8不乱码的方法,网上都是胡天!
  19. SAPNoteSAR格式解压_SAP刘梦_新浪博客
  20. 国开《国家开放大学学习指南》形考任务1-5

热门文章

  1. android记事本的设计报告,安卓记事本开发设计报告.pdf
  2. 南加大计算机专业硕士申请,南加州大学计算机科学专业硕士申请条件独家整理附案例分析...
  3. 书架bookshelf
  4. 建设工程法规专科【5】
  5. 基于偏置比例导引的任意指定攻击角度控制导引律(matlab源代码+原理)
  6. NodeJs 的fs模块
  7. fs.readFile和fs.readFileSync的区别
  8. 祝福语html特效,2015年微信祝福语特效
  9. 女生的择偶标准:曾仕强
  10. 个人投资课 张潇雨_张潇雨《个人投资课》之五大原则