Structured Streaming整合kafka

Spark2.0以后开始推出Structured Streaming,详情参考上文Spark2.0 Structured Streaming。本文介绍一种常用的方式: Structured Streaming读取kafka数据,并使用spark sql过滤,最终输出到终端。

本示例能够读取多个topic数据,并分别映射为Spark内存表,执行多个sql。

1.环境

(1)版本

Spark版本:spark-2.2.0

Scala版本:scala-2.11.4

Kafka版本:kafka 0.10 or higher

(2)数据

Kafka topic:dyl_test01

数据内容:{"a":"1","b":"2","c":"2018-01-08","d":[23.9,45]}

2.基本介绍

2.1 Linking

使用maven/SBT的项目,需要引用如下的库文件:

groupId = org.apache.spark

artifactId = spark-sql-kafka-0-10_2.11

version = 2.2.1

其他jar

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>${scope}</scope>

</dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>${scope}</scope>

</dependency>

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>${scope}</scope>

</dependency>

2.2 Reading Data from Kafka

(1)直接从Kafka读取数据,并且进行查询:

// Subscribe to 1 topic

val df= spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers","host1:port1,host2:port2")

.option("subscribe","topic1")

.load()

df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

.as[(String,String)]

// Subscribe to multiple topics

val df= spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers","host1:port1,host2:port2")

.option("subscribe","topic1,topic2")

.load()

df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

.as[(String,String)]

// Subscribe to a pattern

val df= spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers","host1:port1,host2:port2")

.option("subscribePattern","topic.*")

.load()

df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

.as[(String,String)]

这个没什么说的,简单的设置kafka集群参数以及topic,然后进行查询,df.selectExpr中能够使用sql里的语法,df.select里只能设置选择的字段。设置多个kafka topic时,可以逗号分割,或者正则匹配,这时候,所有topic的数据都会写入到一张表中,很多时候,不同topic更想映射到不同的表中,稍后说明。先来看看直接读取Kafka数据并输出时,结果如何,如下图2.1所示:

图2.1 直接读取kafka数据输出

如上图2.1,是没有执行selectExpr("CAST(value AS STRING)"的结果,value是字节数组,所以,执行CAST的作用是将value转换为字符串,结果如下2.2所示:

图2.2 value转为字符串

此时,selectExpr中只选择了key和value,所以结果中也只有key和value字段。读取kafka时,默认并不会解析kafka的数据内容,而是直接将kafka数据放到value列中显示出来,关于其他几个字段如下表2.1所示:

表2.1 默认内存表schema

Column                                                                                                                   

Type                                                                                                                      

key

binary

value

binary

topic

string

partition

int

offset

long

timestamp

long

timestampType

int

关于Kafka source的详细配置,可以参考官网:Kafka Integration Guide (Kafka broker version 0.10.0 or higher)。这里不多介绍,kafka的数据被写入value,其实对用户而言,并没有什么用,除非使用正则去匹配value的内容,我们更想解析value的内容,将value的内容映射为一张表,如上,value内容是JSON格式数据,解析方式也很简单,使用from_json函数即可,这里有个例子可以参考:kafka-spark-structured-streaming-example,主要内容如下所示:

valschema=StructType(Seq(

StructField("schema",StringType, true),

StructField("payload",StringType, true)

))

 
 

valdf= ds1.selectExpr("cast (value as string) as json")

.select(from_json($"json",schema=schema).as("data"))

.select("data.payload")

首先,将value字段转换为字符串,重名了为"json",然后使用from_json函数,应用预定义的schema,解析json数据,将结果重命名为"data",接下来就可以直接使用json中的字段了。

这里有一个比较纠结的地方,必须预定义json的格式,而不是像spark Streaming一样,提供了自动推测json格式的功能,让我一度觉得Structured Streaming太局限,不实用,经过查证以及思考,觉悟这原来是一个优化,json格式自动推测的前提是遍历所有数据,收集信息才能正确推测json的字段和字段类型,这无疑是很慢的,也很消耗资源,实际上,json格式字段事先基本上都是能够知道的,发生更改的情况很少,但是,还是有可能发生的,目前,我没找到如何动态更改json schema的方式,提供了Schema之后,spark生成的任务计划中已经写死了json数据解析的字段名和字段类型,当查询任务start之后,则使用该任务计划解析数据,并没有提供修改scheme的接口,但是通过查看源码,发现有个RuntimeReplaceable Expression,例子是spark的nvl函数,好像可以运行时更改选择列的表达式,没细看,知道的朋友交流一下,也因此,实际中并没有使用Structured Steamming,因为项目中需要修改json的解析方式。

经过from_json解析之后,结果如下图2.3所示:

图2.3 from_json解析数据

完整代码如下:

  1. package com.dyl.pwrd  
  2. import org.apache.spark.SparkConf  
  3. import org.apache.spark.sql.functions._  
  4. import org.apache.spark.sql.streaming.StreamingQuery  
  5. import org.apache.spark.sql.types._  
  6. import org.apache.spark.sql.SparkSession  
  7. import scala.collection.mutable  
  8. /** 
  9.   * Created by dongyunlong on 2018/1/5. 
  10.   * {"a":"1","b":"2","c":"2018-01-08","d":[23.9,45]} 
  11.   */  
  12. object SparkStructuredStreaming {  
  13.   /** 
  14.     * 创建SparkSession 
  15.     * @return 
  16.     */  
  17.   def getSparkSession={  
  18.     SparkSession  
  19.       .builder()  
  20.       .config(new SparkConf().setMaster("local[2]"))  
  21.       .appName(getClass.getName)  
  22.       .getOrCreate()  
  23.   }  
  24.   /** 
  25.     * 解析kafka json数据,并将其映射为spark临时表 
  26.     * @param spark 
  27.     * @param kafkaTopic 
  28.     * @param sourceName 
  29.     */  
  30.   def createOrReplaceTempView(spark:SparkSession, kafkaTopic:String, sourceName:String): Unit ={  
  31.     import spark.implicits._  
  32.     val df = spark  
  33.       .readStream  
  34.       .format("kafka")  
  35.       .option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092")  
  36.       .option("subscribe", kafkaTopic)  
  37.       .option("startingOffsets", "earliest")  
  38.     .load()  
  39. //    val schema = SocSchemaCollection.getSchemaBySourceName(sourceName) //从数据库加载json schema  
  40.     val schema = StructType(mutable.Seq(  
  41.       StructField("a", DataTypes.StringType),  
  42.       StructField("b", DataTypes.StringType),  
  43.       StructField("c", DataTypes.StringType),  
  44.       StructField("d", DataTypes.createArrayType(DataTypes.StringType))  
  45.     ))  
  46.     if(schema != null){  
  47.       val jsonDf = df.selectExpr("CAST(key AS STRING)", "cast (value as string) as json")  
  48.           .select(from_json($"json", schema=schema).as("data"))  
  49.       jsonDf.select("data.*").createOrReplaceTempView(sourceName)  
  50.     }else{  
  51.       println("error,schema is null")  
  52.     }  
  53.   }  
  54.   /** 
  55.     * 输出spark sql的查询结果 
  56.     * @param spark 
  57.     * @param sql 
  58.     * @return 
  59.     */  
  60.   def sqlWriteStream(spark:SparkSession, sql:String): StreamingQuery ={  
  61.     val query = spark.sql(sql)  
  62.       .writeStream  
  63.       .outputMode("append")  
  64.       .format("console")  
  65.       .start()  
  66.     query  
  67.   }  
  68.   /** 
  69.     * 注册spark临时表,执行sql语句,注意这里每一个sql都是一个writeStream,最后使用spark.streams.awaitAnyTermination()等待所有查询 
  70.     * @param spark 
  71.     */  
  72.   def sparkReadKafka(spark:SparkSession): Unit ={  
  73.     createOrReplaceTempView(spark, "dyl_test01", "dyl_test")  
  74.     val sqls = Array("select * from dyl_test","select *,'2' as e from dyl_test")  
  75.     val querys = mutable.ListBuffer[StreamingQuery]()  
  76.     for(sql <- sqls){  
  77.       println(sql)  
  78.       querys += sqlWriteStream(spark, sql)  
  79.     }  
  80.   }  
  81.   /** 
  82.     * 主函数 
  83.     * @param args 
  84.     */  
  85.   def main(args: Array[String]) {  
  86.     println("hello world")  
  87.     val spark = getSparkSession  
  88.     sparkReadKafka(spark)  
  89.     spark.streams.awaitAnyTermination()  
  90.   }  
  91. }  

Structured Streaming整合kafka相关推荐

  1. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  2. Structured Streaming 整合 Kafka指南

    用于 Kafka 0.10 的结构化流式处理集成,用于从 Kafka 读取数据和写入数据. 从kafka读取数据 // Subscribe to 1 topic val df = spark.read ...

  3. kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V

    简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...

  4. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  5. spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

    问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...

  6. 大数据Spark Structured Streaming集成 Kafka

    目录 1 Kafka 数据消费 2 Kafka 数据源 3 Kafka 接收器 3.1 配置说明 3.2 实时数据ETL架构 3.3 模拟基站日志数据 3.4 实时增量ETL 4 Kafka 特定配置 ...

  7. 7.1.5 智慧物流【车辆监控Structured Streaming、整合kafka、Redis、Mysql、HBASE 写入数据】

    车辆监控 文章目录 车辆监控 第一节 Structured Streaming 1.1 Structured Streaming发展历史 1.1.1 Spark Streaming 1.1.2 Dat ...

  8. Structured Streaming基础入门

    Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...

  9. Structured Streaming + Kafka测试

    前言 Structured Streaming出来有几年了,一直没有机会使用,最近闲来无事,就想先测试一下,完全没有细看关于它的一些详细介绍情况,仅仅想根据官网案例,执行一遍,没想到- copy官网的 ...

最新文章

  1. 两圆重叠问题你会求解吗?这个问题的准确答案,德国数学家最近才找到
  2. php获取xml某个节点的所有内容,怎样输出XML所有的同名节点内容?
  3. jpa hibernate mysql_008Spring JPA Hibernate MySQL
  4. AWK如何打印从某一列到最后一列的内容
  5. pixhawk篇之坐标系转化,相关转化矩阵知识,算法截取
  6. Ehcache学习总结(2)--Ehcache整合spring配置
  7. ZKtime5.0考勤管理系统标准版客户端登录忘记登录密码
  8. 英文字母间距非常大的问题
  9. 抖音如何推动音乐的流行?看完这篇文章你就明白了
  10. 计算机主板尺寸,电脑主板大中小三个等级的尺寸是多少?
  11. 2021年在vue中使用 Google Map
  12. 计算机行业热点的专业信息渠道,新南威尔士大学信息技术硕士专业详解 通往IT大神的必经之路...
  13. 将数组修改为峰谷相间
  14. 小说中场景的功能_小说叙述视角及作用
  15. Developing Large Web Applications
  16. “千人千面”的个性化时代,金融产品也能快速应“变”
  17. couchbase的使用(springboot连接couchbase,node.js连接couchbase)
  18. 基础教育阶段计算机教育需求调研,计算机专业基础教育调研
  19. 聚观早报 | 微软Build开发者大会开幕;阿里云智能裁员7%
  20. WindowBlinds 11 - Windows 11 系统美化工具软件下载

热门文章

  1. 后缀数组(SA)倍增法总结
  2. 00058 imp,IMP-00058: 遇到 ORACLE 错误 904
  3. 周星驰影片经典台词之《唐伯虎点秋香》
  4. 2008升级到2012
  5. 如何成为github开源项目的贡献者contributor
  6. 2006年8月贵州行(黄果树 兴义万峰林与马岭
  7. Fillchar过程全解
  8. POJ - 1269 Intersecting Lines(计算几何 + 叉积 + 跨立实验)
  9. 在Wordpress中安装WooCommerce插件
  10. 区块链软件公司:区块链技术不等于数字货币