构建PySpark环境

首先确保安装了python 2.7 ,强烈建议你使用Virtualenv方便python环境的管理。之后通过pip 安装pyspark

pip install pyspark

文件比较大,大约180多M,有点耐心。

下载 spark 2.2.0,然后解压到特定目录,设置SPARK_HOME即可。

其实如果通过spark-submit 提交程序,并不会需要额外安装pyspark, 这里通过pip安装的主要目的是为了让你的IDE能有代码提示。

PySpark worker启动机制

PySpark的工作原理是通过Spark里的PythonRDD启动一个(或者多个,以pythonExec, 和envVars为key)Python deamon进程,然后一旦有task过来了,就通过python deamon进程fork一个新的python worker。 python worker是可以复用的,并不会用完就立马销毁。一个task过来的流程为, 看看worker里有清闲的么,如果有,就直接返回。没有就fork一个新的worker.

PySpark 如何实现某个worker 里的变量单例

从前面PySpark worker启动机制里,我们可以看到,一个Python worker是可以反复执行任务的。在NLP任务中,我们经常要加载非常多的字典,我们希望字典只会加载一次。这个时候就需要做些额外处理了。做法如下:

class DictLoader(object):

clf = None

def __init__(self, baseDir, archive_auto_extract, zipResources):

if not DictLoader.is_loaded():

DictLoader.load_dic(baseDir)

@staticmethod

def load_dic(baseDir):

globPath = baseDir + "/dic/*.dic"

dicts = glob.glob(globPath)

for dictFile in dicts:

temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)

jieba.load_userdict(temp)

jieba.cut("nice to meet you")

DictLoader.clf = "SUCCESS"

@staticmethod

def is_loaded():

return DictLoader.clf is not None

定义一个cls对象,并且使用staicmethod annotation,这样就可以模拟类似Java的静态方法了。之后你可以随心所欲的loader = DictLoader ()

如何加载资源文件

在NLP处理了,字典是少不了,前面我们避免了一个worker多次加载字典,现在还有一个问题,就是程序如何加载字典。通常我们希望能够把字典打成一个zip包,代码也打成一个zip包,然后通过下面的命令进行提交:

./bin/spark-submit \

--py-files dist/jobs.zip \

--files dist/dics.zip \

--master "local[*]" python/src/batch.py

自己开发的模块可以打包成jobs.zip,对应的spark任务单独成一个batch.py文件,然后字典打包成dics.zip.

那么程序中如何读取dics.zip里的文件呢? 在Spark standalone 和 local模式下,dics.zip在各个worker的工作目录里并不会被解压,所以需要额外处理下:

def __init__(self, baseDir, archive_auto_extract, zipResources):

if not DictLoader.is_loaded():

for zr in zipResources:

if not archive_auto_extract:

with zipfile.ZipFile(SparkFiles.getRootDirectory() + '/' + zr, 'r') as f:

f.extractall(".")

DictLoader(baseDir)

archive_auto_extract 判定是不是会自动解压(yarn模式下回自动解压),判断的方法为:

archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")

zipResources 则是所有需要解压的zip包的名字,对应获取的方法为:

zipfiles = [f.split("/")[-1] for f in spark.conf.get("spark.files").split(",") if f.endswith(".zip")]

对应的zipfiles所在的目录你可以这样拼接:

SparkFiles.getRootDirectory() + '/' + zfilename

所以如果你不是运行在yarn模式的情况下,你需要先解压,然后进行加载。获取路径的方式建议如下:

temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)

这样可以兼容IDE里运行,local/standalone/yarn 模式运行。

前面的jobs.zip文件里面全部是python文件,并不需要压缩就可以直接读到。

主动定义schema,避免spark auto inference schema

我之前写过这么一段代码:

oldr = df.rdd.map(

lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

然后我需要把oldr 变回为rdd,这个时候我这么用:

resultDf = spark.createDataFrame(oldr)

resultDf.mode("overwrite").format(...).save(...

这会导致oldr被执行两次,一次是为了做schema推测,一次是为了做实际的计算。

我们可以这么写:

from pyspark.sql.types import StructType, IntegerType, ArrayType, StructField, StringType, MapType

fields = [StructField("ids", ArrayType(IntegerType())), StructField("mainId", IntegerType()),

StructField("tags", MapType(StringType(), IntegerType()))]

resultDf = spark.createDataFrame(resultRdd, StructType(fields=fields)

这样显示的为rdd定义schema,就可以避免额外的推测了。

lambda 和 函数的选择

lambda可以定义匿名函数,但是表现力有限:

.map(

lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

我们也可以定义函数:

def create_new_row(row):

return Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"])

然后直接使用:

.map(create_new_row).....

如何定义udf函数/如何避免使用Python UDF函数

先定义一个常规的python函数:

# 自定义split函数

def split_sentence(s):

return s.split(" ")

转化为udf函数并且使用。

from pyspark.sql.functions import udf

from pyspark.sql.types import *

ss = udf(split_sentence, ArrayType(StringType()))

documentDF.select(ss("text").alias("text_array")).show()

唯一麻烦的是,定义好udf函数时,你需要指定返回值的类型。

使用Python 的udf函数,显然效率是会受到损伤的,我们建议使用标准库的函数,具体这么用:

from pyspark.sql import functions as f

documentDF.select(f.split("text", "\\s+").alias("text_array")).show()

pyspark.sql. functions 引用的都是spark的实现,所以效率会更高。

另外,在使用UDF函数的时候,发现列是NoneType 或者null,那么有两种可能:

在PySpark里,有时候会发现udf函数返回的值总为null,可能的原因有:

忘了写return

def abc(c):

"yes"

返回的类型不匹配。

比如你明明是一个FloatType,但是你定义的时候说是一个ArrayType,这个时候似乎不会报错,而是udf函数执行会是null.

这个问题之前在处理二进制字段时遇到了。我们理所当然的认为二进制应该是类型 ArrayType(Byte(),True) ,但实际上是BinaryType.

dataframe.show 问题

主要是python方面的问题。

sparkpython效率低_Effective PySpark(PySpark 常见问题)相关推荐

  1. 大家都说 Java 反射效率低,为什么呢?

    我们在 Java 开发中,难免会接触到反射,而在一些框架中,反射的运用更是常见.我相信,每次提到反射,大家的第一反应一定是反射效率低,尽量少使用.但是反射的效率到底低多少?反射效率低的原因在哪里?这篇 ...

  2. @老板:别开视频会议了,效率低没人care,斯坦福、微软都可以作证

    贾浩楠 发自 凹非寺 量子位 报道 | 公众号 QbitAI 必须吐槽一下视频会议这件事. 复工绊脚石,尴尬制造机. 时间长效率低,一不小心还容易成为"快乐源泉": 微软的一群员工 ...

  3. mysql迁移导致数据库效率低_MySQL数据库慢–排查问题总结(整理自《抽丝剥茧之MySQL疑难杂症排查》叶金荣)...

    1.常见瓶颈 (1)SQL效率低 (2)选项配置不当 (3)访问题飙升 (4)硬件性能低 (5)其他进程抢资源 2.怎样确认是MySQL存在瓶颈 top/free/vmstat/sar/mpstat确 ...

  4. 为什么大家都说 SELECT * 效率低?

    版权声明:本文为CSDN博主「_陈哈哈」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明.原文链接:https://blog.csdn.net/qq_39390545/ ...

  5. mysql越筛越少_面试官:为什么SELECT * 会导致查询效率低?

    面试官:"小陈,说一下你常用的SQL优化方式吧." 陈小哈:"那很多啊,比如不要用SELECT *,查询效率低.巴拉巴拉..." 面试官:"为什么不要 ...

  6. 外贸EDM邮件营销效率低的原因分析

    目前,越来越多的外贸企业采用EDM邮件营销作为推广的手段之一.这是因为EDM邮件营销作为一种主动的营销模式,低成本的运作,高效率的投送,使得企业纷纷采用这种方式.可其效率低是什么原因呢? 研究发现,8 ...

  7. 大家都说 Java 反射效率低,你知道原因在哪里么

    [这是 ZY 第 17 篇原创技术文章] 预备知识 了解 Java 反射基本用法 看完本文可以达到什么程度 了解 Java 反射原理及 Java 反射效率低的原因 文章概览 我们在 Java 开发中, ...

  8. MySql | 为什么大家都在说 Select * 效率低

    Hi ! 我是小小,今天带来本周的第三篇.为什么大家都在说 Select * 效率低. 面试现场 面试官:"小小啊,说一说你常用的SQL优化方式吧". 小小:"很多的,相 ...

  9. 如何查询mysql中执行效率低的sql语句

    一些小技巧 1. 如何查出效率低的语句? 在MySQL下,在启动参数中设置 --log-slow-queries=[文件名],就可以在指定的日志文件中记录执行时间超过long_query_time(缺 ...

最新文章

  1. 使用openssl生成ssl(https)证书
  2. Linux下的图形界面编程
  3. AI基础:数据可视化简易入门(Matplotlib 和 Seaborn)
  4. “幕后英雄”之Backing Fields【Microsoft Entity Framework Core随笔】
  5. Java描述设计模式(11):观察者模式
  6. SpringBoot+AOP构建多数据源的切换实践
  7. linux mysql外网连接不成功的可能原因
  8. @objc private 定义函数
  9. Host 'admin-PC' is not allowed to connect to this MySQL server
  10. 虚拟机器系统安装管理 Xencenter
  11. Linux + ChromiumOS + ....操作系统资源(持续更新...)
  12. [轻笔记] SHAP值的计算步骤
  13. 超微A+ Server 4124GS-TNR做主板集成RAID
  14. OGG REPA进程 Error ORA-01031报错处理
  15. 谈谈小程序的赚钱方式
  16. 什么是根域名服务器?
  17. 保研论坛app服务器网站,保研通论坛 - 中国最大的保研交流社区
  18. 计算机没考好的检讨书300百以上,考试反思检讨书300字(精选10篇)
  19. 做seo为什么要了解网站?
  20. PyCharm恢复初始设置

热门文章

  1. 博为峰Java技术文章 ——JavaSE Swing 如何使用进度条组件JProgressBarⅡ
  2. H3C vrrp *** ipsec 基本配置
  3. 抽象方法和虚方法的区别 [C#]
  4. Java 用栈实现队列
  5. Python3之max key参数学习记录
  6. ubuntu使用git时,终端不显示git分支。
  7. 软件工程网络15个人阅读作业1
  8. CentOS平滑更新nginx版本
  9. codevs1287 矩阵乘法
  10. hadoop常用基础命令