一.软件版本

  1.linux:centos6

  2.zookeeper:zookeeper-3.4.1

  3.kafka:kafka_2.12-2.2.0

  4.jdk:1.8
  5.instelliJ Idea

二.环境准备

  1.关闭防火墙:service iptables stop

  2.zookeeper:复制conf目录下的zoo_sample.cfg配置文件并改名为zoo.cfg

  3.kafka:修改配置文件conf/server.properties中

      ①broker.id=123(broker.id每个kafka的都不一样,必须唯一)

      ②advertised.listeners=PLAINTEXT://192.168.56.101:9092(对应自己的linux的ip地址)

      ③zookeeper.connect=127.0.0.1:2181(本机默认端口是2181,有需要到zookeeper的zoo.cfg文件中修改)

三.软件setup&start

  1.zookeeper:zookeeper-3.4.14/bin目录下执行

    ①./zkServer.sh start ../conf/zoo.cfg

  2.kafka:kafka_2.12-2.2.0/bin目录下执行
    ①./kafka-server-start.sh ../config/server.properties

四.创建topic&发送消息&消费消息

  1.创建topic “test”,kafka_2.12-2.2.0/bin目录下执行

    ①./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    ②验证是否创建成功:./kafka-topics.sh --list --bootstrap-server localhost:9092

      

    ③开启消息生产者的console,kafka_2.12-2.2.0/bin目录下执行,并发送几条信息

      ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

      

    ④开启消息消费者的console,kafka_2.12-2.2.0/bin目录下执行

      ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

      

    以上步骤没出现问题表示kafka服务器端以及搭建完毕。

五.Spring boot2 kafka通信

  1.创建topic:TopicConfiguration

package com.aaron.kafka.springboot.configuration;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;import java.util.HashMap;
import java.util.Map;@Configuration
public class TopicConfiguration {@Beanpublic KafkaAdmin admin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,StringUtils.arrayToCommaDelimitedString(new String[]{"192.168.56.101:9092"}));return new KafkaAdmin(configs);}@Beanpublic NewTopic topic1() {return new NewTopic("test", 1, (short) 2);}}

  2.spring boot2中建立kafka的消息生产者:KafkaProducerConfiguration

package com.aaron.kafka.springboot.configuration;import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
public class kafkaConsumerConfiguration {@Beanpublic MessageListenerImpl messageListener(){return new MessageListenerImpl();}@Beanpublic KafkaMessageListenerContainer kafkaMessageListenerContainer(){ContainerProperties containerProps = new ContainerProperties("test");containerProps.setMessageListener(messageListener());DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}/*** 设置consumer的properties* @return*/private Map<String,Object> consumerProps(){Map<String, Object> consumerProps = new HashMap<>();consumerProps.put("bootstrap.servers","192.168.56.101:9092");consumerProps.put("group.id","123");consumerProps.put("enable.auto.commit","true");consumerProps.put("auto.commit.interval.ms","1000");consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return consumerProps;}
}

    

  3.建立测试代码:KafkaTest

package com.aaron.kafka.springboot;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {@Resourceprivate KafkaTemplate kafkaTemplate;@Testpublic void sendMsg(){kafkaTemplate.send("test","hello","world");}
}

  运行测试结果如下:

  4.Spring boot2中建立消费者和消息监听器:kafkaConsumerConfiguration和MessageListenerImpl  

package com.aaron.kafka.springboot.configuration;import com.aaron.kafka.springboot.listener.MessageListenerImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
public class kafkaConsumerConfiguration {@Beanpublic MessageListenerImpl messageListener(){return new MessageListenerImpl();}@Beanpublic KafkaMessageListenerContainer kafkaMessageListenerContainer(){ContainerProperties containerProps = new ContainerProperties("test");containerProps.setMessageListener(messageListener());DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);return container;}/*** 设置consumer的properties* @return*/private Map<String,Object> consumerProps(){Map<String, Object> consumerProps = new HashMap<>();consumerProps.put("bootstrap.servers","192.168.56.101:9092");consumerProps.put("group.id","123");consumerProps.put("enable.auto.commit","true");consumerProps.put("auto.commit.interval.ms","1000");consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");return consumerProps;}
}

package com.aaron.kafka.springboot.listener;
import org.springframework.kafka.listener.MessageListener;public class MessageListenerImpl implements MessageListener {@Overridepublic void onMessage(Object o) {System.out.println(o.toString());}
}

关闭服务器上的消费者然后启动springBoot

在服务器上的producer中输入文本 Hello World

Idea 中的console显示如下:

六.kafka的集群配置

  1.复制配置文件server.properties改名为server-1.properties、server-2.properties

    修改配置文件中

      broker.id=121

      log.dirs=/tmp/kafka-logs-1

      advertised.listeners=PLAINTEXT://192.168.56.101:9093

      (server-2.properties中的则为:

        broker.id=122

        log.dirs=/tmp/kafka-logs-2

        advertised.listeners=PLAINTEXT://192.168.56.101:9094)

  2.运行方式启动kafka,kafka_2.12-2.2.0/bin下执行

     ./kafka-server-start.sh ../config/server-1.properties

    ./kafka-server-start.sh ../config/server-2.properties

    后台方式运行命令如下:

    nohup ./kafka-server-start.sh ../config/server-1.properties &

    nohup ./kafka-server-start.sh ../config/server-2.properties &

  注意:如果出现

  则配置文件(server-1.properties和server-2.properties)中打开注释并修改为:listeners=PLAINTEXT://192.168.56.101:9093(server-2.properties的为:listeners=PLAINTEXT://192.168.56.101:9094)

转载于:https://www.cnblogs.com/aaronL/p/10762733.html

spring boot2 kafka相关推荐

  1. 3.了解Spring Boot2自动配置原理

    了解Spring Boot2自动配置原理 1.SpringBoot特点 1.1.依赖管理 1.父项目做依赖管理 依赖管理 <parent><groupId>org.spring ...

  2. spring boot2.3.1版本导入spring-boot-starter-web没有validation校验框架的解决办法

    导入 我们发现下面的包飘红 再查看spring boot2.3.1版本导入spring-boot-starter-web底下确实没有validation 解决办法 需要手动加入validation的依 ...

  3. Spring Boot2.x-13前后端分离的跨域问题解决方法之Nginx

    文章目录 概述 浏览器同源策略 后台搭建 pom.xml interceptor 配置 Controller 启动测试 浏览器和session 后端工程发布到服务器上 问题复现 通过Nginx反向代理 ...

  4. Spring Boot2.x-12 Spring Boot2.1.2中Filter和Interceptor 的使用

    文章目录 Interceptor 拦截器 拦截器中方法的执行流程 传统项目拦截器的配置 Spring Boot2.1.2整合拦截器Interceptor 示例 Step1 实现HandlerInter ...

  5. Spring Boot2.x-11 使用@ControllerAdvice和@ExceptionHandler实现自定义全局异常

    文章目录 概述 未使用全局异常且未显式捕获异常的情况 使用全局异常 Step1. 自定义异常类 Step2. 封装异常信息模板 Step3. 全局异常处理类 Step4. 使用全局异常 小结 概述 我 ...

  6. Spring Boot2.x-10 基于Spring Boot 2.1.2 + Mybatis 2.0.0实现多数据源,支持事务

    文章目录 概述 思路 步骤 Step1 多数据源配置文件applicaiton.yml Step2 初始化多个数据源 Step3 配置多个数据源 验证测试 支持事务 Step1 配置类中通过@Bean ...

  7. Spring Boot2.x-09 基于Spring Boot 2.1.2 + Mybatis使用自定义注解实现数据库切换

    文章目录 概述 场景说明:读写分离 操作步骤 工程结构 Step1 自定义注解 Step2 数据源定义 Step3 配置文件配置数据源 Step4 数据源实例化DatasourceConfig Ste ...

  8. Spring Boot2.x-08Spring Boot2.1.2 整合 Mybatis1.3.2 + 通用Mapper2.1.4 + PageHelper1.2.10 + Druid 1.1.10

    文章目录 概述 整合 MyBatis 整合 通用Mapper2.1.4及 PageHelper1.2.10 添加依赖 通用mapper 编写 application.yml增加配置 集成验证测试 整合 ...

  9. Spring Boot2.x-05Spring Boot基础-使用注解完成依赖注入

    文章目录 概述 @Autowired注解 @Autowired的匹配原则 @Autowired的 required 属性 使用@Primary 和@Qualifier消除@Autowired的歧义 @ ...

最新文章

  1. ubb代码转化html代码
  2. Task05:青少年软件编程(Python)等级考试模拟卷(一级)
  3. 天地图,js 4.0 api,简单调用,高手请绕行
  4. mysql 四舍五入
  5. 企业分布式微服务云SpringCloud SpringBoot mybatis (五)路由网关(zuul)
  6. 物联网安全的三个重点
  7. openstack基于mysql的问题修复
  8. linux 取消证书登录密码,SUSELinux 中为 SSH 访问设置不输入密码的证书认证登录方式...
  9. Mat的初始化以及Mat与数组的相互转化
  10. 解开 Windows 下的临界区中的代码死锁(转)
  11. 数据结构队列代码_代码简介:队列数据结构如何工作
  12. GDAL源码剖析(五)之Python命令行程序
  13. Java中的断言 Assert
  14. 饿了么UI库--Vue开发必备神器--让你轻松设计出好看的页面
  15. 如何用windows7搭建radius服务器
  16. 【申博攻略】一.北交计算机学院学术型博士“申请-考核”攻略(招生简章篇)
  17. 单机大天使服务器端修改商店,《大天使之剑h5》变态单机版最新开服_无限钻石打造最强阵容_满V公益私服...
  18. LTE-M通信系统传输建模与可用性分析
  19. 自变量趋于无穷大时函数的极限
  20. 2017年世界科技11大进展

热门文章

  1. RHEL下SendMail修改发邮箱地址
  2. JDK collections - 使用
  3. 最新25个高质量PSD素材免费下载
  4. 使用swfupload出现2049的错误原因
  5. 某医院信息化硬件平台建设方案
  6. PetShop之表示层设计
  7. python携程使用_简单了解python gevent 协程使用及作用
  8. du -sh 如何找到最大的文件夹_线上故障如何快速排查?来看这套技巧大全
  9. Jenkins介绍及安装
  10. 测试面试题集-逻辑推理题