先写一个python脚本用于在hdfs上生成文件

vim files.py
import osfor index in range(10):content = """{"name":"Leijun"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}"""file_name = "/root/text{0}.json".format(index)with open(file_name, "w") as file:file.write(content)os.system("/usr/local/hadoop/bin/hdfs dfs -mkdir -p /structture")os.system("/usr/local/hadoop/bin/hdfs dfs -put {0} /structture".format(file_name))

Structured Streaming代码如下

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object hdfsSource extends App {//构建spark sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()//创建schema,必须的一步private val schema = new StructType(Array(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))//读取hdfs上的json文件,必须要有schema,不然报错private val df: DataFrame = session.readStream.schema(schema).json("hdfs://mypc01/structture")//输出读取的内容到控制台df.writeStream.outputMode(OutputMode.Update()).format("console").start().awaitTermination()
}

启动spark程序,同时执行python脚本

python files.py

输出结果

Batch: 0
-------------------------------------------+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
|Michael|null|
|   Andy|  30|
| Justin|  19|
|Michael|null|
|   Andy|  30|
| Justin|  19|
...

解析

readStream

def readStream: DataStreamReader

返回一个DataStreamReader

   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")

schema

 def schema(schema: StructType): DataStreamReader

指定输入模式。 某些数据源(例如JSON)可以根据数据自动推断输入架构。 通过在此处指定架构,基础数据源可以跳过架构推断步骤,从而加快数据加载速度。
format

def format(source: String): DataStreamWriter[T]

指定基础输出数据源。

StructType

class StructType(fields: Array[StructField])
extends DataType
with Seq[StructField]

可以通过以下方式构造StructType对象:

 StructType(fields: Seq[StructField])

对于StructType对象,可以按名称提取一个或多个StructField。 如果提取了多个StructField,则将返回一个StructType对象。 如果提供的名称没有匹配的字段,它将被忽略。 对于提取单个StructField的情况,将返回null。

总结

hdfs--Structured Streaming--console案例相关推荐

  1. 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

    1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...

  2. Structured Streaming 入门案例之WordCount

    1.编写一个流式计算的应用, 不断的接收外部系统的消息 2.对消息中的单词进行词频统计 3.统计全局的结果 步骤 Socket Server 等待 Structured Streaming 程序连接 ...

  3. Structured Streaming基础入门

    Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...

  4. Structured Streaming 开发入门

    Structured Streaming 作为 Spark 家族的新成员,通过 Spark SQL/DataFrame 来处理 Batch/Streaming 数据,基本的 SparkSQL API ...

  5. 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

    目录 案例一 实时数据ETL架构 准备主题 ​​​​​​​模拟基站日志数据 ​​​​​​​实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...

  6. Structured Streaming 案例初体验

    Structured Streaming程序基本步骤 编写Structured Streaming程序的基本步骤是: 1.创建SparkSession实例: 2.创建DataFrame表示从数据源输入 ...

  7. Structured Streaming系列-1、Structured Streaming

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 官方网址:http://spark.apache.org/. ht ...

  8. 大数据Spark Structured Streaming

    目录 1 Spark Streaming 不足 2 Structured Streaming 概述 2.1 模块介绍 2.3 编程模型 3 入门案例:WordCount 3.1 功能演示 3.2 So ...

  9. Structured Streaming编程 Programming Guide

    Structured Streaming编程 Programming Guide • Overview • Quick Example • Programming Model o Basic Conc ...

  10. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

最新文章

  1. Windows10下如何安装配置 perl 环境
  2. hdu 4891 模拟
  3. 考研编程练习----递推数列(矩阵相乘法)
  4. 机器学习-数据科学库(第六天)
  5. C语言除法浮点型和整形,浅谈C语言整型与浮点型转换
  6. java date 之后_java中时间类(util Date)的后延与前推处理
  7. 黑色精美大气DJ音乐歌曲网站源码+带WAP手机端
  8. ubuntu下载字体
  9. 数学建模学习(93):方差分析、T检验、卡方分析(检验)
  10. 2015人生感悟哲理
  11. 华为手机备忘录怎样设置每个月12号短信提醒要做的事
  12. Img2Lcd 使用
  13. Python学习(中一)
  14. function Function函数
  15. 转让英孚10个月课程
  16. (收藏)【 数字化客户体验】NPS、CSAT和CES——2020年跟踪的客户满意度指标
  17. 【转】值得推荐的android开发框架简介
  18. UltraEdit的脚本使用
  19. CODESYS读取csv文件的方法(非excel)
  20. 程序员简历这么写,offer收到手软。

热门文章

  1. 树状数组求逆序对_算法系列之-数组中的逆序对
  2. python解释器调用_Python3.x那些事儿:[2]如何调用解释器-百度经验
  3. rosdep init 和rosdep update的解决方法,亲测有效
  4. 你离Python大神就差这课树了!建议收藏|Python技能树测评
  5. java死锁怎么用jvm调试_jvm 内存dump、gc查看、线程死锁,jmap、jstack、jstat
  6. HTML1个像素宽的代码,HTML5 Canvas中绘制一个像素宽的细线实现代码详情
  7. qt 隐藏控制台_带可选GUI的Qt控制台应用程序
  8. IDEA插件推荐:Material Theme UI(把IDEA变得更加美观)
  9. python数值比较器_python笔记16(数据处理笔记1)
  10. 波形发生器设计c语言文件,超低频波形发生器的设计论文(C语言编程) .doc