安装

1、下载
http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
2、上传到指定的服务器(master)中的某个目录
3、解压
tar -xvf apache-flume-1.8.0-bin.tar.gz
4、cd apache-flume-1.8.0-bin/conf
5、cp flume-conf.properties.template flume-conf.properties
6、vi flume-conf.properties

几个小案例带你了解flume

一个理论:flume每一次接收到的数据称为一个事件
说在前面telnet 需要自己去装,然后启动:telnet localhost 44445

案例一

接收netcat的数据展示到console

## 定义 sources、channels 以及 sinks
# 数据源
agent1.sources = netcatSrc
# 收集的数据放在那里,memoryChannel这个是放在内存中
agent1.channels = memoryChannel
# 数据放在那里loggerSink这个是以log的方式打印在控制台上
agent1.sinks = loggerSink## netcatSrc 的配置
# 启动的服务
agent1.sources.netcatSrc.type = netcat
# Ip是什么
agent1.sources.netcatSrc.bind = localhost
# 在哪个端口上
agent1.sources.netcatSrc.port = 44445## loggerSink 的配置
# 以log的方式写到控制台上
agent1.sinks.loggerSink.type = logger## memoryChannel 的配置
agent1.channels.memoryChannel.type = memory
# 能装100条记录
agent1.channels.memoryChannel.capacity = 100## 通过 memoryChannel 连接 netcatSrc 和 loggerSink
agent1.sources.netcatSrc.channels = memoryChannel
agent1.sinks.loggerSink.channel = memoryChannel# agent 一个代理  conf用conf的方式启动 agent1就是配置里的那个
# -Dflume.root.logger=INFO,console   info的方式打印在控制台
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 -Dflume.root.logger=INFO,console
接收到的数据先放内存,再打印到控制台上

案例二

接收telnet的数据,每五次事件存一次hdfs

## 定义 sources、channels 以及 sinks
agent1.sources = netcatSrc
# memoryChannel 可能会丢失数据  应该把它换成文件的形式
agent1.channels = memoryChannel
# 这个是把它放到hdfs去
agent1.sinks = hdfsSink## netcatSrc 的配置
agent1.sources.netcatSrc.type = netcat
agent1.sources.netcatSrc.bind = localhost
agent1.sources.netcatSrc.port = 44445## hdfsSink 的配置
# 保存到hdfs上去
agent1.sinks.hdfsSink.type = hdfs
# 放到这个目录下去
agent1.sinks.hdfsSink.hdfs.path = hdfs://master:8020/user/hadoop-jrq/spark-course/steaming/flume/%y-%m-%d
# 每5次flume事件写一次HDFS
agent1.sinks.hdfsSink.hdfs.batchSize = 5
# 这个一定位true,否则会报错
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true## memoryChannel 的配置
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 100## 通过 memoryChannel 连接 netcatSrc 和 hdfsSink
agent1.sources.netcatSrc.channels = memoryChannel
agent1.sinks.hdfsSink.channel = memoryChannel启动:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 

案例三

监控一个文件,每五次事件写一次hdfs

## 定义 sources、channels 以及 sinks
agent1.sources = logSrc
# 数据接收到后放在文件中去
agent1.channels = fileChannel
agent1.sinks = hdfsSink## logSrc 的配置
agent1.sources.logSrc.type = exec
# -F参数会有一个小bug因此,我使用-f0
agent1.sources.logSrc.command = tail -f0 /home/hadoop-jrq/spark-course/steaming/flume-course/demo3/logs/webserver.log## hdfsSink 的配置
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://master:8020/user/hadoop-jrq/spark-course/steaming/flume/%y-%m-%d
agent1.sinks.hdfsSink.hdfs.batchSize = 5
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true## fileChannel 的配置
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop-jrq/spark-course/steaming/flume-course/demo2-2/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop-jrq/spark-course/steaming/flume-course/demo2-2/data## 通过 fileChannel 连接 logSrc 和 hdfsSink
agent1.sources.logSrc.channels = fileChannel
agent1.sinks.hdfsSink.channel = fileChannel日志文件收集:实时的保存到hdfs中去
实时接收日志信息,然后保存到hdfs中去bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 echo testdata >> webserver.log

案例四,一个简单的模拟网站实时流处理

监控日志文件,写入kafka,然后再用spark streaming处理数据
flume配置:

agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1agent1.sources.s1.type = exec
agent1.sources.s1.command = tail -fn0 /home/hadoop-jrq/spark-course/steaming/weblog.logagent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# kafka的主题
agent1.sinks.k1.kafka.topic = pageview
# kafka集群的地址
agent1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
# 每20次事件发送一次
agent1.sinks.k1.kafka.flumeBatchSize = 20
# ack机制的选用,不懂的,可以去看我kafka的ack机制文章
agent1.sinks.k1.kafka.producer.acks = 1
agent1.sinks.k1.kafka.producer.linger.ms = 1
# 压缩机制
agent1.sinks.k1.kafka.producer.compression.type = snappyagent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1启动flume:bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1
# 模拟数据发送
echo testdata >> webserver.log

依赖的jar包:

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.0</version>
<!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version>
<!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.5.2</version></dependency>

spark streaming代码:

import com.jrq.streaming.InternalRedisClient
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object PageViewStream {def main(args: Array[String]) {if (args.length < 2) {printIn("参数不够")System.exit(1)}println("开始")val sparkConf = new SparkConf()if (!sparkConf.contains("spark.master")){// 这个是因为我本地的 环境有点问题,手动加上的Hadoop环境,否则会报错,如果你的不会报错,就不需要加上System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master")// 我这是本地启动,这个程序最少要5个core才能启动,3个core用于接收数据,2个core用于处理数据sparkConf.setMaster("local[6]")}sparkConf.setAppName("DirectKafkaWordCount")//    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")val sc = new SparkContext(sparkConf)val Array(brokers, topics) = args// Create the contextval ssc = new StreamingContext(sc, Seconds(1))// 这里是因为我配置了HA的名字,因此程序中需要这么写,否则会有bug,就是当你master挂掉的时候,你的程序会报错,当然在本地测试的时候会有问题,换成hdfs://master:8020/就行了ssc.checkpoint("hdfs://mycluster/user/hadoop-jrq/spark-course/streaming/checkpoint")// 从kafka中读取数据val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)// 初始化一个kafka Direct 模式val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)// 且将每一行的字符串转换成PageView对象val pageViews = messages.map { case (_, value) => PageView.fromString(value) }// 每隔3秒统计前50秒内每一个URL的访问错误数val badCount = pageViews.window(Seconds(50), Seconds(3)).filter(_.status != 200).count()// 每隔两秒统计前15秒内有多少个用户访问了url:https:foo.com/val fooCount = pageViews.window(Seconds(15), Seconds(2)).filter(_.url != "https://foo.com/").map(view => (view.userID, 1)).groupByKey().count()// 每隔5秒统计前30秒内有多少活跃用户val activeUserCount = pageViews.window(Seconds(30), Seconds(1)).map(view => (view.userID, 1)).groupByKey().count()activeUserCount.print()activeUserCount.foreachRDD { rdd =>// 对每个分区进行遍历,写入redisrdd.foreachPartition { partitionRecords =>// redis 的连接池val jedis = InternalRedisClient.getPool.getResourcepartitionRecords.foreach { count =>jedis.set("active_user_count", count.toString)}// 写完后吧这个连接还给连接池InternalRedisClient.getPool.returnResource(jedis)}}ssc.start()ssc.awaitTermination()}
}/*表示访问一个网站页面的行为日志数据,格式为:url status zipCode userID\n*/
case class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)extends Serializableobject PageView extends Serializable {def fromString(in: String): PageView = {val parts = in.split(" ")new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)}
}

Redis连接池代码:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool/*** Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}*/
object InternalRedisClient extends Serializable {@transient private lazy val pool: JedisPool = {val maxTotal = 10val maxIdle = 10val minIdle = 1val redisHost = "192.168.240.186"val redisPort = 6379val redisTimeout = 30000val poolConfig = new GenericObjectPoolConfig()poolConfig.setMaxTotal(maxTotal)poolConfig.setMaxIdle(maxIdle)poolConfig.setMinIdle(minIdle)poolConfig.setTestOnBorrow(true)poolConfig.setTestOnReturn(false)poolConfig.setMaxWaitMillis(100000)val hook = new Thread{override def run = pool.destroy()}sys.addShutdownHook(hook.run)new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)}def getPool: JedisPool = {assert(pool != null)pool}def main(args: Array[String]): Unit = {val p = getPoolval j = p.getResourcej.set("much", "")}
}

集群启动的方式

/home/hadoop-jrq/bigdata/spark-2.2 .0-bin-hadoop2 .7/bin/spark-submit
--class com.jrq.streaming.kafka.PageViewStream \
-- master spark :// master: 7077 \
-- deploy - mode client \
-- driver - memory 512 m \
-- executor - memory 512 m \
-- total - executor - cores 5 \
-- executor - cores 2 \
/home/hadoop-jrq/spark-course/steaming/spark-streaming-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar \
master: 9092, slave1: 9092, slave2: 9092  pageview

模拟发送日志的方式:
echo http:// baidu.com 200 899787 22224 >> weblog.log

Flume安装及几个入门案例相关推荐

  1. Microsoft Visual C++2010安装教程并编写入门案例

    目录 一.下载安装包 二.安装步骤 (1)下载安装包,解压文件 (2)以管理员身份运行setup.exe安装程序 (3)点击下一步 (4)选择同意协议,然后下一步 (5)选择软件安装目录,然后点击安装 ...

  2. Flask学习笔记01:安装Flask模块与入门案例

    文章目录 一.安装flask模块 1.安装Flask模块 2.在Python里查看Flask版本 二.案例演示--HelloWorld<

  3. RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器

    文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...

  4. Python:Scrapy的安装和入门案例

    Scrapy的安装介绍 Scrapy框架官方网址:http://doc.scrapy.org/en/latest Scrapy中文维护站点:http://scrapy-chs.readthedocs. ...

  5. Vue安装配置以及入门案例

    Vue Vue简介 Vue (读音 /vjuː/,类似于 view) 是一套用于构建用户界面的渐进式框架.与其它大型框架不同的是,Vue 被设计为可以自底向上逐层应用.Vue 的核心库只关注视图层,不 ...

  6. Phoenix安装、入门案例

    目录 一.Phoenix简介 1.什么是Phoenix 2.Phoenix性能 二.Phoenix的安装部署 三.Phoenix入门案例 四.建立与HBase表映射 五.使用Phoenix构建二级索引 ...

  7. Kyin学习笔记(一)-----Kylin安装、入门案例和原理介绍

    目录 一.Kylin简介 1.Kylin的诞生背景 2.Kylin的应用场景 3.为什么要使用Kylin 4.Kylin的总体架构 二.Kylin安装 1.依赖环境 2.集群规划 3.安装kylin- ...

  8. ActiveMQ-01-MQ概述,安装,入门案例

    文章目录 01.MQ概述 MQ的产品种类和对比 MQ的产生背景 MQ的主要作用 MQ的定义 MQ的特点 02.RPC架构 什么是RPC架构? 常见的RPC架构 03.ActiveMQ安装 04.入门案 ...

  9. StreamSets简介和入门案例

    目录 一.Streamsets简介 二.安装步骤 2.1 Java环境 2.2 打开文件数 三.入门案例 3.1 本地文件解析到HDFS 1. 数据流的整体设计 2.管道流的具体设计步骤 3.2 My ...

最新文章

  1. 数据流分析与 SSA | 什么是静态单赋值 SSA
  2. What is Mahalanobis distance? 马氏距离
  3. Intelli IDEA导入jar包
  4. android的json解析方式,Android解析JSON方式
  5. 广州科目三电子考需注意哪些问题?
  6. java创建临时文件夹_java创建临时文件
  7. [Swift通天遁地]三、手势与图表-(10)创建包含圆点、方形、三角形图标的散点图表...
  8. solr返回的字段带有中括号问题
  9. 安全策略已传播,但有警告信息。0x534:帐户名与安全标识间无任何映射完成
  10. Win10常用快捷键
  11. kafka的offset理解
  12. OSPF笔记(二):OSPF邻居与邻接、DR与BDR选举
  13. 小哈机器人发布新品_解决孩子学习烦恼 小哈教育机器人二代新品上市
  14. 实现病案首页数据上报自动化-小帮全面解决-数据上报自动化
  15. 热备用冷备用_个性化您的备用帐户的10种方法
  16. java发送信息到通知栏
  17. 心理测量?预知犯罪?AI可以减少京都之殇吗?
  18. html页面放大缩小样式不变,网页缩小放大后错位的解决方法
  19. Java面试突击(6):分库分表
  20. seata报错问题总结 Unable to commit against JDBC Connection

热门文章

  1. Ubuntu20.04部署yolov5目标检测算法,无人车/无人机应用
  2. 2016 百度之星 B题(java实现)
  3. 侠义型性格分析,侠义型人格的职业方向
  4. Java内存的一点理解, 静态方法和实例方法的区别及使用场景
  5. 52好压软件卸载办法
  6. 职业生涯发展阶段包括哪些?应该如何规划?
  7. windows管理右键菜单_在Windows 8中使用Windows 7开始菜单,资源管理器和任务管理器...
  8. golang使用grom连接mysql,Error 1146: Table ‘xxx.xxxs‘ doesn‘t exist
  9. java http异步调用_HttpClient的异步调用,你造吗?
  10. 【分布式理论】(二)分布式存储