spark业务开发-空值处理
spark业务开发-空值处理
- 项目地址:https://gitee.com/cch-bigdata/spark-process.git
输入数据集
"id","name","description","weight"
"102","car battery","12V car battery","8.1"
"103","12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3","0.8"
"104","hammer","12oz carpenter's hammer","0.75"
"105","hammer","14oz carpenter's hammer","0.875"
"106","hammer","16oz carpenter's hammer","1"
"107","rocks","box of assorted rocks","5.3"
"108","jacket","water resistent black wind breaker","0.1"
"109","spare tire","24 inch spare tire","22.2"
"101","scooter","Small 2-wheel scooter","3.14"
"102","scooter1","Small 2-wheel scooter1","3.14"
输出数据
+---+------------------+--------------------+------+
| id| name| description|weight|
+---+------------------+--------------------+------+
|102| car battery| 12V car battery| 8.1|
|103|12-pack drill bits|12-pack of drill ...| 0.8|
|104| hammer|12oz carpenter's ...| 0.75|
|105| hammer|14oz carpenter's ...| 0.875|
|106| hammer|16oz carpenter's ...| 1.0|
|107| rocks|box of assorted r...| 5.3|
|108| jacket|water resistent b...| 0.1|
|109| spare tire| 24 inch spare tire| 22.2|
|101| scooter|Small 2-wheel sco...| 3.14|
|102| scooter1|Small 2-wheel sco...| 3.14|
+---+------------------+--------------------+------+
程序代码
package com.cch.bigdata.spark.process.nvlimport com.cch.bigdata.spark.process.AbstractTransform
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{avg, max, min}
import org.apache.spark.sql.{DataFrame, Row}import scala.collection.mutable.ListBufferclass Nvl extends AbstractTransform{case class NvlColumn(name:String, strategy:String, spec:Any)//需要做空值处理的列,使用逗号分隔private val columns = Array("weight","description")//空值处理策略,使用逗号分隔,要与nvl_columns数量匹配//max 选择该列中最大值来填充空值//min 选择该列中最小值来填充空值(null做0)//mid 选择该列中中位值来填充空值//most 选择该列中出现频率最多的值来填充空值//spec 使用特定的值来填充空值,对应nvl_spec属性private val nvl_strategy = Array("max","most")//使用特定的值来填充空值时,这个属性有值private val nvl_spec = Array("","")override def process(): Unit = {if(columns.isEmpty){throw new RuntimeException("空值列配置不能为空")}if(nvl_strategy.isEmpty){throw new RuntimeException("空值处理策略配置不能为空")}val list:ListBuffer[NvlColumn] = ListBuffer()var index = 0columns.foreach(c=>{var spec:String = ""try {spec = nvl_spec(index)}catch {case e:ArrayIndexOutOfBoundsException=>{//什么也不做}}list.append(NvlColumn(c,nvl_strategy(index),spec))index+=1})//获取数据集var df: DataFrame = loadCsv("src/main/resources/csv/products.csv",spark)val types: Map[String, String] = df.dtypes.toMaplist.foreach(nvlColumn=>{if(nvlColumn.name.isEmpty){throw new RuntimeException("空值列未配置!")}if(nvlColumn.strategy.isEmpty){throw new RuntimeException("空值列处理策略不能为空!")}if(nvlColumn.strategy.equals("spec")){if(nvlColumn.spec==null){throw new RuntimeException("使用了特定值策略替换空值,但未陪配置对应的特定值!")}}nvlColumn.strategy match {case "max" => {df = fill(nvlColumn.name,types.get(nvlColumn.name).get,df,(d:DataFrame,c:String)=>{maxValue(d,c)})}case "min" => {df = fill(nvlColumn.name,types.get(nvlColumn.name).get,df,(d:DataFrame,c:String)=>{minValue(d,c)})}case "mid" => {df = fill(nvlColumn.name,types.get(nvlColumn.name).get,df,(d:DataFrame,c:String)=>{midValue(d,c)})}case "most" => {df = fill(nvlColumn.name,types.get(nvlColumn.name).get,df,(d:DataFrame,c:String)=>{mostValue(d,c)})}case "spec" => {df = specFill(nvlColumn.name,types.get(nvlColumn.name).get,df,nvlColumn.spec)}}})df.show()}def fill(col:String,typeName:String,df:DataFrame,strategy:(DataFrame,String)=>Any): DataFrame ={var result:DataFrame = nullval opResult: Any = strategy(df, col)if(typeName.startsWith("DoubleType")){result = df.na.fill(opResult.toString.toDouble, Seq(col))}else if(typeName.startsWith("LongType")){result = df.na.fill(opResult.toString.toLong, Seq(col))}else if(typeName.startsWith("IntegerType")){result = df.na.fill(opResult.toString.toInt, Seq(col))}else if(typeName.startsWith("ShortType")){result = df.na.fill(opResult.toString.toShort, Seq(col))}else if(typeName.startsWith("BooleanType")){result = df.na.fill(opResult.toString.toBoolean, Seq(col))}else if(typeName.startsWith("DecimalType")){result = df.na.fill(opResult.toString.toDouble, Seq(col))}else{result = df.na.fill(opResult.toString, Seq(col))}//其他类型需要后续继续验证result}def specFill(col:String,typeName:String,df:DataFrame,value:Any): DataFrame ={var result:DataFrame = nullif(typeName.startsWith("DoubleType")){result = df.na.fill(value.toString.toDouble, Seq(col))}else if(typeName.startsWith("LongType")){result = df.na.fill(value.toString.toLong, Seq(col))}else if(typeName.startsWith("IntegerType")){result = df.na.fill(value.toString.toInt, Seq(col))}else if(typeName.startsWith("ShortType")){result = df.na.fill(value.toString.toShort, Seq(col))}else if(typeName.startsWith("BooleanType")){result = df.na.fill(value.toString.toBoolean, Seq(col))}else if(typeName.startsWith("DecimalType")){result = df.na.fill(value.toString.toDouble, Seq(col))}else{result = df.na.fill(value.toString, Seq(col))}//其他类型需要后续继续验证result}def mostValue(dataFrame: DataFrame,column: String): Any ={//返回单列的新的dataframe,空值置为0val newDataFrame: DataFrame = dataFrame.select(column).na.fill(0)//对每个数据做数量标记,并且做相同值数量相加val mapRdd: RDD[(Any, Int)] = newDataFrame.rdd.mapPartitions(row => {row.map(r => {(r.get(0), 1)})}).reduceByKey(_ + _)//执行计算,并转换为数组,按倒序排列,获取第一行的第一列数据作为//出现次数最多的值val tuples: Array[(Any, Int)] = mapRdd.sortBy(_._2,false).collect()tuples(0)._1}//某列的最大值def maxValue(dataFrame: DataFrame,column: String):Any={dataFrame.select(max(column)).collect()(0).get(0)}//某列的最小值def minValue(dataFrame: DataFrame,column: String): Any ={dataFrame.select(min(column)).collect()(0).get(0)}//某列的平均值def avgValue(dataFrame: DataFrame,column: String): Any ={dataFrame.select(avg(column)).collect()(0).get(0)}//某列的中位数//中位数(Median)又称中值,统计学中的专有名词,//是按顺序排列的一组数据中居于中间位置的数,代表一个样本、种群或概率分布中的一个数值,//其可将数值集合划分为相等的上下两部分。对于有限的数集,//可以通过把所有观察值高低排序后找出正中间的一个作为中位数。如果观察值有偶数个,通常取最中间的两个数值的平均数作为中位数。def midValue(dataFrame: DataFrame,column: String): Any ={//默认升序val newDataFrame: DataFrame = dataFrame.select(column)//将指定列转换为数组val rows: Array[Row] = newDataFrame.select(column).sort(column).collect()//获取数组长度val size: Long = rows.length//判断奇偶性if(size%2==0){//偶数,需要使用最中间的两位数的平均值val midIndex:Int = (size/2).toIntval value_1 = rows(midIndex).get(0).asInstanceOf[Number].longValueval value_2 = rows(midIndex-1).get(0).asInstanceOf[Number].longValue(value_1+value_2)/2}else{//奇数,使用最中间的那个数val midIndex: Int = (size / 2).toIntval row: Row = rows(midIndex)row.get(0)}}override def getAppName(): String = "空值处理"
}
package com.cch.bigdata.spark.processimport com.cch.bigdata.spark.process.nvl.Nvlobject NvlTest {def main(args: Array[String]): Unit = {new Nvl().process()}
}
参数解释
columns:需要处理空值的列名,字符串数组
nvl_strategy:空值处理策略,字符串数组,数组索引与columnsy一致
nvl_spec:指定值填充空值
strategy:处理表达式(策略)
- max代表选用该列中最大值填充空值
- min代表选用该列中最小值填充空值
- mid代表选用该列中的中位值来填充空值
- most代表选用该列中出现次数最多的值来填充空值
- spec代表使用自定义指定的内容来填充空值
spec:如果空值策略选择了spec,则这个属性需要设置为需要填充空值的内容
spark业务开发-空值处理相关推荐
- spark业务开发-列选择
spark业务开发-列选择 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 "id","name&qu ...
- spark业务开发-添加索引列
spark业务开发-添加索引列 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 name,profession,enroll,sco ...
- spark业务开发-聚合(agg)
spark业务开发-聚合(agg) 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 name,profession,enroll,s ...
- spark业务开发-union合并(union)
spark业务开发-union合并(union) 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据集1 id,name,profess ...
- 数据服务让业务开发更敏捷
本文笔记全部来自<极客新闻> 数据中台离业务最近,能更快地响应业务和应用开发的需求,数据可追溯性也会更强,更精准. 数据服务就是数据中台在落地实施过程中的一个对外输出数据的环节,将数据服务 ...
- 潘在亮:给业务开发提供黑科技装备的“测试Q博士”(图灵访谈)
嘉宾简介: 潘在亮, 现任腾讯 社交网络质量部测试开发中心组长.加入腾讯之前,先后任职甲骨文.微软公司测试开发工程师,参与企业级搜索和Visual Studio产品的测试开发工作.于2009年加入腾讯 ...
- Apache Spark学习:利用Eclipse构建Spark集成开发环境
介绍了如何使用Maven编译生成可直接运行在Hadoop 2.2.0上的Spark jar包,而本文则在此基础上, 介绍如何利用Eclipse构建Spark集成开发环境 . 不建议大家使用eclips ...
- Spring 声明式事务在业务开发中容易碰到的坑总结
Spring 声明式事务,在业务开发使用上可能遇到的三类坑,包括: 第一,因为配置不正确,导致方法上的事务没生效.我们务必确认调用 @Transactional 注解标记的方法是 public 的,并 ...
- 灵魂拷问:后端业务开发要会用 K8s 到什么程度?
来源 | 阿里巴巴云原生公众号 很多人看着 K8s 成为最热门的开源技术,都纷纷开始学习 K8s,但也有很多人在抱怨 K8s 太复杂了.用 CNCF 新晋 TOC 张磊的话来说:这里的根本问题在于,K ...
最新文章
- pandas将列表list插入到dataframe的单元格中、pandas使用read_csv函数读取文件并设置保留数值的前置0( leading zeroes)
- Cocos2d-x 3.0 rc0中加入附加项目,解决无法打开包括文件:“extensions/ExtensionMacros.h”...
- python的主函数如何书写_Python程序的入口在哪里?main函数的恰当写法
- 好文推荐 | MySQL binlog应用场景与原理深度剖析
- Tomcat设置Http自动跳转Https
- Java单例模式优化写法
- 平衡二叉树AVL插入
- 深度学习中的数据增强方法
- 【数字全排列】LeetCode 47. Permutations II
- WPF学习之路(二) XAML(续)
- java视频通话_Java使用WebSocket和WebRTC视频通话
- excel使用教程_【免费】人人入手的办公软件(excel,ppt,word)——安装包/教程/模板合集...
- 计算机系统 基础知识点汇总,超全!!!
- Java项目部署的完整流程
- linux c语言 模拟键盘输入
- 面试题大汇总华为面试题
- 史上最全的数据库面试题,面试前刷一刷!
- 2014年360广告算法面试经历
- 基于单片机的多功能数字时钟设计
- Java IO Steam