spark sql教程_如何使用Spark SQL:动手教程
spark sql教程
在本系列的第一部分中,我们研究了使用Apache Spark SQL和DataFrames “大规模”利用关系数据库的功能方面的进展。 现在,我们将基于现实世界的数据集做一个简单的教程,以了解如何使用Spark SQL。 我们将使用Spark DataFrames,但重点将更多地放在使用SQL上。 在另一篇文章中,我将详细介绍Spark DataFrames和常见操作。
我喜欢将云服务用于我的机器学习,深度学习甚至大数据分析需求,而不是费力地设置自己的Spark集群。 我将使用Databricks平台满足我的Spark需求。 Databricks是由Apache Spark的创建者创建的公司,旨在帮助客户使用Spark进行基于云的大数据处理。
最简单(免费)的方法是转到Try Databricks页面并注册社区版帐户。 您将获得一个基于云的群集,该群集是具有6GB和无限笔记本数量的单节点群集-对于免费版本来说还不错! 如果您强烈需要分析大数据,建议使用Databricks平台。
现在让我们开始案例研究。 可以在Databricks或自己的Spark集群的主屏幕上随意创建一个新笔记本。
您也可以导入包含整个教程的笔记本,但是请确保运行每个单元并在其中玩转并进行探索,而不仅仅是阅读它。 不确定如何在Databricks上使用Spark? 请遵循此简短但有用的教程 。
本教程将使您熟悉基本的Spark功能,以处理通常从数据库或平面文件获得的结构化数据。 我们将利用Spark使用DataFrames和SQL的概念,探索查询和聚合关系数据的典型方法。 我们将处理来自KDD Cup 1999的一个有趣的数据集,并尝试使用高级抽象查询数据,例如数据框,该数据框已在流行的数据分析工具(如R和Python)中流行。 我们还将研究使用SQL语言构建数据查询并从我们的数据中检索有洞察力的信息有多么容易。 由于Spark在后端高效地分发了这些数据结构,因此这也大规模地进行而无需我们做更多的事情,这使得我们的查询可扩展且尽可能高效。 我们将从加载一些基本的依赖关系开始。
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
资料检索
KDD Cup 1999数据集用于第三届国际知识发现和数据挖掘工具竞赛,该竞赛与KDD-99(第五届知识发现和数据挖掘国际会议)一起举行。 竞争任务是建立一个网络入侵检测器,这是一种能够区分不良连接 (即入侵或攻击)和良好,正常连接的预测模型。 该数据库包含一组要审核的标准数据,其中包括在军事网络环境中模拟的多种入侵。
我们将使用简化后的数据集kddcup.data_10_percent.gz ,其中包含近50万次网络交互。 我们将从本地从Web下载此Gzip文件,然后对其进行处理。 如果您拥有良好稳定的互联网连接,请随时下载并使用完整的数据集kddcup.data.gz 。
使用网络数据
在Databricks中处理从网络检索的数据集可能会有些棘手。 幸运的是,我们有一些出色的实用程序包,例如dbutils ,可帮助简化我们的工作。 让我们快速看一下该模块的一些基本功能。
dbutils. help ( )
This module provides various utilities for users to interact with the rest of Databricks.
fs: DbfsUtils -> Manipulates the Databricks filesystem (DBFS) from the console
meta: MetaUtils -> Methods to hook into the compiler (EXPERIMENTAL)
notebook: NotebookUtils -> Utilities for the control flow of a notebook (EXPERIMENTAL)
preview: Preview -> Utilities under preview category
secrets: SecretUtils -> Provides utilities for leveraging secrets within notebooks
widgets: WidgetsUtils -> Methods to create and get bound value of input widgets inside notebooks
在Databricks中检索并存储数据
现在,我们将利用Python urllib库从其Web存储库中提取KDD Cup 99数据,将其存储在一个临时位置,然后将其移至Databricks文件系统,从而可以轻松访问此数据进行分析。
注意:如果跳过此步骤并直接下载数据,则可能最终会收到InvalidInputException:输入路径不存在错误。
import urllib
urllib . urlretrieve ( "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz" , "/tmp/kddcup_data.gz" )
dbutils. fs . mv ( "file:/tmp/kddcup_data.gz" , "dbfs:/kdd/kddcup_data.gz" )
display ( dbutils. fs . ls ( "dbfs:/kdd" ) )
建立KDD资料集
现在我们已经将数据存储在Databricks文件系统中,现在让我们将磁盘中的数据加载到Spark的传统抽象数据结构即弹性分布式数据集 (RDD)中。
data_file = "dbfs:/kdd/kddcup_data.gz"
raw_rdd = sc. textFile ( data_file ) . cache ( )
raw_rdd. take ( 5 )
您还可以使用以下代码来验证我们的数据(RDD)的数据结构类型。
type(raw_rdd)
在我们的数据上构建一个Spark DataFrame
Spark DataFrame是一种有趣的数据结构,代表数据的分布式集合。 通常,Spark中所有SQL功能的入口点都是SQLContext类。 要创建此调用的基本实例,我们只需要一个SparkContext引用即可。 在Databricks中,为此目的,此全局上下文对象可用作sc 。
from pyspark. sql import SQLContext
sqlContext = SQLContext ( sc )
sqlContext
分割CSV数据
RDD中的每个条目都是以逗号分隔的数据行,在解析和构建数据帧之前,我们首先需要对其进行拆分。
csv_rdd = raw_rdd. map ( lambda row: row. split ( "," ) )
print ( csv_rdd. take ( 2 ) )
print ( type ( csv_rdd ) )
检查功能总数(列)
我们可以使用以下代码来检查数据集中潜在列的总数。
len(csv_rdd.take(1)[0])
Out[57]: 42
了解和解析数据
KDD 99 Cup数据包含从连接数据中捕获的不同属性。 您可以获取数据中属性的完整列表,以及与每个属性/列的描述有关的更多详细信息。 我们将只使用数据集中的一些特定列,其详细信息指定如下。
功能编号 | 功能名称 | 描述 | 类型 |
---|---|---|---|
1 | 持续时间 | 连接的长度(秒数) | 连续 |
2 | protocol_type | 协议的类型,例如tcp,udp等。 | 离散的 |
3 | 服务 | 目的地上的网络服务,例如,http,telnet等。 | 离散的 |
4 | src_bytes | 从源到目标的数据字节数 | 连续 |
5 | dst_bytes | 从目标到源的数据字节数 | 连续 |
6 | 旗 | 连接的正常或错误状态 | 离散的 |
7 | 错误片段 | “错误”片段的数量 | 连续 |
8 | 紧急 | 紧急包数 | 连续 |
9 | 热 | “热”指标数量 | 连续 |
10 | num_failed_logins | 失败的登录尝试次数 | 连续 |
11 | num_compromised | “妥协”条件的数量 | 连续 |
12 | su_attempted | 如果尝试“ su root”命令,则为1; 否则为0 | 离散的 |
13 | num_root | “根”访问次数 | 连续 |
14 | num_file_creations | 文件创建操作数 | 连续 |
我们将根据它们在每个数据点(行)中的位置提取以下列,并按如下所示构建新的RDD。
from pyspark. sql import Row
parsed_rdd = csv_rdd. map ( lambda r: Row (
duration = int ( r [ 0 ] ) ,
protocol_type = r [ 1 ] ,
service = r [ 2 ] ,
flag = r [ 3 ] ,
src_bytes = int ( r [ 4 ] ) ,
dst_bytes = int ( r [ 5 ] ) ,
wrong_fragment = int ( r [ 7 ] ) ,
urgent = int ( r [ 8 ] ) ,
hot = int ( r [ 9 ] ) ,
num_failed_logins = int ( r [ 10 ] ) ,
num_compromised = int ( r [ 12 ] ) ,
su_attempted = r [ 14 ] ,
num_root = int ( r [ 15 ] ) ,
num_file_creations = int ( r [ 16 ] ) ,
label = r [ - 1 ]
)
)
parsed_rdd. take ( 5 )
构造数据框
现在,我们的数据已经被很好地解析和格式化了,让我们构建我们的DataFrame!
df = sqlContext. createDataFrame ( parsed_rdd )
display ( df. head ( 10 ) )
现在,您还可以使用以下代码签出DataFrame的架构。
df.printSchema()
建立一个临时表
我们可以利用registerTempTable()函数来构建一个临时表,以便在DataFrame上大规模运行SQL命令! 要记住的一点是,此临时表的生存期与会话相关。 它创建一个内存表,该表的作用域仅限于创建该表的群集。 数据使用Hive高度优化的内存中列格式存储。
您还可以签出saveAsTable() ,它使用Parquet格式创建一个永久的物理表,存储在S3中。 该表可用于所有集群。 表元数据(包括文件的位置)存储在Hive元存储中。
help(df.registerTempTable)
df.registerTempTable("connections")
大规模执行SQL
让我们看一些如何根据数据框在表上运行SQL查询的示例。 我们将从一些简单的查询开始,然后看一下本教程中的聚合,过滤器,排序,子查询和数据透视。
基于协议类型的连接
让我们看看如何根据连接协议的类型获得连接总数。 首先,我们将使用常规的DataFrame DSL语法获取此信息以执行聚合。
display ( df. groupBy ( 'protocol_type' )
. count ( )
. orderBy ( 'count' , ascending = False ) )
我们还可以使用SQL来执行相同的聚合吗? 是的,我们可以利用我们之前构建的表!
protocols = sqlContext. sql ( """
SELECT protocol_type, count(*) as freq
FROM connections
GROUP BY protocol_type
ORDER BY 2 DESC
""" )
display ( protocols )
您可以清楚地看到获得相同的结果,而不必担心您的后台基础结构或代码如何执行。 只需编写简单SQL!
基于好或坏(攻击类型)签名的连接
现在,我们将运行一个简单的聚合,以根据良好(正常)或不良(入侵攻击)类型检查连接总数。
labels = sqlContext. sql ( """
SELECT label, count(*) as freq
FROM connections
GROUP BY label
ORDER BY 2 DESC
""" )
display ( labels )
我们有很多不同的攻击类型。 我们可以以条形图的形式将其可视化。 最简单的方法是在Databricks笔记本中使用出色的界面选项。
这为我们提供了漂亮的条形图,您可以通过单击“ 绘图选项”进一步自定义。
另一种方法是编写代码来做到这一点。 您可以将汇总数据提取为Pandas DataFrame并将其绘制为常规条形图。
labels_df = pd. DataFrame ( labels. toPandas ( ) )
labels_df. set_index ( "label" , drop = True , inplace = True )
labels_fig = labels_df. plot ( kind = 'barh' )
plt. rcParams [ "figure.figsize" ] = ( 7 , 5 )
plt. rcParams . update ( { 'font.size' : 10 } )
plt. tight_layout ( )
display ( labels_fig. figure )
基于协议和攻击的连接
通过使用以下SQL查询,看看哪些协议最容易受到攻击。
attack_protocol = sqlContext. sql ( """
SELECT
protocol_type,
CASE label
WHEN 'normal.' THEN 'no attack'
ELSE 'attack'
END AS state,
COUNT(*) as freq
FROM connections
GROUP BY protocol_type, state
ORDER BY 3 DESC
""" )
display ( attack_protocol )
好吧,看起来像ICMP连接,其次是TCP连接受到的攻击最多。
基于协议和攻击的连接状态
让我们看一下与这些协议有关的统计量以及针对我们的连接请求的攻击。
attack_stats = sqlContext. sql ( """
SELECT
protocol_type,
CASE label
WHEN 'normal.' THEN 'no attack'
ELSE 'attack'
END AS state,
COUNT(*) as total_freq,
ROUND(AVG(src_bytes), 2) as mean_src_bytes,
ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_compromised) as total_compromised,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
GROUP BY protocol_type, state
ORDER BY 3 DESC
""" )
display ( attack_stats )
看起来在TCP请求中传输的平均数据量要高得多,这不足为奇。 有趣的是,攻击从源到目的地传输的数据的平均有效负载要高得多。
根据TCP协议按服务和攻击类型筛选连接统计信息
鉴于我们拥有更多相关的数据和统计信息,让我们仔细看一下TCP攻击。 现在,我们将根据服务和攻击类型汇总不同类型的TCP攻击,并观察不同的指标。
tcp_attack_stats = sqlContext. sql ( """
SELECT
service,
label as attack_type,
COUNT(*) as total_freq,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
WHERE protocol_type = 'tcp'
AND label != 'normal.'
GROUP BY service, attack_type
ORDER BY total_freq DESC
""" )
display ( tcp_attack_stats )
攻击类型很多,前面的输出显示了它们的特定部分。
根据TCP协议按服务和攻击类型筛选连接统计信息
现在,我们将根据持续时间,文件创建和root用户访问在查询中施加一些约束,从而过滤掉某些攻击类型。
tcp_attack_stats = sqlContext. sql ( """
SELECT
service,
label as attack_type,
COUNT(*) as total_freq,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
WHERE (protocol_type = 'tcp'
AND label != 'normal.')
GROUP BY service, attack_type
HAVING (mean_duration >= 50
AND total_file_creations >= 5
AND total_root_acceses >= 1)
ORDER BY total_freq DESC
""" )
display ( tcp_attack_stats )
有趣的是, 多跳攻击可以对目标主机进行root访问!
子查询根据服务过滤TCP攻击类型
让我们尝试基于服务和攻击类型获取所有TCP攻击,以使这些攻击的总体平均持续时间大于零( > 0 )。 为此,我们可以使用所有聚合统计信息进行内部查询,提取相关查询,并在外部查询中应用平均持续时间过滤器,如下所示。
tcp_attack_stats = sqlContext. sql ( """
SELECT
t.service,
t.attack_type,
t.total_freq
FROM
(SELECT
service,
label as attack_type,
COUNT(*) as total_freq,
ROUND(AVG(duration), 2) as mean_duration,
SUM(num_failed_logins) as total_failed_logins,
SUM(num_file_creations) as total_file_creations,
SUM(su_attempted) as total_root_attempts,
SUM(num_root) as total_root_acceses
FROM connections
WHERE protocol_type = 'tcp'
AND label != 'normal.'
GROUP BY service, attack_type
ORDER BY total_freq DESC) as t
WHERE t.mean_duration > 0
""" )
display ( tcp_attack_stats )
太好了! 现在,查看此数据的另一种有趣方式是使用数据透视表,其中一个属性代表行,另一个属性代表列。 让我们看看是否可以利用Spark DataFrames做到这一点!
根据汇总数据构建数据透视表
我们将在之前的DataFrame对象的基础上,根据类型和服务汇总攻击。 为此,我们可以利用Spark DataFrames和DataFrame DSL的功能。
display ( ( tcp_attack_stats. groupby ( 'service' )
. pivot ( 'attack_type' )
. agg ( { 'total_freq' : 'max' } )
. na . fill ( 0 ) )
)
我们得到一个漂亮,整洁的数据透视表,该表根据服务和攻击类型显示所有事件!
下一步
我鼓励您出去玩Spark SQL和DataFrames。 您甚至可以导入我的笔记本并以自己的帐户使用它。
也可以随意参考我的GitHub存储库,以获取本文中使用的所有代码和笔记本。 它涵盖了我们此处未涵盖的内容,包括:
加入
视窗功能
Spark DataFrames的详细操作和转换
如果您想脱机使用它,也可以作为Jupyter Notebook访问我的教程。
在线上有很多文章和教程,因此我建议您查看一下。 Databricks的Spark SQL完整指南是有用的资源。
想使用JSON数据,但不确定使用Spark SQL? Databricks支持它! 查阅这份出色的Spark SQL JSON支持指南。
对窗口功能和SQL中的排名等高级概念感兴趣吗? 看一下“ 在Spark SQL中介绍窗口函数 ”。
我将撰写另一篇文章,以直观的方式介绍其中的一些概念,这对您来说应该很容易理解。 敬请关注!
如果您有任何反馈或疑问,可以通过LinkedIn与我联系。
这篇文章最初出现在Medium的Towards Data Science频道上,经许可重新发布。
接下来要读什么
翻译自: https://opensource.com/article/19/3/apache-spark-and-dataframes-tutorial
spark sql教程
spark sql教程_如何使用Spark SQL:动手教程相关推荐
- python实现sql数据处理_再见Python, 你好SQL
原标题:再见Python, 你好SQL 雄凌求职:专注求职内推.金融名企实习内推的教育平台.可内推投行部.研究部门.互联网.私募.基金.四大.咨询等实习和工作岗位.专注大学生背景提升.工作求职.留学申 ...
- java中sql语句怎么把开始和结束时间作为参数写sql查询_聊一聊MyBatis 和 SQL 注入间的恩恩怨怨
整理了一些Java方面的架构.面试资料(微服务.集群.分布式.中间件等),有需要的小伙伴可以关注公众号[程序员内点事],无套路自行领取 引言 MyBatis 是一种持久层框架,介于 JDBC 和 Hi ...
- 为什么preparedstatement能防止sql注入_使用Python防止SQL注入攻击的实现示例
文章背景 每隔几年,开放式Web应用程序安全项目就会对最关键的Web应用程序安全风险进行排名.自第一次报告以来,注入风险高居其位!在所有注入类型中,SQL注入是最常见的攻击手段之一,而且是最危险的.由 ...
- 转储sql文件_在Linux上SQL Server中更改SQL转储文件位置
转储sql文件 In this article, we will talk about SQL Dump files and the process to change the dump direct ...
- java 日志打印sql语句_利用log4j打印sql的log日志
默认情况下,使用ibatis是不打印ibatis相关的log的,因为内部的sql执行都是内部调用,在server的控制台是不 会 打印log的. 在log4j的配置文件log4j.properties ...
- 惠普打印机墨盒更换教程_惠普打印机加墨教程:老司机教你
01惠普打印机加墨教程 [中关村在线原创]惠普的打印机用户众多.那么墨盒用完了怎么办?很多人想到的是找个加墨教程.这里我们作为老司机就给各位带带路,来一个惠普打印机加墨教程. 惠普打印机加墨教程 惠普 ...
- php 网站访问统计插件,帝国CMS教程_网站访问统计插件使用教程_好特教程
帝国CMS教程:网站访问统计插件使用教程,先来看下效果图 上传以下图片: ******************** 安装插件 ******************** 1.如果你的后台 ...
- PostgreSQL 10.1 手册_部分 I. 教程_第 2 章 SQL语言
第 2 章 SQL语言 目录 2.1. 引言2.2. 概念2.3. 创建一个新表2.4. 在表中增加行2.5. 查询一个表2.6. 在表之间连接2.7. 聚集函数2.8. 更新2.9. 删除 本文转自 ...
- 小米mysql安装教程_小米 SOAR 开源SQL优化工具安装
github :https://github.com/xiaomi/soar 安装说明 :https://github.com/XiaoMi/soar/blob/master/doc/install. ...
最新文章
- 概述VB.NET正则表达式简化程序代码
- -webkit-overflow-scrolling与苹果
- MFC中的模态对话框与非模态对话框
- c语言从入门到精通ppt,C语言从入门到精通第1章.ppt
- 【powerdesign】从mysql数据库导出到powerdesign,生成数据字典
- MySql5.7.12设置log-bin
- Docker(一) docker简介安装以及下载运行第一个镜像
- eclipse java类图_eclipse中。green UML 自动生成类图
- 连接超时_记一次网络请求连接超时的事故
- 利用dropbox来Host你的silverlight应用
- kafka入门1-集群生产消息 报:ERROR Producer connection to localhost:9092 unsuccessful
- Linux下挂载和格式化虚拟磁盘图文教程
- matlab判断传递函数的稳定性,基于Matlab的控制系统稳定性判定.pdf
- wps office2019PC版和Mac版_来试试这款政府版WPS吧
- sam文件获取与解密
- IDEA快捷键设置,选择Eclipse风格的快捷键,自动补全快捷键,关闭当前窗口快捷键Ctrl+W 自动导入设置,引入外
- xgboost 怎么读_你真的会读书了吗?五本书让你会读书,读好书,好读书!
- 织梦后台验证码显示不出来-处理办法
- 计算机网络协议,以太网帧格式
- linux网络通的端口开的 网页打不开,打不开HTTPS网页的解决方案 解决打不开HTTPS...
热门文章
- 介绍9种常用的项目管理工具
- 日常记账后,项目图表显示各种收支类别
- dotnetty android 交互,C#教程之在 DotNetty 中实现同步请求
- 【花雕体验】14 行空板pinpong库测试外接传感器模块
- Ontology × DoraHacks 10万美金黑客松现已启航,在WEB3的世界里添“码”行空
- 知识丨软件定义汽车下的整车开发
- 优秀企业文化学习(学习节选)
- 国外注册的域名dns服务器换回国内dns服务器的详细教程!
- docker 安装 portainer 并汉化 中文版
- 【​观察】探索中小企业SaaS定制化之路 解读搭搭云三大“颠覆式”创新