diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java deleted file mode 100644 index b172a4ac3046..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Encapsulates the cache of {@link AppendClientInfo} objects and the synchronization protocol - * required to use them safely. The Guava cache object is thread-safe. However our protocol requires - * that client pin the StreamAppendClient after looking up the cache, and we must ensure that the - * cache is not accessed in between the lookup and the pin (any access of the cache could trigger - * element expiration). - */ -class AppendClientCache { - private static final Logger LOG = LoggerFactory.getLogger(AppendClientCache.class); - private final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); - - private final Cache appendCache; - - @SuppressWarnings({"FutureReturnValueIgnored"}) - AppendClientCache(Duration expireAfterAccess) { - this.appendCache = - CacheBuilder.newBuilder() - .expireAfterAccess(expireAfterAccess.getMillis(), TimeUnit.MILLISECONDS) - .removalListener( - (RemovalNotification removal) -> { - LOG.info("Expiring append client for {}", removal.getKey()); - final @Nullable AppendClientInfo appendClientInfo = removal.getValue(); - if (appendClientInfo != null) { - // Remove the pin owned by the cache itself. Since the client has not been - // marked as closed, we - // can call unpin in this thread without worrying about blocking the thread. - appendClientInfo.unpinAppendClient(null); - // Close the client in another thread to avoid blocking the main thread. - closeWriterExecutor.submit(appendClientInfo::close); - } - }) - .build(); - } - - // The cache itself always own one pin on the object. This Callable is always used to ensure that - // the cache - // adds a pin before loading a value. - private static Callable wrapWithPin(Callable loader) { - return () -> { - AppendClientInfo client = loader.call(); - client.pinAppendClient(); - return client; - }; - } - - /** - * Atomically get an append client from the cache and add a pin. This pin is owned by the client, - * which has the responsibility of removing it. If the client is not in the cache, loader will be - * used to load the client; in this case an additional pin will be added owned by the cache, - * removed when the item is evicted. - */ - public AppendClientInfo getAndPin(KeyT key, Callable loader) throws Exception { - synchronized (this) { - AppendClientInfo info = appendCache.get(key, wrapWithPin(loader)); - info.pinAppendClient(); - return info; - } - } - - /** "Refresh" an object by invalidating the old cache entry. */ - public AppendClientInfo refreshObjectAndPin(KeyT key, Callable loader) - throws Exception { - synchronized (this) { - appendCache.invalidate(key); - return getAndPin(key, loader); - } - } - - public void invalidate(KeyT key, AppendClientInfo expectedClient) { - // The default stream is cached across multiple different DoFns. If they all try - // and - // invalidate, then we can get races between threads invalidating and recreating - // streams. For this reason, - // we check to see that the cache still contains the object we created before - // invalidating (in case another - // thread has already invalidated and recreated the stream). - synchronized (this) { - AppendClientInfo cachedAppendClient = appendCache.getIfPresent(key); - if (cachedAppendClient != null - && System.identityHashCode(cachedAppendClient) - == System.identityHashCode(expectedClient)) { - appendCache.invalidate(key); - } - } - } - - public void invalidate(KeyT key) { - synchronized (this) { - appendCache.invalidate(key); - } - } - - public void tickle(KeyT key) { - synchronized (this) { - appendCache.getIfPresent(key); - } - } - - public void clear() { - synchronized (this) { - appendCache.invalidateAll(); - } - } - - public void unpinAsync(AppendClientInfo appendClientInfo) { - appendClientInfo.unpinAppendClient(closeWriterExecutor); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 55c6007e1986..e9adc8097604 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -27,17 +27,13 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; -import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nullable; -import org.apache.beam.sdk.function.ThrowingConsumer; -import org.apache.beam.sdk.function.ThrowingRunnable; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Container class used by {@link StorageApiWritesShardedRecords} and {@link @@ -46,8 +42,6 @@ */ @AutoValue abstract class AppendClientInfo { - private static final Logger LOG = LoggerFactory.getLogger(AppendClientInfo.class); - private final Counter activeStreamAppendClients = Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients"); @@ -55,7 +49,7 @@ abstract class AppendClientInfo { abstract TableSchema getTableSchema(); - abstract ThrowingConsumer getCloseAppendClient(); + abstract Consumer getCloseAppendClient(); abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema(); @@ -71,8 +65,7 @@ abstract static class Builder { abstract Builder setTableSchema(TableSchema value); - abstract Builder setCloseAppendClient( - ThrowingConsumer value); + abstract Builder setCloseAppendClient(Consumer value); abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value); @@ -90,7 +83,7 @@ abstract Builder setCloseAppendClient( static AppendClientInfo of( TableSchema tableSchema, DescriptorProtos.DescriptorProto descriptor, - ThrowingConsumer closeAppendClient) + Consumer closeAppendClient) throws Exception { return new AutoValue_AppendClientInfo.Builder() .setTableSchema(tableSchema) @@ -104,7 +97,7 @@ static AppendClientInfo of( static AppendClientInfo of( TableSchema tableSchema, - ThrowingConsumer closeAppendClient, + Consumer closeAppendClient, boolean includeCdcColumns) throws Exception { return of( @@ -141,12 +134,7 @@ public AppendClientInfo withAppendClient( public void close() { BigQueryServices.StreamAppendClient client = getStreamAppendClient(); if (client != null) { - try { - getCloseAppendClient().accept(client); - } catch (Exception e) { - // We ignore errors when closing clients. - LOG.warn("Caught exception whilw trying to close append client. Ignoring", e); - } + getCloseAppendClient().accept(client); activeStreamAppendClients.dec(); } } @@ -211,32 +199,4 @@ public TableRow toTableRow(ByteString protoBytes, Predicate includeField throw new RuntimeException(e); } } - - public void pinAppendClient() { - BigQueryServices.StreamAppendClient client = - Preconditions.checkStateNotNull(getStreamAppendClient()); - client.pin(); - } - - public void unpinAppendClient(@Nullable ExecutorService executor) { - BigQueryServices.StreamAppendClient client = - Preconditions.checkStateNotNull(getStreamAppendClient()); - if (executor != null) { - runAsyncIgnoreFailure(executor, client::unpin); - } else { - client.unpin(); - } - } - - @SuppressWarnings({"FutureReturnValueIgnored"}) - private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { - executor.submit( - () -> { - try { - task.run(); - } catch (Throwable e) { - LOG.info("Exception happened while executing async task. Ignoring: ", e); - } - }); - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 66458a8339f9..78e714a7ccd6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -270,7 +270,7 @@ default long getInflightWaitSeconds() { /** * Unpin this object. If the object has been closed, this will release any underlying resources. */ - void unpin(); + void unpin() throws Exception; } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 14765a65ff0b..8f44d5041b55 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1627,7 +1627,7 @@ public void pin() { } @Override - public void unpin() { + public void unpin() throws Exception { boolean closeWriter; synchronized (this) { Preconditions.checkState(pins > 0, "Tried to unpin when pins==0"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 2dfc8b2f1c00..600f49b0be3e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -47,6 +47,9 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -86,6 +89,9 @@ import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -116,17 +122,56 @@ public class StorageApiWriteUnshardedRecords private final Coder successfulRowsCoder; private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; + private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); private final BigQueryIO.Write.CreateDisposition createDisposition; private final @Nullable String kmsKey; private final boolean usesCdc; private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; private final @Nullable Map bigLakeConfiguration; - private static final AppendClientCache APPEND_CLIENTS = - new AppendClientCache<>(Duration.standardMinutes(15)); + /** + * The Guava cache object is thread-safe. However our protocol requires that client pin the + * StreamAppendClient after looking up the cache, and we must ensure that the cache is not + * accessed in between the lookup and the pin (any access of the cache could trigger element + * expiration). Therefore most used of APPEND_CLIENTS should synchronize. + */ + private static final Cache APPEND_CLIENTS = + CacheBuilder.newBuilder() + .expireAfterAccess(15, TimeUnit.MINUTES) + .removalListener( + (RemovalNotification removal) -> { + LOG.info("Expiring append client for {}", removal.getKey()); + final @Nullable AppendClientInfo appendClientInfo = removal.getValue(); + if (appendClientInfo != null) { + appendClientInfo.close(); + } + }) + .build(); static void clearCache() { - APPEND_CLIENTS.clear(); + APPEND_CLIENTS.invalidateAll(); + } + + // Run a closure asynchronously, ignoring failures. + private interface ThrowingRunnable { + void run() throws Exception; + } + + private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { + executor.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + String msg = + e.toString() + + "\n" + + Arrays.stream(e.getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + System.err.println("Exception happened while executing async task. Ignoring: " + msg); + } + }); } public StorageApiWriteUnshardedRecords( @@ -317,9 +362,18 @@ public TableDestination getTableDestination() { void teardown() { maybeTickleCache(); - // if this is a PENDING stream, we won't be using it again after cleaning up this - // destination state, so clear it from the cache - invalidateAppendClient(!useDefaultStream); + if (appendClientInfo != null) { + StreamAppendClient client = appendClientInfo.getStreamAppendClient(); + if (client != null) { + runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); + } + // if this is a PENDING stream, we won't be using it again after cleaning up this + // destination state, so clear it from the cache + if (!useDefaultStream) { + APPEND_CLIENTS.invalidate(streamName); + } + appendClientInfo = null; + } } String getDefaultStreamName() { @@ -365,7 +419,18 @@ AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exce AppendClientInfo.of( schemaAndDescriptor.tableSchema, schemaAndDescriptor.descriptor, - AutoCloseable::close)); + // Make sure that the client is always closed in a different thread to avoid + // blocking. + client -> + runAsyncIgnoreFailure( + closeWriterExecutor, + () -> { + synchronized (APPEND_CLIENTS) { + // Remove the pin owned by the cache. + client.unpin(); + client.close(); + } + }))); CreateTableHelpers.createTableWrapper( () -> { @@ -381,6 +446,9 @@ AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exce return null; }, tryCreateTable); + + // This pin is "owned" by the cache. + Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()).pin(); return appendClientInfo.get(); } @@ -447,14 +515,23 @@ AppendClientInfo getAppendClientInfo( try { if (this.appendClientInfo == null) { getOrCreateStreamName(); - this.appendClientInfo = - lookupCache - ? APPEND_CLIENTS.getAndPin( - getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)) - : APPEND_CLIENTS.refreshObjectAndPin( + final AppendClientInfo newAppendClientInfo; + synchronized (APPEND_CLIENTS) { + if (lookupCache) { + newAppendClientInfo = + APPEND_CLIENTS.get( getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)); + } else { + newAppendClientInfo = generateClient(updatedSchema); + // override the clients in the cache. + APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newAppendClientInfo); + } + // This pin is "owned" by the current DoFn. + Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin(); + } + nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); + this.appendClientInfo = newAppendClientInfo; } - nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); return Preconditions.checkStateNotNull(appendClientInfo); } catch (Exception e) { throw new RuntimeException(e); @@ -463,24 +540,37 @@ AppendClientInfo getAppendClientInfo( void maybeTickleCache() { if (appendClientInfo != null && Instant.now().isAfter(nextCacheTickle)) { - APPEND_CLIENTS.tickle(getStreamAppendClientCacheEntryKey()); + synchronized (APPEND_CLIENTS) { + APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey()); + } nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); } } - void invalidateAppendClient(boolean invalidateCache) { - if (this.appendClientInfo != null) { - // Unpin in a different thread, as it may execute a blocking close. - StreamAppendClient client = appendClientInfo.getStreamAppendClient(); - if (client != null) { - APPEND_CLIENTS.unpinAsync(Preconditions.checkStateNotNull(this.appendClientInfo)); - } - if (invalidateCache) { - APPEND_CLIENTS.invalidate( - getStreamAppendClientCacheEntryKey(), - Preconditions.checkStateNotNull(this.appendClientInfo)); + void invalidateWriteStream() { + if (appendClientInfo != null) { + synchronized (APPEND_CLIENTS) { + // Unpin in a different thread, as it may execute a blocking close. + StreamAppendClient client = appendClientInfo.getStreamAppendClient(); + if (client != null) { + runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); + } + // The default stream is cached across multiple different DoFns. If they all try and + // invalidate, then we can get races between threads invalidating and recreating + // streams. For this reason, + // we check to see that the cache still contains the object we created before + // invalidating (in case another + // thread has already invalidated and recreated the stream). + String cacheEntryKey = getStreamAppendClientCacheEntryKey(); + @Nullable + AppendClientInfo cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey); + if (cachedAppendClient != null + && System.identityHashCode(cachedAppendClient) + == System.identityHashCode(appendClientInfo)) { + APPEND_CLIENTS.invalidate(cacheEntryKey); + } } - this.appendClientInfo = null; + appendClientInfo = null; } } @@ -587,7 +677,7 @@ long flush( LOG.info("Schema out of date: refreshing table schema for {}.", tableUrn); // Refresh our view of the schema and try again.. this.messageConverter.updateSchemaFromTable(); - invalidateAppendClient(true); + invalidateWriteStream(); this.appendClientInfo = Preconditions.checkStateNotNull( getAppendClientInfo( @@ -784,7 +874,7 @@ long flush( if (!quotaError) { // This forces us to close and reopen all gRPC connections to Storage API on error, // which empirically fixes random stuckness issues. - invalidateAppendClient(true); + invalidateWriteStream(); allowedRetry = 5; } else { allowedRetry = 35; @@ -941,11 +1031,7 @@ void postFlush() { TableSchemaUpdateUtils.getUpdatedSchema( this.messageConverter.getTableSchema(), updatedTableSchemaReturned); if (updatedTableSchema.isPresent()) { - invalidateAppendClient(false); - // TODO: This overwrites whatever is in the cache which can cause races between - // threads. - // A better approach would be to check the cache, and keep whichever schema is - // "larger". + invalidateWriteStream(); appendClientInfo = Preconditions.checkStateNotNull( getAppendClientInfo(false, updatedTableSchema.get())); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index cbace6e7ff40..5771ea5074b8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -37,18 +37,23 @@ import io.grpc.Status.Code; import java.io.IOException; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -99,6 +104,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -141,12 +149,44 @@ public class StorageApiWritesShardedRecords succussfulRowsCoder; private final TupleTag> flushTag = new TupleTag<>("flushTag"); - - private static final AppendClientCache>> APPEND_CLIENTS = - new AppendClientCache<>(Duration.standardMinutes(5)); + private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); + + private static final Cache>, AppendClientInfo> APPEND_CLIENTS = + CacheBuilder.newBuilder() + .expireAfterAccess(5, TimeUnit.MINUTES) + .removalListener( + (RemovalNotification>, AppendClientInfo> removal) -> { + final @Nullable AppendClientInfo appendClientInfo = removal.getValue(); + if (appendClientInfo != null) { + appendClientInfo.close(); + } + }) + .build(); static void clearCache() { - APPEND_CLIENTS.clear(); + APPEND_CLIENTS.invalidateAll(); + } + + // Run a closure asynchronously, ignoring failures. + private interface ThrowingRunnable { + void run() throws Exception; + } + + private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { + executor.submit( + () -> { + try { + task.run(); + } catch (Exception e) { + String msg = + e.toString() + + "\n" + + Arrays.stream(e.getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + System.err.println("Exception happened while executing async task. Ignoring: " + msg); + } + }); } public StorageApiWritesShardedRecords( @@ -242,6 +282,7 @@ static class AppendRowsContext extends RetryManager.Operation.Context { final ShardedKey key; String streamName = ""; + @Nullable StreamAppendClient client = null; long offset = -1; long numRows = 0; long tryIteration = 0; @@ -446,46 +487,6 @@ public void onTeardown() { } } - // Holder for an AppendClientHolder. Maintains a pin on the client as long as it's active. - private class AppendClientHolder implements AutoCloseable { - private final KV> key; - private final Callable appendClientInfoCallable; - - private AppendClientInfo appendClientInfo; - private boolean valid; - - public AppendClientHolder( - ShardedKey key, Callable appendClientInfoCallable) - throws Exception { - this.key = messageConverters.getAppendClientKey(key); - this.appendClientInfoCallable = appendClientInfoCallable; - this.appendClientInfo = APPEND_CLIENTS.getAndPin(this.key, appendClientInfoCallable); - this.valid = true; - } - - void invalidateAndReset() throws Exception { - APPEND_CLIENTS.unpinAsync(this.appendClientInfo); // Make sure to unpin in another thread. - APPEND_CLIENTS.invalidate(key); - this.appendClientInfo = APPEND_CLIENTS.getAndPin(key, appendClientInfoCallable); - } - - @Override - public void close() { - APPEND_CLIENTS.unpinAsync(this.appendClientInfo); // Make sure to unpin in another thread. - this.valid = false; - } - - AppendClientInfo get() { - org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState( - valid); - return appendClientInfo; - } - - StreamAppendClient getStreamAppendClient() { - return Preconditions.checkStateNotNull(appendClientInfo.getStreamAppendClient()); - } - } - private CreateRetryManagerResult createRetryManager( ShardedKey key, Iterable messages, @@ -553,214 +554,6 @@ private CreateRetryManagerResult createRetryManager( retryManager, failedRows, recordsAppended, histogramUpdates); } - private void handleAppendFailure( - Iterable> failedContexts, - TableReference tableReference, - String shortTableId, - AppendClientInfo appendClientInfo, - Callable tryCreateTable, - BiConsumer>, Boolean> initializeContexts, - Consumer>> clearClients, - ValueState streamOffset, - MultiOutputReceiver o) { - // The first context is always the one that fails. - AppendRowsContext failedContext = - Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null)); - BigQuerySinkMetrics.reportFailedRPCMetrics( - failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); - String errorCode = BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); - - // AppendSerializationError means that BigQuery detected errors on individual rows, e.g. - // a row not conforming to bigQuery invariants. These errors are persistent, so we redirect - // those rows to the - // failedInserts PCollection, and retry with the remaining rows. - if (failedContext.getError() != null - && failedContext.getError() instanceof Exceptions.AppendSerializationError) { - Exceptions.AppendSerializationError error = - Preconditions.checkArgumentNotNull( - (Exceptions.AppendSerializationError) failedContext.getError()); - - Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); - for (int failedIndex : failedRowIndices) { - // Convert the message to a TableRow and send it to the failedRows collection. - TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); - if (failedRow == null) { - ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - failedRow = appendClientInfo.toTableRow(protoBytes, Predicates.alwaysTrue()); - } - org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); - o.get(failedRowsTag) - .outputWithTimestamp( - new BigQueryStorageApiInsertError( - failedRow, - error.getRowIndexToErrorMessage().get(failedIndex), - tableReference), - timestamp); - } - int failedRows = failedRowIndices.size(); - rowsSentToFailedRowsCollection.inc(failedRows); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId) - .inc(failedRows); - - // Remove the failed row from the payload, so we retry the batch without the failed - // rows. - ProtoRows.Builder retryRows = ProtoRows.newBuilder(); - @Nullable List timestamps = Lists.newArrayList(); - for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) { - if (!failedRowIndices.contains(i)) { - ByteString rowBytes = failedContext.protoRows.getSerializedRows(i); - retryRows.addSerializedRows(rowBytes); - timestamps.add(failedContext.timestamps.get(i)); - } - } - failedContext.protoRows = retryRows.build(); - failedContext.timestamps = timestamps; - int retriedRows = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) - .inc(retriedRows); - - // Since we removed rows, we need to update the insert offsets for all remaining rows. - long offset = failedContext.offset; - for (AppendRowsContext context : failedContexts) { - context.offset = offset; - offset += context.protoRows.getSerializedRowsCount(); - } - streamOffset.write(offset); - return; - } - - Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); - Status.Code statusCode = Status.fromThrowable(error).getCode(); - - // This means that the offset we have stored does not match the current end of - // the stream in the Storage API. Usually this happens because a crash or a bundle - // failure - // happened after an append but before the worker could checkpoint it's - // state. The records that were appended in a failed bundle will be retried, - // meaning that the unflushed tail of the stream must be discarded to prevent - // duplicates. - boolean offsetMismatch = - statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS); - - boolean quotaError = statusCode.equals(Code.RESOURCE_EXHAUSTED); - if (!offsetMismatch) { - // Don't log errors for expected offset mismatch. These will be logged as warnings - // below. - LOG.error("Got error {} closing {}", failedContext.getError(), failedContext.streamName); - } - - try { - // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces - // them. - tryCreateTable.call(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - if (!quotaError) { - // For known errors (offset mismatch, not found) we must reestablish - // the streams. - // However we've seen that doing this fixes random stuckness issues by reestablishing - // gRPC connections, - // so we close the clients for all non-quota errors. - - clearClients.accept(failedContexts); - } - appendFailures.inc(); - int retriedRows = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) - .inc(retriedRows); - - // Schema mismatched exceptions can happen if the table was recently updated. Since - // vortex caches schemas - // we might see the new schema before vortex does. In this case, we simply need to - // retry. - Exceptions.@Nullable StorageException storageException = Exceptions.toStorageException(error); - boolean schemaMismatchError = - (storageException instanceof Exceptions.SchemaMismatchedException); - if (!schemaMismatchError) { - // There's no special error code for missing required fields, and that can also happen - // due to vortex - // being delayed at seeing a new schema. We're forced to parse the description to - // determine that this has happened. - // TODO: Vortex team to introduce a special storage error code for this, so we don't - // have to parse - // descriptions. - Status status = Status.fromThrowable(error); - if (status.getCode() == Code.INVALID_ARGUMENT) { - String description = status.getDescription(); - schemaMismatchError = description != null && description.contains("incompatible fields"); - } - } - if (schemaMismatchError) { - LOG.info( - "Vortex failed stream open due to incompatible fields. This is likely because the BigQuery " - + "schema was recently updated and Vortex hasn't noticed yet, so retrying. error {}", - Preconditions.checkStateNotNull(error).toString()); - } - - boolean explicitStreamFinalized = - failedContext.getError() instanceof StreamFinalizedException; - // This implies that the stream doesn't exist or has already been finalized. In this - // case we have no choice but to create a new stream. - boolean streamDoesNotExist = - explicitStreamFinalized - || statusCode.equals(Code.INVALID_ARGUMENT) - || statusCode.equals(Code.NOT_FOUND) - || statusCode.equals(Code.FAILED_PRECONDITION); - streamDoesNotExist = streamDoesNotExist && !schemaMismatchError; - - if (offsetMismatch || streamDoesNotExist) { - appendOffsetFailures.inc(); - LOG.warn( - "Append to {} failed. Will retry with a new stream", - failedContext, - failedContext.getError()); - // Finalize the stream and clear streamName so a new stream will be created. - o.get(flushTag) - .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true))); - // Reinitialize all contexts with the new stream and new offsets. - initializeContexts.accept(failedContexts, true); - - // Offset failures imply that all subsequent parallel appends will also fail. - // Retry them all. - } - } - - private void handleAppendSuccess( - AppendRowsContext context, - String shortTableId, - AppendClientInfo appendClientInfo, - MultiOutputReceiver o) { - AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult()); - o.get(flushTag) - .output( - KV.of( - context.streamName, - new Operation( - context.offset + context.protoRows.getSerializedRowsCount() - 1, false))); - int flushedRows = context.protoRows.getSerializedRowsCount(); - flushesScheduled.inc(flushedRows); - BigQuerySinkMetrics.reportSuccessfulRpcMetrics( - context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId) - .inc(flushedRows); - - if (successfulRowsTag != null) { - for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) { - ByteString protoBytes = context.protoRows.getSerializedRows(i); - org.joda.time.Instant timestamp = context.timestamps.get(i); - o.get(successfulRowsTag) - .outputWithTimestamp( - appendClientInfo.toTableRow(protoBytes, successfulRowsPredicate), timestamp); - } - } - } - @ProcessElement public void process( ProcessContext c, @@ -838,12 +631,10 @@ public void process( () -> { @Nullable TableSchema tableSchema; DescriptorProtos.DescriptorProto descriptor; - TableSchema updatedSchemaValue = autoUpdateSchema ? updatedSchema.read() : null; - + TableSchema updatedSchemaValue = updatedSchema.read(); if (autoUpdateSchema && updatedSchemaValue != null) { - // This means that Vortex has told us in the past that the table schema has been - // updated. We should use - // this updated schema instead of the initial schema from the messageConverter. + // We've seen an updated schema, so we use that instead of querying the + // MessageConverter. tableSchema = updatedSchemaValue; descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema( @@ -857,8 +648,8 @@ public void process( if (autoUpdateSchema) { // A StreamWriter ignores table schema updates that happen prior to its creation. // So before creating a StreamWriter below, we fetch the table schema to check if we - // missed an update. If so, use the new schema instead of the base schema. - // TODO: There's still a race here! + // missed an update. + // If so, use the new schema instead of the base schema @Nullable TableSchema streamSchema = MoreObjects.firstNonNull( @@ -880,222 +671,475 @@ public void process( AppendClientInfo.of( Preconditions.checkStateNotNull(tableSchema), descriptor, - AutoCloseable::close) + // Make sure that the client is always closed in a different thread + // to + // avoid blocking. + client -> + runAsyncIgnoreFailure( + closeWriterExecutor, + () -> { + // Remove the pin that is "owned" by the cache. + client.unpin(); + client.close(); + })) .withAppendClient( writeStreamService, getOrCreateStream, false, defaultMissingValueInterpretation); + // This pin is "owned" by the cache. + Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin(); return info; }; - // The StreamWriter has two pins on it. The static cache holds a pin, as it continues to cache - // values after this - // method exits, so must hold the pin. The local AppendClientHolder also holds a pin, as the - // cache could in - // theory evict the object during execution and we want a pin held throughout the execution of - // this function. - try (AppendClientHolder appendClientHolder = - new AppendClientHolder(element.getKey(), getAppendClientInfo)) { - String currentStream = getOrCreateStream.get(); - if (!currentStream.equals(appendClientHolder.get().getStreamName())) { - // Cached append client is inconsistent with persisted state. Throw away cached item and - // force it to be recreated. - appendClientHolder.invalidateAndReset(); - } + AtomicReference appendClientInfo = + new AtomicReference<>( + APPEND_CLIENTS.get( + messageConverters.getAppendClientKey(element.getKey()), getAppendClientInfo)); + String currentStream = getOrCreateStream.get(); + if (!currentStream.equals(appendClientInfo.get().getStreamName())) { + // Cached append client is inconsistent with persisted state. Throw away cached item and + // force it to be + // recreated. + APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); + appendClientInfo.set( + APPEND_CLIENTS.get( + messageConverters.getAppendClientKey(element.getKey()), getAppendClientInfo)); + } - TableSchema updatedSchemaValue = autoUpdateSchema ? updatedSchema.read() : null; - if ((autoUpdateSchema && updatedSchemaValue != null) - && appendClientHolder.get().hasSchemaChanged(updatedSchemaValue)) { - appendClientHolder.invalidateAndReset(); + TableSchema updatedSchemaValue = updatedSchema.read(); + if (autoUpdateSchema && updatedSchemaValue != null) { + if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) { + appendClientInfo.set( + AppendClientInfo.of( + updatedSchemaValue, appendClientInfo.get().getCloseAppendClient(), false)); + APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); + APPEND_CLIENTS.put( + messageConverters.getAppendClientKey(element.getKey()), appendClientInfo.get()); } + } - // Initialize stream names and offsets for all contexts. This will be called initially, but - // will also be called if we roll over to a new stream on a retry. - BiConsumer>, Boolean> initializeContexts = - (contexts, isFailure) -> { - try { - if (isFailure) { - // Clear the stream name, forcing a new one to be created. - streamName.write(""); - } - String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read()); - long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read()); - for (AppendRowsContext context : contexts) { - context.streamName = streamNameRead; - context.offset = currentOffset; - ++context.tryIteration; - currentOffset = context.offset + context.protoRows.getSerializedRowsCount(); + // Initialize stream names and offsets for all contexts. This will be called initially, but + // will also be called if we roll over to a new stream on a retry. + BiConsumer>, Boolean> initializeContexts = + (contexts, isFailure) -> { + try { + if (isFailure) { + // Clear the stream name, forcing a new one to be created. + streamName.write(""); + } + appendClientInfo.set( + appendClientInfo + .get() + .withAppendClient( + writeStreamService, + getOrCreateStream, + false, + defaultMissingValueInterpretation)); + StreamAppendClient streamAppendClient = + Preconditions.checkArgumentNotNull( + appendClientInfo.get().getStreamAppendClient()); + String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read()); + long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read()); + for (AppendRowsContext context : contexts) { + context.streamName = streamNameRead; + streamAppendClient.pin(); + context.client = appendClientInfo.get().getStreamAppendClient(); + context.offset = currentOffset; + ++context.tryIteration; + currentOffset = context.offset + context.protoRows.getSerializedRowsCount(); + } + streamOffset.write(currentOffset); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + Consumer>> clearClients = + (contexts) -> { + APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); + appendClientInfo.set(appendClientInfo.get().withNoAppendClient()); + APPEND_CLIENTS.put( + messageConverters.getAppendClientKey(element.getKey()), appendClientInfo.get()); + for (AppendRowsContext context : contexts) { + if (context.client != null) { + // Unpin in a different thread, as it may execute a blocking close. + runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin); + context.client = null; + } + } + }; + + Function, ApiFuture> runOperation = + context -> { + if (context.protoRows.getSerializedRowsCount() == 0) { + // This might happen if all rows in a batch failed and were sent to the failed-rows + // PCollection. + return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()); + } + try { + appendClientInfo.set( + appendClientInfo + .get() + .withAppendClient( + writeStreamService, + getOrCreateStream, + false, + defaultMissingValueInterpretation)); + return Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()) + .appendRows(context.offset, context.protoRows); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + Function>, RetryType> onError = + failedContexts -> { + // The first context is always the one that fails. + AppendRowsContext failedContext = + Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null)); + BigQuerySinkMetrics.reportFailedRPCMetrics( + failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); + String errorCode = + BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); + + // AppendSerializationError means that BigQuery detected errors on individual rows, e.g. + // a row not conforming + // to bigQuery invariants. These errors are persistent, so we redirect those rows to the + // failedInserts + // PCollection, and retry with the remaining rows. + if (failedContext.getError() != null + && failedContext.getError() instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError error = + Preconditions.checkArgumentNotNull( + (Exceptions.AppendSerializationError) failedContext.getError()); + + Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); + for (int failedIndex : failedRowIndices) { + // Convert the message to a TableRow and send it to the failedRows collection. + TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); + if (failedRow == null) { + // TODO: MAKE SURE WE USE UPDATED DESCRIPTOR + ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); + failedRow = + appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue()); } - streamOffset.write(currentOffset); - } catch (Exception e) { - throw new RuntimeException(e); + org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); + o.get(failedRowsTag) + .outputWithTimestamp( + new BigQueryStorageApiInsertError( + failedRow, + error.getRowIndexToErrorMessage().get(failedIndex), + tableReference), + timestamp); } - }; - - Consumer>> clearClients = - (contexts) -> { - try { - appendClientHolder.invalidateAndReset(); - } catch (Exception e) { - throw new RuntimeException(e); + int failedRows = failedRowIndices.size(); + rowsSentToFailedRowsCollection.inc(failedRows); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId) + .inc(failedRows); + + // Remove the failed row from the payload, so we retry the batch without the failed + // rows. + ProtoRows.Builder retryRows = ProtoRows.newBuilder(); + @Nullable List timestamps = Lists.newArrayList(); + for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) { + if (!failedRowIndices.contains(i)) { + ByteString rowBytes = failedContext.protoRows.getSerializedRows(i); + retryRows.addSerializedRows(rowBytes); + timestamps.add(failedContext.timestamps.get(i)); + } } - }; - - Function, ApiFuture> runOperation = - context -> { - if (context.protoRows.getSerializedRowsCount() == 0) { - // This might happen if all rows in a batch failed and were sent to the failed-rows - // PCollection. - return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()); + failedContext.protoRows = retryRows.build(); + failedContext.timestamps = timestamps; + int retriedRows = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) + .inc(retriedRows); + + // Since we removed rows, we need to update the insert offsets for all remaining rows. + long offset = failedContext.offset; + for (AppendRowsContext context : failedContexts) { + context.offset = offset; + offset += context.protoRows.getSerializedRowsCount(); } - try { - return appendClientHolder - .getStreamAppendClient() - .appendRows(context.offset, context.protoRows); - } catch (Exception e) { - throw new RuntimeException(e); + streamOffset.write(offset); + return RetryType.RETRY_ALL_OPERATIONS; + } + + Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); + + Status.Code statusCode = Status.fromThrowable(error).getCode(); + + // This means that the offset we have stored does not match the current end of + // the stream in the Storage API. Usually this happens because a crash or a bundle + // failure + // happened after an append but before the worker could checkpoint it's + // state. The records that were appended in a failed bundle will be retried, + // meaning that the unflushed tail of the stream must be discarded to prevent + // duplicates. + boolean offsetMismatch = + statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS); + + boolean quotaError = statusCode.equals(Code.RESOURCE_EXHAUSTED); + if (!offsetMismatch) { + // Don't log errors for expected offset mismatch. These will be logged as warnings + // below. + LOG.error( + "Got error {} closing {}", failedContext.getError(), failedContext.streamName); + } + + try { + // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces + // them. + tryCreateTable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!quotaError) { + // For known errors (offset mismatch, not found) we must reestablish + // the streams. + // However we've seen that doing this fixes random stuckness issues by reestablishing + // gRPC connections, + // so we close the clients for all non-quota errors. + + clearClients.accept(failedContexts); + } + appendFailures.inc(); + int retriedRows = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) + .inc(retriedRows); + + // Schema mismatched exceptions can happen if the table was recently updated. Since + // vortex caches schemas + // we might see the new schema before vortex does. In this case, we simply need to + // retry. + Exceptions.@Nullable StorageException storageException = + Exceptions.toStorageException(error); + boolean schemaMismatchError = + (storageException instanceof Exceptions.SchemaMismatchedException); + if (!schemaMismatchError) { + // There's no special error code for missing required fields, and that can also happen + // due to vortex + // being delayed at seeing a new schema. We're forced to parse the description to + // determine that this has happened. + // TODO: Vortex team to introduce a special storage error code for this, so we don't + // have to parse + // descriptions. + Status status = Status.fromThrowable(error); + if (status.getCode() == Code.INVALID_ARGUMENT) { + String description = status.getDescription(); + schemaMismatchError = + description != null && description.contains("incompatible fields"); } - }; - - Function>, RetryType> onError = - failedContexts -> { - handleAppendFailure( - failedContexts, - tableReference, - shortTableId, - appendClientHolder.get(), - tryCreateTable, - initializeContexts, - clearClients, - streamOffset, - o); + } + if (schemaMismatchError) { + LOG.info( + "Vortex failed stream open due to incompatible fields. This is likely because the BigTable " + + "schema was recently updated and Vortex hasn't noticed yet, so retrying. error {}", + Preconditions.checkStateNotNull(error).toString()); + } + + boolean explicitStreamFinalized = + failedContext.getError() instanceof StreamFinalizedException; + // This implies that the stream doesn't exist or has already been finalized. In this + // case we have no choice but to create a new stream. + boolean streamDoesNotExist = + explicitStreamFinalized + || statusCode.equals(Code.INVALID_ARGUMENT) + || statusCode.equals(Code.NOT_FOUND) + || statusCode.equals(Code.FAILED_PRECONDITION); + streamDoesNotExist = streamDoesNotExist && !schemaMismatchError; + + if (offsetMismatch || streamDoesNotExist) { + appendOffsetFailures.inc(); + LOG.warn( + "Append to {} failed. Will retry with a new stream", + failedContext, + failedContext.getError()); + // Finalize the stream and clear streamName so a new stream will be created. + o.get(flushTag) + .output( + KV.of( + failedContext.streamName, new Operation(failedContext.offset - 1, true))); + // Reinitialize all contexts with the new stream and new offsets. + initializeContexts.accept(failedContexts, true); + + // Offset failures imply that all subsequent parallel appends will also fail. + // Retry them all. return RetryType.RETRY_ALL_OPERATIONS; - }; - Consumer> onSuccess = - context -> handleAppendSuccess(context, shortTableId, appendClientHolder.get(), o); - - BackOff backoff = - FluentBackoff.DEFAULT - .withInitialBackoff(Duration.standardSeconds(1)) - .withMaxBackoff(Duration.standardMinutes(1)) - .withMaxRetries(500) - .withThrottledTimeCounter( - BigQuerySinkMetrics.throttledTimeCounter( - BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM)) - .backoff(); - CreateRetryManagerResult createRetryManagerResult; - do { - // Each ProtoRows object contains at most 1MB of rows. - // TODO: Push messageFromTableRow up to top level. That we we cans skip TableRow entirely - // if - // already proto or already schema. - Iterable messages = - new SplittingIterable( - element.getValue(), - splitSize, - // Unknown field merger - (bytes, tableRow) -> - appendClientHolder.get().mergeNewFields(bytes, tableRow, ignoreUnknownValues), - // Convert back to TableRow - bytes -> appendClientHolder.get().toTableRow(bytes, Predicates.alwaysTrue()), - // Failed rows consumer - (failedRow, errorMessage) -> { - o.get(failedRowsTag) - .outputWithTimestamp( - new BigQueryStorageApiInsertError( - failedRow.getValue(), errorMessage, tableReference), - failedRow.getTimestamp()); - rowsSentToFailedRowsCollection.inc(); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, - BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, - shortTableId) - .inc(1); - }, - // Get the currently-known TableSchema hash - () -> appendClientHolder.get().getTableSchemaHash(), - () -> - TableRowToStorageApiProto.wrapDescriptorProto( - messageConverter.getDescriptor(false)), - autoUpdateSchema, - elementTs); - - createRetryManagerResult = - createRetryManager( - element.getKey(), - messages, - runOperation, - onError, - onSuccess, - appendClientHolder.get(), - tableReference); - if (createRetryManagerResult.getSchemaMismatchSeen()) { - // TODO: The call to updateSchemaFromTable will throttle the DoFn (both because of the - // RPC - // call and because - // the cache has a delay on refresh). We should update throttling counters here as well. - LOG.info("Schema out of date: refreshing table schema for {}", tableId); - // Force the message converter to get the schema again from the table. - messageConverter.updateSchemaFromTable(); - // Close all RPC clients that were opened with the old descriptor. Clear the cache, - // forcing us to create a new append client with the updated descriptor. - appendClientHolder.invalidateAndReset(); - } - } while (createRetryManagerResult.getSchemaMismatchSeen() - && BackOffUtils.next(Sleeper.DEFAULT, backoff)); - - // Output any rows that failed along they way. - createRetryManagerResult - .getFailedRows() - .forEach( - tv -> o.get(failedRowsTag).outputWithTimestamp(tv.getValue(), tv.getTimestamp())); - rowsSentToFailedRowsCollection.inc(createRetryManagerResult.getFailedRows().size()); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, - BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, - shortTableId) - .inc(createRetryManagerResult.getFailedRows().size()); - - recordsAppended.inc(createRetryManagerResult.getRecordsAppended()); - createRetryManagerResult.getHistogramValues().forEach(appendSizeDistribution::update); - - Instant now = Instant.now(); - - RetryManager> retryManager = - Preconditions.checkStateNotNull(createRetryManagerResult.getRetryManager()); - int numAppends = retryManager.getRemainingOperationCount(); - Iterable> contexts = retryManager.getRemainingContexts(); - - if (numAppends > 0) { - initializeContexts.accept(contexts, false); - retryManager.run(true); + } + + return RetryType.RETRY_ALL_OPERATIONS; + }; - appendSplitDistribution.update(numAppends); - if (autoUpdateSchema) { - @Nullable - StreamAppendClient streamAppendClient = appendClientHolder.getStreamAppendClient(); - TableSchema originalSchema = appendClientHolder.get().getTableSchema(); - - @Nullable - TableSchema updatedSchemaReturned = - (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null; - // Update the table schema and clear the append client. - if (updatedSchemaReturned != null) { - Optional newSchema = - TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned); - if (newSchema.isPresent()) { - APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); - LOG.debug( - "Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned); - updatedSchema.write(newSchema.get()); + Consumer> onSuccess = + context -> { + AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult()); + o.get(flushTag) + .output( + KV.of( + context.streamName, + new Operation( + context.offset + context.protoRows.getSerializedRowsCount() - 1, + false))); + int flushedRows = context.protoRows.getSerializedRowsCount(); + flushesScheduled.inc(flushedRows); + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId) + .inc(flushedRows); + + if (successfulRowsTag != null) { + for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) { + ByteString protoBytes = context.protoRows.getSerializedRows(i); + org.joda.time.Instant timestamp = context.timestamps.get(i); + o.get(successfulRowsTag) + .outputWithTimestamp( + appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate), + timestamp); } } - } + }; + + BackOff backoff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.standardSeconds(1)) + .withMaxBackoff(Duration.standardMinutes(1)) + .withMaxRetries(500) + .withThrottledTimeCounter( + BigQuerySinkMetrics.throttledTimeCounter( + BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM)) + .backoff(); + CreateRetryManagerResult createRetryManagerResult; + do { + // Each ProtoRows object contains at most 1MB of rows. + // TODO: Push messageFromTableRow up to top level. That we we cans skip TableRow entirely if + // already proto or already schema. + Iterable messages = + new SplittingIterable( + element.getValue(), + splitSize, + // Unknown field merger + (bytes, tableRow) -> + appendClientInfo.get().mergeNewFields(bytes, tableRow, ignoreUnknownValues), + // Convert back to TableRow + bytes -> appendClientInfo.get().toTableRow(bytes, Predicates.alwaysTrue()), + // Failed rows consumer + (failedRow, errorMessage) -> { + o.get(failedRowsTag) + .outputWithTimestamp( + new BigQueryStorageApiInsertError( + failedRow.getValue(), errorMessage, tableReference), + failedRow.getTimestamp()); + rowsSentToFailedRowsCollection.inc(); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableId) + .inc(1); + }, + // Get the currently-known TableSchema hash + () -> appendClientInfo.get().getTableSchemaHash(), + () -> + TableRowToStorageApiProto.wrapDescriptorProto( + messageConverter.getDescriptor(false)), + autoUpdateSchema, + elementTs); + + createRetryManagerResult = + createRetryManager( + element.getKey(), + messages, + runOperation, + onError, + onSuccess, + appendClientInfo.get(), + tableReference); + if (createRetryManagerResult.getSchemaMismatchSeen()) { + // TODO: The call to updateSchemaFromTable will throttle the DoFn (both because of the RPC + // call and because + // the cache has a delay on refresh). We should update throttling counters here as well. + LOG.info("Schema out of date: refreshing table schema for {}", tableId); + // Force the message converter to get the schema again from the table. + messageConverter.updateSchemaFromTable(); + // Close all RPC clients that were opened with the old descriptor. Clear the cache, + // forcing us to create a new append client with the updated descriptor. + APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); + appendClientInfo.set( + APPEND_CLIENTS.get( + messageConverters.getAppendClientKey(element.getKey()), getAppendClientInfo)); + } + } while (createRetryManagerResult.getSchemaMismatchSeen() + && BackOffUtils.next(Sleeper.DEFAULT, backoff)); + + // Output any rows that failed along they way. + createRetryManagerResult + .getFailedRows() + .forEach( + tv -> o.get(failedRowsTag).outputWithTimestamp(tv.getValue(), tv.getTimestamp())); + rowsSentToFailedRowsCollection.inc(createRetryManagerResult.getFailedRows().size()); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableId) + .inc(createRetryManagerResult.getFailedRows().size()); + + recordsAppended.inc(createRetryManagerResult.getRecordsAppended()); + createRetryManagerResult.getHistogramValues().forEach(appendSizeDistribution::update); + + Instant now = Instant.now(); - java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); - appendLatencyDistribution.update(timeElapsed.toMillis()); + RetryManager> retryManager = + Preconditions.checkStateNotNull(createRetryManagerResult.getRetryManager()); + int numAppends = retryManager.getRemainingOperationCount(); + Iterable> contexts = retryManager.getRemainingContexts(); + + if (numAppends > 0) { + initializeContexts.accept(contexts, false); + try { + retryManager.run(true); + } finally { + // Make sure that all pins are removed. + for (AppendRowsContext context : contexts) { + if (context.client != null) { + runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin); + } + } + } + appendSplitDistribution.update(numAppends); + + if (autoUpdateSchema) { + @Nullable + StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient(); + TableSchema originalSchema = appendClientInfo.get().getTableSchema(); + ; + @Nullable + TableSchema updatedSchemaReturned = + (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null; + // Update the table schema and clear the append client. + if (updatedSchemaReturned != null) { + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned); + if (newSchema.isPresent()) { + appendClientInfo.set( + AppendClientInfo.of( + newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false)); + APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); + APPEND_CLIENTS.put( + messageConverters.getAppendClientKey(element.getKey()), appendClientInfo.get()); + LOG.debug( + "Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned); + updatedSchema.write(newSchema.get()); + } + } } + + java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); + appendLatencyDistribution.update(timeElapsed.toMillis()); } idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 814f4eec421f..1849df422425 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -764,7 +764,7 @@ public StreamAppendClient getStreamAppendClient( AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) throws Exception { return new StreamAppendClient() { - private Descriptor protoDescriptor = null; + private Descriptor protoDescriptor; private TableSchema currentSchema; private @Nullable com.google.cloud.bigquery.storage.v1.TableSchema updatedSchema; TableRowToStorageApiProto.SchemaInformation schemaInformation; @@ -900,7 +900,7 @@ public void close() throws Exception {} public void pin() {} @Override - public void unpin() {} + public void unpin() throws Exception {} }; }