1. 引入maven依赖

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-kafka</artifactId><version>${spring-integration-kafka.version}</version>
</dependency>

2. 生产者的xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:int="http://www.springframework.org/schema/integration"xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><int:channel id="outWriteBackLemmaRecordChannel" /><int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"kafka-template="kafkaTemplate"auto-startup="true"channel="outWriteBackLemmaRecordChannel"order="3"topic="writeBackLemmaRecordTopic"><int-kafka:request-handler-advice-chain><bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" /></int-kafka:request-handler-advice-chain></int-kafka:outbound-channel-adapter><bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"><constructor-arg><bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"><constructor-arg><map><entry key="bootstrap.servers" value="1.1.1.1:9092,2.2.2.2:9092"/><entry key="retries" value="10"/><entry key="batch.size" value="16384"/><entry key="linger.ms" value="1"/><entry key="buffer.memory" value="33554432"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg></bean></constructor-arg><constructor-arg name="autoFlush" value="true"/><property name="defaultTopic" value="writeBackLemmaRecordTopic"/></bean><bean id="kafkaProducerService"class="com.soso.baike.admin.service.kafka.producer.impl.KafkaProducerServiceImpl"/>
</beans>

针对DefaultKafkaProducerFactory 的参数,本公司其实是配置注册到了zookeeper上,针对开发环境,预发环境,线上环境的配置是不同的,所以zookeeper上分别针对不同的环境注册了三套配置文件,发布的时候,会根据要发布的环境去zookeeper上拉取对应环境的配置文件,从而填充DefaultKafkaProducerFactory的构造参数

3. 发送消息

发送消息是上述配置文件中配置的KafkaProducerServiceImpl类

package com.soso.baike.admin.service.kafka.producer.impl;import com.soso.baike.admin.service.kafka.producer.IKafkaProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;public class KafkaProducerServiceImpl implements IKafkaProducerService {private Logger logger = LoggerFactory.getLogger("kafka");@Autowiredprivate KafkaTemplate<Integer, String> kafkaTemplate;<span style="white-space:pre">   </span>//这个已经在上述xml文件中配置@Overridepublic void sendMessage(String topic, String data) {logger.info("the message is to be send by kafka is : topic = {}, data = {}", topic, data);kafkaTemplate.setDefaultTopic(topic);kafkaTemplate.sendDefault(data);}@Overridepublic void sendMessage(String topic, int key, String data) {logger.info("the message is to be send by kafka is : topic = {}, data = {}", topic, data);kafkaTemplate.setDefaultTopic(topic);kafkaTemplate.sendDefault(key, data);}
}

4.  消费者xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd"><int-kafka:message-driven-channel-adapterid="kafkaMessageDrivenChannelAdapter"listener-container="kafkaMessageListenerContainer"auto-startup="true"phase="100"send-timeout="5000"channel="nullChannel"message-converter="messagingMessageConverter"error-channel="errorChannel"/><bean id="messagingMessageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/><bean id="kafkaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"><constructor-arg><bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><map><entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}"/><entry key="group.id" value="${kafka.consumer.group.id}"/><entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}"/><entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}"/><entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}"/><entry key="key.deserializer" value="${kafka.consumer.key.deserializer}"/><entry key="value.deserializer" value="${kafka.consumer.value.deserializer}"/></map></constructor-arg></bean></constructor-arg><constructor-arg><bean class="org.springframework.kafka.listener.config.ContainerProperties"><constructor-arg name="topics" value="writeBackLemmaRecordTopic"/></bean></constructor-arg></bean><!-- 实际执行消息消费的类 --><bean id="kafkaConsumerService"class="com.soso.baike.admin.service.kafka.consumer.impl.KafkaConsumerServiceImpl"/>
</beans>

上述DefaultKafkaConsumerFactory的构造参数就是在配置文件中配置的,这里你可以直接替换成实际的参数而不用配置文件

5. 接收消息类是上述配置文件中配置的KafkaConsumerServiceImpl类,代码如下:

package com.soso.baike.admin.service.kafka.consumer.impl;import com.soso.baike.admin.constant.KafkaConstants;
import com.soso.baike.admin.lmaimp.DummyUser;
import com.soso.baike.admin.service.kafka.consumer.IKafkaConsumerService;
import com.soso.baike.audit.Auditors;
import com.soso.baike.audit.db.LemmaAuditDao;
import com.soso.baike.audit.lemma.LemmaRecord;
import com.soso.baike.audit.lemma.LemmaWriteBackOp;
import com.soso.baike.domain.IdConvert;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** Created by zhangyongguang on 2016/6/30.*/
public class KafkaConsumerServiceImpl implements IKafkaConsumerService, InitializingBean {private Logger logger = LoggerFactory.getLogger("kafka");@Autowiredprivate KafkaMessageListenerContainer kafkaMessageListenerContainer;@Autowiredprivate LemmaWriteBackOp lemmaWriteBackOp;private int threadNum = 8;private int maxQueueSize = 2000;private ExecutorService executorService = new ThreadPoolExecutor(threadNum,threadNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(maxQueueSize),new ThreadPoolExecutor.CallerRunsPolicy());@Overridepublic void onMessage(ConsumerRecord<Integer, String> record) {logger.info("===============processMessage===============");logger.info("the kafka message is arriving with topic = {}, partition = {}, key = {}, value = {}",new Object[]{record.topic(), record.partition(), record.key(), record.value()});
<span style="white-space:pre"> </span>//这里收到消息后,开启了一个线程来处理<span style="white-space:pre"> </span>executorService.execute(new Runnable() {@Overridepublic void run() {String msg = record.value();}});}@Override<span style="white-space:pre">   </span>//设置监听public void afterPropertiesSet() throws Exception {ContainerProperties containerProperties = kafkaMessageListenerContainer.getContainerProperties();if (null != containerProperties) {containerProperties.setMessageListener(this);}}
}

使用spring集成的kafka收发消息相关推荐

  1. Java操作Kafka收发消息demo

    通过Java程序来进行Kafka收发消息的演示 Kafka自身提供的Java客户端来演示消息的收发,与Kafka的Java客户端相关的Maven依赖如下: <properties>< ...

  2. 第三集 Spring for Apache Kafka 接受消息

    我们可以接受消息通过配置一个MessageListenerContainer 和提供一个消息监听或者通过使用@KafkaListener 注解 3.1 Message Listeners 当我们使用一 ...

  3. 使用 CocoaPods 给微信集成 SDK 打印收发消息

    推荐序 本文介绍的是一套逆向工具,可以在非越狱手机上给任意应用增加插件.在文末的示例中,作者拿微信举例,展示出在微信中打印收发消息的功能. 这套工具可以加快逆向开发的速度,其重签名思想也可以用于二次分 ...

  4. 使用CocoaPods给微信集成SDK打印收发消息

    推荐序 本文介绍的是一套逆向工具,可以在非越狱手机上给任意应用增加插件.在文末的示例中,作者拿微信举例,展示出在微信中打印收发消息的功能. 这套工具可以加快逆向开发的速度,其重签名思想也可以用于二次分 ...

  5. spring cloud stream kafka 处理消息

    spring cloud stream kafka <dependency><groupId>org.springframework.cloud</groupId> ...

  6. Kafka实现消息生产和消费

    文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...

  7. Kafka 使用SASL接入点PLAIN机制收发消息(集成Spring Boot)

    默认接入点接入kafka可以参考: Spring工程项目使用拉取型消费者 KafkaConsumer的常用配置及注意事项 这里没有直接使用 spring封装的kafkaTemplate, 使用的是 原 ...

  8. spring集成kafka,以及常见错误解决

    spring集成kafka,以及常见错误解决 一.配置kafka 1.引入jar包 <!--Kafka和spring集成的支持类库,spring和kafka通信监听--><!-- h ...

  9. RabbitMQ –使用Spring集成Java DSL串行处理消息

    如果您曾经需要使用RabbitMQ来串行处理消息,并且有一群监听器来处理消息,那么我所看到的最好方法是在监听器上使用"独占消费者"标志,每个监听器上有1个线程来处理消息. 专用使用 ...

最新文章

  1. FD.io/VPP — IPSec NAT-T
  2. 屏蔽鼠标右键、Ctrl+N、Shift+F10、F11、F5刷新、退格键
  3. centos系统设置局域网静态IP
  4. Android之实现夸克浏览器书签和历史页面滑动时候右上角图标切换效果
  5. hdu 4810 Wall Painting
  6. linux7squid编译安装,CentOS 7.3 源码安装squid 4.12 及安装过程遇到的一些问题
  7. SPSS分析技术:多元方差分析
  8. 深入了解Android蓝牙Bluetooth——《基础篇》
  9. HP服务器集成 iLO 端口的配置
  10. poj1990两个树状数组
  11. anaconda安装torch_零基础入门PyTorch:怎样用?有哪些优势?手把手带你安装配置...
  12. 历史上的重大软件BUG启示录 第6篇---蠕虫“冲击波”
  13. html input属性都有啥,input 属性有哪些input标签常用属性
  14. 企业微信最全17种获客+4种自动转化玩法
  15. 网络协议—三要素与五层网络协议
  16. RabbitMQ解决消息幂等性问题
  17. 数字IC后端流程——(二)布局规划Floorplan
  18. 《Linux指令从入门到精通》——4.3 Linux下全屏幕文本编辑器的命令行方式
  19. Docker之API操作
  20. Java 查找数组中某个数字的下标

热门文章

  1. 获取Tekla属性方式
  2. Bytom国密网说明和指南
  3. C#中的 Stream
  4. 迭代器之输入和输出迭代器
  5. 在Sharepoint Designer 2007 中加入定制的工作流动作
  6. 去掉IE的图片工具条
  7. 【maven】新建一个maven项目的基本配置
  8. 08-02 性能测试--负载模型与压力来源
  9. php 发帖代码,我的论坛源代码(四)_php
  10. 服务器x不会下载mysql_MySQL_解决MySQL数据库死掉以及拒绝服务的方法,从Mysql 5.x的某个版本之后,MySQ - phpStudy...