Spark SQL 在SparkStreaming中的运用
文章内容参考地址:
http://spark.apache.org/docs/2.3.0/streaming-programming-guide.html#dataframe-and-sql-operations
你可以使用SparkStreaming 中使用的SparkContext 来创建一个SparkSession ,每个RDD 被转换成一个DataFrame,注册为临时表,然后使用SQL查询。
创建三个java类
package com.study.sqlandstreaming;public class JavaRow implements java.io.Serializable {private String word;public String getWord() {return word;}public void setWord(String word) {this.word = word;}}
package com.study.sqlandstreaming;import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;public class JavaSparkSessionSingleton{private static transient SparkSession instance =null;public static SparkSession getInstance(SparkConf sparkConf){if(instance ==null){instance=SparkSession.builder().config(sparkConf).getOrCreate();}return instance;}}
package com.study.sqlandstreaming;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;/*** @author : yulei* @data : 2018/9/14 10:32* @Version : 1.0**/public class TestSparkSQLAndStreaming {public static void main(String[] args) throws Exception {SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("JavaSqlNetworkWordCount");JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));JavaReceiverInputDStream<String> lines = ssc.socketTextStream("192.168.44.128",9999);JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> list = Arrays.asList( s.split(" "));return list.iterator();}});// Convert RDDs of the words DStream to DataFrame and run SQL query/*************START****************/words.foreachRDD((rdd,time)->{// Get the singleton instance of SparkSessionSparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);// Convert RDD[String] to RDD[case class] to DataFrameJavaRDD<JavaRow> rowRDD = rdd.map( word->{JavaRow record = new JavaRow();record.setWord(word);return record;});Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);wordsDataFrame.createOrReplaceTempView("words");// Do word count on table using SQL and print itDataset<Row> wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word ");System.out.println("========= " + time + "=========");wordCountsDataFrame.show();});/*************END****************/ssc.start();ssc.awaitTermination();}}
输入:
[hadoop@s201 ~]$nc -lk 9999
he he sdf wd wd ss ss ss
输出结果大致如下:
========= 1536898010000 ms=========
+----+-----+
|word|total|
+----+-----+
+----+-----+18/09/14 12:06:54 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/09/14 12:06:54 WARN BlockManager: Block input-0-1536898014600 replicated to only 0 peer(s) instead of 1 peers
========= 1536898015000 ms=========
+----+-----+
|word|total|
+----+-----+
| ss| 3|
| sdf| 1|
| wd| 2|
| he| 2|
+----+-----+========= 1536898020000 ms=========
+----+-----+
|word|total|
+----+-----+
+----+-----+
Spark SQL 在SparkStreaming中的运用相关推荐
- spark官方文档_这些未在 Spark SQL 文档中说明的优化措施,你知道吗?
本文来自上周(2020-11-17至2020-11-19)举办的 Data + AI Summit 2020 (原 Spark+AI Summit),主题为<Spark SQL Beyond O ...
- Spark SQL来读取现有Hive中的数据
Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等). Spark SQL的其中一个分支就是Spa ...
- 【Spark】扩展Spark Catalyst,打造自定义的Spark SQL引擎
1.概述 转载自:扩展Spark Catalyst,打造自定义的Spark SQL引擎 Apache Spark是大数据处理领域最常用的计算引擎之一,被应用在各种各样的场景中,除了易用的API,稳定高 ...
- 使用Spark SQL读取Hive上的数据
Spark SQL主要目的是使得用户可以在Spark上使用SQL,其数据源既可以是RDD,也可以是外部的数据源(比如Parquet.Hive.Json等).Spark SQL的其中一个分支就是Spar ...
- 大数据进阶之路——Spark SQL 之 DataFrameDataset
文章目录 dataframe 和 rdd API常用操作 DataFrame和RDD 案例 DataSet DataFrame它不是Spark SQL提出的,而是早起在R.Pandas语言就已经有了的 ...
- Spark SQL 工作流程源码解析(四)optimization 阶段(基于 Spark 3.3.0)
前言 本文隶属于专栏<大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见大数据技术体系 目录 Spark SQL 工 ...
- 极光笔记丨Spark SQL 在极光的建设实践
作者:极光高级工程师-蔡祖光 前言 Spark在2018开始在极光大数据平台部署使用,历经多个版本的迭代,逐步成为离线计算的核心引擎.当前在极光大数据平台每天运行的Spark任务有20000+,执行的 ...
- 深入理解Spark SQL原理
1.前言 本文是对自己阅读Spark SQL源码过程的一个记录,主线是对尚硅谷Spark SQL最后练习中建立的表的一个简单SQL编写的源码实现流程的跟读.通过自问自答的方式,学习完了整个Spar ...
- 【未完成】[Spark SQL_2] 在 IDEA 中编写 Spark SQL 程序
0. 说明 在 IDEA 中编写 Spark SQL 程序,分别编写 Java 程序 & Scala 程序 1. 编写 Java 程序 待补充 2. 编写 Scala 程序 待补充 转载于:h ...
最新文章
- Android之安装常见的一些解决方法
- 关于虚拟内存,你需要了解的一些概念
- Latex常用数学符号
- 总体参数的估计(概念)
- 基于Coravel定时任务之计算总页数
- R语言实战案例-蒙特卡罗方法(附实现代码)
- Ansible Inventory指北进阶
- SAP UI5 walkthrough第一第二部分解析:data-sap-ui-libs=“sap.ui.commons,sap.ui.table“
- TFS创建登录用户并连接TFS
- ZooKeeper程序员指南--使用ZooKeeper开发分布式应用程序
- jQuery常用方法
- python相对路径import 方法_Python 从相对路径下import的方法
- cnn输入层_一文掌握CNN卷积神经网络
- python 正则表达式 分组_正则表达式之分组的用法
- 魅族m8开发 step by step(1)(让程序跑起来)
- cvSetMouseCallback()鼠标坐标、事件返回
- 微信公众号h5开发流程
- 瞄准千亿工业物联网市场,有人物联网为2万企业级用户提供完整可靠方案
- slidebox使用教程 设定焦点数量
- 深度系统linux deepin如何按装,安装深度Deepin 15.11操作系统的方法