开放存储服务(Open Storage Service,OSS),是阿里云对外提供的海量、安全和高可靠的云存储服务,目前越来越多的开发者将应用数据存放至OSS,对使用OSS实现文件的断点续传功能使用的也比较多,在这儿分享下自己使用OSS的实例。

所谓断点下载,就是要从文件已经下载的地方开始继续下载,对于断点续传这样有状态功能的实现,关键点在于如何在客户端完成状态维护。此篇主要介绍多文件的多线程的断点下载。

为了限定同时下载文件的个数和每个文件同时下载的线程数,使用了线程池嵌套线程池来完成,外层线程池downloadMainPool用来限定并发下载的文件数,内层线程池pool执行单个文件的多线程下载。

downloadMainPool 代码片段:

public static ExecutorService downloadMainPool = null;
    static{
        downloadMainPool = Executors.newFixedThreadPool(Constant.CONCURRENT_FILE_NUMBER,new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread s = Executors.defaultThreadFactory().newThread(r);
                s.setDaemon(true);
                return s;
            }
        });
    }

在static中实例化为固定大小的线程池,由于默认的ThreadFactory创建的线程为非守护状态,为了避免java程序不能退出的问题,保证在文件下载完后当前java程序结束在jvm中的运行,需要重写ThreadFactory,使其创建的线程为守护线程。

Constant类中自定义了程序中需要使用到的变量,可在类中直接定义或读取配置文件,Constant.CONCURRENT_FILE_NUMBER定义了并发文件数。

downloadMainPool中的每个线程负责一个文件,每个线程下载文件时创建子线程池,由子线程池分块下载文件;要做到断点续传需要记录每个子线程池中的每个线程下载位置,这里使用定时序列化子线程池线程对象的方式,定时将包含了下载位置的线程序列化到文件,在再次下载同一文件时反序列化,直接丢到子线程池下载即可。下面是downloadMainPool中的线程OSSDownloadFile代码清单

OSSDownloadFile 代码:

package cloudStorage.oss;

import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import cloudStorage.basis.Constant;
import cloudStorage.basis.Global;
import cloudStorage.basis.OSSClientFactory;
import cloudStorage.oss.download.DownloadPartObj;
import cloudStorage.oss.download.DownloadPartThread;
import cloudStorage.util.ObjectSerializableUtil;

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.ObjectMetadata;

/**
 * @Description: oss多线程分段下载文件
 * @author: zrk
 * @time: 2015年4月1日 上午10:37:35
 */
public class OSSDownloadFile  implements Callable<Integer>{
    public static final Logger LOGGER = Logger.getLogger(OSSDownloadFile.class);
    //外层线程池
    public static ExecutorService downloadMainPool = null;
    //内层线程池
    private ExecutorService pool ;

static{
        downloadMainPool = Executors.newFixedThreadPool(Constant.CONCURRENT_FILE_NUMBER,new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread s = Executors.defaultThreadFactory().newThread(r);
                s.setDaemon(true);
                return s;
            }
        });
    }
    private String localFilePath;//本地文件路径
    private String bucketName; //bucketName
    private String key;//云端存储路径

public OSSDownloadFile() {
        super();
    }

public OSSDownloadFile(String localFilePath,String bucketName,String key) {
        //初始化子线程池
        pool = Executors.newFixedThreadPool(Constant.SINGLE_FILE_CONCURRENT_THREADS);
        this.localFilePath = localFilePath;
        this.bucketName = bucketName;
        this.key = key;

}

//执行当前线程
    public Integer downloadFile() {
        Integer r = Global.ERROR;
        //向downloadMainPool中submit当前线程
        Future<Integer> result = downloadMainPool.submit(this);
        try {
            r=result.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally{
            return r;
        }

}
    /**
     * 
     * @param localFilePath     需要存放的文件路径
     * @param bucketName        bucketName
     * @param key               存储key  -在oss的存储路径
     * @return
     */
    @Override
    public Integer call(){
        //OSSClient 使用单例
        OSSClient client = OSSClientFactory.getInstance();    
        ObjectMetadata objectMetadata = null;
        //判断文件在云端是否存在
        try {
            objectMetadata = client.getObjectMetadata(bucketName, key);
        } catch (OSSException e) {
            LOGGER.info("==请检查bucketName或key");
            return Global.ERROR;
        }
        long fileLength = objectMetadata.getContentLength();

//自定义的每个下载分块大小
        Integer partSize = Constant.DOWNLOAD_PART_SIZE;

//需要下载的文件分块数
        int partCount=calPartCount(fileLength, partSize);

//子线程池的线程对象封装类(用于序列化的)
        DownloadPartObj downloadPartObj = null;
        boolean isSerializationFile = false;
        //序列化的文件路径(与下载文件同路径使用.dw.temp后缀)
        String serializationFilePath = localFilePath+".dw.temp";
        //若存在反序列化对象
        if(new File(serializationFilePath).exists()){
            downloadPartObj = (DownloadPartObj)ObjectSerializableUtil.load(serializationFilePath);
            isSerializationFile = true;
        }
        //序列化文件不存在,分配分块给子线程池线程对象
        if(downloadPartObj==null||!isSerializationFile){
            downloadPartObj = new DownloadPartObj();
            for (int i = 0; i < partCount; i++) {
                  final  long startPos = partSize * i;
                  final  long endPos = partSize * i +( partSize < (fileLength - startPos) ? partSize : (fileLength - startPos)) - 1;
                  //DownloadPartThread是执行每个分块下载任务的线程
                  downloadPartObj.getDownloadPartThreads().add(new DownloadPartThread(startPos, endPos, localFilePath, bucketName, key,Constant.ACCESS_ID,Constant.ACCESS_KEY));
                }
        }

try {
            int i = 0;
            //download方法提交分块下载线程至子线程池下砸,while循环用于下载失败重复下载,Constant.RETRY定义重复下载次数
            while (download(downloadPartObj,serializationFilePath).isResult()==false) {
                if(++i == Constant.RETRY)break;
                LOGGER.info(Thread.currentThread().getName()+"重试第"+i+"次");
            }
        } catch (Exception e) {
            LOGGER.info("=="+e.getMessage());
            return Global.THREAD_ERROR;
        }
        if(!downloadPartObj.isResult()){
            return Global.NETWORK_ERROR;
        }
        return Global.SUCCESS;
    }

/**
     *  多线程下载单个文件
     * @param partThreadObj
     * @param serializationFilePath
     * @return
     */
    private DownloadPartObj download(DownloadPartObj partThreadObj,String serializationFilePath){

try {
            partThreadObj.setResult(true);
            //向子线程池中submit单个文件所有分块下载线程
            for (int i=0 ;i<partThreadObj.getDownloadPartThreads().size();i++) {
                if (partThreadObj.getDownloadPartThreads().get(i).geteTag() == null)
                    pool.submit(partThreadObj.getDownloadPartThreads().get(i));
            }
            //shutdown子线程池,池内所下载任务执行结束后停止当前线程池
            pool.shutdown();
            //循环检查线程池,同时在此序列化partThreadObj
            while (!pool.isTerminated()) {
                ObjectSerializableUtil.save(partThreadObj,serializationFilePath);
                pool.awaitTermination(Constant.SERIALIZATION_TIME, TimeUnit.SECONDS);
            }
            //判断下载结果
            for (DownloadPartThread downloadPartThread: partThreadObj.getDownloadPartThreads()) {
                if (downloadPartThread.geteTag() == null){
                    partThreadObj.setResult(false);
                }
            }
            //下载成功 删除序列化文件
            if (partThreadObj.isResult()==true) 
                ObjectSerializableUtil.delSerlzFile(serializationFilePath);

} catch (Exception e) {
            LOGGER.info("=="+e.getMessage());
        }
        return partThreadObj;
    }

/**
     * 获取分块数
     * @param fileLength
     * @param partSize
     * @return
     */
    private static int calPartCount(long fileLength,long partSize) {
        int partCount = (int) (fileLength / partSize);
        if (fileLength % partSize != 0){
            partCount++;
        }
        return partCount;
    }

public String getLocalFilePath() {
        return localFilePath;
    }

public void setLocalFilePath(String localFilePath) {
        this.localFilePath = localFilePath;
    }

public String getBucketName() {
        return bucketName;
    }

public void setBucketName(String bucketName) {
        this.bucketName = bucketName;
    }

public String getKey() {
        return key;
    }

public void setKey(String key) {
        this.key = key;
    }
}

此处引用了多个类 
cloudStorage.basis.Constant;//定义程序中使用的变量 
cloudStorage.basis.Global;//定义了全局的静态值,错误状态值 
cloudStorage.basis.OSSClientFactory;//OSSClient工厂 
cloudStorage.oss.download.DownloadPartObj;//分块下载线程类封装 
cloudStorage.oss.download.DownloadPartThread;//分块下载线程 
cloudStorage.util.ObjectSerializableUtil;//序列工具类

调下载方法是阻塞式,需要给调用者返回下结果,所以使用Callable和Future返回int型状态值,下面是子线程池pool中的分块下载线程DownloadPartThread的代码清单

DownloadPartThread 代码:

package cloudStorage.oss.download;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.Date;
import java.util.concurrent.Callable;
import org.apache.log4j.Logger;
import cloudStorage.basis.OSSClientFactory;
import com.aliyun.oss.common.utils.IOUtils;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;

/**
 * @Description: 用于上传每个part的线程类 可序列化 用于上传的断点续传
 * @author: zrk
 * @time: 2015年4月1日 上午10:35:34
 */
public class DownloadPartThread implements Callable<DownloadPartThread>,Serializable {

private static final long serialVersionUID = 1L;
    public static final Logger LOGGER = Logger.getLogger(DownloadPartThread.class);
    // 当前线程的下载开始位置
    private long startPos;

// 当前线程的下载结束位置
    private long endPos;

// 保存文件路径
    private String localFilePath;

private String bucketName;
    private String fileKey;
    private String eTag;
    private String accessId;
    private String accessKey;

public DownloadPartThread(long startPos, long endPos, String localFilePath,
            String bucketName, String fileKey, String accessId,
            String accessKey) {
        this.startPos = startPos;
        this.endPos = endPos;
        this.localFilePath = localFilePath;
        this.bucketName = bucketName;
        this.fileKey = fileKey;
        this.accessId = accessId;
        this.accessKey = accessKey;
    }

@Override
    public DownloadPartThread call() {
        RandomAccessFile file = null;
        OSSObject ossObject = null;
        try {
            File pFile = new File(localFilePath);
            if(!pFile.getParentFile().exists())
                pFile.getParentFile().mkdirs();
            file = new RandomAccessFile(localFilePath, "rw");
            //调用ossapi
            GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, fileKey);
            getObjectRequest.setRange(startPos, endPos);
            ossObject = OSSClientFactory.getInstance().getObject(getObjectRequest);
            file.seek(startPos);
            int bufSize = 1024;
            byte[] buffer = new byte[bufSize];
            int bytesRead;
            while ((bytesRead = ossObject.getObjectContent().read(buffer)) > -1) {
                file.write(buffer, 0, bytesRead);
                //更新开始位置,保证在出错后重下载是从上次结束的地方开始下,而不是下载整个块
                startPos += bytesRead; 
            }
            this.eTag = ossObject.getObjectMetadata().getETag();
        } catch (Exception e) {
            LOGGER.info("=="+e.getMessage());
        } finally{
            if(ossObject!=null)IOUtils.safeClose(ossObject.getObjectContent());
            try {if(file!=null)file.close();} catch (IOException e) {e.printStackTrace();}
            return this;
        }

}
}

每个DownloadPartThread下载线程执行结束都会将自己作为返回值返回,当前文件是否下载完整有每个线程的返回值决定。

ObjectSerializableUtil 代码 点击查看

DownloadPartObj 代码:

package cloudStorage.oss.download;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * @Description: 单个文件的下载线程集合
 * @author: zrk
 * @time: 2015年5月5日 上午10:15:11
 */
public class DownloadPartObj implements Serializable{
    private static final long serialVersionUID = 1L;
    /**
     * 下载线程集合
     */
    List<DownloadPartThread> downloadPartThreads = Collections.synchronizedList(new ArrayList<DownloadPartThread>());
    /**
     * 下载结果
     */
    boolean result = true;

public List<DownloadPartThread> getDownloadPartThreads() {
        return downloadPartThreads;
    }
    public void setDownloadPartThreads(List<DownloadPartThread> downloadPartThreads) {
        this.downloadPartThreads = downloadPartThreads;
    }
    public boolean isResult() {
        return result;
    }
    public void setResult(boolean result) {
        this.result = result;
    }

}

所有下载当前文件的线程都会操作downloadPartThreads,所以downloadPartThreads使用集合Collections.synchronizedList将其转换为一个线程安全的类。DownloadPartObj封装了downloadPartThreads和一个用于标识下载成功与失败的boolean值,定时保存序列化文件就直接序列化DownloadPartObj,DownloadPartObj的线程安全是至关重要的。

OSSClientFactory 代码:

package cloudStorage.basis;
import com.aliyun.oss.OSSClient;
public class OSSClientFactory {
    private static OSSClient ossClient = null;
    private OSSClientFactory() {
    }
    public static OSSClient getInstance() {
        if (ossClient == null) {
            // 可以使用ClientConfiguration对象设置代理服务器、最大重试次数等参数。
            // ClientConfiguration config = new ClientConfiguration();
            ossClient = new OSSClient(Constant.OSS_ENDPOINT,Constant.ACCESS_ID, Constant.ACCESS_KEY);
        }
        return ossClient;
    }
}

Constant 代码:

package cloudStorage.basis;

import cloudStorage.service.OSSConfigService;

/**
 * @Description: 
 * @author: zrk
 * @time: 2015年4月1日 下午5:22:28
 */
public class Constant {
    public static String OSS_ENDPOINT = "http://oss.aliyuncs.com/";
    public static String ACCESS_ID;
    public static String ACCESS_KEY;
    public static Integer DOWNLOAD_PART_SIZE ; // 每个下载Part的大小
    public static Integer UPLOAD_PART_SIZE ; // 每个上传Part的大小
    public static int CONCURRENT_FILE_NUMBER ; // 并发文件数。
    public static int SINGLE_FILE_CONCURRENT_THREADS ; // 单文件并发线程数。
    public static int RETRY ;//失败重试次数
    public static int SERIALIZATION_TIME;//断点保存时间间隔(秒)
    //。。。
}

Constant中数值是加载外部配置文件,也可在这儿直接配置,个别参数值断点上传时使用,只看下载的话请忽略。

Global代码:

package cloudStorage.basis;
/**
 * @Description: TODO
 * @author: zrk
 * @time: 2015年4月1日 下午5:22:46
 */
public class Global {
    public static final int SUCCESS = 1;
    public static final int ERROR = 10;
    public static final int FILE_NOT_FOUND_ERROR = 11;
    public static final int THREAD_ERROR = 12;
    public static final int NETWORK_ERROR = 13;
    public static final int OSS_SUBMIT_ERROR = 14;
//  META
    public static final String X_OSS_META_MY_MD5 = "x-oss-meta-my-md5";
}

调用下载方式:

实例化OSSDownloadFile后调用downloadFile方法: 
return new OSSDownloadFile(localFilePath,bucketName, key).downloadFile();

OSS SDK文件的分块下载支持的很好,OSS官方的SDK里面也提供了一个多线程下载功能的实现,所以在实现文件的分块下载并没有什么难度。这块儿多线程下载仅供大家参考,后面自己在使用过程中继续优化。基于OSS-SDK的分块断点上传请参考OSS实现多文件多线程的断点上传(java) 。

OSS实现多文件多线程的断点下载(java)相关推荐

  1. 高并发多线程分片断点下载

    基于Java的高并发多线程分片断点下载 首先直接看测试情况: 单线程下载72MB文件 7线程并发分片下载72MB文件: 下载效率提高2-3倍,当然以上测试结果还和设备CPU核心数.网络带宽息息相关. ...

  2. SpringBoot整合阿里云OSS,支持文件上传、下载、删除、加签等操作

    首先附上OSS基本介绍和官方文档链接:https://help.aliyun.com/product/31815.html?spm=ata.21736010.0.0.25d67536bR4cly 另外 ...

  3. java从url下载文件_从URL下载Java文件

    java从url下载文件 Today we will learn how to download a file from URL in java. We can use java.net.URL op ...

  4. android实现多任务多线程支持断点下载的下载软件

    运行效果图: 多任务多线程下载并不麻烦,只要思路清晰,逻辑清晰正确,是很好实现的.我最后遇到的纠结问题是数据库的操作上,我是拿数据库来存储下载信息的,所以在数据库的关闭上遇到了麻烦.上面那个版本是建立 ...

  5. 多线程多任务断点下载

    简单的说,只要利用了HTTP协议(http://www.ietf.org/rfc/rfc2616.txt)中的如下字段来和服务器端交互,就可以实现文件下载的断点续传: Range : 用于客户端到服务 ...

  6. 阿里云oss服务操作文件,上传下载生成链接删除通知等

    目录 oss关键字解读 pom.xml引入jar包 第1,后台提供上传文件的接口,把文件上传到oss服务器

  7. Java使用HttpUrlConnection实现多线程断点下载

    相信很多同学在面试的时候,经常会被面试官问到这么一个问题:请问如何实现断点下载,即在文件未下载完成时,保存进度,在下次继续下载.要实现这个功能其实并不难,只要使用一个临时文件记录当前的下载进度,然后在 ...

  8. 快速下载助手1.1--添加断点下载

    在上一章中实现了多线程的断点下载,将快速下载助手添加断点下载功能,明天实现了速率统计功能 效果图如下: 打印信息如下: 欢迎使用快速下载助手-->并不是线程多就下载的快! 文件夹已经存在 默认的 ...

  9. 多线程断点下载开发总结(二)- 多线程写文件

    2019独角兽企业重金招聘Python工程师标准>>> 上篇文章提到了向服务器请求部分数据,已达到多线程下载的目的. 这里我们看看如何实现多线程写入文件.先看示例代码: String ...

  10. Python编程:多线程断点下载文件

    一.前言 大多数网站为了服务器宽带均衡使用,会限制单个连接的传输速度.有时需要下载几百MB大小文件,但下载速度只有100~200KB/S的时候,可以采用python开多线程流式下载该文件以加快下载速度 ...

最新文章

  1. python交互式绘图库_一个交互式可视化Python库——Bokeh
  2. python二级考time库吗_学python第十七节:time库的学习
  3. JVM命令查看与设置参数
  4. echarts折线图相关
  5. java android aes加密解密_AES加密解密在JAVA和ANDROID下互通
  6. SharePoint 2013 图文开发系列之自定义字段
  7. 为什么有时候NSData转换成NSString的时候返回nil
  8. zend studio一些常用配置
  9. Linux Matlab服务器进一步改造成Application Server(应用程序服务器)
  10. Shell脚本学习-阶段一
  11. Swift 拷贝文件夹,实现文件夹内容整体复制
  12. hdu1175连连看
  13. Unity Editor 基础篇(三):自定义窗口
  14. 报错Check constraint “book_chk_1“ is violated。难道MySQL中insert 语句只能一条一条插入?
  15. Unity 异常记录日志功能
  16. 流程设计建模方法:流程的需求梳理之活动级别梳理
  17. 《高通QCS8250》Debug XBL开机启动、进入fastboot花屏问题
  18. Cocos Creator 编辑器扩展:Quick Finder
  19. 创建一个简单的springboot项目demo
  20. 贪心算法——国王游戏

热门文章

  1. DOS命令查看局域网所有IP
  2. 2018上半年掘金微信群日报优质文章合集:前端篇
  3. Eucalyptus云服务平台
  4. 拖动滑块验证 php,基于JS组件实现拖动滑块验证功能
  5. ES RestClient查询文档
  6. 【好书推荐】-你的灯亮着吗?
  7. 阿里巴巴Java开发手册详细版本
  8. USB协议学习笔记 - 引言
  9. 计算机中文无敌版,与电脑下象棋无敌版
  10. 2022好用的手机库存管理软件排行榜前十名 手机库存管理app