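"""Convert an Avro schema file into a DataHub SchemaMetadata aspect.

Reads an Avro record schema from disk, converts its fields with
AvroToMceSchemaConverter, and emits the resulting SchemaMetadata aspect to a
local metadata JSON file, to a DataHub server, or both.
"""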
import hashlib
import os

import click
from avro.schema import RecordSchema
from avro.schema import parse as parse_avro

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
from datahub.ingestion.extractor.schema_util import AvroToMceSchemaConverter
from datahub.ingestion.graph.client import get_default_graph


def get_schema_hash(schema):
    # Generic helper for hashing a schema's string form (the CLI below uses
    # Avro's own fingerprint() instead).
    # Convert the schema to a string if it isn't one already.
    schema_str = str(schema)

    # Create an MD5 hash of the schema text.
    schema_hash = hashlib.md5(schema_str.encode("utf-8")).hexdigest()

    return schema_hash


@click.command(name="avro2datahub")
@click.option("--input-file", "-i", type=click.Path(exists=True), required=True)
@click.option("--platform", type=str, required=True)
@click.option("--output-file", "-o", type=click.Path(), default="metadata.py.json")
@click.option("--to-file", "-f", is_flag=True, default=True)
@click.option("--to-server", "-s", is_flag=True, default=False)
def generate_schema_file_from_avro_schema(
    input_file: str, platform: str, output_file: str, to_file: bool, to_server: bool
):
    avro_schema_file = input_file
    output_file_name = output_file
    platform_urn = make_data_platform_urn(platform)
    converter = AvroToMceSchemaConverter(is_key_schema=False)

    # Delete the output file if it already exists.
    if os.path.exists(output_file_name):
        os.remove(output_file_name)

    with open(avro_schema_file) as f:
        raw_string = f.read()
        avro_schema = parse_avro(raw_string)
        assert isinstance(
            avro_schema, RecordSchema
        ), "This command only works for Avro records"

        # Hash the schema using Avro's MD5 fingerprint of its parsing canonical form.
        canonical_form = avro_schema.canonical_form
        print(
            f"Schema canonical form: Length ({len(canonical_form)}); {canonical_form}"
        )
        md5_bytes = avro_schema.fingerprint("md5")
        # Convert the fingerprint bytes to a hex string.
        avro_schema_hash = md5_bytes.hex()

        # Build the dataset URN from the record's namespace and name.
        dataset_urn = make_dataset_urn(
            platform=platform_urn,
            name=(
                f"{avro_schema.namespace}.{avro_schema.name}"
                if avro_schema.namespace
                else avro_schema.name
            ),
        )
        # Convert the Avro fields into DataHub schema fields.
        schema_fields = list(converter.to_mce_fields(avro_schema, is_key_schema=False))
        schema_metadata = models.SchemaMetadataClass(
            schemaName=avro_schema.name,
            platform=platform_urn,
            version=0,
            hash=avro_schema_hash,
            platformSchema=models.OtherSchemaClass(rawSchema=raw_string),
            fields=schema_fields,
        )
        assert schema_metadata.validate()
        if to_file:
            with SynchronizedFileEmitter(output_file_name) as file_emitter:
                file_emitter.emit(
                    MetadataChangeProposalWrapper(
                        entityUrn=dataset_urn, aspect=schema_metadata
                    )
                )
        if to_server:
            with get_default_graph() as graph:
                graph.emit(
                    MetadataChangeProposalWrapper(
                        entityUrn=dataset_urn, aspect=schema_metadata
                    )
                )

    print(f"Wrote metadata to {output_file}")


if __name__ == "__main__":
    generate_schema_file_from_avro_schema()
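
# Example invocation (hypothetical file names; adjust the script path, schema
# file, and platform to your setup):
#
#   python avro2datahub.py -i user_event.avsc --platform kafka -o user_event_metadata.json
#
# Passing --to-server / -s additionally emits the same aspect to the DataHub
# instance resolved by get_default_graph().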