sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据
Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现。
1、Spark Streaming接入Kafka方式介绍
Spark Streaming 官方提供了两种方式读取Kafka数据:
一是Receiver-based Approach。该种读取模式官方最先支持,并在Spark 1.2提供了数据零丢失(zero-data loss)的支持;
一是Direct Approach (No Receivers)。该种读取方式在Spark 1.3引入。
1.1 Receiver-based Approach
Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:
需要借助Write Ahead Logs 来保证数据的不丢失,如果启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度
对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream
1.2 Direct Approach (No Receivers)
Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。由drive来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:
简化的并行:在Receiver的方式中提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。
2、Spark Streaming接入Kafka数据实现
以wordcount统计为例,kafka生产端输入词组,Spark端读取kafka流数据,并统计词频
2.1 Receiver方式收取数据
1)Import KafkaUtils并创建DStream
from pyspark.streaming.kafka import KafkaUtils
kafkaStream = KafkaUtils.createStream(streamingContext, \ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
ZK Quorum:Zookeeper quorum (hostname:port,hostname:port,..)
Groupid:消费者的groupid
Topics:{topic_name : numPartitions}
2)具体实现代码如下:
from pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__": #if len(sys.argv) != 3: # print("Usage: kafka_wordcount.py ", file=sys.stderr) # exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount") ssc = StreamingContext(sc, 10)
zkQuorum = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181" groupid = "spark-streaming-consumer" topic = {"kafka_spark_test1":0,"kafka_spark_test1":1,"kafka_spark_test1":2} #zkQuorum, topic = sys.argv[1:] kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic) lines = kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint()
ssc.start() ssc.awaitTermination()
在Spark目录执行命令:
spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py
2.2 Direct方式收取数据
1)Import KafkaUtils并创建DStream
from pyspark.streaming.kafka import KafkaUtils directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
ssc:StreamingContext
topics:消费的topics清单
{"metadata.broker.list": brokers}:kafka参数,可以指定为 metadata.broker.list或bootstrap.servers
默认情况下,从每个kafka分区的最新的offset进行消费,如果在kafka参数中设置了auto.offset.reset 为smallest,则会从最小的offset进行消费
如果希望保存每个批量消费的kafka offset,可以进行如下操作:
offsetRanges = []
def storeOffsetRanges(rdd):global offsetRanges offsetRanges = rdd.offsetRanges()return rdd
def printOffsetRanges(rdd):for o in offsetRanges:print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream \ .transform(storeOffsetRanges) \ .foreachRDD(printOffsetRanges)
如果希望使用基于Zookeeper的Kafka监控,也可以通过这种方法展现Streaming的进程。
2)具体实现代码如下:
from pyspark import SparkContextfrom pyspark.streaming import StreamingContextfrom pyspark.streaming.kafka import KafkaUtils
offsetRanges = []
def storeOffsetRanges(rdd): global offsetRanges offsetRanges = rdd.offsetRanges() return rdd
def printOffsetRanges(rdd): for o in offsetRanges: print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
if __name__ == "__main__": #if len(sys.argv) != 3: # print("Usage: direct_kafka_wordcount.py ", file=sys.stderr) # exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") ssc = StreamingContext(sc, 10)
#brokers, topic = sys.argv[1:] topic="kafka_spark_test1" brokers = "192.168.112.101:9092,192.168.112.102:9092,192.168.112.103:9092" kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) lines = kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) kvs.transform(storeOffsetRanges).foreachRDD(printOffsetRanges) counts.pprint()
ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate
在Spark根目录执行命令:
spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py
2.3 Kafka生产者配置
Kafka集群环境的安装配置,参考之前的文档"大数据系列之Kafka集群环境部署"中相关内容
1)启动zookeeper
[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
2)启动Kafka集群
[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
3)创建Kafka topic
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --replication-factor 2 --partitions 3 --topic kafka_spark_test1Created topic "kafka_spark_test1".
创建名为kafka_spark_test1 的Topic,复制因子设为2,同时分区数为3,注意,分区数是read parallelisms的最大值
4)查看Topic详情
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181 --topic kafka_spark_test1Topic:kafka_spark_test1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: kafka_spark_test1 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3 Topic: kafka_spark_test1 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: kafka_spark_test1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
指定--zookeeper选项的值为192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181,对应的Topic,即刚创建的kafka_spark_test1
2.4 Kafka-Spark Streaming流测试
1)下载依赖的jars包
2)启动kafka生产者
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1
3)运行Spark Streaming流数据处理程序
[root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-01.py[root@tango-spark01 spark-2.3.0]# spark-submit --master yarn --deploy-mode client --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /usr/local/spark/ipynotebook/03-kafka2streaming-02.py
4)在Kafka生产端输入流数据
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic kafka_spark_test1>hello world>hello tango hello>hello tango tango
5)终端打印结果
-------------------------------------------Time: 2018-08-08 11:03:15-------------------------------------------(u'tango', 2)(u'hello', 1)
6)登录SparkWeb UI,查看Spark Streaming的的运行情况
a) spark-submit时候指定spark-submit --master spark://192.168.112.121:7077才能在8080端口看到数据
b) 如果通过yarn模式调度,可通过8088端口查看
2.5 Spark写入Kafka
1)安装Kafka插件
Pyspark访问Kafka需要使用到kafka安装包,使用以下命令安装:
pip install --no-index --find-links=../kafka-1.3.5-py2.py3-none.any.whl kafka
2)调用KafkaProducer模块,spark作为生产者将数据传输到kafka端
from kafka import KafkaProducer
to_kafka = KafkaProducer(bootstrap_servers=broker_list)to_kafka.send(topic_name,send_msg,encode(‘utf8’))to_kafka.flush()
参考资料
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
大数据系列之Kafka集群环境部署
sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据相关推荐
- sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案
下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...
- sparkstreaming监听hdfs目录如何终止_四十六、Spark Streaming简介及入门
1.什么是Spark Streaming Spark Streaming是基于Spark Core之间的实时计算框架,可以从很多数据源消费数据并对数据进行处理.它是Spark核心API的一个扩展与封装 ...
- sparkstreaming监听hdfs目录_flume kafka和sparkstreaming整合
本文介绍Flume.Kafka和Sparkstreaming的整合.代码流程是,我们通过shell脚本重播测试轨迹数据到指定轨迹文件中,使用Flume监听该轨迹数据文件,实时将轨迹数据发送到Kafka ...
- sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制
Spark Streaming 反压机制是1.5版本推出的特性,用来解决处理速度比摄入速度慢的情况,简单来讲就是做流量控制.当批处理时间(Batch Processing Time)大于批次间隔(Ba ...
- sparkstreaming监听hdfs目录如何终止_HDFS—HA高可用详解
一.HA概述 1)所谓HA(high available),即高可用(7*24小时不中断服务). 2)实现高可用最关键的策略是消除单点故障.HA严格来说应该分成各个组件的HA 机制:HDFS的HA和Y ...
- Spark Streaming读取Kafka数据的两种方式
Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...
- kafka监听topic消费_分布式专题|最近一直死磕kafka设计原理,都肝吐了
kafka架构图 kafka核心控制器 定义 在kafka集群中,会选举出一个broker作为控制器(controller),负责管理集群中所有的分区和副本的状态: 职责 监听broker变化,通过监 ...
- spark streaming 接收 kafka 数据java代码WordCount示例
1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming;import java.util.Prope ...
- Cris 玩转大数据系列之消息队列神器 Kafka
Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...
最新文章
- [转载]CentOS 7安装Gnome GUI 图形界面
- cmake 添加头文件目录,链接动态、静态库
- Python运行报错IndentationError: unindent does not match any outer indentation level
- 玩聚SR和FriendFeed的区别
- 如何上传本地图片到PictureBox控件
- [css] 用css画出一把刻度尺
- golang导入包的理解
- LeetCode刷题之1818. 绝对差值和
- ps切图后 JAVA开发_ps切图抠图详解-web前端(转)
- 量子计算机平行宇宙,量子纠缠效应揭示:每个人的行为也会影响到其它平行宇宙里的自己...
- arcgis server发布自定义打印模板及利用ArcGIS API javascript使用自定义打印服务打印地图
- poj 2454 随机化(划片使得选举胜利)
- 腾讯云数据库TDSQL-C(原CynosDB)的外网访问配置
- Unity3D中 使模型变成变透明
- 「高级java工程师」常见面试题及其答案(持续更新)
- Containerd高级命令行工具nerdctl安装及使用
- 2022年最新Upwork注册申请教程
- 小布机器人怎么断网_小布同学智能机器人好坏判断有诀窍,三大误区要避免
- 如何在 Kubernetes 集群中快速部署一个私有 Tailscale DERP 服务器
- 已解决windows开机时,系统提示此windows副本不是正版
热门文章
- J2EE项目代码编写规范分享
- 批量打印pdf并合并_批量打印CAD图(无删减版)
- linux javaweb环境单价,linux(centos)下Java Web环境开发
- excel实战应用案例100讲(十)-下载的文件显示“文件已损坏,无法打开”?
- 图形处理-几种图像修复方法
- 计算机如何学会自动地进行图像美学增强?
- python如何打印字符串_如何在Python中打印“漂亮”字符串输出
- 教你玩转CSS 轮廓(outline)属性
- python data frame_Python dataframer包_程序模块 - PyPI - Python中文网
- java 拼接html_程序员用1.5小时写出的Java代码,让同事瞠目结舌!直呼优秀