Flume+kafka+Spark Steaming demo2
一,flume配置
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1
# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /var/log/test/raw_data.txt
a1.sources.tailsource-1.channels = memoryChnanel-1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000
# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = 172.18.203.137
a1.sinks.remotesink.port = 9999
a1.sinks.remotesink.channel = memoryChnanel-1
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = avro
producer.sources.s.bind = 172.18.203.137
producer.sources.s.port = 9999
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = mytopic
producer.sinks.r.brokerList = master1:9092,master2:9092,slave2:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=master1:9092,master2:9092,slave2:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=master2:2181,slave2:2181,slave4:2181
二, Spark代码
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: david
* Date : 3/7/17
*/
object StreamingDataTest {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
val conf = new SparkConf().setAppName("StreamingDataTest").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
// Kafka的topic
val topics = Set("mytopic")
//kafka brokers列表
val brokers = "master1:9092,master2:9092,slave3:9092"
//kafka查询参数
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
//创建direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//kafkaStream这个tuple的第二部分为接收kafka topic里的文本流
val rawDStream = kafkaStream.flatMap(_._2.split("\\s+")).map((_, 1))
val resDStream = rawDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Seconds(8),
Seconds(4));
resDStream.print();
ssc.start()
ssc.awaitTermination()
}
}
三,注意事项
查看/var/log/flume-ng下面的日志报错信息
avro端口号绑定大于公共端口1024
注意linux防火墙service iptables stop
注意运行scala依赖的scope为 provided编译可以,但本机运行找不到class
Flume+kafka+Spark Steaming demo2相关推荐
- Flume+Kafka+Spark Steaming demo
一.准备flume配置 a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = spooldir a1.sources. ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- pyspark steaming 连接kafka数据实时处理(也可以对接flume+kafka+spark)
需要下载对应 spark-streaming-kafka-0-8-assembly jar包(版本要对于) 下载地址: https://mvnrepository.com/artifact/org.a ...
- 【python+flume+kafka+spark streaming】编写word_count入门示例
一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...
- Flume+Kafka+Spark Streaming实现大数据实时流式数据采集
近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...
- Flume+Kafka+Spark小案例
- Spark Steaming快速入门
Spark Steaming Spark Streaming 简介 什么是Spark Streaming Spark Streaming使用Spark Core的快速调度功能来执行流分析.它以小批量方 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0
如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...
最新文章
- 独家 | 深度学习 V.S. 谜题游戏
- excel Match函数不同匹配类型用法解析
- JS函数方法Call Apply Bind运用
- 蓝桥杯 方格填数(全排列+图形补齐)
- java 幽灵引用_Java 幽灵引用的作用
- 未来论坛:AI决策的可靠性和可解释性
- mysql innodb 读加锁,Mysql InnoDB加锁分析
- Mybatis JdbcType与Oracle、MySql数据类型对应列表
- BPF 之巅:洞悉 Linux 系统和应用性能
- gvim【一】【安装和基本使用】
- Qt应用程序嵌入浏览器的常用方法
- java实现支付宝网页扫码支付
- elasticserach(一)
- BiliDuang(哔哩哔哩视频下载器)
- 人民币升值破七 香港物价上涨发工资改用人民币
- Ant Design Pro 4 动态菜单icon丢失解决办法
- python 实现贷款计算
- Android哪个系统占用内存小,哪个安卓模拟器占用的内存小,使用起来又不卡
- 靠写iPhone程序发财的三个故事
- surface go写php,【反馈】超便宜:851rmb的Surface go - 笔记本电脑(Notebook)版 - 北大未名BBS...
热门文章
- 《三十岁前的每一天》--水湄物语 读后感
- [计算几何] [BZOJ4246] 两个人的星座
- 基于麒麟座开发板2.0的MQTT实现例程
- BPDU Guard, BPDU Filter, Root Guard, Loop Guard UDLD
- android 获取粗略位置_android – 如何使用Wifi或GSM或GPS获取粗略的位置,以哪一个可用?...
- 人生短暂,持之以恒地做一件事情就会成功
- 微信小程序播放视频卡顿问题
- feign.codec.DecodeException: Error while extracting response for type报错记录
- 哈工大C语言程序设计精髓第十三周
- poi导出Excel合并单元格、设置打印参数页眉页脚等