在MySQL创建表

create table day_netType_access_topn_stat (
day varchar(8) not null,
uid bigint(10) not null,
times bigint(10) not null,
primary key (day, uid)
)

查看表结构:

创建Entity

package cn.ac.iie.log/*** 每天访问次数实体类* @param day* @param uid* @param times*/
case class DayNetTypeAccessStat (day: String, uid:Long, times:Long)

创建Dao

insert

package cn.ac.iie.logimport java.sql.{Connection, PreparedStatement}import scala.collection.mutable.ListBuffer/*** 各个维度统计的DAO操作*/
object StatDao {/*** 批量保存DayVideoAccessStat到数据库** @param list*/def insertNetTypeAccessTopN(list: ListBuffer[DayNetTypeAccessStat]): Unit = {var connection: Connection = nullvar pstmt: PreparedStatement = nulltry {connection = MysqlUtils.getConnection()// 设置手动提交connection.setAutoCommit(false)val sql = "insert into day_netType_access_topn_stat (day, uid, times) values (?,?,?)"pstmt = connection.prepareStatement(sql)for (ele <- list) {pstmt.setString(1, ele.day)pstmt.setLong(2, ele.uid)pstmt.setLong(3, ele.times)pstmt.addBatch()}pstmt.executeBatch() // 执行批量处理// 手动提交connection.commit()} catch {case e: Exception => e.printStackTrace()} finally {MysqlUtils.release(connection, pstmt)}}
}

这里insert数据时,最好使用批处理,提交使用batch操作,手动提交。

将数据保存到Mysql中

package cn.ac.iie.logimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._import scala.collection.mutable.ListBuffer/*** TopN 统计spark作业*/
object TopNStatJob {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("TopNStatJob").config("spark.sql.sources.partitionColumnTypeInference.enabled","false").master("local[2]").getOrCreate()val accessDF = spark.read.format("parquet").load("file:///E:/test/clean")
//    accessDF.printSchema()accessDF.show(false)// 最受欢迎的TopN netTypenetTypeAccessTopNStat(spark, accessDF)spark.stop}/*** 最受欢迎的TopN netType* @param spark* @param accessDF*/def netTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame): Unit = {accessDF.createOrReplaceTempView("access_logs")val wifiAccessTopNDF = spark.sql("select day,uid,count(1) as times from access_logs where day='20190702' and netType='wifi' group by day,uid order by times desc")
//    wifiAccessTopNDF.show(false)// 将统计结果写入到Mysql中try{wifiAccessTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[Long]("uid")val times = info.getAs[Long]("times")list.append(DayNetTypeAccessStat(day, uid, times))})StatDao.insertNetTypeAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}
}

SparkSQL 将统计结果保存到Mysql相关推荐

  1. SparkStreaming读取Kafka的Json数据然后保存到MySQL

    一般我们使用SparkStreaming消费kafka数据,获取到数据后解析,使用JDBC的方式写入数据库,如下所示. 以上的方式没什么毛病,但是当我们消费的kafka数据类型比较多样的时候,我们需要 ...

  2. kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例

    MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...

  3. mysql打印语句_大数据挖掘—(八):scrapy爬取数据保存到MySql数据库

    (大数据挖掘-(七):读懂MySql数据库操作)(大数据挖掘神器--scrapy spider爬虫框架(五):解析多层网页) 通过往期的文章分享,我们了解了如何爬取想要的数据到Items中,也了解了如 ...

  4. pandas对象保存到mysql出错提示“BLOB/TEXT column used in key specification without a key length”解决办法

    问题 将DataFrame数据保存到mysql中时,出现错误提示: BLOB/TEXT column used in key specification without a key length 原因 ...

  5. android数据库给单选赋值,如何使用android studio将单选按钮的值保存到mysql数据库?...

    我想创建一个投票应用程序,其中有不同的职位,其中有两个职位每个都有两个联系人.我希望当我选择一个人记录到数据库. [职位的disgn与它的参赛者部] 下面是XML代码的一部分:如何使用android ...

  6. mysql t 保存_检查 (调试) - 离线消息保存到 MySQL - 《EMQ X Enterprise v4.1 中文文档》 - 书栈网 · BookStack...

    离线消息保存到 MySQL 搭建 MySQL 数据库,并设置用户名密码为 root/public,以 MacOS X 为例: $ brew install mysql $ brew services ...

  7. python股票接口_Python 从 sina 股票数据接口读取数据,并保存到 MySQL 数据库

    说明 从 sina 的数据接口获取数据,之后,保存到 MySql 数据库 文件:getDataFromSina.py ''' Created on 2018年2月11日 @author: Livon ...

  8. localdate存mysql相差一天_如何在保存到mySQL数据库时阻止LocalDate更改

    使用JPA CriteriaBuilder API将LocalDate字段(例如'2017-09-27′)保存到mySQL Date列时,结果不同(例如'2017-09-26′). 我已经使用SELE ...

  9. 将labview连续数据保存到mysql数据库器

    这一篇是在之前完成Labview和mysql连接,并且进行了简单的CRUD删除的基础上来的.我们一般不会拿Labview来做学生这种数据管理系统,而是对于基本传感器数据的采集和保存,而传感器采集数据会 ...

最新文章

  1. XGBoost缺失值引发的问题及其深度分析 | CSDN博文精选
  2. 流程 - 什么是真正的Scrum?
  3. 资管机构年中规模排名出炉:中信资管规模超万亿
  4. 数字图像处理目录列表
  5. 攻防世界-Misc-stegano(巨详细.零基础)
  6. Python中必学知识点:类方法、实例方法和静态方法
  7. java 两个页面传递数据,请问Cookie怎么在两个页面间传递数据?
  8. 一个账号,防止多设备登陆
  9. IOCP中的socket错误和资源释放处理方法
  10. 拓端tecdat|R语言可视化:ggplot2冲积/桑基图sankey分析大学录取情况、泰坦尼克幸存者数据
  11. 考研数学预热(肖老师)-2019-12-21
  12. 【RS3精简版】月伴流星LiteWin10_Pro_(RS3_16299.19)精简专业版x86/x64合集2017.11
  13. Flutter入门系列-VideoPlayer在列表使用
  14. 水晶报表 文件 xxxx{354234-523432-5235-325-2523}.rpt 文件内部出错:无法加载数据解决方案
  15. 英文聊天常见地道简写
  16. PTA 水题之7-20 镜子碎了
  17. STM32CubeIDE XiP 和 BootROM介绍, XiP外部内存QSPI FLASH执行用户代码
  18. 第3章 信息系统集成专业技术知识
  19. 游戏合作伙伴专题:BreederDAO 与 Air Ballerz 达成合作
  20. 云计算在教育方面的应用

热门文章

  1. 利用System.Net.Mail 的SmtpClient发送邮件
  2. C#中NameValueCollection类用法详解
  3. java中集合的结构(list和map)
  4. windows系统如何查看端口被占用、杀进程
  5. mysql隔离级别底层实现_1、深入理解mysql四种隔离级别及底层实现原理(MVCC和锁)...
  6. Jmeter对HTTP请求压力测试、并发测试的简单使用方法
  7. mac编译php apache,在Mac OS上自行编译安装Apache服务器和PHP解释器
  8. python中的itertools_在python中使用itertools操作csv数据
  9. php函数文件,PHP文件函数大全
  10. github 修改项目为public_在GitHub上为开源项目做贡献