Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,13 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.function.ThrowingConsumer;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
Expand All @@ -46,16 +42,14 @@
*/
@AutoValue
abstract class AppendClientInfo {
private static final Logger LOG = LoggerFactory.getLogger(AppendClientInfo.class);

private final Counter activeStreamAppendClients =
Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");

abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

abstract TableSchema getTableSchema();

abstract ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> getCloseAppendClient();
abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();

abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();

Expand All @@ -71,8 +65,7 @@ abstract static class Builder {

abstract Builder setTableSchema(TableSchema value);

abstract Builder setCloseAppendClient(
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> value);
abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);

abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);

Expand All @@ -90,7 +83,7 @@ abstract Builder setCloseAppendClient(
static AppendClientInfo of(
TableSchema tableSchema,
DescriptorProtos.DescriptorProto descriptor,
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> closeAppendClient)
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
throws Exception {
return new AutoValue_AppendClientInfo.Builder()
.setTableSchema(tableSchema)
Expand All @@ -104,7 +97,7 @@ static AppendClientInfo of(

static AppendClientInfo of(
TableSchema tableSchema,
ThrowingConsumer<Exception, BigQueryServices.StreamAppendClient> closeAppendClient,
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient,
boolean includeCdcColumns)
throws Exception {
return of(
Expand Down Expand Up @@ -141,12 +134,7 @@ public AppendClientInfo withAppendClient(
public void close() {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
try {
getCloseAppendClient().accept(client);
} catch (Exception e) {
// We ignore errors when closing clients.
LOG.warn("Caught exception whilw trying to close append client. Ignoring", e);
}
getCloseAppendClient().accept(client);
activeStreamAppendClients.dec();
}
}
Expand Down Expand Up @@ -211,32 +199,4 @@ public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField
throw new RuntimeException(e);
}
}

public void pinAppendClient() {
BigQueryServices.StreamAppendClient client =
Preconditions.checkStateNotNull(getStreamAppendClient());
client.pin();
}

public void unpinAppendClient(@Nullable ExecutorService executor) {
BigQueryServices.StreamAppendClient client =
Preconditions.checkStateNotNull(getStreamAppendClient());
if (executor != null) {
runAsyncIgnoreFailure(executor, client::unpin);
} else {
client.unpin();
}
}

@SuppressWarnings({"FutureReturnValueIgnored"})
private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) {
executor.submit(
() -> {
try {
task.run();
} catch (Throwable e) {
LOG.info("Exception happened while executing async task. Ignoring: ", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ default long getInflightWaitSeconds() {
/**
* Unpin this object. If the object has been closed, this will release any underlying resources.
*/
void unpin();
void unpin() throws Exception;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,7 @@ public void pin() {
}

@Override
public void unpin() {
public void unpin() throws Exception {
boolean closeWriter;
synchronized (this) {
Preconditions.checkState(pins > 0, "Tried to unpin when pins==0");
Expand Down
Loading
Loading