Java hdfs连接池_Java使用连接池管理Hdfs连接
记录一下Java API 连接hadoop操作hdfs的实现流程(使用连接池管理)。
以前做过这方面的开发,本来以为不会有什么问题,但是做的还是坑坑巴巴,内心有些懊恼,记录下这烦人的过程,警示自己切莫眼高手低!
一:引入相关jar包如下
org.apache.hadoop
hadoop-common
2.8.2
org.apache.hadoop
hadoop-hdfs
2.8.2
org.apache.commons
commons-pool2
2.6.0
二:连接池开发的基本流程
2.1项目基本环境是SpringBoot大集成···
2.2hadoop相关包结构如下(自己感觉这结构划分的也是凸显了low逼水平【手动笑哭】)
2.2 画个图表达下开发思路
三、上代码
importcom.cmcc.datacenter.hdfs.client.HdfsClient;importcom.cmcc.datacenter.hdfs.client.HdfsFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;
@Configurationpublic classHdfsConfig {
@Value("${hadoop.hdfs.ip}")privateString hdfsServerIp;
@Value("${hadoop.hdfs.port}")privateString hdfsServerPort;
@Value("${hadoop.hdfs.pool.maxTotal}")private intmaxTotal;
@Value("${hadoop.hdfs.pool.maxIdle}")private intmaxIdle;
@Value("${hadoop.hdfs.pool.minIdle}")private intminIdle;
@Value("${hadoop.hdfs.pool.maxWaitMillis}")private intmaxWaitMillis;
@Value("${hadoop.hdfs.pool.testWhileIdle}")private booleantestWhileIdle;
@Value("${hadoop.hdfs.pool.minEvictableIdleTimeMillis}")private long minEvictableIdleTimeMillis = 60000;
@Value("${hadoop.hdfs.pool.timeBetweenEvictionRunsMillis}")private long timeBetweenEvictionRunsMillis = 30000;
@Value("${hadoop.hdfs.pool.numTestsPerEvictionRun}")private int numTestsPerEvictionRun = -1;
@Bean(initMethod= "init", destroyMethod = "stop")publicHdfsClient HdfsClient(){
HdfsClient client= newHdfsClient();returnclient;
}/*** TestWhileConfig - 在空闲时检查有效性, 默认false
* MinEvictableIdleTimeMillis - 逐出连接的最小空闲时间
* TimeBetweenEvictionRunsMillis - 逐出扫描的时间间隔(毫秒) 如果为负数则不运行逐出线程,默认-1
* NumTestsPerEvictionRun - 每次逐出检查时 逐出的最大数目
**/@BeanpublicHdfsPoolConfig HdfsPoolConfig(){
HdfsPoolConfig hdfsPoolConfig= newHdfsPoolConfig();
hdfsPoolConfig.setTestWhileIdle(testWhileIdle);
hdfsPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
hdfsPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
hdfsPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
hdfsPoolConfig.setMaxTotal(maxTotal);
hdfsPoolConfig.setMaxIdle(maxIdle);
hdfsPoolConfig.setMinIdle(minIdle);
hdfsPoolConfig.setMaxWaitMillis(maxWaitMillis);returnhdfsPoolConfig;
}
@BeanpublicHdfsFactory HdfsFactory(){return new HdfsFactory("hdfs://" + hdfsServerIp + ":" +hdfsServerPort);
}
}
importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;public class HdfsPoolConfig extendsGenericObjectPoolConfig {publicHdfsPoolConfig(){}/*** TestWhileConfig - 在空闲时检查有效性, 默认false
* MinEvictableIdleTimeMillis - 逐出连接的最小空闲时间
* TimeBetweenEvictionRunsMillis - 逐出扫描的时间间隔(毫秒) 如果为负数则不运行逐出线程,默认-1
* NumTestsPerEvictionRun - 每次逐出检查时 逐出的最大数目
**/
public HdfsPoolConfig(boolean testWhileIdle, long minEvictableIdleTimeMillis, long timeBetweenEvictionRunsMillis, intnumTestsPerEvictionRun){this.setTestWhileIdle(testWhileIdle);this.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);this.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);this.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
}
}
packagecom.cmcc.datacenter.hdfs.client;importcom.cmcc.datacenter.hdfs.config.HdfsPoolConfig;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importjava.util.List;public classHdfsClient {private Logger logger = LoggerFactory.getLogger(this.getClass());privateHdfsPool hdfsPool;
@AutowiredprivateHdfsPoolConfig hdfsPoolConfig;
@AutowiredprivateHdfsFactory hdfsFactory;public voidinit(){
hdfsPool= newHdfsPool(hdfsFactory,hdfsPoolConfig);
}public voidstop(){
hdfsPool.close();
}public long getPathSize(String path) throwsException {
Hdfs hdfs= null;try{
hdfs=hdfsPool.borrowObject();returnhdfs.getContentSummary(path).getLength();
}catch(Exception e) {
logger.error("[HDFS]获取路径大小失败", e);throwe;
}finally{if (null !=hdfs) {
hdfsPool.returnObject(hdfs);
}
}
}public ListgetBasePath(){
Hdfs hdfs= null;try{
hdfs=hdfsPool.borrowObject();returnhdfs.listFileName();
}catch(Exception e) {
e.printStackTrace();return null;
}finally{if (null !=hdfs) {
hdfsPool.returnObject(hdfs);
}
}
}
}
importorg.apache.commons.pool2.PooledObject;importorg.apache.commons.pool2.PooledObjectFactory;importorg.apache.commons.pool2.impl.DefaultPooledObject;importjava.io.IOException;public class HdfsFactory implements PooledObjectFactory{private finalString url;publicHdfsFactory(String url){this.url =url;
}
@Overridepublic PooledObject makeObject() throwsException {
Hdfs hdfs= newHdfs(url);
hdfs.open();return new DefaultPooledObject(hdfs);
}
@Overridepublic void destroyObject(PooledObject pooledObject) throwsException {
Hdfs hdfs=pooledObject.getObject();
hdfs.close();
}
@Overridepublic boolean validateObject(PooledObjectpooledObject) {
Hdfs hdfs=pooledObject.getObject();try{returnhdfs.isConnected();
}catch(IOException e) {
e.printStackTrace();return false;
}
}
@Overridepublic void activateObject(PooledObject pooledObject) throwsException {
}
@Overridepublic void passivateObject(PooledObject pooledObject) throwsException {
}
}
packagecom.cmcc.datacenter.hdfs.client;importorg.apache.commons.pool2.PooledObjectFactory;importorg.apache.commons.pool2.impl.AbandonedConfig;importorg.apache.commons.pool2.impl.GenericObjectPool;importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;public class HdfsPool extends GenericObjectPool{public HdfsPool(PooledObjectFactoryfactory) {super(factory);
}public HdfsPool(PooledObjectFactory factory, GenericObjectPoolConfigconfig) {super(factory, config);
}public HdfsPool(PooledObjectFactory factory, GenericObjectPoolConfigconfig, AbandonedConfig abandonedConfig) {super(factory, config, abandonedConfig);
}
}
importcom.cmcc.datacenter.hdfs.config.HdfsConfig;importcom.google.common.collect.Lists;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.ContentSummary;importorg.apache.hadoop.fs.FileStatus;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importjava.io.IOException;importjava.util.List;public classHdfs {private Logger logger = LoggerFactory.getLogger(this.getClass());privateFileSystem fs;privateString coreResource;privateString hdfsResource;private finalString url;private static final String NAME = "fs.hdfs.impl";publicHdfs(String url) {this.url =url;
}public voidopen() {try{
Configuration conf= newConfiguration();
conf.set("fs.defaultFS", url);
System.out.println("url is "+url);
fs=FileSystem.get(conf);
logger.info("[Hadoop]创建实例成功");
}catch(Exception e) {
logger.error("[Hadoop]创建实例失败", e);
}
}public voidclose() {try{if (null !=fs) {
fs.close();
logger.info("[Hadoop]关闭实例成功");
}
}catch(Exception e) {
logger.error("[Hadoop]关闭实例失败", e);
}
}public boolean isConnected() throwsIOException {return fs.exists(new Path("/"));
}public boolean exists(String path) throwsIOException {
Path hdfsPath= newPath(path);returnfs.exists(hdfsPath);
}public FileStatus getFileStatus(String path) throwsIOException {
Path hdfsPath= newPath(path);returnfs.getFileStatus(hdfsPath);
}public ContentSummary getContentSummary(String path) throwsIOException {
ContentSummary contentSummary= null;
Path hdfsPath= newPath(path);if(fs.exists(hdfsPath)) {
contentSummary=fs.getContentSummary(hdfsPath);
}returncontentSummary;
}public List listFileName() throwsIOException {
List res =Lists.newArrayList();
FileStatus[] fileStatuses= fs.listStatus(new Path("/"));for(FileStatus fileStatus : fileStatuses){
res.add(fileStatus.getPath()+":类型--"+ (fileStatus.isDirectory()? "文件夹":"文件"));
}returnres;
}
}
四、总结:
一共六个类,理清思路看是很easy的。
这里就是spring对类的管理和commons-pool2对连接类的管理混着用了,所以显得有点乱。
1.@Configuration注解加到Hdfsconfig类上,作为一个配置类,作用类似于spring-xml文件中的标签,springboot会扫描并注入它名下管理的类,其中
@Bean(initMethod = "init", destroyMethod = "stop") 标签表示spring在初始化这个类时调用他的init方法,销毁时调用他的stop方法。
2.HdfsClient 是业务方法调用的类,spring在初始化这个类时,调用它的init方法,这个方法会创建HdfsPool(即Hdfs的连接池)。其他方法是对Hdfs中方法的二次封装,即先使用连接池获取实例,再调用实例方法。
3.HdfsPoolConfig继承commons-pool2包中的GenericObjectConfig,受spring管理,作为线程池的配置类,创建HdfsPool时作为参数传入。
4.HdfsFactory继承commons-pool2包中的GenericObjectFactory,受spring管理,作为创建连接实例的工厂类,创建HdfsPool时作为参数传入。实际上连接池就是通过它获取的连接实例。
5.HdfsPool继承commons-pool2包中的GenericObjectPool,是连接池。
6.Hdfs,是底层的连接实例,所有增删改查的方法都要在这里实现,只不过获取/销毁连接交给池管理。
声明:这里用spring管理一些类是应为项目本身用的springboot,spring管理方便,并不是强制使用,愿意完全可以自己new。
五、不得不说的一些不是坑的坑。
1.我真的不记得windows上用Java API连接远程的hadoop还要有一些神操作。
报错如下:java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset
解决如下:
1. 将已下载的 hadoop-2.9.0.tar 这个压缩文件解压,放到你想要的位置(本机任意位置);
2. 下载 windows 环境下所需的其他文件(hadoop2.9.0对应的hadoop.dll,winutils.exe 等),这步真是关键,吐槽某SDN想钱想疯了啊,霸占百度前10页,各种下载各种C币,各种要钱。
不多说了,附上github地址:github地址
3. 拿到上面下载的windows所需文件,执行以下步骤:
3.1:将文件解压到你解压的 hadoop-2.9.0.tar 的bin目录下(没有的放进去,有的不要替换,以免花式作死,想学习尝试的除外)
3.2:将hadoop.dll复制到C:\Window\System32下
3.3:添加环境变量HADOOP_HOME,指向hadoop目录
3.4:将%HADOOP_HOME%\bin加入到path里面,不管用的话将%HADOOP_HOME%\sbin也加进去。
3.5:重启 IDE(你的编辑工具,例如eclipse,intellij idea)
原文:https://www.cnblogs.com/peripateticism/p/10895903.html
Java hdfs连接池_Java使用连接池管理Hdfs连接相关推荐
- java定时线程池_java 定时器线程池(ScheduledThreadPoolExecutor)的实现
前言 定时器线程池提供了定时执行任务的能力,即可以延迟执行,可以周期性执行.但定时器线程池也还是线程池,最底层实现还是ThreadPoolExecutor,可以参考我的另外一篇文章多线程–精通Thre ...
- java多线程线程池_Java多线程——线程池(ThreadPool)
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁 ...
- java 多线程池_Java ThreadPoolExecutor线程池 同时执行50个线程
最近项目上有个需求,需要从FTP服务器中下载大批量的数据文件,然后解析该数据文件进行入库,数据库为oracle,最后在通过web工程,以报表和图表的形式进行展现. 这些批量的数据文件为纯文本文件,每天 ...
- java的连接 初始化_java类从加载、连接到初始化过程详解
Java代码在编译后会转化成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码,最终需要转化成汇编指令在CPU上执行,Java中所使用的并发机制依赖于JVM的实现和CPU的指令. 类加载 ...
- java中定时任务和线程池_java基于线程池和反射机制实现定时任务完整实例
本文实例讲述了java基于线程池和反射机制实现定时任务的方法.分享给大家供大家参考,具体如下: 主要包括如下实现类: 1. Main类: 任务执行的入口: 调用main方法,开始加载任务配置并执行任务 ...
- java模拟连接超时_Java:使用Toxiproxy模拟各种连接问题
java模拟连接超时 用Toxiproxy和Java的HttpURLConnection模拟各种连接问题,以查看产生了什么样的错误:连接超时vs.读取超时vs.连接被拒绝-. 结果: 系统:openj ...
- java中的字符串常量池_java字符串常量池
字符串常量池SCP jdk1.6是放在永久代(8中叫方法区或叫元空间)中; jdk1.7+中,字符串常量池放入了堆中,注意运行时常量依然存放在方法区,例如,Integer a = 40:Java在编译 ...
- java 异步线程池_Java - 异步线程池
一.异步线程启动: new Thread newThread(newRunnable() { @Overridepublic voidrun() {//-- 这里是异步线程内的逻辑 } } ).sta ...
- java socket 连接异常_java.net.SocketException:软件导致连接中止:套接字写错误
参见英文答案 > Official reasons for "Software caused connection abort: socket write error" ...
- java库存信息管理系统_Java商户管理系统 客户管理 库存管理 销售报表 SSM项目源码...
系统介绍: 1.系统采用主流的 SSM 框架 jsp JSTL bootstrap html5 (PC浏览器使用) 2.springmvc +spring4.3.7+ mybaits3.3 SSM ...
最新文章
- c语言实现图形界面实现四则运算,C语言实现四则运算的生成器
- 收藏 | 机器学习的基础图表
- img src=/引发的问题
- 皮一皮:我真的没买这么多...
- 皮一皮:加了个班还以为鬼门大开魔界连同了...
- python语言入门w-Python完全小白入门指南
- MAT之PSO:利用PSO算法优化二元函数,寻找最优个体适应度
- java arrays.equals_Java Arrays类的常见使用
- NOIP 2013 day1
- 简易中控紫猫插件版(3)压缩包使用说明
- c语言数组最大可定义多少位_C语言求数组的最大值三种方法
- java中的比较运算符_Java基础---Java中的比较运算符(十三)
- [转载] pip快速下载python包
- apache+nginx 实现动静分离
- raid5用户mbr还是gpt_对硬盘进行分区时,GPT和MBR有什么区别?
- 我们为什么要推广经方?
- python哪个字体好看_Python实现对比不同字体中的同一字符的显示效果
- Win7怎么设置工作组?Win7电脑设置工作组的方法
- oracle.exe占用cpu太高,360tray.exe占用CPU过高,怎么办
- 面试题:重写equals方法为什么通常会重写hashcode方法?
热门文章
- scala 字符串转换数组_如何在Scala中将字节数组转换为字符串?
- 解决方案_智能工厂全套解决方案
- python图形界面库哪个好_8个必备的Python GUI库
- in-nan(ind)_NaN16 Constant in Julia
- php数据类型_PHP数据类型能力问题和解答
- 判断dll是版本(Debug Or Release)[测试通过]
- Android 模拟器调试的缺点
- 天气预报HTML代码
- elf文件格式实例解析
- 使用 Packer、Ansible 和 Terraform 构建不可变的基础设施