import org.apache.beam.sdk.util.BackOff; //导入依赖的package包/类


* Writes a batch of mutations to Cloud Datastore.



If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All

* mutations in the batch will be committed again, even if the commit was partially

* successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be

* thrown.


* @throws DatastoreException if the commit fails or IOException or InterruptedException if

* backing off between retries fails.


private void flushBatch() throws DatastoreException, IOException, InterruptedException {

LOG.debug("Writing batch of {} mutations", mutations.size());

Sleeper sleeper = Sleeper.DEFAULT;

BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();

while (true) {

// Batch upsert entities.

CommitRequest.Builder commitRequest = CommitRequest.newBuilder();



long startTime = System.currentTimeMillis(), endTime;

if (throttler.throttleRequest(startTime)) {"Delaying request due to previous failures"); / 1000);




try {


endTime = System.currentTimeMillis();

writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());


// Break if the commit threw no exception.


} catch (DatastoreException exception) {

if (exception.getCode() == Code.DEADLINE_EXCEEDED) {

/* Most errors are not related to request size, and should not change our expectation of

* the latency of successful requests. DEADLINE_EXCEEDED can be taken into

* consideration, though. */

endTime = System.currentTimeMillis();

writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());


// Only log the code and message for potentially-transient errors. The entire exception

// will be propagated upon the last retry.

LOG.error("Error writing batch of {} mutations to Datastore ({}): {}", mutations.size(),

exception.getCode(), exception.getMessage());;

if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {

throw exception;


if (!, backoff)) {

LOG.error("Aborting after {} retries.", MAX_RETRIES);

throw exception;




LOG.debug("Successfully wrote {} mutations", mutations.size());


mutationsSize = 0;


