文章目录

  • 1. 添加Maven依赖
  • 2. 配置与参数分离
  • 3. 工具类度内容
  • 4. Producer 消息生产者配置
  • 5. Consumer 消息消费者配置
  • 6. 使用注解监听消息
  • 7. 请求测试
  • 8. 测试结果

1. 添加Maven依赖

<!-- 添加spring-kafka支持 -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.2.RELEASE</version>
</dependency>

2. 配置与参数分离

使用kafka.properties文件形式,将配置与参数分离,方便管理。
kafka.properties文件如下:

################## kafkaListener Producer 发送端 ##################
# brokers 集群
kafka.producer.bootstrap.servers = 192.168.43.242:9092,192.168.43.134:9092,192.168.43.228:9092#发送端 id
kafka.producer.client.id = producerDemo#发送端确认模式
kafka.producer.acks = -1#发送失败重试次数
kafka.producer.retries = 3#批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送
kafka.producer.batch.size = 4096#与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至broker
kafka.producer.linger.ms = 10# 33554432 即32MB的批处理缓冲区
#kafka.producer.buffer.memory = 40960#默认 topic
kafka.producer.defaultTopic = testTopic#key 序列化
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer#value 序列化
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer################## kafkaListener Consumer 消费端 ###################消费端 brokers 集群
kafka.consumer.bootstrap.servers = 192.168.43.242:9092,192.168.43.134:9092,192.168.43.228:9092#消费者 group.id 组ID
kafka.consumer.group.id = test-group#消费者消费消息后,进行自动提交
kafka.consumer.enable.auto.commit = true#自动提交的频率(与 enable.auto.commit = true 属性配合使用)
kafka.consumer.auto.commit.interval.ms = 1000#新的groupid,是否从头开始消费
kafka.consumer.auto.offset.reset = earliest#在使用kafka组管理时,发送心跳机制,用于检测消费者故障的超时
#kafka.consumer.session.timeout.ms = 1000#key 反序列化
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer#value 反序列化
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer#消费端消费的topic
kafka.consumer.topic = testTopic

3. 工具类度内容

添加 PropertiesUtils 工具类,来读取 properties文件内容

package com.demo.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;public class PropertiesUtils {private static Logger log = LoggerFactory.getLogger(PropertiesUtils.class);/*** 根据文件名获取Properties对象* @param fileName* @return*/public static Properties read(String fileName){InputStream in = null;try{Properties prop = new Properties();//InputStream in = Object.class.getResourceAsStream("/"+fileName);in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);if(in == null){return null;}prop.load(in);return prop;}catch(Exception e){e.printStackTrace();}finally{try {if(in != null){in.close();}} catch (IOException e) {e.printStackTrace();}}return null;}/*** 根据文件名和键名获取值* @param fileName* @param key* @return*/public static String readKeyValue(String fileName, String key){Properties prop = read(fileName);if(prop != null){return prop.getProperty(key);}return null;}/*** 根据键名获取值* @param prop* @param key* @return*/public static String readKeyValue(Properties prop, String key){if(prop != null){return prop.getProperty(key);}return null;}/*** 写入* @param fileName* @param key* @param value*/public static void writeValueByKey(String fileName, String key, String value){Map<String, String> properties = new HashMap<String, String>();properties.put(key, value);writeValues(fileName, properties);}/*** 写入* @param fileName* @param properties*/public static void writeValues(String fileName, Map<String, String> properties){InputStream in = null;OutputStream out = null;try {in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);if(in == null){throw new RuntimeException("读取的文件("+fileName+")不存在,请确认!");               }Properties prop = new Properties();prop.load(in);String path = PropertiesUtils.class.getResource("/"+fileName).getPath();out = new FileOutputStream(path);if(properties != null){Set<String> set = properties.keySet();for (String string : set) {prop.setProperty(string, properties.get(string));log.info("更新"+fileName+"的键("+string+")值为:"+properties.get(string));}}prop.store(out, "update properties");} catch (Exception e) {e.printStackTrace();} finally{try {if(in != null){in.close();}if(out != null){out.flush();out.close();}} catch (Exception e2) {e2.printStackTrace();}}}public static void main(String[] args) throws Exception {//System.out.println("read="+read("config.properties"));//System.out.println("readKeyValue="+readKeyValue("config.properties","superAdmin"));//writeValueByKey(CC.WEIXI_PROPERTIES, "access_token", "ddd");Map<String, String> properties = new HashMap<String, String>();properties.put("access_token", "ddd2");properties.put("access_token1", "ee2");properties.put("bbbb", "bbbb");}
}

4. Producer 消息生产者配置

package com.demo.kafka;import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** Kafka 消息生产者配置*/
@Configurable
@Component
@EnableKafka
public class KafkaProducerConfig {Properties properties = PropertiesUtils.read("kafka.properties");public KafkaProducerConfig() {System.out.println("kafka 生产者配置加载...");}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory(producerProperties());}public Map<String, Object> producerProperties() {Map<String, Object> props = new HashMap<String, Object>();//Kafka服务地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.producer.bootstrap.servers"));//设置当前客户端idprops.put(ProducerConfig.CLIENT_ID_CONFIG, properties.getProperty("kafka.producer.client.id"));//设置消费端确认机制props.put(ProducerConfig.ACKS_CONFIG, properties.getProperty("kafka.producer.acks"));//发送失败重试次数props.put(ProducerConfig.RETRIES_CONFIG, properties.getProperty("kafka.producer.retries"));//批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送props.put(ProducerConfig.BATCH_SIZE_CONFIG, properties.getProperty("kafka.producer.batch.size"));//与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至brokerprops.put(ProducerConfig.LINGER_MS_CONFIG,properties.getProperty("kafka.producer.linger.ms"));//Key序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.key.serializer"));//Value序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.value.serializer"));return props;}@Beanpublic KafkaTemplate<String,String> kafkaTemplate(){KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory(),true);//设置默认的topic(此处可做一些具体设置)kafkaTemplate.setDefaultTopic(properties.getProperty("kafka.producer.defaultTopic"));return kafkaTemplate;}
}

5. Consumer 消息消费者配置

package com.demo.kafka;import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configurable
@Component
@EnableKafka
public class KafkaConsumerConfig {Properties properties = PropertiesUtils.read("kafka.properties");public KafkaConsumerConfig() {System.out.println("kafka消费者配置加载...");}public Map<String, Object> consumerProperties() {Map<String, Object> props = new HashMap<String, Object>();//Kafka服务地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.consumer.bootstrap.servers"));//消费组props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getProperty("kafka.consumer.group.id"));//设置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getProperty("kafka.consumer.enable.auto.commit"));//设置间隔时间props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getProperty("kafka.consumer.auto.commit.interval.ms"));//Key反序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.key.deserializer"));//Value反序列化props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.value.deserializer"));//从头开始消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getProperty("kafka.consumer.auto.offset.reset"));return props;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic KafkaConsumerListener kafkaConsumerListener() {return new KafkaConsumerListener();}
}

6. 使用注解监听消息

通过 @KafkaListener 注解的形式,消费topic中的消息

public class KafkaConsumerListener {@KafkaListener(topics = "testTopic01")public void listen01(ConsumerRecord<String,String> consumerRecord){System.out.println("开始消费testTopic01的消息");System.out.println("消费者线程:"+Thread.currentThread().getName()+"[ 消息 来自kafkatopic:"+consumerRecord.topic()+",分区:"+consumerRecord.partition() +" ,委托时间:"+consumerRecord.timestamp()+"]消息内容如下:");System.out.println(consumerRecord.value());}@KafkaListener(topics = "testTopic02")public void listen02(ConsumerRecord<String,String> consumerRecord){System.out.println("开始消费testTopic02的消息");System.out.println(consumerRecord.value());}/*** 消费 某个topic 下指定分区的消息*/@KafkaListener(topicPartitions = {@TopicPartition(topic = "liuzebiao",partitions = {"1"})})public void topicMessage(ConsumerRecord<?, ?> record,String content){System.out.println("消息:"+ content);System.out.println("消息被消费------>Topic:"+ record.topic() + ",------>Value:" + record.value() +",------>Partition:" + record.partition());}
}

7. 请求测试

通过Spring Controller的形式,开始测试

@Controller
@RequestMapping("kafka")
public class KafkaController {@AutowiredKafkaTemplate kafkaTemplate;/*** 消息发送*/@RequestMapping("producer")@ResponseBodypublic void producer(){kafkaTemplate.send("testTopic01","producer发送消息01");kafkaTemplate.send("testTopic02","producer发送消息02");}}

8. 测试结果

通过 localhost:8080/kafka/producer 调用接口,使用 kafkaTemplate 发送消息。

消息发送成功后,在控制台会收到如下信息:

开始消费testTopic01的消息
消费者线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1[ 消息 来自kafkatopic:testTopic01,分区:0 ,委托时间:1568107936693]消息内容如下:
producer发送消息01开始消费testTopic02的消息
producer发送消息02

项目链接:
https://github.com/gb-heima/spring-annotataion-kafka

【基于注解方式】Spring整合Kafka相关推荐

  1. spring IOC容器 Bean 管理——基于注解方式

    IOC 操作 Bean 管理(基于注解方式) 1.什么是注解 ​ (1)注解是代码特殊标记,格式:@注解名称(属性名称=属性值, 属性名称=属性值-) ​ (2)使用注解,注解作用在类上面,方法上面, ...

  2. JavaEE框架整合之基于注解的SSH整合

    基于注解的SSH整合 开发环节步骤: 实体类 -----> DAO开发 -----> Service开发 -----> Action动作类开发 -----> 配置文件(appl ...

  3. Elasticsearch-mapper 基于注解方式生成mapping(2.0以上)

    Elasticsearch生成mapping的方式上有多种方式,我们可以把mapping做成配置文件,也可以用spring-data-elasticsearch基于注解生成. 在基于注解生成这种方式上 ...

  4. java datasource 配置_Spring boot 基于注解方式配置datasource

    Spring boot 基于注解方式配置datasource Xml配置 我们先来回顾下,使用xml配置数据源. 步骤: 先加载数据库相关配置文件; 配置数据源; 配置sqlSessionFactor ...

  5. Spring整合Kafka

    文章目录 Spring整合Kafka 一.引入依赖 二.配置kafka 三.测试代码--如何用kafka 3.1 KafkaTests 3.2 测试结果 Spring整合Kafka 一.引入依赖 &l ...

  6. spring的依赖注入 -------基于注解方式

    前言: 做了2年的软件,刚开始入行的时候,没有个目标基本上都是在摸索,技术看的我眼花缭乱,这个想学,那个也想学结果是对很多技术一知半解的,工作中才发现,我们要掌握的一门可以搞定快速开发搞定所有业务需求 ...

  7. Spring Boot基于注解方式处理接口数据脱敏

    1.定义注解 创建Spring Boot项目添加以下依赖 <dependencies><dependency><groupId>org.springframewor ...

  8. 基于注解的Spring AOP的配置和使用--转载

    AOP是OOP的延续,是Aspect Oriented Programming的缩写,意思是面向切面编程.可以通过预编译方式和运行期动态代理实现在不修改源代码的情况下给程序动态统一添加功能的一种技术. ...

  9. Java Web之基于注解的Spring MVC环境配置

    1.在web.xml中配置前端控制器,拦截请求,然后配置加载SpringMVC的配置文件(处理器映射器.处理器适配器.视图解析器等) <!-- springmvc前端控制器 -->< ...

  10. spring入门第二讲 bean的生命周期 装配方式 Spring整合Junit

    bean的生命周期 实体类 //初始化 public void init(){System.out.println("--初始化--"); }//销毁 public void de ...

最新文章

  1. MySQL 5.6.26 Release Note解读
  2. 动态给a标签赋值_怎样利用Excel制作抖音上的心形动态函数图像?
  3. 深度优先搜索c语言详解,深度优先搜索 — C语言版
  4. 分布式架构--基本思想汇总
  5. java cellvalue_Java Cell.getErrorCellValue方法代码示例
  6. 2019 CCPC 秦皇岛F Forest Program(dfs)
  7. CentOS下通过yum安装svn及配置
  8. python3 二进制文件比较_《Python 3程序开发指南(第2版•修订版)》——7.4 随机存取二进制文件...
  9. 网站如何优化才是成功的
  10. java完全自学手册txt下载
  11. sai笔记4-加高光/图层切换颜色
  12. 淘宝上的零食能买吃吗?网上进口食品的秘密。
  13. iOS历史回顾(iOS1~iOS8)
  14. alpha测试和betal测试
  15. Ubuntu18.04 wifi已连接却没办法上网~代理服务器出现问题
  16. 3.2收缩-扩张喷管实例
  17. Security+新版601考过啦,分享我的备考经验
  18. WPF企业内训全程实录(上)
  19. hadoop和spark安装包 网盘地址
  20. vue+ElementUI 实现管理端照片墙(或广告位)效果

热门文章

  1. 一种基于邻域的聚类算法
  2. 归并排序的实现-代码
  3. C语言实验——数组逆序
  4. 线程可警告状态以及APC队列
  5. Hbase Solr 二级索引 同步int数据报错com.ngdata.hbaseindexer.parse.ByteArrayValueMappers: Error mapping byte
  6. Cloudera-Manager 与 原生集群 免密登录问题
  7. 洞察设计模式的底层逻辑
  8. 数据湖,已成为海量数据存储与分析的重要承载方式
  9. 问答题:如何构建一套满足GPT-3的存储系统?
  10. 使用TensorFlow,GPU和Docker容器进行深度学习