一、添加依赖项

compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'

二、发消息(生产者)

2.1 xml配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6
 7     <bean id="producerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <!--kafka的服务地址,多个地址用英文逗号连接-->
11                 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
12                 <entry key="group.id" value="0"/>
13                 <entry key="retries" value="10"/>
14                 <entry key="batch.size" value="16384"/>
15                 <entry key="linger.ms" value="1"/>
16                 <entry key="buffer.memory" value="33554432"/>
17                 <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
18                 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
19             </map>
20         </constructor-arg>
21     </bean>
22
23     <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
24         <constructor-arg>
25             <ref bean="producerProperties"/>
26         </constructor-arg>
27     </bean>
28
29     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
30         <constructor-arg ref="producerFactory"/>
31         <constructor-arg name="autoFlush" value="true"/>
32         <!--topic名字-->
33         <property name="defaultTopic" value="dc-monitor"/>
34     </bean>
35
36 </beans>

2.2 发送代码示例

    @Testpublic void send() throws InterruptedException, ExecutionException, TimeoutException {KafkaTemplate template = context.getBean(KafkaTemplate.class);String msg = "中华人民共和国万岁!";ListenableFuture<SendResult<String, String>> future = template.sendDefault(msg);SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);System.out.println("发送成功=====>" + msg);}

  

三、收消息(消费者)

3.1 xml配置

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6
 7     <bean id="consumerProperties" class="java.util.HashMap">
 8         <constructor-arg>
 9             <map>
10                 <!--kafka的服务地址,多个地址用英文逗号连接-->
11                 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/>
12                 <entry key="group.id" value="0"/>
13                 <entry key="enable.auto.commit" value="true"/>
14                 <entry key="auto.commit.interval.ms" value="1000"/>
15                 <entry key="session.timeout.ms" value="15000"/>
16                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
17                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
18             </map>
19         </constructor-arg>
20     </bean>
21
22     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
23         <constructor-arg ref="consumerProperties"/>
24     </bean>
25
26     <!-- 实际执行消息消费的类 -->
27     <bean id="kafkaConsumer" class="com.cnblogs.yjmyzz.consumer.DemoKafkaConsumer"/>
28
29     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
30         <!--topic名字-->
31         <constructor-arg value="dc-monitor"/>
32         <property name="messageListener" ref="kafkaConsumer"/>
33     </bean>
34
35     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
36           init-method="doStart">
37         <constructor-arg ref="consumerFactory"/>
38         <constructor-arg ref="containerProperties"/>
39     </bean>
40
41 </beans>

3.2 接收代码示例

public class DemoKafkaConsumer implements MessageListener<String, String> {@Overridepublic void onMessage(ConsumerRecord<String, String> data) {System.out.println("收到消息=====>" + data.value());}
}

 

spring集成kafka相关推荐

  1. spring集成kafka运行时报错:Failed to construct kafka producer] with root cause

    spring集成kafka运行时报错:Failed to construct kafka producer] with root cause org.apache.kafka.common.Kafka ...

  2. spring集成kafka,以及常见错误解决

    spring集成kafka,以及常见错误解决 一.配置kafka 1.引入jar包 <!--Kafka和spring集成的支持类库,spring和kafka通信监听--><!-- h ...

  3. spring 集成kafka:java.lang.NoSuchMethodError: org.springframework.util.Assert.state

    kafka 服务端版本 2.3.12 jdk 1.8 spring 版本 4.3.18 spring-kafka 版本 2.3.0 客户端调用报错信息如下: Exception in thread & ...

  4. 聊聊spring for kafka对consumer的封装与集成

    序 本文主要解析一下spring for kafka对原生的kafka client consumer的封装与集成. consumer工厂 spring-kafka-1.2.3.RELEASE-sou ...

  5. SpringBoot笔记:SpringBoot2.3集成Kafka组件配置

    文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...

  6. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

  7. Spring Apache Kafka教程

    在本SpringApache Kafka课程中,我们将学习如何在Spring Boot项目中开始使用Apache Kafka,并开始生成和使用我们所选主题的消息. 除了一个简单的项目外,我们还将深入探 ...

  8. SpringCloud学习之SpringCloudStream集成kafka

    一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框 ...

  9. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

最新文章

  1. Javascript全局变量和delete
  2. 10.29T1 倍增+同余
  3. MFC,ADO方式实现数据库操作
  4. Django REST framework快速入门
  5. JS URL参数传递 谷歌乱码解决
  6. java tf值搜索_搜索引擎优化 TF_IDF之Java实现
  7. Xcode生成的可执行文件在哪里
  8. php 递归太多报错,php – javascript太多的递归?
  9. 第七章节 类的抽象(抽象类)
  10. searchBar没有光标解决方法
  11. 在GridView分页过程中,CheckBox的选中状态不丢失
  12. asp:树型select菜单
  13. Shell脚本基础语法教程
  14. 几种网赚项目引流的方法
  15. java scene_JavaFX中场景(Scene)的意义是什么?
  16. 关于货币流动的一点思考
  17. phalapi做登录检测_Phalapi使用教程
  18. Java 生成N位随机数的方法
  19. radius mysql md5_Radius认证
  20. iOS高仿微信项目、阴影圆角渐变色效果、卡片动画、波浪动画、路由框架等源码

热门文章

  1. linux分区知识,Linux硬盘分区知识
  2. Java创建MySQL句柄_MySQL创建用户(CREATE USER)
  3. 三支一扶计算机公共基础知识,三支一扶公共基础知识教材(精华版).doc
  4. 保镖机器人作文_我的保镖作文500字
  5. Docker容器的使用方法
  6. 软考高项之沟通与干系人管理-背诵记忆
  7. 数据库系统概论:第四章 数据库安全性
  8. java静态分派_Java基础——重载、静态分派与动态分派
  9. Linux下安装MongoDB单节点
  10. GitBash上传项目出现[fatal: remote origin already exists.]问题解决方案