环境准备:

涉及到的技术有flume,Kafka,zookeeper。

操作步骤:

1、构建agent

train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSinktrain.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/kb15tmp/flumelogfile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents=truetrain.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/kb15tmp/checkpoint/train
train.channels.trainChannel.dataDirs=/opt/kb15tmp/checkpoint/data/traintrain.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.91.180:9092
train.sinks.trainSink.topic=traintrain.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel

2、启动Kafka和zookeeper
启动zookeeperzkServer.sh start
启动Kafkanohup kafka-server-start.sh /opt/soft/kafka211/config/server.properties &

3、启动消费者进行消费
首先先创建主题,kafka-topics.sh --create --zookeeper 192.168.91.180:2181 --topic train --partitions 1 --replication-factor 1
消费:
kafka-console-consumer.sh --bootstrap-server 192.168.91.180:9092 --topic train --from-beginning

4、启动flume
./bin/flume-ng agent --name train --conf conf/ --conf-file conf/KB15conf/train.conf -Dflume.root.logger=INFO,console

5、将需要消费的日志文件拷贝到指定的文件夹下
cp train.csv /opt/kb15tmp/flumelogfile/train/train_2021-12-27.csv

Flume采集日志到Kafka经典案例相关推荐

  1. Flume采集日志数据

    一.为什么选用Flume? Flume vs Logstash vs Filebeat 当时选择数据采集工具时,我们主要参考了市面上热度比较高的Flume和Logstash还有Filebeat,据目前 ...

  2. Kubernetes日志收集:log-pilot采集日志到kafka

    1.log-pilot配置环境变量 需要修改log-pilot环境变量的两个参数 LOGGING_OUTPUT的值配置为kafka 若将日志采集到kafka,则需要新增KAFKA_BROKERS环境变 ...

  3. SparkStreaming+kafka+flume+hbase日志实时流处理项目

    1.项目背景: 互联网访问日志概述 为什么要记录访问日志的行为呢? 通过日志我们可以得到网站页面的访问量,网站的黏性,推荐用户行为分析,是指在获得网站访问量基本数据的情况下,对有关数据进行统计.分析, ...

  4. Kafka对接采集日志Flum的集群搭建与部署

    目录 Kafka简介 消息队列 Kafka的应用场景 消息队列的两种模型 Kafka中的重要概念 消费者组 幂等性 Kafka集群搭建 kafka集群部署 kafka启动脚本 Kafka命令行操作 1 ...

  5. 【采集层】Kafka 与 Flume 如何选择

    2019独角兽企业重金招聘Python工程师标准>>> 采集层 主要可以使用Flume, Kafka两种技术. Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过 ...

  6. 【Linux】logrotate切割Tomcat日志并轮转(Flume采集准备工作适用)

    [Linux]logrotate切割Tomcat日志并轮转(Flume采集准备工作适用) 背景及使用场景 系统及软件环境 解决方案及具体操作 logrotate配置文件编辑 logrotate全部配置 ...

  7. flume采集hive日志写到hdfs问题

    解决flume采集hive日志写到hdfs问题 在配置flume写日志到hdfs发现写不进去,很苦恼,查了很多资料,终于找到了原因! 原来 用Flume监听目录,在自己建的.conf文件中需要配置hd ...

  8. Flume与Kafka整合案例详解

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Zookeeper 3.4.5   Flume 1.6.0   Kafka 2.1.0   flume笔记 直接贴配置文件 [roo ...

  9. 生成器案例,#采集日志

    生成器: 只要在函数体中看到yield,那么这个函数就是生成器. yield功能: 1:为我们提供了一种自定义迭代器对象的方法 2:yield其实也是一种返回值,他和return的区别就是:他可以返回 ...

最新文章

  1. Access把每一天的数据累加_如何设计 QQ、微信等第三方账号登陆 ?以及设计数据库表!...
  2. java中堆与栈的区别 彻底理解
  3. 澳洲留学征文活动获奖情况公布
  4. Testing and Test-First Programming
  5. python两个列表合并 从小到大排列_Python对两个有序列表进行合并和排序的例子...
  6. 资源推荐—HTML5精品资源
  7. 一步一步学pwntools(适合新手)
  8. you have mixed tabs and spaces fix this
  9. SegmentFault无法访问,因出现违规内容被网警要求停机!
  10. 依赖注入的两种方式并附上例子
  11. WingIDE中文乱码问题解决方法
  12. 【运动学】基于matlab斜抛物体斜坡射程【含Matlab源码 980期】
  13. 安装SQL Server2012
  14. python判断完美数_Python识别完美数
  15. android小米便签源代码分析,小米开源便签Notes-源码研究(1)-导出功能整体思路
  16. 使用最小二乘法拟合曲线
  17. Hive 知识体系保姆级教程
  18. linux openwrt 域名,linux dnspod客户端(适用于openwrt,ddwrt, centos, ubuntu等)
  19. 2019云栖大会归来有感
  20. 《大数据处理技术Spark》--林子雨

热门文章

  1. .NET Framework 4.0/4.5离线版下载
  2. Jenkins-流水线相关知识
  3. php表单的交互(post方法)
  4. Josh 的学习笔记之数字通信(Part 5——通信链路分析)
  5. python:海龟交易法则 画唐奇安通道
  6. 树莓派3使用红外遥控器模拟鼠标和键盘
  7. Java国际化编程之中英文切换
  8. 机器人3D视觉在物流仓储领域的自动化应用
  9. 手机音质变差_为什么不同手机音质差别那么大?这4个方面很重要
  10. 目标检测YOLO实战应用案例100讲-基于深度学习的自动驾驶目标检测算法研究