Split过程源码分析
Split 请求的来源(发起入口):
...
// Page-handler fragment: when the requested action is "split", ask the admin client to split.
// NOTE(review): action, key, fqtn, hbadmin and out are declared outside this excerpt;
// presumably key is a user-supplied region name/row and fqtn the fully-qualified table name.
if (action.equals("split")) {
// A non-empty key targets that one region/key; otherwise the whole table named by fqtn is split.
if (key != null && key.length() > 0) {
hbadmin.split(key);
} else {
hbadmin.split(fqtn);
}
// Only acknowledges that the request was sent; the split itself happens later on the server.
out.write(" Split request accepted. ");
}
...
/**
* Split a table or an individual region.
* Asynchronous operation.
*
* Resolves {@code tableNameOrRegionName} to a (region, hosting server) pair and, when the
* region is currently assigned, forwards the split to that server.
*
* @param tableNameOrRegionName table name or region name to split
* @param splitPoint row to split at; may be null, in which case the region server is
*     expected to choose the split point itself
* @throws NoServerForRegionException if the region is found but has no hosting server
* @throws IOException on lookup or RPC failure
* @throws InterruptedException NOTE(review): declared; the throwing code is elided below
*/
public void split(final byte[] tableNameOrRegionName,
final byte [] splitPoint) throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker();
try {
// Try to resolve the argument as a region; a null result presumably means it is a
// table name, which is handled in the elided code below.
Pair<HRegionInfo, ServerName> regionServerPair = getRegion(tableNameOrRegionName, ct);
if (regionServerPair != null) {
// Region known but not assigned to any server: there is nowhere to send the request.
if (regionServerPair.getSecond() == null) {
throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
} else {
split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
}
}
...
}
/**
 * Sends a split request for a single region straight to the server hosting it.
 *
 * @param sn server currently hosting the region
 * @param hri region to split
 * @param splitPoint requested split row, or null to let the server choose
 * @throws IOException if the split point equals the region's start key, or the RPC fails
 */
private void split(final ServerName sn, final HRegionInfo hri,
    byte[] splitPoint) throws IOException {
  byte[] regionStart = hri.getStartKey();
  // Splitting exactly at the start key would give the first daughter an empty range.
  boolean splitsAtStart = regionStart != null && splitPoint != null
      && Bytes.compareTo(regionStart, splitPoint) == 0;
  if (splitsAtStart) {
    throw new IOException("should not give a splitkey which equals to startkey!");
  }
  ProtobufUtil.split(this.connection.getAdmin(sn), hri, splitPoint);
}
/**
 * Helper that issues a SplitRegion RPC for one region through the admin protocol.
 *
 * @param admin admin-protocol stub of the server hosting the region
 * @param hri region to split
 * @param splitPoint requested split row as passed in by the caller (may be null)
 * @throws IOException the failure converted by ProtobufUtil.getRemoteException
 */
public static void split(final AdminService.BlockingInterface admin,
    final HRegionInfo hri, byte[] splitPoint) throws IOException {
  byte[] regionName = hri.getRegionName();
  SplitRegionRequest splitRequest =
      RequestConverter.buildSplitRegionRequest(regionName, splitPoint);
  try {
    // No RpcController is supplied for this call.
    admin.splitRegion(null, splitRequest);
  } catch (ServiceException wrapped) {
    throw ProtobufUtil.getRemoteException(wrapped);
  }
}
/**
* Admin-protocol RPC asking a region server to split one of its regions.
* Declared on the generated AdminProtos.AdminService.BlockingInterface (see the fully
* qualified types in the signature), which region servers such as HRegionServer implement.
*
* @param controller RPC controller (callers may pass null)
* @param request identifies the region and optionally carries an explicit split point
* @return an (empty) acknowledgement response
* @throws com.google.protobuf.ServiceException wraps any server-side failure
*/
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse splitRegion(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest request)
throws com.google.protobuf.ServiceException;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
HBaseRPCErrorHandler, LastSequenceId {
...
/**
* Split a region on the region server.
*
* Flushes the region, records the split point carried by the request (if any) through
* region.forceSplit, then asks the compact/split thread to split the region at the point
* returned by region.checkSplit(). The split is queued by requestSplit, not run inline.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException NOTE(review): the catch that produces this is elided below
*/
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
final SplitRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
HRegion region = getRegion(request.getRegion());
// NOTE(review): no matching region.closeRegionOperation() is visible in this excerpt;
// confirm the elided code releases the region operation.
region.startRegionOperation(Operation.SPLIT_REGION);
LOG.info("Splitting " + region.getRegionNameAsString());
long startTime = EnvironmentEdgeManager.currentTimeMillis();
// Flush first so memstore data lands in store files, which the split-point computation
// inspects; the flush time metric is only updated when the flush succeeded.
HRegion.FlushResult flushResult = region.flushcache();
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTimeMillis();
metricsRegionServer.updateFlushTime(endTime - startTime);
}
// Optional explicit split point from the client; stays null when the request has none.
byte[] splitPoint = null;
if (request.hasSplitPoint()) {
splitPoint = request.getSplitPoint().toByteArray();
}
region.forceSplit(splitPoint);
// Ask CompactSplitThread to perform the split; region.checkSplit() computes the split
// point (midkey) to use, or null if the region cannot be split.
compactSplitThread.requestSplit(region, region.checkSplit(), RpcServer.getRequestUser());
...
}
}
/**
* Return the splitpoint. null indicates the region isn't splittable
* If the splitpoint isn't explicitly specified, it will go over the stores
* to find the best splitpoint. Currently the criteria of best splitpoint
* is based on the size of the store.
* NOTE(review): parts of this method are elided ("...") in this excerpt.
*/
public byte[] checkSplit() {
...
// The region's split policy first decides whether a split is warranted at all.
if (!splitPolicy.shouldSplit()) {
return null;
}
// ...and then where to split (for the base RegionSplitPolicy: the explicit split point if
// one was set, else the split point of the largest store).
byte[] ret = splitPolicy.getSplitPoint();
...
return ret;
}
/**
 * Chooses the row at which the region should be split. Only called after
 * {@code shouldSplit} has returned true.
 *
 * <p>An explicit split point recorded on the region always wins. Otherwise the split point of
 * the largest store (by {@code getSize()}) that can offer one is used; stores reporting a
 * size of 0 are never chosen.
 *
 * @return the split row, or null if the region cannot be split
 */
protected byte[] getSplitPoint() {
  byte[] forced = this.region.getExplicitSplitPoint();
  if (forced != null) {
    return forced;
  }
  byte[] bestPoint = null;
  long bestSize = 0;
  for (Store candidate : region.getStores().values()) {
    byte[] candidatePoint = candidate.getSplitPoint();
    long candidateSize = candidate.getSize();
    // Skip stores that cannot be split or are not strictly larger than the best so far.
    if (candidatePoint == null || candidateSize <= bestSize) {
      continue;
    }
    bestPoint = candidatePoint;
    bestSize = candidateSize;
  }
  return bestPoint;
}
/**
 * Returns the split point suggested by this store's store file manager, under the store's
 * read lock.
 *
 * <p>A store that still contains reference store files is reported as not splittable.
 *
 * @return the split row, or null if the store has references or the lookup failed
 */
@Override
public byte[] getSplitPoint() {
  this.lock.readLock().lock();
  try {
    // Meta regions should already have been excluded by the split policy.
    assert !this.getRegionInfo().isMetaRegion();
    if (!hasReferences()) {
      return this.storeEngine.getStoreFileManager().getSplitPoint();
    }
    return null;
  } catch (IOException e) {
    LOG.warn("Failed getting store size for " + this, e);
    return null;
  } finally {
    this.lock.readLock().unlock();
  }
}
/**
 * Suggests a split point taken from the largest managed store file.
 *
 * @return the split row of the largest store file; null when there are no store files or
 *     the largest file cannot provide a split point
 * @throws IOException if reading the file's split point fails
 */
@Override
public final byte[] getSplitPoint() throws IOException {
  return this.storefiles.isEmpty()
      ? null
      : StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
}
/**
 * Gets the approximate mid-point of this file that is optimal for use in splitting it.
 *
 * <p>The reader's midkey is the key that starts the block in the middle of the hfile and
 * includes column and timestamp; only its row is returned. When that row equals the file's
 * first or last row, the file (and therefore the region) can never be split here.
 *
 * @param comparator Comparator used to compare KVs.
 * @return The split point row, or null if splitting is not possible, or reader is null.
 * @throws IOException if the reader fails
 */
@SuppressWarnings("deprecation")
byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
  if (this.reader == null) {
    LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
    return null;
  }
  byte[] midkey = this.reader.midkey();
  if (midkey == null) {
    return null;
  }
  KeyValue midKv = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
  byte[] firstRaw = this.reader.getFirstKey();
  KeyValue firstKv = KeyValue.createKeyValueFromKey(firstRaw, 0, firstRaw.length);
  byte[] lastRaw = this.reader.getLastKey();
  KeyValue lastKv = KeyValue.createKeyValueFromKey(lastRaw, 0, lastRaw.length);
  boolean midRowAtEdge = comparator.compareRows(midKv, firstKv) == 0
      || comparator.compareRows(midKv, lastKv) == 0;
  if (!midRowAtEdge) {
    return midKv.getRow();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("cannot split because midkey is the same as first or last row");
  }
  return null;
}
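HBase还规定:如果定位到的 rowkey(即 midkey 所在的行)是整个文件的第一个 rowkey 或者最后一个 rowkey,就认为该文件没有切分点。
什么情况下会出现没有切分点的场景呢?最常见的就是文件只有一个 block。midkey 取的是位于文件中间位置的那个 block 的起始 key;文件只有一个 block 时,这个 block 的起始 key 就是整个文件的第一个 key,midkey 所在行与首行相同,因此无法切分。很多新同学测试 split 时,往往新建一张表,插入几条数据并执行 flush,再执行 split,结果发现数据表并没有真正被切分,原因就在这里。如果仔细翻看 debug 日志,可以看到类似下面的日志(第一条由上面的 getFileSplitPoint 打印,第二条由下面的 requestSplit 打印):
cannot split because midkey is the same as first or last row
Region <region名> not splittable because midkey=null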
/*
 * Queues a split of region r at midKey on the split thread pool.
 * The User parameter allows the split thread to assume the correct user identity.
 * A null midKey means the region cannot be split; if a forced split had been requested,
 * that request is cleared instead.
 */
public synchronized void requestSplit(final HRegion r, byte[] midKey, User user) {
  if (midKey != null) {
    try {
      // The pool runs a SplitRequest, whose doSplitting method performs the actual split.
      SplitRequest task = new SplitRequest(r, midKey, this.server, user);
      this.splits.execute(task);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Split requested for " + r + ". " + this);
      }
    } catch (RejectedExecutionException ree) {
      LOG.info("Could not execute split for " + r, ree);
    }
  } else {
    LOG.debug("Region " + r.getRegionNameAsString() +
        " not splittable because midkey=null");
    if (r.shouldForceSplit()) {
      r.clearSplit();
    }
  }
}
/**
* Performs one split of {@code parent} at {@code midKey} using a SplitTransaction.
* Holds a shared (read) table lock so table schema modifications cannot run concurrently,
* and records split-request and split-time metrics.
* NOTE(review): parts of the error handling and of the success path are elided ("...").
*
* @param user user identity passed through to SplitTransaction.execute
*/
private void doSplitting(User user) {
boolean success = false;
server.getMetrics().incrSplitRequest();
long startTime = EnvironmentEdgeManager.currentTimeMillis();
SplitTransaction st = new SplitTransaction(parent, midKey);
try {
//acquire a shared read lock on the table, so that table schema modifications
//do not happen concurrently
tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName()
, "SPLIT_REGION:" + parent.getRegionNameAsString());
try {
tableLock.acquire();
} catch (IOException ex) {
// Lock was not acquired: clear the field (presumably so releaseTableLock() below does
// not try to release it) and fail the split.
tableLock = null;
throw ex;
}
// If prepare does not return true, for some reason -- logged inside in
// the prepare call -- we are not ready to split just now. Just return.
if (!st.prepare()) return;
try {
st.execute(this.server, this.server, user);
success = true;
} catch (Exception e) {
...
} catch (IOException ex) {
LOG.error("Split failed " + this, RemoteExceptionHandler.checkIOException(ex));
// Check the filesystem's health after an I/O failure during the split.
server.checkFileSystem();
} finally {
// Cleanup that runs whether or not the split succeeded: coprocessor post-split hook,
// clearing any forced-split request, releasing the table lock and recording the duration.
if (this.parent.getCoprocessorHost() != null) {
try {
this.parent.getCoprocessorHost().postCompleteSplit();
} catch (IOException io) {
LOG.error("Split failed " + this,
RemoteExceptionHandler.checkIOException(io));
}
}
if (parent.shouldForceSplit()) {
parent.clearSplit();
}
releaseTableLock();
long endTime = EnvironmentEdgeManager.currentTimeMillis();
// Update regionserver metrics with the split transaction total running time
server.getMetrics().updateSplitTime(endTime - startTime);
if (success) {
...
}
}
/**
 * Does checks on split inputs and, when they are acceptable, builds the two daughter
 * region descriptors and journals PREPARED.
 *
 * <p>Daughter A spans the parent's start key up to the split row; daughter B spans the
 * split row up to the parent's end key.
 *
 * @return <code>true</code> if the region is splittable else
 * <code>false</code> if it is not (e.g. its already closed, etc.).
 */
public boolean prepare() {
  // A null split row means the region is unsplittable, e.g. it still has references.
  if (!this.parent.isSplittable() || this.splitrow == null) {
    return false;
  }
  HRegionInfo parentInfo = this.parent.getRegionInfo();
  parent.prepareToSplit();
  byte[] lowKey = parentInfo.getStartKey();
  byte[] highKey = parentInfo.getEndKey();
  boolean rowUsable = !Bytes.equals(lowKey, splitrow)
      && this.parent.getRegionInfo().containsRow(splitrow);
  if (!rowUsable) {
    LOG.info("Split row is not inside region key range or is equal to " +
        "startkey: " + Bytes.toStringBinary(this.splitrow));
    return false;
  }
  long daughterId = getDaughterRegionIdTimestamp(parentInfo);
  this.hri_a = new HRegionInfo(parentInfo.getTable(), lowKey, this.splitrow, false, daughterId);
  this.hri_b = new HRegionInfo(parentInfo.getTable(), this.splitrow, highKey, false, daughterId);
  this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
  return true;
}
/**
* Run the transaction.
* The work is split into createDaughters (the steps before the point-of-no-return, PONR)
* and stepsAfterPONR; part of the body is elided ("...") in this excerpt.
* @param server Hosting server instance. Can be null when testing (won't try
* and update in zk if a null server)
* @param services Used to online/offline regions.
* @throws IOException If thrown, transaction failed.
* Call {@link #rollback(Server, RegionServerServices)}
* @return Regions created
* @throws IOException
* @see #rollback(Server, RegionServerServices)
*/
public PairOfSameType<HRegion> execute(final Server server,
final RegionServerServices services, User user)
throws IOException {
// Without a server (tests) ZooKeeper-based assignment is assumed; otherwise it comes from
// the server's configuration.
useZKForAssignment =
server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
// Pre-PONR steps: produce the two daughter regions.
PairOfSameType<HRegion> regions = createDaughters(server, services, user);
...
// Remaining steps after the PONR.
return stepsAfterPONR(server, services, regions, user);
}
/**
* Runs the split steps before the point-of-no-return (via stepsBeforePONR) and returns the
* two daughter regions.
* NOTE(review): metaEntries and ret are unused in the visible lines; their use is elided.
*/
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
final RegionServerServices services, User user) throws IOException {
...
PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);
final List<Mutation> metaEntries = new ArrayList<Mutation>();
boolean ret = false;
...
return daughterRegions;
}
/**
* Performs the split steps that come before the PONR (presumably the point-of-no-return):
* announce the split (SPLITTING znode in ZooKeeper, or a READY_TO_SPLIT report to the
* master), create the splits directory, close the parent region, write reference files for
* both daughters and create the two daughter HRegion objects. Each step is recorded in the
* journal so that a failed split can be rolled back.
* NOTE(review): the handling of exceptionToThrow is elided ("...") in this excerpt.
*
* @param server hosting server; may be null in tests, in which case ZK steps are skipped
* @param services used to report the split to the master when ZK assignment is off
* @param testing NOTE(review): not read in the visible lines — confirm its purpose
* @return the two daughters: A (start key .. split row) and B (split row .. end key)
* @throws IOException if any step fails
*/
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
final RegionServerServices services, boolean testing) throws IOException {
// Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
// have zookeeper so don't do zk stuff if server or zookeeper is null
if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
try {
createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
} catch (KeeperException e) {
throw new IOException("Failed creating PENDING_SPLIT znode on " +
this.parent.getRegionNameAsString(), e);
}
} else if (services != null && !useZKForAssignment) {
// Without ZK assignment the master must explicitly approve the split.
if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent.getRegionInfo(), hri_a, hri_b)) {
throw new IOException("Failed to get ok from master to split " + parent.getRegionNameAsString());
}
}
this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING_IN_ZK));
if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
// After creating the split node, wait for master to transition it
// from PENDING_SPLIT to SPLITTING so that we can move on. We want master
// knows about it and won't transition any region which is splitting.
znodeVersion = getZKNode(server, services);
}
this.parent.getRegionFileSystem().createSplitsDir();
this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));
Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
Exception exceptionToThrow = null;
try{
// Close the parent region; close() returns the store files of each of its stores, keyed by
// column family (presumably null if the region was already closed).
hstoreFilesToSplit = this.parent.close(false);
} catch (Exception e) {
exceptionToThrow = e;
}
...
this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));
// splitStoreFiles creates daughter region dirs under the parent splits dir
// Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
// clean this up.
Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
// Log to the journal that we are creating region A, the first daughter
// region. We could fail halfway through. If we do, we could have left
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
// Verify the reference-file count both in the temporary splits dir and, after the daughter is
// created, in its final region directory.
assertReferenceFileCount(expectedReferences.getFirst(),
this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
assertReferenceFileCount(expectedReferences.getFirst(),
new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
// Ditto
this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
assertReferenceFileCount(expectedReferences.getSecond(), this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
assertReferenceFileCount(expectedReferences.getSecond(),
new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
return new PairOfSameType<HRegion>(a, b);
}
/**
 * Creates reference files for the top and bottom halves of every store file of the parent
 * region, writing them into the daughters' split directories.
 *
 * <p>One splitting task per store file runs on a temporary fixed-size thread pool; the whole
 * operation fails if the tasks do not finish within {@code fileSplitTimeout} milliseconds.
 *
 * @param hstoreFilesToSplit map of column family to the store files to create half-file
 *     references for; null if closing the parent region did not succeed
 * @return pair of (references created for daughter A, references created for daughter B)
 * @throws IOException if the map is null, any split task fails, or the timeout elapses
 * @throws InterruptedIOException if interrupted while waiting for the split tasks
 */
private Pair<Integer, Integer> splitStoreFiles(final Map<byte[],
    List<StoreFile>> hstoreFilesToSplit) throws IOException {
  if (hstoreFilesToSplit == null) {
    // Could be null because close didn't succeed -- for now consider it fatal
    throw new IOException("Close returned empty list of StoreFiles");
  }
  // The following code sets up a thread pool executor with as many slots as
  // there's files to split. It then fires up everything, waits for
  // completion and finally checks for any exception
  int nbFiles = 0;
  for (List<StoreFile> familyFiles : hstoreFilesToSplit.values()) {
    nbFiles += familyFiles.size();
  }
  if (nbFiles == 0) {
    // no file needs to be splitted.
    return new Pair<Integer, Integer>(0, 0);
  }
  // Default max #threads to use is the smaller of table's configured number of blocking store
  // files or the available number of logical cores.
  int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
      HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
      Runtime.getRuntime().availableProcessors());
  // Max #threads is the smaller of the number of storefiles or the default max determined
  // above, clamped to at least 1: a zero or negative configured value would otherwise make
  // Executors.newFixedThreadPool throw IllegalArgumentException.
  int maxThreads = Math.max(1, Math.min(
      parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles));
  LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
      " using " + maxThreads + " threads");
  ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
  builder.setNameFormat("StoreFileSplitter-%1$d");
  ThreadFactory factory = builder.build();
  ThreadPoolExecutor threadPool =
      (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
  List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
  try {
    // Split each store file.
    for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesToSplit.entrySet()) {
      for (StoreFile sf : entry.getValue()) {
        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
        futures.add(threadPool.submit(sfs));
      }
    }
  } finally {
    // Stop accepting new work; already-submitted tasks keep running. Doing this in a finally
    // block also releases the pool's threads if a submission fails.
    threadPool.shutdown();
  }
  // Wait for all the tasks to finish
  try {
    boolean stillRunning =
        !threadPool.awaitTermination(this.fileSplitTimeout, TimeUnit.MILLISECONDS);
    if (stillRunning) {
      threadPool.shutdownNow();
      // wait for the thread to shutdown completely.
      while (!threadPool.isTerminated()) {
        Thread.sleep(50);
      }
      throw new IOException("Took too long to split the" +
          " files and create the references, aborting split");
    }
  } catch (InterruptedException e) {
    // Don't leave split tasks running (and writing reference files) once we give up.
    threadPool.shutdownNow();
    throw (InterruptedIOException) new InterruptedIOException().initCause(e);
  }
  int created_a = 0;
  int created_b = 0;
  // Look for any exception; a null path means no reference was needed for that daughter.
  for (Future<Pair<Path, Path>> future : futures) {
    try {
      Pair<Path, Path> p = future.get();
      created_a += p.getFirst() != null ? 1 : 0;
      created_b += p.getSecond() != null ? 1 : 0;
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
        + " storefiles, Daugther B: " + created_b + " storefiles.");
  }
  return new Pair<Integer, Integer>(created_a, created_b);
}
// All stores of this region in a thread-safe sorted map (ConcurrentSkipListMap) ordered by
// Bytes.BYTES_RAWCOMPARATOR.
// NOTE(review): keys appear to be column family names in byte form — confirm at the put sites.
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(Bytes.BYTES_RAWCOMPARATOR);
/**
* Closes this region and returns its store files grouped by column family.
*
* Sequence: coprocessor pre-close hook; disable writes and wait for running flushes and
* compactions; an optional pre-flush outside the lock; then, under the region write lock:
* flush until the memstore is empty (skipped when aborting), close all stores in parallel,
* mark the region closed and run the coprocessor post-close hook.
*
* @param abort true if we are aborting; the memstore is then not flushed
* @param status monitored task whose status is updated as the close progresses
* @return column family name -> store files returned by each store's close(), or null if
*     the region was already closed (the SplitTransaction caller handles null)
* @throws IOException if a flush fails (writes are re-enabled before rethrowing), the
*     memstore is still non-empty after the initial flush plus five extra flushes, a store
*     fails to close, or waiting for the store closers is interrupted
*/
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
throws IOException {
if (isClosed()) {
LOG.warn("Region " + this + " already closed");
return null;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-close hooks");
this.coprocessorHost.preClose(abort);
}
status.setStatus("Disabling compacts and flushes for region");
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
writestate.writesEnabled = false;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();
}
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
// the close flag?
if (!abort && worthPreFlushing()) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
try {
internalFlushcache(status);
} catch (IOException ioe) {
// Failed to flush the region. Keep going.
status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
}
}
// block waiting for the lock for closing
// NOTE(review): closing.set and status.setStatus run after lock() but before the try whose
// finally unlocks; if either threw, the write lock would stay held.
lock.writeLock().lock();
this.closing.set(true);
status.setStatus("Disabling writes for close");
try {
if (this.isClosed()) {
status.abort("Already got closed by another process");
// SplitTransaction handles the null
return null;
}
LOG.debug("Updates disabled for region " + this);
// Don't flush the cache if we are aborting
if (!abort) {
// Flush until the memstore is empty: the first pass is the normal flush, later passes
// are "extra" flushes; once more than 5 extra flushes were needed, give up.
int flushCount = 0;
while (this.getMemstoreSize().get() > 0) {
try {
if (flushCount++ > 0) {
int actualFlushes = flushCount - 1;
if (actualFlushes > 5) {
// If we tried 5 times and are unable to clear memory, abort
// so we do not lose data
throw new DroppedSnapshotException("Failed clearing memory after " +
actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
}
LOG.info("Running extra flush, " + actualFlushes + " (carrying snapshot?) " + this);
}
internalFlushcache(status);
} catch (IOException ioe) {
// Flush failed: re-enable writes so the region stays usable, then propagate.
status.setStatus("Failed flush " + this + ", putting online again");
synchronized (writestate) {
writestate.writesEnabled = true;
}
// Have to throw to upper layers. I can't abort server from here.
throw ioe;
}
}
}
// Family name -> store files returned by each store's close().
Map<byte[], List<StoreFile>> result =
new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
if (!stores.isEmpty()) {
// initialize the thread pool for closing stores in parallel.
ThreadPoolExecutor storeCloserThreadPool =
getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
// close each store in parallel
for (final Store store : stores.values()) {
// Unless aborting, every store must have nothing left to flush at this point; otherwise
// the region server is aborted with the diagnostic message below.
// NOTE(review): this would NPE if getRegionServerServices() can be null — confirm.
long flushableSize = store.getFlushableSize();
if (!(abort || flushableSize == 0)) {
getRegionServerServices().abort("Assertion failed while closing store "
+ getRegionInfo().getRegionNameAsString() + " " + store
+ ". flushableSize expected=0, actual= " + flushableSize
+ ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
+ "operation failed and left the memstore in a partially updated state.", null);
}
completionService.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
@Override
public Pair<byte[], Collection<StoreFile>> call() throws IOException {
return new Pair<byte[], Collection<StoreFile>>(
store.getFamily().getName(), store.close());
}
});
}
try {
// take() yields the futures in completion order, one per submitted store.
for (int i = 0; i < stores.size(); i++) {
Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
if (familyFiles == null) {
familyFiles = new ArrayList<StoreFile>();
result.put(storeFiles.getFirst(), familyFiles);
}
familyFiles.addAll(storeFiles.getSecond());
}
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
// Always stop the closer pool, even on failure.
storeCloserThreadPool.shutdownNow();
}
}
this.closed.set(true);
// A non-zero memstore size at this point is only logged.
if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
}
if ( this.metricsRegion != null) {
this.metricsRegion.close();
}
if ( this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
} finally {
lock.writeLock().unlock();
}
}
/**
 * Opens one store per column family of the table descriptor in parallel and registers each
 * store, and its max sequence id, under the family name. It then initializes MVCC and
 * replays any recovered edits.
 *
 * @param reporter progress reporter passed to the recovered-edits replay
 * @param status task status updated while stores are being instantiated
 * @return the highest sequence id found across the stores and the replayed edits
 * @throws IOException if a store fails to open (stores opened so far are then closed), or
 *     the wait for the store openers is interrupted
 * @throws UnsupportedEncodingException kept in the signature for existing callers
 */
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
    throws IOException, UnsupportedEncodingException {
  // Load in all the HStores.
  long maxSeqId = -1;
  // initialized to -1 so that we pick up MemstoreTS from column families
  long maxMemstoreTS = -1;
  if (!htableDescriptor.getFamilies().isEmpty()) {
    // initialize the thread pool for opening stores in parallel.
    ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
    CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
    // initialize each store in parallel
    for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
      status.setStatus("Instantiating store for column family " + family);
      completionService.submit(new Callable<HStore>() {
        @Override
        public HStore call() throws IOException {
          return instantiateHStore(family);
        }
      });
    }
    boolean allStoresOpened = false;
    try {
      for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
        Future<HStore> future = completionService.take();
        HStore store = future.get();
        // Key by the family's raw name bytes (as doClose does) instead of
        // String.getBytes(), whose result depends on the platform default charset.
        byte[] familyName = store.getFamily().getName();
        this.stores.put(familyName, store);
        long storeMaxSequenceId = store.getMaxSequenceId();
        maxSeqIdInStores.put(familyName, storeMaxSequenceId);
        if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
          maxSeqId = storeMaxSequenceId;
        }
        long maxStoreMemstoreTS = store.getMaxMemstoreTS();
        if (maxStoreMemstoreTS > maxMemstoreTS) {
          maxMemstoreTS = maxStoreMemstoreTS;
        }
      }
      allStoresOpened = true;
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      storeOpenerThreadPool.shutdownNow();
      if (!allStoresOpened) {
        // something went wrong, close all opened stores
        LOG.error("Could not initialize all stores for the region=" + this);
        for (Store store : this.stores.values()) {
          try {
            store.close();
          } catch (IOException e) {
            // Best effort: keep closing the remaining stores, but keep the stack trace.
            LOG.warn("Failed to close store " + store + " after failed region open", e);
          }
        }
      }
    }
  }
  mvcc.initialize(maxMemstoreTS + 1);
  // Recover any edits if available.
  maxSeqId = Math.max(maxSeqId,
      replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
  return maxSeqId;
}
/**
 * Writes the two half-file references (one per daughter) for a single parent store file.
 *
 * @param family column family the store file belongs to
 * @param sf parent store file to reference
 * @return pair of (reference for daughter A, reference for daughter B); an element is null
 *     when no reference was written for that daughter
 * @throws IOException if writing a reference fails
 */
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
    throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
        this.parent);
  }
  HRegionFileSystem regionFs = this.parent.getRegionFileSystem();
  String cf = Bytes.toString(family);
  // Daughter A references the bottom half (top=false), daughter B the top half (top=true).
  Path bottomRef = regionFs.splitStoreFile(this.hri_a, cf, sf, this.splitrow, false,
      this.parent.getSplitPolicy());
  Path topRef = regionFs.splitStoreFile(this.hri_b, cf, sf, this.splitrow, true,
      this.parent.getSplitPolicy());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
        this.parent);
  }
  return new Pair<Path, Path>(bottomRef, topRef);
}
/**
 * Write out a split reference. Package local so it doesnt leak out of
 * regionserver.
 *
 * <p>Unless the split policy skips the range check for this family, the store file's key
 * range is checked first. No reference is written (null is returned) when the file is empty
 * or when the requested half would contain none of the file's keys.
 *
 * @param hri {@link HRegionInfo} of the destination (daughter) region
 * @param familyName Column Family Name
 * @param f File to split.
 * @param splitRow Split Row
 * @param top True if we are referring to the top half of the hfile.
 * @param splitPolicy consulted via skipStoreFileRangeCheck; when null the range check is
 *     always performed
 * @return Path to created reference, or null if no reference was needed.
 * @throws IOException
 */
Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f,
    final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy) throws IOException {
  boolean checkRange = splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName);
  if (checkRange) {
    try {
      if (!top) {
        // Bottom half is empty if even the last possible key of splitRow sorts before the
        // file's first key.
        KeyValue probe = KeyValue.createLastOnRow(splitRow);
        byte[] fileFirstKey = f.createReader().getFirstKey();
        if (fileFirstKey == null) {
          return null; // empty store file
        }
        int cmp = f.getReader().getComparator().compareFlatKey(probe.getBuffer(),
            probe.getKeyOffset(), probe.getKeyLength(), fileFirstKey, 0, fileFirstKey.length);
        if (cmp < 0) {
          return null;
        }
      } else {
        // Top half is empty if the first possible key of splitRow sorts after the file's
        // last key.
        KeyValue probe = KeyValue.createFirstOnRow(splitRow);
        byte[] fileLastKey = f.createReader().getLastKey();
        if (fileLastKey == null) {
          return null; // empty store file
        }
        int cmp = f.getReader().getComparator().compareFlatKey(probe.getBuffer(),
            probe.getKeyOffset(), probe.getKeyLength(), fileLastKey, 0, fileLastKey.length);
        if (cmp > 0) {
          return null;
        }
      }
    } finally {
      // Close the reader used for the range check; evict on close unless the cache config
      // says otherwise.
      boolean evictOnClose = f.getCacheConf() == null || f.getCacheConf().shouldEvictOnClose();
      f.closeReader(evictOnClose);
    }
  }
  Path splitDir = new Path(getSplitsDir(hri), familyName);
  Reference ref = top
      ? Reference.createTopReference(splitRow)
      : Reference.createBottomReference(splitRow);
  // The reference file keeps the parent store file's name with the parent region's encoded
  // name appended as a dot-separated suffix (see REF_NAME_REGEX), and lives under the
  // daughter's split directory for this family.
  String parentEncodedName = regionInfo.getEncodedName();
  Path refPath = new Path(splitDir, f.getPath().getName() + "." + parentEncodedName);
  return ref.write(fs, refPath);
}
/**
 * Create a daughter region from given a temp directory with the region data.
 *
 * <p>The daughter's files are first committed from the temporary splits directory into the
 * final table/region directory; then the HRegion object is instantiated. Each daughter starts
 * with half of this (parent) region's read and write request counts.
 *
 * @param hri Spec. for daughter region to open.
 * @return the newly created daughter region
 * @throws IOException if committing the files or creating the region fails
 */
HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
  // Move the files from the temporary .splits to the final /table/region directory
  fs.commitDaughterRegion(hri);
  HRegion daughter = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
      fs.getFileSystem(), this.getBaseConf(), hri, this.getTableDesc(), rsServices);
  daughter.readRequestsCount.set(this.getReadRequestsCount() / 2);
  daughter.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
  return daughter;
}
Split过程源码分析相关推荐
- 【源码分析】storm拓扑运行全流程源码分析
[源码分析]storm拓扑运行全流程源码分析 @(STORM)[storm] 源码分析storm拓扑运行全流程源码分析 一拓扑提交流程 一stormpy 1storm jar 2def jar 3ex ...
- 分布式定时任务—xxl-job学习(三)——调度中心(xxl-job-admin)的启动和任务调度过程源码分析
分布式定时任务-xxl-job学习(三)--调度中心(xxl-job-admin)的启动和任务调度过程源码分析 RabbitsInTheGrass 2020-06-30 10:31:08 813 ...
- 【SRIO】5、Xilinx RapidIO核例子工程源码分析
目录 一.软件平台与硬件平台 二.打开例子工程 三.例子工程详解 3.1 工程概述 3.2 工程结构 3.3 工程分析 四.工程源码分析 3.1 顶层模块srio_example_top.v源码分析 ...
- 5.Xilinx RapidIO核例子工程源码分析
https://www.cnblogs.com/liujinggang/p/10091216.html 一.软件平台与硬件平台 软件平台: 操作系统:Windows 8.1 64-bit 开发套件:V ...
- Android系统默认Home应用程序(Launcher)的启动过程源码分析
在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还须要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...
- 小明分享|8ms平台下工程源码分析
今天小明为大家分享的是开发工具平台-8ms(www.8ms.xyz)工程源码分析 1.打开"8ms平台",创建工程制作完UI后,选中"编译"一栏,等待结束后,选 ...
- SpringMVC执行流程源码分析
SpringMVC执行流程源码分析 我们先来看张图片,帮助我们理解整个流程 然后我们开始来解析 首先SpringMVC基于Servlet来运行 那么我们首先来看HttpServletBean这个类 他 ...
- Android应用程序启动Binder线程源码分析
Android的应用程序包括Java应用及本地应用,Java应用运行在davik虚拟机中,由zygote进程来创建启动,而本地服务应用在Android系统启动时,通过配置init.rc文件来由Init ...
- Android服务查询完整过程源码分析
Android服务注册完整过程源码分析中从上到下详细分析了Android系统的服务注册过程,本文同样针对AudioService服务来介绍Android服务的查询过程. 客户端进程数据发送过程 pri ...
- Android服务注册完整过程源码分析
前面从不同片段分析了Android的Binder通信机制,本文结合前面介绍的内容,对整个Android的Binder通信过程进行一次完整的分析.分析以AudioService服务的注册过程为例. 由于 ...
最新文章
- 直观、形象、动态,一文了解无处不在的标准差
- PL/SQL 游标
- 图解WindowsXP修改MAC地址
- redmine 无法登录 mysql 服务器_Redmine 数据库连接错误
- python 单例模式 redis_python 单例模式实现多线程共享连接池
- 2017年AR大会上海站干货分享
- 【leetcode】Integer to Roman
- MyBatis 实践 -Mapper与DAO
- python包之间引用_python 子包引用父包和其他子包
- redis相关技能积累
- 24小时BTC全网合约成交数据显示:多头占优
- linux java解压文件怎么打开,linux下面的解压缩文件的命令
- linux如何从 命令行 将普通文件打印到 pdf
- libcef-框架架构中概念介绍-进程-线程-引用计数-字符串等(一)
- 《期权、期货及其他衍生产品》读书笔记(第二章:期货市场与中央交易对手)
- 做UI设计需要具备什么技能
- 红米4a android 9 速度,雷军感叹科技进步速度太快!Redmi 9入门机性能已经相当于骁龙835...
- 互联网站规划与设计.txt
- 数据结构(C++)——图:基于邻接矩阵实现的图结构
- 计算机等级考试怎么领取证书 领取方式