文章目录

  • group by agg用法
  • dataframe agg 直接计算

group by agg用法

pyspark中的agg聚合运算应该才能达到聚合字段的目的, apply的运算都是一行一行的运算且并没有真实的聚合.

pyspark中已经对agg操作定义了很多方便的运算函数,可以直接调用来对其进行运算.

from:
+---------+-------------+-----+-------+-------+-------+-------+--------+
|ID       |           P |index|xinf   |xup    |yinf   |ysup   |     M  |
+---------+-------------+-----+-------+-------+-------+-------+--------+
|        0|10279.9003906|   13|    0.3|    0.5|    2.5|    3.0|540928.0|
|        2|12024.2998047|   13|    0.3|    0.5|    2.5|    3.0|541278.0|
|        0|10748.7001953|   13|    0.3|    0.5|    2.5|    3.0|541243.0|
|        1|      10988.5|   13|    0.3|    0.5|    2.5|    3.0|540917.0|
+---------+-------------+-----+-------+-------+-------+-------+--------+to:
+---------+-------------+-----+-------+-------+-------+-------+--------+
|Id       |           P |index|xinf   |xup    |yinf   |ysup   |     M  |
+---------+-------------+-----+-------+-------+-------+-------+--------+
|        0|10514.3002929|   13|    0.3|    0.5|    2.5|    3.0|540928.0,541243.0|
|        2|12024.2998047|   13|    0.3|    0.5|    2.5|    3.0|541278.0|
|        1|      10988.5|   13|    0.3|    0.5|    2.5|    3.0|540917.0|
+---------+-------------+-----+-------+-------+-------+-------+--------+# 如果要想根据id聚合,计算P的均值,并且连接M
from pyspark.sql.functions import first, collect_list, meanIn:
df.groupBy("ID").agg(mean("P"), first("index"),first("xinf"), first("xup"), first("yinf"), first("ysup"), collect_list("M"))

下面再看几个例子

from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import os
os.environ['JAVA_HOME'] = r'C:\servies\Java\jdk8'
os.environ['SPARK_HOME'] = r'D:\software\spark-2.2.0-bin-hadoop2.7'
os.environ['PYTHONPATH'] = r'D:\software\spark-2.2.0-bin-hadoop2.7\python'spark = SparkSession.builder.appName('test').getOrCreate()data = [("Carol","Data Scientist","USA",70000,5),("Peter","Data Scientist","USA",90000,7),("Clark","Data Scientist","UK",111000,10),("Jean","Data Scientist","UK",220000,30),("Bruce","Data Engineer","UK",80000,4),("Thanos","Data Engineer","USA",115000,13),("Scott","Data Engineer","UK",180000,15),("T'challa","CEO","USA",300000,20),("Xavier","Marketing","USA",100000,11),("Wade","Marketing","UK",60000,2)
]column = ["Name","Job","Country","salary","seniority"]
df = spark.createDataFrame(data=data, schema=column)
# df.printSchema()
df.show()


单列聚合,求每个job的最高工资

# 单列聚合,求每个job的最高工资
df1 = df.groupBy('Job').max('salary')
df1.show()


多关键字分组, 对多列进行同样的聚合操作,求每个Job,每个Country的最高工资和最高资备

# 多关键字分组, 对多列进行同样的聚合操作,求每个Job,每个Country的最高工资和最高资备
df.groupBy('Job','Country').agg(fn.max('salary'), fn.max('seniority')).show()


对多列进行不同的聚合操作, 并修改相应的列名

# 对多列进行不同的聚合操作, 并修改相应的列名
df.groupBy('job').agg(fn.sum("salary").alias("sum_salary"),fn.avg("salary").alias("avg_salary"),fn.min("salary").alias("min_salary"),fn.max("salary").alias("max_salary"),fn.mean("salary").alias("mean_salary")
).show(truncate=False)      # truncate=False:左对齐

dataframe agg 直接计算

spark的agg可以直接对DataFrame进行聚合运算, 简单情况即每一列是可以直接列举的

复杂情况是无法直接穷举每一列的表达式, 而是需要创建表达式集合的情况

# =========================简单情况=========================
data.show(5)
+--------+-------+--------+--------------------+-----+--------+
|glass_id|step_id|equip_id|             timekey|label| unit_id|
+--------+-------+--------+--------------------+-----+--------+
|Y95PR090|  14200|A2PDC100|20190601094814153863|    1|A2PDC100|
|Y95PR090|  14207|A2VTM100|20190601120431648744|    1|A2VTM100|
|Y95PR090|  1420V|A2PVD100|20190601120428511673|    1|A2PVD100|
|Y95PR090|  14300|A2RSM100|20190601125957981111|    1|A2RSM100|
|Y95PR090|  14315|A2PHT500|20190601150105054455|    1|A2PHT500|
+--------+-------+--------+--------------------+-----+--------+data.agg(mean('label')).show()
+------------------+
|        avg(label)|
+------------------+
|0.7411402157164869|
+------------------+# ============直接使用循环来创建表达式的集合===============
tips_.show(2)
+----------+----+----+
|total_bill| tip|size|
+----------+----+----+
|     16.99|1.01| 2.0|
|     10.34|1.66| 3.0|
+----------+----+----+agglist = [mean(x) for x in tips_.columns]agglist
Out[109]: [Column<b'avg(total_bill)'>, Column<b'avg(tip)'>, Column<b'avg(size)'>]tips_.agg(*agglist)
Out[111]: DataFrame[avg(total_bill): double, avg(tip): double, avg(size): double]tips_.agg(*agglist).show()
+------------------+----------------+-----------------+
|   avg(total_bill)|        avg(tip)|        avg(size)|
+------------------+----------------+-----------------+
|19.785942643392282|2.99827868821191|2.569672131147541|
+------------------+----------------+-----------------+

pyspark---agg的用法相关推荐

  1. pyspark dataframe基本用法

    pyspark dataframe基本用法 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on F ...

  2. PySpark reduce reduceByKey用法

    用法 reduce:对rdd内部 元素 进行迭代操作 reduce方法 分区内和分区间调用相同的用户给定的函数; 先在每个分区内执行完用户给定的函数后,将每个分区的结果通过collect()方法统计到 ...

  3. when-otherwise for pyspark用法

    pyspark when otherwise用法 描述:根据某一列的值,修改另一列的值 1.第一种思路,生成临时表,统计数量,用withcolumn 修改数据 2.第二种思路,调用pandas API ...

  4. pandas 聚合函数agg

    今天看到pandas的聚合函数agg,比较陌生,平时的工作中处理数据的时候使用的也比较少,为了加深印象,总结一下使用的方法,其实还是挺好用的. DataFrame.agg(func,axis = 0, ...

  5. Pandas知识点-详解聚合函数agg

    Pandas知识点-详解聚合函数agg Pandas提供了多个聚合函数,聚合函数可以快速.简洁地将多个函数的执行结果聚合到一起. 本文介绍的聚合函数为DataFrame.aggregate(),别名D ...

  6. python agg函数_pandas agg函数使用方法

    DataFrame.agg(func,axis = 0,* args,** kwargs) func : 函数,函数名称,函数列表,字典{'行名/列名','函数名'} 使用指定轴上的一个或多个操作进行 ...

  7. Python之数据聚合——aggregate()方法

    文章目录 使用内置统计方法聚合数据 面向列的聚合方法 aggregate()方法 对每一列数据应用同一个函数 对某列数据应用不同的函数 对不同列数据应用不同函数 使用内置统计方法聚合数据 实现数据拆分 ...

  8. ElasticSearch Aggregations使用总结详解

    1.单字段情况下聚合 假设只需要对一个字段聚合,比如b字段,b字段是keyword类型,需要考虑的情况最为简单,当要对b字段聚合时语句很好写,如下即可 {"from": 0,&qu ...

  9. python:dataframe groupby后agg、apply、transfrom用法

    import pandas as pd data = pd.DataFrame({'name':['wencky','stany','barbio','barbio'],'age':[29,29,3, ...

  10. python agg函数_Python Pandas Series.agg()用法及代码示例

    Python是进行数据分析的一种出色语言,主要是因为以数据为中心的python软件包具有奇妙的生态系统. Pandas是其中的一种,使导入和分析数据更加容易. Pandas Series.agg()用 ...

最新文章

  1. asp.net core 创建允许跨域请求的api, cors.
  2. Greenplum——升级的分布式PostgresSQL
  3. mysql安装check requirements出错_超详细的MySQL8.0.17版本安装教程
  4. [TODO]Python拾遗(二)
  5. [HTTP] 重定向的302,301
  6. 中国蔬菜汤市场趋势报告、技术动态创新及市场预测
  7. iOS开发-当APP涉及到用户敏感信息适配Xcode9及(ios11)
  8. 46.Android 自己定义Dialog
  9. MATLAB疲劳检测系统
  10. JQuery EasyUI 结合ztrIee的后台页面开发
  11. keil的错误: Error: Encountered an improper argument 的解决方法
  12. 华为云CentOS7.6云耀服务器Python环境基本配置
  13. 叠加阶梯图和线图及合并线图和针状图
  14. 如何用简单的方式将数组转成json
  15. 大学linux操作系统,大学信息技术(Linux操作系统及其应用)
  16. 回头再说 008 瞬
  17. 分布式任务xxl-job调度中心安装说明
  18. php彩色教程,Photoshop简单制作立体彩色炫图
  19. java项目添加功能失败_学生信息的添加 Java web简单项目初试(失败)
  20. JavaScript:原生js写的一个多按钮Popover 弹出框

热门文章

  1. 浙江省杭州工程师职称申报方式
  2. 股市java_Java获取股市交易日
  3. 通俗易懂说字节序,大小端,网络序和主机序
  4. C语言全网最详细的分支和循环语句讲解
  5. android沉浸式+虚拟按键+Fragment+CoordinatorLayout(2)
  6. JTAG和SWD调试器
  7. 一套用了 70 年的计算机架构 —— 冯·诺依曼架构
  8. 魔兽世界-战士的真谛
  9. java 利用网页显示摄像头_web网页调用摄像头拍照
  10. Oracle数据库的空间管理技巧