Service Invocation (Communication) Flow

  • I. Service Export
    • 1.1 NettyServer
    • 1.2 ChannelHandler Chain
  • II. Service Discovery
  • III. Service Invocation (Communication) Flow
    • 3.1 Client-Side Proxy Execution Chain
      • 1. InvokerInvocationHandler#invoke
      • 2. result = invoker.invoke(rpcInvocation)
        • 2.1 MigrationInvoker#invoke(Invocation invocation)
        • 2.2 MockClusterInvoker#invoke(Invocation invocation)
        • 2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
          • 2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)
          • 2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)
          • 2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
          • 2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)
          • 2.3.5 FilterNode#invoke(Invocation invocation)
            • 2.3.5.1 ConsumerContextFilter
            • 2.3.5.2 FutureFilter
            • 2.3.5.3 MonitorFilter
          • 2.3.6 ListenerInvokerWrapper
          • 2.3.7 AsyncToSyncInvoker
        • 2.4 DubboInvoker
      • 3. result.recreate()
    • 3.2 Server-Side ChannelHandler Chain
      • 3.2.1 MultiMessageHandler
      • 3.2.2 HeartbeatHandler
      • 3.2.3 AllChannelHandler
        • ChannelEventRunnable
      • 3.2.4 DecodeHandler
      • 3.2.5 DubboProtocol.handler
    • 3.3 Client-Side Handling of the Server's Result
      • 3.3.1 HeaderExchangeHandler
      • 3.3.2 DefaultFuture
  • IV. Summary

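Article series

[1. Dubbo Source Code Analysis: Framework Overview]
[2. Dubbo Source Code Analysis: Configuration Parsing]
[3. Dubbo Source Code Analysis: Service Export and Registration]
[4. Dubbo Source Code Analysis: Service Discovery]
[5. Dubbo Source Code Analysis: Service Invocation (Communication) Flow]
[6. Getting the List of Service Provider IPs in Dubbo]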

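I. Service Export

In the article "Dubbo Source Code Analysis: Service Export and Registration", step 4.4.3 (service export) calls DubboProtocol.export() to open a local port that listens for and handles client connection requests.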

public class DubboProtocol extends AbstractProtocol {

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        // key = serviceClassName + ":" + port, e.g. com.example.demo.provider.DemoProvider:20880
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.addExportMap(key, exporter);

        // export a stub service for dispatching events
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            }
        }

        // Open the server: expose a local port that listens for and handles client connection requests.
        openServer(url);
        // Set up the preferred serialization
        optimizeSerialization(url);

        return exporter;
    }
}

If Netty is used for remote communication, a NettyServer object is eventually created.

1.1 NettyServer

public class NettyServer extends AbstractServer implements RemotingServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    /**
     * the cache for alive worker channel.
     * <ip:port, dubbo channel>
     */
    private Map<String, Channel> channels;

    /**
     * netty server bootstrap.
     */
    private ServerBootstrap bootstrap;

    /**
     * the boss channel that receive connections and dispatch these to worker channel.
     */
    private io.netty.channel.Channel channel;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // The parent constructor calls doOpen(), which initializes and starts the Netty server
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }

    /**
     * Initializes and starts the Netty server.
     */
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }
}

NettyServer mainly initializes and starts a Netty server, and then builds a handler chain.

1.2 ChannelHandler Chain

The NettyServer constructor calls ChannelHandlers.wrap(handler, url), which returns a ChannelHandler chain.

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        // Delegate to wrapInternal to wrap the handler
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    // Wrap the handler
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(
                ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
    }
}

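The resulting ChannelHandler chain is:

MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> handler

Here, handler is the member field of DubboProtocol. AllChannelHandler is what the default Dispatcher produces. The DecodeHandler layer does not appear in wrapInternal because it is already wrapped around the handler before the handler reaches ChannelHandlers.wrap.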
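
The nesting order can be illustrated with a minimal decorator sketch. The types below (Handler, NamedDelegate) are hypothetical stand-ins written for this illustration, not Dubbo's real classes; the sketch only shows that wrapping from the inside out yields the outer-to-inner call order listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Dubbo's real types.
class HandlerChainSketch {

    /** Simplified handler contract: a single callback for received messages. */
    interface Handler {
        void received(Object message, List<String> trace);
    }

    /** A decorator that records its own name and then delegates to the next handler. */
    static final class NamedDelegate implements Handler {
        private final String name;
        private final Handler next;

        NamedDelegate(String name, Handler next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void received(Object message, List<String> trace) {
            trace.add(name);
            next.received(message, trace);
        }
    }

    /** Wraps the business handler from the inside out. */
    static Handler wrap(Handler business) {
        Handler decode = new NamedDelegate("DecodeHandler", business);
        Handler all = new NamedDelegate("AllChannelHandler", decode);
        Handler heartbeat = new NamedDelegate("HeartbeatHandler", all);
        return new NamedDelegate("MultiMessageHandler", heartbeat);
    }

    /** Sends one message through the chain and returns the visited handler names. */
    static List<String> trace(Object message) {
        List<String> trace = new ArrayList<>();
        Handler chain = wrap((msg, t) -> t.add("handler"));
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", trace("ping")));
    }
}
```

The outermost wrapper sees the message first, which is why MultiMessageHandler can unpack a batch before any other handler runs.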

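II. Service Discovery

In the article "Dubbo Source Code Analysis: Service Discovery", step 3.3 ReferenceConfig.createProxy() creates and returns a proxy object (the innermost Invoker it ends up creating is a DubboInvoker).

The structure of the resulting proxy class is as follows:

[Figure: structure of the generated proxy class]

III. Service Invocation (Communication) Flow

3.1 Client-Side Proxy Execution Chain

Set a breakpoint in DubboInvoker and inspect the call stack; the resulting call chain is: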

// Method call on the proxy object
1. org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke(Object proxy, Method method, Object[] args)
// Invocation
2. result = invoker.invoke(rpcInvocation)
    // Migration: interface-level vs. instance-level addresses
    2.1 MigrationInvoker#invoke(Invocation invocation)
    // Mock
    2.2 MockClusterInvoker#invoke(Invocation invocation)
    // Cluster interceptors: before/after hooks
    2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
        // Run the intercept method
        2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)
        2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)
        // Cluster fault-tolerance mode
        2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
        // Wrapper
        2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)
        // Filters
        2.3.5 FilterNode#invoke(Invocation invocation)
            2.3.5.1 ConsumerContextFilter
            2.3.5.2 FutureFilter
            2.3.5.3 MonitorFilter
        // Listeners
        2.3.6 ListenerInvokerWrapper
        // Async-to-sync conversion
        2.3.7 AsyncToSyncInvoker
    // Dubbo call
    2.4 DubboInvoker
// Obtain the result
3. result.recreate()

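1. InvokerInvocationHandler#invoke

The proxy is generated by JavassistProxyFactory. Note that the Proxy class used here is Dubbo's own org.apache.dubbo.common.bytecode.Proxy, which generates the proxy class with Javassist. It is not java.lang.reflect.Proxy, although it dispatches calls to a standard java.lang.reflect.InvocationHandler in the same way a JDK dynamic proxy does. The generation code is: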

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

As the code shows, InvokerInvocationHandler implements the InvocationHandler interface, so every method call on the proxy object ends up in its Object invoke(Object proxy, Method method, Object[] args) method.

public class InvokerInvocationHandler implements InvocationHandler {

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on Object are executed directly
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }

        /**
         * Build the RPC invocation object: the core object transferred during an RPC call.
         * method: the method to invoke
         * invoker.getInterface().getName(): class name of the interface that declares the method,
         *                                   e.g. com.example.demo.provider.TestProvider
         * protocolServiceKey: e.g. com.example.demo.provider.TestProvider:dubbo
         * args: the actual method arguments
         */
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
        // Target service name, e.g. com.example.demo.provider.TestProvider
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);

        // invoker.getUrl() returns consumer url.
        // Attach the URL to the current context, e.g. dubbo://127.0.0.1/com.example.demo.provider.TestProvider?... (other parameters omitted)
        RpcContext.setRpcContext(invoker.getUrl());

        if (consumerModel != null) {
            // Add attributes
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // Invoke: invoker.invoke(rpcInvocation)
        // Obtain the result: result.recreate()
        return invoker.invoke(rpcInvocation).recreate();
    }
}
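
To make the dispatch mechanism concrete, here is a minimal sketch of an InvocationHandler that forwards interface calls to an invoker-like object. It uses java.lang.reflect.Proxy purely as a convenient stand-in for the proxy generation step, and GreetingService, SimpleInvoker and SketchHandler are hypothetical names introduced for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {

    /** Hypothetical service interface used only for this illustration. */
    interface GreetingService {
        String greet(String name);
    }

    /** Hypothetical stand-in for an invoker: receives the method name and arguments. */
    interface SimpleInvoker {
        Object invoke(String methodName, Object[] args);
    }

    /** Routes interface calls to the invoker and keeps Object methods local. */
    static final class SketchHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        SketchHandler(SimpleInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally, without a remote call.
                return method.invoke(invoker, args);
            }
            // Everything else is packaged up and handed to the invoker.
            return invoker.invoke(method.getName(), args);
        }
    }

    static GreetingService createProxy(SimpleInvoker invoker) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new SketchHandler(invoker));
    }

    public static void main(String[] args) {
        // The lambda plays the role of the remote side: it echoes what it was asked to call.
        GreetingService service = createProxy((name, params) -> name + Arrays.toString(params));
        System.out.println(service.greet("dubbo")); // prints "greet[dubbo]"
    }
}
```

The caller only sees the GreetingService interface; which invoker sits behind it is decided entirely by the handler.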

2. result = invoker.invoke(rpcInvocation)

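The invocation itself.

2.1 MigrationInvoker#invoke(Invocation invocation)

The migration invoker. As its log messages show, it decides whether a call is served from interface-level addresses or from instance-level (service discovery) addresses. It is created in RegistryProtocol.doRefer():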

public class RegistryProtocol implements Protocol {

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
        URL consumerUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        // Create and obtain a MigrationInvoker
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        return interceptInvoker(migrationInvoker, url, consumerUrl);
    }

    protected <T> ClusterInvoker<T> getMigrationInvoker(RegistryProtocol registryProtocol, Cluster cluster, Registry registry,
                                                        Class<T> type, URL url, URL consumerUrl) {
        return new ServiceDiscoveryMigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
    }
}

MigrationInvoker

public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // Check whether serviceDiscoveryInvoker is available; initially it is not
        if (!checkInvokerAvailable(serviceDiscoveryInvoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + (invoker.getDirectory().getAllInvokers() == null ? "is null" : invoker.getDirectory().getAllInvokers().size()));
            }
            // This branch is taken
            return invoker.invoke(invocation);
        }

        // Check whether invoker is available
        if (!checkInvokerAvailable(invoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + (serviceDiscoveryInvoker.getDirectory().getAllInvokers() == null ? " is null " : serviceDiscoveryInvoker.getDirectory().getAllInvokers().size()));
            }
            return serviceDiscoveryInvoker.invoke(invocation);
        }

        return currentAvailableInvoker.invoke(invocation);
    }
}

2.2 MockClusterInvoker#invoke(Invocation invocation)

Runs the mock logic. The invoker is created in RegistryProtocol.doCreateInvoker():

public class RegistryProtocol implements Protocol {

    protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(urlToRegistry);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(urlToRegistry);
        directory.subscribe(toSubscribeUrl(urlToRegistry));

        // Create and return the Invoker.
        // cluster is the object obtained via ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
        // by default name = Cluster.DEFAULT = failover and wrap = true
        return (ClusterInvoker<T>) cluster.join(directory);
    }
}

The cluster object is therefore MockClusterWrapper(FailoverCluster). When cluster.join(directory) runs, it first enters MockClusterWrapper#join(), which wraps and enhances the underlying invoker.

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // Create a MockClusterInvoker
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
}

MockClusterWrapper creates a MockClusterInvoker:

public class MockClusterInvoker<T> implements ClusterInvoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // Read the mock parameter from the URL
        String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            // no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
            }
            // force: direct mock
            // force mode: return the mock data without calling the provider
            result = doMockInvoke(invocation, null);
        } else {
            // fail-mock
            // run the mock logic only when the call fails
            try {
                result = this.invoker.invoke(invocation);

                // fix:#4585
                if (result.getException() != null && result.getException() instanceof RpcException) {
                    RpcException rpcException = (RpcException) result.getException();
                    if (rpcException.isBiz()) {
                        throw rpcException;
                    } else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
}
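
The three branches (no mock, force mock, fail mock) can be condensed into a small sketch. This is a simplified illustration, not Dubbo's implementation: RemoteFailure is a hypothetical stand-in for a non-business RpcException, and the real and mock calls are modelled as plain Supplier objects.

```java
import java.util.function.Supplier;

class MockDecisionSketch {

    /** Hypothetical stand-in for a non-business RPC failure (network error, timeout, ...). */
    static final class RemoteFailure extends RuntimeException {
        RemoteFailure(String message) {
            super(message);
        }
    }

    /**
     * Chooses between the real call and the mock based on the configured mock value.
     * Assumption: any value other than empty, "false" or "force..." means fail-mock.
     */
    static String invoke(String mockValue, Supplier<String> realCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.isEmpty() || "false".equalsIgnoreCase(value)) {
            // no mock: always perform the real call
            return realCall.get();
        } else if (value.startsWith("force")) {
            // force mock: never touch the provider
            return mockCall.get();
        } else {
            // fail mock: fall back to the mock only when the real call fails
            try {
                return realCall.get();
            } catch (RemoteFailure e) {
                return mockCall.get();
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> healthy = () -> "real";
        Supplier<String> broken = () -> { throw new RemoteFailure("provider down"); };
        Supplier<String> mock = () -> "mock";

        System.out.println(invoke("false", healthy, mock));
        System.out.println(invoke("force:return null", healthy, mock));
        System.out.println(invoke("true", broken, mock));
    }
}
```

Force mock is useful for cutting off a dependency deliberately, while fail mock acts as a degradation path that only activates on errors.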

2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)

Cluster interceptors provide before/after hooks around the call.

When MockClusterWrapper#join() creates the MockClusterInvoker, it obtains the wrapped invoker via this.cluster.join(directory):

new MockClusterInvoker<T>(directory, this.cluster.join(directory));

Here, this.cluster is a cluster object injected by Dubbo SPI; by default it is FailoverCluster.

public class FailoverCluster extends AbstractCluster {

    public static final String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }
}

Calling its join(directory) method enters AbstractCluster#join(directory):

public abstract class AbstractCluster implements Cluster {

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // Build the cluster interceptors
        return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
    }

    /**
     * Builds the cluster interceptors.
     * doJoin(directory): template method pattern; runs the subclass's doJoin, which returns a FailoverClusterInvoker by default
     * directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY): reads the reference.interceptor parameter from the URL
     */
    private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
        AbstractClusterInvoker<T> last = clusterInvoker;
        // SPI
        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);

        if (!interceptors.isEmpty()) {
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                final ClusterInterceptor interceptor = interceptors.get(i);
                final AbstractClusterInvoker<T> next = last;
                // Chain of responsibility: wrap the current tail in an InterceptorInvokerNode
                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
            }
        }
        return last;
    }
}

InterceptorInvokerNode is an inner class of AbstractCluster, so calling invoke() ends up in AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation):

public abstract class AbstractCluster implements Cluster {

    protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                // before hook
                interceptor.before(next, invocation);
                // run the intercept method, which returns an asynchronous result
                asyncResult = interceptor.intercept(next, invocation);
            } catch (Exception e) {
                // onError callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    listener.onError(e, clusterInvoker, invocation);
                }
                throw e;
            } finally {
                // after hook
                interceptor.after(next, invocation);
            }
            // asynchronous callback
            return asyncResult.whenCompleteWithContext((r, t) -> {
                // onResponse callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    if (t == null) {
                        listener.onMessage(r, clusterInvoker, invocation);
                    } else {
                        listener.onError(t, clusterInvoker, invocation);
                    }
                }
            });
        }
    }
}
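
The reverse iteration in buildClusterInterceptors and the try/finally in InterceptorInvokerNode can be sketched together as follows. This is a minimal illustration under simplifying assumptions: the Interceptor interface and the use of Function as a chain node are hypothetical, and the asynchronous callbacks are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class InterceptorChainSketch {

    /** Hypothetical interceptor with before/after hooks that write to a shared log. */
    interface Interceptor {
        void before(List<String> log);
        void after(List<String> log);
    }

    static Interceptor named(String name) {
        return new Interceptor() {
            @Override
            public void before(List<String> log) {
                log.add(name + ".before");
            }

            @Override
            public void after(List<String> log) {
                log.add(name + ".after");
            }
        };
    }

    /** Builds the chain back to front so that the first interceptor ends up outermost. */
    static Function<List<String>, String> build(List<Interceptor> interceptors,
                                                Function<List<String>, String> target) {
        Function<List<String>, String> last = target;
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            Interceptor interceptor = interceptors.get(i);
            Function<List<String>, String> next = last;
            last = log -> {
                try {
                    interceptor.before(log);
                    return next.apply(log);
                } finally {
                    // runs whether or not the inner call succeeded
                    interceptor.after(log);
                }
            };
        }
        return last;
    }

    /** Runs a two-interceptor chain and returns the order in which the hooks fired. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Function<List<String>, String> chain = build(
                Arrays.asList(named("A"), named("B")),
                l -> {
                    l.add("invoke");
                    return "ok";
                });
        chain.apply(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Iterating from the last interceptor to the first means the list order matches the execution order of the before hooks.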

2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)

Runs the intercept method and returns an asynchronous result.

@SPI
public interface ClusterInterceptor {

    void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
        // Call invoke, which returns an asynchronous result.
        return clusterInvoker.invoke(invocation);
    }

    interface Listener {

        void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

        void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    }
}

2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)

AbstractClusterInvoker#invoke(final Invocation invocation) performs three important steps:

  1. Get all invokers
  2. Initialize the load-balancing algorithm
  3. Perform the invocation

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        // Bind the other parameters of the current context
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

        // step1: get all invokers
        List<Invoker<T>> invokers = list(invocation);
        // step2: initialize the load-balancing algorithm
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // step3: perform the invocation
        return doInvoke(invocation, invokers, loadbalance);
    }
}

step1: get all invokers

All invocable Invoker objects are fetched from the directory:

  • DynamicDirectory: dynamic
  • StaticDirectory: static

step2: initialize the load-balancing algorithm

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        // random by default
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(
                invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        // random by default
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}

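Dubbo ships with the following load-balancing algorithms:

  • Random LoadBalance: random; the probability of each provider is set by its weight (this is the default).
  • RoundRobin LoadBalance: round robin; the rotation ratio is set by the weights after reduction to common terms.
  • LeastActive LoadBalance: fewest active calls; providers with the same active count are chosen at random. The active count is the difference between the counters before and after a call.
  • ConsistentHash LoadBalance: consistent hashing; requests with the same parameters always go to the same provider.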
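
As an illustration of the default strategy, weighted random selection can be sketched as below. This is a simplified sketch and not Dubbo's RandomLoadBalance: warm-up and the equal-weight shortcut are omitted, the names are hypothetical, and all weights are assumed to be positive.

```java
import java.util.Random;

class WeightedRandomSketch {

    /**
     * Maps an offset in [0, totalWeight) to a provider index.
     * Provider i owns a slice of the range whose length equals weights[i].
     */
    static int selectByOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    /** Picks a provider index with probability proportional to its weight. */
    static int select(int[] weights, Random random) {
        int total = 0;
        for (int weight : weights) {
            total += weight;
        }
        return selectByOffset(weights, random.nextInt(total));
    }

    public static void main(String[] args) {
        int[] weights = {1, 3, 6};
        int[] hits = new int[weights.length];
        Random random = new Random();
        for (int i = 0; i < 10_000; i++) {
            hits[select(weights, random)]++;
        }
        // The hit counts should be roughly in the ratio 1 : 3 : 6.
        System.out.println(hits[0] + " " + hits[1] + " " + hits[2]);
    }
}
```

Splitting the offset mapping from the random draw keeps the selection rule deterministic and easy to check.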

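step3: perform the invocation

AbstractClusterInvoker uses the template method pattern. Its implementations, one per Dubbo cluster fault-tolerance mode, are:

  • FailoverClusterInvoker: failover; when a call fails, retry against other servers.
  • FailfastClusterInvoker: fail fast; make a single call and report the error immediately on failure. Typically used for non-idempotent writes such as inserting a record.
  • FailsafeClusterInvoker: fail safe; exceptions are simply ignored. Typically used for operations such as writing audit logs.
  • FailbackClusterInvoker: fail back; failed requests are recorded in the background and resent on a timer. Typically used for message notifications.
  • ForkingClusterInvoker: call several servers in parallel and return as soon as one succeeds. Typically used for reads with strict latency requirements, at the cost of more service resources. The maximum parallelism can be set with forks="2".
  • BroadcastClusterInvoker: broadcast to all providers, calling them one by one; if any provider fails, the call fails. Typically used to tell all providers to refresh local resources such as caches or logs.

FailoverClusterInvoker is the default.

2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)

FailoverClusterInvoker implements failover: when a call fails, it retries against other servers.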

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * Performs the invocation.
     * @param invocation  metadata of the method call
     * @param invokers    all candidate invokers (provider URLs) for the current method
     * @param loadbalance the load-balancing algorithm
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);

        // Failover: on failure, retry against other servers.
        // len defaults to 3: one initial attempt plus at most 2 retries.
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            // Pick one invoker from the list using the load-balancing algorithm
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // Perform the call
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

In FailoverClusterInvoker's doInvoke() method, the load-balancing algorithm selects one invocable invoker from the invokers list, and its invoke() method is then called.

That invoker object is RegistryDirectory.InvokerDelegate(FilterNode(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker)))). It is created as follows:

public class RegistryDirectory<T> extends DynamicDirectory<T> {

    private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
        // ... other code omitted ...
        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        // ... other code omitted ...
    }
}
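
The retry loop of FailoverClusterInvoker described in this section can be reduced to the following sketch. It is an illustration under simplifying assumptions: providers are modelled as Supplier objects, any RuntimeException counts as a retryable failure, and the hypothetical select method simply prefers a candidate that has not been tried yet instead of applying a load-balancing algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class FailoverSketch {

    /** Tries up to `attempts` candidates and returns the first successful result. */
    static <T> T invokeWithFailover(List<Supplier<T>> candidates, int attempts) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        List<Supplier<T>> invoked = new ArrayList<>();
        RuntimeException last = null; // last exception, reported if every attempt fails
        for (int i = 0; i < attempts; i++) {
            Supplier<T> candidate = select(candidates, invoked);
            invoked.add(candidate);
            try {
                return candidate.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw new IllegalStateException("failed after " + attempts + " attempts", last);
    }

    /** Simplistic selection: the first candidate not tried yet, else the first one. */
    static <T> Supplier<T> select(List<Supplier<T>> candidates, List<Supplier<T>> invoked) {
        for (Supplier<T> candidate : candidates) {
            if (!invoked.contains(candidate)) {
                return candidate;
            }
        }
        return candidates.get(0);
    }

    public static void main(String[] args) {
        List<Supplier<String>> providers = Arrays.asList(
                () -> { throw new RuntimeException("provider 1 is down"); },
                () -> "served by provider 2");
        // One initial attempt plus up to two retries, matching the default of 3 attempts.
        System.out.println(invokeWithFailover(providers, 3));
    }
}
```

Tracking the already-invoked providers is what lets a retry land on a different server instead of hitting the failed one again.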

2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)

A wrapper.

public class RegistryDirectory<T> extends DynamicDirectory<T> {

    private static class InvokerDelegate<T> extends InvokerWrapper<T> {

        private URL providerUrl;

        public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
            super(invoker, url);
            this.providerUrl = providerUrl;
        }

        public URL getProviderUrl() {
            return providerUrl;
        }
    }
}

InvokerDelegate is a private nested class of RegistryDirectory. It does not override invoke(), so the call goes to the parent class's invoke() method:

public class InvokerWrapper<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // invoker -> FilterNode
        return invoker.invoke(invocation);
    }
}

2.3.5 FilterNode#invoke(Invocation invocation)

A filter node.

class FilterNode<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result asyncResult;
        try {
            // Run the filter, which passes the call on to the next node in the chain
            asyncResult = filter.invoke(next, invocation);
        } catch (Exception e) {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                try {
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    if (listener != null) {
                        listener.onError(e, invoker, invocation);
                    }
                } finally {
                    listenableFilter.removeListener(invocation);
                }
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                listener.onError(e, invoker, invocation);
            }
            throw e;
        } finally {

        }
        return asyncResult.whenCompleteWithContext((r, t) -> {
            if (filter instanceof ListenableFilter) {
                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                Filter.Listener listener = listenableFilter.listener(invocation);
                try {
                    if (listener != null) {
                        if (t == null) {
                            listener.onResponse(r, invoker, invocation);
                        } else {
                            listener.onError(t, invoker, invocation);
                        }
                    }
                } finally {
                    listenableFilter.removeListener(invocation);
                }
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r, invoker, invocation);
                } else {
                    listener.onError(t, invoker, invocation);
                }
            }
        });
    }
}

The filter object is a Dubbo SPI extension point; the filters are loaded and chained in ProtocolFilterWrapper#buildInvokerChain():

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

2.3.5.1 ConsumerContextFilter

The consumer context filter.

@Activate(group = CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext context = RpcContext.getContext();
        context.setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
                .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
                .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        // pass default timeout set by end user (ReferenceConfig)
        Object countDown = context.get(TIME_COUNTDOWN_KEY);
        if (countDown != null) {
            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
            if (timeoutCountDown.isExpired()) {
                return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
                        "No time left for making the following call: " + invocation.getServiceName() + "."
                                + invocation.getMethodName() + ", terminate directly."), invocation);
            }
        }
        return invoker.invoke(invocation);
    }
}

2.3.5.2 FutureFilter

The filter that handles future (asynchronous) results and their callbacks.

@Activate(group = CommonConstants.CONSUMER)
public class FutureFilter implements Filter, Filter.Listener {

    @Override
    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        // Fire the on-invoke callback
        fireInvokeCallback(invoker, invocation);
        // Whether a return value is expected must be configured before the call,
        // so that the invoker can decide whether a future needs to be returned.
        return invoker.invoke(invocation);
    }
}

2.3.5.3 MonitorFilter

Monitoring.

@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter implements Filter, Filter.Listener {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
            invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getContext().getRemoteHost());
            getConcurrent(invoker, invocation).incrementAndGet(); // count up
        }
        return invoker.invoke(invocation); // proceed invocation chain
    }
}

2.3.6 ListenerInvokerWrapper

public class ListenerInvokerWrapper<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }
}

2.3.7 AsyncToSyncInvoker

Converts an asynchronous result into a synchronous one when the call mode is synchronous.

public class AsyncToSyncInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // Runs AbstractInvoker#invoke
        Result asyncResult = invoker.invoke(invocation);

        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                /**
                 * NOTICE!
                 * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                 * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                 */
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else {
                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }
}
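
The core idea, always obtaining a future and blocking on it only in synchronous mode, can be sketched with a plain CompletableFuture. The Mode enum and the invoke helper are hypothetical names for this illustration, and the exception translation is reduced to a single wrapper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncToSyncSketch {

    /** Hypothetical call modes for this illustration. */
    enum Mode { SYNC, ASYNC }

    /**
     * Returns the future unchanged; in SYNC mode it first blocks until the future completes,
     * so the caller receives an already-finished result.
     */
    static CompletableFuture<String> invoke(CompletableFuture<String> remote, Mode mode) {
        if (mode == Mode.SYNC) {
            try {
                remote.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the result", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("remote call failed", e);
            }
        }
        return remote;
    }

    public static void main(String[] args) {
        CompletableFuture<String> remote = CompletableFuture.supplyAsync(() -> "pong");
        CompletableFuture<String> result = invoke(remote, Mode.SYNC);
        System.out.println(result.isDone() + " " + result.join());
    }
}
```

Keeping the return type a future in both modes lets the layers above treat synchronous and asynchronous calls uniformly.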

2.4 DubboInvoker

The actual Dubbo call.

public class DubboInvoker<T> extends AbstractInvoker<T> {

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // Pick a client connection
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // One-way or not: decided by whether a return value is expected
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // Timeout
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) { // no return value
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // Communicate with the remote side over the client connection
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else { // with return value
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                // Communicate with the remote side over the client connection
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                // Store the future in the current request context
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                // Return the asynchronous result
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
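
The client selection at the top of doInvoke is a round-robin over the connection array. A minimal sketch of that idea follows; clients are modelled as plain strings, the class name is hypothetical, and at least one client is assumed. Unlike the excerpt above, the sketch uses Math.floorMod, a deliberate variation that keeps the index non-negative even if the counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinClientSketch {

    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    /** Assumes at least one client is passed in. */
    RoundRobinClientSketch(String... clients) {
        this.clients = clients.clone();
    }

    /** Returns the next client, cycling through the array in order. */
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod never returns a negative index, even after integer overflow
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinClientSketch selector = new RoundRobinClientSketch("conn-0", "conn-1", "conn-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next());
        }
    }
}
```

AtomicInteger makes the counter safe to share between the threads that issue calls through the same invoker.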

3. result.recreate()

Obtains the result.

result is an AsyncRpcResult object.

public class AsyncRpcResult implements Result {

    @Override
    public Object recreate() throws Throwable {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
            return RpcContext.getContext().getFuture();
        }
        return getAppResponse().recreate();
    }
}

3.2 Server-Side ChannelHandler Chain

Following the analysis of Dubbo service export, once the server receives a client request, the ChannelHandler chain processes it in this order:

1. MultiMessageHandler
2. HeartbeatHandler
3. AllChannelHandler
4. DecodeHandler
5. DubboProtocol.handler

3.2.1 MultiMessageHandler

Handler for multi-message payloads: it unpacks a MultiMessage and forwards each contained message to the next handler.

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(MultiMessageHandler.class);

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // Check whether this is a MultiMessage
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            // Forward each contained message in turn
            for (Object obj : list) {
                try {
                    handler.received(channel, obj);
                } catch (ExecutionException e) {
                    logger.error("MultiMessageHandler received fail.", e);
                    handler.caught(channel, e);
                }
            }
        } else {
            handler.received(channel, message);
        }
    }
}

3.2.2 HeartbeatHandler

Heartbeat handler: it answers heartbeat requests, swallows heartbeat responses, and forwards every other message.

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        // Check whether this is a heartbeat request
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
            }
            return;
        }
        handler.received(channel, message);
    }
}

3.2.3 AllChannelHandler

Handles the client's call request by dispatching it to a thread pool.

public class AllChannelHandler extends WrappedChannelHandler {

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // Run a ChannelEventRunnable asynchronously
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

ChannelEventRunnable

public class ChannelEventRunnable implements Runnable {

    @Override
    public void run() {
        // Handle a client request
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
                case CONNECTED: // connection established
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED: // connection closed
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT: // message sent
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    }
}
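
The hand-off from the receiving thread to a worker thread can be sketched independently of Dubbo. In this illustration all names (DispatchSketch, State, EventTask, the thread name biz-pool-1) are hypothetical, and the single-thread executor that is created and shut down per call is only there to keep the example short; a real dispatcher would reuse a shared pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DispatchSketch {
    enum State { CONNECTED, DISCONNECTED, RECEIVED }

    // Plays the role of ChannelEventRunnable: carries one event to a pool thread.
    static final class EventTask implements Runnable {
        private final State state;
        private final Object message;
        private final AtomicReference<String> sink;

        EventTask(State state, Object message, AtomicReference<String> sink) {
            this.state = state;
            this.message = message;
            this.sink = sink;
        }

        @Override
        public void run() {
            switch (state) {
                case RECEIVED:
                    sink.set("received:" + message + "@" + Thread.currentThread().getName());
                    break;
                case CONNECTED:
                    sink.set("connected");
                    break;
                default:
                    sink.set("disconnected");
            }
        }
    }

    // Submits the event to a worker thread and waits for it to be handled.
    static String dispatch(State state, Object message) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-pool-1"));
        AtomicReference<String> sink = new AtomicReference<>();
        try {
            executor.execute(new EventTask(state, message, sink));
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event was not handled in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        return sink.get();
    }
}
```

The returned string includes the worker thread's name, which shows that the handling ran on the pool thread rather than on the caller's thread.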

3.2.4 DecodeHandler

Decoding handler: if the message itself, the request data, or the response result is Decodeable, it is decoded before the message is passed on.

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

    private void decode(Object message) {
        if (message instanceof Decodeable) {
            try {
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode
}

3.2.5 DubboProtocol.handler

The handler that actually invokes the server-side method.

public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        /**
         * Handles a client's method call request: executes the server-side method and returns the result.
         */
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get(METHODS_KEY);
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(COMMA_SEPARATOR)) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(COMMA_SEPARATOR);
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // Execute the local method
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isDebugEnabled()) {
                logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }

            RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
            invocation.setAttachment(PATH_KEY, url.getPath());
            invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
            invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
            invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
            if (url.getParameter(STUB_EVENT_KEY, false)) {
                invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
            }

            return invocation;
        }
    };
}

Ultimately, the call reaches the method of the AbstractProxyInvoker object generated by JavassistProxyFactory.
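
The step from an incoming invocation to the exported invoker is essentially a map lookup by service key. The sketch below is an assumption-laden simplification: ExporterLookupSketch and its methods are hypothetical, the invoker is modeled as a plain Function, and the key uses only the "serviceClassName:port" form noted in the export() comment earlier in this article, leaving out any other parts a real key may carry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ExporterLookupSketch {
    // Maps "serviceClassName:port" to a callable standing in for an Invoker.
    private final Map<String, Function<Object[], Object>> exporters = new ConcurrentHashMap<>();

    static String serviceKey(String serviceClassName, int port) {
        return serviceClassName + ":" + port;
    }

    // Registers a service under its key, as export() does at publish time.
    void export(String serviceClassName, int port, Function<Object[], Object> invoker) {
        exporters.put(serviceKey(serviceClassName, port), invoker);
    }

    // Finds the exported invoker for the request and executes it.
    Object reply(String serviceClassName, int port, Object... args) {
        String key = serviceKey(serviceClassName, port);
        Function<Object[], Object> invoker = exporters.get(key);
        if (invoker == null) {
            throw new IllegalStateException("No exported service for key " + key);
        }
        return invoker.apply(args);
    }
}
```

A request for a key that was never exported fails fast, while a known key runs the registered callable with the request arguments.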

3.3 Client-side handling of the server's result

When the NettyClient connection is created, a handler is passed in, as follows:

public class NettyClient extends AbstractClient {

    @Override
    protected void doOpen() throws Throwable {
        // The handler that processes channel events
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(EVENT_LOOP_GROUP)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(socketChannelClass());

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
                }

                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);

                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if (socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }
}

So when the server's response arrives, NettyClientHandler#channelRead() handles it, and the call eventually reaches HeaderExchangeHandler#received().

3.3.1 HeaderExchangeHandler

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // Handle the server's response
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    }

    // Handle the server's response result
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}

3.3.2 DefaultFuture

public class DefaultFuture extends CompletableFuture<Object> {

    // Handle the server's response result
    public static void received(Channel channel, Response response) {
        received(channel, response, false);
    }

    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
                // Handle the server's response result
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response status is " + response.getStatus()
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) { // the call succeeded
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // timeout
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else { // remote service error
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }
}
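
The core idea in DefaultFuture is to correlate a response with its pending request by id and then complete the matching future. A minimal sketch of that correlation follows; PendingRequests, its method names, and the numeric status values are illustrative assumptions, and timeout scheduling and channel bookkeeping are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class PendingRequests {
    // Illustrative status codes; only "OK versus anything else" matters here.
    static final int OK = 20;
    static final int SERVER_TIMEOUT = 31;

    // Request id -> future the caller is waiting on (plays the role of FUTURES).
    private final Map<Long, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Called when a request is sent: remember a future under the request id.
    CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    // Called when a response arrives. Returns false if nobody is waiting anymore,
    // for example because the response came back after the request was removed.
    boolean received(long requestId, int status, Object result, String errorMessage) {
        CompletableFuture<Object> future = futures.remove(requestId);
        if (future == null) {
            return false;
        }
        if (status == OK) {
            future.complete(result);
        } else {
            future.completeExceptionally(new IllegalStateException(errorMessage));
        }
        return true;
    }
}
```

Because the future is removed from the map before it is completed, a duplicate or late response for the same id finds nothing to complete and is simply reported as unmatched.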

IV. Summary
