park安装略,下载解压配置下就OK 我使用的是spark-2.2.0-bin-hadoop2.7

安装完毕后需要配置一下SPARK_HOME:

SPARK_HOME=C:\spark\spark-2.2.0-bin-hadoop2.7

Path里也要记得添加一下:

Path=XXXX;%SPARK_HOME%\bin;

Python与Spark交互主要用到pyspark这个模块,所以需要准备好扩展包,详细请参考《

Whl安装好后,能得到一个py4j文件夹,但是还需要pyspark模块这个文件夹里的内容,pyspark的获得更简单,直接去复制spark-2.2.0-bin-hadoop2.7/python/pyspark就好了。

PS:在某些版本的pyspark调用时会出现,自己稍微查下原因,网上都有配套的py文件可以覆盖,这里不是本文的重点,所以略过。

我们在《Spark原理详解》中介绍过,RDD分为转化(transformation)和动作(action)两种操作。RDD是基于当前的partitions生成新的partitions;动作是基于当前的partitions生成返回对象(数值、集合、字典等)。所以在通过python调用spark的API时需要搞清楚返回值是什么。如果返回的是partitions,调用collect()函数可以拿到封装后的数据集,分区部分对客户端是透明的,也可以调用glom()来关心具体的分区情况。如果调用的是action那么就简单得多,API直接返回结果内容。

Map、Reduce API:

最典型,也是最基本的入门API

from pyspark import SparkContext

sc = SparkContext('local')

#第二个参数2代表的是分区数,默认为1

old=sc.parallelize([1,2,3,4,5],2)

newMap = old.map(lambda x:(x,x**2))

newReduce = old.reduce(lambda a,b : a+b)

print(newMap.glom().collect())

print(newReduce)

[[(1, 1), (2, 4)], [(3, 9), (4, 16), (5, 25)]]

15

SparkContext是代码的核心,初始化时需要设置spark的启动类型,分为local、Mesos、YARN、Standalone模式(详见

Map和reduce里都要设置一个function,我们这里用了lambda匿名函数来实现。从结果可以看将前两和后三个分别放在了1个分区中,reduce是个action直接返回的是key的sum。

预留问题:能否reduce按第二行进行求和合并,how?

flatMap、filter、distinc API:

数据的拆分、过滤和去重

sc = SparkContext('local')

old=sc.parallelize([1,2,3,4,5])

#新的map里将原来的每个元素拆成了3个

newFlatPartitions = old.flatMap(lambda x : (x, x+1, x*2))

#过滤,只保留小于6的元素

newFilterPartitions = newFlatPartitions.filter(lambda x: x<6)

#去重

newDiscinctPartitions = newFilterPartitions.distinct()

print(newFlatPartitions.collect())

print(newFilterPartitions.collect())

print(newDiscinctPartitions.collect())

[1, 2, 2, 2, 3, 4, 3, 4, 6, 4, 5, 8, 5, 6, 10]

[1, 2, 2, 2, 3, 4, 3, 4, 4, 5, 5]

[1, 2, 3, 4, 5]

Sample、taskSample、sampleByKey API:

数据的抽样,在机器学习中十分实用的功能,而它们有的是传输有的是动作,需要留意这个区别。

代码:

sc = SparkContext('local')

old=sc.parallelize(range(8))

samplePartition = [old.sample(withReplacement=True, fraction=0.5) for i in range(5)]

for num, element in zip(range(len(samplePartition)), samplePartition) :

print('sample: %s y=%s' %(str(num),str(element.collect())))

taskSamplePartition = [old.takeSample(withReplacement=False, num=4) for i in range(5)]

for num, element in zip(range(len(taskSamplePartition)), taskSamplePartition) :

#注意因为是action,所以element是集合对象,而不是rdd的分区

print('taskSample: %s y=%s' %(str(num),str(element)))

mapRdd = sc.parallelize([('B',1),('A',2),('C',3),('D',4),('E',5)])

y = [mapRdd.sampleByKey(withReplacement=False,

fractions={'A':0.5, 'B':1, 'C':0.2, 'D':0.6, 'E':0.8}) for i in range(5)]

for num, element in zip(range(len(y)), y) :

#注意因为是action,所以element是集合对象,而不是rdd的分区

print('y: %s y=%s' %(str(num),str(element.collect())))

sample: 0 y=[2, 5]

sample: 1 y=[0, 3, 3, 6]

sample: 2 y=[0, 4, 7]

sample: 3 y=[1, 3, 3, 3, 6, 7]

sample: 4 y=[2, 4, 6]

taskSample: 0 y=[3, 4, 1, 6]

taskSample: 1 y=[2, 5, 3, 4]

taskSample: 2 y=[7, 1, 2, 5]

taskSample: 3 y=[6, 3, 1, 2]

taskSample: 4 y=[4, 6, 5, 0]

y: 0 y=[('B', 1)]

y: 1 y=[('B', 1), ('D', 4), ('E', 5)]

y: 2 y=[('B', 1), ('A', 2), ('C', 3), ('E', 5)]

y: 3 y=[('B', 1), ('A', 2), ('D', 4), ('E', 5)]

y: 4 y=[('B', 1), ('A', 2), ('C', 3), ('E', 5)]

有几个参数需要说明下:

withReplacement代表取值后是否重新放回元素池,也就决定了某元素能否重复出现。

Fraction代表每个元素被取出来的概率。

Num代表取出元素的个数。

交集intersection、并集union、排序sortBy API:

sc = SparkContext('local')

rdd1 = sc.parallelize(['C','A','B','B'])

rdd2 = sc.parallelize(['A','A','D','E','B'])

rdd3 = rdd1.union(rdd2)

rdd4 = rdd1.intersection(rdd2)

print(rdd3.collect())

print(rdd4.collect())

print(rdd3.sortBy(lambda x : x[0]).collect())

['C', 'A', 'B', 'B', 'A', 'A', 'D', 'E', 'B']

['A', 'B']

['A', 'A', 'A', 'B', 'B', 'B', 'C', 'D', 'E']

flod折叠、aggregate聚合API:

这俩都是action,虽然pyspark提供了max、min、sum、count、mean、stdev(标准差,反应平均值的离散程度)、sampleStdev(与stdev意义相同,stdev分母N-1,sampleStdev分母N)、sampleVariance(方差,所有值平方和除N-1)、top、countByValue、first、collectAsMap等内置的统计函数,但是在某型特殊场景下还是希望能人工订制聚合的公式,需要用到这两个动作。

代码:

sc = SparkContext('local')

rdd1 = sc.parallelize([2,4,6,1])

rdd2 = sc.parallelize([2,4,6,1],4)

zeroValue = 0

foldResult = rdd1.fold(zeroValue,lambda element, accumulate : accumulate+element)

zeroValue = (1,2)

seqOp = lambda accumulate,element : (accumulate[0] + element, accumulate[1] * element)

combOp = lambda accumulate,element : (accumulate[0]+element[0], accumulate[1] * element[1])

aggregateResult = rdd1.aggregate(zeroValue,seqOp,combOp)

print(foldResult)

print(aggregateResult)

aggregateResult = rdd2.aggregate(zeroValue,seqOp,combOp)

print(foldResult)

print(aggregateResult)

13

(15, 192)

13

(18, 1536)

Fold略简单,但是agregate的理解非常难,不同的分区场景会得到不同的结果,这里用图来解释说明下:

默认1个partition的情况:

4个partition的情况:

reduceByKey、reduceByKeyLocal API:

这两个要计算的效果是一样的,但是前者是传输,后者是动作,使用时候需要注意:

sc = SparkContext('local')

oldRdd=sc.parallelize([('Key1',1),('Key3',2),('Key1',3),('Key2',4),('Key2',5)])

newRdd = oldRdd.reduceByKey(lambda accumulate,ele : accumulate+ele)

newActionResult = oldRdd.reduceByKeyLocally(lambda accumulate,ele : accumulate+ele)

print(newRdd.collect())

print(newActionResult)

[('Key1', 4), ('Key3', 2), ('Key2', 9)]

{'Key1': 4, 'Key3': 2, 'Key2': 9}

回到前面map、reduce尾巴留的那个思考题,实现的方式不止一种,我这里给出两种解题思路:

方案A:

sc = SparkContext('local')

#第二个参数2代表的是分区数,默认为1

old=sc.parallelize([1,2,3,4,5])

newMapRdd = old.map(lambda x : (str(x),x**2))

print(newMapRdd.collect())

mergeRdd = newMapRdd.values()

print(mergeRdd.sum())

sc = SparkContext('local')

oldRdd=sc.parallelize([1,2,3,4,5])

newListRdd = oldRdd.map(lambda x : x**2)

newMapRdd = oldRdd.zip(newListRdd)

print(newMapRdd.values().sum())

之所以给出这些思路,是因为我们在使用 pyspark 的时候,除了要关心 transformation 和 action 之分,还需要注意你要处理的 rdd 里的数据是 list 还是 map ,因为对于他们实用的方法又是不同的。如果有必要,可以像这样做 list 和 map 的转换。

python连接spark_python如何通过pyspark的API操作spark相关推荐

  1. python连接sap接口_python中调用api接口

    CDN之API以及SDK的调用 简介 阿里云CDN提供了丰富的API接口,除了控制台,CDN还开发了多样化的接口.目前我们推荐用户使用新版API,详情请参见新版API参考.同时CDN提供了多语言SDK ...

  2. python连接plc实例_Python连接数据库MySQL与操作示例

    安装pyMySQL>> pip install pyMySQL 数据连接示例#! /usr/bin/env python # -*- coding: utf-8 -*- "&qu ...

  3. c/c++连接mysql数据库设置及乱码问题(vs2013连接mysql数据库,使用Mysql API操作数据库)...

    我的安装环境: (1)vs2013(32位版) (vs2013只有32位的 没有64位的,但是它可以编译出64位的程序)  : (2)mysql-5.7.15(64位) vs2013中的设置(按步骤来 ...

  4. java hbase创建_hadoop组件介绍及python 连接Hbase

    Ambari Apache Ambari是一种基于Web的工具,支持Hadoop集群的供应.管理和监控.是Apache顶级开源项目之一,由Hortonworks公司开源. Ø 官方网站地址: http ...

  5. 【数据平台】基于pymysql库python连接mysql

    1.场景:python连接mysql的API使用pymysql库 https://pypi.python.org/pypi/PyMySQL/ 安装:pip install pymysql 即可 2.代 ...

  6. python连接阿里云odps

    怎么下载他的库这个就自行百度了,配置完毕之后就用以下代码即可用python连接阿里云odps的数据库了. from odps import ODPS o = ODPS('嘿嘿嘿', #这个地方是阿里云 ...

  7. Python——Python连接MySQL数据库

    基本概念 PyMySQL:PyMySQL是封装了MySQL驱动的Python驱动,一个能使Python连接到MySQL的库. mysql-connector-python(MySQL Connecto ...

  8. python连接sql数据库_python连接sql server数据库实现增删改查

    简述 python连接微软的sql server数据库用的第三方模块叫做pymssql(document:http://www.pymssql.org/en/stable/index.html).在官 ...

  9. Python连接MySQL数据库(pymysql),DataFrame写入 MySQL(create_engine)- Python代码

    模块安装 使用以下命令安装 PyMySQL: $ pip install PyMySQL 若系统不支持 pip,还可以这样安装: $ git clone https://github.com/PyMy ...

最新文章

  1. 清晰易懂的条件随机场原理总结
  2. C语言——第四次作业
  3. Leetcode 120. 三角形最小路径和 (每日一题 20210927)
  4. 南邮计算机图形学水不水,南邮计算机图形学实验报告(修正版)….doc
  5. 2021算法竞赛入门班第二节课【递归、分治、二分】练习题
  6. python算闰年和平年的天数_Python自定义函数计算给定日期是该年第几天的方法示例...
  7. 从一个疯狂下载者变成一个学习者
  8. 03-树3 Tree Traversals Again
  9. mybatis一个怪异的问题: Invalid bound statement (not found)
  10. java--内存模型
  11. pcl学习之kd-tree
  12. 40亿个手机号码如何去重?
  13. 温度能够瞬间提升到千度以上?究竟是什么原理
  14. Excel 下拉列表数据有效性智能匹配筛选
  15. 2016书单总结--看透SpringMvc源代码分析与实践-概述
  16. 【PDF】java使用Itext生成pdf文档--详解
  17. 《图解HTTP》全文笔记梳理
  18. 心灵鸡汤1------让人奋进的五句话
  19. 微型计算机原理跟什么有关,微机原理 课后题 标准答案
  20. 戴尔笔记本电脑USB口失灵

热门文章

  1. 常用的认证机制之session认证和token认证
  2. [mmu/cache]-ARM cache的学习笔记-一篇就够了
  3. 2021-07-03
  4. JWT对称加密非对称加密
  5. 熊猫烧香变种病毒分析
  6. Windows进程与线程学习笔记(一)—— 进程结构体
  7. 004 人物数据查找和代码编写
  8. MySQL修改和删除触发器(DROP TRIGGER)
  9. 【easysnmp】python snmp IF-MIB::ifPhysAddress messy code,解析mac地址乱码
  10. 1052 Linked List Sorting (25 分)【难度: 一般 / 知识点: 链表】