文章目录

  • 测试代码
  • InvokerConfig定义
  • 服务调用方静态初始化流程
    • ServiceFactory静态初始化
  • ServiceFactory#getService方法实现
    • 调用方启动类InvokerBootStrap 初始化流程
    • 解析序列化类创建服务代理对象ServiceInvocationProxy
    • 注册负载均衡管理器
    • 区域路由策略
    • 发布客户端到注册中心
  • 基于Xml配置完成服务注册

测试代码

以下面最小化代码初始pigeon服务调用者为例,分析pigeon完成服务注册的流程:

public static void main(String[] args) throws Exception {InvokerConfig<EchoService> invokerConfig = new InvokerConfig<>(EchoService.class);EchoService echoService = ServiceFactory.getService( invokerConfig);System.out.println("echoService result:" + echoService.echo("echoService_input"));
}

调用ServiceFactory.getService创建流程:

  1. 创建一个InvokerConfig,至少传入一个接口类全限定名
  2. 调用ServiceFactory.getService(invokerConfig)获取具体的代理对象
  3. 以接口引用,直接通过代理对象调用远程方法

InvokerConfig定义

每个InvokerConfig对应一个调用服务,内部属性定义如下:

public class InvokerConfig<T> {private static final Logger logger = LoggerLoader.getLogger(InvokerConfig.class);/* 调用方式 */// 同步调用,客户端线程会阻塞等待返回结果,默认设置是sync模式。public static final String CALL_SYNC = CallMethod.SYNC.getName();// 回调方式,客户端将请求提交给pigeon后立即返回,也不等待返回结果,它与future方式的区别是// callback必须提供一个实现了pigeon提供的InvocationCallback接口的回调对象给pigeon,pigeon负责接收返回结果并传递回给这个回调对象public static final String CALL_CALLBACK = CallMethod.CALLBACK.getName();// 客户端只是将请求传递给pigeon,pigeon提交给服务端,客户端也不等待立即返回,服务端也不会返回结果给客户端,这种方式一般都是没有返回结果的接口调用。​public static final String CALL_ONEWAY = CallMethod.ONEWAY.getName();// 客户端将请求提交给pigeon后立即返回,不等待返回结果,由pigeon负责等待返回结果,客户端可以自行决定何时何地来取返回结果public static final String CALL_FUTURE = CallMethod.FUTURE.getName();/* 服务协议 */// 服务器协议,httppublic static final String PROTOCOL_HTTP = Constants.PROTOCOL_HTTP;// 服务器协议,tcppublic static final String PROTOCOL_DEFAULT = Constants.PROTOCOL_DEFAULT;/* 序列化方式字段 */// hessian序列化方式public static final String SERIALIZE_HESSIAN = SerializerType.HESSIAN.getName();// java原生序列化方式public static final String SERIALIZE_JAVA = SerializerType.JAVA.getName();// protostuff序列化方式public static final String SERIALIZE_PROTO = SerializerType.PROTO.getName();// json序列化方式public static final String SERIALIZE_JSON = SerializerType.JSON.getName();// FST序列化方式public static final String SERIALIZE_FST = SerializerType.FST.getName();// 配置中心private ConfigManager configManager = ConfigManagerLoader.getConfigManager();// 服务接口private Class<T> serviceInterface;// 服务访问urlprivate String url;// 服务访问版本private String version;// 方法调用方式字节private byte callMethod = CallMethod.SYNC.getCode();// 方法调用方式字符串private String callType = CallMethod.SYNC.getName();// HESSIAN序列化方式眦裂private byte serialize = SerializerType.HESSIAN.getCode();// 调用超时时间private int timeout = configManager.getIntValue(Constants.KEY_INVOKER_TIMEOUT, Constants.DEFAULT_INVOKER_TIMEOUT);// 调用回调private InvocationCallback callback;// 泳道private String suffix = configManager.getGroup();// 负载均衡算法private String loadbalance = LoadBalanceManager.DEFAULT_LOADBALANCE;// 区域路由策略private String regionPolicy = RegionPolicyManager.INSTANCE.DEFAULT_REGIONPOLICY;// 是否超时重试private boolean timeoutRetry = false;// 集群访问策略private String cluster = Constants.CLUSTER_FAILFAST;// 重试次数private int retries = 1;// 虚拟ipprivate String vip;// 最大请求数限制private int maxRequests = configManager.getIntValue(Constants.KEY_INVOKER_MAXREQUESTS, 0);// 服务协议private String protocol = Constants.PROTOCOL_DEFAULT;// 当前服务提供的方法private Map<String, InvokerMethodConfig> methods;private ClassLoader classLoader;// 服务访问密钥private transient String secret;// 远程appkeyprivate String remoteAppKey;private Object mock;
}

服务调用方静态初始化流程

ServiceFactory静态初始化

ServiceFactory静态初始化流程类似于服务提供方,大致流程如下:

  1. 在首次访问ServiceFactory,会触发ServiceProxy和PublishPolicy的加载

    1. ServiceProxy提供作为调用方的信息注册,和服务代理获取
    2. PublishPolicy针对特定的providerConfig进行服务注册
  2. 随后会调用ProviderBootStrap.init()初始化对外提供服务的基础依赖
    1. ProviderProcessHandlerFactory.init()初始化所有相关拦截器
    2. SerializerFactory.init()初始化所有序列化方式
    3. ClassUtils.loadClasses(“com.dianping.pigeon”)预加载所有pigeon相关类
    4. 注册关闭钩子ShutdownHookListener,内部调用ServiceFactory.unpublishAllServices()进程完成相关清理工作
    5. RegistryManager.getInstance() 初始化注册管理器
    6. 加载服务器,默认包括NettyServer和JettyHttpServer,当前会注册HTTP服务器
      1. 启动http服务器,默认注册到4080
      2. 设置consoleServer,初始化注册配置

这里代码和服务提供方的初始化流程大致相同,不再展示,下面主要看服务调用方pigeon初始化流程

ServiceFactory#getService方法实现

在具体调用ServiceFactory.getService时,实际会调用serviceProxy.getProxy(invokerConfig),进一步调用AbstractServiceProxy#getProxy方法,这个方法的实现逻辑大致如下:

  1. 检查interface,url,protocol等参数合法性,如果url没有设置,默认使用接口全限定名,如果protocol不为空,且等于default,则修改成@DEFAULT@,同时更新url加上这个协议前缀
  2. 双重加锁检查invokeConfig对应的服务是否以存在,不存在先尝试启动调用方启动类InvokerBootStrap
  3. 先根据配置的序列方式获取相应的序列化类,再根据invokerConfig创建动态代理:ServiceInvocationProxy
  4. 如果配置中的负载均衡配置存在,注册服务到负载均衡管理器中
  5. 注册区域策略服务
  6. ClientManager.getInstance().registerClients(invokerConfig)注册客户端到注册中心(默认为zk)
  7. 缓存服务
    代码实现逻辑如下:
public <T> T getProxy(InvokerConfig<T> invokerConfig) {// 参数检查if (invokerConfig.getServiceInterface() == null) {throw new IllegalArgumentException("service interface is required");}// 如果url没有设置,默认使用接口全限定名if (StringUtils.isBlank(invokerConfig.getUrl())) {invokerConfig.setUrl(ServiceFactory.getServiceUrl(invokerConfig));}// 如果protocol不为空,且等于default,则修改成@DEFAULT@,同时更新url加上这个协议前缀if (!StringUtils.isBlank(invokerConfig.getProtocol())&& !invokerConfig.getProtocol().equalsIgnoreCase(Constants.PROTOCOL_DEFAULT)) {String protocolPrefix = "@" + invokerConfig.getProtocol().toUpperCase() + "@";if (!invokerConfig.getUrl().startsWith(protocolPrefix)) {invokerConfig.setUrl(protocolPrefix + invokerConfig.getUrl());}}// 双重检查Object service = null;service = services.get(invokerConfig);if (service == null) {synchronized (interner.intern(invokerConfig)) {service = services.get(invokerConfig);if (service == null) {try {// 初始化调用方启动类InvokerBootStrap.startup();// 先根据配置的序列方式获取相应的序列化类,再根据invokerConfig创建动态代理:ServiceInvocationProxyservice = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);// 如果配置中的负载均衡配置存在,注册服务到负载均衡管理器中if (StringUtils.isNotBlank(invokerConfig.getLoadbalance())) {LoadBalanceManager.register(invokerConfig.getUrl(), invokerConfig.getSuffix(),invokerConfig.getLoadbalance());}} catch (Throwable t) {throw new RpcException("error while trying to get service:" + invokerConfig, t);}// 注册区域路由策略try {regionPolicyManager.register(invokerConfig.getUrl(), invokerConfig.getSuffix(),invokerConfig.getRegionPolicy());} catch (Throwable t) {throw new RouteException("error while setup region route policy: " + invokerConfig, t);}//注册客户端到注册中心try {ClientManager.getInstance().registerClients(invokerConfig);} catch (Throwable t) {logger.warn("error while trying to setup service client:" + invokerConfig, t);}// 缓存服务services.put(invokerConfig, service);}}}return (T) service;
}

调用方启动类InvokerBootStrap 初始化流程

类似于服务提供方启动类ProviderBootStrap,调用方启动类InvokerBootStrap主要完成了服务调用方的初始化流程,具体逻辑如下:

  1. ServiceInvocationRepository.getInstance().init()初始化服务调用仓库,内部初始化一个InvocationTimeoutListener线程并启动,用于检查调用超时,默认每隔一秒检查一次
  2. InvokerProcessHandlerFactory.init() 初始化调用方拦截器链
  3. SerializerFactory.init()初始化序列化工厂,初始化所有序列化方式
  4. LoadBalanceManager.init()初始化负载均衡管理器
    1. 初始化线程类WeightFactorMaintainer和CapacityChecker,并基于loadbalanceThreadPool运行。
    2. 初始化step ticks
  5. RegionPolicyManager.INSTANCE.init()初始化区域策略管理器
    1. 注册三种区域路由策略:AutoSwitchRegionPolicy,WeightBasedRegionPolicy,ForceRegionPolicy
    2. 判断是否允许路由策略,如果允许,初始化相关配置
    3. 注册配置监听,如果修改路由策略配置,重新初始化区域相关配置
  6. 如果监控管理器存在,初始化监控配置
  7. 预启动用于处理相应请求的线程池的所有核心线程数
  8. 标志启动成功

具体实现代码如下:

public static void startup() {if (!isStartup) {synchronized (InvokerBootStrap.class) {if (!isStartup) {// 初始化服务调用仓库,内部初始化一个InvocationTimeoutListener线程并启动,用于检查调用超时,默认每隔一秒检查一次ServiceInvocationRepository.getInstance().init();// 初始化调用方拦截器链InvokerProcessHandlerFactory.init();// 初始化序列化工厂,初始化所有序列化方式SerializerFactory.init();// 初始化负载均衡管理器LoadBalanceManager.init();// 初始化区域路由策略RegionPolicyManager.INSTANCE.init();// 如果监控管理器存在,初始化监控配置Monitor monitor = MonitorLoader.getMonitor();if (monitor != null) {monitor.init();}// 预启动用于处理相应请求的线程池的所有核心线程数ResponseProcessorFactory.selectProcessor().getResponseProcessThreadPool().prestartAllCoreThreads();// 标志启动成功isStartup = true;logger.warn("pigeon client[version:" + VersionUtils.VERSION + "] has been started");}}}
}

在初始化调用方拦截器链一步,初始化了以下拦截器

public static void init() {if (!isInitialized) {if (Constants.MONITOR_ENABLE) {// 远程调用打点监控registerBizProcessFilter(new RemoteCallMonitorInvokeFilter()); }// 请求追踪拦截器,记录请求调用耗时、成功失败等情况registerBizProcessFilter(new TraceFilter());//故障注入拦截器registerBizProcessFilter(new FaultInjectionFilter());// 降级拦截器registerBizProcessFilter(new DegradationFilter());// 集群选择拦截器registerBizProcessFilter(new ClusterInvokeFilter());// 网关拦截器registerBizProcessFilter(new GatewayInvokeFilter());// 调用上下文信息拦截器registerBizProcessFilter(new ContextPrepareInvokeFilter());// 统一鉴权管理拦截器registerBizProcessFilter(new SecurityFilter());//远程调用拦截器registerBizProcessFilter(new RemoteCallInvokeFilter()); // 创建调用方拦截器链bizInvocationHandler = createInvocationHandler(bizProcessFilters);isInitialized = true;}
}

在初始化完一系列的拦截器后,InvokerProcessHandlerFactory调用了createInvocationHandler方法,建立起了一系列拦截器的链式引用,后续接受处理请求时,返回拦截器头节点,在每一个节点的调用中调用下一个节点,来完成链式调用。
下面举例分析拦截器链的生成调用流程:
对于顺序注册的3个拦截器f1,f2,f3,根据遍历顺序f3,f2,f1,用g(f3,null))=g3表示创建了新的ServiceInvocationHandler,内部执行了f3.invoke(null,InvocationContext),对此有以下引用关系:

  1. 第一次遍历:last=null,next=last=null,last=g(f3,null))=g3;
  2. 第二次遍历,last=f3(g(null))=g3,next=last=g1,last=g(f2,g3)=g2
  3. 第三次遍历:last=g(f2,g3)=g2,next=last=g2,next=g(f1,g2)=g1;
  4. 最后return last。
  5. 则每次执行last.handle()方法即会执行g1.handler(),内部调用f1.invoke(),且持有g2的引用,在f1.invoker中间再会执行g2.handler(),然后内部执行f2.invoke()一部分,再执行g3.handle(),内部执行f3.invoke(),执行完后,再执行f2的后半部分,再执行f1的后半部分,即最后的执行顺序为:f1->f2->f3->f2->f1。
    具体创建实现如下:
private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler(List<V> internalFilters) {// 链式拦截器头节点ServiceInvocationHandler last = null;List<V> filterList = new ArrayList<V>();// 对于list中的拦截器f1,f2,f3,f4拦截器生成流程如下filterList.addAll(internalFilters);for (int i = filterList.size() - 1; i >= 0; i--) {final V filter = filterList.get(i);// 每一层拦截器用ServiceInvocationHandler包装,next作为final ServiceInvocationHandler next = last;last = new ServiceInvocationHandler() {@SuppressWarnings("unchecked")@Overridepublic InvocationResponse handle(InvocationContext invocationContext) throws Throwable {// 调用filter f(n),内部持有f(n-1)的引用,会在执行f(n)的中途执行f(n-1)InvocationResponse resp = filter.invoke(next, invocationContext);return resp;}};}return last;
}

解析序列化类创建服务代理对象ServiceInvocationProxy

pigeon根据根据配置的序列化方式获取到相应的序列化工厂,如默认配置hessian,会拿到HessianSerializerFactory。接着会调用抽象父类的proxyRequest方法,借助Java jdk动态代理生成相应的服务调用代理类,具体代码如下:

public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),new Class[]{invokerConfig.getServiceInterface()}, new ServiceInvocationProxy(invokerConfig,InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
}

在源码中,创建了一个ServiceInvocationProxy来根据invokerConfig完成具体的代理工作,调用ServiceInvocationProxy的构造函数用到了两个参数,一个是InvokerConfig,另一个则是ServiceInvocationHandler。ServiceInvocationHandler实际上是一个链式拦截器,内部定义了唯一的方法handle,在内部进行了拦截器的链式调用。
在每次调用代理对象的方法时,实际调用的是ServiceInvocationProxy#invoke方法,在这个方法里,再实际完成拦截器链的链式调用。

下面先看看ServiceInvocationProxy的实现:

public class ServiceInvocationProxy implements InvocationHandler {private static final Logger logger = LoggerLoader.getLogger(ServiceInvocationProxy.class);// 调用服务配置private InvokerConfig<?> invokerConfig;// 服务调用拦截器链处理器private ServiceInvocationHandler handler;public ServiceInvocationProxy(InvokerConfig<?> invokerConfig, ServiceInvocationHandler handler) {this.invokerConfig = invokerConfig;this.handler = handler;}// 调用代理接口方法时,实际会调用下面方法public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();// 如果方法定义于Object类中,直接调用if (method.getDeclaringClass() == Object.class) {return method.invoke(handler, args);}if ("toString".equals(methodName) && parameterTypes.length == 0) {return handler.toString();}if ("hashCode".equals(methodName) && parameterTypes.length == 0) {return handler.hashCode();}if ("equals".equals(methodName) && parameterTypes.length == 1) {return handler.equals(args[0]);}// 交由拦截器链进行处理,处理完后解析结果返回return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),method.getReturnType());}// 解析调用结果public Object extractResult(InvocationResponse response, Class<?> returnType) throws Throwable {Object responseReturn = response.getReturn();if (responseReturn != null) {// 判断结果类型,如果是服务异常或应用异常进行封装int messageType = response.getMessageType();if (messageType == Constants.MESSAGE_TYPE_SERVICE) {return responseReturn;} else if (messageType == Constants.MESSAGE_TYPE_EXCEPTION) {throw InvokerUtils.toRpcException(response);} else if (messageType == Constants.MESSAGE_TYPE_SERVICE_EXCEPTION) {throw InvokerUtils.toApplicationException(response);}throw new BadResponseException(response.toString());}// 基本类型且无返回值则返回相关类型的默认值return getReturn(returnType);}// 根据基本类型返回对应的默认值private Object getReturn(Class<?> returnType) {if (returnType == byte.class) {return (byte) 0;} else if (returnType == short.class) {return (short) 0;} else if (returnType == int.class) {return 0;} else if (returnType == boolean.class) {return false;} else if (returnType == long.class) {return 0l;} else if (returnType == float.class) {return 0.0f;} else if (returnType == double.class) {return 0.0d;} else {return null;}}}

下面再看看ServiceInvocationHandler是怎么获取。
在前面的代码里,InvokerBootStrap通过调用InvokerProcessHandlerFactory.init()初始化了一系列的拦截器,在InvokerProcessHandlerFactory#selectInvocationHandler方法里则返回了这一系列的拦截器。最后调用了last的handler->f(n)。invoke,再一次次递归调用拦截器链里的每个拦截器。

注册负载均衡管理器

在这一步里,主要根据服务名,泳道,负载均衡策略字段等参数确保相应的负载均衡策略完成注册,系统原有的四种负载均衡算法RandomLoadBalance,AutoawareLoadBalance,RoundRobinLoadBalance,WeightedAutoawareLoadBalance在这一步一般都已经初始化好,这里主要用于注册自定义的负载均衡策略器。
具体注册流程如下:

public static void register(String serviceName, String suffix, Object loadBalance) {// 解析serviceId,默认serviceName,如果suffix不为空,则为默认serviceName:suffixString serviceId = ServiceUtils.getServiceId(serviceName, suffix);LoadBalance loadBlanceObj = null;if (loadBalance instanceof LoadBalance) {loadBlanceObj = (LoadBalance) loadBalance;} else if (loadBalance instanceof String && StringUtils.isNotBlank((String) loadBalance)) {// 看是否为默认存在的if (!loadBalanceMap.containsKey(loadBalance)) {try {// 不存在则加载相应实例,进行实例化,一般是自定义情况才需要实例化Class<? extends LoadBalance> loadbalanceClass = (Class<? extends LoadBalance>) ClassUtils.loadClass((String) loadBalance);loadBlanceObj = loadbalanceClass.newInstance();} catch (Throwable e) {throw new IllegalArgumentException("failed to register loadbalance[service=" + serviceId+ ",class=" + loadBalance + "]", e);}} else {loadBlanceObj = loadBalanceMap.get(loadBalance);}} else if (loadBalance instanceof Class) {try {// 实例化Class<? extends LoadBalance> loadbalanceClass = (Class<? extends LoadBalance>) loadBalance;loadBlanceObj = loadbalanceClass.newInstance();} catch (Throwable e) {throw new IllegalArgumentException("failed to register loadbalance[service=" + serviceId + ",class="+ loadBalance + "]", e);}}// 缓存起来if (loadBlanceObj != null) {loadBalanceMap.put(serviceId, loadBlanceObj);}
}

区域路由策略

类似于注册负载均衡策略的流程,下面直接看源码:

public void register(String serviceName, String suffix, Object regionPolicy) {// 解析serviceId,默认serviceName,如果suffix不为空,则为默认serviceName:suffixString serviceId = ServiceUtils.getServiceId(serviceName, suffix);RegionPolicy regionPolicyObj = null;if (regionPolicy instanceof RegionPolicy) {regionPolicyObj = (RegionPolicy) regionPolicy;} else if (regionPolicy instanceof String && StringUtils.isNotBlank((String) regionPolicy)) {// 看是否为默认存在的if (!regionPolicyMap.containsKey(regionPolicy)) {try {// 不存在则加载相应实例,进行实例化,一般是自定义情况才需要实例化Class<? extends RegionPolicy> regionPolicyClass = (Class<? extends RegionPolicy>) ClassUtils.loadClass((String) regionPolicy);regionPolicyObj = regionPolicyClass.newInstance();} catch (Throwable e) {throw new IllegalArgumentException("failed to register regionPolicy[service=" + serviceId+ ",class=" + regionPolicy + "]", e);}} else {regionPolicyObj = regionPolicyMap.get(regionPolicy);}} else if (regionPolicy instanceof Class) {try {// 实例化Class<? extends RegionPolicy> regionPolicyClass = (Class<? extends RegionPolicy>) regionPolicy;regionPolicyObj = regionPolicyClass.newInstance();} catch (Throwable e) {throw new IllegalArgumentException("failed to register regionPolicy[service=" + serviceId + ",class="+ regionPolicy + "]", e);}}// 以serviceId为key缓存if (regionPolicyObj != null) {regionPolicyMap.put(serviceId, regionPolicyObj);}
}

发布客户端到注册中心

这一步主要将调用方相关信息注册到注册中心中,大致实现如下

  1. 获取服务地址列表,这里主要从注册中心zk获取,具体获取地址如/DP/SERVER/com.dianping.pigeon.demo.EchoService,获取到的实际值是ip:port,如172.23.51.30:6354,如果有多个ip:port,会以逗号分割
  2. 遍历服务列表,从注册红心获取每个服务地址的权重,如通过/DP/WEIGHT/172.23.51.30:6354获取,获取到ip,port,weight后,会封装成一个HostInfo
  3. 通过RegistryEventListener发布providerAdded和serverInfoChanged事件,对于providerAdded事件:
    1. 会导致ClientManager.InnerServiceProviderChangeListener触发进行客户端注册,首先会初始化一个ConnectInfo,包含serviceName, host, port, weight等信息,然后会进行以下两步:

      1. 进一步会触发相关的ClusterListener调用addConnect,这里主要包含:

        1. 触发NettyClientFactory#createClient调用,进一步创建NioClientSocketChannelFactory,建立和服务端的netty连接,并建立心跳检测
        2. 触发WeightFactorMaintainer#providerAdded,进一步调用addWeight方法,更新weights和weightFactors成员。
      2. 在RegistryManager中添加服务引用地址,更新referencedServiceAddresses和referencedAddresses
    2. 触发WeightFactorMaintainer#providerAdded,进一步调用addWeight方法,更新weights和weightFactors成员。这里和前面一步有重复

具体实现代码逻辑如下:

public Set<HostInfo> registerClients(InvokerConfig invokerConfig) {// 获取相关基本信息String remoteAppkey = invokerConfig.getRemoteAppKey();String serviceName = invokerConfig.getUrl();String group = RegistryManager.getInstance().getGroup(serviceName);String vip = invokerConfig.getVip();logger.info("start to register clients for service '" + serviceName + "#" + group + "'");String localHost = null;if (vip != null && vip.startsWith("console:")) {localHost = configManager.getLocalIp() + vip.substring(vip.indexOf(":"));}// 从注册中心获取服务地址String serviceAddress = getServiceAddress(invokerConfig);String[] addressArray = serviceAddress.split(",");Set<HostInfo> addresses = Collections.newSetFromMap(new ConcurrentHashMap<HostInfo, Boolean>());for (int i = 0; i < addressArray.length; i++) {if (StringUtils.isNotBlank(addressArray[i])) {// 解析服务地址String address = addressArray[i];int idx = address.lastIndexOf(":");if (idx != -1) {String host = null;int port = -1;try {host = address.substring(0, idx);port = Integer.parseInt(address.substring(idx + 1));} catch (RuntimeException e) {logger.warn("invalid address:" + address + " for service:" + serviceName);}if (host != null && port > 0) {if (localHost != null && !localHost.equals(host + ":" + port)) {continue;}try {// 从注册中心获取相应服务权重int weight = RegistryManager.getInstance().getServiceWeight(address, serviceName, false);addresses.add(new HostInfo(host, port, weight));} catch (Throwable e) {logger.error("error while registering service invoker:" + serviceName + ", address:"+ address + ", env:" + configManager.getEnv(), e);throw new ServiceUnavailableException("error while registering service invoker:"+ serviceName + ", address:" + address + ", env:" + configManager.getEnv(), e);}}} else {logger.warn("invalid address:" + address + " for service:" + serviceName);}}}final String url = serviceName;long start = System.nanoTime();if (enableRegisterConcurrently) {// 并发发布providerAdded和serverInfoChanged时间给RegistryEventListenerfinal CountDownLatch latch = new CountDownLatch(addresses.size());for (final HostInfo hostInfo : addresses) {Runnable r = new Runnable() {@Overridepublic void run() {try {RegistryEventListener.providerAdded(url, hostInfo.getHost(), hostInfo.getPort(),hostInfo.getWeight());RegistryEventListener.serverInfoChanged(url, hostInfo.getConnect());} catch (Throwable t) {logger.error("failed to add provider client:" + hostInfo, t);} finally {latch.countDown();}}};registerThreadPool.submit(r);}try {latch.await();} catch (InterruptedException e) {logger.info("", e);}} else {//顺序执行for (final HostInfo hostInfo : addresses) {RegistryEventListener.providerAdded(url, hostInfo.getHost(), hostInfo.getPort(), hostInfo.getWeight());RegistryEventListener.serverInfoChanged(url, hostInfo.getConnect());}}long end = System.nanoTime();logger.info("end to register clients for service '" + serviceName + "#" + group + "', cost:"+ ((end - start) / 1000000));return addresses;
}

基于Xml配置完成服务注册

基于xml文件配置创建流程更复杂些,具体在创建ReferenceBean流程中实现,类似与服务注册,在Spring调用默认构造函数创建实例对象、完成依赖注入后,在调用init方法阶段进行,这里和ServiceBean服务注册略有不同的是ReferenceBean实现了FactoryBean接口。
具体init方法调用逻辑如下:

  1. 加载接口类
  2. 根据ReferenceBean当前配置创建InvokerConfig
  3. 如果存在线程池隔离方法配置,设置到invokeConfig并缓存到methodMap。
  4. 降级配置检查 ?????
  5. 检查和设置remoteAppkey
  6. 调用ServiceFactory.getService(invokerConfig)获取服务
  7. 配置负载均衡

代码实现:

public void init() throws Exception {// 1. 加载接口类if (StringUtils.isBlank(interfaceName)) {throw new IllegalArgumentException("invalid interface:" + interfaceName);}this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());// 2. 根据ReferenceBean当前配置创建InvokerConfigInvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,this.serialize, this.callback, this.suffix, this.writeBufferLimit, this.loadBalance, this.cluster,this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);invokerConfig.setClassLoader(classLoader);invokerConfig.setSecret(secret);invokerConfig.setRegionPolicy(regionPolicy);// 3. 如果存在线程池隔离方法配置,设置到invokeConfig并缓存到methodMap。if (!CollectionUtils.isEmpty(methods)) {Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();invokerConfig.setMethods(methodMap);for (InvokerMethodConfig method : methods) {methodMap.put(method.getName(), method);}}// 4. 降级配置检查checkMock(); invokerConfig.setMock(mock);// 5. 检查和设置remoteAppkeycheckRemoteAppkey();invokerConfig.setRemoteAppKey(remoteAppKey);// 6. 获取服务代理对象this.obj = ServiceFactory.getService(invokerConfig);// 7. 配置负载均衡configLoadBalance(invokerConfig);
}

【Pigeon源码阅读】服务发现与调用初始化流程解析(五)相关推荐

  1. ijkplayer 源码分析(1):初始化流程

    一.ijkplayer 初始化流程 本文是基于 A4ijkplayer 项目进行 ijkplayer 源码分析,该项目是将 ijkplayer 改成基于 CMake 编译,可导入 Android St ...

  2. java B2B2C springmvc mybatis电子商务平台源码-Consul服务发现原理...

    Consul 是什么 Consul 是一个支持多数据中心分布式高可用的服务发现和配置共享的服务软件,由 HashiCorp 公司用 Go 语言开发, 基于 Mozilla Public License ...

  3. Scrapy源码阅读分析_1_整体框架和流程介绍

    From:https://blog.csdn.net/weixin_37947156/article/details/74435304 Scrapy github 下载地址:https://githu ...

  4. Mybatis源码阅读(二):动态节点解析2.1 —— SqlSource和SqlNode

    *************************************优雅的分割线 ********************************** 分享一波:程序员赚外快-必看的巅峰干货 如 ...

  5. Redis源码阅读01-读了一下redis启动流程涉及的源码我都读了个啥

    阅读源码是学习一门技术的必经之路,经过1周左右的c语言入门学习,我就开始硬读redis的源码了.因为公司的多版本的改造,所以源码就选择redis6.x的最高版本redis6.2.7. 在阅读源码前,首 ...

  6. spring 源码阅读笔记-从浅到深的解析

    目录 第一章 源码安装 文章目录 目录 前言 一.spring源码下载 二.构建源码及使用 1.源码构建 2.使用构建源码 总结 前言 由于spring的源码常常以语言和高深莫测的地位存在,而源码解析 ...

  7. STL源码剖析 内存基本处理工具 初始化空间的五个函数

    初始化空间的五个函数 构造函数 construct() 析构函数 destroy() 剩余三个底层函数 和 高层函数之间的对应关系如下 uninitialized_copy()  对应 copy() ...

  8. Gaea源码阅读(二):客户端流程

    转载地址:http://blog.csdn.net/m_vptr/article/details/9147279 以GaeaClientTest为入口 [java]  view plain  copy ...

  9. PHP yii 框架源码阅读(二) - 整体执行流程分析

    转载链接:http://tech.ddvip.com/2013-11/1384432766205970.html 一  程序入口 <?php// change the following pat ...

  10. Mybatis源码阅读(二):动态节点解析2.2 —— SqlSourceBuilder与三种SqlSource

    *************************************优雅的分割线 ********************************** 分享一波:程序员赚外快-必看的巅峰干货 如 ...

最新文章

  1. springboot-springmvc-requestParam
  2. JSONObject、JSONArray区别
  3. IDEA 2020 本土化,真的是全中文了,新手,开心了!
  4. 你没干什么坏事,你怕什么?
  5. C++ STL string迭代器的使用
  6. ADO.NET三个基本对象(一)
  7. Web后端学习笔记 Flask(13)memcached
  8. 栈的典型应用 —— 延迟缓冲
  9. 安装pkgconfig_一个R包怎么也安装不上,憋着急!
  10. c语言中十进制与十六进制转换_二进制、八进制、十进制、十六进制数据转换...
  11. 1013 数素数 (20 分)—PAT (Basic Level) Practice (中文)
  12. SecureCrt 利用公匙登录L机取消密码登录。
  13. (转)美国:2016-2045新科技趋势报告
  14. 哥斯拉Godzilla shell管理工具
  15. 微信科室预约挂号小程序
  16. React.js 小书
  17. RPA机器人如何调用USB SERVER
  18. 【STATA】ARIMA模型(含代码)
  19. Vim下快速删除字符串
  20. nginx日志磁盘空间使用率100%

热门文章

  1. 关于扩散模型(Diffusion Models)中的P2-weighting使用防坑
  2. 【兴趣书签】科幻小说——《走出一日囚》
  3. Android自定义图片拼图游戏案例
  4. 【TF-Slim使用】
  5. vbox 虚拟机导入 无法启动 fatal:int18:boot failure
  6. js md5加密 无法md5解密
  7. 使用 backdoor 工具注入ShellCode
  8. java 小数位数控制
  9. mpeg文件格式分析
  10. 【C语言】下标法 编写数组元素的输入与输出