社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
版权声明:本文为神州灵云作者的原创文章,未经神州灵云允许不得转载。
本文作者:Kevin
引言:
相比传统顺序编程,多线程并发编程在运行效率有这巨大的优势,但是由于线程资源的共享,线程同步等问题,多线程编程的难度也要高许多。Java是最先支持多线程的开发的语言之一,Java 提供了许多方便好用的多线程并发方法,理解并合理地运用它们,会让我们在开发中受益匪浅。Fork/Join框架就是一个充分利用多线程优势,集“分治”与“并行”一体的并发框架。
1. Fork/Join框架思想
Fork/Join框架从名字就可以初见端倪,Fork(分支)的意思是将任务分离,Join(合并)将结果合并。Fork/Join框架的核心思想是“分治”,将一个问题分解为多个小问题,这些问题相互独立,得到所有小问题的结果后就可以得到整个问题的结果。
在传统编程中,我们可以采取递归的方式实现具有一定规律的问题分治,但是由于频繁的函数调用,栈切换,递归的运行效率十分低下。Fork/Join框架提供了分治问题的多线程并发解决方案,提供了高效的实现。
2. Fork/Join框架原理
Fork/Join框架建立在Java线程池的基础之上,主要通过ForkJoinPool,ForkJoinTask类实现:
ForkJoinPool是ExecutorService的实现类,是一种特殊的线程池,ForkJoinTask则是ForkJoinPool线程池可接受的任务对象,分为RecursiveTask(带执行结果的任务)和RecursiveAction(没有执行结果的任务)。 ForkJoinPool接收提交的ForkJoinTask,为任务及其分支子任务分配线程。
3. 关于线程池
ForkJoinPool是一种特殊的线程池,要了解它得先了解Java线程池,这里只大致展示Java线程池的结构,不做深入分析。
线程池内维护一个核心池,每次有任务提交,会提供一个线程执行任务,直至池中线程数量达到最大值,之后无法立即执行的任务会被存放到任务队列内(workQueue),池内任务完成后再从队列中取出任务继续执行。
线程池的线程生成销毁策略,队列类型,最大核心池数量,任务拒绝策略等都是可配置的,使得Java线程池可以应对不同的场景与问题。而ForkJoinPool就是其中的一种实现。
4. Fork/Join实现
对于一个任务,它需要继承ForkJoinTask,并实现compute()方法,它是任务的执行部分。在执行的compute方法中,可以通过新建子任务并调用子任务的fork()方法实现任务的拆分。随后调用子任务的join()方法,join()方法将等子任务处理完成后,获取子任务执行结果。
这时可能就会出现一个问题,如果子线程拆分得很细(递归层数很深),那岂不是会造成大量的线程等待,显然这是不合理的。ForkJoinPool采用work stealing(偷取任务)解决了这个问题。
ForkJoinPool除了线程池所带的工作队列, 每个线程都有自己的工作队列。工作队列采用双端队列(DeQueue)实现,双端队列可以从队列的头尾两端出队或入队。队伍采用LIFO算法,从队尾取出任务并执行,这很符合“分治”先计算子任务的规律。
work steal 顾名思义,就是偷取他人的任务,当一个线程处于空闲状态时,他会尝试从别的线程的工作队列中窃取任务放入自己的队列并执行,这样join()的子任务没有完成时,会先执行其他任务,防止线程空等待。同时,为了防止频繁产生竞争降低执行效率,窃取采用FIFO算法,从队头取任务。这样,只有在任务队列长度小于等于1,才会发生竞争。
接着来看看具体的实现,先看fork()
函数的实现:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
可以看到Fork
函数将子任务push
到了队伍中(队尾),它并没有执行子任务。
随后看看join()
的实现,join()
相对比较复杂。
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
首先join()
判断该线程是否是ForkJoinThread
,如果是,判断任务完成情况,如果完成了,则直接返回结果。
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
如果没完成,判断是否在任务所在线程的工作队列顶端,如果是,就执行它。
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
if ((s = task.status) < 0)
break;
if (cc != null)
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task))
helpStealer(w, task);
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
如果不是,在工作队列中搜索,如果找到了,就用空任务替换取出,并执行(tryRemoveAndExec)
如果没有找到任务,说明任务被别的线程偷取了,则尝试从小偷的工作队列内窃取任务(LIFO)执行
,帮助他更快完成join()
任务(helpStealer)
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
descent: for (subtask = task; subtask.status >= 0; ) {
for (int h = j.hint | 1, k = 0, i; ; k += 2) {
if (k > m) // can't find stealer
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {
j.hint = i;
break;
}
checkSum += v.base;
}
}
for (;;) { // help v or descend
ForkJoinTask<?>[] a; int b;
checkSum += (b = v.base);
ForkJoinTask<?> next = v.currentJoin;
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent;
if (b - v.top >= 0 || (a = v.array) == null) {
if ((subtask = next) == null)
break descent;
j = v;
break;
}
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));
if (v.base == b) {
if (t == null) // stale
break descent;
if (U.compareAndSwapObject(a, i, t, null)) {
v.base = b + 1;
ForkJoinTask<?> ps = w.currentSteal;
int top = w.top;
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
U.putOrderedObject(w, QCURRENTSTEAL, ps);
if (w.base != w.top)
return; // can't further help
}
}
}
}
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
如果在帮助完成后,发现自己的队列有了任务(w.base != w.top
),就不在帮助,否则在自身的join
等待的任务完成之前,不断偷取任务执行。
大致流程图如下:
ForkJoinPool正是通过work stealing 这一方法,巧妙地解决了子任务导致父任务线程等待的问题,实现了任务的高并发有序执行。
通过原理解读发现,父线程能实现非阻塞高并发执行子任务,都是实现在ForkJoinTask的dojoin()方法内,所以ForkJoinPool适合实现计算密集型的任务,而对于存在 I/O,线程间同步,sleep()等会阻塞线程的任务,ForkJoin本身不能阻止线程被阻塞,所以并不适合,除非配合ManagedBlocker使用。
5. 总结:
Java线程池提供了强大的并发编程功能。ForkJoinPool作为Java线程池的一种,是支持了高度并行实现“分治”的线程池,在处理递归等分治问题时可以有效提高效率,了解ForkJoinPool的实现原理让我们在适合的场景,可以更好的使用它。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!