2019独角兽企业重金招聘Python工程师标准>>>

访问:https://github.com/yangbajing/scala-applications/tree/master/file-upload 获取本文所述完整源码,包括Akka HTTP后端和HTML5实现的前端。

在很多应用里面都会有类似大文件上传的需求,但很多时候我们程序员都会以不支持或不好实现将其推脱掉^_^。

这次因为公司项目需要,另一个组的同事使用Spring实现了一版。他们是采用在前端对大文件分块上传,后端在所有分块上传完成后合并的方式实现了。这个方案有一个弊端,若文件很大,后期对分块做合并时会非常的耗磁盘资源。

本文,我使用Akka HTTP和Akka Stream做为后端服务,可以很优雅的实现大文件的断点续传。原理其实非常的简单,前端计算文件的hash(使用sha256),将hash传到后端查询是否有相同文件已上传,若有将返回已上传文件及文件长度(bytes)。这时候前端就可以知道文件的上传进度,进而判断还需要断点续传的偏移量或者已上传完成(这就是秒传)。

这里有一个设计取舍:客户端对单个文件不做分片,从文件头开始上传。这样的一个好处是可简化服务端的实现,同时也可以优化服务端对文件的存储 (同一个文件将一直使用APPEND的方式写入文件,这样可以更高效的利用磁盘IO。同时,不需要分块合并,若文件很大,生成的大量分块在文件上传完成后再次合并会是一个非常大的资源开销)

断点下载

这个怎么说呢?断点下载Akka HTTP原生支持^_^。你只需要使用如下代码:

  private def downloadRoute: Route = path("download" / Segment) { hash =>getFromFile(FileUtils.getLocalPath(hash).toFile)}

FileRoute#downloadRoute

FileUtils.getLocalPath(hash) 函数通过对hash值(sha256hex)进行计算和拼接,获取实际文件的本地存储路径再交给Akka HTTP提供的getFromFile指令,剩下的工作就交给Akka。

我们可以通过向Akka HTTP发起HEAD请求来查看支持的HTTP功能,看到在反回的header里有Accept-Ranges: bytes,意思是服务端支持使用字节为单位的范围下载(断点下载功能既基于此实现)。

$ curl --head http://localhost:33333/file/download/7d0559e2f7bf42f0c2becc7fbf91b20ca2e7ec373c941fca21314169de9c7ef4
HTTP/1.1 200 OK
Last-Modified: Fri, 28 Dec 2018 14:12:32 GMT
ETag: "132766a7f528d080"
Accept-Ranges: bytes
Server: akka-http/10.1.6
Date: Sat, 29 Dec 2018 02:17:41 GMT
Content-Type: application/octet-stream
Content-Length: 65463496

断点上传

很遗憾,Akka HTTP默认不支持断点上传,这需要自行实现。但是,Akka HTTP做为一个toolkit,足够灵活且强大,实现断点上传功能so easy。

断点上传实现

基于常规的代码设计方式,我们需要ControllerService,那就先从Controller开始:

FileRoute#uploadRoute

def uploadRoute: Route = path("upload") {post {withoutSizeLimit {entity(as[Multipart.FormData]) { formData =>onSuccess(fileService.handleUpload(formData)) { results =>import me.yangbajing.fileupload.util.JacksonSupport._complete(results)}}}}
}

FileRoute#uploadRoute

对于不熟悉Akka HTTP的朋友,推荐阅读我写的电子书(打个广告):Scala Web 开发。这里需要注意的一个指令是withoutSizeLimit,默认Akka HTTP对请求大小限制比较低,我们可以通过withoutSizeLimit指令取消对单个API的请求大小限制,同时又不影响整个Web服务的大小限制。另外,这里通过entity(as[Multipart.FormData])Unmarshaller的方式获取整个Multipart.FormData对象并传入 FileService#handleUpload 函数进行处理。

FileService#handleUpload

  override def handleUpload(formData: Multipart.FormData): Future[Seq[FileBO]] = {formData.parts.map { part =>val (hash, contentLength, startPosition) = part.name.split('.') match {case Array(a, b, c) => (a, b.toLong, c.toLong)case Array(a, b)    => (a, b.toLong, 0L)case Array(a)       => (a, 0L, 0L)case _              => throw new IllegalArgumentException(s"Multipart.FormData name格式不符合要求:${part.name}")}FileInfo(part, hash, contentLength, startPosition)}.log("fileInfo", info => logger.debug(s"fileInfo: $info")).mapAsync(Constants.FILE_PART_MAX)(processFile).runWith(Sink.seq)}

formData.parts是一个Akka Stream流,类型签名为Source[Multipart.FormData.BodyPart, Any]。有关Akka Stream更详细的资料请参阅Akka Streams官方文档。这里,每个part都代表FormData的一个字段(对应HTML 5的FormData类型,同时前端需要使用Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryrP4DAyu31ilqEWmz方式发起请求)。每个part.name都是用英号逗号分隔的三部分来做为请求的字段名,分别是:<hash>.<content length>.<start position>,这样我们就可以在不加入任何其它字段的情况下告知服务端当前上传文件的sha256hex计算出的hash值、文件大小(bytes)和上传起始偏移量。

FileUtils#uploadFile

文件上传的核心逻辑在 FileUtils#uploadFile 函数:

  def uploadFile(fileInfo: FileInfo)(implicit mat: Materializer, ec: ExecutionContext): Future[FileBO] = {val maybeMeta = FileUtils.getFileMeta(fileInfo.hash)val beContinue = maybeMeta.isDefined && fileInfo.startPosition > 0Lif (beContinue) uploadContinue(fileInfo, maybeMeta.get) else uploadNewFile(fileInfo)}

uploadFile函数根据是否为续传来分别调用uploadContinueuploadNewFile函数。首先来看看新文件上传时的代码逻辑:

  def uploadNewFile(fileInfo: FileInfo)(implicit mat: Materializer, ec: ExecutionContext) = {val bodyPart = fileInfo.bodyPartval tmpPath =if (fileInfo.validHash) FileUtils.getLocalPath(fileInfo.hash)else Files.createTempFile(FileUtils.TMP_DIR, bodyPart.filename.getOrElse(""), "") // (1)val sha = MessageDigest.getInstance("SHA-256")bodyPart.entity.dataBytes.map { byteString =>byteString.asByteBuffers.foreach(sha.update) // (2)byteString}.runWith(FileIO.toPath(tmpPath)) // (3).map { ioResult =>val hash = Utils.bytesToHex(sha.digest()) // (4)if (fileInfo.validHash) {require(fileInfo.hash == hash, s"前端上传hash与服务端计算hash值不匹配,${fileInfo.hash} != $hash")}val localPath = if (fileInfo.validHash) tmpPath else move(hash, tmpPath, ioResult.count) // (5)FileBO(hash, localPath, ioResult.count, bodyPart.filename, bodyPart.headers)}}
  1. 根据前端是否上传了有效的hash值(sha256hex)来判断是把文件先写入临时文件还是直接写入实际的本地存储位置(根据hash值计算出本地实际的存储位置)。
  2. Akka HTTP中,上传的文件以流的方式进入,在此对每个ByteString计算并更新sha256值。
  3. 在Akka Stream的Sink端,接收流传入的元素并写入本地文件。
  4. 文件写入结束后调用sha.digest()方法获取已上传文件的sha256值。
  5. 根据是否临时文件来判断是否需要将临时文件移动到实际的本地存储路径,通过文件的hash值来计算出实际的本地存储路径。
  def uploadContinue(fileInfo: FileInfo, meta: FileMeta)(implicit mat: Materializer, ec: ExecutionContext) = {val bodyPart = fileInfo.bodyPartval localPath = FileUtils.getLocalPath(fileInfo.hash)logger.debug(s"断点续传,startPosition:${fileInfo.startPosition},路径:$localPath")val hash = fileInfo.hashbodyPart.entity.dataBytes.runWith(FileIO.toPath(localPath, Set(APPEND), fileInfo.startPosition)) // (1).map(ioResult => FileBO(hash, localPath, meta.size + ioResult.count, bodyPart.filename, bodyPart.headers))}

断点上传时的逻辑其实相对简单,需要注意的是在(1)处调用FileIO.toPath将流定入本地时需要以APPEND模式追加写入到已存在文件。

秒传

在已实现断点上传功能之上,秒传的实现逻辑就非常清晰了。客户端在调用file/upload接口上传文件之前先调用/file/progress/{hash}接口判断相同hash值文件的上传情况,再决定下一步处理。

  1. 客户端计算文件hash,并以文件hash和文件大小作为参数调用/file/progress/{hash}接口
  2. 服务端根据上传hash值判断文件是否已上传,若存在返回已上传文档大小(bytes)
  3. 客户端收到服务端响应后根据文件是否存在及已存在文件大小判断秒传断点续传还是新上传
  4. 秒传,返回文件长度与当前准备上传文件长度大小一致
  5. 断点续传,返回文件大小比当前准备上传文件长度小
  6. 新上传,返回文件不存在
  7. 其它情况,作为新文件上传

秒传的代码逻辑台下:

  def progressRoute: Route = path("progress" / Segment) { hash =>onSuccess(fileService.progressByHash(hash)) {case Some(v) =>import me.yangbajing.fileupload.util.JacksonSupport._complete(v)case None => complete(StatusCodes.NotFound)}}

文件上传进度接口定义如上。

  // FileServiceImpl.scalaoverride def progressByHash(hash: String): Future[Option[FileMeta]] = {require(Objects.nonNull(hash) && hash.nonEmpty, "hash 不能为空。")Future.successful(FileUtils.getFileMeta(hash))}// FileUtils.getFileMetadef getFileMeta(hash: String): Option[FileMeta] = {if (hash == null || Constants.HASH_LENGTH != hash.length) {None} else {val path = getLocalPath(hash)if (Files.exists(path) && Files.isReadable(path)) Some(FileMeta(hash, Files.size(path), path)) else None}}

文件上传进度服务实现定义如上。

小结

本文以Akka HTTP和HTML 5讲述了怎样实现一个支持大文件断点上传、下载和秒传的示例应用程序。

转载于:https://my.oschina.net/yangbajing/blog/3013997

Akka实战:HTTP大文件断点上传、下载,秒传相关推荐

  1. php+大文件断点上传

    总结一下大文件分片上传和断点续传的问题.因为文件过大(比如1G以上),必须要考虑上传过程网络中断的情况.http的网络请求中本身就已经具备了分片上传功能,当传输的文件比较大时,http协议自动会将文件 ...

  2. asp.net 如何实现大文件断点上传功能?

    之前仿造uploadify写了一个HTML5版的文件上传插件,没看过的朋友可以点此先看一下~得到了不少朋友的好评,我自己也用在了项目中,不论是用户头像上传,还是各种媒体文件的上传,以及各种个性的业务需 ...

  3. android实现大文件断点上传

    前言 之前项目需要上传大文件的功能,上传大文件经常遇到上传一半由于网络或者其他一些原因上传失败.然后又得重新上传(很麻烦),所以就想能不能做个断点上传的功能.于是网上搜索,发现市面上很少有断点上传的案 ...

  4. Android应用开发之使用Socket进行大文件断点上传续传

    http://blog.csdn.net/binyao02123202/article/details/7460517

  5. applet实现大文件ftp上传(一)

    由于要用APPLET实现大文件FTP上传下载,从网上搜索了几下,找到很多资料,最后决定采用基于 org.apache.commons.net.ftp包实现FTP上传下载,Net包中的类既提供对协议的底 ...

  6. android学习笔记---32_文件断点上传器,解决多用户并发,以及自定义协议,注意协议中的漏洞

    32_文件断点上传器 --------------------------- 1.当文件很大的时候就无法通过http协议进行上传了,因为get,post的安全原因,很多服 务器会   禁止这些协议,而 ...

  7. 大文件分片上传前后端实现

    最近在做公司的视频业务,涉及到大视频的上传. 之前的图片.Excel等上传做的很简单,直接表单提交后端用MultipartFile接收保存到磁盘就行了. 但是针对大文件的上传,需要做额外的处理,否则可 ...

  8. 请问:怎么实现大文件快速上传?

    关注公众号 前端开发博客,领27本电子书 回复加群,自助秒进前端群 前言 大文件快速上传的方案,相信你也有过了解,其实无非就是将 文件变小,也就是通过 压缩文件资源 或者 文件资源分块 后再上传. 本 ...

  9. iOS 利用AFNetworking实现大文件分片上传

    概述 一说到文件上传,想必大家都并不陌生,更何况是利用AFNetworking(PS:后期统称AF)来做,那更是小菜一碟.比如开发中常见的场景:头像上传,九宫格图片上传...等等,这些场景无一不使用到 ...

最新文章

  1. cpu负载过高案例,解决方法记录
  2. flannel源码分析--main
  3. poj 3984
  4. Vue 视频播放插件vue-video-player
  5. nginx php实例,多个mysql,nginx,php实例环境安装zabbix(完全自定义)
  6. Android插件化开发之用DexClassLoader加载未安装的APK资源文件来实现app切换背景皮肤
  7. mysql sync es 异步双写_mysql数据同步es方案思考
  8. bugscan泄露代码解密
  9. linux设备驱动程序 脚本之家,linux – 使用systemd自动挂载USB驱动器
  10. Python PDF转图片 Word
  11. 关于「数据分析师」的一些理解
  12. Linux隧道sit
  13. 【二〇二一·立秋】读书笔记
  14. MySQL中用户订单复购率的计算
  15. 红米AC3000、小米cr8806、8808、8809成功刷入openwrt
  16. maven 零散配置
  17. x390yoga 关掉触控屏幕_八代酷睿变形金刚:ThinkPad X390 Yoga变形本带你飞
  18. 如何在网页版163邮箱中添加别的邮箱
  19. 浏览器上查看微信公众号的所有历史文章
  20. Vue 使用 yarn 报错

热门文章

  1. 我今天的收获,必备stadio 插件
  2. Retrofit 入门和提高
  3. 第七周项目一-成员函数(4)
  4. Android中自定义checkbox样式
  5. 关于python3的标识符_python3中关于基础语法的详解
  6. vue父子组件传值,sync语法糖
  7. matplotlib坐标轴调整
  8. group by的查询
  9. MySqli操作数据库
  10. Swift - 使用网格(UICollectionView)的自定义布局实现复杂页面