inputdstream mysql_【sparkStreaming】将DStream保存在MySQL
package SparkDemo
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DStreamToMySQL {
//定义更新函数
def updateFunc(newValues : Seq[Int],state :Option[Int]):Option[Int] = {
val currentCount = newValues.foldLeft(0)(_+_)
val previousCount = state.getOrElse(0)
Some(currentCount+previousCount)
}
def main(args : Array[String]): Unit ={
//建立SparkStream
val conf = new SparkConf().setAppName("DStreamToMySQL")
val ssc = new StreamingContext(conf,Seconds(1))
//设置日志等级
StreamingLoggingExample.setStreamingLogLevels()
val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")
val words = lines.flatMap(_.split(" "))
val pairWord = words.map((_,1))
//累计更新
val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)
//将stateWordCount存入数据库
//stateWordCount中包含一堆的Rdd
//我们需要对每个Rdd中的每条数据进行处理储存
stateWordCount.foreachRDD(rdd => {
//每个rdd中包含的数据类型为(String,Int)
//我们把所有数据records定义为Iterator类型,方便我们遍历
def func(records:Iterator[(String,Int)]): Unit ={
//注意,conn和stmt定义为var不能是val
var conn: Connection = null
var stmt : PreparedStatement = null
try{
//连接数据库
val url = "jdbc:mysql://localhost:3306/spark" //地址+数据库
val user = "root"
val password = ""
conn = DriverManager.getConnection(url,user,password)
//
records.foreach(p =>{
//wordcount为表名,word和count为要插入数据的属性
//插入数据
val sql = "insert into wordcount(word,count) values(?,?)"
stmt = conn.prepareStatement(sql)
stmt.setString(1,p._1.trim)
stmt.setInt(2,p._2.toInt)
stmt.executeUpdate()
})
}catch {
case e : Exception => e.printStackTrace()
}finally {
if(stmt != null)
stmt.close()
if(conn != null)
conn.close()
}
}
val repairtitionedRDD = rdd.repartition(3)//将每个rdd重新分区
repairtitionedRDD.foreachPartition(func)//对重新分区后的rdd执行func函数
})
ssc.start()//启动
ssc.awaitTermination()//等待终止命令
}
}
inputdstream mysql_【sparkStreaming】将DStream保存在MySQL相关推荐
- 【sparkStreaming】将DStream保存在MySQL
package SparkDemoimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spa ...
- SparkStreaming读取Kafka的Json数据然后保存到MySQL
一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...
- 批量保存到mysql_关于保存批量数据进入mysql
提出的要求: 生成13位纯数字的卡号与8位纯数字的卡密,要求卡号与卡密都必须全表唯一,然后保存到mysql. 思路: 1.首先mysql中将这两个字段设置唯一索引,保证这两个字段的值在该表中是唯一存在 ...
- python hive mysql_[7] 编写Python脚本将Hive的运算结果保存到MySQL数据库中(1) - 摩西莫西 - ITeye技术网站...
编写Python脚本将Hive的运算结果保存到MySQL数据库中(1) 很多情况下,需要将Hive中的运算结果保存到MySQL数据库中,可以通过简单的Python脚本来实现. 例子1:如果获取Hive ...
- 用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中
有一段时间没好好写博客了,因为一直在做一个比较小型的工程项目,也常常用在企业里,就是将流式数据处理收集,再将这些流式数据进行一些计算以后再保存在mysql上,这是一套比较完整的流程,并且可以从数据库中 ...
- kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例
MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...
- 2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
目录 SparkStreaming数据抽象-DStream DStream 是什么 DStream Operations Transformation Output函数 SparkStreaming数 ...
- mysql打印语句_大数据挖掘—(八):scrapy爬取数据保存到MySql数据库
(大数据挖掘-(七):读懂MySql数据库操作)(大数据挖掘神器--scrapy spider爬虫框架(五):解析多层网页) 通过往期的文章分享,我们了解了如何爬取想要的数据到Items中,也了解了如 ...
- pandas对象保存到mysql出错提示“BLOB/TEXT column used in key specification without a key length”解决办法
问题 将DataFrame数据保存到mysql中时,出现错误提示: BLOB/TEXT column used in key specification without a key length 原因 ...
最新文章
- 几个书本上不常见到的C语言函数
- 湖南大学超级计算机中心 舒教授,湖南大学岳麓书院哲学系舒远招教授应邀来我院讲学...
- js添加keyword让搜索引擎能够搜到_搜索引擎优化考试培训任务书模板 ——XX网站SEO优化方案...
- SpringCloud微框架系列整体模块梳理
- js 节点 选中ztree_zTree使用
- Netty工作笔记0054---EventLoop组件
- javascript-函数声明和函数表达式-call-apply
- 破解压缩包密码的正确思路原理
- Python selenium淘宝抢购物品程序
- 华容道html源码,华容道(项目源代码)
- IT业软件测试的男女性别差异渐趋消褪
- 读《微波工程(第三版)》笔记 (10:终端接负载的无耗传输线)
- java正则表达式练习
- 用户使用手册编写方法
- PCB电路设计的14个误区
- cache存储器最全详细介绍
- 朝花夕拾 - jsliang 基金定投 1 年分享
- 【调剂】211中国矿业大学(徐州)2020年硕士研究生拟调剂专业
- HARDENING SALT - SaltStack安全加固措施介绍
- asp无组件上传类的应用实例/化境HTTP上传程序
热门文章
- Java中的功能性FizzBu​​zz Kata
- moxy json介绍_MOXy作为您的JAX-RS JSON提供程序–客户端
- 可视化Java 9模块关系
- gwt格式_GWT –利弊
- 在AWS Elastic MapReduce上运行PageRank Hadoop作业
- 使用Gradle构建和应用AST转换
- JPA教程:实体映射-第3部分
- Java线程面试的前50个问题,面向初学者和经验丰富的程序员
- 用Spring长轮询Tomcat
- Hamcrest Matchers,Guava谓词和Builder设计模式