Skip to content

Commit ac1ee6c

Browse files
authored
fix(lineage): logging reduction and fixes (#9878)
1 parent 6a06770 commit ac1ee6c

File tree

4 files changed

+46
-65
lines changed

4 files changed

+46
-65
lines changed

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolver.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.google.common.collect.ImmutableSet;
88
import com.linkedin.common.urn.Urn;
9+
import com.linkedin.datahub.graphql.generated.AndFilterInput;
910
import com.linkedin.datahub.graphql.generated.EntityType;
1011
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
1112
import com.linkedin.datahub.graphql.generated.LineageDirection;
@@ -92,9 +93,14 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
9293

9394
final int start = input.getStart() != null ? input.getStart() : DEFAULT_START;
9495
final int count = input.getCount() != null ? input.getCount() : DEFAULT_COUNT;
95-
final List<FacetFilterInput> filters =
96-
input.getFilters() != null ? input.getFilters() : new ArrayList<>();
97-
final Integer maxHops = getMaxHops(filters);
96+
final List<AndFilterInput> filters =
97+
input.getOrFilters() != null ? input.getOrFilters() : new ArrayList<>();
98+
final List<FacetFilterInput> facetFilters =
99+
filters.stream()
100+
.map(AndFilterInput::getAnd)
101+
.flatMap(List::stream)
102+
.collect(Collectors.toList());
103+
final Integer maxHops = getMaxHops(facetFilters);
98104

99105
@Nullable
100106
final Long startTimeMillis =
@@ -117,7 +123,8 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
117123
start,
118124
count);
119125

120-
final Filter filter = ResolverUtils.buildFilter(filters, input.getOrFilters());
126+
final Filter filter =
127+
ResolverUtils.buildFilter(input.getFilters(), input.getOrFilters());
121128
SearchFlags searchFlags = null;
122129
com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags();
123130
if (inputFlags != null) {

metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java

+1-25
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ public LineageResponse getLineage(
302302
exploreMultiplePaths);
303303
for (LineageRelationship oneHopRelnship : oneHopRelationships) {
304304
if (result.containsKey(oneHopRelnship.getEntity())) {
305+
log.debug("Urn encountered again during graph walk {}", oneHopRelnship.getEntity());
305306
result.put(
306307
oneHopRelnship.getEntity(),
307308
mergeLineageRelationships(result.get(oneHopRelnship.getEntity()), oneHopRelnship));
@@ -553,26 +554,6 @@ public static void addEdgeToPaths(
553554
addEdgeToPaths(existingPaths, parentUrn, null, childUrn);
554555
}
555556

556-
/**
557-
* Utility method to log paths to the debug log.
558-
*
559-
* @param paths
560-
* @param message
561-
*/
562-
private static void logPaths(UrnArrayArray paths, String message) {
563-
if (log.isDebugEnabled()) {
564-
log.debug("xxxxxxxxxx");
565-
log.debug(message);
566-
log.debug("---------");
567-
if (paths != null) {
568-
paths.forEach(path -> log.debug("{}", path));
569-
} else {
570-
log.debug("EMPTY");
571-
}
572-
log.debug("xxxxxxxxxx");
573-
}
574-
}
575-
576557
private static boolean containsCycle(final UrnArray path) {
577558
Set<Urn> urnSet = path.stream().collect(Collectors.toUnmodifiableSet());
578559
// path contains a cycle if any urn is repeated twice
@@ -587,8 +568,6 @@ public static boolean addEdgeToPaths(
587568
boolean edgeAdded = false;
588569
// Collect all full-paths to this child node. This is what will be returned.
589570
UrnArrayArray pathsToParent = existingPaths.get(parentUrn);
590-
logPaths(pathsToParent, String.format("Paths to Parent: %s, Child: %s", parentUrn, childUrn));
591-
logPaths(existingPaths.get(childUrn), String.format("Existing Paths to Child: %s", childUrn));
592571
if (pathsToParent != null && !pathsToParent.isEmpty()) {
593572
// If there are existing paths to this parent node, then we attempt
594573
// to append the child to each of the existing paths (lengthen it).
@@ -630,7 +609,6 @@ public static boolean addEdgeToPaths(
630609
existingPaths.get(childUrn).add(pathToChild);
631610
edgeAdded = true;
632611
}
633-
logPaths(existingPaths.get(childUrn), String.format("New paths to Child: %s", childUrn));
634612
return edgeAdded;
635613
}
636614

@@ -655,7 +633,6 @@ private static List<LineageRelationship> extractRelationships(
655633
for (SearchHit hit : hits) {
656634
index++;
657635
final Map<String, Object> document = hit.getSourceAsMap();
658-
log.debug("{}: hit: {}", index, document);
659636
final Urn sourceUrn =
660637
UrnUtils.getUrn(((Map<String, Object>) document.get(SOURCE)).get("urn").toString());
661638
final Urn destinationUrn =
@@ -808,7 +785,6 @@ private static List<LineageRelationship> extractRelationships(
808785
}
809786
List<LineageRelationship> result = new ArrayList<>(lineageRelationshipMap.values());
810787
log.debug("Number of lineage relationships in list: {}", result.size());
811-
log.debug("Result: {}", result);
812788
return result;
813789
} catch (Exception e) {
814790
// This exception handler merely exists to log the exception at an appropriate point and

metadata-service/configuration/src/main/resources/application.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ elasticsearch:
230230
timeoutSeconds: ${ELASTICSEARCH_SEARCH_GRAPH_TIMEOUT_SECONDS:50} # graph dao timeout seconds
231231
batchSize: ${ELASTICSEARCH_SEARCH_GRAPH_BATCH_SIZE:1000} # graph dao batch size
232232
maxResult: ${ELASTICSEARCH_SEARCH_GRAPH_MAX_RESULT:10000} # graph dao max result size
233-
enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:true}
233+
enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:false}
234234

235235
# TODO: Kafka topic convention
236236
kafka:
@@ -315,7 +315,7 @@ systemUpdate:
315315
backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s
316316
waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true}
317317
dataJobNodeCLL:
318-
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:true}
318+
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:false}
319319
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200}
320320
browsePathsV2:
321321
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true}

smoke-test/tests/lineage/test_lineage.py

+32-34
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@
88
import pytest
99
from datahub.cli.cli_utils import get_url_and_token
1010
from datahub.emitter.mcp import MetadataChangeProposalWrapper
11-
from datahub.ingestion.graph.client import (
12-
DatahubClientConfig,
13-
DataHubGraph,
14-
get_default_graph,
15-
)
11+
from datahub.ingestion.graph.client import DataHubGraph # get_default_graph,
12+
from datahub.ingestion.graph.client import DatahubClientConfig
1613
from datahub.metadata.schema_classes import (
1714
AuditStampClass,
1815
ChangeAuditStampsClass,
@@ -959,32 +956,33 @@ def ingest_multipath_metadata(
959956
wait_for_writes_to_sync()
960957

961958

962-
@pytest.mark.dependency(depends=["test_healthchecks"])
963-
def test_simple_lineage_multiple_paths(
964-
ingest_multipath_metadata,
965-
chart_urn_fixture,
966-
intermediates_fixture,
967-
destination_urn_fixture,
968-
):
969-
chart_urn = chart_urn_fixture
970-
intermediates = intermediates_fixture
971-
destination_urn = destination_urn_fixture
972-
results = search_across_lineage(
973-
get_default_graph(),
974-
chart_urn,
975-
direction="UPSTREAM",
976-
convert_schema_fields_to_datasets=True,
977-
)
978-
assert destination_urn in [
979-
x["entity"]["urn"] for x in results["searchAcrossLineage"]["searchResults"]
980-
]
981-
for search_result in results["searchAcrossLineage"]["searchResults"]:
982-
if search_result["entity"]["urn"] == destination_urn:
983-
assert (
984-
len(search_result["paths"]) == 2
985-
) # 2 paths from the chart to the dataset
986-
for path in search_result["paths"]:
987-
assert len(path["path"]) == 3
988-
assert path["path"][-1]["urn"] == destination_urn
989-
assert path["path"][0]["urn"] == chart_urn
990-
assert path["path"][1]["urn"] in intermediates
959+
# TODO: Reenable once fixed
960+
# @pytest.mark.dependency(depends=["test_healthchecks"])
961+
# def test_simple_lineage_multiple_paths(
962+
# ingest_multipath_metadata,
963+
# chart_urn_fixture,
964+
# intermediates_fixture,
965+
# destination_urn_fixture,
966+
# ):
967+
# chart_urn = chart_urn_fixture
968+
# intermediates = intermediates_fixture
969+
# destination_urn = destination_urn_fixture
970+
# results = search_across_lineage(
971+
# get_default_graph(),
972+
# chart_urn,
973+
# direction="UPSTREAM",
974+
# convert_schema_fields_to_datasets=True,
975+
# )
976+
# assert destination_urn in [
977+
# x["entity"]["urn"] for x in results["searchAcrossLineage"]["searchResults"]
978+
# ]
979+
# for search_result in results["searchAcrossLineage"]["searchResults"]:
980+
# if search_result["entity"]["urn"] == destination_urn:
981+
# assert (
982+
# len(search_result["paths"]) == 2
983+
# ) # 2 paths from the chart to the dataset
984+
# for path in search_result["paths"]:
985+
# assert len(path["path"]) == 3
986+
# assert path["path"][-1]["urn"] == destination_urn
987+
# assert path["path"][0]["urn"] == chart_urn
988+
# assert path["path"][1]["urn"] in intermediates

0 commit comments

Comments
 (0)