Spring Boot集成kafka完整版
pom.xml添加maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.2.RELEASE</version>
</parent><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
spring boot会自动配置kafka,接下来只要配置yml属性文件和主题名配置。
application.yml配置kafka
spring:kafka:bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092producer:retries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger.ms: 1consumer:enable-auto-commit: falseauto-commit-interval: 100mskey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000
application.yml配置主题和消费者组
kafka:topic:group-id: topicGroupIdtopic-name:- topic1- topic2- topic3
新建KafkaTopicProperties
@ConfigurationProperties("kafka.topic")
public class KafkaTopicProperties implements Serializable {private String groupId;private String[] topicName;public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String[] getTopicName() {return topicName;}public void setTopicName(String[] topicName) {this.topicName = topicName;}
添加KafkaTopicConfiguration
@Configuration
@EnableConfigurationProperties(KafkaTopicProperties.class)
public class KafkaTopicConfiguration {private final KafkaTopicProperties properties;public KafkaTopicConfiguration(KafkaTopicProperties properties) {this.properties = properties;}@Beanpublic String[] kafkaTopicName() {return properties.getTopicName();}@Beanpublic String topicGroupId() {return properties.getGroupId();}}
添加自己的service
@Service
public class IndicatorService {private Logger LOG = LoggerFactory.getLogger(IndicatorService.class);private final KafkaTemplate<Integer, String> kafkaTemplate;/*** 注入KafkaTemplate* @param kafkaTemplate kafka模版类*/@Autowiredpublic IndicatorService(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")public void processMessage(ConsumerRecord<Integer, String> record) {LOG.info("kafka processMessage start");LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());// do something ...properties.getProperties();LOG.info("kafka processMessage end");}public void sendMessage(String topic, String data) {LOG.info("kafka sendMessage start");ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<Integer, String> result) {LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data);}});LOG.info("kafka sendMessage end");}
}
至此就可以跑起来了,有什么不明白的可以留言。
Spring Boot集成kafka完整版相关推荐
- Spring Boot 集成Kafka java.lang.String is in module java.base of loader ‘bootstrap‘;
异常信息:java.lang.String is in module java.base of loader 'bootstrap'; com.htcyaifline.common.kafka.dom ...
- Kafka 入门和 Spring Boot 集成
2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...
- spring boot 集成sleuth
spring boot 集成sleuth 1. 理论 1.1 sleuth是什么 1.2 sleuth有哪些 1.3 链路追踪的一些基本概念 1.4 zipkin的组成 2. zipkin 实例 2. ...
- Spring boot 项目Kafka Error connecting to node xxx:xxx Kafka项目启动异常 Failed to construct kafka consumer
Spring boot 项目Kafka Error connecting to node xxx:xxx Spring boot Kafka项目启动异常 新建了一个springBoot集成Kafka的 ...
- Spring Boot集成Hazelcast实现集群与分布式内存缓存
2019独角兽企业重金招聘Python工程师标准>>> Hazelcast是Hazelcast公司开源的一款分布式内存数据库产品,提供弹性可扩展.高性能的分布式内存计算.并通过提供诸 ...
- 【Java进阶】Spring Boot集成ES
目录 spring boot集成ES ElasticSearchConfig 测试文档的基本操作 Elasticsearch Clients 文档 spring boot集成ES Java REST ...
- Spring Boot 集成 Elasticsearch 实战
今天讲解下如何使用 Spring Boot 结合 ES. 可以在 ES 官方文档中发现,ES 为 Java REST Client 提供了两种方式的 Client:Java Low Level Cli ...
- Linux 安装Redis-6.2.5,配置及使用(RDB与AOF持久化、sentinel机制、主从复制、Spring Boot 集成 Redis)
CentOS 7 安装Redis-6.2.5版本 Redis采用的是基于内存的单进程 单线程模型 的KV数据库,由C语言编写.官方提供的数据是可以达到100000+的qps 应用场景: 令牌(Toke ...
- Spring Boot集成支付宝(最新版SDK)—— 手机支付
前言 前些日子写了一篇关于H5网页集成支付宝的文章: Spring Boot集成支付宝(最新版SDK)-- H5/网页支付 当时写了好久,往那一坐就是俩小时,写完直接就发布了,发布之后才感觉少点啥-- ...
最新文章
- C# Json 序列化与反序列化二
- 某油企产成品标准成本估算逻辑
- postgresql 创建用户_Liunx系统安装PostgreSQL数据库教程,值得程序员收藏pg安装教程
- 从性能参数到业务大数据,浅谈直播CDN服务监控
- 基于jsp的教师科研工作量_基于jsp+mysql的JSP教师科研信息管理系统
- 条件查询_SQL简单查询(条件查询 模糊查询)
- es6入门6--数组拓展运算符,Array.from()基本用法
- jsoup教程_3 Jsoup 讲解
- ArcGIS 设置暂时固定存储地址
- PASCAL VOC 2012 and SBD (the augment dataset) 总结
- java车牌号识别EasyPR_EasyPR
- php 进销存 源代码_PHP 进销存源码
- 飞控算法-姿态解算之互补滤波
- 夏昕的3部开发手册.- -
- 图片加水印怎么弄?这些图片加水印方法分享给你
- 关于Git使用详细教程
- Linux使用zip打包文件
- sed 技巧一例:特定位置插入
- dB,dBi和dBm的区别
- 椭圆曲线加密(ECC)