Structured Streaming 整合 Kafka指南
用于 Kafka 0.10 的结构化流式处理集成,用于从 Kafka 读取数据和写入数据。
从kafka读取数据
// Subscribe to 1 topic
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to 1 topic, with headers
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").option("includeHeaders", "true").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers").as[(String, String, Map)]// Subscribe to multiple topics
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1,topic2").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to a pattern
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribePattern", "topic.*").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
从kafka指定offset读取数据
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1,topic2").option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""").option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to a pattern, at the earliest and latest offsets
val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribePattern", "topic.*").option("startingOffsets", "earliest").option("endingOffsets", "latest").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
source中内容每一行的schema
如下,key
和value
都是binary
类型的,所以需要处理下,cast as string
以下参数必须设置,其中assign,subscribe,subscribePattern
三个参数选择一个设置就可以了
Option | value | meaning |
---|---|---|
assign | json string {“topicA”:[0,1],“topicB”:[2,4]} | Specific TopicPartitions to consume. |
subscribe | 逗号分开的topics | The topic list to subscribe. |
subscribePattern | java正则表达式 | The pattern used to subscribe to topic(s). |
kafka.bootstrap.servers | 逗号分割的host:port | 等价于 Kafka “bootstrap.servers” 配置 |
以下参数是可选的
startingOffsets
取值可以是
"earliest", "latest" (streaming only),
or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
含义
开始查询时的起点,可以是“earliest”(来自最早的偏移量),“latest”(仅来自最新的偏移量),或者是为每个TopicPartition
指定起始偏移量的json字符串
。 在json
中,可使用-2
作为偏移量来指代最早的,-1
到最新的。 注意:对于批查询,不允许最新(隐式或在json中使用-1)。 对于流查询,这仅在启动新查询时适用,并且恢复将始终从查询中断的地方开始。 查询期间新发现的分区最早将开始。
参考
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.0.1 Documentation
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Structured Streaming 整合 Kafka指南相关推荐
- Structured Streaming整合kafka
Structured Streaming整合kafka Spark2.0以后开始推出Structured Streaming,详情参考上文Spark2.0 Structured Streaming.本 ...
- 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
目录 整合 Kafka 说明 Kafka特定配置 KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 Kafka ...
- kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V
简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...
- Structured Streaming从Kafka 0.8中读取数据的问题
众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...
- spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access
问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...
- 大数据Spark Structured Streaming集成 Kafka
目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...
- 7.1.5 智慧物流【车辆监控Structured Streaming、整合kafka、Redis、Mysql、HBASE 写入数据】
车辆监控 文章目录 车辆监控 第一节 Structured Streaming 1.1 Structured Streaming发展历史 1.1.1 Spark Streaming 1.1.2 Dat ...
- Structured Streaming基础入门
Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...
- Structured Streaming + Kafka测试
前言 Structured Streaming出来有几年了,一直没有机会使用,最近闲来无事,就想先测试一下,完全没有细看关于它的一些详细介绍情况,仅仅想根据官网案例,执行一遍,没想到- copy官网的 ...
最新文章
- OpenCV使用 GrabCut 算法进行交互式前景提取
- request.getSession(false)到底返回什么
- 8086PC机的内存地址空间分配
- 【UIKit】UIView基础学习
- java二维对象数组存入文件_关于Java:将2D数组保存到磁盘文件
- java keytool 代码_JDK keytool证书工具功能代码解析_java_脚本之家
- D进制转换-C++实现
- 【转】MATLAB的polar函数 极坐标绘制最大半径怎样设置
- 《集体智慧编程》第二章(一)
- 电脑休眠和睡眠的区别_Windows操作系统中的休眠模式和睡眠模式有什么区别?...
- linux系统支持多种硬件平台吗,linux操作系统对硬件的要求是多少
- 反斜杠在C/C++中的作用
- vue 路由参数变化,页面不刷新(数据不更新)解决方法
- R语言金融基础:tidyquant数据整理(滑窗建模)
- Atitit 文件读取规范 目录 1.1. 以fgetss取代fgets读取一行并过滤掉 HTML 和 PHP 标记。	1 1.2. 3. 以二进制读取 fread取代fgets	1 1.3. 4.
- 有了WCF,Socket是否已人老珠黄?
- Python(三):数
- 微信小程序体验版无法调用接口
- 经验分享 针式打印机经典案例分析
- CSS深入理解之absolute
热门文章
- php写入mysql表格失败,麻烦给我看一下为什么MySQL创建表格失败嘛
- python怎么读取图片文件大小_python怎么读取图片大小
- anki模板_【授权转载】【Anki高级操作技巧】(19)-如何在模板上添加新字段
- java 弹出另存为_java如何实现 io流传输过来的文件,提示另存为弹出窗口?
- C语言会场安排问题贪心算法,贪心算法解决会场安排问题多处最优服务次序问题(含源代码).doc...
- python 表格格式输出_简单介绍python输出列表元素的所有排列形式
- retrofit content-length为0_Retrofit 源码剖析
- 是以微型计算机为中心 配以相应的外围设备,______是以微型计算机为中心,配以相应的外围设备、电源和辅助电路,以及指挥微型计算机工作的系统软件而构成的。...
- table中加表单元素怎么验证_el-table嵌入表单元素注意事项(验证规则prop写法与数据初始化)...
- php判断是字符串类型,php使用strpos判断字符串中数字类型子字符串出错的解决方法 原创...