Spark Streaming与Spark SQL协同工作

Spark Streaming可以和Spark Core,Spark SQL整合在一起使用,这也是它最强大的一个地方。

实例:实时统计搜索次数大于3次的搜索词

package StreamingDemoimport org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Spark Streaming与Spark相结合* 需求:实时统计搜索次数大于3次的搜索词*/
object StreamingAndSQLDemo {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.WARN)System.setProperty("HADOOP_USER_NAME", "Setsuna")val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")val ssc = new StreamingContext(conf, Seconds(2))//开启checkpointssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")//分词,wordcount,记录状态val resultDStream =ssc.socketTextStream("Hadoop01", 6666).flatMap(_.split(" ")).map((_, 1)).updateStateByKey((values: Seq[Int], state: Option[Int]) => {var count = state.getOrElse(0)for (value <- values) {count += value}Option(count)})//将wordcount结果存进表里resultDStream.foreachRDD(rdd=>{//创建SparkSession对象val sparkSession=SparkSession.builder().getOrCreate()//创建Row类型的RDDval rowRDD=rdd.map(x=>Row(x._1,x._2))//创建schemaval schema=StructType(List(StructField("word",StringType,true),StructField("count",IntegerType,true)))//创建DataFrame,并注册临时视图sparkSession.createDataFrame(rowRDD,schema).createOrReplaceTempView("wordcount")//进行查询并在Console里输出sparkSession.sql("select * from wordcount where count>3").show()})ssc.start()ssc.awaitTermination()}
}

测试
在nc里输入数据

Console里的输出

Spark学习笔记:Spark Streaming与Spark SQL协同工作相关推荐

  1. Spark学习笔记1——第一个Spark程序:单词数统计

    Spark学习笔记1--第一个Spark程序:单词数统计 笔记摘抄自 [美] Holden Karau 等著的<Spark快速大数据分析> 添加依赖 通过 Maven 添加 Spark-c ...

  2. 大数据实时计算Spark学习笔记(9)—— Spar SQL(1) 读取 json 文件

    1 Spark SQL 编程方式:(1)SQL;(2) DataFrame API scala> case class Customer(id:Int,name:String,age:Int) ...

  3. spark 学习笔记

    spark 学习笔记 spark介绍 Spark是是一种快速通用的集群计算系统,它的主要特点是能够在内存中进行计算.它包含了 spark 核心组件 spark-core,用于 SQL 和结构化处理数据 ...

  4. Spark学习笔记[1]-scala环境安装与基本语法

    Spark学习笔记[1]-scala环境安装与基本语法   正所谓工欲善其事必先利其器,Spark的开发语言不是java而是scala,虽然都是运行于JVM,但是两门语言的基本特性还是有些不一样,这里 ...

  5. C# 学习笔记(19)操作SQL Server下

    C# 学习笔记(19)操作SQL Server下 ADO.net操作数据库 这应该是比较老的技术了,以后有空的话学学 Linq /// <summary> /// 数据库使用类 /// & ...

  6. C# 学习笔记(18)操作SQL Server 中

    C# 学习笔记(18)操作SQL Server 中 数据库基础操作 SQL语法可以参考 菜鸟教程 或者微软官方的SQL示例 注意SQL不区分大小写 查 1.基础查询 --最基础的查询语句, selec ...

  7. C# 学习笔记(17)操作SQL Server 上

    C# 学习笔记(17)操作SQL Server上 安装SQL Server 微软官网 https://www.microsoft.com/zh-cn/sql-server/sql-server-dow ...

  8. cockroachdb mysql_CockroachDB学习笔记——[译]CockroachDB中的SQL:映射表中数据到键值存储...

    CockroachDB学习笔记--[译]CockroachDB中的SQL:映射表中数据到键值存储 原文标题:SQL in CockroachDB: Mapping Table Data to Key- ...

  9. mysql 临时表 事务_MySQL学习笔记十:游标/动态SQL/临时表/事务

    逆天十三少 发表于:2020-11-12 08:12 阅读: 90次 这篇教程主要讲解了MySQL学习笔记十:游标/动态SQL/临时表/事务,并附有相关的代码样列,我觉得非常有帮助,现在分享出来大家一 ...

  10. 【学习笔记】一些postgreSQL常用sql语句

    [学习笔记]一些postgreSQL常用sql语句 1.序列自增id 2.将从一个表中查出来的数据插入到另一个表中 3.sql更新替换字段中某个字 4.统计一个字段多个值的次数 5.统计某个字段重复项 ...

最新文章

  1. Windows环境下的NodeJS+NPM+Bower安装配置
  2. Android recipe 在代码中写布局
  3. 每天一个linux命令(7):mv命令
  4. 04——确定对象使用前被初始化
  5. 算法--06年华为面试:求两个数组的最小差值(Java实现)
  6. OpenCASCADE绘制测试线束:几何命令之概述
  7. java实用教程——组件及事件处理——DocumentEvent事件
  8. 针对提高48V 配电性能的诸多思考!
  9. se是什么职位_女皇大学PSE&SE 独家解析!
  10. HDU 1034 - Candy Sharing Game
  11. HandlerMethodArgumentResolver 参数解析器
  12. 190407每日一句
  13. 电脑复制,电脑复制粘贴,详细教您电脑不能复制粘贴怎么办
  14. 英文缩写400个速查
  15. pointer-events用法
  16. 带得动ps和python的笔记本_配台电脑,能玩LOL顶配和能够写一些python脚本能用ps不卡,预算6k到8k?...
  17. Rasa课程、Rasa培训、Rasa面试系列之: Rasa NLU意图和实体-分词器
  18. IDEA快捷键转换大小写
  19. 习惯养成微信小程序的设计与实现
  20. 2020年度总结-送你一张腾讯视频VIP月卡

热门文章

  1. 像外行一样思考,像专家一样实践——科研成功之道(修订版)
  2. 2021中国华录杯·算法大赛直通车!
  3. 如何将英文PDF翻译成中文且格式不变?(PDF免费翻译攻略)
  4. web前端基础——Less语法
  5. 谷歌浏览器应用翻译插件,
  6. 解决Tomcat运行内存不足问题
  7. asp.net mvc3 简单的文件上传下载
  8. TINA仿真系列之555定时器
  9. VUE项目开发,使用开发者工具查看源文件
  10. 循环不变式、数学归纳法、归纳推理和演绎推理学习总结