Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录
- 项目背景
- 案例需求
- 一、分析
- 1、日志分析
- 二、日志采集
- 第一步、代码编辑
- 2、启动采集代码
- 三、编写Spark Streaming的代码
- 第一步 创建工程
- 第二步 选择创建Scala工程
- 第三步 设置工程名与工程所在路径和使用的Scala版本后完成创建
- 第四步 创建scala文件
- 第五步:导入依赖包
- 第六步:引入本程序所需要的全部方法
- 第七步:创建main函数与Spark程序入口。
- 第八步:设置kafka服务的主机地址和端口号,并设置从哪个topic接收数据和设置消费者组
- 第九步:数分析
- 第十步:保存计算结果
- 第十一步 数据库设计
- jumpertab表
- pvtab表
- regusetab表
- 四、编译运行
- 第一步、将工程添加到jar文件并设置文件名称
- 第二步、生成jar包
- 第三步、提交运行Spark Streaming程序
- 第四步:查看数据库
- 完整代码如下
本案例源码下载
链接:https://pan.baidu.com/s/1IzOvSCtLvZzj81XZaYl6CQ
提取码:i6i8
项目背景
案例需求
要求:实时分析服务器日志数据,并实时计算出某时间段内的浏览量等信息。
使用技术:Flume-》Kafka-》SparkStreaming-》MySql数据库
架构中通过Flume实时监控日志文件,当日志文件中出现新数据时将该条数据发送给Kafka,并有Spark Streaming接收进行实时的数据分析最后将分析结果保存到MySQL数据库中。
一、分析
1、日志分析
1.通过浏览器访问服务器中的网页,每访问一次就会产生一条日志信息。日志中包含访问者IP、访问时间、访问地址、状态码和耗时等信息,如下图所示:
二、日志采集
第一步、代码编辑
通过使用Flume实时监控服务器日志文件内容,每生成一条都会进行采集,并将采集的结构发送给Kafka,Flume代码如下。
2、启动采集代码
代码编辑完成后启动Flume对服务器日志信息进行监控,进入Flume安装目录执行如下代码。
[root@master flume]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/access_log-HDFS.properties -Dflume.root.logger=INFO,console
三、编写Spark Streaming的代码
第一步 创建工程
第二步 选择创建Scala工程
第三步 设置工程名与工程所在路径和使用的Scala版本后完成创建
第四步 创建scala文件
第五步:导入依赖包
第六步:引入本程序所需要的全部方法
import java.sql.DriverManager //连接数据库
import kafka.serializer.StringDecoder //序列化数据
import org.apache.spark.streaming.dstream.DStream //接收输入数据流
import org.apache.spark.streaming.kafka.KafkaUtils //连接Kafka
import org.apache.spark.streaming.{Seconds, StreamingContext} //实时流处理
import org.apache.spark.SparkConf //spark程序的入口函数
第七步:创建main函数与Spark程序入口。
def main(args: Array[String]): Unit = {//创建sparksessionval conf = new SparkConf().setAppName("Consumer")val ssc = new StreamingContext(conf,Seconds(20)) //设置每隔20秒接收并计算一次
}
第八步:设置kafka服务的主机地址和端口号,并设置从哪个topic接收数据和设置消费者组
//kafka服务器地址
val kafkaParam = Map("metadata.broker.list" -> "192.168.10.10:9092")
//设置topic
val topic = "testSpark".split(",").toSet
//接收kafka数据
val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)
第九步:数分析
接收到数据后,对数据进行分析,将服务器日志数据按照空格进行拆分,并分别统计出阶段时间内的网站浏览量、用户注册数量和用户的跳出率并将统计结果转换为键值对类型的RDD。
//拆分接收到的数据val RDDIP =logDStream.transform(rdd=>rdd.map(x=>x.split(" ")))//进行数据分析val pv = RDDIP.map(x=>x(0)).count().map(x=>("pv",x)) //用户浏览量val jumper = RDDIP.map(x=>x(0)).map((_,1)).reduceByKey(_+_).filter(x=>x._2 == 1).map(x=>x._1).count.map(x=>("jumper",x)) //跳出率val reguser =RDDIP.filter(_(8).replaceAll("\"","").toString == "/member.php?mod=register&inajax=1").count.map(x=>("reguser",x)) //注册用户数量
第十步:保存计算结果
遍历统计结果RDD取出键值对中的值并分别分别将分析结果保存到pvtab、jumpertab和regusetab表中,最后启动Spark Streaming程序。
pv.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")val dateFf= format.format(new java.util.Date())val sql = "insert into pvtab(time,pv) values("+"'"+dateFf+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))jumper.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")val dateFf= format.format(new java.util.Date())val sql = "insert into jumpertab(time,jumper) values("+"'"+dateFf+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))reguser.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")val dateFf= format.format(new java.util.Date())val sql = "insert into regusetab(time,reguse) values("+"'"+dateFf+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))ssc.start() //启动Spark Streaming程序
第十一步 数据库设计
创建一个数据库名为“test”,并在该库中创建三个表分别名为"jumpertab"、“pvtab”、“regusetab”,数据库结构如下图所示
jumpertab表
pvtab表
regusetab表
四、编译运行
第一步、将工程添加到jar文件并设置文件名称
第二步、生成jar包
第三步、提交运行Spark Streaming程序
[root@master bin]# ./spark-submit --master local[*] --class com.spark.streaming.sparkword /usr/local/Streaminglog.jar
第四步:查看数据库
完整代码如下
package spark
import java.sql.DriverManager
import java.util.Calendarimport kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
object kafkaspark {def main(args: Array[String]): Unit = {// 创建sparksessionval conf = new SparkConf().setAppName("Consumer")val ssc = new StreamingContext(conf,Seconds(1))val kafkaParam = Map("metadata.broker.list" -> "192.168.10.10:9092")val topic = "testSpark".split(",").toSet//接收kafka数据val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)//拆分接收到的数据val RDDIP =logDStream.transform(rdd=>rdd.map(x=>x.split(" ")))//进行数据分析val pv = RDDIP.map(x=>x(0)).count().map(x=>("pv",x))val jumper = RDDIP.map(x=>x(0)).map((_,1)).reduceByKey(_+_).filter(x=>x._2 == 1).map(x=>x._1).count.map(x=>("jumper",x))val reguser =RDDIP.filter(_(8).replaceAll("\"","").toString == "/member.php?mod=register&inajax=1").count.map(x=>("reguser",x))//将分析结果保存到MySQL数据库pv.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("H:mm:ss")val dateFf= format.format(new java.util.Date())var cal:Calendar=Calendar.getInstance()cal.add(Calendar.SECOND,-1)var Beforeasecond=format.format(cal.getTime())val date = Beforeasecond.toString+"-"+dateFf.toStringval sql = "insert into pvtab(time,pv) values("+"'"+date+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))jumper.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("H:mm:ss")val dateFf= format.format(new java.util.Date())var cal:Calendar=Calendar.getInstance()cal.add(Calendar.SECOND,-1)var Beforeasecond=format.format(cal.getTime())val date = Beforeasecond.toString+"-"+dateFf.toStringval sql = "insert into jumpertab(time,jumper) values("+"'"+date+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))reguser.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("H:mm:ss")val dateFf= format.format(new java.util.Date())var cal:Calendar=Calendar.getInstance()cal.add(Calendar.SECOND,-1)var Beforeasecond=format.format(cal.getTime())val date = Beforeasecond.toString+"-"+dateFf.toStringval sql = "insert into regusetab(time,reguse) values("+"'"+date+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))val num = logDStream.map(x=>(x,1)).reduceByKey(_+_)num.print()//启动Streamingssc.start()ssc.awaitTermination()ssc.stop()}
}
Flume+Kafka+Spark Streaming+MySQL实时日志分析相关推荐
- Flume+Kafka+Spark Streaming实现大数据实时流式数据采集
近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...
- 【python+flume+kafka+spark streaming】编写word_count入门示例
一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...
- linux流式访问日志,流式实时日志分析系统的实现原理
我们知道网站用户访问流量是不间断的,基于网站的访问日志,即 Web log 分析是典型的流式实时计算应用场景.比如百度统计,它可以做流量分析.来源分析.网站分析.转化分析.另外还有特定场景分析,比如安 ...
- 使用 Kafka 和 Spark Streaming 构建实时数据处理系统
使用 Kafka 和 Spark Streaming 构建实时数据处理系统 来源:https://www.ibm.com/developerworks,这篇文章转载自微信里文章,正好解决了我项目中的 ...
- 大数据主题分享第三期 | 基于ELK的亿级实时日志分析平台实践
猫友会希望建立更多高质量垂直细分社群,本次是"大数据学习交流付费群"的第三期分享. "大数据学习交流付费群"由猫友会联合,斗鱼数据平台总监吴瑞诚,卷皮BI技术总 ...
- grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...
随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...
- 实时计算业务介绍实时日志分析
4.1 实时计算业务介绍 学习目标 目标 了解实时计算的业务需求 知道实时计算的作用 应用 无 4.1.1 实时计算业务需求 实时(在线)计算: 解决用户冷启动问题 实时计算能够根据用户的点击实时反馈 ...
- 基于Flink的实时日志分析系统实践
前言 目前业界基于 Hadoop 技术栈的底层计算平台越发稳定成熟,计算能力不再成为主要瓶颈. 多样化的数据.复杂的业务分析需求.系统稳定性.数据可靠性, 这些软性要求, 逐渐成为日志分析系统面对的主 ...
- Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率
一. 实战 1.用Spark Streaming实现实时WordCount 架构图: 说明:在hadoop1:9999下的nc上发送消息,消费端接收消息,然后并进行单词统计计算. * 2.安装并启动生 ...
最新文章
- 教程:14、系统性能分析
- 线程常用方法,线程安全和同步锁
- python处理csv文件案例_python3读取csv文件任意行列代码实例
- 推荐策略产品经理:剖析协同过滤(千人千面推荐的核心 )
- 深入探讨傅立叶变换、拉普拉斯变换、Z变换的联系与应用
- 【51NOD】1486 大大走格子
- 飞行模式的开启和关闭
- how to get the space size of some tables in one database?
- WHEREIS(1)
- xForm应用开发手册
- C语言去除字符串空格
- 【深度学习】你该会的精选面试题(一)
- iOS 疑难杂症 — — 推送本地国际化 loc-key 本地化失败的问题
- Windows 10 64位系统中安装加密狗驱动出现the returncode is 3003错误的解决方法
- 离散数学(下)第十章 群与环
- python解析excel公式_读取Excel单元格值,而不是计算它的公式-openpyx
- Python核心编程 课后习题 第一部分
- 驾驭你的“职场布朗运动” (作者李云)
- 乐视 无法播放服务器文件夹,乐视电视最新常见问题及解决方法分享!
- 12306 Tickets自动化购票软件操作说明与获取