接收日志并发送到kafka:gmall-logger模块--SpringBoot的部署

日志前加一个ts时间戳;org.slf4j.LoggerFactory,slf4j是一个接口,它会去找实现类;LoggeerFactory默认的会在jar包中找实现类;

logging(它是LoggeFactory默认使用的)和log4j是竞争关系,所以要在gmall-logger.pom.xml文件中加入exclusions把logging给排除了

  <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j</artifactId><version>1.3.8.RELEASE</version></dependency></dependencies>

com.atguigu.gmall.logger.controller.LoggerController@RestController
public class LoggerController {@AutowiredKafkaTemplate kafkaTemplate;private static final Logger logger = LoggerFactory.getLogger(LoggerController.class);@PostMapping("log")public String doLog(@RequestParam("log") String log){JSONObject jsonObject = JSON.parseObject(log);jsonObject.put("ts", System.currentTimeMillis());//System.out.println(log);// 落盘成为日志文件     // log4j
        logger.info(jsonObject.toJSONString());//发送kafkaif ("startup".equals(jsonObject.getString("type"))){kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_STARTUP, jsonObject.toJSONString());}else {kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_EVENT, jsonObject.toJSONString());}return "success";}
}

利用resources/ log4j.properties进行log日志的落盘:

log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender  //怎么写这个日志;类型--控制台
log4j.appender.atguigu.MyConsole.target=System.err  //控制台有两种:System.out日志颜色黑色和System.err日志是红色的
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout    //自定义的,除了要打印的日志级别,还要打印什么
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n  //格式,p是日志级别,%m输出的内容,%n是换行;log4j.appender.atguigu.File=org.apache.log4j.DailyRollingFileAppender  //每日滚动文件,每天产生一个文件;
log4j.appender.atguigu.File.file=/opt/module/applog/gmall/log/app.log  //输出的文件路径
log4j.appender.atguigu.File.DatePattern='.'yyyy-MM-dd        //输出文件的后缀; 当天的日志是没有后缀的,一旦过了12点,就有后缀.'yyyy-MM-dd',后缀是日志时间
log4j.appender.atguigu.File.layout=org.apache.log4j.PatternLayout //自定义格式
log4j.appender.atguigu.File.layout.ConversionPattern=%m%n  //要干干净净的打印信息;log4j.logger.com.atguigu.gmall.logger.controller.LoggerController=info,atguigu.File  //某一个类的路径,只监控某个类所产生的日志;log4j.rootLogger=error,atguigu.Myconsole表示根底的,除了上边指定的都是它,首先是精确匹配到info就日志输出就按照它的打印,它们后边的.File或.Myconsole都会输出

日志级别有:级别从低到高 trace、debug、info、warn、error、fatal,如果写info,从低到高比它高的都可以输出出来;

把日志采集模块打包部署到Linux中

  在idea中的maven执行package,把打好的jar包拷贝到Linux 路径下,java -jar  /app/gmall/dw-logger-0.0.1-SNAPSHOT.jar   >/dev/null  2>&1  &

  测试 由windows发送日志到linux 日志落盘

在三台系统同时部署日志采集系统的jar包,分别把/applog/目录拷贝到三台虚拟机上

java -jar /opt/module/applog/gmall/gmall-logger-0.0.1-SNAPSHOT.jar --server.port=8080 >./app.error 2>&1 &[kris@hadoop101 log]$ tail -10f app.log  //监控文件测试数据是否写入

搭建nginx  https://www.cnblogs.com/shengyang17/p/10836168.html ,只需一台部署nginx即可;

由windows发送模拟日志,nginx负责路由,日志服务负责接收。

window发送日志------>>niginx路由---> linux中接收日志的jar存储日志文件并发给kafka--->kafka

更新集群启动脚本 ,加入nginx操作: ./logger-cluster.sh start 启动nginx路由,路由三台虚拟机给接收日志服务的jar包 ,并发给fakfa;

logger-cluster.sh

[kris@hadoop101 gmall]$ vim logger-cluster.sh
#!/bin/bash
JAVA_BIN=/opt/module/jdk1.8.0_144/bin/java
PROJECT=gmall
APPNAME=gmall-logger-0.0.1-SNAPSHOT.jar
SERVER_PORT=8080case $1 in
"start"){for i in hadoop101 hadoop102 hadoop103doecho "========启动日志服务: $i==============="ssh $i  "$JAVA_BIN -Xms32m -Xmx64m -jar /opt/module/applog/$PROJECT/$APPNAME --server.port=$SERVER_PORT >./app.error 2>&1  &"doneecho "==============启动NGINX==============="/opt/module/nginx/sbin/nginx};;"stop"){echo "=============关闭NGINX================="/opt/module/nginx/sbin/nginx -s stopfor i in  hadoop101 hadoop102 hadoop103doecho "========关闭日志服务: $i==============="ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1done};;
esac

View Code

ssh之间的互相连接:三种联通方式: ①source /etc/profile;  ②ssh 会读.bashrc cat /etc/profile>>.brashrc   ③$JAVA_BIN

netstart -anp | more 查看端口

当前日志模块:

日活DAU

搭建实时处理模块gmall-realtime:

  消费kafka;利用redis过滤当日已经计入的日活设备;把每批次新增的当日日活信息保存到ES中;从ES中查出数据,发布成数据接口

消费kafka& 利用redis去重

  1、把今日新增的活跃用户保存到redis中;   2、每条数据经过过滤,去掉redis中的已有的用户

  设计Redis的kv; Key:dau:2019-01-22, value: 设备id

  业务类开发

 DauApp.scala    消费kafka中数据(通过MyKafkaUtil获取) --->>利用redis去重---->>保存到ES(通过MyEsUtil工具类)中;

object DauApp {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")val streamingContext: StreamingContext = new StreamingContext(new SparkContext(conf),Seconds(5))val inputStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_STARTUP, streamingContext)//  1 把当日已访问过的用户保存起来 redis//  2  以当日已访问用户清单为依据 ,过滤掉再次访问的请求// 转换case class  补全日期格式val startupLogDStream: DStream[StartUpLog] = inputStream.map { record =>val jsonStr: String = record.value()val startUpLog: StartUpLog = JSON.parseObject(jsonStr, classOf[StartUpLog])//把日期进行补全val dateTimeString: String = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(startUpLog.ts))val dateTimeArray: Array[String] = dateTimeString.split(" ")startUpLog.logDate = dateTimeArray(0)startUpLog.logHour = dateTimeArray(1).split(":")(0)startUpLog.logHourMinute = dateTimeArray(1)startUpLog}// 去重操作val filterDStream: DStream[StartUpLog] = startupLogDStream.transform { rdd =>//driver 每时间间隔执行一次println("过滤前:" + rdd.count())val jedis: Jedis = RedisUtil.getJedisClientval curDate: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())val key: String = "dau:" + curDateval dauSet: util.Set[String] = jedis.smembers(key) //当日日活用户清单//使用广播变量val dauBC: Broadcast[util.Set[String]] = streamingContext.sparkContext.broadcast(dauSet)val filterRDD: RDD[StartUpLog] = rdd.filter {startuplog =>!dauBC.value.contains(startuplog.mid)}println("过滤后:" + filterRDD.count())filterRDD}// 考虑到 新的访问可能会出现重复 ,所以以mid为key进行去重,每个mid为小组 每组取其中一个val startupLogGroupDStream: DStream[(String, Iterable[StartUpLog])] = filterDStream.map{startuplog => (startuplog.mid, startuplog)}.groupByKey()val startupLogFilterDistinctDStream: DStream[StartUpLog] = startupLogGroupDStream.flatMap {case (mid, startupLogIter) =>val startupLogOneIter: Iterable[StartUpLog] = startupLogIter.take(1)startupLogOneIter}//  1 把当日已访问过的用户保存到 redisstartupLogFilterDistinctDStream.foreachRDD{rdd =>rdd.foreachPartition{startupLogItr =>val jedis: Jedis = RedisUtil.getJedisClientval startupList: List[StartUpLog] = startupLogItr.toListfor (elem <- startupList) {val key: String = "dau:" + elem.logDatejedis.sadd(key, elem.mid)}jedis.close()//保存到ES
            MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_DAU, startupList)}}streamingContext.start()streamingContext.awaitTermination()}}

ES

综上 ,在实际环境中,需要一种能够容纳较大规模数据切交互性好的数据库。mysql虽然交互性好,但是容量扩展性有限。

hbase虽然能够支持海量数据,但是查询的灵活度不足。所以ES在容量及交互性上达到一个非常不错的平衡,而且还能支持全文检索。

搭建es集群 https://www.cnblogs.com/shengyang17/p/10597841.html

ES& kibana的启动脚本: ./ek.sh start 

[kris@hadoop101 gmall]$ cat ek.sh
#!/bin/bash
es_home=/opt/module/elasticsearch
kibana_home=/opt/module/kibana/
case $1 in
"start"){echo "=============启动ES集群============="for i in hadoop101 hadoop102 hadoop103dossh $i "source /etc/profile;${es_home}/bin/elasticsearch >/dev/null 2>&1 &"doneecho "=============启动kibana============="nohup ${kibana_home}/bin/kibana >/opt/module/kibana/kibana.log 2>&1 &
};;
"stop"){echo "=============关闭kibana============="ps -ef | grep ${kibana_home} | grep -v grep | awk '{print $2}'|xargs killecho "=============关闭ES集群============="for i in hadoop101 hadoop102 hadoop103dossh $i "ps -ef | grep $es_home | grep -v grep | awk '{print \$2}'|xargs kill" >/dev/null 2>&1done};;
esac

View Code

设计es索引结构

case class startup

case class Startup(mid:String,uid:String,appid:String,area:String,os:String,ch:String,logType:String,vs:String,var logDate:String,var logHour:String,var logHourMinute:String,var ts:Long) {}

View Code

  text 支持分词; keyword 只能全部内容匹配
保存数据之前一定要先定义好mapping:  每个字段的类型 ; 分清楚索引类型

1、需要索引 也需要分词:标题,商品名称,分类名称,  type:“text”

2、需要索引,但不需要分词:类型id , 日期,数量 ,年龄 ,各种id, type:"keyword";

    mid, uid,area,os ,ch ,vs,logDate,logHourMinute,ts

3、既不需要索引,也不需要分词: 不被会用于条件过滤,经过脱敏的字段,138****0101  index:false

##############在ES中创建index
PUT gmall_dau
{"mappings": {"_doc":{"properties":{"mid":{"type":"keyword" ,},"uid":{"type":"keyword"},"area":{"type":"keyword"},"os":{"type":"keyword"},"ch":{"type":"keyword"},"vs":{"type":"keyword"},"logDate":{"type":"keyword"},"logHour":{"type":"keyword"},"logHourMinute":{"type":"keyword"},"ts":{"type":"long"} }}}
}

 在Kibana中进行查询

  如果在在保存| 插入数据的时候,没有先建立mapping的数据结构,则ES是会自动推断;当你再去聚合aggs时,text的字段是不能进行聚合的(如果想要聚合要加 字段.keyword,如下所示),但是好一点的是ES给保存了两份,一个是text类型的字段、另外一个是keyword类型的;浪费了空间,在实际生产环境中是不能使用这种方式的;

GET /gmall_dau/_search
{"query": {"bool": {"filter": {"term": {"logDate": "2019-05-04"}}}}
}######groupby操作  聚合aggregation
GET /gmall_dau/_search
{"query": {"bool": {"filter": {"term": {"logDate": "2019-04-30"}}}},"aggs": {"groupby_logHour": {"terms": {"field": "logHour.keyword","size": 24}}}
}

保存到es中; 关于es java客户端的选择,目前市面上有两类客户端:

  一类是TransportClient 为代表的ES原生客户端,不能执行原生dsl语句必须使用它的Java api方法。
  另外一种是以Rest Api为主的missing client,最典型的就是jest。 这种客户端可以直接使用dsl语句拼成的字符串,直接传给服务端,然后返回json字符串再解析。
两种方式各有优劣,但是最近elasticsearch官网,宣布计划在7.0以后的版本中废除TransportClient。以RestClient为主。

所以在官方的RestClient 基础上,进行了简单包装的Jest客户端,就成了首选,而且该客户端也与springboot完美集成。

数据发布接口

详细见代码

通过gmall-mock模块的类JsonMocker发送数据--->nginx路由--->三台虚拟机的gmall-logger的接收数据并转发给kafka(用的是SpringBoot)--->

启动:gmall-publisher--springBoot的主类: com.atguigu.gmall.publisher.GmallPublisherApplication,给chart的接口,启动

启动:gmall--dw-chart---com.demo.DemoApplication的主类; 接接口展示数据的动态变化

启动:[kris@hadoop101 ~]$ redis-server myredis/redis.conf

启动:gmall-realtime的com.atguigu.gmall.realtime.app.DauApp类,

启动:gmall-mock模块的类JsonMocker发送数据

http://127.0.0.1:8070/realtime-total?date=2019-04-30
[{"name":"新增日活","id":"dau","value":761},{"name":"新增设备","id":"new_mid","value":233}]

http://127.0.0.1:8070/realtime-hour?id=dau&&date=2019-05-04
{"yesterday":{},"today":{"20":26,"21":96}}

通过前端页面展示: http://localhost:8089/index

转载于:https://www.cnblogs.com/shengyang17/p/10853555.html

实时--1.1 日志数据分析相关推荐

  1. 网站流量日志数据分析系统(1)

    1. 点击流数据模型 1.1. 点击流概念 点击流(Click Stream)是指用户在网站上持续访问的轨迹.这个概念更注重用户浏览网站的整个流程.用户对网站的每次访问包含了一系列的点击动作行为,这些 ...

  2. potainer 日志_【转】专治脑壳疼 | 横扫七大烦恼!日志数据分析还可以这样做!...

    近年来,业务系统逐渐往云化架构发展. 云化后,业务系统的复杂性与日俱增,服务器数量大幅度增长,出现故障的几率也不断增加,服务器上面临着以下诸多问题: 这些痛点具体表现为: 孤 跨业务系统日志.流水号. ...

  3. hadoop日志数据分析开发步骤及代码

    日志数据分析: 1.背景 1.1 hm论坛日志,数据分为两部分组成,原来是一个大文件,是56GB:以后每天生成一个文件,大约是150-200MB之间: 1.2 日志格式是apache common日志 ...

  4. 海量日志数据分析与应用》场景介绍及技术点分析

    摘要: 杭州云栖TI专场,大数据workshop:云数据·大计算-海量日志数据分析与应用 分享 接下来几个实验如下: 2.数据采集:日志数据上传 3.数据加工:用户画像 4.数据分析展现:可视化报表及 ...

  5. Android 日志自动分析,Android Log Viewer:一个日志查看器工具,可简化实时对Android日志的分析...

    作为与Cordova一起工作的移动应用程序开发人员, 我知道调试应用程序的本机部分会很困难, 例如, 当你为应用程序创建本机插件时(在这种情况下, Android Studio无效).在试图找出我的应 ...

  6. 日志查看_实时查看容器日志

    实时查看容器日志 介绍一款使用了几个月的开源小工具,Dozzle.它是一款轻量.简单的容器日志查看工具. 本篇将简单介绍如何使用它,包括搭配 Traefik,以及如何快速从源码构建它. 写在前面 这款 ...

  7. 大数据workshop:《云数据·大计算:海量日志数据分析与应用》之《社交数据分析:好友推荐》篇...

    大数据workshop:<云数据·大计算:海量日志数据分析与应用>之<社交数据分析:好友推荐>篇 实验背景介绍 了解更多2017云栖大会·成都峰会 TechInsight &a ...

  8. 网站点击流日志数据分析

    网站点击流日志数据分析 点击流数据:关注的是用户访问网站的轨迹,按照时间来进行先后区分 基本上所有的大型网站都有日志埋点 通过js的方式,可以获取到你再网站上面点击的所有的链接,按钮,商品,等等,包括 ...

  9. 大数据入门第一课 Hadoop基础知识与电商网站日志数据分析

    大数据入门第一课 Hadoop基础知识与电商网站日志数据分析 本课程从Hadoop核心技术入手,以电商项目为依托,带领你从0基础开始上手,逐步掌握大数据核心技术(如:HDFS.YARN.MapRedu ...

最新文章

  1. AB_PLC_入门教程
  2. linux创建多个子进程,[Linux进程]使用fork函数创建多个子进程
  3. js页面文字选中后分享到新浪微博实现
  4. WINCE快捷方式详解
  5. SpringBoot 2 整合 Spring Session 最简操作
  6. 意外断电后,Zabbix_proxy 重启无任何相关进程,如何处理?
  7. Atitit 命令行dsl传递参数的几种模式对比 cli url模式 键值对NameValuePair urlutil String string = -host 101.13
  8. python爬虫爬取公众号_Python爬虫案例:爬取微信公众号文章
  9. c语言代码姓名全拼,巧用拼音首字母输入人名(代码)
  10. 3dmax打开材质编辑器就崩溃
  11. 阿里云linux服务器安装ssh(从登录的坑开始,心酸)
  12. 一文看尽Stata绘图
  13. Python代码写好了怎么运行?为大家详细讲讲如何运行Python代码
  14. 关于红酒品质的python数据分析
  15. python PIL库安装
  16. 项目记录——为沙特客户Android开发Google地图应用
  17. idea mac 查询方法被调用_IntelliJ IDEA For Mac 快捷键
  18. 微信公众号如何做数据分析?4大模块34个关键指标
  19. 如何保证接口幂等性?
  20. TTL电平和RS232电平

热门文章

  1. elementui 嵌套表单验证_elementUI 表单嵌套表格验证,日期选择器联动限制等写法
  2. java左右连接sql写法,join用不了了 sql语句写法,不用join
  3. setuptools Automatic Script Creation
  4. opencv-api drawKeypoints drawMatches
  5. d3 v5 api arrays
  6. 电脑尺寸大小在哪里看_科技资讯:电脑弹出本地计算机上的服务启动后停止的提示在哪里看...
  7. asteroids模板 游戏 java_在高级Java游戏中存储全局/静态变量的最佳方法是什么?...
  8. Java基础学习总结(120)——JVM 参数使用详细说明
  9. 代码管理和检查平台汇总
  10. Git学习总结(11)——Git撤销操作详解