记录一下Java API 连接hadoop操作hdfs的实现流程(使用连接池管理)。

以前做过这方面的开发,本来以为不会有什么问题,但是做的还是坑坑巴巴,内心有些懊恼,记录下这烦人的过程,警示自己切莫眼高手低!

一:引入相关jar包如下

org.apache.hadoop

hadoop-common

2.8.2

org.apache.hadoop

hadoop-hdfs

2.8.2

org.apache.commons

commons-pool2

2.6.0

二:连接池开发的基本流程

2.1项目基本环境是SpringBoot大集成···

2.2hadoop相关包结构如下(自己感觉这结构划分的也是凸显了low逼水平【手动笑哭】)

2.2 画个图表达下开发思路

三、上代码

importcom.cmcc.datacenter.hdfs.client.HdfsClient;importcom.cmcc.datacenter.hdfs.client.HdfsFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;

@Configurationpublic classHdfsConfig {

@Value("${hadoop.hdfs.ip}")privateString hdfsServerIp;

@Value("${hadoop.hdfs.port}")privateString hdfsServerPort;

@Value("${hadoop.hdfs.pool.maxTotal}")private intmaxTotal;

@Value("${hadoop.hdfs.pool.maxIdle}")private intmaxIdle;

@Value("${hadoop.hdfs.pool.minIdle}")private intminIdle;

@Value("${hadoop.hdfs.pool.maxWaitMillis}")private intmaxWaitMillis;

@Value("${hadoop.hdfs.pool.testWhileIdle}")private booleantestWhileIdle;

@Value("${hadoop.hdfs.pool.minEvictableIdleTimeMillis}")private long minEvictableIdleTimeMillis = 60000;

@Value("${hadoop.hdfs.pool.timeBetweenEvictionRunsMillis}")private long timeBetweenEvictionRunsMillis = 30000;

@Value("${hadoop.hdfs.pool.numTestsPerEvictionRun}")private int numTestsPerEvictionRun = -1;

@Bean(initMethod= "init", destroyMethod = "stop")publicHdfsClient HdfsClient(){

HdfsClient client= newHdfsClient();returnclient;

}/*** TestWhileConfig - 在空闲时检查有效性, 默认false

* MinEvictableIdleTimeMillis - 逐出连接的最小空闲时间

* TimeBetweenEvictionRunsMillis - 逐出扫描的时间间隔(毫秒) 如果为负数则不运行逐出线程,默认-1

* NumTestsPerEvictionRun - 每次逐出检查时 逐出的最大数目

**/@BeanpublicHdfsPoolConfig HdfsPoolConfig(){

HdfsPoolConfig hdfsPoolConfig= newHdfsPoolConfig();

hdfsPoolConfig.setTestWhileIdle(testWhileIdle);

hdfsPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);

hdfsPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

hdfsPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);

hdfsPoolConfig.setMaxTotal(maxTotal);

hdfsPoolConfig.setMaxIdle(maxIdle);

hdfsPoolConfig.setMinIdle(minIdle);

hdfsPoolConfig.setMaxWaitMillis(maxWaitMillis);returnhdfsPoolConfig;

}

@BeanpublicHdfsFactory HdfsFactory(){return new HdfsFactory("hdfs://" + hdfsServerIp + ":" +hdfsServerPort);

}

}

importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;public class HdfsPoolConfig extendsGenericObjectPoolConfig {publicHdfsPoolConfig(){}/*** TestWhileConfig - 在空闲时检查有效性, 默认false

* MinEvictableIdleTimeMillis - 逐出连接的最小空闲时间

* TimeBetweenEvictionRunsMillis - 逐出扫描的时间间隔(毫秒) 如果为负数则不运行逐出线程,默认-1

* NumTestsPerEvictionRun - 每次逐出检查时 逐出的最大数目

**/

public HdfsPoolConfig(boolean testWhileIdle, long minEvictableIdleTimeMillis, long timeBetweenEvictionRunsMillis, intnumTestsPerEvictionRun){this.setTestWhileIdle(testWhileIdle);this.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);this.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);this.setNumTestsPerEvictionRun(numTestsPerEvictionRun);

}

}

importcom.cmcc.datacenter.hdfs.config.HdfsPoolConfig;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importjava.util.List;public classHdfsClient {private Logger logger = LoggerFactory.getLogger(this.getClass());privateHdfsPool hdfsPool;

@AutowiredprivateHdfsPoolConfig hdfsPoolConfig;

@AutowiredprivateHdfsFactory hdfsFactory;public voidinit(){

hdfsPool= newHdfsPool(hdfsFactory,hdfsPoolConfig);

}public voidstop(){

hdfsPool.close();

}public long getPathSize(String path) throwsException {

Hdfs hdfs= null;try{

hdfs=hdfsPool.borrowObject();returnhdfs.getContentSummary(path).getLength();

}catch(Exception e) {

logger.error("[HDFS]获取路径大小失败", e);throwe;

}finally{if (null !=hdfs) {

hdfsPool.returnObject(hdfs);

}

}

}public ListgetBasePath(){

Hdfs hdfs= null;try{

hdfs=hdfsPool.borrowObject();returnhdfs.listFileName();

}catch(Exception e) {

e.printStackTrace();return null;

}finally{if (null !=hdfs) {

hdfsPool.returnObject(hdfs);

}

}

}

}

importorg.apache.commons.pool2.PooledObject;importorg.apache.commons.pool2.PooledObjectFactory;importorg.apache.commons.pool2.impl.DefaultPooledObject;importjava.io.IOException;public class HdfsFactory implements PooledObjectFactory{private finalString url;publicHdfsFactory(String url){this.url =url;

}

@Overridepublic PooledObject makeObject() throwsException {

Hdfs hdfs= newHdfs(url);

hdfs.open();return new DefaultPooledObject(hdfs);

}

@Overridepublic void destroyObject(PooledObject pooledObject) throwsException {

Hdfs hdfs=pooledObject.getObject();

hdfs.close();

}

@Overridepublic boolean validateObject(PooledObjectpooledObject) {

Hdfs hdfs=pooledObject.getObject();try{returnhdfs.isConnected();

}catch(IOException e) {

e.printStackTrace();return false;

}

}

@Overridepublic void activateObject(PooledObject pooledObject) throwsException {

}

@Overridepublic void passivateObject(PooledObject pooledObject) throwsException {

}

}

importorg.apache.commons.pool2.PooledObjectFactory;importorg.apache.commons.pool2.impl.AbandonedConfig;importorg.apache.commons.pool2.impl.GenericObjectPool;importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;public class HdfsPool extends GenericObjectPool{public HdfsPool(PooledObjectFactoryfactory) {super(factory);

}public HdfsPool(PooledObjectFactory factory, GenericObjectPoolConfigconfig) {super(factory, config);

}public HdfsPool(PooledObjectFactory factory, GenericObjectPoolConfigconfig, AbandonedConfig abandonedConfig) {super(factory, config, abandonedConfig);

}

}

importcom.cmcc.datacenter.hdfs.config.HdfsConfig;importcom.google.common.collect.Lists;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.ContentSummary;importorg.apache.hadoop.fs.FileStatus;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importjava.io.IOException;importjava.util.List;public classHdfs {private Logger logger = LoggerFactory.getLogger(this.getClass());privateFileSystem fs;privateString coreResource;privateString hdfsResource;private finalString url;private static final String NAME = "fs.hdfs.impl";publicHdfs(String url) {this.url =url;

}public voidopen() {try{

Configuration conf= newConfiguration();

conf.set("fs.defaultFS", url);

System.out.println("url is "+url);

fs=FileSystem.get(conf);

logger.info("[Hadoop]创建实例成功");

}catch(Exception e) {

logger.error("[Hadoop]创建实例失败", e);

}

}public voidclose() {try{if (null !=fs) {

fs.close();

logger.info("[Hadoop]关闭实例成功");

}

}catch(Exception e) {

logger.error("[Hadoop]关闭实例失败", e);

}

}public boolean isConnected() throwsIOException {return fs.exists(new Path("/"));

}public boolean exists(String path) throwsIOException {

Path hdfsPath= newPath(path);returnfs.exists(hdfsPath);

}public FileStatus getFileStatus(String path) throwsIOException {

Path hdfsPath= newPath(path);returnfs.getFileStatus(hdfsPath);

}public ContentSummary getContentSummary(String path) throwsIOException {

ContentSummary contentSummary= null;

Path hdfsPath= newPath(path);if(fs.exists(hdfsPath)) {

contentSummary=fs.getContentSummary(hdfsPath);

}returncontentSummary;

}public List listFileName() throwsIOException {

List res =Lists.newArrayList();

FileStatus[] fileStatuses= fs.listStatus(new Path("/"));for(FileStatus fileStatus : fileStatuses){

res.add(fileStatus.getPath()+":类型--"+ (fileStatus.isDirectory()? "文件夹":"文件"));

}returnres;

}

}

四、总结:

一共六个类,理清思路看是很easy的。

这里就是spring对类的管理和commons-pool2对连接类的管理混着用了,所以显得有点乱。

1.@Configuration注解加到Hdfsconfig类上,作为一个配置类,作用类似于spring-xml文件中的标签,springboot会扫描并注入它名下管理的类,其中

@Bean(initMethod = "init", destroyMethod = "stop") 标签表示spring在初始化这个类时调用他的init方法,销毁时调用他的stop方法。

2.HdfsClient 是业务方法调用的类,spring在初始化这个类时,调用它的init方法,这个方法会创建HdfsPool(即Hdfs的连接池)。其他方法是对Hdfs中方法的二次封装,即先使用连接池获取实例,再调用实例方法。

3.HdfsPoolConfig继承commons-pool2包中的GenericObjectConfig,受spring管理,作为线程池的配置类,创建HdfsPool时作为参数传入。

4.HdfsFactory继承commons-pool2包中的GenericObjectFactory,受spring管理,作为创建连接实例的工厂类,创建HdfsPool时作为参数传入。实际上连接池就是通过它获取的连接实例。

5.HdfsPool继承commons-pool2包中的GenericObjectPool,是连接池。

6.Hdfs,是底层的连接实例,所有增删改查的方法都要在这里实现,只不过获取/销毁连接交给池管理。

声明:这里用spring管理一些类是应为项目本身用的springboot,spring管理方便,并不是强制使用,愿意完全可以自己new。

五、不得不说的一些不是坑的坑。

1.我真的不记得windows上用Java API连接远程的hadoop还要有一些神操作。

报错如下:java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset

解决如下:

1. 将已下载的 hadoop-2.9.0.tar 这个压缩文件解压,放到你想要的位置(本机任意位置);

2. 下载 windows 环境下所需的其他文件(hadoop2.9.0对应的hadoop.dll,winutils.exe 等),这步真是关键,吐槽某SDN想钱想疯了啊,霸占百度前10页,各种下载各种C币,各种要钱。

不多说了,附上github地址:github地址

3. 拿到上面下载的windows所需文件,执行以下步骤:

3.1:将文件解压到你解压的 hadoop-2.9.0.tar 的bin目录下(没有的放进去,有的不要替换,以免花式作死,想学习尝试的除外)

3.2:将hadoop.dll复制到C:\Window\System32下

3.3:添加环境变量HADOOP_HOME,指向hadoop目录

3.4:将%HADOOP_HOME%\bin加入到path里面,不管用的话将%HADOOP_HOME%\sbin也加进去。

3.5:重启 IDE(你的编辑工具,例如eclipse,intellij idea)

JAVA怎么连接华为的HDFS系统_Java使用连接池管理Hdfs连接相关推荐

  1. Java hdfs连接池_Java使用连接池管理Hdfs连接

    记录一下Java API 连接hadoop操作hdfs的实现流程(使用连接池管理). 以前做过这方面的开发,本来以为不会有什么问题,但是做的还是坑坑巴巴,内心有些懊恼,记录下这烦人的过程,警示自己切莫 ...

  2. 信号满格怎么显示无法连接服务器,Win7 32系统网络信号满格却无法连接上网怎么处理...

    无线网是我们常用的网络连接方法,通常笔记本电脑中都配备了无线网卡,可以使用无线网络,不过有的用户发现在win7 32系统中查看无线网络,信号是满格的,然而当我们进行连接的时候却出现无法连接,或是连接上 ...

  3. 不能连接本地数据库mysql_win7系统下mysql或sqlserver显示无法连接本地数据库如何解决...

    最近有win7系统用户在使用数据的时候可能会遇到一些问题,比如有时候会遇到mysql或sqlserver显示无法连接本地数据库的情况,经过分析可能是相关服务没有开启,本教程就给大家带来win7系统下m ...

  4. win10蓝牙已配对连接不上_Win10系统显示蓝牙已配对但未连接的解决方法

    最近有win10系统用户反映说碰到这样一个情况,就是在使用蓝牙的时候,明明已经显示蓝牙已配对了,但是却一直未连接,导致无法使用蓝牙功能,遇到这样的问题该如何处理呢?本文就给大家讲解一下Win10系统显 ...

  5. win10蓝牙已配对连接不上_Win10系统显示蓝牙已配对但未连接咋办?

    电脑的蓝牙设备是我们在Win10系统中电脑传输,接收文件有的小伙伴在使用的方式,安全快捷,今天看到有小伙伴在使用蓝牙的时候初选了问题,系统显示蓝牙已配对了的,但是一直不能成功连接的情况,遇到这个问题我 ...

  6. 计算机断开网络连接网络设置,win7系统网络总是自动断开网络无法连接怎么解决...

    随缘小编用的win7系统,在上网的过程中,网络常常自动断开,带来了很大的困扰.该如何解决这个问题呢?有时候win7在休眠的时候网络会自动断开,在登录系统的之后网络就无法连接上.这样的win7网络真是让 ...

  7. java win 窗体开发简单订餐系统_Java实现简单订餐系统

    本文实例为大家分享了Java实现简单订餐系统的具体代码,供大家参考,具体内容如下 import java.util.Scanner; import java.util.*; public class ...

  8. java写的学生信息查询系统_Java编写学生信息查询系统,报错!!!

    在窗口ClientFrame中有一个窗格,Newstudentinfo和Selectstudentinfo独立运行都没问题,但是在ClientFrame中只运行Selectstudentinfo,录入 ...

  9. java实现英文文件单词搜索系统_java对于目录下文件的单词查找操作代码实现

    这篇文章主要介绍了java对于目录下文件的单词查找操作代码实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 写入文件的目录.代码通过找目录下的文件 ...

最新文章

  1. gtkpod 0.99.8
  2. 圆形的CNN卷积核?华中科大清华黄高团队康奈尔提出圆形卷积,进一步提升卷积结构性能!
  3. 最简单的NamedPiep程序[秋镇菜]-初学者看看
  4. JDK 18 / Java 18 GA 发布
  5. 程序显示文本框_vb程序语言题库
  6. Spring框架入门(一)
  7. 打算为IBatisNet 引入自定义Membership和Role
  8. 移动端H5解惑-页面适配
  9. 【Python】使用Python调用Fragstats批量进行万级及以上数据的景观指数运算
  10. 深入浅出dev、test、pre、pro四大环境
  11. QQ、微信动态图表情包怎么制作?视频如何转GIF
  12. 160813_qt显示阿拉伯输入法9x7点阵
  13. python 图片文字化处理_Python图像处理之图片文字识别功能(OCR)
  14. go语言加解密算法 md5 sha256
  15. 阵列卡u盘安装系统步骤_带Raid的服务器安装系统(采用U盘安装)
  16. 电子商务专业英语参考试卷
  17. notepad集成jsonviewer 查看json
  18. 微信转账 服务器错误,微信转账转错了怎么办 两种补救方法介绍
  19. JavaScript实现页面倒计时效果
  20. 负荷分配问题的动态规划算法递归实现

热门文章

  1. PDF格式分析(七十一)—— Markup注释
  2. HM 中cfg配置文件
  3. 使用ZXing扫描条形码和二维码
  4. 2021年企业税务筹划方法有哪些?企业税务筹划重点是哪些税?
  5. YouTube 全球范围宕机事件,华为回应荣耀单飞
  6. insert sql语句_SQL Insert语句概述
  7. 279完全平方数(四平方定理、递归、动态规划)
  8. squid 和squid 集群
  9. 可能与不可能的边界(P/NP问题趣史)
  10. Java计算租金递增,根据租金递增周期和递增率生成合同期限的所有月租金(租金计算)...