一 kafka集群的启动

1.1 机器说明

192.168.152.128 master
192.168.152.129 slaver01
192.168.152.130 slaver02

1.2 查看防火墙

如果防火墙开着,则进行关闭:systemctl stop firewalld

1.在master节点查看防火墙状态:systemctl status firewalld

可以看到: Active: inactive (dead) 则表示防火墙已经关闭

[root@localhost ~]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemonLoaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)Active: inactive (dead)Docs: man:firewalld(1)

2.在slaver01节点查看防火墙状态:systemctl status firewalld

[root@localhost ~]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemonLoaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)Active: inactive (dead)Docs: man:firewalld(1)

3.在slaver02节点查看防火墙的状态:systemctl status firewalld

[root@localhost ~]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemonLoaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)Active: inactive (dead)Docs: man:firewalld(1)

1.3 启动zk

kafka集群依赖zk,所以需要先启动zk。

1.master节点启动zk

2.slaver01节点启动zk

3.slaver02节点启动zk

1.4 启动kafka

1.在master节点启动kafka:进入kafka的安装目录:/root/export/servers/kafka_2.11-0.10.0.0  执行命令:nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

2.在slaver02节点启动kafka:进入kafka的安装目录:/root/export/servers/kafka_2.11-0.10.0.0  执行命令:nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

3.在slaver03节点启动kafka, 进入kafka的安装目录:/root/export/servers/kafka_2.11-0.10.0.0  执行命令:nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

ok,到此,kafka集群,已经启动完成,接下来需要写程序来进行模拟生产者发送数据,消费者消费数据的过程。

1.6 kafka常用命令

#查看消费者数据
bin/kafka-console-consumer.sh --bootstrap-server 10.56.9.60:9092,10.56.9.61:9092,10.56.9.62:9092 --topic alarm-fire --from-beginning
#删除主题bin/kafka-topics.sh --zookeeper 10.56.9.60:2181 --delete --topic alarm-fire

二 springboot整合kafka

2.1 工程结构

2.1.1 工程总体思路

 ①  生产者环境类配置好以后,@Autowired自动注入KafkaTemplate类,使用send方法生产消息

         ②  消费者环境类配置好以后,方法头前使用@KafkaListener(topics = {"${kafka.consumer.topic}"})注解监听topic并传入ConsumerRecord<?, ?> record对象即可自动消费topic

2.2 公共部分

2.2.1 pom文件的编写

    <!--springboot 启动 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--springboot web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><!--<version>2.2.0.RELEASE</version>--></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version><scope>provided</scope></dependency>

2. 2.2  model层Javabean编写

package com.ljf.spring.boot.demo.model;import java.util.Date;/*** @ClassName: User* @Description: TODO* @Author: liujianfu* @Date: 2021/04/03 12:40:11 * @Version: V1.0**/
public class User {private  String name="beijing";private String message;private Date sendTime;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public Date getSendTime() {return sendTime;}public void setSendTime(Date sendTime) {this.sendTime = sendTime;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", message='" + message + '\'' +", sendTime=" + sendTime +'}';}
}

2.3 resouces下application配置文件生产者和消费者的配置

2.3.1 生产者配置

##################################################################kafka的producer相关配置#####################################################
# 指定kafka server的地址,集群配多个,中间,逗号隔开
kafka.producer.bootstrap.servers=192.168.152.128:9092,192.168.152.129:9092,192.168.152.130:9092
##设置topic主题
kafka.producer.topic=alarm-fire
# 写入失败时,重试次数。当leader节点失效,一个replicate节点会替代成为leader节点,此时可能出现写入失败,当retris为0时,produce不会重复。retirs重发,此时replicate节点完全成为leader节点,不会产生消息丢失。
kafka.producer.retries=0
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
kafka.producer.acks=all
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
kafka.producer.batch.size=16384
kafka.producer.linger=1
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
kafka.producer.buffer.memory=33554432

注:

bootstrap.servers:kafka server的地址
acks:写入kafka时,leader负责一个该partion读写,当写入partition时,需要将记录同步到repli节点,all是全部同步节点都返回成功,leader才返回ack。
retris:写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
batch.size:produce积累到一定数据,一次发送。
buffer.memory:produce积累数据一次发送,缓存大小达到buffer.memory就发送数据。
linger.ms:  同时设置batch.size和 linger.ms,就是哪个条件先满足就都会将消息发送出去,Kafka需要考虑高吞吐量延时的平衡.。上面比如我们设置batch size为32KB,但是比如有的时刻消息比较少,过了很久,比如5min也没有凑够32KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到32KB,也将这个批次发送出去. 比如设置5ms,就是到了5ms,大小没到32KB,也会发出去。
原文链接:https://blog.csdn.net/u010711495/article/details/113178943
key/value serializer:序列化类。

2.3.2 消费者配置

##########################################################kafka的consumer相关配置#######################################################
#kafka.consumer.zookeeper.connect=IP:PORT
# 指定kafka server的地址,集群配多个,中间,逗号隔开
kafka.consumer.bootstrap.servers=192.168.152.128:9092,192.168.152.129:9092,192.168.152.130:9092
# enable.auto.commit:true --> 设置自动提交offset
kafka.consumer.enable.auto.commit=false
#设置session超时时间
kafka.consumer.session.timeout=6000
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
kafka.consumer.auto.commit.interval=100
# earliest
#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest
#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据:即:实时生产,实时消费,不会从头开始消费
#none
#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
kafka.consumer.auto.offset.reset=latest
#设置主题名称
kafka.consumer.topic=alarm-fire
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
kafka.consumer.group.id=testGroup
#设置消费线程数
kafka.consumer.concurrency=10
# 指定消息key和消息体的编解码方式
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

2.4 模拟生产者

建立kafka producer的步骤为:

1.通过@Configuration、@EnableKafka注解,声明Config并且打开KafkaTemplate能力。后面就是通过kafkaTemplate来操作发送数据。

2.通过@Value注解,注入application.properties配置文件中的kafka配置。

3.使用@Bean注解,生成bean对象。

2.4.1 生产者的注册配置

package com.ljf.spring.boot.demo.config;import com.ljf.spring.boot.demo.producer.KafkaProducerListener;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;import java.util.HashMap;
import java.util.Map;/*** @ClassName: KafkaInfoConfig* @Description:* 建立kafka producer的步骤为:1.通过@Configuration、@EnableKafka注解,声明Config并且打开KafkaTemplate能力。2.通过@Value注解,注入application.properties配置文件中的kafka配置。3.使用@Bean注解,生成bean对象。* @Author: liujianfu* @Date: 2021/04/01 20:42:30 * @Version: V1.0**/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.producer.bootstrap.servers}")private String servers;@Value("${kafka.producer.topic}")private String topic;@Value("${kafka.producer.retries}")private int retries;@Value("${kafka.producer.acks}")private String acks;@Value("${kafka.producer.batch.size}")private int batchSize;@Value("${kafka.producer.linger}")private int linger;@Value("${kafka.producer.buffer.memory}")private int bufferMemory;/*** @author liujianfu* @description       加载配置信息* @date 2021/4/1 21:29* @param []* @return java.util.Map<java.lang.String,java.lang.Object>*/public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, linger);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}/*** @author liujianfu* @description      使用producer配置项对象来构建producerFactory* @date 2021/4/1 21:41* @param []* @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.String>*/public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}/*** @author liujianfu* @description       使用producerFactory来构建kafkaTemplate的bean,实例化一个KafkaTemplate对象* @date 2021/4/1 21:42* @param []* @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.String>*/@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> template= new KafkaTemplate<String, String>(producerFactory());template.setProducerListener(kafkaProducerListener());return template;}/*** @author liujianfu* @description      生产者的监听器* @date 2021/4/3 19:13* @param []* @return org.springframework.kafka.support.ProducerListener*/@Beanpublic KafkaProducerListener kafkaProducerListener(){KafkaProducerListener listener = new KafkaProducerListener();return listener;}}

设置监听器的第一种方式:

2.4.2 生产者的监听器

代码如下:

package com.ljf.spring.boot.demo.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;/*** @ClassName: KafkaProducerListener* @Description: TODO* @Author: liujianfu* @Date: 2021/04/03 18:41:39 * @Version: V1.0**/
@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener<String, String> {protected final Logger logger = LoggerFactory.getLogger(this.getClass());/*** @author liujianfu* @description   成功回调* @date 2021/4/3 18:45* @param [s, integer, s2, s3, recordMetadata]* @return void*/@Overridepublic void onSuccess(String s, Integer integer, String s2, String s3, RecordMetadata recordMetadata) {logger.info("!!!!!!!!!!!!!!!!!!!!!!!我是kafka的发送者的监听者,推送成功,推送数据:" + recordMetadata.serializedValueSize()+" s3:"+s3);}/*** @author liujianfu* @description       失败回调* @date 2021/4/3 18:47* @param [s, integer, s2, s3, e]* @return void*/@Overridepublic void onError(String s, Integer integer, String s2, String s3, Exception e) {System.out.println("!!!!!!!!!!!!!!!!!!我是kafka的发送者的监听者,推送失败,推送数据:" + s3+ ",失败原因:" + e.getMessage());}@Overridepublic boolean isInterestedInSuccess() {logger.info("!!!!!!!!!!!! logger我是kafka的发送者的监听者,数据发送完毕!!!!!");return true;}
}

2.4.3 模拟生产者

package com.ljf.spring.boot.demo.controller;import com.google.gson.Gson;
import com.ljf.spring.boot.demo.common.ResponseResult;
import com.ljf.spring.boot.demo.model.User;
import com.ljf.spring.boot.demo.producer.KafkaProducerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Date;/*** @ClassName: ProducerController* @Description: kafka消息的发送者* @Author: liujianfu* @Date: 2021/04/03 12:36:28 * @Version: V1.0**/@RestController
@RequestMapping("/kafka")
public class ProducerController {protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate KafkaTemplate kafkaTemplate;//在kafakaproducerconfig配置文件中已经初始化了,这里直接拿来使用//@Autowired// private KafkaProducerListener producerListener;//发送者的监听器@Value("${kafka.producer.topic}")private String topicName;@RequestMapping(value = "/send", method = RequestMethod.GET)public Object sendKafka(HttpServletRequest request, HttpServletResponse response) {String sendJson="";try {String message = request.getParameter("message");logger.info("kafka的消息={}", message);Gson gson=new Gson();User u=new User();u.setMessage(message);u.setSendTime(new Date());sendJson= gson.toJson(u);logger.info("kafka的json报文为:={}", message);// kafkaTemplate.setProducerListener(producerListener);//设置发送者的监听器kafkaTemplate.send(topicName, "keyTest", sendJson);logger.info("发送kafka成功.");return ResponseResult.ok(sendJson);} catch (Exception e) {logger.error("发送kafka失败", e);return ResponseResult.ok(sendJson);}}}

第二种注册生产者的监听器:

2.5 模拟消费者

建立kafka consumer的步骤为:

1.通过@Configuration、@EnableKafka注解,声明Config并且打开KafkaTemplate能力。

2.通过@Value注解,注入application.properties配置文件中的kafka配置。

3.使用@Bean注解,生成bean对象。

2.5.1 消费者注册组件

package com.ljf.spring.boot.demo.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/*** @ClassName: KafkaConsumerConfig* @Description: TODO* @Author: liujianfu* @Date: 2021/04/03 16:11:00 * @Version: V1.0**/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${kafka.consumer.bootstrap.servers}")private String servers;@Value("${kafka.consumer.enable.auto.commit}")private boolean enableAutoCommit;@Value("${kafka.consumer.session.timeout}")private String sessionTimeout;@Value("${kafka.consumer.auto.commit.interval}")private String autoCommitInterval;@Value("${kafka.consumer.group.id}")private String groupId;@Value("${kafka.consumer.auto.offset.reset}")private String autoOffsetReset;@Value("${kafka.consumer.concurrency}")private int concurrency;// 1.构建consumer配置项对象public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return propsMap;}// 2.使用consumer配置项对象来构建consumerFactorypublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}// 3.使用consumerFactory来构建kafkaListenerContainerFactory@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(concurrency);factory.getContainerProperties().setPollTimeout(1500);return factory;}}

2.5.2  实时监听消费

在完成KafkaConsumerConfig配置后,构建一个监听指定kafka topic的component组件,即可对消息进行获取。

@KafkaListener注解中topics属性用于指定kafka topic名称,topic名称由消息生产者指定,也就是由kafkaTemplate在发送消息时指定。

@KafkaListener注解中containerFactory属性用于指定KafkaListenerContainerFactory名称,也是就是KafkaConsumerConfig中Kafka监听器容器工厂bean的名称。@bean不指定名称,默认就是放回类的名字且首字母小写

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @ClassName: KafkaConsumer* @Description: kafka的消费监听器* @Author: liujianfu* @Date: 2021/04/03 16:14:44 * @Version: V1.0**/
@Component
public class KafkaConsumer {protected final Logger logger = LoggerFactory.getLogger(this.getClass());// @KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "KafkaListenerContainerFactory")//kafkaListenerContainerFactory@KafkaListener(topics = {"${kafka.consumer.topic}"}, containerFactory = "kafkaListenerContainerFactory")public void listen(ConsumerRecord<?, ?> record) {logger.info("=============我是kafka的消费者的监听器,正在监听kafka的消费信息,kafka的key: " + record.key());logger.info("=============我是kafka的消费者的监听器,正在监听kafka的消费信息,kafka的value: " + record.value().toString());}
}

图中标红的就是上图实例化返回:KafkaListenerContainerFactory这个类,且首字母小写。

注意:在定义监听消息配置时,GROUP_ID_CONFIG配置项用于指定消费者组的名称。如果存在组名相同的多个监听器对象,则只有一个监听器对象能收到消息。

2.6 开始测试

2.6.1 生产者发送数据

2.6.2 查看生产者日志

2.6.3 查看生产者的监听器

2.6.4 消费者监听消费

2.6.5 查看消费者的信息

[root@localhost kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --from-beginning --topic alarm-fire  --zookeeper 192.168.152.128:2181,192.168.152.129:2181,192.168.152.130:2181
{"name":"beijing","message":"我在美丽的新疆","sendTime":"Apr 3, 2021 6:14:52 PM"}
{"name":"beijing","message":"我在北京,beijing a beautifual city!","sendTime":"Apr 3, 2021 6:55:02 PM"}
{"name":"beijing","message":"我在上海,shanghai shi guoji大都市123!","sendTime":"Apr 3, 2021 7:03:24 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 7:08:55 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 7:15:53 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:02:08 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:19:25 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:28:13 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:37:38 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:46:33 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:54:00 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 8:57:47 PM"}
{"name":"beijing","message":"我是程序员,today is onduty!","sendTime":"Apr 3, 2021 9:10:12 PM"}
{"name":"beijing","message":"今天是清明节,缅怀先人!","sendTime":"Apr 4, 2021 12:28:48 AM"}

注意:kafka版本不同,查看消费者的命令不同

#kafka的版本:kafka_2.11-0.10.0.0
 bin/kafka-console-consumer.sh --from-beginning --topic alarm-fire  --zookeeper 192.168.152.128:2181,192.168.152.129:2181,192.168.152.130:2181
 #kafka的版本:kafka_2.12-2.1.0   kafka_2.13-2.7.0
 bin/kafka-console-consumer.sh --bootstrap-server 192.168.152.128:9092,192.168.152.129:9092,192.168.152.130:9092 --topic alarm-fire --from-beginning

参考文献:

1.https://evernote.blog.csdn.net/article/details/113004582

2.https://blog.csdn.net/xibei19921101/article/details/106541016/

3.https://blog.csdn.net/H900302/article/details/109845069

4.https://blog.csdn.net/Lv_1093964643/article/details/83177280

springboot 整合kafka 实现生产,消费数据相关推荐

  1. springboot整合kafka实现批量消费

    linux安装kafka:https://blog.csdn.net/qq_37936542/article/details/109453249 kafka版本:kafka_2.12-2.6.0.tg ...

  2. Kafka精品教学(入门,安装,Springboot整合Kafka)

    ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除. 要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Messag ...

  3. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  4. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  5. SpringBoot整合kafka实战之带回调的生产者

    本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...

  6. Kafka原理以及SpringBoot整合Kafka

    1.Kafka原理 1. brokers有多个broker组成,broker是指Kafka服务器(192.168.223.140就是其中的一个broker),上面三台Kafka服务器组成了Kafka集 ...

  7. kafka 查看待消费数据_kafka查看消费数据

    一.如何查看 在老版本中,使用kafka-run-class.sh 脚本进行查看.但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-co ...

  8. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  9. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

最新文章

  1. Cocos Creator 音频文件Audio的绑定与使用(TypeScript)
  2. PacBio SMRT Sequencing
  3. Exchange 2007 配置POP3
  4. python判断网页密码加密方式_python实现网页登录时的rsa加密流程
  5. ML 04、模型评估与模型选择
  6. (四)Java B2B2C o2o多用户商城 springcloud架构-断路器(Hystrix)
  7. new是不是c语言运算符优先级表,C语言运算符优先级列表(超详细)
  8. 在阿里干了 5 年招聘,这 10 条建议我必须分享给你!
  9. SSH2+Daoz项目中的分页查询
  10. HDU ACM 1728 逃离迷宫 (广搜BFS)
  11. mybatis缓存二级缓存_MyBatis缓存与Apache Ignite的陷阱
  12. ArrayList学习[常用方法|源码]
  13. win10远程关机命令
  14. python环境调用OpenModelica模型并进行仿真计算
  15. js实现页面表格内容的复制粘贴填充,实现快速填写
  16. DISCUZ的数据字典3
  17. linux5关闭apic服务,Linux中断 - APIC
  18. 4年工作:从量变到质变(公开版)
  19. 数字电子技术——Verilog
  20. Javascript混淆与解混淆的那些事儿

热门文章

  1. vue 实现 发票打印功能
  2. 阿里云ACP学习资料
  3. pyechars切片器如何实现
  4. 黑马学生入职B站1年,晒出21K月薪:我想跳槽华为
  5. linux运行fastboot脚本,fastboot命令的自动补全
  6. c语言编程中野指针错误,小心C语言野指针
  7. Garmin 佳明 D2 charlie 设置尾翼编号
  8. Android核心基础(手机卫士的一个知识点总结)
  9. [POI2009]KAM-Pebbles
  10. HTML和CSS的概述