soul网关mysql8_深度解析 Soul 网关——数据同步
引子
从官网 clone 代码下来后,依次启动 soul-admin、soul-bootstrap 和 soul-examples-http 模块,启动成功后查看 soul-admin 会发现 soul-examples-http 模块里的代理配置和路由规则已经自动同步到了 soul-admin 上,通过 postman 调用发现配置也自动生效。
soul-examples-http 核心配置
soul:
http:
adminUrl: http://localhost:9095
port: 8188
contextPath: /http
appName: http
full: false
soul-examples-http 主要接口
接口类
soul-admin 自动更新到的配置
devide 配置全局
更新到的配置
选择器详情
自动更新到的选择器配置
可以看到选择器已经读取了 soul-examples-http 配置文件中的信息,并且 soul-admin 自动获取到了soul-examples-http 服务的 IP 和端口信息。
postman 验证
直接请求 soul-examples-http 服务
在这里插入图片描述
通过网关转发到 soul-examples-http 服务
在这里插入图片描述
结果符合预期,证明 soul-examples-http 的配置自动同步给了 soul-admin 和 soul-bootstrap,接下来就研究一下 soul 是怎么实现这一块逻辑的。
客户端向 soul-admin 同步
配置分析
通过查看 soul-examples-http 的配置文件,可以看到有配置 soul-admin 的地址
soul:
http:
adminUrl: http://localhost:9095
跟进这个配置映射 Java 实体,找到了soul-client-springmvc 下的SoulSpringMvcConfig类,这个类定值了配置相关的选择
public class SoulSpringMvcConfig {
/**
* soul-admin 地址
*/
private String adminUrl;
/**
* 代理服务上下文地址
*/
private String contextPath;
/**
* 服务名
*/
private String appName;
/**
* 是否全局代理
*/
private boolean full;
/**
* 服务 host 地址
*/
private String host;
/**
* 服务端口
*/
private Integer port;
}
客户端源码解析
查看配置类的调用方,追踪到两个类,分别是 ContextRegisterListener 和 SpringMvcClientBeanPostProcessor,分别在项目启动完成后执行和 bean 扫描后执行。具体代码和解析如下:
public class ContextRegisterListener implements ApplicationListener {
private final AtomicBoolean registered = new AtomicBoolean(false);
private final String url;
private final SoulSpringMvcConfig soulSpringMvcConfig;
/**
* Instantiates a new Context register listener.
*
* @param soulSpringMvcConfig the soul spring mvc config
*/
public ContextRegisterListener(final SoulSpringMvcConfig soulSpringMvcConfig) {
ValidateUtils.validate(soulSpringMvcConfig);
this.soulSpringMvcConfig = soulSpringMvcConfig;
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
}
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
if (!registered.compareAndSet(false, true)) {
return;
}
if (soulSpringMvcConfig.isFull()) {
RegisterUtils.doRegister(buildJsonParams(), url, RpcTypeEnum.HTTP);
}
}
private String buildJsonParams() {
String contextPath = soulSpringMvcConfig.getContextPath();
String appName = soulSpringMvcConfig.getAppName();
Integer port = soulSpringMvcConfig.getPort();
String path = contextPath + "/**";
String configHost = soulSpringMvcConfig.getHost();
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
.context(contextPath)
.host(host)
.port(port)
.appName(appName)
.path(path)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(path)
.build();
return OkHttpTools.getInstance().getGson().toJson(registerDTO);
}
}
核心代码逻辑是当配置为全局代理的时候,调用 soul-admin 的/soul-client/springmvc-register 接口,将配置信息同步给 soul-admin。
package org.dromara.soul.client.springmvc.init;
/**
* The type Soul spring mvc client bean post processor.
*
* @author xiaoyu(Myth)
*/
@Slf4j
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
private final ThreadPoolExecutor executorService;
private final String url;
private final SoulSpringMvcConfig soulSpringMvcConfig;
public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
ValidateUtils.validate(soulSpringMvcConfig);
this.soulSpringMvcConfig = soulSpringMvcConfig;
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
if (soulSpringMvcConfig.isFull()) {
return bean;
}
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class);
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
if (controller != null || restController != null || requestMapping != null) {
SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
String prePath = "";
if (Objects.nonNull(clazzAnnotation)) {
if (clazzAnnotation.path().indexOf("*") > 1) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
RpcTypeEnum.HTTP));
return bean;
}
prePath = clazzAnnotation.path();
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
if (Objects.nonNull(soulSpringMvcClient)) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
RpcTypeEnum.HTTP));
}
}
}
return bean;
}
private String buildJsonParams(final SoulSpringMvcClient soulSpringMvcClient, final String prePath) {
String contextPath = soulSpringMvcConfig.getContextPath();
String appName = soulSpringMvcConfig.getAppName();
Integer port = soulSpringMvcConfig.getPort();
String path = contextPath + prePath + soulSpringMvcClient.path();
String desc = soulSpringMvcClient.desc();
String configHost = soulSpringMvcConfig.getHost();
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
String configRuleName = soulSpringMvcClient.ruleName();
String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
.context(contextPath)
.host(host)
.port(port)
.appName(appName)
.path(path)
.pathDesc(desc)
.rpcType(soulSpringMvcClient.rpcType())
.enabled(soulSpringMvcClient.enabled())
.ruleName(ruleName)
.registerMetaData(soulSpringMvcClient.registerMetaData())
.build();
return OkHttpTools.getInstance().getGson().toJson(registerDTO);
}
}
代码核心逻辑是
校验代码是否有 @Controller 或者 @RestController 或者 @RequestMapping 注解
如果有,则校验是否有 @SoulSpringMvcClient 注解
检查路径配置是否有 1 个以上的 * 号,如果有,则直接发送配置信息到 soul-admin
如果不包含 1 个以上 * 号,则遍历类中每一个方法,找到有 @SoulSpringMvcClient 注解的方法,发送配置到 soul-admin
soul-admin 源码解析
研究服务端注册接口 /soul-client/springmvc-register ,其实现类为 SoulClientController 。
具体实现代码是下面部分
@Override
@Transactional
public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
if (dto.isRegisterMetaData()) {
MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
if (Objects.isNull(exist)) {
saveSpringMvcMetaData(dto);
}
}
String selectorId = handlerSpringMvcSelector(dto);
handlerSpringMvcRule(selectorId, dto);
return SoulResultMessage.SUCCESS;
}
根据配置地址查库校验是否已存在
如果不存在,则根据收到的配置保存元数据
校验 Selector 是否存在,不存在则保存
校验 Rule 是否存在,不存在则保存
小结
至此,客户端向 soul-admin 同步代码的逻辑就梳理清楚了,其核心配置是 yml 文件中的配置文件和 @SoulSpringMvcClient 注解,发送配置实现的核心代码为 ContextRegisterListener 类和 SpringMvcClientBeanPostProcessor 类,接收配置核心类为 SoulClientController。
soul-admin 向 soul-bootstrap 同步
数据同步流程图
阅读官方文档,发现数据同步流程图如下
数据同步流程
核心依赖
从图中可以看到,数据同步核心为粉色框部分,四种方式的 SPI,然后 soul-gateway 去pull/watch 数据同步到本地。
查看 soul-gateway 的依赖,找到依赖模块为
org.dromara
soul-spring-boot-starter-sync-data-zookeeper
${project.version}
org.dromara
soul-spring-boot-starter-sync-data-websocket
${project.version}
org.dromara
soul-spring-boot-starter-sync-data-http
${project.version}
org.dromara
soul-spring-boot-starter-sync-data-nacos
${project.version}
查看他们的 SPI 定义,代码位置在下图
spi 定义
代码解析
根据 SpringBoot 的 SPI 规范,查看模块对应的 spring.factories 描述文件,以 websocket 为例,配置了同步的配置文件如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
找到其代码实现为
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider websocketConfig, final ObjectProvider pluginSubscriber,
final ObjectProvider> metaSubscribers, final ObjectProvider> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
主要是 new 了 WebsocketSyncDataService 服务,说明核心服务在 WebsocketSyncDataService 类中。继续跟踪代码研究具体实现逻辑。
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List metaDataSubscribers,
final List authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
读取配置中的同步地址列表,根据地址数量创建定时调度连接池
每个同步地址都发起一个 websocket 连接,核心为放入了3个订阅者
每个 socket 连接每 30 秒检测一次健康度,并打印日志
线程池的线程发起一个每 30 秒触发一次的请求,检查连接是否成功,并打印日志
很明显,核心为第二步的3个订阅者,这里采用了订阅者模式,发布者一定是在 soul-admin 中进行的。
数据发布实现
观察 soul-admin 代码,找到如下代码
发布者实现
四种不同的同步方式,都是对 DataChangedListener 的不同实现,发送的自定义事件为DataChangedEvent。
DataChangedListener 定义了5种响应时间,分别是 AppAuth 变更、Plugin 变更、Selector 变更、MetaData 变更和 Rule 变更。
DataChangedEvent 定义了 2 中枚举类型,分别是操作类型(新增、变更等)和数据类型(Plugin、Rule等)。
一次发送事件的实例代码
代码源于org.dromara.soul.admin.listener.zookeeper.HttpServiceDiscovery:line171
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(selectorData)));
小结
soul-admin 向 soul-gateway 同步核心原理为 SPI 机制,SPI 的定义在soul-spring-boot-starter-sync-data-center 模块下的spring.factories描述文件中。
SPI 具体实现在 soul-sync-data-center 模块下,本文以 websocket 方式为例解析了代码,其它方式同步在具体实现方式上略有不同。
网关获取数据采用发布订阅模式,订阅者为 soul-gateway 网关,发布者为 soul-admin,发布者的实现逻辑在 org.dromara.soul.admin.listener 包下面。
soul网关mysql8_深度解析 Soul 网关——数据同步相关推荐
- soul从入门到进阶05——soul-bootstrap数据同步流程
我们在 soul-admin的数据同步流程中分析了admin的数据同步流程,这篇我们来看看soul-bootstrap的数据同步流程 启动 soul-bootstrap 打印如下日志,我们同样从日志着 ...
- 资深数据大牛深度解析:大数据底层架构!
https://www.sohu.com/a/197469801_99989999?sec=wd&spm=smpc.author.fd-d.20.1553654800781yejuW6n 随着 ...
- [案例分享]激活电力价值,八大案例深度解析电力大数据应用
麦肯锡曾有报告预测,在全球范围内,大数据分析方案的广泛使用能够带来每年3000亿美元的电费削减.电力大数据的有效应用可以面向行业内外提供大量的高附加值的增值服务业务,对于电力企业盈利与控制水平的提升有 ...
- 深度解析:大数据面前,统计学的价值在哪里?
导读:本文介绍了关于统计学与大数据的一些观点. 作者:朱利平 来源:<光明日报>2019年03月30日 10版首发 01 统计学对大数据的意义 很高兴有这样一个机会,我能与大家在这里做一些 ...
- JVM-整体结构深度解析(2)
JVM-整体结构深度解析 运行时数据区总览图 栈区 线程栈总览图 线程栈 栈帧 局部变量表 操作数栈 动态链接 方法出口 线程栈JVM调优参数 本地方法栈 pc寄存器 堆区 方法区 线程共有 线程私有 ...
- 大数据任务调度和数据同步组件初探
本文个人博客地址 本文公众号地址 背景 数据从最原始的状态,可能是一个 excel,一个文本,或者是来自业务数据库的数据,格式各种各样,落地到数据仓库.数据湖中,数据的同步过程 是必不可少的 图片来源 ...
- Soul网关源码解析目录
Soul网关源码解析目录 Soul网关源码解析文章列表 对用Java写的高性能网关:Soul,进行一波学习和研究,下面是相关的文章记录 掘金 了解与初步运行 Soul网关源码解析(一) 概览 ...
- Soul网关源码解析(三)代理Dubbo服务
文章目录 目标 一.使用 soul 代理 dubbo 服务 1.dubbo 服务接入网关 1.1 springboot 项目接入方式 1.2 spring 项目接入方式 2.配置 dubbo 插件 3 ...
- Soul 源码分析07 SOUL Admin 网关 Http长轮询 数据同步
SOUL Admin & 网关 Http长轮询 数据同步 书接上回, 使用Http长轮询, 数据同步, soul-admin/src/main/resources/application.ym ...
最新文章
- 2009年2月Windows Mobile Webcast预告
- G7终极2.3.7完美版,黑白分明,值得永久收藏使用
- 这三个NLP项目写进简历,网申通过率提高50%
- C语言入门(4)——常量、变量与赋值
- QT Core | 信号槽03 - 自定义信号与槽
- 作为面试官的一些经历,希望能给找工作的朋友一些参考
- Mac最常用快键键持续更新ing
- Vue-Access-Control:前端用户权限控制解决方案
- “新闻”频道“最新更新”有问题吗?
- 光伏补贴新政出台 投资机会解析
- React Native Android混合开发实战教程
- 如何制做计算机病毒,电脑病毒制作-怎么制作电脑病毒请教高手,怎么做病毒? – 手机爱问...
- 透析极大极小搜索算法和α-β剪枝算法(有案例和完整代码)
- java的数据类型有哪些_java数据类型有哪些
- 如何解决卸载驱动之后又重新装的问题
- kpi绩效考核流程图_KPI绩效考核如何运作起来(内含企业KPI实例之详解)
- oeasy教您玩转vim - 39 - # 剪切粘贴
- Unity 初识:创建游戏场景
- java语言就业方向_Java的就业方向有哪些?
- flash中乱数排列(随机数)方法