社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
1、Future的使用
结合Callable使用,使用一个 FutureTask 来包装Callable对象,而FutureTask是实现了Future和Runnable接口的,故可用Thread包装FutureTask的对象,然后调用start方法启动线程;
可见 FutureTask 是 Runnable 的子类
使用 Future 和 Callable 开启一个线程
/**
* Callalbe和Runnable的区别
*
* Runnable run方法是被线程调用的,在run方法是异步执行的
*
* Callable的call方法,不是异步执行的,是由Future的run方法调用的
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 1、创建一个 Callable线程
Callable<Integer> call = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("正在执行call...");
Thread.sleep(3000);
return 1;
}
};
// 2、使用FutureTask包装Callable线程
FutureTask<Integer> task = new FutureTask<>(call);
// 3、使用Thread 包装FutureTask来启动线程
Thread thread = new Thread(task);
thread.start(); // 启动线程
// 此时主线程可以做别的事情,之后再获取Callable线程执行完毕的返回值
System.out.println(" 主线程在运行1...");
Integer result = task.get(); // 注:在拿结果的时候会阻塞当前线程
System.out.println(" 主线程在运行2...");
System.out.println("拿到call的结果为:" + result);
}
* Callalbe和Runnable的区别
* Runnable run方法是被线程调用的,在run方法是异步执行的
* Callable的call方法,不是异步执行的,是由Future的run方法调用的
FutureTask的源码
由于FutureTask是由Thread包装并用start启动的线程,故启动了FutureTask的run方法
run方法中通过 Callable的对象调用了call方法
get方法中通过FutureTask的状态值来对当前线程做yield、wait或return操作
FutureTask 部分源码解析
public class FutureTask<V> implements RunnableFuture<V> {
// 几个状态值
private volatile int state;
private static final int NEW = 0; // 新建线程
private static final int COMPLETING = 1; // 正在执行线程
private static final int NORMAL = 2; // 正常执行完线程
private static final int EXCEPTIONAL = 3; // 异常线程
private static final int CANCELLED = 4; // 取消线程
private static final int INTERRUPTING = 5; // 正在中断线程
private static final int INTERRUPTED = 6; // 已中断线程
// 1、构造方法
public FutureTask(Callable<V> callable) { // 传入一个Callable对象
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 设置状态值
}
// 2、Thread启动FutureTask的方法
public void run() {
if (state != NEW || // 判断线程是否为 NEW 状态 ,若不是
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 直接调用Callable的call方法
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 执行异常时设置异常
}
if (ran) // 执行成功
set(result); // 设置返回值
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 3、更新线程状态值
protected void set(V v) {
// call(); 执行完后,将线程设置为完成中状态、拿到call的返回值、唤醒 get 的线程
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion(); // 唤醒 调用get的线程
}
}
// get 方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 若FutureTask线程处于未完成状态,则wait调用get的线程
s = awaitDone(false, 0L); // awaitDone方法中,会判断FutureTask线程的执行情况而进行yield或wait操作
return report(s); // 返回FutureTask执行call的返回值
}
}
1、什么是Fork/Join框架
- Fork/Join框架是java 7 提供的一个用于并行执行任务的框架,是一个把大任务分隔成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架;
- 多线程的目的不仅是提高程序运行的性能,还可以使程序充分利用CPU资源
2、工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。为什么要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。然而,有的线程会把自己队列的任务先完成,然后去窃取别的队列的任务,这时为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争;
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列;
3、Fork/Join框架的设计
3.1、步骤
步骤1:分割任务。用Fork类把一个大任务分割成子任务,有可能子任务还是很大。所以还需要不停地分割,直到分割的子任务足够小。
步骤2:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情
ForkJoinTask类:要使用ForkJoin框架,必须要创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。一般使用它的子类:RecursiveAction(没有返回值);RecursiveTask(有返回值)
ForkJoinPool类:ForkJoinTask需要通过ForkJoinPool来执行
3.2、举个栗子:计算 1 + 2 + 3 + ... + 100
ForkJoinTask的父子类关系
public class Demo extends RecursiveTask<Integer> {
private int begin;
private int end;
public Demo(int begin, int end) {
this.begin = begin;
this.end = end;
}
/**
* RecursiveTask封装好的一个任务类
* 在这里分割任务、执行任务并合并结果
* @return
*/
@Override
protected Integer compute() {
System.out.println(Thread.currentThread().getName() + " ... ");
int sum = 0;
// 拆分任务; 拆分条件:当每个任务执行的运算个数小于等于3个时才开始运算
if (end - begin <= 2) {
// 计算
for (int i = begin; i <= end; i++) {
sum += i;
}
} else {
// 拆分
Demo d1 = new Demo(begin, (begin + end) / 2);
Demo d2 = new Demo((begin + end)/2 + 1, end);
// 执行任务
d1.fork(); // 再次来到 compute方法中进行“递归”拆分
d2.fork();
// 合并结果; join线程加塞
Integer a = d1.join();
Integer b = d2.join();
sum = a + b;
}
return sum;
}
public static void main(String[] args) throws Exception {
// 创建一个执行 ForkJoinTask任务的 ForkJoinPool 的实例;参数表示线程的个数
ForkJoinPool pool = new ForkJoinPool(3);
// 提交并执行一个任务;参数为ForkJoinTask的实例
Future<Integer> future = pool.submit(new Demo(1, 1000000000));
System.out.println("计算的值为:" + future.get());
}
}
3.3、Fork/Join框架的实现原理
ForkJoinPool 由 ForkJoinTask数组 和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放程序提交给ForkJoinPool 的任务,而 ForkJoinWorkerThread 数组负责执行这些任务。
ForkJoinTask 的 fork 方法实现原理
当我们调用 ForkJoinTask 的 fork() 方法时,程序会调用 ForkJoinWorkerThread 的 push() 方法异步地执行这个任务,然后返回结果
push() 方法把当前任务存放在 ForkJoinTask 数组队列里。然后调用 ForkJoinPool 的 signalWork() 方法唤醒或创建一个工作线程来执行任务。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this); // 调用 ForkJoinWorkerThread 的 push方法异步的执行这个任务
else
ForkJoinPool.common.externalPush(this);
return this;
}
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this); // signalWork 唤醒或创建一个工作线程来执行任务
}
else if (n >= m)
growArray();
}
}
ForkJoinTask 的 join() 方法实现原理
join() 方法的主要作用是阻塞当前线程并等待获取结果。
首先它调用了 doJoin() 方法,通过 doJoin() 方法得到当前任务的状态来判断返回什么结果,任务状态有4中:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult(); // 获取结果
}
public final V getRawResult() {
return result;
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// 根据状态值返回不同的结果
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!