2019独角兽企业重金招聘Python工程师标准>>>

如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。多线程中的同步、HttpURLConnection的超时阻塞等因素都会使代码看起来异常复杂。

      问题模型 数组buf[],N个“写”线程从文件顺序读入固定长度数据写入buf[];一个“读”线程按照文件内容顺序从缓冲区读取,一次读任意长度。一个写线程发生错误或文件读完,这个读线程和所有写线程退出。

针对上述问题,简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;每次读小于32K的任意字节。线程同步:各个写线程互斥等待空闲块,用信号量机制分配空闲块;各写线程并发填写buf[];读线程和各写线程并发使用buf[]。

为突出主题略去了一些和本文无关的代码。

一、HttpReader类功能:HTTP协议从指定URL读取数据。

package instream;import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;public final class HttpReader {public static final int MAX_RETRY = 10;private URL url;private HttpURLConnection httpConnection;private InputStream in_stream;private long cur_pos;         //决定seek方法中是否执行文件读取定位private int connect_timeout;private int read_timeout;public HttpReader(URL u) {this(u, 5000, 5000);}public HttpReader(URL u, int connect_timeout, int read_timeout) {this.connect_timeout = connect_timeout;this.read_timeout = read_timeout;url = u;}public int read(byte[] b, int off, int len) throws IOException {int r = in_stream.read(b, off, len);cur_pos += r;return r;}/** 抛出异常通知重试.* 例如响应码503可能是由某种暂时的原因引起的,同一IP频繁的连接请求会遭服务器拒绝.*/public void seek(long start_pos) throws IOException {if (start_pos == cur_pos && in_stream != null)return;if (httpConnection != null) {httpConnection.disconnect();httpConnection = null;}if (in_stream != null) {in_stream.close();in_stream = null;}httpConnection = (HttpURLConnection) url.openConnection();httpConnection.setConnectTimeout(connect_timeout);httpConnection.setReadTimeout(read_timeout);String sProperty = "bytes=" + start_pos + "-";httpConnection.setRequestProperty("Range", sProperty);//httpConnection.setRequestProperty("Connection", "Keep-Alive");int responseCode = httpConnection.getResponseCode();if (responseCode < 200 || responseCode >= 300) {try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}throw new IOException("HTTP responseCode="+responseCode);}in_stream = httpConnection.getInputStream();cur_pos = start_pos;}
}

二、IWriterCallBack接口

package instream;/** 读/写通信接口.类似于C++的回调函数* * 例:* class BuffRandAcceURL 内实现本接口的方法tryWriting()* class BuffRandAcceURL 内new Writer(this, ...)传值到Writer.icb* class Writer 内调用icb.tryWriting()*/
public interface IWriterCallBack {public int tryWriting() throws InterruptedException;public void updateBuffer(int i, int len);public void updateWriterCount();public int getWriterCount();public void terminateWriters();
}

三、Writer类:下载线程,负责向buf[]写数据。

package instream;
import java.net.URL;public final class Writer implements Runnable {private static boolean isalive = true;  // 一个线程超时其它线程也能退出private static byte[] buf;private static IWriterCallBack icb;private HttpReader hr;public Writer(IWriterCallBack cb, URL u, byte[] b, int i) {hr = new HttpReader(u);icb = cb;buf = b;Thread t = new Thread(this,"dt_"+i);t.setPriority(Thread.NORM_PRIORITY + 1);t.start();}public void run() {int wbytes=0, wpos=0, rema = 0, retry = 0;int idxmask = BuffRandAcceURL.UNIT_COUNT - 1;boolean cont = true;int index = 0;       //buf[]内"块"索引号int startpos = 0;  //index对应的文件位置(相对于文件首的偏移量)long time0 = 0;while (cont) {try {// 1.等待空闲块if(retry == 0) {if ((startpos = icb.tryWriting()) == -1)break;index = (startpos >> BuffRandAcceURL.UNIT_LENGTH_BITS) & idxmask;wpos = startpos & BuffRandAcceURL.BUF_LENGTH_MASK;wbytes = 0;rema = BuffRandAcceURL.UNIT_LENGTH;time0 = System.currentTimeMillis();}// 2.定位hr.seek(startpos);// 3.下载"一块"int w;while (rema > 0 && isalive) {w = (rema < 2048) ? rema : 2048; //每次读几K合适?if ((w = hr.read(buf, wpos, w)) == -1) {cont = false;break;}rema -= w;wpos += w;startpos += w;    // 能断点续传wbytes += w;}// 下载速度足够快就结束本线程long t = System.currentTimeMillis() - time0;if(icb.getWriterCount() > 1 && t < 500)cont = false;//4.通知"读"线程retry = 0;icb.updateBuffer(index, wbytes);} catch (Exception e) {if(++retry == HttpReader.MAX_RETRY) {isalive = false;icb.terminateWriters();break;}}}icb.updateWriterCount();try {hr.close();} catch (Exception e) {}hr = null;}
}

四、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。关键是如何简单有效防止死锁?

package instream;
import java.net.URL;public final class BuffRandAcceURL implements IWriterCallBack {public static final int UNIT_LENGTH_BITS = 15;public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; //2^16=32Kpublic static final int BUF_LENGTH = UNIT_LENGTH << 4;public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; //16块public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);private static final int MAX_WRITER = 5;private static long file_pointer;private static int read_pos;private static int fill_bytes;private static byte[] buf;           //同时作写线程同步锁private static int[] unit_bytes; //同时作读线程互斥锁private static int alloc_pos;private static URL url;private static boolean isalive = true;private static int writer_count;private static long file_length;private static long frame_bytes;private static int free_unit = UNIT_COUNT;   // "信号量"计数器public BuffRandAcceURL(String sURL) throws Exception {this(sURL,MAX_WRITER);}public BuffRandAcceURL(String sURL, int download_threads) throws Exception {buf = new byte[BUF_LENGTH];unit_bytes = new int[UNIT_COUNT];url = new URL(sURL);// 创建"写"线程writer_count = download_threads;for (int i = 0; i < download_threads; i++) {new Writer(this, url, buf, i + 1);Thread.sleep(200);}try_cache();}/** 缓冲*/private void try_cache() throws InterruptedException {int cache_size = BUF_LENGTH;int bi = unit_bytes[read_pos >> UNIT_LENGTH_BITS];if(bi != 0)cache_size -= UNIT_LENGTH - bi;while (fill_bytes < cache_size) {if (writer_count == 0 || isalive == false)return;synchronized (unit_bytes) {unit_bytes.wait(200);    //wait(200)错过通知也可结束循环?}}}private int try_reading(int i, int len) throws Exception {int n = (i + 1) & (UNIT_COUNT - 1);int r = (unit_bytes[i] > 0) ? (unit_bytes[i] + unit_bytes[n]) : unit_bytes[i];if (r < len) {if (writer_count == 0 || isalive == false)return r;try_cache();}return len;}/** 各个"写"线程互斥等待空闲块. 空闲块按由小到大的顺序分配.*/public int tryWriting() throws InterruptedException {int ret = -1;synchronized (buf) {while (free_unit == 0 && isalive)buf.wait();if(alloc_pos >= file_length || isalive == false)return -1;ret = alloc_pos;alloc_pos += UNIT_LENGTH;free_unit--;}return ret;}/** "写"线程向buf[]写完数据后调用,通知"读"线程*/public void updateBuffer(int i, int len) {synchronized (unit_bytes) {unit_bytes[i] = len;fill_bytes += len;unit_bytes.notify();}}/** "写"线程准备退出时调用*/public void updateWriterCount() {synchronized (unit_bytes) {writer_count--;unit_bytes.notify();}}public int getWriterCount() {return writer_count;}/** read方法内调用*/public void notifyWriter() {synchronized (buf) {buf.notifyAll();}}/** 被某个"写"线程调用,用于终止其它"写"线程;isalive也影响"读"线程流程*/public void terminateWriters() {synchronized (unit_bytes) { if (isalive) {isalive = false;System.out.println("\n读取文件超时。重试 " + HttpReader.MAX_RETRY+ " 次后放弃,请您稍后再试。");}unit_bytes.notify();}notifyWriter();}public int read(byte[] b, int off, int len) throws Exception {int i = read_pos >> UNIT_LENGTH_BITS;// 1.等待有足够内容可读if(try_reading(i, len) < len || isalive == false)return -1;// 2.读取int tail = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTHif (tail < len) {System.arraycopy(buf, read_pos, b, off, tail);System.arraycopy(buf, 0, b, off + tail, len - tail);} elseSystem.arraycopy(buf, read_pos, b, off, len);fill_bytes -= len;file_pointer += len;read_pos += len;read_pos &= BUF_LENGTH_MASK;unit_bytes[i] -= len;if (unit_bytes[i] < 0) {int ni = read_pos >> UNIT_LENGTH_BITS;unit_bytes[ni] += unit_bytes[i];unit_bytes[i] = 0;free_unit++;notifyWriter();} else if (unit_bytes[i] == 0) {free_unit++;    // 空闲块"信号量"计数加1notifyWriter();    // 3.通知}// 如果下一块未填满,意味着文件读完,第1步已处理一次读空两块的情况return len;}}

本文是JAVA开源项目jmp123源代码的一部分,单独成文旨在与朋友们交流文中提出的问题模型的解决方法。

【jmp123下载地址】http://jmp123.sourceforge.net/

转载于:https://my.oschina.net/darkness/blog/363516

JAVA实现环形缓冲多线程读取远程文件相关推荐

  1. java 读取 远程文件_利用JAVA获取远程文件及使用断点续传 供学习者使用

    闲来没事,就做做,程序还是要多写才好@ 原理不说,文件方面及I/O方面,去BAIDU一下就知道,断点续传的原理也很简单,就是在原来已经下载的基础之上继续下载就可以了,用到了这么关键的一句:urlc.s ...

  2. java 读取远程文件并让浏览器下载

    java 读取远程文件并让浏览器下载 @RequestMapping("/downLoadFile")@ResponseBodypublic ResponseEntity<b ...

  3. php输出远程文件边读边下载,php file_get_contents读取远程文件并输出

    php file_get_contents读取远程文件并输出,这样做的好得就是不会在flash播放代码中显示对方网站的文件地址,下面是我自己用的一个.是读取方视频文件用的. require_once( ...

  4. php远程读取几行文件,PHP读取远程文件的三种方法

    PHP读取远程文件的三种方法 (2008-08-01 14:29:55) 标签: php 下载远程文件 it HP读取远程文件的几种方法,做采集的第一步就是读取远程文件- 1.file_get_con ...

  5. 多线程读取大文件,尤其是对日志文件分析很有用。

    我在之前的公司里工作的时候,他们要求我做一个能够分析IIS日志的程序,可我做来做去,也只能做到,1个G的文件读取在140秒左右.愁了很久,想到了用多线程读取大文件的方法,又发现文件读取流可以有很多个, ...

  6. Java的marven工程读取resources文件,并打可执行的jar

    Java的maven工程读取resources文件,并打可执行的jar 文章目录 Java的maven工程读取resources文件,并打可执行的jar 如何读取resources文件 maven如何 ...

  7. 怎么使用远程php文件的函数调用函数,一个读取远程文件的PHP函数

    一个读取远程文件的函数,非常好用 function urlfopen($url, $limit = 0, $post = '', $cookie = '', $bysocket = FALSE  , ...

  8. Java多线程读取excel文件_解决springboot 多线程使用MultipartFile读取excel文件内容报错问题...

    springboot项目开启多线程 启动类加注解开启 @EnableAsync,实现类方法加注解 @Async 前端页面 报错信息 java.io.FileNotFoundException: C:\ ...

  9. java nio读取远程文件_操作分布式文件之七:如何并行读写远程文件

    FttpAdapter是通过FttpReadAdapter的tryReadAll方法进行并行读 FttpAdapter fa = new FttpAdapter("fttp://10.232 ...

最新文章

  1. 2021年大数据Spark(二十三):SparkSQL 概述
  2. [JavaScript] Set类型在JavaScript中的使用
  3. 实现Android和PC之间的蓝牙通信
  4. 雅虎复兴无望,梅耶尔或离职
  5. vue实现HTML转PDF (已解决清晰、页边距、图片跨域导出等问题)
  6. 直接在安装了redis的Linux机器上操作redis数据存储类型--hash类型
  7. 【计算机网络】复习荟萃(四)
  8. (转)Spring4.2.5+Hibernate4.3.11+Struts2.3.24整合开发
  9. python根据ip获取地理位置_使用python根据ip获取目标地理位置信息
  10. ps2021神经ai滤镜无法使用,ps2021没法用神经元滤镜
  11. Java程序强制删除文件
  12. 丢失数据文件和控制文件的恢复案例(zt)
  13. HDU 1880 魔咒词典(字符串hash)
  14. hyperledger-fabric第一天 Fabric环境搭建
  15. win10系统崩溃怎么修复_手把手教你“无损”修复win10系统
  16. 一文学明白数据库系统--数据库系统期末复习--从入门到入考场--考前抄ppt系列
  17. sUploadDir
  18. 3.8 JS 制作无间断图片循环滚动效果
  19. mysql查询练习题
  20. 硬币面值组合(上台阶)

热门文章

  1. 新手探索NLP(十五)——终章
  2. bootstrap-table页码ALL显示为NAN
  3. 十二、经典问题解析一
  4. 本地复现Zabbix v2.2.x, 3.0.0-3.0.3 jsrpc 参数 profileIdx2 SQL 注入漏洞
  5. 双人五子棋对战(需要EasyX图像库)
  6. 2010年厦门商报报导《监控》小说
  7. 现代浏览器博物馆_云旅游!Tableau 为你揭秘纽约现代艺术博物馆的珍贵馆藏
  8. shell编程1到10求和_重磅|郑州市第四届中小学创意编程暨智能设计大赛初中组真题解析(下)...
  9. ffmpeg 找不到bin_FFmpeg开发笔记(九):ffmpeg解码rtsp流并使用SDL同步播放
  10. html div 垂直对齐,div垂直对齐中间css