上一篇文章我们已经成功安装了kafka,本文讲解部署kafka集群,并使用springboot整合测试。

设置多 broker 集群

由于只有一台虚拟机,于是通过多个配置文件模拟多台broker
首先为每个broker创建一个配置文件:

cp config/server.properties config/server-1.propertie
cp config/server.properties config/server-2.properties

现在编辑这些新建的文件,设置以下属性
config/server.properties:

broker.id=0
#此处填写你的服务器ip
listeners=PLAINTEXT://192.168.130.128:9092
#选择你的logs存放目录
log.dirs=/usr/kafka2.12/kafka-logs
delete.topic.enable=true
#zookeeper集群信息
zookeeper.connect=192.168.130.128:2181,192.168.130.128:2182,192.168.130.128:2183

config/server-1.properties:

broker.id=1
#此处填写你的服务器ip
listeners=PLAINTEXT://192.168.130.128:9093
#选择你的logs存放目录
log.dirs=/usr/kafka2.12/kafka-logs-1
delete.topic.enable=true
#zookeeper集群信息
zookeeper.connect=192.168.130.128:2181,192.168.130.128:2182,192.168.130.128:2183

config/server-2.properties:

broker.id=0
#此处填写你的服务器ip
listeners=PLAINTEXT://192.168.130.128:9094
#选择你的logs存放目录
log.dirs=/usr/kafka2.12/kafka-logs-2
delete.topic.enable=true
#zookeeper集群信息
zookeeper.connect=192.168.130.128:2181,192.168.130.128:2182,192.168.130.128:2183

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。
启动三个的节点

bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

现在创建一个副本为3的新 topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic new_topic

当其中一台broker挂掉时,再次启动执行此命令重置优先副本

bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

springboot整合kafka

导入依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

application.yml

spring:application:name: spring-boot-kafka# kafkakafka:# 指定 kafka集群地址bootstrap-servers:- 192.168.130.128:9092- 192.168.130.128:9093- 192.168.130.128:9094# 指定listener 容器中的线程数,用于提高并发量listener:concurrency: 3# 生产者的配置producer:# 每次批量发送消息的数量batch-size: 1000retries: 0buffer-memory: 33554432# key,value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者的配置consumer:# 指定默认消费者group idgroup-id: test-group2auto-offset-reset: latest# 是否开启自动提交enable-auto-commit: true# 自动提交的时间间隔auto-commit-interval: 1000# key,value的解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 指定默认topic idtemplate:default-topic: new_topic

引导类

package com.sunyuqi.springboot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}

生产者

package com.sunyuqi.springboot.producer;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Component
public class Producer {private static final Logger log = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String data) {log.info("kafka sendMessage start");ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka sendMessage success topic = {}, data = {}",topic, data);}});log.info("kafka sendMessage end");}
}

消费者

package com.sunyuqi.springboot.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@KafkaListener(topics = {"new_topic"})public void processMessage(ConsumerRecord<?, ?> record) {System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());}
}

运行引导类,监听消息
测试类

package com.sunyuqi.springboot;import com.sunyuqi.springboot.producer.Producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {@Autowiredprivate Producer producer;@Testpublic void contextLoads() {producer.sendMessage("new_topic", "hello world");}
}

运行测试类,发送消息

成功接收到消息

搭建kafka集群并使用springboot 整合相关推荐

  1. 单机 搭建kafka集群 本地_单机快速搭建多节点kafka集群

    有时候为了更好地了解kafka集群的运行机制,需要自己搭建kafka集群.本文的目的就是让大家在单机上快速搭建kafka集群(仅作为单机测试使用). 环境及工具版本 mac OS 10.15.5 ka ...

  2. Mac 使用 docker 搭建 kafka 集群 + Zookeeper + kafka-manager

    Kafka 搭建: 建立Zookeeper容器: 这里我们用最简单的方式创建一个独立的Zookeeper节点,如果要考虑zookeeper的高可用,可以将其做成一个集群,最好是能有多台机器. $ do ...

  3. 搭建Kafka集群环境

    计划使用三台主机:11.12.112.206.11.12.112.207.11.12.112.208搭建Kafka集群环境, 使用的zookeeper集群为:11.12.112.215:2181,11 ...

  4. docker环境,搭建kafka集群

    https://zhuanlan.zhihu.com/p/114968151 docker环境,搭建kafka集群

  5. 单机 搭建kafka集群 本地_10分钟搭建单机Kafka集群

    单机版kafka集群有什么作用 练习上手用. 搭建zookeeper集群首先下载zookeeper解压 apache zookeeper tar -zxvf apache-zookeeper-3.5. ...

  6. Kafka:搭建Kafka集群

    博主在之前已经介绍过如何部署Kafka,Kafka的部署模式只有集群模式,Kafka的架构本就是天然的集群架构,因此单节点的部署和多节点的部署是类似的. 集群节点: 节点 地址 ZooKeeper 1 ...

  7. Linux 搭建Kafka集群,最新教程,细到极致

      大家好呀,今天给大家带来的是,最新版kafka集群的安装教程,希望给小伙伴们一点小小的帮助. 注意:提前安装好jdk, Jdk安装教程 1.准备安装包,Kafka官网下载 2.kafka安装需要z ...

  8. Docker搭建Kafka集群

    对于个人开发者而言,一般手头上没有多台服务器,有人可能会有云服务器,不过一般也只会买一台来用用就好:有人可能更习惯将本机当作服务器来玩.都可以.那么如何通过一台服务器或本机来搭建Kafka集群呢? 无 ...

  9. docker-compose快速搭建kafka集群

    文章目录 前言 docker-compose快速搭建kafka集群(较详细) docker-compose快速搭建Zookeeper集群+kafka集群 参考资料 前言 当前是在学习kafka3.0的 ...

最新文章

  1. 2(3).选择排序_快排(线性表)
  2. Android stadio
  3. 面向对象并不是必要的
  4. Python之路(第二十七篇) 面向对象进阶:内置方法、描述符
  5. php递归算法的简单示例,php递归函数 php递归算法经典实例大全 | 帮助信息-动天数据...
  6. 2017年日本光伏市场展望
  7. 在文档阅读器上为 PDF 文档生成多级目录
  8. 设置idea类注释模板
  9. 【电子知识篇】放大器定义与分类
  10. 电脑无法启动显示计算机comt,电脑开机显示press any key to restart进不了系统怎么办?...
  11. Webshell的预防措施
  12. Linux命令学习之五
  13. 如何防止论文重复率高?
  14. linux内核中使用inet_ntop,linux网络编程之inet_pton和inet_ntop函数
  15. 二.百度UEditor编辑器之配置文件:ueditor.config.js
  16. 计算机网络常用术语WWW,计算机网络常用术语中英文对照表
  17. 火狐html页面空白页,火狐修改空白新标签页背景颜色,适应深色主题
  18. 大话设计模式 简单工厂模式
  19. 【转】《信号完整性分析》个人学习笔记
  20. 推荐一个超好用免费PDF软件(微软自带免费)

热门文章

  1. 计算机三级网络技术(补充)
  2. 前端基础 CSS 第十一章 使用CSS样式表 ----暑假学习第七、八天
  3. 天线为什么会有多次谐振_如果天线不处于谐振状态,辐射效率会受到多大影响?...
  4. c语言打字游戏程序设计报告,打字游戏程序设计报告.doc
  5. 通识~FIR数字滤波器设计讲解
  6. 金蝶KIS商贸版实现'条码标签打印'功能进行商品条码打印
  7. html+p标签和span,文章段落用span和p标签对seo有影响吗
  8. 传奇人物郭盛华:汽车黑客,远远比我们想象中还要恐怖
  9. 《一本书读懂大数据营销 玩透大数据营销 创造网络营销奇迹》pdf下载 百度云
  10. 解决 C# 中 Using ‘UseMvcWithDefaultRoute‘ to configure MVC is not supported while using Endpoint Routin