举例

package com.scala.my

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Durations

import org.apache.spark.streaming.StreamingContext

/**

*

* @author root

* 测试步骤:

*    1\打开h15\h16\h17\h18,启动zookeeper,再启动hadoop集群:start-all.sh,再启动mysql

*    2\在h15上创建文件夹wordcount_checkpoint,用于docheckpoint

*       在h5上mysql的dg数据库中创建表t_word

*    3\启动eclipse的本程序,让他等待着

*    4\在h15的dos窗口下输入单词,以空格分隔的单词(需要在h15上开启端口9999:#nc -lk 9999)

*    5\查询h15上的mysql的dg数据库的t_word表是否有数据即可

*

* 注:建表语句

*     mysql> show create table wordcount;  //查看表语句

CREATE TABLE   t_word (

id  int(11) NOT NULL AUTO_INCREMENT,

updated_time  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

word varchar(255) DEFAULT NULL,

count  int(11) DEFAULT NULL,

PRIMARY KEY (id)

);

*/

*

* 测试结果:通过,注意-----》第74行没有取得数据,原因在最后没有触发事件(封装事件),目前已经解决

*

* sh spark-submit --master spark://de2:7077 --class 全类名 --driver-class-path /mysql-connector-java-5.1.26.jar  sparkstreaming.jar

sh spark-submit --class com.day6.scala.my.PresistMysqlWordCount --master yarn-cluster --driver-class-path /home/spark-1.5.1-bin-hadoop2.4/lib/mysql-connector-

java-5.1.31-bin.jar /home/spark-1.5.1-bin-hadoop2.4/sparkstreaming.jar

$bin/hadoop dfsadmin -safemode leave

也就是关闭Hadoop的安全模式,这样问题就解决了。

*/

object PresistMysqlWordCount {

def main(args: Array[String]): Unit = {

//获取streamingContext,并且设置每5秒切割一次rdd

//    val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))

val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))

//设置checkpoit缓存策略

/**

* 利用 checkpoint 来保留上一个窗口的状态,

* 这样可以做到移动窗口的更新统计

*/

sc.checkpoint("hdfs://hh15:8020/wordcount_checkpoint")

//    sc.checkpoint("hdfs://h15:8020/wordcount_checkpoint")

//获取doc窗口或者hdfs上的words

//    val lines=sc.textFileStream("hdfs://h15:8020/文件夹名称")  //实时监控hdfs文件夹下新增的数据

val lines = sc.socketTextStream("hh15", 9999)

//    val lines = sc.socketTextStream("h15", 9999)

//压扁

val words = lines.flatMap { x => x.split(" ") }

//map

val paris = words.map { (_, 1) }

//定义一个函数,用于保持状态

val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {

var newValue = prevValueState.getOrElse(0)

for (value wd.foreachPartition(

data => {

val conn = ConnectPool.getConn("root", "1714004716", "hh15", "dg")

//        val conn = ConnectPool.getConn("root", "1714004716", "h15", "dg")

//插入数据

//        conn.prepareStatement("insert into t_word2(word,num) values('tom',23)").executeUpdate()

try {

for (row

streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中相关推荐

  1. Spark Streaming 实战案例(五) Spark Streaming与Kafka

    主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...

  2. Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制

    主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...

  3. ubuntu mysql 迁移_(最新)ubuntu20.04LTS版迁移mysql8.0数据库的方法

    (最新)ubuntu20.04LTS版迁移mysql8.0数据库的方法 之前mysql通过apt安装的,运行一段时间之后,发现根分区过小,有必要将占用空间大的数据库迁移到home分区,于是,有了这篇文 ...

  4. runtime批处理mysql导出_【原】使用批处理BAT文件处理Mysql数据库 | 学步园

    在做项目的时候,考虑到项目中很多模块是公用,数据库也是公用,所以考虑把公用模块的数据库全部用批处理生产,这样或多或少提高了一些效率. 处理方法是: 1:用一个txt保存该项目数据库名称,方便新建工程时 ...

  5. python搭配什么数据库_教你如何优雅地用Python连接MySQL数据库

    作者 | Python语音识别 来源 | 深度学习与python(ID:PythonDC) 不管是机器学习.web开发或者爬虫,数据库都是绕不过去的.那么今天我们就来介绍Python如何Mysql数据 ...

  6. python 爬虫源码 selenium并存储数据库_使用pythonSelenium爬取内容并存储MySQL数据库的实例图解...

    这篇文章主要介绍了python Selenium爬取内容并存储至MySQL数据库的实现代码,需要的朋友可以参考下 前面我通过一篇文章讲述了如何爬取CSDN的博客摘要等信息.通常,在使用Selenium ...

  7. 将image存入mysql数据库_有谁知道如何把一图片存放到mysql数据库中

    root@ytt:/var/lib/mysql-files# for i in `seq 1 100`; do cp 微信图片_20190711095019.jpg "$i".jp ...

  8. sqluldr2支持mysql吗_如何使用Sqluldr2将Oracle数据直接导入其它数据库

    展开全部 配置详细信息 转载 在本文提供的示例中,源服务器636f707962616964757a686964616f31333337393530配置有控制域和一个托管 Oracle 数据库的来宾域( ...

  9. python爬取前程无忧招聘网站数据搭建Hadoop、Flume、Kafka、Spark用Hive做数据分析Sqoop存储到Mysql并实现可视化

    文章目录 一.项目总体要求 二.环境搭建 1.安装包准备 2.安装jdk (1)查询是否安装java (2)卸载jdk (3)安装jdk (4)配置jdk环境变量 3.配置ssh免密登录 (1)进入到 ...

最新文章

  1. 机器学习算法与Python实践之(二)支持向量机(SVM)初
  2. Github 代码上边的Raw、Blame、History是啥意思?
  3. C# 算法题系列(一) 两数之和、无重复字符的最长子串
  4. Floyd —Warshall(最短路及其他用法详解)
  5. 端到端的地址翻译(虚拟地址是怎样取到相应高速缓存的数据的?)
  6. Softmax(假神经网络)与词向量的训练
  7. 斯坦福大学#深度多任务学习与元学习#视频及讲义下载
  8. 外媒:已有5家芯片厂商获准继续向华为供货
  9. 东西湖职业技术学校计算机专业怎么样,武汉东西湖职业技术学校怎么样
  10. 测试报告包含哪些内容?
  11. 如何在Excel里输入能打钩的选择框?
  12. uni-app - 改变 <switch> 组件大小(开关太大)
  13. python爬虫网页崩溃怎么处理_python程序爬虫总是崩溃
  14. nodejs ffi调用C++dll动态库 ffi调用语法
  15. 企业债拟引入大数据强化信用约束
  16. 三国志战略版360区S4服务器合并信息,三国志战略版s3赛季服务器合并与规则一览...
  17. 报名网站html代码,考试报名系统 附源码
  18. AlphaFold2源码解析(4)--模型架构
  19. MATLAB与STK互联29:仿真案例4—GEO赋形波束示例(Executecommand的一些用法、Sensor指向的设置)
  20. 对于5G时代的出现IT行业的前景怎么样!

热门文章

  1. java is start_PHP IntlChar::isJavaIDStart()用法及代码示例
  2. php使用七牛直播,七牛上传文件,PHP版本
  3. oracle证书洛阳,ORACLE手工建库
  4. python期中考试知识点_大学期末考试,有哪些高效复习的技巧?
  5. docker gitlab-ce
  6. Nexus 3.31.1 maven 私服 仓库和IntelliJ IDEA 2021.2 实战篇 linux
  7. Vue项目npm打包推荐方式
  8. Excel VBA 处理图形图表详解
  9. JavaScript-jQuery事件
  10. root - 计算机术语,root什么意思