SparkStreaming使用SQL
直接上代码,例子来源于官网的wordcount例子
package Sparkstreamingimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}object SQLtest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SQLtest").setMaster("local[2]")val ssc = new StreamingContext(conf, Seconds(5))val lines = ssc.socketTextStream("192.168.116.10", 9999)val words = lines.flatMap(_.split(" "))words.foreachRDD { (rdd: RDD[String], time: Time) =>// Get the singleton instance of SparkSessionval spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)import spark.implicits._// Convert RDD[String] to RDD[case class] to DataFrameval wordsDataFrame = rdd.map(w => Record(w)).toDF()// Creates a temporary view using the DataFramewordsDataFrame.createOrReplaceTempView("words")// Do word count on table using SQL and print itval wordCountsDataFrame =spark.sql("select word, count(*) as total from words group by word")println(s"========= $time =========")wordCountsDataFrame.show()}ssc.start()ssc.awaitTermination()}case class Record(word: String)/** Lazily instantiated singleton instance of SparkSession */object SparkSessionSingleton {@transient private var instance: SparkSession = _def getInstance(sparkConf: SparkConf): SparkSession = {if (instance == null) {instance = SparkSession.builder.config(sparkConf).getOrCreate()}instance}}}
测试
在Linux新建一个窗口,输入
可以发现IDEA控制台已经输出结果了
SparkStreaming使用SQL相关推荐
- java 必备面试必备
1.JDK 和 JRE 有什么区别? JDK(Java Development Kit),Java开发工具包 JRE(Java Runtime Environment),Java运行环境 JDK中包含 ...
- Spark SQL 在SparkStreaming中的运用
文章内容参考地址: http://spark.apache.org/docs/2.3.0/streaming-programming-guide.html#dataframe-and-sql-oper ...
- 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 API 注意 代码实现-自动提交偏移量到默认主题 代码实现- ...
- 2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
目录 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 注意: 代码实现 SparkStreaming实战案例六 自定义输出-foreachRDD 需求 对上述案例的结果 ...
- sql 忽略大小写_Flink使用Calcite解析Sql做维表关联(一)
点击箭头处"蓝色字",关注我们哦!! 维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐.规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算 ...
- sql怎么实现取当前数据以及累计7天数据_年薪60万+大佬吐血整理字节跳动大数据面试真题...
字节的面试难度一直很高,所以一直是我想攻克的目标,定下这个目标后就去准备收集了几十个字节实际面经,还包括面试前都需要做哪些准备.只能说是工欲善其事必先利其器,经过这些周全的准备,最终的面试还是很顺利的 ...
- spark sql 本地调试_干货 | 如何成为大数据Spark高手
Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手. ...
- sql 截取_如何用 SQL 找一个女朋友?
背景 作为一个sql boy,笔者认为写sql真的是很枯燥而且很简单的事情,但没想到身边的朋友竟然会写不出sql来,因此笔者突发灵感编写此文来梳理一下如何写sql,以及怎么样写好sql. 那么开头就以 ...
- 通过案例对SparkStreaming透彻理解-3
2019独角兽企业重金招聘Python工程师标准>>> 本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming 容错架构和运行机制 ...
最新文章
- 26.2. Web UI
- 浅谈:Wi-Fi 6的优势及应用前景
- [原创]IrrLicht的GUI使用
- 使用opencv和python进行智能图像处理_使用OpenCV在Python中进行图像处理
- FreeRTOS源码分析与应用开发10:内存管理
- 图像处理基本概念、术语
- php中==和===的含义及区别
- 《高等代数学》(姚慕生),习题1.1:二阶行列式
- Iframe用法总结
- GEE与Landsat
- php魔方阵,利用C语言玩转魔方阵实例教程
- 应用商店-华为应用市场
- 语言 {软件开发概述}
- python爬取电影票房前50_Python3爬取起猫眼电影实时票房信息,解决文字反爬~~~附源代码...
- StarUML如何将背景变成空白
- 基于ssm的校园二手电子交易平台
- RED5的API介紹-4
- 白嫖 IObit 系列软件例如 IObit Uninstaller
- Nelder_Mead算法的简介和用作求解二维函数最小值的Python实现
- 手机恢复出厂设置命令_华为手机双清和恢复出厂设置区别 recovery恢复出厂设置步骤...