Skip to content

Commit 9f333f2

Browse files
david-leifker authored and chakru-r committed
feat(openapi-v3): add minimal timeseries aspect support (datahub-project#12096)
1 parent 121402d commit 9f333f2

File tree

19 files changed

+596
-64
lines changed

19 files changed

+596
-64
lines changed

li-utils/src/main/java/com/linkedin/metadata/Constants.java

+2
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ public class Constants {
108108
// Common
109109
public static final String OWNERSHIP_ASPECT_NAME = "ownership";
110110

111+
public static final String TIMESTAMP_MILLIS = "timestampMillis";
112+
111113
public static final String INSTITUTIONAL_MEMORY_ASPECT_NAME = "institutionalMemory";
112114
public static final String DATA_PLATFORM_INSTANCE_ASPECT_NAME = "dataPlatformInstance";
113115
public static final String BROWSE_PATHS_ASPECT_NAME = "browsePaths";

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -1265,6 +1265,7 @@ private Stream<IngestResult> ingestTimeseriesProposal(
12651265
return timeseriesResults.stream()
12661266
.map(
12671267
result -> {
1268+
MCPItem item = result.getFirst();
12681269
Optional<Pair<Future<?>, Boolean>> emissionStatus = result.getSecond();
12691270

12701271
emissionStatus.ifPresent(
@@ -1276,10 +1277,16 @@ private Stream<IngestResult> ingestTimeseriesProposal(
12761277
}
12771278
});
12781279

1279-
MCPItem request = result.getFirst();
12801280
return IngestResult.builder()
1281-
.urn(request.getUrn())
1282-
.request(request)
1281+
.urn(item.getUrn())
1282+
.request(item)
1283+
.result(
1284+
UpdateAspectResult.builder()
1285+
.urn(item.getUrn())
1286+
.newValue(item.getRecordTemplate())
1287+
.auditStamp(item.getAuditStamp())
1288+
.newSystemMetadata(item.getSystemMetadata())
1289+
.build())
12831290
.publishedMCL(
12841291
emissionStatus.map(status -> status.getFirst() != null).orElse(false))
12851292
.processedMCL(emissionStatus.map(Pair::getSecond).orElse(false))

metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java

+83-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.linkedin.common.urn.Urn;
1414
import com.linkedin.data.ByteString;
1515
import com.linkedin.metadata.aspect.EnvelopedAspect;
16+
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
1617
import com.linkedin.metadata.models.AspectSpec;
1718
import com.linkedin.metadata.models.EntitySpec;
1819
import com.linkedin.metadata.models.annotation.SearchableAnnotation;
@@ -53,8 +54,15 @@
5354
import java.util.Collection;
5455
import java.util.List;
5556
import java.util.Map;
57+
import java.util.Objects;
5658
import java.util.Optional;
5759
import java.util.Set;
60+
import java.util.concurrent.ArrayBlockingQueue;
61+
import java.util.concurrent.ExecutionException;
62+
import java.util.concurrent.ExecutorService;
63+
import java.util.concurrent.Future;
64+
import java.util.concurrent.ThreadPoolExecutor;
65+
import java.util.concurrent.TimeUnit;
5866
import java.util.stream.Collectors;
5967
import javax.annotation.Nonnull;
6068
import javax.annotation.Nullable;
@@ -103,18 +111,29 @@ public class ElasticSearchTimeseriesAspectService
103111
private final RestHighLevelClient searchClient;
104112
private final ESAggregatedStatsDAO esAggregatedStatsDAO;
105113
private final QueryFilterRewriteChain queryFilterRewriteChain;
114+
private final ExecutorService queryPool;
106115

107116
public ElasticSearchTimeseriesAspectService(
108117
@Nonnull RestHighLevelClient searchClient,
109118
@Nonnull TimeseriesAspectIndexBuilders indexBuilders,
110119
@Nonnull ESBulkProcessor bulkProcessor,
111120
int numRetries,
112-
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain) {
121+
@Nonnull QueryFilterRewriteChain queryFilterRewriteChain,
122+
@Nonnull TimeseriesAspectServiceConfig timeseriesAspectServiceConfig) {
113123
this.indexBuilders = indexBuilders;
114124
this.searchClient = searchClient;
115125
this.bulkProcessor = bulkProcessor;
116126
this.numRetries = numRetries;
117127
this.queryFilterRewriteChain = queryFilterRewriteChain;
128+
this.queryPool =
129+
new ThreadPoolExecutor(
130+
timeseriesAspectServiceConfig.getQuery().getConcurrency(), // core threads
131+
timeseriesAspectServiceConfig.getQuery().getConcurrency(), // max threads
132+
timeseriesAspectServiceConfig.getQuery().getKeepAlive(),
133+
TimeUnit.SECONDS, // thread keep-alive time
134+
new ArrayBlockingQueue<>(
135+
timeseriesAspectServiceConfig.getQuery().getQueueSize()), // fixed size queue
136+
new ThreadPoolExecutor.CallerRunsPolicy());
118137

119138
esAggregatedStatsDAO = new ESAggregatedStatsDAO(searchClient, queryFilterRewriteChain);
120139
}
@@ -400,6 +419,69 @@ public List<EnvelopedAspect> getAspectValues(
400419
.collect(Collectors.toList());
401420
}
402421

422+
@Nonnull
423+
@Override
424+
public Map<Urn, Map<String, EnvelopedAspect>> getLatestTimeseriesAspectValues(
425+
@Nonnull OperationContext opContext,
426+
@Nonnull Set<Urn> urns,
427+
@Nonnull Set<String> aspectNames,
428+
@Nullable Map<String, Long> endTimeMillis) {
429+
Map<Urn, List<Future<Pair<String, EnvelopedAspect>>>> futures =
430+
urns.stream()
431+
.map(
432+
urn -> {
433+
List<Future<Pair<String, EnvelopedAspect>>> aspectFutures =
434+
aspectNames.stream()
435+
.map(
436+
aspectName ->
437+
queryPool.submit(
438+
() -> {
439+
List<EnvelopedAspect> oneResultList =
440+
getAspectValues(
441+
opContext,
442+
urn,
443+
urn.getEntityType(),
444+
aspectName,
445+
null,
446+
endTimeMillis == null
447+
? null
448+
: endTimeMillis.get(aspectName),
449+
1,
450+
null,
451+
null);
452+
return !oneResultList.isEmpty()
453+
? Pair.of(aspectName, oneResultList.get(0))
454+
: null;
455+
}))
456+
.collect(Collectors.toList());
457+
458+
return Map.entry(urn, aspectFutures);
459+
})
460+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
461+
462+
return futures.entrySet().stream()
463+
.map(
464+
e ->
465+
Map.entry(
466+
e.getKey(),
467+
e.getValue().stream()
468+
.map(
469+
f -> {
470+
try {
471+
return f.get();
472+
} catch (InterruptedException | ExecutionException ex) {
473+
throw new RuntimeException(ex);
474+
}
475+
})
476+
.filter(Objects::nonNull)
477+
.collect(Collectors.toList())))
478+
.collect(
479+
Collectors.toMap(
480+
Map.Entry::getKey,
481+
e ->
482+
e.getValue().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue))));
483+
}
484+
403485
@Override
404486
@Nonnull
405487
public GenericTable getAggregatedStats(

metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceTestBase.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.linkedin.data.template.StringMap;
2727
import com.linkedin.data.template.StringMapArray;
2828
import com.linkedin.metadata.aspect.EnvelopedAspect;
29+
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
2930
import com.linkedin.metadata.models.AspectSpec;
3031
import com.linkedin.metadata.models.DataSchemaFactory;
3132
import com.linkedin.metadata.models.EntitySpec;
@@ -151,7 +152,8 @@ private ElasticSearchTimeseriesAspectService buildService() {
151152
opContext.getSearchContext().getIndexConvention()),
152153
getBulkProcessor(),
153154
1,
154-
QueryFilterRewriteChain.EMPTY);
155+
QueryFilterRewriteChain.EMPTY,
156+
TimeseriesAspectServiceConfig.builder().build());
155157
}
156158

157159
/*

metadata-io/src/test/java/com/linkedin/metadata/timeseries/search/TimeseriesAspectServiceUnitTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
66
import com.fasterxml.jackson.databind.node.NumericNode;
77
import com.fasterxml.jackson.databind.node.ObjectNode;
8+
import com.linkedin.metadata.config.TimeseriesAspectServiceConfig;
89
import com.linkedin.metadata.search.elasticsearch.query.filter.QueryFilterRewriteChain;
910
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
1011
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
@@ -44,7 +45,8 @@ public class TimeseriesAspectServiceUnitTest {
4445
_timeseriesAspectIndexBuilders,
4546
_bulkProcessor,
4647
0,
47-
QueryFilterRewriteChain.EMPTY);
48+
QueryFilterRewriteChain.EMPTY,
49+
TimeseriesAspectServiceConfig.builder().build());
4850
private final OperationContext opContext =
4951
TestOperationContexts.systemContextNoSearchAuthorization(_indexConvention);
5052

metadata-service/configuration/src/main/java/com/linkedin/metadata/config/DataHubAppConfiguration.java

+3
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,7 @@ public class DataHubAppConfiguration {
5858

5959
/** MCP throttling configuration */
6060
private MetadataChangeProposalConfig metadataChangeProposal;
61+
62+
/** Timeseries Aspect Service configuration */
63+
private TimeseriesAspectServiceConfig timeseriesAspectService;
6164
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.linkedin.metadata.config;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
@Data
9+
@Builder(toBuilder = true)
10+
@AllArgsConstructor
11+
@NoArgsConstructor
12+
public class ExecutorServiceConfig {
13+
@Builder.Default private int concurrency = 2;
14+
@Builder.Default private int queueSize = 100;
15+
@Builder.Default private int keepAlive = 60;
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.linkedin.metadata.config;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
@Data
9+
@Builder(toBuilder = true)
10+
@AllArgsConstructor
11+
@NoArgsConstructor
12+
public class TimeseriesAspectServiceConfig {
13+
@Builder.Default private ExecutorServiceConfig query = ExecutorServiceConfig.builder().build();
14+
}

metadata-service/configuration/src/main/resources/application.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ searchService:
118118
pageSize: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_PAGE_SIZE:100}
119119
limit: ${SEARCH_SERVICE_FILTER_DOMAIN_EXPANSION_LIMIT:100}
120120

121+
# Thread pool settings for the timeseries aspect service's query executor.
timeseriesAspectService:
  query:
    concurrency: ${TIMESERIES_ASPECT_SERVICE_QUERY_CONCURRENCY:10} # parallel threads
    queueSize: ${TIMESERIES_ASPECT_SERVICE_QUERY_QUEUE_SIZE:500} # task queue capacity
    # Key must match ExecutorServiceConfig.keepAlive to be bound; the value is in seconds.
    keepAlive: ${TIMESERIES_ASPECT_SERVICE_QUERY_THREAD_KEEP_ALIVE:60}
126+
121127
configEntityRegistry:
122128
path: ${ENTITY_REGISTRY_CONFIG_PATH:../../metadata-models/src/main/resources/entity-registry.yml}
123129
# Priority is given to the `path` setting above (outside jar)

metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.gms.factory.timeseries;
22

3+
import com.linkedin.gms.factory.config.ConfigurationProvider;
34
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
45
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
56
import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -27,13 +28,15 @@ public class ElasticSearchTimeseriesAspectServiceFactory {
2728
@Bean(name = "elasticSearchTimeseriesAspectService")
2829
@Nonnull
2930
protected ElasticSearchTimeseriesAspectService getInstance(
30-
final QueryFilterRewriteChain queryFilterRewriteChain) {
31+
final QueryFilterRewriteChain queryFilterRewriteChain,
32+
final ConfigurationProvider configurationProvider) {
3133
return new ElasticSearchTimeseriesAspectService(
3234
components.getSearchClient(),
3335
new TimeseriesAspectIndexBuilders(
3436
components.getIndexBuilder(), entityRegistry, components.getIndexConvention()),
3537
components.getBulkProcessor(),
3638
components.getNumRetries(),
37-
queryFilterRewriteChain);
39+
queryFilterRewriteChain,
40+
configurationProvider.getTimeseriesAspectService());
3841
}
3942
}

0 commit comments

Comments
 (0)