目录

1.前情回顾

2.配置

2.1 soul-admin

2.2 soul-bootstrap

3.启动

3.1 启动 soul-admin

3.2 启动 soul-bootstrap

3.2.1 数据更新器工厂初始化

3.2.2 从 soul-admin 获取所有配置数据

3.2.3 判断数据是否发生改变

4. 记录个错误

4.1 由于 sofa 插件没开启的错误

4.2 不开 zk 网关不能启动的错误


1.前情回顾

紧接着前两天的 zookeeper,今天来看下 http 长轮询。

2.配置

数据同步策略官网链接 https://dromara.org/zh-cn/docs/soul/user-dataSync.html

2.1 soul-admin

修改 application.yml 配置文件,打开注释的代码:

soul:sync:http:enabled: true

对应 Bean,默认 enabled 就是 true,只要配置 soul.sync.http 即可。

/*** the http sync strategy properties.* @author huangxiaofeng*/
@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {/*** Whether enabled http sync strategy, default: true.*/private boolean enabled = true;/*** Periodically refresh the config data interval from the database, default: 5 minutes.*/private Duration refreshInterval = Duration.ofMinutes(5);}

这里会把配置传入到配置类中,加载 HttpLongPollingDataChangedListener:

@Configuration
public class DataSyncConfiguration {/*** http long polling.*/@Configuration@ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")@EnableConfigurationProperties(HttpSyncProperties.class)static class HttpLongPollingListener {@Bean@ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {return new HttpLongPollingDataChangedListener(httpSyncProperties);}}...}

2.2 soul-bootstrap

修改 application-local.yml 配置文件

soul:sync:http:url : http://localhost:9095
#url: 配置成你的 soul-admin的 ip与端口地址,多个admin集群环境请使用(,)分隔。

对应 Bean

/*** The type Http config.*/
@Data
public class HttpConfig {private String url;private Integer delayTime;private Integer connectionTimeout;
}

使用 Spring starter 机制加载的配置

@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {/*** Http sync data service.** @param httpConfig        the http config* @param pluginSubscriber the plugin subscriber* @param metaSubscribers   the meta subscribers* @param authSubscribers   the auth subscribers* @return the sync data service*/@Beanpublic SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use http long pull sync soul data");return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}/*** Http config http config.** @return the http config*/@Bean@ConfigurationProperties(prefix = "soul.sync.http")public HttpConfig httpConfig() {return new HttpConfig();}
}

在 pom.xml 文件中 引入以下依赖(代码中已经引入了):

        <!--soul data sync start use http--><dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-sync-data-http</artifactId><version>${project.version}</version></dependency>

3.启动

3.1 启动 soul-admin

3.2 启动 soul-bootstrap

这里在2.2小节中的 HttpSyncDataConfiguration 中的 httpSyncDataService 方法上打上断点,会调用 HttpSyncDataService 的构造方法:

// HttpSyncDataService.javapublic HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {// 初始化数据更新器工厂this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);this.httpConfig = httpConfig;this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));// 初始化 RestTemplatethis.httpClient = createRestTemplate();// 开启 HTTP 长连接线程this.start();}

3.2.1 数据更新器工厂初始化

/*** The type Data refresh factory.*/
public final class DataRefreshFactory {private static final EnumMap<ConfigGroupEnum, DataRefresh> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);/*** Instantiates a new Data refresh factory.** @param pluginDataSubscriber the plugin data subscriber* @param metaDataSubscribers  the meta data subscribers* @param authDataSubscribers  the auth data subscribers*/public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber));ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers));ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers));}...}

在构造方法里初始化了5种数据更新器,这个后面判断数据是否变化时会用到。

仔细看下类继承关系及各自覆写方法,又是模板方法(Boolean : refresh(JsonObject)),又是策略模式,6得飞起。

3.2.2 从 soul-admin 获取所有配置数据

// HttpSyncDataService.javaprivate void start() {// It could be initialized multiple times, so you need to control that.if (RUNNING.compareAndSet(false, true)) {// fetch all group configs.// 获取所有的配置数据this.fetchGroupConfig(ConfigGroupEnum.values());int threadSize = serverList.size();// 这里初始化线程池,核心线程数为 soul-admin 个数,即1个线程负责与1个 soul-admin同步this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(),// 自定义线程池工厂,名字设定好,方便 jstack 时按线程名称查找SoulThreadFactory.create("http-long-polling", true));// start long polling, each server creates a thread to listen for changes.// 开启 http 长轮询,每一个 soul-admin 创建一个线程去监听变化this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));} else {log.info("soul http long polling was started, executor=[{}]", executor);}}
// HttpSyncDataService.javaprivate void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {for (int index = 0; index < this.serverList.size(); index++) {String server = serverList.get(index);try {this.doFetchGroupConfig(server, groups);break;} catch (SoulException e) {// no available server, throw exception.if (index >= serverList.size() - 1) {throw e;}log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));}}}private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {StringBuilder params = new StringBuilder();for (ConfigGroupEnum groupKey : groups) {params.append("groupKeys").append("=").append(groupKey.name()).append("&");}// 拼接出来的数据 http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATAString url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");log.info("request configs: [{}]", url);String json = null;try {// 通过 RestTemplate 调用接口获取所有数据,内容有点儿多,就不贴出来了json = this.httpClient.getForObject(url, String.class);} catch (RestClientException e) {String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());log.warn(message);throw new SoulException(message, e);}// update local cache// 判断数据是否发生改变,这是下面分析的重点boolean updated = this.updateCacheWithJson(json);if (updated) {log.info("get latest configs: [{}]", json);return;}// not updated. it is likely that the current config server has not been updated yet. wait a moment.log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);// 当前线程休眠30秒ThreadUtils.sleep(TimeUnit.SECONDS, 30);}

3.2.3 判断数据是否发生改变

// HttpSyncDataService.java /*** update local cache.* @param json the response from config server.* @return true: the local cache was updated. false: not updated.*/private boolean updateCacheWithJson(final String json) {JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);JsonObject data = jsonObject.getAsJsonObject("data");// if the config cache will be updated?// 这里调用数据更新器工厂方法return factory.executor(data);}
// DataRefreshFactory.javapublic boolean executor(final JsonObject data) {final boolean[] success = {false};ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));return success[0];}

这里使用了 3.2.1 小节中分析的 ENUM_MAP,而且这里使用了并行流,5种不同的数据并行执行,缩短数据处理的时间。

这里还定义了一个布尔值的数组,用来存放数据是否发生变化的结果,只要这5种数据中,有1种变化了,就是变化了;只有都没有变化,才是没有变化。

这里迎来了一个重点,每种数据更新器的不同判断方式,也就是上面说的模板方法:dataRefresh.refresh(data)

// AbstractDataRefresh.java@Overridepublic Boolean refresh(final JsonObject data) {boolean updated = false;// 从所有数据中分别获取各自的数据JsonObject jsonObject = convert(data);if (null != jsonObject) {// 把 json 数据转成 Java BeanConfigData<T> result = fromJson(jsonObject);if (this.updateCacheIfNeed(result)) {updated = true;refresh(result.getData());}}return updated;}

这里以 PluginDataRefresh 为例走一遍流程,其他的小伙伴自己看下吧。

// PluginDataRefresh.java@Overrideprotected JsonObject convert(final JsonObject data) {return data.getAsJsonObject(ConfigGroupEnum.PLUGIN.name());}@Overrideprotected ConfigData<PluginData> fromJson(final JsonObject data) {return GSON.fromJson(data, new TypeToken<ConfigData<PluginData>>() {}.getType());}@Overrideprotected boolean updateCacheIfNeed(final ConfigData<PluginData> result) {return updateCacheIfNeed(result, ConfigGroupEnum.PLUGIN);}@Overrideprotected void refresh(final List<PluginData> data) {if (CollectionUtils.isEmpty(data)) {log.info("clear all plugin data cache");pluginDataSubscriber.refreshPluginDataAll();} else {// 这里将 BaseDataCache#PLUGIN_MAP 缓存清空了pluginDataSubscriber.refreshPluginDataAll();// 重新订阅data.forEach(pluginDataSubscriber::onSubscribe);}}// AbstractDataRefresh.java/*** If the MD5 values are different and the last update time of the old data is less than* the last update time of the new data, the configuration cache is considered to have been changed.* 当 MD5 值不同,或者旧数据的最后更新时间较最新数据靠前,就认为配置的缓存变化了** @param newVal    the lasted config* @param groupEnum the group enum* @return true : if need update*/protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {// first init cacheif (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {return true;}ResultHolder holder = new ResultHolder(false);GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {// must compare the last update timeif (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {log.info("update {} config: {}", groupEnum, newVal);holder.result = true;return newVal;}log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());return oldVal;});return holder.result;}

fromJson 方法通过 GSON,把 json 数据转成 Java Bean

这个先分析到这里,明天继续。


4. 记录个错误

4.1 由于 sofa 插件没开启的错误

这里有个坑,记录一下。

昨天在使用 zookeeper 时,使用 sofa 插件作为例子在 web 页面开启关闭来着,如果把 sofa 关闭了,以下代码不能进入 if 语句,也就意味着不能初始化 sofa 插件,最终造成 sofa-rpc 包里 for 循环时 NPE。

// SofaPluginDataHandler.java@Overridepublic void handlerPlugin(final PluginData pluginData) {// 如果插件关闭 getEnabled 为false 不能进入if (null != pluginData && pluginData.getEnabled()) {SofaRegisterConfig sofaRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), SofaRegisterConfig.class);SofaRegisterConfig exist = Singleton.INST.get(SofaRegisterConfig.class);if (Objects.isNull(sofaRegisterConfig)) {return;}if (Objects.isNull(exist) || !sofaRegisterConfig.equals(exist)) {// If it is null, initialize it// 导致这里的初始化不能执行ApplicationConfigCache.getInstance().init(sofaRegisterConfig);ApplicationConfigCache.getInstance().invalidateAll();}Singleton.INST.single(SofaRegisterConfig.class, sofaRegisterConfig);}}// ApplicationConfigCache.java/*** Init.** @param sofaRegisterConfig the sofa register config*/public void init(final SofaRegisterConfig sofaRegisterConfig) {if (applicationConfig == null) {applicationConfig = new ApplicationConfig();applicationConfig.setAppId("soul_proxy");applicationConfig.setAppName("soul_proxy");}// 这里的 registryConfig 始终为 nullif (registryConfig == null) {registryConfig = new RegistryConfig();registryConfig.setProtocol(sofaRegisterConfig.getProtocol());registryConfig.setId("soul_proxy");registryConfig.setRegister(false);registryConfig.setAddress(sofaRegisterConfig.getRegister());}}/*** Build reference config.** @param metaData the meta data* @return the reference config*/public ConsumerConfig<GenericService> build(final MetaData metaData) {ConsumerConfig<GenericService> reference = new ConsumerConfig<>();reference.setGeneric(true);reference.setApplication(applicationConfig);// 后面再调用到这里时,sofa-rpc 包里的 AbstractInterfaceConfig 的 registry 数组里有1个值,就是 nullreference.setRegistry(registryConfig);reference.setInterfaceId(metaData.getServiceName());reference.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT);reference.setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK);reference.setRepeatedReferLimit(-1);String rpcExt = metaData.getRpcExt();SofaParamExtInfo sofaParamExtInfo = GsonUtils.getInstance().fromJson(rpcExt, SofaParamExtInfo.class);if (Objects.nonNull(sofaParamExtInfo)) {if (StringUtils.isNoneBlank(sofaParamExtInfo.getLoadbalance())) {final String loadBalance = sofaParamExtInfo.getLoadbalance();reference.setLoadBalancer(buildLoadBalanceName(loadBalance));}Optional.ofNullable(sofaParamExtInfo.getTimeout()).ifPresent(reference::setTimeout);Optional.ofNullable(sofaParamExtInfo.getRetries()).ifPresent(reference::setRetries);}Object obj = reference.refer();if (obj != null) {log.info("init sofa reference success there meteData is :{}", metaData.toString());cache.put(metaData.getPath(), reference);}return reference;}
// com.alipay.sofa.rpc.client.router.MeshRouter.java@Overridepublic boolean needToLoad(ConsumerBootstrap consumerBootstrap) {ConsumerConfig consumerConfig = consumerBootstrap.getConsumerConfig();// 不是直连,且从注册中心订阅配置final boolean isDirect = StringUtils.isNotBlank(consumerConfig.getDirectUrl());final List<RegistryConfig> registrys = consumerConfig.getRegistry();boolean isMesh = false;if (registrys != null) {for (RegistryConfig registry : registrys) {// 这里遍历出来的是1个 null,报 NPEif (registry.getProtocol().equalsIgnoreCase(RpcConstants.REGISTRY_PROTOCOL_MESH)) {isMesh = true;break;}}}return !isDirect && isMesh;}

在 soul-admin web 页面,把 sofa 插件开启。

4.2 不开 zk 网关不能启动的错误

不启动 zk,网关一直连接不上zk,早晨不能正常启动。这个错误还在定位,明天继续努力吧,太难了。

Caused by: com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException: RPC-010060014: 注册 consumer 到 [ZookeeperRegistry] 失败!

看样子还是 sofa 的锅。

【Soul源码阅读】12.soul-admin 与 soul-bootstrap 同步机制之 http 长轮询解析(上)相关推荐

  1. 【Soul源码阅读】2.单机部署 Soul

    上一篇中,我们对 Soul 有了一个简单的认识. 在学习一项新技术时,我们先按照官网上的 Demo 把环境搭建起来.今天先搭建一个单机版测试环境. 1.下载源码 soul 的 github 地址: h ...

  2. 【Soul源码阅读系列(一)】Soul网关初探

    本篇文章主要内容如下: Soul是什么 如何在本地运行Soul 对Soul进行压测 Soul 是什么 Soul是什么?它可不是灵魂交友软件! 引用Soul的官网,它是这样描述Soul的: 这是一个异步 ...

  3. soul源码阅读 启动soul应用

             soul的官方文档地址:https://dromara.org/zh-cn/docs/soul/soul.html         素质三连,watch,start,fork之后cl ...

  4. 【Soul源码阅读】3.HTTP 用户接入 Soul 流程解析

    昨天只是极简入门,关于网关是怎么感知到我们的应用的,相信小伙伴们一定有疑问,今天先来看下 HTTP 用户如何接入 Soul,以及接入的流程是怎样的. 这是官网对于 HTTP 用户的文档,https:/ ...

  5. soul源码阅读 soul数据同步之nacos

    nacos的安装在官网明确写的很清楚,这里就不再赘述了.管理页面地址是http://localhost:8848/nacos记得登录用户名密码是nacos/nacos; 启动nacos同步 图1 图2 ...

  6. Soul网关源码阅读番外篇(一) HTTP参数请求错误

    Soul网关源码阅读番外篇(一) HTTP参数请求错误 共同作者:石立 萧 * 简介     在Soul网关2.2.1版本源码阅读中,遇到了HTTP请求加上参数返回404的错误,此篇文章基于此进行探索 ...

  7. Soul 网关源码阅读(四)Dubbo请求概览

    Soul 网关源码阅读(四)Dubbo请求概览 简介     本次启动一个dubbo服务示例,初步探索Soul网关源码的Dubbo请求处理流程 示例运行 环境配置     在Soul源码clone下来 ...

  8. Soul网关源码阅读(十)自定义简单插件编写

    Soul网关源码阅读(十)自定义简单插件编写 简介     综合前面所分析的插件处理流程相关知识,此次我们来编写自定义的插件:统计请求在插件链中的经历时长 编写准备     首先我们先探究一下,一个P ...

  9. Soul网关源码阅读(九)插件配置加载初探

    Soul网关源码阅读(九)插件配置加载初探 简介     今日来探索一下插件的初始化,及相关的配置的加载 源码Debug 插件初始化     首先来到我们非常熟悉的插件链调用的类: SoulWebHa ...

最新文章

  1. ubuntu 强制关机后 mysql无法启动
  2. DNS and Bind (一)
  3. 用php画一个蓝底红色的圆_php把图片处理成圆形透明的头像
  4. c语言派,C语言中 派/4=1-1/3+1/5-1/7....公式求派
  5. Cloudera Enterprise 试用版 6.3.1查看cloudrea的许可证---可用期限
  6. AJAX自学笔记01
  7. Greenplum installation guide
  8. python和django的关系_Django一对一关系实践
  9. Abaqus汉化问题
  10. python贪吃蛇的实验报告_贪吃蛇游戏程序设计实验报告.doc
  11. 伺服驱动器开发案例,迈信EP100
  12. 5G无线技术基础自学系列 | 5G信道结构
  13. 常用符号大全 特殊符号
  14. java拼音_Java获取汉字对应的拼音(全拼或首字母)
  15. IT6801FN中文版
  16. 硬币找钱问题(最小硬币和问题)详解与代码实现
  17. Dharma勒索病毒变种 ---加密后文件后缀.bkpx
  18. “@” Java中的特殊符号——注解(Java中’@‘符号是什么意思?)
  19. 论文查重率多少算正常?
  20. 在html文件中 url是统一资源定位器,HTML URL(统一资源定位器)

热门文章

  1. 一个毕业设计 小学生英语app
  2. Linux下安装qq的方法
  3. 追妹神器,恋爱神器,哄老婆开心,智能机器人每天给你心爱的TA发送早晚安问候
  4. 新氧发布首个医美消费避坑指南 为爱美人士带来干货分享
  5. c#将字符串写入Sream中
  6. 【原创】光棍节里 吻我逝去的青春
  7. java并发编程学习一
  8. [转]小米智能家庭套装为什么选择 ZigBee 协议?
  9. 工作分流是什么意思_军校“合训分流”专业是什么意思?
  10. ArcGIS教程:更改标题的文本和样式