一、采集flume

日志服务器:将日志采集到本地,共有两个日志服务器,因此要安装两台flume,每个flume采集其所在服务器上的日志

source:taildir source 可以实时的读取文件中的数据,支持断点续传

channel:kafka channel我们需要先把数据写入kafka,因为还有实时数仓

1、flle_to_kafka.conf

在flume目录下创建一个job目录,将文件存于这个目录下

#定义组件
a1.sources=r1
a1.chennls=c1#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroup.f1=/opt/module/aoolog/log/app.*
#监控这个文件夹下以app.开头的文件,日志是按天滚动的,一天一个文件,文件格式:app.yyyy-MM-dd.log
a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json   #设置记录断点续传位置的
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = 包名.ETLInterceptor$Builder #配置拦截器#配置channels
a1.chennls.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.chennls.c1.kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
#往哪个kafka集群写数据
a1.chennls.c1.kafka.topic=topic_log #写到哪个topic
a1.chennls.c1.parseAsFlumeEvent=false
#为false将数据以正常格式写入kafka
#为true(默认)将数据以Flume Event(header+body)的形式写入kafka。这样读出来的时候也是Event#组装
a1.sources.r1.channels=c1

2、拦截器:判断json串是否完整

3、启动(未写成脚本)

nohup bin/flume-ng agent -n a1 -c conf/ -f job/flle_to_kafka.conf -Dflume.root.logger=info,console 1>out 2>&1 &

二、消费flume

1、kafka_to_hdfs_log.conf

#定义组件
a1.channels=c1
a1.sinks=k1#配置source
a1.sources=r1
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sources.r1.kafka.topics=topic_log  #有多个用,分割
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize=2000 #生产环境一般为2000
a1.sources.r1.batchDurationMillis=2000
#如果2000条迟迟没到.需要时间控制,单位毫秒.1000毫秒=1S
#生产环境,看多长时间生成2000条数据(上面设置的值),1天一亿条,约1000条/s,这里可以写2s
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=包名.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/checkpoint/behaviour1
#file chennel的索引维护在内存中,会备份到磁盘中,这个备份的磁盘目录就是checkpointDir
a1.channels.c1.useDualCheckpoints=false
#默认为false,如果设置为true,则会将checkpointDir再备份一份,担心一份不安全,可以用二次备份.需要再配置一个目录
a1.channels.c1.dataDirs=/opt/module/flume/data/behaviour1  #可以将数据存储在一个服务器的多个磁盘上
a1.channels.c1.maxFileSize=2146435071  #将数据写到文件,设置该文件的最大值,默认为2G
a1.channels.c1.capacity=1000000  #file channel 容量的设置,默认为100万条
a1.channels.c1.keep-alive=3 #默认3s
#如果已经达到file channel的容量100万条,再批写入15条,此时写入不成功,数据会回滚,重新从kafka读取,浪费了性能.
#设置keep-alive,flume会等sink写入一会,这样file channel也许就有空间了,keep-alive用于设置等待时长#配置sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/user/hive/warehouse/ods.db/ods_flow_ph/dt=%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=log
a1.sinks.k1.hdfs.round=false  #true则下支持用配置设置滚动时间,类似crontab定时那个
a1.sinks.k1.hdfs.rollInterval=300 #,单位为s需要设置成5分钟
a1.sinks.k1.hdfs.rollSize=134217720 #128M
a1.sinks.k1.hdfs.rollCount=110000 ##控制输出类型
a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.codeC=lzo#组装
a1.sources.r1.channels=c1
a1.sinks.channel=c1

2、时间拦截器

1)问题

flume从日志文件拉取数据有一定的时间

由于Flume默认会用Linux系统时间作为传输到HDFS的时间,如果数据是23:59:59产生的,消费Flume拉取数据的时候可能是00:00:03,那么这部分数据会被发往第二天的HDFS路径。我们希望的是根部日志里面的实际时间发往HDFS路径,所以下面的拦截器作用是获取日志中的实际时间。

问题:

处理方法:

2)解决思路

拦截json日志,解析event body中的json,获取实际时间ts,将获取的ts时间写入拦截器header头,header的key必须是timestamp。Flume框架会自动根据这个key值识别为时间,写入HDFS。

3)代码

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;publicclassTimeStampInterceptorimplementsInterceptor {@Overridepublicvoidinitialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();Stringlog=newString(event.getBody(), StandardCharsets.UTF_8);JSONObjectjsonObject= JSONObject.parseObject(log);Stringts= jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublicvoidclose() {}publicstaticclassBuilderimplementsInterceptor.Builder {@Overridepublic Interceptor build() {returnnewTimeStampInterceptor();}@Overridepublicvoidconfigure(Context context) {}}
}

3、启动

#!/bin/bashcase $1 in
"start")echo "启动node01日志数据flume"ssh node01 "nohup /data/cluster/flume/bin/flume-ng agent -n a1 -c /data/cluster/flume/conf -f /data/cluster/flume/job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console 1>out 2>&1 &"
;;
case $1 in
"stop")echo "停止node01日志数据flume"ssh node01 "ps -ef | grep kafka_to_hdfs_log.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
;;
esac

三、小文件问题:参数调节

a1.sinks.k1.hdfs.path=/user/hive/warehouse/ods.db/ods_flow_ph/dt=%Y-%m-%d目录下的中的小文件问题

正常应该每个文件控制在128M

小文件的危害:

1.namenode元数据空间不足,2每个小文件都会对应一个map task,占用大量的内存

这里讲的是用参数进行调节

a1.sinks.k1.hdfs.rollInterval=300 #单位为s需要设置成5分钟
a1.sinks.k1.hdfs.rollSize=134217720 #128M,实际配置应该小于128M,如果本次不为127M不满128M但再过来一批就会超过一个块,变成2个块
a1.sinks.k1.hdfs.rollCount=1000000 #一条1KB的话,这个大小不到1G

四、fllume的kafka channel

1、知识点

用kafka channel必须有kafka集群,将数据写到kafka:存储到kafka的topic里,写到partition 的leader(负责读写)中

kafka source就是kafka的消费者

kafka sink就是kafka的生产者

kafka channel 将数据以event(header+body)的形式存储, 这样读的时候,读出来的是event,在kafka channel中设置 parseAsFlume=false则会以正常格式存储,不封装成Event,但是咱们得代码需要header,在拦截器中使用,所以不能这么设置。

2、kafka channel的使用方式,可以是:

  1. source + kafka channel + sink

  1. source +拦截器 + kafka channel (没有sink)

  1. kafka channel +sink (没有source)

[数仓]埋点数据接入相关推荐

  1. 数仓埋点体系与归因实践

    目录 1. 埋点体系建设 1.1 埋点分类 1.2 开发流程&保障 2. 数仓建设 2.1 业务架构图 2.2 数仓架构图 2.3 事实表建设 2.4 维表建设 2.5 dws表建设 3.uu ...

  2. 数据仓库系列:初识数仓

    数据仓库系列:初识数仓 前言: 本节是数据仓库系列文章的第一篇,本系列的目的在于快速的构建一套最小化可运行的基础数据体系,过程中也会涉及一些数仓的理论知识,但更偏重的是数仓的实现和背后的思考逻辑.所以 ...

  3. 数仓服务平台在唯品会的建设实践

    00 导读 数据服务是数据中台体系中的关键组成部分.作为数仓对接上层应用的统一出入口,数据服务将数仓当作一个统一的 DB 来访问,提供统一的 API 接口控制数据的流入及流出,能够满足用户对不同类型数 ...

  4. 亿级数据服务平台:跟低效率、指标难统一的数仓说再见!

    点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 后台回复"k8s",可领取k8s资料 数据服务是数据中 ...

  5. 制造业数字原生的OT数仓建设

    关键词:OT数仓 工业物联网 智能工厂 时序库 数据采集 数字化转型 IT/OT融合     从技术演化的视角来看,工业互联网发展的核心在于OT技术(控制技术)与IT技术(信息技术)的融合.如果把时间 ...

  6. 电商离线数仓项目-埋点数据/事件日志的基本格式详细理解

    数据格式详解 埋点数据 事件数据 商品点击 商品详情页 商品列表页 广告 消息通知 用户前台活跃 用户后台活跃 评论 收藏 点赞 错误日志数据 启动日志数据 埋点数据 数据埋点产生的Json格式的数据 ...

  7. 埋点、数仓到中台:数据体系的从0到1

    本文由作者 董小矿 于社区发布 前言:有幸深度参与了公司从无数据,到有数据,到开始重视数据,最后能够尊重数据结果,参考数据进行决策的过程.本篇文章是笔者在这个过程中,作为数据产品搭建数据指标体系,如何 ...

  8. 企业级数据仓库:数据仓库概述;核心技术框架,数仓理论,数据通道Hive技术框架,HBase设计,系统调度,关系模式范式,ER图,维度建模,星型/雪花/星座模式,数据采集同步,业务数据埋点,数据仓库规范

    文章目录 第一章 数据仓库概述 1.1 数据仓库简介 1.1.2 什么是数据仓库? 1.1.3 OLTP 与 OLAP 1.2 数据仓库技术架构 1.3 课程目标 第二章 核心技术框架 2.1 数据仓 ...

  9. 数仓(六)从0到1简单搭建数仓ODS层(埋点日志 + 业务数据)

    数仓(一)简介数仓,OLTP和OLAP 数仓(二)关系建模和维度建模 数仓(三)简析阿里.美团.网易.恒丰银行.马蜂窝5家数仓分层架构 数仓(四)数据仓库分层 数仓(五)元数据管理系统解析 最近工作一 ...

最新文章

  1. 工作随笔-日常工作-小说站 PC版
  2. Java中的Atomic包
  3. 从会议、医患沟通和客服对话三大场景看对话文本摘要技术
  4. 深入理解 Tomcat(八)源码剖析之连接器
  5. 《MySQL DBA修炼之道》——1.4 MySQL权限
  6. java pcm16位_Java – 将16位带符号的pcm音频数据数组转换为双数组
  7. cmdpython命令大全_Python命令 python使用cmd命令
  8. 【整理】牛客网编程题前端篇(较难难度)
  9. EndNote20 for Mac 与搭载Apple M1芯片Mac版Word不兼容的解决方案(新发布的EndNote 20.1更新版可适配Apple M1)
  10. 联通无线网卡DNS服务器地址,全国各地电信、联通、网通的DNS服务器地址
  11. 2021 MWC上海 | 5G消息引关注,菊风共谋5G建设发展新篇章
  12. 华为7c手机怎么恢复出厂设置_华为荣耀畅玩7A/7C解锁教程_荣耀畅玩7A/7C用官方解锁码解锁方法...
  13. [病毒分析]远程木马创建傀儡进程分析
  14. 小程序—九宫格心形拼图
  15. 阿里巴巴2021校招
  16. 阿里天池供应链需求预测(二)
  17. 很好用的界面设计工具——Balsamiq(转载)
  18. python循环展示大写字母_python调用大写函数python中字典的循环遍历的两种方式
  19. R语言eval,parse批量生成变量并赋值
  20. 离散作业用c语言编写覆盖,c语言论文3000字_优秀论文范文3000字_大一论文范文3000字...

热门文章

  1. eclipse中green UML 自动生成类图
  2. matlab绘制3维椭球,matlab拟合三维椭球
  3. 轮式机器人初设计随笔
  4. Android 阿里百川cps SDK接入流程
  5. 在线文档编辑工具比较
  6. 超大规模AI异构计算集群的设计和优化
  7. 【2022寒假基础集训】第一场 - B.炸鸡块君与FIFA22【倍增DP+集合分类】
  8. 用Firebird自带的工具管理数据库
  9. Python打印星号图形系列(Python基础)
  10. 村田噪声抑制基础教程-第一章 需要EMI静噪滤波器的原因