前言

RPC 框架需要维护客户端和服务端的连接,通常是一个客户端对应多个服务端,而客户端看到的是接口,并不是服务端的地址,服务端地址对于客户端来讲是透明的。

那么,如何实现这样一个 RPC 框架的网络连接呢?

我们从 SOFA 中寻找答案。

连接管理器介绍

先从一个小 demo 开始看:

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>().setInterfaceId(HelloService.class.getName()) // 指定接口.setProtocol("bolt") // 指定协议.setDirectUrl("bolt://127.0.0.1:9696"); // 指定直连地址HelloService helloService = consumerConfig.refer();while (true) {System.out.println(helloService.sayHello("world"));try {Thread.sleep(2000);} catch (Exception e) {}
}

上面的代码中,一个 ConsumerConfig 对应一个接口服务,并指定了直连地址。

然后调用 ref 方法。每个 ConsumerConfig 绑定了一个 ConsumerBootstrap,这是一个非单例的类。

而每个 ConsumerBootstrap 又绑定了一个 Cluster,这是真正的客户端。该类包含了一个客户端所有的关键信息,例如:

  1. Router 路由链
  2. loadBalance 负载均衡
  3. addressHolder 地址管理器
  4. connectionHolder 连接管理器
  5. filterChain 过滤器链

这 5 个实例是 Cluster 的核心。一个客户端的正常使用绝对离不开这 5 个元素。

我们之前分析了 5 个中的 4 个,今天分析最后一个 —— 连接管理器。

他可以说是 RPC 网络通信的核心。

地址管理器代表的是:一个客户端可以拥有多个接口。
连接管理器代表的是:一个客户端可以拥有多个 TCP 连接。

很明显,地址管理器的数据肯定比连接管理器要多。因为通常一个 TCP 连接(Server 端)可以含有多个接口。

那么 SOFA 是如何实现连接管理器的呢?

从 AbstractCluster 的 init 方法中,我们知道,该方法初始化了 Cluster。同时也初始化了 connectionHolder。

具体代码如下:

// 连接管理器
connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);

使用了 SPI 的方式进行的初始化。目前 RPC 框架的具体实现类只有一个 AllConnectConnectionHolder。即长连接管理器。

该类需要一个 ConsumerConfig 才能初始化。

该类中包含很多和连接相关的属性,有 4 个 Map,未初始化的 Map,存活的节点列表,存活但亚健康的列表,失败待重试的列表。这些 Map 的元素都会随着服务的网络变化而变化。

而这些 Map 中的元素则是:ConcurrentHashMap<ProviderInfo, ClientTransport> 。

即每个服务者的信息对应一个客户端传输。那么这个 ClientTransport 是什么呢?看过之前文章的都知道,这个一个 RPC 和 Bolt 的胶水类。该类的默认实现 BoltClientTransport 包含了一个 RpcClient 属性,注意,该属性是个静态的。也就是说,是所有实例公用的。并且,BoltClientTransport 包含一个 ProviderInfo 属性。还有一个 Url 属性,Connection 属性(网络连接)。

我们理一下:一个 ConsumerConfig 绑定一个 Cluster,一个 Cluster 绑定一个 connectionHolder,一个 connectionHolder 绑定多个 ProviderInfo 和 ClientTransport。

因为一个客户端可以和多个服务进行通信。

代码如何实现?

在 Cluster 中,会对 connectionHolder 进行初始化,在 Cluster 从注册中心得到服务端列表后,会建立长连接。

从这里开始,地址管理器开始运作。

Cluster 的 updateAllProviders 方法是源头。该方法会将服务列表添加到 connectionHolder 中。即调用 connectionHolder.updateAllProviders(providerGroups) 方法。该方法会全量更新服务端列表。

如果更新的时候,发现有新的服务,便会建立长连接。具体代码如下:

if (!needAdd.isEmpty()) {addNode(needAdd);
}

addNode 方法就是添加新的节点。该方法会多线程建立 TCP 连接。

首先会根据 ProviderInfo 信息创建一个 ClientTransport,然后向线程池提交一个任务,任务内容是 initClientTransport(),即初始化客户端传输。

该方法代码如下(精简过了):

private void initClientTransport(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {transport.connect();if (doubleCheck(interfaceId, providerInfo, transport)) {printSuccess(interfaceId, providerInfo, transport);addAlive(providerInfo, transport);} else {printFailure(interfaceId, providerInfo, transport);addRetry(providerInfo, transport);}
}

其中关键是调用 transport 的 connect 方法建立连接。

该方法的默认实现在 BoltClientTransport 中,符合我们的预期。我们知道, BoltClientTransport 有一个 RpcClient 的静态实例。这个实例在类加载的时候,就会在静态块中初始化。初始化内容则是初始化他的一些属性,例如地址解析器,连接管理器,连接监控等等。

我们再看 BoltClientTransport 的 connect 方法,该方法主要逻辑是初始化连接。方式则是通过 RpcClient 的 getConnection 方法来获取,具体代码如下:

 connection = RPC_CLIENT.getConnection(url, url.getConnectTimeout());

传入一个 URL 和超时时间。 RpcClient 则是调用连接管理器的 getAndCreateIfAbsent 方法获取,同样传入 Url,这个方法的名字很好,根据 URL 获取连接,如果没有,就创建一个。

有必要看看具体代码:

public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {// get and create a connection pool with initialized connections.ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),new ConnectionPoolCall(url));if (null != pool) {return pool.get();} else {logger.error("[NOTIFYME] bug detected! pool here must not be null!");return null;}
}

该方法会继续调用自身的 getConnectionPoolAndCreateIfAbsent 方法,传入 URL 的唯一标识,和一个 ConnectionPoolCall 对象(实现了 Callable)。

然后阻塞等待返回连接。

我们看看这个 ConnectionPoolCall 的 call 方法实现。该方法调用了连接管理器的 doCreate 方法。传入了 URL 和一个连接池。然后 call 方法返回连接池。

doCreate 方法中,重点就是 create 方法,传入了一个 url,返回一个 Connection,并放入连接池。默认池中只有一个长连接。

而 create 方法则是调用连接工厂的 createConnection 方法。然后调用 doCreateConnection 方法。该方法内部给了我们明确的答案:调用 Netty 的 Bootstrap 的 connect 方法。

代码如下:

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));

熟悉 Netty 的同学一眼便看出来了。这是一个连接服务端的操作。而这个 BootStrap 的初始化则是在 RpcClient 初始化的时候进行的。注意:BootStrap 是可以共享的。

可以看到, ConnectionPoolCall 的 call 方法就是用来创建 Netty 连接的。回到 getAndCreateIfAbsent 方法里,继续看 getConnectionPoolAndCreateIfAbsent 方法的实现。

该方法内部将 Callable 包装成一个 FutureTask,目的应该是为了以后的异步运行吧,总之,最后还是同步调用了 run 方法。然后调用 get 方法阻塞等待,等待刚刚 call 方法返回的连接池。然后返回。

得到连接池,连接池调用 get 方法,从池中根据策略选取一个连接返回。目前只有一个随机选取的策略。

这个 Connection 连接实例会保存在 BoltClientTransport 中。

在客户端进行调用的时候, RpcClient 会根据 URL 找到对应的连接,然后,获取这个连接对应的 Channel ,向服务端发送数据。具体代码如下:

conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (!f.isSuccess()) {conn.removeInvokeFuture(request.getId());future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), f.cause()));logger.error("Invoke send failed, id={}", request.getId(), f.cause());}}
});

以上,就是 SOFA 的连接的原理和设计。

总结

连接管理器是我们分析 SOFA—RPC Cluster 中的最后一个模块,他管理着一个客户端对应的所有服务网络连接。

connectionHolder 内部包含多个 Map,Map 中的 key 是 Provider,value 是 ClientTransport,ClientTransport 是 RpcClient 和 SOFA 的胶水类,通常一个 Provider 对应一个 ClientTransport。ClientTransport 其实就是一个连接的包装。

ClientTransport 获取连接的方式则是通过 RpcClient 的 连接管理器获取的。该连接管理器内部包含一个连接工厂,会根据 URL 创建连接。创建连接的凡是则是通过 Netty 的 BootStrap 来创建。

当我们使用 Provider 对应的 ClientTransport 中的 RpcClient 发送数据的时候,则会根据 URL 找到对应 Connection,并获取他的 Channel ,向服务端发送数据。

好了,以上就是 SOFA—RPC 连接管理的分析。

篇幅有限,如有错误,还请指正。

转载于:https://www.cnblogs.com/stateis0/p/9006080.html

SOFA 源码分析 — 连接管理器相关推荐

  1. Tomcat7.0源码分析——Session管理分析(下)

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/52451061 前言 在<Tomcat7.0 ...

  2. Tomcat7.0源码分析——Session管理分析(上)

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/52450268 前言 对于广大java开发者而言, ...

  3. v35.03 鸿蒙内核源码分析(时间管理) | 内核基本时间单位是谁 | 百篇博客分析HarmonyOS源码

    子曰:"譬如为山,未成一篑,止,吾止也:譬如平地,虽覆一篑,进,吾往也." <论语>:子罕篇 百篇博客系列篇.本篇为: v35.xx 鸿蒙内核源码分析(时间管理篇) | ...

  4. v19.04 鸿蒙内核源码分析(位图管理) | 特节俭的苦命孩子 | 百篇博客分析HarmonyOS源码

    子曰:"饭疏食,饮水,曲肱而枕之,乐亦在其中矣.不义而富且贵,于我如浮云." <论语>:述而篇 百篇博客系列篇.本篇为: v19.xx 鸿蒙内核源码分析(位图管理篇) ...

  5. threejs 源码解析_ThreeJS 物理材质shader源码分析(顶点着色器)

    ThreeJS 物理材质shader源码分析(顶点着色器) Threejs将shader代码分为ShaderLib和ShaderChunk两部分,ShaderLib通过组合ShaderChunk的代码 ...

  6. SOFA 源码分析 —— 服务发布过程

    前言 SOFA 包含了 RPC 框架,底层通信框架是 bolt ,基于 Netty 4,今天将通过 SOFA-RPC 源码中的例子,看看他是如何发布一个服务的. 示例代码 下面的代码在 com.ali ...

  7. Python3.5源码分析-内存管理

    Python3源码分析 本文环境python3.5.2. 参考书籍<<Python源码剖析>> python官网 Python3的内存管理概述 python提供了对内存的垃圾收 ...

  8. SOFA 源码分析 — 链路数据透传

    前言 SOFA-RPC 支持数据链路透传功能,官方解释: 链路数据透传功能支持应用向调用上下文中存放数据,达到整个链路上的应用都可以操作该数据. 使用方式如下,可分别向链路的 request 和 re ...

  9. Tomcat源码分析(十)--部署器 转载

    本系列转载自 http://blog.csdn.net/haitao111313/article/category/1179996 我们知道,在Tomcat的世界里,一个Host容器代表一个虚机器资源 ...

最新文章

  1. 图书抄袭何时休,技术人的版权在哪里?
  2. 【学习笔记】月末操作-GR/IR重组
  3. 利用 squid 反向代理提高网站性能
  4. boost::geometry::segment_view用法的测试程序
  5. Android经常使用的五种弹出对话框
  6. MySQL连接方式:长连接或者短连接
  7. Tmux : GNU Screen 的替代品
  8. sqlserver 插入数据时异常,仅当使用了列列表并且 IDENTITY_INSERT 为 ON 时,才能为表'XXXXX.dbo.XXXXXXXXX'中的标识列指定显式值。...
  9. excel和html互相转换,Excel2016与Html格式之间的互相转换
  10. 新手抖音直播需要什么设备;看完让你少花冤枉钱。
  11. 敏捷管理-PDCA循环(戴明环)
  12. Kmplayer的各种功能设置
  13. 基于 Creator 3.0 的 3D 换装
  14. 什么是元数据 (MetaData)
  15. 基于微信小程序+爬虫制作一个表情包小程序
  16. 美的2021高频题汇总 | 备战春招,刷这30题就够了!
  17. WIFI基础入门--802.11k--无线局域网络频谱测量
  18. pycharm使用私钥远程连接服务器
  19. eNSP动态NAT实验记录
  20. 使用Java语言开发在线电影推荐网 电影推荐系统 豆瓣电影爬虫 基于用户、物品的协同过滤推荐算法实现 SSM(Spring+SpringMVC+Mybatis)开发框架 机器学习、人工智能、大数据开发

热门文章

  1. git使用log命令显示中文乱码
  2. tomcat配置项目路径或部署工程名
  3. es6的map()方法解释
  4. php memcache可存,php使用memcache共享存储session(二)
  5. 2019年的前端学习计划
  6. 使用VirtualEnvWrapper隔离python项目的库依赖
  7. Sql Server系列:存储过程
  8. Repeater的嵌套结合用户控件的使用
  9. Android---- 获取当前应用的版本号和当前android系统的版本号
  10. 关于ie6下提交上传表单的注意事项