soul数据同步之websocket

1. 在【soul-admin】中开启websocket同步

soul:sync:websocket:enabled: true

2. 在【soul-bootstrap】中添加websocket数据同步依赖

<dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId><version>${project.version}</version>
</dependency>

3.在【soul-bootstrap】中开启websocket同步

soul:sync:websocket:urls: ws://localhost:9095/websocket

4. 这样启动 【soul-admin】和 【soul-bootstrap】,他们之间的数据就完成同步了。

带着问题读源码之【soul-admin】是如何出发推送的逻辑的

上述第一步配置是打开Spring容器里的配置, 将三个Bean加入Spring容器

    /*** The WebsocketListener(default strategy).*/@Configuration@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)@EnableConfigurationProperties(WebsocketSyncProperties.class)static class WebsocketListener {/*** Config event listener data changed listener.** @return the data changed listener*/@Bean@ConditionalOnMissingBean(WebsocketDataChangedListener.class)public DataChangedListener websocketDataChangedListener() {return new WebsocketDataChangedListener();}/*** Websocket collector websocket collector.** @return the websocket collector*/@Bean@ConditionalOnMissingBean(WebsocketCollector.class)public WebsocketCollector websocketCollector() {return new WebsocketCollector();}/*** Server endpoint exporter server endpoint exporter.** @return the server endpoint exporter*/@Bean@ConditionalOnMissingBean(ServerEndpointExporter.class)public ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

先看WebSocketController,从他的代码可以猜想,【soul-bootstrap】连接 ws://localhost:9095/websocket 时会发送一个 DataEventTypeEnum.MYSELF 进行全量信息的同步,

@ServerEndpoint("/websocket")
public class WebsocketCollector {private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();private static final String SESSION_KEY = "sessionKey";/*** On open.** @param session the session*/@OnOpenpublic void onOpen(final Session session) {log.info("websocket on open successful....");SESSION_SET.add(session);}/*** On message.** @param message the message* @param session the session*/@OnMessagepublic void onMessage(final String message, final Session session) {if (message.equals(DataEventTypeEnum.MYSELF.name())) {try {ThreadLocalUtil.put(SESSION_KEY, session);SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);} finally {ThreadLocalUtil.clear();}}}/*** On close.** @param session the session*/@OnClosepublic void onClose(final Session session) {SESSION_SET.remove(session);ThreadLocalUtil.clear();}/*** On error.** @param session the session* @param error   the error*/@OnErrorpublic void onError(final Session session, final Throwable error) {SESSION_SET.remove(session);ThreadLocalUtil.clear();log.error("websocket collection error: ", error);}/*** Send.** @param message the message* @param type    the type*/public static void send(final String message, final DataEventTypeEnum type) {if (StringUtils.isNotBlank(message)) {if (DataEventTypeEnum.MYSELF == type) {try {Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);if (session != null) {session.getBasicRemote().sendText(message);}} catch (IOException e) {log.error("websocket send result is exception: ", e);}return;}for (Session session : SESSION_SET) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("websocket send result is exception: ", e);}}}}
}

再来看 DataChangedListener, 是通过观察着模式来监听数据变更时进行的操作,比如向websocket连接对方推送变更的数据完成增量更新

public class WebsocketDataChangedListener implements DataChangedListener {@Overridepublic void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {WebsocketData<PluginData> websocketData =new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);}@Overridepublic void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {WebsocketData<SelectorData> websocketData =new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);}@Overridepublic void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {WebsocketData<RuleData> configData =new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);}@Overridepublic void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {WebsocketData<AppAuthData> configData =new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);}@Overridepublic void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {WebsocketData<MetaData> configData =new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);}}

而 ServerEndpointExporter 只是Spring提供的websocket容器

带着问题读源码之【soul-bootstrap】是如何获取数据的

上述第三步是打开如下配置

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {/*** Websocket sync data service.** @param websocketConfig   the websocket config* @param pluginSubscriber the plugin subscriber* @param metaSubscribers   the meta subscribers* @param authSubscribers   the auth subscribers* @return the sync data service*/@Beanpublic SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {log.info("you use websocket sync soul data.......");return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));}/*** Config websocket config.** @return the websocket config*/@Bean@ConfigurationProperties(prefix = "soul.sync.websocket")public WebsocketConfig websocketConfig() {return new WebsocketConfig();}
}

WebsocketConfig 是读取 soul.sync.websocket 下的Websocket地址

WebsocketSyncDataService 是完成websocket连接更新数据的逻辑,在构造函数中,初始化websocket连接,并启动后每30s执行一次的定时任务去更新数据

public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {private final List<WebSocketClient> clients = new ArrayList<>();private final ScheduledThreadPoolExecutor executor;/*** Instantiates a new Websocket sync cache.** @param websocketConfig      the websocket config* @param pluginDataSubscriber the plugin data subscriber* @param metaDataSubscribers  the meta data subscribers* @param authDataSubscribers  the auth data subscribers*/public WebsocketSyncDataService(final WebsocketConfig websocketConfig,final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers,final List<AuthDataSubscriber> authDataSubscribers) {String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));for (String url : urls) {try {clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));} catch (URISyntaxException e) {log.error("websocket url({}) is error", url, e);}}try {for (WebSocketClient client : clients) {boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);if (success) {log.info("websocket connection is successful.....");} else {log.error("websocket connection is error.....");}executor.scheduleAtFixedRate(() -> {try {if (client.isClosed()) {boolean reconnectSuccess = client.reconnectBlocking();if (reconnectSuccess) {log.info("websocket reconnect is successful.....");} else {log.error("websocket reconnection is error.....");}}} catch (InterruptedException e) {log.error("websocket connect is error :{}", e.getMessage());}}, 10, 30, TimeUnit.SECONDS);}/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/} catch (InterruptedException e) {log.info("websocket connection...exception....", e);}}@Overridepublic void close() {for (WebSocketClient client : clients) {if (!client.isClosed()) {client.close();}}if (Objects.nonNull(executor)) {executor.shutdown();}}
}
public final class SoulWebsocketClient extends WebSocketClient {private volatile boolean alreadySync = Boolean.FALSE;private final WebsocketDataHandler websocketDataHandler;/*** Instantiates a new Soul websocket client.** @param serverUri             the server uri* @param pluginDataSubscriber the plugin data subscriber* @param metaDataSubscribers   the meta data subscribers* @param authDataSubscribers   the auth data subscribers*/public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {super(serverUri);this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);}@Overridepublic void onOpen(final ServerHandshake serverHandshake) {if (!alreadySync) {send(DataEventTypeEnum.MYSELF.name()); // 进行全量更新alreadySync = true;}}@Overridepublic void onMessage(final String result) {handleResult(result); // 进行增量更新}@Overridepublic void onClose(final int i, final String s, final boolean b) {this.close();}@Overridepublic void onError(final Exception e) {this.close();}@SuppressWarnings("ALL")private void handleResult(final String result) {WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());String eventType = websocketData.getEventType();String json = GsonUtils.getInstance().toJson(websocketData.getData());websocketDataHandler.executor(groupEnum, json, eventType);}
}

综上所述,soul的websocket数据同步是第一次进行全量更新,运行中进行增量更新的设计。

soul之websocket数据同步相关推荐

  1. Soul 源码分析07 SOUL Admin 网关 Http长轮询 数据同步

    SOUL Admin & 网关 Http长轮询 数据同步 书接上回, 使用Http长轮询, 数据同步, soul-admin/src/main/resources/application.ym ...

  2. soul从入门到进阶05——soul-bootstrap数据同步流程

    我们在 soul-admin的数据同步流程中分析了admin的数据同步流程,这篇我们来看看soul-bootstrap的数据同步流程 启动 soul-bootstrap 打印如下日志,我们同样从日志着 ...

  3. soul从入门到进阶02——soul-admin的数据同步流程

    soul-admin 的数据同步流程分析 在上一篇文章中我们提到,根据官网的介绍数据配置流程 soul网关的配置数据在修改后立即同步到soul gateway 中,不需要重启,性能高,生效快. 这个特 ...

  4. soul网关mysql8_深度解析 Soul 网关——数据同步

    引子 从官网 clone 代码下来后,依次启动 soul-admin.soul-bootstrap 和 soul-examples-http 模块,启动成功后查看 soul-admin 会发现 sou ...

  5. Apache ShenYu源码阅读系列-基于ZooKeeper的数据同步

    Apache ShenYu 是一个异步的,高性能的,跨语言的,响应式的 API 网关. 在ShenYu网关中,数据同步是指,当在后台管理系统中,数据发送了更新后,如何将更新的数据同步到网关中.Apac ...

  6. soul_admin之使用zookeeper数据同步

    soul-admin修改成zookeeper的数据同步方式 参考的是项目文档https://dromara.org/zh-cn/docs/soul/user-dataSync.html 记得先启动zo ...

  7. 这款 MySQL、Oracle、HDFS 数据同步工具,有点牛逼!

    点击上方"Java基基",选择"设为星标" 做积极的人,而不是积极废人! 每天 14:00 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java ...

  8. 两台SQL Server数据同步解决方案

    复制的概念 复制是将一组数据从一个数据源拷贝到多个数据源的技术,是将一份数据发布到多个存储站点上的有效方式.使用复制技术,用户可以将一份数据发布到多台服务器上,从而使不同的服务器用户都可以在权限的许可 ...

  9. Java多线程编程实战:模拟大量数据同步

    背景 最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的.否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果. 不过笔者仍旧认为自己对于多线程 ...

最新文章

  1. PyQt5 技巧篇-参数控制窗体右上角只显示关闭按钮实例演示
  2. POJ 3922 A simple stone game(K倍减法游戏)
  3. UITableView的UITableViewStyleGrouped
  4. Kickstart之添加自动化脚本
  5. 计算机系统结构专业是什么专业,中国大学计算机系统结构专业排名
  6. SAP那些事-理论篇-7-SAP的优势和劣势
  7. SYNOPSYS™光学设计软件---设计自由曲面反射系统
  8. SpringSecurity自定义多Provider时提示No AuthenticationProvider found for问题的解决方案与原理(四)
  9. C语言打印菱形和空心菱形
  10. 《人月神话》(The Mythical Man-Month)看清问题的本质:如果我们想解决问题,就必须试图先去理解它...
  11. 美术 3.2 2D动画Spine基础教学
  12. 【中学】判断三角形的形状
  13. AIX7.1 VMO 参数默认设置
  14. 2019-11-29-win10-UWP-Controls-by-function
  15. AFN(上传、下载)
  16. STM32跳入HardFault Handler中断分析
  17. 数据永久保存?有人要把资料存月球上
  18. 机器学习基础:信息论
  19. Python简单?先来40道基础面试题测试下
  20. 问财爬虫Python第三方包,仅供学习使用

热门文章

  1. 微信小程序中相机api_微信小程序API 相机·CameraContext实例
  2. 如何恢复一个被误drop的存储过程
  3. 阿里达摩盘:双11大促人群诊断、DEEPLINK洞察、大促标签定制
  4. IDEA中注释@param 参数名称不存在时,飘红报错解决
  5. 看门狗2服务器位置,看门狗2怎么爬进服务器 | 手游网游页游攻略大全
  6. 【Lua 教程系列第 4 篇】Lua 中的第一行代码 hello world
  7. 【CYH-02】NOIp考砸后虐题赛:成绩:题解
  8. Chapter23: Molecule Ideation Using Matched Molecular Pairs
  9. 有色金属行业数字化之路探析
  10. 日语语法实践篇十二——新编日语第一册第十三课之前文篇