Kafka 入门和 Spring Boot 集成
2019独角兽企业重金招聘Python工程师标准>>>
Kafka 入门和 Spring Boot 集成
概述
kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流)。由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护。
应用场景
下面列举了一些kafka常见的应用场景。
消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景。
应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用程序相关的日志,服务器相关的 CPU、占用率、 IO、内存、连接数、 TPS、 QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。 例如, 很多公司采用 Kafka 与 ELK(ElasticSearch、 Logstash 和Kibana)整合构建应用服务的监控系统。
流处理:比如将 kafka 接收到的数据发送给 Storm 流式计算框架处理。
基本概念
record(消息):kafka 通信的基本单位,每一条消息称为record
producer (生产者 ):发送消息的客户端。
consumer(消费者 ):消费消息的客户端。
consumerGroup (消费者组):每一个消费者都属于一个特定的消费者组。
消费者和消费者组的关系:
- 如果a,b,c 属于同一个消费者组,那一条消息只能被 a,b,c 中的某一个消费者消费。
- 如果a,b,c 属于不同的消费者组(比如 ga,gb,gc) ,那一条消息过来,a,b,c 三个消费者都能消费到。
topic (主题): kafka的消息通过topic来分类,类似于数据库的表。 producer 发布消息到 topic,consumer订阅 topic 进行消费
partition( 分区):一个topic会被分成一到多个分区(partition),然后多个分区可以分布在不同的机器上,这样一个主题就相当于运行在了多台机子上,kafka用分区的方式提高了性能和吞吐量
replica (副本):一个分区有一到多个副本,副本的作用是提高分区的 可用性。
offset(偏移量):偏移量 类似数据库自增int Id,随着数据的不断写入 kafka 分区内的偏移量会不断增加,一条消息由一个唯一的偏移量来标识。偏移量的作用是,让消费者知道自己消费到了哪个位置,下次可以接着从这里消费。如下图: 消费者A 消费到了 offset 为 9 的记录,消费者 B 消费到了offset 为 11 的记录。
基本结构
kafka 最基本的结构如下,跟常见的消息队列结构一样。 消息通过生产者发送到 kafka 集群, 然后消费者从 kafka 集群拉取消息进行消费。
和Spring Boot 集成
集成概述
本集成方式采用的是 spring boot 官方文档说的集成方式,官方链接,集成的大体思路是,通过在 spring boot application.properties 中配置 生产者和消费者的基本信息,然后spring boot 启动后会创建 KafkaTemplate 对象,这个对象可以用来发送消息到Kafka,然后用 @KafkaListener 注解来消费 kafka 里面的消息,具体步骤如下。
集成环境
spring boot
:1.5.13 版本 spring-kafka
:1.3.5 版本 kafka
:1.0.1 版本
kafka 环境搭建
先启动Zookeeper:
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest
再启动Kafka:替换下面的IP为你服务器IP即可
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.10.253 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:1.0.1
Spring Boot 和 Spring for Apache Kafka 集成步骤
- 首先pom中引入 Spring for Apache Kafka
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
- 然后 application.properties 配置文件中加入如下配置: 各个配置的解释见:spring boot 附录中的 kafka 配置,搜索kafka 关键字即可定位。
server.port=8090####### kafka### producer 配置
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer### consumer 配置
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
- 创建 Kafka Producer 生产者
package com.example.anuoapp.kafka;import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;@Component
public class KafkaProducer {@AutowiredKafkaTemplate kafkaTemplate;public void kafkaSend() throws Exception {UserAccount userAccount=new UserAccount();userAccount.setCard_name("jk");userAccount.setAddress("cd");ListenableFuture send = kafkaTemplate.send("jktopic", "key", JSON.toJSONString(userAccount));}
}
- 创建 Kafka Consumer 消费者
package com.example.anuoapp.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = {"jktopic"})public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {System.out.println(consumerRecord.offset());System.out.println(consumerRecord.value().toString());Thread.sleep(3000);}}
- 创建一个rest api 来调用 Kafka 的消息生产者
package com.example.anuoapp.controller;import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/system")
public class SystemController {private Logger logger = LoggerFactory.getLogger(SystemController.class);@AutowiredKafkaProducer kafkaProducer;@RequestMapping(value = "/Kafka/send", method = RequestMethod.GET)public void WarnInfo() throws Exception {int count=10;for (int i = 0; i < count; i++) {kafkaProducer.kafkaSend();}}}
- 用 post man 调用 第 5 步创建的接口, 就可以看到 如下消费者产生的输出信息
30
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
31
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
32
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
最后
恭喜你 ! spring boot kafka 集成完毕。
完整的基础源码见: 链接: https://pan.baidu.com/s/1E2Lmbj9A9uruTXG54uPl_g 密码: e6d6
踩过的坑
集成 spring kafka的时候注意版本要选对 ,目前spring boot 1.5.1 对应集成 spring kafka 1.x 版本 , 如果直接集成 spring kafka 2.x 版本 的话会报错, 要用 spring kafka 2.x 的话, 需要升级spring boot 到 2.x 版本。
消费者配置的时候 spring.kafka.consumer.max-poll-records=1 ,轮询拉取的最大记录数要设置成1,不然会出现重复消费,即:如果消费者程序崩了,再启动起来,会消费到以前消费过的数据,造成重复消费。 设置成 1 就没得这个问题了, 不知道是不是 spring kafka 这个lib封装得有问题。
转载于:https://my.oschina.net/anuodog/blog/1824848
Kafka 入门和 Spring Boot 集成相关推荐
- Spring Boot 集成MyBatis
Spring Boot 集成MyBatis Spring Boot 系列 Spring Boot 入门 Spring Boot 属性配置和使用 Spring Boot 集成MyBatis Spring ...
- 从零搭建开发脚手架 Spring Boot集成Mybatis-plus之一
文章目录 简介 特性 框架结构 依赖集成 依赖 配置 编码 开始使用 核心功能 代码生成器 添加依赖 编码 编写配置 自定义模板引擎 自定义代码模板 自定义属性注入 字段其他信息查询注入 实战总结 常 ...
- Linux 安装Redis-6.2.5,配置及使用(RDB与AOF持久化、sentinel机制、主从复制、Spring Boot 集成 Redis)
CentOS 7 安装Redis-6.2.5版本 Redis采用的是基于内存的单进程 单线程模型 的KV数据库,由C语言编写.官方提供的数据是可以达到100000+的qps 应用场景: 令牌(Toke ...
- Spring boot集成axis2开发webservice 服务
Spring boot集成axis2开发webservice 服务 1.新建Spring boot 项目 此处省略... 项目结构如下: 2.添加Axis2依赖 <!--axis2版本信息--& ...
- spring boot 集成sleuth
spring boot 集成sleuth 1. 理论 1.1 sleuth是什么 1.2 sleuth有哪些 1.3 链路追踪的一些基本概念 1.4 zipkin的组成 2. zipkin 实例 2. ...
- 软件架构-Spring boot集成模板引擎swagger2实现
上次说过springboot其实就是一个CI工具,如何体验出来CI的作用就是持续集成,它可以集成各种的工具,这里说说关于模板的集成引擎和Swagger. (一)Spring boot 集成模板引擎实现 ...
- Spring Boot 集成 Flyway 实现数据库版本控制
在项目迭代开发中,难免会有更新数据库 Schema 的情况,比如添加新表.在表中增加字段或者删除字段等,那么当我对数据库进行一系列操作后,如何快速地在其他同事的电脑上同步?如何在测试/生产服务器上快速 ...
- Spring Boot集成Swagger导入YApi@无界编程
接口APi开发现状 现在开发接口都要在类似YApi上写文档,这样方便不同的团队之间协作,同步更新接口,提高效率. 但是如果接口很多,你一个个手工在YApi去录入无疑效率很低. 如果是使用Spring ...
- spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例...
本文介绍spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例 集成swagger--对于做前后端分离的项目,后端只需要提供接口访问,swagger提供了接口 ...
最新文章
- 你需要知道的高性能并发框架Disruptor原理
- 干货,Wireshark使用技巧-过滤规则
- vs下载python最后一点很慢-python读取大文件越来越慢的原因与解决
- python+selenium+Robot
- [转]MSBuild入门
- 接口方法上的注解无法被@Aspect声明的切面拦截的原因分析
- linux运行powershell,linux – 是否可以编写一个在bash / shell和PowerShell中运行的脚本?...
- UVA 2519 Radar Installtion
- Go1.18泛型使用详解(附最新gocode)
- android studio for android learning (二十四 )bitmap and bitmapFactory
- 2022电工杯数学建模A题目思路总结分享
- 基于安卓android studio的电影订票选座APP设计 前台后台
- 【统计学】利用spss正态分布假定检验 S-W检验 K-S检验 直方图 Q-Q图
- xilinx低延时视频编解码方案
- JAMA Psychiatry:老年抑郁症患者的神经影像学、认知、临床症状和遗传学的异质性表征
- 360全景倒车影像怎么看_360全景影像和倒车影像有什么区别
- 遥感基础——红外波段分类
- 优矿量化向导式因子选股
- 好用的用户准入控制管理系统
- Java桌面截图程序(带快捷键)
热门文章
- CNN结构:SPP-Net为CNNs添加空间尺度卷积-神经元层
- PCL第三方库:Eigen, Flann , Qhull, VTK, Boost简介
- 硅谷渐患“大城市”病,世界创新中心或将外移...
- x86_64编译JPEG遇到Invalid configuration `x86_64-unknown-linux-gnu'
- DBA日常工作内容和职责
- Nvidia DX10 Lighting例子解析
- Android Shape Drawable Resources
- Android——Fragment介绍
- flink1.12在ubuntu下面的编译记录(转载+自己整理)
- spring书籍调研