目录

  • 1 Kafka 数据消费
  • 2 Kafka 数据源
  • 3 Kafka 接收器
    • 3.1 配置说明
    • 3.2 实时数据ETL架构
    • 3.3 模拟基站日志数据
    • 3.4 实时增量ETL
  • 4 Kafka 特定配置

1 Kafka 数据消费

Apache Kafka 是目前最流行的一个分布式的实时流消息系统,给下游订阅消费系统提供了并行处理和可靠容错机制,现在大公司在流式数据的处理场景,Kafka基本是标配。StructuredStreaming很好的集成Kafka,可以从Kafka拉取消息,然后就可以把流数据看做一个DataFrame, 一张无限增长的大表,在这个大表上做查询,Structured Streaming保证了端到端的 exactly-once,用户只需要关心业务即可,不用费心去关心底层是怎么做的。
StructuredStreaming集成Kafka,官方文档如下:

http://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html

目前仅支持Kafka 0.10.+版本及以上,底层使用Kafka New Consumer API拉取数据,如果公司Kafka版本为0.8.0版本,StructuredStreaming集成Kafka参考文档:

https://github.com/jerryshao/spark-kafka-0-8-sql

StructuredStreaming既可以从Kafka读取数据,又可以向Kafka 写入数据,添加Maven依赖:

<!-- Structured Streaming + Kafka 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>

Maven Project工程中目录结构如下:

Kafka把生产者发送的数据放在不同的分区里面,这样就可以并行进行消费了。每个分区里面的数据都是递增有序的,跟structured commit log类似,生产者和消费者使用Kafka 进行解耦,消费者不管你生产者发送的速率如何,只要按照一定的节奏进行消费就可以了。每条消息在一个分区里面都有一个唯一的序列号offset(偏移量),Kafka 会对内部存储的消息设置一个过期时间,如果过期了,就会标记删除,不管这条消息有没有被消费。Kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了。涉及一个问题:如果开始消费,就要定一下从什么位置开始。

  • 第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费;
  • 第二、latest:从最末位置开始消费;
  • 第三、per-partition assignment:对每个分区都指定一个offset,然后从offset位置开始消费;当第一次开始消费一个Kafka 流的时候,上述策略任选其一,如果之前已经消费了,而且做了checkpoint ,比如消费程序升级了,这时候就会从上次结束的位置开始继续消费。目前StructuredStreaming和Flink框架从Kafka消费数据时,采用的就是上述的策略。

2 Kafka 数据源

Structured Streaming消费Kafka数据,采用的是poll方式拉取数据,与Spark Streaming中NewConsumer API集成方式一致。从Kafka Topics中读取消息,需要指定数据源(kafka)、Kafka集群的连接地址(kafka.bootstrap.servers)、消费的topic(subscribe或subscribePattern), 指定topic的时候,可以使用正则来指定,也可以指定一个 topic 的集合。官方提供三种方式从Kafka topic中消费数据,主要区别在于每次消费Topic名称指定,

  • 方式一:消费一个Topic数据
  • 方式二:消费多个Topic数据
  • 方式三:消费通配符匹配Topic数据
    从Kafka 获取数据后Schema字段信息如下,既包含数据信息有包含元数据信息:
    在实际开发时,往往需要获取每条数据的消息,存储在value字段中,由于是binary类型,需要转换为字符串String类型;此外了方便数据操作,通常将获取的key和value的DataFrame转换为Dataset强类型,伪代码如下:

    从Kafka数据源读取数据时,可以设置相关参数,包含必须参数和可选参数:
  • 必须参数:kafka.bootstrap.servers和subscribe,可以指定开始消费偏移量assign。
  • 可选参数:
    范例演示:从Kafka消费数据,进行词频统计,Topic为wordsTopic。
  • 第一步、创建Topic,相关命令如下:
# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
## ================================= wordsTopic =================================
# 查看Topic信息
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1.oldlu.cn:2181/kafka200
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replicatio
n-factor 1 --partitions 3 --topic wordsTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic wordsTopic
  • 第二步、编写代码,其中设置每批次消费数据最大量
package cn.oldlu.spark.kafka.sourceimport org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 使用Structured Streaming从Kafka实时读取数据,进行词频统计,将结果打印到控制台。*/
object StructuredKafkaSource {def main(args: Array[String]): Unit = {// 构建SparkSession实例对象val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[3]")// 设置Shuffle分区数目.config("spark.sql.shuffle.partitions", "3").getOrCreate()// 导入隐式转换和函数库import spark.implicits._import org.apache.spark.sql.functions._// TODO: 从Kafka读取数据,底层采用New Consumer APIval kafkaStreamDF: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1.oldlu.cn:9092").option("subscribe", "wordsTopic")// TODO: 设置每批次消费数据最大值.option("maxOffsetsPerTrigger", "100000").load()// TODO: 进行词频统计val resultStreamDF: DataFrame = kafkaStreamDF// 获取value字段的值,转换为String类型.selectExpr("CAST(value AS STRING)")// 转换为Dataset类型.as[String]// 过滤数据.filter(line => null != line && line.trim.length > 0)// 分割单词.flatMap(line => line.trim.split("\\s+"))// 按照单词分组,聚合.groupBy($"value").count()// 设置Streaming应用输出及启动val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination() // 查询器等待流式应用终止query.stop() // 等待所有任务运行完成才停止运行}
}

3 Kafka 接收器

往Kafka里面写数据类似读取数据,可以在DataFrame上调用writeStream来写入Kafka,设置参
数指定value,其中key是可选的,如果不指定就是null。如果key为null,有时候可能导致分区数据
不均匀。

3.1 配置说明

将DataFrame写入Kafka时,Schema信息中所需的字段:
需要写入哪个topic,可以像上述所示在操作DataFrame 的时候在每条record上加一列topic字段指定,也可以在DataStreamWriter上指定option配置。写入数据至Kafka,需要设置Kafka Brokers地址信息及可选配置:

  • 必选参数:kafka.bootstrap.servers,使用逗号隔开【host:port】字符;
  • 可选参数:topic,如果DataFrame中没有topic列,此处指定topic表示写入Kafka Topic。
    官方提供示例代码如下:

3.2 实时数据ETL架构

在实际实时流式项目中,无论使用Storm、SparkStreaming、Flink及Structured Streaming处理流式数据时,往往先从Kafka 消费原始的流式数据,经过ETL后将其存储到Kafka Topic中,以便其他业务相关应用消费数据,实时处理分析,技术架构流程图如下所示:
接下来模拟产生运营商基站数据,实时发送到Kafka 中,使用StructuredStreaming消费,经过ETL(获取通话状态为success数据)后,写入Kafka中,便于其他实时应用消费处理分析。

3.3 模拟基站日志数据

模拟产生运营商基站通话日志数据,封装到样例类中,字段信息如下

package cn.oldlu.spark.kafka.mock/*** 基站通话日志数据,字段如下:** @param stationId  基站标识符ID* @param callOut    主叫号码* @param callIn     被叫号码* @param callStatus 通话状态* @param callTime   通话时间* @param duration   通话时长*/
case class StationLog(stationId: String, //callOut: String, //callIn: String, //callStatus: String, //callTime: Long, //duration: Long //) {override def toString: String = s"$stationId,$callOut,$callIn,$callStatus,$callTime,$duration"
}

创建Topic,相关命令如下:

# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
## ================================= stationTopic =================================
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-
factor 1 --partitions 3 --topic stationTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic stationTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic station
Topic --from-beginning
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.oldlu.cn:2181/kafka200 --topic statio
nTopic
## ================================= etlTopic =================================
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-
factor 1 --partitions 3 --topic etlTopic
# 模拟生产者
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic etlTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic etlTopi
c --from-beginning

编写代码,实时产生日志数据,发送Kafka Topic:

package cn.oldlu.spark.kafka.mockimport java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random/*** 模拟产生基站日志数据,实时发送Kafka Topic中,数据字段信息:* 基站标识符ID, 主叫号码, 被叫号码, 通话状态, 通话时间,通话时长*/
object MockStationLog {def main(args: Array[String]): Unit = {// 发送Kafka Topicval props = new Properties()props.put("bootstrap.servers", "node1.oldlu.cn:9092")props.put("acks", "1")props.put("retries", "3")props.put("key.serializer", classOf[StringSerializer].getName)props.put("value.serializer", classOf[StringSerializer].getName)val producer = new KafkaProducer[String, String](props)val random = new Random()val allStatus = Array("fail", "busy", "barring", "success", "success", "success","success", "success", "success", "success", "success", "success")while (true) {val callOut: String = "1860000%04d".format(random.nextInt(10000))val callIn: String = "1890000%04d".format(random.nextInt(10000))val callStatus: String = allStatus(random.nextInt(allStatus.length))val callDuration = if ("success".equals(callStatus)) (1 + random.nextInt(10)) * 1000L else 0L// 随机产生一条基站日志数据val stationLog: StationLog = StationLog("station_" + random.nextInt(10), //callOut, //callIn, //callStatus, //System.currentTimeMillis(), //callDuration //)println(stationLog.toString)Thread.sleep(100 + random.nextInt(100))val record = new ProducerRecord[String, String]("stationTopic", stationLog.toString)producer.send(record)}producer.close() // 关闭连接}
}

运行程序,基站通话日志数据格式如下:

station_7,18600009710,18900000269,success,1590709965144,4000
station_6,18600003894,18900000028,success,1590709965333,8000
station_7,18600007207,18900001057,busy,1590709965680,0

3.4 实时增量ETL

编写应用实时从Kafka的【stationTopic】消费数据,经过处理分析后,存储至Kafka的
【etlTopic】,其中需要设置检查点目录,保证应用一次且仅一次的语义。

package cn.oldlu.spark.kafka.sinkimport org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** 实时从Kafka Topic消费基站日志数据,过滤获取通话转态为success数据,再存储至Kafka Topic中* 1、从KafkaTopic中获取基站日志数据(模拟数据,JSON格式数据)* 2、ETL:只获取通话状态为success日志数据* 3、最终将ETL的数据存储到Kafka Topic中*/
object StructuredEtlSink {def main(args: Array[String]): Unit = {// 构建SparkSession实例对象val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[3]")// 设置Shuffle分区数目.config("spark.sql.shuffle.partitions", "3").getOrCreate()// 导入隐式转换和函数库import spark.implicits._import org.apache.spark.sql.functions._// 1. 从KAFKA读取数据val kafkaStreamDF: DataFrame = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node1.oldlu.cn:9092").option("subscribe", "stationTopic").load()// 2. 对基站日志数据进行ETL操作// station_0,18600004405,18900009049,success,1589711564033,9000val etlStreamDF: Dataset[String] = kafkaStreamDF// 获取value字段的值,转换为String类型.selectExpr("CAST(value AS STRING)")// 转换为Dataset类型.as[String]// 过滤数据:通话状态为success.filter { log =>null != log && log.trim.split(",").length == 6 && "success".equals(log.trim.split(",")(3))}etlStreamDF.printSchema()// 3. 针对流式应用来说,输出的是流val query: StreamingQuery = etlStreamDF.writeStream// 对流式应用输出来说,设置输出模式.outputMode(OutputMode.Append()).format("kafka").option("kafka.bootstrap.servers", "node1.oldlu.cn:9092").option("topic", "etlTopic")// 设置检查点目录.option("checkpointLocation", s"datas/structured/etl-100001")// 流式应用,需要启动start.start()// 查询器等待流式应用终止query.awaitTermination()query.stop() // 等待所有任务运行完成才停止运行}
}

4 Kafka 特定配置

从Kafka消费数据时,相关配置属性可以通过带有kafka.prefix的DataStreamReader.option进行
设置,例如前面设置Kafka Brokers地址属性:stream.option(“kafka.bootstrap.servers”, “host:port”),
更多关于Kafka 生产者Producer Config配置属和消费者Consumer Config配置属性,参考文档:

  • 生产者配置(Producer Configs):
  1. http://kafka.apache.org/20/documentation.html#producerconfigs
  • 消费者配置(New Consumer Configs):
  1. http://kafka.apache.org/20/documentation.html#newconsumerconfigs
    注意以下Kafka参数属性可以不设置,如果设置的话,Kafka source或者sink可能会抛出错误:
  • 1)、group.id:Kafka source将会自动为每次查询创建唯一的分组ID;
  • 2)、auto.offset.reset:在将source选项startingOffsets设置为指定从哪里开始。结构化流管理
    内部消费的偏移量,而不是依赖Kafka消费者来完成。这将确保在topic/partitons动态订阅时不
    会遗漏任何数据。注意,只有在启动新的流式查询时才会应用startingOffsets,并且恢复操作
    始终会从查询停止的位置启动;
  • 3)、key.deserializer/value.deserializer:Keys/Values总是被反序列化为ByteArrayDeserializer
    的字节数组,使用DataFrame操作显式反序列化keys/values;
  • 4)、key.serializer/value.serializer:keys/values总是使用ByteArraySerializer或StringSerializer
    进行序列化,使用DataFrame操作将keysvalues/显示序列化为字符串或字节数组;
  • 5)、enable.auto.commit:Kafka source不提交任何offset;
  • 6)、interceptor.classes:Kafka source总是以字节数组的形式读取key和value。使用
    ConsumerInterceptor是不安全的,因为它可能会打断查询;

大数据Spark Structured Streaming集成 Kafka相关推荐

  1. 大数据Spark Structured Streaming

    目录 1 Spark Streaming 不足 2 Structured Streaming 概述 2.1 模块介绍 2.3 编程模型 3 入门案例:WordCount 3.1 功能演示 3.2 So ...

  2. kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V

    简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...

  3. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  4. 2021年大数据Spark(四十五):Structured Streaming Sources 输入源

    目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 ​​​​​​​文件数据源-了解 ​​​​​​​需求 ​​​​​​​代码实现 ​​​​​​​Rate source-了解 So ...

  5. 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战

    大数据Spark "蘑菇云"行动第76课:   Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency>   ...

  6. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  7. 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理

    文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...

  8. 2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析

    2016年大数据Spark"蘑菇云"行动代码学习之AdClickedStreamingStats模块分析     系统背景:用户使用终端设备(IPAD.手机.浏览器)等登录系统,系 ...

  9. 光环大数据spark文档_推荐大数据Spark必读书目

    我有一个非常要好的同事,无数次帮我解决了业务上的痛.技术能力很强,业务方面也精通.而且更耐得住加班,并且是自愿加班,毫无怨言.不像我,6点到准时走人了.但就是这么一位兢兢业业的技术人,却一直没有升职加 ...

最新文章

  1. golang已关闭channel
  2. 找出口和BADI的ABAP程序
  3. MVC5 Controller构造方法获取User为空解决方法
  4. Linux 初始化系统(系统服务管理和控制程序/Init System) -- System V init(SysV init) 的简单理解
  5. 表单oninput和onchange事件区别
  6. mac电脑bash_profile创建,打开,编辑,保存
  7. Hls之TS流分离音视频
  8. iconfont-矢量图标字体的运用
  9. 计算机程序设计里的奇书
  10. BZOJ3674: 可持久化并查集加强版
  11. php发送sql,php学习笔记(二)php与mysql连接与用php发送SQL查询
  12. Python scrapy 将mmjpg图片下载到本地
  13. js 获取窗口高度 兼容 各种浏览器
  14. 2021中国工业软件上市企业公司排行2021中国智能制造企业排名
  15. Spring/SpringBoot 过滤器修改、获取http 请求request中的参数 和 response返回值,比如修改请求体和响应体的字符编码
  16. windows下sass开发环境的搭建
  17. 微信小程序图片404时显示默认图片
  18. vps服务器租用费用
  19. 《软件技术基础》之《同步》
  20. 讲真话、求真知、做真我——Facebook COO桑德伯格2012哈佛商学院毕业演讲

热门文章

  1. 新员工试用期月度总结
  2. 第三方支付接口,安全稳定永远是第一位
  3. 天猫“双11”狂欢节背后:阿里巴巴加速新技术演变
  4. 『开发』网页端展示深度学习模型|Gradio上手教程
  5. 利用js写一个类似打地鼠的小游戏
  6. 华为 Linux 内核贡献者被质疑刷 KPI
  7. etree.html 报错 AttributeError:‘function’ object has no attribut ‘HTML’
  8. 马云说今年他要读100本书,你呢?
  9. 【数据结构】【哈夫曼树】哈夫曼树、赫夫曼树(Huffman Tree)C语言实现
  10. c# delegate知识