|
1 |
| -import json |
2 | 1 | import logging
|
3 | 2 | from datetime import datetime, timezone
|
4 | 3 | from typing import (
|
|
25 | 24 | BrowsePathsV2Class,
|
26 | 25 | ChangeTypeClass,
|
27 | 26 | ContainerClass,
|
| 27 | + DatasetProfileClass, |
28 | 28 | DatasetPropertiesClass,
|
29 | 29 | DatasetUsageStatisticsClass,
|
30 | 30 | MetadataChangeEventClass,
|
@@ -80,20 +80,40 @@ def create_dataset_props_patch_builder(
|
80 | 80 |
|
81 | 81 |
|
82 | 82 | def check_mcp_correctness(mcp: MetadataChangeProposalClass):
|
83 |
| - logger.debug(f"Processing as MCP with urn: {mcp.entityUrn} and aspect: {mcp.aspectName}, change type: {mcp.changeType}") |
| 83 | + logger.debug( |
| 84 | + f"Processing as MCP with urn: {mcp.entityUrn} and aspect: {mcp.aspectName}, change type: {mcp.changeType}" |
| 85 | + ) |
84 | 86 | logger.debug(f"Aspect length: {len(mcp.aspect.value)}")
|
85 | 87 | logger.debug(f"Full aspect:\n{mcp.aspect}")
|
86 | 88 |
|
87 | 89 |
|
88 | 90 | def check_mcpw_correctness(mcp: MetadataChangeProposalWrapper):
|
89 |
| - logger.debug(f"Processing as MCPW with urn: {mcp.entityUrn} and aspect: {mcp.aspectName}, change type: {mcp.changeType}") |
| 91 | + logger.debug( |
| 92 | + f"Processing as MCPW with urn: {mcp.entityUrn} and aspect: {mcp.aspectName}, change type: {mcp.changeType}" |
| 93 | + ) |
90 | 94 | logger.debug(f"Full aspect:\n{mcp.aspect}")
|
91 | 95 | if isinstance(mcp.aspect, SchemaMetadataClass):
|
92 | 96 | schema: SchemaMetadataClass = mcp.aspect
|
93 | 97 | logger.debug(f"Schema aspect dump:\n{schema.to_obj()}")
|
| 98 | + if isinstance(mcp.aspect, DatasetProfileClass): |
| 99 | + profile: DatasetProfileClass = mcp.aspect |
| 100 | + logger.debug(f"Dataset Profile aspect dump:\n{profile.to_obj()}") |
| 101 | + logger.debug(f"Length of field profiles: {len(profile.fieldProfiles)}") |
| 102 | + for field in profile.fieldProfiles: |
| 103 | + logger.debug( |
| 104 | + f"Field {field.fieldPath} has {len(field.sampleValues)} sample values" |
| 105 | + ) |
| 106 | + values_len = 0 |
| 107 | + for value in field.sampleValues: |
| 108 | + values_len += len(value) |
| 109 | + logger.debug( |
| 110 | + f"Field {field.fieldPath} has {len(field.sampleValues)} sample values, taking total bytes {values_len}" |
| 111 | + ) |
94 | 112 |
|
95 | 113 |
|
96 |
| -def check_workunit_correctness(stream: Iterable[MetadataWorkUnit]) -> Iterable[MetadataWorkUnit]: |
| 114 | +def check_workunit_correctness( |
| 115 | + stream: Iterable[MetadataWorkUnit], |
| 116 | +) -> Iterable[MetadataWorkUnit]: |
97 | 117 | for wu in stream:
|
98 | 118 | logger.debug(f"Checking correctnes for workunit: {wu.id}")
|
99 | 119 | if isinstance(wu.metadata, MetadataChangeProposalClass):
|
|
0 commit comments