hadoop 文件上传
hdsf 本质上就是一个分布式文件系统,只是相对于普通计算机来说,它可以很容易横向扩展,自带高可用机制。
我们要在Hadoop做MapReduce计算的时候,就需要把写好的程序打成jar包放到hdfs上。hadoop提供多种方式方式让你能够把文件放入hdfs,比如 自带的shell命令行客户端put命令,java客户端的FileSystem
,REST的HDFS API(WebHDFS与HttpFS)。
命令行上传
hadoop shell 的 put 命令直接把本地文件上传到hdfs。
使用方法:hadoop fs -put <localsrc> ... <dst>从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。
hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile
从标准输入中读取输入。
返回值:成功返回0,失败返回-1。
hadoop shell 官方文档:http://hadoop.apache.org/docs/r1.0.4/cn/hdfs_shell.html
Java客户端上传
我们做工程化、平台化后,需要通过程序与hadoop进行交换,hadoop提供了FileSystem
工具类供我们使用。
FileSystem官方文档:http://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html
普通上传
普遍上传很简单,直接构建FileSystem的实例,调用API上传即可。需要注意的是fs.defaultFS
这个配置项的值是活动的namenode地址+端口。
eg.1
public class CopyToHDFS {public static void main(String[] args) throws IOException {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://active.namenode.host:9000");FileSystem fs = FileSystem.get(conf);fs.copyFromLocalFile(new Path("/home/lance/log.txt"), new Path("/log"));}
}
eg.2
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;import java.io.File;
import java.io.IOException;
import java.io.InputStream;public class HdfsUpload {public static void main(String[] args) throws IOException {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://active.namenode.host:9000");FileSystem fs = FileSystem.get(conf);String remoteDst = "/user/hive/devbak/itclj/abc112234.txt";File inputFile = new File("D:/abc112233.txt");InputStream in = FileUtils.openInputStream(inputFile);FSDataOutputStream out = fs.create(new Path(remoteDst));IOUtils.copyBytes(in, out, conf);}}
eg.3 通过代理账号上传
package com.itclj;import com.itclj.kerberos.KerberosLogin;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;/*** @Author lujun.chen* @Date 2020/2/1 13:33* @Version 1.0*/
public class HdfsUpload {public static void main(String[] args) throws IOException, InterruptedException {KerberosLogin login = new KerberosLogin();login.login();//直接上传//upload();//通过代理账号上传userProxyUpload();}public static void upload() throws IOException {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://dptestservice1");//HA 模式的Hadoop集群//FileSystem fs = FileSystem.get(new URI("hdfs://s01cq-app-196-43-msxf.host:9000"), conf, "hive");FileSystem fs = FileSystem.get(conf);String remoteDst = "/user/hive/devbak/itclj/abc112234.txt";File inputFile = new File("D:/abc112233.txt");InputStream in = FileUtils.openInputStream(inputFile);FSDataOutputStream out = fs.create(new Path(remoteDst));IOUtils.copyBytes(in, out, conf);out.close();in.close();fs.close();System.out.println("===================");}public static void userProxyUpload() throws IOException, InterruptedException {String userProxy = "dsp";UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(userProxy, currentUser);boolean rest = proxyUser.doAs(new PrivilegedExceptionAction<Boolean>() {public Boolean run() throws Exception {upload();return true;}});System.out.print("rest=" + rest);}}
eg.4 上传限流
@Override
public HdfsOptVO upload(String path, InputStream inputStream) {try (FileSystem fs = buildFileSystem()) {try (FSDataOutputStream out = fs.create(new Path(path))) {DataTransferThrottler throttler = new DataTransferThrottler(hdfsDataTransferThrottlerSize);//限流byte buf[] = new byte[hdfsBufferSize];int bytesRead = inputStream.read(buf);while (bytesRead >= 0) {out.write(buf, 0, bytesRead);bytesRead = inputStream.read(buf);throttler.throttle(bytesRead);}//IOUtils.copyBytes(inputStream, out, 4096);inputStream.close();}} catch (Exception ex) {logger.error("HDFS文件上传失败,path: {}", path, ex);throw new DspException(CodeEnum.HDFS_OPT_UPLOAD_ERROR.getCode(), "文件上传出错," + ex.getMessage());}return new HdfsOptVO.Builder().hdfsUri(HDFS_PROTOCOL + path).build();
}
HA模式的hadoop集群文件上传
HA模式的Hadoop集群文件上传,代码和普通模式都一样的,fs.defaultFS
配置项改为集群名称(hdfs://dptestservice1
),但是单纯改这个值肯定是不行的,系统会报如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: java.net.UnknownHostException: dptestservice1
主要还是工程缺少必要的Hadoop属性配置,配置属性方式一般有两种:一种是直接把hadoop的配置文件放到工程目录下,在配置文件里面设置属性;一种是如上所示,用set方法设置。
以下就是所需的关键属性,属性值得和自己集群的配置的属性值一样。
core-site.xml
<configuration><property><name>fs.defaultFS</name><value>hdfs://dptestservice1</value></property>
</configuration>
hdfs-site.xml
<configuration><property><name>dfs.nameservices</name><value>dptestservice1</value></property><property><name>dfs.ha.namenodes.dptestservice1</name><value>namenode148,namenode133</value></property><property><name>dfs.namenode.rpc-address.dptestservice1.namenode148</name><value>s01cq-app-196-39-itclj.host:9000</value></property><property><name>dfs.namenode.rpc-address.dptestservice1.namenode133</name><value>s01cq-app-196-43-itclj.host:9000</value></property><property><name>dfs.client.failover.proxy.provider.dptestservice1</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property></configuration>
eg
package com.itclj;import com.itclj.kerberos.KerberosLogin;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;import java.io.File;
import java.io.IOException;
import java.io.InputStream;/*** @Author lujun.chen* @Date 2020/2/1 13:33* @Version 1.0*/
public class HdfsUpload {public static void main(String[] args) throws IOException {// 集群开启了开启了kerberos 认证的,需要先进行kerberos认证。KerberosLogin login = new KerberosLogin();login.login();Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://dptestservice1");//HA 模式的Hadoop集群FileSystem fs = FileSystem.get(conf);String remoteDst = "/user/hive/devbak/itclj/abc112234.txt";File inputFile = new File("D:/abc112233.txt");InputStream in = FileUtils.openInputStream(inputFile);FSDataOutputStream out = fs.create(new Path(remoteDst));IOUtils.copyBytes(in, out, conf);}}
工程代码:https://github.com/clj198606061111/hadoop-client-demo/blob/master/src/main/java/com/itclj/HdfsUpload.java
REST的API上传
hadoop提供了2套REST的API,WebHDFS与HttpFS。
- 两者都是基于REST的HDFS API,使得一个集群外的host可以不用安装HADOOP和JAVA环境就可以对集群内的HADOOP进行访问,并且client不受语言的限制。
- WebHDFS是HDFS内置的、默认开启的一个服务,而HttpFS是HDFS一个独立的服务,若使用需要手动安装(CDH中安装HDFS时将HttpFS勾选上即可)。
- WebHDFS是HortonWorks开发的,然后捐给了Apache;而HttpFS是Cloudera开发的,也捐给了Apache。
- 当client请求某文件时,WebHDFS会将其重定向到该资源所在的datanode,而HttpFs相等于一个“网关”,所有的数据先传输到该httpfs server,再由该httpfs server传输到client。
WebHDFS
官方文档:https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Open_and_Read_a_File
HttpFS
官方文档:http://hadoop.apache.org/docs/current/hadoop-hdfs-httpfs/index.html
hadoop 文件上传相关推荐
- hadoop文件上传
要能通过java上传文件到网页上localhost:9870 首先要启动start-yarn.sh start-dfs.sh jps能显示五个功能 java项目里 配置好maven的各项,与现有版本匹 ...
- hadoop HDFS的文件夹创建、文件上传、文件下载、文件夹删除,文件更名、文件详细信息、文件类型判断(文件夹或者文件)
摘要: 本篇文章主要介绍的是hadoop hdfs的基础api的使用.包括Windows端依赖配置,Maven依赖配置.最后就是进行实际的操作,包括:获取远程hadoop hdfs连接,并对其进行的一 ...
- HDFS中JAVA API的使用(hadoop的文件上传和下载)
HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件.删除文件.读取文件内容等操作.下面记录一下使用JAVA API对HDFS中的文件进行操作的过程. 对分HDFS中的 ...
- 大数据之-Hadoop之HDFS的API操作_文件上传---大数据之hadoop工作笔记0056
然后我们HDFS的java的api操作. 比如我们有个需求从mysql中把数据上传到完全分布式的集群中的,hdfs中 比如这里首先我们演示,把e盘下的banzhang.txt文件上传到完全分布式集群h ...
- java使用Jsch实现远程操作linux服务器进行文件上传、下载,删除和显示目录信息...
1.java使用Jsch实现远程操作linux服务器进行文件上传.下载,删除和显示目录信息. 参考链接:https://www.cnblogs.com/longyg/archive/2012/06/2 ...
- 为什么文件上传不了服务器上,文件上传存在服务器还是数据库
文件上传存在服务器还是数据库 内容精选 换一换 本章介绍如何在管理控制台购买GaussDB(for openGauss)实例,并通过内网使用弹性云服务器连接GaussDB(for openGauss) ...
- 6.HDFS文件上传和下载API
HDFS文件上传和下载API package hdfsAPI;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop ...
- 酷盘 文件服务器,文件上传云端 - 卡饭网
怎么通过手机版TIM端把文件上传到云端 怎么通过手机版TIM端把文件上传到云端 手机版Tim是我们常用的一款社交软件,给我们的生活带来了极大的便利和乐趣,深受用户欢迎,有的朋友想知道怎么通过手机版TI ...
- 大数据之 将txt文件上传到HDFS并用Hive查询
在生产上,一般对数据清洗之后直接放到HDFS上,再将目录加载到分区表中,之后通过hive去查询分析数据: 1.准备数据 order_created.txt 用 tab分割 10703007267488 ...
- 通过Java程序将“/你的名字拼音缩写/input1/shixun1.txt”文件上传到HDFS的“/你的名字拼音缩写/java/input1/”目录下;通过Java程序将HDFS上的“/你的名字拼音
题目: 通过Java程序将"/你的名字拼音缩写/input1/shixun1.txt"文件上传到HDFS的"/你的名字拼音缩写/java/input1/"目录下 ...
最新文章
- Python可变参数
- jQuery EasyUI使用教程之使用标记创建树形菜单
- IOS开发笔记 - 基于SDWebImage的网络图片加载处理
- java设计模式-简单工厂模式
- python-列表演练-根据学生id获取学生数据-获取学生数据中得分较高的前N条数据
- 互联网的逻辑和电商的逻辑是不一样的
- HackerRank python练习——Mean, Median, and Mode
- 如何在idea中设置Tomcat热部署
- 求解一元二次方程的解 (分支语句)
- 硬盘数据恢复方法有哪些?希望这些方法能帮助你
- SAP GOS cl_gos_manager 添加附件功能
- 手把手教你玩转谷歌TensorFlow
- Verification和Validation
- 什么是软件生命周期模型?试比较瀑布模型、快速原型模型、增量模型和螺旋模型的优缺点,说明每种模型的使用范围。
- printf函数输出问题
- 项目风险常见清单列表库--再也不愁不能提前预知风险了
- 1497: 变态杀人狂
- 乐橙育儿机器人 众筹_乐橙智能生活发布育儿机器人“小乐”
- AG9311MAQ设计100W USB TYPEC拓展坞资料|AG9311MAQ用于100W USB TYPEC转HDMI带PD快充+U3+SD/CF拓展坞方案说明
- 2020-2025年四大数字技术对数字经济的影响