Java并发编程—Fork/Join - Go语言中文社区

Java并发编程—Fork/Join


并发编程—Fork/Join

之前的学习都是基础的,接下来升一个阶段了,自己也是很期待啊!不够前边学习过的还是要牢牢掌握的。
这边文章就用来记录Fork-Join这里的知识点了。

Fork-Join初识

Fork-Join是什么?

ForkJoin是Java7提供的原生多线程并行处理框架,其基本思想是将大任务分割成小任务,最后将小任务聚合起来得到结果。fork是分解的意思, join是收集的意思.。

体现了分而治之

Fork-Join框架体现了分而治之的算法思想,除了Fork-Join我看看有哪些使用了分而治之,二分查找、快速排序、归并排序、HADOOP提供的MapReduce框架等这些都是,那么什么是分而治之?
分而治之:规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解。
在这里插入图片描述

在Fork-Join中还有一个点,就是使用工作密取的方式提高效率。
工作密取:我在网上搜了这相关的概念,虽然没有确定的说法,但是都是一样的思想,那个例子来说,比如:张三和李四同时做一样工作,把这个工作分成两个部分让他们去做,张三的任务少做的速度比李四快,张三做完自己的那部分任务之后,有去帮李四做,当然张三帮李四做的那部分任务依旧算是李四的,只不过张帮李四做完了。让每个人都不闲着,效率就提上去了,这就是我理解的工作密取了。

Fork-Join实战

讲了很多Fork-Join 的理论思想,下面就写一些干货吧,为了不让我这个健忘的人,忘记。

怎样使用?

Fork-Join框架提供了两个类供我们继承,以此创建Fork/Join任务;

  • RecursiveAction,用于没有返回结果的任务
  • RecursiveTask,用于有返回值的任务

这两个类的父类都是ForkJoinTask,它重要的两个方法fork和join。fork方法用以一部方式启动任务的执行,join方法则等待任务完成并返回指向结果。
下面就要来说下ForkJoinPool:
ForkJoinPool:来执行ForkJoinTask,也就是我们创建的任务。分割的子任务也会添加到当前工作线程的双端队列中,进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作密取)
。下面就是ForkJoinPool提供的三个方法,用来执行我们的Task的。

  • 异步执行:execute(ForkJoinTask)
  • 同步调用:invoke(ForkJoinTask)
  • 执行,获取Future:submit(ForkJoinTask)

异常处理:
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常.
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

代码展示

说了这么多,不如代码来的直接【看着代码去理解,更容易懂】:
栗子一:计算数组的总和【继承RecursiveTask】:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

import com.xiangxue.tools.SleepTools;
/**
 * 计算数组的总和【继承RecursiveTask】
 */
public class SumArray {
    private static class SumTask extends RecursiveTask<Integer>{

        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
        private int[] src; //表示我们要实际统计的数组
        private int fromIndex;//开始统计的下标
        private int toIndex;//统计到哪里结束的下标

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

		@Override
		protected Integer compute() {
			if(toIndex-fromIndex < THRESHOLD) {
				int count = 0;
				for(int i=fromIndex;i<=toIndex;i++) {
			    	//SleepTools.ms(1);
			    	count = count + src[i];
				}
				return count;
			}else {
				//fromIndex....mid....toIndex
				int mid = (fromIndex+toIndex)/2;
				SumTask left = new SumTask(src,fromIndex,mid);
				SumTask right = new SumTask(src,mid+1,toIndex);
				invokeAll(left,right);
				return left.join()+right.join();
			}
		}
    }


    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();

        SumTask innerFind = new SumTask(src,0,src.length-1);

        long start = System.currentTimeMillis();

        pool.invoke(innerFind);//同步调用
        System.out.println("Task is Running.....");

        System.out.println("The count is "+innerFind.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }
}

栗子二:遍历文件目录【继承RecursiveAction】

public class FindDirsFiles extends RecursiveAction{

    private File path;//当前任务需要搜寻的目录

    public FindDirsFiles(File path) {
        this.path = path;
    }

    public static void main(String [] args){
        try {
            // 用一个 ForkJoinPool 实例调度总任务
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            pool.execute(task);//异步调用

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork="+otherWork);
            task.join();//阻塞的方法
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

	@Override
	protected void compute() {
		
		List<FindDirsFiles> subTasks = new ArrayList<>();
		
		File[] files = path.listFiles();
		if(files!=null) {
			for(File file:files) {
				if(file.isDirectory()) {
					subTasks.add(new FindDirsFiles(file));
				}else {
					//遇到文件,检查
					if(file.getAbsolutePath().endsWith("txt")) {
						System.out.println("文件:"+file.getAbsolutePath());
					}
				}
			}
			if(!subTasks.isEmpty()) {
				for(FindDirsFiles subTask:invokeAll(subTasks)) {
					subTask.join();//等待子任务执行完成
				}
			}
		}
	}
}

注意

Fork/Join 虽然能提高任务的执行效率,但也不是一定。有个的因素,阈值:阈值决定着任务所分的粒度,如果分的粒度太小不好,太大也不好,在实际使用时要注意。

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_38345031/article/details/84570148
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢