2019独角兽企业重金招聘Python工程师标准>>>

Kafka 入门和 Spring Boot 集成

概述

kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流)。由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护。

应用场景

下面列举了一些kafka常见的应用场景。

消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景。

应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用程序相关的日志,服务器相关的 CPU、占用率、 IO、内存、连接数、 TPS、 QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。 例如, 很多公司采用 Kafka 与 ELK(ElasticSearch、 Logstash 和Kibana)整合构建应用服务的监控系统。

流处理:比如将 kafka 接收到的数据发送给 Storm 流式计算框架处理。

基本概念

record(消息):kafka 通信的基本单位,每一条消息称为record

producer (生产者 ):发送消息的客户端。

consumer(消费者 ):消费消息的客户端。

consumerGroup (消费者组):每一个消费者都属于一个特定的消费者组。

消费者和消费者组的关系

  • 如果a,b,c 属于同一个消费者组,那一条消息只能被 a,b,c 中的某一个消费者消费。
  • 如果a,b,c 属于不同的消费者组(比如 ga,gb,gc) ,那一条消息过来,a,b,c 三个消费者都能消费到。

topic (主题): kafka的消息通过topic来分类,类似于数据库的表。 producer 发布消息到 topic,consumer订阅 topic 进行消费

partition( 分区):一个topic会被分成一到多个分区(partition),然后多个分区可以分布在不同的机器上,这样一个主题就相当于运行在了多台机子上,kafka用分区的方式提高了性能和吞吐量

replica (副本):一个分区有一到多个副本,副本的作用是提高分区的 可用性。

offset(偏移量):偏移量 类似数据库自增int Id,随着数据的不断写入 kafka 分区内的偏移量会不断增加,一条消息由一个唯一的偏移量来标识。偏移量的作用是,让消费者知道自己消费到了哪个位置,下次可以接着从这里消费。如下图: 消费者A 消费到了 offset 为 9 的记录,消费者 B 消费到了offset 为 11 的记录。

基本结构

kafka 最基本的结构如下,跟常见的消息队列结构一样。 消息通过生产者发送到 kafka 集群, 然后消费者从 kafka 集群拉取消息进行消费。

和Spring Boot 集成

集成概述

本集成方式采用的是 spring boot 官方文档说的集成方式,官方链接,集成的大体思路是,通过在 spring boot application.properties 中配置 生产者和消费者的基本信息,然后spring boot 启动后会创建 KafkaTemplate 对象,这个对象可以用来发送消息到Kafka,然后用 @KafkaListener 注解来消费 kafka 里面的消息,具体步骤如下。

集成环境

spring boot:1.5.13 版本 spring-kafka:1.3.5 版本 kafka:1.0.1 版本

kafka 环境搭建

先启动Zookeeper:

docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime zookeeper:latest

再启动Kafka:替换下面的IP为你服务器IP即可

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.10.253 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:1.0.1

Spring Boot 和 Spring for Apache Kafka 集成步骤

  1. 首先pom中引入 Spring for Apache Kafka
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
  1. 然后 application.properties 配置文件中加入如下配置: 各个配置的解释见:spring boot 附录中的 kafka 配置,搜索kafka 关键字即可定位。
server.port=8090####### kafka### producer 配置
spring.kafka.producer.bootstrap-servers=192.168.10.48:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer### consumer 配置
spring.kafka.consumer.bootstrap-servers=192.168.10.48:9092
spring.kafka.consumer.group-id=anuoapp
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5
  1. 创建 Kafka Producer 生产者
package com.example.anuoapp.kafka;import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;@Component
public class KafkaProducer {@AutowiredKafkaTemplate kafkaTemplate;public void kafkaSend() throws Exception {UserAccount userAccount=new UserAccount();userAccount.setCard_name("jk");userAccount.setAddress("cd");ListenableFuture send = kafkaTemplate.send("jktopic", "key", JSON.toJSONString(userAccount));}
}
  1. 创建 Kafka Consumer 消费者
package com.example.anuoapp.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {public static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topics = {"jktopic"})public void jktopic(ConsumerRecord consumerRecord) throws InterruptedException {System.out.println(consumerRecord.offset());System.out.println(consumerRecord.value().toString());Thread.sleep(3000);}}
  1. 创建一个rest api 来调用 Kafka 的消息生产者
package com.example.anuoapp.controller;import com.example.anuoapp.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/system")
public class SystemController {private Logger logger = LoggerFactory.getLogger(SystemController.class);@AutowiredKafkaProducer kafkaProducer;@RequestMapping(value = "/Kafka/send", method = RequestMethod.GET)public void WarnInfo() throws Exception {int count=10;for (int i = 0; i < count; i++) {kafkaProducer.kafkaSend();}}}
  1. 用 post man 调用 第 5 步创建的接口, 就可以看到 如下消费者产生的输出信息
30
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
31
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}
32
{"address":"cd","bind_qq":false,"bind_weixin":false,"card_name":"jk","passwordDirectCompare":false}

最后

恭喜你 ! spring boot kafka 集成完毕。

完整的基础源码见: 链接: https://pan.baidu.com/s/1E2Lmbj9A9uruTXG54uPl_g 密码: e6d6

踩过的坑

  1. 集成 spring kafka的时候注意版本要选对 ,目前spring boot 1.5.1 对应集成 spring kafka 1.x 版本 , 如果直接集成 spring kafka 2.x 版本 的话会报错, 要用 spring kafka 2.x 的话, 需要升级spring boot 到 2.x 版本。

  2. 消费者配置的时候 spring.kafka.consumer.max-poll-records=1 ,轮询拉取的最大记录数要设置成1,不然会出现重复消费,即:如果消费者程序崩了,再启动起来,会消费到以前消费过的数据,造成重复消费。 设置成 1 就没得这个问题了, 不知道是不是 spring kafka 这个lib封装得有问题。

转载于:https://my.oschina.net/anuodog/blog/1824848

Kafka 入门和 Spring Boot 集成相关推荐

  1. Spring Boot 集成MyBatis

    Spring Boot 集成MyBatis Spring Boot 系列 Spring Boot 入门 Spring Boot 属性配置和使用 Spring Boot 集成MyBatis Spring ...

  2. 从零搭建开发脚手架 Spring Boot集成Mybatis-plus之一

    文章目录 简介 特性 框架结构 依赖集成 依赖 配置 编码 开始使用 核心功能 代码生成器 添加依赖 编码 编写配置 自定义模板引擎 自定义代码模板 自定义属性注入 字段其他信息查询注入 实战总结 常 ...

  3. Linux 安装Redis-6.2.5,配置及使用(RDB与AOF持久化、sentinel机制、主从复制、Spring Boot 集成 Redis)

    CentOS 7 安装Redis-6.2.5版本 Redis采用的是基于内存的单进程 单线程模型 的KV数据库,由C语言编写.官方提供的数据是可以达到100000+的qps 应用场景: 令牌(Toke ...

  4. Spring boot集成axis2开发webservice 服务

    Spring boot集成axis2开发webservice 服务 1.新建Spring boot 项目 此处省略... 项目结构如下: 2.添加Axis2依赖 <!--axis2版本信息--& ...

  5. spring boot 集成sleuth

    spring boot 集成sleuth 1. 理论 1.1 sleuth是什么 1.2 sleuth有哪些 1.3 链路追踪的一些基本概念 1.4 zipkin的组成 2. zipkin 实例 2. ...

  6. 软件架构-Spring boot集成模板引擎swagger2实现

    上次说过springboot其实就是一个CI工具,如何体验出来CI的作用就是持续集成,它可以集成各种的工具,这里说说关于模板的集成引擎和Swagger. (一)Spring boot 集成模板引擎实现 ...

  7. Spring Boot 集成 Flyway 实现数据库版本控制

    在项目迭代开发中,难免会有更新数据库 Schema 的情况,比如添加新表.在表中增加字段或者删除字段等,那么当我对数据库进行一系列操作后,如何快速地在其他同事的电脑上同步?如何在测试/生产服务器上快速 ...

  8. Spring Boot集成Swagger导入YApi@无界编程

    接口APi开发现状 现在开发接口都要在类似YApi上写文档,这样方便不同的团队之间协作,同步更新接口,提高效率. 但是如果接口很多,你一个个手工在YApi去录入无疑效率很低. 如果是使用Spring ...

  9. spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例...

    本文介绍spring boot集成swagger,自定义注解,拦截器,xss过滤,异步调用,定时任务案例 集成swagger--对于做前后端分离的项目,后端只需要提供接口访问,swagger提供了接口 ...

最新文章

  1. 你需要知道的高性能并发框架Disruptor原理
  2. 干货,Wireshark使用技巧-过滤规则
  3. vs下载python最后一点很慢-python读取大文件越来越慢的原因与解决
  4. python+selenium+Robot
  5. [转]MSBuild入门
  6. 接口方法上的注解无法被@Aspect声明的切面拦截的原因分析
  7. linux运行powershell,linux – 是否可以编写一个在bash / shell和PowerShell中运行的脚本?...
  8. UVA 2519 Radar Installtion
  9. Go1.18泛型使用详解(附最新gocode)
  10. android studio for android learning (二十四 )bitmap and bitmapFactory
  11. 2022电工杯数学建模A题目思路总结分享
  12. 基于安卓android studio的电影订票选座APP设计 前台后台
  13. 【统计学】利用spss正态分布假定检验 S-W检验 K-S检验 直方图 Q-Q图
  14. xilinx低延时视频编解码方案
  15. JAMA Psychiatry:老年抑郁症患者的神经影像学、认知、临床症状和遗传学的异质性表征
  16. 360全景倒车影像怎么看_360全景影像和倒车影像有什么区别
  17. 遥感基础——红外波段分类
  18. 优矿量化向导式因子选股
  19. 好用的用户准入控制管理系统
  20. Java桌面截图程序(带快捷键)

热门文章

  1. CNN结构:SPP-Net为CNNs添加空间尺度卷积-神经元层
  2. PCL第三方库:Eigen, Flann , Qhull, VTK, Boost简介
  3. 硅谷渐患“大城市”病,世界创新中心或将外移...
  4. x86_64编译JPEG遇到Invalid configuration `x86_64-unknown-linux-gnu'
  5. DBA日常工作内容和职责
  6. Nvidia DX10 Lighting例子解析
  7. Android Shape Drawable Resources
  8. Android——Fragment介绍
  9. flink1.12在ubuntu下面的编译记录(转载+自己整理)
  10. spring书籍调研