搭建kafka集群并使用springboot 整合
上一篇文章我们已经成功安装了kafka,本文讲解部署kafka集群,并使用springboot整合测试。
设置多 broker 集群
由于只有一台虚拟机,于是通过多个配置文件模拟多台broker
首先为每个broker创建一个配置文件:
cp config/server.properties config/server-1.propertie
cp config/server.properties config/server-2.properties
现在编辑这些新建的文件,设置以下属性
config/server.properties:
broker.id=0
#此处填写你的服务器ip
listeners=PLAINTEXT://192.168.130.128:9092
#选择你的logs存放目录
log.dirs=/usr/kafka2.12/kafka-logs
delete.topic.enable=true
#zookeeper集群信息
zookeeper.connect=192.168.130.128:2181,192.168.130.128:2182,192.168.130.128:2183
config/server-1.properties:
broker.id=1
#此处填写你的服务器ip
listeners=PLAINTEXT://192.168.130.128:9093
#选择你的logs存放目录
log.dirs=/usr/kafka2.12/kafka-logs-1
delete.topic.enable=true
#zookeeper集群信息
zookeeper.connect=192.168.130.128:2181,192.168.130.128:2182,192.168.130.128:2183
config/server-2.properties:
broker.id=0
#此处填写你的服务器ip
listeners=PLAINTEXT://192.168.130.128:9094
#选择你的logs存放目录
log.dirs=/usr/kafka2.12/kafka-logs-2
delete.topic.enable=true
#zookeeper集群信息
zookeeper.connect=192.168.130.128:2181,192.168.130.128:2182,192.168.130.128:2183
broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。
启动三个的节点
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
现在创建一个副本为3的新 topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic new_topic
当其中一台broker挂掉时,再次启动执行此命令重置优先副本
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
springboot整合kafka
导入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
application.yml
spring:application:name: spring-boot-kafka# kafkakafka:# 指定 kafka集群地址bootstrap-servers:- 192.168.130.128:9092- 192.168.130.128:9093- 192.168.130.128:9094# 指定listener 容器中的线程数,用于提高并发量listener:concurrency: 3# 生产者的配置producer:# 每次批量发送消息的数量batch-size: 1000retries: 0buffer-memory: 33554432# key,value序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者的配置consumer:# 指定默认消费者group idgroup-id: test-group2auto-offset-reset: latest# 是否开启自动提交enable-auto-commit: true# 自动提交的时间间隔auto-commit-interval: 1000# key,value的解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 指定默认topic idtemplate:default-topic: new_topic
引导类
package com.sunyuqi.springboot;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}
生产者
package com.sunyuqi.springboot.producer;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@Component
public class Producer {private static final Logger log = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String data) {log.info("kafka sendMessage start");ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("kafka sendMessage success topic = {}, data = {}",topic, data);}});log.info("kafka sendMessage end");}
}
消费者
package com.sunyuqi.springboot.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@KafkaListener(topics = {"new_topic"})public void processMessage(ConsumerRecord<?, ?> record) {System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());}
}
运行引导类,监听消息
测试类
package com.sunyuqi.springboot;import com.sunyuqi.springboot.producer.Producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {@Autowiredprivate Producer producer;@Testpublic void contextLoads() {producer.sendMessage("new_topic", "hello world");}
}
运行测试类,发送消息
成功接收到消息
搭建kafka集群并使用springboot 整合相关推荐
- 单机 搭建kafka集群 本地_单机快速搭建多节点kafka集群
有时候为了更好地了解kafka集群的运行机制,需要自己搭建kafka集群.本文的目的就是让大家在单机上快速搭建kafka集群(仅作为单机测试使用). 环境及工具版本 mac OS 10.15.5 ka ...
- Mac 使用 docker 搭建 kafka 集群 + Zookeeper + kafka-manager
Kafka 搭建: 建立Zookeeper容器: 这里我们用最简单的方式创建一个独立的Zookeeper节点,如果要考虑zookeeper的高可用,可以将其做成一个集群,最好是能有多台机器. $ do ...
- 搭建Kafka集群环境
计划使用三台主机:11.12.112.206.11.12.112.207.11.12.112.208搭建Kafka集群环境, 使用的zookeeper集群为:11.12.112.215:2181,11 ...
- docker环境,搭建kafka集群
https://zhuanlan.zhihu.com/p/114968151 docker环境,搭建kafka集群
- 单机 搭建kafka集群 本地_10分钟搭建单机Kafka集群
单机版kafka集群有什么作用 练习上手用. 搭建zookeeper集群首先下载zookeeper解压 apache zookeeper tar -zxvf apache-zookeeper-3.5. ...
- Kafka:搭建Kafka集群
博主在之前已经介绍过如何部署Kafka,Kafka的部署模式只有集群模式,Kafka的架构本就是天然的集群架构,因此单节点的部署和多节点的部署是类似的. 集群节点: 节点 地址 ZooKeeper 1 ...
- Linux 搭建Kafka集群,最新教程,细到极致
大家好呀,今天给大家带来的是,最新版kafka集群的安装教程,希望给小伙伴们一点小小的帮助. 注意:提前安装好jdk, Jdk安装教程 1.准备安装包,Kafka官网下载 2.kafka安装需要z ...
- Docker搭建Kafka集群
对于个人开发者而言,一般手头上没有多台服务器,有人可能会有云服务器,不过一般也只会买一台来用用就好:有人可能更习惯将本机当作服务器来玩.都可以.那么如何通过一台服务器或本机来搭建Kafka集群呢? 无 ...
- docker-compose快速搭建kafka集群
文章目录 前言 docker-compose快速搭建kafka集群(较详细) docker-compose快速搭建Zookeeper集群+kafka集群 参考资料 前言 当前是在学习kafka3.0的 ...
最新文章
- 2(3).选择排序_快排(线性表)
- Android stadio
- 面向对象并不是必要的
- Python之路(第二十七篇) 面向对象进阶:内置方法、描述符
- php递归算法的简单示例,php递归函数 php递归算法经典实例大全 | 帮助信息-动天数据...
- 2017年日本光伏市场展望
- 在文档阅读器上为 PDF 文档生成多级目录
- 设置idea类注释模板
- 【电子知识篇】放大器定义与分类
- 电脑无法启动显示计算机comt,电脑开机显示press any key to restart进不了系统怎么办?...
- Webshell的预防措施
- Linux命令学习之五
- 如何防止论文重复率高?
- linux内核中使用inet_ntop,linux网络编程之inet_pton和inet_ntop函数
- 二.百度UEditor编辑器之配置文件:ueditor.config.js
- 计算机网络常用术语WWW,计算机网络常用术语中英文对照表
- 火狐html页面空白页,火狐修改空白新标签页背景颜色,适应深色主题
- 大话设计模式 简单工厂模式
- 【转】《信号完整性分析》个人学习笔记
- 推荐一个超好用免费PDF软件(微软自带免费)
热门文章
- 计算机三级网络技术(补充)
- 前端基础 CSS 第十一章 使用CSS样式表 ----暑假学习第七、八天
- 天线为什么会有多次谐振_如果天线不处于谐振状态,辐射效率会受到多大影响?...
- c语言打字游戏程序设计报告,打字游戏程序设计报告.doc
- 通识~FIR数字滤波器设计讲解
- 金蝶KIS商贸版实现'条码标签打印'功能进行商品条码打印
- html+p标签和span,文章段落用span和p标签对seo有影响吗
- 传奇人物郭盛华:汽车黑客,远远比我们想象中还要恐怖
- 《一本书读懂大数据营销 玩透大数据营销 创造网络营销奇迹》pdf下载 百度云
- 解决 C# 中 Using ‘UseMvcWithDefaultRoute‘ to configure MVC is not supported while using Endpoint Routin