服务消费者

<!-- Consumer's application name, used for tracing dependency relationships (not a matching criterion); do not set it to the same name as the provider. -->
<dubbo:application name="dubbo-consumer"/>
<!-- protocol="zookeeper" is required. -->
<dubbo:registry protocol="zookeeper" address="${dubbo.registry.address}"/>
<!-- Generate a proxy for the remote service; userService can then be used in the same way as a local regular interface. -->
<dubbo:reference id="userService" check="false" interface="com.study.dubbo.userapi.service.UserService" loadbalance="consistenthash"/>

ReferenceBean

public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {private static final long serialVersionUID = 213195494150089726L;private transient ApplicationContext applicationContext;public ReferenceBean() {super();}public ReferenceBean(Reference reference) {super(reference);}public void setApplicationContext(ApplicationContext applicationContext) {this.applicationContext = applicationContext;SpringExtensionFactory.addApplicationContext(applicationContext);}public Object getObject() throws Exception {return get();}public Class<?> getObjectType() {return getInterfaceClass();}@Parameter(excluded = true)public boolean isSingleton() {return true;}@SuppressWarnings({"unchecked"})public void afterPropertiesSet() throws Exception {if (getConsumer() == null) {Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);if (consumerConfigMap != null && consumerConfigMap.size() > 0) {ConsumerConfig consumerConfig = null;for (ConsumerConfig config : consumerConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (consumerConfig != null) {throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);}consumerConfig = config;}}if (consumerConfig != null) {setConsumer(consumerConfig);}}}if (getApplication() == null&& (getConsumer() == null || getConsumer().getApplication() == null)) {Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? 
null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);if (applicationConfigMap != null && applicationConfigMap.size() > 0) {ApplicationConfig applicationConfig = null;for (ApplicationConfig config : applicationConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (applicationConfig != null) {throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);}applicationConfig = config;}}if (applicationConfig != null) {setApplication(applicationConfig);}}}if (getModule() == null&& (getConsumer() == null || getConsumer().getModule() == null)) {Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);if (moduleConfigMap != null && moduleConfigMap.size() > 0) {ModuleConfig moduleConfig = null;for (ModuleConfig config : moduleConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (moduleConfig != null) {throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);}moduleConfig = config;}}if (moduleConfig != null) {setModule(moduleConfig);}}}if ((getRegistries() == null || getRegistries().isEmpty())&& (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? 
null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);if (registryConfigMap != null && registryConfigMap.size() > 0) {List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();for (RegistryConfig config : registryConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {registryConfigs.add(config);}}if (registryConfigs != null && !registryConfigs.isEmpty()) {super.setRegistries(registryConfigs);}}}if (getMonitor() == null&& (getConsumer() == null || getConsumer().getMonitor() == null)&& (getApplication() == null || getApplication().getMonitor() == null)) {Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);if (monitorConfigMap != null && monitorConfigMap.size() > 0) {MonitorConfig monitorConfig = null;for (MonitorConfig config : monitorConfigMap.values()) {if (config.isDefault() == null || config.isDefault().booleanValue()) {if (monitorConfig != null) {throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);}monitorConfig = config;}}if (monitorConfig != null) {setMonitor(monitorConfig);}}}Boolean b = isInit();if (b == null && getConsumer() != null) {b = getConsumer().isInit();}if (b != null && b.booleanValue()) {getObject();}}@Overridepublic void destroy() {// do nothing}
}

afterPropertiesSet 负责初始化:先从 Spring 容器中补全缺省的 consumer、application、module、registry、monitor 配置;只有当 init 为 true 时,才会在此处立即调用 getObject 方法获取 bean。

ReferenceConfig

public synchronized T get() {if (destroyed) {throw new IllegalStateException("Already destroyed!");}if (ref == null) {init();}return ref;}private void init() {if (initialized) {return;}initialized = true;if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");}// get consumer's global configurationcheckDefault();appendProperties(this);if (getGeneric() == null && getConsumer() != null) {setGeneric(getConsumer().getGeneric());}if (ProtocolUtils.isGeneric(getGeneric())) {interfaceClass = GenericService.class;} else {try {interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}checkInterfaceAndMethods(interfaceClass, methods);}String resolve = System.getProperty(interfaceName);String resolveFile = null;if (resolve == null || resolve.length() == 0) {resolveFile = System.getProperty("dubbo.resolve.file");if (resolveFile == null || resolveFile.length() == 0) {File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");if (userResolveFile.exists()) {resolveFile = userResolveFile.getAbsolutePath();}}if (resolveFile != null && resolveFile.length() > 0) {Properties properties = new Properties();FileInputStream fis = null;try {fis = new FileInputStream(new File(resolveFile));properties.load(fis);} catch (IOException e) {throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);} finally {try {if (null != fis) fis.close();} catch (IOException e) {logger.warn(e.getMessage(), e);}}resolve = properties.getProperty(interfaceName);}}if (resolve != null && resolve.length() > 0) {url = resolve;if (logger.isWarnEnabled()) {if (resolveFile != null && resolveFile.length() > 0) {logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " 
to p2p invoke remote service.");} else {logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");}}}if (consumer != null) {if (application == null) {application = consumer.getApplication();}if (module == null) {module = consumer.getModule();}if (registries == null) {registries = consumer.getRegistries();}if (monitor == null) {monitor = consumer.getMonitor();}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {monitor = module.getMonitor();}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {monitor = application.getMonitor();}}checkApplication();checkStubAndMock(interfaceClass);Map<String, String> map = new HashMap<String, String>();Map<Object, Object> attributes = new HashMap<Object, Object>();map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!isGeneric()) {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length == 0) {logger.warn("NO method found in service interface " + interfaceClass.getName());map.put("methods", Constants.ANY_VALUE);} else {map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}map.put(Constants.INTERFACE_KEY, interfaceName);appendParameters(map, application);appendParameters(map, module);appendParameters(map, consumer, Constants.DEFAULT_KEY);appendParameters(map, this);String prefix = StringUtils.getServiceKey(map);if (methods != null && !methods.isEmpty()) {for (MethodConfig method : methods) {appendParameters(map, method, method.getName());String 
retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}appendAttributes(attributes, method, prefix + "." + method.getName());checkAndConvertImplicitConfig(method, map, attributes);}}String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);if (hostToRegistry == null || hostToRegistry.length() == 0) {hostToRegistry = NetUtils.getLocalHost();} else if (isInvalidLocalHost(hostToRegistry)) {throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);}map.put(Constants.REGISTER_IP_KEY, hostToRegistry);//attributes are stored by system context.StaticContext.getSystemContext().putAll(attributes);ref = createProxy(map);ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);}

ReferenceConfig创建代理

    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})private T createProxy(Map<String, String> map) {URL tmpUrl = new URL("temp", "localhost", 0, map);final boolean isJvmRefer;if (isInjvm() == null) {if (url != null && url.length() > 0) { // if a url is specified, don't do local referenceisJvmRefer = false;} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {// by default, reference local service if there isisJvmRefer = true;} else {isJvmRefer = false;}} else {isJvmRefer = isInjvm().booleanValue();}if (isJvmRefer) {URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);invoker = refprotocol.refer(interfaceClass, url);if (logger.isInfoEnabled()) {logger.info("Using injvm service " + interfaceClass.getName());}} else {if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);if (us != null && us.length > 0) {for (String u : us) {URL url = URL.valueOf(u);if (url.getPath() == null || url.getPath().length() == 0) {url = url.setPath(interfaceName);}if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));} else {urls.add(ClusterUtils.mergeUrl(url, map));}}}} else { // assemble URL from register center's configurationList<URL> us = loadRegistries(false);if (us != null && !us.isEmpty()) {for (URL u : us) {URL monitorUrl = loadMonitor(u);if (monitorUrl != null) {map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));}urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));}}if (urls == null || urls.isEmpty()) {throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" 
/> to your spring config.");}}if (urls.size() == 1) {invoker = refprotocol.refer(interfaceClass, urls.get(0));} else {List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();URL registryURL = null;for (URL url : urls) {invokers.add(refprotocol.refer(interfaceClass, url));if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {registryURL = url; // use last registry url}}if (registryURL != null) { // registry url is available// use AvailableCluster only when register's cluster is availableURL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);invoker = cluster.join(new StaticDirectory(u, invokers));} else { // not a registry urlinvoker = cluster.join(new StaticDirectory(invokers));}}}Boolean c = check;if (c == null && consumer != null) {c = consumer.isCheck();}if (c == null) {c = true; // default true}if (c && !invoker.isAvailable()) {throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());}if (logger.isInfoEnabled()) {logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());}// create service proxyreturn (T) proxyFactory.getProxy(invoker);}

DubboProtocol

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);// create rpc invoker.DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);invokers.add(invoker);return invoker;}

DubboInvoker

public class DubboInvoker<T> extends AbstractInvoker<T> {public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});this.clients = clients;// get version.this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");this.invokers = invokers;}@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());inv.setAttachment(Constants.VERSION_KEY, version);ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {ResponseFuture future = currentClient.request(inv, timeout);RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));return new RpcResult();} else {RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. 
method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}

AbstractInvoker

public Result invoke(Invocation inv) throws RpcException {if (destroyed.get()) {throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion()+ " is DESTROYED, can not be invoked any more!");}RpcInvocation invocation = (RpcInvocation) inv;invocation.setInvoker(this);if (attachment != null && attachment.size() > 0) {invocation.addAttachmentsIfAbsent(attachment);}Map<String, String> context = RpcContext.getContext().getAttachments();if (context != null) {invocation.addAttachmentsIfAbsent(context);}if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());}RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);try {return doInvoke(invocation);} catch (InvocationTargetException e) { // biz exceptionThrowable te = e.getTargetException();if (te == null) {return new RpcResult(e);} else {if (te instanceof RpcException) {((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);}return new RpcResult(te);}} catch (RpcException e) {if (e.isBiz()) {return new RpcResult(e);} else {throw e;}} catch (Throwable e) {return new RpcResult(e);}}

doInvoke模板方法,子类DubboInvoker实现

Invoker接口,核心是invoke方法

public interface Invoker<T> extends Node {
    /**
     * Get the service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * Invoke.
     *
     * @param invocation the invocation to perform
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;
}

然后看 createProxy 方法的最后一步:proxyFactory.getProxy(invoker),对应的实现如下:
AbstractProxyFactory

/**
 * Base {@code ProxyFactory} that works out which interfaces a proxy must implement and then
 * delegates the actual proxy creation to {@link #getProxy(Invoker, Class[])}.
 */
public abstract class AbstractProxyFactory implements ProxyFactory {

    /**
     * Creates a proxy for the invoker. The proxy always implements the invoker's service interface
     * and {@code EchoService}; any types listed in the URL's comma-separated {@code interfaces}
     * parameter are added after those two.
     *
     * @param invoker the invoker to wrap
     * @return the proxy returned by {@link #getProxy(Invoker, Class[])}
     */
    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        Class<?>[] interfaces = null;
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    // Slots 0 and 1 are taken, so the extra types start at index 2. Writing to
                    // i + 1 overwrote EchoService and left the last slot null.
                    interfaces[i + 2] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);
    }

    /**
     * Creates a proxy for the invoker that implements exactly the given interfaces.
     *
     * @param invoker the invoker to wrap
     * @param types   the interfaces the proxy must implement
     * @return the proxy
     */
    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
}

JavassistProxyFactory

public class JavassistProxyFactory extends AbstractProxyFactory {@SuppressWarnings("unchecked")public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));}public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}}

dubbo源码-服务发现相关推荐

  1. JAVA b2b2c多用户商城系统源码-服务发现服务端EurekaServer微服务

    一.大致介绍 1.众所周知,在现在互联网开发中,访问地址的IP和端口号是动态的,一个服务停掉再重新启用后IP和端口就可能发生了改变,所以用硬编码是肯定不行了.于是我们尝试使用新的技术来解决这一难题.需 ...

  2. Nacos源码服务发现

    Nacos提供了一个根据serviceId查询实例列表的接口: 接口描述:查询服务下的实例列表 请求类型:GET 请求路径: /nacos/v1/ns/instance/list 请求参数: 名称 类 ...

  3. Dubbo源码分析(三) -- Dubbo的服务发现源码深入解析4万字长文

    前言 前面两篇基本上已经对dubbo的SPI,服务发布,注册等功能进行了分析,那么在消费端是如何发现服务,并进行透明的远程调用的呢?带着这个疑问,走入今天的篇章,Dubbo的服务发现 服务发现的流程 ...

  4. Dubbo 源码分析 - 服务导出

    1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可 ...

  5. dubbo 支持服务降级吗_dubbo面试题!会这些,说明你真正看懂了dubbo源码

    整理了一些dubbo可能会被面试的面试题,感觉非常不错.如果你基本能回答说明你看懂了dubbo源码,对dubbo了解的足够全面.你可以尝试看能不能回答下.我们一起看下有哪些问题吧? dubbo中" ...

  6. 【Dubbo源码阅读系列】服务暴露之本地暴露

    在上一篇文章中我们介绍 Dubbo 自定义标签解析相关内容,其中我们自定义的 XML 标签 <dubbo:service /> 会被解析为 ServiceBean 对象(传送门:Dubbo ...

  7. dubbo源码解析-逻辑层设计之服务降级

    Dubbo源码解析系列文章均来自肥朝简书 前言 在dubbo服务暴露系列完结之后,按计划来说是应该要开启dubbo服务引用的讲解.但是现在到了年尾,一些朋友也和我谈起了明年跳槽的事.跳槽这件事,无非也 ...

  8. dubbo(5) Dubbo源码解析之服务调用过程

    来源:https://juejin.im/post/5ca4a1286fb9a05e731fc042 Dubbo源码解析之服务调用过程 简介 在前面的文章中,我们分析了 Dubbo SPI.服务导出与 ...

  9. dubbo源码分析7 之 服务本地暴露

    在上一篇文章我们分析了一下 dubbo 在服务暴露发生了哪些事,今天我们就来分析一下整个服务暴露中的本地暴露.(PS:其实我感觉本地暴露蛮鸡肋的).本地暴露需要服务提供方与服务消费方在同一个 JVM. ...

最新文章

  1. Python 与 Excel结合
  2. image caption优秀链接
  3. 坦克大战代码_坦克大战系列文章-坦克大战简介
  4. 《啊哈算法》 解密 QQ 号 —— 队列(python实现)
  5. python基础语法手册-python语法大全,python语法手册
  6. Excel做题记录——整数规划优化模型
  7. ApiPost简单的接口测试教程
  8. 【Python】用150行代码模拟太阳系行星运转+源码
  9. Qt Moc及信号-槽源代码解析
  10. 计算机网络技术ip地址计算,计算机网络原理-IP地址计算题.doc
  11. windows安装OHS(weblogic管理)
  12. 在写CSDN的文章时,如何插入表格并进行简单的配置
  13. 快递100快递实时快递查询接口API案例代码
  14. matlab斜杠日期,日期用斜线怎么写
  15. 在外企当程序员是怎样的体验?
  16. 第四章 账号权限管理
  17. 安居客 楼盘信息 项目代码-
  18. Matplotlib——散点图_多种自定义
  19. 银行各个岗位及薪酬排名出炉(供参考)
  20. BZOJ4585: [Apio2016]烟火表演

热门文章

  1. 【Vue】—事件处理
  2. String ,StringBuilder,StringBuffer的区分
  3. 耳机的L和R是什么意思?
  4. 五百兆电信宽带玩穿越火线,电信区,延时卡70到80怎么回事,换的千兆猫和路由器,线都是六类?
  5. 女生天天和我微信语音5小时以上,突然没有联系,应该怎么办?
  6. 眼儿媚·迟迟春日弄轻柔 [宋] 朱淑真
  7. 微商人赚钱的4个落地动作
  8. 做自媒体花式撸收益?
  9. 富人是如何发财的——思考致富的方法
  10. 中国有了北斗系统,为什么手机上还是GPS?