Spark Structured Streaming

    1. main difference between streaming and batch data
    1. details on the Structured Streaming API
    1. use for Structured Streaming on imcoming data and save output result in memory

Batch vs. Stream

  • batch data : 一段时间内收集的数据
  • stream processing : 实时或接近实时的数据处理,更高效,(fraud detection)

Streaming framework

  • Data input
  • Data processing(real time or near real time)
  • Final output

Data Input

  • ingest and process data continuously

    • Messaging systems : Apache Kafka, Flume, etc
    • File folders/directory : 连续的读目录中的文件作为流数据

Data Processing

  • aggregations
  • filtering
  • joins
  • sorting
  • etc

Final Output

  • Append or Complete

    • Append mode : adding only new results to the final output
    • Complete mode : updates the entire results table at the final output
    • File directory sink
    • Console
    • Memory sink

Building a Structured App

  • File folders/directory (基于文件目录的流数据处理应用)
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName('structured_streaming').getOrCreate()import pyspark.sql.functions as F
from pyspark.sql.types import *

step 1. Create dataset

(contains four columns, csv format or parquet format)

    1. User ID
    1. APP
    1. Time spend(secs)
    1. Age
# create a data file , in the local folder (csv_folder) # hdfs: dfs -mkdir /csv_folder
df_1=spark.createDataFrame([("XN203",'FB',300,30),("XN201",'Twitter',10,19),("XN202",'Insta',500,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')# define schema
schema=StructType().add("user_id","string").add("app","string").add("time_in_secs", "integer").add("age","integer")# readStream
data=spark.readStream.option("sep", ",").schema(schema).csv("csv_folder")

step 2. Operations

# count app
app_count=data.groupBy('app').count()# write result in memory
query=(app_count.writeStream.queryName('count_query').outputMode('complete').format('memory').start())# usd SQL command to view the output
# toPandas() , require install pandas
# spark.sql("select * from count_query ").toPandas().head(5)
spark.sql('select * from count_query').show(5)

####### filter FB app and calcualte avgrage time for each user
fb_data=data.filter(data['app']=='FB')
# 统计每个user,在FB上花费的平均时间
fb_avg_time=fb_data.groupBy('user_id').agg(F.avg("time_in_secs"))
fb_query=(fb_avg_time.writeStream.queryName('fb_query').outputMode('complete').format('memory').start())
spark.sql("select * from fb_query ").show()

# add new data
df_2=spark.createDataFrame([("XN203",'FB',100,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')
# old qurey operation
spark.sql('select * from fb_query').show()

# add new data
df_2=spark.createDataFrame([("XN203",'FB',1000,30),("XN201",'FB',10,19),("XN202",'FB',2000,45)],["user_id","app","time_in_secs","age"]).write.csv("csv_folder",mode='append')
# old qurey operation
spark.sql('select * from fb_query').show()


统计每个app上user花费的时间总和并排序

app_df=data.groupBy('app').agg(F.sum('time_in_secs').alias('total_time')).orderBy('total_time',ascending=False)
app_query=(app_df.writeStream.queryName('app_wise_query').outputMode('complete').format('memory').start())
spark.sql("select * from app_wise_query ").show()

Structured Streaming Alternatives

    1. Flink
    1. Google’s Beam
    1. Spark SQL API

PySpark : Structured Streaming相关推荐

  1. Structured Streaming使用staticDf和StreamingDf进行join

    概观 结构化流是一种基于Spark SQL引擎的可扩展且容错的流处理引擎.您可以像表达静态数据的批处理计算一样表达流式计算.Spark SQL引擎将负责逐步和连续地运行它,并在流数据继续到达时更新最终 ...

  2. Spark Structured Streaming概述

    Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...

  3. Structured Streaming编程 Programming Guide

    Structured Streaming编程 Programming Guide • Overview • Quick Example • Programming Model o Basic Conc ...

  4. 2021年大数据Spark(五十三):Structured Streaming Deduplication

    目录 Streaming Deduplication 介绍 需求 ​​​​​​​代码演示 Streaming Deduplication 介绍 在实时流式应用中,最典型的应用场景:网站UV统计. 1: ...

  5. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

  6. 2021年大数据Spark(五十一):Structured Streaming 物联网设备数据分析

    目录 ​​​​​​​物联网设备数据分析 ​​​​​​​设备监控数据准备 ​​​​​​​创建Topic ​​​​​​​模拟数据 ​​​​​​​SQL风格 ​​​​​​​DSL风格 物联网设备数据分析 在 ...

  7. 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

    目录 案例一 实时数据ETL架构 准备主题 ​​​​​​​模拟基站日志数据 ​​​​​​​实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...

  8. 2021年大数据Spark(四十九):Structured Streaming 整合 Kafka

    目录 整合 Kafka 说明 Kafka特定配置 ​​​​​​​KafkaSoure 1.消费一个Topic数据 2.消费多个Topic数据 3.消费通配符匹配Topic数据 ​​​​​​​Kafka ...

  9. 2021年大数据Spark(四十八):Structured Streaming 输出终端/位置

    目录 输出终端/位置 文件接收器 ​​​​​​​Memory Sink Foreach和ForeachBatch Sink Foreach ​​​​​​​ForeachBatch ​​​​​​​代码演 ...

  10. 2021年大数据Spark(四十五):Structured Streaming Sources 输入源

    目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 ​​​​​​​文件数据源-了解 ​​​​​​​需求 ​​​​​​​代码实现 ​​​​​​​Rate source-了解 So ...

最新文章

  1. ​Openresty最佳案例 | 第8篇:RBAC介绍、sql和redis模块工具类
  2. Ubuntu操作系统安装之开发人员必备
  3. leetcode算法题--Remove K Digits
  4. 【AI-1000问】为什么CNN中的卷积核半径都是奇数?
  5. python c 混合编程 用c循环_混合编程:用 C 语言来扩展 Python 大法吧!
  6. iview table后端分页 多选 翻页选中回显
  7. U盘拒绝访问怎么办 快速方法解决U盘问题
  8. JDK 11,Tomcat卡在Deploying web application directory
  9. java微信刷卡支付demo_微信刷卡支付例子
  10. kali Linux的安装
  11. 完全掌握1级日本与能力考试语法问题对策
  12. Substrings (C++ find函数应用)
  13. 手机系统S40 S60 是什么意思?
  14. [渝粤教育] 中山大学 健康评估 参考 资料
  15. fbx模型压缩成gltf格式
  16. 关于98% after emitting CopyPlugin个人探索
  17. [讲座论坛] 应对气候变化的中国视角
  18. 计算机系统配置低会带来,电脑配置差装什么系统合适
  19. JavaScript压缩工具JSA使用介绍
  20. Cookie 跨域解决方案(IFrame跨域)

热门文章

  1. 找不到列 dbo 或用户定义的函数或聚合_Power BI 的大数据处理方案:聚合
  2. c++ 多线程 类成员函数_C++11多线程
  3. SQL语句:查询多表更新数据
  4. IE7的CSS兼容性
  5. 编译asp.net 2.0项目到dll文件
  6. Usage of #pragma
  7. 微服务下flask和celery的通信
  8. css实现动态阴影、蚀刻文本、渐变文本
  9. 向云上迁移数据时如何避免停机和中断
  10. 广东电网公司大数据平台初步建成