Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.MetricsRecorder;
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
import com.google.cloud.datastore.telemetry.TelemetryConstants;
import com.google.cloud.datastore.telemetry.TelemetryUtils;
import com.google.cloud.datastore.telemetry.TraceUtil;
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
Expand All @@ -75,7 +73,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand All @@ -101,16 +98,13 @@ final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datas

private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
getOptions().getTraceUtil();
private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder();
private final boolean isHttpTransport;

private final DatastoreMetricsRecorder metricsRecorder = getOptions().getMetricsRecorder();
private final ReadOptionProtoPreparer readOptionProtoPreparer;
private final AggregationQueryExecutor aggregationQueryExecutor;

DatastoreImpl(DatastoreOptions options) {
super(options);
this.datastoreRpc = options.getDatastoreRpcV1();
this.isHttpTransport = options.getTransportOptions() instanceof HttpTransportOptions;
retrySettings =
MoreObjects.firstNonNull(options.getRetrySettings(), ServiceOptions.getNoRetrySettings());

Expand Down Expand Up @@ -185,15 +179,15 @@ public boolean isClosed() {
static class ReadWriteTransactionCallable<T> implements Callable<T> {
private final Datastore datastore;
private final TransactionCallable<T> callable;
private final MetricsRecorder metricsRecorder;
private final DatastoreMetricsRecorder metricsRecorder;
private volatile TransactionOptions options;
private volatile Transaction transaction;

ReadWriteTransactionCallable(
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
MetricsRecorder metricsRecorder) {
DatastoreMetricsRecorder metricsRecorder) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
Expand Down Expand Up @@ -227,7 +221,7 @@ public T call() throws DatastoreException {
}
throw DatastoreException.propagateUserException(ex);
} finally {
recordAttempt(attemptStatus, datastore.getOptions().getTransportOptions());
recordAttempt(attemptStatus);
// If the transaction is active, then commit the rollback. If it was already successfully
// rolled back, the transaction is inactive (prevents rolling back an already rolled back
// transaction).
Expand All @@ -245,18 +239,10 @@ public T call() throws DatastoreException {
* Records a single transaction commit attempt with the given status code. This is called once
* per invocation of {@link #call()}, capturing the outcome of each individual commit attempt.
*/
private void recordAttempt(String status, TransportOptions transportOptions) {
Map<String, String> attributes = new HashMap<>();
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastore.getOptions().getProjectId());
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastore.getOptions().getDatabaseId());
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
TelemetryConstants.getTransportName(transportOptions));
private void recordAttempt(String status) {
Map<String, String> attributes =
TelemetryUtils.buildMetricAttributes(
TelemetryConstants.METHOD_TRANSACTION_COMMIT, status);
metricsRecorder.recordTransactionAttemptCount(1, attributes);
}
}
Expand Down Expand Up @@ -293,15 +279,8 @@ public <T> T runInTransaction(
throw DatastoreException.translateAndThrow(e);
} finally {
long latencyMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Map<String, String> attributes = new HashMap<>();
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_RUN);
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId());
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId());
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
TelemetryConstants.getTransportName(getOptions().getTransportOptions()));
Map<String, String> attributes =
TelemetryUtils.buildMetricAttributes(TelemetryConstants.METHOD_TRANSACTION_RUN, status);
metricsRecorder.recordTransactionLatency(latencyMs, attributes);
span.end();
}
Expand Down Expand Up @@ -805,15 +784,12 @@ private <T> T runWithObservability(
ResultRetryAlgorithm<?> exceptionHandler) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);

// Gax already records operation and attempt metrics. Since Datastore HttpJson does not
// integrate with Gax, manually instrument these metrics when using HttpJson for parity
Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
Stopwatch operationStopwatch = Stopwatch.createStarted();
String operationStatus = StatusCode.Code.OK.toString();

DatastoreOptions options = getOptions();
Callable<T> attemptCallable =
TelemetryUtils.attemptMetricsCallable(
callable, metricsRecorder, options, isHttpTransport, methodName);
TelemetryUtils.attemptMetricsCallable(callable, metricsRecorder, methodName);
try (TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
attemptCallable, retrySettings, exceptionHandler, options.getClock());
Expand All @@ -823,12 +799,7 @@ private <T> T runWithObservability(
throw DatastoreException.translateAndThrow(e);
} finally {
TelemetryUtils.recordOperationMetrics(
metricsRecorder,
options,
isHttpTransport,
operationStopwatch,
methodName,
operationStatus);
metricsRecorder, operationStopwatch, methodName, operationStatus);
span.end();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.spi.v1.GrpcDatastoreRpc;
import com.google.cloud.datastore.spi.v1.HttpDatastoreRpc;
import com.google.cloud.datastore.telemetry.MetricsRecorder;
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
import com.google.cloud.datastore.v1.DatastoreSettings;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.http.HttpTransportOptions;
Expand Down Expand Up @@ -65,7 +65,7 @@ public class DatastoreOptions extends ServiceOptions<Datastore, DatastoreOptions

private final transient @Nonnull DatastoreOpenTelemetryOptions openTelemetryOptions;
private final transient @Nonnull com.google.cloud.datastore.telemetry.TraceUtil traceUtil;
private final transient @Nonnull MetricsRecorder metricsRecorder;
private final transient @Nonnull DatastoreMetricsRecorder metricsRecorder;

public static class DefaultDatastoreFactory implements DatastoreFactory {

Expand Down Expand Up @@ -107,7 +107,7 @@ public DatastoreOpenTelemetryOptions getOpenTelemetryOptions() {
}

@Nonnull
MetricsRecorder getMetricsRecorder() {
DatastoreMetricsRecorder getMetricsRecorder() {
return metricsRecorder;
}

Expand Down Expand Up @@ -193,7 +193,7 @@ public Builder setDatabaseId(String databaseId) {
}

/**
* Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Firestore instance.
* Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Datastore instance.
*
* @param openTelemetryOptions The `DatastoreOpenTelemetryOptions` to use.
*/
Expand Down Expand Up @@ -223,7 +223,7 @@ private DatastoreOptions(Builder builder) {
? builder.openTelemetryOptions
: DatastoreOpenTelemetryOptions.newBuilder().build();
this.traceUtil = com.google.cloud.datastore.telemetry.TraceUtil.getInstance(this);
this.metricsRecorder = MetricsRecorder.getInstance(openTelemetryOptions);
this.metricsRecorder = DatastoreMetricsRecorder.getInstance(this);

namespace = MoreObjects.firstNonNull(builder.namespace, defaultNamespace());
databaseId = MoreObjects.firstNonNull(builder.databaseId, DEFAULT_DATABASE_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.MetricsRecorder;
import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder;
import com.google.cloud.datastore.telemetry.DatastoreMetricsRecorder;
import com.google.cloud.datastore.telemetry.NoOpDatastoreMetricsRecorder;
import com.google.cloud.datastore.telemetry.TelemetryConstants;
import com.google.cloud.datastore.telemetry.TelemetryUtils;
import com.google.cloud.datastore.telemetry.TraceUtil;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.datastore.v1.AllocateIdsRequest;
Expand Down Expand Up @@ -62,8 +61,7 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
private final RetrySettings retrySettings;
private final DatastoreOptions datastoreOptions;
private final MetricsRecorder metricsRecorder;
private final boolean isHttpTransport;
private final DatastoreMetricsRecorder metricsRecorder;

@ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder")
public RetryAndTraceDatastoreRpcDecorator(
Expand All @@ -75,8 +73,7 @@ public RetryAndTraceDatastoreRpcDecorator(
this.retrySettings = retrySettings;
this.datastoreOptions = datastoreOptions;
this.otelTraceUtil = otelTraceUtil;
this.metricsRecorder = new NoOpMetricsRecorder();
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
this.metricsRecorder = new NoOpDatastoreMetricsRecorder();
}

private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
Expand All @@ -85,7 +82,6 @@ private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
this.retrySettings = builder.retrySettings;
this.datastoreOptions = builder.datastoreOptions;
this.metricsRecorder = builder.metricsRecorder;
this.isHttpTransport = builder.isHttpTransport;
}

public static Builder newBuilder() {
Expand All @@ -99,8 +95,7 @@ public static class Builder {
private DatastoreOptions datastoreOptions;

// Defaults configured for this class
private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder();
private boolean isHttpTransport = false;
private DatastoreMetricsRecorder metricsRecorder = new NoOpDatastoreMetricsRecorder();

private Builder() {}

Expand All @@ -124,7 +119,7 @@ public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) {
return this;
}

public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) {
public Builder setMetricsRecorder(DatastoreMetricsRecorder metricsRecorder) {
Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null");
this.metricsRecorder = metricsRecorder;
return this;
Expand All @@ -135,7 +130,6 @@ public RetryAndTraceDatastoreRpcDecorator build() {
Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required");
Preconditions.checkNotNull(retrySettings, "retrySettings is required");
Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required");
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
return new RetryAndTraceDatastoreRpcDecorator(this);
}
}
Expand Down Expand Up @@ -207,12 +201,11 @@ public <O> O invokeRpc(Callable<O> block, String startSpan) {

<O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
Stopwatch stopwatch = Stopwatch.createStarted();
String operationStatus = StatusCode.Code.UNKNOWN.toString();
try (TraceUtil.Scope ignored = span.makeCurrent()) {
Callable<O> callable =
TelemetryUtils.attemptMetricsCallable(
block, metricsRecorder, datastoreOptions, isHttpTransport, methodName);
TelemetryUtils.attemptMetricsCallable(block, metricsRecorder, methodName);
O result =
RetryHelper.runWithRetries(
callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
Expand All @@ -224,12 +217,7 @@ <O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
throw DatastoreException.translateAndThrow(e);
} finally {
TelemetryUtils.recordOperationMetrics(
metricsRecorder,
datastoreOptions,
isHttpTransport,
stopwatch,
methodName,
operationStatus);
metricsRecorder, stopwatch, methodName, operationStatus);
span.end();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.tracing.MetricsTracerFactory;
import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.telemetry.TelemetryConstants;
import com.google.cloud.datastore.v1.DatastoreSettings;
import com.google.cloud.datastore.v1.stub.DatastoreStubSettings;
import com.google.cloud.datastore.v1.stub.GrpcDatastoreStub;
Expand All @@ -61,12 +58,8 @@
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@InternalApi
public class GrpcDatastoreRpc implements DatastoreRpc {
Expand Down Expand Up @@ -95,44 +88,12 @@ public GrpcDatastoreRpc(DatastoreOptions datastoreOptions) throws IOException {
.build())
.build());

// Hook into Gax's Metrics collection framework
MetricsTracerFactory metricsTracerFactory = buildMetricsTracerFactory(datastoreOptions);
if (metricsTracerFactory != null) {
builder.setTracerFactory(metricsTracerFactory);
}

datastoreStub = GrpcDatastoreStub.create(builder.build());
} catch (IOException e) {
throw new IOException(e);
}
}

/**
* Build the MetricsTracerFactory to hook into Gax's Otel Framework. Only hooks into Gax on two
* conditions: 1. OpenTelemetry instance is passed in by the user 2. Metrics are enabled
*
* <p>Sets default attributes to be recorded as part of the metrics.
*/
static MetricsTracerFactory buildMetricsTracerFactory(DatastoreOptions datastoreOptions) {
if (!datastoreOptions.getOpenTelemetryOptions().isMetricsEnabled()) {
return null;
}
OpenTelemetry openTelemetry = datastoreOptions.getOpenTelemetryOptions().getOpenTelemetry();
if (openTelemetry == null) {
openTelemetry = GlobalOpenTelemetry.get();
}
OpenTelemetryMetricsRecorder gaxMetricsRecorder =
new OpenTelemetryMetricsRecorder(openTelemetry, TelemetryConstants.SERVICE_NAME);
Map<String, String> attributes = new HashMap<>();
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId());
if (!Strings.isNullOrEmpty(datastoreOptions.getDatabaseId())) {
attributes.put(
TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId());
}
attributes.put(TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, "grpc");
return new MetricsTracerFactory(gaxMetricsRecorder, attributes);
}

@Override
public void close() throws Exception {
if (!closed) {
Expand Down
Loading
Loading