准备工作

kafka版本:kafka_2.10-0.10.1.0

spring版本:spring4.3

配置文件

pom文件配置(也可以直接下载jar包)

Kafka和spring集成的支持类库,spring和kafka通信监听

1 <dependency>
2   <groupId>org.springframework.integration</groupId>
3   <artifactId>spring-integration-kafka</artifactId>
4   <version>1.3.0.RELEASE</version>
5 </dependency>

kafka发送消息以及接受消息使用的类库

1 <dependency>
2     <groupId>org.apache.kafka</groupId>
3     <artifactId>kafka-clients</artifactId>
4     <version>0.10.1.0</version>
5 </dependency>

使用高版本是因为低版本无法支持kafka监听,spring和kafka集成不好

1 <dependency>
2     <groupId>org.springframework</groupId>
3     <artifactId>spring-webmvc</artifactId>
4     <version>4.3.0.RELEASE</version>
5 </dependency>

kafka自带监听器,依赖于spring,所以需要和pring-integration-kafka结合使用

1 <dependency>
2     <groupId>org.springframework.kafka</groupId>
3     <artifactId>spring-kafka</artifactId>
4     <version>1.0.0.RC1</version>
5 </dependency>

producer配置

  1.如果你的topic没有设置名称,按照默认的topic的名字生成对应的数据文件夹。

  2.producerListener用来判断kafka发送数据是否成功以及发送反馈信息。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
 4     xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd
 6          http://www.springframework.org/schema/context
 7          http://www.springframework.org/schema/context/spring-context.xsd">
 8
 9     <!-- 定义producer的参数 -->
10     <bean id="producerProperties" class="java.util.HashMap">
11         <constructor-arg>
12             <map>
13                 <entry key="bootstrap.servers" value="localhost:7000" />
14                 <entry key="group.id" value="0" />
15                 <entry key="retries" value="1" />
16                 <entry key="batch.size" value="16384" />
17                 <entry key="linger.ms" value="1" />
18                 <entry key="buffer.memory" value="33554432" />
19                 <entry key="key.serializer"
20                 value="org.apache.kafka.common.serialization.StringSerializer" />
21                 <entry key="value.serializer"
22                 value="org.apache.kafka.common.serialization.StringSerializer" />
23             </map>
24         </constructor-arg>
25     </bean>
26
27     <!-- 创建kafkatemplate需要使用的producerfactory bean -->
28     <bean id="producerFactory"
29         class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
30         <constructor-arg>
31             <ref bean="producerProperties" />
32         </constructor-arg>
33     </bean>
34
35     <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
36     <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
37         <constructor-arg ref="producerFactory" />
38         <constructor-arg name="autoFlush" value="true" />
39         <property name="defaultTopic" value="defaultTopic" />
40         <property name="producerListener" ref="producerListener"/>
41     </bean>
42
43     <bean id="producerListener" class="com.git.kafka.producer.KafkaProducerListener" />
44 </beans>

consumer配置

  1.使用kafka的listener进行消息消费监听,如果有消费消息进入会自动调用OnMessage方法进行消息消费以及后续业务处理。

  2.如果要配置多个topic,需要创建新的消费者容器,然后统一指向listner的消息处理类,统一让这个类进行后续业务处理。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4      xmlns:context="http://www.springframework.org/schema/context"
 5      xsi:schemaLocation="http://www.springframework.org/schema/beans
 6      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 7      http://www.springframework.org/schema/tx
 8      http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
 9      http://www.springframework.org/schema/jee
10      http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
11      http://www.springframework.org/schema/context
12       http://www.springframework.org/schema/context/spring-context-3.0.xsd">
13
14
15     <!-- 定义consumer的参数 -->
16      <bean id="consumerProperties" class="java.util.HashMap">
17          <constructor-arg>
18              <map>
19                  <entry key="bootstrap.servers" value="127.0.0.1:7000"/>
20                  <entry key="group.id" value="0"/>
21                  <entry key="enable.auto.commit" value="false"/>
22                  <entry key="auto.commit.interval.ms" value="1000"/>
23                  <entry key="session.timeout.ms" value="15000"/>
24                  <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
25                  <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
26              </map>
27          </constructor-arg>
28      </bean>
29
30      <!-- 创建consumerFactory bean -->
31      <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
32          <constructor-arg>
33              <ref bean="consumerProperties"/>
34          </constructor-arg>
35      </bean>
36
37      <!-- 实际执行消息消费的类 -->
38      <bean id="messageListernerConsumerService" class="com.git.kafka.consumer.KafkaConsumerServer"/>
39
40      <!-- 消费者容器配置信息 -->
41      <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
42          <constructor-arg value="order_test_topic"/>
43          <property name="messageListener" ref="messageListernerConsumerService"/>
44      </bean>
45      <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
46          <constructor-arg value="other_test_topic"/>
47          <property name="messageListener" ref="messageListernerConsumerService"/>
48      </bean>
49
50      <!-- 创建messageListenerContainer bean,使用的时候,只需要注入这个bean -->
51      <bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
52          init-method="doStart">
53          <constructor-arg ref="consumerFactory"/>
54          <constructor-arg ref="containerProperties_trade"/>
55      </bean>
56
57      <bean id="messageListenerContainer_other" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
58          init-method="doStart">
59          <constructor-arg ref="consumerFactory"/>
60          <constructor-arg ref="containerProperties_other"/>
61      </bean>
62
63 </beans>

applicationContext配置

1 <import resource="classpath:kafkaConsumer.xml" />
2 <import resource="classpath:kafkaProducer.xml" />

具体实现

constant.java  //常量类

 1 package com.git.kafka.constant;
 2
 3 /**
 4  * kafkaMessageConstant
 5  * @author wangb
 6  *
 7  */
 8 public class KafkaMesConstant {
 9
10     public static final String SUCCESS_CODE = "00000";
11     public static final String SUCCESS_MES = "成功";
12
13     /*kakfa-code*/
14     public static final String KAFKA_SEND_ERROR_CODE = "30001";
15     public static final String KAFKA_NO_RESULT_CODE = "30002";
16     public static final String KAFKA_NO_OFFSET_CODE = "30003";
17
18     /*kakfa-mes*/
19     public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系相关技术人员";
20     public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系相关技术人员";
21     public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系相关技术人员";
22
23
24 }

KafkaConsumerServer.java  //消费者监听

 1 package com.git.kafka.consumer;
 2
 3 import org.apache.kafka.clients.consumer.ConsumerRecord;
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 import org.springframework.kafka.listener.MessageListener;
 7
 8 /**
 9  * kafka监听器启动
10  * 自动监听是否有消息需要消费
11  * @author wangb
12  *
13  */
14 public class KafkaConsumerServer implements MessageListener<String, String> {
15     protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");
16     /**
17      * 监听器自动执行该方法
18      *     消费消息
19      *     自动提交offset
20      *     执行业务代码
21      *     (high level api 不提供offset管理,不能指定offset进行消费)
22      */
23     @Override
24     public void onMessage(ConsumerRecord<String, String> record) {
25         LOG.info("=============kafkaConsumer开始消费=============");
26         String topic = record.topic();
27         String key = record.key();
28         String value = record.value();
29         long offset = record.offset();
30         int partition = record.partition();
31         LOG.info("-------------topic:"+topic);
32         LOG.info("-------------value:"+value);
33         LOG.info("-------------key:"+key);
34         LOG.info("-------------offset:"+offset);
35         LOG.info("-------------partition:"+partition);
36         LOG.info("~~~~~~~~~~~~~kafkaConsumer消费结束~~~~~~~~~~~~~");
37     }
38
39 }

kafkaProducerListener.java  //生产者监听-打印日志

package com.git.kafka.producer;import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;/*** kafkaProducer监听器,在producer配置文件中开启* @author wangb**/
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener{protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");/*** 发送消息成功后调用*/@Overridepublic void onSuccess(String topic, Integer partition, Object key,Object value, RecordMetadata recordMetadata) {LOG.info("==========kafka发送数据成功(日志开始)==========");LOG.info("----------topic:"+topic);LOG.info("----------partition:"+partition);LOG.info("----------key:"+key);LOG.info("----------value:"+value);LOG.info("----------RecordMetadata:"+recordMetadata);LOG.info("~~~~~~~~~~kafka发送数据成功(日志结束)~~~~~~~~~~");}/*** 发送消息错误后调用*/@Overridepublic void onError(String topic, Integer partition, Object key,Object value, Exception exception) {LOG.info("==========kafka发送数据错误(日志开始)==========");LOG.info("----------topic:"+topic);LOG.info("----------partition:"+partition);LOG.info("----------key:"+key);LOG.info("----------value:"+value);LOG.info("----------Exception:"+exception);LOG.info("~~~~~~~~~~kafka发送数据错误(日志结束)~~~~~~~~~~");exception.printStackTrace();}/*** 方法返回值代表是否启动kafkaProducer监听器*/@Overridepublic boolean isInterestedInSuccess() {LOG.info("///kafkaProducer监听器启动///");return true;}}

KafkaProducerServer.java  //生产者

package com.git.kafka.producer;import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import com.alibaba.fastjson.JSON;
import com.git.kafka.constant.KafkaMesConstant;/*** kafkaProducer模板*     使用此模板发送消息* @author wangb**/
@Component
public class KafkaProducerServer{@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** kafka发送消息模板* @param topic 主题* @param value    messageValue* @param ifPartition 是否使用分区 0是\1不是* @param partitionNum 分区数 如果是否使用分区为0,分区数必须大于0* @param role 角色:bbc app erp...*/public Map<String,Object> sndMesForTemplate(String topic, Object value, String ifPartition, Integer partitionNum, String role){String key = role+"-"+value.hashCode();String valueString = JSON.toJSONString(value);if(ifPartition.equals("0")){//表示使用分区int partitionIndex = getPartitionIndex(key, partitionNum);ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, partitionIndex, key, valueString);Map<String,Object> res = checkProRecord(result);return res;}else{ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);Map<String,Object> res = checkProRecord(result);return res;}}/*** 根据key值获取分区索引* @param key* @param partitionNum* @return*/private int getPartitionIndex(String key, int partitionNum){if (key == null) {Random random = new Random();return random.nextInt(partitionNum);}else {int result = Math.abs(key.hashCode())%partitionNum;return result;}}/*** 检查发送返回结果record* @param res* @return*/@SuppressWarnings("rawtypes")private Map<String,Object> checkProRecord(ListenableFuture<SendResult<String, String>> res){Map<String,Object> m = new HashMap<String,Object>();if(res!=null){try {SendResult r = res.get();//检查result结果集/*检查recordMetadata的offset数据,不检查producerRecord*/Long offsetIndex = r.getRecordMetadata().offset();if(offsetIndex!=null && offsetIndex>=0){m.put("code", KafkaMesConstant.SUCCESS_CODE);m.put("message", KafkaMesConstant.SUCCESS_MES);return m;}else{m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);return m;}} catch (InterruptedException e) {e.printStackTrace();m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);return m;} catch (ExecutionException e) {e.printStackTrace();m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);return m;}}else{m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);return m;}}}

KafkaProducerTest.java  //kafka生产者测试(消费者使用spring启动监听,自动执行onMessage方法)

package com.git.test;import java.util.Map;import com.git.kafka.producer.KafkaProducerServer;public class KafkaProducerTest {public static void main(String[] args) {KafkaProducerServer kafkaProducer = new KafkaProducerServer();String topic = "orderTopic";String value = "test";String ifPartition = "0";Integer partitionNum = 3;String role = "test";//用来生成keyMap<String,Object> res = kafkaProducer.sndMesForTemplate(topic, value, ifPartition, partitionNum, role);System.out.println("测试结果如下:===============");String message = (String)res.get("message");String code = (String)res.get("code");System.out.println("code:"+code);System.out.println("message:"+message);}
}

转载自https://www.cnblogs.com/wangb0402/p/6187796.html

kafka与Spring的集成相关推荐

  1. Kafka——使用spring进行集成

    生产者: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://w ...

  2. Kafka 入门和 Spring Boot 集成

    2019独角兽企业重金招聘Python工程师标准>>> Kafka 入门和 Spring Boot 集成 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流 ...

  3. spring cloud微服务分布式云架构 - Spring Cloud集成项目简介

    Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Clo ...

  4. 关于SpringCloud微服务云架构构建B2B2C电子商务平台之- Spring Cloud集成项目简介(三)...

    2019独角兽企业重金招聘Python工程师标准>>> Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的 ...

  5. (三)java版spring cloud+spring boot 社交电子商务平台 - Spring Cloud集成项目简介

    2019独角兽企业重金招聘Python工程师标准>>> 电子商务平台源码请加企鹅求求:一零三八七七四六二六.Spring Cloud集成项目有很多,下面我们列举一下和Spring C ...

  6. Linux 安装Redis-6.2.5,配置及使用(RDB与AOF持久化、sentinel机制、主从复制、Spring Boot 集成 Redis)

    CentOS 7 安装Redis-6.2.5版本 Redis采用的是基于内存的单进程 单线程模型 的KV数据库,由C语言编写.官方提供的数据是可以达到100000+的qps 应用场景: 令牌(Toke ...

  7. Spring boot集成axis2开发webservice 服务

    Spring boot集成axis2开发webservice 服务 1.新建Spring boot 项目 此处省略... 项目结构如下: 2.添加Axis2依赖 <!--axis2版本信息--& ...

  8. spring boot 集成sleuth

    spring boot 集成sleuth 1. 理论 1.1 sleuth是什么 1.2 sleuth有哪些 1.3 链路追踪的一些基本概念 1.4 zipkin的组成 2. zipkin 实例 2. ...

  9. spring cloud微服务分布式云架构 - Spring Cloud集成项目简介(三)

    点击上面 免费订阅本账号! 本文作者:it菲菲 原文:https://yq.aliyun.com/articles/672242 点击阅读全文前往 Spring Cloud集成项目有很多,下面我们列举 ...

最新文章

  1. 安卓java读取软件自身包名,android 获取第三方应用程序包名并启动 | 学步园
  2. 进入编辑模式、vim命令模式、vim实践
  3. Android——Intent动作汇总(转)
  4. Linux系统之更改默认块大小
  5. java 图片导出_java导出含图片的word
  6. Springboot第二篇:与前端fetch通信(附springboot解决跨域方法)
  7. 解决Eclipse 项目报错:Unbound classpath container
  8. Lync2010服务器的高可用性部署测试
  9. Linux网络协议栈:中断下半部处理
  10. 重装系统Win10电脑磁盘被写保护怎么办
  11. crontab实现以秒执行,很好很强大
  12. BrnShop开源网上商城第三讲:插件的工作机制
  13. 36. 理解copy_if算法的正确实现
  14. idea 设置版权信息
  15. mysql之事务 锁(三)
  16. 计算机常用单词 pdf,关于计算机的常用词汇.pdf
  17. android图片做平移动画,Android中用Matrix实现ImageView里的图片平移和缩放动画
  18. [学习Cython编程]Cython编程入门
  19. 【排序】一次查找两元素
  20. 输入学生成绩,输出学生成绩等级,学习成绩>=90且<=100分的同学用A表示,60-89分之间的用B表示,<60且>0分以下的用C表示。其余输入都有误!“

热门文章

  1. 谷歌补丁Android官网,谷歌已发布 19年12月 Android 安全补丁
  2. The history and design philosophy of Spring
  3. javaScript从入门到精通3.md
  4. 2345联盟通过流氓软件推广挖矿工具, 众多用户电脑沦为“肉鸡”
  5. oracle Group by 分组查询后,分页
  6. linux内存管理总结
  7. spark 性能优化
  8. u盘 连接服务器系统软件,u盘服务器系统
  9. 大同大学计算机与网络工程学院,计算机与网络工程学院代表队 在第十一届“蓝桥杯”全国软件和信息技术人才大赛全国决赛中再获佳绩...
  10. (89)多周期时序约束(四)