Java多线程之Fork Join框架 - Go语言中文社区

Java多线程之Fork Join框架


版权声明:本文为神州灵云作者的原创文章,未经神州灵云允许不得转载。

本文作者:Kevin

引言:
相比传统顺序编程,多线程并发编程在运行效率有这巨大的优势,但是由于线程资源的共享,线程同步等问题,多线程编程的难度也要高许多。Java是最先支持多线程的开发的语言之一,Java 提供了许多方便好用的多线程并发方法,理解并合理地运用它们,会让我们在开发中受益匪浅。Fork/Join框架就是一个充分利用多线程优势,集“分治”与“并行”一体的并发框架。

1. Fork/Join框架思想
Fork/Join框架从名字就可以初见端倪,Fork(分支)的意思是将任务分离,Join(合并)将结果合并。Fork/Join框架的核心思想是“分治”,将一个问题分解为多个小问题,这些问题相互独立,得到所有小问题的结果后就可以得到整个问题的结果。
1.jpg

在传统编程中,我们可以采取递归的方式实现具有一定规律的问题分治,但是由于频繁的函数调用,栈切换,递归的运行效率十分低下。Fork/Join框架提供了分治问题的多线程并发解决方案,提供了高效的实现。

2. Fork/Join框架原理
Fork/Join框架建立在Java线程池的基础之上,主要通过ForkJoinPool,ForkJoinTask类实现:
2.jpg

ForkJoinPool是ExecutorService的实现类,是一种特殊的线程池,ForkJoinTask则是ForkJoinPool线程池可接受的任务对象,分为RecursiveTask(带执行结果的任务)和RecursiveAction(没有执行结果的任务)。 ForkJoinPool接收提交的ForkJoinTask,为任务及其分支子任务分配线程。

3. 关于线程池
ForkJoinPool是一种特殊的线程池,要了解它得先了解Java线程池,这里只大致展示Java线程池的结构,不做深入分析。
3.jpg

线程池内维护一个核心池,每次有任务提交,会提供一个线程执行任务,直至池中线程数量达到最大值,之后无法立即执行的任务会被存放到任务队列内(workQueue),池内任务完成后再从队列中取出任务继续执行。

线程池的线程生成销毁策略,队列类型,最大核心池数量,任务拒绝策略等都是可配置的,使得Java线程池可以应对不同的场景与问题。而ForkJoinPool就是其中的一种实现。

4. Fork/Join实现
对于一个任务,它需要继承ForkJoinTask,并实现compute()方法,它是任务的执行部分。在执行的compute方法中,可以通过新建子任务并调用子任务的fork()方法实现任务的拆分。随后调用子任务的join()方法,join()方法将等子任务处理完成后,获取子任务执行结果。

这时可能就会出现一个问题,如果子线程拆分得很细(递归层数很深),那岂不是会造成大量的线程等待,显然这是不合理的。ForkJoinPool采用work stealing(偷取任务)解决了这个问题。

ForkJoinPool除了线程池所带的工作队列, 每个线程都有自己的工作队列。工作队列采用双端队列(DeQueue)实现,双端队列可以从队列的头尾两端出队或入队。队伍采用LIFO算法,从队尾取出任务并执行,这很符合“分治”先计算子任务的规律。

work steal 顾名思义,就是偷取他人的任务,当一个线程处于空闲状态时,他会尝试从别的线程的工作队列中窃取任务放入自己的队列并执行,这样join()的子任务没有完成时,会先执行其他任务,防止线程空等待。同时,为了防止频繁产生竞争降低执行效率,窃取采用FIFO算法,从队头取任务。这样,只有在任务队列长度小于等于1,才会发生竞争。
4.jpg

接着来看看具体的实现,先看fork()函数的实现:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

可以看到Fork函数将子任务push到了队伍中(队尾),它并没有执行子任务。
随后看看join()的实现,join()相对比较复杂。

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

首先join()判断该线程是否是ForkJoinThread,如果是,判断任务完成情况,如果完成了,则直接返回结果。

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

如果没完成,判断是否在任务所在线程的工作队列顶端,如果是,就执行它。

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {
            if ((s = task.status) < 0)
                break;
            if (cc != null)
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            if (tryCompensate(w)) {
                task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}

如果不是,在工作队列中搜索,如果找到了,就用空任务替换取出,并执行(tryRemoveAndExec)
如果没有找到任务,说明任务被别的线程偷取了,则尝试从小偷的工作队列内窃取任务(LIFO)执行,帮助他更快完成join()任务(helpStealer)

private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
    WorkQueue[] ws = workQueues;
    int oldSum = 0, checkSum, m;
    if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
        task != null) {
        do {                                       // restart point
            checkSum = 0;                          // for stability check
            ForkJoinTask<?> subtask;
            WorkQueue j = w, v;                    // v is subtask stealer
            descent: for (subtask = task; subtask.status >= 0; ) {
                for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                    if (k > m)                     // can't find stealer
                        break descent;
                    if ((v = ws[i = (h + k) & m]) != null) {
                        if (v.currentSteal == subtask) {
                            j.hint = i;
                            break;
                        }
                        checkSum += v.base;
                    }
                }
                for (;;) {                         // help v or descend
                    ForkJoinTask<?>[] a; int b;
                    checkSum += (b = v.base);
                    ForkJoinTask<?> next = v.currentJoin;
                    if (subtask.status < 0 || j.currentJoin != subtask ||
                        v.currentSteal != subtask) // stale
                        break descent;
                    if (b - v.top >= 0 || (a = v.array) == null) {
                        if ((subtask = next) == null)
                            break descent;
                        j = v;
                        break;
                    }
                    int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                         U.getObjectVolatile(a, i));
                    if (v.base == b) {
                        if (t == null)             // stale
                            break descent;
                        if (U.compareAndSwapObject(a, i, t, null)) {
                            v.base = b + 1;
                            ForkJoinTask<?> ps = w.currentSteal;
                            int top = w.top;
                            do {
                                U.putOrderedObject(w, QCURRENTSTEAL, t);
                                t.doExec();        // clear local tasks too
                            } while (task.status >= 0 &&
                                     w.top != top &&
                                     (t = w.pop()) != null);
                            U.putOrderedObject(w, QCURRENTSTEAL, ps);
                            if (w.base != w.top)
                                return;            // can't further help
                        }
                    }
                }
            }
        } while (task.status >= 0 && oldSum != (oldSum = checkSum));
    }
}

如果在帮助完成后,发现自己的队列有了任务(w.base != w.top),就不在帮助,否则在自身的join等待的任务完成之前,不断偷取任务执行。
大致流程图如下:
5.jpg

ForkJoinPool正是通过work stealing 这一方法,巧妙地解决了子任务导致父任务线程等待的问题,实现了任务的高并发有序执行。

通过原理解读发现,父线程能实现非阻塞高并发执行子任务,都是实现在ForkJoinTask的dojoin()方法内,所以ForkJoinPool适合实现计算密集型的任务,而对于存在 I/O,线程间同步,sleep()等会阻塞线程的任务,ForkJoin本身不能阻止线程被阻塞,所以并不适合,除非配合ManagedBlocker使用。

5. 总结:
Java线程池提供了强大的并发编程功能。ForkJoinPool作为Java线程池的一种,是支持了高度并行实现“分治”的线程池,在处理递归等分治问题时可以有效提高效率,了解ForkJoinPool的实现原理让我们在适合的场景,可以更好的使用它。

神州灵云二维码.png

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/dclingcloud/article/details/89792295
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-07 22:17:33
  • 阅读 ( 1097 )
  • 分类:Linux

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢