PySpark for Big Data Processing in Python — Practice Notes (Part 4)
- ETL in Practice
  - Data source
  - Loading the data
  - Exploring the data
  - Select, filter, and aggregate
- Machine Learning in Practice
  - Data source
  - Loading the data
  - Descriptive statistics
  - Cleaning and transformation
  - Pipeline
  - Logistic regression prediction
  - Decision tree prediction
ETL in Practice
Data Source
- https://grouplens.org/datasets/movielens/
- Download the small dataset. ratings.csv holds the movie rating records: userId (ID of the user who rated the movie), movieId (movie ID), rating (score out of 5), timestamp (Unix timestamp).
Loading the Data
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import to_utc_timestamp

spark = SparkSession.builder.master("local[*]").appName("PySpark ETL").getOrCreate()
sc = spark.sparkContext
#############################################
# load the CSV from HDFS; the file includes a header row
df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True)
# print the default field types
df.printSchema()
# show the first 20 rows
df.show()
# print the total row count
print(df.count())

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1| 4.0|964982703|
| 1| 3| 4.0|964981247|
| 1| 6| 4.0|964982224|
| 1| 47| 5.0|964983815|
| 1| 50| 5.0|964982931|
| 1| 70| 3.0|964982400|
| 1| 101| 5.0|964980868|
| 1| 110| 4.0|964982176|
| 1| 151| 5.0|964984041|
| 1| 157| 5.0|964984100|
| 1| 163| 5.0|964983650|
| 1| 216| 5.0|964981208|
| 1| 223| 3.0|964980985|
| 1| 231| 5.0|964981179|
| 1| 235| 4.0|964980908|
| 1| 260| 5.0|964981680|
| 1| 296| 3.0|964982967|
| 1| 316| 3.0|964982310|
| 1| 333| 5.0|964981179|
| 1| 349| 4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

100836
Notice that after reading the file, printSchema reports every column as string, which does not match the actual data. Passing inferSchema=True makes Spark sample the data and infer proper types:
df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True,inferSchema=True)
df.printSchema()
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
Exploring the Data
# Matplotlib line-chart example
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

# enable Chinese fonts, otherwise CJK labels render incorrectly
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei']
# prepare the data
t = np.arange(0.0, 2.0, 0.01)
s = 1 + np.sin(2 * np.pi * t)
fig, ax = plt.subplots()
# set the window title (matplotlib >= 3.4 moved this to fig.canvas.manager.set_window_title)
fig.canvas.set_window_title('折线图示例')
# draw a line chart
ax.plot(t, s)
# axis settings
ax.set(xlabel='时间 (s)', ylabel='电压 (mV)', title='折线图')
ax.grid()
plt.show()
Exploring the ratings.csv data:
df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True,inferSchema=True)
df.show(10)
df.printSchema()
# basic statistics on the rating column
df.select("rating").describe().show()
# print the row and column counts
print(df.count(), len(df.columns))

# drop rows with null or NaN in any column
dfNotNull = df.na.drop()
print(dfNotNull.count(), len(dfNotNull.columns))

# create a temporary view named movie
df.createOrReplaceTempView("movie")
# Spark SQL query
df2 = spark.sql("select count(*) as counter, rating from movie \
group by rating order by rating asc")
df2.show()
##############################################
from matplotlib import pyplot as plt
pdf = df2.toPandas()
x = pdf["rating"]
y = pdf["counter"]
plt.xlabel("rating")
plt.ylabel("counter")
plt.title("movie rating")
plt.bar(x,y)
# label each bar with its value
for x1, y1 in zip(x, y):
    plt.text(x1, y1 + 0.05, '%.0f' % y1, ha='center', va='bottom', fontsize=11)
plt.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1| 4.0|964982703|
| 1| 3| 4.0|964981247|
| 1| 6| 4.0|964982224|
| 1| 47| 5.0|964983815|
| 1| 50| 5.0|964982931|
| 1| 70| 3.0|964982400|
| 1| 101| 5.0|964980868|
| 1| 110| 4.0|964982176|
| 1| 151| 5.0|964984041|
| 1| 157| 5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+-------+------------------+
|summary| rating|
+-------+------------------+
| count| 100836|
| mean| 3.501556983616962|
| stddev|1.0425292390606342|
| min| 0.5|
| max| 5.0|
+-------+------------------+

100836 4
100836 4
+-------+------+
|counter|rating|
+-------+------+
| 1370| 0.5|
| 2811| 1.0|
| 1791| 1.5|
| 7551| 2.0|
| 5550| 2.5|
| 20047| 3.0|
| 13136| 3.5|
| 26818| 4.0|
| 8551| 4.5|
| 13211| 5.0|
+-------+------+
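The describe() output above reports a mean rating of about 3.5016; the per-rating counts returned by the SQL query let us cross-check that figure in pure Python:

```python
# counts copied from the Spark SQL result above
counts = {0.5: 1370, 1.0: 2811, 1.5: 1791, 2.0: 7551, 2.5: 5550,
          3.0: 20047, 3.5: 13136, 4.0: 26818, 4.5: 8551, 5.0: 13211}
total = sum(counts.values())
mean = sum(r * c for r, c in counts.items()) / total
print(total, round(mean, 4))  # 100836 3.5016
```

The weighted mean matches the describe() statistics, confirming the aggregation covered every row.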
Select, Filter, and Aggregate
- Converting timestamps
  - UNIX_TIMESTAMP converts a time value into an integer. Note that some databases infer the format while others require it explicitly: MySQL accepts UNIX_TIMESTAMP('20200223') directly, whereas some big-data platforms need UNIX_TIMESTAMP('20200223', 'yyyyMMdd').
  - FROM_UNIXTIME decodes an integer timestamp back into a time string; the desired format pattern can be supplied.
from pyspark.sql.functions import from_unixtime
df = spark.read.csv('hdfs://localhost:9000/ml-latest-small/ratings.csv',header=True,inferSchema=True)
df = df.withColumn("rating", df.rating.cast("double"))
# add a new date column
df = df.withColumn("date",from_unixtime(df.timestamp.cast("bigint"), 'yyyy-MM-dd'))
df = df.withColumn("date",df.date.cast("date"))
# drop the timestamp column
df = df.drop("timestamp")
df.show(10)

+------+-------+------+----------+
|userId|movieId|rating| date|
+------+-------+------+----------+
| 1| 1| 4.0|2000-07-31|
| 1| 3| 4.0|2000-07-31|
| 1| 6| 4.0|2000-07-31|
| 1| 47| 5.0|2000-07-31|
| 1| 50| 5.0|2000-07-31|
| 1| 70| 3.0|2000-07-31|
| 1| 101| 5.0|2000-07-31|
| 1| 110| 4.0|2000-07-31|
| 1| 151| 5.0|2000-07-31|
| 1| 157| 5.0|2000-07-31|
+------+-------+------+----------+
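The date column above is produced by from_unixtime, which formats epoch seconds in the Spark session's time zone; the 2000-07-31 dates imply a UTC+8 session zone, which the sketch below assumes. A pure-Python equivalent of the conversion:

```python
from datetime import datetime, timezone, timedelta

# from_unixtime(ts, 'yyyy-MM-dd') renders epoch seconds in the session time
# zone; UTC+8 is an assumption made here to reproduce the output above
session_tz = timezone(timedelta(hours=8))
ts = 964982703  # first timestamp in ratings.csv
print(datetime.fromtimestamp(ts, tz=session_tz).strftime('%Y-%m-%d'))  # 2000-07-31
```

In UTC the same instant is 2000-07-30 18:45:03, so the rendered date depends entirely on the session time zone.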
- Inner join with the movies table
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf

df2 = spark.read.csv('hdfs://localhost:9000/ml-latest-small/movies.csv',header=True)
df3 = df.join(df2, df.movieId == df2.movieId,"inner").select("userId",df.movieId,"title","date","rating")
# define an ordinary Python function
def isLike(v):
    if v > 4:
        return True
    else:
        return False
# wrap it as a UDF
udf_isLike = udf(isLike, BooleanType())
df3 = df3.withColumn("isLike", udf_isLike(df3["rating"]))
df3.show()

+------+-------+--------------------+----------+------+------+
|userId|movieId| title| date|rating|isLike|
+------+-------+--------------------+----------+------+------+
| 1| 1| Toy Story (1995)|2000-07-31| 4.0| false|
| 1| 3|Grumpier Old Men ...|2000-07-31| 4.0| false|
| 1| 6| Heat (1995)|2000-07-31| 4.0| false|
| 1| 47|Seven (a.k.a. Se7...|2000-07-31| 5.0| true|
| 1| 50|Usual Suspects, T...|2000-07-31| 5.0| true|
| 1| 70|From Dusk Till Da...|2000-07-31| 3.0| false|
| 1| 101|Bottle Rocket (1996)|2000-07-31| 5.0| true|
| 1| 110| Braveheart (1995)|2000-07-31| 4.0| false|
| 1| 151| Rob Roy (1995)|2000-07-31| 5.0| true|
| 1| 157|Canadian Bacon (1...|2000-07-31| 5.0| true|
+------+-------+--------------------+----------+------+------+
- Define a Pandas UDF for grouped aggregation (requires PySpark 2.3+)
# define a pandas UDF for use with GroupedData
# (PandasUDFType must be imported as well)
from pyspark.sql.functions import pandas_udf, udf, PandasUDFType

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def fmerge(v):
    return ','.join(v)

df5 = spark.read.csv('hdfs://localhost:9000/ml-latest-small/tags.csv',header=True)
df5 = df5.drop("timestamp")
# groupBy aggregation
df7 = df5.groupBy(["userId","movieId"]).agg(fmerge(df5["tag"]))
df7 = df7.withColumnRenamed("fmerge(tag)","tags")
# select
df6 = df3.join(df7, (df3.movieId == df7.movieId) & (df3.userId == df7.userId)) \
    .select(df3.userId, df3.movieId, "title", "date", "tags", "rating", "isLike") \
    .orderBy(["date"], ascending=[0])
# filter
df6 = df6.filter(df6.date > '2015-10-25')
df6.show(20)
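A GROUPED_AGG pandas UDF receives each group's column as a pandas Series, so the aggregation above can be previewed with plain pandas. The rows below are illustrative samples in the shape of tags.csv:

```python
import pandas as pd

# per-(userId, movieId) concatenation, the same reduction fmerge performs
tags = pd.DataFrame({
    "userId":  [2, 2, 7, 7],
    "movieId": [60756, 60756, 483, 483],
    "tag":     ["funny", "Highly quotable", "witty", "dark comedy"],
})
merged = (tags.groupby(["userId", "movieId"])["tag"]
              .agg(",".join)
              .reset_index(name="tags"))
print(merged["tags"].tolist())  # ['funny,Highly quotable', 'witty,dark comedy']
```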
- Saving the data
# save as text
df6.rdd.coalesce(1).saveAsTextFile("movie-out")
# save as CSV
df6.coalesce(1).write.format("csv").option("header","true").save("movie-out-csv")
# Parquet format
df6.write.format('parquet').save("movie-out-parquet")
# JSON format
df6.coalesce(1).write.format("json").save("movie-out-json")
# save to a database
# the matching JDBC driver jar must be placed in the jars directory
db_url = "jdbc:sqlserver://localhost:1433;databaseName=bg_data;user=root;password=root"
db_table = "movie"
# "overwrite" recreates the table; "append" adds to it
df6.write.mode("overwrite").jdbc(db_url, db_table)
Machine Learning in Practice
Data Source
- https://www.kaggle.com/c/titanic
- Use the training set to analyse which passengers were likely to survive.
Loading the Data
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
from pyspark.sql.functions import to_utc_timestamp

spark = SparkSession.builder.master("local[*]").appName("PySpark AI").getOrCreate()
sc = spark.sparkContext

print("Titanic train.csv Info")
df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
df_train.printSchema()
print(df_train.count(),len(df_train.columns))
df_train.show()
print("#############################################")
print("Titanic test.csv Info")
df_test = spark.read.csv('hdfs://localhost:9000/titanic/test.csv',header=True,inferSchema=True).cache()
df_test.printSchema()
print(df_test.count(),len(df_test.columns))
df_test.show()

Titanic train.csv Info
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

891 12
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|
| 6| 0| 3| Moran, Mr. James| male|null| 0| 0| 330877| 8.4583| null| Q|
| 7| 0| 1|McCarthy, Mr. Tim...| male|54.0| 0| 0| 17463|51.8625| E46| S|
| 8| 0| 3|Palsson, Master. ...| male| 2.0| 3| 1| 349909| 21.075| null| S|
| 9| 1| 3|Johnson, Mrs. Osc...|female|27.0| 0| 2| 347742|11.1333| null| S|
| 10| 1| 2|Nasser, Mrs. Nich...|female|14.0| 1| 0| 237736|30.0708| null| C|
| 11| 1| 3|Sandstrom, Miss. ...|female| 4.0| 1| 1| PP 9549| 16.7| G6| S|
| 12| 1| 1|Bonnell, Miss. El...|female|58.0| 0| 0| 113783| 26.55| C103| S|
| 13| 0| 3|Saundercock, Mr. ...| male|20.0| 0| 0| A/5. 2151| 8.05| null| S|
| 14| 0| 3|Andersson, Mr. An...| male|39.0| 1| 5| 347082| 31.275| null| S|
| 15| 0| 3|Vestrom, Miss. Hu...|female|14.0| 0| 0| 350406| 7.8542| null| S|
| 16| 1| 2|Hewlett, Mrs. (Ma...|female|55.0| 0| 0| 248706| 16.0| null| S|
| 17| 0| 3|Rice, Master. Eugene| male| 2.0| 4| 1| 382652| 29.125| null| Q|
| 18| 1| 2|Williams, Mr. Cha...| male|null| 0| 0| 244373| 13.0| null| S|
| 19| 0| 3|Vander Planke, Mr...|female|31.0| 1| 0| 345763| 18.0| null| S|
| 20| 1| 3|Masselmani, Mrs. ...|female|null| 0| 0| 2649| 7.225| null| C|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 20 rows

#############################################
Titanic test.csv Info
root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

418 11
+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 892| 3| Kelly, Mr. James| male|34.5| 0| 0| 330911| 7.8292| null| Q|
| 893| 3|Wilkes, Mrs. Jame...|female|47.0| 1| 0| 363272| 7.0| null| S|
| 894| 2|Myles, Mr. Thomas...| male|62.0| 0| 0| 240276| 9.6875| null| Q|
| 895| 3| Wirz, Mr. Albert| male|27.0| 0| 0| 315154| 8.6625| null| S|
| 896| 3|Hirvonen, Mrs. Al...|female|22.0| 1| 1| 3101298|12.2875| null| S|
| 897| 3|Svensson, Mr. Joh...| male|14.0| 0| 0| 7538| 9.225| null| S|
| 898| 3|Connolly, Miss. Kate|female|30.0| 0| 0| 330972| 7.6292| null| Q|
| 899| 2|Caldwell, Mr. Alb...| male|26.0| 1| 1| 248738| 29.0| null| S|
| 900| 3|Abrahim, Mrs. Jos...|female|18.0| 0| 0| 2657| 7.2292| null| C|
| 901| 3|Davies, Mr. John ...| male|21.0| 2| 0| A/4 48871| 24.15| null| S|
| 902| 3| Ilieff, Mr. Ylio| male|null| 0| 0| 349220| 7.8958| null| S|
| 903| 1|Jones, Mr. Charle...| male|46.0| 0| 0| 694| 26.0| null| S|
| 904| 1|Snyder, Mrs. John...|female|23.0| 1| 0| 21228|82.2667| B45| S|
| 905| 2|Howard, Mr. Benjamin| male|63.0| 1| 0| 24065| 26.0| null| S|
| 906| 1|Chaffee, Mrs. Her...|female|47.0| 1| 0| W.E.P. 5734| 61.175| E31| S|
| 907| 2|del Carlo, Mrs. S...|female|24.0| 1| 0| SC/PARIS 2167|27.7208| null| C|
| 908| 2| Keane, Mr. Daniel| male|35.0| 0| 0| 233734| 12.35| null| Q|
| 909| 3| Assaf, Mr. Gerios| male|21.0| 0| 0| 2692| 7.225| null| C|
| 910| 3|Ilmakangas, Miss....|female|27.0| 1| 0|STON/O2. 3101270| 7.925| null| S|
| 911| 3|"Assaf Khalil, Mr...|female|45.0| 0| 0| 2696| 7.225| null| C|
+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 20 rows
Descriptive Statistics
df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
# compute basic descriptive statistics
df_train.describe("Age","Pclass","SibSp","Parch").show()
df_train.describe("Sex","Cabin","Embarked","Fare","Survived").show()

pdf = df_train.groupBy('sex','Survived') \
    .agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)","count") \
    .orderBy("sex") \
    .toPandas()
print(pdf)

pdf = df_train.groupBy('Pclass','Survived') \
    .agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)","count") \
    .orderBy("Pclass") \
    .toPandas()
print(pdf)

pdf = df_train.groupBy('Parch','Survived') \
    .agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)","count") \
    .orderBy("Parch") \
    .toPandas()
print(pdf)

pdf = df_train.groupBy('SibSp','Survived') \
    .agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)","count") \
    .orderBy("SibSp") \
    .toPandas()
print(pdf)

+-------+------------------+------------------+------------------+-------------------+
|summary| Age| Pclass| SibSp| Parch|
+-------+------------------+------------------+------------------+-------------------+
| count| 714| 891| 891| 891|
| mean| 29.69911764705882| 2.308641975308642|0.5230078563411896|0.38159371492704824|
| stddev|14.526497332334035|0.8360712409770491|1.1027434322934315| 0.8060572211299488|
| min| 0.42| 1| 0| 0|
| max| 80.0| 3| 8| 6|
+-------+------------------+------------------+------------------+-------------------+

+-------+------+-----+--------+-----------------+-------------------+
|summary| Sex|Cabin|Embarked| Fare| Survived|
+-------+------+-----+--------+-----------------+-------------------+
| count| 891| 204| 889| 891| 891|
| mean| null| null| null| 32.2042079685746| 0.3838383838383838|
| stddev| null| null| null|49.69342859718089|0.48659245426485753|
| min|female| A10| C| 0.0| 0|
| max| male| T| S| 512.3292| 1|
+-------+------+-----+--------+-----------------+-------------------+

sex Survived count
0 female 1 233
1 female 0 81
2 male 0 468
3 male 1 109

Pclass Survived count
0 1 0 80
1 1 1 136
2 2 1 87
3 2 0 97
4 3 1 119
5 3 0 372

Parch Survived count
0 0 0 445
1 0 1 233
2 1 0 53
3 1 1 65
4 2 1 40
5 2 0 40
6 3 1 3
7 3 0 2
8 4 0 4
9 5 0 4
10 5 1 1
11 6 0 1

SibSp Survived count
0 0 0 398
1 0 1 210
2 1 0 97
3 1 1 112
4 2 1 13
5 2 0 15
6 3 1 4
7 3 0 12
8 4 0 15
9 4 1 3
10 5 0 5
11 8 0 7
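The sex counts above already show the survival gap; a quick pandas cross-check of the rates, with the counts copied from the Spark output:

```python
import pandas as pd

# counts copied from the groupBy('sex','Survived') output above
counts = pd.DataFrame({
    "Sex":      ["female", "female", "male", "male"],
    "Survived": [1, 0, 0, 1],
    "count":    [233, 81, 468, 109],
})
pivot = counts.pivot_table(index="Sex", columns="Survived", values="count")
rate = pivot[1] / (pivot[0] + pivot[1])
print(rate.round(3).to_dict())  # {'female': 0.742, 'male': 0.189}
```

About 74% of women survived versus 19% of men, which is why Sex becomes a strong feature in the models below.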
Cleaning and Transformation
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
# fill missing Age values with the mean, 29.699
df_train = df_train.fillna({'Age': round(29.699,2)})
# fill missing Embarked values with 'S', the most common port
df_train = df_train.fillna({'Embarked': 'S'})
#df_train = df_train.fillna({'Cabin': 'unknown'})
# drop columns
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")

# StringIndexer encodes a string label column as numeric indices
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)

labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
df_train.show()

# feature selection
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
train_features = df_train[features]
train_features.show()
# train_labels = df_train['Survived']
# train_labels.show()

# VectorAssembler combines multiple columns into a single vector column
df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'], outputCol="features")
df = df_assembler.transform(train_features)
# df["features"].show() -> TypeError: 'Column' object is not callable
df["features",].show()
df["Survived",].show()

+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Fare|Embarked|iEmbarked|iSex|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| 7.25| S| 0.0| 0.0|
| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0|71.2833| C| 1.0| 1.0|
| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0| 7.925| S| 0.0| 1.0|
| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 53.1| S| 0.0| 1.0|
| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 8.05| S| 0.0| 0.0|
| 6| 0| 3| Moran, Mr. James| male|30.0| 0| 0| 8.4583| Q| 2.0| 0.0|
| 7| 0| 1|McCarthy, Mr. Tim...| male|54.0| 0| 0|51.8625| S| 0.0| 0.0|
| 8| 0| 3|Palsson, Master. ...| male| 2.0| 3| 1| 21.075| S| 0.0| 0.0|
| 9| 1| 3|Johnson, Mrs. Osc...|female|27.0| 0| 2|11.1333| S| 0.0| 1.0|
| 10| 1| 2|Nasser, Mrs. Nich...|female|14.0| 1| 0|30.0708| C| 1.0| 1.0|
| 11| 1| 3|Sandstrom, Miss. ...|female| 4.0| 1| 1| 16.7| S| 0.0| 1.0|
| 12| 1| 1|Bonnell, Miss. El...|female|58.0| 0| 0| 26.55| S| 0.0| 1.0|
| 13| 0| 3|Saundercock, Mr. ...| male|20.0| 0| 0| 8.05| S| 0.0| 0.0|
| 14| 0| 3|Andersson, Mr. An...| male|39.0| 1| 5| 31.275| S| 0.0| 0.0|
| 15| 0| 3|Vestrom, Miss. Hu...|female|14.0| 0| 0| 7.8542| S| 0.0| 1.0|
| 16| 1| 2|Hewlett, Mrs. (Ma...|female|55.0| 0| 0| 16.0| S| 0.0| 1.0|
| 17| 0| 3|Rice, Master. Eugene| male| 2.0| 4| 1| 29.125| Q| 2.0| 0.0|
| 18| 1| 2|Williams, Mr. Cha...| male|30.0| 0| 0| 13.0| S| 0.0| 0.0|
| 19| 0| 3|Vander Planke, Mr...|female|31.0| 1| 0| 18.0| S| 0.0| 1.0|
| 20| 1| 3|Masselmani, Mrs. ...|female|30.0| 0| 0| 7.225| C| 1.0| 1.0|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
only showing top 20 rows+------+----+----+-----+-----+-------+---------+--------+
|Pclass|iSex| Age|SibSp|Parch| Fare|iEmbarked|Survived|
+------+----+----+-----+-----+-------+---------+--------+
| 3| 0.0|22.0| 1| 0| 7.25| 0.0| 0|
| 1| 1.0|38.0| 1| 0|71.2833| 1.0| 1|
| 3| 1.0|26.0| 0| 0| 7.925| 0.0| 1|
| 1| 1.0|35.0| 1| 0| 53.1| 0.0| 1|
| 3| 0.0|35.0| 0| 0| 8.05| 0.0| 0|
| 3| 0.0|30.0| 0| 0| 8.4583| 2.0| 0|
| 1| 0.0|54.0| 0| 0|51.8625| 0.0| 0|
| 3| 0.0| 2.0| 3| 1| 21.075| 0.0| 0|
| 3| 1.0|27.0| 0| 2|11.1333| 0.0| 1|
| 2| 1.0|14.0| 1| 0|30.0708| 1.0| 1|
| 3| 1.0| 4.0| 1| 1| 16.7| 0.0| 1|
| 1| 1.0|58.0| 0| 0| 26.55| 0.0| 1|
| 3| 0.0|20.0| 0| 0| 8.05| 0.0| 0|
| 3| 0.0|39.0| 1| 5| 31.275| 0.0| 0|
| 3| 1.0|14.0| 0| 0| 7.8542| 0.0| 0|
| 2| 1.0|55.0| 0| 0| 16.0| 0.0| 1|
| 3| 0.0| 2.0| 4| 1| 29.125| 2.0| 0|
| 2| 0.0|30.0| 0| 0| 13.0| 0.0| 1|
| 3| 1.0|31.0| 1| 0| 18.0| 0.0| 0|
| 3| 1.0|30.0| 0| 0| 7.225| 1.0| 1|
+------+----+----+-----+-----+-------+---------+--------+
only showing top 20 rows+--------------------+
| features|
+--------------------+
|[3.0,0.0,22.0,1.0...|
|[1.0,1.0,38.0,1.0...|
|[3.0,1.0,26.0,0.0...|
|[1.0,1.0,35.0,1.0...|
|(7,[0,2,5],[3.0,3...|
|[3.0,0.0,30.0,0.0...|
|(7,[0,2,5],[1.0,5...|
|[3.0,0.0,2.0,3.0,...|
|[3.0,1.0,27.0,0.0...|
|[2.0,1.0,14.0,1.0...|
|[3.0,1.0,4.0,1.0,...|
|[1.0,1.0,58.0,0.0...|
|(7,[0,2,5],[3.0,2...|
|[3.0,0.0,39.0,1.0...|
|[3.0,1.0,14.0,0.0...|
|[2.0,1.0,55.0,0.0...|
|[3.0,0.0,2.0,4.0,...|
|(7,[0,2,5],[2.0,3...|
|[3.0,1.0,31.0,1.0...|
|[3.0,1.0,30.0,0.0...|
+--------------------+
only showing top 20 rows+--------+
|Survived|
+--------+
| 0|
| 1|
| 1|
| 1|
| 0|
| 0|
| 0|
| 0|
| 1|
| 1|
| 1|
| 1|
| 0|
| 0|
| 0|
| 1|
| 0|
| 1|
| 0|
| 1|
+--------+
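The iEmbarked/iSex values above follow StringIndexer's default rule: labels are indexed by descending frequency, with the most frequent label getting 0.0. A minimal pure-Python sketch of that ordering (sample values, not the real column):

```python
from collections import Counter

# index labels by descending frequency, as StringIndexer's default does
embarked = ["S", "C", "S", "S", "Q", "C", "S"]  # illustrative sample
order = [label for label, _ in Counter(embarked).most_common()]
index = {label: float(i) for i, label in enumerate(order)}
print([index[v] for v in embarked])  # [0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 0.0]
```

This matches the mapping visible in the output: S → 0.0, C → 1.0, Q → 2.0.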
Pipeline
- A Pipeline is specified as a sequence of stages, run in order; each stage is either a Transformer or an Estimator.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
training = spark.createDataFrame([
    (0, "Hello PySpark", 1.0),
    (1, "Using Flink", 0.0),
    (2, "PySpark 3.0", 1.0),
    (3, "Test MySQL", 0.0)
], ["id", "text", "label"])
# three pipeline stages: tokenizer -> hashingTF -> logR
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
logR = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, logR])
# fit the pipeline on the training data, producing a model
model = pipeline.fit(training)
#############################################
# test set
test = spark.createDataFrame([
    (4, "PySpark Pipeline"),
    (5, "pipeline"),
    (6, "PySpark python"),
    (7, "julia c#")
], ["id", "text"])

# run the model's transform on the test set
prediction = model.transform(test)
# predictions
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    tid, text, prob, prediction = row
    print("(%d, %s) --> prediction=%f,prob=%s" % (tid, text, prediction, str(prob)))

(4, PySpark Pipeline) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
(5, pipeline) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
(6, PySpark python) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
(7, julia c#) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
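The HashingTF stage maps each token to a bucket by hashing modulo the feature dimension and counts occurrences. A simplified pure-Python sketch (Spark uses MurmurHash3 and 2^18 features by default, so the actual bucket ids differ):

```python
# toy hashing term-frequency: bucket = hash(token) % num_features
def hashing_tf(tokens, num_features=16):
    vec = [0.0] * num_features
    for token in tokens:
        vec[hash(token) % num_features] += 1.0
    return vec

v = hashing_tf("hello pyspark hello".split())
print(len(v), sum(v))  # 16 3.0
```

Hashing avoids building a vocabulary, at the cost of possible bucket collisions between different tokens.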
Logistic Regression Prediction
- Training
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
import matplotlib.pyplot as plt

spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('hdfs://localhost:9000/titanic/train.csv',header=True,inferSchema=True).cache()
df_train = df_train.fillna({'Age': round(29.699,0)})
df_train = df_train.fillna({'Fare': 36.0})
df_train = df_train.fillna({'Embarked': 'S'})
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
train_features = df_train[features]
df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp','Parch', 'Fare', 'iEmbarked'], outputCol="features")
train = df_assembler.transform(train_features)

# LogisticRegression model
lg = LogisticRegression(labelCol='Survived')
lgModel = lg.fit(train)
# save the model
lgModel.write().overwrite().save("hdfs://localhost:9000/model/logistic-titanic")
print("save model to hdfs://localhost:9000/model/logistic-titanic")

print("areaUnderROC: " + str(lgModel.summary.areaUnderROC))
# the ROC curve plots FPR (x axis) against TPR (y axis)
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(lgModel.summary.roc.select('FPR').collect(),lgModel.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()

save model to hdfs://localhost:9000/model/logistic-titanic
areaUnderROC: 0.8569355233864868
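areaUnderROC is literally the area under the plotted (FPR, TPR) curve, which can be approximated from the curve's points with the trapezoidal rule. A sketch with made-up points (not the model's actual curve):

```python
# trapezoidal area under an ROC curve, given (FPR, TPR) points sorted by FPR
def auc(points):
    area = 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area

print(round(auc([(0.0, 0.0), (0.2, 0.8), (1.0, 1.0)]), 4))  # 0.8
```

A value of 0.857, as reported above, means the model ranks a random survivor above a random non-survivor about 86% of the time.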
- Prediction
from pyspark.ml.classification import LogisticRegressionModel

df_test = spark.read.csv('hdfs://localhost:9000/titanic/test.csv',header=True,inferSchema=True).cache()
df_test = df_test.fillna({'Age': round(29.699,0)})
df_test = df_test.fillna({'Embarked': 'S'})
# Fare has one null value
df_test = df_test.fillna({'Fare': 36.0})
df_test = df_test.drop("Cabin")
df_test = df_test.drop("Ticket")
# add a Survived column with default value 0
df_test = df_test.withColumn("Survived", 0 * df_test["Age"])

labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked','Survived']
test_features = df_test[features]
df_assembler = VectorAssembler(inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch','Fare', 'iEmbarked'], outputCol="features")
test = df_assembler.transform(test_features)

lgModel = LogisticRegressionModel.load("hdfs://localhost:9000/model/logistic-titanic")
testSummary = lgModel.evaluate(test)
results = testSummary.predictions
results["features","rawPrediction","probability","prediction"].show()

+--------------------+--------------------+--------------------+----------+
| features| rawPrediction| probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[3.0,0.0,34.5,0.0...|[1.99328605097899...|[0.88009035220072...| 0.0|
|[3.0,1.0,47.0,1.0...|[0.63374031844971...|[0.65333708360849...| 0.0|
|[2.0,0.0,62.0,0.0...|[1.97058477648159...|[0.87767391006101...| 0.0|
|(7,[0,2,5],[3.0,2...|[2.21170839644084...|[0.90129601257823...| 0.0|
|[3.0,1.0,22.0,1.0...|[-0.2919725495559...|[0.42752102300610...| 1.0|
|(7,[0,2,5],[3.0,1...|[1.68822917787023...|[0.84399113755992...| 0.0|
|[3.0,1.0,30.0,0.0...|[-0.9032166903750...|[0.28838991532794...| 1.0|
|[2.0,0.0,26.0,1.0...|[1.42490075002778...|[0.80610554993708...| 0.0|
|[3.0,1.0,18.0,0.0...|[-1.1236436862496...|[0.24533604281752...| 1.0|
|[3.0,0.0,21.0,2.0...|[2.59895227540995...|[0.93079411943702...| 0.0|
|(7,[0,2,5],[3.0,3...|[2.33390585204715...|[0.91164644844255...| 0.0|
|(7,[0,2,5],[1.0,4...|[0.69025711721974...|[0.66602412131662...| 0.0|
|[1.0,1.0,23.0,1.0...|[-2.7419887292668...|[0.06054069440361...| 1.0|
|[2.0,0.0,63.0,1.0...|[2.82767950026722...|[0.94415337330052...| 0.0|
|[1.0,1.0,47.0,1.0...|[-1.7316563679495...|[0.15037583472736...| 1.0|
|[2.0,1.0,24.0,1.0...|[-1.7197655536498...|[0.15190136429145...| 1.0|
|[2.0,0.0,35.0,0.0...|[0.88008689342827...|[0.70684022722317...| 0.0|
|[3.0,0.0,21.0,0.0...|[1.71304684487762...|[0.84723105652294...| 0.0|
|[3.0,1.0,27.0,1.0...|[-0.1717428611873...|[0.45716950894284...| 1.0|
|[3.0,1.0,45.0,0.0...|[-0.0389664987514...|[0.49025960775551...| 1.0|
+--------------------+--------------------+--------------------+----------+
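For binary logistic regression, the probability column is the logistic sigmoid applied elementwise to rawPrediction, which can be verified against the first row above:

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

# first row above: rawPrediction[0] = 1.99328..., probability[0] = 0.88009...
print(round(sigmoid(1.99328605097899), 5))  # 0.88009
```

prediction is then simply the class whose probability exceeds 0.5 (with the default threshold).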
Decision Tree Prediction
from pyspark.ml.classification import DecisionTreeClassifier
# DecisionTree model
dtree = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
treeModel = dtree.fit(train)
# print the tree model
print(treeModel.toDebugString)
# predict on the training data
dt_predictions = treeModel.transform(train)
dt_predictions.select("prediction", "Survived", "features").show()

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(labelCol='Survived', metricName='accuracy')
print('Decision Tree Accu:', multi_evaluator.evaluate(dt_predictions))

Tree structure printed by toDebugString:

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_43ab8b3b232f4c305402) of depth 5 with 49 nodes
 If (feature 1 in {0.0})
  If (feature 2 <= 3.0)
   If (feature 3 <= 2.0)
    Predict: 1.0
   Else (feature 3 > 2.0)
    If (feature 4 <= 1.0)
     Predict: 0.0
    Else (feature 4 > 1.0)
     If (feature 3 <= 4.0)
      Predict: 1.0
     Else (feature 3 > 4.0)
      Predict: 0.0
...

+----------+--------+--------------------+
|prediction|Survived| features|
+----------+--------+--------------------+
| 0.0| 0|[3.0,0.0,22.0,1.0...|
| 1.0| 1|[1.0,1.0,38.0,1.0...|
| 1.0| 1|[3.0,1.0,26.0,0.0...|
| 1.0| 1|[1.0,1.0,35.0,1.0...|
| 0.0| 0|(7,[0,2,5],[3.0,3...|
| 0.0| 0|[3.0,0.0,30.0,0.0...|
| 0.0| 0|(7,[0,2,5],[1.0,5...|
| 0.0| 0|[3.0,0.0,2.0,3.0,...|
| 1.0| 1|[3.0,1.0,27.0,0.0...|
| 1.0| 1|[2.0,1.0,14.0,1.0...|
| 1.0| 1|[3.0,1.0,4.0,1.0,...|
| 1.0| 1|[1.0,1.0,58.0,0.0...|
| 0.0| 0|(7,[0,2,5],[3.0,2...|
| 0.0| 0|[3.0,0.0,39.0,1.0...|
| 1.0| 0|[3.0,1.0,14.0,0.0...|
| 1.0| 1|[2.0,1.0,55.0,0.0...|
| 0.0| 0|[3.0,0.0,2.0,4.0,...|
| 0.0| 1|(7,[0,2,5],[2.0,3...|
| 1.0| 0|[3.0,1.0,31.0,1.0...|
| 1.0| 1|[3.0,1.0,30.0,0.0...|
+----------+--------+--------------------+
only showing top 20 rows

Decision Tree Accu: 0.8417508417508418
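The accuracy figure can be sanity-checked by hand: of the 20 (prediction, Survived) pairs shown above, 17 match, giving 0.85 — consistent with (though not equal to) the 0.8418 computed over all 891 training rows:

```python
# (prediction, Survived) pairs read off the 20 rows shown above
pairs = [(0, 0), (1, 1), (1, 1), (1, 1), (0, 0), (0, 0), (0, 0), (0, 0),
         (1, 1), (1, 1), (1, 1), (1, 1), (0, 0), (0, 0), (1, 0), (1, 1),
         (0, 0), (0, 1), (1, 0), (1, 1)]
acc = sum(p == y for p, y in pairs) / len(pairs)
print(acc)  # 0.85
```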