如果一个任务需要返回执行结果,一般我们会实现一个Callable任务,并创建一个线程来执行任务。对于执行时间比较长的任务,显然我们同步的等待结果再去执行后续的业务是不现实的,那么,Future模式是怎样解决这个问题的呢?
Future模式,可以让调用方立即返回,然后它自己会在后面慢慢处理,此时调用者拿到的仅仅是一个凭证,调用者可以先去处理其它任务,在真正需要用到调用结果的场合,再使用凭证去获取调用结果。这个凭证就是这里的Future。
Future接口的定义:
public interface Future<V> {
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否取消
boolean isCancelled();
// 标记任务是否执行完成
boolean isDone();
// 阻塞获取任务结果
V get() throws InterruptedException, ExecutionException;
// 超时获取任务结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1、NEW:表示任务的初始化状态;
2、COMPLETING:表示任务已执行完成(正常完成或异常完成),但任务结果或异常原因还未设置完成,属于中间状态;
3、NORMAL:表示任务已经执行完成(正常完成),且任务结果已设置完成,属于最终状态;
4、EXCEPTIONAL:表示任务已经执行完成(异常完成),且任务异常已设置完成,属于最终状态;
5、CANCELLED:表示任务还没开始执行就被取消(非中断方式),属于最终状态;
6、INTERRUPTING:表示任务还没开始执行就被取消(中断方式),正式被中断前的过渡状态,属于中间状态;
7、INTERRUPTED:表示任务还没开始执行就被取消(中断方式),且已被中断,属于最终状态。
各个状态之间的流转:
FutureTask在构造时可以接受Runnable或Callable任务,如果是Runnable,则最终包装成Callable:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 包装Runnable成为Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
private volatile int state;//任务状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable; // 真正的任务
private volatile Thread runner; // 保存正在执行任务的线程
/**
* 记录结果或异常
*/
private Object outcome;
/**
* 无锁栈(Treiber stack)
* 保存等待线程
*/
private volatile WaitNode waiters;
当调用FutureTask的get方法时,如果任务没有完成,则调用线程会被阻塞,其实就是将线程包装成WaitNode结点保存到waiters指向的栈中。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
public void run() {
// 仅当任务为NEW状态时, 才能执行任务
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//设置异常
setException(ex);
}
if (ran)
//设置任务执行结果outcome
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set方法:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//存储结果值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
// 仅NEW状态下可以取消任务
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) { // 中断任务
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//释放所有在栈上等待的线程
finishCompletion();
}
return true;
}
任务取消后,最终调用finishCompletion方法,释放所有在栈上等待的线程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) { //自旋释放所有等待线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒线程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
FutureTask可以通过get方法获取任务结果,如果需要限时等待,可以调用get(long timeout, TimeUnit unit)
public V get() throws InterruptedException, ExecutionException {
int s = state;
//当前任务的状态是NEW或COMPLETING,会调用awaitDone阻塞线程
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s); // 任务执行结果
}
/**
* 返回执行结果.
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V) x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
1、ScheduledFutureTask在普通FutureTask的基础上增加了周期执行/延迟执行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor这个线程池的默认调度任务类,通过继承FutureTask和Delayed接口来实现周期/延迟功能的。
public void run() {
// 是否是周期任务
boolean periodic = isPeriodic();
//// 能否运行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 非周期任务:调用FutureTask的run方法运行
ScheduledFutureTask.super.run();
// 周期任务:调用FutureTask的runAndReset方法运行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
FutureTask的runAndReset方法与run方法的区别就是当任务正常执行完成后,不会设置任务的最终状态(即保持NEW状态),以便任务重复执行:
protected boolean runAndReset() {
// 仅NEW状态的任务可以执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); //不设置执行结果
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;//重新设置任务状态为NEW,继续重复执行
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。