springboot kafka集成
github : https://github.com/zhouPingHua/spring-data-kafka2
1.依赖(注意kafka客户版本与服务版本要一致,不然会出错)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.0.0.RC1</version> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope> </dependency>
2.application.yml
#服务prot server: port : 20001#kafka相关 kafka: brokers : 127.0.0.1:9092groupid : test-group
3.生产者 producer
package com.caiyi.financial.data.kafka;import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap; import java.util.Map;/** * Created by zph Date:2017/8/9. * kafka生产者配置 */ @Configuration @EnableKafka public class KafkaProducerConfig {@Value("${kafka.brokers}")private String brokers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Bean public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());} }
4.消费者
package com.caiyi.financial.data.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap; import java.util.Map;/** * Created by zph Date:2017/8/9. */ @Configuration @EnableKafka public class KafkaConsumerConfig {@Value("${kafka.brokers}")private String brokers;@Value("${kafka.groupid}")private String groupid;@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");return propsMap;}@Bean public Listener listener() {return new Listener();}}
5.监听类
package com.caiyi.financial.data.kafka;/** * Created by zph Date:2017/8/9. */ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener;import java.util.Optional;public class Listener {@KafkaListener(topics = {"testtopic"})public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("testtopic " + message);}}@KafkaListener(topics = {"testtopic2"})public void listen2(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("testtopic2" + message);}} }
6.controller
package com.caiyi.financial.data.controller;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;/** * Created by zph Date:2017/8/10. */ @RestController public class Controller {@Autowired private KafkaTemplate kafkaTemplate;@RequestMapping(value = "/send", method = RequestMethod.GET)public void send(@RequestParam(required = true) String topic, @RequestParam(required = true) String message) {kafkaTemplate.send(topic, message);}}
7.项目启动
package com.caiyi.financial.data;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder;/** * Created by zph Date:2017/8/8. */ @SpringBootApplication public class Application {protected static final Logger logger = LoggerFactory.getLogger(Application.class);public static void main(String[] args) {SpringApplication.run(Application.class, args);logger.info("spring BOOT Start");}protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {return application.sources(Application.class);}}
转载于:https://my.oschina.net/u/3642896/blog/1506378
springboot kafka集成相关推荐
- kafka的单机搭建与springboot快速集成
一.kakfa相关术语 Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类 ...
- kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式
Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...
- springboot项目集成docker
文章目录 一.docker常用命令 0.拉取镜像到本地仓库 1.查看所有镜像 2.创建一个新的容器并运行,返回的是容器的ID -- CONTAINER ID: 3.查看运行中的docker实例 4.查 ...
- springboot的jsp应该放在哪_在springboot中集成jsp开发
springboot就是一个升级版的spring.它可以极大的简化xml配置文件,可以采用全注解形式开发,一个字就是很牛. 在springboot想要使用jsp开发,需要集成jsp,在springbo ...
- springboot+mybatis集成自定义缓存ehcache用法笔记
今天小编给大家整理了springboot+mybatis集成自定义缓存ehcache用法笔记,希望对大家能有所办帮助! 一.ehcache介绍 EhCache 是一个纯Java的进程内缓存管理框架,属 ...
- Springboot/Cloud集成Sentinel 和 入门实战
文章目录 一.Springboot/Cloud集成Sentinel 1. spring-cloud-alibaba依赖 2. 引入 Sentinel starter 3. 配置application. ...
- springboot nacos_springboot集成nacos
1.现在nacos进行配置添加 2.springboot项目集成nacos nacos上有说明,根据不同的工程进行配置,如下图 集成nacos客户端包 com.alibaba.boot nacos-c ...
- SpringBoot 2 集成微信扫码支付
前言 该文主要是手把手教你如何在SpringBoot 中集成微信扫码支付,以及集成的过程需要注意的问题事项.另外需要感谢 vbirdbest 关于微信支付和支付宝支付相关包博客总结.因为文中很多地方参 ...
- spark kafka java api_java实现spark streaming与kafka集成进行流式计算
java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...
- MyBatis系列之--Java 项目(非SpringBoot)集成MyBatis
MyBatis系列之--Java 项目(非SpringBoot)集成MyBatis 对MyBatis简单介绍 核心接口SqlSessionFactory 实战 1. Maven创建Java项目 2. ...
最新文章
- 基于SharePoint 2013的论坛解决方案[开源]
- kattis ones简单题取模运算+枚举
- python 压缩文件 调用7z_Python:如何从Python压缩的7z文件中读取一行?
- 【斜率优化】仓库建设(luogu 2120)
- 自组网中继台_同频自组网基站
- epson me 1+只有主机能打印不能共享网络打印问题的处理
- 自考那些事儿(八):计算机网络原理(原理篇)之网络各层
- python循环读取文件越来越慢_python读取大文件越来越慢的原因与解决
- 用C#,SQL Server编写的音乐播放软件
- mysql垂直分库_mysql垂直分库,水平分库,垂直分表,水平分表
- 一篇文章读懂拿了图灵奖和诺贝尔奖的概率图模型
- java数组= 0_JAVA数组
- kk每日一句:第一句
- ICX285 ICX205 ICX414 3CCD共用驱动板电路设计
- 微信公众号小程序实战开发vue3+nodejs+koa2+mysql+nginx阿里云部署教程
- 第22批符合道路运输车辆卫星定位系统标准 及规范的车载终端
- 交换机,路由器接口类型
- 给定一个递增序列,a1 a2 ...an 。定义这个序列的最大间隔为d=max{ai+1 - ai }(1≤in),现在要从a2 ,a3 ..an-1 中删除一个元素。问剩余序列的最大间隔最小...
- CUDA总结:Occupancy
- 图片标签和图片格式~
热门文章
- 笔记———计算机网络原理(二)
- 如何root安卓手机_超级神器——安卓端的手机虚拟机,手机中的手机(支持root,xp框架)...
- win10 插上有线耳机 无声音:装上声卡驱动【检查】
- 数据可视化④:大学生就业可视化呈现
- python 直方图 横向_python绘制直方图
- 京东智联云能打破“强者恒强”的定律吗?
- matlab2016环境变量,matlab环境变量path
- Python分析中国大陆各直辖市及各省省会的平均工资与平均房价 课程报告+源码及数据
- GF系列卫星分辨率介绍
- 写给程序员的 2018 新年计划清单