PySpark : Structured Streaming
Spark Structured Streaming
- main difference between streaming and batch data
- details on the Structured Streaming API
- use for Structured Streaming on imcoming data and save output result in memory
Batch vs. Stream
- batch data : 一段时间内收集的数据
- stream processing : 实时或接近实时的数据处理,更高效,(fraud detection)
Streaming framework
- Data input
- Data processing(real time or near real time)
- Final output
Data Input
- ingest and process data continuously
- Messaging systems : Apache Kafka, Flume, etc
- File folders/directory : 连续的读目录中的文件作为流数据
Data Processing
- aggregations
- filtering
- joins
- sorting
- etc
Final Output
- Append or Complete
- Append mode : adding only new results to the final output
- Complete mode : updates the entire results table at the final output
- File directory sink
- Console
- Memory sink
Building a Structured App
- File folders/directory (基于文件目录的流数据处理应用)
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName('structured_streaming').getOrCreate()import pyspark.sql.functions as F
from pyspark.sql.types import *
step 1. Create dataset
(contains four columns, csv format or parquet format)
- User ID
- APP
- Time spend(secs)
- Age
# create a data file , in the local folder (csv_folder) # hdfs: dfs -mkdir /csv_folder
df_1=spark.createDataFrame([("XN203",'FB',300,30),("XN201",'Twitter',10,19),("XN202",'Insta',500,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')# define schema
schema=StructType().add("user_id","string").add("app","string").add("time_in_secs", "integer").add("age","integer")# readStream
data=spark.readStream.option("sep", ",").schema(schema).csv("csv_folder")
step 2. Operations
# count app
app_count=data.groupBy('app').count()# write result in memory
query=(app_count.writeStream.queryName('count_query').outputMode('complete').format('memory').start())# usd SQL command to view the output
# toPandas() , require install pandas
# spark.sql("select * from count_query ").toPandas().head(5)
spark.sql('select * from count_query').show(5)
####### filter FB app and calcualte avgrage time for each user
fb_data=data.filter(data['app']=='FB')
# 统计每个user,在FB上花费的平均时间
fb_avg_time=fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))
fb_query=(fb_avg_time.writeStream.queryName('fb_query').outputMode('complete').format('memory').start())
spark.sql("select * from fb_query ").show()
# add new data
df_2=spark.createDataFrame([("XN203",'FB',100,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')
# old qurey operation
spark.sql('select * from fb_query').show()
# add new data
df_2=spark.createDataFrame([("XN203",'FB',1000,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')
# old qurey operation
spark.sql('select * from fb_query').show()
统计每个app上user花费的时间总和并排序
app_df=data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time',ascending=False)
app_query=(app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start())
spark.sql("select * from app_wise_query ").show()
Structured Streaming Alternatives
- Flink
- Google’s Beam
- Spark SQL API
PySpark : Structured Streaming相关推荐
- Structured Streaming使用staticDf和StreamingDf进行join
概观 结构化流是一种基于Spark SQL引擎的可扩展且容错的流处理引擎.您可以像表达静态数据的批处理计算一样表达流式计算.Spark SQL引擎将负责逐步和连续地运行它,并在流数据继续到达时更新最终 ...
- Spark Structured Streaming概述
Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...
- Structured Streaming编程 Programming Guide
Structured Streaming编程 Programming Guide • Overview • Quick Example • Programming Model o Basic Conc ...
- 2021年大数据Spark(五十三):Structured Streaming Deduplication
目录 Streaming Deduplication 介绍 需求 代码演示 Streaming Deduplication 介绍 在实时流式应用中,最典型的应用场景:网站UV统计. 1: ...
- 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
目录 事件时间窗口分析 时间概念 event-time 延迟数据处理 延迟数据 Watermarking 水位 官方案例演示 事件 ...
- 2021年大数据Spark(五十一):Structured Streaming 物联网设备数据分析
目录 物联网设备数据分析 设备监控数据准备 创建Topic 模拟数据 SQL风格 DSL风格 物联网设备数据分析 在 ...
- 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构
目录 案例一 实时数据ETL架构 准备主题 模拟基站日志数据 实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...
- 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
目录 整合 Kafka 说明 Kafka特定配置 KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 Kafka ...
- 2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
目录 输出终端/位置 文件接收器 Memory Sink Foreach和ForeachBatch Sink Foreach ForeachBatch 代码演 ...
- 2021年大数据Spark(四十五):Structured Streaming Sources 输入源
目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 文件数据源-了解 需求 代码实现 Rate source-了解 So ...
最新文章
- ​Openresty最佳案例 | 第8篇:RBAC介绍、sql和redis模块工具类
- Ubuntu操作系统安装之开发人员必备
- leetcode算法题--Remove K Digits
- 【AI-1000问】为什么CNN中的卷积核半径都是奇数?
- python c 混合编程 用c循环_混合编程:用 C 语言来扩展 Python 大法吧!
- iview table后端分页 多选 翻页选中回显
- U盘拒绝访问怎么办 快速方法解决U盘问题
- JDK 11,Tomcat卡在Deploying web application directory
- java微信刷卡支付demo_微信刷卡支付例子
- kali Linux的安装
- 完全掌握1级日本与能力考试语法问题对策
- Substrings (C++ find函数应用)
- 手机系统S40 S60 是什么意思?
- [渝粤教育] 中山大学 健康评估 参考 资料
- fbx模型压缩成gltf格式
- 关于98% after emitting CopyPlugin个人探索
- [讲座论坛] 应对气候变化的中国视角
- 计算机系统配置低会带来,电脑配置差装什么系统合适
- JavaScript压缩工具JSA使用介绍
- Cookie 跨域解决方案(IFrame跨域)