Spring Boot 整合之前的内容

项目名称 描述 地址
base-data-mybatis 整合mybatis-plus(实际上官方教程已经很多,只做了自定义插件) 未完成
base-jpa JPA基础使用 JPA 数据模型定义
base-jpa-query JPA多表关联使用 JPA 数据模型关联操作
base-log 日志配置 SpringBoot日志配置
base-rabbit rabbitMQ简单使用 RabbitMQ基础使用
base-rabbit3 rabbitMQ一些自定义配置 消息确认回调、消息转换以及消息异常处理
base-rabbit-delay rabbitMQ延时队列 延时队列和消息重试
base-redis redis简单使用 RedisTemplate基础使用;Redis实现简单的发布订阅以及配置序列化方式
base-redis-lock redis分布式锁 Redis分布式锁的简单实现
base-redis-delay 基于有赞的延时消息方案的简单实现 延时队列的简单实现
base-swagger swagger使用 wagger2使用
base-mongodb mongodb简单使用 MongoDB安装以及Spring Boot整合,MongoDB实体创建以及简单CRUD,MongoDB聚合操作,MongoDB分组去重以及MongoDB联表查询

关于版本

依赖 版本
springboot 2.0.8.RELEASE
mongodb 4.0.14

项目地址

因为涉及的代码较多,所以并不会贴出所有代码。本篇文章涉及的源码下载地址:springboot-samples

kafka消息请求的原理

这里简单的介绍下kafka发送消息的流程。(更详细的原理在网络上已经有足够的文章,这里不再赘述)。另外关于如何安装kafka目前也存在相当多的文章。

kafka相关名词

  • broker
  • producer
  • consumer
  • topic
  • partition

producer

Producer 作为消息的生产者,负责生产数据推入broker中。

consumer

Producer 作为消息的消费者,从broker中的某个topic中获取数据。

broker

broker 作为消息服务载体,一般又多个kafka server组成。

topic

kafka对消息进行拆分的方式,不同的数据被保存在不同的topic中,通过将消息推入某个topic或者从某个topic中获取消息来实现消息业务的进行。

partition

每个topic拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。

consumer group

关于用户组可以简单理解为。假如有十个consumer订阅同一个topci其groupid不同,则这个topic中的任一条消息被消费10次。如果其groupid相同,此消息只能被其中一个消费。

整个消息生产流程

topic1

推入数据到topic1
推入数据到topic1
数据保存到partition0
数据保存到partition1
数据同步
数据同步
数据同步
数据同步
topic1
partition0-leader
partition1-leader
partition0-following1
partition0-following2
partition1-following1
partition1-following2
producer1
producer2

消息的消费流程

groupid2 groupid1

消息被用户组groupid1消费
消息被用户组groupid2消费
此时只能被一个此组中的一个消费
此时只能被一个此组中的一个消费
consumer21
groupid2
consumer22
consumer23
groupid1
consumer11
consumer12
consumer13
topic1
consumer

关于Spring-Boot整合kafka

关于依赖

  • 主项目依赖
    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.8.RELEASE</version><relativePath/> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties>
  • 子项目依赖
    <dependencies><dependency><artifactId>base-core</artifactId><groupId>daifyutils</groupId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>

spring-kafka配置

spring:application:name: base.kafkakafka:bootstrap-servers: kafka服务地址1:端口,kafka服务地址2:端口,kafka服务地址3:端口producer:# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。retries: 0#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。#可以设置的值为:all, -1, 0, 1acks: 1consumer:group-id: testGroup# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallestauto-offset-reset: earliest# 设置自动提交offsetenable-auto-commit: true# 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。auto-commit-interval: 100max-poll-records: 5server:port: 8060

对topic的操作

基于topic的操作需要我们在项目中实例化下面的bean。SpringBoot对topic的操作主要是通过KafkaAdmin进行操作

/*** 主要是初始化对kafka进行操作的admin对象* @author daify*/
@Configuration
@ConditionalOnClass(KafkaAdmin.class)
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaBaseConfiguration {private final KafkaProperties properties;public KafkaBaseConfiguration(KafkaProperties properties) {this.properties = properties;}/*** 初始化对kafka执行操作的对象* @return*/@Beanpublic KafkaAdmin kafkaAdmin() {KafkaAdmin admin = new KafkaAdmin(this.properties.buildProducerProperties());return admin;}/*** 初始化操作连接* @return*/@Beanpublic AdminClient adminClient() {return AdminClient.create(kafkaAdmin().getConfig());}
}

新增topic

假如希望操作kafka中的topic可以使用下面的代码

  • 新增
    /*** 创建topic* @param topicName* @return*/public String createTopic(String topicName) {NewTopic topic = new NewTopic(topicName, 2, (short) 1);adminClient.createTopics(Arrays.asList(topic));return topicName;}

查看topic

需要查询指定topic的信息可以使用下面的方式

    /*** 查询topic* @param topicName* @return*/public String queryTopic(String topicName) {DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));StringBuffer sb = new StringBuffer("topic信息:");try {result.all().get().forEach((k,v)->sb.append("key").append(k).append(";v:").append(v));} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return sb.toString();}

比如上面我们创建的topic用上面代码查询可以得到下面内容

name=createTopic3,
internal=false,
partitions=(partition=0, leader=localhost:9093 (id: 1 rack: null),replicas=localhost:9093 (id: 1 rack: null), localhost:9094 (id: 2 rack: null), isr=localhost:9093 (id: 1 rack: null), localhost:9094 (id: 2 rack: null)),
(partition=1, leader=localhost:9094 (id: 2 rack: null), replicas=localhost:9094 (id: 2 rack: null), localhost:9095 (id: 3 rack: null), isr=localhost:9094 (id: 2 rack: null), localhost:9095 (id: 3 rack: null)
)

删除topic

需要删除指定topic的信息可以使用下面的方式

    /*** 删除topic* @param topicName* @return*/public String deleteTopic(String topicName) {adminClient.deleteTopics(Arrays.asList(topicName));return topicName;}

消息的生产和消费

消息的生产

需要向kafka发送消息可以使用下面的语句。

/*** kafka消息发送者*/
@Component
public class KafkaSender {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;/***  发送文字消息* @param message* @return*/public String sendStr(String message){kafkaTemplate.send(KafkaConfig.TOPIC1,message);return message;}/***  发送对象消息* @param obj* @return*/public String sendObj(Object obj){String message = JSON.toJSONString(obj);kafkaTemplate.send(KafkaConfig.TOPIC2,message);return message;}
}

消息的消费

Spring-Boot封装了大量kafka的方法。所以仅仅是需要获得某些topic的数据可以使用这种方法创建消息监听器


@Component
@Log4j2
public class KafkaConsumerListener {@KafkaListener(topics = "kafka-topic1")public void onMessage1(String message){System.out.println(message);log.info("kafka-topic1接收结果:{}",message);}@KafkaListener(topics = "kafka-topic2")public void onMessage2(String message){System.out.println(message);log.info("kafka-topic2接收结果:{}",message);}
}

上面的代码订阅量两个不同的topic。而我们在初始化配置的时候有一项group-id: testGroup这个时候项目启动时,使用上面的方法消费者实例会被归入到testGroup用户组。

当我们向某个topic发送消息的时候控制台会输入下面内容

{"id":100,"name":"message对象","type":2}
2020-05-02 00:09:01.400  INFO 20296 --- [ntainer#0-0-C-1] d.s.k.b.consumer.KafkaConsumerListener   : kafka-topic2接收结果:{"id":100,"name":"message对象","type":2}

到目前,springboot关于kafka的简单操作就截止了。后面我会介绍一些其他的内容。


个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,假如开发同学发现了,请及时告知,我会第一时间修改相关内容。假如我的这篇内容对你有任何帮助的话,麻烦给我点一个赞。你的点赞就是我前进的动力。

Spring Boot 整合——Spring Boot整合kafka整合相关推荐

  1. spring boot整合spring security笔记

    最近自己做了一个小项目,正在进行springboot和spring Security的整合,有一丢丢的感悟,在这里分享一下: 首先,spring boot整合spring security最好是使用T ...

  2. Spring Boot(Spring的自动整合框架)

    2019独角兽企业重金招聘Python工程师标准>>> Spring Boot 是一套基于Spring框架的微服务框架,由于Spring是一个轻量级的企业开发框架,主要功能就是用于整 ...

  3. Spring Boot 应用系列 1 -- Spring Boot 2 整合Spring Data JPA和Druid,双数据源

    最近Team开始尝试使用Spring Boot + Spring Data JPA作为数据层的解决方案,在网上逛了几圈之后发现大家并不待见JPA,理由是(1)MyBatis简单直观够用,(2)以Hib ...

  4. springboot templates读取不到_整合spring mvc + mybatis,其实很简单,spring boot实践(5)

    01 spring boot读取配置信息 02 多环境配置 03 处理全局异常 04 spring boot admin 主要通过spring boot整合spring mvc 以及mybatis实现 ...

  5. springboot整合hibernate_峰哥说技术系列-17 .Spring Boot 整合 Spring Data JPA

    今日份主题 Spring Boot 整合 Spring Data JPA JPA(Java Persistence API)是用于对象持久化的 API,是Java EE 5.0 平台标准的 ORM 规 ...

  6. Spring Boot整合Spring Data Redis-整合步骤

    如何通过SpringBoot去整合我们的Redis,这里我们先对SpringBoot Redis做一个简单的介绍,其实SpringBoot Redis,和我们之前讲的Spring JPA都是Sprin ...

  7. 第九篇:Spring Boot整合Spring Data JPA_入门试炼01

    Spring Data JPA:介绍: Spring Data就是spring提供操作数据库的框架,而Spring Data JPA只是Spring Data框架下的一个基于JPA标准操作数据库的模块 ...

  8. Spring boot 整合Spring Security Jwt

    记录学习Spring boot 整合Spring Security Jwt 学习参考 – 慢慢的干货 https://shimo.im/docs/OnZDwoxFFL8bnP1c/read 首先创建S ...

  9. Spring Boot 整合——Spring batch重试和回滚

    关于版本 依赖 版本 springboot 2.4.0 spring batch 2.4.0 代码地址 因为每个例子涉及代码较多,且包含测试用例,如果都贴到文章中内容过多,所以只贴出了部分代码.全部的 ...

最新文章

  1. bash shell test条件测试[[ ]]和[ ]异同小结
  2. 边缘计算架构_更灵活的自动化系统架构、通信和编程——在自动化领域部署边缘计算...
  3. SpringBoot运行原理初探
  4. html dom透明度,HTML DOM Style overflow 属性
  5. Layout Management
  6. php for循环可以变量关联数组,数组与字符串,变量之间的转换+数组元素的回调处理+用for()循环来遍历关联数组...
  7. 74hc165C语言程序,单片机驱动74hc165程序
  8. oracle 快速检索表名称及包含的字段名称
  9. 时间复杂度与空间复杂度-o(1)、o(n)、o(logn)、o(nlogn)、斐波那契
  10. UVM设置超时退出timeout
  11. 开心网kaixin001状告kaixin,停用“开心网”名称,赔偿1000万元
  12. initializationerror错误的解决
  13. 第八篇《高速铁路钢轨光带检测系统》论文阅读笔记
  14. Serverless 极致弹性解构在线游戏行业痛点,你有过迷茫吗
  15. unicloud云开发---uniapp云开发(一)---服务空间创建以及部署一个云函数
  16. 简单的解决textarea文本框内容换行,对应到页面的内容也换行的问题
  17. Lesson 18 He often does this! 他经常干这种事!
  18. (十四)懈寄生(4)
  19. 嵌入式物联网入门:物联网工程就业方向及前景
  20. 使用OpenWrt实现IPv6 DDNS

热门文章

  1. 应届毕业生转行java_应届毕业生、转行人必看!2019最新薪酬报告
  2. 硬件设计37之积分放大电路的继续研究
  3. 电子商务商城系统开发方案:中大型交易类电商网站架构设计
  4. STC52检测开关C语言,STC89C52源代码.c
  5. 搭建国产化麒麟kylinos操作系统虚拟机
  6. 项目需求分析(那周余嘉熊掌将得队)
  7. ESP32超详细学习记录:NTP同步时间
  8. 八骏登场 学子圆梦 一卷在手 良师益友
  9. 雪碧图PHP,雪碧图有什么用
  10. clickhouse--物化视图