文章目录

  • 项目背景
  • 案例需求
  • 一、分析
    • 1、日志分析
  • 二、日志采集
    • 第一步、代码编辑
    • 2、启动采集代码
  • 三、编写Spark Streaming的代码
    • 第一步 创建工程
    • 第二步 选择创建Scala工程
    • 第三步 设置工程名与工程所在路径和使用的Scala版本后完成创建
    • 第四步 创建scala文件
    • 第五步:导入依赖包
    • 第六步:引入本程序所需要的全部方法
    • 第七步:创建main函数与Spark程序入口。
    • 第八步:设置kafka服务的主机地址和端口号,并设置从哪个topic接收数据和设置消费者组
    • 第九步:数分析
    • 第十步:保存计算结果
    • 第十一步 数据库设计
    • jumpertab表
    • pvtab表
    • regusetab表
  • 四、编译运行
    • 第一步、将工程添加到jar文件并设置文件名称
    • 第二步、生成jar包
    • 第三步、提交运行Spark Streaming程序
    • 第四步:查看数据库
  • 完整代码如下

本案例源码下载
链接:https://pan.baidu.com/s/1IzOvSCtLvZzj81XZaYl6CQ
提取码:i6i8

项目背景

网络发展迅速的时代,越来越多人通过网络获取跟多的信息或通过网络作一番自己的事业,当投身于搭建属于自己的网站、APP或小程序时会发现,经过一段时间经营和维护发现浏览量和用户数量的增长速度始终没有提升。在对其进行设计改造时无从下手,当在不了解用户的浏览喜欢和个用户群体的喜好。虽然服务器日志中明确的记载了用户访浏览的喜好但是通过普通方式很难从大量的日志中及时有效的筛选出优质信息。Spark Streaming是一个实时的流计算框架,该技术可以对数据进行实时快速的分析,通过与Flume、Kafka的结合能够做到近乎零延迟的数据统计分析。

案例需求

要求:实时分析服务器日志数据,并实时计算出某时间段内的浏览量等信息。

使用技术:Flume-》Kafka-》SparkStreaming-》MySql数据库

#案例架构

架构中通过Flume实时监控日志文件,当日志文件中出现新数据时将该条数据发送给Kafka,并有Spark Streaming接收进行实时的数据分析最后将分析结果保存到MySQL数据库中。

一、分析

1、日志分析

1.通过浏览器访问服务器中的网页,每访问一次就会产生一条日志信息。日志中包含访问者IP、访问时间、访问地址、状态码和耗时等信息,如下图所示:

二、日志采集

第一步、代码编辑

通过使用Flume实时监控服务器日志文件内容,每生成一条都会进行采集,并将采集的结构发送给Kafka,Flume代码如下。

2、启动采集代码

代码编辑完成后启动Flume对服务器日志信息进行监控,进入Flume安装目录执行如下代码。

[root@master flume]# bin/flume-ng agent --name a1 --conf conf  --conf-file conf/access_log-HDFS.properties  -Dflume.root.logger=INFO,console

效果下图所示。

三、编写Spark Streaming的代码

第一步 创建工程

第二步 选择创建Scala工程

第三步 设置工程名与工程所在路径和使用的Scala版本后完成创建

第四步 创建scala文件

项目目录的”src”处单机鼠标右键依次选择”New”->”Package”创建一个包名为”com.wordcountdemo”,并在该包处单机右键依次选择”New”->”scala class”创建文件命名为wordcount

第五步:导入依赖包

在IDEA中导入Spark依赖包,在菜单中依次选择”File”->”Project Structure”->”Libraries”后单击”+”号按钮选择”Java”选项,在弹出的对话框中找到spark-assembly-1.6.1-hadoop2.6.0.jar依赖包点击”OK”将所有依赖包加载到工程中,结果如图X所示。

第六步:引入本程序所需要的全部方法

注意此处使用了三个spark2中没有的jar包分别为kafka_2.11-0.8.2.1.jar、
metrics-core-2.2.0.jar、spark-streaming-kafka_2.11-1.6.3.jar。

import java.sql.DriverManager                       //连接数据库
import kafka.serializer.StringDecoder                  //序列化数据
import org.apache.spark.streaming.dstream.DStream      //接收输入数据流
import org.apache.spark.streaming.kafka.KafkaUtils      //连接Kafka
import org.apache.spark.streaming.{Seconds, StreamingContext}  //实时流处理
import org.apache.spark.SparkConf                      //spark程序的入口函数

结果如图所示。

第七步:创建main函数与Spark程序入口。

def main(args: Array[String]): Unit = {//创建sparksessionval conf = new SparkConf().setAppName("Consumer")val ssc = new StreamingContext(conf,Seconds(20))  //设置每隔20秒接收并计算一次
}

结果如图所示。

第八步:设置kafka服务的主机地址和端口号,并设置从哪个topic接收数据和设置消费者组

//kafka服务器地址
val kafkaParam = Map("metadata.broker.list" -> "192.168.10.10:9092")
//设置topic
val topic = "testSpark".split(",").toSet
//接收kafka数据
val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)

第九步:数分析

接收到数据后,对数据进行分析,将服务器日志数据按照空格进行拆分,并分别统计出阶段时间内的网站浏览量、用户注册数量和用户的跳出率并将统计结果转换为键值对类型的RDD。

   //拆分接收到的数据val RDDIP =logDStream.transform(rdd=>rdd.map(x=>x.split(" ")))//进行数据分析val pv = RDDIP.map(x=>x(0)).count().map(x=>("pv",x))   //用户浏览量val jumper = RDDIP.map(x=>x(0)).map((_,1)).reduceByKey(_+_).filter(x=>x._2 == 1).map(x=>x._1).count.map(x=>("jumper",x))   //跳出率val reguser =RDDIP.filter(_(8).replaceAll("\"","").toString == "/member.php?mod=register&inajax=1").count.map(x=>("reguser",x))  //注册用户数量

第十步:保存计算结果

遍历统计结果RDD取出键值对中的值并分别分别将分析结果保存到pvtab、jumpertab和regusetab表中,最后启动Spark Streaming程序。

  pv.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")val dateFf= format.format(new java.util.Date())val sql = "insert into pvtab(time,pv) values("+"'"+dateFf+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))jumper.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")val dateFf= format.format(new java.util.Date())val sql = "insert into jumpertab(time,jumper) values("+"'"+dateFf+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))reguser.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")val dateFf= format.format(new java.util.Date())val sql = "insert into regusetab(time,reguse) values("+"'"+dateFf+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))ssc.start()        //启动Spark Streaming程序

结果如图所示。

第十一步 数据库设计

创建一个数据库名为“test”,并在该库中创建三个表分别名为"jumpertab"、“pvtab”、“regusetab”,数据库结构如下图所示

jumpertab表

pvtab表

regusetab表

四、编译运行

将程序编辑为jar包提交到集群中运行。

第一步、将工程添加到jar文件并设置文件名称

选择“File”-“Project Structure”命令,在弹出的对话框中选择“Artifacts”按钮,选择“+”下的“JAR”->“Empty”在随后弹出的对话框中“NAME”处设置JAR文件的名字为“WordCount”,并双击右侧“firstSpark”下的“’firstSpark’compile output”将其加载到左侧,表示已经将工程添加到JAR包中然后点击“OK”按钮,如下图所示。

第二步、生成jar包

点击菜单栏中的“Build”->“Build Artifacts…”按钮在弹出的对话框中单击“Build”按钮,jar包生成后工程根目录会自动创建一个out目录在目录中可以看到生成的jar包,结果如下图所示。

第三步、提交运行Spark Streaming程序

[root@master bin]# ./spark-submit --master local[*] --class  com.spark.streaming.sparkword /usr/local/Streaminglog.jar

结果如下图所示

第四步:查看数据库

完整代码如下

package spark
import java.sql.DriverManager
import java.util.Calendarimport kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
object kafkaspark {def main(args: Array[String]): Unit = {//    创建sparksessionval conf = new SparkConf().setAppName("Consumer")val ssc = new StreamingContext(conf,Seconds(1))val kafkaParam = Map("metadata.broker.list" -> "192.168.10.10:9092")val topic = "testSpark".split(",").toSet//接收kafka数据val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)//拆分接收到的数据val RDDIP =logDStream.transform(rdd=>rdd.map(x=>x.split(" ")))//进行数据分析val pv = RDDIP.map(x=>x(0)).count().map(x=>("pv",x))val jumper = RDDIP.map(x=>x(0)).map((_,1)).reduceByKey(_+_).filter(x=>x._2 == 1).map(x=>x._1).count.map(x=>("jumper",x))val reguser =RDDIP.filter(_(8).replaceAll("\"","").toString == "/member.php?mod=register&inajax=1").count.map(x=>("reguser",x))//将分析结果保存到MySQL数据库pv.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("H:mm:ss")val dateFf= format.format(new java.util.Date())var cal:Calendar=Calendar.getInstance()cal.add(Calendar.SECOND,-1)var Beforeasecond=format.format(cal.getTime())val date = Beforeasecond.toString+"-"+dateFf.toStringval sql = "insert into pvtab(time,pv) values("+"'"+date+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))jumper.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("H:mm:ss")val dateFf= format.format(new java.util.Date())var cal:Calendar=Calendar.getInstance()cal.add(Calendar.SECOND,-1)var Beforeasecond=format.format(cal.getTime())val date = Beforeasecond.toString+"-"+dateFf.toStringval sql = "insert into jumpertab(time,jumper) values("+"'"+date+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))reguser.foreachRDD(line =>line.foreachPartition(rdd=>{rdd.foreach(word=>{val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")val format = new java.text.SimpleDateFormat("H:mm:ss")val dateFf= format.format(new java.util.Date())var cal:Calendar=Calendar.getInstance()cal.add(Calendar.SECOND,-1)var Beforeasecond=format.format(cal.getTime())val date = Beforeasecond.toString+"-"+dateFf.toStringval sql = "insert into regusetab(time,reguse) values("+"'"+date+"'," +"'"+word._2+"')"conn.prepareStatement(sql).executeUpdate()})}))val num = logDStream.map(x=>(x,1)).reduceByKey(_+_)num.print()//启动Streamingssc.start()ssc.awaitTermination()ssc.stop()}
}

Flume+Kafka+Spark Streaming+MySQL实时日志分析相关推荐

  1. Flume+Kafka+Spark Streaming实现大数据实时流式数据采集

    近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...

  2. 【python+flume+kafka+spark streaming】编写word_count入门示例

    一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...

  3. linux流式访问日志,流式实时日志分析系统的实现原理

    我们知道网站用户访问流量是不间断的,基于网站的访问日志,即 Web log 分析是典型的流式实时计算应用场景.比如百度统计,它可以做流量分析.来源分析.网站分析.转化分析.另外还有特定场景分析,比如安 ...

  4. 使用 Kafka 和 Spark Streaming 构建实时数据处理系统

    使用 Kafka 和 Spark Streaming 构建实时数据处理系统  来源:https://www.ibm.com/developerworks,这篇文章转载自微信里文章,正好解决了我项目中的 ...

  5. 大数据主题分享第三期 | 基于ELK的亿级实时日志分析平台实践

    猫友会希望建立更多高质量垂直细分社群,本次是"大数据学习交流付费群"的第三期分享. "大数据学习交流付费群"由猫友会联合,斗鱼数据平台总监吴瑞诚,卷皮BI技术总 ...

  6. grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...

    随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...

  7. 实时计算业务介绍实时日志分析

    4.1 实时计算业务介绍 学习目标 目标 了解实时计算的业务需求 知道实时计算的作用 应用 无 4.1.1 实时计算业务需求 实时(在线)计算: 解决用户冷启动问题 实时计算能够根据用户的点击实时反馈 ...

  8. 基于Flink的实时日志分析系统实践

    前言 目前业界基于 Hadoop 技术栈的底层计算平台越发稳定成熟,计算能力不再成为主要瓶颈. 多样化的数据.复杂的业务分析需求.系统稳定性.数据可靠性, 这些软性要求, 逐渐成为日志分析系统面对的主 ...

  9. Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率

    一. 实战 1.用Spark Streaming实现实时WordCount 架构图: 说明:在hadoop1:9999下的nc上发送消息,消费端接收消息,然后并进行单词统计计算. * 2.安装并启动生 ...

最新文章

  1. 教程:14、系统性能分析
  2. 线程常用方法,线程安全和同步锁
  3. python处理csv文件案例_python3读取csv文件任意行列代码实例
  4. 推荐策略产品经理:剖析协同过滤(千人千面推荐的核心 )
  5. 深入探讨傅立叶变换、拉普拉斯变换、Z变换的联系与应用
  6. 【51NOD】1486 大大走格子
  7. 飞行模式的开启和关闭
  8. how to get the space size of some tables in one database?
  9. WHEREIS(1)
  10. xForm应用开发手册
  11. C语言去除字符串空格
  12. 【深度学习】你该会的精选面试题(一)
  13. iOS 疑难杂症 — — 推送本地国际化 loc-key 本地化失败的问题
  14. Windows 10 64位系统中安装加密狗驱动出现the returncode is 3003错误的解决方法
  15. 离散数学(下)第十章 群与环
  16. python解析excel公式_读取Excel单元格值,而不是计算它的公式-openpyx
  17. Python核心编程 课后习题 第一部分
  18. 驾驭你的“职场布朗运动” (作者李云)
  19. 乐视 无法播放服务器文件夹,乐视电视最新常见问题及解决方法分享!
  20. 12306 Tickets自动化购票软件操作说明与获取

热门文章

  1. Centos6.5搭建mongodb分片
  2. MySQL常见故障处理手册_转
  3. 中文乱码在java中URLEncoder.encode方法要调用两次解决
  4. GCC编译优化指南【作者:金步国】
  5. 好书一本:《设计心理学》
  6. java 中的static 用法
  7. 【大数相乘】LeetCode 43. Multiply Strings
  8. 【python】热力图绘制: intensity_heatmap,density_heatmap
  9. jQuery ajax error函数(交互错误信息的获取)
  10. SQL 之连接查询