3、Kafka进阶提升-消费者
消费者与消费者组代码:
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;public class Consumer1 {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");// GROUP_ID_CONFIG 消费者组配置props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));System.err.println("consumer1 started.. ");try {while (true) {// 拉取结果集ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);String topic = partition.topic();int size = partitionRecords.size();System.err.println(String.format("---- 获取topic: %s, 分区位置:%s, 消息数为:%s ----",topic, partition.partition(), size));for (int i = 0; i< size; i++) {long offset = partitionRecords.get(i).offset() + 1;System.err.println(String.format("获取value: %s, 提交的 offset: %s", partitionRecords.get(i).value(), offset)); }}} } finally {consumer.close();}}
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;public class Consumer2 {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-2"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题consumer.subscribe(Collections.singletonList(Const.TOPIC_MODULE));System.err.println("consumer2 started.. ");try {while (true) {// 拉取结果集ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);String topic = partition.topic();int size = partitionRecords.size();System.err.println(String.format("---- 获取topic: %s, 分区位置:%s, 消息数为:%s ----",topic, partition.partition(), size));for (int i = 0; i< size; i++) {long offset = partitionRecords.get(i).offset() + 1;System.err.println(String.format("获取value: %s, 提交的 offset: %s", partitionRecords.get(i).value(), offset)); }}} } finally {consumer.close();}}
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * Demo producer that sends ten JSON-serialized {@code User} messages to
 * {@code Const.TOPIC_MODULE}.
 */
public class Producer {

    /**
     * Configures a String/String producer, sends users with ids "0".."9", then closes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "module-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() runs even if JSON serialization or send() throws,
        // which the original straight-line close() call did not.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i + "");
                user.setName("张三");
                producer.send(new ProducerRecord<>(Const.TOPIC_MODULE, JSON.toJSONString(user)));
            }
        }
    }
}
注:如果Consumer1和Consumer2里面消费者的消费者组配置(props.put(ConsumerConfig.GROUP_ID_CONFIG, "module-group-id-1");)都是一样的话,说明两个消费者隶属于同一个消费者组,相当于点对点模型,Consumer1和Consumer2这时会均摊的消费Topic的消息。反之,如果Consumer1和Consumer2里面消费者的消费者组配置不一样,相当于发布订阅模型,Consumer1和Consumer2这时会同时消费Topic里面的消息。
Kafka消费者必要参数方法(没有这些参数就启动不起来)
- bootstrap.servers:用来指定连接Kafka集群所需的broker地址清单。
- key.deserializer和value.deserializer:反序列化参数;
- group.id:消费者所属消费组;
- subscribe:消费主题订阅,支持集合/标准正则表达式;
- assign:只订阅主题的某个分区。
Kafka消费者提交位移
主题订阅代码:
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;public class CoreConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");// TODO : 使用手工方式提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 对于consume消息的订阅 subscribe方法 :可以订阅一个 或者 多个 topic
// consumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));// 也可以支持正则表达式方式的订阅
// consumer.subscribe(Pattern.compile("topic-.*"));// 可以指定订阅某个主题下的某一个 或者多个 partition
// consumer.assign(Arrays.asList(new TopicPartition(Const.TOPIC_CORE, 0), new TopicPartition(Const.TOPIC_CORE, 2)));// 如何拉取主题下的所有partitionList<TopicPartition> tpList = new ArrayList<TopicPartition>();List<PartitionInfo> tpinfoList = consumer.partitionsFor(Const.TOPIC_CORE);for(PartitionInfo pi : tpinfoList) {System.err.println("主题:"+ pi.topic() +", 分区: " + pi.partition());tpList.add(new TopicPartition(pi.topic(), pi.partition()));}consumer.assign(tpList);System.err.println("core consumer started...");try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for(TopicPartition topicPartition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);String topic = topicPartition.topic();int size = partitionRecords.size();System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s", topic, topicPartition.partition(), size));for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);String value = consumerRecord.value();long offset = consumerRecord.offset();long commitOffser = offset + 1;System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s", value, offset, commitOffser));}}} } finally {consumer.close();}}
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Demo producer that sends ten JSON-serialized {@code User} messages to
 * {@code Const.TOPIC_CORE}.
 */
public class CoreProducer {

    /**
     * Configures a String/String producer, sends users with ids "000".."009", then closes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees close() runs even if JSON serialization or send() throws,
        // which the original straight-line close() call did not.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(Const.TOPIC_CORE, JSON.toJSONString(user));
                producer.send(record);
            }
        }
    }
}
手工提交代码:
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;public class CommitConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "core-group");// 使用手工方式提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 消费者默认每次拉取的位置:从什么位置开始拉取消息// AUTO_OFFSET_RESET_CONFIG 有三种方式: "latest", "earliest", "none"// none// latest 从一个分区的最后提交的offset开始拉取消息// earliset 从最开始的起始位置拉取消息 0properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 对于consume消息的订阅 subscribe方法 :可以订阅一个 或者 多个 topicconsumer.subscribe(Collections.singletonList(Const.TOPIC_CORE));System.err.println("core consumer started...");try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for(TopicPartition topicPartition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);String topic = topicPartition.topic();int size = partitionRecords.size();System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s", topic, topicPartition.partition(), size));for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);String value = consumerRecord.value();long offset = consumerRecord.offset();long commitOffser = offset + 1;System.err.println(String.format("获取实际消息 value:%s, 消息offset: %s, 提交offset: %s", value, offset, commitOffser));// 在一个partition内部,每一条消息记录 进行一一提交方式
// consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffser)));consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(commitOffser)),new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (null != exception) {System.err.println("error . 处理");}System.err.println("异步提交成功:" + offsets);}});}// 一个partition做一次提交动作}/**// 整体提交:同步方式
// consumer.commitSync();// 整体提交:异步方式consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if(null != exception) {System.err.println("error . 处理");}System.err.println("异步提交成功:" + offsets);}});*/} } finally {consumer.close();}}
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Demo producer that sends ten JSON-serialized {@code User} messages to {@code Const.TOPIC_CORE};
 * it feeds the manual-commit consumer example.
 */
public class CommitProducer {

    /**
     * Configures a String/String producer, sends users with ids "000".."009", then closes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "core-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources guarantees close() runs even if JSON serialization or send() throws,
        // which the original straight-line close() call did not.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                User user = new User("00" + i, "张三");
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(Const.TOPIC_CORE, JSON.toJSONString(user));
                producer.send(record);
            }
        }
    }
}
谈一谈 Kafka 的再均衡?
在Kafka中,当有消费者加入或离开消费者组(包括消费者宕机、会话超时)、订阅的topic数量发生变化,或者topic的分区数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。所以对于Rebalance来说,Coordinator起着至关重要的作用。
Kafka再均衡监听代码:
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;public class RebalanceConsumer1 {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");// GROUP_ID_CONFIG 消费者组配置props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.err.println("Revoked Partitions:" + partitions);}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.err.println("AssignedAssigned Partitions:" + partitions);}});System.err.println("rebalance consumer1 started.. ");try {while (true) {// 拉取结果集ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);String topic = partition.topic();int size = partitionRecords.size();System.err.println(String.format("---- 获取topic: %s, 分区位置:%s, 消息数为:%s ----",topic, partition.partition(), size));for (int i = 0; i< size; i++) {long offset = partitionRecords.get(i).offset() + 1;System.err.println(String.format("获取value: %s, 提交的 offset: %s", partitionRecords.get(i).value(), offset)); }}} } finally {consumer.close();}}
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;public class RebalanceConsumer2 {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-group"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 订阅主题// 订阅主题consumer.subscribe(Collections.singletonList(Const.TOPIC_REBALANCE), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.err.println("Revoked Partitions:" + partitions);}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.err.println("AssignedAssigned Partitions:" + partitions);}});System.err.println("rebalance consumer2 started.. ");try {while (true) {// 拉取结果集ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);String topic = partition.topic();int size = partitionRecords.size();System.err.println(String.format("---- 获取topic: %s, 分区位置:%s, 消息数为:%s ----",topic, partition.partition(), size));for (int i = 0; i< size; i++) {long offset = partitionRecords.get(i).offset() + 1;System.err.println(String.format("获取value: %s, 提交的 offset: %s", partitionRecords.get(i).value(), offset)); }}} } finally {consumer.close();}}
}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * Demo producer that sends ten JSON-serialized {@code User} messages to
 * {@code Const.TOPIC_REBALANCE}.
 */
public class RebalanceProducer {

    /**
     * Configures a String/String producer, sends users with ids "0".."9", then closes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "rebalance-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() runs even if JSON serialization or send() throws,
        // which the original straight-line close() call did not.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i + "");
                user.setName("张三");
                producer.send(new ProducerRecord<>(Const.TOPIC_REBALANCE, JSON.toJSONString(user)));
            }
        }
    }
}
Kafka消费者多线程
消费者多线程模型1代码实现:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;public class KafkaConsumerMt1 implements Runnable {private KafkaConsumer<String, String> consumer;private volatile boolean isRunning = true;private static AtomicInteger counter = new AtomicInteger(0);private String consumerName;public KafkaConsumerMt1(Properties properties, String topic) {this.consumer = new KafkaConsumer<>(properties);this.consumer.subscribe(Arrays.asList(topic));this.consumerName = "KafkaConsumerMt1-" + counter.getAndIncrement();System.err.println(this.consumerName + " started ");}@Overridepublic void run() {try {while(isRunning) {// 包含所有topic下的所有消息内容ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));for(TopicPartition topicPartition : consumerRecords.partitions()) {
// String topic = topicPartition.topic();// 根据具体的topicPartition 去获取对应topicPartition下的数据集合List<ConsumerRecord<String, String>> partitionList = consumerRecords.records(topicPartition);int size = partitionList.size();for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionList.get(i);// do execute messageString message = consumerRecord.value();long messageOffset = consumerRecord.offset();System.err.println("当前消费者:"+ consumerName + ",消息内容:" + message + ", 消息的偏移量: " + messageOffset+ "当前线程:" + Thread.currentThread().getName());}}}} finally {if(consumer != null) {consumer.close();}}}public boolean isRunning() {return isRunning;}public void setRunning(boolean isRunning) {this.isRunning = isRunning;}}
import com.alibaba.fastjson.JSON;
import com.lvxiaosha.kafka.api.Const;
import com.lvxiaosha.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
/**
 * $Mt1Producer: demo producer that sends ten JSON-serialized {@code User} messages to
 * {@code Const.TOPIC_MT1} for the multi-threaded consumer example.
 *
 * @author hezhuo.bai
 * @since 2019-02-28 12:38:46
 */
public class Mt1Producer {

    /**
     * Configures a String/String producer, sends users with ids "0".."9", then closes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "mt1-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() runs even if JSON serialization or send() throws,
        // which the original straight-line close() call did not.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                User user = new User();
                user.setId(i + "");
                user.setName("张三");
                producer.send(new ProducerRecord<>(Const.TOPIC_MT1, JSON.toJSONString(user)));
            }
        }
    }
}
import com.lvxiaosha.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class Mt1Test {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.130:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "mt1-group"); // 自动提交的方式props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); String topic = Const.TOPIC_MT1;// coreSizeint coreSize = 5;ExecutorService executorService = Executors.newFixedThreadPool(coreSize);for(int i =0; i <5; i++) {executorService.execute(new KafkaConsumerMt1(props, topic));}}
}
采用Master-worker模型(master负责分发,worker负责处理)。master初始化阶段,创建多个worker实例,然后master负责拉取消息,并负责分发给各个worker执行。各个worker执行完成后,将执行结果回传给master,由master统一进行commit。
这样避免了多个线程共同操作同一个consumer(KafkaConsumer不是线程安全的),从而避免kafka抛出异常ConcurrentModificationException
Master-worker模型
Kafka消费者重要参数
3、Kafka进阶提升-消费者相关推荐
- 深入分析Kafka生产者和消费者
深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...
- Kafka生产者与消费者详解
什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...
- Kafka学习之消费者
Kafka学习之消费者 前言 本博客主要介绍up在学习kafka中间件时候觉得需要记录的知识点. 内容 1.消费者与消费组 消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订 ...
- 关于Kafka 的 consumer 消费者手动提交详解
前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...
- [转载] Python Web开发—进阶提升 490集超强Python视频教程 真正零基础学习Python视频教程
参考链接: 在Python中创建代理Web服务器 2 Python Web开发-进阶提升 490集超强Python视频教程 真正零基础学习Python视频教程 [课程简介] 这是一门Python We ...
- Kafka 多话题消费者
卡夫卡多主题消费者 支持的管道类型: 数据收集器 卡夫卡多主题消费者源从阿帕奇卡集群中的多个主题读取数据.源可以使用多个线程来启用数据的并行处理.如果愿意,您可以使用 Kafka 使用者通过单个线程从 ...
- Kafka 生产者、消费者命令行操作
Kafka 生产者.消费者命令行操作 1.查看操作生产者命令参数 bin/kafka-console-producer.sh 参数 --bootstrap-server <String: ser ...
- 会员积分营销系统操作的时候怎样提升消费者的积极性?
当今商家和企业都在改变传统的营销方式,因为随着时代的发展,传统的营销方式已经达不到大家想要的效果.因此,为了让营销的效果更高,企业和商家开始采用新的营销方式.会员积分营销是目前商家和企业运用比较频繁的 ...
- 视频教程-HTML5进阶提升与案例开发视频课程-HTML5/CSS
HTML5进阶提升与案例开发视频课程 中国实战派HTML5培训第一人,微软技术讲师,曾任百合网技术总监,博看文思HTML5总监.陶国荣长期致力于HTML5.JavaScript.CSS3.jQuery ...
最新文章
- 【VB】学生信息管理系统1——系统设计怎样开始?
- 32岁程序员面试被拒:比又穷又忙更可怕的,是2020年你还不懂...
- android开发字体样式,Android开发中修改程序字体的样式
- DC/DC变换器的典型拓扑
- Eclipse出现the type java.lang.CharSequence can't be resolved.
- wamp 配置 mysql_PHPWAMP配置应该如何修改,Web服务器、php、mysql的具体配置修改
- c# string 转 datetime_tesseract || PDF转PNG转txt
- 一些老程序员不错的经验分享
- 液压传动与气动技术【2】
- chrome双击突然打不开的解决办法
- Verilog练习:HDLBits笔记4
- copy-to-clipboard 复制
- 清华大学计算机崔勇,崔勇 简历 - 名人简历
- html5加载更多,HTML5[7]: 实现网页版的加载更多
- 科技大学计算机基础试卷答案,大学计算机基础期末考试试卷(带答案)
- OPENCV例子\samples\cpp\tutorial_code\ImgProc\changing_contrast_brigh的代码分析
- 有道科学计算机,网易有道超级计算器获App Store推荐 打造随身数学帮手
- 安全基线规范之Cisco核心交换机
- ASP.NET 母版页小实例(点击显示文本内容)
- 非梯度类启发式搜索算法:Nelder Mead