一、flume安装与配置
1.将flume解压到指定目录/home/jason/bigdata下:
[root@clsserv202 bigdata]# pwd
/home/jason/bigdata
[root@clsserv202 bigdata]# ll
总用量 4
drwxrwxrwx. 8 root root 4096 9月 8 14:38 flume-1.6.0
2.将flume的目录添加到环境变量
export FLUME_HOME=/home/jason/bigdata/flume-1.6.0
1.从指定端口号读取数据到Kafka中
1.1、首先指定本flume的Source、Channel、Sink的名称
agent_ps.sources = idg-src
agent_ps.sinks = idg-sk
agent_ps.channels = idg-ch
1.2、下面再对Source、Channel、Sink进行详细配置:
(1)配置源的各项配置为:
agent_ps.sources.idg-src.type = avro
agent_ps.sources.idg-src.bind = 11.12.112.215
agent_ps.sources.idg-src.port = 44440
agent_ps.sources.idg-src.threads = 5
agent_ps.sources.idg-src.channels = idg-ch
【注意】
type为avro,表示从本机指定的IP和端口接收数据;
channels 指定本Source对应的Channel的名称,这里的channels也表示一个Source可以对应多个Channel;
(2)channel预估一下日志手机的速度以及本机的性能进行分配,这里用的是:
agent_ps.channels.idg-ch.type = memory
agent_ps.channels.idg-ch.keep-alive = 60
agent_ps.channels.idg-ch.capacity = 500
agent_ps.channels.idg-ch.transactionCapacity = 100
(3)Sink配置为数据即将输出的Kafka的信息:
agent_ps.sinks.idg-sk.channel = idg-ch
agent_ps.sinks.idg-sk.type = org.apache.flume.sink.kafka.KafkaSink
agent_ps.sinks.idg-sk.topic = flume_log
agent_ps.sinks.idg-sk.brokerList = 11.12.112.206:9092,11.12.112.207:9092,11.12.112.208:9092
agent_ps.sinks.idg-sk.requireAcks = 0
agent_ps.sinks.idg-sk.batchSize = 20
【注意】
type中指定的Kafka的包名
brokerList中指定了Kafka集群的地址;
channel 指定本Sink对应的Channel的名称,这里的channels表示一个Sink的来源只能对应一个Channel
(4)综合上述的说明,实例配置文件中的数据为:
# list the sources, sinks and channels for the agent
agent_ps.sources = idg-src
agent_ps.sinks = idg-sk
agent_ps.channels = idg-ch
agent_ps.sources.idg-src.type = avro
agent_ps.sources.idg-src.bind = 11.12.112.215
agent_ps.sources.idg-src.port = 44440
agent_ps.sources.idg-src.threads = 5
agent_ps.sources.idg-src.channels = idg-ch
agent_ps.channels.idg-ch.type = memory
agent_ps.channels.idg-ch.keep-alive = 60
agent_ps.channels.idg-ch.capacity = 500
agent_ps.channels.idg-ch.transactionCapacity = 100
agent_ps.sinks.idg-sk.channel = idg-ch
agent_ps.sinks.idg-sk.type = org.apache.flume.sink.kafka.KafkaSink
agent_ps.sinks.idg-sk.topic = flume_log
agent_ps.sinks.idg-sk.brokerList = 11.12.112.206:9092,11.12.112.207:9092,11.12.112.208:9092
agent_ps.sinks.idg-sk.requireAcks = 0
agent_ps.sinks.idg-sk.batchSize = 20
2.从本地的日志文件中读取数据到Kafka中
与1相比只需要修改Source的配置,将其从本地的某个端口读取修改为从指定的文件中读取;
agent_ps.sources.idg-src.type = exec
agent_ps.sources.idg-src.shell = /bin/sh -c
agent_ps.sources.idg-src.command = tail -F /home/logs/idGen/idGen.log
agent_ps.sources.idg-src.restartThrottle = 2000
agent_ps.sources.idg-src.restart = true
agent_ps.sources.idg-src.batchSize = 20
agent_ps.sources.idg-src.channels = idg-ch
其他的Channel和Sink的配置与1一致;
3.从本地日志文件中读取数据到另一个flume中
与2相比,只需要修改Sink,见下面的红色部分:
# list sources, sinks and channels in the agent
agent_ps.sources = idg-src
agent_ps.sinks = idg-sk
agent_ps.channels = idg-ch
agent_ps.sources.idg-src.type = exec
agent_ps.sources.idg-src.shell = /bin/sh -c
agent_ps.sources.idg-src.command = tail -F /home/logs/idGen/idGen.log
agent_ps.sources.idg-src.restartThrottle = 2000
agent_ps.sources.idg-src.restart = true
agent_ps.sources.idg-src.batchSize = 20
agent_ps.sources.idg-src.channels = idg-ch
agent_ps.channels.idg-ch.type = memory
agent_ps.channels.idg-ch.keep-alive = 60
agent_ps.channels.idg-ch.capacity = 100
agent_ps.channels.idg-ch.transactionCapacity = 100
agent_ps.sinks.idg-sk.type = avro
agent_ps.sinks.idg-sk.hostname = 11.12.112.215
agent_ps.sinks.idg-sk.port = 44440
agent_ps.sinks.idg-sk.batch-size = 20
agent_ps.sinks.idg-sk.request-timeout = 3000
agent_ps.sinks.idg-sk.channel = idg-ch
二、flume启动
1.进入到flume的bin目录下:
[root@clsserv202 bin]# pwd
/home/jason/bigdata/flume-1.6.0/bin
[root@clsserv202 bin]# ll
总用量 36
-rwxrwxrwx. 1 root root 12845 9月 8 14:37 flume-ng
-rwxrwxrwx. 1 root root 936 9月 8 14:37 flume-ng.cmd
-rwxrwxrwx. 1 root root 14041 9月 8 14:37 flume-ng.ps1
2.运行flume-ng
flume-ng agent -n agent_ps -c $FLUME_HOME/conf -f $FLUME_HOME/conf/flume.conf -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 & (也可以-Dflume.root.logger=INFO,console)
3.查看flume-ng进程是否启动成功
[root@clsserv202 flume-1.6.0]# jps -l
6737 sun.tools.jps.Jps
6700 org.apache.flume.node.Application
6218 com.xyz.idgen.mainframe.Starter
4、在flume连接Kafka时,通过在Kafka集群创建一个消费者来消费flume创建的topic即可看到flume发来的数据,如果没有数据到来则说明这中间的连接有问题;
三、常见的source、channel、sink的含义及说明
3.1.source
(1)avro source
avro可以监听和收集指定端口的日志,使用avro的source需要说明被监听的主机ip和端口号,下面给出一个具体的例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
(2)exec source
exec可以通过指定的操作对日志进行读取,使用exec时需要指定shell命令,对日志进行读取,下面给出一个具体的例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
(3)spooling-directory source
spo_dir可以读取文件夹里的日志,使用时指定一个文件夹,可以读取该文件夹中的所有文件,需要注意的是该文件夹中的文件在读取过程中不能修改,同时文件名也不能修改。下面给出一个具体的例子:
agent-1.channels = ch-1
agent-1.sources = src-1
agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true
(4)syslog source
syslog可以通过syslog协议读取系统日志,分为tcp和udp两种,使用时需指定ip和端口,下面给出一个udp的例子:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
3.2.常见的channel
Flume的channel种类并不多,最常用的是memory channel,下面给出例子:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
3.3.常见的sink
(1)logger sink
l ogger顾名思义,就是将收集到的日志写到flume的log中,是个十分简单但非常实用的sink
(2)avro sink
avro可以将接受到的日志发送到指定端口,供级联agent的下一跳收集和接受日志,使用时需要指定目的ip和端口:例子如下:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
(3)file roll sink
file_roll可以将一定时间内收集到的日志写到一个指定的文件中,具体过程为用户指定一个文件夹和一个周期,然后启动agent,这时该文件夹会产生一个文件将该周期内收集到的日志全部写进该文件内,直到下一个周期再次产生一个新文件继续写入,以此类推,周而复始。下面给出一个具体的例子:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
(4)hdfs sink
hdfs与file roll有些类似,都是将收集到的日志写入到新创建的文件中保存起来,但区别是file roll的文件存储路径为系统的本地路径,而hdfs的存储路径为分布式的文件系统hdfs的路径,同时hdfs创建新文件的周期可以是时间,也可以是文件的大小,还可以是采集日志的条数。具体实例如下:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
(5)hbase sink
hbase是一种数据库,可以储存日志,使用时需要指定存储日志的表名和列族名,然后agent就可以将收集到的日志逐条插入到数据库中。例子如下:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
CentOS7环境下搭建flume相关推荐
- 实现在CentOS7环境下搭建个人github博客
实现在CentOS7环境下搭建个人github博客 主机要求:必须是centos环境版本可以不一样,假如你用的是其他版本的linux系统,只不过是命令不太一样. Tips:这里提前说下,对于Ubunt ...
- CentOS7环境下搭建Kibana
本次安装的Kibana主要用于展示<CentOS7环境下搭建ElasticSearch>中搭建的ES,其详细安装部署过程如下所示: 1.解压Kibana压缩包kafka_2.11-0.8. ...
- CentOS7环境下搭建ElasticSearch
搭建环境在11.12.112.209主机,本文件主要说明搭建一个ES,如果多个es,只有让他们的配置文件中的集群名称cluster.name配置成一样,它们就会自动组成一个集群. 一.搭建elasti ...
- CentOS7环境下搭建Kafka
计划使用三台主机:11.12.112.206.11.12.112.207.11.12.112.208搭建Kafka集群环境, 使用的zookeeper集群为:11.12.112.215:2181,11 ...
- centos7环境下搭建storm集群
前提: 使用的zookeeper集群为:11.12.112.215:2181,11.12.112.216:2181,11.12.112.217:2181 并且zookeeper集群已经启动成功:下面将 ...
- Centos7环境下etcd集群的搭建
Centos7环境下etcd集群的搭建一.简介"A highly-available key value store for shared configuration and service ...
- CentOS7.5环境下搭建禅道
CentOS7.5环境下搭建禅道 在安装配置禅道之前,可以百度了解一下两款项目管理工具禅道与JIRA的区别. 一.安装 进入禅道官网https://www.zentao.net,选择适用的版本进行安装 ...
- 本机装载VirtualBox+CentOS7环境下安装Docker
本机装载VirtualBox+CentOS7环境下安装Docker 一.环境准备 VirtualBox安装 官网下载对应的安装包安装 镜像下载 下载 CentOS7的镜像 二.基础步骤及设置 2.1新 ...
- 在CentOS7.6下搭建Oracle19c RAC集群
在CentOS7.6下搭建Oracle19c RAC集群 1.准备阶段 1.1 虚拟环境准备 1.2 Oracle版本包准备 2.服务器配置阶段 2.1.配置IP 2.2.停止不相关服务 2.3.配置 ...
最新文章
- 开篇语 2008.8.11
- 【机器视觉】 stop算子
- android app无感知自动升级,Android中实现用户无感知处理后台崩溃
- 嵌入式开发常用工具软件
- 信息奥赛一本通(1311:【例2.5】求逆序对)
- ajax请求可以延时吗,延时校验AJAX请求
- QT下添加*.qrc(图标Icon、图像)资源
- javar -jar 和 java -cp :命令行运行java代码
- 为 Retrofit2 提供的 FastJson 转换库
- 计量经济学计算机实验报告,计量经济学实验报告.doc
- Microsoft Visual SourceSafe的使用
- 微信开发--微信分享自定义图标和标题
- firefox浏览器一分钟去广告--去广告插件安装教程(adblock plus)
- Java自由虾旅行平台菜单功能
- 网上交易的卫兵—数字证书和数字签名
- VLOOKUP函数的多条件引用
- 什么是生化分析中的反应曲线?
- JavaScript 播放多条音频与延迟播放
- 看完这篇Java基础,你也是天花板
- 上传图片直接显示图片操作
热门文章
- window10安装oracle VirtualBox 虚拟机+ubuntu16.04安装Ros
- python3 super_python3的super详解
- 接口中的成员只有静态常量和什么_为什么阿里工程师代码写的好?看看他的代码规范就知道了...
- python椭圆识别_Python+pillow计算椭圆图形几何中心
- 数据逻辑讨论计算机,1绪论信息技术算法与程序福建教师招考.ppt
- pycharm 怎么快速生成文件夹结构_为什么Python代码能运行但是PyCharm给我画红线?...
- python polar contour_13.极坐标系(Polar coordinate system)
- mac安装python3.8_mac安装pwntools(python3.8)解决多数问题
- 手机如何访问电脑局域网文件共享服务器,数据共享 手机怎么访问电脑文件?多个设备之间数据共享...
- wpf tabitem 点击事件_Mindfusion教程:WPF中的Fishbone(Ishikawa)图