背景

最近温习《Java虚拟机精讲》的时候,里面有提到使用Folk/Join框架实现多核并行计算,似乎有点陌生又有点熟悉,原来真的好久没用了,根据书上的相关资料,自Java7开始新增在java.util.concurrent包下面新增了基于细粒度的多核并行计算Folk/Join框架。它的主要思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。这种思想和开源基金会Apache提供的Hadoop里面MapReduce很像(input –> split –> map –> reduce –> output)

主要有两步:

  • 第一、任务切分(Folk)
  • 第二、结果合并 (Join)

使用说明

自JDK1.7开始,java.util.concurrent包下提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。多线程ForkJoinPool运用了Fork/Join原理,使用“分而治之”的思想,将大任务分拆成小任务分配给多个线程执行,最后合并得到最终结果,加快运算。

Folk_Join原理图片
它的模型大致是这样的:线程池中的每个线程都有自己的工作队列(这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程共用一个工作队列。
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

代码DEMO

  • Demo 1.使用ForkJoinPool完成一个任务的分段执行

    简单的打印0-500的数值。用多线程实现并行执行实现

    package com.vincent.demo.folkjoin;

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.TimeUnit;

    /**
    * @Desc 使用ForkJoinPool完成一个任务的分段执行
    * 简单的打印0-500的数值。用多线程实现并行执行
    * @author vincent.li
    * @version 1.0
    * @since JDK 1.7
    * @see
    */
    public class ForkJoinPoolAction {

    public static void main(String[] args) throws Exception {
    PrintTask task = new PrintTask(0, 500);
    //创建实例,并执行分割任务
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(task);
    //线程阻塞,等待所有任务完成
    forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
    forkJoinPool.shutdown();
    }
    }

    /**
    * @Desc: 继承RecursiveAction来实现“可分解”的任务。
    *
    * @author vincent.li
    * @version 1.0
    * @since JDK 1.7
    */
    class PrintTask extends RecursiveAction {
    //最多只能打印50个数
    private static final int THRESHOLD = 50;
    private int start;
    private int end;



    public PrintTask(int start, int end) {
    super();
    this.start = start;
    this.end = end;
    }



    @Override
    protected void compute() {

    if(end - start < THRESHOLD){
    for(int i = start; i < end; i++){
    System.out.println(Thread.currentThread().getName()+"打印i值>>>"+i);
    }
    }else {
    int middle = (start + end) / 2;
    PrintTask left = new PrintTask(start, middle);
    PrintTask right = new PrintTask(middle, end);
    //并行执行两个“小任务”
    left.fork();
    right.fork();
    }

    }

    }

  • 执行结果如下:
    ······
    ······
    ······
    ForkJoinPool-1-worker-6打印i值>>>282
    ForkJoinPool-1-worker-2打印i值>>>219
    ForkJoinPool-1-worker-3打印i值>>>343
    ForkJoinPool-1-worker-4打印i值>>>97
    ForkJoinPool-1-worker-0打印i值>>>31
    ForkJoinPool-1-worker-4打印i值>>>98
    ForkJoinPool-1-worker-3打印i值>>>344
    ForkJoinPool-1-worker-2打印i值>>>220
    ForkJoinPool-1-worker-6打印i值>>>283
    ForkJoinPool-1-worker-7打印i值>>>2
    ForkJoinPool-1-worker-5打印i值>>>408
    ······
    ······
    ······

我的Macbook是2.2 GHz Intel Core i7,是8核的,由线程名称可以知道,8个CPU都参与了执行,其实我们ForkJoinPool默认会开启和CPU核数一样的线程数。

  • Demo 2.对一个长度为100000000的数组的元素进行累加求和
    package com.vincent.demo.folkjoin;

    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;

    /**
    * @Desc 对一个长度为100000000的数组的元素进行累加求和
    * @author vincent.li
    * @version 1.0
    * @since JDK 1.7
    * @see
    */
    public class ForJoinPollTask {

    public static void main(String[] args) throws Exception {
    long[] arr = new long[100000000];
    Random random = new Random();
    long total =0L;
    int len = arr.length;
    //初始化10000000个数组元素
    for(int i=0; i<len; i++){
    int temp = random.nextInt(50);
    arr[i]=Long.valueOf(temp).longValue() ;
    }
    System.out.println("数组初始化完成。。。");
    long start = System.currentTimeMillis();
    for(int i=0; i<len; i++){
    //对数组元素赋值,并将数组元素的值添加到sum总和中
    total += arr[i];
    }
    long timeCost = System.currentTimeMillis() - start;
    System.out.println("for循环累加求和结果:" + total + ",耗时:" + timeCost + " ms");
    start = System.currentTimeMillis();
    SumTask task = new SumTask(arr, 0, len);
    // 创建一个通用池,这个是jdk1.8提供的功能
    ForkJoinPool pool = ForkJoinPool.commonPool();
    Future<Long> future = pool.submit(task); //提交分解的SumTask 任务
    timeCost = System.currentTimeMillis() - start;
    System.out.println("多线程执行求和结果:"+future.get()+ ",耗时:" + timeCost + " ms");
    pool.shutdown(); //关闭线程池
    }

    }

    /**
    * @Desc: 继承抽象类RecursiveTask,通过返回的结果,来实现数组的多线程分段累累加
    * RecursiveTask 具有返回值
    *
    * @author vincent.li
    * @version 1.0
    * @since JDK 1.7
    */
    class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 50; //每个小任务 最多只累加50个数
    private long arry[];
    private int start;
    private int end;



    /**
    * Creates a new instance of SumTask.
    * 累加从start到end的arry数组
    * @param arry
    * @param start
    * @param end
    */
    public SumTask(long[] arry, int start, int end) {
    super();
    this.arry = arry;
    this.start = start;
    this.end = end;
    }



    @Override
    protected Long compute() {
    long sum = 0L;
    //当end与start之间的差小于threshold时,开始进行实际的累加
    if(end - start < THRESHOLD){
    for(int i = start; i < end; i++){
    sum += arry[i];
    }
    return sum;
    } else {
    //当end与start之间的差大于threshold,即要累加的数超过50个时候,将大任务分解成小任务
    int middle = (start+ end)/2;
    SumTask left = new SumTask(arry, start, middle);
    SumTask right = new SumTask(arry, middle, end);
    //并行执行两个 小任务
    left.fork();
    right.fork();
    //把两个小任务累加的结果合并起来
    return left.join()+right.join();
    }

    }

    }
  • 执行结果如下:
    数组初始化完成。。。
    for循环累加求和结果:2450102493,耗时:86 ms
    多线程执行求和结果:2450102493,耗时:3 ms
    我们这次是针对长度1亿的数组的所有元素求和,很明显看出多核并行计算执行效率缴传统for循环累加高了20多将近30倍,如果我们采用这个多核并行计算框架来处理海量数据,那效率必须杠杠的。

分析

自JDK1.7开始,Java中引入了可以多核并行计算一种新的线程池:ForkJoinPool。它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要用来使用 分治算法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。

他们区别在于,ForkJoinPool可以使用相对少的线程来处理大量的任务,对于有限资源可以充分使用。

比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。

那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

以上程序的关键方法是fork()和join()方法。在ForkJoinPool使用的线程中,会使用一个内部队列来对需要执行的任务以及子任务进行操作来保证它们的执行顺序。

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

Notice: ForkJoinPool在执行过程中,会创建大量的子任务,这会导致GC进行垃圾回收,这里可能带来一些系统卡顿的风险。