作者:李呈祥,花名司麟 ,阿里云智能EMR团队高级技术专家,Apache Hive Committer, Apache Flink Committer,目前主要专注于EMR产品中开源计算引擎的优化工作。


Spark社区在Spark Packages网站中索引了许多第三方库,这些第三方库由不同的开发者贡献,作为Spark生态圈的一部分,扩充了Spark的使用范围和使用场景,其中很多对于我们日常的使用可能有帮助,我们准备开启一个系列文章介绍Spark Packages中一些有意思的第三方库,作为系列的第一篇,本文主要介绍Optimus,一个基于PySpark的简单易用的数据准备工具。

本文的部分内容源自Optimus官网和相关介绍文章,原文链接参考文末引用部分。

在Spark(Pyspark)的支持下,Optimus允许用户使用自己的或一组预先创建的数据转换功能来清理数据,对其进行概要分析并应用与数据分析和机器学习等场景,可以轻松地利用python语言进行所有这些操作。Optimus主要关注与以下几个方面:

  • 创建一个可靠的API来访问和处理数据。

  • 让用户轻松地从Pandas迁移。

  • 使数据探索更加容易。

创建一个可靠的API来访问和处理数据

首先,我们来看看Optimus的基本使用方式

from pyspark.sql import SparkSession
from optimus import Optimus// 创建context
spark = SparkSession.builder.appName('optimus').getOrCreate()
op= Optimus(spark)// 加载数据
df = op.load.csv("../examples/data/foo.csv")// 执行清理
new_df = df\.rows.sort("rank","desc")\.withColumn('new_age', df.age)\.cols.lower(["names","function"])\.cols.date_transform("date arrival", "yyyy/MM/dd", "dd-MM-YYYY")\.cols.years_between("date arrival", "dd-MM-YYYY", output_cols = "from arrival")\.cols.remove_accents("names")\.cols.remove_special_chars("names")\.rows.drop(df["rank"]>8)\.cols.rename(str.lower)\.cols.trim("*")\.cols.unnest("japanese name", output_cols="other names")\.cols.unnest("last position seen",separator=",", output_cols="pos")\.cols.drop(["last position seen", "japanese name","date arrival", "cybertronian", "nulltype"])//保存结果
new_df.save.csv("data/foo.csv")

Optimus基本PySpark框架,重新组织了对数据进行清理/准备的相关API,把数据处理整理为针对dataframe.rows和dataframe.cols两类操作,基于rows和cols实现了非常丰富的针对数据清理和准备相关的接口,用户可以使用这些接口非常方便高效地完成相关工作。在Optimus中,核心的数据操作可以归纳为如下几类:

  • 创建一个DataFrame

  • 用append()追加行或列

  • 使用select()选择行或列

  • 使用apply()更新或转换列数据

  • 使用drop()删除行或列

  • 使用read()加载数据

  • 使用write()保存数据

针对列的操作

对于数据集的操作主要是针对列进行的,所以这里主要介绍一些典型的针对列的操作类型:

Aggregation

Optimus扩展了PySpark的操作,创建了一种更简单的方式来对数据集进行统计。

print(df.cols.min(“ height”))
print(df.cols.percentile(['height','rank'],[0.05,0.25,0.5,0.75,0.95]))
print(df.cols.max(“ height”))
print(df.cols.median([“ height”,“ rank”]))
print(df.cols.range([“ height”,“ rank”])))
print(df.cols.std([“ height”,“ rank”]))17.5
{'height':{0.05:17.5,0.25:17.5,0.5:26.0,0.75:28.0,0.95:28.0},'rank':{0.05:7.0,0.25:7.0,0.5:7.0,0.75:10.0,0.95 :10.0}}
28.0
{'height':26.0,'rank':7.0}
{'height':{'min':17.5,'max':28.0},'rank':{'min':7,'max ':10}}
{'height':{'stddev':5.575242894559244},'rank':{'stddev':1.7320508075688772}}
Transformation and Chaining

类似PySpark DataFrame的操作,Optimus的数据转换操作也可以链接起来,甚至还可以和PySpark的DataFrame操作链接起来。利用Spark的延迟计算的特性(在示例中show()才会触发计算),使用Catalyst优化执行计划。

df = df\.rows.sort(["rank","height"])\.cols.lower(["names","function"])\.cols.remove_accents("names")\.cols.remove_special_chars("names")\.cols.trim("names")\.show()
+---------+------+---------+----+
|    names|height| function|rank|
+---------+------+---------+----+
|  optimus|  28.0|   leader|  10|
| ironhide|  26.0| security|   7|
|bumblebee|  17.5|espionage|   7|
+---------+------+---------+----+
Nest/Unnest

使用nest和unnest操作可以将多列合并成一个新列或者将一列拆分为多列,如下所示:

df.cols.nest(["names", "function"], output_col = "new_col", shape ="string").show()
+---------+------+---------+----+-------------------+
|    names|height| function|rank|            new_col|
+---------+------+---------+----+-------------------+
|  optimus|  28.0|   leader|  10|     optimus leader|
| ironhide|  26.0| security|   7|  ironhide security|
|bumblebee|  17.5|espionage|   7|bumblebee espionage|
+---------+------+---------+----+-------------------+df.cols.unnest("new_col", " ").cols.drop("new_col")
+---------+------+---------+----+---------+---------+
|    names|height| function|rank|new_col_0|new_col_1|
+---------+------+---------+----+---------+---------+
|  optimus|  28.0|   leader|  10|  optimus|   leader|
| ironhide|  26.0| security|   7| ironhide| security|
|bumblebee|  17.5|espionage|   7|bumblebee|espionage|
+---------+------+---------+----+---------+---------+
自定义转换

Optimus具有两个函数apply()和apply_expr(),用户可以在其中实现函数(UDF或Pandas UDF)或列表达式。

from pyspark.sql import functions as F
def func(value, args):return value + 1df.cols.apply("height", func, "int")\.cols.apply_expr("rank", F.col("rank")+1)\.show()+---------+------+---------+----+
|    names|height| function|rank|
+---------+------+---------+----+
|  optimus|    29|   leader|  11|
| ironhide|    27| security|   8|
|bumblebee|    18|espionage|   8|
+---------+------+---------+----+

让用户轻松地从Pandas迁移

在数据分析领域,Python是通用语言,而Pandas是最常用的库,所以Optimus在设计时尽量和Pandas的接口保持一致,以下是Optimus和Pandas以及PySpark的接口对比:

Description Pandas Spark Optimus
Read csv file pd.read_csv() spark.read.csv() op.read.csv()
Create Dataframe pd.Dataframe df.createdataframe() op.create.df()
Append Row df.append df.union() df.row().append()
Column Mean df.mean df1.agg({"x": "max"}) df.cols().mean()
Show Rows from Dataframe df.head() df.show() df.show()
Drop Columns df.drop() df.drop() df.cols().drop()
Sum all values in a Column df.sum() df1.agg({"x": "sum"}) function df.cols().sum()
Save Dataframe to csv df.to_csv() df.write.csv() df.save().csv()
Get a value by index df.get() NA NA
Get the mode of a column df.mode() NI df.cols().mode()
Cast a Column df.astype() df.column.cast() df.cols().cast(), astype() as alias
Substract 2 dataframes df.sub() NI NI
Merge to dataframes pd.concat() df.union() optimus.concat()
Apply a user defined fucntion to a column df.apply(func) fn = F.udf(labmbda x:x+1, DoubleType()) df.withColumn('disp1', n(df.disp)) df.cols().apply(func)
Group rows df.groupby() df.groupby() df.groupby()
Joint operation between to dataframes df.join() df.join() df.join()
Fill Null values with x df.fillna() df.fillna() df.fillna()
Get the max number of a Column df.max() df1.agg({"x": "max"}) df.cols().max()
Reset index reset_index() NA NA

NI= Not implemented
NA= Not

除了在Spark无法实现的功能(如reset_index),Optimus实现了几乎所有Pandas可应用于Spark的功能,而且两个接口基本一致,大大方便了Pandas用户的迁移。

使数据探索更轻松

Optimus具有功能强大的内置数据探查器,除所有基本操作外,它还提供了独特的数据探查功能。用户可以查看特定列中存在多少种数据类型。例如,有一百万行的颜色值为白色,黑色,红色以及数百种颜色,如何确定一百万行中没有“ 1”数字?数据探查使得用户可以了解数据集的质量,是否有脏数据,为数据清理和准备提供前提信息和验证方式,使数据集以合适的状态用于后续的数据分析和ML/DL处理。

df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/Meteorite_Landings.csv").h_repartition()
// 对name列进行profile
op.profiler.run(df, "name", infer=False)

可以看到profile提供了详细的数据统计信息,包括name列的数据类型,null数量,distinct count数量,topN出现次数,histogram分布等等,基于这些信息,用户可以准确了解数据的质量。

Profile由于计算量比较大,可能会比较耗时,特别是对于distinct count这种操作,使用relative_error 和 approx_count参数可以以降低精度为代价加速profile速度。

op.profiler.run(df, "name", infer=False, relative_error =1, approx_count=True)

总结

本文主要介绍了Optimus项目,作为一个Spark的第三方库,Optimus基于PySpark,为用户提供了一套完整的数据质量探查和数据清理工具集,接口参考Pandas设计,易用且强大,非常适合大规模数据的清理准备工作。限于篇幅,还有很多Optimus的清理接口和Profile功能没有介绍,感兴趣的同学可以访问Optimus官网探索更多功能和用法。

引用

  1. https://hi-optimus.com/

  2. https://github.com/ironmussa/Optimus

  3. http://docs.hioptimus.com/en/latest/ps/overview.html

  4. https://towardsdatascience.com/announcing-optimus-v2-agile-data-science-workflows-made-easy-c127a12d9e13


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

Apache Spark技术交流社区公众号,微信扫一扫关注

Spark Packages寻宝(一):简单易用的数据准备工具Optimus相关推荐

  1. Maya v1.0.7.0 类似Rolan简洁小巧简单易用的快速启动工具

    Maya 是一款类似Rolan简洁小巧简单易用的快速启动工具,市面上有很多启动工具,如:RunAny ,Lucy快速启动,CLaunch,启动工具 Lily,nTrun,Rolan,音速启动等等多看免 ...

  2. Mac简单易用的文本识别工具:TextSniper

    TextSniper是适用于Mac的简单易用的文本识别工具.TextSniper不仅提取屏幕上选定部分内的所有文本,并将其转换为可编辑文本还可以轻松识别图形,数字文档和视频中的文本. 从任何地方提取文 ...

  3. 简单易用的AI桌面工具系列 - 图像生成

    简单易用的AI桌面工具系列 图像生成 文生图:输入提示词,生成图片(仅支持英文) 图生图:根据图片及提示词生成图片 图像无损放大:比如将 512*512 放大到 1024 * 1024 分辨率 512 ...

  4. FineBI:一个简单易用的自助BI工具

    过去,有关企业数据分析的重担都压在IT部门,传统BI分析更多面向的是具有IT背景的人员.但随着业务分析需求的增加,很多公司都希望为业务用户提供自助分析服务,将分析工作落实到业务人员手中.但同时,分析工 ...

  5. 一款简单易用的视频下载工具

    [公众号回复 "1024",免费领取程序员赚钱实操经验] 大家好,我是章鱼猫. 今天推荐的这个项目是「downkyi」,哔哩下载姬,一个简单易用的哔哩哔哩视频下载工具,支持批量下载 ...

  6. 快速上手!7款简单易用的在线绘图工具推荐

    随着科技的迅猛发展,越来越多的工作需要依赖电脑完成,而画图软件已经成为工作中不可或缺的一部分. 从过去简陋的Microsoft Paint到如今功能强大的Adobe Illustrator和即时设计等 ...

  7. 一款 0 门槛轻松易上手的数据可视化工具

    在职场中有一项共识是:数据驱动业务价值.业务在产品.运营.开发.技术支持.销售等环节都有着大量的数据需求, 市面上也出现了很多 BI 可视化工具,但如果能同时具备以下特性,则可以称为一款优秀的 BI ...

  8. 有哪些简单易用的高效办公工具?

    推荐10个非常实用的办公软件和网站. 1.PDF派 PDF派是CleverPDF品牌专为中国地区用户而建立的网站,是一个20合1的好工具,页数少的时候是免费使用! 2.智办事 智办事是第一个提出以&q ...

  9. 一个简单易用的Http访问工具类for Android

    前言 去年(2017)参加服务外包省赛的时候,负责App开发的我遇到了一个小难题--Http请求.虽说已经有成熟的HttpUrlConnection库供使用,但依然感到有些不方便:进行一次简单的请求并 ...

最新文章

  1. linux/docker个人服务器项目中文变问号??,时间差8小时问题解决方法,最新,最有效
  2. 【HTML】兴唐二十八节课之常用标签(不定期更新)
  3. Mac zsh切换bash bash切换zsh
  4. Nginx, HTTPS的配置
  5. android 文字fly动画,超好看的下拉刷新动画Android代码实现
  6. 《敏捷敬业度》作者访谈
  7. php 网站移动端自适应,HTML5 移动端自适应布局
  8. kubespray容器化部署kubernetes高可用集群
  9. MFC开发IM-第二十六篇、vs2008添加库文件
  10. curl post https_Fiddler抓包13fiddler 抓包导出 curl 命令行
  11. [FZYZOJ 1073] Password
  12. SpringBoot使用GZIP压缩返回数据
  13. js实现sqrt开方函数(二分法)
  14. Spring 实现发送电子邮件的两种方法
  15. 按键精灵q语言连接mysql_【按键精灵8】Q语言介绍
  16. TCP/IP 报文格式(IP数据包、TCP报头、UDP报头)
  17. 第二人生的源码分析(四十)创建多个工作线程
  18. 2112731-95-8,N-(Azido-PEG3)-N-Boc-PEG4-acid与炔丙基、BCN或DBCO试剂进行点击化学反应
  19. 【Codeforces 538 H】Summer Dichotomy(二分图染色)
  20. 程序员进阶!阿里P7级别面试经验总结,深度好文

热门文章

  1. python全局解释器锁 tensorflow_想轻松复现深度强化学习论文?看这篇经验之谈
  2. 现在的教育....哎...
  3. 家电清洗服务预约小程序
  4. 故障演练--使用 ChaosBlade 进行混沌实验
  5. 【开源小软件 】Bing每日壁纸 让桌面壁纸保持更新
  6. python读取excel学习day1
  7. 2020 2 跳线法兰
  8. 08 Adaptive AUTOSAR 和 Security
  9. Gtalent如何帮助HR确定人才测评指标
  10. Linux下配置短信猫的方法和常见错误。