pyspark---agg的用法
文章目录
- group by agg用法
- dataframe agg 直接计算
group by agg用法
pyspark中的agg聚合运算应该才能达到聚合字段的目的, apply的运算都是一行一行的运算且并没有真实的聚合.
pyspark中已经对agg操作定义了很多方便的运算函数,可以直接调用来对其进行运算.
from:
+---------+-------------+-----+-------+-------+-------+-------+--------+
|ID | P |index|xinf |xup |yinf |ysup | M |
+---------+-------------+-----+-------+-------+-------+-------+--------+
| 0|10279.9003906| 13| 0.3| 0.5| 2.5| 3.0|540928.0|
| 2|12024.2998047| 13| 0.3| 0.5| 2.5| 3.0|541278.0|
| 0|10748.7001953| 13| 0.3| 0.5| 2.5| 3.0|541243.0|
| 1| 10988.5| 13| 0.3| 0.5| 2.5| 3.0|540917.0|
+---------+-------------+-----+-------+-------+-------+-------+--------+to:
+---------+-------------+-----+-------+-------+-------+-------+--------+
|Id | P |index|xinf |xup |yinf |ysup | M |
+---------+-------------+-----+-------+-------+-------+-------+--------+
| 0|10514.3002929| 13| 0.3| 0.5| 2.5| 3.0|540928.0,541243.0|
| 2|12024.2998047| 13| 0.3| 0.5| 2.5| 3.0|541278.0|
| 1| 10988.5| 13| 0.3| 0.5| 2.5| 3.0|540917.0|
+---------+-------------+-----+-------+-------+-------+-------+--------+# 如果要想根据id聚合,计算P的均值,并且连接M
from pyspark.sql.functions import first, collect_list, meanIn:
df.groupBy("ID").agg(mean("P"), first("index"),first("xinf"), first("xup"), first("yinf"), first("ysup"), collect_list("M"))
下面再看几个例子
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import os
os.environ['JAVA_HOME'] = r'C:\servies\Java\jdk8'
os.environ['SPARK_HOME'] = r'D:\software\spark-2.2.0-bin-hadoop2.7'
os.environ['PYTHONPATH'] = r'D:\software\spark-2.2.0-bin-hadoop2.7\python'spark = SparkSession.builder.appName('test').getOrCreate()data = [("Carol","Data Scientist","USA",70000,5),("Peter","Data Scientist","USA",90000,7),("Clark","Data Scientist","UK",111000,10),("Jean","Data Scientist","UK",220000,30),("Bruce","Data Engineer","UK",80000,4),("Thanos","Data Engineer","USA",115000,13),("Scott","Data Engineer","UK",180000,15),("T'challa","CEO","USA",300000,20),("Xavier","Marketing","USA",100000,11),("Wade","Marketing","UK",60000,2)
]column = ["Name","Job","Country","salary","seniority"]
df = spark.createDataFrame(data=data, schema=column)
# df.printSchema()
df.show()
单列聚合,求每个job的最高工资
# 单列聚合,求每个job的最高工资
df1 = df.groupBy('Job').max('salary')
df1.show()
多关键字分组, 对多列进行同样的聚合操作,求每个Job,每个Country的最高工资和最高资备
# 多关键字分组, 对多列进行同样的聚合操作,求每个Job,每个Country的最高工资和最高资备
df.groupBy('Job','Country').agg(fn.max('salary'), fn.max('seniority')).show()
对多列进行不同的聚合操作, 并修改相应的列名
# 对多列进行不同的聚合操作, 并修改相应的列名
df.groupBy('job').agg(fn.sum("salary").alias("sum_salary"),fn.avg("salary").alias("avg_salary"),fn.min("salary").alias("min_salary"),fn.max("salary").alias("max_salary"),fn.mean("salary").alias("mean_salary")
).show(truncate=False) # truncate=False:左对齐
dataframe agg 直接计算
spark的agg可以直接对DataFrame进行聚合运算, 简单情况即每一列是可以直接列举的
复杂情况是无法直接穷举每一列的表达式, 而是需要创建表达式集合的情况
# =========================简单情况=========================
data.show(5)
+--------+-------+--------+--------------------+-----+--------+
|glass_id|step_id|equip_id| timekey|label| unit_id|
+--------+-------+--------+--------------------+-----+--------+
|Y95PR090| 14200|A2PDC100|20190601094814153863| 1|A2PDC100|
|Y95PR090| 14207|A2VTM100|20190601120431648744| 1|A2VTM100|
|Y95PR090| 1420V|A2PVD100|20190601120428511673| 1|A2PVD100|
|Y95PR090| 14300|A2RSM100|20190601125957981111| 1|A2RSM100|
|Y95PR090| 14315|A2PHT500|20190601150105054455| 1|A2PHT500|
+--------+-------+--------+--------------------+-----+--------+data.agg(mean('label')).show()
+------------------+
| avg(label)|
+------------------+
|0.7411402157164869|
+------------------+# ============直接使用循环来创建表达式的集合===============
tips_.show(2)
+----------+----+----+
|total_bill| tip|size|
+----------+----+----+
| 16.99|1.01| 2.0|
| 10.34|1.66| 3.0|
+----------+----+----+agglist = [mean(x) for x in tips_.columns]agglist
Out[109]: [Column<b'avg(total_bill)'>, Column<b'avg(tip)'>, Column<b'avg(size)'>]tips_.agg(*agglist)
Out[111]: DataFrame[avg(total_bill): double, avg(tip): double, avg(size): double]tips_.agg(*agglist).show()
+------------------+----------------+-----------------+
| avg(total_bill)| avg(tip)| avg(size)|
+------------------+----------------+-----------------+
|19.785942643392282|2.99827868821191|2.569672131147541|
+------------------+----------------+-----------------+
pyspark---agg的用法相关推荐
- pyspark dataframe基本用法
pyspark dataframe基本用法 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...
- PySpark reduce reduceByKey用法
用法 reduce:对rdd内部 元素 进行迭代操作 reduce方法 分区内和分区间调用相同的用户给定的函数; 先在每个分区内执行完用户给定的函数后,将每个分区的结果通过collect()方法统计到 ...
- when-otherwise for pyspark用法
pyspark when otherwise用法 描述:根据某一列的值,修改另一列的值 1.第一种思路,生成临时表,统计数量,用withcolumn 修改数据 2.第二种思路,调用pandas API ...
- pandas 聚合函数agg
今天看到pandas的聚合函数agg,比较陌生,平时的工作中处理数据的时候使用的也比较少,为了加深印象,总结一下使用的方法,其实还是挺好用的. DataFrame.agg(func,axis = 0, ...
- Pandas知识点-详解聚合函数agg
Pandas知识点-详解聚合函数agg Pandas提供了多个聚合函数,聚合函数可以快速.简洁地将多个函数的执行结果聚合到一起. 本文介绍的聚合函数为DataFrame.aggregate(),别名D ...
- python agg函数_pandas agg函数使用方法
DataFrame.agg(func,axis = 0,* args,** kwargs) func : 函数,函数名称,函数列表,字典{'行名/列名','函数名'} 使用指定轴上的一个或多个操作进行 ...
- Python之数据聚合——aggregate()方法
文章目录 使用内置统计方法聚合数据 面向列的聚合方法 aggregate()方法 对每一列数据应用同一个函数 对某列数据应用不同的函数 对不同列数据应用不同函数 使用内置统计方法聚合数据 实现数据拆分 ...
- ElasticSearch Aggregations使用总结详解
1.单字段情况下聚合 假设只需要对一个字段聚合,比如b字段,b字段是keyword类型,需要考虑的情况最为简单,当要对b字段聚合时语句很好写,如下即可 {"from": 0,&qu ...
- python:dataframe groupby后agg、apply、transfrom用法
import pandas as pd data = pd.DataFrame({'name':['wencky','stany','barbio','barbio'],'age':[29,29,3, ...
- python agg函数_Python Pandas Series.agg()用法及代码示例
Python是进行数据分析的一种出色语言,主要是因为以数据为中心的python软件包具有奇妙的生态系统. Pandas是其中的一种,使导入和分析数据更加容易. Pandas Series.agg()用 ...
最新文章
- asp.net core 创建允许跨域请求的api, cors.
- Greenplum——升级的分布式PostgresSQL
- mysql安装check requirements出错_超详细的MySQL8.0.17版本安装教程
- [TODO]Python拾遗(二)
- [HTTP] 重定向的302,301
- 中国蔬菜汤市场趋势报告、技术动态创新及市场预测
- iOS开发-当APP涉及到用户敏感信息适配Xcode9及(ios11)
- 46.Android 自己定义Dialog
- MATLAB疲劳检测系统
- JQuery EasyUI 结合ztrIee的后台页面开发
- keil的错误: Error: Encountered an improper argument 的解决方法
- 华为云CentOS7.6云耀服务器Python环境基本配置
- 叠加阶梯图和线图及合并线图和针状图
- 如何用简单的方式将数组转成json
- 大学linux操作系统,大学信息技术(Linux操作系统及其应用)
- 回头再说 008 瞬
- 分布式任务xxl-job调度中心安装说明
- php彩色教程,Photoshop简单制作立体彩色炫图
- java项目添加功能失败_学生信息的添加 Java web简单项目初试(失败)
- JavaScript:原生js写的一个多按钮Popover 弹出框