1、编写一个流式计算的应用, 不断的接收外部系统的消息
2、对消息中的单词进行词频统计
3、统计全局的结果


步骤

  1. Socket Server 等待 Structured Streaming 程序连接
  2. Structured Streaming 程序启动, 连接 Socket Server, 等待 Socket Server 发送数据
  3. Socket Server 发送数据, Structured Streaming 程序接收数据
  4. Structured Streaming 程序接收到数据后处理数据
  5. 数据处理后, 生成对应的结果集, 在控制台打印

代码

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}object StructDemo extends App {private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()import spark.implicits._//receive nc data//Returns a DataStreamReader that can be used to read streaming data in as a DataFrame.private val ds: Dataset[String] = spark.readStream.format("socket").option("host", "mypc01").option("port", 10087).load().as[String]private val value: KeyValueGroupedDataset[String, (String, Int)] = ds.flatMap((_.split(" "))).map((_, 1)).groupByKey(_._1)private val value1: Dataset[(String, Long)] = value.count()value1.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()
}

换种写法 .sql风格

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object StructDemo2 extends App {private val spark: SparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()import spark.implicits._//receive nc data//Returns a DataStreamReader that can be used to read streaming data in as a DataFrame.private val ds: Dataset[String] = spark.readStream.format("socket").option("host", "mypc01").option("port", 10087).load().as[String]ds.flatMap((_.split(" "))).map((_, 1)).toDF("word", "num").createTempView("tmp")val sql="""|select word,count(1)|from tmp|group by word|""".stripMarginprivate val frame: DataFrame = spark.sql(sql)frame.writeStream.outputMode(OutputMode.Complete()).format("console").start().awaitTermination()
}

总结

1、Structured Streaming 中的编程步骤依然是先读, 后处理, 最后落地
2、Structured Streaming 中的编程模型依然是 DataFrame 和 Dataset
3、Structured Streaming 中依然是有外部数据源读写框架的, 叫做 readStream 和 writeStream
4、Structured Streaming 和 SparkSQL 几乎没有区别, 唯一的区别是, readStream 读出来的是流, writeStream 是将流输出, 而 SparkSQL 中的批处理使用 read 和 write

Structured Streaming 入门案例之WordCount相关推荐

  1. 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

    1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...

  2. flink入门案例之WordCount

    flink入门案例之WordCount,以下测试代码都是在本地执行的 添加依赖 添加maven依赖 <dependencies><dependency><groupId& ...

  3. 09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等

    1.9.Flink入门案例-wordCount 1.9.1.开发工具 1.9.2.编写java版本word-count程序 1.9.2.1.添加Flink Maven依赖 1.9.2.2.编写word ...

  4. SparkStreaming 入门案例之wordcount

    案例概述 以nc作为源发送数据 案例演示 创建nc源,用于发送数据. [root@mypc01 ~]# nc -lk mypc01 10086 创建maven工程,导入依赖 <dependenc ...

  5. updateStateByKey算子入门案例之wordCount

    概念 有一个参数,是个函数,该函数有两个参数,第一个是序列类型,第二个是Option类型 def updateStateByKey[S : ClassTag](updateFunc: (Seq[V], ...

  6. Structured Streaming基础入门

    Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...

  7. Structured Streaming 开发入门

    Structured Streaming 作为 Spark 家族的新成员,通过 Spark SQL/DataFrame 来处理 Batch/Streaming 数据,基本的 SparkSQL API ...

  8. Structured Streaming系列-1、Structured Streaming

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 官方网址:http://spark.apache.org/. ht ...

  9. 大数据Spark Structured Streaming

    目录 1 Spark Streaming 不足 2 Structured Streaming 概述 2.1 模块介绍 2.3 编程模型 3 入门案例:WordCount 3.1 功能演示 3.2 So ...

最新文章

  1. java代码二进制转为十六进制_Java 中二进制转换成十六进制的两种实现方法
  2. 字符文本中的字符太多_文本对抗---字符级别的攻击
  3. php万年历月份处理_php实现万年历的完整代码
  4. 【spark】SparkSession的API
  5. 程序员:开汽车,难道我要知道汽车的原理才能把车开好吗?
  6. python报错:xml.parsers.expat.ExpatError: not well-formed (invalid token): line 3, column 1的解决办法
  7. matplotlib安装失败_Python | 安装中遇到“0x80072f7d 未指定的错误”
  8. 说说对javaee中的session的理解
  9. Linux---线程池的实现
  10. 用jk触发器构成二分频电路_模拟电路,电子电路,二极管,放大电路
  11. Python3+Selenium3自动化测试-(四)
  12. 江苏省计算机二级C操作题汇编
  13. Java——异常和断言
  14. 毕业设计 - - -数码交流论坛项目功能分析(暂时
  15. python编程实战(一):用户登录模块,用户注册、登录、信息管理、功能设计与实现!
  16. xlsxwriter
  17. mplay readme
  18. 聊聊Java中的System类
  19. python精灵什么意思_图像和精灵有什么区别?
  20. TensorFlow2.8.0报错TypeError: Descriptors cannot not be created directly.

热门文章

  1. android便签的作用,安卓手机中的便签有什么用?
  2. linux学习笔记:更换国内网易163 yum 源
  3. angelajs中ajax,Fabric.js Triangle angle属性用法及代码示例
  4. ftp搜索文件_CrossFTP for Mac(FTP客户端)
  5. datagridview滚动条自动滚动_一个自适应滚动条的实现
  6. mysql 存储过程游标 循环输出select 查询结果
  7. Python求1~300之间所有的完数
  8. spring学习--引入外部文件,初始化属性
  9. mysql基本sql语句总结(一)
  10. python 类 super_python的类的super()