Skip to content

Commit 6422b27

Browse files
authored
Merge branch 'master' into ing-784-lookml-parameter
2 parents 0cd9551 + 48f3cc5 commit 6422b27

File tree

83 files changed

+1129
-449
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

83 files changed

+1129
-449
lines changed

datahub-frontend/app/auth/AuthModule.java

+2
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import io.datahubproject.metadata.context.EntityRegistryContext;
2828
import io.datahubproject.metadata.context.OperationContext;
2929
import io.datahubproject.metadata.context.OperationContextConfig;
30+
import io.datahubproject.metadata.context.RetrieverContext;
3031
import io.datahubproject.metadata.context.SearchContext;
3132
import io.datahubproject.metadata.context.ValidationContext;
3233
import java.nio.charset.StandardCharsets;
@@ -195,6 +196,7 @@ protected OperationContext provideOperationContext(
195196
.searchContext(SearchContext.EMPTY)
196197
.entityRegistryContext(EntityRegistryContext.builder().build(EmptyEntityRegistry.EMPTY))
197198
.validationContext(ValidationContext.builder().alternateValidation(false).build())
199+
.retrieverContext(RetrieverContext.EMPTY)
198200
.build(systemAuthentication);
199201
}
200202

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java

+2
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
1414
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
1515
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
16+
import com.linkedin.metadata.aspect.CachingAspectRetriever;
1617
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
1718
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
1819
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
@@ -186,6 +187,7 @@ protected OperationContext javaSystemOperationContext(
186187
components.getIndexConvention(),
187188
RetrieverContext.builder()
188189
.aspectRetriever(entityServiceAspectRetriever)
190+
.cachingAspectRetriever(CachingAspectRetriever.EMPTY)
189191
.graphRetriever(systemGraphRetriever)
190192
.searchRetriever(searchServiceSearchRetriever)
191193
.build(),

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreStorageStep.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -180,7 +180,7 @@ private void readerExecutable(ReaderWrapper reader, UpgradeContext context) {
180180
try {
181181
aspectRecord =
182182
EntityUtils.toSystemAspect(
183-
context.opContext().getRetrieverContext().get(), aspect.toEntityAspect())
183+
context.opContext().getRetrieverContext(), aspect.toEntityAspect())
184184
.get()
185185
.getRecordTemplate();
186186
} catch (Exception e) {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java

+1-2
Original file line number | Diff line number | Diff line change
@@ -113,8 +113,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
113113
List<Pair<Future<?>, SystemAspect>> futures;
114114
futures =
115115
EntityUtils.toSystemAspectFromEbeanAspects(
116-
opContext.getRetrieverContext().get(),
117-
batch.collect(Collectors.toList()))
116+
opContext.getRetrieverContext(), batch.collect(Collectors.toList()))
118117
.stream()
119118
.map(
120119
systemAspect -> {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -100,8 +100,8 @@ static AspectsBatch generateAspectBatch(
100100
.collect(Collectors.toList());
101101

102102
return AspectsBatchImpl.builder()
103-
.mcps(mcps, auditStamp, opContext.getRetrieverContext().get())
104-
.retrieverContext(opContext.getRetrieverContext().get())
103+
.mcps(mcps, auditStamp, opContext.getRetrieverContext())
104+
.retrieverContext(opContext.getRetrieverContext())
105105
.build();
106106
}
107107

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java

+3-7
Original file line number | Diff line number | Diff line change
@@ -168,13 +168,13 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
168168

169169
AspectsBatch aspectsBatch =
170170
AspectsBatchImpl.builder()
171-
.retrieverContext(opContext.getRetrieverContext().get())
171+
.retrieverContext(opContext.getRetrieverContext())
172172
.items(
173173
batch
174174
.flatMap(
175175
ebeanAspectV2 ->
176176
EntityUtils.toSystemAspectFromEbeanAspects(
177-
opContext.getRetrieverContext().get(),
177+
opContext.getRetrieverContext(),
178178
Set.of(ebeanAspectV2))
179179
.stream())
180180
.map(
@@ -189,11 +189,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
189189
.auditStamp(systemAspect.getAuditStamp())
190190
.systemMetadata(
191191
withAppSource(systemAspect.getSystemMetadata()))
192-
.build(
193-
opContext
194-
.getRetrieverContext()
195-
.get()
196-
.getAspectRetriever()))
192+
.build(opContext.getAspectRetriever()))
197193
.collect(Collectors.toList()))
198194
.build();
199195

datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/schemafield/GenerateSchemaFieldsFromSchemaMetadataStepTest.java

+1-2
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,6 @@
2222
import com.linkedin.upgrade.DataHubUpgradeState;
2323
import io.datahubproject.metadata.context.OperationContext;
2424
import io.datahubproject.metadata.context.RetrieverContext;
25-
import java.util.Optional;
2625
import java.util.stream.Stream;
2726
import org.junit.jupiter.api.BeforeEach;
2827
import org.junit.jupiter.api.Test;
@@ -48,7 +47,7 @@ public void setup() {
4847
step =
4948
new GenerateSchemaFieldsFromSchemaMetadataStep(
5049
mockOpContext, mockEntityService, mockAspectDao, 10, 100, 1000);
51-
when(mockOpContext.getRetrieverContext()).thenReturn(Optional.of(mockRetrieverContext));
50+
when(mockOpContext.getRetrieverContext()).thenReturn(mockRetrieverContext);
5251
}
5352

5453
/** Test to verify the correct step ID is returned. */
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,38 @@
11
package com.linkedin.metadata.aspect;
22

3+
import com.linkedin.common.urn.Urn;
4+
import com.linkedin.entity.Aspect;
5+
import com.linkedin.metadata.models.registry.EmptyEntityRegistry;
6+
import com.linkedin.metadata.models.registry.EntityRegistry;
7+
import java.util.Collections;
8+
import java.util.Map;
9+
import java.util.Set;
10+
import javax.annotation.Nonnull;
11+
312
/** Responses can be cached based on application.yaml caching configuration for the EntityClient */
4-
public interface CachingAspectRetriever extends AspectRetriever {}
13+
public interface CachingAspectRetriever extends AspectRetriever {
14+
15+
CachingAspectRetriever EMPTY = new EmptyAspectRetriever();
16+
17+
class EmptyAspectRetriever implements CachingAspectRetriever {
18+
@Nonnull
19+
@Override
20+
public Map<Urn, Map<String, Aspect>> getLatestAspectObjects(
21+
Set<Urn> urns, Set<String> aspectNames) {
22+
return Collections.emptyMap();
23+
}
24+
25+
@Nonnull
26+
@Override
27+
public Map<Urn, Map<String, SystemAspect>> getLatestSystemAspects(
28+
Map<Urn, Set<String>> urnAspectNames) {
29+
return Collections.emptyMap();
30+
}
31+
32+
@Nonnull
33+
@Override
34+
public EntityRegistry getEntityRegistry() {
35+
return EmptyEntityRegistry.EMPTY;
36+
}
37+
}
38+
}

entity-registry/src/main/java/com/linkedin/metadata/aspect/GraphRetriever.java

+23
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
import com.linkedin.metadata.query.filter.Filter;
55
import com.linkedin.metadata.query.filter.RelationshipFilter;
66
import com.linkedin.metadata.query.filter.SortCriterion;
7+
import java.util.Collections;
78
import java.util.List;
89
import java.util.function.Function;
910
import javax.annotation.Nonnull;
@@ -97,4 +98,26 @@ default void consumeRelatedEntities(
9798
}
9899
}
99100
}
101+
102+
GraphRetriever EMPTY = new EmptyGraphRetriever();
103+
104+
class EmptyGraphRetriever implements GraphRetriever {
105+
106+
@Nonnull
107+
@Override
108+
public RelatedEntitiesScrollResult scrollRelatedEntities(
109+
@Nullable List<String> sourceTypes,
110+
@Nonnull Filter sourceEntityFilter,
111+
@Nullable List<String> destinationTypes,
112+
@Nonnull Filter destinationEntityFilter,
113+
@Nonnull List<String> relationshipTypes,
114+
@Nonnull RelationshipFilter relationshipFilter,
115+
@Nonnull List<SortCriterion> sortCriterion,
116+
@Nullable String scrollId,
117+
int count,
118+
@Nullable Long startTimeMillis,
119+
@Nullable Long endTimeMillis) {
120+
return new RelatedEntitiesScrollResult(0, 0, null, Collections.emptyList());
121+
}
122+
}
100123
}

entity-registry/src/main/java/com/linkedin/metadata/entity/SearchRetriever.java

+19
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
import com.linkedin.metadata.query.filter.Filter;
44
import com.linkedin.metadata.search.ScrollResult;
5+
import com.linkedin.metadata.search.SearchEntityArray;
56
import java.util.List;
67
import javax.annotation.Nonnull;
78
import javax.annotation.Nullable;
@@ -21,4 +22,22 @@ ScrollResult scroll(
2122
@Nullable Filter filters,
2223
@Nullable String scrollId,
2324
int count);
25+
26+
SearchRetriever EMPTY = new EmptySearchRetriever();
27+
28+
class EmptySearchRetriever implements SearchRetriever {
29+
30+
@Override
31+
public ScrollResult scroll(
32+
@Nonnull List<String> entities,
33+
@Nullable Filter filters,
34+
@Nullable String scrollId,
35+
int count) {
36+
ScrollResult empty = new ScrollResult();
37+
empty.setEntities(new SearchEntityArray());
38+
empty.setNumEntities(0);
39+
empty.setPageSize(0);
40+
return empty;
41+
}
42+
}
2443
}

entity-registry/src/testFixtures/java/com/linkedin/test/metadata/aspect/MockAspectRetriever.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
import com.linkedin.data.DataMap;
66
import com.linkedin.data.template.RecordTemplate;
77
import com.linkedin.entity.Aspect;
8-
import com.linkedin.metadata.aspect.AspectRetriever;
8+
import com.linkedin.metadata.aspect.CachingAspectRetriever;
99
import com.linkedin.metadata.aspect.SystemAspect;
1010
import com.linkedin.metadata.models.registry.EntityRegistry;
1111
import com.linkedin.mxe.SystemMetadata;
@@ -22,7 +22,7 @@
2222
import javax.annotation.Nonnull;
2323
import org.mockito.Mockito;
2424

25-
public class MockAspectRetriever implements AspectRetriever {
25+
public class MockAspectRetriever implements CachingAspectRetriever {
2626
private final Map<Urn, Map<String, Aspect>> data;
2727
private final Map<Urn, Map<String, SystemAspect>> systemData = new HashMap<>();
2828

li-utils/src/main/java/com/linkedin/metadata/Constants.java

+2
Original file line number | Diff line number | Diff line change
@@ -409,6 +409,8 @@ public class Constants {
409409
/** User Status */
410410
public static final String CORP_USER_STATUS_ACTIVE = "ACTIVE";
411411

412+
public static final String CORP_USER_STATUS_SUSPENDED = "SUSPENDED";
413+
412414
/** Task Runs */
413415
public static final String DATA_PROCESS_INSTANCE_ENTITY_NAME = "dataProcessInstance";
414416

metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py

+20-8
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@
1414
PropertyValueClass,
1515
StructuredPropertyDefinitionClass,
1616
)
17-
from datahub.metadata.urns import StructuredPropertyUrn, Urn
17+
from datahub.metadata.urns import DataTypeUrn, StructuredPropertyUrn, Urn
1818
from datahub.utilities.urns._urn_base import URN_TYPES
1919

2020
logging.basicConfig(level=logging.INFO)
@@ -86,19 +86,31 @@ class StructuredProperties(ConfigModel):
8686

8787
@validator("type")
8888
def validate_type(cls, v: str) -> str:
89-
# Convert to lowercase if needed
90-
if not v.islower():
89+
# This logic is somewhat hacky, since we need to deal with
90+
# 1. fully qualified urns
91+
# 2. raw data types, that need to get the datahub namespace prefix
92+
# While keeping the user-facing interface and error messages clean.
93+
94+
if not v.startswith("urn:li:") and not v.islower():
95+
# Convert to lowercase if needed
96+
v = v.lower()
9197
logger.warning(
92-
f"Structured property type should be lowercase. Updated to {v.lower()}"
98+
f"Structured property type should be lowercase. Updated to {v}"
9399
)
94-
v = v.lower()
100+
101+
urn = Urn.make_data_type_urn(v)
95102

96103
# Check if type is allowed
97-
if not AllowedTypes.check_allowed_type(v):
104+
data_type_urn = DataTypeUrn.from_string(urn)
105+
unqualified_data_type = data_type_urn.id
106+
if unqualified_data_type.startswith("datahub."):
107+
unqualified_data_type = unqualified_data_type[len("datahub.") :]
108+
if not AllowedTypes.check_allowed_type(unqualified_data_type):
98109
raise ValueError(
99-
f"Type {v} is not allowed. Allowed types are {AllowedTypes.values()}"
110+
f"Type {unqualified_data_type} is not allowed. Allowed types are {AllowedTypes.values()}"
100111
)
101-
return v
112+
113+
return urn
102114

103115
@property
104116
def fqn(self) -> str:

metadata-ingestion/src/datahub/ingestion/source/mlflow.py

+30-5
Original file line number | Diff line number | Diff line change
@@ -38,16 +38,30 @@
3838
class MLflowConfig(EnvConfigMixin):
3939
tracking_uri: Optional[str] = Field(
4040
default=None,
41-
description="Tracking server URI. If not set, an MLflow default tracking_uri is used (local `mlruns/` directory or `MLFLOW_TRACKING_URI` environment variable)",
41+
description=(
42+
"Tracking server URI. If not set, an MLflow default tracking_uri is used"
43+
" (local `mlruns/` directory or `MLFLOW_TRACKING_URI` environment variable)"
44+
),
4245
)
4346
registry_uri: Optional[str] = Field(
4447
default=None,
45-
description="Registry server URI. If not set, an MLflow default registry_uri is used (value of tracking_uri or `MLFLOW_REGISTRY_URI` environment variable)",
48+
description=(
49+
"Registry server URI. If not set, an MLflow default registry_uri is used"
50+
" (value of tracking_uri or `MLFLOW_REGISTRY_URI` environment variable)"
51+
),
4652
)
4753
model_name_separator: str = Field(
4854
default="_",
4955
description="A string which separates model name from its version (e.g. model_1 or model-1)",
5056
)
57+
base_external_url: Optional[str] = Field(
58+
default=None,
59+
description=(
60+
"Base URL to use when constructing external URLs to MLflow."
61+
" If not set, tracking_uri is used if it's an HTTP URL."
62+
" If neither is set, external URLs are not generated."
63+
),
64+
)
5165

5266

5367
@dataclass
@@ -279,12 +293,23 @@ def _make_ml_model_urn(self, model_version: ModelVersion) -> str:
279293
)
280294
return urn
281295

282-
def _make_external_url(self, model_version: ModelVersion) -> Union[None, str]:
296+
def _get_base_external_url_from_tracking_uri(self) -> Optional[str]:
297+
if isinstance(
298+
self.client.tracking_uri, str
299+
) and self.client.tracking_uri.startswith("http"):
300+
return self.client.tracking_uri
301+
else:
302+
return None
303+
304+
def _make_external_url(self, model_version: ModelVersion) -> Optional[str]:
283305
"""
284306
Generate URL for a Model Version to MLflow UI.
285307
"""
286-
base_uri = self.client.tracking_uri
287-
if base_uri.startswith("http"):
308+
base_uri = (
309+
self.config.base_external_url
310+
or self._get_base_external_url_from_tracking_uri()
311+
)
312+
if base_uri:
288313
return f"{base_uri.rstrip('/')}/#/models/{model_version.name}/versions/{model_version.version}"
289314
else:
290315
return None

metadata-ingestion/tests/unit/test_mlflow_source.py

+13
Original file line number | Diff line number | Diff line change
@@ -136,3 +136,16 @@ def test_make_external_link_remote(source, model_version):
136136
url = source._make_external_url(model_version)
137137

138138
assert url == expected_url
139+
140+
141+
def test_make_external_link_remote_via_config(source, model_version):
142+
custom_base_url = "https://custom-server.org"
143+
source.config.base_external_url = custom_base_url
144+
source.client = MlflowClient(
145+
tracking_uri="https://dummy-mlflow-tracking-server.org"
146+
)
147+
expected_url = f"{custom_base_url}/#/models/{model_version.name}/versions/{model_version.version}"
148+
149+
url = source._make_external_url(model_version)
150+
151+
assert url == expected_url

metadata-io/metadata-io-api/src/test/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImplTest.java

+4-4
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
1717
import com.linkedin.dataset.DatasetProperties;
1818
import com.linkedin.events.metadata.ChangeType;
19-
import com.linkedin.metadata.aspect.AspectRetriever;
19+
import com.linkedin.metadata.aspect.CachingAspectRetriever;
2020
import com.linkedin.metadata.aspect.GraphRetriever;
2121
import com.linkedin.metadata.aspect.batch.MCPItem;
2222
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
@@ -56,7 +56,7 @@
5656

5757
public class AspectsBatchImplTest {
5858
private EntityRegistry testRegistry;
59-
private AspectRetriever mockAspectRetriever;
59+
private CachingAspectRetriever mockAspectRetriever;
6060
private RetrieverContext retrieverContext;
6161

6262
@BeforeTest
@@ -75,12 +75,12 @@ public void beforeTest() throws EntityRegistryException {
7575

7676
@BeforeMethod
7777
public void setup() {
78-
this.mockAspectRetriever = mock(AspectRetriever.class);
78+
this.mockAspectRetriever = mock(CachingAspectRetriever.class);
7979
when(this.mockAspectRetriever.getEntityRegistry()).thenReturn(testRegistry);
8080
this.retrieverContext =
8181
RetrieverContext.builder()
8282
.searchRetriever(mock(SearchRetriever.class))
83-
.aspectRetriever(mockAspectRetriever)
83+
.cachingAspectRetriever(mockAspectRetriever)
8484
.graphRetriever(mock(GraphRetriever.class))
8585
.build();
8686
}

0 commit comments

Comments
 (0)