由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现

1、集群测试实例

代码如下:

from pyspark.sql import SparkSession

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("spark://mini1:7077") \

.getOrCreate()

spark.conf.set("spark.executor.memory", "500M")

sc = spark.sparkContext

a = sc.parallelize([1, 2, 3])

b = a.flatMap(lambda x: (x,x ** 2))

print(a.collect())

print(b.collect())

1

2

3

4

5

6

7

8

9

10

11

12

运行结果:

2、从文件中读取

为了方便调试,这里采用本地模式进行测试

from py4j.compat import long

from pyspark.sql import SparkSession

def formatData(arr):

# arr = arr.split(",")

mb = (arr[0], arr[2])

flag = arr[3]

time = long(arr[1])

# time = arr[1]

if flag == "1":

time = -time

return (mb,time)

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("local")\

.getOrCreate()

sc = spark.sparkContext

# sc = spark.sparkContext

line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))

count = line.map(lambda x: formatData(x))

rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)

# print(count.collect())

line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))

rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))

rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))

rdd3 = rdd.join(rdd1)

rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))

# .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)

rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))

print(rdd5.collect())

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

原文件数据:

结果如下:

[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]

1

3、读取文件并将结果保存至文件

from pyspark.sql import SparkSession

from py4j.compat import long

def formatData(arr):

# arr = arr.split(",")

mb = (arr[0], arr[2])

flag = arr[3]

time = long(arr[1])

# time = arr[1]

if flag == "1":

time = -time

return (mb,time)

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("local")\

.getOrCreate()

sc = spark.sparkContext

line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))

rdd0 = line.map(lambda x: formatData(x))

rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))

line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))

rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))

rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))

rdd4 = rdd3.groupBy(lambda x: x[0])

rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])

print(rdd1.join(rdd2).collect())

print(rdd5.collect())

rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

结果如下:

4、根据自定义规则匹配

import urllib

from pyspark.sql import SparkSession

def getUrls(urls):

url = urls[0]

parsed = urllib.parse.urlparse(url)

return (parsed.netloc, url, urls[1])

if __name__ == "__main__":

spark = SparkSession \

.builder \

.appName("PythonWordCount") \

.master("local") \

.getOrCreate()

sc = spark.sparkContext

line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day02\\itcast.log").map(lambda x: x.split('\t'))

//从数据库中加载规则

arr = ["java.itcast.cn", "php.itcast.cn", "net.itcast.cn"]

rdd1 = line.map(lambda x: (x[1], 1))

rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)

rdd3 = rdd2.map(lambda x: getUrls(x))

for ins in arr:

rdd = rdd3.filter(lambda x:x[0] == ins)

result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)

print(result)

spark.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

结果如下:

5、自定义类排序

from operator import gt

from pyspark.sql import SparkSession

class Girl:

def __init__(self, faceValue, age):

self.faceValue = faceValue

self.age = age

def __gt__(self, other):

if other.faceValue == self.faceValue:

return gt(self.age, other.age)

else:

return gt(self.faceValue, other.faceValue)

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("local")\

.getOrCreate()

sc = spark.sparkContext

rdd1 = sc.parallelize([("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)])

rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)

print(rdd2.collect())

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

结果如下:

6、JDBC

from pyspark import SQLContext

from pyspark.sql import SparkSession

if __name__ == "__main__":

spark = SparkSession\

.builder\

.appName("PythonWordCount")\

.master("local")\

.getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/hellospark",driver="com.mysql.jdbc.Driver",dbtable="(select * from actor) tmp",user="root",password="123456").load()

print(df.select('description','age').show(2))

# print(df.printSchema)

sc.stop()

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

结果如下:

spark教程python案例_Spark实战(四)spark+python快速入门实战小例子(PySpark)相关推荐

  1. Python教程:网络爬虫快速入门实战解析

    建议: 请在电脑的陪同下,阅读本文.本文以实战为主,阅读过程如稍有不适,还望多加练习. 网络爬虫简介 网络爬虫,也叫网络蜘蛛(Web Spider).它根据网页地址(URL)爬取网页内容,而网页地址( ...

  2. 『Python开发实战菜鸟教程』实战篇:爬虫快速入门——统计分析CSDN与博客园博客阅读数据

    文章目录 0x01:引子 首先介绍一下网络爬虫是什么,可以用来做什么? 这里简单探讨一下网络爬虫的合法性 正式进入爬虫实战前,需要我们了解下网页结构 HTML CSS JScript 写一个简单的 H ...

  3. Python3网络爬虫快速入门实战解析(一小时入门 Python 3 网络爬虫)

    Python3网络爬虫快速入门实战解析(一小时入门 Python 3 网络爬虫) https://blog.csdn.net/u012662731/article/details/78537432 出 ...

  4. Python学习教程(Python学习路线_Python基础学习教程_Python视频教程):初学者新手怎样快速入门Python

    Python学习教程(Python学习路线_Python基础学习教程_Python视频教程):初学者新手怎样快速入门Python? 人生苦短,我用Python!!!短短几个字,现在在各大学习类平台随处 ...

  5. Python3网络爬虫快速入门实战解析

    Python3网络爬虫快速入门实战解析 标签: python网络爬虫 2017-09-28 14:48 6266人阅读 评论(34) 收藏 举报 分类: Python(26) 作者同类文章X 版权声明 ...

  6. Python3 网络爬虫快速入门实战解析

    点击上方"Python高校",关注 文末干货立马到手 作者:Jack Cui http://cuijiahua.com/blog/2017/10/spider_tutorial_1 ...

  7. python3 爬虫实例_【实战练习】Python3网络爬虫快速入门实战解析(上)

    原标题:[实战练习]Python3网络爬虫快速入门实战解析(上) 摘要 使用python3学习网络爬虫,快速入门静态网站爬取和动态网站爬取 [ 前言 ] 强烈建议:请在电脑的陪同下,阅读本文.本文以实 ...

  8. beautifulsoup网页爬虫解析_Python3 网络爬虫快速入门实战解析

    点击上方"Python高校",关注 文末干货立马到手 作者:Jack Cui http://cuijiahua.com/blog/2017/10/spider_tutorial_1 ...

  9. 猿创征文 | 国产数据库实战之TiDB 数据库快速入门

    猿创征文 | 国产数据库实战之TiDB 数据库快速入门 一.系统检查 1.检查系统版本 2.查看本地IP地址 3.TiDB集群介绍 二.快速部署本地测试集群 1.安装 TiUP工具 2.声明全局环境变 ...

  10. 新手第四课-PaddlePaddle快速入门

    新手第四课-PaddlePaddle快速入门 文章目录 新手第四课-PaddlePaddle快速入门 PaddlePaddle基础命令 计算常量的加法:1+1 计算变量的加法:1+1 使用Paddle ...

最新文章

  1. WebDriver API 元素定位(三)
  2. dubbo源码解析(四十一)集群——Mock
  3. 机器学习(十二)朴素贝叶斯分类
  4. java 32个面试问题
  5. php file_get_contents 效率,php 浅析file_get_contents、curl 的效率和稳定性
  6. 查看编译class文件时,使用的JDK版本(只能查看到大版本)
  7. 5类6类7类网线对比_超6类网线和7类网线有何区别?有何用途?家用电脑,看这!...
  8. 鸿蒙 HarmonyOS 3.0,终于来了!
  9. 关于select和option下拉框样式问题
  10. 商汤科技——机器视觉面试
  11. 如何选择企业级数据存储DAS、NAS和SAN
  12. 抖音关键词排名优化技巧,手把手教你怎样优化抖音关键词
  13. AKM项目轶事之与高中同学徐挺会见
  14. 为什么会有堆内存和栈内存之分
  15. [审核]App Store审核指南
  16. 爬取网易云热门音乐歌单
  17. 安慰奶牛 C++ kruskal
  18. 360怎样修改wifi服务器,360路由器怎么改wi-fi密码(无线密码)?
  19. 【网络安全】内网介绍+windows信息收集(含命令)
  20. pico的学习之路(一)——MQ-2烟雾传感器模块(树莓派pico实现)

热门文章

  1. python布尔类型运算_9.python的布尔类型与流程控制
  2. 《深入浅出数据分析》第十三章
  3. quill鼠标悬浮 出现提示_外设报道——DELUX多彩M618X垂直鼠标颠覆创新
  4. Qt 中pro文件换行注意的问题
  5. IOS 企业版发布后,用户通过sarafi浏览器安装无效的解决方案
  6. html触发js参数怎么用,js 绑定带参数的事件以及手动触发事件
  7. matlab有模糊分析,用matlab进行模糊综合评判_模糊综合评判matlab
  8. 键盘enter按钮出发登陆事件
  9. IIS 5.0 和 6.0 的 ASP.NET 应用程序生命周期概述
  10. Android ORM 框架:GreenDao 数据库升级