直接上代码,例子来源于官网的wordcount例子

package Sparkstreamingimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}object SQLtest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SQLtest").setMaster("local[2]")val ssc = new StreamingContext(conf, Seconds(5))val lines = ssc.socketTextStream("192.168.116.10", 9999)val words = lines.flatMap(_.split(" "))words.foreachRDD { (rdd: RDD[String], time: Time) =>// Get the singleton instance of SparkSessionval spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)import spark.implicits._// Convert RDD[String] to RDD[case class] to DataFrameval wordsDataFrame = rdd.map(w => Record(w)).toDF()// Creates a temporary view using the DataFramewordsDataFrame.createOrReplaceTempView("words")// Do word count on table using SQL and print itval wordCountsDataFrame =spark.sql("select word, count(*) as total from words group by word")println(s"========= $time =========")wordCountsDataFrame.show()}ssc.start()ssc.awaitTermination()}case class Record(word: String)/** Lazily instantiated singleton instance of SparkSession */object SparkSessionSingleton {@transient private var instance: SparkSession = _def getInstance(sparkConf: SparkConf): SparkSession = {if (instance == null) {instance = SparkSession.builder.config(sparkConf).getOrCreate()}instance}}}

测试

在Linux新建一个窗口,输入

可以发现IDEA控制台已经输出结果了

SparkStreaming使用SQL相关推荐

  1. java 必备面试必备

    1.JDK 和 JRE 有什么区别? JDK(Java Development Kit),Java开发工具包 JRE(Java Runtime Environment),Java运行环境 JDK中包含 ...

  2. Spark SQL 在SparkStreaming中的运用

    文章内容参考地址: http://spark.apache.org/docs/2.3.0/streaming-programming-guide.html#dataframe-and-sql-oper ...

  3. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

  4. 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD

    目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...

  5. sql 忽略大小写_Flink使用Calcite解析Sql做维表关联(一)

    点击箭头处"蓝色字",关注我们哦!! 维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐.规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算 ...

  6. sql怎么实现取当前数据以及累计7天数据_年薪60万+大佬吐血整理字节跳动大数据面试真题...

    字节的面试难度一直很高,所以一直是我想攻克的目标,定下这个目标后就去准备收集了几十个字节实际面经,还包括面试前都需要做哪些准备.只能说是工欲善其事必先利其器,经过这些周全的准备,最终的面试还是很顺利的 ...

  7. spark sql 本地调试_干货 | 如何成为大数据Spark高手

    Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手. ...

  8. sql 截取_如何用 SQL 找一个女朋友?

    背景 作为一个sql boy,笔者认为写sql真的是很枯燥而且很简单的事情,但没想到身边的朋友竟然会写不出sql来,因此笔者突发灵感编写此文来梳理一下如何写sql,以及怎么样写好sql. 那么开头就以 ...

  9. 通过案例对SparkStreaming透彻理解-3

    2019独角兽企业重金招聘Python工程师标准>>> 本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming 容错架构和运行机制 ...

最新文章

  1. 26.2. Web UI
  2. 浅谈:Wi-Fi 6的优势及应用前景
  3. [原创]IrrLicht的GUI使用
  4. 使用opencv和python进行智能图像处理_使用OpenCV在Python中进行图像处理
  5. FreeRTOS源码分析与应用开发10:内存管理
  6. 图像处理基本概念、术语
  7. php中==和===的含义及区别
  8. 《高等代数学》(姚慕生),习题1.1:二阶行列式
  9. Iframe用法总结
  10. GEE与Landsat
  11. php魔方阵,利用C语言玩转魔方阵实例教程
  12. 应用商店-华为应用市场
  13. 语言 {软件开发概述}
  14. python爬取电影票房前50_Python3爬取起猫眼电影实时票房信息,解决文字反爬~~~附源代码...
  15. StarUML如何将背景变成空白
  16. 基于ssm的校园二手电子交易平台
  17. RED5的API介紹-4
  18. 白嫖 IObit 系列软件例如 IObit Uninstaller
  19. Nelder_Mead算法的简介和用作求解二维函数最小值的Python实现
  20. 手机恢复出厂设置命令_华为手机双清和恢复出厂设置区别 recovery恢复出厂设置步骤...

热门文章

  1. Openfire安装
  2. 开课吧:C++学习的方向是什么?
  3. Mac使用技巧:系统升级后指纹无法使用!
  4. 2021牛客暑期多校训练营9C-Cells【LGV引理,范德蒙德行列式】
  5. java 最大递减数_华为机试题:最大递减数
  6. 比尔盖茨怼加密货币「能直接致死」引发众怒
  7. LaTeX 文档排版教程
  8. opencl non_uniform_workgroup
  9. mysql序列号生成_值得一看!数据库及Mysql入门,附详细安装教程
  10. 面试的时候如何介绍自己!