pom.xml添加maven依赖

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.2.RELEASE</version>
</parent><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

spring boot会自动配置kafka,接下来只要配置yml属性文件和主题名配置。

application.yml配置kafka

spring:kafka:bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092producer:retries: 0batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger.ms: 1consumer:enable-auto-commit: falseauto-commit-interval: 100mskey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000

application.yml配置主题和消费者组

kafka:topic:group-id: topicGroupIdtopic-name:- topic1- topic2- topic3

新建KafkaTopicProperties

@ConfigurationProperties("kafka.topic")
public class KafkaTopicProperties implements Serializable {private String groupId;private String[] topicName;public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String[] getTopicName() {return topicName;}public void setTopicName(String[] topicName) {this.topicName = topicName;}

添加KafkaTopicConfiguration

@Configuration
@EnableConfigurationProperties(KafkaTopicProperties.class)
public class KafkaTopicConfiguration {private final KafkaTopicProperties properties;public KafkaTopicConfiguration(KafkaTopicProperties properties) {this.properties = properties;}@Beanpublic String[] kafkaTopicName() {return properties.getTopicName();}@Beanpublic String topicGroupId() {return properties.getGroupId();}}

添加自己的service

@Service
public class IndicatorService {private Logger LOG = LoggerFactory.getLogger(IndicatorService.class);private final KafkaTemplate<Integer, String> kafkaTemplate;/*** 注入KafkaTemplate* @param kafkaTemplate kafka模版类*/@Autowiredpublic IndicatorService(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")public void processMessage(ConsumerRecord<Integer, String> record) {LOG.info("kafka processMessage start");LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value());// do something ...properties.getProperties();LOG.info("kafka processMessage end");}public void sendMessage(String topic, String data) {LOG.info("kafka sendMessage start");ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, data);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);}@Overridepublic void onSuccess(SendResult<Integer, String> result) {LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data);}});LOG.info("kafka sendMessage end");}
}

至此就可以跑起来了,有什么不明白的可以留言。

Spring Boot集成kafka完整版相关推荐

  1. Spring Boot 集成Kafka java.lang.String is in module java.base of loader ‘bootstrap‘;

    异常信息:java.lang.String is in module java.base of loader 'bootstrap'; com.htcyaifline.common.kafka.dom ...

  2. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

  3. spring boot 集成sleuth

    spring boot 集成sleuth 1. 理论 1.1 sleuth是什么 1.2 sleuth有哪些 1.3 链路追踪的一些基本概念 1.4 zipkin的组成 2. zipkin 实例 2. ...

  4. Spring boot 项目Kafka Error connecting to node xxx:xxx Kafka项目启动异常 Failed to construct kafka consumer

    Spring boot 项目Kafka Error connecting to node xxx:xxx Spring boot Kafka项目启动异常 新建了一个springBoot集成Kafka的 ...

  5. Spring Boot集成Hazelcast实现集群与分布式内存缓存

    2019独角兽企业重金招聘Python工程师标准>>> Hazelcast是Hazelcast公司开源的一款分布式内存数据库产品,提供弹性可扩展.高性能的分布式内存计算.并通过提供诸 ...

  6. 【Java进阶】Spring Boot集成ES

    目录 spring boot集成ES ElasticSearchConfig 测试文档的基本操作 Elasticsearch Clients 文档 spring boot集成ES Java REST ...

  7. Spring Boot 集成 Elasticsearch 实战

    今天讲解下如何使用 Spring Boot 结合 ES. 可以在 ES 官方文档中发现,ES 为 Java REST Client 提供了两种方式的 Client:Java Low Level Cli ...

  8. Linux 安装Redis-6.2.5,配置及使用(RDB与AOF持久化、sentinel机制、主从复制、Spring Boot 集成 Redis)

    CentOS 7 安装Redis-6.2.5版本 Redis采用的是基于内存的单进程 单线程模型 的KV数据库,由C语言编写.官方提供的数据是可以达到100000+的qps 应用场景: 令牌(Toke ...

  9. Spring Boot集成支付宝(最新版SDK)—— 手机支付

    前言 前些日子写了一篇关于H5网页集成支付宝的文章: Spring Boot集成支付宝(最新版SDK)-- H5/网页支付 当时写了好久,往那一坐就是俩小时,写完直接就发布了,发布之后才感觉少点啥-- ...

最新文章

  1. C# Json 序列化与反序列化二
  2. 某油企产成品标准成本估算逻辑
  3. postgresql 创建用户_Liunx系统安装PostgreSQL数据库教程,值得程序员收藏pg安装教程
  4. 从性能参数到业务大数据,浅谈直播CDN服务监控
  5. 基于jsp的教师科研工作量_基于jsp+mysql的JSP教师科研信息管理系统
  6. 条件查询_SQL简单查询(条件查询 模糊查询)
  7. es6入门6--数组拓展运算符,Array.from()基本用法
  8. jsoup教程_3 Jsoup 讲解
  9. ArcGIS 设置暂时固定存储地址
  10. PASCAL VOC 2012 and SBD (the augment dataset) 总结
  11. java车牌号识别EasyPR_EasyPR
  12. php 进销存 源代码_PHP 进销存源码
  13. 飞控算法-姿态解算之互补滤波
  14. 夏昕的3部开发手册.- -
  15. 图片加水印怎么弄?这些图片加水印方法分享给你
  16. 关于Git使用详细教程
  17. Linux使用zip打包文件
  18. sed 技巧一例:特定位置插入
  19. dB,dBi和dBm的区别
  20. 椭圆曲线加密(ECC)

热门文章

  1. matlab-矩阵应用
  2. Vivado 随笔(6) Timing Summary 相关讨论(一)
  3. 【 C 】高级字符串查找之查找标记(token)函数 strtok介绍
  4. 使用ISE创建IP核(以加法器的IP核建立为例)
  5. FIR滤波器设计(包括Verilog HDL设计以及MATLAB设计)
  6. Oracle RAC(Real Application Clusters)
  7. MySQL--5子查询与连接小结
  8. Bootstrap3 栅格系统-媒体查询
  9. [转]Eclipse中的Web项目自动部署到Tomcat
  10. iOS和OS X中的bundle