1.yml配置文件(简单配置)

spring:kafka:bootstrap-servers: ip:端口consumer:group-id: group-testenable-auto-commit: trueauto-commit-interval: 1000msauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.类配置方式(使用了SASL/PLAINTEXT安全认证协议)

出现[Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected报错信息就需要用这种配置方式,加入安全认证

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/*** @author 向振华* @date 2021/05/10 15:58*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(10);factory.getContainerProperties().setPollTimeout(1500);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);// 服务地址propsMap.put("bootstrap.servers", "ip:端口");// 安全认证协议propsMap.put("security.protocol", "SASL_PLAINTEXT");propsMap.put("sasl.mechanism", "PLAIN");// 填充安全认证用户名和密码propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usnm\" password=\"pwd\";");propsMap.put("group.id", "group-test");propsMap.put("enable.auto.commit", true);propsMap.put("auto.commit.interval.ms", 1000);// latest: 从最新的偏移量开始消费propsMap.put("auto.offset.reset", "latest");// 反序列化方式propsMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");propsMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return propsMap;}
}

再加一个配置文件sasl.jaas.config到resource

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafkaadmin"password="kafkaadminpwd"user_kafkaadmin="kafkaadminpwd"user_kafkaclient1="kafkaclient1pwd"user_kafkaclient2="kafkaclient2pwd";
}; 

使用:

@Slf4j
@Component
public class KafkaConsumer {@KafkaListener(topics = {"xxx"})public void listener(ConsumerRecord<?, ?> record) {log.info("收到消息 ---> " + record.value().toString());}
}

Kafka两种配置文件方式相关推荐

  1. Spark对Kafka两种连接方式的对比——Receiver和Direct

    在知乎 Flink 取代 Spark Streaming 的实战之路中,提到 因此下面对两种方式进行详细说明一下. Receiver方式 Receiver:接收器模式是使用Kafka高级Consume ...

  2. Kafka结合Spark-streaming 的两种连接方式(AWL与直连)

    kafka结合spark-streaming的用法及说明之前博客有些,这里就不赘述了. 这篇文章说下他们结合使用的两种连接方式.(AWL与直连) 先看一张图: 这是kafka与streaming结合的 ...

  3. SpringBoot的properties和yml两种配置方式, 配置注入参数, 以及配置文件读取失效的问题

    SpringBoot支持两种配置方式,一种是properties文件,一种是yml 首先在pom文件中添加依赖: <dependency><groupId>org.spring ...

  4. redis的两种持久化方式详解

    一.背景 在实际开发中,为了保证数据的完整性,防止数据丢失,我们除了在原有的传统数据库保存数据的同时,最好是再用redis持久化再保存一次数据.如果仅仅是使用redis而不进行持久化配置的话,当red ...

  5. android旋转动画的两种实现方式

    在android开发,我们会常常使用到旋转动画,普通情况下旋转动画有两种实现方式,一种是直接通过java代码去实现,第二种是通过配置文件实现动画.以下是两种动画的基本是用法: 纯Java代码实现: / ...

  6. wdcp支持两种安装方式

    v3.2版本已发布,支持多PHP版本共存共用,支持SSL证书,更多可看论坛 v3版讨论区 更多安装说明请看 http://www.wdlinux.cn/bbs/thread-57643-1-1.htm ...

  7. Java框架篇---spring aop两种配置方式

    Java框架篇---spring aop两种配置方式 第一种:注解配置AOP 注解配置AOP(使用 AspectJ 类库实现的),大致分为三步:  1. 使用注解@Aspect来定义一个切面,在切面中 ...

  8. 探究Redis两种持久化方式下的数据恢复

    对长期奋战在一线的后端开发人员来说,都知道redis有两种持久化方式RDB和AOF,虽说大家都知道这两种方式大概运作方式,但想必有实操的人不会太多. 这里是自己实操两种持久化方式的一点点记录. 先看以 ...

  9. Destoon数据库配置文件在哪_Mybatis 系列 2:Mybatis 的两种配置文件

    Mybatis 的配置文件,有两种: 一.Mybatis 全局配置文件(主配置文件): 起名:不固定,但一般起名要见名知意 --> mybatis-config.xml 路径:classpath ...

最新文章

  1. RocketMQ介绍与云服务器安装
  2. 使用 Acegi 保护 Java 应用程序
  3. web前端数组塌陷的解决办法
  4. LVS(15)——tun技术
  5. jquery设置输入框为只读_将SQL中几张表设为只读,这是什么奇怪需求?
  6. 多态_月隐学python第18课
  7. 1w存银行一年多少利息_100万存银行一年利息多少?能赚多少钱?
  8. python自动化测试脚本怎么编写_【Python + uiautomator2】之编写unittest自动化测试脚本...
  9. Mongodb2.6升级到Mongodb3.0.2笔记
  10. pdf文件的处理(弄成小容量大小的文本文件)
  11. 程序崩溃调试 Linux开启产生coredump文件
  12. java出租车源码_基于WEB的JAVA出租车打车系统
  13. 趣味项目—MyQQ机器人(一)
  14. 2016杭州云栖大会回顾网址
  15. 保险丝的作用,参数及选型应用,你真的懂了吗——电子元器件篇
  16. 正则表达式--文本处理神器
  17. JAVA 开发命名规范——阿里巴巴Java开发手册
  18. 【Markdown基础教程】编辑环境的下载
  19. Ubuntu 16.04下安装Caffe解决 undefined symbol: _ZN5boost6python6detail11init_moduleER11PyModuleDefPFvvE
  20. Java使用itext生成PDF 然后将多个PDF打包zip下载

热门文章

  1. Windows 2012安全防护: 密码安全策略设置为强密码
  2. GetChild( )
  3. 802.1Q中的secure/check/fallback/disable的简单分析
  4. Word中公式上浮怎么办?
  5. java面向对象 宠物领养系统 包含继承多态的使用 抽象方法和抽象类
  6. 泸州职业技术学院 二级计算机,泸州职业技术学院2021年招生代码
  7. 南阳计算机职称考试报名时间2015,南阳市2015年上半年全国计算机等级考试报名人数共7718人...
  8. 施工晴雨表图例_工程施工晴雨表范本
  9. 主打安全 阿里巴巴联合公安部打造PMOS
  10. 优友机器人价格_优友,大型商用服务机器人