概述

ProtobufRpcEngine是在RPC通信过程中,使用ptotobuf作为数据交换格式的RPC实现类。

对ProtobufRpcEngine的源码分析将围绕RPC概念模型展开。

RPC概念模型

 RPC调用流程如下:

  1. RPC 服务端通过 RpcServer 去导出(export)远程接口方法,而客户端通过 RpcClient 去导入(import)远程接口方法。
  2. 客户端像调用本地方法一样去调用远程接口方法,RPC 框架提供接口的代理实现,实际的调用将委托给代理 RpcProxy 。
  3. 代理封装调用信息并将调用转交给 RpcInvoker 去实际执行。在客户端的 RpcInvoker 通过连接器 RpcConnector 去维持与服务端的通道 RpcChannel,并使用 RpcProtocol 执行协议编码(encode)并将编码后的请求消息通过通道发送给服务端。
  4. RPC 服务端接收器 RpcAcceptor 接收客户端的调用请求,同样使用 RpcProtocol 执行协议解码(decode)。
  5. 解码后的调用信息传递给 RpcProcessor 去控制处理调用过程,最后再委托调用给 RpcInvoker 去实际执行并返回调用结果。

RPC要素

1、RpcServer 
     负责导出(export)远程接口
2、RpcClient 
     负责导入(import)远程接口的代理实现
3、RpcProxy 
     远程接口的代理实现
4、RpcInvoker 
     客户端:负责编码调用信息和发送调用请求到服务端并等待调用结果返回 
     服务端:负责调用服务端接口的具体实现并返回调用结果
5、RpcProtocol 
     负责协议编/解码
6、RpcConnector 
     负责维持客户端和服务端的连接通道和发送数据到服务端
7、RpcAcceptor 
     负责接收客户端请求并返回请求结果
8、RpcProcessor 
     负责在服务端控制调用过程,包括管理调用线程池、超时时间等
9、RpcChannel 
     数据传输通道

RPC实现要点

1、协议

协议指 RPC 调用在网络传输中约定的数据封装方式,包括三个部分:编解码、消息头 和 消息体。

编解码
客户端代理在发起调用前需要对调用信息进行编码,这就要考虑需要编码些什么信息并以什么格式传输到服务端才能让服务端完成调用。 出于效率考虑,编码的信息越少越好(传输数据少),编码的规则越简单越好(执行效率高)。

我们先看下需要编码些什么信息:

调用编码

  • 接口方法 
    包括接口名、方法名
  • 方法参数 
    包括参数类型、参数值
  • 调用属性 
    包括调用属性信息,例如调用附加的隐式参数、调用超时时间等

返回编码

  • 返回结果 
    接口方法中定义的返回值
  • 返回码 
    异常返回码
  • 返回异常信息 
    调用异常信息

消息头
除了以上这些必须的调用信息,我们可能还需要一些元信息以方便程序编解码以及未来可能的扩展。这样我们的编码消息里面就分成了两部分,一部分是元信息、另一部分是调用的必要信息。如果设计一种 RPC 协议消息的话,元信息我们把它放在协议消息头中,而必要信息放在协议消息体中。下面给出一种概念上的 RPC 协议消息头设计格式:

  • magic 
    协议魔数,为解码设计
  • header size 
    协议头长度,为扩展设计
  • version 
    协议版本,为兼容设计
  • st 
    消息体序列化类型
  • hb 
    心跳消息标记,为长连接传输层心跳设计
  • ow 
    单向消息标记
  • rp 
    响应消息标记,不置位默认是请求消息
  • status code 
    响应消息状态码
  • reserved 
    为字节对齐保留
  • message id 
    消息 id
  • body size 
    消息体长度

消息体
消息体常采用序列化编码,常见有以下序列化方式(数据交换格式):

  • xml 
    如 webservie SOAP
  • json 
    如 JSON-RPC
  • binary 
    如 thrift; hession; kryo 等

格式确定后编解码就简单了,由于头长度一定所以我们比较关心的就是消息体的序列化方式。 序列化我们关心三个方面:

  • 效率:序列化和反序列化的效率,越快越好。
  • 长度:序列化后的字节长度,越小越好。
  • 兼容:序列化和反序列化的兼容性,接口参数对象若增加了字段,是否兼容。

2、通信

消息数据结构被序列化为二进制串后,下一步就要进行网络通信了。目前有两种常用IO通信模型:1)BIO;2)NIO。一般RPC框架需要支持这两种IO模型。

如何实现RPC的IO通信框架呢?1)使用java nio方式自研,这种方式较为复杂,而且很有可能出现隐藏bug,但也见过一些互联网公司使用这种方式;2)基于mina,mina在早几年比较火热,不过这些年版本更新缓慢;3)基于netty,现在很多RPC框架都直接基于netty这一IO通信框架,省力又省心,比如阿里巴巴的HSF、dubbo,Twitter的finagle等。

ProtobufRpcEngine源码分析

1、RPCServer和服务端的RpcInvoker

public static class Server extends RPC.Server {/*** Construct an RPC server.* * @param protocolClass the class of protocol* @param protocolImpl the protocolImpl whose methods will be called* @param conf the configuration to use* @param bindAddress the address to bind on to listen for connection* @param port the port to listen for connections on* @param numHandlers the number of method handler threads to run* @param verbose whether each call should be logged* @param portRangeConfig A config parameter that can be used to restrict* the range of ports used when port is 0 (an ephemeral port)*/public Server(Class<?> protocolClass, Object protocolImpl,Configuration conf, String bindAddress, int port, int numHandlers,int numReaders, int queueSizePerHandler, boolean verbose,SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig)throws IOException {super(bindAddress, port, null, numHandlers,numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl.getClass().getName()), secretManager, portRangeConfig);this.verbose = verbose;  registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,protocolImpl);}/*** Protobuf invoker for {@link RpcInvoker}*/static class ProtoBufRpcInvoker implements RpcInvoker {private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,String protoName, long clientVersion) throws RpcServerException {ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);ProtoClassProtoImpl impl = server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);if (impl == null) { // no match for Protocol AND VersionVerProtocolImpl highest = server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protoName);if (highest == null) {throw new RpcNoSuchProtocolException("Unknown protocol: " + protoName);}// protocol supported but not the version that client wantsthrow new RPC.VersionMismatch(protoName, clientVersion,highest.version);}return impl;}@Override /*** This is a server side method, which is invoked over RPC. On success* the return response has protobuf response payload. On failure, the* exception name and the stack trace are returned in the response.* See {@link HadoopRpcResponseProto}* * In this method there three types of exceptions possible and they are* returned in response as follows.* <ol>* <li> Exceptions encountered in this method that are returned * as {@link RpcServerException} </li>* <li> Exceptions thrown by the service is wrapped in ServiceException. * In that this method returns in response the exception thrown by the * service.</li>* <li> Other exceptions thrown by the service. They are returned as* it is.</li>* </ol>*/public Writable call(RPC.Server server, String protocol,Writable writableRequest, long receiveTime) throws Exception {RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;RequestHeaderProto rpcRequest = request.requestHeader;String methodName = rpcRequest.getMethodName();String protoName = rpcRequest.getDeclaringClassProtocolName();long clientVersion = rpcRequest.getClientProtocolVersion();if (server.verbose)LOG.info("Call: protocol=" + protocol + ", method=" + methodName);ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,clientVersion);BlockingService service = (BlockingService) protocolImpl.protocolImpl;MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);if (methodDescriptor == null) {String msg = "Unknown method " + methodName + " called on " + protocol+ " protocol.";LOG.warn(msg);throw new RpcNoSuchMethodException(msg);}Message prototype = service.getRequestPrototype(methodDescriptor);Message param = prototype.newBuilderForType().mergeFrom(request.theRequestRead).build();Message result;long startTime = Time.now();int qTime = (int) (startTime - receiveTime);Exception exception = null;try {server.rpcDetailedMetrics.init(protocolImpl.protocolClass);result = service.callBlockingMethod(methodDescriptor, null, param);} catch (ServiceException e) {exception = (Exception) e.getCause();throw (Exception) e.getCause();} catch (Exception e) {exception = e;throw e;} finally {int processingTime = (int) (Time.now() - startTime);if (LOG.isDebugEnabled()) {String msg = "Served: " + methodName + " queueTime= " + qTime +" procesingTime= " + processingTime;if (exception != null) {msg += " exception= " + exception.getClass().getSimpleName();}LOG.debug(msg);}String detailedMetricsName = (exception == null) ?methodName :exception.getClass().getSimpleName();server.rpcMetrics.addRpcQueueTime(qTime);server.rpcMetrics.addRpcProcessingTime(processingTime);server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,processingTime);if (server.isLogSlowRPC()) {server.logSlowRpcCalls(methodName, processingTime);}}return new RpcResponseWrapper(result);}}}

2、客户端的RpcInvoker

private static class Invoker implements RpcInvocationHandler {private final Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();private boolean isClosed = false;private final Client.ConnectionId remoteId;private final Client client;private final long clientProtocolVersion;private final String protocolName;private AtomicBoolean fallbackToSimpleAuth;private Invoker(Class<?> protocol, InetSocketAddress addr,UserGroupInformation ticket, Configuration conf, SocketFactory factory,int rpcTimeout, RetryPolicy connectionRetryPolicy,AtomicBoolean fallbackToSimpleAuth) throws IOException {this(protocol, Client.ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),conf, factory);this.fallbackToSimpleAuth = fallbackToSimpleAuth;}/*** This constructor takes a connectionId, instead of creating a new one.*/private Invoker(Class<?> protocol, Client.ConnectionId connId,Configuration conf, SocketFactory factory) {this.remoteId = connId;this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);this.protocolName = RPC.getProtocolName(protocol);this.clientProtocolVersion = RPC.getProtocolVersion(protocol);}private RequestHeaderProto constructRpcRequestHeader(Method method) {RequestHeaderProto.Builder builder = RequestHeaderProto.newBuilder();builder.setMethodName(method.getName());// For protobuf, {@code protocol} used when creating client side proxy is// the interface extending BlockingInterface, which has the annotations // such as ProtocolName etc.//// Using Method.getDeclaringClass(), as in WritableEngine to get at// the protocol interface will return BlockingInterface, from where // the annotation ProtocolName and Version cannot be// obtained.//// Hence we simply use the protocol class used to create the proxy.// For PB this may limit the use of mixins on client side.builder.setDeclaringClassProtocolName(protocolName);builder.setClientProtocolVersion(clientProtocolVersion);return builder.build();}/*** This is the client side invoker of RPC method. It only throws* ServiceException, since the invocation proxy expects only* ServiceException to be thrown by the method in case protobuf service.* * ServiceException has the following causes:* <ol>* <li>Exceptions encountered on the client side in this method are * set as cause in ServiceException as is.</li>* <li>Exceptions from the server are wrapped in RemoteException and are* set as cause in ServiceException</li>* </ol>* * Note that the client calling protobuf RPC methods, must handle* ServiceException by getting the cause from the ServiceException. If the* cause is RemoteException, then unwrap it to get the exception thrown by* the server.*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args)throws ServiceException {long startTime = 0;if (LOG.isDebugEnabled()) {startTime = Time.now();}if (args.length != 2) { // RpcController + Messagethrow new ServiceException("Too many parameters for request. Method: ["+ method.getName() + "]" + ", Expected: 2, Actual: "+ args.length);}if (args[1] == null) {throw new ServiceException("null param while calling Method: ["+ method.getName() + "]");}TraceScope traceScope = null;// if Tracing is on then start a new span for this rpc.// guard it in the if statement to make sure there isn't// any extra string manipulation.if (Trace.isTracing()) {traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method));}RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);if (LOG.isTraceEnabled()) {LOG.trace(Thread.currentThread().getId() + ": Call -> " +remoteId + ": " + method.getName() +" {" + TextFormat.shortDebugString((Message) args[1]) + "}");}Message theRequest = (Message) args[1];final RpcResponseWrapper val;try {val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,fallbackToSimpleAuth);} catch (Throwable e) {if (LOG.isTraceEnabled()) {LOG.trace(Thread.currentThread().getId() + ": Exception <- " +remoteId + ": " + method.getName() +" {" + e + "}");}if (Trace.isTracing()) {traceScope.getSpan().addTimelineAnnotation("Call got exception: " + e.getMessage());}throw new ServiceException(e);} finally {if (traceScope != null) traceScope.close();}if (LOG.isDebugEnabled()) {long callTime = Time.now() - startTime;LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");}Message prototype = null;try {prototype = getReturnProtoType(method);} catch (Exception e) {throw new ServiceException(e);}Message returnMessage;try {returnMessage = prototype.newBuilderForType().mergeFrom(val.theResponseRead).build();if (LOG.isTraceEnabled()) {LOG.trace(Thread.currentThread().getId() + ": Response <- " +remoteId + ": " + method.getName() +" {" + TextFormat.shortDebugString(returnMessage) + "}");}} catch (Throwable e) {throw new ServiceException(e);}return returnMessage;}@Overridepublic void close() throws IOException {if (!isClosed) {isClosed = true;CLIENTS.stopClient(client);}}private Message getReturnProtoType(Method method) throws Exception {if (returnTypes.containsKey(method.getName())) {return returnTypes.get(method.getName());}Class<?> returnType = method.getReturnType();Method newInstMethod = returnType.getMethod("getDefaultInstance");newInstMethod.setAccessible(true);Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null);returnTypes.put(method.getName(), prototype);return prototype;}@Override //RpcInvocationHandlerpublic ConnectionId getConnectionId() {return remoteId;}}

3、请求编码RpcRequestWrapper

RequestHeaderProto requestHeader是请求编码的消息头;

Message theRequest是请求编码的消息体;

private static class RpcRequestWrapperextends RpcMessageWithHeader<RequestHeaderProto> {@SuppressWarnings("unused")public RpcRequestWrapper() {}public RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) {super(requestHeader, theRequest);}@OverrideRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {return RequestHeaderProto.parseFrom(bytes);}@Overridepublic String toString() {return requestHeader.getDeclaringClassProtocolName() + "." +requestHeader.getMethodName();}}//父类
/*** Wrapper for Protocol Buffer Requests* * Note while this wrapper is writable, the request on the wire is in* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} * use type Writable as a wrapper to work across multiple RpcEngine kinds.*/private static abstract class RpcMessageWithHeader<T extends GeneratedMessage>implements RpcWrapper {T requestHeader;Message theRequest; // for clientSide, the request is herebyte[] theRequestRead; // for server side, the request is herepublic RpcMessageWithHeader() {}public RpcMessageWithHeader(T requestHeader, Message theRequest) {this.requestHeader = requestHeader;this.theRequest = theRequest;}@Overridepublic void write(DataOutput out) throws IOException {OutputStream os = DataOutputOutputStream.constructOutputStream(out);((Message)requestHeader).writeDelimitedTo(os);theRequest.writeDelimitedTo(os);}@Overridepublic void readFields(DataInput in) throws IOException {requestHeader = parseHeaderFrom(readVarintBytes(in));theRequestRead = readMessageRequest(in);}abstract T parseHeaderFrom(byte[] bytes) throws IOException;byte[] readMessageRequest(DataInput in) throws IOException {return readVarintBytes(in);}private static byte[] readVarintBytes(DataInput in) throws IOException {final int length = ProtoUtil.readRawVarint32(in);final byte[] bytes = new byte[length];in.readFully(bytes);return bytes;}public T getMessageHeader() {return requestHeader;}public byte[] getMessageBytes() {return theRequestRead;}@Overridepublic int getLength() {int headerLen = requestHeader.getSerializedSize();int reqLen;if (theRequest != null) {reqLen = theRequest.getSerializedSize();} else if (theRequestRead != null ) {reqLen = theRequestRead.length;} else {throw new IllegalArgumentException("getLength on uninitialized RpcWrapper");      }return CodedOutputStream.computeRawVarint32Size(headerLen) +  headerLen+ CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;}}

4、返回编码RpcResponseWrapper

  /***  Wrapper for Protocol Buffer Responses* * Note while this wrapper is writable, the request on the wire is in* Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} * use type Writable as a wrapper to work across multiple RpcEngine kinds.*/@InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed public static class RpcResponseWrapper implements RpcWrapper {Message theResponse; // for senderSide, the response is herebyte[] theResponseRead; // for receiver side, the response is herepublic RpcResponseWrapper() {}public RpcResponseWrapper(Message message) {this.theResponse = message;}@Overridepublic void write(DataOutput out) throws IOException {OutputStream os = DataOutputOutputStream.constructOutputStream(out);theResponse.writeDelimitedTo(os);   }@Overridepublic void readFields(DataInput in) throws IOException {int length = ProtoUtil.readRawVarint32(in);theResponseRead = new byte[length];in.readFully(theResponseRead);}@Overridepublic int getLength() {int resLen;if (theResponse != null) {resLen = theResponse.getSerializedSize();} else if (theResponseRead != null ) {resLen = theResponseRead.length;} else {throw new IllegalArgumentException("getLength on uninitialized RpcWrapper");      }return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;}}

5、服务端的网络通信

服务端的网络通信由ProtoBufRpcEngine的内部类Server的父类完成。这是模板方法设计模式的应用,在父类中定义通信的主要流程,而把编解码这些个性化的步骤延迟到子类中去实现。

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.ipc;import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcKindProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslAuth;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcSaslProto.SaslState;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;/** An abstract IPC service.  IPC calls take a single {@link Writable} as a* parameter, and return a {@link Writable} as their value.  A service runs on* a port and is defined by a parameter class and a value class.* * @see Client*/
public abstract class Server {private final boolean authorize;private List<AuthMethod> enabledAuthMethods;private RpcSaslProto negotiateResponse;private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();/*** Add exception classes for which server won't log stack traces.** @param exceptionClass exception classes*/public void addTerseExceptions(Class<?>... exceptionClass) {exceptionsHandler.addTerseLoggingExceptions(exceptionClass);}/*** Add exception classes which server won't log at all.** @param exceptionClass exception classes*/public void addSuppressedLoggingExceptions(Class<?>... exceptionClass) {exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass);}/*** ExceptionsHandler manages Exception groups for special handling* e.g., terse exception group for concise logging messages*/static class ExceptionsHandler {private volatile Set<String> terseExceptions = new HashSet<>();private volatile Set<String> suppressedExceptions = new HashSet<>();/*** Add exception classes for which server won't log stack traces.* Optimized for infrequent invocation.* @param exceptionClass exception classes */void addTerseLoggingExceptions(Class<?>... exceptionClass) {// Thread-safe replacement of terseExceptions.terseExceptions = addExceptions(terseExceptions, exceptionClass);}/*** Add exception classes which server won't log at all.* Optimized for infrequent invocation.* @param exceptionClass exception classes*/void addSuppressedLoggingExceptions(Class<?>... exceptionClass) {// Thread-safe replacement of suppressedExceptions.suppressedExceptions = addExceptions(suppressedExceptions, exceptionClass);}boolean isTerseLog(Class<?> t) {return terseExceptions.contains(t.toString());}boolean isSuppressedLog(Class<?> t) {return suppressedExceptions.contains(t.toString());}/*** Return a new set containing all the exceptions in exceptionsSet* and exceptionClass.* @return*/private static Set<String> addExceptions(final Set<String> exceptionsSet, Class<?>[] exceptionClass) {// Make a copy of the exceptionSet for performing modificationfinal HashSet<String> newSet = new HashSet<>(exceptionsSet);// Add all class names into the HashSetfor (Class<?> name : exceptionClass) {newSet.add(name.toString());}return Collections.unmodifiableSet(newSet);}}/*** If the user accidentally sends an HTTP GET to an IPC port, we detect this* and send back a nicer response.*/private static final ByteBuffer HTTP_GET_BYTES = ByteBuffer.wrap("GET ".getBytes(Charsets.UTF_8));/*** An HTTP response to send back if we detect an HTTP request to our IPC* port.*/static final String RECEIVED_HTTP_REQ_RESPONSE ="HTTP/1.1 404 Not Found\r\n" +"Content-type: text/plain\r\n\r\n" +"It looks like you are making an HTTP request to a Hadoop IPC port. " +"This is not the correct port for the web interface on this daemon.\r\n";/*** Initial and max size of response buffer*/static int INITIAL_RESP_BUF_SIZE = 10240;static class RpcKindMapValue {final Class<? extends Writable> rpcRequestWrapperClass;final RpcInvoker rpcInvoker;RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,RpcInvoker rpcInvoker) {this.rpcInvoker = rpcInvoker;this.rpcRequestWrapperClass = rpcRequestWrapperClass;}   }static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = newHashMap<RPC.RpcKind, RpcKindMapValue>(4);/*** Register a RPC kind and the class to deserialize the rpc request.* * Called by static initializers of rpcKind Engines* @param rpcKind* @param rpcRequestWrapperClass - this class is used to deserialze the*  the rpc request.*  @param rpcInvoker - use to process the calls on SS.*/public static void registerProtocolEngine(RPC.RpcKind rpcKind, Class<? extends Writable> rpcRequestWrapperClass,RpcInvoker rpcInvoker) {RpcKindMapValue  old = rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));if (old != null) {rpcKindMap.put(rpcKind, old);throw new IllegalArgumentException("ReRegistration of rpcKind: " +rpcKind);      }LOG.debug("rpcKind=" + rpcKind + ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + ", rpcInvoker=" + rpcInvoker);}public Class<? extends Writable> getRpcRequestWrapper(RpcKindProto rpcKind) {if (rpcRequestClass != null)return rpcRequestClass;RpcKindMapValue val = rpcKindMap.get(ProtoUtil.convert(rpcKind));return (val == null) ? null : val.rpcRequestWrapperClass; }public static RpcInvoker  getRpcInvoker(RPC.RpcKind rpcKind) {RpcKindMapValue val = rpcKindMap.get(rpcKind);return (val == null) ? null : val.rpcInvoker; }public static final Log LOG = LogFactory.getLog(Server.class);public static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."+Server.class.getName());private static final String AUTH_FAILED_FOR = "Auth failed for ";private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();static Class<?> getProtocolClass(String protocolName, Configuration conf) throws ClassNotFoundException {Class<?> protocol = PROTOCOL_CACHE.get(protocolName);if (protocol == null) {protocol = conf.getClassByName(protocolName);PROTOCOL_CACHE.put(protocolName, protocol);}return protocol;}/** Returns the server instance called under or null.  May be called under* {@link #call(Writable, long)} implementations, and under {@link Writable}* methods of paramters and return values.  Permits applications to access* the server context.*/public static Server get() {return SERVER.get();}/** This is set to Call object before Handler invokes an RPC and reset* after the call returns.*/private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();/** Get the current call */@VisibleForTestingpublic static ThreadLocal<Call> getCurCall() {return CurCall;}/*** Returns the currently active RPC call's sequential ID number.  A negative* call ID indicates an invalid value, such as if there is no currently active* RPC call.* * @return int sequential ID number of currently active RPC call*/public static int getCallId() {Call call = CurCall.get();return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;}/*** @return The current active RPC call's retry count. -1 indicates the retry*         cache is not supported in the client side.*/public static int getCallRetryCount() {Call call = CurCall.get();return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;}/** Returns the remote side ip address when invoked inside an RPC *  Returns null incase of an error.*/public static InetAddress getRemoteIp() {Call call = CurCall.get();return (call != null && call.connection != null) ? call.connection.getHostInetAddress() : null;}/*** Returns the clientId from the current RPC request*/public static byte[] getClientId() {Call call = CurCall.get();return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;}/** Returns remote address as a string when invoked inside an RPC.*  Returns null in case of an error.*/public static String getRemoteAddress() {InetAddress addr = getRemoteIp();return (addr == null) ? null : addr.getHostAddress();}/** Returns the RPC remote user when invoked inside an RPC.  Note this*  may be different than the current user if called within another doAs*  @return connection's UGI or null if not an RPC*/public static UserGroupInformation getRemoteUser() {Call call = CurCall.get();return (call != null && call.connection != null) ? call.connection.user: null;}/** Return true if the invocation was through an RPC.*/public static boolean isRpcInvocation() {return CurCall.get() != null;}private String bindAddress; private int port;                               // port we listen onprivate int handlerCount;                       // number of handler threadsprivate int readThreads;                        // number of read threadsprivate int readerPendingConnectionQueue;         // number of connections to queue per read threadprivate Class<? extends Writable> rpcRequestClass;   // class used for deserializing the rpc requestfinal protected RpcMetrics rpcMetrics;final protected RpcDetailedMetrics rpcDetailedMetrics;private Configuration conf;private String portRangeConfig = null;private SecretManager<TokenIdentifier> secretManager;private SaslPropertiesResolver saslPropsResolver;private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();private int maxQueueSize;private final int maxRespSize;private int socketSendBufferSize;private final int maxDataLength;private final boolean tcpNoDelay; // if T then disable Nagle's Algorithmvolatile private boolean running = true;         // true while server runsprivate CallQueueManager<Call> callQueue;// maintains the set of client connections and handles idle timeoutsprivate ConnectionManager connectionManager;private Listener listener = null;private Responder responder = null;private Handler[] handlers = null;private boolean logSlowRPC = false;/*** Checks if LogSlowRPC is set true.* @return*/protected boolean isLogSlowRPC() {return logSlowRPC;}/*** Sets slow RPC flag.* @param logSlowRPCFlag*/@VisibleForTestingprotected void setLogSlowRPC(boolean logSlowRPCFlag) {this.logSlowRPC = logSlowRPCFlag;}/*** Logs a Slow RPC Request.** @param methodName - RPC Request method name* @param processingTime - Processing Time.** if this request took too much time relative to other requests* we consider that as a slow RPC. 3 is a magic number that comes* from 3 sigma deviation. A very simple explanation can be found* by searching for 68–95–99.7 rule. We flag an RPC as slow RPC* if and only if it falls above 99.7% of requests. We start this logic* only once we have enough sample size.*/void logSlowRpcCalls(String methodName, int processingTime) {final int deviation = 3;// 1024 for minSampleSize just a guess -- not a number computed based on// sample size analysis. It is chosen with the hope that this// number is high enough to avoid spurious logging, yet useful// in practice.final int minSampleSize = 1024;final double threeSigma = rpcMetrics.getProcessingMean() +(rpcMetrics.getProcessingStdDev() * deviation);if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&(processingTime > threeSigma)) {if(LOG.isWarnEnabled()) {String client = CurCall.get().connection.toString();LOG.warn("Slow RPC : " + methodName + " took " + processingTime +" milliseconds to process from client " + client);}rpcMetrics.incrSlowRpc();}}/*** A convenience method to bind to a given address and report * better exceptions if the address is not a valid host.* @param socket the socket to bind* @param address the address to bind to* @param backlog the number of connections allowed in the queue* @throws BindException if the address can't be bound* @throws UnknownHostException if the address isn't a valid host name* @throws IOException other random errors from bind*/public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) throws IOException {bind(socket, address, backlog, null, null);}public static void bind(ServerSocket socket, InetSocketAddress address, int backlog, Configuration conf, String rangeConf) throws IOException {try {IntegerRanges range = null;if (rangeConf != null) {range = conf.getRange(rangeConf, "");}if (range == null || range.isEmpty() || (address.getPort() != 0)) {socket.bind(address, backlog);} else {for (Integer port : range) {if (socket.isBound()) break;try {InetSocketAddress temp = new InetSocketAddress(address.getAddress(),port);socket.bind(temp, backlog);} catch(BindException e) {//Ignored}}if (!socket.isBound()) {throw new BindException("Could not find a free port in "+range);}}} catch (SocketException e) {throw NetUtils.wrapException(null,0,address.getHostName(),address.getPort(), e);}}/*** Returns a handle to the rpcMetrics (required in tests)* @return rpc metrics*/@VisibleForTestingpublic RpcMetrics getRpcMetrics() {return rpcMetrics;}@VisibleForTestingpublic RpcDetailedMetrics getRpcDetailedMetrics() {return rpcDetailedMetrics;}@VisibleForTestingIterable<? extends Thread> getHandlers() {return Arrays.asList(handlers);}@VisibleForTestingConnection[] getConnections() {return connectionManager.toArray();}/*** Refresh the service authorization ACL for the service handled by this server.*/public void refreshServiceAcl(Configuration conf, PolicyProvider provider) {serviceAuthorizationManager.refresh(conf, provider);}/*** Refresh the service authorization ACL for the service handled by this server* using the specified Configuration.*/@Privatepublic void refreshServiceAclWithLoadedConfiguration(Configuration conf,PolicyProvider provider) {serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider);}/*** Returns a handle to the serviceAuthorizationManager (required in tests)* @return instance of ServiceAuthorizationManager for this server*/@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})public ServiceAuthorizationManager getServiceAuthorizationManager() {return serviceAuthorizationManager;}static Class<? extends BlockingQueue<Call>> getQueueClass(String prefix, Configuration conf) {String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);return CallQueueManager.convertQueueClass(queueClass, Call.class);}private String getQueueClassPrefix() {return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;}/** Refresh the call queue*/public synchronized void refreshCallQueue(Configuration conf) {// Create the next queueString prefix = getQueueClassPrefix();callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);}/*** Get from config if client backoff is enabled on that port.*/static boolean getClientBackoffEnable(String prefix, Configuration conf) {String name = prefix + "." +CommonConfigurationKeys.IPC_BACKOFF_ENABLE;return conf.getBoolean(name,CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);}/** A call queued for handling. */public static class Call implements Schedulable {private final int callId;             // the client's call idprivate final int retryCount;        // the retry count of the callprivate final Writable rpcRequest;    // Serialized Rpc request from clientprivate final Connection connection;  // connection to clientprivate long timestamp;               // time received when response is null// time served when response is not nullprivate ByteBuffer rpcResponse;       // the response for this callprivate final RPC.RpcKind rpcKind;private final byte[] clientId;private final Span traceSpan; // the tracing span on the server sideprivate final CallerContext callerContext; // the call contextpublic Call(int id, int retryCount, Writable param, Connection connection) {this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,RpcConstants.DUMMY_CLIENT_ID);}public Call(int id, int retryCount, Writable param, Connection connection,RPC.RpcKind kind, byte[] clientId) {this(id, retryCount, param, connection, kind, clientId, null, null);}public Call(int id, int retryCount, Writable param, Connection connection,RPC.RpcKind kind, byte[] clientId, Span span,CallerContext callerContext) {this.callId = id;this.retryCount = retryCount;this.rpcRequest = param;this.connection = connection;this.timestamp = Time.now();this.rpcResponse = null;this.rpcKind = kind;this.clientId = clientId;this.traceSpan = span;this.callerContext = callerContext;}@Overridepublic String toString() {return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"+ retryCount;}public void setResponse(ByteBuffer response) {this.rpcResponse = response;}// For Schedulable@Overridepublic UserGroupInformation getUserGroupInformation() {return connection.user;}    }/** Listens on the socket. Creates jobs for the handler threads*/private class Listener extends Thread {private ServerSocketChannel acceptChannel = null; //the accept channelprivate Selector selector = null; //the selector that we use for the serverprivate Reader[] readers = null;private int currentReader = 0;private InetSocketAddress address; //the address we bind atprivate int backlogLength = conf.getInt(CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);public Listener() throws IOException {address = new InetSocketAddress(bindAddress, port);// Create a new server socket and set to non blocking modeacceptChannel = ServerSocketChannel.open();acceptChannel.configureBlocking(false);// Bind the server socket to the local host and portbind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port// create a selector;selector= Selector.open();readers = new Reader[readThreads];for (int i = 0; i < readThreads; i++) {Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port);readers[i] = reader;reader.start();}// Register accepts on the server socket with the selector.acceptChannel.register(selector, SelectionKey.OP_ACCEPT);this.setName("IPC Server listener on " + port);this.setDaemon(true);}private class Reader extends Thread {final private BlockingQueue<Connection> pendingConnections;private final Selector readSelector;Reader(String name) throws IOException {super(name);this.pendingConnections =new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);this.readSelector = Selector.open();}@Overridepublic void run() {LOG.info("Starting " + Thread.currentThread().getName());try {doRunLoop();} finally {try {readSelector.close();} catch (IOException ioe) {LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);}}}private synchronized void doRunLoop() {while (running) {SelectionKey key = null;try {// consume as many connections as currently queued to avoid// unbridled acceptance of connections that starves the selectint size = pendingConnections.size();for (int i=size; i>0; i--) {Connection conn = pendingConnections.take();conn.channel.register(readSelector, SelectionKey.OP_READ, conn);}readSelector.select();Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();while (iter.hasNext()) {key = iter.next();iter.remove();if (key.isValid()) {if (key.isReadable()) {doRead(key);}}key = null;}} catch (InterruptedException e) {if (running) {                      // unexpected -- log itLOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);}} catch (IOException ex) {LOG.error("Error in Reader", ex);}}}/*** Updating the readSelector while it's being used is not thread-safe,* so the connection must be queued.  The reader will drain the queue* and update its readSelector before performing the next select*/public void addConnection(Connection conn) throws InterruptedException {pendingConnections.put(conn);readSelector.wakeup();}void shutdown() {assert !running;readSelector.wakeup();try {super.interrupt();super.join();} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}}@Overridepublic void run() {LOG.info(Thread.currentThread().getName() + ": starting");SERVER.set(Server.this);connectionManager.startIdleScan();while (running) {SelectionKey key = null;try {getSelector().select();Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isValid()) {if (key.isAcceptable())doAccept(key);}} catch (IOException e) {}key = null;}} catch (OutOfMemoryError e) {// we can run out of memory if we have too many threads// log the event and sleep for a minute and give // some thread(s) a chance to finishLOG.warn("Out of Memory in server select", e);closeCurrentConnection(key, e);connectionManager.closeIdle(true);try { Thread.sleep(60000); } catch (Exception ie) {}} catch (Exception e) {closeCurrentConnection(key, e);}}LOG.info("Stopping " + Thread.currentThread().getName());synchronized (this) {try {acceptChannel.close();selector.close();} catch (IOException e) { }selector= null;acceptChannel= null;// close all connectionsconnectionManager.stopIdleScan();connectionManager.closeAll();}}private void closeCurrentConnection(SelectionKey key, Throwable e) {if (key != null) {Connection c = (Connection)key.attachment();if (c != null) {closeConnection(c);c = null;}}}InetSocketAddress getAddress() {return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();}void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel channel;while ((channel = server.accept()) != null) {channel.configureBlocking(false);channel.socket().setTcpNoDelay(tcpNoDelay);channel.socket().setKeepAlive(true);Reader reader = getReader();Connection c = connectionManager.register(channel);// If the connectionManager can't take it, close the connection.if (c == null) {if (channel.isOpen()) {IOUtils.cleanup(null, channel);}continue;}key.attach(c);  // so closeCurrentConnection can get the objectreader.addConnection(c);}}void doRead(SelectionKey key) throws InterruptedException {int count;Connection c = (Connection)key.attachment();if (c == null) {return;  }c.setLastContact(Time.now());try {count = c.readAndProcess();} catch (InterruptedException ieo) {LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);throw ieo;} catch (Exception e) {// Do not log WrappedRpcServerExceptionSuppressed.if (!(e instanceof WrappedRpcServerExceptionSuppressed)) {// A WrappedRpcServerException is an exception that has been sent// to the client, so the stacktrace is unnecessary; any other// exceptions are unexpected internal server errors and thus the// stacktrace should be logged.LOG.info(Thread.currentThread().getName() +": readAndProcess from client " + c.getHostAddress() +" threw exception [" + e + "]",(e instanceof WrappedRpcServerException) ? null : e);}count = -1; //so that the (count < 0) block is executed}if (count < 0) {closeConnection(c);c = null;}else {c.setLastContact(Time.now());}}   synchronized void doStop() {if (selector != null) {selector.wakeup();Thread.yield();}if (acceptChannel != null) {try {acceptChannel.socket().close();} catch (IOException e) {LOG.info(Thread.currentThread().getName() + ":Exception in closing listener socket. " + e);}}for (Reader r : readers) {r.shutdown();}}synchronized Selector getSelector() { return selector; }// The method that will return the next reader to work with// Simplistic implementation of round robin for nowReader getReader() {currentReader = (currentReader + 1) % readers.length;return readers[currentReader];}}// Sends responses of RPC back to clients.private class Responder extends Thread {private final Selector writeSelector;private int pending;         // connections waiting to registerfinal static int PURGE_INTERVAL = 900000; // 15minsResponder() throws IOException {this.setName("IPC Server Responder");this.setDaemon(true);writeSelector = Selector.open(); // create a selectorpending = 0;}@Overridepublic void run() {LOG.info(Thread.currentThread().getName() + ": starting");SERVER.set(Server.this);try {doRunLoop();} finally {LOG.info("Stopping " + Thread.currentThread().getName());try {writeSelector.close();} catch (IOException ioe) {LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);}}}private void doRunLoop() {long lastPurgeTime = 0;   // last check for old calls.while (running) {try {waitPending();     // If a channel is being registered, wait.writeSelector.select(PURGE_INTERVAL);Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();try {if (key.isValid() && key.isWritable()) {doAsyncWrite(key);}} catch (IOException e) {LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);}}long now = Time.now();if (now < lastPurgeTime + PURGE_INTERVAL) {continue;}lastPurgeTime = now;//// If there were some calls that have not been sent out for a// long time, discard them.//if(LOG.isDebugEnabled()) {LOG.debug("Checking for old call responses.");}ArrayList<Call> calls;// get the list of channels from list of keys.synchronized (writeSelector.keys()) {calls = new ArrayList<Call>(writeSelector.keys().size());iter = writeSelector.keys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();Call call = (Call)key.attachment();if (call != null && key.channel() == call.connection.channel) { calls.add(call);}}}for(Call call : calls) {doPurge(call, now);}} catch (OutOfMemoryError e) {//// we can run out of memory if we have too many threads// log the event and sleep for a minute and give// some thread(s) a chance to finish//LOG.warn("Out of Memory in server select", e);try { Thread.sleep(60000); } catch (Exception ie) {}} catch (Exception e) {LOG.warn("Exception in Responder", e);}}}private void doAsyncWrite(SelectionKey key) throws IOException {Call call = (Call)key.attachment();if (call == null) {return;}if (key.channel() != call.connection.channel) {throw new IOException("doAsyncWrite: bad channel");}synchronized(call.connection.responseQueue) {if (processResponse(call.connection.responseQueue, false)) {try {key.interestOps(0);} catch (CancelledKeyException e) {/* The Listener/reader might have closed the socket.* We don't explicitly cancel the key, so not sure if this will* ever fire.* This warning could be removed.*/LOG.warn("Exception while changing ops : " + e);}}}}//// Remove calls that have been pending in the responseQueue // for a long time.//private void doPurge(Call call, long now) {LinkedList<Call> responseQueue = call.connection.responseQueue;synchronized (responseQueue) {Iterator<Call> iter = responseQueue.listIterator(0);while (iter.hasNext()) {call = iter.next();if (now > call.timestamp + PURGE_INTERVAL) {closeConnection(call.connection);break;}}}}// Processes one response. Returns true if there are no more pending// data for this channel.//private boolean processResponse(LinkedList<Call> responseQueue,boolean inHandler) throws IOException {boolean error = true;boolean done = false;       // there is more data for this channel.int numElements = 0;Call call = null;try {synchronized (responseQueue) {//// If there are no items for this channel, then we are done//numElements = responseQueue.size();if (numElements == 0) {error = false;return true;              // no more data for this channel.}//// Extract the first call//call = responseQueue.removeFirst();SocketChannel channel = call.connection.channel;if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": responding to " + call);}//// Send as much data as we can in the non-blocking fashion//int numBytes = channelWrite(channel, call.rpcResponse);if (numBytes < 0) {return true;}if (!call.rpcResponse.hasRemaining()) {//Clear out the response buffer so it can be collectedcall.rpcResponse = null;call.connection.decRpcCount();if (numElements == 1) {    // last call fully processes.done = true;             // no more data for this channel.} else {done = false;            // more calls pending to be sent.}if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": responding to " + call+ " Wrote " + numBytes + " bytes.");}} else {//// If we were unable to write the entire response out, then // insert in Selector queue. //call.connection.responseQueue.addFirst(call);if (inHandler) {// set the serve time when the response has to be sent latercall.timestamp = Time.now();incPending();try {// Wakeup the thread blocked on select, only then can the call // to channel.register() complete.writeSelector.wakeup();channel.register(writeSelector, SelectionKey.OP_WRITE, call);} catch (ClosedChannelException e) {//Its ok. channel might be closed else where.done = true;} finally {decPending();}}if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": responding to " + call+ " Wrote partial " + numBytes + " bytes.");}}error = false;              // everything went off well}} finally {if (error && call != null) {LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");done = true;               // error. no more data for this channel.closeConnection(call.connection);}}return done;}//// Enqueue a response from the application.//void doRespond(Call call) throws IOException {synchronized (call.connection.responseQueue) {call.connection.responseQueue.addLast(call);if (call.connection.responseQueue.size() == 1) {processResponse(call.connection.responseQueue, true);}}}private synchronized void incPending() {   // call waiting to be enqueued.pending++;}private synchronized void decPending() { // call done enqueueing.pending--;notify();}private synchronized void waitPending() throws InterruptedException {while (pending > 0) {wait();}}}@InterfaceAudience.Privatepublic static enum AuthProtocol {NONE(0),SASL(-33);public final int callId;AuthProtocol(int callId) {this.callId = callId;}static AuthProtocol valueOf(int callId) {for (AuthProtocol authType : AuthProtocol.values()) {if (authType.callId == callId) {return authType;}}return null;}};/*** Wrapper for RPC IOExceptions to be returned to the client.  Used to* let exceptions bubble up to top of processOneRpc where the correct* callId can be associated with the response.  Also used to prevent* unnecessary stack trace logging if it's not an internal server error. */@SuppressWarnings("serial")private static class WrappedRpcServerException extends RpcServerException {private final RpcErrorCodeProto errCode;public WrappedRpcServerException(RpcErrorCodeProto errCode, IOException ioe) {super(ioe.toString(), ioe);this.errCode = errCode;}public WrappedRpcServerException(RpcErrorCodeProto errCode, String message) {this(errCode, new RpcServerException(message));}@Overridepublic RpcErrorCodeProto getRpcErrorCodeProto() {return errCode;}@Overridepublic String toString() {return getCause().toString();}}/*** A WrappedRpcServerException that is suppressed altogether* for the purposes of logging.*/private static class WrappedRpcServerExceptionSuppressedextends WrappedRpcServerException {public WrappedRpcServerExceptionSuppressed(RpcErrorCodeProto errCode, IOException ioe) {super(errCode, ioe);}}/** Reads calls from a connection and queues them for handling. */public class Connection {private boolean connectionHeaderRead = false; // connection  header is read?private boolean connectionContextRead = false; //if connection context that//follows connection header is readprivate SocketChannel channel;private ByteBuffer data;private ByteBuffer dataLengthBuffer;private LinkedList<Call> responseQueue;// number of outstanding rpcsprivate AtomicInteger rpcCount = new AtomicInteger();private long lastContact;private int dataLength;private Socket socket;// Cache the remote host & port info so that even if the socket is // disconnected, we can say where it used to connect to.private String hostAddress;private int remotePort;private InetAddress addr;IpcConnectionContextProto connectionContext;String protocolName;SaslServer saslServer;private AuthMethod authMethod;private AuthProtocol authProtocol;private boolean saslContextEstablished;private ByteBuffer connectionHeaderBuf = null;private ByteBuffer unwrappedData;private ByteBuffer unwrappedDataLengthBuffer;private int serviceClass;UserGroupInformation user = null;public UserGroupInformation attemptingUser = null; // user name before auth// Fake 'call' for failed authorization responseprivate final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,RpcConstants.INVALID_RETRY_COUNT, null, this);private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();private final Call saslCall = new Call(AuthProtocol.SASL.callId,RpcConstants.INVALID_RETRY_COUNT, null, this);private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();private boolean sentNegotiate = false;private boolean useWrap = false;public Connection(SocketChannel channel, long lastContact) {this.channel = channel;this.lastContact = lastContact;this.data = null;this.dataLengthBuffer = ByteBuffer.allocate(4);this.unwrappedData = null;this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);this.socket = channel.socket();this.addr = socket.getInetAddress();if (addr == null) {this.hostAddress = "*Unknown*";} else {this.hostAddress = addr.getHostAddress();}this.remotePort = socket.getPort();this.responseQueue = new LinkedList<Call>();if (socketSendBufferSize != 0) {try {socket.setSendBufferSize(socketSendBufferSize);} catch (IOException e) {LOG.warn("Connection: unable to set socket send buffer size to " +socketSendBufferSize);}}}   @Overridepublic String toString() {return getHostAddress() + ":" + remotePort; }public String getHostAddress() {return hostAddress;}public InetAddress getHostInetAddress() {return addr;}public void setLastContact(long lastContact) {this.lastContact = lastContact;}public long getLastContact() {return lastContact;}/* Return true if the connection has no outstanding rpc */private boolean isIdle() {return rpcCount.get() == 0;}/* Decrement the outstanding RPC count */private void decRpcCount() {rpcCount.decrementAndGet();}/* Increment the outstanding RPC count */private void incRpcCount() {rpcCount.incrementAndGet();}private UserGroupInformation getAuthorizedUgi(String authorizedId)throws InvalidToken, AccessControlException {if (authMethod == AuthMethod.TOKEN) {TokenIdentifier tokenId = SaslRpcServer.getIdentifier(authorizedId,secretManager);UserGroupInformation ugi = tokenId.getUser();if (ugi == null) {throw new AccessControlException("Can't retrieve username from tokenIdentifier.");}ugi.addTokenIdentifier(tokenId);return ugi;} else {return UserGroupInformation.createRemoteUser(authorizedId, authMethod);}}private void saslReadAndProcess(DataInputStream dis) throwsWrappedRpcServerException, IOException, InterruptedException {final RpcSaslProto saslMessage =decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis);switch (saslMessage.getState()) {case WRAP: {if (!saslContextEstablished || !useWrap) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,new SaslException("Server is not wrapping data"));}// loops over decoded data and calls processOneRpcunwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());break;}default:saslProcess(saslMessage);}}private Throwable getCauseForInvalidToken(IOException e) {Throwable cause = e;while (cause != null) {if (cause instanceof RetriableException) {return cause;} else if (cause instanceof StandbyException) {return cause;} else if (cause instanceof InvalidToken) {// FIXME: hadoop method signatures are restricting the SASL// callbacks to only returning InvalidToken, but some services// need to throw other exceptions (ex. NN + StandyException),// so for now we'll tunnel the real exceptions via an// InvalidToken's cause which normally is not set if (cause.getCause() != null) {cause = cause.getCause();}return cause;}cause = cause.getCause();}return e;}private void saslProcess(RpcSaslProto saslMessage)throws WrappedRpcServerException, IOException, InterruptedException {if (saslContextEstablished) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,new SaslException("Negotiation is already complete"));}RpcSaslProto saslResponse = null;try {try {saslResponse = processSaslMessage(saslMessage);} catch (IOException e) {rpcMetrics.incrAuthenticationFailures();// attempting user could be nullAUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"+ attemptingUser + " (" + e.getLocalizedMessage() + ")");throw (IOException) getCauseForInvalidToken(e);}if (saslServer != null && saslServer.isComplete()) {if (LOG.isDebugEnabled()) {LOG.debug("SASL server context established. Negotiated QoP is "+ saslServer.getNegotiatedProperty(Sasl.QOP));}user = getAuthorizedUgi(saslServer.getAuthorizationID());if (LOG.isDebugEnabled()) {LOG.debug("SASL server successfully authenticated client: " + user);}rpcMetrics.incrAuthenticationSuccesses();AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);saslContextEstablished = true;}} catch (WrappedRpcServerException wrse) { // don't re-wrapthrow wrse;} catch (IOException ioe) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);}// send back response if any, may throw IOExceptionif (saslResponse != null) {doSaslReply(saslResponse);}// do NOT enable wrapping until the last auth response is sentif (saslContextEstablished) {String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);// SASL wrapping is only used if the connection has a QOP, and// the value is not auth.  ex. auth-int & auth-privuseWrap = (qop != null && !"auth".equalsIgnoreCase(qop));        }}private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)throws IOException, InterruptedException {final RpcSaslProto saslResponse;final SaslState state = saslMessage.getState(); // required      switch (state) {case NEGOTIATE: {if (sentNegotiate) {throw new AccessControlException("Client already attempted negotiation");}saslResponse = buildSaslNegotiateResponse();// simple-only server negotiate response is success which client// interprets as switch to simpleif (saslResponse.getState() == SaslState.SUCCESS) {switchToSimple();}break;}case INITIATE: {if (saslMessage.getAuthsCount() != 1) {throw new SaslException("Client mechanism is malformed");}// verify the client requested an advertised authTypeSaslAuth clientSaslAuth = saslMessage.getAuths(0);if (!negotiateResponse.getAuthsList().contains(clientSaslAuth)) {if (sentNegotiate) {throw new AccessControlException(clientSaslAuth.getMethod() + " authentication is not enabled."+ "  Available:" + enabledAuthMethods);}saslResponse = buildSaslNegotiateResponse();break;}authMethod = AuthMethod.valueOf(clientSaslAuth.getMethod());// abort SASL for SIMPLE auth, server has already ensured that// SIMPLE is a legit option above.  we will send no responseif (authMethod == AuthMethod.SIMPLE) {switchToSimple();saslResponse = null;break;}// sasl server for tokens may already be instantiatedif (saslServer == null || authMethod != AuthMethod.TOKEN) {saslServer = createSaslServer(authMethod);}saslResponse = processSaslToken(saslMessage);break;}case RESPONSE: {saslResponse = processSaslToken(saslMessage);break;}default:throw new SaslException("Client sent unsupported state " + state);}return saslResponse;}private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)throws SaslException {if (!saslMessage.hasToken()) {throw new SaslException("Client did not send a token");}byte[] saslToken = saslMessage.getToken().toByteArray();if (LOG.isDebugEnabled()) {LOG.debug("Have read input token of size " + saslToken.length+ " for processing by saslServer.evaluateResponse()");}saslToken = saslServer.evaluateResponse(saslToken);return buildSaslResponse(saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,saslToken);}private void switchToSimple() {// disable SASL and blank out any SASL serverauthProtocol = AuthProtocol.NONE;saslServer = null;}private RpcSaslProto buildSaslResponse(SaslState state, byte[] replyToken) {if (LOG.isDebugEnabled()) {LOG.debug("Will send " + state + " token of size "+ ((replyToken != null) ? replyToken.length : null)+ " from saslServer.");}RpcSaslProto.Builder response = RpcSaslProto.newBuilder();response.setState(state);if (replyToken != null) {response.setToken(ByteString.copyFrom(replyToken));}return response.build();}private void doSaslReply(Message message) throws IOException {if (LOG.isDebugEnabled()) {LOG.debug("Sending sasl message "+message);}setupResponse(saslResponse, saslCall,RpcStatusProto.SUCCESS, null,new RpcResponseWrapper(message), null, null);responder.doRespond(saslCall);}private void doSaslReply(Exception ioe) throws IOException {setupResponse(authFailedResponse, authFailedCall,RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,null, ioe.getClass().getName(), ioe.getLocalizedMessage());responder.doRespond(authFailedCall);}private void disposeSasl() {if (saslServer != null) {try {saslServer.dispose();} catch (SaslException ignored) {}}}private void checkDataLength(int dataLength) throws IOException {if (dataLength < 0) {String error = "Unexpected data length " + dataLength +"!! from " + getHostAddress();LOG.warn(error);throw new IOException(error);} else if (dataLength > maxDataLength) {String error = "Requested data length " + dataLength +" is longer than maximum configured RPC length " + maxDataLength + ".  RPC came from " + getHostAddress();LOG.warn(error);throw new IOException(error);}}public int readAndProcess()throws WrappedRpcServerException, IOException, InterruptedException {while (true) {/* Read at most one RPC. If the header is not read completely yet* then iterate until we read first RPC or until there is no data left.*/    int count = -1;if (dataLengthBuffer.remaining() > 0) {count = channelRead(channel, dataLengthBuffer);       if (count < 0 || dataLengthBuffer.remaining() > 0) return count;}if (!connectionHeaderRead) {//Every connection is expected to send the header.if (connectionHeaderBuf == null) {connectionHeaderBuf = ByteBuffer.allocate(3);}count = channelRead(channel, connectionHeaderBuf);if (count < 0 || connectionHeaderBuf.remaining() > 0) {return count;}int version = connectionHeaderBuf.get(0);// TODO we should add handler for service class laterthis.setServiceClass(connectionHeaderBuf.get(1));dataLengthBuffer.flip();// Check if it looks like the user is hitting an IPC port// with an HTTP GET - this is a common error, so we can// send back a simple string indicating as much.if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {setupHttpRequestOnIpcPortResponse();return -1;}if (!RpcConstants.HEADER.equals(dataLengthBuffer)|| version != CURRENT_VERSION) {//Warning is ok since this is not supposed to happen.LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort +" got version " + version + " expected version " + CURRENT_VERSION);setupBadVersionResponse(version);return -1;}// this may switch us into SIMPLEauthProtocol = initializeAuthContext(connectionHeaderBuf.get(2));          dataLengthBuffer.clear();connectionHeaderBuf = null;connectionHeaderRead = true;continue;}if (data == null) {dataLengthBuffer.flip();dataLength = dataLengthBuffer.getInt();checkDataLength(dataLength);data = ByteBuffer.allocate(dataLength);}count = channelRead(channel, data);if (data.remaining() == 0) {dataLengthBuffer.clear();data.flip();boolean isHeaderRead = connectionContextRead;processOneRpc(data.array());data = null;if (!isHeaderRead) {continue;}} return count;}}private AuthProtocol initializeAuthContext(int authType)throws IOException {AuthProtocol authProtocol = AuthProtocol.valueOf(authType);if (authProtocol == null) {IOException ioe = new IpcException("Unknown auth protocol:" + authType);doSaslReply(ioe);throw ioe;        }boolean isSimpleEnabled = enabledAuthMethods.contains(AuthMethod.SIMPLE);switch (authProtocol) {case NONE: {// don't reply if client is simple and server is insecureif (!isSimpleEnabled) {IOException ioe = new AccessControlException("SIMPLE authentication is not enabled."+ "  Available:" + enabledAuthMethods);doSaslReply(ioe);throw ioe;}break;}default: {break;}}return authProtocol;}private RpcSaslProto buildSaslNegotiateResponse()throws IOException, InterruptedException {RpcSaslProto negotiateMessage = negotiateResponse;// accelerate token negotiation by sending initial challenge// in the negotiation responseif (enabledAuthMethods.contains(AuthMethod.TOKEN)) {saslServer = createSaslServer(AuthMethod.TOKEN);byte[] challenge = saslServer.evaluateResponse(new byte[0]);RpcSaslProto.Builder negotiateBuilder =RpcSaslProto.newBuilder(negotiateResponse);negotiateBuilder.getAuthsBuilder(0)  // TOKEN is always first.setChallenge(ByteString.copyFrom(challenge));negotiateMessage = negotiateBuilder.build();}sentNegotiate = true;return negotiateMessage;}private SaslServer createSaslServer(AuthMethod authMethod)throws IOException, InterruptedException {final Map<String,?> saslProps =saslPropsResolver.getServerProperties(addr);return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);}/*** Try to set up the response to indicate that the client version* is incompatible with the server. This can contain special-case* code to speak enough of past IPC protocols to pass back* an exception to the caller.* @param clientVersion the version the caller is using * @throws IOException*/private void setupBadVersionResponse(int clientVersion) throws IOException {String errMsg = "Server IPC version " + CURRENT_VERSION +" cannot communicate with client version " + clientVersion;ByteArrayOutputStream buffer = new ByteArrayOutputStream();if (clientVersion >= 9) {// Versions >>9  understand the normal responseCall fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,this);setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,null, VersionMismatch.class.getName(), errMsg);responder.doRespond(fakeCall);} else if (clientVersion >= 3) {Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,this);// Versions 3 to 8 use older responsesetupResponseOldVersionFatal(buffer, fakeCall,null, VersionMismatch.class.getName(), errMsg);responder.doRespond(fakeCall);} else if (clientVersion == 2) { // Hadoop 0.18.3Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,this);DataOutputStream out = new DataOutputStream(buffer);out.writeInt(0); // call IDout.writeBoolean(true); // errorWritableUtils.writeString(out, VersionMismatch.class.getName());WritableUtils.writeString(out, errMsg);fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray()));responder.doRespond(fakeCall);}}private void setupHttpRequestOnIpcPortResponse() throws IOException {Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);fakeCall.setResponse(ByteBuffer.wrap(RECEIVED_HTTP_REQ_RESPONSE.getBytes(Charsets.UTF_8)));responder.doRespond(fakeCall);}/** Reads the connection context following the connection header* @param dis - DataInputStream from which to read the header * @throws WrappedRpcServerException - if the header cannot be*         deserialized, or the user is not authorized*/ private void processConnectionContext(DataInputStream dis)throws WrappedRpcServerException {// allow only one connection context during a sessionif (connectionContextRead) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,"Connection context already processed");}connectionContext = decodeProtobufFromStream(IpcConnectionContextProto.newBuilder(), dis);protocolName = connectionContext.hasProtocol() ? connectionContext.getProtocol() : null;UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);if (saslServer == null) {user = protocolUser;} else {// user is authenticateduser.setAuthenticationMethod(authMethod);//Now we check if this is a proxy user case. If the protocol user is//different from the 'user', it is a proxy user scenario. However, //this is not allowed if user authenticated with DIGEST.if ((protocolUser != null)&& (!protocolUser.getUserName().equals(user.getUserName()))) {if (authMethod == AuthMethod.TOKEN) {// Not allowed to doAs if token authentication is usedthrow new WrappedRpcServerException(RpcErrorCodeProto.FATAL_UNAUTHORIZED,new AccessControlException("Authenticated user (" + user+ ") doesn't match what the client claims to be ("+ protocolUser + ")"));} else {// Effective user can be different from authenticated user// for simple auth or kerberos auth// The user is the real user. Now we create a proxy userUserGroupInformation realUser = user;user = UserGroupInformation.createProxyUser(protocolUser.getUserName(), realUser);}}}authorizeConnection();// don't set until after authz because connection isn't establishedconnectionContextRead = true;}/*** Process a wrapped RPC Request - unwrap the SASL packet and process* each embedded RPC request * @param buf - SASL wrapped request of one or more RPCs* @throws IOException - SASL packet cannot be unwrapped* @throws InterruptedException*/    private void unwrapPacketAndProcessRpcs(byte[] inBuf)throws WrappedRpcServerException, IOException, InterruptedException {if (LOG.isDebugEnabled()) {LOG.debug("Have read input token of size " + inBuf.length+ " for processing by saslServer.unwrap()");}inBuf = saslServer.unwrap(inBuf, 0, inBuf.length);ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));// Read all RPCs contained in the inBuf, even partial oneswhile (true) {int count = -1;if (unwrappedDataLengthBuffer.remaining() > 0) {count = channelRead(ch, unwrappedDataLengthBuffer);if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)return;}if (unwrappedData == null) {unwrappedDataLengthBuffer.flip();int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();unwrappedData = ByteBuffer.allocate(unwrappedDataLength);}count = channelRead(ch, unwrappedData);if (count <= 0 || unwrappedData.remaining() > 0)return;if (unwrappedData.remaining() == 0) {unwrappedDataLengthBuffer.clear();unwrappedData.flip();processOneRpc(unwrappedData.array());unwrappedData = null;}}}/*** Process an RPC Request - handle connection setup and decoding of* request into a Call* @param buf - contains the RPC request header and the rpc request* @throws IOException - internal error that should not be returned to*         client, typically failure to respond to client* @throws WrappedRpcServerException - an exception to be sent back to*         the client that does not require verbose logging by the*         Listener thread* @throws InterruptedException*/    private void processOneRpc(byte[] buf)throws IOException, WrappedRpcServerException, InterruptedException {int callId = -1;int retry = RpcConstants.INVALID_RETRY_COUNT;try {final DataInputStream dis =new DataInputStream(new ByteArrayInputStream(buf));final RpcRequestHeaderProto header =decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);callId = header.getCallId();retry = header.getRetryCount();if (LOG.isDebugEnabled()) {LOG.debug(" got #" + callId);}checkRpcHeaders(header);if (callId < 0) { // callIds typically used during connection setupprocessRpcOutOfBandRequest(header, dis);} else if (!connectionContextRead) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,"Connection context not established");} else {processRpcRequest(header, dis);}} catch (WrappedRpcServerException wrse) { // inform client of errorThrowable ioe = wrse.getCause();final Call call = new Call(callId, retry, null, this);setupResponse(authFailedResponse, call,RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,ioe.getClass().getName(), ioe.getMessage());responder.doRespond(call);throw wrse;}}/*** Verify RPC header is valid* @param header - RPC request header* @throws WrappedRpcServerException - header contains invalid values */private void checkRpcHeaders(RpcRequestHeaderProto header)throws WrappedRpcServerException {if (!header.hasRpcOp()) {String err = " IPC Server: No rpc op in rpcRequestHeader";throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);}if (header.getRpcOp() != RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {String err = "IPC Server does not implement rpc header operation" + header.getRpcOp();throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);}// If we know the rpc kind, get its class so that we can deserialize// (Note it would make more sense to have the handler deserialize but // we continue with this original design.if (!header.hasRpcKind()) {String err = " IPC Server: No rpc kind in rpcRequestHeader";throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);}}/*** Process an RPC Request - the connection headers and context must* have been already read* @param header - RPC request header* @param dis - stream to request payload* @throws WrappedRpcServerException - due to fatal rpc layer issues such*   as invalid header or deserialization error. In this case a RPC fatal*   status response will later be sent back to client.* @throws InterruptedException*/private void processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis) throws WrappedRpcServerException,InterruptedException {Class<? extends Writable> rpcRequestClass = getRpcRequestWrapper(header.getRpcKind());if (rpcRequestClass == null) {LOG.warn("Unknown rpc kind "  + header.getRpcKind() + " from client " + getHostAddress());final String err = "Unknown rpc kind in rpc header"  + header.getRpcKind();throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);   }Writable rpcRequest;try { //Read the rpc requestrpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);rpcRequest.readFields(dis);} catch (Throwable t) { // includes runtime exception from newInstanceLOG.warn("Unable to read call parameters for client " +getHostAddress() + "on connection protocol " +this.protocolName + " for rpcKind " + header.getRpcKind(),  t);String err = "IPC server unable to read call parameters: "+ t.getMessage();throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);}Span traceSpan = null;if (header.hasTraceInfo()) {// If the incoming RPC included tracing info, always continue the traceTraceInfo parentSpan = new TraceInfo(header.getTraceInfo().getTraceId(),header.getTraceInfo().getParentId());traceSpan = Trace.startSpan(rpcRequest.toString(), parentSpan).detach();}CallerContext callerContext = null;if (header.hasCallerContext()) {callerContext =new CallerContext.Builder(header.getCallerContext().getContext()).setSignature(header.getCallerContext().getSignature().toByteArray()).build();}Call call = new Call(header.getCallId(), header.getRetryCount(),rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),header.getClientId().toByteArray(), traceSpan, callerContext);if (callQueue.isClientBackoffEnabled()) {// if RPC queue is full, we will ask the RPC client to back off by// throwing RetriableException. Whether RPC client will honor// RetriableException and retry depends on client ipc retry policy.// For example, FailoverOnNetworkExceptionRetry handles// RetriableException.queueRequestOrAskClientToBackOff(call);} else {callQueue.put(call);              // queue the call; maybe blocked here}incRpcCount();  // Increment the rpc count}private void queueRequestOrAskClientToBackOff(Call call)throws WrappedRpcServerException, InterruptedException {// If rpc queue is full, we will ask the client to back off.boolean isCallQueued = callQueue.offer(call);if (!isCallQueued) {rpcMetrics.incrClientBackoff();RetriableException retriableException =new RetriableException("Server is too busy.");throw new WrappedRpcServerExceptionSuppressed(RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);}}/*** Establish RPC connection setup by negotiating SASL if required, then* reading and authorizing the connection header* @param header - RPC header* @param dis - stream to request payload* @throws WrappedRpcServerException - setup failed due to SASL*         negotiation failure, premature or invalid connection context,*         or other state errors * @throws IOException - failed to send a response back to the client* @throws InterruptedException*/private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,DataInputStream dis) throws WrappedRpcServerException, IOException,InterruptedException {final int callId = header.getCallId();if (callId == CONNECTION_CONTEXT_CALL_ID) {// SASL must be established prior to connection contextif (authProtocol == AuthProtocol.SASL && !saslContextEstablished) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,"Connection header sent during SASL negotiation");}// read and authorize the userprocessConnectionContext(dis);} else if (callId == AuthProtocol.SASL.callId) {// if client was switched to simple, ignore first SASL messageif (authProtocol != AuthProtocol.SASL) {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,"SASL protocol not requested by client");}saslReadAndProcess(dis);} else if (callId == PING_CALL_ID) {LOG.debug("Received ping message");} else {throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,"Unknown out of band call #" + callId);}}    /*** Authorize proxy users to access this server* @throws WrappedRpcServerException - user is not allowed to proxy*/private void authorizeConnection() throws WrappedRpcServerException {try {// If auth method is TOKEN, the token was obtained by the// real user for the effective user, therefore not required to// authorize real user. doAs is allowed only for simple or kerberos// authenticationif (user != null && user.getRealUser() != null&& (authMethod != AuthMethod.TOKEN)) {ProxyUsers.authorize(user, this.getHostAddress());}authorize(user, protocolName, getHostInetAddress());if (LOG.isDebugEnabled()) {LOG.debug("Successfully authorized " + connectionContext);}rpcMetrics.incrAuthorizationSuccesses();} catch (AuthorizationException ae) {LOG.info("Connection from " + this+ " for protocol " + connectionContext.getProtocol()+ " is unauthorized for user " + user);rpcMetrics.incrAuthorizationFailures();throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_UNAUTHORIZED, ae);}}/*** Decode the a protobuf from the given input stream * @param builder - Builder of the protobuf to decode* @param dis - DataInputStream to read the protobuf* @return Message - decoded protobuf* @throws WrappedRpcServerException - deserialization failed*/@SuppressWarnings("unchecked")private <T extends Message> T decodeProtobufFromStream(Builder builder,DataInputStream dis) throws WrappedRpcServerException {try {builder.mergeDelimitedFrom(dis);return (T)builder.build();} catch (Exception ioe) {Class<?> protoClass = builder.getDefaultInstanceForType().getClass();throw new WrappedRpcServerException(RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,"Error decoding " + protoClass.getSimpleName() + ": "+ ioe);}}/*** Get service class for connection* @return the serviceClass*/public int getServiceClass() {return serviceClass;}/*** Set service class for connection* @param serviceClass the serviceClass to set*/public void setServiceClass(int serviceClass) {this.serviceClass = serviceClass;}private synchronized void close() {disposeSasl();data = null;dataLengthBuffer = null;if (!channel.isOpen())return;try {socket.shutdownOutput();} catch(Exception e) {LOG.debug("Ignoring socket shutdown exception", e);}if (channel.isOpen()) {IOUtils.cleanup(null, channel);}IOUtils.cleanup(null, socket);}}/** Handles queued calls . */private class Handler extends Thread {public Handler(int instanceNumber) {this.setDaemon(true);this.setName("IPC Server handler "+ instanceNumber + " on " + port);}@Overridepublic void run() {LOG.debug(Thread.currentThread().getName() + ": starting");SERVER.set(Server.this);ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);while (running) {TraceScope traceScope = null;try {final Call call = callQueue.take(); // pop the queue; maybe blocked hereif (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);}if (!call.connection.channel.isOpen()) {LOG.info(Thread.currentThread().getName() + ": skipped " + call);continue;}String errorClass = null;String error = null;RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;RpcErrorCodeProto detailedErr = null;Writable value = null;CurCall.set(call);if (call.traceSpan != null) {traceScope = Trace.continueSpan(call.traceSpan);}// always update the current call contextCallerContext.setCurrent(call.callerContext);try {// Make the call as the user via Subject.doAs, thus associating// the call with the Subjectif (call.connection.user == null) {value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);} else {value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {@Overridepublic Writable run() throws Exception {// make the callreturn call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);}});}} catch (Throwable e) {if (e instanceof UndeclaredThrowableException) {e = e.getCause();}logException(LOG, e, call);if (e instanceof RpcServerException) {RpcServerException rse = ((RpcServerException)e); returnStatus = rse.getRpcStatusProto();detailedErr = rse.getRpcErrorCodeProto();} else {returnStatus = RpcStatusProto.ERROR;detailedErr = RpcErrorCodeProto.ERROR_APPLICATION;}errorClass = e.getClass().getName();error = StringUtils.stringifyException(e);// Remove redundant error class name from the beginning of the stack traceString exceptionHdr = errorClass + ": ";if (error.startsWith(exceptionHdr)) {error = error.substring(exceptionHdr.length());}}CurCall.set(null);synchronized (call.connection.responseQueue) {// setupResponse() needs to be sync'ed together with // responder.doResponse() since setupResponse may use// SASL to encrypt response data and SASL enforces// its own message ordering.setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error);// Discard the large buf and reset it back to smaller size // to free up heapif (buf.size() > maxRespSize) {LOG.warn("Large response size " + buf.size() + " for call "+ call.toString());buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);}responder.doRespond(call);}} catch (InterruptedException e) {if (running) {                          // unexpected -- log itLOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);if (Trace.isTracing()) {traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " +StringUtils.stringifyException(e));}}} catch (Exception e) {LOG.info(Thread.currentThread().getName() + " caught an exception", e);if (Trace.isTracing()) {traceScope.getSpan().addTimelineAnnotation("Exception: " +StringUtils.stringifyException(e));}} finally {if (traceScope != null) {traceScope.close();}IOUtils.cleanup(LOG, traceScope);}}LOG.debug(Thread.currentThread().getName() + ": exiting");}}@VisibleForTestingvoid logException(Log logger, Throwable e, Call call) {if (exceptionsHandler.isSuppressedLog(e.getClass())) {return; // Log nothing.}final String logMsg = Thread.currentThread().getName() + ", call " + call;if (exceptionsHandler.isTerseLog(e.getClass())) {// Don't log the whole stack trace. Way too noisy!logger.info(logMsg + ": " + e);} else if (e instanceof RuntimeException || e instanceof Error) {// These exception types indicate something is probably wrong// on the server side, as opposed to just a normal exceptional// result.logger.warn(logMsg, e);} else {logger.info(logMsg, e);}}protected Server(String bindAddress, int port,Class<? extends Writable> paramClass, int handlerCount, Configuration conf)throws IOException {this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null, null);}protected Server(String bindAddress, int port,Class<? extends Writable> rpcRequestClass, int handlerCount,int numReaders, int queueSizePerHandler, Configuration conf,String serverName, SecretManager<? extends TokenIdentifier> secretManager)throws IOException {this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, queueSizePerHandler, conf, serverName, secretManager, null);}/** * Constructs a server listening on the named port and address.  Parameters passed must* be of the named class.  The <code>handlerCount</handlerCount> determines* the number of handler threads that will be used to process calls.* If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters* from configuration. Otherwise the configuration will be picked up.* * If rpcRequestClass is null then the rpcRequestClass must have been * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,*  Class, RPC.RpcInvoker)}* This parameter has been retained for compatibility with existing tests* and usage.*/@SuppressWarnings("unchecked")protected Server(String bindAddress, int port,Class<? extends Writable> rpcRequestClass, int handlerCount,int numReaders, int queueSizePerHandler, Configuration conf,String serverName, SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig)throws IOException {this.bindAddress = bindAddress;this.conf = conf;this.portRangeConfig = portRangeConfig;this.port = port;this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount;this.socketSendBufferSize = 0;this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);if (queueSizePerHandler != -1) {this.maxQueueSize = queueSizePerHandler;} else {this.maxQueueSize = handlerCount * conf.getInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);      }this.maxRespSize = conf.getInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);if (numReaders != -1) {this.readThreads = numReaders;} else {this.readThreads = conf.getInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);}this.readerPendingConnectionQueue = conf.getInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);// Setup appropriate callqueuefinal String prefix = getQueueClassPrefix();this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);this.secretManager = (SecretManager<TokenIdentifier>) secretManager;this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);// configure supported authenticationsthis.enabledAuthMethods = getAuthMethods(secretManager, conf);this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);// Start the listener here and let it bind to the portlistener = new Listener();this.port = listener.getAddress().getPort();    connectionManager = new ConnectionManager();this.rpcMetrics = RpcMetrics.create(this, conf);this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);this.tcpNoDelay = conf.getBoolean(CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);this.setLogSlowRPC(conf.getBoolean(CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));// Create the responder hereresponder = new Responder();if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {SaslRpcServer.init(conf);saslPropsResolver = SaslPropertiesResolver.getInstance(conf);}this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);}private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)throws IOException {RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();if (authMethods.contains(AuthMethod.SIMPLE) && authMethods.size() == 1) {// SIMPLE-only servers return success in response to negotiatenegotiateBuilder.setState(SaslState.SUCCESS);} else {negotiateBuilder.setState(SaslState.NEGOTIATE);for (AuthMethod authMethod : authMethods) {SaslRpcServer saslRpcServer = new SaslRpcServer(authMethod);      SaslAuth.Builder builder = negotiateBuilder.addAuthsBuilder().setMethod(authMethod.toString()).setMechanism(saslRpcServer.mechanism);if (saslRpcServer.protocol != null) {builder.setProtocol(saslRpcServer.protocol);}if (saslRpcServer.serverId != null) {builder.setServerId(saslRpcServer.serverId);}}}return negotiateBuilder.build();}// get the security type from the conf. implicitly include token support// if a secret manager is provided, or fail if token is the conf value but// there is no secret managerprivate List<AuthMethod> getAuthMethods(SecretManager<?> secretManager,Configuration conf) {AuthenticationMethod confAuthenticationMethod =SecurityUtil.getAuthenticationMethod(conf);        List<AuthMethod> authMethods = new ArrayList<AuthMethod>();if (confAuthenticationMethod == AuthenticationMethod.TOKEN) {if (secretManager == null) {throw new IllegalArgumentException(AuthenticationMethod.TOKEN +" authentication requires a secret manager");} } else if (secretManager != null) {LOG.debug(AuthenticationMethod.TOKEN +" authentication enabled for secret manager");// most preferred, go to the front of the line!authMethods.add(AuthenticationMethod.TOKEN.getAuthMethod());}authMethods.add(confAuthenticationMethod.getAuthMethod());        LOG.debug("Server accepts auth methods:" + authMethods);return authMethods;}private void closeConnection(Connection connection) {connectionManager.close(connection);}/*** Setup response for the IPC Call.* * @param responseBuf buffer to serialize the response into* @param call {@link Call} to which we are setting up the response* @param status of the IPC call* @param rv return value for the IPC Call, if the call was successful* @param errorClass error class, if the the call failed* @param error error message, if the call failed* @throws IOException*/private void setupResponse(ByteArrayOutputStream responseBuf,Call call, RpcStatusProto status, RpcErrorCodeProto erCode,Writable rv, String errorClass, String error) throws IOException {responseBuf.reset();DataOutputStream out = new DataOutputStream(responseBuf);RpcResponseHeaderProto.Builder headerBuilder =  RpcResponseHeaderProto.newBuilder();headerBuilder.setClientId(ByteString.copyFrom(call.clientId));headerBuilder.setCallId(call.callId);headerBuilder.setRetryCount(call.retryCount);headerBuilder.setStatus(status);headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);if (status == RpcStatusProto.SUCCESS) {RpcResponseHeaderProto header = headerBuilder.build();final int headerLen = header.getSerializedSize();int fullLength  = CodedOutputStream.computeRawVarint32Size(headerLen) +headerLen;try {if (rv instanceof ProtobufRpcEngine.RpcWrapper) {ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) rv;fullLength += resWrapper.getLength();out.writeInt(fullLength);header.writeDelimitedTo(out);rv.write(out);} else { // Have to serialize to buffer to get lenfinal DataOutputBuffer buf = new DataOutputBuffer();rv.write(buf);byte[] data = buf.getData();fullLength += buf.getLength();out.writeInt(fullLength);header.writeDelimitedTo(out);out.write(data, 0, buf.getLength());}} catch (Throwable t) {LOG.warn("Error serializing call response for call " + call, t);// Call back to same function - this is OK since the// buffer is reset at the top, and since status is changed// to ERROR it won't infinite loop.setupResponse(responseBuf, call, RpcStatusProto.ERROR,RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE,null, t.getClass().getName(),StringUtils.stringifyException(t));return;}} else { // Rpc FailureheaderBuilder.setExceptionClassName(errorClass);headerBuilder.setErrorMsg(error);headerBuilder.setErrorDetail(erCode);RpcResponseHeaderProto header = headerBuilder.build();int headerLen = header.getSerializedSize();final int fullLength  = CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;out.writeInt(fullLength);header.writeDelimitedTo(out);}if (call.connection.useWrap) {wrapWithSasl(responseBuf, call);}call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));}/*** Setup response for the IPC Call on Fatal Error from a * client that is using old version of Hadoop.* The response is serialized using the previous protocol's response* layout.* * @param response buffer to serialize the response into* @param call {@link Call} to which we are setting up the response* @param rv return value for the IPC Call, if the call was successful* @param errorClass error class, if the the call failed* @param error error message, if the call failed* @throws IOException*/private void setupResponseOldVersionFatal(ByteArrayOutputStream response, Call call,Writable rv, String errorClass, String error) throws IOException {final int OLD_VERSION_FATAL_STATUS = -1;response.reset();DataOutputStream out = new DataOutputStream(response);out.writeInt(call.callId);                // write call idout.writeInt(OLD_VERSION_FATAL_STATUS);   // write FATAL_STATUSWritableUtils.writeString(out, errorClass);WritableUtils.writeString(out, error);if (call.connection.useWrap) {wrapWithSasl(response, call);}call.setResponse(ByteBuffer.wrap(response.toByteArray()));}private void wrapWithSasl(ByteArrayOutputStream response, Call call)throws IOException {if (call.connection.saslServer != null) {byte[] token = response.toByteArray();// synchronization may be needed since there can be multiple Handler// threads using saslServer to wrap responses.synchronized (call.connection.saslServer) {token = call.connection.saslServer.wrap(token, 0, token.length);}if (LOG.isDebugEnabled())LOG.debug("Adding saslServer wrapped token of size " + token.length+ " as call response.");response.reset();// rebuild with sasl header and payloadRpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder().setCallId(AuthProtocol.SASL.callId).setStatus(RpcStatusProto.SUCCESS).build();RpcSaslProto saslMessage = RpcSaslProto.newBuilder().setState(SaslState.WRAP).setToken(ByteString.copyFrom(token, 0, token.length)).build();RpcResponseMessageWrapper saslResponse =new RpcResponseMessageWrapper(saslHeader, saslMessage);DataOutputStream out = new DataOutputStream(response);out.writeInt(saslResponse.getLength());saslResponse.write(out);}}Configuration getConf() {return conf;}/** Sets the socket buffer size used for responding to RPCs */public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }/** Starts the service.  Must be called before any calls will be handled. */public synchronized void start() {responder.start();listener.start();handlers = new Handler[handlerCount];for (int i = 0; i < handlerCount; i++) {handlers[i] = new Handler(i);handlers[i].start();}}/** Stops the service.  No new calls will be handled after this is called. */public synchronized void stop() {LOG.info("Stopping server on " + port);running = false;if (handlers != null) {for (int i = 0; i < handlerCount; i++) {if (handlers[i] != null) {handlers[i].interrupt();}}}listener.interrupt();listener.doStop();responder.interrupt();notifyAll();this.rpcMetrics.shutdown();this.rpcDetailedMetrics.shutdown();}/** Wait for the server to be stopped.* Does not wait for all subthreads to finish.*  See {@link #stop()}.*/public synchronized void join() throws InterruptedException {while (running) {wait();}}/*** Return the socket (ip+port) on which the RPC server is listening to.* @return the socket (ip+port) on which the RPC server is listening to.*/public synchronized InetSocketAddress getListenerAddress() {return listener.getAddress();}/** * Called for each call. * @deprecated Use  {@link #call(RpcPayloadHeader.RpcKind, String,*  Writable, long)} instead*/@Deprecatedpublic Writable call(Writable param, long receiveTime) throws Exception {return call(RPC.RpcKind.RPC_BUILTIN, null, param, receiveTime);}/** Called for each call. */public abstract Writable call(RPC.RpcKind rpcKind, String protocol,Writable param, long receiveTime) throws Exception;/*** Authorize the incoming client connection.* * @param user client user* @param protocolName - the protocol* @param addr InetAddress of incoming connection* @throws AuthorizationException when the client isn't authorized to talk the protocol*/private void authorize(UserGroupInformation user, String protocolName,InetAddress addr) throws AuthorizationException {if (authorize) {if (protocolName == null) {throw new AuthorizationException("Null protocol not authorized");}Class<?> protocol = null;try {protocol = getProtocolClass(protocolName, getConf());} catch (ClassNotFoundException cfne) {throw new AuthorizationException("Unknown protocol: " + protocolName);}serviceAuthorizationManager.authorize(user, protocol, getConf(), addr);}}/*** Get the port on which the IPC Server is listening for incoming connections.* This could be an ephemeral port too, in which case we return the real* port on which the Server has bound.* @return port on which IPC Server is listening*/public int getPort() {return port;}/*** The number of open RPC conections* @return the number of open rpc connections*/public int getNumOpenConnections() {return connectionManager.size();}/*** The number of rpc calls in the queue.* @return The number of rpc calls in the queue.*/public int getCallQueueLen() {return callQueue.size();}/*** The maximum size of the rpc call queue of this server.* @return The maximum size of the rpc call queue.*/public int getMaxQueueSize() {return maxQueueSize;}/*** The number of reader threads for this server.* @return The number of reader threads.*/public int getNumReaders() {return readThreads;}/*** When the read or write buffer size is larger than this limit, i/o will be * done in chunks of this size. Most RPC requests and responses would be* be smaller.*/private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB./*** This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.* If the amount of data is large, it writes to channel in smaller chunks. * This is to avoid jdk from creating many direct buffers as the size of * buffer increases. This also minimizes extra copies in NIO layer* as a result of multiple write operations required to write a large * buffer.  ** @see WritableByteChannel#write(ByteBuffer)*/private int channelWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException {int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?channel.write(buffer) : channelIO(null, channel, buffer);if (count > 0) {rpcMetrics.incrSentBytes(count);}return count;}/*** This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.* If the amount of data is large, it writes to channel in smaller chunks. * This is to avoid jdk from creating many direct buffers as the size of * ByteBuffer increases. There should not be any performance degredation.* * @see ReadableByteChannel#read(ByteBuffer)*/private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?channel.read(buffer) : channelIO(channel, null, buffer);if (count > 0) {rpcMetrics.incrReceivedBytes(count);}return count;}/*** Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}* and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only* one of readCh or writeCh should be non-null.* * @see #channelRead(ReadableByteChannel, ByteBuffer)* @see #channelWrite(WritableByteChannel, ByteBuffer)*/private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh,ByteBuffer buf) throws IOException {int originalLimit = buf.limit();int initialRemaining = buf.remaining();int ret = 0;while (buf.remaining() > 0) {try {int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);buf.limit(buf.position() + ioSize);ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); if (ret < ioSize) {break;}} finally {buf.limit(originalLimit);        }}int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret;}private class ConnectionManager {final private AtomicInteger count = new AtomicInteger();    final private Set<Connection> connections;final private Timer idleScanTimer;final private int idleScanThreshold;final private int idleScanInterval;final private int maxIdleTime;final private int maxIdleToClose;final private int maxConnections;ConnectionManager() {this.idleScanTimer = new Timer("IPC Server idle connection scanner for port " + getPort(), true);this.idleScanThreshold = conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);this.idleScanInterval = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);this.maxIdleTime = 2 * conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);this.maxIdleToClose = conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);this.maxConnections = conf.getInt(CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_KEY,CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_DEFAULT);// create a set with concurrency -and- a thread-safe iterator, add 2// for listener and idle closer threadsthis.connections = Collections.newSetFromMap(new ConcurrentHashMap<Connection,Boolean>(maxQueueSize, 0.75f, readThreads+2));}private boolean add(Connection connection) {boolean added = connections.add(connection);if (added) {count.getAndIncrement();}return added;}private boolean remove(Connection connection) {boolean removed = connections.remove(connection);if (removed) {count.getAndDecrement();}return removed;}int size() {return count.get();}boolean isFull() {// The check is disabled when maxConnections <= 0.return ((maxConnections > 0) && (size() >= maxConnections));}Connection[] toArray() {return connections.toArray(new Connection[0]);}Connection register(SocketChannel channel) {if (isFull()) {return null;}Connection connection = new Connection(channel, Time.now());add(connection);if (LOG.isDebugEnabled()) {LOG.debug("Server connection from " + connection +"; # active connections: " + size() +"; # queued calls: " + callQueue.size());}      return connection;}boolean close(Connection connection) {boolean exists = remove(connection);if (exists) {if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName() +": disconnecting client " + connection +". Number of active connections: "+ size());}// only close if actually removed to avoid double-closing due// to possible racesconnection.close();}return exists;}// synch'ed to avoid explicit invocation upon OOM from colliding with// timer task firingsynchronized void closeIdle(boolean scanAll) {long minLastContact = Time.now() - maxIdleTime;// concurrent iterator might miss new connections added// during the iteration, but that's ok because they won't// be idle yet anyway and will be caught on next scanint closed = 0;for (Connection connection : connections) {// stop if connections dropped below threshold unless scanning allif (!scanAll && size() < idleScanThreshold) {break;}// stop if not scanning all and max connections are closedif (connection.isIdle() &&connection.getLastContact() < minLastContact &&close(connection) &&!scanAll && (++closed == maxIdleToClose)) {break;}}}void closeAll() {// use a copy of the connections to be absolutely sure the concurrent// iterator doesn't miss a connectionfor (Connection connection : toArray()) {close(connection);}}void startIdleScan() {scheduleIdleScanTask();}void stopIdleScan() {idleScanTimer.cancel();}private void scheduleIdleScanTask() {if (!running) {return;}TimerTask idleScanTask = new TimerTask(){@Overridepublic void run() {if (!running) {return;}if (LOG.isDebugEnabled()) {LOG.debug(Thread.currentThread().getName()+": task running");}try {closeIdle(false);} finally {// explicitly reschedule so next execution occurs relative// to the end of this scan, not the beginningscheduleIdleScanTask();}}};idleScanTimer.schedule(idleScanTask, idleScanInterval);}}
}

6、客户端的网络通信

客户端的网络通信由Client类实现。客户端的Invoker关联了一个Client对象,在初始化Invoker的同时会为该Client对象赋值。

 /*** This constructor takes a connectionId, instead of creating a new one.*/private Invoker(Class<?> protocol, Client.ConnectionId connId,Configuration conf, SocketFactory factory) {this.remoteId = connId;this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);this.protocolName = RPC.getProtocolName(protocol);this.clientProtocolVersion = RPC.getProtocolVersion(protocol);}

在Invoker#invoke()方法中,对消息进行编码后,就会调用Client#call()方法进行网络通信远程调用,返回RpcResponseWrapper。截取invoke()方法部分代码如下:

val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,fallbackToSimpleAuth);

Client类的源码实现如下:

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.ipc;import static org.apache.hadoop.ipc.RpcConstants.*;import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;import javax.net.SocketFactory;
import javax.security.sasl.Sasl;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SaslRpcClient;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.htrace.Trace;import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.CodedOutputStream;/** A client for an IPC service.  IPC calls take a single {@link Writable} as a* parameter, and return a {@link Writable} as their value.  A service runs on* a port and is defined by a parameter class and a value class.* * @see Server*/
public class Client {public static final Log LOG = LogFactory.getLog(Client.class);/** A counter for generating call IDs. */private static final AtomicInteger callIdCounter = new AtomicInteger();private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();/** Set call id and retry count for the next call. */public static void setCallIdAndRetryCount(int cid, int rc) {Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);Preconditions.checkState(callId.get() == null);Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);callId.set(cid);retryCount.set(rc);}private final Cache<ConnectionId, Connection> connections =CacheBuilder.newBuilder().build();private Class<? extends Writable> valueClass;   // class of call valuesprivate AtomicBoolean running = new AtomicBoolean(true); // if client runsfinal private Configuration conf;private SocketFactory socketFactory;           // how to create socketsprivate int refCount = 1;private final int connectionTimeout;private final boolean fallbackAllowed;private final byte[] clientId;final static int CONNECTION_CONTEXT_CALL_ID = -3;/*** Executor on which IPC calls' parameters are sent.* Deferring the sending of parameters to a separate* thread isolates them from thread interruptions in the* calling code.*/private final ExecutorService sendParamsExecutor;private final static ClientExecutorServiceFactory clientExcecutorFactory =new ClientExecutorServiceFactory();private static class ClientExecutorServiceFactory {private int executorRefCount = 0;private ExecutorService clientExecutor = null;/*** Get Executor on which IPC calls' parameters are sent.* If the internal reference counter is zero, this method* creates the instance of Executor. If not, this method* just returns the reference of clientExecutor.* * @return An ExecutorService instance*/synchronized ExecutorService refAndGetInstance() {if (executorRefCount == 0) {clientExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("IPC Parameter Sending Thread #%d").build());}executorRefCount++;return clientExecutor;}/*** Cleanup Executor on which IPC calls' parameters are sent.* If reference counter is zero, this method discards the* instance of the Executor. If not, this method* just decrements the internal reference counter.* * @return An ExecutorService instance if it exists.*   Null is returned if not.*/synchronized ExecutorService unrefAndCleanup() {executorRefCount--;assert(executorRefCount >= 0);if (executorRefCount == 0) {clientExecutor.shutdown();try {if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {clientExecutor.shutdownNow();}} catch (InterruptedException e) {LOG.warn("Interrupted while waiting for clientExecutor" +" to stop");clientExecutor.shutdownNow();Thread.currentThread().interrupt();}clientExecutor = null;}return clientExecutor;}};/*** set the ping interval value in configuration* * @param conf Configuration* @param pingInterval the ping interval*/final public static void setPingInterval(Configuration conf, int pingInterval) {conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);}/*** Get the ping interval from configuration;* If not set in the configuration, return the default value.* * @param conf Configuration* @return the ping interval*/final public static int getPingInterval(Configuration conf) {return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);}/*** The time after which a RPC will timeout.* If ping is not enabled (via ipc.client.ping), then the timeout value is the * same as the pingInterval.* If ping is enabled, then there is no timeout value.* * @param conf Configuration* @return the timeout period in milliseconds. -1 if no timeout value is set*/final public static int getTimeout(Configuration conf) {if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY,CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT)) {return getPingInterval(conf);}return -1;}/*** set the connection timeout value in configuration* * @param conf Configuration* @param timeout the socket connect timeout value*/public static final void setConnectTimeout(Configuration conf, int timeout) {conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);}@VisibleForTestingpublic static final ExecutorService getClientExecutor() {return Client.clientExcecutorFactory.clientExecutor;}/*** Increment this client's reference count**/synchronized void incCount() {refCount++;}/*** Decrement this client's reference count**/synchronized void decCount() {refCount--;}/*** Return if this client has no reference* * @return true if this client has no reference; false otherwise*/synchronized boolean isZeroReference() {return refCount==0;}/** Check the rpc response header. */void checkResponse(RpcResponseHeaderProto header) throws IOException {if (header == null) {throw new EOFException("Response is null.");}if (header.hasClientId()) {// check client IDsfinal byte[] id = header.getClientId().toByteArray();if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) {if (!Arrays.equals(id, clientId)) {throw new IOException("Client IDs not matched: local ID="+ StringUtils.byteToHexString(clientId) + ", ID in response="+ StringUtils.byteToHexString(header.getClientId().toByteArray()));}}}}Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {return new Call(rpcKind, rpcRequest);}/** * Class that represents an RPC call*/static class Call {final int id;               // call idfinal int retry;           // retry countfinal Writable rpcRequest;  // the serialized rpc requestWritable rpcResponse;       // null if rpc has errorIOException error;          // exception, null if successfinal RPC.RpcKind rpcKind;      // Rpc EngineKindboolean done;               // true when call is doneprivate Call(RPC.RpcKind rpcKind, Writable param) {this.rpcKind = rpcKind;this.rpcRequest = param;final Integer id = callId.get();if (id == null) {this.id = nextCallId();} else {callId.set(null);this.id = id;}final Integer rc = retryCount.get();if (rc == null) {this.retry = 0;} else {this.retry = rc;}}/** Indicate when the call is complete and the* value or error are available.  Notifies by default.  */protected synchronized void callComplete() {this.done = true;notify();                                 // notify caller}/** Set the exception when there is an error.* Notify the caller the call is done.* * @param error exception thrown by the call; either local or remote*/public synchronized void setException(IOException error) {this.error = error;callComplete();}/** Set the return value when there is no error. * Notify the caller the call is done.* * @param rpcResponse return value of the rpc call.*/public synchronized void setRpcResponse(Writable rpcResponse) {this.rpcResponse = rpcResponse;callComplete();}public synchronized Writable getRpcResponse() {return rpcResponse;}}/** Thread that reads responses and notifies callers.  Each connection owns a* socket connected to a remote address.  Calls are multiplexed through this* socket: responses may be delivered out of order. */private class Connection extends Thread {private InetSocketAddress server;             // server ip:portprivate final ConnectionId remoteId;                // connection idprivate AuthMethod authMethod; // authentication methodprivate AuthProtocol authProtocol;private int serviceClass;private SaslRpcClient saslRpcClient;private Socket socket = null;                 // connected socketprivate DataInputStream in;private DataOutputStream out;private int rpcTimeout;private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecsprivate final RetryPolicy connectionRetryPolicy;private final int maxRetriesOnSasl;private int maxRetriesOnSocketTimeouts;private boolean tcpNoDelay; // if T then disable Nagle's Algorithmprivate boolean doPing; //do we need to send ping messageprivate int pingInterval; // how often sends ping to the server in msecsprivate ByteArrayOutputStream pingRequest; // ping message// currently active callsprivate Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();private AtomicLong lastActivity = new AtomicLong();// last I/O activity timeprivate AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closedprivate IOException closeException; // close reasonprivate final Object sendRpcRequestLock = new Object();public Connection(ConnectionId remoteId, int serviceClass) throws IOException {this.remoteId = remoteId;this.server = remoteId.getAddress();if (server.isUnresolved()) {throw NetUtils.wrapException(server.getHostName(),server.getPort(),null,0,new UnknownHostException());}this.rpcTimeout = remoteId.getRpcTimeout();this.maxIdleTime = remoteId.getMaxIdleTime();this.connectionRetryPolicy = remoteId.connectionRetryPolicy;this.maxRetriesOnSasl = remoteId.getMaxRetriesOnSasl();this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();this.tcpNoDelay = remoteId.getTcpNoDelay();this.doPing = remoteId.getDoPing();if (doPing) {// construct a RPC header with the callId as the ping callIdpingRequest = new ByteArrayOutputStream();RpcRequestHeaderProto pingHeader = ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,RpcConstants.INVALID_RETRY_COUNT, clientId);pingHeader.writeDelimitedTo(pingRequest);}this.pingInterval = remoteId.getPingInterval();this.serviceClass = serviceClass;if (LOG.isDebugEnabled()) {LOG.debug("The ping interval is " + this.pingInterval + " ms.");}UserGroupInformation ticket = remoteId.getTicket();// try SASL if security is enabled or if the ugi contains tokens.// this causes a SIMPLE client with tokens to attempt SASLboolean trySasl = UserGroupInformation.isSecurityEnabled() ||(ticket != null && !ticket.getTokens().isEmpty());this.authProtocol = trySasl ? AuthProtocol.SASL : AuthProtocol.NONE;this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +server.toString() +" from " + ((ticket==null)?"an unknown user":ticket.getUserName()));this.setDaemon(true);}/** Update lastActivity with the current time. */private void touch() {lastActivity.set(Time.now());}/*** Add a call to this connection's call queue and notify* a listener; synchronized.* Returns false if called during shutdown.* @param call to add* @return true if the call was added.*/private synchronized boolean addCall(Call call) {if (shouldCloseConnection.get())return false;calls.put(call.id, call);notify();return true;}/** This class sends a ping to the remote side when timeout on* reading. If no failure is detected, it retries until at least* a byte is read.*/private class PingInputStream extends FilterInputStream {/* constructor */protected PingInputStream(InputStream in) {super(in);}/* Process timeout exception* if the connection is not going to be closed or * is not configured to have a RPC timeout, send a ping.* (if rpcTimeout is not set to be 0, then RPC should timeout.* otherwise, throw the timeout exception.*/private void handleTimeout(SocketTimeoutException e) throws IOException {if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {throw e;} else {sendPing();}}/** Read a byte from the stream.* Send a ping if timeout on read. Retries if no failure is detected* until a byte is read.* @throws IOException for any IO problem other than socket timeout*/@Overridepublic int read() throws IOException {do {try {return super.read();} catch (SocketTimeoutException e) {handleTimeout(e);}} while (true);}/** Read bytes into a buffer starting from offset <code>off</code>* Send a ping if timeout on read. Retries if no failure is detected* until a byte is read.* * @return the total number of bytes read; -1 if the connection is closed.*/@Overridepublic int read(byte[] buf, int off, int len) throws IOException {do {try {return super.read(buf, off, len);} catch (SocketTimeoutException e) {handleTimeout(e);}} while (true);}}private synchronized void disposeSasl() {if (saslRpcClient != null) {try {saslRpcClient.dispose();saslRpcClient = null;} catch (IOException ignored) {}}}private synchronized boolean shouldAuthenticateOverKrb() throws IOException {UserGroupInformation loginUser = UserGroupInformation.getLoginUser();UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();UserGroupInformation realUser = currentUser.getRealUser();if (authMethod == AuthMethod.KERBEROS && loginUser != null &&// Make sure user logged in using Kerberos either keytab or TGTloginUser.hasKerberosCredentials() &&// relogin only in case it is the login user (e.g. JT)// or superuser (like oozie).(loginUser.equals(currentUser) || loginUser.equals(realUser))) {return true;}return false;}private synchronized AuthMethod setupSaslConnection(final InputStream in2, final OutputStream out2) throws IOException {// Do not use Client.conf here! We must use ConnectionId.conf, since the// Client object is cached and shared between all RPC clients, even those// for separate services.saslRpcClient = new SaslRpcClient(remoteId.getTicket(),remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);return saslRpcClient.saslConnect(in2, out2);}/*** Update the server address if the address corresponding to the host* name has changed.** @return true if an addr change was detected.* @throws IOException when the hostname cannot be resolved.*/private synchronized boolean updateAddress() throws IOException {// Do a fresh lookup with the old host name.InetSocketAddress currentAddr = NetUtils.createSocketAddrForHost(server.getHostName(), server.getPort());if (!server.equals(currentAddr)) {LOG.warn("Address change detected. Old: " + server.toString() +" New: " + currentAddr.toString());server = currentAddr;return true;}return false;}private synchronized void setupConnection() throws IOException {short ioFailures = 0;short timeoutFailures = 0;while (true) {try {this.socket = socketFactory.createSocket();this.socket.setTcpNoDelay(tcpNoDelay);this.socket.setKeepAlive(true);/** Bind the socket to the host specified in the principal name of the* client, to ensure Server matching address of the client connection* to host name in principal passed.*/UserGroupInformation ticket = remoteId.getTicket();if (ticket != null && ticket.hasKerberosCredentials()) {KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class);if (krbInfo != null && krbInfo.clientPrincipal() != null) {String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());// If host name is a valid local address then bind socket to itInetAddress localAddr = NetUtils.getLocalInetAddress(host);if (localAddr != null) {this.socket.bind(new InetSocketAddress(localAddr, 0));}}}NetUtils.connect(this.socket, server, connectionTimeout);if (rpcTimeout > 0) {pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval}this.socket.setSoTimeout(pingInterval);return;} catch (ConnectTimeoutException toe) {/* Check for an address change and update the local reference.* Reset the failure counter if the address was changed*/if (updateAddress()) {timeoutFailures = ioFailures = 0;}handleConnectionTimeout(timeoutFailures++,maxRetriesOnSocketTimeouts, toe);} catch (IOException ie) {if (updateAddress()) {timeoutFailures = ioFailures = 0;}handleConnectionFailure(ioFailures++, ie);}}}/*** If multiple clients with the same principal try to connect to the same* server at the same time, the server assumes a replay attack is in* progress. This is a feature of kerberos. In order to work around this,* what is done is that the client backs off randomly and tries to initiate* the connection again. The other problem is to do with ticket expiry. To* handle that, a relogin is attempted.*/private synchronized void handleSaslConnectionFailure(final int currRetries, final int maxRetries, final Exception ex,final Random rand, final UserGroupInformation ugi) throws IOException,InterruptedException {ugi.doAs(new PrivilegedExceptionAction<Object>() {@Overridepublic Object run() throws IOException, InterruptedException {final short MAX_BACKOFF = 5000;closeConnection();disposeSasl();if (shouldAuthenticateOverKrb()) {if (currRetries < maxRetries) {if(LOG.isDebugEnabled()) {LOG.debug("Exception encountered while connecting to "+ "the server : ", ex);}// try re-loginif (UserGroupInformation.isLoginKeytabBased()) {UserGroupInformation.getLoginUser().reloginFromKeytab();} else if (UserGroupInformation.isLoginTicketBased()) {UserGroupInformation.getLoginUser().reloginFromTicketCache();}// have granularity of milliseconds//we are sleeping with the Connection lock held but since this//connection instance is being used for connecting to the server//in question, it is okayThread.sleep((rand.nextInt(MAX_BACKOFF) + 1));return null;} else {String msg = "Couldn't setup connection for "+ UserGroupInformation.getLoginUser().getUserName() + " to "+ remoteId;LOG.warn(msg, ex);throw (IOException) new IOException(msg).initCause(ex);}} else {LOG.warn("Exception encountered while connecting to "+ "the server : ", ex);}if (ex instanceof RemoteException)throw (RemoteException) ex;throw new IOException(ex);}});}/** Connect to the server and set up the I/O streams. It then sends* a header to the server and starts* the connection thread that waits for responses.*/private synchronized void setupIOstreams(AtomicBoolean fallbackToSimpleAuth) {if (socket != null || shouldCloseConnection.get()) {return;} try {if (LOG.isDebugEnabled()) {LOG.debug("Connecting to "+server);}if (Trace.isTracing()) {Trace.addTimelineAnnotation("IPC client connecting to " + server);}short numRetries = 0;Random rand = null;while (true) {setupConnection();InputStream inStream = NetUtils.getInputStream(socket);OutputStream outStream = NetUtils.getOutputStream(socket);writeConnectionHeader(outStream);if (authProtocol == AuthProtocol.SASL) {final InputStream in2 = inStream;final OutputStream out2 = outStream;UserGroupInformation ticket = remoteId.getTicket();if (ticket.getRealUser() != null) {ticket = ticket.getRealUser();}try {authMethod = ticket.doAs(new PrivilegedExceptionAction<AuthMethod>() {@Overridepublic AuthMethod run()throws IOException, InterruptedException {return setupSaslConnection(in2, out2);}});} catch (Exception ex) {authMethod = saslRpcClient.getAuthMethod();if (rand == null) {rand = new Random();}handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex,rand, ticket);continue;}if (authMethod != AuthMethod.SIMPLE) {// Sasl connect is successful. Let's set up Sasl i/o streams.inStream = saslRpcClient.getInputStream(inStream);outStream = saslRpcClient.getOutputStream(outStream);// for testingremoteId.saslQop =(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);LOG.debug("Negotiated QOP is :" + remoteId.saslQop);if (fallbackToSimpleAuth != null) {fallbackToSimpleAuth.set(false);}} else if (UserGroupInformation.isSecurityEnabled()) {if (!fallbackAllowed) {throw new IOException("Server asks us to fall back to SIMPLE " +"auth, but this client is configured to only allow secure " +"connections.");}if (fallbackToSimpleAuth != null) {fallbackToSimpleAuth.set(true);}}}if (doPing) {inStream = new PingInputStream(inStream);}this.in = new DataInputStream(new BufferedInputStream(inStream));// SASL may have already buffered the streamif (!(outStream instanceof BufferedOutputStream)) {outStream = new BufferedOutputStream(outStream);}this.out = new DataOutputStream(outStream);writeConnectionContext(remoteId, authMethod);// update last activity timetouch();if (Trace.isTracing()) {Trace.addTimelineAnnotation("IPC client connected to " + server);}// start the receiver thread after the socket connection has been set// upstart();return;}} catch (Throwable t) {if (t instanceof IOException) {markClosed((IOException)t);} else {markClosed(new IOException("Couldn't set up IO streams", t));}close();}}private void closeConnection() {if (socket == null) {return;}// close the current connectiontry {socket.close();} catch (IOException e) {LOG.warn("Not able to close a socket", e);}// set socket to null so that the next call to setupIOstreams// can start the process of connect all over again.socket = null;}/* Handle connection failures due to timeout on connect** If the current number of retries is equal to the max number of retries,* stop retrying and throw the exception; Otherwise backoff 1 second and* try connecting again.** This Method is only called from inside setupIOstreams(), which is* synchronized. Hence the sleep is synchronized; the locks will be retained.** @param curRetries current number of retries* @param maxRetries max number of retries allowed* @param ioe failure reason* @throws IOException if max number of retries is reached*/private void handleConnectionTimeout(int curRetries, int maxRetries, IOException ioe) throws IOException {closeConnection();// throw the exception if the maximum number of retries is reachedif (curRetries >= maxRetries) {throw ioe;}LOG.info("Retrying connect to server: " + server + ". Already tried "+ curRetries + " time(s); maxRetries=" + maxRetries);}private void handleConnectionFailure(int curRetries, IOException ioe) throws IOException {closeConnection();final RetryAction action;try {action = connectionRetryPolicy.shouldRetry(ioe, curRetries, 0, true);} catch(Exception e) {throw e instanceof IOException? (IOException)e: new IOException(e);}if (action.action == RetryAction.RetryDecision.FAIL) {if (action.reason != null) {LOG.warn("Failed to connect to server: " + server + ": "+ action.reason, ioe);}throw ioe;}// Throw the exception if the thread is interruptedif (Thread.currentThread().isInterrupted()) {LOG.warn("Interrupted while trying for connection");throw ioe;}try {Thread.sleep(action.delayMillis);} catch (InterruptedException e) {throw (IOException)new InterruptedIOException("Interrupted: action="+ action + ", retry policy=" + connectionRetryPolicy).initCause(e);}LOG.info("Retrying connect to server: " + server + ". Already tried "+ curRetries + " time(s); retry policy is " + connectionRetryPolicy);}/*** Write the connection header - this is sent when connection is established* +----------------------------------+* |  "hrpc" 4 bytes                  |      * +----------------------------------+* |  Version (1 byte)                |* +----------------------------------+* |  Service Class (1 byte)          |* +----------------------------------+* |  AuthProtocol (1 byte)           |      * +----------------------------------+*/private void writeConnectionHeader(OutputStream outStream)throws IOException {DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));// Write out the header, version and authentication methodout.write(RpcConstants.HEADER.array());out.write(RpcConstants.CURRENT_VERSION);out.write(serviceClass);out.write(authProtocol.callId);out.flush();}/* Write the connection context header for each connection* Out is not synchronized because only the first thread does this.*/private void writeConnectionContext(ConnectionId remoteId,AuthMethod authMethod)throws IOException {// Write out the ConnectionHeaderIpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(RPC.getProtocolName(remoteId.getProtocol()),remoteId.getTicket(),authMethod);RpcRequestHeaderProto connectionContextHeader = ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,RpcConstants.INVALID_RETRY_COUNT, clientId);RpcRequestMessageWrapper request =new RpcRequestMessageWrapper(connectionContextHeader, message);// Write out the packet lengthout.writeInt(request.getLength());request.write(out);}/* wait till someone signals us to start reading RPC response or* it is idle too long, it is marked as to be closed, * or the client is marked as not running.* * Return true if it is time to read a response; false otherwise.*/private synchronized boolean waitForWork() {if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {long timeout = maxIdleTime-(Time.now()-lastActivity.get());if (timeout>0) {try {wait(timeout);} catch (InterruptedException e) {}}}if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {return true;} else if (shouldCloseConnection.get()) {return false;} else if (calls.isEmpty()) { // idle connection closed or stoppedmarkClosed(null);return false;} else { // get stopped but there are still pending requests markClosed((IOException)new IOException().initCause(new InterruptedException()));return false;}}public InetSocketAddress getRemoteAddress() {return server;}/* Send a ping to the server if the time elapsed * since last I/O activity is equal to or greater than the ping interval*/private synchronized void sendPing() throws IOException {long curTime = Time.now();if ( curTime - lastActivity.get() >= pingInterval) {lastActivity.set(curTime);synchronized (out) {out.writeInt(pingRequest.size());pingRequest.writeTo(out);out.flush();}}}@Overridepublic void run() {if (LOG.isDebugEnabled())LOG.debug(getName() + ": starting, having connections " + connections.size());try {while (waitForWork()) {//wait here for work - read or close connectionreceiveRpcResponse();}} catch (Throwable t) {// This truly is unexpected, since we catch IOException in receiveResponse// -- this is only to be really sure that we don't leave a client hanging// forever.LOG.warn("Unexpected error reading responses on connection " + this, t);markClosed(new IOException("Error reading responses", t));}close();if (LOG.isDebugEnabled())LOG.debug(getName() + ": stopped, remaining connections "+ connections.size());}/** Initiates a rpc call by sending the rpc request to the remote server.* Note: this is not called from the Connection thread, but by other* threads.* @param call - the rpc request*/public void sendRpcRequest(final Call call)throws InterruptedException, IOException {if (shouldCloseConnection.get()) {return;}// Serialize the call to be sent. This is done from the actual// caller thread, rather than the sendParamsExecutor thread,// so that if the serialization throws an error, it is reported// properly. This also parallelizes the serialization.//// Format of a call on the wire:// 0) Length of rest below (1 + 2)// 1) RpcRequestHeader  - is serialized Delimited hence contains length// 2) RpcRequest//// Items '1' and '2' are prepared here. final DataOutputBuffer d = new DataOutputBuffer();RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,clientId);header.writeDelimitedTo(d);call.rpcRequest.write(d);synchronized (sendRpcRequestLock) {Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {@Overridepublic void run() {try {synchronized (Connection.this.out) {if (shouldCloseConnection.get()) {return;}if (LOG.isDebugEnabled())LOG.debug(getName() + " sending #" + call.id);byte[] data = d.getData();int totalLength = d.getLength();out.writeInt(totalLength); // Total Lengthout.write(data, 0, totalLength);// RpcRequestHeader + RpcRequestout.flush();}} catch (IOException e) {// exception at this point would leave the connection in an// unrecoverable state (eg half a call left on the wire).// So, close the connection, killing any outstanding callsmarkClosed(e);} finally {//the buffer is just an in-memory buffer, but it is still polite to// close earlyIOUtils.closeStream(d);}}});try {senderFuture.get();} catch (ExecutionException e) {Throwable cause = e.getCause();// cause should only be a RuntimeException as the Runnable above// catches IOExceptionif (cause instanceof RuntimeException) {throw (RuntimeException) cause;} else {throw new RuntimeException("unexpected checked exception", cause);}}}}/* Receive a response.* Because only one receiver, so no synchronization on in.*/private void receiveRpcResponse() {if (shouldCloseConnection.get()) {return;}touch();try {int totalLen = in.readInt();RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in);checkResponse(header);int headerLen = header.getSerializedSize();headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);int callId = header.getCallId();if (LOG.isDebugEnabled())LOG.debug(getName() + " got value #" + callId);Call call = calls.get(callId);RpcStatusProto status = header.getStatus();if (status == RpcStatusProto.SUCCESS) {Writable value = ReflectionUtils.newInstance(valueClass, conf);value.readFields(in);                 // read valuecalls.remove(callId);call.setRpcResponse(value);// verify that length was correct// only for ProtobufEngine where len can be verified easilyif (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException("RPC response length mismatch on rpc success");}}} else { // Rpc Request failed// Verify that length was correctif (totalLen != headerLen) {throw new RpcClientException("RPC response length mismatch on rpc error");}final String exceptionClassName = header.hasExceptionClassName() ?header.getExceptionClassName() : "ServerDidNotSetExceptionClassName";final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null);if (erCode == null) {LOG.warn("Detailed error code not set by server on rpc error");}RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) :new RemoteException(exceptionClassName, errorMsg, erCode));if (status == RpcStatusProto.ERROR) {calls.remove(callId);call.setException(re);} else if (status == RpcStatusProto.FATAL) {// Close the connectionmarkClosed(re);}}} catch (IOException e) {markClosed(e);}}private synchronized void markClosed(IOException e) {if (shouldCloseConnection.compareAndSet(false, true)) {closeException = e;notifyAll();}}/** Close the connection. */private synchronized void close() {if (!shouldCloseConnection.get()) {LOG.error("The connection is not in the closed state");return;}connections.invalidate(remoteId);// close the streams and therefore the socketIOUtils.closeStream(out);IOUtils.closeStream(in);disposeSasl();// clean up all callsif (closeException == null) {if (!calls.isEmpty()) {LOG.warn("A connection is closed for no cause and calls are not empty");// clean up calls anywaycloseException = new IOException("Unexpected closed connection");cleanupCalls();}} else {// log the infoif (LOG.isDebugEnabled()) {LOG.debug("closing ipc connection to " + server + ": " +closeException.getMessage(),closeException);}// cleanup callscleanupCalls();}closeConnection();if (LOG.isDebugEnabled())LOG.debug(getName() + ": closed");}/* Cleanup all calls and mark them as done */private void cleanupCalls() {Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;while (itor.hasNext()) {Call c = itor.next().getValue(); itor.remove();c.setException(closeException); // local exception}}}/** Construct an IPC client whose values are of the given {@link Writable}* class. */public Client(Class<? extends Writable> valueClass, Configuration conf, SocketFactory factory) {this.valueClass = valueClass;this.conf = conf;this.socketFactory = factory;this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);this.clientId = ClientId.getClientId();this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();}/*** Construct an IPC client with the default SocketFactory* @param valueClass* @param conf*/public Client(Class<? extends Writable> valueClass, Configuration conf) {this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));}/** Return the socket factory of this client** @return this client's socket factory*/SocketFactory getSocketFactory() {return socketFactory;}/** Stop all threads related to this client.  No further calls may be made* using this client. */public void stop() {if (LOG.isDebugEnabled()) {LOG.debug("Stopping client");}if (!running.compareAndSet(true, false)) {return;}// wake up all connectionsfor (Connection conn : connections.asMap().values()) {conn.interrupt();}// wait until all connections are closedwhile (connections.size() > 0) {try {Thread.sleep(100);} catch (InterruptedException e) {}}clientExcecutorFactory.unrefAndCleanup();}/*** Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}*  for RPC_BUILTIN*/public Writable call(Writable param, InetSocketAddress address)throws IOException {ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,conf);return call(RpcKind.RPC_BUILTIN, param, remoteId);}/*** Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,* Class, UserGroupInformation, int, Configuration)}* except that rpcKind is writable.*/public Writable call(Writable param, InetSocketAddress addr,Class<?> protocol, UserGroupInformation ticket,int rpcTimeout, Configuration conf) throws IOException {ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,ticket, rpcTimeout, conf);return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);}/*** Same as {@link #call(Writable, InetSocketAddress,* Class, UserGroupInformation, int, Configuration)}* except that specifying serviceClass.*/public Writable call(Writable param, InetSocketAddress addr,Class<?> protocol, UserGroupInformation ticket,int rpcTimeout, int serviceClass, Configuration conf)throws IOException {ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,ticket, rpcTimeout, conf);return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);}/*** Make a call, passing <code>param</code>, to the IPC server running at* <code>address</code> which is servicing the <code>protocol</code> protocol,* with the <code>ticket</code> credentials, <code>rpcTimeout</code> as* timeout and <code>conf</code> as conf for this connection, returning the* value. Throws exceptions if there are network problems or if the remote* code threw an exception.*/public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, Class<?> protocol, UserGroupInformation ticket,int rpcTimeout, Configuration conf) throws IOException {ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,ticket, rpcTimeout, conf);return call(rpcKind, param, remoteId);}/*** Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}* except the rpcKind is RPC_BUILTIN*/public Writable call(Writable param, ConnectionId remoteId)throws IOException {return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);}/*** Make a call, passing <code>rpcRequest</code>, to the IPC server defined by* <code>remoteId</code>, returning the rpc respond.** @param rpcKind* @param rpcRequest -  contains serialized method and method parameters* @param remoteId - the target rpc server* @returns the rpc response* Throws exceptions if there are network problems or if the remote code * threw an exception.*/public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId) throws IOException {return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);}/** * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by* <code>remoteId</code>, returning the rpc respond.** @param rpcKind* @param rpcRequest -  contains serialized method and method parameters* @param remoteId - the target rpc server* @param fallbackToSimpleAuth - set to true or false during this method to*   indicate if a secure client falls back to simple auth* @returns the rpc response* Throws exceptions if there are network problems or if the remote code* threw an exception.*/public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)throws IOException {return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,fallbackToSimpleAuth);}/*** Make a call, passing <code>rpcRequest</code>, to the IPC server defined by* <code>remoteId</code>, returning the rpc response.* * @param rpcKind* @param rpcRequest -  contains serialized method and method parameters* @param remoteId - the target rpc server* @param serviceClass - service class for RPC* @returns the rpc response* Throws exceptions if there are network problems or if the remote code * threw an exception.*/public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId, int serviceClass) throws IOException {return call(rpcKind, rpcRequest, remoteId, serviceClass, null);}/*** Make a call, passing <code>rpcRequest</code>, to the IPC server defined by* <code>remoteId</code>, returning the rpc response.** @param rpcKind* @param rpcRequest -  contains serialized method and method parameters* @param remoteId - the target rpc server* @param serviceClass - service class for RPC* @param fallbackToSimpleAuth - set to true or false during this method to*   indicate if a secure client falls back to simple auth* @returns the rpc response* Throws exceptions if there are network problems or if the remote code* threw an exception.*/public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId, int serviceClass,AtomicBoolean fallbackToSimpleAuth) throws IOException {final Call call = createCall(rpcKind, rpcRequest);Connection connection = getConnection(remoteId, call, serviceClass,fallbackToSimpleAuth);try {connection.sendRpcRequest(call);                 // send the rpc request} catch (RejectedExecutionException e) {throw new IOException("connection has been closed", e);} catch (InterruptedException e) {Thread.currentThread().interrupt();LOG.warn("interrupted waiting to send rpc request to server", e);throw new IOException(e);}synchronized (call) {while (!call.done) {try {call.wait();                           // wait for the result} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new InterruptedIOException("Call interrupted");}}if (call.error != null) {if (call.error instanceof RemoteException) {call.error.fillInStackTrace();throw call.error;} else { // local exceptionInetSocketAddress address = connection.getRemoteAddress();throw NetUtils.wrapException(address.getHostName(),address.getPort(),NetUtils.getHostname(),0,call.error);}} else {return call.getRpcResponse();}}}// for unit testing only@InterfaceAudience.Private@InterfaceStability.UnstableSet<ConnectionId> getConnectionIds() {return connections.asMap().keySet();}/** Get a connection from the pool, or create a new one and add it to the* pool.  Connections to a given ConnectionId are reused. */private Connection getConnection(final ConnectionId remoteId,Call call, final int serviceClass, AtomicBoolean fallbackToSimpleAuth)throws IOException {if (!running.get()) {// the client is stoppedthrow new IOException("The client is stopped");}Connection connection;/* we could avoid this allocation for each RPC by having a  * connectionsId object and with set() method. We need to manage the* refs for keys in HashMap properly. For now its ok.*/while(true) {try {connection = connections.get(remoteId, new Callable<Connection>() {@Overridepublic Connection call() throws Exception {return new Connection(remoteId, serviceClass);}});} catch (ExecutionException e) {Throwable cause = e.getCause();// the underlying exception should normally be IOExceptionif (cause instanceof IOException) {throw (IOException) cause;} else {throw new IOException(cause);}}if (connection.addCall(call)) {break;} else {connections.invalidate(remoteId);}}//we don't invoke the method below inside "synchronized (connections)"//block above. The reason for that is if the server happens to be slow,//it will take longer to establish a connection and that will slow the//entire system down.connection.setupIOstreams(fallbackToSimpleAuth);return connection;}/*** This class holds the address and the user ticket. The client connections* to servers are uniquely identified by <remoteAddress, protocol, ticket>*/@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})@InterfaceStability.Evolvingpublic static class ConnectionId {InetSocketAddress address;UserGroupInformation ticket;final Class<?> protocol;private static final int PRIME = 16777619;private final int rpcTimeout;private final int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecsprivate final RetryPolicy connectionRetryPolicy;private final int maxRetriesOnSasl;// the max. no. of retries for socket connections on time out exceptionsprivate final int maxRetriesOnSocketTimeouts;private final boolean tcpNoDelay; // if T then disable Nagle's Algorithmprivate final boolean doPing; //do we need to send ping messageprivate final int pingInterval; // how often sends ping to the server in msecsprivate String saslQop; // here for testingprivate final Configuration conf; // used to get the expected kerberos principal nameConnectionId(InetSocketAddress address, Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,RetryPolicy connectionRetryPolicy, Configuration conf) {this.protocol = protocol;this.address = address;this.ticket = ticket;this.rpcTimeout = rpcTimeout;this.connectionRetryPolicy = connectionRetryPolicy;this.maxIdleTime = conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);this.maxRetriesOnSasl = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);this.maxRetriesOnSocketTimeouts = conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);this.tcpNoDelay = conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT);this.doPing = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY,CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT);this.pingInterval = (doPing ? Client.getPingInterval(conf) : 0);this.conf = conf;}InetSocketAddress getAddress() {return address;}Class<?> getProtocol() {return protocol;}UserGroupInformation getTicket() {return ticket;}private int getRpcTimeout() {return rpcTimeout;}int getMaxIdleTime() {return maxIdleTime;}public int getMaxRetriesOnSasl() {return maxRetriesOnSasl;}/** max connection retries on socket time outs */public int getMaxRetriesOnSocketTimeouts() {return maxRetriesOnSocketTimeouts;}boolean getTcpNoDelay() {return tcpNoDelay;}boolean getDoPing() {return doPing;}int getPingInterval() {return pingInterval;}@VisibleForTestingString getSaslQop() {return saslQop;}static ConnectionId getConnectionId(InetSocketAddress addr,Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,Configuration conf) throws IOException {return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);}/*** Returns a ConnectionId object. * @param addr Remote address for the connection.* @param protocol Protocol for RPC.* @param ticket UGI* @param rpcTimeout timeout* @param conf Configuration object* @return A ConnectionId instance* @throws IOException*/static ConnectionId getConnectionId(InetSocketAddress addr,Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {if (connectionRetryPolicy == null) {final int max = conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);final int retryInterval = conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT);connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(max, retryInterval, TimeUnit.MILLISECONDS);}return new ConnectionId(addr, protocol, ticket, rpcTimeout,connectionRetryPolicy, conf);}static boolean isEqual(Object a, Object b) {return a == null ? b == null : a.equals(b);}@Overridepublic boolean equals(Object obj) {if (obj == this) {return true;}if (obj instanceof ConnectionId) {ConnectionId that = (ConnectionId) obj;return isEqual(this.address, that.address)&& this.doPing == that.doPing&& this.maxIdleTime == that.maxIdleTime&& isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)&& this.pingInterval == that.pingInterval&& isEqual(this.protocol, that.protocol)&& this.rpcTimeout == that.rpcTimeout&& this.tcpNoDelay == that.tcpNoDelay&& isEqual(this.ticket, that.ticket);}return false;}@Overridepublic int hashCode() {int result = connectionRetryPolicy.hashCode();result = PRIME * result + ((address == null) ? 0 : address.hashCode());result = PRIME * result + (doPing ? 1231 : 1237);result = PRIME * result + maxIdleTime;result = PRIME * result + pingInterval;result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());result = PRIME * result + rpcTimeout;result = PRIME * result + (tcpNoDelay ? 1231 : 1237);result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());return result;}@Overridepublic String toString() {return address.toString();}}  /*** Returns the next valid sequential call ID by incrementing an atomic counter* and masking off the sign bit.  Valid call IDs are non-negative integers in* the range [ 0, 2^31 - 1 ].  Negative numbers are reserved for special* purposes.  The values can overflow back to 0 and be reused.  Note that prior* versions of the client did not mask off the sign bit, so a server may still* see a negative call ID if it receives connections from an old client.* * @return next call ID*/public static int nextCallId() {return callIdCounter.getAndIncrement() & 0x7FFFFFFF;}
}

hadoop-common2.7源码分析之ProtobufRpcEngine(RPC实现)相关推荐

  1. spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析

    spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...

  2. Dubbo源码分析系列-深入RPC协议扩展

    导语   在之前的博客里面提到了关于扩展机制以及SPI的原理,这篇博客主要来讨论一下关于协议的扩展问题,在系统与系统之间通信就需要两个系统之间遵循相同的协议.而现在被熟知的常用的协议有TCP/IP协议 ...

  3. Gorilla源码分析之gorilla/rpc源码分析

    本文公众号文章链接:https://mp.weixin.qq.com/s/6ZNJB1Qcwdk0MaC2fWu4ng 本文csdn博客链接:https://blog.csdn.net/scresce ...

  4. hadoop之BlockPoolManager源码分析

    在HDFS Federation架构中, 一个HDFS集群可以创建多个命名空间,每一个DataNode都可以存储多个BlockPool的的数据块,所以在 DataNode定义了一个BlockPoolM ...

  5. spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析

    RpcEndpoint 文档对RpcEndpoint的解释: An end point for the RPC that defines what functions to trigger given ...

  6. Hadoop源码分析(四)

    2021SC@SDUSC 研究内容简略介绍 上周我们分析了org.apache.hadoop.mapreduce.Cluster中的的核心代码,本周将继续对mapreduce部分进行分析.在对Clus ...

  7. Hadoop源码分析(25)

    Hadoop源码分析(25) ZKFC源码分析   从文档(4)到文档(24),详细分析了namenode的启动流程.最后namenode会以standby模式启动.但在standby模式下的name ...

  8. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  9. hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇)

    1.概述 MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁.TaskTracker周期性地调用心跳RPC函数,汇 ...

最新文章

  1. 如何制定客户留存策略_如何制定品牌营销策略?
  2. 现代操作系统:第三章 内存管理
  3. java dom4j 去除空行_如何从XML文件中删除多余的空行?
  4. isAlive()方法的作用
  5. 四种保留小数后两位输出方法
  6. 什么叫做罗列式_项目起盘的时候,如何确定自己该做什么社群?
  7. 人类为啥比小鼠发育更慢?同日两篇《科学》找到意想不到的原因
  8. T410i升级i3 380M,上测试对比图,附拆机心得
  9. python能开发微信公众号吗_用python如何开发微信公共帐号?
  10. 戴戒指的含义(以后要结婚的必看)
  11. synchronized实现原理之---Moniter的实现原理
  12. 用Excel获取数据——不仅仅只是打开表格
  13. linux安装系统前安装驱动(driver)方法
  14. win10系统访问局域网服务器,Win10系统不能访问局域网共享磁盘的解决方法
  15. Plants vs. Zombies ZOJ - 4062
  16. 微信小程序_Flex布局
  17. linux配置 rsync脚本
  18. Unity编辑器拓展-写一个查看当前所有PlayerPrefsKey的窗口
  19. python安装 pymssql 异常
  20. 花椒接口Mock方案

热门文章

  1. TensorFlow.js简介
  2. tableau连接不上oracle,Oracle
  3. 生于忧患,死于安乐-时刻保持危机感
  4. Java XML分析技术: StAX, SAX, DOM, DOM4j, JDOM
  5. 南卡315打假!揭露山寨耳机“十宗罪”!
  6. 用python生成M序列
  7. 点击按钮复制文本框内容
  8. linux64x gtx970,Nvidia GeForce GTX 970 ( 4 GB / 七彩虹 )无法正常驱动
  9. GD32F4xx 以太网芯片(enc28j60)驱动移植
  10. PXE启动配置及原理