Hadoop File System and I/O Streams
Article URL: http://www.cnblogs.com/archimedes/p/hadoop-filesystem-io.html. Please credit the source URL when reposting.
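Hadoop borrows the idea of the Linux virtual file system (VFS): it introduces an abstract Hadoop file system and, on top of it, provides many concrete file system implementations to meet the varied data-access needs of applications built on Hadoop.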
Hadoop File System API
Hadoop provides an abstract file system, and HDFS is just one concrete implementation of it. The abstract class for Hadoop file systems is org.apache.hadoop.fs.FileSystem.
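To make "one abstract class, many concrete file systems" concrete, here is a minimal JDK-only sketch of choosing an implementation by URI scheme and caching one instance per scheme and authority. It is loosely modeled on the behavior of `FileSystem.get(URI, Configuration)` in the source reproduced later in this article, but `FsRegistry`, `SimpleFs`, `LocalLikeFs`, `MemoryLikeFs` and the hard-coded scheme table are hypothetical. Real Hadoop resolves the implementation class from its `Configuration` and falls back to the default file system when a URI has no scheme.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: one abstract "file system" type with pluggable
// implementations chosen by URI scheme, plus an instance cache keyed by
// scheme + authority. None of these names are Hadoop APIs.
public class FsRegistry {

    // Stand-in for the abstract base class org.apache.hadoop.fs.FileSystem.
    public abstract static class SimpleFs {
        private URI uri;

        void initialize(URI uri) {
            this.uri = uri;
        }

        public URI getUri() {
            return uri;
        }

        public abstract String describe();
    }

    // Two hypothetical concrete implementations.
    static final class LocalLikeFs extends SimpleFs {
        @Override
        public String describe() {
            return "local file system at " + getUri();
        }
    }

    static final class MemoryLikeFs extends SimpleFs {
        @Override
        public String describe() {
            return "in-memory file system at " + getUri();
        }
    }

    // scheme -> factory (Hadoop instead resolves the class from its Configuration).
    private static final Map<String, Supplier<SimpleFs>> IMPLS = new HashMap<>();
    // "scheme://authority" -> instance, so repeated lookups share one object.
    private static final Map<String, SimpleFs> CACHE = new HashMap<>();

    static {
        IMPLS.put("file", LocalLikeFs::new);
        IMPLS.put("mem", MemoryLikeFs::new);
    }

    public static synchronized SimpleFs get(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null) {
            // Hadoop would fall back to the default file system here.
            throw new IllegalArgumentException("No scheme in " + uri);
        }
        scheme = scheme.toLowerCase(Locale.ROOT);
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
        String key = scheme + "://" + authority;
        SimpleFs fs = CACHE.get(key);
        if (fs == null) {
            Supplier<SimpleFs> factory = IMPLS.get(scheme);
            if (factory == null) {
                throw new IllegalArgumentException("No file system for scheme: " + scheme);
            }
            fs = factory.get();
            fs.initialize(URI.create(key + "/")); // the instance is identified by its root URI
            CACHE.put(key, fs);
        }
        return fs;
    }

    public static void main(String[] args) {
        SimpleFs a = get(URI.create("mem://host1/a.txt"));
        SimpleFs b = get(URI.create("mem://host1/dir/b.txt"));
        SimpleFs c = get(URI.create("file:///tmp/c.txt"));
        System.out.println(a == b);        // true: same scheme and authority
        System.out.println(a.describe());  // in-memory file system at mem://host1/
        System.out.println(c.describe());  // local file system at file:///
    }
}
```

Caching by scheme and authority lets paths on the same cluster share one client object; the trade-off, also true of Hadoop's cache, is that closing a cached instance affects every caller that obtained it.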
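The methods of the Hadoop abstract file system fall into two groups:
1. Methods that manage files and directories
2. Methods that read and write file data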
Operations of the Hadoop abstract file system, with their Java and Linux counterparts
| Hadoop FileSystem | Java operation | Linux operation | Description |
| --- | --- | --- | --- |
| URL.openStream, FileSystem.open, FileSystem.create, FileSystem.append | URL.openStream | open | Open a file |
| FSDataInputStream.read | InputStream.read | read | Read data from a file |
| FSDataOutputStream.write | OutputStream.write | write | Write data to a file |
| FSDataInputStream.close, FSDataOutputStream.close | InputStream.close, OutputStream.close | close | Close a file |
| FSDataInputStream.seek | RandomAccessFile.seek | lseek | Change the read/write position in a file |
| FileSystem.getFileStatus, FileSystem.get* | File.get* | stat | Get the attributes of a file/directory |
| FileSystem.set* | File.set* | chmod, etc. | Change the attributes of a file |
| FileSystem.createNewFile | File.createNewFile | creat | Create a file |
| FileSystem.delete | File.delete | remove | Delete a file from the file system |
| FileSystem.rename | File.renameTo | rename | Rename a file/directory |
| FileSystem.mkdirs | File.mkdir | mkdir | Create a subdirectory under a given directory |
| FileSystem.delete | File.delete | rmdir | Delete an empty subdirectory from a directory |
| FileSystem.listStatus | File.list | readdir | Read the entries of a directory |
| FileSystem.getWorkingDirectory | (none) | getcwd/getwd | Return the current working directory |
| FileSystem.setWorkingDirectory | (none) | chdir | Change the current working directory |
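For comparison with the "Java operation" column, the JDK-only sketch below runs most rows of the table against the local disk: mkdir, createNewFile, write, read, seek, list, rename, and delete. The class name `JdkFileOps` and the temporary-directory layout are illustrative assumptions; each comment names the Hadoop counterpart from the same row. It needs Java 9+ for `InputStream.readAllBytes`.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// JDK-only walk through the "Java operation" column of the table.
// Comments name the Hadoop counterpart from the same row.
public class JdkFileOps {

    // Returns "<content>:<char at offset 1>:<listing>:<dir still exists?>".
    public static String run(File root) throws IOException {
        File dir = new File(root, "sub");
        if (!dir.mkdir()) {                                      // FileSystem.mkdirs / mkdir
            throw new IOException("mkdir failed: " + dir);
        }
        File f = new File(dir, "a.txt");
        if (!f.createNewFile()) {                                // FileSystem.createNewFile / creat
            throw new IOException("already exists: " + f);
        }
        try (OutputStream out = new FileOutputStream(f)) {       // FileSystem.create / open
            out.write("hello".getBytes(StandardCharsets.UTF_8)); // FSDataOutputStream.write / write
        }                                                        // close
        String content;
        try (InputStream in = new FileInputStream(f)) {          // FileSystem.open / open
            content = new String(in.readAllBytes(), StandardCharsets.UTF_8); // FSDataInputStream.read
        }
        char atOffset1;
        try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
            raf.seek(1);                                         // FSDataInputStream.seek / lseek
            atOffset1 = (char) raf.read();
        }
        String listing = String.join(",", dir.list());           // FileSystem.listStatus / readdir
        File renamed = new File(dir, "b.txt");
        if (!f.renameTo(renamed)) {                              // FileSystem.rename / rename
            throw new IOException("rename failed: " + f);
        }
        if (!renamed.delete() || !dir.delete()) {                // FileSystem.delete / remove, rmdir
            throw new IOException("delete failed under " + dir);
        }
        return content + ":" + atOffset1 + ":" + listing + ":" + dir.exists();
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("jdkops").toFile();
        System.out.println(run(root)); // hello:e:a.txt:false
        root.delete();
    }
}
```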
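With FileSystem.getFileStatus(), the Hadoop abstract file system retrieves all attributes of a file or directory in a single call. These attributes are held in the FileStatus class: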
```java
public class FileStatus implements Writable, Comparable {
    private Path path;               // file path
    private long length;             // file length
    private boolean isdir;           // whether this is a directory
    private short block_replication; // replication factor (an HDFS-specific parameter)
    private long blocksize;          // block size (an HDFS-specific parameter)
    private long modification_time;  // last modification time
    private long access_time;        // last access time
    private FsPermission permission; // permission information
    private String owner;            // file owner
    private String group;            // user group
    // ...
}
```
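On the plain JDK side, `java.nio.file.Files.readAttributes` plays a role similar to `getFileStatus()`: one call returns a snapshot of several attributes, instead of one call per attribute (`File.length()`, `File.lastModified()`, and so on). The sketch below copies that snapshot into a small immutable holder, the hypothetical class `StatusSnapshot`, that mirrors a few of the `FileStatus` fields above. It has no notion of replication or block size, which are HDFS-specific.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

// JDK analogue of getFileStatus(): one call returns a snapshot of attributes.
// StatusSnapshot is a hypothetical holder mirroring a few FileStatus fields.
public final class StatusSnapshot {
    final Path path;
    final long length;
    final boolean isDir;
    final long modificationTime; // milliseconds since January 1, 1970 UTC
    final long accessTime;       // implementation-specific default if access time is not tracked

    private StatusSnapshot(Path path, BasicFileAttributes attrs) {
        this.path = path;
        this.length = attrs.size();
        this.isDir = attrs.isDirectory();
        this.modificationTime = attrs.lastModifiedTime().toMillis();
        this.accessTime = attrs.lastAccessTime().toMillis();
    }

    // A single readAttributes call instead of separate length/lastModified/isDirectory calls.
    public static StatusSnapshot of(Path p) throws IOException {
        return new StatusSnapshot(p, Files.readAttributes(p, BasicFileAttributes.class));
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("status", ".txt");
        Files.write(tmp, new byte[] {1, 2, 3});
        StatusSnapshot s = of(tmp);
        System.out.println(s.path + " length=" + s.length + " isDir=" + s.isDir);
        Files.delete(tmp);
    }
}
```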
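FileStatus implements the Writable interface, so a FileStatus object can be serialized and sent over the network. Because all of a file's attributes are read and returned to the client in one go, the number of network round trips in a distributed system is reduced.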
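The Writable contract comes down to two symmetric methods: `write(DataOutput)` emits the fields in a fixed order, and `readFields(DataInput)` reads them back in exactly the same order. The sketch below reproduces that pattern with plain `java.io` for a hypothetical `MiniStatus` class holding a few FileStatus-like fields, using a byte array in place of the network. It uses `DataOutput.writeUTF` for strings, whereas the real `FileStatus.write` uses Hadoop's `Text.writeString`, so the byte format is not compatible with Hadoop's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical Writable-style class: same write/readFields pattern, JDK types only.
public class MiniStatus {
    String path = "";
    long length;
    boolean isDir;
    short replication;
    long blockSize;
    String owner = "";

    // Serialize the fields in a fixed order (same idea as FileStatus.write).
    public void write(DataOutput out) throws IOException {
        out.writeUTF(path);
        out.writeLong(length);
        out.writeBoolean(isDir);
        out.writeShort(replication);
        out.writeLong(blockSize);
        out.writeUTF(owner);
    }

    // Read the fields back in exactly the same order (same idea as FileStatus.readFields).
    public void readFields(DataInput in) throws IOException {
        path = in.readUTF();
        length = in.readLong();
        isDir = in.readBoolean();
        replication = in.readShort();
        blockSize = in.readLong();
        owner = in.readUTF();
    }

    // Serialize to bytes and deserialize into a fresh object, standing in for a network transfer.
    public static MiniStatus roundTrip(MiniStatus s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            s.write(out);
        }
        MiniStatus copy = new MiniStatus();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        MiniStatus s = new MiniStatus();
        s.path = "/user/demo/a.txt";
        s.length = 1024;
        s.replication = 3;
        s.blockSize = 64L * 1024 * 1024;
        s.owner = "demo";
        MiniStatus c = roundTrip(s);
        System.out.println(c.path + " " + c.length + " " + c.replication + " " + c.owner);
    }
}
```

Because the order is the only "schema", adding a field means changing both methods together; a reader and writer that disagree on order silently misread the stream.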
The complete source code of the FileStatus class is as follows:
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/** Interface that represents the client side information for a file.
 */
public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;

  public FileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }

  //We should deprecate this soon?
  public FileStatus(long length, boolean isdir, int block_replication,
                    long blocksize, long modification_time, Path path) {
    this(length, isdir, block_replication, blocksize, modification_time,
         0, null, null, null, path);
  }

  public FileStatus(long length, boolean isdir, int block_replication,
                    long blocksize, long modification_time, long access_time,
                    FsPermission permission, String owner, String group,
                    Path path) {
    this.length = length;
    this.isdir = isdir;
    this.block_replication = (short)block_replication;
    this.blocksize = blocksize;
    this.modification_time = modification_time;
    this.access_time = access_time;
    this.permission = (permission == null) ?
                      FsPermission.getDefault() : permission;
    this.owner = (owner == null) ? "" : owner;
    this.group = (group == null) ? "" : group;
    this.path = path;
  }

  /*
   * @return the length of this file, in bytes
   */
  public long getLen() {
    return length;
  }

  /**
   * Is this a directory?
   * @return true if this is a directory
   */
  public boolean isDir() {
    return isdir;
  }

  /**
   * Get the block size of the file.
   * @return the number of bytes
   */
  public long getBlockSize() {
    return blocksize;
  }

  /**
   * Get the replication factor of a file.
   * @return the replication factor of a file.
   */
  public short getReplication() {
    return block_replication;
  }

  /**
   * Get the modification time of the file.
   * @return the modification time of file in milliseconds since January 1, 1970 UTC.
   */
  public long getModificationTime() {
    return modification_time;
  }

  /**
   * Get the access time of the file.
   * @return the access time of file in milliseconds since January 1, 1970 UTC.
   */
  public long getAccessTime() {
    return access_time;
  }

  /**
   * Get FsPermission associated with the file.
   * @return permission. If a filesystem does not have a notion of permissions
   *         or if permissions could not be determined, then default
   *         permissions equivalent of "rwxrwxrwx" is returned.
   */
  public FsPermission getPermission() {
    return permission;
  }

  /**
   * Get the owner of the file.
   * @return owner of the file. The string could be empty if there is no
   *         notion of owner of a file in a filesystem or if it could not
   *         be determined (rare).
   */
  public String getOwner() {
    return owner;
  }

  /**
   * Get the group associated with the file.
   * @return group for the file. The string could be empty if there is no
   *         notion of group of a file in a filesystem or if it could not
   *         be determined (rare).
   */
  public String getGroup() {
    return group;
  }

  public Path getPath() {
    return path;
  }

  /* These are provided so that these values could be loaded lazily
   * by a filesystem (e.g. local file system).
   */

  /**
   * Sets permission.
   * @param permission if permission is null, default value is set
   */
  protected void setPermission(FsPermission permission) {
    this.permission = (permission == null) ?
                      FsPermission.getDefault() : permission;
  }

  /**
   * Sets owner.
   * @param owner if it is null, default value is set
   */
  protected void setOwner(String owner) {
    this.owner = (owner == null) ? "" : owner;
  }

  /**
   * Sets group.
   * @param group if it is null, default value is set
   */
  protected void setGroup(String group) {
    this.group = (group == null) ? "" : group;
  }

  //////////////////////////////////////////////////
  // Writable
  //////////////////////////////////////////////////
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, getPath().toString());
    out.writeLong(length);
    out.writeBoolean(isdir);
    out.writeShort(block_replication);
    out.writeLong(blocksize);
    out.writeLong(modification_time);
    out.writeLong(access_time);
    permission.write(out);
    Text.writeString(out, owner);
    Text.writeString(out, group);
  }

  public void readFields(DataInput in) throws IOException {
    String strPath = Text.readString(in);
    this.path = new Path(strPath);
    this.length = in.readLong();
    this.isdir = in.readBoolean();
    this.block_replication = in.readShort();
    blocksize = in.readLong();
    modification_time = in.readLong();
    access_time = in.readLong();
    permission.readFields(in);
    owner = Text.readString(in);
    group = Text.readString(in);
  }

  /**
   * Compare this object to another object
   *
   * @param   o the object to be compared.
   * @return  a negative integer, zero, or a positive integer as this object
   *   is less than, equal to, or greater than the specified object.
   *
   * @throws ClassCastException if the specified object's is not of
   *         type FileStatus
   */
  public int compareTo(Object o) {
    FileStatus other = (FileStatus)o;
    return this.getPath().compareTo(other.getPath());
  }

  /** Compare if this object is equal to another object
   * @param   o the object to be compared.
   * @return  true if two file status has the same path name; false if not.
   */
  public boolean equals(Object o) {
    if (o == null) {
      return false;
    }
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileStatus)) {
      return false;
    }
    FileStatus other = (FileStatus)o;
    return this.getPath().equals(other.getPath());
  }

  /**
   * Returns a hash code value for the object, which is defined as
   * the hash code of the path name.
   *
   * @return  a hash code value for the path name.
   */
  public int hashCode() {
    return getPath().hashCode();
  }
}
```
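Note that `compareTo`, `equals` and `hashCode` above all depend only on the path: two `FileStatus` objects for the same path are equal even if their length or timestamps differ, and sorting a listing orders it by path. The JDK-only sketch below shows the practical effect in sorted and hashed collections. The class `PathKeyedStatus` is hypothetical; it uses `String` paths instead of Hadoop's `Path` and the generic `Comparable<PathKeyedStatus>` instead of the raw `Comparable` in the source.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical status class whose identity is its path, as in FileStatus.
public class PathKeyedStatus implements Comparable<PathKeyedStatus> {
    final String path;
    final long length;

    PathKeyedStatus(String path, long length) {
        this.path = path;
        this.length = length;
    }

    // Ordering depends only on the path, as in FileStatus.compareTo.
    @Override
    public int compareTo(PathKeyedStatus other) {
        return path.compareTo(other.path);
    }

    // Equality and hash code also depend only on the path.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PathKeyedStatus)) return false;
        return path.equals(((PathKeyedStatus) o).path);
    }

    @Override
    public int hashCode() {
        return path.hashCode();
    }

    // Paths of a listing after sorting and de-duplicating by path.
    public static List<String> sortedUniquePaths(List<PathKeyedStatus> listing) {
        List<String> out = new ArrayList<>();
        for (PathKeyedStatus s : new TreeSet<>(listing)) {
            out.add(s.path);
        }
        return out;
    }

    public static void main(String[] args) {
        List<PathKeyedStatus> listing = List.of(
                new PathKeyedStatus("/data/b", 10),
                new PathKeyedStatus("/data/a", 5),
                new PathKeyedStatus("/data/b", 99)); // same path, different length
        System.out.println(sortedUniquePaths(listing)); // [/data/a, /data/b]
        Set<PathKeyedStatus> hashed = new HashSet<>(listing);
        System.out.println(hashed.size());              // 2
    }
}
```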
Some methods exist in FileSystem but have no counterpart in the Java file API: setReplication(), getReplication() and getContentSummary(). Their default implementations in FileSystem are:
```java
// Default implementation: does nothing and reports success;
// file systems that support replication (such as HDFS) override it.
public boolean setReplication(Path src, short replication)
    throws IOException {
  return true;
}

public short getReplication(Path src) throws IOException {
  return getFileStatus(src).getReplication();
}

public ContentSummary getContentSummary(Path f) throws IOException {
  FileStatus status = getFileStatus(f);
  if (!status.isDir()) {
    // f is a file
    return new ContentSummary(status.getLen(), 1, 0);
  }
  // f is a directory
  long[] summary = {0, 0, 1};
  for (FileStatus s : listStatus(f)) {
    ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
                                   new ContentSummary(s.getLen(), 1, 0);
    summary[0] += c.getLength();
    summary[1] += c.getFileCount();
    summary[2] += c.getDirectoryCount();
  }
  return new ContentSummary(summary[0], summary[1], summary[2]);
}
```
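The default `getContentSummary()` is a plain recursive walk: a file contributes (its length, 1 file, 0 directories), and a directory contributes itself as one directory plus the sums over its children. The sketch below applies the same recursion to the local disk with `java.io.File`; the nested `Summary` class stands in for Hadoop's `ContentSummary`, and both it and `LocalContentSummary` are hypothetical names.

```java
import java.io.File;
import java.io.FileNotFoundException;

// Same recursion as FileSystem.getContentSummary, applied to the local disk.
public class LocalContentSummary {

    // Stand-in for ContentSummary: total bytes, file count, directory count.
    public static final class Summary {
        final long length;
        final long fileCount;
        final long directoryCount;

        Summary(long length, long fileCount, long directoryCount) {
            this.length = length;
            this.fileCount = fileCount;
            this.directoryCount = directoryCount;
        }

        @Override
        public String toString() {
            return "length=" + length + " files=" + fileCount + " dirs=" + directoryCount;
        }
    }

    public static Summary of(File f) throws FileNotFoundException {
        if (!f.exists()) {
            throw new FileNotFoundException(f.getPath());
        }
        if (!f.isDirectory()) {
            return new Summary(f.length(), 1, 0); // a file counts only itself
        }
        long[] sum = {0, 0, 1};                   // a directory counts itself once
        File[] children = f.listFiles();
        if (children != null) {
            for (File c : children) {
                Summary s = of(c);                // recurse into files and subdirectories
                sum[0] += s.length;
                sum[1] += s.fileCount;
                sum[2] += s.directoryCount;
            }
        }
        return new Summary(sum[0], sum[1], sum[2]);
    }

    public static void main(String[] args) throws Exception {
        File root = new File(args.length > 0 ? args[0] : ".");
        System.out.println(of(root));
    }
}
```

Like the default above, this issues one listing per directory, so the cost grows with the size of the tree; the sketch also does not guard against symbolic-link cycles.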
What does a concrete Hadoop file system have to implement? The abstract methods declared in org.apache.hadoop.fs.FileSystem are collected below:
```java
// Return the URI of this file system
public abstract URI getUri();

// Open a file for reading and return an input stream
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

// Create a file and return an output stream
public abstract FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException;

// Append data to an existing file
public abstract FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException;

// Rename a file or directory
public abstract boolean rename(Path src, Path dst) throws IOException;

// Delete a file (the one-argument form is deprecated in favor of delete(Path, boolean))
public abstract boolean delete(Path f) throws IOException;
public abstract boolean delete(Path f, boolean recursive) throws IOException;

// If f is a directory, list all entries in it together with their attributes;
// if f is a file, return that file's attributes
public abstract FileStatus[] listStatus(Path f) throws IOException;

// Set the current working directory
public abstract void setWorkingDirectory(Path new_dir);

// Get the current working directory
public abstract Path getWorkingDirectory();

// Create a directory (and any missing parents) with the given permission
public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;

// Get the attributes of a file or directory
public abstract FileStatus getFileStatus(Path f) throws IOException;
```
A concrete file system must implement at least these abstract methods.
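To see what implementing these methods involves without pulling in Hadoop, here is a toy in-memory file system whose methods mirror a subset of the list above (`mkdirs`, `create`, `open`, `getFileStatus`, `listStatus`, `rename`, `delete`). It is a sketch under simplifying assumptions: the class `MemFs` is hypothetical, paths are plain absolute strings, streams are replaced by whole byte arrays, `getFileStatus` is reduced to a length (or -1 for a directory), and permissions, replication, block size and progress reporting are omitted. It is not a drop-in `FileSystem` subclass.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy in-memory file system (hypothetical, not a Hadoop FileSystem subclass).
// Paths are absolute strings such as "/user/a.txt"; directories map to null.
public class MemFs {
    private final TreeMap<String, byte[]> entries = new TreeMap<>();

    public MemFs() { entries.put("/", null); } // the root directory always exists

    private static String parent(String p) {
        int i = p.lastIndexOf('/');
        return i <= 0 ? "/" : p.substring(0, i);
    }

    private boolean isDir(String p) { return entries.containsKey(p) && entries.get(p) == null; }

    // Like mkdirs: create the directory and any missing parents.
    public boolean mkdirs(String dir) throws IOException {
        if (entries.containsKey(dir)) {
            if (!isDir(dir)) throw new IOException("Not a directory: " + dir);
            return true;
        }
        mkdirs(parent(dir));
        entries.put(dir, null);
        return true;
    }

    // Like create + write + close: store the whole file at once.
    public void create(String file, byte[] data, boolean overwrite) throws IOException {
        if (isDir(file) || (entries.containsKey(file) && !overwrite)) {
            throw new IOException("Cannot create " + file);
        }
        mkdirs(parent(file));
        entries.put(file, data.clone());
    }

    // Like open + read: return a copy of the file's bytes.
    public byte[] open(String file) throws IOException {
        byte[] data = entries.get(file);
        if (data == null) throw new FileNotFoundException(file);
        return data.clone();
    }

    // Reduced getFileStatus: the file length, or -1 for a directory.
    public long getFileStatus(String p) throws FileNotFoundException {
        if (!entries.containsKey(p)) throw new FileNotFoundException(p);
        return isDir(p) ? -1 : entries.get(p).length;
    }

    // Like listStatus: the direct children of a directory, or the file itself.
    public List<String> listStatus(String p) throws FileNotFoundException {
        List<String> out = new ArrayList<>();
        if (getFileStatus(p) >= 0) { out.add(p); return out; }
        String prefix = p.equals("/") ? "/" : p + "/";
        for (String k : entries.tailMap(prefix, false).keySet()) {
            if (!k.startsWith(prefix)) break; // sorted keys: the subtree is contiguous
            if (k.indexOf('/', prefix.length()) < 0) out.add(k);
        }
        return out;
    }

    // Like rename: move a file or subtree; false if src is missing or the root,
    // dst exists, dst's parent is not a directory, or dst lies inside src.
    public boolean rename(String src, String dst) {
        if (src.equals("/") || !entries.containsKey(src) || entries.containsKey(dst)
                || !isDir(parent(dst)) || dst.startsWith(src + "/")) return false;
        List<String> moved = new ArrayList<>();
        for (String k : entries.keySet()) {
            if (k.equals(src) || k.startsWith(src + "/")) moved.add(k);
        }
        for (String k : moved) entries.put(dst + k.substring(src.length()), entries.remove(k));
        return true;
    }

    // Like delete(Path, boolean): deleting a non-empty directory needs recursive=true.
    public boolean delete(String p, boolean recursive) throws IOException {
        if (p.equals("/") || !entries.containsKey(p)) return false;
        if (isDir(p) && !recursive && !listStatus(p).isEmpty()) {
            throw new IOException("Directory is not empty: " + p);
        }
        entries.keySet().removeIf(k -> k.equals(p) || k.startsWith(p + "/"));
        return true;
    }

    public static void main(String[] args) throws IOException {
        MemFs fs = new MemFs();
        fs.create("/user/demo/a.txt", "hi".getBytes(StandardCharsets.UTF_8), false);
        fs.rename("/user/demo", "/user/renamed");
        System.out.println(fs.listStatus("/user/renamed")); // [/user/renamed/a.txt]
        System.out.println(new String(fs.open("/user/renamed/a.txt"), StandardCharsets.UTF_8)); // hi
        fs.delete("/user", true);
        System.out.println(fs.listStatus("/")); // []
    }
}
```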
The complete source code of Hadoop's FileSystem class is as follows:
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package org.apache.hadoop.fs;import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern;import javax.security.auth.login.LoginException;import org.apache.commons.logging.*;import org.apache.hadoop.conf.*; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.security.UserGroupInformation;/***************************************************************** An abstract base class for a fairly generic filesystem. It* may be implemented as a distributed filesystem, or as a "local"* one that reflects the locally-connected disk. 
The local version* exists for small Hadoop instances and for testing.** <p>** All user code that may potentially use the Hadoop Distributed* File System should be written to use a FileSystem object. The* Hadoop DFS is a multi-machine system that appears as a single* disk. It's useful because of its fault tolerance and potentially* very large capacity.* * <p>* The local implementation is {@link LocalFileSystem} and distributed* implementation is DistributedFileSystem.*****************************************************************/ public abstract class FileSystem extends Configured implements Closeable {private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";public static final Log LOG = LogFactory.getLog(FileSystem.class);/** FileSystem cache */private static final Cache CACHE = new Cache();/** The key this instance is stored under in the cache. */private Cache.Key key;/** Recording statistics per a FileSystem class */private static final Map<Class<? extends FileSystem>, Statistics> statisticsTable =new IdentityHashMap<Class<? 
extends FileSystem>, Statistics>();/*** The statistics for this file system.*/protected Statistics statistics;/*** A cache of files that should be deleted when filsystem is closed* or the JVM is exited.*/private Set<Path> deleteOnExit = new TreeSet<Path>();/** Returns the configured filesystem implementation.*/public static FileSystem get(Configuration conf) throws IOException {return get(getDefaultUri(conf), conf);}/** Get the default filesystem URI from a configuration.* @param conf the configuration to access* @return the uri of the default filesystem*/public static URI getDefaultUri(Configuration conf) {return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));}/** Set the default filesystem URI in a configuration.* @param conf the configuration to alter* @param uri the new default filesystem uri*/public static void setDefaultUri(Configuration conf, URI uri) {conf.set(FS_DEFAULT_NAME_KEY, uri.toString());}/** Set the default filesystem URI in a configuration.* @param conf the configuration to alter* @param uri the new default filesystem uri*/public static void setDefaultUri(Configuration conf, String uri) {setDefaultUri(conf, URI.create(fixName(uri)));}/** Called after a new FileSystem instance is constructed.* @param name a uri whose authority section names the host, port, etc.* for this FileSystem* @param conf the configuration*/public void initialize(URI name, Configuration conf) throws IOException {statistics = getStatistics(name.getScheme(), getClass()); }/** Returns a URI whose scheme and authority identify this FileSystem.*/public abstract URI getUri();/** @deprecated call #getUri() instead.*/public String getName() { return getUri().toString(); }/** @deprecated call #get(URI,Configuration) instead. */public static FileSystem getNamed(String name, Configuration conf)throws IOException {return get(URI.create(fixName(name)), conf);}/** Update old-format filesystem names, for back-compatibility. 
This should* eventually be replaced with a checkName() method that throws an exception* for old-format names. */ private static String fixName(String name) {// convert old-format name to new-format nameif (name.equals("local")) { // "local" is now "file:///".LOG.warn("\"local\" is a deprecated filesystem name."+" Use \"file:///\" instead.");name = "file:///";} else if (name.indexOf('/')==-1) { // unqualified is "hdfs://"LOG.warn("\""+name+"\" is a deprecated filesystem name."+" Use \"hdfs://"+name+"/\" instead.");name = "hdfs://"+name;}return name;}/*** Get the local file syste* @param conf the configuration to configure the file system with* @return a LocalFileSystem*/public static LocalFileSystem getLocal(Configuration conf)throws IOException {return (LocalFileSystem)get(LocalFileSystem.NAME, conf);}/** Returns the FileSystem for this URI's scheme and authority. The scheme* of the URI determines a configuration property name,* <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.* The entire URI is passed to the FileSystem instance's initialize method.*/public static FileSystem get(URI uri, Configuration conf) throws IOException {String scheme = uri.getScheme();String authority = uri.getAuthority();if (scheme == null) { // no scheme: use default FSreturn get(conf);}if (authority == null) { // no authorityURI defaultUri = getDefaultUri(conf);if (scheme.equals(defaultUri.getScheme()) // if scheme matches default&& defaultUri.getAuthority() != null) { // & default has authorityreturn get(defaultUri, conf); // return default }}String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);if (conf.getBoolean(disableCacheName, false)) {return createFileSystem(uri, conf);}return CACHE.get(uri, conf);}private static class ClientFinalizer extends Thread {public synchronized void run() {try {FileSystem.closeAll();} catch (IOException e) {LOG.info("FileSystem.closeAll() threw an exception:\n" + e);}}}private static final ClientFinalizer 
clientFinalizer = new ClientFinalizer();/*** Close all cached filesystems. Be sure those filesystems are not* used anymore.* * @throws IOException*/public static void closeAll() throws IOException {CACHE.closeAll();}/** Make sure that a path specifies a FileSystem. */public Path makeQualified(Path path) {checkPath(path);return path.makeQualified(this);}/** create a file with the provided permission* The permission of the file is set to be the provided permission as in* setPermission, not permission&~umask* * It is implemented using two RPCs. It is understood that it is inefficient,* but the implementation is thread-safe. The other option is to change the* value of umask in configuration to be 0, but it is not thread-safe.* * @param fs file system handle* @param file the name of the file to be created* @param permission the permission of the file* @return an output stream* @throws IOException*/public static FSDataOutputStream create(FileSystem fs,Path file, FsPermission permission) throws IOException {// create the file with default permissionFSDataOutputStream out = fs.create(file);// set its permission to the supplied one fs.setPermission(file, permission);return out;}/** create a directory with the provided permission* The permission of the directory is set to be the provided permission as in* setPermission, not permission&~umask* * @see #create(FileSystem, Path, FsPermission)* * @param fs file system handle* @param dir the name of the directory to be created* @param permission the permission of the directory* @return true if the directory creation succeeds; false otherwise* @throws IOException*/public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)throws IOException {// create the directory using the default permissionboolean result = fs.mkdirs(dir);// set its permission to be the supplied one fs.setPermission(dir, permission);return result;}///// FileSystem///protected FileSystem() {super(null);}/** Check that a Path belongs to this 
FileSystem. */protected void checkPath(Path path) {URI uri = path.toUri();if (uri.getScheme() == null) // fs is relative return;String thisScheme = this.getUri().getScheme();String thatScheme = uri.getScheme();String thisAuthority = this.getUri().getAuthority();String thatAuthority = uri.getAuthority();//authority and scheme are not case sensitiveif (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes matchif (thisAuthority == thatAuthority || // & authorities match(thisAuthority != null && thisAuthority.equalsIgnoreCase(thatAuthority)))return;if (thatAuthority == null && // path's authority is nullthisAuthority != null) { // fs has an authorityURI defaultUri = getDefaultUri(getConf()); // & is the conf default if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) &&thisAuthority.equalsIgnoreCase(defaultUri.getAuthority()))return;try { // or the default fs's uridefaultUri = get(getConf()).getUri();} catch (IOException e) {throw new RuntimeException(e);}if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) &&thisAuthority.equalsIgnoreCase(defaultUri.getAuthority()))return;}}throw new IllegalArgumentException("Wrong FS: "+path+", expected: "+this.getUri());}/*** Return an array containing hostnames, offset and size of * portions of the given file. 
For a nonexistent * file or regions, null will be returned.** This call is most helpful with DFS, where it returns * hostnames of machines that contain the given file.** The FileSystem will simply return an elt containing 'localhost'.*/public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {if (file == null) {return null;}if ( (start<0) || (len < 0) ) {throw new IllegalArgumentException("Invalid start or len parameter");}if (file.getLen() < start) {return new BlockLocation[0];}String[] name = { "localhost:50010" };String[] host = { "localhost" };return new BlockLocation[] { new BlockLocation(name, host, 0, file.getLen()) };}/*** Opens an FSDataInputStream at the indicated Path.* @param f the file name to open* @param bufferSize the size of the buffer to be used.*/public abstract FSDataInputStream open(Path f, int bufferSize)throws IOException;/*** Opens an FSDataInputStream at the indicated Path.* @param f the file to open*/public FSDataInputStream open(Path f) throws IOException {return open(f, getConf().getInt("io.file.buffer.size", 4096));}/*** Opens an FSDataOutputStream at the indicated Path.* Files are overwritten by default.*/public FSDataOutputStream create(Path f) throws IOException {return create(f, true);}/*** Opens an FSDataOutputStream at the indicated Path.*/public FSDataOutputStream create(Path f, boolean overwrite)throws IOException {return create(f, overwrite, getConf().getInt("io.file.buffer.size", 4096),getDefaultReplication(),getDefaultBlockSize());}/*** Create an FSDataOutputStream at the indicated Path with write-progress* reporting.* Files are overwritten by default.*/public FSDataOutputStream create(Path f, Progressable progress) throws IOException {return create(f, true, getConf().getInt("io.file.buffer.size", 4096),getDefaultReplication(),getDefaultBlockSize(), progress);}/*** Opens an FSDataOutputStream at the indicated Path.* Files are overwritten by default.*/public FSDataOutputStream 
create(Path f, short replication)throws IOException {return create(f, true, getConf().getInt("io.file.buffer.size", 4096),replication,getDefaultBlockSize());}/*** Opens an FSDataOutputStream at the indicated Path with write-progress* reporting.* Files are overwritten by default.*/public FSDataOutputStream create(Path f, short replication, Progressable progress)throws IOException {return create(f, true, getConf().getInt("io.file.buffer.size", 4096),replication,getDefaultBlockSize(), progress);}/*** Opens an FSDataOutputStream at the indicated Path.* @param f the file name to open* @param overwrite if a file with this name already exists, then if true,* the file will be overwritten, and if false an error will be thrown.* @param bufferSize the size of the buffer to be used.*/public FSDataOutputStream create(Path f, boolean overwrite,int bufferSize) throws IOException {return create(f, overwrite, bufferSize, getDefaultReplication(),getDefaultBlockSize());}/*** Opens an FSDataOutputStream at the indicated Path with write-progress* reporting.* @param f the file name to open* @param overwrite if a file with this name already exists, then if true,* the file will be overwritten, and if false an error will be thrown.* @param bufferSize the size of the buffer to be used.*/public FSDataOutputStream create(Path f, boolean overwrite,int bufferSize,Progressable progress) throws IOException {return create(f, overwrite, bufferSize, getDefaultReplication(),getDefaultBlockSize(), progress);}/*** Opens an FSDataOutputStream at the indicated Path.* @param f the file name to open* @param overwrite if a file with this name already exists, then if true,* the file will be overwritten, and if false an error will be thrown.* @param bufferSize the size of the buffer to be used.* @param replication required block replication for the file. 
*/public FSDataOutputStream create(Path f, boolean overwrite,int bufferSize,short replication,long blockSize) throws IOException {return create(f, overwrite, bufferSize, replication, blockSize, null);}/*** Opens an FSDataOutputStream at the indicated Path with write-progress* reporting.* @param f the file name to open* @param overwrite if a file with this name already exists, then if true,* the file will be overwritten, and if false an error will be thrown.* @param bufferSize the size of the buffer to be used.* @param replication required block replication for the file. */public FSDataOutputStream create(Path f,boolean overwrite,int bufferSize,short replication,long blockSize,Progressable progress) throws IOException {return this.create(f, FsPermission.getDefault(),overwrite, bufferSize, replication, blockSize, progress);}/*** Opens an FSDataOutputStream at the indicated Path with write-progress* reporting.* @param f the file name to open* @param permission* @param overwrite if a file with this name already exists, then if true,* the file will be overwritten, and if false an error will be thrown.* @param bufferSize the size of the buffer to be used.* @param replication required block replication for the file.* @param blockSize* @param progress* @throws IOException* @see #setPermission(Path, FsPermission)*/public abstract FSDataOutputStream create(Path f,FsPermission permission,boolean overwrite,int bufferSize,short replication,long blockSize,Progressable progress) throws IOException;/*** Creates the given Path as a brand-new zero-length file. 
If* create fails, or if it already existed, return false.*/public boolean createNewFile(Path f) throws IOException {if (exists(f)) {return false;} else {create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();return true;}}/*** Append to an existing file (optional operation).* Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)* @param f the existing file to be appended.* @throws IOException*/public FSDataOutputStream append(Path f) throws IOException {return append(f, getConf().getInt("io.file.buffer.size", 4096), null);}/*** Append to an existing file (optional operation).* Same as append(f, bufferSize, null).* @param f the existing file to be appended.* @param bufferSize the size of the buffer to be used.* @throws IOException*/public FSDataOutputStream append(Path f, int bufferSize) throws IOException {return append(f, bufferSize, null);}/*** Append to an existing file (optional operation).* @param f the existing file to be appended.* @param bufferSize the size of the buffer to be used.* @param progress for reporting progress if it is not null.* @throws IOException*/public abstract FSDataOutputStream append(Path f, int bufferSize,Progressable progress) throws IOException;/*** Get replication.* * @deprecated Use getFileStatus() instead* @param src file name* @return file replication* @throws IOException*/ @Deprecatedpublic short getReplication(Path src) throws IOException {return getFileStatus(src).getReplication();}/*** Set replication for an existing file.* * @param src file name* @param replication new replication* @throws IOException* @return true if successful;* false if file does not exist or is a directory*/public boolean setReplication(Path src, short replication)throws IOException {return true;}/*** Renames Path src to Path dst. Can take place on local fs* or remote DFS.*/public abstract boolean rename(Path src, Path dst) throws IOException;/** Delete a file. 
*//** @deprecated Use delete(Path, boolean) instead */ @Deprecated public abstract boolean delete(Path f) throws IOException;/** Delete a file.** @param f the path to delete.* @param recursive if path is a directory and set to * true, the directory is deleted else throws an exception. In* case of a file the recursive can be set to either true or false. * @return true if delete is successful else false. * @throws IOException*/public abstract boolean delete(Path f, boolean recursive) throws IOException;/*** Mark a path to be deleted when FileSystem is closed.* When the JVM shuts down,* all FileSystem objects will be closed automatically.* Then,* the marked path will be deleted as a result of closing the FileSystem.** The path has to exist in the file system.* * @param f the path to delete.* @return true if deleteOnExit is successful, otherwise false.* @throws IOException*/public boolean deleteOnExit(Path f) throws IOException {if (!exists(f)) {return false;}synchronized (deleteOnExit) {deleteOnExit.add(f);}return true;}/*** Delete all files that were marked as delete-on-exit. This recursively* deletes all files in the specified paths.*/protected void processDeleteOnExit() {synchronized (deleteOnExit) {for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {Path path = iter.next();try {delete(path, true);}catch (IOException e) {LOG.info("Ignoring failure to deleteOnExit for path " + path);}iter.remove();}}}/** Check if exists.* @param f source file*/public boolean exists(Path f) throws IOException {try {return getFileStatus(f) != null;} catch (FileNotFoundException e) {return false;}}/** True iff the named path is a directory. *//** @deprecated Use getFileStatus() instead */ @Deprecatedpublic boolean isDirectory(Path f) throws IOException {try {return getFileStatus(f).isDir();} catch (FileNotFoundException e) {return false; // f does not exist }}/** True iff the named path is a regular file. 
*/public boolean isFile(Path f) throws IOException {try {return !getFileStatus(f).isDir();} catch (FileNotFoundException e) {return false; // f does not exist }}/** The number of bytes in a file. *//** @deprecated Use getFileStatus() instead */ @Deprecatedpublic long getLength(Path f) throws IOException {return getFileStatus(f).getLen();}/** Return the {@link ContentSummary} of a given {@link Path}. */public ContentSummary getContentSummary(Path f) throws IOException {FileStatus status = getFileStatus(f);if (!status.isDir()) {// f is a filereturn new ContentSummary(status.getLen(), 1, 0);}// f is a directorylong[] summary = {0, 0, 1};for(FileStatus s : listStatus(f)) {ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :new ContentSummary(s.getLen(), 1, 0);summary[0] += c.getLength();summary[1] += c.getFileCount();summary[2] += c.getDirectoryCount();}return new ContentSummary(summary[0], summary[1], summary[2]);}final private static PathFilter DEFAULT_FILTER = new PathFilter() {public boolean accept(Path file) {return true;} };/*** List the statuses of the files/directories in the given path if the path is* a directory.* * @param f* given path* @return the statuses of the files/directories in the given patch* @throws IOException*/public abstract FileStatus[] listStatus(Path f) throws IOException;/** Filter files/directories in the given path using the user-supplied path* filter. 
Results are added to the given array <code>results</code>.*/private void listStatus(ArrayList<FileStatus> results, Path f,PathFilter filter) throws IOException {FileStatus listing[] = listStatus(f);if (listing != null) {for (int i = 0; i < listing.length; i++) {if (filter.accept(listing[i].getPath())) {results.add(listing[i]);}}}}/*** Filter files/directories in the given path using the user-supplied path* filter.* * @param f* a path name* @param filter* the user-supplied path filter* @return an array of FileStatus objects for the files under the given path* after applying the filter* @throws IOException* if encounter any problem while fetching the status*/public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {ArrayList<FileStatus> results = new ArrayList<FileStatus>();listStatus(results, f, filter);return results.toArray(new FileStatus[results.size()]);}/*** Filter files/directories in the given list of paths using default* path filter.* * @param files* a list of paths* @return a list of statuses for the files under the given paths after* applying the filter default Path filter* @exception IOException*/public FileStatus[] listStatus(Path[] files)throws IOException {return listStatus(files, DEFAULT_FILTER);}/*** Filter files/directories in the given list of paths using user-supplied* path filter.* * @param files* a list of paths* @param filter* the user-supplied path filter* @return a list of statuses for the files under the given paths after* applying the filter* @exception IOException*/public FileStatus[] listStatus(Path[] files, PathFilter filter)throws IOException {ArrayList<FileStatus> results = new ArrayList<FileStatus>();for (int i = 0; i < files.length; i++) {listStatus(results, files[i], filter);}return results.toArray(new FileStatus[results.size()]);}/*** <p>Return all the files that match filePattern and are not checksum* files. 
   * Results are sorted by their names.
   *
   * <p>
   * A filename pattern is composed of <i>regular</i> characters and
   * <i>special pattern matching</i> characters, which are:
   *
   * <dl>
   *  <dd>
   *   <dl>
   *    <p>
   *    <dt> <tt> ? </tt>
   *    <dd> Matches any single character.
   *
   *    <p>
   *    <dt> <tt> * </tt>
   *    <dd> Matches zero or more characters.
   *
   *    <p>
   *    <dt> <tt> [<i>abc</i>] </tt>
   *    <dd> Matches a single character from character set
   *     <tt>{<i>a,b,c</i>}</tt>.
   *
   *    <p>
   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
   *    <dd> Matches a single character from the character range
   *     <tt>{<i>a...b</i>}</tt>. Note that character <tt><i>a</i></tt> must be
   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
   *
   *    <p>
   *    <dt> <tt> [^<i>a</i>] </tt>
   *    <dd> Matches a single character that is not from character set or range
   *     <tt>{<i>a</i>}</tt>. Note that the <tt>^</tt> character must occur
   *     immediately to the right of the opening bracket.
   *
   *    <p>
   *    <dt> <tt> \<i>c</i> </tt>
   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
   *
   *    <p>
   *    <dt> <tt> {ab,cd} </tt>
   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
   *
   *    <p>
   *    <dt> <tt> {ab,c{de,fh}} </tt>
   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
   *
   *   </dl>
   *  </dd>
   * </dl>
   *
   * @param pathPattern a regular expression specifying a pth pattern
   * @return an array of paths that match the path pattern
   * @throws IOException
   */
  public FileStatus[] globStatus(Path pathPattern) throws IOException {
    return globStatus(pathPattern, DEFAULT_FILTER);
  }

  /**
   * Return an array of FileStatus objects whose path names match pathPattern
   * and is accepted by the user-supplied path filter. Results are sorted by
   * their path names.
   * Return null if pathPattern has no glob and the path does not exist.
   * Return an empty array if pathPattern has a glob and no path matches it.
   *
   * @param pathPattern
   *          a regular expression specifying the path pattern
   * @param filter
   *          a user-supplied path filter
   * @return an array of FileStatus objects
   * @throws IOException if any I/O error occurs when fetching file status
   */
  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
      throws IOException {
    String filename = pathPattern.toUri().getPath();
    List<String> filePatterns = GlobExpander.expand(filename);
    if (filePatterns.size() == 1) {
      return globStatusInternal(pathPattern, filter);
    } else {
      List<FileStatus> results = new ArrayList<FileStatus>();
      for (String filePattern : filePatterns) {
        FileStatus[] files = globStatusInternal(new Path(filePattern), filter);
        for (FileStatus file : files) {
          results.add(file);
        }
      }
      return results.toArray(new FileStatus[results.size()]);
    }
  }

  private FileStatus[] globStatusInternal(Path pathPattern, PathFilter filter)
      throws IOException {
    Path[] parents = new Path[1];
    int level = 0;
    String filename = pathPattern.toUri().getPath();

    // path has only zero component
    if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
      return getFileStatus(new Path[]{pathPattern});
    }

    // path has at least one component
    String[] components = filename.split(Path.SEPARATOR);
    // get the first component
    if (pathPattern.isAbsolute()) {
      parents[0] = new Path(Path.SEPARATOR);
      level = 1;
    } else {
      parents[0] = new Path(Path.CUR_DIR);
    }

    // glob the paths that match the parent path, i.e., [0, components.length-1]
    boolean[] hasGlob = new boolean[]{false};
    Path[] parentPaths = globPathsLevel(parents, components, level, hasGlob);
    FileStatus[] results;
    if (parentPaths == null || parentPaths.length == 0) {
      results = null;
    } else {
      // Now work on the last component of the path
      GlobFilter fp = new GlobFilter(components[components.length - 1], filter);
      if (fp.hasPattern()) { // last component has a pattern
        // list parent directories and then glob the results
        results = listStatus(parentPaths, fp);
        hasGlob[0] = true;
      } else { // last component does not have a pattern
        // get all the path names
        ArrayList<Path> filteredPaths = new ArrayList<Path>(parentPaths.length);
        for (int i = 0; i < parentPaths.length; i++) {
          parentPaths[i] = new Path(parentPaths[i],
            components[components.length - 1]);
          if (fp.accept(parentPaths[i])) {
            filteredPaths.add(parentPaths[i]);
          }
        }
        // get all their statuses
        results = getFileStatus(
            filteredPaths.toArray(new Path[filteredPaths.size()]));
      }
    }

    // Decide if the pathPattern contains a glob or not
    if (results == null) {
      if (hasGlob[0]) {
        results = new FileStatus[0];
      }
    } else {
      if (results.length == 0 ) {
        if (!hasGlob[0]) {
          results = null;
        }
      } else {
        Arrays.sort(results);
      }
    }
    return results;
  }

  /*
   * For a path of N components, return a list of paths that match the
   * components [<code>level</code>, <code>N-1</code>].
   */
  private Path[] globPathsLevel(Path[] parents, String[] filePattern,
      int level, boolean[] hasGlob) throws IOException {
    if (level == filePattern.length - 1)
      return parents;
    if (parents == null || parents.length == 0) {
      return null;
    }
    GlobFilter fp = new GlobFilter(filePattern[level]);
    if (fp.hasPattern()) {
      parents = FileUtil.stat2Paths(listStatus(parents, fp));
      hasGlob[0] = true;
    } else {
      for (int i = 0; i < parents.length; i++) {
        parents[i] = new Path(parents[i], filePattern[level]);
      }
    }
    return globPathsLevel(parents, filePattern, level + 1, hasGlob);
  }

  /* A class that could decide if a string matches the glob or not */
  private static class GlobFilter implements PathFilter {
    private PathFilter userFilter = DEFAULT_FILTER;
    private Pattern regex;
    private boolean hasPattern = false;

    /** Default pattern character: Escape any special meaning. */
    private static final char  PAT_ESCAPE = '\\';
    /** Default pattern character: Any single character. */
    private static final char  PAT_ANY = '.';
    /** Default pattern character: Character set close.
     */
    private static final char  PAT_SET_CLOSE = ']';

    GlobFilter() {
    }

    GlobFilter(String filePattern) throws IOException {
      setRegex(filePattern);
    }

    GlobFilter(String filePattern, PathFilter filter) throws IOException {
      userFilter = filter;
      setRegex(filePattern);
    }

    private boolean isJavaRegexSpecialChar(char pChar) {
      return pChar == '.' || pChar == '$' || pChar == '(' || pChar == ')' ||
             pChar == '|' || pChar == '+';
    }

    void setRegex(String filePattern) throws IOException {
      int len;
      int setOpen;
      int curlyOpen;
      boolean setRange;

      StringBuilder fileRegex = new StringBuilder();

      // Validate the pattern
      len = filePattern.length();
      if (len == 0)
        return;

      setOpen = 0;
      setRange = false;
      curlyOpen = 0;

      for (int i = 0; i < len; i++) {
        char pCh;

        // Examine a single pattern character
        pCh = filePattern.charAt(i);
        if (pCh == PAT_ESCAPE) {
          fileRegex.append(pCh);
          i++;
          if (i >= len)
            error("An escaped character does not present", filePattern, i);
          pCh = filePattern.charAt(i);
        } else if (isJavaRegexSpecialChar(pCh)) {
          fileRegex.append(PAT_ESCAPE);
        } else if (pCh == '*') {
          fileRegex.append(PAT_ANY);
          hasPattern = true;
        } else if (pCh == '?') {
          pCh = PAT_ANY;
          hasPattern = true;
        } else if (pCh == '{') {
          fileRegex.append('(');
          pCh = '(';
          curlyOpen++;
          hasPattern = true;
        } else if (pCh == ',' && curlyOpen > 0) {
          fileRegex.append(")|");
          pCh = '(';
        } else if (pCh == '}' && curlyOpen > 0) {
          // End of a group
          curlyOpen--;
          fileRegex.append(")");
          pCh = ')';
        } else if (pCh == '[' && setOpen == 0) {
          setOpen++;
          hasPattern = true;
        } else if (pCh == '^' && setOpen > 0) {
        } else if (pCh == '-' && setOpen > 0) {
          // Character set range
          setRange = true;
        } else if (pCh == PAT_SET_CLOSE && setRange) {
          // Incomplete character set range
          error("Incomplete character set range", filePattern, i);
        } else if (pCh == PAT_SET_CLOSE && setOpen > 0) {
          // End of a character set
          if (setOpen < 2)
            error("Unexpected end of set", filePattern, i);
          setOpen = 0;
        } else if (setOpen > 0) {
          // Normal character, or the end of a character set range
          setOpen++;
          setRange = false;
        }
        fileRegex.append(pCh);
      }

      // Check for a well-formed pattern
      if (setOpen > 0 || setRange || curlyOpen > 0) {
        // Incomplete character set or character range
        error("Expecting set closure character or end of range, or }",
            filePattern, len);
      }
      regex = Pattern.compile(fileRegex.toString());
    }

    boolean hasPattern() {
      return hasPattern;
    }

    public boolean accept(Path path) {
      return regex.matcher(path.getName()).matches() && userFilter.accept(path);
    }

    private void error(String s, String pattern, int pos) throws IOException {
      throw new IOException("Illegal file pattern: "
                            +s+ " for glob "+ pattern + " at " + pos);
    }
  }

  /** Return the current user's home directory in this filesystem.
   * The default implementation returns "/user/$USER/".
   */
  public Path getHomeDirectory() {
    return new Path("/user/"+System.getProperty("user.name"))
      .makeQualified(this);
  }

  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   *
   * @param new_dir
   */
  public abstract void setWorkingDirectory(Path new_dir);

  /**
   * Get the current working directory for the given file system
   * @return the directory pathname
   */
  public abstract Path getWorkingDirectory();

  /**
   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDefault());
  }

  /**
   * Make the given file and all non-existent parents into
   * directories. Has the semantics of Unix 'mkdir -p'.
   * Existence of the directory hierarchy is not an error.
   */
  public abstract boolean mkdirs(Path f, FsPermission permission
      ) throws IOException;

  /**
   * The src file is on the local disk. Add it to FS at
   * the given dst name and the source is kept intact afterwards
   */
  public void copyFromLocalFile(Path src, Path dst)
    throws IOException {
    copyFromLocalFile(false, src, dst);
  }

  /**
   * The src files is on the local disk.
   * Add it to FS at
   * the given dst name, removing the source afterwards.
   */
  public void moveFromLocalFile(Path[] srcs, Path dst)
    throws IOException {
    copyFromLocalFile(true, true, srcs, dst);
  }

  /**
   * The src file is on the local disk. Add it to FS at
   * the given dst name, removing the source afterwards.
   */
  public void moveFromLocalFile(Path src, Path dst)
    throws IOException {
    copyFromLocalFile(true, src, dst);
  }

  /**
   * The src file is on the local disk. Add it to FS at
   * the given dst name.
   * delSrc indicates if the source should be removed
   */
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    copyFromLocalFile(delSrc, true, src, dst);
  }

  /**
   * The src files are on the local disk. Add it to FS at
   * the given dst name.
   * delSrc indicates if the source should be removed
   */
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                                Path[] srcs, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
  }

  /**
   * The src file is on the local disk.
   * Add it to FS at
   * the given dst name.
   * delSrc indicates if the source should be removed
   */
  public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                                Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   */
  public void copyToLocalFile(Path src, Path dst) throws IOException {
    copyToLocalFile(false, src, dst);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * Remove the source afterwards
   */
  public void moveToLocalFile(Path src, Path dst) throws IOException {
    copyToLocalFile(true, src, dst);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * delSrc indicates if the src will be removed or not.
   */
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    FileUtil.copy(this, src, getLocal(getConf()), dst, delSrc, getConf());
  }

  /**
   * Returns a local File that the user can write output to. The caller
   * provides both the eventual FS target name and the local working
   * file. If the FS is local, we write directly into the target. If
   * the FS is remote, we write into the tmp local area.
   */
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }

  /**
   * Called when we're all done writing to the target. A local FS will
   * do nothing, because we've written to exactly the right place. A remote
   * FS will copy the contents of tmpLocalFile to the correct target at
   * fsOutputFile.
   */
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }

  /**
   * No more filesystem operations are needed. Will
   * release any held locks.
   */
  public void close() throws IOException {
    // delete all files that were marked as delete-on-exit.
    processDeleteOnExit();
    CACHE.remove(this.key, this);
  }

  /** Return the total size of all files in the filesystem.*/
  public long getUsed() throws IOException{
    long used = 0;
    FileStatus[] files = listStatus(new Path("/"));
    for(FileStatus file:files){
      used += file.getLen();
    }
    return used;
  }

  /**
   * Get the block size for a particular file.
   * @param f the filename
   * @return the number of bytes in a block
   */
  /** @deprecated Use getFileStatus() instead */ @Deprecated
  public long getBlockSize(Path f) throws IOException {
    return getFileStatus(f).getBlockSize();
  }

  /** Return the number of bytes that large input files should be optimally
   * be split into to minimize i/o time. */
  public long getDefaultBlockSize() {
    // default to 32MB: large enough to minimize the impact of seeks
    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
  }

  /**
   * Get the default replication.
   */
  public short getDefaultReplication() { return 1; }

  /**
   * Return a file status object that represents the path.
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  public abstract FileStatus getFileStatus(Path f) throws IOException;

  /**
   * Get the checksum of a file.
   *
   * @param f The file path
   * @return The file checksum. The default return value is null,
   *  which indicates that no checksum algorithm is implemented
   *  in the corresponding FileSystem.
   */
  public FileChecksum getFileChecksum(Path f) throws IOException {
    return null;
  }

  /**
   * Set the verify checksum flag. This is only applicable if the
   * corresponding FileSystem supports checksum.
   * By default doesn't do anything.
   * @param verifyChecksum
   */
  public void setVerifyChecksum(boolean verifyChecksum) {
    //doesn't do anything
  }

  /**
   * Return a list of file status objects that corresponds to the list of paths
   * excluding those non-existent paths.
   *
   * @param paths
   *          the list of paths we want information from
   * @return a list of FileStatus objects
   * @throws IOException
   *           see specific implementation
   */
  private FileStatus[] getFileStatus(Path[] paths) throws IOException {
    if (paths == null) {
      return null;
    }
    ArrayList<FileStatus> results = new ArrayList<FileStatus>(paths.length);
    for (int i = 0; i < paths.length; i++) {
      try {
        results.add(getFileStatus(paths[i]));
      } catch (FileNotFoundException e) { // do nothing
      }
    }
    return results.toArray(new FileStatus[results.size()]);
  }

  /**
   * Set permission of a path.
   * @param p
   * @param permission
   */
  public void setPermission(Path p, FsPermission permission
      ) throws IOException {
  }

  /**
   * Set owner of a path (i.e. a file or a directory).
   * The parameters username and groupname cannot both be null.
   * @param p The path
   * @param username If it is null, the original username remains unchanged.
   * @param groupname If it is null, the original groupname remains unchanged.
   */
  public void setOwner(Path p, String username, String groupname
      ) throws IOException {
  }

  /**
   * Set access time of a file
   * @param p The path
   * @param mtime Set the modification time of this file.
   *              The number of milliseconds since Jan 1, 1970.
   *              A value of -1 means that this call should not set modification time.
   * @param atime Set the access time of this file.
   *              The number of milliseconds since Jan 1, 1970.
   *              A value of -1 means that this call should not set access time.
   */
  public void setTimes(Path p, long mtime, long atime
      ) throws IOException {
  }

  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = conf.getClass("fs."
        + uri.getScheme() + ".impl", null);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }

  /** Caching FileSystem objects */
  static class Cache {
    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

    synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = map.get(key);
      if (fs == null) {
        fs = createFileSystem(uri, conf);
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
      }
      return fs;
    }

    synchronized void remove(Key key, FileSystem fs) {
      if (map.containsKey(key) && fs == map.get(key)) {
        map.remove(key);
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
            LOG.info("Could not cancel cleanup thread, though no " +
                     "FileSystems are open");
          }
        }
      }
    }

    synchronized void closeAll() throws IOException {
      List<IOException> exceptions = new ArrayList<IOException>();
      for(; !map.isEmpty(); ) {
        Map.Entry<Key, FileSystem> e = map.entrySet().iterator().next();
        final Key key = e.getKey();
        final FileSystem fs = e.getValue();

        //remove from cache
        remove(key, fs);

        if (fs != null) {
          try {
            fs.close();
          }
          catch(IOException ioe) {
            exceptions.add(ioe);
          }
        }
      }

      if (!exceptions.isEmpty()) {
        throw MultipleIOException.createIOException(exceptions);
      }
    }

    /** FileSystem.Cache.Key */
    static class Key {
      final String scheme;
      final String authority;
      final String username;

      Key(URI uri, Configuration conf) throws IOException {
        scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
        authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
        UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
        if (ugi == null) {
          try {
            ugi = UserGroupInformation.login(conf);
          } catch(LoginException e) {
            LOG.warn("uri=" + uri, e);
          }
        }
        username = ugi == null?
            null: ugi.getUserName();
      }

      /** {@inheritDoc} */
      public int hashCode() {
        return (scheme + authority + username).hashCode();
      }

      static boolean isEqual(Object a, Object b) {
        return a == b || (a != null && a.equals(b));
      }

      /** {@inheritDoc} */
      public boolean equals(Object obj) {
        if (obj == this) {
          return true;
        }
        if (obj != null && obj instanceof Key) {
          Key that = (Key)obj;
          return isEqual(this.scheme, that.scheme)
                 && isEqual(this.authority, that.authority)
                 && isEqual(this.username, that.username);
        }
        return false;
      }

      /** {@inheritDoc} */
      public String toString() {
        return username + "@" + scheme + "://" + authority;
      }
    }
  }

  public static final class Statistics {
    private final String scheme;
    private AtomicLong bytesRead = new AtomicLong();
    private AtomicLong bytesWritten = new AtomicLong();

    public Statistics(String scheme) {
      this.scheme = scheme;
    }

    /**
     * Increment the bytes read in the statistics
     * @param newBytes the additional bytes read
     */
    public void incrementBytesRead(long newBytes) {
      bytesRead.getAndAdd(newBytes);
    }

    /**
     * Increment the bytes written in the statistics
     * @param newBytes the additional bytes written
     */
    public void incrementBytesWritten(long newBytes) {
      bytesWritten.getAndAdd(newBytes);
    }

    /**
     * Get the total number of bytes read
     * @return the number of bytes
     */
    public long getBytesRead() {
      return bytesRead.get();
    }

    /**
     * Get the total number of bytes written
     * @return the number of bytes
     */
    public long getBytesWritten() {
      return bytesWritten.get();
    }

    public String toString() {
      return bytesRead + " bytes read and " + bytesWritten +
             " bytes written";
    }

    /**
     * Reset the counts of bytes to 0.
     */
    public void reset() {
      bytesWritten.set(0);
      bytesRead.set(0);
    }

    /**
     * Get the uri scheme associated with this statistics object.
     * @return the schema associated with this set of statistics
     */
    public String getScheme() {
      return scheme;
    }
  }

  /**
   * Get the Map of Statistics object indexed by URI Scheme.
   * @return a Map having a key as URI scheme and value as Statistics object
   * @deprecated use {@link #getAllStatistics} instead
   */
  public static synchronized
  Map<String, Statistics> getStatistics() {
    Map<String, Statistics> result = new HashMap<String, Statistics>();
    for(Statistics stat: statisticsTable.values()) {
      result.put(stat.getScheme(), stat);
    }
    return result;
  }

  /**
   * Return the FileSystem classes that have Statistics
   */
  public static synchronized List<Statistics> getAllStatistics() {
    return new ArrayList<Statistics>(statisticsTable.values());
  }

  /**
   * Get the statistics for a particular file system
   * @param cls the class to lookup
   * @return a statistics object
   */
  public static synchronized
  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
    Statistics result = statisticsTable.get(cls);
    if (result == null) {
      result = new Statistics(scheme);
      statisticsTable.put(cls, result);
    }
    return result;
  }

  public static synchronized void clearStatistics() {
    for(Statistics stat: statisticsTable.values()) {
      stat.reset();
    }
  }

  public static synchronized
  void printStatistics() throws IOException {
    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair:
            statisticsTable.entrySet()) {
      System.out.println("  FileSystem " + pair.getKey().getName() +
                         ": " + pair.getValue());
    }
  }
}
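The glob syntax documented in the globStatus() Javadoc above (?, *, [abc], [a-b], {ab,cd}) can be tried locally without a cluster. The sketch below does not use Hadoop's private GlobFilter. It swaps in the JDK's own java.nio.file.PathMatcher with the "glob:" syntax, which treats these particular operators similarly, so it is only an approximation for experimenting. The class name GlobDemo is hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical demo class. It uses the JDK's "glob:" PathMatcher,
// not Hadoop's GlobFilter, so corner cases may differ.
class GlobDemo {

    // True if the whole file name matches the glob pattern.
    static boolean matches(String glob, String fileName) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(fileName));
    }

    public static void main(String[] args) {
        System.out.println(matches("part-?", "part-1"));      // ? : one character          -> true
        System.out.println(matches("*.log", "app.log"));      // * : zero or more chars     -> true
        System.out.println(matches("[abc].txt", "b.txt"));    // [abc] : one char of a set  -> true
        System.out.println(matches("[a-c].txt", "d.txt"));    // [a-b] : one char of range  -> false
        System.out.println(matches("{ab,cd}.csv", "cd.csv")); // {ab,cd} : one alternative  -> true
    }
}
```

Two differences from the Hadoop Javadoc are worth keeping in mind: the JDK matcher negates a bracket expression with [!a] rather than [^a], and it does not allow nested groups such as {ab,c{de,fh}}.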
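Hadoop input/output streams
Like Java, Hadoop's abstract file system reads and writes files through streams. The classes used to read and write file data are FSDataInputStream and FSDataOutputStream, respectively.
1. FSDataInputStream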
public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {
  ……
}
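As shown, FSDataInputStream extends DataInputStream and implements the Seekable and PositionedReadable interfaces.
The Seekable interface provides random access within a (file) stream. It works much like the getFilePointer() and seek() methods of RandomAccessFile: the caller can move the read position to an arbitrary offset in the file.
The Seekable interface, with comments: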
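/** Interface for streams that support seeking. */
public interface Seekable {
  /**
   * Seek to the given offset; the next read will start from this position.
   */
  void seek(long pos) throws IOException;

  /** Return the current offset. */
  long getPos() throws IOException;

  /** Switch to a different replica of the data. */
  boolean seekToNewSource(long targetPos) throws IOException;
}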
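To see the seek()/getPos() contract in action without a cluster, here is a minimal, self-contained sketch. The Seekable interface below only mirrors the shape of Hadoop's interface (it is not org.apache.hadoop.fs.Seekable). ByteArraySeekableStream is a hypothetical in-memory stream whose "file" is a byte array. Its seekToNewSource() simply returns false because a single in-memory copy has no other replica.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class SeekableDemo {

    // Hypothetical interface with the same three methods as Hadoop's Seekable.
    interface Seekable {
        void seek(long pos) throws IOException;
        long getPos() throws IOException;
        boolean seekToNewSource(long targetPos) throws IOException;
    }

    // Hypothetical in-memory stream: the "file" is a byte array, pos is the read offset.
    static class ByteArraySeekableStream extends InputStream implements Seekable {
        private final byte[] data;
        private int pos = 0;

        ByteArraySeekableStream(byte[] data) {
            this.data = data;
        }

        @Override
        public int read() {
            return pos < data.length ? (data[pos++] & 0xff) : -1;
        }

        @Override
        public void seek(long newPos) throws IOException {
            if (newPos < 0 || newPos > data.length) {
                throw new IOException("Cannot seek to " + newPos);
            }
            pos = (int) newPos;
        }

        @Override
        public long getPos() {
            return pos;
        }

        // Only one in-memory copy exists, so there is never another source to switch to.
        @Override
        public boolean seekToNewSource(long targetPos) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArraySeekableStream in =
            new ByteArraySeekableStream("hadoop".getBytes(StandardCharsets.US_ASCII));
        in.seek(3);                            // the next read() starts at offset 3
        System.out.println((char) in.read());  // o
        System.out.println(in.getPos());       // 4
    }
}
```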
The complete source of the FSDataInputStream class:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.*;

/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
 * and buffers input through a {@link BufferedInputStream}. */
public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {

  public FSDataInputStream(InputStream in)
    throws IOException {
    super(in);
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
      throw new IllegalArgumentException(
          "In is not an instance of Seekable or PositionedReadable");
    }
  }

  public synchronized void seek(long desired) throws IOException {
    ((Seekable)in).seek(desired);
  }

  public long getPos() throws IOException {
    return ((Seekable)in).getPos();
  }

  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
    return ((PositionedReadable)in).read(position, buffer, offset, length);
  }

  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, offset, length);
  }

  public void readFully(long position, byte[] buffer)
    throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
  }

  public boolean seekToNewSource(long targetPos) throws IOException {
    return ((Seekable)in).seekToNewSource(targetPos);
  }
}
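The core idea of FSDataInputStream is that it inherits the typed readers of DataInputStream (readInt(), readLong(), ...) and forwards seek()/getPos() to the wrapped stream, rejecting any stream that cannot seek in its constructor. The following sketch reproduces that pattern with JDK classes only. Seekable, ArraySource and SeekableDataInputStream are hypothetical stand-ins, not Hadoop classes, and PositionedReadable is left out for brevity.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

class SeekableDataStreamDemo {

    // Hypothetical, reduced stand-in for Hadoop's Seekable interface.
    interface Seekable {
        void seek(long pos) throws IOException;
        long getPos() throws IOException;
    }

    // Minimal seekable byte-array stream used as the wrapped stream.
    static class ArraySource extends InputStream implements Seekable {
        private final byte[] data;
        private int pos;

        ArraySource(byte[] data) { this.data = data; }

        @Override public int read() { return pos < data.length ? (data[pos++] & 0xff) : -1; }
        @Override public void seek(long p) { pos = (int) p; }
        @Override public long getPos() { return pos; }
    }

    // Same pattern as FSDataInputStream: DataInputStream's typed readers,
    // plus seek()/getPos() forwarded to the wrapped stream `in`.
    static class SeekableDataInputStream extends DataInputStream implements Seekable {
        SeekableDataInputStream(InputStream in) {
            super(in);
            if (!(in instanceof Seekable)) {
                throw new IllegalArgumentException("In is not an instance of Seekable");
            }
        }

        @Override public synchronized void seek(long desired) throws IOException { ((Seekable) in).seek(desired); }
        @Override public long getPos() throws IOException { return ((Seekable) in).getPos(); }
    }

    public static void main(String[] args) throws IOException {
        // Build a small "file" containing three big-endian ints: 7, 42, 99.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(7);
        out.writeInt(42);
        out.writeInt(99);
        out.close();

        SeekableDataInputStream in =
            new SeekableDataInputStream(new ArraySource(buf.toByteArray()));
        in.seek(4);                        // skip the first int (4 bytes)
        System.out.println(in.readInt());  // 42
        System.out.println(in.getPos());   // 8
    }
}
```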
The other interface implemented by FSDataInputStream is PositionedReadable, which provides a set of methods for reading data starting at a given position in the stream:
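// Interface for positional reads from a stream
public interface PositionedReadable {
  // Read up to length bytes, starting at position in the stream,
  // into buffer beginning at offset.
  // Note: this does not change the stream's current read position, and it is thread-safe.
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;

  // Read exactly length bytes, starting at position in the stream,
  // into buffer beginning at offset.
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;

  public void readFully(long position, byte[] buffer) throws IOException;
}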
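None of the three read methods in PositionedReadable changes the stream's current position, and all of them are thread-safe.
2. FSInputStream
The org.apache.hadoop.fs package also contains the abstract class FSInputStream, which extends InputStream and implements both Seekable and PositionedReadable. The Seekable methods are declared abstract in this class, whereas the PositionedReadable methods are implemented in it.
In FSInputStream, the read() method of PositionedReadable is implemented on top of the seek() method of Seekable: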
// Implementation of PositionedReadable.read()
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  /*
   * PositionedReadable.read() must be thread-safe, so synchronized (this)
   * ensures that no other code synchronized on this stream runs meanwhile,
   * and that no other thread changes the current read position reported by
   * Seekable.getPos().
   */
  synchronized (this) {
    long oldPos = getPos();  // save the current read position via Seekable.getPos()
    int nread = -1;
    try {
      seek(position);                        // move to the requested position via Seekable.seek()
      nread = read(buffer, offset, length);  // read the data via InputStream.read()
    } finally {
      seek(oldPos);  // restore the position held before the read, via Seekable.seek()
    }
    return nread;
  }
}
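The seek/read/restore pattern can be tried on an in-memory stream. In this sketch ArrayStream is a hypothetical class (not part of Hadoop). Its positional read() copies the logic of the method above, so a positional read leaves the sequential read position untouched.

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class PositionalReadDemo {

    // Hypothetical in-memory stream (not a Hadoop class). Its positional read
    // copies the seek / read / restore pattern of FSInputStream.read(long, byte[], int, int).
    static class ArrayStream extends InputStream {
        private final byte[] data;
        private int pos;

        ArrayStream(byte[] data) {
            this.data = data;
        }

        @Override
        public synchronized int read() {
            return pos < data.length ? (data[pos++] & 0xff) : -1;
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) {
            if (len == 0) return 0;
            if (pos >= data.length) return -1;
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }

        synchronized void seek(long p) { pos = (int) p; }

        synchronized long getPos() { return pos; }

        // Positional read: save the position, read elsewhere, then restore it.
        // Holding the lock on `this` stops other threads from moving pos meanwhile.
        int read(long position, byte[] buffer, int offset, int length) {
            synchronized (this) {
                long oldPos = getPos();
                int nread = -1;
                try {
                    seek(position);
                    nread = read(buffer, offset, length);
                } finally {
                    seek(oldPos);
                }
                return nread;
            }
        }
    }

    public static void main(String[] args) {
        ArrayStream in = new ArrayStream("0123456789".getBytes(StandardCharsets.US_ASCII));
        in.read();                      // sequential read; position is now 1
        byte[] buf = new byte[3];
        int n = in.read(6, buf, 0, 3);  // positional read of bytes 6..8
        System.out.println(n + " " + new String(buf, StandardCharsets.US_ASCII)); // 3 678
        System.out.println(in.getPos()); // 1: the sequential position is unchanged
    }
}
```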
The complete source of FSInputStream:
package org.apache.hadoop.fs;

import java.io.*;

/****************************************************************
 * FSInputStream is a generic old InputStream with a little bit
 * of RAF-style seek ability.
 *
 *****************************************************************/
public abstract class FSInputStream extends InputStream
    implements Seekable, PositionedReadable {
  /**
   * Seek to the given offset from the start of the file.
   * The next read() will be from that location. Can't
   * seek past the end of the file.
   */
  public abstract void seek(long pos) throws IOException;

  /**
   * Return the current offset from the start of the file
   */
  public abstract long getPos() throws IOException;

  /**
   * Seeks a different copy of the data. Returns true if
   * found a new source, false otherwise.
   */
  public abstract boolean seekToNewSource(long targetPos) throws IOException;

  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
    synchronized (this) {
      long oldPos = getPos();
      int nread = -1;
      try {
        seek(position);
        nread = read(buffer, offset, length);
      } finally {
        seek(oldPos);
      }
      return nread;
    }
  }

  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
    int nread = 0;
    while (nread < length) {
      int nbytes = read(position+nread, buffer, offset+nread, length-nread);
      if (nbytes < 0) {
        throw new EOFException("End of file reached before reading fully.");
      }
      nread += nbytes;
    }
  }

  public void readFully(long position, byte[] buffer)
    throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}
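FSInputStream.readFully() exists because a single read() may legitimately return fewer bytes than requested. The sketch below makes that visible with a hypothetical ChunkySource whose positional read never returns more than 2 bytes per call. Its readFully() is the same loop as in FSInputStream: it keeps reading until the requested range is filled and throws EOFException if the data runs out first.

```java
import java.io.EOFException;
import java.io.IOException;

class ReadFullyDemo {

    // Hypothetical source whose positional read returns at most 2 bytes per call,
    // imitating a stream that delivers data in small pieces.
    static class ChunkySource {
        private final byte[] data;

        ChunkySource(byte[] data) { this.data = data; }

        int read(long position, byte[] buffer, int offset, int length) {
            if (position >= data.length) return -1;
            int n = Math.min(Math.min(length, 2), data.length - (int) position);
            System.arraycopy(data, (int) position, buffer, offset, n);
            return n;
        }

        // Same loop as FSInputStream.readFully(long, byte[], int, int):
        // keep reading until `length` bytes arrive, fail on a premature EOF.
        void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
            int nread = 0;
            while (nread < length) {
                int nbytes = read(position + nread, buffer, offset + nread, length - nread);
                if (nbytes < 0) {
                    throw new EOFException("End of file reached before reading fully.");
                }
                nread += nbytes;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ChunkySource src = new ChunkySource(new byte[] {1, 2, 3, 4, 5});
        byte[] buf = new byte[5];
        System.out.println(src.read(0, buf, 0, 5)); // 2: a single read may be short
        src.readFully(0, buf, 0, 5);                // loops until all 5 bytes are in
        try {
            src.readFully(3, new byte[4], 0, 4);    // only 2 bytes remain after offset 3
        } catch (EOFException e) {
            System.out.println("EOFException: " + e.getMessage());
        }
    }
}
```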
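Note: Hadoop has no corresponding FSOutputStream class.
3. FSDataOutputStream
FSDataOutputStream is used for writing data. Like FSDataInputStream, it extends a java.io data stream, in this case DataOutputStream, so it provides methods such as writeInt() and writeChar(). FSDataOutputStream is simpler, however: it does not implement the Seekable interface, which means the Hadoop file system does not support random writes. A user cannot move the write position within a file and overwrite existing content by writing new data there. The user can, however, obtain the stream's current write position through getPos(). To implement getPos(), FSDataOutputStream defines an inner class, PositionCache, which extends FilterOutputStream and tracks the current write position by overriding the write() methods.
PositionCache is a typical filter stream: it adds getPos() on top of the basic stream functionality, and it also uses FileSystem.Statistics to record file-system I/O statistics (here, the number of bytes written).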
public class FSDataOutputStream extends DataOutputStream implements Syncable {
  private OutputStream wrappedStream;

  private static class PositionCache extends FilterOutputStream {
    private FileSystem.Statistics statistics;
    long position;  // current write position of the stream

    public PositionCache(OutputStream out,
                         FileSystem.Statistics stats,
                         long pos) throws IOException {
      super(out);
      statistics = stats;
      position = pos;
    }

    public void write(int b) throws IOException {
      out.write(b);
      position++;  // update the current position
      if (statistics != null) {
        statistics.incrementBytesWritten(1);  // update the file-system statistics
      }
    }

    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
      position += len;                            // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }

    public long getPos() throws IOException {
      return position;  // return the current write position of the stream
    }

    public void close() throws IOException {
      out.close();
    }
  }

  @Deprecated
  public FSDataOutputStream(OutputStream out) throws IOException {
    this(out, null);
  }

  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
    throws IOException {
    this(out, stats, 0);
  }

  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
                            long startPosition) throws IOException {
    super(new PositionCache(out, stats, startPosition));  // wrap out in a new PositionCache and pass it to the superclass constructor
    wrappedStream = out;
  }

  public long getPos() throws IOException {
    return ((PositionCache)out).getPos();
  }

  public void close() throws IOException {
    out.close();         // This invokes PositionCache.close()
  }

  // Returns the underlying output stream. This is used by unit tests.
  public OutputStream getWrappedStream() {
    return wrappedStream;
  }

  /** {@inheritDoc} */
  public void sync() throws IOException {
    if (wrappedStream instanceof Syncable) {
      ((Syncable)wrappedStream).sync();
    }
  }
}
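A stripped-down version of PositionCache can be built from java.io alone. In this sketch the class is a hypothetical stand-in, and a shared AtomicLong plays the role of FileSystem.Statistics, which itself keeps its byte counters in AtomicLong fields. Wrapping it in a DataOutputStream shows how getPos() follows the bytes emitted by writeInt() and writeChar().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

class PositionTrackingDemo {

    // Hypothetical counterpart of FSDataOutputStream.PositionCache: a filter
    // stream that counts every byte passing through it.
    static class PositionCache extends FilterOutputStream {
        private final AtomicLong bytesWritten; // shared counter, standing in for FileSystem.Statistics
        private long position;

        PositionCache(OutputStream out, AtomicLong bytesWritten, long startPos) {
            super(out);
            this.bytesWritten = bytesWritten;
            this.position = startPos;
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
            position++;
            bytesWritten.incrementAndGet();
        }

        // Overridden so a bulk write is forwarded in one call and counted once;
        // FilterOutputStream's default would loop over write(int).
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len);
            position += len;
            bytesWritten.addAndGet(len);
        }

        long getPos() { return position; }
    }

    public static void main(String[] args) throws IOException {
        AtomicLong stats = new AtomicLong();
        PositionCache cache = new PositionCache(new ByteArrayOutputStream(), stats, 0);
        DataOutputStream out = new DataOutputStream(cache); // adds writeInt(), writeChar(), ...
        out.writeInt(42);    // 4 bytes
        out.writeChar('H');  // 2 bytes
        out.flush();
        System.out.println(cache.getPos()); // 6
        System.out.println(stats.get());    // 6
    }
}
```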
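FSDataOutputStream implements the Syncable interface, which has a single method, sync(). Its purpose is similar to that of the Linux sync() system call: it synchronizes the data held in the stream to the underlying device.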
/** This interface declare the sync() operation. */
public interface Syncable {
  /**
   * Synchronize all buffer with the underlying devices.
   * @throws IOException
   */
  public void sync() throws IOException;
}
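FSDataOutputStream.sync() forwards the call only when the wrapped stream is itself Syncable; otherwise it silently does nothing. The following sketch isolates that delegation rule. Syncable here is a hypothetical copy of the interface, and SyncableSink merely counts how often it was synced instead of flushing to a real device.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class SyncDemo {

    // Hypothetical stand-in for Hadoop's Syncable interface.
    interface Syncable {
        void sync() throws IOException;
    }

    // A sink that supports sync(); it only records how many times it was synced.
    static class SyncableSink extends ByteArrayOutputStream implements Syncable {
        int syncCount = 0;

        @Override
        public void sync() { syncCount++; }
    }

    // Mirrors FSDataOutputStream.sync(): forward only if the wrapped stream can sync.
    static class SyncingStream extends FilterOutputStream implements Syncable {
        private final OutputStream wrapped;

        SyncingStream(OutputStream out) {
            super(out);
            this.wrapped = out;
        }

        @Override
        public void sync() throws IOException {
            if (wrapped instanceof Syncable) {
                ((Syncable) wrapped).sync();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        SyncableSink sink = new SyncableSink();
        new SyncingStream(sink).sync();                        // forwarded to the sink
        new SyncingStream(new ByteArrayOutputStream()).sync(); // ignored, no error
        System.out.println(sink.syncCount); // 1
    }
}
```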