pyspark jvm端的scala代码PythonRDD

代码版本为 spark 2.2.0

1.PythonRDD.object

这个静态类是pyspark的一些基础入口

// 这里不会把这个类全部内容都介绍,因为大部分都是静态接口,被pyspark的同名代码调用
// 这里介绍几个主要函数
// 在pyspark的RDD中作为所有action的基础的collect方法调用的collectAndServer方法也在这个对象中被定义
private[spark] object PythonRDD extends Logging {//被pyspark.SparkContext.runJob调用//提供rdd.collect的功能,提交jobdef runJob(sc: SparkContext,rdd: JavaRDD[Array[Byte]],partitions: JArrayList[Int]): Int = {type ByteArray = Array[Byte]type UnrolledPartition = Array[ByteArray]val allPartitions: Array[UnrolledPartition] =sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)serveIterator(flattenedPartition.iterator,s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")}// 整个pyspark.RDD 的action都是在这个函数中被触发的// pyspark.RDD 的collect通过调用这个方法被触发rdd的执行和任务提交def collectAndServe[T](rdd: RDD[T]): Int = {//参数rdd 即pyspark中RDD里的_jrdd, 对应的是scala里数据源rdd或pythonRDD// 这里rdd.collect() 触发了任务开始运行serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")}//这个函数的作用是将计算结果写入到本地socket当中,再在pyspark里读取本地socket获取结果def serveIterator[T](items: Iterator[T], threadName: String): Int = {// 可以看见socket在本地随机端口和localhost上建立出来的val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))// Close the socket if no connection in 3 secondsserverSocket.setSoTimeout(3000)// 这里启动一个线程负责将结果写入到socket中new Thread(threadName) {setDaemon(true)override def run() {try {val sock = serverSocket.accept()val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))Utils.tryWithSafeFinally {//具体负责写的是此函数,此函数主要做一些类型和序列化工作writeIteratorToStream(items, out)} {out.close()}} catch {case NonFatal(e) =>logError(s"Error while sending iterator", e)} finally {serverSocket.close()}}}.start()// 最后返回此socket的网络端口, 这样pyspark里就可以通过此端口读取数据serverSocket.getLocalPort}// 此函数负责写入数据结果// 做一些类型检查和对应的序列化工作// PythonRunner中WriterThread写入数据时使用的也是此函数def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {def write(obj: Any): Unit = obj match {case null =>dataOut.writeInt(SpecialLengths.NULL)case arr: Array[Byte] =>dataOut.writeInt(arr.length)dataOut.write(arr)case str: String =>writeUTF(str, dataOut)case stream: PortableDataStream =>write(stream.toArray())case (key, value) =>write(key)write(value)case other =>throw new SparkException("Unexpected element type " + other.getClass)}iter.foreach(write)}}

转载于:https://www.cnblogs.com/cloud-zhao/p/9048063.html

pyspark对应的scala代码PythonRDD对象相关推荐

  1. pyspark对应的scala代码PythonRDD类

    pyspark jvm端的scala代码PythonRDD 代码版本为 spark 2.2.0 1.PythonRDD.class 这个rdd类型是python能接入spark的关键 //这是一个标准 ...

  2. Scala 类和对象详解

    Scala 类和对象 类是对象的抽象,而对象是类的具体实例.类是抽象的,不占用内存,而对象是具体的,占用存储空间.类是用于创建对象的蓝图,它是一个定义包括在特定类型的对象中的方法和变量的软件模板. 我 ...

  3. scala代码示例_Scala注释示例

    scala代码示例 Scala Annotations are metadata or extra information added to the program source code. Like ...

  4. scala代码示例_Scala异常处理示例

    scala代码示例 Scala Exception handling is similar to exception handling in Java. However Scala does not ...

  5. scala代码示例_Scala元组和地图示例

    scala代码示例 Scala tuple is a collection of items together of different data types. Scala tuple is immu ...

  6. scala代码示例_Scala集合示例

    scala代码示例 Scala Collections are the containers that hold sequenced linear set of items like List, Se ...

  7. 学习Scala:伴生对象的实现原理

    在上一篇关于Scala的文章 学习Scala:孤立对象的实现原理 中, 主要分析了孤立对象是如何实现的. 首先回顾一下. 孤立对象是只有一个object关键字修饰的对象. 该对象会编译成两个class ...

  8. Java线程同步:synchronized锁住的是代码还是对象

    在Java中,synchronized关键字是用来控制线程同步的,就是在多线程的环境下,控制synchronized代码段不被多个线程同时执行.synchronized既可以加在一段代码上,也可以加在 ...

  9. java 运行scala_使用java命令运行scala代码

    Scala是运行在JVM上的语言,跑在标准的Java平台上,可以与所有的Java库实现无缝交互. 下面运行一个小程序来看看在JVM上如何使用java命令来运行Scala程序. 在编写代码之前,我们首先 ...

最新文章

  1. 硬核NeruIPS 2018最佳论文,一个神经了的常微分方程
  2. Dispose() C# 优化内存
  3. 使用双亲指针表示法存储一棵树,可以方便解决下列哪个应用问题( )
  4. LeetCode 916. 单词子集(计数)
  5. IBM推新编码系统 实现高清视频技术大突破
  6. Oracle 12c与GoldenGate 12c的一些问答
  7. vue 动态拼接style_vue style width a href动态拼接问题的解决
  8. 万事无忧之看看网站的PR值
  9. linux环境下python 库模块安装
  10. oa 触发器导出流程html,哪些配套产品帮南京OA画龙点睛
  11. 全球区块链专利排行榜中国52家企业上榜
  12. 今日睡眠质量记录77分
  13. sourcetree添加gitignore不生效解决方案
  14. 蔷薇灵动或成为云安全领域“独角兽”?
  15. 蚂蚁电竞ANT27VQ评测
  16. 微信公众号--发送模板消息
  17. MinIO集群怎么接入Prometheus监控?(上)
  18. CSV文件如何使用EXCEL打开
  19. 数位dp算法——洛谷p1980
  20. HDUOJ1865 1string

热门文章

  1. 视频特性TI(时间信息)和SI(空间信息)的计算工具:TIandSI-压缩码流版
  2. python中plot不能显示标签_python 2: 解决python中的plot函数的图例legend不能显示中文问题...
  3. java putall实现_java putAll与addAll的小区别
  4. jQuery 学习-样式篇(七):jQuery 控制元素类属性
  5. nginx防盗链功能
  6. Vue打包并发布项目
  7. HTML5几大新特性
  8. mysql拷贝目录迁移方案_mysql 直接拷贝data 目录下文件 进行数据库迁移时遇到的一些问题??...
  9. 项目结构设计 java_Java项目架构设计
  10. python解释执行器_有关Python脚本相关说明介绍