带着问题读源码系列-soul的本地服务筛选

在上一期中,了解到soul的http请求是通过dividePlugin插件完成对本地服务的筛选。

总体来说,可以分为两步:

1. 选出符合调用要求的服务列表

2. 对服务的列表进行负载均衡

下面对两部分源码进行分析。

在DividePlugin插件中,可以轻松找到对应的步骤。

final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId()); // 选出符合调用要求的服务列表if (CollectionUtils.isEmpty(upstreamList)) {log.error("divide upstream configuration error: {}", rule.toString());Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip); // 对服务的列表进行负载均衡if (Objects.isNull(divideUpstream)) {log.error("divide has no upstream");Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// set the http urlString domain = buildDomain(divideUpstream);String realURL = buildRealURL(domain, soulContext, exchange);exchange.getAttributes().put(Constants.HTTP_URL, realURL);// set the http timeoutexchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());return chain.execute(exchange);

选出符合调用要求的服务列表

通过命名可以看出 UpstreamCacheManager 是一个单例类。分析其源码

public final class UpstreamCacheManager {private static final UpstreamCacheManager INSTANCE = new UpstreamCacheManager();private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP = Maps.newConcurrentMap();private static final Map<String, List<DivideUpstream>> UPSTREAM_MAP_TEMP = Maps.newConcurrentMap();/*** suggest soul.upstream.scheduledTime set 1 SECONDS.*/private UpstreamCacheManager() {boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));if (check) {new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-        upstream-task", false)).scheduleWithFixedDelay(this::scheduled, 30,         Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")),    TimeUnit.SECONDS);}}/*** Gets instance.** @return the instance*/public static UpstreamCacheManager getInstance() {return INSTANCE;}/*** Find upstream list by selector id list.** @param selectorId the selector id* @return the list*/public List<DivideUpstream> findUpstreamListBySelectorId(final String selectorId) {return UPSTREAM_MAP_TEMP.get(selectorId);}/*** Remove by key.** @param key the key*/public void removeByKey(final String key) {UPSTREAM_MAP_TEMP.remove(key);}/*** Submit.** @param selectorData the selector data*/public void submit(final SelectorData selectorData) {final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);if (null != upstreamList && upstreamList.size() > 0) {UPSTREAM_MAP.put(selectorData.getId(), upstreamList);UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);} else {UPSTREAM_MAP.remove(selectorData.getId());UPSTREAM_MAP_TEMP.remove(selectorData.getId());}}private void scheduled() {if (UPSTREAM_MAP.size() > 0) {UPSTREAM_MAP.forEach((k, v) -> {List<DivideUpstream> result = check(v);if (result.size() > 0) {UPSTREAM_MAP_TEMP.put(k, result);} else {UPSTREAM_MAP_TEMP.remove(k);}});}}private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size());for (DivideUpstream divideUpstream : upstreamList) {final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());if (pass) {if (!divideUpstream.isStatus()) {divideUpstream.setTimestamp(System.currentTimeMillis());divideUpstream.setStatus(true);log.info("UpstreamCacheManager detect success the url: {}, host: {} ", divideUpstream.getUpstreamUrl(), divideUpstream.getUpstreamHost());}resultList.add(divideUpstream);} else {divideUpstream.setStatus(false);log.error("check the url={} is fail ", divideUpstream.getUpstreamUrl());}}return resultList;}}

通过阅读上述代码,UPSTREAM_MAP_TEMP 为 ConcurrentHashMap 存储了选择器数据相关信息。key为选择器id,值为选择器信息,方便检索。 而 UPSTREAM_MAP 存储的是实际的选择器相关信息,并且在构造器中启动一个线程每30s更新 UPSTREAM_MAP_TEMP 的信息(将 UPSTREAM_MAP 内相关信息拷贝到 UPSTREAM_MAP_TEMP 提供给外部访问 )。

而对外提供注册的接口是 submit。

而进行注册的是是插件订阅器

public class CommonPluginDataSubscriber implements PluginDataSubscriber {private final Map<String, PluginDataHandler> handlerMap;/*** Instantiates a new Common plugin data subscriber.** @param pluginDataHandlerList the plugin data handler list*/public CommonPluginDataSubscriber(final List<PluginDataHandler> pluginDataHandlerList) {this.handlerMap = pluginDataHandlerList.stream().collect(Collectors.toConcurrentMap(PluginDataHandler::pluginNamed, e -> e));}@Overridepublic void onSubscribe(final PluginData pluginData) {subscribeDataHandler(pluginData, DataEventTypeEnum.UPDATE);}@Overridepublic void unSubscribe(final PluginData pluginData) {subscribeDataHandler(pluginData, DataEventTypeEnum.DELETE);}@Overridepublic void refreshPluginDataAll() {BaseDataCache.getInstance().cleanPluginData();}@Overridepublic void refreshPluginDataSelf(final List<PluginData> pluginDataList) {if (CollectionUtils.isEmpty(pluginDataList)) {return;}BaseDataCache.getInstance().cleanPluginDataSelf(pluginDataList);}@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}@Overridepublic void unSelectorSubscribe(final SelectorData selectorData) {subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);}@Overridepublic void refreshSelectorDataAll() {BaseDataCache.getInstance().cleanSelectorData();}@Overridepublic void refreshSelectorDataSelf(final List<SelectorData> selectorDataList) {if (CollectionUtils.isEmpty(selectorDataList)) {return;}BaseDataCache.getInstance().cleanSelectorDataSelf(selectorDataList);}@Overridepublic void onRuleSubscribe(final RuleData ruleData) {subscribeDataHandler(ruleData, DataEventTypeEnum.UPDATE);}@Overridepublic void unRuleSubscribe(final RuleData ruleData) {subscribeDataHandler(ruleData, DataEventTypeEnum.DELETE);}@Overridepublic void refreshRuleDataAll() {BaseDataCache.getInstance().cleanRuleData();}@Overridepublic void refreshRuleDataSelf(final List<RuleData> ruleDataList) {if (CollectionUtils.isEmpty(ruleDataList)) {return;}BaseDataCache.getInstance().cleanRuleDataSelf(ruleDataList);}private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {Optional.ofNullable(classData).ifPresent(data -> {if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;if (dataType == DataEventTypeEnum.UPDATE) {BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));} else if (dataType == DataEventTypeEnum.DELETE) {BaseDataCache.getInstance().removePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));}} else if (data instanceof SelectorData) {SelectorData selectorData = (SelectorData) data;if (dataType == DataEventTypeEnum.UPDATE) {BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));} else if (dataType == DataEventTypeEnum.DELETE) {BaseDataCache.getInstance().removeSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));}} else if (data instanceof RuleData) {RuleData ruleData = (RuleData) data;if (dataType == DataEventTypeEnum.UPDATE) {BaseDataCache.getInstance().cacheRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));} else if (dataType == DataEventTypeEnum.DELETE) {BaseDataCache.getInstance().removeRuleData(ruleData);Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));}}});}
}

可以看出负载均衡算法这里是通过spi方式扩展的。

通过查看LoadBalance的实现类,可以看出soul支持的算法有Hash,Random,RoundRobin

  • Hash
@Join
public class HashLoadBalance extends AbstractLoadBalance {private static final int VIRTUAL_NODE_NUM = 5;@Overridepublic DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();for (DivideUpstream address : upstreamList) {for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);treeMap.put(addressHash, address);}}long hash = hash(String.valueOf(ip));SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);if (!lastRing.isEmpty()) {return lastRing.get(lastRing.firstKey());}return treeMap.firstEntry().getValue();}private static long hash(final String key) {// md5 byteMessageDigest md5;try {md5 = MessageDigest.getInstance("MD5");} catch (NoSuchAlgorithmException e) {throw new SoulException("MD5 not supported", e);}md5.reset();byte[] keyBytes;keyBytes = key.getBytes(StandardCharsets.UTF_8);md5.update(keyBytes);byte[] digest = md5.digest();// hash code, Truncate to 32-bitslong hashCode = (long) (digest[3] & 0xFF) << 24| ((long) (digest[2] & 0xFF) << 16)| ((long) (digest[1] & 0xFF) << 8)| (digest[0] & 0xFF);return hashCode & 0xffffffffL;}}

通过计算请求ip的哈希值,来找到本地服务。

  • Random
@Join
public class RandomLoadBalance extends AbstractLoadBalance {private static final Random RANDOM = new Random();@Overridepublic DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {int totalWeight = calculateTotalWeight(upstreamList);boolean sameWeight = isAllUpStreamSameWeight(upstreamList);if (totalWeight > 0 && !sameWeight) {return random(totalWeight, upstreamList);}// If the weights are the same or the weights are 0 then randomreturn random(upstreamList);}private boolean isAllUpStreamSameWeight(final List<DivideUpstream> upstreamList) {boolean sameWeight = true;int length = upstreamList.size();for (int i = 0; i < length; i++) {int weight = getWeight(upstreamList.get(i));if (i > 0 && weight != getWeight(upstreamList.get(i - 1))) {// Calculate whether the weight of ownership is the samesameWeight = false;break;}}return sameWeight;}private int calculateTotalWeight(final List<DivideUpstream> upstreamList) {// total weightint totalWeight = 0;for (DivideUpstream divideUpstream : upstreamList) {int weight = getWeight(divideUpstream);// Cumulative total weighttotalWeight += weight;}return totalWeight;}private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {// If the weights are not the same and the weights are greater than 0, then random by the total number of weightsint offset = RANDOM.nextInt(totalWeight);// Determine which segment the random value falls onfor (DivideUpstream divideUpstream : upstreamList) {offset -= getWeight(divideUpstream);if (offset < 0) {return divideUpstream;}}return upstreamList.get(0);}private DivideUpstream random(final List<DivideUpstream> upstreamList) {return upstreamList.get(RANDOM.nextInt(upstreamList.size()));}
}

可以看出这里的逻辑是,如果是有权重的话,则采用权重进行负载均衡,如果没有权重,则直接随机访问。

  • RoundRobin
@Join
public class RoundRobinLoadBalance extends AbstractLoadBalance {private final int recyclePeriod = 60000;private final ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<>(16);private final AtomicBoolean updateLock = new AtomicBoolean();@Overridepublic DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {String key = upstreamList.get(0).getUpstreamUrl();ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);if (map == null) {methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));map = methodWeightMap.get(key);}int totalWeight = 0;long maxCurrent = Long.MIN_VALUE;long now = System.currentTimeMillis();DivideUpstream selectedInvoker = null;WeightedRoundRobin selectedWRR = null;for (DivideUpstream upstream : upstreamList) {String rKey = upstream.getUpstreamUrl();WeightedRoundRobin weightedRoundRobin = map.get(rKey);int weight = getWeight(upstream);if (weightedRoundRobin == null) {weightedRoundRobin = new WeightedRoundRobin();weightedRoundRobin.setWeight(weight);map.putIfAbsent(rKey, weightedRoundRobin);}if (weight != weightedRoundRobin.getWeight()) {//weight changedweightedRoundRobin.setWeight(weight);}long cur = weightedRoundRobin.increaseCurrent();weightedRoundRobin.setLastUpdate(now);if (cur > maxCurrent) {maxCurrent = cur;selectedInvoker = upstream;selectedWRR = weightedRoundRobin;}totalWeight += weight;}if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {try {// copy -> modify -> update referenceConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);methodWeightMap.put(key, newMap);} finally {updateLock.set(false);}}if (selectedInvoker != null) {selectedWRR.sel(totalWeight);return selectedInvoker;}// should not happen herereturn upstreamList.get(0);}/*** The type Weighted round robin.*/protected static class WeightedRoundRobin {private int weight;private final AtomicLong current = new AtomicLong(0);private long lastUpdate;/*** Gets weight.** @return the weight*/int getWeight() {return weight;}/*** Sets weight.** @param weight the weight*/void setWeight(final int weight) {this.weight = weight;current.set(0);}/*** Increase current long.** @return the long*/long increaseCurrent() {return current.addAndGet(weight);}/*** Sel.** @param total the total*/void sel(final int total) {current.addAndGet(-1 * total);}/*** Gets last update.** @return the last update*/long getLastUpdate() {return lastUpdate;}/*** Sets last update.** @param lastUpdate the last update*/void setLastUpdate(final long lastUpdate) {this.lastUpdate = lastUpdate;}}}

从这里可以看出,轮询算法也是区分是否有权重的算法。

带着问题读源码-soul(2021-01-15)相关推荐

  1. 带着问题读源码-soul(2021-01-16)

    ### 带着问题读源码系列之Dubbo插件 像往常一样启动 [soul-admin] 和 [soul-bootstrap] . 因为dubbo需要依赖zookeeper, 需要需要启动一个监听在 lo ...

  2. 带着问题读源码-soul(2021-01-14)

    下载编译 git clone git@github.com:dromara/soul.gitcd soulmvn clean package install -Dmaven.test.skip=tru ...

  3. 夜读源码,带你探究 Go 语言的iota

    Go 语言的 iota 怎么说呢,感觉像枚举,又有点不像枚举,它的底层是什么样的,用哪个姿势使用才算正规,今天转载一篇「Go夜读」社区上分享的文章,咱们一起学习下.Go 夜读,带你每页读源码~!  这 ...

  4. myisam怎么读_耗时半年,我成功“逆袭”,拿下美团offer(刷面试题+读源码+项目准备)...

    欢迎关注专栏[以架构赢天下]--每天持续分享Java相关知识点 以架构赢天下​zhuanlan.zhihu.com 以架构赢天下--持续分享Java相关知识点 每篇文章首发此专栏 欢迎各路Java程序 ...

  5. 微信读书vscode插件_跟我一起读源码 – 如何阅读开源代码

    阅读是最好的老师 在学习和提升编程技术的时候,通过阅读高质量的源码,来学习专家写的高质量的代码,是一种非常有效的提升自我的方式.程序员群体是一群乐于分享的群体,因此在互联网上有大量的高质量开源项目,阅 ...

  6. 手把手带你阅读Mybatis源码(三)缓存篇

    点击上方"Java知音",选择"置顶公众号" 技术文章第一时间送达! 前言 大家好,这一篇文章是MyBatis系列的最后一篇文章,前面两篇文章:手把手带你阅读M ...

  7. (建议收藏)第一人称视角带你走进 Vue 源码世界

    点击上方关注 前端技术江湖,一起学习,天天进步 前言 本文不引战,成熟的人应该脱离框架的范畴,而不是纠结谁更好或者谁更不好.有道是黑猫白猫,抓到老鼠就是好猫. 所以本文会带大家读源码.简单易懂,大佬小 ...

  8. 「建议收藏」第一人称视角带你走进 Vue 源码世界

    前言 本文不引战,成熟的人应该脱离框架的范畴,而不是纠结谁更好或者谁更不好.有道是黑猫白猫,抓到老鼠就是好猫. 所以本文会带大家读源码.简单易懂,大佬小白都能看明白.并收获益处. 从 new 一个 V ...

  9. 京东CTO的笔记23种设计模式和5大读源码方法...!网友:这次稳了...

    大家都知道源码框架有23个设计模式,但是我们大多停留在概念层面,真实开发中很少应用到,也不知道如何落地!!!那有没有办法解决了? 我整理了 Mybatis 和 Spring 源码中使用了大量的设计模式 ...

最新文章

  1. Tensorflow 10分钟快速上手
  2. 【学习笔记】18、函数的其他功能
  3. 深耕边缘计算 揭秘阿里云边缘云网一体化的技术实践
  4. 前端工作面试问题(下)
  5. Mysql 替换字段的一部分内容
  6. 小米键盘 键盘切换_“年轻人的第一把机械键盘”,小米机械键盘到底值不值...
  7. 算法竞赛中的随机数产生和断言
  8. 电平转换芯片_「厚积薄发」润石产品面面观之电平转换芯片 RS0104
  9. Windows 命令行及Git操作
  10. NLP中的预训练方法总结 word2vec、ELMO、GPT、BERT、XLNET
  11. ubuntu20.04.1下安装qt4相关依赖库
  12. 在线文档转换接口 word,excel,ppt等在线文件转pdf、png
  13. 2023北京国际老年产业博览会/养老产业展/养老服务业展
  14. Hibernate占位符?和:及JPA占位符
  15. Network Battery for mac(实时网速显示和电池健康) 教程
  16. 一文回顾 Java 入门知识(上)
  17. Android中关于ComponentName的使用
  18. HJ88 扑克牌大小
  19. 公众号内容拓展学习笔记(2021.5.1)
  20. Sublime配置java运行环境(IntelliJ IDEA也许更好用)

热门文章

  1. openlayer 画圆Circle实际半径解决方案
  2. 请回答2020 | 智行中国,用脚步记录这个时代的智变故事
  3. 双素数(质数)C语言程序详解
  4. 远程控制软件向日葵等
  5. com.mysql.jdbc.Driver飘红,已解决
  6. ASP NET - ArrayList 对象 方法描述
  7. 天气API 实时降水预报接口, 实时降雨量数据接口
  8. 磁盘性能基本测试方法
  9. FinTechthon赛果公布| 这些区块链脑洞如何实现?
  10. 续.第一次冲刺之后.