Spark的中,通过netty实现了类似akka的actor机制。

在spark中,一个EndPointData就类似一个akka中的actor。

private class EndpointData(val name: String,val endpoint: RpcEndpoint,val ref: NettyRpcEndpointRef) {val inbox = new Inbox(ref, endpoint)
}

一个EndPointData由name,endpoint,ref,inbox四个成员组成。

其中name为该actor的名称,endpoint处理远程消息的处理,ref则为远程消息调用时客户端的目标对象,inbox则为endpoint处理消息来源时的消息存放信箱。

val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASKlogDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

举一个例子,当driver端向executor端下发task的时候,在CoarseGrainedSchedulerBackend中将会根据任务需要执行的所在executorId寻找得到相应的EndPointData,之后得到EndPointData下的ref调用send()方法即完成了一次远程调用,而调用的具体task数据将会被序列化,并包装在LaunchTask中,LaunchTask类似actor模型下对应的动作。

其中,ref为远程executor在向driver端注册executor的时候序列化后,在driver处反序列化的。在其send()方法中,会构造一条RequestMessage。

override def send(message: Any): Unit = {require(message != null, "Message is null")nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}

RequestMessage包含了调用方的远程地址,该ref本身供远程寻找具体的endpoint进行操作,和上述的具体消息。

private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {if (receiver.client != null) {message.sendWith(receiver.client)} else {require(receiver.address != null,"Cannot send message to client endpoint with no listen address.")val targetOutbox = {val outbox = outboxes.get(receiver.address)if (outbox == null) {val newOutbox = new Outbox(this, receiver.address)val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)if (oldOutbox == null) {newOutbox} else {oldOutbox}} else {outbox}}if (stopped.get) {// It's possible that we put `targetOutbox` after stopping. So we need to clean it.outboxes.remove(receiver.address)targetOutbox.stop()} else {targetOutbox.send(message)}}
}

在包装好了的具体消息,如果该次消息为远程调用,将会判断是否已经完成了对应远程地址的netty客户端,如果已经完成了对应客户端的创建,将会直接发送,否则将会构造一个对应客户端的outbox缓存将要发送的数据,outbox中维护了一条队列用来存放等到发送的消息,直到对应客户端初试化完成后将会把outbox中的消息通过对应的netty客户端发送。

到executor观察其如何进行监听并处理相关的远程调用。

远程调用的主要处理在于Dispatcher类,其生成与封装在NettyRpcEnv的初始化过程中,NettyRpcEnv的初试化随着整个SparkEnv的初始化被调用。

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)private val transportContext = new TransportContext(transportConf,new NettyRpcHandler(dispatcher, this, streamManager))

其中Dispatcher被初试化并封装在NettyRpcHandler当中,而NettyRpcHandler将作为上下文的一员,后续在Netty服务端的初始化当中,将该NettyRpcHandler的调用写到ChannelHandler重写的channelRead()方法中。

@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {if (request instanceof RequestMessage) {requestHandler.handle((RequestMessage) request);} else if (request instanceof ResponseMessage) {responseHandler.handle((ResponseMessage) request);} else {ctx.fireChannelRead(request);}
}private void processRpcRequest(final RpcRequest req) {try {rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {@Overridepublic void onSuccess(ByteBuffer response) {respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));}@Overridepublic void onFailure(Throwable e) {respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));}});} catch (Exception e) {logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));} finally {req.body().release();}
}

在channelRead()方法中,远程调用在完成类型判断后,最后将会通过NettyRpcHandler的handle()方法处理。

override def receive(client: TransportClient,message: ByteBuffer,callback: RpcResponseCallback): Unit = {val messageToDispatch = internalReceive(client, message)dispatcher.postRemoteMessage(messageToDispatch, callback)
}

在NettyRpcHandler中,首先通过internalReceive()方法,反序列化为RequestMessage,之后调用Dispatcher的 postRemoteMessage()方法,定位到具体的endpoint上前去执行。

private def postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) => Unit): Unit = {val error = synchronized {val data = endpoints.get(endpointName)if (stopped) {Some(new RpcEnvStoppedException())} else if (data == null) {Some(new SparkException(s"Could not find $endpointName."))} else {data.inbox.post(message)receivers.offer(data)None}}// We don't need to call `onStop` in the `synchronized` blockerror.foreach(callbackIfStopped)
}

Dispatcher在postMessage()方法中完成对于远程调用的分发,此处将会根据远端携带的ref的name从本地name与EndPointData的映射关系中找到对应的endpoint进行处理,如果能够找到,将会把这条消息放到endpoint对应的inbox邮箱中等待endpoint获取后进行处理。

在Dispatcher中,维护着一个线程池,执行MessageLoop线程。

private class MessageLoop extends Runnable {override def run(): Unit = {try {while (true) {try {val data = receivers.take()if (data == PoisonPill) {// Put PoisonPill back so that other MessageLoops can see it.receivers.offer(PoisonPill)return}data.inbox.process(Dispatcher.this)} catch {case NonFatal(e) => logError(e.getMessage, e)}}} catch {case ie: InterruptedException => // exit}}
}

MessageLoop将会不断从所有有消息但是还未处理的EndPointData集合中依次取出,调用其inbox信箱的process()方法调用endpoint的receive()方法根据远程调用的事件类型选择具体的逻辑处理对应的远程调用。

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

服务端通过RpcEnv的setUpEndpoint()方法注册一个EndPointData到Dispatcher当中,以便在上述消息分发的时候,可以找到对应的EndPointData,也就是处理事件远程调用的actor模型。

spark 如何用netty实现akka的actor模型相关推荐

  1. Akka入门(二)Akka的Actor模型如何满足现代分布式系统需求

    Actor模型允许开发者: 在不诉诸锁定的情况下实施封装. 使用协作实体的模型对信号做出反应,改变状态,并相互发送信号以推动整个应用程序向前发展. 不要担心与我们的世界观不匹配的执行机制. (一) 消 ...

  2. scala之Akka的Actor模型(上)

    原文地址:http://my.oschina.net/jingxing05/blog/287213 明确并行和并发 看两张图 并行parallelism 并发concurrency 关键点在于 多个任 ...

  3. 【Akka】Actor模型探索

    Akka是什么 Akka就是为了改变编写高容错性和强可扩展性的并发程序而生的.通过使用Actor模型我们提升了抽象级别,为构建正确的可扩展并发应用提供了一个更好的平台.在容错性方面我们采取了" ...

  4. Akka之actor模型

    一 定义Actor import akka.actor.{Props, ActorSystem, Actor} import akka.actor.Actor.Receive import akka. ...

  5. 【Flink】FLink 通讯组件 Akka与Actor 模型

    1.概述 本博客是 视频的笔记,这个是讲解 flink 1.12 源码的,入门非常的好. 尚硅谷2021最新Flink内核源码解析课程(从入门到精通) 2.介绍 Flink内部节点之间的通信是用Akk ...

  6. Scala Akka的Actor模型

  7. Actor模型与Akka

    Actor模型与Akka 一. Actor模型 Actor模型概念 一个概念模型,用于处理并发计算 Actor模型内部的状态由自己的行为维护,外部线程不能直接调用对象的行为,必须通过消息才能激发行为, ...

  8. 95-848-020-源码-AKKA-Akka与Actor 模型

    1.概述 Akka是一个用来开发支持并发.容错.扩展性的应用程序框架.它是actor model的实现,因此跟Erlang的并发模型很像.在actor模型的上下文中,所有的活动实体都被认为是互不依赖的 ...

  9. actor 模型原理 (二)

    现在开始研究一下akka的actor模型是怎么实现的: 老外写了一个程序说明actor的工作机制,下图就是学生给老师发邮件的具体示意图,那么1-6一共6个步骤 1.学生创建actor system , ...

最新文章

  1. 新兴内存技术准备突围
  2. 线段树专辑——pku 2886 Who Gets the Most Candies?
  3. nuxt静态部署_nuxt静态部署打包相对路径操作
  4. ad域帐号登录提示无法处理请求_微软Windows Server之AD域控制器迁移测试方案
  5. c语言实现双链表的基本操作—增删改查
  6. INFO:安装包文件共享(Shared Files)设置注意事项
  7. SDN精华问答 | 使用SDN的一个例子
  8. hdoj 3400 三分
  9. 一路走来一路歌—我和团队有个约会
  10. redis 介绍与安装
  11. 使用PublishSetting快速在Powershell中登录Azure
  12. bpm js 计算 音乐_构建Node.js和Arduino执行控制
  13. cpp头文件方法大全
  14. 紫光服务器管理口装系统,紫光一键重装系统步骤方法
  15. memory exhausted mysql 42000 1064
  16. UG NX二次开发(C#)-曲线-NXOpen.Curve初探
  17. JavaWeb学生信息管理系统
  18. 【热门】新生儿起名测名:女孩带墨字的名字有哪些
  19. PAT乙 1074. 宇宙无敌加法器
  20. React Native TEXT 组件文字显示不全 异常解决(小米文字显示不全:小米10 ,Redmi k30出现)

热门文章

  1. java.security.NoSuchAlgorithmException: SHA_256 MessageDigest not available
  2. Kylin开启Kerberos安全认证
  3. spring基础——<bean>scope属性
  4. Vue 自定义组件 —— slot插槽
  5. Linux文本复制到记事本文本文件乱码,解决“在windows里的记事本里编辑的汉字文本文件,上传到linux服务器上出现乱码“问题...
  6. python分支语句_Python中的分支语句和循环语句及案例
  7. DDGScreenShot —图片加各种滤镜高逼格操作
  8. ObjectDataSource与GridView配合使用经验总结系列二:分页
  9. 北京高院宣判:微信商标案终审驳回上诉 维持原判
  10. STM32中的位带(bit-band)操作