pyspark jvm端的scala代码PythonRDD

代码版本为 spark 2.2.0

1.PythonRDD.class

这个rdd类型是python能接入spark的关键

//这是一个标准的RDD实现,实现对应的compute,partitioner,getPartitions等方法
//这个PythonRDD就是pyspark里PipelinedRDD里_jrdd属性方法返回的东西
//parent就是PipelinedRDD里传递进来的_prev_jrdd,是最初构建的数据源RDD
private[spark] class PythonRDD(parent: RDD[_],  //这个parentRDD是关键,python使用spark的所有数据来源都从这里来的func: PythonFunction, //这个是用户实现的python计算逻辑preservePartitoning: Boolean)extends RDD[Array[Byte]](parent) {val bufferSize = conf.getInt("spark.buffer.size", 65536)val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)override def getPartitions: Array[Partition] = firstParent.partitionsoverride val partitioner: Option[Partitioner] = {if (preservePartitoning) firstParent.partitioner else None}val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {//调用PythonRunner执行此处任务逻辑//这里这个PythonRunner跟spark-submit时执行的PythonRunner不是同一个东西val runner = PythonRunner(func, bufferSize, reuse_worker)//执行runner的计算逻辑,第一个参数是spark数据源rdd的计算结果//firstParent.iterator会触发parent 这个rdd的计算,返回计算结果//这里第一个参数的rdd跟pyspark中RDD里的_jrdd是同一个东西runner.compute(firstParent.iterator(split, context), split.index, context)}
}

2.PythonRunner.class

这个类是rdd内部执行计算时的实体计算类,并不是代码提交时那个启动py4j的PythonRunner

/** 这个类做了三件事* 1.启动pyspark.daemon 接收task启动work执行接收到的task* 2.启动writerThread 将数据源的计算结果写到pyspark.work中* 3.从pyspark.work中拉取执行结果* * writerThread写的数据就是pyspark中_jrdd计算出来的结果,也就是数据源rdd的数据*/
private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions],bufferSize: Int,reuse_worker: Boolean,isUDF: Boolean,argOffsets: Array[Array[Int]])extends Logging {require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")//python执行的环境和命令private val envVars = funcs.head.funcs.head.envVarsprivate val pythonExec = funcs.head.funcs.head.pythonExecprivate val pythonVer = funcs.head.funcs.head.pythonVerprivate val accumulator = funcs.head.funcs.head.accumulatordef compute(inputIterator: Iterator[_],partitionIndex: Int,context: TaskContext): Iterator[Array[Byte]] = {val startTime = System.currentTimeMillisval env = SparkEnv.getval localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor threadif (reuse_worker) {envVars.put("SPARK_REUSE_WORKER", "1")}//创建pyspark 的work进程,底层执行的是pyspark.daemon//这个方法保证一次任务只启动一个pyspark.daemon//返回结果是跟work通信用的socket//具体分析将在其它部分记录val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)@volatile var released = false// 创建writerThread,把数据源数据写到socket,发送到pyspark.workval writerThread = new WriterThread(env, worker, inputIterator, partitionIndex, context)//注册task完成监听,完成后停止writerThread线程context.addTaskCompletionListener { context =>writerThread.shutdownOnTaskCompletion()if (!reuse_worker || !released) {try {worker.close()} catch {case e: Exception =>logWarning("Failed to close worker socket", e)}}}writerThread.start()new MonitorThread(env, worker, context).start()val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))// 创建拉取pyspark.work执行结果的迭代器val stdoutIterator = new Iterator[Array[Byte]] {override def next(): Array[Byte] = {val obj = _nextObjif (hasNext) {_nextObj = read()}obj}private def read(): Array[Byte] = {if (writerThread.exception.isDefined) {throw writerThread.exception.get}try {stream.readInt() match {case length if length > 0 =>val obj = new Array[Byte](length)stream.readFully(obj)objcase 0 => Array.empty[Byte]case SpecialLengths.TIMING_DATA =>// Timing data from workerval bootTime = stream.readLong()val initTime = stream.readLong()val finishTime = stream.readLong()val boot = bootTime - startTimeval init = initTime - bootTimeval finish = finishTime - initTimeval total = finishTime - startTimelogInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,init, finish))val memoryBytesSpilled = stream.readLong()val diskBytesSpilled = stream.readLong()context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)read()case SpecialLengths.PYTHON_EXCEPTION_THROWN =>// Signals that an exception has been thrown in pythonval exLength = stream.readInt()val obj = new Array[Byte](exLength)stream.readFully(obj)throw new PythonException(new String(obj, StandardCharsets.UTF_8),writerThread.exception.getOrElse(null))case SpecialLengths.END_OF_DATA_SECTION =>// We've finished the data section of the output, but we can still// read some accumulator updates:val numAccumulatorUpdates = stream.readInt()(1 to numAccumulatorUpdates).foreach { _ =>val updateLen = stream.readInt()val update = new Array[Byte](updateLen)stream.readFully(update)accumulator.add(update)}// Check whether the worker is ready to be re-used.if (stream.readInt() == SpecialLengths.END_OF_STREAM) {if (reuse_worker) {env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)released = true}}null}} catch {case e: Exception if context.isInterrupted =>logDebug("Exception thrown after task interruption", e)throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason"))case e: Exception if env.isStopped =>logDebug("Exception thrown after context is stopped", e)null  // exit silentlycase e: Exception if writerThread.exception.isDefined =>logError("Python worker exited unexpectedly (crashed)", e)logError("This may have been caused by a prior exception:", writerThread.exception.get)throw writerThread.exception.getcase eof: EOFException =>throw new SparkException("Python worker exited unexpectedly (crashed)", eof)}}var _nextObj = read()override def hasNext: Boolean = _nextObj != null}//返回这个拉取数据结果的迭代器new InterruptibleIterator(context, stdoutIterator)}/*** WriterThread 线程的实现代码*/class WriterThread(env: SparkEnv,worker: Socket,inputIterator: Iterator[_],partitionIndex: Int,context: TaskContext)extends Thread(s"stdout writer for $pythonExec") {@volatile private var _exception: Exception = nullprivate val pythonIncludes = funcs.flatMap(_.funcs.flatMap(_.pythonIncludes.asScala)).toSetprivate val broadcastVars = funcs.flatMap(_.funcs.flatMap(_.broadcastVars.asScala))setDaemon(true)/** Contains the exception thrown while writing the parent iterator to the Python process. */def exception: Option[Exception] = Option(_exception)/** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */def shutdownOnTaskCompletion() {assert(context.isCompleted)this.interrupt()}// 主要逻辑在run里,把数据源rdd的执行结果写进去// 把广播变量和环境,以及python的执行逻辑代码写进去// 把需要计算的数据源数据写进去override def run(): Unit = Utils.logUncaughtExceptions {try {TaskContext.setTaskContext(context)val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)val dataOut = new DataOutputStream(stream)// Partition indexdataOut.writeInt(partitionIndex)// Python version of driverPythonRDD.writeUTF(pythonVer, dataOut)// Write out the TaskContextInfodataOut.writeInt(context.stageId())dataOut.writeInt(context.partitionId())dataOut.writeInt(context.attemptNumber())dataOut.writeLong(context.taskAttemptId())// sparkFilesDirPythonRDD.writeUTF(SparkFiles.getRootDirectory(), dataOut)// Python includes (*.zip and *.egg files)dataOut.writeInt(pythonIncludes.size)for (include <- pythonIncludes) {PythonRDD.writeUTF(include, dataOut)}// Broadcast variablesval oldBids = PythonRDD.getWorkerBroadcasts(worker)val newBids = broadcastVars.map(_.id).toSet// number of different broadcastsval toRemove = oldBids.diff(newBids)val cnt = toRemove.size + newBids.diff(oldBids).sizedataOut.writeInt(cnt)for (bid <- toRemove) {// remove the broadcast from workerdataOut.writeLong(- bid - 1)  // bid >= 0oldBids.remove(bid)}for (broadcast <- broadcastVars) {if (!oldBids.contains(broadcast.id)) {// send new broadcastdataOut.writeLong(broadcast.id)PythonRDD.writeUTF(broadcast.value.path, dataOut)oldBids.add(broadcast.id)}}dataOut.flush()// Serialized command:if (isUDF) {dataOut.writeInt(1)dataOut.writeInt(funcs.length)funcs.zip(argOffsets).foreach { case (chained, offsets) =>dataOut.writeInt(offsets.length)offsets.foreach { offset =>dataOut.writeInt(offset)}dataOut.writeInt(chained.funcs.length)chained.funcs.foreach { f =>dataOut.writeInt(f.command.length)dataOut.write(f.command)}}} else {dataOut.writeInt(0)val command = funcs.head.funcs.head.commanddataOut.writeInt(command.length)dataOut.write(command)}// Data valuesPythonRDD.writeIteratorToStream(inputIterator, dataOut)dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)dataOut.writeInt(SpecialLengths.END_OF_STREAM)dataOut.flush()} catch {case e: Exception if context.isCompleted || context.isInterrupted =>logDebug("Exception thrown after task completion (likely due to cleanup)", e)if (!worker.isClosed) {Utils.tryLog(worker.shutdownOutput())}case e: Exception =>// We must avoid throwing exceptions here, because the thread uncaught exception handler// will kill the whole executor (see org.apache.spark.executor.Executor)._exception = eif (!worker.isClosed) {Utils.tryLog(worker.shutdownOutput())}}}}// 监控task是不是还在执行class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)extends Thread(s"Worker Monitor for $pythonExec") {setDaemon(true)override def run() {// Kill the worker if it is interrupted, checking until task completion.// TODO: This has a race condition if interruption occurs, as completed may still become true.while (!context.isInterrupted && !context.isCompleted) {Thread.sleep(2000)}if (!context.isCompleted) {try {logWarning("Incomplete task interrupted: Attempting to kill Python Worker")env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)} catch {case e: Exception =>logError("Exception when trying to kill worker", e)}}}}
}

转载于:https://www.cnblogs.com/cloud-zhao/p/9046850.html

pyspark对应的scala代码PythonRDD类相关推荐

  1. pyspark对应的scala代码PythonRDD对象

    pyspark jvm端的scala代码PythonRDD 代码版本为 spark 2.2.0 1.PythonRDD.object 这个静态类是pyspark的一些基础入口 // 这里不会把这个类全 ...

  2. Scala类型系统——高级类类型(higher-kinded types)

    高级类类型就是使用其他类型构造成为一个新的类型,因此也称为 类型构造器(type constructors).它的语法和高阶函数(higher-order functions)相似,高阶函数就是将其它 ...

  3. Scala(三):类

    类:Class 1.简单类和无参方法 2.带getter和setter属性 3.只带getter属性 4.对象私有字段 5.Bean属性 6.辅助构造器 7.主构造器 8.嵌套类 1.简单类和无参方法 ...

  4. scala使用java类_使用Java和Scala将Play Framework 2应用程序部署到Openshift

    scala使用java类 几个星期, 马克·阿特伍德 ( Mark Atwood) , 豪尔赫·艾利斯 ( Jorge Aliss )和我塞巴斯蒂安 ·斯卡塔诺 ( SebastiánScarano) ...

  5. scala代码示例_Scala注释示例

    scala代码示例 Scala Annotations are metadata or extra information added to the program source code. Like ...

  6. scala代码示例_Scala集合示例

    scala代码示例 Scala Collections are the containers that hold sequenced linear set of items like List, Se ...

  7. Scala系列-3、scala中的类和对象有哪些?

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 目录 如何使用IDEA创建scala项目 创建普通的scala项目 ...

  8. scala 编译插件_使用Scala插件在Griffon应用程序中编译和运行Scala代码

    scala 编译插件 用于Griffon的Scala插件 0.7.1版本现已发布. 这个插件可以在Griffon应用程序上编译和运行Scala代码. Scala插件使用LangBridge插件与其他J ...

  9. Scala面向对象基础--类和对象

    一.类和对象介绍 在Scala里,类是用关键字"class"开头的代码定义,它是用于创建对象的蓝图.一个类就是一个类型,不同的类就是不同的类型,一个对象的类型就是创建它用的那个类. ...

最新文章

  1. Pytorch中的数据加载
  2. Python入门之编程与编程语言
  3. nacos config基本使用
  4. CodeForces 351A Jeff and Rounding
  5. Java命令行界面(第22部分):argparser
  6. azure git怎么使用_Azure(一)Azure Traffic Manager为我们的Web项目提供负载均衡
  7. java 位运算取8位_Java 9 AOT 试用:仅支持 64 位 Linux和java.base 模块编译
  8. Python--网络编程
  9. java settimezone_Java时间处理2----时区TimeZone类方法探究(Java8以前)
  10. Effective Modern C++ 第二章 auto的使用
  11. 【转】详解:oracle10G 数据库名、实例名、ORACLE_SID
  12. vue 第五天 (事件监听基础)
  13. 安卓直播详细教程(一)-----bilibili开源播放器
  14. MQL5由简到繁系列一
  15. Lottie 动画在项目中的使用总结
  16. 每日excel学习之排序与筛选
  17. 水洼数dfs(java)
  18. Python - Flask 图片验证码和邮箱验证码的后端实现
  19. 基于 Matlab的录屏软件
  20. 微分环节的matlab仿真,典型环节的MATLAB仿真 实验二.doc

热门文章

  1. AIR中文帮助 第十章. 窗体(Windows)和菜单
  2. 百度初级认证有用吗_知乎“打败”了百度知道吗?
  3. 我的docker随笔18:阿里云docker仓库的使用
  4. freetype在Linux平台编译小记
  5. 【算法】剑指 Offer 21. 调整数组顺序使奇数位于偶数前面
  6. 【Flink】 Flink与Kafka版本对应关系
  7. 【MySQL】MySQL drop,truncate,delete 区别
  8. 【Es】Elasticsearch 7.x 新的集群协调层
  9. 【Zookeeper】zookeeper客户端KeeperErrorCode = ConnectionLoss
  10. 【Kafka】Mac 环境 Kafka诡异问题之kafka eagle 界面无法访问