文章目录

  • 增量采集、处理、导出
    • 一、增量采集
      • 1.增量采集的业务需求
      • 2.增量采集的方案
      • 3.增量采集的实现
    • 二、增量处理
      • 1.Hive中创建表
      • 2.第一次增量处理
      • 3.第二次增量处理
    • 三、增量导出
      • 1.Hive中建立APP层结果表
      • 2.第一次增量导出
      • 3.第二次增量导出
    • 四、增量采集脚本的开发
    • 五、增量处理脚本的开发
    • 六、增量导出脚本的开发

增量采集、处理、导出

一、增量采集

1.增量采集的业务需求

离线与实时
离线:以时间为单位来实现数据的处理:采集、计算

  • 场景:每天处理一次,每个小时处理一次,每个月,每年
  • 特点:时效性比较低,一般都是分钟级别
  • 工具:Hadoop生态圈
    • Sqoop、HDFS、Hive、MapReduce、SparkCore、SparkSQL、Tez、Impala、Sqoop、MySQL

实时:以数据为单位的实现数据的处理:采集、计算

  • 场景:产生一条数据就要立刻采集以及处理一条数据
  • 特点:时效性非常高,一般都是毫秒级别
  • 工具:实时生态圈
    • Flume、Canal、Kafka、SparkStreaming/Flink、Redis、Hbase

离线需求

  • 实现离线采集、离线计算、离线结果保存
  • 所有过程:都是增量的过程

增量与全量

  • 全量:每次都所有数据进行处理

    • 一般用于数据迁移、维度表的更新
  • 增量:每次对最新【新增、更新】的数据进行处理
    • 工作中主要的场景

2.增量采集的方案

Flume:增量文件采集

  • exec:tail命令,动态的获取文件的尾部

    • tail命令,自动读取文件的尾部
  • taildir:动态实时监控多个文件
    • 记录文件的采集位置:taildir_position.json
    • 实现增量采集

Sqoop:增量采集数据库

  • 方式一:按照某一列自增的int值来实现:append

    • 要求:必须有一列自增的int值,必须有主键
    • 特点:只能采集新增的数据
  • 方式二:按照数据变化的时间列的值来实现:lastmodifield

    • 要求:必须有一列时间列,时间列随着数据的更新而自动更新
    • 特点:能采集新增和更新的数据
  • 方式三:通过指定目录分区采集到对应的HDFS目录下

    • 要求:表中有两个字段

      • create_time:创建时间
      • 新增的数据
  • update_time:更新时间

    • 更新的数据
  • 怎么解决更新和新增数据的问题:通过SQL的过滤

-e "select * from table where substr(create_time,1,10) = '2021-05-16' or substr(update_time,1,10) = '2021-05-16'“
  • 增量要求目录是提前存在,追加新增的数据进入,没有使用官方提供的增量,目录不能提前存在
--target-dir /nginx/log/2021-05-15/
  • 问题:如何通过Sqoop将数据采集到Hive的分区表中?

    • –hive-partition-key daystr:指定分区的字段是哪个字段
    • –hive-partition-value 2021-05-15:指定导入哪个分区
  • 原理
    • 根据指定的参数,在HDFS中创建一个目录:key=value
table/daystr=2021-05-15
  • 在Hive中加载分区即可

  • 方式二:通过手动指定HDFS方式来代替

--target-dir /nginx/log/daystr=2021-05-15/hourstr=00

3.增量采集的实现

  • 创建MySQL测试数据表
create database if not exists db_order;
use db_order;
drop table if exists tb_order;
create table tb_order(
id varchar(10) primary key,
pid varchar(10) not null,
userid varchar(10) not null,
price double not null,
create_time varchar(20) not null
);
  • 插入测试数据
insert into tb_order values('o00001','p00001','u00001',100,'2021-05-13 00:01:01');
insert into tb_order values('o00002','p00002','u00002',100,'2021-05-13 10:01:02');
insert into tb_order values('o00003','p00003','u00003',100,'2021-05-13 11:01:03');
insert into tb_order values('o00004','p00004','u00004',100,'2021-05-13 23:01:04');
insert into tb_order values('o00005','p00005','u00001',100,'2021-05-14 00:01:01');
insert into tb_order values('o00006','p00006','u00002',100,'2021-05-14 10:01:02');
insert into tb_order values('o00007','p00007','u00003',100,'2021-05-14 11:01:03');
insert into tb_order values('o00008','p00008','u00004',100,'2021-05-14 23:01:04');
  • 第一次增量采集:采集14日新增的数据
sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-05-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-05-14 \
--fields-terminated-by '\t' \
-m 1

  • 15日MySQL中有新的数据生成
insert into tb_order values('o00009','p00005','u00001',100,'2021-05-15 00:01:01');
insert into tb_order values('o00010','p00006','u00002',100,'2021-05-15 10:01:02');
insert into tb_order values('o00011','p00007','u00003',100,'2021-05-15 11:01:03');
insert into tb_order values('o00012','p00008','u00004',100,'2021-05-15 23:01:04');
  • 第二次增量采集:采集15日新增的数据
sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-05-15' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-05-15 \
--fields-terminated-by '\t' \
-m 1

二、增量处理

1.Hive中创建表

use default;
drop table default.tb_order;
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned  by (daystr string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/tb_order';

2.第一次增量处理

  • 假设今天15号:对14号的数据进行处理
  • 增量ETL
    • 如果ETL发生在进入Hive之前,ETL工作中都是通过SparkCore/MR程序进行处理
yarn jar etl.jar   main_class \
#输入目录:数据采集的目录
/nginx/logs/tb_order/daystr=2021-05-14  \
#输出目录:构建Hive表的目录
/user/hive/warehouse/tb_order/daystr=2021-05-14
  • 这个程序一般不会给参数,昨天的日期都是在ETL程序中动态获取的
//Input
Path inputPath = new Path("/nginx/logs/tb_order/daystr="+yesterday)
TextInputFormat.setInputPaths(job,inputPath)
//Output
Path outputPath = new Path("/user/hive/warehouse/tb_order/daystr="+yesterday)
TextOutputFormat.setOutputPath(job,outputPath)
  • 模拟数据ETL的过程
hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-14 /user/hive/warehouse/tb_order/


  • 加载分区

    • 第一种方式:修改Hive元数据,默认用于修复添加分区,不常用
msck repair table tb_order;


  • 这个命令在Hive2.1版本中是个bug,不能用
  • 增量处理
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='2021-05-14'
group by daystr;

3.第二次增量处理

  • 假设今天16号:对15号的数据进行处理
  • 增量ETL
    • 模拟数据ETL的过程
hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-15  /user/hive/warehouse/tb_order/


加载分区

  • 第二种方式,添加分区元数据信息
alter table tb_order add if not exists partition(daystr='2021-05-15');

  • 要求:HDFS上目录的名称必须为分区字段=值
/user/hive/warehouse/tb_order/daystr=2021-05-15
  • Hive自动根据分区默认的目录名来关联的

  • 场景

/user/hive/warehouse/tb_order/2021-05-15
  • 实现:通过location关键字手动指定分区对应的HDFS
alter table tb_order add if not exists partition(daystr='2021-05-15') location '/user/hive/warehouse/tb_order/2021-05-15';
  • 增量处理
select
daystr,
count(id) as order_number,
8ujhb sum(price) as order_price
from default.tb_order
where daystr='2021-05-15
group by daystr;

三、增量导出

1.Hive中建立APP层结果表

drop table if exists tb_order_rs;
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
)
row format delimited fields terminated by '\t';

MySQL中创建导出结果表

use db_order;
drop table if exists db_order.tb_order_rs;
create table db_order.tb_order_rs(
daystr varchar(20) primary key,
order_number int,
order_price double
);

2.第一次增量导出

  • 第一次分析的结果写入HiveAPP层的结果表
insert into table tb_order_rs
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='2021-05-14'
group by daystr;

第一次导出到MySQL

sqoop export \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password 123456 \
--table tb_order_rs \
--hcatalog-database default \
--hcatalog-table tb_order_rs \
--input-fields-terminated-by '\t' \
--update-key daystr \
--update-mode allowinsert \
-m 1

3.第二次增量导出

第二次分析的结果写入HiveAPP层的结果表

insert into table tb_order_rs
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='2021-05-15'
group by daystr;

第二次导出到MySQL

sqoop export \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password 123456 \
--table tb_order_rs \
--hcatalog-database default \
--hcatalog-table tb_order_rs \
--input-fields-terminated-by '\t' \
--update-key daystr \
--update-mode allowinsert \
-m 1

四、增量采集脚本的开发

  • 需求:实现增量的采集,默认采集昨天的数据,允许采集指定日期的数据,并且实现模拟ETL过程

    • 如果给定了参数,处理参数日期的数据
    • 如果没有给参数,默认处理昨天的数据
  • 创建脚本

mkdir /export/data/shell
vim /export/data/shell/01.collect.sh
  • 开发脚本
#!/bin/bash#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho "参数至多只能有一个,为处理的日期,请重新运行!"exit 100else#参数个数只有1个,就用第一个参数作为处理的日期yesterday=$1fi
else#参数个数为0,默认处理昨天的日期yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"echo "step2:开始运行采集的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '${yesterday}' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=${yesterday} \
--fields-terminated-by '\t' \
-m 1echo "step2:采集的程序运行结束"echo "step3:开始运行ETL"
#模拟ETL的过程,将采集的新增的数据移动到表的目录下
HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
#先判断结果是否存在,如果已经存在,先删除再移动
$HADOOP_HOME/bin/hdfs dfs -test -e  /user/hive/warehouse/tb_order/daystr=${yesterday}
if [ $? -eq 0 ]
then#存在$HADOOP_HOME/bin/hdfs dfs -rm -r  /user/hive/warehouse/tb_order/daystr=${yesterday}$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
else#不存在$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
fi
echo "step3:ETL结束"

五、增量处理脚本的开发

  • 创建脚本
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
  • Shell脚本
#!/bin/bash#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then#参数个数不为0if [ $# -ne 1 ]thenecho "参数至多只能有一个,为处理的日期,请重新运行!"exit 100else#参数个数只有1个,就用第一个参数作为处理的日期yesterday=$1fi
else#参数个数为0,默认处理昨天的日期yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday}  -f  /export/data/shell/02.analysis.sqlecho "step2:分析的程序运行结束"

hive --hiveconf 参数 用于传递参数
hive -f 文件名 执行某个文件中的sql
SQL文件

create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned  by (daystr string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/tb_order';alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
)
row format delimited fields terminated by '\t';insert into table default.tb_order_rs
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr;

六、增量导出脚本的开发

  • 创建脚本
vim /export/data/shell/03.export.sh
  • Shell脚本
#!/bin/bashecho "step1:开始运行导出的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop export \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password 123456 \
--table tb_order_rs \
--hcatalog-database default \
--hcatalog-table tb_order_rs \
--input-fields-terminated-by '\t' \
--update-key daystr \
--update-mode allowinsert \
-m 1echo "step1:导出的程序运行结束"

大数据:增量采集、处理、导出相关推荐

  1. 大数据增量采集OGGAdapter的安装部署与相关配置

    大数据云时代,数据上云ETL已成了最基础,最根本,最必须的一个步骤.目前数据传输迁移的工具非常多,比如dataX,DTS,kettle等等.为了保证云上存储空间的有效利用和数据的整体唯一性,就没必要每 ...

  2. 大数据应用导论 Chapter02 | 大数据的采集与清洗

      大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语-不温不火,本意是希望自己性情温和.作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己 ...

  3. 审计大数据综合分析采集管理系统软件平台

    审计大数据综合分析采集管理系统软件平台 华盛恒辉审计管理系统集数据远程报送.智能入库.数据处理.综合查询.数据授权和分发等功能于--体,大大提升了数据管理的科学性.规范性使审计机关大量积累的各类被审计 ...

  4. 商圈分析如何大数据软件采集相关要素

    商圈分析如何大数据软件采集相关要素 商圈是在商业集聚的基础上逐步形成与发展起来的,商圈就是有一定辐射范围的商业集聚地.随着商业集聚地的辐射范围不断扩大,容量不断变大,商业网点不断增加,业态业种不断完善 ...

  5. Java操作大数据量Excel导入导出万能工具类(完整版)

    Java操作大数据量Excel导入导出万能工具类(完整版) 转载自:https://blog.csdn.net/JavaWebRookie/article/details/80843653 更新日志: ...

  6. 大数据分页查询 or 导出 慢sql治理

    大数据分页查询 or 导出 慢sql治理 背景 缺陷 要求 优化方案 适用场景 方案优点 方案缺点 时间拆分如处理分页查询问题 方案说明 使用说明 分页查询工具 时间拆分工具 背景 当前日增数据量将近 ...

  7. 大数据导论(三:大数据的采集及预处理)

    1.大数据采集 1.1 大数据采集概念 数据采集(DAQ)又称数据获取,通过RFID射频数据.传感器数据.社交网络数据.移动互联网数据等方式获得各种类型的结构化.半结构化及非结构化的海量数据. 1.2 ...

  8. 前嗅教你大数据:采集东方财富网数据

     l 采集场景 [场景描述]采集东方财富网行情中心沪深京A股数据. [使用工具]前嗅ForeSpider数据采集系统,免费下载: ForeSpider免费版本下载地址 l采集网站 [入口网址] htt ...

  9. 前嗅教你大数据:采集孔夫子旧书网

    l 采集网站 [场景描述]采集孔夫子旧书网数据. [源网站介绍]孔夫子旧书网是国内专业的古旧书交易平台,汇集全国各地13000家网上书店,50000家书摊,展示多达9000万种书籍:大量极具收藏价值的 ...

  10. 2022年最全教程:如何做大数据的采集数据及数据分析?

    这篇绝对是我分享过的最清楚.最全的一篇教程!能够解决大部分人的数据采集及分析需求! 实用.简单,尤其适合excel大户.办公族.业务人员,或者不会编程.不懂数据分析理论的技术小白-- 图文.动图.视频 ...

最新文章

  1. 步步为营-55-js练习
  2. P2172 [国家集训队]部落战争 二分图最小不相交路径覆盖
  3. 为什么要看源码、如何看源码,高手进阶必看
  4. 17 HTTP编程入门
  5. openstack部署(四)--网络配置(Networking)
  6. 计算机顶会论文写作科研利器
  7. Fedora/RHEL/openSUSE等Linux安装vivaldi——vivaldi浏览器
  8. mysql关键字及其用法_mysql的优化-explain
  9. Sublime Text3搭建HTML环境
  10. redis统计用户日活量_【赵强老师】Redis案例分析:用setbit统计活跃用户
  11. CAD线型设置:CAD软件中如何加粗曲线?
  12. WSJ新闻标题的中心词提取
  13. 记录在处理SIF数据中,遇到的一些问题及解决过程
  14. java frame 显示图片_java怎么在JFrame中显示动态图片?
  15. 从零开始写项目【总结】
  16. python社区微信群_30行Python代码,打造一个简单的微信群聊助手,简单方便
  17. 相机照片删除了怎么恢复?60%的人都用这三步解决了
  18. 拓端tecdat|R语言向量误差修正模型 (VECMs)分析长期利率和通胀率影响关系
  19. python中文爬取网页_Python抓取中文网页
  20. MySQL 数据库 User表权限以及用户授权详解

热门文章

  1. QLCDNumber设置背景色和显示数字颜色
  2. 5分钟摄像头抓拍一次,居家一天至少89次!尚德员工:连厕所都不敢上
  3. python打印日历_python 打印日历
  4. 【JVM笔记】Parallel Scavenge回收器:吞吐量优先
  5. Java学习之:如何将 java 程序打包成 .jar 文件
  6. Intelligent Reflecting Surface Assisted Secrecy Communication via Joint Beamforming and Jamming
  7. WINVNC源码分析(三)rdr
  8. 【基础知识】9、加州房价预测
  9. 微软经典面试题(数字翻译为中文)
  10. ANSYS ICEM CFD三维结构网格生成实例——汽车外流