第一步,引入pom文件

     <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

第二步:引入配置文件

spring:kafka:bootstrap-servers: 192.168.227.141:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: testenable-auto-commit: trueauto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

第三步:注入配置文件

package com.my.tool;import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;@Component
public class KafkaConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}}

第四步:发送消息

package com.my.tool;import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class KafkaSender {private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//发送消息方法public void send(String strjson) {log.info("+++++++++++++++++++++  message = {}", strjson);kafkaTemplate.send("test", strjson);}
}

第五步:接收消息

package com.my.tool;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Optional;@Component
public class KafkaReceiver {private final org.slf4j.Logger log = LoggerFactory.getLogger(getClass());@KafkaListener(topics = {"test"})public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =" + record);log.info("------------------ message =" + message);}}
}

第六步:写一个controller测试

package com.my.controller;import com.my.tool.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.io.IOException;@RestController
public class PorduceController {@AutowiredKafkaSender kafkaSender;@GetMapping("/Porduce")public void Produce(String Message) throws IOException {kafkaSender.send(Message);}
}

SpringBoot之kafka对接topic相关推荐

  1. SpringBoot操作Kafka创建Topic、Producer、Consumer

    环境 kafka 2.6.0(安装步骤查看这里) 引入依赖 <dependency><groupId>org.springframework.boot</groupId& ...

  2. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

  3. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  4. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  5. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

  6. SpringBoot整合kafka实战之带回调的生产者

    本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...

  7. SpringBoot 重置 kafka 偏移量offset(kafka-0.10.1.0)

    如果使用的自动提交偏移量的模式,偏移量会给到kafka或者zk进行管理,其中kafka的偏移量重置给了重新消费kafka内未过期的数据提供了机会,当消费者出错,比如消费了数据,但是中途处理失败,导致数 ...

  8. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  9. Springboot与Kafka的小插曲

    目录 1.背景 2.环境 3.应用 1)pom.xml 2)application.yml 3)main方法 4)Entity 5)ServiceImpl 6)controller 4.遇到的问题 5 ...

最新文章

  1. 在linux系统上运行新加的内核模块(驱动模块) 需要安装的东西
  2. Synchronize对象属性改变
  3. 关于MM的几个经典问题及回答
  4. ubuntu14.04禁用USB外存储设备
  5. 主机无法访问虚拟机的httpd服务
  6. Wordcounter,使用Lambdas和Fork / Join计算Java中的单词数
  7. Android Hook技术防范漫谈
  8. EventBus猜想 ----手把手带你自己实现一个EventBus
  9. 手写数字识别代码,可以跑通
  10. mysql linux root密码忘记了怎么办,linux下忘记mysql的root密码解决办法 | 严佳冬
  11. 10 个实用技巧,让 Finder 带你飞
  12. 【Java从0到架构师】SpringCloud - Eureka、Ribbon、Feign
  13. Jquery头像编辑器
  14. 厦门大学 软件学院 夏令营
  15. 7.13 编程序,比较字符串大小。
  16. Windows 10创建用户
  17. 鸿蒙系统深度解读(三)
  18. 三针风扇接法_三针和四针CPU风扇有什么区别?
  19. 统计综合指标有哪些?
  20. 节日头像小程序源码,直接部署可用!

热门文章

  1. RDKit化学式 分子式搜索
  2. 高并发架构系列:Redis缓存和MySQL数据一致性方案详解
  3. 12月 Web 服务器调查:nginx 增长最快,微软市场份额最高
  4. 鸿蒙系统是一场营销,品牌营销专家点评鸿蒙,华为内部定位有点乱别搞砸了
  5. linux 域名怎么平台,linux平台搭建DNS域名服务与常用配置
  6. 手机端php mime设置,php,_如何使用MIME协议配合表单在iphone手机上下载文件?,php - phpStudy...
  7. 语言怎么得到直流电压并采样_250V10A高频直流电源/大电流直流稳压稳流电源
  8. 关于JavaScript的词法作用域及变量提升的个人理解
  9. JS键盘KEYCODE值参考
  10. MyBatis学习总结_03_优化MyBatis配置文件中的配置