spark-streaming支持kafka消费,有以下方式:

我实验的版本是kafka0.10,试验的是spark-streaming-kafka-0.8的接入方式。另外,spark-streaming-kafka-0.10的分支并没有研究。

spark-streaming-kafka-0.8的方式支持kafka0.8.2.1以及更高的版本。有两种方式:
(1)Receiver Based Approach:基于kafka high-level consumer api,有一个Receiver负责接收数据到执行器
(2)Direct Approcah:基于kafka simple consumer api,没有receiver。

mavne项目需要添加依赖

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.1.0</version></dependency>

Reviced based approach代码:使用方法见注释

package com.lgh.sparkstreamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils/*** Created by Administrator on 2017/8/23.*/
object KafkaWordCount {def main(args: Array[String]): Unit = {if (args.length < 4) {System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")System.exit(1)}//参数分别为 zk地址,消费者group名,topic名 多个的话,分隔 ,线程数val Array(zkQuorum, group, topics, numThreads) = args//setmaster,local是调试模式使用val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))ssc.checkpoint("checkpoint")//Map类型存储的是   key: topic名字   values: 读取该topic的消费者的分区数val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap//参数分别为StreamingContext,kafka的zk地址,消费者group,Map类型val kafkamessage = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)//_._2取出kafka的实际消息流val lines=kafkamessage.map(_._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)wordCounts.print()ssc.start()ssc.awaitTermination()}
}

Direct approach:

package com.lgh.sparkstreamingimport kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils/*** Created by Administrator on 2017/8/23.*/
object DirectKafkaWordCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <topics>|  <brokers> is a list of one or more Kafka brokers|  <topics> is a list of one or more kafka topics to consume from|""".stripMargin)System.exit(1)}//borkers : kafka的broker 列表,多个的话以逗号分隔//topics: kafka topic,多个的话以逗号分隔val Array(brokers, topics) = args// Create context with 2 second batch intervalval sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, Seconds(2))// Create direct kafka stream with brokers and topicsval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)// Get the lines, split them into words, count the words and printval lines = messages.map(_._2)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}}

关于这两种方式的区别

1.Simplified Parallelism
Direct 方式将会创建跟kafka分区一样多的RDD partiions,并行的读取kafka topic的partition数据。kafka和RDD partition将会有一对一的对应关系。
2.Efficiency
Receiver-based Approach需要启用WAL才能保证消费不丢失数据
,效率比较低
3.Exactly-once semantics
Receiver-based Approach使用kafka high-level consumer api,存储消费者offset在zookeeper中,跟Write Ahead Log配合使用,能够实现至少消费一次语义。
Direct Approach 使用kafka simple consumer api,跟踪offset信息存储在spark checkpoint中。能够实现数据有且只消费一次语义。

spark-streaming 编程(三)连接kafka消费数据相关推荐

  1. 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)

    Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...

  2. Spark Streaming简介 (三十四)

    Spark Streaming简介 Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件.它是 Spark 核心 API 的一个扩展,具有吞吐量高.容错能力强的实时流数据 ...

  3. Spark Streaming 编程指南[中英对照]

    2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...

  4. 第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

    特别说明:  在上一遍文章中有详细的叙述Receiver启动的过程,如果不清楚的朋友,请您查看上一篇博客,这里我们就基于上篇的结论,继续往下说. 博文的目标是:  Spark Streaming在接收 ...

  5. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  6. sparkstreaming直接从kafka消费数据

    1.sparkstreaming直接从kafka消费数据 采用createDirectStream,示例: createDirectStream[K, V, KD <: Decoder[K],  ...

  7. 【kafka】kafka 消费数据的时候 报错 (Re-) join group

    文章目录 1.场景1 1.1 概述 2.场景2 3.场景3 1.场景1 1.1 概述 kafka 消费数据的时候 报错 如下 2.场景2 spirng-kafka的多consumer问题困扰了我好久, ...

  8. flume+kafka消费数据【纯个人笔记】

    1.数据生产 使用java代码往一个文件中写入数据 package com.mobile;import java.io.*; import java.text.DecimalFormat; impor ...

  9. Spark Stream整合flum和kafka,数据存储在HBASE上,分析后存入数据库

    开发环境:Hadoop+HBASE+Phoenix+flum+kafka+spark+MySQL 默认配置好了Hadoop的开发环境,并且已经安装好HBASE等组件. 下面通过一个简单的案例进行整合: ...

  10. Spark Streaming 编程新手入门指南

    Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...

最新文章

  1. 每天一个linux命令(10):cat 命令
  2. 全面了解 Nginx 主要应用场景
  3. 【转】Android 获取本机号码(收集)
  4. SetupDiGetClassDevs函数详解
  5. JDK1.6官方下载_JDK6官方下载地址
  6. 将 Visual Studio 的代码片段导出到 VS Code
  7. GPU Saturday技术沙龙:OpenCL程序员眼中的下一代APU架构
  8. [你必须知道的.NET] 第五回:深入浅出关键字---把new说透(转载)
  9. eclipse 插件扩展新建java页面_java-Eclipse插件:创建动态菜单和相应的处理...
  10. 开源2D图形库/图形框架
  11. 华为一则面试题登上热搜;大众点评会员因点赞过多被处罚;Linux Kernel 5.12 发布|极客头条...
  12. 颜色列表(中英文名称,RGB HSV CMYK值)
  13. 作用JavaScript访问和操作数据库
  14. 轻量级网络之GhostNet
  15. 泰坦尼克号分析是否获救
  16. list迭代器的模拟实现
  17. android 加密手机功能,怎么为安卓手机加密
  18. 10个深度学习的工具
  19. FloorPlan 经验总结
  20. 计算机操作系统复习精简版

热门文章

  1. 火遍全网的「蚂蚁呀嘿」教程开源了!
  2. 海关179号出口清单报文CEB603Message描述规范
  3. nginx出现499现象及原因
  4. maven setting配置超详解
  5. 慧荣SM2246EN主控如何进行RDT测试开卡
  6. 心灵捕手影评,觉得很好。看一部好电影就像读一本好书。
  7. 无盘服务器内存回写速度,网吧文化监管平台异常频繁回写数据导致无盘客户机游戏秒卡,打字卡...
  8. mysql 唯一性榆树_榆树有多少种
  9. 模拟实现strlen
  10. Win10系统如何通过备忘录的日历设定定时提醒实现日程计划和提醒