本文为博主原创文章,未经允许不得转载

造轮子者:Season_zlc

轮子用法请戳作者链接 ↑

前言

本文主要讲述 RxDownload2 的多线程断点下载技术。

断点下载技术前提

服务器必须支持按 byte-range 下载,也就是支持 Range: bytes=xxx-xxx 请求头。详见Http协议 rfc2616 - Range。

下载范围分割

很简单,先读取 Content-Length 响应头,获取文件大小,然后用文件大小除以线程数就可计算出每条线程的下载范围。

比如,假设文件大小是 100 bytes,下载线程数为 3。因为 100 / 3 = 33,所以:

  • 线程 0 的下载范围是 0 ~32[0 * 33 ~ (0 + 1) * 33 - 1]
  • 线程 1 的下载范围是 33~65[1 * 33 ~ (1 + 1) * 33 - 1]
  • 线程 2 的下载范围是 66~99[2 * 33 ~ 100 - 1]

上代码:

  1. prepareDownload() [-> FileHelper.java]
    public void prepareDownload(File lastModifyFile, File tempFile, File saveFile,long fileLength, String lastModify)throws IOException, ParseException {// 将响应头中的上次修改时间转为 long 类型的 unix 时间戳,然后保存到文件中writeLastModify(lastModifyFile, lastModify);// 设置下载文件的大小、计算每条线程的下载范围并保存到 tempFile 中prepareFile(tempFile, saveFile, fileLength);}
  1. prepareFile() [-> FileHelper.java]
    private void prepareFile(File tempFile, File saveFile, long fileLength)throws IOException {RandomAccessFile rFile = null;RandomAccessFile rRecord = null;FileChannel channel = null;try {rFile = new RandomAccessFile(saveFile, ACCESS);rFile.setLength(fileLength);//设置下载文件的长度 rRecord = new RandomAccessFile(tempFile, ACCESS);// 下载范围在文件中的记录方式:|start|end|start|end|start|end|...// 数据类型是 long,long类型在 java 中占 8 个字节,所以每个线程的下载范围都占 16 字节// 所以 tempFile 的长度 RECORD_FILE_TOTAL_SIZE = 16 * 线程数rRecord.setLength(RECORD_FILE_TOTAL_SIZE); //设置指针记录文件的大小// NIO 内存映射文件的方式读写二进制文件,速度更快channel = rRecord.getChannel();// 注意映射方式为读写MappedByteBuffer buffer = channel.map(READ_WRITE, 0, RECORD_FILE_TOTAL_SIZE);long start;long end;// 计算并保存每条线程的下载范围,计算方法同上面举的例子int eachSize = (int) (fileLength / maxThreads);for (int i = 0; i < maxThreads; i++) {if (i == maxThreads - 1) {start = i * eachSize;end = fileLength - 1;} else {start = i * eachSize;end = (i + 1) * eachSize - 1;}buffer.putLong(start);buffer.putLong(end);}} finally {closeQuietly(channel);closeQuietly(rRecord);closeQuietly(rFile);}}

读取下载范围

很简单,上面已经将每条线程的下载范围保存到了 tempFile 中,只要再从 tempFile 中按位置读出来就行了。

  1. readDownloadRange() [-> FileHelper.java]
    public DownloadRange readDownloadRange(File tempFile, int i) throws IOException {RandomAccessFile record = null;FileChannel channel = null;try {// 入参 i 表示线程序号record = new RandomAccessFile(tempFile, ACCESS);channel = record.getChannel();MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, (i + 1) * EACH_RECORD_SIZE);long startByte = buffer.getLong();long endByte = buffer.getLong();return new DownloadRange(startByte, endByte);} finally {closeQuietly(channel);closeQuietly(record);}}

注意 MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, (i + 1) * EACH_RECORD_SIZE); 这句代码是有坑的,但是表现不出来,因为这里的文件打开方式为 READ_WRITE。要是改成 READ_ONLY 就有导致读取最后一条线程的下载范围时抛出IllegalArgumentException(代码静态检查工具 Fortify 提示要以合适的权限打开文件,我将其改为了 READ_ONLY ,发现了这一问题)。

错误原因:map() 方法的最后一个参数表示要映射的字节数,以只读方式打开时,若参数大小超过了文件剩余可读字节数,就会抛出 IllegalArgumentException。而以读写方式打开文件时,会自动扩展文件长度,所以不会抛出异常。

因为每段下载范围的长度都是 EACH_RECORD_SIZE = 16 bytes,所以,上述代码应修改为:
MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, EACH_RECORD_SIZE);

Intellij IDEA 示例代码

自己写了个示例代码,测试了一下:

        RandomAccessFile file = new RandomAccessFile("temp.txt", "rw");file.setLength(48);FileChannel channel = file.getChannel();MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 48);for (int i = 0; i < 3; i++) {if (i == 2) {buffer.putLong(i * 33).putLong(99);} else {buffer.putLong(i * 33).putLong((i + 1) * 33 - 1);}}channel.close();RandomAccessFile file1 = new RandomAccessFile("temp.txt", "r");FileChannel channel1 = file1.getChannel();for (int i = 0; i < 3; i++) {MappedByteBuffer buffer1 = channel1.map(FileChannel.MapMode.READ_ONLY, i * 16, 16);System.out.println(String.format("long1: %d", buffer1.getLong()));System.out.println(String.format("long2: %d", buffer1.getLong()));}channel1.close();

Notepad++ 装个十六进制查看器,查看生成的 temp.txt 中的内容是否和我们代码写的一样:

temp.txt view in HEX

上面是十六进制,换算成十进制就是上面示例代码写的内容。

写下载文件

很简单,利用 RandomAccessFile 可从任意位置读写的属性,分别将每条线程下载的数据写到同一个文件的不同位置。

  1. saveFile() [-> FileHelper.java]
    public void saveFile(FlowableEmitter<DownloadStatus> emitter, int i, File tempFile,File saveFile, ResponseBody response) {RandomAccessFile record = null;FileChannel recordChannel = null;RandomAccessFile save = null;FileChannel saveChannel = null;InputStream inStream = null;try {try {// 1.映射 tempFile 到内存中record = new RandomAccessFile(tempFile, ACCESS);recordChannel = record.getChannel();MappedByteBuffer recordBuffer = recordChannel.map(READ_WRITE, 0, RECORD_FILE_TOTAL_SIZE);// i 代表线程序号,startIndex 代表该线程下载范围的 start 字段在文件中的指针位置int startIndex = i * EACH_RECORD_SIZE;// start 表示该线程的起始下载位置long start = recordBuffer.getLong(startIndex);// 新建一个下载状态对象,用于发射下载进度DownloadStatus status = new DownloadStatus();// totalSize 代表文件总大小,也可以从 saveFile 中读出long totalSize = recordBuffer.getLong(RECORD_FILE_TOTAL_SIZE - 8) + 1;status.setTotalSize(totalSize);int readLen;byte[] buffer = new byte[2048];inStream = response.byteStream();save = new RandomAccessFile(saveFile, ACCESS);saveChannel = save.getChannel();while ((readLen = inStream.read(buffer)) != -1 && !emitter.isCancelled()) {MappedByteBuffer saveBuffer = saveChannel.map(READ_WRITE, start, readLen);saveBuffer.put(buffer, 0, readLen);// 成功下载一段数据后,将已下载位置写回 start 字段start += readLen;recordBuffer.putLong(startIndex, start);// 计算已下载字节数 = 文件长度 - 每条线程剩余未下载字节数status.setDownloadSize(totalSize - getResidue(recordBuffer));// 发射下载进度emitter.onNext(status);}// 发射下载完成emitter.onComplete();} finally {closeQuietly(record);closeQuietly(recordChannel);closeQuietly(save);closeQuietly(saveChannel);closeQuietly(inStream);closeQuietly(response);}} catch (IOException e) {emitter.onError(e);}}

总结

  • 下载流程就不分析了,只要熟练使用下图所示两个快捷键,什么源码分析都是手到擒来:

  • RxDownload2 源码解析系列至此结束,虽然框架比较简单,但是还是有很多值得学习的东西。尤其是作者对 RxJava2 的使用,可以说非常之六了。他写的十篇 Rxjava2 教程也非常的通俗易懂,感兴趣的可以看一看。

拆轮子-RxDownload2源码解析(三)相关推荐

  1. Disruptor源码解析三 RingBuffer解析

    目录 系列索引 前言 主要内容 RingBuffer的要点 源码解析 系列索引 Disruptor源码解析一 Disruptor高性能之道 Disruptor源码解析二 Sequence相关类解析 D ...

  2. OkHttp3源码解析(三)——连接池复用

    OKHttp3源码解析系列 OkHttp3源码解析(一)之请求流程 OkHttp3源码解析(二)--拦截器链和缓存策略 本文基于OkHttp3的3.11.0版本 implementation 'com ...

  3. ReactiveSwift源码解析(三) Signal代码的基本实现

    上篇博客我们详细的聊了ReactiveSwift源码中的Bag容器,详情请参见<ReactiveSwift源码解析之Bag容器>.本篇博客我们就来聊一下信号量,也就是Signal的的几种状 ...

  4. 并发编程与源码解析 (三)

    并发编程 (三) 1 Fork/Join分解合并框架 1.1 什么是fork/join ​ Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,开发者可以在不去了解如Thread.R ...

  5. 前端入门之(vuex源码解析三)

    上两节前端入门之(vuex源码解析二)我们把vuex的源码大概的撸了一遍,还剩下(插件.getters跟module),我们继续哈~ 插件童鞋们可以去看看vuex在各个浏览器的状态显示插件,小伙伴可以 ...

  6. Tomcat源码解析三:tomcat的启动过程

    Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...

  7. 【Vue.js源码解析 三】-- 模板编译和组件化

    前言 笔记来源:拉勾教育 大前端高薪训练营 阅读建议:建议通过左侧导航栏进行阅读 模板编译 模板编译的主要目的是将模板 (template) 转换为渲染函数 (render) <div> ...

  8. Cesium源码解析三(metadata元数据拓展中行列号的分块规则解析)

    目录 1.前言 2.layer.json中available参数意义 3.EPSG:4626切片及terrain分块原理 4.Cesium的terrain分块规则 5.自定义terrain分块规则 6 ...

  9. AFNetworking2.0源码解析三

    本篇说说安全相关的AFSecurityPolicy模块,AFSecurityPolicy用于验证HTTPS请求的证书,先来看看HTTPS的原理和证书相关的几个问题. HTTPS HTTPS连接建立过程 ...

最新文章

  1. MinkowskiBroadcast广播
  2. linux基础命令学习
  3. 模型压缩的开源项目工具
  4. leetcode-169.求众数
  5. OpenCV-图像几何变换:旋转,缩放,斜切 .
  6. Linux下内存泄露工具
  7. java 有穷自动机_Java实现雪花算法(snowflake)
  8. 科学计算机clr,科学计算器按键功能汇总
  9. c/c++教程 - 1.10 结构体 使用typedef定义struct结构体 结构体数组 结构体指针 结构体嵌套 结构体做函数参数 结构体const
  10. lj245a引脚功能图_CA3140中文资料-引脚图及功能
  11. 【NIPS 2020】通过文本压缩,让BERT支持长文本
  12. mathtype字体倾斜
  13. 资源 | 最新版区块链术语表(中英文对照)
  14. 文字表情 emoji 解析大全
  15. pyautogui在网页内写入excel文件内容
  16. 三行情书 计算机网络,“学霸式”三行情书走红!句子很短,爱你如诗
  17. 基于FPGA的ADS1256讲解
  18. 二级路由当作交换机,与一级路由同一个局域网
  19. Thread.currentThread.interrupt()
  20. ATA port 上插入盘后的错误处理(AHCI)

热门文章

  1. ps排版html,PS冷门技!十个不为人知的PS文本排版工具
  2. python 定义全局变量
  3. 字母预言卡里的魔术与数学(二)——魔术背后的建模思路
  4. pycharm爬虫打印网页出现中文乱码问题
  5. 第一章:基于 SpringBoot 快速搭建QQ机器人,并监听群事件
  6. 不工作,靠海外抖音(TikTok)还清所有债务:会赚钱的人都在做这件事 !
  7. Java 跳出For循环总结
  8. 西门子S7200方案 西门子S7200方案CPu型号LPC2136
  9. 学历高 机器学习_找一份机器学习的工作,学历有多重要?
  10. MySQL数据库的多种连接方式