


public interface Callable<V> {
    V call() throws Exception;
}


public interface Runnable {
    public abstract void run();
}


public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

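Callable, Runnable, and Future are all interfaces designed for asynchronous execution. Callable differs from Runnable in that call() returns a value and may throw a checked exception, whereas run() returns nothing and cannot throw checked exceptions. Future lets you check whether a task has finished or been cancelled, cancel the task, and retrieve its result.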
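To make the difference concrete, here is a minimal sketch that calls both interfaces directly on the current thread. The class name CallableVsRunnable and its helper methods are made up for illustration and are not part of the JDK.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical demo class; only Runnable and Callable come from the JDK.
class CallableVsRunnable {
    // Runnable has no return value, so a result must travel through shared state.
    static int viaRunnable() {
        int[] holder = new int[1];
        Runnable r = () -> holder[0] = 6 * 7;
        r.run();
        return holder[0];
    }

    // Callable returns its result directly from call().
    static int viaCallable() {
        Callable<Integer> c = () -> 6 * 7;
        try {
            return c.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // call() may throw a checked exception; run() could not declare one.
    static String failingCallable() {
        Callable<Integer> c = () -> { throw new IOException("disk unavailable"); };
        try {
            c.call();
            return "no exception";
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaRunnable());
        System.out.println(viaCallable());
        System.out.println(failingCallable());
    }
}
```

Neither call here starts a thread; the sketch only contrasts the two method signatures.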



Runnable runnable = new Runnable() {
    @Override
    public void run() {
        // do business job
        System.out.println("thread run = " + Math.random());
    }
};
new Thread(runnable).start();
// new Thread(runnable).run();
System.out.println("done");



/* What will be run. */
private Runnable target;

public Thread(Runnable target) {
    init(null, target, "Thread-" + nextThreadNum(), 0);
}

public void run() {
    if (target != null) {
        target.run();
    }
}

/**
 * Causes this thread to begin execution; the Java Virtual Machine
 * calls the <code>run</code> method of this thread.
 */
public synchronized void start() {
    if (threadStatus != 0)
        throw new IllegalThreadStateException();
    group.add(this);
    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
        }
    }
}

private native void start0();
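The Thread source above suggests why the commented-out run() call in the earlier example behaves differently from start(): run() is an ordinary method that just invokes target.run() on the caller's thread, while start() asks the JVM to create a new thread. A small sketch to observe this, where StartVsRun and executedBy are hypothetical names:

```java
// Hypothetical demo class showing which thread executes the Runnable body.
class StartVsRun {
    // Returns the name of the thread that actually ran the Runnable.
    static String executedBy(boolean useStart) {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = Thread.currentThread().getName(), "worker");
        if (useStart) {
            t.start(); // a new thread named "worker" runs the body
            try {
                t.join(); // wait so the write to seen[0] is visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            t.run(); // plain method call: the body runs on the calling thread
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("start(): " + executedBy(true));
        System.out.println("run():   " + executedBy(false));
    }
}
```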



Callable<Double> callable = new Callable<Double>() {
    @Override
    public Double call() throws Exception {
        // do business job
        return Math.random();
    }
};
FutureTask<Double> future = new FutureTask<>(callable);
new Thread(future).start();
// do business job
System.out.println("future result = " + future.get());
System.out.println("future result = " + future.get(100, TimeUnit.MILLISECONDS));

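Thread only accepts a Runnable, so a Callable has to be used together with a Future: declare a Callable instance, wrap it in a FutureTask, and hand that FutureTask to a Thread to execute. When future.get() is called, it returns the result immediately if the task has already finished; otherwise it blocks the calling thread until the task completes and returns its result. future.get(long, TimeUnit) additionally puts a time limit on the wait and throws TimeoutException once the limit expires.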
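The timed get() declares three checked exceptions, and each one calls for a different reaction. The following is a sketch of one reasonable way to handle them; TimedGet, fetch, and the choice to cancel the task on timeout are assumptions made for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class around FutureTask.get(long, TimeUnit).
class TimedGet {
    // Runs a task that takes taskMillis and waits at most timeoutMillis for it.
    static String fetch(long taskMillis, long timeoutMillis) {
        FutureTask<String> future = new FutureTask<>(() -> {
            Thread.sleep(taskMillis); // stands in for the business job
            return "ok";
        });
        Thread worker = new Thread(future);
        worker.setDaemon(true); // do not keep the JVM alive for a stuck task
        worker.start();
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up and interrupt the worker
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause(); // the task itself threw
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(10, 2000));
        System.out.println(fetch(2000, 50));
    }
}
```

Whether a timed-out task should be cancelled depends on the application; get() itself leaves the task running.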


Thread(Runnable target) {...}
Thread(Runnable target, AccessControlContext acc) {...}
Thread(Runnable target, String name) {...}


public class FutureTask<V> implements RunnableFuture<V> {
    ...
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }
    ...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
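Because RunnableFuture extends both Runnable and Future, a single FutureTask object can be handed to anything that executes a Runnable and then queried as a Future. A sketch under that reading; FutureTaskRoles and its methods are hypothetical, and the ExecutorService is an addition that does not appear in the original examples:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: one FutureTask object plays both roles.
class FutureTaskRoles {
    // Runnable role on the calling thread, then Future role to read the result.
    static int runInline() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        task.run();
        return getQuietly(task);
    }

    // Same object types, but executed by a pool thread instead of the caller.
    static int runOnExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
            pool.execute(task); // Executor.execute accepts any Runnable
            return getQuietly(task);
        } finally {
            pool.shutdown();
        }
    }

    private static int getQuietly(Future<Integer> f) {
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runInline());
        System.out.println(runOnExecutor());
    }
}
```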


Reading the Future source code



/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;


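  1. NEW (initial state, also held while the task runs) -> COMPLETING (the outcome is being set) -> NORMAL (completed normally)
  2. NEW -> COMPLETING (the outcome is being set) -> EXCEPTIONAL (the task threw an exception)
  3. NEW -> CANCELLED (cancelled through cancel(false), without interrupting the runner)
  4. NEW -> INTERRUPTING (cancel(true) is interrupting the runner) -> INTERRUPTED (cancelled, interrupt delivered)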
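The state field is private, so from outside these paths can only be observed through isDone() and isCancelled(). A small sketch that drives a task down three of the paths and prints the two flags; ObservableStates and its method names are invented for illustration.

```java
import java.util.concurrent.FutureTask;

// Hypothetical demo class: observes FutureTask states through its public flags.
class ObservableStates {
    static String flags(FutureTask<?> t) {
        return "done=" + t.isDone() + " cancelled=" + t.isCancelled();
    }

    // Still NEW: never run, never cancelled.
    static String fresh() {
        FutureTask<Integer> t = new FutureTask<Integer>(() -> 1);
        return flags(t);
    }

    // NEW -> COMPLETING -> NORMAL
    static String normal() {
        FutureTask<Integer> t = new FutureTask<Integer>(() -> 1);
        t.run();
        return flags(t);
    }

    // NEW -> COMPLETING -> EXCEPTIONAL (run() swallows the exception into outcome)
    static String exceptional() {
        FutureTask<Integer> t = new FutureTask<Integer>(() -> {
            throw new IllegalStateException("boom");
        });
        t.run();
        return flags(t);
    }

    // NEW -> CANCELLED
    static String cancelled() {
        FutureTask<Integer> t = new FutureTask<Integer>(() -> 1);
        t.cancel(false);
        return flags(t);
    }

    public static void main(String[] args) {
        System.out.println("fresh:       " + fresh());
        System.out.println("normal:      " + normal());
        System.out.println("exceptional: " + exceptional());
        System.out.println("cancelled:   " + cancelled());
    }
}
```

Note that isDone() reports true for every state other than NEW, so a cancelled or failed task also counts as done.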


/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

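callable is the Callable object that actually gets run, outcome is the variable that stores the result of the run (or the exception it threw), and waiters is a linked list in which each node holds a reference to the next one. The source comment calls this structure a Treiber stack; it is explained further below.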
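A Treiber stack is a lock-free stack whose head pointer is updated with compare-and-swap. The sketch below shows the idea in isolation using AtomicReference; it is a generic illustration, not the code FutureTask uses (FutureTask applies Unsafe CAS directly to its waiters field), and the class name TreiberStack is chosen here for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal generic Treiber stack; illustrative only.
class TreiberStack<T> {
    private static final class Node<T> {
        final T value;
        Node<T> next;
        Node(T value) { this.value = value; }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    // Link the new node in front of the current head, retrying if another
    // thread changed the head in the meantime.
    void push(T value) {
        Node<T> node = new Node<>(value);
        do {
            node.next = head.get();
        } while (!head.compareAndSet(node.next, node));
    }

    // Swing the head to its successor; returns null when the stack is empty.
    T pop() {
        Node<T> current;
        do {
            current = head.get();
            if (current == null) {
                return null;
            }
        } while (!head.compareAndSet(current, current.next));
        return current.value;
    }

    public static void main(String[] args) {
        TreiberStack<String> stack = new TreiberStack<>();
        stack.push("a");
        stack.push("b");
        System.out.println(stack.pop()); // last pushed comes out first
        System.out.println(stack.pop());
    }
}
```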



public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}



public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
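The three branches of report() map directly onto what a caller of get() sees: a value, a CancellationException, or an ExecutionException wrapping whatever the task threw. A sketch that triggers each branch, with ReportOutcomes and its helper names being hypothetical:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: one method per branch of FutureTask.report().
class ReportOutcomes {
    static String outcome(FutureTask<String> task) {
        try {
            return "value: " + task.get();
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // state == NORMAL: outcome holds the result.
    static String normal() {
        FutureTask<String> t = new FutureTask<String>(() -> "hello");
        t.run();
        return outcome(t);
    }

    // state == EXCEPTIONAL: outcome holds the Throwable.
    static String exceptional() {
        FutureTask<String> t = new FutureTask<String>(() -> {
            throw new IllegalArgumentException("bad input");
        });
        t.run();
        return outcome(t);
    }

    // state >= CANCELLED: get() throws CancellationException.
    static String cancelled() {
        FutureTask<String> t = new FutureTask<String>(() -> "never returned");
        t.cancel(false);
        return outcome(t);
    }

    public static void main(String[] args) {
        System.out.println(normal());
        System.out.println(exceptional());
        System.out.println(cancelled());
    }
}
```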



private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}


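  1. If the task state is greater than COMPLETING, set the thread field of q (the WaitNode) to null and return the state.
  2. If the task state equals COMPLETING, call Thread.yield() to give up the current thread's CPU time slice.
  3. If q == null, create a new WaitNode.
  4. If queued == false (the node is not yet in the wait queue), use a CAS operation to install the WaitNode created in the previous step as the head of the waiters list.
  5. If a timeout applies, check whether it has expired: if it has, remove the node from the waiters list and return the current state; if it has not, call LockSupport.parkNanos() to block the thread.
  6. If none of the above applies, call LockSupport.park() to block the thread.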


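  • The first iteration takes the q == null branch and creates a WaitNode.
  • The second iteration takes the !queued branch and makes the WaitNode just created the head of the waiters list. A WaitNode stores a reference to a thread and a reference to the next WaitNode, so the list is singly linked.
  • The third iteration reaches LockSupport.park*() and blocks the thread.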
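The create, enqueue, park sequence above can be reproduced in a much smaller class. The sketch below is a simplified one-shot signal written for this explanation, not FutureTask itself: it uses AtomicReference instead of Unsafe, a boolean instead of the state machine, has no timeout, and does not unlink interrupted waiters (FutureTask does that in removeWaiter). All names in it are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Simplified, hypothetical one-shot signal modelled on awaitDone/finishCompletion.
class OneShotSignal {
    private static final class WaitNode {
        volatile Thread thread = Thread.currentThread();
        volatile WaitNode next;
    }

    private volatile boolean done;
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();

    // Blocks until complete() has been called.
    void await() throws InterruptedException {
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                if (q != null) q.thread = null; // node stays linked but is skipped
                throw new InterruptedException();
            }
            if (done) {
                if (q != null) q.thread = null;
                return;
            } else if (q == null) {
                q = new WaitNode();              // first pass: create the node
            } else if (!queued) {
                q.next = waiters.get();          // second pass: push onto the stack
                queued = waiters.compareAndSet(q.next, q);
            } else {
                LockSupport.park(this);          // third pass: block until unparked
            }
        }
    }

    // Marks completion, detaches the whole stack, and wakes every waiter.
    void complete() {
        done = true;
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
            Thread t = q.thread;
            if (t != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

    // Starts a waiter, completes the signal, and reports whether the waiter woke up.
    static boolean demo() {
        OneShotSignal signal = new OneShotSignal();
        boolean[] woke = new boolean[1];
        Thread waiter = new Thread(() -> {
            try {
                signal.await();
                woke[0] = true;
            } catch (InterruptedException ignored) {
            }
        });
        waiter.start();
        try {
            Thread.sleep(50); // give the waiter time to park
            signal.complete();
            waiter.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return woke[0];
    }
}
```

The loop rechecks done after enqueueing, so a waiter that pushes its node just after complete() has drained the stack still returns instead of parking forever.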




protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}


private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

protected void done() { }
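finishCompletion() ends by calling done(), which is empty by default and exists as a hook for subclasses. A sketch of how a subclass might use it to get a completion callback; DoneHook and the event strings are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: overrides the protected done() hook of FutureTask.
class DoneHook {
    // Runs a task on the calling thread and records the order of events.
    static List<String> events() {
        List<String> log = new ArrayList<>();
        FutureTask<Integer> task = new FutureTask<Integer>(() -> {
            log.add("call");
            return 7;
        }) {
            @Override
            protected void done() {
                // Called from finishCompletion() once the task has reached a final state.
                log.add("done, cancelled=" + isCancelled());
            }
        };
        task.run();
        log.add("after run");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(events());
    }
}
```

Since cancel() also goes through finishCompletion(), the hook fires for cancelled tasks as well, which is why the sketch logs isCancelled().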




