Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关。

ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中。Apache ShenYu 网关当前支持ZooKeeperWebSocketHttp长轮询NacosEtcdConsul 进行数据同步。本文的主要内容是基于WebSocket的数据同步源码分析。

本文基于shenyu-2.4.0版本进行源码分析,官网的介绍请参考 数据同步原理 。

1. 关于WebSocket通信

WebSocket协议诞生于2008年,在2011年成为国际标准。它可以双向通信,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息。WebSocket协议建立在 TCP 协议之上,属于应用层,性能开销小,通信高效,协议标识符是ws

2. Admin数据同步

我们从一个实际案例进行源码追踪,比如在后台管理系统中,新增一条选择器数据:

2.1 接收数据

  • SelectorController.createSelector()

进入SelectorController类中的createSelector()方法,它负责数据的校验,添加或更新数据,返回结果信息。

@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/selector")
public class SelectorController {@PostMapping("")public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) { // @Valid 数校验// 添加或更新数据Integer createCount = selectorService.createOrUpdate(selectorDTO);// 返回结果信息return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);}// ......
}

2.2 处理数据

  • SelectorServiceImpl.createOrUpdate()

SelectorServiceImpl类中通过createOrUpdate()方法完成数据的转换,保存到数据库,发布事件,更新upstream

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {// 负责事件发布的eventPublisherprivate final ApplicationEventPublisher eventPublisher;@Override@Transactional(rollbackFor = Exception.class)public int createOrUpdate(final SelectorDTO selectorDTO) {int selectorCount;// 构建数据 DTO --> DOSelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();// 判断是添加还是更新if (StringUtils.isEmpty(selectorDTO.getId())) {// 插入选择器数据selectorCount = selectorMapper.insertSelective(selectorDO);// 插入选择器中的条件数据selectorConditionDTOs.forEach(selectorConditionDTO -> {selectorConditionDTO.setSelectorId(selectorDO.getId());selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));});// check selector add// 权限检查if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());dataPermissionDTO.setDataId(selectorDO.getId());dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));}} else {// 更新数据,先删除再新增selectorCount = selectorMapper.updateSelective(selectorDO);//delete rule condition then addselectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));selectorConditionDTOs.forEach(selectorConditionDTO -> {selectorConditionDTO.setSelectorId(selectorDO.getId());SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);selectorConditionMapper.insertSelective(selectorConditionDO);});}// 发布事件publishEvent(selectorDO, selectorConditionDTOs);// 更新upstreamupdateDivideUpstream(selectorDO);return selectorCount;}// ......}

Serrvice类完成数据的持久化操作,即保存数据到数据库,这个大家应该很熟悉了,就不展开。关于更新upstream操作,放到后面对应的章节中进行分析,重点关注发布事件的操作,它会进行数据同步。

publishEvent()方法的逻辑是:找到选择器对应的插件,构建条件数据,发布变更数据。

       private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {// 找到选择器对应的插件PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());// 构建条件数据List<ConditionData> conditionDataList =                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());// 发布变更数据eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));}

发布变更数据通过eventPublisher.publishEvent()完成,这个eventPublisher对象是一个ApplicationEventPublisher类,这个类的全限定名是org.springframework.context.ApplicationEventPublisher。看到这儿,我们知道了发布数据是通过Spring相关的功能来完成的。

关于ApplicationEventPublisher

当有状态发生变化时,发布者调用 ApplicationEventPublisherpublishEvent 方法发布一个事件,Spring容器广播事件给所有观察者,调用观察者的 onApplicationEvent 方法把事件对象传递给观察者。调用 publishEvent方法有两种途径,一种是实现接口由容器注入 ApplicationEventPublisher 对象然后调用其方法,另一种是直接调用容器的方法,两种方法发布事件没有太大区别。

  • ApplicationEventPublisher:发布事件;
  • ApplicationEventSpring 事件,记录事件源、时间和数据;
  • ApplicationListener:事件监听者,观察者;

Spring的事件发布机制中,有三个对象,

一个是发布事件的ApplicationEventPublisher,在ShenYu中通过构造器注入了一个eventPublisher

另一个对象是ApplicationEvent,在ShenYu中通过DataChangedEvent继承了它,表示事件对象。

public class DataChangedEvent extends ApplicationEvent {//......
}

最后一个是 ApplicationListener,在ShenYu中通过DataChangedEventDispatcher类实现了该接口,作为事件的监听者,负责处理事件对象。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {//......}

2.3 分发数据

  • DataChangedEventDispatcher.onApplicationEvent()

当事件发布完成后,会自动进入到DataChangedEventDispatcher类中的onApplicationEvent()方法,进行事件处理。

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {/*** 有数据变更时,调用此方法* @param event*/@Override@SuppressWarnings("unchecked")public void onApplicationEvent(final DataChangedEvent event) {// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)for (DataChangedListener listener : listeners) {// 哪种数据发生变更switch (event.getGroupKey()) {case APP_AUTH: // 认证信息listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());break;case PLUGIN:  // 插件信息listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());break;case RULE:    // 规则信息listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());break;case SELECTOR:   // 选择器信息listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());break;case META_DATA:  // 元数据listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());break;default:  // 其他类型,抛出异常throw new IllegalStateException("Unexpected value: " + event.getGroupKey());}}}}

当有数据变更时,调用onApplicationEvent方法,然后遍历所有数据变更监听器,判断是哪种数据类型,交给相应的数据监听器进行处理。

ShenYu将所有数据进行了分组,一共是五种:认证信息、插件信息、规则信息、选择器信息和元数据。

这里的数据变更监听器(DataChangedListener),就是数据同步策略的抽象,它的具体实现有:

这几个实现类就是当前ShenYu支持的同步策略:

  • WebsocketDataChangedListener:基于websocket的数据同步;
  • ZookeeperDataChangedListener:基于zookeeper的数据同步;
  • ConsulDataChangedListener:基于consul的数据同步;
  • EtcdDataDataChangedListener:基于etcd的数据同步;
  • HttpLongPollingDataChangedListener:基于http长轮询的数据同步;
  • NacosDataChangedListener:基于nacos的数据同步;

既然有这么多种实现策略,那么如何确定使用哪一种呢?

因为本文是基于websocket的数据同步源码分析,所以这里以WebsocketDataChangedListener为例,分析它是如何被加载并实现的。

通过在源码工程中进行全局搜索,可以看到,它的实现是在DataSyncConfiguration类完成的。

/*** 数据同步配置类* 通过springboot条件装配实现* The type Data sync configuration.*/
@Configuration
public class DataSyncConfiguration {/*** websocket数据同步(默认策略)* The WebsocketListener(default strategy).*/@Configuration@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)@EnableConfigurationProperties(WebsocketSyncProperties.class)static class WebsocketListener {/*** Config event listener data changed listener.* 配置websocket数据变更监听器* @return the data changed listener*/@Bean@ConditionalOnMissingBean(WebsocketDataChangedListener.class)public DataChangedListener websocketDataChangedListener() {return new WebsocketDataChangedListener();}/*** Websocket collector.* Websocket处理类:建立连接,发送消息,关闭连接等操作* @return the websocket collector*/@Bean@ConditionalOnMissingBean(WebsocketCollector.class)public WebsocketCollector websocketCollector() {return new WebsocketCollector();}/*** Server endpoint exporter ** @return the server endpoint exporter*/@Bean@ConditionalOnMissingBean(ServerEndpointExporter.class)public ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}//......
}

这个配置类是通过SpringBoot条件装配类实现的。在WebsocketListener类上面有几个注解:

  • @Configuration:配置文件,应用上下文;

  • @ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true):属性条件判断,满足条件,该配置类才会生效。也就是说,当我们有如下配置时,就会采用websocket进行数据同步。不过,这里需要注意下matchIfMissing = true这个属性,它表示,如果你没有如下的配置,该配置类也会生效。基于websocket的数据同步时官方推荐的方式,也是默认采用的方式。

    shenyu:  sync:websocket:enabled: true
    
  • @EnableConfigurationProperties:启用配置属性;

当我们主动配置,采用websocket进行数据同步时,WebsocketDataChangedListener就会生成。所以在事件处理方法onApplicationEvent()中,就会到相应的listener中。在我们的案例中,是新增加了一条选择器数据,数据通过采用的是websocket,所以,代码会进入到WebsocketDataChangedListener进行选择器数据变更处理。

    @Override@SuppressWarnings("unchecked")public void onApplicationEvent(final DataChangedEvent event) {// 遍历数据变更监听器(一般使用一种数据同步的方式就好了)for (DataChangedListener listener : listeners) {// 哪种数据发生变更switch (event.getGroupKey()) {// 省略了其他逻辑case SELECTOR:   // 选择器信息listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());   // WebsocketDataChangedListener进行选择器数据变更处理break;}}

2.4 Websocket数据变更监听器

  • WebsocketDataChangedListener.onSelectorChanged()

    onSelectorChanged()方法中,将数据进行了封装,转成WebsocketData,然后通过WebsocketCollector.send()发送数据。

    // 选择器数据有更新@Overridepublic void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {// 构造 WebsocketData 数据WebsocketData<SelectorData> websocketData =new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);// 通过websocket发送数据WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);}

2.5 Websocket发送数据

  • WebsocketCollector.send()

send()方法中,判断了一下同步的类型,根据不同的类型,进行处理。

@Slf4j
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {/*** Send.** @param message the message* @param type    the type*/public static void send(final String message, final DataEventTypeEnum type) {if (StringUtils.isNotBlank(message)) {// 如果是MYSELF(第一次的全量同步)if (DataEventTypeEnum.MYSELF == type) {// 从threadlocal中获取sessionSession session = (Session) ThreadLocalUtil.get(SESSION_KEY);if (session != null) {// 向该session发送全量数据sendMessageBySession(session, message);}} else {// 后续的增量同步// 向所有的session中同步变更数据SESSION_SET.forEach(session -> sendMessageBySession(session, message));}}}private static void sendMessageBySession(final Session session, final String message) {try {// 通过websocket的session把消息发送出去session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("websocket send result is exception: ", e);}}
}

我们给的案例是一个新增操作 ,是一个增量同步,所以会走

SESSION_SET.forEach(session -> sendMessageBySession(session, message));

这个逻辑。

再通过

session.getBasicRemote().sendText(message);

将数据发送了出去。

至此,当admin端发生数据变更时,就将变更的数据以增量形式通过WebSocket发给了网关。

分析到这里,不知道大家有没有疑问呢?比如session是怎么来的?网关如何和admin建立连接的?

不要着急,我们接下来就进行网关端的同步分析。

不过,在继续源码分析前,我们用一张图将上面的分析过程串联起来。

3. 网关数据同步

假设ShenYu网关已经在正常运行了,使用的数据同步方式也是websocket。那么当在admin端新增一条选择器数据后,并且通过WebSocket发送到网关,那网关是如何接收并处理数据的呢?接下来我们就继续进行源码分析,一探究竟。

3.1 WebsocketClient接收数据

  • ShenyuWebsocketClient.onMessage()

在网关端有一个ShenyuWebsocketClient类,它继承了WebSocketClient,可以和WebSocket建立连接并通信。

public final class ShenyuWebsocketClient extends WebSocketClient {// ......
}

当在admin端通过websocket发送数据后,ShenyuWebsocketClient就可以通过onMessage()接收到数据,然后就可以自己进行处理。

public final class ShenyuWebsocketClient extends WebSocketClient {// 接受到消息后执行@Overridepublic void onMessage(final String result) {// 处理接收到的数据handleResult(result);}private void handleResult(final String result) {// 数据反序列化WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 哪种数据类型,插件、选择器、规则...ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 哪种操作类型,更新、删除...String eventType = websocketData.getEventType();String json = GsonUtils.getInstance().toJson(websocketData.getData());// 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}

接收到数据后,首先进行了反序列化操作,读取数据类型和操作类型,紧接着,就交给websocketDataHandler.executor()进行处理。

3.2 执行Websocket事件处理器

  • WebsocketDataHandler.executor()

通过工厂模式创建了Websocket数据处理器,每种数据类型,都提供了一个处理器:

插件 --> 插件数据处理器;

选择器 --> 选择器数据处理器;

规则 --> 规则数据处理器;

认证信息 --> 认证数据处理器;

元数据 --> 元数据处理器。

/*** 通过工厂模式创建 Websocket数据处理器* The type Websocket cache handler.*/
public class WebsocketDataHandler {private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);/*** Instantiates a new Websocket data handler.* 每种数据类型,提供一个处理器* @param pluginDataSubscriber the plugin data subscriber* @param metaDataSubscribers  the meta data subscribers* @param authDataSubscribers  the auth data subscribers*/public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {// 插件 --> 插件数据处理器ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));// 选择器 --> 选择器数据处理器ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));// 规则 --> 规则数据处理器ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));// 认证信息 --> 认证数据处理器ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));// 元数据 --> 元数据处理器ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));}/*** Executor.** @param type      the type* @param json      the json* @param eventType the event type*/public void executor(final ConfigGroupEnum type, final String json, final String eventType) {// 根据数据类型,找到对应的数据处理器ENUM_MAP.get(type).handle(json, eventType);}
}

不同的数据类型,有不同的数据处理方式,所以有不同的实现类。但是它们之间也有相同的处理逻辑,所以可以通过模板方法设计模式来实现。相同的逻辑放在抽象类中的handle()方法中,不同逻辑就交给各自的实现类。

我们的案例是新增了一条选择器数据,所以会交给SelectorDataHandler( 选择器 --> 选择器数据处理器)进行数据处理。

3.3 判断事件类型

  • AbstractDataHandler.handle()

实现数据变更的通用逻辑处理:根据不同的操作类型调用不同方法。

public abstract class AbstractDataHandler<T> implements DataHandler {/*** Convert list.* 不同的逻辑由各自实现类去实现* @param json the json* @return the list*/protected abstract List<T> convert(String json);/*** Do refresh.* 不同的逻辑由各自实现类去实现* @param dataList the data list*/protected abstract void doRefresh(List<T> dataList);/*** Do update.* 不同的逻辑由各自实现类去实现* @param dataList the data list*/protected abstract void doUpdate(List<T> dataList);/*** Do delete.* 不同的逻辑由各自实现类去实现* @param dataList the data list*/protected abstract void doDelete(List<T> dataList);// 通用逻辑,抽象类实现@Overridepublic void handle(final String json, final String eventType) {List<T> dataList = convert(json);if (CollectionUtils.isNotEmpty(dataList)) {DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);switch (eventTypeEnum) {case REFRESH:case MYSELF:doRefresh(dataList);  //刷新数据,全量同步break;case UPDATE:case CREATE:doUpdate(dataList); // 更新或创建数据,增量同步break;case DELETE:doDelete(dataList);  // 删除数据break;default:break;}}}
}

新增一条选择器数据,是新增操作,通过switch-case进入到doUpdate()方法中。

3.4 进入具体的数据处理器

  • SelectorDataHandler.doUpdate()
/*** 选择器数据处理器* The type Selector data handler.*/
@RequiredArgsConstructor
public class SelectorDataHandler extends AbstractDataHandler<SelectorData> {private final PluginDataSubscriber pluginDataSubscriber;//......// 更新操作@Overrideprotected void doUpdate(final List<SelectorData> dataList) {dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);}
}

遍历数据,进入onSelectorSubscribe()方法。

  • PluginDataSubscriber.onSelectorSubscribe()

它没有其他逻辑,直接调用subscribeDataHandler()方法。在方法中,更具数据类型(插件、选择器或规则),操作类型(更新或删除),去执行不同逻辑。

/*** 通用插件数据订阅者,负责处理所有插件、选择器和规则信息* The type Common plugin data subscriber.*/
public class CommonPluginDataSubscriber implements PluginDataSubscriber {//......// 处理选择器数据@Overridepublic void onSelectorSubscribe(final SelectorData selectorData) {subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);}    // 订阅数据处理器,处理数据的更新或删除private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {Optional.ofNullable(classData).ifPresent(data -> {// 插件数据if (data instanceof PluginData) {PluginData pluginData = (PluginData) data;if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作// 将数据保存到网关内存BaseDataCache.getInstance().cachePluginData(pluginData);// 如果每个插件还有自己的处理逻辑,那么就去处理Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));} else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作// 从网关内存移除数据BaseDataCache.getInstance().removePluginData(pluginData);// 如果每个插件还有自己的处理逻辑,那么就去处理Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));}} else if (data instanceof SelectorData) {  // 选择器数据SelectorData selectorData = (SelectorData) data;if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作// 将数据保存到网关内存BaseDataCache.getInstance().cacheSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));} else if (dataType == DataEventTypeEnum.DELETE) {  // 删除操作// 从网关内存移除数据BaseDataCache.getInstance().removeSelectData(selectorData);// 如果每个插件还有自己的处理逻辑,那么就去处理Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));}} else if (data instanceof RuleData) {  // 规则数据RuleData ruleData = (RuleData) data;if (dataType == DataEventTypeEnum.UPDATE) { // 更新操作// 将数据保存到网关内存BaseDataCache.getInstance().cacheRuleData(ruleData);// 如果每个插件还有自己的处理逻辑,那么就去处理Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));} else if (dataType == DataEventTypeEnum.DELETE) { // 删除操作// 从网关内存移除数据BaseDataCache.getInstance().removeRuleData(ruleData);// 如果每个插件还有自己的处理逻辑,那么就去处理Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));}}});}}

那么新增一条选择器数据,会进入下面的逻辑:

// 将数据保存到网关内存
BaseDataCache.getInstance().cacheSelectData(selectorData);
// 如果每个插件还有自己的处理逻辑,那么就去处理                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));

一是将数据保存到网关的内存中。BaseDataCache是最终缓存数据的类,通过单例模式实现。选择器数据就存到了SELECTOR_MAP这个Map中。在后续使用的时候,也是从这里拿数据。

public final class BaseDataCache {// 私有变量private static final BaseDataCache INSTANCE = new BaseDataCache();// 私有构造器private BaseDataCache() {}/*** Gets instance.*  公开方法* @return the instance*/public static BaseDataCache getInstance() {return INSTANCE;}/***  缓存选择器数据的Map* pluginName -> SelectorData.*/private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();public void cacheSelectData(final SelectorData selectorData) {Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);}/*** cache selector data.* 缓存选择器数据* @param data the selector data*/private void selectorAccept(final SelectorData data) {String key = data.getPluginName();if (SELECTOR_MAP.containsKey(key)) { // 更新操作,先删除再插入List<SelectorData> existList = SELECTOR_MAP.get(key);final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());resultList.add(data);final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());SELECTOR_MAP.put(key, collect);} else {  // 新增操作,直接放到Map中SELECTOR_MAP.put(key, Lists.newArrayList(data));}}}

二是如果每个插件还有自己的处理逻辑,那么就去处理。 通过idea编辑器可以看到,当新增一条选择器后,有如下的插件还有处理。这里我们就不再展开了。

经过以上的源码追踪,并通过一个实际的案例,在admin端新增一条选择器数据,就将websocket数据同步的流程分析清除了。

我们还是用下面的一张图将网关端的数据同步流程串联一下:

数据同步的流程已经分析完了,但是还有一些问题没有分析到,就是网关是如何跟admin建立连接的?

4. 网关和admin建立websocket连接

  • websocket配置

在网关的配置文件中有如下配置,并且引入了相关依赖,就会启动websocket相关服务。

shenyu:file:enabled: truecross:enabled: truedubbo :parameter: multisync:websocket :  # 使用websocket进行数据同步urls: ws://localhost:9095/websocket   # admin端的websocket地址

在网关中引入websocket的依赖。

<!--shenyu data sync start use websocket-->
<dependency><groupId>org.apache.shenyu</groupId><artifactId>shenyu-spring-boot-starter-sync-data-websocket</artifactId><version>${project.version}</version>
</dependency>
  • Websocket数据同步配置

通过springboot的条件装配,创建相关的bean。在网关启动的时候,如果我们配置了shenyu.sync.websocket.urls,那么Websocket数据同步配置就会被加载。这里通过spring boot starter完成依赖的加载。

/*** Websocket数据同步配置* 通过springboot实现条件注入* Websocket sync data configuration for spring boot.*/
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {/*** Websocket sync data service.* Websocket数据同步服务* @param websocketConfig   the websocket config* @param pluginSubscriber the plugin subscriber* @param metaSubscribers   the meta subscribers* @param authSubscribers   the auth subscribers* @return the sync data service*/// 创建websocketSyncDataService@Beanpublic SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use websocket sync shenyu data.......");return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}/*** Config websocket config.** @return the websocket config*/@Bean@ConfigurationProperties(prefix = "shenyu.sync.websocket")public WebsocketConfig websocketConfig() {return new WebsocketConfig();  // 创建WebsocketConfig}
}

在项目的resources/META-INF目录先新建spring.factories文件,在文件中指明配置类。

  • Websocket数据同步服务

WebsocketSyncDataService中做了如下几件事情:

  • 读取配置中的urls,这个表示admin端的同步地址,有多个的话,使用","分割;
  • 创建调度线程池,一个admin分配一个,用于执行定时任务;
  • 创建ShenyuWebsocketClient,一个admin分配一个,用于和admin建立websocket通信;
  • 开始和admin端的websocket 建立连接;
  • 执行定时任务,每隔10秒执行一次。主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。如果没有断开,就进行 ping-pong 检测。
/*** Websocket数据同步服务* Websocket sync data service.*/
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {private final List<WebSocketClient> clients = new ArrayList<>();private final ScheduledThreadPoolExecutor executor;/*** Instantiates a new Websocket sync cache.* 创建Websocket数据同步服务* @param websocketConfig      the websocket config* @param pluginDataSubscriber the plugin data subscriber* @param metaDataSubscribers  the meta data subscribers* @param authDataSubscribers  the auth data subscribers*/public WebsocketSyncDataService(final WebsocketConfig websocketConfig,final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {// admin端的同步地址,有多个的话,使用","分割String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");// 创建调度线程池,一个admin分配一个executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));for (String url : urls) {try {//创建WebsocketClient,一个admin分配一个clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));} catch (URISyntaxException e) {log.error("websocket url({}) is error", url, e);}}try {for (WebSocketClient client : clients) {// 和websocket server建立连接boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);if (success) {log.info("websocket connection is successful.....");} else {log.error("websocket connection is error.....");}// 执行定时任务,每隔10秒执行一次// 主要作用是判断websocket连接是否已经断开,如果已经断开,则尝试重连。// 如果没有断开,就进行 ping-pong 检测executor.scheduleAtFixedRate(() -> {try {if (client.isClosed()) {boolean reconnectSuccess = client.reconnectBlocking();if (reconnectSuccess) {log.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());} else {log.error("websocket reconnection server[{}] is error.....", client.getURI().toString());}} else {client.sendPing();log.debug("websocket send to [{}] ping message successful", client.getURI().toString());}} catch (InterruptedException e) {log.error("websocket connect is error :{}", e.getMessage());}}, 10, 10, TimeUnit.SECONDS);}/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/} catch (InterruptedException e) {log.info("websocket connection...exception....", e);}}@Overridepublic void close() {// 关闭 websocket clientfor (WebSocketClient client : clients) {if (!client.isClosed()) {client.close();}}// 关闭线程池if (Objects.nonNull(executor)) {executor.shutdown();}}
}
  • ShenyuWebsocketClient

ShenYu中创建的WebSocket客户端,用于和admin端通信。第一次成功建立连接后,同步全量数据,后续进行增量同步。

/*** 在ShenYu中自定义的WebSocket客户端* The type shenyu websocket client.*/
@Slf4j
public final class ShenyuWebsocketClient extends WebSocketClient {private volatile boolean alreadySync = Boolean.FALSE;private final WebsocketDataHandler websocketDataHandler;/*** Instantiates a new shenyu websocket client.* 创建ShenyuWebsocketClient* @param serverUri             the server uri  服务端uri* @param pluginDataSubscriber the plugin data subscriber 插件数据订阅器* @param metaDataSubscribers   the meta data subscribers 元数据订阅器* @param authDataSubscribers   the auth data subscribers 认证数据订阅器*/public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {super(serverUri);this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);}// 成功建立连接后执行@Overridepublic void onOpen(final ServerHandshake serverHandshake) {// 防止重新建立连接时,再次执行,所以用alreadySync进行判断if (!alreadySync) {// 同步所有数据,MYSELF 类型send(DataEventTypeEnum.MYSELF.name());alreadySync = true;}}// 接受到消息后执行@Overridepublic void onMessage(final String result) {// 处理接收到的数据handleResult(result);}// 关闭后执行@Overridepublic void onClose(final int i, final String s, final boolean b) {this.close();}// 失败后执行@Overridepublic void onError(final Exception e) {this.close();}@SuppressWarnings("ALL")private void handleResult(final String result) {// 数据反序列化WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);// 哪种数据类型,插件、选择器、规则...ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());// 哪种操作类型,更新、删除...String eventType = websocketData.getEventType();String json = GsonUtils.getInstance().toJson(websocketData.getData());// 处理数据websocketDataHandler.executor(groupEnum, json, eventType);}
}

5. 总结

本文通过一个实际案例,对websocket的数据同步原理进行了源码分析。涉及到的主要知识点如下:

  • websocket支持双向通信,性能好,推荐使用;
  • 通过Spring完成事件发布和监听;
  • 通过抽象DataChangedListener接口,支持多种同步策略,面向接口编程;
  • 使用工厂模式创建 WebsocketDataHandler,实现不同数据类型的处理;
  • 使用模板方法设计模式实现AbstractDataHandler,处理通用的操作类型;
  • 使用单例设计模式实现缓存数据类BaseDataCache
  • 通过SpringBoot的条件装配和starter加载机制实现配置类的加载。

Apache ShenYu源码阅读系列-基于WebSocket的数据同步相关推荐

  1. Apache ShenYu源码阅读系列-基于ZooKeeper的数据同步

    Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关. 在ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中.Apac ...

  2. 【Dubbo源码阅读系列】之远程服务调用(上)

    今天打算来讲一讲 Dubbo 服务远程调用.笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊.后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现.本地消费者无须知道 ...

  3. TiDB 源码阅读系列文章(十五)Sort Merge Join

    2019独角兽企业重金招聘Python工程师标准>>> 什么是 Sort Merge Join 在开始阅读源码之前, 我们来看看什么是 Sort Merge Join (SMJ),定 ...

  4. SpringMVC源码阅读系列汇总

    1.前言 1.1 导入 SpringMVC是基于Servlet和Spring框架设计的Web框架,做JavaWeb的同学应该都知道 本文基于Spring4.3.7源码分析,(不要被图片欺骗了,手动滑稽 ...

  5. TiDB 源码阅读系列文章(十六)INSERT 语句详解

    在之前的一篇文章 <TiDB 源码阅读系列文章(四)INSERT 语句概览> 中,我们已经介绍了 INSERT 语句的大体流程.为什么需要为 INSERT 单独再写一篇?因为在 TiDB ...

  6. 【Dubbo源码阅读系列】服务暴露之本地暴露

    在上一篇文章中我们介绍 Dubbo 自定义标签解析相关内容,其中我们自定义的 XML 标签 <dubbo:service /> 会被解析为 ServiceBean 对象(传送门:Dubbo ...

  7. TiDB 源码阅读系列文章(六)Select 语句概览

    在先前的 TiDB 源码阅读系列文章(四) 中,我们介绍了 Insert 语句,想必大家已经了解了 TiDB 是如何写入数据,本篇文章介绍一下 Select 语句是如何执行.相比 Insert,Sel ...

  8. TiDB 源码阅读系列文章(十九)tikv-client(下)

    上篇文章 中,我们介绍了数据读写过程中 tikv-client 需要解决的几个具体问题,本文将继续介绍 tikv-client 里的两个主要的模块--负责处理分布式计算的 copIterator 和执 ...

  9. DM 源码阅读系列文章(二)整体架构介绍

    2019独角兽企业重金招聘Python工程师标准>>> 作者:张学程 本文为 DM 源码阅读系列文章的第二篇,第一篇文章 简单介绍了 DM 源码阅读的目的和规划,以及 DM 的源码结 ...

  10. DM 源码阅读系列文章(四)dump/load 全量同步的实现

    作者:杨非 本文为 DM 源码阅读系列文章的第四篇,上篇文章 介绍了数据同步处理单元实现的功能,数据同步流程的运行逻辑以及数据同步处理单元的 interface 设计.本篇文章在此基础上展开,详细介绍 ...

最新文章

  1. 特殊时期之下的 AI 成功“逆行”,技术繁荣生长之下的“AI隐疾”
  2. eclipse设置文档注释的格式
  3. make j4什么意思_为什么天天坚持撸铁 肌肉增长不明显
  4. Selenium两万字大题库
  5. Node js npm 包管理工具的基本使用
  6. mysql中最常用的存储引擎有_mysql常用的存储引擎有哪些
  7. ZOJ2833*(并查集)
  8. db2 linux 64位下载,Redhat6.2 64位 安装DB2V10.5
  9. VCC、VDD、VSS、GND分别代表什么呢?
  10. python 移动平均线_使用python计算简单移动平均线
  11. 撸了个反代工具, 可用于激活JRebel
  12. dcos 1.7 目录挂载测试
  13. 秋冬易感冒着凉 风寒感冒9大食疗方
  14. MSS(Microsoft smoothing streaming)介绍
  15. KWP2000协议学习笔记(一)
  16. 深入理解BFC与IFC
  17. Gogole C++ 编程风格(二)
  18. 会打字、能翻译,联想智能语音鼠标好小橙使用评测
  19. Android系统初识
  20. 第十二篇:从生稣出熟稣,从熟稣出醍醐-再读内核驱动设计目标

热门文章

  1. 驾考——科一,三笔记
  2. npm安装依赖包 版本冲突怎么办 --legacy-peer-deps的正确使用方
  3. Android原生获取经纬度位置信息
  4. DS18B20数字温度计 (二) 测温, ROM和CRC算法
  5. BOM Routing (2009-08-31 23:46:00)
  6. 用户·角色·权限·表
  7. 2012-7-07可樂词汇积累#9314;
  8. 反馈电路反馈类型的快速判断
  9. excel按某个值进行筛选后,将筛选出来的行进行排序
  10. 40个笑到抽筋的神回复,哈哈哈哈哈...