[数仓]埋点数据接入
一、采集flume
日志服务器:将日志采集到本地,共有两个日志服务器,因此要安装两台flume,每个flume采集其所在服务器上的日志
source:taildir source 可以实时的读取文件中的数据,支持断点续传
channel:kafka channel我们需要先把数据写入kafka,因为还有实时数仓
1、flle_to_kafka.conf
在flume目录下创建一个job目录,将文件存于这个目录下
#定义组件
a1.sources=r1
a1.chennls=c1#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroup.f1=/opt/module/aoolog/log/app.*
#监控这个文件夹下以app.开头的文件,日志是按天滚动的,一天一个文件,文件格式:app.yyyy-MM-dd.log
a1.sources.r1.positionFile=/opt/module/flume/taildir_position.json #设置记录断点续传位置的
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = 包名.ETLInterceptor$Builder #配置拦截器#配置channels
a1.chennls.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.chennls.c1.kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
#往哪个kafka集群写数据
a1.chennls.c1.kafka.topic=topic_log #写到哪个topic
a1.chennls.c1.parseAsFlumeEvent=false
#为false将数据以正常格式写入kafka
#为true(默认)将数据以Flume Event(header+body)的形式写入kafka。这样读出来的时候也是Event#组装
a1.sources.r1.channels=c1
2、拦截器:判断json串是否完整
![](/assets/blank.gif)
![](/assets/blank.gif)
![](/assets/blank.gif)
3、启动(未写成脚本)
nohup bin/flume-ng agent -n a1 -c conf/ -f job/flle_to_kafka.conf -Dflume.root.logger=info,console 1>out 2>&1 &
二、消费flume
1、kafka_to_hdfs_log.conf
#定义组件
a1.channels=c1
a1.sinks=k1#配置source
a1.sources=r1
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
a1.sources.r1.kafka.topics=topic_log #有多个用,分割
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize=2000 #生产环境一般为2000
a1.sources.r1.batchDurationMillis=2000
#如果2000条迟迟没到.需要时间控制,单位毫秒.1000毫秒=1S
#生产环境,看多长时间生成2000条数据(上面设置的值),1天一亿条,约1000条/s,这里可以写2s
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=包名.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/checkpoint/behaviour1
#file chennel的索引维护在内存中,会备份到磁盘中,这个备份的磁盘目录就是checkpointDir
a1.channels.c1.useDualCheckpoints=false
#默认为false,如果设置为true,则会将checkpointDir再备份一份,担心一份不安全,可以用二次备份.需要再配置一个目录
a1.channels.c1.dataDirs=/opt/module/flume/data/behaviour1 #可以将数据存储在一个服务器的多个磁盘上
a1.channels.c1.maxFileSize=2146435071 #将数据写到文件,设置该文件的最大值,默认为2G
a1.channels.c1.capacity=1000000 #file channel 容量的设置,默认为100万条
a1.channels.c1.keep-alive=3 #默认3s
#如果已经达到file channel的容量100万条,再批写入15条,此时写入不成功,数据会回滚,重新从kafka读取,浪费了性能.
#设置keep-alive,flume会等sink写入一会,这样file channel也许就有空间了,keep-alive用于设置等待时长#配置sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/user/hive/warehouse/ods.db/ods_flow_ph/dt=%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=log
a1.sinks.k1.hdfs.round=false #true则下支持用配置设置滚动时间,类似crontab定时那个
a1.sinks.k1.hdfs.rollInterval=300 #,单位为s需要设置成5分钟
a1.sinks.k1.hdfs.rollSize=134217720 #128M
a1.sinks.k1.hdfs.rollCount=110000 ##控制输出类型
a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.codeC=lzo#组装
a1.sources.r1.channels=c1
a1.sinks.channel=c1
2、时间拦截器
1)问题
flume从日志文件拉取数据有一定的时间
由于Flume默认会用Linux系统时间作为传输到HDFS的时间,如果数据是23:59:59产生的,消费Flume拉取数据的时候可能是00:00:03,那么这部分数据会被发往第二天的HDFS路径。我们希望的是根部日志里面的实际时间发往HDFS路径,所以下面的拦截器作用是获取日志中的实际时间。
问题:
![](/assets/blank.gif)
处理方法:
![](/assets/blank.gif)
2)解决思路
拦截json日志,解析event body中的json,获取实际时间ts,将获取的ts时间写入拦截器header头,header的key必须是timestamp。Flume框架会自动根据这个key值识别为时间,写入HDFS。
3)代码
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;publicclassTimeStampInterceptorimplementsInterceptor {@Overridepublicvoidinitialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();Stringlog=newString(event.getBody(), StandardCharsets.UTF_8);JSONObjectjsonObject= JSONObject.parseObject(log);Stringts= jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublicvoidclose() {}publicstaticclassBuilderimplementsInterceptor.Builder {@Overridepublic Interceptor build() {returnnewTimeStampInterceptor();}@Overridepublicvoidconfigure(Context context) {}}
}
3、启动
#!/bin/bashcase $1 in
"start")echo "启动node01日志数据flume"ssh node01 "nohup /data/cluster/flume/bin/flume-ng agent -n a1 -c /data/cluster/flume/conf -f /data/cluster/flume/job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console 1>out 2>&1 &"
;;
case $1 in
"stop")echo "停止node01日志数据flume"ssh node01 "ps -ef | grep kafka_to_hdfs_log.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
;;
esac
三、小文件问题:参数调节
a1.sinks.k1.hdfs.path=/user/hive/warehouse/ods.db/ods_flow_ph/dt=%Y-%m-%d目录下的中的小文件问题
正常应该每个文件控制在128M
小文件的危害:
1.namenode元数据空间不足,2每个小文件都会对应一个map task,占用大量的内存
这里讲的是用参数进行调节
a1.sinks.k1.hdfs.rollInterval=300 #单位为s需要设置成5分钟
a1.sinks.k1.hdfs.rollSize=134217720 #128M,实际配置应该小于128M,如果本次不为127M不满128M但再过来一批就会超过一个块,变成2个块
a1.sinks.k1.hdfs.rollCount=1000000 #一条1KB的话,这个大小不到1G
四、fllume的kafka channel
1、知识点
用kafka channel必须有kafka集群,将数据写到kafka:存储到kafka的topic里,写到partition 的leader(负责读写)中
kafka source就是kafka的消费者
kafka sink就是kafka的生产者
kafka channel 将数据以event(header+body)的形式存储, 这样读的时候,读出来的是event,在kafka channel中设置 parseAsFlume=false则会以正常格式存储,不封装成Event,但是咱们得代码需要header,在拦截器中使用,所以不能这么设置。
![](/assets/blank.gif)
2、kafka channel的使用方式,可以是:
source + kafka channel + sink
source +拦截器 + kafka channel (没有sink)
kafka channel +sink (没有source)
[数仓]埋点数据接入相关推荐
- 数仓埋点体系与归因实践
目录 1. 埋点体系建设 1.1 埋点分类 1.2 开发流程&保障 2. 数仓建设 2.1 业务架构图 2.2 数仓架构图 2.3 事实表建设 2.4 维表建设 2.5 dws表建设 3.uu ...
- 数据仓库系列:初识数仓
数据仓库系列:初识数仓 前言: 本节是数据仓库系列文章的第一篇,本系列的目的在于快速的构建一套最小化可运行的基础数据体系,过程中也会涉及一些数仓的理论知识,但更偏重的是数仓的实现和背后的思考逻辑.所以 ...
- 数仓服务平台在唯品会的建设实践
00 导读 数据服务是数据中台体系中的关键组成部分.作为数仓对接上层应用的统一出入口,数据服务将数仓当作一个统一的 DB 来访问,提供统一的 API 接口控制数据的流入及流出,能够满足用户对不同类型数 ...
- 亿级数据服务平台:跟低效率、指标难统一的数仓说再见!
点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 后台回复"k8s",可领取k8s资料 数据服务是数据中 ...
- 制造业数字原生的OT数仓建设
关键词:OT数仓 工业物联网 智能工厂 时序库 数据采集 数字化转型 IT/OT融合 从技术演化的视角来看,工业互联网发展的核心在于OT技术(控制技术)与IT技术(信息技术)的融合.如果把时间 ...
- 电商离线数仓项目-埋点数据/事件日志的基本格式详细理解
数据格式详解 埋点数据 事件数据 商品点击 商品详情页 商品列表页 广告 消息通知 用户前台活跃 用户后台活跃 评论 收藏 点赞 错误日志数据 启动日志数据 埋点数据 数据埋点产生的Json格式的数据 ...
- 埋点、数仓到中台:数据体系的从0到1
本文由作者 董小矿 于社区发布 前言:有幸深度参与了公司从无数据,到有数据,到开始重视数据,最后能够尊重数据结果,参考数据进行决策的过程.本篇文章是笔者在这个过程中,作为数据产品搭建数据指标体系,如何 ...
- 企业级数据仓库:数据仓库概述;核心技术框架,数仓理论,数据通道Hive技术框架,HBase设计,系统调度,关系模式范式,ER图,维度建模,星型/雪花/星座模式,数据采集同步,业务数据埋点,数据仓库规范
文章目录 第一章 数据仓库概述 1.1 数据仓库简介 1.1.2 什么是数据仓库? 1.1.3 OLTP 与 OLAP 1.2 数据仓库技术架构 1.3 课程目标 第二章 核心技术框架 2.1 数据仓 ...
- 数仓(六)从0到1简单搭建数仓ODS层(埋点日志 + 业务数据)
数仓(一)简介数仓,OLTP和OLAP 数仓(二)关系建模和维度建模 数仓(三)简析阿里.美团.网易.恒丰银行.马蜂窝5家数仓分层架构 数仓(四)数据仓库分层 数仓(五)元数据管理系统解析 最近工作一 ...
最新文章
- 工作随笔-日常工作-小说站 PC版
- Java中的Atomic包
- 从会议、医患沟通和客服对话三大场景看对话文本摘要技术
- 深入理解 Tomcat(八)源码剖析之连接器
- 《MySQL DBA修炼之道》——1.4 MySQL权限
- java pcm16位_Java – 将16位带符号的pcm音频数据数组转换为双数组
- cmdpython命令大全_Python命令 python使用cmd命令
- 【整理】牛客网编程题前端篇(较难难度)
- EndNote20 for Mac 与搭载Apple M1芯片Mac版Word不兼容的解决方案(新发布的EndNote 20.1更新版可适配Apple M1)
- 联通无线网卡DNS服务器地址,全国各地电信、联通、网通的DNS服务器地址
- 2021 MWC上海 | 5G消息引关注,菊风共谋5G建设发展新篇章
- 华为7c手机怎么恢复出厂设置_华为荣耀畅玩7A/7C解锁教程_荣耀畅玩7A/7C用官方解锁码解锁方法...
- [病毒分析]远程木马创建傀儡进程分析
- 小程序—九宫格心形拼图
- 阿里巴巴2021校招
- 阿里天池供应链需求预测(二)
- 很好用的界面设计工具——Balsamiq(转载)
- python循环展示大写字母_python调用大写函数python中字典的循环遍历的两种方式
- R语言eval,parse批量生成变量并赋值
- 离散作业用c语言编写覆盖,c语言论文3000字_优秀论文范文3000字_大一论文范文3000字...