hdfs--Structured Streaming--console案例
先写一个python脚本用于在hdfs上生成文件
vim files.py
import osfor index in range(10):content = """{"name":"Leijun"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}"""file_name = "/root/text{0}.json".format(index)with open(file_name, "w") as file:file.write(content)os.system("/usr/local/hadoop/bin/hdfs dfs -mkdir -p /structture")os.system("/usr/local/hadoop/bin/hdfs dfs -put {0} /structture".format(file_name))
Structured Streaming代码如下
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}object hdfsSource extends App {//构建spark sessionprivate val session: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()//创建schema,必须的一步private val schema = new StructType(Array(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))//读取hdfs上的json文件,必须要有schema,不然报错private val df: DataFrame = session.readStream.schema(schema).json("hdfs://mypc01/structture")//输出读取的内容到控制台df.writeStream.outputMode(OutputMode.Update()).format("console").start().awaitTermination()
}
启动spark程序,同时执行python脚本
python files.py
输出结果
Batch: 0
-------------------------------------------+-------+----+
| name| age|
+-------+----+
|Michael|null|
| Andy| 30|
| Justin| 19|
|Michael|null|
| Andy| 30|
| Justin| 19|
|Michael|null|
| Andy| 30|
| Justin| 19|
...
解析
readStream
def readStream: DataStreamReader
返回一个DataStreamReader
sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
schema
def schema(schema: StructType): DataStreamReader
指定输入模式。 某些数据源(例如JSON)可以根据数据自动推断输入架构。 通过在此处指定架构,基础数据源可以跳过架构推断步骤,从而加快数据加载速度。
format
def format(source: String): DataStreamWriter[T]
指定基础输出数据源。
StructType
class StructType(fields: Array[StructField])
extends DataType
with Seq[StructField]
可以通过以下方式构造StructType对象:
StructType(fields: Seq[StructField])
对于StructType对象,可以按名称提取一个或多个StructField。 如果提取了多个StructField,则将返回一个StructType对象。 如果提供的名称没有匹配的字段,它将被忽略。 对于提取单个StructField的情况,将返回null。
总结
hdfs--Structured Streaming--console案例相关推荐
- 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount
1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...
- Structured Streaming 入门案例之WordCount
1.编写一个流式计算的应用, 不断的接收外部系统的消息 2.对消息中的单词进行词频统计 3.统计全局的结果 步骤 Socket Server 等待 Structured Streaming 程序连接 ...
- Structured Streaming基础入门
Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...
- Structured Streaming 开发入门
Structured Streaming 作为 Spark 家族的新成员,通过 Spark SQL/DataFrame 来处理 Batch/Streaming 数据,基本的 SparkSQL API ...
- 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构
目录 案例一 实时数据ETL架构 准备主题 模拟基站日志数据 实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...
- Structured Streaming 案例初体验
Structured Streaming程序基本步骤 编写Structured Streaming程序的基本步骤是: 1.创建SparkSession实例: 2.创建DataFrame表示从数据源输入 ...
- Structured Streaming系列-1、Structured Streaming
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 官方网址:http://spark.apache.org/. ht ...
- 大数据Spark Structured Streaming
目录 1 Spark Streaming 不足 2 Structured Streaming 概述 2.1 模块介绍 2.3 编程模型 3 入门案例:WordCount 3.1 功能演示 3.2 So ...
- Structured Streaming编程 Programming Guide
Structured Streaming编程 Programming Guide • Overview • Quick Example • Programming Model o Basic Conc ...
- 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
目录 事件时间窗口分析 时间概念 event-time 延迟数据处理 延迟数据 Watermarking 水位 官方案例演示 事件 ...
最新文章
- Windows10下如何安装配置 perl 环境
- hdu 4891 模拟
- 考研编程练习----递推数列(矩阵相乘法)
- 机器学习-数据科学库(第六天)
- C语言除法浮点型和整形,浅谈C语言整型与浮点型转换
- java date 之后_java中时间类(util Date)的后延与前推处理
- 黑色精美大气DJ音乐歌曲网站源码+带WAP手机端
- ubuntu下载字体
- 数学建模学习(93):方差分析、T检验、卡方分析(检验)
- 2015人生感悟哲理
- 华为手机备忘录怎样设置每个月12号短信提醒要做的事
- Img2Lcd 使用
- Python学习(中一)
- function Function函数
- 转让英孚10个月课程
- (收藏)【 数字化客户体验】NPS、CSAT和CES——2020年跟踪的客户满意度指标
- 【转】值得推荐的android开发框架简介
- UltraEdit的脚本使用
- CODESYS读取csv文件的方法(非excel)
- 程序员简历这么写,offer收到手软。
热门文章
- 树状数组求逆序对_算法系列之-数组中的逆序对
- python解释器调用_Python3.x那些事儿:[2]如何调用解释器-百度经验
- rosdep init 和rosdep update的解决方法,亲测有效
- 你离Python大神就差这课树了!建议收藏|Python技能树测评
- java死锁怎么用jvm调试_jvm 内存dump、gc查看、线程死锁,jmap、jstack、jstat
- HTML1个像素宽的代码,HTML5 Canvas中绘制一个像素宽的细线实现代码详情
- qt 隐藏控制台_带可选GUI的Qt控制台应用程序
- IDEA插件推荐:Material Theme UI(把IDEA变得更加美观)
- python数值比较器_python笔记16(数据处理笔记1)
- 波形发生器设计c语言文件,超低频波形发生器的设计论文(C语言编程) .doc