本文首发微信公众号:码上观世界

网络文件传输的应用场景很多,如网络聊天的点对点传输、文件同步网盘的上传与下载、文件上传到分布式文件存储器等,其传输速度主要受限于网络带宽、存储器大小、CPU处理速度以及磁盘读写速度,尤其是网络带宽。本文主要讨论通常情况下数十GB规模大小的文件传输的优化方式,对于更大规模的文件容量建议考虑人工硬盘运输,毕竟基于公路运输的方式不仅带宽大而且成本低。

文件传输涉及到客户端、中间网络和服务器,常用的传输协议有HTTP(s)、(S)FTP和TCP(UDP)协议等,对于客户端用户来讲,能够起作用的地方不大,所以本文就两种基本的场景来讨论文件传输在客户端的优化方式:基于HTTP协议的非结构化文件传输和基于TCP协议的结构化文件传输。

基于HTTP协议的非结构化文件传输

最常用的文件上传是基于HTTP POST。观察浏览器的请求头数据可知,文件的二进制数据被置于请求body里面,也就是说在上传文件过程中,客户端是一次性将文件内容加载到内存,如果文件过大,浏览器很可能会崩溃,加上HTTP请求连接本身有超时时间限制,所以这种方式不适合传输大文件。

所以一种自然的方式就是手写符合规范的HTTP协议跟服务端通信:

上面的示例代码相比通过浏览器上传文件方式显得自由度更大,但是问题也更多,比如OutputStream将数据写入到PosterOutputStream内部缓冲区,而该缓冲区只有当调用HttpURLConnection的getInputStream方法之后才会发送到Socket流中。所以当文件过大(也许几十MB)就会导致内存溢出,即使通过调用flush方法也无济于事,因为PosterOutputStream的flush方法是空操作,什么都不干!幸运的是HttpURLConnection提供的setFixedLengthStreamingMode方法能够获取到自动刷新流缓存的StreamingOutputStream。虽然这种方式能够解决问题,但是还可能会遇到其他大大小小的坑,而且上述方式还是过于原始,使用Apache HttpClient能够轻易实现上述功能:

HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setDefaultCredentialsProvider(credsProvider);
RequestConfig requestConfig = RequestConfig.custom().setCookieSpec(CookieSpecs.DEFAULT).build();
CloseableHttpClient httpClient = httpClientBuilder.build();
File file = new File(filePath);
HttpPut httpPut = new HttpPut(url);
FileEntity fileEntity = new FileEntity(file);
httpPut.setEntity(fileEntity);
FileInputStream fileInputStream = new FileInputStream(file);
InputStreamEntity reqEntity = new InputStreamEntity(fileInputStream, file.length());
//post.setEntity(reqEntity);
HttpResponse response = httpClient.execute(httpPut);
String content = EntityUtils.toString(response.getEntity());

示例代码中,HttpClient帮我们封装了协议相关的所有内容。对于文件传输FileEntity 和InputStreamEntity 都可以使用,不同的是,InputStreamEntity 用了流传输的方式,我们需要做的是就是验证这两种方式是否存在文件过大导致的内存溢出问题。先看FileEntity ,直接翻到代码DefaultBHttpClientConnection :

class DefaultBHttpClientConnection extends BHttpConnectionBase{
......
public void sendRequestEntity(final HttpEntityEnclosingRequest request)
throws HttpException, IOException {Args.notNull(request, "HTTP request");ensureOpen();final HttpEntity entity = request.getEntity();if (entity == null) {return;}final OutputStream outstream = prepareOutput(request);entity.writeTo(outstream);outstream.close();}
......
}
class FileEntity{
......
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");final InputStream instream = new FileInputStream(this.file);try {final byte[] tmp = new byte[OUTPUT_BUFFER_SIZE];int l;while ((l = instream.read(tmp)) != -1) {outstream.write(tmp, 0, l);}outstream.flush();} finally {instream.close();}
}
......
}

再看看InputStreamEntity:

class InputStreamEntity{
......
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");final InputStream instream = this.content;try {final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];int l;if (this.length < 0) {// consume until EOFwhile ((l = instream.read(buffer)) != -1) {outstream.write(buffer, 0, l);}} else {// consume no more than lengthlong remaining = this.length;while (remaining > 0) {l = instream.read(buffer, 0, (int)Math.min(OUTPUT_BUFFER_SIZE, remaining));if (l == -1) {break;}outstream.write(buffer, 0, l);remaining -= l;}}} finally {instream.close();}
}
......
}

可见FileEntity 和InputStreamEntity使用了相同的outstream,其生成方式为:

class BHttpConnectionBase{
......protected OutputStream prepareOutput(final HttpMessage message) throws HttpException {final long len = this.outgoingContentStrategy.determineLength(message);return createOutputStream(len, this.outbuffer);}protected OutputStream createOutputStream(final long len,final SessionOutputBuffer outbuffer) {if (len == ContentLengthStrategy.CHUNKED) {return new ChunkedOutputStream(2048, outbuffer);} else if (len == ContentLengthStrategy.IDENTITY) {return new IdentityOutputStream(outbuffer);} else {return new ContentLengthOutputStream(outbuffer, len);}}
......
}

这里以ContentLengthOutputStream为例来看数据是如何发送到Socket流中的:

class ContentLengthOutputStream{private final SessionOutputBuffer out;
......public void write(final byte[] b, final int off, final int len) throws IOException {if (this.closed) {throw new IOException("Attempted write to closed stream.");}if (this.total < this.contentLength) {final long max = this.contentLength - this.total;int chunk = len;if (chunk > max) {chunk = (int) max;}this.out.write(b, off, chunk);this.total += chunk;}
}
......
}
class SessionOutputBufferImpl{private OutputStream outstream;
......public void write(final byte[] b, final int off, final int len) throws IOException {if (b == null) {return;}// Do not want to buffer large-ish chunks// if the byte array is larger then MIN_CHUNK_LIMIT// write it directly to the output streamif (len > this.fragementSizeHint || len > this.buffer.capacity()) {// flush the bufferflushBuffer();// write directly to the out streamstreamWrite(b, off, len);this.metrics.incrementBytesTransferred(len);} else {// Do not let the buffer grow unnecessarilyfinal int freecapacity = this.buffer.capacity() - this.buffer.length();if (len > freecapacity) {// flush the bufferflushBuffer();}// bufferthis.buffer.append(b, off, len);}
}
private void flushBuffer() throws IOException {final int len = this.buffer.length();if (len > 0) {streamWrite(this.buffer.buffer(), 0, len);this.buffer.clear();this.metrics.incrementBytesTransferred(len);}
}
private void streamWrite(final byte[] b, final int off, final int len) throws IOException {Asserts.notNull(outstream, "Output stream");this.outstream.write(b, off, len);
}
......
}
class SocketOutputStream {
......public void write(byte b[], int off, int len) throws IOException {socketWrite(b, off, len);}
......
}

通过上面关键代码可见,不管用哪一种Entity,当缓冲区满了就自动flush到Socket,理论上都可以进行大文件传输,只要超时时间允许,两者并没有什么特别的不同。

基于TCP协议的结构化文件传输

基于HTTP协议的文件传输,虽然通过流的方式能解决大文件传输问题,但是基于应用层协议毕竟效率不到,时间消耗仍是个大问题,尽管可以通过文件拆分,并行处理,但需要服务器端的配合才能完成(比如将小文件还原,断点续传等)。这里讨论的多文件传输到分布式系统不需要对服务端再做改造就能直接使用,天然具备并行处理能力。对于结构化文件传输的使用场景多用于数据迁移,比如从数据库系统或者文件系统传输到大数据存储计算平台。这里以将本地的CSV文件上传到HDFS为例,需要解决的是如何对文件拆分。虽然对非结构化,半结构化文件因为涉及到分隔符问题,对于文件拆分有点儿难度,但对规范化格式的文件,问题倒不大,但考虑让问题描述更简洁,这里不考虑文件拆分,只考虑一个文件(比如文件夹下已经拆分后的某个文件)的传输问题。该问题模型可以描述为:

引入Channel是为了解决File和HDFS存取速率不匹配的问题,通过Channel连接File读过程和HDFS写过程:当Channel缓存满的时候,File等待HDFS读取之后再开始写入Channel,HDFS读取之后File再写入Channel,两者通过信号量机制协调,HDFS每次写入都是一个独立的文件。关键代码实现如下:

File端读取数据到Channel:

public void readCSV(String filePath, String fieldDelimiter) {
......BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"), 8192);CsvReader csvReader = new CsvReader(reader);csvReader.setDelimiter(fieldDelimiter.charAt(0));String[] parseRows;while ((parseRows = splitBufferedReader(csvReader)) != null) {//Record 为文件中一行数据记录,由Column组成Record record = createRecord(parseRows);this.buffer.add(record);if (this.buffer.size() >= MemoryChannel.bufferSize) {this.channel.pushAll(this.buffer);this.buffer.clear();}}this.channel.pushAll(this.buffer);this.buffer.clear();
......}
//基于内存的Channel实现
class  MemoryChannel{private ArrayBlockingQueue<Record> queue;private ReentrantLock lock;private Condition notInsufficient, notEmpty;
......//将File读取端将记录push到Channelpublic void pushAll(final Collection<Record> rs) {Validate.notNull(rs);Validate.noNullElements(rs);try {lock.lockInterruptibly();while (!this.queue.isEmpty()) {notInsufficient.await(200L, TimeUnit.MILLISECONDS);}this.queue.addAll(rs);notEmpty.signalAll();} catch (InterruptedException e) {throw new RuntimeException("pushAll", e);} finally {lock.unlock();}}......
}
class HdfsWriteService{
......
public void writeFile(String fieldDelimiter) {FileOutputFormat outFormat = new TextOutputFormat();outFormat.setOutputPath(jobConf, outputPath);outFormat.setWorkOutputPath(jobConf, outputPath);List<Record> recordList= new ArrayList(MemoryChannel.bufferSize);this.channel.pullAll(recordList);RecordWriter writer = outFormat.getRecordWriter(fileSystem, jobConf, outputPath.toString(), Reporter.NULL);for (Record record : recordList) {//将Record记录组装成HDFS的TEXT行记录,列分隔符可自定义Text recordResult = new Text(StringUtils.join(mergeColumn(record), fieldDelimiter));writer.write(NullWritable.get(), recordResult);}writer.close(Reporter.NULL);
}
......
//基于内存的Channel实现
class  MemoryChannel{
......//HDFS写入端从Channel中Pull记录public void pullAll(Collection<Record> rs) {assert rs != null;rs.clear();try {lock.lockInterruptibly();while (this.queue.drainTo(rs, bufferSize) <= 0) {notEmpty.await(200L, TimeUnit.MILLISECONDS);}notInsufficient.signalAll();} catch (InterruptedException e) {throw new RuntimeException("pullAll", e);} finally {lock.unlock();}
}
......
}

上述方式是实现多文件并行传输的基础,每个独立Channel的传输过程互不影响,即使当前Chanel过程失败,也可以独立重跑恢复。

END

如何进行大文件传输?相关推荐

  1. c++ udp多线程 例子_[内附完整源码和文档] 基于udp实现tcp功能进行大文件传输

    一.项目要求 Please choose one of following programing languages: C, C++, Java, Python; 本项目采用的是python3.6 L ...

  2. 邮箱附件、QQ、微信等社交工具大文件传输解决方案

    工具说明:适用于邮箱附件.QQ.微信.钉钉.网盘等场景的大文件分割存储和传输. 下载地址:https://download.csdn.net/download/hj960511/85012515 作者 ...

  3. java rmi 文件传输_JAVA-RMI实现大文件传输

    在使用java-rmi的过程中,必然会遇到一个文件上传的问题,由于在rmi中无法传输文件流(比如rmi中的方法参数不能是FileInputStream之类的),那么我们只好选择一种折中的办法,就是先用 ...

  4. Linux、Windows都适用的跨国传输、大文件传输软件:飞驰传输

    随着"走出去"战略的实施,中国越来越多的企业走向国外,跨国企业将是大的潮流和趋势.跨国企业的分支机构遍及全球各地,员工来自多个国家,使用不同的语言.由于各地IT基础设施建设水平和使 ...

  5. 如何分发大文件、大文件传输解决方案

    随着云计算.大数据技术不断发展,4K 视频.虚拟现实(VR).视频直播等互联网应用领域不断升级更新,企业网.数据中心规模持续扩大,企业拥有的数据急剧增长,海量文件随之产生. 同时,互联网时代,众多行业 ...

  6. 基于UDP的企业级大文件传输体系

    在信息技术与互联网技术快速发展的今天,很多企业,特别是大中型企业都建设了林林总总的信息系统.这些信息系统助推企业实现了业务的快速发展.办公管理的科学高效,为企业创造了巨大的经济价值.这些信息系统在功能 ...

  7. 大文件传输软件的优势你了解吗?

    2012年以来,大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,并命名与之相关的技术发展与创新.数据正在迅速膨胀并变大,它决定着企业的未来发展.企业面临 ...

  8. 镭速联合Azure Blob 存储,重塑大文件传输平台存储架构

    Azure Blob 存储是Microsoft 提供的适用于云原生工作负载.存档.数据湖.高性能计算和机器学习的可大规模缩放且安全的对象存储,是目前市场上唯一一种可为低延时和交互式方案提供基于SSD的 ...

  9. 哪里有免费大文件传输平台?通过这4个网站免费来进行大文件传输

    使用电子邮件发送大文件时,可能会遇到大小文件传输的限制.这四个免费大文件传输网站让大文件传输变得轻而易举.有许多大文件传输网站,但是通常您必须经过一些步骤才能使用它们,例如创建帐户,验证电子邮件地址或 ...

  10. 4种大文件传输工具和软件,用于共享大文件

    无论是个人还是与团队一起工作,大文件传输软件和网站都能协助提高工作效率.有效地管理工作内容.疫情原因有时我们不得不居家办公,在这种情况下可以分享文件的工具就显得尤为重要. 每个公司都需要一个文件传输软 ...

最新文章

  1. JEECG微云快速开发平台
  2. Python学习记录之-----类
  3. GDCM:cmyk的Png文件转dicom文件的测试程序
  4. java中的jre里面有什么_Java中JRE介绍,JRE是什么
  5. 数据链路层差错控制——奇偶校验码、循环冗余码和汉明码(海明码)
  6. day10_控制文件
  7. Silverlight Blend动画设计系列十二:三角函数(Trigonometry)动画之自由旋转(Free-form rotation)...
  8. 阿里前端开源的一些产品
  9. mysql 两张大表关联_MySQL的DropTable影响分析和最佳实践
  10. 使用STS临时访问凭证访问OSS
  11. 软件测试某公司面试题2014
  12. CentOS下apache绑定域名
  13. 桌面文件丢失如何找回
  14. pip安装:Cannot uninstall ''. It is a distutils installed project and thus we cannot accurately....解决办法
  15. 单细胞测序对于医学的意义
  16. 下载哔哩哔哩网页上的视频
  17. 针对iPhone X和iPhone XS这些傻叉手机安全距离的设定
  18. 如何给WORD文档添加外边框,教程在这里,WORD页面外边框怎么添加
  19. 狂神。JavaWeb学习(2)
  20. virtualbox 不能为虚拟电脑打开一个新任务/VT-x features locked or unavailable in MSR.

热门文章

  1. 京东联盟导购平台开发指南(附带API接口)
  2. Abnova循环肿瘤DNA丨全血分离,基因组DNA萃取分析
  3. PS常用快捷键及模板使用
  4. 知其然(1) 无法使用 DISTINCT, GROUP BY 等子句从视图中选择 ROWID 或采样
  5. 口袋超萌服务器维护中,口袋超萌手游加速攻略 口袋超萌加速方法说明
  6. 音频特征----频谱图
  7. 碰撞次数与π的关系问题程序求解
  8. Unity Shader 详细自学(一)
  9. FDM3D打印技术原理简析
  10. 使用freessl.orgq免费ssl证书