多线程下载及断点续传的实现是使用 HTTP/1.1 引入的 Range 请求参数,可以访问Web资源的指定区间的内容。虽然实现了多线程及断点续传,但还有很多不完善的地方。

包含四个类:

Downloader: 主类,负责分配任务给各个子线程,及检测进度

DownloadFile: 表示要下载的哪个文件,为了能写输入到文件的指定位置,使用 RandomAccessFile 类操作文件,多个线程写同一个文件需要保证线程安全,这里直接调用 getChannel 方法,获取一个文件通道,FileChannel是线程安全的。

DownloadTask: 实际执行下载的线程,获取 [lowerBound, upperBound] 区间的数据,当下载过程中出现异常时要通知其他线程(使用 AtomicBoolean),结束下载

Logger: 实时记录下载进度,以便续传时知道从哪开始。感觉这里做的比较差,为了能实时写出日志及方便地使用Properties类的load/store方法格式化输入输出,每次都是打开后再关闭。

演示:

随便找一个文件下载:

强行结束程序并重新运行:

日志文件:

断点续传的关键是记录各个线程的下载进度,这里细节比较多,花了很久。只需要记录每个线程请求的Range区间极客,每次成功写数据到文件时,就更新一次下载区间。下面是下载完成后的日志内容。

代码:

Downloader.java

package downloader;

import java.io.*;

import java.net.*;

import java.nio.file.Files;

import java.nio.file.Path;

import java.util.concurrent.atomic.AtomicBoolean;

public class Downloader {

private static final int DEFAULT_THREAD_COUNT = 4; // 默认线程数量

private AtomicBoolean canceled; // 取消状态,如果有一个子线程出现异常,则取消整个下载任务

private DownloadFile file; // 下载的文件对象

private String storageLocation;

private final int threadCount; // 线程数量

private long fileSize; // 文件大小

private final String url;

private long beginTime; // 开始时间

private Logger logger;

public Downloader(String url) {

this(url, DEFAULT_THREAD_COUNT);

}

public Downloader(String url, int threadCount) {

this.url = url;

this.threadCount = threadCount;

this.canceled = new AtomicBoolean(false);

this.storageLocation = url.substring(url.lastIndexOf('/')+1);

this.logger = new Logger(storageLocation + ".log", url, threadCount);

}

public void start() {

boolean reStart = Files.exists(Path.of(storageLocation + ".log"));

if (reStart) {

logger = new Logger(storageLocation + ".log");

System.out.printf("* 继续上次下载进度[已下载:%.2fMB]:%s\n", logger.getWroteSize() / 1014.0 / 1024, url);

} else {

System.out.println("* 开始下载:" + url);

}

if (-1 == (this.fileSize = getFileSize()))

return;

System.out.printf("* 文件大小:%.2fMB\n", fileSize / 1024.0 / 1024);

this.beginTime = System.currentTimeMillis();

try {

this.file = new DownloadFile(storageLocation, fileSize, logger);

if (reStart) {

file.setWroteSize(logger.getWroteSize());

}

// 分配线程下载

dispatcher(reStart);

// 循环打印进度

printDownloadProgress();

} catch (IOException e) {

System.err.println("x 创建文件失败[" + e.getMessage() + "]");

}

}

/**

* 分配器,决定每个线程下载哪个区间的数据

*/

private void dispatcher(boolean reStart) {

long blockSize = fileSize / threadCount; // 每个线程要下载的数据量

long lowerBound = 0, upperBound = 0;

long[][] bounds = null;

int threadID = 0;

if (reStart) {

bounds = logger.getBounds();

}

for (int i = 0; i < threadCount; i++) {

if (reStart) {

threadID = (int)(bounds[i][0]);

lowerBound = bounds[i][1];

upperBound = bounds[i][2];

} else {

threadID = i;

lowerBound = i * blockSize;

// fileSize-1 !!!!! fu.ck,找了一下午的错

upperBound = (i == threadCount - 1) ? fileSize-1 : lowerBound + blockSize;

}

new DownloadTask(url, lowerBound, upperBound, file, canceled, threadID).start();

}

}

/**

* 循环打印进度,直到下载完毕,或任务被取消

*/

private void printDownloadProgress() {

long downloadedSize = file.getWroteSize();

int i = 0;

long lastSize = 0; // 三秒前的下载量

while (!canceled.get() && downloadedSize < fileSize) {

if (i++ % 4 == 3) { // 每3秒打印一次

System.out.printf("下载进度:%.2f%%, 已下载:%.2fMB,当前速度:%.2fMB/s\n",

downloadedSize / (double)fileSize * 100 ,

downloadedSize / 1024.0 / 1024,

(downloadedSize - lastSize) / 1024.0 / 1024 / 3);

lastSize = downloadedSize;

i = 0;

}

try {

Thread.sleep(1000);

} catch (InterruptedException ignore) {}

downloadedSize = file.getWroteSize();

}

file.close();

if (canceled.get()) {

try {

Files.delete(Path.of(storageLocation));

} catch (IOException ignore) {

}

System.err.println("x 下载失败,任务已取消");

} else {

System.out.println("* 下载成功,本次用时"+ (System.currentTimeMillis() - beginTime) / 1000 +"秒");

}

}

/**

* @return 要下载的文件的尺寸

*/

private long getFileSize() {

if (fileSize != 0) {

return fileSize;

}

HttpURLConnection conn = null;

try {

conn = (HttpURLConnection)new URL(url).openConnection();

conn.setConnectTimeout(3000);

conn.setRequestMethod("HEAD");

conn.connect();

System.out.println("* 连接服务器成功");

} catch (MalformedURLException e) {

throw new RuntimeException("URL错误");

} catch (IOException e) {

System.err.println("x 连接服务器失败["+ e.getMessage() +"]");

return -1;

}

return conn.getContentLengthLong();

}

public static void main(String[] args) throws IOException {

new Downloader("http://js.xiazaicc.com//down2/ucliulanqi_downcc.zip").start();

}

}

DownloadTask.java

package downloader;

import java.io.*;

import java.net.HttpURLConnection;

import java.net.URL;

import java.nio.ByteBuffer;

import java.nio.channels.Channels;

import java.nio.channels.ReadableByteChannel;

import java.util.concurrent.atomic.AtomicBoolean;

class DownloadTask extends Thread {

private final String url;

private long lowerBound; // 下载的文件区间

private long upperBound;

private AtomicBoolean canceled;

private DownloadFile downloadFile;

private int threadId;

DownloadTask(String url, long lowerBound, long upperBound, DownloadFile downloadFile,

AtomicBoolean canceled, int threadID) {

this.url = url;

this.lowerBound = lowerBound;

this.upperBound = upperBound;

this.canceled = canceled;

this.downloadFile = downloadFile;

this.threadId = threadID;

}

@Override

public void run() {

ReadableByteChannel input = null;

try {

ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 2); // 2MB

input = connect();

System.out.println("* [线程" + threadId + "]连接成功,开始下载...");

int len;

while (!canceled.get() && lowerBound <= upperBound) {

buffer.clear();

len = input.read(buffer);

downloadFile.write(lowerBound, buffer, threadId, upperBound);

lowerBound += len;

}

if (!canceled.get()) {

System.out.println("* [线程" + threadId + "]下载完成" + ": " + lowerBound + "-" + upperBound);

}

} catch (IOException e) {

canceled.set(true);

System.err.println("x [线程" + threadId + "]遇到错误[" + e.getMessage() + "],结束下载");

} finally {

if (input != null) {

try {

input.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

/**

* 连接WEB服务器,并返回一个数据通道

* @return 返回通道

* @throws IOException 网络连接错误

*/

private ReadableByteChannel connect() throws IOException {

HttpURLConnection conn = (HttpURLConnection)new URL(url).openConnection();

conn.setConnectTimeout(3000);

conn.setRequestMethod("GET");

conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);

// System.out.println("thread_"+ threadId +": " + lowerBound + "-" + upperBound);

conn.connect();

int statusCode = conn.getResponseCode();

if (HttpURLConnection.HTTP_PARTIAL != statusCode) {

conn.disconnect();

throw new IOException("状态码错误:" + statusCode);

}

return Channels.newChannel(conn.getInputStream());

}

}

DownloadFile.java

package downloader;

import java.io.IOException;

import java.io.RandomAccessFile;

import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;

import java.util.concurrent.atomic.AtomicLong;

class DownloadFile {

private final RandomAccessFile file;

private final FileChannel channel; // 线程安全类

private AtomicLong wroteSize; // 已写入的长度

private Logger logger;

DownloadFile(String fileName, long fileSize, Logger logger) throws IOException {

this.wroteSize = new AtomicLong(0);

this.logger = logger;

this.file = new RandomAccessFile(fileName, "rw");

file.setLength(fileSize);

channel = file.getChannel();

}

/**

* 写数据

* @param offset 写偏移量

* @param buffer 数据

* @throws IOException 写数据出现异常

*/

void write(long offset, ByteBuffer buffer, int threadID, long upperBound) throws IOException {

buffer.flip();

int length = buffer.limit();

while (buffer.hasRemaining()) {

channel.write(buffer, offset);

}

wroteSize.addAndGet(length);

logger.updateLog(threadID, length, offset + length, upperBound); // 更新日志

}

/**

* @return 已经下载的数据量,为了知道何时结束整个任务,以及统计信息

*/

long getWroteSize() {

return wroteSize.get();

}

// 继续下载时调用

void setWroteSize(long wroteSize) {

this.wroteSize.set(wroteSize);

}

void close() {

try {

file.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

Logger.java

package downloader;

import java.io.*;

import java.util.Properties;

class Logger {

private String logFileName; // 下载的文件的名字

private Properties log;

/**

* 重新开始下载时,使用该构造函数

* @param logFileName

*/

Logger(String logFileName) {

this.logFileName = logFileName;

log = new Properties();

FileInputStream fin = null;

try {

log.load(new FileInputStream(logFileName));

} catch (IOException ignore) {

} finally {

try {

fin.close();

} catch (Exception ignore) {}

}

}

Logger(String logFileName, String url, int threadCount) {

this.logFileName = logFileName;

this.log = new Properties();

log.put("url", url);

log.put("wroteSize", "0");

log.put("threadCount", String.valueOf(threadCount));

for (int i = 0; i < threadCount; i++) {

log.put("thread_" + i, "0-0");

}

}

synchronized void updateLog(int threadID, long length, long lowerBound, long upperBound) {

log.put("thread_"+threadID, lowerBound + "-" + upperBound);

log.put("wroteSize", String.valueOf(length + Long.parseLong(log.getProperty("wroteSize"))));

FileOutputStream file = null;

try {

file = new FileOutputStream(logFileName); // 每次写时都清空文件

log.store(file, null);

} catch (IOException e) {

e.printStackTrace();

} finally {

if (file != null) {

try {

file.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

/**

* 获取区间信息

* ret[i][0] = threadID, ret[i][1] = lowerBoundID, ret[i][2] = upperBoundID

* @return

*/

long[][] getBounds() {

long[][] bounds = new long[Integer.parseInt(log.get("threadCount").toString())][3];

int[] index = {0};

log.forEach((k, v) -> {

String key = k.toString();

if (key.startsWith("thread_")) {

String[] interval = v.toString().split("-");

bounds[index[0]][0] = Long.parseLong(key.substring(key.indexOf("_") + 1));

bounds[index[0]][1] = Long.parseLong(interval[0]);

bounds[index[0]++][2] = Long.parseLong(interval[1]);

}

});

return bounds;

}

long getWroteSize() {

return Long.parseLong(log.getProperty("wroteSize"));

}

}

java多线程下载_Java实现多线程下载,支持断点续传相关推荐

  1. java nio下载_Java Nio 多线程网络下载

    --> 默认最多50个线程 同一文件下载失败延迟超过30秒就结束下载 --> 下载5分钟超时时间,假设5分钟内未下载完就结束下载 --> 依赖 commons-httpclient ...

  2. java断点上传下载_java实现多线程断点续传,上传下载 分享

    程序采用的ftp工具, apache 的 commons-net-ftp-ftpclient package com.ftp; import java.io.File; import java.io. ...

  3. java如何实现下载_java 如何实现下载功能

    展开全部 import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import j ...

  4. jdbc连接mysql数据库驱动下载_Java Jdbc驱动下载配置连接mysql数据库方法代码

    1.MySQL JDBC驱动下载 打开上面的下载地址,选择"Platform Independent",然后下载tar.gz或zip格式的都行,之后解压出来得到Jdbc驱动(mys ...

  5. java 多线程性能_Java中多线程的性能比较

    java 多线程性能 Java中有多种用于多线程的技术. 可以通过同步关键字,锁或原子变量来并行化Java中的一段代码. 这篇文章将比较使用synced关键字ReentrantLock,getAndI ...

  6. java实现多线程抢单_JAVA实现多线程的四种方式

    JAVA多线程实现方式: 1.继承Thread类(无返回值) 2.实现Runnable接口(无返回值) 3.实现Callable接口,通过FutureTask包装器来创建Threak线程(有返回值) ...

  7. java多线程 游戏_java利用多线程和Socket实现猜拳游戏

    本文实例为大家分享了利用多线程和Socket实现猜拳游戏的具体代码,供大家参考,具体内容如下 实例:猜拳游戏 猜拳游戏是指小时候玩的石头.剪刀.布的游戏.客户端与服务器的"较量", ...

  8. java实现多线程抢单_Java模拟多线程实现抢票代码实例

    这篇文章主要介绍了Java模拟多线程实现抢票,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 实现100张票抢购的demo 这里需要一个变量,来保存1 ...

  9. java 手动线程调度_Java Thread 多线程 操作线程

    5.线程的创建和启动 A.继承Thread类或实现Runnable接口,重写或实现run方法,run方法代表线程要完成的任务 B.创建Thread子类或是Runnable的实现类,即创建的线程对象:不 ...

最新文章

  1. WCF(Sender) to MSMQ to WCF(Receiver)
  2. Hystrix 资料简单梳理
  3. 第7步 mybatis-generator dao层生成器
  4. SpringBoot获取配置文件常量值
  5. DevExpress使用技巧总结
  6. 常见的SQL错误和解决方法
  7. 1月3日 升 级gazebo7
  8. USB免驱NFC读写器 Android系统中NFC读写范例
  9. 《麦肯锡方法》学习笔记18
  10. pyside6的MQTT客户端
  11. python爬虫爬取qq音乐巅峰榜热歌歌词,jieba中文分词,词云展示
  12. 最重要的经济先行指标—PMI
  13. java中strlen,浅析C++中strlen函数的使用与模拟实现strlen的方法
  14. .NET定位CPU使用率过高问题
  15. 【经验】迅雨田下载测试
  16. MySQL5.7找到data文件夹
  17. 计算机高级属性启用玻璃,“win键+tab键无法使用”的解决方案
  18. 劝大家别去国企制造业干IT,软件多数据乱,报表开发完全没法做
  19. JavaScript-筑基(十八)键盘事件
  20. java飞行棋项目_java小项目 类与 对象 骑士飞行棋代码

热门文章

  1. 14、HTML <input>标签
  2. 3、MySQL查看或显示数据库(SHOW DATABASES语句)
  3. 1060 Are They Equal (25 分)【难度: 一般 / 知识点: 模拟 字符串处理】
  4. Acwing第 7 场周赛【未完结】
  5. 2.1.5 编码与调制(1)
  6. mySQL之单表更新
  7. Linux之磁盘概述
  8. Linux之grep
  9. Spring boot注册三大组件
  10. Oracle查询给表起别名