【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount
1.3 入门案例:WordCount
入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。
文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example
功能演示
运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:
演示运行案例步骤:
第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999
第二步、打开另一个终端Terminal,执行如下命令
# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example \
--master local[2] \
--conf spark.sql.shuffle.partitions=2 \
org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
node1.itcast.cn 9999
# 测试数据
spark hadoop spark hadoop spark hive
spark spark spark
spark hadoop hive
发送数据以后,最终统计输出结果如下:
Socket 数据源
从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数:
参数一:host,主机名称,必须指定参数
参数二:port,端口号,必须指定参数
范例如下所示:
Console 接收器
将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:
参数一:numRows,打印多少条数据,默认为20条;
参数二:truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;
范例如下所示:
编程实现
可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:
- Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;
- Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,指定读取Stream数据和保存Streamn数据,具体语法格式:
- 加载数据Load:读取静态数据【spark.read】、读取流式数据【spark.readStream】
- 保存数据Save:保存静态数据【ds/df.write】、保存流式数据【ds/df.writeStrem】
词频统计案例:从TCP Socket实消费流式数据,进行词频统计,将结果打印在控制台Console。
第一点、程序入口SparkSession,加载流式数据:spark.readStream
第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
第三点、启动流式应用,设置Output结果相关信息、start方法启动应用
完整案例代码如下:
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
*/
object StructuredWordCount {def main(args: Array[String]): Unit = {// TODO: 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2") // 设置Shuffle分区数目
.getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 1. 从TCP Socket 读取数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn").option("port", 9999)
.load()
/*
root
|-- value: string (nullable = true)
*/
//inputStreamDF.printSchema()
// TODO: 2. 业务分析:词频统计WordCount
val resultStreamDF: DataFrame = inputStreamDF
.as[String] // 将DataFrame转换为Dataset进行操作
// 过滤数据
.filter(line => null != line && line.trim.length > 0)
// 分割单词
.flatMap(line => line.trim.split("\\s+"))
.groupBy($"value").count() // 按照单词分组,聚合
/*
root
|-- value: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()
// TODO: 3. 设置Streaming应用输出及启动
val query: StreamingQuery = resultStreamDF.writeStream
// TODO: 设置输出模式:Complete表示将ResultTable中所有结果数据输出
// .outputMode(OutputMode.Complete())
// TODO: 设置输出模式:Update表示将ResultTable中有更新结果数据输出
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "10").option("truncate", "false")
// 流式应用,需要启动start
.start()
// 流式查询等待流式应用终止
query.awaitTermination()
// 等待所有任务运行完成才停止运行
query.stop()
}
}
其中可以设置不同输出模式(OutputMode),当设置为Complete时,结果表ResultTable中所有数据都输出;当设置为Update时,仅仅输出结果表ResultTable中更新的数据。
【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount相关推荐
- 【Spark分布式内存计算框架——Spark Core】6. RDD 持久化
3.6 RDD 持久化 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高 ...
- 【Spark分布式内存计算框架——Structured Streaming】2. Structured Streaming 核心设计与编程模型
核心设计 2016年,Spark在2.0版本中推出了结构化流处理的模块Structured Streaming,核心设计如下: 第一点:Input and Output(输入和输出) Structur ...
- 【Spark分布式内存计算框架——Spark Streaming】11. 应用案例:百度搜索风云榜(下)实时窗口统计
5.5 实时窗口统计 SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档: http://spark.apache.org/docs/2.4.5/streaming-pro ...
- 【Spark分布式内存计算框架——Spark Streaming】10. 应用案例:百度搜索风云榜(中)实时数据ETL存储
5.3 实时数据ETL存储 实时从Kafka Topic消费数据,提取ip地址字段,调用[ip2Region]库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为1 ...
- 【Spark分布式内存计算框架——Spark Streaming】9. 获取偏移量 应用案例:百度搜索风云榜(上)
4.4 获取偏移量 当SparkStreaming集成Kafka时,无论是Old Consumer API中Direct方式还是New Consumer API方式获取的数据,每批次的数据封装在Kaf ...
- 云计算实验2 Spark分布式内存计算框架配置及编程案例
一. 实验目的 掌握分布式多节点计算平台Spark配置,Spark编程环境IDEA配置,示例程序启动与运行 二. 实验环境 Linux的虚拟机环境.线上操作视频和实验指导手册 三. 实验任务 完成Sp ...
- 云计算与大数据第16章 分布式内存计算平台Spark习题
第16章 分布式内存计算平台Spark习题 16.1 选择题 1.Spark是Hadoop生态( B )组件的替代方案. A. Hadoop B. MapReduce C. ...
- java中如何合并两个网格,Hazelcast: Java分布式内存网格框架(平台)
转自:http://blog.csdn.net/iihero/article/details/7385641 下边是它的宣传内容: hazelcast是一个开放源码集群和高度可扩展的数据分发平台,这是 ...
- hazelcast java_Hazelcast: Java分布式内存网格框架(平台)
下边是它的宣传内容: hazelcast是一个开放源码集群和高度可扩展的数据分发平台,这是为Java: 1. 快如闪电;数以千计的运算/秒. 2. 故障安全;崩溃后没有丢失数据. 3. 作为新服务器的 ...
最新文章
- 【Vue版】实现拖拽、排序效果(注意,这个方法在chrome谷歌浏览器上面不适用,dragend会情不自禁触发drag事件先执行,有点像浏览器的一个bug)
- 在windows程序中嵌入Lua脚本引擎--使用VS IDE编译Luajit脚本引擎
- R语言数据热力图绘制实战(基于原生R函数、ggplot2包、plotly包)
- %w(数组)是什么意思?
- Linux Kernel TCP/IP Stack — L1 Layer — NIC Controller — NAPI
- 全球新能源汽车行业前景规模及发展趋势预测报告2022-2028年版
- html输入框只能输入几个,input 两个input框只能允许同时输入一个
- 多版本opencv 兼容
- angular2安装笔记
- Android 消息处理源代码分析(1)
- Linux 的字符串截取很有用。有八种方法。
- STL学习笔记 ---- 由set的声明所引发的自定义比较的实现方式 作者:winterTTr(转载请注明)...
- 鲜活的数据 : 数据可视化指南
- c语言 怎么访问64位地址_大神用10000字总结了嵌入式C语言必学知识点……
- 二十六:Struts2 和 spring整合
- 3559A原生CAN总线调试
- 甲骨文裁员,这是一个危险信号
- 安卓4.0后新控件TextureView解决SurfaceView在修改默认屏幕方向后(硬件导致)视频方向无法翻转的问题
- 本机号码校验不只是免输密码、免输短信验证码
- staf linux运行模式,IBM 自动化测试框架STAF介绍
热门文章
- 线程钩子HookC#实例
- VS Code 使用火狐 FireFox 调试网页
- 神舟电脑装linux双系统,神舟战神肿么装双系统
- 求解点关于直线的距离、垂足、对称点公式
- CSU1020-真三国无双-模拟
- 物联网家电第一股,想离开小米的云米现在有多少实力?
- 互联网金融大事件,从泛亚到e租宝敲响的警示钟?
- 无人驾驶 | 自动驾驶技术和机器人技术的对比
- S-属性定义与L-属性定义
- SQL查询语句分步详解——SELECT...FROM...WHERE...GROUP BY...