package SparkDemo

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamToMySQL {

//定义更新函数

def updateFunc(newValues : Seq[Int],state :Option[Int]):Option[Int] = {

val currentCount = newValues.foldLeft(0)(_+_)

val previousCount = state.getOrElse(0)

Some(currentCount+previousCount)

}

def main(args : Array[String]): Unit ={

//建立SparkStream

val conf = new SparkConf().setAppName("DStreamToMySQL")

val ssc = new StreamingContext(conf,Seconds(1))

//设置日志等级

StreamingLoggingExample.setStreamingLogLevels()

val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")

val words = lines.flatMap(_.split(" "))

val pairWord = words.map((_,1))

//累计更新

val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)

//将stateWordCount存入数据库

//stateWordCount中包含一堆的Rdd

//我们需要对每个Rdd中的每条数据进行处理储存

stateWordCount.foreachRDD(rdd => {

//每个rdd中包含的数据类型为(String,Int)

//我们把所有数据records定义为Iterator类型,方便我们遍历

def func(records:Iterator[(String,Int)]): Unit ={

//注意,conn和stmt定义为var不能是val

var conn: Connection = null

var stmt : PreparedStatement = null

try{

//连接数据库

val url = "jdbc:mysql://localhost:3306/spark" //地址+数据库

val user = "root"

val password = ""

conn = DriverManager.getConnection(url,user,password)

//

records.foreach(p =>{

//wordcount为表名,word和count为要插入数据的属性

//插入数据

val sql = "insert into wordcount(word,count) values(?,?)"

stmt = conn.prepareStatement(sql)

stmt.setString(1,p._1.trim)

stmt.setInt(2,p._2.toInt)

stmt.executeUpdate()

})

}catch {

case e : Exception => e.printStackTrace()

}finally {

if(stmt != null)

stmt.close()

if(conn != null)

conn.close()

}

}

val repairtitionedRDD = rdd.repartition(3)//将每个rdd重新分区

repairtitionedRDD.foreachPartition(func)//对重新分区后的rdd执行func函数

})

ssc.start()//启动

ssc.awaitTermination()//等待终止命令

}

}

inputdstream mysql_【sparkStreaming】将DStream保存在MySQL相关推荐

  1. 【sparkStreaming】将DStream保存在MySQL

    package SparkDemoimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spa ...

  2. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  3. 批量保存到mysql_关于保存批量数据进入mysql

    提出的要求: 生成13位纯数字的卡号与8位纯数字的卡密,要求卡号与卡密都必须全表唯一,然后保存到mysql. 思路: 1.首先mysql中将这两个字段设置唯一索引,保证这两个字段的值在该表中是唯一存在 ...

  4. python hive mysql_[7] 编写Python脚本将Hive的运算结果保存到MySQL数据库中(1) - 摩西莫西 - ITeye技术网站...

    编写Python脚本将Hive的运算结果保存到MySQL数据库中(1) 很多情况下,需要将Hive中的运算结果保存到MySQL数据库中,可以通过简单的Python脚本来实现. 例子1:如果获取Hive ...

  5. 用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中

    有一段时间没好好写博客了,因为一直在做一个比较小型的工程项目,也常常用在企业里,就是将流式数据处理收集,再将这些流式数据进行一些计算以后再保存在mysql上,这是一套比较完整的流程,并且可以从数据库中 ...

  6. kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例

    MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...

  7. 2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream

    目录 SparkStreaming数据抽象-DStream DStream 是什么 DStream Operations Transformation Output函数 SparkStreaming数 ...

  8. mysql打印语句_大数据挖掘—(八):scrapy爬取数据保存到MySql数据库

    (大数据挖掘-(七):读懂MySql数据库操作)(大数据挖掘神器--scrapy spider爬虫框架(五):解析多层网页) 通过往期的文章分享,我们了解了如何爬取想要的数据到Items中,也了解了如 ...

  9. pandas对象保存到mysql出错提示“BLOB/TEXT column used in key specification without a key length”解决办法

    问题 将DataFrame数据保存到mysql中时,出现错误提示: BLOB/TEXT column used in key specification without a key length 原因 ...

最新文章

  1. 几个书本上不常见到的C语言函数
  2. 湖南大学超级计算机中心 舒教授,湖南大学岳麓书院哲学系舒远招教授应邀来我院讲学...
  3. js添加keyword让搜索引擎能够搜到_搜索引擎优化考试培训任务书模板 ——XX网站SEO优化方案...
  4. SpringCloud微框架系列整体模块梳理
  5. js 节点 选中ztree_zTree使用
  6. Netty工作笔记0054---EventLoop组件
  7. javascript-函数声明和函数表达式-call-apply
  8. 破解压缩包密码的正确思路原理
  9. Python selenium淘宝抢购物品程序
  10. 华容道html源码,华容道(项目源代码)
  11. IT业软件测试的男女性别差异渐趋消褪
  12. 读《微波工程(第三版)》笔记 (10:终端接负载的无耗传输线)
  13. java正则表达式练习
  14. 用户使用手册编写方法
  15. PCB电路设计的14个误区
  16. cache存储器最全详细介绍
  17. 朝花夕拾 - jsliang 基金定投 1 年分享
  18. 【调剂】211中国矿业大学(徐州)2020年硕士研究生拟调剂专业
  19. HARDENING SALT - SaltStack安全加固措施介绍
  20. asp无组件上传类的应用实例/化境HTTP上传程序

热门文章

  1. Java中的功能性FizzBu​​zz Kata
  2. moxy json介绍_MOXy作为您的JAX-RS JSON提供程序–客户端
  3. 可视化Java 9模块关系
  4. gwt格式_GWT –利弊
  5. 在AWS Elastic MapReduce上运行PageRank Hadoop作业
  6. 使用Gradle构建和应用AST转换
  7. JPA教程:实体映射-第3部分
  8. Java线程面试的前50个问题,面向初学者和经验丰富的程序员
  9. 用Spring长轮询Tomcat
  10. Hamcrest Matchers,Guava谓词和Builder设计模式