为利用Hadoop集群平台的分布存储和计算能力,基于MapReduce将ftp文件分布式下载并上传到HDFS中。

1、文件移动流程:ftp服务器->datanode本地目录->HDFS目录;

2、实现主要基于两个设计思想:
   1)将FTP服务器文件列表作为MapReduce处理的数据对象,按照文件名分布到不同Reduce节点下载和上传到HDFS中;
   2)在每个datanode节点都建立一个本地文件保存目录,最好是统一路径名,这样每个Reduce节点都把FTP服务器文件下载到该目录下;

3、代码主要过程:
   1)驱动类中先读取FTP服务器上要下载的文件列表,并移入到hdfs中,作为Map函数的输入;
   2)Map函数处理文件列表,获取文件名字,作为Reduce函数输入;
   3)Reduce函数根据输入的文件名去下载ftp服务器上对应的文件,并下载到datanode节点的统一本地目录,再将本地目录文件上传到HDFS中;

4、主要技术点:
   1)FTPClient实现ftp文件下载;
   2)hadoop的IOUtils类实现文件从本地上传到HDFS;

5、准备工作
   1)ftp服务器端口、用户名和密码、下载文件目录;
      linux下ftp命令:进入$ftp ip/常用命令:ls/cd/put/get/mput/mget
   2)每个节点统一建立本地目录/tmp/fjs/localftp,保存ftp服务器上下载的文件;
   3)Namenode上建立HDFS保存文件的目录/tmp/fjs/ftp;
   4)Namenode上建立HDFS保存文件列表的目录/tmp/fjs/in,即Map函数的输入数据;

6、具体代码:

1)主类FtpMR:驱动类加MapReduce类;

package ct.gd;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class FtpMR {public static class FtpMap extends Mapper<Object,Text,Text,Text>{private Text _key = new Text();private Text _value = new Text();        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{String line = value.toString();//tag是随机值,目的是将文件均匀分到各节点下载,随机范围根据集群节点数,这里是0-100内//假设下载文件有1000个,100随机范围,集群有100个节点,那每个节点均匀可能获得10个文件下载,//map输出的<key,value>,输入reduce时,key值相同的会形成value list,因此设计该随机key值String tag = ComMethod.getRandomNbr();_key.set(tag);_value.set(line);context.write(_key,_value);}}public static class FtpReduce extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{String ftpSrv=context.getConfiguration().get("ftpSrv");//获取ftp服务器连接信息String outputPath=context.getConfiguration().get("outputPath");//获取hdfs存放文件的目录FtpUtil fu=new FtpUtil();             for(Text value:values){String filename=value.toString();//输入的value是ftp服务器上的文件名String localFile=fu.DownFileToLocal(ftpSrv,filename);//下载文件到本地目录,并返回文件保存的路径if (localFile!=null) fu.WriteFileToHDFS(localFile,ComMethod.changeToDir(outputPath),filename);//本地文件上传到hdfs中}}}public static void main(String[] args) throws Exception {         Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: FtpMR <in> <out>");System.exit(2);}String inputPath=otherArgs[0];//FTP服务器保存文件列表的文件目录String outputPath=otherArgs[1];//下载的ftp文件保存在hdfs中的目录FtpUtil fu=new FtpUtil();//ftp服务器字符串格式:IP|port|username|password|file directoryString strFtpSrv="IP|port|name|password|directory";       //获取ftp服务器上文件列表,保存到hdfs的inputPath目录下if(!fu.getFtpFileList(strFtpSrv,inputPath)){System.err.println("下载ftp服务器文件列表失败");System.exit(2);}//将ftp服务器的参数作为参数传递到Reduce中conf.set("ftpSrv", strFtpSrv);//将hdfs上保存下载文件的目录传递到Reduce中conf.set("outputPath", outputPath);Job job = new Job(conf, "FtpToHdfs");job.setJarByClass(FtpMR.class);//job.setNumReduceTasks(1);//设置reduce输入文件一个,方便查看结果job.setMapperClass(FtpMap.class);job.setReducerClass(FtpReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

2)接口类FtpUtil:主要处理ftp文件下载和写入hdfs中;

package ct.gd;import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class FtpUtil {/*下载文件列表处理函数,开始*/public boolean getFtpFileList(String strFtpSrv,String inputPath){//从ftp服务器上读取文件列表String[] FtpSrvConn=strFtpSrv.split("\\|");//截取ftp服务器连接信息 FTPClient ftp = new FTPClient();try {  ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port   ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和passwordint reply = ftp.getReplyCode();  if (!FTPReply.isPositiveCompletion(reply)) {  ftp.disconnect();  return false;  }  String remotePath=FtpSrvConn[4];//Ftp服务器上文件目录ftp.changeWorkingDirectory(remotePath);FTPFile[] fs = ftp.listFiles(remotePath);StringBuffer buffer = new StringBuffer();for(FTPFile ff:fs){String fileName = ff.getName();buffer.append(fileName+"\n");}if(writeBufferToHDFSFile(buffer, inputPath)){ftp.logout();return true;}ftp.logout();} catch (IOException e) {System.out.println(e.getMessage());} finally {  if (ftp.isConnected()) {  try {  ftp.disconnect();  } catch (IOException ioe) { System.out.println(ioe.getMessage()); }  }  }  return false;  }private boolean writeBufferToHDFSFile(StringBuffer buffer, String inputPath){//将文件列表写到hdfs中Configuration conf =  new Configuration();FileSystem fs = null;String fileName="fileLists.txt";try {fs = FileSystem.get(conf);inputPath = ComMethod.changeToDir(inputPath) + fileName;Path fsInputPath=new Path(inputPath);FSDataOutputStream outputStream = fs.create(fsInputPath);outputStream.write(buffer.toString().getBytes("UTF-8"));outputStream.flush();outputStream.sync();outputStream.close();return true;} catch (IOException e) {System.out.println(e.getMessage());}return false;}/*下载文件列表处理函数,结束*//*下载文件处理函数,开始*/public String DownFileToLocal(String ftpSrv,String filename){//在节点上创建本地保存下载文件的目录String localPath="/tmp/fjs/localftp";File localDir = new File(localPath);//如果不存在就创建if(!localDir.exists()){localDir.mkdirs();}FTPClient ftp = new FTPClient();String[] FtpSrvConn=ftpSrv.split("\\|");//截取ftp服务器连接信息try {  ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port   ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和passwordint reply = ftp.getReplyCode();  if (!FTPReply.isPositiveCompletion(reply)) {  ftp.disconnect();  return null;}  String remotePath=FtpSrvConn[4];//Ftp服务器上文件目录ftp.changeWorkingDirectory(remotePath);String localFilePath = ComMethod.changeToDir(localPath) + filename;File localFile = new File(localFilePath);OutputStream is = new FileOutputStream(localFile);ftp.retrieveFile(filename, is);//下载is.close();ftp.logout();return localFilePath;} catch (IOException e) {  System.err.println(e.getMessage());} finally {  if (ftp.isConnected()) {  try {  ftp.disconnect();  } catch (IOException ioe) { }  }  }  return null; }/*下载文件处理函数,结束*//*上传文件到hdfs处理函数,开始*/public void  WriteFileToHDFS(String localFile,String outputPath,String filename){Configuration conf =  new Configuration();FileSystem fs = null;try {fs=FileSystem.get(conf);InputStream in = new BufferedInputStream(new FileInputStream(localFile));String ouputFile = outputPath + filename;//hdfs存放文件路劲和名字OutputStream out = fs.create(new Path(ouputFile));IOUtils.copyBytes(in, out, 1024*1024,true);//迁移out.flush();if(out!=null) out.close();if(in!=null) in.close();//删除本地文件File _outputFileName = new File(localFile);if(_outputFileName.exists()) _outputFileName.delete();} catch (IOException e) {e.printStackTrace();}    }/*上传文件到hdfs处理函数,结束*/public static void main(String[] args) throws Exception { }
}

3)通用函数类ComMethod:主要是一些通用字符处理函数;

package ct.gd;import java.util.Random;public class ComMethod {public static String changeToDir(String dirPath){//目录最后是否有/if(dirPath.charAt(dirPath.length()-1)!='/'){dirPath = dirPath + "/";}return dirPath;}public static String getRandomNbr(){//获取随机数Random rand = new Random();String nbr = String.valueOf(rand.nextInt(100));return nbr;}}

7、执行结果
   1)执行命令:yarn jar /mnt/mr.jar /tmp/fjs/in /tmp/fjs/ftp
   2)hadoop fs -ls /tmp/fjs/in 可以看到文件列表文件
   3)hadoop fs -ls /tmp/fjs/ftp 可以看到下载的文件
   4)每个节点ls -l /tmp/fjs/localftp,如果文件都迁入hdfs,应该为空

MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs相关推荐

  1. geo ftp环境变量 export source ~/.bashrc Linux中filezilla下载ftp文件 ftp.ncbi.nlm.nih.gov linux下载 ftp下载geo非原始数

    https://filezilla-project.org/download.php?show_all=1 右键 检测(inspect) 获得下载链接 Linux如何下载ftp文件 2 Filezil ...

  2. R语言使用download.file函数下载网络文件到本地(Download File from the Internet)

    R语言使用download.file函数下载网络文件到本地(Download File from the Internet) 目录 R语言使用download.file函数下载网络文件到本地(Down ...

  3. python urlretrieve_使用urllib库的urlretrieve()方法下载网络文件到本地的方法

    概述 见源码 源码 # !/usr/bin/env python # -*- coding:utf-8 -*- """ 图片(文件)下载,核心方法是 urllib.url ...

  4. php远程下载到本地,PHP 下载远程文件到本地的简单示例

    搜索热词 对PHP下载远程文件到本地存储的代码感兴趣的小伙伴,下面一起跟随编程之家 jb51.cc的小编两巴掌来看看吧! /** * PHP下载远程文件到本地存储的代码 * * @param * @a ...

  5. java下载网络文件至本地

    通过url下载网络文件至本地 所需依赖和工具类代码 所需依赖 <dependency><groupId>org.apache.httpcomponents</groupI ...

  6. 【转】java下载网络文件至本地

    通过url下载网络文件至本地 所需依赖和工具类代码 所需依赖 <dependency><groupId>org.apache.httpcomponents</groupI ...

  7. MapReduce基础开发之二数据去重和排序

    因Hadoop集群平台网络限制,只能在eclipse里先写好代码再提交jar到集群平台namenode上执行,不能实时调试,所以没有配置eclipse的hadoop开发环境,只是引入了hadoop的l ...

  8. linux中下载ftp文件

    一.最简单的方法: wget 下面的命令用来下载ftp服务器上指定目录的所有文件 [html] view plaincopyprint? wget ftp://IP:PORT/* --ftp-user ...

  9. linux自动下载ftp文件夹,Linux 下FTP定时执行批量下载文件

    使用FTP定时批量下载指定文件的shell脚本 环境:centos6.9 1.目标FTP服务器地址 #FTP服务器地址 ip=10.19.15.23 2.FTP账号和密码 u=账号 p=密码 3.使用 ...

最新文章

  1. 教程:6、打印文件和发送邮件
  2. Cent OS – Tomcat 7 - 集群
  3. 新版 Edge 浏览器或将拥有两个不同的浏览器内核
  4. 【PC工具】windows图片文字识别软件,天若OCR文字识别软件
  5. OpenCV_(Corner Detect with Morphology) 基于形态学滤波的角点检测
  6. 健康体检信息管理系统方案/案列/软件/APP/小程序/网站
  7. 冰点还原_8.60.020.5592_Standard版本手动卸载
  8. 永恒之塔总是服务器未响应,《剑网3》《永恒之塔》怀旧服刚开上演“冲级热”,八月怀旧游戏集体搞事...
  9. 百度地图API计算经纬度
  10. 怎么判断私网地址_判断本机IP地址是公网地址还是私网地址
  11. M3U8 文件介绍 与 播放方法
  12. Go + C 一款简单的贪吃蛇
  13. Android模仿新浪微博(自定义ListView下拉刷新)
  14. win10自带sftp服务器_FreeSSHD在Windows环境下搭建SFTP服务器
  15. 服务器集群环境下session的共享问题
  16. modbus tcp主站和从站_组态王与西门子 PLC无线Modbus通讯
  17. MySQL 客户端安装
  18. Unable to open underlying table
  19. 应付帐款—制单处理,出现“供应商被锁定”的解决方法
  20. 梯度累加(Gradient Accumulation)

热门文章

  1. myeclipse html选取包含元素的标签_HTML基础2019-1-21
  2. git 入门教程之协同开发
  3. Python3基础知识之运算符
  4. BZOJ 2244: [SDOI2011]拦截导弹 DP+CDQ分治
  5. 增强画面纵深感的几个小技巧
  6. Linux下使用curl进行http请求(转)
  7. setInterval()方法只执行一次的解决方法
  8. linux/android 脚本相关
  9. UITextField的属性与程序启动后一系列方法
  10. 2D简单图形相关算法罗列