先附上flume官网地址:http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html#kafka-channel
flume有三大组件:source,channel,sink。
根据不同的适用场景其中又有进一步的分类,channel见得比较多的有file channel(base on secondary memory)、memory channel (base on main memory)。file channel的安全性要比memory channel高,但效率次之;反之即反。
在使用flume实时监控采集日志对接kafka时,kafka channel将会是一个比较好的选择(详细见这位大佬博客:flume高并发优化——(4)kafka channel)。
在此使用flume1.7,因为在此版本之前关于kafka Channel有一个业界前辈口中的bug;和a1.channels.c1.parseAsFlumeEvent = false参数设置有关,在1.7版本之前即使将这个参数设置为false还是会将head数据进行输出(就像是将head数据错放在body一样),如此导致使用上的不便。

1 flume日志配置文件如下

#flume-kafka-channel.conf
a1.sources=r1
a1.channels=c1 c2# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/wh/kafka-channel-positionFile
a1.sources.r1.filegroups = f1
#monitor multifile
a1.sources.r1.filegroups.f1 = /tmp/logs/sei-dk.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2#interceptor
a1.sources.r1.interceptors =  i1 i2
a1.sources.r1.interceptors.i1.type = eclab.interceptor.BaseInterceptor$Builde
a1.sources.r1.interceptors.i2.type = eclab.interceptor.OtInterceptor$Builde#fetching kafka topic
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_base= c1
a1.sources.r1.selector.mapping.topic_ot = c2# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = chdp11:9092,chdp12:9092,chdp13:9092
a1.channels.c1.kafka.topic = topic_base
#have a bug in flume 1.6 using in kafkaChannel
a1.channels.c1.parseAsFlumeEvent = falsea1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = chdp11:9092,chdp12:9092,chdp13:9092
a1.channels.c2.kafka.topic = topic_ot
a1.channels.c2.parseAsFlumeEvent = false

2 创建kafka topic
对接的是kafka topic,执行之前需要在kafka中创建对应的topic(关于kafka基本操作可见这篇文章:kafka控制台基本操作命令)

/usr/SFT/kafka-0.11/bin/kafka-topics.sh --zookeeper chdp11:2181 --create --replication-factor 2 --partitions 2 --topic topic_ot

3 使用脚本方便在多台机器上进行日志采集

#! /bin/bash
#fng.sh
case $1 in
"start"){for host in chdp11 chdp12doecho " --------stop flume colllect: $host-------"ssh $host "source /etc/profile ;nohup /usr/SFT/flume-1.7/bin/flume-ng agent --conf-file /usr/SFT/flume-1.7/wh/flume-kafka-channel.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"done
};;
"stop"){for host in chdp11 chdp12doecho " --------start flume colllect: $host-------"ssh $host "ps -ef | grep file-kafka | grep -v grep |awk '{print \$2}' | xargs kill"done};;

脚本相关解释
(1)注意其中的source /etc/profile ;,这段代码可以添加在~/.bashrc文件下(在此假设已配置ssh免密),也可以写在以下脚本中。否则无法启动flume-ng。
(2)代码中使用awk结合shell工具获取正在后台运行的日志采集程序pid,并使用kill较为温柔的方法结束进程。
(3)nohup( no hang up):类似于守护进程,进程在终端退出登录后继续运行。
(4)/dev/null:黑洞文件,直接丢失。
(5)... >/dev/null 2>&1详述如下:
0:标准输入 从键盘获得输入 /proc/self/fd/0
1:标准输出控制台 /proc/self/fd/1
2:错误输出到控制台 /proc/self/fd/2
上面代码(>/dev/null 2>&1)可以理解为将错误输出等效于标准输出,而标准输出已经重定向到“黑洞”,故而将错误输出也丢失了。
想要详细了解可以参见这位大佬博客:Linux里的2>&1究竟是什么
部分内容copy如下:

从command>/dev/null说起
其实这条命令是一个缩写版,对于一个重定向命令,肯定是a > b这种形式,那么command > /dev/null难道是command充当a的角色,/dev/null充当b的角色。这样看起来比较合理,其实一条命令肯定是充当不了a,肯定是command执行产生的输出来充当a,其实就是标准输出stdout。所以command > /dev/null相当于执行了command 1 > /dev/null。执行command产生了标准输出stdout(用1表示),重定向到/dev/null的设备文件中。
说说2>&1
通过上面command > /dev/null等价于command 1 > /dev/null,那么对于2>&1也就好理解了,2就是标准错误,1是标准输出,那么这条命令不就是相当于把标准错误重定向到标准输出么。等等是&1而不是1,这里&是什么?这里&相当于等效于标准输出。这里有点不好理解,先看下面。
command>a 2>a 与 command>a 2>&1的区别
通过上面的分析,对于command>a 2>&1这条命令,等价于command 1>a 2>&1可以理解为执行command产生的标准输入重定向到文件a中,标准错误也重定向到文件a中。那么是否就说command 1>a 2>&1等价于command 1>a 2>a呢。其实不是,command 1>a 2>&1与command 1>a 2>a还是有区别的,区别就在于前者只打开一次文件a,后者会打开文件两次,并导致stdout被stderr覆盖。&1的含义就可以理解为用标准输出的引用,引用的就是重定向标准输出产生打开的a。从IO效率上来讲,command 1>a 2>&1比command 1>a 2>a的效率更高。
————————————————
版权声明:本文为CSDN博主「GGxiaobai」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ggxiaobai/article/details/53507530

Flume Kafka Channel使用案例相关推荐

  1. Flume+Kafka+Spark小案例

  2. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  3. Flume+Kafka+Strom基于伪分布式环境的结合使用

    --------------------------------------- 博文作者:迦壹 博客地址:Flume+Kafka+Strom基于伪分布式环境的结合使用 转载声明:可以转载, 但必须以超 ...

  4. Flume原理及使用案例

    本文为转载篇!原文: https://www.cnblogs.com/zhangyinhua/p/7803486.html https://www.cnblogs.com/ciade/p/549521 ...

  5. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  6. Flume笔记二:案例

    案例一: 复制和多路复用 vim a1.conf //第一道flume #各个组件命名 a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 k2#Sou ...

  7. Flume+Kafka双剑合璧玩转大数据平台日志采集

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好 ...

  8. 业务数据采集_零点漂移处理方法(Flume+Kafka+HDFS)

    最近做了一个业务数据采集,总体架构就是 Flume-taildir source + kafka channel => Kafka => Flume-kafka source + memo ...

  9. flume+kafka消费数据【纯个人笔记】

    1.数据生产 使用java代码往一个文件中写入数据 package com.mobile;import java.io.*; import java.text.DecimalFormat; impor ...

  10. Kafka吞吐量测试案例

    Kafka吞吐量测试案例 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 领英公司参考连接:https://www.slideshare.net/JiangjieQin/produc ...

最新文章

  1. 用javascript打造一个简单的小人互殴系统(实现javascript游戏基本要素:生命值、伤害、移动、闪避等)
  2. Webpack不生成index.html
  3. ai背景合成_智能合成AI主播很危险,应立即取消!
  4. 什么是A记录,子域名,CNAME别名,MX记录,TXT记录,SRV 记录,泛域名(泛解析),域名转向,域名绑定...
  5. 关于C#中timer类
  6. 实用的 Python 包 —— 使用 win32 的剪贴板
  7. Pytorch 的迁移学习的理解
  8. PHP作用域和文件夹操作
  9. 个人作业--数组(续一)
  10. HAOI2018 反色游戏
  11. 推荐一款UI非常Good的 Redis 客户端工具
  12. jdbc连接timesten_JDBC远程连接TimesTen
  13. webService接口调试工具——Strom
  14. 面板模型混合效应模型_树助混合效应模型
  15. 时钟屏保fliqlo
  16. 视频教程-Java高级技术-Java
  17. Java实现图片上传到服务器,并把上传的图片读取出来
  18. linux系统下搜索文件,Linux系统下搜索文件的方法
  19. 单线激光雷达的外参标定方法
  20. js实现时间戳转化为自定义格式的年月日时分秒(yyyy-MM-dd HH:mm:ss)

热门文章

  1. 巧妙的位运算及模运算
  2. UE4天气效果加白天黑夜的平滑过度切换
  3. vue打开新html,vue在新窗口打开页面的方法
  4. java是牌子的眼镜多少钱一副_世界上最贵的眼镜:LOTOS眼镜,一副眼镜要500万元...
  5. c语言指针选择题库及答案,C语言指针练习习题及答案.doc
  6. 建议收藏|一文带你读懂 Prisma 的使用
  7. Springboot自定义注解+AOP实现日志管理
  8. 百度将严厉打击熊掌号发布大量与号领域不匹配的内容
  9. scrapy爬取动态网页
  10. RK 3568 IDB烧录失败解决方法