Spark: Parse a CSV file and group by column value

I find myself frequently working with large CSV files, and having realised that my existing toolset doesn't let me explore them quickly, I thought I'd spend some time with Spark to see whether it could help.

I'm using the crime dataset published by the City of Chicago: it's around 1 GB in size and contains details of roughly 4 million crimes:

$ ls -alh ~/Downloads/Crimes_-_2001_to_present.csv
-rw-r--r--@ 1 markneedham  staff   1.0G 16 Nov 12:14 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv

$ wc -l ~/Downloads/Crimes_-_2001_to_present.csv
 4193441 /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv

We can get a rough idea of the contents of the file by looking at the header and the first row:

$ head -n 2 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"

I wanted to count the 'Primary Type' column to see how many of each type of crime we have. Using only Unix command-line tools, this is how we'd do it:

$ time tail +2 ~/Downloads/Crimes_-_2001_to_present.csv | cut -d, -f6  | sort | uniq -c | sort -rn
859197 THEFT
757530 BATTERY
489528 NARCOTICS
488209 CRIMINAL DAMAGE
257310 BURGLARY
253964 OTHER OFFENSE
247386 ASSAULT
197404 MOTOR VEHICLE THEFT
157706 ROBBERY
137538 DECEPTIVE PRACTICE
124974 CRIMINAL TRESPASS
47245 PROSTITUTION
40361 WEAPONS VIOLATION
31585 PUBLIC PEACE VIOLATION
26524 OFFENSE INVOLVING CHILDREN
14788 CRIM SEXUAL ASSAULT
14283 SEX OFFENSE
10632 GAMBLING
8847 LIQUOR LAW VIOLATION
6443 ARSON
5178 INTERFERE WITH PUBLIC OFFICER
4846 HOMICIDE
3585 KIDNAPPING
3147 INTERFERENCE WITH PUBLIC OFFICER
2471 INTIMIDATION
1985 STALKING
355 OFFENSES INVOLVING CHILDREN
219 OBSCENITY
86 PUBLIC INDECENCY
80 OTHER NARCOTIC VIOLATION
12 RITUALISM
12 NON-CRIMINAL
6 OTHER OFFENSE
2 NON-CRIMINAL (SUBJECT SPECIFIED)
2 NON - CRIMINAL

real    2m37.495s
user    3m0.337s
sys 0m1.471s

That's not bad, but it seemed like exactly the kind of calculation Spark is made for, so I had a look at how to go about it. First I created an SBT project with the following build file:

name := "playground"version := "1.0"scalaVersion := "2.10.4"libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3"ideaExcludeFolders += ".idea"ideaExcludeFolders += ".idea_modules"

I downloaded Spark and, after unpacking it, launched the Spark shell:

$ pwd
/Users/markneedham/projects/spark-play/spark-1.1.0/spark-1.1.0-bin-hadoop1

$ ./bin/spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
...
Spark context available as sc.

scala>

I first imported a couple of classes I'd need:

scala> import au.com.bytecode.opencsv.CSVParser
import au.com.bytecode.opencsv.CSVParser

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

Now, following the quick start example, we'll create a Resilient Distributed Dataset (RDD) from the crime CSV file:

scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
crimeFile: String = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv

scala> val crimeData = sc.textFile(crimeFile).cache()
14/11/16 22:31:16 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556
14/11/16 22:31:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB)
crimeData: org.apache.spark.rdd.RDD[String] = /Users/markneedham/Downloads/Crimes_-_2001_to_present.csv MappedRDD[1] at textFile at <console>:17

The next step is to process each line of the file with a CSV parser. A simple way to do that is to create a new CSVParser for each line:

scala> crimeData.map(line => {
         val parser = new CSVParser(',')
         parser.parseLine(line).mkString(",")
       }).take(5).foreach(println)
14/11/16 22:35:49 INFO SparkContext: Starting job: take at <console>:23
...
14/11/16 22:35:49 INFO SparkContext: Job finished: take at <console>:23, took 0.013904 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

That works, but it's a bit wasteful to create a new CSVParser every time, so instead let's create one for each partition that Spark splits the file into:

scala> crimeData.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:38:44 INFO SparkContext: Starting job: take at <console>:25
...
14/11/16 22:38:44 INFO SparkContext: Job finished: take at <console>:25, took 0.015216 s
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)

You'll notice that we're still printing out the header, which isn't ideal, so let's get rid of it!

I was hoping there would be a 'drop' function that would let me do that, but there isn't one. Instead we can make use of the fact that the first partition will contain the first line, and strip it out that way:

scala> def dropHeader(data: RDD[String]): RDD[String] = {
         data.mapPartitionsWithIndex((idx, lines) => {
           if (idx == 0) {
             lines.drop(1)
           }
           lines
         })
       }
dropHeader: (data: org.apache.spark.rdd.RDD[String])org.apache.spark.rdd.RDD[String]
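This works because dropping an element from the first partition's iterator consumes the header line before the iterator is handed back. An equivalent and arguably clearer sketch is to return the result of drop directly, so the intent is explicit:

def dropHeader(data: RDD[String]): RDD[String] =
  data.mapPartitionsWithIndex((idx, lines) =>
    // only the first partition of a text file contains the header row
    if (idx == 0) lines.drop(1) else lines
  )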

Now let's grab the first 5 rows again and print them out:

scala> val withoutHeader: RDD[String] = dropHeader(crimeData)
withoutHeader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:14

scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           parser.parseLine(line).mkString(",")
         })
       }).take(5).foreach(println)
14/11/16 22:43:27 INFO SparkContext: Starting job: take at <console>:29
...
14/11/16 22:43:27 INFO SparkContext: Job finished: take at <console>:29, took 0.018557 s
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,(41.75017626412204, -87.55494559131228)
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,(41.729576153145636, -87.57568059471686)
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,(41.884543798701515, -87.72803579358926)
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,(41.785633535413176, -87.74148516669783)
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,(41.766348042591375, -87.64702037047671)

We're finally in a position to extract the values from the 'Primary Type' column and count how many times each of them appears in the data set:

scala> withoutHeader.mapPartitions(lines => {
         val parser = new CSVParser(',')
         lines.map(line => {
           val columns = parser.parseLine(line)
           Array(columns(5)).mkString(",")
         })
       }).countByValue().toList.sortBy(-_._2).foreach(println)
14/11/16 22:45:20 INFO SparkContext: Starting job: countByValue at <console>:30
14/11/16 22:45:20 INFO DAGScheduler: Got job 7 (countByValue at <console>:30) with 32 output partitions (allowLocal=false)
...
14/11/16 22:45:30 INFO SparkContext: Job finished: countByValue at <console>:30, took 9.796565 s
(THEFT,859197)
(BATTERY,757530)
(NARCOTICS,489528)
(CRIMINAL DAMAGE,488209)
(BURGLARY,257310)
(OTHER OFFENSE,253964)
(ASSAULT,247386)
(MOTOR VEHICLE THEFT,197404)
(ROBBERY,157706)
(DECEPTIVE PRACTICE,137538)
(CRIMINAL TRESPASS,124974)
(PROSTITUTION,47245)
(WEAPONS VIOLATION,40361)
(PUBLIC PEACE VIOLATION,31585)
(OFFENSE INVOLVING CHILDREN,26524)
(CRIM SEXUAL ASSAULT,14788)
(SEX OFFENSE,14283)
(GAMBLING,10632)
(LIQUOR LAW VIOLATION,8847)
(ARSON,6443)
(INTERFERE WITH PUBLIC OFFICER,5178)
(HOMICIDE,4846)
(KIDNAPPING,3585)
(INTERFERENCE WITH PUBLIC OFFICER,3147)
(INTIMIDATION,2471)
(STALKING,1985)
(OFFENSES INVOLVING CHILDREN,355)
(OBSCENITY,219)
(PUBLIC INDECENCY,86)
(OTHER NARCOTIC VIOLATION,80)
(NON-CRIMINAL,12)
(RITUALISM,12)
(OTHER OFFENSE ,6)
(NON-CRIMINAL (SUBJECT SPECIFIED),2)
(NON - CRIMINAL,2)

We get the same results as with the Unix commands, except the calculation took less than 10 seconds, which is pretty cool!
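For what it's worth, the Spark versions released since this post (2.0 and later) ship a built-in CSV data source and the DataFrame API, so a rough equivalent of the whole pipeline, assuming the SparkSession that newer shells expose as spark, shrinks to a few lines:

import org.apache.spark.sql.functions.desc

// Spark 2.x+ only: header handling and CSV parsing are built in
val crimes = spark.read
  .option("header", "true")
  .csv("/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv")

crimes.groupBy("Primary Type")
  .count()
  .orderBy(desc("count"))
  .show(50, truncate = false)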

Translated from: https://www.javacodegeeks.com/2014/11/spark-parse-csv-file-and-group-by-column-value.html
