文章目录

  • 4. 用户行为数据采集模块
    • 4.3 日志采集Flume
      • 4.3.2 日志采集Flume配置概述
        • 4.3.2.1 TailDirSource
        • 4.3.2.2 KafkaChannel
      • 4.3.3 日志采集Flume配置实操
        • 4.3.3.1 创建Flume配置文件
        • 4.3.3.2 配置文件内容如下
        • 4.3.3.3 编写Flume拦截器
          • 4.3.3.3.1 创建Maven工程flume-interceptor
          • 4.3.3.3.2 创建包:com.summer.gmall.flume.interceptor
          • 4.3.3.3.3 在pom.xml文件中添加如下配置
          • 4.3.3.3.4 在com.summer.gmall.flume.utils包下创建JSONUtil类
          • 4.3.3.3.5 在com.summer.gmall.flume.interceptor包下创建ETLInterceptor类
          • 4.3.3.3.6 打包
          • 4.3.3.3.7 需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面

4. 用户行为数据采集模块

4.3 日志采集Flume

4.3.2 日志采集Flume配置概述

  按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。
  此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。
  选择TailDirSource和KafkaChannel的原因如下:

4.3.2.1 TailDirSource

  TailDirSource相比ExecSource、SpoolingDirectorySource的优势
  TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
  ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
  SpoolingDirectorySource监控目录,支持断点续传。

4.3.2.2 KafkaChannel

  采用Kafka Channel,省去了Sink,提高了效率。
日志采集Flume关键配置如下:

4.3.3 日志采集Flume配置实操

4.3.3.1 创建Flume配置文件

[summer@hadoop102 flume-1.9.0]$ mkdir job
[summer@hadoop102 flume-1.9.0]$ cd job/
[summer@hadoop102 job]$ vim file_to_kafka.conf

4.3.3.2 配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.summer.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1


这个是将拦截器加上了,类名和Builder要用$连接

4.3.3.3 编写Flume拦截器

  如果没有ETL清洗,则数据有好多残缺,但是只能是轻度清洗,过于重的话则数据会堵塞到管道里,导致数据传输变慢

4.3.3.3.1 创建Maven工程flume-interceptor


4.3.3.3.2 创建包:com.summer.gmall.flume.interceptor

4.3.3.3.3 在pom.xml文件中添加如下配置
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

4.3.3.3.4 在com.summer.gmall.flume.utils包下创建JSONUtil类

package com.summer.gmall.flume.utils;import com.alibaba.fastjson.JSONObject;/*** @author Redamancy* @create 2022-10-25 16:38*/
public class JSONUtil {public static boolean isJSONValidata(String log) {//1.JSONObject.parseObject(log);判断json,如果是json,则返回JSON的值,如果不是JSON则会报错。// 因此使用try,catch来捕捉,如果报错,则该log不是我们想要的JSON格式,就返回false,正确的话就返回truetry {JSONObject.parseObject(log);return true;} catch (Exception e) {return false;}}
}

4.3.3.3.5 在com.summer.gmall.flume.interceptor包下创建ETLInterceptor类

package com.summer.gmall.flume.interceptor;import com.summer.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;/*** @author Redamancy* @create 2022-10-25 16:38*/
public class ETLInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1.获取body当中的数据并转成字符串byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);//2.判断字符串是否是一个合法的json,是:返回当前的event;不是:返回nullif(JSONUtil.isJSONValidata(log)){return event;}else{return null;}}@Overridepublic List<Event> intercept(List<Event> list) {//不能使用for循环来删,因为在list里面删除一个数据,则后面的数据会补上来,index会减少,使用使用这个来remove数据是不可行的,到后面会报错。//可以使用迭代器来删数据/*for (int i = 0; i < list.size(); i++) {Event event = list.get(i);if(intercept(event) == null){list.remove(i);}}*/Iterator<Event> iterator = list.iterator();while(iterator.hasNext()){Event next = iterator.next();if(intercept(next) == null) {iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
}

4.3.3.3.6 打包

先点击clean,然后点击package。


将带依赖的放到/opt/module/flume/lib文件夹下面

4.3.3.3.7 需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面

大数据项目之电商数仓、日志采集Flume配置概述、日志采集Flume配置实操相关推荐

  1. 尚硅谷大数据项目之电商数仓(4即席查询数据仓库)

    尚硅谷大数据项目之电商数仓(即席查询) (作者:尚硅谷大数据研发部) 版本:V4.0 第1章 Presto 1.1 Presto简介 1.1.1 Presto概念 1.1.2 Presto架构 1.1 ...

  2. 电商数仓描述_笔记-尚硅谷大数据项目数据仓库-电商数仓V1.2新版

    架构 项目框架 数仓架构 存储压缩 Snappy与LZO LZO安装: 读取LZO文件时,需要先创建索引,才可以进行切片. 框架版本选型Apache:运维麻烦,需要自己调研兼容性. CDH:国内使用最 ...

  3. 31 大数据项目之电商数仓(用户行为数据采集)

    文章目录 第1章 大数据项目之电商数仓(用户行为数据采集) 第2章 项目需求 2.1 项目需求分析 2.2 项目框架 2.2.1 技术选型 2.2.2 系统架构图设计 2.2.3 系统数据流程设计 2 ...

  4. 大数据项目之电商数仓、业务数据介绍、电商系统表结构

    文章目录 6. 业务数据介绍 6.1 电商系统表结构 6.1.1 活动信息表(activity_info) 6.1.2 活动规则表(activity_rule) 6.1.3 活动商品关联表(activ ...

  5. 大数据项目之电商数仓离线计算

    本次项目是基于企业大数据的电商经典案例项目(大数据日志以及网站数据分析),业务分析.技术选型.架构设计.集群规划.安装部署.整合继承与开发和web可视化交互设计. 1.系统数据流程设计 我这里主要分享 ...

  6. 大数据项目之电商数仓(业务数据仓库)

    第1章 电商业务与数据结构简介 1.1 电商业务流程 1.2 电商表结构 电商业务流程 1.2.1 电商常识(SKU.SPU) SKU=Stock Keeping Unit(库存量单位).即库存进出计 ...

  7. 大数据项目之电商数仓(3电商数据仓库系统)V6.1.3

    第1章 数仓分层1.1 为什么要分层 1.2 数据集市与数据仓库概念 1.3 数仓命名规范1.3.1 表命名ODS层命名为ods_表名DWD层命名为dwd_dim/fact_表名DWS层命名为d ...

  8. 大数据项目之电商数仓(用户行为数据采集)

    第1章 数据仓库概念 第2章 项目需求 2.1 项目需求分析 2.2 项目框架 2.2.1 技术选型 2.2.2 系统架构图设计 2.2.3 系统数据流程设计 2.2.4 框架版本选型 产品 版本 H ...

  9. 大数据项目之电商数仓DataX、DataX简介、DataX支持的数据源、DataX架构原理、DataX部署

    文章目录 1. DataX简介 1.1 DataX概述 1.2 DataX支持的数据源 2. DataX架构原理 2.1 DataX设计理念 2.2 DataX框架设计 2.3 DataX运行流程 2 ...

最新文章

  1. 新工科教育的实践与思考——曾勇校长在工程教育高峰论坛上的报告
  2. Mongo服务器二进制文件修复,Mongodb-File-Server
  3. centos6.9下安装composer
  4. CodeForces - 1088E Ehab and a component choosing problem(树形dp)
  5. dart系列之:浏览器中的舞者,用dart发送HTTP请求
  6. Elasticsearch技术解析与实战(四)shardreplica机制
  7. python三维数据转换成二维_5大Python可视化库到底选哪个好?一篇文章搞定从选库到教学...
  8. LeetCode 2212. 射箭比赛中的最大得分(状态枚举)
  9. Flowable 数据库表结构 ACT_HI_VARINST
  10. UML---(1)一张图看懂UML 类图
  11. 分类算法python程序_分类算法——k最近邻算法(Python实现)(文末附工程源代码)...
  12. 为什么要使用 dns-prefetch
  13. 30万美元:Zerodium 出3倍价格求 WordPress RCE exploit
  14. input type=date 移动端显示placeholder失效问题
  15. 华硕afudos刷bios_华硕M2N-MX SE PLUS主板 如何用afudos命令刷BIOS
  16. c语言里除法符号,c语言整除符号(c语言switch用法举例)
  17. 软件测试之如何介绍自己的项目
  18. 科技爱好者周刊(第 190 期):产品化思维
  19. Java 基础入门,小白提升路线图
  20. 【小学】小学汉语拼音知识复习汇总

热门文章

  1. 菜鸟浅谈“诈骗”希望“治未病
  2. vim插件——YouCompleteMe
  3. 自然人报税时无企业信息怎么办
  4. 一些常用的batch命令
  5. 李兴华-JAVA10 第1章: Java语言简介
  6. 苹果键盘怎么手写_支持9种外语语音识别,新增4款外语键盘,搜狗输入法10.8版本上线丨18周新闻...
  7. 将windows窗口设置成护眼的浅绿色 .
  8. DSP DSP汇编伪指令汇总
  9. 【Nav2中文网】七、配置指南(三)行为树XML节点 之 控制插件--RoundRobin
  10. Eclipse安装插件后手动删除依赖无法再次安装插件