大数据小项目之电视收视率企业项目12
因为环境不足,所以没有用flume收集日志,而是用的已经从别的渠道的日志
日志截图:
编写以下脚本,将收集的日志文件上传至HDFS
#!/bin/bash#set java env export JAVA_HOME=/soft/jdk1.8.0_65/ export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH#set hadoop env export HADOOP_HOME=/soft/hadoop-2.7.3/ export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH #日志文件存放的目录 log_src_dir=/home/wang/logs/log/ #待上传文件存放的目录 log_toupload_dir=/home/wang/logs/toupload/#日志文件上传到hdfs的根路径 hdfs_root_dir=/data/clickLog/20180811/#打印环境变量信息 echo "envs: hadoop_home: $HADOOP_HOME"#读取日志文件的目录,判断是否有需要上传的文件 echo "log_src_dir:"$log_src_dir ls $log_src_dir | while read fileName doif [[ "$fileName" == access.log.* ]]; then# if [ "access.log" = "$fileName" ];thendate=`date +%Y_%m_%d_%H_%M_%S`#将文件移动到待上传目录并重命名#打印信息echo "moving $log_src_dir$fileName to $log_toupload_dir"xxxxx_click_log_$fileName"$date"mv $log_src_dir$fileName $log_toupload_dir"xxxxx_click_log_$fileName"$date#将待上传的文件path写入一个列表文件willDoingecho $log_toupload_dir"xxxxx_click_log_$fileName"$date >> $log_toupload_dir"willDoing."$datefidone #找到列表文件willDoing ls $log_toupload_dir | grep will |grep -v "_COPY_" | grep -v "_DONE_" | while read line do#打印信息echo "toupload is in file:"$line#将待上传文件列表willDoing改名为willDoing_COPY_mv $log_toupload_dir$line $log_toupload_dir$line"_COPY_"#读列表文件willDoing_COPY_的内容(一个一个的待上传文件名) ,此处的line 就是列表中的一个待上传文件的pathcat $log_toupload_dir$line"_COPY_" |while read linedo#打印信息echo "puting...$line to hdfs path.....$hdfs_root_dir"hadoop fs -put $line $hdfs_root_dirdone mv $log_toupload_dir$line"_COPY_" $log_toupload_dir$line"_DONE_" done
MR清洗数据
创建工程导入依赖库:
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.3</version></dependency><dependency><groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.8</version><scope>system</scope><systemPath>${JAVA_HOME}/lib/tools.jar</systemPath></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>2.1.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.33</version></dependency>
</dependencies>
Mapper类
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import com.it18zhang.project.util.AnalysisNginxTool;public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable>{Text text = new Text();@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String value1 = value.toString();String itr[] = value1.toString().split(" ");if (itr.length < 11){return;}String ip = itr[0];String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);String url = itr[6];String upFlow = itr[9];text.set(ip+","+date+","+url+","+upFlow);context.write(text, NullWritable.get());} }
Driver类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.it18zhang.project.util.DateToNUM;public class AccessLogDriver {public static void main(String[] args) throws Exception {DateToNUM.initMap();Configuration conf = new Configuration();if(args.length != 2){args[0] = "hdfs://wang201/data/access.log";args[1] = "hdfs://wang201/uvout/hive" ;}Job job = Job.getInstance(conf); // 设置一个用户定义的job名称job.setJarByClass(AccessLogDriver.class);job.setMapperClass(AccessLogPreProcessMapper.class); // 为job设置Mapper类// 为job设置Reducer类job.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);// 为job的输出数据设置Key类job.setMapOutputValueClass(NullWritable.class);// 为job输出设置value类FileInputFormat.addInputPath(job, new Path(args[0])); // 为job设置输入路径FileOutputFormat.setOutputPath(job, new Path(args[1]));// 为job设置输出路径System.exit(job.waitForCompletion(true) ? 0 : 1); // 运行job }}
AnalysisNginxTool工具类
package com.it18wang;import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class AnalysisNginxTool {private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);public static String nginxDateStmpToDate(String date){String res = "";try{SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");String datetmp = date.split(" ")[0].toUpperCase();String mtmp = datetmp.split("/")[1];DateToNUM.initMap();datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));System.out.println(datetmp);Date d = df.parse(datetmp);SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");res = sdf.format(d);}catch (ParseException e){logger.error("error:" + date, e);}return res;}public static long nginxDateStmpToDateTime(String date){long l = 0;try{SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");String datetmp = date.split(" ")[0].toUpperCase();String mtmp = datetmp.split("/")[1];datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));Date d = df.parse(datetmp);l = d.getTime();}catch (ParseException e){logger.error("error:" + date, e);}return l;} }
package com.it18wang;import java.util.HashMap; public class DateToNUM {public static HashMap map = new HashMap();public static void initMap(){map.put("JAN", "01");map.put("FEB", "02");map.put("MAR", "03");map.put("APR", "04");map.put("MAY", "05");map.put("JUN", "06");map.put("JUL", "07");map.put("AUG", "08");map.put("SEPT", "09");map.put("OCT", "10");map.put("NOV", "11");map.put("DEC", "12");} }
打jar包
上传jar包到虚拟机,执行命令
hadoop jar /home/wang/mrclick.jar com.it18wang.AccessLogDriver /data/clickLog/20180812 /vout/hive
清洗后的数据
数据导入hive表中
create database mydb;
use mydb;
create external table mydb2.access(ip string,day string,url string,upflow string) row format delimited fields terminated by ',';
create external table pvnum(id int,sum int) row format delimited fields terminated by ',';
数据导入mysql中:
本地Navicat连接虚拟机的mysql
在数据库创建mysql数据:upflow
里面有两个字段
利用sqoop进行从hive数据库中导出到mysql的upflow表中
sqoop export --connect \
jdbc:mysql://wang201:3306/userdb \
--username sqoop --password sqoop --table upflow --export-dir \
/user/hive/warehouse/db2.db/upflow --input-fields-terminated-by ','
转载于:https://www.cnblogs.com/wakerwang/p/9479677.html
大数据小项目之电视收视率企业项目12相关推荐
- 大数据小项目之电视收视率企业项目09--hive环境搭建
Hive是一个数据仓库基础工具在Hadoop中用来处理结构化数据.它架构在Hadoop之上,总归为大数据,并使得查询和分析方便.并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务 ...
- 大数据小项目之电视收视率企业项目07
maven搭建(前提是在你的Windows电脑上有Java,并且能用) 下图为maven官网(http://maven.apache.org/) 安装 Maven 之前要求先确定你的 JDK 已经安装 ...
- 大数据小项目之电视收视率企业项目10
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并 ...
- 大数据小项目之电视收视率企业项目11
sqoop数据迁移 概述 sqoop是apache旗下一款"Hadoop和关系数据库服务器之间传送数据"的工具. 导入数据:MySQL,Oracle导入数据到Hadoop的HDFS ...
- 大数据小项目之电视收视率企业项目04--完全分布式搭建
完全分布式搭建 前边已经修改完ip了,那么现在就开始搭建完全分布式了 1.修改主机名 hostname(查看主机名) sudo nano /etc/hostname(修改主机名) 2.修改hosts文 ...
- 大数据小项目之电视收视率企业项目05
通过hadoop自带的demo运行单词统计(测试) 在家目录下操作: 1)mkdir input 2)cd intput 3)echo "hello word" > file ...
- 大数据小项目之电视收视率企业项目06
hadoop常用的命令 hdfs dfs -mkdir -p /user/wang/hadoop 递归创建目录 hdfs dfs -ls /user 查看/user下内容 hdfs dfs -ls ...
- 大数据小项目之电视收视率企业项目01
一.VM安装(这个大家去百度吧....) 二.centos安装 版本:centos7 'https://jingyan.baidu.com/article/a3aad71aa180e7b1fa0096 ...
- 大数据小项目之电视收视率企业项目03
Yum的介绍 Yum 全称为 Yellow dog Updater, Modified,它是一个在线的软件安装命令. 他能够从指定的服务器自动下载RPM包并且安装,可以自动处理依赖性关系,并且一次安装 ...
最新文章
- 微信小程序点击右下角的图片移动到当前位置
- Unix_Linux系统定时器的应用(案例)
- 一个简单的配置管理器(SettingManager)
- 【每周CV论文推荐】 初学深度学习人脸属性分析必读的文章
- VTK修炼之道7_三维场景基本要素:光照
- devtools安装_R语言如何批量安装软件包
- 使用c#创建php可以调用的dll
- arduino ps2摇杆程序_PS2手柄在arduino上进行测试,可用,供喜欢diy的朋友借鉴
- 黑客入侵3个月浑然不知,或影响3万家客户!这个软件测试公司心太大了!
- C#笔记15 反射、特性、序列化和动态编程
- 彻底解决pycharm中用matplotlib表格绘制时图表中汉字设置
- 物联网应用网站数据库设计
- 2022年4月份京东有什么活动?
- 儒家文化圈孕育人工智能新文明
- ERP系统里的BOM展开函数
- 【题目】pyCharm 专业版 和 社区版的区别以及如何查看其版本
- 怎么从微信群聊中引流?如何从群聊进行引流?微信群怎么引流
- 查看.tar.gz文件内容(不需要解压)
- kmp, 字符串相同前后缀
- arcpy提取线段/道路起点,终点
热门文章
- curaengine linux编译,CuraEngine_vs-master 在vs编译的curaengine - 下载 - 搜珍网
- 使用uvm_report_catcher屏蔽掉特定的uvm_error/uvm_warning
- sendcloud php,Sendcloud的x_smtpapi具体如何定义?
- Android开发学习笔记整理(9)-Adapter、ListView和GridView
- Android中MVP模式
- 程序员增加收入的几种方法
- 漫画:骚操作系列(必须掌握的疯子找座问题)
- 请不要“妖魔化”外包
- Win11终端管理员打不开解决方法
- Android Studio修改工程项目名称以及修改包名