为什么80%的码农都做不了架构师?>>>   

本文主要研究一下scalecube-cluster的GossipProtocol

GossipProtocol

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java

/*** Gossip Protocol component responsible for spreading information (gossips) over the cluster* members using infection-style dissemination algorithms. It provides reliable cross-cluster* broadcast.*/
public interface GossipProtocol {/** Starts running gossip protocol. After started it begins to receive and send gossip messages */void start();/** Stops running gossip protocol and releases occupied resources. */void stop();/*** Spreads given message between cluster members.** @return future result with gossip id once gossip fully spread.*/Mono<String> spread(Message message);/** Listens for gossips from other cluster members. */Flux<Message> listen();
}
  • GossipProtocol接口定义了start、stop、spread、listen方法

GossipProtocolImpl

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

public final class GossipProtocolImpl implements GossipProtocol {private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);// Qualifierspublic static final String GOSSIP_REQ = "sc/gossip/req";// Injectedprivate final Member localMember;private final Transport transport;private final GossipConfig config;// Local Stateprivate long currentPeriod = 0;private long gossipCounter = 0;private Map<String, GossipState> gossips = new HashMap<>();private Map<String, MonoSink<String>> futures = new HashMap<>();private List<Member> remoteMembers = new ArrayList<>();private int remoteMembersIndex = -1;// Disposablesprivate final Disposable.Composite actionsDisposables = Disposables.composite();// Subjectprivate final FluxProcessor<Message, Message> subject =DirectProcessor.<Message>create().serialize();private final FluxSink<Message> sink = subject.sink();// Scheduledprivate final Scheduler scheduler;/*** Creates new instance of gossip protocol with given memberId, transport and settings.** @param localMember local cluster member* @param transport cluster transport* @param membershipProcessor membership event processor* @param config gossip protocol settings* @param scheduler scheduler*/public GossipProtocolImpl(Member localMember,Transport transport,Flux<MembershipEvent> membershipProcessor,GossipConfig config,Scheduler scheduler) {this.transport = Objects.requireNonNull(transport);this.config = Objects.requireNonNull(config);this.localMember = Objects.requireNonNull(localMember);this.scheduler = Objects.requireNonNull(scheduler);// SubscribeactionsDisposables.addAll(Arrays.asList(membershipProcessor //.publishOn(scheduler).subscribe(this::onMemberEvent, this::onError),transport.listen().publishOn(scheduler).filter(this::isGossipReq).subscribe(this::onGossipReq, this::onError)));}@Overridepublic void start() {actionsDisposables.add(scheduler.schedulePeriodically(this::doSpreadGossip,config.getGossipInterval(),config.getGossipInterval(),TimeUnit.MILLISECONDS));}@Overridepublic void stop() {// Stop accepting gossip requests and spreading gossipsactionsDisposables.dispose();// Stop publishing eventssink.complete();}@Overridepublic Mono<String> spread(Message message) {return Mono.fromCallable(() -> message).subscribeOn(scheduler).flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));}@Overridepublic Flux<Message> listen() {return subject.onBackpressureBuffer();}private void onMemberEvent(MembershipEvent event) {Member member = event.member();if (event.isRemoved()) {remoteMembers.remove(member);}if (event.isAdded()) {remoteMembers.add(member);}}private void onGossipReq(Message message) {long period = this.currentPeriod;GossipRequest gossipRequest = message.data();for (Gossip gossip : gossipRequest.gossips()) {GossipState gossipState = gossips.get(gossip.gossipId());if (gossipState == null) { // new gossipgossipState = new GossipState(gossip, period);gossips.put(gossip.gossipId(), gossipState);sink.next(gossip.message());}gossipState.addToInfected(gossipRequest.from());}}private boolean isGossipReq(Message message) {return GOSSIP_REQ.equals(message.qualifier());}private String createAndPutGossip(Message message) {long period = this.currentPeriod;Gossip gossip = new Gossip(generateGossipId(), message);GossipState gossipState = new GossipState(gossip, period);gossips.put(gossip.gossipId(), gossipState);return gossip.gossipId();}//......}
  • GossipProtocolImpl实现了GossipProtocol接口,它维护了名为gossips的gossipId与GossipState的map,以及remoteMembers列表
  • 它的构造器订阅了membershipProcessor,触发onMemberEvent方法,该方法根据MembershipEvent来对remoteMembers进行添加或移除member;订阅了transport.listen(),过滤出GossipReq,触发onGossipReq方法,该方法合并GossipRequest的gossips到本地的gossips,对于新的gossip的message则发送到sink,并维护该gossip的gossipState,将请求的memberId添加到infected中;spread方法则将message放入到本地的gossips中
  • start方法每隔gossipInterval执行doSpreadGossip方法;spread方法则通过createAndPutGossip创建Gossip并放入gossips中

doSpreadGossip

scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

public final class GossipProtocolImpl implements GossipProtocol {//......private List<Member> remoteMembers = new ArrayList<>();private int remoteMembersIndex = -1;private void doSpreadGossip() {// Increment periodlong period = currentPeriod++;// Check any gossips existsif (gossips.isEmpty()) {return; // nothing to spread}try {// Spread gossips to randomly selected member(s)selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));// Sweep gossipssweepGossips(period);} catch (Exception ex) {LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);}}private void spreadGossipsTo(long period, Member member) {// Select gossips to sendList<Gossip> gossips = selectGossipsToSend(period, member);if (gossips.isEmpty()) {return; // nothing to spread}// Send gossip requestAddress address = member.address();gossips.stream().map(this::buildGossipRequestMessage).forEach(message ->transport.send(address, message).subscribe(null,ex ->LOGGER.debug("Failed to send GossipReq[{}]: {} to {}, cause: {}",period,message,address,ex.toString())));}private List<Gossip> selectGossipsToSend(long period, Member member) {int periodsToSpread =ClusterMath.gossipPeriodsToSpread(config.getGossipRepeatMult(), remoteMembers.size() + 1);return gossips.values().stream().filter(gossipState -> gossipState.infectionPeriod() + periodsToSpread >= period) // max rounds.filter(gossipState -> !gossipState.isInfected(member.id())) // already infected.map(GossipState::gossip).collect(Collectors.toList());}private List<Member> selectGossipMembers() {int gossipFanout = config.getGossipFanout();if (remoteMembers.size() < gossipFanout) { // select allreturn remoteMembers;} else { // select random members// Shuffle members initially and once reached top boundif (remoteMembersIndex < 0 || remoteMembersIndex + gossipFanout > remoteMembers.size()) {Collections.shuffle(remoteMembers);remoteMembersIndex = 0;}// Select membersList<Member> selectedMembers =gossipFanout == 1? Collections.singletonList(remoteMembers.get(remoteMembersIndex)): remoteMembers.subList(remoteMembersIndex, remoteMembersIndex + gossipFanout);// Increment index and return resultremoteMembersIndex += gossipFanout;return selectedMembers;}}private Message buildGossipRequestMessage(Gossip gossip) {GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id());return Message.withData(gossipRequest).qualifier(GOSSIP_REQ).sender(localMember.address()).build();}private void sweepGossips(long period) {// Select gossips to sweepint periodsToSweep =ClusterMath.gossipPeriodsToSweep(config.getGossipRepeatMult(), remoteMembers.size() + 1);Set<GossipState> gossipsToRemove =gossips.values().stream().filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep).collect(Collectors.toSet());// Check if anything selectedif (gossipsToRemove.isEmpty()) {return; // nothing to sweep}// Sweep gossipsLOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);for (GossipState gossipState : gossipsToRemove) {gossips.remove(gossipState.gossip().gossipId());MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId());if (sink != null) {sink.success(gossipState.gossip().gossipId());}}}//......}
  • doSpreadGossip方法首先递增currentPeriod,然后执行selectGossipMembers,遍历该member执行spreadGossipsTo,最后执行sweepGossips
  • selectGossipMembers方法会根据gossipFanout配置随机选择gossipFanout个member,这里维护了remoteMembersIndex,具体是对remoteMembers进行subList,当remoteMembersIndex小于0或remoteMembersIndex + gossipFanout > remoteMembers.size()时会Collections.shuffle(remoteMembers)并重置remoteMembersIndex为0,之后对remoteMembersIndex加上gossipFanout
  • spreadGossipsTo方法首先执行selectGossipsToSend获取要发送的gossips,然后通过buildGossipRequestMessage构造GOSSIP_REQ消息,最后通过transport.send方法发送
  • sweepGossips方法则选取periodsToSweep,然后从gossips移除period > gossipState.infectionPeriod() + periodsToSweep的gossipState

小结

  • GossipProtocol接口定义了start、stop、spread、listen方法;GossipProtocolImpl实现了GossipProtocol接口,它维护了名为gossips的gossipId与GossipState的map,以及remoteMembers列表
  • GossipProtocolImpl的构造器订阅了membershipProcessor,触发onMemberEvent方法,该方法根据MembershipEvent来对remoteMembers进行添加或移除member;订阅了transport.listen(),过滤出GossipReq,触发onGossipReq方法,该方法合并GossipRequest的gossips到本地的gossips,对于新的gossip的message则发送到sink,并维护该gossip的gossipState,将请求的memberId添加到infected中;spread方法则将message放入到本地的gossips中
  • GossipProtocolImpl的start方法每隔gossipInterval执行doSpreadGossip方法;spread方法则通过createAndPutGossip创建Gossip并放入gossips中;doSpreadGossip方法首先递增currentPeriod,然后执行selectGossipMembers,遍历该member执行spreadGossipsTo,最后执行sweepGossips

这里GossipProtocolImpl注册了onMemberEvent及onGossipReq,其中onMemberEvent用于监听MembershipEvent,并根据该event来维护remoteMembers列表;onGossipReq则是监听其他member的doSpreadGossip方法发送过来的GossipReq消息,合并该消息的gossips到本地的gossips;而doSpreadGossip方法则是每隔gossipInterval执行,根据gossipFanout配置随机选择gossipFanout个member,然后针对每个member选择要发送的gossips进行spread(onGossipReq及spread方法会更改gossips,而每隔gossipInterval触发的doSpreadGossip则从gossips选择待spread的消息进行发送)

doc

  • Gossip
  • GossipProtocol
  • GossipProtocolImpl

转载于:https://my.oschina.net/go4it/blog/3046056

聊聊scalecube-cluster的GossipProtocol相关推荐

  1. Redis实战(十三)Redis的三种集群方式

    序言 能聊聊redis cluster集群模式的原理吗 资料 https://www.cnblogs.com/51life/p/10233340.html Redis 集群分片原理 转载于:https ...

  2. 最新史上最好的Java面试突击课程第一季视频教程

    目录 ├─01_先来看一个互联网java工程师的招聘JD.zip ├─02_互联网Java工程师面试突击训练课程第一季的内容说明.zip ├─03_关于互联网Java工程师面试突击训练课程的几点说明. ...

  3. 聊聊storm的LoggingClusterMetricsConsumer

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下storm的LoggingClusterMetricsConsumer LoggingClusterMetrics ...

  4. Redis Cluster 原理你了解不?

    1. redis cluster 介绍 1.1 自动将数据进行分片,每个master上放一部分数据 1.2 提供内置的高可用支持,部分master不可用时,这是可以继续工作的 在redis clust ...

  5. mysql cluster_redislt;3.cluster集群模式gt;

    不点蓝字,我们哪来故事? 本文约:2000字 预计阅读时间:5分钟 1 前言 现在已经到了国庆的末尾了,大家这个国庆过的怎么样?是否已经顺利地从家中返航? 当你看到这篇文章的时候,就知道moon要来提 ...

  6. 聊聊分布式锁——Redis和Redisson的方式

    聊聊分布式锁--Redis和Redisson的方式 一.什么是分布式锁 分布式~~锁,要这么念,首先得是『分布式』,然后才是『锁』 分布式:这里的分布式指的是分布式系统,涉及到好多技术和理论,包括CA ...

  7. Redis都不懂?就别去面试了!聊聊我的Redis新专栏「视频版」

    前不久,有一个读者在后台留言,说他面试 Java 开发工程师岗位时,居然大部分的面试问题都是关于 Redis ,他都差点都忘记了自己应聘的是 Java 工程师了.而然这种现象在现在的后端面试中很常见, ...

  8. 源三:聊聊注册中心在蚂蚁集团的降本提效之路

    文|林育智(花名:源三 ) 蚂蚁集团高级专家 专注微服务/服务发现相关领域 校对|李旭东 本文 8624 字 阅读 18 分钟 |引 言| 服务发现是构建分布式系统的最重要的依赖之一, 在蚂蚁集团承担 ...

  9. 聊聊后端程序员的知识体系-第一篇

    聊聊后端程序员的知识体系-第一篇 原文链接:https://www.fpthinker.com/backend_knowledge_architecture/knowledge.htmll 亲爱的读者 ...

最新文章

  1. 和12岁小同志搞创客开发:Mind+编程软件简介、安装及使用
  2. 初学Windows编程笔记1——窗口和消息
  3. Android开源框架——内存泄漏检测工具 LeakCanary
  4. 真香!原来 CLI 开发可以这么简单
  5. QML 性能优化建议(二)
  6. php用go做跳转翻页,go.php跳转不输出权重的跳转方式真的有用么?
  7. Angular应用页面里_ngcontent属性的生成逻辑
  8. 最大流自用模板(例题:HDU1532)
  9. [即将举行的网络研讨会]对Kubernetes进行故障排除:您需要具备的7个关键组件
  10. python增加子类的参数_python 子类向父类传递关键字参数
  11. SQL查询成绩前3的student
  12. 接口参数,get和post
  13. linux vsftpd
  14. Springboot集成通用Mapper与Pagehelper,实现mybatis+Druid的多数据源配置
  15. python好用的内置库_python内置的高效好用各种库
  16. 王家林Spark 课程,蘑菇云,IMF真相
  17. 轻量纯css框架,网页设计的12个轻量CSS框架
  18. 教你十分钟写一个软件防火墙
  19. 从零开始学习菜鸟晋级黑客之黑客之“名词介绍”
  20. 微信十年,张小龙教给我们的产品方法论!(教科书一般的经典)

热门文章

  1. C++托管代码生成DLL
  2. IBM服务器raid5崩溃数据恢复方案及过程
  3. 伊利诺伊大学厄巴纳-香槟分校
  4. 什么叫双核、四核、八核?
  5. 今天什么日子啊,这么倒霉。。。
  6. Android进阶:七、Retrofit2.0原理解析之最简流程【上】
  7. HTTP2 基础知识点总结
  8. SVN、Git设置提交时忽略的文件
  9. 如何用技术搞好英俄翻译?
  10. Linux学习命令汇总三——Linux用户组管理,文件权限管理,文本搜索命令grep及正则表达式...