目录

SparkStreaming实战案例六 自定义输出-foreachRDD

需求

注意:

代码实现


SparkStreaming实战案例六 自定义输出-foreachRDD

需求

对上述案例的结果数据输出到控制台外的其他组件,如MySQL/HDFS

注意:

foreachRDD函数属于将DStream中结果数据RDD输出的操作,类似transform函数,针对每批次RDD数据操作,但无返回值

DStream.print方法源码底层调用的也是foreachRDD:

代码实现

package cn.itcast.streamingimport java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkStreaming接收Socket数据,node01:9999* 对上述案例的结果数据输出到控制台外的其他组件,如MySQL/HDFS*/
object SparkStreamingDemo06_Output {def main(args: Array[String]): Unit = {//1.创建环境val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")//设置数据输出文件系统的算法版本为2//https://blog.csdn.net/u013332124/article/details/92001346.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))ssc.checkpoint("./ckp")//2.接收socket数据val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)//3.做WordCountval wordAndCountDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))//windowDuration:窗口长度:就算最近多久的数据,必须都是微批间隔的整数倍//slideDuration :滑动间隔:就是每隔多久计算一次,,必须都是微批间隔的整数倍//每隔10s(slideDuration :滑动间隔)计算最近20s(windowDuration:窗口长度)的热搜排行榜!.reduceByKeyAndWindow((v1:Int, v2:Int)=>v1+v2, Seconds(20),Seconds(10))//排序取TopN//注意:DStream没有直接排序的方法!所以应该调用DStream底层的RDD的排序方法!//transform(函数),该函数会作用到DStream底层的RDD上!val resultDS: DStream[(String, Int)] = wordAndCountDS.transform(rdd => {val sortedRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false)val top3: Array[(String, Int)] = sortedRDD.take(3) //取出当前RDD中排好序的前3个热搜词!println("======top3--start======")top3.foreach(println)println("======top3--end======")sortedRDD})//4.输出resultDS.print()resultDS.foreachRDD((rdd,time)=>{val df: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")val batchTime: String = df.format(time.milliseconds)println("-------自定义的输出-------")println(s"batchTime:${batchTime}")println("-------自定义的输出-------")if(!rdd.isEmpty()){//-1.输出到控制台rdd.foreach(println)//-2.输出到HDFSrdd.coalesce(1).saveAsTextFile(s"hdfs://node1:8020/wordcount/output-${time.milliseconds}")//-3.输出到MySQL/*CREATE TABLE `t_hotwords` (`time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,`word` varchar(255) NOT NULL,`count` int(11) DEFAULT NULL,PRIMARY KEY (`time`,`word`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;*/rdd.foreachPartition(iter=>{val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")val sql:String = "REPLACE INTO `t_hotwords` (`time`, `word`, `count`) VALUES (?, ?, ?);"val ps: PreparedStatement = conn.prepareStatement(sql)//获取预编译语句对象iter.foreach(t=>{val word: String = t._1val count: Int = t._2ps.setTimestamp(1,new Timestamp(time.milliseconds) )ps.setString(2,word)ps.setInt(3,count)ps.addBatch()})ps.executeBatch()ps.close()conn.close()})}})//5.启动并等待程序停止ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)}
}

2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD相关推荐

  1. 2021年大数据Spark(十一):应用开发基于IDEA集成环境

    目录 Spark应用开发-基于IDEA 创建工程 WordCount本地运行 WordCount集群运行 注意 修改代码如下 打成jar包 改名 上传jar包 提交到Yarn WordCount-Ja ...

  2. 2021年大数据Kafka(十一):❤️Kafka的消费者负载均衡机制和数据积压问题❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka的消费者负载均衡机制和数据积压问题 一.kafka ...

  3. 2021年大数据HBase(十一):Apache Phoenix的视图操作

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的视图操作 一.应用场景 ...

  4. 2021年大数据Hive(十一):Hive调优

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive调优 一.本地模式 1.空key处理 二.SQL ...

  5. 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...

  6. 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey

    目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...

  7. 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

    目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...

  8. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  9. 2021年大数据Spark(三十一):Spark On Hive

    目录 Spark On Hive spark-sql中集成Hive Spark代码中集成Hive Spark On Hive Spark SQL模块从发展来说,从Apache Hive框架而来,发展历 ...

最新文章

  1. 正则表达式(Regular Expression)
  2. hihocoder1513 小Hi的烦恼
  3. 【Android 逆向】ART 脱壳 ( 修改 /art/runtime/dex_file.cc#OpenCommon 系统源码进行脱壳 )
  4. MyEclipse CI 2018.8.0正式发布(附下载)
  5. Redis持久化:AOF和RDB
  6. 释疑のCONTEXTS
  7. list(map(list,zip(*a)))
  8. Node.js基础知识普及
  9. 推荐一个比较好用的Chrome扩展应用,提供了桌面便签功能
  10. storm apache_Apache Storm的实时情绪分析示例
  11. Newtonsoft.Json 获取匿名类数据
  12. 清华大学开始招收高二学生,数学天赋是最重要的入围条件
  13. 互联网运营遇到瓶颈?这套数据运营体系,高手和小白都必看
  14. 周志华任大会首个华人程序主席!
  15. “鉴定一下网络热门平台上的小广告”
  16. c++ string类型转换为char *类型
  17. pandas 调整列的顺序
  18. 华为认证hcia含金量_华为hcna认证含金量高吗 华为hcna认证用处大吗
  19. 不精确微分/不完整微分(Inexact differential/Imperfect differential)
  20. android 获取经纬度的三种方法,Android获取经纬度

热门文章

  1. 2021-2027年中国中空纤维膜行业市场研究及前瞻分析报告
  2. C++ 笔记(27)— 指针变量、数组和指针、指针数组、数组指针、指针常量与常量指针
  3. python 实现桶排序
  4. Java堆和栈的基本理解
  5. Jackson、FastJson快速入门(整合SpringMVC)
  6. TensorFlow分布式(多GPU和多服务器)详解
  7. Xilinx Zynq FPGA Boards板
  8. 虚拟纹理与几何图像技术
  9. 解决每次git pull需要不用输入用户名信息
  10. Python:numpy生成正态分布的平均数