github : https://github.com/zhouPingHua/spring-data-kafka2

1.依赖(注意kafka客户版本与服务版本要一致,不然会出错)

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.0.0.RC1</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>

2.application.yml

#服务prot
server:
  port : 20001#kafka相关
kafka:
  brokers : 127.0.0.1:9092groupid : test-group

3.生产者 producer

package com.caiyi.financial.data.kafka;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/**
 * Created by zph  Date2017/8/9.
 * kafka生产者配置
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${kafka.brokers}")private String brokers;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Bean
    public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<String, String>(producerFactory());}
}

4.消费者

package com.caiyi.financial.data.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;/**
 * Created by zph  Date2017/8/9.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${kafka.brokers}")private String brokers;@Value("${kafka.groupid}")private String groupid;@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");return propsMap;}@Bean
    public Listener listener() {return new Listener();}}

5.监听类

package com.caiyi.financial.data.kafka;/**
 * Created by zph  Date2017/8/9.
 */

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;import java.util.Optional;public class Listener {@KafkaListener(topics = {"testtopic"})public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("testtopic " + message);}}@KafkaListener(topics = {"testtopic2"})public void listen2(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();System.out.println("testtopic2" + message);}}
}

6.controller

package com.caiyi.financial.data.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/**
 * Created by zph  Date2017/8/10.
 */
@RestController
public class Controller {@Autowired
    private KafkaTemplate kafkaTemplate;@RequestMapping(value = "/send", method = RequestMethod.GET)public void send(@RequestParam(required = true) String topic, @RequestParam(required = true) String message) {kafkaTemplate.send(topic, message);}}

7.项目启动

package com.caiyi.financial.data;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;/**
 * Created by zph  Date2017/8/8.
 */
@SpringBootApplication
public class Application {protected static final Logger logger = LoggerFactory.getLogger(Application.class);public static void main(String[] args) {SpringApplication.run(Application.class, args);logger.info("spring BOOT Start");}protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {return application.sources(Application.class);}}

转载于:https://my.oschina.net/u/3642896/blog/1506378

springboot kafka集成相关推荐

  1. kafka的单机搭建与springboot快速集成

    一.kakfa相关术语 Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类 ...

  2. kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式

    Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...

  3. springboot项目集成docker

    文章目录 一.docker常用命令 0.拉取镜像到本地仓库 1.查看所有镜像 2.创建一个新的容器并运行,返回的是容器的ID -- CONTAINER ID: 3.查看运行中的docker实例 4.查 ...

  4. springboot的jsp应该放在哪_在springboot中集成jsp开发

    springboot就是一个升级版的spring.它可以极大的简化xml配置文件,可以采用全注解形式开发,一个字就是很牛. 在springboot想要使用jsp开发,需要集成jsp,在springbo ...

  5. springboot+mybatis集成自定义缓存ehcache用法笔记

    今天小编给大家整理了springboot+mybatis集成自定义缓存ehcache用法笔记,希望对大家能有所办帮助! 一.ehcache介绍 EhCache 是一个纯Java的进程内缓存管理框架,属 ...

  6. Springboot/Cloud集成Sentinel 和 入门实战

    文章目录 一.Springboot/Cloud集成Sentinel 1. spring-cloud-alibaba依赖 2. 引入 Sentinel starter 3. 配置application. ...

  7. springboot nacos_springboot集成nacos

    1.现在nacos进行配置添加 2.springboot项目集成nacos nacos上有说明,根据不同的工程进行配置,如下图 集成nacos客户端包 com.alibaba.boot nacos-c ...

  8. SpringBoot 2 集成微信扫码支付

    前言 该文主要是手把手教你如何在SpringBoot 中集成微信扫码支付,以及集成的过程需要注意的问题事项.另外需要感谢 vbirdbest 关于微信支付和支付宝支付相关包博客总结.因为文中很多地方参 ...

  9. spark kafka java api_java实现spark streaming与kafka集成进行流式计算

    java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...

  10. MyBatis系列之--Java 项目(非SpringBoot)集成MyBatis

    MyBatis系列之--Java 项目(非SpringBoot)集成MyBatis 对MyBatis简单介绍 核心接口SqlSessionFactory 实战 1. Maven创建Java项目 2. ...

最新文章

  1. 基于SharePoint 2013的论坛解决方案[开源]
  2. kattis ones简单题取模运算+枚举
  3. python 压缩文件 调用7z_Python:如何从Python压缩的7z文件中读取一行?
  4. 【斜率优化】仓库建设(luogu 2120)
  5. 自组网中继台_同频自组网基站
  6. epson me 1+只有主机能打印不能共享网络打印问题的处理
  7. 自考那些事儿(八):计算机网络原理(原理篇)之网络各层
  8. python循环读取文件越来越慢_python读取大文件越来越慢的原因与解决
  9. 用C#,SQL Server编写的音乐播放软件
  10. mysql垂直分库_mysql垂直分库,水平分库,垂直分表,水平分表
  11. 一篇文章读懂拿了图灵奖和诺贝尔奖的概率图模型
  12. java数组= 0_JAVA数组
  13. kk每日一句:第一句
  14. ICX285 ICX205 ICX414 3CCD共用驱动板电路设计
  15. 微信公众号小程序实战开发vue3+nodejs+koa2+mysql+nginx阿里云部署教程
  16. 第22批符合道路运输车辆卫星定位系统标准 及规范的车载终端
  17. 交换机,路由器接口类型
  18. 给定一个递增序列,a1 a2 ...an 。定义这个序列的最大间隔为d=max{ai+1 - ai }(1≤in),现在要从a2 ,a3 ..an-1 中删除一个元素。问剩余序列的最大间隔最小...
  19. CUDA总结:Occupancy
  20. 图片标签和图片格式~

热门文章

  1. 笔记———计算机网络原理(二)
  2. 如何root安卓手机_超级神器——安卓端的手机虚拟机,手机中的手机(支持root,xp框架)...
  3. win10 插上有线耳机 无声音:装上声卡驱动【检查】
  4. 数据可视化④:大学生就业可视化呈现
  5. python 直方图 横向_python绘制直方图
  6. 京东智联云能打破“强者恒强”的定律吗?
  7. matlab2016环境变量,matlab环境变量path
  8. Python分析中国大陆各直辖市及各省省会的平均工资与平均房价 课程报告+源码及数据
  9. GF系列卫星分辨率介绍
  10. 写给程序员的 2018 新年计划清单