目录

一.rowkey设计原则

1.1 唯一原则

1.2 长度原则

1.3 散列原则

二.Hbase优化实操

2.1 高可用

2.2 预分区

2.3 RowKey设计(记忆)

2.4 内存优化

2.5 HBase2.0新特性

三.Flume概述

3.1 大数据处理流程

3.2 Flume的简介

3.3 Flume的体系结构

3.4 Flume采集模型

3.4.1 模型分类

3.4.2 设计原则

3.4.3 采集方案模板

3.4.4 三大核心组件的常用接口

四、Flume的安装

五、flume采集方案演示


一.rowkey设计原则

1.1 唯一原则

必须在设计上保证其唯一性。

由于在HBase中数据存储是Key-Value形式,若HBase中同一张表插入相同Rowkey,并且是同一个列族下的同一个key时,如果version设置为1的话,则原先的数据会被覆盖掉,所以务必保证Rowkey的唯一性

1.2 长度原则

rowkey的长度统一。Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。原因如下:-1. 数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;
-2. MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。
-3. 目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。需要指出的是不仅Rowkey的长度是越短越好,而且列族名、列名等尽量使用短名字,因为HBase属于列式数据库,这些名字都是会写入到HBase的持久化文件HFile中去,过长的Rowkey、列族、列名都会导致整体的存储量成倍增加。

1.3 散列原则

我们设计的Rowkey应均匀的分布在各个HBase节点上。

拿常见的时间戳举例,假如Rowkey是按系统时间戳的方式递增,Rowkey的第一部分如果是时间戳信息的话将造成所有新数据都在一个RegionServer上堆积的热点现象,就是region热点问题。我们可以采取下面三种方式,来解决这个Region热点问题:-1. Reverse反转针对固定长度的Rowkey反转后存储,这样可以使Rowkey中经常改变的部分放在最前面,可以有效的随机Rowkey。
反转Rowkey的例子通常以手机举例,可以将手机号反转后的字符串作为Rowkey,这样的就避免了以手机号那样比较固定开头(137x、15x等)导致热点问题,这样做的缺点是牺牲了Rowkey的有序性。-2. Salting(加盐)Salting是将每一个Rowkey加一个前缀,前缀使用一些随机字符,使得数据分散在多个不同的Region,达到Region负载均衡的目标。比如在一个有4个Region(注:以 [ ,a)、[a,b)、[b,c)、[c, )为Region起止)的HBase表中,
加Salt前的Rowkey:abc001、abc002、abc003
我们分别加上a、b、c前缀,加Salt后Rowkey为:a-abc001、b-abc002、c-abc003
可以看到,加盐前的Rowkey默认会在第2个region中,加盐后的Rowkey数据会分布在3个region中,理论上处理后的吞吐量应是之前的3倍。由于前缀是随机的,读这些数据时需要耗费更多的时间,所以Salt增加了写操作的吞吐量,不过缺点是同时增加了读操作的开销。-3. Hash散列或者Mod
用Hash散列来替代随机Salt前缀的好处是能让一个给定的行有相同的前缀,这在分散了Region负载的同时,使读操作也能够推断。确定性Hash(比如md5后取前4位做前缀)能让客户端重建完整的RowKey,可以使用get操作直接get想要的行。例如将上述的原始Rowkey经过hash处理,此处我们采用md5散列算法取前4位做前缀,结果如下
9bf0-abc001 (abc001在md5后是9bf049097142c168c38a94c626eddf3d,取前4位是9bf0)
7006-abc002
95e6-abc003  若用前4个字符作为不同分区的起止,上面几个Rowkey数据会分布在3个region中。实际应用场景是当数据量越来越大的时候,这种设计会使得分区之间更加均衡。 如果Rowkey是数字类型的,也可以考虑Mod方法。

二.Hbase优化实操

2.1 高可用

    在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。

2.1.1 关闭HBase集群

[root@xxx1 home]# stop-hbase.sh

2.1.2 在conf目录下创建backup-masters文件

[root@xxx2 home]# touch conf/backup-masters

2.1.3 在backup-masters文件中配置高可用HMaster节点

[root@xxx2 home]# echo "192.168.49.101" > conf/backup-masters

2.1.4 将整个conf目录scp到其他节点

[root@xxx1 home]# scp -r conf/ 192.168.49.101:/home/hbase-1.2.1/hbase/

1.5 打开页面测试查看

2.2 预分区

   每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。

2.2.1 手动预分区

create 'stf1','info','partition1',SPLITS => ['1000','2000','3000','4000']
put 'stf1', '888', 'info:name', 'lixi'

2.2.2 生成16进制序列预分区

create 'stf2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

2.2.3 按照文件中设置的规则预分区

创建splits.txt文件内容如下:

aaaa
bbbb
cccc
dddd

然后执行

create 'stf3','partition3',SPLITS_FILE => '/home/splits.txt'

2.2.4 JavaAPI创建预分区

//自定义算法,产生一系列Hash散列值存储在二维数组中
byte[][] splitKeys = 某个散列值函数
//创建HBaseAdmin实例
HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());
//创建HTableDescriptor实例
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
//通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表
hAdmin.createTable(tableDesc, splitKeys);

2.3 RowKey设计(记忆)

一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。

2.3.1 字符串反转

20191124000001转成10000042119102
20191124000002转成20000042119102

2.3.2 字符串拼接

20191124000001_lixi
20191124000001_rock

2.3.3 生成随机数、hash、散列值

比如:
原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7
原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd
原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913
在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。

2.4 内存优化

HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。

2.4.1 设置在HDFS中追加内容

hdfs-site.xml、hbase-site.xml

<property><name>dfs.support.append</name><value>true</value><description>开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true</description>
</property>

2.4.2 dataNode允许的最大文件打开数

hdfs-site.xml

<property><name>dfs.datanode.max.transfer.threads</name><value>4096</value><description>HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096</description>
</property>

2.4.3 设置延迟高的数据操作的等待时间

hdfs-site.xml

<property><name>dfs.image.transfer.timeout</name><value>60000</value><description>如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。</description>
</property>

2.4.4 优化数据的写入效率

mapred-site.xml

<property><name>mapreduce.map.output.compress</name><value>true</value><description>开启这两个数据可以大大提高文件的写入效率,减少写入时间。</description>
</property>
​
<property><name>mapreduce.map.output.compress.codec</name><value>org.apache.hadoop.io.compress.GzipCodec</value><description>开启这两个数据可以大大提高文件的写入效率,减少写入时间。org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式</description>
</property>

2.4.5 设置RPC监听数量

hbase-site.xml

<property><name>hbase.regionserver.handler.count</name><value>30</value><description>默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。</description>
</property>

2.4.6 设置HStore文件大小

hbase-site.xml

<property><name>hbase.hregion.max.filesize</name><value>10737418240</value><description>默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。</description>
</property>

2.4.7 设置hbase客户端缓存

hbase-site.xml

<property><name>hbase.client.write.buffer</name><value>4096</value><description>用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。</description>
</property>

2.4.8 指定scan.next扫描HBase所获取的行数

hbase-site.xml

<property><name>hbase.client.scanner.caching</name><value>4096</value><description>用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。</description>
</property>

2.4.9 flush、compact、split机制

大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。
涉及属性:
1. 128M就是Memstore的默认阈值
hbase.hregion.memstore.flush.size:134217728
tip:这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。2.
hbase.regionserver.global.memstore.upperLimit:0.4
hbase.regionserver.global.memstore.lowerLimit:0.38
tip:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit

2.5 HBase2.0新特性

2017年8月22日凌晨2点左右,HBase发布了2.0.0 alpha-2,相比于上一个版本,修复了500个补丁,我们来了解一下2.0版本的HBase新特性。
最新文档:
http://hbase.apache.org/book.html#ttl
官方发布主页:
http://mail-archives.apache.org/mod_mbox/www-announce/201708.mbox/<CADcMMgFzmX0xYYso-UAYbU7V8z-Obk1J4pxzbGkRzbP5Hps+iA@mail.gmail.com
举例:
1) region进行了多份冗余主region负责读写,从region维护在其他HregionServer中,负责读以及同步主region中的信息,如果同步不及时,是有可能出现client在从region中读到了脏数据(主region还没来得及把memstore中的变动的内容flush)。
2) 更多变动
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12340859&styleName=&projectId=12310753&Create=Create&atl_token=A5KQ-2QAV-T4JA-FDED%7Ce6f233490acdf4785b697d4b457f7adb0a72b69f%7Clout

三.Flume概述

3.1 大数据处理流程

在企业中,大数据的处理流程一般是:

1. 数据采集
2. 数据清洗ETL
3. 数据分析
4. 数据展示(BI,数据挖掘,为AI提供数据支持)

扩展:大数据在进行数据采集的时候,数据的种类可以这样分:

1. 业务数据
2. 行为日志数据
3. 内容数据
4. 购买的第三方数据

3.2 Flume的简介

1. Flume是一个分布式的、高可用、高可靠的行为日志数据的采集框架
2. 体系架构简单,是基于数据流的模型
3. 具有故障转移和恢复机制,以及良好的扩展机制。
4. 0.9.4以前的版本称之为Flume OG   之后称之为Flume NG版本

3.3 Flume的体系结构

1. source: 用来采集数据,封装成event,传输给channel
2. channel:  channel接受source的event,传输给sink
3. sink: 接收channel的event, 落地到相应的位置
4. Agent:  flume的最小运行单元,一个Agent中三个核心组件都要至少有一个。
5. event:  采集的数据的封装对象。  由消息头head(KV对)和消息体body(byte[]类型)组成
6. interceptor:  作用在source或者是sink端, 可以拦截event,进行设置等
7. selector: 作用在source端,用于将event分发到不同的channel中
8. processor:可以进行sink组设置,进行自动容灾,负载均衡

3.4 Flume采集模型

3.4.1 模型分类

1)单一数据模型

在单个 Agent 内由单个 Source, Channel, Sink 建立一个单一的数据流模型,如下图所示,整个数据流为 Web Server --> Source --> Channel --> Sink --> HDFS。

2) 多数据流模型

多 Agent 串行传输数据流模型

多 Agent 汇聚数据流模型

单 Agent 多路数据流模型

Sinkgroups 数据流模型

3.4.2 设计原则

Source--> Channel1.单个Source组件可以和多个Channel组合建立数据流,既可以replicating 和 multiplexing。2.多个Sources可以写入同一个 Channel
Channel-->Sink1.多个Sinks又可以组合成Sinkgroups从Channel中获取数据,既可以loadbalancing和failover机制。2.多个Sinks可以从同一个Channel中取数据。3.单个Sink只能从单个Channel中取数据

3.4.3 采集方案模板

参考官网

3.4.4 三大核心组件的常用接口

1)Source

Spooling Directory Source
TailDir Source
Kafka Source
Http Source
Avro Source
Exec Source

2)channel

Memory Channel
File Channel

3)Sink

HDFS Sink
Logger Sink
Kafka Sink
Avro Sink
Hive Sink
Hbase Sink

四、Flume的安装

1)上传,解压,更名,配置环境变量,验证

[root@xxx01 ~]# tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/
[root@xxx01 ~]# cd /usr/local/
[root@xxx01 local]# mv apache-flume-1.8.0-bin/ flume
[root@xxx01 local]# vim /etc/profile
#FLUME environment
export FLUME_HOME=/usr/local/flume
export PATH=$FLUME_HOME/bin:$PATH[root@xxx01 local]# vim /etc/profile
[root@xxx01 local]# source /etc/profile
[root@xxx01 local]# flume-ng version

2)配置flume的环境脚本

[root@xxx01 local]# cd flume/conf
[root@xxx01 conf]# cp flume-env.sh.template flume-env.sh
[root@xxx01 conf]# vim flume-env.sh
........省略..........
export JAVA_HOME=/usr/local/jdk
........省略..........

3)集群模式的说明

flume的集群模式指的就是各个节点都安装了flume而已

五、flume采集方案演示

案例1)avro+memory+logger

logger通常用于测试,数据流中的event最终显示在屏幕上

1)采集方案的配置

[root@xxx01 ~]# mkdir flumeconf
[root@xxx01 ~]# vim ./flumeconf/avro-mem-logger.properties
# 定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 定义Source的相关属性
a1.sources.r1.type = avro
# 绑定本机的ip或者是hostname
a1.sources.r1.bind = xxx01
# 要监听的本机上的某一个端口号,  当程序启动时,该端口号就会被使用
a1.sources.r1.port = 10086# 定义channel的相关属性
a1.channels.c1.type = memory
#  内存存储容量 event的最大数量
a1.channels.c1.capacity=1000
#  从内存中出来时,一次性提交的event的数量
a1.channels.c1.transactionCapacity=100#定义Sink的相关属性
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog = 16

2)启动方案

flume-ng agent -c /usr/local/flume/conf -f ./flumeconf/avro-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console

3)测试:因为用的是avro的source,那么必须使用avro-client进行测试

[root@xxx01 ~]# mkdir flumedata
[root@xxx01 ~]# echo "hellworld" > flumedata/data.txt
[root@xxx01 ~]# flume-ng avro-client -c /usr/local/flume/conf/ -H xxx01 -p 10086 -F ./flumedata/data.txt

案例2)exec+memory+logger

注意:使用exec源,监听的文件,要提前创建

1)采集方案的编写

[root@xxx01 ~]# vim ./flumeconf/exec-mem-logger.properties
# 定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 定义Source的相关属性
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ./flumedata/data.txt# 定义channel的相关属性
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100#定义Sink的相关属性
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog = 16

2)启动采集方案

flume-ng agent -f ./flumeconf/exec-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console

3)测试

echo "helloworld" >> ./flumedata/data.txt

案例3)exec+memory+hdfs

1)采集方案的编写

[root@xxx01 ~]# vim ./flumeconf/exec-mem-hdfs.conf
# 定义三大组件的名称  和关联
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 定义Source的相关属性
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ./flumedata/data.txt# 定义channel的相关属性
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100#定义Sink的相关属性
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path = /flume/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = wcm
a1.sinks.k1.hdfs.fileSuffix = .wsy
# 下面三个条件满足其一,就会产生新文件
# 新文件产生的时间周期,单位是秒,   如果设置为0表示不会产生新文件。
a1.sinks.k1.hdfs.rollInterval = 60
# 当前文件达到1000字节,就会产生新文件
a1.sinks.k1.hdfs.rollSize = 1000
# 当前文件的event数量达到10条,就会产生新文件
a1.sinks.k1.hdfs.rollCount = 10
# 如果writeFormat指定了Text,那么fileType必须是DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream# round的作用,用于指定是否滚动文件夹  false 表示不滚动文件夹
a1.sinks.k1.hdfs.round = true
# 设置文件夹滚动的时间单位
a1.sinks.k1.hdfs.roundUnit = minute
# 设置文件夹固定的时间数字大小
a1.sinks.k1.hdfs.roundValue = 2
#  如果目录上设置了时间格式字符串,比如%Y等,那么下面的属性应该设置为true,除非event的head里有一个叫timestamp的消息头
a1.sinks.k1.hdfs.useLocalTimeStamp = true

2)启动方案

flume-ng agent -f ./flumeconf/exec-mem-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

3)测试

[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt
[root@xxx01 ~]# echo "chenyun " >> flumedata/data.txt

案例4)spool+memory+logger

spool源,是用来监听目录下的新文件的,并通过更名的方式来决定该文件已经采集完。注意,监听的目录必须提前存在。是一个可靠源exec源不可靠

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/spool-mem-logger.properties
# 列出每个组件的名称
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1#设置source组件的属性
a1.sources.r1.type=spooldir
#要监听的目录必须提前创建
a1.sources.r1.spoolDir=/root/data/subdir
a1.sources.r1.fileSuffix=.gyy
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100#设置sink组件的属性
a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=16

2)启动方案

flume-ng agent -f ./flumeconf/spool-mem-logger.properties -n a1 -Dflume.root.logger=INFO,console

3)测试

[root@xxx01 ~]# echo "helloworld" >>data/subdir/a.txt
[root@xxx01 ~]# cd data/subdir/
[root@xxx01 subdir]# ll
总用量 4
-rw-r--r-- 1 root root 11 12月 23 16:25 a.txt.gyy
[root@xxx01 subdir]# echo "helloworld" >>b.txt
[root@xxx01 subdir]# echo "helloworld" >>c.txt
[root@xxx01 subdir]# echo "helloworld" >>d.txt
[root@xxx01 subdir]# ll
总用量 16
-rw-r--r-- 1 root root 11 12月 23 16:25 a.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 b.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 c.txt.gyy
-rw-r--r-- 1 root root 11 12月 23 16:25 d.txt.gyy
注意:因为每次监听都会更名,因此再次监听的文件名不能与之前的名字重复。

案例5)spool+file+hdfs

1)方案的编写

[root@xxx01 ~]# vim flumeconf/spool-file-hdfs.properties
# 命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1#设置spool源
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/data/subdir
a1.sources.r1.fileSuffix=.gyy
a1.sources.r1.deletePolicy=never
a1.sources.r1.fileHeader=false
a1.sources.r1.fileHeaderKey=file
a1.sources.r1.basenameHeader=false
a1.sources.r1.basenameHeaderKey=basename
a1.sources.r1.batchSize=100
a1.sources.r1.inputCharset=UTF-8
a1.sources.r1.bufferMaxLines=1000#设置file的channel
a1.channels.c1.type=file#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://xxx01:8020/flume/hdfs/%Y
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=michael
a1.sinks.s1.hdfs.fileSuffix=.gyy
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=false
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute

2)启动采集方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/spool-file-hdfs.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

[root@xxx01 subdir]# echo "helloworld" >>e.txt
[root@xxx01 subdir]# echo "helloworld" >>f.txt
[root@xxx01 subdir]# echo "helloworld" >>g.txt

案例6)http+memory+logger

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/http-mem-logger.properties
# list name of three core
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1#设置每个组件的接口以及属性
a1.sources.r1.type=http
#该源要监听的host或者是ip
a1.sources.r1.bind=xxx01
#该源要监听的port
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandlera1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=32

2)启动方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/http-mem-logger.properties -n a1  -Dflume.root.logger=INFO,console

3)使用curl指令发送post协议进行测试

[root@xxx03 ~]# curl -X POST -d '[{"headers":{"girlfriend1":"zhangjunning","girlfriend":"nazha"},"body":"they are my girlfriends"}]' http://xxx01:10086解析:
-X  用来指定http的请求方式,如post或者是get
-d  用来模拟要发送的数据
第三个参数表示要将数据发送到的地址。

案例7)syslogtcp+memory+logger

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/syslogtcp-mem-logger.properties
# list name of three core
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1#设置每个组件的接口以及属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=xxx01
a1.sources.r1.port=10086
a1.sources.r1.eventSize=2500a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100a1.sinks.s1.type=logger
a1.sinks.s1.maxBytesToLog=32

2)启动方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/syslogtcp-mem-logger.properties -n a1  -Dflume.root.logger=INFO,console

3)使用nc指令来发送tcp协议,进行测试

先安装nc指令:yum -y install nmap-ncat

[root@xxx03 ~]#  echo "helloworld" | nc xxx01 10086
nc的语法:   nc  host  port

案例8)taildir+memory+hdfs

taildir与spooling这两个源的比较- 相同点:1. 都是可靠源2. 监听的都是目录3. 该目录一定要提前创建
- 不同点:1. spooling监听完的文件会被重命名2. spooling监听的目录里的文件不能重名3. spooling监听的是目录里的新文件4. taildir监听的文件不会被重命名,可以一直监听文件里的新行。 

1)采集方案的编写

[root@xxx01 ~]# vim flumeconf/taildir-mem-hdfs.properties
# 命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1#设置spool源
a1.sources.r1.type=TAILDIR
#设置要监听的文件所属组,可以设置多个
a1.sources.r1.filegroups=g1 g2
#规定每一个组要监听的文件的绝对路径,可以使用正则表达式来表示一批文件
a1.sources.r1.filegroups.g1=/root/data/dir2/.*.txt
a1.sources.r1.filegroups.g2=/root/data/dir2/.*.csv
#a1.sources.r1.positionFile=/root/taildir_position.json#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://xxx01:8020/flume/hdfs/%Y-%m
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=michael
a1.sinks.s1.hdfs.fileSuffix=.gyy
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute

2)启动方案

[root@xxx01 ~]# flume-ng agent  -f ./flumeconf/taildir-mem-hdfs.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

目录要提前创建出来

[root@xxx01 dir2]# echo "chenyun" > a.txt
[root@xxx01 dir2]# echo "chenyun" >> a.txt
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.csv
[root@xxx01 dir2]# echo "chenyun" >> a.json   # 不会被采集的

去检查HDFS上的目录

(=-=,flume的实操几乎一样,hbase的优化需要多多注意,今天星期四,明天星期五~~~)

2021-12-23 迈向程序员的第五十二步相关推荐

  1. 《道德经》程序员版第四十二章

    原文: 道生一,一生二,二生三,三生万物.万物负阴而抱阳,冲气以为和.人之所恶,唯孤.寡.不谷,而王公以为称.故物或损之而益,或益之而损.人之所教,我亦教之:强梁者不得其死.吾将以为教父! 原文注释: ...

  2. 《那些年啊,那些事——一个程序员的奋斗史》十二

    111     虽然武总运用小手段对刘工他们进行了报复,但还是担心对方会做出什么冲动之举,所以几天后就买了回台湾的飞机票去避避风头了.当然咯,临走前武总还不忘开个会,并且严肃地宣布:"我明天 ...

  3. 2021 12 23 的程序

    #include<stdio.h> int main() {     int arr[] = {1,2,3,4,5,6,7,8,9,10};     int k = 7;     int ...

  4. 程序员的爱情 第十二章

    IT业是有名的"两高"行业--高工作强度和高跳槽率.一般情况下,每年都有两次大的跳槽机会.一个是春季2至4月,这个时候正好是春节前后,一般企业都已经发完年终奖金,准备跳槽的人都希望 ...

  5. 程序员的奋斗史(十二)——谈信念

    一个人活着连信念都没有,那他仿佛行尸走肉. 坚定的信念能够帮助我们达成目标.坚定的信念能给我们指明目标.坚定的信念能够让我们不再彷徨. 我有个妹妹就是那种做一件事遇到一丁点挫折就会放弃的人.到现在一无 ...

  6. 程序员修炼之路(十四)IT外企那点儿事--也说跳槽

    最近一个月一直在忙项目,几乎没什么时间写博客,今天中午才有时间看看csdn,在论坛上看到一篇很好的文章,分享给大家.也给自己留作备用. 原文地址:http://www.cnblogs.com/forf ...

  7. 2021年2月程序员工资统计,又拖后腿了……

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 2021年2月采集样本370020人.2021年2月程序员 ...

  8. 2021 年 1 月程序员薪资出炉,持续上涨!你过平均线了吗?

    瑟瑟发抖!程序员薪资报告又来了. 2021 年 1 月的薪资报告一定能给你个大惊喜!程序员薪资扶摇直上. (2021年1月程序员收入情况) 1 月全国程序员平均工资 14915 元,工资中位数 125 ...

  9. 2021 年 1 月程序员工资新出炉,你猜涨了多少?

    时间来到了 2021 年,关于程序员的 1 月的最新工资标准已经出炉(采集样本 352948 个),来看看有啥变化! 01 2021 年 1 月全国程序员平均工资 14915 元,工资中位数 1250 ...

最新文章

  1. MDNN:一种用于药物-药物反应预测的多模态深度神经网络
  2. 浅谈Javascript中的void操作符
  3. SQL Server中临时表与表变量的区别
  4. 为什么一定要用MQ中间件
  5. Sharepoint学习笔记---Linq to Sharepoint--查询语法
  6. paip.声音按键音延迟的解决
  7. 怎么批量修改pdf文件名?
  8. 广州科二化龙考场_广州考驾照[科目二]化龙考场.考试详解
  9. DWM安装及简略配置教程
  10. 简易计算机系统综合设计--函数发生器
  11. web前端期末大作业 html+css家乡旅游主题网页设计 湖北武汉家乡介绍网页设计实例
  12. Sencha Architect4.0破解教程
  13. linux设备模型 —— sysfs
  14. Linux更改本机ip
  15. 编译原理实验 -- 文法分析
  16. PLSQL developer下载、安装、详细教程
  17. 全球与中国DIN导轨式信号调理器市场现状及未来发展趋势
  18. 概率论与数理统计(3.4) 相互独立的随机变量
  19. 基于CommonsCollections4的Gadget分析
  20. Reliable Cloud Infrastructure: Design and Process学习笔记

热门文章

  1. 看图说话:从图片到文字
  2. 数学表达式转换成python_简单数学表达式_清华尹成python入门教程_少儿编程视频-51CTO学院...
  3. 手把手教学51单片机 | 第四节 动态数码管,用6位数码管做一个时钟
  4. SpringBoot集成邮箱功能并使用Knife4j测试
  5. Vue.js入门学习--列表渲染--v-for遍历数组生成元素(四)
  6. 单位员工通讯录管理系统
  7. 达梦数据库的TPCC测试记录
  8. C# webBrowser 通过代理访问网页
  9. 错误: 找不到或无法加载主类 com.xxxx.xxx.Application
  10. Web前端期末大作业---响应式美女健身教练瑜伽馆网页设计(HTML+CSS+JavaScript+)实现