Flume安装及几个入门案例
安装
1、下载
http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
2、上传到指定的服务器(master)中的某个目录
3、解压
tar -xvf apache-flume-1.8.0-bin.tar.gz
4、cd apache-flume-1.8.0-bin/conf
5、cp flume-conf.properties.template flume-conf.properties
6、vi flume-conf.properties
几个小案例带你了解flume
一个理论:flume每一次接收到的数据称为一个事件
说在前面telnet 需要自己去装,然后启动:telnet localhost 44445
案例一
接收netcat的数据展示到console
## 定义 sources、channels 以及 sinks
# 数据源
agent1.sources = netcatSrc
# 收集的数据放在那里,memoryChannel这个是放在内存中
agent1.channels = memoryChannel
# 数据放在那里loggerSink这个是以log的方式打印在控制台上
agent1.sinks = loggerSink## netcatSrc 的配置
# 启动的服务
agent1.sources.netcatSrc.type = netcat
# Ip是什么
agent1.sources.netcatSrc.bind = localhost
# 在哪个端口上
agent1.sources.netcatSrc.port = 44445## loggerSink 的配置
# 以log的方式写到控制台上
agent1.sinks.loggerSink.type = logger## memoryChannel 的配置
agent1.channels.memoryChannel.type = memory
# 能装100条记录
agent1.channels.memoryChannel.capacity = 100## 通过 memoryChannel 连接 netcatSrc 和 loggerSink
agent1.sources.netcatSrc.channels = memoryChannel
agent1.sinks.loggerSink.channel = memoryChannel# agent 一个代理 conf用conf的方式启动 agent1就是配置里的那个
# -Dflume.root.logger=INFO,console info的方式打印在控制台
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 -Dflume.root.logger=INFO,console
接收到的数据先放内存,再打印到控制台上
案例二
接收telnet的数据,每五次事件存一次hdfs
## 定义 sources、channels 以及 sinks
agent1.sources = netcatSrc
# memoryChannel 可能会丢失数据 应该把它换成文件的形式
agent1.channels = memoryChannel
# 这个是把它放到hdfs去
agent1.sinks = hdfsSink## netcatSrc 的配置
agent1.sources.netcatSrc.type = netcat
agent1.sources.netcatSrc.bind = localhost
agent1.sources.netcatSrc.port = 44445## hdfsSink 的配置
# 保存到hdfs上去
agent1.sinks.hdfsSink.type = hdfs
# 放到这个目录下去
agent1.sinks.hdfsSink.hdfs.path = hdfs://master:8020/user/hadoop-jrq/spark-course/steaming/flume/%y-%m-%d
# 每5次flume事件写一次HDFS
agent1.sinks.hdfsSink.hdfs.batchSize = 5
# 这个一定位true,否则会报错
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true## memoryChannel 的配置
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 100## 通过 memoryChannel 连接 netcatSrc 和 hdfsSink
agent1.sources.netcatSrc.channels = memoryChannel
agent1.sinks.hdfsSink.channel = memoryChannel启动:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1
案例三
监控一个文件,每五次事件写一次hdfs
## 定义 sources、channels 以及 sinks
agent1.sources = logSrc
# 数据接收到后放在文件中去
agent1.channels = fileChannel
agent1.sinks = hdfsSink## logSrc 的配置
agent1.sources.logSrc.type = exec
# -F参数会有一个小bug因此,我使用-f0
agent1.sources.logSrc.command = tail -f0 /home/hadoop-jrq/spark-course/steaming/flume-course/demo3/logs/webserver.log## hdfsSink 的配置
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://master:8020/user/hadoop-jrq/spark-course/steaming/flume/%y-%m-%d
agent1.sinks.hdfsSink.hdfs.batchSize = 5
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true## fileChannel 的配置
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop-jrq/spark-course/steaming/flume-course/demo2-2/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop-jrq/spark-course/steaming/flume-course/demo2-2/data## 通过 fileChannel 连接 logSrc 和 hdfsSink
agent1.sources.logSrc.channels = fileChannel
agent1.sinks.hdfsSink.channel = fileChannel日志文件收集:实时的保存到hdfs中去
实时接收日志信息,然后保存到hdfs中去bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 echo testdata >> webserver.log
案例四,一个简单的模拟网站实时流处理
监控日志文件,写入kafka,然后再用spark streaming处理数据
flume配置:
agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1agent1.sources.s1.type = exec
agent1.sources.s1.command = tail -fn0 /home/hadoop-jrq/spark-course/steaming/weblog.logagent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# kafka的主题
agent1.sinks.k1.kafka.topic = pageview
# kafka集群的地址
agent1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
# 每20次事件发送一次
agent1.sinks.k1.kafka.flumeBatchSize = 20
# ack机制的选用,不懂的,可以去看我kafka的ack机制文章
agent1.sinks.k1.kafka.producer.acks = 1
agent1.sinks.k1.kafka.producer.linger.ms = 1
# 压缩机制
agent1.sinks.k1.kafka.producer.compression.type = snappyagent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1启动flume:bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1
# 模拟数据发送
echo testdata >> webserver.log
依赖的jar包:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.0</version>
<!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version>
<!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.5.2</version></dependency>
spark streaming代码:
import com.jrq.streaming.InternalRedisClient
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object PageViewStream {def main(args: Array[String]) {if (args.length < 2) {printIn("参数不够")System.exit(1)}println("开始")val sparkConf = new SparkConf()if (!sparkConf.contains("spark.master")){// 这个是因为我本地的 环境有点问题,手动加上的Hadoop环境,否则会报错,如果你的不会报错,就不需要加上System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master")// 我这是本地启动,这个程序最少要5个core才能启动,3个core用于接收数据,2个core用于处理数据sparkConf.setMaster("local[6]")}sparkConf.setAppName("DirectKafkaWordCount")// val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")val sc = new SparkContext(sparkConf)val Array(brokers, topics) = args// Create the contextval ssc = new StreamingContext(sc, Seconds(1))// 这里是因为我配置了HA的名字,因此程序中需要这么写,否则会有bug,就是当你master挂掉的时候,你的程序会报错,当然在本地测试的时候会有问题,换成hdfs://master:8020/就行了ssc.checkpoint("hdfs://mycluster/user/hadoop-jrq/spark-course/streaming/checkpoint")// 从kafka中读取数据val topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)// 初始化一个kafka Direct 模式val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)// 且将每一行的字符串转换成PageView对象val pageViews = messages.map { case (_, value) => PageView.fromString(value) }// 每隔3秒统计前50秒内每一个URL的访问错误数val badCount = pageViews.window(Seconds(50), Seconds(3)).filter(_.status != 200).count()// 每隔两秒统计前15秒内有多少个用户访问了url:https:foo.com/val fooCount = pageViews.window(Seconds(15), Seconds(2)).filter(_.url != "https://foo.com/").map(view => (view.userID, 1)).groupByKey().count()// 每隔5秒统计前30秒内有多少活跃用户val activeUserCount = pageViews.window(Seconds(30), Seconds(1)).map(view => (view.userID, 1)).groupByKey().count()activeUserCount.print()activeUserCount.foreachRDD { rdd =>// 对每个分区进行遍历,写入redisrdd.foreachPartition { partitionRecords =>// redis 的连接池val jedis = InternalRedisClient.getPool.getResourcepartitionRecords.foreach { count =>jedis.set("active_user_count", count.toString)}// 写完后吧这个连接还给连接池InternalRedisClient.getPool.returnResource(jedis)}}ssc.start()ssc.awaitTermination()}
}/*表示访问一个网站页面的行为日志数据,格式为:url status zipCode userID\n*/
case class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)extends Serializableobject PageView extends Serializable {def fromString(in: String): PageView = {val parts = in.split(" ")new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)}
}
Redis连接池代码:
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool/*** Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}*/
object InternalRedisClient extends Serializable {@transient private lazy val pool: JedisPool = {val maxTotal = 10val maxIdle = 10val minIdle = 1val redisHost = "192.168.240.186"val redisPort = 6379val redisTimeout = 30000val poolConfig = new GenericObjectPoolConfig()poolConfig.setMaxTotal(maxTotal)poolConfig.setMaxIdle(maxIdle)poolConfig.setMinIdle(minIdle)poolConfig.setTestOnBorrow(true)poolConfig.setTestOnReturn(false)poolConfig.setMaxWaitMillis(100000)val hook = new Thread{override def run = pool.destroy()}sys.addShutdownHook(hook.run)new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)}def getPool: JedisPool = {assert(pool != null)pool}def main(args: Array[String]): Unit = {val p = getPoolval j = p.getResourcej.set("much", "")}
}
集群启动的方式
/home/hadoop-jrq/bigdata/spark-2.2 .0-bin-hadoop2 .7/bin/spark-submit
--class com.jrq.streaming.kafka.PageViewStream \
-- master spark :// master: 7077 \
-- deploy - mode client \
-- driver - memory 512 m \
-- executor - memory 512 m \
-- total - executor - cores 5 \
-- executor - cores 2 \
/home/hadoop-jrq/spark-course/steaming/spark-streaming-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar \
master: 9092, slave1: 9092, slave2: 9092 pageview
模拟发送日志的方式:
echo http:// baidu.com 200 899787 22224 >> weblog.log
Flume安装及几个入门案例相关推荐
- Microsoft Visual C++2010安装教程并编写入门案例
目录 一.下载安装包 二.安装步骤 (1)下载安装包,解压文件 (2)以管理员身份运行setup.exe安装程序 (3)点击下一步 (4)选择同意协议,然后下一步 (5)选择软件安装目录,然后点击安装 ...
- Flask学习笔记01:安装Flask模块与入门案例
文章目录 一.安装flask模块 1.安装Flask模块 2.在Python里查看Flask版本 二.案例演示--HelloWorld<
- RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器
文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...
- Python:Scrapy的安装和入门案例
Scrapy的安装介绍 Scrapy框架官方网址:http://doc.scrapy.org/en/latest Scrapy中文维护站点:http://scrapy-chs.readthedocs. ...
- Vue安装配置以及入门案例
Vue Vue简介 Vue (读音 /vjuː/,类似于 view) 是一套用于构建用户界面的渐进式框架.与其它大型框架不同的是,Vue 被设计为可以自底向上逐层应用.Vue 的核心库只关注视图层,不 ...
- Phoenix安装、入门案例
目录 一.Phoenix简介 1.什么是Phoenix 2.Phoenix性能 二.Phoenix的安装部署 三.Phoenix入门案例 四.建立与HBase表映射 五.使用Phoenix构建二级索引 ...
- Kyin学习笔记(一)-----Kylin安装、入门案例和原理介绍
目录 一.Kylin简介 1.Kylin的诞生背景 2.Kylin的应用场景 3.为什么要使用Kylin 4.Kylin的总体架构 二.Kylin安装 1.依赖环境 2.集群规划 3.安装kylin- ...
- ActiveMQ-01-MQ概述,安装,入门案例
文章目录 01.MQ概述 MQ的产品种类和对比 MQ的产生背景 MQ的主要作用 MQ的定义 MQ的特点 02.RPC架构 什么是RPC架构? 常见的RPC架构 03.ActiveMQ安装 04.入门案 ...
- StreamSets简介和入门案例
目录 一.Streamsets简介 二.安装步骤 2.1 Java环境 2.2 打开文件数 三.入门案例 3.1 本地文件解析到HDFS 1. 数据流的整体设计 2.管道流的具体设计步骤 3.2 My ...
最新文章
- 数据流分析与 SSA | 什么是静态单赋值 SSA
- What is Mahalanobis distance? 马氏距离
- Intelli IDEA导入jar包
- android的json解析方式,Android解析JSON方式
- 广州科目三电子考需注意哪些问题?
- java创建临时文件夹_java创建临时文件
- [Swift通天遁地]三、手势与图表-(10)创建包含圆点、方形、三角形图标的散点图表...
- solr返回的字段带有中括号问题
- 安全策略已传播,但有警告信息。0x534:帐户名与安全标识间无任何映射完成
- Win10常用快捷键
- kafka的offset理解
- OSPF笔记(二):OSPF邻居与邻接、DR与BDR选举
- 小哈机器人发布新品_解决孩子学习烦恼 小哈教育机器人二代新品上市
- 实现病案首页数据上报自动化-小帮全面解决-数据上报自动化
- 热备用冷备用_个性化您的备用帐户的10种方法
- java发送信息到通知栏
- 心理测量?预知犯罪?AI可以减少京都之殇吗?
- html页面放大缩小样式不变,网页缩小放大后错位的解决方法
- Java面试突击(6):分库分表
- seata报错问题总结 Unable to commit against JDBC Connection
热门文章
- Ubuntu20.04部署yolov5目标检测算法,无人车/无人机应用
- 2016 百度之星 B题(java实现)
- 侠义型性格分析,侠义型人格的职业方向
- Java内存的一点理解, 静态方法和实例方法的区别及使用场景
- 52好压软件卸载办法
- 职业生涯发展阶段包括哪些?应该如何规划?
- windows管理右键菜单_在Windows 8中使用Windows 7开始菜单,资源管理器和任务管理器...
- golang使用grom连接mysql,Error 1146: Table ‘xxx.xxxs‘ doesn‘t exist
- java http异步调用_HttpClient的异步调用,你造吗?
- 【分布式理论】(二)分布式存储