streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中
举例
package com.scala.my
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext
/**
*
* @author root
* 测试步骤:
* 1\打开h15\h16\h17\h18,启动zookeeper,再启动hadoop集群:start-all.sh,再启动mysql
* 2\在h15上创建文件夹wordcount_checkpoint,用于docheckpoint
* 在h5上mysql的dg数据库中创建表t_word
* 3\启动eclipse的本程序,让他等待着
* 4\在h15的dos窗口下输入单词,以空格分隔的单词(需要在h15上开启端口9999:#nc -lk 9999)
* 5\查询h15上的mysql的dg数据库的t_word表是否有数据即可
*
* 注:建表语句
* mysql> show create table wordcount; //查看表语句
CREATE TABLE t_word (
id int(11) NOT NULL AUTO_INCREMENT,
updated_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
word varchar(255) DEFAULT NULL,
count int(11) DEFAULT NULL,
PRIMARY KEY (id)
);
*/
*
* 测试结果:通过,注意-----》第74行没有取得数据,原因在最后没有触发事件(封装事件),目前已经解决
*
* sh spark-submit --master spark://de2:7077 --class 全类名 --driver-class-path /mysql-connector-java-5.1.26.jar sparkstreaming.jar
sh spark-submit --class com.day6.scala.my.PresistMysqlWordCount --master yarn-cluster --driver-class-path /home/spark-1.5.1-bin-hadoop2.4/lib/mysql-connector-
java-5.1.31-bin.jar /home/spark-1.5.1-bin-hadoop2.4/sparkstreaming.jar
$bin/hadoop dfsadmin -safemode leave
也就是关闭Hadoop的安全模式,这样问题就解决了。
*/
object PresistMysqlWordCount {
def main(args: Array[String]): Unit = {
//获取streamingContext,并且设置每5秒切割一次rdd
// val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))
val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))
//设置checkpoit缓存策略
/**
* 利用 checkpoint 来保留上一个窗口的状态,
* 这样可以做到移动窗口的更新统计
*/
sc.checkpoint("hdfs://hh15:8020/wordcount_checkpoint")
// sc.checkpoint("hdfs://h15:8020/wordcount_checkpoint")
//获取doc窗口或者hdfs上的words
// val lines=sc.textFileStream("hdfs://h15:8020/文件夹名称") //实时监控hdfs文件夹下新增的数据
val lines = sc.socketTextStream("hh15", 9999)
// val lines = sc.socketTextStream("h15", 9999)
//压扁
val words = lines.flatMap { x => x.split(" ") }
//map
val paris = words.map { (_, 1) }
//定义一个函数,用于保持状态
val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
var newValue = prevValueState.getOrElse(0)
for (value wd.foreachPartition(
data => {
val conn = ConnectPool.getConn("root", "1714004716", "hh15", "dg")
// val conn = ConnectPool.getConn("root", "1714004716", "h15", "dg")
//插入数据
// conn.prepareStatement("insert into t_word2(word,num) values('tom',23)").executeUpdate()
try {
for (row
streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中相关推荐
- Spark Streaming 实战案例(五) Spark Streaming与Kafka
主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...
- Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制
主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...
- ubuntu mysql 迁移_(最新)ubuntu20.04LTS版迁移mysql8.0数据库的方法
(最新)ubuntu20.04LTS版迁移mysql8.0数据库的方法 之前mysql通过apt安装的,运行一段时间之后,发现根分区过小,有必要将占用空间大的数据库迁移到home分区,于是,有了这篇文 ...
- runtime批处理mysql导出_【原】使用批处理BAT文件处理Mysql数据库 | 学步园
在做项目的时候,考虑到项目中很多模块是公用,数据库也是公用,所以考虑把公用模块的数据库全部用批处理生产,这样或多或少提高了一些效率. 处理方法是: 1:用一个txt保存该项目数据库名称,方便新建工程时 ...
- python搭配什么数据库_教你如何优雅地用Python连接MySQL数据库
作者 | Python语音识别 来源 | 深度学习与python(ID:PythonDC) 不管是机器学习.web开发或者爬虫,数据库都是绕不过去的.那么今天我们就来介绍Python如何Mysql数据 ...
- python 爬虫源码 selenium并存储数据库_使用pythonSelenium爬取内容并存储MySQL数据库的实例图解...
这篇文章主要介绍了python Selenium爬取内容并存储至MySQL数据库的实现代码,需要的朋友可以参考下 前面我通过一篇文章讲述了如何爬取CSDN的博客摘要等信息.通常,在使用Selenium ...
- 将image存入mysql数据库_有谁知道如何把一图片存放到mysql数据库中
root@ytt:/var/lib/mysql-files# for i in `seq 1 100`; do cp 微信图片_20190711095019.jpg "$i".jp ...
- sqluldr2支持mysql吗_如何使用Sqluldr2将Oracle数据直接导入其它数据库
展开全部 配置详细信息 转载 在本文提供的示例中,源服务器636f707962616964757a686964616f31333337393530配置有控制域和一个托管 Oracle 数据库的来宾域( ...
- python爬取前程无忧招聘网站数据搭建Hadoop、Flume、Kafka、Spark用Hive做数据分析Sqoop存储到Mysql并实现可视化
文章目录 一.项目总体要求 二.环境搭建 1.安装包准备 2.安装jdk (1)查询是否安装java (2)卸载jdk (3)安装jdk (4)配置jdk环境变量 3.配置ssh免密登录 (1)进入到 ...
最新文章
- 机器学习算法与Python实践之(二)支持向量机(SVM)初
- Github 代码上边的Raw、Blame、History是啥意思?
- C# 算法题系列(一) 两数之和、无重复字符的最长子串
- Floyd —Warshall(最短路及其他用法详解)
- 端到端的地址翻译(虚拟地址是怎样取到相应高速缓存的数据的?)
- Softmax(假神经网络)与词向量的训练
- 斯坦福大学#深度多任务学习与元学习#视频及讲义下载
- 外媒:已有5家芯片厂商获准继续向华为供货
- 东西湖职业技术学校计算机专业怎么样,武汉东西湖职业技术学校怎么样
- 测试报告包含哪些内容?
- 如何在Excel里输入能打钩的选择框?
- uni-app - 改变 <switch> 组件大小(开关太大)
- python爬虫网页崩溃怎么处理_python程序爬虫总是崩溃
- nodejs ffi调用C++dll动态库 ffi调用语法
- 企业债拟引入大数据强化信用约束
- 三国志战略版360区S4服务器合并信息,三国志战略版s3赛季服务器合并与规则一览...
- 报名网站html代码,考试报名系统 附源码
- AlphaFold2源码解析(4)--模型架构
- MATLAB与STK互联29:仿真案例4—GEO赋形波束示例(Executecommand的一些用法、Sensor指向的设置)
- 对于5G时代的出现IT行业的前景怎么样!
热门文章
- java is start_PHP IntlChar::isJavaIDStart()用法及代码示例
- php使用七牛直播,七牛上传文件,PHP版本
- oracle证书洛阳,ORACLE手工建库
- python期中考试知识点_大学期末考试,有哪些高效复习的技巧?
- docker gitlab-ce
- Nexus 3.31.1 maven 私服 仓库和IntelliJ IDEA 2021.2 实战篇 linux
- Vue项目npm打包推荐方式
- Excel VBA 处理图形图表详解
- JavaScript-jQuery事件
- root - 计算机术语,root什么意思