pythonspark实例_spark+python快速入门实战小例子(PySpark)
1、集群测试实例
代码如下:
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.master("spark://mini1:7077") \
.getOrCreate()
spark.conf.set("spark.executor.memory", "500M")
sc = spark.sparkContext
a = sc.parallelize([1, 2, 3])
b = a.flatMap(lambda x: (x,x ** 2))
print(a.collect())
print(b.collect())1
2
3
4
5
6
7
8
9
10
11
12
运行结果:
2、从文件中读取
为了方便调试,这里采用本地模式进行测试
from py4j.compat import long
from pyspark.sql import SparkSession
def formatData(arr):
# arr = arr.split(",")
mb = (arr[0], arr[2])
flag = arr[3]
time = long(arr[1])
# time = arr[1]
if flag == "1":
time = -time
return (mb,time)
if name == “main”:
spark = SparkSession
.builder
.appName(“PythonWordCount”)
.master(“local”)
.getOrCreate()
sc = spark.sparkContext
# sc = spark.sparkContext
line = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))
count = line.map(lambda x: formatData(x))
rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)
# print(count.collect())
line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))
rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))
rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))
rdd3 = rdd.join(rdd1)
rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))
# .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)
rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))
print(rdd5.collect())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
原文件数据:
结果如下:
[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]1
3、读取文件并将结果保存至文件
from pyspark.sql import SparkSession
from py4j.compat import long
def formatData(arr):
# arr = arr.split(",")
mb = (arr[0], arr[2])
flag = arr[3]
time = long(arr[1])
# time = arr[1]
if flag == “1”:
time = -time
return (mb,time)
if name == “main”:
spark = SparkSession
.builder
.appName(“PythonWordCount”)
.master(“local”)
.getOrCreate()
sc = spark.sparkContext
line = sc.textFile(“D:\code\hadoop\data\spark\day1\bs_log”).map(lambda x: x.split(’,’))
rdd0 = line.map(lambda x: formatData(x))
rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))
line2 = sc.textFile(“D:\code\hadoop\data\spark\day1\lac_info.txt”).map(lambda x: x.split(’,’))
rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))
rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))
rdd4 = rdd3.groupBy(lambda x: x[0])
rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])
print(rdd1.join(rdd2).collect())
print(rdd5.collect())
rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")
sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
结果如下:
4、根据自定义规则匹配
import urllib
from pyspark.sql import SparkSession
def getUrls(urls):
url = urls[0]
parsed = urllib.parse.urlparse(url)
return (parsed.netloc, url, urls[1])
if name == “main”:
spark = SparkSession
.builder
.appName(“PythonWordCount”)
.master(“local”)
.getOrCreate()
sc = spark.sparkContext
line = sc.textFile(“D:\code\hadoop\data\spark\day02\itcast.log”).map(lambda x: x.split(’\t’))
//从数据库中加载规则
arr = [“java.itcast.cn”, “php.itcast.cn”, “net.itcast.cn”]
rdd1 = line.map(lambda x: (x[1], 1))
rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)
rdd3 = rdd2.map(lambda x: getUrls(x))
for ins in arr:
rdd = rdd3.filter(lambda x:x[0] == ins)
result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)
print(result)
spark.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
结果如下:
5、自定义类排序
from operator import gt
from pyspark.sql import SparkSession
class Girl:
def init(self, faceValue, age):
self.faceValue = faceValue
self.age = age
def __gt__(self, other):
if other.faceValue == self.faceValue:
return gt(self.age, other.age)
else:
return gt(self.faceValue, other.faceValue)
if name == “main”:
spark = SparkSession
.builder
.appName(“PythonWordCount”)
.master(“local”)
.getOrCreate()
sc = spark.sparkContext
rdd1 = sc.parallelize([(“yuihatano”, 90, 28, 1), (“angelababy”, 90, 27, 2), (“JuJingYi”, 95, 22, 3)])
rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)
print(rdd2.collect())
sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
结果如下:
6、JDBC
from pyspark import SQLContext
from pyspark.sql import SparkSession
if name == “main”:
spark = SparkSession
.builder
.appName(“PythonWordCount”)
.master(“local”)
.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format(“jdbc”).options(url=“jdbc:mysql://localhost:3306/hellospark”,driver=“com.mysql.jdbc.Driver”,dbtable="(select * from actor) tmp",user=“root”,password=“123456”).load()
print(df.select(‘description’,‘age’).show(2))
# print(df.printSchema)
sc.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
结果如下:
pythonspark实例_spark+python快速入门实战小例子(PySpark)相关推荐
- spark教程python案例_Spark实战(四)spark+python快速入门实战小例子(PySpark)
由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现 1.集群测试实例 代码如下: from pyspark. ...
- python入门实战小例子(一朵花的绽放)(花が咲く)
花が咲く 这个例子对于入门来说,是很简单的,这个例子里面有很多初级的知识,包括创建一块画布,用一个海龟当做实体,把他的移动轨迹画出来,用一个for循环把这朵花画出来. 话不多说先上代码 import ...
- python 爬虫实例 电影-Python爬虫入门实战之猫眼电影数据抓取(实战篇)
电影名 上映信息 综合票房 票房占比 累计票房
- 造作吧,Python快速入门!
双十一的刀口还没愈合,双十二的折扣又戳到了胸口. 买买买, 还是小天最懂你 看看小天都准备了什么! 课程限时优惠,网易云课堂平台优惠券,优惠叠加,课程包更享折上折!双十二年终钜惠,还犹豫些什么? Py ...
- python3 爬虫实例_【实战练习】Python3网络爬虫快速入门实战解析(上)
原标题:[实战练习]Python3网络爬虫快速入门实战解析(上) 摘要 使用python3学习网络爬虫,快速入门静态网站爬取和动态网站爬取 [ 前言 ] 强烈建议:请在电脑的陪同下,阅读本文.本文以实 ...
- Python教程:网络爬虫快速入门实战解析
建议: 请在电脑的陪同下,阅读本文.本文以实战为主,阅读过程如稍有不适,还望多加练习. 网络爬虫简介 网络爬虫,也叫网络蜘蛛(Web Spider).它根据网页地址(URL)爬取网页内容,而网页地址( ...
- Python3网络爬虫快速入门实战解析(一小时入门 Python 3 网络爬虫)
Python3网络爬虫快速入门实战解析(一小时入门 Python 3 网络爬虫) https://blog.csdn.net/u012662731/article/details/78537432 出 ...
- Python3网络爬虫快速入门实战解析
Python3网络爬虫快速入门实战解析 标签: python网络爬虫 2017-09-28 14:48 6266人阅读 评论(34) 收藏 举报 分类: Python(26) 作者同类文章X 版权声明 ...
- Python3 网络爬虫快速入门实战解析
点击上方"Python高校",关注 文末干货立马到手 作者:Jack Cui http://cuijiahua.com/blog/2017/10/spider_tutorial_1 ...
最新文章
- 三维重建缺数据集?来看看Facebook最新发布的CO3D
- B1928 日期差值
- 70条程序员编程的专业名言,你认可几条?
- 林辉高考机器人_机器人2019年参加高考 力争考上一本
- Bootstrap HTML 编码规范之字符编码
- Fiddler之文件代理
- BIM平台 http://gzcd.bim001.cn
- 5.七个重点网络协议
- DB9接口定义 串口接口定义 MAX232电路
- 使用 MQL5 绘制阻力和支撑级别
- Pycharm工具下的数据可视化(图形绘制)
- 【摄像头】图像传感器尺寸、像素大小和成像质量的关系
- 我的团长我的团第九集
- 【渝粤题库】陕西师范大学201931 唐诗研究 作业
- 等额本息和等额本金、提前还款
- jQuery LigerUI使用教程入门篇
- 颜色RGB对照表(颜色大全)
- 脑图工具MindNode附属节点是什么意思 图解
- centos7.2安装dcos
- 任天堂 虚拟主机服务器,任天堂不计划在Switch上推出虚拟主机
热门文章
- UC/OS-II的学习
- python 查看当前目录_「Python」打包分发工具setuptools学习
- ORA-00018: maximum number of sessions exceeded 超出最大会话数
- Android中利用productFlavors配置多渠道
- [webview] 放大缩小的问题
- [字符串]与[数组]的互相转换
- POJ 1745 Divisibility DP
- 看看老外是如何理解抽象类的
- atmega8 例程:T1定时器 CTC模式 方波输出
- 全国计算机等级考试题库二级C操作题100套(第13套)