目录

1、join异步阻塞

2、Callable接口和TaskFuture类的异步执行

3、Guava的异步回调

4、Guava异步回调的底层原理


1、join异步阻塞

主线程中调用子线程的join方法,主线程被阻塞,一直等到子线程返回才能往下执行;

当然你可以设置超时时间来控制阻塞的时常,但是这种方法不知道子线程的执行结果,无法控制子任务的执行结果,换句话说就是老板不知道员工干活的结果,结果失控。

2、Callable接口和TaskFuture类的异步执行

这是一次伟大的改进,但是伟大的不彻底,这一次老板可以掌控员工干活的结果了。

我们将任务封装在Callable接口的call方法中,然后将实现了Callable接口的任务类的实例作为参数传递给FutureTask(Callable t) f,将f作为参数传递给new Thread(f),最后可以通过FutureTask.get()方法获取任务执行结果。

这个方法虽然可以监管子程序的任务执行结果,成功了奖励,失败来了惩罚,对吧!

但是,这个方案有个缺点,就是老板在使用FutureTask.get()自己被阻塞了,如果这个方法不返回,那么老板就一直在等这个结果,不能抽身去干别的!

3、Guava的异步回调

这时候,我们优秀的Guava就闪亮登场了!

这个设计方案中,老板不会为获取员工的行为结果而”僵化“,他还是一个自由的老板!

先来看一下Guava的异步回调怎么使用吧!我写了一个例子如下:

Pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>ox-new-bc</artifactId><groupId>com.miller</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>future-ox</artifactId><dependencies><!-- https://mvnrepository.com/artifact/com.google.guava/guava --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version></dependency></dependencies></project>

测试的dome是这样的:


import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @program: oxnewbc* @description: guava的测试类* @author: Miller.FAN* @create: 2021-03-22 11:22**/
public class GuavaTest {//全局变量定义static Boolean hotB= false;static Boolean washC = false;//创建java线程池private static ExecutorService jPool = Executors.newFixedThreadPool(10);//构造guava线程池private static ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);//新建一个烧水任务private static HotWater hotWater = new HotWater();//新建一个洗茶的任务private static WashTea washTea = new WashTea();//新建一个主任务public static MainTask mainTask = new MainTask();//提交任务private static ListenableFuture<Boolean> hotFuture = gPool.submit(hotWater);public static void main(String[] args) {//绑定异步回调的实例到异步任务上Futures.addCallback(hotFuture, new FutureCallback<Boolean>() {public void onSuccess(@Nullable Boolean aBoolean) {if (aBoolean)hotB = true;}public void onFailure(Throwable throwable) {System.out.println("烧水失败,不能喝茶了");}},jPool);ListenableFuture<Boolean> washFuture = gPool.submit(washTea);Futures.addCallback(washFuture, new FutureCallback<Boolean>() {public void onSuccess(@Nullable Boolean aBoolean) {if (aBoolean)washC = true;}public void onFailure(Throwable throwable) {System.out.println("洗茶失败,不能喝茶了");}},jPool);//mainTask.run();}//烧水的任务static class HotWater implements Callable<Boolean> {public Boolean call() throws Exception {Thread.sleep(5000);System.out.println("烧水,烧水!");return true;}}//洗茶的任务static class WashTea implements Callable<Boolean> {public Boolean call() throws Exception {Thread.sleep(3000);System.out.println("洗茶,洗茶!");return true;}}//主任务static class MainTask implements Runnable{public void run() {while(true) {if(hotB  && washC)System.out.println("泡茶了,泡茶了!");else {System.out.println("等待返回值,我先睡一会儿!");}try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}}}}}

执行的结果是这样的:

等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
洗茶,洗茶!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
等待返回值,我先睡一会儿!
烧水,烧水!
泡茶了,泡茶了!
泡茶了,泡茶了!
泡茶了,泡茶了!
泡茶了,泡茶了!

在烧水和洗茶完成之前,老板一直在睡觉。如果换成Future.get(),主线程就不得不等待,老板就不能睡整觉,每次询问结果都要爬起来。体会到这个模式的好处了吧!

4、Guava异步回调的底层原理

Guava异步回调的核心在于Futures.addCallback,它是一个静态方法。这里注意你使用的Guava的版本,不同的版本这个方法的实现可能有差别!至少我看到其他人写的例子是没有第三个参数的。

    public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback, Executor executor) {//这句是参数检查,我们不管Preconditions.checkNotNull(callback);//关键是这个future.addListener(new Futures.CallbackListener(future, callback), executor);}

我们看到这个静态方法里边还封装了一个方法,添加了一个监听器。

@DoNotMock("Use the methods in Futures (like immediateFuture) or SettableFuture")
public interface ListenableFuture<V> extends Future<V> {void addListener(Runnable var1, Executor var2);
}

监听器的第一个参数是:

new Futures.CallbackListener(future, callback)
//也就是我们代码里传入的第一个参数和第二个参数new Futures.CallbackListener(hotFuture, new FutureCallback<Boolean>() {public void onSuccess(@Nullable Boolean aBoolean) {if (aBoolean)hotB = true;}public void onFailure(Throwable throwable) {System.out.println("烧水失败,不能喝茶了");}})

接下来我们zh重点去看一下CallbackListener:

 private static final class CallbackListener<V> implements Runnable {final Future<V> future;final FutureCallback<? super V> callback;CallbackListener(Future<V> future, FutureCallback<? super V> callback) {this.future = future;this.callback = callback;}public void run() {if (this.future instanceof InternalFutureFailureAccess) {Throwable failure = InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess)this.future);if (failure != null) {this.callback.onFailure(failure);return;}}Object value;try {value = Futures.getDone(this.future);} catch (ExecutionException var3) {this.callback.onFailure(var3.getCause());return;} catch (Error | RuntimeException var4) {this.callback.onFailure(var4);return;}this.callback.onSuccess(value);}public String toString() {return MoreObjects.toStringHelper(this).addValue(this.callback).toString();}}

Futures的静态方法addCallback

    public static <V> void addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback, Executor executor) {Preconditions.checkNotNull(callback); //参数检查//重点关注这里future.addListener(new Futures.CallbackListener(future, callback), executor);}private static final class CallbackListener<V> implements Runnable {final Future<V> future;final FutureCallback<? super V> callback;//生成一个回调监听器的实例CallbackListener(Future<V> future, FutureCallback<? super V> callback) {this.future = future;this.callback = callback;}public void run() {if (this.future instanceof InternalFutureFailureAccess) {Throwable failure = InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess)this.future);if (failure != null) {this.callback.onFailure(failure);return;}}Object value;try {value = Futures.getDone(this.future);} catch (ExecutionException var3) {this.callback.onFailure(var3.getCause());return;} catch (Error | RuntimeException var4) {this.callback.onFailure(var4);return;}this.callback.onSuccess(value);}public String toString() {return MoreObjects.toStringHelper(this).addValue(this.callback).toString();}}

接下来跳转到:

        //该方法的位置---abstract class FluentFuture<V>public final void addListener(Runnable listener, Executor executor) {super.addListener(listener, executor);}

在进入其超类的addListener方法:

public void addListener(Runnable listener, Executor executor) {Preconditions.checkNotNull(listener, "Runnable was null.");  //参数检查Preconditions.checkNotNull(executor, "Executor was null.");  //参数检查if (!this.isDone()) { //判断异步任务是否已经被执行 ,如果没有执行就构建节点AbstractFuture.Listener oldHead = this.listeners;if (oldHead != AbstractFuture.Listener.TOMBSTONE) {//构建节点AbstractFuture.Listener newNode = new AbstractFuture.Listener(listener, executor);//节点添加到一个节点链表中do {newNode.next = oldHead;if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {return;}oldHead = this.listeners;} while(oldHead != AbstractFuture.Listener.TOMBSTONE);}}executeListener(listener, executor);  //如果异步任务已经被执行,那么就执行监听器的回调业务逻辑,执行任务的线程就是用户传入的executor中的线程}

最终执行在这里:

    private static void executeListener(Runnable runnable, Executor executor) {try {executor.execute(runnable);   //直接把任务放进执行单元执行} catch (RuntimeException var5) {Logger var10000 = log;java.util.logging.Level var10001 = java.util.logging.Level.SEVERE;String var3 = String.valueOf(runnable);String var4 = String.valueOf(executor);var10000.log(var10001, (new StringBuilder(57 + String.valueOf(var3).length() + String.valueOf(var4).length())).append("RuntimeException while executing runnable ").append(var3).append(" with executor ").append(var4).toString(), var5);}}

捋一捋,捋到这里已经找到最想要的底层逻辑到底在什么地方了,接下来来研究一下AbstractFuture


import sun.misc.Unsafe;@GwtCompatible(emulated = true
)
@ReflectionSupport(Level.FULL)
public abstract class AbstractFuture<V> extends InternalFutureFailureAccess implements ListenableFuture<V> {private static final boolean GENERATE_CANCELLATION_CAUSES;private static final Logger log;private static final long SPIN_THRESHOLD_NANOS = 1000L;private static final AbstractFuture.AtomicHelper ATOMIC_HELPER;private static final Object NULL;@Nullableprivate volatile Object value;@Nullableprivate volatile AbstractFuture.Listener listeners;@Nullableprivate volatile AbstractFuture.Waiter waiters;private void removeWaiter(AbstractFuture.Waiter node) {node.thread = null;label28:while(true) {AbstractFuture.Waiter pred = null;AbstractFuture.Waiter curr = this.waiters;if (curr == AbstractFuture.Waiter.TOMBSTONE) {return;}AbstractFuture.Waiter succ;for(; curr != null; curr = succ) {succ = curr.next;if (curr.thread != null) {pred = curr;} else if (pred != null) {pred.next = succ;if (pred.thread == null) {continue label28;}} else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) {continue label28;}}return;}}protected AbstractFuture() {}@CanIgnoreReturnValuepublic V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {long timeoutNanos = unit.toNanos(timeout);long remainingNanos = timeoutNanos;if (Thread.interrupted()) {throw new InterruptedException();} else {Object localValue = this.value;if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {return this.getDoneValue(localValue);} else {long endNanos = timeoutNanos > 0L ? System.nanoTime() + timeoutNanos : 0L;if (timeoutNanos >= 1000L) {label136: {AbstractFuture.Waiter oldHead = this.waiters;if (oldHead != AbstractFuture.Waiter.TOMBSTONE) {AbstractFuture.Waiter node = new AbstractFuture.Waiter();do {node.setNext(oldHead);if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {do {OverflowAvoidingLockSupport.parkNanos(this, remainingNanos);if (Thread.interrupted()) {this.removeWaiter(node);throw new InterruptedException();}localValue = this.value;if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {return this.getDoneValue(localValue);}remainingNanos = endNanos - System.nanoTime();} while(remainingNanos >= 1000L);this.removeWaiter(node);break label136;}oldHead = this.waiters;} while(oldHead != AbstractFuture.Waiter.TOMBSTONE);}return this.getDoneValue(this.value);}}while(remainingNanos > 0L) {localValue = this.value;if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {return this.getDoneValue(localValue);}if (Thread.interrupted()) {throw new InterruptedException();}remainingNanos = endNanos - System.nanoTime();}String futureToString = this.toString();String unitString = unit.toString().toLowerCase(Locale.ROOT);String var14 = unit.toString().toLowerCase(Locale.ROOT);String message = (new StringBuilder(28 + String.valueOf(var14).length())).append("Waited ").append(timeout).append(" ").append(var14).toString();if (remainingNanos + 1000L < 0L) {message = String.valueOf(message).concat(" (plus ");long overWaitNanos = -remainingNanos;long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS);long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits);boolean shouldShowExtraNanos = overWaitUnits == 0L || overWaitLeftoverNanos > 1000L;String var21;if (overWaitUnits > 0L) {var21 = String.valueOf(message);message = (new StringBuilder(21 + String.valueOf(var21).length() + String.valueOf(unitString).length())).append(var21).append(overWaitUnits).append(" ").append(unitString).toString();if (shouldShowExtraNanos) {message = String.valueOf(message).concat(",");}message = String.valueOf(message).concat(" ");}if (shouldShowExtraNanos) {var21 = String.valueOf(message);message = (new StringBuilder(33 + String.valueOf(var21).length())).append(var21).append(overWaitLeftoverNanos).append(" nanoseconds ").toString();}message = String.valueOf(message).concat("delay)");}if (this.isDone()) {throw new TimeoutException(String.valueOf(message).concat(" but future completed as timeout expired"));} else {throw new TimeoutException((new StringBuilder(5 + String.valueOf(message).length() + String.valueOf(futureToString).length())).append(message).append(" for ").append(futureToString).toString());}}}}@CanIgnoreReturnValuepublic V get() throws InterruptedException, ExecutionException {if (Thread.interrupted()) {throw new InterruptedException();} else {Object localValue = this.value;if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {return this.getDoneValue(localValue);} else {AbstractFuture.Waiter oldHead = this.waiters;if (oldHead != AbstractFuture.Waiter.TOMBSTONE) {AbstractFuture.Waiter node = new AbstractFuture.Waiter();do {node.setNext(oldHead);if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {do {LockSupport.park(this);if (Thread.interrupted()) {this.removeWaiter(node);throw new InterruptedException();}localValue = this.value;} while(!(localValue != null & !(localValue instanceof AbstractFuture.SetFuture)));return this.getDoneValue(localValue);}oldHead = this.waiters;} while(oldHead != AbstractFuture.Waiter.TOMBSTONE);}return this.getDoneValue(this.value);}}}private V getDoneValue(Object obj) throws ExecutionException {if (obj instanceof AbstractFuture.Cancellation) {throw cancellationExceptionWithCause("Task was cancelled.", ((AbstractFuture.Cancellation)obj).cause);} else if (obj instanceof AbstractFuture.Failure) {throw new ExecutionException(((AbstractFuture.Failure)obj).exception);} else {return obj == NULL ? null : obj;}}public boolean isDone() {Object localValue = this.value;return localValue != null & !(localValue instanceof AbstractFuture.SetFuture);}public boolean isCancelled() {Object localValue = this.value;return localValue instanceof AbstractFuture.Cancellation;}@CanIgnoreReturnValuepublic boolean cancel(boolean mayInterruptIfRunning) {Object localValue = this.value;boolean rValue = false;if (localValue == null | localValue instanceof AbstractFuture.SetFuture) {Object valueToSet = GENERATE_CANCELLATION_CAUSES ? new AbstractFuture.Cancellation(mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) : (mayInterruptIfRunning ? AbstractFuture.Cancellation.CAUSELESS_INTERRUPTED : AbstractFuture.Cancellation.CAUSELESS_CANCELLED);AbstractFuture abstractFuture = this;while(true) {while(!ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {localValue = abstractFuture.value;if (!(localValue instanceof AbstractFuture.SetFuture)) {return rValue;}}rValue = true;if (mayInterruptIfRunning) {abstractFuture.interruptTask();}complete(abstractFuture);if (!(localValue instanceof AbstractFuture.SetFuture)) {break;}ListenableFuture<?> futureToPropagateTo = ((AbstractFuture.SetFuture)localValue).future;if (!(futureToPropagateTo instanceof AbstractFuture.Trusted)) {futureToPropagateTo.cancel(mayInterruptIfRunning);break;}AbstractFuture<?> trusted = (AbstractFuture)futureToPropagateTo;localValue = trusted.value;if (!(localValue == null | localValue instanceof AbstractFuture.SetFuture)) {break;}abstractFuture = trusted;}}return rValue;}protected void interruptTask() {}protected final boolean wasInterrupted() {Object localValue = this.value;return localValue instanceof AbstractFuture.Cancellation && ((AbstractFuture.Cancellation)localValue).wasInterrupted;}public void addListener(Runnable listener, Executor executor) {Preconditions.checkNotNull(listener, "Runnable was null.");Preconditions.checkNotNull(executor, "Executor was null.");if (!this.isDone()) {AbstractFuture.Listener oldHead = this.listeners;if (oldHead != AbstractFuture.Listener.TOMBSTONE) {AbstractFuture.Listener newNode = new AbstractFuture.Listener(listener, executor);do {newNode.next = oldHead;if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {return;}oldHead = this.listeners;} while(oldHead != AbstractFuture.Listener.TOMBSTONE);}}executeListener(listener, executor);}@CanIgnoreReturnValueprotected boolean set(@Nullable V value) {Object valueToSet = value == null ? NULL : value;if (ATOMIC_HELPER.casValue(this, (Object)null, valueToSet)) {complete(this);return true;} else {return false;}}@CanIgnoreReturnValueprotected boolean setException(Throwable throwable) {Object valueToSet = new AbstractFuture.Failure((Throwable)Preconditions.checkNotNull(throwable));if (ATOMIC_HELPER.casValue(this, (Object)null, valueToSet)) {complete(this);return true;} else {return false;}}@CanIgnoreReturnValueprotected boolean setFuture(ListenableFuture<? extends V> future) {Preconditions.checkNotNull(future);Object localValue = this.value;if (localValue == null) {if (future.isDone()) {Object value = getFutureValue(future);if (ATOMIC_HELPER.casValue(this, (Object)null, value)) {complete(this);return true;}return false;}AbstractFuture.SetFuture valueToSet = new AbstractFuture.SetFuture(this, future);if (ATOMIC_HELPER.casValue(this, (Object)null, valueToSet)) {try {future.addListener(valueToSet, DirectExecutor.INSTANCE);} catch (Throwable var8) {Throwable t = var8;AbstractFuture.Failure failure;try {failure = new AbstractFuture.Failure(t);} catch (Throwable var7) {failure = AbstractFuture.Failure.FALLBACK_INSTANCE;}ATOMIC_HELPER.casValue(this, valueToSet, failure);}return true;}localValue = this.value;}if (localValue instanceof AbstractFuture.Cancellation) {future.cancel(((AbstractFuture.Cancellation)localValue).wasInterrupted);}return false;}private static Object getFutureValue(ListenableFuture<?> future) {if (future instanceof AbstractFuture.Trusted) {Object v = ((AbstractFuture)future).value;if (v instanceof AbstractFuture.Cancellation) {AbstractFuture.Cancellation c = (AbstractFuture.Cancellation)v;if (c.wasInterrupted) {v = c.cause != null ? new AbstractFuture.Cancellation(false, c.cause) : AbstractFuture.Cancellation.CAUSELESS_CANCELLED;}}return v;} else {if (future instanceof InternalFutureFailureAccess) {Throwable throwable = InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess)future);if (throwable != null) {return new AbstractFuture.Failure(throwable);}}boolean wasCancelled = future.isCancelled();if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) {return AbstractFuture.Cancellation.CAUSELESS_CANCELLED;} else {String var3;try {Object v = getUninterruptibly(future);if (wasCancelled) {var3 = String.valueOf(future);return new AbstractFuture.Cancellation(false, new IllegalArgumentException((new StringBuilder(84 + String.valueOf(var3).length())).append("get() did not throw CancellationException, despite reporting isCancelled() == true: ").append(var3).toString()));} else {return v == null ? NULL : v;}} catch (ExecutionException var4) {if (wasCancelled) {var3 = String.valueOf(future);return new AbstractFuture.Cancellation(false, new IllegalArgumentException((new StringBuilder(84 + String.valueOf(var3).length())).append("get() did not throw CancellationException, despite reporting isCancelled() == true: ").append(var3).toString(), var4));} else {return new AbstractFuture.Failure(var4.getCause());}} catch (CancellationException var5) {if (!wasCancelled) {var3 = String.valueOf(future);return new AbstractFuture.Failure(new IllegalArgumentException((new StringBuilder(77 + String.valueOf(var3).length())).append("get() threw CancellationException, despite reporting isCancelled() == false: ").append(var3).toString(), var5));} else {return new AbstractFuture.Cancellation(false, var5);}} catch (Throwable var6) {return new AbstractFuture.Failure(var6);}}}}private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {boolean interrupted = false;try {while(true) {try {Object var2 = future.get();return var2;} catch (InterruptedException var6) {interrupted = true;}}} finally {if (interrupted) {Thread.currentThread().interrupt();}}}private static void complete(AbstractFuture<?> future) {AbstractFuture.Listener next = null;label23:while(true) {future.releaseWaiters();future.afterDone();next = future.clearListeners(next);future = null;while(next != null) {AbstractFuture.Listener curr = next;next = next.next;Runnable task = curr.task;if (task instanceof AbstractFuture.SetFuture) {AbstractFuture.SetFuture<?> setFuture = (AbstractFuture.SetFuture)task;future = setFuture.owner;if (future.value == setFuture) {Object valueToSet = getFutureValue(setFuture.future);if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {continue label23;}}} else {executeListener(task, curr.executor);}}return;}}@Beta@ForOverrideprotected void afterDone() {}@Nullableprotected final Throwable tryInternalFastPathGetFailure() {if (this instanceof AbstractFuture.Trusted) {Object obj = this.value;if (obj instanceof AbstractFuture.Failure) {return ((AbstractFuture.Failure)obj).exception;}}return null;}final void maybePropagateCancellationTo(@Nullable Future<?> related) {if (related != null & this.isCancelled()) {related.cancel(this.wasInterrupted());}}private void releaseWaiters() {AbstractFuture.Waiter head;do {head = this.waiters;} while(!ATOMIC_HELPER.casWaiters(this, head, AbstractFuture.Waiter.TOMBSTONE));for(AbstractFuture.Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) {currentWaiter.unpark();}}private AbstractFuture.Listener clearListeners(AbstractFuture.Listener onto) {AbstractFuture.Listener head;do {head = this.listeners;} while(!ATOMIC_HELPER.casListeners(this, head, AbstractFuture.Listener.TOMBSTONE));AbstractFuture.Listener reversedList;AbstractFuture.Listener tmp;for(reversedList = onto; head != null; reversedList = tmp) {tmp = head;head = head.next;tmp.next = reversedList;}return reversedList;}public String toString() {StringBuilder builder = new StringBuilder();if (this.getClass().getName().startsWith("com.google.common.util.concurrent.")) {builder.append(this.getClass().getSimpleName());} else {builder.append(this.getClass().getName());}builder.append('@').append(Integer.toHexString(System.identityHashCode(this))).append("[status=");if (this.isCancelled()) {builder.append("CANCELLED");} else if (this.isDone()) {this.addDoneString(builder);} else {this.addPendingString(builder);}return builder.append("]").toString();}@Nullableprotected String pendingToString() {if (this instanceof ScheduledFuture) {long var1 = ((ScheduledFuture)this).getDelay(TimeUnit.MILLISECONDS);return (new StringBuilder(41)).append("remaining delay=[").append(var1).append(" ms]").toString();} else {return null;}}private void addPendingString(StringBuilder builder) {int truncateLength = builder.length();builder.append("PENDING");Object localValue = this.value;if (localValue instanceof AbstractFuture.SetFuture) {builder.append(", setFuture=[");this.appendUserObject(builder, ((AbstractFuture.SetFuture)localValue).future);builder.append("]");} else {String pendingDescription;try {pendingDescription = Strings.emptyToNull(this.pendingToString());} catch (StackOverflowError | RuntimeException var7) {String var6 = String.valueOf(var7.getClass());pendingDescription = (new StringBuilder(38 + String.valueOf(var6).length())).append("Exception thrown from implementation: ").append(var6).toString();}if (pendingDescription != null) {builder.append(", info=[").append(pendingDescription).append("]");}}if (this.isDone()) {builder.delete(truncateLength, builder.length());this.addDoneString(builder);}}private void addDoneString(StringBuilder builder) {try {V value = getUninterruptibly(this);builder.append("SUCCESS, result=[");this.appendResultObject(builder, value);builder.append("]");} catch (ExecutionException var3) {builder.append("FAILURE, cause=[").append(var3.getCause()).append("]");} catch (CancellationException var4) {builder.append("CANCELLED");} catch (RuntimeException var5) {builder.append("UNKNOWN, cause=[").append(var5.getClass()).append(" thrown from get()]");}}private void appendResultObject(StringBuilder builder, Object o) {if (o == null) {builder.append("null");} else if (o == this) {builder.append("this future");} else {builder.append(o.getClass().getName()).append("@").append(Integer.toHexString(System.identityHashCode(o)));}}private void appendUserObject(StringBuilder builder, Object o) {try {if (o == this) {builder.append("this future");} else {builder.append(o);}} catch (StackOverflowError | RuntimeException var4) {builder.append("Exception thrown from implementation: ").append(var4.getClass());}}private static void executeListener(Runnable runnable, Executor executor) {try {executor.execute(runnable);} catch (RuntimeException var5) {Logger var10000 = log;java.util.logging.Level var10001 = java.util.logging.Level.SEVERE;String var3 = String.valueOf(runnable);String var4 = String.valueOf(executor);var10000.log(var10001, (new StringBuilder(57 + String.valueOf(var3).length() + String.valueOf(var4).length())).append("RuntimeException while executing runnable ").append(var3).append(" with executor ").append(var4).toString(), var5);}}private static CancellationException cancellationExceptionWithCause(@Nullable String message, @Nullable Throwable cause) {CancellationException exception = new CancellationException(message);exception.initCause(cause);return exception;}static {boolean generateCancellationCauses;try {generateCancellationCauses = Boolean.parseBoolean(System.getProperty("guava.concurrent.generate_cancellation_cause", "false"));} catch (SecurityException var7) {generateCancellationCauses = false;}GENERATE_CANCELLATION_CAUSES = generateCancellationCauses;log = Logger.getLogger(AbstractFuture.class.getName());Throwable thrownUnsafeFailure = null;Throwable thrownAtomicReferenceFieldUpdaterFailure = null;Object helper;try {helper = new AbstractFuture.UnsafeAtomicHelper();} catch (Throwable var6) {thrownUnsafeFailure = var6;try {helper = new AbstractFuture.SafeAtomicHelper(AtomicReferenceFieldUpdater.newUpdater(AbstractFuture.Waiter.class, Thread.class, "thread"), AtomicReferenceFieldUpdater.newUpdater(AbstractFuture.Waiter.class, AbstractFuture.Waiter.class, "next"), AtomicReferenceFieldUpdater.newUpdater(AbstractFuture.class, AbstractFuture.Waiter.class, "waiters"), AtomicReferenceFieldUpdater.newUpdater(AbstractFuture.class, AbstractFuture.Listener.class, "listeners"), AtomicReferenceFieldUpdater.newUpdater(AbstractFuture.class, Object.class, "value"));} catch (Throwable var5) {thrownAtomicReferenceFieldUpdaterFailure = var5;helper = new AbstractFuture.SynchronizedHelper();}}ATOMIC_HELPER = (AbstractFuture.AtomicHelper)helper;Class<?> ensureLoaded = LockSupport.class;if (thrownAtomicReferenceFieldUpdaterFailure != null) {log.log(java.util.logging.Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure);log.log(java.util.logging.Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure);}NULL = new Object();}private static final class SynchronizedHelper extends AbstractFuture.AtomicHelper {private SynchronizedHelper() {super(null);}void putThread(AbstractFuture.Waiter waiter, Thread newValue) {waiter.thread = newValue;}void putNext(AbstractFuture.Waiter waiter, AbstractFuture.Waiter newValue) {waiter.next = newValue;}boolean casWaiters(AbstractFuture<?> future, AbstractFuture.Waiter expect, AbstractFuture.Waiter update) {synchronized(future) {if (future.waiters == expect) {future.waiters = update;return true;} else {return false;}}}boolean casListeners(AbstractFuture<?> future, AbstractFuture.Listener expect, AbstractFuture.Listener update) {synchronized(future) {if (future.listeners == expect) {future.listeners = update;return true;} else {return false;}}}boolean casValue(AbstractFuture<?> future, Object expect, Object update) {synchronized(future) {if (future.value == expect) {future.value = update;return true;} else {return false;}}}}private static final class SafeAtomicHelper extends AbstractFuture.AtomicHelper {final AtomicReferenceFieldUpdater<AbstractFuture.Waiter, Thread> waiterThreadUpdater;final AtomicReferenceFieldUpdater<AbstractFuture.Waiter, AbstractFuture.Waiter> waiterNextUpdater;final AtomicReferenceFieldUpdater<AbstractFuture, AbstractFuture.Waiter> waitersUpdater;final AtomicReferenceFieldUpdater<AbstractFuture, AbstractFuture.Listener> listenersUpdater;final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;SafeAtomicHelper(AtomicReferenceFieldUpdater<AbstractFuture.Waiter, Thread> waiterThreadUpdater, AtomicReferenceFieldUpdater<AbstractFuture.Waiter, AbstractFuture.Waiter> waiterNextUpdater, AtomicReferenceFieldUpdater<AbstractFuture, AbstractFuture.Waiter> waitersUpdater, AtomicReferenceFieldUpdater<AbstractFuture, AbstractFuture.Listener> listenersUpdater, AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {super(null);this.waiterThreadUpdater = waiterThreadUpdater;this.waiterNextUpdater = waiterNextUpdater;this.waitersUpdater = waitersUpdater;this.listenersUpdater = listenersUpdater;this.valueUpdater = valueUpdater;}void putThread(AbstractFuture.Waiter waiter, Thread newValue) {this.waiterThreadUpdater.lazySet(waiter, newValue);}void putNext(AbstractFuture.Waiter waiter, AbstractFuture.Waiter newValue) {this.waiterNextUpdater.lazySet(waiter, newValue);}boolean casWaiters(AbstractFuture<?> future, AbstractFuture.Waiter expect, AbstractFuture.Waiter update) {return this.waitersUpdater.compareAndSet(future, expect, update);}boolean casListeners(AbstractFuture<?> future, AbstractFuture.Listener expect, AbstractFuture.Listener update) {return this.listenersUpdater.compareAndSet(future, expect, update);}boolean casValue(AbstractFuture<?> future, Object expect, Object update) {return this.valueUpdater.compareAndSet(future, expect, update);}}private static final class UnsafeAtomicHelper extends AbstractFuture.AtomicHelper {static final Unsafe UNSAFE;static final long LISTENERS_OFFSET;static final long WAITERS_OFFSET;static final long VALUE_OFFSET;static final long WAITER_THREAD_OFFSET;static final long WAITER_NEXT_OFFSET;private UnsafeAtomicHelper() {super(null);}void putThread(AbstractFuture.Waiter waiter, Thread newValue) {UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);}void putNext(AbstractFuture.Waiter waiter, AbstractFuture.Waiter newValue) {UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);}boolean casWaiters(AbstractFuture<?> future, AbstractFuture.Waiter expect, AbstractFuture.Waiter update) {return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);}boolean casListeners(AbstractFuture<?> future, AbstractFuture.Listener expect, AbstractFuture.Listener update) {return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);}boolean casValue(AbstractFuture<?> future, Object expect, Object update) {return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);}static {Unsafe unsafe = null;try {unsafe = Unsafe.getUnsafe();} catch (SecurityException var5) {try {unsafe = (Unsafe)AccessController.doPrivileged(new PrivilegedExceptionAction<Unsafe>() {public Unsafe run() throws Exception {Class<Unsafe> k = Unsafe.class;Field[] var2 = k.getDeclaredFields();int var3 = var2.length;for(int var4 = 0; var4 < var3; ++var4) {Field f = var2[var4];f.setAccessible(true);Object x = f.get((Object)null);if (k.isInstance(x)) {return (Unsafe)k.cast(x);}}throw new NoSuchFieldError("the Unsafe");}});} catch (PrivilegedActionException var4) {throw new RuntimeException("Could not initialize intrinsics", var4.getCause());}}try {Class<?> abstractFuture = AbstractFuture.class;WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(AbstractFuture.Waiter.class.getDeclaredField("thread"));WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(AbstractFuture.Waiter.class.getDeclaredField("next"));UNSAFE = unsafe;} catch (Exception var3) {Throwables.throwIfUnchecked(var3);throw new RuntimeException(var3);}}}private abstract static class AtomicHelper {private AtomicHelper() {}abstract void putThread(AbstractFuture.Waiter var1, Thread var2);abstract void putNext(AbstractFuture.Waiter var1, AbstractFuture.Waiter var2);abstract boolean casWaiters(AbstractFuture<?> var1, AbstractFuture.Waiter var2, AbstractFuture.Waiter var3);abstract boolean casListeners(AbstractFuture<?> var1, AbstractFuture.Listener var2, AbstractFuture.Listener var3);abstract boolean casValue(AbstractFuture<?> var1, Object var2, Object var3);}private static final class SetFuture<V> implements Runnable {final AbstractFuture<V> owner;final ListenableFuture<? extends V> future;SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {this.owner = owner;this.future = future;}public void run() {if (this.owner.value == this) {Object valueToSet = AbstractFuture.getFutureValue(this.future);if (AbstractFuture.ATOMIC_HELPER.casValue(this.owner, this, valueToSet)) {AbstractFuture.complete(this.owner);}}}}private static final class Cancellation {static final AbstractFuture.Cancellation CAUSELESS_INTERRUPTED;static final AbstractFuture.Cancellation CAUSELESS_CANCELLED;final boolean wasInterrupted;@Nullablefinal Throwable cause;Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {this.wasInterrupted = wasInterrupted;this.cause = cause;}static {if (AbstractFuture.GENERATE_CANCELLATION_CAUSES) {CAUSELESS_CANCELLED = null;CAUSELESS_INTERRUPTED = null;} else {CAUSELESS_CANCELLED = new AbstractFuture.Cancellation(false, (Throwable)null);CAUSELESS_INTERRUPTED = new AbstractFuture.Cancellation(true, (Throwable)null);}}}private static final class Failure {static final AbstractFuture.Failure FALLBACK_INSTANCE = new AbstractFuture.Failure(new Throwable("Failure occurred while trying to finish a future.") {public synchronized Throwable fillInStackTrace() {return this;}});final Throwable exception;Failure(Throwable exception) {this.exception = (Throwable)Preconditions.checkNotNull(exception);}}//静态内部类,不容许继承,TOMBSTONE是墓碑的意思,明显的是单链表的节点,这个节点保存的是待执行的监听器回调逻辑和执行容器task和job组成的节点private static final class Listener {static final AbstractFuture.Listener TOMBSTONE = new AbstractFuture.Listener((Runnable)null, (Executor)null);final Runnable task;final Executor executor;@NullableAbstractFuture.Listener next;Listener(Runnable task, Executor executor) {this.task = task;this.executor = executor;}}//等待的线程单链表结构 ,静态的内部类,不容许继承private static final class Waiter {static final AbstractFuture.Waiter TOMBSTONE = new AbstractFuture.Waiter(false);@Nullablevolatile Thread thread;@Nullablevolatile AbstractFuture.Waiter next;Waiter(boolean unused) {}Waiter() {AbstractFuture.ATOMIC_HELPER.putThread(this, Thread.currentThread());}void setNext(AbstractFuture.Waiter next) {AbstractFuture.ATOMIC_HELPER.putNext(this, next);}void unpark() {Thread w = this.thread;if (w != null) {this.thread = null;LockSupport.unpark(w);}}}abstract static class TrustedFuture<V> extends AbstractFuture<V> implements AbstractFuture.Trusted<V> {TrustedFuture() {}@CanIgnoreReturnValuepublic final V get() throws InterruptedException, ExecutionException {return super.get();}@CanIgnoreReturnValuepublic final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return super.get(timeout, unit);}public final boolean isDone() {return super.isDone();}public final boolean isCancelled() {return super.isCancelled();}public final void addListener(Runnable listener, Executor executor) {super.addListener(listener, executor);}@CanIgnoreReturnValuepublic final boolean cancel(boolean mayInterruptIfRunning) {return super.cancel(mayInterruptIfRunning);}}interface Trusted<V> extends ListenableFuture<V> {}
}

深入理解Guava的异步回调模式相关推荐

  1. 【JUC系列】Future异步回调模式

    何为异步回调 前面只是一个例子,对并发的主要模式进行形象的说明. 下面正式来讲下经常使用的几个和并发相关的概念. 1.2.1. 同步.异步.阻塞.非阻塞 一:同步 所谓同步,就是在发出一个功能调用时, ...

  2. Guava Futures异步回调机制源码解析

    本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/51758194 1.前言 在前两篇文章中简单阐述了Java ...

  3. Future异步回调详解

    文章目录 案例 join实现 FutureTask Guava 的异步回调 扩展 关于我 案例 在深入理解 异步回调模式前,我们以一个经典案例来说明,即 数学家华罗庚先生的文章<统筹方法> ...

  4. 使用Future实现异步回调的方式

    在JDK5中增加了Future异步获取结果的功能,但是这种方式在获取的时候是阻塞的,在正常场景下这种实现方式肯定是不太友好的,当然可以通过轮询的方式去获取异步结果,但是这种方式比较消耗CPU并且获取结 ...

  5. 【Promise】入门-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题

    文章目录 1. 预备知识 1.1 实例对象与函数对象 1.2 两种类型的回调函数 1. 同步回调 2. 异步回调 1.3 JS中的异常error处理 1. 错误的类型 2. 错误处理(捕获与抛出) 3 ...

  6. Java接口回调,异步回调理解

    文章目录 前言 一.回调简单理解 二.Java中用接口实现回调 1.实现接口回调 1.1同步回调 1.2 异步回调 2.为啥要用接口实现 3.关于接口(基础) 总结 前言 本文是作者在学习接口回调时看 ...

  7. 理解支付宝同步回调和异步回调

    支付宝同步回调和异步回调 当一个支付请求被发送到支付渠道方,支付渠道会很快返回一个结果.但是这个结果,只是告诉你调用成功了,不是扣款成功,这叫同步调用. 很多新手会拿这个结果 当作支付成功了,那就会被 ...

  8. 异步重试_异步重试模式

    异步重试 当您有一段经常失败且必须重试的代码时,此Java 7/8库提供了丰富且简洁的API以及针对此问题的快速且可扩展的解决方案: ScheduledExecutorService schedule ...

  9. java 回调模式_总结!!!总结!!!java回调以及future模式

    总是忘记,我这里直接写实际的东西,看其他的博客都是类图,文字描述,这里直接用代码描述. 疑问:什么是回调 回调,回调.要先有调用,才有调用者和被调用者之间的回调.所以在百度百科中是这样的: 软件模块之 ...

  10. 理解javascript中的回调函数(callback)【转】

    在JavaScrip中,function是内置的类对象,也就是说它是一种类型的对象,可以和其它String.Array.Number.Object类的对象一样用于内置对象的管理.因为function实 ...

最新文章

  1. 微生物组领域最高质量的资源全在这
  2. c# xml html标签,在asp.net(C#)中采用自定义标签和XML、XSL显示数据
  3. 利用Win32 Debug API打造自己的调试器Debugger
  4. 归并排序改良 java_Java 八种排序算法总结
  5. Ex 2_5 求解递推式..._第三次作业
  6. [html] 使用svg画一个爱心
  7. MyBatis框架学习笔记04:利用MyBatis实现条件查询
  8. LNMMP架构的实现
  9. day023 常用模块02
  10. 多个服务器数据互通_数据中心
  11. 如何为 Apple 设备使用通用控制?
  12. 纯CSS实现二级下拉导航菜单
  13. js遍历对象去除空格
  14. 基于verliog的异步模10计数器(含模块代码以及测试代码)
  15. 魔点人脸识别智慧工地实名制考勤管理系统
  16. sym4 matlab,使用MATLAB工具wfusimg函数进行图像的融合
  17. android自动夜间模式,夜晚的故事(android夜间模式实现)
  18. 我的世界服务器连接显示不见了,我的世界服务器为什么进去就卡,然后几十秒后显示“连接已丢失”怎么? 爱问知识人...
  19. 科猫网项目总结(基于SSM框架)
  20. 蓝牙耳机哪个品牌最好?数码博主整理2023超高性价比蓝牙耳机推荐

热门文章

  1. 责任编辑-www.saierhaowaigua.net
  2. BZOJ4340:[BJOI2015]隐身术(后缀数组,ST表,DFS)
  3. win7无法打开计算机共享文件夹,win7共享文件夹怎么设置?win7共享文件夹无法访问...
  4. 中国石油沥青行业市场供需与战略研究报告
  5. 这些百度搜索技巧,你知道吗?
  6. 帝国cms配置php,帝国cms如何安装
  7. 微分方程之一阶线性及可降阶方程
  8. kuangbin 最小生成树
  9. 基于PHP的学生在线考试管理系统
  10. pem证书转p12和p12证书转cert、key