2.1.3 客户端网络连接对象

客户端网络连接对象(NetworkClient )管理了客户端和服务端之间的 网络通信,包括连接的建立、发送客户端请求 、 读取客户端响应。回顾下2 . 1.2节中第 1小节“从记录收集器获取数据”部分,发送线程的run()方法中会调用 NetworkClient的3个方法,如下 。

  • ready()方法。从记录收集器获取准备完毕的节点,并连接所有准备好的节点 ;
  • send()方法。为每个节点创建一个客户端请求 , 将请求暂存到节点对应的通道 中;
  • poll()方法 。轮询动作会真正执行网 络请求, 比如发送请求给节点,并读取响应 。

我们把前面两个方法都叫作准备阶段,因为调用这两个方法并没有真正地将客户端请求发送到服务端上,只有第三个方法才会发送客户端请求 。

  1. 准备发送客户端请求

客户端向服务端发送请求需要先建立网络连接。 如果服务端还没有准备好,即还不能连接,这个节点在客户端就会被移除掉,确保消息不会发送给还没有准备好的节点;如果服务端已经准备好了,则调用selector. connect ()方法建立到目标节点的网络连接 。 相关代码如下:

    public boolean ready(Node node, long now) {if (node.isEmpty())throw new IllegalArgumentException("Cannot connect to empty node " + node);if (isReady(node, now))return true;if (connectionStates.canConnect(node.idString(), now))// if we are interested in sending to a node and we don't have a connection to it, initiate oneinitiateConnect(node, now);return false;}
    private void initiateConnect(Node node, long now) {String nodeConnectionId = node.idString();try {log.debug("Initiating connection to node {}", node);this.connectionStates.connecting(nodeConnectionId, now);selector.connect(nodeConnectionId,new InetSocketAddress(node.host(), node.port()),this.socketSendBuffer,this.socketReceiveBuffer);} catch (IOException e) {/* attempt failed, we'll try again after the backoff */connectionStates.disconnected(nodeConnectionId, now);/* maybe the problem is our metadata, update it */metadataUpdater.requestUpdate();log.debug("Error connecting to node {}", node, e);}}

连接建立后,发送线程调用NetworkClientsend() ,先将客户端请求加入initFlightRequests列表,然后调用 selector. send()方法。 注意:这一步只是将请求暂存到节点对应的网络通道中,还没有真正地将客户端请求发送出去。 相关代码如下:

public void send(ClientRequest request, long now) {doSend(request, false, now);}private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {String nodeId = clientRequest.destination();if (!isInternalRequest) {// If this request came from outside the NetworkClient, validate// that we can send data.  If the request is internal, we trust// that that internal code has done this validation.  Validation// will be slightly different for some internal requests (for// example, ApiVersionsRequests can be sent prior to being in// READY state.)if (!canSendRequest(nodeId))throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");}AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();try {NodeApiVersions versionInfo = apiVersions.get(nodeId);short version;// Note: if versionInfo is null, we have no server version information. This would be// the case when sending the initial ApiVersionRequest which fetches the version// information itself.  It is also the case when discoverBrokerVersions is set to false.if (versionInfo == null) {version = builder.desiredOrLatestVersion();if (discoverBrokerVersions && log.isTraceEnabled())log.trace("No version information found when sending {} with correlation id {} to node {}. " +"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);} else {version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion());}// The call to build may also throw UnsupportedVersionException, if there are essential// fields that cannot be represented in the chosen version.doSend(clientRequest, isInternalRequest, now, builder.build(version));} catch (UnsupportedVersionException e) {// If the version is not supported, skip sending the request over the wire.// Instead, simply add it to the local queue of aborted requests.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,clientRequest.correlationId(), clientRequest.destination(), e);ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),clientRequest.callback(), clientRequest.destination(), now, now,false, e, null);abortedSends.add(clientResponse);}}

为了保证服务端的处理性能,客户端网络连接对象有一个限制条件:针对同一个服务端,如果上一个客户端请求还没有发送完成,则不允许发送新的客户端请求 。 客户端网络连接对象用i.nFli.ghtReq四sts变量在客户端缓存了还没有收到响应的客户端请求, InFli.ghtRequests类包含一个节点到双端队列的映射结构 。 在准备发送客户端请求时,请求将添加到指定节点对应的队列中;在收到响应后 ,才会将请求从队列中移除 。

  1. 客户端轮询并调用回调函数
    发送线程 run ()方法的最后一步是调用 NetworkClient的 poll ()方法 。 轮询的最关键步骤是调用selectorpoll ()方法,而在轮询之后,定义了多个处理方法 。 轮询不仅仅会发送客户端请求,也会接收客户端响应 。 客户端发送请求后会调用 handleCol’lpletedSends ()处理已经完成的发送,客户端接收到响应后会调用 handleCol’lpletedReceives ()处理已经完成的接收。
    如果客户端发送完请求不需要响应,在处理已经完成的发送时,就会将对应的请求从inFlightRequests 队列中移踪 。 而因为没有响应结果,也就不会有机会调用 handleCollpletedReceives()方法。 如果客户端请求需要响应, 则只有在handleCol’lpletedReceives () 中才会删除对应的请求 : 因为inFli.ghtRequests队列保存的是未收到响应的客户端请求,请求已经有响应,就不需要存在于队列 中 。
    相关代码如下 :
public List<ClientResponse> poll(long timeout, long now) {if (!abortedSends.isEmpty()) {// If there are aborted sends because of unsupported version exceptions or disconnects,// handle them immediately without waiting for Selector#poll.List<ClientResponse> responses = new ArrayList<>();handleAbortedSends(responses);completeResponses(responses);return responses;}long metadataTimeout = metadataUpdater.maybeUpdate(now);try {this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));} catch (IOException e) {log.error("Unexpected error during I/O", e);}// process completed actionslong updatedNow = this.time.milliseconds();List<ClientResponse> responses = new ArrayList<>();handleCompletedSends(responses, updatedNow);handleCompletedReceives(responses, updatedNow);handleDisconnections(responses, updatedNow);handleConnections();handleInitiateApiVersionRequests(updatedNow);handleTimedOutRequests(responses, updatedNow);completeResponses(responses);return responses;}private void handleCompletedSends(List<ClientResponse> responses, long now) {// if no response is expected then when the send is completed, return itfor (Send send : this.selector.completedSends()) {InFlightRequest request = this.inFlightRequests.lastSent(send.destination());if (!request.expectResponse) {this.inFlightRequests.completeLastSent(send.destination());responses.add(request.completed(null, now));}}}private void handleCompletedSends(List<ClientResponse> responses, long now) {// if no response is expected then when the send is completed, return itfor (Send send : this.selector.completedSends()) {InFlightRequest request = this.inFlightRequests.lastSent(send.destination());if (!request.expectResponse) {this.inFlightRequests.completeLastSent(send.destination());responses.add(request.completed(null, now));}}}

下面总结了客户端是否需要响应结果的两种场景下 , 从队列 中删除或添加请求的顺序 。

  • 不需要响应的流程。开始发送请求→添加客户端请求到队列→发送请求→请求发送成功→从队列中删除发送请求→构造客户端响应。
  • 需要晌应的流程。开始发送请求→添加客户端请求到队列→发送请求→请求发送成功→等待接收响应→接收响应→接收到完整的响应→从队列中删除客户端请求→构造客户端响应 。

上面几个处理方法创建的客户端响应对象(ClientResponse )都需要从队列中获取对应的客户端请求(ClientRequest ),这是因为最后要调用回调函数 , 只有客户端请求中才有回调对象。 把客户端请求作为客户端响应的一个成员变量,在接收到客户端响应时, 通过获取其中的客户端请求 , 就可以得到客户端请求中的回调对象,也就可以调用到回调函数。

客户端响应包含客户端请求的目的是 : 根据响应获取请求中的回调对象 , 在收到响应后调用回调函数。

3 . 客户端请求和客户端响应的关系
客户端请求(ClientRequest )包含客户端发送的请求和回调处理器,客户端响应(ClientResponse )包含客户端请求对象和响应结果的内容。 相关代码如下 :

public final class ClientRequest {private final String destination;private final AbstractRequest.Builder<?> requestBuilder;private final int correlationId;private final String clientId;private final long createdTimeMs;private final boolean expectResponse;private final RequestCompletionHandler callback;
…………
}public class ClientResponse {private final RequestHeader requestHeader;private final RequestCompletionHandler callback;private final String destination;private final long receivedTimeMs;private final long latencyMs;private final boolean disconnected;private final UnsupportedVersionException versionMismatch;private final AbstractResponse responseBody;
…………
}

客户端请求和客户端响应的生命周期都在客户端的连接管理类(NetworkClient)里 。NetworkClient不仅负责将发送钱程构造好的客户端请求发送出去,而且还要将服务端的响应结果构造成客户端响应
并返回给客户端。 图2-9以“客户端发送请求,服务端接收请求,服务端返回结果,客户端接收请求”这个完整的流程,来梳理这些对象之间的关联 。

  1. 发送线程创建的客户端请求对象包括请求本身和回调对象 。
  2. 发送线程将客户端请求交给 NetworkClient,并记录目标节点到客户端请求的映射关系 。
  3. NetworkClient的轮询得到发送请求,将客户端请求发送到对应的服务端目标节点 。
  4. 服务端处理客户端请求 , 将客户端响应通过服务端的请求通道返回给客户端 。
  5. NetworkClient的轮询得到响应结果, 说明客户端收到服务端发送过来的请求处理结果 。
  6. 由于客户端发送请求 时发送到了不 同节点,收到的结果也可能来向不同节点 。 服务端发送过来的响应结果都表示了它是从哪里来的,客户端根据NetworkRecelve的 source查找步骤(2)记录的信息,得到对应的客户端请求, 把客户端请求作为客户端响应的成员变量。
  7. 调用口 lentResponse.ClientRequest.Callback.onComplete() ,触发回调函数的调用 。
  8. 客户端请求中的回调对象会使用客户端的响应结果 , 来调用生产者应用程序向定义的回调函数 。

客户端请求对应的底层数据来源于Send ,客户端响应对应的底层数据来源于NetworkReceive。客户端网络连接对象(NetworkClient )的底层网络操作都交给了选择器(Selector)。


2.1.3 客户端网络连接对象相关推荐

  1. Android聊天软件的开发(三)--网络连接

    一,服务器网络接口    服务器网络接口通过Servlet实现,可以获得客户端提交的数据,对数据进行查询存储操作,以及返回结果数据给客户端.客户端可以通过HTTP协议直接访问网络接口.    HTTP ...

  2. 【Java——网络编程基础之客户端服务器连接】

    网络编程 1.1软件结构 1.2 网络通信协议 1.3 协议分类 1.4网络编程三要素 协议 IP地址 端口号 TCP通信程序 2.1 概述 2.2 Socket类 构造方法 成员方法 2.3 Ser ...

  3. win10无法登录(调用的对象已与其客户端断开连接)

    win10无法登录--调用的对象已与其客户端断开连接 问题描述 解决方案 问题描述 启动电脑,进入登录界面,输入密码,无法进入系统,显示:调用的对象已与其客户端断开连接 解决方案 1.点击 重启电脑. ...

  4. python 批量转换xls to xlsx,出现pywintypes.com_error: (-2147417848, ‘被调用的对象已与其客户端断开连接)’的问题处理

    先上python批量转换xls文件转xlsx文件的源代码: import os import win32com.client as win32def change_xls_to_xlsx():p_pr ...

  5. python 批量转换docx只转换了一个出现pywintypes.com_error被调用的对象已与其客户端断开连接

    如下,把txt文件或.doc文件转换为docx,结果只转换了一个 pywintypes.com_error: (-2147417848, '被调用的对象已与其客户端断开连接.', None, None ...

  6. 如何解决“被调用的对象已与其客户端断开连接“

    一.问题描述 使用我的ThinkPad笔记本进入Windows10,总是会偶尔出现"被调用的对象已与其客户端断开连接"的消息,有时候重启就能解决,有时候需要另一台设备登录微软账户删 ...

  7. WIN10开机显示被调用的对象已与其客户端断开连接解决方法之一

    WIN10开机显示被调用的对象已与其客户端断开连接解决方法之一 有关windows系统开机输入密码后显示"被调用的对象已与其客户端断开连接"的原因具体有几种不是太清楚,不过参照其他 ...

  8. 堡垒机远程连接报“由于安全设置错误, 客户端无法连接到远程计算机. 确定你已登录到网络后,再重新连接” 错误处理步骤

    window客户端 通过堡垒机 远程连接出现 "由于安全设置错误, 客户端无法连接到远程计算机. 确定你已登录到网络后,再重新连接" 错误 解决方法如下: 第一步:打开" ...

  9. B2B 手动客户端网络上传数据报错:由于目标机器积极拒绝,无法连接,解决办法

    B2B 手动客户端网络上传数据报错:由于目标机器积极拒绝,无法连接,解决办法 B2B手动客户端在外网运行时,选择网络上传数据时,报错"由于目标机器积极拒绝,无法连接":在内网通 ...

最新文章

  1. 邻接矩阵中啥时候写0和无穷_集合中的上极限与下极限
  2. python如何读dat数据_如何用Python进行数据质量分析
  3. 微服务模式下,实现前后端多资源服务调用
  4. 西安工程大学计算机科学学院刘宝宝,计算机科学学院召开研究生国家奖学金答辩会...
  5. 2017.9.18 数颜色 思考记录
  6. java生成bcp_java-如何将IETF BCP 47语言代码转换为显示字符串?
  7. 极其艰难地下了决心建立博客虽然没有很好的理由或者仅仅是因为觉得自己的确要改变了......
  8. 《剑指offer》第二十三题(链表中环的入口结点)
  9. 缺项级数的收敛域求解
  10. 社科院与杜兰大学金融管理硕士——在自己的领域努力拼搏,终将遇到专属的光芒
  11. excel空白单元格自动下下填充上一个单元格的值
  12. 函数最值题目及答案_有关函数的极值与导数的测试题及答案
  13. 【马士兵】笔记_Java网络编程
  14. CGAL 凹包(alpha-Shape)
  15. ios屏幕录制60帧_探索iOS屏幕帧缓冲区–内核反转实验
  16. 使用SpringBoot及Construct2的WebSocket制作联机游戏(二)
  17. MySQL 启停过程了解一二
  18. 通用一键打包软件,数据包制作工具
  19. 简单性能测试:springboot-2.x vs actix-web-4.x benchmark
  20. 计算机学业水平测试基础知识,全国通用信息技术学业水平测试必考知识要点(一)...

热门文章

  1. 深入浅出JS—18 手把手实现一个Promise类
  2. 【思考题】新客老客定义
  3. PHP实现 记录网站访问量
  4. 以太网通信连接不上自检步骤
  5. 无分类编址CIDR(构造超网)
  6. Python实用工具之制作证件照(有界面、附源码、赞关藏)
  7. zz-tcp参数配置
  8. /usr/bin/ld: cannot find -lxxx错误的通用解决方法
  9. LeetCode刷题之---上一个排序
  10. vue项目-android版本引入微信录音