官网文档可以参考:https://spark.apache.org/docs/latest/api/python/index.html

dataframe读写

生成以逗号分隔的数据

stringCSVRDD = spark.sparkContext.parallelize([(123, "Katie", 19, "brown"),(234, "Michael", 22, "green"),(345, "Simone", 23, "blue")
])

指定模式, StructField(name,dataType,nullable)

其中:

name: 该字段的名字,

dataType:该字段的数据类型,

nullable: 指示该字段的值是否为空

from pyspark.sql.types import StructType, StructField, LongType, StringType  # 导入类型schema = StructType([StructField("id", LongType(), True),StructField("name", StringType(), True),StructField("age", LongType(), True),StructField("eyeColor", StringType(), True)
])

对RDD应用该模式并且创建DataFrame

swimmers = spark.createDataFrame(stringCSVRDD,schema)

利用DataFrame创建一个临时视图

swimmers.registerTempTable(“swimmers”)

查看DataFrame的行数

swimmers.count()
2.2. 从变量创建

使用自动类型推断的方式创建dataframe

data = [(123, “Katie”, 19, “brown”),
(234, “Michael”, 22, “green”),
(345, “Simone”, 23, “blue”)]
df = spark.createDataFrame(data, schema=[‘id’, ‘name’, ‘age’, ‘eyccolor’])
df.show()
df.count()
2.3. 读取json

读取spark下面的示例数据

file = r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\people.json"
df = spark.read.json(file)
df.show()
2.4. 读取csv

先创建csv文件

import pandas as pd
import numpy as np
df=pd.DataFrame(np.random.rand(5,5),columns=[‘a’,‘b’,‘c’,‘d’,‘e’]).
applymap(lambda x: int(x*10))
file=r"D:\hadoop_spark\spark-2.1.0-bin-hadoop2.7\examples\src\main\resources\random.csv"
df.to_csv(file,index=False)

再读取csv文件

monthlySales = spark.read.csv(file, header=True, inferSchema=True)
monthlySales.show()
2.5. 读取MySQL

此时需要将mysql-jar驱动放到spark-2.2.0-bin-hadoop2.7\jars下面

单机环境可行,集群环境不行

重新执行

df = spark.read.format(‘jdbc’).options(
url=‘jdbc:mysql://127.0.0.1’,
dbtable=‘mysql.db’,
user=‘root’,
password=‘123456’
).load()
df.show()

也可以传入SQL语句

sql=“(select * from mysql.db where db=‘wp230’) t”
df = spark.read.format(‘jdbc’).options(
url=‘jdbc:mysql://127.0.0.1’,
dbtable=sql,
user=‘root’,
password=‘123456’
).load()
df.show()
2.6. 从pandas.dataframe创建

如果不指定schema则用pandas的列名

df = pd.DataFrame(np.random.random((4,4)))
spark_df = spark.createDataFrame (df,schema=[‘a’,‘b’,‘c’,‘d’])
2.7. 从列式存储的parquet读取

读取example下面的parquet文件

file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\users.parquet"
df=spark.read.parquet(file)
df.show()
2.8. 从hive读取

如果已经配置spark连接hive的参数,可以直接读取hive数据

spark = SparkSession
.builder
.enableHiveSupport() \
.master(“172.31.100.170:7077”)
.appName(“my_first_app_name”)
.getOrCreate()

df=spark.sql(“select * from hive_tb_name”)
df.show()
2.9.从hdfs读取
直接使用read.csv的方法即可。

直接读取,不需要指定ip和port

data= spark.read.csv(‘hdfs:///tmp/_da_exdata_path/data.csv’, header=True)
data.show()

有些情况下是需要指定ip和端口的

data= spark.read.csv(‘hdfs://localhost:9000/tmp/_da_exdata_path/data.csv’, header=True)
data.show()
3. 保存数据
3.1. 写到csv

创建dataframe

import numpy as np
df = pd.DataFrame(np.random.random((4, 4)),columns=[‘a’, ‘b’, ‘c’, ‘d’])
spark_df = spark.createDataFrame(df)

写到csv

file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.csv"
spark_df.write.csv(path=file, header=True, sep=“,”, mode=‘overwrite’)
3.2. 保存到parquet

创建dataframe

import numpy as np
df = pd.DataFrame(np.random.random((4, 4)),columns=[‘a’, ‘b’, ‘c’, ‘d’])
spark_df = spark.createDataFrame(df)

写到parquet

file=r"D:\apps\spark-2.2.0-bin-hadoop2.7\examples\src\main\resources\test.parquet"
spark_df.write.parquet(path=file,mode=‘overwrite’)
3.3. 写到hive

打开动态分区

spark.sql(“set hive.exec.dynamic.partition.mode = nonstrict”)
spark.sql(“set hive.exec.dynamic.partition=true”)

使用普通的hive-sql写入分区表

spark.sql(“”"
insert overwrite table ai.da_aipurchase_dailysale_hive
partition (saledate)
select productid, propertyid, processcenterid, saleplatform, sku, poa, salecount, saledate
from szy_aipurchase_tmp_szy_dailysale distribute by saledate
“”")

或者使用每次重建分区表的方式

jdbcDF.write.mode(“overwrite”).partitionBy(“saledate”).insertInto(“ai.da_aipurchase_dailysale_hive”)
jdbcDF.write.saveAsTable(“ai.da_aipurchase_dailysale_hive”, None, “append”, partitionBy=‘saledate’)

不写分区表,只是简单的导入到hive表

jdbcDF.write.saveAsTable(“ai.da_aipurchase_dailysale_for_ema_predict”, None, “overwrite”, None)
3.4. 写到hdfs

数据写到hdfs,而且以csv格式保存

jdbcDF.write.mode(“overwrite”).options(header=“true”).csv(“/home/ai/da/da_aipurchase_dailysale_for_ema_predict.csv”)
3.5. 写到mysql

会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行

overwrite 清空表再导入

spark_df.write.mode(“overwrite”).format(“jdbc”).options(
url=‘jdbc:mysql://127.0.0.1’,
user=‘root’,
password=‘123456’,
dbtable=“test.test”,
batchsize=“1000”,
).save()

append 追加方式

spark_df.write.mode(“append”).format(“jdbc”).options(
url=‘jdbc:mysql://127.0.0.1’,
user=‘root’,
password=‘123456’,
dbtable=“test.test”,
batchsize=“1000”,
).save()

我们采用本地实验的方式,来学习下语法

face.csv文件内容如下

image_id,device_id,date_str,age,gender,glass,hat,feat
2019-03-09_8_0007f1a502433ee0d80c7f14c3bf7bc0face.jpg,8,2019-03-09,11.0,female,noglass,nohat,11111111111
2019-03-09_8_000e791eb5978a9fad084f8ad012c780face.jpg,8,2019-03-09,49.0,female,Glass.TYPE1,nohat,2222222222
2019-03-09_8_0041cad3b76d6b30103ad5dd1396276cface.jpg,8,2019-03-09,24.0,male,noglass,nohat,333333

python的示例demo


# 并行计算文件
from pyspark import SparkConf
from pyspark import SparkContextconf =SparkConf().setAppName("file_test")   # 本地4核启动
sparkContext = SparkContext.getOrCreate(conf)    # 创建context
# sparkContext.setLogLevel("info")     # 设置打印日志等级rdd=sparkContext.textFile("face.csv")  # 每行一个item
print(rdd.first())   # 读取第一行
rdd=rdd.distinct()   # 先去除重复数据
rdd=rdd.map(lambda x: x.split(','))  # 对每个item进行并行操作  flatMap会把所有item平展开合并成一个list
rdd=rdd.filter(lambda x: x[4]=='male' or x[4]=='female')  # 筛选出满足条件的item。现在每个item是个列表了
print(rdd.count())
# rdd.foreach(lambda x: print(x))   # 并行执行某些函数,返回为空   action函数
gender_group_rdd=rdd.groupBy(lambda x:'female' if x[4]=='female' else 'male') # 按性别分组,[(key,results),(key,results),]
for (key,value) in gender_group_rdd.collect():print(key, type(value))
# print(gender_group_rdd[0])print('==============key-value===================')# key-value
# rdd转换
device_rdd=rdd.map(lambda x:(x[1],1))     # 将每个item转化为(key,value),这样可以进行group。rdd中的key和value都是以元素(key,value)的形式存在的
print((device_rdd.keys().collect()))   # 获取所有的key
print((device_rdd.values().collect()))  # 获取所有的value
print(device_rdd.lookup('8'))   # 根据key,查找value,action行为,返回list# 排序函数
count_rdd=device_rdd.sortByKey(ascending=True)  # 按key排序
count_rdd=device_rdd.sortBy(lambda x: x[1],ascending=True)  # 自定义排序规则
print(count_rdd.collect())# 变换计算函数
count_rdd=device_rdd.mapValues(lambda y:y+1-1)  # 将所有value进行操作
count_rdd=count_rdd.reduceByKey(lambda x,y:x+y)  # 对key相同的value进行求和,并行后只存在不重复的key
print(count_rdd.collectAsMap())  # 以字典的形式返回数据
print(count_rdd.take(30))  # 读取前n行列表
print(count_rdd.takeOrdered(3))
print(count_rdd.takeOrdered(3, key=lambda x:-x[1]))  # 自定义排序规则# 直接计算函数
device_count = device_rdd.countByKey()  # 按key进行组内求个数,直接是action函数
print(device_count.items())# 没有很方便的groupby后的egg函数。必须要用dataframe
### int rdd
print('====================int rdd=================')
int_rdd = count_rdd.map(lambda x: x[1])  # 取出内个摄像头的人脸数目
print(int_rdd.stats())
print(int_rdd.min(),int_rdd.max(),int_rdd.stdev(),int_rdd.count(),int_rdd.sum(),int_rdd.mean())
count_dif = int_rdd.countByValue()
print(count_dif.items())# print(rdd.collect())  # 打印全部数据

dataframe语法

DataFrame 的函数

Action 操作

collect() ,返回值是一个数组,返回dataframe集合所有的行
collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行
count() 返回一个number类型的,返回dataframe集合的行数
describe(cols: String*) 返回一个通过数学计算的类表值(count, mean, stddev, min, and max),这个可以传多个参数,中间用逗号分隔,如果有字段为空,那么不参与运算,只这对数值类型的字段。例如df.describe("age", "height").show()
first() 返回第一行 ,类型是row类型
head() 返回第一行 ,类型是row类型
head(n:Int)返回n行  ,类型是row 类型
show()返回dataframe集合的值 默认是20行,返回类型是unit
show(n:Int)返回n行,,返回值类型是unit
table(n:Int) 返回n行  ,类型是row 类型

dataframe的基本操作

cache()同步数据的内存
columns 返回一个string类型的数组,返回值是所有列的名字
dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型
explan()打印执行计划  物理的
explain(n:Boolean) 输入值为 false 或者true ,返回值是unit  默认是false ,如果输入true 将会打印 逻辑的和物理的
isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false
persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
printSchema() 打印出字段名称和类型 按照树状结构来打印
registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了
schema 返回structType 类型,将字段名称和类型按照结构体类型返回
toDF()返回一个新的dataframe类型的
toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
unpersist() 返回dataframe.this.type 类型,去除模式中的数据
unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD

集成查询:

1、 agg(expers:column*) 返回dataframe类型 ,同数学计算求值df.agg(max("age"), avg("salary"))df.groupBy().agg(max("age"), avg("salary"))
2、 agg(exprs: Map[String, String])  返回dataframe类型 ,同数学计算求值 map类型的df.agg(Map("age" -> "max", "salary" -> "avg"))df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
3、 agg(aggExpr: (String, String), aggExprs: (String, String)*)  返回dataframe类型 ,同数学计算求值df.agg(Map("age" -> "max", "salary" -> "avg"))df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
4、 apply(colName: String) 返回column类型,捕获输入进去列的对象
5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名
6、 col(colName: String)  返回column类型,捕获输入进去列的对象
7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总
8、 distinct 去重 返回一个dataframe类型
9、 drop(col: Column) 删除某列 返回dataframe类型
10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的
12、 explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]) 返回值是dataframe类型,这个 将一个字段进行更多行的拆分
df.explode("name","names") {name :String=> name.split(" ")}.show();
将name字段根据空格来拆分,拆分的字段放在names里面
13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型 df.filter("age>10").show();  df.filter(df("age")>10).show();   df.where(df("age")>10).show(); 都可以
14、 groupBy(col1: String, cols: String*) 根据某写字段来汇总返回groupedate类型   df.groupBy("age").agg(Map("age" ->"count")).show();df.groupBy("age").avg().show();都可以
15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
16、 join(right: DataFrame, joinExprs: Column, joinType: String)
一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
17、 limit(n: Int) 返回dataframe类型  去n 条数据出来
18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤 df.na.drop().show(); 删除为空的行
19、 orderBy(sortExprs: Column*) 做alise排序
20、 select(cols:string*) dataframe 做字段的刷选 df.select($"colA", $"colB" + 1)
21、 selectExpr(exprs: String*) 做字段的刷选 df.selectExpr("name","name as names","upper(name)","age+1").show();
22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默认是asc
23、 unionAll(other:Dataframe) 合并 df.unionAll(ds).show();
24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
25、 withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();

数据类型转换

BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

两种方式

from pyspark.sql.types import DoubleType,IntegerType
changedTypedf = dataframe.withColumn("label", dataframe["show"].cast(DoubleType()))或者
changedTypedf = dataframe.withColumn("label", dataframe["show"].cast("double"))如果改变原有列的类型
toDoublefunc = UserDefinedFunction(lambda x: float(x),DoubleType())

更复杂的类型变换

types.ArrayType(types.IntegerType()).simpleString()
'array<int>'
types.MapType(types.StringType(), types.IntegerType()).simpleString()
'map<string,int>'

dataframe python示例


# 并行计算文件
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import DataFrame,SQLContext,HiveContext,SparkSession
from pyspark.sql.functions import isnull,isnan,udf
from pyspark.sql import functions
from pyspark.sql import types
from pyspark.sql.types import DoubleType,IntegerType,StringType,DateType
import datetime,time# 创建
print('=============读取保存==================')
conf =SparkConf().setAppName("file_test")   # 本地4核启动
sparkContext = SparkContext.getOrCreate(conf)    # 创建context
# sparkContext.setLogLevel("info")     # 设置打印日志等级
sqlContext = SQLContext(sparkContext)
df = sqlContext.read.csv("face.csv",header="true")  # header设置为true将设置文件中的第一行作为表头
print(df.first())
print(df.head(2))
df.write.csv('age_gender.csv',header=True,mode='overwrite')# 应用
print('===========遍历================')
def apply1(x):pass# print(x['image_id'])
df.foreach(apply1)# 变换
print('===========变换================')
df = df.withColumn("age", df["age"].cast("Int"))  # 修改列的类型
print(df.show(3))
new_df = df.withColumn('userid',df['age'].cast('int')%10)    # 新增一列,cast 可用于列类型变换df.select(col.cast('int'))
print(new_df.show(3))# new_df = df.withColumn('image_id', '')   # 修改列的值
# print(new_df.show(3))
df = df.withColumnRenamed( "date_str","date")   # 修改列名,方便join
print(df.show(3))# dataframe中的apply函数,可以遍历每一行进行变换
# 定义一个 udf 函数
def today(day):if day==None:return datetime.datetime.now()else:return datetime.datetime.strptime(day,"%y-%m-%d")# 返回类型为字符串类型
udfday = udf(today, DateType())
df.withColumn('date', udfday(df.date))  # 对每行的指定列进行变换
print(df.show(3))# 填充缺失值
df=df.fillna('')
print(df.show(3))# 替换值
df = df.replace('male','male1')  # 直接替换值# 删除列
new_df = new_df.drop('userid')   # 删除列
# 删除行
df = df.na.drop()  # 扔掉任何列包含na的行
df = df.dropna(subset=['image_id', 'feat'])  # 扔掉image_id或feat中任一一列包含na的行# 筛选过滤
print('============过滤================')
df = df.filter(~isnull("device_id"))  # 把a列里面数据为null的筛选出来(代表python的None类型)  dataframe里面取逻辑运算为&  | ~ 因为dataframe重写了符号运算
df = df.filter(~isnan("device_id"))  # 把a列里面数据为nan的筛选出来(Not a Number,非数字数据)# df=df.where("gender=='female'" )   # 过滤where和filter都支持直接python表达式的方式   表达式内可以使用and or not
# df=df.where(df['gender']=='female')  # 过滤where和filter都支持boolean矩阵, &  | ~
print(df.show(3))
device_dif=df.select('device_id').distinct()   # 去除重复
device_dif=df.select('device_id','age').dropDuplicates(['age'])   # 按指定字段去重
print('摄像头id列表',)
device_dif.show()     # show 是action动作
print('摄像头数目',device_dif.count())  # count 是action动作# 统计
print('============统计=================')
df.stat.freqItems(['device_id','gender'], 0.3).show()  # 显示列的取值出现频率超过一定百分比的列取值。有多列是就分别计算每列的高频率列取值(注意不是列组合)
df.groupby('gender').count().show()   # 分组统计数量
df.crosstab('gender', 'age').show()   # 交叉统计,统计不能性别不同年龄的人脸数目
df.groupBy('gender').agg({'device_id':'count',"age":"avg"}).show()  # 分组计算,对自己想要的列进行想要的计算
df.groupBy('gender').agg(functions.avg('age'), functions.min('age'), functions.max('age')).show()  # 每个完成多种统计计算# 数据集和pandas的转化
print('============数据集类型转化==============')
pandas_df = df.toPandas()
spark_df = sqlContext.createDataFrame(pandas_df)# 与Spark RDD的相互转换:
rdd_df = df.rdd
df = rdd_df.toDF()# SQL操作
print('===============sql操作===================')df.createOrReplaceTempView("face")   # DataFrame注册成SQL的表
conf = SparkConf()
ss = SparkSession.builder.appName("APP_NAME").config(conf=conf).getOrCreate()
df = ss.sql("SELECT age, gender FROM face WHERE age >= 13 AND age <= 19")

参考:https://blog.csdn.net/sinat_26917383/article/details/80500349

pyspark dataframe 将json字符串列转为多列

对于json对象中包含不同的key值,需要先获取所有key, 将json字符串转为struct对象, 然后再转为多列


from pyspark import SparkConf,SparkContext,SparkContext,SQLContext
from pyspark.sql import SparkSession,SQLContext,functions,types,DataFrame,SQLContext,HiveContext,SparkSessionfrom pyspark.sql.functions import isnull,isnan,udf,from_json, col
from pyspark.sql.types import DoubleType,IntegerType,StringType,DateType,StructType,StructField
import datetime,time
import json
import os# 创建spark本地运行,日志目录
try:os.mkdir('/tmp/spark-events')
except Exception as e:print(e)# 创建
print('=============读取保存==================')
conf =SparkConf().setAppName("test")   # 本地4核启动
sparkContext = SparkContext.getOrCreate(conf)    # 创建context
sparkContext.setLogLevel("warn")     # 设置打印日志等级
sqlContext = SQLContext(sparkContext)dslist=[{'r':1,'data':'{"key1":"value1","key2":"value2"}'},{'r':2,'data':'{"key3":"value11","key1":"value3"}'}]
df = sqlContext.createDataFrame(dslist)
df.show(truncate=False)
df.printSchema()
print('=====================')# 获取所有keys,方法1
rdd_data = df.rdd.map(lambda row: list(json.loads(row.data).keys()))
all_keys = rdd_data.collect()
row_keys = []
for row_key in all_keys:row_keys = row_keys+row_key
all_keys = list(set(row_keys))   # key去重
print(all_keys)field = [StructField(key, StringType()) for key in all_keys]json_schema = StructType(field)
print('=====================')# 获取所有keys,方法2,没测试  不成功
# print('=====================')
# json_schema = sqlContext.read.json(df.rdd.map(lambda row: row.data)).schema
# new_df = sqlContext.read.json(df.rdd.map(lambda r: r.data))
# print(json_schema)new_df = df.withColumn('json', from_json(col('data'), json_schema))
new_df.printSchema()
new_df.show(truncate=False)# def flatten_struct(schema, prefix=""):
#     result = []
#     for elem in schema:
#         if isinstance(elem.dataType, StructType):
#             result += flatten_struct(elem.dataType, prefix + elem.name + ".")
#         else:
#             result.append(col(prefix + elem.name).alias(prefix + elem.name))
#     return result
# new_df = new_df.select(new_df.schema)new_col=[col('r')]+[col('json.'+key).alias(key) for key in all_keys]
new_df = new_df.select(new_col)
new_df.show(truncate=False)# python json转为df
# dslist=[{"key1":"value1","key2":"value2","r":1},{"key1":"value11","key3":"value3","r":2}]
# df1 = sqlContext.createDataFrame(dslist)
# df1.show()

pyspark dataframe列的合并与拆分

使用Spark SQL在对数据进行处理的过程中,可能会遇到对一列数据拆分为多列,或者把多列数据合并为一列。这里记录一下目前想到的对DataFrame列数据进行合并和拆分的几种方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder \.master("local") \.appName("dataframe_split") \.config("spark.some.config.option", "some-value") \.getOrCreate()sc = spark.sparkContext
df = spark.read.csv('hdfs://master:9000/dataset/dataframe_split.csv', inferSchema=True, header=True)
df.show(3)

原始数据如下所示

dataframe列数据的分割

from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))
df_split.show()

dataframe列数据的拆分

zipWithIndex:给每个元素生成一个索引

排序首先基于分区索引,然后是每个分区内的项目顺序.因此,第一个分区中的第一个item索引为0,最后一个分区中的最后一个item的索引最大.当RDD包含多个分区时此方法需要触发spark作业.

first_row = df.first()
numAttrs = len(first_row['score'].split(" "))
print("新增列的个数", numAttrs)
attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()
print("列名:", attrs)
for name, index in attrs:df_split = df_split.withColumn(name, df_split['s'].getItem(index))
df_split.show()

dataframe将一行分成多行

df_explode = df.withColumn("e", explode(split(df['score'], " ")))
df_explode.show()

dataframe列数据的合并

列的合并有两个函数:一个不添加分隔符concat(),一个添加分隔符concat_ws()

concat

df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], \df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_concat.show()

caoncat_ws

df_ws = df_split.withColumn("score_concat", concat_ws('-', df_split['score_0'], \df_split['score_1'], df_split['score_2'], df_split['score_3']))
df_ws.show()

dataframe多行转多列

pivot: 旋转当前[[dataframe]]列并执行指定的聚合

#DataFrame 数据格式:每个用户对每部电影的评分 userID 用户ID,movieID 电影ID,rating评分
df=spark.sparkContext.parallelize([[15,399,2], \[15,1401,5], \[15,1608,4], \[15,20,4], \[18,100,3], \[18,1401,3], \[18,399,1]])\.toDF(["userID","movieID","rating"])
#pivot 多行转多列
resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)
#结果
resultDF.show()

参考文献:

Spark DataFrame 列的合并与拆分

Spark DataFrame 多行转多列

参考:https://blog.csdn.net/intersting/article/details/84500978

pyspark 自定义聚合函数 UDAF

自定义聚合函数 UDAF 目前有点麻烦,PandasUDFType.GROUPED_AGG 在2.3.2的版本中不知怎么回事,不能使用!

这样的话只能曲线救国了!

PySpark有一组很好的聚合函数(例如,count,countDistinct,min,max,avg,sum),但这些并不适用于所有情况(特别是如果你试图避免代价高昂的Shuffle操作)。

PySpark目前有pandas_udfs,它可以创建自定义聚合器,但是你一次只能“应用”一个pandas_udf。如果你想使用多个,你必须预先形成多个groupBys …并且避免那些改组。

在这篇文章中,我描述了一个小黑客,它使您能够创建简单的python UDF,它们对聚合数据起作用(此功能只应存在于Scala中!)。

from pyspark.sql import functions as F
from pyspark.sql import types as Ta = sc.parallelize([[1, 'a'],[1, 'b'],[1, 'b'],[2, 'c']]).toDF(['id', 'value'])
a.show()

我使用collect_list将给定组中的所有数据放入一行。我打印下面这个操作的输出。

a.groupBy('id').agg(F.collect_list('value').alias('value_list')).show()

然后我创建一个UDF,它将计算这些列表中字母’a’的所有出现(这可以很容易地在没有UDF的情况下完成,但是你明白了)。此UDF包含collect_list,因此它作用于collect_list的输出。

def find_a(x):"""Count 'a's in list."""output_count = 0for i in x:if i == 'a':output_count += 1return output_countfind_a_udf = F.udf(find_a, T.IntegerType())a.groupBy('id').agg(find_a_udf(F.collect_list('value')).alias('a_count')).show()

我们去!作用于聚合数据的UDF!接下来,我展示了这种方法的强大功能,结合何时让我们控制哪些数据进入F.collect_list。

首先,让我们创建一个带有额外列的数据框。

from pyspark.sql import functions as F
from pyspark.sql import types as Ta = sc.parallelize([[1, 1, 'a'],[1, 2, 'a'],[1, 1, 'b'],[1, 2, 'b'],[2, 1, 'c']]).toDF(['id', 'value1', 'value2'])
a.show()

请注意,我如何在collect_list中包含一个when。请注意,UDF仍然包含collect_list。

a.groupBy('id').agg(find_a_udf( F.collect_list(F.when(F.col('value1') == 1, F.col('value2')))).alias('a_count')).show()

还有一种做法就是用pandas_udf, series 添加一列分组变量然后去重。

还有就是使用输入输出都是dataframe 的 pandas_udf

参考:https://www.cnblogs.com/wdmx/p/10156500.html

pyspark.sql.functions详解

pyspark.sql.functions包含了很多内置函数。

1.pyspark.sql.functions.abs(col)
计算绝对值。

2.pyspark.sql.functions.acos(col)
计算给定值的反余弦值; 返回的角度在0到π的范围内。

3.pyspark.sql.functions.add_months(start, months)
返回start后months个月的日期

4.pyspark.sql.functions.array_contains(col, value)
集合函数:如果数组包含给定值,则返回True。 收集元素和值必须是相同的类型。

5.pyspark.sql.functions.ascii(col)
计算字符串列的第一个字符的数值。

6.pyspark.sql.functions.avg(col)
聚合函数:返回组中的值的平均值。

7.pyspark.sql.functions.cbrt(col)
计算给定值的立方根。

8.pyspark.sql.functions.ceil(col)
计算给定值的上限。

9.pyspark.sql.functions.coalesce(*cols)
返回不为空的第一列。

10.pyspark.sql.functions.col(col)
根据给定的列名返回一个列。

11.pyspark.sql.functions.collect_list(col)
聚合函数:返回重复对象的列表。

12.pyspark.sql.functions.collect_set(col)
聚合函数:返回一组消除重复元素的对象。

13.pyspark.sql.functions.concat(*cols)
将多个输入字符串列连接成一个字符串列。

14.pyspark.sql.functions.concat_ws(sep, *cols)
使用给定的分隔符将多个输入字符串列连接到一个字符串列中。

15.pyspark.sql.functions.corr(col1, col2)
返回col1和col2的皮尔森相关系数的新列。

16.pyspark.sql.functions.cos(col)
计算给定值的余弦。

17.pyspark.sql.functions.cosh(col)
计算给定值的双曲余弦。

18.pyspark.sql.functions.count(col)
聚合函数:返回组中的项数量。

19.pyspark.sql.functions.countDistinct(col, *cols)
返回一列或多列的去重计数的新列。

20.pyspark.sql.functions.current_date()
以日期列的形式返回当前日期。

21.pyspark.sql.functions.current_timestamp()
将当前时间戳作为时间戳列返回。

22.pyspark.sql.functions.date_add(start, days)
返回start后days天的日期

23.pyspark.sql.functions.date_format(date, format)
将日期/时间戳/字符串转换为由第二个参数给定日期格式指定格式的字符串值。
一个模式可能是例如dd.MM.yyyy,可能会返回一个字符串,如“18 .03.1993”。 可以使用Java类java.text.SimpleDateFormat的所有模式字母。
注意:尽可能使用像年份这样的专业功能。 这些受益于专门的实施。

24.pyspark.sql.functions.date_sub(start, days)
返回start前days天的日期

25.pyspark.sql.functions.datediff(end, start)
返回从start到end的天数。

26.pyspark.sql.functions.dayofmonth(col)
将给定日期的月份的天解压为整数。

27.pyspark.sql.functions.dayofyear(col)
将给定日期的年份中的某一天提取为整数。

28.pyspark.sql.functions.desc(col)
基于给定列名称的降序返回一个排序表达式。

29.pyspark.sql.functions.exp(col)
计算给定值的指数。

30.pyspark.sql.functions.expm1(col)
计算给定值的指数减1。

31.pyspark.sql.functions.factorial(col)
计算给定值的阶乘。

32.pyspark.sql.functions.floor(col)
计算给定值的最小。

33.pyspark.sql.functions.format_number(col, d)
将数字X格式化为像’#, - #, - #.-'这样的格式,四舍五入到小数点后的位置,并以字符串形式返回结果。
参数:● col – 要格式化的数值的列名称
● d – N小数位

34.pyspark.sql.functions.format_string(format, *cols)
以printf样式格式化参数,并将结果作为字符串列返回。
参数:● format – 要格式化的格式
● cols - 要格式化的列

35.pyspark.sql.functions.hex(col)
计算给定列的十六进制值,可以是StringType,BinaryType,IntegerType或LongType

36.pyspark.sql.functions.hour(col)
将给定日期的小时数提取为整数。

37.pyspark.sql.functions.hypot(col1, col2)
计算sqrt(a ^ 2 ^ + b ^ 2 ^),无中间上溢或下溢。

38.pyspark.sql.functions.initcap(col)
在句子中将每个单词的第一个字母翻译成大写。

39.pyspark.sql.functions.isnan(col)
如果列是NaN,则返回true的表达式。

40.pyspark.sql.functions.kurtosis(col)
聚合函数:返回组中的值的峰度。

41.pyspark.sql.functions.last(col)
聚合函数:返回组中的最后一个值。

42.pyspark.sql.functions.last_day(date)
返回给定日期所属月份的最后一天。

43.pyspark.sql.functions.lit(col)
创建一个文字值的列

44.pyspark.sql.functions.log(arg1, arg2=None)
返回第二个参数的第一个基于参数的对数。
如果只有一个参数,那么这个参数就是自然对数。

45.pyspark.sql.functions.log1p(col)
计算给定值的自然对数加1。

46.pyspark.sql.functions.log2(col)
返回参数的基数为2的对数。

47.pyspark.sql.functions.lower(col)
将字符串列转换为小写。

48.pyspark.sql.functions.ltrim(col)
从左端修剪指定字符串值的空格。

49.pyspark.sql.functions.minute(col)
提取给定日期的分钟数为整数

50.pyspark.sql.functions.monotonically_increasing_id()
生成单调递增的64位整数的列。

生成的ID保证是单调递增和唯一的,但不是连续的。 当前的实现将分区ID放在高31位,并将每个分区内的记录号放在低33位。 假设
数据帧的分区少于10亿个,每个分区少于80亿条记录

例如,考虑一个DataFrame有两个分区,每个分区有三个记录。 该表达式将返回以下ID:0,1,2,8589934592(1L << 33),
8589934593,8589934594

51.pyspark.sql.functions.month(col)
将给定日期的月份提取为整数

52.pyspark.sql.functions.months_between(date1, date2)
返回date1和date2之间的月数。

53.pyspark.sql.functions.rand(seed=None)
用i.i.d生成一个随机列 来自样本[0.0,1.0]。

54.pyspark.sql.functions.randn(seed=None)
用i.i.d生成一列 来自标准正态分布的样本。

55.pyspark.sql.functions.reverse(col)
反转字符串列并将其作为新的字符串列返回

56.pyspark.sql.functions.rtrim(col)
从右端修剪指定字符串值的空格

57.pyspark.sql.functions.skewness(col)
聚合函数:返回组中值的偏度

58.pyspark.sql.functions.sort_array(col, asc=True)
集合函数:按升序对给定列的输入数组进行排序。
参数:col – 列或表达式名称

59.pyspark.sql.functions.split(str, pattern)
将模式分割(模式是正则表达式)。
注:pattern是一个字符串表示正则表达式。

60.pyspark.sql.functions.sqrt(col)
计算指定浮点值的平方根

61.pyspark.sql.functions.stddev(col)
聚合函数:返回组中表达式的无偏样本标准差

62.pyspark.sql.functions.sumDistinct(col)
聚合函数:返回表达式中不同值的总和

63.pyspark.sql.functions.to_date(col)
将StringType或TimestampType的列转换为DateType

64.pyspark.sql.functions.trim(col)
修剪指定字符串列的两端空格。

65.pyspark.sql.functions.trunc(date, format)
返回截断到格式指定单位的日期

参数: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’

66.pyspark.sql.functions.var_samp(col)
聚合函数:返回组中值的无偏差

67.pyspark.sql.functions.variance(col)
聚合函数:返回组中值的总体方差

参考:原文:https://blog.csdn.net/htbeker/article/details/86233819

pyspark操作 rdd dataframe,pyspark.sql.functions详解 行列变换相关推荐

  1. pyspark.sql.functions详解

    pyspark.sql.functions包含了很多内置函数. 1.pyspark.sql.functions.abs(col) 计算绝对值. 2.pyspark.sql.functions.acos ...

  2. oracle11g 常用函数(Functions)详解

    Oracle11g常用函数(Functions)详解 目录 ABS. 3 ACOS. 3 ADD_MONTHS. 4 ASCII 4 ASCIISTR. 5 ASIN.. 5 ATAN.. 5 ATA ...

  3. MySQL之SQL优化详解(二)

    目录 MySQL之SQL优化详解(二) 1. SQL的执行顺序 1.1 手写顺序 1.2 机读顺序 2. 七种join 3. 索引 3.1 索引初探 3.2 索引分类 3.3 建与不建 4. 性能分析 ...

  4. mysql切换用户sql语句,MySQL用户管理及SQL语句详解

    [(none)]>select user,host frommysql.user; #查询用户和主机+---------------+-----------+ | user | host | + ...

  5. [顶]ORACLE PL/SQL编程详解之二:PL/SQL块结构和组成元素(为山九仞,岂一日之功)...

    [顶]ORACLE PL/SQL编程详解之二:PL/SQL块结构和组成元素(为山九仞,岂一日之功) 原文:[顶]ORACLE PL/SQL编程详解之二:PL/SQL块结构和组成元素(为山九仞,岂一日之 ...

  6. [强烈推荐]ORACLE PL/SQL编程详解之七:程序包的创建与应用(聪明在于学习,天才在于积累!)...

    [强烈推荐]ORACLE PL/SQL编程详解之七: 程序包的创建与应用(聪明在于学习,天才在于积累!) --通过知识共享树立个人品牌.   继上七篇:            [推荐]ORACLE P ...

  7. mysql索引linke和等于_MySQL之SQL优化详解(三)

    摘要: 致索引失效而转向全表扫描存储引擎不能使用索引中范围条件右边的列mysql在使用不等于(!=或者<>)的时候无法使用索引会导致全表扫描isnull,isnotnull也无法使用索引l ...

  8. Oracle PL/SQL编程详解

    Oracle PL/SQL编程详解 - 古立 - 博客园 <我的网络摘抄本> 网摘/转载/备忘/随记 博客园 首页 新随笔 联系 管理 订阅 随笔- 84  文章- 0  评论- 0  & ...

  9. SQL注入详解及预防

    SQL注入详解及预防 1.1.1 摘要 日前,国内最大的程序员社区CSDN网站的用户数据库被黑客公开发布,600万用户的登录名及密码被公开泄露,随后又有多家网站的用户密码被流传于网络,连日来引发众多网 ...

最新文章

  1. 生产中NFS案例记录---写入权限解决过程
  2. 有6个候选人,100个选民,每个选民选择一个侯选人投票;从键盘输入每个选民选择的候选人名,统计并输出6个候选人的票数。java,c++实现
  3. 数据库期末复习之并发控制
  4. java超时导致oracle锁表_java – 正确的设计,以避免Oracle死锁?
  5. Qt for Python Mac下使用 fbs 打包软件
  6. JavaScript 对引擎、运行时、调用堆栈的概述理解
  7. html点击按钮获取列表数据传递到页面,随笔:ajax传递数组,layui父子页面传值,下来菜单赋值,父页面数据获取,表格数据替换为字符串...
  8. java中interrupt_Java中interrupt的使用
  9. iOS 对付内存泄漏,来说说我的调试方法
  10. hive insert into语句 和 insert overwrite语句
  11. 最近 搞定这5篇 java相关
  12. TuGraph安装与简单使用
  13. java大文件md5快速计算_java 计算文件MD5值 大文件
  14. 教你怎么搞定P2P终结者的网速限制
  15. linux权限英文,Linux常见英文报错中文翻译(菜鸟必知)
  16. HSV对应不同颜色的灰度空间
  17. 落日海鸥(Seagulls)
  18. 用计算机画画的意义,浅谈电脑绘画在美术教学中的意义.doc
  19. Windows上查看CUDA是否安装成功
  20. 可怜的博主跟小豆人杠起来啦!Python制作的吃豆人小游戏,快来围观!!

热门文章

  1. 一名90后的程序员之路(三) 工程师与码农
  2. 成年人的世界不敢轻易崩溃
  3. CodeForces - 91 B.Queue (单调栈)
  4. JSLint options JSLint 设置
  5. Typora 上传图片到 gitee
  6. CVPR2020最佳新框架|大规模人脸表情识别(附源代码)
  7. tomcat配置session共享
  8. 【Python_Scrapy学习笔记(十三)】基于Scrapy框架的图片管道实现图片抓取
  9. Docker全方位攻略与自动化运维
  10. 基于“总价、首付、贷款年限、利息、贷款额度”信息,计算每月最低还款额度。需要能按“等额本金、等额本息”两种形式计算还款