SparkSQL练习--出租车数据清洗

  • 数据分析
  • 数据读取
    • 准备工作
    • 读取文件
  • 数据清洗
    • 数据类型转换
    • 解决报错问题
    • 剪除异常数据
  • 完整代码显示

数据分析

数据集结构

其中有几点需要注意

  • hack_license 是出租车执照, 可以唯一标识一辆出租车
  • pickup_datetime 和 dropoff_datetime 分别是上车时间和下车时间, 通过这个时间, 可以获知行车时间
  • pickup_longitude 和 dropoff_longitude 是经度, 经度所代表的是横轴, 也就是 X 轴
  • pickup_latitude 和 dropoff_latitude 是纬度, 纬度所代表的是纵轴, 也就是 Y 轴

跳转顶部


数据读取

准备工作

配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast</groupId><artifactId>taxi</artifactId><version>0.0.1</version><properties><scala.version>2.11.8</scala.version><spark.version>2.2.0</spark.version><hadoop.version>2.7.5</hadoop.version><slf4j.version>1.7.16</slf4j.version><log4j.version>1.2.17</log4j.version><mysql.version>5.1.35</mysql.version><esri.version>2.2.2</esri.version><json4s.version>3.6.6</json4s.version></properties><dependencies><!-- Scala 库 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-xml_2.11</artifactId><version>1.0.6</version></dependency><!-- Spark 系列包 --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><!-- 地理位置处理库 --><dependency><groupId>com.esri.geometry</groupId><artifactId>esri-geometry-api</artifactId><version>${esri.version}</version></dependency><!-- JSON 解析库 --><dependency><groupId>org.json4s</groupId><artifactId>json4s-native_2.11</artifactId><version>${json4s.version}</version></dependency><dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>${json4s.version}</version></dependency><!-- 日志相关 --><dependency><groupId>org.slf4j</groupId><artifactId>jcl-over-slf4j</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin></plugins></build>
</project>

创建scala目录,并且将这个目录设置为Source Root


目录结构如下

读取文件

创建SarkSession与读取文件

    //创建SparkSessionval spark = SparkSession.builder().master("local[6]").appName("taxi").getOrCreate()//导入隐式转换import spark.implicits._import org.apache.spark.sql.functions._//数据读取val taxiRow = spark.read.option("header", true).csv("dataset/half_trip.csv").show()

结果展示

跳转顶部


数据清洗

数据类型转换

剪去多余列

  • 现在数据集中包含了一些多余的列, 在后续的计算中并不会使用到, 如果让这些列参与计算的话, 会影响整体性能, 浪费集群资源

类型转换

  • 可以看到, 现在的数据集中, 所有列类型都是 String, 而在一些统计和运算中, 不能使用 String 来进行, 所以要将这些数据转为对应的类型

我们可以自定义一个样例类将Row类型数据转换成对象类

/*** 代表一个行程, 是集合中的一条记录* @param license 出租车执照号* @param pickUpTime 上车时间* @param dropOffTime 下车时间* @param pickUpX 上车地点的经度* @param pickUpY 上车地点的纬度* @param dropOffX 下车地点的经度* @param dropOffY 下车地点的纬度*/
case class Trip(license: String,pickUpTime: Long,dropOffTime: Long,pickUpX: Double,pickUpY: Double,dropOffX: Double,dropOffY: Double)

我们在逐行转换数据类型时不知道数据是否为空,因此我们应该新建一个类来判断数据是否为空

因为在针对 Row 类型对象进行数据转换时, 需要对一列是否为空进行判断和处理, 在 Scala中为空的处理进行一些支持和封装, 叫做 Option, 所以在读取 Row 类型对象的时候, 要返回 Option 对象, 通过一个包装类, 可以轻松做到这件事

class RichRow(row: Row) {// 为了返回空值,提醒外面进行处理def getAs[T](field: String): Option[T] = {//判断是否为空if (row.isNullAt(row.fieldIndex(field))) {None} else {Some(row.getAs[T](field))}}
}

RichRow类的解释与分析

  • 该类的输入时Row就是行
  • getAs方法输入的参数field是列名,输出的是每列的数据
  • 之所将数据包装成option类是因为,,此类有个方法getOrElse,使用该方法,不为空是时输出数据,为空时输出0或者指定的数据

我们需要的数据只有三种数据类型,分别是:StringLongDouble。,字符类型在数据读取时,默认就是,但是后两者类型需要我们自己手动转换
将数据转换成Long类型

  def parseTime(row: RichRow, field: String): Long = {//表示时间类型的格式val pattern = "yyyy-MM-dd HH:mm:ss"val formatter = new SimpleDateFormat(pattern, Locale.ENGLISH)//执行转换,获取时间戳val time: Option[String] = row.getAs[String](field)/*** 这个map时option中的map* 他的主要作用就是,如果有值的话就转换成定义的类型* 值为空就不返回* 所以它是安全的*/val timeOption = time.map(time => formatter.parse(time).getTime)timeOption.getOrElse(0l)}

读取Double类型数据

  def parseLocation(row: RichRow, field: String): Double = {//获取数据val location: Option[String] = row.getAs[String](field)//转换数据val locationOption: Option[Double] = location.map(local => local.toDouble)locationOption.getOrElse(0.0)}

此时我们可以将数据一起转换成自定义的类型

  def parse(row: Row): Trip = {val richRow = new RichRow(row)val license = richRow.getAs[String]("hack_license").orNullval pickUpTime = parseTime(richRow, "pickup_datetime")val pickOffTime = parseTime(richRow, "dropoff_datetime")val pickUpX = parseLocation(richRow, "pickup_longitude")val pickUpY = parseLocation(richRow, "pickup_latitude")val pickOffX = parseLocation(richRow, "dropoff_longitude")val pickOffY = parseLocation(richRow, "dropoff_latitude")Trip(license, pickUpTime, pickOffTime, pickUpX, pickUpY, pickOffX, pickOffY)}

解决报错问题

我们上面在对数据进行类型转换时,可能会因为数据的错误而报错,那我们改输入解决?

  def safe[P, R](f: P => R): P => Either[R, (P, Exception)] = {new Function[P, Either[R, (P, Exception)]] with Serializable {override def apply(param: P): Either[R, (P, Exception)] = {try {Left(f(param))} catch {case e: Exception => Right((param, e))}}}}

解释与分析

  • 我们为了保证数据处理过程中的安全性,可以在rdd.map的时候调用函数safe
  • 但是map输入的是一个函数parse,所以我们应该也是输入函数p
  • safe接受的参数也是函数p,但是这个函数p会返回一个函数r
  • 所以safe现在是有两种输出的结果,一个是输入的函数变换之后的r
  • 一个是输入函数和报错信息

针对异常值进行处理

   val taxiParsed: RDD[Either[Trip, (Row, Exception)]] = taxiRow.rdd.map(safe(parse))//现在result里面全是有问题的rowval result: RDD[Row] = taxiParsed.filter(e => e.isRight).map(e => e.right.get._1)//结果val taxiGood = taxiParsed.map(either => either.left.get).toDS()

剪除异常数据

    val hours = (pickupTime: Long, dropoffTime: Long) => {val duration = dropoffTime - pickupTimeval hour = TimeUnit.HOURS.convert(duration, TimeUnit.MICROSECONDS)hour}val hoursUDF = udf(hours)spark.udf.register("hours",hours)val taxiClean = taxiGood.where("hours(pickUpTime,dropOffTime) between 0 and 3").show()

结果展示

跳转顶部


完整代码显示

package taxiimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.TimeUnitobject TaxiAnalysisRunner {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().master("local[6]").appName("taxi").getOrCreate()//导入隐式转换import spark.implicits._import org.apache.spark.sql.functions._//数据读取val taxiRow = spark.read.option("header", true).csv("dataset/half_trip.csv")//转换操作val taxiParsed: RDD[Either[Trip, (Row, Exception)]] = taxiRow.rdd.map(safe(parse))//现在result里面全是有问题的row//    val result: RDD[Row] = taxiParsed.filter(e => e.isRight)//      .map(e => e.right.get._1)//结果val taxiGood = taxiParsed.map(either => either.left.get).toDS()//绘制时长直方图//统计时长val hours = (pickupTime: Long, dropoffTime: Long) => {val duration = dropoffTime - pickupTimeval hour = TimeUnit.HOURS.convert(duration, TimeUnit.MICROSECONDS)hour}val hoursUDF = udf(hours)spark.udf.register("hours",hours)val taxiClean = taxiGood.where("hours(pickUpTime,dropOffTime) between 0 and 3").show()}/*** 作用就是封装parse方法,捕获异常**//*** 包裹转换逻辑, 并返回 Either* 我们为了保证数据处理过程中的安全性,可以在rdd.map的时候调用函数safe* 但是map输入的是一个函数parse,所以我们应该也是输入函数p* safe接受的参数也是函数p,但是这个函数p会返回一个函数r* 所以safe现在是有两种输出的结果,一个是输入的函数变换之后的r* 一个是输入函数和报错信息*/def safe[P, R](f: P => R): P => Either[R, (P, Exception)] = {new Function[P, Either[R, (P, Exception)]] with Serializable {override def apply(param: P): Either[R, (P, Exception)] = {try {Left(f(param))} catch {case e: Exception => Right((param, e))}}}}/*** Row -> Trip** @param row* @return*/def parse(row: Row): Trip = {val richRow = new RichRow(row)val license = richRow.getAs[String]("hack_license").orNullval pickUpTime = parseTime(richRow, "pickup_datetime")val pickOffTime = parseTime(richRow, "dropoff_datetime")val pickUpX = parseLocation(richRow, "pickup_longitude")val pickUpY = parseLocation(richRow, "pickup_latitude")val pickOffX = parseLocation(richRow, "dropoff_longitude")val pickOffY = parseLocation(richRow, "dropoff_latitude")Trip(license, pickUpTime, pickOffTime, pickUpX, pickUpY, pickOffX, pickOffY)}def parseTime(row: RichRow, field: String): Long = {//表示时间类型的格式val pattern = "yyyy-MM-dd HH:mm:ss"val formatter = new SimpleDateFormat(pattern, Locale.ENGLISH)//执行转换,获取时间戳val time: Option[String] = row.getAs[String](field)/*** 这个map时option中的map* 他的主要作用就是,如果有值的话就转换成定义的类型* 值为空就不返回* 所以它是安全的*/val timeOption = time.map(time => formatter.parse(time).getTime)timeOption.getOrElse(0l)}def parseLocation(row: RichRow, field: String): Double = {//获取数据val location: Option[String] = row.getAs[String](field)//转换数据val locationOption: Option[Double] = location.map(local => local.toDouble)locationOption.getOrElse(0.0)}
}/**** @param row*/
class RichRow(row: Row) {// 为了返回空值,提醒外面进行处理def getAs[T](field: String): Option[T] = {//判断是否为空if (row.isNullAt(row.fieldIndex(field))) {None} else {Some(row.getAs[T](field))}}
}case class Trip(license: String,pickUpTime: Long,dropOffTime: Long,pickUpX: Double,pickUpY: Double,dropOffX: Double,dropOffY: Double)

跳转顶部


【Spark】SparkSQL练习--出租车数据清洗相关推荐

  1. Spark SQL 快速入门系列(五)SparkSQL 访问 Hive

    文章目录 访问 Hive SparkSQL 整合 Hive 访问 Hive 表 idea实现SparkSQL连接hive 访问 Hive 导读 1,整合 SparkSQL 和 Hive, 使用 Hiv ...

  2. spark如何进行聚类可视化_基于Spark的出租车轨迹处理与可视化平台

    由于城市化进程加剧以及汽车数量增加, 城市交通问题日益严重[, 通过分析各种空间数据解决交通问题是当前研究的热点. 出租车提供广泛且灵活的交通运输服务, 是城市交通的重要组成部分. 出租车轨迹数据记录 ...

  3. 离线数据清洗,Spark和Python Pandas对比

    导语 最近新学习了Spark中RDD的核心用法,为了巩固学习成果,于是使用Spark写了一个数据清洗的代码,正好之前使用过python中pandas对同样的数据做数据清洗,于是就把两种方式的代码都贴出 ...

  4. 全面对比,深度解析 Ignite 与 Spark

    经常有人拿 Ignite 和 Spark 进行比较,然后搞不清两者的区别和联系.Ignite 和 Spark,如果笼统归类,都可以归于内存计算平台,然而两者功能上虽然有交集,并且 Ignite 也会对 ...

  5. spark streaming 消费 kafka入门采坑解决过程

    kafka 服务相关的命令 # 开启kafka的服务器 bin/kafka-server-start.sh -daemon config/server.properties & # 创建top ...

  6. Spark On Yarn 运行项目

    在spark中,支持4中运行模式: Local:往往使用本地开发的时候使用. StandAlone:是spark自带的,如果一个集群是StandAlone模式的话,那么就需要在多台机器上同时部署Spa ...

  7. SparkSQL 控制文件输出的大小

    package cn.ac.iie.logimport org.apache.spark.sql.{SaveMode, SparkSession}/*** 使用Spark完成我们的数据清洗操作*/ o ...

  8. spark 连接mysql 命令_spark-submit命令包括mysql连接器

    我有一个scala对象文件,它在内部查询mysql表做一个连接并将数据写入s3,在本地测试我的代码它运行得很好 . 但是当我将它提交到集群时,它会抛出以下错误: 线程"main"j ...

  9. java spark读写hdfs_Spark读取HDFS数据输出到不同的文件

    最近有一个需求是这样的:原来的数据是存储在MySQL,然后通过Sqoop将MySQL的数据抽取到了HDFS集群上,抽取到HDFS上的数据都是纯数据,字段值之间以\t分隔,现在需要将这部分数据还原为js ...

  10. Spark 连接 HBase 入库及查询操作

    本实例采用Scala开发,实现了RDD数据两种方式入库到HBase,从HBase中读取数据并print输出. build.sbt name := "SparkSbt"version ...

最新文章

  1. 人工智能的影响调查_调查报告|文科大学生群体对于人工智能影响 就业的认知程度:基于访谈的质性研究...
  2. iOS 访问权限设置
  3. linux 流量 脚本,linux 检测网络出入流量 当达到一定量时 自动执行另一个脚本。...
  4. 程序包清单签名验证失败_数字世界的手写签名
  5. cannot enable both sham-link state-change interface traps
  6. Java继承_JAVA的三大特性
  7. ...............
  8. Scratch二次开发6:如何保存作品到自己的服务器
  9. 网红释一刀考察潮汕特色美食土虾
  10. 小小知识点(一):辨别性相似度(Discriminative Similarity)
  11. 【操作指导 | 代码实现】挑战程序设计竞赛2:算法和数据结构
  12. Api 接口文档是什么?如何直接使用 ApiDoc 生成接口文档
  13. 安卓手机root、修改文件权限、更改按键映射
  14. Oracle EBS R12 - Application patch可不可以reapply
  15. 快速搞懂C语言中exit(0)与exit(1)有什么区别??
  16. vue中created、mounted、activated的区别
  17. 【CCF】公共钥匙盒
  18. IntelliJ IDEA安装后,打开文件不能编辑问题
  19. 【LeetCode】【剑指offer】【剪绳子(二)】
  20. 禁用计算机账户控制,win8系统禁止弹出用户账户控制窗口的方法

热门文章

  1. 多块盘制作成一个lvm
  2. Elastic ik插件配置热更新功能
  3. [label][responsive-web-design]网页响应测试各种尺寸的工具
  4. ASP.NET页面间的传值方法(2)
  5. linux内存管理方式,简要概括Linux内存管理的方式
  6. 删除数据清理oracle表空间,oracle删除(释放)数据文件/表空间流程
  7. java switch finally_Java中的switch疑问
  8. sql语句ding_在postgresql中结束掉正在执行的SQL语句操作
  9. QML 环形进度条canvas 98行代码实现
  10. 转换pfb/pfm字体格式为otf