Sources of a split:

    1. A memstore flush calls CompactSplitThread.requestSplit directly.
    2. A request issued by an HBaseAdmin client; when the HRegionServer receives it, the request is handed to CompactSplitThread.requestSplit.

The rest of this article walks through the HBaseAdmin-initiated path in detail. The client triggers the action from the web UI, which invokes the _jspService method of the generated org.apache.hadoop.hbase.generated.master class; the relevant snippet of that method is:
...
if (action.equals("split")) {
    if (key != null && key.length() > 0) {
        hbadmin.split(key);
    } else {
        hbadmin.split(fqtn);
    }
    out.write(" Split request accepted. ");
}
...

As you can see, a split request ends up calling HBaseAdmin's split method. Stepping into the split method of org.apache.hadoop.hbase.client.HBaseAdmin: when the request carries no explicit split key, the second parameter here is null.
/**
 * Split a table or an individual region.
 * Asynchronous operation.
 */
public void split(final byte[] tableNameOrRegionName,
        final byte[] splitPoint) throws IOException, InterruptedException {
    CatalogTracker ct = getCatalogTracker();
    try {
        Pair<HRegionInfo, ServerName> regionServerPair = getRegion(tableNameOrRegionName, ct);
        if (regionServerPair != null) {
            if (regionServerPair.getSecond() == null) {
                throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
            } else {
                split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
            }
        }
        ...
}
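
For orientation, a minimal client-side sketch of driving this API directly might look like the following (the table name and split row are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class SplitRequestExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            // Ask for a split of "my_table" at row "row5000". Passing null as
            // the split point instead lets the regionserver pick the midkey
            // itself, exactly like the UI path above. The call is asynchronous.
            admin.split(Bytes.toBytes("my_table"), Bytes.toBytes("row5000"));
        } finally {
            admin.close();
        }
    }
}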

Next we step into split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint). This method resolves the admin interface of the regionserver hosting the region and invokes that regionserver's split via ProtobufUtil:
private void split(final ServerName sn, final HRegionInfo hri,
        byte[] splitPoint) throws IOException {
    if (hri.getStartKey() != null && splitPoint != null &&
            Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
        throw new IOException("should not give a splitkey which equals to startkey!");
    }
    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
    ProtobufUtil.split(admin, hri, splitPoint);
}

Inside ProtobufUtil's split method:
/**
 * A helper to split a region using admin protocol.
 */
public static void split(final AdminService.BlockingInterface admin,
        final HRegionInfo hri, byte[] splitPoint) throws IOException {
    SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
    try {
        admin.splitRegion(null, request);
    } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
    }
}

The splitRegion method on BlockingInterface is declared as follows:
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse splitRegion(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest request)
    throws com.google.protobuf.ServiceException;

HRegionServer implements this BlockingInterface and supplies the corresponding splitRegion implementation:
/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
  AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
  HBaseRPCErrorHandler, LastSequenceId {

  ...

  /**
   * Split a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.HIGH_QOS)
  public SplitRegionResponse splitRegion(final RpcController controller,
      final SplitRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      region.startRegionOperation(Operation.SPLIT_REGION);
      LOG.info("Splitting " + region.getRegionNameAsString());
      long startTime = EnvironmentEdgeManager.currentTimeMillis();
      HRegion.FlushResult flushResult = region.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTimeMillis();
        metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      byte[] splitPoint = null;
      if (request.hasSplitPoint()) {
        splitPoint = request.getSplitPoint().toByteArray();
      }
      region.forceSplit(splitPoint);
      // CompactSplitThread issues the split request; region.checkSplit()
      // computes the midkey to split at.
      compactSplitThread.requestSplit(region, region.checkSplit(), RpcServer.getRequestUser());
      ...
  }
}

First, inside region.checkSplit():
/**
 * Return the splitpoint. null indicates the region isn't splittable
 * If the splitpoint isn't explicitly specified, it will go over the stores
 * to find the best splitpoint. Currently the criteria of best splitpoint
 * is based on the size of the store.
 */
public byte[] checkSplit() {
    ...
    if (!splitPolicy.shouldSplit()) {
        return null;
    }

    byte[] ret = splitPolicy.getSplitPoint();
    ...
    return ret;
}
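
Both decisions are delegated to the region's RegionSplitPolicy, a pluggable extension point. As a hedged sketch only (field and method names follow the 0.98-era code quoted in this article; the size threshold is invented), a custom policy could look like:

import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.Store;

// Split once the region's total store size crosses a fixed threshold.
// getSplitPoint() is inherited from RegionSplitPolicy: it returns the forced
// split point if one was set, otherwise the midkey of the largest store
// (the loop shown right below).
public class FixedSizeSplitPolicy extends RegionSplitPolicy {
    private static final long MAX_SIZE = 10L * 1024 * 1024 * 1024; // assumed 10 GB

    @Override
    protected boolean shouldSplit() {
        long total = 0;
        // region is a protected field of the base class
        for (Store s : region.getStores().values()) {
            total += s.getSize();
        }
        return total > MAX_SIZE;
    }
}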

Stepping into getSplitPoint: this method finds the largest store in the region and takes that store's split point:
/**
 * @return the key at which the region should be split, or null
 * if it cannot be split. This will only be called if shouldSplit
 * previously returned true.
 */
protected byte[] getSplitPoint() {
    byte[] explicitSplitPoint = this.region.getExplicitSplitPoint();
    if (explicitSplitPoint != null) {
        return explicitSplitPoint;
    }
    Map<byte[], Store> stores = region.getStores();

    byte[] splitPointFromLargestStore = null;
    long largestStoreSize = 0;
    for (Store s : stores.values()) {
        byte[] splitPoint = s.getSplitPoint();
        long storeSize = s.getSize();
        if (splitPoint != null && largestStoreSize < storeSize) {
            splitPointFromLargestStore = splitPoint;
            largestStoreSize = storeSize;
        }
    }

    return splitPointFromLargestStore;
}
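
Note the guard in the loop: the biggest store only wins if it actually produced a split point. A tiny stand-alone illustration with made-up sizes (plain strings and longs stand in for real Store objects):

import java.util.LinkedHashMap;
import java.util.Map;

public class LargestStorePick {
    public static void main(String[] args) {
        // family -> {size in bytes, 1 if the store returned a split point}
        Map<String, long[]> stores = new LinkedHashMap<String, long[]>();
        stores.put("cf1", new long[]{5L << 30, 1});
        stores.put("cf2", new long[]{8L << 30, 1});
        stores.put("cf3", new long[]{9L << 30, 0}); // largest, but no split point (e.g. has references)

        String pick = null;
        long largest = 0;
        for (Map.Entry<String, long[]> e : stores.entrySet()) {
            boolean hasSplitPoint = e.getValue()[1] == 1;
            long size = e.getValue()[0];
            if (hasSplitPoint && largest < size) {
                pick = e.getKey();
                largest = size;
            }
        }
        System.out.println(pick); // cf2 -- cf3 is bigger but unsplittable
    }
}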

Into HStore's getSplitPoint method:
@Override
public byte[] getSplitPoint() {
    this.lock.readLock().lock();
    try {
        // Should already be enforced by the split policy!
        assert !this.getRegionInfo().isMetaRegion();
        // Not split-able if we find a reference store file present in the store.
        if (hasReferences()) {
            return null;
        }
        return this.storeEngine.getStoreFileManager().getSplitPoint();
    } catch (IOException e) {
        LOG.warn("Failed getting store size for " + this, e);
    } finally {
        this.lock.readLock().unlock();
    }
    return null;
}

The store file manager's getSplitPoint picks the largest storefile and delegates to StoreFile's getFileSplitPoint, which essentially returns the first key of the block sitting in the middle of the largest storefile of the largest store:
@Override
public final byte[] getSplitPoint() throws IOException {
    if (this.storefiles.isEmpty()) {
        return null;
    }
    return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
}

/**
 * Gets the approximate mid-point of this file that is optimal for use in splitting it.
 * @param comparator Comparator used to compare KVs.
 * @return The split point row, or null if splitting is not possible, or reader is null.
 */
@SuppressWarnings("deprecation")
byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
    if (this.reader == null) {
        LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
        return null;
    }
    // Get first, last, and mid keys.  Midkey is the key that starts block
    // in middle of hfile.  Has column and timestamp.  Need to return just
    // the row we want to split on as midkey.
    byte [] midkey = this.reader.midkey();
    if (midkey != null) {
        KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
        byte [] fk = this.reader.getFirstKey();
        KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
        byte [] lk = this.reader.getLastKey();
        KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
        // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
        if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("cannot split because midkey is the same as first or last row");
            }
            return null;
        }
        return mk.getRow();
    }
    return null;
}

HBase further rules that if the row found this way is the first or the last row of the whole file, there is no split point.

When does that happen? The most common case is a file consisting of a single block: the split then simply cannot proceed. Newcomers testing split often create a fresh table, insert a few rows, flush, issue a split, and are surprised to find the table never actually splits. This is the reason; with debug logging enabled you will find the "cannot split because midkey is the same as first or last row" message shown above.
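
To make the rule concrete, here is a tiny self-contained sketch (plain Java, not HBase code) of the row-level comparison that getFileSplitPoint performs; in a one-block file the middle block is the first block, so the midkey row equals the first row and no split point exists:

import java.util.Arrays;

public class MidkeyRule {
    // Returns the candidate split row, or null when the midkey row collides
    // with the first or last row (the one-block case described above).
    static byte[] fileSplitPoint(byte[] firstRow, byte[] midRow, byte[] lastRow) {
        if (Arrays.equals(midRow, firstRow) || Arrays.equals(midRow, lastRow)) {
            return null; // cannot (ever) split this file
        }
        return midRow;
    }

    public static void main(String[] args) {
        byte[] row1 = "row1".getBytes();
        byte[] row5 = "row5".getBytes();
        byte[] row9 = "row9".getBytes();
        System.out.println(fileSplitPoint(row1, row1, row9));             // null: one-block file
        System.out.println(new String(fileSplitPoint(row1, row5, row9))); // row5: usable midkey
    }
}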

Now back to CompactSplitThread's requestSplit method:
/*
 * The User parameter allows the split thread to assume the correct user identity
 */
public synchronized void requestSplit(final HRegion r, byte[] midKey, User user) {
    if (midKey == null) {
        LOG.debug("Region " + r.getRegionNameAsString() +
                  " not splittable because midkey=null");
        if (r.shouldForceSplit()) {
            r.clearSplit();
        }
        return;
    }
    try {
        // The split thread pool runs SplitRequest, whose doSplitting method does the work.
        this.splits.execute(new SplitRequest(r, midKey, this.server, user));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Split requested for " + r + ".  " + this);
        }
    } catch (RejectedExecutionException ree) {
        LOG.info("Could not execute split for " + r, ree);
    }
}
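
The splits executor here is CompactSplitThread's dedicated split thread pool. A stand-alone sketch of the same submit-and-handle-rejection pattern (the pool sizing below is assumed for illustration; it is not HBase's):

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SplitQueueSketch {
    public static void main(String[] args) {
        // A saturated pool rejects new split requests instead of blocking the
        // RPC handler, which is why requestSplit catches RejectedExecutionException.
        ThreadPoolExecutor splits = new ThreadPoolExecutor(
                1, 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        try {
            splits.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("doSplitting() would run here");
                }
            });
        } catch (RejectedExecutionException ree) {
            System.out.println("Could not execute split: " + ree);
        } finally {
            splits.shutdown();
        }
    }
}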

Next, into SplitRequest's doSplitting method. It runs in two main phases, prepare and execute; before they start, the midkey is handed to the SplitTransaction constructor, where it becomes the splitrow field used later, when the reference files are created, to compare against each store file's key range:
private void doSplitting(User user) {
    boolean success = false;
    server.getMetrics().incrSplitRequest();
    long startTime = EnvironmentEdgeManager.currentTimeMillis();
    SplitTransaction st = new SplitTransaction(parent, midKey);
    try {
      //acquire a shared read lock on the table, so that table schema modifications
      //do not happen concurrently
      tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName()
          , "SPLIT_REGION:" + parent.getRegionNameAsString());
      try {
        tableLock.acquire();
      } catch (IOException ex) {
        tableLock = null;
        throw ex;
      }

      // If prepare does not return true, for some reason -- logged inside in
      // the prepare call -- we are not ready to split just now. Just return.
      if (!st.prepare()) return;
      try {
        st.execute(this.server, this.server, user);
        success = true;
      } catch (Exception e) {
          ...
    } catch (IOException ex) {
      LOG.error("Split failed " + this, RemoteExceptionHandler.checkIOException(ex));
      server.checkFileSystem();
    } finally {
      if (this.parent.getCoprocessorHost() != null) {
        try {
          this.parent.getCoprocessorHost().postCompleteSplit();
        } catch (IOException io) {
          LOG.error("Split failed " + this,
              RemoteExceptionHandler.checkIOException(io));
        }
      }
      if (parent.shouldForceSplit()) {
        parent.clearSplit();
      }
      releaseTableLock();
      long endTime = EnvironmentEdgeManager.currentTimeMillis();
      // Update regionserver metrics with the split transaction total running time
      server.getMetrics().updateSplitTime(endTime - startTime);
      if (success) {
        ...
    }
}

SplitTransaction's prepare method mainly validates the inputs and initializes the two daughter regions:
/**
 * Does checks on split inputs.
 * @return <code>true</code> if the region is splittable else
 * <code>false</code> if it is not (e.g. its already closed, etc.).
 */
public boolean prepare() {
    if (!this.parent.isSplittable()) return false;
    // Split key can be null if this region is unsplittable; i.e. has refs.
    if (this.splitrow == null) return false;
    HRegionInfo hri = this.parent.getRegionInfo();
    parent.prepareToSplit();
    // Check splitrow.
    byte [] startKey = hri.getStartKey();
    byte [] endKey = hri.getEndKey();
    if (Bytes.equals(startKey, splitrow) ||
        !this.parent.getRegionInfo().containsRow(splitrow)) {
        LOG.info("Split row is not inside region key range or is equal to " +
                 "startkey: " + Bytes.toStringBinary(this.splitrow));
        return false;
    }
    long rid = getDaughterRegionIdTimestamp(hri);
    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
    this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
    return true;
}
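
The two daughters partition the parent's key range at splitrow. A hypothetical illustration (the table name, rows, and the regionId stand-in are all made up):

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

public class DaughterRangesExample {
    public static void main(String[] args) {
        TableName tn = TableName.valueOf("my_table");
        byte[] startKey = Bytes.toBytes("row_a");
        byte[] endKey   = Bytes.toBytes("row_z");
        byte[] splitrow = Bytes.toBytes("row_m");
        long rid = System.currentTimeMillis(); // stand-in for getDaughterRegionIdTimestamp(hri)

        // prepare() carves the parent range into two half-open daughter ranges
        // that share the same region id timestamp:
        HRegionInfo a = new HRegionInfo(tn, startKey, splitrow, false, rid); // [row_a, row_m)
        HRegionInfo b = new HRegionInfo(tn, splitrow, endKey, false, rid);   // [row_m, row_z)
        System.out.println(a.getRegionNameAsString());
        System.out.println(b.getRegionNameAsString());
    }
}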

The heart of the process is SplitTransaction's execute method. After createDaughters completes, stepsAfterPONR runs; internally it calls openDaughters, which in turn calls openDaughterRegion and performs some initialization on the daughter regions:
/**
 * Run the transaction.
 * @param server Hosting server instance.  Can be null when testing (won't try
 * and update in zk if a null server)
 * @param services Used to online/offline regions.
 * @throws IOException If thrown, transaction failed.
 *          Call {@link #rollback(Server, RegionServerServices)}
 * @return Regions created
 * @throws IOException
 * @see #rollback(Server, RegionServerServices)
 */
public PairOfSameType<HRegion> execute(final Server server,
                                       final RegionServerServices services, User user)
    throws IOException {
    useZKForAssignment =
        server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
    PairOfSameType<HRegion> regions = createDaughters(server, services, user);
    ...
    return stepsAfterPONR(server, services, regions, user);
}

Into SplitTransaction's createDaughters method, which (among other things) takes the parent region's write lock:
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
      final RegionServerServices services, User user) throws IOException {
    ...

    PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);

    final List<Mutation> metaEntries = new ArrayList<Mutation>();
    boolean ret = false;

    ...
    return daughterRegions;
}

Into SplitTransaction's stepsBeforePONR. This method marks the parent region as splitting: createNodeSplitting creates the splitting (PENDING_SPLIT) znode in ZooKeeper, and a split-started entry is added to the journal. It then calls splitStoreFiles to create the reference files through which the two daughter regions will point at the hfiles still located under the parent region:
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
      final RegionServerServices services, boolean testing) throws IOException {
    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
    // have zookeeper so don't do zk stuff if server or zookeeper is null
    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
        try {
            createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
        } catch (KeeperException e) {
            throw new IOException("Failed creating PENDING_SPLIT znode on " +
                                  this.parent.getRegionNameAsString(), e);
        }
    } else if (services != null && !useZKForAssignment) {
        if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent.getRegionInfo(), hri_a, hri_b)) {
            throw new IOException("Failed to get ok from master to split " + parent.getRegionNameAsString());
        }
    }
    this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING_IN_ZK));
    if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
        // After creating the split node, wait for master to transition it
        // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
        // knows about it and won't transition any region which is splitting.
        znodeVersion = getZKNode(server, services);
    }

    this.parent.getRegionFileSystem().createSplitsDir();
    this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));

    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
    Exception exceptionToThrow = null;
    try {
        // Close the parent region and collect the store files of all its stores.
        hstoreFilesToSplit = this.parent.close(false);
    } catch (Exception e) {
        exceptionToThrow = e;
    }
    ...
    this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));

    // splitStoreFiles creates daughter region dirs under the parent splits dir
    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
    // clean this up.
    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

    // Log to the journal that we are creating region A, the first daughter
    // region.  We could fail halfway through.  If we do, we could have left
    // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
    // add entry to journal BEFORE rather than AFTER the change.
    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
    assertReferenceFileCount(expectedReferences.getFirst(),
                             this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
    assertReferenceFileCount(expectedReferences.getFirst(),
            new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));

    // Ditto
    this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
    assertReferenceFileCount(expectedReferences.getSecond(), this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
    assertReferenceFileCount(expectedReferences.getSecond(),
            new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));

    return new PairOfSameType<HRegion>(a, b);
}

Into splitStoreFiles, which iterates over hstoreFilesToSplit, the map of every store file of the parent region passed down from the caller:
/**
 * Creates reference files for top and bottom half of the
 * @param hstoreFilesToSplit map of store files to create half file references for.
 * @return the number of reference files that were created.
 * @throws IOException
 */
private Pair<Integer, Integer> splitStoreFiles(final Map<byte[],
      List<StoreFile>> hstoreFilesToSplit) throws IOException {
    if (hstoreFilesToSplit == null) {
        // Could be null because close didn't succeed -- for now consider it fatal
        throw new IOException("Close returned empty list of StoreFiles");
    }
    // The following code sets up a thread pool executor with as many slots as
    // there's files to split. It then fires up everything, waits for
    // completion and finally checks for any exception
    int nbFiles = 0;
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
        nbFiles += entry.getValue().size();
    }
    if (nbFiles == 0) {
        // no file needs to be splitted.
        return new Pair<Integer, Integer>(0,0);
    }
    // Default max #threads to use is the smaller of table's configured number of blocking store
    // files or the available number of logical cores.
    int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
                HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
                Runtime.getRuntime().availableProcessors());
    // Max #threads is the smaller of the number of storefiles or the default max determined above.
    int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles);
    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
             " using " + maxThreads + " threads");
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("StoreFileSplitter-%1$d");
    ThreadFactory factory = builder.build();
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);

    // Split each store file.
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
        for (StoreFile sf: entry.getValue()) {
            StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
            futures.add(threadPool.submit(sfs));
        }
    }
    // Shutdown the pool
    threadPool.shutdown();

    // Wait for all the tasks to finish
    try {
        boolean stillRunning = !threadPool.awaitTermination(this.fileSplitTimeout, TimeUnit.MILLISECONDS);
        if (stillRunning) {
            threadPool.shutdownNow();
            // wait for the thread to shutdown completely.
            while (!threadPool.isTerminated()) {
                Thread.sleep(50);
            }
            throw new IOException("Took too long to split the" + " files and create the references, aborting split");
        }
    } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
    }

    int created_a = 0;
    int created_b = 0;
    // Look for any exception
    for (Future<Pair<Path, Path>> future : futures) {
        try {
            Pair<Path, Path> p = future.get();
            created_a += p.getFirst() != null ? 1 : 0;
            created_b += p.getSecond() != null ? 1 : 0;
        } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
                  + " storefiles, Daugther B: " + created_b + " storefiles.");
    }
    return new Pair<Integer, Integer>(created_a, created_b);
}
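
A quick worked example of the thread-count arithmetic above, under assumed values (a blocking-store-files setting of 10, 8 logical cores, 3 storefiles to split, and no explicit REGION_SPLIT_THREADS_MAX override):

public class SplitThreadMath {
    public static void main(String[] args) {
        int blockingStoreFiles = 10; // assumed value behind HStore.BLOCKING_STOREFILES_KEY
        int cores = 8;               // Runtime.getRuntime().availableProcessors()
        int nbFiles = 3;             // storefiles to split

        int defMaxThreads = Math.min(blockingStoreFiles, cores); // 8
        int maxThreads = Math.min(defMaxThreads, nbFiles);       // 3: one thread per file
        System.out.println(maxThreads);
    }
}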

Back to where hstoreFilesToSplit comes from: this.parent.close(false) eventually lands in HRegion's doClose method, and every store originates from HRegion's stores field, declared as follows:
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(Bytes.BYTES_RAWCOMPARATOR);

doClose itself then flushes what remains, closes every store, and collects their store files:
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
    throws IOException {
    if (isClosed()) {
        LOG.warn("Region " + this + " already closed");
        return null;
    }

    if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-close hooks");
        this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    synchronized (writestate) {
        // Disable compacting and flushing by background threads for this
        // region.
        writestate.writesEnabled = false;
        LOG.debug("Closing " + this + ": disabling compactions & flushes");
        waitForFlushesAndCompactions();
    }
    // If we were not just flushing, is it worth doing a preflush...one
    // that will clear out of the bulk of the memstore before we put up
    // the close flag?
    if (!abort && worthPreFlushing()) {
        status.setStatus("Pre-flushing region before close");
        LOG.info("Running close preflush of " + this.getRegionNameAsString());
        try {
            internalFlushcache(status);
        } catch (IOException ioe) {
            // Failed to flush the region. Keep going.
            status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
        }
    }

    // block waiting for the lock for closing
    lock.writeLock().lock();
    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    try {
        if (this.isClosed()) {
            status.abort("Already got closed by another process");
            // SplitTransaction handles the null
            return null;
        }
        LOG.debug("Updates disabled for region " + this);
        // Don't flush the cache if we are aborting
        if (!abort) {
            int flushCount = 0;
            while (this.getMemstoreSize().get() > 0) {
                try {
                    if (flushCount++ > 0) {
                        int actualFlushes = flushCount - 1;
                        if (actualFlushes > 5) {
                            // If we tried 5 times and are unable to clear memory, abort
                            // so we do not lose data
                            throw new DroppedSnapshotException("Failed clearing memory after " +
                                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
                        }
                        LOG.info("Running extra flush, " + actualFlushes + " (carrying snapshot?) " + this);
                    }
                    internalFlushcache(status);
                } catch (IOException ioe) {
                    status.setStatus("Failed flush " + this + ", putting online again");
                    synchronized (writestate) {
                        writestate.writesEnabled = true;
                    }
                    // Have to throw to upper layers.  I can't abort server from here.
                    throw ioe;
                }
            }
        }

        Map<byte[], List<StoreFile>> result =
            new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
        if (!stores.isEmpty()) {
            // initialize the thread pool for closing stores in parallel.
            ThreadPoolExecutor storeCloserThreadPool =
                getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
            CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
                new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

            // close each store in parallel
            for (final Store store : stores.values()) {
                long flushableSize = store.getFlushableSize();
                if (!(abort || flushableSize == 0)) {
                    getRegionServerServices().abort("Assertion failed while closing store "
                         + getRegionInfo().getRegionNameAsString() + " " + store
                         + ". flushableSize expected=0, actual= " + flushableSize
                         + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                         + "operation failed and left the memstore in a partially updated state.", null);
                }
                completionService.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                        @Override
                        public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                            return new Pair<byte[], Collection<StoreFile>>(
                                store.getFamily().getName(), store.close());
                        }
                    });
            }
            try {
                for (int i = 0; i < stores.size(); i++) {
                    Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
                    Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
                    List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
                    if (familyFiles == null) {
                        familyFiles = new ArrayList<StoreFile>();
                        result.put(storeFiles.getFirst(), familyFiles);
                    }
                    familyFiles.addAll(storeFiles.getSecond());
                }
            } catch (InterruptedException e) {
                throw (InterruptedIOException)new InterruptedIOException().initCause(e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            } finally {
                storeCloserThreadPool.shutdownNow();
            }
        }
        this.closed.set(true);
        if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
        if (coprocessorHost != null) {
            status.setStatus("Running coprocessor post-close hooks");
            this.coprocessorHost.postClose(abort);
        }
        if (this.metricsRegion != null) {
            this.metricsRegion.close();
        }
        if (this.metricsRegionWrapper != null) {
            Closeables.closeQuietly(this.metricsRegionWrapper);
        }
        status.markComplete("Closed");
        LOG.info("Closed " + this);
        return result;
    } finally {
        lock.writeLock().unlock();
    }
}

The stores field is populated when the region's stores are initialized in initializeRegionStores, also in HRegion:
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
    throws IOException, UnsupportedEncodingException {
    // Load in all the HStores.

    long maxSeqId = -1;
    // initialized to -1 so that we pick up MemstoreTS from column families
    long maxMemstoreTS = -1;

    if (!htableDescriptor.getFamilies().isEmpty()) {
        // initialize the thread pool for opening stores in parallel.
        ThreadPoolExecutor storeOpenerThreadPool =
            getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
        CompletionService<HStore> completionService =
            new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

        // initialize each store in parallel
        for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
            status.setStatus("Instantiating store for column family " + family);
            completionService.submit(new Callable<HStore>() {
                @Override
                public HStore call() throws IOException {
                    return instantiateHStore(family);
                }
            });
        }
        boolean allStoresOpened = false;
        try {
            for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
                Future<HStore> future = completionService.take();
                HStore store = future.get();
                this.stores.put(store.getColumnFamilyName().getBytes(), store);

                long storeMaxSequenceId = store.getMaxSequenceId();
                maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
                                     storeMaxSequenceId);
                if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
                    maxSeqId = storeMaxSequenceId;
                }
                long maxStoreMemstoreTS = store.getMaxMemstoreTS();
                if (maxStoreMemstoreTS > maxMemstoreTS) {
                    maxMemstoreTS = maxStoreMemstoreTS;
                }
            }
            allStoresOpened = true;
        } catch (InterruptedException e) {
            throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        } finally {
            storeOpenerThreadPool.shutdownNow();
            if (!allStoresOpened) {
                // something went wrong, close all opened stores
                LOG.error("Could not initialize all stores for the region=" + this);
                for (Store store : this.stores.values()) {
                    try {
                        store.close();
                    } catch (IOException e) {
                        LOG.warn(e.getMessage());
                    }
                }
            }
        }
    }
    mvcc.initialize(maxMemstoreTS + 1);
    // Recover any edits if available.
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    return maxSeqId;
}
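
To make the keying of that map concrete, a tiny stand-alone sketch (plain strings stand in for real Store objects):

import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.hbase.util.Bytes;

public class StoresMapExample {
    public static void main(String[] args) {
        // Same shape as HRegion's stores field: family-name bytes -> store,
        // ordered by raw byte comparison.
        ConcurrentSkipListMap<byte[], String> stores =
            new ConcurrentSkipListMap<byte[], String>(Bytes.BYTES_RAWCOMPARATOR);
        stores.put(Bytes.toBytes("cf1"), "store for cf1");
        stores.put(Bytes.toBytes("cf2"), "store for cf2");
        System.out.println(Bytes.toString(stores.firstKey())); // cf1
    }
}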

That is how parent.close() gathers the store files. Now into splitStoreFile to see how the reference files are created:
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
    throws IOException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
                  this.parent);
    }
    HRegionFileSystem fs = this.parent.getRegionFileSystem();
    String familyName = Bytes.toString(family);
    Path path_a = fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
                          this.parent.getSplitPolicy());
    Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
                          this.parent.getSplitPolicy());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
                  this.parent);
    }
    return new Pair<Path,Path>(path_a, path_b);
}

In the method below, when top is true (the upper daughter, here hri_b), splitKey is compared with the storefile's lastKey: if splitKey is greater than lastKey, the file lies entirely below the split point and no top reference is written. When top is false (the lower daughter, hri_a), splitKey is compared with the storefile's firstKey: if splitKey is smaller than firstKey, the file lies entirely above the split point and no bottom reference is written. In the usual case the split row falls inside the file's key range, so most hfiles yield one reference file in each of daughters a and b.
/**
 * Write out a split reference. Package local so it doesnt leak out of
 * regionserver.
 * @param hri {@link HRegionInfo} of the destination
 * @param familyName Column Family Name
 * @param f File to split.
 * @param splitRow Split Row
 * @param top True if we are referring to the top half of the hfile.
 * @param splitPolicy
 * @return Path to created reference.
 * @throws IOException
 */
Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f,
                    final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy) throws IOException {

    if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
        // Check whether the split row lies in the range of the store file
        // If it is outside the range, return directly.
        try {
            if (top) {
                //check if larger than last key.
                KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
                byte[] lastKey = f.createReader().getLastKey();
                // If lastKey is null means storefile is empty.
                if (lastKey == null) return null;
                if (f.getReader().getComparator().compareFlatKey(splitKey.getBuffer(),
                       splitKey.getKeyOffset(), splitKey.getKeyLength(), lastKey, 0, lastKey.length) > 0) {
                    return null;
                }
            } else {
                //check if smaller than first key
                KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
                byte[] firstKey = f.createReader().getFirstKey();
                // If firstKey is null means storefile is empty.
                if (firstKey == null) return null;
                if (f.getReader().getComparator().compareFlatKey(splitKey.getBuffer(),
                        splitKey.getKeyOffset(), splitKey.getKeyLength(), firstKey, 0, firstKey.length) < 0) {
                    return null;
                }
            }
        } finally {
            f.closeReader(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
        }
    }

    Path splitDir = new Path(getSplitsDir(hri), familyName);
    // A reference to the bottom half of the hsf store file.
    Reference r =
        top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
    // Add the referred-to regions name as a dot separated suffix.
    // See REF_NAME_REGEX regex above.  The referred-to regions name is
    // up in the path of the passed in <code>f</code> -- parentdir is family,
    // then the directory above is the region name.
    String parentRegionName = regionInfo.getEncodedName();
    // Write reference with same file id only with the other region name as
    // suffix and into the new region location (under same family).
    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
    return r.write(fs, p);
}
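
The reference file itself carries no data; its name records which parent hfile it points at. A hypothetical illustration of the naming scheme (both identifiers are made up):

public class ReferenceNaming {
    public static void main(String[] args) {
        // The daughter's reference keeps the parent hfile's name and appends
        // the parent region's encoded name as a dot-separated suffix. It is
        // written under the parent's temporary .splits directory and later
        // moved into the daughter's directory by commitDaughterRegion.
        String hfileName = "b0c5d1a2e3f4a5b6c7d8e9f0a1b2c3d4";        // assumed hfile id
        String parentEncodedName = "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d"; // assumed encoded region name
        System.out.println(hfileName + "." + parentEncodedName);
    }
}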

Finally, into createDaughterRegionFromSplits, which moves the reference files into the corresponding daughter region directory, then creates and returns the daughter HRegion instance:
/**
 * Create a daughter region from given a temp directory with the region data.
 * @param hri Spec. for daughter region to open.
 * @throws IOException
 */
HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
    // Move the files from the temporary .splits to the final /table/region directory
    fs.commitDaughterRegion(hri);

    // Create the daughter HRegion instance
    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
                                   this.getBaseConf(), hri, this.getTableDesc(), rsServices);
    r.readRequestsCount.set(this.getReadRequestsCount() / 2);
    r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
    return r;
}

This concludes the main flow of splitting a region. (The original post closes with a flowchart of the full flow, omitted here.)
