概述

一般消息队列的是实现是支持两种模式的,即点对点,还有一种是topic发布订阅者模式,比如ACTIVEMQ。KAFKA也支持这两种模式,但是实现的原理不一样。

KAFKA 的消息被读取后,并不是马上删除,这样就可以重复读取。kafka 正式利用这种特性实现发布订阅者模式。

即在发布消息的时候,发布一个topic,可以使用配置多个消费者来消费,消费者使用分组来实现。比如一个topic ,有两个分组的消费者订阅。

那么发布一个消息的时候,两个分组的消费者可以读取到此条消息。

实现

配置两组消费者。

分组1

 <bean id="consumerProperties" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置组--><entry key="group.id" value="group1"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 创建consumerFactory bean --><bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties"/></constructor-arg></bean>

注意 这个分组的ID 是 group1

分组2

<bean id="consumerProperties2" class="java.util.HashMap"><constructor-arg><map><!-- 配置kafka的broke --><entry key="bootstrap.servers" value="${kafka.brokerurl}"/><!-- 配置组--><entry key="group.id" value="group2"/><entry key="enable.auto.commit" value="true"/><entry key="auto.commit.interval.ms" value="1000"/><entry key="session.timeout.ms" value="30000"/><entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/><entry key="value.deserializer" value="com.redxun.jms.ObjectDeSerializer"/></map></constructor-arg></bean><!-- 创建consumerFactory bean --><bean id="consumerFactory2" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"><constructor-arg><ref bean="consumerProperties2"/></constructor-arg></bean>

这里配置的分组是2 group2 。

我们使用代码测试发布消息:

IMessageProducer producer= MessageUtil.getProducer();LogEntity ent=new LogEntity();ent.setId("000000001");ent.setIp("192.168.1.1");ent.setAction("test");producer.send("logMessageQueue", ent);return "1";

我们在发布一个消息的时候,两个分组的消费者都读取到了这条消息,因此就实现了 发布订阅者模式。

转载于:https://www.cnblogs.com/yg_zhang/p/10194115.html

kafka 支持发布订阅相关推荐

  1. 搭建高吞吐量 Kafka 分布式发布订阅消息 集群

    搭建高吞吐量 Kafka 分布式发布订阅消息 集群 简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区. ...

  2. Kafka(分布式发布-订阅消息系统)

    一.简介 Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统. 它最初由LinkedIn公司开发,Linkedin于201 ...

  3. kafka 发布订阅_在Kafka中发布订阅模型

    kafka 发布订阅 这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 ...

  4. 在Kafka中发布订阅模型

    这是第四个柱中的一系列关于同步客户端集成与异步系统( 1, 2, 3 ). 在这里,我们将尝试了解Kafka的工作方式,以便正确利用其发布-订阅实现. 卡夫卡概念 根据官方文件 : Kafka是一种分 ...

  5. kafka为什么用java重写,kafka怎么发布订阅 怎么在java中实现

    匿名用户 1级 2017-03-28 回答 这是我们项目中用到的代码 public class ProducerService { private static Logger log = Logger ...

  6. Kafka分布式发布订阅消息系统

  7. kafka 基础知识梳理-kafka是一种高吞吐量的分布式发布订阅消息系统

    一.kafka 简介 今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点 ...

  8. 【自动驾驶】9.分布式通信技术之发布订阅,干货满满

    分布式通信技术之发布订阅 原文链接 前面我们一起学习了分布式通信中的远程调用(分布式通信技术之远程调用:RPC ).远程调用的核心是在网络服务层封装了通信协议.序列化.传输等操作,让用户调用远程服务如 ...

  9. kafka 发布-订阅模式_使用Apache Kafka作为消息系统的发布-订阅通信中的微服务,并通过集成测试进行了验证...

    kafka 发布-订阅模式 发布-订阅消息系统在任何企业体系结构中都起着重要作用,因为它可以实现可靠的集成而无需紧密耦合应用程序. 在解耦的系统之间共享数据的能力并不是一个容易解决的问题. 考虑一个企 ...

最新文章

  1. 网上几种常见校验码图片分析
  2. hdu1799 循环多少次?(组合递推公式的使用)
  3. Linux查看谁修改的文件,linux如何查看近来修改的文件
  4. Android 6.0 动态权限申请
  5. HDU 1176 免费馅饼 (动态规划、另类数塔)
  6. red hat linux 远程,Red Hat Linux 远程桌面 – 如何设置
  7. docker sonarqube 7.7 sonar-scanner-4.6.2 maven 安装、搭建+实战
  8. 恒强制版系统980_速来围观 | 恒强制版小图高级功能讲解
  9. 打印list_按之字形顺序打印二叉树
  10. 越知道自己要什么,越知道自己是什么
  11. 实用的摩斯编码(二)
  12. 红与黑题解(深搜入门ing)
  13. python read_csv chunk_Python chunk读取超大文件
  14. 《控制论导论》读书:基本概念
  15. 搜狗浏览器个人数据丢失解决方案
  16. 鱼鹰软件签约新三板挂牌企业风盛股份
  17. 从 HTTP 瞎逼逼到 HTTP/2
  18. WPF教程(五) XAML是什么?
  19. 白盒测试(单元测试JUnit使用断言assertThat中startsWith、endsWith方法)
  20. nginx禁止外网访问

热门文章

  1. C/C++ atol函数- C语言零基础入门教程
  2. 安卓的java无法访问网络_Android网络访问的基本方法
  3. 服务器修改用户组权限设置,如何:修改用户的权限
  4. [Deepin - Pycharm调试记录] Pyinstaller索引系统库问题
  5. java and dsl_Groovy语法糖以及DSL
  6. JAVA结课_一点心情,写java结课考试之前
  7. 萧县机器人_全国总决赛第一名!萧县杨楼的这位学生厉害了
  8. java线程6种状态转换,Java线程的生命周期和各种状态转换详解
  9. 查询列名在哪张表_探索SQL-多表查询
  10. getline没有与参数列表匹配的重载函数_C++新增基础功能解析—函数重载功能的使用...