
Commit 18495bb

feat(telemetry): cross-component async write tracing
* created TraceContext for opentelemetry spans
* added tracing header/cookies to control logging trace info
* support legacy dropwizard tracing using opentelemetry
* added smoke-tests for tracing conditions
1 parent 262dd76 commit 18495bb

146 files changed, +6929 -1805 lines
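The recurring change across the resolver and upgrade diffs below is the replacement of Dropwizard Timer.Context blocks with OperationContext.withSpan(...). A minimal sketch of that pattern, assuming withSpan takes a span name, a Supplier, and varargs attribute key/value pairs as the usages below suggest (the helper class and method names here are illustrative, not part of the commit):

import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.util.function.Supplier;

public class TracedWriteExample {

  // Wraps a unit of work in an OpenTelemetry span; the DROPWIZARD_METRIC
  // attribute asks the span handling to also emit the legacy Dropwizard timer.
  public static <T> T timed(OperationContext opContext, String spanName, Supplier<T> work) {
    return opContext.withSpan(
        spanName,                       // e.g. "batchMutate" in MutableTypeBatchResolver below
        work,                           // the traced unit of work
        MetricUtils.DROPWIZARD_METRIC,  // attribute key used throughout this commit
        "true");
  }
}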


build.gradle (+5 -2)

@@ -38,7 +38,7 @@ buildscript {
   ext.springVersion = '6.1.14'
   ext.springBootVersion = '3.2.9'
   ext.springKafkaVersion = '3.1.6'
-  ext.openTelemetryVersion = '1.18.0'
+  ext.openTelemetryVersion = '1.45.0'
   ext.neo4jVersion = '5.14.0'
   ext.neo4jTestVersion = '5.14.0'
   ext.neo4jApocVersion = '5.14.0'
@@ -218,7 +218,10 @@ project.ext.externalDependency = [
   'neo4jApocCore': 'org.neo4j.procedure:apoc-core:' + neo4jApocVersion,
   'neo4jApocCommon': 'org.neo4j.procedure:apoc-common:' + neo4jApocVersion,
   'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:' + openTelemetryVersion,
-  'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:' + openTelemetryVersion,
+  'opentelemetrySdk': 'io.opentelemetry:opentelemetry-sdk:' + openTelemetryVersion,
+  'opentelemetrySdkTrace': 'io.opentelemetry:opentelemetry-sdk-trace:' + openTelemetryVersion,
+  'opentelemetryAutoConfig': 'io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:' + openTelemetryVersion,
+  'opentelemetryAnnotations': 'io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.11.0',
   'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15',
   'parquet': 'org.apache.parquet:parquet-avro:1.12.3',
   'parquetHadoop': 'org.apache.parquet:parquet-hadoop:1.13.1',
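Besides the API artifact, the dependency map now exposes the OpenTelemetry SDK and its autoconfigure extension, which suggests spans are created against an SDK-backed instance rather than a no-op API. A hedged sketch of obtaining a Tracer through autoconfiguration (class name and instrumentation scope are illustrative; the commit's actual TraceContext wiring is not shown in this hunk):

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;

class TelemetryBootstrapExample {

  static Span startExampleSpan() {
    // Builds an SDK instance from environment variables / system properties (otel.* settings).
    OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
    Tracer tracer = otel.getTracer("datahub-example");
    return tracer.spanBuilder("exampleAsyncWrite").startSpan();
  }
}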

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java (+1 -1)

@@ -12,7 +12,7 @@
 import com.linkedin.metadata.service.ViewService;
 import graphql.schema.DataFetcher;
 import graphql.schema.DataFetchingEnvironment;
-import io.opentelemetry.extension.annotations.WithSpan;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
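The import swap above (repeated in the resolvers below) moves @WithSpan from the deprecated io.opentelemetry:opentelemetry-extension-annotations artifact to io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations. A small sketch of the annotation in use (resolver and method are illustrative; the annotation only yields spans when annotation instrumentation is configured to process it):

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.CompletableFuture;

class ExampleCountsResolver {

  @WithSpan // a span is created around this method when the agent/instrumentation handles it
  CompletableFuture<Integer> getCounts() {
    return CompletableFuture.completedFuture(42);
  }
}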

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutableTypeBatchResolver.java (+23 -18)

@@ -2,13 +2,14 @@
 
 import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
 
-import com.codahale.metrics.Timer;
+import com.linkedin.datahub.graphql.QueryContext;
 import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
 import com.linkedin.datahub.graphql.exception.AuthorizationException;
 import com.linkedin.datahub.graphql.types.BatchMutableType;
 import com.linkedin.metadata.utils.metrics.MetricUtils;
 import graphql.schema.DataFetcher;
 import graphql.schema.DataFetchingEnvironment;
+import io.datahubproject.metadata.context.OperationContext;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.slf4j.Logger;
@@ -33,25 +34,29 @@ public MutableTypeBatchResolver(final BatchMutableType<I, B, T> batchMutableType
 
   @Override
   public CompletableFuture<List<T>> get(DataFetchingEnvironment environment) throws Exception {
+    final QueryContext context = environment.getContext();
+    final OperationContext opContext = context.getOperationContext();
+
     final B[] input =
         bindArgument(environment.getArgument("input"), _batchMutableType.batchInputClass());
 
-    return GraphQLConcurrencyUtils.supplyAsync(
-        () -> {
-          Timer.Context timer = MetricUtils.timer(this.getClass(), "batchMutate").time();
-
-          try {
-            return _batchMutableType.batchUpdate(input, environment.getContext());
-          } catch (AuthorizationException e) {
-            throw e;
-          } catch (Exception e) {
-            _logger.error("Failed to perform batchUpdate", e);
-            throw new IllegalArgumentException(e);
-          } finally {
-            timer.stop();
-          }
-        },
-        this.getClass().getSimpleName(),
-        "get");
+    return opContext.withSpan(
+        "batchMutate",
+        () ->
+            GraphQLConcurrencyUtils.supplyAsync(
+                () -> {
+                  try {
+                    return _batchMutableType.batchUpdate(input, environment.getContext());
+                  } catch (AuthorizationException e) {
+                    throw e;
+                  } catch (Exception e) {
+                    _logger.error("Failed to perform batchUpdate", e);
+                    throw new IllegalArgumentException(e);
+                  }
+                },
+                this.getClass().getSimpleName(),
+                "get"),
+        MetricUtils.DROPWIZARD_METRIC,
+        "true");
   }
 }

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/recommendation/ListRecommendationsResolver.java (+1 -1)

@@ -27,7 +27,7 @@
 import graphql.schema.DataFetcher;
 import graphql.schema.DataFetchingEnvironment;
 import io.datahubproject.metadata.context.OperationContext;
-import io.opentelemetry.extension.annotations.WithSpan;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.List;

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchResolver.java (+1 -1)

@@ -19,7 +19,7 @@
 import com.linkedin.metadata.query.SearchFlags;
 import graphql.schema.DataFetcher;
 import graphql.schema.DataFetchingEnvironment;
-import io.opentelemetry.extension.annotations.WithSpan;
+import io.opentelemetry.instrumentation.annotations.WithSpan;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import lombok.RequiredArgsConstructor;

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java (+5 -1)

@@ -5,7 +5,9 @@
 import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
 import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
 import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
+import com.linkedin.gms.factory.kafka.trace.KafkaTraceReaderFactory;
 import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
+import com.linkedin.gms.factory.trace.TraceServiceFactory;
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
@@ -30,7 +32,9 @@
       DataHubAuthorizerFactory.class,
       SimpleKafkaConsumerFactory.class,
       KafkaEventConsumerFactory.class,
-      GraphQLEngineFactory.class
+      GraphQLEngineFactory.class,
+      KafkaTraceReaderFactory.class,
+      TraceServiceFactory.class
     })
 })
 public class UpgradeCliApplication {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java (+1)

@@ -195,6 +195,7 @@ protected OperationContext javaSystemOperationContext(
                 .alternateValidation(
                     configurationProvider.getFeatureFlags().isAlternateMCPValidation())
                 .build(),
+            null,
             true);
 
     entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/impl/DefaultUpgradeManager.java (+54 -39)

@@ -1,7 +1,6 @@
 package com.linkedin.datahub.upgrade.impl;
 
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
 import com.linkedin.datahub.upgrade.Upgrade;
 import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
 import com.linkedin.datahub.upgrade.UpgradeContext;
@@ -119,44 +118,60 @@ private UpgradeResult executeInternal(UpgradeContext context) {
   }
 
   private UpgradeStepResult executeStepInternal(UpgradeContext context, UpgradeStep step) {
-    int retryCount = step.retryCount();
-    UpgradeStepResult result = null;
-    int maxAttempts = retryCount + 1;
-    for (int i = 0; i < maxAttempts; i++) {
-      try (Timer.Context completionTimer =
-          MetricUtils.timer(MetricRegistry.name(step.id(), "completionTime")).time()) {
-        try (Timer.Context executionTimer =
-            MetricUtils.timer(MetricRegistry.name(step.id(), "executionTime")).time()) {
-          result = step.executable().apply(context);
-        }
-
-        if (result == null) {
-          // Failed to even retrieve a result. Create a default failure result.
-          result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
-          context
-              .report()
-              .addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
-          MetricUtils.counter(MetricRegistry.name(step.id(), "retry")).inc();
-        }
-
-        if (DataHubUpgradeState.SUCCEEDED.equals(result.result())) {
-          MetricUtils.counter(MetricRegistry.name(step.id(), "succeeded")).inc();
-          break;
-        }
-      } catch (Exception e) {
-        log.error("Caught exception during attempt {} of Step with id {}", i, step.id(), e);
-        context
-            .report()
-            .addLine(
-                String.format(
-                    "Caught exception during attempt %s of Step with id %s: %s", i, step.id(), e));
-        MetricUtils.counter(MetricRegistry.name(step.id(), "failed")).inc();
-        result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
-        context.report().addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
-      }
-    }
-
-    return result;
+    return context
+        .opContext()
+        .withSpan(
+            "completionTime",
+            () -> {
+              int retryCount = step.retryCount();
+              UpgradeStepResult result = null;
+              int maxAttempts = retryCount + 1;
+              for (int i = 0; i < maxAttempts; i++) {
+                try {
+                  result =
+                      context
+                          .opContext()
+                          .withSpan(
+                              "executionTime",
+                              () -> step.executable().apply(context),
+                              "step.id",
+                              step.id(),
+                              MetricUtils.DROPWIZARD_NAME,
+                              MetricUtils.name(step.id(), "executionTime"));
+
+                  if (result == null) {
+                    // Failed to even retrieve a result. Create a default failure result.
+                    result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
+                    context
+                        .report()
+                        .addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
+                    MetricUtils.counter(MetricRegistry.name(step.id(), "retry")).inc();
+                  }
+
+                  if (DataHubUpgradeState.SUCCEEDED.equals(result.result())) {
+                    MetricUtils.counter(MetricRegistry.name(step.id(), "succeeded")).inc();
+                    break;
+                  }
+                } catch (Exception e) {
+                  log.error(
+                      "Caught exception during attempt {} of Step with id {}", i, step.id(), e);
+                  context
+                      .report()
+                      .addLine(
+                          String.format(
+                              "Caught exception during attempt %s of Step with id %s: %s",
+                              i, step.id(), e));
+                  MetricUtils.counter(MetricRegistry.name(step.id(), "failed")).inc();
+                  result = new DefaultUpgradeStepResult(step.id(), DataHubUpgradeState.FAILED);
+                  context
+                      .report()
+                      .addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1)));
+                }
+              }
+              return result;
+            },
+            MetricUtils.DROPWIZARD_METRIC,
+            "true");
   }
 
   private void executeCleanupInternal(UpgradeContext context, UpgradeResult result) {

entity-registry/src/main/java/com/linkedin/metadata/aspect/ReadItem.java (+7)

@@ -64,6 +64,13 @@ static <T> T getAspect(Class<T> clazz, @Nullable RecordTemplate recordTemplate)
   @Nullable
   SystemMetadata getSystemMetadata();
 
+  /**
+   * Set system metadata on the item
+   *
+   * @param systemMetadata
+   */
+  void setSystemMetadata(@Nonnull SystemMetadata systemMetadata);
+
   /**
    * The entity's schema
    *

entity-registry/src/main/java/com/linkedin/metadata/aspect/SystemAspect.java (+6)

@@ -6,6 +6,7 @@
 import java.sql.Timestamp;
 import java.util.Optional;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.NotImplementedException;
 
 /**
  * An aspect along with system metadata and creation timestamp. Represents an aspect as stored in
@@ -36,4 +37,9 @@ default Optional<Long> getSystemMetadataVersion() {
         .map(SystemMetadata::getVersion)
         .map(Long::parseLong);
   }
+
+  @Override
+  default void setSystemMetadata(@Nonnull SystemMetadata systemMetadata) {
+    throw new NotImplementedException();
+  }
 }

entity-registry/src/main/java/com/linkedin/metadata/aspect/batch/MCLItem.java (+5)

@@ -37,6 +37,11 @@ default SystemMetadata getSystemMetadata() {
     return getMetadataChangeLog().getSystemMetadata();
   }
 
+  @Override
+  default void setSystemMetadata(@Nonnull SystemMetadata systemMetadata) {
+    getMetadataChangeLog().setSystemMetadata(systemMetadata);
+  }
+
   default SystemMetadata getPreviousSystemMetadata() {
     return getMetadataChangeLog().getPreviousSystemMetadata();
   }
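Taken together with the ReadItem and SystemAspect hunks above, the new setter writes through to the underlying MetadataChangeLog for change-log items, while stored SystemAspect instances reject it by default. A hedged usage sketch (the imports and version value are assumptions for illustration, not code from this commit):

import com.linkedin.metadata.aspect.batch.MCLItem;
import com.linkedin.mxe.SystemMetadata;

class StampSystemMetadataExample {

  // Illustrative: overwrite system metadata on a change-log item; this delegates to
  // getMetadataChangeLog().setSystemMetadata(...). Calling the same method on a plain
  // SystemAspect would hit the default NotImplementedException.
  static void stamp(MCLItem item) {
    SystemMetadata updated = new SystemMetadata().setVersion("2"); // Pegasus setter, assumed
    item.setSystemMetadata(updated);
  }
}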

entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/batch/TestMCP.java (+1 -1)

@@ -116,7 +116,7 @@ public static <T extends RecordTemplate> Set<ChangeMCP> ofOneMCP(
 
   private Urn urn;
   private RecordTemplate recordTemplate;
-  private SystemMetadata systemMetadata;
+  @Setter private SystemMetadata systemMetadata;
   private AuditStamp auditStamp;
   private ChangeType changeType;
   @Nonnull private final EntitySpec entitySpec;
