用于 Kafka 0.10 的结构化流式处理集成,用于从 Kafka 读取数据和写入数据。

从kafka读取数据

// Subscribe to 1 topic
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to 1 topic, with headers
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").option("includeHeaders", "true").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers").as[(String, String, Map)]// Subscribe to multiple topics
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1,topic2").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to a pattern
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribePattern", "topic.*").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

从kafka指定offset读取数据

// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribe", "topic1,topic2").option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""").option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]// Subscribe to a pattern, at the earliest and latest offsets
val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("subscribePattern", "topic.*").option("startingOffsets", "earliest").option("endingOffsets", "latest").load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

source中内容每一行的schema如下,keyvalue都是binary类型的,所以需要处理下,cast as string

以下参数必须设置,其中assign,subscribe,subscribePattern三个参数选择一个设置就可以了

Option value meaning
assign json string {“topicA”:[0,1],“topicB”:[2,4]} Specific TopicPartitions to consume.
subscribe 逗号分开的topics The topic list to subscribe.
subscribePattern java正则表达式 The pattern used to subscribe to topic(s).
kafka.bootstrap.servers 逗号分割的host:port 等价于 Kafka “bootstrap.servers” 配置

以下参数是可选的

startingOffsets

取值可以是

"earliest", "latest" (streaming only),
or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """

含义
开始查询时的起点,可以是“earliest”(来自最早的偏移量),“latest”(仅来自最新的偏移量),或者是为每个TopicPartition指定起始偏移量的json字符串。 在json中,可使用-2作为偏移量来指代最早的,-1到最新的。 注意:对于批查询,不允许最新(隐式或在json中使用-1)。 对于流查询,这仅在启动新查询时适用,并且恢复将始终从查询中断的地方开始。 查询期间新发现的分区最早将开始。

参考

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.0.1 Documentation
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Structured Streaming 整合 Kafka指南相关推荐

  1. Structured Streaming整合kafka

    Structured Streaming整合kafka Spark2.0以后开始推出Structured Streaming,详情参考上文Spark2.0 Structured Streaming.本 ...

  2. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  3. kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V

    简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...

  4. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  5. spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

    问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...

  6. 大数据Spark Structured Streaming集成 Kafka

    目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...

  7. 7.1.5 智慧物流【车辆监控Structured Streaming、整合kafka、Redis、Mysql、HBASE 写入数据】

    车辆监控 文章目录 车辆监控 第一节 Structured Streaming 1.1 Structured Streaming发展历史 1.1.1 Spark Streaming 1.1.2 Dat ...

  8. Structured Streaming基础入门

    Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...

  9. Structured Streaming + Kafka测试

    前言 Structured Streaming出来有几年了,一直没有机会使用,最近闲来无事,就想先测试一下,完全没有细看关于它的一些详细介绍情况,仅仅想根据官网案例,执行一遍,没想到- copy官网的 ...

最新文章

  1. OpenCV使用 GrabCut 算法进行交互式前景提取
  2. request.getSession(false)到底返回什么
  3. 8086PC机的内存地址空间分配
  4. 【UIKit】UIView基础学习
  5. java二维对象数组存入文件_关于Java:将2D数组保存到磁盘文件
  6. java keytool 代码_JDK keytool证书工具功能代码解析_java_脚本之家
  7. D进制转换-C++实现
  8. 【转】MATLAB的polar函数 极坐标绘制最大半径怎样设置
  9. 《集体智慧编程》第二章(一)
  10. 电脑休眠和睡眠的区别_Windows操作系统中的休眠模式和睡眠模式有什么区别?...
  11. linux系统支持多种硬件平台吗,linux操作系统对硬件的要求是多少
  12. 反斜杠在C/C++中的作用
  13. vue 路由参数变化,页面不刷新(数据不更新)解决方法
  14. R语言金融基础:tidyquant数据整理(滑窗建模)
  15. Atitit 文件读取规范 目录 1.1. 以fgetss取代fgets读取一行并过滤掉 HTML 和 PHP 标记。 1 1.2. 3. 以二进制读取 fread取代fgets 1 1.3. 4.
  16. 有了WCF,Socket是否已人老珠黄?
  17. Python(三):数
  18. 微信小程序体验版无法调用接口
  19. 经验分享 针式打印机经典案例分析
  20. CSS深入理解之absolute

热门文章

  1. php写入mysql表格失败,麻烦给我看一下为什么MySQL创建表格失败嘛
  2. python怎么读取图片文件大小_python怎么读取图片大小
  3. anki模板_【授权转载】【Anki高级操作技巧】(19)-如何在模板上添加新字段
  4. java 弹出另存为_java如何实现 io流传输过来的文件,提示另存为弹出窗口?
  5. C语言会场安排问题贪心算法,贪心算法解决会场安排问题多处最优服务次序问题(含源代码).doc...
  6. python 表格格式输出_简单介绍python输出列表元素的所有排列形式
  7. retrofit content-length为0_Retrofit 源码剖析
  8. 是以微型计算机为中心 配以相应的外围设备,______是以微型计算机为中心,配以相应的外围设备、电源和辅助电路,以及指挥微型计算机工作的系统软件而构成的。...
  9. table中加表单元素怎么验证_el-table嵌入表单元素注意事项(验证规则prop写法与数据初始化)...
  10. php判断是字符串类型,php使用strpos判断字符串中数字类型子字符串出错的解决方法 原创...