spring集成kafka
一、添加依赖项
compile 'org.springframework.kafka:spring-kafka:1.2.2.RELEASE'
二、发消息(生产者)
2.1 xml配置
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans.xsd"> 6 7 <bean id="producerProperties" class="java.util.HashMap"> 8 <constructor-arg> 9 <map> 10 <!--kafka的服务地址,多个地址用英文逗号连接--> 11 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/> 12 <entry key="group.id" value="0"/> 13 <entry key="retries" value="10"/> 14 <entry key="batch.size" value="16384"/> 15 <entry key="linger.ms" value="1"/> 16 <entry key="buffer.memory" value="33554432"/> 17 <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/> 18 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/> 19 </map> 20 </constructor-arg> 21 </bean> 22 23 <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 24 <constructor-arg> 25 <ref bean="producerProperties"/> 26 </constructor-arg> 27 </bean> 28 29 <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> 30 <constructor-arg ref="producerFactory"/> 31 <constructor-arg name="autoFlush" value="true"/> 32 <!--topic名字--> 33 <property name="defaultTopic" value="dc-monitor"/> 34 </bean> 35 36 </beans>
2.2 发送代码示例
@Testpublic void send() throws InterruptedException, ExecutionException, TimeoutException {KafkaTemplate template = context.getBean(KafkaTemplate.class);String msg = "中华人民共和国万岁!";ListenableFuture<SendResult<String, String>> future = template.sendDefault(msg);SendResult<String, String> result = future.get(10, TimeUnit.SECONDS);System.out.println("发送成功=====>" + msg);}
三、收消息(消费者)
3.1 xml配置
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans.xsd"> 6 7 <bean id="consumerProperties" class="java.util.HashMap"> 8 <constructor-arg> 9 <map> 10 <!--kafka的服务地址,多个地址用英文逗号连接--> 11 <entry key="bootstrap.servers" value="192.168.0.10:9092,192.168.0.11:9092,192.168.0.12:9092"/> 12 <entry key="group.id" value="0"/> 13 <entry key="enable.auto.commit" value="true"/> 14 <entry key="auto.commit.interval.ms" value="1000"/> 15 <entry key="session.timeout.ms" value="15000"/> 16 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 17 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> 18 </map> 19 </constructor-arg> 20 </bean> 21 22 <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> 23 <constructor-arg ref="consumerProperties"/> 24 </bean> 25 26 <!-- 实际执行消息消费的类 --> 27 <bean id="kafkaConsumer" class="com.cnblogs.yjmyzz.consumer.DemoKafkaConsumer"/> 28 29 <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> 30 <!--topic名字--> 31 <constructor-arg value="dc-monitor"/> 32 <property name="messageListener" ref="kafkaConsumer"/> 33 </bean> 34 35 <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" 36 init-method="doStart"> 37 <constructor-arg ref="consumerFactory"/> 38 <constructor-arg ref="containerProperties"/> 39 </bean> 40 41 </beans>
3.2 接收代码示例
public class DemoKafkaConsumer implements MessageListener<String, String> {@Overridepublic void onMessage(ConsumerRecord<String, String> data) {System.out.println("收到消息=====>" + data.value());}
}
spring集成kafka相关推荐
- spring集成kafka运行时报错:Failed to construct kafka producer] with root cause
spring集成kafka运行时报错:Failed to construct kafka producer] with root cause org.apache.kafka.common.Kafka ...
- spring集成kafka,以及常见错误解决
spring集成kafka,以及常见错误解决 一.配置kafka 1.引入jar包 <!--Kafka和spring集成的支持类库,spring和kafka通信监听--><!-- h ...
- spring 集成kafka:java.lang.NoSuchMethodError: org.springframework.util.Assert.state
kafka 服务端版本 2.3.12 jdk 1.8 spring 版本 4.3.18 spring-kafka 版本 2.3.0 客户端调用报错信息如下: Exception in thread & ...
- 聊聊spring for kafka对consumer的封装与集成
序 本文主要解析一下spring for kafka对原生的kafka client consumer的封装与集成. consumer工厂 spring-kafka-1.2.3.RELEASE-sou ...
- SpringBoot笔记:SpringBoot2.3集成Kafka组件配置
文章目录 说明 Springboot集成Kafka 依赖配置 配置文件yml配置 Producer生产者 Consumer消费者 测试代码 注意事项 说明 本文是接<Kafka学习:CentOS ...
- springboot集成kafka及kafka web UI的使用
springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...
- Spring Apache Kafka教程
在本SpringApache Kafka课程中,我们将学习如何在Spring Boot项目中开始使用Apache Kafka,并开始生成和使用我们所选主题的消息. 除了一个简单的项目外,我们还将深入探 ...
- SpringCloud学习之SpringCloudStream集成kafka
一.关于Spring-Cloud-Stream Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框 ...
- 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...
最新文章
- Javascript全局变量和delete
- 10.29T1 倍增+同余
- MFC,ADO方式实现数据库操作
- Django REST framework快速入门
- JS URL参数传递 谷歌乱码解决
- java tf值搜索_搜索引擎优化 TF_IDF之Java实现
- Xcode生成的可执行文件在哪里
- php 递归太多报错,php – javascript太多的递归?
- 第七章节 类的抽象(抽象类)
- searchBar没有光标解决方法
- 在GridView分页过程中,CheckBox的选中状态不丢失
- asp:树型select菜单
- Shell脚本基础语法教程
- 几种网赚项目引流的方法
- java scene_JavaFX中场景(Scene)的意义是什么?
- 关于货币流动的一点思考
- phalapi做登录检测_Phalapi使用教程
- Java 生成N位随机数的方法
- radius mysql md5_Radius认证
- iOS高仿微信项目、阴影圆角渐变色效果、卡片动画、波浪动画、路由框架等源码
热门文章
- linux分区知识,Linux硬盘分区知识
- Java创建MySQL句柄_MySQL创建用户(CREATE USER)
- 三支一扶计算机公共基础知识,三支一扶公共基础知识教材(精华版).doc
- 保镖机器人作文_我的保镖作文500字
- Docker容器的使用方法
- 软考高项之沟通与干系人管理-背诵记忆
- 数据库系统概论:第四章 数据库安全性
- java静态分派_Java基础——重载、静态分派与动态分派
- Linux下安装MongoDB单节点
- GitBash上传项目出现[fatal: remote origin already exists.]问题解决方案