《SpringBoot2.0 实战》系列-整合kafka实现消息发送、消费
之前写过一篇关于ActiveMq的博文,有兴趣的小伙伴可以点击查看。但是ActiveMq总体性能表现一般,如果对消息队列性能要求较高,业务量较大,那我们需要重新考量。本文就介绍一款性能高的消息队列- kafka。首先看下它们的对比表:
对比 | Kafka | ActiveMQ |
---|---|---|
可用性 | 非常高,分布式,多副本备份 | 高,基于主从架构实现的高可用性 |
存储接口 | 文件存储,而且这些文件是顺序存储的 | 消息持久化机制有JDBC,KahaDB和LevelDB |
单机吞吐量 | 吞吐量非常大,可以达到10万级 | 吞吐量一般,万级,写入和读取message性能太低 |
消息传递模型 | PUB/SUB(发布/订阅) | P2P(点对点),PUB/SUB(发布/订阅) |
基础概念
- Broker:节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
- Topic:一类消息,即主题。
- Partition:topic物理上的分组,一个topic可以分为多个partition。
- Partition Offset:partition中的每个消息都有一个连续的序列号叫做offset。
- Producer : 生产者,生产message发送到topic。
- Consumer : 消费者,订阅topic消费message。
- Consumer Group:消费组,一个Group包含多个consumer。
安装
个人习惯使用docker来安装软件环境,可参考我的博客第5条:
https://blog.csdn.net/HXNLYW/article/details/88950291
如何整合
添加依赖,版本使用spring-boot-dependencies默认的就行。
<!--kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
配置项
# kafka
spring: kafka:# kafka 代理地址bootstrap-servers: 47.103.5.190:9092producer:# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 发生错误后,消息重发的次数。retries: 0# 指定消息键和值的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer# topic主题gourd-topic: gourd-topicconsumer:# 指定消费者group idgroup-id: gourd-group# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 100# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: true# 指定消息键和值的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 消费topic主题gourd-topic: gourd-topiclistener:# 在侦听器容器中运行的线程数。concurrency: 5
增加配置类,根据配置项动态加载
/*** kafka消息队列配置* @author gour.hu*/
@Configuration
@ConditionalOnProperty(prefix = "spring.kafka",value = "bootstrap-servers")
@Import({KafkaConsumerListener.class})
public class KafkaConfig {}
封装消息发送工具类
@Slf4j
public class KafkaUtil {/*** 发送topic消息* @param topic* @param message*/public static void sendTopicMessage(String topic, String message){log.info("发送topic消息体:{}",message);KafkaTemplate<String,String> kafkaTemplate = SpringContextHolder.getBean(KafkaTemplate.class);ListenableFuture listenableFuture = kafkaTemplate.send(topic,message);listenableFuture.addCallback(o -> log.info("消息发送成功,{}", o.toString()),throwable -> log.info("消息发送失败,{}" + throwable.getMessage()));}
}
定义消费类逻辑
@Slf4j
public class KafkaConsumerListener {@KafkaListener(topics = {"${spring.kafka.consumer.gourd-topic}"}, groupId = "${spring.kafka.consumer.group-id}" ,containerFactory = "kafkaListenerContainerFactory")public void kafkaConsumerTest(ConsumerRecord<String, String> record) {log.info("消消费消息 topic = {} , content = {}",record.topic(),record.value());String messageText = record.value();// 业务代码......}
}
测试接口,本人喜欢使用swagger/ postman 测试
@Slf4j
@RestController
@RequestMapping("/mq/kafka")
@Api(tags = "kafka测试API")
@ConditionalOnBean({KafkaConfig.class})
public class KafkaController {@Value("${spring.kafka.producer.gourd-topic}")private String topic;@GetMapping("/sendMsg")@ApiOperation(value = "发送消息到主题")public BaseResponse sendMsg(@RequestParam("msg")String msg) {KafkaUtil.sendTopicMessage(topic,msg);return BaseResponse.ok("success!");}
}
这样基础的整合就好了,下面我们测试下。
测试附:
kafka可视化管理工具:kafka tool
下载地址:http://www.kafkatool.com/download.html
使用:https://www.jianshu.com/p/aa196f24f332源码:
代码均已上传至本人的开源项目
cloud-plus:https://blog.csdn.net/HXNLYW/article/details/104635673
《SpringBoot2.0 实战》系列-整合kafka实现消息发送、消费相关推荐
- 【SpringBoot实战系列】RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战
大家好,我是工藤学编程
- WF4.0实战系列索引
从WF4.0 betal1出来的时候就开始使用WF4.0,由于资料不多,学习过程也非常艰苦.今年四月份的时候打算写WF4.0实战系列,由于今年是本命年故坚持写了24篇文章.这个系列的文章都有一个特点, ...
- springboot整合kafka_springboot整合kafka实现消息的发送消费
如下是springboot整合kafka的一个案例,方便需要的小伙伴. 启动kafka Server cd 到kafka的bin目录下:前提是启动zk./kafka-server-start.sh / ...
- 【kafka系列】kafka之生产者发送消息实践
目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...
- Knative 实战:基于 Kafka 实现消息推送
作者 | 元毅 阿里云智能事业群高级开发工程师 导读:当前在 Knative 中已经提供了对 Kafka 事件源的支持,那么如何基于 Kafka 实现消息推送呢?本文作者将以阿里云 Kafka 产品为 ...
- SpringBoot2.0之四 简单整合MyBatis
从最开始的SSH(Struts+Spring+Hibernate),到后来的SMM(SpringMVC+Spring+MyBatis),到目前的S(SpringBoot),随着框架的不断更新换代,也为 ...
- springboot2.0 多数据源整合问题 At least one JPA metamodel must be present! at
2019独角兽企业重金招聘Python工程师标准>>> 数据源代码: 第一个读取配置文件代码: package com.datasource;import org.apache.ib ...
- RocketMQ 实战-SpringBoot整合RocketMQ同步消息、异步消息、单向消息
官方样例:https://gitee.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md 1. 同步消息 producer向 bro ...
- springboot 整合kafka 实现生产,消费数据
一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...
最新文章
- [转]CentOS 5.5下FTP安装及配置
- 链表面试题Java实现【重要】
- 表格合并行_Word制作验收单表格,很简单,快来学习吧
- python pycurl
- Codeforces Round #490 (Div. 3)
- HDU - 2196 Computer(树形dp)
- SAP CRM enterprise search index调试细节
- 如何在WhatsApp中将群聊静音
- 苹果CMSv10黑金色自适应网站模板
- netty 校验_Netty SSL双向验证
- 关系数据库的基本概念和MySQL说明
- android 传感器学习笔记 一
- python的httplib、urllib和urllib2的区别及应用
- 无问西东,哪怕重头来过
- Java 面试 ——可变参数、初始化数据块、设计秒杀系统
- win10鼠标灵敏度怎么调_和平精英,灵敏度到底怎么调?小编视频来教你!
- 山东理工大学ACM平台题答案关于C语言 1231 绝对值排序
- 万里汇WorldFirst人民币提现,1天内到账,太快了!
- 计算机内存占用过高,内存,教您电脑内存占用高怎么办
- 丰巢互动媒体的新玩法,智能柜焕新“皮肤”了解一下