Spark SQL 练习之疫情分析

2020 年,春节假期期间,国内因疫情事件影响巨大!

众志成城,抗击疫情!(衷心希望,形势能够好转)

现在,将使用的 Spark SQL 的功能,分析一组数据!

假设疫情数据有:

  1. 人员信息 (civic_info.csv)
  2. 票务信息(ticket_info.csv)

分析任务如下:

任务序号 任务描述
1 湖北籍人员信息
2 武汉疫区人员信息
3 需要对员工进行隔离观察14天的公司
4 有感染风险的车厢
5 需要隔离观察的人员信息

坚定信心、同舟共济、科学防治、精准施策!坚决打赢疫情防控阻击战!

数据信息(虚拟的测试数据)

civic_info.csv 文件内容

id_no,name,sex,age,province,city,district,residence,home_domicile,working_company
310228198706300137,李言,男,33,湖北,武汉,江岸区,湖北省武汉市江岸区XXX小区NNN室,上海市松江区XXX小区MMM室,XXX有限公司
310228198808241049,朱艳,女,32,湖北,武汉,江汉区,湖北省武汉市江汉区XXX小区NNN室,上海市嘉定区XXX小区MMM室,YYY有限公司
310228198907141175,肖人风,男,31,湖北,武汉,汉阳区,湖北省武汉市汉阳区XXX小区NNN室,上海市浦东新区XXX小区MMM室,ZZZ有限公司
310228199009212154,黄军,男,30,湖北,武汉,青山区,湖北省武汉市青山区XXX小区NNN室,上海市黄浦区XXX小区MMM室,TTT有限公司
310228199101304567,周子明,男,29,湖北,武汉,洪山区,湖北省武汉市洪山区XXX小区NNN室,上海市闵行区XXX小区MMM室,FFF有限公司
310228199204213278,张燕,女,28,湖北,武汉,江夏区,湖北省武汉市江夏区XXX小区NNN室,上海市静安区XXX小区MMM室,SSS有限公司
310228199305213306,江大仁,男,27,湖北,武汉,蔡甸区,湖北省武汉市蔡甸区XXX小区NNN室,上海市长宁区XXX小区MMM室,UUU有限公司
310228199411010721,袁天罡,男,26,湖北,武汉,黄陂区,湖北省武汉市黄陂区XXX小区NNN室,上海市虹口区XXX小区MMM室,III有限公司
310228199503220823,马鹏,男,25,湖北,武汉,硚口区,湖北省武汉市硚口区XXX小区NNN室,上海市徐汇区XXX小区MMM室,PPP有限公司
310228199608120317,聂平,男,24,湖北,黄冈,黄州区,湖北省黄冈市黄州区XXX小区NNN室,湖北省武汉市东西湖区XXX小区MMM室,WWW有限公司
310228199609170831,胡冰,女,24,湖北,孝感,孝南区,湖北省孝感市孝南区XXX小区NNN室,湖北省武汉市江夏区XXX小区MMM室,QQQ有限公司

ticket_info.csv 文件内容

ticket_no,train_no,carriage_no,seat_no,passenger_name,passenger_id,departure,destination,departure_time,arrival_time
HB9567,SH6634,B,11,李言,310228198706300137,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HA6749,SH6634,C,23,朱艳,310228198808241049,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HC7746,SH6634,D,14,肖人风,310228198907141175,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HD8279,SH6634,A,22,黄军,310228199009212154,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HM3324,SH6634,C,12,周子明,310228199101304567,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HB4597,SH6634,D,23,张燕,310228199204213278,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HA2163,SH6634,E,07,江大仁,310228199305213306,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HC5632,SH6634,A,03,袁天罡,310228199411010721,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HB3306,SH6634,B,09,马鹏,310228199503220823,武汉,上海,2020-02-09|19:30,2020-02-10|10:30
HA1601,SH6634,C,11,梁冬,310228199307290931,重庆,上海,2020-02-09|15:30,2020-02-10|10:30
HA2703,SH6634,D,15,赵珂,310228199106151321,四川,上海,2020-02-09|12:30,2020-02-10|10:30
HC7734,SH6634,F,13,戴拿,310228199212012371,拉萨,上海,2020-02-09|06:30,2020-02-10|10:30

分析数据

首先,测试要用的数据是很规整的,就不进行数据清洗了。(反正是用的 Spark SQL 进行处理的,查询效率暂时不做考究!)

使用 Spark SQL 中的 DataFrame 创建一个临时视图,然后使用临时视图调用 sql 方法。

重点是设计 sql 语句!

代码实现

项目目录结构

设计简介

首先,套用了老套的 MVC 结构思想,将项目设计为数据访问层,以及设计了单例对象 SparkUtil ,用这个单例对象去初始化 SparkSession。然后是接口设计(原谅我是一个 java 程序员吧,可以这麽理解哈,其实是 trait 的设计),首先是一个 BaseDao,它的职能是将实例化后的单例对象 sparkUtil 设置到实现类中去。

接着是业务方面,两个基本的接口,定义了需要完成的任务,也就是 DocumentContentDaoPersonnelInformationDao 。然后是它们对应的实现类,最终,需要一个入口方法!也就是 EpidemicDemo 类。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.feng</groupId><artifactId>spark_demo</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>2.4.4</spark.version></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency></dependencies>
</project>

resources 中的两个 csv 文件

文件内容,上文中已经给出!

SparkUtil.scala 文件

package org.feng.utilimport org.apache.spark.sql.{DataFrame, SparkSession}/*** Created by Feng on 2020/2/9 13:45* CurrentProject's name is spark_demo* Spark 读取 csv 文件的工具类:设计为单例。** @see org.feng.dao.BaseDao*/
class SparkUtil {val spark = SparkSession.builder().appName("SparkUtil_".concat(System.currentTimeMillis().toString)).master("local").getOrCreate()def getDF(path:String): DataFrame ={// 加载 csv 文件spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load(path)}
}

BaseDao.scala

package org.feng.daoimport org.feng.util.SparkUtil/*** Created by Feng on 2020/2/9 14:03* CurrentProject's name is spark_demo**/
trait BaseDao {/*** 使用前:必须先 set 一个值* new 一个全局的 SparkUtil 对象*/var sparkUtil:SparkUtil = _def setSparkUtil(_sparkUtil:SparkUtil):Unit ={sparkUtil = _sparkUtil}def getSparkUtil:SparkUtil={sparkUtil}
}

PersonnelInformationDao.scala

package org.feng.dao/*** Created by Feng on 2020/2/9 13:27* CurrentProject's name is spark_demo* 读取人员信息 civic_info.csv 文件*/
trait PersonnelInformationDao extends BaseDao {/*** 查询湖北籍人员信息*/def hubeiPersonnel():Unit/*** 查询武汉疫区人员*/def wuhanPersonnel():Unit/*** 需要对员工隔离 14 天的公司*/def someCompany():Unit
}

DocumentContentDao.scala

package org.feng.dao/*** Created by Feng on 2020/2/9 13:29* CurrentProject's name is spark_demo** 读取文件内容 ticket_info.csv 文件*/
trait DocumentContentDao extends BaseDao {/*** 查询有感染风险的车厢*/def infectedCarriage():Unit/*** 需要隔离观察的人员信息*/def segregatedObservers():Unit
}

PersonnelInformationDaoImpl.scala

package org.feng.dao.implimport org.feng.dao.PersonnelInformationDao/*** Created by Feng on 2020/2/9 13:44* CurrentProject's name is spark_demo*/
class PersonnelInformationDaoImpl extends PersonnelInformationDao {/*** 查询湖北籍人员信息*/override def hubeiPersonnel(): Unit = {sparkUtil.spark.sql("select id_no,name,province from civic_info where province = '湖北'").show()}/*** 查询武汉疫区人员*/override def wuhanPersonnel(): Unit = {sparkUtil.spark.sql("select id_no,name,city from civic_info where city = '武汉'").show()}/*** 需要对员工隔离 14 天的公司*/override def someCompany(): Unit = {sparkUtil.spark.sql("select working_company from civic_info where city = '武汉'").show()}
}

DocumentContentDaoImpl.scala

package org.feng.dao.implimport org.feng.dao.DocumentContentDao/*** Created by Feng on 2020/2/9 15:21* CurrentProject's name is spark_demo*/
class DocumentContentDaoImpl extends DocumentContentDao{/*** 查询有感染风险的车厢*/override def infectedCarriage(): Unit = {sparkUtil.spark.sql("select train_no,carriage_no from ticket_info where departure = '武汉'").distinct().show()}/*** 需要隔离观察的人员信息*/override def segregatedObservers(): Unit = {sparkUtil.spark.sql("select id_no,name from civic_info,ticket_info where civic_info.name = ticket_info.passenger_name and " +"departure = '武汉'").show()}
}

EpidemicDemo.scala

package org.feng.clientimport org.feng.dao.impl.{DocumentContentDaoImpl, PersonnelInformationDaoImpl}
import org.feng.util.SparkUtil/*** Created by Feng on 2020/2/9 13:21* CurrentProject's name is spark_demo* 疫情分析*/
object EpidemicDemo {def main(args: Array[String]): Unit = {// 创建唯一的 sparkUtilval sparkUtil = new SparkUtil// 创建数据访问层的对象val personnelInformationDao = new PersonnelInformationDaoImplval documentContentDao = new DocumentContentDaoImpl// 设置唯一的 sparkUtilpersonnelInformationDao.setSparkUtil(sparkUtil)documentContentDao.setSparkUtil(sparkUtil)// csv 文件路径val civicInfoPath= "D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\src\\main\\resources\\civic_info.csv"val ticketInfoPath = "D:\\jee-2019-7-idea-maven-workspace\\spark_demo\\src\\main\\resources\\ticket_info.csv"// 创建视图sparkUtil.getDF(civicInfoPath).createOrReplaceTempView("civic_info")sparkUtil.getDF(ticketInfoPath).createOrReplaceTempView("ticket_info")// 查询湖北籍人员信息:输出到控制台personnelInformationDao.hubeiPersonnel()// 查询武汉疫区人员信息:输出到控制台personnelInformationDao.wuhanPersonnel()// 查询需要对员工隔离 14 天的公司personnelInformationDao.someCompany()// 查询有感染风险的车厢documentContentDao.infectedCarriage()// 查询需要隔离观察的人员信息documentContentDao.segregatedObservers()}
}

程序运行结果

查询湖北籍人员信息

+------------------+------+--------+
|             id_no|  name|province|
+------------------+------+--------+
|310228198706300137|  李言|    湖北|
|310228198808241049|  朱艳|    湖北|
|310228198907141175|肖人风|    湖北|
|310228199009212154|  黄军|    湖北|
|310228199101304567|周子明|    湖北|
|310228199204213278|  张燕|    湖北|
|310228199305213306|江大仁|    湖北|
|310228199411010721|袁天罡|    湖北|
|310228199503220823|  马鹏|    湖北|
|310228199608120317|  聂平|    湖北|
|310228199609170831|  胡冰|    湖北|
+------------------+------+--------+

查询武汉疫区人员信息

+------------------+------+----+
|             id_no|  name|city|
+------------------+------+----+
|310228198706300137|  李言|武汉|
|310228198808241049|  朱艳|武汉|
|310228198907141175|肖人风|武汉|
|310228199009212154|  黄军|武汉|
|310228199101304567|周子明|武汉|
|310228199204213278|  张燕|武汉|
|310228199305213306|江大仁|武汉|
|310228199411010721|袁天罡|武汉|
|310228199503220823|  马鹏|武汉|
+------------------+------+----+

查询需要对员工隔离 14 天的公司

+---------------+
|working_company|
+---------------+
|    XXX有限公司|
|    YYY有限公司|
|    ZZZ有限公司|
|    TTT有限公司|
|    FFF有限公司|
|    SSS有限公司|
|    UUU有限公司|
|    III有限公司|
|    PPP有限公司|
+---------------+

查询有感染风险的车厢

+--------+-----------+
|train_no|carriage_no|
+--------+-----------+
|  SH6634|          C|
|  SH6634|          B|
|  SH6634|          D|
|  SH6634|          A|
|  SH6634|          E|
+--------+-----------+

查询需要隔离观察的人员信息

+------------------+------+
|             id_no|  name|
+------------------+------+
|310228198706300137|  李言|
|310228198808241049|  朱艳|
|310228198907141175|肖人风|
|310228199009212154|  黄军|
|310228199101304567|周子明|
|310228199204213278|  张燕|
|310228199305213306|江大仁|
|310228199411010721|袁天罡|
|310228199503220823|  马鹏|
+------------------+------+

共勉

再声明一次,虽然此次练习题目不难,但是我现在的心情却是很难!

这次疫情,传染性高于03年的非典,而且得病的人数也很多。

但最近两天应该是有了好消息的,就是目前的一个确诊人数的增幅降低了!

这是一个好消息!希望呢,疫情快些过去,大家能安然度过!

另外,刚才还看到一个词语: 别来无恙! 你品,你细品哈!

Spark 练习之疫情分析相关推荐

  1. Spark详解(十一):Spark运行架构原理分析

    1. Spark 运行架构总体分析 1.1 总体介绍 Spark应用程序的运行架构基本上由三部分组成,包括SparkContext(驱动程序).ClusterManger(集群资源管理器)和Execu ...

  2. 阿里云数据库快速搭建疫情分析系统最佳实践

    简介:疫情降临,疫情态势分析和防控任务迫在眉睫,如果快速搭建高效的疫情态势分析系统是众多部门和单位的难题,阿里云RDS PG+Ganos解决方案可在极短时间内完成分析系统搭建,有效助力疫情防控. 直达 ...

  3. Spark配置启动脚本分析

    2019独角兽企业重金招聘Python工程师标准>>> 今天想停止spark集群,发现执行stop-all.sh的时候spark的相关进程都无法停止.提示: no org.apach ...

  4. 2019年7月勒索病毒疫情分析

    勒索病毒的蔓延,给企业和个人都带来了严重的安全威胁.360安全大脑针对勒索病毒进行了全方位的监控与防御.从本月数据来看,反勒索服务反馈量有小幅度上升,其中Stop是反馈量上升最大的一个家族. 360解 ...

  5. 教育行业疫情分析研判报告撰写格式与模板详解

    要写好一份每日疫情研判报告的前提是需要做好大量的数据梳理分析.所以小编就来为各位分享一些疫情舆情数据快速整理分析的方法以及提供了一份教育疫情舆情分析报告格式模板,供各位参考. 疫情舆情数据快速整理分析 ...

  6. 基于Spark的用户行为分析系统

    基于Spark的用户行为分析系统源码下载 一.项目介绍   本项目主要用于互联网电商企业中使用Spark技术开发的大数据统计分析平台,对电商网站的各种用户行为(访问行为.购物行为.广告点击行为等)进行 ...

  7. Python数据分析:实时更新全国全球疫情分析

    实时更新全国全球疫情分析 简介 步骤流程 准备数据集(获取数据集) 国内数据集 国外数据集 国内分析 生成网页版 国外分析 生成网页版 后记 简介 运用到Python爬虫request库,Excel ...

  8. Spark之购物篮分析

    Spark之购物篮分析 关于购物篮分析,具体的思路可以看 Mapreduce之购物篮分析 以下是编写号的Spark程序 package MBAimport org.apache.spark.{Spar ...

  9. FineBI04:【案例】稍微复杂的案例:全球疫情分析--南丁格尔玫瑰图

    一.目标 在上一个文档,我们讲了一个简单的案例,下面我们根据上一个案例做一个稍微复杂的案例:生成一个南丁格尔玫瑰图. 二.数据准备 下载地址: 链接:https://pan.baidu.com/s/1 ...

  10. 用Python进行新型冠状病毒(COVID-19/2019-nCoV)疫情分析

    新型冠状病毒(COVID-19/2019-nCoV)疫情分析 祈LHL 重要说明 分析文档:完成度:代码质量 3:5:2 其中分析文档是指你数据分析的过程中,对各问题分析的思路.对结果的解释.说明(要 ...

最新文章

  1. outlook正在与服务器联系以获取信息,Outlook 2016点击邮件显示正在与服务器联系以获取信息...
  2. mysql set 子表,mysql update set 更新表数据
  3. 那些在一个公司死磕5-10年的人,最后都怎么样了...
  4. 23. 进程并发控制之Semaphore
  5. 08、求x的y的幂次方的最后3位数——循环
  6. php smtp tls,php – RoundcubePostfix SMTP:SSL例程:SSL3_READ_BYTES:tlsv1 alert unknown ca:s3_pkt.c...
  7. python中将已有链接的视频进行下载
  8. pdfLaTeX和XeLaTeX
  9. 来吧,自己动手撸一个分布式ID生成器组件
  10. mysql sql能力_MySQL SQL优化
  11. 2013腾讯编程马拉松||HDU 4505 小Q系列故事——电梯里的爱情 水水水
  12. transform3D转换
  13. 第一台计算机 采用工 作原理,第1讲计算机工作原理模版课件.ppt
  14. 青岛地区服务器不稳定怎么办,青岛联通现大面积DNS故障 用户该如何上网
  15. linux点亮硬盘locat,Linux中locate whereis which find grep5种查询命令总结
  16. 怎么注册quora?
  17. 标签条码打印软件如何创建连续数据变量
  18. BSP板机支持包、linux启动分析、ARM裸机编程
  19. 如何展示舞台灯光秀的艺术表现力
  20. 自定义Group,解决Group setVisibility后,子View再次设置setVisibility无效的问题

热门文章

  1. Android 集成友盟统计
  2. ssh连接服务器超时解决方案
  3. 大道至简(周爱民)第一章读后感-------伪代码
  4. 使用OLED屏显示汉字
  5. iPhone 13,战略性“不香”!
  6. centos7克隆机修改ip地址,并与另一台虚拟机ping起来
  7. matlab的特殊字符(上下标和希腊字母等)
  8. oracle exadata x7发布,没有对比就没有伤害 QData T5完虐Oracle Exadata X7
  9. 【手把手】JavaWeb 入门级项目实战 -- 文章发布系统 (作者:剽悍一小兔)第七、八、九节学习随笔
  10. 学计算机系的考公好考吗,最适合考公务员的十大专业,学个好专业,考公很容易!...