spring boot2 kafka
一.软件版本
1.linux:centos6
2.zookeeper:zookeeper-3.4.1
3.kafka:kafka_2.12-2.2.0
4.jdk:1.8
5.instelliJ Idea
二.环境准备
1.关闭防火墙:service iptables stop
2.zookeeper:复制conf目录下的zoo_sample.cfg配置文件并改名为zoo.cfg
3.kafka:修改配置文件conf/server.properties中
①broker.id=123(broker.id每个kafka的都不一样,必须唯一)
②advertised.listeners=PLAINTEXT://192.168.56.101:9092(对应自己的linux的ip地址)
③zookeeper.connect=127.0.0.1:2181(本机默认端口是2181,有需要到zookeeper的zoo.cfg文件中修改)
三.软件setup&start
1.zookeeper:zookeeper-3.4.14/bin目录下执行
①./zkServer.sh start ../conf/zoo.cfg
2.kafka:kafka_2.12-2.2.0/bin目录下执行
①./kafka-server-start.sh ../config/server.properties
四.创建topic&发送消息&消费消息
1.创建topic “test”,kafka_2.12-2.2.0/bin目录下执行
①./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
②验证是否创建成功:./kafka-topics.sh --list --bootstrap-server localhost:9092
③开启消息生产者的console,kafka_2.12-2.2.0/bin目录下执行,并发送几条信息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
④开启消息消费者的console,kafka_2.12-2.2.0/bin目录下执行
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
以上步骤没出现问题表示kafka服务器端以及搭建完毕。
五.Spring boot2 kafka通信
1.创建topic:TopicConfiguration
package com.aaron.kafka.springboot.configuration;import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.util.StringUtils;import java.util.HashMap; import java.util.Map;@Configuration public class TopicConfiguration {@Beanpublic KafkaAdmin admin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,StringUtils.arrayToCommaDelimitedString(new String[]{"192.168.56.101:9092"}));return new KafkaAdmin(configs);}@Beanpublic NewTopic topic1() {return new NewTopic("test", 1, (short) 2);}}
2.spring boot2中建立kafka的消息生产者:KafkaProducerConfiguration
package com.aaron.kafka.springboot.configuration;import com.aaron.kafka.springboot.listener.MessageListenerImpl; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.KafkaMessageListenerContainer;import java.util.HashMap; import java.util.Map;@Configuration public class kafkaConsumerConfiguration {@Beanpublic MessageListenerImpl messageListener(){return new MessageListenerImpl();}@Beanpublic KafkaMessageListenerContainer kafkaMessageListenerContainer(){ContainerProperties containerProps = new ContainerProperties("test");containerProps.setMessageListener(messageListener());DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}/*** 设置consumer的properties* @return*/private Map<String,Object> consumerProps(){Map<String, Object> consumerProps = new HashMap<>();consumerProps.put("bootstrap.servers","192.168.56.101:9092");consumerProps.put("group.id","123");consumerProps.put("enable.auto.commit","true");consumerProps.put("auto.commit.interval.ms","1000");consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return consumerProps;} }
3.建立测试代码:KafkaTest
package com.aaron.kafka.springboot;import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;@RunWith(SpringRunner.class) @SpringBootTest public class KafkaTest {@Resourceprivate KafkaTemplate kafkaTemplate;@Testpublic void sendMsg(){kafkaTemplate.send("test","hello","world");} }
运行测试结果如下:
4.Spring boot2中建立消费者和消息监听器:kafkaConsumerConfiguration和MessageListenerImpl
package com.aaron.kafka.springboot.configuration;import com.aaron.kafka.springboot.listener.MessageListenerImpl; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.KafkaMessageListenerContainer;import java.util.HashMap; import java.util.Map;@Configuration public class kafkaConsumerConfiguration {@Beanpublic MessageListenerImpl messageListener(){return new MessageListenerImpl();}@Beanpublic KafkaMessageListenerContainer kafkaMessageListenerContainer(){ContainerProperties containerProps = new ContainerProperties("test");containerProps.setMessageListener(messageListener());DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}/*** 设置consumer的properties* @return*/private Map<String,Object> consumerProps(){Map<String, Object> consumerProps = new HashMap<>();consumerProps.put("bootstrap.servers","192.168.56.101:9092");consumerProps.put("group.id","123");consumerProps.put("enable.auto.commit","true");consumerProps.put("auto.commit.interval.ms","1000");consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return consumerProps;} }
package com.aaron.kafka.springboot.listener; import org.springframework.kafka.listener.MessageListener;public class MessageListenerImpl implements MessageListener {@Overridepublic void onMessage(Object o) {System.out.println(o.toString());} }
关闭服务器上的消费者然后启动springBoot
在服务器上的producer中输入文本 Hello World
Idea 中的console显示如下:
六.kafka的集群配置
1.复制配置文件server.properties改名为server-1.properties、server-2.properties
修改配置文件中
broker.id=121
log.dirs=/tmp/kafka-logs-1
advertised.listeners=PLAINTEXT://192.168.56.101:9093
(server-2.properties中的则为:
broker.id=122
log.dirs=/tmp/kafka-logs-2
advertised.listeners=PLAINTEXT://192.168.56.101:9094)
2.运行方式启动kafka,kafka_2.12-2.2.0/bin下执行
./kafka-server-start
.sh ../config
/server-1
.properties
./kafka-server-start
.sh ../config
/server-2
.properties
后台方式运行命令如下:
nohup ./kafka-server-start
.sh ../config
/server-1
.properties &
nohup ./kafka-server-start
.sh ../config
/server-2
.properties &
注意:如果出现
则配置文件(server-1.properties和server-2.properties)中打开注释并修改为:listeners=PLAINTEXT://192.168.56.101:9093(server-2.properties的为:listeners=PLAINTEXT://192.168.56.101:9094)
转载于:https://www.cnblogs.com/aaronL/p/10762733.html
spring boot2 kafka相关推荐
- 3.了解Spring Boot2自动配置原理
了解Spring Boot2自动配置原理 1.SpringBoot特点 1.1.依赖管理 1.父项目做依赖管理 依赖管理 <parent><groupId>org.spring ...
- spring boot2.3.1版本导入spring-boot-starter-web没有validation校验框架的解决办法
导入 我们发现下面的包飘红 再查看spring boot2.3.1版本导入spring-boot-starter-web底下确实没有validation 解决办法 需要手动加入validation的依 ...
- Spring Boot2.x-13前后端分离的跨域问题解决方法之Nginx
文章目录 概述 浏览器同源策略 后台搭建 pom.xml interceptor 配置 Controller 启动测试 浏览器和session 后端工程发布到服务器上 问题复现 通过Nginx反向代理 ...
- Spring Boot2.x-12 Spring Boot2.1.2中Filter和Interceptor 的使用
文章目录 Interceptor 拦截器 拦截器中方法的执行流程 传统项目拦截器的配置 Spring Boot2.1.2整合拦截器Interceptor 示例 Step1 实现HandlerInter ...
- Spring Boot2.x-11 使用@ControllerAdvice和@ExceptionHandler实现自定义全局异常
文章目录 概述 未使用全局异常且未显式捕获异常的情况 使用全局异常 Step1. 自定义异常类 Step2. 封装异常信息模板 Step3. 全局异常处理类 Step4. 使用全局异常 小结 概述 我 ...
- Spring Boot2.x-10 基于Spring Boot 2.1.2 + Mybatis 2.0.0实现多数据源,支持事务
文章目录 概述 思路 步骤 Step1 多数据源配置文件applicaiton.yml Step2 初始化多个数据源 Step3 配置多个数据源 验证测试 支持事务 Step1 配置类中通过@Bean ...
- Spring Boot2.x-09 基于Spring Boot 2.1.2 + Mybatis使用自定义注解实现数据库切换
文章目录 概述 场景说明:读写分离 操作步骤 工程结构 Step1 自定义注解 Step2 数据源定义 Step3 配置文件配置数据源 Step4 数据源实例化DatasourceConfig Ste ...
- Spring Boot2.x-08Spring Boot2.1.2 整合 Mybatis1.3.2 + 通用Mapper2.1.4 + PageHelper1.2.10 + Druid 1.1.10
文章目录 概述 整合 MyBatis 整合 通用Mapper2.1.4及 PageHelper1.2.10 添加依赖 通用mapper 编写 application.yml增加配置 集成验证测试 整合 ...
- Spring Boot2.x-05Spring Boot基础-使用注解完成依赖注入
文章目录 概述 @Autowired注解 @Autowired的匹配原则 @Autowired的 required 属性 使用@Primary 和@Qualifier消除@Autowired的歧义 @ ...
最新文章
- ubb代码转化html代码
- Task05:青少年软件编程(Python)等级考试模拟卷(一级)
- 天地图,js 4.0 api,简单调用,高手请绕行
- mysql 四舍五入
- 企业分布式微服务云SpringCloud SpringBoot mybatis (五)路由网关(zuul)
- 物联网安全的三个重点
- openstack基于mysql的问题修复
- linux 取消证书登录密码,SUSELinux 中为 SSH 访问设置不输入密码的证书认证登录方式...
- Mat的初始化以及Mat与数组的相互转化
- 解开 Windows 下的临界区中的代码死锁(转)
- 数据结构队列代码_代码简介:队列数据结构如何工作
- GDAL源码剖析(五)之Python命令行程序
- Java中的断言 Assert
- 饿了么UI库--Vue开发必备神器--让你轻松设计出好看的页面
- 如何用windows7搭建radius服务器
- 【申博攻略】一.北交计算机学院学术型博士“申请-考核”攻略(招生简章篇)
- 单机大天使服务器端修改商店,《大天使之剑h5》变态单机版最新开服_无限钻石打造最强阵容_满V公益私服...
- LTE-M通信系统传输建模与可用性分析
- 自变量趋于无穷大时函数的极限
- 2017年世界科技11大进展