文章目录

  • Java-Chassis核心源码
    • Consumer到Provider整体流程
    • @EnableServiceComb-初始化ServiceComb
      • TransportManager-管理传输层
      • TransportManager-管理传输层
      • RegistrationManager-注册服务
      • DiscoveryManager-服务发现
      • Registration/Discovery-继承多接口将功能抽象分类可学
      • RemoteServiceRegistry-注意这个怎么和Eureka交互,底层交互有意思
    • @RpcSchema-Pojo类型调用的生产者
    • @RestSchema-Rest形式调用的生产者
    • @RpcReference-消费者
    • SPIServiceUtils-自定义SPI加载器
    • 职责链管理器FilterChainsManager/ConsumerHandlerManager
  • 举一反三
    • EventBus
    • PaaSResourceUtils+PathMatchingResourcePatternResolver
    • PropertySourcesPlaceholderConfigurer
    • ThreadPoolExecutorEx/LinkedBlockingQueueEx
    • ConcurrentHashMapEx
    • AopProxyUtils/BeanUtils
    • ReflectionUtils
    • StringValueResolver
    • SpringCloud

Java-Chassis核心源码

Dubbo3场景及其原理
ServiceComb中的Java-Chassis模块是实现类似Dubbo的RPC的框架,又称Java底座,Dubbo解析请参考上一篇博文。

Consumer到Provider整体流程

  • Consumer和Provider的Handler都支持通过SPI动态扩展,如流控、熔断、隔离、负载均衡等扩展。
  • Consumer在传输层支持Filter拦截过滤请求,支持通过扩展点处理Request和Response。
  • 监听者模式的完美实现,通过SPI读取监听者,监听者监听控制中心的状态,修改控制中心的状态信息。
  • 自己业务代码也可以使用这种方式扩展功能并动态组合策略,解耦各种功能业务代码。
类型 artifact id 是否可选 功能说明
编程模型 provider-pojo 提供RPC开发模式
编程模型 provider-jaxrs 提供JAX RS开发模式
编程模型 provider-springmvc 提供Spring MVC开发模式
通信模型 transport-rest-vertx 运行于HTTP之上的开发框架,不依赖WEB容器,应用打包为可执行jar
通信模型 transport-rest-servlet 运行于WEB容器的开发框架,应用打包为war包
通信模型 transport-highway 提供高性能的私有通信协议,仅用于java之间互通。
运行模型 handler-loadbalance 负载均衡模块。提供各种路由策略和配置方法。一般客户端使用。
运行模型 handler-bizkeeper 和服务治理相关的功能,比如隔离、熔断、容错。
运行模型 handler-tracing 调用链跟踪模块,对接监控系统,吐出打点数据。

@EnableServiceComb-初始化ServiceComb

通过@ImportResource的方式导入ServiceComb的核心Bean定义文件"classpath*:META-INF/spring/scb-core-bean.xml";和"classpath*:META-INF/spring/*.bean.xml";,注入如下Bean定义。

  • ConfigurationSpringInitializer

  • CseApplicationListener,ContextRefreshedEvent事件监听器

    • CseApplicationListener#setApplicationContext初始化相关的连接

      if (this.applicationContext == applicationContext) {// same object. avoid initialize many times.return;}this.applicationContext = applicationContext;BeanUtils.setContext(applicationContext);HttpClients.load(); // Http相关的RegistrationManager.INSTANCE.init(); // 服务注册 管理初始化,通过SPI获取。DiscoveryManager.INSTANCE.init(); // 服务发现 管理初始化,通过SPI获取。
      
    • ContextRefreshedEvent Bean实例化完事件触达,初始化ServiceComb引擎SCBEngine

        @Overridepublic void onApplicationEvent(ApplicationEvent event) {if (initEventClass.isInstance(event)) {if (applicationContext instanceof AbstractApplicationContext) {((AbstractApplicationContext) applicationContext).registerShutdownHook();}SCBEngine scbEngine = SCBEngine.getInstance(); // 1、获取单例scbEngine.setApplicationContext(applicationContext); // 2、设置ContexscbEngine.setPriorityPropertyManager(applicationContext.getBean(PriorityPropertyManager.class));scbEngine.setFilterChainsManager(applicationContext.getBean(FilterChainsManager.class)); // 3、指责链管理类scbEngine.getConsumerProviderManager().getConsumerProviderList().addAll(applicationContext.getBeansOfType(ConsumerProvider.class).values()); // 4、获取支持的Consumer形式,如Rest/PojoscbEngine.getProducerProviderManager().getProducerProviderList().addAll(applicationContext.getBeansOfType(ProducerProvider.class).values()); // // 5、获取Provider注册的元信息,支持从Rest/Pojo中获取scbEngine.addBootListeners(applicationContext.getBeansOfType(BootListener.class).values()); // 6、获取全量SCB初始化监听器BootListener,用于监听SCB在生命周期内的各种事件。scbEngine.run(); // 7、run方法} else if (event instanceof ContextClosedEvent) {if (SCBEngine.getInstance() != null) {SCBEngine.getInstance().destroy();}}}
      
    1. 获取单例SCBEngine<重载类,必须单例,Double Check,骚操作可实现隔离>,会首先初始化事件总线等逻辑。

      public static SCBEngine getInstance() {if (null == INSTANCE) {// private static final Object initializationLock = new Object(); 全局锁synchronized (initializationLock) {if (null == INSTANCE) {new SCBEngine();}}}return INSTANCE;}protected SCBEngine() {eventBus = EventManager.getEventBus(); // 获取事件总线eventBus.register(this); // 注册事件 @Subscribe  @AllowConcurrentEventsINSTANCE = this;producerProviderManager = new ProducerProviderManager(this);serviceRegistryListener = new ServiceRegistryListener(this);}
      
      • 初始化事件总线,使用Guava事件总线处理事件,解耦内部各个模块的逻辑。

      • ProducerProviderManager生产提供者管理器,用于管理不同的Provider类型,包括PojoProducerProvider、RestProducerProvider,下文会通过注解的方式讲解这两种Provider。

      • SCBEngine监听InvocationStartEvent/InvocationFinishEvent事件,用于AtomicLong统计调用开始和调用结束的总数。

          @AllowConcurrentEvents@Subscribepublic void onInvocationStart(InvocationStartEvent event) {invocationStartedCounter.incrementAndGet();}@AllowConcurrentEvents@Subscribepublic void onInvocationFinish(InvocationFinishEvent event) {invocationFinishedCounter.incrementAndGet();}
        
      • 注册ServiceRegistryListener到事件总线,监听以下事件,将引擎需要关注的,和服务发现类的各项操作解耦开,这是常规的操作,我们一般还是可以效仿的。

          @EnableExceptionPropagation@SubscriberOrder(-1000)@Subscribepublic void onCreateMicroservice(CreateMicroserviceEvent event);@EnableExceptionPropagation@SubscriberOrder(-1000)@Subscribepublic void onDestroyMicroservice(DestroyMicroserviceEvent event);@EnableExceptionPropagation@SubscriberOrder(-1000)@Subscribepublic void onCreateMicroserviceVersion(CreateMicroserviceVersionEvent event);
        
    2. 初始化消费者管理,提供者管理,BootListener,BootListener支持SPI或者@Component注解扩展,这种事件总线的模式值得产参考参考各位。

      • 监听各种事件,如切handler或filter前后、服务注册前后、传输前后、消费和生产提供者前后等,支持如下事件

        enum EventType {BEFORE_HANDLER,AFTER_HANDLER,BEFORE_FILTER,AFTER_FILTER,BEFORE_PRODUCER_PROVIDER,AFTER_PRODUCER_PROVIDER,BEFORE_CONSUMER_PROVIDER,AFTER_CONSUMER_PROVIDER,BEFORE_TRANSPORT,AFTER_TRANSPORT,BEFORE_REGISTRY,AFTER_REGISTRY,BEFORE_CLOSE,AFTER_CLOSE}
        
    3. scb.run -> doRun方法,可以打开org.apache.servicecomb.core.SCBEngine的日志

      public synchronized SCBEngine run() {if (SCBStatus.DOWN.equals(status)) {try {doRun(); waitStatusUp(); // 等待SCB变成UP状态,会在收到MicroserviceInstanceRegisteredEvent事件的时候修改。} catch (TimeoutException e) {} catch (Throwable e) {try {destroy();} catch (Exception exception) {}status = SCBStatus.FAILED;} finally {printServiceInfo(); // 最终打印Service的全部信息}}
      private void doRun() throws Exception {status = SCBStatus.STARTING;// 1、BootListener排序包括SPI+Compnent扩展的类bootListeners.sort(Comparator.comparingInt(BootListener::getOrder));// 2、触发BootEvent调用BootListener里面对应的监听方法,上下文Contex类,不论是监听者还是职责链,都会需要传递上下文的Contex类。triggerEvent(EventType.BEFORE_HANDLER);// 3、通过SPI加载全部的Handler到Config.class中,并初始化到Manager中HandlerConfigUtils.init(consumerHandlerManager, producerHandlerManager);// 4、触发Handler加载之后的事件,我们可以监听handler加载到Engine的事件,这种写法爽不,非常爽,事件总线。triggerEvent(EventType.AFTER_HANDLER);// 5、同理操作FiltertriggerEvent(EventType.BEFORE_FILTER);filterChainsManager.init();triggerEvent(EventType.AFTER_FILTER);// 6、创建提供者元信息createProducerMicroserviceMeta();// 7、初始化生产管理者并触发事件triggerEvent(EventType.BEFORE_PRODUCER_PROVIDER);producerProviderManager.init();triggerEvent(EventType.AFTER_PRODUCER_PROVIDER);// 8、初始化消费管理者并触发事件triggerEvent(EventType.BEFORE_CONSUMER_PROVIDER);consumerProviderManager.init(); // 含有生命周期的Class可以使用init,destory等模板方法/**for (ConsumerProvider provider : consumerProviderList) {provider.init(); // 模板方法}**/triggerEvent(EventType.AFTER_CONSUMER_PROVIDER);// 9、初始化TransportManager管理器,通过SPI加载Transport的实现,// 包括HighwayTransport/ServletRestTransport/VertxRestTransport// Transport的实现都有名字,同名会选择其中的一个。// 加载选择完会调用Transport的Init方法初始化。triggerEvent(EventType.BEFORE_TRANSPORT);transportManager.init(this);triggerEvent(EventType.AFTER_TRANSPORT);triggerEvent(EventType.BEFORE_REGISTRY);// 10、注册AfterRegistryEventHanlder监MicroserviceInstanceRegisteredEvent事件,修改scb的状态为uptriggerAfterRegistryEvent(); // 11、服务注册启动RegistrationManager.INSTANCE.run(); // 12、服务发现启动DiscoveryManager.INSTANCE.run(); shutdownHook = new Thread(this::destroyForShutdownHook);Runtime.getRuntime().addShutdownHook(shutdownHook);}    

      通过以上注释的数字索引详解部分有价值的地方。

      • 9、TransportManager

        public void init(SCBEngine scbEngine) throws Exception {buildTransportMap(); // SPI分组并选择for (Transport transport : transportMap.values()) {if (transport.init()) { // 初始化,初始化失败会出问题的Endpoint endpoint = transport.getPublishEndpoint();if (endpoint != null && endpoint.getEndpoint() != null) {LOGGER.info("endpoint to publish: {}", endpoint.getEndpoint());RegistrationManager.INSTANCE.addEndpoint(endpoint.getEndpoint()); // 注册到注册中心中}continue;}}}
        
      • 11、RegistrationManager,非常重要,见下详解。

      • 12、DiscoveryManager

TransportManager-管理传输层

通过SPI解耦了各个Module,用户也可以基于实现自定义Transport层的实现,这种策略方式还是可以学习的。

  1. java-chassis-core-2.6.0.jar核心包中提供了TransportManager、AbstractTransport、Transport基础类。TransportManager通过SPI加载Transport.class的实现类,提供了AbstractTransport.class用于将传输层的功能抽象到父类中。
  2. transport-rest-vertx/transport-rest-servlet/transport-highway三个都是独立的jar包,用于扩展Transport

TransportManager-管理传输层

通过SPI解耦了各个Module,用户也可以基于实现自定义Transport层的实现,这种策略方式还是可以学习的。

  1. java-chassis-core-2.6.0.jar核心包中提供了TransportManager、AbstractTransport、Transport基础类。TransportManager通过SPI加载Transport.class的实现类,提供了AbstractTransport.class用于将传输层的功能抽象到父类中。

  2. transport-rest-vertx/transport-rest-servlet/transport-highway三个都是独立的jar包,用于扩展Transport。

  3. Rest模式的transport会经过各种HttpClientFilter来动态拦截前后的请求,在拦截流程中通过Invocation这个Contex上下文来传递全量信息,这种思路非常的通用极易学习,代码这样写起来非常的舒服好看,这里的异步编程也是值得我们学习的,后面可以细看。

    org.apache.servicecomb.transport.rest.client.http.RestClientInvocation#invoke
    public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception {this.invocation = invocation;this.asyncResp = asyncResp;OperationMeta operationMeta = invocation.getOperationMeta();restOperationMeta = operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION);String path = this.createRequestPath(restOperationMeta);IpPort ipPort = (IpPort) invocation.getEndpoint().getAddress();Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);invocation.getInvocationStageTrace().startGetConnection();requestFuture.compose(clientRequest -> {invocation.getInvocationStageTrace().finishGetConnection();this.clientRequest = clientRequest;clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());RestClientRequestImpl restClientRequest =new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);Buffer requestBodyBuffer;try {requestBodyBuffer = restClientRequest.getBodyBuffer();} catch (Exception e) {return Future.failedFuture(e);}HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer);invocation.getInvocationStageTrace().startClientFiltersRequest();// 1、执行Filter.beforeSendRequest,可用于动态修改request撒for (HttpClientFilter filter : httpClientFilters) {if (filter.enabled()) {filter.beforeSendRequest(invocation, requestEx);}}// 2、通过事件总线发事件撒invocation.onStartSendRequest();httpClientWithContext.runOnContext(httpClient -> {clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout());clientRequest.response().onComplete(asyncResult -> {if (asyncResult.failed()) {fail(asyncResult.cause());return;}// 3、执行Filter.afterReceiveResponse,可用于动态修改Response撒handleResponse(asyncResult.result());});processServiceCombHeaders(invocation, operationMeta);restClientRequest.end().onComplete((t) -> invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime()));});return Future.succeededFuture();}).onFailure(failure -> {});}
    

RegistrationManager-注册服务

Faced门面类,用于管理全部的Registration,功能全部路由到下游实体类,Registration是实际管理Provider注册到注册中心的逻辑(支持动态扩展不同的注册中心,可插拔)。会通过SPI的方式加载全部的Registration.class实现,解耦和实际注册中心处理类的实现,Faced设计模式可以学习,门面简化内部的实现细节及其使用。

  1. RegistrationManager.INSTANCE.init()INSTANCE单例初始化,模板方法addBeans有些牛逼,做了两件事,非常值得学习啊。
    其一:自动给SPI加载的类Autowire(兼容Spring)。

    其二:将Spring框架里面的Bean也加载到RegistrationManager之中。

    // 1、SPI加载全部的实现,并调用过滤方法
    SPIServiceUtils.getOrLoadSortedService(Registration.class).stream().filter((SPIEnabled::enabled)).forEach(registrationList::add);
    // 2、会在CseApplicationListener#setApplicationContext被调用的init
    public void init() {//BeanUtils.addBeans(Registration.class, registrationList);initPrimary();// 初始化全部的registrationregistrationList.forEach(registration -> registration.init());}public static <T extends SPIOrder & SPIEnabled> void addBeans(Class<T> cls, List<T> exists) {if (context == null) {return;}// 尝试通过Spring中的Bean autowire相应的instancefor (T instance : exists) {context.getAutowireCapableBeanFactory().autowireBean(instance);}// 添加到exists中for (T bean : context.getBeansOfType(cls).values()) {if (bean.enabled()) {exists.add(bean);}}// 排个序呗exists.sort(Comparator.comparingInt(SPIOrder::getOrder));}
    }
    
  2. RegistrationManager.INSTANCE.run()

    public void run() {EventManager.getEventBus().register(new AfterServiceInstanceRegistryHandler(registrationList.size())); registrationList.forEach(registration -> registration.run());}
    
  3. Registration的具体实现-ServiceCenterRegistration,代理到RegistryUtils通过ServiceRegistry(RemoteServiceRegistry是其实现)和注册中心通信。

    • RemoteServiceRegistry,Rest请求,事件总线监听者模式,定时任务执行,这里可以后续细看细看EventBus发挥着巨大的作用。

      • 定时执行ServiceCenterTask,包括各种Runnable的实现,定时执行参数从ServiceRegistryConfig中获取。
      • 定时心跳健康监测执行。
      • 通过ServiceRegistryClientImpl和注册中心Rest通信,注册服务到注册中心。在执行的逻辑中,会通过post各种事件,让外部监听者可以收到相应的请求。
  4. Registration的具体实现-LocalRegistration,本地注册中心

DiscoveryManager-服务发现

Registration/Discovery-继承多接口将功能抽象分类可学

继承多个接口,每个接口都是高度抽象的功能,区分不同的接口,将方法分类,经典的面向对象的思路,解耦不同的方法,便于后续动态扩展。模板方法将面向接口的编程,使得当前的类功能非常容易扩展。

public interface Registration extends SPIEnabled, SPIOrder, LifeCycle
public interface Discovery extends SPIEnabled, SPIOrder, LifeCycle
  • SPIEnabled,SPI加载进去是否使用。

  • SPIOrder,SPI加载到内存中的排序。

  • LifeCycle,当前类具有生命周期,模板方法相继调用不同的方法,如init、run,destory。

  • ServiceCenterRegistration实现了Registration用于和注册中心交互,代理到RegistryUtils -> RemoteServiceRegistry和注册中心交互。

  • ServiceCenterDiscovery实现了Discovery,同样代理到了RegistryUtils -> RemoteServiceRegistry和注册中心交互。

基于上述二位,我们有必要解读下RemoteServiceRegistry哈。

RemoteServiceRegistry-注意这个怎么和Eureka交互,底层交互有意思

这种类的设计方式和Apollo的类构造很像,分为本地仓库,远端仓库等模式,本地配置,远端配置等。

最终肯定会调用ServiceRegistryClient - > HttpClients 发起Rest请求。

重点关注如下配置?

  1. Provider新增,多久会更新到Consumer服务本地?
  2. Provider挂了,多久会更新到Consumer服务本地?
  3. 这些参数是否支持可以配置?

结合下图理解如下方法流程:

  • Provider心跳保活/注册,定时任务和注册中心保活provider,这是定时间隔serviceRegistryConfig.getHeartbeatInterval() /s,会定时执行ServiceCenterTask -> MicroserviceServiceCenterTask,包括如下Task,注册微服务监控,心跳,状态同步等。

    addTask(new MicroserviceRegisterTask(eventBus, srClient, microservice));
    addTask(new MicroserviceInstanceRegisterTask(eventBus, serviceRegistryConfig, srClient, microservice));
    addTask(new MicroserviceWatchTask(eventBus, serviceRegistryConfig, srClient, microservice));
    addTask(new MicroserviceInstanceHeartbeatTask(eventBus, srClient, microservice)); // 心跳心跳,活着么?
    addTask(new MicroserviceInstanceStatusSyncTask(eventBus, srClient, microservice));
    
  • Provider下线

  • Consumer定时更新拉取Provider并更新本地服务缓存:定时间隔,serviceRegistryConfig.getInstancePullInterval() /s,如果服务失效需要移除服务。org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache#refreshCache

寻找某个jar包由哪个pom文件引入

1、mvn dependency:tree -Dverbose -Dincludes=org.apache.servicecomb:java-chassis-core:2.1.5

2、mvn dependency:tree -Dverbose 找出全部依赖树,如果失败则执行向下install方法

@RpcSchema-Pojo类型调用的生产者

@RpcSchema(schemaId = "hello")
public class HelloImpl implements Hello {@Overridepublic String sayHi(String name) {return "Hello " + name;}@Overridepublic String sayHello(Person person) {return "Hello person " + person.getName();}
}
  • 注解会被PojoProducers这个BeanPostProcessor处理,用于往注册中心注册服务。

    protected void processProvider(String beanName, Object bean) {// 1、aop后,新的实例的父类可能是原class,也可能只是个proxy,父类不是原class,所以,需要先取出原class,再取标注// 调用 AopProxyUtils.ultimateTargetClassClass<?> beanCls = BeanUtils.getImplClassFromBean(bean);if (beanCls == null) {return;}RpcSchema rpcSchema = beanCls.getAnnotation(RpcSchema.class);if (rpcSchema == null) {return;}// 2、获取schemaId,如果没有传递则获取接口名String schemaId = rpcSchema.schemaId();if (StringUtils.isEmpty(schemaId)) {Class<?>[] intfs = beanCls.getInterfaces();if (intfs.length == 1) {schemaId = intfs[0].getName();} else {throw new Error("Must be schemaId or implements only one interface");}}// 3、注册producer元信息PojoProducerMeta pojoProducerMeta = new PojoProducerMeta();pojoProducerMeta.setSchemaId(schemaId); //pojoProducerMeta.setSchemaInterface(rpcSchema.schemaInterface()); // 接口名pojoProducerMeta.setInstance(bean); // 实例类registerPojoProducer(pojoProducerMeta);}
    
    • 在SCB启动的时候,scbEngine.getProducerProviderManager().getProducerProviderList(),获取全部ProducerProvider的实现类,用于获取Bean工厂中的生产提供者。包括从SPI中创建的,及使用Bean的形式注入的,默认主要有以下两种

      • PojoProducerProvider:scb run的时候,会调用PojoProducerProvider的Init方法从PojoProducers中获取全部的Provider元信息,注意在scbRun的时候,Bean工厂实例化全部的Bean实例。

        public List<ProducerMeta> init() {// for some test cases, there is no spring contextif (BeanUtils.getContext() == null) {return Collections.emptyList();}PojoProducers pojoProducers = BeanUtils.getContext().getBean(PojoProducers.class);for (ProducerMeta producerMeta : pojoProducers.getProducerMetas()) {PojoProducerMeta pojoProducerMeta = (PojoProducerMeta) producerMeta;initPojoProducerMeta(pojoProducerMeta);}return pojoProducers.getProducerMetas();}
        
      • RestProducerProvider:同理,从RestProducers中获取全部提供Rest接口的实例信息。

    • `会获取PojoProducerProvider -> 。

      
      
  • SCB.Run的时候,会通过获取到的元信息,将其注册到注册中心上对外提供服务。

@RestSchema-Rest形式调用的生产者

和RpcSchema处理方式类似,并且可以和RpcSchema的形式并存。

  • 注解会被RestProducers处理,并将元信息保存。
  • 在scb.run的时候,会被RestProducerProvider获取全部的元信息,并传递到SCB最终的控制中心中去。

@RpcReference-消费者

@RpcReference(microserviceName = "hello", schemaId = "hello")
private static Hello hello;

Pojo形式的Rpc消费方的注解,将自动从注册中心拉取对应的服务名,动态代理Compute并注入代理类。

  • 注解会被RpcReferenceProcessor这个BeanPostProcessor处理,动态代理增强Hello。十分注意StringValueResolver支持动态解析占位符。

    public class RpcReferenceProcessor implements BeanPostProcessor, EmbeddedValueResolverAware {private StringValueResolver resolver;@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {// 扫描所有field,处理扩展的field标注ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {processConsumerField(bean, field);}});return bean;}protected void processConsumerField(Object bean, Field field) {RpcReference reference = field.getAnnotation(RpcReference.class);if (reference == null) {return;}handleReferenceField(bean, field, reference);}private void handleReferenceField(Object obj, Field field,RpcReference reference) {String microserviceName = reference.microserviceName();microserviceName = resolver.resolveStringValue(microserviceName); // 微服务的名字支持占位符PojoReferenceMeta pojoReference = new PojoReferenceMeta();pojoReference.setMicroserviceName(microserviceName);pojoReference.setSchemaId(reference.schemaId());pojoReference.setConsumerIntf(field.getType());// proxy = Invoker.createProxy(microserviceName, schemaId, consumerIntf); 动态代理pojoReference.afterPropertiesSet();ReflectionUtils.makeAccessible(field);ReflectionUtils.setField(field, obj, pojoReference.getProxy());} }
    
    1. 设置Consumer的元信息,包括服务的名字,SchemaId,契约接口名,最后根据接口名,生成动态代理类。
    2. 反射的方式注入field。必须makeAccessible使其支持private注入。

SPIServiceUtils-自定义SPI加载器

ServiceComb大量使用SPI动态扩展对应的实现,SPI定义的使能及优先级,从全部jar包中

META-INF/services的路径文件中加载扩展类,文件名为接口类的全路径里面内容为接口实现。

  • SPIOrder,支持加载SPI顺序,排序从小到大,order数值越小,优先级越高,调用越靠前。

  • SPIEnabled,支持SPI实现类是否使能

  • SPIServiceUtils,代理到JDK自带的ServiceLoader动态加载SPI类,并使用ReflectionUtils用于获取SPIOrder排序对应的实现类

    // 加载SPI全部类
    public static <T> List<T> loadSortedService(Class<T> serviceType) {List<Entry<Integer, T>> serviceEntries = new ArrayList<>();ServiceLoader<T> serviceLoader = ServiceLoader.load(serviceType); // 代理到JDKserviceLoader.forEach(service -> {int serviceOrder = 0;// 实现getOrder方法就获取,否则就使用默认值,不强迫用户实现Order方法Method getOrder = ReflectionUtils.findMethod(service.getClass(), "getOrder"); if (getOrder != null) {serviceOrder = (int) ReflectionUtils.invokeMethod(getOrder, service); // 触发方法}Entry<Integer, T> entry = new SimpleEntry<>(serviceOrder, service);serviceEntries.add(entry);});List<T> services = serviceEntries.stream().sorted(Comparator.comparingInt(Entry::getKey)).map(Entry::getValue).collect(Collectors.toList()); // 根据 getOrder 排序,并返回全部的serviceType。LOGGER.info("Found SPI service {}, count={}.", serviceType.getName(), services.size());for (int idx = 0; idx < services.size(); idx++) {T service = services.get(idx);LOGGER.info("  {}. {}.", idx, service.getClass().getName());} // 输出加载的日志及其顺序,方便定位调试功能return services;}
    // Map<Class<?>, List<Object>> cache = new ConcurrentHashMap<>();
    // loadSortedService 并非线程安全,因此添加类锁
    public static <T> List<T> getOrLoadSortedService(Class<T> serviceType) {List<Object> services = cache.get(serviceType);if (services == null) {synchronized (LOCK) {services = cache.get(serviceType);if (services == null) {services = (List<Object>) loadSortedService(serviceType);cache.put(serviceType, services);}}}

SPI加载类的全量日志:



职责链管理器FilterChainsManager/ConsumerHandlerManager

不论在服务的提供方或者在服务的调用方,在触发请求或者响应请求的时候,都会通过职责链的方式顺序调用各种Handler,框架动态从handler.xml文件中读取Handler算子,支持在yaml文件中通过配置文件动态装配,非常灵活。

  • 实现了Handler仓库HandlerConfigUtils。PaaSResourceUtils继承Spring ResourceUtils,读取资源文件

    // 1、handler-loadbalance-2.6.0.jar ->cse.handler.xml
    <config><handler id="loadbalance"class="org.apache.servicecomb.loadbalance.LoadbalanceHandler"/>
    </config>// 2、从配置文件中加载全部的Config类
    private static Config loadConfig() throws Exception {Config config = new Config();// 1、读取配置文件 classpath* 代表在全量jar包中寻找// 2、classpath*:config/cse.handler.xml   classpath*:config/cse.*.handler.xml// 3、PathMatchingResourcePatternResolver 加载全部的Resource资源路径// 4、排序资源的路径(xxxx.handler.cse,根据名字xxx进行排序)List<Resource> resList =PaaSResourceUtils.getSortedResources("classpath*:config/cse.handler.xml", ".handler.xml");for (Resource res : resList) {// 5. 通过XmlMapper读取handler.xml文件映射到Config类中Config tmpConfig = XmlLoaderUtils.load(res, Config.class);config.mergeFrom(tmpConfig);}
    
  • 从xml文件中加载到Config.class,这里可以学习如何读取XML文件
    通过ObjectMapper xmlMapper = new XmlMapper()读取readValue(res.getURL(), cls)

    <config><handler id="loadbalance"class="org.apache.servicecomb.loadbalance.LoadbalanceHandler"/>
    </config>// 1、通过注解,将Config Bean类 和 xml文件一一映射
    @JacksonXmlRootElement // XML根元素<config>
    public class Config {// key为handler idprivate Map<String, Class<Handler>> handlerClassMap = new HashMap<>();public void mergeFrom(Config otherConfig) {handlerClassMap.putAll(otherConfig.handlerClassMap);}public Map<String, Class<Handler>> getHandlerClassMap() {return this.handlerClassMap;}@JacksonXmlProperty(localName = "handler") // 属性@JacksonXmlElementWrapper(useWrapping = false)public void setHandlerConfigList(List<HandlerConfig> handlerConfigList) {for (HandlerConfig handlerConfig : handlerConfigList) {handlerClassMap.put(handlerConfig.getHandlerId(), handlerConfig.getClazz());}}
    }
    // 2、定义子属性映射
    public class HandlerConfig {private String handlerId;private Class<Handler> clazz;@JacksonXmlProperty(localName = "id", isAttribute = true)public String getHandlerId() {return handlerId;}public void setHandlerId(String handlerId) {this.handlerId = handlerId;}@JacksonXmlProperty(localName = "class", isAttribute = true)public Class<Handler> getClazz() {return clazz;}public void setClazz(Class<Handler> clazz) {this.clazz = clazz;}@SuppressWarnings("unchecked")public void setClazz(String clazz) throws ClassNotFoundException {this.clazz = (Class<Handler>) Class.forName(clazz);}
    }
  • Handler算子的编排,使用ConsumerHandlerManager和ProducerHandlerManager读取yaml中的配置文件,通过名字对Handler进行编排,Handler并未实现Order接口,调用顺序交给编排方。

    • Consumer/Provider支持自定义ConsumerHandlerManager/ProducerHandlerManager类生成职责链列表,最后的Consumer Handler-TransportClientHandler.INSTANCE, 最后的Provider Handler–ProducerOperationHandler.INSTANCE

      以下动态创建职责链配置:

      // 职责链Key,配置在yaml文件中
      protected List<Handler> create(String microserviceName) {// 是否定义服务对应的handler编排servicecomb.handler.chain.Provider.service.calculator// 没有则使用默认的 servicecomb.handler.chain.Provider.defaultString handlerChainKey = "servicecomb.handler.chain." + getName() + ".service." + microserviceName;String chainDef = DynamicPropertyFactory.getInstance().getStringProperty(handlerChainKey,defaultChainDef).get();LOGGER.info("get handler chain for [{}]: [{}]", handlerChainKey, chainDef);return createHandlerChain(chainDef);}
      

BootListener

PojoInvocation保存List<Handler>职责链,并实现

举一反三

EventBus

事件总线:通过发布事件类来驱动监听当前事件的类中方法的调用,简化监听者模式的实现

以下几个使用的注解可以了解下:

@EnableExceptionPropagation
@SubscriberOrder(-1000)
@Subscribe

PaaSResourceUtils+PathMatchingResourcePatternResolver

  • 继承自Spring自带的ResourceUtils用于resolving resource locations to files in the file system,获取文件及判断文件。
  • PathMatchingResourcePatternResolver通过正则方式去读取路径上文件并返回Resource[]
  • 正因为Resolver的存在可以解析"classpath*:config/cse.handler.xml", ".handler.xml"这种资源路径。

PropertySourcesPlaceholderConfigurer

BeanFactoryPostProcessor,用于解析Bean对应的Property。

  • @Value注解,会被AutowiredAnnotationBeanPostProcessor解析,并替换占位符注入合适的值。
  • XML配置如果含有属性注入配置,则PropertySourcesPlaceholderConfigurer#postProcessBeanFactory会被替换占位符注入合适的值。

ThreadPoolExecutorEx/LinkedBlockingQueueEx

ThreadPoolExecutorEx

背景:扩展的ThreadPool,在coreSize到达的时候,并发继续增加,会优先创建Thread直到达到maxSize,才会放入BlockQueue,Dubbo和Tomcat都扩展了类似的功能,当BlockQueue的size为无限大的时候,避免Thread永远无法到达maxSize。

实现:默认ThreadPool按照如下逻辑执行Task。

public void execute(Runnable command) {     int c = ctl.get();if (workerCountOf(c) < corePoolSize) { // 1、workTask小于coreSize,则直接创建新的Thread执行command。if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { // 2、已达到coreSize,则放入BlockQueue调用offer方法。int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false)) // 3、BlockQueue放满,则继续添加worker,直到达到最大。reject(command);
}

如上要点在第二步,只需要修改workQueue的offer逻辑,就可以优先创建Thead,可使用扩展的LinkedBlockingQueueEx。

在offer的时候,获取TheadPool的size,当小于最大的时候,直接创建新的Thead,执行Task。

public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> {private transient volatile ThreadPoolExecutorEx owner = null;public void setOwner(ThreadPoolExecutorEx owner) {this.owner = owner;}@Overridepublic boolean offer(Runnable runnable) {// task can come before owner availableif (owner == null) {return super.offer(runnable);}// can not create more thread, just queue the taskif (owner.getPoolSize() == owner.getMaximumPoolSize()) {return super.offer(runnable);}// no need to create more thread, just queue the taskif (owner.getNotFinished() <= owner.getPoolSize()) {return super.offer(runnable);}// all threads are busy, and can create new thread, not queue the taskif (owner.getPoolSize() < owner.getMaximumPoolSize()) {return false;}return super.offer(runnable);}/** when task is rejected (thread pool if full), force the item onto queue.*/public boolean force(Runnable runnable) {if (owner == null || owner.isShutdown()) {throw new RejectedExecutionException("queue is not running.");}return super.offer(runnable);}
}

相应的扩展的TheadPool也override部分方法,用于当前提交的任务总数,完成的人数总数,拒绝的任务总数。

public class ThreadPoolExecutorEx extends ThreadPoolExecutor {private AtomicInteger submittedCount = new AtomicInteger();private AtomicInteger finishedCount = new AtomicInteger();private AtomicInteger rejectedCount = new AtomicInteger();public ThreadPoolExecutorEx(int coreThreads, int maxThreads, int maxIdleInSecond, TimeUnit timeUnit,BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {super(coreThreads, maxThreads, maxIdleInSecond, timeUnit, queue, threadFactory);if (queue instanceof LinkedBlockingQueueEx) {((LinkedBlockingQueueEx) queue).setOwner(this);}setRejectedExecutionHandler(this::rejectedExecution);}@Overridepublic void execute(Runnable command) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException e) {if (getQueue() instanceof LinkedBlockingQueueEx) {final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue();if (!queue.force(command)) {throw new RejectedExecutionException("thread pool queue is full");}} else {throw e;}}}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {rejectedCount.incrementAndGet();finishedCount.incrementAndGet();throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);finishedCount.incrementAndGet();}public int getNotFinished() {return submittedCount.get() - finishedCount.get();}public int getRejectedCount() {return rejectedCount.get();}
}

ConcurrentHashMapEx

默认的ConcurrentHashMap当Key都在同一个桶的时候,调用computeIfAbsent的时候,都会对当前的桶加锁(分段锁),当在高并发读多余写的场景,这里的加锁会影响单个桶的性能,因此可以优先通过CAS获取下元素,然后再调用默认方法,扩展性能。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FzUwVpEM-1671287799657)(images/image-20221204161851865.png)]

  // ConcurrentHashMap.computeIfAbsent always do "synchronized" operation// so we wrap it to improve performance@Overridepublic V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {V value = get(key); // CAS GETif (value != null) {return value;}return super.computeIfAbsent(key, mappingFunction);}

AopProxyUtils/BeanUtils

ReflectionUtils

  • ReflectionUtils#doWithFields(Class<?> clazz, FieldCallback, FieldFilter):使用Lamda表达式简化实现模板方法<可以参考>,支持自定义处理方法以及过滤逻辑,也类似实现了访问者模式的效果,可以支持访问Filed,访问Methed等逻辑。

    getDeclaredFields会优先从ConcurrentReferenceHashMap缓存中获取,否则再从clazz.getDeclaredFields()中获取

           // 递归获取全部的DeclaredFields,然后执行过滤以及处理逻辑Class<?> targetClass = clazz;do {
    // public, protected, default (package) access, and private fields, but excludes inherited fields.
    // 因此需要递归Field[] fields = getDeclaredFields(targetClass);for (Field field : fields) {if (ff != null && !ff.matches(field)) {continue;}try {fc.doWith(field);}catch (IllegalAccessException ex) {}}targetClass = targetClass.getSuperclass();}while (targetClass != null && targetClass != Object.class);}
    
  • makeAccessible:合理将Method、Feild、Ctor设置为Accessible,支持set值进去。

  • setField(Field field, @Nullable Object target, @Nullable Object value),将实例的Field设置Value。

  • findField(Class<?> clazz, @Nullable String name, @Nullable Class<?> type) :递归通过name或type寻找Field,找到第一个就直接返回。两个判断条件且支持传null,所以可以加个&&和短路法逻辑,type为null则后面逻辑不起作用,如果不为null则后面逻辑需要使用,简化逻辑的书写,这个可以学习下。(type == null || type.equals(field.getType()))

            Class<?> searchType = clazz;while (Object.class != searchType && searchType != null) {Field[] fields = getDeclaredFields(searchType);for (Field field : fields) {// 短路法,极大简化了if else逻辑的书写,非常方便。if ((name == null || name.equals(field.getName())) &&(type == null || type.equals(field.getType()))) {return field; // 找到就返回了哈,不继续找了。}}searchType = searchType.getSuperclass();}return null;
    
  • 其余方法代办。

StringValueResolver

Bean类实现EmbeddedValueResolverAware接口,则会自动注入StringValueResolver,用于解析带有占位符的String值,如${com.huawei.com.name} -> Perjoker。这样我们也支持解析从属性配置中 自动填充String占位符啦。

SpringCloud

负载均衡

ServiceComb场景及其原理相关推荐

  1. 简单粗暴理解与实现机器学习之逻辑回归:逻辑回归介绍、应用场景、原理、损失以及优化...

    作者 | 汪雯琦 责编 | Carol 来源 | CSDN 博客 出品 | AI科技大本营(ID:rgznai100) 学习目标 知道逻辑回归的损失函数 知道逻辑回归的优化方法 知道sigmoid函数 ...

  2. Mysql binlog应用场景与原理深度剖析

    1 基于binlog的主从复制 Mysql 5.0以后,支持通过binary log(二进制日志)以支持主从复制.复制允许将来自一个MySQL数据库服务器(master) 的数据复制到一个或多个其他M ...

  3. 好文推荐 | MySQL binlog应用场景与原理深度剖析

    作者:田守枝 来自:田守枝的博客(公众号) 本文深入介绍Mysql Binlog的应用场景,以及如何与MQ.elasticsearch.redis等组件的保持数据最终一致.最后通过案例深入分析binl ...

  4. Hbase的应用场景、原理及架构分析(转:https://blog.csdn.net/xiangxizhishi/article/details/75388971)

    Hbase概述 hbase是一个构建在HDFS上的分布式列存储系统.HBase是Apache Hadoop生态系统中的重要 一员,主要用于海量结构化数据存储.从逻辑上讲,HBase将数据按照表.行和列 ...

  5. 详解 Redis 应用场景及原理

    本文转自https://blog.csdn.net/niucsd/article/details/50966733,描述了redis实现原理和应用场景,篇幅较长,有意学习redis的同学可耐心阅读. ...

  6. AD FS是什么,用在什么场景,原理是什么?

    AD FS(联合身份验证)是一种身份访问解决方案,即使用户帐户和应用程序位于完全不同的网络或组织中,它也可以为客户端计算机(网络内部或外部)提供对受保护的面向Internet的应用程序或服务的无缝SS ...

  7. 远程诊断DoIP(笔记一)场景和原理

    根据互联网中的定义, OSI模型中,由软件(进程)或者硬件(输入/输出芯片)实现的活跃部分称之为实体Entity,用于进行信息的发送或者接收.实体是子系统中的活动单元,每一层被拆分成多个实体,同一层内 ...

  8. 人工智能大模型多场景应用原理解析

    前言 在上篇文章<人工智能大模型之ChatGPT原理解析>中分享了一些大模型之ChatGPT的核心原理后,收到大量读者的反馈,诸如:在了解了核心原理后想进一步了解未来的发展趋势(比如生成式 ...

  9. Unity 场景烘焙原理

    一.基础四种烘焙方式 1.静态灯光下静态物体烘焙: 2.静态灯光下动态物体烘焙: 3.动态灯光下静态物体烘焙: 4.动态灯光下动态物体烘焙: 二.实现方法 1.静态灯光下静态物体烘焙设置如下: ①灯光 ...

最新文章

  1. 虚拟机访问svn服务器超时_SVN卡顿原因及简单修复方法
  2. ps随机排列_漂亮!自然材料:人工可控微米级胶体粒子“堆积木”——粒子随心所欲的组装排列!...
  3. c# 大数据量比较时-方案
  4. 找工作笔试面试那些事儿(10)---SQL语句总结
  5. win10蓝牙允许设备连接到此计算机,Win7自由天空专业版系统配置蓝牙时“允许Bluetooth设备连接到此计算机”选项灰...
  6. java文档注释 编写格式
  7. python pygame sdl2教程_无法安装pygame sdl2
  8. apache2 + django
  9. 非旋Treap——维护数列
  10. 微信小程序npm引用ui框架
  11. requests爬取4399游戏链接
  12. macOS安装MySQL,使用Navicat连接MySQL数据库/2022
  13. php时间戳求时间差,php中计算时间差的几种方法
  14. 03 注册Gitee账号及设置公钥
  15. 2020年中国科技行业最可能发生的38件事
  16. egret 发布微端项目
  17. 如何用 JS 实现 3D 赛车效果
  18. CSS高度塌陷问题(float塌陷margin塌陷)
  19. Servlet入门案例
  20. DeepLearning(李沐老师)

热门文章

  1. python元宵节_用Python写一份独特的元宵节祝福
  2. 服务器安全狗拦截微信,服务器安全狗v4.0 DDOS防护功能设置教程
  3. 成都盛迈坤电商:店铺直通车要怎么操作
  4. UVA 662 - Fast Food
  5. 像背单词一样搞定机器学习关键概念:300张小抄表满足你的所有AI好奇
  6. 个人使用C++标准类
  7. [BUUCTF]REVERSE解题记录 [GWCTF 2019]pyre
  8. 如何用python画玫瑰花_如何用python画玫瑰花
  9. 关于虚拟机移动或复制后,系统无法启动或网卡配置不生效效的解决方法
  10. 【观察】Akamai:做中国泛娱乐企业出海的“加速器”