在前面的文章中,我们分析了 dubbo 从 provider 进行服务暴露,然后把服务信息注册到注册中心上面解耦 consumerprovider 的调用。consumer 通过 javassist 创建代理对象引用远程服务。当通过代理对象调用远程服务的时候,讲到进行真正调用的时候 dubbo 抽象出集群容错(ClusterDirectoryRouterLoadBalance)从服务多个暴露方选取出一个合适的 Invoke 来进行调用。 dubbo 默认是通过 FailoverClusterInvoker 从多个 Invoke 中选择出一个 Invoke 实例 InvokerWrapper 来进行远程调用。本次分析主要包括以下 4 个部分:

  • consumer 发送扩展
  • consumer 发送原理
  • consumer 接收原理
  • dubbo 异步变同步

1、consumer 发送扩展

我们先来看一下 dubbo 中 consumer 端的请求发送原理,也就是从 InvokerWrapper#invoke 开始,在 consumer 服务引用分析的时候,我们知道根据 Invoke 调用的时候, dubbo 会创建 ProtocolListenerWrapper与 ProtocolFilterWrapper 来用集成框架使用者的扩展包含:InvokerListenerFilterProtocolListenerWrapper 在对象创建的时候就会调用InvokerListener#referred扩展,所以在远程服务调用的时候最主要的还是 Filter 扩展,下面我们就看一下在远程调用的时候默认包括哪些 Filter 扩展:

  • ConsumerContextFilter
  • FutureFilter
  • MonitorFilter

1.1 ConsumerContextFilter

ConsumerContextFilter 保存客户端信息到 RpcContext

@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {RpcContext.getContext().setInvoker(invoker).setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0).setRemoteAddress(invoker.getUrl().getHost(),invoker.getUrl().getPort());if (invocation instanceof RpcInvocation) {((RpcInvocation) invocation).setInvoker(invoker);}try {return invoker.invoke(invocation);} finally {RpcContext.getContext().clearAttachments();}}}

RpcContext 使用 ThreadLocal 来记录一个临时状态。当接收到 RPC 请求,或发起 RPC请求时,RpcContext 的状态都会变化。

比如:A 调 B,B 再调 C,则 B 机器上,在 B 调 C 之前,RpcContext 记录的是 A 调 B 的信息,在 B 调 C 之后,RpcContext 记录的是 B 调 C 的信息。

可以通过 RpcContext 上的 setAttachmentgetAttachment 在服务消费方和提供方之间进行参数的隐式传递。

1.2 FutureFilter

FutureFilter 会来处理 dubbo 服务接口调用方配置 async="true" 来使用同步调用来是异步调用。

public class FutureFilter implements Filter {protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);fireInvokeCallback(invoker, invocation);//需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.Result result = invoker.invoke(invocation);if (isAsync) {asyncCallback(invoker, invocation);} else {syncCallback(invoker, invocation, result);}return result;}
}

同步调用 dubbo 就会同步的返回 provider 方法调用返回的响应.如果是异步调用在进行调用的时候就会把请求信息发送到 provider 然后返回一个空的 RpcResultconsumer 端如果要获取响应需要通过以下方法获取:

// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
// 同理等待bar返回
Bar bar = barFuture.get();

1.3 MonitorFilter

MonitorFilter 其实是在分析之前 dubbo monitor 的时候就进行了详细的分析。它主要是通过以下配置来激活 providerconsumer 端的指标监控。

<dubbo:monitor protocol="registry" />

我们还是简单的来看一下它的源码:

public class MonitorFilter implements Filter {private MonitorFactory monitorFactory;public void setMonitorFactory(MonitorFactory monitorFactory) {this.monitorFactory = monitorFactory;}// 调用过程拦截public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {RpcContext context = RpcContext.getContext(); // 提供方必须在invoke()之前获取context信息String remoteHost = context.getRemoteHost();long start = System.currentTimeMillis(); // 记录起始时间戮getConcurrent(invoker, invocation).incrementAndGet(); // 并发计数try {Result result = invoker.invoke(invocation); // 让调用链往下执行collect(invoker, invocation, result, remoteHost, start, false);return result;} catch (RpcException e) {collect(invoker, invocation, null, remoteHost, start, true);throw e;} finally {getConcurrent(invoker, invocation).decrementAndGet(); // 并发计数}} else {return invoker.invoke(invocation);}}
}

当启动 dubbo monitor 的时候会暴露一个远程服务 MonitorService 接口服务服务,具体的处理类是 SimpleMonitorService。而在 MonitorFilter#collect 方法里面 MonitorFactory 会创建一个 Monitor 接口实例(继承于 MonitorService)。其实就是 DubboMonitorFactroy#createMonitor 远程引用 dubbo monitor 暴露的 MonitorService 服务。

public class DubboMonitorFactroy extends AbstractMonitorFactory {private Protocol protocol;private ProxyFactory proxyFactory;public void setProtocol(Protocol protocol) {this.protocol = protocol;}public void setProxyFactory(ProxyFactory proxyFactory) {this.proxyFactory = proxyFactory;}@Overrideprotected Monitor createMonitor(URL url) {url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));if (url.getPath() == null || url.getPath().length() == 0) {url = url.setPath(MonitorService.class.getName());}String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);if (filter == null || filter.length() == 0) {filter = "";} else {filter = filter + ",";}url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),Constants.REFERENCE_FILTER_KEY, filter + "-monitor");Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, url);MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);return new DubboMonitor(monitorInvoker, monitorService);}}

获取到远程服务 SimpleMonitorService,最后在 MonitorFilter#collect 调用 MonitorService#collect 进行监控数据采集提供给 dubbo monitor。调用过程如下所示:

2、consumer 发送原理

最终 consumer 会到 DubboInvoke 进行服务调用。它会在 AbstractInvoker#invoke 添加一些扩展参数到 RpcInvocation 这个远程调用对象里面。添加的扩展参数包含:

  • interface : 远程调用的接口名称
  • group : 接口分组名称
  • token : 调用的 token 信息
  • timeout : 调用服务的超时时间
  • async : 是否异步调用
  • id : 异步操作默认添加 invocation id,用于保证操作幂等

以及 RpcContext 传递过来的扩展参数(RpcContext#attachments)。然后在 DubboInvoker#doInvoke 中会添加 path (接口全类名) 以及 version(版本信息)。再根据 dubbo 的调用模式进行远程调用,包含以下三种调用模式:

  • oneway 模式:<dubbo:method>标签的 return 属性配置为false,则是oneway模式,利用ExchangeClient 对象向服务端发送请求消息之后,立即返回空 RpcResult 对象
  • 异步模式:<dubbo:method>标签的 async 属性配置为 ture,则是异步模式,直接返回空 RpcResult对象,由 FutureFilterDefaultFuture 完成异步处理工作
  • 同步模式:默认即是同步,则发送请求之后线程进入等待状态,直到收到服务端的响应消息或者超时。

下面我们看一下 dubbo 同步调用时序图:

ChannelFuture future = channel.write(message);

最终是调用 org.jboss.netty.channel.Channel 通过 socket 发送消息到从集群中选择出的一个暴露服务信息的服务器发送网络数据。

3、consumer 接收原理

我们都知道 dubbo 其实是通过 netty 来进行 socket 通信的。而在使用 netty 进行网络编程的时候,其实核心就是就是实现 ChannelHandler。而在 dubbo 中对应的实现类就是 NettyHandler(高版本支持支持 netty 4 使用的是 NettyClientHandler ,NettyHandler 使用的是 netty 3.x)。如果在 consumer 端(provider 也支持)需要使用 netty 4 进行业务处理,需要进行进行以下配置:

<dubbo:consumer client="netty4" />

所以 consumer 接收 provider 响应的入口就在 NettyClientHandler#channelRead

首先 ChannelHandler 用于接收 provider 端响应回来的请求,然后经过 3 个 dubbo 自定义的 ChannelHandler

  • MultiMessageHandler:支持 MultiMessage 消息处理,也就是多条消息处理。
  • HeartbeatHandler:netty 心条检测。如果心跳请求,发送心跳然后直接 return,如果是心跳响应直接 return
  • DecodeHandler:解码 message,解析成 dubbo 中的 Response 对象
  • HeaderExchangeHandler:处理解析后的 provider 端返回的 Response 响应信息,把响应结果赋值到 DefaultFuture 响应获取阻塞对象中。

4、dubbo 异步变同步

我们都知道 dubbo 是基于 netty NIO 的非阻塞 并行调用通信。所以 dubbo 在 consumer 请求 provider 后响应都是异步的。但是在 dubbo 里面默认是同步返回的,那么 dubbo 是如何把异步响应变成同步请求的呢?带着这个问题,首先我们来看一下 dubbo 里面的几种请求方式。

4.1 异步且无返回值

这种请求最简单,consumer 把请求信息发送给 provider 就行了。只是需要在 consumer 端把请求方式配置成异步请求就好了。如下:

<dubbo:method name="sayHello" return="false"></dubbo:method>

4.2 异步且有返回值

这种情况下 consumer 首先把请求信息发送给 provider 。这个时候在 consumer 端不仅把请求方式配置成异步,并且需要 RpcContext 这个 ThreadLocal 对象获取到 Future 对象,然后通过 Future#get() 阻塞式获取到 provider 的响应。那么这个 Future 是如果添加到 RpcContext 中呢?

在第二小节讲服务发送的时候, 在 DubboInvoke 里面有三种调用方式,之前只具体请求了同步请求的发送方式而且没有异步请求的发送。异步请求发送代码如下:

DubboInvoker#doInvoke 中的 else if (isAsync) 分支

    ResponseFuture future = currentClient.request(inv, timeout);FutureAdapter<T> futureAdapter = new FutureAdapter<>(future);RpcContext.getContext().setFuture(futureAdapter);Result result;if (RpcUtils.isAsyncFuture(getUrl(), inv)) {result = new AsyncRpcResult<>(futureAdapter);} else {result = new RpcResult();}return result;

上面的代码逻辑是直接发送请求到 provider 返回一个 ResponseFuture 实例,然后把这个 Future 对象保存到 RpcContext#LOCAL 这个 ThreadLocal 当前线程对象当中,并且返回一个空的 RpcResult对象。如果要获取到 provider 响应的信息,需要进行以下操作:

// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
Future<String> temp= RpcContext.getContext().getFuture();
// 同理等待bar返回
hello=temp.get();

4.3 异步变同步(默认)

下面我们就来讨论一下 dubbo 是如何把异步请求转化成同步请求的。其实原理和异步请求的通过 Future#get 等待 provider 响应返回一样,只不过异步有返回值是显示调用而默认是 dubbo 内部把这步完成了。下面我们就来分析一下 dubbo 是如何把 netty 的异步响应变成同步返回的。(当前线程怎么让它 “暂停”,等结果回来后,再执行?)

我们都知道在 consumer 发送请求的时候会调用 HeaderExchangeChannel#request 方法:

HeaderExchangeChannel#request

    public ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try {channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;}

它首先会通过 dubbo 自定义的 ChannelRequesttimeout(int) 构造一个 DefaultFuture 对象。然后再通过 NettyChannel 发送请求到 provider,最后返回这个 DefaultFuture。下面我们来看一下通过构造方法是如何创建 DefaultFuture 的。我只把主要涉及到的属性展示出来:

public class DefaultFuture implements ResponseFuture {private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();private final long id;private final Channel channel;private final Request request;private final int timeout;public DefaultFuture(Channel channel, Request request, int timeout) {this.channel = channel;this.request = request;this.id = request.getId();this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// put into waiting map.FUTURES.put(id, this);CHANNELS.put(id, channel);}
}

这个 id 是在创建 Request 的时候使用 AtomicLong#getAndIncrement 生成的。从 1 开始并且如果它一直增加直到生成负数也能保证这台机器这个值是唯一的,且不冲突的。符合唯一主键原则。 dubbo 默认同步变异步其实和异步调用一样,也是在 DubboInvoker#doInvoke 实现的。

DubboInvoker#doInvoke

    RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();

关键就在 ResponseFuture#get 方法上面,下面我们来看一下这个方法的源码:

    public Object get(int timeout) throws RemotingException {if (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}

其实就是 while 循环,利用 java 的 lock 机制判断如果在超时时间范围内 DefaultFuture#response 如果赋值成不为空就返回响应,否则抛出 TimeoutException 异常。下面我们就来看一下 DefaultFuture#response 是如何被赋值的。

还记得 consumer 接收 provider 响应的最后一步吗?就是 DefaultFuture#received,在 provider 端会带回 consumer 请求的 id。我们来看一下它的具体处理逻辑:

    public static void received(Channel channel, Response response) {try {DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response " + response+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()));}} finally {CHANNELS.remove(response.getId());}}

它会从最开始通过构造函数传进去的 DefaultFuture#FUTURES 根据请求的 id 拿到 DefaultFuture ,然后根据这个 DefaultFuture 调用 DefaultFuture#doReceived 方法。通过 Java 里面的 lock 机制把 provider 的值赋值给 DefaultFuture#response。此时 consumer 也正在调用 DefaultFuture#get 方法进行阻塞,当这个 DefaultFuture#response 被赋值后,它的值就不为空。阻塞操作完成,且根据请求号的 idconsumer 端的 Request 以及 Provider 端返回的 Response 关联了起来。

这个就是 Dubbo 异步转同步的原理,是不是很巧妙,很简单。

dubbo源码分析22 -- consumer 发送与接收原理相关推荐

  1. apache dubbo 源码分析系列汇总

    Dubbo(读音[ˈdʌbəʊ])是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring框架无缝集成.后面捐献给了知名的开源社区 ...

  2. Dubbo 源码分析 - 集群容错之 Router

    1. 简介 上一篇文章分析了集群容错的第一部分 – 服务目录 Directory.服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由.上一篇文章关于服务路由相关逻辑没有细 ...

  3. Dubbo源码分析:小白入门篇

    关注公众号"java后端技术全栈" 回复"000"获取优质面试资料 大家好,我是老田 答应了小伙伴的Dubbo源码分析系列,今天终于来了,希望不是很晚. 主要也 ...

  4. 志宇-dubbo源码分析

    dubbo源码分析 文档 dubbo加载配置文件 dubboSPI dubbo服务提供 1.校验配置信息 2.创建URL 3.本地注册 4.远程注册 4.1 开启netty服务端 4.2 连接注册中心 ...

  5. Dubbo 源码分析 - 集群容错之 LoadBalance

    1.简介 LoadBalance 中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载"均摊"到不同的机器上.避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况.通 ...

  6. Dubbo 源码分析 - 集群容错之 Cluster

    1.简介 为了避免单点故障,现在的应用至少会部署在两台服务器上.对于一些负载比较高的服务,会部署更多台服务器.这样,同一环境下的服务提供者数量会大于1.对于服务消费者来说,同一环境下出现了多个服务提供 ...

  7. Dubbo 源码分析 - 服务导出

    1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可 ...

  8. 精尽 Dubbo 源码分析 —— API 配置

    1. 概述 Dubbo 的配置目前提供了四种配置方式:1. API 配置 2. 属性配置 3. XML 配置 4. 注解配置 2. 配置一览 我们来看看 dubbo-config-api 的项目结构, ...

  9. dubbo源码分析系列(1)扩展机制的实现

    1 系列目录 dubbo源码分析系列(1)扩展机制的实现 dubbo源码分析系列(2)服务的发布 dubbo源码分析系列(3)服务的引用 dubbo源码分析系列(4)dubbo通信设计 2 SPI扩展 ...

最新文章

  1. u盘循环冗余能修复吗_古董修复能修复吗?-恩平 - 商业服务
  2. smarty_modifier_truncate,无或者有md_substr的情况下都能正确截取字符串的php函数,可用于smarty。...
  3. 【Android】Mac Android adb 配置
  4. phpmailer SMTP Error: Could not connect to SMTP host. 错误解决
  5. python远程ftp服务器文件,如何将远程python FTP服务器连接到本地pythonftp客户端
  6. go flag包获取命令行参数使用示例
  7. linux下mysql安装
  8. 网络——Base64Encode(转:自定义Base64编码器——Base64Encode)
  9. Android开发中无处不在的设计模式——动态代理模式
  10. linux 路由表设置 之 route 指令详解【转】
  11. ssh配置config文件
  12. 下载新版火狐后无法同步书签_火狐浏览器书签同步设置教程
  13. android 字体调节大小,教你如何修改Android默认字体大小和设置字体大小比例
  14. 常见音频格式开源解码库汇总(持续更新)
  15. 两个重要极限及相关推导极限
  16. 关于ABAQUS2020二次开发
  17. 基于光流和RNN的长视频微表情检测
  18. SpringCloud 微服务网关Gateway常用限流算法以及简单实现
  19. UPlayer播放器问题相关
  20. python自动点赞微信_基于AirTest+Python的ios自动化测试demo(微信朋友圈无限点赞)

热门文章

  1. 微信小程序数据传值与引用data变量
  2. 内网服务器通过代理服务器访问外网
  3. 建筑力学与结构【1】
  4. 普瑞姆管理系统(软件业)V1.8.5迭代开发内容
  5. 1086 就不告诉你(C语言)
  6. 谷歌chrome利用Bookmarks文件找回书签
  7. ccf 201703-4 地铁修建(100分)
  8. 使用SGC训练和测试SEED数据集
  9. 太诱电容不同系列区别
  10. 边播放歌曲边滚动歌词