文章内容参考地址:

http://spark.apache.org/docs/2.3.0/streaming-programming-guide.html#dataframe-and-sql-operations

你可以使用SparkStreaming 中使用的SparkContext 来创建一个SparkSession   ,每个RDD 被转换成一个DataFrame,注册为临时表,然后使用SQL查询。

创建三个java类

package com.study.sqlandstreaming;public class JavaRow implements java.io.Serializable {private String word;public String getWord() {return word;}public void setWord(String word) {this.word = word;}}
package com.study.sqlandstreaming;import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;public class JavaSparkSessionSingleton{private static  transient SparkSession instance =null;public static SparkSession getInstance(SparkConf sparkConf){if(instance ==null){instance=SparkSession.builder().config(sparkConf).getOrCreate();}return  instance;}}
package com.study.sqlandstreaming;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;/*** @author :  yulei* @data :  2018/9/14 10:32* @Version :  1.0**/public class TestSparkSQLAndStreaming {public static void main(String[] args) throws Exception {SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("JavaSqlNetworkWordCount");JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));JavaReceiverInputDStream<String> lines = ssc.socketTextStream("192.168.44.128",9999);JavaDStream<String> words  = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> list = Arrays.asList( s.split(" "));return list.iterator();}});// Convert RDDs of the words DStream to DataFrame and run SQL query/*************START****************/words.foreachRDD((rdd,time)->{// Get the singleton instance of SparkSessionSparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);// Convert RDD[String] to RDD[case class] to DataFrameJavaRDD<JavaRow> rowRDD = rdd.map( word->{JavaRow record = new JavaRow();record.setWord(word);return record;});Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);wordsDataFrame.createOrReplaceTempView("words");// Do word count on table using SQL and print itDataset<Row>     wordCountsDataFrame =  spark.sql("select word, count(*) as total from words group by word ");System.out.println("========= " + time + "=========");wordCountsDataFrame.show();});/*************END****************/ssc.start();ssc.awaitTermination();}}

输入:

[hadoop@s201 ~]$nc -lk 9999
he he sdf wd wd ss ss ss 

输出结果大致如下:

========= 1536898010000 ms=========
+----+-----+
|word|total|
+----+-----+
+----+-----+18/09/14 12:06:54 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/09/14 12:06:54 WARN BlockManager: Block input-0-1536898014600 replicated to only 0 peer(s) instead of 1 peers
========= 1536898015000 ms=========
+----+-----+
|word|total|
+----+-----+
|  ss|    3|
| sdf|    1|
|  wd|    2|
|  he|    2|
+----+-----+========= 1536898020000 ms=========
+----+-----+
|word|total|
+----+-----+
+----+-----+

Spark SQL 在SparkStreaming中的运用相关推荐

  1. spark官方文档_这些未在 Spark SQL 文档中说明的优化措施,你知道吗?

    本文来自上周(2020-11-17至2020-11-19)举办的 Data + AI Summit 2020 (原 Spark+AI Summit),主题为<Spark SQL Beyond O ...

  2. Spark SQL来读取现有Hive中的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等). Spark SQL的其中一个分支就是Spa ...

  3. 【Spark】扩展Spark Catalyst,打造自定义的Spark SQL引擎

    1.概述 转载自:扩展Spark Catalyst,打造自定义的Spark SQL引擎 Apache Spark是大数据处理领域最常用的计算引擎之一,被应用在各种各样的场景中,除了易用的API,稳定高 ...

  4. 使用Spark SQL读取Hive上的数据

    Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等).Spark SQL的其中一个分支就是Spar ...

  5. 大数据进阶之路——Spark SQL 之 DataFrameDataset

    文章目录 dataframe 和 rdd API常用操作 DataFrame和RDD 案例 DataSet DataFrame它不是Spark SQL提出的,而是早起在R.Pandas语言就已经有了的 ...

  6. Spark SQL 工作流程源码解析(四)optimization 阶段(基于 Spark 3.3.0)

    前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark SQL 工 ...

  7. 极光笔记丨Spark SQL 在极光的建设实践

    作者:极光高级工程师-蔡祖光 前言 Spark在2018开始在极光大数据平台部署使用,历经多个版本的迭代,逐步成为离线计算的核心引擎.当前在极光大数据平台每天运行的Spark任务有20000+,执行的 ...

  8. 深入理解Spark SQL原理

    1.前言   本文是对自己阅读Spark SQL源码过程的一个记录,主线是对尚硅谷Spark SQL最后练习中建立的表的一个简单SQL编写的源码实现流程的跟读.通过自问自答的方式,学习完了整个Spar ...

  9. 【未完成】[Spark SQL_2] 在 IDEA 中编写 Spark SQL 程序

    0. 说明 在 IDEA 中编写 Spark SQL 程序,分别编写 Java 程序 & Scala 程序 1. 编写 Java 程序 待补充 2. 编写 Scala 程序 待补充 转载于:h ...

最新文章

  1. Android之安装常见的一些解决方法
  2. 关于虚拟内存,你需要了解的一些概念
  3. Latex常用数学符号
  4. 总体参数的估计(概念)
  5. 基于Coravel定时任务之计算总页数
  6. R语言实战案例-蒙特卡罗方法(附实现代码)
  7. Ansible Inventory指北进阶
  8. SAP UI5 walkthrough第一第二部分解析:data-sap-ui-libs=“sap.ui.commons,sap.ui.table“
  9. TFS创建登录用户并连接TFS
  10. ZooKeeper程序员指南--使用ZooKeeper开发分布式应用程序
  11. jQuery常用方法
  12. python相对路径import 方法_Python 从相对路径下import的方法
  13. cnn输入层_一文掌握CNN卷积神经网络
  14. python 正则表达式 分组_正则表达式之分组的用法
  15. 魅族m8开发 step by step(1)(让程序跑起来)
  16. cvSetMouseCallback()鼠标坐标、事件返回
  17. 微信公众号h5开发流程
  18. 瞄准千亿工业物联网市场,有人物联网为2万企业级用户提供完整可靠方案
  19. slidebox使用教程 设定焦点数量
  20. 深度系统linux deepin如何按装,安装深度Deepin 15.11操作系统的方法

热门文章

  1. oracle常用分析函数与聚合函数的用法
  2. 湖大计算机人工智能专业导师,湖南大学考研研究生导师简介-贺旭
  3. 阿里实习生java面试
  4. 网线使用指南(如何买网线?怎么接网线?怎么修网线?)
  5. ArcGIS 图像合并至新栅格图层(Mosaic To New Raster和Mosaic)
  6. 梦幻西游手游300级装备、30级宝石、抽奖后台、坐骑版本搭建技术讲解
  7. Oracle数据库系统结构一(存储结构)
  8. i7 10700和10700f 10700k这三个CPU有什么区别
  9. 网络工程师常用的命令整理-windows版,还不快收藏起来
  10. unity urp内置lit材质源码解析(下)