Spark Steaming流式日志过滤与分析

这篇大概讲的是 spark steaming 监听 hdfs 的某个目录,当你在终端A使用 spark-submit 运行 Log2DB.py 文件后,在终端B依次上传那两个 .log 文件到 hdfs 这个目录后,那么 spark streaming 就会依据 Log2DB.py 文件按要求将清洗后的数据(日志文件中的 error 和 warn 级别的数据)保存在 MySQL 的 spark 数据库中的log032表上。
尤其需要注意的是先运行Log2DB.py后再将日志文件依次上传到监控目录中,这样才能起到监控的作用。

本网页数据源的下载链接

步骤:

1、启动hdfs

命令:

// 转到相应的工作目录
cd /home/syan/Hbase/hadoop/sbin

// 启动 HDFS
start-dfs.sh

// 查看开启Hadoop是否成功
jps

2、创建hdfs监控目录
// 创建 HDFS 目录/spark-exp6/sst-032。
hdfs dfs -mkdir -p /spark-exp6/sst-032


在ip:50070可以看到创建目录成功

3、使用 MySQL 的 root 用户创建数据库 spark,并创建表log032。

spark数据库的用户名和密码
用户名:spark
密码:spark

(1)打开Navicat连接MySQL数据库

(2)执行下列SQL语句创建spark数据库
注:IP地址是根据自身配置的

-- 创建spark数据库
create database spark;
-- 创建用户名密码为spark/spark的用户()
create user 'spark'@'IP地址' identified by 'spark';
create user 'spark'@'%' identified by 'spark';
-- 授予权限
grant all on spark.* to 'spark'@'IP地址';
grant all on spark.* to 'spark'@'%';


可以看到spark数据库已经创建成功

(3)然后执行下列SQL语句在spark数据库中创建表log032

use spark;

-- 创建表log032
create table log032(lvl varchar(15) comment '等级',method varchar(50) comment '方法',content varchar(200) comment '内容');



(4)为了让程序可以调用 MySQL 的JDBC驱动,把驱动文件mysql-connector-java-5.1.48-bin.jar放入$SPARK_HOME/jars
用xftp从Windows本地传到Ubuntu里面去

注:如果配置了Jupytor Notebook,要在$SPARK_HOME/conf/spark-env.sh文件中把这2个环境变量注释掉,并重启虚拟机。
#export PYSPARK_DRIVER_PYTHON=jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=*'

(5)启动 Spark Standalone 模式。
看网上很多资料是说启动spark集群就是启动Spark Standalone模式,即在$SPARK_HOME/sbin下运行start-spark.sh文件。

注:如果你不想每次都进入这个目录去执行这个文件,那你可以选择设置临时环境或永久性环境。设置方法请看Ubuntu中设置PATH变量值

// 进入相应的工作目录
cd /home/syan/Spark/spark/sbin
// 启动服务
start-spark.sh


jps查看是否启动成功

(6)在本地编写程序Log2DB.py,有以下的要求。

(1)采用StreamingContext.textFileStream()算子监听hdfs://syan:9000/spark-exp6/sst-032目录。
(2)把 DStream 转换为 RDD。
(3)读入日志信息流,将 RDD 转为 DataFrame。
(4)DataFrame 注册为临时表。
(5)使用 SQL 过滤出级别为 error 或 warn 的日志。
(6)将过滤后的数据保存到 MySQL 的 spark数据库中的log032表

代码块:

#导入相应的包
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *#创建SparkContext对象
sc = SparkContext('spark://syan:7077','Log2DB')#生成流计算上下文,每10秒批处理一次
spark =SparkSession(sc)
ssc = StreamingContext(sc, 10)'''
采用textFileStream()算子监控目录hdfs://syan:50070/spark/sst-032不知道端口号是多少的话可以用下面这条命令查看:
hdfs getconf -confKey fs.default.name
'''
ds1 = ssc.textFileStream("hdfs://syan:9000/spark-exp6/sst-032")#把所有数据划分为[[],[]]格式
ds2 = ds1.map(lambda line:line.split("\t"))
ds3 = ds2.map(lambda arr:[arr[0],arr[1],arr[2]])def save2DB(fileRDD):if not fileRDD.isEmpty():'''构建表结构:日志级别------->lvl函数名--------->method日志内容------->content'''schema = StructType([StructField("lvl", StringType(), True), StructField("method", StringType(), True),StructField("content", StringType(), True)])#对[[],[],[]]数据转换成[[[],[],[]],[],[],[]]因为todf数据是数据格式传值:DataFrame 注册为临时表。fileRDD.map(lambda x:tuple(x)).toDF(schema).registerTempTable("tmp_log032")#使用 SQL 过滤出级别为 error 或 warn 的日志。df1 = spark.sql("select * from tmp_log032 where lvl != '[info]'")# df2 = sparkdf1.show()#写入mysql,即将过滤后的数据保存到 MySQL 的 spark数据库中的log123表df1.write.jdbc('jdbc:mysql://192.168.45.128:3306/spark?useSSL=false&user=spark&password=spark&characterEncoding=UTF-8',table='log032',mode='append',properties={'driver':'com.mysql.jdbc.Driver'})#打印输出到控制台
ds3.pprint()# 输出采用foreachRDD()算子
ds3.foreachRDD(lambda fileRDD:save2DB(fileRDD))#执行程序
ssc.start()#等待任务执行结束
ssc.awaitTermination()

Log2DB.py代码完整截图:

(7)使用 spark-submit 运行Log2DB.py
将写好的Log2DB.py上传到虚拟机(自己选一个位置,这里的是我上传的路径而已:/home/syan/Spark/Demo/input)中

终端A:
然后先转到spark解压目录下的bin目录下($SPARK_HOME/bin)

cd $SPARK_HOME/bin

注:如果你不想每次都进入这个目录去执行这个文件,那你可以选择设置临时环境或永久性环境。设置方法请看Ubuntu中设置PATH变量值

用 spark-submit运行python文件

$ spark-submit --master spark://syan:7077 /home/syan/Spark/Demo/input/Log2DB.py

(8)将我们刚刚从网盘下载下来的20180101.log、20180102.log文件上传到虚拟机(/home/syan/course/spark_course/data),再从虚拟机上传到hdfs中(/spark-exp6/sst-032)
1、把20180101.log、20180102.log文件一起上传到虚拟机(/home/syan/course/spark_course/data)

2、另外开一个终端:终端B分别把20180101.log、20180102.log(/home/syan/course/spark_course/data)从虚拟机上传到hdfs中(/spark-exp6/sst-032)

(1)终端B中上传20180101.log

终端B:

// 终端B中上传20180101.log
hdfs dfs -put /home/syan/course/spark_course/data/20180101.log /spark-exp6/sst-032


返回终端A查看打印结果:

再看看Ubuntu中的MySQL中的spark数据库的log032表有没有更新数据

(2)再回到终端B 中继续往hdfs中上传20180102.log文件

终端B:

// 终端B中上传20180102.log
hdfs dfs -put /home/syan/course/spark_course/data/20180102.log /spark-exp6/sst-032


返回终端A查看打印结果

再看看Ubuntu中的MySQL中的spark数据库的log032表有没有更新数据

在spark streaming输入源中的基本数据源是直接调用API的,而基本数据源的数据流中的文件流(textFileStream)中是通过监控文件系统目录下的变化,就比如要监控hdfs的目录,若是有新文件添加到监控目录,则将这个文件内容读入并作为数据流。但是它所监控的目录下的文件有下面三种要求:
(1)上传到监控目录中的文件要具有相同的格式。
(2)不一定是通过上传才能使能被监控,移动和重命名的方法都可以使得监控目录可以读取并将它写入到数据流中。
(3)但如果在已存在监控目录下的文件中写入或追加内容,这些追加的新数据不会被读取并写入到数据流中。

Spark Steaming流式日志过滤与分析相关推荐

  1. 从Storm和Spark 学习流式实时分布式计算的设计

    转自:http://www.dataguru.cn/thread-341168-1-1.html 流式实时分布式计算系统在互联网公司占有举足轻重的地位,尤其在在线和近线的海量数据处理上.而处理这些海量 ...

  2. Spark Streaming 流式计算实战

    这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...

  3. 大数据学习系列----基于Spark Streaming流式计算

    2019独角兽企业重金招聘Python工程师标准>>> 个性化的需求 随着互联网知识信息指数级膨胀,个性化的需求对于用户来说越来越重要,通过推荐算法和用户点击行为的流式计算可以很简单 ...

  4. 基于spark的流式数据处理—流计算处理流程以及应用场景

    本文主要从一下几个方面来介绍流计算处理流程: 概述 数据实时采集 数据实时计算 实时查询服务 概述 传统的数据处理流程,需要先采集数据并存储在关系数据库等数据管理系统中,之后由用户通过查询操作和数据管 ...

  5. 基于spark的流式数据处理—批处理和流处理区别

    静态数据 很多企业为了支持决策分析而构建的数据仓库系统,其中存放的大量历史数据就是静态数据.技术人员可以利用数据挖掘和OLAP(On-Line Analytical Processing)分析工具从静 ...

  6. Spark Stream 流式处理

    总结: Spark的核心数据结构是RDD(弹性分布式数据集) Spark Streaming 采用微批处理模式,保证消息传输精准性,采用checkpoint方式保证可靠性,具有良好的吞吐性能,延时表现 ...

  7. 2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》之《数据可视化:构建实时动态运营数据分析大屏》篇...

    实验背景介绍 了解更多2017云栖大会·杭州峰会 TechInsight & Workshop. 本手册为云栖大会Workshop之<在线用户行为分析:基于流式计算的数据处理及应用> ...

  8. 2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》之《数据可视化:构建实时动态运营数据分析大屏》篇

    点击有惊喜 实验背景介绍 了解更多2017云栖大会·杭州峰会 TechInsight & Workshop. 本手册为云栖大会Workshop之<在线用户行为分析:基于流式计算的数据处理 ...

  9. 图解大数据 | 流式数据处理-Spark Streaming

    作者:韩信子@ShowMeAI 教程地址:http://www.showmeai.tech/tutorials/84 本文地址:http://www.showmeai.tech/article-det ...

最新文章

  1. 机器学习四剑客3——Pandas
  2. 用户控件中动态加入脚本引用
  3. 打开excel文件并写入_双击Excel表格文件时只打开程序不能直接打开文件
  4. 从壹开始学习 NetCore 新篇章 ║ Blog.Core 开发社之招募计划书
  5. CSS 学习路线(二)选择器
  6. python中什么是关键字参数_如何使用python语言中函数的关键字参数的用法
  7. 瓜州县电子政务工程_甘肃瓜州:“厕所革命”带来乡村新生活
  8. Spring 国际化 MessageSource
  9. 文件fluent_Win10 中解决FLUENT中UDF 的方法
  10. 微信小程序踩坑(1):wx.showModal模态对话框中content换行
  11. jupyter nootbook支持matlab语言
  12. Unity 打包对接 XCode 记录
  13. android rmvb格式下载,rmvb格式转换器安卓版
  14. Python 乌龟吃鱼问题求解
  15. 又一个新概念——云编程(Cloud programming)
  16. 华为千亿美金年报里的5G、AI、云计算
  17. 黑苹果、win双系统,丢失黑苹果引导,如何修复黑苹果的引导
  18. ncbi blast MATLAB,NCBI-BLAST在线使用教程详细攻略(图解)
  19. 无心剑中译狄兰·托马斯《不要温顺地走进那个良夜》
  20. IE中不能自动选择UTF-8编码的解决办法

热门文章

  1. MySql每晚12点都会弹出这个?
  2. 哪款国产ESD二极管可直接替代LC3311CCW?
  3. 3D游戏设计读书笔记二
  4. office365安装后仍显示之前版本
  5. org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable
  6. 那一份无怨亦无悔的真情实意
  7. 网络计算机显示器接口,科普四种常见的电脑显示器连接线接口
  8. 网易云音乐的亏损,是社区经济的通病?
  9. 公寓宽带服务器无响应,利用RLDP协议解决网络环路故障
  10. 狐狸文│区块链不是用来讲故事的