springboot集成kafka

application.properties

spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,CentOSC:9092spring.kafka.producer.retries=5
spring.kafka.producer.acks=all
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.transaction-id-prefix=transaction-id-spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.streams.application-id=wordcount_id
spring.kafka.streams.client-id=app1
spring.kafka.streams.auto-startup=true
spring.kafka.streams.state-dir=/Users/admin/Desktop/checkpoint
spring.kafka.streams.replication-factor=1
spring.kafka.streams.properties.processing.guarantee=exactly_once

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zhangxueliang</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.0.1</version></dependency><!--测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

@KafkaListener消费消息

@SendTo转发消息

kafka web UI创建topic


KSQL的使用

mock发送消息


此时消息就被处理后(加了个后缀)发到了topic03中:

使用KafkaTemplate发送消息


数据是发往topic02,但是进行了转发,topic03会收到加了后缀的消息数据:

开启事务


开启事务后发送消息有两种编码方式:

  • 使用executeInTransaction方法


此时消息就被转发到了topic03:

  • 所在类加@Transactional注解


测试:

查看转发的消息:

springboot集成kafka及kafka web UI的使用相关推荐

  1. Spring boot2集成quartz动态任务管理+Web UI

    Dynamic Quartz ​ 最近在公司实习,发现公司有一套spring+Quartz的动态任务管理系统.可以使用Web界面进行任务动态的创建.删除.停止.运行和修改.刚好最近在学习spring ...

  2. kafka(组件分析 整合springboot集成 实战)

    kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...

  3. SpringBoot集成Kafka低版本和高版本

    SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...

  4. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  5. SpringBoot集成Kafka

    SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...

  6. SpringBoot 集成 kafka,基于注解批量消费设置

    网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:[弄nèng - Kafka]应用篇(三) -- Springboot整合Kafka(批量消费)_司马缸砸缸了- ...

  7. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  8. 【无废话】SpringBoot集成Kafka消息队列

    0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...

  9. springboot集成kafka消费手动启动停止

    项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...

最新文章

  1. LeetCode简单题之数字转换为十六进制数
  2. 「后端小伙伴来学前端了」Vue中为什么直接安装less-loader会报版本过高错误?你有没有思考过?
  3. CLAMAV 杀毒软件安装及使用配置
  4. mysql 中文含数字排序_英语| 124期 Sort 坑爹的字符串排序
  5. leetcode140. 单词拆分 II(回溯+记忆化)
  6. python 神经网络工具_神经网络15分钟入门!使用python从零开始写一个两层神经网络...
  7. EOS技术研究:合约与数据库交互
  8. UGUI ScrollRect使用
  9. stdafx.h简介
  10. 故宫商业大跃进 逐渐失控的IP洪流
  11. C语言求素数/质数最高效的方法
  12. 郝斌c语言96-99,《祁连山Photoshop CS3专家讲堂系列教程》[ISO]
  13. mysql查询条件为空时不作为条件_怎么实现当查询值为空时,不作为查询条件
  14. 数据恢复——在Windows 10中恢复永久删除的文件的5种方法
  15. 最新麦子学院嵌入式开发系列培训教程
  16. 桌面点右键,白屏怎么办
  17. ImageJ实践——测量大小/长短(以细胞为例)
  18. 在 WPS Word中成功使用zetero
  19. 电脑安装不了pr提示不满足系统要求怎么办?
  20. 最新百家姓-你排老几

热门文章

  1. 【Python】Python语言学习:设计模式、单例模式
  2. 【NLP】医学AI又一突破,微软开源生物医学NLP基准:BLURB
  3. 【Flask】快速入门后台写接口【API】
  4. 推荐系统炼丹笔记:推荐算法特征交叉新方式CAN
  5. 浅谈MySQL数据库中的锁与事务
  6. 网易青果后端系统设计窥探(含直播方案踩坑往事)
  7. 数据结构+算法——错题总结
  8. MS SQL自定义函数IsPositiveInteger MS SQL自定义函数IsNumeric 水晶报表使用IEnumerableT数据源...
  9. Ubantu使用笔记
  10. ubuntu出现HWE问题