ElasticMQ是一台消息服务器,具有Scala,Java和与Amazon SQS兼容的接口。 它通过跨服务器群集复制消息来支持有保证的消息传递,并通过日志记录实现消息持久性。
消息复制是ElasticMQ的核心功能之一。 但是,如果您看一下代码,则只有少数几个类,最长的类有76行(请记住,这是Scala,虽然;))。 这是因为ElasticMQ使用JGroups作为基础通信库。 JGroups已经很老了,特别是对于Java库而言-JGroups的第一个发行版是在1999年(!)。 但是,它远不是过时和过时的-它具有一个不错的API,可以正常工作,拥有一个良好的社区。 并且因为任何Java库都可以与Scala很好地协作。
JGroups具有许多有用的功能:
  • 可靠的组播
  • 集群管理
  • 故障检测
  • 节点发现
  • 多年的性能改进
它们广泛用于在ElasticMQ中实施复制。 以下是其完成方式的摘要。
ElasticMQ集群如何工作?
在单个ElasticMQ集群中,一个节点始终是主节点。 您只能对此节点执行操作。 然后将每个操作的结果复制到其他成员。 有两种与阻止有关的选项; 复制可以是完全异步的,也可以等待直到至少一个或所有节点确认该操作。 为了确保在群集分区的情况下不会从不同分区收到相同的消息,只有具有至少一半+1节点操作的分区处于活动状态。
ElasticMQ中的中心概念是消息存储。 存储器执行命令(例如,发送消息命令,删除消息命令等)。 复制层只是任何其他存储的包装。 但是请注意,我们只能复制产生的存储突变(因此在执行命令之后),而不是原始命令本身。 例如,如果命令是“接收消息”,则在每台计算机上执行该命令的结果可能会有所不同。 因此,如果接收消息成功,我们将仅复制消息可见性的更改(在ElasticMQ中,类似于Amazon SQS ,如果接收到消息,则会在指定的时间段内阻止后续接收该消息)。 您可以在JGroupsReplicatedStorage中看到此基本逻辑。
初始化集群
在开始复制之前,首先要做的是初始化集群。 这是在ReplicatedStorageConfigurator中完成的。 作为参数,我们需要一个JGroups配置文件,该文件是协议栈。 您实际上并不需要知道每种协议的功能以及所有这些配置参数的含义。 最有用的两个是udp.xml和tcp.xml 。 如果您的网络中有多播,则应使用第一个;如果所有通信都应通过TCP(例如,在EC2上),则应使用第二个。 在后一种情况下,您还需要提供初始IP列表。 该列表不必详尽无遗,只需列出种子即可。
拥有协议栈,ElasticMQ创建一个JChannel并将其连接,这仅意味着连接到集群。 实际上,这就是使用JGroups创建集群所需要做的所有工作-非常简单,对吧? 正如您在ReplicatedStorageConfigurator的末尾看到的那样,连接之后的第一件事是对channel.getState(null,0)的调用。 这将转到当前的主节点(稍后会进行有关主选举的更多信息),获取状态(当前的队列和消息)并将其应用于当前的节点(请参阅非常简单的JGroupsStateTransferMessageListener-处理发送和接收)。 这里有两件事要注意。 首先,此传输不会阻止整个群集正常运行。 其次,如果在状态转移期间执行了一项操作,则该操作也会被复制。 因此,可能会在新节点上执行一次命令两次。 但这无关紧要,因为每个复制的命令都是幂等的,因此可以多次应用。 在其他情况下,必须实施某些应用程序侧机制以防止此类情况。
复制数据
最后,我们进入核心:复制命令。 在发送方,这由JGroupsReplicationMessageSender处理。 同样,这不是一个非常复杂的类。 它使用来自JGroups的MessageDispatcher “构建块”,除了在整个集群中对消息进行多重处理外,还使您能够等待,直到指定数量的节点接收到它为止。 在接收方,我们有JGroupsRequestHandler 。 同样,非常简单。 收到消息后,它仅发送到存储。
集群管理
您可能还注意到SetMaster特殊消息。 用户需要此权限才能读取当前主节点的节点地址。 主选举(决定哪个节点是主节点)完全由JGroups处理。 JGroups中没有特定的算法来选举主节点,但是我们可以利用以下事实:每个节点都有相同的集群视图,由JGroups View类表示。 我们要做的就是简单地从列表中获取第一个(或最后一个或第3个,等等-只要在所有节点上都相同),然后将其设置为主节点即可。
群集视图由最后一个“核心”复制类JGroupsMembershipListener处理 。 那里发生了两件事。 每当新节点加入或离开集群时,都会调用viewAccepted回调。 每个具有View类的实例(很好,等于:))的节点。 主机在单独的线程中广播其地址(这是ElasticMQ服务器地址,而不是内部JGroups集群通信地址)。 在一个JGroups回调方法中执行阻塞操作是一个非常容易的错误。 您永远不应该那样做,因为整个堆栈都可以锁定。 我们还需要FLUSH协议(总是在集群设置过程中添加); 该协议可确保在所有节点都安装新视图之前,不发送新消息,因此,我们确保新节点始终接收主信息。
成员资格侦听器还处理集群合并。 同样,JGroups为我们提供了合并分区的视图以及新的合并视图。 在ElasticMQ中,除了主分区(最大分区)以外的所有分区都请求状态转移,就像连接到集群之后一样。 这样,数据将保持一致状态。
加起来
还值得注意的是,使用ScalaTest对ElasticMQ的复制进行了全面测试。 每个测试都会创建一个内存存储集群,创建新节点或模拟节点崩溃。 请参见JGroupsReplicatedStorageTest类。
有了JGroups的机制,就可以轻松实现集群通信。 但是,与往常一样,您需要记住一些有关并发的陷阱(例如,新节点加入时可能会有集群活动;分区和合并可能随时发生;正常消息和集群视图更改之间没有顺序) ;可以在状态转移期间发送消息;等等。 但是,JGroups 教程和手册都非常全面,并且得到了论坛的其他帮助(感谢Bela!),您应该一切顺利。
您可以通过下载独立的ElasticMQ 发行版或以嵌入式方式运行它,来尝试复制在实践中的工作方式。
参考:来自Adam Warski博客的Blog的 JCG合作伙伴 Adam Warski使用JGroups在ElasticMQ中实现消息复制 。

翻译自: https://www.javacodegeeks.com/2012/06/elasticmq-message-replication-with.html

使用JGroups进行ElasticMQ消息复制相关推荐

  1. jgroups传输消息_使用JGroups进行ElasticMQ消息复制

    jgroups传输消息 ElasticMQ是一个消息服务器,具有Scala,Java和与Amazon SQS兼容的接口. 它通过跨服务器群集复制消息来支持有保证的消息传递,并通过日志记录实现消息持久性 ...

  2. rocketmq怎么保证数据不会重复_阿里架构师亲授:Kafka和RocketMQ的消息复制实现的差异点在哪?...

    众所周知,消息队列在收发两端,主要是依靠业务代码,配合请求确认的机制,来保证消息不会丢失的.而在服务端,一般采用持久化和复制的方式来保证不丢消息. 把消息复制到多个节点上,不仅可以解决丢消息的问题,还 ...

  3. JAVA 多用户商城系统b2b2c-kafka处理超大消息

    Kafka设计的初衷是迅速处理短小的消息,一般10K大小的消息吞吐性能最好.但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如 ...

  4. 浅谈分布式消息技术 Kafka--大数据技术栈05

    回顾:大数据平台技术栈 (ps:可点击查看),今天就来说说其中的Kafka! 本文转载自 linkedkeeper.com (文/张松然) Kafka的基本介绍 Kafka是最初由Linkedin公司 ...

  5. “简单”的消息队列与kafka

    小时候就特别喜欢龙系精灵,特别是乘龙,后来才知道只是冰水...尴尬. 在宠物小精灵中,乘龙一直是训练家的载人伙伴,和我们下面的MQ好像有几分相似呢~~ 前言 MQ,全称消息队列,现在市面上有很多种消息 ...

  6. javax消息队列_Java面试—消息队列

    消息队列面试题 题目来自于中华石杉,解决方案根据自己的思路来总结而得. 题目主要如下: 1. 为什么要引入消息队列? 消息队列的引入可以解决3个核心问题: 解耦 异步 削峰 解耦 在一个项目中,如果一 ...

  7. rocketmq广播消息为什么不能重试_几分钟带你看懂“消息队列和RocketMQ”的入门总结

    消息队列扫盲 消息队列顾名思义就是存放消息的队列,队列我就不解释了,别告诉我你连队列都不知道似啥吧? 所以问题并不是消息队列是什么,而是 消息队列为什么会出现?消息队列能用来干什么?用它来干这些事会带 ...

  8. 还不知道事务消息吗?这篇文章带你全面扫盲

    目录 为什么需要事务消息? 事务消息 事务消息使用注意点 彩蛋 在分布式系统中,为了保证数据一致性是必须使用分布式事务.分布式事务实现方式就很多种,今天主要介绍一下使用 RocketMQ 事务消息,实 ...

  9. PWA(Progressive Web App)入门系列:消息通讯

    前言 serviceWorker 的能力决定它要处理的事情,网站页面的部分逻辑处理会转移到 serviceWorker 层进行处理,这里就要页面层和 serviceWorker 层进行交互来实现消息通 ...

最新文章

  1. ​《2021联邦学习全球研究与应用趋势报告》发布,中美为最大领跑者 | 附下载链接...
  2. Netty通信框架Java实现小记
  3. Python中的高效的集合操作
  4. C++ using namespace 命名空间的定义与使用
  5. vs2019中如何创建qt项目_在VS2015中创建Qt项目【VS+Qt项目开发系列】(二)
  6. 空格 过滤多个_CAD选择过滤器的运算符如何使用?
  7. mac下增加eclipse内存
  8. npm 私有库开源组件_苹果的ResearchKit,npm私有模块以及更多开源新闻
  9. 树莓派4B Raspbian-buster 更换源
  10. 计算机主板用塑料做的好吗,电脑主板包装的塑料袋为什么是用透明胶封的,这样...-卓优商学问答...
  11. Mmap的实现原理和应用
  12. VBA连接MySQL数据库以及ODBC的配置(ODBC版本和MySQL版本如果不匹配会出现驱动和应用程序的错误)...
  13. Mac 下利用 FileMerge 进行代码比较、合并
  14. c#随机产生常用汉字
  15. 人在广州,力撑陈 Sir
  16. Python入门笔记3
  17. 【PyTorch教程】P30 GPU加速
  18. np.array的axis进行横向纵向的求和运算
  19. CIE1931-XYZ转CCT和Duv
  20. Pytorch 实现全连接神经网络/卷积神经网络训练MNIST数据集,并将训练好的模型在制作自己的手写图片数据集上测试

热门文章

  1. 利用ant构建 jsp-servlet-class-jar
  2. java xmpp_Java XMPP负载测试工具
  3. stream分组计数_Java Stream:第2部分,计数始终是计数吗?
  4. java延时执行_Java谓词的延迟执行
  5. openshift命令_使用命令行工具创建WildFly OpenShift应用程序
  6. jpa 实体映射视图_JPA教程:实体映射-第3部分
  7. Angular 8 + Spring Boot 2.2:立即构建一个CRUD应用程序!
  8. maven项目 ant_将旧项目从Ant迁移到Maven的4个简单步骤
  9. AWS re:Invent 2018的5大公告
  10. orika 映射非空字段_Orika:将JAXB对象映射到业务/域对象