背景:给定一数据集,结构如下:

数据说明:

字段

字段说明

positionName

职位名称

salary

薪水

workYear

工作年限

city

城市

companyShortName

公司简称

companySize

公司规模

district

所在区

financeStage

融资阶段

industryField

所在领域

thirdType

职位类型

resumeProcessDay

简历日处理

resumeProcessRate

简历处理率

任务:使用spark对给定的数据进行数据分析

要求:拆分字段salary -> min_salary,max_salry,并且取薪资的整数

说明:

如果salary字段是否包含类似'10k-20k*15薪'的值 如果存在 则把 *15这样的类似数据给去掉,保留10K-20K; 则拆分为 min_salary,max_salry 分别是 10,20,

如果salary字段 类似10-20K 则拆分为 min_salary,max_salry 分别是 10,20,

如果 salay字段为空值或者 面议,则拆分为 min_salary,max_salry 分别是 -1,-1,

拆分后字段 表结构为

字段

positionName

salary

min_salary

max_salry

workYear

city

companyShortName

companySize

district

financeStage

industryField

thirdType

resumeProcessDay

resumeProcessRate

分析思路:

根据逗号分隔为多列字段。

1、文件丢进去

2、上传到hdfs

[admin@Master ~]$ hadoop fs -mkdir /user/admin/spark

[admin@Master ~]$ hadoop fs -put zhaopin.txt /user/admin/spark

[admin@Master ~]$ hadoop fs -ls -R /user/admin/spark

3、进入shell交互

[admin@Master ~]$ spark-shell

scala>

import spark.implicits._

val Df1 = spark.read.textFile("/user/admin/spark/zhaopin.txt");

Df1.show

# 本部操作创建一个名为Df1的RDD.首先要确保zhaopin.txt在HDFS文件系统相应的路径中。

Df1.show

/**

* withColumn的第二个参数要传入已有列的Column对象,否则会报错;

* column的表达式只能引用此数据集提供的属性。 添加引用其他数据集的列是错误的

*/

val Df2 = Df1.withColumn("splitcol",split(Df1.col("value"), ",")).select(

col("splitcol").getItem(0).as("positionName"),

col("splitcol").getItem(1).as("salary"),

col("splitcol").getItem(2).as("workYear"),

col("splitcol").getItem(3).as("city"),

col("splitcol").getItem(4).as("companyShortName"),

col("splitcol").getItem(5).as("companySize"),

col("splitcol").getItem(6).as("district"),

col("splitcol").getItem(7).as("financeStage"),

col("splitcol").getItem(8).as("industryField"),

col("splitcol").getItem(9).as("thirdType"),

col("splitcol").getItem(10).as("resumeProcessDay"),

col("splitcol").getItem(11).as("resumeProcessRate")

).drop("splitcol");

Df2.show

Df2.write.csv("/user/admin/spark/output02");

如果双字符拆分,以逗号和短横线:

val Df3 = Df1.withColumn("splitcol",split(Df1.col("value"), ",|-")).select(

col("splitcol").getItem(0).as("positionName"),

col("splitcol").getItem(1).as("min_salary"),

col("splitcol").getItem(2).as("max_salry"),

col("splitcol").getItem(3).as("workYear"),

col("splitcol").getItem(4).as("city"),

col("splitcol").getItem(5).as("companyShortName"),

col("splitcol").getItem(6).as("companySize"),

col("splitcol").getItem(7).as("district"),

col("splitcol").getItem(8).as("financeStage"),

col("splitcol").getItem(9).as("industryField"),

col("splitcol").getItem(10).as("thirdType"),

col("splitcol").getItem(11).as("resumeProcessDay"),

col("splitcol").getItem(12).as("resumeProcessRate")

).drop("splitcol");

# 这个思路是有问题的,因为把年限也给分拆开了,表结构对应错位了。除非把年限也拆分为两个表结构字段。

Df3.show

val Df4 = Df3.withColumn("max_salry",regexp_replace(col("max_salry"), "\\*15", ""));

Df4.show

Df4.write.csv("/user/admin/spark/output04");

#RDD的saveASTextFile如果文件存在则无法追加写入,数据只能覆盖,对于有数据追加需求的人很不友好。而且saveASTextFile只能导出一列的数据,多列无效。所以不能用saveASTextFile方法。

val Df5 = Df4.withColumn("min_salary",regexp_replace(col("min_salary"), "k", ""));

Df5.show

val Df6 = Df5.withColumn("max_salry",regexp_replace(col("max_salry"), "k", ""));

Df6.show

如果只保留最低工资和最高工资两列:

Df2.show

val Df8 = Df2.withColumn("splitcol",split(col("salary"), "-")).select(

col("splitcol").getItem(0).as("min_salary"),

col("splitcol").getItem(1).as("max_salry")

).drop("splitcol");

Df8.show

Df8.write.csv("/user/admin/spark/output08") ;

Df6.show

import org.apache.spark.sql._

Df6.repartition(2).write.mode(SaveMode.Overwrite).text("/user/admin/spark/output02");

#报错,因为文本输出只支持一列数据,我们有13列,故报错。只能以csv格式导出。

注意:DataSrt[Row]格式的数据无法写入到text文件中,因为text文件不含表头信息,它只能保存一列的数据,多列的数据保存时会报错。

Df6.write.parquet("/user/admin/spark/output08") ;

# parquent是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:

* 查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL

* 计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite

* 数据模型: Avro, Thrift, Protocol Buffers, POJOs

Spark已经为我们提供了parquet样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,有个users.parquet文件,这个文件格式比较特殊,如果你用vim编辑器打开,或者用cat命令查看文件内容,肉眼是一堆乱七八糟的东西,是无法理解的。只有被加载到程序中以后,Spark会对这种格式进行解析,然后我们才能理解其中的数据。

parquet既保存数据又保存schema信息(列的名称、类型、列的偏移量(它的文件中没有换行,相同列的数据存在一起,而不是一个记录的数据存在一起))

问题:如果 salay字段为空值或者 面议,则拆分为 min_salary,max_salry 分别是 -1,-1,

import org.apache.spark.sql._

我们把原始数据改几个null进去。重新命名zhaopin01.txt上传。

[admin@Master ~]$ hadoop fs -put zhaopin01.txt /user/admin/spark

[admin@Master ~]$ hadoop fs -ls -R /user/admin/spark

import spark.implicits._

val Df11 = spark.read.textFile("/user/admin/spark/zhaopin01.txt");

Df11.show

val Df12 = Df11.na.fill(value="-1--1",Array[String]("value"));

#这种空值替换思路是错误的,因为只有一列,而null仅仅为列中的某字符串。

Df12.show

先拆分,以逗号分12列,再替换salary表结构字段下的空值为-1--1,然后再拆分salary,再去除不要的字符串*15和k。

val Df9 = Df11.withColumn("splitcol",split(Df11.col("value"), ",")).select(

col("splitcol").getItem(0).as("positionName"),

col("splitcol").getItem(1).as("salary"),

col("splitcol").getItem(2).as("workYear"),

col("splitcol").getItem(3).as("city"),

col("splitcol").getItem(4).as("companyShortName"),

col("splitcol").getItem(5).as("companySize"),

col("splitcol").getItem(6).as("district"),

col("splitcol").getItem(7).as("financeStage"),

col("splitcol").getItem(8).as("industryField"),

col("splitcol").getItem(9).as("thirdType"),

col("splitcol").getItem(10).as("resumeProcessDay"),

col("splitcol").getItem(11).as("resumeProcessRate")

).drop("splitcol");

Df9.show

val Df10 = Df9.na.fill(value="-1--1",Array[String]("salary"));

val Df10 = Df9.na.fill("-1--1",Seq("salary")).show

如果两列批量替换则:

val Df10 = Df9.na.fill("-1--1",Seq("salary","workYear")).show

#填充失败,暂时未找到原因。

Df10.show()

我们使用方法二:利用map进行填充,试试:

import java.util

var map = new util.HashMap[String, Any]()

map.put("salary","-1--1")

map.put("workYear","无")

println(map)

var Df20 = Df9.na.fill(map)

Df20.show()

#还是填充失败。

我们换个思路,先以“-”拆分,然后再对分开的两列进行一列一列的替换。或者清洗完再替换。

import org.apache.spark.sql._

val Df22 = Df20.withColumn("splitcol",split(col("salary"), "-")).select(

'positionName,

col("splitcol").getItem(0).as("min_salary"),

col("splitcol").getItem(1).as("max_salry"),

'workYear,

'city,

'companyShortName,

'companySize,

'district,

'financeStage,

'industryField,

'thirdType,

'resumeProcessDay,

'resumeProcessRate

).drop("splitcol");

Df22.show

然后,再清洗,去掉多余的字符串。

val Df23 = Df22.withColumn("min_salary",regexp_replace(col("min_salary"), "k", "")).withColumn("max_salry",regexp_replace(col("max_salry"), "k", "")).withColumn("max_salry",regexp_replace(col("max_salry"), "\\*15", ""));

Df23.show

val Df24 = Df23.na.fill(value="-1",Array[String]("min_salary"));

val Df25 = Df24.na.fill(value="-1",Array[String]("max_salry"));

或val Df24 = D23.na.fill("-1",Seq("min_salary"));

Df25.show

#可以看出max_salry修改成功,但是min_salary失败,应该是表格中的null不能手动写成null,手动应该写成空着就好。大家可以试一下。

通过以上示例,我们简单的学习了spark拆分字段方法。

文章作者傅先全:深信服云计算认证专家,产业教育中心资深讲师,曾任职于国内知名企业、某知名教育集团,分别担任云平台资深架构师、IT课程总监及名师团金牌讲师、四川某大学特聘企业讲师。十余年云计算、大数据行业从业经验,在企业信息化建设、企业项目管理、云平台架构设计等方面有较强的实战经验;对IT技术、云计算技术、大数据技术相关课程具备丰富的课程交付经验。

深信服技术认证之使用spark进行数据分析示例之拆分字段相关推荐

  1. 深信服技术认证之容灾与备份(一)

    1.1 业务数据可用性问题 随着信息技术的发展,互联网上的数据量激增,随之而来的数据和业务的可用性越来越重要.但是实际上业务系统中的业务数据可用性问题就像打地鼠游戏一样,永远猜不到下一个问题会在哪里冒 ...

  2. 深信服技术认证之IPV6地址表示及常见分类

    根据最新数据显示我国IPV6活跃用户数已达4.35亿,约占中国网民的46.27%,IPV6地址资源位居全球第二,未来IPV6市场规模将超10亿.在巨大的资源背景推动下,了解IPV6已经成为当代IT从业 ...

  3. 深信服技术认证之F5隐写工具初探

    什么是"隐写" "隐写",即人们通常所说的信息隐藏,是利用人类感觉器官的不敏感性(感觉冗余),以及多媒体数字信号本身存在的冗余(数据冗余特性),将秘密信息隐藏于 ...

  4. IBM 技术文档:Spark, 快速数据分析的又一选择

    IBM 技术文档:Spark, 快速数据分析的又一选择 原文出处:http://www.ibm.com/developerworks/library/os-spark/ 摘要:尽管Hadoop在分布式 ...

  5. 深信服技术支持工程师(安全、云计算方向)面试题目

    深信服的技术支持面试确实要比其他厂商的难得很多,我实习的时候也有非常多的985/211 的本科生和研究生,最开始将近2000个人,到最后才留下几十个,竞争是异常惨烈(秋招春招也肯定激烈)毕竟给的薪资比 ...

  6. 面经----深信服---技术支持实习生

    前段时间面了深信服的技术面,大概20分钟左右,今天给大家分享一下过程.基本上简历上写的都会问 1.自我介绍,然后面试官介绍了他公司的情形,会经常加班,问我有什么看法 2.谈谈ospf的工作过程 3.浅 ...

  7. 上网行为安全之深信服用户认证技术和用户、组管理

    文章目录 认证技术框架 1.不需要认证技术 1.1数据包特征信息 1.2不需要认证配置思路 1.3不需要认证效果显示 2.IP/MAC地址绑定技术 2.1工作原理 2.2配置思路 2.3 配置结果显示 ...

  8. 深信服SCSA认证最新知识点四

    SCSA认证四: 1.如果要分配资源的访问权限给用户,是通过以下哪项配置实现的? 通过角色管理把用户和资源管理起来 FTP使用的协议是TCP NGAF僵尸网络防护功能的说法 恶意域名重新定向的原理是恶 ...

  9. 秋招深信服技术服务1面过2面挂

    一面(技术面) 基本都是问简历上的东西 1.自我介绍 然后就从简历上抽东西问 1.stp过程 2.stp&rstp&mstp 3.tcp三次握手 4.udp 5.ospf状态机 6.D ...

  10. 深信服联合谷安学院,技术认证调研活动

    深信服基于当前社会发展趋势,凭借20年来对网络信息化行业发展的深刻理解及技术人才培养,推出社会化认证体系覆盖网络安全及云计算领域,致力于提供先进的人才培养理念及体系化的专业技术认证标准,培养全球领先的 ...

最新文章

  1. 谷歌将屏蔽一切与加密货币相关的广告 6月正式生效
  2. C++中cin、cin.get()、cin.getline()、getline()、gets()等函数的用法
  3. 前端面试高频题:删除数组重复元素的多种方法
  4. wps如何交叉引用多个文献_WPS中引用参考文献的设置
  5. 看图说说class文件结构(部分)
  6. LaTeX(6)——LaTeX引用使用(\label)
  7. 新手入门Java疯狂讲义遇到的100个问题
  8. cad打开a3样板图形_cad a3图框下载-cada3标准图框模板 dwg版 - 河东下载站
  9. android局域网怎么传文件,两手机同一局域网怎么传文件
  10. 从算法原理到应用部署!微信「扫一扫识物」 的背后技术揭秘
  11. 舞蹈课(dancingLessons)
  12. django 开发(一) mezzanine源码+ubuntu实现CMS Demo
  13. WSL2 Ubuntu18.04 apt-get update失败
  14. 基于用户的协同过滤与基于物品的协同过滤比较
  15. 过拟合现象,原因,以及降低过拟合的方法
  16. 工作3年,看啥资料能月薪30K?
  17. firefox的一些插件~
  18. 中班音乐活动计算机反思,中班音乐活动教案及反思
  19. 设计程序,判断给定的一个字符是否是英文字母。
  20. mamp pro中mysql报错解决

热门文章

  1. 将1自动补位为01_自动补位为辅助后游戏就输了一半?那是你不懂辅助的正确打开方式...
  2. 持久性连接和非持久性连接
  3. WEB安全(十六)单点登录的基本实现
  4. PHP常用正则表达式,如验证网址,邮箱等
  5. hive 学习系列五(hive 和elasticsearch 的交互,很详细哦,我又来吹liubi了)
  6. BH1750 传感器实战教学 —— 硬件设计篇
  7. 动态规划入门(走楼梯问题 c++)
  8. uchar t 单片机C语言的注释是什么,uchar(单片机中uchar是什么意思)
  9. 程序员月薪8000,丢人吗?
  10. 三星android文件传输,三星手机怎么连接电脑?三星手机连接电脑传输文件教程...