1、集群测试实例

代码如下:

from pyspark.sql import SparkSession

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("spark://mini1:7077") \

.getOrCreate()

spark.conf.set("spark.executor.memory", "500M")

sc = spark.sparkContext

a = sc.parallelize([1, 2, 3])

b = a.flatMap(lambda x: (x,x ** 2))

print(a.collect())

print(b.collect())1

2

3

4

5

6

7

8

9

10

11

12

运行结果:

2、从文件中读取

为了方便调试,这里采用本地模式进行测试

from py4j.compat import long

from pyspark.sql import SparkSession

def formatData(arr):

# arr = arr.split(",")

mb = (arr[0], arr[2])

flag = arr[3]

time = long(arr[1])

# time = arr[1]

if flag == "1":

time = -time

return (mb,time)

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

# sc = spark.sparkContext

line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))

count = line.map(lambda x: formatData(x))

rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)

# print(count.collect())

line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))

rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))

rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))

rdd3 = rdd.join(rdd1)

rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))

# .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)

rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))

print(rdd5.collect())

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

原文件数据:

结果如下:

[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]1

3、读取文件并将结果保存至文件

from pyspark.sql import SparkSession

from py4j.compat import long

def formatData(arr):

# arr = arr.split(",")

mb = (arr[0], arr[2])

flag = arr[3]

time = long(arr[1])

# time = arr[1]

if flag == “1”:

time = -time

return (mb,time)

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

line = sc.textFile(“D:\code\hadoop\data\spark\day1\bs_log”).map(lambda x: x.split(’,’))

rdd0 = line.map(lambda x: formatData(x))

rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))

line2 = sc.textFile(“D:\code\hadoop\data\spark\day1\lac_info.txt”).map(lambda x: x.split(’,’))

rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))

rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))

rdd4 = rdd3.groupBy(lambda x: x[0])

rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])

print(rdd1.join(rdd2).collect())

print(rdd5.collect())

rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

结果如下:

4、根据自定义规则匹配

import urllib

from pyspark.sql import SparkSession

def getUrls(urls):

url = urls[0]

parsed = urllib.parse.urlparse(url)

return (parsed.netloc, url, urls[1])

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

line = sc.textFile(“D:\code\hadoop\data\spark\day02\itcast.log”).map(lambda x: x.split(’\t’))

//从数据库中加载规则

arr = [“java.itcast.cn”, “php.itcast.cn”, “net.itcast.cn”]

rdd1 = line.map(lambda x: (x[1], 1))

rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)

rdd3 = rdd2.map(lambda x: getUrls(x))

for ins in arr:

rdd = rdd3.filter(lambda x:x[0] == ins)

result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)

print(result)

spark.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

结果如下:

5、自定义类排序

from operator import gt

from pyspark.sql import SparkSession

class Girl:

def init(self, faceValue, age):

self.faceValue = faceValue

self.age = age

def __gt__(self, other):

if other.faceValue == self.faceValue:

return gt(self.age, other.age)

else:

return gt(self.faceValue, other.faceValue)

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

rdd1 = sc.parallelize([(“yuihatano”, 90, 28, 1), (“angelababy”, 90, 27, 2), (“JuJingYi”, 95, 22, 3)])

rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)

print(rdd2.collect())

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

结果如下:

6、JDBC

from pyspark import SQLContext

from pyspark.sql import SparkSession

if name == “main”:

spark = SparkSession

.builder

.appName(“PythonWordCount”)

.master(“local”)

.getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

df = sqlContext.read.format(“jdbc”).options(url=“jdbc:mysql://localhost:3306/hellospark”,driver=“com.mysql.jdbc.Driver”,dbtable="(select * from actor) tmp",user=“root”,password=“123456”).load()

print(df.select(‘description’,‘age’).show(2))

# print(df.printSchema)

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

结果如下:

pythonspark实例_spark+python快速入门实战小例子(PySpark)相关推荐

  1. spark教程python案例_Spark实战(四)spark+python快速入门实战小例子(PySpark)

    由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现 1.集群测试实例 代码如下: from pyspark. ...

  2. python入门实战小例子(一朵花的绽放)(花が咲く)

    花が咲く 这个例子对于入门来说,是很简单的,这个例子里面有很多初级的知识,包括创建一块画布,用一个海龟当做实体,把他的移动轨迹画出来,用一个for循环把这朵花画出来. 话不多说先上代码 import ...

  3. python 爬虫实例 电影-Python爬虫入门实战之猫眼电影数据抓取(实战篇)

    电影名 上映信息 综合票房 票房占比 累计票房

  4. 造作吧,Python快速入门!

    双十一的刀口还没愈合,双十二的折扣又戳到了胸口. 买买买, 还是小天最懂你 看看小天都准备了什么! 课程限时优惠,网易云课堂平台优惠券,优惠叠加,课程包更享折上折!双十二年终钜惠,还犹豫些什么? Py ...

  5. python3 爬虫实例_【实战练习】Python3网络爬虫快速入门实战解析(上)

    原标题:[实战练习]Python3网络爬虫快速入门实战解析(上) 摘要 使用python3学习网络爬虫,快速入门静态网站爬取和动态网站爬取 [ 前言 ] 强烈建议:请在电脑的陪同下,阅读本文.本文以实 ...

  6. Python教程:网络爬虫快速入门实战解析

    建议: 请在电脑的陪同下,阅读本文.本文以实战为主,阅读过程如稍有不适,还望多加练习. 网络爬虫简介 网络爬虫,也叫网络蜘蛛(Web Spider).它根据网页地址(URL)爬取网页内容,而网页地址( ...

  7. Python3网络爬虫快速入门实战解析(一小时入门 Python 3 网络爬虫)

    Python3网络爬虫快速入门实战解析(一小时入门 Python 3 网络爬虫) https://blog.csdn.net/u012662731/article/details/78537432 出 ...

  8. Python3网络爬虫快速入门实战解析

    Python3网络爬虫快速入门实战解析 标签: python网络爬虫 2017-09-28 14:48 6266人阅读 评论(34) 收藏 举报 分类: Python(26) 作者同类文章X 版权声明 ...

  9. Python3 网络爬虫快速入门实战解析

    点击上方"Python高校",关注 文末干货立马到手 作者:Jack Cui http://cuijiahua.com/blog/2017/10/spider_tutorial_1 ...

最新文章

  1. 三维重建缺数据集?来看看Facebook最新发布的CO3D
  2. B1928 日期差值
  3. 70条程序员编程的专业名言,你认可几条?
  4. 林辉高考机器人_机器人2019年参加高考 力争考上一本
  5. Bootstrap HTML 编码规范之字符编码
  6. Fiddler之文件代理
  7. BIM平台 http://gzcd.bim001.cn
  8. 5.七个重点网络协议
  9. DB9接口定义 串口接口定义 MAX232电路
  10. 使用 MQL5 绘制阻力和支撑级别
  11. Pycharm工具下的数据可视化(图形绘制)
  12. 【摄像头】图像传感器尺寸、像素大小和成像质量的关系
  13. 我的团长我的团第九集
  14. 【渝粤题库】陕西师范大学201931 唐诗研究 作业
  15. 等额本息和等额本金、提前还款
  16. jQuery LigerUI使用教程入门篇
  17. 颜色RGB对照表(颜色大全)
  18. 脑图工具MindNode附属节点是什么意思 图解
  19. centos7.2安装dcos
  20. 任天堂 虚拟主机服务器,任天堂不计划在Switch上推出虚拟主机

热门文章

  1. UC/OS-II的学习
  2. python 查看当前目录_「Python」打包分发工具setuptools学习
  3. ORA-00018: maximum number of sessions exceeded 超出最大会话数
  4. Android中利用productFlavors配置多渠道
  5. [webview] 放大缩小的问题
  6. [字符串]与[数组]的互相转换
  7. POJ 1745 Divisibility DP
  8. 看看老外是如何理解抽象类的
  9. atmega8 例程:T1定时器 CTC模式 方波输出
  10. 全国计算机等级考试题库二级C操作题100套(第13套)