Consumer and consumer group code:

import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        // GROUP_ID_CONFIG: the consumer group this consumer belongs to
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));
        System.err.println("consumer1 started.. ");
        try {
            while (true) {
                // Poll a batch of records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----",
                            topic, partition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        long offset = partitionRecords.get(i).offset() + 1;
                        System.err.println(String.format("value: %s, offset to commit: %s",
                                partitionRecords.get(i).value(), offset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class Consumer2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        // GROUP_ID_CONFIG: note this consumer uses a different group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-2");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));
        System.err.println("consumer2 started.. ");
        try {
            while (true) {
                // Poll a batch of records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----",
                            topic, partition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        long offset = partitionRecords.get(i).offset() + 1;
                        System.err.println(String.format("value: %s, offset to commit: %s",
                                partitionRecords.get(i).value(), offset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "module-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setId(i + "");
            user.setName("张三");
            producer.send(new ProducerRecord<>(Const.TOPIC_MODULE, JSON.toJSONString(user)));
        }
        producer.close();
    }
}

Note: if Consumer1 and Consumer2 use the same consumer group configuration (props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");), the two consumers belong to the same consumer group. This is equivalent to the point-to-point model: Consumer1 and Consumer2 split the topic's messages between them, with each partition consumed by only one of them. Conversely, if Consumer1 and Consumer2 are configured with different consumer groups, this is equivalent to the publish/subscribe model: each consumer independently receives every message in the topic.

Essential Kafka consumer parameters and methods (the consumer cannot start without them)

  • bootstrap.servers: the list of broker addresses used to establish the initial connection to the Kafka cluster.
  • key.deserializer and value.deserializer: deserializers for record keys and values;
  • group.id: the consumer group the consumer belongs to;
  • subscribe: subscribes to topics; accepts a collection of topic names or a standard regular expression;
  • assign: subscribes only to specific partitions of a topic (see the minimal sketch after this list).
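Pulling these together, here is a minimal sketch of a consumer built from just these essentials. It assumes the same broker address as the examples above; the group name "demo-group" and the "topic-.*" pattern are placeholders for illustration, not part of the original code:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;
import java.util.regex.Pattern;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers, the key/value deserializers, and group.id are the bare minimum
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // subscribe: a collection of topic names, or a regular expression ...
        consumer.subscribe(Pattern.compile("topic-.*"));
        // ... or assign: pin specific partitions instead (mutually exclusive with subscribe)
        // consumer.assign(Collections.singletonList(new TopicPartition("topic-demo", 0)));

        consumer.close();
    }
}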

Kafka consumer offset commits

Topic subscription code:

import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CoreConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");
        // Auto commit: offsets are committed in the background every 5 seconds
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // subscribe() accepts one or more topics:
        // consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));
        // It also supports regular-expression subscriptions:
        // consumer.subscribe(Pattern.compile("topic-.*"));
        // assign() can pin one or more specific partitions of a topic:
        // consumer.assign(Arrays.asList(new TopicPartition(Const.TOPIC_CORE, 0), new TopicPartition(Const.TOPIC_CORE, 2)));

        // How to fetch all partitions of a topic and assign them manually:
        List<TopicPartition> tpList = new ArrayList<TopicPartition>();
        List<PartitionInfo> tpinfoList = consumer.partitionsFor(Const.TOPIC_CORE);
        for (PartitionInfo pi : tpinfoList) {
            System.err.println("topic: " + pi.topic() + ", partition: " + pi.partition());
            tpList.add(new TopicPartition(pi.topic(), pi.partition()));
        }
        consumer.assign(tpList);
        System.err.println("core consumer started...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    String topic = topicPartition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("--- topic: %s, partition: %s, record count: %s",
                            topic, topicPartition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s",
                                value, offset, commitOffset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CoreProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(Const.TOPIC_CORE, JSON.toJSONString(user));
            producer.send(record);
        }
        producer.close();
    }
}

Manual commit code:

import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CommitConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");
        // Commit offsets manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        // AUTO_OFFSET_RESET_CONFIG decides where to start reading when no committed
        // offset exists for the group. It accepts three values: "latest", "earliest", "none":
        //   latest   - start from the end of the partition
        //   earliest - start from the very beginning of the partition (offset 0)
        //   none     - throw an exception if no previous offset is found
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // subscribe() accepts one or more topics
        consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));
        System.err.println("core consumer started...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                    String topic = topicPartition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("--- topic: %s, partition: %s, record count: %s",
                            topic, topicPartition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                        String value = consumerRecord.value();
                        long offset = consumerRecord.offset();
                        long commitOffset = offset + 1;
                        System.err.println(String.format("value: %s, record offset: %s, offset to commit: %s",
                                value, offset, commitOffset));
                        // Commit record by record within a partition, synchronously:
                        // consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffset)));
                        // ... or asynchronously with a callback:
                        consumer.commitAsync(
                                Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffset)),
                                new OffsetCommitCallback() {
                                    @Override
                                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                                           Exception exception) {
                                        if (null != exception) {
                                            System.err.println("commit failed, handle the error");
                                            return;
                                        }
                                        System.err.println("async commit succeeded: " + offsets);
                                    }
                                });
                    }
                    // Alternatively, perform one commit per partition here
                }
                /*
                // Or commit the whole poll in one shot, synchronously:
                // consumer.commitSync();
                // ... or asynchronously:
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (null != exception) {
                            System.err.println("commit failed, handle the error");
                            return;
                        }
                        System.err.println("async commit succeeded: " + offsets);
                    }
                });
                */
            }
        } finally {
            consumer.close();
        }
    }
}
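A note on the two commit styles shown above: commitSync() blocks the poll loop and retries internally until the commit succeeds or hits an unrecoverable error, while commitAsync() returns immediately and does not retry (an automatic retry could overwrite a newer offset committed in the meantime), reporting the outcome to its OffsetCommitCallback. A common compromise is commitAsync() inside the loop for throughput, plus a final commitSync() in the finally block before close().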
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CommitProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            User user = new User("00" + i, "张三");
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(Const.TOPIC_CORE, JSON.toJSONString(user));
            producer.send(record);
        }
        producer.close();
    }
}

Let's talk about Kafka rebalancing

In Kafka, a Rebalance is triggered when a consumer joins or leaves a group, or when the group's topic subscriptions change. A rebalance transfers ownership of partitions from one consumer to another within the same consumer group; as the name suggests, it redistributes consumption across the group's members. The process runs as follows:

Step 1: every member sends a join-group request to the coordinator. Once all members have done so, the coordinator picks one consumer to act as the group leader and sends it the member list together with everyone's subscription information.

Step 2: the leader works out the assignment plan, deciding which consumer is responsible for which partitions of which topics. Once the assignment is complete, the leader sends the plan to the coordinator, which forwards it to each consumer; at that point every member of the group knows which partitions it should consume. The Coordinator therefore plays a critical role in a Rebalance.

Kafka rebalance listener code:

import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class RebalanceConsumer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        // GROUP_ID_CONFIG: the consumer group this consumer belongs to
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe with a rebalance listener to observe partition ownership changes
        consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.err.println("Revoked Partitions: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.err.println("Assigned Partitions: " + partitions);
            }
        });
        System.err.println("rebalance consumer1 started.. ");
        try {
            while (true) {
                // Poll a batch of records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----",
                            topic, partition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        long offset = partitionRecords.get(i).offset() + 1;
                        System.err.println(String.format("value: %s, offset to commit: %s",
                                partitionRecords.get(i).value(), offset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class RebalanceConsumer2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        // Same group as RebalanceConsumer1, so the two consumers share the partitions
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe with a rebalance listener to observe partition ownership changes
        consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.err.println("Revoked Partitions: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.err.println("Assigned Partitions: " + partitions);
            }
        });
        System.err.println("rebalance consumer2 started.. ");
        try {
            while (true) {
                // Poll a batch of records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    String topic = partition.topic();
                    int size = partitionRecords.size();
                    System.err.println(String.format("---- topic: %s, partition: %s, record count: %s ----",
                            topic, partition.partition(), size));
                    for (int i = 0; i < size; i++) {
                        long offset = partitionRecords.get(i).offset() + 1;
                        System.err.println(String.format("value: %s, offset to commit: %s",
                                partitionRecords.get(i).value(), offset));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class RebalanceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "rebalance-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setId(i + "");
            user.setName("张三");
            producer.send(new ProducerRecord<>(Const.TOPIC_REBALANCE, JSON.toJSONString(user)));
        }
        producer.close();
    }
}

Multithreaded Kafka consumers

Implementation of consumer multithreading model 1:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class KafkaConsumerMt1 implements Runnable {

    private KafkaConsumer<String, String> consumer;
    private volatile boolean isRunning = true;
    private static AtomicInteger counter = new AtomicInteger(0);
    private String consumerName;

    public KafkaConsumerMt1(Properties properties, String topic) {
        // Each worker thread owns its own KafkaConsumer instance
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Arrays.asList(topic));
        this.consumerName = "KafkaConsumerMt1-" + counter.getAndIncrement();
        System.err.println(this.consumerName + " started ");
    }

    @Override
    public void run() {
        try {
            while (isRunning) {
                // The poll result may contain records from several partitions
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (TopicPartition topicPartition : consumerRecords.partitions()) {
                    // String topic = topicPartition.topic();
                    // Fetch the records belonging to this particular partition
                    List<ConsumerRecord<String, String>> partitionList = consumerRecords.records(topicPartition);
                    int size = partitionList.size();
                    for (int i = 0; i < size; i++) {
                        ConsumerRecord<String, String> consumerRecord = partitionList.get(i);
                        // do execute message
                        String message = consumerRecord.value();
                        long messageOffset = consumerRecord.offset();
                        System.err.println("consumer: " + consumerName
                                + ", message: " + message
                                + ", offset: " + messageOffset
                                + ", thread: " + Thread.currentThread().getName());
                    }
                }
            }
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    public boolean isRunning() {
        return isRunning;
    }

    public void setRunning(boolean isRunning) {
        this.isRunning = isRunning;
    }
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Mt1Producer
 * @author hezhuo.bai
 * @since 2019-02-28 12:38:46
 */
public class Mt1Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "mt1-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setId(i + "");
            user.setName("张三");
            producer.send(new ProducerRecord<>(Const.TOPIC_MT1, JSON.toJSONString(user)));
        }
        producer.close();
    }
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Mt1Test {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mt1-group");
        // Auto-commit offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        String topic = Const.TOPIC_MT1;
        // Pool size: one KafkaConsumer per thread
        int coreSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(coreSize);
        for (int i = 0; i < coreSize; i++) {
            executorService.execute(new KafkaConsumerMt1(props, topic));
        }
    }
}
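A design note on model 1: every worker thread owns a private KafkaConsumer instance, so no consumer is ever shared across threads, which is exactly what keeps this model safe. The trade-off is that parallelism is capped by the topic's partition count: consumers in the group beyond the number of partitions receive no assignment and sit idle, and each extra consumer also holds its own connections and fetch buffers.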

Model 2 uses a Master-Worker pattern (the master dispatches, the workers process). During initialization the master creates a pool of worker instances; the master alone polls messages from Kafka and hands them out to the workers. When a worker finishes processing, it reports the result back to the master, and the master performs the commit centrally.

This avoids having multiple threads operate on the same consumer, which would make the Kafka client throw a ConcurrentModificationException.

Figure: the Master-Worker model
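As a rough illustration of model 2, here is a minimal, hypothetical sketch rather than the original project's code: the class name MasterWorkerConsumer, the group id "mw-group", the topic name "topic-mw", and the pool size are all invented for the example. A single master thread owns the KafkaConsumer and polls; a fixed thread pool processes the records; the master commits only after the whole batch has been processed.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MasterWorkerConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mw-group");
        // Manual commit: the master commits only after all workers are done
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Only the master thread ever touches this consumer instance
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic-mw"));
        ExecutorService workers = Executors.newFixedThreadPool(5);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    continue;
                }
                // Dispatch the batch to the workers and wait for all of them to finish
                CountDownLatch latch = new CountDownLatch(records.count());
                for (ConsumerRecord<String, String> record : records) {
                    workers.execute(() -> {
                        try {
                            System.err.println("worker " + Thread.currentThread().getName()
                                    + " processing value: " + record.value());
                        } finally {
                            latch.countDown();
                        }
                    });
                }
                latch.await();
                // Every record of this poll has been processed; commit once, from the master thread
                consumer.commitSync();
            }
        } finally {
            workers.shutdown();
            consumer.close();
        }
    }
}

Blocking on the CountDownLatch before commitSync() preserves at-least-once delivery at the cost of batch-level throughput; a production version would more likely track the highest processed offset per partition and commit those, instead of stalling the whole batch on the slowest record.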

Important Kafka consumer parameters

Beyond the essentials above, the consumer parameters usually highlighted include fetch.min.bytes, fetch.max.wait.ms, max.poll.records, max.poll.interval.ms, session.timeout.ms, heartbeat.interval.ms, auto.offset.reset, and enable.auto.commit.
