本文共 8350 字,大约阅读时间需要 27 分钟。
思考
1.归并、外排 2.多线程 RecursiveTask
测试生成文件代码
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;

/**
 * Generates a text file of random {@code int} values, one per line, to be used
 * as input for the external-sort example.
 *
 * <p>Usage: {@code java GenerateNumber [outputFile] [count]}. Both arguments are
 * optional; without them the original hard-coded path and 1,000,000 values are used.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
public class GenerateNumber {

    /** Default output file when no path argument is given. */
    private static final String filePath = "F:/file_test/data1.txt";

    /** Default number of values when no count argument is given. */
    private static final int DEFAULT_COUNT = 1_000_000;

    /**
     * Writes {@code count} random integers to the target file.
     *
     * @param args optional {@code [outputFile, count]}
     * @throws IOException if the parent directory cannot be created or writing fails
     * @throws IllegalArgumentException if {@code count} is negative
     * @throws NumberFormatException if {@code count} is not an integer
     */
    public static void main(String[] args) throws IOException {
        String target = args.length > 0 ? args[0] : filePath;
        int count = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_COUNT;
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative: " + count);
        }

        File file = new File(target);
        // PrintWriter(File) fails with FileNotFoundException when the directory is missing.
        File parent = file.getAbsoluteFile().getParentFile();
        if (parent != null && !parent.isDirectory() && !parent.mkdirs()) {
            throw new IOException("Cannot create directory " + parent);
        }

        Random random = new Random();
        long beginTime = System.currentTimeMillis();
        System.out.println("beginTime:" + beginTime);
        try (PrintWriter out = new PrintWriter(file)) {
            for (int i = 0; i < count; i++) {
                out.println(random.nextInt());
            }
            // PrintWriter never throws on write; it only records the failure.
            if (out.checkError()) {
                throw new IOException("Failed to write " + target);
            }
        }
        // The value printed is a duration, not a timestamp.
        System.out.println("elapsed(ms):" + (System.currentTimeMillis() - beginTime));
    }
}
排序代码
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

/**
 * External merge sort of a text file containing one {@code int} per line.
 *
 * <p>The input is read in chunks that fit in memory. Each chunk is sorted with a
 * fork/join merge sort and written to its own "part" file; the part files are then
 * merged pairwise, in parallel, into the final output file.
 *
 * <p>Usage: {@code java SortByForkjoin [inputFile] [outputFile] [chunkSize]}. All
 * arguments are optional and default to the original hard-coded values. Part files
 * and intermediate merge files are left on disk next to the input/output files.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
public class SortByForkjoin {

    /** Default input file. */
    private static final String filePath = "F:/file_test/data1.txt";

    /** Default output file. */
    private static final String afterFilePath = "F:/file_test/data1-sort.txt";

    /** Default number of values sorted in memory and written to one part file. */
    private static final int DEFAULT_CHUNK_SIZE = 100_000;

    /**
     * Splits the input into sorted part files and merges them into the output file.
     *
     * @param args optional {@code [inputFile, outputFile, chunkSize]}
     * @throws Exception if reading, sorting, writing or merging fails
     */
    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : filePath;
        String output = args.length > 1 ? args[1] : afterFilePath;
        int size = args.length > 2 ? Integer.parseInt(args[2]) : DEFAULT_CHUNK_SIZE;
        if (size <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive: " + size);
        }

        // For the default input this yields "F:/file_test/data1-part-", as before.
        String partPrefix = stripExtension(input) + "-part-";

        ForkJoinPool pool = new ForkJoinPool();
        try {
            // Names of the sorted part files, in creation order; needed for the merge phase.
            List<String> partFiles = new ArrayList<>();
            int[] array = new int[size];
            int count = 0;

            try (BufferedReader reader = new BufferedReader(new FileReader(input))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    line = line.trim();
                    if (line.isEmpty()) {
                        continue; // tolerate blank lines, e.g. a trailing newline
                    }
                    array[count++] = Integer.parseInt(line);
                    if (count == size) {
                        // Buffer is full: sort it and flush it to its own part file.
                        String filename = partPrefix + partFiles.size() + ".txt";
                        doPartitionSort(pool, filename, array, 0, size);
                        partFiles.add(filename);
                        count = 0;
                    }
                }
            }

            // Remaining values that did not fill a whole chunk.
            if (count > 0) {
                String filename = partPrefix + partFiles.size() + ".txt";
                doPartitionSort(pool, filename, array, 0, count);
                partFiles.add(filename);
            }

            if (partFiles.size() > 1) {
                pool.invoke(new MergerFileSortTask(partFiles, output));
            } else if (partFiles.size() == 1) {
                // The only part file already is the sorted result: make it the output.
                Files.move(Paths.get(partFiles.get(0)), Paths.get(output),
                        StandardCopyOption.REPLACE_EXISTING);
            } else {
                // Empty input: still produce an (empty) output file.
                Files.write(Paths.get(output), new byte[0]);
            }
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Sorts {@code array[start, end)} in place and writes it to a file, one value per line.
     *
     * @param pool     fork/join pool that runs the sort
     * @param filename file to write the sorted values to
     * @param array    values to sort
     * @param start    first index (inclusive)
     * @param end      last index (exclusive)
     * @throws Exception if the sort fails or the file cannot be written
     */
    static void doPartitionSort(ForkJoinPool pool, String filename, int[] array, int start, int end)
            throws Exception {
        pool.invoke(new ArrayMergerSortTask(array, start, end));
        try (PrintWriter pw = new PrintWriter(filename)) {
            for (int i = start; i < end; i++) {
                pw.println(array[i]);
            }
            // PrintWriter never throws on write; it only records the failure.
            if (pw.checkError()) {
                throw new IOException("Failed to write part file " + filename);
            }
        }
    }

    /** Returns {@code path} without its file-name extension, if it has one. */
    private static String stripExtension(String path) {
        int dot = path.lastIndexOf('.');
        int sep = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
        return dot > sep + 1 ? path.substring(0, dot) : path;
    }
}

/**
 * Fork/join merge sort of the range {@code [lo, hi)} of an {@code int} array.
 *
 * <p>Ranges shorter than {@link #THRESHOLD} are sorted directly; longer ranges are
 * halved, the halves are sorted as parallel subtasks, and then merged.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
class ArrayMergerSortTask extends RecursiveAction {

    /** Ranges shorter than this are sorted sequentially instead of being split. */
    static final int THRESHOLD = 1000;

    final int[] array;
    final int lo, hi;

    /**
     * @param array array to sort in place
     * @param lo    first index (inclusive)
     * @param hi    last index (exclusive)
     */
    ArrayMergerSortTask(int[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    /** Sorts the whole array. */
    ArrayMergerSortTask(int[] array) {
        this(array, 0, array.length);
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            sortSequentially(lo, hi);
        } else {
            int mid = (lo + hi) >>> 1; // overflow-safe midpoint
            invokeAll(new ArrayMergerSortTask(array, lo, mid),
                    new ArrayMergerSortTask(array, mid, hi));
            merge(lo, mid, hi);
        }
    }

    /** Sorts {@code array[lo, hi)} with the JDK's built-in sort. */
    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }

    /** Merges the sorted ranges {@code [lo, mid)} and {@code [mid, hi)} in place. */
    void merge(int lo, int mid, int hi) {
        // Only the left half is copied; the right half is consumed in place because
        // the write index j can never overtake the right-half read index k.
        int[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
            array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
        }
    }
}

/**
 * Fork/join task that merges the sorted files {@code partFiles[lo, hi)} into one
 * sorted file and returns the name of the file holding the result.
 *
 * @author dongsheng
 * @date 2019/1/18 22:58
 * @version 1.0.0
 */
class MergerFileSortTask extends RecursiveTask<String> {

    final List<String> partFiles;
    final int lo, hi;
    final String filename;

    /**
     * @param partFiles names of sorted files, one {@code int} per line
     * @param lo        index of the first file to merge (inclusive)
     * @param hi        index of the last file to merge (exclusive)
     * @param filename  file that receives the merged result
     */
    public MergerFileSortTask(List<String> partFiles, int lo, int hi, String filename) {
        super();
        this.partFiles = partFiles;
        this.lo = lo;
        this.hi = hi;
        this.filename = filename;
    }

    /** Merges all of {@code partFiles} into {@code filename}. */
    public MergerFileSortTask(List<String> partFiles, String filename) {
        this(partFiles, 0, partFiles.size(), filename);
    }

    /**
     * @return the name of the file containing the merged data: {@code filename},
     *         or the part file itself when the range holds exactly one file
     * @throws IllegalStateException if a file cannot be read or written
     */
    @Override
    protected String compute() {
        int fileCount = hi - lo;
        if (fileCount == 1) {
            // Nothing to merge; hand the part file up to the parent task.
            return partFiles.get(lo);
        }

        String first = null;
        String second = null;
        try {
            if (fileCount <= 0) {
                // Empty range: produce an empty file so the returned name is readable.
                Files.write(Paths.get(filename), new byte[0]);
            } else if (fileCount == 2) {
                first = partFiles.get(lo);
                second = partFiles.get(lo + 1);
                mergerFile(first, second);
            } else {
                // More than two files: merge each half in parallel, then merge the results.
                int mid = lo + fileCount / 2;
                MergerFileSortTask left =
                        new MergerFileSortTask(partFiles, lo, mid, filename + "-1");
                MergerFileSortTask right =
                        new MergerFileSortTask(partFiles, mid, hi, filename + "-2");
                invokeAll(left, right);
                first = left.join();
                second = right.join();
                mergerFile(first, second);
            }
        } catch (IOException e) {
            // compute() cannot throw checked exceptions; fail the task instead of
            // returning the name of a missing or half-written file.
            throw new IllegalStateException(
                    "Failed to merge " + first + " and " + second + " into " + filename, e);
        }
        return filename;
    }

    /** Merges the two sorted files {@code f1} and {@code f2} into {@code filename}. */
    private void mergerFile(String f1, String f2) throws IOException {
        try (BufferedReader reader1 = new BufferedReader(new FileReader(f1));
                BufferedReader reader2 = new BufferedReader(new FileReader(f2));
                PrintWriter pw = new PrintWriter(filename)) {
            String s1 = reader1.readLine();
            String s2 = reader2.readLine();
            // Parsed heads of both files; only meaningful while the matching line is non-null.
            int v1 = s1 == null ? 0 : Integer.parseInt(s1);
            int v2 = s2 == null ? 0 : Integer.parseInt(s2);

            while (s1 != null && s2 != null) {
                if (v1 <= v2) {
                    pw.println(s1);
                    s1 = reader1.readLine();
                    if (s1 != null) {
                        v1 = Integer.parseInt(s1);
                    }
                } else {
                    pw.println(s2);
                    s2 = reader2.readLine();
                    if (s2 != null) {
                        v2 = Integer.parseInt(s2);
                    }
                }
            }
            // At most one of the two files still has lines; copy them through.
            for (; s1 != null; s1 = reader1.readLine()) {
                pw.println(s1);
            }
            for (; s2 != null; s2 = reader2.readLine()) {
                pw.println(s2);
            }
            if (pw.checkError()) {
                throw new IOException("Failed to write merged file " + filename);
            }
        }
    }
}
RecursiveTask
RecursiveTask
和RecursiveAction
相似,只不过每个子任务处理之后会带一个返回值,最终所有的子任务的返回结果会join(合并)成一个结果.
源代码
/* * * * * * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */package java.util.concurrent;/** * A recursive result-bearing {@link ForkJoinTask}. * *For a classic example, here is a task computing Fibonacci numbers: 斐波那契 * *
{@code * class Fibonacci extends RecursiveTask* * However, besides being a dumb way to compute Fibonacci functions * (there is a simple fast linear algorithm that you'd use in * practice), this is likely to perform poorly because the smallest * subtasks are too small to be worthwhile splitting up. Instead, as * is the case for nearly all fork/join applications, you'd pick some * minimum granularity size (for example 10 here) for which you always * sequentially solve rather than subdividing. * * @since 1.7 * @author Doug Lea */public abstract class RecursiveTask{ * final int n; * Fibonacci(int n) { this.n = n; } * Integer compute() { * if (n <= 1) * return n; * Fibonacci f1 = new Fibonacci(n - 1); * f1.fork(); * Fibonacci f2 = new Fibonacci(n - 2); * return f2.compute() + f1.join(); * } * }} extends ForkJoinTask { private static final long serialVersionUID = 5232453952276485270L; /** * The result of the computation. */ V result; /** * The main computation performed by this task. * @return the result of the computation */ protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } /** * Implements execution conventions for RecursiveTask. */ protected final boolean exec() { result = compute(); return true; }}
转载地址:http://bujdi.baihongyu.com/