Reposted from: "SpringBoot 接入两套 kafka 集群" (Connecting Spring Boot to two Kafka clusters), 风小雅, 博客园:
https://www.cnblogs.com/ylty/p/13673357.html


Add the dependency

  compile 'org.springframework.kafka:spring-kafka'
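
Note: newer Gradle versions have removed the compile configuration; implementation is the drop-in replacement, and if the Spring Boot plugin manages dependency versions the version can be left off:

  implementation 'org.springframework.kafka:spring-kafka'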

Configuration for the first Kafka cluster (k1)

package myapp.kafka;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Default Kafka configuration (first cluster, k1).
 *
 * @author zhengqian
 */
@Slf4j
@Configuration
@Data
public class K1KafkaConfiguration {

    @Value("${app-name.kafka.k1.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${app-name.kafka.k1.consumer.group-id}")
    private String groupId;

    @Value("${app-name.kafka.k1.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${app-name.kafka.k1.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    @Value("${app-name.kafka.k1.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    @Bean
    @Primary
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Configuration for the second Kafka cluster (k2)

package myapp.kafka;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka configuration for the second cluster (k2).
 *
 * @author zhengqian
 */
@Slf4j
@Configuration
@Data
public class K2KafkaConfiguration {

    @Value("${app-name.kafka.k2.consumer.bootstrap-servers}")
    private String consumerBootstrapServers;

    @Value("${app-name.kafka.k2.consumer.group-id}")
    private String groupId;

    @Value("${app-name.kafka.k2.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${app-name.kafka.k2.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    @Value("${app-name.kafka.k2.producer.bootstrap-servers}")
    private String producerBootstrapServers;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryK2() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryK2());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactoryK2() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsK2());
    }

    @Bean
    public Map<String, Object> consumerConfigsK2() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigsK2() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactoryK2() {
        return new DefaultKafkaProducerFactory<>(producerConfigsK2());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplateK2() {
        return new KafkaTemplate<>(producerFactoryK2());
    }
}

Configuration file

app-name:
  kafka:
    k1:
      consumer:
        bootstrap-servers: host1:9092
        group-id: my-app
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        enable-auto-commit: true
      producer:
        bootstrap-servers: host1:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    k2:
      consumer:
        bootstrap-servers: host2:9092
        group-id: my-app
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        enable-auto-commit: true
      producer:
        bootstrap-servers: host2:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

Specifying which Kafka cluster a listener consumes from

    @KafkaListener(topics = "topic-name", containerFactory = "kafkaListenerContainerFactoryK2")
    public void onEvent(ConsumerRecord<String, String> record) {
        // omitted
    }
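
For comparison, a listener that should read from the first cluster can simply omit containerFactory; @KafkaListener then falls back to the bean named kafkaListenerContainerFactory, which is the @Primary factory defined in K1KafkaConfiguration. A minimal sketch (the topic name is illustrative):

    @KafkaListener(topics = "topic-name-k1")
    public void onEventK1(ConsumerRecord<String, String> record) {
        // handled by the default (k1) container factory
    }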

Specifying which Kafka cluster a producer sends to

public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void test() {
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("topic", "data");
        try {
            SendResult<String, String> value = result.get(2, TimeUnit.SECONDS);
            System.out.println(value.getProducerRecord());
            System.out.println(value.getRecordMetadata());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
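
With both configuration classes loaded there are two KafkaTemplate beans, and the field above normally resolves by name to kafkaTemplate, i.e. the k1 producer. To target the second cluster explicitly, inject the K2 template by qualifier; a minimal sketch of an extra field and test for the same class:

    @Autowired
    @Qualifier("kafkaTemplateK2")
    private KafkaTemplate<String, String> kafkaTemplateK2;

    @Test
    public void testK2() {
        // sent to the brokers configured under app-name.kafka.k2.producer
        kafkaTemplateK2.send("topic", "data");
    }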
