1、安装flume
2、到Spark-Streaming官网下载poll方式的Sink
3、将sink放入到flume的lib包里面
4、先启动flume(多个),然后在启动Streaming程序

下载spark-flume
http://spark.apache.org/documentation.html
到Spark-1.6.2中
http://spark.apache.org/docs/1.6.2/,

搜一下flume

最后在安装的flume中加入:commons-lang3-3.3.2.jar、scala-library-2.10.5.jar、spark-streaming-flume-sink_2.10-1.6.1.jar,效果如右侧:

同步到集群中的其它的flume中:

[root@hadoop1 lib]# pwd
/home/tuzq/software/apache-flume-1.6.0-bin/lib
[root@hadoop1 lib]# scp -r * root@hadoop2:$PWD
[root@hadoop1 lib]# scp -r * root@hadoop3:$PWD
[root@hadoop1 lib]# scp -r * root@hadoop4:$PWD
[root@hadoop1 lib]# scp -r * root@hadoop5:$PWD

编写flume的配置文件:

[root@hadoop1 agentconf]# pwd
/home/tuzq/software/apache-flume-1.6.0-bin/agentconf
[root@hadoop1 agentconf]# vim flume-poll.conf

其中flume-poll.conf的内容如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/tuzq/software/flumedata
a1.sources.r1.fileHeader = true# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
#表示从这里拉数据
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 8888# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume.

[root@hadoop1 apache-flume-1.6.0-bin]# cd /home/tuzq/software/apache-flume-1.6.0-bin
[root@hadoop1 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c agentconf/ -f agentconf/flume-poll.conf -Dflume.root.logger=WARN,console

启动后的效果如下:

这样,一直启动Flume

然后编写从Flume中读取数据的程序。
pom文件的内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.spark</groupId><artifactId>bigdata</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.10.6</scala.version><spark.version>1.6.2</spark.version><hadoop.version>2.6.4</hadoop.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.10</artifactId><version>${spark.version}</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-make:transitive</arg><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

编写代码:

package cn.toto.sparkimport java.net.InetSocketAddressimport org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by toto on 2017/7/13.*/
object FlumeStreamingWordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FlumeStreamingWordCount").setMaster("local[2]")//创建StreamingContext并设置产生批次的间隔时间val ssc = new StreamingContext(conf,Seconds(15))//从Socket端口中创建RDD,这里的SocketAddress可以传递多个val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =FlumeUtils.createPollingStream(ssc, Array(new InetSocketAddress("hadoop1", 8888)),StorageLevel.MEMORY_AND_DISK)//去取Flume中的数据val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" "))val wordAndOne : DStream[(String,Int)] = words.map((_,1))val result : DStream[(String,Int)] = wordAndOne.reduceByKey(_+_)//打印result.print()//开启程序ssc.start()//等待结束ssc.awaitTermination()}
}

启动程序。然后往Flume监控的flumedata目录下放入文件,如:

其中1.txt的内容如下:

最后在IDEA的控制台中观察结果:

Spark Streaming之:Flume监控目录下文件内容变化,然后Spark Streaming实时监听Flume,然后从其上拉取数据,并计算出结果相关推荐

  1. linux如何查看指定目录下文件内容,Linux 系统下通过关键词查找指定目录下的文件内容...

    #!/bin/bash # 作者:靑龍一笑(C.S.Ricen) # 功能:根据指定的关键词,查找指定目录下的文件内容 # 要查找的目录 Search_Dir=/opt/datas/ # 关键字列表 ...

  2. Python监控目录和文件变化

    原文:https://www.cnblogs.com/lcamry/p/8392376.html Python监控目录和文件变化 一.os.listdir import os, time path_t ...

  3. 在linux系统环境中 常用的关机命令,Linux常用基础命令整理:关机命令、查看目录下文件命令等...

    整理了一些Linux常用基础命令,欢迎指正. 首先记住四个热键,学会这四个键,收益一辈子. Tab按键---命令补齐功能 Ctrl+c按键---停掉正在运行的程序 Ctrl+d按键---相当于exit ...

  4. Linux中/proc目录下文件详解

    Linux中/proc目录下文件详解(一) 声明:可以自由转载本文,但请务必保留本文的完整性. 作者:张子坚 email:zhangzijian@163.com 说明:本文所涉及示例均在fedora ...

  5. linux 检查权限,检查目录下 文件的权限-linux shell脚本,

    检查目录下 文件的权限-linux shell脚本, #!/bin/bash #History: #2019/07/23    Fsq #This Program will check Permiss ...

  6. python实现文件搜索_python实现搜索指定目录下文件及文件内搜索指定关键词的方法...

    本文实例讲述了python实现搜索指定目录下文件及文件内搜索指定关键词的方法.分享给大家供大家参考.具体实现方法如下: #!/usr/bin/python -O # -*- coding: UTF-8 ...

  7. linux c 遍历目录 及 目录下文件

    目录 递归实现 非递归实现 我们知道,许多操作系统中的目录结构都是使用树结构. 使用递归的方法定义树是比较容易的.一棵树地一些节点的集合. 这个集合可以为空,若非空,则树由树根和0个或者多个非空的子树 ...

  8. Python 连接FTP服务器并实现文件夹下载实例演示,python区分ftp目录下文件和文件夹方法,ftp目录下包含中文名问题处理

    Python 连接 FTP 服务器并实现文件夹下载实例演示 第一章:连接 FTP 服务器并实现文件夹下载 ① 连接 FTP 服务器 ② 进入指定目录并显示文件信息 ③ 区分文件和文件夹名 ④ 文件夹名 ...

  9. Linux中/proc目录下文件详解(二)

    Linux中/proc目录下文件详解(二) /proc/mdstat文件 这个文件包含了由md设备驱动程序控制的RAID设备信息. 示例: [root@localhost ~]# cat /proc/ ...

最新文章

  1. Hibernate 的 session.load()使用方法
  2. cvelist.jsp
  3. 远程开启目标计算机的远程桌面
  4. POJ - 2186 Popular Cows(强连通缩点)
  5. node+socket.io 实现一个聊天室
  6. 电商管理系统源码_Dubbo/SSM/Elasticsearch/Redis/MySQL搭建分布式电商购物商城
  7. kafka 可视化工具_Kafka集群在马蜂窝大数据平台的优化与应用扩展
  8. 关注LoadRunner脚本回放日志中的Warning信息
  9. 网络转载 ! 不保证网站安全 谨慎!
  10. java api1.8中文版(由谷歌,百度,有道,必应翻译)
  11. JS里给日期增加n个月的方法
  12. 上传文件到本地操作和上传到Azure云上
  13. 计算机内 云盘图标,如何关闭我的电脑中百度网盘图标
  14. VR眼镜连接android设备,华为VR眼镜连接电脑教程
  15. zmud命令详细解答
  16. 大数据产业链结构_大数据产业链包含那几个应用环节?
  17. node.js 实现简单爬虫批量下载喜马拉雅音频
  18. amh升级php版本,AMH4.2升级PHP版本后续之组件安装
  19. [JS]JavaScript基础学习笔记(黑马pink+尚硅谷李立超)
  20. 无刷直流电机学习(3)

热门文章

  1. 机器学习(1.机器学习概述、数据集的组成以及机器学习的特征工程)
  2. 深度学习-机器学习(神经网络的应用 上)
  3. Uipath 学习栏目基础教学:4Uipath 循环语句
  4. python 实现文本自动翻译功能
  5. Java8 ArrayBlockingQueue 源码阅读
  6. wxWidgets:wxMouseCaptureLostEvent类用法
  7. wxWidgets:wxMessageDialog类用法
  8. wxWidgets:wxAuiManagerEvent类用法
  9. Boost.Signals2 的多槽 hello world 示例
  10. boost::python::back_reference相关的测试程序