原始链接:https://blog.csdn.net/qq_35029061/article/details/86750369

深入学习java源码之Callable.call()与Future.get()

wespten 2019-02-03 08:07:50  7904  收藏 8
分类专栏: Java源码 文章标签: 深入学习java源码之Callable.call()与Fut
版权
深入学习java源码之Callable.call()与Future.get()

Callable和Future出现的原因

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。 
这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 
如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable和Future介绍

Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。 
java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

public interface Callable<V> {
    V call() throws Exception;
}
可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?

一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
第一个submit方法里面的参数类型就是Callable。

暂时只需要知道Callable一般是和ExecutorService配合来使用的,具体的使用方法讲在后面讲述。

一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。

Future

  Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

isDone方法表示任务是否已经完成,若任务完成,则返回true;

get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说Future提供了三种功能:

  1)判断任务是否完成;

  2)能够中断任务;

  3)能够获取任务执行结果。

  因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

public interface RunnableFuture<V> extends Runnable, Future<V> {  
    void run();  
}  
可以看到这个接口实现了Runnable和Future接口,接口中的具体实现由FutureTask来实现。这个类的两个构造方法如下 :

public FutureTask(Callable<V> callable) {  
        if (callable == null)  
            throw new NullPointerException();  
        sync = new Sync(callable);  
    }  
    public FutureTask(Runnable runnable, V result) {  
        sync = new Sync(Executors.callable(runnable, result));  
    }  
如上提供了两个构造函数,一个以Callable为参数,另外一个以Runnable为参数。这些类之间的关联对于任务建模的办法非常灵活,允许你基于FutureTask的Runnable特性(因为它实现了Runnable接口),把任务写成Callable,然后封装进一个由执行者调度并在必要时可以取消的FutureTask。

FutureTask可以由执行者调度,这一点很关键。它对外提供的方法基本上就是Future和Runnable接口的组合:get()、cancel、isDone()、isCancelled()和run(),而run()方法通常都是由执行者调用,我们基本上不需要直接调用它。

使用Callable,Future返回结果
Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

Callable<Integer> func = new Callable<Integer>(){  
        public Integer call() throws Exception {  
            System.out.println("inside callable");  
            Thread.sleep(1000);  
            return new Integer(8);  
        }         
    };        
    FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);  
    Thread newThread = new Thread(futureTask);  
    newThread.start();  
      
    try {  
        System.out.println("blocking here");  
        Integer result = futureTask.get();  
        System.out.println(result);  
    } catch (InterruptedException ignored) {  
    } catch (ExecutionException ignored) {  
    } 
 ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

FutureTask的例子

public class MyCallable implements Callable<String> {  
    private long waitTime;   
    public MyCallable(int timeInMillis){   
        this.waitTime=timeInMillis;  
    }  
    @Override  
    public String call() throws Exception {  
        Thread.sleep(waitTime);  
        //return the thread name executing this callable task  
        return Thread.currentThread().getName();  
    }  
 
}  
public class FutureTaskExample {  
     public static void main(String[] args) {  
        MyCallable callable1 = new MyCallable(1000);                       // 要执行的任务  
        MyCallable callable2 = new MyCallable(2000);  
 
        FutureTask<String> futureTask1 = new FutureTask<String>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象  
        FutureTask<String> futureTask2 = new FutureTask<String>(callable2);  
 
        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例  
        executor.execute(futureTask1);  // 执行任务  
        executor.execute(futureTask2);    
 
        while (true) {  
            try {  
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成  
                    System.out.println("Done");  
                    executor.shutdown();                          // 关闭线程池和服务   
                    return;  
                }  
 
                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成  
                    System.out.println("FutureTask1 output="+futureTask1.get());  
                }  
 
                System.out.println("Waiting for FutureTask2 to complete");  
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);  
                if(s !=null){  
                    System.out.println("FutureTask2 output="+s);  
                }  
            } catch (InterruptedException | ExecutionException e) {  
                e.printStackTrace();  
            }catch(TimeoutException e){  
                //do nothing  
            }  
        }  
    }  
}  
运行如上程序后,可以看到一段时间内没有输出,因为get()方法等待任务执行完成然后才输出内容.

输出结果如下:

FutureTask1 output=pool-1-thread-1
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
FutureTask2 output=pool-1-thread-2
Done

例子:并行计算数组的和。

package executorservice;  
      
    import java.util.ArrayList;  
    import java.util.List;  
    import java.util.concurrent.Callable;  
    import java.util.concurrent.ExecutionException;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors;  
    import java.util.concurrent.Future;  
    import java.util.concurrent.FutureTask;  
      
    public class ConcurrentCalculator {  
      
        private ExecutorService exec;  
        private int cpuCoreNumber;  
        private List<Future<Long>> tasks = new ArrayList<Future<Long>>();  
      
        // 内部类  
        class SumCalculator implements Callable<Long> {  
            private int[] numbers;  
            private int start;  
            private int end;  
      
            public SumCalculator(final int[] numbers, int start, int end) {  
                this.numbers = numbers;  
                this.start = start;  
                this.end = end;  
            }  
      
            public Long call() throws Exception {  
                Long sum = 0l;  
                for (int i = start; i < end; i++) {  
                    sum += numbers[i];  
                }  
                return sum;  
            }  
        }  
      
        public ConcurrentCalculator() {  
            cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
            exec = Executors.newFixedThreadPool(cpuCoreNumber);  
        }  
      
        public Long sum(final int[] numbers) {  
            // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor  
            for (int i = 0; i < cpuCoreNumber; i++) {  
                int increment = numbers.length / cpuCoreNumber + 1;  
                int start = increment * i;  
                int end = increment * i + increment;  
                if (end > numbers.length)  
                    end = numbers.length;  
                SumCalculator subCalc = new SumCalculator(numbers, start, end);  
                FutureTask<Long> task = new FutureTask<Long>(subCalc);  
                tasks.add(task);  
                if (!exec.isShutdown()) {  
                    exec.submit(task);  
                }  
            }  
            return getResult();  
        }  
      
        /** 
         * 迭代每个只任务,获得部分和,相加返回 
         *  
         * @return 
         */  
        public Long getResult() {  
            Long result = 0l;  
            for (Future<Long> task : tasks) {  
                try {  
                    // 如果计算未完成则阻塞  
                    Long subSum = task.get();  
                    result += subSum;  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                } catch (ExecutionException e) {  
                    e.printStackTrace();  
                }  
            }  
            return result;  
        }  
      
        public void close() {  
            exec.shutdown();  
        }  
    }
   int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };  
    ConcurrentCalculator calc = new ConcurrentCalculator();  
    Long sum = calc.sum(numbers);  
    System.out.println(sum);  
    calc.close();

java源码

Modifier and Type    Method and Description
V    call()
计算一个结果,如果不能这样做,就会抛出一个异常。

package java.util.concurrent;
 
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
Modifier and Type    Method and Description
boolean    cancel(boolean mayInterruptIfRunning)
尝试取消执行此任务。

V    get()
等待计算完成,然后检索其结果。

V    get(long timeout, TimeUnit unit)
如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。

boolean    isCancelled()
如果此任务在正常完成之前被取消,则返回 true 。

boolean    isDone()
返回 true如果任务已完成。

package java.util.concurrent;
 
public interface Future<V> {
 
    boolean cancel(boolean mayInterruptIfRunning);
 
    boolean isCancelled();
 
    boolean isDone();
 
    V get() throws InterruptedException, ExecutionException;
 
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Modifier and Type    Method and Description
boolean    cancel(boolean mayInterruptIfRunning)
尝试取消执行此任务。

protected void    done()
此任务转换到状态 isDone (无论是正常还是通过取消)调用的受保护方法。

V    get()
等待计算完成,然后检索其结果。

V    get(long timeout, TimeUnit unit)
如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。

boolean    isCancelled()
如果此任务在正常完成之前取消,则返回 true 。

boolean    isDone()
返回 true如果任务已完成。

void    run()
将此未来设置为其计算结果,除非已被取消。

protected boolean    runAndReset()
执行计算而不设置其结果,然后将此将来重置为初始状态,如果计算遇到异常或被取消,则不执行此操作。

protected void    set(V v)
将此未来的结果设置为给定值,除非此未来已被设置或已被取消。

protected void    setException(Throwable t)
导致这个未来报告一个ExecutionException与给定的可抛弃的原因,除非这个未来已经被设置或被取消。

package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;
 
public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
 
    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
 
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
 
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
 
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
 
    public boolean isCancelled() {
        return state >= CANCELLED;
    }
 
    public boolean isDone() {
        return state != NEW;
    }
 
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
 
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
 
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
 
    protected void done() { }
 
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
 
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
 
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
 
    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
 
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
 
        // assert state == INTERRUPTED;
 
        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }
 
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
 
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
 
        done();
 
        callable = null;        // to reduce footprint
    }
 
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
 
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
 
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
 
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
}
Modifier and Type    Method and Description
void    run()
将此未来设置为其计算结果,除非已被取消。

package java.util.concurrent;
 
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
Modifier and Type    Method and Description
boolean    isPeriodic()
如果此任务是周期性的,则返回 true 。

package java.util.concurrent;
 
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {
 
    boolean isPeriodic();
}
用于创建锁和其他同步类的基本线程阻塞原语。

这个类与每个使用它的线程相关联,一个许可证(在Semaphore类的意义上)。 如果许可证可用,则呼叫park将park返回,在此过程中消耗它; 否则可能会阻止。 致电unpark使许可证可用,如果尚不可用。 (与信号量不同,许可证不能累积,最多只有一个。)

方法park和unpark提供了阻止和解除阻塞线程的有效手段,该方法不会遇到导致不推荐使用的方法Thread.suspend和Thread.resume目的不能使用的问题:一个线程调用park和另一个线程之间的尝试unpark线程将保持活跃性,由于许可证。 另外,如果调用者的线程被中断, park将返回,并且支持超时版本。 park方法也可以在任何其他时间返回,因为“无理由”,因此一般必须在返回之前重新检查条件的循环中被调用。 在这个意义上, park作为一个“忙碌等待”的优化,不浪费时间旋转,但必须与unpark配对才能有效。

park的三种形式也支持blocker对象参数。 线程被阻止时记录此对象,以允许监视和诊断工具识别线程被阻止的原因。 (此类工具可以使用方法getBlocker(Thread)访问阻止程序 。)强烈鼓励使用这些形式而不是没有此参数的原始形式。 在锁实现中作为blocker提供的正常参数是this 。

Modifier and Type    Method and Description
static Object    getBlocker(Thread t)
返回提供给最近调用尚未解除阻塞的park方法的阻止程序对象,如果不阻止则返回null。

static void    park()
禁止当前线程进行线程调度,除非许可证可用。

static void    park(Object blocker)
禁止当前线程进行线程调度,除非许可证可用。

static void    parkNanos(long nanos)
禁用当前线程进行线程调度,直到指定的等待时间,除非许可证可用。

static void    parkNanos(Object blocker, long nanos)
禁用当前线程进行线程调度,直到指定的等待时间,除非许可证可用。

static void    parkUntil(long deadline)
禁用当前线程进行线程调度,直到指定的截止日期,除非许可证可用。

static void    parkUntil(Object blocker, long deadline)
禁用当前线程进行线程调度,直到指定的截止日期,除非许可证可用。

static void    unpark(Thread thread)
为给定的线程提供许可证(如果尚未提供)。

package java.util.concurrent.locks;
import sun.misc.Unsafe;
 
public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.
 
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }
 
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
 
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
 
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
 
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }
 
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }
 
    public static void park() {
        UNSAFE.park(false, 0L);
    }
 
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }
 
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }
 
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13;   // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        }
        else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }
 
    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;
    private static final long parkBlockerOffset;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }
 
}

————————————————
版权声明:本文为CSDN博主「wespten」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_35029061/article/details/86750369

转载:深入学习java源码之Callable.call()与Future.get()相关推荐

  1. Math源码java_深入学习java源码之Math.sin()与 Math.sqrt()

    深入学习java源码之Math.sin()与 Math.sqrt() native关键字 凡是一种语言,都希望是纯.比如解决某一个方案都喜欢就单单这个语言来写即可.Java平台有个用户和本地C代码进行 ...

  2. 深入学习java源码之Math.max()与 Math.min()

    深入学习java源码之Math.max()与 Math.min() java基本数据类型及自动转型 8种基本数据类型及其所占空间大小: 一.byte,占用一个字节,取值范围为 -128-127,默认是 ...

  3. 深入学习java源码之Math.addExact()与 Math.multiplyExact()

    深入学习java源码之Math.addExact()与 Math.multiplyExact() ^运算符 或的运算符,其运算规则是: 两个操作数的位中,相同则结果为0,不同则结果为1. int i ...

  4. 深入学习java源码之 Arrays.sort()与Arrays.parallelPrefix()

    深入学习java源码之 Arrays.sort()与Arrays.parallelPrefix() Comparator接口 能对不同类型的对象进行排序(当然排序依据还是基本类型),也不用自己实现排序 ...

  5. 如何学习java源码

    刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧 ...

  6. java 源码学习,Java源码剖析34讲学习笔记~4

    详解 ThreadPoolExecutor 的参数含义及源码执行流程 前言 在阿里巴巴的开发者手册中针对线程池有如下说明: [强制]线程池不允许使用 Executors 去创建,而是通过 Thread ...

  7. 面试官系统精讲Java源码及大厂真题 - 01 开篇词:为什么学习本专栏

    01 开篇词:为什么学习本专栏 更新时间:2019-10-30 10:08:31 才能一旦让懒惰支配,它就一无可为. --克雷洛夫 不为了源码而读源码,只为了更好的实践 你好,我是文贺,Java 技术 ...

  8. 基于Java毕业设计智友少儿编程学习平台源码+系统+mysql+lw文档+部署软件

    基于Java毕业设计智友少儿编程学习平台源码+系统+mysql+lw文档+部署软件 基于Java毕业设计智友少儿编程学习平台源码+系统+mysql+lw文档+部署软件 本源码技术栈: 项目架构:B/S ...

  9. java计算机毕业设计智友少儿编程学习平台源码+mysql数据库+系统+部署+lw文档

    java计算机毕业设计智友少儿编程学习平台源码+mysql数据库+系统+部署+lw文档 java计算机毕业设计智友少儿编程学习平台源码+mysql数据库+系统+部署+lw文档 本源码技术栈: 项目架构 ...

最新文章

  1. 1微秒等于多少皮秒_注册汽油贸易公司分享1升汽油等于多少公斤?
  2. Win7屏幕键盘 在哪 使用
  3. java nio Selector (新IO)分析
  4. Nginx配置文件粗解
  5. 电商企业纷纷结缘信息化 管理系统如何给力?
  6. asp.net 6中的mini api和mvc api性能对比
  7. 如何输入一个整数逆序输出_如何匹配DSP输入输出信号
  8. 2.两数相加 golang
  9. java用中根后根序列构造二叉树,106. 从中序与后序遍历序列构造二叉树
  10. 【OpenCV 例程200篇】74. 图像的抗混叠
  11. REDO LOG大小引起的Oracle数据库性能下降
  12. 【medium】220. Contains Duplicate III
  13. 理解 LruCache 机制
  14. spark和hadoop的区别
  15. MySQL不同数据类型如何表示_MySQL系列(二)--数据类型
  16. 中国目前拥有的人造卫星的种类及其作用
  17. 部署Hyper -V实现桌面虚拟化
  18. 来答疑了!关于网易云信 Innovation 2022 开发者大赛,你想知道的都在这儿!
  19. 嵌入式Linux中间件,高可用性(HA)和嵌入式管理中间件:Enea Element详解
  20. 20170305Meetup Git、heroku drop db

热门文章

  1. 查手机服务器ip和端口网站,手机怎么看ip和端口
  2. 小菜鸟的python学习之路(7)
  3. CentOS7安装apache2并启动
  4. 组合学:使用10个数字与52个字母生成1477万个不重复的4位串码V4衍生版本
  5. 时间序列预测——ARIMA
  6. python内存持续增长_Python 进程内存增长解决方案
  7. 【题解】UVa1665:Islands
  8. 美国临床营养专家:冬季营养建议
  9. jzoj. 4298. 【NOIP2015模拟11.2晚】我的天
  10. [破解]天草初级笔记