spark structured stream的Append模式例子
本例子实现一个从源kafka消费消息进行分组聚合后重新输出到目的kafka的例子,参见代码:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback
# import builtins as py_builtin
from pyspark.sql.functions import max
from pyspark.sql.functions import desc
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.types import *
from pyspark.sql.functions import col, column, expr
from pyspark.sql.functions import *
from pyspark.sql import Rowappname = "test" # 任务名称
master = "local[*]" # 单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
# spark_driver_host = "10.0.0.248"try:conf = SparkConf().setAppName(appname). \set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \.set("spark.jars.repositories", 'http://maven.aliyun.com/nexus/content/groups/public/') \.setMaster(master) # 本地spark = SparkSession.builder.config(conf=conf).getOrCreate()df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("startingOffsets", "latest") \.option("subscribe", "mykafkatest") \.load()words = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "offset", "timestamp")schema = StructType() \.add("topic", StringType()) \.add("age", StringType()) \\# 通过from_json,定义schema来解析json# res = words.where(instr("value", '{') ==0).select(from_json("value", schema).alias("data")).select("data.*")streamSource = words.where(instr("value", 'topic') > 0).select(from_json("value", schema).alias("data"),col("timestamp")) \.select("data.*", 'timestamp')streamSource = streamSource.select(col('age').cast('int').alias('age'), col('topic'), col('timestamp'))windowedCounts = streamSource.withWatermark("timestamp", "10 seconds") \.groupBy(window(col("timestamp"), '10 seconds', '10 seconds'), col("topic")).count()# query = windowedCounts \# .writeStream \# .outputMode('complete') \# .format('console') \# .option('truncate', 'false') \# .start()res = windowedCounts.withColumn('constfield', lit('1'))query = res.select(to_json(struct("topic", "window")).cast('string').alias("key"),to_json(struct("topic", "window", "count")).cast('string').alias("value"))\.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "mykafkatestdest") \.option("checkpointLocation", '''D:\spark\spark\spark-2.3.0-bin-hadoop2.7\checkpoint\kafkatest2''') \.trigger(processingTime='3 seconds') \.start()query.awaitTermination()print('计算成功!')
except:traceback.print_exc() # 返回出错信息print('连接出错!')
运行结果:
结论:写入kafka sink时使用的是默认的append输出模式,也就是窗口的信息输出是等到对应的wartermark达到时才会输出的,此时也会顺便清理state,防止state过大。不过在这个例子中也发现一个问题:最后的几条消息一直在等待wartermark的到来,现象就是最后几条消息一直没有聚合输出,不论等待了多少个trigger触发时间,除非再次输入新的几条消息才输出,然而这样新的这几条消息又会一直再次等待wartermark的到达,—这里感觉有点问题,如果只是最后一条消息等待wartermark这能理解,因为append输出模式的语义来说未确定wartermark之前确实不能输出,但是有几条小子都在等待wartermark这里就很难理解了,难道是不同的执行器还有自己的wartermark?需要每个执行器都等到了自己的wartermark才触发?
spark structured stream的Append模式例子相关推荐
- spark structured stream的Update模式
spark的update模式的定义为:自动上一次trigger以来有变化的key都会输出到kafka sink中. 下面的例子完整实现一个从kafka消费并聚合消息,然后把聚合消息写入到目标kafka ...
- Spark Structured SQL报错:Stream stream joins without equality predicate is not supported
1.背景 写一个Spark Structured SQL 任务,任务的功能是对kafka的两个topic进行join处理. select q.sysdt, q.systm, q.event_time ...
- Spark Structured Steaming实战
Spark Structured Steaming Spark Structured Streaming 简介 什么是 Spark Structured Streaming Structured St ...
- 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理
文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...
- kafka 的structured stream 总结
比较重要的几个概念: 1.trigger触发时间,这个触发时间是指每次触发从kafka读取数据的时间间隔,如果不设置,就是尽可能快的意思,上一批处理完马上下一批,如果偶尔停机,而kafka中积累了大量 ...
- 大数据之Spark:Structured Streaming
目录 1. API 2. 核心思想 3. 应用场景 4.Structured Streaming 实战 1) 读取 Socket 数据 2) 读取目录下文本数据 3) 计算操作 4) 输出 在 2.0 ...
- Spark Structured Streaming概述
Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...
- Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...
从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...
- kafka spark Structured streaming整合后集群报错KafkaConsumer.subscribe(Ljava/util/Collection;)V
简介 整个项目架构是在CDH中,,然后spark Structured streaming消费kafka. spark 2.3版本 kafka0.10版本 <!-- spark sql kafk ...
最新文章
- T-SQL Convert转换时间类型
- Redis 安装详细过程(redis基本使用(服务端和客户端)、修改密码)
- python 打开 pip_python pip
- 美国计算机专业录取率,美国留学高录取率院校计算机专业申请条件是什么? 爱问知识人...
- gbdt降低学习率可以实现正则化效果呢
- rsync 配置详解
- web页面在线编辑功能
- 排烟管道过长怎么处理_厨房装修失误导致烟道过长该怎么办?
- python中使用selenium模块登录QQ邮箱
- 大神是如何装逼的 之 vim插件使用taglist和nerdtree
- 分治算法——在真币中找出伪币
- 多路复用增益,PASTA定理
- 爬虫进阶之路---处理滑块验证码(以解决极验平台的滑动验证码为例[附带本项目源码!],通过率百分之九十以上!!!)
- jy在线制图系统免费源码丨一秒生成广告横图海报图
- excel条件格式标记一整行
- python爬取淘宝数据
- Milton 1.5.1发布,开源服务器端类库
- wifi辐射安全距离
- Visual C++编译错误:error C2220: 警告被视为错误 - 没有生成“object”文件
- 2022-2028年全球与中国汽车自动变速箱控制单元产业市场前瞻与投资战略规划分析
热门文章
- Google Chrome 工程师:JavaScript 不容错过的八大优化建议
- “十亿赌约”,雷军输,董明珠胜?
- 正确的初始化,在 Java 编程中至关重要!
- 一文了解“最好编程语言”PHP 必知的 16 个编程法则!
- 机器学习如何从 Python 2 迁移到 Python 3
- 19 岁美女自学编程:我是如何成功教会自己的
- 眼图matlab仿真_MATLAB开发自动驾驶第二十课-自动驾驶场景设计器中的预建驾驶场景...
- 交通流预测python代码_Python 3 amp; Keras 实现基于神经网络的交通流预测
- python变量和数据类型_Python变量和数据类型简介
- php测试接口的小工具,PHP API接口测试小工具