Table of Contents

Log data contents

Why analyze user behavior logs

Offline data processing workflow

Requirements analysis

Data cleaning

Parsing the access logs

Using an open-source project from GitHub

Statistical analysis of the logs

Top N most popular videos by access count

Top N most popular courses on the imooc site, by city

Top N most popular courses on the imooc site, by traffic

Tuning points

Visualizing with ECharts

Static data display

Dynamic data display

Zeppelin data display


Log data contents

  1. Access system attributes: operating system, browser
  2. Access characteristics: the URL clicked, the referring URL, time spent on the page
  3. Access information: session_id, client IP (from which the city can be derived)
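To make these fields concrete, here is a hypothetical raw line (not taken from the real dataset), laid out so that the space-separated field indexes used later in SparkStatFormatJob line up: splits(0) is the IP, splits(3) plus splits(4) the time, splits(9) the traffic, and splits(11) the URL.

// Hypothetical access-log line, for illustration only
val line = "183.162.52.7 - - [10/Nov/2016:00:01:02 +0800] " +
  "\"POST /api/course HTTP/1.1\" 200 813 \"-\" \"http://www.imooc.com/video/4500\""

val splits = line.split(" ")
println(splits(0))                       // ip
println(splits(3) + " " + splits(4))     // [10/Nov/2016:00:01:02 +0800]
println(splits(9))                       // traffic (bytes)
println(splits(11).replaceAll("\"", "")) // url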

Why analyze user behavior logs

  • The eyes of the website
  • The nerves of the website: is the layout reasonable, is the navigation clear, is the content appropriate
  • The brain of the website: sets the goals of the analysis and decides which optimizations to make

Offline data processing workflow

  • 1) Data collection: collected with Flume
  • 2) Data cleaning: the collected data contains many dirty records that do not follow the log format
  • 3) Data processing: run the statistical analysis required by the business
  • 4) Storing the results: the results can be written to an RDBMS or a NoSQL store
  • 5) Data visualization: ECharts / HUE / Zeppelin

Requirements analysis

  1. Count the Top N most-accessed courses/course notes on the imooc main site
  2. Top N most popular courses on the imooc site, by city
  3. Top N most popular courses on the imooc site, by traffic

Data cleaning

  1. First, the date format in the log file needs to be converted.

SimpleDateFormat is not thread-safe, which is why FastDateFormat is used instead; a short sketch of the problem follows.
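A minimal, hypothetical sketch (not part of the project): a SimpleDateFormat instance keeps mutable state, so sharing one across threads can throw exceptions or silently return wrong dates, while FastDateFormat instances are immutable and safe to share.

import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.commons.lang3.time.FastDateFormat

object DateFormatThreadSafety {

  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
    val fdf = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)

    val task = new Runnable {
      override def run(): Unit = {
        try {
          sdf.parse("10/Nov/2016:00:01:02") // may throw or return a corrupted Date when shared
        } catch {
          case e: Exception => println("SimpleDateFormat failed: " + e)
        }
        fdf.parse("10/Nov/2016:00:01:02")   // always consistent
      }
    }

    (1 to 10).foreach(_ => new Thread(task).start())
  }
}

The project's DateUtils therefore builds on FastDateFormat: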

package com.rachel

import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

object DateUtils {

  // date format used in the input file
  val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // target format
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  def getTime(time: String) = {
    try {
      YYYYMMDDHHMM_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]"))
  }
}

SparkStatFormatJob then converts the raw log into a tab-separated format (time, url, traffic, ip):

package com.rachel

import org.apache.spark.sql.SparkSession

object SparkStatFormatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkStatFormatJob")
      .master("local[2]")
      .getOrCreate()

    val access = spark.sparkContext.textFile("file:///f:/java/10000_access.log")
    // access.take(10).foreach(println)

    access.map(line => {
      val splits = line.split(" ")
      val ip = splits(0)
      val time = splits(3) + " " + splits(4)
      val url = splits(11).replaceAll("\"", "")
      val traffic = splits(9)
      // (ip, DateUtils.parse(time), url)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip + "\t"
    }).saveAsTextFile("file:///f:/java/output_10000_access.log")

    spark.stop()
  }
}

Parsing the access logs

  • Use Spark SQL to parse the access logs
  • Extract the course id and course type from the URL
  • Resolve the city from the IP: val city = IpUtils.getCity(ip)
  • Use Spark SQL to partition the data by day, based on the access time.

package com.rachel

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object AccessConvertUtil {

  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  /**
   * Converts one input line into an output Row
   * @param log one input record
   */
  def parseLog(log: String) = {
    try {
      val splits = log.split("\t")

      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      // the fields of this Row must line up with the fields in struct
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)
    }
  }
}

package com.rachel

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkStatCleanJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkStatCleanJob")
      .config("spark.sql.parquet.compression.codec", "gzip")
      .master("local[2]")
      .getOrCreate()

    val accessRDD = spark.sparkContext.textFile("file:///f:/java/access.log")

    val accessDF = spark.createDataFrame(
      accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

    // accessDF.printSchema()
    // accessDF.show(false)

    accessDF.coalesce(1).write.format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("day")
      .save("file:///f:/java/imooc/clean2")

    spark.stop()
  }
}

This step uses the ipdatabase project to resolve IP addresses into cities, which means pulling someone else's project into your own; a sketch of the IpUtils wrapper follows the steps below.

Using an open-source project from GitHub

  1. git clone https://github.com/wzhe06/ipdatabase
  2. # Build the cloned project (run inside the project directory)
    mvn clean package -DskipTests
  3. # Install the built jar into your local Maven repository
    mvn install:install-file -Dfile=F:/开发文档/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstart -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
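With the jar installed and added as a dependency, the IpUtils.getCity helper used above can be a thin wrapper around the library. This is a minimal sketch under the assumption that ipdatabase exposes com.ggstar.util.ip.IpHelper.findRegionByIp and that its data files are on the classpath; verify both against the cloned source.

package com.rachel

// Assumed API of the ipdatabase project; check the class and method name in the cloned repo.
import com.ggstar.util.ip.IpHelper

object IpUtils {

  // resolve an IP address to a region/city name
  def getCity(ip: String): String = {
    IpHelper.findRegionByIp(ip)
  }

  def main(args: Array[String]): Unit = {
    println(getCity("58.30.15.255")) // sample IP for a quick smoke test
  }
}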

Statistical analysis of the logs

Top N most popular videos by access count

package com.rachel

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object TopNStatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .appName("TopNStatJob")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("file:///f:/java/imooc/clean2")
    // accessDF.printSchema()
    // accessDF.show(false)

    videoAccessTopNStat(spark, accessDF)

    spark.stop()
  }

  /**
   * Top N most popular courses.
   * Shows two ways of working with the DataFrame: the DataFrame API (commented out) and SQL.
   * @param spark
   * @param accessDF
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
    import spark.implicits._

    // val videoAccessTopDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
    //   .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)

    accessDF.createOrReplaceTempView("access_logs")
    val videoAccessTopDF = spark.sql("select day, cmsId, count(1) as times from access_logs " +
      "where day = '20170511' and cmsType = 'video' " +
      "group by day, cmsId order by times desc")

    videoAccessTopDF.show(false)
  }
}

The statistics look like this:

+--------+-----+------+
|day     |cmsId|times |
+--------+-----+------+
|20170511|14540|111027|
|20170511|4000 |55734 |
|20170511|14704|55701 |
|20170511|14390|55683 |
|20170511|14623|55621 |
|20170511|4600 |55501 |
|20170511|4500 |55366 |
|20170511|14322|55102 |
+--------+-----+------+
  • Writing the statistics into MySQL; this part is built up layer by layer from the bottom.

1. Write a MySQL connection utility class

package com.rachel

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * MySQL utility object
 */
object MySQLUtils {

  /**
   * Get a database connection
   */
  def getConnection() = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=root")
  }

  /**
   * Release the database connection and related resources
   * @param connection
   * @param pstmt
   */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }
}

2. Create the database table

create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day, cms_id)
);

3. Add the JDBC driver dependency in pom.xml

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>

4. An entity class for daily course access counts, matching the table structure

package com.rachel

/**
 * Entity class: course access count per day
 */
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)

5. DAO layer

package com.rachel

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

object StatDAO {

  /**
   * Batch-insert DayVideoAccessStat records into the database
   *
   * @param list
   */
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      // use batching: turn auto-commit off
      connection.setAutoCommit(false)

      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }

      // execute the batch
      pstmt.executeBatch()
      // commit manually
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}

6. Call the DAO to write the statistics into MySQL

package com.rachel

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object TopNStatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .appName("TopNStatJob")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("file:///f:/java/imooc/clean2")
    // accessDF.printSchema()
    // accessDF.show(false)

    videoAccessTopNStat(spark, accessDF)

    spark.stop()
  }

  /**
   * Top N most popular courses
   * @param spark
   * @param accessDF
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
    accessDF.createOrReplaceTempView("access_logs")
    val videoAccessTopDF = spark.sql("select day, cmsId, count(1) as times from access_logs " +
      "where day = '20170511' and cmsType = 'video' " +
      "group by day, cmsId order by times desc")

    try {
      videoAccessTopDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")

          /**
           * Do not insert into the database here row by row;
           * collect the rows of this partition and insert them as one batch instead.
           */
          list.append(DayVideoAccessStat(day, cmsId, times))
        })

        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

7. After running the job, check the result in the database:

mysql> select * from day_video_access_topn_stat;
+----------+--------+--------+
| day      | cms_id | times  |
+----------+--------+--------+
| 20170511 |   4000 |  55734 |
| 20170511 |   4500 |  55366 |
| 20170511 |   4600 |  55501 |
| 20170511 |  14322 |  55102 |
| 20170511 |  14390 |  55683 |
| 20170511 |  14540 | 111027 |
| 20170511 |  14623 |  55621 |
| 20170511 |  14704 |  55701 |
+----------+--------+--------+
8 rows in set (0.00 sec)

Top N most popular courses on the imooc site, by city

1. Compute the statistics

  /**
   * Top N most popular courses on the imooc main site, by city
   *
   * @param spark
   * @param accessDF
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))

    cityAccessTopNDF.show(false)

    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      /*
       * ROW_NUMBER() over (PARTITION BY xx ORDER BY yy DESC) as row_number groups the rows by xx,
       * orders each group by yy, and assigns every row a number within its group.
       * (Window comes from org.apache.spark.sql.expressions.Window.)
       */
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <= 3") //.show(false)  // Top 3
  }
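The same Top 3 per-city ranking can also be written in the SQL flavor used earlier in this job. A minimal sketch, assuming accessDF has already been registered as the access_logs temporary view (as in videoAccessTopNStat):

    // SQL equivalent of the DataFrame code above: rank courses inside each city by times
    val top3SqlDF = spark.sql(
      """
        |select day, city, cmsId, times, times_rank from (
        |  select day, city, cmsId, times,
        |         row_number() over (partition by city order by times desc) as times_rank
        |  from (
        |    select day, city, cmsId, count(1) as times
        |    from access_logs
        |    where day = '20170511' and cmsType = 'video'
        |    group by day, city, cmsId
        |  ) a
        |) b
        |where times_rank <= 3
      """.stripMargin)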

2. Create the database table

mysql> create table day_video_city_access_topn_stat (
    -> day varchar(8) not null,
    -> cms_id bigint(10) not null,
    -> city varchar(20) not null,
    -> times bigint(10) not null,
    -> times_rank int not null,
    -> primary key (day, cms_id, city)
    -> );
Query OK, 0 rows affected (0.56 sec)

3. Create the corresponding entity class

package com.rachel

case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

4. DAO layer

  /**
   * Batch-insert DayCityVideoAccessStat records into the database
   */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // turn auto-commit off

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

5. Call the DAO to write the statistics into MySQL

    /**
     * Write the statistics into MySQL
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")

          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })

        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }

Verify the results in MySQL:

mysql> select * from day_video_city_access_topn_stat order by city desc,times_rank asc;
+----------+--------+-----------+-------+------------+
| day      | cms_id | city      | times | times_rank |
+----------+--------+-----------+-------+------------+
| 20170511 |  14540 | 浙江省    | 22435 |          1 |
| 20170511 |  14322 | 浙江省    | 11151 |          2 |
| 20170511 |  14390 | 浙江省    | 11110 |          3 |
| 20170511 |  14540 | 广东省    | 22115 |          1 |
| 20170511 |  14623 | 广东省    | 11226 |          2 |
| 20170511 |  14704 | 广东省    | 11216 |          3 |
| 20170511 |  14540 | 安徽省    | 22149 |          1 |
| 20170511 |  14390 | 安徽省    | 11229 |          2 |
| 20170511 |  14704 | 安徽省    | 11162 |          3 |
| 20170511 |  14540 | 北京市    | 22270 |          1 |
| 20170511 |   4600 | 北京市    | 11271 |          2 |
| 20170511 |  14390 | 北京市    | 11175 |          3 |
| 20170511 |  14540 | 上海市    | 22058 |          1 |
| 20170511 |  14704 | 上海市    | 11219 |          2 |
| 20170511 |   4000 | 上海市    | 11182 |          3 |
+----------+--------+-----------+-------+------------+

Top N most popular courses on the imooc site, by traffic

1. Compute the statistics

  /**
   * Statistics by traffic
   */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId")
      .agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    //.show(false)
  }

2. Create the database table

create table day_video_traffics_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day, cms_id)
);

3. Create the entity class

package com.rachel

case class DayVideoTrafficsStat(day: String, cmsId: Long, traffics: Long)

4. DAO implementation

  /**
   * Batch-insert DayVideoTrafficsStat records into the database
   */
  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // turn auto-commit off

      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

5. Call the DAO to insert the results into the database

    /**
     * Write the statistics into MySQL
     */
    try {
      cityAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")

          list.append(DayVideoTrafficsStat(day, cmsId, traffics))
        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }

6. Query the data in MySQL

mysql> select * from day_video_traffics_topn_stat;
+----------+--------+----------+
| day      | cms_id | traffics |
+----------+--------+----------+
| 20170511 |   4000 | 27847261 |
| 20170511 |   4500 | 27877433 |
| 20170511 |   4600 | 27777838 |
| 20170511 |  14322 | 27592386 |
| 20170511 |  14390 | 27895139 |
| 20170511 |  14540 | 55454898 |
| 20170511 |  14623 | 27822312 |
| 20170511 |  14704 | 27737876 |
+----------+--------+----------+
8 rows in set (0.00 sec)

Tuning points

  • Control the size and number of the output files with coalesce
  • Adjust the data type of the partition column:

.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")

When the following code is executed:

val accessDF =  spark.read.format("parquet").load("file:///f:/java/imooc/clean2")
accessDF.printSchema()

the schema printed to the console looks like this:

root
 |-- url: string (nullable = true)
 |-- cmsType: string (nullable = true)
 |-- cmsId: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- time: string (nullable = true)
 |-- day: integer (nullable = true)

while the struct defined in the class is:

  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

day was originally defined as StringType, but it shows up in the schema as integer.

The reason is explained in the official documentation:

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types and string type are supported. Sometimes users may not want to automatically infer the data types of the partitioning columns. For these use cases, the automatic type inference can be configured by spark.sql.sources.partitionColumnTypeInference.enabled, which is default to true. When type inference is disabled, string type will be used for the partitioning columns.
  • MySQL batch commits. When writing data to MySQL,

    connection.setAutoCommit(false) turns auto-commit off, and then:

    for (ele <- list) {
      pstmt.setString(1, ele.day)
      pstmt.setLong(2, ele.cmsId)
      pstmt.setLong(3, ele.times)
      pstmt.addBatch()
    }
    // execute the batch
    pstmt.executeBatch()
    // commit manually
    connection.commit()

    This avoids a commit to the relational database for every single row and improves efficiency.
    

Visualizing with ECharts

Static data display

1. Create a J2EE web project in IDEA

2. Test that a static chart renders, to verify that ECharts works:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>test</title>
    <!-- include the ECharts script -->
    <script src="js/echarts.min.js"></script>
</head>
<body>
<!-- prepare a DOM element with width and height for ECharts -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
    // initialize the echarts instance on the prepared DOM element
    var myChart = echarts.init(document.getElementById('main'));

    var option = {
        title: {
            text: '某站点用户访问来源',
            subtext: '纯属虚构',
            x: 'center'
        },
        tooltip: {
            trigger: 'item',
            formatter: "{a} <br/>{b} : {c} ({d}%)"
        },
        legend: {
            orient: 'vertical',
            left: 'left',
            data: ['直接访问', '邮件营销', '联盟广告', '视频广告', '搜索引擎']
        },
        series: [
            {
                name: '访问来源',
                type: 'pie',
                radius: '55%',
                center: ['50%', '60%'],
                data: [
                    {value: 335, name: '直接访问'},
                    {value: 310, name: '邮件营销'},
                    {value: 234, name: '联盟广告'},
                    {value: 135, name: '视频广告'},
                    {value: 1548, name: '搜索引擎'}
                ],
                itemStyle: {
                    emphasis: {
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };

    myChart.setOption(option);
</script>
</body>
</html>

3. The rendered result

Dynamic data display

1. Add the dependencies

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>jsp-api</artifactId>
    <version>2.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
</dependency>

2. Create the servlet; this time we build from the top down

package com.rachel.web;

import com.rachel.dao.VideoAccessTopNDAO;
import com.rachel.domain.VideoAccessTopN;
import net.sf.json.JSONArray;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

public class VideoAccessTopNServlet extends HttpServlet {

    private VideoAccessTopNDAO dao;

    @Override
    public void init() throws ServletException {
        dao = new VideoAccessTopNDAO();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String day = req.getParameter("day");

        List<VideoAccessTopN> results = dao.query(day);
        JSONArray json = JSONArray.fromObject(results);

        resp.setContentType("text/html;charset=utf-8");
        PrintWriter writer = resp.getWriter();
        writer.println(json);
        writer.flush();
        writer.close();
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doGet(req, resp);
    }
}

3. Create the DAO

package com.rachel.dao;

import com.rachel.domain.VideoAccessTopN;
import com.rachel.utils.MySQLUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VideoAccessTopNDAO {

    static Map<String, String> courses = new HashMap<String, String>();

    static {
        courses.put("4000", "MySQL优化");
        courses.put("4500", "Crontab");
        courses.put("4600", "Swift");
        courses.put("14540", "SpringData");
        courses.put("14704", "R");
        courses.put("14390", "机器学习");
        courses.put("14322", "redis");
        courses.put("14390", "神经网络"); // note: duplicate key, this overwrites the earlier "14390" entry
        courses.put("14623", "Docker");
    }

    /**
     * Look up the course name by course id
     */
    public String getCourseName(String id) {
        return courses.get(id);
    }

    /**
     * Query the top 5 most popular courses for the given day
     * @param day
     */
    public List<VideoAccessTopN> query(String day) {
        List<VideoAccessTopN> list = new ArrayList<VideoAccessTopN>();

        Connection connection = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;

        try {
            connection = MySQLUtils.getConnection();

            String sql = "select cms_id, times from day_video_access_topn_stat where day = ? order by times desc limit 5";
            psmt = connection.prepareStatement(sql);
            psmt.setString(1, day);

            rs = psmt.executeQuery();

            VideoAccessTopN domain = null;
            while (rs.next()) {
                domain = new VideoAccessTopN();
                /**
                 * TODO... the page should display the course name, but here we only have the course id.
                 *
                 * How do we get the course name from the course id?
                 * The mapping between id and name is usually stored in a relational database.
                 */
                domain.setName(getCourseName(rs.getLong("cms_id") + ""));
                domain.setValue(rs.getLong("times"));

                list.add(domain);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            MySQLUtils.release(connection, psmt, rs);
        }

        return list;
    }

    public static void main(String[] args) {
        VideoAccessTopNDAO dao = new VideoAccessTopNDAO();
        List<VideoAccessTopN> list = dao.query("20170511");
        for (VideoAccessTopN result : list) {
            System.out.println(result.getName() + " , " + result.getValue());
        }
    }
}

4. Create the JavaBean

package com.rachel.domain;

public class VideoAccessTopN {

    private String name;
    private long value;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

5. Database connection utility

package com.rachel.utils;

import java.sql.*;

public class MySQLUtils {

    private static final String USERNAME = "root";
    private static final String PASSWORD = "root";
    private static final String DRIVERCLASS = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://localhost:3306/imooc_project";

    /**
     * Get a database connection
     */
    public static Connection getConnection() {
        Connection connection = null;
        try {
            Class.forName(DRIVERCLASS);
            connection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * Release resources
     */
    public static void release(Connection connection, PreparedStatement pstmt, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(getConnection());
    }
}

6. Since the back end is a servlet, the front end calls it with Ajax: include jQuery and update the page

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>test</title>
    <!-- include the ECharts script -->
    <script src="js/echarts.min.js"></script>
    <script src="js/jquery.js"></script>
</head>
<body>
<!-- prepare a DOM element with width and height for ECharts -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
    // initialize the echarts instance on the prepared DOM element
    var myChart = echarts.init(document.getElementById('main'));

    var option = {
        title: {
            text: '最受欢迎的TOPN课程',
            x: 'center'
        },
        tooltip: {
            trigger: 'item',
            formatter: "{a} <br/>{b} : {c} ({d}%)"
        },
        legend: {
            orient: 'vertical',
            left: 'left',
            data: []
        },
        series: [
            {
                name: '访问次数',
                type: 'pie',
                radius: '55%',
                center: ['50%', '60%'],
                data: (function () {
                    var courses = [];
                    $.ajax({
                        type: "GET",
                        url: "/stat?day=20170511",
                        dataType: 'json',
                        async: false,
                        success: function (result) {
                            for (var i = 0; i < result.length; i++) {
                                courses.push({"value": result[i].value, "name": result[i].name});
                            }
                        }
                    });
                    return courses;
                })(),
                itemStyle: {
                    emphasis: {
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };

    myChart.setOption(option);
</script>
</body>
</html>
</html>

7. Map the URL to the servlet class in web.xml

<!DOCTYPE web-app PUBLIC
        "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
        "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
    <display-name>Archetype Created Web Application</display-name>

    <servlet>
        <servlet-name>stat</servlet-name>
        <servlet-class>com.rachel.web.VideoAccessTopNServlet</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>stat</servlet-name>
        <url-pattern>/stat</url-pattern>
    </servlet-mapping>
</web-app>

8. Restart the server and check the result

Zeppelin data display

1. Download Zeppelin

Download URL: http://www.apache.org/dyn/closer.cgi/zeppelin/zeppelin-0.8.0/zeppelin-0.8.0-bin-all.tgz

2. Extract the archive

tar -zxvf zeppelin-0.8.0-bin-all.tgz

3. Start the service

[hadoop@hadoop000 zeppelin-0.8.0-bin-all]$ cd bin
[hadoop@hadoop000 bin]$ ./zeppelin-daemon.sh start

4. Open http://192.168.1.6:8080

5. Configure the JDBC interpreter to connect to MySQL

6. Create a new note
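In the new note, a paragraph can run the same Top N query against the cleaned data. A minimal sketch using the %spark interpreter, assuming the parquet produced by SparkStatCleanJob has been copied to a path readable from the Zeppelin host (the path below is hypothetical) and that z.show is available to render DataFrames:

%spark
// hypothetical location of the cleaned parquet on the Zeppelin machine
val accessDF = spark.read.format("parquet").load("file:///home/hadoop/data/clean2")
accessDF.createOrReplaceTempView("access_logs")

val topN = spark.sql(
  "select day, cmsId, count(1) as times from access_logs " +
  "where day = '20170511' and cmsType = 'video' " +
  "group by day, cmsId order by times desc")

// render as an interactive table/chart in the note
z.show(topN)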
