java - 多线程 05 - 并发容器和框架 - Future 和 Fork/Join - Go语言中文社区

java - 多线程 05 - 并发容器和框架 - Future 和 Fork/Join


一、线程的提前完成任务

 

 

    1、Future的使用

  • 结合Callable使用,使用一个 FutureTask 来包装Callable对象,而FutureTask是实现了Future和Runnable接口的,故可用Thread包装FutureTask的对象,然后调用start方法启动线程;

可见 FutureTask 是 Runnable 的子类

使用 Future 和 Callable 开启一个线程

/**
* Callalbe和Runnable的区别
*
* Runnable run方法是被线程调用的,在run方法是异步执行的
*
* Callable的call方法,不是异步执行的,是由Future的run方法调用的
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
    // 1、创建一个 Callable线程
    Callable<Integer> call = new Callable<Integer>() {

        @Override
        public Integer call() throws Exception {
            System.out.println("正在执行call...");
            Thread.sleep(3000);
            return 1;
        }
    };
    // 2、使用FutureTask包装Callable线程
    FutureTask<Integer> task = new FutureTask<>(call);

    // 3、使用Thread 包装FutureTask来启动线程
    Thread thread = new Thread(task);
    thread.start();  // 启动线程

    // 此时主线程可以做别的事情,之后再获取Callable线程执行完毕的返回值
    System.out.println(" 主线程在运行1...");
    Integer result = task.get();  // 注:在拿结果的时候会阻塞当前线程
    System.out.println(" 主线程在运行2...");

    System.out.println("拿到call的结果为:" + result);

}

* Callalbe和Runnable的区别

* Runnable run方法是被线程调用的,在run方法是异步执行的

* Callable的call方法,不是异步执行的,是由Future的run方法调用的

  • FutureTask的源码

    • 由于FutureTask是由Thread包装并用start启动的线程,故启动了FutureTask的run方法

    • run方法中通过 Callable的对象调用了call方法

    • get方法中通过FutureTask的状态值来对当前线程做yield、wait或return操作

FutureTask 部分源码解析 

public class FutureTask<V> implements RunnableFuture<V> {
    // 几个状态值
    private volatile int state;
    private static final int NEW          = 0; // 新建线程
    private static final int COMPLETING   = 1; // 正在执行线程
    private static final int NORMAL       = 2; // 正常执行完线程
    private static final int EXCEPTIONAL  = 3; // 异常线程
    private static final int CANCELLED    = 4; // 取消线程
    private static final int INTERRUPTING = 5; // 正在中断线程
    private static final int INTERRUPTED  = 6; // 已中断线程
        // 1、构造方法
    public FutureTask(Callable<V> callable) { // 传入一个Callable对象
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // 设置状态值
    }

    // 2、Thread启动FutureTask的方法
    public void run() {
        if (state != NEW ||  // 判断线程是否为 NEW 状态 ,若不是
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); // 直接调用Callable的call方法
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // 执行异常时设置异常
                }
                if (ran) // 执行成功
                    set(result); // 设置返回值
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    // 3、更新线程状态值
    protected void set(V v) {
        // call(); 执行完后,将线程设置为完成中状态、拿到call的返回值、唤醒 get 的线程
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion(); // 唤醒 调用get的线程
        }
    }


    // get 方法
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) // 若FutureTask线程处于未完成状态,则wait调用get的线程
            s = awaitDone(false, 0L); // awaitDone方法中,会判断FutureTask线程的执行情况而进行yield或wait操作
        return report(s); // 返回FutureTask执行call的返回值
    }
}

 

二、Fork/Join框架

    1、什么是Fork/Join框架

        - Fork/Join框架是java 7 提供的一个用于并行执行任务的框架,是一个把大任务分隔成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架;

        - 多线程的目的不仅是提高程序运行的性能,还可以使程序充分利用CPU资源

 

    2、工作窃取算法

        工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。为什么要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。然而,有的线程会把自己队列的任务先完成,然后去窃取别的队列的任务,这时为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

  • 工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争;

  • 工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列;

 

    3、Fork/Join框架的设计

        3.1、步骤

    • 步骤1:分割任务。用Fork类把一个大任务分割成子任务,有可能子任务还是很大。所以还需要不停地分割,直到分割的子任务足够小。

    • 步骤2:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

    • Fork/Join使用两个类来完成以上两件事情

      • ForkJoinTask类:要使用ForkJoin框架,必须要创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。一般使用它的子类:RecursiveAction(没有返回值);RecursiveTask(有返回值)

      • ForkJoinPool类:ForkJoinTask需要通过ForkJoinPool来执行

        3.2、举个栗子:计算 1 + 2 + 3 + ... + 100

ForkJoinTask的父子类关系

public class Demo extends RecursiveTask<Integer> {

   private int begin;
   private int end;

   public Demo(int begin, int end) {
      this.begin = begin;
      this.end = end;
   }

   /**
    *  RecursiveTask封装好的一个任务类
    *     在这里分割任务、执行任务并合并结果
    * @return
    */
   @Override
   protected Integer compute() {
      System.out.println(Thread.currentThread().getName() + " ... ");
      
      int sum = 0;
      // 拆分任务; 拆分条件:当每个任务执行的运算个数小于等于3个时才开始运算
      if (end - begin <= 2) {
         // 计算
         for (int i = begin; i <= end; i++) {
            sum += i;
         }
      } else {
         // 拆分
         Demo d1 = new Demo(begin, (begin + end) / 2);
         Demo d2 = new Demo((begin + end)/2 + 1, end);
         
         // 执行任务
         d1.fork(); // 再次来到 compute方法中进行“递归”拆分
         d2.fork();
         // 合并结果; join线程加塞
         Integer a = d1.join();
         Integer b = d2.join();
         
         sum = a + b;
      }

      return sum;
   }

   public static void main(String[] args) throws Exception {
      // 创建一个执行 ForkJoinTask任务的 ForkJoinPool 的实例;参数表示线程的个数
      ForkJoinPool pool = new ForkJoinPool(3);
      // 提交并执行一个任务;参数为ForkJoinTask的实例
      Future<Integer> future = pool.submit(new Demo(1, 1000000000));

      System.out.println("计算的值为:" + future.get());
   }

}

 

    3.3、Fork/Join框架的实现原理

        ForkJoinPool 由 ForkJoinTask数组 和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放程序提交给ForkJoinPool 的任务,而 ForkJoinWorkerThread 数组负责执行这些任务。

  • ForkJoinTask 的 fork 方法实现原理

    • 当我们调用 ForkJoinTask 的 fork() 方法时,程序会调用 ForkJoinWorkerThread 的 push() 方法异步地执行这个任务,然后返回结果

    • push() 方法把当前任务存放在 ForkJoinTask 数组队列里。然后调用 ForkJoinPool 的 signalWork() 方法唤醒或创建一个工作线程来执行任务。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);  // 调用 ForkJoinWorkerThread 的 push方法异步的执行这个任务
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}


final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);  // signalWork 唤醒或创建一个工作线程来执行任务
        }
        else if (n >= m)
            growArray();
    }
}
  • ForkJoinTask 的 join() 方法实现原理

    • join() 方法的主要作用是阻塞当前线程并等待获取结果。

    • 首先它调用了 doJoin()  方法,通过 doJoin()  方法得到当前任务的状态来判断返回什么结果,任务状态有4中:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult(); // 获取结果
}

public final V getRawResult() {
    return result;
}


private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 根据状态值返回不同的结果
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

 

有啥不正确的还望大佬指正,谢谢

 

 

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_40493969/article/details/86613491
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-07 22:17:38
  • 阅读 ( 803 )
  • 分类:Linux

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢