博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
只有1G内存,如何对10G的文件中数据进行排序
阅读量:4039 次
发布时间:2019-05-24

本文共 8350 字,大约阅读时间需要 27 分钟。

  思考

     1.归并、外排 2.多线程  RecursiveTask

测试生成文件代码

/*** *@author dongsheng *@date 2019/1/18 22:58 *@Description: *@version 1.0.0 */public class GenerateNumber {	private static final String filePath="F:/file_test/data1.txt";	public static void main(String[] args) throws IOException {		//随机生成数据		Random random = new Random();		try (PrintWriter out = new PrintWriter(new File(filePath))) {			long beginTime = System.currentTimeMillis();			System.out.println("beginTime:"+beginTime);			for (int i = 0; i < 1_000_000; i++) {				out.println(random.nextInt());				if (i % 10000 == 0)					out.flush();			}			System.out.println("endTime:"+(System.currentTimeMillis()-beginTime));		}	}}

排序代码

import java.io.BufferedReader;import java.io.FileReader;import java.io.PrintWriter;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ForkJoinPool;/*** *@author dongsheng *@date 2019/1/18 22:58 *@Description: *@version 1.0.0 */public class SortByForkjoin {	private static final String filePath="F:/file_test/data1.txt";	private static final String afterFilePath="F:/file_test/data1-sort.txt";	public static void main(String[] args) throws Exception {		ForkJoinPool pool = new ForkJoinPool();		/***		 * 1.从文件中读取内存容量可处理的数据量 ,拆分为多个部分排序的文件		 * 2.每100_000条数据为一个文件		 * */		int size = 100_000;		int[] array = new int[size];		BufferedReader reader = new BufferedReader(new FileReader(filePath));		String line = null;		int i = 0;		int partition = 0;		/***		 * 1.拆分的部分文件名list,后面归并部分文件需要用到它		 * 2.每size条数据,排序后,放到一个文件		 * */		List
partFiles = new ArrayList<>(); while ((line = reader.readLine()) != null) { array[i++] = Integer.parseInt(line); if (i == size) { // 重置i为0 i = 0; // 排序输出到部分文件 String filename = "F:/file_test/data1-part-" + partition++ + ".txt"; //对当前partition的数据进行排序 doPartitionSort(pool, filename, array, 0, size); partFiles.add(filename); } } reader.close(); // 未达到size的部分数据,排序后放一个文件 if (i > 0) { // 排序输出到部分文件 String filename = "F:/file_test/data1-part-" + partition++ + ".txt"; doPartitionSort(pool, filename, array, 0, i); partFiles.add(filename); } if (partition > 1) { // 归并排序 MergerFileSortTask mtask = new MergerFileSortTask(partFiles, afterFilePath); pool.submit(mtask); mtask.get(); } else { // 将唯一的一个部分文件重命名为最终结果文件名 } pool.shutdown(); } /** * partition的数据进行排序 * @param pool 工作池 * @param filename 文件名 * @param array 数组 * @param start 开始下标 * @param end 结束下标 * @throws Exception */ static void doPartitionSort(ForkJoinPool pool, String filename, int[] array, int start, int end) throws Exception { ArrayMergerSortTask task = new ArrayMergerSortTask(array, start, end); pool.submit(task); task.get(); try (PrintWriter pw = new PrintWriter(filename);) { for (int i = start; i < end; i++) { pw.println(array[i]); } } }}/*** *@author dongsheng *@date 2019/1/18 22:58 *@Description: *@version 1.0.0 */public class ArrayMergerSortTask extends RecursiveAction { // implementation details follow: static final int THRESHOLD = 1000; final int[] array; final int lo, hi; ArrayMergerSortTask(int[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } ArrayMergerSortTask(int[] array) { this(array, 0, array.length); } protected void compute() { if (hi - lo < THRESHOLD) //小于1000,就排序 sortSequentially(lo, hi); else { int mid = (lo + hi) >>> 1; //大于1000,拆分 invokeAll(new ArrayMergerSortTask(array, lo, mid), new ArrayMergerSortTask(array, mid, hi)); merge(lo, mid, hi); } } void sortSequentially(int lo, int hi) { Arrays.sort(array, lo, hi); //利用JDK自带的排序进行 } void merge(int lo, int mid, int hi) { int[] buf = Arrays.copyOfRange(array, lo, mid); for (int i = 0, j = lo, k = mid; i < buf.length; j++) array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++]; } }/*** *@author dongsheng *@date 2019/1/18 22:58 *@Description: *@version 1.0.0 */public class MergerFileSortTask extends RecursiveTask
{ List
partFiles; int lo, hi; String filename; public MergerFileSortTask(List
partFiles, int lo, int hi, String filename) { super(); this.partFiles = partFiles; this.lo = lo; this.hi = hi; this.filename = filename; } public MergerFileSortTask(List
partFiles, String filename) { this(partFiles, 0, partFiles.size(), filename); } @Override protected String compute() { // 如果要归并的文件数大于2,则fork int fileCount = hi - lo; if (fileCount > 2) { //fileCount>2 则继续拆分 int mid = fileCount / 2; MergerFileSortTask task1 = new MergerFileSortTask(partFiles, lo, lo + mid, this.filename + "-1"); MergerFileSortTask task2 = new MergerFileSortTask(partFiles, lo + mid, hi, this.filename + "-2"); // 任务提交forkjoinPool中去执行 task1.fork(); task2.fork(); // 合并两个文件 try { //文件 this.mergerFile(task1.get(), task2.get()); //合并执行结果 } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } } else if (fileCount == 2) { //文件个数为2,合并文件 // 合并两个文件 try { this.mergerFile(this.partFiles.get(lo), this.partFiles.get(lo + 1)); } catch (IOException e) { e.printStackTrace(); } } else if (fileCount == 1) { return this.partFiles.get(lo); } return this.filename; } private void mergerFile(String f1, String f2) throws IOException { try (BufferedReader reader1 = new BufferedReader(new FileReader(f1)); BufferedReader reader2 = new BufferedReader(new FileReader(f2)); PrintWriter pw = new PrintWriter(filename);) { String s1 = reader1.readLine(), s2 = reader2.readLine(); Integer d1 = s1 == null ? null : Integer.valueOf(s1); Integer d2 = s2 == null ? null : Integer.valueOf(s2); while (true) { if (s1 == null) { //s1为null,直接把2写完 // 直接读取reader2,写 while (s2 != null) { pw.println(s2); s2 = reader2.readLine(); } } else if (s2 == null) { //s2为null,直接把1写完 // 直接读取reader1,写 while (s1 != null) { pw.println(s1); s1 = reader1.readLine(); } } else { // 比较两个值 while (d1 <= d2 && s1 != null) { // 写s1, 继续读reader1 pw.println(s1); s1 = reader1.readLine(); if (s1 != null) { d1 = Integer.valueOf(s1); } } while (d1 > d2 && s2 != null) { // 写s2, 继续读reader2 pw.println(s2); s2 = reader2.readLine(); if (s2 != null) { d2 = Integer.valueOf(s2); } } } if (s1 == null && s2 == null) // 都读完了 break; } } }}
RecursiveTask

      RecusiveTaskRecusiveAction相似,只不过每个子任务处理之后会带一个返回值,最终所有的子任务的返回结果会join(合并)成一个结果.

源代码

/* * * * * * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */package java.util.concurrent;/** * A recursive result-bearing {@link ForkJoinTask}. * * 

For a classic example, here is a task computing Fibonacci numbers: 斐波那契 * *

 {@code * class Fibonacci extends RecursiveTask
{ * final int n; * Fibonacci(int n) { this.n = n; } * Integer compute() { * if (n <= 1) * return n; * Fibonacci f1 = new Fibonacci(n - 1); * f1.fork(); * Fibonacci f2 = new Fibonacci(n - 2); * return f2.compute() + f1.join(); * } * }}
* * However, besides being a dumb way to compute Fibonacci functions * (there is a simple fast linear algorithm that you'd use in * practice), this is likely to perform poorly because the smallest * subtasks are too small to be worthwhile splitting up. Instead, as * is the case for nearly all fork/join applications, you'd pick some * minimum granularity size (for example 10 here) for which you always * sequentially solve rather than subdividing. * * @since 1.7 * @author Doug Lea */public abstract class RecursiveTask
extends ForkJoinTask
{ private static final long serialVersionUID = 5232453952276485270L; /** * The result of the computation. */ V result; /** * The main computation performed by this task. * @return the result of the computation */ protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } /** * Implements execution conventions for RecursiveTask. */ protected final boolean exec() { result = compute(); return true; }}

 

转载地址:http://bujdi.baihongyu.com/

你可能感兴趣的文章
微信小程序开发全线记录
查看>>
Centos import torchvision 出现 No module named ‘_lzma‘
查看>>
PTA:一元多项式的加乘运算
查看>>
CCF 分蛋糕
查看>>
解决python2.7中UnicodeEncodeError
查看>>
小谈python 输出
查看>>
Django objects.all()、objects.get()与objects.filter()之间的区别介绍
查看>>
python:如何将excel文件转化成CSV格式
查看>>
Django 的Error: [Errno 10013]错误
查看>>
机器学习实战之决策树(一)
查看>>
机器学习实战之决策树二
查看>>
[LeetCode By Python]7 Reverse Integer
查看>>
[leetCode By Python] 14. Longest Common Prefix
查看>>
[leetCode By Python]111. Minimum Depth of Binary Tree
查看>>
[LeetCode By Python]118. Pascal's Triangle
查看>>
[LeetCode By Python]121. Best Time to Buy and Sell Stock
查看>>
[LeetCode By Python]122. Best Time to Buy and Sell Stock II
查看>>
[LeetCode By Python]125. Valid Palindrome
查看>>
[LeetCode By Python]136. Single Number
查看>>
[LeetCode By MYSQL] Combine Two Tables
查看>>