一,flume配置
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1

# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /var/log/test/raw_data.txt

a1.sources.tailsource-1.channels = memoryChnanel-1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000

# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = 172.18.203.137
a1.sinks.remotesink.port = 9999
a1.sinks.remotesink.channel = memoryChnanel-1

#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#source section
producer.sources.s.type = avro
producer.sources.s.bind = 172.18.203.137
producer.sources.s.port = 9999

producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = mytopic
producer.sinks.r.brokerList = master1:9092,master2:9092,slave2:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1

#Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=master1:9092,master2:9092,slave2:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=master2:2181,slave2:2181,slave4:2181

二, Spark代码

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* Author: david
* Date : 3/7/17
*/
object StreamingDataTest {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

val conf = new SparkConf().setAppName("StreamingDataTest").setMaster("local[4]")

val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(1))

// Kafka的topic
val topics = Set("mytopic")

//kafka brokers列表
val brokers = "master1:9092,master2:9092,slave3:9092"

//kafka查询参数
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

//创建direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

//kafkaStream这个tuple的第二部分为接收kafka topic里的文本流
val rawDStream = kafkaStream.flatMap(_._2.split("\\s+")).map((_, 1))

val resDStream = rawDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Seconds(8),
Seconds(4));

resDStream.print();

ssc.start()
ssc.awaitTermination()
}

}

三,注意事项
查看/var/log/flume-ng下面的日志报错信息
avro端口号绑定大于公共端口1024
注意linux防火墙service iptables stop
注意运行scala依赖的scope为 provided编译可以,但本机运行找不到class

Flume+kafka+Spark Steaming demo2相关推荐

  1. Flume+Kafka+Spark Steaming demo

    一.准备flume配置 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = spooldir a1.sources. ...

  2. Flume+Kafka+Spark Streaming+MySQL实时日志分析

    文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...

  3. pyspark steaming 连接kafka数据实时处理(也可以对接flume+kafka+spark)

    需要下载对应 spark-streaming-kafka-0-8-assembly jar包(版本要对于) 下载地址: https://mvnrepository.com/artifact/org.a ...

  4. 【python+flume+kafka+spark streaming】编写word_count入门示例

    一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...

  5. Flume+Kafka+Spark Streaming实现大数据实时流式数据采集

    近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...

  6. Flume+Kafka+Spark小案例

  7. Spark Steaming快速入门

    Spark Steaming Spark Streaming 简介 什么是Spark Streaming Spark Streaming使用Spark Core的快速调度功能来执行流分析.它以小批量方 ...

  8. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

    一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...

最新文章

  1. 独家 | 深度学习 V.S. 谜题游戏
  2. excel Match函数不同匹配类型用法解析
  3. JS函数方法Call Apply Bind运用
  4. 蓝桥杯 方格填数(全排列+图形补齐)
  5. java 幽灵引用_Java 幽灵引用的作用
  6. 未来论坛:AI决策的可靠性和可解释性
  7. mysql innodb 读加锁,Mysql InnoDB加锁分析
  8. Mybatis JdbcType与Oracle、MySql数据类型对应列表
  9. BPF 之巅:洞悉 Linux 系统和应用性能
  10. gvim【一】【安装和基本使用】
  11. Qt应用程序嵌入浏览器的常用方法
  12. java实现支付宝网页扫码支付
  13. elasticserach(一)
  14. BiliDuang(哔哩哔哩视频下载器)
  15. 人民币升值破七 香港物价上涨发工资改用人民币
  16. Ant Design Pro 4 动态菜单icon丢失解决办法
  17. python 实现贷款计算
  18. Android哪个系统占用内存小,哪个安卓模拟器占用的内存小,使用起来又不卡
  19. 靠写iPhone程序发财的三个故事
  20. surface go写php,【反馈】超便宜:851rmb的Surface go - 笔记本电脑(Notebook)版 - 北大未名BBS...

热门文章

  1. 《三十岁前的每一天》--水湄物语 读后感
  2. [计算几何] [BZOJ4246] 两个人的星座
  3. 基于麒麟座开发板2.0的MQTT实现例程
  4. BPDU Guard, BPDU Filter, Root Guard, Loop Guard UDLD
  5. android 获取粗略位置_android – 如何使用Wifi或GSM或GPS获取粗略的位置,以哪一个可用?...
  6. 人生短暂,持之以恒地做一件事情就会成功
  7. 微信小程序播放视频卡顿问题
  8. feign.codec.DecodeException: Error while extracting response for type报错记录
  9. 哈工大C语言程序设计精髓第十三周
  10. poi导出Excel合并单元格、设置打印参数页眉页脚等