MapReduce基础开发之八HDFS文件CRUD操作
HDFS文件操作的基础代码。
package com.hive;import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;public class HdfsCRUD {public static void main(String[] args) {//提交集群执行yarn jar//ls//String srcPath="/tmp/fjs/dpi1/";//HdfsCRUD.list(srcPath);//cat//String file="/tmp/fjs/in/test.txt";//HdfsCRUD.readFile(file);//HdfsCRUD.getModificationTime(file);//HdfsCRUD.getBlockLocations(file);//HdfsCRUD.getHostnames();String dir="/tmp/fjs/in/hdfs";HdfsCRUD.mkdir(dir);}/** hdoop fs -ls命令*/public static void list(String srcPath) { Configuration conf = new Configuration(); FileSystem fs = null;try {fs = FileSystem.get(conf);RemoteIterator<LocatedFileStatus>rmIterator = fs.listLocatedStatus(new Path(srcPath)); while (rmIterator.hasNext()) { Path path = rmIterator.next().getPath(); if(fs.isDirectory(path)){ System.out.println("-----------DirectoryName: "+path.getName()); } else if(fs.isFile(path)){ System.out.println("-----------FileName: "+path.getName()); } } } catch (IOException e) {System.out.println(e.getMessage());}} /** hdoop fs -cat命令*/public static void readFile(String file){ Configuration conf = new Configuration(); FileSystem fs = null;try { fs= FileSystem.get(conf); Path path = new Path(file); if(!fs.exists(path)){ System.out.println("file'"+ file+"' doesn't exist!"); return ; } FSDataInputStream in = fs.open(path); String filename = file.substring(file.lastIndexOf('/') + 1, file.length()); OutputStream out = new BufferedOutputStream(new FileOutputStream(new File("/tmp/"+filename))); byte[] b = new byte[1024]; int numBytes = 0; while ((numBytes = in.read(b)) > 0) { out.write(b,0,numBytes); } in.close(); out.close(); fs.close(); }catch (IOException e) {System.out.println(e.getMessage());} } /** * Gets the information about the file modified time. */ public static void getModificationTime(String source){ try{Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path srcPath = new Path(source); // Check if the file alreadyexists if (!(fs.exists(srcPath))) { System.out.println("No such destination " + srcPath);return; } // Get the filename out of thefile path String filename = source.substring(source.lastIndexOf('/') + 1, source.length()); FileStatus fileStatus = fs.getFileStatus(srcPath); long modificationTime =fileStatus.getModificationTime(); System.out.println("modified datetime: " + System.out.format("File %s; Modification time :%2$tI:%2$tM:%2$tS%n",filename,modificationTime)); }catch (IOException e) {System.out.println(e.getMessage());} } /** * Gets the file block location info */ public static void getBlockLocations(String source){ try{Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path srcPath = new Path(source); // Check if the file alreadyexists if (!(fs.exists(srcPath))) { System.out.println("No such destination " + srcPath); return; } // Get the filename out of thefile path String filename = source.substring(source.lastIndexOf('/') + 1, source.length()); FileStatus fileStatus = fs.getFileStatus(srcPath); BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); int blkCount = blkLocations.length; System.out.println("File :" + filename + "stored at:"); for (int i=0; i < blkCount; i++) { String[] hosts = blkLocations[i].getHosts(); System.out.println("host ip:" +System.out.format("Host %d: %s %n", i, hosts)); } }catch (IOException e) {System.out.println(e.getMessage());} } /** 获取Hadoop集群中data node的DNS主机名*/public static void getHostnames (){ try{Configuration config = new Configuration(); FileSystem fs = FileSystem.get(config); DistributedFileSystem hdfs = (DistributedFileSystem) fs; DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats(); String[]names = new String[dataNodeStats.length]; for (int i = 0; i < dataNodeStats.length; i++) { names[i]= dataNodeStats[i].getHostName(); System.out.println("datenode hostname:"+(dataNodeStats[i].getHostName())); } }catch (IOException e) {System.out.println(e.getMessage());} } /** hadoop fs -mkdir命令*/public static void mkdir(String dir){ Configuration conf = new Configuration(); FileSystem fs = null; try { fs= FileSystem.get(conf); Path path = new Path(dir); if(!fs.exists(path)){ fs.mkdirs(path); System.out.println("create directory '"+dir+"' successfully!"); }else{ System.out.println("directory '"+dir+"' exits!"); } }catch (IOException e) { System.out.println("FileSystem get configuration with anerror"); e.printStackTrace(); }finally{ if(fs!= null){ try { fs.close(); }catch (IOException e) { System.out.println(e.getMessage()); new RuntimeException(e); } } } } /** 本地文件上传到hdfs * hadoop fs -put命令*/public void copyFromLocal (String source, String dest) { Configuration conf = new Configuration(); FileSystem fs=null; try { fs= FileSystem.get(conf); Path srcPath = new Path(source); Path dstPath = new Path(dest);// Check if the file alreadyexists if (!(fs.exists(dstPath))) { System.out.println("dstPathpath doesn't exist" ); System.out.println("No such destination " + dstPath); return; } // Get the filename out of thefile path String filename = source.substring(source.lastIndexOf('/') + 1, source.length());try{ //if the file exists in the destination path, it will throw exception. //fs.copyFromLocalFile(srcPath,dstPath);//remove and overwrite files with the method //copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst) fs.copyFromLocalFile(false, true, srcPath, dstPath); System.out.println("File " + filename + "copied to " + dest); }catch(Exception e){ System.out.println(e.getMessage()); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { System.out.println(e1.getMessage());new RuntimeException(e1); } } /** 添加一个文件到指定的目录下*/public void addFile(String source, String dest) { // Conf object will readthe HDFS configuration parameters Configuration conf = new Configuration(); FileSystem fs=null; try { fs= FileSystem.get(conf); // Get the filename out of thefile path String filename = source.substring(source.lastIndexOf('/') + 1, source.length()); // Create the destination pathincluding the filename. if (dest.charAt(dest.length() - 1) != '/') { dest= dest + "/" + filename; }else { dest= dest + filename; } // Check if the file alreadyexists Path path = new Path(dest); if (fs.exists(path)) { System.out.println("File " + dest + " already exists"); return; } // Create a new file and writedata to it.FSDataOutputStream out = fs.create(path); InputStream in = new BufferedInputStream(new FileInputStream(new File(source)));byte[] b = new byte[1024]; int numBytes = 0; //In this way read and write datato destination file.while ((numBytes = in.read(b)) > 0) {out.write(b,0, numBytes); } in.close(); out.close(); fs.close(); }catch (IOException e) { System.out.println(e.getMessage());new RuntimeException(e); } } /** 重新命名hdfs中的文件名称*/public void renameFile (String fromthis, String tothis){ Configuration conf = new Configuration(); FileSystem fs=null; try { fs= FileSystem.get(conf); Path fromPath = new Path(fromthis); Path toPath = new Path(tothis); if (!(fs.exists(fromPath))) { System.out.println("No such destination " + fromPath);return; } if (fs.exists(toPath)) { System.out.println("Already exists! " + toPath); return; } try{ boolean isRenamed = fs.rename(fromPath,toPath); //renames file name indeed.if(isRenamed){ System.out.println("Renamed from " + fromthis + " to " + tothis); } }catch(Exception e){ System.out.println(e.getMessage()); new RuntimeException(e); }finally{ fs.close(); } }catch (IOException e1) { System.out.println(e1.getMessage()); new RuntimeException(e1); } } /** 删除指定的一个文件* hadoop fs -rm -r命令*/public void deleteFile(String file) { Configuration conf = new Configuration(); FileSystem fs=null; try { fs= FileSystem.get(conf); Path path = new Path(file); if (!fs.exists(path)) { System.out.println("File " + file + " does not exists"); return; } /* * recursively delete the file(s) if it is adirectory. * If you want to mark the path that will bedeleted as * a result of closing the FileSystem. * deleteOnExit(Path f) */ fs.delete(new Path(file), true); fs.close(); }catch (IOException e) { System.out.println(e.getMessage()); new RuntimeException(e); } } }
MapReduce基础开发之八HDFS文件CRUD操作相关推荐
- MapReduce基础开发之三字段处理并输出Hive表
1.MR设计和开发 1)设计: 输入:用户名 | 数字ip | 时间戳 | url MR处理:正则表达式匹配url,满足则解析url并转换ip和时间戳, 输出:用 ...
- MapReduce基础开发之二数据去重和排序
因Hadoop集群平台网络限制,只能在eclipse里先写好代码再提交jar到集群平台namenode上执行,不能实时调试,所以没有配置eclipse的hadoop开发环境,只是引入了hadoop的l ...
- Python基础必掌握的文件读写操作详解
读取和写入文件等操作是 Python 可以完成的最常见任务之一.无论是写入简单的文本文件,读取复杂的服务器日志,甚至分析原始字节数据,进行相关的处理操作,最终所有这些情况都需要读取或写入文件. 整套学 ...
- MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs
为利用Hadoop集群平台的分布存储和计算能力,基于MapReduce将ftp文件分布式下载并上传到HDFS中. 1.文件移动流程:ftp服务器->datanode本地目录->HDFS目录 ...
- MapReduce基础开发之十三FileSystem实现本地文件上传
场景:从本地目录下上传文件到hdfs. 参考代码: package ct.gd;import java.io.BufferedInputStream; import java.io.File; imp ...
- MapReduce基础开发之十一DistributedCache使用
1.需求场景: 过滤无意义的单词后再进行文本词频统计.处理流程是: 1)预定义要过滤的无意义单词保存成文件,保存到HDFS中: 2)程序中将该文件定位为作业的缓存文件,使用Distributed ...
- MapReduce基础开发之十读写ORC File
1.ORC File Orc是Hive特有的一种列式存储的文件格式,它有着非常高的压缩比和读取效率,因此很快取代了之前的RCFile,成为Hive中非常常用的一种文件格式. 2.编译ORC Jar包 ...
- MapReduce基础开发之一词汇统计和排序(wordcount)
统计/var/log/boot.log中含k的字符的数量,并对含k的字符按照数量排序.需分两个job完成,一个用来统计,一个用来排序. 一.统计 1.上传文件到hadoop: 1)新建文件夹:h ...
- MapReduce基础开发之十二ChainMapper和ChainReducer使用
1.需求场景: 过滤无意义的单词后再进行文本词频统计.处理流程是: 1)第一个Map使用无意义单词数组过滤输入流: 2)第二个Map将过滤后的单词加上出现一次的标签: 3)最后Reduce输出词 ...
最新文章
- 一文了解推荐系统中的图神经网络
- jQuery温度计,支持摄氏度华氏度同时展示
- 普通计算机硬件,将普通显示器更改为触摸屏_计算机硬件和网络_IT /计算机_数据...
- 同态加密应用_重新设计具有同态性的银行应用
- 手写tomcat socket closed_【消费电子】:重在阅读,新增手写 BOOX Nova Pro 体验评测...
- JobDataMap传递参数_02
- AuthenticationManager验证原理分析
- 基于Android的人事管理系统 开发与设计
- python卸载_技术 | Python 包安装和卸载的几种方式
- TCP/UDP,SOCKET,HTTP,FTP 简析
- 适合初学者的大数据学习路线
- 如何恢复U盘误删数据?
- win11系统下,迅雷启动后闪退的问题
- java 简单框架_最简单的Java框架
- vue + element-ui 对登录功能、重置表单、退出功能、路由重定向、挂载路由导航守卫的实现
- uni-app使用Hbuilder X如何安卓APP打包、发布、运行
- 当你的电脑C盘满了怎么办?这两种方法都能解决
- mobaxterm显示图像闪退_日报|苹果修复王者闪退Bug;OPPO公布新一代混合光学变焦技术...
- qq空间制作常用软件
- 【Joy of Cryptography 读书笔记】Chapter 1 一次性密码本(one-time pad)Kerckhoffs原则
热门文章
- python 可变参数 关键字参数_Python关键字及可变参数*args,**kw原理解析
- louvian算法 缺点 优化_机器学习中的优化算法(1)-优化算法重要性,SGD,Momentum(附Python示例)...
- web浏览器_你最常用的web测试-浏览器兼容性测试
- Golang 匿名结构体及测试代码编写技巧
- 解决 vue路由跳转到新页面底部而不是顶部和后退到首页就不让他继续后退了
- bzoj1334[Baltic2008]Elect(背包dp)
- (转)jQuery.fn.extend与jQuery.extend到底区别在哪?
- ilpimage to bitmap
- 开机logo切换逻辑深入研究
- Windows 7 延长支持服务价格曝光:一台电脑最低25美元