ShenYu Gateway Data Synchronization: Source Code Analysis
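Contents
- Data synchronization on the admin side
- Starting from SelectorController
- Publishing the event
- ConfigGroupEnum
- DataEventTypeEnum
- DataChangedEventDispatcher
- Configuration resolution
- Data synchronization on the gateway side
- ShenyuWebsocketClient
- WebsocketDataHandler
- AbstractDataHandler
- CommonPluginDataSubscriber
- BaseDataCache
- Configuration resolution
- Verifying the guess
- Incremental data synchronization
- Full data synchronization
- Summary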
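After data is updated in the admin console, how does the update reach the gateway?
ShenYu supports several synchronization methods; this article analyzes the WebSocket one.
Data synchronization on the admin side
Starting from SelectorController
A management backend such as admin would normally synchronize all data once at startup and then synchronize incrementally whenever something is modified. Under admin's controller package we find the createSelector method of SelectorController, so the analysis starts there.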
@PostMapping("")
public ShenyuAdminResult createSelector(@Valid @RequestBody final SelectorDTO selectorDTO) {
    Integer createCount = selectorService.createOrUpdate(selectorDTO);
    return ShenyuAdminResult.success(ShenyuResultMessage.CREATE_SUCCESS, createCount);
}
The implementation is as follows:
@RequiredArgsConstructor
@Service
public class SelectorServiceImpl implements SelectorService {

    // publisher responsible for emitting events
    private final ApplicationEventPublisher eventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public int createOrUpdate(final SelectorDTO selectorDTO) {
        int selectorCount;
        // build the data object: DTO --> DO
        SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
        List<SelectorConditionDTO> selectorConditionDTOs = selectorDTO.getSelectorConditions();
        // decide whether this is an insert or an update
        if (StringUtils.isEmpty(selectorDTO.getId())) {
            // insert the selector
            selectorCount = selectorMapper.insertSelective(selectorDO);
            // insert the selector's conditions
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                selectorConditionMapper.insertSelective(SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO));
            });
            // check selector add
            // permission check
            if (dataPermissionMapper.listByUserId(JwtUtils.getUserInfo().getUserId()).size() > 0) {
                DataPermissionDTO dataPermissionDTO = new DataPermissionDTO();
                dataPermissionDTO.setUserId(JwtUtils.getUserInfo().getUserId());
                dataPermissionDTO.setDataId(selectorDO.getId());
                dataPermissionDTO.setDataType(AdminConstants.SELECTOR_DATA_TYPE);
                dataPermissionMapper.insertSelective(DataPermissionDO.buildPermissionDO(dataPermissionDTO));
            }
        } else {
            // update the selector; its conditions are deleted first and then re-inserted
            selectorCount = selectorMapper.updateSelective(selectorDO);
            // delete rule condition then add
            selectorConditionMapper.deleteByQuery(new SelectorConditionQuery(selectorDO.getId()));
            selectorConditionDTOs.forEach(selectorConditionDTO -> {
                selectorConditionDTO.setSelectorId(selectorDO.getId());
                SelectorConditionDO selectorConditionDO = SelectorConditionDO.buildSelectorConditionDO(selectorConditionDTO);
                selectorConditionMapper.insertSelective(selectorConditionDO);
            });
        }
        // publish the event
        publishEvent(selectorDO, selectorConditionDTOs);
        // update the upstream
        updateDivideUpstream(selectorDO);
        return selectorCount;
    }

    // ......
}
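Looking through the code above, the call most likely related to synchronization is this one:
// publish the event
publishEvent(selectorDO, selectorConditionDTOs);
Publishing the event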
private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
    PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
    List<ConditionData> conditionDataList =
            selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
    // publish change event.
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
            Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
}
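eventPublisher is an org.springframework.context.ApplicationEventPublisher, so the notification goes through Spring's event mechanism.
That mechanism needs three parts: a publisher, a listener, and an event. We have already found the publisher, and the code above shows that the event is DataChangedEvent.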
public class DataChangedEvent extends ApplicationEvent {

    private final DataEventTypeEnum eventType;

    private final ConfigGroupEnum groupKey;

    public DataChangedEvent(final ConfigGroupEnum groupKey, final DataEventTypeEnum type, final List<?> source) {
        super(source.stream().filter(Objects::nonNull).collect(Collectors.toList()));
        this.eventType = type;
        this.groupKey = groupKey;
    }
}
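The publisher, listener, and event roles can be illustrated without Spring. The following is a minimal plain-Java sketch, not Spring's implementation: MiniEventBus, ChangeEvent, and register are hypothetical names that only mirror the shape of ApplicationEventPublisher, ApplicationListener, and DataChangedEvent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class MiniEventBus {

    /** Hypothetical event; like DataChangedEvent it drops null entries from its payload. */
    static final class ChangeEvent {
        final String groupKey;
        final String eventType;
        final List<Object> source = new ArrayList<>();

        ChangeEvent(final String groupKey, final String eventType, final List<?> source) {
            this.groupKey = groupKey;
            this.eventType = eventType;
            for (Object item : source) {
                if (item != null) {
                    this.source.add(item);
                }
            }
        }
    }

    private final List<Consumer<ChangeEvent>> listeners = new ArrayList<>();

    /** Listener side: subscribe to every published event. */
    void register(final Consumer<ChangeEvent> listener) {
        listeners.add(listener);
    }

    /** Publisher side: hand the event to each registered listener in turn. */
    void publish(final ChangeEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }

    public static void main(final String[] args) {
        MiniEventBus bus = new MiniEventBus();
        bus.register(e -> System.out.println(e.groupKey + " " + e.eventType + " " + e.source));
        bus.publish(new ChangeEvent("SELECTOR", "UPDATE", Arrays.asList("selector-1", null)));
    }
}
```

The service only needs to know the publisher; which listeners exist, and how many, is decided elsewhere.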
ConfigGroupEnum
ConfigGroupEnum shows which data admin may need to synchronize to the gateway: plugins, rules, selectors, metadata, and auth information.
public enum ConfigGroupEnum {

    /**
     * App auth config group enum.
     */
    APP_AUTH,

    /**
     * Plugin config group enum.
     */
    PLUGIN,

    /**
     * Rule config group enum.
     */
    RULE,

    /**
     * Selector config group enum.
     */
    SELECTOR,

    /**
     * Meta data config group enum.
     */
    META_DATA;

    /**
     * Acquire by name config group enum.
     *
     * @param name the name
     * @return the config group enum
     */
    public static ConfigGroupEnum acquireByName(final String name) {
        return Arrays.stream(ConfigGroupEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this ConfigGroupEnum can not support %s", name)));
    }
}
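DataEventTypeEnum
DataEventTypeEnum shows which events may trigger synchronization: delete, create, update, refresh, and the initial full synchronization (our guess is that MYSELF marks a full sync).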
public enum DataEventTypeEnum {

    /**
     * delete event.
     */
    DELETE,

    /**
     * insert event.
     */
    CREATE,

    /**
     * update event.
     */
    UPDATE,

    /**
     * REFRESH data event type enum.
     */
    REFRESH,

    /**
     * Myself data event type enum.
     */
    MYSELF;

    /**
     * Acquire by name data event type enum.
     *
     * @param name the name
     * @return the data event type enum
     */
    public static DataEventTypeEnum acquireByName(final String name) {
        return Arrays.stream(DataEventTypeEnum.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new ShenyuException(String.format(" this DataEventTypeEnum can not support %s", name)));
    }
}
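Both enums travel over the wire as plain names and are restored with acquireByName. A small sketch of that round trip, using a local copy of the enum and IllegalArgumentException as a stand-in for ShenyuException (both are assumptions made so the example is self-contained):

```java
import java.util.Arrays;
import java.util.Objects;

class EnumLookupSketch {

    /** Local copy of the five event types listed above. */
    enum EventType { DELETE, CREATE, UPDATE, REFRESH, MYSELF }

    /** Same stream-based lookup as acquireByName; the exception type is a stand-in. */
    static EventType acquireByName(final String name) {
        return Arrays.stream(EventType.values())
                .filter(e -> Objects.equals(e.name(), name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unsupported event type: " + name));
    }

    public static void main(final String[] args) {
        // The sender serializes with name(); the receiver restores the constant by name.
        String onTheWire = EventType.MYSELF.name();
        System.out.println(acquireByName(onTheWire));
    }
}
```

Because the comparison uses Objects.equals on the exact name, the lookup is case-sensitive and a null name simply falls through to the exception.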
DataChangedEventDispatcher
A listener normally implements the ApplicationListener interface, so a global search for its implementations leads to the following class.
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private final ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
}
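The dispatcher's job is a fan-out: every registered listener receives the change, and the group key decides which callback is invoked. A reduced sketch of that idea follows; the Listener interface with no-op default methods and the String payloads are assumptions for illustration, and only two of the five groups are wired up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DispatchSketch {

    enum Group { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

    /** Hypothetical listener; no-op defaults let an implementation override only what it needs. */
    interface Listener {
        default void onPluginChanged(final List<String> data, final String eventType) { }

        default void onSelectorChanged(final List<String> data, final String eventType) { }
    }

    /** Deliver one change to every listener, choosing the callback by group. */
    static void dispatch(final List<Listener> listeners, final Group group,
                         final List<String> data, final String eventType) {
        for (Listener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data, eventType);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(data, eventType);
                    break;
                default:
                    // Groups this sketch does not model are rejected, like the original default branch.
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }

    public static void main(final String[] args) {
        List<Listener> listeners = new ArrayList<>();
        listeners.add(new Listener() {
            @Override
            public void onSelectorChanged(final List<String> data, final String eventType) {
                System.out.println("websocket listener got " + eventType + " " + data);
            }
        });
        dispatch(listeners, Group.SELECTOR, Arrays.asList("selector-1"), "UPDATE");
    }
}
```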
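Configuration resolution
Going further, we find that DataChangedListener has several implementations.
With so many implementations, how does admin choose one? Recall that the synchronization properties are configured when the project starts.
Searching for "shenyu.sync.websocket" leads to WebsocketSyncProperties.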
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public class WebsocketSyncProperties {

    /**
     * default: true.
     */
    private boolean enabled = true;

    /**
     * Gets the value of enabled.
     *
     * @return the value of enabled
     */
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * Sets the enabled.
     *
     * @param enabled enabled
     */
    public void setEnabled(final boolean enabled) {
        this.enabled = enabled;
    }
}
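However, this properties class only carries the switch that enables websocket; it does not register WebsocketDataChangedListener or the other communication beans. Following the usages of WebsocketSyncProperties leads to the class below, which is exactly what we are looking for.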
@Configuration
// this configuration applies when shenyu.sync.websocket.enabled=true
// it also applies when the property is absent, because matchIfMissing = true
@ConditionalOnProperty(name = "shenyu.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    /**
     * Config event listener data changed listener.
     *
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }

    /**
     * Websocket collector.
     *
     * @return the websocket collector
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketCollector.class)
    public WebsocketCollector websocketCollector() {
        return new WebsocketCollector();
    }

    /**
     * Server endpoint exporter server endpoint exporter.
     *
     * @return the server endpoint exporter
     */
    @Bean
    @ConditionalOnMissingBean(ServerEndpointExporter.class)
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
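The effect of havingValue = "true" together with matchIfMissing = true can be written as a small decision function. This is only a sketch of the rule as used here, with a Map standing in for the Spring environment; ConditionSketch and matches are hypothetical names, not Spring classes.

```java
import java.util.HashMap;
import java.util.Map;

class ConditionSketch {

    /**
     * A missing property matches only when matchIfMissing is set;
     * a present property must equal the expected value.
     */
    static boolean matches(final Map<String, String> props, final String name,
                           final String havingValue, final boolean matchIfMissing) {
        String actual = props.get(name);
        if (actual == null) {
            return matchIfMissing;
        }
        return havingValue.equalsIgnoreCase(actual);
    }

    public static void main(final String[] args) {
        String key = "shenyu.sync.websocket.enabled";
        Map<String, String> props = new HashMap<>();
        System.out.println("absent -> " + matches(props, key, "true", true));
        props.put(key, "false");
        System.out.println("false  -> " + matches(props, key, "true", true));
    }
}
```

In other words, websocket synchronization is active unless the property is explicitly set to something other than true.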
Back to the listeners: the one selected here is WebsocketDataChangedListener.
public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        // assemble the WebsocketData payload
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        // send it
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}
Next, look at WebsocketCollector.send (WebsocketCollector is one of the beans registered above).
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {

    private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);

    private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();

    private static final String SESSION_KEY = "sessionKey";

    /**
     * Send.
     *
     * @param message the message
     * @param type    the type
     */
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // this indirectly supports our guess that DataEventTypeEnum.MYSELF means full sync:
        // the message goes only to the session of the requesting client, not to every session
        if (DataEventTypeEnum.MYSELF == type) {
            Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
            if (Objects.nonNull(session)) {
                sendMessageBySession(session, message);
            }
        } else {
            SESSION_SET.forEach(session -> sendMessageBySession(session, message));
        }
    }

    private static synchronized void sendMessageBySession(final Session session, final String message) {
        try {
            // send the message through the websocket session
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            LOG.error("websocket send result is exception: ", e);
        }
    }

    // ......
}
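The routing rule inside send is the interesting part: a MYSELF message answers one client, anything else is broadcast. A self-contained sketch of that rule follows; the in-memory outboxes and the currentSession field are stand-ins for real websocket sessions and for the value the original reads from ThreadLocalUtils.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SendRoutingSketch {

    /** Stand-in for the open sessions: session id -> messages delivered to it. */
    private final Map<String, List<String>> sessions = new LinkedHashMap<>();

    /** Stand-in for the session the original fetches from ThreadLocalUtils. */
    private String currentSession;

    void open(final String id) {
        sessions.put(id, new ArrayList<>());
    }

    void setCurrent(final String id) {
        currentSession = id;
    }

    List<String> delivered(final String id) {
        return sessions.get(id);
    }

    void send(final String message, final String eventType) {
        if (message == null || message.trim().isEmpty()) {
            return;
        }
        if ("MYSELF".equals(eventType)) {
            // Full sync: reply only to the client that asked for it.
            List<String> outbox = currentSession == null ? null : sessions.get(currentSession);
            if (outbox != null) {
                outbox.add(message);
            }
        } else {
            // Incremental change: broadcast to every connected gateway.
            sessions.values().forEach(outbox -> outbox.add(message));
        }
    }

    public static void main(final String[] args) {
        SendRoutingSketch collector = new SendRoutingSketch();
        collector.open("gateway-1");
        collector.open("gateway-2");
        collector.setCurrent("gateway-2");
        collector.send("full-data", "MYSELF");
        collector.send("delta", "UPDATE");
        System.out.println(collector.delivered("gateway-1") + " " + collector.delivered("gateway-2"));
    }
}
```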
Data synchronization on the gateway side
ShenyuWebsocketClient
Where data is sent, something must receive it. Under shenyu-sync-data-websocket we find the ShenyuWebsocketClient class, which extends WebSocketClient.
public final class ShenyuWebsocketClient extends WebSocketClient {

    /**
     * logger.
     */
    private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);

    private volatile boolean alreadySync = Boolean.FALSE;

    private final WebsocketDataHandler websocketDataHandler;

    /**
     * Instantiates a new shenyu websocket client.
     *
     * @param serverUri            the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    }

    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        // if no sync has happened yet, request one when the connection opens
        if (!alreadySync) {
            // this confirms once more that MYSELF is the full-sync marker
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }

    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }

    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }

    @Override
    public void onError(final Exception e) {
        this.close();
    }

    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        LOG.info("handleResult({})", result);
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}
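The alreadySync flag makes the full-sync request a one-shot action per client instance. A reduced sketch, with a list standing in for the messages sent over the socket (FirstSyncSketch and sent are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;

class FirstSyncSketch {

    private volatile boolean alreadySync = false;

    /** Stand-in for messages written to the websocket. */
    private final List<String> sent = new ArrayList<>();

    /** Called each time the connection opens. */
    void onOpen() {
        if (!alreadySync) {
            sent.add("MYSELF");
            alreadySync = true;
        }
    }

    List<String> sent() {
        return sent;
    }

    public static void main(final String[] args) {
        FirstSyncSketch client = new FirstSyncSketch();
        client.onOpen();
        client.onOpen();
        System.out.println(client.sent());
    }
}
```

Read this way, the flag is an instance field that is never reset in the excerpt above, so a second onOpen on the same object would not request another full sync.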
WebsocketDataHandler
In ShenyuWebsocketClient the key call is websocketDataHandler.executor(groupEnum, json, eventType); it is implemented as follows.
public class WebsocketDataHandler {

    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

    /**
     * Instantiates a new Websocket data handler.
     *
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }

    /**
     * Executor.
     *
     * @param type      the type
     * @param json      the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}
The class keeps five handlers in memory, one for each ConfigGroupEnum value. The call that follows is:
ENUM_MAP.get(type).handle(json, eventType);
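Where the admin side uses a switch, the gateway side uses a lookup table keyed by the group enum. A minimal sketch of that registry pattern, with a hypothetical Handler interface and handlers that only record what they received:

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class HandlerRegistrySketch {

    enum Group { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

    /** Hypothetical counterpart of DataHandler. */
    interface Handler {
        void handle(String json, String eventType);
    }

    private final Map<Group, Handler> handlers = new EnumMap<>(Group.class);

    private final List<String> log = new ArrayList<>();

    HandlerRegistrySketch() {
        // One handler per group; here each handler just records the call.
        for (Group group : Group.values()) {
            handlers.put(group, (json, eventType) -> log.add(group + "/" + eventType + "/" + json));
        }
    }

    /** Look the handler up by group and delegate to it. */
    void executor(final Group type, final String json, final String eventType) {
        handlers.get(type).handle(json, eventType);
    }

    List<String> log() {
        return log;
    }

    public static void main(final String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.executor(Group.SELECTOR, "[]", "UPDATE");
        System.out.println(registry.log());
    }
}
```

An EnumMap is a natural fit because the key set is a fixed enum; adding a group means adding one entry rather than another case branch.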
AbstractDataHandler
The dispatch by event type happens in the abstract class AbstractDataHandler.
public abstract class AbstractDataHandler<T> implements DataHandler {

    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}
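This is a template method: handle fixes the routing, and each concrete handler supplies doRefresh, doUpdate, and doDelete. A compact sketch with a hypothetical recording subclass (JSON conversion is left out, so the list is passed in directly):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TemplateHandlerSketch {

    enum EventType { DELETE, CREATE, UPDATE, REFRESH, MYSELF }

    /** handle() picks the hook; subclasses supply the behaviour. */
    abstract static class Handler<T> {

        final void handle(final List<T> dataList, final EventType type) {
            if (dataList == null || dataList.isEmpty()) {
                return;
            }
            switch (type) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }

        abstract void doRefresh(List<T> dataList);

        abstract void doUpdate(List<T> dataList);

        abstract void doDelete(List<T> dataList);
    }

    /** Hypothetical subclass that records which hook ran. */
    static class Recording extends Handler<String> {

        final List<String> calls = new ArrayList<>();

        @Override
        void doRefresh(final List<String> dataList) {
            calls.add("refresh" + dataList);
        }

        @Override
        void doUpdate(final List<String> dataList) {
            calls.add("update" + dataList);
        }

        @Override
        void doDelete(final List<String> dataList) {
            calls.add("delete" + dataList);
        }
    }

    public static void main(final String[] args) {
        Recording handler = new Recording();
        handler.handle(Arrays.asList("a"), EventType.CREATE);
        handler.handle(Arrays.asList("a", "b"), EventType.MYSELF);
        System.out.println(handler.calls);
    }
}
```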
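We created a selector, and admin published that change with DataEventTypeEnum.UPDATE. Since the switch above routes both UPDATE and CREATE to doUpdate, the call ends up in SelectorDataHandler.doUpdate.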
@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}
CommonPluginDataSubscriber
Searching for the onSelectorSubscribe method leads to the CommonPluginDataSubscriber class.
public class CommonPluginDataSubscriber implements PluginDataSubscriber {

    @Override
    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }
}
The implementation of subscribeDataHandler:
// subscribe data handler
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        if (data instanceof PluginData) {
            // operations for plugin data
            PluginData pluginData = (PluginData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
            }
        } else if (data instanceof SelectorData) {
            // operations for selector data
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                // cache the data in memory
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                // if the plugin has its own handling logic, run it as well
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            // operations for rule data
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}
BaseDataCache
The call BaseDataCache.getInstance().cacheSelectData(selectorData); stores the data in the in-memory BaseDataCache singleton.
public final class BaseDataCache {

    private static final BaseDataCache INSTANCE = new BaseDataCache();

    /**
     * pluginName -> PluginData.
     */
    private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();

    /**
     * pluginName -> SelectorData.
     */
    private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();

    /**
     * selectorId -> RuleData.
     */
    private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();

    private BaseDataCache() {
    }

    /**
     * Gets instance.
     *
     * @return the instance
     */
    public static BaseDataCache getInstance() {
        return INSTANCE;
    }

    /**
     * Cache plugin data.
     *
     * @param pluginData the plugin data
     */
    public void cachePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
    }

    /**
     * Remove plugin data.
     *
     * @param pluginData the plugin data
     */
    public void removePluginData(final PluginData pluginData) {
        Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.remove(data.getName()));
    }

    /**
     * Remove plugin data by plugin name.
     *
     * @param pluginName the plugin name
     */
    public void removePluginDataByPluginName(final String pluginName) {
        PLUGIN_MAP.remove(pluginName);
    }

    /**
     * Clean plugin data.
     */
    public void cleanPluginData() {
        PLUGIN_MAP.clear();
    }

    /**
     * Clean plugin data self.
     *
     * @param pluginDataList the plugin data list
     */
    public void cleanPluginDataSelf(final List<PluginData> pluginDataList) {
        pluginDataList.forEach(this::removePluginData);
    }

    /**
     * Obtain plugin data plugin data.
     *
     * @param pluginName the plugin name
     * @return the plugin data
     */
    public PluginData obtainPluginData(final String pluginName) {
        return PLUGIN_MAP.get(pluginName);
    }

    /**
     * Cache select data.
     *
     * @param selectorData the selector data
     */
    public void cacheSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(this::selectorAccept);
    }

    /**
     * Remove select data.
     *
     * @param selectorData the selector data
     */
    public void removeSelectData(final SelectorData selectorData) {
        Optional.ofNullable(selectorData).ifPresent(data -> {
            final List<SelectorData> selectorDataList = SELECTOR_MAP.get(data.getPluginName());
            Optional.ofNullable(selectorDataList).ifPresent(list -> list.removeIf(e -> e.getId().equals(data.getId())));
        });
    }

    /**
     * Remove select data by plugin name.
     *
     * @param pluginName the plugin name
     */
    public void removeSelectDataByPluginName(final String pluginName) {
        SELECTOR_MAP.remove(pluginName);
    }

    /**
     * Clean selector data.
     */
    public void cleanSelectorData() {
        SELECTOR_MAP.clear();
    }

    /**
     * Clean selector data self.
     *
     * @param selectorDataList the selector data list
     */
    public void cleanSelectorDataSelf(final List<SelectorData> selectorDataList) {
        selectorDataList.forEach(this::removeSelectData);
    }

    /**
     * Obtain selector data list list.
     *
     * @param pluginName the plugin name
     * @return the list
     */
    public List<SelectorData> obtainSelectorData(final String pluginName) {
        return SELECTOR_MAP.get(pluginName);
    }

    /**
     * Cache rule data.
     *
     * @param ruleData the rule data
     */
    public void cacheRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(this::ruleAccept);
    }

    /**
     * Remove rule data.
     *
     * @param ruleData the rule data
     */
    public void removeRuleData(final RuleData ruleData) {
        Optional.ofNullable(ruleData).ifPresent(data -> {
            final List<RuleData> ruleDataList = RULE_MAP.get(data.getSelectorId());
            Optional.ofNullable(ruleDataList).ifPresent(list -> list.removeIf(rule -> rule.getId().equals(data.getId())));
        });
    }

    /**
     * Remove rule data by selector id.
     *
     * @param selectorId the selector id
     */
    public void removeRuleDataBySelectorId(final String selectorId) {
        RULE_MAP.remove(selectorId);
    }

    /**
     * Clean rule data.
     */
    public void cleanRuleData() {
        RULE_MAP.clear();
    }

    /**
     * Clean rule data self.
     *
     * @param ruleDataList the rule data list
     */
    public void cleanRuleDataSelf(final List<RuleData> ruleDataList) {
        ruleDataList.forEach(this::removeRuleData);
    }

    /**
     * Obtain rule data list list.
     *
     * @param selectorId the selector id
     * @return the list
     */
    public List<RuleData> obtainRuleData(final String selectorId) {
        return RULE_MAP.get(selectorId);
    }

    /**
     * cache rule data.
     *
     * @param data the rule data
     */
    private void ruleAccept(final RuleData data) {
        String selectorId = data.getSelectorId();
        synchronized (RULE_MAP) {
            if (RULE_MAP.containsKey(selectorId)) {
                List<RuleData> existList = RULE_MAP.get(selectorId);
                final List<RuleData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                resultList.add(data);
                final List<RuleData> collect = resultList.stream().sorted(Comparator.comparing(RuleData::getSort)).collect(Collectors.toList());
                RULE_MAP.put(selectorId, collect);
            } else {
                RULE_MAP.put(selectorId, Lists.newArrayList(data));
            }
        }
    }

    /**
     * cache selector data.
     *
     * @param data the selector data
     */
    private void selectorAccept(final SelectorData data) {
        String key = data.getPluginName();
        synchronized (SELECTOR_MAP) {
            if (SELECTOR_MAP.containsKey(key)) {
                List<SelectorData> existList = SELECTOR_MAP.get(key);
                final List<SelectorData> resultList = existList.stream().filter(r -> !r.getId().equals(data.getId())).collect(Collectors.toList());
                resultList.add(data);
                final List<SelectorData> collect = resultList.stream().sorted(Comparator.comparing(SelectorData::getSort)).collect(Collectors.toList());
                SELECTOR_MAP.put(key, collect);
            } else {
                SELECTOR_MAP.put(key, Lists.newArrayList(data));
            }
        }
    }
}
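The core of selectorAccept and ruleAccept is an upsert followed by a sort: drop any entry with the same id, append the new one, and reorder by the sort field. A trimmed sketch of that step, using a hypothetical Selector value class instead of SelectorData:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class SelectorCacheSketch {

    /** Hypothetical, reduced stand-in for SelectorData. */
    static final class Selector {
        final String id;
        final String pluginName;
        final int sort;

        Selector(final String id, final String pluginName, final int sort) {
            this.id = id;
            this.pluginName = pluginName;
            this.sort = sort;
        }
    }

    /** pluginName -> selectors ordered by sort. */
    private final Map<String, List<Selector>> byPlugin = new ConcurrentHashMap<>();

    /** Replace the selector with the same id (if any), then keep the list ordered by sort. */
    void cache(final Selector data) {
        synchronized (byPlugin) {
            List<Selector> existing = byPlugin.getOrDefault(data.pluginName, new ArrayList<>());
            List<Selector> result = existing.stream()
                    .filter(s -> !s.id.equals(data.id))
                    .collect(Collectors.toCollection(ArrayList::new));
            result.add(data);
            result.sort(Comparator.comparingInt(s -> s.sort));
            byPlugin.put(data.pluginName, result);
        }
    }

    /** Ids for one plugin, in cached order. */
    List<String> ids(final String pluginName) {
        return byPlugin.getOrDefault(pluginName, new ArrayList<>()).stream()
                .map(s -> s.id)
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        SelectorCacheSketch cache = new SelectorCacheSketch();
        cache.cache(new Selector("a", "divide", 2));
        cache.cache(new Selector("b", "divide", 1));
        cache.cache(new Selector("a", "divide", 0));
        System.out.println(cache.ids("divide"));
    }
}
```

Each write builds a fresh list and swaps it into the map, which keeps a reader from seeing a list that is halfway through being re-sorted.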
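Configuration resolution
Next, we look at the gateway's configuration file.
Analyzing that configuration leads to the following configuration class.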
public class WebsocketConfig {

    /**
     * if have more shenyu admin url,please config like this.
     * 127.0.0.1:8888,127.0.0.1:8889
     */
    private String urls;

    /**
     * get urls.
     *
     * @return urls
     */
    public String getUrls() {
        return urls;
    }

    /**
     * set urls.
     *
     * @param urls urls
     */
    public void setUrls(final String urls) {
        this.urls = urls;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebsocketConfig that = (WebsocketConfig) o;
        return Objects.equals(urls, that.urls);
    }

    @Override
    public int hashCode() {
        return Objects.hash(urls);
    }

    @Override
    public String toString() {
        return "WebsocketConfig{"
                + "urls='"
                + urls
                + '\''
                + '}';
    }
}
From WebsocketConfig we reach WebsocketSyncDataConfiguration, which registers two beans.
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);

    /**
     * Websocket sync data service.
     *
     * @param websocketConfig  the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers  the meta subscribers
     * @param authSubscribers  the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        LOGGER.info("you use websocket sync shenyu data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Config websocket config.
     *
     * @return the websocket config
     */
    @Bean
    @ConfigurationProperties(prefix = "shenyu.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }
}
Next, we look at the WebsocketSyncDataService class.
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {

    /**
     * logger.
     */
    private static final Logger LOG = LoggerFactory.getLogger(WebsocketSyncDataService.class);

    private final List<WebSocketClient> clients = new ArrayList<>();

    private final ScheduledThreadPoolExecutor executor;

    /**
     * Instantiates a new Websocket sync cache.
     *
     * @param websocketConfig      the websocket config
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // split the admin urls on commas
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // create a scheduled thread pool sized by the number of urls
        executor = new ScheduledThreadPoolExecutor(urls.length, ShenyuThreadFactory.create("websocket-connect", true));
        // create one client per url
        for (String url : urls) {
            try {
                clients.add(new ShenyuWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                LOG.error("websocket url({}) is error", url, e);
            }
        }
        try {
            // connect every client
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    LOG.info("websocket connection is successful.....");
                } else {
                    LOG.error("websocket connection is error.....");
                }
                // scheduled task, run every 10 seconds:
                // if the websocket connection has been closed, try to reconnect;
                // otherwise perform a ping-pong check
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                LOG.info("websocket reconnect server[{}] is successful.....", client.getURI().toString());
                            } else {
                                LOG.error("websocket reconnection server[{}] is error.....", client.getURI().toString());
                            }
                        } else {
                            client.sendPing();
                            LOG.debug("websocket send to [{}] ping message successful", client.getURI().toString());
                        }
                    } catch (InterruptedException e) {
                        LOG.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            LOG.info("websocket connection...exception....", e);
        }
    }

    @Override
    public void close() {
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }
}
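The body of the scheduled task is a simple health check: reconnect when the connection is closed, otherwise ping. The sketch below isolates one tick of that check so it can be run without a network; FakeClient is a hypothetical stand-in and does not reproduce the Java-WebSocket client API.

```java
class HeartbeatSketch {

    /** Hypothetical minimal client that only counts what was asked of it. */
    static class FakeClient {
        boolean closed;
        int pings;
        int reconnects;

        boolean reconnect() {
            reconnects++;
            closed = false;
            return true;
        }

        void sendPing() {
            pings++;
        }
    }

    /** One tick of the periodic task: reconnect when closed, otherwise ping. */
    static void tick(final FakeClient client) {
        if (client.closed) {
            client.reconnect();
        } else {
            client.sendPing();
        }
    }

    public static void main(final String[] args) {
        FakeClient client = new FakeClient();
        client.closed = true;
        tick(client);
        tick(client);
        System.out.println("reconnects=" + client.reconnects + " pings=" + client.pings);
    }
}
```

In the original, this tick is what scheduleAtFixedRate runs for each client, with an initial delay and a period of 10 seconds.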
Verifying the guess
Incremental data synchronization
Java Line Breakpoints
- CommonPluginDataSubscriber.java:154
- CommonPluginDataSubscriber.java:141
- CommonPluginDataSubscriber.java:96
- SelectorDataHandler.java:50
- AbstractDataHandler.java:77
- WebsocketDataHandler.java:59
- ShenyuWebsocketClient.java:89
- WebsocketCollector.java:131
- WebsocketDataChangedListener.java:48
- DataChangedEventDispatcher.java:64
- SelectorServiceImpl.java:318
- SelectorServiceImpl.java:152
- SelectorController.java:92
- AbstractCircuitBreaker.java:123
admin
gateway
Full data synchronization
Java Line Breakpoints
- WebsocketCollector.java:86
- ShenyuWebsocketClient.java:66
At startup
gateway
admin
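Summary
Debugging confirms the guess: the gateway requests a full synchronization with MYSELF when its websocket connection first opens, and later changes made in admin are pushed to the gateway incrementally.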