package SparkDemoimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object DStreamToMySQL {//定义更新函数def updateFunc(newValues : Seq[Int],state :Option[Int]):Option[Int] = {val currentCount = newValues.foldLeft(0)(_+_)val  previousCount = state.getOrElse(0)Some(currentCount+previousCount)}def main(args : Array[String]): Unit ={//建立SparkStreamval conf = new SparkConf().setAppName("DStreamToMySQL")val ssc = new StreamingContext(conf,Seconds(1))//设置日志等级StreamingLoggingExample.setStreamingLogLevels()val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")val words = lines.flatMap(_.split(" "))val pairWord = words.map((_,1))//累计更新val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)//将stateWordCount存入数据库//stateWordCount中包含一堆的Rdd//我们需要对每个Rdd中的每条数据进行处理储存stateWordCount.foreachRDD(rdd => {//每个rdd中包含的数据类型为(String,Int)//我们把所有数据records定义为Iterator类型,方便我们遍历def func(records:Iterator[(String,Int)]): Unit ={//注意,conn和stmt定义为var不能是valvar conn: Connection = nullvar stmt : PreparedStatement = nulltry{//连接数据库val url = "jdbc:mysql://localhost:3306/spark" //地址+数据库val user = "root"val password = ""conn = DriverManager.getConnection(url,user,password)//records.foreach(p =>{//wordcount为表名,word和count为要插入数据的属性//插入数据val sql = "insert into wordcount(word,count) values(?,?)"stmt = conn.prepareStatement(sql)stmt.setString(1,p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})}catch {case e : Exception => e.printStackTrace()}finally {if(stmt != null)stmt.close()if(conn != null)conn.close()}}val repairtitionedRDD = rdd.repartition(3)//将每个rdd重新分区repairtitionedRDD.foreachPartition(func)//对重新分区后的rdd执行func函数})ssc.start()//启动ssc.awaitTermination()//等待终止命令}}

  

转载于:https://www.cnblogs.com/zzhangyuhang/p/9075992.html

【sparkStreaming】将DStream保存在MySQL相关推荐

  1. inputdstream mysql_【sparkStreaming】将DStream保存在MySQL

    package SparkDemo import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.s ...

  2. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  3. kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例

    MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...

  4. 用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中

    有一段时间没好好写博客了,因为一直在做一个比较小型的工程项目,也常常用在企业里,就是将流式数据处理收集,再将这些流式数据进行一些计算以后再保存在mysql上,这是一套比较完整的流程,并且可以从数据库中 ...

  5. 2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream

    目录 SparkStreaming数据抽象-DStream DStream 是什么 DStream Operations Transformation Output函数 SparkStreaming数 ...

  6. mysql打印语句_大数据挖掘—(八):scrapy爬取数据保存到MySql数据库

    (大数据挖掘-(七):读懂MySql数据库操作)(大数据挖掘神器--scrapy spider爬虫框架(五):解析多层网页) 通过往期的文章分享,我们了解了如何爬取想要的数据到Items中,也了解了如 ...

  7. pandas对象保存到mysql出错提示“BLOB/TEXT column used in key specification without a key length”解决办法

    问题 将DataFrame数据保存到mysql中时,出现错误提示: BLOB/TEXT column used in key specification without a key length 原因 ...

  8. SparkSQL 将统计结果保存到Mysql

    在MySQL创建表 create table day_netType_access_topn_stat ( day varchar(8) not null, uid bigint(10) not nu ...

  9. android数据库给单选赋值,如何使用android studio将单选按钮的值保存到mysql数据库?...

    我想创建一个投票应用程序,其中有不同的职位,其中有两个职位每个都有两个联系人.我希望当我选择一个人记录到数据库. [职位的disgn与它的参赛者部] 下面是XML代码的一部分:如何使用android ...

最新文章

  1. 关于枚举概念的理解以及存在意义
  2. mongodb数据库扩展名_MongoDB学习笔记:MongoDB 数据库的命名、设计规范
  3. 工作没有挑战性,怎么办?
  4. unity3d优化总结篇
  5. map传参上下文赋值的问题
  6. java也可以做黑客?
  7. 【编译打包】haproxy 1.4.23
  8. qt中使窗口的大小随窗口的内容大小进行调整
  9. 华为手机解锁码计算工具_一部华为手机解锁无数翻译,你浪费了此功能吗?
  10. java tempfile read_Java资源作为文件
  11. joblib 读取模型后对单条数据做预测并解决Reshape your data either using array报错
  12. 程序发布出现: 服务器无法处理请求---无法生成临时类(result = 1)。 错误CS2001:未能找到源文件“C:\ Windows \ TEMP \ lph54vwf.0.cs”...
  13. Linux文件系统(七)---系统调用之open操作(三) 之 open_namei函数
  14. ubuntu安装wechat
  15. spyder 5语言设置简体中文
  16. 骚操作:Mac局域网控制Windows10主机
  17. Thinkphp5 php会员实现单点登录
  18. 取小数点后两位(解析)
  19. 三运放差分放大电路分析_★三运放差分放大电路
  20. cell数据如何删除重复项

热门文章

  1. python--常用模块:collections 、time、random
  2. PHP CURL 中文说明
  3. 解决Failed to load class org.slf4j.impl.StaticLoggerBinder
  4. ASP.NET MVC过滤器(一)
  5. [转]Entity Framework走马观花之把握全局
  6. python iot平台_Python MQTT连接到Azure Iot中心
  7. leetcode算法题--从上到下打印二叉树
  8. Docker容器启动自动化脚本(五)
  9. JAVA黑白圆圈图形_CSS3 黑白交替旋转圆圈
  10. python 自动记录时间_python自动化之时间