1,安装并成功能运行flume

2,安装并成功能运行kafka

3,安装并成功能运行zookeeper

4,开始整合flume收集的数据,写入kafka

a,修改flume的配置文加:

vim  flume_kafka.conf

agent1.sources = r1

agent1.sinks = k1

agent1.channels = c1

# Describe/configure the source

agent1.sources.r1.type = exec

agent1.sources.r1.command=tail -f /opt/logs/usercenter.log

# Use a channel which buffers events in memory

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

agent1.sources.r1.channels = c1

agent1.sinks.k1.channel = c1

# # Describe the sink  这部分就是输入到kafka的写法

##############################################

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = test

agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 20

##############################################

b,下载第三方插件

下载flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

把lib目录下的

和package下的

都放到flume的lib目录

如果,报错,请看这个文档

http://wenda.chinahadoop.cn/question/4079?notification_id=290954&rf=false&item_id=10382#!answer_10382

修改原有的flume-conf文件

在插件包里有一个flume-conf.properties,把这个文件放到flume的conf文件夹里

然后修改以下内容

producer.sources.s.type = execproducer.sources.s.command = tail -f -n+1  /opt/logs/test.logproducer.sources.s.channels = c
……
producer.sinks.r.custom.topic.name=test
……
consumer.sources.s.custom.topic.name=test

c:启动服务

启动zookeeper集群

zkServer.sh start

zkServer.sh start

zkServer.sh start

还需要创建一个新的地址

zookeeper/bin/zkCli.sh

create /kafka  test

启动kafka broker 集群

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh config/server.properties

创建kafka topic

bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test

启动kafka consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic test --from-beginning

启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.properties --name producer -Dflume.root.logger=INFO,console

测试

echo "this is a test" >> /opt/logs/test.log

此时只要能在consumer里现“this is a test”就表示成功

错误总结:

http://472053211.blog.51cto.com/3692116/1655844

转载于:https://blog.51cto.com/douya/1860896

flume 整合kafka相关推荐

  1. Flume整合Kafka采集滚动的日志

    背景: 从Nginx中间件采集web项目产生的滚动日志.通过本地服务器(简称:A服务)的Flume采集日志,然后传输到另外一台服务器(简称:B服务器)的Flume上,然后暂存到 B服务器 的Kafka ...

  2. Flume与Kafka整合案例详解

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Zookeeper 3.4.5   Flume 1.6.0   Kafka 2.1.0   flume笔记 直接贴配置文件 [roo ...

  3. 大数据———Flume与Kafka整合

    环境配置 名称 版本 下载地址 Centos 7.0 64x 百度 Flume 1.8.0 http://flume.apache.org/download.html Kafka 2.11 http: ...

  4. 大数据集群搭建(12)——Flume和Kafka的整合

    Flume和Kafka的整合 1.配置flume,在flume的conf目录下新建文件(flume_kafka.conf)并配置.  ################################# ...

  5. 整合Flume和Kafka完成实时数据采集

    需要注意:参考的网站要与你的kafka的版本一致,因为里面的字段会不一致 例如:http://flume.apache.org/releases/content/1.6.0/FlumeUserGuid ...

  6. flume与kafka的整合

    案例1:syslog-memory-kafka 将flume采集到的数据落地到kafka上,即sink是kafka(生产者身份) vim syslog-mem-kafka.conf # 命名个组件 a ...

  7. flume消费kafka数据太慢_kafka补充01

    为什么高吞吐? •写数据 –1.页缓存技术 •kafka写出数据时先将数据写到操作系统的pageCache上,由操作系统自己决定什么时候将数据写到磁盘上 –2.磁盘顺序写 •磁盘顺序写的性能会比随机写 ...

  8. [大数据] 搜索日志数据采集系统 flume+hbase+kafka架构 (数据搜狗实验室)

    1 采集规划 说明: D1 日志所在服务器1 -bigdata02.com D2 日志所在服务器2 -bigdata03.com A flume2 - bigdata02.com 日志收集 C flu ...

  9. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  10. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

最新文章

  1. 目标检测Anchor的What/Where/When/Why/How
  2. android开发重要控件,Android界面编程——Android基本控件
  3. 报名 | 大咖云集,清华方圆系列之大数据分析与可视化报告会
  4. 本弗莱数据可视化的生产流程图_力控锂离子电池车间数据采集系统
  5. Runtime底层原理--Runtime简介、函数注释
  6. 「技美之路」图形 1.1 渲染流水线
  7. 为什么持续集成和部署在开发中非常重要?
  8. PagerSwitch tab样式加上下拉刷新(一)
  9. activemq中怎么知道推送消息是否成功_消息队列面试,你能顶得住面试官这波10大连环炮的攻势吗?...
  10. 所谓的三十之后无技术
  11. java的property_「propertyutils」java之PropertyUtils - seo实验室
  12. Docker-ce在线安装
  13. 搭建一个开发Predix软件的Windows系统(1)准备工作
  14. Chrome如何安装第三方扩展插件(crx)
  15. (七)Zabbix实现微信报警通知
  16. 小说程序源码 自动采集
  17. vue父传子通过对象的形式简写
  18. Rabbitmq用户角色
  19. 用Kettle的一套流程完成对整个数据库迁移
  20. 国内常见的CPU品牌与架构

热门文章

  1. Lesson_8 上课笔记 ----继承
  2. 除了数据恢复,EasyRecovery还有这样的功能!
  3. 一首歌是怎么诞生的?
  4. Git下修改提交的author和email信息
  5. netstat 用法
  6. oracle 体系结构及内存管理 14_锁
  7. 面试 其实就是短时间内展现出你最好的自我
  8. 故障:“远程计算机需要网络级别身份验证
  9. 如何在 Mac 上发布警报?
  10. ScreenFlow for mac(屏幕录像软件)