Spark Message Communication Architecture

Spark defines a set of communication-framework interfaces whose implementations call into Netty. The framework uses the factory design pattern, which decouples the rest of Spark from Netty so that a different messaging library can be plugged in when needed.
The Spark message communication class diagram is shown below:

The communication framework corresponds to the dashed portion of the diagram above. Its implementation takes three steps:

  • ① Define the two abstract classes RpcEnv and RpcEnvFactory. RpcEnv declares the abstract methods that start, stop, and shut down the RPC framework; RpcEnvFactory declares the abstract create method.
  • ② NettyRpcEnv and NettyRpcEnvFactory implement the inherited methods using Netty.
  • ③ The RpcEnv companion object exposes a static create method that instantiates an RpcEnv through reflection (see the sketch after this list).
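
To make the pattern concrete, here is a minimal, hypothetical sketch. The type names follow org.apache.spark.rpc, but the signatures are simplified and this is not the actual Spark source:

// Minimal sketch of the RpcEnv / RpcEnvFactory pattern (hypothetical,
// simplified signatures; not the exact Spark source).
trait RpcEndpoint
trait RpcEndpointRef { def send(message: Any): Unit }

abstract class RpcEnv {
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  def shutdown(): Unit
}

trait RpcEnvFactory {
  def create(name: String): RpcEnv
}

object RpcEnv {
  // The concrete factory is loaded reflectively, so callers never reference
  // the Netty implementation class directly.
  def create(name: String): RpcEnv = {
    val factoryClassName = "org.apache.spark.rpc.netty.NettyRpcEnvFactory"
    Class.forName(factoryClassName)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[RpcEnvFactory]
      .create(name)
  }
}

With this shape, supporting another transport would only require a new RpcEnvFactory implementation and a different factory class name.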

The modules in the class diagram are used in the following order:

  • ① Create an RpcEnv instance with RpcEnv's static create method, and instantiate Master.
  • ② Call RpcEnv's setupEndpoint method to register the Master endpoint and its reference with the RpcEnv.
  • ③ Any object that obtains a reference to the Master endpoint can then send messages to the Master (see the sketch after this list).
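
Putting the three steps together, the flow mirrors Master.startRpcEnvAndEndpoint in Spark 2.x. The following is a hedged sketch of it; host, port, and webUiPort are placeholder values, and the code assumes it runs inside Spark's own packages where these private APIs are visible:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.master.Master
import org.apache.spark.rpc.{RpcAddress, RpcEnv}

val conf = new SparkConf()
val securityMgr = new SecurityManager(conf)
val (host, port, webUiPort) = ("localhost", 7077, 8080)  // placeholders

// ① create the RpcEnv through the static factory method
val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, host, port, conf, securityMgr)

// ② register the Master endpoint; setupEndpoint returns its reference
val masterEndpoint = rpcEnv.setupEndpoint(Master.ENDPOINT_NAME,
  new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

// ③ any other endpoint can look up the reference by address and message it
val masterRef = rpcEnv.setupEndpointRef(RpcAddress(host, port), Master.ENDPOINT_NAME)
// masterRef.send(...) now delivers messages to the Master endpoint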

Spark Startup Message Communication

During Spark startup, the communication is mainly between the Master and the Workers. The process is shown below:

The detailed process and source code are as follows.
(1) The Worker sends a RegisterWorker message to the Master:

private def registerWithMaster() {
  // onDisconnected may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      ...
      registerMasterFutures = tryRegisterAllMasters()
      ...
  }
}

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  ...
  // Obtain a reference to the Master endpoint
  val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
  // Register by calling sendRegisterMessageToMaster
  sendRegisterMessageToMaster(masterEndpoint)
  ...
}

private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.send(RegisterWorker(
    workerId,
    host,
    port,
    self,
    cores,
    memory,
    workerWebUiUrl,
    masterEndpoint.address))
}

case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef,
    cores: Int,
    memory: Int,
    workerWebUiUrl: String,
    masterAddress: RpcAddress)
  extends DeployMessage {
  Utils.checkHost(host)
  assert (port > 0)
}

(2) On receiving the message, the Master validates and records the Worker. If registration succeeds, it sends back a registration-success message; otherwise it sends a registration-failure message.

override def receive: PartialFunction[Any, Unit] = {
  ...
  case RegisterWorker(...) =>
    // If the Master is in STANDBY state, reply with MasterInStandby
    if (state == RecoveryState.STANDBY) {
      workerRef.send(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      // registerWorker records the Worker in a list that is later used
      // when scheduling tasks
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  ...
}

(3) Once its registration is accepted, the Worker sends periodic Heartbeat messages to the Master so the Master can track the Worker's live status (a sketch of the heartbeat scheduling follows the code below).

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
      if (preferConfiguredMasterAddress) {
        logInfo("Successfully registered with master " + masterAddress.toSparkURL)
      } else {
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      }
      ...
      // If cleanup of directories left by earlier applications is enabled,
      // schedule that work
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Report the latest state of this Worker's executors to the Master
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    case RegisterWorkerFailed(message) =>
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}

private[deploy] object DeployMessages {
  ...
  case object SendHeartbeat
}
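
The heartbeat scheduling itself is not quoted above. In outline it looks like the following sketch, based on Worker in Spark 2.x (HEARTBEAT_MILLIS is one quarter of the configured spark.worker.timeout):

// In the Worker, after a successful registration (simplified sketch):
// send ourselves SendHeartbeat every HEARTBEAT_MILLIS
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(SendHeartbeat)
  }
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)

// ...and in Worker.receive, forward a Heartbeat message to the Master:
case SendHeartbeat =>
  if (connected) { sendToMaster(Heartbeat(workerId, self)) }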

Spark Runtime Message Communication

The interaction during Spark runtime message communication is shown in the figure below:

The detailed process and source code are as follows.
(1) Running an application starts a SparkContext. During SparkContext startup, the SchedulerBackend object is instantiated first (a SparkDeploySchedulerBackend in the figure, since this is standalone mode). When the backend starts, it creates two endpoints: DriverEndpoint and the AppClient's ClientEndpoint.
In ClientEndpoint's tryRegisterAllMasters method, the registration thread pool registerMasterThreadPool is created; the registration threads started in this pool send the RegisterApplication message to the Master.

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // In an HA deployment there can be several Masters, so iterate over all
  // of them and send the message to each
  for (masterAddress <- masterRpcAddresses) yield {
    // Start a registration thread in the pool; the thread exits once it
    // reads the registration-success flag registered = true
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        // Obtain a reference to the Master endpoint and send it the
        // application registration message
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}

When the Master receives the application-registration message, its registerApplication method records the application and adds it to the list of waiting applications. After registering, it sends RegisteredApplication back to the ClientEndpoint and calls schedule(), whose startExecutorsOnWorkers method runs the application by telling Workers to launch Executors. (A sketch of the Master's receive case comes first, followed by the startExecutorsOnWorkers source.)
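
The Master-side receive case is not quoted in this walkthrough; a hedged sketch of it, based on Master.receive in Spark 2.x, looks like this:

// In Master.receive (simplified sketch)
case RegisterApplication(description, driver) =>
  if (state == RecoveryState.STANDBY) {
    // ignore: a STANDBY Master does not reply, so the client will try
    // the other Masters
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)              // adds the app to waitingApps
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()                            // calls startExecutorsOnWorkers()
  }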

private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Applications run FIFO: the first to register is the first to run
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than the coresPerExecutor, the cores left will not be allocated
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Decide which Workers the app runs on and how many cores each Worker
      // contributes. There are two allocation strategies: spread the app
      // across as many Workers as possible, or pack it onto as few as possible
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      // Notify the chosen Workers to launch executors
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}

(2) When the ClientEndpoint receives the RegisteredApplication message from the Master, it sets the registration flag registered to true; the Master-registration threads observe the state change and complete the application registration.

override def receive: PartialFunction[Any, Unit] = {
  // The Master-registration threads observe the state change and complete
  // the application registration
  case RegisteredApplication(appId_, masterRef) =>
    // FIXME How to handle the following cases?
    // 1. A master receives multiple registrations and sends back multiple
    // RegisteredApplications due to an unstable network.
    // 2. Receive multiple RegisteredApplication from different masters because the master is
    // changing.
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
  ...
}

(3) When the Master's startExecutorsOnWorkers method allocates resources to run the application, it calls allocateWorkerResourceToExecutors, which makes the chosen Workers launch Executors (a sketch follows; the Worker-side handler comes after it).
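
allocateWorkerResourceToExecutors itself is not quoted below; here is a hedged sketch based on Master in Spark 2.x (its launchExecutor call sends the LaunchExecutor message that the Worker handles next):

// In Master (simplified sketch)
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If coresPerExecutor is set, divide the assigned cores evenly among
  // executors; otherwise create one executor that takes all of them
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)  // sends LaunchExecutor to the Worker endpoint
    app.state = ApplicationState.RUNNING
  }
}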

override def receive: PartialFunction[Any, Unit] = synchronized {
  ...
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    ...
    try {
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local directories for the executor, published through the
      // SPARK_EXECUTOR_DIRS environment variable; the Worker deletes them
      // when the application finishes
      val appLocalDirs = appDirectories.getOrElse(appId, {
        val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
        val dirs = localRootDirs.flatMap { dir =>
          try {
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            Some(appDir.getAbsolutePath())
          } catch {
            case e: IOException =>
              logWarning(s"${e.getMessage}. Ignoring this directory.")
              None
          }
        }.toSeq
        if (dirs.isEmpty) {
          throw new IOException("No subfolder can be created in " +
            s"${localRootDirs.mkString(",")}.")
        }
        dirs
      })
      appDirectories(appId) = appLocalDirs

      // The ExecutorRunner creates the CoarseGrainedExecutorBackend process
      // from the command carried in the application description; that command
      // is built in SparkDeploySchedulerBackend's start method
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // Tell the Master that the executor state is now ExecutorState.RUNNING
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  ...
}

Executor creation is carried out in ExecutorRunner's fetchAndRunExecutor method.

private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val subsOpts = appDesc.command.javaOpts.map {
      Utils.substituteAppNExecIds(_, appId, execId.toString)
    }
    val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
    // Build the process builder from the application's command and the
    // environment configuration
    val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // Add the execution directory to the builder
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    // Add the log URLs shown on the monitoring page to the builder
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Start the process; this creates the CoarseGrainedExecutorBackend instance
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code. When it exits, send the exit state to the Worker
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}

(4) The Master receives the ExecutorStateChanged message sent by the Worker:

override def receive: PartialFunction[Any, Unit] = {
  ...
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) =>
        val appInfo = idToApp(appId)
        val oldState = exec.state
        exec.state = state
        if (state == ExecutorState.RUNNING) {
          assert(oldState == ExecutorState.LAUNCHING,
            s"executor $execId state transfer from $oldState to RUNNING is illegal")
          appInfo.resetRetryCount()
        }
        // Forward an ExecutorUpdated message to the Driver
        exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
        if (ExecutorState.isFinished(state)) {
          // Remove this executor from the worker and app
          logInfo(s"Removing executor ${exec.fullId} because it is $state")
          // If an application has already finished, preserve its
          // state to display its information properly on the UI
          if (!appInfo.isFinished) {
            appInfo.removeExecutor(exec)
          }
          exec.worker.removeExecutor(exec)
          val normalExit = exitStatus == Some(0)
          // Only retry certain number of times so we don't go into an infinite loop.
          // Important note: this code path is not exercised by tests, so be very careful when
          // changing this `if` condition.
          if (!normalExit
              && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
              && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        schedule()
      case None =>
        logWarning(s"Got status update for unknown executor $appId/$execId")
    }
  ...
}

(5) The Executor registers itself with the DriverEndpoint. (In step (3), the onStart method of the launched CoarseGrainedExecutorBackend sends a RegisterExecutor message to the DriverEndpoint; a sketch of onStart follows.)
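
In outline, that onStart looks like the following hedged sketch, based on CoarseGrainedExecutorBackend in Spark 2.x with error handling trimmed:

// CoarseGrainedExecutorBackend.onStart (simplified sketch)
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    // ask rather than send: the driver replies once the executor is recorded
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // the RegisteredExecutor / RegisterExecutorFailed message arrives separately
    case Success(msg) =>
    case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
  }(ThreadUtils.sameThread)
}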

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) {
      executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
      context.reply(true)
    }
    ...
    // Record the executor's ID and the number of cores it provides
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorAddress, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    // Record the executor ID and its details in the executorDataMap
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Reply to the executor that registration is complete
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Offer resources and send LaunchTask messages to run tasks
    makeOffers()
  ...
}

(6) When the CoarseGrainedExecutorBackend receives the RegisteredExecutor message confirming successful registration, it instantiates an Executor object inside the CoarseGrainedExecutorBackend process.

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      // Create the Executor from the environment parameters; in Spark it is
      // the component that actually executes tasks
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
  ...
}

The instantiated Executor sends periodic heartbeat messages to the Driver while it waits for tasks.

private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
  "driver-heartbeater")

private def startDriverHeartbeater(): Unit = {
  // The heartbeat interval
  val intervalMs = HEARTBEAT_INTERVAL_MS
  // Wait a random interval so the heartbeats don't end up in sync
  val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
  val heartbeatTask = new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
  }
  // Send the heartbeat to the Driver on a fixed schedule
  heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs,
    TimeUnit.MILLISECONDS)
}
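
The reportHeartBeat method scheduled above is not quoted here. In outline it builds a Heartbeat message and asks the driver's HeartbeatReceiver; the sketch below is hedged and based on Executor in Spark 2.x, with the collection of per-task metrics omitted:

// Executor.reportHeartBeat, in outline (simplified sketch)
private def reportHeartBeat(): Unit = {
  // accumUpdates: per-task accumulator updates, collection omitted here
  val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
  try {
    val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
      message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
    if (response.reregisterBlockManager) {
      logInfo("Told to re-register on heartbeat")
      env.blockManager.reregister()
    }
  } catch {
    case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
  }
}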

(7) After the Executor inside CoarseGrainedExecutorBackend has started, the backend receives LaunchTask messages from the DriverEndpoint; task execution is implemented by the Executor's launchTask method.

override def receive: PartialFunction[Any, Unit] = {
  ...
  case LaunchTask(data) =>
    if (executor == null) {
      // If the Executor failed to start, log the error and exit
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // Start a TaskRunner to execute the task
      executor.launchTask(this, taskDesc)
    }
  ...
}

The Executor's launchTask method creates a TaskRunner (a Runnable, not a separate process), records it, and submits it to threadPool, where the Executor schedules its execution (a sketch of the status reporting follows the code).

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
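
Inside TaskRunner.run, progress and results are reported back through the ExecutorBackend, which forwards them to the DriverEndpoint; in outline (a hedged sketch based on Spark 2.x, actual task execution elided):

// TaskRunner.run, in outline (simplified sketch)
override def run(): Unit = {
  ...
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  // ... run the task, serialize its result, then:
  execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
}

// CoarseGrainedExecutorBackend forwards the update to the DriverEndpoint
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}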

(8) When the TaskRunner finishes a task, it sends a StatusUpdate state-change message to the DriverEndpoint.

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    // Call TaskSchedulerImpl's statusUpdate method, which continues the
    // processing according to the task's outcome
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // When the task finishes, reclaim the CPU cores it used on this
          // Executor, then reallocate work as appropriate
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
  ...
}
