1. Preface

This article walks through part of the HiveServer2 source code, based on Hive 2.3.6.

2. Background

A while ago, during a CHDFS migration, I noticed that the temporary directories created on HDFS for users connecting to HiveServer2 through Beeline were owned by `hive` rather than the connecting user. The proxy user was not taking effect, which suggested a proxy-user bug in CHDFS, so I dug into the HiveServer2 server-side and client-side connection code. (`hive.server2.enable.doAs` was already set to `true`.)
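For reference, impersonation is controlled by the following settings. This is a minimal sketch of the relevant properties; the wildcard host/group values are illustrative defaults, not taken from the cluster described in this article:

```xml
<!-- hive-site.xml: run queries as the connecting user instead of the hive service user -->
<property>
  <name>hive.server2.enable.doAs</name>
  <value>true</value>
</property>

<!-- core-site.xml: allow the hive service user to impersonate other users;
     the wildcard values below are illustrative -->
<property>
  <name>hadoop.proxyuser.hive.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hive.groups</name>
  <value>*</value>
</property>
```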
Normal case (screenshot):

When the proxy user does not take effect (screenshot):

3. Server-Side Startup

HiveServer2

  public static void main(String[] args) {
    // Load the HiveServer2 configuration
    HiveConf.setLoadHiveServer2Config(true);
    try {
      ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
      // Returns a response whose executor is StartOptionExecutor
      ServerOptionsProcessorResponse oprocResponse = oproc.parse(args);

      // NOTE: It is critical to do this here so that log4j is reinitialized
      // before any of the other core hive classes are loaded
      String initLog4jMessage = LogUtils.initHiveLog4j();
      LOG.debug(initLog4jMessage);
      HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);

      // Logger debug message from "oproc" after log4j initialize properly
      LOG.debug(oproc.getDebugMessage().toString());

      // Call the executor which will execute the appropriate command based on the parsed options
      // i.e. StartOptionExecutor.execute()
      oprocResponse.getServerOptionsExecutor().execute();
    } catch (LogInitializationException e) {
      LOG.error("Error initializing log: " + e.getMessage(), e);
      System.exit(-1);
    }
  }
  static class StartOptionExecutor implements ServerOptionsExecutor {
    @Override
    public void execute() {
      try {
        // Start the HiveServer2 Thrift service
        startHiveServer2();
      } catch (Throwable t) {
        LOG.error("Error starting HiveServer2", t);
        System.exit(-1);
      }
    }
  }
  private static void startHiveServer2() throws Throwable {
    long attempts = 0, maxAttempts = 1;
    while (true) {
      LOG.info("Starting HiveServer2");
      HiveConf hiveConf = new HiveConf();
      // Maximum number of start attempts
      maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
      // Sleep interval between two attempts
      long retrySleepIntervalMs = hiveConf.getTimeVar(
          ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, TimeUnit.MILLISECONDS);
      HiveServer2 server = null;
      try {
        // Initialize the pool before we start the server; don't start yet.
        TezSessionPoolManager sessionPool = null;
        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
          sessionPool = TezSessionPoolManager.getInstance();
          sessionPool.setupPool(hiveConf);
        }
        // Cleanup the scratch dir before starting (defaults to /tmp/hive on HDFS)
        ServerUtils.cleanUpScratchDir(hiveConf);
        // Schedule task to cleanup dangling scratch dir periodically,
        // initial wait for a random time between 0-10 min to
        // avoid initial spike when using multiple HS2.
        // Directories left behind by an unclean HiveServer2 shutdown are detected via
        // the inuse.lck file: directories that are not locked are cleaned up automatically.
        scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600));
        server = new HiveServer2();
        // init() mainly registers the services: CLIService, ThriftBinaryCLIService,
        // and the web UI server
        server.init(hiveConf);
        // Start all registered services
        server.start();
        try {
          JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(hiveConf);
          pauseMonitor.start();
        } catch (Throwable t) {
          LOG.warn("Could not initiate the JvmPauseMonitor thread." + " GCs and Pauses may not be "
              + "warned upon.", t);
        }
        if (sessionPool != null) {
          sessionPool.startPool();
        }
        if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
          SparkSessionManagerImpl.getInstance().setup(hiveConf);
        }
        break;
      } catch (Throwable throwable) {
        if (server != null) {
          try {
            server.stop();
          } catch (Throwable t) {
            LOG.info("Exception caught when calling stop of HiveServer2 before retrying start", t);
          } finally {
            server = null;
          }
        }
        if (++attempts >= maxAttempts) {
          throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
        } else {
          LOG.warn("Error starting HiveServer2 on attempt " + attempts
              + ", will retry in " + retrySleepIntervalMs + "ms", throwable);
          try {
            Thread.sleep(retrySleepIntervalMs);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }
    }
  }
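The retry loop above can be reduced to a small, self-contained sketch. The names `startWithRetries` and `FlakyStart` are hypothetical and only for illustration; in the real code, `maxAttempts` and the sleep interval come from `HiveConf`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StartRetrySketch {

    public interface FlakyStart {
        void start() throws Exception;
    }

    // Retry start() up to maxAttempts times, sleeping between attempts,
    // mirroring the structure of startHiveServer2's while (true) loop.
    public static int startWithRetries(FlakyStart server, long maxAttempts, long sleepMs)
            throws InterruptedException {
        long attempts = 0;
        while (true) {
            try {
                server.start();
                return (int) attempts + 1; // number of attempts it took
            } catch (Exception e) {
                if (++attempts >= maxAttempts) {
                    // Mirrors: throw new Error("Max start attempts " + maxAttempts + " exhausted")
                    throw new IllegalStateException(
                        "Max start attempts " + maxAttempts + " exhausted", e);
                }
                Thread.sleep(sleepMs); // retrySleepIntervalMs in the real code
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice, then succeeds on the third attempt.
        FlakyStart flaky = () -> {
            if (calls.incrementAndGet() < 3) throw new Exception("boom");
        };
        System.out.println(startWithRetries(flaky, 5, 1)); // prints 3
    }
}
```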

CLIService

  public synchronized void init(HiveConf hiveConf) {
    this.hiveConf = hiveConf;
    // Session management service
    sessionManager = new SessionManager(hiveServer2);
    // Number of rows returned to the client per fetch, 1000 by default
    defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE);
    addService(sessionManager);
    //  If the hadoop cluster is secure, do a kerberos login for the service from the keytab
    if (UserGroupInformation.isSecurityEnabled()) {
      try {
        HiveAuthFactory.loginFromKeytab(hiveConf);
        this.serviceUGI = Utils.getUGI();
      } catch (IOException e) {
        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e);
      } catch (LoginException e) {
        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e);
      }
      // Also try creating a UGI object for the SPNego principal
      String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL);
      String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB);
      if (principal.isEmpty() || keyTabFile.isEmpty()) {
        LOG.info("SPNego httpUGI not created, spNegoPrincipal: " + principal
            + ", ketabFile: " + keyTabFile);
      } else {
        try {
          this.httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf);
          LOG.info("SPNego httpUGI successfully created.");
        } catch (IOException e) {
          LOG.warn("SPNego httpUGI creation failed: ", e);
        }
      }
    }
    // creates connection to HMS and thus *must* occur after kerberos login above
    try {
      // Initializes the Hive SessionState; at this point the user is still hive,
      // there is no proxy user yet
      applyAuthorizationConfigPolicy(hiveConf);
    } catch (Exception e) {
      throw new RuntimeException("Error applying authorization policy on hive configuration: "
          + e.getMessage(), e);
    }
    setupBlockedUdfs();
    super.init(hiveConf);
  }

ThriftBinaryCLIService

  public void run() {
    try {
      // Server thread pool
      String threadPoolName = "HiveServer2-Handler-Pool";
      ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads,
          maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName),
          oomHook);

      // Thrift configs
      hiveAuthFactory = new HiveAuthFactory(hiveConf);
      // Registers a callback for username/password validation; a custom check can be
      // plugged in by implementing PasswdAuthenticationProvider
      TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
      // TCLIService.Processor, the Thrift server-side handler interface
      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
      TServerSocket serverSocket = null;
      List<String> sslVersionBlacklist = new ArrayList<String>();
      for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) {
        sslVersionBlacklist.add(sslVersion);
      }
      if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
        serverSocket = HiveAuthUtils.getServerSocket(hiveHost, portNum);
      } else {
        String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
        if (keyStorePath.isEmpty()) {
          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
              + " Not configured for SSL connection");
        }
        String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
            HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
        serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath,
            keyStorePassword, sslVersionBlacklist);
      }

      // Server args
      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
      int requestTimeout = (int) hiveConf.getTimeVar(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
      int beBackoffSlotLength = (int) hiveConf.getTimeVar(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
          .processorFactory(processorFactory)
          .transportFactory(transportFactory)
          .protocolFactory(new TBinaryProtocol.Factory())
          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
          .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
          .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
          .executorService(executorService);

      // TCP server
      server = new TThreadPoolServer(sargs);
      server.setServerEventHandler(new TServerEventHandler() {
        @Override
        public ServerContext createContext(TProtocol input, TProtocol output) {
          Metrics metrics = MetricsFactory.getInstance();
          if (metrics != null) {
            try {
              metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
              metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
            } catch (Exception e) {
              LOG.warn("Error Reporting JDO operation to Metrics system", e);
            }
          }
          return new ThriftCLIServerContext();
        }

        @Override
        public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
          Metrics metrics = MetricsFactory.getInstance();
          if (metrics != null) {
            try {
              metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
            } catch (Exception e) {
              LOG.warn("Error Reporting JDO operation to Metrics system", e);
            }
          }
          ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
          SessionHandle sessionHandle = context.getSessionHandle();
          if (sessionHandle != null) {
            LOG.info("Session disconnected without closing properly. ");
            try {
              boolean close = cliService.getSessionManager().getSession(sessionHandle)
                  .getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT);
              LOG.info((close ? "" : "Not ") + "Closing the session: " + sessionHandle);
              if (close) {
                cliService.closeSession(sessionHandle);
              }
            } catch (HiveSQLException e) {
              LOG.warn("Failed to close session: " + e, e);
            }
          }
        }

        @Override
        public void preServe() {
        }

        @Override
        public void processContext(ServerContext serverContext, TTransport input, TTransport output) {
          currentServerContext.set(serverContext);
        }
      });
      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
          + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
      LOG.info(msg);
      // Start serving
      server.serve();
    } catch (Throwable t) {
      LOG.error("Error starting HiveServer2: could not start "
          + ThriftBinaryCLIService.class.getSimpleName(), t);
      System.exit(-1);
    }
  }

The code above covers the startup of the server-side services. The most important one is ThriftBinaryCLIService, which starts the Thrift server that receives client requests. Next, let's look at how the client sends those requests.

4. Client-Side Code

Clients connect over JDBC through HiveDriver, which wraps a Thrift client.
Beeline

  public static void main(String[] args) throws IOException {
    mainWithInputRedirection(args, null);
  }

  public static void mainWithInputRedirection(String[] args, InputStream inputStream)
      throws IOException {
    // isBeeLine = true
    BeeLine beeLine = new BeeLine();
    try {
      // Start beeline
      int status = beeLine.begin(args, inputStream);
      if (!Boolean.getBoolean(BeeLineOpts.PROPERTY_NAME_EXIT)) {
        System.exit(status);
      }
    } finally {
      beeLine.close();
    }
  }

  public int begin(String[] args, InputStream inputStream) throws IOException {
    try {
      // load the options first, so we can override on the command line
      getOpts().load();
    } catch (Exception e) {
      // nothing
    }
    setupHistory();
    // add shutdown hook to cleanup the beeline for smooth exit
    addBeelineShutdownHook();
    // this method also initializes the consoleReader which is
    // needed by initArgs for certain execution paths
    ConsoleReader reader = initializeConsoleReader(inputStream);
    if (isBeeLine) {
      // Parse the arguments; this is also where the JDBC connection is made
      int code = initArgs(args);
      if (code != 0) {
        return code;
      }
    } else {
      int code = initArgsFromCliVars(args);
      if (code != 0 || exit) {
        return code;
      }
      defaultConnect(false);
    }
    if (getOpts().isHelpAsked()) {
      return 0;
    }
    if (getOpts().getScriptFile() != null) {
      return executeFile(getOpts().getScriptFile());
    }
    try {
      info(getApplicationTitle());
    } catch (Exception e) {
      // ignore
    }
    return execute(reader, false);
  }

  int initArgs(String[] args) {
    List<String> commands = Collections.emptyList();
    CommandLine cl;
    BeelineParser beelineParser;
    try {
      beelineParser = new BeelineParser();
      cl = beelineParser.parse(options, args);
    } catch (ParseException e1) {
      output(e1.getMessage());
      usage();
      return -1;
    }
    // Connect to the server
    boolean connSuccessful = connectUsingArgs(beelineParser, cl);
    // checks if default hs2 connection configuration file is present
    // and uses it to connect if found
    // no-op if the file is not present
    if (!connSuccessful && !exit) {
      connSuccessful = defaultBeelineConnect();
    }
    int code = 0;
    if (cl.getOptionValues('e') != null) {
      commands = Arrays.asList(cl.getOptionValues('e'));
    }
    if (!commands.isEmpty() && getOpts().getScriptFile() != null) {
      error("The '-e' and '-f' options cannot be specified simultaneously");
      return 1;
    } else if (!commands.isEmpty() && !connSuccessful) {
      error("Cannot run commands specified using -e. No current connection");
      return 1;
    }
    if (!commands.isEmpty()) {
      for (Iterator<String> i = commands.iterator(); i.hasNext();) {
        String command = i.next().toString();
        debug(loc("executing-command", command));
        if (!dispatch(command)) {
          code++;
        }
      }
      exit = true; // execute and exit
    }
    return code;
  }

  private boolean connectUsingArgs(BeelineParser beelineParser, CommandLine cl) {
    String driver = null, user = null, pass = "", url = null;
    String auth = null;
    if (cl.hasOption("help")) {
      usage();
      getOpts().setHelpAsked(true);
      return true;
    }
    Properties hiveVars = cl.getOptionProperties("hivevar");
    for (String key : hiveVars.stringPropertyNames()) {
      getOpts().getHiveVariables().put(key, hiveVars.getProperty(key));
    }
    Properties hiveConfs = cl.getOptionProperties("hiveconf");
    for (String key : hiveConfs.stringPropertyNames()) {
      setHiveConfVar(key, hiveConfs.getProperty(key));
    }
    driver = cl.getOptionValue("d");
    auth = cl.getOptionValue("a");
    user = cl.getOptionValue("n");
    getOpts().setAuthType(auth);
    if (cl.hasOption("w")) {
      pass = obtainPasswordFromFile(cl.getOptionValue("w"));
    } else {
      if (beelineParser.isPasswordOptionSet) {
        pass = cl.getOptionValue("p");
      }
    }
    url = cl.getOptionValue("u");
    if ((url == null) && cl.hasOption("reconnect")) {
      // If url was not specified with -u, but -r was present, use that.
      url = getOpts().getLastConnectedUrl();
    }
    getOpts().setInitFiles(cl.getOptionValues("i"));
    getOpts().setScriptFile(cl.getOptionValue("f"));

    if (url != null) {
      String com;
      String comForDebug;
      if (pass != null) {
        com = constructCmd(url, user, pass, driver, false);
        comForDebug = constructCmd(url, user, pass, driver, true);
      } else {
        com = constructCmdUrl(url, user, driver, false);
        comForDebug = constructCmdUrl(url, user, driver, true);
      }
      debug(comForDebug);
      // Dispatches "!connect url user passwd"
      return dispatch(com);
    }
    // load property file
    String propertyFile = cl.getOptionValue("property-file");
    if (propertyFile != null) {
      try {
        this.consoleReader = new ConsoleReader();
      } catch (IOException e) {
        handleException(e);
      }
      if (!dispatch("!properties " + propertyFile)) {
        exit = true;
        return false;
      }
    }
    return false;
  }

  boolean dispatch(String line) {
    if (line == null) {
      // exit
      exit = true;
      return true;
    }
    if (line.trim().length() == 0) {
      return true;
    }
    if (isComment(line)) {
      return true;
    }
    line = line.trim();
    // save it to the current script, if any
    if (scriptOutputFile != null) {
      scriptOutputFile.addLine(line);
    }
    if (isHelpRequest(line)) {
      line = "!help";
    }
    if (isBeeLine) {
      if (line.startsWith(COMMAND_PREFIX)) {
        // handle SQLLine command in beeline which starts with ! and does not end with ;
        // this is the entry point for !connect
        return execCommandWithPrefix(line);
      } else {
        return commands.sql(line, getOpts().getEntireLineAsCommand());
      }
    } else {
      return commands.sql(line, getOpts().getEntireLineAsCommand());
    }
  }
  public boolean execCommandWithPrefix(String line) {
    Map<String, CommandHandler> cmdMap = new TreeMap<String, CommandHandler>();
    line = line.substring(1);
    /*
     * "connect" matches the handler registered as
     *   new ReflectiveCommandHandler(this, new String[] {"connect", "open"},
     *       new Completer[] {new StringsCompleter(getConnectionURLExamples())}),
     * whose getName() returns "connect", so Commands.connect ends up being executed.
     */
    for (int i = 0; i < commandHandlers.length; i++) {
      String match = commandHandlers[i].matches(line);
      if (match != null) {
        cmdMap.put(match, commandHandlers[i]);
      }
    }
    if (cmdMap.size() == 0) {
      return error(loc("unknown-command", line));
    }
    if (cmdMap.size() > 1) {
      // any exact match?
      CommandHandler handler = cmdMap.get(line);
      if (handler == null) {
        return error(loc("multiple-matches", cmdMap.keySet().toString()));
      }
      return handler.execute(line);
    }
    return cmdMap.values().iterator().next().execute(line);
  }

The command execution lands in Commands.connect.
Commands

  public boolean connect(Properties props) throws IOException {
    String url = getProperty(props, new String[] {
        JdbcConnectionParams.PROPERTY_URL,
        "javax.jdo.option.ConnectionURL",
        "ConnectionURL",
    });
    String driver = getProperty(props, new String[] {
        JdbcConnectionParams.PROPERTY_DRIVER,
        "javax.jdo.option.ConnectionDriverName",
        "ConnectionDriverName",
    });
    String username = getProperty(props, new String[] {
        JdbcConnectionParams.AUTH_USER,
        "javax.jdo.option.ConnectionUserName",
        "ConnectionUserName",
    });
    String password = getProperty(props, new String[] {
        JdbcConnectionParams.AUTH_PASSWD,
        "javax.jdo.option.ConnectionPassword",
        "ConnectionPassword",
    });

    if (url == null || url.length() == 0) {
      return beeLine.error("Property \"url\" is required");
    }
    if (driver == null || driver.length() == 0) {
      if (!beeLine.scanForDriver(url)) {
        return beeLine.error(beeLine.loc("no-driver", url));
      }
    }

    String auth = getProperty(props, new String[] {JdbcConnectionParams.AUTH_TYPE});
    if (auth == null) {
      auth = beeLine.getOpts().getAuthType();
      if (auth != null) {
        props.setProperty(JdbcConnectionParams.AUTH_TYPE, auth);
      }
    }

    beeLine.info("Connecting to " + url);
    if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PRINCIPAL) == null) {
      String urlForPrompt = url.substring(0, url.contains(";") ? url.indexOf(';') : url.length());
      if (username == null) {
        username = beeLine.getConsoleReader().readLine("Enter username for " + urlForPrompt + ": ");
      }
      props.setProperty(JdbcConnectionParams.AUTH_USER, username);
      if (password == null) {
        password = beeLine.getConsoleReader().readLine("Enter password for " + urlForPrompt + ": ",
            new Character('*'));
      }
      props.setProperty(JdbcConnectionParams.AUTH_PASSWD, password);
    }

    try {
      beeLine.getDatabaseConnections().setConnection(
          new DatabaseConnection(beeLine, driver, url, props));
      // This is where HiveDriver is invoked to connect and the session is initialized
      beeLine.getDatabaseConnection().getConnection();

      if (!beeLine.isBeeLine()) {
        beeLine.updateOptsForCli();
      }

      beeLine.runInit();
      beeLine.setCompletions();
      beeLine.getOpts().setLastConnectedUrl(url);
      return true;
    } catch (SQLException sqle) {
      beeLine.getDatabaseConnections().remove();
      return beeLine.error(sqle);
    } catch (IOException ioe) {
      return beeLine.error(ioe);
    }
  }
setConnection(DriverManager.getConnection(getUrl(), info));

This reaches HiveDriver:

  public Connection connect(String url, Properties info) throws SQLException {
    return acceptsURL(url) ? new HiveConnection(url, info) : null;
  }

HiveConnection initializes the Thrift client and establishes the session:

          // open the client transport
          openTransport();
          // set up the client
          client = new TCLIService.Client(new TBinaryProtocol(transport));
          // open client session:
          // asks the server to open a session and establish the connection;
          // the server-side entry point is ThriftCLIService.OpenSession
          openSession();
          executeInitSql();
          break;

Now switch to the server side, ThriftCLIService.OpenSession:

  public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException {
    LOG.info("Client protocol version: " + req.getClient_protocol());
    TOpenSessionResp resp = new TOpenSessionResp();
    try {
      Map<String, String> openConf = req.getConfiguration();
      // Session initialization entry point
      SessionHandle sessionHandle = getSessionHandle(req, resp);
      resp.setSessionHandle(sessionHandle.toTSessionHandle());
      Map<String, String> configurationMap = new HashMap<String, String>();
      // Set the updated fetch size from the server into the configuration map for the client
      HiveConf sessionConf = cliService.getSessionConf(sessionHandle);
      configurationMap.put(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname,
          Integer.toString(sessionConf != null
              ? sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)
              : hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)));
      resp.setConfiguration(configurationMap);
      resp.setStatus(OK_STATUS);
      ThriftCLIServerContext context = (ThriftCLIServerContext) currentServerContext.get();
      if (context != null) {
        context.setSessionHandle(sessionHandle);
      }
    } catch (Exception e) {
      LOG.warn("Error opening session: ", e);
      resp.setStatus(HiveSQLException.toTStatus(e));
    }
    return resp;
  }

  SessionHandle getSessionHandle(TOpenSessionReq req, TOpenSessionResp res)
      throws HiveSQLException, LoginException, IOException {
    String userName = getUserName(req);
    String ipAddress = getIpAddress();
    TProtocolVersion protocol = getMinVersion(CLIService.SERVER_VERSION, req.getClient_protocol());
    SessionHandle sessionHandle;
    // hive.server2.enable.doAs decides whether a proxy user is used
    if (cliService.getHiveConf().getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)
        && (userName != null)) {
      String delegationTokenStr = getDelegationToken(userName);
      sessionHandle = cliService.openSessionWithImpersonation(protocol, userName,
          req.getPassword(), ipAddress, req.getConfiguration(), delegationTokenStr);
    } else {
      sessionHandle = cliService.openSession(protocol, userName, req.getPassword(),
          ipAddress, req.getConfiguration());
    }
    res.setServerProtocolVersion(protocol);
    return sessionHandle;
  }

  public SessionHandle openSessionWithImpersonation(TProtocolVersion protocol, String username,
      String password, String ipAddress, Map<String, String> configuration, String delegationToken)
      throws HiveSQLException {
    // Open the session
    SessionHandle sessionHandle = sessionManager.openSession(protocol, username, password,
        ipAddress, configuration, true, delegationToken);
    LOG.debug(sessionHandle + ": openSession()");
    return sessionHandle;
  }

  public SessionHandle openSession(TProtocolVersion protocol, String username, String password,
      String ipAddress, Map<String, String> sessionConf, boolean withImpersonation,
      String delegationToken) throws HiveSQLException {
    return createSession(null, protocol, username, password, ipAddress, sessionConf,
        withImpersonation, delegationToken).getSessionHandle();
  }

  public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol,
      String username, String password, String ipAddress, Map<String, String> sessionConf,
      boolean withImpersonation, String delegationToken) throws HiveSQLException {
    HiveSession session;
    // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
    // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
    if (withImpersonation) {
      HiveSessionImplwithUGI hiveSessionUgi;
      if (sessionImplWithUGIclassName == null) {
        // Ultimately calls UserGroupInformation.createProxyUser, which puts the
        // proxy user into the Subject
        hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password,
            hiveConf, ipAddress, delegationToken);
      } else {
        try {
          Class<?> clazz = Class.forName(sessionImplWithUGIclassName);
          Constructor<?> constructor = clazz.getConstructor(SessionHandle.class,
              TProtocolVersion.class, String.class, String.class, HiveConf.class,
              String.class, String.class);
          hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle,
              protocol, username, password, hiveConf, ipAddress, delegationToken);
        } catch (Exception e) {
          throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName);
        }
      }
      session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
      hiveSessionUgi.setProxySession(session);
    } else {
      if (sessionImplclassName == null) {
        session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf,
            ipAddress);
      } else {
        try {
          Class<?> clazz = Class.forName(sessionImplclassName);
          Constructor<?> constructor = clazz.getConstructor(SessionHandle.class,
              TProtocolVersion.class, String.class, String.class, HiveConf.class, String.class);
          session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username,
              password, hiveConf, ipAddress);
        } catch (Exception e) {
          throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e);
        }
      }
    }
    session.setSessionManager(this);
    session.setOperationManager(operationManager);
    try {
      session.open(sessionConf);
    } catch (Exception e) {
      LOG.warn("Failed to open session", e);
      try {
        session.close();
      } catch (Throwable t) {
        LOG.warn("Error closing session", t);
      }
      session = null;
      throw new HiveSQLException("Failed to open new session: " + e.getMessage(), e);
    }
    if (isOperationLogEnabled) {
      session.setOperationLogSessionDir(operationLogRootDir);
    }
    try {
      executeSessionHooks(session);
    } catch (Exception e) {
      LOG.warn("Failed to execute session hooks", e);
      try {
        session.close();
      } catch (Throwable t) {
        LOG.warn("Error closing session", t);
      }
      session = null;
      throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
    }
    handleToSession.put(session.getSessionHandle(), session);
    LOG.info("Session opened, " + session.getSessionHandle()
        + ", current sessions:" + getOpenSessionCount());
    return session;
  }
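The interesting part of the impersonation branch is `HiveSessionProxy.getProxy`, which wraps every call on the session in the proxy user's `doAs`. Since Hive's implementation uses `java.lang.reflect.Proxy`, the mechanism can be sketched with the JDK alone. The names `SessionProxySketch`, `Session`, and `proxy` below are hypothetical, and a string tag stands in for the real `UserGroupInformation#doAs` call:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class SessionProxySketch {

    public interface Session {
        String whoAmI();
    }

    // Returns a dynamic proxy that intercepts every interface call, the way
    // HiveSessionProxy wraps HiveSessionImplwithUGI methods in sessionUgi.doAs(...).
    public static Session proxy(Session target, String proxyUser) {
        InvocationHandler handler = (p, method, args) -> {
            // In HiveServer2 this is where ugi.doAs(new PrivilegedExceptionAction<...>) runs;
            // here we only tag the result to show the interception point.
            return "doAs(" + proxyUser + "): " + method.invoke(target, args);
        };
        return (Session) Proxy.newProxyInstance(
            Session.class.getClassLoader(), new Class<?>[] {Session.class}, handler);
    }

    public static void main(String[] args) {
        Session real = () -> "executed";
        Session wrapped = proxy(real, "alice");
        System.out.println(wrapped.whoAmI()); // prints "doAs(alice): executed"
    }
}
```

Because every method call goes through the handler, code inside the session never has to remember to switch users; the proxy guarantees it.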

What follows is session.open(). At this point the UGI has entered the picture, so let's look at the UGI code in detail.

5. The UGI Code

On the HiveServer2 side, UserGroupInformation.createProxyUser is called to create the proxy user:

  // setup appropriate UGI for the session
  public void setSessionUGI(String owner) throws HiveSQLException {
    if (owner == null) {
      throw new HiveSQLException("No username provided for impersonation");
    }
    try {
      sessionUgi = UserGroupInformation.createProxyUser(owner, UserGroupInformation.getLoginUser());
    } catch (IOException e) {
      throw new HiveSQLException("Couldn't setup proxy user", e);
    }
  }

UserGroupInformation puts both the proxy user and the real user into a Subject:

  public static UserGroupInformation createProxyUser(String user, UserGroupInformation realUser) {
    if (user == null || user.isEmpty()) {
      throw new IllegalArgumentException("Null user");
    }
    if (realUser == null) {
      throw new IllegalArgumentException("Null real user");
    }
    Subject subject = new Subject();
    Set<Principal> principals = subject.getPrincipals();
    principals.add(new User(user));
    principals.add(new RealUser(realUser));
    UserGroupInformation result = new UserGroupInformation(subject, false);
    result.setAuthenticationMethod(AuthenticationMethod.PROXY);
    return result;
  }
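The Subject bookkeeping above can be reproduced with the JDK alone. This is a minimal sketch: the `User` and `RealUser` classes below are simplified stand-ins for Hadoop's internal principal classes (which are not public API), and `createProxySubject` mirrors the shape of `createProxyUser`:

```java
import javax.security.auth.Subject;
import java.security.Principal;

public class ProxySubjectSketch {

    // Simplified stand-in for Hadoop's internal User principal.
    public static final class User implements Principal {
        private final String name;
        public User(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    // Simplified stand-in for Hadoop's internal RealUser principal.
    public static final class RealUser implements Principal {
        private final String name;
        public RealUser(String name) { this.name = name; }
        @Override public String getName() { return name; }
    }

    // Both the effective (proxy) user and the real (login) user live in one Subject.
    public static Subject createProxySubject(String proxyUser, String realUser) {
        Subject subject = new Subject();
        subject.getPrincipals().add(new User(proxyUser));
        subject.getPrincipals().add(new RealUser(realUser));
        return subject;
    }

    public static void main(String[] args) {
        Subject s = createProxySubject("alice", "hive");
        // getShortUserName-style lookup: scan the Subject for a User principal.
        String effective = s.getPrincipals(User.class).iterator().next().getName();
        String real = s.getPrincipals(RealUser.class).iterator().next().getName();
        System.out.println(effective + " via " + real); // prints "alice via hive"
    }
}
```

The key point is that the proxy user is just another principal in the Subject; any later lookup that filters principals by the `User` class sees `alice`, not `hive`.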

Next, let's see how FileSystem picks up the user when it is initialized:

  public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
  }

  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }

    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

If the cache already holds a FileSystem for this URI and configuration, it is returned directly; otherwise a new one is created and put into the cache.

    FileSystem get(URI uri, Configuration conf) throws IOException {
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }

Now look at how the Key is built:

      Key(URI uri, Configuration conf, long unique) throws IOException {
        scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
        authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase();
        this.unique = unique;
        // The UGI of the current user is part of the cache key
        this.ugi = UserGroupInformation.getCurrentUser();
      }
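Because the UGI is part of the cache key, two users looking up the same URI get different FileSystem instances. A minimal sketch of that keying, with a plain user string standing in for `UserGroupInformation.getCurrentUser()` and the class names `FsCacheSketch`/`Key` chosen only for illustration:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class FsCacheSketch {

    // Reduced version of FileSystem.Cache.Key: scheme + authority + current user.
    public static final class Key {
        final String scheme;
        final String authority;
        final String user; // stands in for the UGI in the real Key

        public Key(URI uri, String user) {
            this.scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
            this.authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase();
            this.user = user;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return scheme.equals(k.scheme) && authority.equals(k.authority) && user.equals(k.user);
        }

        @Override public int hashCode() {
            return Objects.hash(scheme, authority, user);
        }
    }

    public static void main(String[] args) {
        URI uri = URI.create("hdfs://nn1:8020/tmp/hive");
        Map<Key, String> cache = new HashMap<>();
        cache.put(new Key(uri, "hive"), "fs-for-hive");
        // Same URI, different current user => cache miss => a new FileSystem gets built.
        System.out.println(cache.containsKey(new Key(uri, "alice"))); // prints "false"
        System.out.println(cache.get(new Key(uri, "hive")));          // prints "fs-for-hive"
    }
}
```

This also hints at the failure mode from the background section: if a FileSystem implementation keys its cache without the current UGI (or captures the login user instead of the proxy user), every session silently reuses the `hive` user's instance.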

So what does UserGroupInformation.getCurrentUser() return? It reads the Subject from the current access-control context, which is exactly the Subject built by createProxyUser, so after the proxy is created the current user is the proxy user.

  static UserGroupInformation getCurrentUser() throws IOException {
    AccessControlContext context = AccessController.getContext();
    Subject subject = Subject.getSubject(context);
    if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
      return getLoginUser();
    } else {
      return new UserGroupInformation(subject);
    }
  }

Back to FileSystem:

  private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException {
    FileSystem fs;
    synchronized (this) {
      fs = map.get(key);
    }
    if (fs != null) {
      return fs;
    }

    // Create the FileSystem
    fs = createFileSystem(uri, conf);
    synchronized (this) { // refetch the lock again
      FileSystem oldfs = map.get(key);
      if (oldfs != null) { // a file system is created while lock is releasing
        fs.close(); // close the new file system
        return oldfs;  // return the old file system
      }

      // now insert the new file system into the map
      if (map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) {
        ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
      }
      fs.key = key;
      map.put(key, fs);
      if (conf.getBoolean("fs.automatic.close", true)) {
        toAutoClose.add(key);
      }
      return fs;
    }
  }

createFileSystem(uri, conf) ends up calling DistributedFileSystem.initialize, which creates a DFSClient; the UGI is captured in the DFSClient constructor.

  private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
    Tracer tracer = FsTracer.get(conf);
    TraceScope scope = null;
    if (tracer != null) {
      scope = tracer.newScope("FileSystem#createFileSystem");
      scope.addKVAnnotation("scheme", uri.getScheme());
    }
    try {
      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
      if (clazz == null) {
        throw new IOException("No FileSystem for scheme: " + uri.getScheme());
      }
      FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
      fs.tracer = tracer;
      fs.initialize(uri, conf);
      return fs;
    } finally {
      if (scope != null) scope.close();
    }
  }

Finally, look at the mkdirs code on the NameNode side:

  @Override // ClientProtocol
  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
      throws IOException {
    checkNNStartup();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
    }
    if (!checkPathLength(src)) {
      throw new IOException("mkdirs: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    return namesystem.mkdirs(src,
        new PermissionStatus(getRemoteUser().getShortUserName(), null, masked), createParent);
  }

Here the directory owner comes from getRemoteUser().getShortUserName(), which is UserGroupInformation.getShortUserName:

  public String getShortUserName() {
    for (User p : subject.getPrincipals(User.class)) {
      return p.getShortName();
    }
    return null;
  }

So the user the NameNode ultimately records is the configured proxy user.

6. Summary

This article is admittedly a bit scattered; it is mostly a walk through the code. What happens inside HiveServer2 after the session is set up is not covered here and is left for a future post. The focus was how HiveServer2 implements proxy users, and at the bottom of that mechanism sits Hadoop's UserGroupInformation class.
