kafka+flume 实时数据处理

1.监测数据处理技术路线

1.1数据层

2.介绍技术

我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?

​ 一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。因此数据从数据源到flume再到Kafka时,可以做实时计算,可实现数据多分发。

flume

Flume 是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也 提供数据写到各种数据接受方(可定制)的能力,用于转发数据。Flume 的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具。

Source:Flume 搜集日志的方式多种多样,比如可以检测文件夹的变化spool Source,可以监测端口信息 Netcat Source,可以监控某各文件新增的内容 Exec Source等等,通常使用检测文件夹变化的方式来实时收集信息,所以本例中我们也将使用Spool Source。
C

hannel:提供了一层缓冲机制,来实现数据的事务性传输,最大限度保证数据的安全传输。常用的有MemoryChannel:所有的events 被保存在内存中,优点是高吞吐,缺点是容量有限并且Agent 死掉时会丢失内存中的数据;

FileChannel:所有的Events 被保存在文件中,优点是容量较大且死掉时数据可恢复,缺点是速度较慢。因此为了保证Event 在数据流点对点传输中是可靠地,要注意Channel 的选择。目前为了提高速度,我们暂时采用MemoryChannel,之后的目标是实现一个自定义channel—doubleChannel,解决上述的两个痛点问题。

Sink:将数据转发到目的地,或者继续将数据转发到另外一个source,实现接力传输,多层之间通过AVRO Sink来实现。本例中,我们的最终目标是实现实时数据处理,因此实时的采集数据流就把数据发送到Kafka 中。

kafka

Kafka 是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn 的活跃数据,说白了也就是用户访问日志数据。这些数据主要包括PV、UV、用户行为(登陆、浏览、搜索、分享、点击)、系统运行日志(CPU、内存、磁盘、进程、网络)等方面的数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。那么说到Kafka,就必须掌握三个原理部分:Producer、Topic、Consumer:

Producer:消息和数据的生产者,向Kafka的一个topic发布消息的过程即为生产过程,在本例中Flume应该是Producer;
Topic:主题,Kafka处理的消息的不同分类(逻辑概念),可以根据Topic的不同,去区分处理不同的消息。说的更直白一些,Topic就是起到资源隔离的作用,Producer向指定Topic中产生消息,Consumer再从指定的Topic中消费消息。
Consumer:消息和数据的消费者,订阅topic并处理其发布的消息的过程即为消费过程。

4.配置文件

声明基本组件 Source Channel Sink example2.properties

a1.sources = s1
a1.sinks = sk1
a1.channels = c1

配置Source组件,从Socket中接收数据

a1.sources.s1.type = netcat
a1.sources.s1.bind = node0
a1.sources.s1.port = 44444

配置Sink组件,将接收数据输出到对应kafka

#a1.sinks.sk1.type = logger

a1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sk1.kafka.bootstrap.servers = node0:9092
a1.sinks.sk1.kafka.topic = topic01
a1.sinks.sk1.kafka.flumeBatchSize = 20
a1.sinks.sk1.kafka.producer.acks = 1
a1.sinks.sk1.kafka.producer.linger.ms = 1
a1.sinks.sk1.kafka.producer.compression.type = snappy

配置Channel通道,主要负责数据缓冲

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

组件间的绑定

a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1

注意

A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.

一个类似于 netcat 的源,它侦听给定的端口并将每一行文本转换为一个事件。 作用类似于 nc -k -l [host] [port]。 换句话说,它打开一个指定的端口并监听数据。 期望提供的数据是换行符分隔的文本。 每行文本都变成一个 Flume 事件并通过连接的通道发送。

5.项目演示

kafka+flume 实时数据处理相关推荐

  1. 使用 Kafka 和 Spark Streaming 构建实时数据处理系统

    使用 Kafka 和 Spark Streaming 构建实时数据处理系统  来源:https://www.ibm.com/developerworks,这篇文章转载自微信里文章,正好解决了我项目中的 ...

  2. 浅析Kafka实时数据处理系统

    Kafka是啥?用Kafka官方的话来说就是: Kafka is used for building real-time data pipelines and streaming apps. It i ...

  3. 基于HDP使用Flume实时采集MySQL中数据传到Kafka

    注意:HDP中Kafka broker的端口是6667,不是9092 如有需要请看:基于HDP使用Flume实时采集MySQL中数据传到Kafka+Hive/HDFS 1.将flume-ng-sql- ...

  4. Flume实时采集mysql数据到kafka中并输出

    环境说明 centos7 flume1.9.0(flume-ng-sql-source插件版本1.5.3) jdk1.8 kafka 2.1.1 zookeeper(这个我用的kafka内置的zk) ...

  5. Apache Pulsar:实时数据处理中消息,计算和存储的统一

    本文转载自"AI前线",整理自翟佳在 QCon2018 北京站的演讲,在本次演讲中,翟佳介绍了 Apache Pulsar 的架构.特性和其生态系统的组成,并展示了 Apache ...

  6. 大数据学习--kafka+flume++sqoop+hadoop+zookeeper+spark+flink

    大数据工程师 学习指南 一必备技能 Zookeeper.Hadoop.Hive.HBase.Flume.Sqoop.Flink 等等 1定义(from百度百科) 1.1Zookeeper 百度百科-验 ...

  7. 基于 MaxCompute 的实时数据处理实践

    简介: MaxCompute 通过流式数据高性能写入和秒级别查询能力(查询加速),提供EB级云原生数仓近实时分析能力:高效的实现对变化中的数据进行快速分析及决策辅助.当前Demo基于近实时交互式BI分 ...

  8. sftp访问_实时数据处理探索:接收、处理、访问

    ETL(也包括ELT)是数据处理工作里必不可少的步骤,一直以来通常都是以天或小时为单位采用批处理来对大量的数据进行 ETL 操作.随着业务的增长及需求的变化,用户/客户希望能更快的看到各类数据操作的结 ...

  9. Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)

    Flink实时数据处理实践经验 文章目录 Flink实时数据处理实践经验 1. 数据输入与预处理 2. 实时数据处理 3. 实时数仓架构 4. 优化方案 Java.大数据开发学习要点(持续更新中-) ...

最新文章

  1. 漫画:什么是快速排序?(完整版)
  2. centos7下安装intel Media Server Studio记录
  3. 洛谷 P2573 [SCOI2012]滑雪
  4. 【SSH进阶之路】Struts + Spring + Hibernate 进阶开端(一)
  5. 分享一个android debug模式,出现 waiting for debugger把界面卡住,取巧的解决办法
  6. Go 语言编程 — 基本数据类型
  7. python软件安装-学python安装的软件总结
  8. java jframe 背景颜色_Java JFrame背景颜色不起作用
  9. 后台(crontab,at,,nohup)
  10. 某一个物种能够在 1 分钟之内干掉资深程序员...网友称:恐怖如斯!
  11. php获取sessionstorage,关于PHP session 存储方式的详细介绍
  12. 独家 | 2019届互联网校招本科薪酬清单|湾区人工智能
  13. php开发神器 -- phpStudy
  14. mysql中float、double、decimal的区别
  15. java se11.0.1安装_jdk11下载安装及环境变量配置
  16. docker启动elasticsearch容器put数据时: SERVICE_UNAVAILABLE/1/state not recovered /initialized
  17. r语言把两个折线图图像放到一个图里_OpenCV计算机视觉学习(10)——图像变换(傅里叶变换,高通滤波,低通滤波)...
  18. Soft Diffusion:谷歌新框架从通用扩散过程中正确调度、学习和采样
  19. 几何光学学习笔记(2)- 1.2 费马原理、马吕斯定律和成像
  20. 华为如何在开发者选项观察错误日志_爬虫scrapy框架--log日志输出配置及使用

热门文章

  1. AutoDesk CAD如何彻底卸载/不影响二次安装
  2. 模拟退火算法(Simulated Annealing,SA)MATLAB案例详细解析
  3. BPDU保护与边缘端口、RLDP防环
  4. Nim问题和阶梯Nim(staircase nim)
  5. 将Sublime Text 设置成中文版
  6. 个人隐私保护5:和 金士顿 DataTraveler Locker+加密盘优势比较
  7. webpack中的chunk
  8. python 求两线段是否相交,如果相交求交点
  9. 【微信小程序导入项目报错:[app.json文件内容错误]app.json未找到】解决方法
  10. linux mutt 收不到邮件,mutt 发邮件189邮件收不到邮件内容的解决办法