1.3 入门案例:WordCount

入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。
文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example

功能演示
运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:

演示运行案例步骤:
第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999
第二步、打开另一个终端Terminal,执行如下命令

# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example \
--master local[2] \
--conf spark.sql.shuffle.partitions=2 \
org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
node1.itcast.cn 9999
# 测试数据
spark hadoop spark hadoop spark hive
spark spark spark
spark hadoop hive

发送数据以后,最终统计输出结果如下:

Socket 数据源
从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数:

参数一:host,主机名称,必须指定参数
参数二:port,端口号,必须指定参数

范例如下所示:

Console 接收器
将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:

参数一:numRows,打印多少条数据,默认为20条;
参数二:truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;

范例如下所示:

编程实现
可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:

  • Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;
  • Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,指定读取Stream数据和保存Streamn数据,具体语法格式:
    • 加载数据Load:读取静态数据【spark.read】、读取流式数据【spark.readStream】
    • 保存数据Save:保存静态数据【ds/df.write】、保存流式数据【ds/df.writeStrem】

词频统计案例:从TCP Socket实消费流式数据,进行词频统计,将结果打印在控制台Console。
第一点、程序入口SparkSession,加载流式数据:spark.readStream
第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
第三点、启动流式应用,设置Output结果相关信息、start方法启动应用

完整案例代码如下:

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
*/
object StructuredWordCount {def main(args: Array[String]): Unit = {// TODO: 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[2]")
.config("spark.sql.shuffle.partitions", "2") // 设置Shuffle分区数目
.getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 1. 从TCP Socket 读取数据
val inputStreamDF: DataFrame = spark.readStream
.format("socket")
.option("host", "node1.itcast.cn").option("port", 9999)
.load()
/*
root
|-- value: string (nullable = true)
*/
//inputStreamDF.printSchema()
// TODO: 2. 业务分析:词频统计WordCount
val resultStreamDF: DataFrame = inputStreamDF
.as[String] // 将DataFrame转换为Dataset进行操作
// 过滤数据
.filter(line => null != line && line.trim.length > 0)
// 分割单词
.flatMap(line => line.trim.split("\\s+"))
.groupBy($"value").count() // 按照单词分组,聚合
/*
root
|-- value: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()
// TODO: 3. 设置Streaming应用输出及启动
val query: StreamingQuery = resultStreamDF.writeStream
// TODO: 设置输出模式:Complete表示将ResultTable中所有结果数据输出
// .outputMode(OutputMode.Complete())
// TODO: 设置输出模式:Update表示将ResultTable中有更新结果数据输出
.outputMode(OutputMode.Update())
.format("console")
.option("numRows", "10").option("truncate", "false")
// 流式应用,需要启动start
.start()
// 流式查询等待流式应用终止
query.awaitTermination()
// 等待所有任务运行完成才停止运行
query.stop()
}
}

其中可以设置不同输出模式(OutputMode),当设置为Complete时,结果表ResultTable中所有数据都输出;当设置为Update时,仅仅输出结果表ResultTable中更新的数据。

【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount相关推荐

  1. 【Spark分布式内存计算框架——Spark Core】6. RDD 持久化

    3.6 RDD 持久化 在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高 ...

  2. 【Spark分布式内存计算框架——Structured Streaming】2. Structured Streaming 核心设计与编程模型

    核心设计 2016年,Spark在2.0版本中推出了结构化流处理的模块Structured Streaming,核心设计如下: 第一点:Input and Output(输入和输出) Structur ...

  3. 【Spark分布式内存计算框架——Spark Streaming】11. 应用案例:百度搜索风云榜(下)实时窗口统计

    5.5 实时窗口统计 SparkStreaming中提供一些列窗口函数,方便对窗口数据进行分析,文档: http://spark.apache.org/docs/2.4.5/streaming-pro ...

  4. 【Spark分布式内存计算框架——Spark Streaming】10. 应用案例:百度搜索风云榜(中)实时数据ETL存储

    5.3 实时数据ETL存储 实时从Kafka Topic消费数据,提取ip地址字段,调用[ip2Region]库解析为省份和城市,存储到HDFS文件中,设置批处理时间间隔BatchInterval为1 ...

  5. 【Spark分布式内存计算框架——Spark Streaming】9. 获取偏移量 应用案例:百度搜索风云榜(上)

    4.4 获取偏移量 当SparkStreaming集成Kafka时,无论是Old Consumer API中Direct方式还是New Consumer API方式获取的数据,每批次的数据封装在Kaf ...

  6. 云计算实验2 Spark分布式内存计算框架配置及编程案例

    一. 实验目的 掌握分布式多节点计算平台Spark配置,Spark编程环境IDEA配置,示例程序启动与运行 二. 实验环境 Linux的虚拟机环境.线上操作视频和实验指导手册 三. 实验任务 完成Sp ...

  7. 云计算与大数据第16章 分布式内存计算平台Spark习题

    第16章 分布式内存计算平台Spark习题 16.1 选择题 1.Spark是Hadoop生态(  B  )组件的替代方案. A. Hadoop     B. MapReduce        C. ...

  8. java中如何合并两个网格,Hazelcast: Java分布式内存网格框架(平台)

    转自:http://blog.csdn.net/iihero/article/details/7385641 下边是它的宣传内容: hazelcast是一个开放源码集群和高度可扩展的数据分发平台,这是 ...

  9. hazelcast java_Hazelcast: Java分布式内存网格框架(平台)

    下边是它的宣传内容: hazelcast是一个开放源码集群和高度可扩展的数据分发平台,这是为Java: 1. 快如闪电;数以千计的运算/秒. 2. 故障安全;崩溃后没有丢失数据. 3. 作为新服务器的 ...

最新文章

  1. 【Vue版】实现拖拽、排序效果(注意,这个方法在chrome谷歌浏览器上面不适用,dragend会情不自禁触发drag事件先执行,有点像浏览器的一个bug)
  2. 在windows程序中嵌入Lua脚本引擎--使用VS IDE编译Luajit脚本引擎
  3. R语言数据热力图绘制实战(基于原生R函数、ggplot2包、plotly包)
  4. %w(数组)是什么意思?
  5. Linux Kernel TCP/IP Stack — L1 Layer — NIC Controller — NAPI
  6. 全球新能源汽车行业前景规模及发展趋势预测报告2022-2028年版
  7. html输入框只能输入几个,input 两个input框只能允许同时输入一个
  8. 多版本opencv 兼容
  9. angular2安装笔记
  10. Android 消息处理源代码分析(1)
  11. Linux 的字符串截取很有用。有八种方法。
  12. STL学习笔记 ---- 由set的声明所引发的自定义比较的实现方式 作者:winterTTr(转载请注明)...
  13. 鲜活的数据 : 数据可视化指南
  14. c语言 怎么访问64位地址_大神用10000字总结了嵌入式C语言必学知识点……
  15. 二十六:Struts2 和 spring整合
  16. 3559A原生CAN总线调试
  17. 甲骨文裁员,这是一个危险信号
  18. 安卓4.0后新控件TextureView解决SurfaceView在修改默认屏幕方向后(硬件导致)视频方向无法翻转的问题
  19. 本机号码校验不只是免输密码、免输短信验证码
  20. staf linux运行模式,IBM 自动化测试框架STAF介绍

热门文章

  1. 线程钩子HookC#实例
  2. VS Code 使用火狐 FireFox 调试网页
  3. 神舟电脑装linux双系统,神舟战神肿么装双系统
  4. 求解点关于直线的距离、垂足、对称点公式
  5. CSU1020-真三国无双-模拟
  6. 物联网家电第一股,想离开小米的云米现在有多少实力?
  7. 互联网金融大事件,从泛亚到e租宝敲响的警示钟?
  8. 无人驾驶 | 自动驾驶技术和机器人技术的对比
  9. S-属性定义与L-属性定义
  10. SQL查询语句分步详解——SELECT...FROM...WHERE...GROUP BY...