基于持久化的wordCount程序!中途遇到了一个坑!
自己手动封装一个静态线程池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取一个连接,使用之后再换回来,这样的话,可以在对个RDD的partition之间,也可以复用连接了,而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。
代码:
package com.bynear.spark_Streaming;

import com.bynear.tool.ConnectionPool;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;/* 2018/5/16* 11:30* 基于持久化的wordcount程序*/
public class PersisWordCount {public static void main(String[] args) {final SparkConf conf = new SparkConf().setAppName("persiswordcount").setMaster("local[2]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));jssc.checkpoint("hdfs://Spark01:9000/zjs/chepoint");JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterable<String> call(String line) throws Exception {return Arrays.asList(line.split(" "));}});JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});final JavaPairDStream<String, Integer> wordcount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {@Overridepublic Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {Integer newValue = 0;if (state.isPresent()) {newValue = state.get();}for (Integer value : values) {newValue += value;}return Optional.of(newValue);}});wordcount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {@Overridepublic Void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {@Overridepublic void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {Connection conn = ConnectionPool.getConection();Tuple2<String, Integer> wordcount = null;while (wordcounts.hasNext()) {wordcount = wordcounts.next();String sql = "insert into word (word,count) values ('" + wordcount._1 + "'," + wordcount._2 + ")";System.out.println(sql+conn+"YES");Statement stmt = conn.createStatement();stmt.executeUpdate(sql);}ConnectionPool.returnConnection(conn);}});return null;}});jssc.start();jssc.awaitTermination();jssc.stop();}
}

手动搭建的线程池

package com.bynear.tool;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
/*** 2018/5/16* 12:24*/
public class ConnectionPool {//    静态的Connection队列public static LinkedList<Connection> connectionQueue;//      加载驱动static {try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {e.printStackTrace();}}//    获取连接,多线程访问并发控制public synchronized static Connection getConection() {connectionQueue = new LinkedList<Connection>();try {if (connectionQueue.isEmpty()) {for (int i = 0; i < 2; i++) {Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb","root", "123456");connectionQueue.push(conn);}}} catch (Exception e) {e.printStackTrace();}return connectionQueue.poll();}public static void returnConnection(Connection conn) {connectionQueue.push(conn);}
}

最开始自己搭建的线程池中,用的方法为
if (connectionQueue==null) {
for (int i = 0; i < 2; i++) {
Connection conn = DriverManager.getConnection(“jdbc:mysql://192.168.2.10:3306/testdb”,
“root”, “123456”);
connectionQueue.push(conn);
}
}
将代码提交到集群上时,一直抱空指指针。
后来 System.out.println(sql+conn+”YES”);输出一下conn
conn = ConnectionPool.getConection();
insert into wordcount (word,count) values (‘heool,word’,1)nullYES 为null

跑成功代码:
if (connectionQueue.isEmpty()) {
for (int i = 0; i < 2; i++) {
Connection conn = DriverManager.getConnection(“jdbc:mysql://192.168.2.10:3306/testdb”,
“root”, “123456”);
connectionQueue.push(conn);
}
}
输出结果:在SQL中查询:
mysql> select * from word;
+—-+———————+————+——-+
| id | updated_time | word | count |
+—-+———————+————+——-+
| 1 | 2018-05-16 01:11:10 | ???,?? | 1 |
| 2 | 2018-05-16 01:11:15 | ???,?? | 1 |
| 3 | 2018-05-16 01:13:00 | hello,word | 1 |
| 4 | 2018-05-16 01:16:00 | hello | 1 |
| 5 | 2018-05-16 01:16:00 | word | 1 |
| 6 | 2018-05-16 01:16:05 | hello | 1 |
| 7 | 2018-05-16 01:16:05 | word | 1 |
+—-+———————+————+——-+
7 rows in set (0.00 sec)
完美成功!!!!

基于持久化的wordcount程序 foreachRDD相关推荐

  1. (附源码)基于springboot微信小程序的长沙县图书馆图书导览系统 毕业设计 170900

    基于springboot微信小程序的长沙县图书馆图书导览系统 摘  要 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也 ...

  2. 一个wordcount程序轻松玩转MapReduce编程模型

    可以毫不夸张的说,几乎开发中绝大部分的MR程序都是基于wordcount编程模型而来,或者说用wordcount变化而来(改变的主要是业务方面的逻辑).所以,熟练掌握wordcount编程模型,是掌握 ...

  3. [转]Android 和 iPhone 浏览器之战,第 2 部分: 为 iPhone 和 Android 构建基于浏览器的应用程序...

    简介: 本文是共两部分的系列文章 "Android 和 iPhone 浏览器之战" 的第 2 部分,主要关注为 iPhone 和 Android 开发基于浏览器的应用程序.在第 1 ...

  4. 【ReactiveX】基于Golang pmlpml/RxGo程序包的二次开发

    基于Golang pmlpml/RxGo程序包的二次开发[阅读时间:约20分钟] 一.ReactiveX & RxGo介绍 1.ReactiveX 2.RxGo 二.系统环境&项目介绍 ...

  5. java聊天程序步骤解析_java网络之基于UDP的聊天程序示例解析

    基于UDP的Socket通信 UDP协议不是一种基于稳定连接的协议,是一种面向数据报包的通信协议,不需要通信双方建立稳定的连接,也没有所谓服务端和客户的概念,数据报包在传输的时候不保证一定及时到达,也 ...

  6. 基于Python分析深圳程序员工资有多高?

    基于Python分析深圳程序员工资有多高? 概述 前言 统计结果 爬虫技术分析 爬虫代码实现 爬虫分析实现 后记 前言 多图预警.多图预警.多图预警.校招季,毕业也多,跳槽也多.我们的职业发展还是要顺 ...

  7. 命令行运行hadoop实例wordcount程序

    参考1:http://www.cnblogs.com/flying5/archive/2011/05/04/2078408.html 需要说明的有以下几点. 1.如果wordcount程序不含层次,即 ...

  8. 基于富盛SBO程序开发框架的自动序列生成器

    很多时候,我们期望系统中的某些关键字段可以按照我们期望的条件自动生成,比如,员工代码,商品序列号,商品条形码等.的确,针对个体业务,要实现这种自动生成方法很多,那么是否可以实现可以支持所有业务的序列生 ...

  9. 从安装Kafka服务到运行WordCount程序

    之所以写这篇文章,是因为Kafka初学的同学在了解了Kafka的基本原理之后,希望在自己的机器上面运行最简单的wordCount的时候,从开始安装Kafka到找到合适的example源码最后到成功运行 ...

最新文章

  1. stream流对象的理解及使用
  2. 图解HTTP学习笔记
  3. 把一个英语句子中的单词次序颠倒后输出。例如输入“how are you”,输出“you are how”;...
  4. 吉林推出百项政策扩开放
  5. springmvc 组合注解
  6. 【专升本计算机】甘肃省普通高等学校专升本考试计算机全真模拟试卷(一)
  7. 前端js 实现文件下载
  8. centos mysql导出数据库命令_在centos(linux)下用命令导出mysql数据库数据
  9. 无需在数据集上学习和预训练,这种图像修复新方法效果惊人 | 论文
  10. 使用python下载加密的流媒体m3u8视频文件,获取电影资源
  11. 中学生怎样学计算机编程6,中学生学电脑编程有什么好处
  12. 在html5中加下划线的方式,怎么给文字插入下划线?
  13. 欧几里得(Euclid)算法
  14. 激光雷达与组合惯导联合标定--方案二(matlab)
  15. 隐私保护和数据安全:区块链的隐私问题、零钞:基于zkSNARK的完美混币池、Hawk:保护合约数据私密性、Coco框架、Baby Zoe
  16. [矩阵论] Unit 1. 线性空间与线性变换 - 知识点整理
  17. 全球及中国InGaAs APD阵列行业研究及十四五规划分析报告
  18. flutter 学习资源汇总
  19. 通俗简单讲解贝叶斯原理,并python实现贝叶斯分类代码
  20. CentOS官网下载iso镜像(官网备份版)

热门文章

  1. cg word List 1
  2. 6-23 分离链接法的删除操作函数 (20 分)
  3. Java String类型变量的比较问题
  4. P5502 [JSOI2015]最大公约数(gcd性质/min性质/分治)
  5. Educational Codeforces Round 81 (Rated for Div. 2) F.Good Contest \ 洛谷 划艇 组合 计数dp
  6. P1020 导弹拦截(n*log n时间的最长上升子序列思想)
  7. bzoj#3456. 城市规划
  8. KMP Trie 例题讲解
  9. 一二三系列之优先队列、st表——Battle,Heapsort,A Magic Lamp
  10. [贪心专题]CF549G,CF351E,CF226D,CF1276C,CF1148E,CF798D