Structured Streaming 入门案例之WordCount
1、编写一个流式计算的应用, 不断的接收外部系统的消息
2、对消息中的单词进行词频统计
3、统计全局的结果
步骤
Socket Server
等待Structured Streaming
程序连接Structured Streaming
程序启动, 连接Socket Server
, 等待Socket Server
发送数据Socket Server
发送数据,Structured Streaming
程序接收数据Structured Streaming
程序接收到数据后处理数据- 数据处理后, 生成对应的结果集, 在控制台打印
代码
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}object StructDemo extends App {private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()import spark.implicits._//receive nc data//Returns a DataStreamReader that can be used to read streaming data in as a DataFrame.private val ds: Dataset[String] = spark.readStream.format("socket").option("host", "mypc01").option("port", 10087).load().as[String]private val value: KeyValueGroupedDataset[String, (String, Int)] = ds.flatMap((_.split(" "))).map((_, 1)).groupByKey(_._1)private val value1: Dataset[(String, Long)] = value.count()value1.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()
}
换种写法 .sql风格
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StructDemo2 extends App {private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()import spark.implicits._//receive nc data//Returns a DataStreamReader that can be used to read streaming data in as a DataFrame.private val ds: Dataset[String] = spark.readStream.format("socket").option("host", "mypc01").option("port", 10087).load().as[String]ds.flatMap((_.split(" "))).map((_, 1)).toDF("word", "num").createTempView("tmp")val sql="""|select word,count(1)|from tmp|group by word|""".stripMarginprivate val frame: DataFrame = spark.sql(sql)frame.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()
}
总结
1、Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地
2、Structured Streaming 中的编程模型依然是 DataFrame 和 Dataset
3、Structured Streaming 中依然是有外部数据源读写框架的, 叫做 readStream 和 writeStream
4、Structured Streaming 和 SparkSQL 几乎没有区别, 唯一的区别是, readStream 读出来的是流, writeStream 是将流输出, 而 SparkSQL 中的批处理使用 read 和 write
Structured Streaming 入门案例之WordCount相关推荐
- 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount
1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...
- flink入门案例之WordCount
flink入门案例之WordCount,以下测试代码都是在本地执行的 添加依赖 添加maven依赖 <dependencies><dependency><groupId& ...
- 09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等
1.9.Flink入门案例-wordCount 1.9.1.开发工具 1.9.2.编写java版本word-count程序 1.9.2.1.添加Flink Maven依赖 1.9.2.2.编写word ...
- SparkStreaming 入门案例之wordcount
案例概述 以nc作为源发送数据 案例演示 创建nc源,用于发送数据. [root@mypc01 ~]# nc -lk mypc01 10086 创建maven工程,导入依赖 <dependenc ...
- updateStateByKey算子入门案例之wordCount
概念 有一个参数,是个函数,该函数有两个参数,第一个是序列类型,第二个是Option类型 def updateStateByKey[S : ClassTag](updateFunc: (Seq[V], ...
- Structured Streaming基础入门
Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...
- Structured Streaming 开发入门
Structured Streaming 作为 Spark 家族的新成员,通过 Spark SQL/DataFrame 来处理 Batch/Streaming 数据,基本的 SparkSQL API ...
- Structured Streaming系列-1、Structured Streaming
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 官方网址:http://spark.apache.org/. ht ...
- 大数据Spark Structured Streaming
目录 1 Spark Streaming 不足 2 Structured Streaming 概述 2.1 模块介绍 2.3 编程模型 3 入门案例:WordCount 3.1 功能演示 3.2 So ...
最新文章
- java代码二进制转为十六进制_Java 中二进制转换成十六进制的两种实现方法
- 字符文本中的字符太多_文本对抗---字符级别的攻击
- php万年历月份处理_php实现万年历的完整代码
- 【spark】SparkSession的API
- 程序员:开汽车,难道我要知道汽车的原理才能把车开好吗?
- python报错:xml.parsers.expat.ExpatError: not well-formed (invalid token): line 3, column 1的解决办法
- matplotlib安装失败_Python | 安装中遇到“0x80072f7d 未指定的错误”
- 说说对javaee中的session的理解
- Linux---线程池的实现
- 用jk触发器构成二分频电路_模拟电路,电子电路,二极管,放大电路
- Python3+Selenium3自动化测试-(四)
- 江苏省计算机二级C操作题汇编
- Java——异常和断言
- 毕业设计 - - -数码交流论坛项目功能分析(暂时
- python编程实战(一):用户登录模块,用户注册、登录、信息管理、功能设计与实现!
- xlsxwriter
- mplay readme
- 聊聊Java中的System类
- python精灵什么意思_图像和精灵有什么区别?
- TensorFlow2.8.0报错TypeError: Descriptors cannot not be created directly.
热门文章
- android便签的作用,安卓手机中的便签有什么用?
- linux学习笔记:更换国内网易163 yum 源
- angelajs中ajax,Fabric.js Triangle angle属性用法及代码示例
- ftp搜索文件_CrossFTP for Mac(FTP客户端)
- datagridview滚动条自动滚动_一个自适应滚动条的实现
- mysql 存储过程游标 循环输出select 查询结果
- Python求1~300之间所有的完数
- spring学习--引入外部文件,初始化属性
- mysql基本sql语句总结(一)
- python 类 super_python的类的super()