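Contents

  • Data synchronization in admin
    • Starting from SelectorController
    • Publishing the event
    • ConfigGroupEnum
    • DataEventTypeEnum
    • DataChangedEventDispatcher
    • Configuration analysis
  • Data synchronization in gateway
    • ShenyuWebsocketClient
    • WebsocketDataHandler
    • AbstractDataHandler
    • CommonPluginDataSubscriber
    • BaseDataCache
    • Configuration analysis
  • Verifying the guess
    • Incremental data synchronization
    • Full data synchronization
    • Summary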

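When data is updated in the admin console, how does the updated data get synchronized to the gateway?

ShenYu supports several synchronization methods. This article analyzes the WebSocket one.

Data synchronization in admin

Starting from SelectorController

In general, an admin console like this one should synchronize the full data set once at startup and then synchronize incrementally whenever something is modified. So, among the admin controllers, we pick the createSelector method of SelectorController and start the analysis there.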

@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) {
    Integer createCount = selectorService.createOrUpdate(selectorDTO);
    return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}

The implementation of createOrUpdate is as follows:

@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {

    // eventPublisher is responsible for publishing events
    private final ApplicationEventPublisher eventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        // build the data: DTO --> DO
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        // decide whether this is an insert or an update
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            // insert the selector
            selectorCount = selectorMapper.insertSelective(selectorDO);
            // insert the selector's conditions
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
            // check selector add
            // permission check
            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
                dataPermissionDTO.setDataId(selectorDO.getId());
                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
            }
        } else {
            // update the selector; its conditions are deleted first and then inserted again
            selectorCount = selectorMapper.updateSelective(selectorDO);
            // delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        // publish the event
        publishEvent(selectorDO, selectorConditionDTOs);
        // update upstream
        updateDivideUpstream(selectorDO);
        return selectorCount;
    }

    // ......
}

Looking through the code above, we guess that the following method is the one related to synchronization:

Publishing the event

publishEvent(selectorDO, selectorConditionDTOs);

Its implementation:

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList = selectorConditionDTOs.stream()
            .map(ConditionTransfer.INSTANCE::mapToSelectorDTO)
            .collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}

eventPublisher is an org.springframework.context.ApplicationEventPublisher, so the publishing must go through Spring's event mechanism.

Spring's event mechanism needs a publisher, a listener, and an event. We have already found the publisher, and from the source above we know that the event is DataChangedEvent.

public class DataChangedEvent extends ApplicationEvent {

    private final DataEventTypeEnum eventType;

    private final ConfigGroupEnum groupKey;

    public DataChangedEvent(final ConfigGroupEnum groupKey, final DataEventTypeEnum type, final List<?> source) {
        super(source.stream().filter(Objects::nonNull).collect(Collectors.toList()));
        this.eventType = type;
        this.groupKey = groupKey;
    }
}
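The publisher, listener, and event roles can be illustrated without Spring. The following is only a minimal plain-Java sketch, not Spring's or ShenYu's code: EventBusSketch, ChangeEvent, and Listener are hypothetical names, and the bus just loops over its registered listeners. The event constructor mirrors the null filtering seen in DataChangedEvent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the publisher / listener / event triple.
final class EventBusSketch {

    // The event: a group key, an event type, and a payload without null entries.
    static final class ChangeEvent {
        final String groupKey;
        final String eventType;
        final List<Object> source = new ArrayList<>();

        ChangeEvent(final String groupKey, final String eventType, final List<?> source) {
            this.groupKey = groupKey;
            this.eventType = eventType;
            // Same idea as the DataChangedEvent constructor: drop null entries.
            for (Object item : source) {
                if (item != null) {
                    this.source.add(item);
                }
            }
        }
    }

    // The listener side.
    interface Listener {
        void onEvent(ChangeEvent event);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void register(final Listener listener) {
        listeners.add(listener);
    }

    // The publisher side: hand the event to every registered listener.
    void publish(final ChangeEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    public static void main(final String[] args) {
        EventBusSketch bus = new EventBusSketch();
        bus.register(e -> System.out.println(e.groupKey + "/" + e.eventType + " -> " + e.source));
        bus.publish(new ChangeEvent("SELECTOR", "UPDATE", Arrays.asList("selector-1", null)));
    }
}
```

In the real code the bus role is played by Spring, so the publisher never needs to know which listeners exist.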

ConfigGroupEnum

From ConfigGroupEnum we can see that the data admin may need to synchronize with the gateway includes plugins, rules, selectors, metadata, and app authentication information.

public enum ConfigGroupEnum {

    /**
     * App auth config group enum.
     */
    APP_AUTH,

    /**
     * Plugin config group enum.
     */
    PLUGIN,

    /**
     * Rule config group enum.
     */
    RULE,

    /**
     * Selector config group enum.
     */
    SELECTOR,

    /**
     * Meta data config group enum.
     */
    META_DATA;

    /**
     * Acquire by name config group enum.
     *
     * @param name the name
     * @return the config group enum
     */
    public static ConfigGroupEnum acquireByName(final String name) {
        return Arrays.stream(ConfigGroupEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this ConfigGroupEnum can not support %s", name)));
    }
}

DataEventTypeEnum

From DataEventTypeEnum we can see that the following events may trigger a sync: delete, create, update, refresh, and the initial full sync (we guess that MYSELF marks the full sync).

public enum DataEventTypeEnum {

    /**
     * delete event.
     */
    DELETE,

    /**
     * insert event.
     */
    CREATE,

    /**
     * update event.
     */
    UPDATE,

    /**
     * REFRESH data event type enum.
     */
    REFRESH,

    /**
     * Myself data event type enum.
     */
    MYSELF;

    /**
     * Acquire by name data event type enum.
     *
     * @param name the name
     * @return the data event type enum
     */
    public static DataEventTypeEnum acquireByName(final String name) {
        return Arrays.stream(DataEventTypeEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this DataEventTypeEnum can not support %s", name)));
    }
}
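Both enums resolve a constant from its name with the same stream lookup. The sketch below reproduces that lookup in isolation to show that it is an exact, case-sensitive match and that it tolerates a null name. EnumLookupSketch is a hypothetical class, and IllegalArgumentException stands in for ShenyuException so the example needs no ShenYu dependency.

```java
import java.util.Arrays;
import java.util.Objects;

final class EnumLookupSketch {

    // Hypothetical copy of the five event types, for illustration only.
    enum EventType { DELETE, CREATE, UPDATE, REFRESH, MYSELF }

    // Look a constant up by its exact name; fail loudly when nothing matches.
    static EventType acquireByName(final String name) {
        return Arrays.stream(EventType.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unsupported event type: " + name));
    }

    public static void main(final String[] args) {
        System.out.println(acquireByName("MYSELF"));
        try {
            // Lower case does not match, because the comparison is exact.
            acquireByName("myself");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```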

DataChangedEventDispatcher

A listener normally implements the ApplicationListener interface, so a global search turns up the following class.

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private final ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
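The dispatcher fans each event out to every registered listener and uses the group key to pick the callback. Below is a reduced sketch of that fan-out with two groups only. DispatcherSketch, ChangeListener, and the use of default no-op methods are assumptions made for the example, not a copy of ShenYu's DataChangedListener.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DispatcherSketch {

    enum Group { PLUGIN, SELECTOR }

    // Hypothetical listener contract; default no-ops let a listener ignore groups it does not need.
    interface ChangeListener {
        default void onPluginChanged(List<String> data, String eventType) { }

        default void onSelectorChanged(List<String> data, String eventType) { }
    }

    private final List<ChangeListener> listeners;

    DispatcherSketch(final List<ChangeListener> listeners) {
        this.listeners = listeners;
    }

    // Every listener sees every event; the group key selects the callback.
    void dispatch(final Group group, final String eventType, final List<String> data) {
        for (ChangeListener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data, eventType);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data, eventType);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }

    public static void main(final String[] args) {
        List<String> log = new ArrayList<>();
        ChangeListener selectorOnly = new ChangeListener() {
            @Override
            public void onSelectorChanged(final List<String> data, final String eventType) {
                log.add("selector " + eventType + " " + data);
            }
        };
        DispatcherSketch dispatcher = new DispatcherSketch(Arrays.asList(selectorOnly));
        dispatcher.dispatch(Group.SELECTOR, "UPDATE", Arrays.asList("selector-1"));
        dispatcher.dispatch(Group.PLUGIN, "UPDATE", Arrays.asList("divide"));
        System.out.println(log);
    }
}
```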

Configuration analysis

Going further, we find that DataChangedListener has several implementing classes.

With so many implementations, how does admin choose among them? Recall that we configured the following sync properties when starting the project.

Searching for "shenyu.sync.websocket" leads us to WebsocketSyncProperties.

@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public class WebsocketSyncProperties {

    /**
     * default: true.
     */
    private boolean enabled = true;

    /**
     * Gets the value of enabled.
     *
     * @return the value of enabled
     */
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * Sets the enabled.
     *
     * @param enabled enabled
     */
    public void setEnabled(final boolean enabled) {
        this.enabled = enabled;
    }
}

However, this properties class only holds the switch that turns websocket on or off; it does not register WebsocketDataChangedListener or the other communication-related beans. So we follow the usages of WebsocketSyncProperties and find the class below, which is exactly what we are looking for.

@Configuration
// this configuration applies when shenyu.sync.websocket.enabled=true
// it also applies when the property is not set, because matchIfMissing = true
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    /**
     * Config event listener data changed listener.
     *
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    /**
     * Websocket collector.
     *
     * @return the websocket collector
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }

    /**
     * Server endpoint exporter server endpoint exporter.
     *
     * @return the server endpoint exporter
     */
    @Bean
    @ConditionalOnMissingBean(ServerEndpointExporter.class)
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
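The effect of havingValue = "true" together with matchIfMissing = true can be spelled out as a small decision function. This is only an approximation written for illustration: ConditionSketch is a hypothetical class, a plain Map stands in for the Spring environment, and the case-insensitive comparison is an assumption about how Spring compares the value.

```java
import java.util.HashMap;
import java.util.Map;

final class ConditionSketch {

    static final String KEY = "shenyu.sync.websocket.enabled";

    // Approximates @ConditionalOnProperty(havingValue = "true", matchIfMissing = true).
    static boolean websocketEnabled(final Map<String, String> properties) {
        String value = properties.get(KEY);
        if (value == null) {
            // matchIfMissing = true: an absent property still activates the configuration.
            return true;
        }
        // havingValue = "true": any other value switches the configuration off.
        return "true".equalsIgnoreCase(value);
    }

    public static void main(final String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println("missing -> " + websocketEnabled(properties));
        properties.put(KEY, "false");
        System.out.println("false   -> " + websocketEnabled(properties));
    }
}
```

So the websocket listener is registered unless the property is explicitly set to something other than true.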

Back to the listeners: the one in use here is WebsocketDataChangedListener.

public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        // assemble the WebsocketData payload
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        // send it
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}

Next, look at the WebsocketCollector.send method (WebsocketCollector is also one of the beans registered above).

@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

    private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);

    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static final String SESSION_KEY = "sessionKey";

    /**
     * Send.
     *
     * @param message the message
     * @param type    the type
     */
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // This indirectly confirms our guess that DataEventTypeEnum.MYSELF marks a full sync:
        // the message goes only to the websocket session stored for the current thread
        if (DataEventTypeEnum.MYSELF == type) {
            Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
            if (Objects.nonNull(session)) {
                sendMessageBySession(session, message);
            }
        } else {
            SESSION_SET.forEach(session -> sendMessageBySession(session, message));
        }
    }

    private static synchronized void sendMessageBySession(final Session session, final String message) {
        try {
            // send the message through the websocket session
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            LOG.error("websocket send result is exception: ", e);
        }
    }

    // ......
}
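The routing rule in send can be tried out with fake sessions: a MYSELF message goes to one session, anything else is broadcast. The sketch below is hypothetical throughout. FakeSession only records what it receives, and onFullSyncRequest is an assumed shape for the part of WebsocketCollector that the listing above elides; it is not taken from the ShenYu source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

final class CollectorSketch {

    // Hypothetical stand-in for a websocket session; it only records what it was sent.
    static final class FakeSession {
        final List<String> received = new ArrayList<>();

        void sendText(final String message) {
            received.add(message);
        }
    }

    private final Set<FakeSession> sessions = new CopyOnWriteArraySet<>();

    // The session that asked for a full sync on the current thread, if any.
    private final ThreadLocal<FakeSession> requester = new ThreadLocal<>();

    void open(final FakeSession session) {
        sessions.add(session);
    }

    // Assumed flow: remember who asked, answer that session alone, then forget it.
    void onFullSyncRequest(final FakeSession session, final String fullData) {
        requester.set(session);
        try {
            send(fullData, "MYSELF");
        } finally {
            requester.remove();
        }
    }

    void send(final String message, final String eventType) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if ("MYSELF".equals(eventType)) {
            FakeSession session = requester.get();
            if (session != null) {
                session.sendText(message);
            }
        } else {
            // Incremental changes are broadcast to every connected gateway.
            sessions.forEach(session -> session.sendText(message));
        }
    }

    public static void main(final String[] args) {
        CollectorSketch collector = new CollectorSketch();
        FakeSession first = new FakeSession();
        FakeSession second = new FakeSession();
        collector.open(first);
        collector.open(second);
        collector.send("selector changed", "UPDATE");
        collector.onFullSyncRequest(first, "all data");
        System.out.println("first=" + first.received + ", second=" + second.received);
    }
}
```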

Data synchronization in gateway

ShenyuWebsocketClient

Where data is sent, something must receive it. Under shenyu-sync-data-websocket we find the ShenyuWebsocketClient class, which extends WebSocketClient.

public final class ShenyuWebsocketClient extends WebSocketClient {

    /**
     * logger.
     */
    private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);

    private volatile boolean alreadySync = Boolean.FALSE;

    private final WebsocketDataHandler websocketDataHandler;

    /**
     * Instantiates a new shenyu websocket client.
     *
     * @param serverUri            the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }

    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        // if no sync has happened yet, request one when the connection opens
        if (!alreadySync) {
            // this confirms again that MYSELF marks the full sync
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }

    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }

    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }

    @Override
    public void onError(final Exception e) {
        this.close();
    }

    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        LOG.info("handleResult({})", result);
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}
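The alreadySync flag makes the full-sync request a one-time action per client object. A stripped-down sketch of that guard follows; ClientSketch is a hypothetical class that records outgoing messages in a list instead of writing to a socket.

```java
import java.util.ArrayList;
import java.util.List;

final class ClientSketch {

    // Set once the full sync has been requested.
    private volatile boolean alreadySync;

    // Stands in for the socket: everything "sent" is recorded here.
    final List<String> sent = new ArrayList<>();

    // Called every time the connection opens, including after a reconnect.
    void onOpen() {
        if (!alreadySync) {
            sent.add("MYSELF");
            alreadySync = true;
        }
    }

    public static void main(final String[] args) {
        ClientSketch client = new ClientSketch();
        client.onOpen(); // first connection: asks for the full data set
        client.onOpen(); // same object reconnecting: no second request
        System.out.println(client.sent);
    }
}
```

In this sketch, a reconnect that reuses the same client object does not ask for the full data again, because the flag is never reset.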

WebsocketDataHandler

Skimming ShenyuWebsocketClient shows that websocketDataHandler.executor(groupEnum, json, eventType); is the key call. Its implementation is as follows.

public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    /**
     * Instantiates a new Websocket data handler.
     *
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Executor.
     *
     * @param type      the type
     * @param json      the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

This class puts five handlers into an in-memory map, one for each ConfigGroupEnum value. The call that follows is:

ENUM_MAP.get(type).handle(json, eventType);
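The lookup above is a plain registry keyed by an enum. The following sketch shows the same idea with three groups and lambda handlers. HandlerRegistrySketch, Group, and Handler are hypothetical names; unlike the original, which fills a static map from the constructor, the sketch keeps the map as an instance field.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

final class HandlerRegistrySketch {

    enum Group { PLUGIN, SELECTOR, RULE }

    // Hypothetical handler contract: raw JSON plus the event type name.
    interface Handler {
        void handle(String json, String eventType);
    }

    private final Map<Group, Handler> handlers = new EnumMap<>(Group.class);

    // Records what each handler was asked to do.
    final List<String> log = new ArrayList<>();

    HandlerRegistrySketch() {
        // One handler per group, registered up front.
        handlers.put(Group.PLUGIN, (json, type) -> log.add("plugin " + type + " " + json));
        handlers.put(Group.SELECTOR, (json, type) -> log.add("selector " + type + " " + json));
        handlers.put(Group.RULE, (json, type) -> log.add("rule " + type + " " + json));
    }

    // Pick the handler by group and delegate to it.
    void executor(final Group group, final String json, final String eventType) {
        handlers.get(group).handle(json, eventType);
    }

    public static void main(final String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.executor(Group.SELECTOR, "[]", "UPDATE");
        System.out.println(registry.log);
    }
}
```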

AbstractDataHandler

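The dispatch by event type happens in the abstract class AbstractDataHandler.

public abstract class AbstractDataHandler<T> implements DataHandler {

    // each concrete handler supplies the conversion and the three operations
    protected abstract List<T> convert(String json);

    protected abstract void doRefresh(List<T> dataList);

    protected abstract void doUpdate(List<T> dataList);

    protected abstract void doDelete(List<T> dataList);

    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}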

Because we created a selector, which publishEvent sends as an UPDATE event, the call ends up in SelectorDataHandler.doUpdate.

@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}
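The handle method is a template method: the base class maps five event types onto three operations and subclasses fill those operations in. Here is a self-contained sketch of that mapping. TemplateSketch, BaseHandler, and RecordingHandler are hypothetical, and the JSON conversion step is left out so the example takes a ready-made list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TemplateSketch {

    enum EventType { DELETE, CREATE, UPDATE, REFRESH, MYSELF }

    // Base class owns the event-type mapping; subclasses own the three operations.
    abstract static class BaseHandler<T> {

        final void handle(final List<T> dataList, final EventType eventType) {
            if (dataList == null || dataList.isEmpty()) {
                return;
            }
            switch (eventType) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }

        protected abstract void doRefresh(List<T> dataList);

        protected abstract void doUpdate(List<T> dataList);

        protected abstract void doDelete(List<T> dataList);
    }

    // Hypothetical subclass that just records which operation ran.
    static final class RecordingHandler extends BaseHandler<String> {
        final List<String> calls = new ArrayList<>();

        @Override
        protected void doRefresh(final List<String> dataList) {
            calls.add("refresh" + dataList);
        }

        @Override
        protected void doUpdate(final List<String> dataList) {
            calls.add("update" + dataList);
        }

        @Override
        protected void doDelete(final List<String> dataList) {
            calls.add("delete" + dataList);
        }
    }

    public static void main(final String[] args) {
        RecordingHandler handler = new RecordingHandler();
        handler.handle(Arrays.asList("s1"), EventType.CREATE);
        handler.handle(Arrays.asList("s1", "s2"), EventType.MYSELF);
        System.out.println(handler.calls);
    }
}
```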

CommonPluginDataSubscriber

Searching for the onSelectorSubscribe method leads to the CommonPluginDataSubscriber class.

public class CommonPluginDataSubscriber implements PluginDataSubscriber {

    @Override
    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }
}

The implementation of subscribeDataHandler:

// subscribe data handler
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            // operations for plugin data
            PluginData pluginData = (PluginData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
            }
        } else if (data instanceof SelectorData) {
            // operations for selector data
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                // cache the data in memory
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                // if the plugin has its own handling logic, run it as well
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            // operations for rule data
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}
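Each branch above follows the same two steps: update the shared cache, then call a plugin-specific hook only if one is registered for that plugin name. A reduced sketch of the selector branch follows. SubscriberSketch and PluginHook are hypothetical, selectors are represented by their ids, and the plugin name "other" is made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class SubscriberSketch {

    // Hypothetical per-plugin hook, keyed by plugin name.
    interface PluginHook {
        void handlerSelector(String selectorId);
    }

    private final Map<String, PluginHook> handlerMap = new HashMap<>();

    // Stands in for the shared cache: plugin name -> selector ids.
    final Map<String, List<String>> cache = new HashMap<>();

    void register(final String pluginName, final PluginHook hook) {
        handlerMap.put(pluginName, hook);
    }

    void onSelectorSubscribe(final String pluginName, final String selectorId) {
        // Step 1: always cache the data.
        cache.computeIfAbsent(pluginName, key -> new ArrayList<>()).add(selectorId);
        // Step 2: run the plugin's own logic only when the plugin registered a hook.
        Optional.ofNullable(handlerMap.get(pluginName)).ifPresent(hook -> hook.handlerSelector(selectorId));
    }

    public static void main(final String[] args) {
        SubscriberSketch subscriber = new SubscriberSketch();
        subscriber.register("divide", id -> System.out.println("divide hook saw " + id));
        subscriber.onSelectorSubscribe("divide", "selector-1");
        subscriber.onSelectorSubscribe("other", "selector-2"); // cached, but no hook runs
        System.out.println(subscriber.cache);
    }
}
```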

BaseDataCache

The call BaseDataCache.getInstance().cacheSelectData(selectorData); stores the data in BaseDataCache, an in-memory singleton.

public final class BaseDataCache {

    private static final BaseDataCache INSTANCE = new BaseDataCache();

    /**
     * pluginName -> PluginData.
     */
    private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();

    /**
     * pluginName -> SelectorData.
     */
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

    /**
     * selectorId -> RuleData.
     */
    private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();

    private BaseDataCache() {
    }

    /**
     * Gets instance.
     *
     * @return the instance
     */
    public static BaseDataCache getInstance() {
        return INSTANCE;
    }

    /**
     * Cache plugin data.
     *
     * @param pluginData the plugin data
     */
    public void cachePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
    }

    /**
     * Remove plugin data.
     *
     * @param pluginData the plugin data
     */
    public void removePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.remove(data.getName()));
    }

    /**
     * Remove plugin data by plugin name.
     *
     * @param pluginName the plugin name
     */
    public void removePluginDataByPluginName(final String pluginName) {
        PLUGIN_MAP.remove(pluginName);
    }

    /**
     * Clean plugin data.
     */
    public void cleanPluginData() {
        PLUGIN_MAP.clear();
    }

    /**
     * Clean plugin data self.
     *
     * @param pluginDataList the plugin data list
     */
    public void cleanPluginDataSelf(final List<PluginData> pluginDataList) {
        pluginDataList.forEach(this::removePluginData);
    }

    /**
     * Obtain plugin data plugin data.
     *
     * @param pluginName the plugin name
     * @return the plugin data
     */
    public PluginData obtainPluginData(final String pluginName) {
        return PLUGIN_MAP.get(pluginName);
    }

    /**
     * Cache select data.
     *
     * @param selectorData the selector data
     */
    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    /**
     * Remove select data.
     *
     * @param selectorData the selector data
     */
    public void removeSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(data -> {
            final List<SelectorData> selectorDataList = SELECTOR_MAP.get(data.getPluginName());
            Optional.ofNullable(selectorDataList).ifPresent(list -> list.removeIf(e -> e.getId().equals(data.getId())));
        });
    }

    /**
     * Remove select data by plugin name.
     *
     * @param pluginName the plugin name
     */
    public void removeSelectDataByPluginName(final String pluginName) {
        SELECTOR_MAP.remove(pluginName);
    }

    /**
     * Clean selector data.
     */
    public void cleanSelectorData() {
        SELECTOR_MAP.clear();
    }

    /**
     * Clean selector data self.
     *
     * @param selectorDataList the selector data list
     */
    public void cleanSelectorDataSelf(final List<SelectorData> selectorDataList) {
        selectorDataList.forEach(this::removeSelectData);
    }

    /**
     * Obtain selector data list list.
     *
     * @param pluginName the plugin name
     * @return the list
     */
    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }

    /**
     * Cache rule data.
     *
     * @param ruleData the rule data
     */
    public void cacheRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(this::ruleAccept);
    }

    /**
     * Remove rule data.
     *
     * @param ruleData the rule data
     */
    public void removeRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(data -> {
            final List<RuleData> ruleDataList = RULE_MAP.get(data.getSelectorId());
            Optional.ofNullable(ruleDataList).ifPresent(list -> list.removeIf(rule -> rule.getId().equals(data.getId())));
        });
    }

    /**
     * Remove rule data by selector id.
     *
     * @param selectorId the selector id
     */
    public void removeRuleDataBySelectorId(final String selectorId) {
        RULE_MAP.remove(selectorId);
    }

    /**
     * Clean rule data.
     */
    public void cleanRuleData() {
        RULE_MAP.clear();
    }

    /**
     * Clean rule data self.
     *
     * @param ruleDataList the rule data list
     */
    public void cleanRuleDataSelf(final List<RuleData> ruleDataList) {
        ruleDataList.forEach(this::removeRuleData);
    }

    /**
     * Obtain rule data list list.
     *
     * @param selectorId the selector id
     * @return the list
     */
    public List<RuleData> obtainRuleData(final String selectorId) {
        return RULE_MAP.get(selectorId);
    }

    /**
     * cache rule data.
     *
     * @param data the rule data
     */
    private void ruleAccept(final RuleData data) {
        String selectorId = data.getSelectorId();
        synchronized (RULE_MAP) {
            if (RULE_MAP.containsKey(selectorId)) {
                List<RuleData> existList = RULE_MAP.get(selectorId);
                final List<RuleData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                resultList.add(data);
                final List<RuleData> collect = resultList.stream().sorted(Comparator.comparing(RuleData::getSort)).collect(Collectors.toList());
                RULE_MAP.put(selectorId, collect);
            } else {
                RULE_MAP.put(selectorId, Lists.newArrayList(data));
            }
        }
    }

    /**
     * cache selector data.
     *
     * @param data the selector data
     */
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        synchronized (SELECTOR_MAP) {
            if (SELECTOR_MAP.containsKey(key)) {
                List<SelectorData> existList = SELECTOR_MAP.get(key);
                final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                resultList.add(data);
                final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
                SELECTOR_MAP.put(key, collect);
            } else {
                SELECTOR_MAP.put(key, Lists.newArrayList(data));
            }
        }
    }
}
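The core of selectorAccept and ruleAccept is an upsert followed by a sort: drop any entry with the same id, add the new one, and keep the list ordered by its sort value. The sketch below shows just that step. SelectorCacheSketch and its Selector class are hypothetical simplifications with only the three fields the logic needs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

final class SelectorCacheSketch {

    // Hypothetical, reduced selector: id, owning plugin, and sort order.
    static final class Selector {
        final String id;
        final String pluginName;
        final int sort;

        Selector(final String id, final String pluginName, final int sort) {
            this.id = id;
            this.pluginName = pluginName;
            this.sort = sort;
        }
    }

    private final ConcurrentMap<String, List<Selector>> selectorMap = new ConcurrentHashMap<>();

    // Replace any entry with the same id, then keep the list ordered by sort.
    void cache(final Selector data) {
        synchronized (selectorMap) {
            List<Selector> existing = selectorMap.getOrDefault(data.pluginName, Collections.emptyList());
            List<Selector> result = existing.stream()
                    .filter(s -> !s.id.equals(data.id))
                    .collect(Collectors.toCollection(ArrayList::new));
            result.add(data);
            result.sort(Comparator.comparingInt((Selector s) -> s.sort));
            selectorMap.put(data.pluginName, result);
        }
    }

    // Ids for one plugin, in cached order.
    List<String> ids(final String pluginName) {
        return selectorMap.getOrDefault(pluginName, Collections.emptyList()).stream()
                .map(s -> s.id)
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        SelectorCacheSketch cache = new SelectorCacheSketch();
        cache.cache(new Selector("a", "divide", 2));
        cache.cache(new Selector("b", "divide", 1));
        System.out.println(cache.ids("divide"));
        cache.cache(new Selector("a", "divide", 0)); // same id, new sort value
        System.out.println(cache.ids("divide"));
    }
}
```

Because the list is rebuilt and put back on every write, readers that fetched the old list keep a consistent snapshot.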

Configuration analysis

Next, we look at the gateway's configuration file.

Analyzing this configuration leads us to the following configuration class.

public class WebsocketConfig {

    /**
     * if have more shenyu admin url,please config like this.
     * 127.0.0.1:8888,127.0.0.1:8889
     */
    private String urls;

    /**
     * get urls.
     *
     * @return urls
     */
    public String getUrls() {
        return urls;
    }

    /**
     * set urls.
     *
     * @param urls urls
     */
    public void setUrls(final String urls) {
        this.urls = urls;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebsocketConfig that = (WebsocketConfig) o;
        return Objects.equals(urls, that.urls);
    }

    @Override
    public int hashCode() {
        return Objects.hash(urls);
    }

    @Override
    public String toString() {
        return "WebsocketConfig{"
                + "urls='"
                + urls
                + '\''
                + '}';
    }
}
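The urls property carries one or more admin addresses separated by commas. As a rough illustration of how such a value can be turned into a list of URIs, consider the sketch below. UrlsSketch is hypothetical, the ws:// addresses and ports are made-up examples, and the trimming of whitespace is a choice made for this sketch rather than something the source shows.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

final class UrlsSketch {

    // Split a comma-separated list and keep every entry that parses as a URI.
    static List<URI> parse(final String urls) {
        List<URI> result = new ArrayList<>();
        if (urls == null) {
            return result;
        }
        for (String token : urls.split(",")) {
            String url = token.trim();
            if (url.isEmpty()) {
                continue;
            }
            try {
                result.add(new URI(url));
            } catch (URISyntaxException e) {
                // A malformed entry is reported and skipped; the rest are still used.
                System.err.println("websocket url(" + url + ") is error: " + e.getMessage());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // Hypothetical addresses, for illustration only.
        System.out.println(parse("ws://127.0.0.1:9095/websocket, ws://127.0.0.1:9096/websocket"));
    }
}
```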

From WebsocketConfig we can find WebsocketSyncDataConfiguration, which registers two beans.

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);

    /**
     * Websocket sync data service.
     *
     * @param websocketConfig  the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers  the meta subscribers
     * @param authSubscribers  the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        LOGGER.info("you use websocket sync shenyu data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Config websocket config.
     *
     * @return the websocket config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }
}

Next, let's look at the WebsocketSyncDataService class.

public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

    /**
     * logger.
     */
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketSyncDataService.class);

    private final List<WebSocketClient> clients = new ArrayList<>();

    private final ScheduledThreadPoolExecutor executor;

    /**
     * Instantiates a new Websocket sync cache.
     *
     * @param websocketConfig      the websocket config
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // split the admin urls on commas
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // create a scheduled thread pool sized to the number of urls
        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
        // create one client per url
        for (String url : urls) {
            try {
                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                LOG.error("websocket url({}) is error", url, e);
            }
        }
        try {
            // connect every client
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    LOG.info("websocket connection is successful.....");
                } else {
                    LOG.error("websocket connection is error.....");
                }
                // run a scheduled task every 10 seconds
                // it checks whether the websocket connection has been closed and, if so, tries to reconnect
                // if the connection is still open, it performs a ping-pong check
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                LOG.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
                            } else {
                                LOG.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
                            }
                        } else {
                            client.sendPing();
                            LOG.debug("websocket send to [{}] ping message successful", client.getURI().toString());
                        }
                    } catch (InterruptedException e) {
                        LOG.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            LOG.info("websocket connection...exception....", e);
        }
    }

    @Override
    public void close() {
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }
}
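The scheduled task is a small health check: reconnect when the connection is closed, otherwise send a ping. The sketch below isolates that decision and runs it on a ScheduledThreadPoolExecutor against a fake client. HeartbeatSketch and FakeClient are hypothetical, and the interval is shortened to 50 milliseconds so the example finishes quickly; the original uses 10 seconds.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class HeartbeatSketch {

    // Hypothetical client that only counts what the health check does to it.
    static final class FakeClient {
        volatile boolean closed;
        final AtomicInteger pings = new AtomicInteger();
        final AtomicInteger reconnects = new AtomicInteger();

        boolean isClosed() {
            return closed;
        }

        boolean reconnect() {
            reconnects.incrementAndGet();
            closed = false;
            return true;
        }

        void sendPing() {
            pings.incrementAndGet();
        }
    }

    // One round of the check: reconnect a closed client, ping an open one.
    static void healthCheck(final FakeClient client) {
        if (client.isClosed()) {
            client.reconnect();
        } else {
            client.sendPing();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        FakeClient client = new FakeClient();
        client.closed = true; // pretend the connection dropped
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.scheduleAtFixedRate(() -> healthCheck(client), 0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("reconnects=" + client.reconnects.get() + ", pings=" + client.pings.get());
    }
}
```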

Verifying the guess

Incremental data synchronization

Java Line Breakpoints

  • CommonPluginDataSubscriber.java:154
  • CommonPluginDataSubscriber.java:141
  • CommonPluginDataSubscriber.java:96
  • SelectorDataHandler.java:50
  • AbstractDataHandler.java:77
  • WebsocketDataHandler.java:59
  • ShenyuWebsocketClient.java:89
  • WebsocketCollector.java:131
  • WebsocketDataChangedListener.java:48
  • DataChangedEventDispatcher.java:64
  • SelectorServiceImpl.java:318
  • SelectorServiceImpl.java:152
  • SelectorController.java:92
  • AbstractCircuitBreaker.java:123

admin




gateway




Full data synchronization

Java Line Breakpoints

  • WebsocketCollector.java:86
  • ShenyuWebsocketClient.java:66
    (hit at startup)

gateway

admin

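Summary

The debugging results match our guess. For an incremental change, admin publishes a DataChangedEvent and WebsocketDataChangedListener pushes it to every connected gateway. For the full sync, the gateway sends MYSELF when its connection first opens and admin replies to that session only.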
