spark-streaming 编程(三)连接kafka消费数据
spark-streaming支持kafka消费,有以下方式:
我实验的版本是kafka0.10,试验的是spark-streaming-kafka-0.8的接入方式。另外,spark-streaming-kafka-0.10的分支并没有研究。
spark-streaming-kafka-0.8的方式支持kafka0.8.2.1以及更高的版本。有两种方式:
(1)Receiver Based Approach:基于kafka high-level consumer api,有一个Receiver负责接收数据到执行器
(2)Direct Approcah:基于kafka simple consumer api,没有receiver。
mavne项目需要添加依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.1.0</version></dependency>
Reviced based approach代码:使用方法见注释
package com.lgh.sparkstreamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils/*** Created by Administrator on 2017/8/23.*/
object KafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 4) {System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")System.exit(1)}//参数分别为 zk地址,消费者group名,topic名 多个的话,分隔 ,线程数val Array(zkQuorum, group, topics, numThreads) = args//setmaster,local是调试模式使用val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))ssc.checkpoint("checkpoint")//Map类型存储的是 key: topic名字 values: 读取该topic的消费者的分区数val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap//参数分别为StreamingContext,kafka的zk地址,消费者group,Map类型val kafkamessage = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)//_._2取出kafka的实际消息流val lines=kafkamessage.map(_._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)wordCounts.print()ssc.start()ssc.awaitTermination()}
}
Direct approach:
package com.lgh.sparkstreamingimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils/*** Created by Administrator on 2017/8/23.*/
object DirectKafkaWordCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <topics>| <brokers> is a list of one or more Kafka brokers| <topics> is a list of one or more kafka topics to consume from|""".stripMargin)System.exit(1)}//borkers : kafka的broker 列表,多个的话以逗号分隔//topics: kafka topic,多个的话以逗号分隔val Array(brokers, topics) = args// Create context with 2 second batch intervalval sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))// Create direct kafka stream with brokers and topicsval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)// Get the lines, split them into words, count the words and printval lines = messages.map(_._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}}
关于这两种方式的区别
1.Simplified Parallelism
Direct 方式将会创建跟kafka分区一样多的RDD partiions,并行的读取kafka topic的partition数据。kafka和RDD partition将会有一对一的对应关系。
2.Efficiency
Receiver-based Approach需要启用WAL才能保证消费不丢失数据
,效率比较低
3.Exactly-once semantics
Receiver-based Approach使用kafka high-level consumer api,存储消费者offset在zookeeper中,跟Write Ahead Log配合使用,能够实现至少消费一次语义。
Direct Approach 使用kafka simple consumer api,跟踪offset信息存储在spark checkpoint中。能够实现数据有且只消费一次语义。
spark-streaming 编程(三)连接kafka消费数据相关推荐
- 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)
Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...
- Spark Streaming简介 (三十四)
Spark Streaming简介 Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件.它是 Spark 核心 API 的一个扩展,具有吞吐量高.容错能力强的实时流数据 ...
- Spark Streaming 编程指南[中英对照]
2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...
- 第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考
特别说明: 在上一遍文章中有详细的叙述Receiver启动的过程,如果不清楚的朋友,请您查看上一篇博客,这里我们就基于上篇的结论,继续往下说. 博文的目标是: Spark Streaming在接收 ...
- Spark大数据分析与实战:Spark Streaming编程初级实践
Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...
- sparkstreaming直接从kafka消费数据
1.sparkstreaming直接从kafka消费数据 采用createDirectStream,示例: createDirectStream[K, V, KD <: Decoder[K], ...
- 【kafka】kafka 消费数据的时候 报错 (Re-) join group
文章目录 1.场景1 1.1 概述 2.场景2 3.场景3 1.场景1 1.1 概述 kafka 消费数据的时候 报错 如下 2.场景2 spirng-kafka的多consumer问题困扰了我好久, ...
- flume+kafka消费数据【纯个人笔记】
1.数据生产 使用java代码往一个文件中写入数据 package com.mobile;import java.io.*; import java.text.DecimalFormat; impor ...
- Spark Stream整合flum和kafka,数据存储在HBASE上,分析后存入数据库
开发环境:Hadoop+HBASE+Phoenix+flum+kafka+spark+MySQL 默认配置好了Hadoop的开发环境,并且已经安装好HBASE等组件. 下面通过一个简单的案例进行整合: ...
- Spark Streaming 编程新手入门指南
Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...
最新文章
- 每天一个linux命令(10):cat 命令
- 全面了解 Nginx 主要应用场景
- 【转】Android 获取本机号码(收集)
- SetupDiGetClassDevs函数详解
- JDK1.6官方下载_JDK6官方下载地址
- 将 Visual Studio 的代码片段导出到 VS Code
- GPU Saturday技术沙龙:OpenCL程序员眼中的下一代APU架构
- [你必须知道的.NET] 第五回:深入浅出关键字---把new说透(转载)
- eclipse 插件扩展新建java页面_java-Eclipse插件:创建动态菜单和相应的处理...
- 开源2D图形库/图形框架
- 华为一则面试题登上热搜;大众点评会员因点赞过多被处罚;Linux Kernel 5.12 发布|极客头条...
- 颜色列表(中英文名称,RGB HSV CMYK值)
- 作用JavaScript访问和操作数据库
- 轻量级网络之GhostNet
- 泰坦尼克号分析是否获救
- list迭代器的模拟实现
- android 加密手机功能,怎么为安卓手机加密
- 10个深度学习的工具
- FloorPlan 经验总结
- 计算机操作系统复习精简版