ForkJoinPool 的源码涉及到大量的位运算,这里会带领大家看下整体流程,想要理解的更深入,还需要大家自己一点点追踪查看

ForkJoinPool 里有三个重要的角色:

  • ForkJoinWorkerThread(继承 Thread):就是我们上面说的线程(Worker)
  • WorkQueue:双向的任务队列
  • ForkJoinTask:Worker 执行的对象

源码分析的整个流程围绕这几个类的方法来说明

ForkJoinPool 构造方法

public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();
}

除了以上三个构造方法之外,在 JDK1.8 中还增加了另外一种初始化 ForkJoinPool 对象的方式

static final ForkJoinPool common;/*** @return the common pool instance* @since 1.8*/
public static ForkJoinPool commonPool() {// assert common != null : "static init error";return common;
}
Common 是在静态块里面初始化的(只会被执行一次):common = java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() { return makeCommonPool(); }});private static ForkJoinPool makeCommonPool() {int parallelism = -1;... 其他默认初始化内容 if (parallelism < 0 && // default 1 less than #cores(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)parallelism = 1;if (parallelism > MAX_CAP)parallelism = MAX_CAP;// 执行上面的构造方法return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,"ForkJoinPool.commonPool-worker-");
}

因为这是一个单例通用的 ForkJoinPool,所以切记:

如果使用通用 ForkJoinPool,最好只做 CPU 密集型的计算操作,不要有不确定性的 I/O 内容在任务里面,以防拖垮整体

上面所有的构造方法最后都会调用这个私有方法:

private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}   

参数有点多,在这里解释一下每个参数的含义:

序号 参数名 描述/解释
1 parallelism 并行度,这并不是定义的线程数,具体线程数,以及 WorkQueue 的长度等都是根据这个并行度来计算的,通过上面 makeCommonPool 方法可以知道,parallelism 默认值是 CPU 核心线程数减 1
2 factory 很常见了,创建 ForkJoinWorkerThread 的工厂接口
3 handler 每个线程的异常处理器
4 mode 上面说的 WorkQueue 的模式,LIFO/FIFO;
5 workerNamePrefix ForkJoinWorkerThread的前缀名称
6 ctl 线程池的核心控制线程字段

CTL

他将一个64位分为了四段。每段16位。由高到低分别为AC,TC,SS,ID

AC: 正在运行工作线程数减去目标并行度,高16位

TC: 总工作线程数减去目标并行度,中高16位

SS: 栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程),第一位表示状态 1:不活动(inactive); 0:活动(active),后15表示版本号,防止 ABA 问题

ID: 栈顶工作线程所在工作队列的索引

AC为负说明,活跃的worker没有到达预期数量,需要激活或者创建。 1.没有空闲的worker并且worker太少的话会创建,具体是调用tryAddWorker方法。 2.如果有空闲worker,获取到空闲worker,解除worker阻塞。并且更新sp。

通过SS我们可以进行判断是否存在空闲的worker。

ID用来定位该worker对应队列的下标。

runState

除了构造方法所构建的成员变量,ForkJoinPool 还有一个非常重要的成员变量 runState,和你之前了解的知识一样,线程池也需要状态来进行管理

volatile int runState;               // lockable status// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int  RSLOCK     = 1;       //线程池被锁定
private static final int  RSIGNAL    = 1 << 1;    //线程池有线程需要唤醒
private static final int  STARTED    = 1 << 2;  //线程池已经初始化
private static final int  STOP       = 1 << 29;    //线程池停止
private static final int  TERMINATED = 1 << 30; //线程池终止
private static final int  SHUTDOWN   = 1 << 31; //线程池关闭

runState 有上面 6 种状态切换,按注释所言,只有 SHUTDOWN 状态是负数,其他都是整数

invoke/submit/execute

向 ForkJoinPool 提交任务有三种方法,它们之间的区别:

  • invoke:提交任务,并等待返回执行结果
  • submit:提交并立刻返回任务,ForkJoinTask实现了Future,可以充分利用 Future 的特性
  • execute:只提交任务

在这三大类基础上又重载了几个更细粒度的方法,这里不一一列举:

public <T> T invoke(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task.join();
}public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task;
}public void execute(ForkJoinTask<?> task) {if (task == null)throw new NullPointerException();externalPush(task);
}

提交任务的方法都会调用 externalPush(task) 这个方法

WorkQueue

讲 externalPush(task) 之前我们先讲下workQueue,externalPush第一行会定义一个workQueue
//初始队列容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大队列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// Instance fields
volatile int scanState;    // versioned, <0: inactive; odd:scanning
int stackPred;             // pool stack (ctl) predecessor  前任池(WorkQueue[])索引,由此构成一个栈
int nsteals;               // number of steals  偷取的任务个数
int hint;                  // randomization and stealer index hint  记录偷取者的索引,方便后面顺藤摸瓜
int config;                // pool index and mode
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // index of next slot for poll
int top;                   // index of next slot for push
ForkJoinTask<?>[] array;   // the elements (initially unallocated)  任务数组
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared  当前工作队列的工作线程,共享模式下为null
volatile Thread parker;    // == owner during call to park; else null  调用park阻塞期间为owner,其他情况为null
volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin  记录当前join来的任务
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer  记录从其他工作队列偷取过来的任务

WorkQueue 是一个双端队列,线程池有 runState,WorkQueue 有 scanState

  • 小于零:inactive (未激活状态)
  • 奇数:scanning (扫描状态)
  • 偶数:running (运行状态)

操作线程池需要锁,操作队列也是需要锁的,qlock 就派上用场了

  • 1: 锁定
  • 0:未锁定
  • 小于零:终止状态

externalPush

前面说过,task 会细分成 submission task 和 worker taskworker task 是 fork 出来的,那从这个入口进入的,自然也就是 submission task 了,也就是说:

  • 通过invoke() | submit() | execute() 等方法提交的 task, 是 submission task,会放到 WorkQueue 数组的偶数索引位置
  • 调用 fork() 方法生成出的任务,叫 worker task,会放到 WorkQueue 数组的奇数索引位置

该方法上的注释也写的很清楚,具体请参考代码注释

    /*** Tries to add the given task to a submission queue at* submitter's current queue. Only the (vastly) most common path* is directly handled in this method, while screening for need* for externalSubmit.** @param task the task. Caller must ensure non-null.*/final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;//Flag1: 通过ThreadLocalRandom产生随机数,用于下面计算槽位索引int r = ThreadLocalRandom.getProbe();int rs = runState; //初始状态为0//Flag2: 如果ws,即ForkJoinPool中的WorkQueue数组已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&//对WorkQueue操作加锁U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;//WorkQueue中的任务数组不为空if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) {  //组长度大于任务个数,不需要扩容int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任务数组不为空U.putOrderedObject(a, j, task); //向Queue中放入任务U.putOrderedInt(q, QTOP, s + 1);//top值加一U.putIntVolatile(q, QLOCK, 0);  //对WorkQueue操作解锁//任务个数小于等于1,那么此槽位上的线程有可能等待,如果大家都没任务,可能都在等待,新任务来了,唤醒,起来干活了if (n <= 1)//唤醒可能存在等待的线程signalWork(ws, q);return;}//任务入队失败,前面加锁了,这里也要解锁U.compareAndSwapInt(q, QLOCK, 1, 0);}//Flag3: 不满足上述条件,也就是说上面的这些 WorkQueue[]等都不存在,就要通过这个方法一切从头开始创建externalSubmit(task);}

上面加了三处 Flag,为了让大家更好的理解代码还是有必要做进一步说明的:

Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每个线程默认的 probe 是 0,当线程调用ThreadLocalRandom.current()时,会初始化 seed 和 probe,维护在线程内部,这里就知道是生成一个随机数就好,具体细节还是值得大家自行看一下

Flag2: 这里包含的信息还是非常多的

// 二进制为:0000 0000 0000 0000 0000 0000 0111 1110
static final int SQMASK       = 0x007e;        // max 64 (even) slots
  • m 的值代表 WorkQueue 数组的最大下表
  • m & r 会保证随机数 r 大于 m 的部分不可用
  • m & r & SQMASK 因为 SQMASK 最后一位是 0,最终的结果就会是偶数
  • r != 0 说明当前线程已经初始化过一些内容
  • rs > 0 说明 ForkJoinPool 的 runState 也已经被初始化过

Flag3: 看过 flag2 的描述,你也就很好理解 Flag 3 了,如果是第一次提交任务,必走 Flag 3 的 externalSubmit 方法

externalSubmit

这个方法很长,但没超过 80 行,具体请看方法注释

 //初始化所需要的一切  private void externalSubmit(ForkJoinTask<?> task) {int r;                                    // initialize caller's probe//生成随机数if ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;// 如果线程池的状态为终止状态,则帮助终止if ((rs = runState) < 0) {tryTerminate(false, false);     // help terminatethrow new RejectedExecutionException();}//Flag1: 再判断一次状态是否为初始化,因为在lockRunState过程中有可能状态被别的线程更改了else if ((rs & STARTED) == 0 ||     // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;//Flag1.1: 加锁rs = lockRunState();try {if ((rs & STARTED) == 0) {// 初始化stealcounter的值(任务窃取计数器,原子变量)U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of two//取config的低16位(确切说是低15位),获取并行度int p = config & SMASK; // ensure at least 2 slots//Flag1.2: 如果你看过HashMap 的源码,这个就很好理解了,获取2次幂大小int n = (p > 1) ? p - 1 : 1;n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;//初始化 WorkQueue 数组workQueues = new WorkQueue[n];// 标记初始化完成ns = STARTED;}} finally {// 解锁unlockRunState(rs, (rs & ~RSLOCK) | ns);}}//Flag2 上面分析过,取偶数位槽位,将任务放进偶数槽位else if ((q = ws[k = r & m & SQMASK]) != null) {// 对 WorkQueue 加锁if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;// 初始化任务提交标识boolean submitted = false; // initial submission or resizingtry {                      // locked version of push//计算内存偏移量,放任务,更新top值if ((a != null && a.length > s + 1 - q.base) ||(a = q.growArray()) != null) {int j = (((a.length - 1) & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);//提交任务成功submitted = true;}} finally {//WorkQueue解锁U.compareAndSwapInt(q, QLOCK, 1, 0);}// 任务提交成功了if (submitted) {//自然要唤醒可能存在等待的线程来处理任务了signalWork(ws, q);return;}}//任务提交没成功,可以重新计算随机数,再走一次流程move = true;                   // move on failure}//Flag3: 接Flag2,如果找到的槽位是空,则要初始化一个WorkQueueelse if (((rs = runState) & RSLOCK) == 0) { // create new queueq = new WorkQueue(this, null);// 设置工作队列的窃取线索值q.hint = r;// 如上面 WorkQueue 中config 的介绍,记录当前WorkQueue在WorkQueue[]数组中的值,和队列模式q.config = k | SHARED_QUEUE;// 初始化为 inactive 状态q.scanState = INACTIVE;//加锁rs = lockRunState();           // publish indexif (rs > 0 &&  (ws = workQueues) != null &&k < ws.length && ws[k] == null)ws[k] = q;                 // else terminated//解锁unlockRunState(rs, rs & ~RSLOCK);}elsemove = true;                   // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}}

Flag1.1 : 有个细节需要说一下,我们在 Java AQS队列同步器以及ReentrantLock的应用 时提到过使用锁的范式以及为什么要这样用,ForkJoinPool 这里同样遵循这种范式

Lock lock = new ReentrantLock();
lock.lock();
try{...
}finally{lock.unlock();
}

Flag1.2: 简单描述这个过程,就是根据不同的并行度来初始化不同大小的 WorkQueue[]数组,数组大小要求是 2 的 n 次幂,所以给大家个表格直观理解一下并行度和队列容量的关系:

并行度p 容量
1,2 4
3,4 8
5 ~ 8 16
9 ~ 16 32

Flag 1,2,3: 如果你理解了上面这个方法,很显然,第一次执行这个方法内部的逻辑顺序应该是 Flag1——> Flag3——>Flag2

externalSubmit 如果任务成功提交,就会调用 signalWork 方法了

signalWork

这个会用到 ForkJoinPool 的 ctl

//常量值
static final int SS_SEQ       = 1 << 16;       // version countfinal void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;// ctl 小于零,说明活动的线程数 AC 不够while ((c = ctl) < 0L) {                       // too few active// 取ctl的低32位,如果为0,说明没有等待的线程if ((sp = (int)c) == 0) {                  // no idle workers// 取TC的高位,如果不等于0,则说明目前的工作着还没有达到并行度if ((c & ADD_WORKER) != 0L)            // too few workers//添加 Worker,也就是说要创建线程了tryAddWorker(c);break;}//未开始或者已停止,直接跳出if (ws == null)                            // unstarted/terminatedbreak;//i=空闲线程栈顶端所属的工作队列索引if (ws.length <= (i = sp & SMASK))         // terminatedbreak;if ((v = ws[i]) == null)                   // terminatingbreak;//程序执行到这里,说明有空闲线程,计算下一个scanState,增加了版本号,并且调整为 active 状态int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanStateint d = sp - v.scanState;                  // screen CAS//计算下一个ctl的值,活动线程数 AC + 1,通过stackPred取得前一个WorkQueue的索引,重新设置回sp,行程最终的ctl值long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);//更新 ctl 的值if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {v.scanState = vs;                      // activate v//如果有线程阻塞,则调用unpark唤醒即可 if ((p = v.parker) != null)U.unpark(p);break;}//没有任务,直接跳出if (q != null && q.base == q.top)          // no more workbreak;}}

假设程序刚开始执行,那么活动线程数以及总线程数肯定都没达到并行度要求,这时就会调用 tryAddWorker 方法了

tryAddWorker

tryAddWorker 的逻辑就非常简单了,因为是操作线程池,同样会用到 lockRunState/unlockRunState 的锁控制

  private void tryAddWorker(long c) {//初始化添加worker表识boolean add = false;do {//因为要添加Worker,所以AC和TC都要加一long nc = ((AC_MASK & (c + AC_UNIT)) |(TC_MASK & (c + TC_UNIT)));//ctl还没被改变if (ctl == c) {int rs, stop;                 // check if terminatingif ((stop = (rs = lockRunState()) & STOP) == 0)//更新ctl 的值,add = U.compareAndSwapLong(this, CTL, c, nc);unlockRunState(rs, rs & ~RSLOCK);if (stop != 0)break;//ctl值更新成功,开始真正的创建Workerif (add) {createWorker();break;}}// 重新获取ctl,并且没有达到最大线程数,并且没有空闲的线程} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);}

一切顺利,就要调用 createWorker 方法来创建真正的 Worker 了,

createWorker

以上 WorkerQueue 和 ForkJoinTask,上文说的三个重要角色中的最后一个 ForkJoinWorkerThread 终于登场了

  private boolean createWorker() {ForkJoinWorkerThreadFactory fac = factory;Throwable ex = null;ForkJoinWorkerThread wt = null;try {//如果工厂已经存在了,就用factory来创建线程,会去注册线程,这里的this就是ForkJoinPool对象if (fac != null && (wt = fac.newThread(this)) != null) {//启动线程  wt.start();return true;}} catch (Throwable rex) {ex = rex;}//如果创建线程失败,就要逆向注销线程,包括前面对ctl等的操作deregisterWorker(wt, ex);return false;}

Worker 线程是如何与 WorkQueue 对应的,就藏在 fac.newThread(this) 这个方法里面,下面这点代码展示一下调用过程

public ForkJoinWorkerThread newThread(ForkJoinPool pool);static final class DefaultForkJoinWorkerThreadFactoryimplements ForkJoinWorkerThreadFactory {public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new ForkJoinWorkerThread(pool);}
}protected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);
}

很显然核心内容在 registerWorker 方法里面了

registerWorker

WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {this.pool = pool;this.owner = owner;// Place indices in the center of array (that is not yet allocated)base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}  final WorkQueue registerWorker(ForkJoinWorkerThread wt) {UncaughtExceptionHandler handler;//这里线程被设置为守护线程,因为,当只剩下守护线程时,JVM就会推出wt.setDaemon(true);                           // configure thread//填补处理异常的handlerif ((handler = ueh) != null)wt.setUncaughtExceptionHandler(handler);//创建一个WorkQueue,并且设置当前WorkQueue的owner是当前线程WorkQueue w = new WorkQueue(this, wt);int i = 0;                                    // assign a pool index//又用到了config的知识,提取出我们期望的WorkQueue模式int mode = config & MODE_MASK;//加锁int rs = lockRunState();try {WorkQueue[] ws; int n;                    // skip if no array//判断ForkJoinPool的WorkQueue[]都初始化完全if ((ws = workQueues) != null && (n = ws.length) > 0) {//一种魔数计算方式,用以减少冲突int s = indexSeed += SEED_INCREMENT;  // unlikely to collide//假设WorkQueue的初始长度是16,那这里的m就是15,最终目的就是为了得到一个奇数int m = n - 1;//和得到偶数的计算方式一样,得到一个小于m的奇数ii = ((s << 1) | 1) & m;               // odd-numbered indices//如果这个槽位不为空,说明已经被其他线程初始化过了,也就是有冲突,选取别的槽位if (ws[i] != null) {                  // collisionint probes = 0;                   // step by approx half n//步长加2,也就保证step还是奇数int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;//一直遍历,直到找到空槽位,如果都遍历了一遍,那就需要对WorkQueue[]扩容了while (ws[i = (i + step) & m] != null) {if (++probes >= n) {workQueues = ws = Arrays.copyOf(ws, n <<= 1);m = n - 1;probes = 0;}}}//初始化一个随机数w.hint = s;                           // use as random seed//如文章前面所说,config记录索引值和模式w.config = i | mode;//扫描状态也记录为索引值,如文章前面所说,奇数表示为scanning状态w.scanState = i;                      // publication fence//把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]数组中ws[i] = w;}} finally {//解锁unlockRunState(rs, rs & ~RSLOCK);}//设置worker的前缀名,用于业务区分wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));//返回当前线程创建的WorkQueue,回到上一层调用栈,也就将WorkQueue注册到ForkJoinWorkerThread里面了return w;
}

到这里线程是顺利创建成功了,可是如果线程没有创建成功,就需要 deregisterWorker来做善后工作了

deregisterWorker

deregisterWorker 方法接收刚刚创建的线程引用和异常作为参数,来做善后工作,将 registerWorker 相关工作撤销回来

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {WorkQueue w = null;if (wt != null && (w = wt.workQueue) != null) {WorkQueue[] ws;                           // remove index from array//获取当前线程注册的索引值int idx = w.config & SMASK;//加锁int rs = lockRunState();//如果奇数槽位都不为空,则清空内容if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)ws[idx] = null;//解锁unlockRunState(rs, rs & ~RSLOCK);}long c;                                       // decrement counts//死循环式CAS更改ctl的值,将前面AC和TC加1的值再减1,ctl就在那里,不增不减do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |(TC_MASK & (c - TC_UNIT)) |(SP_MASK & c))));//清空WorkQueue,将其中的task取消掉if (w != null) {w.qlock = -1;                             // ensure setw.transferStealCount(this);w.cancelAll();                            // cancel remaining tasks}//可能的替换操作for (;;) {                                    // possibly replaceWorkQueue[] ws; int m, sp;//如果线程池终止了,那就跳出循环即可if (tryTerminate(false, false) || w == null || w.array == null ||(runState & STOP) != 0 || (ws = workQueues) == null ||(m = ws.length - 1) < 0)              // already terminatingbreak;//当前线程创建失败,通过sp判断,如果还存在空闲线程,则调用tryRelease来唤醒这个线程,然后跳出if ((sp = (int)(c = ctl)) != 0) {         // wake up replacementif (tryRelease(c, ws[sp & m], AC_UNIT))break;}//如果没空闲线程,并且还没有达到满足并行度的条件,那就得再次尝试创建一个线程,弥补刚刚的失败else if (ex != null && (c & ADD_WORKER) != 0L) {tryAddWorker(c);                      // create replacementbreak;}else                                      // don't need replacementbreak;}if (ex == null)                               // help clean on way out//处理异常ForkJoinTask.helpExpungeStaleExceptions();else                                          // rethrowForkJoinTask.rethrow(ex);
}

总之 deregisterWorker 方法从线程池里注销线程,清空WorkQueue,同时更新ctl,最后做可能的替换,根据线程池的状态决定是否找一个自己的替代者:

  • 有空闲线程,则唤醒一个
  • 没有空闲线程,再次尝试创建一个新的工作线程

deregisterWorker 线程解释清楚了是为了帮助大家完整理解流程,我们继续看 registerWorker 成功后的流程,有了 Worker,接着调用 wt.start() 开始真正的工作

run

ForkJoinWorkerThread 继承自Thread,调用start() 方法后,自然要调用自己重写的 run() 方法

public void run() {if (workQueue.array == null) { // only run onceThrowable exception = null;try {onStart();//Work开始工作,处理workQueue中的任务pool.runWorker(workQueue);} catch (Throwable ex) {exception = ex;} finally {try {onTermination(exception);} catch (Throwable ex) {if (exception == null)exception = ex;} finally {pool.deregisterWorker(this, exception);}}}
}

方法的重点自然是进入到 runWorker

runWorker

runWorker 是很常规的三部曲操作:

  • scan: 通过扫描获取任务
  • runTask:执行扫描到的任务
  • awaitWork:没任务进入等待

具体请看注释

  final void runWorker(WorkQueue w) {//初始化队列,并根据需要是否扩容为原来的2倍w.growArray();                   // allocate queueint seed = w.hint;               // initially holds randomization hintint r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift//死循环更新偏移r,为扫描任务作准备  for (ForkJoinTask<?> t;;) {//扫描任务if ((t = scan(w, r)) != null)//扫描到就执行任务w.runTask(t);//没扫描到就等待,如果等也等不到任务,那就跳出循环别死等了else if (!awaitWork(w, r))break;r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}}

先来看 scan 方法

scan

ForkJoinPool 的任务窃取机制要来了,如何 steal 的,就藏在scan 方法中

private ForkJoinTask<?> scan(WorkQueue w, int r) {WorkQueue[] ws; int m;//再次验证workQueue[]数组的初始化情况if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {//获取当前扫描状态int ss = w.scanState;                     // initially non-negative//又一个死循环,注意到出口位置就好//和前面逻辑类似,随机一个起始位置,并赋值给kfor (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;int b, n; long c;//如果k槽位不为空if ((q = ws[k]) != null) {//base-top小于零,并且任务q不为空if ((n = (b = q.base) - q.top) < 0 &&(a = q.array) != null) {      // non-empty//获取base的偏移量,赋值给ilong i = (((a.length - 1) & b) << ASHIFT) + ABASE;//从base端获取任务,和前文的描述的steal搭配上了,是从base端stealif ((t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))) != null &&q.base == b) {//是active状态if (ss >= 0) {//更新WorkQueue中数组i索引位置为空,并且更新base的值if (U.compareAndSwapObject(a, i, t, null)) {q.base = b + 1;//n<-1,说明当前队列还有剩余任务,继续唤醒可能存在的其他线程if (n < -1)       // signal otherssignalWork(ws, q);//直接返回任务return t;}}else if (oldSum == 0 &&   // try to activatew.scanState < 0)tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);}//如果获取任务失败,则准备换位置扫描if (ss < 0)                   // refreshss = w.scanState;r ^= r << 1; r ^= r >>> 3; r ^= r << 10;origin = k = r & m;           // move and rescanoldSum = checkSum = 0;continue;}checkSum += b;}//k一直在变,扫描到最后,如果等于origin,说明已经扫描了一圈还没扫描到任务if ((k = (k + 1) & m) == origin) {    // continue until stableif ((ss >= 0 || (ss == (ss = w.scanState))) &&oldSum == (oldSum = checkSum)) {if (ss < 0 || w.qlock < 0)    // already inactivebreak;//准备inactive当前工作队列int ns = ss | INACTIVE;       // try to inactivate//活动线程数AC减1long nc = ((SP_MASK & ns) |(UC_MASK & ((c = ctl) - AC_UNIT)));w.stackPred = (int)c;         // hold prev stack topU.putInt(w, QSCANSTATE, ns);if (U.compareAndSwapLong(this, CTL, c, nc))ss = ns;elsew.scanState = ss;         // back out}checkSum = 0;}}}return null;
}

如果顺利扫描到任务,那就要调用 runTask 方法来真正的运行这个任务了

runTask

steal 到任务了,接线来开始runTask

      final void runTask(ForkJoinTask<?> task) {if (task != null) {scanState &= ~SCANNING; // mark as busy//Flag1: 记录当前的任务是偷来的,至于如何执行task,是我们写在compute方法中的,我们一会看doExec() 方法(currentSteal = task).doExec();U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GCexecLocalTasks();ForkJoinWorkerThread thread = owner;//累加偷来的数量,亲兄弟明算帐啊,虽然算完也没啥实际意义if (++nsteals < 0)      // collect on overflowtransferStealCount(pool);//任务执行完后,就重新更新scanState为SCANNINGscanState |= SCANNING;if (thread != null)thread.afterTopLevelExec();}}

Flag1: doExec 方法才是真正执行任务的关键,它是链接我们自定义 compute 方法的核心,来看 doExec 方法

doExec

//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重写了它
protected abstract boolean exec();final int doExec() {int s; boolean completed;if ((s = status) >= 0) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s = setCompletion(NORMAL);}return s;
}//RecursiveTask重写的内容,终于看到我们文章开头 demo 中的compute 了
protected final boolean exec() {result = compute();return true;
}

到这里,终于和我们自己重写的compute方法联系到了一起

awaitWork

上面说的是 scan 到了任务,要是没有scan到任务,那就得将当前线程阻塞一下,具体标注在注释中,可以简单了解一下

private boolean awaitWork(WorkQueue w, int r) {if (w == null || w.qlock < 0)                 // w is terminatingreturn false;for (int pred = w.stackPred, spins = SPINS, ss;;) {if ((ss = w.scanState) >= 0)break;else if (spins > 0) {r ^= r << 6; r ^= r >>> 21; r ^= r << 7;if (r >= 0 && --spins == 0) {         // randomize spinsWorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;if (pred != 0 && (ws = workQueues) != null &&(j = pred & SMASK) < ws.length &&//前驱任务队列还在(v = ws[j]) != null &&        // see if pred parking//并且工作队列已经激活,说明任务来了了(v.parker == null || v.scanState >= 0))//继续自旋等一会,别返回falsespins = SPINS;                // continue spinning}}//自旋之后,再次检查工作队列是否终止,若是,退出扫描else if (w.qlock < 0)                     // recheck after spinsreturn false;else if (!Thread.interrupted()) {long c, prevctl, parkTime, deadline;int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);if ((ac <= 0 && tryTerminate(false, false)) ||(runState & STOP) != 0)           // pool terminatingreturn false;if (ac <= 0 && ss == (int)c) {        // is last waiterprevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);int t = (short)(c >>> TC_SHIFT);  // shrink excess sparesif (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))return false;                 // else use timed waitparkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;}elseprevctl = parkTime = deadline = 0L;Thread wt = Thread.currentThread();U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupportw.parker = wt;if (w.scanState < 0 && ctl == c)      // recheck before parkU.park(false, parkTime);U.putOrderedObject(w, QPARKER, null);U.putObject(wt, PARKBLOCKER, null);if (w.scanState >= 0)break;if (parkTime != 0L && ctl == c &&deadline - System.nanoTime() <= 0L &&U.compareAndSwapLong(this, CTL, c, prevctl))return false;                     // shrink pool}}return true;
}

到这里,讲完了ForkJoinPool 的完整流程,以上内容都是从 submission task 作为切入点的。刚刚聊到的 compute 方法,我们按照分治算法范式写了自己的逻辑,具体请看demo,在 compute 中调用了 fork 方法,这就给我们了解 worker task 的机会了,继续来看 fork 方法

fork

Fork 方法的逻辑很简单,如果当前线程是 ForkJoinWorkerThread 类型,也就是说已经通过上文注册的 Worker,那么直接调用 push 方法将 task 放到当前线程拥有的 WorkQueue 中,否则就再调用 externalPush 重走我们已上说的所有逻辑

public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;
}//push 方法很简单,这里就不再过多解释了
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;int b = base, s = top, n;if ((a = array) != null) {    // ignore if queue removedint m = a.length - 1;     // fenced write for task visibilityU.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);if ((n = s - b) <= 1) {if ((p = pool) != null)p.signalWork(p.workQueues, this);}else if (n >= m)growArray();}
}

有 fork 就有 join,继续看一下 join 方法()

join

join 的核心调用在 doJoin,这里用到了很多级联三元运算符

public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();
}private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;//status,task 的运行状态return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();
}

我们将 doJoin 方法用我们最熟悉的 if/else 做个改动,这样方便解读

private int doJoin() {int s;Thread t;ForkJoinWorkerThread wt;ForkJoinPool.WorkQueue w;if((s = status) < 0) { // 有结果,直接返回return s;}else {if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {         // 如果是 ForkJoinWorkerThread Workerif((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 类似上面提到的 scan,但是是专项尝试从本工作队列里取出等待的任务// 取出了任务,就去执行它,并返回结果&& (s = doExec()) < 0) { return s;}else {// 也有可能别的线程把这个任务偷走了,那就执行内部等待方法return wt.pool.awaitJoin(w, this, 0L); }}else { // 如果不是 ForkJoinWorkerThread,执行外部等待方法return externalAwaitDone();}}}

其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(帮助) 和 Compensating(补偿) 两种策略,这两种策略大家完全可以自行阅读了,尤其是 awaitJoin 方法,强烈推荐大家自行阅读,其中 pop 的过程在这里,这里不再展开

externalAwaitDone

 private int externalAwaitDone() {int s = ((this instanceof CountedCompleter) ? // 如果是CountedCompleter类型的任务,执行externalHelpComplete方法ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>) this, 0): ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); // 否则,执行tryExternalUnpush方法,成功则执行任务,否则,返回0if (s >= 0 && (s = status) >= 0) { // 如果任务没有结束,则等待boolean interrupted = false;do {if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { // 将status设置为SIGNAL,等着被notifysynchronized (this) {if (status >= 0) { // 任务未结束try {wait(0L); // 等待} catch (InterruptedException ie) {interrupted = true; // 记录中断标记}} elsenotifyAll(); // 任务已经结束,通知等待的线程}}} while ((s = status) >= 0); // 任务未结束,就一直等待if (interrupted)Thread.currentThread().interrupt(); // 补上中断}return s;}

是CountedCompleter则执行 ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)否则判断任务是否在任务队列top位置是的话则执行任务,不是则等待

tryExternalUnpush(ForkJoinTask<?> task)

 1     final boolean tryExternalUnpush(ForkJoinTask<?> task) {2         WorkQueue[] ws;3         WorkQueue w;4         ForkJoinTask<?>[] a;5         int m, s;6         int r = ThreadLocalRandom.getProbe();7         if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && (w = ws[m & r & SQMASK]) != null8                 && (a = w.array) != null && (s = w.top) != w.base) {9             long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
10             if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
11                 if (w.top == s && w.array == a && U.getObject(a, j) == task
12                         && U.compareAndSwapObject(a, j, task, null)) { // 如果task在任务队列的top位置,则返回true; 否则,返回false
13                     U.putOrderedInt(w, QTOP, s - 1);
14                     U.putOrderedInt(w, QLOCK, 0);
15                     return true;
16                 }
17                 U.compareAndSwapInt(w, QLOCK, 1, 0);
18             }
19         }
20         return false;
21     }

是否task在任务队列的top位置,则返回true; 否则,返回false

awaitJoin

   final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {int s = 0;if (task != null && w != null) {ForkJoinTask<?> prevJoin = w.currentJoin; // 记录上一个join的任务U.putOrderedObject(w, QCURRENTJOIN, task); // 设置task为当前join的任务CountedCompleter<?> cc = (task instanceof CountedCompleter) ? (CountedCompleter<?>) task : null;for (;;) {if ((s = task.status) < 0) // 任务已经结束,跳出循环break;if (cc != null)helpComplete(w, cc, 0); // CountedCompleter类型的任务调用helpComplete()方法else if (w.base == w.top || w.tryRemoveAndExec(task)) // 任务队列为空或执行失败(任务被别的线程偷走了),帮助偷取者执行该任务helpStealer(w, task);if ((s = task.status) < 0) // 任务已经结束,跳出循环break;long ms, ns;if (deadline == 0L) // 任务等待时间ms = 0L;else if ((ns = deadline - System.nanoTime()) <= 0L) // 超时退出break;else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)ms = 1L;if (tryCompensate(w)) { // 尝试补偿策略(找一个替代者执行任务,自己在这儿等)task.internalWait(ms); // 补偿成功,等待指定时间U.getAndAddLong(this, CTL, AC_UNIT); // 活跃线程加1}}U.putOrderedObject(w, QCURRENTJOIN, prevJoin); // 设置回前一个join的任务}return s;}

helpStealer 偷取

  private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { // 帮助偷取者(偷取自己任务的线程)执行任务WorkQueue[] ws = workQueues;int oldSum = 0, checkSum, m;if (ws != null && (m = ws.length - 1) >= 0 && w != null && task != null) {do { // restart pointcheckSum = 0; // for stability checkForkJoinTask<?> subtask;WorkQueue j = w, v; // v是子任务的偷取者descent: for (subtask = task; subtask.status >= 0;) { // 从目标任务开始,记录当前Join的任务,也包括偷取者当前Join的任务,递归帮助for (int h = j.hint | 1, k = 0, i;; k += 2) { // 每次跳2个,遍历奇数位索引if (k > m) // 如果遍历一遍没有找到偷取者,跳出循环break descent;if ((v = ws[i = (h + k) & m]) != null) {if (v.currentSteal == subtask) { // 定位到偷取者,更新hint为偷取者索引,方便下次定位j.hint = i;break;}checkSum += v.base; // 若没有定位到,则累加工作队列的base值,继续遍历}}for (;;) { // 帮助偷取者执行任务ForkJoinTask<?>[] a; // 偷取者的任务数组int b;checkSum += (b = v.base); // 累加偷取者的base值ForkJoinTask<?> next = v.currentJoin; // 记录偷取者Join的任务if (subtask.status < 0 || j.currentJoin != subtask || v.currentSteal != subtask) // subtask结束,或者数据不一致了(j.currentJoin != subtask || v.currentSteal != subtask)break descent; // 跳出外层循环重来if (b - v.top >= 0 || (a = v.array) == null) { // 偷取者的任务列表为空if ((subtask = next) == null) // 偷取者的Join任务为空,跳出外层循环break descent;j = v; // 否则,j取v的值(j指向被偷者,v指向偷取者),且subtask指向next Join任务break; // 继续遍历,寻找偷取者的偷取者(递归)}int i = (((a.length - 1) & b) << ASHIFT) + ABASE; // 偷取者的base内存地址ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i)); // 获取base位置任务if (v.base == b) {if (t == null) // 任务为空,跳出外层循环(可能被别的线程拿走了)break descent;if (U.compareAndSwapObject(a, i, t, null)) { // poll(从base位置取出任务)v.base = b + 1; // 更新base的值ForkJoinTask<?> ps = w.currentSteal; // 记录调用者之前偷取的任务int top = w.top; // 记录调用者的top值do {U.putOrderedObject(w, QCURRENTSTEAL, t); // 更新currentSteal为刚刚偷取到的任务t.doExec(); // 指向任务} while (task.status >= 0 && w.top != top && (t = w.pop()) != null); // 如果任务未结束,且自己任务队列不为空,优先处理自己队列里的任务U.putOrderedObject(w, QCURRENTSTEAL, ps); // 把之前偷取的任务设置回currentStealif (w.base != w.top) // 自己队列来新任务了,直接返回return;}}}}} while (task.status >= 0 && oldSum != (oldSum = checkSum)); // Join的任务未结束,且任务在流动中,继续帮助执行}}
  • 每次跳2个槽位,遍历奇数位索引,直到定位到偷取者,并记录偷取者的索引(hint = i),方便下次定位。
  • 获取偷取者的任务列表,帮助其执行任务,如果执行过程中发现自己任务列表里有任务,则依次弹出执行。
  • 如果偷取者任务队列为空,则帮助其执行Join任务,寻找偷取者的偷取者,如此往复,加快任务执行。
  • 如果最后发现自己任务队列不为空(base != top),则退出帮助。
  • 最后判断任务task是否结束,如果未结束,且工作队列base和在变动中,说明偷取任务一直在进行,则重复以上操作,加快任务执行

tryCompensate 补偿

private boolean tryCompensate(WorkQueue w) { // 找一个替代者执行任务,自己等待任务结束boolean canBlock;WorkQueue[] ws;long c;int m, pc, sp;if (w == null || w.qlock < 0 // 调用者正在终止|| (ws = workQueues) == null || (m = ws.length - 1) <= 0 // 线程池结束|| (pc = config & SMASK) == 0) // 并行度为0(不可用)canBlock = false; // 不可阻塞else if ((sp = (int) (c = ctl)) != 0) // 如果有空闲线程,释放空闲线程canBlock = tryRelease(c, ws[sp & m], 0L);else { // 没有空闲线程,尝试创建一个int ac = (int) (c >> AC_SHIFT) + pc; // 活跃线程数int tc = (short) (c >> TC_SHIFT) + pc; // 总的线程数int nbusy = 0; // 验证饱和度(Running线程数是否等于总的线程数)for (int i = 0; i <= m; ++i) {WorkQueue v;if ((v = ws[((i << 1) | 1) & m]) != null) { // 遍历两遍奇数索引槽位if ((v.scanState & SCANNING) != 0)break;++nbusy;}}if (nbusy != (tc << 1) || ctl != c) // 遍历两遍奇数索引槽位,tc需要乘以2canBlock = false; // 并不是所有的线程都在干活,或者数据(ctl)失效,不要阻塞else if (tc >= pc && ac > 1 && w.isEmpty()) { // 总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空long nc = ((AC_MASK & (c - AC_UNIT)) | (~AC_MASK & c)); // AC - 1canBlock = U.compareAndSwapLong(this, CTL, c, nc);} else if (tc >= MAX_CAP || (this == common && tc >= pc + commonMaxSpares)) // TC达到最大容量throw new RejectedExecutionException("Thread limit exceeded replacing blocked worker");else {boolean add = false;int rs;long nc = ((AC_MASK & c) | (TC_MASK & (c + TC_UNIT))); // 总的线程数加1,活跃线程数不变(补偿)if (((rs = lockRunState()) & STOP) == 0)add = U.compareAndSwapLong(this, CTL, c, nc);unlockRunState(rs, rs & ~RSLOCK);canBlock = add && createWorker(); // 创建工作线程}}return canBlock;}
  • 调用者队列不为空,并且有空闲工作线程,唤醒空闲线程(tryRelease)
  • 线程池未停止,活跃线程数不足,新建一个工作线程(createWorker)
  • 工作队列正在停止或线程池停止,总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空,不需要补偿

forkJoin源码解读相关推荐

  1. Bert系列(二)——源码解读之模型主体

    本篇文章主要是解读模型主体代码modeling.py.在阅读这篇文章之前希望读者们对bert的相关理论有一定的了解,尤其是transformer的结构原理,网上的资料很多,本文内容对原理部分就不做过多 ...

  2. Bert系列(三)——源码解读之Pre-train

    https://www.jianshu.com/p/22e462f01d8c pre-train是迁移学习的基础,虽然Google已经发布了各种预训练好的模型,而且因为资源消耗巨大,自己再预训练也不现 ...

  3. linux下free源码,linux命令free源码解读:Procps free.c

    linux命令free源码解读 linux命令free源码解读:Procps free.c 作者:isayme 发布时间:September 26, 2011 分类:Linux 我们讨论的是linux ...

  4. nodeJS之eventproxy源码解读

    1.源码缩影 !(function (name, definition) { var hasDefine = typeof define === 'function', //检查上下文环境是否为AMD ...

  5. PyTorch 源码解读之即时编译篇

    点击上方"AI遇见机器学习",选择"星标"公众号 重磅干货,第一时间送达 作者丨OpenMMLab 来源丨https://zhuanlan.zhihu.com/ ...

  6. Alamofire源码解读系列(九)之响应封装(Response)

    本篇主要带来Alamofire中Response的解读 前言 在每篇文章的前言部分,我都会把我认为的本篇最重要的内容提前讲一下.我更想同大家分享这些顶级框架在设计和编码层次究竟有哪些过人的地方?当然, ...

  7. Feflow 源码解读

    Feflow 源码解读 Feflow(Front-end flow)是腾讯IVWEB团队的前端工程化解决方案,致力于改善多类型项目的开发流程中的规范和非业务相关的问题,可以让开发者将绝大部分精力集中在 ...

  8. spring-session源码解读 sesion

    2019独角兽企业重金招聘Python工程师标准>>> spring-session源码解读 sesion 博客分类: java spring 摘要: session通用策略 Ses ...

  9. 前端日报-20160527-underscore 源码解读

    underscore 源码解读 API文档浏览器 JavaScript 中加号操作符细节 抛弃 jQuery,拥抱原生 JS 从 0 开始学习 GitHub 系列之「加入 GitHub」 js实现克隆 ...

最新文章

  1. leetcode算法题--买卖股票的最佳时机 II
  2. 【MFC】在CHtmlView中准确判断页面加载完成
  3. cdn刷新api_闲话 CDN
  4. JLupin Next Server乍一看
  5. 视觉中的经典图像特征小结(一): 颜色直方图, HOG, LBP
  6. JMeter4.0以上 分布式测试报错 server failed start Listen failed on port
  7. 搜狗输入法为什么按空格出字 搜狗输入法按空格出字怎么设置
  8. 数字化改革看“浙”里 CDEC2021中国数字智能生态大会杭州站举行
  9. 正则匹配减号_2020年这些正则应该被收藏(64条)
  10. 自学Python能干些什么副业
  11. @ARGV:perl命令行参数
  12. poj 1251 Jungle Roads
  13. 2020辅警考试计算机知识题,2019年辅警考试题库:计算机概述-计算机软件系统
  14. java 解析dataset_C# DataSet用法的详细解析|C#教程
  15. 【图形学实验】Loop Subdivision与Modified Butterfly Subdivision
  16. pyaudio 声音处理
  17. 关于谷歌不兼容showModalDialog的解决方案
  18. matlab的四个取整函数!
  19. windows server 2008 r2集成USB3.0
  20. VM虚拟机安装CentOS7添加硬盘扩展存储空间的方法

热门文章

  1. 量化投资与数据分析一: 如何用PYTHON下载WIND数据并转化成dataframe格式 分享
  2. 奥运排行榜(25 分)
  3. ipad无线无法连接到服务器,ipad无法连接无线路由器原因有哪些【解决方法】
  4. 钢条切割算法python实现
  5. Qt Style Sheets(QSS)参考
  6. SecureCRT 多个会话显示在同一窗口
  7. 全国计算机注册时密码为什么老是错误,电脑密码正确却显示密码错误怎么办
  8. ffmpeg libx264 h264_nvenc 编码参数解析
  9. iOS 关于leak检测内存问题的使用
  10. 最新android 电脑系统,你的Windows电脑即将可以运行最新版安卓系统