Java并发编程之Fork-Join分治编程 - Go语言中文社区

Java并发编程之Fork-Join分治编程


在JDK1.7版本中提供了Fork-Join并行执行任务框架,它的主要作用是把大任务分割成若干个小任务,再对每个小任务得到的结果进行汇总,此种开发方法也叫分治编程,分治编程可以极大地利用CPU资源,提高任务执行的效率,也是目前与多线程有关的前沿技术。

9.1 Fork-Join分治编程与类结构

在这里插入图片描述
在JDK中并行执行框架Fork-Join使用了“工作窃取”算法,它是指某个线程从其他队列里窃取任务来执行,那这样做有什么好处呢?
比如要完成一个比较大的任务,完全可以把这个大的任务分割为若干个互不依赖的子任务/小任务,为了更加方便地管理这些任务,于是把这些子任务分别放到不同的队列里,这时就会出现有的线程会先把自己队列里的任务快速执行完毕,而其他线程对应的队列里还有任务等待处理,完成任务的线程与其等着,不如去帮助其他线程分担要执行的任务,于是它就去其他线程的队列里窃取一个任务来执行,这就是所谓的“工作窃取”算法。
在JKD1.7中实现分治编程需要使用ForkJoinPool类,此类的主要任务是创建一个任务池,类信息如下:
public class ForkJoinPool extends AbstractExecutorService{
该类也是从AbstractExecutorService类继承下来的

类ForkJoinPool所提供的功能是一个任务池,而执行具体任务却不是ForkJoinPool,而是ForkJoinTask类。
所以需要该类的3个子类CountedCompleter,RecursiveAction,RecursiveTask来实现具体功能。
9.2 使用RecursiveAction让任务跑起来

使用类RecursiveAction执行的任务是具有无返回值的,仅执行一次任务。

public class MyRecursiveAction extends RecursiveAction{

	@Override
	protected void compute() {
		System.out.println("跑起来了");
	}

}
..................................
public class Test {
  public static void main(String[] args) throws InterruptedException {
	ForkJoinPool pool = new ForkJoinPool();
	pool.submit(new MyRecursiveAction());
	Thread.sleep(5000);
	
}
}
运行结果:
跑起来了

9.3 使用RecursiveAction分解任务

前面的实例仅是让任务运行起来,并打印一个字符串信息,任务并没有得到fork分解,也就是并没有体现分治编程的运行效果。在调用ForkJoinTask.java类中的fork()方法时需要注意一下效率的问题,因为每一次调用fork都会分离任务,增加系统运行负担,所以在ForkJoinTask.java类中提供了public static void invokeAll(ForkJoinTask<?>t1,ForkJoinTask<?>t2)方法来优化执行效率。

public class MyRecursiveAction extends RecursiveAction{
   private int beginValue;
   private int endValue;
   public MyRecursiveAction(int beginValue,int endValue) {
	   super();
	   this.beginValue = beginValue;
	   this.endValue = endValue;
   }
@Override
protected void compute() {
	System.out.println(Thread.currentThread().getName()+"------------");
	
	if(endValue -beginValue>2) {
		int middelNum = (beginValue+endValue)/2;
		MyRecursiveAction leftAction = new MyRecursiveAction(beginValue,middelNum);
		MyRecursiveAction rightAction = new MyRecursiveAction(middelNum+1,endValue);
		this.invokeAll(leftAction,rightAction);
	}else {
		System.out.println("打印组合:"+beginValue+"-"+endValue);
	}
}
.........................
public class Test {
  public static void main(String[] args) throws InterruptedException {
	 ForkJoinPool pool = new ForkJoinPool();
	 pool.submit(new MyRecursiveAction(1,10));
	 Thread.sleep(5000);
}
}
运行结果:
ForkJoinPool-1-worker-3------------
ForkJoinPool-1-worker-3------------
ForkJoinPool-1-worker-3------------
打印组合:1-3
ForkJoinPool-1-worker-3------------
打印组合:4-5
ForkJoinPool-1-worker-5------------
ForkJoinPool-1-worker-5------------
ForkJoinPool-1-worker-1------------
打印组合:9-10
打印组合:6-8

9.4 使用RecursiveTask取得返回值与join()和get()方法的区别

使用get()获得返回值

public class MyRecursiveTask extends RecursiveTask<Integer>{

	@Override
	protected Integer compute() {
		System.out.println("compute time ="+System.currentTimeMillis());
		return 100;
	}
}
.............................
public class Test1 {
 public static void main(String[] args) throws InterruptedException, ExecutionException {
	 MyRecursiveTask task1 = new MyRecursiveTask();
	 System.out.println(task1.hashCode());
	 ForkJoinPool pool = new ForkJoinPool();
	 ForkJoinTask task2 = pool.submit(task1);
	 System.out.println(task2.hashCode()+" "+task2.get());
	 Thread.sleep(5000);
  }
}
运行结果:
666641942
compute time =1556522593926
666641942 100

使用join()获得返回值

public class Test2 {
 public static void main(String[] args) {
	MyRecursiveTask task1 = new MyRecursiveTask();
	System.out.println(task1.hashCode());
	ForkJoinPool pool = new  ForkJoinPool();
	ForkJoinTask<Integer> task2 = pool.submit(task1);
	System.out.println(task2.hashCode()+"-"+task2.join());
	try {
		Thread.sleep(5000);
	} catch (InterruptedException e) {
		
		e.printStackTrace();
	}
	
 }
}
运行结果:
666641942
compute time =1556523044041
666641942-100

方法join()与get()虽然都能取得计算后的结果值,但它们之间还是在出现异常时有处理上的区别。
使用get()方法执行任务时,当子任务出现异常时可以在main主线程中进行捕获。方法join()遇到异常直接抛出。

9.5 使用RecursiveTask执行多个任务并打印返回值

public class MyRecursiveTaskA extends RecursiveTask<Integer>{

	@Override
	protected Integer compute() {
		System.out.println(Thread.currentThread().getName()+"begin A"+System.currentTimeMillis());
		try {
			Thread.sleep(3000);
			System.out.println(Thread.currentThread().getName()+"end A"+System.currentTimeMillis());
		} catch (InterruptedException e) {
			
			e.printStackTrace();
		}
		return 100;
	}
}
.......................................
public class MyRecursiveTaskB extends RecursiveTask<Integer>{

	@Override
	protected Integer compute() {
		System.out.println(Thread.currentThread().getName()+"begin B"+System.currentTimeMillis());
		try {
			Thread.sleep(5000);
			System.out.println(Thread.currentThread().getName()+"end B"+System.currentTimeMillis());
		} catch (InterruptedException e) {
			
			e.printStackTrace();
		}
		return 100;
	}
}
......................................................
public class Test {
  public static void main(String[] args) throws InterruptedException {
	 ForkJoinPool pool = new ForkJoinPool();
	 ForkJoinTask<Integer> runTaskA = pool.submit(new MyRecursiveTaskA());
	 ForkJoinTask<Integer> runTaskB = pool.submit(new MyRecursiveTaskB());
	 System.out.println("准备打印"+System.currentTimeMillis());
	 System.out.println(runTaskA.join()+"A:"+System.currentTimeMillis());
	 System.out.println(runTaskB.join()+"B:"+System.currentTimeMillis());
	 pool.submit(new MyRecursiveAction(1,10));
	 Thread.sleep(5000);
}
}
运行结果:
准备打印1556524413868
ForkJoinPool-1-worker-5begin B1556524413868
ForkJoinPool-1-worker-3begin A1556524413869
ForkJoinPool-1-worker-3end A1556524416869
100A:1556524416869
ForkJoinPool-1-worker-5end B1556524418868
100B:1556524418868
ForkJoinPool-1-worker-5------------
ForkJoinPool-1-worker-5------------
ForkJoinPool-1-worker-3------------
ForkJoinPool-1-worker-5------------
ForkJoinPool-1-worker-7------------
打印组合:9-10
ForkJoinPool-1-worker-3------------
打印组合:1-3
打印组合:6-8
ForkJoinPool-1-worker-7------------
打印组合:4-5

每个任务返回值为100,并且任务之间的运行方式是异步的,但join()方法是同步的。

9.6 使用RecursiveTask实现字符串累加

public class MyRecursiveTask extends RecursiveTask<String>{
   private int beginValue;
   private int endValue;
   public MyRecursiveTask(int beginValue,int endValue) {
	   this.beginValue = beginValue;
	   this.endValue = endValue;
   }
	@Override
	protected String compute() {
		System.out.println(Thread.currentThread().getName()+"-----------");
		if(endValue -beginValue>2) {
			int middelValue = (beginValue+endValue)/2;
			MyRecursiveTask leftTask = new MyRecursiveTask(beginValue,middelValue);
			MyRecursiveTask rightTask = new MyRecursiveTask(middelValue+1,endValue);
			this.invokeAll(leftTask,rightTask);
			return leftTask.join()+rightTask.join();
		}else {
			String returnString = "";
			for(int i = beginValue;i<=endValue;i++) {
				returnString = returnString+(i);
			}
			System.out.println("else 返回"+returnString+" "+beginValue+" "+endValue);
			return returnString;
		}
		
	}
	

}
..................................................
public class Test {
  public static void main(String[] args) throws InterruptedException {
	 ForkJoinPool pool = new ForkJoinPool();
	 MyRecursiveTask taskA = new MyRecursiveTask(1,20);
	 ForkJoinTask<String> runTaskA = pool.submit(taskA);
	 System.out.println(runTaskA.join());
	 Thread.sleep(5000);
	 
}
}
运行结果:
ForkJoinPool-1-worker-3-----------
ForkJoinPool-1-worker-3-----------
ForkJoinPool-1-worker-3-----------
ForkJoinPool-1-worker-3-----------
else 返回123 1 3
ForkJoinPool-1-worker-5-----------
ForkJoinPool-1-worker-5-----------
ForkJoinPool-1-worker-3-----------
ForkJoinPool-1-worker-7-----------
ForkJoinPool-1-worker-5-----------
ForkJoinPool-1-worker-7-----------
else 返回678 6 8
ForkJoinPool-1-worker-7-----------
else 返回910 9 10
ForkJoinPool-1-worker-7-----------
ForkJoinPool-1-worker-7--
                        
                        
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_14842117/article/details/89680825
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-07 22:17:21
  • 阅读 ( 779 )
  • 分类:

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢