点击上方 "编程技术圈"关注, 星标或置顶一起成长

后台回复“大礼包”有惊喜礼包!

日英文

Close your eyes. Clear your heart. Let it go.

闭上眼睛,清理你的心,过去的就让它过去吧。

每日掏心话

清淡的人生,步履更轻松。一粥一勺是清淡,健康、温暖、妥帖;一瓢一箪是清淡,随意、自在、安心。

责编:乐乐 | 来自:网络

编程技术圈(ID:study_tech)第 1184 次推文

往日回顾:漫画:为什么程序猿 996 会猝死,而企业家 007 却不会?

     

   正文   

-     背景     -最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台。-     设计     -1、技术选型网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:
Tomcat/Jetty+NIO+Servlet3Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。
Netty+NIONetty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。
后面发现Soul网关是基于Spring WebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用Spring WebFlux。
网关的第二个特点是具备可扩展性,比如Netflix Zuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。
在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。
现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。
2、需求清单首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:
自定义路由规则可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。
跨语言HTTP协议天生跨语言
高性能Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。
高可用支持集群模式防止单节点故障,无状态。
灰度发布灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。
接口鉴权基于责任链模式,用户开发自己的鉴权插件即可。
负载均衡支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。
3、架构设计在参考了一些优秀的网关Zuul,Spring Cloud Gateway,Soul后,将项目划分为以下几个模块。它们之间的关系如图:
网关设计注意: 这张图与实际实现有点出入,Nacos push到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。
4、表结构设计-     编码     -
1、ship-client-spring-boot-starter
首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。
其核心类 AutoRegisterListener 就是在项目启动时做了两件事:1.将服务信息注册到Nacos注册中心2.通知ship-admin服务上线了并注册下线hook。
代码如下:* Created by 2YSP on 2020/12/21
*/
public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);private volatile AtomicBoolean registered = new AtomicBoolean(false);private final ClientConfigProperties properties;@NacosInjectedprivate NamingService namingService;@Autowiredprivate RequestMappingHandlerMapping handlerMapping;private final ExecutorService pool;/**
* url list to ignore
*/private static List<String> ignoreUrlList = new LinkedList<>();static {ignoreUrlList.add("/error");}public AutoRegisterListener(ClientConfigProperties properties) {if (!check(properties)) {LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");}this.properties = properties;pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());}/**
* check the ClientConfigProperties
*
* @param properties
* @return
*/private boolean check(ClientConfigProperties properties) {if (properties.getPort() == null| properties.getContextPath() == null| properties.getVersion() == null| properties.getAppName() == null| properties.getAdminUrl() == null) {return false;}return true;}@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if (!registered.compareAndSet(false, true)) {return;}doRegister();registerShutDownHook();}/**
* send unregister request to admin when jvm shutdown
*/private void registerShutDownHook() {final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();unregisterAppDTO.setAppName(properties.getAppName());unregisterAppDTO.setVersion(properties.getVersion());unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());unregisterAppDTO.setPort(properties.getPort());Runtime.getRuntime().addShutdownHook(new Thread(() -> {OkhttpTool.doPost(url, unregisterAppDTO);LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());}));}/**
* register all interface info to register center
*/private void doRegister() {Instance instance = new Instance();instance.setIp(IpUtil.getLocalIpAddress());instance.setPort(properties.getPort());instance.setEphemeral(true);Map<String, String> metadataMap = new HashMap<>();metadataMap.put("version", properties.getVersion());metadataMap.put("appName", properties.getAppName());instance.setMetadata(metadataMap);try {namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);} catch (NacosException e) {LOGGER.error("register to nacos fail", e);throw new ShipException(e.getErrCode(), e.getErrMsg());}LOGGER.info("register interface info to nacos success!");// send register request to ship-adminString url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);OkhttpTool.doPost(url, registerAppDTO);LOGGER.info("register to ship-admin success!");}private RegisterAppDTO buildRegisterAppDTO(Instance instance) {RegisterAppDTO registerAppDTO = new RegisterAppDTO();registerAppDTO.setAppName(properties.getAppName());registerAppDTO.setContextPath(properties.getContextPath());registerAppDTO.setIp(instance.getIp());registerAppDTO.setPort(instance.getPort());registerAppDTO.setVersion(properties.getVersion());return registerAppDTO;}
}2、ship-servership-sever项目主要包括了两个部分内容, 1.请求动态路由的主流程 2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给plugin chain去链式处理。
PluginFilter根据URL解析出appName,然后将启用的plugin组装成plugin chain。
public class PluginFilter implements WebFilter {private ServerConfigProperties properties;public PluginFilter(ServerConfigProperties properties) {this.properties = properties;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {String appName = parseAppName(exchange);if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);}PluginChain pluginChain = new PluginChain(properties, appName);pluginChain.addPlugin(new DynamicRoutePlugin(properties));pluginChain.addPlugin(new AuthPlugin(properties));return pluginChain.execute(exchange, pluginChain);}private String parseAppName(ServerWebExchange exchange) {RequestPath path = exchange.getRequest().getPath();String appName = path.value().split("/")[1];return appName;}
}```PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。```java
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/25
*/
public class PluginChain extends AbstractShipPlugin {/**
* the pos point to current plugin
*/private int pos;/**
* the plugins of chain
*/private List<ShipPlugin> plugins;private final String appName;public PluginChain(ServerConfigProperties properties, String appName) {super(properties);this.appName = appName;}/**
* add enabled plugin to chain
*
* @param shipPlugin
*/public void addPlugin(ShipPlugin shipPlugin) {if (plugins == null) {plugins = new ArrayList<>();}if (!PluginCache.isEnabled(appName, shipPlugin.name())) {return;}plugins.add(shipPlugin);// order by the plugin's orderplugins.sort(Comparator.comparing(ShipPlugin::order));}@Overridepublic Integer order() {return null;}@Overridepublic String name() {return null;}@Overridepublic Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {if (pos == plugins.size()) {return exchange.getResponse().setComplete();}return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);}public String getAppName() {return appName;}}
AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。
public abstract class AbstractShipPlugin implements ShipPlugin {protected ServerConfigProperties properties;public AbstractShipPlugin(ServerConfigProperties properties) {this.properties = properties;}
}```ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。```java
public interface ShipPlugin {/**
* lower values have higher priority
*
* @return
*/Integer order();/**
* return current plugin name
*
* @return
*/String name();Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);}```DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。```java
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/25
*/
public class DynamicRoutePlugin extends AbstractShipPlugin {private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);private static WebClient webClient;private static final Gson gson = new GsonBuilder().create();static {HttpClient httpClient = HttpClient.create().tcpConfiguration(client ->client.doOnConnected(conn ->conn.addHandlerLast(new ReadTimeoutHandler(3)).addHandlerLast(new WriteTimeoutHandler(3))).option(ChannelOption.TCP_NODELAY, true));webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();}public DynamicRoutePlugin(ServerConfigProperties properties) {super(properties);}@Overridepublic Integer order() {return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();}@Overridepublic String name() {return ShipPluginEnum.DYNAMIC_ROUTE.getName();}@Overridepublic Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {String appName = pluginChain.getAppName();ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
//        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));// request serviceString url = buildUrl(exchange, serviceInstance);return forward(exchange, url);}/**
* forward request to backend service
*
* @param exchange
* @param url
* @return
*/private Mono<Void> forward(ServerWebExchange exchange, String url) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();HttpMethod method = request.getMethod();WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {headers.addAll(request.getHeaders());});WebClient.RequestHeadersSpec<?> reqHeadersSpec;if (requireHttpBody(method)) {reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));} else {reqHeadersSpec = requestBodySpec;}// nio->callback->nioreturn reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis())).onErrorResume(ex -> {return Mono.defer(() -> {String errorResultJson = "";if (ex instanceof TimeoutException) {errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";} else {errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";}return ShipResponseUtil.doResponse(exchange, errorResultJson);}).then(Mono.empty());}).flatMap(backendResponse -> {response.setStatusCode(backendResponse.statusCode());response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));});}/**
* weather the http method need http body
*
* @param method
* @return
*/private boolean requireHttpBody(HttpMethod method) {if (method.equals(HttpMethod.POST)| method.equals(HttpMethod.PUT)| method.equals(HttpMethod.PATCH)) {return true;}return false;}private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {ServerHttpRequest request = exchange.getRequest();String query = request.getURI().getQuery();String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;if (!StringUtils.isEmpty(query)) {url = url + "?" + query;}return url;}/**
* choose an ServiceInstance according to route rule config and load balancing algorithm
*
* @param appName
* @param request
* @return
*/private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);if (CollectionUtils.isEmpty(serviceInstances)) {LOGGER.error("service instance of {} not find", appName);throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);}String version = matchAppVersion(appName, request);if (StringUtils.isEmpty(version)) {throw new ShipException("match app version error");}// filter serviceInstances by versionList<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());//Select an instance based on the load balancing algorithmLoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);ServiceInstance serviceInstance = loadBalance.chooseOne(instances);return serviceInstance;}private String matchAppVersion(String appName, ServerHttpRequest request) {List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());for (AppRuleDTO rule : rules) {if (match(rule, request)) {return rule.getVersion();}}return null;}private boolean match(AppRuleDTO rule, ServerHttpRequest request) {String matchObject = rule.getMatchObject();String matchKey = rule.getMatchKey();String matchRule = rule.getMatchRule();Byte matchMethod = rule.getMatchMethod();if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {return true;} else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {String param = request.getQueryParams().getFirst(matchKey);if (!StringUtils.isEmpty(param)) {return StringTools.match(param, matchMethod, matchRule);}} else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {HttpHeaders headers = request.getHeaders();String headerValue = headers.getFirst(matchKey);if (!StringUtils.isEmpty(headerValue)) {return StringTools.match(headerValue, matchMethod, matchRule);}}return false;}}
3、数据同步app数据同步后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?
搜索后端架构师公众号回复“架构整洁”,送你一份惊喜礼包。
一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。
对应代码ship-admin的NacosSyncListener:
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/30
*/
@Configuration
public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,new ShipThreadFactory("nacos-sync", true).create());@NacosInjectedprivate NamingService namingService;@Value("${nacos.discovery.server-addr}")private String baseUrl;@Resourceprivate AppService appService;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext().getParent() != null) {return;}String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);}class NacosSyncTask implements Runnable {private NamingService namingService;private String url;private AppService appService;private Gson gson = new GsonBuilder().create();public NacosSyncTask(NamingService namingService, String url, AppService appService) {this.namingService = namingService;this.url = url;this.appService = appService;}/**
* Regular update weight,enabled plugins to nacos instance
*/@Overridepublic void run() {try {// get all app namesListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);if (CollectionUtils.isEmpty(services.getData())) {return;}List<String> appNames = services.getData();List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);for (AppInfoDTO appInfo : appInfos) {if (CollectionUtils.isEmpty(appInfo.getInstances())) {continue;}for (ServiceInstance instance : appInfo.getInstances()) {Map<String, Object> queryMap = buildQueryMap(appInfo, instance);String resp = OkhttpTool.doPut(url, queryMap, "");LOGGER.debug("response :{}", resp);}}} catch (Exception e) {LOGGER.error("nacos sync task error", e);}}private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {Map<String, Object> map = new HashMap<>();map.put("serviceName", appInfo.getAppName());map.put("groupName", NacosConstants.APP_GROUP_NAME);map.put("ip", instance.getIp());map.put("port", instance.getPort());map.put("weight", instance.getWeight().doubleValue());NacosMetadata metadata = new NacosMetadata();metadata.setAppName(appInfo.getAppName());metadata.setVersion(instance.getVersion());metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));map.put("ephemeral", true);return map;}}
}
ship-server再定时从Nacos拉取app数据更新到本地Map缓存。
* @Author: Ship
* @Description: sync data to local cache
* @Date: Created in 2020/12/25
*/
@Configuration
public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,new ShipThreadFactory("service-sync", true).create());@NacosInjectedprivate NamingService namingService;@Autowiredprivate ServerConfigProperties properties;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext().getParent() != null) {return;}scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService), 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());websocketSyncCacheServer.start();}class DataSyncTask implements Runnable {private NamingService namingService;public DataSyncTask(NamingService namingService) {this.namingService = namingService;}@Overridepublic void run() {try {// get all app namesListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);if (CollectionUtils.isEmpty(services.getData())) {return;}List<String> appNames = services.getData();// get all instancesfor (String appName : appNames) {List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);if (CollectionUtils.isEmpty(instanceList)) {continue;}ServiceCache.add(appName, buildServiceInstances(instanceList));List<String> pluginNames = getEnabledPlugins(instanceList);PluginCache.add(appName, pluginNames);}ServiceCache.removeExpired(appNames);PluginCache.removeExpired(appNames);} catch (NacosException e) {e.printStackTrace();}}private List<String> getEnabledPlugins(List<Instance> instanceList) {Instance instance = instanceList.get(0);Map<String, String> metadata = instance.getMetadata();// plugins: DynamicRoute,AuthString plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());return Arrays.stream(plugins.split(",")).collect(Collectors.toList());}private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {List<ServiceInstance> list = new LinkedList<>();instanceList.forEach(instance -> {Map<String, String> metadata = instance.getMetadata();ServiceInstance serviceInstance = new ServiceInstance();serviceInstance.setAppName(metadata.get("appName"));serviceInstance.setIp(instance.getIp());serviceInstance.setPort(instance.getPort());serviceInstance.setVersion(metadata.get("version"));serviceInstance.setWeight((int) instance.getWeight());list.add(serviceInstance);});return list;}}
}
路由规则数据同步
同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。
搜索顶级架构师公众号回复“架构”,送你一份惊喜礼包。
服务端WebsocketSyncCacheServer:
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/28
*/
public class WebsocketSyncCacheServer extends WebSocketServer {private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);private Gson gson = new GsonBuilder().create();private MessageHandler messageHandler;public WebsocketSyncCacheServer(Integer port) {super(new InetSocketAddress(port));this.messageHandler = new MessageHandler();}@Overridepublic void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {LOGGER.info("server is open");}@Overridepublic void onClose(WebSocket webSocket, int i, String s, boolean b) {LOGGER.info("websocket server close...");}@Overridepublic void onMessage(WebSocket webSocket, String message) {LOGGER.info("websocket server receive message:\n[{}]", message);this.messageHandler.handler(message);}@Overridepublic void onError(WebSocket webSocket, Exception e) {}@Overridepublic void onStart() {LOGGER.info("websocket server start...");}class MessageHandler {public void handler(String message) {RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {return;}Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList().stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())| OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {RouteRuleCache.add(map);} else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {RouteRuleCache.remove(map);}}}
}
客户端WebsocketSyncCacheClient:
* @Author: Ship
* @Description:
* @Date: Created in 2020/12/28
*/
@Component
public class WebsocketSyncCacheClient {private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);private WebSocketClient client;private RuleService ruleService;private Gson gson = new GsonBuilder().create();public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
RuleService ruleService) {if (StringUtils.isEmpty(serverWebSocketUrl)) {throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);}this.ruleService = ruleService;ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,new ShipThreadFactory("websocket-connect", true).create());try {client = new WebSocketClient(new URI(serverWebSocketUrl)) {@Overridepublic void onOpen(ServerHandshake serverHandshake) {LOGGER.info("client is open");List<AppRuleDTO> list = ruleService.getEnabledRule();String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));send(msg);}@Overridepublic void onMessage(String s) {}@Overridepublic void onClose(int i, String s, boolean b) {}@Overridepublic void onError(Exception e) {LOGGER.error("websocket client error", e);}};client.connectBlocking();//使用调度线程池进行断线重连,30秒进行一次executor.scheduleAtFixedRate(() -> {if (client != null && client.isClosed()) {try {client.reconnectBlocking();} catch (InterruptedException e) {LOGGER.error("reconnect server fail", e);}}}, 10, 30, TimeUnit.SECONDS);} catch (Exception e) {LOGGER.error("websocket sync cache exception", e);throw new ShipException(e.getMessage());}}public <T> void send(T t) {while (!client.getReadyState().equals(ReadyState.OPEN)) {LOGGER.debug("connecting ...please wait");}client.send(gson.toJson(t));}
}-     测试     -1、动态路由测试本地启动nacos ,sh startup.sh -m standalone;
启动ship-admin;
本地启动两个ship-example实例。
实例1配置:ship:http:app-name: orderversion: gray_1.0context-path: /orderport: 8081admin-url: 127.0.0.1:9001server:port: 8081nacos:discovery:server-addr: 127.0.0.1:8848
实例2配置:
ship:http:app-name: orderversion: prod_1.0context-path: /orderport: 8082admin-url: 127.0.0.1:9001server:port: 8082nacos:discovery:server-addr: 127.0.0.1:8848
在数据库添加路由规则配置,该规则表示当http header 中的name=ship时请求路由到gray_1.0版本的节点。启动ship-server,看到以下日志时则可以进行测试了。2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:[{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
用Postman请求http://localhost:9000/order/user/add,POST方式,header设置name=ship,可以看到只有实例1有日志显示。
==========add user,version:gray_1.02、性能压测压测环境:
MacBook Pro 13英寸处理器 2.3 GHz 四核Intel Core i7内存 16 GB 3733 MHz LPDDR4X后端节点个数一个压测工具:wrk压测结果:20个线程,500个连接数,吞吐量大概每秒9400个请求。
压测结果-     总结    -千里之行始于足下,开始以为写一个网关会很难,但当你实际开始行动时就会发现其实没那么难,所以迈出第一步很重要。过程中也遇到了很多问题,还在github上给soul和nacos这两个开源项目提了两个issue,后来发现是自己的问题,尴尬。本文代码已全部上传到:https://github.com/2YSP/ship-gate 。
PS:欢迎在留言区留下你的观点,一起讨论提高。如果今天的文章让你有新的启发,欢迎转发分享给更多人。版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢!欢迎加入后端架构师交流群,在后台回复“学习”即可。猜你还想看
阿里、腾讯、百度、华为、京东最新面试题汇集
费解!为什么那么多人用“ji32k7au4a83”作密码?微软在日本尝试了每周4天工作制,生产力跃升了40%996引起公愤,要到头了?
BAT等大厂Java面试经验总结
别找了,想获取 Java大厂面试题学习资料
扫下方二维码回复「手册」就好了嘿,你在看吗?

如何设计一个高性能网关?相关推荐

  1. 阿里终面:如何设计一个高性能网关?

    作者:烟味i cnblogs.com/2YSP/p/14223892.html 一.前言 最近在 github 上看了 soul 网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关.经过两周 ...

  2. 设计 一个高性能爬虫系统

    资料来源 http://www.xuebuyuan.com/1296711.html 开源中国 http://my.oschina.net/eshijia/blog/136595 最近看了一篇来自纽约 ...

  3. 系统设计——如何设计一个高性能的短链接系统?

    短链系统设计看起来很简单,但如何设计一个高性能短链系统呢,这也是面试中非常常见的一道设计题. 首先,为什么要用短链? 短链跳转的基本原理是什么? 短链生成的几种方法你知道吗? 高性能短链的架构如何设计 ...

  4. 如何设计一个高性能的秒杀系统

    秒杀系统要如何架构,在做技术方案时要注意哪些问题,搞了个秒杀专辑,专门收集秒杀系列文章. 当你去一家公司面试时,很多面试官都会问你如何设计一个高性能秒杀系统.秒杀涉及的技术域从客户端.浏览器.网络.负 ...

  5. 如何设计一个高性能短链系统?

    目录 前言 短链有啥好处,用长链不香吗 短链跳转的基本原理 短链生成的几种方法 1.哈希算法 如何缩短域名 如何解决哈希冲突的问题? 2.自增序列算法 高性能短链的架构设计 总结 前言 今天,我们来谈 ...

  6. 如何设计一个高性能积分系统

    如何设计一个高性能积分系统 功能说明 1:用户签到可以获得积分,需要按照用户维度每天进行用户总积分排行榜 2:需要近实时更新排行榜 3:在积分相同的情况下,需要按照先注册的用户排在前面 4:用户量10 ...

  7. 如何设计一个高性能Elasticsearch mapping

    如何设计一个高性能Elasticsearch mapping 前言 mapping mapping 能做什么 Dynamic mapping dynamic=true dynamic=runtime ...

  8. 如何设计一个高性能CPU?

    任何一种技术都会经历从阳春白雪到下里巴人的过程,就像我们对计算机的理解从"戴着鞋套才能进的机房"变成了随处可见的智能手机.在前面20年中,大数据技术也经历了这样的过程,从曾经高高在 ...

  9. 基于计算机底层基础设计一个高性能的单机管理主机的心跳服务

    大家耐心看下去,你会发现原来计算机基础知识的用处,相信我,你会感触很深刻. 案例需求 ==== 后台通常是由多台服务器对外提供服务的,也就是所谓的集群. 如果集群中的某一台主机宕机了,我们必须要感知到 ...

最新文章

  1. Linux内核对设备树的处理
  2. 大数据入门笔记(三)
  3. 微软Build 2019大会.NET课程视频汇总
  4. linux命令终极系列awk
  5. PC智能自媒体高效运营管理工具
  6. java 旅游网站项目实现_基于jsp的旅游网站a-JavaEE实现旅游网站a - java项目源码
  7. 【STM32】STM32F4 GPIO八种模式及工作原理详解
  8. MySql查询某个时间段内的数据
  9. 多目标优化问题和遗传算法学习笔记
  10. YGG:2021年年终回顾
  11. 2017年全国大学生电子设计竞赛 单相用电器分析监测装置(k题)
  12. AMD RDNA Architecture - AMD RDNA 架构
  13. Clinical Chemistry | 张建中/徐健开发幽门螺杆菌单细胞精准诊疗技术
  14. 我的世界java边境之地_我的世界:MC人迹罕至的6种“边境之地”,最后1种让人后背发凉!...
  15. 【鸿蒙】HarMonyOS之Text组件的常用属性
  16. 忠诚度管理软件市场分析-主要企业、产品类别及应用
  17. 淘宝直播技术干货:高清、低延时的实时视频直播技术解密
  18. java b2c_全渠道java b2b b2c o2o平台
  19. 线上线下课程教学培训小程序开发制作功能介绍
  20. ERP群雄逐鹿市场预测

热门文章

  1. 第一阶段 C语言基础与入门3
  2. L78XXCV三端正电压调节器
  3. 风能风力发电系统案例
  4. chronos和simplescalar安装
  5. JAVA—— Vue和Element
  6. Adobe Experience Cloud:整合三朵云 借鉴国外经验提升营销效果
  7. 基于OPENCV的人脸识别学习笔记
  8. 2017计蒜客初赛第二场第一题 百度的年会游戏
  9. 新建实例启动S7-PLCSIM Advanced V2.0失败:Error Code: -30, LicenseNotFound
  10. 人和人的相遇,是必然还是偶然