http://www.importnew.com/10712.html

本文由 ImportNew - 踏雁寻花 翻译自 javacodegeeks。欢迎加入翻译小组。转载请见文末要求。我最近要处理一套存储历史实时数据的大文件fx market data,我很快便意识到,使用传统的InputStream不能够将它们读取到内存,因为每一个文件都超过了4G。甚至编辑器都不能够打开这些文件。在这种特殊情况下,我可以写一个简单的bash脚本将这些文件分成更小的文件块,然后再读取它。但是我不想这样做,因为二进制格式会使这个方法失效。处理这个问题的方式通常就是使用内存映射文件递增地处理区域的数据。关于内存映射文件的一个好处就是它们不会使用虚拟内存和换页空间,因为它们是从磁盘上的文件返回来的数据。很好,让我们来看一看这些文件和额外的一些数据。似乎它们使用逗号分隔的字段包含ASCII文本行。格式:[currency-pair],[timestamp],[bid-price],[ask-price]例子:EUR/USD,20120102 00:01:30.420,1.29451,1.2949我可以为这种格式去写一个程序,但是,读取文件和解析文件是无关的概念。让我们退一步来想一个通用的设计,当在将来面临相似的问题时这个设计可以被重复利用。这个问题可以归结为递增地解码一个已经在无限长的数组中被编码的记录,并且没有耗尽内存。实际上,以逗号分割的示例格式编码与通常的解决方案是不相关的。所以,很明显需要一个解码器来处理不同的格式。再来看,知道整个文件处理完成,每一条记录都不能被解析并保存在内存中,所以我们需要一种方式来转移记录,在它们成为垃圾被回收之前可以被写到其他地方,例如磁盘或者网络。迭代器是处理这个需求的很好的抽象,因为它们就像游标一样,可以正确的指向某个位置。每一次迭代都可以转发文件指针,并且可以让我们使用数据做其他的事情。首先来写一个Decoder 接口,递增地把对象从MappedByteBuffer中解码,如果buffer中没有对象,则返回null。
public interface Decoder<T> {public T decode(ByteBuffer buffer);
}然后让FileReader 实现Iterable接口。每一个迭代器将会处理下一个4096字节的数据,并使用Decoder把它们解码成一个对象的List集合。注意,FileReader 接收文件(files)的list对象,这样是很好的,因为它可以遍历数据,并且不需要考虑聚合的问题。顺便说一下,4096个字节块对于大文件来说是非常小的。
public class FileReader implements Iterable<List<T>> {private static final long CHUNK_SIZE = 4096;private final Decoder<T> decoder;private Iterator<File> files;private FileReader(Decoder<T> decoder, File... files) {this(decoder, Arrays.asList(files));}private FileReader(Decoder<T> decoder, List<File> files) {this.files = files.iterator();this.decoder = decoder;}public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {return new FileReader<T>(decoder, files);}public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {return new FileReader<T>(decoder, files);}@Overridepublic Iterator<List<T>> iterator() {return new Iterator<List<T>>() {private List<T> entries;private long chunkPos = 0;private MappedByteBuffer buffer;private FileChannel channel;@Overridepublic boolean hasNext() {if (buffer == null || !buffer.hasRemaining()) {buffer = nextBuffer(chunkPos);if (buffer == null) {return false;}}T result = null;while ((result = decoder.decode(buffer)) != null) {if (entries == null) {entries = new ArrayList<T>();}entries.add(result);}// set next MappedByteBuffer chunkchunkPos += buffer.position();buffer = null;if (entries != null) {return true;} else {Closeables.closeQuietly(channel);return false;}}private MappedByteBuffer nextBuffer(long position) {try {if (channel == null || channel.size() == position) {if (channel != null) {Closeables.closeQuietly(channel);channel = null;}if (files.hasNext()) {File file = files.next();channel = new RandomAccessFile(file, "r").getChannel();chunkPos = 0;position = 0;} else {return null;}}long chunkSize = CHUNK_SIZE;if (channel.size() - position < chunkSize) {chunkSize = channel.size() - position;}return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);} catch (IOException e) {Closeables.closeQuietly(channel);throw new RuntimeException(e);}}@Overridepublic List<T> next() {List<T> res = entries;entries = null;return res;}@Overridepublic void remove() {throw new UnsupportedOperationException();}};}
}下一个任务就是写一个Decoder 。针对逗号分隔的任何文本格式,编写一个TextRowDecoder 类。接收的参数是每行字段的数量和一个字段分隔符,返回byte的二维数组。TextRowDecoder 可以被操作不同字符集的特定格式解码器重复利用。
public class TextRowDecoder implements Decoder<byte[][]> {private static final byte LF = 10;private final int numFields;private final byte delimiter;public TextRowDecoder(int numFields, byte delimiter) {this.numFields = numFields;this.delimiter = delimiter;}@Overridepublic byte[][] decode(ByteBuffer buffer) {int lineStartPos = buffer.position();int limit = buffer.limit();while (buffer.hasRemaining()) {byte b = buffer.get();if (b == LF) { // reached line feed so parse lineint lineEndPos = buffer.position();// set positions for one row duplicationif (buffer.limit() < lineEndPos + 1) {buffer.position(lineStartPos).limit(lineEndPos);} else {buffer.position(lineStartPos).limit(lineEndPos + 1);}byte[][] entry = parseRow(buffer.duplicate());if (entry != null) {// reset main buffer
          buffer.position(lineEndPos);buffer.limit(limit);// set start after LFlineStartPos = lineEndPos;}return entry;}}buffer.position(lineStartPos);return null;}public byte[][] parseRow(ByteBuffer buffer) {int fieldStartPos = buffer.position();int fieldEndPos = 0;int fieldNumber = 0;byte[][] fields = new byte[numFields][];while (buffer.hasRemaining()) {byte b = buffer.get();if (b == delimiter || b == LF) {fieldEndPos = buffer.position();// save limitint limit = buffer.limit();// set positions for one row duplication
        buffer.position(fieldStartPos).limit(fieldEndPos);fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);fieldNumber++;// reset main buffer
        buffer.position(fieldEndPos);buffer.limit(limit);// set start after LFfieldStartPos = fieldEndPos;}if (fieldNumber == numFields) {return fields;}}return null;}private byte[] parseField(ByteBuffer buffer, int pos, int length) {byte[] field = new byte[length];for (int i = 0; i < field.length; i++) {field[i] = buffer.get();}return field;}
}这是文件被处理的过程。每一个List包含的元素都从一个单独的buffer中解码,每一个元素都是被TextRowDecoder定义的byte二维数组。
TextRowDecoder decoder = new TextRowDecoder(4, comma);
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {// do something with each chunk
}我们可以在这里打住,不过还有额外的需求。每一行都包含一个时间戳,每一批都必须分组,使用时间段来代替buffers,如按照天分组、或者按照小时分组。我还想要遍历每一批的数据,因此,第一反应就是,为FileReader创建一个Iterable包装器,实现它的行为。一个额外的细节,每一个元素必须通过实现Timestamped接口(这里没有显示)提供时间戳到PeriodEntries。

public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {private final Iterator<List<T extends Timestamped>> entriesIt;private final long interval;private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {this.entriesIt = entriesIt.iterator();this.interval = interval;}public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {return new PeriodEntries<T>(entriesIt, interval);}@Overridepublic Iterator<List<T extends Timestamped>> iterator() {return new Iterator<List<T>>() {private Queue<List<T>> queue = new LinkedList<List<T>>();private long previous;private Iterator<T> entryIt;@Overridepublic boolean hasNext() {if (!advanceEntries()) {return false;}T entry =  entryIt.next();long time = normalizeInterval(entry);if (previous == 0) {previous = time;}if (queue.peek() == null) {List<T> group = new ArrayList<T>();queue.add(group);}while (previous == time) {queue.peek().add(entry);if (!advanceEntries()) {break;}entry = entryIt.next();time = normalizeInterval(entry);}previous = time;List<T> result = queue.peek();if (result == null || result.isEmpty()) {return false;}return true;}private boolean advanceEntries() {// if there are no rows leftif (entryIt == null || !entryIt.hasNext()) {// try get more rows if possibleif (entriesIt.hasNext()) {entryIt = entriesIt.next().iterator();return true;} else {// no more rowsreturn false;}}return true;}private long normalizeInterval(Timestamped entry) {long time = entry.getTime();int utcOffset = TimeZone.getDefault().getOffset(time);long utcTime = time + utcOffset;long elapsed = utcTime % interval;return time - elapsed;}@Overridepublic List<T> next() {return queue.poll();}@Overridepublic void remove() {throw new UnsupportedOperationException();}};}
}最后的处理代码通过引入这个函数并无太大变动,只有一个干净的且紧密的循环,不必关心文件、缓冲区、时间周期的分组元素。PeriodEntries也是足够的灵活管理任何时长的时间。

TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);for (List<TrueFxData> entries : periods) {// data for each dayfor (TrueFxData entry : entries) {// process each entry
   }
}你也许意识到了,使用集合不可能解决这样的问题;选择迭代器是一个关键的设计决策,能够解析兆字节的数组,且不会消耗过多的空间。

转载于:https://www.cnblogs.com/wh-king/p/4324846.html

使用Java处理大文件相关推荐

  1. java 读取txt,java读取大文件

    java 读取txt,java读取大文件 package com.bbcmart.util; import java.io.File; import java.io.RandomAccessFile; ...

  2. Java 高效大文件 读取 和 写入(一亿行)

    写文件 需求:写入1亿行,7位以内的随机的数字. 首先看成果图,代表没骗大家!!!!! 这个是最终生成的文件,有770多MB .下面用glogg打开预览: 程序打印耗时 7149ms + 923 ms ...

  3. java实现大文件分片上传

    java实现大文件分片上传 在项目中用到了大文件上传功能,最初从网上参考了一些代码来实现,但是最终的上传效果不是很好,速度比较慢. 之前的上传思路是: 前端利用webUploader分片大文件 后端接 ...

  4. java写大文件_java实现超大文件的读写功能

    对于几百M或上G的大文件可使用java nio进行读写 , 根据个人的需求 可能需要将一个超大文件读写形成很多较小的文件进行分析,这也不是什么难事,在读完一个缓冲区后 更换写入的对象即可,本文就不做详 ...

  5. java 大文件 处理_用Java处理大文件

    java 大文件 处理 我最近不得不处理一组包含历史逐笔交易的外汇市场数据的文件,并很快意识到使用传统的InputStream都无法将它们读取到内存中,因为每个文件的大小都超过4 GB. Emacs甚 ...

  6. 用Java处理大文件

    最近,我不得不处理一组包含逐笔历史汇率市场数据的文件,并很快意识到使用传统的InputStream都无法将它们读取到内存中,因为每个文件的大小都超过4 GB. Emacs甚至无法打开它们. 在这种特殊 ...

  7. java io大文件_JavaIO流对大文件进行分割与合并

    对于大文件传输不方便时候可以试一下分割之后再操作: package com.lym; import java.io.BufferedInputStream; import java.io.Buffer ...

  8. java 读取大文件内容_java读取大文件

    java一般读取文件时,将文件文内容全部加在到内存,然后读取,但是这种读取方式很明显不适合读取大文件,在进行大文件处理时,考虑到内存有限,采用分次读取的方式. java分次读取文件内容有三种方式, 1 ...

  9. java 拷贝大文件_java高效实现大文件拷贝功能

    在java中,FileChannel类中有一些优化方法可以提高传输的效率,其中transferTo( )和 transferFrom( )方法允许将一个通道交叉连接到另一个通道,而不需要通过一个缓冲区 ...

  10. java实现大文件分片上传功能(前后端都有,代码down下来配置完后可以直接运行)

    问题 项目解决的问题主要是java实现分片上传功能,问题描述: 楼主在公司最近项目中使用multipart文件上传视频文件到服务器上,然后用fastdfs保存到数据库中.发现当上传的视频文件太大的时候 ...

最新文章

  1. mysql 事务_MySQL事务
  2. 平衡不完全区组设计 数据分析的SAS实践
  3. Linux命令——cp
  4. Qt学习之路(52): 拖放技术之一
  5. 未定义标识符 stringc/c++(20)_20款丰田酷路泽5700绝版现车最后促销
  6. 《数据库原理与应用》(第三版)第11章 存储过程和触发器 基础 习题参考答案
  7. CSS3 box-reflect 属性
  8. Axios实现异步通信
  9. 在Linux下开发多语言软件: Hello GetText!
  10. 深度学习《patchGAN》
  11. postgresql 相关杂记
  12. sqlserver 存储过程 分页搜索查询
  13. 新冠最新研究进展(2021年11月)
  14. 微软ime日文输入法在假名输入模式下怎么快速输入英文
  15. mysql优化-Explain工具介绍
  16. 英语的计算机软件如何拼写,软的英文单词
  17. 临床血液学综合练习题库【2】
  18. dos命令之 assoc 用法详解
  19. 企业什么喜欢做电视看板,电视看板浏览网页的必备工具 电视看板浏览器 电视看板自动打开网页
  20. mySQL 教程 第7章 存储过程和函数

热门文章

  1. 乐在其中设计模式(C#) - 命令模式(Command Pattern)
  2. 也谈WEB打印(四):让我们的模板支持打印,并根据内容动态的生成页面
  3. fpga烧写bin文件_FPGA烧写程序
  4. encoder decoder模型_3分钟|聊一聊 Decoder 模块
  5. Vue之非单文件组件介绍
  6. RocketMQ的一些基本概念和RocketMQ特性的讲解
  7. thinkphp php 5.2,ThinkPHP5.2:时间查询(改进、优化)
  8. CentOS7.5搭建k8s集群
  9. 原始的DSH深度哈希代码
  10. java中两短行代码合并一行_帮忙啊!!!!找出两个Java文件相似程度超过某一%的代码行。...