spark 如何用netty实现akka的actor模型
Spark的中,通过netty实现了类似akka的actor机制。
在spark中,一个EndPointData就类似一个akka中的actor。
private class EndpointData(val name: String,val endpoint: RpcEndpoint,val ref: NettyRpcEndpointRef) {val inbox = new Inbox(ref, endpoint)
}
一个EndPointData由name,endpoint,ref,inbox四个成员组成。
其中name为该actor的名称,endpoint处理远程消息的处理,ref则为远程消息调用时客户端的目标对象,inbox则为endpoint处理消息来源时的消息存放信箱。
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASKlogDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
举一个例子,当driver端向executor端下发task的时候,在CoarseGrainedSchedulerBackend中将会根据任务需要执行的所在executorId寻找得到相应的EndPointData,之后得到EndPointData下的ref调用send()方法即完成了一次远程调用,而调用的具体task数据将会被序列化,并包装在LaunchTask中,LaunchTask类似actor模型下对应的动作。
其中,ref为远程executor在向driver端注册executor的时候序列化后,在driver处反序列化的。在其send()方法中,会构造一条RequestMessage。
override def send(message: Any): Unit = {require(message != null, "Message is null")nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
RequestMessage包含了调用方的远程地址,该ref本身供远程寻找具体的endpoint进行操作,和上述的具体消息。
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {if (receiver.client != null) {message.sendWith(receiver.client)} else {require(receiver.address != null,"Cannot send message to client endpoint with no listen address.")val targetOutbox = {val outbox = outboxes.get(receiver.address)if (outbox == null) {val newOutbox = new Outbox(this, receiver.address)val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)if (oldOutbox == null) {newOutbox} else {oldOutbox}} else {outbox}}if (stopped.get) {// It's possible that we put `targetOutbox` after stopping. So we need to clean it.outboxes.remove(receiver.address)targetOutbox.stop()} else {targetOutbox.send(message)}}
}
在包装好了的具体消息,如果该次消息为远程调用,将会判断是否已经完成了对应远程地址的netty客户端,如果已经完成了对应客户端的创建,将会直接发送,否则将会构造一个对应客户端的outbox缓存将要发送的数据,outbox中维护了一条队列用来存放等到发送的消息,直到对应客户端初试化完成后将会把outbox中的消息通过对应的netty客户端发送。
到executor观察其如何进行监听并处理相关的远程调用。
远程调用的主要处理在于Dispatcher类,其生成与封装在NettyRpcEnv的初始化过程中,NettyRpcEnv的初试化随着整个SparkEnv的初始化被调用。
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)private val transportContext = new TransportContext(transportConf,new NettyRpcHandler(dispatcher, this, streamManager))
其中Dispatcher被初试化并封装在NettyRpcHandler当中,而NettyRpcHandler将作为上下文的一员,后续在Netty服务端的初始化当中,将该NettyRpcHandler的调用写到ChannelHandler重写的channelRead()方法中。
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {if (request instanceof RequestMessage) {requestHandler.handle((RequestMessage) request);} else if (request instanceof ResponseMessage) {responseHandler.handle((ResponseMessage) request);} else {ctx.fireChannelRead(request);}
}private void processRpcRequest(final RpcRequest req) {try {rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {@Overridepublic void onSuccess(ByteBuffer response) {respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));}@Overridepublic void onFailure(Throwable e) {respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));}});} catch (Exception e) {logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));} finally {req.body().release();}
}
在channelRead()方法中,远程调用在完成类型判断后,最后将会通过NettyRpcHandler的handle()方法处理。
override def receive(client: TransportClient,message: ByteBuffer,callback: RpcResponseCallback): Unit = {val messageToDispatch = internalReceive(client, message)dispatcher.postRemoteMessage(messageToDispatch, callback)
}
在NettyRpcHandler中,首先通过internalReceive()方法,反序列化为RequestMessage,之后调用Dispatcher的 postRemoteMessage()方法,定位到具体的endpoint上前去执行。
private def postMessage(endpointName: String,message: InboxMessage,callbackIfStopped: (Exception) => Unit): Unit = {val error = synchronized {val data = endpoints.get(endpointName)if (stopped) {Some(new RpcEnvStoppedException())} else if (data == null) {Some(new SparkException(s"Could not find $endpointName."))} else {data.inbox.post(message)receivers.offer(data)None}}// We don't need to call `onStop` in the `synchronized` blockerror.foreach(callbackIfStopped)
}
Dispatcher在postMessage()方法中完成对于远程调用的分发,此处将会根据远端携带的ref的name从本地name与EndPointData的映射关系中找到对应的endpoint进行处理,如果能够找到,将会把这条消息放到endpoint对应的inbox邮箱中等待endpoint获取后进行处理。
在Dispatcher中,维护着一个线程池,执行MessageLoop线程。
private class MessageLoop extends Runnable {override def run(): Unit = {try {while (true) {try {val data = receivers.take()if (data == PoisonPill) {// Put PoisonPill back so that other MessageLoops can see it.receivers.offer(PoisonPill)return}data.inbox.process(Dispatcher.this)} catch {case NonFatal(e) => logError(e.getMessage, e)}}} catch {case ie: InterruptedException => // exit}}
}
MessageLoop将会不断从所有有消息但是还未处理的EndPointData集合中依次取出,调用其inbox信箱的process()方法调用endpoint的receive()方法根据远程调用的事件类型选择具体的逻辑处理对应的远程调用。
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
服务端通过RpcEnv的setUpEndpoint()方法注册一个EndPointData到Dispatcher当中,以便在上述消息分发的时候,可以找到对应的EndPointData,也就是处理事件远程调用的actor模型。
spark 如何用netty实现akka的actor模型相关推荐
- Akka入门(二)Akka的Actor模型如何满足现代分布式系统需求
Actor模型允许开发者: 在不诉诸锁定的情况下实施封装. 使用协作实体的模型对信号做出反应,改变状态,并相互发送信号以推动整个应用程序向前发展. 不要担心与我们的世界观不匹配的执行机制. (一) 消 ...
- scala之Akka的Actor模型(上)
原文地址:http://my.oschina.net/jingxing05/blog/287213 明确并行和并发 看两张图 并行parallelism 并发concurrency 关键点在于 多个任 ...
- 【Akka】Actor模型探索
Akka是什么 Akka就是为了改变编写高容错性和强可扩展性的并发程序而生的.通过使用Actor模型我们提升了抽象级别,为构建正确的可扩展并发应用提供了一个更好的平台.在容错性方面我们采取了" ...
- Akka之actor模型
一 定义Actor import akka.actor.{Props, ActorSystem, Actor} import akka.actor.Actor.Receive import akka. ...
- 【Flink】FLink 通讯组件 Akka与Actor 模型
1.概述 本博客是 视频的笔记,这个是讲解 flink 1.12 源码的,入门非常的好. 尚硅谷2021最新Flink内核源码解析课程(从入门到精通) 2.介绍 Flink内部节点之间的通信是用Akk ...
- Scala Akka的Actor模型
- Actor模型与Akka
Actor模型与Akka 一. Actor模型 Actor模型概念 一个概念模型,用于处理并发计算 Actor模型内部的状态由自己的行为维护,外部线程不能直接调用对象的行为,必须通过消息才能激发行为, ...
- 95-848-020-源码-AKKA-Akka与Actor 模型
1.概述 Akka是一个用来开发支持并发.容错.扩展性的应用程序框架.它是actor model的实现,因此跟Erlang的并发模型很像.在actor模型的上下文中,所有的活动实体都被认为是互不依赖的 ...
- actor 模型原理 (二)
现在开始研究一下akka的actor模型是怎么实现的: 老外写了一个程序说明actor的工作机制,下图就是学生给老师发邮件的具体示意图,那么1-6一共6个步骤 1.学生创建actor system , ...
最新文章
- 新兴内存技术准备突围
- 线段树专辑——pku 2886 Who Gets the Most Candies?
- nuxt静态部署_nuxt静态部署打包相对路径操作
- ad域帐号登录提示无法处理请求_微软Windows Server之AD域控制器迁移测试方案
- c语言实现双链表的基本操作—增删改查
- INFO:安装包文件共享(Shared Files)设置注意事项
- SDN精华问答 | 使用SDN的一个例子
- hdoj 3400 三分
- 一路走来一路歌—我和团队有个约会
- redis 介绍与安装
- 使用PublishSetting快速在Powershell中登录Azure
- bpm js 计算 音乐_构建Node.js和Arduino执行控制
- cpp头文件方法大全
- 紫光服务器管理口装系统,紫光一键重装系统步骤方法
- memory exhausted mysql 42000 1064
- UG NX二次开发(C#)-曲线-NXOpen.Curve初探
- JavaWeb学生信息管理系统
- 【热门】新生儿起名测名:女孩带墨字的名字有哪些
- PAT乙 1074. 宇宙无敌加法器
- React Native TEXT 组件文字显示不全 异常解决(小米文字显示不全:小米10 ,Redmi k30出现)
热门文章
- java.security.NoSuchAlgorithmException: SHA_256 MessageDigest not available
- Kylin开启Kerberos安全认证
- spring基础——<bean>scope属性
- Vue 自定义组件 —— slot插槽
- Linux文本复制到记事本文本文件乱码,解决“在windows里的记事本里编辑的汉字文本文件,上传到linux服务器上出现乱码“问题...
- python分支语句_Python中的分支语句和循环语句及案例
- DDGScreenShot —图片加各种滤镜高逼格操作
- ObjectDataSource与GridView配合使用经验总结系列二:分页
- 北京高院宣判:微信商标案终审驳回上诉 维持原判
- STM32中的位带(bit-band)操作