拆轮子-RxDownload2源码解析(三)
本文为博主原创文章,未经允许不得转载
造轮子者:Season_zlc
轮子用法请戳作者链接 ↑
前言
本文主要讲述 RxDownload2
的多线程断点下载技术。
断点下载技术前提
服务器必须支持按 byte-range
下载,也就是支持 Range: bytes=xxx-xxx
请求头。详见Http协议 rfc2616 - Range。
下载范围分割
很简单,先读取 Content-Length
响应头,获取文件大小,然后用文件大小除以线程数就可计算出每条线程的下载范围。
比如,假设文件大小是
100 bytes
,下载线程数为3
。因为100 / 3 = 33
,所以:
线程 0
的下载范围是0 ~32
即[0 * 33 ~ (0 + 1) * 33 - 1]
线程 1
的下载范围是33~65
即[1 * 33 ~ (1 + 1) * 33 - 1]
线程 2
的下载范围是66~99
即[2 * 33 ~ 100 - 1]
上代码:
- prepareDownload() [-> FileHelper.java]
public void prepareDownload(File lastModifyFile, File tempFile, File saveFile,long fileLength, String lastModify)throws IOException, ParseException {// 将响应头中的上次修改时间转为 long 类型的 unix 时间戳,然后保存到文件中writeLastModify(lastModifyFile, lastModify);// 设置下载文件的大小、计算每条线程的下载范围并保存到 tempFile 中prepareFile(tempFile, saveFile, fileLength);}
- prepareFile() [-> FileHelper.java]
private void prepareFile(File tempFile, File saveFile, long fileLength)throws IOException {RandomAccessFile rFile = null;RandomAccessFile rRecord = null;FileChannel channel = null;try {rFile = new RandomAccessFile(saveFile, ACCESS);rFile.setLength(fileLength);//设置下载文件的长度 rRecord = new RandomAccessFile(tempFile, ACCESS);// 下载范围在文件中的记录方式:|start|end|start|end|start|end|...// 数据类型是 long,long类型在 java 中占 8 个字节,所以每个线程的下载范围都占 16 字节// 所以 tempFile 的长度 RECORD_FILE_TOTAL_SIZE = 16 * 线程数rRecord.setLength(RECORD_FILE_TOTAL_SIZE); //设置指针记录文件的大小// NIO 内存映射文件的方式读写二进制文件,速度更快channel = rRecord.getChannel();// 注意映射方式为读写MappedByteBuffer buffer = channel.map(READ_WRITE, 0, RECORD_FILE_TOTAL_SIZE);long start;long end;// 计算并保存每条线程的下载范围,计算方法同上面举的例子int eachSize = (int) (fileLength / maxThreads);for (int i = 0; i < maxThreads; i++) {if (i == maxThreads - 1) {start = i * eachSize;end = fileLength - 1;} else {start = i * eachSize;end = (i + 1) * eachSize - 1;}buffer.putLong(start);buffer.putLong(end);}} finally {closeQuietly(channel);closeQuietly(rRecord);closeQuietly(rFile);}}
读取下载范围
很简单,上面已经将每条线程的下载范围保存到了 tempFile
中,只要再从 tempFile
中按位置读出来就行了。
- readDownloadRange() [-> FileHelper.java]
public DownloadRange readDownloadRange(File tempFile, int i) throws IOException {RandomAccessFile record = null;FileChannel channel = null;try {// 入参 i 表示线程序号record = new RandomAccessFile(tempFile, ACCESS);channel = record.getChannel();MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, (i + 1) * EACH_RECORD_SIZE);long startByte = buffer.getLong();long endByte = buffer.getLong();return new DownloadRange(startByte, endByte);} finally {closeQuietly(channel);closeQuietly(record);}}
注意 MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, (i + 1) * EACH_RECORD_SIZE);
这句代码是有坑的,但是表现不出来,因为这里的文件打开方式为 READ_WRITE
。要是改成 READ_ONLY
就有导致读取最后一条线程的下载范围时抛出IllegalArgumentException
(代码静态检查工具 Fortify
提示要以合适的权限打开文件,我将其改为了 READ_ONLY
,发现了这一问题)。
错误原因:map() 方法的最后一个参数表示要映射的字节数,以只读方式打开时,若参数大小超过了文件剩余可读字节数,就会抛出 IllegalArgumentException
。而以读写方式打开文件时,会自动扩展文件长度,所以不会抛出异常。
因为每段下载范围的长度都是 EACH_RECORD_SIZE = 16 bytes
,所以,上述代码应修改为:
MappedByteBuffer buffer = channel.map(READ_WRITE, i * EACH_RECORD_SIZE, EACH_RECORD_SIZE);
Intellij IDEA 示例代码
自己写了个示例代码,测试了一下:
RandomAccessFile file = new RandomAccessFile("temp.txt", "rw");file.setLength(48);FileChannel channel = file.getChannel();MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 48);for (int i = 0; i < 3; i++) {if (i == 2) {buffer.putLong(i * 33).putLong(99);} else {buffer.putLong(i * 33).putLong((i + 1) * 33 - 1);}}channel.close();RandomAccessFile file1 = new RandomAccessFile("temp.txt", "r");FileChannel channel1 = file1.getChannel();for (int i = 0; i < 3; i++) {MappedByteBuffer buffer1 = channel1.map(FileChannel.MapMode.READ_ONLY, i * 16, 16);System.out.println(String.format("long1: %d", buffer1.getLong()));System.out.println(String.format("long2: %d", buffer1.getLong()));}channel1.close();
给 Notepad++
装个十六进制查看器,查看生成的 temp.txt
中的内容是否和我们代码写的一样:
上面是十六进制,换算成十进制就是上面示例代码写的内容。
写下载文件
很简单,利用 RandomAccessFile
可从任意位置读写的属性,分别将每条线程下载的数据写到同一个文件的不同位置。
- saveFile() [-> FileHelper.java]
public void saveFile(FlowableEmitter<DownloadStatus> emitter, int i, File tempFile,File saveFile, ResponseBody response) {RandomAccessFile record = null;FileChannel recordChannel = null;RandomAccessFile save = null;FileChannel saveChannel = null;InputStream inStream = null;try {try {// 1.映射 tempFile 到内存中record = new RandomAccessFile(tempFile, ACCESS);recordChannel = record.getChannel();MappedByteBuffer recordBuffer = recordChannel.map(READ_WRITE, 0, RECORD_FILE_TOTAL_SIZE);// i 代表线程序号,startIndex 代表该线程下载范围的 start 字段在文件中的指针位置int startIndex = i * EACH_RECORD_SIZE;// start 表示该线程的起始下载位置long start = recordBuffer.getLong(startIndex);// 新建一个下载状态对象,用于发射下载进度DownloadStatus status = new DownloadStatus();// totalSize 代表文件总大小,也可以从 saveFile 中读出long totalSize = recordBuffer.getLong(RECORD_FILE_TOTAL_SIZE - 8) + 1;status.setTotalSize(totalSize);int readLen;byte[] buffer = new byte[2048];inStream = response.byteStream();save = new RandomAccessFile(saveFile, ACCESS);saveChannel = save.getChannel();while ((readLen = inStream.read(buffer)) != -1 && !emitter.isCancelled()) {MappedByteBuffer saveBuffer = saveChannel.map(READ_WRITE, start, readLen);saveBuffer.put(buffer, 0, readLen);// 成功下载一段数据后,将已下载位置写回 start 字段start += readLen;recordBuffer.putLong(startIndex, start);// 计算已下载字节数 = 文件长度 - 每条线程剩余未下载字节数status.setDownloadSize(totalSize - getResidue(recordBuffer));// 发射下载进度emitter.onNext(status);}// 发射下载完成emitter.onComplete();} finally {closeQuietly(record);closeQuietly(recordChannel);closeQuietly(save);closeQuietly(saveChannel);closeQuietly(inStream);closeQuietly(response);}} catch (IOException e) {emitter.onError(e);}}
总结
下载流程就不分析了,只要熟练使用下图所示两个快捷键,什么源码分析都是手到擒来:
RxDownload2
源码解析系列至此结束,虽然框架比较简单,但是还是有很多值得学习的东西。尤其是作者对RxJava2
的使用,可以说非常之六了。他写的十篇Rxjava2
教程也非常的通俗易懂,感兴趣的可以看一看。
拆轮子-RxDownload2源码解析(三)相关推荐
- Disruptor源码解析三 RingBuffer解析
目录 系列索引 前言 主要内容 RingBuffer的要点 源码解析 系列索引 Disruptor源码解析一 Disruptor高性能之道 Disruptor源码解析二 Sequence相关类解析 D ...
- OkHttp3源码解析(三)——连接池复用
OKHttp3源码解析系列 OkHttp3源码解析(一)之请求流程 OkHttp3源码解析(二)--拦截器链和缓存策略 本文基于OkHttp3的3.11.0版本 implementation 'com ...
- ReactiveSwift源码解析(三) Signal代码的基本实现
上篇博客我们详细的聊了ReactiveSwift源码中的Bag容器,详情请参见<ReactiveSwift源码解析之Bag容器>.本篇博客我们就来聊一下信号量,也就是Signal的的几种状 ...
- 并发编程与源码解析 (三)
并发编程 (三) 1 Fork/Join分解合并框架 1.1 什么是fork/join Fork/Join框架是JDK1.7提供的一个用于并行执行任务的框架,开发者可以在不去了解如Thread.R ...
- 前端入门之(vuex源码解析三)
上两节前端入门之(vuex源码解析二)我们把vuex的源码大概的撸了一遍,还剩下(插件.getters跟module),我们继续哈~ 插件童鞋们可以去看看vuex在各个浏览器的状态显示插件,小伙伴可以 ...
- Tomcat源码解析三:tomcat的启动过程
Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context ...
- 【Vue.js源码解析 三】-- 模板编译和组件化
前言 笔记来源:拉勾教育 大前端高薪训练营 阅读建议:建议通过左侧导航栏进行阅读 模板编译 模板编译的主要目的是将模板 (template) 转换为渲染函数 (render) <div> ...
- Cesium源码解析三(metadata元数据拓展中行列号的分块规则解析)
目录 1.前言 2.layer.json中available参数意义 3.EPSG:4626切片及terrain分块原理 4.Cesium的terrain分块规则 5.自定义terrain分块规则 6 ...
- AFNetworking2.0源码解析三
本篇说说安全相关的AFSecurityPolicy模块,AFSecurityPolicy用于验证HTTPS请求的证书,先来看看HTTPS的原理和证书相关的几个问题. HTTPS HTTPS连接建立过程 ...
最新文章
- MinkowskiBroadcast广播
- linux基础命令学习
- 模型压缩的开源项目工具
- leetcode-169.求众数
- OpenCV-图像几何变换:旋转,缩放,斜切 .
- Linux下内存泄露工具
- java 有穷自动机_Java实现雪花算法(snowflake)
- 科学计算机clr,科学计算器按键功能汇总
- c/c++教程 - 1.10 结构体 使用typedef定义struct结构体 结构体数组 结构体指针 结构体嵌套 结构体做函数参数 结构体const
- lj245a引脚功能图_CA3140中文资料-引脚图及功能
- 【NIPS 2020】通过文本压缩,让BERT支持长文本
- mathtype字体倾斜
- 资源 | 最新版区块链术语表(中英文对照)
- 文字表情 emoji 解析大全
- pyautogui在网页内写入excel文件内容
- 三行情书 计算机网络,“学霸式”三行情书走红!句子很短,爱你如诗
- 基于FPGA的ADS1256讲解
- 二级路由当作交换机,与一级路由同一个局域网
- Thread.currentThread.interrupt()
- ATA port 上插入盘后的错误处理(AHCI)