运行环境

SDK:openJDK13

内存:200M (通过 VM 参数 -Xmx200M 指定)

目标数据文件:raw.data (1.72G)

基本思路(分治)

  1. 切分:从目标数据文件中读取数据,读取一定数量后对读取到的数据进行排序,并生成临时排序文件,重复此过程,将原始数据文件分割为若干个已排序的数据文件
  2. 合并:根据上一阶段得到的分组文件数量,如果内存不足以一次创建所有文件的指针,则需要进行多次合并。合并时,将若干个临时数据文件合并为更大的数据文件,使用归并排序思想,使用优先级队列辅助。重复此过程直到生成的临时数据文件个数足够少。

实现

定义数据对象

  • 定义数据格式、数据对象与字符串的互转方法,使程序可以从输入流中分离到数据对象。下文代码中,使用 toString() 和 String 参数的构造方法实现
  • 定义排序规则:实现 Comparable<T> 接口

生成乱序数据文件

  • 在数据类中实现一个随机生成方法,在工具类中开几个线程,因为这里定义的数据对象略大,所以本实例中向文件中写入了 5000_0000 个数据对象

分割

  • 规定系统可分配的工作线程数量,因为内存十分有限,如果线程太多,则线程本身就会造成 OOM ,这里规定了 12 个线程。因为过分限制内存,整个过程效率比较底下。
  • 规定分割尺寸,这个值影响每个线程实际执行的任务量、生成的临时文件数量,即分割的粒度。这里指定为 80_0000
  • 多线程执行任务,针对读到的指定数量的数据对象进行排序、写入临时数据文件,因为各线程只能共享一个输入流(使用的 BufferedReader 是线程安全的,无需同步控制),所以可能多线程的优势并不明显,不过排序任务、写文件可并行进行。
  • 本实例中,分割生成了 754 个(从零开始编号)临时数据文件

合并

  • 因为一旦在上一阶段生成了太多临时文件,同时创建太多输入流可能导致 OOM ,可以进行多次合并,本实例中,进行了两次合并。
  • 封装一个类似链表的类,提供 next, hasNext 方法,该类用于从临时数据文件中读取数据对象,并预读下一个读取的对象,优先级队列可以根据它对数据文件进行排序,以保证合并后的数据顺序。
  • 合并时,每个线程根据为自己分配的临时文件数创建一个指定容量的优先级队列,将各个文件打开,进行合并,每个线程生成一个新的临时文件
  • 针对各线程生成的临时文件进行合并,得到最终排序后的文件。

  • 上图即为合并时的临时文件,和得到的最终排序文件(sorted.data)。比原数据文件小了接近 50M ,因为在处理换行符时未统一(生成 raw.data 时使用的换行符是 System.lineSeparator() 在 windows 下为 "\r\n",生成 sorted.data 时使用的是 '\n'),导致每一行缺少一个 '\r' ,数据共 5000_0000 行,所以空间约为 50M

闲扯(可忽略)

近期阅读 openJDK13 源码,觉得十分枯燥,决定写点啥,于是有了上面的东西。

撸代码之前,觉得这个大文件排序不是很难,为了加大难度,决定不搜索,全凭源码中的注释,另外使用线程池,支持需要兼顾数据元对象的拓展,即允许替换排序对象类,使之可以实现针对多种对象的大文件排序

所以,为了达到上述的需求,尝试了很多方法,最后决定使用抽象工厂,而这一方式也带来了一些方便,比如在分割、排序时,使用针对流的链式调用即可实现将读到的字符串数组转为数据对象并进行排序、生成排序后的字符数组并写入临时数据文件。

开始写的时候行云流水,在处理掉明显的错误后,发现最终得到的文件比原数据文件少了几百M,考虑到上文提到的换行符问题,相差的大小还是不对,冒着卡死的风险将排序后的数据文件拖到 Notepad++ 后,发现竟然少了40 万行数据而且顺序还有问题。无奈从头开始,将源数据文件控制到 100 行、200行、1000行并调整线程数、分割粒度等参数,将所有问题处理掉才最终完成。

整个过程,还是过于高估了自己的能力。虽说没有使用搜索,但是耗时远超出了当时的预期,不过,写代码还是比读代码有趣。

最后,放码过来

本代码未进行足够测试,只通过了数据文件为 100~2000 行以及 5000_0000 行的测试,其他测试均未进行

因为使用断言,而且断言影响程序功能,如果需要运行,需要指定虚拟机参数: -ea

Main.java

package work.cxlm;import work.cxlm.helper.BigFileSorter;
import work.cxlm.helper.Element;
import work.cxlm.helper.Generator;import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;public class Main {public static void main(String[] args) { // 通过调整注释以进行不同任务String originFileName = "D:/works/Java/try55_bigsort/raw.data";// generateDataFile(originFileName);// ensureMemorize();bigFileSort(80_0000, originFileName);}private static void bigFileSort(int divideSize, String originFilename) {try {BigFileSorter<Element> sorter = new BigFileSorter<>(originFilename, divideSize, Element::new);sorter.startSort();} catch (FileNotFoundException e) {e.printStackTrace();}}// 一定会 OOM 的方法,仅用于测试限定内存是否生效// 经测试,200M 内存可以存放 1633812 个对象private static void ensureMemorize() {List<Element> eles = new LinkedList<>();int counter = 0;try {for (; ; ) {eles.add(Element.random());counter++;}} catch (OutOfMemoryError error) {error.printStackTrace();eles = null;System.gc();System.out.println("指定的空间耗尽,共创建"+counter+"个对象");}}private static void generateDataFile(String filename) {try {Generator.generate(filename, 1000);} catch (IOException | InterruptedException e) {e.printStackTrace();}}}

BigFileSorter.java

package work.cxlm.helper;import java.io.*;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class BigFileSorter<T extends SortableStringConvertAble> {private static final int WORKER_COUNT = 12;private File originFile;private File targetFile;private final int DIVIDE_SIZE;private ConvertFactory<T> factory;private static final String TEMP_FILENAME_FORMAT = "%s/.temp%04d";private ThreadPoolExecutor pool = new ThreadPoolExecutor(WORKER_COUNT, WORKER_COUNT, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(WORKER_COUNT), runnable -> new Thread(runnable, "排序任务单元"));public BigFileSorter(String originPath, int divideSize, ConvertFactory<T> factory) throws FileNotFoundException {originFile = new File(originPath);if (!originFile.exists()) {throw new FileNotFoundException("文件[" + originPath + "]不存在");}targetFile = new File(originFile.getParent() + "/sorted.data");DIVIDE_SIZE = divideSize;this.factory = factory;}public void startSort() {try {int tempFileCount = divideAndSort();mergeSection(tempFileCount);} catch (Exception e) {e.printStackTrace();}}private void mergeSection(int tempFileCount) throws IOException, InterruptedException {System.out.println("正在进行合并");AtomicInteger fileCounter = new AtomicInteger();int groupSize = (tempFileCount / WORKER_COUNT) + 1;String finalFileFormat = "%s/.temp.%02d";for (int worker = 0; worker < WORKER_COUNT; worker++) {pool.submit(() -> {int thisId = fileCounter.getAndIncrement();int startFileNumber = thisId * groupSize;int endFileNumber = Math.min(tempFileCount, startFileNumber + groupSize);PriorityQueue<TempNodeList> nodeQueue = new PriorityQueue<>(groupSize, Comparator.comparing(a -> a.nowNode.val()));for (int i = startFileNumber; i < endFileNumber; i++) {File nowFile = new File(String.format(TEMP_FILENAME_FORMAT, originFile.getParent(), i));try {nodeQueue.offer(new TempNodeList(nowFile));} catch (IOException e) {e.printStackTrace();}}File threadTarget = new File(String.format(finalFileFormat, originFile.getParent(), thisId));try {assert threadTarget.createNewFile();queueToFile(threadTarget, nodeQueue);} catch (IOException e) {e.printStackTrace();}});}pool.shutdown();System.out.println("等待合并线程结束");while (!pool.awaitTermination(2, TimeUnit.SECONDS)) {System.out.print(".");}PriorityQueue<TempNodeList> nodeQueue = new PriorityQueue<>(WORKER_COUNT, Comparator.comparing(a -> a.nowNode.val()));System.out.println("进行目标文件生成");for (int i = 0; i < WORKER_COUNT; i++) {File nowFile = new File(String.format(finalFileFormat, originFile.getParent(), i));assert nowFile.exists();TempNodeList node = new TempNodeList(nowFile);if (node.nowNode != null)nodeQueue.offer(node);}assert targetFile.exists() || targetFile.createNewFile();queueToFile(targetFile, nodeQueue);System.out.println("完成合并,目标文件:" + targetFile.getName());}private void queueToFile(File file, Queue<TempNodeList> nodeQueue) throws IOException {BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(file));while (!nodeQueue.isEmpty()) {TempNodeList nowNodes = nodeQueue.poll();os.write(nowNodes.now().toString().getBytes());os.write("\n".getBytes());if (nowNodes.hasNext()) {nowNodes.next();nodeQueue.offer(nowNodes);}}os.flush();os.close();}class TempNodeList {BufferedReader reader;T nowNode;String nextStr;private boolean end;TempNodeList(File file) throws IOException {reader = new BufferedReader(new FileReader(file));next();next();}public void next() throws IOException {if (nextStr == null || nextStr.isEmpty()) {nowNode = null;} else {nowNode = factory.create(nextStr);}while ((nextStr = reader.readLine()) != null && nextStr.isEmpty());  // 跳过空行if (nextStr == null) {end = true;}}public T now() {return nowNode;}public boolean hasNext() {return !end;}}private int divideAndSort() throws FileNotFoundException {System.out.println("正在进行分割排序");int threadMissionSize = DIVIDE_SIZE / WORKER_COUNT;BufferedReader reader = new BufferedReader(new FileReader(originFile));AtomicBoolean end = new AtomicBoolean(false);AtomicInteger fileCounter = new AtomicInteger();AtomicInteger debugCounter = new AtomicInteger();for (int i = 0; i < WORKER_COUNT; i++) {pool.submit(() -> {while (!end.get()) {List<String> threadMissionOriginStringArray = new ArrayList<>(threadMissionSize);while (threadMissionOriginStringArray.size() < threadMissionSize && !end.get()) {String readMission = null;try {  // 尝试读取一行readMission = reader.readLine();} catch (IOException e) {e.printStackTrace();}if (readMission == null) {  // 读到文件末尾,结束读取end.set(true);break;} else if (!readMission.isEmpty()) { // 不是空行,添加到任务集合threadMissionOriginStringArray.add(readMission);}}  // -- 领取任务// 分割、排序、生成临时文件if (threadMissionOriginStringArray.isEmpty())return;  // 空集合,停止任务String threadFileName = String.format(TEMP_FILENAME_FORMAT, originFile.getParent(), fileCounter.getAndIncrement());File tempFile = new File(threadFileName);try {assert tempFile.createNewFile() : "创建临时文件";BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(tempFile));// 将无序字符串列表转为有序字符串列表threadMissionOriginStringArray.stream().map(factory::create).sorted(Comparator.comparing(SortableStringConvertAble::val)).map(T::toString).forEach(str -> {try {debugCounter.incrementAndGet();  // 此处检测得到正确数值bos.write(str.getBytes());  // 写入临时文件以释放内存bos.write("\n".getBytes());  // 写入新行} catch (IOException e) {e.printStackTrace();}});bos.flush();bos.close();} catch (IOException e) {System.out.println("写入临时文件出错");e.printStackTrace();}}  // -- thread's loop to read origin file});  // -- submit}  // -- loop to submit work threadSystem.out.println("等待分割排序线程结束");while (pool.getCompletedTaskCount() != WORKER_COUNT) {try {Thread.sleep(2000);System.out.print(".");} catch (InterruptedException e) {break;}}System.out.println("分割结束,共处理数据[" + debugCounter.get() + "]项");return fileCounter.get();}
}

ConvertFactory.java

package work.cxlm.helper;// 抽象工厂
public interface ConvertFactory<T extends SortableStringConvertAble> {T create(String str);
}

Element.java

package work.cxlm.helper;import java.util.Random;public class Element extends SortableStringConvertAble {private static final String[] STRING_POOL = {  // 用于生成的随机字符串"埋まる", "白咲", "花", "索菲", "本居", "日向", "星野", "梦美", "乃愛", "篝","越谷", "小鞠", "小林", "宫内", "莲华", "小雪", "北方", "羽未", "白羽", "実り"};private static Random random = new Random(System.currentTimeMillis());private Long value;private String key;public Element(String k, Long v) {key = k;value = v;}public Element(String raw) {if (raw == null || raw.isEmpty()) return;  // 构造空对象String[] kv = raw.split(":");if (kv.length != 2) throw new IllegalArgumentException("给定的值[" + raw + "]无法转化为 Element 对象");key = kv[0];value = Long.valueOf(kv[1]);}public static Element random() {String randKey = STRING_POOL[random.nextInt(STRING_POOL.length)] + "·" + STRING_POOL[random.nextInt(STRING_POOL.length)];Long randVal = random.nextLong();return new Element(randKey, randVal);}@Overridepublic long val() {return value;}@Overridepublic String toString() {return key + ":" + value;}
}

Generator.java

package work.cxlm.helper;import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class Generator {static final String SEP = System.lineSeparator();public static void generate(String filepath, int count) throws IOException, InterruptedException {File target = new File(filepath);// 创建父级目录、目标文件assert target.getParentFile().exists() || target.getParentFile().mkdirs();assert target.exists() || target.createNewFile();BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(target));int poolSize = 10;ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(poolSize), runnable -> new Thread(runnable, "创建数据元"));AtomicInteger atomicCount = new AtomicInteger(count);for (int i = 0; i < poolSize; i++) {pool.submit(() -> {while (atomicCount.decrementAndGet() >= 0) {byte[] bytesToWrite = (Element.random().toString() + SEP).getBytes();synchronized (os) {try {os.write(bytesToWrite);} catch (IOException e) {e.printStackTrace();}}}});}pool.shutdown();pool.awaitTermination(5, TimeUnit.MINUTES);os.flush();os.close();}
}

SortableStringConvertAble.java

package work.cxlm.helper;import java.util.Objects;public abstract class SortableStringConvertAble implements Comparable<SortableStringConvertAble> {public SortableStringConvertAble() {}@Overridepublic abstract String toString();abstract protected long val();@Overridepublic int compareTo(SortableStringConvertAble o) {Objects.requireNonNull(o, "数据元不能为 null");return (int) (val() - o.val());}}

使用有限内存对大型数据文件排序相关推荐

  1. 外排序(大数据文件排序)

    内排序方法的共同特点是在排序的过程中所有数据都在内存中.但是当待排序的记录数目特别多时,在内存中不能一次处理.必须把他们以文件的形式存放于外存,排序时再把他们一部分一部分地调入内存进行处理.这样,在排 ...

  2. Oracle体系结构篇之数据文件

    目录 一.概述 1.1.数据文件 1.2.存储结构分类 二.物理存储结构 2.1.操作系统数据块 2.2.数据文件 三.逻辑存储结构 3.1.表空间 3.2.段 3.3.区 3.4.块 在开始介绍数据 ...

  3. 图像 存储csv_matplotlib基于数据文件绘制其图像

    先前,我们已经介绍过 matplotlib 第三方绘图库的基本操作方法. 这里,我们将介绍如何通过读取文件中的数据来绘制其图像的操作方法. 基本绘制方法 假设要绘制的数据存储在名为 sample.tx ...

  4. 只有1G内存,如何对10G的文件中数据进行排序

    思考 1.归并.外排 2.多线程  RecursiveTask 测试生成文件代码 /****@author dongsheng*@date 2019/1/18 22:58*@Description:* ...

  5. Cinchoo ETL-对大型CSV文件进行排序

    目录 1.介绍 2. 要求 3. 使用方法 3.1 样本数据 清单 1. 示例CSV数据文件 (customers.csv) .NET Framework .NET Core 3.2 快速排序--零配 ...

  6. java 文件内容排序_在Java中对2个大型文本文件进行排序的最佳方法是什么?

    我正在构建一个简单的 Java应用程序,涉及从csv文件中读取信息. csv文件中的信息以这种形式出现: "ID","Description" "AB ...

  7. python写xml文件 数据量特别大_python中大型xml文件的并行处理

    我有几个正在解析的大型xml文件(提取一些数据子集并写入文件),但是每个文件都有很多文件和大量记录,所以我尝试并行化.在 首先,我有一个生成器从文件中提取记录(这很好):def reader(file ...

  8. 内存不足时的大文件排序算法(spark shuffle的排序算法)

    1.问题场景 例如:当前磁盘2T,内存16G,文件大小500G.现在需要对500G的大文件进行排序,并存道另一个文件中. 抽象:当前有包含10000行数的文件,再只能使用长度为1000的数组的前提下, ...

  9. 数据结构 - 单链表(Linked List)实现在内存中实现数据以链表形式生成并根据序号排序

    下面实现一个例子来进行学习 1)介绍 单链表的逻辑结构 在内存中的实际结构 具体创建示意图: 2)代码实现 例子 1.第一个程序在添加的时候并没有按照序号排序,如果在添加的时候把位置改变输出的时候序号 ...

最新文章

  1. RSS FEED的应用
  2. ML之kNNC:基于iris莺尾花数据集(PCA处理+三维散点图可视化)利用kNN算法实现分类预测daiding
  3. java中的topicFont_Fontmin 快速指南
  4. 2016 ACM/ICPC Asia Regional Dalian Online
  5. 阿里与珠海横琴新区达成战略合作,阿里云助力打造横琴智能岛
  6. H5活动产品设计指南基础版
  7. Ubuntu之Docker安装
  8. python设置字体_Python实现文字特效的方法
  9. 软件测试完后,还有bug,责任全在于测试吗?
  10. 李宏毅自然语言处理——依存句法分析
  11. 6.1 tensorflow2实现WideDeep推荐系统——Python实战
  12. C程序设计 4顺序程序设计
  13. excel粘贴为图片不完整_excel转PDF不完整?办公大神的压箱绝技来了!
  14. java中jdk多大_Java中JDK和JRE的区别
  15. Listary——好用到哭的高效快速搜索工具
  16. Java学习决心计划书
  17. 社交产品分析:共同看片,微光
  18. C不会断句?【前后置,位,移位操作符详解】 b = ++c, c++, ++a, a++
  19. 《大象--Thinking in UML 第二版》已于近日在当当首发,同时邀请各位加入新浪微博[大象-thinkinginUml群]:http://q.weibo.com/1483929
  20. 视频数据集 | 视频动作识别video recognition常用数据集整理

热门文章

  1. 探秘Google美国总部
  2. 从实际应用角度浅析SAP仓库中的过账变更功能
  3. 第九届“中国软件杯”大学生软件设计大赛总决赛落幕
  4. 软件工程导图呈现——软件
  5. LocalResolver国际化语言转换
  6. Broadcom博通 EDI 成功案例
  7. 非上海户籍人员在上海买房需要啥条件?
  8. 2007中国各省GDP排名
  9. 前端面试题_2022-02
  10. 全球媒体网关行业收入预计2028年达到19.442亿美元